1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20from collections.abc import Iterable 21from contextlib import ( 22 asynccontextmanager, 23 AsyncExitStack, 24 closing, 25) 26import copy 27from dataclasses import dataclass, field 28from enum import Enum, IntEnum 29import functools 30import itertools 31import json 32import logging 33import secrets 34import sys 35from typing import ( 36 Any, 37 Callable, 38 ClassVar, 39 Dict, 40 List, 41 Optional, 42 Tuple, 43 Type, 44 TypeVar, 45 Union, 46 cast, 47 overload, 48 TYPE_CHECKING, 49) 50from typing_extensions import Self 51 52from pyee import EventEmitter 53 54from bumble import hci 55from .colors import color 56from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU 57from .gatt import Characteristic, Descriptor, Service 58from .hci import ( 59 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, 60 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, 61 HCI_CENTRAL_ROLE, 62 HCI_PERIPHERAL_ROLE, 63 HCI_COMMAND_STATUS_PENDING, 64 HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 65 HCI_DISPLAY_YES_NO_IO_CAPABILITY, 66 HCI_DISPLAY_ONLY_IO_CAPABILITY, 67 HCI_EXTENDED_INQUIRY_MODE, 68 HCI_GENERAL_INQUIRY_LAP, 69 
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, 70 HCI_KEYBOARD_ONLY_IO_CAPABILITY, 71 HCI_LE_1M_PHY, 72 HCI_LE_1M_PHY_BIT, 73 HCI_LE_2M_PHY, 74 HCI_LE_CODED_PHY, 75 HCI_LE_CODED_PHY_BIT, 76 HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, 77 HCI_LE_RAND_COMMAND, 78 HCI_LE_READ_PHY_COMMAND, 79 HCI_LE_SET_PHY_COMMAND, 80 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 81 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 82 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 83 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 84 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, 85 HCI_OPERATION_CANCELLED_BY_HOST_ERROR, 86 HCI_R2_PAGE_SCAN_REPETITION_MODE, 87 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 88 HCI_SUCCESS, 89 HCI_WRITE_LE_HOST_SUPPORT_COMMAND, 90 HCI_Accept_Connection_Request_Command, 91 HCI_Authentication_Requested_Command, 92 HCI_Command_Status_Event, 93 HCI_Constant, 94 HCI_Create_Connection_Cancel_Command, 95 HCI_Create_Connection_Command, 96 HCI_Connection_Complete_Event, 97 HCI_Disconnect_Command, 98 HCI_Encryption_Change_Event, 99 HCI_Error, 100 HCI_IO_Capability_Request_Reply_Command, 101 HCI_Inquiry_Cancel_Command, 102 HCI_Inquiry_Command, 103 HCI_IsoDataPacket, 104 HCI_LE_Accept_CIS_Request_Command, 105 HCI_LE_Add_Device_To_Resolving_List_Command, 106 HCI_LE_Advertising_Report_Event, 107 HCI_LE_BIGInfo_Advertising_Report_Event, 108 HCI_LE_Clear_Resolving_List_Command, 109 HCI_LE_Connection_Update_Command, 110 HCI_LE_Create_Connection_Cancel_Command, 111 HCI_LE_Create_Connection_Command, 112 HCI_LE_Create_CIS_Command, 113 HCI_LE_Periodic_Advertising_Create_Sync_Command, 114 HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command, 115 HCI_LE_Periodic_Advertising_Report_Event, 116 HCI_LE_Periodic_Advertising_Sync_Transfer_Command, 117 HCI_LE_Periodic_Advertising_Terminate_Sync_Command, 118 HCI_LE_Enable_Encryption_Command, 119 HCI_LE_Extended_Advertising_Report_Event, 120 HCI_LE_Extended_Create_Connection_Command, 121 
HCI_LE_Rand_Command, 122 HCI_LE_Read_PHY_Command, 123 HCI_LE_Read_Remote_Features_Command, 124 HCI_LE_Reject_CIS_Request_Command, 125 HCI_LE_Remove_Advertising_Set_Command, 126 HCI_LE_Set_Address_Resolution_Enable_Command, 127 HCI_LE_Set_Advertising_Data_Command, 128 HCI_LE_Set_Advertising_Enable_Command, 129 HCI_LE_Set_Advertising_Parameters_Command, 130 HCI_LE_Set_Advertising_Set_Random_Address_Command, 131 HCI_LE_Set_CIG_Parameters_Command, 132 HCI_LE_Set_Data_Length_Command, 133 HCI_LE_Set_Default_PHY_Command, 134 HCI_LE_Set_Extended_Scan_Enable_Command, 135 HCI_LE_Set_Extended_Scan_Parameters_Command, 136 HCI_LE_Set_Extended_Scan_Response_Data_Command, 137 HCI_LE_Set_Extended_Advertising_Data_Command, 138 HCI_LE_Set_Extended_Advertising_Enable_Command, 139 HCI_LE_Set_Extended_Advertising_Parameters_Command, 140 HCI_LE_Set_Host_Feature_Command, 141 HCI_LE_Set_Periodic_Advertising_Enable_Command, 142 HCI_LE_Set_PHY_Command, 143 HCI_LE_Set_Random_Address_Command, 144 HCI_LE_Set_Scan_Enable_Command, 145 HCI_LE_Set_Scan_Parameters_Command, 146 HCI_LE_Set_Scan_Response_Data_Command, 147 HCI_PIN_Code_Request_Reply_Command, 148 HCI_PIN_Code_Request_Negative_Reply_Command, 149 HCI_Read_BD_ADDR_Command, 150 HCI_Read_RSSI_Command, 151 HCI_Reject_Connection_Request_Command, 152 HCI_Remote_Name_Request_Command, 153 HCI_Switch_Role_Command, 154 HCI_Set_Connection_Encryption_Command, 155 HCI_StatusError, 156 HCI_SynchronousDataPacket, 157 HCI_User_Confirmation_Request_Negative_Reply_Command, 158 HCI_User_Confirmation_Request_Reply_Command, 159 HCI_User_Passkey_Request_Negative_Reply_Command, 160 HCI_User_Passkey_Request_Reply_Command, 161 HCI_Write_Class_Of_Device_Command, 162 HCI_Write_Extended_Inquiry_Response_Command, 163 HCI_Write_Inquiry_Mode_Command, 164 HCI_Write_LE_Host_Support_Command, 165 HCI_Write_Local_Name_Command, 166 HCI_Write_Scan_Enable_Command, 167 HCI_Write_Secure_Connections_Host_Support_Command, 168 HCI_Write_Simple_Pairing_Mode_Command, 169 Address, 170 
OwnAddressType, 171 LeFeature, 172 LeFeatureMask, 173 LmpFeatureMask, 174 Phy, 175 phy_list_to_bits, 176) 177from .host import Host 178from .profiles.gap import GenericAccessService 179from .core import ( 180 BT_BR_EDR_TRANSPORT, 181 BT_CENTRAL_ROLE, 182 BT_LE_TRANSPORT, 183 BT_PERIPHERAL_ROLE, 184 AdvertisingData, 185 BaseBumbleError, 186 ConnectionParameterUpdateError, 187 CommandTimeoutError, 188 ConnectionParameters, 189 ConnectionPHY, 190 InvalidArgumentError, 191 InvalidOperationError, 192 InvalidStateError, 193 NotSupportedError, 194 OutOfResourcesError, 195 UnreachableError, 196) 197from .utils import ( 198 AsyncRunner, 199 CompositeEventEmitter, 200 EventWatcher, 201 setup_event_forwarding, 202 composite_listener, 203 deprecated, 204 experimental, 205) 206from .keys import ( 207 KeyStore, 208 PairingKeys, 209) 210from bumble import pairing 211from bumble import gatt_client 212from bumble import gatt_server 213from bumble import smp 214from bumble import sdp 215from bumble import l2cap 216from bumble import core 217 218if TYPE_CHECKING: 219 from .transport.common import TransportSource, TransportSink 220 221 222# ----------------------------------------------------------------------------- 223# Logging 224# ----------------------------------------------------------------------------- 225logger = logging.getLogger(__name__) 226 227# ----------------------------------------------------------------------------- 228# Constants 229# ----------------------------------------------------------------------------- 230# fmt: off 231# pylint: disable=line-too-long 232 233DEVICE_MIN_SCAN_INTERVAL = 25 234DEVICE_MAX_SCAN_INTERVAL = 10240 235DEVICE_MIN_SCAN_WINDOW = 25 236DEVICE_MAX_SCAN_WINDOW = 10240 237DEVICE_MIN_LE_RSSI = -127 238DEVICE_MAX_LE_RSSI = 20 239DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00 240DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF 241 242DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' 243DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms 
244DEVICE_DEFAULT_ADVERTISING_DATA = '' 245DEVICE_DEFAULT_NAME = 'Bumble' 246DEVICE_DEFAULT_INQUIRY_LENGTH = 8 # 10.24 seconds 247DEVICE_DEFAULT_CLASS_OF_DEVICE = 0 248DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b'' 249DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) 250DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms 251DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms 252DEVICE_DEFAULT_CONNECT_TIMEOUT = None # No timeout 253DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL = 60 # ms 254DEVICE_DEFAULT_CONNECT_SCAN_WINDOW = 60 # ms 255DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN = 15 # ms 256DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX = 30 # ms 257DEVICE_DEFAULT_CONNECTION_MAX_LATENCY = 0 258DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720 # ms 259DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH = 0 # ms 260DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms 261DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU 262DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS 263DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS 264DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( 265 HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE 266) 267DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0 268DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0 269DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds) 270 271# fmt: on 272# pylint: enable=line-too-long 273 274# As specified in 7.8.56 LE Set Extended Advertising Enable command 275DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28 276 277 278# ----------------------------------------------------------------------------- 279# Classes 280# ----------------------------------------------------------------------------- 281class ObjectLookupError(BaseBumbleError): 282 """Error raised when failed to lookup an object.""" 283 284 285# ----------------------------------------------------------------------------- 286@dataclass 287class 
Advertisement: 288 # Attributes 289 address: Address 290 rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE 291 is_legacy: bool = False 292 is_anonymous: bool = False 293 is_connectable: bool = False 294 is_directed: bool = False 295 is_scannable: bool = False 296 is_scan_response: bool = False 297 is_complete: bool = True 298 is_truncated: bool = False 299 primary_phy: int = 0 300 secondary_phy: int = 0 301 tx_power: int = ( 302 HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 303 ) 304 sid: int = 0 305 data_bytes: bytes = b'' 306 307 # Constants 308 TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( 309 HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 310 ) 311 RSSI_NOT_AVAILABLE: ClassVar[int] = ( 312 HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE 313 ) 314 315 def __post_init__(self) -> None: 316 self.data = AdvertisingData.from_bytes(self.data_bytes) 317 318 @classmethod 319 def from_advertising_report(cls, report) -> Optional[Advertisement]: 320 if isinstance(report, HCI_LE_Advertising_Report_Event.Report): 321 return LegacyAdvertisement.from_advertising_report(report) 322 323 if isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report): 324 return ExtendedAdvertisement.from_advertising_report(report) 325 326 return None 327 328 329# ----------------------------------------------------------------------------- 330class LegacyAdvertisement(Advertisement): 331 @classmethod 332 def from_advertising_report(cls, report): 333 return cls( 334 address=report.address, 335 rssi=report.rssi, 336 is_legacy=True, 337 is_connectable=report.event_type 338 in ( 339 HCI_LE_Advertising_Report_Event.ADV_IND, 340 HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 341 ), 342 is_directed=report.event_type 343 == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 344 is_scannable=report.event_type 345 in ( 346 HCI_LE_Advertising_Report_Event.ADV_IND, 347 HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, 
348 ), 349 is_scan_response=report.event_type 350 == HCI_LE_Advertising_Report_Event.SCAN_RSP, 351 data_bytes=report.data, 352 ) 353 354 355# ----------------------------------------------------------------------------- 356class ExtendedAdvertisement(Advertisement): 357 @classmethod 358 def from_advertising_report(cls, report): 359 # fmt: off 360 # pylint: disable=line-too-long 361 return cls( 362 address = report.address, 363 rssi = report.rssi, 364 is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, 365 is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, 366 is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, 367 is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, 368 is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, 369 is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, 370 is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, 371 is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, 372 primary_phy = report.primary_phy, 373 secondary_phy = report.secondary_phy, 374 tx_power = report.tx_power, 375 sid = report.advertising_sid, 376 data_bytes = report.data 377 ) 378 # fmt: on 379 380 381# ----------------------------------------------------------------------------- 382class AdvertisementDataAccumulator: 383 def __init__(self, passive=False): 384 self.passive = passive 385 self.last_advertisement = None 386 self.last_data = b'' 387 388 def update(self, report): 389 advertisement = Advertisement.from_advertising_report(report) 390 if advertisement is None: 391 return None 392 393 result = 
None 394 395 if advertisement.is_scan_response: 396 if ( 397 self.last_advertisement is not None 398 and not self.last_advertisement.is_scan_response 399 ): 400 # This is the response to a scannable advertisement 401 result = Advertisement.from_advertising_report(report) 402 result.is_connectable = self.last_advertisement.is_connectable 403 result.is_scannable = True 404 result.data = AdvertisingData.from_bytes(self.last_data + report.data) 405 self.last_data = b'' 406 else: 407 if ( 408 self.passive 409 or (not advertisement.is_scannable) 410 or ( 411 self.last_advertisement is not None 412 and not self.last_advertisement.is_scan_response 413 ) 414 ): 415 # Don't wait for a scan response 416 result = Advertisement.from_advertising_report(report) 417 418 self.last_data = report.data 419 420 self.last_advertisement = advertisement 421 422 return result 423 424 425# ----------------------------------------------------------------------------- 426class AdvertisingType(IntEnum): 427 # fmt: off 428 # pylint: disable=line-too-long 429 UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable 430 DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable 431 UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable 432 UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable 433 DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable 434 # fmt: on 435 436 @property 437 def has_data(self) -> bool: 438 return self in ( 439 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 440 AdvertisingType.UNDIRECTED_SCANNABLE, 441 AdvertisingType.UNDIRECTED, 442 ) 443 444 @property 445 def is_connectable(self) -> bool: 446 return self in ( 447 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 448 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 449 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 450 ) 451 452 @property 453 def is_scannable(self) -> bool: 454 return self in ( 455 
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 456 AdvertisingType.UNDIRECTED_SCANNABLE, 457 ) 458 459 @property 460 def is_directed(self) -> bool: 461 return self in ( 462 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 463 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 464 ) 465 466 @property 467 def is_high_duty_cycle_directed_connectable(self): 468 return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY 469 470 471# ----------------------------------------------------------------------------- 472@dataclass 473class LegacyAdvertiser: 474 device: Device 475 advertising_type: AdvertisingType 476 own_address_type: OwnAddressType 477 peer_address: Address 478 auto_restart: bool 479 480 async def start(self) -> None: 481 # Set/update the advertising data if the advertising type allows it 482 if self.advertising_type.has_data: 483 await self.device.send_command( 484 HCI_LE_Set_Advertising_Data_Command( 485 advertising_data=self.device.advertising_data 486 ), 487 check_result=True, 488 ) 489 490 # Set/update the scan response data if the advertising is scannable 491 if self.advertising_type.is_scannable: 492 await self.device.send_command( 493 HCI_LE_Set_Scan_Response_Data_Command( 494 scan_response_data=self.device.scan_response_data 495 ), 496 check_result=True, 497 ) 498 499 # Set the advertising parameters 500 await self.device.send_command( 501 HCI_LE_Set_Advertising_Parameters_Command( 502 advertising_interval_min=self.device.advertising_interval_min, 503 advertising_interval_max=self.device.advertising_interval_max, 504 advertising_type=int(self.advertising_type), 505 own_address_type=self.own_address_type, 506 peer_address_type=self.peer_address.address_type, 507 peer_address=self.peer_address, 508 advertising_channel_map=7, 509 advertising_filter_policy=0, 510 ), 511 check_result=True, 512 ) 513 514 # Enable advertising 515 await self.device.send_command( 516 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), 517 check_result=True, 518 ) 519 
520 async def stop(self) -> None: 521 # Disable advertising 522 await self.device.send_command( 523 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), 524 check_result=True, 525 ) 526 527 528# ----------------------------------------------------------------------------- 529@dataclass 530class AdvertisingEventProperties: 531 is_connectable: bool = True 532 is_scannable: bool = False 533 is_directed: bool = False 534 is_high_duty_cycle_directed_connectable: bool = False 535 is_legacy: bool = False 536 is_anonymous: bool = False 537 include_tx_power: bool = False 538 539 def __int__(self) -> int: 540 properties = ( 541 HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) 542 ) 543 if self.is_connectable: 544 properties |= properties.CONNECTABLE_ADVERTISING 545 if self.is_scannable: 546 properties |= properties.SCANNABLE_ADVERTISING 547 if self.is_directed: 548 properties |= properties.DIRECTED_ADVERTISING 549 if self.is_high_duty_cycle_directed_connectable: 550 properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING 551 if self.is_legacy: 552 properties |= properties.USE_LEGACY_ADVERTISING_PDUS 553 if self.is_anonymous: 554 properties |= properties.ANONYMOUS_ADVERTISING 555 if self.include_tx_power: 556 properties |= properties.INCLUDE_TX_POWER 557 558 return int(properties) 559 560 @classmethod 561 def from_advertising_type( 562 cls: Type[AdvertisingEventProperties], 563 advertising_type: AdvertisingType, 564 ) -> AdvertisingEventProperties: 565 return cls( 566 is_connectable=advertising_type.is_connectable, 567 is_scannable=advertising_type.is_scannable, 568 is_directed=advertising_type.is_directed, 569 is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable, 570 is_legacy=True, 571 is_anonymous=False, 572 include_tx_power=False, 573 ) 574 575 576# ----------------------------------------------------------------------------- 577@dataclass 578class PeriodicAdvertisement: 579 
address: Address 580 sid: int 581 tx_power: int = ( 582 HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 583 ) 584 rssi: int = HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE 585 is_truncated: bool = False 586 data_bytes: bytes = b'' 587 588 # Constants 589 TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( 590 HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 591 ) 592 RSSI_NOT_AVAILABLE: ClassVar[int] = ( 593 HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE 594 ) 595 596 def __post_init__(self) -> None: 597 self.data = ( 598 None if self.is_truncated else AdvertisingData.from_bytes(self.data_bytes) 599 ) 600 601 602# ----------------------------------------------------------------------------- 603@dataclass 604class BIGInfoAdvertisement: 605 address: Address 606 sid: int 607 num_bis: int 608 nse: int 609 iso_interval: int 610 bn: int 611 pto: int 612 irc: int 613 max_pdu: int 614 sdu_interval: int 615 max_sdu: int 616 phy: Phy 617 framed: bool 618 encrypted: bool 619 620 @classmethod 621 def from_report(cls, address: Address, sid: int, report) -> Self: 622 return cls( 623 address, 624 sid, 625 report.num_bis, 626 report.nse, 627 report.iso_interval, 628 report.bn, 629 report.pto, 630 report.irc, 631 report.max_pdu, 632 report.sdu_interval, 633 report.max_sdu, 634 Phy(report.phy), 635 report.framing != 0, 636 report.encryption != 0, 637 ) 638 639 640# ----------------------------------------------------------------------------- 641# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 642AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap 643 644 645# ----------------------------------------------------------------------------- 646@dataclass 647class AdvertisingParameters: 648 # pylint: disable=line-too-long 649 advertising_event_properties: AdvertisingEventProperties = field( 650 default_factory=AdvertisingEventProperties 651 ) 652 
primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL 653 primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL 654 primary_advertising_channel_map: ( 655 HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap 656 ) = ( 657 AdvertisingChannelMap.CHANNEL_37 658 | AdvertisingChannelMap.CHANNEL_38 659 | AdvertisingChannelMap.CHANNEL_39 660 ) 661 own_address_type: OwnAddressType = OwnAddressType.RANDOM 662 peer_address: Address = Address.ANY 663 advertising_filter_policy: int = 0 664 advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER 665 primary_advertising_phy: Phy = Phy.LE_1M 666 secondary_advertising_max_skip: int = 0 667 secondary_advertising_phy: Phy = Phy.LE_1M 668 advertising_sid: int = 0 669 enable_scan_request_notifications: bool = False 670 primary_advertising_phy_options: int = 0 671 secondary_advertising_phy_options: int = 0 672 673 674# ----------------------------------------------------------------------------- 675@dataclass 676class PeriodicAdvertisingParameters: 677 # TODO implement this class 678 pass 679 680 681# ----------------------------------------------------------------------------- 682@dataclass 683class AdvertisingSet(EventEmitter): 684 device: Device 685 advertising_handle: int 686 auto_restart: bool 687 random_address: Optional[Address] 688 advertising_parameters: AdvertisingParameters 689 advertising_data: bytes 690 scan_response_data: bytes 691 periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] 692 periodic_advertising_data: bytes 693 selected_tx_power: int = 0 694 enabled: bool = False 695 696 def __post_init__(self) -> None: 697 super().__init__() 698 699 async def set_advertising_parameters( 700 self, advertising_parameters: AdvertisingParameters 701 ) -> None: 702 # Compliance check 703 if ( 704 not advertising_parameters.advertising_event_properties.is_legacy 705 and advertising_parameters.advertising_event_properties.is_connectable 706 and 
advertising_parameters.advertising_event_properties.is_scannable 707 ): 708 logger.warning( 709 "non-legacy extended advertising event properties may not be both " 710 "connectable and scannable" 711 ) 712 713 response = await self.device.send_command( 714 HCI_LE_Set_Extended_Advertising_Parameters_Command( 715 advertising_handle=self.advertising_handle, 716 advertising_event_properties=int( 717 advertising_parameters.advertising_event_properties 718 ), 719 primary_advertising_interval_min=( 720 int(advertising_parameters.primary_advertising_interval_min / 0.625) 721 ), 722 primary_advertising_interval_max=( 723 int(advertising_parameters.primary_advertising_interval_min / 0.625) 724 ), 725 primary_advertising_channel_map=int( 726 advertising_parameters.primary_advertising_channel_map 727 ), 728 own_address_type=advertising_parameters.own_address_type, 729 peer_address_type=advertising_parameters.peer_address.address_type, 730 peer_address=advertising_parameters.peer_address, 731 advertising_tx_power=advertising_parameters.advertising_tx_power, 732 advertising_filter_policy=( 733 advertising_parameters.advertising_filter_policy 734 ), 735 primary_advertising_phy=advertising_parameters.primary_advertising_phy, 736 secondary_advertising_max_skip=( 737 advertising_parameters.secondary_advertising_max_skip 738 ), 739 secondary_advertising_phy=( 740 advertising_parameters.secondary_advertising_phy 741 ), 742 advertising_sid=advertising_parameters.advertising_sid, 743 scan_request_notification_enable=( 744 1 if advertising_parameters.enable_scan_request_notifications else 0 745 ), 746 ), 747 check_result=True, 748 ) 749 self.selected_tx_power = response.return_parameters.selected_tx_power 750 self.advertising_parameters = advertising_parameters 751 752 async def set_advertising_data(self, advertising_data: bytes) -> None: 753 # pylint: disable=line-too-long 754 await self.device.send_command( 755 HCI_LE_Set_Extended_Advertising_Data_Command( 756 
advertising_handle=self.advertising_handle, 757 operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, 758 fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, 759 advertising_data=advertising_data, 760 ), 761 check_result=True, 762 ) 763 self.advertising_data = advertising_data 764 765 async def set_scan_response_data(self, scan_response_data: bytes) -> None: 766 # pylint: disable=line-too-long 767 if ( 768 scan_response_data 769 and not self.advertising_parameters.advertising_event_properties.is_scannable 770 ): 771 logger.warning( 772 "ignoring attempt to set non-empty scan response data on non-scannable " 773 "advertising set" 774 ) 775 return 776 777 await self.device.send_command( 778 HCI_LE_Set_Extended_Scan_Response_Data_Command( 779 advertising_handle=self.advertising_handle, 780 operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, 781 fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, 782 scan_response_data=scan_response_data, 783 ), 784 check_result=True, 785 ) 786 self.scan_response_data = scan_response_data 787 788 async def set_periodic_advertising_parameters( 789 self, advertising_parameters: PeriodicAdvertisingParameters 790 ) -> None: 791 # TODO: send command 792 self.periodic_advertising_parameters = advertising_parameters 793 794 async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: 795 # TODO: send command 796 self.periodic_advertising_data = advertising_data 797 798 async def set_random_address(self, random_address: Address) -> None: 799 await self.device.send_command( 800 HCI_LE_Set_Advertising_Set_Random_Address_Command( 801 advertising_handle=self.advertising_handle, 802 random_address=(random_address or self.device.random_address), 803 ), 804 check_result=True, 805 ) 806 807 async def start( 808 self, duration: float = 0.0, max_advertising_events: int = 0 809 ) -> None: 810 """ 811 
Start advertising. 812 813 Args: 814 duration: How long to advertise for, in seconds. Use 0 (the default) for 815 an unlimited duration, unless this advertising set is a High Duty Cycle 816 Directed Advertisement type. 817 max_advertising_events: Maximum number of events to advertise for. Use 0 818 (the default) for an unlimited number of advertisements. 819 """ 820 await self.device.send_command( 821 HCI_LE_Set_Extended_Advertising_Enable_Command( 822 enable=1, 823 advertising_handles=[self.advertising_handle], 824 durations=[round(duration * 100)], 825 max_extended_advertising_events=[max_advertising_events], 826 ), 827 check_result=True, 828 ) 829 self.enabled = True 830 831 self.emit('start') 832 833 async def start_periodic(self, include_adi: bool = False) -> None: 834 await self.device.send_command( 835 HCI_LE_Set_Periodic_Advertising_Enable_Command( 836 enable=1 | (2 if include_adi else 0), 837 advertising_handles=self.advertising_handle, 838 ), 839 check_result=True, 840 ) 841 842 self.emit('start_periodic') 843 844 async def stop(self) -> None: 845 await self.device.send_command( 846 HCI_LE_Set_Extended_Advertising_Enable_Command( 847 enable=0, 848 advertising_handles=[self.advertising_handle], 849 durations=[0], 850 max_extended_advertising_events=[0], 851 ), 852 check_result=True, 853 ) 854 self.enabled = False 855 856 self.emit('stop') 857 858 async def stop_periodic(self) -> None: 859 await self.device.send_command( 860 HCI_LE_Set_Periodic_Advertising_Enable_Command( 861 enable=0, 862 advertising_handles=self.advertising_handle, 863 ), 864 check_result=True, 865 ) 866 867 self.emit('stop_periodic') 868 869 async def remove(self) -> None: 870 await self.device.send_command( 871 HCI_LE_Remove_Advertising_Set_Command( 872 advertising_handle=self.advertising_handle 873 ), 874 check_result=True, 875 ) 876 del self.device.extended_advertising_sets[self.advertising_handle] 877 878 def on_termination(self, status: int) -> None: 879 self.enabled = False 880 
class PeriodicAdvertisingSync(EventEmitter):
    """
    State and controller commands for one periodic advertising synchronization.

    Events emitted by this class:
      * `state_change`: each time `state` is assigned.
      * `establishment`: the sync was established successfully.
      * `cancellation`: establishment ended with "operation cancelled by host".
      * `error`: establishment ended with any other non-success status.
      * `loss`: `on_loss` was called.
      * `periodic_advertisement`: a reassembled `PeriodicAdvertisement`.
      * `biginfo_advertisement`: a `BIGInfoAdvertisement`.
    """

    class State(Enum):
        INIT = 0
        PENDING = 1
        ESTABLISHED = 2
        CANCELLED = 3
        ERROR = 4
        LOST = 5
        TERMINATED = 6

    _state: State
    sync_handle: Optional[int]
    advertiser_address: Address
    sid: int
    skip: int
    sync_timeout: float  # Sync timeout, in seconds
    filter_duplicates: bool
    status: int
    advertiser_phy: int
    periodic_advertising_interval: int
    advertiser_clock_accuracy: int

    def __init__(
        self,
        device: Device,
        advertiser_address: Address,
        sid: int,
        skip: int,
        sync_timeout: float,
        filter_duplicates: bool,
    ) -> None:
        super().__init__()
        self._state = self.State.INIT
        self.sync_handle = None
        self.device = device
        self.advertiser_address = advertiser_address
        self.sid = sid
        self.skip = skip
        self.sync_timeout = sync_timeout
        self.filter_duplicates = filter_duplicates
        self.status = HCI_SUCCESS
        self.advertiser_phy = 0
        self.periodic_advertising_interval = 0
        self.advertiser_clock_accuracy = 0
        # Fragments of a report received so far (see on_periodic_advertising_report)
        self.data_accumulator = b''

    @property
    def state(self) -> State:
        return self._state

    @state.setter
    def state(self, state: State) -> None:
        logger.debug(f'{self} -> {state.name}')
        self._state = state
        self.emit('state_change')

    def _forget(self) -> None:
        """Drop the device's reference to this sync, if it still holds one."""
        if self in self.device.periodic_advertising_syncs:
            self.device.periodic_advertising_syncs.remove(self)

    async def establish(self) -> None:
        """
        Send the Create Sync command and move to the PENDING state.

        Raises:
            InvalidStateError: if the sync is not in the INIT state.
            HCI_StatusError: if the command status is not "pending".
        """
        if self.state != self.State.INIT:
            raise InvalidStateError('sync not in init state')

        options = HCI_LE_Periodic_Advertising_Create_Sync_Command.Options(0)
        if self.filter_duplicates:
            options |= (
                HCI_LE_Periodic_Advertising_Create_Sync_Command.Options.DUPLICATE_FILTERING_INITIALLY_ENABLED
            )

        response = await self.device.send_command(
            HCI_LE_Periodic_Advertising_Create_Sync_Command(
                options=options,
                advertising_sid=self.sid,
                advertiser_address_type=self.advertiser_address.address_type,
                advertiser_address=self.advertiser_address,
                skip=self.skip,
                # self.sync_timeout is in seconds; the command takes 10 ms units
                sync_timeout=int(self.sync_timeout * 100),
                sync_cte_type=0,
            )
        )
        if response.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(response)

        self.state = self.State.PENDING

    async def terminate(self) -> None:
        """
        Cancel a pending sync, or terminate an established/failed/lost one.

        Does nothing in the INIT, CANCELLED and TERMINATED states.
        """
        if self.state in (self.State.INIT, self.State.CANCELLED, self.State.TERMINATED):
            return

        if self.state == self.State.PENDING:
            self.state = self.State.CANCELLED
            response = await self.device.send_command(
                HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(),
            )
            if response.return_parameters == HCI_SUCCESS:
                self._forget()
            return

        if self.state in (self.State.ESTABLISHED, self.State.ERROR, self.State.LOST):
            self.state = self.State.TERMINATED
            if self.sync_handle is not None:
                await self.device.send_command(
                    HCI_LE_Periodic_Advertising_Terminate_Sync_Command(
                        sync_handle=self.sync_handle
                    )
                )
            # The error path of on_establishment has already removed this sync
            # from the device list, so an unconditional list.remove() would raise
            # ValueError here.
            self._forget()

    async def transfer(self, connection: Connection, service_data: int = 0) -> None:
        """Transfer this sync over `connection`; no-op without a sync handle."""
        if self.sync_handle is not None:
            await connection.transfer_periodic_sync(self.sync_handle, service_data)

    def on_establishment(
        self,
        status,
        sync_handle,
        advertiser_phy,
        periodic_advertising_interval,
        advertiser_clock_accuracy,
    ) -> None:
        """Process the outcome of a sync establishment attempt."""
        self.status = status

        if self.state == self.State.CANCELLED:
            # Somehow, we receive an established event after trying to cancel, most
            # likely because the cancel command was sent too late, when the sync was
            # already established, but before the established event was sent.
            # We need to automatically terminate.
            logger.debug(
                "received established event for cancelled sync, will terminate"
            )
            if status == HCI_SUCCESS:
                # terminate() only sends the Terminate Sync command when it knows
                # the handle, so it must be recorded before spawning it.
                self.sync_handle = sync_handle
            self.state = self.State.ESTABLISHED
            AsyncRunner.spawn(self.terminate())
            return

        if status == HCI_SUCCESS:
            self.sync_handle = sync_handle
            self.advertiser_phy = advertiser_phy
            self.periodic_advertising_interval = periodic_advertising_interval
            self.advertiser_clock_accuracy = advertiser_clock_accuracy
            self.state = self.State.ESTABLISHED
            self.emit('establishment')
            return

        # We don't need to keep a reference anymore
        self._forget()

        if status == HCI_OPERATION_CANCELLED_BY_HOST_ERROR:
            self.state = self.State.CANCELLED
            self.emit('cancellation')
            return

        self.state = self.State.ERROR
        self.emit('error')

    def on_loss(self) -> None:
        """Move to the LOST state and emit `loss`."""
        self.state = self.State.LOST
        self.emit('loss')

    def on_periodic_advertising_report(self, report) -> None:
        """
        Accumulate report data and emit `periodic_advertisement` once the report
        is no longer flagged as "incomplete, more to come".
        """
        self.data_accumulator += report.data
        if (
            report.data_status
            == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_MORE_TO_COME
        ):
            return

        self.emit(
            'periodic_advertisement',
            PeriodicAdvertisement(
                self.advertiser_address,
                self.sid,
                report.tx_power,
                report.rssi,
                is_truncated=(
                    report.data_status
                    == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME
                ),
                data_bytes=self.data_accumulator,
            ),
        )
        # Start from scratch for the next report
        self.data_accumulator = b''

    def on_biginfo_advertising_report(self, report) -> None:
        """Emit `biginfo_advertisement` for a BIGInfo report."""
        self.emit(
            'biginfo_advertisement',
            BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report),
        )

    def __str__(self) -> str:
        return (
            'PeriodicAdvertisingSync('
            f'state={self.state.name}, '
            f'sync_handle={self.sync_handle}, '
            f'sid={self.sid}, '
            f'skip={self.skip}, '
            f'filter_duplicates={self.filter_duplicates}'
            ')'
        )
class LePhyOptions:
    """
    Coded PHY preference holder.

    Converting an instance with `int()` yields the stored preference masked to
    its two low-order bits.
    """

    # Coded PHY preference
    ANY_CODED_PHY = 0
    PREFER_S_2_CODED_PHY = 1
    PREFER_S_8_CODED_PHY = 2

    def __init__(self, coded_phy_preference=0):
        self.coded_phy_preference = coded_phy_preference

    def __int__(self):
        # Only the two low-order bits are kept
        preference_mask = 0b11
        return self.coded_phy_preference & preference_mask
class Peer:
    """
    Pairs a connection with a GATT client created for it.

    Most methods simply delegate to the GATT client or to the connection.
    Used as an async context manager, entering discovers the services and the
    characteristics of each service; exiting does nothing.
    """

    def __init__(self, connection: Connection) -> None:
        self.connection = connection

        # Create a GATT client for the connection, and attach it to the connection
        client = gatt_client.Client(connection)
        self.gatt_client = client
        connection.gatt_client = client

    @property
    def services(self) -> List[gatt_client.ServiceProxy]:
        """Services currently known to the GATT client."""
        return self.gatt_client.services

    async def request_mtu(self, mtu: int) -> int:
        """Request an MTU and emit `connection_att_mtu_update` on the connection."""
        negotiated_mtu = await self.gatt_client.request_mtu(mtu)
        self.connection.emit('connection_att_mtu_update')
        return negotiated_mtu

    async def discover_service(
        self, uuid: Union[core.UUID, str]
    ) -> List[gatt_client.ServiceProxy]:
        """Delegate to the GATT client's `discover_service`."""
        return await self.gatt_client.discover_service(uuid)

    async def discover_services(
        self, uuids: Iterable[core.UUID] = ()
    ) -> List[gatt_client.ServiceProxy]:
        """Delegate to the GATT client's `discover_services`."""
        return await self.gatt_client.discover_services(uuids)

    async def discover_included_services(
        self, service: gatt_client.ServiceProxy
    ) -> List[gatt_client.ServiceProxy]:
        """Delegate to the GATT client's `discover_included_services`."""
        return await self.gatt_client.discover_included_services(service)

    async def discover_characteristics(
        self,
        uuids: Iterable[Union[core.UUID, str]] = (),
        service: Optional[gatt_client.ServiceProxy] = None,
    ) -> List[gatt_client.CharacteristicProxy]:
        """Delegate to the GATT client's `discover_characteristics`."""
        return await self.gatt_client.discover_characteristics(
            uuids=uuids, service=service
        )

    async def discover_descriptors(
        self,
        characteristic: Optional[gatt_client.CharacteristicProxy] = None,
        start_handle: Optional[int] = None,
        end_handle: Optional[int] = None,
    ):
        """Delegate to the GATT client's `discover_descriptors`."""
        return await self.gatt_client.discover_descriptors(
            characteristic, start_handle, end_handle
        )

    async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
        """Delegate to the GATT client's `discover_attributes`."""
        return await self.gatt_client.discover_attributes()

    async def discover_all(self):
        """Discover services, then their characteristics, then the descriptors."""
        await self.discover_services()

        # First pass: characteristics of every service
        for service in self.services:
            await self.discover_characteristics(service=service)

        # Second pass: descriptors of every characteristic
        for service in self.services:
            for characteristic in service.characteristics:
                await self.discover_descriptors(characteristic=characteristic)

    async def subscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
        prefer_notify: bool = True,
    ) -> None:
        """Delegate to the GATT client's `subscribe`."""
        return await self.gatt_client.subscribe(
            characteristic, subscriber, prefer_notify
        )

    async def unsubscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
    ) -> None:
        """Delegate to the GATT client's `unsubscribe`."""
        return await self.gatt_client.unsubscribe(characteristic, subscriber)

    async def read_value(
        self, attribute: Union[int, gatt_client.AttributeProxy]
    ) -> bytes:
        """Delegate to the GATT client's `read_value`."""
        return await self.gatt_client.read_value(attribute)

    async def write_value(
        self,
        attribute: Union[int, gatt_client.AttributeProxy],
        value: bytes,
        with_response: bool = False,
    ) -> None:
        """Delegate to the GATT client's `write_value`."""
        return await self.gatt_client.write_value(attribute, value, with_response)

    async def read_characteristics_by_uuid(
        self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
    ) -> List[bytes]:
        """Delegate to the GATT client's `read_characteristics_by_uuid`."""
        return await self.gatt_client.read_characteristics_by_uuid(uuid, service)

    def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
        """Delegate to the GATT client's `get_services_by_uuid`."""
        return self.gatt_client.get_services_by_uuid(uuid)

    def get_characteristics_by_uuid(
        self,
        uuid: core.UUID,
        service: Optional[Union[gatt_client.ServiceProxy, core.UUID]] = None,
    ) -> List[gatt_client.CharacteristicProxy]:
        """
        Look up characteristics by UUID.

        When `service` is itself a UUID, the characteristics found in every
        service with that UUID are concatenated, in service order.
        """
        if not isinstance(service, core.UUID):
            return self.gatt_client.get_characteristics_by_uuid(uuid, service)

        per_service = [
            self.get_characteristics_by_uuid(uuid, matching_service)
            for matching_service in self.get_services_by_uuid(service)
        ]
        return list(itertools.chain.from_iterable(per_service))

    def create_service_proxy(
        self, proxy_class: Type[_PROXY_CLASS]
    ) -> Optional[_PROXY_CLASS]:
        """Return `proxy_class.from_client(...)`, or None when that is falsy."""
        proxy = proxy_class.from_client(self.gatt_client)
        if not proxy:
            return None

        return cast(_PROXY_CLASS, proxy)

    async def discover_service_and_create_proxy(
        self, proxy_class: Type[_PROXY_CLASS]
    ) -> Optional[_PROXY_CLASS]:
        """Discover the proxy's service, then create a proxy for it, if found."""
        # Discover the first matching service and its characteristics
        services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
        if not services:
            return None

        await services[0].discover_characteristics()
        return self.create_service_proxy(proxy_class)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """Delegate to the connection's `sustain`."""
        await self.connection.sustain(timeout)

    # [Classic only]
    async def request_name(self) -> str:
        """Delegate to the connection's `request_remote_name`."""
        return await self.connection.request_remote_name()

    async def __aenter__(self):
        await self.discover_services()
        for service in self.services:
            await service.discover_characteristics()

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing to release
        pass

    def __str__(self) -> str:
        connection = self.connection
        return f'{connection.peer_address} as {connection.role_name}'
@dataclass
class ConnectionParametersPreferences:
    """
    Preferred connection parameters.

    Every field defaults to the matching DEVICE_DEFAULT_CONNECTION_* constant.
    `default` is a shared instance built with those defaults.
    """

    default: ClassVar[ConnectionParametersPreferences]
    connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
    connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
    max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
    supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
    min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
    max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH


# The class-level default can only be created once the class itself exists
ConnectionParametersPreferences.default = ConnectionParametersPreferences()


# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
    """A SCO link, identified by its handle, tied to an ACL connection."""

    device: Device
    acl_connection: Connection
    handle: int
    link_type: int
    sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The generated dataclass __init__ does not call the base class
        # initializer, so do it here.
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the device to disconnect this link."""
        await self.device.disconnect(self, reason)


# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
    """A CIS link, identified by its handle, tied to an ACL connection."""

    class State(IntEnum):
        PENDING = 0
        ESTABLISHED = 1

    device: Device
    acl_connection: Connection  # Based ACL connection
    handle: int  # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
    cis_id: int  # CIS ID assigned by Central device
    cig_id: int  # CIG ID assigned by Central device
    state: State = State.PENDING
    sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The generated dataclass __init__ does not call the base class
        # initializer, so do it here.
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the device to disconnect this link."""
        await self.device.disconnect(self, reason)
class Connection(CompositeEventEmitter):
    """
    A connection to a peer, owned by a Device.

    Most operations are thin wrappers that delegate to the owning device,
    passing this connection as the first argument.
    """

    device: Device
    handle: int
    transport: int
    self_address: Address
    self_resolvable_address: Optional[Address]
    peer_address: Address
    peer_resolvable_address: Optional[Address]
    peer_le_features: Optional[LeFeatureMask]
    role: int
    encryption: int
    authenticated: bool
    sc: bool
    link_key_type: int
    gatt_client: gatt_client.Client
    pairing_peer_io_capability: Optional[int]
    pairing_peer_authentication_requirements: Optional[int]

    @composite_listener
    class Listener:
        def on_disconnection(self, reason):
            pass

        def on_connection_parameters_update(self):
            pass

        def on_connection_parameters_update_failure(self, error):
            pass

        def on_connection_data_length_change(self):
            pass

        def on_connection_phy_update(self):
            pass

        def on_connection_phy_update_failure(self, error):
            pass

        def on_connection_att_mtu_update(self):
            pass

        def on_connection_encryption_change(self):
            pass

        def on_connection_encryption_key_refresh(self):
            pass

    def __init__(
        self,
        device,
        handle,
        transport,
        self_address,
        self_resolvable_address,
        peer_address,
        peer_resolvable_address,
        role,
        parameters,
        phy,
    ):
        super().__init__()
        self.device = device
        self.handle = handle
        self.transport = transport
        self.self_address = self_address
        self.self_resolvable_address = self_resolvable_address
        self.peer_address = peer_address
        self.peer_resolvable_address = peer_resolvable_address
        self.peer_name = None  # Classic only
        self.role = role
        self.parameters = parameters
        self.encryption = 0
        self.authenticated = False
        self.sc = False
        self.link_key_type = None
        self.phy = phy
        self.att_mtu = ATT_DEFAULT_MTU
        self.data_length = DEVICE_DEFAULT_DATA_LENGTH
        self.gatt_client = None  # Per-connection client
        self.gatt_server = (
            device.gatt_server
        )  # By default, use the device's shared server
        self.pairing_peer_io_capability = None
        self.pairing_peer_authentication_requirements = None
        self.peer_le_features = None

    # [Classic only]
    @classmethod
    def incomplete(cls, device, peer_address, role):
        """
        Instantiate an incomplete connection (ie. one waiting for a HCI Connection
        Complete event).
        Once received it shall be completed using the `.complete` method.
        """
        return cls(
            device,
            None,
            BT_BR_EDR_TRANSPORT,
            device.public_address,
            None,
            peer_address,
            None,
            role,
            None,
            None,
        )

    # [Classic only]
    def complete(self, handle, parameters):
        """
        Finish an incomplete connection upon completion.
        """
        assert self.handle is None
        assert self.transport == BT_BR_EDR_TRANSPORT
        self.handle = handle
        self.parameters = parameters

    @property
    def role_name(self):
        """Human-readable name for `role`."""
        if self.role is None:
            return 'NOT-SET'
        if self.role == BT_CENTRAL_ROLE:
            return 'CENTRAL'
        if self.role == BT_PERIPHERAL_ROLE:
            return 'PERIPHERAL'
        return f'UNKNOWN[{self.role}]'

    @property
    def is_encrypted(self):
        return self.encryption != 0

    @property
    def is_incomplete(self) -> bool:
        """True while the connection has no handle (see `incomplete`)."""
        return self.handle is None

    def send_l2cap_pdu(self, cid: int, pdu: bytes) -> None:
        self.device.send_l2cap_pdu(self.handle, cid, pdu)

    @deprecated("Please use create_l2cap_channel()")
    async def open_l2cap_channel(
        self,
        psm,
        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
    ):
        return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.ClassicChannelSpec
    ) -> l2cap.ClassicChannel: ...

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.LeCreditBasedChannelSpec
    ) -> l2cap.LeCreditBasedChannel: ...

    async def create_l2cap_channel(
        self, spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec]
    ) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
        return await self.device.create_l2cap_channel(connection=self, spec=spec)

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        await self.device.disconnect(self, reason)

    async def pair(self) -> None:
        return await self.device.pair(self)

    def request_pairing(self) -> None:
        return self.device.request_pairing(self)

    # [Classic only]
    async def authenticate(self) -> None:
        return await self.device.authenticate(self)

    async def encrypt(self, enable: bool = True) -> None:
        return await self.device.encrypt(self, enable)

    async def switch_role(self, role: int) -> None:
        return await self.device.switch_role(self, role)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """Idles the current task waiting for a disconnect or timeout"""

        abort = asyncio.get_running_loop().create_future()
        self.on('disconnection', abort.set_result)
        self.on('disconnection_failure', abort.set_exception)

        try:
            await asyncio.wait_for(self.device.abort_on('flush', abort), timeout)
        finally:
            # Always unregister, whether we were woken up, timed out or cancelled
            self.remove_listener('disconnection', abort.set_result)
            self.remove_listener('disconnection_failure', abort.set_exception)

    async def set_data_length(self, tx_octets, tx_time) -> None:
        return await self.device.set_data_length(self, tx_octets, tx_time)

    async def update_parameters(
        self,
        connection_interval_min,
        connection_interval_max,
        max_latency,
        supervision_timeout,
        use_l2cap=False,
    ):
        return await self.device.update_connection_parameters(
            self,
            connection_interval_min,
            connection_interval_max,
            max_latency,
            supervision_timeout,
            use_l2cap=use_l2cap,
        )

    async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
        return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)

    async def get_rssi(self):
        return await self.device.get_connection_rssi(self)

    async def get_phy(self):
        return await self.device.get_connection_phy(self)

    async def transfer_periodic_sync(
        self, sync_handle: int, service_data: int = 0
    ) -> None:
        await self.device.transfer_periodic_sync(self, sync_handle, service_data)

    # [Classic only]
    async def request_remote_name(self):
        return await self.device.request_remote_name(self)

    async def get_remote_le_features(self) -> LeFeatureMask:
        """[LE Only] Reads remote LE supported features.

        Returns:
            LE features supported by the remote device.
        """
        self.peer_le_features = await self.device.get_remote_le_features(self)
        return self.peer_le_features

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Only disconnect when the `async with` body completed without raising
        if exc_type is None:
            try:
                await self.disconnect()
            except HCI_StatusError as error:
                # Invalid parameter means the connection is no longer valid
                if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR:
                    raise

    def __str__(self):
        # A connection created with `incomplete()` has no handle yet, and
        # formatting None with ':04X' raises TypeError.
        if self.handle is None:
            handle = 'NOT-SET'
        else:
            handle = f'0x{self.handle:04X}'

        return (
            f'Connection(handle={handle}, '
            f'role={self.role_name}, '
            f'self_address={self.self_address}, '
            f'self_resolvable_address={self.self_resolvable_address}, '
            f'peer_address={self.peer_address}, '
            f'peer_resolvable_address={self.peer_resolvable_address})'
        )
@dataclass
class DeviceConfiguration:
    """
    Settings for a Device, with defaults, loadable from a dict or a JSON file.

    `gatt_services` is not a dataclass field: it is created as an empty list in
    `__post_init__`.
    """

    # Setup defaults
    name: str = DEVICE_DEFAULT_NAME
    address: Address = Address(DEVICE_DEFAULT_ADDRESS)
    class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE
    scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
    advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    le_enabled: bool = True
    le_simultaneous_enabled: bool = False
    le_privacy_enabled: bool = False
    le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT
    classic_enabled: bool = False
    classic_sc_enabled: bool = True
    classic_ssp_enabled: bool = True
    classic_smp_enabled: bool = True
    classic_accept_any: bool = True
    classic_interlaced_scan_enabled: bool = True
    connectable: bool = True
    discoverable: bool = True
    advertising_data: bytes = bytes(
        AdvertisingData(
            [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(DEVICE_DEFAULT_NAME, 'utf-8'))]
        )
    )
    irk: bytes = bytes(16)  # This really must be changed for any level of security
    keystore: Optional[str] = None
    address_resolution_offload: bool = False
    address_generation_offload: bool = False
    cis_enabled: bool = False
    identity_address_type: Optional[int] = None
    io_capability: int = pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT

    def __post_init__(self) -> None:
        self.gatt_services: List[Dict[str, Any]] = []

    def load_from_dict(self, config: Dict[str, Any]) -> None:
        """
        Update this configuration from `config`.

        The keys `address`, `irk`, `name`, `advertising_data` and
        `advertising_interval` get dedicated handling; every remaining key is
        set as an attribute with its value unchanged. `config` itself is not
        modified.
        """
        # Work on a copy, since handled keys are popped below
        config = copy.deepcopy(config)

        # Load simple properties
        if address := config.pop('address', None):
            self.address = Address(address)

        # Load or synthesize an IRK
        if irk := config.pop('irk', None):
            self.irk = bytes.fromhex(irk)
        elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
            # Construct an IRK from the address bytes
            # NOTE: this is not secure, but will always give the same IRK for the same
            # address
            address_bytes = bytes(self.address)
            self.irk = (address_bytes * 3)[:16]
        else:
            # Fallback - when both IRK and address are not set, randomly generate an IRK.
            self.irk = secrets.token_bytes(16)

        if (name := config.pop('name', None)) is not None:
            self.name = name

        # Load advertising data
        if advertising_data := config.pop('advertising_data', None):
            self.advertising_data = bytes.fromhex(advertising_data)
        elif name is not None:
            # No explicit advertising data: advertise the configured name
            self.advertising_data = bytes(
                AdvertisingData(
                    [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
                )
            )

        # Load advertising interval (for backward compatibility)
        if advertising_interval := config.pop('advertising_interval', None):
            self.advertising_interval_min = advertising_interval
            self.advertising_interval_max = advertising_interval
            if (
                'advertising_interval_max' in config
                or 'advertising_interval_min' in config
            ):
                # The min/max keys are applied by the generic loop below, and
                # so override the values just set.
                logger.warning(
                    'Trying to set both advertising_interval and '
                    'advertising_interval_min/max, advertising_interval will be '
                    'ignored.'
                )

        # Load data in primitive types.
        for key, value in config.items():
            setattr(self, key, value)

    def load_from_file(self, filename: str) -> None:
        """Load the configuration from a UTF-8 encoded JSON file."""
        with open(filename, 'r', encoding='utf-8') as file:
            self.load_from_dict(json.load(file))

    @classmethod
    def from_file(cls: Type[Self], filename: str) -> Self:
        """Create a default configuration and update it from a JSON file."""
        config = cls()
        config.load_from_file(filename)
        return config

    @classmethod
    def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
        """Create a default configuration and update it from a dict."""
        device_config = cls()
        device_config.load_from_dict(config)
        return device_config
# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
    """
    Wrap a method so that it is called with a connection instead of a handle.

    The connection is obtained with `self.lookup_connection(handle)`;
    ObjectLookupError is raised when that returns None.
    """

    @functools.wraps(function)
    def wrapper(self, connection_handle, *args, **kwargs):
        connection = self.lookup_connection(connection_handle)
        if connection is not None:
            return function(self, connection, *args, **kwargs)

        raise ObjectLookupError(
            f'no connection for handle: 0x{connection_handle:04x}'
        )

    return wrapper


# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
    """
    Wrap a method so that it is called with a connection instead of an address.

    `self.pending_connections` is consulted first, then the first entry of
    `self.connections` whose `peer_address` equals the address.
    ObjectLookupError is raised when neither yields a connection.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        connection = self.pending_connections.get(address, False)
        if not connection:
            for candidate in self.connections.values():
                if candidate.peer_address == address:
                    connection = candidate
                    break
            else:
                raise ObjectLookupError('no connection for address')

        return function(self, connection, *args, **kwargs)

    return wrapper
# Decorator that tries to convert the first argument from a bluetooth address to a
# connection
def try_with_connection_from_address(function):
    """
    Wrap a method so that it receives a connection ahead of the address.

    The lookup order is `self.pending_connections`, then the first entry of
    `self.connections` with a matching `peer_address`. When nothing matches,
    the wrapped method is still called, with None as the connection.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        connection = self.pending_connections.get(address, False)
        if not connection:
            connection = None
            for candidate in self.connections.values():
                if candidate.peer_address == address:
                    connection = candidate
                    break

        return function(self, connection, address, *args, **kwargs)

    return wrapper


# Decorator that converts the first argument from a sync handle to a periodic
# advertising sync object
def with_periodic_advertising_sync_from_handle(function):
    """
    Wrap a method so that it is called with a sync object instead of a handle.

    The sync is obtained with `self.lookup_periodic_advertising_sync(handle)`;
    ValueError is raised when that returns None.
    """

    @functools.wraps(function)
    def wrapper(self, sync_handle, *args, **kwargs):
        sync = self.lookup_periodic_advertising_sync(sync_handle)
        if sync is not None:
            return function(self, sync, *args, **kwargs)

        raise ValueError(
            f'no periodic advertising sync for handle: 0x{sync_handle:04x}'
        )

    return wrapper


# List of host event handlers for the Device class.
# (we define this list outside the class, because referencing a class in method
# decorators is not straightforward)
device_host_event_handlers: List[str] = []


# Decorator that adds a method to the list of event handlers for host events.
# This assumes that the method name starts with `on_`
def host_event_handler(function):
    """Record the method name, minus its first 3 characters, and return it."""
    event_name = function.__name__[3:]
    device_host_event_handlers.append(event_name)
    return function
1769 random_address: Address # Random private address that may change periodically 1770 public_address: Address # Public address that is globally unique (from controller) 1771 static_address: Address # Random static address that does not change once set 1772 classic_enabled: bool 1773 name: str 1774 class_of_device: int 1775 gatt_server: gatt_server.Server 1776 advertising_data: bytes 1777 scan_response_data: bytes 1778 connections: Dict[int, Connection] 1779 pending_connections: Dict[Address, Connection] 1780 classic_pending_accepts: Dict[ 1781 Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]] 1782 ] 1783 advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] 1784 periodic_advertising_syncs: List[PeriodicAdvertisingSync] 1785 config: DeviceConfiguration 1786 legacy_advertiser: Optional[LegacyAdvertiser] 1787 sco_links: Dict[int, ScoLink] 1788 cis_links: Dict[int, CisLink] 1789 _pending_cis: Dict[int, Tuple[int, int]] 1790 1791 @composite_listener 1792 class Listener: 1793 def on_advertisement(self, advertisement): 1794 pass 1795 1796 def on_inquiry_result(self, address, class_of_device, data, rssi): 1797 pass 1798 1799 def on_connection(self, connection): 1800 pass 1801 1802 def on_connection_failure(self, error): 1803 pass 1804 1805 def on_connection_request(self, bd_addr, class_of_device, link_type): 1806 pass 1807 1808 def on_characteristic_subscription( 1809 self, connection, characteristic, notify_enabled, indicate_enabled 1810 ): 1811 pass 1812 1813 @classmethod 1814 def with_hci( 1815 cls, 1816 name: str, 1817 address: Address, 1818 hci_source: TransportSource, 1819 hci_sink: TransportSink, 1820 ) -> Device: 1821 ''' 1822 Create a Device instance with a Host configured to communicate with a controller 1823 through an HCI source/sink 1824 ''' 1825 host = Host(controller_source=hci_source, controller_sink=hci_sink) 1826 return cls(name=name, address=address, host=host) 1827 1828 @classmethod 1829 def 
from_config_file(cls, filename: str) -> Device: 1830 config = DeviceConfiguration.from_file(filename) 1831 return cls(config=config) 1832 1833 @classmethod 1834 def from_config_with_hci( 1835 cls, 1836 config: DeviceConfiguration, 1837 hci_source: TransportSource, 1838 hci_sink: TransportSink, 1839 ) -> Device: 1840 host = Host(controller_source=hci_source, controller_sink=hci_sink) 1841 return cls(config=config, host=host) 1842 1843 @classmethod 1844 def from_config_file_with_hci( 1845 cls, filename: str, hci_source: TransportSource, hci_sink: TransportSink 1846 ) -> Device: 1847 config = DeviceConfiguration.from_file(filename) 1848 return cls.from_config_with_hci(config, hci_source, hci_sink) 1849 1850 def __init__( 1851 self, 1852 name: Optional[str] = None, 1853 address: Optional[Address] = None, 1854 config: Optional[DeviceConfiguration] = None, 1855 host: Optional[Host] = None, 1856 generic_access_service: bool = True, 1857 ) -> None: 1858 super().__init__() 1859 1860 self._host = None 1861 self.powered_on = False 1862 self.auto_restart_inquiry = True 1863 self.command_timeout = 10 # seconds 1864 self.gatt_server = gatt_server.Server(self) 1865 self.sdp_server = sdp.Server(self) 1866 self.l2cap_channel_manager = l2cap.ChannelManager( 1867 [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS] 1868 ) 1869 self.advertisement_accumulators = {} # Accumulators, by address 1870 self.periodic_advertising_syncs = [] 1871 self.scanning = False 1872 self.scanning_is_passive = False 1873 self.discovering = False 1874 self.le_connecting = False 1875 self.disconnecting = False 1876 self.connections = {} # Connections, by connection handle 1877 self.pending_connections = {} # Connections, by BD address (BR/EDR only) 1878 self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only) 1879 self.cis_links = {} # CisLinks, by connection handle (LE only) 1880 self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle 1881 self.classic_enabled = False 1882 
self.inquiry_response = None 1883 self.address_resolver = None 1884 self.classic_pending_accepts = { 1885 Address.ANY: [] 1886 } # Futures, by BD address OR [Futures] for Address.ANY 1887 1888 # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated. 1889 if sys.version_info >= (3, 10): 1890 self._cis_lock = asyncio.Lock() 1891 else: 1892 self._cis_lock = AsyncExitStack() 1893 1894 # Own address type cache 1895 self.connect_own_address_type = None 1896 1897 # Use the initial config or a default 1898 config = config or DeviceConfiguration() 1899 self.config = config 1900 1901 self.name = config.name 1902 self.public_address = Address.ANY 1903 self.random_address = config.address 1904 self.static_address = config.address 1905 self.class_of_device = config.class_of_device 1906 self.keystore = None 1907 self.irk = config.irk 1908 self.le_enabled = config.le_enabled 1909 self.le_simultaneous_enabled = config.le_simultaneous_enabled 1910 self.le_privacy_enabled = config.le_privacy_enabled 1911 self.le_rpa_timeout = config.le_rpa_timeout 1912 self.le_rpa_periodic_update_task: Optional[asyncio.Task] = None 1913 self.classic_enabled = config.classic_enabled 1914 self.cis_enabled = config.cis_enabled 1915 self.classic_sc_enabled = config.classic_sc_enabled 1916 self.classic_ssp_enabled = config.classic_ssp_enabled 1917 self.classic_smp_enabled = config.classic_smp_enabled 1918 self.classic_interlaced_scan_enabled = config.classic_interlaced_scan_enabled 1919 self.discoverable = config.discoverable 1920 self.connectable = config.connectable 1921 self.classic_accept_any = config.classic_accept_any 1922 self.address_resolution_offload = config.address_resolution_offload 1923 self.address_generation_offload = config.address_generation_offload 1924 1925 # Extended advertising. 1926 self.extended_advertising_sets: Dict[int, AdvertisingSet] = {} 1927 self.connecting_extended_advertising_sets: Dict[int, AdvertisingSet] = {} 1928 1929 # Legacy advertising. 
1930 # The advertising and scan response data, as well as the advertising interval 1931 # values are stored as properties of this object for convenience so that they 1932 # can be initialized from a config object, and for backward compatibility for 1933 # client code that may set those values directly before calling 1934 # start_advertising(). 1935 self.legacy_advertising_set: Optional[AdvertisingSet] = None 1936 self.legacy_advertiser: Optional[LegacyAdvertiser] = None 1937 self.advertising_data = config.advertising_data 1938 self.scan_response_data = config.scan_response_data 1939 self.advertising_interval_min = config.advertising_interval_min 1940 self.advertising_interval_max = config.advertising_interval_max 1941 1942 for service in config.gatt_services: 1943 characteristics = [] 1944 for characteristic in service.get("characteristics", []): 1945 descriptors = [] 1946 for descriptor in characteristic.get("descriptors", []): 1947 # Leave this check until 5/25/2023 1948 if descriptor.get("permission", False): 1949 raise Exception( 1950 "Error parsing Device Config's GATT Services. 
" 1951 "The key 'permission' must be renamed to 'permissions'" 1952 ) 1953 new_descriptor = Descriptor( 1954 attribute_type=descriptor["descriptor_type"], 1955 permissions=descriptor["permissions"], 1956 ) 1957 descriptors.append(new_descriptor) 1958 new_characteristic = Characteristic( 1959 uuid=characteristic["uuid"], 1960 properties=Characteristic.Properties.from_string( 1961 characteristic["properties"] 1962 ), 1963 permissions=characteristic["permissions"], 1964 descriptors=descriptors, 1965 ) 1966 characteristics.append(new_characteristic) 1967 new_service = Service(uuid=service["uuid"], characteristics=characteristics) 1968 self.gatt_server.add_service(new_service) 1969 1970 # If a name is passed, override the name from the config 1971 if name: 1972 self.name = name 1973 1974 # If an address is passed, override the address from the config 1975 if address: 1976 if isinstance(address, str): 1977 address = Address(address) 1978 self.random_address = address 1979 self.static_address = address 1980 1981 # Setup SMP 1982 self.smp_manager = smp.Manager( 1983 self, 1984 pairing_config_factory=lambda connection: pairing.PairingConfig( 1985 identity_address_type=( 1986 pairing.PairingConfig.AddressType(self.config.identity_address_type) 1987 if self.config.identity_address_type 1988 else None 1989 ), 1990 delegate=pairing.PairingDelegate( 1991 io_capability=pairing.PairingDelegate.IoCapability( 1992 self.config.io_capability 1993 ) 1994 ), 1995 ), 1996 ) 1997 1998 self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu) 1999 2000 # Register the SDP server with the L2CAP Channel Manager 2001 self.sdp_server.register(self.l2cap_channel_manager) 2002 2003 self.add_default_services(generic_access_service) 2004 self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu) 2005 2006 # Forward some events 2007 setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription') 2008 2009 # Set the initial host 2010 if host: 2011 
def find_connection_by_bd_addr(
    self,
    bd_addr: Address,
    transport: Optional[int] = None,
    check_address_type: bool = False,
) -> Optional[Connection]:
    """Return the first connection whose peer address matches `bd_addr`.

    Args:
        bd_addr:
            Address to look for. Addresses are compared by their `to_bytes()`
            value.
        transport:
            If not None, only a connection whose `transport` equals this value
            is returned.
        check_address_type:
            If True, the peer's `address_type` must also equal that of
            `bd_addr`.

    Returns:
        The first matching connection, in the iteration order of
        `self.connections`, or None if there is no match.
    """
    for candidate in self.connections.values():
        peer = candidate.peer_address

        # The raw address bytes must match.
        if peer.to_bytes() != bd_addr.to_bytes():
            continue

        # Optionally, the address type must match too.
        if check_address_type and peer.address_type != bd_addr.address_type:
            continue

        # Optionally, restrict the search to one transport.
        if transport is not None and candidate.transport != transport:
            continue

        return candidate

    return None
self.periodic_advertising_syncs 2080 if sync.sync_handle == sync_handle 2081 ), 2082 None, 2083 ) 2084 2085 @deprecated("Please use create_l2cap_server()") 2086 def register_l2cap_server(self, psm, server) -> int: 2087 return self.l2cap_channel_manager.register_server(psm, server) 2088 2089 @deprecated("Please use create_l2cap_server()") 2090 def register_l2cap_channel_server( 2091 self, 2092 psm, 2093 server, 2094 max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS, 2095 mtu=DEVICE_DEFAULT_L2CAP_COC_MTU, 2096 mps=DEVICE_DEFAULT_L2CAP_COC_MPS, 2097 ): 2098 return self.l2cap_channel_manager.register_le_coc_server( 2099 psm, server, max_credits, mtu, mps 2100 ) 2101 2102 @deprecated("Please use create_l2cap_channel()") 2103 async def open_l2cap_channel( 2104 self, 2105 connection, 2106 psm, 2107 max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS, 2108 mtu=DEVICE_DEFAULT_L2CAP_COC_MTU, 2109 mps=DEVICE_DEFAULT_L2CAP_COC_MPS, 2110 ): 2111 return await self.l2cap_channel_manager.open_le_coc( 2112 connection, psm, max_credits, mtu, mps 2113 ) 2114 2115 @overload 2116 async def create_l2cap_channel( 2117 self, 2118 connection: Connection, 2119 spec: l2cap.ClassicChannelSpec, 2120 ) -> l2cap.ClassicChannel: ... 2121 2122 @overload 2123 async def create_l2cap_channel( 2124 self, 2125 connection: Connection, 2126 spec: l2cap.LeCreditBasedChannelSpec, 2127 ) -> l2cap.LeCreditBasedChannel: ... 
async def create_l2cap_channel(
    self,
    connection: Connection,
    spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
    """Create an L2CAP channel on a connection.

    The kind of channel that is created depends on the type of `spec`.

    Args:
        connection:
            The connection on which to create the channel.
        spec:
            An `l2cap.ClassicChannelSpec` to create a classic channel, or an
            `l2cap.LeCreditBasedChannelSpec` to create an LE credit-based
            channel.

    Returns:
        The channel returned by the L2CAP channel manager.

    Raises:
        InvalidArgumentError: if `spec` is not one of the supported spec types.
    """
    if isinstance(spec, l2cap.ClassicChannelSpec):
        return await self.l2cap_channel_manager.create_classic_channel(
            connection=connection, spec=spec
        )
    if isinstance(spec, l2cap.LeCreditBasedChannelSpec):
        return await self.l2cap_channel_manager.create_le_credit_based_channel(
            connection=connection, spec=spec
        )

    # Reject unknown spec types explicitly (same error as create_l2cap_server)
    # rather than silently returning None.
    raise InvalidArgumentError(f'Unexpected mode {spec}')
def create_l2cap_server(
    self,
    spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
    handler: Union[
        Callable[[l2cap.ClassicChannel], Any],
        Callable[[l2cap.LeCreditBasedChannel], Any],
        None,
    ] = None,
) -> Union[l2cap.ClassicChannelServer, l2cap.LeCreditBasedChannelServer]:
    """Create an L2CAP server.

    The kind of server that is created depends on the type of `spec`.

    Args:
        spec:
            An `l2cap.ClassicChannelSpec` to create a classic server, or an
            `l2cap.LeCreditBasedChannelSpec` to create an LE credit-based
            server.
        handler:
            Optional callable that is passed to the L2CAP channel manager
            along with the spec.

    Returns:
        The server returned by the L2CAP channel manager.

    Raises:
        InvalidArgumentError: if `spec` is not one of the supported spec types.
    """
    if isinstance(spec, l2cap.ClassicChannelSpec):
        classic_handler = cast(Callable[[l2cap.ClassicChannel], Any], handler)
        return self.l2cap_channel_manager.create_classic_server(
            spec=spec, handler=classic_handler
        )

    if isinstance(spec, l2cap.LeCreditBasedChannelSpec):
        le_handler = cast(Callable[[l2cap.LeCreditBasedChannel], Any], handler)
        return self.l2cap_channel_manager.create_le_credit_based_server(
            handler=le_handler, spec=spec
        )

    raise InvalidArgumentError(f'Unexpected mode {spec}')
Command {command.name} timed out') 2189 raise CommandTimeoutError() from error 2190 2191 async def power_on(self) -> None: 2192 # Reset the controller 2193 await self.host.reset() 2194 2195 # Try to get the public address from the controller 2196 response = await self.send_command(HCI_Read_BD_ADDR_Command()) 2197 if response.return_parameters.status == HCI_SUCCESS: 2198 logger.debug( 2199 color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow') 2200 ) 2201 self.public_address = response.return_parameters.bd_addr 2202 2203 # Instantiate the Key Store (we do this here rather than at __init__ time 2204 # because some Key Store implementations use the public address as a namespace) 2205 if self.keystore is None: 2206 self.keystore = KeyStore.create_for_device(self) 2207 2208 # Finish setting up SMP based on post-init configurable options 2209 if self.classic_smp_enabled: 2210 self.l2cap_channel_manager.register_fixed_channel( 2211 smp.SMP_BR_CID, self.on_smp_pdu 2212 ) 2213 2214 if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): 2215 await self.send_command( 2216 HCI_Write_LE_Host_Support_Command( 2217 le_supported_host=int(self.le_enabled), 2218 simultaneous_le_host=int(self.le_simultaneous_enabled), 2219 ), 2220 check_result=True, 2221 ) 2222 2223 if self.le_enabled: 2224 # Generate a random address if not set. 
2225 if self.static_address == Address.ANY_RANDOM: 2226 self.static_address = Address.generate_static_address() 2227 2228 # If LE Privacy is enabled, generate an RPA 2229 if self.le_privacy_enabled: 2230 self.random_address = Address.generate_private_address(self.irk) 2231 logger.info(f'Initial RPA: {self.random_address}') 2232 if self.le_rpa_timeout > 0: 2233 # Start a task to periodically generate a new RPA 2234 self.le_rpa_periodic_update_task = asyncio.create_task( 2235 self._run_rpa_periodic_update() 2236 ) 2237 else: 2238 self.random_address = self.static_address 2239 2240 if self.random_address != Address.ANY_RANDOM: 2241 logger.debug( 2242 color( 2243 f'LE Random Address: {self.random_address}', 2244 'yellow', 2245 ) 2246 ) 2247 await self.send_command( 2248 HCI_LE_Set_Random_Address_Command( 2249 random_address=self.random_address 2250 ), 2251 check_result=True, 2252 ) 2253 2254 # Load the address resolving list 2255 if self.keystore: 2256 await self.refresh_resolving_list() 2257 2258 # Enable address resolution 2259 if self.address_resolution_offload: 2260 await self.send_command( 2261 HCI_LE_Set_Address_Resolution_Enable_Command( 2262 address_resolution_enable=1 2263 ), 2264 check_result=True, 2265 ) 2266 2267 if self.cis_enabled: 2268 await self.send_command( 2269 HCI_LE_Set_Host_Feature_Command( 2270 bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM, 2271 bit_value=1, 2272 ), 2273 check_result=True, 2274 ) 2275 2276 if self.classic_enabled: 2277 await self.send_command( 2278 HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) 2279 ) 2280 await self.send_command( 2281 HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) 2282 ) 2283 await self.send_command( 2284 HCI_Write_Simple_Pairing_Mode_Command( 2285 simple_pairing_mode=int(self.classic_ssp_enabled) 2286 ) 2287 ) 2288 await self.send_command( 2289 HCI_Write_Secure_Connections_Host_Support_Command( 2290 secure_connections_host_support=int(self.classic_sc_enabled) 2291 
async def update_rpa(self) -> bool:
    """
    Try to update the RPA.

    A new private address is generated from this device's IRK and sent to the
    controller. The update is skipped while the device is advertising,
    scanning or in the process of establishing an LE connection.

    Returns:
        True if the RPA was updated, False if it could not be updated.
    """

    # Check if this is a good time to rotate the address
    if self.is_advertising or self.is_scanning or self.is_le_connecting:
        logger.debug('skipping RPA update')
        return False

    random_address = Address.generate_private_address(self.irk)

    # Send the newly generated address to the controller (not the current
    # one), so that the controller and self.random_address stay in sync.
    response = await self.send_command(
        HCI_LE_Set_Random_Address_Command(random_address=random_address)
    )
    if response.return_parameters == HCI_SUCCESS:
        logger.info(f'new RPA: {random_address}')
        self.random_address = random_address
        return True
    else:
        logger.warning(f'failed to set RPA: {response.return_parameters}')
        return False


async def _run_rpa_periodic_update(self) -> None:
    """Update the RPA periodically.

    Sleeps for `self.le_rpa_timeout` between attempts, and keeps running
    until `self.le_rpa_timeout` becomes 0.
    """
    while self.le_rpa_timeout != 0:
        await asyncio.sleep(self.le_rpa_timeout)
        # update_rpa() is a coroutine function: it must be awaited, otherwise
        # the update never runs and the (always truthy) coroutine object
        # hides failures.
        if not await self.update_rpa():
            logger.debug("periodic RPA update failed")
def supports_le_phy(self, phy: int) -> bool:
    """Check whether an LE PHY is supported.

    Args:
        phy:
            One of HCI_LE_1M_PHY, HCI_LE_2M_PHY or HCI_LE_CODED_PHY.

    Returns:
        True for HCI_LE_1M_PHY. For the other PHYs, the result of checking the
        corresponding LE feature with `supports_le_features`.

    Raises:
        InvalidArgumentError: if `phy` is not one of the values listed above.
    """
    # The 1M PHY is always reported as supported, without a feature check.
    if phy == HCI_LE_1M_PHY:
        return True

    # Map the PHY to the LE feature that advertises its support.
    if phy == HCI_LE_2M_PHY:
        required_feature = LeFeatureMask.LE_2M_PHY
    elif phy == HCI_LE_CODED_PHY:
        required_feature = LeFeatureMask.LE_CODED_PHY
    else:
        raise InvalidArgumentError('invalid PHY')

    return self.supports_le_features(required_feature)
2427 2428 Args: 2429 advertising_type: 2430 Type of advertising events. 2431 target: 2432 Peer address for directed advertising target. 2433 (Ignored if `advertising_type` is not directed) 2434 own_address_type: 2435 Own address type to use in the advertising. 2436 auto_restart: 2437 Whether the advertisement will be restarted after disconnection. 2438 advertising_data: 2439 Raw advertising data. If None, the value of the property 2440 self.advertising_data will be used. 2441 scan_response_data: 2442 Raw scan response. If None, the value of the property 2443 self.scan_response_data will be used. 2444 advertising_interval_min: 2445 Minimum advertising interval, in milliseconds. If None, the value of the 2446 property self.advertising_interval_min will be used. 2447 advertising_interval_max: 2448 Maximum advertising interval, in milliseconds. If None, the value of the 2449 property self.advertising_interval_max will be used. 2450 """ 2451 # Update backing properties. 2452 if advertising_data is not None: 2453 self.advertising_data = advertising_data 2454 if scan_response_data is not None: 2455 self.scan_response_data = scan_response_data 2456 if advertising_interval_min is not None: 2457 self.advertising_interval_min = advertising_interval_min 2458 if advertising_interval_max is not None: 2459 self.advertising_interval_max = advertising_interval_max 2460 2461 # Decide what peer address to use 2462 if advertising_type.is_directed: 2463 if target is None: 2464 raise InvalidArgumentError('directed advertising requires a target') 2465 peer_address = target 2466 else: 2467 peer_address = Address.ANY 2468 2469 # If we're already advertising, stop now because we'll be re-creating 2470 # a new advertiser or advertising set. 2471 await self.stop_advertising() 2472 assert self.legacy_advertiser is None 2473 assert self.legacy_advertising_set is None 2474 2475 if self.supports_le_extended_advertising: 2476 # Use extended advertising commands with legacy PDUs. 
async def stop_advertising(self) -> None:
    """Stop legacy advertising.

    If a legacy advertising set exists, it is stopped (only if it is enabled),
    removed, and forgotten. Otherwise, if a legacy advertiser exists, it is
    stopped and forgotten. If neither exists, nothing happens.
    """
    advertising_set = self.legacy_advertising_set
    if advertising_set:
        # Extended advertising commands were used: tear down the set.
        if advertising_set.enabled:
            await advertising_set.stop()
        await advertising_set.remove()
        self.legacy_advertising_set = None
        return

    advertiser = self.legacy_advertiser
    if advertiser:
        # Legacy advertising commands were used.
        await advertiser.stop()
        self.legacy_advertiser = None
AdvertisingSet: 2534 """ 2535 Create an advertising set. 2536 2537 This method allows the creation of advertising sets for controllers that 2538 support extended advertising. 2539 2540 Args: 2541 advertising_parameters: 2542 The parameters to use for this set. If None, default parameters are used. 2543 random_address: 2544 The random address to use (only relevant when the parameters specify that 2545 own_address_type is random). 2546 advertising_data: 2547 Initial value for the set's advertising data. 2548 scan_response_data: 2549 Initial value for the set's scan response data. 2550 periodic_advertising_parameters: 2551 The parameters to use for periodic advertising (if needed). 2552 periodic_advertising_data: 2553 Initial value for the set's periodic advertising data. 2554 auto_start: 2555 True if the set should be automatically started upon creation. 2556 auto_restart: 2557 True if the set should be automatically restated after a disconnection. 2558 2559 Returns: 2560 An AdvertisingSet instance. 2561 """ 2562 # Instantiate default values 2563 if advertising_parameters is None: 2564 advertising_parameters = AdvertisingParameters() 2565 2566 if ( 2567 not advertising_parameters.advertising_event_properties.is_legacy 2568 and advertising_data 2569 and scan_response_data 2570 ): 2571 raise InvalidArgumentError( 2572 "Extended advertisements can't have both data and scan \ 2573 response data" 2574 ) 2575 2576 # Allocate a new handle 2577 try: 2578 advertising_handle = next( 2579 handle 2580 for handle in range( 2581 DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, 2582 DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, 2583 ) 2584 if handle not in self.extended_advertising_sets 2585 ) 2586 except StopIteration as exc: 2587 raise OutOfResourcesError( 2588 "all valid advertising handles already in use" 2589 ) from exc 2590 2591 # Use the device's random address if a random address is needed but none was 2592 # provided. 
2593 if ( 2594 advertising_parameters.own_address_type 2595 in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) 2596 and random_address is None 2597 ): 2598 random_address = self.random_address 2599 2600 # Create the object that represents the set. 2601 advertising_set = AdvertisingSet( 2602 device=self, 2603 advertising_handle=advertising_handle, 2604 auto_restart=auto_restart, 2605 random_address=random_address, 2606 advertising_parameters=advertising_parameters, 2607 advertising_data=advertising_data, 2608 scan_response_data=scan_response_data, 2609 periodic_advertising_parameters=periodic_advertising_parameters, 2610 periodic_advertising_data=periodic_advertising_data, 2611 ) 2612 2613 # Create the set in the controller. 2614 await advertising_set.set_advertising_parameters(advertising_parameters) 2615 2616 # Update the set in the controller. 2617 try: 2618 if random_address: 2619 await advertising_set.set_random_address(random_address) 2620 2621 if advertising_data: 2622 await advertising_set.set_advertising_data(advertising_data) 2623 2624 if scan_response_data: 2625 await advertising_set.set_scan_response_data(scan_response_data) 2626 2627 if periodic_advertising_parameters: 2628 # TODO: call LE Set Periodic Advertising Parameters command 2629 raise NotImplementedError('periodic advertising not yet supported') 2630 2631 if periodic_advertising_data: 2632 # TODO: call LE Set Periodic Advertising Data command 2633 raise NotImplementedError('periodic advertising not yet supported') 2634 2635 except HCI_Error as error: 2636 # Remove the advertising set so that it doesn't stay dangling in the 2637 # controller. 2638 await self.send_command( 2639 HCI_LE_Remove_Advertising_Set_Command( 2640 advertising_handle=advertising_handle 2641 ), 2642 check_result=False, 2643 ) 2644 raise error 2645 2646 # Remember the set. 2647 self.extended_advertising_sets[advertising_handle] = advertising_set 2648 2649 # Try to start the set if requested. 
async def start_scanning(
    self,
    legacy: bool = False,
    active: bool = True,
    scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
    scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW,  # Scan window in ms
    own_address_type: int = OwnAddressType.RANDOM,
    filter_duplicates: bool = False,
    scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
) -> None:
    """Start LE scanning.

    Extended scanning commands are used when `legacy` is False and the
    controller supports LE extended advertising. Otherwise, legacy scanning
    commands are used.

    Args:
        legacy:
            If True, always use the legacy scanning commands.
        active:
            True for active scanning, False for passive scanning.
        scan_interval:
            Scan interval, in milliseconds. Must be >= `scan_window`.
        scan_window:
            Scan window, in milliseconds.
        own_address_type:
            Own address type to use when scanning.
        filter_duplicates:
            Whether the controller is asked to filter duplicates.
        scanning_phys:
            PHYs to scan on (only used with extended scanning commands). Only
            HCI_LE_1M_PHY and HCI_LE_CODED_PHY are considered, and
            HCI_LE_CODED_PHY is ignored if the LE_CODED_PHY feature is not
            supported. This list is only read, never modified.

    Raises:
        InvalidArgumentError: if `scan_interval` or `scan_window` is out of
            range, if `scan_interval` < `scan_window`, or if no scanning PHY
            ends up being enabled.
    """
    # Check that the arguments are legal
    if scan_interval < scan_window:
        raise InvalidArgumentError('scan_interval must be >= scan_window')
    if (
        scan_interval < DEVICE_MIN_SCAN_INTERVAL
        or scan_interval > DEVICE_MAX_SCAN_INTERVAL
    ):
        raise InvalidArgumentError('scan_interval out of range')
    if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
        raise InvalidArgumentError('scan_window out of range')

    # Convert the millisecond values to the units of 0.625ms passed to the
    # HCI commands. The interval and the window are distinct parameters.
    hci_scan_interval = int(scan_interval / 0.625)
    hci_scan_window = int(scan_window / 0.625)

    # Reset the accumulators
    self.advertisement_accumulators = {}

    # Enable scanning
    if not legacy and self.supports_le_extended_advertising:
        # Set the scanning parameters
        scan_type = (
            HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
            if active
            else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
        )
        scanning_filter_policy = (
            HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
        )  # TODO: support other types

        # Build the PHY bitmask; the per-PHY parameter lists sent below have
        # one entry per enabled PHY.
        scanning_phy_count = 0
        scanning_phys_bits = 0
        if HCI_LE_1M_PHY in scanning_phys:
            scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
            scanning_phy_count += 1
        if HCI_LE_CODED_PHY in scanning_phys:
            if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
                scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
                scanning_phy_count += 1

        if scanning_phy_count == 0:
            raise InvalidArgumentError('at least one scanning PHY must be enabled')

        await self.send_command(
            HCI_LE_Set_Extended_Scan_Parameters_Command(
                own_address_type=own_address_type,
                scanning_filter_policy=scanning_filter_policy,
                scanning_phys=scanning_phys_bits,
                scan_types=[scan_type] * scanning_phy_count,
                scan_intervals=[hci_scan_interval] * scanning_phy_count,
                scan_windows=[hci_scan_window] * scanning_phy_count,
            ),
            check_result=True,
        )

        # Enable scanning
        await self.send_command(
            HCI_LE_Set_Extended_Scan_Enable_Command(
                enable=1,
                filter_duplicates=1 if filter_duplicates else 0,
                duration=0,  # TODO allow other values
                period=0,  # TODO allow other values
            ),
            check_result=True,
        )
    else:
        # Set the scanning parameters
        scan_type = (
            HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING
            if active
            else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
        )
        await self.send_command(
            # pylint: disable=line-too-long
            HCI_LE_Set_Scan_Parameters_Command(
                le_scan_type=scan_type,
                le_scan_interval=hci_scan_interval,
                le_scan_window=hci_scan_window,
                own_address_type=own_address_type,
                scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
            ),
            check_result=True,
        )

        # Enable scanning
        await self.send_command(
            HCI_LE_Set_Scan_Enable_Command(
                le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
            ),
            check_result=True,
        )

    self.scanning_is_passive = not active
    self.scanning = True
async def _update_periodic_advertising_syncs(self) -> None:
    """Start the next periodic advertising sync, if one can be started.

    Nothing is started while any sync is in the PENDING state. Otherwise, the
    first sync found in the INIT state (if any) is established.
    """
    # Only one sync may be pending at a time.
    for sync in self.periodic_advertising_syncs:
        if sync.state == PeriodicAdvertisingSync.State.PENDING:
            logger.debug("at least one sync pending, nothing to update yet")
            return

    # Establish the first sync that is still waiting to be started.
    for sync in self.periodic_advertising_syncs:
        if sync.state == PeriodicAdvertisingSync.State.INIT:
            await sync.establish()
            return
2883 periodic_advertising_interval, 2884 advertiser_clock_accuracy, 2885 ) 2886 2887 AsyncRunner.spawn(self._update_periodic_advertising_syncs()) 2888 2889 return 2890 2891 logger.warning( 2892 "periodic advertising sync establishment for unknown address/sid" 2893 ) 2894 2895 @host_event_handler 2896 @with_periodic_advertising_sync_from_handle 2897 def on_periodic_advertising_sync_loss( 2898 self, periodic_advertising_sync: PeriodicAdvertisingSync 2899 ): 2900 periodic_advertising_sync.on_loss() 2901 2902 @host_event_handler 2903 @with_periodic_advertising_sync_from_handle 2904 def on_periodic_advertising_report( 2905 self, 2906 periodic_advertising_sync: PeriodicAdvertisingSync, 2907 report: HCI_LE_Periodic_Advertising_Report_Event, 2908 ): 2909 periodic_advertising_sync.on_periodic_advertising_report(report) 2910 2911 @host_event_handler 2912 @with_periodic_advertising_sync_from_handle 2913 def on_biginfo_advertising_report( 2914 self, 2915 periodic_advertising_sync: PeriodicAdvertisingSync, 2916 report: HCI_LE_BIGInfo_Advertising_Report_Event, 2917 ): 2918 periodic_advertising_sync.on_biginfo_advertising_report(report) 2919 2920 async def start_discovery(self, auto_restart: bool = True) -> None: 2921 await self.send_command( 2922 HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), 2923 check_result=True, 2924 ) 2925 2926 response = await self.send_command( 2927 HCI_Inquiry_Command( 2928 lap=HCI_GENERAL_INQUIRY_LAP, 2929 inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH, 2930 num_responses=0, # Unlimited number of responses. 
async def stop_discovery(self) -> None:
    """
    Stop a classic inquiry, if one is running, and reset the discovery state.

    The inquiry cancel command is only sent when `discovering` is set. The
    `auto_restart_inquiry` and `discovering` flags are reset in all cases.
    """
    inquiry_running = self.discovering
    if inquiry_running:
        await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)

    self.auto_restart_inquiry = True
    self.discovering = False


@host_event_handler
def on_inquiry_result(self, address, class_of_device, data, rssi):
    """Emit an 'inquiry_result' event, with the raw data parsed."""
    parsed_data = AdvertisingData.from_bytes(data)
    self.emit('inquiry_result', address, class_of_device, parsed_data, rssi)


async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
    """
    Enable or disable the classic inquiry scan and page scan.

    The two flags are combined into a single value: bit 0 (0x01) for the
    inquiry scan and bit 1 (0x02) for the page scan.

    Returns:
      The result of sending the HCI_Write_Scan_Enable command.
    """
    inquiry_bit = 0x01 if inquiry_scan_enabled else 0x00
    page_bit = 0x02 if page_scan_enabled else 0x00
    command = HCI_Write_Scan_Enable_Command(scan_enable=inquiry_bit | page_bit)

    return await self.send_command(command)


async def set_discoverable(self, discoverable: bool = True) -> None:
    """
    Set whether this device is discoverable.

    The flag is always recorded. The controller is only updated when classic
    is enabled: the extended inquiry response is written (a response with
    just the device name is synthesized if none is set), then the scan
    settings are updated.
    """
    self.discoverable = discoverable
    if not self.classic_enabled:
        return

    # Synthesize an inquiry response if none is set already
    if self.inquiry_response is None:
        name_entry = (
            AdvertisingData.COMPLETE_LOCAL_NAME,
            bytes(self.name, 'utf-8'),
        )
        self.inquiry_response = bytes(AdvertisingData([name_entry]))

    # Update the controller
    write_response_command = HCI_Write_Extended_Inquiry_Response_Command(
        fec_required=0, extended_inquiry_response=self.inquiry_response
    )
    await self.send_command(write_response_command, check_result=True)
    await self.set_scan_enable(
        inquiry_scan_enabled=self.discoverable,
        page_scan_enabled=self.connectable,
    )


async def set_connectable(self, connectable: bool = True) -> None:
    """
    Set whether this device is connectable.

    The flag is always recorded. The scan settings of the controller are only
    updated when classic is enabled.
    """
    self.connectable = connectable
    if not self.classic_enabled:
        return

    await self.set_scan_enable(
        inquiry_scan_enabled=self.discoverable,
        page_scan_enabled=self.connectable,
    )
async def connect(
    self,
    peer_address: Union[Address, str],
    transport: int = BT_LE_TRANSPORT,
    connection_parameters_preferences: Optional[
        Dict[int, ConnectionParametersPreferences]
    ] = None,
    own_address_type: int = OwnAddressType.RANDOM,
    timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
    always_resolve: bool = False,
) -> Connection:
    '''
    Request a connection to a peer.

    When the transport is BLE, this method cannot be called if there is already a
    pending connection.

    Args:
      peer_address:
        Address or name of the device to connect to.
        If a string is passed:
          If the string is an address followed by a `@` suffix, the `always_resolve`
          argument is implicitly set to True, so the connection is made to the
          address after resolution.
          If the string is any other address, the connection is made to that
          address (with or without address resolution, depending on the
          `always_resolve` argument).
          For any other string, a scan for devices using that string as their name
          is initiated, and a connection to the first matching device's address
          is made. In that case, `always_resolve` is ignored.

      transport:
        BT_LE_TRANSPORT or BT_BR_EDR_TRANSPORT. If the requested transport is
        not enabled on this device, the other transport is used instead.

      connection_parameters_preferences:
        (BLE only, ignored for BR/EDR)
          * None: use the 1M PHY with default parameters
          * map: each entry has a PHY as key and a ConnectionParametersPreferences
            object as value

      own_address_type:
        (BLE only, ignored for BR/EDR)
        OwnAddressType.RANDOM to use this device's random address, or
        OwnAddressType.PUBLIC to use this device's public address.

      timeout:
        Maximum time to wait for a connection to be established, in seconds.
        Pass None for an unlimited time.

      always_resolve:
        (BLE only, ignored for BR/EDR)
        If True, always initiate a scan, resolving addresses, and connect to the
        address that resolves to `peer_address`.

    Returns:
      The established connection.

    Raises:
      InvalidArgumentError: for an invalid transport, a non-public BR/EDR
        address, or connection preferences without any usable PHY.
      InvalidStateError: if an LE connection is already pending.
      HCI_StatusError: if the controller does not accept the command.
      core.TimeoutError: if the connection is not established within `timeout`
        and the cancellation results in a connection error.
    '''

    # Check parameters
    if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
        raise InvalidArgumentError('invalid transport')

    # Adjust the transport automatically if we need to
    if transport == BT_LE_TRANSPORT and not self.le_enabled:
        transport = BT_BR_EDR_TRANSPORT
    elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
        transport = BT_LE_TRANSPORT

    # Check that there isn't already a pending connection
    if transport == BT_LE_TRANSPORT and self.is_le_connecting:
        raise InvalidStateError('connection already pending')

    if isinstance(peer_address, str):
        try:
            if transport == BT_LE_TRANSPORT and peer_address.endswith('@'):
                peer_address = Address.from_string_for_transport(
                    peer_address[:-1], transport
                )
                always_resolve = True
                logger.debug('forcing address resolution')
            else:
                peer_address = Address.from_string_for_transport(
                    peer_address, transport
                )
        except (InvalidArgumentError, ValueError):
            # If the address is not parsable, assume it is a name instead
            always_resolve = False
            logger.debug('looking for peer by name')
            peer_address = await self.find_peer_by_name(
                peer_address, transport
            )  # TODO: timeout
    else:
        # All BR/EDR addresses should be public addresses
        if (
            transport == BT_BR_EDR_TRANSPORT
            and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS
        ):
            raise InvalidArgumentError('BR/EDR addresses must be PUBLIC')

    assert isinstance(peer_address, Address)

    if transport == BT_LE_TRANSPORT and always_resolve:
        logger.debug('resolving address')
        peer_address = await self.find_peer_by_identity_address(
            peer_address
        )  # TODO: timeout

    def on_connection(connection):
        if transport == BT_LE_TRANSPORT or (
            # match BR/EDR connection event against peer address
            connection.transport == transport
            and connection.peer_address == peer_address
        ):
            # Ignore events that arrive after the future has completed (or
            # was cancelled): completing it twice raises InvalidStateError.
            if not pending_connection.done():
                pending_connection.set_result(connection)

    def on_connection_failure(error):
        if transport == BT_LE_TRANSPORT or (
            # match BR/EDR connection failure event against peer address
            error.transport == transport
            and error.peer_address == peer_address
        ):
            if not pending_connection.done():
                pending_connection.set_exception(error)

    # Create a future so that we can wait for the connection's result
    pending_connection = asyncio.get_running_loop().create_future()
    self.on('connection', on_connection)
    self.on('connection_failure', on_connection_failure)

    try:
        # Tell the controller to connect
        if transport == BT_LE_TRANSPORT:
            if connection_parameters_preferences is None:
                connection_parameters_preferences = {
                    HCI_LE_1M_PHY: ConnectionParametersPreferences.default
                }

            self.connect_own_address_type = own_address_type

            if self.host.supports_command(
                HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND
            ):
                # Only keep supported PHYs
                phys = sorted(
                    set(
                        filter(
                            self.supports_le_phy,
                            connection_parameters_preferences,
                        )
                    )
                )
                if not phys:
                    raise InvalidArgumentError('at least one supported PHY needed')

                phy_count = len(phys)
                initiating_phys = phy_list_to_bits(phys)

                # The preferences are converted to the units used by the
                # HCI command: 1.25ms for connection intervals, 10ms for
                # the supervision timeout, 0.625ms for CE lengths.
                connection_interval_mins = [
                    int(
                        connection_parameters_preferences[
                            phy
                        ].connection_interval_min
                        / 1.25
                    )
                    for phy in phys
                ]
                connection_interval_maxs = [
                    int(
                        connection_parameters_preferences[
                            phy
                        ].connection_interval_max
                        / 1.25
                    )
                    for phy in phys
                ]
                max_latencies = [
                    connection_parameters_preferences[phy].max_latency
                    for phy in phys
                ]
                supervision_timeouts = [
                    int(
                        connection_parameters_preferences[phy].supervision_timeout
                        / 10
                    )
                    for phy in phys
                ]
                min_ce_lengths = [
                    int(
                        connection_parameters_preferences[phy].min_ce_length / 0.625
                    )
                    for phy in phys
                ]
                max_ce_lengths = [
                    int(
                        connection_parameters_preferences[phy].max_ce_length / 0.625
                    )
                    for phy in phys
                ]

                result = await self.send_command(
                    HCI_LE_Extended_Create_Connection_Command(
                        initiator_filter_policy=0,
                        own_address_type=own_address_type,
                        peer_address_type=peer_address.address_type,
                        peer_address=peer_address,
                        initiating_phys=initiating_phys,
                        scan_intervals=(
                            int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
                        )
                        * phy_count,
                        scan_windows=(
                            int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
                        )
                        * phy_count,
                        connection_interval_mins=connection_interval_mins,
                        connection_interval_maxs=connection_interval_maxs,
                        max_latencies=max_latencies,
                        supervision_timeouts=supervision_timeouts,
                        min_ce_lengths=min_ce_lengths,
                        max_ce_lengths=max_ce_lengths,
                    )
                )
            else:
                # The legacy command can only use the 1M PHY
                if HCI_LE_1M_PHY not in connection_parameters_preferences:
                    raise InvalidArgumentError('1M PHY preferences required')

                prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
                result = await self.send_command(
                    HCI_LE_Create_Connection_Command(
                        le_scan_interval=int(
                            DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625
                        ),
                        le_scan_window=int(
                            DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625
                        ),
                        initiator_filter_policy=0,
                        peer_address_type=peer_address.address_type,
                        peer_address=peer_address,
                        own_address_type=own_address_type,
                        connection_interval_min=int(
                            prefs.connection_interval_min / 1.25
                        ),
                        connection_interval_max=int(
                            prefs.connection_interval_max / 1.25
                        ),
                        max_latency=prefs.max_latency,
                        supervision_timeout=int(prefs.supervision_timeout / 10),
                        min_ce_length=int(prefs.min_ce_length / 0.625),
                        max_ce_length=int(prefs.max_ce_length / 0.625),
                    )
                )
        else:
            # Save pending connection
            self.pending_connections[peer_address] = Connection.incomplete(
                self, peer_address, BT_CENTRAL_ROLE
            )

            # TODO: allow passing other settings
            result = await self.send_command(
                HCI_Create_Connection_Command(
                    bd_addr=peer_address,
                    packet_type=0xCC18,  # FIXME: change
                    page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE,
                    clock_offset=0x0000,
                    allow_role_switch=0x01,
                    reserved=0,
                )
            )

        if result.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(result)

        # Wait for the connection process to complete
        if transport == BT_LE_TRANSPORT:
            self.le_connecting = True

        if timeout is None:
            return await self.abort_on('flush', pending_connection)

        try:
            # The future is shielded so that a timeout does not cancel it:
            # it is awaited again below, after the cancel command is sent.
            return await asyncio.wait_for(
                asyncio.shield(pending_connection), timeout
            )
        except asyncio.TimeoutError:
            if transport == BT_LE_TRANSPORT:
                await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
            else:
                await self.send_command(
                    HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
                )

            try:
                return await self.abort_on('flush', pending_connection)
            except core.ConnectionError as error:
                raise core.TimeoutError() from error
    finally:
        self.remove_listener('connection', on_connection)
        self.remove_listener('connection_failure', on_connection_failure)
        if transport == BT_LE_TRANSPORT:
            self.le_connecting = False
            self.connect_own_address_type = None
        else:
            self.pending_connections.pop(peer_address, None)
async def accept(
    self,
    peer_address: Union[Address, str] = Address.ANY,
    role: int = BT_PERIPHERAL_ROLE,
    timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
    '''
    Wait and accept any incoming connection or a connection from `peer_address` when
    set.

    Notes:
      * A `connect` to the same peer will not complete this call.
      * The `timeout` parameter is only handled while waiting for the connection
        request, once received and accepted, the controller shall issue a connection
        complete event.

    Args:
      peer_address:
        Address or name of the peer to accept a connection from, or Address.ANY
        to accept a connection from any peer. A string that cannot be parsed as
        an address is treated as a name, which is looked up with a BR/EDR scan.
      role:
        Role passed to the controller when accepting the connection request.
      timeout:
        Maximum time to wait for a connection request, in seconds. A value of
        None (or 0) means no time limit.

    Returns:
      The established connection.

    Raises:
      InvalidArgumentError: if `peer_address` is the nil address.
      InvalidStateError: if an accept is already pending for `peer_address`.
    '''

    if isinstance(peer_address, str):
        try:
            peer_address = Address(peer_address)
        except (InvalidArgumentError, ValueError):
            # If the address is not parsable, assume it is a name instead
            # (same set of parse errors as handled in `connect`)
            logger.debug('looking for peer by name')
            peer_address = await self.find_peer_by_name(
                peer_address, BT_BR_EDR_TRANSPORT
            )  # TODO: timeout

    assert isinstance(peer_address, Address)

    if peer_address == Address.NIL:
        raise InvalidArgumentError('accept on nil address')

    # Create a future so that we can wait for the request
    pending_request_fut = asyncio.get_running_loop().create_future()

    if peer_address == Address.ANY:
        self.classic_pending_accepts[Address.ANY].append(pending_request_fut)
    elif peer_address in self.classic_pending_accepts:
        raise InvalidStateError('accept connection already pending')
    else:
        self.classic_pending_accepts[peer_address] = [pending_request_fut]

    try:
        # Wait for a request or a completed connection
        pending_request = self.abort_on('flush', pending_request_fut)
        result = await (
            asyncio.wait_for(pending_request, timeout)
            if timeout
            else pending_request
        )
    except BaseException:
        # Remove future from device context.
        # `BaseException` is caught (and re-raised) so that the entry is also
        # removed when this task is cancelled: `asyncio.CancelledError` is not
        # an `Exception`, and a stale entry would make every later `accept`
        # for the same address fail as 'already pending'.
        if peer_address == Address.ANY:
            any_accepts = self.classic_pending_accepts[Address.ANY]
            if pending_request_fut in any_accepts:
                any_accepts.remove(pending_request_fut)
        else:
            self.classic_pending_accepts.pop(peer_address, None)
        raise

    # Result may already be a completed connection,
    # see `on_connection` for details
    if isinstance(result, Connection):
        return result

    # Otherwise, result came from `on_connection_request`
    peer_address, _class_of_device, _link_type = result
    assert isinstance(peer_address, Address)

    # Create a future so that we can wait for the connection's result
    pending_connection = asyncio.get_running_loop().create_future()

    def on_connection(connection):
        if (
            connection.transport == BT_BR_EDR_TRANSPORT
            and connection.peer_address == peer_address
        ):
            # Completing a future twice raises InvalidStateError
            if not pending_connection.done():
                pending_connection.set_result(connection)

    def on_connection_failure(error):
        if (
            error.transport == BT_BR_EDR_TRANSPORT
            and error.peer_address == peer_address
        ):
            if not pending_connection.done():
                pending_connection.set_exception(error)

    self.on('connection', on_connection)
    self.on('connection_failure', on_connection_failure)

    # Save pending connection, with the Peripheral role.
    # Even if we requested a role switch in the HCI_Accept_Connection_Request
    # command, this connection is still considered Peripheral until an eventual
    # role change event.
    self.pending_connections[peer_address] = Connection.incomplete(
        self, peer_address, BT_PERIPHERAL_ROLE
    )

    try:
        # Accept connection request
        await self.send_command(
            HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
        )

        # Wait for connection complete
        return await self.abort_on('flush', pending_connection)

    finally:
        self.remove_listener('connection', on_connection)
        self.remove_listener('connection_failure', on_connection_failure)
        self.pending_connections.pop(peer_address, None)
@asynccontextmanager
async def connect_as_gatt(self, peer_address):
    """
    Connect to a peer and yield a `Peer` object for that connection.

    Both the connection and the `Peer` are entered as async context managers,
    and are exited (in reverse order) when this context manager exits.
    """
    async with AsyncExitStack() as stack:
        connection = await stack.enter_async_context(
            await self.connect(peer_address)
        )
        peer = await stack.enter_async_context(Peer(connection))

        yield peer


@property
def is_le_connecting(self):
    """The current value of the `le_connecting` flag."""
    return self.le_connecting


@property
def is_disconnecting(self):
    """The current value of the `disconnecting` flag."""
    return self.disconnecting


async def cancel_connection(self, peer_address=None):
    """
    Cancel an ongoing connection attempt.

    Args:
      peer_address:
        None to cancel the ongoing LE connection attempt (nothing is done if
        there isn't one). Otherwise, the address or name of the peer for which
        a BR/EDR connection attempt should be cancelled. A string that cannot
        be parsed as an address is treated as a name, which is looked up with
        a BR/EDR scan.
    """
    # Low-energy: cancel ongoing connection
    if peer_address is None:
        if not self.is_le_connecting:
            return
        await self.send_command(
            HCI_LE_Create_Connection_Cancel_Command(), check_result=True
        )

    # BR/EDR: try to cancel to ongoing connection
    # NOTE: This API does not prevent from trying to cancel a connection which is
    # not currently being created
    else:
        if isinstance(peer_address, str):
            try:
                peer_address = Address(peer_address)
            except (InvalidArgumentError, ValueError):
                # If the address is not parsable, assume it is a name instead
                # (same set of parse errors as handled in `connect`)
                logger.debug('looking for peer by name')
                peer_address = await self.find_peer_by_name(
                    peer_address, BT_BR_EDR_TRANSPORT
                )  # TODO: timeout

        await self.send_command(
            HCI_Create_Connection_Cancel_Command(bd_addr=peer_address),
            check_result=True,
        )
async def disconnect(
    self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
    """
    Disconnect a connection or link, and wait for the disconnection to complete.

    Args:
      connection: the connection or link to disconnect.
      reason: reason code passed to the controller.

    Raises:
      HCI_StatusError: if the disconnect command does not return a PENDING
        status.
    """
    # Create a future so that we can wait for the disconnection's result
    pending_disconnection = asyncio.get_running_loop().create_future()
    connection.on('disconnection', pending_disconnection.set_result)
    connection.on('disconnection_failure', pending_disconnection.set_exception)

    try:
        # Request a disconnection.
        # This is done inside the `try` block so that the listeners registered
        # above are removed even if sending the command raises.
        result = await self.send_command(
            HCI_Disconnect_Command(
                connection_handle=connection.handle, reason=reason
            )
        )

        if result.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(result)

        # Wait for the disconnection process to complete
        self.disconnecting = True
        return await self.abort_on('flush', pending_disconnection)
    finally:
        connection.remove_listener(
            'disconnection', pending_disconnection.set_result
        )
        connection.remove_listener(
            'disconnection_failure', pending_disconnection.set_exception
        )
        self.disconnecting = False


async def set_data_length(self, connection, tx_octets, tx_time) -> None:
    """
    Set the LE data length parameters for a connection.

    Args:
      connection: the connection to update.
      tx_octets: value in the range 0x001B to 0x00FB (inclusive).
      tx_time: value in the range 0x0148 to 0x4290 (inclusive).

    Raises:
      InvalidArgumentError: if `tx_octets` or `tx_time` is out of range.
    """
    if tx_octets < 0x001B or tx_octets > 0x00FB:
        raise InvalidArgumentError('tx_octets must be between 0x001B and 0x00FB')

    if tx_time < 0x0148 or tx_time > 0x4290:
        raise InvalidArgumentError('tx_time must be between 0x0148 and 0x4290')

    return await self.send_command(
        HCI_LE_Set_Data_Length_Command(
            connection_handle=connection.handle,
            tx_octets=tx_octets,
            tx_time=tx_time,
        ),
        check_result=True,
    )


async def update_connection_parameters(
    self,
    connection,
    connection_interval_min,
    connection_interval_max,
    max_latency,
    supervision_timeout,
    min_ce_length=0,
    max_ce_length=0,
    use_l2cap=False,
) -> None:
    '''
    Request an update of the parameters of an LE connection.

    NOTE: the name of the parameters may look odd, but it just follows the names
    used in the Bluetooth spec.

    Args:
      connection: the connection to update.
      connection_interval_min, connection_interval_max, max_latency,
      supervision_timeout, min_ce_length, max_ce_length:
        Passed as-is (no unit conversion is done here).
      use_l2cap:
        If True, first request the update via L2CAP, which is only allowed
        when the local role for the connection is Peripheral.

    Raises:
      InvalidStateError: if `use_l2cap` is True and the role is not Peripheral.
      ConnectionParameterUpdateError: if the L2CAP request is not accepted.
      HCI_StatusError: if the HCI command does not return a PENDING status.
    '''

    if use_l2cap:
        if connection.role != BT_PERIPHERAL_ROLE:
            raise InvalidStateError(
                'only peripheral can update connection parameters with l2cap'
            )
        l2cap_result = (
            await self.l2cap_channel_manager.update_connection_parameters(
                connection,
                connection_interval_min,
                connection_interval_max,
                max_latency,
                supervision_timeout,
            )
        )
        if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT:
            raise ConnectionParameterUpdateError(l2cap_result)

    # NOTE(review): when the L2CAP request above is accepted, the HCI command
    # below is sent as well - confirm that this is intended.
    result = await self.send_command(
        HCI_LE_Connection_Update_Command(
            connection_handle=connection.handle,
            connection_interval_min=connection_interval_min,
            connection_interval_max=connection_interval_max,
            max_latency=max_latency,
            supervision_timeout=supervision_timeout,
            min_ce_length=min_ce_length,
            max_ce_length=max_ce_length,
        )
    )
    if result.status != HCI_Command_Status_Event.PENDING:
        raise HCI_StatusError(result)
async def get_connection_rssi(self, connection):
    """Read and return the RSSI for a connection."""
    command = HCI_Read_RSSI_Command(handle=connection.handle)
    response = await self.send_command(command, check_result=True)
    return response.return_parameters.rssi


async def get_connection_phy(self, connection):
    """Read the PHYs used by an LE connection, as a `(tx_phy, rx_phy)` tuple."""
    command = HCI_LE_Read_PHY_Command(connection_handle=connection.handle)
    response = await self.send_command(command, check_result=True)
    parameters = response.return_parameters
    return (parameters.tx_phy, parameters.rx_phy)


async def set_connection_phy(
    self, connection, tx_phys=None, rx_phys=None, phy_options=None
):
    """
    Request a change of the PHYs used by an LE connection.

    Args:
      connection: the connection to update.
      tx_phys: PHYs for the TX direction, or None for no preference.
      rx_phys: PHYs for the RX direction, or None for no preference.
      phy_options: PHY options, or None for 0.

    The request is ignored, with a warning, if the controller does not
    support the command.

    Raises:
      HCI_StatusError: if the command does not return a PENDING status.
    """
    if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND):
        logger.warning('ignoring request, command not supported')
        return

    # Bit 0 is set when there is no TX preference, bit 1 when there is no
    # RX preference
    all_phys_bits = 0
    if tx_phys is None:
        all_phys_bits |= 0x01
    if rx_phys is None:
        all_phys_bits |= 0x02

    command = HCI_LE_Set_PHY_Command(
        connection_handle=connection.handle,
        all_phys=all_phys_bits,
        tx_phys=phy_list_to_bits(tx_phys),
        rx_phys=phy_list_to_bits(rx_phys),
        phy_options=0 if phy_options is None else int(phy_options),
    )
    result = await self.send_command(command)

    if result.status == HCI_COMMAND_STATUS_PENDING:
        return

    logger.warning(
        'HCI_LE_Set_PHY_Command failed: '
        f'{HCI_Constant.error_name(result.status)}'
    )
    raise HCI_StatusError(result)
async def set_default_phy(self, tx_phys=None, rx_phys=None):
    """
    Set the default PHY preferences.

    Args:
      tx_phys: PHYs for the TX direction, or None for no preference.
      rx_phys: PHYs for the RX direction, or None for no preference.

    Returns:
      The result of sending the command (sent with `check_result=True`).
    """
    # Bit 0 is set when there is no TX preference, bit 1 when there is no
    # RX preference
    all_phys_bits = (1 if tx_phys is None else 0) | (
        (1 if rx_phys is None else 0) << 1
    )

    return await self.send_command(
        HCI_LE_Set_Default_PHY_Command(
            all_phys=all_phys_bits,
            tx_phys=phy_list_to_bits(tx_phys),
            rx_phys=phy_list_to_bits(rx_phys),
        ),
        check_result=True,
    )


async def transfer_periodic_sync(
    self, connection: Connection, sync_handle: int, service_data: int = 0
) -> None:
    """
    Send a periodic advertising sync transfer command for a connection.

    Args:
      connection: the connection on which the transfer is requested.
      sync_handle: handle of the sync to transfer.
      service_data: value passed as-is in the command.
    """
    return await self.send_command(
        HCI_LE_Periodic_Advertising_Sync_Transfer_Command(
            connection_handle=connection.handle,
            service_data=service_data,
            sync_handle=sync_handle,
        ),
        check_result=True,
    )


async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
    """
    Scan for a peer with a given name and return its address.

    For the LE transport, advertisements are used (scanning is started if
    needed). For the BR/EDR transport, inquiry results are used (discovery is
    started if needed). Scanning/discovery is stopped on exit if it was not
    already active when this method was called.

    Returns:
      The address of the first peer found with a matching complete or
      shortened local name, or None if the transport is not LE or BR/EDR.
    """

    # Create a future to wait for an address to be found
    peer_address = asyncio.get_running_loop().create_future()

    def on_peer_found(address, ad_data):
        # More than one matching report may be received before the listener
        # is removed, but a future can only be completed once.
        if peer_address.done():
            return

        local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
        if local_name is None:
            local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
        if local_name is None:
            return

        try:
            decoded_name = local_name.decode('utf-8')
        except UnicodeDecodeError:
            # Names come from arbitrary devices in range: a malformed name
            # just isn't a match, it should not raise from the listener.
            return

        if decoded_name == name:
            peer_address.set_result(address)

    listener = None
    was_scanning = self.scanning
    was_discovering = self.discovering
    try:
        if transport == BT_LE_TRANSPORT:
            event_name = 'advertisement'
            listener = self.on(
                event_name,
                lambda advertisement: on_peer_found(
                    advertisement.address, advertisement.data
                ),
            )

            if not self.scanning:
                await self.start_scanning(filter_duplicates=True)

        elif transport == BT_BR_EDR_TRANSPORT:
            event_name = 'inquiry_result'
            listener = self.on(
                event_name,
                lambda address, class_of_device, eir_data, rssi: on_peer_found(
                    address, eir_data
                ),
            )

            if not self.discovering:
                await self.start_discovery()
        else:
            return None

        return await self.abort_on('flush', peer_address)
    finally:
        if listener is not None:
            self.remove_listener(event_name, listener)

        if transport == BT_LE_TRANSPORT and not was_scanning:
            await self.stop_scanning()
        elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
            await self.stop_discovery()
async def find_peer_by_identity_address(self, identity_address: Address) -> Address:
    """
    Scan for a peer whose address is, or resolves to, a given identity address.

    Advertisements are watched (scanning is started if it isn't active, and
    stopped on exit in that case) until one is received from an address that
    is equal to `identity_address`, or from a resolvable address that the
    address resolver resolves to `identity_address`.

    Returns:
      The address from which the matching advertisement was received.
    """

    # The future is completed with the first matching address
    found_address = asyncio.get_running_loop().create_future()

    def on_peer_found(address, _):
        direct_match = address == identity_address
        if not direct_match:
            if not address.is_resolvable:
                return
            resolved = self.address_resolver.resolve(address)
            if not (resolved == identity_address):
                return

        # Only the first match completes the future
        if found_address.done():
            return

        if direct_match:
            logger.debug(f'*** Matching public address found for {address}')
        else:
            logger.debug(f'*** Matching identity found for {address}')
        found_address.set_result(address)

    def on_advertisement(advertisement):
        return on_peer_found(advertisement.address, advertisement.data)

    scan_was_active = self.scanning
    event_name = 'advertisement'
    listener = None
    try:
        listener = self.on(event_name, on_advertisement)

        if not self.scanning:
            await self.start_scanning(filter_duplicates=True)

        return await self.abort_on('flush', found_address)
    finally:
        if listener is not None:
            self.remove_listener(event_name, listener)

        if not scan_was_active:
            await self.stop_scanning()
@property
def pairing_config_factory(self) -> Callable[[Connection], pairing.PairingConfig]:
    """The pairing config factory, as held by the SMP manager."""
    return self.smp_manager.pairing_config_factory


@pairing_config_factory.setter
def pairing_config_factory(
    self, pairing_config_factory: Callable[[Connection], pairing.PairingConfig]
) -> None:
    # The factory is stored on the SMP manager, not on the device itself
    self.smp_manager.pairing_config_factory = pairing_config_factory


@property
def smp_session_proxy(self) -> Type[smp.Session]:
    """The SMP session proxy class, as held by the SMP manager."""
    return self.smp_manager.session_proxy


@smp_session_proxy.setter
def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None:
    # The proxy is stored on the SMP manager, not on the device itself
    self.smp_manager.session_proxy = session_proxy


async def pair(self, connection):
    """Await the SMP manager's `pair` for a connection and return its result."""
    return await self.smp_manager.pair(connection)
def request_pairing(self, connection):
    """Return the result of the SMP manager's `request_pairing` for a connection."""
    smp_manager = self.smp_manager
    return smp_manager.request_pairing(connection)


async def get_long_term_key(
    self, connection_handle: int, rand: bytes, ediv: int
) -> Optional[bytes]:
    """
    Look up the long term key for a connection.

    The key is first looked for in the SMP manager, then in the key store
    (if there is one), using the peer address of the connection.

    Returns:
      The key, or None if the connection handle is unknown or if no key is
      found.
    """
    connection = self.lookup_connection(connection_handle)
    if connection is None:
        return None

    # Start by looking for the key in an SMP session
    session_key = self.smp_manager.get_long_term_key(connection, rand, ediv)
    if session_key is not None:
        return session_key

    # Then look for the key in the keystore
    if self.keystore is None:
        return None

    keys = await self.keystore.get(str(connection.peer_address))
    if keys is None:
        return None

    logger.debug('found keys in the key store')
    if keys.ltk:
        return keys.ltk.value

    # Otherwise, use the key that matches our role for the connection
    if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
        return keys.ltk_central.value

    if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
        return keys.ltk_peripheral.value

    return None


async def get_link_key(self, address: Address) -> Optional[bytes]:
    """
    Look up the link key for an address in the key store.

    Returns:
      The key, or None if there is no key store, no keys for the address, or
      no link key in those keys.
    """
    if self.keystore is None:
        return None

    # Look for the key in the keystore
    keys = await self.keystore.get(str(address))
    if keys is None:
        logger.debug(f'no keys found for {address}')
        return None

    logger.debug('found keys in the key store')
    link_key = keys.link_key
    if link_key is None:
        logger.warning('no link key')
        return None

    return link_key.value


# [Classic only]
async def authenticate(self, connection):
    """
    Request the authentication of a connection, and wait for it to complete.

    Raises:
      HCI_StatusError: if the command does not return a PENDING status.
      HCI_Error: if the connection emits an authentication failure.
    """
    # Set up event handlers
    authentication_done = asyncio.get_running_loop().create_future()

    def handle_success():
        authentication_done.set_result(None)

    def handle_failure(error_code):
        authentication_done.set_exception(HCI_Error(error_code))

    listeners = (
        ('connection_authentication', handle_success),
        ('connection_authentication_failure', handle_failure),
    )
    for event_name, handler in listeners:
        connection.on(event_name, handler)

    # Request the authentication
    try:
        command = HCI_Authentication_Requested_Command(
            connection_handle=connection.handle
        )
        result = await self.send_command(command)
        if result.status != HCI_COMMAND_STATUS_PENDING:
            logger.warning(
                'HCI_Authentication_Requested_Command failed: '
                f'{HCI_Constant.error_name(result.status)}'
            )
            raise HCI_StatusError(result)

        # Wait for the authentication to complete
        await connection.abort_on('disconnection', authentication_done)
    finally:
        for event_name, handler in listeners:
            connection.remove_listener(event_name, handler)
async def encrypt(self, connection, enable=True):
    """
    Request the encryption of a connection, and wait for the result.

    For an LE connection, the key is looked up in the key store, using the
    peer address: the `ltk` entry is used if there is one (with a zero
    rand/ediv), otherwise the `ltk_central` entry is used (with its own
    rand/ediv).

    Args:
      connection: the connection to encrypt.
      enable: (classic only) False to disable the encryption.

    Raises:
      InvalidArgumentError: if `enable` is False for an LE connection.
      InvalidOperationError: (LE) if there is no key store, or no usable key
        in the key store for the peer.
      InvalidStateError: (LE) if the role for the connection is not Central.
      HCI_StatusError: if the command does not return a PENDING status.
      HCI_Error: if the connection emits an encryption failure.
    """
    if not enable and connection.transport == BT_LE_TRANSPORT:
        raise InvalidArgumentError('`enable` parameter is classic only.')

    # Set up event handlers
    encryption_done = asyncio.get_running_loop().create_future()

    def handle_change():
        encryption_done.set_result(None)

    def handle_failure(error_code):
        encryption_done.set_exception(HCI_Error(error_code))

    connection.on('connection_encryption_change', handle_change)
    connection.on('connection_encryption_failure', handle_failure)

    # Request the encryption
    try:
        if connection.transport == BT_LE_TRANSPORT:
            # Look for a key in the key store
            if self.keystore is None:
                raise InvalidOperationError('no key store')

            logger.debug(f'Looking up key for {connection.peer_address}')
            keys = await self.keystore.get(str(connection.peer_address))
            if keys is None:
                raise InvalidOperationError('keys not found in key store')

            if keys.ltk is not None:
                ltk, rand, ediv = keys.ltk.value, bytes(8), 0
            elif keys.ltk_central is not None:
                central_key = keys.ltk_central
                ltk = central_key.value
                rand = central_key.rand
                ediv = central_key.ediv
            else:
                raise InvalidOperationError('no LTK found for peer')

            if connection.role != HCI_CENTRAL_ROLE:
                raise InvalidStateError('only centrals can start encryption')

            le_command = HCI_LE_Enable_Encryption_Command(
                connection_handle=connection.handle,
                random_number=rand,
                encrypted_diversifier=ediv,
                long_term_key=ltk,
            )
            result = await self.send_command(le_command)

            if result.status != HCI_COMMAND_STATUS_PENDING:
                logger.warning(
                    'HCI_LE_Enable_Encryption_Command failed: '
                    f'{HCI_Constant.error_name(result.status)}'
                )
                raise HCI_StatusError(result)
        else:
            classic_command = HCI_Set_Connection_Encryption_Command(
                connection_handle=connection.handle,
                encryption_enable=0x01 if enable else 0x00,
            )
            result = await self.send_command(classic_command)

            if result.status != HCI_COMMAND_STATUS_PENDING:
                logger.warning(
                    'HCI_Set_Connection_Encryption_Command failed: '
                    f'{HCI_Constant.error_name(result.status)}'
                )
                raise HCI_StatusError(result)

        # Wait for the result
        await connection.abort_on('disconnection', encryption_done)
    finally:
        connection.remove_listener('connection_encryption_change', handle_change)
        connection.remove_listener('connection_encryption_failure', handle_failure)
async def update_keys(self, address: str, keys: PairingKeys) -> None:
    """
    Store keys for an address in the key store, if there is one.

    After the keys are stored, the resolving list is refreshed and a
    'key_store_update' event is emitted. If storing the keys or refreshing
    the list fails, a warning is logged and no event is emitted.
    """
    keystore = self.keystore
    if keystore is None:
        return

    try:
        await keystore.update(address, keys)
        await self.refresh_resolving_list()
    except Exception as error:
        logger.warning(f'!!! error while storing keys: {error}')
        return

    self.emit('key_store_update')
# [Classic only]
async def switch_role(self, connection: Connection, role: int):
    """
    Request a role switch for a connection, and wait for the role to change.

    Raises:
      HCI_StatusError: if the command does not return a PENDING status.
      HCI_Error: if the connection emits a role change failure.
    """
    role_changed = asyncio.get_running_loop().create_future()

    def handle_change(new_role):
        role_changed.set_result(new_role)

    def handle_failure(error_code):
        role_changed.set_exception(HCI_Error(error_code))

    connection.on('role_change', handle_change)
    connection.on('role_change_failure', handle_failure)

    try:
        command = HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
        result = await self.send_command(command)
        if result.status != HCI_COMMAND_STATUS_PENDING:
            logger.warning(
                'HCI_Switch_Role_Command failed: '
                f'{HCI_Constant.error_name(result.status)}'
            )
            raise HCI_StatusError(result)

        await connection.abort_on('disconnection', role_changed)
    finally:
        connection.remove_listener('role_change', handle_change)
        connection.remove_listener('role_change_failure', handle_failure)


# [Classic only]
async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
    """
    Request the name of a remote device, and wait for the result.

    Args:
      remote: the address of the remote device, or a connection to it.

    Returns:
      The name of the remote device.

    Raises:
      HCI_StatusError: if the command does not return a PENDING status.
      HCI_Error: if a name failure is reported for the remote device.
    """
    # Set up event handlers
    name_received = asyncio.get_running_loop().create_future()

    if isinstance(remote, Address):
        peer_address = remote
    else:
        peer_address = remote.peer_address

    # Events are received for all peers, only keep those for our peer
    def on_remote_name(address, remote_name):
        if address == peer_address:
            name_received.set_result(remote_name)

    def on_remote_name_failure(address, error_code):
        if address == peer_address:
            name_received.set_exception(HCI_Error(error_code))

    handler = self.on('remote_name', on_remote_name)
    failure_handler = self.on('remote_name_failure', on_remote_name_failure)

    try:
        command = HCI_Remote_Name_Request_Command(
            bd_addr=peer_address,
            page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
            reserved=0,
            clock_offset=0,  # TODO investigate non-0 values
        )
        result = await self.send_command(command)

        if result.status != HCI_COMMAND_STATUS_PENDING:
            logger.warning(
                'HCI_Remote_Name_Request_Command failed: '
                f'{HCI_Constant.error_name(result.status)}'
            )
            raise HCI_StatusError(result)

        # Wait for the result
        return await self.abort_on('flush', name_received)
    finally:
        self.remove_listener('remote_name', handler)
        self.remove_listener('remote_name_failure', failure_handler)
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, 3976 reserved=0, 3977 clock_offset=0, # TODO investigate non-0 values 3978 ) 3979 ) 3980 3981 if result.status != HCI_COMMAND_STATUS_PENDING: 3982 logger.warning( 3983 'HCI_Remote_Name_Request_Command failed: ' 3984 f'{HCI_Constant.error_name(result.status)}' 3985 ) 3986 raise HCI_StatusError(result) 3987 3988 # Wait for the result 3989 return await self.abort_on('flush', pending_name) 3990 finally: 3991 self.remove_listener('remote_name', handler) 3992 self.remove_listener('remote_name_failure', failure_handler) 3993 3994 # [LE only] 3995 @experimental('Only for testing.') 3996 async def setup_cig( 3997 self, 3998 cig_id: int, 3999 cis_id: List[int], 4000 sdu_interval: Tuple[int, int], 4001 framing: int, 4002 max_sdu: Tuple[int, int], 4003 retransmission_number: int, 4004 max_transport_latency: Tuple[int, int], 4005 ) -> List[int]: 4006 """Sends HCI_LE_Set_CIG_Parameters_Command. 4007 4008 Args: 4009 cig_id: CIG_ID. 4010 cis_id: CID ID list. 4011 sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental). 4012 framing: Un-framing(0) or Framing(1). 4013 max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental). 4014 retransmission_number: retransmission_number. 4015 max_transport_latency: Max transport latencies of 4016 (Central->Peripheral, Peripheral->Cental). 4017 4018 Returns: 4019 List of created CIS handles corresponding to the same order of [cid_id]. 
4020 """ 4021 num_cis = len(cis_id) 4022 4023 response = await self.send_command( 4024 HCI_LE_Set_CIG_Parameters_Command( 4025 cig_id=cig_id, 4026 sdu_interval_c_to_p=sdu_interval[0], 4027 sdu_interval_p_to_c=sdu_interval[1], 4028 worst_case_sca=0x00, # 251-500 ppm 4029 packing=0x00, # Sequential 4030 framing=framing, 4031 max_transport_latency_c_to_p=max_transport_latency[0], 4032 max_transport_latency_p_to_c=max_transport_latency[1], 4033 cis_id=cis_id, 4034 max_sdu_c_to_p=[max_sdu[0]] * num_cis, 4035 max_sdu_p_to_c=[max_sdu[1]] * num_cis, 4036 phy_c_to_p=[HCI_LE_2M_PHY] * num_cis, 4037 phy_p_to_c=[HCI_LE_2M_PHY] * num_cis, 4038 rtn_c_to_p=[retransmission_number] * num_cis, 4039 rtn_p_to_c=[retransmission_number] * num_cis, 4040 ), 4041 check_result=True, 4042 ) 4043 4044 # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast 4045 # Server, so here it only provides a basic functionality for testing. 4046 cis_handles = response.return_parameters.connection_handle[:] 4047 for id, cis_handle in zip(cis_id, cis_handles): 4048 self._pending_cis[cis_handle] = (id, cig_id) 4049 4050 return cis_handles 4051 4052 # [LE only] 4053 @experimental('Only for testing.') 4054 async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]: 4055 for cis_handle, acl_handle in cis_acl_pairs: 4056 acl_connection = self.lookup_connection(acl_handle) 4057 assert acl_connection 4058 cis_id, cig_id = self._pending_cis.pop(cis_handle) 4059 self.cis_links[cis_handle] = CisLink( 4060 device=self, 4061 acl_connection=acl_connection, 4062 handle=cis_handle, 4063 cis_id=cis_id, 4064 cig_id=cig_id, 4065 ) 4066 4067 with closing(EventWatcher()) as watcher: 4068 pending_cis_establishments = { 4069 cis_handle: asyncio.get_running_loop().create_future() 4070 for cis_handle, _ in cis_acl_pairs 4071 } 4072 4073 def on_cis_establishment(cis_link: CisLink) -> None: 4074 if pending_future := pending_cis_establishments.get(cis_link.handle): 4075 
pending_future.set_result(cis_link) 4076 4077 def on_cis_establishment_failure(cis_handle: int, status: int) -> None: 4078 if pending_future := pending_cis_establishments.get(cis_handle): 4079 pending_future.set_exception(HCI_Error(status)) 4080 4081 watcher.on(self, 'cis_establishment', on_cis_establishment) 4082 watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure) 4083 await self.send_command( 4084 HCI_LE_Create_CIS_Command( 4085 cis_connection_handle=[p[0] for p in cis_acl_pairs], 4086 acl_connection_handle=[p[1] for p in cis_acl_pairs], 4087 ), 4088 check_result=True, 4089 ) 4090 4091 return await asyncio.gather(*pending_cis_establishments.values()) 4092 4093 # [LE only] 4094 @experimental('Only for testing.') 4095 async def accept_cis_request(self, handle: int) -> CisLink: 4096 """[LE Only] Accepts an incoming CIS request. 4097 4098 When the specified CIS handle is already created, this method returns the 4099 existed CIS link object immediately. 4100 4101 Args: 4102 handle: CIS handle to accept. 4103 4104 Returns: 4105 CIS link object on the given handle. 4106 """ 4107 if not (cis_link := self.cis_links.get(handle)): 4108 raise InvalidStateError(f'No pending CIS request of handle {handle}') 4109 4110 # There might be multiple ASE sharing a CIS channel. 4111 # If one of them has accepted the request, the others should just leverage it. 
4112 async with self._cis_lock: 4113 if cis_link.state == CisLink.State.ESTABLISHED: 4114 return cis_link 4115 4116 with closing(EventWatcher()) as watcher: 4117 pending_establishment = asyncio.get_running_loop().create_future() 4118 4119 def on_establishment() -> None: 4120 pending_establishment.set_result(None) 4121 4122 def on_establishment_failure(status: int) -> None: 4123 pending_establishment.set_exception(HCI_Error(status)) 4124 4125 watcher.on(cis_link, 'establishment', on_establishment) 4126 watcher.on(cis_link, 'establishment_failure', on_establishment_failure) 4127 4128 await self.send_command( 4129 HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), 4130 check_result=True, 4131 ) 4132 4133 await pending_establishment 4134 return cis_link 4135 4136 # Mypy believes this is reachable when context is an ExitStack. 4137 raise UnreachableError() 4138 4139 # [LE only] 4140 @experimental('Only for testing.') 4141 async def reject_cis_request( 4142 self, 4143 handle: int, 4144 reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 4145 ) -> None: 4146 await self.send_command( 4147 HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason), 4148 check_result=True, 4149 ) 4150 4151 async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: 4152 """[LE Only] Reads remote LE supported features. 4153 4154 Args: 4155 handle: connection handle to read LE features. 4156 4157 Returns: 4158 LE features supported by the remote device. 
4159 """ 4160 with closing(EventWatcher()) as watcher: 4161 read_feature_future: asyncio.Future[LeFeatureMask] = ( 4162 asyncio.get_running_loop().create_future() 4163 ) 4164 4165 def on_le_remote_features(handle: int, features: int): 4166 if handle == connection.handle: 4167 read_feature_future.set_result(LeFeatureMask(features)) 4168 4169 def on_failure(handle: int, status: int): 4170 if handle == connection.handle: 4171 read_feature_future.set_exception(HCI_Error(status)) 4172 4173 watcher.on(self.host, 'le_remote_features', on_le_remote_features) 4174 watcher.on(self.host, 'le_remote_features_failure', on_failure) 4175 await self.send_command( 4176 HCI_LE_Read_Remote_Features_Command( 4177 connection_handle=connection.handle 4178 ), 4179 check_result=True, 4180 ) 4181 return await read_feature_future 4182 4183 @host_event_handler 4184 def on_flush(self): 4185 self.emit('flush') 4186 for _, connection in self.connections.items(): 4187 connection.emit('disconnection', 0) 4188 self.connections = {} 4189 4190 # [Classic only] 4191 @host_event_handler 4192 def on_link_key(self, bd_addr, link_key, key_type): 4193 # Store the keys in the key store 4194 if self.keystore: 4195 authenticated = key_type in ( 4196 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, 4197 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, 4198 ) 4199 pairing_keys = PairingKeys() 4200 pairing_keys.link_key = PairingKeys.Key( 4201 value=link_key, authenticated=authenticated 4202 ) 4203 4204 self.abort_on('flush', self.update_keys(str(bd_addr), pairing_keys)) 4205 4206 if connection := self.find_connection_by_bd_addr( 4207 bd_addr, transport=BT_BR_EDR_TRANSPORT 4208 ): 4209 connection.link_key_type = key_type 4210 4211 def add_service(self, service): 4212 self.gatt_server.add_service(service) 4213 4214 def add_services(self, services): 4215 self.gatt_server.add_services(services) 4216 4217 def add_default_services(self, generic_access_service=True): 4218 # Add a GAP Service 
if requested 4219 if generic_access_service: 4220 self.gatt_server.add_service(GenericAccessService(self.name)) 4221 4222 async def notify_subscriber(self, connection, attribute, value=None, force=False): 4223 await self.gatt_server.notify_subscriber(connection, attribute, value, force) 4224 4225 async def notify_subscribers(self, attribute, value=None, force=False): 4226 await self.gatt_server.notify_subscribers(attribute, value, force) 4227 4228 async def indicate_subscriber(self, connection, attribute, value=None, force=False): 4229 await self.gatt_server.indicate_subscriber(connection, attribute, value, force) 4230 4231 async def indicate_subscribers(self, attribute, value=None, force=False): 4232 await self.gatt_server.indicate_subscribers(attribute, value, force) 4233 4234 @host_event_handler 4235 def on_advertising_set_termination( 4236 self, 4237 status, 4238 advertising_handle, 4239 connection_handle, 4240 number_of_completed_extended_advertising_events, 4241 ): 4242 # Legacy advertising set is also one of extended advertising sets. 4243 if not ( 4244 advertising_set := self.extended_advertising_sets.get(advertising_handle) 4245 ): 4246 logger.warning(f'advertising set {advertising_handle} not found') 4247 return 4248 4249 advertising_set.on_termination(status) 4250 4251 if status != HCI_SUCCESS: 4252 logger.debug( 4253 f'advertising set {advertising_handle} ' 4254 f'terminated with status {status}' 4255 ) 4256 return 4257 4258 if connection := self.lookup_connection(connection_handle): 4259 # We have already received the connection complete event. 4260 self._complete_le_extended_advertising_connection( 4261 connection, advertising_set 4262 ) 4263 return 4264 4265 # Associate the connection handle with the advertising set, the connection 4266 # will complete later. 
4267 logger.debug( 4268 f'the connection with handle {connection_handle:04X} will complete later' 4269 ) 4270 self.connecting_extended_advertising_sets[connection_handle] = advertising_set 4271 4272 def _complete_le_extended_advertising_connection( 4273 self, connection: Connection, advertising_set: AdvertisingSet 4274 ) -> None: 4275 # Update the connection address. 4276 connection.self_address = ( 4277 advertising_set.random_address 4278 if advertising_set.random_address is not None 4279 and advertising_set.advertising_parameters.own_address_type 4280 in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) 4281 else self.public_address 4282 ) 4283 4284 if advertising_set.advertising_parameters.own_address_type in ( 4285 OwnAddressType.RANDOM, 4286 OwnAddressType.PUBLIC, 4287 ): 4288 connection.self_resolvable_address = None 4289 4290 # Setup auto-restart of the advertising set if needed. 4291 if advertising_set.auto_restart: 4292 connection.once( 4293 'disconnection', 4294 lambda _: self.abort_on('flush', advertising_set.start()), 4295 ) 4296 4297 self._emit_le_connection(connection) 4298 4299 def _emit_le_connection(self, connection: Connection) -> None: 4300 # If supported, read which PHY we're connected with before 4301 # notifying listeners of the new connection. 
4302 if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): 4303 4304 async def read_phy(): 4305 result = await self.send_command( 4306 HCI_LE_Read_PHY_Command(connection_handle=connection.handle), 4307 check_result=True, 4308 ) 4309 connection.phy = ConnectionPHY( 4310 result.return_parameters.tx_phy, result.return_parameters.rx_phy 4311 ) 4312 # Emit an event to notify listeners of the new connection 4313 self.emit('connection', connection) 4314 4315 # Do so asynchronously to not block the current event handler 4316 connection.abort_on('disconnection', read_phy()) 4317 4318 return 4319 4320 self.emit('connection', connection) 4321 4322 @host_event_handler 4323 def on_connection( 4324 self, 4325 connection_handle: int, 4326 transport: int, 4327 peer_address: Address, 4328 self_resolvable_address: Optional[Address], 4329 peer_resolvable_address: Optional[Address], 4330 role: int, 4331 connection_parameters: ConnectionParameters, 4332 ) -> None: 4333 # Convert all-zeros addresses into None. 
4334 if self_resolvable_address == Address.ANY_RANDOM: 4335 self_resolvable_address = None 4336 if ( 4337 peer_resolvable_address == Address.ANY_RANDOM 4338 or not peer_address.is_resolved 4339 ): 4340 peer_resolvable_address = None 4341 4342 logger.debug( 4343 f'*** Connection: [0x{connection_handle:04X}] ' 4344 f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}' 4345 ) 4346 if connection_handle in self.connections: 4347 logger.warning( 4348 'new connection reuses the same handle as a previous connection' 4349 ) 4350 4351 if transport == BT_BR_EDR_TRANSPORT: 4352 # Create a new connection 4353 connection = self.pending_connections.pop(peer_address) 4354 connection.complete(connection_handle, connection_parameters) 4355 self.connections[connection_handle] = connection 4356 4357 # Emit an event to notify listeners of the new connection 4358 self.emit('connection', connection) 4359 4360 return 4361 4362 if peer_resolvable_address is None: 4363 # Resolve the peer address if we can 4364 if self.address_resolver: 4365 if peer_address.is_resolvable: 4366 resolved_address = self.address_resolver.resolve(peer_address) 4367 if resolved_address is not None: 4368 logger.debug(f'*** Address resolved as {resolved_address}') 4369 peer_resolvable_address = peer_address 4370 peer_address = resolved_address 4371 4372 self_address = None 4373 own_address_type: Optional[int] = None 4374 if role == HCI_CENTRAL_ROLE: 4375 own_address_type = self.connect_own_address_type 4376 assert own_address_type is not None 4377 else: 4378 if self.supports_le_extended_advertising: 4379 # We'll know the address when the advertising set terminates, 4380 # Use a temporary placeholder value for self_address. 4381 self_address = Address.ANY_RANDOM 4382 else: 4383 # We were connected via a legacy advertisement. 4384 if self.legacy_advertiser: 4385 own_address_type = self.legacy_advertiser.own_address_type 4386 else: 4387 # This should not happen, but just in case, pick a default. 
4388 logger.warning("connection without an advertiser") 4389 self_address = self.random_address 4390 4391 if self_address is None: 4392 self_address = ( 4393 self.public_address 4394 if own_address_type 4395 in ( 4396 OwnAddressType.PUBLIC, 4397 OwnAddressType.RESOLVABLE_OR_PUBLIC, 4398 ) 4399 else self.random_address 4400 ) 4401 4402 # Some controllers may return local resolvable address even not using address 4403 # generation offloading. Ignore the value to prevent SMP failure. 4404 if own_address_type in (OwnAddressType.RANDOM, OwnAddressType.PUBLIC): 4405 self_resolvable_address = None 4406 4407 # Create a connection. 4408 connection = Connection( 4409 self, 4410 connection_handle, 4411 transport, 4412 self_address, 4413 self_resolvable_address, 4414 peer_address, 4415 peer_resolvable_address, 4416 role, 4417 connection_parameters, 4418 ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), 4419 ) 4420 self.connections[connection_handle] = connection 4421 4422 if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser: 4423 if self.legacy_advertiser.auto_restart: 4424 advertiser = self.legacy_advertiser 4425 connection.once( 4426 'disconnection', 4427 lambda _: self.abort_on('flush', advertiser.start()), 4428 ) 4429 else: 4430 self.legacy_advertiser = None 4431 4432 if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: 4433 # We can emit now, we have all the info we need 4434 self._emit_le_connection(connection) 4435 return 4436 4437 if role == HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising: 4438 if advertising_set := self.connecting_extended_advertising_sets.pop( 4439 connection_handle, None 4440 ): 4441 # We have already received the advertising set termination event. 
4442 self._complete_le_extended_advertising_connection( 4443 connection, advertising_set 4444 ) 4445 4446 @host_event_handler 4447 def on_connection_failure(self, transport, peer_address, error_code): 4448 logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}') 4449 4450 # For directed advertising, this means a timeout 4451 if ( 4452 transport == BT_LE_TRANSPORT 4453 and self.legacy_advertiser 4454 and self.legacy_advertiser.advertising_type.is_directed 4455 ): 4456 self.legacy_advertiser = None 4457 4458 # Notify listeners 4459 error = core.ConnectionError( 4460 error_code, 4461 transport, 4462 peer_address, 4463 'hci', 4464 HCI_Constant.error_name(error_code), 4465 ) 4466 self.emit('connection_failure', error) 4467 4468 # FIXME: Explore a delegate-model for BR/EDR wait connection #56. 4469 @host_event_handler 4470 def on_connection_request(self, bd_addr, class_of_device, link_type): 4471 logger.debug(f'*** Connection request: {bd_addr}') 4472 4473 # Handle SCO request. 
4474 if link_type in ( 4475 HCI_Connection_Complete_Event.SCO_LINK_TYPE, 4476 HCI_Connection_Complete_Event.ESCO_LINK_TYPE, 4477 ): 4478 if connection := self.find_connection_by_bd_addr( 4479 bd_addr, transport=BT_BR_EDR_TRANSPORT 4480 ): 4481 self.emit('sco_request', connection, link_type) 4482 else: 4483 logger.error(f'SCO request from a non-connected device {bd_addr}') 4484 return 4485 4486 # match a pending future using `bd_addr` 4487 elif bd_addr in self.classic_pending_accepts: 4488 future, *_ = self.classic_pending_accepts.pop(bd_addr) 4489 future.set_result((bd_addr, class_of_device, link_type)) 4490 4491 # match first pending future for ANY address 4492 elif len(self.classic_pending_accepts[Address.ANY]) > 0: 4493 future = self.classic_pending_accepts[Address.ANY].pop(0) 4494 future.set_result((bd_addr, class_of_device, link_type)) 4495 4496 # device configuration is set to accept any incoming connection 4497 elif self.classic_accept_any: 4498 # Save pending connection 4499 self.pending_connections[bd_addr] = Connection.incomplete( 4500 self, bd_addr, BT_PERIPHERAL_ROLE 4501 ) 4502 4503 self.host.send_command_sync( 4504 HCI_Accept_Connection_Request_Command( 4505 bd_addr=bd_addr, role=0x01 # Remain the peripheral 4506 ) 4507 ) 4508 4509 # reject incoming connection 4510 else: 4511 self.host.send_command_sync( 4512 HCI_Reject_Connection_Request_Command( 4513 bd_addr=bd_addr, 4514 reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 4515 ) 4516 ) 4517 4518 @host_event_handler 4519 def on_disconnection(self, connection_handle: int, reason: int) -> None: 4520 if connection := self.connections.pop(connection_handle, None): 4521 logger.debug( 4522 f'*** Disconnection: [0x{connection.handle:04X}] ' 4523 f'{connection.peer_address} as {connection.role_name}, reason={reason}' 4524 ) 4525 connection.emit('disconnection', reason) 4526 4527 # Cleanup subsystems that maintain per-connection state 4528 self.gatt_server.on_disconnection(connection) 4529 elif 
sco_link := self.sco_links.pop(connection_handle, None): 4530 sco_link.emit('disconnection', reason) 4531 elif cis_link := self.cis_links.pop(connection_handle, None): 4532 cis_link.emit('disconnection', reason) 4533 else: 4534 logger.error( 4535 f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***' 4536 ) 4537 4538 @host_event_handler 4539 @with_connection_from_handle 4540 def on_disconnection_failure(self, connection, error_code): 4541 logger.debug(f'*** Disconnection failed: {error_code}') 4542 error = core.ConnectionError( 4543 error_code, 4544 connection.transport, 4545 connection.peer_address, 4546 'hci', 4547 HCI_Constant.error_name(error_code), 4548 ) 4549 connection.emit('disconnection_failure', error) 4550 4551 @host_event_handler 4552 @AsyncRunner.run_in_task() 4553 async def on_inquiry_complete(self): 4554 if self.auto_restart_inquiry: 4555 # Inquire again 4556 await self.start_discovery(auto_restart=True) 4557 else: 4558 self.auto_restart_inquiry = True 4559 self.discovering = False 4560 self.emit('inquiry_complete') 4561 4562 @host_event_handler 4563 @with_connection_from_handle 4564 def on_connection_authentication(self, connection): 4565 logger.debug( 4566 f'*** Connection Authentication: [0x{connection.handle:04X}] ' 4567 f'{connection.peer_address} as {connection.role_name}' 4568 ) 4569 connection.authenticated = True 4570 connection.emit('connection_authentication') 4571 4572 @host_event_handler 4573 @with_connection_from_handle 4574 def on_connection_authentication_failure(self, connection, error): 4575 logger.debug( 4576 f'*** Connection Authentication Failure: [0x{connection.handle:04X}] ' 4577 f'{connection.peer_address} as {connection.role_name}, error={error}' 4578 ) 4579 connection.emit('connection_authentication_failure', error) 4580 4581 # [Classic only] 4582 @host_event_handler 4583 @with_connection_from_address 4584 def on_authentication_io_capability_request(self, connection): 4585 # Ask what the pairing 
config should be for this connection 4586 pairing_config = self.pairing_config_factory(connection) 4587 4588 # Compute the authentication requirements 4589 authentication_requirements = ( 4590 # No Bonding 4591 ( 4592 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 4593 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 4594 ), 4595 # General Bonding 4596 ( 4597 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 4598 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 4599 ), 4600 )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0] 4601 4602 # Respond 4603 self.host.send_command_sync( 4604 HCI_IO_Capability_Request_Reply_Command( 4605 bd_addr=connection.peer_address, 4606 io_capability=pairing_config.delegate.classic_io_capability, 4607 oob_data_present=0x00, # Not present 4608 authentication_requirements=authentication_requirements, 4609 ) 4610 ) 4611 4612 # [Classic only] 4613 @host_event_handler 4614 @with_connection_from_address 4615 def on_authentication_io_capability_response( 4616 self, connection, io_capability, authentication_requirements 4617 ): 4618 connection.peer_pairing_io_capability = io_capability 4619 connection.peer_pairing_authentication_requirements = ( 4620 authentication_requirements 4621 ) 4622 4623 # [Classic only] 4624 @host_event_handler 4625 @with_connection_from_address 4626 def on_authentication_user_confirmation_request(self, connection, code) -> None: 4627 # Ask what the pairing config should be for this connection 4628 pairing_config = self.pairing_config_factory(connection) 4629 io_capability = pairing_config.delegate.classic_io_capability 4630 peer_io_capability = connection.peer_pairing_io_capability 4631 4632 async def confirm() -> bool: 4633 # Ask the user to confirm the pairing, without display 4634 return await pairing_config.delegate.confirm() 4635 4636 async def auto_confirm() -> bool: 4637 # Ask the user to auto-confirm the pairing, without display 4638 
return await pairing_config.delegate.confirm(auto=True) 4639 4640 async def display_confirm() -> bool: 4641 # Display the code and ask the user to compare 4642 return await pairing_config.delegate.compare_numbers(code, digits=6) 4643 4644 async def display_auto_confirm() -> bool: 4645 # Display the code to the user and ask the delegate to auto-confirm 4646 await pairing_config.delegate.display_number(code, digits=6) 4647 return await pairing_config.delegate.confirm(auto=True) 4648 4649 async def na() -> bool: 4650 raise UnreachableError() 4651 4652 # See Bluetooth spec @ Vol 3, Part C 5.2.2.6 4653 methods = { 4654 HCI_DISPLAY_ONLY_IO_CAPABILITY: { 4655 HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, 4656 HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, 4657 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 4658 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 4659 }, 4660 HCI_DISPLAY_YES_NO_IO_CAPABILITY: { 4661 HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, 4662 HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, 4663 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 4664 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 4665 }, 4666 HCI_KEYBOARD_ONLY_IO_CAPABILITY: { 4667 HCI_DISPLAY_ONLY_IO_CAPABILITY: na, 4668 HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, 4669 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 4670 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 4671 }, 4672 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 4673 HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, 4674 HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, 4675 HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, 4676 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 4677 }, 4678 } 4679 4680 method = methods[peer_io_capability][io_capability] 4681 4682 async def reply() -> None: 4683 try: 4684 if await connection.abort_on('disconnection', method()): 4685 await self.host.send_command( 4686 HCI_User_Confirmation_Request_Reply_Command( 4687 bd_addr=connection.peer_address 4688 ) 4689 ) 4690 return 4691 except Exception as error: 4692 
logger.warning(f'exception while confirming: {error}') 4693 4694 await self.host.send_command( 4695 HCI_User_Confirmation_Request_Negative_Reply_Command( 4696 bd_addr=connection.peer_address 4697 ) 4698 ) 4699 4700 AsyncRunner.spawn(reply()) 4701 4702 # [Classic only] 4703 @host_event_handler 4704 @with_connection_from_address 4705 def on_authentication_user_passkey_request(self, connection) -> None: 4706 # Ask what the pairing config should be for this connection 4707 pairing_config = self.pairing_config_factory(connection) 4708 4709 async def reply() -> None: 4710 try: 4711 number = await connection.abort_on( 4712 'disconnection', pairing_config.delegate.get_number() 4713 ) 4714 if number is not None: 4715 await self.host.send_command( 4716 HCI_User_Passkey_Request_Reply_Command( 4717 bd_addr=connection.peer_address, numeric_value=number 4718 ) 4719 ) 4720 return 4721 except Exception as error: 4722 logger.warning(f'exception while asking for pass-key: {error}') 4723 4724 await self.host.send_command( 4725 HCI_User_Passkey_Request_Negative_Reply_Command( 4726 bd_addr=connection.peer_address 4727 ) 4728 ) 4729 4730 AsyncRunner.spawn(reply()) 4731 4732 # [Classic only] 4733 @host_event_handler 4734 @with_connection_from_address 4735 def on_pin_code_request(self, connection): 4736 # Classic legacy pairing 4737 # Ask what the pairing config should be for this connection 4738 pairing_config = self.pairing_config_factory(connection) 4739 io_capability = pairing_config.delegate.classic_io_capability 4740 4741 # Respond 4742 if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: 4743 # Ask the user to enter a string 4744 async def get_pin_code(): 4745 pin_code = await connection.abort_on( 4746 'disconnection', pairing_config.delegate.get_string(16) 4747 ) 4748 4749 if pin_code is not None: 4750 pin_code = bytes(pin_code, encoding='utf-8') 4751 pin_code_len = len(pin_code) 4752 assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes" 4753 await self.host.send_command( 
4754 HCI_PIN_Code_Request_Reply_Command( 4755 bd_addr=connection.peer_address, 4756 pin_code_length=pin_code_len, 4757 pin_code=pin_code, 4758 ) 4759 ) 4760 else: 4761 logger.debug("delegate.get_string() returned None") 4762 await self.host.send_command( 4763 HCI_PIN_Code_Request_Negative_Reply_Command( 4764 bd_addr=connection.peer_address 4765 ) 4766 ) 4767 4768 asyncio.create_task(get_pin_code()) 4769 else: 4770 self.host.send_command_sync( 4771 HCI_PIN_Code_Request_Negative_Reply_Command( 4772 bd_addr=connection.peer_address 4773 ) 4774 ) 4775 4776 # [Classic only] 4777 @host_event_handler 4778 @with_connection_from_address 4779 def on_authentication_user_passkey_notification(self, connection, passkey): 4780 # Ask what the pairing config should be for this connection 4781 pairing_config = self.pairing_config_factory(connection) 4782 4783 # Show the passkey to the user 4784 connection.abort_on( 4785 'disconnection', pairing_config.delegate.display_number(passkey) 4786 ) 4787 4788 # [Classic only] 4789 @host_event_handler 4790 @try_with_connection_from_address 4791 def on_remote_name(self, connection: Connection, address, remote_name): 4792 # Try to decode the name 4793 try: 4794 remote_name = remote_name.decode('utf-8') 4795 if connection: 4796 connection.peer_name = remote_name 4797 connection.emit('remote_name') 4798 self.emit('remote_name', address, remote_name) 4799 except UnicodeDecodeError as error: 4800 logger.warning('peer name is not valid UTF-8') 4801 if connection: 4802 connection.emit('remote_name_failure', error) 4803 else: 4804 self.emit('remote_name_failure', address, error) 4805 4806 # [Classic only] 4807 @host_event_handler 4808 @try_with_connection_from_address 4809 def on_remote_name_failure(self, connection: Connection, address, error): 4810 if connection: 4811 connection.emit('remote_name_failure', error) 4812 self.emit('remote_name_failure', address, error) 4813 4814 # [Classic only] 4815 @host_event_handler 4816 
@with_connection_from_address 4817 @experimental('Only for testing.') 4818 def on_sco_connection( 4819 self, acl_connection: Connection, sco_handle: int, link_type: int 4820 ) -> None: 4821 logger.debug( 4822 f'*** SCO connected: {acl_connection.peer_address}, ' 4823 f'sco_handle=[0x{sco_handle:04X}], ' 4824 f'link_type=[0x{link_type:02X}] ***' 4825 ) 4826 sco_link = self.sco_links[sco_handle] = ScoLink( 4827 device=self, 4828 acl_connection=acl_connection, 4829 handle=sco_handle, 4830 link_type=link_type, 4831 ) 4832 self.emit('sco_connection', sco_link) 4833 4834 # [Classic only] 4835 @host_event_handler 4836 @with_connection_from_address 4837 @experimental('Only for testing.') 4838 def on_sco_connection_failure( 4839 self, acl_connection: Connection, status: int 4840 ) -> None: 4841 logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') 4842 self.emit('sco_connection_failure') 4843 4844 # [Classic only] 4845 @host_event_handler 4846 @experimental('Only for testing') 4847 def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: 4848 if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink: 4849 sco_link.sink(packet) 4850 4851 # [LE only] 4852 @host_event_handler 4853 @with_connection_from_handle 4854 @experimental('Only for testing') 4855 def on_cis_request( 4856 self, 4857 acl_connection: Connection, 4858 cis_handle: int, 4859 cig_id: int, 4860 cis_id: int, 4861 ) -> None: 4862 logger.debug( 4863 f'*** CIS Request ' 4864 f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, ' 4865 f'cis_handle=[0x{cis_handle:04X}], ' 4866 f'cig_id=[0x{cig_id:02X}], ' 4867 f'cis_id=[0x{cis_id:02X}] ***' 4868 ) 4869 # LE_CIS_Established event doesn't provide info, so we must store them here. 
4870 self.cis_links[cis_handle] = CisLink( 4871 device=self, 4872 acl_connection=acl_connection, 4873 handle=cis_handle, 4874 cig_id=cig_id, 4875 cis_id=cis_id, 4876 ) 4877 self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id) 4878 4879 # [LE only] 4880 @host_event_handler 4881 @experimental('Only for testing') 4882 def on_cis_establishment(self, cis_handle: int) -> None: 4883 cis_link = self.cis_links[cis_handle] 4884 cis_link.state = CisLink.State.ESTABLISHED 4885 4886 assert cis_link.acl_connection 4887 4888 logger.debug( 4889 f'*** CIS Establishment ' 4890 f'{cis_link.acl_connection.peer_address}, ' 4891 f'cis_handle=[0x{cis_handle:04X}], ' 4892 f'cig_id=[0x{cis_link.cig_id:02X}], ' 4893 f'cis_id=[0x{cis_link.cis_id:02X}] ***' 4894 ) 4895 4896 cis_link.emit('establishment') 4897 self.emit('cis_establishment', cis_link) 4898 4899 # [LE only] 4900 @host_event_handler 4901 @experimental('Only for testing') 4902 def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: 4903 logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') 4904 if cis_link := self.cis_links.pop(cis_handle): 4905 cis_link.emit('establishment_failure', status) 4906 self.emit('cis_establishment_failure', cis_handle, status) 4907 4908 # [LE only] 4909 @host_event_handler 4910 @experimental('Only for testing') 4911 def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: 4912 if (cis_link := self.cis_links.get(handle)) and cis_link.sink: 4913 cis_link.sink(packet) 4914 4915 @host_event_handler 4916 @with_connection_from_handle 4917 def on_connection_encryption_change(self, connection, encryption): 4918 logger.debug( 4919 f'*** Connection Encryption Change: [0x{connection.handle:04X}] ' 4920 f'{connection.peer_address} as {connection.role_name}, ' 4921 f'encryption={encryption}' 4922 ) 4923 connection.encryption = encryption 4924 if ( 4925 not connection.authenticated 4926 and connection.transport == BT_BR_EDR_TRANSPORT 4927 
and encryption == HCI_Encryption_Change_Event.AES_CCM 4928 ): 4929 connection.authenticated = True 4930 connection.sc = True 4931 if ( 4932 not connection.authenticated 4933 and connection.transport == BT_LE_TRANSPORT 4934 and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM 4935 ): 4936 connection.authenticated = True 4937 connection.sc = True 4938 connection.emit('connection_encryption_change') 4939 4940 @host_event_handler 4941 @with_connection_from_handle 4942 def on_connection_encryption_failure(self, connection, error): 4943 logger.debug( 4944 f'*** Connection Encryption Failure: [0x{connection.handle:04X}] ' 4945 f'{connection.peer_address} as {connection.role_name}, ' 4946 f'error={error}' 4947 ) 4948 connection.emit('connection_encryption_failure', error) 4949 4950 @host_event_handler 4951 @with_connection_from_handle 4952 def on_connection_encryption_key_refresh(self, connection): 4953 logger.debug( 4954 f'*** Connection Key Refresh: [0x{connection.handle:04X}] ' 4955 f'{connection.peer_address} as {connection.role_name}' 4956 ) 4957 connection.emit('connection_encryption_key_refresh') 4958 4959 @host_event_handler 4960 @with_connection_from_handle 4961 def on_connection_parameters_update(self, connection, connection_parameters): 4962 logger.debug( 4963 f'*** Connection Parameters Update: [0x{connection.handle:04X}] ' 4964 f'{connection.peer_address} as {connection.role_name}, ' 4965 f'{connection_parameters}' 4966 ) 4967 connection.parameters = connection_parameters 4968 connection.emit('connection_parameters_update') 4969 4970 @host_event_handler 4971 @with_connection_from_handle 4972 def on_connection_parameters_update_failure(self, connection, error): 4973 logger.debug( 4974 f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] ' 4975 f'{connection.peer_address} as {connection.role_name}, ' 4976 f'error={error}' 4977 ) 4978 connection.emit('connection_parameters_update_failure', error) 4979 4980 @host_event_handler 4981 
@with_connection_from_handle 4982 def on_connection_phy_update(self, connection, connection_phy): 4983 logger.debug( 4984 f'*** Connection PHY Update: [0x{connection.handle:04X}] ' 4985 f'{connection.peer_address} as {connection.role_name}, ' 4986 f'{connection_phy}' 4987 ) 4988 connection.phy = connection_phy 4989 connection.emit('connection_phy_update') 4990 4991 @host_event_handler 4992 @with_connection_from_handle 4993 def on_connection_phy_update_failure(self, connection, error): 4994 logger.debug( 4995 f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] ' 4996 f'{connection.peer_address} as {connection.role_name}, ' 4997 f'error={error}' 4998 ) 4999 connection.emit('connection_phy_update_failure', error) 5000 5001 @host_event_handler 5002 @with_connection_from_handle 5003 def on_connection_att_mtu_update(self, connection, att_mtu): 5004 logger.debug( 5005 f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] ' 5006 f'{connection.peer_address} as {connection.role_name}, ' 5007 f'{att_mtu}' 5008 ) 5009 connection.att_mtu = att_mtu 5010 connection.emit('connection_att_mtu_update') 5011 5012 @host_event_handler 5013 @with_connection_from_handle 5014 def on_connection_data_length_change( 5015 self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time 5016 ): 5017 logger.debug( 5018 f'*** Connection Data Length Change: [0x{connection.handle:04X}] ' 5019 f'{connection.peer_address} as {connection.role_name}' 5020 ) 5021 connection.data_length = ( 5022 max_tx_octets, 5023 max_tx_time, 5024 max_rx_octets, 5025 max_rx_time, 5026 ) 5027 connection.emit('connection_data_length_change') 5028 5029 # [Classic only] 5030 @host_event_handler 5031 @with_connection_from_address 5032 def on_role_change(self, connection, new_role): 5033 connection.role = new_role 5034 connection.emit('role_change', new_role) 5035 5036 # [Classic only] 5037 @host_event_handler 5038 @try_with_connection_from_address 5039 def on_role_change_failure(self, connection, 
address, error): 5040 if connection: 5041 connection.emit('role_change_failure', error) 5042 self.emit('role_change_failure', address, error) 5043 5044 # [Classic only] 5045 @host_event_handler 5046 @with_connection_from_address 5047 def on_classic_pairing(self, connection: Connection) -> None: 5048 connection.emit('classic_pairing') 5049 5050 # [Classic only] 5051 @host_event_handler 5052 @with_connection_from_address 5053 def on_classic_pairing_failure(self, connection: Connection, status) -> None: 5054 connection.emit('classic_pairing_failure', status) 5055 5056 def on_pairing_start(self, connection: Connection) -> None: 5057 connection.emit('pairing_start') 5058 5059 def on_pairing( 5060 self, 5061 connection: Connection, 5062 identity_address: Optional[Address], 5063 keys: PairingKeys, 5064 sc: bool, 5065 ) -> None: 5066 if identity_address is not None: 5067 connection.peer_resolvable_address = connection.peer_address 5068 connection.peer_address = identity_address 5069 connection.sc = sc 5070 connection.authenticated = True 5071 connection.emit('pairing', keys) 5072 5073 def on_pairing_failure(self, connection: Connection, reason: int) -> None: 5074 connection.emit('pairing_failure', reason) 5075 5076 @with_connection_from_handle 5077 def on_gatt_pdu(self, connection, pdu): 5078 # Parse the L2CAP payload into an ATT PDU object 5079 att_pdu = ATT_PDU.from_bytes(pdu) 5080 5081 # Conveniently, even-numbered op codes are client->server and 5082 # odd-numbered ones are server->client 5083 if att_pdu.op_code & 1: 5084 if connection.gatt_client is None: 5085 logger.warning( 5086 color('no GATT client for connection 0x{connection_handle:04X}') 5087 ) 5088 return 5089 connection.gatt_client.on_gatt_pdu(att_pdu) 5090 else: 5091 if connection.gatt_server is None: 5092 logger.warning( 5093 color('no GATT server for connection 0x{connection_handle:04X}') 5094 ) 5095 return 5096 connection.gatt_server.on_gatt_pdu(connection, att_pdu) 5097 5098 @with_connection_from_handle 
5099 def on_smp_pdu(self, connection, pdu): 5100 self.smp_manager.on_smp_pdu(connection, pdu) 5101 5102 @host_event_handler 5103 @with_connection_from_handle 5104 def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes): 5105 self.l2cap_channel_manager.on_pdu(connection, cid, pdu) 5106 5107 def __str__(self): 5108 return ( 5109 f'Device(name="{self.name}", ' 5110 f'random_address="{self.random_address}", ' 5111 f'public_address="{self.public_address}", ' 5112 f'static_address="{self.static_address}")' 5113 ) 5114