1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import dataclasses
18import itertools
19import logging
20import numpy as np
21
22from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, pandora
23from avatar.pandora_server import AndroidPandoraServer
24import bumble
25from bumble.avctp import AVCTP_PSM
26from bumble.a2dp import (
27    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
28    MPEG_2_AAC_LC_OBJECT_TYPE,
29    A2DP_SBC_CODEC_TYPE,
30    SBC_DUAL_CHANNEL_MODE,
31    SBC_JOINT_STEREO_CHANNEL_MODE,
32    SBC_LOUDNESS_ALLOCATION_METHOD,
33    SBC_MONO_CHANNEL_MODE,
34    SBC_SNR_ALLOCATION_METHOD,
35    SBC_STEREO_CHANNEL_MODE,
36    AacMediaCodecInformation,
37    SbcMediaCodecInformation,
38    make_audio_sink_service_sdp_records,
39)
40from bumble.avdtp import (AVDTP_AUDIO_MEDIA_TYPE, AVDTP_OPEN_STATE, AVDTP_PSM, AVDTP_STREAMING_STATE, AVDTP_IDLE_STATE,
41                          AVDTP_CLOSING_STATE, Listener, MediaCodecCapabilities, Protocol, AVDTP_BAD_STATE_ERROR,
42                          Suspend_Reject)
43from bumble.l2cap import (ChannelManager, ClassicChannel, ClassicChannelSpec, L2CAP_Configure_Request,
44                          L2CAP_Connection_Response, L2CAP_SIGNALING_CID)
45from bumble.pairing import PairingDelegate
46from mobly import base_test, test_runner
47from mobly.asserts import assert_equal  # type: ignore
48from mobly.asserts import assert_in  # type: ignore
49from mobly.asserts import assert_is_not_none  # type: ignore
50from mobly.asserts import fail  # type: ignore
51from pandora.a2dp_grpc_aio import A2DP
52from pandora.a2dp_pb2 import PlaybackAudioRequest, Source, Configuration, STEREO
53from pandora.host_pb2 import Connection
54from pandora.security_pb2 import LEVEL2
55from typing import Optional, Tuple
56
# Module-level logger shared by the helpers and tests below.
logger = logging.getLogger(__name__)

# Names of the Bluetooth feature flags that individual tests switch on through
# `device_config override bluetooth <flag> true` on the Android device.
AVRCP_CONNECT_A2DP_WITH_DELAY = 'com.android.bluetooth.flags.avrcp_connect_a2dp_with_delay'
AVDTP_HANDLE_SUSPEND_CFM_BAD_STATE = 'com.android.bluetooth.flags.avdt_handle_suspend_cfm_bad_state'
61
62
async def initiate_pairing(device, address) -> Connection:
    """Open a connection from `device` to `address` and secure it.

    The link is first created with the host `Connect` call and then raised to
    classic security LEVEL2 with `Secure`. Both steps are checked with
    assertions, and the connection obtained from `Connect` is returned.
    """
    link = (await device.aio.host.Connect(address=address)).connection
    assert link

    securing = await device.aio.security.Secure(connection=link, classic=LEVEL2)
    assert securing.success

    return link
74
75
async def accept_pairing(device, address) -> Connection:
    """Wait on `device` for `address` to connect and secure the link.

    The incoming link is awaited with the host `WaitConnection` call, then
    `WaitSecurity` waits for classic security LEVEL2. Both steps are checked
    with assertions, and the connection from `WaitConnection` is returned.
    """
    link = (await device.aio.host.WaitConnection(address=address)).connection
    assert link

    securing = await device.aio.security.WaitSecurity(connection=link, classic=LEVEL2)
    assert securing.success

    return link
87
88
async def open_source(device, connection) -> Source:
    """Ask `device` to open an A2DP source over `connection`.

    Calls `device.a2dp.OpenSource`, asserts that the response carries a
    source, and returns that source.
    """
    opened = (await device.a2dp.OpenSource(connection=connection)).source
    assert opened

    return opened
97
98
def sbc_codec_capabilities() -> MediaCodecCapabilities:
    """Build the SBC capabilities advertised by the Bumble sink devices."""

    # Every SBC option listed below is offered, with a bitpool range of 2..53.
    sbc_information = SbcMediaCodecInformation.from_lists(
        sampling_frequencies=[48000, 44100, 32000, 16000],
        channel_modes=[
            SBC_MONO_CHANNEL_MODE,
            SBC_DUAL_CHANNEL_MODE,
            SBC_STEREO_CHANNEL_MODE,
            SBC_JOINT_STEREO_CHANNEL_MODE,
        ],
        block_lengths=[4, 8, 12, 16],
        subbands=[4, 8],
        allocation_methods=[
            SBC_LOUDNESS_ALLOCATION_METHOD,
            SBC_SNR_ALLOCATION_METHOD,
        ],
        minimum_bitpool_value=2,
        maximum_bitpool_value=53,
    )

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=sbc_information,
    )
123
124
def aac_codec_capabilities() -> MediaCodecCapabilities:
    """Build the AAC capabilities advertised by the Bumble sink devices."""

    aac_information = AacMediaCodecInformation.from_lists(
        object_types=[MPEG_2_AAC_LC_OBJECT_TYPE],
        sampling_frequencies=[48000, 44100],
        channels=[1, 2],
        vbr=1,
        bitrate=256000,
    )

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
        media_codec_information=aac_information,
    )
139
140
class AudioSignal:
    """Sine tone generator that streams its samples to an A2DP source."""

    SINE_FREQUENCY = 440
    SINE_DURATION = 0.1

    def __init__(self, a2dp: A2DP, source: Source, amplitude, fs):
        """Remember the playback parameters; nothing is sent until `start`.

        Args:
            a2dp: A2DP profile interface used to send the audio.
            source: Source connection object the audio is addressed to.
            amplitude: amplitude factor applied to the generated sine.
            fs: sampling rate used to generate the sine.
        """
        self.a2dp, self.source = a2dp, source
        self.amplitude, self.fs = amplitude, fs
        self.task = None

    def start(self):
        """Schedule the playback coroutine as an asyncio task kept in `self.task`."""
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        """Send the tone chunk by chunk through `PlaybackAudio`."""
        tone = self._generate_sine(self.SINE_FREQUENCY, self.SINE_DURATION)

        # Interleaved layout: the tone fills the even sample slots, the odd
        # slots keep their initial zeros.
        interleaved = np.zeros(tone.size * 2, dtype=tone.dtype)
        interleaved[0::2] = tone
        chunk = interleaved.tobytes()

        # 4 seconds of audio, one SINE_DURATION-long chunk per request.
        for _ in range(int(4 / self.SINE_DURATION)):
            await self.a2dp.PlaybackAudio(PlaybackAudioRequest(data=chunk, source=self.source))

    def _generate_sine(self, f, duration):
        """Return `duration` seconds of an `f` Hz sine as little-endian int16 samples."""
        sample_index = np.arange(self.fs * duration)
        phase = 2 * np.pi * sample_index * (f / self.fs)
        scaled = self.amplitude * np.sin(phase)
        return (scaled * 32767).astype('<i2')
183
184
class A2dpTest(base_test.BaseTestClass):  # type: ignore[misc]
    """A2DP test suite.

    Runs a device under test (DUT) against two Bumble reference devices (RD1
    and RD2) that register A2DP sink endpoints.
    """

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref1: PandoraDevice
    ref2: PandoraDevice

    @avatar.asynchronous
    async def setup_class(self) -> None:
        """Create the Pandora devices and configure the Bumble ones for BR/EDR.

        Raises:
            signals.TestAbortClass: if either reference device is not a Bumble device.
        """
        # `signals` is not among the names imported at module level, so it is
        # brought into scope here; without it the abort path raises NameError.
        from mobly import signals

        self.devices = PandoraDevices(self)
        self.dut, self.ref1, self.ref2, *_ = self.devices

        for ref in (self.ref1, self.ref2):
            if not isinstance(ref, BumblePandoraDevice):
                raise signals.TestAbortClass('Test require Bumble as reference device(s)')

        # Enable BR/EDR mode and SSP for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault('classic_smp_enabled', False)
                device.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT

    def teardown_class(self) -> None:
        """Stop all devices, if `setup_class` got as far as creating them."""
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        """Reset all devices and install fresh A2DP listeners on RD1 and RD2."""
        await asyncio.gather(self.dut.reset(), self.ref1.reset(), self.ref2.reset())

        self.dut.a2dp = A2DP(channel=self.dut.aio.channel)

        # Publish the audio sink SDP records on both reference devices.
        handle = 0x00010001
        self.ref1.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}
        self.ref2.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}

        self.ref1.a2dp = Listener.for_device(self.ref1.device)
        self.ref2.a2dp = Listener.for_device(self.ref2.device)
        self.ref1.a2dp_sink = None
        self.ref2.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            # RD1 offers SBC only.
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())

        def on_ref2_avdtp_connection(server):
            # RD2 registers both an SBC and an AAC sink; the attribute ends up
            # referring to the AAC sink because it is assigned last.
            self.ref2.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref2.a2dp_sink = server.add_sink(aac_codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)
        self.ref2.a2dp.on('connection', on_ref2_avdtp_connection)

    def _override_bluetooth_flag(self, flag: str) -> None:
        """Set the Bluetooth `flag` to true on the first Android Pandora server found."""
        for server in self.devices._servers:
            if isinstance(server, AndroidPandoraServer):
                server.device.adb.shell(['device_config override bluetooth', flag, 'true'])  # type: ignore
                break

    async def _pair_and_open_source(self, ref: PandoraDevice) -> Tuple[Connection, Source]:
        """Pair DUT -> `ref`, open an AVDTP source to it and check the sink stream state.

        Returns:
            The DUT-side connection to `ref` and the opened source.
        """
        dut_ref, _ = await asyncio.gather(
            initiate_pairing(self.dut, ref.address),
            accept_pairing(ref, self.dut.address),
        )

        source = await open_source(self.dut, dut_ref)
        assert_is_not_none(ref.a2dp_sink)
        assert_is_not_none(ref.a2dp_sink.stream)
        assert_in(ref.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])
        return dut_ref, source

    async def _assert_current_codec(self, connection: Connection, codec_field: str) -> None:
        """Query the DUT codec configuration and assert that `codec_field` is the codec id set."""
        response = await self.dut.a2dp.GetConfiguration(connection=connection)
        logger.info(f"Current codec configuration: {response.configuration}")
        assert response.configuration.id.HasField(codec_field)

    async def _set_sbc_configuration(self, connection: Connection, sampling_frequency_hz: int, bit_depth: int):
        """Request a stereo SBC configuration on the DUT and return the reported success flag."""
        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = sampling_frequency_hz
        new_configuration.parameters.bit_depth = bit_depth
        new_configuration.parameters.channel_mode = STEREO

        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=connection, configuration=new_configuration)
        return result.success

    @avatar.asynchronous
    async def test_connect_and_stream(self) -> None:
        """Basic A2DP connection and streaming test.
        This test wants to be a template to be reused for other tests.

        1. Pair and Connect RD1
        2. Start streaming
        3. Check AVDTP status on RD1
        4. Stop streaming
        5. Check AVDTP status on RD1
        """
        # Connect and pair RD1, then connect AVDTP to it.
        _, dut_ref1_source = await self._pair_and_open_source(self.ref1)

        # Start streaming to RD1.
        await self.dut.a2dp.Start(source=dut_ref1_source)
        # NOTE(review): the signal generator is constructed but `start()` is
        # never called on it, so no audio data is sent here - confirm intent.
        audio = AudioSignal(self.dut.a2dp, dut_ref1_source, 0.8, 44100)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_STREAMING_STATE)

        # Stop streaming to RD1.
        await self.dut.a2dp.Suspend(source=dut_ref1_source)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_OPEN_STATE)

    @avatar.asynchronous
    async def test_avdtp_autoconnect_when_only_avctp_connected(self) -> None:
        """Test AVDTP automatically connects if peer device connects only AVCTP.

        1. Pair and Connect RD1 -> DUT
        2. Connect AVCTP RD1 -> DUT
        3. Check AVDTP status on RD1
        """

        # Enable AVRCP connect A2DP delayed feature
        self._override_bluetooth_flag(AVRCP_CONNECT_A2DP_WITH_DELAY)

        # Connect and pair RD1.
        ref1_dut, _ = await asyncio.gather(
            initiate_pairing(self.ref1, self.dut.address),
            accept_pairing(self.dut, self.ref1.address),
        )

        # Create a listener to wait for AVDTP connections
        avdtp_future = asyncio.get_running_loop().create_future()

        def on_avdtp_connection(server):
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref1.log.info(f'Sink: {self.ref1.a2dp_sink}')
            # A repeated event must not complete the future twice; doing so
            # would raise InvalidStateError inside the event callback.
            if not avdtp_future.done():
                avdtp_future.set_result(None)

        self.ref1.a2dp.on('connection', on_avdtp_connection)

        # Retrieve Bumble connection object from Pandora connection token
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)

        # Open AVCTP L2CAP channel
        avctp = await connection.create_l2cap_channel(spec=ClassicChannelSpec(AVCTP_PSM))
        self.ref1.log.info(f'AVCTP: {avctp}')

        # Wait for AVDTP L2CAP channel
        await asyncio.wait_for(avdtp_future, timeout=10.0)

    @avatar.asynchronous
    async def test_avdt_signaling_channel_connection_collision(self) -> None:
        """Test AVDTP signaling channel connection collision.

        Test steps after DUT and RD1 connected and paired:
        1. RD1 connects DUT over AVDTP - first AVDTP signaling channel
        2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection
        3. DUT tries connecting RD1 - collision simulated
        4. RD1 rejects AVDTP signaling channel connection request from DUT
        5. RD1 proceeds with first AVDTP signaling channel configuration
        6. Channel established - collision avoided
        """

        @dataclasses.dataclass
        class L2capConfigurationRequest:
            # Details of the configure request that is being held back.
            connection: Optional[Connection] = None
            cid: Optional[int] = None
            request: Optional[L2CAP_Configure_Request] = None

        # Shared between the two helper classes below through the enclosing
        # scope (not a module global, so nothing leaks into other tests).
        # It is reset to None once the held-back request has been replayed.
        pending_configuration_request = L2capConfigurationRequest()

        class TestChannelManager(ChannelManager):
            """Channel manager that refuses the DUT's AVDTP connection while a configuration is pending."""

            def __init__(
                self,
                device: BumblePandoraDevice,
            ) -> None:
                super().__init__(
                    device.l2cap_channel_manager.extended_features,
                    device.l2cap_channel_manager.connectionless_mtu,
                )
                self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu)
                device.sdp_server.register(self)
                self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu)
                self.host = device.host

            def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None:
                nonlocal pending_configuration_request
                if (request.psm == AVDTP_PSM and pending_configuration_request is not None):
                    logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>")
                    self.send_control_frame(
                        connection, cid,
                        L2CAP_Connection_Response(
                            identifier=request.identifier,
                            destination_cid=0,
                            source_cid=request.source_cid,
                            result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
                            status=0x0000,
                        ))
                    logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>")
                    chan_connection = pending_configuration_request.connection
                    chan_cid = pending_configuration_request.cid
                    chan_request = pending_configuration_request.request
                    # Clear the marker before replaying, so the replayed
                    # configure request is handled normally this time.
                    pending_configuration_request = None
                    super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request)
                    return
                super().on_l2cap_connection_request(connection, cid, request)

        class TestClassicChannel(ClassicChannel):
            """Channel that holds back the peer's configure request while the marker is set."""

            def on_connection_response(self, response):
                assert self.state == self.State.WAIT_CONNECT_RSP
                assert response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, f"Connection response: {response}"
                self.destination_cid = response.destination_cid
                self._change_state(self.State.WAIT_CONFIG)
                logger.info("<< 2. RD1 connected DUT, configuration postponed >>")

            def on_configure_request(self, request) -> None:
                if (pending_configuration_request is not None):
                    logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>")
                    pending_configuration_request.connection = self.connection
                    pending_configuration_request.cid = self.source_cid
                    pending_configuration_request.request = request
                else:
                    super().on_configure_request(request)

        # Override L2CAP Channel Manager to control signaling
        self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device)

        # Connect and pair DUT -> RD1.
        _, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Retrieve Bumble connection object from Pandora connection token
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)
        # Find a free CID for a new channel
        connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {})
        source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels)
        assert source_cid is not None, "source_cid is None"

        spec = ClassicChannelSpec(AVDTP_PSM)
        channel = TestClassicChannel(
            self.ref1.device.l2cap_channel_manager,
            connection,
            L2CAP_SIGNALING_CID,
            AVDTP_PSM,
            source_cid,
            spec.mtu,
        )
        connection_channels[source_cid] = channel

        logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>")
        await channel.connect()
        logger.info(f"<< 6. Channel established: {channel} >>")
        assert channel.state == ClassicChannel.State.OPEN

        # Initiate AVDTP with connected L2CAP signaling channel
        protocol = Protocol(channel)
        protocol.add_sink(sbc_codec_capabilities())
        logger.info("<< Test finished! >>")

    @avatar.asynchronous
    async def test_reconfigure_codec_success(self) -> None:
        """Basic A2DP connection and codec reconfiguration.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration
        """
        # Connect and pair RD2, then connect AVDTP to it.
        dut_ref2, _ = await self._pair_and_open_source(self.ref2)

        # Get current codec status
        await self._assert_current_codec(dut_ref2, 'mpeg_aac')

        # Set new codec
        assert await self._set_sbc_configuration(dut_ref2, sampling_frequency_hz=44100, bit_depth=16)

        # Get current codec status
        await self._assert_current_codec(dut_ref2, 'sbc')

    @avatar.asynchronous
    async def test_reconfigure_codec_error_unsupported(self) -> None:
        """Basic A2DP connection and codec reconfiguration failure.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration with unsupported parameters
        """
        # Connect and pair RD2, then connect AVDTP to it.
        dut_ref2, _ = await self._pair_and_open_source(self.ref2)

        # Get current codec status
        await self._assert_current_codec(dut_ref2, 'mpeg_aac')

        # Set new codec
        assert not await self._set_sbc_configuration(dut_ref2, sampling_frequency_hz=176400, bit_depth=24)

        # Get current codec status, assure it did not change
        await self._assert_current_codec(dut_ref2, 'mpeg_aac')

    @avatar.asynchronous
    async def test_reconfigure_codec_aac_error(self) -> None:
        """A2DP connection and failed codec reconfiguration, AAC must stay selected.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration with a 176400 Hz / 24 bit request
        4. Check the reconfiguration is reported as failed and AAC is still selected

        NOTE(review): these steps are the same as in
        test_reconfigure_codec_error_unsupported - confirm whether this test
        was meant to exercise a different failure.
        """
        # Connect and pair RD2, then connect AVDTP to it.
        dut_ref2, _ = await self._pair_and_open_source(self.ref2)

        # Get current codec status
        await self._assert_current_codec(dut_ref2, 'mpeg_aac')

        # Set new codec
        assert not await self._set_sbc_configuration(dut_ref2, sampling_frequency_hz=176400, bit_depth=24)

        # Get current codec status, assure it did not change
        await self._assert_current_codec(dut_ref2, 'mpeg_aac')

    @avatar.asynchronous
    async def test_avdt_handle_suspend_cfm_bad_state_error(self) -> None:
        """Test AVDTP handling of suspend confirmation BAD_STATE error.

        Test steps after DUT and RD1 connected and paired:
        1. Start streaming to RD1.
        2. Suspend streaming, RD1 will simulate failure response - AVDTP_BAD_STATE_ERROR.
        3. The DUT closes the AVDTP connection.
        """

        class TestAvdtProtocol(Protocol):
            """AVDTP protocol that answers suspend commands with a BAD_STATE reject."""

            def on_suspend_command(self, command):
                logger.info("<< Simulate suspend reject >>")
                # Reject on the first requested SEID that maps to a local
                # endpoint; falls through (returning None) if none does.
                for seid in command.acp_seids:
                    endpoint = self.get_local_endpoint_by_seid(seid)
                    if endpoint:
                        logger.info(f"<< Reject on endpoint: {endpoint} >>")
                        return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)

        class TestA2dpListener(Listener):
            """Listener that creates TestAvdtProtocol servers for new signaling channels."""

            @classmethod
            def for_device(cls, device: bumble.device.Device, version: Tuple[int, int] = (1, 3)) -> Listener:
                listener = cls(registrar=None, version=version)
                l2cap_server = device.create_l2cap_server(spec=ClassicChannelSpec(psm=AVDTP_PSM))
                l2cap_server.on('connection', listener.on_l2cap_connection)
                return listener

            def on_l2cap_connection(self, channel: ClassicChannel) -> None:
                logger.info(f"<<< incoming L2CAP connection: {channel}")

                if channel.connection.handle in self.servers:
                    # This is a channel for a stream endpoint
                    server = self.servers[channel.connection.handle]
                    server.on_l2cap_connection(channel)
                else:
                    # This is a new command/response channel
                    def on_channel_open():
                        logger.info('setting up new TestAvdtProtocol for the connection')
                        server = TestAvdtProtocol(channel, self.version)
                        self.set_server(channel.connection, server)
                        self.emit('connection', server)

                    def on_channel_close():
                        logger.info('removing TestAvdtProtocol for the connection')
                        self.remove_server(channel.connection)

                    channel.on('open', on_channel_open)
                    channel.on('close', on_channel_close)

        # Enable BAD_STATE handling
        self._override_bluetooth_flag(AVDTP_HANDLE_SUSPEND_CFM_BAD_STATE)

        # Drop the AVDTP L2CAP server registered during setup_test so the test
        # listener can register its own for the same PSM.
        self.ref1.device.l2cap_channel_manager.servers.pop(AVDTP_PSM)
        self.ref1.a2dp = TestA2dpListener.for_device(self.ref1.device)
        self.ref1.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            logger.info("<< RD1: On AVDTP Connection, adding sink >>")
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)

        # Connect and pair RD1, then connect AVDTP to it.
        _, dut_ref1_source = await self._pair_and_open_source(self.ref1)

        # Create a listener to wait for AVDTP close
        avdtp_future = asyncio.get_running_loop().create_future()

        def on_ref1_avdtp_close():
            logger.info("AVDTP Close received")
            # A repeated event must not complete the future twice; doing so
            # would raise InvalidStateError inside the event callback.
            if not avdtp_future.done():
                avdtp_future.set_result(None)

        self.ref1.a2dp_sink.on('close', on_ref1_avdtp_close)

        # Start streaming to RD1.
        await self.dut.a2dp.Start(source=dut_ref1_source)
        # NOTE(review): the signal generator is constructed but `start()` is
        # never called on it, so no audio data is sent here - confirm intent.
        audio = AudioSignal(self.dut.a2dp, dut_ref1_source, 0.8, 44100)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_STREAMING_STATE)

        # Suspend streaming, peer device will simulate failure response.
        # The stack should close the stream.
        await self.dut.a2dp.Suspend(source=dut_ref1_source)

        # Wait for AVDTP Close
        await asyncio.wait_for(avdtp_future, timeout=10.0)
651
if __name__ == '__main__':
    # When run directly as a script: log at DEBUG level and hand control to
    # the Mobly test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
655