1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19
20import logging
21import asyncio
22import dataclasses
23import itertools
24import random
25import struct
26from bumble.colors import color
27from bumble.core import (
28    BT_CENTRAL_ROLE,
29    BT_PERIPHERAL_ROLE,
30    BT_LE_TRANSPORT,
31    BT_BR_EDR_TRANSPORT,
32)
33
34from bumble.hci import (
35    HCI_ACL_DATA_PACKET,
36    HCI_COMMAND_DISALLOWED_ERROR,
37    HCI_COMMAND_PACKET,
38    HCI_COMMAND_STATUS_PENDING,
39    HCI_CONNECTION_TIMEOUT_ERROR,
40    HCI_CONTROLLER_BUSY_ERROR,
41    HCI_EVENT_PACKET,
42    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
43    HCI_LE_1M_PHY,
44    HCI_SUCCESS,
45    HCI_UNKNOWN_HCI_COMMAND_ERROR,
46    HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
47    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
48    HCI_VERSION_BLUETOOTH_CORE_5_0,
49    Address,
50    HCI_AclDataPacket,
51    HCI_AclDataPacketAssembler,
52    HCI_Command_Complete_Event,
53    HCI_Command_Status_Event,
54    HCI_Connection_Complete_Event,
55    HCI_Connection_Request_Event,
56    HCI_Disconnection_Complete_Event,
57    HCI_Encryption_Change_Event,
58    HCI_Synchronous_Connection_Complete_Event,
59    HCI_LE_Advertising_Report_Event,
60    HCI_LE_CIS_Established_Event,
61    HCI_LE_CIS_Request_Event,
62    HCI_LE_Connection_Complete_Event,
63    HCI_LE_Read_Remote_Features_Complete_Event,
64    HCI_Number_Of_Completed_Packets_Event,
65    HCI_Packet,
66    HCI_Role_Change_Event,
67)
68from typing import Optional, Union, Dict, Any, TYPE_CHECKING
69
70if TYPE_CHECKING:
71    from bumble.link import LocalLink
72    from bumble.transport.common import TransportSink
73
74# -----------------------------------------------------------------------------
75# Logging
76# -----------------------------------------------------------------------------
77logger = logging.getLogger(__name__)
78
79
80# -----------------------------------------------------------------------------
81# Utils
82# -----------------------------------------------------------------------------
83class DataObject:
84    pass
85
86
87# -----------------------------------------------------------------------------
88@dataclasses.dataclass
89class CisLink:
90    handle: int
91    cis_id: int
92    cig_id: int
93    acl_connection: Optional[Connection] = None
94
95
96# -----------------------------------------------------------------------------
97@dataclasses.dataclass
98class Connection:
99    controller: Controller
100    handle: int
101    role: int
102    peer_address: Address
103    link: Any
104    transport: int
105    link_type: int
106
107    def __post_init__(self):
108        self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
109
110    def on_hci_acl_data_packet(self, packet):
111        self.assembler.feed_packet(packet)
112        self.controller.send_hci_packet(
113            HCI_Number_Of_Completed_Packets_Event([(self.handle, 1)])
114        )
115
116    def on_acl_pdu(self, data):
117        if self.link:
118            self.link.send_acl_data(
119                self.controller, self.peer_address, self.transport, data
120            )
121
122
123# -----------------------------------------------------------------------------
124class Controller:
125    def __init__(
126        self,
127        name: str,
128        host_source=None,
129        host_sink: Optional[TransportSink] = None,
130        link: Optional[LocalLink] = None,
131        public_address: Optional[Union[bytes, str, Address]] = None,
132    ):
133        self.name = name
134        self.hci_sink = None
135        self.link = link
136
137        self.central_connections: Dict[Address, Connection] = (
138            {}
139        )  # Connections where this controller is the central
140        self.peripheral_connections: Dict[Address, Connection] = (
141            {}
142        )  # Connections where this controller is the peripheral
143        self.classic_connections: Dict[Address, Connection] = (
144            {}
145        )  # Connections in BR/EDR
146        self.central_cis_links: Dict[int, CisLink] = {}  # CIS links by handle
147        self.peripheral_cis_links: Dict[int, CisLink] = {}  # CIS links by handle
148
149        self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
150        self.hci_revision = 0
151        self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0
152        self.lmp_subversion = 0
153        self.lmp_features = bytes.fromhex(
154            '0000000060000000'
155        )  # BR/EDR Not Supported, LE Supported (Controller)
156        self.manufacturer_name = 0xFFFF
157        self.hc_data_packet_length = 27
158        self.hc_total_num_data_packets = 64
159        self.hc_le_data_packet_length = 27
160        self.hc_total_num_le_data_packets = 64
161        self.event_mask = 0
162        self.event_mask_page_2 = 0
163        self.supported_commands = bytes.fromhex(
164            '2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
165            '30f0f9ff01008004000000000000000000000000000000000000000000000000'
166        )
167        self.le_event_mask = 0
168        self.advertising_parameters = None
169        self.le_features = bytes.fromhex('ff49010000000000')
170        self.le_states = bytes.fromhex('ffff3fffff030000')
171        self.advertising_channel_tx_power = 0
172        self.filter_accept_list_size = 8
173        self.filter_duplicates = False
174        self.resolving_list_size = 8
175        self.supported_max_tx_octets = 27
176        self.supported_max_tx_time = 10000  # microseconds
177        self.supported_max_rx_octets = 27
178        self.supported_max_rx_time = 10000  # microseconds
179        self.suggested_max_tx_octets = 27
180        self.suggested_max_tx_time = 0x0148  # microseconds
181        self.default_phy = bytes([0, 0, 0])
182        self.le_scan_type = 0
183        self.le_scan_interval = 0x10
184        self.le_scan_window = 0x10
185        self.le_scan_enable = 0
186        self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS
187        self.le_scanning_filter_policy = 0
188        self.le_scan_response_data = None
189        self.le_address_resolution = False
190        self.le_rpa_timeout = 0
191        self.sync_flow_control = False
192        self.local_name = 'Bumble'
193
194        self.advertising_interval = 2000  # Fixed for now
195        self.advertising_data = None
196        self.advertising_timer_handle = None
197
198        self._random_address = Address('00:00:00:00:00:00')
199        if isinstance(public_address, Address):
200            self._public_address = public_address
201        elif public_address is not None:
202            self._public_address = Address(
203                public_address, Address.PUBLIC_DEVICE_ADDRESS
204            )
205        else:
206            self._public_address = Address('00:00:00:00:00:00')
207
208        # Set the source and sink interfaces
209        if host_source:
210            host_source.set_packet_sink(self)
211        self.host = host_sink
212
213        # Add this controller to the link if specified
214        if link:
215            link.add_controller(self)
216
217        self.terminated = asyncio.get_running_loop().create_future()
218
219    @property
220    def host(self):
221        return self.hci_sink
222
223    @host.setter
224    def host(self, host):
225        '''
226        Sets the host (sink) for this controller, and set this controller as the
227        controller (sink) for the host
228        '''
229        self.set_packet_sink(host)
230        if host:
231            host.controller = self
232
233    def set_packet_sink(self, sink):
234        '''
235        Method from the Packet Source interface
236        '''
237        self.hci_sink = sink
238
239    @property
240    def public_address(self):
241        return self._public_address
242
243    @public_address.setter
244    def public_address(self, address):
245        if isinstance(address, str):
246            address = Address(address)
247        self._public_address = address
248
249    @property
250    def random_address(self):
251        return self._random_address
252
253    @random_address.setter
254    def random_address(self, address):
255        if isinstance(address, str):
256            address = Address(address)
257        self._random_address = address
258        logger.debug(f'new random address: {address}')
259
260        if self.link:
261            self.link.on_address_changed(self)
262
263    # Packet Sink protocol (packets coming from the host via HCI)
264    def on_packet(self, packet):
265        self.on_hci_packet(HCI_Packet.from_bytes(packet))
266
267    def on_hci_packet(self, packet):
268        logger.debug(
269            f'{color("<<<", "blue")} [{self.name}] '
270            f'{color("HOST -> CONTROLLER", "blue")}: {packet}'
271        )
272
273        # If the packet is a command, invoke the handler for this packet
274        if packet.hci_packet_type == HCI_COMMAND_PACKET:
275            self.on_hci_command_packet(packet)
276        elif packet.hci_packet_type == HCI_EVENT_PACKET:
277            self.on_hci_event_packet(packet)
278        elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
279            self.on_hci_acl_data_packet(packet)
280        else:
281            logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
282
283    def on_hci_command_packet(self, command):
284        handler_name = f'on_{command.name.lower()}'
285        handler = getattr(self, handler_name, self.on_hci_command)
286        result = handler(command)
287        if isinstance(result, bytes):
288            self.send_hci_packet(
289                HCI_Command_Complete_Event(
290                    num_hci_command_packets=1,
291                    command_opcode=command.op_code,
292                    return_parameters=result,
293                )
294            )
295
296    def on_hci_event_packet(self, _event):
297        logger.warning('!!! unexpected event packet')
298
299    def on_hci_acl_data_packet(self, packet):
300        # Look for the connection to which this data belongs
301        connection = self.find_connection_by_handle(packet.connection_handle)
302        if connection is None:
303            logger.warning(
304                f'!!! no connection for handle 0x{packet.connection_handle:04X}'
305            )
306            return
307
308        # Pass the packet to the connection
309        connection.on_hci_acl_data_packet(packet)
310
311    def send_hci_packet(self, packet):
312        logger.debug(
313            f'{color(">>>", "green")} [{self.name}] '
314            f'{color("CONTROLLER -> HOST", "green")}: {packet}'
315        )
316        if self.host:
317            self.host.on_packet(packet.to_bytes())
318
319    # This method allows the controller to emulate the same API as a transport source
320    async def wait_for_termination(self):
321        await self.terminated
322
323    ############################################################
324    # Link connections
325    ############################################################
326    def allocate_connection_handle(self) -> int:
327        handle = 0
328        max_handle = 0
329        for connection in itertools.chain(
330            self.central_connections.values(),
331            self.peripheral_connections.values(),
332            self.classic_connections.values(),
333        ):
334            max_handle = max(max_handle, connection.handle)
335            if connection.handle == handle:
336                # Already used, continue searching after the current max
337                handle = max_handle + 1
338        for cis_handle in itertools.chain(
339            self.central_cis_links.keys(), self.peripheral_cis_links.keys()
340        ):
341            max_handle = max(max_handle, cis_handle)
342            if cis_handle == handle:
343                # Already used, continue searching after the current max
344                handle = max_handle + 1
345        return handle
346
347    def find_le_connection_by_address(self, address):
348        return self.central_connections.get(address) or self.peripheral_connections.get(
349            address
350        )
351
352    def find_classic_connection_by_address(self, address):
353        return self.classic_connections.get(address)
354
355    def find_connection_by_handle(self, handle):
356        for connection in itertools.chain(
357            self.central_connections.values(),
358            self.peripheral_connections.values(),
359            self.classic_connections.values(),
360        ):
361            if connection.handle == handle:
362                return connection
363        return None
364
365    def find_central_connection_by_handle(self, handle):
366        for connection in self.central_connections.values():
367            if connection.handle == handle:
368                return connection
369        return None
370
371    def find_classic_connection_by_handle(self, handle):
372        for connection in self.classic_connections.values():
373            if connection.handle == handle:
374                return connection
375        return None
376
377    def on_link_central_connected(self, central_address):
378        '''
379        Called when an incoming connection occurs from a central on the link
380        '''
381
382        # Allocate (or reuse) a connection handle
383        peer_address = central_address
384        peer_address_type = central_address.address_type
385        connection = self.peripheral_connections.get(peer_address)
386        if connection is None:
387            connection_handle = self.allocate_connection_handle()
388            connection = Connection(
389                controller=self,
390                handle=connection_handle,
391                role=BT_PERIPHERAL_ROLE,
392                peer_address=peer_address,
393                link=self.link,
394                transport=BT_LE_TRANSPORT,
395                link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
396            )
397            self.peripheral_connections[peer_address] = connection
398            logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
399
400        # Then say that the connection has completed
401        self.send_hci_packet(
402            HCI_LE_Connection_Complete_Event(
403                status=HCI_SUCCESS,
404                connection_handle=connection.handle,
405                role=connection.role,
406                peer_address_type=peer_address_type,
407                peer_address=peer_address,
408                connection_interval=10,  # FIXME
409                peripheral_latency=0,  # FIXME
410                supervision_timeout=10,  # FIXME
411                central_clock_accuracy=7,  # FIXME
412            )
413        )
414
415    def on_link_central_disconnected(self, peer_address, reason):
416        '''
417        Called when an active disconnection occurs from a peer
418        '''
419
420        # Send a disconnection complete event
421        if connection := self.peripheral_connections.get(peer_address):
422            self.send_hci_packet(
423                HCI_Disconnection_Complete_Event(
424                    status=HCI_SUCCESS,
425                    connection_handle=connection.handle,
426                    reason=reason,
427                )
428            )
429
430            # Remove the connection
431            del self.peripheral_connections[peer_address]
432        else:
433            logger.warning(f'!!! No peripheral connection found for {peer_address}')
434
435    def on_link_peripheral_connection_complete(
436        self, le_create_connection_command, status
437    ):
438        '''
439        Called by the link when a connection has been made or has failed to be made
440        '''
441
442        if status == HCI_SUCCESS:
443            # Allocate (or reuse) a connection handle
444            peer_address = le_create_connection_command.peer_address
445            connection = self.central_connections.get(peer_address)
446            if connection is None:
447                connection_handle = self.allocate_connection_handle()
448                connection = Connection(
449                    controller=self,
450                    handle=connection_handle,
451                    role=BT_CENTRAL_ROLE,
452                    peer_address=peer_address,
453                    link=self.link,
454                    transport=BT_LE_TRANSPORT,
455                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
456                )
457                self.central_connections[peer_address] = connection
458                logger.debug(
459                    f'New CENTRAL connection handle: 0x{connection_handle:04X}'
460                )
461        else:
462            connection = None
463
464        # Say that the connection has completed
465        self.send_hci_packet(
466            # pylint: disable=line-too-long
467            HCI_LE_Connection_Complete_Event(
468                status=status,
469                connection_handle=connection.handle if connection else 0,
470                role=BT_CENTRAL_ROLE,
471                peer_address_type=le_create_connection_command.peer_address_type,
472                peer_address=le_create_connection_command.peer_address,
473                connection_interval=le_create_connection_command.connection_interval_min,
474                peripheral_latency=le_create_connection_command.max_latency,
475                supervision_timeout=le_create_connection_command.supervision_timeout,
476                central_clock_accuracy=0,
477            )
478        )
479
480    def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
481        '''
482        Called when a disconnection has been completed
483        '''
484
485        # Send a disconnection complete event
486        self.send_hci_packet(
487            HCI_Disconnection_Complete_Event(
488                status=status,
489                connection_handle=disconnection_command.connection_handle,
490                reason=disconnection_command.reason,
491            )
492        )
493
494        # Remove the connection
495        if connection := self.find_central_connection_by_handle(
496            disconnection_command.connection_handle
497        ):
498            logger.debug(f'CENTRAL Connection removed: {connection}')
499            del self.central_connections[connection.peer_address]
500
501    def on_link_peripheral_disconnected(self, peer_address):
502        '''
503        Called when a connection to a peripheral is broken
504        '''
505
506        # Send a disconnection complete event
507        if connection := self.central_connections.get(peer_address):
508            self.send_hci_packet(
509                HCI_Disconnection_Complete_Event(
510                    status=HCI_SUCCESS,
511                    connection_handle=connection.handle,
512                    reason=HCI_CONNECTION_TIMEOUT_ERROR,
513                )
514            )
515
516            # Remove the connection
517            del self.central_connections[peer_address]
518        else:
519            logger.warning(f'!!! No central connection found for {peer_address}')
520
521    def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk):
522        # For now, just setup the encryption without asking the host
523        if connection := self.find_le_connection_by_address(peer_address):
524            self.send_hci_packet(
525                HCI_Encryption_Change_Event(
526                    status=0, connection_handle=connection.handle, encryption_enabled=1
527                )
528            )
529
530    def on_link_acl_data(self, sender_address, transport, data):
531        # Look for the connection to which this data belongs
532        if transport == BT_LE_TRANSPORT:
533            connection = self.find_le_connection_by_address(sender_address)
534        else:
535            connection = self.find_classic_connection_by_address(sender_address)
536        if connection is None:
537            logger.warning(f'!!! no connection for {sender_address}')
538            return
539
540        # Send the data to the host
541        # TODO: should fragment
542        acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data)
543        self.send_hci_packet(acl_packet)
544
545    def on_link_advertising_data(self, sender_address, data):
546        # Ignore if we're not scanning
547        if self.le_scan_enable == 0:
548            return
549
550        # Send a scan report
551        report = HCI_LE_Advertising_Report_Event.Report(
552            HCI_LE_Advertising_Report_Event.Report.FIELDS,
553            event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
554            address_type=sender_address.address_type,
555            address=sender_address,
556            data=data,
557            rssi=-50,
558        )
559        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
560
561        # Simulate a scan response
562        report = HCI_LE_Advertising_Report_Event.Report(
563            HCI_LE_Advertising_Report_Event.Report.FIELDS,
564            event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP,
565            address_type=sender_address.address_type,
566            address=sender_address,
567            data=data,
568            rssi=-50,
569        )
570        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
571
572    def on_link_cis_request(
573        self, central_address: Address, cig_id: int, cis_id: int
574    ) -> None:
575        '''
576        Called when an incoming CIS request occurs from a central on the link
577        '''
578
579        connection = self.peripheral_connections.get(central_address)
580        assert connection
581
582        pending_cis_link = CisLink(
583            handle=self.allocate_connection_handle(),
584            cis_id=cis_id,
585            cig_id=cig_id,
586            acl_connection=connection,
587        )
588        self.peripheral_cis_links[pending_cis_link.handle] = pending_cis_link
589
590        self.send_hci_packet(
591            HCI_LE_CIS_Request_Event(
592                acl_connection_handle=connection.handle,
593                cis_connection_handle=pending_cis_link.handle,
594                cig_id=cig_id,
595                cis_id=cis_id,
596            )
597        )
598
599    def on_link_cis_established(self, cig_id: int, cis_id: int) -> None:
600        '''
601        Called when an incoming CIS established.
602        '''
603
604        cis_link = next(
605            cis_link
606            for cis_link in itertools.chain(
607                self.central_cis_links.values(), self.peripheral_cis_links.values()
608            )
609            if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
610        )
611
612        self.send_hci_packet(
613            HCI_LE_CIS_Established_Event(
614                status=HCI_SUCCESS,
615                connection_handle=cis_link.handle,
616                # CIS parameters are ignored.
617                cig_sync_delay=0,
618                cis_sync_delay=0,
619                transport_latency_c_to_p=0,
620                transport_latency_p_to_c=0,
621                phy_c_to_p=0,
622                phy_p_to_c=0,
623                nse=0,
624                bn_c_to_p=0,
625                bn_p_to_c=0,
626                ft_c_to_p=0,
627                ft_p_to_c=0,
628                max_pdu_c_to_p=0,
629                max_pdu_p_to_c=0,
630                iso_interval=0,
631            )
632        )
633
634    def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None:
635        '''
636        Called when a CIS disconnected.
637        '''
638
639        if cis_link := next(
640            (
641                cis_link
642                for cis_link in self.peripheral_cis_links.values()
643                if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
644            ),
645            None,
646        ):
647            # Remove peripheral CIS on disconnection.
648            self.peripheral_cis_links.pop(cis_link.handle)
649        elif cis_link := next(
650            (
651                cis_link
652                for cis_link in self.central_cis_links.values()
653                if cis_link.cis_id == cis_id and cis_link.cig_id == cig_id
654            ),
655            None,
656        ):
657            # Keep central CIS on disconnection. They should be removed by HCI_LE_Remove_CIG_Command.
658            cis_link.acl_connection = None
659        else:
660            return
661
662        self.send_hci_packet(
663            HCI_Disconnection_Complete_Event(
664                status=HCI_SUCCESS,
665                connection_handle=cis_link.handle,
666                reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
667            )
668        )
669
670    ############################################################
671    # Classic link connections
672    ############################################################
673
674    def on_classic_connection_request(self, peer_address, link_type):
675        self.send_hci_packet(
676            HCI_Connection_Request_Event(
677                bd_addr=peer_address,
678                class_of_device=0,
679                link_type=link_type,
680            )
681        )
682
683    def on_classic_connection_complete(self, peer_address, status):
684        if status == HCI_SUCCESS:
685            # Allocate (or reuse) a connection handle
686            peer_address = peer_address
687            connection = self.classic_connections.get(peer_address)
688            if connection is None:
689                connection_handle = self.allocate_connection_handle()
690                connection = Connection(
691                    controller=self,
692                    handle=connection_handle,
693                    # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
694                    role=BT_CENTRAL_ROLE,
695                    peer_address=peer_address,
696                    link=self.link,
697                    transport=BT_BR_EDR_TRANSPORT,
698                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
699                )
700                self.classic_connections[peer_address] = connection
701                logger.debug(
702                    f'New CLASSIC connection handle: 0x{connection_handle:04X}'
703                )
704            else:
705                connection_handle = connection.handle
706            self.send_hci_packet(
707                HCI_Connection_Complete_Event(
708                    status=status,
709                    connection_handle=connection_handle,
710                    bd_addr=peer_address,
711                    encryption_enabled=False,
712                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
713                )
714            )
715        else:
716            connection = None
717            self.send_hci_packet(
718                HCI_Connection_Complete_Event(
719                    status=status,
720                    connection_handle=0,
721                    bd_addr=peer_address,
722                    encryption_enabled=False,
723                    link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
724                )
725            )
726
727    def on_classic_disconnected(self, peer_address, reason):
728        # Send a disconnection complete event
729        if connection := self.classic_connections.get(peer_address):
730            self.send_hci_packet(
731                HCI_Disconnection_Complete_Event(
732                    status=HCI_SUCCESS,
733                    connection_handle=connection.handle,
734                    reason=reason,
735                )
736            )
737
738            # Remove the connection
739            del self.classic_connections[peer_address]
740        else:
741            logger.warning(f'!!! No classic connection found for {peer_address}')
742
743    def on_classic_role_change(self, peer_address, new_role):
744        self.send_hci_packet(
745            HCI_Role_Change_Event(
746                status=HCI_SUCCESS,
747                bd_addr=peer_address,
748                new_role=new_role,
749            )
750        )
751
752    def on_classic_sco_connection_complete(
753        self, peer_address: Address, status: int, link_type: int
754    ):
755        if status == HCI_SUCCESS:
756            # Allocate (or reuse) a connection handle
757            connection_handle = self.allocate_connection_handle()
758            connection = Connection(
759                controller=self,
760                handle=connection_handle,
761                # Role doesn't matter in SCO.
762                role=BT_CENTRAL_ROLE,
763                peer_address=peer_address,
764                link=self.link,
765                transport=BT_BR_EDR_TRANSPORT,
766                link_type=link_type,
767            )
768            self.classic_connections[peer_address] = connection
769            logger.debug(f'New SCO connection handle: 0x{connection_handle:04X}')
770        else:
771            connection_handle = 0
772
773        self.send_hci_packet(
774            HCI_Synchronous_Connection_Complete_Event(
775                status=status,
776                connection_handle=connection_handle,
777                bd_addr=peer_address,
778                link_type=link_type,
779                # TODO: Provide SCO connection parameters.
780                transmission_interval=0,
781                retransmission_window=0,
782                rx_packet_length=0,
783                tx_packet_length=0,
784                air_mode=0,
785            )
786        )
787
788    ############################################################
789    # Advertising support
790    ############################################################
791    def on_advertising_timer_fired(self):
792        self.send_advertising_data()
793        self.advertising_timer_handle = asyncio.get_running_loop().call_later(
794            self.advertising_interval / 1000.0, self.on_advertising_timer_fired
795        )
796
797    def start_advertising(self):
798        # Stop any ongoing advertising before we start again
799        self.stop_advertising()
800
801        # Advertise now
802        self.advertising_timer_handle = asyncio.get_running_loop().call_soon(
803            self.on_advertising_timer_fired
804        )
805
806    def stop_advertising(self):
807        if self.advertising_timer_handle is not None:
808            self.advertising_timer_handle.cancel()
809            self.advertising_timer_handle = None
810
811    def send_advertising_data(self):
812        if self.link and self.advertising_data:
813            self.link.send_advertising_data(self.random_address, self.advertising_data)
814
815    @property
816    def is_advertising(self):
817        return self.advertising_timer_handle is not None
818
819    ############################################################
820    # HCI handlers
821    ############################################################
822    def on_hci_command(self, command):
823        logger.warning(color(f'--- Unsupported command {command}', 'red'))
824        return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR])
825
826    def on_hci_create_connection_command(self, command):
827        '''
828        See Bluetooth spec Vol 4, Part E - 7.1.5 Create Connection command
829        '''
830
831        if self.link is None:
832            return
833        logger.debug(f'Connection request to {command.bd_addr}')
834
835        # Check that we don't already have a pending connection
836        if self.link.get_pending_connection():
837            self.send_hci_packet(
838                HCI_Command_Status_Event(
839                    status=HCI_CONTROLLER_BUSY_ERROR,
840                    num_hci_command_packets=1,
841                    command_opcode=command.op_code,
842                )
843            )
844            return
845
846        self.link.classic_connect(self, command.bd_addr)
847
848        # Say that the connection is pending
849        self.send_hci_packet(
850            HCI_Command_Status_Event(
851                status=HCI_COMMAND_STATUS_PENDING,
852                num_hci_command_packets=1,
853                command_opcode=command.op_code,
854            )
855        )
856
857    def on_hci_disconnect_command(self, command):
858        '''
859        See Bluetooth spec Vol 4, Part E - 7.1.6 Disconnect Command
860        '''
861        # First, say that the disconnection is pending
862        self.send_hci_packet(
863            HCI_Command_Status_Event(
864                status=HCI_COMMAND_STATUS_PENDING,
865                num_hci_command_packets=1,
866                command_opcode=command.op_code,
867            )
868        )
869
870        # Notify the link of the disconnection
871        handle = command.connection_handle
872        if connection := self.find_central_connection_by_handle(handle):
873            if self.link:
874                self.link.disconnect(
875                    self.random_address, connection.peer_address, command
876                )
877            else:
878                # Remove the connection
879                del self.central_connections[connection.peer_address]
880        elif connection := self.find_classic_connection_by_handle(handle):
881            if self.link:
882                self.link.classic_disconnect(
883                    self,
884                    connection.peer_address,
885                    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
886                )
887            else:
888                # Remove the connection
889                del self.classic_connections[connection.peer_address]
890        elif cis_link := (
891            self.central_cis_links.get(handle) or self.peripheral_cis_links.get(handle)
892        ):
893            if self.link:
894                self.link.disconnect_cis(
895                    initiator_controller=self,
896                    peer_address=cis_link.acl_connection.peer_address,
897                    cig_id=cis_link.cig_id,
898                    cis_id=cis_link.cis_id,
899                )
900            # Spec requires handle to be kept after disconnection.
901
902    def on_hci_accept_connection_request_command(self, command):
903        '''
904        See Bluetooth spec Vol 4, Part E - 7.1.8 Accept Connection Request command
905        '''
906
907        if self.link is None:
908            return
909        self.send_hci_packet(
910            HCI_Command_Status_Event(
911                status=HCI_SUCCESS,
912                num_hci_command_packets=1,
913                command_opcode=command.op_code,
914            )
915        )
916        self.link.classic_accept_connection(self, command.bd_addr, command.role)
917
918    def on_hci_enhanced_setup_synchronous_connection_command(self, command):
919        '''
920        See Bluetooth spec Vol 4, Part E - 7.1.45 Enhanced Setup Synchronous Connection command
921        '''
922
923        if self.link is None:
924            return
925
926        if not (
927            connection := self.find_classic_connection_by_handle(
928                command.connection_handle
929            )
930        ):
931            self.send_hci_packet(
932                HCI_Command_Status_Event(
933                    status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
934                    num_hci_command_packets=1,
935                    command_opcode=command.op_code,
936                )
937            )
938            return
939
940        self.send_hci_packet(
941            HCI_Command_Status_Event(
942                status=HCI_SUCCESS,
943                num_hci_command_packets=1,
944                command_opcode=command.op_code,
945            )
946        )
947        self.link.classic_sco_connect(
948            self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
949        )
950
951    def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
952        '''
953        See Bluetooth spec Vol 4, Part E - 7.1.46 Enhanced Accept Synchronous Connection Request command
954        '''
955
956        if self.link is None:
957            return
958
959        if not (connection := self.find_classic_connection_by_address(command.bd_addr)):
960            self.send_hci_packet(
961                HCI_Command_Status_Event(
962                    status=HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
963                    num_hci_command_packets=1,
964                    command_opcode=command.op_code,
965                )
966            )
967            return
968
969        self.send_hci_packet(
970            HCI_Command_Status_Event(
971                status=HCI_SUCCESS,
972                num_hci_command_packets=1,
973                command_opcode=command.op_code,
974            )
975        )
976        self.link.classic_accept_sco_connection(
977            self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
978        )
979
980    def on_hci_switch_role_command(self, command):
981        '''
982        See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
983        '''
984
985        if self.link is None:
986            return
987        self.send_hci_packet(
988            HCI_Command_Status_Event(
989                status=HCI_SUCCESS,
990                num_hci_command_packets=1,
991                command_opcode=command.op_code,
992            )
993        )
994        self.link.classic_switch_role(self, command.bd_addr, command.role)
995
996    def on_hci_set_event_mask_command(self, command):
997        '''
998        See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command
999        '''
1000        self.event_mask = command.event_mask
1001        return bytes([HCI_SUCCESS])
1002
1003    def on_hci_reset_command(self, _command):
1004        '''
1005        See Bluetooth spec Vol 4, Part E - 7.3.2 Reset Command
1006        '''
1007        # TODO: cleanup what needs to be reset
1008        return bytes([HCI_SUCCESS])
1009
1010    def on_hci_write_local_name_command(self, command):
1011        '''
1012        See Bluetooth spec Vol 4, Part E - 7.3.11 Write Local Name Command
1013        '''
1014        local_name = command.local_name
1015        if len(local_name):
1016            try:
1017                first_null = local_name.find(0)
1018                if first_null >= 0:
1019                    local_name = local_name[:first_null]
1020                self.local_name = str(local_name, 'utf-8')
1021            except UnicodeDecodeError:
1022                pass
1023        return bytes([HCI_SUCCESS])
1024
1025    def on_hci_read_local_name_command(self, _command):
1026        '''
1027        See Bluetooth spec Vol 4, Part E - 7.3.12 Read Local Name Command
1028        '''
1029        local_name = bytes(self.local_name, 'utf-8')[:248]
1030        if len(local_name) < 248:
1031            local_name = local_name + bytes(248 - len(local_name))
1032
1033        return bytes([HCI_SUCCESS]) + local_name
1034
1035    def on_hci_read_class_of_device_command(self, _command):
1036        '''
1037        See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command
1038        '''
1039        return bytes([HCI_SUCCESS, 0, 0, 0])
1040
1041    def on_hci_write_class_of_device_command(self, _command):
1042        '''
1043        See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command
1044        '''
1045        return bytes([HCI_SUCCESS])
1046
1047    def on_hci_read_synchronous_flow_control_enable_command(self, _command):
1048        '''
1049        See Bluetooth spec Vol 4, Part E - 7.3.36 Read Synchronous Flow Control Enable
1050        Command
1051        '''
1052        if self.sync_flow_control:
1053            ret = 1
1054        else:
1055            ret = 0
1056        return bytes([HCI_SUCCESS, ret])
1057
1058    def on_hci_write_synchronous_flow_control_enable_command(self, command):
1059        '''
1060        See Bluetooth spec Vol 4, Part E - 7.3.37 Write Synchronous Flow Control Enable
1061        Command
1062        '''
1063        ret = HCI_SUCCESS
1064        if command.synchronous_flow_control_enable == 1:
1065            self.sync_flow_control = True
1066        elif command.synchronous_flow_control_enable == 0:
1067            self.sync_flow_control = False
1068        else:
1069            ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
1070        return bytes([ret])
1071
1072    def on_hci_set_controller_to_host_flow_control_command(self, _command):
1073        '''
1074        See Bluetooth spec Vol 4, Part E - 7.3.38 Set Controller To Host Flow Control
1075        Command
1076        '''
1077        # For now we just accept the command but ignore the values.
1078        # TODO: respect the passed in values.
1079        return bytes([HCI_SUCCESS])
1080
1081    def on_hci_host_buffer_size_command(self, _command):
1082        '''
1083        See Bluetooth spec Vol 4, Part E - 7.3.39 Host Buffer Size Command
1084        '''
1085        # For now we just accept the command but ignore the values.
1086        # TODO: respect the passed in values.
1087        return bytes([HCI_SUCCESS])
1088
1089    def on_hci_write_extended_inquiry_response_command(self, _command):
1090        '''
1091        See Bluetooth spec Vol 4, Part E - 7.3.56 Write Extended Inquiry Response
1092        Command
1093        '''
1094        return bytes([HCI_SUCCESS])
1095
1096    def on_hci_write_simple_pairing_mode_command(self, _command):
1097        '''
1098        See Bluetooth spec Vol 4, Part E - 7.3.59 Write Simple Pairing Mode Command
1099        '''
1100        return bytes([HCI_SUCCESS])
1101
1102    def on_hci_set_event_mask_page_2_command(self, command):
1103        '''
1104        See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command
1105        '''
1106        self.event_mask_page_2 = command.event_mask_page_2
1107        return bytes([HCI_SUCCESS])
1108
1109    def on_hci_read_le_host_support_command(self, _command):
1110        '''
1111        See Bluetooth spec Vol 4, Part E - 7.3.78 Write LE Host Support Command
1112        '''
1113        return bytes([HCI_SUCCESS, 1, 0])
1114
1115    def on_hci_write_le_host_support_command(self, _command):
1116        '''
1117        See Bluetooth spec Vol 4, Part E - 7.3.79 Write LE Host Support Command
1118        '''
1119        # TODO / Just ignore for now
1120        return bytes([HCI_SUCCESS])
1121
1122    def on_hci_write_authenticated_payload_timeout_command(self, command):
1123        '''
1124        See Bluetooth spec Vol 4, Part E - 7.3.94 Write Authenticated Payload Timeout
1125        Command
1126        '''
1127        # TODO
1128        return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
1129
1130    def on_hci_read_local_version_information_command(self, _command):
1131        '''
1132        See Bluetooth spec Vol 4, Part E - 7.4.1 Read Local Version Information Command
1133        '''
1134        return struct.pack(
1135            '<BBHBHH',
1136            HCI_SUCCESS,
1137            self.hci_version,
1138            self.hci_revision,
1139            self.lmp_version,
1140            self.manufacturer_name,
1141            self.lmp_subversion,
1142        )
1143
1144    def on_hci_read_local_supported_commands_command(self, _command):
1145        '''
1146        See Bluetooth spec Vol 4, Part E - 7.4.2 Read Local Supported Commands Command
1147        '''
1148        return bytes([HCI_SUCCESS]) + self.supported_commands
1149
1150    def on_hci_read_local_supported_features_command(self, _command):
1151        '''
1152        See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
1153        '''
1154        return bytes([HCI_SUCCESS]) + self.lmp_features[:8]
1155
1156    def on_hci_read_local_extended_features_command(self, command):
1157        '''
1158        See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
1159        '''
1160        if command.page_number * 8 > len(self.lmp_features):
1161            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1162        return (
1163            bytes(
1164                [
1165                    # Status
1166                    HCI_SUCCESS,
1167                    # Page number
1168                    command.page_number,
1169                    # Max page number
1170                    len(self.lmp_features) // 8 - 1,
1171                ]
1172            )
1173            # Features of the current page
1174            + self.lmp_features[command.page_number * 8 : (command.page_number + 1) * 8]
1175        )
1176
1177    def on_hci_read_buffer_size_command(self, _command):
1178        '''
1179        See Bluetooth spec Vol 4, Part E - 7.4.5 Read Buffer Size Command
1180        '''
1181        return struct.pack(
1182            '<BHBHH',
1183            HCI_SUCCESS,
1184            self.hc_data_packet_length,
1185            0,
1186            self.hc_total_num_data_packets,
1187            0,
1188        )
1189
1190    def on_hci_read_bd_addr_command(self, _command):
1191        '''
1192        See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
1193        '''
1194        bd_addr = (
1195            self._public_address.to_bytes()
1196            if self._public_address is not None
1197            else bytes(6)
1198        )
1199        return bytes([HCI_SUCCESS]) + bd_addr
1200
1201    def on_hci_le_set_event_mask_command(self, command):
1202        '''
1203        See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command
1204        '''
1205        self.le_event_mask = command.le_event_mask
1206        return bytes([HCI_SUCCESS])
1207
1208    def on_hci_le_read_buffer_size_command(self, _command):
1209        '''
1210        See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
1211        '''
1212        return struct.pack(
1213            '<BHB',
1214            HCI_SUCCESS,
1215            self.hc_le_data_packet_length,
1216            self.hc_total_num_le_data_packets,
1217        )
1218
1219    def on_hci_le_read_local_supported_features_command(self, _command):
1220        '''
1221        See Bluetooth spec Vol 4, Part E - 7.8.3 LE Read Local Supported Features
1222        Command
1223        '''
1224        return bytes([HCI_SUCCESS]) + self.le_features
1225
1226    def on_hci_le_set_random_address_command(self, command):
1227        '''
1228        See Bluetooth spec Vol 4, Part E - 7.8.4 LE Set Random Address Command
1229        '''
1230        self.random_address = command.random_address
1231        return bytes([HCI_SUCCESS])
1232
1233    def on_hci_le_set_advertising_parameters_command(self, command):
1234        '''
1235        See Bluetooth spec Vol 4, Part E - 7.8.5 LE Set Advertising Parameters Command
1236        '''
1237        self.advertising_parameters = command
1238        return bytes([HCI_SUCCESS])
1239
1240    def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
1241        '''
1242        See Bluetooth spec Vol 4, Part E - 7.8.6 LE Read Advertising Physical Channel
1243        Tx Power Command
1244        '''
1245        return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
1246
1247    def on_hci_le_set_advertising_data_command(self, command):
1248        '''
1249        See Bluetooth spec Vol 4, Part E - 7.8.7 LE Set Advertising Data Command
1250        '''
1251        self.advertising_data = command.advertising_data
1252        return bytes([HCI_SUCCESS])
1253
1254    def on_hci_le_set_scan_response_data_command(self, command):
1255        '''
1256        See Bluetooth spec Vol 4, Part E - 7.8.8 LE Set Scan Response Data Command
1257        '''
1258        self.le_scan_response_data = command.scan_response_data
1259        return bytes([HCI_SUCCESS])
1260
1261    def on_hci_le_set_advertising_enable_command(self, command):
1262        '''
1263        See Bluetooth spec Vol 4, Part E - 7.8.9 LE Set Advertising Enable Command
1264        '''
1265        if command.advertising_enable:
1266            self.start_advertising()
1267        else:
1268            self.stop_advertising()
1269
1270        return bytes([HCI_SUCCESS])
1271
1272    def on_hci_le_set_scan_parameters_command(self, command):
1273        '''
1274        See Bluetooth spec Vol 4, Part E - 7.8.10 LE Set Scan Parameters Command
1275        '''
1276        if self.le_scan_enable:
1277            return bytes([HCI_COMMAND_DISALLOWED_ERROR])
1278
1279        self.le_scan_type = command.le_scan_type
1280        self.le_scan_interval = command.le_scan_interval
1281        self.le_scan_window = command.le_scan_window
1282        self.le_scan_own_address_type = command.own_address_type
1283        self.le_scanning_filter_policy = command.scanning_filter_policy
1284        return bytes([HCI_SUCCESS])
1285
1286    def on_hci_le_set_scan_enable_command(self, command):
1287        '''
1288        See Bluetooth spec Vol 4, Part E - 7.8.11 LE Set Scan Enable Command
1289        '''
1290        self.le_scan_enable = command.le_scan_enable
1291        self.filter_duplicates = command.filter_duplicates
1292        return bytes([HCI_SUCCESS])
1293
1294    def on_hci_le_create_connection_command(self, command):
1295        '''
1296        See Bluetooth spec Vol 4, Part E - 7.8.12 LE Create Connection Command
1297        '''
1298
1299        if not self.link:
1300            return
1301
1302        logger.debug(f'Connection request to {command.peer_address}')
1303
1304        # Check that we don't already have a pending connection
1305        if self.link.get_pending_connection():
1306            self.send_hci_packet(
1307                HCI_Command_Status_Event(
1308                    status=HCI_COMMAND_DISALLOWED_ERROR,
1309                    num_hci_command_packets=1,
1310                    command_opcode=command.op_code,
1311                )
1312            )
1313            return
1314
1315        # Initiate the connection
1316        self.link.connect(self.random_address, command)
1317
1318        # Say that the connection is pending
1319        self.send_hci_packet(
1320            HCI_Command_Status_Event(
1321                status=HCI_COMMAND_STATUS_PENDING,
1322                num_hci_command_packets=1,
1323                command_opcode=command.op_code,
1324            )
1325        )
1326
1327    def on_hci_le_create_connection_cancel_command(self, _command):
1328        '''
1329        See Bluetooth spec Vol 4, Part E - 7.8.13 LE Create Connection Cancel Command
1330        '''
1331        return bytes([HCI_SUCCESS])
1332
1333    def on_hci_le_read_filter_accept_list_size_command(self, _command):
1334        '''
1335        See Bluetooth spec Vol 4, Part E - 7.8.14 LE Read Filter Accept List Size
1336        Command
1337        '''
1338        return bytes([HCI_SUCCESS, self.filter_accept_list_size])
1339
1340    def on_hci_le_clear_filter_accept_list_command(self, _command):
1341        '''
1342        See Bluetooth spec Vol 4, Part E - 7.8.15 LE Clear Filter Accept List Command
1343        '''
1344        return bytes([HCI_SUCCESS])
1345
1346    def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
1347        '''
1348        See Bluetooth spec Vol 4, Part E - 7.8.16 LE Add Device To Filter Accept List
1349        Command
1350        '''
1351        return bytes([HCI_SUCCESS])
1352
1353    def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
1354        '''
1355        See Bluetooth spec Vol 4, Part E - 7.8.17 LE Remove Device From Filter Accept
1356        List Command
1357        '''
1358        return bytes([HCI_SUCCESS])
1359
1360    def on_hci_le_read_remote_features_command(self, command):
1361        '''
1362        See Bluetooth spec Vol 4, Part E - 7.8.21 LE Read Remote Features Command
1363        '''
1364
1365        handle = command.connection_handle
1366
1367        if not self.find_connection_by_handle(handle):
1368            self.send_hci_packet(
1369                HCI_Command_Status_Event(
1370                    status=HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
1371                    num_hci_command_packets=1,
1372                    command_opcode=command.op_code,
1373                )
1374            )
1375            return
1376
1377        # First, say that the command is pending
1378        self.send_hci_packet(
1379            HCI_Command_Status_Event(
1380                status=HCI_COMMAND_STATUS_PENDING,
1381                num_hci_command_packets=1,
1382                command_opcode=command.op_code,
1383            )
1384        )
1385
1386        # Then send the remote features
1387        self.send_hci_packet(
1388            HCI_LE_Read_Remote_Features_Complete_Event(
1389                status=HCI_SUCCESS,
1390                connection_handle=handle,
1391                le_features=bytes.fromhex('dd40000000000000'),
1392            )
1393        )
1394
1395    def on_hci_le_rand_command(self, _command):
1396        '''
1397        See Bluetooth spec Vol 4, Part E - 7.8.23 LE Rand Command
1398        '''
1399        return bytes([HCI_SUCCESS]) + struct.pack('Q', random.randint(0, 1 << 64))
1400
1401    def on_hci_le_enable_encryption_command(self, command):
1402        '''
1403        See Bluetooth spec Vol 4, Part E - 7.8.24 LE Enable Encryption Command
1404        '''
1405
1406        # Check the parameters
1407        if not (
1408            connection := self.find_central_connection_by_handle(
1409                command.connection_handle
1410            )
1411        ):
1412            logger.warning('connection not found')
1413            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1414
1415        # Notify that the connection is now encrypted
1416        self.link.on_connection_encrypted(
1417            self.random_address,
1418            connection.peer_address,
1419            command.random_number,
1420            command.encrypted_diversifier,
1421            command.long_term_key,
1422        )
1423
1424        self.send_hci_packet(
1425            HCI_Command_Status_Event(
1426                status=HCI_COMMAND_STATUS_PENDING,
1427                num_hci_command_packets=1,
1428                command_opcode=command.op_code,
1429            )
1430        )
1431
1432        return None
1433
1434    def on_hci_le_read_supported_states_command(self, _command):
1435        '''
1436        See Bluetooth spec Vol 4, Part E - 7.8.27 LE Read Supported States Command
1437        '''
1438        return bytes([HCI_SUCCESS]) + self.le_states
1439
1440    def on_hci_le_read_suggested_default_data_length_command(self, _command):
1441        '''
1442        See Bluetooth spec Vol 4, Part E - 7.8.34 LE Read Suggested Default Data Length
1443        Command
1444        '''
1445        return struct.pack(
1446            '<BHH',
1447            HCI_SUCCESS,
1448            self.suggested_max_tx_octets,
1449            self.suggested_max_tx_time,
1450        )
1451
1452    def on_hci_le_write_suggested_default_data_length_command(self, command):
1453        '''
1454        See Bluetooth spec Vol 4, Part E - 7.8.35 LE Write Suggested Default Data Length
1455        Command
1456        '''
1457        self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
1458            '<HH', command.parameters[:4]
1459        )
1460        return bytes([HCI_SUCCESS])
1461
1462    def on_hci_le_read_local_p_256_public_key_command(self, _command):
1463        '''
1464        See Bluetooth spec Vol 4, Part E - 7.8.36 LE Read P-256 Public Key Command
1465        '''
1466        # TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
1467        return bytes([HCI_SUCCESS])
1468
1469    def on_hci_le_add_device_to_resolving_list_command(self, _command):
1470        '''
1471        See Bluetooth spec Vol 4, Part E - 7.8.38 LE Add Device To Resolving List
1472        Command
1473        '''
1474        return bytes([HCI_SUCCESS])
1475
1476    def on_hci_le_clear_resolving_list_command(self, _command):
1477        '''
1478        See Bluetooth spec Vol 4, Part E - 7.8.40 LE Clear Resolving List Command
1479        '''
1480        return bytes([HCI_SUCCESS])
1481
1482    def on_hci_le_read_resolving_list_size_command(self, _command):
1483        '''
1484        See Bluetooth spec Vol 4, Part E - 7.8.41 LE Read Resolving List Size Command
1485        '''
1486        return bytes([HCI_SUCCESS, self.resolving_list_size])
1487
1488    def on_hci_le_set_address_resolution_enable_command(self, command):
1489        '''
1490        See Bluetooth spec Vol 4, Part E - 7.8.44 LE Set Address Resolution Enable
1491        Command
1492        '''
1493        ret = HCI_SUCCESS
1494        if command.address_resolution_enable == 1:
1495            self.le_address_resolution = True
1496        elif command.address_resolution_enable == 0:
1497            self.le_address_resolution = False
1498        else:
1499            ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
1500        return bytes([ret])
1501
1502    def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
1503        '''
1504        See Bluetooth spec Vol 4, Part E - 7.8.45 LE Set Resolvable Private Address
1505        Timeout Command
1506        '''
1507        self.le_rpa_timeout = command.rpa_timeout
1508        return bytes([HCI_SUCCESS])
1509
1510    def on_hci_le_read_maximum_data_length_command(self, _command):
1511        '''
1512        See Bluetooth spec Vol 4, Part E - 7.8.46 LE Read Maximum Data Length Command
1513        '''
1514        return struct.pack(
1515            '<BHHHH',
1516            HCI_SUCCESS,
1517            self.supported_max_tx_octets,
1518            self.supported_max_tx_time,
1519            self.supported_max_rx_octets,
1520            self.supported_max_rx_time,
1521        )
1522
1523    def on_hci_le_read_phy_command(self, command):
1524        '''
1525        See Bluetooth spec Vol 4, Part E - 7.8.47 LE Read PHY Command
1526        '''
1527        return struct.pack(
1528            '<BHBB',
1529            HCI_SUCCESS,
1530            command.connection_handle,
1531            HCI_LE_1M_PHY,
1532            HCI_LE_1M_PHY,
1533        )
1534
1535    def on_hci_le_set_default_phy_command(self, command):
1536        '''
1537        See Bluetooth spec Vol 4, Part E - 7.8.48 LE Set Default PHY Command
1538        '''
1539        self.default_phy = {
1540            'all_phys': command.all_phys,
1541            'tx_phys': command.tx_phys,
1542            'rx_phys': command.rx_phys,
1543        }
1544        return bytes([HCI_SUCCESS])
1545
1546    def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
1547        '''
1548        See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
1549        Length Command
1550        '''
1551        return struct.pack('<BH', HCI_SUCCESS, 0x0672)
1552
1553    def on_hci_le_read_number_of_supported_advertising_sets_command(self, _command):
1554        '''
1555        See Bluetooth spec Vol 4, Part E - 7.8.58 LE Read Number of Supported
1556        Advertising Set Command
1557        '''
1558        return struct.pack('<BB', HCI_SUCCESS, 0xF0)
1559
1560    def on_hci_le_read_transmit_power_command(self, _command):
1561        '''
1562        See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command
1563        '''
1564        return struct.pack('<BBB', HCI_SUCCESS, 0, 0)
1565
1566    def on_hci_le_set_cig_parameters_command(self, command):
1567        '''
1568        See Bluetooth spec Vol 4, Part E - 7.8.97 LE Set CIG Parameter Command
1569        '''
1570
1571        # Remove old CIG implicitly.
1572        for handle, cis_link in self.central_cis_links.items():
1573            if cis_link.cig_id == command.cig_id:
1574                self.central_cis_links.pop(handle)
1575
1576        handles = []
1577        for cis_id in command.cis_id:
1578            handle = self.allocate_connection_handle()
1579            handles.append(handle)
1580            self.central_cis_links[handle] = CisLink(
1581                cis_id=cis_id,
1582                cig_id=command.cig_id,
1583                handle=handle,
1584            )
1585        return struct.pack(
1586            '<BBB', HCI_SUCCESS, command.cig_id, len(handles)
1587        ) + b''.join([struct.pack('<H', handle) for handle in handles])
1588
1589    def on_hci_le_create_cis_command(self, command):
1590        '''
1591        See Bluetooth spec Vol 4, Part E - 7.8.99 LE Create CIS Command
1592        '''
1593        if not self.link:
1594            return
1595
1596        for cis_handle, acl_handle in zip(
1597            command.cis_connection_handle, command.acl_connection_handle
1598        ):
1599            if not (connection := self.find_connection_by_handle(acl_handle)):
1600                logger.error(f'Cannot find connection with handle={acl_handle}')
1601                return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1602
1603            if not (cis_link := self.central_cis_links.get(cis_handle)):
1604                logger.error(f'Cannot find CIS with handle={cis_handle}')
1605                return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1606
1607            cis_link.acl_connection = connection
1608
1609            self.link.create_cis(
1610                self,
1611                peripheral_address=connection.peer_address,
1612                cig_id=cis_link.cig_id,
1613                cis_id=cis_link.cis_id,
1614            )
1615
1616        self.send_hci_packet(
1617            HCI_Command_Status_Event(
1618                status=HCI_COMMAND_STATUS_PENDING,
1619                num_hci_command_packets=1,
1620                command_opcode=command.op_code,
1621            )
1622        )
1623
1624    def on_hci_le_remove_cig_command(self, command):
1625        '''
1626        See Bluetooth spec Vol 4, Part E - 7.8.100 LE Remove CIG Command
1627        '''
1628
1629        status = HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
1630
1631        for cis_handle, cis_link in self.central_cis_links.items():
1632            if cis_link.cig_id == command.cig_id:
1633                self.central_cis_links.pop(cis_handle)
1634                status = HCI_SUCCESS
1635
1636        return struct.pack('<BH', status, command.cig_id)
1637
1638    def on_hci_le_accept_cis_request_command(self, command):
1639        '''
1640        See Bluetooth spec Vol 4, Part E - 7.8.101 LE Accept CIS Request Command
1641        '''
1642        if not self.link:
1643            return
1644
1645        if not (
1646            pending_cis_link := self.peripheral_cis_links.get(command.connection_handle)
1647        ):
1648            logger.error(f'Cannot find CIS with handle={command.connection_handle}')
1649            return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
1650
1651        assert pending_cis_link.acl_connection
1652        self.link.accept_cis(
1653            peripheral_controller=self,
1654            central_address=pending_cis_link.acl_connection.peer_address,
1655            cig_id=pending_cis_link.cig_id,
1656            cis_id=pending_cis_link.cis_id,
1657        )
1658
1659        self.send_hci_packet(
1660            HCI_Command_Status_Event(
1661                status=HCI_COMMAND_STATUS_PENDING,
1662                num_hci_command_packets=1,
1663                command_opcode=command.op_code,
1664            )
1665        )
1666
1667    def on_hci_le_setup_iso_data_path_command(self, command):
1668        '''
1669        See Bluetooth spec Vol 4, Part E - 7.8.109 LE Setup ISO Data Path Command
1670        '''
1671        return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
1672
1673    def on_hci_le_remove_iso_data_path_command(self, command):
1674        '''
1675        See Bluetooth spec Vol 4, Part E - 7.8.110 LE Remove ISO Data Path Command
1676        '''
1677        return struct.pack('<BH', HCI_SUCCESS, command.connection_handle)
1678