1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19 20import logging 21import asyncio 22import dataclasses 23import itertools 24import random 25import struct 26from bumble.colors import color 27from bumble.core import ( 28 BT_CENTRAL_ROLE, 29 BT_PERIPHERAL_ROLE, 30 BT_LE_TRANSPORT, 31 BT_BR_EDR_TRANSPORT, 32) 33 34from bumble.hci import ( 35 HCI_ACL_DATA_PACKET, 36 HCI_COMMAND_DISALLOWED_ERROR, 37 HCI_COMMAND_PACKET, 38 HCI_COMMAND_STATUS_PENDING, 39 HCI_CONNECTION_TIMEOUT_ERROR, 40 HCI_CONTROLLER_BUSY_ERROR, 41 HCI_EVENT_PACKET, 42 HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, 43 HCI_LE_1M_PHY, 44 HCI_SUCCESS, 45 HCI_UNKNOWN_HCI_COMMAND_ERROR, 46 HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, 47 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 48 HCI_VERSION_BLUETOOTH_CORE_5_0, 49 Address, 50 HCI_AclDataPacket, 51 HCI_AclDataPacketAssembler, 52 HCI_Command_Complete_Event, 53 HCI_Command_Status_Event, 54 HCI_Connection_Complete_Event, 55 HCI_Connection_Request_Event, 56 HCI_Disconnection_Complete_Event, 57 HCI_Encryption_Change_Event, 58 HCI_Synchronous_Connection_Complete_Event, 59 HCI_LE_Advertising_Report_Event, 60 HCI_LE_CIS_Established_Event, 61 HCI_LE_CIS_Request_Event, 62 HCI_LE_Connection_Complete_Event, 63 
HCI_LE_Read_Remote_Features_Complete_Event, 64 HCI_Number_Of_Completed_Packets_Event, 65 HCI_Packet, 66 HCI_Role_Change_Event, 67) 68from typing import Optional, Union, Dict, Any, TYPE_CHECKING 69 70if TYPE_CHECKING: 71 from bumble.link import LocalLink 72 from bumble.transport.common import TransportSink 73 74# ----------------------------------------------------------------------------- 75# Logging 76# ----------------------------------------------------------------------------- 77logger = logging.getLogger(__name__) 78 79 80# ----------------------------------------------------------------------------- 81# Utils 82# ----------------------------------------------------------------------------- 83class DataObject: 84 pass 85 86 87# ----------------------------------------------------------------------------- 88@dataclasses.dataclass 89class CisLink: 90 handle: int 91 cis_id: int 92 cig_id: int 93 acl_connection: Optional[Connection] = None 94 95 96# ----------------------------------------------------------------------------- 97@dataclasses.dataclass 98class Connection: 99 controller: Controller 100 handle: int 101 role: int 102 peer_address: Address 103 link: Any 104 transport: int 105 link_type: int 106 107 def __post_init__(self): 108 self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu) 109 110 def on_hci_acl_data_packet(self, packet): 111 self.assembler.feed_packet(packet) 112 self.controller.send_hci_packet( 113 HCI_Number_Of_Completed_Packets_Event([(self.handle, 1)]) 114 ) 115 116 def on_acl_pdu(self, data): 117 if self.link: 118 self.link.send_acl_data( 119 self.controller, self.peer_address, self.transport, data 120 ) 121 122 123# ----------------------------------------------------------------------------- 124class Controller: 125 def __init__( 126 self, 127 name: str, 128 host_source=None, 129 host_sink: Optional[TransportSink] = None, 130 link: Optional[LocalLink] = None, 131 public_address: Optional[Union[bytes, str, Address]] = None, 132 ): 
133 self.name = name 134 self.hci_sink = None 135 self.link = link 136 137 self.central_connections: Dict[Address, Connection] = ( 138 {} 139 ) # Connections where this controller is the central 140 self.peripheral_connections: Dict[Address, Connection] = ( 141 {} 142 ) # Connections where this controller is the peripheral 143 self.classic_connections: Dict[Address, Connection] = ( 144 {} 145 ) # Connections in BR/EDR 146 self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle 147 self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle 148 149 self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0 150 self.hci_revision = 0 151 self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0 152 self.lmp_subversion = 0 153 self.lmp_features = bytes.fromhex( 154 '0000000060000000' 155 ) # BR/EDR Not Supported, LE Supported (Controller) 156 self.manufacturer_name = 0xFFFF 157 self.hc_data_packet_length = 27 158 self.hc_total_num_data_packets = 64 159 self.hc_le_data_packet_length = 27 160 self.hc_total_num_le_data_packets = 64 161 self.event_mask = 0 162 self.event_mask_page_2 = 0 163 self.supported_commands = bytes.fromhex( 164 '2000800000c000000000e4000000a822000000000000040000f7ffff7f000000' 165 '30f0f9ff01008004000000000000000000000000000000000000000000000000' 166 ) 167 self.le_event_mask = 0 168 self.advertising_parameters = None 169 self.le_features = bytes.fromhex('ff49010000000000') 170 self.le_states = bytes.fromhex('ffff3fffff030000') 171 self.advertising_channel_tx_power = 0 172 self.filter_accept_list_size = 8 173 self.filter_duplicates = False 174 self.resolving_list_size = 8 175 self.supported_max_tx_octets = 27 176 self.supported_max_tx_time = 10000 # microseconds 177 self.supported_max_rx_octets = 27 178 self.supported_max_rx_time = 10000 # microseconds 179 self.suggested_max_tx_octets = 27 180 self.suggested_max_tx_time = 0x0148 # microseconds 181 self.default_phy = bytes([0, 0, 0]) 182 self.le_scan_type = 0 183 self.le_scan_interval = 0x10 
184 self.le_scan_window = 0x10 185 self.le_scan_enable = 0 186 self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS 187 self.le_scanning_filter_policy = 0 188 self.le_scan_response_data = None 189 self.le_address_resolution = False 190 self.le_rpa_timeout = 0 191 self.sync_flow_control = False 192 self.local_name = 'Bumble' 193 194 self.advertising_interval = 2000 # Fixed for now 195 self.advertising_data = None 196 self.advertising_timer_handle = None 197 198 self._random_address = Address('00:00:00:00:00:00') 199 if isinstance(public_address, Address): 200 self._public_address = public_address 201 elif public_address is not None: 202 self._public_address = Address( 203 public_address, Address.PUBLIC_DEVICE_ADDRESS 204 ) 205 else: 206 self._public_address = Address('00:00:00:00:00:00') 207 208 # Set the source and sink interfaces 209 if host_source: 210 host_source.set_packet_sink(self) 211 self.host = host_sink 212 213 # Add this controller to the link if specified 214 if link: 215 link.add_controller(self) 216 217 self.terminated = asyncio.get_running_loop().create_future() 218 219 @property 220 def host(self): 221 return self.hci_sink 222 223 @host.setter 224 def host(self, host): 225 ''' 226 Sets the host (sink) for this controller, and set this controller as the 227 controller (sink) for the host 228 ''' 229 self.set_packet_sink(host) 230 if host: 231 host.controller = self 232 233 def set_packet_sink(self, sink): 234 ''' 235 Method from the Packet Source interface 236 ''' 237 self.hci_sink = sink 238 239 @property 240 def public_address(self): 241 return self._public_address 242 243 @public_address.setter 244 def public_address(self, address): 245 if isinstance(address, str): 246 address = Address(address) 247 self._public_address = address 248 249 @property 250 def random_address(self): 251 return self._random_address 252 253 @random_address.setter 254 def random_address(self, address): 255 if isinstance(address, str): 256 address = 
Address(address) 257 self._random_address = address 258 logger.debug(f'new random address: {address}') 259 260 if self.link: 261 self.link.on_address_changed(self) 262 263 # Packet Sink protocol (packets coming from the host via HCI) 264 def on_packet(self, packet): 265 self.on_hci_packet(HCI_Packet.from_bytes(packet)) 266 267 def on_hci_packet(self, packet): 268 logger.debug( 269 f'{color("<<<", "blue")} [{self.name}] ' 270 f'{color("HOST -> CONTROLLER", "blue")}: {packet}' 271 ) 272 273 # If the packet is a command, invoke the handler for this packet 274 if packet.hci_packet_type == HCI_COMMAND_PACKET: 275 self.on_hci_command_packet(packet) 276 elif packet.hci_packet_type == HCI_EVENT_PACKET: 277 self.on_hci_event_packet(packet) 278 elif packet.hci_packet_type == HCI_ACL_DATA_PACKET: 279 self.on_hci_acl_data_packet(packet) 280 else: 281 logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') 282 283 def on_hci_command_packet(self, command): 284 handler_name = f'on_{command.name.lower()}' 285 handler = getattr(self, handler_name, self.on_hci_command) 286 result = handler(command) 287 if isinstance(result, bytes): 288 self.send_hci_packet( 289 HCI_Command_Complete_Event( 290 num_hci_command_packets=1, 291 command_opcode=command.op_code, 292 return_parameters=result, 293 ) 294 ) 295 296 def on_hci_event_packet(self, _event): 297 logger.warning('!!! unexpected event packet') 298 299 def on_hci_acl_data_packet(self, packet): 300 # Look for the connection to which this data belongs 301 connection = self.find_connection_by_handle(packet.connection_handle) 302 if connection is None: 303 logger.warning( 304 f'!!! 
no connection for handle 0x{packet.connection_handle:04X}' 305 ) 306 return 307 308 # Pass the packet to the connection 309 connection.on_hci_acl_data_packet(packet) 310 311 def send_hci_packet(self, packet): 312 logger.debug( 313 f'{color(">>>", "green")} [{self.name}] ' 314 f'{color("CONTROLLER -> HOST", "green")}: {packet}' 315 ) 316 if self.host: 317 self.host.on_packet(packet.to_bytes()) 318 319 # This method allows the controller to emulate the same API as a transport source 320 async def wait_for_termination(self): 321 await self.terminated 322 323 ############################################################ 324 # Link connections 325 ############################################################ 326 def allocate_connection_handle(self) -> int: 327 handle = 0 328 max_handle = 0 329 for connection in itertools.chain( 330 self.central_connections.values(), 331 self.peripheral_connections.values(), 332 self.classic_connections.values(), 333 ): 334 max_handle = max(max_handle, connection.handle) 335 if connection.handle == handle: 336 # Already used, continue searching after the current max 337 handle = max_handle + 1 338 for cis_handle in itertools.chain( 339 self.central_cis_links.keys(), self.peripheral_cis_links.keys() 340 ): 341 max_handle = max(max_handle, cis_handle) 342 if cis_handle == handle: 343 # Already used, continue searching after the current max 344 handle = max_handle + 1 345 return handle 346 347 def find_le_connection_by_address(self, address): 348 return self.central_connections.get(address) or self.peripheral_connections.get( 349 address 350 ) 351 352 def find_classic_connection_by_address(self, address): 353 return self.classic_connections.get(address) 354 355 def find_connection_by_handle(self, handle): 356 for connection in itertools.chain( 357 self.central_connections.values(), 358 self.peripheral_connections.values(), 359 self.classic_connections.values(), 360 ): 361 if connection.handle == handle: 362 return connection 363 return None 
364 365 def find_central_connection_by_handle(self, handle): 366 for connection in self.central_connections.values(): 367 if connection.handle == handle: 368 return connection 369 return None 370 371 def find_classic_connection_by_handle(self, handle): 372 for connection in self.classic_connections.values(): 373 if connection.handle == handle: 374 return connection 375 return None 376 377 def on_link_central_connected(self, central_address): 378 ''' 379 Called when an incoming connection occurs from a central on the link 380 ''' 381 382 # Allocate (or reuse) a connection handle 383 peer_address = central_address 384 peer_address_type = central_address.address_type 385 connection = self.peripheral_connections.get(peer_address) 386 if connection is None: 387 connection_handle = self.allocate_connection_handle() 388 connection = Connection( 389 controller=self, 390 handle=connection_handle, 391 role=BT_PERIPHERAL_ROLE, 392 peer_address=peer_address, 393 link=self.link, 394 transport=BT_LE_TRANSPORT, 395 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 396 ) 397 self.peripheral_connections[peer_address] = connection 398 logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}') 399 400 # Then say that the connection has completed 401 self.send_hci_packet( 402 HCI_LE_Connection_Complete_Event( 403 status=HCI_SUCCESS, 404 connection_handle=connection.handle, 405 role=connection.role, 406 peer_address_type=peer_address_type, 407 peer_address=peer_address, 408 connection_interval=10, # FIXME 409 peripheral_latency=0, # FIXME 410 supervision_timeout=10, # FIXME 411 central_clock_accuracy=7, # FIXME 412 ) 413 ) 414 415 def on_link_central_disconnected(self, peer_address, reason): 416 ''' 417 Called when an active disconnection occurs from a peer 418 ''' 419 420 # Send a disconnection complete event 421 if connection := self.peripheral_connections.get(peer_address): 422 self.send_hci_packet( 423 HCI_Disconnection_Complete_Event( 424 status=HCI_SUCCESS, 
425 connection_handle=connection.handle, 426 reason=reason, 427 ) 428 ) 429 430 # Remove the connection 431 del self.peripheral_connections[peer_address] 432 else: 433 logger.warning(f'!!! No peripheral connection found for {peer_address}') 434 435 def on_link_peripheral_connection_complete( 436 self, le_create_connection_command, status 437 ): 438 ''' 439 Called by the link when a connection has been made or has failed to be made 440 ''' 441 442 if status == HCI_SUCCESS: 443 # Allocate (or reuse) a connection handle 444 peer_address = le_create_connection_command.peer_address 445 connection = self.central_connections.get(peer_address) 446 if connection is None: 447 connection_handle = self.allocate_connection_handle() 448 connection = Connection( 449 controller=self, 450 handle=connection_handle, 451 role=BT_CENTRAL_ROLE, 452 peer_address=peer_address, 453 link=self.link, 454 transport=BT_LE_TRANSPORT, 455 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 456 ) 457 self.central_connections[peer_address] = connection 458 logger.debug( 459 f'New CENTRAL connection handle: 0x{connection_handle:04X}' 460 ) 461 else: 462 connection = None 463 464 # Say that the connection has completed 465 self.send_hci_packet( 466 # pylint: disable=line-too-long 467 HCI_LE_Connection_Complete_Event( 468 status=status, 469 connection_handle=connection.handle if connection else 0, 470 role=BT_CENTRAL_ROLE, 471 peer_address_type=le_create_connection_command.peer_address_type, 472 peer_address=le_create_connection_command.peer_address, 473 connection_interval=le_create_connection_command.connection_interval_min, 474 peripheral_latency=le_create_connection_command.max_latency, 475 supervision_timeout=le_create_connection_command.supervision_timeout, 476 central_clock_accuracy=0, 477 ) 478 ) 479 480 def on_link_peripheral_disconnection_complete(self, disconnection_command, status): 481 ''' 482 Called when a disconnection has been completed 483 ''' 484 485 # Send a disconnection 
complete event 486 self.send_hci_packet( 487 HCI_Disconnection_Complete_Event( 488 status=status, 489 connection_handle=disconnection_command.connection_handle, 490 reason=disconnection_command.reason, 491 ) 492 ) 493 494 # Remove the connection 495 if connection := self.find_central_connection_by_handle( 496 disconnection_command.connection_handle 497 ): 498 logger.debug(f'CENTRAL Connection removed: {connection}') 499 del self.central_connections[connection.peer_address] 500 501 def on_link_peripheral_disconnected(self, peer_address): 502 ''' 503 Called when a connection to a peripheral is broken 504 ''' 505 506 # Send a disconnection complete event 507 if connection := self.central_connections.get(peer_address): 508 self.send_hci_packet( 509 HCI_Disconnection_Complete_Event( 510 status=HCI_SUCCESS, 511 connection_handle=connection.handle, 512 reason=HCI_CONNECTION_TIMEOUT_ERROR, 513 ) 514 ) 515 516 # Remove the connection 517 del self.central_connections[peer_address] 518 else: 519 logger.warning(f'!!! No central connection found for {peer_address}') 520 521 def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk): 522 # For now, just setup the encryption without asking the host 523 if connection := self.find_le_connection_by_address(peer_address): 524 self.send_hci_packet( 525 HCI_Encryption_Change_Event( 526 status=0, connection_handle=connection.handle, encryption_enabled=1 527 ) 528 ) 529 530 def on_link_acl_data(self, sender_address, transport, data): 531 # Look for the connection to which this data belongs 532 if transport == BT_LE_TRANSPORT: 533 connection = self.find_le_connection_by_address(sender_address) 534 else: 535 connection = self.find_classic_connection_by_address(sender_address) 536 if connection is None: 537 logger.warning(f'!!! 
no connection for {sender_address}') 538 return 539 540 # Send the data to the host 541 # TODO: should fragment 542 acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data) 543 self.send_hci_packet(acl_packet) 544 545 def on_link_advertising_data(self, sender_address, data): 546 # Ignore if we're not scanning 547 if self.le_scan_enable == 0: 548 return 549 550 # Send a scan report 551 report = HCI_LE_Advertising_Report_Event.Report( 552 HCI_LE_Advertising_Report_Event.Report.FIELDS, 553 event_type=HCI_LE_Advertising_Report_Event.ADV_IND, 554 address_type=sender_address.address_type, 555 address=sender_address, 556 data=data, 557 rssi=-50, 558 ) 559 self.send_hci_packet(HCI_LE_Advertising_Report_Event([report])) 560 561 # Simulate a scan response 562 report = HCI_LE_Advertising_Report_Event.Report( 563 HCI_LE_Advertising_Report_Event.Report.FIELDS, 564 event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP, 565 address_type=sender_address.address_type, 566 address=sender_address, 567 data=data, 568 rssi=-50, 569 ) 570 self.send_hci_packet(HCI_LE_Advertising_Report_Event([report])) 571 572 def on_link_cis_request( 573 self, central_address: Address, cig_id: int, cis_id: int 574 ) -> None: 575 ''' 576 Called when an incoming CIS request occurs from a central on the link 577 ''' 578 579 connection = self.peripheral_connections.get(central_address) 580 assert connection 581 582 pending_cis_link = CisLink( 583 handle=self.allocate_connection_handle(), 584 cis_id=cis_id, 585 cig_id=cig_id, 586 acl_connection=connection, 587 ) 588 self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link 589 590 self.send_hci_packet( 591 HCI_LE_CIS_Request_Event( 592 acl_connection_handle=connection.handle, 593 cis_connection_handle=pending_cis_link.handle, 594 cig_id=cig_id, 595 cis_id=cis_id, 596 ) 597 ) 598 599 def on_link_cis_established(self, cig_id: int, cis_id: int) -> None: 600 ''' 601 Called when an incoming CIS established. 
602 ''' 603 604 cis_link = next( 605 cis_link 606 for cis_link in itertools.chain( 607 self.central_cis_links.values(), self.peripheral_cis_links.values() 608 ) 609 if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id 610 ) 611 612 self.send_hci_packet( 613 HCI_LE_CIS_Established_Event( 614 status=HCI_SUCCESS, 615 connection_handle=cis_link.handle, 616 # CIS parameters are ignored. 617 cig_sync_delay=0, 618 cis_sync_delay=0, 619 transport_latency_c_to_p=0, 620 transport_latency_p_to_c=0, 621 phy_c_to_p=0, 622 phy_p_to_c=0, 623 nse=0, 624 bn_c_to_p=0, 625 bn_p_to_c=0, 626 ft_c_to_p=0, 627 ft_p_to_c=0, 628 max_pdu_c_to_p=0, 629 max_pdu_p_to_c=0, 630 iso_interval=0, 631 ) 632 ) 633 634 def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None: 635 ''' 636 Called when a CIS disconnected. 637 ''' 638 639 if cis_link := next( 640 ( 641 cis_link 642 for cis_link in self.peripheral_cis_links.values() 643 if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id 644 ), 645 None, 646 ): 647 # Remove peripheral CIS on disconnection. 648 self.peripheral_cis_links.pop(cis_link.handle) 649 elif cis_link := next( 650 ( 651 cis_link 652 for cis_link in self.central_cis_links.values() 653 if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id 654 ), 655 None, 656 ): 657 # Keep central CIS on disconnection. They should be removed by HCI_LE_Remove_CIG_Command. 
658 cis_link.acl_connection = None 659 else: 660 return 661 662 self.send_hci_packet( 663 HCI_Disconnection_Complete_Event( 664 status=HCI_SUCCESS, 665 connection_handle=cis_link.handle, 666 reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 667 ) 668 ) 669 670 ############################################################ 671 # Classic link connections 672 ############################################################ 673 674 def on_classic_connection_request(self, peer_address, link_type): 675 self.send_hci_packet( 676 HCI_Connection_Request_Event( 677 bd_addr=peer_address, 678 class_of_device=0, 679 link_type=link_type, 680 ) 681 ) 682 683 def on_classic_connection_complete(self, peer_address, status): 684 if status == HCI_SUCCESS: 685 # Allocate (or reuse) a connection handle 686 peer_address = peer_address 687 connection = self.classic_connections.get(peer_address) 688 if connection is None: 689 connection_handle = self.allocate_connection_handle() 690 connection = Connection( 691 controller=self, 692 handle=connection_handle, 693 # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery 694 role=BT_CENTRAL_ROLE, 695 peer_address=peer_address, 696 link=self.link, 697 transport=BT_BR_EDR_TRANSPORT, 698 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 699 ) 700 self.classic_connections[peer_address] = connection 701 logger.debug( 702 f'New CLASSIC connection handle: 0x{connection_handle:04X}' 703 ) 704 else: 705 connection_handle = connection.handle 706 self.send_hci_packet( 707 HCI_Connection_Complete_Event( 708 status=status, 709 connection_handle=connection_handle, 710 bd_addr=peer_address, 711 encryption_enabled=False, 712 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 713 ) 714 ) 715 else: 716 connection = None 717 self.send_hci_packet( 718 HCI_Connection_Complete_Event( 719 status=status, 720 connection_handle=0, 721 bd_addr=peer_address, 722 encryption_enabled=False, 723 
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 724 ) 725 ) 726 727 def on_classic_disconnected(self, peer_address, reason): 728 # Send a disconnection complete event 729 if connection := self.classic_connections.get(peer_address): 730 self.send_hci_packet( 731 HCI_Disconnection_Complete_Event( 732 status=HCI_SUCCESS, 733 connection_handle=connection.handle, 734 reason=reason, 735 ) 736 ) 737 738 # Remove the connection 739 del self.classic_connections[peer_address] 740 else: 741 logger.warning(f'!!! No classic connection found for {peer_address}') 742 743 def on_classic_role_change(self, peer_address, new_role): 744 self.send_hci_packet( 745 HCI_Role_Change_Event( 746 status=HCI_SUCCESS, 747 bd_addr=peer_address, 748 new_role=new_role, 749 ) 750 ) 751 752 def on_classic_sco_connection_complete( 753 self, peer_address: Address, status: int, link_type: int 754 ): 755 if status == HCI_SUCCESS: 756 # Allocate (or reuse) a connection handle 757 connection_handle = self.allocate_connection_handle() 758 connection = Connection( 759 controller=self, 760 handle=connection_handle, 761 # Role doesn't matter in SCO. 762 role=BT_CENTRAL_ROLE, 763 peer_address=peer_address, 764 link=self.link, 765 transport=BT_BR_EDR_TRANSPORT, 766 link_type=link_type, 767 ) 768 self.classic_connections[peer_address] = connection 769 logger.debug(f'New SCO connection handle: 0x{connection_handle:04X}') 770 else: 771 connection_handle = 0 772 773 self.send_hci_packet( 774 HCI_Synchronous_Connection_Complete_Event( 775 status=status, 776 connection_handle=connection_handle, 777 bd_addr=peer_address, 778 link_type=link_type, 779 # TODO: Provide SCO connection parameters. 
780 transmission_interval=0, 781 retransmission_window=0, 782 rx_packet_length=0, 783 tx_packet_length=0, 784 air_mode=0, 785 ) 786 ) 787 788 ############################################################ 789 # Advertising support 790 ############################################################ 791 def on_advertising_timer_fired(self): 792 self.send_advertising_data() 793 self.advertising_timer_handle = asyncio.get_running_loop().call_later( 794 self.advertising_interval / 1000.0, self.on_advertising_timer_fired 795 ) 796 797 def start_advertising(self): 798 # Stop any ongoing advertising before we start again 799 self.stop_advertising() 800 801 # Advertise now 802 self.advertising_timer_handle = asyncio.get_running_loop().call_soon( 803 self.on_advertising_timer_fired 804 ) 805 806 def stop_advertising(self): 807 if self.advertising_timer_handle is not None: 808 self.advertising_timer_handle.cancel() 809 self.advertising_timer_handle = None 810 811 def send_advertising_data(self): 812 if self.link and self.advertising_data: 813 self.link.send_advertising_data(self.random_address, self.advertising_data) 814 815 @property 816 def is_advertising(self): 817 return self.advertising_timer_handle is not None 818 819 ############################################################ 820 # HCI handlers 821 ############################################################ 822 def on_hci_command(self, command): 823 logger.warning(color(f'--- Unsupported command {command}', 'red')) 824 return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR]) 825 826 def on_hci_create_connection_command(self, command): 827 ''' 828 See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command 829 ''' 830 831 if self.link is None: 832 return 833 logger.debug(f'Connection request to {command.bd_addr}') 834 835 # Check that we don't already have a pending connection 836 if self.link.get_pending_connection(): 837 self.send_hci_packet( 838 HCI_Command_Status_Event( 839 status=HCI_CONTROLLER_BUSY_ERROR, 840 
num_hci_command_packets=1, 841 command_opcode=command.op_code, 842 ) 843 ) 844 return 845 846 self.link.classic_connect(self, command.bd_addr) 847 848 # Say that the connection is pending 849 self.send_hci_packet( 850 HCI_Command_Status_Event( 851 status=HCI_COMMAND_STATUS_PENDING, 852 num_hci_command_packets=1, 853 command_opcode=command.op_code, 854 ) 855 ) 856 857 def on_hci_disconnect_command(self, command): 858 ''' 859 See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command 860 ''' 861 # First, say that the disconnection is pending 862 self.send_hci_packet( 863 HCI_Command_Status_Event( 864 status=HCI_COMMAND_STATUS_PENDING, 865 num_hci_command_packets=1, 866 command_opcode=command.op_code, 867 ) 868 ) 869 870 # Notify the link of the disconnection 871 handle = command.connection_handle 872 if connection := self.find_central_connection_by_handle(handle): 873 if self.link: 874 self.link.disconnect( 875 self.random_address, connection.peer_address, command 876 ) 877 else: 878 # Remove the connection 879 del self.central_connections[connection.peer_address] 880 elif connection := self.find_classic_connection_by_handle(handle): 881 if self.link: 882 self.link.classic_disconnect( 883 self, 884 connection.peer_address, 885 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 886 ) 887 else: 888 # Remove the connection 889 del self.classic_connections[connection.peer_address] 890 elif cis_link := ( 891 self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle) 892 ): 893 if self.link: 894 self.link.disconnect_cis( 895 initiator_controller=self, 896 peer_address=cis_link.acl_connection.peer_address, 897 cig_id=cis_link.cig_id, 898 cis_id=cis_link.cis_id, 899 ) 900 # Spec requires handle to be kept after disconnection. 
def on_hci_accept_connection_request_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command

    Does nothing when no link is attached. Otherwise acknowledges the command
    with a Command Status event, then forwards the acceptance to the link.
    '''

    if self.link is None:
        return
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_SUCCESS,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )
    self.link.classic_accept_connection(self, command.bd_addr, command.role)


def on_hci_enhanced_setup_synchronous_connection_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous
    Connection command

    Does nothing when no link is attached. Reports
    HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR in a Command Status event when the
    handle does not match a classic connection. Otherwise acknowledges the
    command and asks the link to set up an eSCO connection with the peer.
    '''

    if self.link is None:
        return

    if not (
        connection := self.find_classic_connection_by_handle(
            command.connection_handle
        )
    ):
        self.send_hci_packet(
            HCI_Command_Status_Event(
                status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
                num_hci_command_packets=1,
                command_opcode=command.op_code,
            )
        )
        return

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_SUCCESS,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )
    self.link.classic_sco_connect(
        self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
    )


def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous
    Connection Request command

    Does nothing when no link is attached. Reports
    HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR in a Command Status event when no
    classic connection exists for the given address. Otherwise acknowledges the
    command and asks the link to accept the eSCO connection.
    '''

    if self.link is None:
        return

    if not (connection := self.find_classic_connection_by_address(command.bd_addr)):
        self.send_hci_packet(
            HCI_Command_Status_Event(
                status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
                num_hci_command_packets=1,
                command_opcode=command.op_code,
            )
        )
        return

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_SUCCESS,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )
    self.link.classic_accept_sco_connection(
        self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
    )


def on_hci_switch_role_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command

    Does nothing when no link is attached. Otherwise acknowledges the command
    with a Command Status event, then forwards the role switch to the link.
    '''

    if self.link is None:
        return
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_SUCCESS,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )
    self.link.classic_switch_role(self, command.bd_addr, command.role)


def on_hci_set_event_mask_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command

    Stores the event mask and returns the success status byte.
    '''
    self.event_mask = command.event_mask
    return bytes([HCI_SUCCESS])


def on_hci_reset_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command

    Currently only returns the success status byte; no state is reset.
    '''
    # TODO: cleanup what needs to be reset
    return bytes([HCI_SUCCESS])


def on_hci_write_local_name_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command

    Stores the name up to (not including) the first null byte, decoded as UTF-8.
    An empty name, or one that is not valid UTF-8, leaves the stored name
    unchanged. The success status byte is returned in every case.
    '''
    local_name = command.local_name
    if local_name:
        try:
            first_null = local_name.find(0)
            if first_null >= 0:
                local_name = local_name[:first_null]
            self.local_name = str(local_name, 'utf-8')
        except UnicodeDecodeError:
            # Best effort: keep the previous name, but leave a trace instead of
            # dropping the error silently.
            logger.warning('local name is not valid UTF-8, ignoring')
    return bytes([HCI_SUCCESS])


def on_hci_read_local_name_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command

    Returns the success status byte followed by the UTF-8 encoded local name,
    truncated or zero-padded to exactly 248 bytes.
    '''
    local_name = bytes(self.local_name, 'utf-8')[:248]
    if len(local_name) < 248:
        local_name = local_name + bytes(248 - len(local_name))

    return bytes([HCI_SUCCESS]) + local_name


def on_hci_read_class_of_device_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command

    Returns the success status byte followed by three zero bytes.
    '''
    return bytes([HCI_SUCCESS, 0, 0, 0])


def on_hci_write_class_of_device_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command

    The command is accepted but its value is not stored.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_read_synchronous_flow_control_enable_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable
    Command

    Returns the success status byte followed by 1 when synchronous flow control
    is enabled, 0 otherwise.
    '''
    if self.sync_flow_control:
        ret = 1
    else:
        ret = 0
    return bytes([HCI_SUCCESS, ret])


def on_hci_write_synchronous_flow_control_enable_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable
    Command

    Accepts only 0 (disable) or 1 (enable); any other value leaves the setting
    unchanged and returns HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR.
    '''
    ret = HCI_SUCCESS
    if command.synchronous_flow_control_enable == 1:
        self.sync_flow_control = True
    elif command.synchronous_flow_control_enable == 0:
        self.sync_flow_control = False
    else:
        ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
    return bytes([ret])


def on_hci_set_controller_to_host_flow_control_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control
    Command
    '''
    # For now we just accept the command but ignore the values.
    # TODO: respect the passed in values.
    return bytes([HCI_SUCCESS])


def on_hci_host_buffer_size_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command
    '''
    # For now we just accept the command but ignore the values.
    # TODO: respect the passed in values.
    return bytes([HCI_SUCCESS])


def on_hci_write_extended_inquiry_response_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response
    Command

    The command is accepted but its value is not stored.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_write_simple_pairing_mode_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command

    The command is accepted but its value is not stored.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_set_event_mask_page_2_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command

    Stores the page 2 event mask and returns the success status byte.
    '''
    self.event_mask_page_2 = command.event_mask_page_2
    return bytes([HCI_SUCCESS])


def on_hci_read_le_host_support_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.78 Read LE Host Support Command

    Returns the success status byte followed by the fixed values 1 and 0.
    '''
    return bytes([HCI_SUCCESS, 1, 0])


def on_hci_write_le_host_support_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
    '''
    # TODO / Just ignore for now
    return bytes([HCI_SUCCESS])


def on_hci_write_authenticated_payload_timeout_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout
    Command

    Returns the success status byte followed by the connection handle
    (16-bit, little-endian). The timeout value itself is not stored.
    '''
    # TODO
    return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)


def on_hci_read_local_version_information_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command

    Returns the success status byte followed by the HCI version, HCI revision,
    LMP version, manufacturer name and LMP subversion, packed little-endian.
    '''
    return struct.pack(
        '<BBHBHH',
        HCI_SUCCESS,
        self.hci_version,
        self.hci_revision,
        self.lmp_version,
        self.manufacturer_name,
        self.lmp_subversion,
    )


def on_hci_read_local_supported_commands_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command

    Returns the success status byte followed by the supported commands bytes.
    '''
    return bytes([HCI_SUCCESS]) + self.supported_commands


def on_hci_read_local_supported_features_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command

    Returns the success status byte followed by the first 8 bytes (page 0) of
    the LMP features.
    '''
    return bytes([HCI_SUCCESS]) + self.lmp_features[:8]


def on_hci_read_local_extended_features_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command

    Returns the status byte, the requested page number, the maximum page number
    and the 8 feature bytes of the requested page. A page that lies beyond the
    stored features yields only HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR.
    '''
    # Each page is 8 bytes, so the page starting at offset len(lmp_features)
    # is already out of range: the comparison must be >=, not >.
    if command.page_number * 8 >= len(self.lmp_features):
        return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
    return (
        bytes(
            [
                # Status
                HCI_SUCCESS,
                # Page number
                command.page_number,
                # Max page number
                len(self.lmp_features) // 8 - 1,
            ]
        )
        # Features of the current page
        + self.lmp_features[command.page_number * 8 : (command.page_number + 1) * 8]
    )


def on_hci_read_buffer_size_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.5 Read Buffer Size Command

    Returns the success status byte, the ACL data packet length, a zero byte,
    the total number of ACL data packets and a zero 16-bit value, packed
    little-endian.
    '''
    return struct.pack(
        '<BHBHH',
        HCI_SUCCESS,
        self.hc_data_packet_length,
        0,
        self.hc_total_num_data_packets,
        0,
    )


def on_hci_read_bd_addr_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command

    Returns the success status byte followed by the public address bytes, or
    six zero bytes when no public address is set.
    '''
    bd_addr = (
        self._public_address.to_bytes()
        if self._public_address is not None
        else bytes(6)
    )
    return bytes([HCI_SUCCESS]) + bd_addr


def on_hci_le_set_event_mask_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command

    Stores the LE event mask and returns the success status byte.
    '''
    self.le_event_mask = command.le_event_mask
    return bytes([HCI_SUCCESS])


def on_hci_le_read_buffer_size_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command

    Returns the success status byte, the LE data packet length (16-bit) and the
    total number of LE data packets (8-bit), packed little-endian.
    '''
    return struct.pack(
        '<BHB',
        HCI_SUCCESS,
        self.hc_le_data_packet_length,
        self.hc_total_num_le_data_packets,
    )


def on_hci_le_read_local_supported_features_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features
    Command

    Returns the success status byte followed by the LE features bytes.
    '''
    return bytes([HCI_SUCCESS]) + self.le_features


def on_hci_le_set_random_address_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command

    Stores the random address and returns the success status byte.
    '''
    self.random_address = command.random_address
    return bytes([HCI_SUCCESS])


def on_hci_le_set_advertising_parameters_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command

    Stores the whole command object as the advertising parameters and returns
    the success status byte.
    '''
    self.advertising_parameters = command
    return bytes([HCI_SUCCESS])


def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel
    Tx Power Command

    Returns the success status byte followed by the advertising channel TX
    power.
    '''
    return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])


def on_hci_le_set_advertising_data_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command

    Stores the advertising data and returns the success status byte.
    '''
    self.advertising_data = command.advertising_data
    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_response_data_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command

    Stores the scan response data and returns the success status byte.
    '''
    self.le_scan_response_data = command.scan_response_data
    return bytes([HCI_SUCCESS])


def on_hci_le_set_advertising_enable_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command

    Starts advertising when the enable value is truthy, stops it otherwise,
    then returns the success status byte.
    '''
    if command.advertising_enable:
        self.start_advertising()
    else:
        self.stop_advertising()

    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_parameters_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command

    Rejected with HCI_COMMAND_DISALLOWED_ERROR while scanning is enabled.
    Otherwise stores the scan parameters and returns the success status byte.
    '''
    if self.le_scan_enable:
        return bytes([HCI_COMMAND_DISALLOWED_ERROR])

    self.le_scan_type = command.le_scan_type
    self.le_scan_interval = command.le_scan_interval
    self.le_scan_window = command.le_scan_window
    self.le_scan_own_address_type = command.own_address_type
    self.le_scanning_filter_policy = command.scanning_filter_policy
    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_enable_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command

    Stores the scan enable and duplicate filtering values and returns the
    success status byte.
    '''
    self.le_scan_enable = command.le_scan_enable
    self.filter_duplicates = command.filter_duplicates
    return bytes([HCI_SUCCESS])


def on_hci_le_create_connection_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command

    Does nothing when no link is attached. Reports
    HCI_COMMAND_DISALLOWED_ERROR in a Command Status event when the link already
    has a pending connection. Otherwise starts the connection through the link
    and reports the command as pending.
    '''

    if not self.link:
        return

    logger.debug(f'Connection request to {command.peer_address}')

    # Check that we don't already have a pending connection
    if self.link.get_pending_connection():
        self.send_hci_packet(
            HCI_Command_Status_Event(
                status=HCI_COMMAND_DISALLOWED_ERROR,
                num_hci_command_packets=1,
                command_opcode=command.op_code,
            )
        )
        return

    # Initiate the connection
    self.link.connect(self.random_address, command)

    # Say that the connection is pending
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )


def on_hci_le_create_connection_cancel_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command

    The command is accepted but nothing is cancelled here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_filter_accept_list_size_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size
    Command

    Returns the success status byte followed by the filter accept list size.
    '''
    return bytes([HCI_SUCCESS, self.filter_accept_list_size])


def on_hci_le_clear_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command

    The command is accepted; no list is maintained here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List
    Command

    The command is accepted; no list is maintained here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept
    List Command

    The command is accepted; no list is maintained here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_remote_features_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command

    Reports HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR in a Command Status event
    when the handle is unknown. Otherwise reports the command as pending and
    immediately sends a completion event carrying a fixed feature set.
    '''

    handle = command.connection_handle

    if not self.find_connection_by_handle(handle):
        self.send_hci_packet(
            HCI_Command_Status_Event(
                status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
                num_hci_command_packets=1,
                command_opcode=command.op_code,
            )
        )
        return

    # First, say that the command is pending
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )

    # Then send the remote features
    self.send_hci_packet(
        HCI_LE_Read_Remote_Features_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=handle,
            le_features=bytes.fromhex('dd40000000000000'),
        )
    )


def on_hci_le_rand_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command

    Returns the success status byte followed by a random 64-bit number, packed
    little-endian.
    '''
    # getrandbits(64) always fits in an unsigned 64-bit field, whereas
    # randint(0, 1 << 64) includes 1 << 64 itself, which struct cannot pack.
    # The explicit '<' keeps the encoding independent of the host byte order.
    return bytes([HCI_SUCCESS]) + struct.pack('<Q', random.getrandbits(64))


def on_hci_le_enable_encryption_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command

    Returns HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR when the handle does not
    match a central connection. Otherwise notifies the link that the connection
    is encrypted, reports the command as pending and returns None.
    '''

    # Check the parameters
    if not (
        connection := self.find_central_connection_by_handle(
            command.connection_handle
        )
    ):
        logger.warning('connection not found')
        return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

    # Notify that the connection is now encrypted
    self.link.on_connection_encrypted(
        self.random_address,
        connection.peer_address,
        command.random_number,
        command.encrypted_diversifier,
        command.long_term_key,
    )

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )

    return None


def on_hci_le_read_supported_states_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command

    Returns the success status byte followed by the LE states bytes.
    '''
    return bytes([HCI_SUCCESS]) + self.le_states


def on_hci_le_read_suggested_default_data_length_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length
    Command

    Returns the success status byte followed by the suggested max TX octets and
    max TX time, each 16-bit little-endian.
    '''
    return struct.pack(
        '<BHH',
        HCI_SUCCESS,
        self.suggested_max_tx_octets,
        self.suggested_max_tx_time,
    )


def on_hci_le_write_suggested_default_data_length_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length
    Command

    Decodes two 16-bit little-endian values from the first 4 parameter bytes and
    stores them as the suggested max TX octets and max TX time.
    '''
    self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
        '<HH', command.parameters[:4]
    )
    return bytes([HCI_SUCCESS])


def on_hci_le_read_local_p_256_public_key_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command
    '''
    # TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
    return bytes([HCI_SUCCESS])


def on_hci_le_add_device_to_resolving_list_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List
    Command

    The command is accepted; no list is maintained here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_clear_resolving_list_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command

    The command is accepted; no list is maintained here.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_resolving_list_size_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command

    Returns the success status byte followed by the resolving list size.
    '''
    return bytes([HCI_SUCCESS, self.resolving_list_size])


def on_hci_le_set_address_resolution_enable_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable
    Command

    Accepts only 0 (disable) or 1 (enable); any other value leaves the setting
    unchanged and returns HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR.
    '''
    ret = HCI_SUCCESS
    if command.address_resolution_enable == 1:
        self.le_address_resolution = True
    elif command.address_resolution_enable == 0:
        self.le_address_resolution = False
    else:
        ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
    return bytes([ret])


def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address
    Timeout Command

    Stores the RPA timeout and returns the success status byte.
    '''
    self.le_rpa_timeout = command.rpa_timeout
    return bytes([HCI_SUCCESS])


def on_hci_le_read_maximum_data_length_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command

    Returns the success status byte followed by the supported max TX octets,
    TX time, RX octets and RX time, each 16-bit little-endian.
    '''
    return struct.pack(
        '<BHHHH',
        HCI_SUCCESS,
        self.supported_max_tx_octets,
        self.supported_max_tx_time,
        self.supported_max_rx_octets,
        self.supported_max_rx_time,
    )


def on_hci_le_read_phy_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command

    Returns the success status byte, the connection handle and HCI_LE_1M_PHY for
    both PHY fields, regardless of the connection.
    '''
    return struct.pack(
        '<BHBB',
        HCI_SUCCESS,
        command.connection_handle,
        HCI_LE_1M_PHY,
        HCI_LE_1M_PHY,
    )


def on_hci_le_set_default_phy_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command

    Stores the all/TX/RX PHY preferences in a dict and returns the success
    status byte.
    '''
    self.default_phy = {
        'all_phys': command.all_phys,
        'tx_phys': command.tx_phys,
        'rx_phys': command.rx_phys,
    }
    return bytes([HCI_SUCCESS])


def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
    Length Command

    Returns the success status byte followed by the fixed value 0x0672.
    '''
    return struct.pack('<BH', HCI_SUCCESS, 0x0672)


def on_hci_le_read_number_of_supported_advertising_sets_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.58 LE Read Number of Supported
    Advertising Set Command

    Returns the success status byte followed by the fixed value 0xF0.
    '''
    return struct.pack('<BB', HCI_SUCCESS, 0xF0)


def on_hci_le_read_transmit_power_command(self, _command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

    Returns the success status byte followed by two zero bytes.
    '''
    return struct.pack('<BBB', HCI_SUCCESS, 0, 0)


def on_hci_le_set_cig_parameters_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command

    Drops any existing central CIS links of the same CIG, allocates a new
    connection handle for each CIS ID in the command, and returns the success
    status byte, the CIG ID, the handle count and the handles (16-bit
    little-endian each).
    '''

    # Remove old CIG implicitly.
    # Collect the handles first: removing entries while iterating over the dict
    # raises "RuntimeError: dictionary changed size during iteration".
    stale_handles = [
        handle
        for handle, cis_link in self.central_cis_links.items()
        if cis_link.cig_id == command.cig_id
    ]
    for handle in stale_handles:
        del self.central_cis_links[handle]

    handles = []
    for cis_id in command.cis_id:
        handle = self.allocate_connection_handle()
        handles.append(handle)
        self.central_cis_links[handle] = CisLink(
            cis_id=cis_id,
            cig_id=command.cig_id,
            handle=handle,
        )
    return struct.pack(
        '<BBB', HCI_SUCCESS, command.cig_id, len(handles)
    ) + b''.join([struct.pack('<H', handle) for handle in handles])


def on_hci_le_create_cis_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command

    Does nothing when no link is attached. For each (CIS handle, ACL handle)
    pair, binds the CIS link to its ACL connection and asks the link to create
    the CIS. Stops at the first unknown handle and returns
    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR; pairs already processed are not
    rolled back. On success the command is reported as pending.
    '''
    if not self.link:
        return

    for cis_handle, acl_handle in zip(
        command.cis_connection_handle, command.acl_connection_handle
    ):
        if not (connection := self.find_connection_by_handle(acl_handle)):
            logger.error(f'Cannot find connection with handle={acl_handle}')
            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

        if not (cis_link := self.central_cis_links.get(cis_handle)):
            logger.error(f'Cannot find CIS with handle={cis_handle}')
            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

        cis_link.acl_connection = connection

        self.link.create_cis(
            self,
            peripheral_address=connection.peer_address,
            cig_id=cis_link.cig_id,
            cis_id=cis_link.cis_id,
        )

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )


def on_hci_le_remove_cig_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command

    Removes every central CIS link that belongs to the given CIG. Returns
    HCI_SUCCESS when at least one link was removed, otherwise
    HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, followed by the CIG ID.
    '''

    # Collect the handles first: removing entries while iterating over the dict
    # raises "RuntimeError: dictionary changed size during iteration".
    removed_handles = [
        cis_handle
        for cis_handle, cis_link in self.central_cis_links.items()
        if cis_link.cig_id == command.cig_id
    ]
    for cis_handle in removed_handles:
        del self.central_cis_links[cis_handle]

    status = (
        HCI_SUCCESS if removed_handles else HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
    )

    # NOTE(review): the CIG ID is packed as a 16-bit field here; the spec
    # appears to define it as a single octet - confirm before changing the
    # wire format.
    return struct.pack('<BH', status, command.cig_id)


def on_hci_le_accept_cis_request_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command

    Does nothing when no link is attached. Returns
    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR when the handle does not match a
    peripheral CIS link. Otherwise asks the link to accept the CIS and reports
    the command as pending.
    '''
    if not self.link:
        return

    if not (
        pending_cis_link := self.peripheral_cis_links.get(command.connection_handle)
    ):
        logger.error(f'Cannot find CIS with handle={command.connection_handle}')
        return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

    assert pending_cis_link.acl_connection
    self.link.accept_cis(
        peripheral_controller=self,
        central_address=pending_cis_link.acl_connection.peer_address,
        cig_id=pending_cis_link.cig_id,
        cis_id=pending_cis_link.cis_id,
    )

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )


def on_hci_le_setup_iso_data_path_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command

    Returns the success status byte followed by the connection handle (16-bit,
    little-endian); no data path is actually configured.
    '''
    return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)


def on_hci_le_remove_iso_data_path_command(self, command):
    '''
    See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command

    Returns the success status byte followed by the connection handle (16-bit,
    little-endian); no data path is actually removed.
    '''
    return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)