1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""A2DP proxy module."""
16
17import time
18from typing import Optional
19
20from grpc import RpcError
21
22from mmi2grpc._audio import AudioSignal
23from mmi2grpc._helpers import assert_description
24from mmi2grpc._proxy import ProfileProxy
25from pandora.a2dp_grpc import A2DP
26from pandora.a2dp_pb2 import Sink, Source, PlaybackAudioRequest
27from pandora.host_grpc import Host
28from pandora.host_pb2 import Connection
29
# Parameters of the audio signal streamed to the tester; both are passed to
# AudioSignal when the proxy is constructed.
# NOTE(review): amplitude is presumably a fraction of full scale and the
# sampling rate presumably in Hz -- confirm against mmi2grpc._audio.
AUDIO_SIGNAL_AMPLITUDE = 0.8
AUDIO_SIGNAL_SAMPLING_RATE = 44100
32
33
class A2DPProxy(ProfileProxy):
    """A2DP proxy.

    Implements A2DP and AVDTP PTS MMIs.

    Each ``TSC_*`` method answers one PTS prompt and returns the answer
    string ("OK", "Yes" or "No").  Handlers whose body is only a ``return``
    take no action on the IUT.

    NOTE(review): the handler docstrings appear to be the verbatim PTS
    prompt texts and are presumably compared against the prompt by
    ``assert_description``, so they are reproduced unchanged (typos
    included) -- confirm before editing any of them.
    """

    # Connection to the tester; set when a connection is accepted or looked
    # up, cleared when the link is dropped.
    connection: Optional[Connection] = None
    # Sink stream handle, used when the IUT acts as A2DP sink.
    sink: Optional[Sink] = None
    # Source stream handle, used when the IUT acts as A2DP source.
    source: Optional[Source] = None

    def __init__(self, channel):
        """Create the Host and A2DP service clients and the audio generator.

        Args:
            channel: channel handed to the ``Host`` and ``A2DP`` clients.
        """
        super().__init__()

        self.host = Host(channel)
        self.a2dp = A2DP(channel)

        def convert_frame(data):
            # self.source is read when each frame is converted, so frames
            # are tagged with whichever source is current at that moment.
            return PlaybackAudioRequest(data=data, source=self.source)

        self.audio = AudioSignal(
            lambda frames: self.a2dp.PlaybackAudio(map(convert_frame, frames)),
            AUDIO_SIGNAL_AMPLITUDE,
            AUDIO_SIGNAL_SAMPLING_RATE
        )

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_connect(
            self, test: str, pts_addr: bytes, **kwargs):
        """
        If necessary, take action to accept the AVDTP Signaling Channel
        Connection initiated by the tester.

        Description: Make sure the IUT
        (Implementation Under Test) is in a state to accept incoming Bluetooth
        connections.  Some devices may need to be on a specific screen, like a
        Bluetooth settings screen, in order to pair with PTS.  If the IUT is
        still having problems pairing with PTS, try running a test case where
        the IUT connects to PTS to establish pairing.
        """

        # Every role first waits for the connection initiated by the tester.
        self.connection = self.host.WaitConnection(
            address=pts_addr).connection
        try:
            if "SRC" not in test:
                self.sink = self.a2dp.WaitSink(
                    connection=self.connection).sink
            elif "INT" in test:
                # Source acting as initiator opens the stream itself.
                self.source = self.a2dp.OpenSource(
                    connection=self.connection).source
            else:
                self.source = self.a2dp.WaitSource(
                    connection=self.connection).source
        except RpcError:
            # Best effort: a failed stream setup is ignored, the previous
            # sink/source value is kept and the prompt is still answered.
            # NOTE(review): presumably some test cases never get a stream at
            # this point -- confirm before tightening this.
            pass
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_discover(self, **kwargs):
        """
        Send a discover command to PTS.

        Action: If the IUT (Implementation
        Under Test) is already connected to PTS, attempting to send or receive
        streaming media should trigger this action.  If the IUT is not connected
        to PTS, attempting to connect may trigger this action.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_start(self, test: str, **kwargs):
        """
        Send a start command to PTS.

        Action: If the IUT (Implementation Under
        Test) is already connected to PTS, attempting to send or receive
        streaming media should trigger this action.  If the IUT is not connected
        to PTS, attempting to connect may trigger this action.
        """

        # Start whichever stream matches the role named in the test case.
        if "SRC" in test:
            self.a2dp.Start(source=self.source)
        else:
            self.a2dp.Start(sink=self.sink)
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_suspend(self, test: str, **kwargs):
        """
        Suspend the streaming channel.
        """

        if "SRC" not in test:
            # Raised explicitly instead of `assert False` so the failure is
            # not stripped (and turned into an "OK" answer) under python -O.
            raise AssertionError(
                "Suspend is only handled for the source role")
        self.a2dp.Suspend(source=self.source)
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_close_stream(self, test: str, **kwargs):
        """
        Close the streaming channel.

        Action: Disconnect the streaming channel,
        or close the Bluetooth connection to the PTS.
        """

        # Close the stream for the active role and forget its handle.
        if "SRC" in test:
            self.a2dp.Close(source=self.source)
            self.source = None
        else:
            self.a2dp.Close(sink=self.sink)
            self.sink = None
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_out_of_range(
            self, pts_addr: bytes, **kwargs):
        """
        Move the IUT out of range to create a link loss scenario.

        Action: This
        can be also be done by placing the IUT or PTS in an RF shielded box.
         """

        # No cached connection: look the existing one up by tester address.
        if self.connection is None:
            self.connection = self.host.GetConnection(
                address=pts_addr).connection
        # Link loss is produced by disconnecting; all cached handles are
        # dropped afterwards.
        self.host.Disconnect(connection=self.connection)
        self.connection = None
        self.sink = None
        self.source = None
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_begin_streaming(self, test: str, **kwargs):
        """
        Begin streaming media ...

        Note: If the IUT has suspended the stream
        please restart the stream to begin streaming media.
        """

        # Per-test delays applied before the stream is (re)started.
        if test == "AVDTP/SRC/ACP/SIG/SMG/BI-29-C":
            time.sleep(2)  # TODO: Remove, AVRCP SegFault
        if test in ("A2DP/SRC/CC/BV-09-I",
                    "A2DP/SRC/SET/BV-04-I",
                    "AVDTP/SRC/ACP/SIG/SMG/BV-18-C",
                    "AVDTP/SRC/ACP/SIG/SMG/BV-20-C",
                    "AVDTP/SRC/ACP/SIG/SMG/BV-22-C"):
            time.sleep(1)  # TODO: Remove, AVRCP SegFault
        if test == "A2DP/SRC/SUS/BV-01-I":
            # Stream is not suspended when we receive the interaction
            time.sleep(1)

        self.a2dp.Start(source=self.source)
        self.audio.start()
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_media(self, **kwargs):
        """
        Take action if necessary to start streaming media to the tester.
        """

        self.audio.start()
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_stream_media(self, **kwargs):
        """
        Stream media to PTS.  If the IUT is a SNK, wait for PTS to start
        streaming media.

        Action: If the IUT (Implementation Under Test) is
        already connected to PTS, attempting to send or receive streaming media
        should trigger this action.  If the IUT is not connected to PTS,
        attempting to connect may trigger this action.
        """

        self.audio.start()
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_user_verify_media_playback(self, **kwargs):
        """
        Is the test system properly playing back the media being sent by the
        IUT?
        """

        result = self.audio.verify()
        # Fail the handler right away when playback could not be verified;
        # the "No" answer below is only reachable when asserts are disabled
        # (python -O).
        assert result

        return "Yes" if result else "No"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_get_capabilities(self, **kwargs):
        """
        Send a get capabilities command to PTS.

        Action: If the IUT
        (Implementation Under Test) is already connected to PTS, attempting to
        send or receive streaming media should trigger this action.  If the IUT
        is not connected to PTS, attempting to connect may trigger this action.
        """

        # This will be done as part as the a2dp.OpenSource or a2dp.WaitSource
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_discover(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Discover operation
        initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_set_configuration(self, **kwargs):
        """
        Send a set configuration command to PTS.

        Action: If the IUT
        (Implementation Under Test) is already connected to PTS, attempting to
        send or receive streaming media should trigger this action.  If the IUT
        is not connected to PTS, attempting to connect may trigger this action.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_close_stream(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Close operation initiated
        by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_abort(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Abort operation initiated
        by the tester..
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_get_all_capabilities(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Get All Capabilities
        operation initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_get_capabilities(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Get Capabilities operation
        initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_set_configuration(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Set Configuration
        operation initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_get_configuration(self, **kwargs):
        """
        Take action to accept the AVDTP Get Configuration command from the
        tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_open_stream(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Open operation initiated
        by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_start(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Start operation initiated
        by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_suspend(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Suspend operation
        initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_reconfigure(self, **kwargs):
        """
        If necessary, take action to accept the AVDTP Reconfigure operation
        initiated by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_media_transports(self, **kwargs):
        """
        Take action to accept transport channels for the recently configured
        media stream.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_confirm_streaming(self, **kwargs):
        """
        Is the IUT (Implementation Under Test) receiving streaming media from
        PTS?

        Action: Press 'Yes' if the IUT is receiving streaming data from
        the PTS (in some cases the sound may not be clear, this is normal).
        """

        # TODO: verify
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_open_stream(self, **kwargs):
        """
        Open a streaming media channel.

        Action: If the IUT (Implementation
        Under Test) is already connected to PTS, attempting to send or receive
        streaming media should trigger this action.  If the IUT is not connected
        to PTS, attempting to connect may trigger this action.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_accept_reconnect(self, pts_addr: bytes, **kwargs):
        """
        Press OK when the IUT (Implementation Under Test) is ready to allow the
        PTS to reconnect the AVDTP signaling channel.

        Action: Press OK when the
        IUT is ready to accept Bluetooth connections again.
        """

        # pts_addr is accepted but not used: nothing is done on the IUT here.
        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_iut_initiate_get_all_capabilities(self, **kwargs):
        """
        Send a GET ALL CAPABILITIES command to PTS.

        Action: If the IUT
        (Implementation Under Test) is already connected to PTS, attempting to
        send or receive streaming media should trigger this action.  If the IUT
        is not connected to PTS, attempting to connect may trigger this action.
        """

        return "OK"

    @assert_description
    def TSC_AVDTP_mmi_tester_verifying_suspend(self, **kwargs):
        """
        Please wait while the tester verifies the IUT does not send media during
        suspend ...
        """

        return "Yes"

    @assert_description
    def TSC_A2DP_mmi_user_confirm_optional_data_attribute(self, **kwargs):
        """
        Tester found the optional SDP attribute named 'Supported Features'.
        Press 'Yes' if the data displayed below is correct.

        Value: 0x0001
        """

        # TODO: Extract and verify attribute name and value from description
        return "OK"

    @assert_description
    def TSC_A2DP_mmi_user_confirm_optional_string_attribute(self, **kwargs):
        """
        Tester found the optional SDP attribute named 'Service Name'.  Press
        'Yes' if the string displayed below is correct.

        Value: Advanced Audio
        Source
        """

        # TODO: Extract and verify attribute name and value from description
        return "OK"

    @assert_description
    def TSC_A2DP_mmi_user_confirm_no_optional_attribute_support(self, **kwargs):
        """
        Tester could not find the optional SDP attribute named 'Provider Name'.
        Is this correct?
        """

        # TODO: Extract and verify attribute name from description
        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_accept_delayreport(self, **kwargs):
        """
        Take action if necessary to accept the Delay Reportl command from the
        tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_initiate_media_transport_connect(self, **kwargs):
        """
        Take action to initiate an AVDTP media transport.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_user_confirm_SIG_SMG_BV_28_C(self, **kwargs):
        """
        Were all the service capabilities reported to the upper tester valid?
        """

        # TODO: verify
        return "Yes"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_invalid_command(self, **kwargs):
        """
        Take action to reject the invalid command sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_open(self, **kwargs):
        """
        Take action to reject the invalid OPEN command sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_start(self, **kwargs):
        """
        Take action to reject the invalid START command sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_suspend(self, **kwargs):
        """
        Take action to reject the invalid SUSPEND command sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_reconfigure(self, **kwargs):
        """
        Take action to reject the invalid or incompatible RECONFIGURE command
        sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_get_all_capabilities(self, **kwargs):
        """
        Take action to reject the invalid GET ALL CAPABILITIES command with the
        error code BAD_LENGTH.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_get_capabilities(self, **kwargs):
        """
        Take action to reject the invalid GET CAPABILITIES command with the
        error code BAD_LENGTH.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_set_configuration(self, **kwargs):
        """
        Take action to reject the SET CONFIGURATION sent by the tester.  The IUT
        is expected to respond with SEP_IN_USE because the SEP requested was
        previously configured.
        """

        return "OK"

    # NOTE(review): this is the only handler in the class without
    # @assert_description.  Left as found because adding the check could make
    # a currently passing test case fail -- confirm whether the omission is
    # intentional.
    def TSC_AVDTPEX_mmi_iut_reject_get_configuration(self, **kwargs):
        """
        Take action to reject the GET CONFIGURATION sent by the tester.  The IUT
        is expected to respond with BAD_ACP_SEID because the SEID requested was
        not previously configured.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_iut_reject_close(self, **kwargs):
        """
        Take action to reject the invalid CLOSE command sent by the tester.
        """

        return "OK"

    @assert_description
    def TSC_AVDTPEX_mmi_user_confirm_SIG_SMG_BV_18_C(self, **kwargs):
        """
        Did the IUT receive media with the following information?

        - V = RTP_Ver
        - P = 0 (no padding bits)
        - X = 0 (no extension)
        - CC = 0 (no
        contributing source)
        - M = 0
        """

        # TODO: verify
        return "OK"
591