1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import dataclasses 21import enum 22import logging 23import struct 24 25from collections import deque 26from pyee import EventEmitter 27from typing import ( 28 Dict, 29 Type, 30 List, 31 Optional, 32 Tuple, 33 Callable, 34 Any, 35 Union, 36 Deque, 37 Iterable, 38 SupportsBytes, 39 TYPE_CHECKING, 40) 41 42from .utils import deprecated 43from .colors import color 44from .core import ( 45 BT_CENTRAL_ROLE, 46 InvalidStateError, 47 InvalidArgumentError, 48 InvalidPacketError, 49 OutOfResourcesError, 50 ProtocolError, 51) 52from .hci import ( 53 HCI_LE_Connection_Update_Command, 54 HCI_Object, 55 key_with_value, 56 name_or_number, 57) 58 59if TYPE_CHECKING: 60 from bumble.device import Connection 61 from bumble.host import Host 62 63# ----------------------------------------------------------------------------- 64# Logging 65# ----------------------------------------------------------------------------- 66logger = logging.getLogger(__name__) 67 68 69# ----------------------------------------------------------------------------- 70# Constants 71# ----------------------------------------------------------------------------- 72# fmt: off 73# pylint: disable=line-too-long 74 75L2CAP_SIGNALING_CID = 0x01 76L2CAP_LE_SIGNALING_CID = 0x05 77 78L2CAP_MIN_LE_MTU = 23 79L2CAP_MIN_BR_EDR_MTU = 48 80L2CAP_MAX_BR_EDR_MTU = 65535 81 82L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept 83 84L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024 85 86# See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links 87L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040 88L2CAP_ACL_U_DYNAMIC_CID_RANGE_END = 0xFFFF 89 90# See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link 91L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040 92L2CAP_LE_U_DYNAMIC_CID_RANGE_END = 0x007F 93 94# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage 95L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001 96L2CAP_PSM_DYNAMIC_RANGE_END = 0xFFFF 97 98# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges 99L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080 100L2CAP_LE_PSM_DYNAMIC_RANGE_END = 0x00FF 101 102# Frame types 103L2CAP_COMMAND_REJECT = 0x01 104L2CAP_CONNECTION_REQUEST = 0x02 105L2CAP_CONNECTION_RESPONSE = 0x03 106L2CAP_CONFIGURE_REQUEST = 0x04 107L2CAP_CONFIGURE_RESPONSE = 0x05 108L2CAP_DISCONNECTION_REQUEST = 0x06 109L2CAP_DISCONNECTION_RESPONSE = 0x07 110L2CAP_ECHO_REQUEST = 0x08 111L2CAP_ECHO_RESPONSE = 0x09 112L2CAP_INFORMATION_REQUEST = 0x0A 113L2CAP_INFORMATION_RESPONSE = 0x0B 114L2CAP_CREATE_CHANNEL_REQUEST = 0x0C 115L2CAP_CREATE_CHANNEL_RESPONSE = 0x0D 116L2CAP_MOVE_CHANNEL_REQUEST = 0x0E 117L2CAP_MOVE_CHANNEL_RESPONSE = 0x0F 118L2CAP_MOVE_CHANNEL_CONFIRMATION = 0x10 119L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE = 0x11 120L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST = 0x12 121L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE = 0x13 122L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST = 0x14 123L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE = 0x15 124L2CAP_LE_FLOW_CONTROL_CREDIT = 0x16 125 126L2CAP_CONTROL_FRAME_NAMES = { 127 L2CAP_COMMAND_REJECT: 'L2CAP_COMMAND_REJECT', 128 L2CAP_CONNECTION_REQUEST: 'L2CAP_CONNECTION_REQUEST', 129 L2CAP_CONNECTION_RESPONSE: 'L2CAP_CONNECTION_RESPONSE', 130 L2CAP_CONFIGURE_REQUEST: 'L2CAP_CONFIGURE_REQUEST', 131 L2CAP_CONFIGURE_RESPONSE: 'L2CAP_CONFIGURE_RESPONSE', 132 L2CAP_DISCONNECTION_REQUEST: 'L2CAP_DISCONNECTION_REQUEST', 133 L2CAP_DISCONNECTION_RESPONSE: 'L2CAP_DISCONNECTION_RESPONSE', 134 L2CAP_ECHO_REQUEST: 'L2CAP_ECHO_REQUEST', 135 L2CAP_ECHO_RESPONSE: 'L2CAP_ECHO_RESPONSE', 136 L2CAP_INFORMATION_REQUEST: 'L2CAP_INFORMATION_REQUEST', 137 L2CAP_INFORMATION_RESPONSE: 'L2CAP_INFORMATION_RESPONSE', 138 L2CAP_CREATE_CHANNEL_REQUEST: 'L2CAP_CREATE_CHANNEL_REQUEST', 139 L2CAP_CREATE_CHANNEL_RESPONSE: 'L2CAP_CREATE_CHANNEL_RESPONSE', 140 L2CAP_MOVE_CHANNEL_REQUEST: 'L2CAP_MOVE_CHANNEL_REQUEST', 141 L2CAP_MOVE_CHANNEL_RESPONSE: 'L2CAP_MOVE_CHANNEL_RESPONSE', 142 L2CAP_MOVE_CHANNEL_CONFIRMATION: 'L2CAP_MOVE_CHANNEL_CONFIRMATION', 143 L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE: 'L2CAP_MOVE_CHANNEL_CONFIRMATION_RESPONSE', 144 L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST: 'L2CAP_CONNECTION_PARAMETER_UPDATE_REQUEST', 145 L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE: 'L2CAP_CONNECTION_PARAMETER_UPDATE_RESPONSE', 146 L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST: 'L2CAP_LE_CREDIT_BASED_CONNECTION_REQUEST', 147 L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE: 'L2CAP_LE_CREDIT_BASED_CONNECTION_RESPONSE', 148 L2CAP_LE_FLOW_CONTROL_CREDIT: 'L2CAP_LE_FLOW_CONTROL_CREDIT' 149} 150 151L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT = 0x0000 152L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT = 0x0001 153 154L2CAP_COMMAND_NOT_UNDERSTOOD_REASON = 0x0000 155L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001 156L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002 157 158L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS = 65535 159L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU = 23 160L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU = 65535 161L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS = 23 162L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS = 65533 163L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048 164L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048 165L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256 166 167L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01 168 169L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01 170 171# fmt: on 172# pylint: enable=line-too-long 173 174 175# ----------------------------------------------------------------------------- 176# Classes 177# ----------------------------------------------------------------------------- 178# pylint: disable=invalid-name 179 180 181@dataclasses.dataclass 182class ClassicChannelSpec: 183 psm: Optional[int] = None 184 mtu: int = L2CAP_DEFAULT_MTU 185 186 187@dataclasses.dataclass 188class LeCreditBasedChannelSpec: 189 psm: Optional[int] = None 190 mtu: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU 191 mps: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS 192 max_credits: int = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS 193 194 def __post_init__(self): 195 if ( 196 self.max_credits < 1 197 or self.max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS 198 ): 199 raise InvalidArgumentError('max credits out of range') 200 if ( 201 self.mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU 202 or self.mtu > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MTU 203 ): 204 raise InvalidArgumentError('MTU out of range') 205 if ( 206 self.mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS 207 or self.mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS 208 ): 209 raise InvalidArgumentError('MPS out of range') 210 211 212class L2CAP_PDU: 213 ''' 214 See Bluetooth spec @ Vol 3, Part A - 3 DATA PACKET FORMAT 215 ''' 216 217 @staticmethod 218 def from_bytes(data: bytes) -> L2CAP_PDU: 219 # Check parameters 220 if len(data) < 4: 221 raise InvalidPacketError('not enough data for L2CAP header') 222 223 _, l2cap_pdu_cid = struct.unpack_from('<HH', data, 0) 224 l2cap_pdu_payload = data[4:] 225 226 return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload) 227 228 def to_bytes(self) -> bytes: 229 header = struct.pack('<HH', len(self.payload), self.cid) 230 return header + self.payload 231 232 def __init__(self, cid: int, payload: bytes) -> None: 233 self.cid = cid 234 self.payload = payload 235 236 def __bytes__(self) -> bytes: 237 return self.to_bytes() 238 239 def __str__(self) -> str: 240 return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}' 241 242 243# ----------------------------------------------------------------------------- 244class L2CAP_Control_Frame: 245 ''' 246 See Bluetooth spec @ Vol 3, Part A - 4 SIGNALING PACKET FORMATS 247 ''' 248 249 classes: Dict[int, Type[L2CAP_Control_Frame]] = {} 250 code = 0 251 name: str 252 253 @staticmethod 254 def from_bytes(pdu: bytes) -> L2CAP_Control_Frame: 255 code = pdu[0] 256 257 cls = L2CAP_Control_Frame.classes.get(code) 258 if cls is None: 259 instance = L2CAP_Control_Frame(pdu) 260 instance.name = L2CAP_Control_Frame.code_name(code) 261 instance.code = code 262 return instance 263 self = cls.__new__(cls) 264 L2CAP_Control_Frame.__init__(self, pdu) 265 self.identifier = pdu[1] 266 length = struct.unpack_from('<H', pdu, 2)[0] 267 if length + 4 != len(pdu): 268 logger.warning( 269 color( 270 f'!!! length mismatch: expected {len(pdu) - 4} but got {length}', 271 'red', 272 ) 273 ) 274 if hasattr(self, 'fields'): 275 self.init_from_bytes(pdu, 4) 276 return self 277 278 @staticmethod 279 def code_name(code: int) -> str: 280 return name_or_number(L2CAP_CONTROL_FRAME_NAMES, code) 281 282 @staticmethod 283 def decode_configuration_options(data: bytes) -> List[Tuple[int, bytes]]: 284 options = [] 285 while len(data) >= 2: 286 value_type = data[0] 287 length = data[1] 288 value = data[2 : 2 + length] 289 data = data[2 + length :] 290 options.append((value_type, value)) 291 292 return options 293 294 @staticmethod 295 def encode_configuration_options(options: List[Tuple[int, bytes]]) -> bytes: 296 return b''.join( 297 [bytes([option[0], len(option[1])]) + option[1] for option in options] 298 ) 299 300 @staticmethod 301 def subclass(fields): 302 def inner(cls): 303 cls.name = cls.__name__.upper() 304 cls.code = key_with_value(L2CAP_CONTROL_FRAME_NAMES, cls.name) 305 if cls.code is None: 306 raise KeyError( 307 f'Control Frame name {cls.name} ' 308 'not found in L2CAP_CONTROL_FRAME_NAMES' 309 ) 310 cls.fields = fields 311 312 # Register a factory for this class 313 L2CAP_Control_Frame.classes[cls.code] = cls 314 315 return cls 316 317 return inner 318 319 def __init__(self, pdu=None, **kwargs) -> None: 320 self.identifier = kwargs.get('identifier', 0) 321 if hasattr(self, 'fields'): 322 if kwargs: 323 HCI_Object.init_from_fields(self, self.fields, kwargs) 324 if pdu is None: 325 data = HCI_Object.dict_to_bytes(kwargs, self.fields) 326 pdu = ( 327 bytes([self.code, self.identifier]) 328 + struct.pack('<H', len(data)) 329 + data 330 ) 331 self.pdu = pdu 332 333 def init_from_bytes(self, pdu, offset): 334 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 335 336 def to_bytes(self) -> bytes: 337 return self.pdu 338 339 def __bytes__(self) -> bytes: 340 return self.to_bytes() 341 342 def __str__(self) -> str: 343 result = f'{color(self.name, "yellow")} [ID={self.identifier}]' 344 if fields := getattr(self, 'fields', None): 345 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 346 else: 347 if len(self.pdu) > 1: 348 result += f': {self.pdu.hex()}' 349 return result 350 351 352# ----------------------------------------------------------------------------- 353@L2CAP_Control_Frame.subclass( 354 # pylint: disable=unnecessary-lambda 355 [ 356 ( 357 'reason', 358 {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)}, 359 ), 360 ('data', '*'), 361 ] 362) 363class L2CAP_Command_Reject(L2CAP_Control_Frame): 364 ''' 365 See Bluetooth spec @ Vol 3, Part A - 4.1 COMMAND REJECT 366 ''' 367 368 COMMAND_NOT_UNDERSTOOD = 0x0000 369 SIGNALING_MTU_EXCEEDED = 0x0001 370 INVALID_CID_IN_REQUEST = 0x0002 371 372 REASON_NAMES = { 373 COMMAND_NOT_UNDERSTOOD: 'COMMAND_NOT_UNDERSTOOD', 374 SIGNALING_MTU_EXCEEDED: 'SIGNALING_MTU_EXCEEDED', 375 INVALID_CID_IN_REQUEST: 'INVALID_CID_IN_REQUEST', 376 } 377 378 @staticmethod 379 def reason_name(reason: int) -> str: 380 return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason) 381 382 383# ----------------------------------------------------------------------------- 384@L2CAP_Control_Frame.subclass( 385 # pylint: disable=unnecessary-lambda 386 [ 387 ( 388 'psm', 389 { 390 'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm( 391 data, offset 392 ), 393 'serializer': lambda value: L2CAP_Connection_Request.serialize_psm( 394 value 395 ), 396 }, 397 ), 398 ('source_cid', 2), 399 ] 400) 401class L2CAP_Connection_Request(L2CAP_Control_Frame): 402 ''' 403 See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST 404 ''' 405 406 psm: int 407 source_cid: int 408 409 @staticmethod 410 def parse_psm(data: bytes, offset: int = 0) -> Tuple[int, int]: 411 psm_length = 2 412 psm = data[offset] | data[offset + 1] << 8 413 414 # The PSM field extends until the first even octet (inclusive) 415 while data[offset + psm_length - 1] % 2 == 1: 416 psm |= data[offset + psm_length] << (8 * psm_length) 417 psm_length += 1 418 419 return offset + psm_length, psm 420 421 @staticmethod 422 def serialize_psm(psm: int) -> bytes: 423 serialized = struct.pack('<H', psm & 0xFFFF) 424 psm >>= 16 425 while psm: 426 serialized += bytes([psm & 0xFF]) 427 psm >>= 8 428 429 return serialized 430 431 432# ----------------------------------------------------------------------------- 433@L2CAP_Control_Frame.subclass( 434 # pylint: disable=unnecessary-lambda 435 [ 436 ('destination_cid', 2), 437 ('source_cid', 2), 438 ( 439 'result', 440 {'size': 2, 'mapper': lambda x: L2CAP_Connection_Response.result_name(x)}, 441 ), 442 ('status', 2), 443 ] 444) 445class L2CAP_Connection_Response(L2CAP_Control_Frame): 446 ''' 447 See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE 448 ''' 449 450 source_cid: int 451 destination_cid: int 452 status: int 453 result: int 454 455 CONNECTION_SUCCESSFUL = 0x0000 456 CONNECTION_PENDING = 0x0001 457 CONNECTION_REFUSED_PSM_NOT_SUPPORTED = 0x0002 458 CONNECTION_REFUSED_SECURITY_BLOCK = 0x0003 459 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 460 CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0006 461 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007 462 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B 463 464 # pylint: disable=line-too-long 465 RESULT_NAMES = { 466 CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', 467 CONNECTION_PENDING: 'CONNECTION_PENDING', 468 CONNECTION_REFUSED_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED', 469 CONNECTION_REFUSED_SECURITY_BLOCK: 'CONNECTION_REFUSED_SECURITY_BLOCK', 470 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', 471 CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', 472 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', 473 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', 474 } 475 476 @staticmethod 477 def result_name(result: int) -> str: 478 return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result) 479 480 481# ----------------------------------------------------------------------------- 482@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('flags', 2), ('options', '*')]) 483class L2CAP_Configure_Request(L2CAP_Control_Frame): 484 ''' 485 See Bluetooth spec @ Vol 3, Part A - 4.4 CONFIGURATION REQUEST 486 ''' 487 488 489# ----------------------------------------------------------------------------- 490@L2CAP_Control_Frame.subclass( 491 # pylint: disable=unnecessary-lambda 492 [ 493 ('source_cid', 2), 494 ('flags', 2), 495 ( 496 'result', 497 {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)}, 498 ), 499 ('options', '*'), 500 ] 501) 502class L2CAP_Configure_Response(L2CAP_Control_Frame): 503 ''' 504 See Bluetooth spec @ Vol 3, Part A - 4.5 CONFIGURATION RESPONSE 505 ''' 506 507 SUCCESS = 0x0000 508 FAILURE_UNACCEPTABLE_PARAMETERS = 0x0001 509 FAILURE_REJECTED = 0x0002 510 FAILURE_UNKNOWN_OPTIONS = 0x0003 511 PENDING = 0x0004 512 FAILURE_FLOW_SPEC_REJECTED = 0x0005 513 514 RESULT_NAMES = { 515 SUCCESS: 'SUCCESS', 516 FAILURE_UNACCEPTABLE_PARAMETERS: 'FAILURE_UNACCEPTABLE_PARAMETERS', 517 FAILURE_REJECTED: 'FAILURE_REJECTED', 518 FAILURE_UNKNOWN_OPTIONS: 'FAILURE_UNKNOWN_OPTIONS', 519 PENDING: 'PENDING', 520 FAILURE_FLOW_SPEC_REJECTED: 'FAILURE_FLOW_SPEC_REJECTED', 521 } 522 523 @staticmethod 524 def result_name(result: int) -> str: 525 return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result) 526 527 528# ----------------------------------------------------------------------------- 529@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) 530class L2CAP_Disconnection_Request(L2CAP_Control_Frame): 531 ''' 532 See Bluetooth spec @ Vol 3, Part A - 4.6 DISCONNECTION REQUEST 533 ''' 534 535 536# ----------------------------------------------------------------------------- 537@L2CAP_Control_Frame.subclass([('destination_cid', 2), ('source_cid', 2)]) 538class L2CAP_Disconnection_Response(L2CAP_Control_Frame): 539 ''' 540 See Bluetooth spec @ Vol 3, Part A - 4.7 DISCONNECTION RESPONSE 541 ''' 542 543 544# ----------------------------------------------------------------------------- 545@L2CAP_Control_Frame.subclass([('data', '*')]) 546class L2CAP_Echo_Request(L2CAP_Control_Frame): 547 ''' 548 See Bluetooth spec @ Vol 3, Part A - 4.8 ECHO REQUEST 549 ''' 550 551 552# ----------------------------------------------------------------------------- 553@L2CAP_Control_Frame.subclass([('data', '*')]) 554class L2CAP_Echo_Response(L2CAP_Control_Frame): 555 ''' 556 See Bluetooth spec @ Vol 3, Part A - 4.9 ECHO RESPONSE 557 ''' 558 559 560# ----------------------------------------------------------------------------- 561@L2CAP_Control_Frame.subclass( 562 [ 563 ( 564 'info_type', 565 { 566 'size': 2, 567 # pylint: disable-next=unnecessary-lambda 568 'mapper': lambda x: L2CAP_Information_Request.info_type_name(x), 569 }, 570 ) 571 ] 572) 573class L2CAP_Information_Request(L2CAP_Control_Frame): 574 ''' 575 See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST 576 ''' 577 578 CONNECTIONLESS_MTU = 0x0001 579 EXTENDED_FEATURES_SUPPORTED = 0x0002 580 FIXED_CHANNELS_SUPPORTED = 0x0003 581 582 EXTENDED_FEATURE_FLOW_MODE_CONTROL = 0x0001 583 EXTENDED_FEATURE_RETRANSMISSION_MODE = 0x0002 584 EXTENDED_FEATURE_BIDIRECTIONAL_QOS = 0x0004 585 EXTENDED_FEATURE_ENHANCED_RETRANSMISSION_MODE = 0x0008 586 EXTENDED_FEATURE_STREAMING_MODE = 0x0010 587 EXTENDED_FEATURE_FCS_OPTION = 0x0020 588 EXTENDED_FEATURE_EXTENDED_FLOW_SPEC = 0x0040 589 EXTENDED_FEATURE_FIXED_CHANNELS = 0x0080 590 EXTENDED_FEATURE_EXTENDED_WINDOW_SIZE = 0x0100 591 EXTENDED_FEATURE_UNICAST_CONNECTIONLESS_DATA = 0x0200 592 EXTENDED_FEATURE_ENHANCED_CREDIT_BASE_FLOW_CONTROL = 0x0400 593 594 INFO_TYPE_NAMES = { 595 CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU', 596 EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED', 597 FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED', 598 } 599 600 @staticmethod 601 def info_type_name(info_type: int) -> str: 602 return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type) 603 604 605# ----------------------------------------------------------------------------- 606@L2CAP_Control_Frame.subclass( 607 [ 608 ('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}), 609 ( 610 'result', 611 # pylint: disable-next=unnecessary-lambda 612 {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)}, 613 ), 614 ('data', '*'), 615 ] 616) 617class L2CAP_Information_Response(L2CAP_Control_Frame): 618 ''' 619 See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE 620 ''' 621 622 SUCCESS = 0x00 623 NOT_SUPPORTED = 0x01 624 625 RESULT_NAMES = {SUCCESS: 'SUCCESS', NOT_SUPPORTED: 'NOT_SUPPORTED'} 626 627 @staticmethod 628 def result_name(result: int) -> str: 629 return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result) 630 631 632# ----------------------------------------------------------------------------- 633@L2CAP_Control_Frame.subclass( 634 [('interval_min', 2), ('interval_max', 2), ('latency', 2), ('timeout', 2)] 635) 636class L2CAP_Connection_Parameter_Update_Request(L2CAP_Control_Frame): 637 ''' 638 See Bluetooth spec @ Vol 3, Part A - 4.20 CONNECTION PARAMETER UPDATE REQUEST 639 ''' 640 641 642# ----------------------------------------------------------------------------- 643@L2CAP_Control_Frame.subclass([('result', 2)]) 644class L2CAP_Connection_Parameter_Update_Response(L2CAP_Control_Frame): 645 ''' 646 See Bluetooth spec @ Vol 3, Part A - 4.21 CONNECTION PARAMETER UPDATE RESPONSE 647 ''' 648 649 650# ----------------------------------------------------------------------------- 651@L2CAP_Control_Frame.subclass( 652 [('le_psm', 2), ('source_cid', 2), ('mtu', 2), ('mps', 2), ('initial_credits', 2)] 653) 654class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame): 655 ''' 656 See Bluetooth spec @ Vol 3, Part A - 4.22 LE CREDIT BASED CONNECTION REQUEST 657 (CODE 0x14) 658 ''' 659 660 source_cid: int 661 662 663# ----------------------------------------------------------------------------- 664@L2CAP_Control_Frame.subclass( 665 # pylint: disable=unnecessary-lambda,line-too-long 666 [ 667 ('destination_cid', 2), 668 ('mtu', 2), 669 ('mps', 2), 670 ('initial_credits', 2), 671 ( 672 'result', 673 { 674 'size': 2, 675 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name( 676 x 677 ), 678 }, 679 ), 680 ] 681) 682class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame): 683 ''' 684 See Bluetooth spec @ Vol 3, Part A - 4.23 LE CREDIT BASED CONNECTION RESPONSE 685 (CODE 0x15) 686 ''' 687 688 CONNECTION_SUCCESSFUL = 0x0000 689 CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED = 0x0002 690 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE = 0x0004 691 CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION = 0x0005 692 CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION = 0x0006 693 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE = 0x0007 694 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION = 0x0008 695 CONNECTION_REFUSED_INVALID_SOURCE_CID = 0x0009 696 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x000A 697 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS = 0x000B 698 699 # pylint: disable=line-too-long 700 RESULT_NAMES = { 701 CONNECTION_SUCCESSFUL: 'CONNECTION_SUCCESSFUL', 702 CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED: 'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED', 703 CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE: 'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE', 704 CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHENTICATION', 705 CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION: 'CONNECTION_REFUSED_INSUFFICIENT_AUTHORIZATION', 706 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION_KEY_SIZE', 707 CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION: 'CONNECTION_REFUSED_INSUFFICIENT_ENCRYPTION', 708 CONNECTION_REFUSED_INVALID_SOURCE_CID: 'CONNECTION_REFUSED_INVALID_SOURCE_CID', 709 CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED: 'CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED', 710 CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS: 'CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS', 711 } 712 713 @staticmethod 714 def result_name(result: int) -> str: 715 return name_or_number( 716 L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result 717 ) 718 719 720# ----------------------------------------------------------------------------- 721@L2CAP_Control_Frame.subclass([('cid', 2), ('credits', 2)]) 722class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame): 723 ''' 724 See Bluetooth spec @ Vol 3, Part A - 4.24 LE FLOW CONTROL CREDIT (CODE 0x16) 725 ''' 726 727 728# ----------------------------------------------------------------------------- 729class ClassicChannel(EventEmitter): 730 class State(enum.IntEnum): 731 # States 732 CLOSED = 0x00 733 WAIT_CONNECT = 0x01 734 WAIT_CONNECT_RSP = 0x02 735 OPEN = 0x03 736 WAIT_DISCONNECT = 0x04 737 WAIT_CREATE = 0x05 738 WAIT_CREATE_RSP = 0x06 739 WAIT_MOVE = 0x07 740 WAIT_MOVE_RSP = 0x08 741 WAIT_MOVE_CONFIRM = 0x09 742 WAIT_CONFIRM_RSP = 0x0A 743 744 # CONFIG substates 745 WAIT_CONFIG = 0x10 746 WAIT_SEND_CONFIG = 0x11 747 WAIT_CONFIG_REQ_RSP = 0x12 748 WAIT_CONFIG_RSP = 0x13 749 WAIT_CONFIG_REQ = 0x14 750 WAIT_IND_FINAL_RSP = 0x15 751 WAIT_FINAL_RSP = 0x16 752 WAIT_CONTROL_IND = 0x17 753 754 connection_result: Optional[asyncio.Future[None]] 755 disconnection_result: Optional[asyncio.Future[None]] 756 response: Optional[asyncio.Future[bytes]] 757 sink: Optional[Callable[[bytes], Any]] 758 state: State 759 connection: Connection 760 mtu: int 761 peer_mtu: int 762 763 def __init__( 764 self, 765 manager: ChannelManager, 766 connection: Connection, 767 signaling_cid: int, 768 psm: int, 769 source_cid: int, 770 mtu: int, 771 ) -> None: 772 super().__init__() 773 self.manager = manager 774 self.connection = connection 775 self.signaling_cid = signaling_cid 776 self.state = self.State.CLOSED 777 self.mtu = mtu 778 self.peer_mtu = L2CAP_MIN_BR_EDR_MTU 779 self.psm = psm 780 self.source_cid = source_cid 781 self.destination_cid = 0 782 self.response = None 783 self.connection_result = None 784 self.disconnection_result = None 785 self.sink = None 786 787 def _change_state(self, new_state: State) -> None: 788 logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') 789 self.state = new_state 790 791 def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: 792 self.manager.send_pdu(self.connection, self.destination_cid, pdu) 793 794 def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: 795 self.manager.send_control_frame(self.connection, self.signaling_cid, frame) 796 797 async def send_request(self, request: SupportsBytes) -> bytes: 798 # Check that there isn't already a request pending 799 if self.response: 800 raise InvalidStateError('request already pending') 801 if self.state != self.State.OPEN: 802 raise InvalidStateError('channel not open') 803 804 self.response = asyncio.get_running_loop().create_future() 805 self.send_pdu(request) 806 return await self.response 807 808 def on_pdu(self, pdu: bytes) -> None: 809 if self.response: 810 self.response.set_result(pdu) 811 self.response = None 812 elif self.sink: 813 # pylint: disable=not-callable 814 self.sink(pdu) 815 else: 816 logger.warning( 817 color('received pdu without a pending request or sink', 'red') 818 ) 819 820 async def connect(self) -> None: 821 if self.state != self.State.CLOSED: 822 raise InvalidStateError('invalid state') 823 824 # Check that we can start a new connection 825 if self.connection_result: 826 raise InvalidStateError('connection already pending') 827 828 self._change_state(self.State.WAIT_CONNECT_RSP) 829 self.send_control_frame( 830 L2CAP_Connection_Request( 831 identifier=self.manager.next_identifier(self.connection), 832 psm=self.psm, 833 source_cid=self.source_cid, 834 ) 835 ) 836 837 # Create a future to wait for the state machine to get to a success or error 838 # state 839 self.connection_result = asyncio.get_running_loop().create_future() 840 841 # Wait for the connection to succeed or fail 842 try: 843 return await self.connection.abort_on( 844 'disconnection', self.connection_result 845 ) 846 finally: 847 self.connection_result = None 848 849 async def disconnect(self) -> None: 850 if self.state != self.State.OPEN: 851 raise InvalidStateError('invalid state') 852 853 self._change_state(self.State.WAIT_DISCONNECT) 854 self.send_control_frame( 855 L2CAP_Disconnection_Request( 856 identifier=self.manager.next_identifier(self.connection), 857 destination_cid=self.destination_cid, 858 source_cid=self.source_cid, 859 ) 860 ) 861 862 # Create a future to wait for the state machine to get to a success or error 863 # state 864 self.disconnection_result = asyncio.get_running_loop().create_future() 865 return await self.disconnection_result 866 867 def abort(self) -> None: 868 if self.state == self.State.OPEN: 869 self._change_state(self.State.CLOSED) 870 self.emit('close') 871 872 def send_configure_request(self) -> None: 873 options = L2CAP_Control_Frame.encode_configuration_options( 874 [ 875 ( 876 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE, 877 struct.pack('<H', self.mtu), 878 ) 879 ] 880 ) 881 self.send_control_frame( 882 L2CAP_Configure_Request( 883 identifier=self.manager.next_identifier(self.connection), 884 destination_cid=self.destination_cid, 885 flags=0x0000, 886 options=options, 887 ) 888 ) 889 890 def on_connection_request(self, request) -> None: 891 self.destination_cid = request.source_cid 892 self._change_state(self.State.WAIT_CONNECT) 893 self.send_control_frame( 894 L2CAP_Connection_Response( 895 identifier=request.identifier, 896 destination_cid=self.source_cid, 897 source_cid=self.destination_cid, 898 result=L2CAP_Connection_Response.CONNECTION_SUCCESSFUL, 899 status=0x0000, 900 ) 901 ) 902 self._change_state(self.State.WAIT_CONFIG) 903 self.send_configure_request() 904 self._change_state(self.State.WAIT_CONFIG_REQ_RSP) 905 906 def on_connection_response(self, response): 907 if self.state != self.State.WAIT_CONNECT_RSP: 908 logger.warning(color('invalid state', 'red')) 909 return 910 911 if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL: 912 self.destination_cid = response.destination_cid 913 self._change_state(self.State.WAIT_CONFIG) 914 self.send_configure_request() 915 self._change_state(self.State.WAIT_CONFIG_REQ_RSP) 916 elif response.result == L2CAP_Connection_Response.CONNECTION_PENDING: 917 pass 918 else: 919 self._change_state(self.State.CLOSED) 920 self.connection_result.set_exception( 921 ProtocolError( 922 response.result, 923 'l2cap', 924 L2CAP_Connection_Response.result_name(response.result), 925 ) 926 ) 927 self.connection_result = None 928 929 def on_configure_request(self, request) -> None: 930 if self.state not in ( 931 self.State.WAIT_CONFIG, 932 self.State.WAIT_CONFIG_REQ, 933 self.State.WAIT_CONFIG_REQ_RSP, 934 ): 935 logger.warning(color('invalid state', 'red')) 936 return 937 938 # Decode the options 939 options = L2CAP_Control_Frame.decode_configuration_options(request.options) 940 for option in options: 941 if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE: 942 self.peer_mtu = struct.unpack('<H', option[1])[0] 943 logger.debug(f'peer MTU = {self.peer_mtu}') 944 945 self.send_control_frame( 946 L2CAP_Configure_Response( 947 identifier=request.identifier, 948 source_cid=self.destination_cid, 949 flags=0x0000, 950 result=L2CAP_Configure_Response.SUCCESS, 951 options=request.options, # TODO: don't accept everything blindly 952 ) 953 ) 954 if self.state == self.State.WAIT_CONFIG: 955 self._change_state(self.State.WAIT_SEND_CONFIG) 956 self.send_configure_request() 957 self._change_state(self.State.WAIT_CONFIG_RSP) 958 elif self.state == self.State.WAIT_CONFIG_REQ: 959 self._change_state(self.State.OPEN) 960 if self.connection_result: 961 self.connection_result.set_result(None) 962 self.connection_result = None 963 self.emit('open') 964 elif self.state == self.State.WAIT_CONFIG_REQ_RSP: 965 self._change_state(self.State.WAIT_CONFIG_RSP) 966 967 def on_configure_response(self, response) -> None: 968 if response.result == L2CAP_Configure_Response.SUCCESS: 969 if self.state == self.State.WAIT_CONFIG_REQ_RSP: 970 self._change_state(self.State.WAIT_CONFIG_REQ) 971 elif self.state in ( 972 self.State.WAIT_CONFIG_RSP, 973 self.State.WAIT_CONTROL_IND, 974 ): 975 self._change_state(self.State.OPEN) 976 if self.connection_result: 977 self.connection_result.set_result(None) 978 self.connection_result = None 979 self.emit('open') 980 else: 981 logger.warning(color('invalid state', 'red')) 982 elif ( 983 response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS 984 ): 985 # Re-configure with what's suggested in the response 986 self.send_control_frame( 987 L2CAP_Configure_Request( 988 identifier=self.manager.next_identifier(self.connection), 989 destination_cid=self.destination_cid, 990 flags=0x0000, 991 options=response.options, 992 ) 993 ) 994 else: 995 logger.warning( 996 color( 997 '!!! configuration rejected: ' 998 f'{L2CAP_Configure_Response.result_name(response.result)}', 999 'red', 1000 ) 1001 ) 1002 # TODO: decide how to fail gracefully 1003 1004 def on_disconnection_request(self, request) -> None: 1005 if self.state in (self.State.OPEN, self.State.WAIT_DISCONNECT): 1006 self.send_control_frame( 1007 L2CAP_Disconnection_Response( 1008 identifier=request.identifier, 1009 destination_cid=request.destination_cid, 1010 source_cid=request.source_cid, 1011 ) 1012 ) 1013 self._change_state(self.State.CLOSED) 1014 self.emit('close') 1015 self.manager.on_channel_closed(self) 1016 else: 1017 logger.warning(color('invalid state', 'red')) 1018 1019 def on_disconnection_response(self, response) -> None: 1020 if self.state != self.State.WAIT_DISCONNECT: 1021 logger.warning(color('invalid state', 'red')) 1022 return 1023 1024 if ( 1025 response.destination_cid != self.destination_cid 1026 or response.source_cid != self.source_cid 1027 ): 1028 logger.warning('unexpected source or destination CID') 1029 return 1030 1031 self._change_state(self.State.CLOSED) 1032 if self.disconnection_result: 1033 self.disconnection_result.set_result(None) 1034 self.disconnection_result = None 1035 self.emit('close') 1036 self.manager.on_channel_closed(self) 1037 1038 def __str__(self) -> str: 1039 return ( 1040 f'Channel({self.source_cid}->{self.destination_cid}, ' 1041 f'PSM={self.psm}, ' 1042 f'MTU={self.mtu}/{self.peer_mtu}, ' 1043 f'state={self.state.name})' 1044 ) 1045 1046 1047# ----------------------------------------------------------------------------- 1048class LeCreditBasedChannel(EventEmitter): 1049 """ 1050 LE Credit-based Connection Oriented Channel 1051 """ 1052 1053 class State(enum.IntEnum): 1054 INIT = 0 1055 CONNECTED = 1 1056 CONNECTING = 2 1057 DISCONNECTING = 3 1058 DISCONNECTED = 4 1059 CONNECTION_ERROR = 5 1060 1061 out_queue: Deque[bytes] 1062 connection_result: Optional[asyncio.Future[LeCreditBasedChannel]] 1063 disconnection_result: Optional[asyncio.Future[None]] 1064 in_sdu: Optional[bytes] 1065 out_sdu: Optional[bytes] 1066 state: State 1067 connection: Connection 1068 sink: Optional[Callable[[bytes], Any]] 1069 1070 def __init__( 1071 self, 1072 manager: ChannelManager, 1073 connection: Connection, 1074 le_psm: int, 1075 source_cid: int, 1076 destination_cid: int, 1077 mtu: int, 1078 mps: int, 1079 credits: int, # pylint: disable=redefined-builtin 1080 peer_mtu: int, 1081 peer_mps: int, 1082 peer_credits: int, 1083 connected: bool, 1084 ) -> None: 1085 super().__init__() 1086 self.manager = manager 1087 self.connection = connection 1088 self.le_psm = le_psm 1089 self.source_cid = source_cid 1090 self.destination_cid = destination_cid 1091 self.mtu = mtu 1092 self.mps = mps 1093 self.credits = credits 1094 self.peer_mtu = peer_mtu 1095 self.peer_mps = peer_mps 1096 self.peer_credits = peer_credits 1097 self.peer_max_credits = self.peer_credits 1098 self.peer_credits_threshold = self.peer_max_credits // 2 1099 self.in_sdu = None 1100 self.in_sdu_length = 0 1101 self.out_queue = deque() 1102 self.out_sdu = None 1103 self.sink = None 1104 self.connected = False 1105 self.connection_result = None 1106 self.disconnection_result = None 1107 self.drained = asyncio.Event() 1108 1109 self.drained.set() 1110 1111 if connected: 1112 self.state = self.State.CONNECTED 1113 else: 1114 self.state = self.State.INIT 1115 1116 def _change_state(self, new_state: State) -> None: 1117 logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') 1118 self.state = new_state 1119 1120 if new_state == self.State.CONNECTED: 1121 self.emit('open') 1122 elif new_state == self.State.DISCONNECTED: 1123 self.emit('close') 1124 1125 def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None: 1126 self.manager.send_pdu(self.connection, self.destination_cid, pdu) 1127 1128 def send_control_frame(self, frame: L2CAP_Control_Frame) -> None: 1129 self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame) 1130 1131 async def connect(self) -> LeCreditBasedChannel: 1132 # Check that we're in the right state 1133 if self.state != self.State.INIT: 1134 raise InvalidStateError('not in a connectable state') 1135 1136 # Check that we can start a new connection 1137 identifier = self.manager.next_identifier(self.connection) 1138 if identifier in self.manager.le_coc_requests: 1139 raise InvalidStateError('too many concurrent connection requests') 1140 1141 self._change_state(self.State.CONNECTING) 1142 request = L2CAP_LE_Credit_Based_Connection_Request( 1143 identifier=identifier, 1144 le_psm=self.le_psm, 1145 source_cid=self.source_cid, 1146 mtu=self.mtu, 1147 mps=self.mps, 1148 initial_credits=self.peer_credits, 1149 ) 1150 self.manager.le_coc_requests[identifier] = request 1151 self.send_control_frame(request) 1152 1153 # Create a future to wait for the response 1154 self.connection_result = asyncio.get_running_loop().create_future() 1155 1156 # Wait for the connection to succeed or fail 1157 return await self.connection_result 1158 1159 async def disconnect(self) -> None: 1160 # Check that we're connected 1161 if self.state != self.State.CONNECTED: 1162 raise InvalidStateError('not connected') 1163 1164 self._change_state(self.State.DISCONNECTING) 1165 self.flush_output() 1166 self.send_control_frame( 1167 L2CAP_Disconnection_Request( 1168 identifier=self.manager.next_identifier(self.connection), 1169 destination_cid=self.destination_cid, 1170 source_cid=self.source_cid, 1171 ) 1172 ) 1173 1174 # Create a future to wait for the state machine to get to a success or error 1175 # state 1176 self.disconnection_result = asyncio.get_running_loop().create_future() 1177 return await self.disconnection_result 1178 1179 def abort(self) -> None: 1180 if self.state == self.State.CONNECTED: 1181 self._change_state(self.State.DISCONNECTED) 1182 1183 def on_pdu(self, pdu: bytes) -> None: 1184 if self.sink is None: 1185 logger.warning('received pdu without a sink') 1186 return 1187 1188 if self.state != self.State.CONNECTED: 1189 logger.warning('received PDU while not connected, dropping') 1190 1191 # Manage the peer credits 1192 if self.peer_credits == 0: 1193 logger.warning('received LE frame when peer out of credits') 1194 else: 1195 self.peer_credits -= 1 1196 if self.peer_credits <= self.peer_credits_threshold: 1197 # The credits fell below the threshold, replenish them to the max 1198 self.send_control_frame( 1199 L2CAP_LE_Flow_Control_Credit( 1200 identifier=self.manager.next_identifier(self.connection), 1201 cid=self.source_cid, 1202 credits=self.peer_max_credits - self.peer_credits, 1203 ) 1204 ) 1205 self.peer_credits = self.peer_max_credits 1206 1207 # Check if this starts a new SDU 1208 if self.in_sdu is None: 1209 # Start a new SDU 1210 self.in_sdu = pdu 1211 else: 1212 # Continue an SDU 1213 self.in_sdu += pdu 1214 1215 # Check if the SDU is complete 1216 if self.in_sdu_length == 0: 1217 # We don't know the size yet, check if we have received the header to 1218 # compute it 1219 if len(self.in_sdu) >= 2: 1220 self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0] 1221 if self.in_sdu_length == 0: 1222 # We'll compute it later 1223 return 1224 if len(self.in_sdu) < 2 + self.in_sdu_length: 1225 # Not complete yet 1226 logger.debug( 1227 f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received' 1228 ) 1229 return 1230 if len(self.in_sdu) != 2 + self.in_sdu_length: 1231 # Overflow 1232 logger.warning( 1233 f'SDU overflow: sdu_length={self.in_sdu_length}, ' 1234 f'received {len(self.in_sdu) - 2}' 1235 ) 1236 # TODO: we should disconnect 1237 self.in_sdu = None 1238 self.in_sdu_length = 0 1239 return 1240 1241 # Send the SDU to the sink 1242 logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes') 1243 self.sink(self.in_sdu[2:]) # pylint: disable=not-callable 1244 1245 # Prepare for a new SDU 1246 self.in_sdu = None 1247 self.in_sdu_length = 0 1248 1249 def on_connection_response(self, response) -> None: 1250 # Look for a matching pending response result 1251 if self.connection_result is None: 1252 logger.warning( 1253 f'received unexpected connection response (id={response.identifier})' 1254 ) 1255 return 1256 1257 if ( 1258 response.result 1259 == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL 1260 ): 1261 self.destination_cid = response.destination_cid 1262 self.peer_mtu = response.mtu 1263 self.peer_mps = response.mps 1264 self.credits = response.initial_credits 1265 self.connected = True 1266 self.connection_result.set_result(self) 1267 self._change_state(self.State.CONNECTED) 1268 else: 1269 self.connection_result.set_exception( 1270 ProtocolError( 1271 response.result, 1272 'l2cap', 1273 L2CAP_LE_Credit_Based_Connection_Response.result_name( 1274 response.result 1275 ), 1276 ) 1277 ) 1278 self._change_state(self.State.CONNECTION_ERROR) 1279 1280 # Cleanup 1281 self.connection_result = None 1282 1283 def on_credits(self, credits: int) -> None: # pylint: disable=redefined-builtin 1284 self.credits += credits 1285 logger.debug(f'received {credits} credits, total = {self.credits}') 1286 1287 # Try to send more data if we have any queued up 1288 self.process_output() 1289 1290 def on_disconnection_request(self, request) -> None: 1291 self.send_control_frame( 1292 L2CAP_Disconnection_Response( 1293 identifier=request.identifier, 1294 destination_cid=request.destination_cid, 1295 source_cid=request.source_cid, 1296 ) 1297 ) 1298 self._change_state(self.State.DISCONNECTED) 1299 self.flush_output() 1300 1301 def on_disconnection_response(self, response) -> None: 1302 if self.state != self.State.DISCONNECTING: 1303 logger.warning(color('invalid state', 'red')) 1304 return 1305 1306 if ( 1307 response.destination_cid != self.destination_cid 1308 or response.source_cid != self.source_cid 1309 ): 1310 logger.warning('unexpected source or destination CID') 1311 return 1312 1313 self._change_state(self.State.DISCONNECTED) 1314 if self.disconnection_result: 1315 self.disconnection_result.set_result(None) 1316 self.disconnection_result = None 1317 1318 def flush_output(self) -> None: 1319 self.out_queue.clear() 1320 self.out_sdu = None 1321 1322 def process_output(self) -> None: 1323 while self.credits > 0: 1324 if self.out_sdu is not None: 1325 # Finish the current SDU 1326 packet = self.out_sdu[: self.peer_mps] 1327 self.send_pdu(packet) 1328 self.credits -= 1 1329 logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left') 1330 if len(packet) == len(self.out_sdu): 1331 # We sent everything 1332 self.out_sdu = None 1333 else: 1334 # Keep what's still left to send 1335 self.out_sdu = self.out_sdu[len(packet) :] 1336 continue 1337 1338 if self.out_queue: 1339 # Create the next SDU (2 bytes header plus up to MTU bytes payload) 1340 logger.debug( 1341 f'assembling SDU from {len(self.out_queue)} packets in output queue' 1342 ) 1343 payload = b'' 1344 while self.out_queue and len(payload) < self.peer_mtu: 1345 # We can add more data to the payload 1346 chunk = self.out_queue[0][: self.peer_mtu - len(payload)] 1347 payload += chunk 1348 self.out_queue[0] = self.out_queue[0][len(chunk) :] 1349 if len(self.out_queue[0]) == 0: 1350 # We consumed the entire buffer, remove it 1351 self.out_queue.popleft() 1352 logger.debug( 1353 f'packet completed, {len(self.out_queue)} left in queue' 1354 ) 1355 1356 # Construct the SDU with its header 1357 assert len(payload) != 0 1358 logger.debug(f'SDU complete: {len(payload)} payload bytes') 1359 self.out_sdu = struct.pack('<H', len(payload)) + payload 1360 else: 1361 # Nothing left to send for now 1362 self.drained.set() 1363 return 1364 1365 def write(self, data: bytes) -> None: 1366 if self.state != self.State.CONNECTED: 1367 logger.warning('not connected, dropping data') 1368 return 1369 1370 # Queue the data 1371 self.out_queue.append(data) 1372 self.drained.clear() 1373 logger.debug( 1374 f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue' 1375 ) 1376 1377 # Send what we can 1378 self.process_output() 1379 1380 async def drain(self) -> None: 1381 await self.drained.wait() 1382 1383 def pause_reading(self) -> None: 1384 # TODO: not implemented yet 1385 pass 1386 1387 def resume_reading(self) -> None: 1388 # TODO: not implemented yet 1389 pass 1390 1391 def __str__(self) -> str: 1392 return ( 1393 f'CoC({self.source_cid}->{self.destination_cid}, ' 1394 f'State={self.state.name}, ' 1395 f'PSM={self.le_psm}, ' 1396 f'MTU={self.mtu}/{self.peer_mtu}, ' 1397 f'MPS={self.mps}/{self.peer_mps}, ' 1398 f'credits={self.credits}/{self.peer_credits})' 1399 ) 1400 1401 1402# ----------------------------------------------------------------------------- 1403class ClassicChannelServer(EventEmitter): 1404 def __init__( 1405 self, 1406 manager: ChannelManager, 1407 psm: int, 1408 handler: Optional[Callable[[ClassicChannel], Any]], 1409 mtu: int, 1410 ) -> None: 1411 super().__init__() 1412 self.manager = manager 1413 self.handler = handler 1414 self.psm = psm 1415 self.mtu = mtu 1416 1417 def on_connection(self, channel: ClassicChannel) -> None: 1418 self.emit('connection', channel) 1419 if self.handler: 1420 self.handler(channel) 1421 1422 def close(self) -> None: 1423 if self.psm in self.manager.servers: 1424 del self.manager.servers[self.psm] 1425 1426 1427# ----------------------------------------------------------------------------- 1428class LeCreditBasedChannelServer(EventEmitter): 1429 def __init__( 1430 self, 1431 manager: ChannelManager, 1432 psm: int, 1433 handler: Optional[Callable[[LeCreditBasedChannel], Any]], 1434 max_credits: int, 1435 mtu: int, 1436 mps: int, 1437 ) -> None: 1438 super().__init__() 1439 self.manager = manager 1440 self.handler = handler 1441 self.psm = psm 1442 self.max_credits = max_credits 1443 self.mtu = mtu 1444 self.mps = mps 1445 1446 def on_connection(self, channel: LeCreditBasedChannel) -> None: 1447 self.emit('connection', channel) 1448 if self.handler: 1449 self.handler(channel) 1450 1451 def close(self) -> None: 1452 if self.psm in self.manager.le_coc_servers: 1453 del self.manager.le_coc_servers[self.psm] 1454 1455 1456# ----------------------------------------------------------------------------- 1457class ChannelManager: 1458 identifiers: Dict[int, int] 1459 channels: Dict[int, Dict[int, Union[ClassicChannel, LeCreditBasedChannel]]] 1460 servers: Dict[int, ClassicChannelServer] 1461 le_coc_channels: Dict[int, Dict[int, LeCreditBasedChannel]] 1462 le_coc_servers: Dict[int, LeCreditBasedChannelServer] 1463 le_coc_requests: Dict[int, L2CAP_LE_Credit_Based_Connection_Request] 1464 fixed_channels: Dict[int, Optional[Callable[[int, bytes], Any]]] 1465 _host: Optional[Host] 1466 connection_parameters_update_response: Optional[asyncio.Future[int]] 1467 1468 def __init__( 1469 self, 1470 extended_features: Iterable[int] = (), 1471 connectionless_mtu: int = L2CAP_DEFAULT_CONNECTIONLESS_MTU, 1472 ) -> None: 1473 self._host = None 1474 self.identifiers = {} # Incrementing identifier values by connection 1475 self.channels = {} # All channels, mapped by connection and source cid 1476 self.fixed_channels = { # Fixed channel handlers, mapped by cid 1477 L2CAP_SIGNALING_CID: None, 1478 L2CAP_LE_SIGNALING_CID: None, 1479 } 1480 self.servers = {} # Servers accepting connections, by PSM 1481 self.le_coc_channels = ( 1482 {} 1483 ) # LE CoC channels, mapped by connection and destination cid 1484 self.le_coc_servers = {} # LE CoC - Servers accepting connections, by PSM 1485 self.le_coc_requests = {} # LE CoC connection requests, by identifier 1486 self.extended_features = extended_features 1487 self.connectionless_mtu = connectionless_mtu 1488 self.connection_parameters_update_response = None 1489 1490 @property 1491 def host(self) -> Host: 1492 assert self._host 1493 return self._host 1494 1495 @host.setter 1496 def host(self, host: Host) -> None: 1497 if self._host is not None: 1498 self._host.remove_listener('disconnection', self.on_disconnection) 1499 self._host = host 1500 if host is not None: 1501 host.on('disconnection', self.on_disconnection) 1502 1503 def find_channel(self, connection_handle: int, cid: int): 1504 if connection_channels := self.channels.get(connection_handle): 1505 return connection_channels.get(cid) 1506 1507 return None 1508 1509 def find_le_coc_channel(self, connection_handle: int, cid: int): 1510 if connection_channels := self.le_coc_channels.get(connection_handle): 1511 return connection_channels.get(cid) 1512 1513 return None 1514 1515 @staticmethod 1516 def find_free_br_edr_cid(channels: Iterable[int]) -> int: 1517 # Pick the smallest valid CID that's not already in the list 1518 # (not necessarily the most efficient algorithm, but the list of CID is 1519 # very small in practice) 1520 for cid in range( 1521 L2CAP_ACL_U_DYNAMIC_CID_RANGE_START, L2CAP_ACL_U_DYNAMIC_CID_RANGE_END + 1 1522 ): 1523 if cid not in channels: 1524 return cid 1525 1526 raise OutOfResourcesError('no free CID available') 1527 1528 @staticmethod 1529 def find_free_le_cid(channels: Iterable[int]) -> int: 1530 # Pick the smallest valid CID that's not already in the list 1531 # (not necessarily the most efficient algorithm, but the list of CID is 1532 # very small in practice) 1533 for cid in range( 1534 L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1 1535 ): 1536 if cid not in channels: 1537 return cid 1538 1539 raise OutOfResourcesError('no free CID') 1540 1541 def next_identifier(self, connection: Connection) -> int: 1542 identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256 1543 self.identifiers[connection.handle] = identifier 1544 return identifier 1545 1546 def register_fixed_channel( 1547 self, cid: int, handler: Callable[[int, bytes], Any] 1548 ) -> None: 1549 self.fixed_channels[cid] = handler 1550 1551 def deregister_fixed_channel(self, cid: int) -> None: 1552 if cid in self.fixed_channels: 1553 del self.fixed_channels[cid] 1554 1555 @deprecated("Please use create_classic_server") 1556 def register_server( 1557 self, 1558 psm: int, 1559 server: Callable[[ClassicChannel], Any], 1560 ) -> int: 1561 return self.create_classic_server( 1562 handler=server, spec=ClassicChannelSpec(psm=psm) 1563 ).psm 1564 1565 def create_classic_server( 1566 self, 1567 spec: ClassicChannelSpec, 1568 handler: Optional[Callable[[ClassicChannel], Any]] = None, 1569 ) -> ClassicChannelServer: 1570 if not spec.psm: 1571 # Find a free PSM 1572 for candidate in range( 1573 L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2 1574 ): 1575 if (candidate >> 8) % 2 == 1: 1576 continue 1577 if candidate in self.servers: 1578 continue 1579 spec.psm = candidate 1580 break 1581 else: 1582 raise InvalidStateError('no free PSM') 1583 else: 1584 # Check that the PSM isn't already in use 1585 if spec.psm in self.servers: 1586 raise InvalidArgumentError('PSM already in use') 1587 1588 # Check that the PSM is valid 1589 if spec.psm % 2 == 0: 1590 raise InvalidArgumentError('invalid PSM (not odd)') 1591 check = spec.psm >> 8 1592 while check: 1593 if check % 2 != 0: 1594 raise InvalidArgumentError('invalid PSM') 1595 check >>= 8 1596 1597 self.servers[spec.psm] = ClassicChannelServer(self, spec.psm, handler, spec.mtu) 1598 1599 return self.servers[spec.psm] 1600 1601 @deprecated("Please use create_le_credit_based_server()") 1602 def register_le_coc_server( 1603 self, 1604 psm: int, 1605 server: Callable[[LeCreditBasedChannel], Any], 1606 max_credits: int, 1607 mtu: int, 1608 mps: int, 1609 ) -> int: 1610 return self.create_le_credit_based_server( 1611 spec=LeCreditBasedChannelSpec( 1612 psm=None if psm == 0 else psm, mtu=mtu, mps=mps, max_credits=max_credits 1613 ), 1614 handler=server, 1615 ).psm 1616 1617 def create_le_credit_based_server( 1618 self, 1619 spec: LeCreditBasedChannelSpec, 1620 handler: Optional[Callable[[LeCreditBasedChannel], Any]] = None, 1621 ) -> LeCreditBasedChannelServer: 1622 if not spec.psm: 1623 # Find a free PSM 1624 for candidate in range( 1625 L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1 1626 ): 1627 if candidate in self.le_coc_servers: 1628 continue 1629 spec.psm = candidate 1630 break 1631 else: 1632 raise InvalidStateError('no free PSM') 1633 else: 1634 # Check that the PSM isn't already in use 1635 if spec.psm in self.le_coc_servers: 1636 raise InvalidArgumentError('PSM already in use') 1637 1638 self.le_coc_servers[spec.psm] = LeCreditBasedChannelServer( 1639 self, 1640 spec.psm, 1641 handler, 1642 max_credits=spec.max_credits, 1643 mtu=spec.mtu, 1644 mps=spec.mps, 1645 ) 1646 1647 return self.le_coc_servers[spec.psm] 1648 1649 def on_disconnection(self, connection_handle: int, _reason: int) -> None: 1650 logger.debug(f'disconnection from {connection_handle}, cleaning up channels') 1651 if connection_handle in self.channels: 1652 for _, channel in self.channels[connection_handle].items(): 1653 channel.abort() 1654 del self.channels[connection_handle] 1655 if connection_handle in self.le_coc_channels: 1656 for _, channel in self.le_coc_channels[connection_handle].items(): 1657 channel.abort() 1658 del self.le_coc_channels[connection_handle] 1659 if connection_handle in self.identifiers: 1660 del self.identifiers[connection_handle] 1661 1662 def send_pdu(self, connection, cid: int, pdu: Union[SupportsBytes, bytes]) -> None: 1663 pdu_str = pdu.hex() if isinstance(pdu, bytes) else str(pdu) 1664 pdu_bytes = bytes(pdu) 1665 logger.debug( 1666 f'{color(">>> Sending L2CAP PDU", "blue")} ' 1667 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1668 f'{connection.peer_address}: {len(pdu_bytes)} bytes, {pdu_str}' 1669 ) 1670 self.host.send_l2cap_pdu(connection.handle, cid, pdu_bytes) 1671 1672 def on_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: 1673 if cid in (L2CAP_SIGNALING_CID, L2CAP_LE_SIGNALING_CID): 1674 # Parse the L2CAP payload into a Control Frame object 1675 control_frame = L2CAP_Control_Frame.from_bytes(pdu) 1676 1677 self.on_control_frame(connection, cid, control_frame) 1678 elif cid in self.fixed_channels: 1679 handler = self.fixed_channels[cid] 1680 assert handler is not None 1681 handler(connection.handle, pdu) 1682 else: 1683 if (channel := self.find_channel(connection.handle, cid)) is None: 1684 logger.warning( 1685 color( 1686 f'channel not found for 0x{connection.handle:04X}:{cid}', 'red' 1687 ) 1688 ) 1689 return 1690 1691 channel.on_pdu(pdu) 1692 1693 def send_control_frame( 1694 self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame 1695 ) -> None: 1696 logger.debug( 1697 f'{color(">>> Sending L2CAP Signaling Control Frame", "blue")} ' 1698 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1699 f'{connection.peer_address}:\n{control_frame}' 1700 ) 1701 self.host.send_l2cap_pdu(connection.handle, cid, bytes(control_frame)) 1702 1703 def on_control_frame( 1704 self, connection: Connection, cid: int, control_frame: L2CAP_Control_Frame 1705 ) -> None: 1706 logger.debug( 1707 f'{color("<<< Received L2CAP Signaling Control Frame", "green")} ' 1708 f'on connection [0x{connection.handle:04X}] (CID={cid}) ' 1709 f'{connection.peer_address}:\n{control_frame}' 1710 ) 1711 1712 # Find the handler method 1713 handler_name = f'on_{control_frame.name.lower()}' 1714 handler = getattr(self, handler_name, None) 1715 if handler: 1716 try: 1717 handler(connection, cid, control_frame) 1718 except Exception as error: 1719 logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') 1720 self.send_control_frame( 1721 connection, 1722 cid, 1723 L2CAP_Command_Reject( 1724 identifier=control_frame.identifier, 1725 reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON, 1726 data=b'', 1727 ), 1728 ) 1729 raise error 1730 else: 1731 logger.error(color('Channel Manager command not handled???', 'red')) 1732 self.send_control_frame( 1733 connection, 1734 cid, 1735 L2CAP_Command_Reject( 1736 identifier=control_frame.identifier, 1737 reason=L2CAP_COMMAND_NOT_UNDERSTOOD_REASON, 1738 data=b'', 1739 ), 1740 ) 1741 1742 def on_l2cap_command_reject( 1743 self, _connection: Connection, _cid: int, packet 1744 ) -> None: 1745 logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}') 1746 1747 def on_l2cap_connection_request( 1748 self, connection: Connection, cid: int, request 1749 ) -> None: 1750 # Check if there's a server for this PSM 1751 server = self.servers.get(request.psm) 1752 if server: 1753 # Find a free CID for this new channel 1754 connection_channels = self.channels.setdefault(connection.handle, {}) 1755 source_cid = self.find_free_br_edr_cid(connection_channels) 1756 if source_cid is None: # Should never happen! 1757 self.send_control_frame( 1758 connection, 1759 cid, 1760 L2CAP_Connection_Response( 1761 identifier=request.identifier, 1762 destination_cid=request.source_cid, 1763 source_cid=0, 1764 # pylint: disable=line-too-long 1765 result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, 1766 status=0x0000, 1767 ), 1768 ) 1769 return 1770 1771 # Create a new channel 1772 logger.debug( 1773 f'creating server channel with cid={source_cid} for psm {request.psm}' 1774 ) 1775 channel = ClassicChannel( 1776 self, connection, cid, request.psm, source_cid, server.mtu 1777 ) 1778 connection_channels[source_cid] = channel 1779 1780 # Notify 1781 server.on_connection(channel) 1782 channel.on_connection_request(request) 1783 else: 1784 logger.warning( 1785 f'No server for connection 0x{connection.handle:04X} ' 1786 f'on PSM {request.psm}' 1787 ) 1788 self.send_control_frame( 1789 connection, 1790 cid, 1791 L2CAP_Connection_Response( 1792 identifier=request.identifier, 1793 destination_cid=request.source_cid, 1794 source_cid=0, 1795 # pylint: disable=line-too-long 1796 result=L2CAP_Connection_Response.CONNECTION_REFUSED_PSM_NOT_SUPPORTED, 1797 status=0x0000, 1798 ), 1799 ) 1800 1801 def on_l2cap_connection_response( 1802 self, connection: Connection, cid: int, response 1803 ) -> None: 1804 if ( 1805 channel := self.find_channel(connection.handle, response.source_cid) 1806 ) is None: 1807 logger.warning( 1808 color( 1809 f'channel {response.source_cid} not found for ' 1810 f'0x{connection.handle:04X}:{cid}', 1811 'red', 1812 ) 1813 ) 1814 return 1815 1816 channel.on_connection_response(response) 1817 1818 def on_l2cap_configure_request( 1819 self, connection: Connection, cid: int, request 1820 ) -> None: 1821 if ( 1822 channel := self.find_channel(connection.handle, request.destination_cid) 1823 ) is None: 1824 logger.warning( 1825 color( 1826 f'channel {request.destination_cid} not found for ' 1827 f'0x{connection.handle:04X}:{cid}', 1828 'red', 1829 ) 1830 ) 1831 return 1832 1833 channel.on_configure_request(request) 1834 1835 def on_l2cap_configure_response( 1836 self, connection: Connection, cid: int, response 1837 ) -> None: 1838 if ( 1839 channel := self.find_channel(connection.handle, response.source_cid) 1840 ) is None: 1841 logger.warning( 1842 color( 1843 f'channel {response.source_cid} not found for ' 1844 f'0x{connection.handle:04X}:{cid}', 1845 'red', 1846 ) 1847 ) 1848 return 1849 1850 channel.on_configure_response(response) 1851 1852 def on_l2cap_disconnection_request( 1853 self, connection: Connection, cid: int, request 1854 ) -> None: 1855 if ( 1856 channel := self.find_channel(connection.handle, request.destination_cid) 1857 ) is None: 1858 logger.warning( 1859 color( 1860 f'channel {request.destination_cid} not found for ' 1861 f'0x{connection.handle:04X}:{cid}', 1862 'red', 1863 ) 1864 ) 1865 return 1866 1867 channel.on_disconnection_request(request) 1868 1869 def on_l2cap_disconnection_response( 1870 self, connection: Connection, cid: int, response 1871 ) -> None: 1872 if ( 1873 channel := self.find_channel(connection.handle, response.source_cid) 1874 ) is None: 1875 logger.warning( 1876 color( 1877 f'channel {response.source_cid} not found for ' 1878 f'0x{connection.handle:04X}:{cid}', 1879 'red', 1880 ) 1881 ) 1882 return 1883 1884 channel.on_disconnection_response(response) 1885 1886 def on_l2cap_echo_request(self, connection: Connection, cid: int, request) -> None: 1887 logger.debug(f'<<< Echo request: data={request.data.hex()}') 1888 self.send_control_frame( 1889 connection, 1890 cid, 1891 L2CAP_Echo_Response(identifier=request.identifier, data=request.data), 1892 ) 1893 1894 def on_l2cap_echo_response( 1895 self, _connection: Connection, _cid: int, response 1896 ) -> None: 1897 logger.debug(f'<<< Echo response: data={response.data.hex()}') 1898 # TODO notify listeners 1899 1900 def on_l2cap_information_request( 1901 self, connection: Connection, cid: int, request 1902 ) -> None: 1903 if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU: 1904 result = L2CAP_Information_Response.SUCCESS 1905 data = self.connectionless_mtu.to_bytes(2, 'little') 1906 elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED: 1907 result = L2CAP_Information_Response.SUCCESS 1908 data = sum(self.extended_features).to_bytes(4, 'little') 1909 elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED: 1910 result = L2CAP_Information_Response.SUCCESS 1911 data = sum(1 << cid for cid in self.fixed_channels).to_bytes(8, 'little') 1912 else: 1913 result = L2CAP_Information_Response.NOT_SUPPORTED 1914 1915 self.send_control_frame( 1916 connection, 1917 cid, 1918 L2CAP_Information_Response( 1919 identifier=request.identifier, 1920 info_type=request.info_type, 1921 result=result, 1922 data=data, 1923 ), 1924 ) 1925 1926 def on_l2cap_connection_parameter_update_request( 1927 self, connection: Connection, cid: int, request 1928 ): 1929 if connection.role == BT_CENTRAL_ROLE: 1930 self.send_control_frame( 1931 connection, 1932 cid, 1933 L2CAP_Connection_Parameter_Update_Response( 1934 identifier=request.identifier, 1935 result=L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT, 1936 ), 1937 ) 1938 self.host.send_command_sync( 1939 HCI_LE_Connection_Update_Command( 1940 connection_handle=connection.handle, 1941 connection_interval_min=request.interval_min, 1942 connection_interval_max=request.interval_max, 1943 max_latency=request.latency, 1944 supervision_timeout=request.timeout, 1945 min_ce_length=0, 1946 max_ce_length=0, 1947 ) 1948 ) 1949 else: 1950 self.send_control_frame( 1951 connection, 1952 cid, 1953 L2CAP_Connection_Parameter_Update_Response( 1954 identifier=request.identifier, 1955 result=L2CAP_CONNECTION_PARAMETERS_REJECTED_RESULT, 1956 ), 1957 ) 1958 1959 async def update_connection_parameters( 1960 self, 1961 connection: Connection, 1962 interval_min: int, 1963 interval_max: int, 1964 latency: int, 1965 timeout: int, 1966 ) -> int: 1967 # Check that there isn't already a request pending 1968 if self.connection_parameters_update_response: 1969 raise InvalidStateError('request already pending') 1970 self.connection_parameters_update_response = ( 1971 asyncio.get_running_loop().create_future() 1972 ) 1973 self.send_control_frame( 1974 connection, 1975 L2CAP_LE_SIGNALING_CID, 1976 L2CAP_Connection_Parameter_Update_Request( 1977 interval_min=interval_min, 1978 interval_max=interval_max, 1979 latency=latency, 1980 timeout=timeout, 1981 ), 1982 ) 1983 return await self.connection_parameters_update_response 1984 1985 def on_l2cap_connection_parameter_update_response( 1986 self, connection: Connection, cid: int, response 1987 ) -> None: 1988 if self.connection_parameters_update_response: 1989 self.connection_parameters_update_response.set_result(response.result) 1990 self.connection_parameters_update_response = None 1991 else: 1992 logger.warning( 1993 color( 1994 'received l2cap_connection_parameter_update_response without a pending request', 1995 'red', 1996 ) 1997 ) 1998 1999 def on_l2cap_le_credit_based_connection_request( 2000 self, connection: Connection, cid: int, request 2001 ) -> None: 2002 if request.le_psm in self.le_coc_servers: 2003 server = self.le_coc_servers[request.le_psm] 2004 2005 # Check that the CID isn't already used 2006 le_connection_channels = self.le_coc_channels.setdefault( 2007 connection.handle, {} 2008 ) 2009 if request.source_cid in le_connection_channels: 2010 logger.warning(f'source CID {request.source_cid} already in use') 2011 self.send_control_frame( 2012 connection, 2013 cid, 2014 L2CAP_LE_Credit_Based_Connection_Response( 2015 identifier=request.identifier, 2016 destination_cid=0, 2017 mtu=server.mtu, 2018 mps=server.mps, 2019 initial_credits=0, 2020 # pylint: disable=line-too-long 2021 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED, 2022 ), 2023 ) 2024 return 2025 2026 # Find a free CID for this new channel 2027 connection_channels = self.channels.setdefault(connection.handle, {}) 2028 source_cid = self.find_free_le_cid(connection_channels) 2029 if source_cid is None: # Should never happen! 2030 self.send_control_frame( 2031 connection, 2032 cid, 2033 L2CAP_LE_Credit_Based_Connection_Response( 2034 identifier=request.identifier, 2035 destination_cid=0, 2036 mtu=server.mtu, 2037 mps=server.mps, 2038 initial_credits=0, 2039 # pylint: disable=line-too-long 2040 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, 2041 ), 2042 ) 2043 return 2044 2045 # Create a new channel 2046 logger.debug( 2047 f'creating LE CoC server channel with cid={source_cid} for psm ' 2048 f'{request.le_psm}' 2049 ) 2050 channel = LeCreditBasedChannel( 2051 self, 2052 connection, 2053 request.le_psm, 2054 source_cid, 2055 request.source_cid, 2056 server.mtu, 2057 server.mps, 2058 request.initial_credits, 2059 request.mtu, 2060 request.mps, 2061 server.max_credits, 2062 True, 2063 ) 2064 connection_channels[source_cid] = channel 2065 le_connection_channels[request.source_cid] = channel 2066 2067 # Respond 2068 self.send_control_frame( 2069 connection, 2070 cid, 2071 L2CAP_LE_Credit_Based_Connection_Response( 2072 identifier=request.identifier, 2073 destination_cid=source_cid, 2074 mtu=server.mtu, 2075 mps=server.mps, 2076 initial_credits=server.max_credits, 2077 # pylint: disable=line-too-long 2078 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL, 2079 ), 2080 ) 2081 2082 # Notify 2083 server.on_connection(channel) 2084 else: 2085 logger.info( 2086 f'No LE server for connection 0x{connection.handle:04X} ' 2087 f'on PSM {request.le_psm}' 2088 ) 2089 self.send_control_frame( 2090 connection, 2091 cid, 2092 L2CAP_LE_Credit_Based_Connection_Response( 2093 identifier=request.identifier, 2094 destination_cid=0, 2095 mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU, 2096 mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS, 2097 initial_credits=0, 2098 # pylint: disable=line-too-long 2099 result=L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED, 2100 ), 2101 ) 2102 2103 def on_l2cap_le_credit_based_connection_response( 2104 self, connection: Connection, _cid: int, response 2105 ) -> None: 2106 # Find the pending request by identifier 2107 request = self.le_coc_requests.get(response.identifier) 2108 if request is None: 2109 logger.warning(color('!!! received response for unknown request', 'red')) 2110 return 2111 del self.le_coc_requests[response.identifier] 2112 2113 # Find the channel for this request 2114 channel = self.find_channel(connection.handle, request.source_cid) 2115 if channel is None: 2116 logger.warning( 2117 color( 2118 'received connection response for an unknown channel ' 2119 f'(cid={request.source_cid})', 2120 'red', 2121 ) 2122 ) 2123 return 2124 2125 # Process the response 2126 channel.on_connection_response(response) 2127 2128 def on_l2cap_le_flow_control_credit( 2129 self, connection: Connection, _cid: int, credit 2130 ) -> None: 2131 channel = self.find_le_coc_channel(connection.handle, credit.cid) 2132 if channel is None: 2133 logger.warning(f'received credits for an unknown channel (cid={credit.cid}') 2134 return 2135 2136 channel.on_credits(credit.credits) 2137 2138 def on_channel_closed(self, channel: ClassicChannel) -> None: 2139 connection_channels = self.channels.get(channel.connection.handle) 2140 if connection_channels: 2141 if channel.source_cid in connection_channels: 2142 del connection_channels[channel.source_cid] 2143 2144 @deprecated("Please use create_le_credit_based_channel()") 2145 async def open_le_coc( 2146 self, connection: Connection, psm: int, max_credits: int, mtu: int, mps: int 2147 ) -> LeCreditBasedChannel: 2148 return await self.create_le_credit_based_channel( 2149 connection=connection, 2150 spec=LeCreditBasedChannelSpec( 2151 psm=psm, max_credits=max_credits, mtu=mtu, mps=mps 2152 ), 2153 ) 2154 2155 async def create_le_credit_based_channel( 2156 self, 2157 connection: Connection, 2158 spec: LeCreditBasedChannelSpec, 2159 ) -> LeCreditBasedChannel: 2160 # Find a free CID for the new channel 2161 connection_channels = self.channels.setdefault(connection.handle, {}) 2162 source_cid = self.find_free_le_cid(connection_channels) 2163 if source_cid is None: # Should never happen! 2164 raise OutOfResourcesError('all CIDs already in use') 2165 2166 if spec.psm is None: 2167 raise InvalidArgumentError('PSM cannot be None') 2168 2169 # Create the channel 2170 logger.debug(f'creating coc channel with cid={source_cid} for psm {spec.psm}') 2171 channel = LeCreditBasedChannel( 2172 manager=self, 2173 connection=connection, 2174 le_psm=spec.psm, 2175 source_cid=source_cid, 2176 destination_cid=0, 2177 mtu=spec.mtu, 2178 mps=spec.mps, 2179 credits=0, 2180 peer_mtu=0, 2181 peer_mps=0, 2182 peer_credits=spec.max_credits, 2183 connected=False, 2184 ) 2185 connection_channels[source_cid] = channel 2186 2187 # Connect 2188 try: 2189 await channel.connect() 2190 except Exception as error: 2191 logger.warning(f'connection failed: {error}') 2192 del connection_channels[source_cid] 2193 raise 2194 2195 # Remember the channel by source CID and destination CID 2196 le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {}) 2197 le_connection_channels[channel.destination_cid] = channel 2198 2199 return channel 2200 2201 @deprecated("Please use create_classic_channel()") 2202 async def connect(self, connection: Connection, psm: int) -> ClassicChannel: 2203 return await self.create_classic_channel( 2204 connection=connection, spec=ClassicChannelSpec(psm=psm) 2205 ) 2206 2207 async def create_classic_channel( 2208 self, connection: Connection, spec: ClassicChannelSpec 2209 ) -> ClassicChannel: 2210 # NOTE: this implementation hard-codes BR/EDR 2211 2212 # Find a free CID for a new channel 2213 connection_channels = self.channels.setdefault(connection.handle, {}) 2214 source_cid = self.find_free_br_edr_cid(connection_channels) 2215 if source_cid is None: # Should never happen! 2216 raise OutOfResourcesError('all CIDs already in use') 2217 2218 if spec.psm is None: 2219 raise InvalidArgumentError('PSM cannot be None') 2220 2221 # Create the channel 2222 logger.debug( 2223 f'creating client channel with cid={source_cid} for psm {spec.psm}' 2224 ) 2225 channel = ClassicChannel( 2226 self, 2227 connection, 2228 L2CAP_SIGNALING_CID, 2229 spec.psm, 2230 source_cid, 2231 spec.mtu, 2232 ) 2233 connection_channels[source_cid] = channel 2234 2235 # Connect 2236 try: 2237 await channel.connect() 2238 except BaseException as e: 2239 del connection_channels[source_cid] 2240 raise e 2241 2242 return channel 2243 2244 2245# ----------------------------------------------------------------------------- 2246# Deprecated Classes 2247# ----------------------------------------------------------------------------- 2248 2249 2250class Channel(ClassicChannel): 2251 @deprecated("Please use ClassicChannel") 2252 def __init__(self, *args, **kwargs) -> None: 2253 super().__init__(*args, **kwargs) 2254 2255 2256class LeConnectionOrientedChannel(LeCreditBasedChannel): 2257 @deprecated("Please use LeCreditBasedChannel") 2258 def __init__(self, *args, **kwargs) -> None: 2259 super().__init__(*args, **kwargs) 2260