1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import struct 21import time 22import logging 23import enum 24import warnings 25from pyee import EventEmitter 26from typing import ( 27 Any, 28 Awaitable, 29 Dict, 30 Type, 31 Tuple, 32 Optional, 33 Callable, 34 List, 35 AsyncGenerator, 36 Iterable, 37 Union, 38 SupportsBytes, 39 cast, 40) 41 42from .core import ( 43 BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE, 44 InvalidStateError, 45 ProtocolError, 46 InvalidArgumentError, 47 name_or_number, 48) 49from .a2dp import ( 50 A2DP_CODEC_TYPE_NAMES, 51 A2DP_MPEG_2_4_AAC_CODEC_TYPE, 52 A2DP_NON_A2DP_CODEC_TYPE, 53 A2DP_SBC_CODEC_TYPE, 54 AacMediaCodecInformation, 55 SbcMediaCodecInformation, 56 VendorSpecificMediaCodecInformation, 57) 58from . import sdp, device, l2cap 59from .colors import color 60 61# ----------------------------------------------------------------------------- 62# Logging 63# ----------------------------------------------------------------------------- 64logger = logging.getLogger(__name__) 65 66 67# ----------------------------------------------------------------------------- 68# Constants 69# ----------------------------------------------------------------------------- 70# fmt: off 71# pylint: disable=line-too-long 72 73AVDTP_PSM = 0x0019 74 75AVDTP_DEFAULT_RTX_SIG_TIMER = 5 # Seconds 76 77# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set) 78AVDTP_DISCOVER = 0x01 79AVDTP_GET_CAPABILITIES = 0x02 80AVDTP_SET_CONFIGURATION = 0x03 81AVDTP_GET_CONFIGURATION = 0x04 82AVDTP_RECONFIGURE = 0x05 83AVDTP_OPEN = 0x06 84AVDTP_START = 0x07 85AVDTP_CLOSE = 0x08 86AVDTP_SUSPEND = 0x09 87AVDTP_ABORT = 0x0A 88AVDTP_SECURITY_CONTROL = 0x0B 89AVDTP_GET_ALL_CAPABILITIES = 0x0C 90AVDTP_DELAYREPORT = 0x0D 91 92AVDTP_SIGNAL_NAMES = { 93 AVDTP_DISCOVER: 'AVDTP_DISCOVER', 94 AVDTP_GET_CAPABILITIES: 'AVDTP_GET_CAPABILITIES', 95 AVDTP_SET_CONFIGURATION: 'AVDTP_SET_CONFIGURATION', 96 AVDTP_GET_CONFIGURATION: 'AVDTP_GET_CONFIGURATION', 97 AVDTP_RECONFIGURE: 'AVDTP_RECONFIGURE', 98 AVDTP_OPEN: 'AVDTP_OPEN', 99 AVDTP_START: 'AVDTP_START', 100 AVDTP_CLOSE: 'AVDTP_CLOSE', 101 AVDTP_SUSPEND: 'AVDTP_SUSPEND', 102 AVDTP_ABORT: 'AVDTP_ABORT', 103 AVDTP_SECURITY_CONTROL: 'AVDTP_SECURITY_CONTROL', 104 AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES', 105 AVDTP_DELAYREPORT: 'AVDTP_DELAYREPORT' 106} 107 108AVDTP_SIGNAL_IDENTIFIERS = { 109 'AVDTP_DISCOVER': AVDTP_DISCOVER, 110 'AVDTP_GET_CAPABILITIES': AVDTP_GET_CAPABILITIES, 111 'AVDTP_SET_CONFIGURATION': AVDTP_SET_CONFIGURATION, 112 'AVDTP_GET_CONFIGURATION': AVDTP_GET_CONFIGURATION, 113 'AVDTP_RECONFIGURE': AVDTP_RECONFIGURE, 114 'AVDTP_OPEN': AVDTP_OPEN, 115 'AVDTP_START': AVDTP_START, 116 'AVDTP_CLOSE': AVDTP_CLOSE, 117 'AVDTP_SUSPEND': AVDTP_SUSPEND, 118 'AVDTP_ABORT': AVDTP_ABORT, 119 'AVDTP_SECURITY_CONTROL': AVDTP_SECURITY_CONTROL, 120 'AVDTP_GET_ALL_CAPABILITIES': AVDTP_GET_ALL_CAPABILITIES, 121 'AVDTP_DELAYREPORT': AVDTP_DELAYREPORT 122} 123 124# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables) 125AVDTP_BAD_HEADER_FORMAT_ERROR = 0x01 126AVDTP_BAD_LENGTH_ERROR = 0x11 127AVDTP_BAD_ACP_SEID_ERROR = 0x12 128AVDTP_SEP_IN_USE_ERROR = 0x13 129AVDTP_SEP_NOT_IN_USE_ERROR = 0x14 130AVDTP_BAD_SERV_CATEGORY_ERROR = 0x17 131AVDTP_BAD_PAYLOAD_FORMAT_ERROR = 0x18 132AVDTP_NOT_SUPPORTED_COMMAND_ERROR = 0x19 133AVDTP_INVALID_CAPABILITIES_ERROR = 0x1A 134AVDTP_BAD_RECOVERY_TYPE_ERROR = 0x22 135AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23 136AVDTP_BAD_RECOVERY_FORMAT_ERROR = 0x25 137AVDTP_BAD_ROHC_FORMAT_ERROR = 0x26 138AVDTP_BAD_CP_FORMAT_ERROR = 0x27 139AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR = 0x28 140AVDTP_UNSUPPORTED_CONFIGURATION_ERROR = 0x29 141AVDTP_BAD_STATE_ERROR = 0x31 142 143AVDTP_ERROR_NAMES = { 144 AVDTP_BAD_HEADER_FORMAT_ERROR: 'AVDTP_BAD_HEADER_FORMAT_ERROR', 145 AVDTP_BAD_LENGTH_ERROR: 'AVDTP_BAD_LENGTH_ERROR', 146 AVDTP_BAD_ACP_SEID_ERROR: 'AVDTP_BAD_ACP_SEID_ERROR', 147 AVDTP_SEP_IN_USE_ERROR: 'AVDTP_SEP_IN_USE_ERROR', 148 AVDTP_SEP_NOT_IN_USE_ERROR: 'AVDTP_SEP_NOT_IN_USE_ERROR', 149 AVDTP_BAD_SERV_CATEGORY_ERROR: 'AVDTP_BAD_SERV_CATEGORY_ERROR', 150 AVDTP_BAD_PAYLOAD_FORMAT_ERROR: 'AVDTP_BAD_PAYLOAD_FORMAT_ERROR', 151 AVDTP_NOT_SUPPORTED_COMMAND_ERROR: 'AVDTP_NOT_SUPPORTED_COMMAND_ERROR', 152 AVDTP_INVALID_CAPABILITIES_ERROR: 'AVDTP_INVALID_CAPABILITIES_ERROR', 153 AVDTP_BAD_RECOVERY_TYPE_ERROR: 'AVDTP_BAD_RECOVERY_TYPE_ERROR', 154 AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR', 155 AVDTP_BAD_RECOVERY_FORMAT_ERROR: 'AVDTP_BAD_RECOVERY_FORMAT_ERROR', 156 AVDTP_BAD_ROHC_FORMAT_ERROR: 'AVDTP_BAD_ROHC_FORMAT_ERROR', 157 AVDTP_BAD_CP_FORMAT_ERROR: 'AVDTP_BAD_CP_FORMAT_ERROR', 158 AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR: 'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR', 159 AVDTP_UNSUPPORTED_CONFIGURATION_ERROR: 'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR', 160 AVDTP_BAD_STATE_ERROR: 'AVDTP_BAD_STATE_ERROR' 161} 162 163AVDTP_AUDIO_MEDIA_TYPE = 0x00 164AVDTP_VIDEO_MEDIA_TYPE = 0x01 165AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02 166 167AVDTP_MEDIA_TYPE_NAMES = { 168 AVDTP_AUDIO_MEDIA_TYPE: 'AVDTP_AUDIO_MEDIA_TYPE', 169 AVDTP_VIDEO_MEDIA_TYPE: 'AVDTP_VIDEO_MEDIA_TYPE', 170 AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE' 171} 172 173# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP)) 174AVDTP_TSEP_SRC = 0x00 175AVDTP_TSEP_SNK = 0x01 176 177AVDTP_TSEP_NAMES = { 178 AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC', 179 AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK' 180} 181 182# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values) 183AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY = 0x01 184AVDTP_REPORTING_SERVICE_CATEGORY = 0x02 185AVDTP_RECOVERY_SERVICE_CATEGORY = 0x03 186AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04 187AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05 188AVDTP_MULTIPLEXING_SERVICE_CATEGORY = 0x06 189AVDTP_MEDIA_CODEC_SERVICE_CATEGORY = 0x07 190AVDTP_DELAY_REPORTING_SERVICE_CATEGORY = 0x08 191 192AVDTP_SERVICE_CATEGORY_NAMES = { 193 AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY: 'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY', 194 AVDTP_REPORTING_SERVICE_CATEGORY: 'AVDTP_REPORTING_SERVICE_CATEGORY', 195 AVDTP_RECOVERY_SERVICE_CATEGORY: 'AVDTP_RECOVERY_SERVICE_CATEGORY', 196 AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY', 197 AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY', 198 AVDTP_MULTIPLEXING_SERVICE_CATEGORY: 'AVDTP_MULTIPLEXING_SERVICE_CATEGORY', 199 AVDTP_MEDIA_CODEC_SERVICE_CATEGORY: 'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY', 200 AVDTP_DELAY_REPORTING_SERVICE_CATEGORY: 'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY' 201} 202 203# States (AVDTP spec - 9.1 State Definitions) 204AVDTP_IDLE_STATE = 0x00 205AVDTP_CONFIGURED_STATE = 0x01 206AVDTP_OPEN_STATE = 0x02 207AVDTP_STREAMING_STATE = 0x03 208AVDTP_CLOSING_STATE = 0x04 209AVDTP_ABORTING_STATE = 0x05 210 211AVDTP_STATE_NAMES = { 212 AVDTP_IDLE_STATE: 'AVDTP_IDLE_STATE', 213 AVDTP_CONFIGURED_STATE: 'AVDTP_CONFIGURED_STATE', 214 AVDTP_OPEN_STATE: 'AVDTP_OPEN_STATE', 215 AVDTP_STREAMING_STATE: 'AVDTP_STREAMING_STATE', 216 AVDTP_CLOSING_STATE: 'AVDTP_CLOSING_STATE', 217 AVDTP_ABORTING_STATE: 'AVDTP_ABORTING_STATE' 218} 219 220# fmt: on 221# pylint: enable=line-too-long 222# pylint: disable=invalid-name 223 224 225# ----------------------------------------------------------------------------- 226async def find_avdtp_service_with_sdp_client( 227 sdp_client: sdp.Client, 228) -> Optional[Tuple[int, int]]: 229 ''' 230 Find an AVDTP service, using a connected SDP client, and return its version, 231 or None if none is found 232 ''' 233 234 # Search for services with an Audio Sink service class 235 search_result = await sdp_client.search_attributes( 236 [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE], 237 [sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID], 238 ) 239 for attribute_list in search_result: 240 profile_descriptor_list = sdp.ServiceAttribute.find_attribute_in_list( 241 attribute_list, sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID 242 ) 243 if profile_descriptor_list: 244 for profile_descriptor in profile_descriptor_list.value: 245 if ( 246 profile_descriptor.type == sdp.DataElement.SEQUENCE 247 and len(profile_descriptor.value) >= 2 248 ): 249 avdtp_version_major = profile_descriptor.value[1].value >> 8 250 avdtp_version_minor = profile_descriptor.value[1].value & 0xFF 251 return (avdtp_version_major, avdtp_version_minor) 252 return None 253 254 255# ----------------------------------------------------------------------------- 256async def find_avdtp_service_with_connection( 257 connection: device.Connection, 258) -> Optional[Tuple[int, int]]: 259 ''' 260 Find an AVDTP service, for a connection, and return its version, 261 or None if none is found 262 ''' 263 264 sdp_client = sdp.Client(connection) 265 await sdp_client.connect() 266 service_version = await find_avdtp_service_with_sdp_client(sdp_client) 267 await sdp_client.disconnect() 268 269 return service_version 270 271 272# ----------------------------------------------------------------------------- 273class RealtimeClock: 274 def now(self) -> float: 275 return time.time() 276 277 async def sleep(self, duration: float) -> None: 278 await asyncio.sleep(duration) 279 280 281# ----------------------------------------------------------------------------- 282class MediaPacket: 283 @staticmethod 284 def from_bytes(data: bytes) -> MediaPacket: 285 version = (data[0] >> 6) & 0x03 286 padding = (data[0] >> 5) & 0x01 287 extension = (data[0] >> 4) & 0x01 288 csrc_count = data[0] & 0x0F 289 marker = (data[1] >> 7) & 0x01 290 payload_type = data[1] & 0x7F 291 sequence_number = struct.unpack_from('>H', data, 2)[0] 292 timestamp = struct.unpack_from('>I', data, 4)[0] 293 ssrc = struct.unpack_from('>I', data, 8)[0] 294 csrc_list = [ 295 struct.unpack_from('>I', data, 12 + i)[0] for i in range(csrc_count) 296 ] 297 payload = data[12 + csrc_count * 4 :] 298 299 return MediaPacket( 300 version, 301 padding, 302 extension, 303 marker, 304 sequence_number, 305 timestamp, 306 ssrc, 307 csrc_list, 308 payload_type, 309 payload, 310 ) 311 312 def __init__( 313 self, 314 version: int, 315 padding: int, 316 extension: int, 317 marker: int, 318 sequence_number: int, 319 timestamp: int, 320 ssrc: int, 321 csrc_list: List[int], 322 payload_type: int, 323 payload: bytes, 324 ) -> None: 325 self.version = version 326 self.padding = padding 327 self.extension = extension 328 self.marker = marker 329 self.sequence_number = sequence_number & 0xFFFF 330 self.timestamp = timestamp & 0xFFFFFFFF 331 self.ssrc = ssrc 332 self.csrc_list = csrc_list 333 self.payload_type = payload_type 334 self.payload = payload 335 336 def __bytes__(self) -> bytes: 337 header = bytes( 338 [ 339 self.version << 6 340 | self.padding << 5 341 | self.extension << 4 342 | len(self.csrc_list), 343 self.marker << 7 | self.payload_type, 344 ] 345 ) + struct.pack( 346 '>HII', 347 self.sequence_number, 348 self.timestamp, 349 self.ssrc, 350 ) 351 for csrc in self.csrc_list: 352 header += struct.pack('>I', csrc) 353 return header + self.payload 354 355 def __str__(self) -> str: 356 return ( 357 f'RTP(v={self.version},' 358 f'p={self.padding},' 359 f'x={self.extension},' 360 f'm={self.marker},' 361 f'pt={self.payload_type},' 362 f'sn={self.sequence_number},' 363 f'ts={self.timestamp},' 364 f'ssrc={self.ssrc},' 365 f'csrcs={self.csrc_list},' 366 f'payload_size={len(self.payload)})' 367 ) 368 369 370# ----------------------------------------------------------------------------- 371class MediaPacketPump: 372 pump_task: Optional[asyncio.Task] 373 374 def __init__( 375 self, packets: AsyncGenerator, clock: RealtimeClock = RealtimeClock() 376 ) -> None: 377 self.packets = packets 378 self.clock = clock 379 self.pump_task = None 380 381 async def start(self, rtp_channel: l2cap.ClassicChannel) -> None: 382 async def pump_packets(): 383 start_time = 0 384 start_timestamp = 0 385 386 try: 387 logger.debug('pump starting') 388 async for packet in self.packets: 389 # Capture the timestamp of the first packet 390 if start_time == 0: 391 start_time = self.clock.now() 392 start_timestamp = packet.timestamp_seconds 393 394 # Wait until we can send 395 when = start_time + (packet.timestamp_seconds - start_timestamp) 396 now = self.clock.now() 397 if when > now: 398 delay = when - now 399 logger.debug(f'waiting for {delay}') 400 await self.clock.sleep(delay) 401 402 # Emit 403 rtp_channel.send_pdu(bytes(packet)) 404 logger.debug( 405 f'{color(">>> sending RTP packet:", "green")} {packet}' 406 ) 407 except asyncio.exceptions.CancelledError: 408 logger.debug('pump canceled') 409 410 # Pump packets 411 self.pump_task = asyncio.create_task(pump_packets()) 412 413 async def stop(self) -> None: 414 # Stop the pump 415 if self.pump_task: 416 self.pump_task.cancel() 417 await self.pump_task 418 self.pump_task = None 419 420 421# ----------------------------------------------------------------------------- 422class MessageAssembler: 423 message: Optional[bytes] 424 425 def __init__(self, callback: Callable[[int, Message], Any]) -> None: 426 self.callback = callback 427 self.reset() 428 429 def reset(self) -> None: 430 self.transaction_label = 0 431 self.message = None 432 self.message_type = Message.MessageType.COMMAND 433 self.signal_identifier = 0 434 self.number_of_signal_packets = 0 435 self.packet_count = 0 436 437 def on_pdu(self, pdu: bytes) -> None: 438 self.packet_count += 1 439 440 transaction_label = pdu[0] >> 4 441 packet_type = Protocol.PacketType((pdu[0] >> 2) & 3) 442 message_type = Message.MessageType(pdu[0] & 3) 443 444 logger.debug( 445 f'transaction_label={transaction_label}, ' 446 f'packet_type={packet_type.name}, ' 447 f'message_type={message_type.name}' 448 ) 449 if packet_type in ( 450 Protocol.PacketType.SINGLE_PACKET, 451 Protocol.PacketType.START_PACKET, 452 ): 453 if self.message is not None: 454 # The previous message has not been terminated 455 logger.warning( 456 'received a start or single packet when expecting an end or ' 457 'continuation' 458 ) 459 self.reset() 460 461 self.transaction_label = transaction_label 462 self.signal_identifier = pdu[1] & 0x3F 463 self.message_type = message_type 464 465 if packet_type == Protocol.PacketType.SINGLE_PACKET: 466 self.message = pdu[2:] 467 self.on_message_complete() 468 else: 469 self.number_of_signal_packets = pdu[2] 470 self.message = pdu[3:] 471 elif packet_type in ( 472 Protocol.PacketType.CONTINUE_PACKET, 473 Protocol.PacketType.END_PACKET, 474 ): 475 if self.packet_count == 0: 476 logger.warning('unexpected continuation') 477 return 478 479 if transaction_label != self.transaction_label: 480 logger.warning( 481 f'transaction label mismatch: expected {self.transaction_label}, ' 482 f'received {transaction_label}' 483 ) 484 return 485 486 if message_type != self.message_type: 487 logger.warning( 488 f'message type mismatch: expected {self.message_type}, ' 489 f'received {message_type}' 490 ) 491 return 492 493 self.message = (self.message or b'') + pdu[1:] 494 495 if packet_type == Protocol.PacketType.END_PACKET: 496 if self.packet_count != self.number_of_signal_packets: 497 logger.warning( 498 'incomplete fragmented message: ' 499 f'expected {self.number_of_signal_packets} packets, ' 500 f'received {self.packet_count}' 501 ) 502 self.reset() 503 return 504 505 self.on_message_complete() 506 else: 507 if self.packet_count > self.number_of_signal_packets: 508 logger.warning( 509 'too many packets: ' 510 f'expected {self.number_of_signal_packets}, ' 511 f'received {self.packet_count}' 512 ) 513 self.reset() 514 return 515 516 def on_message_complete(self) -> None: 517 message = Message.create( 518 self.signal_identifier, self.message_type, self.message or b'' 519 ) 520 try: 521 self.callback(self.transaction_label, message) 522 except Exception as error: 523 logger.exception(color(f'!!! exception in callback: {error}', 'red')) 524 525 self.reset() 526 527 528# ----------------------------------------------------------------------------- 529class ServiceCapabilities: 530 @staticmethod 531 def create( 532 service_category: int, service_capabilities_bytes: bytes 533 ) -> ServiceCapabilities: 534 # Select the appropriate subclass 535 cls: Type[ServiceCapabilities] 536 if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY: 537 cls = MediaCodecCapabilities 538 else: 539 cls = ServiceCapabilities 540 541 # Create an instance and initialize it 542 instance = cls.__new__(cls) 543 instance.service_category = service_category 544 instance.service_capabilities_bytes = service_capabilities_bytes 545 instance.init_from_bytes() 546 547 return instance 548 549 @staticmethod 550 def parse_capabilities(payload: bytes) -> List[ServiceCapabilities]: 551 capabilities = [] 552 while payload: 553 service_category = payload[0] 554 length_of_service_capabilities = payload[1] 555 service_capabilities_bytes = payload[2 : 2 + length_of_service_capabilities] 556 capabilities.append( 557 ServiceCapabilities.create(service_category, service_capabilities_bytes) 558 ) 559 560 payload = payload[2 + length_of_service_capabilities :] 561 562 return capabilities 563 564 @staticmethod 565 def serialize_capabilities(capabilities: Iterable[ServiceCapabilities]) -> bytes: 566 serialized = b'' 567 for item in capabilities: 568 serialized += ( 569 bytes([item.service_category, len(item.service_capabilities_bytes)]) 570 + item.service_capabilities_bytes 571 ) 572 return serialized 573 574 def init_from_bytes(self) -> None: 575 pass 576 577 def __init__( 578 self, service_category: int, service_capabilities_bytes: bytes = b'' 579 ) -> None: 580 self.service_category = service_category 581 self.service_capabilities_bytes = service_capabilities_bytes 582 583 def to_string(self, details: Optional[List[str]] = None) -> str: 584 attributes = ','.join( 585 [name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)] 586 + (details or []) 587 ) 588 return f'ServiceCapabilities({attributes})' 589 590 def __str__(self) -> str: 591 if self.service_capabilities_bytes: 592 details = [self.service_capabilities_bytes.hex()] 593 else: 594 details = [] 595 return self.to_string(details) 596 597 598# ----------------------------------------------------------------------------- 599class MediaCodecCapabilities(ServiceCapabilities): 600 media_codec_information: Union[bytes, SupportsBytes] 601 media_type: int 602 media_codec_type: int 603 604 def init_from_bytes(self) -> None: 605 self.media_type = self.service_capabilities_bytes[0] 606 self.media_codec_type = self.service_capabilities_bytes[1] 607 self.media_codec_information = self.service_capabilities_bytes[2:] 608 609 if self.media_codec_type == A2DP_SBC_CODEC_TYPE: 610 self.media_codec_information = SbcMediaCodecInformation.from_bytes( 611 self.media_codec_information 612 ) 613 elif self.media_codec_type == A2DP_MPEG_2_4_AAC_CODEC_TYPE: 614 self.media_codec_information = AacMediaCodecInformation.from_bytes( 615 self.media_codec_information 616 ) 617 elif self.media_codec_type == A2DP_NON_A2DP_CODEC_TYPE: 618 self.media_codec_information = ( 619 VendorSpecificMediaCodecInformation.from_bytes( 620 self.media_codec_information 621 ) 622 ) 623 624 def __init__( 625 self, 626 media_type: int, 627 media_codec_type: int, 628 media_codec_information: Union[bytes, SupportsBytes], 629 ) -> None: 630 super().__init__( 631 AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, 632 bytes([media_type, media_codec_type]) + bytes(media_codec_information), 633 ) 634 self.media_type = media_type 635 self.media_codec_type = media_codec_type 636 self.media_codec_information = media_codec_information 637 638 def __str__(self) -> str: 639 codec_info = ( 640 self.media_codec_information.hex() 641 if isinstance(self.media_codec_information, bytes) 642 else str(self.media_codec_information) 643 ) 644 645 details = [ 646 f'media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}', 647 f'codec={name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)}', 648 f'codec_info={codec_info}', 649 ] 650 return self.to_string(details) 651 652 653# ----------------------------------------------------------------------------- 654class EndPointInfo: 655 @staticmethod 656 def from_bytes(payload: bytes) -> EndPointInfo: 657 return EndPointInfo( 658 payload[0] >> 2, payload[0] >> 1 & 1, payload[1] >> 4, payload[1] >> 3 & 1 659 ) 660 661 def __bytes__(self) -> bytes: 662 return bytes( 663 [self.seid << 2 | self.in_use << 1, self.media_type << 4 | self.tsep << 3] 664 ) 665 666 def __init__(self, seid: int, in_use: int, media_type: int, tsep: int) -> None: 667 self.seid = seid 668 self.in_use = in_use 669 self.media_type = media_type 670 self.tsep = tsep 671 672 673# ----------------------------------------------------------------------------- 674class Message: # pylint:disable=attribute-defined-outside-init 675 class MessageType(enum.IntEnum): 676 COMMAND = 0 677 GENERAL_REJECT = 1 678 RESPONSE_ACCEPT = 2 679 RESPONSE_REJECT = 3 680 681 # Subclasses, by signal identifier and message type 682 subclasses: Dict[int, Dict[int, Type[Message]]] = {} 683 message_type: MessageType 684 signal_identifier: int 685 686 @staticmethod 687 def subclass(subclass): 688 # Infer the signal identifier and message subtype from the class name 689 name = subclass.__name__ 690 if name == 'General_Reject': 691 subclass.signal_identifier = 0 692 signal_identifier_str = None 693 message_type = Message.MessageType.COMMAND 694 elif name.endswith('_Command'): 695 signal_identifier_str = name[:-8] 696 message_type = Message.MessageType.COMMAND 697 elif name.endswith('_Response'): 698 signal_identifier_str = name[:-9] 699 message_type = Message.MessageType.RESPONSE_ACCEPT 700 elif name.endswith('_Reject'): 701 signal_identifier_str = name[:-7] 702 message_type = Message.MessageType.RESPONSE_REJECT 703 else: 704 raise InvalidArgumentError('invalid class name') 705 706 subclass.message_type = message_type 707 708 if signal_identifier_str is not None: 709 for name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items(): 710 if name.lower().endswith(signal_identifier_str.lower()): 711 subclass.signal_identifier = signal_identifier 712 break 713 714 # Register the subclass 715 Message.subclasses.setdefault(subclass.signal_identifier, {})[ 716 subclass.message_type 717 ] = subclass 718 719 return subclass 720 721 # Factory method to create a subclass based on the signal identifier and message 722 # type 723 @staticmethod 724 def create( 725 signal_identifier: int, message_type: MessageType, payload: bytes 726 ) -> Message: 727 # Look for a registered subclass 728 subclasses = Message.subclasses.get(signal_identifier) 729 if subclasses: 730 subclass = subclasses.get(message_type) 731 if subclass: 732 instance = subclass.__new__(subclass) 733 instance.payload = payload 734 instance.init_from_payload() 735 return instance 736 737 # Instantiate the appropriate class based on the message type 738 if message_type == Message.MessageType.RESPONSE_REJECT: 739 # Assume a simple reject message 740 instance = Simple_Reject(payload) 741 instance.init_from_payload() 742 else: 743 instance = Message(payload) 744 instance.signal_identifier = signal_identifier 745 instance.message_type = message_type 746 return instance 747 748 def init_from_payload(self) -> None: 749 pass 750 751 def __init__(self, payload: bytes = b'') -> None: 752 self.payload = payload 753 754 def to_string(self, details: Union[str, Iterable[str]]) -> str: 755 base = color( 756 f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_' 757 f'{self.message_type.name}', 758 'yellow', 759 ) 760 761 if details: 762 if isinstance(details, str): 763 return f'{base}: {details}' 764 765 return ( 766 base 767 + ':\n' 768 + '\n'.join([' ' + color(detail, 'cyan') for detail in details]) 769 ) 770 771 return base 772 773 def __str__(self) -> str: 774 return self.to_string(self.payload.hex()) 775 776 777# ----------------------------------------------------------------------------- 778class Simple_Command(Message): 779 ''' 780 Command message with just one seid 781 ''' 782 783 def init_from_payload(self): 784 self.acp_seid = self.payload[0] >> 2 785 786 def __init__(self, seid): 787 super().__init__(payload=bytes([seid << 2])) 788 self.acp_seid = seid 789 790 def __str__(self) -> str: 791 return self.to_string([f'ACP SEID: {self.acp_seid}']) 792 793 794# ----------------------------------------------------------------------------- 795class Simple_Reject(Message): 796 ''' 797 Reject messages with just an error code 798 ''' 799 800 def init_from_payload(self): 801 self.error_code = self.payload[0] 802 803 def __init__(self, error_code): 804 super().__init__(payload=bytes([error_code])) 805 self.error_code = error_code 806 807 def __str__(self) -> str: 808 details = [f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}'] 809 return self.to_string(details) 810 811 812# ----------------------------------------------------------------------------- 813@Message.subclass 814class Discover_Command(Message): 815 ''' 816 See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command 817 ''' 818 819 820# ----------------------------------------------------------------------------- 821@Message.subclass 822class Discover_Response(Message): 823 ''' 824 See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response 825 ''' 826 827 endpoints: List[EndPointInfo] 828 829 def init_from_payload(self): 830 self.endpoints = [] 831 endpoint_count = len(self.payload) // 2 832 for i in range(endpoint_count): 833 self.endpoints.append( 834 EndPointInfo.from_bytes(self.payload[i * 2 : (i + 1) * 2]) 835 ) 836 837 def __init__(self, endpoints): 838 super().__init__(payload=b''.join([bytes(endpoint) for endpoint in endpoints])) 839 self.endpoints = endpoints 840 841 def __str__(self) -> str: 842 details = [] 843 for endpoint in self.endpoints: 844 details.extend( 845 # pylint: disable=line-too-long 846 [ 847 f'ACP SEID: {endpoint.seid}', 848 f' in_use: {endpoint.in_use}', 849 f' media_type: {name_or_number(AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type)}', 850 f' tsep: {name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)}', 851 ] 852 ) 853 return self.to_string(details) 854 855 856# ----------------------------------------------------------------------------- 857@Message.subclass 858class Get_Capabilities_Command(Simple_Command): 859 ''' 860 See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command 861 ''' 862 863 864# ----------------------------------------------------------------------------- 865@Message.subclass 866class Get_Capabilities_Response(Message): 867 ''' 868 See Bluetooth AVDTP spec - 8.7.2 Get All Capabilities Response 869 ''' 870 871 def init_from_payload(self): 872 self.capabilities = ServiceCapabilities.parse_capabilities(self.payload) 873 874 def __init__(self, capabilities): 875 super().__init__( 876 payload=ServiceCapabilities.serialize_capabilities(capabilities) 877 ) 878 self.capabilities = capabilities 879 880 def __str__(self) -> str: 881 details = [str(capability) for capability in self.capabilities] 882 return self.to_string(details) 883 884 885# ----------------------------------------------------------------------------- 886@Message.subclass 887class Get_Capabilities_Reject(Simple_Reject): 888 ''' 889 See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject 890 ''' 891 892 893# ----------------------------------------------------------------------------- 894@Message.subclass 895class Get_All_Capabilities_Command(Get_Capabilities_Command): 896 ''' 897 See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command 898 ''' 899 900 901# ----------------------------------------------------------------------------- 902@Message.subclass 903class Get_All_Capabilities_Response(Get_Capabilities_Response): 904 ''' 905 See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response 906 ''' 907 908 909# ----------------------------------------------------------------------------- 910@Message.subclass 911class Get_All_Capabilities_Reject(Simple_Reject): 912 ''' 913 See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject 914 ''' 915 916 917# ----------------------------------------------------------------------------- 918@Message.subclass 919class Set_Configuration_Command(Message): 920 ''' 921 See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command 922 ''' 923 924 def init_from_payload(self): 925 self.acp_seid = self.payload[0] >> 2 926 self.int_seid = self.payload[1] >> 2 927 self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[2:]) 928 929 def __init__( 930 self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities] 931 ) -> None: 932 super().__init__( 933 payload=bytes([acp_seid << 2, int_seid << 2]) 934 + ServiceCapabilities.serialize_capabilities(capabilities) 935 ) 936 self.acp_seid = acp_seid 937 self.int_seid = int_seid 938 self.capabilities = capabilities 939 940 def __str__(self) -> str: 941 details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}'] + [ 942 str(capability) for capability in self.capabilities 943 ] 944 return self.to_string(details) 945 946 947# ----------------------------------------------------------------------------- 948@Message.subclass 949class Set_Configuration_Response(Message): 950 ''' 951 See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response 952 ''' 953 954 955# ----------------------------------------------------------------------------- 956@Message.subclass 957class Set_Configuration_Reject(Message): 958 ''' 959 See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject 960 ''' 961 962 def init_from_payload(self): 963 self.service_category = self.payload[0] 964 self.error_code = self.payload[1] 965 966 def __init__(self, service_category, error_code): 967 super().__init__(payload=bytes([service_category, error_code])) 968 self.service_category = service_category 969 self.error_code = error_code 970 971 def __str__(self) -> str: 972 details = [ 973 ( 974 'service_category: ' 975 f'{name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)}' 976 ), 977 ( 978 'error_code: ' 979 f'{name_or_number(AVDTP_ERROR_NAMES, self.error_code)}' 980 ), 981 ] 982 return self.to_string(details) 983 984 985# ----------------------------------------------------------------------------- 986@Message.subclass 987class Get_Configuration_Command(Simple_Command): 988 ''' 989 See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command 990 ''' 991 992 993# ----------------------------------------------------------------------------- 994@Message.subclass 995class Get_Configuration_Response(Message): 996 ''' 997 See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response 998 ''' 999 1000 def init_from_payload(self): 1001 self.capabilities = ServiceCapabilities.parse_capabilities(self.payload) 1002 1003 def __init__(self, capabilities: Iterable[ServiceCapabilities]) -> None: 1004 super().__init__( 1005 payload=ServiceCapabilities.serialize_capabilities(capabilities) 1006 ) 1007 self.capabilities = capabilities 1008 1009 def __str__(self) -> str: 1010 details = [str(capability) for capability in self.capabilities] 1011 return self.to_string(details) 1012 1013 1014# ----------------------------------------------------------------------------- 1015@Message.subclass 1016class Get_Configuration_Reject(Simple_Reject): 1017 ''' 1018 See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject 1019 ''' 1020 1021 1022# ----------------------------------------------------------------------------- 1023@Message.subclass 1024class Reconfigure_Command(Message): 1025 ''' 1026 See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command 1027 ''' 1028 1029 def init_from_payload(self): 1030 # pylint: disable=attribute-defined-outside-init 1031 self.acp_seid = self.payload[0] >> 2 1032 self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[1:]) 1033 1034 def __str__(self) -> str: 1035 details = [ 1036 f'ACP SEID: {self.acp_seid}', 1037 ] + [str(capability) for capability in self.capabilities] 1038 return self.to_string(details) 1039 1040 1041# ----------------------------------------------------------------------------- 1042@Message.subclass 1043class Reconfigure_Response(Message): 1044 ''' 1045 See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response 1046 ''' 1047 1048 1049# ----------------------------------------------------------------------------- 1050@Message.subclass 1051class Reconfigure_Reject(Set_Configuration_Reject): 1052 ''' 1053 See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject 1054 ''' 1055 1056 1057# ----------------------------------------------------------------------------- 1058@Message.subclass 1059class Open_Command(Simple_Command): 1060 ''' 1061 See Bluetooth AVDTP spec - 8.12.1 Open Stream Command 1062 ''' 1063 1064 1065# ----------------------------------------------------------------------------- 1066@Message.subclass 1067class Open_Response(Message): 1068 ''' 1069 See Bluetooth AVDTP spec - 8.12.2 Open Stream Response 1070 ''' 1071 1072 1073# ----------------------------------------------------------------------------- 1074@Message.subclass 1075class Open_Reject(Simple_Reject): 1076 ''' 1077 See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject 1078 ''' 1079 1080 1081# ----------------------------------------------------------------------------- 1082@Message.subclass 1083class Start_Command(Message): 1084 ''' 1085 See Bluetooth AVDTP spec - 8.13.1 Start Stream Command 1086 ''' 1087 1088 def init_from_payload(self): 1089 self.acp_seids = [x >> 2 for x in self.payload] 1090 1091 def __init__(self, seids: Iterable[int]) -> None: 1092 super().__init__(payload=bytes([seid << 2 for seid in seids])) 1093 self.acp_seids = seids 1094 1095 def __str__(self) -> str: 1096 return self.to_string([f'ACP SEIDs: {self.acp_seids}']) 1097 1098 1099# ----------------------------------------------------------------------------- 1100@Message.subclass 1101class Start_Response(Message): 1102 ''' 1103 See Bluetooth AVDTP spec - 8.13.2 Start Stream Response 1104 ''' 1105 1106 1107# ----------------------------------------------------------------------------- 1108@Message.subclass 1109class Start_Reject(Message): 1110 ''' 1111 See Bluetooth AVDTP spec - 8.13.3 Set Configuration Reject 1112 ''' 1113 1114 def init_from_payload(self): 1115 self.acp_seid = self.payload[0] >> 2 1116 self.error_code = self.payload[1] 1117 1118 def __init__(self, acp_seid, error_code): 1119 super().__init__(payload=bytes([acp_seid << 2, error_code])) 1120 self.acp_seid = acp_seid 1121 self.error_code = error_code 1122 1123 def __str__(self) -> str: 1124 details = [ 1125 f'acp_seid: {self.acp_seid}', 1126 f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}', 1127 ] 1128 return self.to_string(details) 1129 1130 1131# ----------------------------------------------------------------------------- 1132@Message.subclass 1133class Close_Command(Simple_Command): 1134 ''' 1135 See Bluetooth AVDTP spec - 8.14.1 Close Stream Command 1136 ''' 1137 1138 1139# ----------------------------------------------------------------------------- 1140@Message.subclass 1141class Close_Response(Message): 1142 ''' 1143 See Bluetooth AVDTP spec - 8.14.2 Close Stream Response 1144 ''' 1145 1146 1147# ----------------------------------------------------------------------------- 1148@Message.subclass 1149class Close_Reject(Simple_Reject): 1150 ''' 1151 See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject 1152 ''' 1153 1154 1155# ----------------------------------------------------------------------------- 1156@Message.subclass 1157class Suspend_Command(Start_Command): 1158 ''' 1159 See Bluetooth AVDTP spec - 8.15.1 Suspend Command 1160 ''' 1161 1162 1163# ----------------------------------------------------------------------------- 1164@Message.subclass 1165class Suspend_Response(Message): 1166 ''' 1167 See Bluetooth AVDTP spec - 8.15.2 Suspend Response 1168 ''' 1169 1170 1171# ----------------------------------------------------------------------------- 1172@Message.subclass 1173class Suspend_Reject(Start_Reject): 1174 ''' 1175 See Bluetooth AVDTP spec - 8.15.3 Suspend Reject 1176 ''' 1177 1178 1179# ----------------------------------------------------------------------------- 1180@Message.subclass 1181class Abort_Command(Simple_Command): 1182 ''' 1183 See Bluetooth AVDTP spec - 8.16.1 Abort Command 1184 ''' 1185 1186 1187# ----------------------------------------------------------------------------- 1188@Message.subclass 1189class Abort_Response(Message): 1190 ''' 1191 See Bluetooth AVDTP spec - 8.16.2 Abort Response 1192 ''' 1193 1194 1195# ----------------------------------------------------------------------------- 1196@Message.subclass 1197class Security_Control_Command(Message): 1198 ''' 1199 See Bluetooth AVDTP spec - 8.17.1 Security Control Command 1200 ''' 1201 1202 1203# ----------------------------------------------------------------------------- 1204@Message.subclass 1205class Security_Control_Response(Message): 1206 ''' 1207 See Bluetooth AVDTP spec - 8.17.2 Security Control Response 1208 ''' 1209 1210 1211# ----------------------------------------------------------------------------- 1212@Message.subclass 1213class Security_Control_Reject(Simple_Reject): 1214 ''' 1215 See Bluetooth AVDTP spec - 8.17.3 Security Control Reject 1216 ''' 1217 1218 1219# ----------------------------------------------------------------------------- 1220@Message.subclass 1221class General_Reject(Message): 1222 ''' 1223 See Bluetooth AVDTP spec - 8.18 General Reject 1224 ''' 1225 1226 def to_string(self, details): 1227 return color('GENERAL_REJECT', 'yellow') 1228 1229 1230# ----------------------------------------------------------------------------- 1231@Message.subclass 1232class DelayReport_Command(Message): 1233 ''' 1234 See Bluetooth AVDTP spec - 8.19.1 Delay Report Command 1235 ''' 1236 1237 def init_from_payload(self): 1238 # pylint: disable=attribute-defined-outside-init 1239 self.acp_seid = self.payload[0] >> 2 1240 self.delay = (self.payload[1] << 8) | (self.payload[2]) 1241 1242 def __str__(self) -> str: 1243 return self.to_string([f'ACP_SEID: {self.acp_seid}', f'delay: {self.delay}']) 1244 1245 1246# ----------------------------------------------------------------------------- 1247@Message.subclass 1248class DelayReport_Response(Message): 1249 ''' 1250 See Bluetooth AVDTP spec - 8.19.2 Delay Report Response 1251 ''' 1252 1253 1254# ----------------------------------------------------------------------------- 1255@Message.subclass 1256class DelayReport_Reject(Simple_Reject): 1257 ''' 1258 See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject 1259 ''' 1260 1261 1262# ----------------------------------------------------------------------------- 1263class Protocol(EventEmitter): 1264 local_endpoints: List[LocalStreamEndPoint] 1265 remote_endpoints: Dict[int, DiscoveredStreamEndPoint] 1266 streams: Dict[int, Stream] 1267 transaction_results: List[Optional[asyncio.Future[Message]]] 1268 channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]] 1269 1270 class PacketType(enum.IntEnum): 1271 SINGLE_PACKET = 0 1272 START_PACKET = 1 1273 CONTINUE_PACKET = 2 1274 END_PACKET = 3 1275 1276 @staticmethod 1277 def packet_type_name(packet_type): 1278 return name_or_number(Protocol.PACKET_TYPE_NAMES, packet_type) 1279 1280 @staticmethod 1281 async def connect( 1282 connection: device.Connection, version: Tuple[int, int] = (1, 3) 1283 ) -> Protocol: 1284 channel = await connection.create_l2cap_channel( 1285 spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM) 1286 ) 1287 protocol = Protocol(channel, version) 1288 1289 return protocol 1290 1291 def __init__( 1292 self, l2cap_channel: l2cap.ClassicChannel, version: Tuple[int, int] = (1, 3) 1293 ) -> None: 1294 super().__init__() 1295 self.l2cap_channel = l2cap_channel 1296 self.version = version 1297 self.rtx_sig_timer = AVDTP_DEFAULT_RTX_SIG_TIMER 1298 self.message_assembler = MessageAssembler(self.on_message) 1299 self.transaction_results = [None] * 16 # Futures for up to 16 transactions 1300 self.transaction_semaphore = asyncio.Semaphore(16) 1301 self.transaction_count = 0 1302 self.channel_acceptor = None 1303 self.local_endpoints = [] # Local endpoints, with contiguous seid values 1304 self.remote_endpoints = {} # Remote stream endpoints, by seid 1305 self.streams = {} # Streams, by seid 1306 1307 # Register to receive PDUs from the channel 1308 l2cap_channel.sink = self.on_pdu 1309 l2cap_channel.on('open', self.on_l2cap_channel_open) 1310 l2cap_channel.on('close', self.on_l2cap_channel_close) 1311 1312 def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]: 1313 if 0 < seid <= len(self.local_endpoints): 1314 return self.local_endpoints[seid - 1] 1315 1316 return None 1317 1318 def add_source( 1319 self, codec_capabilities: MediaCodecCapabilities, packet_pump: MediaPacketPump 1320 ) -> LocalSource: 1321 seid = len(self.local_endpoints) + 1 1322 source = LocalSource(self, seid, codec_capabilities, packet_pump) 1323 self.local_endpoints.append(source) 1324 1325 return source 1326 1327 def add_sink(self, codec_capabilities: MediaCodecCapabilities) -> LocalSink: 1328 seid = len(self.local_endpoints) + 1 1329 sink = LocalSink(self, seid, codec_capabilities) 1330 self.local_endpoints.append(sink) 1331 1332 return sink 1333 1334 async def create_stream( 1335 self, source: LocalStreamEndPoint, sink: StreamEndPointProxy 1336 ) -> Stream: 1337 # Check that the source isn't already used in a stream 1338 if source.in_use: 1339 raise InvalidStateError('source already in use') 1340 1341 # Create or reuse a new stream to associate the source and the sink 1342 if source.seid in self.streams: 1343 stream = self.streams[source.seid] 1344 else: 1345 stream = Stream(self, source, sink) 1346 self.streams[source.seid] = stream 1347 1348 # The stream can now be configured 1349 await stream.configure() 1350 1351 return stream 1352 1353 async def discover_remote_endpoints(self) -> Iterable[DiscoveredStreamEndPoint]: 1354 self.remote_endpoints = {} 1355 1356 response: Discover_Response = await self.send_command(Discover_Command()) 1357 for endpoint_entry in response.endpoints: 1358 logger.debug( 1359 f'getting endpoint capabilities for endpoint {endpoint_entry.seid}' 1360 ) 1361 get_capabilities_response = await self.get_capabilities(endpoint_entry.seid) 1362 endpoint = DiscoveredStreamEndPoint( 1363 self, 1364 endpoint_entry.seid, 1365 endpoint_entry.media_type, 1366 endpoint_entry.tsep, 1367 endpoint_entry.in_use, 1368 get_capabilities_response.capabilities, 1369 ) 1370 self.remote_endpoints[endpoint_entry.seid] = endpoint 1371 1372 return self.remote_endpoints.values() 1373 1374 def find_remote_sink_by_codec( 1375 self, media_type: int, codec_type: int 1376 ) -> Optional[DiscoveredStreamEndPoint]: 1377 for endpoint in self.remote_endpoints.values(): 1378 if ( 1379 not endpoint.in_use 1380 and endpoint.media_type == media_type 1381 and endpoint.tsep == AVDTP_TSEP_SNK 1382 ): 1383 has_media_transport = False 1384 has_codec = False 1385 for capabilities in endpoint.capabilities: 1386 if ( 1387 capabilities.service_category 1388 == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY 1389 ): 1390 has_media_transport = True 1391 elif ( 1392 capabilities.service_category 1393 == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY 1394 ): 1395 codec_capabilities = cast(MediaCodecCapabilities, capabilities) 1396 if ( 1397 codec_capabilities.media_type == AVDTP_AUDIO_MEDIA_TYPE 1398 and codec_capabilities.media_codec_type == codec_type 1399 ): 1400 has_codec = True 1401 if has_media_transport and has_codec: 1402 return endpoint 1403 1404 return None 1405 1406 def on_pdu(self, pdu: bytes) -> None: 1407 self.message_assembler.on_pdu(pdu) 1408 1409 def on_message(self, transaction_label: int, message: Message) -> None: 1410 logger.debug( 1411 f'{color("<<< Received AVDTP message", "magenta")}: ' 1412 f'[{transaction_label}] {message}' 1413 ) 1414 1415 # Check that the identifier is not reserved 1416 if message.signal_identifier == 0: 1417 logger.warning('!!! reserved signal identifier') 1418 return 1419 1420 # Check that the identifier is valid 1421 if ( 1422 message.signal_identifier < 0 1423 or message.signal_identifier > AVDTP_DELAYREPORT 1424 ): 1425 logger.warning('!!! invalid signal identifier') 1426 self.send_message(transaction_label, General_Reject()) 1427 1428 if message.message_type == Message.MessageType.COMMAND: 1429 # Command 1430 signal_name = ( 1431 AVDTP_SIGNAL_NAMES.get(message.signal_identifier, "") 1432 .replace("AVDTP_", "") 1433 .lower() 1434 ) 1435 handler_name = f'on_{signal_name}_command' 1436 handler = getattr(self, handler_name, None) 1437 if handler: 1438 try: 1439 response = handler(message) 1440 self.send_message(transaction_label, response) 1441 except Exception as error: 1442 logger.warning( 1443 f'{color("!!! Exception in handler:", "red")} {error}' 1444 ) 1445 else: 1446 logger.warning('unhandled command') 1447 else: 1448 # Response, look for a pending transaction with the same label 1449 transaction_result = self.transaction_results[transaction_label] 1450 if transaction_result is None: 1451 logger.warning(color('!!! no pending transaction for label', 'red')) 1452 return 1453 1454 transaction_result.set_result(message) 1455 self.transaction_results[transaction_label] = None 1456 self.transaction_semaphore.release() 1457 1458 def on_l2cap_connection(self, channel): 1459 # Forward the channel to the endpoint that's expecting it 1460 if self.channel_acceptor is None: 1461 logger.warning(color('!!! l2cap connection with no acceptor', 'red')) 1462 return 1463 self.channel_acceptor.on_l2cap_connection(channel) 1464 1465 def on_l2cap_channel_open(self): 1466 logger.debug(color('<<< L2CAP channel open', 'magenta')) 1467 self.emit('open') 1468 1469 def on_l2cap_channel_close(self): 1470 logger.debug(color('<<< L2CAP channel close', 'magenta')) 1471 self.emit('close') 1472 1473 def send_message(self, transaction_label: int, message: Message) -> None: 1474 logger.debug( 1475 f'{color(">>> Sending AVDTP message", "magenta")}: ' 1476 f'[{transaction_label}] {message}' 1477 ) 1478 max_fragment_size = ( 1479 self.l2cap_channel.peer_mtu - 3 1480 ) # Enough space for a 3-byte start packet header 1481 payload = message.payload 1482 if len(payload) + 2 <= self.l2cap_channel.peer_mtu: 1483 # Fits in a single packet 1484 packet_type = self.PacketType.SINGLE_PACKET 1485 else: 1486 packet_type = self.PacketType.START_PACKET 1487 1488 done = False 1489 while not done: 1490 first_header_byte = ( 1491 transaction_label << 4 | packet_type << 2 | message.message_type 1492 ) 1493 1494 if packet_type == self.PacketType.SINGLE_PACKET: 1495 header = bytes([first_header_byte, message.signal_identifier]) 1496 elif packet_type == self.PacketType.START_PACKET: 1497 packet_count = ( 1498 max_fragment_size - 1 + len(payload) 1499 ) // max_fragment_size 1500 header = bytes( 1501 [first_header_byte, message.signal_identifier, packet_count] 1502 ) 1503 else: 1504 header = bytes([first_header_byte]) 1505 1506 # Send one packet 1507 self.l2cap_channel.send_pdu(header + payload[:max_fragment_size]) 1508 1509 # Prepare for the next packet 1510 payload = payload[max_fragment_size:] 1511 if payload: 1512 packet_type = ( 1513 self.PacketType.CONTINUE_PACKET 1514 if len(payload) > max_fragment_size 1515 else self.PacketType.END_PACKET 1516 ) 1517 else: 1518 done = True 1519 1520 async def send_command(self, command: Message): 1521 # TODO: support timeouts 1522 # Send the command 1523 (transaction_label, transaction_result) = await self.start_transaction() 1524 self.send_message(transaction_label, command) 1525 1526 # Wait for the response 1527 response = await transaction_result 1528 1529 # Check for errors 1530 if response.message_type in ( 1531 Message.MessageType.GENERAL_REJECT, 1532 Message.MessageType.RESPONSE_REJECT, 1533 ): 1534 assert hasattr(response, 'error_code') 1535 raise ProtocolError(response.error_code, 'avdtp') 1536 1537 return response 1538 1539 async def start_transaction(self) -> Tuple[int, asyncio.Future[Message]]: 1540 # Wait until we can start a new transaction 1541 await self.transaction_semaphore.acquire() 1542 1543 # Look for the next free entry to store the transaction result 1544 for i in range(16): 1545 transaction_label = (self.transaction_count + i) % 16 1546 if self.transaction_results[transaction_label] is None: 1547 transaction_result = asyncio.get_running_loop().create_future() 1548 self.transaction_results[transaction_label] = transaction_result 1549 self.transaction_count += 1 1550 return (transaction_label, transaction_result) 1551 1552 assert False # Should never reach this 1553 1554 async def get_capabilities(self, seid: int) -> Union[ 1555 Get_Capabilities_Response, 1556 Get_All_Capabilities_Response, 1557 ]: 1558 if self.version > (1, 2): 1559 return await self.send_command(Get_All_Capabilities_Command(seid)) 1560 1561 return await self.send_command(Get_Capabilities_Command(seid)) 1562 1563 async def set_configuration( 1564 self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities] 1565 ) -> Set_Configuration_Response: 1566 return await self.send_command( 1567 Set_Configuration_Command(acp_seid, int_seid, capabilities) 1568 ) 1569 1570 async def get_configuration(self, seid: int) -> Get_Configuration_Response: 1571 response = await self.send_command(Get_Configuration_Command(seid)) 1572 return response.capabilities 1573 1574 async def open(self, seid: int) -> Open_Response: 1575 return await self.send_command(Open_Command(seid)) 1576 1577 async def start(self, seids: Iterable[int]) -> Start_Response: 1578 return await self.send_command(Start_Command(seids)) 1579 1580 async def suspend(self, seids: Iterable[int]) -> Suspend_Response: 1581 return await self.send_command(Suspend_Command(seids)) 1582 1583 async def close(self, seid: int) -> Close_Response: 1584 return await self.send_command(Close_Command(seid)) 1585 1586 async def abort(self, seid: int) -> Abort_Response: 1587 return await self.send_command(Abort_Command(seid)) 1588 1589 def on_discover_command(self, _command): 1590 endpoint_infos = [ 1591 EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep) 1592 for endpoint in self.local_endpoints 1593 ] 1594 return Discover_Response(endpoint_infos) 1595 1596 def on_get_capabilities_command(self, command): 1597 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1598 if endpoint is None: 1599 return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1600 1601 return Get_Capabilities_Response(endpoint.capabilities) 1602 1603 def on_get_all_capabilities_command(self, command): 1604 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1605 if endpoint is None: 1606 return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1607 1608 return Get_All_Capabilities_Response(endpoint.capabilities) 1609 1610 def on_set_configuration_command(self, command): 1611 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1612 if endpoint is None: 1613 return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1614 1615 # Check that the local endpoint isn't in use 1616 if endpoint.in_use: 1617 return Set_Configuration_Reject(AVDTP_SEP_IN_USE_ERROR) 1618 1619 # Create a stream object for the pair of endpoints 1620 stream = Stream(self, endpoint, StreamEndPointProxy(self, command.int_seid)) 1621 self.streams[command.acp_seid] = stream 1622 1623 result = stream.on_set_configuration_command(command.capabilities) 1624 return result or Set_Configuration_Response() 1625 1626 def on_get_configuration_command(self, command): 1627 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1628 if endpoint is None: 1629 return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1630 if endpoint.stream is None: 1631 return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR) 1632 1633 return endpoint.stream.on_get_configuration_command() 1634 1635 def on_reconfigure_command(self, command): 1636 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1637 if endpoint is None: 1638 return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR) 1639 if endpoint.stream is None: 1640 return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR) 1641 1642 result = endpoint.stream.on_reconfigure_command(command.capabilities) 1643 return result or Reconfigure_Response() 1644 1645 def on_open_command(self, command): 1646 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1647 if endpoint is None: 1648 return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1649 if endpoint.stream is None: 1650 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1651 1652 result = endpoint.stream.on_open_command() 1653 return result or Open_Response() 1654 1655 def on_start_command(self, command): 1656 for seid in command.acp_seids: 1657 endpoint = self.get_local_endpoint_by_seid(seid) 1658 if endpoint is None: 1659 return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR) 1660 if endpoint.stream is None: 1661 return Start_Reject(AVDTP_BAD_STATE_ERROR) 1662 1663 # Start all streams 1664 # TODO: deal with partial failures 1665 for seid in command.acp_seids: 1666 endpoint = self.get_local_endpoint_by_seid(seid) 1667 result = endpoint.stream.on_start_command() 1668 if result is not None: 1669 return result 1670 1671 return Start_Response() 1672 1673 def on_suspend_command(self, command): 1674 for seid in command.acp_seids: 1675 endpoint = self.get_local_endpoint_by_seid(seid) 1676 if endpoint is None: 1677 return Suspend_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR) 1678 if endpoint.stream is None: 1679 return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR) 1680 1681 # Suspend all streams 1682 # TODO: deal with partial failures 1683 for seid in command.acp_seids: 1684 endpoint = self.get_local_endpoint_by_seid(seid) 1685 result = endpoint.stream.on_suspend_command() 1686 if result is not None: 1687 return result 1688 1689 return Suspend_Response() 1690 1691 def on_close_command(self, command): 1692 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1693 if endpoint is None: 1694 return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1695 if endpoint.stream is None: 1696 return Close_Reject(AVDTP_BAD_STATE_ERROR) 1697 1698 result = endpoint.stream.on_close_command() 1699 return result or Close_Response() 1700 1701 def on_abort_command(self, command): 1702 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1703 if endpoint is None or endpoint.stream is None: 1704 return Abort_Response() 1705 1706 endpoint.stream.on_abort_command() 1707 return Abort_Response() 1708 1709 def on_security_control_command(self, command): 1710 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1711 if endpoint is None: 1712 return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1713 1714 result = endpoint.on_security_control_command(command.payload) 1715 return result or Security_Control_Response() 1716 1717 def on_delayreport_command(self, command): 1718 endpoint = self.get_local_endpoint_by_seid(command.acp_seid) 1719 if endpoint is None: 1720 return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR) 1721 1722 result = endpoint.on_delayreport_command(command.delay) 1723 return result or DelayReport_Response() 1724 1725 1726# ----------------------------------------------------------------------------- 1727class Listener(EventEmitter): 1728 servers: Dict[int, Protocol] 1729 1730 @staticmethod 1731 def create_registrar(device: device.Device): 1732 warnings.warn("Please use Listener.for_device()", DeprecationWarning) 1733 1734 def wrapper(handler: Callable[[l2cap.ClassicChannel], None]) -> None: 1735 device.create_l2cap_server(l2cap.ClassicChannelSpec(psm=AVDTP_PSM), handler) 1736 1737 return wrapper 1738 1739 def set_server(self, connection: device.Connection, server: Protocol) -> None: 1740 self.servers[connection.handle] = server 1741 1742 def remove_server(self, connection: device.Connection) -> None: 1743 if connection.handle in self.servers: 1744 del self.servers[connection.handle] 1745 1746 def __init__(self, registrar=None, version=(1, 3)): 1747 super().__init__() 1748 self.version = version 1749 self.servers = {} # Servers, by connection handle 1750 1751 # Listen for incoming L2CAP connections 1752 if registrar: 1753 warnings.warn("Please use Listener.for_device()", DeprecationWarning) 1754 registrar(self.on_l2cap_connection) 1755 1756 @classmethod 1757 def for_device( 1758 cls, device: device.Device, version: Tuple[int, int] = (1, 3) 1759 ) -> Listener: 1760 listener = Listener(registrar=None, version=version) 1761 l2cap_server = device.create_l2cap_server( 1762 spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM) 1763 ) 1764 l2cap_server.on('connection', listener.on_l2cap_connection) 1765 return listener 1766 1767 def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None: 1768 logger.debug(f'{color("<<< incoming L2CAP connection:", "magenta")} {channel}') 1769 1770 if channel.connection.handle in self.servers: 1771 # This is a channel for a stream endpoint 1772 server = self.servers[channel.connection.handle] 1773 server.on_l2cap_connection(channel) 1774 else: 1775 # This is a new command/response channel 1776 def on_channel_open(): 1777 logger.debug('setting up new Protocol for the connection') 1778 server = Protocol(channel, self.version) 1779 self.set_server(channel.connection, server) 1780 self.emit('connection', server) 1781 1782 def on_channel_close(): 1783 logger.debug('removing Protocol for the connection') 1784 self.remove_server(channel.connection) 1785 1786 channel.on('open', on_channel_open) 1787 channel.on('close', on_channel_close) 1788 1789 1790# ----------------------------------------------------------------------------- 1791class Stream: 1792 ''' 1793 Pair of a local and a remote stream endpoint that can stream from one to the other 1794 ''' 1795 1796 rtp_channel: Optional[l2cap.ClassicChannel] 1797 1798 @staticmethod 1799 def state_name(state: int) -> str: 1800 return name_or_number(AVDTP_STATE_NAMES, state) 1801 1802 def change_state(self, state: int) -> None: 1803 logger.debug(f'{self} state change -> {color(self.state_name(state), "cyan")}') 1804 self.state = state 1805 1806 def send_media_packet(self, packet: MediaPacket) -> None: 1807 assert self.rtp_channel 1808 self.rtp_channel.send_pdu(bytes(packet)) 1809 1810 async def configure(self) -> None: 1811 if self.state != AVDTP_IDLE_STATE: 1812 raise InvalidStateError('current state is not IDLE') 1813 1814 await self.remote_endpoint.set_configuration( 1815 self.local_endpoint.seid, self.local_endpoint.configuration 1816 ) 1817 self.change_state(AVDTP_CONFIGURED_STATE) 1818 1819 async def open(self) -> None: 1820 if self.state != AVDTP_CONFIGURED_STATE: 1821 raise InvalidStateError('current state is not CONFIGURED') 1822 1823 logger.debug('opening remote endpoint') 1824 await self.remote_endpoint.open() 1825 1826 self.change_state(AVDTP_OPEN_STATE) 1827 1828 # Create a channel for RTP packets 1829 self.rtp_channel = ( 1830 await self.protocol.l2cap_channel.connection.create_l2cap_channel( 1831 l2cap.ClassicChannelSpec(psm=AVDTP_PSM) 1832 ) 1833 ) 1834 1835 async def start(self) -> None: 1836 # Auto-open if needed 1837 if self.state == AVDTP_CONFIGURED_STATE: 1838 await self.open() 1839 1840 if self.state != AVDTP_OPEN_STATE: 1841 raise InvalidStateError('current state is not OPEN') 1842 1843 logger.debug('starting remote endpoint') 1844 await self.remote_endpoint.start() 1845 1846 logger.debug('starting local endpoint') 1847 await self.local_endpoint.start() 1848 1849 self.change_state(AVDTP_STREAMING_STATE) 1850 1851 async def stop(self) -> None: 1852 if self.state != AVDTP_STREAMING_STATE: 1853 raise InvalidStateError('current state is not STREAMING') 1854 1855 logger.debug('stopping local endpoint') 1856 await self.local_endpoint.stop() 1857 1858 logger.debug('stopping remote endpoint') 1859 await self.remote_endpoint.stop() 1860 1861 self.change_state(AVDTP_OPEN_STATE) 1862 1863 async def close(self) -> None: 1864 if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE): 1865 raise InvalidStateError('current state is not OPEN or STREAMING') 1866 1867 logger.debug('closing local endpoint') 1868 await self.local_endpoint.close() 1869 1870 logger.debug('closing remote endpoint') 1871 await self.remote_endpoint.close() 1872 1873 # Release any channels we may have created 1874 self.change_state(AVDTP_CLOSING_STATE) 1875 if self.rtp_channel: 1876 await self.rtp_channel.disconnect() 1877 self.rtp_channel = None 1878 1879 # Release the endpoint 1880 self.local_endpoint.in_use = 0 1881 1882 self.change_state(AVDTP_IDLE_STATE) 1883 1884 def on_set_configuration_command(self, configuration): 1885 if self.state != AVDTP_IDLE_STATE: 1886 return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR) 1887 1888 result = self.local_endpoint.on_set_configuration_command(configuration) 1889 if result is not None: 1890 return result 1891 1892 self.change_state(AVDTP_CONFIGURED_STATE) 1893 return None 1894 1895 def on_get_configuration_command(self, configuration): 1896 if self.state not in ( 1897 AVDTP_CONFIGURED_STATE, 1898 AVDTP_OPEN_STATE, 1899 AVDTP_STREAMING_STATE, 1900 ): 1901 return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR) 1902 1903 return self.local_endpoint.on_get_configuration_command(configuration) 1904 1905 def on_reconfigure_command(self, configuration): 1906 if self.state != AVDTP_OPEN_STATE: 1907 return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR) 1908 1909 result = self.local_endpoint.on_reconfigure_command(configuration) 1910 if result is not None: 1911 return result 1912 1913 return None 1914 1915 def on_open_command(self): 1916 if self.state != AVDTP_CONFIGURED_STATE: 1917 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1918 1919 result = self.local_endpoint.on_open_command() 1920 if result is not None: 1921 return result 1922 1923 # Register to accept the next channel 1924 self.protocol.channel_acceptor = self 1925 1926 self.change_state(AVDTP_OPEN_STATE) 1927 return None 1928 1929 def on_start_command(self): 1930 if self.state != AVDTP_OPEN_STATE: 1931 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1932 1933 # Check that we have an RTP channel 1934 if self.rtp_channel is None: 1935 logger.warning('received start command before RTP channel establishment') 1936 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1937 1938 result = self.local_endpoint.on_start_command() 1939 if result is not None: 1940 return result 1941 1942 self.change_state(AVDTP_STREAMING_STATE) 1943 return None 1944 1945 def on_suspend_command(self): 1946 if self.state != AVDTP_STREAMING_STATE: 1947 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1948 1949 result = self.local_endpoint.on_suspend_command() 1950 if result is not None: 1951 return result 1952 1953 self.change_state(AVDTP_OPEN_STATE) 1954 return None 1955 1956 def on_close_command(self): 1957 if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE): 1958 return Open_Reject(AVDTP_BAD_STATE_ERROR) 1959 1960 result = self.local_endpoint.on_close_command() 1961 if result is not None: 1962 return result 1963 1964 self.change_state(AVDTP_CLOSING_STATE) 1965 1966 if self.rtp_channel is None: 1967 # No channel to release, we're done 1968 self.change_state(AVDTP_IDLE_STATE) 1969 else: 1970 # TODO: set a timer as we wait for the RTP channel to be closed 1971 pass 1972 1973 return None 1974 1975 def on_abort_command(self): 1976 if self.rtp_channel is None: 1977 # No need to wait 1978 self.change_state(AVDTP_IDLE_STATE) 1979 else: 1980 # Wait for the RTP channel to be closed 1981 self.change_state(AVDTP_ABORTING_STATE) 1982 1983 def on_l2cap_connection(self, channel): 1984 logger.debug(color('<<< stream channel connected', 'magenta')) 1985 self.rtp_channel = channel 1986 channel.on('open', self.on_l2cap_channel_open) 1987 channel.on('close', self.on_l2cap_channel_close) 1988 1989 # We don't need more channels 1990 self.protocol.channel_acceptor = None 1991 1992 def on_l2cap_channel_open(self): 1993 logger.debug(color('<<< stream channel open', 'magenta')) 1994 self.local_endpoint.on_rtp_channel_open() 1995 1996 def on_l2cap_channel_close(self): 1997 logger.debug(color('<<< stream channel closed', 'magenta')) 1998 self.local_endpoint.on_rtp_channel_close() 1999 self.local_endpoint.in_use = 0 2000 self.rtp_channel = None 2001 2002 if self.state in (AVDTP_CLOSING_STATE, AVDTP_ABORTING_STATE): 2003 self.change_state(AVDTP_IDLE_STATE) 2004 else: 2005 logger.warning('unexpected channel close while not CLOSING or ABORTING') 2006 2007 def __init__( 2008 self, 2009 protocol: Protocol, 2010 local_endpoint: LocalStreamEndPoint, 2011 remote_endpoint: StreamEndPointProxy, 2012 ) -> None: 2013 ''' 2014 remote_endpoint must be a subclass of StreamEndPointProxy 2015 2016 ''' 2017 self.protocol = protocol 2018 self.local_endpoint = local_endpoint 2019 self.remote_endpoint = remote_endpoint 2020 self.rtp_channel = None 2021 self.state = AVDTP_IDLE_STATE 2022 2023 local_endpoint.stream = self 2024 local_endpoint.in_use = 1 2025 2026 def __str__(self) -> str: 2027 return ( 2028 f'Stream({self.local_endpoint.seid} -> ' 2029 f'{self.remote_endpoint.seid} {self.state_name(self.state)})' 2030 ) 2031 2032 2033# ----------------------------------------------------------------------------- 2034class StreamEndPoint: 2035 def __init__( 2036 self, 2037 seid: int, 2038 media_type: int, 2039 tsep: int, 2040 in_use: int, 2041 capabilities: Iterable[ServiceCapabilities], 2042 ) -> None: 2043 self.seid = seid 2044 self.media_type = media_type 2045 self.tsep = tsep 2046 self.in_use = in_use 2047 self.capabilities = capabilities 2048 2049 def __str__(self) -> str: 2050 media_type = f'{name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}' 2051 tsep = f'{name_or_number(AVDTP_TSEP_NAMES, self.tsep)}' 2052 return '\n'.join( 2053 [ 2054 'SEP(', 2055 f' seid={self.seid}', 2056 f' media_type={media_type}', 2057 f' tsep={tsep}', 2058 f' in_use={self.in_use}', 2059 ' capabilities=[', 2060 '\n'.join([f' {x}' for x in self.capabilities]), 2061 ' ]', 2062 ')', 2063 ] 2064 ) 2065 2066 2067# ----------------------------------------------------------------------------- 2068class StreamEndPointProxy: 2069 def __init__(self, protocol: Protocol, seid: int) -> None: 2070 self.seid = seid 2071 self.protocol = protocol 2072 2073 async def set_configuration( 2074 self, int_seid: int, configuration: Iterable[ServiceCapabilities] 2075 ) -> Set_Configuration_Response: 2076 return await self.protocol.set_configuration(self.seid, int_seid, configuration) 2077 2078 async def open(self) -> Open_Response: 2079 return await self.protocol.open(self.seid) 2080 2081 async def start(self) -> Start_Response: 2082 return await self.protocol.start([self.seid]) 2083 2084 async def stop(self) -> Suspend_Response: 2085 return await self.protocol.suspend([self.seid]) 2086 2087 async def close(self) -> Close_Response: 2088 return await self.protocol.close(self.seid) 2089 2090 async def abort(self) -> Abort_Response: 2091 return await self.protocol.abort(self.seid) 2092 2093 2094# ----------------------------------------------------------------------------- 2095class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy): 2096 def __init__( 2097 self, 2098 protocol: Protocol, 2099 seid: int, 2100 media_type: int, 2101 tsep: int, 2102 in_use: int, 2103 capabilities: Iterable[ServiceCapabilities], 2104 ) -> None: 2105 StreamEndPoint.__init__(self, seid, media_type, tsep, in_use, capabilities) 2106 StreamEndPointProxy.__init__(self, protocol, seid) 2107 2108 2109# ----------------------------------------------------------------------------- 2110class LocalStreamEndPoint(StreamEndPoint, EventEmitter): 2111 stream: Optional[Stream] 2112 2113 def __init__( 2114 self, 2115 protocol: Protocol, 2116 seid: int, 2117 media_type: int, 2118 tsep: int, 2119 capabilities: Iterable[ServiceCapabilities], 2120 configuration: Optional[Iterable[ServiceCapabilities]] = None, 2121 ): 2122 StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities) 2123 EventEmitter.__init__(self) 2124 self.protocol = protocol 2125 self.configuration = configuration if configuration is not None else [] 2126 self.stream = None 2127 2128 async def start(self): 2129 pass 2130 2131 async def stop(self): 2132 pass 2133 2134 async def close(self): 2135 pass 2136 2137 def on_reconfigure_command(self, command): 2138 pass 2139 2140 def on_set_configuration_command(self, configuration): 2141 logger.debug( 2142 '<<< received configuration: ' 2143 f'{",".join([str(capability) for capability in configuration])}' 2144 ) 2145 self.configuration = configuration 2146 self.emit('configuration') 2147 2148 def on_get_configuration_command(self): 2149 return Get_Configuration_Response(self.configuration) 2150 2151 def on_open_command(self): 2152 self.emit('open') 2153 2154 def on_start_command(self): 2155 self.emit('start') 2156 2157 def on_suspend_command(self): 2158 self.emit('suspend') 2159 2160 def on_close_command(self): 2161 self.emit('close') 2162 2163 def on_abort_command(self): 2164 self.emit('abort') 2165 2166 def on_delayreport_command(self, delay: int): 2167 self.emit('delay_report', delay) 2168 2169 def on_rtp_channel_open(self): 2170 self.emit('rtp_channel_open') 2171 2172 def on_rtp_channel_close(self): 2173 self.emit('rtp_channel_close') 2174 2175 2176# ----------------------------------------------------------------------------- 2177class LocalSource(LocalStreamEndPoint): 2178 def __init__( 2179 self, 2180 protocol: Protocol, 2181 seid: int, 2182 codec_capabilities: MediaCodecCapabilities, 2183 packet_pump: MediaPacketPump, 2184 ) -> None: 2185 capabilities = [ 2186 ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), 2187 codec_capabilities, 2188 ] 2189 super().__init__( 2190 protocol, 2191 seid, 2192 codec_capabilities.media_type, 2193 AVDTP_TSEP_SRC, 2194 capabilities, 2195 capabilities, 2196 ) 2197 self.packet_pump = packet_pump 2198 2199 async def start(self) -> None: 2200 if self.packet_pump and self.stream and self.stream.rtp_channel: 2201 return await self.packet_pump.start(self.stream.rtp_channel) 2202 2203 self.emit('start') 2204 2205 async def stop(self) -> None: 2206 if self.packet_pump: 2207 return await self.packet_pump.stop() 2208 2209 self.emit('stop') 2210 2211 def on_start_command(self): 2212 asyncio.create_task(self.start()) 2213 2214 def on_suspend_command(self): 2215 asyncio.create_task(self.stop()) 2216 2217 2218# ----------------------------------------------------------------------------- 2219class LocalSink(LocalStreamEndPoint): 2220 def __init__( 2221 self, protocol: Protocol, seid: int, codec_capabilities: MediaCodecCapabilities 2222 ) -> None: 2223 capabilities = [ 2224 ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), 2225 codec_capabilities, 2226 ] 2227 super().__init__( 2228 protocol, 2229 seid, 2230 codec_capabilities.media_type, 2231 AVDTP_TSEP_SNK, 2232 capabilities, 2233 ) 2234 2235 def on_rtp_channel_open(self): 2236 logger.debug(color('<<< RTP channel open', 'magenta')) 2237 self.stream.rtp_channel.sink = self.on_avdtp_packet 2238 super().on_rtp_channel_open() 2239 2240 def on_rtp_channel_close(self): 2241 logger.debug(color('<<< RTP channel close', 'magenta')) 2242 super().on_rtp_channel_close() 2243 2244 def on_avdtp_packet(self, packet): 2245 rtp_packet = MediaPacket.from_bytes(packet) 2246 logger.debug( 2247 f'{color("<<< RTP Packet:", "green")} ' 2248 f'{rtp_packet} {rtp_packet.payload[:16].hex()}' 2249 ) 2250 self.emit('rtp_packet', rtp_packet) 2251