# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A2DP test suite driven through Avatar/Pandora with Bumble reference sink devices."""

import asyncio
import avatar
import dataclasses
import itertools
import logging
import numpy as np

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, pandora
from avatar.pandora_server import AndroidPandoraServer
import bumble
from bumble.avctp import AVCTP_PSM
from bumble.a2dp import (
    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
    MPEG_2_AAC_LC_OBJECT_TYPE,
    A2DP_SBC_CODEC_TYPE,
    SBC_DUAL_CHANNEL_MODE,
    SBC_JOINT_STEREO_CHANNEL_MODE,
    SBC_LOUDNESS_ALLOCATION_METHOD,
    SBC_MONO_CHANNEL_MODE,
    SBC_SNR_ALLOCATION_METHOD,
    SBC_STEREO_CHANNEL_MODE,
    AacMediaCodecInformation,
    SbcMediaCodecInformation,
    make_audio_sink_service_sdp_records,
)
from bumble.avdtp import (AVDTP_AUDIO_MEDIA_TYPE, AVDTP_OPEN_STATE, AVDTP_PSM, AVDTP_STREAMING_STATE, AVDTP_IDLE_STATE,
                          AVDTP_CLOSING_STATE, Listener, MediaCodecCapabilities, Protocol, AVDTP_BAD_STATE_ERROR,
                          Suspend_Reject)
from bumble.l2cap import (ChannelManager, ClassicChannel, ClassicChannelSpec, L2CAP_Configure_Request,
                          L2CAP_Connection_Response, L2CAP_SIGNALING_CID)
from bumble.pairing import PairingDelegate
# `signals` is required by setup_class (signals.TestAbortClass).
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.a2dp_grpc_aio import A2DP
from pandora.a2dp_pb2 import PlaybackAudioRequest, Source, Configuration, STEREO
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2
from typing import Optional, Tuple

logger = logging.getLogger(__name__)

# Names of the Android Bluetooth feature flags overridden (via `device_config`) by some tests below.
AVRCP_CONNECT_A2DP_WITH_DELAY = 'com.android.bluetooth.flags.avrcp_connect_a2dp_with_delay'
AVDTP_HANDLE_SUSPEND_CFM_BAD_STATE = 'com.android.bluetooth.flags.avdt_handle_suspend_cfm_bad_state'


async def initiate_pairing(device, address) -> Connection:
    """Connect and pair a remote device.

    Args:
        device: Pandora device initiating the connection (its `aio.host` and
            `aio.security` services are used).
        address: address of the remote device to connect to.

    Returns:
        The Pandora connection token, after the link was secured at LEVEL2.
    """

    result = await device.aio.host.Connect(address=address)
    connection = result.connection
    assert connection

    bond = await device.aio.security.Secure(connection=connection, classic=LEVEL2)
    assert bond.success

    return connection


async def accept_pairing(device, address) -> Connection:
    """Accept connection and pairing from a remote device.

    Args:
        device: Pandora device waiting for the incoming connection.
        address: address of the remote device expected to connect.

    Returns:
        The Pandora connection token, after LEVEL2 security was reached.
    """

    result = await device.aio.host.WaitConnection(address=address)
    connection = result.connection
    assert connection

    bond = await device.aio.security.WaitSecurity(connection=connection, classic=LEVEL2)
    assert bond.success

    return connection


async def open_source(device, connection) -> Source:
    """Initiate AVDTP connection from Android device.

    Args:
        device: device exposing an `a2dp` service attribute (attached in
            `A2dpTest.setup_test`).
        connection: Pandora connection token of the already paired link.

    Returns:
        The A2DP source returned by `OpenSource`.
    """

    result = await device.a2dp.OpenSource(connection=connection)
    source = result.source
    assert source

    return source


def sbc_codec_capabilities() -> MediaCodecCapabilities:
    """Codec capabilities for the Bumble sink devices (SBC)."""

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_SBC_CODEC_TYPE,
        media_codec_information=SbcMediaCodecInformation.from_lists(
            sampling_frequencies=[48000, 44100, 32000, 16000],
            channel_modes=[
                SBC_MONO_CHANNEL_MODE,
                SBC_DUAL_CHANNEL_MODE,
                SBC_STEREO_CHANNEL_MODE,
                SBC_JOINT_STEREO_CHANNEL_MODE,
            ],
            block_lengths=[4, 8, 12, 16],
            subbands=[4, 8],
            allocation_methods=[
                SBC_LOUDNESS_ALLOCATION_METHOD,
                SBC_SNR_ALLOCATION_METHOD,
            ],
            minimum_bitpool_value=2,
            maximum_bitpool_value=53,
        ),
    )


def aac_codec_capabilities() -> MediaCodecCapabilities:
    """Codec capabilities for the Bumble sink devices (MPEG-2/4 AAC)."""

    return MediaCodecCapabilities(
        media_type=AVDTP_AUDIO_MEDIA_TYPE,
        media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
        media_codec_information=AacMediaCodecInformation.from_lists(
            object_types=[MPEG_2_AAC_LC_OBJECT_TYPE],
            sampling_frequencies=[48000, 44100],
            channels=[1, 2],
            vbr=1,
            bitrate=256000,
        ),
    )


class AudioSignal:
    """Audio signal generator and verifier."""

    # Frequency (Hz) and duration (seconds) of one generated sine chunk.
    SINE_FREQUENCY = 440
    SINE_DURATION = 0.1

    def __init__(self, a2dp: A2DP, source: Source, amplitude, fs):
        """Init AudioSignal class.

        Args:
            a2dp: A2DP profile interface.
            source: Source connection object to send the data to.
            amplitude: amplitude of the signal to generate.
            fs: sampling rate of the signal to generate.
        """
        self.a2dp = a2dp
        self.source = source
        self.amplitude = amplitude
        self.fs = fs
        # Background task created by start(); None until then.
        self.task = None

    def start(self):
        """Generates the audio signal and send it to the transport.

        Must be called from within a running event loop, as it schedules
        `_run` with `asyncio.create_task`.
        """
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        """Send the generated sine chunk repeatedly through `PlaybackAudio`."""
        sine = self._generate_sine(self.SINE_FREQUENCY, self.SINE_DURATION)

        # Interleaved audio: only the even-indexed samples carry the sine,
        # the odd-indexed ones (the other channel) stay at zero.
        stereo = np.zeros(sine.size * 2, dtype=sine.dtype)
        stereo[0::2] = sine

        # Send 4 seconds of audio.
        audio = itertools.repeat(stereo.tobytes(), int(4 / self.SINE_DURATION))

        for frame in audio:
            await self.a2dp.PlaybackAudio(PlaybackAudioRequest(data=frame, source=self.source))

    def _generate_sine(self, f, duration):
        """Return `duration` seconds of a sine at `f` Hz as little-endian signed 16-bit samples."""
        sine = self.amplitude * np.sin(2 * np.pi * np.arange(self.fs * duration) * (f / self.fs))
        s16le = (sine * 32767).astype('<i2')
        return s16le


class A2dpTest(base_test.BaseTestClass):  # type: ignore[misc]
    """A2DP test suite."""

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref1: PandoraDevice
    ref2: PandoraDevice

    @avatar.asynchronous
    async def setup_class(self) -> None:
        """Create the Pandora devices and configure the Bumble references for BR/EDR.

        Raises:
            signals.TestAbortClass: if either reference device is not a Bumble device.
        """
        self.devices = PandoraDevices(self)
        self.dut, self.ref1, self.ref2, *_ = self.devices

        if not isinstance(self.ref1, BumblePandoraDevice):
            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
        if not isinstance(self.ref2, BumblePandoraDevice):
            raise signals.TestAbortClass('Test require Bumble as reference device(s)')

        # Enable BR/EDR mode and SSP for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault('classic_smp_enabled', False)
                device.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT

    def teardown_class(self) -> None:
        """Stop all devices, if they were created."""
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        """Reset all devices and install AVDTP listeners/sinks on both references.

        RD1 exposes an SBC sink only; RD2 exposes an SBC sink and an AAC sink.
        """
        await asyncio.gather(self.dut.reset(), self.ref1.reset(), self.ref2.reset())

        self.dut.a2dp = A2DP(channel=self.dut.aio.channel)

        handle = 0x00010001
        self.ref1.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}
        self.ref2.device.sdp_service_records = {handle: make_audio_sink_service_sdp_records(handle)}

        self.ref1.a2dp = Listener.for_device(self.ref1.device)
        self.ref2.a2dp = Listener.for_device(self.ref2.device)
        self.ref1.a2dp_sink = None
        self.ref2.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())

        def on_ref2_avdtp_connection(server):
            # Both sinks are registered on the server; only the last one (AAC)
            # stays referenced by `a2dp_sink`.
            self.ref2.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref2.a2dp_sink = server.add_sink(aac_codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)
        self.ref2.a2dp.on('connection', on_ref2_avdtp_connection)

    @avatar.asynchronous
    async def test_connect_and_stream(self) -> None:
        """Basic A2DP connection and streaming test.
        This test wants to be a template to be reused for other tests.

        1. Pair and Connect RD1
        2. Start streaming
        3. Check AVDTP status on RD1
        4. Stop streaming
        5. Check AVDTP status on RD1
        """
        # Connect and pair RD1.
        dut_ref1, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Connect AVDTP to RD1.
        dut_ref1_source = await open_source(self.dut, dut_ref1)
        assert_is_not_none(self.ref1.a2dp_sink)
        assert_is_not_none(self.ref1.a2dp_sink.stream)
        assert_in(self.ref1.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Start streaming to RD1.
        await self.dut.a2dp.Start(source=dut_ref1_source)
        # NOTE(review): the signal is constructed but `audio.start()` is never
        # called, so no audio data is actually sent here - confirm intent.
        audio = AudioSignal(self.dut.a2dp, dut_ref1_source, 0.8, 44100)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_STREAMING_STATE)

        # Stop streaming to RD1.
        await self.dut.a2dp.Suspend(source=dut_ref1_source)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_OPEN_STATE)

    @avatar.asynchronous
    async def test_avdtp_autoconnect_when_only_avctp_connected(self) -> None:
        """Test AVDTP automatically connects if peer device connects only AVCTP.

        1. Pair and Connect RD1 -> DUT
        2. Connect AVCTP RD1 -> DUT
        3. Check AVDTP status on RD1
        """

        # Enable AVRCP connect A2DP delayed feature
        for server in self.devices._servers:
            if isinstance(server, AndroidPandoraServer):
                server.device.adb.shell(['device_config override bluetooth', AVRCP_CONNECT_A2DP_WITH_DELAY,
                                         'true'])  # type: ignore
                break

        # Connect and pair RD1.
        ref1_dut, dut_ref1 = await asyncio.gather(
            initiate_pairing(self.ref1, self.dut.address),
            accept_pairing(self.dut, self.ref1.address),
        )

        # Create a listener to wait for AVDTP connections
        avdtp_future = asyncio.get_running_loop().create_future()

        def on_avdtp_connection(server):
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())
            self.ref1.log.info(f'Sink: {self.ref1.a2dp_sink}')
            # Guard against a repeated event: completing a future twice raises
            # asyncio.InvalidStateError.
            if not avdtp_future.done():
                avdtp_future.set_result(None)

        self.ref1.a2dp.on('connection', on_avdtp_connection)

        # Retrieve Bumble connection object from Pandora connection token
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)

        # Open AVCTP L2CAP channel
        avctp = await connection.create_l2cap_channel(spec=ClassicChannelSpec(AVCTP_PSM))
        self.ref1.log.info(f'AVCTP: {avctp}')

        # Wait for AVDTP L2CAP channel
        await asyncio.wait_for(avdtp_future, timeout=10.0)

    @avatar.asynchronous
    async def test_avdt_signaling_channel_connection_collision(self) -> None:
        """Test AVDTP signaling channel connection collision.

        Test steps after DUT and RD1 connected and paired:
        1. RD1 connects DUT over AVDTP - first AVDTP signaling channel
        2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection
        3. DUT tries connecting RD1 - collision simulated
        4. RD1 rejects AVDTP signaling channel connection request from DUT
        5. RD1 proceeds with first AVDTP signaling channel configuration
        6. Channel established - collision avoided
        """

        @dataclasses.dataclass
        class L2capConfigurationRequest:
            # Holder for the DUT's configure request that is parked until the
            # DUT's own AVDTP connection request has been rejected.
            connection: Optional[Connection] = None
            cid: Optional[int] = None
            request: Optional[L2CAP_Configure_Request] = None

        # Module-level state shared between the two helper classes below:
        # a (possibly still empty) holder while the configuration is parked,
        # None once the parked request has been replayed.
        global pending_configuration_request
        pending_configuration_request = L2capConfigurationRequest()

        class TestChannelManager(ChannelManager):
            """Channel manager that rejects the DUT's AVDTP connection request once."""

            def __init__(
                self,
                device: BumblePandoraDevice,
            ) -> None:
                # NOTE(review): the caller passes `self.ref1.device`; the
                # annotation above may not match that object's type - confirm.
                super().__init__(
                    device.l2cap_channel_manager.extended_features,
                    device.l2cap_channel_manager.connectionless_mtu,
                )
                self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu)
                device.sdp_server.register(self)
                self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu)
                self.host = device.host

            def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None:
                global pending_configuration_request
                if (request.psm == AVDTP_PSM and pending_configuration_request is not None):
                    logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>")
                    self.send_control_frame(
                        connection, cid,
                        L2CAP_Connection_Response(
                            identifier=request.identifier,
                            destination_cid=0,
                            source_cid=request.source_cid,
                            result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
                            status=0x0000,
                        ))
                    logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>")
                    chan_connection = pending_configuration_request.connection
                    chan_cid = pending_configuration_request.cid
                    chan_request = pending_configuration_request.request
                    # Clear the holder first so that the replayed configure
                    # request is processed normally instead of being parked again.
                    pending_configuration_request = None
                    super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request)
                    return
                super().on_l2cap_connection_request(connection, cid, request)

        class TestClassicChannel(ClassicChannel):
            """Classic channel that parks the peer's configure request instead of answering it."""

            def on_connection_response(self, response):
                assert self.state == self.State.WAIT_CONNECT_RSP
                assert response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, f"Connection response: {response}"
                self.destination_cid = response.destination_cid
                self._change_state(self.State.WAIT_CONFIG)
                logger.info("<< 2. RD1 connected DUT, configuration postponed >>")

            def on_configure_request(self, request) -> None:
                global pending_configuration_request
                if (pending_configuration_request is not None):
                    logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>")
                    pending_configuration_request.connection = self.connection
                    pending_configuration_request.cid = self.source_cid
                    pending_configuration_request.request = request
                else:
                    super().on_configure_request(request)

        # Override L2CAP Channel Manager to control signaling
        self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device)

        # Connect and pair DUT -> RD1.
        dut_ref1, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Retrieve Bumble connection object from Pandora connection token
        connection = pandora.get_raw_connection(device=self.ref1, connection=ref1_dut)
        # Find a free CID for a new channel
        connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {})
        source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels)
        assert source_cid is not None, "source_cid is None"

        spec = ClassicChannelSpec(AVDTP_PSM)
        channel = TestClassicChannel(
            self.ref1.device.l2cap_channel_manager,
            connection,
            L2CAP_SIGNALING_CID,
            AVDTP_PSM,
            source_cid,
            spec.mtu,
        )
        connection_channels[source_cid] = channel

        logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>")
        await channel.connect()
        logger.info(f"<< 6. Channel established: {channel} >>")
        assert channel.state == ClassicChannel.State.OPEN

        # Initiate AVDTP with connected L2CAP signaling channel
        protocol = Protocol(channel)
        protocol.add_sink(sbc_codec_capabilities())
        logger.info("<< Test finished! >>")

    @avatar.asynchronous
    async def test_reconfigure_codec_success(self) -> None:
        """Basic A2DP connection and codec reconfiguration.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration
        """
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 44100
        new_configuration.parameters.bit_depth = 16
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert result.success

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('sbc')

    @avatar.asynchronous
    async def test_reconfigure_codec_error_unsupported(self) -> None:
        """Basic A2DP connection and codec reconfiguration failure.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Set SBC codec configuration with unsupported parameters
        """
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 176400
        new_configuration.parameters.bit_depth = 24
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert not result.success

        # Get current codec status, assure it did not change
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

    @avatar.asynchronous
    async def test_reconfigure_codec_aac_error(self) -> None:
        """A2DP connection and rejected codec reconfiguration on RD2.

        1. Pair and Connect RD2
        2. Check current codec configuration - should be AAC
        3. Request an SBC configuration at 176400 Hz / 24 bit and expect failure
        4. Check that the codec configuration is still AAC

        NOTE(review): the body is identical to
        test_reconfigure_codec_error_unsupported; the name suggests an AAC
        specific scenario was intended - confirm.
        """
        # Connect and pair RD2.
        dut_ref2, ref2_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref2.address),
            accept_pairing(self.ref2, self.dut.address),
        )

        # Connect AVDTP to RD2.
        dut_ref2_source = await open_source(self.dut, dut_ref2)
        assert_is_not_none(self.ref2.a2dp_sink)
        assert_is_not_none(self.ref2.a2dp_sink.stream)
        assert_in(self.ref2.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Get current codec status
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

        new_configuration = Configuration()
        new_configuration.id.sbc.SetInParent()
        new_configuration.parameters.sampling_frequency_hz = 176400
        new_configuration.parameters.bit_depth = 24
        new_configuration.parameters.channel_mode = STEREO

        # Set new codec
        logger.info(f"Switching to codec: {new_configuration}")
        result = await self.dut.a2dp.SetConfiguration(connection=dut_ref2, configuration=new_configuration)
        assert not result.success

        # Get current codec status, assure it did not change
        configurationResponse = await self.dut.a2dp.GetConfiguration(connection=dut_ref2)
        logger.info(f"Current codec configuration: {configurationResponse.configuration}")
        assert configurationResponse.configuration.id.HasField('mpeg_aac')

    @avatar.asynchronous
    async def test_avdt_handle_suspend_cfm_bad_state_error(self) -> None:
        """Test AVDTP handling of suspend confirmation BAD_STATE error.

        Test steps after DUT and RD1 connected and paired:
        1. Start streaming to RD1.
        2. Suspend streaming, RD1 will simulate failure response - AVDTP_BAD_STATE_ERROR.
        3. The DUT closes the AVDTP connection.
        """

        class TestAvdtProtocol(Protocol):
            """AVDTP protocol that rejects suspend commands with AVDTP_BAD_STATE_ERROR."""

            def on_suspend_command(self, command):
                logger.info("<< Simulate suspend reject >>")
                # Reject on the first SEID that maps to a local endpoint;
                # falls through (returns None) when none of them does.
                for seid in command.acp_seids:
                    endpoint = self.get_local_endpoint_by_seid(seid)
                    if endpoint:
                        logger.info(f"<< Reject on endpoint: {endpoint} >>")
                        return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)

        class TestA2dpListener(Listener):
            """AVDTP listener that instantiates TestAvdtProtocol for new signaling channels."""

            @classmethod
            def for_device(cls, device: bumble.device.Device, version: Tuple[int, int] = (1, 3)) -> Listener:
                listener = TestA2dpListener(registrar=None, version=version)
                l2cap_server = device.create_l2cap_server(spec=ClassicChannelSpec(psm=AVDTP_PSM))
                l2cap_server.on('connection', listener.on_l2cap_connection)
                return listener

            def on_l2cap_connection(self, channel: ClassicChannel) -> None:
                logger.info(f"<<< incoming L2CAP connection: {channel}")

                if channel.connection.handle in self.servers:
                    # This is a channel for a stream endpoint
                    server = self.servers[channel.connection.handle]
                    server.on_l2cap_connection(channel)
                else:
                    # This is a new command/response channel
                    def on_channel_open():
                        logger.info('setting up new TestAvdtProtocol for the connection')
                        server = TestAvdtProtocol(channel, self.version)
                        self.set_server(channel.connection, server)
                        self.emit('connection', server)

                    def on_channel_close():
                        logger.info('removing TestAvdtProtocol for the connection')
                        self.remove_server(channel.connection)

                    channel.on('open', on_channel_open)
                    channel.on('close', on_channel_close)

        # Enable BAD_STATE handling
        for server in self.devices._servers:
            if isinstance(server, AndroidPandoraServer):
                server.device.adb.shell(
                    ['device_config override bluetooth', AVDTP_HANDLE_SUSPEND_CFM_BAD_STATE, 'true'])  # type: ignore
                break

        # Drop the AVDTP L2CAP server entry (presumably the one created by
        # Listener.for_device in setup_test) before installing the test listener.
        self.ref1.device.l2cap_channel_manager.servers.pop(AVDTP_PSM)
        self.ref1.a2dp = TestA2dpListener.for_device(self.ref1.device)
        self.ref1.a2dp_sink = None

        def on_ref1_avdtp_connection(server):
            logger.info("<< RD1: On AVDTP Connection, adding sink >>")
            self.ref1.a2dp_sink = server.add_sink(sbc_codec_capabilities())

        self.ref1.a2dp.on('connection', on_ref1_avdtp_connection)

        # Connect and pair RD1.
        dut_ref1, ref1_dut = await asyncio.gather(
            initiate_pairing(self.dut, self.ref1.address),
            accept_pairing(self.ref1, self.dut.address),
        )

        # Connect AVDTP to RD1.
        dut_ref1_source = await open_source(self.dut, dut_ref1)
        assert_is_not_none(self.ref1.a2dp_sink)
        assert_is_not_none(self.ref1.a2dp_sink.stream)
        assert_in(self.ref1.a2dp_sink.stream.state, [AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE])

        # Create a listener to wait for AVDTP close
        avdtp_future = asyncio.get_running_loop().create_future()

        def on_ref1_avdtp_close():
            logger.info("AVDTP Close received")
            # Guard against a repeated event: completing a future twice raises
            # asyncio.InvalidStateError.
            if not avdtp_future.done():
                avdtp_future.set_result(None)

        self.ref1.a2dp_sink.on('close', on_ref1_avdtp_close)

        # Start streaming to RD1.
        await self.dut.a2dp.Start(source=dut_ref1_source)
        # NOTE(review): the signal is constructed but `audio.start()` is never
        # called, so no audio data is actually sent here - confirm intent.
        audio = AudioSignal(self.dut.a2dp, dut_ref1_source, 0.8, 44100)
        assert_equal(self.ref1.a2dp_sink.stream.state, AVDTP_STREAMING_STATE)

        # Suspend streaming, peer device will simulate failure response.
        # The stack should close the stream.
        await self.dut.a2dp.Suspend(source=dut_ref1_source)

        # Wait for AVDTP Close
        await asyncio.wait_for(avdtp_future, timeout=10.0)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore