1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20from collections.abc import Iterable
21from contextlib import (
22    asynccontextmanager,
23    AsyncExitStack,
24    closing,
25)
26import copy
27from dataclasses import dataclass, field
28from enum import Enum, IntEnum
29import functools
30import itertools
31import json
32import logging
33import secrets
34import sys
35from typing import (
36    Any,
37    Callable,
38    ClassVar,
39    Dict,
40    List,
41    Optional,
42    Tuple,
43    Type,
44    TypeVar,
45    Union,
46    cast,
47    overload,
48    TYPE_CHECKING,
49)
50from typing_extensions import Self
51
52from pyee import EventEmitter
53
54from bumble import hci
55from .colors import color
56from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
57from .gatt import Characteristic, Descriptor, Service
58from .hci import (
59    HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
60    HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
61    HCI_CENTRAL_ROLE,
62    HCI_PERIPHERAL_ROLE,
63    HCI_COMMAND_STATUS_PENDING,
64    HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
65    HCI_DISPLAY_YES_NO_IO_CAPABILITY,
66    HCI_DISPLAY_ONLY_IO_CAPABILITY,
67    HCI_EXTENDED_INQUIRY_MODE,
68    HCI_GENERAL_INQUIRY_LAP,
69    HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
70    HCI_KEYBOARD_ONLY_IO_CAPABILITY,
71    HCI_LE_1M_PHY,
72    HCI_LE_1M_PHY_BIT,
73    HCI_LE_2M_PHY,
74    HCI_LE_CODED_PHY,
75    HCI_LE_CODED_PHY_BIT,
76    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
77    HCI_LE_RAND_COMMAND,
78    HCI_LE_READ_PHY_COMMAND,
79    HCI_LE_SET_PHY_COMMAND,
80    HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
81    HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
82    HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
83    HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
84    HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
85    HCI_OPERATION_CANCELLED_BY_HOST_ERROR,
86    HCI_R2_PAGE_SCAN_REPETITION_MODE,
87    HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
88    HCI_SUCCESS,
89    HCI_WRITE_LE_HOST_SUPPORT_COMMAND,
90    HCI_Accept_Connection_Request_Command,
91    HCI_Authentication_Requested_Command,
92    HCI_Command_Status_Event,
93    HCI_Constant,
94    HCI_Create_Connection_Cancel_Command,
95    HCI_Create_Connection_Command,
96    HCI_Connection_Complete_Event,
97    HCI_Disconnect_Command,
98    HCI_Encryption_Change_Event,
99    HCI_Error,
100    HCI_IO_Capability_Request_Reply_Command,
101    HCI_Inquiry_Cancel_Command,
102    HCI_Inquiry_Command,
103    HCI_IsoDataPacket,
104    HCI_LE_Accept_CIS_Request_Command,
105    HCI_LE_Add_Device_To_Resolving_List_Command,
106    HCI_LE_Advertising_Report_Event,
107    HCI_LE_BIGInfo_Advertising_Report_Event,
108    HCI_LE_Clear_Resolving_List_Command,
109    HCI_LE_Connection_Update_Command,
110    HCI_LE_Create_Connection_Cancel_Command,
111    HCI_LE_Create_Connection_Command,
112    HCI_LE_Create_CIS_Command,
113    HCI_LE_Periodic_Advertising_Create_Sync_Command,
114    HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command,
115    HCI_LE_Periodic_Advertising_Report_Event,
116    HCI_LE_Periodic_Advertising_Sync_Transfer_Command,
117    HCI_LE_Periodic_Advertising_Terminate_Sync_Command,
118    HCI_LE_Enable_Encryption_Command,
119    HCI_LE_Extended_Advertising_Report_Event,
120    HCI_LE_Extended_Create_Connection_Command,
121    HCI_LE_Rand_Command,
122    HCI_LE_Read_PHY_Command,
123    HCI_LE_Read_Remote_Features_Command,
124    HCI_LE_Reject_CIS_Request_Command,
125    HCI_LE_Remove_Advertising_Set_Command,
126    HCI_LE_Set_Address_Resolution_Enable_Command,
127    HCI_LE_Set_Advertising_Data_Command,
128    HCI_LE_Set_Advertising_Enable_Command,
129    HCI_LE_Set_Advertising_Parameters_Command,
130    HCI_LE_Set_Advertising_Set_Random_Address_Command,
131    HCI_LE_Set_CIG_Parameters_Command,
132    HCI_LE_Set_Data_Length_Command,
133    HCI_LE_Set_Default_PHY_Command,
134    HCI_LE_Set_Extended_Scan_Enable_Command,
135    HCI_LE_Set_Extended_Scan_Parameters_Command,
136    HCI_LE_Set_Extended_Scan_Response_Data_Command,
137    HCI_LE_Set_Extended_Advertising_Data_Command,
138    HCI_LE_Set_Extended_Advertising_Enable_Command,
139    HCI_LE_Set_Extended_Advertising_Parameters_Command,
140    HCI_LE_Set_Host_Feature_Command,
141    HCI_LE_Set_Periodic_Advertising_Enable_Command,
142    HCI_LE_Set_PHY_Command,
143    HCI_LE_Set_Random_Address_Command,
144    HCI_LE_Set_Scan_Enable_Command,
145    HCI_LE_Set_Scan_Parameters_Command,
146    HCI_LE_Set_Scan_Response_Data_Command,
147    HCI_PIN_Code_Request_Reply_Command,
148    HCI_PIN_Code_Request_Negative_Reply_Command,
149    HCI_Read_BD_ADDR_Command,
150    HCI_Read_RSSI_Command,
151    HCI_Reject_Connection_Request_Command,
152    HCI_Remote_Name_Request_Command,
153    HCI_Switch_Role_Command,
154    HCI_Set_Connection_Encryption_Command,
155    HCI_StatusError,
156    HCI_SynchronousDataPacket,
157    HCI_User_Confirmation_Request_Negative_Reply_Command,
158    HCI_User_Confirmation_Request_Reply_Command,
159    HCI_User_Passkey_Request_Negative_Reply_Command,
160    HCI_User_Passkey_Request_Reply_Command,
161    HCI_Write_Class_Of_Device_Command,
162    HCI_Write_Extended_Inquiry_Response_Command,
163    HCI_Write_Inquiry_Mode_Command,
164    HCI_Write_LE_Host_Support_Command,
165    HCI_Write_Local_Name_Command,
166    HCI_Write_Scan_Enable_Command,
167    HCI_Write_Secure_Connections_Host_Support_Command,
168    HCI_Write_Simple_Pairing_Mode_Command,
169    Address,
170    OwnAddressType,
171    LeFeature,
172    LeFeatureMask,
173    LmpFeatureMask,
174    Phy,
175    phy_list_to_bits,
176)
177from .host import Host
178from .profiles.gap import GenericAccessService
179from .core import (
180    BT_BR_EDR_TRANSPORT,
181    BT_CENTRAL_ROLE,
182    BT_LE_TRANSPORT,
183    BT_PERIPHERAL_ROLE,
184    AdvertisingData,
185    BaseBumbleError,
186    ConnectionParameterUpdateError,
187    CommandTimeoutError,
188    ConnectionParameters,
189    ConnectionPHY,
190    InvalidArgumentError,
191    InvalidOperationError,
192    InvalidStateError,
193    NotSupportedError,
194    OutOfResourcesError,
195    UnreachableError,
196)
197from .utils import (
198    AsyncRunner,
199    CompositeEventEmitter,
200    EventWatcher,
201    setup_event_forwarding,
202    composite_listener,
203    deprecated,
204    experimental,
205)
206from .keys import (
207    KeyStore,
208    PairingKeys,
209)
210from bumble import pairing
211from bumble import gatt_client
212from bumble import gatt_server
213from bumble import smp
214from bumble import sdp
215from bumble import l2cap
216from bumble import core
217
218if TYPE_CHECKING:
219    from .transport.common import TransportSource, TransportSink
220
221
222# -----------------------------------------------------------------------------
223# Logging
224# -----------------------------------------------------------------------------
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# fmt: off
# pylint: disable=line-too-long

# Lower/upper bounds for scan timing, LE RSSI and extended advertising set handles.
# NOTE(review): the scan interval/window bounds look like milliseconds (the
# defaults further down are annotated in ms) — confirm.
DEVICE_MIN_SCAN_INTERVAL                      = 25
DEVICE_MAX_SCAN_INTERVAL                      = 10240
DEVICE_MIN_SCAN_WINDOW                        = 25
DEVICE_MAX_SCAN_WINDOW                        = 10240
DEVICE_MIN_LE_RSSI                            = -127
DEVICE_MAX_LE_RSSI                            = 20
DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE    = 0x00
DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE    = 0xEF

# Default values.
# NOTE(review): DEVICE_DEFAULT_ADVERTISING_DATA is a str while
# DEVICE_DEFAULT_SCAN_RESPONSE_DATA is bytes — confirm this is intended.
DEVICE_DEFAULT_ADDRESS                        = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL           = 1000  # ms
DEVICE_DEFAULT_ADVERTISING_DATA               = ''
DEVICE_DEFAULT_NAME                           = 'Bumble'
DEVICE_DEFAULT_INQUIRY_LENGTH                 = 8  # 10.24 seconds
DEVICE_DEFAULT_CLASS_OF_DEVICE                = 0
DEVICE_DEFAULT_SCAN_RESPONSE_DATA             = b''
# presumably (max TX octets, max TX time, max RX octets, max RX time) — TODO confirm
DEVICE_DEFAULT_DATA_LENGTH                    = (27, 328, 27, 328)
DEVICE_DEFAULT_SCAN_INTERVAL                  = 60  # ms
DEVICE_DEFAULT_SCAN_WINDOW                    = 60  # ms
DEVICE_DEFAULT_CONNECT_TIMEOUT                = None  # No timeout
DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL          = 60  # ms
DEVICE_DEFAULT_CONNECT_SCAN_WINDOW            = 60  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN        = 15  # ms
DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX        = 30  # ms
DEVICE_DEFAULT_CONNECTION_MAX_LATENCY         = 0
DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720  # ms
DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH       = 0   # ms
DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH       = 0   # ms
# L2CAP LE credit-based channel defaults, taken as-is from the l2cap module
DEVICE_DEFAULT_L2CAP_COC_MTU                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
DEVICE_DEFAULT_L2CAP_COC_MPS                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS          = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
DEVICE_DEFAULT_ADVERTISING_TX_POWER           = (
    HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE
)
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0  # presumably seconds — TODO confirm
DEVICE_DEFAULT_LE_RPA_TIMEOUT                 = 15 * 60 # 15 minutes (in seconds)

# fmt: on
# pylint: enable=line-too-long

# As specified in 7.8.56 LE Set Extended Advertising Enable command
# NOTE(review): presumably expressed in seconds (1.28 s) — confirm at the use site
DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28
276
277
278# -----------------------------------------------------------------------------
279# Classes
280# -----------------------------------------------------------------------------
class ObjectLookupError(BaseBumbleError):
    """Raised when looking up an object fails."""
283
284
285# -----------------------------------------------------------------------------
@dataclass
class Advertisement:
    """
    Decoded view of one LE advertising report.

    Besides the dataclass fields, every instance gets a `data` attribute holding
    the `AdvertisingData` parsed from `data_bytes` (see `__post_init__`).
    """

    # Attributes
    address: Address
    rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    is_legacy: bool = False
    is_anonymous: bool = False
    is_connectable: bool = False
    is_directed: bool = False
    is_scannable: bool = False
    is_scan_response: bool = False
    is_complete: bool = True
    is_truncated: bool = False
    primary_phy: int = 0
    secondary_phy: int = 0
    tx_power: int = (
        HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    sid: int = 0
    data_bytes: bytes = b''

    # Constants (same sentinel values as the field defaults above)
    TX_POWER_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    RSSI_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    )

    def __post_init__(self) -> None:
        # `data` is derived from the raw bytes, it is not a dataclass field.
        self.data = AdvertisingData.from_bytes(self.data_bytes)

    @classmethod
    def from_advertising_report(cls, report) -> Optional[Advertisement]:
        """
        Build an advertisement from an HCI advertising report.

        Args:
          report: A legacy or extended advertising report.

        Returns:
          A `LegacyAdvertisement` or `ExtendedAdvertisement`, depending on the
          report type, or None when the report is of neither type.
        """
        # The legacy report type is checked first, as in a chain of type tests.
        if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
            advertisement_class = LegacyAdvertisement
        elif isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report):
            advertisement_class = ExtendedAdvertisement
        else:
            return None

        return advertisement_class.from_advertising_report(report)
327
328
329# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
    """Advertisement built from a legacy advertising report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Build an instance from an `HCI_LE_Advertising_Report_Event` report.

        The connectable/directed/scannable/scan-response flags are all derived
        from the report's event type.
        """
        event = HCI_LE_Advertising_Report_Event
        event_type = report.event_type

        connectable = event_type in (event.ADV_IND, event.ADV_DIRECT_IND)
        directed = event_type == event.ADV_DIRECT_IND
        scannable = event_type in (event.ADV_IND, event.ADV_SCAN_IND)
        scan_response = event_type == event.SCAN_RSP

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=True,
            is_connectable=connectable,
            is_directed=directed,
            is_scannable=scannable,
            is_scan_response=scan_response,
            data_bytes=report.data,
        )
353
354
355# -----------------------------------------------------------------------------
class ExtendedAdvertisement(Advertisement):
    """Advertisement built from an extended advertising report."""

    @classmethod
    def from_advertising_report(cls, report):
        """
        Build an instance from an `HCI_LE_Extended_Advertising_Report_Event`
        report, decoding the event type bit field into individual flags.
        """
        event = HCI_LE_Extended_Advertising_Report_Event
        event_type = report.event_type

        def is_bit_set(bit_position) -> bool:
            return event_type & (1 << bit_position) != 0

        # The two bits starting at bit 5 of the event type hold the data status.
        data_status = (event_type >> 5) & 3

        return cls(
            address=report.address,
            rssi=report.rssi,
            is_legacy=is_bit_set(event.LEGACY_ADVERTISING_PDU_USED),
            is_anonymous=(
                report.address.address_type == event.ANONYMOUS_ADDRESS_TYPE
            ),
            is_connectable=is_bit_set(event.CONNECTABLE_ADVERTISING),
            is_directed=is_bit_set(event.DIRECTED_ADVERTISING),
            is_scannable=is_bit_set(event.SCANNABLE_ADVERTISING),
            is_scan_response=is_bit_set(event.SCAN_RESPONSE),
            is_complete=(data_status == event.DATA_COMPLETE),
            is_truncated=(
                data_status == event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME
            ),
            primary_phy=report.primary_phy,
            secondary_phy=report.secondary_phy,
            tx_power=report.tx_power,
            sid=report.advertising_sid,
            data_bytes=report.data,
        )
379
380
381# -----------------------------------------------------------------------------
class AdvertisementDataAccumulator:
    """
    Pairs advertisements with their scan responses.

    Reports are fed one at a time to `update`, which returns an advertisement
    when one is ready to be delivered, or None when nothing should be reported
    yet.
    """

    def __init__(self, passive=False):
        """
        Args:
          passive: When true, advertisements are returned right away, without
          waiting for a scan response.
        """
        self.passive = passive
        self.last_advertisement = None
        self.last_data = b''

    def update(self, report):
        """
        Process one advertising report.

        Args:
          report: A legacy or extended advertising report.

        Returns:
          An `Advertisement` to deliver, or None if the report type is not
          recognized or if nothing is to be delivered for this report.
        """
        current = Advertisement.from_advertising_report(report)
        if current is None:
            return None

        previous = self.last_advertisement
        # True when the previous report was an advertisement, not a scan response
        previous_was_advertisement = (
            previous is not None and not previous.is_scan_response
        )

        ready = None
        if current.is_scan_response:
            if previous_was_advertisement:
                # This is the response to a scannable advertisement.
                # A fresh object is built so that the one remembered as
                # `last_advertisement` is not the one being modified.
                ready = Advertisement.from_advertising_report(report)
                ready.is_connectable = previous.is_connectable
                ready.is_scannable = True
                # Only `data` is replaced with the merged payload; `data_bytes`
                # still holds the bytes of the scan response itself.
                ready.data = AdvertisingData.from_bytes(self.last_data + report.data)
            self.last_data = b''
        else:
            if (
                self.passive
                or not current.is_scannable
                or previous_was_advertisement
            ):
                # Don't wait for a scan response
                ready = Advertisement.from_advertising_report(report)

            # Keep the payload so it can be merged with a later scan response
            self.last_data = report.data

        self.last_advertisement = current

        return ready
423
424
425# -----------------------------------------------------------------------------
class AdvertisingType(IntEnum):
    """
    Legacy advertising types.

    The properties classify each type as connectable, scannable and/or directed.
    """

    # fmt: off
    # pylint: disable=line-too-long
    UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00  # Undirected, connectable,     scannable
    DIRECTED_CONNECTABLE_HIGH_DUTY   = 0x01  # Directed,   connectable,     non-scannable
    UNDIRECTED_SCANNABLE             = 0x02  # Undirected, non-connectable, scannable
    UNDIRECTED                       = 0x03  # Undirected, non-connectable, non-scannable
    DIRECTED_CONNECTABLE_LOW_DUTY    = 0x04  # Directed,   connectable,     non-scannable
    # fmt: on

    @property
    def has_data(self) -> bool:
        """True for the three undirected types."""
        return self in {
            AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
            AdvertisingType.UNDIRECTED_SCANNABLE,
            AdvertisingType.UNDIRECTED,
        }

    @property
    def is_connectable(self) -> bool:
        """True for the types that accept connections."""
        return self in {
            AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
            AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
            AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
        }

    @property
    def is_scannable(self) -> bool:
        """True for the types that can be scanned."""
        return self in {
            AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
            AdvertisingType.UNDIRECTED_SCANNABLE,
        }

    @property
    def is_directed(self) -> bool:
        """True for the two directed types (high and low duty cycle)."""
        return self in {
            AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
            AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
        }

    @property
    def is_high_duty_cycle_directed_connectable(self) -> bool:
        """True only for `DIRECTED_CONNECTABLE_HIGH_DUTY`."""
        return self is AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
469
470
471# -----------------------------------------------------------------------------
@dataclass
class LegacyAdvertiser:
    """Drives legacy (non-extended) advertising through HCI commands."""

    device: Device
    advertising_type: AdvertisingType
    own_address_type: OwnAddressType
    peer_address: Address
    auto_restart: bool

    async def _send_checked(self, command) -> None:
        # Send one command through the device, with result checking enabled.
        await self.device.send_command(command, check_result=True)

    async def start(self) -> None:
        """
        Configure and enable advertising.

        The advertising data and scan response data are taken from the device
        and sent only when the advertising type calls for them; the advertising
        parameters are then set and advertising is enabled.
        """
        advertising_type = self.advertising_type

        # Set/update the advertising data if the advertising type allows it
        if advertising_type.has_data:
            await self._send_checked(
                HCI_LE_Set_Advertising_Data_Command(
                    advertising_data=self.device.advertising_data
                )
            )

        # Set/update the scan response data if the advertising is scannable
        if advertising_type.is_scannable:
            await self._send_checked(
                HCI_LE_Set_Scan_Response_Data_Command(
                    scan_response_data=self.device.scan_response_data
                )
            )

        # Set the advertising parameters
        # NOTE(review): the device intervals are passed through unconverted,
        # whereas AdvertisingSet.set_advertising_parameters converts its
        # intervals to 0.625 ms units — confirm the unit of
        # device.advertising_interval_min/max.
        parameters_command = HCI_LE_Set_Advertising_Parameters_Command(
            advertising_interval_min=self.device.advertising_interval_min,
            advertising_interval_max=self.device.advertising_interval_max,
            advertising_type=int(advertising_type),
            own_address_type=self.own_address_type,
            peer_address_type=self.peer_address.address_type,
            peer_address=self.peer_address,
            advertising_channel_map=7,
            advertising_filter_policy=0,
        )
        await self._send_checked(parameters_command)

        # Enable advertising
        await self._send_checked(
            HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1)
        )

    async def stop(self) -> None:
        """Disable advertising."""
        await self._send_checked(
            HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0)
        )
526
527
528# -----------------------------------------------------------------------------
@dataclass
class AdvertisingEventProperties:
    """
    Boolean view of the extended advertising event properties.

    `int(properties)` yields the corresponding bit mask, built from
    `HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties`.
    """

    is_connectable: bool = True
    is_scannable: bool = False
    is_directed: bool = False
    is_high_duty_cycle_directed_connectable: bool = False
    is_legacy: bool = False
    is_anonymous: bool = False
    include_tx_power: bool = False

    def __int__(self) -> int:
        """Return the bit mask with one flag set for each true attribute."""
        flags = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties
        selection = (
            (self.is_connectable, flags.CONNECTABLE_ADVERTISING),
            (self.is_scannable, flags.SCANNABLE_ADVERTISING),
            (self.is_directed, flags.DIRECTED_ADVERTISING),
            (
                self.is_high_duty_cycle_directed_connectable,
                flags.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING,
            ),
            (self.is_legacy, flags.USE_LEGACY_ADVERTISING_PDUS),
            (self.is_anonymous, flags.ANONYMOUS_ADVERTISING),
            (self.include_tx_power, flags.INCLUDE_TX_POWER),
        )

        mask = flags(0)
        for is_selected, flag in selection:
            if is_selected:
                mask |= flag

        return int(mask)

    @classmethod
    def from_advertising_type(
        cls: Type[AdvertisingEventProperties],
        advertising_type: AdvertisingType,
    ) -> AdvertisingEventProperties:
        """
        Build the properties equivalent to a legacy advertising type.

        The result is always marked as legacy, never anonymous, and without
        TX power.
        """
        high_duty = advertising_type.is_high_duty_cycle_directed_connectable
        return cls(
            is_connectable=advertising_type.is_connectable,
            is_scannable=advertising_type.is_scannable,
            is_directed=advertising_type.is_directed,
            is_high_duty_cycle_directed_connectable=high_duty,
            is_legacy=True,
            is_anonymous=False,
            include_tx_power=False,
        )
574
575
576# -----------------------------------------------------------------------------
@dataclass
class PeriodicAdvertisement:
    """
    One periodic advertising report.

    After construction, `data` holds the `AdvertisingData` parsed from
    `data_bytes`, or None when the advertisement is truncated.
    """

    address: Address
    sid: int
    tx_power: int = (
        HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    rssi: int = HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    is_truncated: bool = False
    data_bytes: bytes = b''

    # Constants (same sentinel values as the field defaults above)
    TX_POWER_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Periodic_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
    )
    RSSI_NOT_AVAILABLE: ClassVar[int] = (
        HCI_LE_Periodic_Advertising_Report_Event.RSSI_NOT_AVAILABLE
    )

    def __post_init__(self) -> None:
        # Truncated payloads are not parsed.
        if self.is_truncated:
            self.data = None
        else:
            self.data = AdvertisingData.from_bytes(self.data_bytes)
600
601
602# -----------------------------------------------------------------------------
@dataclass
class BIGInfoAdvertisement:
    """BIGInfo fields received for a periodic advertising train."""

    address: Address
    sid: int
    num_bis: int
    nse: int
    iso_interval: int
    bn: int
    pto: int
    irc: int
    max_pdu: int
    sdu_interval: int
    max_sdu: int
    phy: Phy
    framed: bool
    encrypted: bool

    @classmethod
    def from_report(cls, address: Address, sid: int, report) -> Self:
        """
        Build an instance from a BIGInfo report.

        Args:
          address: Address to store in the result.
          sid: Advertising SID to store in the result.
          report: Object with the BIGInfo attributes (`num_bis`, `nse`, ...).
          Its `phy` value is converted to `Phy`, and its `framing` and
          `encryption` values are converted to booleans (non-zero means True).
        """
        return cls(
            address=address,
            sid=sid,
            num_bis=report.num_bis,
            nse=report.nse,
            iso_interval=report.iso_interval,
            bn=report.bn,
            pto=report.pto,
            irc=report.irc,
            max_pdu=report.max_pdu,
            sdu_interval=report.sdu_interval,
            max_sdu=report.max_sdu,
            phy=Phy(report.phy),
            framed=report.framing != 0,
            encrypted=report.encryption != 0,
        )
638
639
640# -----------------------------------------------------------------------------
# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10
# Short alias for the channel map type nested in the extended advertising
# parameters command.
AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
643
644
645# -----------------------------------------------------------------------------
@dataclass
class AdvertisingParameters:
    """Parameters of an extended advertising set."""

    # pylint: disable=line-too-long
    advertising_event_properties: AdvertisingEventProperties = field(
        default_factory=AdvertisingEventProperties
    )

    # Primary advertising intervals, in milliseconds
    primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL

    # By default, advertise on channels 37, 38 and 39
    primary_advertising_channel_map: AdvertisingChannelMap = (
        AdvertisingChannelMap.CHANNEL_37
        | AdvertisingChannelMap.CHANNEL_38
        | AdvertisingChannelMap.CHANNEL_39
    )

    own_address_type: OwnAddressType = OwnAddressType.RANDOM
    peer_address: Address = Address.ANY
    advertising_filter_policy: int = 0
    advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER
    primary_advertising_phy: Phy = Phy.LE_1M
    secondary_advertising_max_skip: int = 0
    secondary_advertising_phy: Phy = Phy.LE_1M
    advertising_sid: int = 0
    enable_scan_request_notifications: bool = False
    primary_advertising_phy_options: int = 0
    secondary_advertising_phy_options: int = 0
672
673
674# -----------------------------------------------------------------------------
@dataclass
class PeriodicAdvertisingParameters:
    """Placeholder for periodic advertising parameters (no fields yet)."""

    # TODO implement this class
679
680
681# -----------------------------------------------------------------------------
682@dataclass
683class AdvertisingSet(EventEmitter):
684    device: Device
685    advertising_handle: int
686    auto_restart: bool
687    random_address: Optional[Address]
688    advertising_parameters: AdvertisingParameters
689    advertising_data: bytes
690    scan_response_data: bytes
691    periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters]
692    periodic_advertising_data: bytes
693    selected_tx_power: int = 0
694    enabled: bool = False
695
    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class
        # initializer, so the EventEmitter base is initialized from here.
        super().__init__()
698
699    async def set_advertising_parameters(
700        self, advertising_parameters: AdvertisingParameters
701    ) -> None:
702        # Compliance check
703        if (
704            not advertising_parameters.advertising_event_properties.is_legacy
705            and advertising_parameters.advertising_event_properties.is_connectable
706            and advertising_parameters.advertising_event_properties.is_scannable
707        ):
708            logger.warning(
709                "non-legacy extended advertising event properties may not be both "
710                "connectable and scannable"
711            )
712
713        response = await self.device.send_command(
714            HCI_LE_Set_Extended_Advertising_Parameters_Command(
715                advertising_handle=self.advertising_handle,
716                advertising_event_properties=int(
717                    advertising_parameters.advertising_event_properties
718                ),
719                primary_advertising_interval_min=(
720                    int(advertising_parameters.primary_advertising_interval_min / 0.625)
721                ),
722                primary_advertising_interval_max=(
723                    int(advertising_parameters.primary_advertising_interval_min / 0.625)
724                ),
725                primary_advertising_channel_map=int(
726                    advertising_parameters.primary_advertising_channel_map
727                ),
728                own_address_type=advertising_parameters.own_address_type,
729                peer_address_type=advertising_parameters.peer_address.address_type,
730                peer_address=advertising_parameters.peer_address,
731                advertising_tx_power=advertising_parameters.advertising_tx_power,
732                advertising_filter_policy=(
733                    advertising_parameters.advertising_filter_policy
734                ),
735                primary_advertising_phy=advertising_parameters.primary_advertising_phy,
736                secondary_advertising_max_skip=(
737                    advertising_parameters.secondary_advertising_max_skip
738                ),
739                secondary_advertising_phy=(
740                    advertising_parameters.secondary_advertising_phy
741                ),
742                advertising_sid=advertising_parameters.advertising_sid,
743                scan_request_notification_enable=(
744                    1 if advertising_parameters.enable_scan_request_notifications else 0
745                ),
746            ),
747            check_result=True,
748        )
749        self.selected_tx_power = response.return_parameters.selected_tx_power
750        self.advertising_parameters = advertising_parameters
751
752    async def set_advertising_data(self, advertising_data: bytes) -> None:
753        # pylint: disable=line-too-long
754        await self.device.send_command(
755            HCI_LE_Set_Extended_Advertising_Data_Command(
756                advertising_handle=self.advertising_handle,
757                operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
758                fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
759                advertising_data=advertising_data,
760            ),
761            check_result=True,
762        )
763        self.advertising_data = advertising_data
764
765    async def set_scan_response_data(self, scan_response_data: bytes) -> None:
766        # pylint: disable=line-too-long
767        if (
768            scan_response_data
769            and not self.advertising_parameters.advertising_event_properties.is_scannable
770        ):
771            logger.warning(
772                "ignoring attempt to set non-empty scan response data on non-scannable "
773                "advertising set"
774            )
775            return
776
777        await self.device.send_command(
778            HCI_LE_Set_Extended_Scan_Response_Data_Command(
779                advertising_handle=self.advertising_handle,
780                operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
781                fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
782                scan_response_data=scan_response_data,
783            ),
784            check_result=True,
785        )
786        self.scan_response_data = scan_response_data
787
    async def set_periodic_advertising_parameters(
        self, advertising_parameters: PeriodicAdvertisingParameters
    ) -> None:
        """
        Remember new periodic advertising parameters for this advertising set.

        No command is sent yet (see the TODO below), only the local attribute
        is updated.
        """
        # TODO: send command
        self.periodic_advertising_parameters = advertising_parameters
793
    async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
        """
        Remember new periodic advertising data for this advertising set.

        No command is sent yet (see the TODO below), only the local attribute
        is updated.
        """
        # TODO: send command
        self.periodic_advertising_data = advertising_data
797
798    async def set_random_address(self, random_address: Address) -> None:
799        await self.device.send_command(
800            HCI_LE_Set_Advertising_Set_Random_Address_Command(
801                advertising_handle=self.advertising_handle,
802                random_address=(random_address or self.device.random_address),
803            ),
804            check_result=True,
805        )
806
807    async def start(
808        self, duration: float = 0.0, max_advertising_events: int = 0
809    ) -> None:
810        """
811        Start advertising.
812
813        Args:
814          duration: How long to advertise for, in seconds. Use 0 (the default) for
815          an unlimited duration, unless this advertising set is a High Duty Cycle
816          Directed Advertisement type.
817          max_advertising_events: Maximum number of events to advertise for. Use 0
818          (the default) for an unlimited number of advertisements.
819        """
820        await self.device.send_command(
821            HCI_LE_Set_Extended_Advertising_Enable_Command(
822                enable=1,
823                advertising_handles=[self.advertising_handle],
824                durations=[round(duration * 100)],
825                max_extended_advertising_events=[max_advertising_events],
826            ),
827            check_result=True,
828        )
829        self.enabled = True
830
831        self.emit('start')
832
833    async def start_periodic(self, include_adi: bool = False) -> None:
834        await self.device.send_command(
835            HCI_LE_Set_Periodic_Advertising_Enable_Command(
836                enable=1 | (2 if include_adi else 0),
837                advertising_handles=self.advertising_handle,
838            ),
839            check_result=True,
840        )
841
842        self.emit('start_periodic')
843
844    async def stop(self) -> None:
845        await self.device.send_command(
846            HCI_LE_Set_Extended_Advertising_Enable_Command(
847                enable=0,
848                advertising_handles=[self.advertising_handle],
849                durations=[0],
850                max_extended_advertising_events=[0],
851            ),
852            check_result=True,
853        )
854        self.enabled = False
855
856        self.emit('stop')
857
858    async def stop_periodic(self) -> None:
859        await self.device.send_command(
860            HCI_LE_Set_Periodic_Advertising_Enable_Command(
861                enable=0,
862                advertising_handles=self.advertising_handle,
863            ),
864            check_result=True,
865        )
866
867        self.emit('stop_periodic')
868
869    async def remove(self) -> None:
870        await self.device.send_command(
871            HCI_LE_Remove_Advertising_Set_Command(
872                advertising_handle=self.advertising_handle
873            ),
874            check_result=True,
875        )
876        del self.device.extended_advertising_sets[self.advertising_handle]
877
878    def on_termination(self, status: int) -> None:
879        self.enabled = False
880        self.emit('termination', status)
881
882
883# -----------------------------------------------------------------------------
class PeriodicAdvertisingSync(EventEmitter):
    """
    Synchronization with a periodic advertising train.

    The object moves through the states listed in `State` and emits the following
    events:
      * `state_change`: each time the `state` property is assigned.
      * `establishment`: the sync was successfully established.
      * `cancellation`: the establishment was cancelled by the host.
      * `error`: the establishment failed.
      * `loss`: the sync was lost.
      * `periodic_advertisement`: a periodic advertisement was fully received (or
        truncated); the argument is a `PeriodicAdvertisement`.
      * `biginfo_advertisement`: a BIGInfo report was received; the argument is a
        `BIGInfoAdvertisement`.
    """

    class State(Enum):
        INIT = 0
        PENDING = 1
        ESTABLISHED = 2
        CANCELLED = 3
        ERROR = 4
        LOST = 5
        TERMINATED = 6

    _state: State
    sync_handle: Optional[int]
    advertiser_address: Address
    sid: int
    skip: int
    sync_timeout: float  # Sync timeout, in seconds
    filter_duplicates: bool
    status: int
    advertiser_phy: int
    periodic_advertising_interval: int
    advertiser_clock_accuracy: int

    def __init__(
        self,
        device: Device,
        advertiser_address: Address,
        sid: int,
        skip: int,
        sync_timeout: float,
        filter_duplicates: bool,
    ) -> None:
        """
        Create a sync object in the `INIT` state.

        Args:
          device: Device used to send the HCI commands.
          advertiser_address: Address of the advertiser to synchronize with.
          sid: Advertising SID of the advertising set to synchronize with.
          skip: Value passed as the `skip` parameter of the create sync command.
          sync_timeout: Sync timeout, in seconds.
          filter_duplicates: Whether duplicate filtering is initially enabled.
        """
        super().__init__()
        self._state = self.State.INIT
        self.sync_handle = None
        self.device = device
        self.advertiser_address = advertiser_address
        self.sid = sid
        self.skip = skip
        self.sync_timeout = sync_timeout
        self.filter_duplicates = filter_duplicates
        self.status = HCI_SUCCESS
        self.advertiser_phy = 0
        self.periodic_advertising_interval = 0
        self.advertiser_clock_accuracy = 0
        # Buffer for the fragments of an advertisement reported in several parts
        self.data_accumulator = b''

    @property
    def state(self) -> State:
        """Current state of the sync."""
        return self._state

    @state.setter
    def state(self, state: State) -> None:
        logger.debug(f'{self} -> {state.name}')
        self._state = state
        self.emit('state_change')

    async def establish(self) -> None:
        """
        Ask the controller to create the sync, and move to the `PENDING` state.

        Raises:
          InvalidStateError: if the sync is not in the `INIT` state.
          HCI_StatusError: if the command status is not "pending".
        """
        if self.state != self.State.INIT:
            raise InvalidStateError('sync not in init state')

        options = HCI_LE_Periodic_Advertising_Create_Sync_Command.Options(0)
        if self.filter_duplicates:
            options |= (
                HCI_LE_Periodic_Advertising_Create_Sync_Command.Options.DUPLICATE_FILTERING_INITIALLY_ENABLED
            )

        response = await self.device.send_command(
            HCI_LE_Periodic_Advertising_Create_Sync_Command(
                options=options,
                advertising_sid=self.sid,
                advertiser_address_type=self.advertiser_address.address_type,
                advertiser_address=self.advertiser_address,
                skip=self.skip,
                # Seconds -> command units (x100). Round rather than truncate, so
                # that float representation errors (e.g. 0.57 * 100 == 56.99...)
                # don't shorten the timeout by one unit.
                sync_timeout=round(self.sync_timeout * 100),
                sync_cte_type=0,
            )
        )
        if response.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(response)

        self.state = self.State.PENDING

    async def terminate(self) -> None:
        """
        Cancel a pending sync, or terminate an established/failed/lost one.

        Nothing is done when the sync is in the `INIT`, `CANCELLED` or `TERMINATED`
        state.
        """
        if self.state in (self.State.INIT, self.State.CANCELLED, self.State.TERMINATED):
            return

        if self.state == self.State.PENDING:
            self.state = self.State.CANCELLED
            response = await self.device.send_command(
                HCI_LE_Periodic_Advertising_Create_Sync_Cancel_Command(),
            )
            # Only forget the sync when the cancellation was accepted. Otherwise
            # an establishment event may still arrive for it.
            if response.return_parameters == HCI_SUCCESS:
                if self in self.device.periodic_advertising_syncs:
                    self.device.periodic_advertising_syncs.remove(self)
            return

        if self.state in (self.State.ESTABLISHED, self.State.ERROR, self.State.LOST):
            self.state = self.State.TERMINATED
            if self.sync_handle is not None:
                await self.device.send_command(
                    HCI_LE_Periodic_Advertising_Terminate_Sync_Command(
                        sync_handle=self.sync_handle
                    )
                )
            # The sync may already have been dropped from the device's list (that
            # is the case after a failed establishment), so only remove it when
            # it is still there.
            if self in self.device.periodic_advertising_syncs:
                self.device.periodic_advertising_syncs.remove(self)

    async def transfer(self, connection: Connection, service_data: int = 0) -> None:
        """
        Transfer this sync over a connection.

        Nothing is done when there is no sync handle.

        Args:
          connection: Connection on which to transfer the sync.
          service_data: Value passed through to
          `connection.transfer_periodic_sync`.
        """
        if self.sync_handle is not None:
            await connection.transfer_periodic_sync(self.sync_handle, service_data)

    def on_establishment(
        self,
        status,
        sync_handle,
        advertiser_phy,
        periodic_advertising_interval,
        advertiser_clock_accuracy,
    ) -> None:
        """
        Handle the result of a sync establishment.

        On success, the sync parameters are stored, the state becomes `ESTABLISHED`
        and `establishment` is emitted. On failure, the sync is dropped from the
        device's list and either `cancellation` (cancelled by the host) or `error`
        is emitted.
        """
        self.status = status

        if self.state == self.State.CANCELLED:
            # Somehow, we receive an established event after trying to cancel, most
            # likely because the cancel command was sent too late, when the sync was
            # already established, but before the established event was sent.
            # We need to automatically terminate.
            logger.debug(
                "received established event for cancelled sync, will terminate"
            )
            if status == HCI_SUCCESS:
                # Keep the handle: terminate() needs it in order to send the
                # terminate command to the controller.
                self.sync_handle = sync_handle
            self.state = self.State.ESTABLISHED
            AsyncRunner.spawn(self.terminate())
            return

        if status == HCI_SUCCESS:
            self.sync_handle = sync_handle
            self.advertiser_phy = advertiser_phy
            self.periodic_advertising_interval = periodic_advertising_interval
            self.advertiser_clock_accuracy = advertiser_clock_accuracy
            self.state = self.State.ESTABLISHED
            self.emit('establishment')
            return

        # We don't need to keep a reference anymore
        if self in self.device.periodic_advertising_syncs:
            self.device.periodic_advertising_syncs.remove(self)

        if status == HCI_OPERATION_CANCELLED_BY_HOST_ERROR:
            self.state = self.State.CANCELLED
            self.emit('cancellation')
            return

        self.state = self.State.ERROR
        self.emit('error')

    def on_loss(self) -> None:
        """Move to the `LOST` state and emit `loss`."""
        self.state = self.State.LOST
        self.emit('loss')

    def on_periodic_advertising_report(self, report) -> None:
        """
        Accumulate the data of a report, and emit `periodic_advertisement` once no
        more data is expected for the current advertisement.
        """
        self.data_accumulator += report.data
        if (
            report.data_status
            == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_MORE_TO_COME
        ):
            # Wait for the remaining fragments
            return

        self.emit(
            'periodic_advertisement',
            PeriodicAdvertisement(
                self.advertiser_address,
                self.sid,
                report.tx_power,
                report.rssi,
                is_truncated=(
                    report.data_status
                    == HCI_LE_Periodic_Advertising_Report_Event.DataStatus.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME
                ),
                data_bytes=self.data_accumulator,
            ),
        )
        # Start from a clean buffer for the next advertisement
        self.data_accumulator = b''

    def on_biginfo_advertising_report(self, report) -> None:
        """Emit `biginfo_advertisement` with an object built from the report."""
        self.emit(
            'biginfo_advertisement',
            BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report),
        )

    def __str__(self) -> str:
        return (
            'PeriodicAdvertisingSync('
            f'state={self.state.name}, '
            f'sync_handle={self.sync_handle}, '
            f'sid={self.sid}, '
            f'skip={self.skip}, '
            f'filter_duplicates={self.filter_duplicates}'
            ')'
        )
1081
1082
1083# -----------------------------------------------------------------------------
class LePhyOptions:
    """
    LE PHY options, holding a coded PHY preference.

    Converting an instance to `int` yields the preference restricted to its two
    low-order bits.
    """

    # Coded PHY preference
    ANY_CODED_PHY = 0
    PREFER_S_2_CODED_PHY = 1
    PREFER_S_8_CODED_PHY = 2

    def __init__(self, coded_phy_preference=0):
        self.coded_phy_preference = coded_phy_preference

    def __int__(self):
        # Keep only bits 0 and 1
        preference_mask = 0b11
        return self.coded_phy_preference & preference_mask
1095
1096
1097# -----------------------------------------------------------------------------
# Type variable bound to `gatt_client.ProfileServiceProxy`, used so that the proxy
# factory methods of `Peer` are typed as returning the same proxy class they are
# given.
_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
1099
1100
class Peer:
    """
    Convenience wrapper around a connection and its GATT client.

    Most methods simply forward to the `gatt_client.Client` created for the
    connection. The object can be used as an async context manager, in which case
    services and their characteristics are discovered on entry.
    """

    def __init__(self, connection: Connection) -> None:
        # Create a GATT client for the connection, and attach it to the connection
        client = gatt_client.Client(connection)
        self.connection = connection
        self.gatt_client = client
        connection.gatt_client = client

    @property
    def services(self) -> List[gatt_client.ServiceProxy]:
        """Services currently known to the GATT client."""
        return self.gatt_client.services

    async def request_mtu(self, mtu: int) -> int:
        """
        Request an ATT MTU through the GATT client, then emit
        `connection_att_mtu_update` on the connection.

        Returns:
          The value returned by the GATT client's MTU request.
        """
        resulting_mtu = await self.gatt_client.request_mtu(mtu)
        self.connection.emit('connection_att_mtu_update')
        return resulting_mtu

    async def discover_service(
        self, uuid: Union[core.UUID, str]
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to `gatt_client.discover_service`."""
        return await self.gatt_client.discover_service(uuid)

    async def discover_services(
        self, uuids: Iterable[core.UUID] = ()
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to `gatt_client.discover_services`."""
        return await self.gatt_client.discover_services(uuids)

    async def discover_included_services(
        self, service: gatt_client.ServiceProxy
    ) -> List[gatt_client.ServiceProxy]:
        """Forward to `gatt_client.discover_included_services`."""
        return await self.gatt_client.discover_included_services(service)

    async def discover_characteristics(
        self,
        uuids: Iterable[Union[core.UUID, str]] = (),
        service: Optional[gatt_client.ServiceProxy] = None,
    ) -> List[gatt_client.CharacteristicProxy]:
        """Forward to `gatt_client.discover_characteristics`."""
        return await self.gatt_client.discover_characteristics(
            uuids=uuids, service=service
        )

    async def discover_descriptors(
        self,
        characteristic: Optional[gatt_client.CharacteristicProxy] = None,
        start_handle: Optional[int] = None,
        end_handle: Optional[int] = None,
    ):
        """Forward to `gatt_client.discover_descriptors`."""
        return await self.gatt_client.discover_descriptors(
            characteristic, start_handle, end_handle
        )

    async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
        """Forward to `gatt_client.discover_attributes`."""
        return await self.gatt_client.discover_attributes()

    async def discover_all(self):
        """
        Discover all services, then the characteristics of every service, then the
        descriptors of every characteristic.
        """
        await self.discover_services()

        # First pass: characteristics of each service
        for discovered_service in self.services:
            await self.discover_characteristics(service=discovered_service)

        # Second pass: descriptors of each characteristic
        for discovered_service in self.services:
            for discovered_characteristic in discovered_service.characteristics:
                await self.discover_descriptors(
                    characteristic=discovered_characteristic
                )

    async def subscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
        prefer_notify: bool = True,
    ) -> None:
        """Forward to `gatt_client.subscribe`."""
        return await self.gatt_client.subscribe(
            characteristic, subscriber, prefer_notify
        )

    async def unsubscribe(
        self,
        characteristic: gatt_client.CharacteristicProxy,
        subscriber: Optional[Callable[[bytes], Any]] = None,
    ) -> None:
        """Forward to `gatt_client.unsubscribe`."""
        return await self.gatt_client.unsubscribe(characteristic, subscriber)

    async def read_value(
        self, attribute: Union[int, gatt_client.AttributeProxy]
    ) -> bytes:
        """Forward to `gatt_client.read_value`."""
        return await self.gatt_client.read_value(attribute)

    async def write_value(
        self,
        attribute: Union[int, gatt_client.AttributeProxy],
        value: bytes,
        with_response: bool = False,
    ) -> None:
        """Forward to `gatt_client.write_value`."""
        return await self.gatt_client.write_value(attribute, value, with_response)

    async def read_characteristics_by_uuid(
        self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
    ) -> List[bytes]:
        """Forward to `gatt_client.read_characteristics_by_uuid`."""
        return await self.gatt_client.read_characteristics_by_uuid(uuid, service)

    def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
        """Forward to `gatt_client.get_services_by_uuid`."""
        return self.gatt_client.get_services_by_uuid(uuid)

    def get_characteristics_by_uuid(
        self,
        uuid: core.UUID,
        service: Optional[Union[gatt_client.ServiceProxy, core.UUID]] = None,
    ) -> List[gatt_client.CharacteristicProxy]:
        """
        Get the known characteristics with a given UUID.

        Args:
          uuid: UUID of the characteristics to look for.
          service: Either a service proxy (or None), passed as-is to the GATT
          client, or a service UUID, in which case the lookup is done in every
          known service with that UUID and the results are concatenated.
        """
        if not isinstance(service, core.UUID):
            return self.gatt_client.get_characteristics_by_uuid(uuid, service)

        # Service given by UUID: collect the matches of each matching service
        per_service_matches = [
            self.get_characteristics_by_uuid(uuid, matching_service)
            for matching_service in self.get_services_by_uuid(service)
        ]
        characteristics = []
        for matches in per_service_matches:
            characteristics.extend(matches)
        return characteristics

    def create_service_proxy(
        self, proxy_class: Type[_PROXY_CLASS]
    ) -> Optional[_PROXY_CLASS]:
        """
        Build a proxy of the given class from the GATT client.

        Returns:
          The proxy, or None when `proxy_class.from_client` returns a falsy value.
        """
        proxy = proxy_class.from_client(self.gatt_client)
        if not proxy:
            return None

        return cast(_PROXY_CLASS, proxy)

    async def discover_service_and_create_proxy(
        self, proxy_class: Type[_PROXY_CLASS]
    ) -> Optional[_PROXY_CLASS]:
        """
        Discover the service of a proxy class and create the proxy.

        Returns:
          The proxy, or None when no matching service was discovered.
        """
        # Discover the first matching service and its characteristics
        matching_services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
        if not matching_services:
            return None

        await matching_services[0].discover_characteristics()
        return self.create_service_proxy(proxy_class)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """Forward to `connection.sustain`."""
        await self.connection.sustain(timeout)

    # [Classic only]
    async def request_name(self) -> str:
        """Forward to `connection.request_remote_name`."""
        return await self.connection.request_remote_name()

    async def __aenter__(self):
        # Discover the services and their characteristics on entry
        await self.discover_services()
        for discovered_service in self.services:
            await discovered_service.discover_characteristics()

        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Nothing to clean up on exit."""

    def __str__(self) -> str:
        connection = self.connection
        return f'{connection.peer_address} as {connection.role_name}'
1257
1258
1259# -----------------------------------------------------------------------------
@dataclass
class ConnectionParametersPreferences:
    """
    Preferred parameters for a connection.

    Every field defaults to the corresponding `DEVICE_DEFAULT_CONNECTION_*`
    constant.
    """

    # Shared instance holding all the default values. It is assigned right after
    # the class definition, since the class must exist before it can be
    # instantiated.
    default: ClassVar[ConnectionParametersPreferences]
    connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN
    connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX
    max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY
    supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT
    min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH
    max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH


# Instance with all fields left at their default values
ConnectionParametersPreferences.default = ConnectionParametersPreferences()
1272
1273
1274# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
    """
    SCO link, associated with a device and an ACL connection.

    The link is also an event emitter: the emitter base class is initialized in
    `__post_init__`, after the dataclass fields have been set.
    """

    device: Device
    acl_connection: Connection
    handle: int
    link_type: int
    # Optional callable taking an `HCI_SynchronousDataPacket`
    sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class
        # initializer, so do it here.
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """
        Disconnect this link, by delegating to `device.disconnect`.

        Args:
          reason: Reason code passed to `device.disconnect`.
        """
        await self.device.disconnect(self, reason)
1290
1291
1292# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
    """
    CIS link, associated with a device and an ACL connection.

    The link is also an event emitter: the emitter base class is initialized in
    `__post_init__`, after the dataclass fields have been set.
    """

    class State(IntEnum):
        PENDING = 0
        ESTABLISHED = 1

    device: Device
    acl_connection: Connection  # Based ACL connection
    handle: int  # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
    cis_id: int  # CIS ID assigned by Central device
    cig_id: int  # CIG ID assigned by Central device
    state: State = State.PENDING  # A new link starts out as pending
    # Optional callable taking an `HCI_IsoDataPacket`
    sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class
        # initializer, so do it here.
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """
        Disconnect this link, by delegating to `device.disconnect`.

        Args:
          reason: Reason code passed to `device.disconnect`.
        """
        await self.device.disconnect(self, reason)
1314
1315
1316# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
    """
    Connection with a peer.

    Most operations are delegated to the `Device` that owns the connection, with
    the connection itself passed as an argument. A connection can be used as an
    async context manager, in which case it is disconnected on a normal exit.
    """

    device: Device
    handle: int
    transport: int
    self_address: Address
    self_resolvable_address: Optional[Address]
    peer_address: Address
    peer_resolvable_address: Optional[Address]
    peer_le_features: Optional[LeFeatureMask]
    role: int
    encryption: int
    authenticated: bool
    sc: bool
    link_key_type: int
    gatt_client: gatt_client.Client
    pairing_peer_io_capability: Optional[int]
    pairing_peer_authentication_requirements: Optional[int]

    @composite_listener
    class Listener:
        """Base listener with a no-op handler for each connection event."""

        def on_disconnection(self, reason):
            pass

        def on_connection_parameters_update(self):
            pass

        def on_connection_parameters_update_failure(self, error):
            pass

        def on_connection_data_length_change(self):
            pass

        def on_connection_phy_update(self):
            pass

        def on_connection_phy_update_failure(self, error):
            pass

        def on_connection_att_mtu_update(self):
            pass

        def on_connection_encryption_change(self):
            pass

        def on_connection_encryption_key_refresh(self):
            pass

    def __init__(
        self,
        device,
        handle,
        transport,
        self_address,
        self_resolvable_address,
        peer_address,
        peer_resolvable_address,
        role,
        parameters,
        phy,
    ):
        """
        Store the connection properties, and initialize the security, ATT and
        pairing state to their initial values.

        `handle` and `parameters` may be None for an incomplete connection (see
        `incomplete` and `complete`).
        """
        super().__init__()
        self.device = device
        self.handle = handle
        self.transport = transport
        self.self_address = self_address
        self.self_resolvable_address = self_resolvable_address
        self.peer_address = peer_address
        self.peer_resolvable_address = peer_resolvable_address
        self.peer_name = None  # Classic only
        self.role = role
        self.parameters = parameters
        self.encryption = 0
        self.authenticated = False
        self.sc = False
        self.link_key_type = None
        self.phy = phy
        self.att_mtu = ATT_DEFAULT_MTU
        self.data_length = DEVICE_DEFAULT_DATA_LENGTH
        self.gatt_client = None  # Per-connection client
        self.gatt_server = (
            device.gatt_server
        )  # By default, use the device's shared server
        self.pairing_peer_io_capability = None
        self.pairing_peer_authentication_requirements = None
        self.peer_le_features = None

    # [Classic only]
    @classmethod
    def incomplete(cls, device, peer_address, role):
        """
        Instantiate an incomplete connection (ie. one waiting for a HCI Connection
        Complete event).
        Once received it shall be completed using the `.complete` method.
        """
        return cls(
            device,
            None,
            BT_BR_EDR_TRANSPORT,
            device.public_address,
            None,
            peer_address,
            None,
            role,
            None,
            None,
        )

    # [Classic only]
    def complete(self, handle, parameters):
        """
        Finish an incomplete connection upon completion.
        """
        assert self.handle is None
        assert self.transport == BT_BR_EDR_TRANSPORT
        self.handle = handle
        self.parameters = parameters

    @property
    def role_name(self):
        """Printable name of the connection role."""
        if self.role is None:
            return 'NOT-SET'
        if self.role == BT_CENTRAL_ROLE:
            return 'CENTRAL'
        if self.role == BT_PERIPHERAL_ROLE:
            return 'PERIPHERAL'
        return f'UNKNOWN[{self.role}]'

    @property
    def is_encrypted(self):
        """True when the `encryption` value is not 0."""
        return self.encryption != 0

    @property
    def is_incomplete(self) -> bool:
        """True when the connection has no handle yet."""
        return self.handle is None

    def send_l2cap_pdu(self, cid: int, pdu: bytes) -> None:
        """Send an L2CAP PDU on this connection, through the device."""
        self.device.send_l2cap_pdu(self.handle, cid, pdu)

    @deprecated("Please use create_l2cap_channel()")
    async def open_l2cap_channel(
        self,
        psm,
        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
    ):
        return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.ClassicChannelSpec
    ) -> l2cap.ClassicChannel: ...

    @overload
    async def create_l2cap_channel(
        self, spec: l2cap.LeCreditBasedChannelSpec
    ) -> l2cap.LeCreditBasedChannel: ...

    async def create_l2cap_channel(
        self, spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec]
    ) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
        """Create an L2CAP channel on this connection, through the device."""
        return await self.device.create_l2cap_channel(connection=self, spec=spec)

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """
        Disconnect, by delegating to `device.disconnect`.

        Args:
          reason: Reason code passed to `device.disconnect`.
        """
        await self.device.disconnect(self, reason)

    async def pair(self) -> None:
        """Delegate to `device.pair`."""
        return await self.device.pair(self)

    def request_pairing(self) -> None:
        """Delegate to `device.request_pairing`."""
        return self.device.request_pairing(self)

    # [Classic only]
    async def authenticate(self) -> None:
        """Delegate to `device.authenticate`."""
        return await self.device.authenticate(self)

    async def encrypt(self, enable: bool = True) -> None:
        """Delegate to `device.encrypt`."""
        return await self.device.encrypt(self, enable)

    async def switch_role(self, role: int) -> None:
        """Delegate to `device.switch_role`."""
        return await self.device.switch_role(self, role)

    async def sustain(self, timeout: Optional[float] = None) -> None:
        """Idles the current task waiting for a disconnect or timeout"""

        # The future is resolved by a `disconnection` event, and failed by a
        # `disconnection_failure` event.
        abort = asyncio.get_running_loop().create_future()
        self.on('disconnection', abort.set_result)
        self.on('disconnection_failure', abort.set_exception)

        try:
            await asyncio.wait_for(self.device.abort_on('flush', abort), timeout)
        finally:
            # Always unregister the listeners, including on timeout
            self.remove_listener('disconnection', abort.set_result)
            self.remove_listener('disconnection_failure', abort.set_exception)

    async def set_data_length(self, tx_octets, tx_time) -> None:
        """Delegate to `device.set_data_length`."""
        return await self.device.set_data_length(self, tx_octets, tx_time)

    async def update_parameters(
        self,
        connection_interval_min,
        connection_interval_max,
        max_latency,
        supervision_timeout,
        use_l2cap=False,
    ):
        """Delegate to `device.update_connection_parameters`."""
        return await self.device.update_connection_parameters(
            self,
            connection_interval_min,
            connection_interval_max,
            max_latency,
            supervision_timeout,
            use_l2cap=use_l2cap,
        )

    async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None):
        """Delegate to `device.set_connection_phy`."""
        return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options)

    async def get_rssi(self):
        """Delegate to `device.get_connection_rssi`."""
        return await self.device.get_connection_rssi(self)

    async def get_phy(self):
        """Delegate to `device.get_connection_phy`."""
        return await self.device.get_connection_phy(self)

    async def transfer_periodic_sync(
        self, sync_handle: int, service_data: int = 0
    ) -> None:
        """Delegate to `device.transfer_periodic_sync`."""
        await self.device.transfer_periodic_sync(self, sync_handle, service_data)

    # [Classic only]
    async def request_remote_name(self):
        """Delegate to `device.request_remote_name`."""
        return await self.device.request_remote_name(self)

    async def get_remote_le_features(self) -> LeFeatureMask:
        """[LE Only] Reads remote LE supported features.

        Returns:
            LE features supported by the remote device.
        """
        self.peer_le_features = await self.device.get_remote_le_features(self)
        return self.peer_le_features

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Only disconnect on a normal exit; when an exception is propagating,
        # the connection is left untouched.
        if exc_type is None:
            try:
                await self.disconnect()
            except HCI_StatusError as error:
                # Invalid parameter means the connection is no longer valid
                if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR:
                    raise

    def __str__(self):
        # An incomplete connection has no handle yet, and `None` does not support
        # an integer format spec, so it is rendered the same way `role_name`
        # renders an unset role.
        if self.handle is None:
            handle = 'NOT-SET'
        else:
            handle = f'0x{self.handle:04X}'
        return (
            f'Connection(handle={handle}, '
            f'role={self.role_name}, '
            f'self_address={self.self_address}, '
            f'self_resolvable_address={self.self_resolvable_address}, '
            f'peer_address={self.peer_address}, '
            f'peer_resolvable_address={self.peer_resolvable_address})'
        )
1582
1583
1584# -----------------------------------------------------------------------------
@dataclass
class DeviceConfiguration:
    """
    Configuration of a device.

    The fields can be set directly, or loaded from a dictionary or a JSON file
    with `load_from_dict` / `load_from_file` (or the `from_dict` / `from_file`
    factories).
    """

    # Setup defaults
    name: str = DEVICE_DEFAULT_NAME
    address: Address = Address(DEVICE_DEFAULT_ADDRESS)
    class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE
    scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
    advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
    le_enabled: bool = True
    le_simultaneous_enabled: bool = False
    le_privacy_enabled: bool = False
    le_rpa_timeout: int = DEVICE_DEFAULT_LE_RPA_TIMEOUT
    classic_enabled: bool = False
    classic_sc_enabled: bool = True
    classic_ssp_enabled: bool = True
    classic_smp_enabled: bool = True
    classic_accept_any: bool = True
    classic_interlaced_scan_enabled: bool = True
    connectable: bool = True
    discoverable: bool = True
    advertising_data: bytes = bytes(
        AdvertisingData(
            [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(DEVICE_DEFAULT_NAME, 'utf-8'))]
        )
    )
    irk: bytes = bytes(16)  # This really must be changed for any level of security
    keystore: Optional[str] = None
    address_resolution_offload: bool = False
    address_generation_offload: bool = False
    cis_enabled: bool = False
    identity_address_type: Optional[int] = None
    io_capability: int = pairing.PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT

    def __post_init__(self) -> None:
        # Not a dataclass field: starts empty, and can be set when loading a
        # configuration that has a `gatt_services` entry.
        self.gatt_services: List[Dict[str, Any]] = []

    def load_from_dict(self, config: Dict[str, Any]) -> None:
        """
        Update this configuration from a dictionary.

        The `address`, `irk`, `name`, `advertising_data` and `advertising_interval`
        entries get special treatment (see the comments below); every other entry
        is set as an attribute of the same name. The dictionary passed in is not
        modified.

        Args:
          config: Configuration entries, with `irk` and `advertising_data` given
          as hex strings.
        """
        # Work on a copy, since entries are popped as they are processed
        config = copy.deepcopy(config)

        # Load simple properties
        if address := config.pop('address', None):
            self.address = Address(address)

        # Load or synthesize an IRK
        if irk := config.pop('irk', None):
            self.irk = bytes.fromhex(irk)
        elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
            # Construct an IRK from the address bytes
            # NOTE: this is not secure, but will always give the same IRK for the same
            # address
            address_bytes = bytes(self.address)
            self.irk = (address_bytes * 3)[:16]
        else:
            # Fallback - when both IRK and address are not set, randomly generate an IRK.
            self.irk = secrets.token_bytes(16)

        if (name := config.pop('name', None)) is not None:
            self.name = name

        # Load advertising data
        if advertising_data := config.pop('advertising_data', None):
            self.advertising_data = bytes.fromhex(advertising_data)
        elif name is not None:
            # No explicit advertising data: advertise the configured name
            self.advertising_data = bytes(
                AdvertisingData(
                    [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
                )
            )

        # Load advertising interval (for backward compatibility)
        if advertising_interval := config.pop('advertising_interval', None):
            self.advertising_interval_min = advertising_interval
            self.advertising_interval_max = advertising_interval
            if (
                'advertising_interval_max' in config
                or 'advertising_interval_min' in config
            ):
                # The min/max entries are applied by the generic loop below, which
                # overrides the value(s) that were just set.
                logger.warning(
                    'Trying to set both advertising_interval and '
                    'advertising_interval_min/max, advertising_interval will be '
                    'ignored.'
                )

        # Load data in primitive types.
        for key, value in config.items():
            setattr(self, key, value)

    def load_from_file(self, filename: str) -> None:
        """
        Update this configuration from a JSON file.

        Args:
          filename: Path of a UTF-8 encoded JSON file, whose content is passed to
          `load_from_dict`.
        """
        with open(filename, 'r', encoding='utf-8') as file:
            self.load_from_dict(json.load(file))

    @classmethod
    def from_file(cls: Type[Self], filename: str) -> Self:
        """Create a configuration with default values, then load a JSON file."""
        config = cls()
        config.load_from_file(filename)
        return config

    @classmethod
    def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
        """Create a configuration with default values, then load a dictionary."""
        device_config = cls()
        device_config.load_from_dict(config)
        return device_config
1688
1689
1690# -----------------------------------------------------------------------------
1691# Decorators used with the following Device class
1692# (we define them outside of the Device class, because defining decorators
1693#  within a class requires unnecessarily complicated acrobatics)
1694# -----------------------------------------------------------------------------
1695
1696
1697# Decorator that converts the first argument from a connection handle to a connection
def with_connection_from_handle(function):
    """Wrap a method so that it receives a connection instead of a handle.

    The wrapper looks the handle up with `self.lookup_connection()` and raises
    `ObjectLookupError` when no connection is found.
    """

    @functools.wraps(function)
    def wrapper(self, connection_handle, *args, **kwargs):
        connection = self.lookup_connection(connection_handle)
        if connection is not None:
            return function(self, connection, *args, **kwargs)

        raise ObjectLookupError(
            f'no connection for handle: 0x{connection_handle:04x}'
        )

    return wrapper
1708
1709
1710# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
    """Wrap a method so that it receives a connection instead of an address.

    Pending connections are searched first, then established connections (by
    peer address). `ObjectLookupError` is raised when neither has a match.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        connection = self.pending_connections.get(address, False)
        if not connection:
            connection = next(
                (
                    candidate
                    for candidate in self.connections.values()
                    if candidate.peer_address == address
                ),
                None,
            )
            if connection is None:
                raise ObjectLookupError('no connection for address')
        return function(self, connection, *args, **kwargs)

    return wrapper
1722
1723
1724# Decorator that tries to convert the first argument from a bluetooth address to a
1725# connection
def try_with_connection_from_address(function):
    """Wrap a method so that it receives a connection (or None) plus the address.

    Pending connections are searched first, then established connections (by
    peer address). When there is no match, the wrapped method is still called,
    with None in place of the connection.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        connection = self.pending_connections.get(address, False)
        if not connection:
            connection = next(
                (
                    candidate
                    for candidate in self.connections.values()
                    if candidate.peer_address == address
                ),
                None,
            )
        return function(self, connection, address, *args, **kwargs)

    return wrapper
1737
1738
1739# Decorator that converts the first argument from a sync handle to a periodic
1740# advertising sync object
def with_periodic_advertising_sync_from_handle(function):
    """Wrap a method so that it receives a periodic advertising sync object.

    The wrapper looks the sync handle up with
    `self.lookup_periodic_advertising_sync()` and raises `ValueError` when no
    sync object is found.
    """

    @functools.wraps(function)
    def wrapper(self, sync_handle, *args, **kwargs):
        sync = self.lookup_periodic_advertising_sync(sync_handle)
        if sync is not None:
            return function(self, sync, *args, **kwargs)

        raise ValueError(
            f'no periodic advertising sync for handle: 0x{sync_handle:04x}'
        )

    return wrapper
1751
1752
1753# Decorator that adds a method to the list of event handlers for host events.
1754# This assumes that the method name starts with `on_`
def host_event_handler(function):
    """Register a method as a handler for a host event.

    The event name is the method name with its first three characters (the
    expected `on_` prefix) removed. The method itself is returned unchanged.
    """
    event_name = function.__name__[len('on_'):]
    device_host_event_handlers.append(event_name)
    return function
1758
1759
# List of host event handlers for the Device class: event names, i.e. handler
# method names without their `on_` prefix, appended by `host_event_handler`.
# (we define this list outside the class, because referencing a class in method
#  decorators is not straightforward)
device_host_event_handlers: List[str] = []
1764
1765
1766# -----------------------------------------------------------------------------
1767class Device(CompositeEventEmitter):
1768    # Incomplete list of fields.
1769    random_address: Address  # Random private address that may change periodically
1770    public_address: Address  # Public address that is globally unique (from controller)
1771    static_address: Address  # Random static address that does not change once set
1772    classic_enabled: bool
1773    name: str
1774    class_of_device: int
1775    gatt_server: gatt_server.Server
1776    advertising_data: bytes
1777    scan_response_data: bytes
1778    connections: Dict[int, Connection]
1779    pending_connections: Dict[Address, Connection]
1780    classic_pending_accepts: Dict[
1781        Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
1782    ]
1783    advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
1784    periodic_advertising_syncs: List[PeriodicAdvertisingSync]
1785    config: DeviceConfiguration
1786    legacy_advertiser: Optional[LegacyAdvertiser]
1787    sco_links: Dict[int, ScoLink]
1788    cis_links: Dict[int, CisLink]
1789    _pending_cis: Dict[int, Tuple[int, int]]
1790
    @composite_listener
    class Listener:
        """Listener for device events, with a do-nothing handler for each event.

        Presumably meant to be subclassed, overriding only the handlers of
        interest.
        """

        def on_advertisement(self, advertisement):
            # No-op default handler.
            pass

        def on_inquiry_result(self, address, class_of_device, data, rssi):
            # No-op default handler.
            pass

        def on_connection(self, connection):
            # No-op default handler.
            pass

        def on_connection_failure(self, error):
            # No-op default handler.
            pass

        def on_connection_request(self, bd_addr, class_of_device, link_type):
            # No-op default handler.
            pass

        def on_characteristic_subscription(
            self, connection, characteristic, notify_enabled, indicate_enabled
        ):
            # No-op default handler.
            pass
1812
1813    @classmethod
1814    def with_hci(
1815        cls,
1816        name: str,
1817        address: Address,
1818        hci_source: TransportSource,
1819        hci_sink: TransportSink,
1820    ) -> Device:
1821        '''
1822        Create a Device instance with a Host configured to communicate with a controller
1823        through an HCI source/sink
1824        '''
1825        host = Host(controller_source=hci_source, controller_sink=hci_sink)
1826        return cls(name=name, address=address, host=host)
1827
1828    @classmethod
1829    def from_config_file(cls, filename: str) -> Device:
1830        config = DeviceConfiguration.from_file(filename)
1831        return cls(config=config)
1832
1833    @classmethod
1834    def from_config_with_hci(
1835        cls,
1836        config: DeviceConfiguration,
1837        hci_source: TransportSource,
1838        hci_sink: TransportSink,
1839    ) -> Device:
1840        host = Host(controller_source=hci_source, controller_sink=hci_sink)
1841        return cls(config=config, host=host)
1842
1843    @classmethod
1844    def from_config_file_with_hci(
1845        cls, filename: str, hci_source: TransportSource, hci_sink: TransportSink
1846    ) -> Device:
1847        config = DeviceConfiguration.from_file(filename)
1848        return cls.from_config_with_hci(config, hci_source, hci_sink)
1849
1850    def __init__(
1851        self,
1852        name: Optional[str] = None,
1853        address: Optional[Address] = None,
1854        config: Optional[DeviceConfiguration] = None,
1855        host: Optional[Host] = None,
1856        generic_access_service: bool = True,
1857    ) -> None:
1858        super().__init__()
1859
1860        self._host = None
1861        self.powered_on = False
1862        self.auto_restart_inquiry = True
1863        self.command_timeout = 10  # seconds
1864        self.gatt_server = gatt_server.Server(self)
1865        self.sdp_server = sdp.Server(self)
1866        self.l2cap_channel_manager = l2cap.ChannelManager(
1867            [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
1868        )
1869        self.advertisement_accumulators = {}  # Accumulators, by address
1870        self.periodic_advertising_syncs = []
1871        self.scanning = False
1872        self.scanning_is_passive = False
1873        self.discovering = False
1874        self.le_connecting = False
1875        self.disconnecting = False
1876        self.connections = {}  # Connections, by connection handle
1877        self.pending_connections = {}  # Connections, by BD address (BR/EDR only)
1878        self.sco_links = {}  # ScoLinks, by connection handle (BR/EDR only)
1879        self.cis_links = {}  # CisLinks, by connection handle (LE only)
1880        self._pending_cis = {}  # (CIS_ID, CIG_ID), by CIS_handle
1881        self.classic_enabled = False
1882        self.inquiry_response = None
1883        self.address_resolver = None
1884        self.classic_pending_accepts = {
1885            Address.ANY: []
1886        }  # Futures, by BD address OR [Futures] for Address.ANY
1887
1888        # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated.
1889        if sys.version_info >= (3, 10):
1890            self._cis_lock = asyncio.Lock()
1891        else:
1892            self._cis_lock = AsyncExitStack()
1893
1894        # Own address type cache
1895        self.connect_own_address_type = None
1896
1897        # Use the initial config or a default
1898        config = config or DeviceConfiguration()
1899        self.config = config
1900
1901        self.name = config.name
1902        self.public_address = Address.ANY
1903        self.random_address = config.address
1904        self.static_address = config.address
1905        self.class_of_device = config.class_of_device
1906        self.keystore = None
1907        self.irk = config.irk
1908        self.le_enabled = config.le_enabled
1909        self.le_simultaneous_enabled = config.le_simultaneous_enabled
1910        self.le_privacy_enabled = config.le_privacy_enabled
1911        self.le_rpa_timeout = config.le_rpa_timeout
1912        self.le_rpa_periodic_update_task: Optional[asyncio.Task] = None
1913        self.classic_enabled = config.classic_enabled
1914        self.cis_enabled = config.cis_enabled
1915        self.classic_sc_enabled = config.classic_sc_enabled
1916        self.classic_ssp_enabled = config.classic_ssp_enabled
1917        self.classic_smp_enabled = config.classic_smp_enabled
1918        self.classic_interlaced_scan_enabled = config.classic_interlaced_scan_enabled
1919        self.discoverable = config.discoverable
1920        self.connectable = config.connectable
1921        self.classic_accept_any = config.classic_accept_any
1922        self.address_resolution_offload = config.address_resolution_offload
1923        self.address_generation_offload = config.address_generation_offload
1924
1925        # Extended advertising.
1926        self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}
1927        self.connecting_extended_advertising_sets: Dict[int, AdvertisingSet] = {}
1928
1929        # Legacy advertising.
1930        # The advertising and scan response data, as well as the advertising interval
1931        # values are stored as properties of this object for convenience so that they
1932        # can be initialized from a config object, and for backward compatibility for
1933        # client code that may set those values directly before calling
1934        # start_advertising().
1935        self.legacy_advertising_set: Optional[AdvertisingSet] = None
1936        self.legacy_advertiser: Optional[LegacyAdvertiser] = None
1937        self.advertising_data = config.advertising_data
1938        self.scan_response_data = config.scan_response_data
1939        self.advertising_interval_min = config.advertising_interval_min
1940        self.advertising_interval_max = config.advertising_interval_max
1941
1942        for service in config.gatt_services:
1943            characteristics = []
1944            for characteristic in service.get("characteristics", []):
1945                descriptors = []
1946                for descriptor in characteristic.get("descriptors", []):
1947                    # Leave this check until 5/25/2023
1948                    if descriptor.get("permission", False):
1949                        raise Exception(
1950                            "Error parsing Device Config's GATT Services. "
1951                            "The key 'permission' must be renamed to 'permissions'"
1952                        )
1953                    new_descriptor = Descriptor(
1954                        attribute_type=descriptor["descriptor_type"],
1955                        permissions=descriptor["permissions"],
1956                    )
1957                    descriptors.append(new_descriptor)
1958                new_characteristic = Characteristic(
1959                    uuid=characteristic["uuid"],
1960                    properties=Characteristic.Properties.from_string(
1961                        characteristic["properties"]
1962                    ),
1963                    permissions=characteristic["permissions"],
1964                    descriptors=descriptors,
1965                )
1966                characteristics.append(new_characteristic)
1967            new_service = Service(uuid=service["uuid"], characteristics=characteristics)
1968            self.gatt_server.add_service(new_service)
1969
1970        # If a name is passed, override the name from the config
1971        if name:
1972            self.name = name
1973
1974        # If an address is passed, override the address from the config
1975        if address:
1976            if isinstance(address, str):
1977                address = Address(address)
1978            self.random_address = address
1979            self.static_address = address
1980
1981        # Setup SMP
1982        self.smp_manager = smp.Manager(
1983            self,
1984            pairing_config_factory=lambda connection: pairing.PairingConfig(
1985                identity_address_type=(
1986                    pairing.PairingConfig.AddressType(self.config.identity_address_type)
1987                    if self.config.identity_address_type
1988                    else None
1989                ),
1990                delegate=pairing.PairingDelegate(
1991                    io_capability=pairing.PairingDelegate.IoCapability(
1992                        self.config.io_capability
1993                    )
1994                ),
1995            ),
1996        )
1997
1998        self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
1999
2000        # Register the SDP server with the L2CAP Channel Manager
2001        self.sdp_server.register(self.l2cap_channel_manager)
2002
2003        self.add_default_services(generic_access_service)
2004        self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)
2005
2006        # Forward some events
2007        setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')
2008
2009        # Set the initial host
2010        if host:
2011            self.host = host
2012
2013    @property
2014    def host(self) -> Host:
2015        assert self._host
2016        return self._host
2017
2018    @host.setter
2019    def host(self, host: Host) -> None:
2020        # Unsubscribe from events from the current host
2021        if self._host:
2022            for event_name in device_host_event_handlers:
2023                self._host.remove_listener(
2024                    event_name, getattr(self, f'on_{event_name}')
2025                )
2026
2027        # Subscribe to events from the new host
2028        if host:
2029            for event_name in device_host_event_handlers:
2030                host.on(event_name, getattr(self, f'on_{event_name}'))
2031
2032        # Update the references to the new host
2033        self._host = host
2034        self.l2cap_channel_manager.host = host
2035
2036        # Set providers for the new host
2037        if host:
2038            host.long_term_key_provider = self.get_long_term_key
2039            host.link_key_provider = self.get_link_key
2040
2041    @property
2042    def sdp_service_records(self):
2043        return self.sdp_server.service_records
2044
2045    @sdp_service_records.setter
2046    def sdp_service_records(self, service_records):
2047        self.sdp_server.service_records = service_records
2048
2049    def lookup_connection(self, connection_handle: int) -> Optional[Connection]:
2050        if connection := self.connections.get(connection_handle):
2051            return connection
2052
2053        return None
2054
2055    def find_connection_by_bd_addr(
2056        self,
2057        bd_addr: Address,
2058        transport: Optional[int] = None,
2059        check_address_type: bool = False,
2060    ) -> Optional[Connection]:
2061        for connection in self.connections.values():
2062            if connection.peer_address.to_bytes() == bd_addr.to_bytes():
2063                if (
2064                    check_address_type
2065                    and connection.peer_address.address_type != bd_addr.address_type
2066                ):
2067                    continue
2068                if transport is None or connection.transport == transport:
2069                    return connection
2070
2071        return None
2072
2073    def lookup_periodic_advertising_sync(
2074        self, sync_handle: int
2075    ) -> Optional[PeriodicAdvertisingSync]:
2076        return next(
2077            (
2078                sync
2079                for sync in self.periodic_advertising_syncs
2080                if sync.sync_handle == sync_handle
2081            ),
2082            None,
2083        )
2084
2085    @deprecated("Please use create_l2cap_server()")
2086    def register_l2cap_server(self, psm, server) -> int:
2087        return self.l2cap_channel_manager.register_server(psm, server)
2088
2089    @deprecated("Please use create_l2cap_server()")
2090    def register_l2cap_channel_server(
2091        self,
2092        psm,
2093        server,
2094        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
2095        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
2096        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
2097    ):
2098        return self.l2cap_channel_manager.register_le_coc_server(
2099            psm, server, max_credits, mtu, mps
2100        )
2101
2102    @deprecated("Please use create_l2cap_channel()")
2103    async def open_l2cap_channel(
2104        self,
2105        connection,
2106        psm,
2107        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
2108        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
2109        mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
2110    ):
2111        return await self.l2cap_channel_manager.open_le_coc(
2112            connection, psm, max_credits, mtu, mps
2113        )
2114
2115    @overload
2116    async def create_l2cap_channel(
2117        self,
2118        connection: Connection,
2119        spec: l2cap.ClassicChannelSpec,
2120    ) -> l2cap.ClassicChannel: ...
2121
2122    @overload
2123    async def create_l2cap_channel(
2124        self,
2125        connection: Connection,
2126        spec: l2cap.LeCreditBasedChannelSpec,
2127    ) -> l2cap.LeCreditBasedChannel: ...
2128
2129    async def create_l2cap_channel(
2130        self,
2131        connection: Connection,
2132        spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
2133    ) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
2134        if isinstance(spec, l2cap.ClassicChannelSpec):
2135            return await self.l2cap_channel_manager.create_classic_channel(
2136                connection=connection, spec=spec
2137            )
2138        if isinstance(spec, l2cap.LeCreditBasedChannelSpec):
2139            return await self.l2cap_channel_manager.create_le_credit_based_channel(
2140                connection=connection, spec=spec
2141            )
2142
2143    @overload
2144    def create_l2cap_server(
2145        self,
2146        spec: l2cap.ClassicChannelSpec,
2147        handler: Optional[Callable[[l2cap.ClassicChannel], Any]] = None,
2148    ) -> l2cap.ClassicChannelServer: ...
2149
2150    @overload
2151    def create_l2cap_server(
2152        self,
2153        spec: l2cap.LeCreditBasedChannelSpec,
2154        handler: Optional[Callable[[l2cap.LeCreditBasedChannel], Any]] = None,
2155    ) -> l2cap.LeCreditBasedChannelServer: ...
2156
2157    def create_l2cap_server(
2158        self,
2159        spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
2160        handler: Union[
2161            Callable[[l2cap.ClassicChannel], Any],
2162            Callable[[l2cap.LeCreditBasedChannel], Any],
2163            None,
2164        ] = None,
2165    ) -> Union[l2cap.ClassicChannelServer, l2cap.LeCreditBasedChannelServer]:
2166        if isinstance(spec, l2cap.ClassicChannelSpec):
2167            return self.l2cap_channel_manager.create_classic_server(
2168                spec=spec,
2169                handler=cast(Callable[[l2cap.ClassicChannel], Any], handler),
2170            )
2171        elif isinstance(spec, l2cap.LeCreditBasedChannelSpec):
2172            return self.l2cap_channel_manager.create_le_credit_based_server(
2173                handler=cast(Callable[[l2cap.LeCreditBasedChannel], Any], handler),
2174                spec=spec,
2175            )
2176        else:
2177            raise InvalidArgumentError(f'Unexpected mode {spec}')
2178
2179    def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
2180        self.host.send_l2cap_pdu(connection_handle, cid, pdu)
2181
2182    async def send_command(self, command, check_result=False):
2183        try:
2184            return await asyncio.wait_for(
2185                self.host.send_command(command, check_result), self.command_timeout
2186            )
2187        except asyncio.TimeoutError as error:
2188            logger.warning(f'!!! Command {command.name} timed out')
2189            raise CommandTimeoutError() from error
2190
    async def power_on(self) -> None:
        """Reset the host and apply this device's settings to the controller.

        The steps are, in order: reset, read the public address, create the key
        store if there is none yet, finish the SMP setup, then apply the LE and
        Classic settings that are enabled. `powered_on` is set to True at the end.
        """
        # Reset the controller
        await self.host.reset()

        # Try to get the public address from the controller
        response = await self.send_command(HCI_Read_BD_ADDR_Command())
        if response.return_parameters.status == HCI_SUCCESS:
            logger.debug(
                color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
            )
            self.public_address = response.return_parameters.bd_addr

        # Instantiate the Key Store (we do this here rather than at __init__ time
        # because some Key Store implementations use the public address as a namespace)
        if self.keystore is None:
            self.keystore = KeyStore.create_for_device(self)

        # Finish setting up SMP based on post-init configurable options
        if self.classic_smp_enabled:
            self.l2cap_channel_manager.register_fixed_channel(
                smp.SMP_BR_CID, self.on_smp_pdu
            )

        # Tell the controller whether the host supports LE, if it accepts that command
        if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
            await self.send_command(
                HCI_Write_LE_Host_Support_Command(
                    le_supported_host=int(self.le_enabled),
                    simultaneous_le_host=int(self.le_simultaneous_enabled),
                ),
                check_result=True,
            )

        if self.le_enabled:
            # Generate a random address if not set.
            if self.static_address == Address.ANY_RANDOM:
                self.static_address = Address.generate_static_address()

            # If LE Privacy is enabled, generate an RPA
            if self.le_privacy_enabled:
                self.random_address = Address.generate_private_address(self.irk)
                logger.info(f'Initial RPA: {self.random_address}')
                if self.le_rpa_timeout > 0:
                    # Start a task to periodically generate a new RPA
                    self.le_rpa_periodic_update_task = asyncio.create_task(
                        self._run_rpa_periodic_update()
                    )
            else:
                # Without privacy, the random address is the static address
                self.random_address = self.static_address

            # Only set the random address in the controller if there is one to set
            if self.random_address != Address.ANY_RANDOM:
                logger.debug(
                    color(
                        f'LE Random Address: {self.random_address}',
                        'yellow',
                    )
                )
                await self.send_command(
                    HCI_LE_Set_Random_Address_Command(
                        random_address=self.random_address
                    ),
                    check_result=True,
                )

            # Load the address resolving list
            if self.keystore:
                await self.refresh_resolving_list()

            # Enable address resolution
            if self.address_resolution_offload:
                await self.send_command(
                    HCI_LE_Set_Address_Resolution_Enable_Command(
                        address_resolution_enable=1
                    ),
                    check_result=True,
                )

            # Set the host feature bit for Connected Isochronous Streams
            if self.cis_enabled:
                await self.send_command(
                    HCI_LE_Set_Host_Feature_Command(
                        bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM,
                        bit_value=1,
                    ),
                    check_result=True,
                )

        if self.classic_enabled:
            # NOTE: the results of these four commands are not checked
            await self.send_command(
                HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
            )
            await self.send_command(
                HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
            )
            await self.send_command(
                HCI_Write_Simple_Pairing_Mode_Command(
                    simple_pairing_mode=int(self.classic_ssp_enabled)
                )
            )
            await self.send_command(
                HCI_Write_Secure_Connections_Host_Support_Command(
                    secure_connections_host_support=int(self.classic_sc_enabled)
                )
            )
            await self.set_connectable(self.connectable)
            await self.set_discoverable(self.discoverable)

            # Use interlaced page/inquiry scans, each only if the matching LMP
            # feature is supported
            if self.classic_interlaced_scan_enabled:
                if self.host.supports_lmp_features(LmpFeatureMask.INTERLACED_PAGE_SCAN):
                    await self.send_command(
                        hci.HCI_Write_Page_Scan_Type_Command(page_scan_type=1),
                        check_result=True,
                    )

                if self.host.supports_lmp_features(
                    LmpFeatureMask.INTERLACED_INQUIRY_SCAN
                ):
                    await self.send_command(
                        hci.HCI_Write_Inquiry_Scan_Type_Command(scan_type=1),
                        check_result=True,
                    )

        # Done
        self.powered_on = True
2313
2314    async def reset(self) -> None:
2315        await self.host.reset()
2316
2317    async def power_off(self) -> None:
2318        if self.powered_on:
2319            if self.le_rpa_periodic_update_task:
2320                self.le_rpa_periodic_update_task.cancel()
2321
2322            await self.host.flush()
2323
2324            self.powered_on = False
2325
2326    async def update_rpa(self) -> bool:
2327        """
2328        Try to update the RPA.
2329
2330        Returns:
2331          True if the RPA was updated, False if it could not be updated.
2332        """
2333
2334        # Check if this is a good time to rotate the address
2335        if self.is_advertising or self.is_scanning or self.is_le_connecting:
2336            logger.debug('skipping RPA update')
2337            return False
2338
2339        random_address = Address.generate_private_address(self.irk)
2340        response = await self.send_command(
2341            HCI_LE_Set_Random_Address_Command(random_address=self.random_address)
2342        )
2343        if response.return_parameters == HCI_SUCCESS:
2344            logger.info(f'new RPA: {random_address}')
2345            self.random_address = random_address
2346            return True
2347        else:
2348            logger.warning(f'failed to set RPA: {response.return_parameters}')
2349            return False
2350
2351    async def _run_rpa_periodic_update(self) -> None:
2352        """Update the RPA periodically"""
2353        while self.le_rpa_timeout != 0:
2354            await asyncio.sleep(self.le_rpa_timeout)
2355            if not self.update_rpa():
2356                logger.debug("periodic RPA update failed")
2357
2358    async def refresh_resolving_list(self) -> None:
2359        assert self.keystore is not None
2360
2361        resolving_keys = await self.keystore.get_resolving_keys()
2362        # Create a host-side address resolver
2363        self.address_resolver = smp.AddressResolver(resolving_keys)
2364
2365        if self.address_resolution_offload or self.address_generation_offload:
2366            await self.send_command(HCI_LE_Clear_Resolving_List_Command())
2367
2368            # Add an empty entry for non-directed address generation.
2369            await self.send_command(
2370                HCI_LE_Add_Device_To_Resolving_List_Command(
2371                    peer_identity_address_type=Address.ANY.address_type,
2372                    peer_identity_address=Address.ANY,
2373                    peer_irk=bytes(16),
2374                    local_irk=self.irk,
2375                )
2376            )
2377
2378            for irk, address in resolving_keys:
2379                await self.send_command(
2380                    HCI_LE_Add_Device_To_Resolving_List_Command(
2381                        peer_identity_address_type=address.address_type,
2382                        peer_identity_address=address,
2383                        peer_irk=irk,
2384                        local_irk=self.irk,
2385                    )
2386                )
2387
2388    def supports_le_features(self, feature: LeFeatureMask) -> bool:
2389        return self.host.supports_le_features(feature)
2390
2391    def supports_le_phy(self, phy: int) -> bool:
2392        if phy == HCI_LE_1M_PHY:
2393            return True
2394
2395        feature_map = {
2396            HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
2397            HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
2398        }
2399        if phy not in feature_map:
2400            raise InvalidArgumentError('invalid PHY')
2401
2402        return self.supports_le_features(feature_map[phy])
2403
2404    @property
2405    def supports_le_extended_advertising(self):
2406        return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING)
2407
2408    @property
2409    def supports_le_periodic_advertising(self):
2410        return self.supports_le_features(LeFeatureMask.LE_PERIODIC_ADVERTISING)
2411
2412    async def start_advertising(
2413        self,
2414        advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
2415        target: Optional[Address] = None,
2416        own_address_type: int = OwnAddressType.RANDOM,
2417        auto_restart: bool = False,
2418        advertising_data: Optional[bytes] = None,
2419        scan_response_data: Optional[bytes] = None,
2420        advertising_interval_min: Optional[int] = None,
2421        advertising_interval_max: Optional[int] = None,
2422    ) -> None:
2423        """Start legacy advertising.
2424
2425        If the controller supports it, extended advertising commands with legacy PDUs
2426        will be used to advertise. If not, legacy advertising commands will be used.
2427
2428        Args:
2429          advertising_type:
2430            Type of advertising events.
2431          target:
2432            Peer address for directed advertising target.
2433            (Ignored if `advertising_type` is not directed)
2434          own_address_type:
2435            Own address type to use in the advertising.
2436          auto_restart:
2437            Whether the advertisement will be restarted after disconnection.
2438          advertising_data:
2439            Raw advertising data. If None, the value of the property
2440            self.advertising_data will be used.
2441          scan_response_data:
2442            Raw scan response. If None, the value of the property
2443            self.scan_response_data will be used.
2444          advertising_interval_min:
2445            Minimum advertising interval, in milliseconds. If None, the value of the
2446            property self.advertising_interval_min will be used.
2447          advertising_interval_max:
2448            Maximum advertising interval, in milliseconds. If None, the value of the
2449            property self.advertising_interval_max will be used.
2450        """
2451        # Update backing properties.
2452        if advertising_data is not None:
2453            self.advertising_data = advertising_data
2454        if scan_response_data is not None:
2455            self.scan_response_data = scan_response_data
2456        if advertising_interval_min is not None:
2457            self.advertising_interval_min = advertising_interval_min
2458        if advertising_interval_max is not None:
2459            self.advertising_interval_max = advertising_interval_max
2460
2461        # Decide what peer address to use
2462        if advertising_type.is_directed:
2463            if target is None:
2464                raise InvalidArgumentError('directed advertising requires a target')
2465            peer_address = target
2466        else:
2467            peer_address = Address.ANY
2468
2469        # If we're already advertising, stop now because we'll be re-creating
2470        # a new advertiser or advertising set.
2471        await self.stop_advertising()
2472        assert self.legacy_advertiser is None
2473        assert self.legacy_advertising_set is None
2474
2475        if self.supports_le_extended_advertising:
2476            # Use extended advertising commands with legacy PDUs.
2477            self.legacy_advertising_set = await self.create_advertising_set(
2478                auto_start=True,
2479                auto_restart=auto_restart,
2480                random_address=self.random_address,
2481                advertising_parameters=AdvertisingParameters(
2482                    advertising_event_properties=(
2483                        AdvertisingEventProperties.from_advertising_type(
2484                            advertising_type
2485                        )
2486                    ),
2487                    primary_advertising_interval_min=self.advertising_interval_min,
2488                    primary_advertising_interval_max=self.advertising_interval_max,
2489                    own_address_type=OwnAddressType(own_address_type),
2490                    peer_address=peer_address,
2491                ),
2492                advertising_data=(
2493                    self.advertising_data if advertising_type.has_data else b''
2494                ),
2495                scan_response_data=(
2496                    self.scan_response_data if advertising_type.is_scannable else b''
2497                ),
2498            )
2499        else:
2500            # Use legacy commands.
2501            self.legacy_advertiser = LegacyAdvertiser(
2502                device=self,
2503                advertising_type=advertising_type,
2504                own_address_type=OwnAddressType(own_address_type),
2505                peer_address=peer_address,
2506                auto_restart=auto_restart,
2507            )
2508
2509            await self.legacy_advertiser.start()
2510
2511    async def stop_advertising(self) -> None:
2512        """Stop legacy advertising."""
2513        # Disable advertising
2514        if self.legacy_advertising_set:
2515            if self.legacy_advertising_set.enabled:
2516                await self.legacy_advertising_set.stop()
2517            await self.legacy_advertising_set.remove()
2518            self.legacy_advertising_set = None
2519        elif self.legacy_advertiser:
2520            await self.legacy_advertiser.stop()
2521            self.legacy_advertiser = None
2522
2523    async def create_advertising_set(
2524        self,
2525        advertising_parameters: Optional[AdvertisingParameters] = None,
2526        random_address: Optional[Address] = None,
2527        advertising_data: bytes = b'',
2528        scan_response_data: bytes = b'',
2529        periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None,
2530        periodic_advertising_data: bytes = b'',
2531        auto_start: bool = True,
2532        auto_restart: bool = False,
2533    ) -> AdvertisingSet:
2534        """
2535        Create an advertising set.
2536
2537        This method allows the creation of advertising sets for controllers that
2538        support extended advertising.
2539
2540        Args:
2541          advertising_parameters:
2542            The parameters to use for this set. If None, default parameters are used.
2543          random_address:
2544            The random address to use (only relevant when the parameters specify that
2545            own_address_type is random).
2546          advertising_data:
2547            Initial value for the set's advertising data.
2548          scan_response_data:
2549            Initial value for the set's scan response data.
2550          periodic_advertising_parameters:
2551            The parameters to use for periodic advertising (if needed).
2552          periodic_advertising_data:
2553            Initial value for the set's periodic advertising data.
2554          auto_start:
2555            True if the set should be automatically started upon creation.
2556          auto_restart:
2557            True if the set should be automatically restated after a disconnection.
2558
2559        Returns:
2560          An AdvertisingSet instance.
2561        """
2562        # Instantiate default values
2563        if advertising_parameters is None:
2564            advertising_parameters = AdvertisingParameters()
2565
2566        if (
2567            not advertising_parameters.advertising_event_properties.is_legacy
2568            and advertising_data
2569            and scan_response_data
2570        ):
2571            raise InvalidArgumentError(
2572                "Extended advertisements can't have both data and scan \
2573                              response data"
2574            )
2575
2576        # Allocate a new handle
2577        try:
2578            advertising_handle = next(
2579                handle
2580                for handle in range(
2581                    DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
2582                    DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
2583                )
2584                if handle not in self.extended_advertising_sets
2585            )
2586        except StopIteration as exc:
2587            raise OutOfResourcesError(
2588                "all valid advertising handles already in use"
2589            ) from exc
2590
2591        # Use the device's random address if a random address is needed but none was
2592        # provided.
2593        if (
2594            advertising_parameters.own_address_type
2595            in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
2596            and random_address is None
2597        ):
2598            random_address = self.random_address
2599
2600        # Create the object that represents the set.
2601        advertising_set = AdvertisingSet(
2602            device=self,
2603            advertising_handle=advertising_handle,
2604            auto_restart=auto_restart,
2605            random_address=random_address,
2606            advertising_parameters=advertising_parameters,
2607            advertising_data=advertising_data,
2608            scan_response_data=scan_response_data,
2609            periodic_advertising_parameters=periodic_advertising_parameters,
2610            periodic_advertising_data=periodic_advertising_data,
2611        )
2612
2613        # Create the set in the controller.
2614        await advertising_set.set_advertising_parameters(advertising_parameters)
2615
2616        # Update the set in the controller.
2617        try:
2618            if random_address:
2619                await advertising_set.set_random_address(random_address)
2620
2621            if advertising_data:
2622                await advertising_set.set_advertising_data(advertising_data)
2623
2624            if scan_response_data:
2625                await advertising_set.set_scan_response_data(scan_response_data)
2626
2627            if periodic_advertising_parameters:
2628                # TODO: call LE Set Periodic Advertising Parameters command
2629                raise NotImplementedError('periodic advertising not yet supported')
2630
2631            if periodic_advertising_data:
2632                # TODO: call LE Set Periodic Advertising Data command
2633                raise NotImplementedError('periodic advertising not yet supported')
2634
2635        except HCI_Error as error:
2636            # Remove the advertising set so that it doesn't stay dangling in the
2637            # controller.
2638            await self.send_command(
2639                HCI_LE_Remove_Advertising_Set_Command(
2640                    advertising_handle=advertising_handle
2641                ),
2642                check_result=False,
2643            )
2644            raise error
2645
2646        # Remember the set.
2647        self.extended_advertising_sets[advertising_handle] = advertising_set
2648
2649        # Try to start the set if requested.
2650        if auto_start:
2651            try:
2652                # pylint: disable=line-too-long
2653                duration = (
2654                    DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION
2655                    if advertising_parameters.advertising_event_properties.is_high_duty_cycle_directed_connectable
2656                    else 0
2657                )
2658                await advertising_set.start(duration=duration)
2659            except Exception as error:
2660                logger.exception(f'failed to start advertising set: {error}')
2661                await advertising_set.remove()
2662                raise
2663
2664        return advertising_set
2665
2666    @property
2667    def is_advertising(self):
2668        if self.legacy_advertiser:
2669            return True
2670
2671        return any(
2672            advertising_set.enabled
2673            for advertising_set in self.extended_advertising_sets.values()
2674        )
2675
2676    async def start_scanning(
2677        self,
2678        legacy: bool = False,
2679        active: bool = True,
2680        scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
2681        scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW,  # Scan window in ms
2682        own_address_type: int = OwnAddressType.RANDOM,
2683        filter_duplicates: bool = False,
2684        scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
2685    ) -> None:
2686        # Check that the arguments are legal
2687        if scan_interval < scan_window:
2688            raise InvalidArgumentError('scan_interval must be >= scan_window')
2689        if (
2690            scan_interval < DEVICE_MIN_SCAN_INTERVAL
2691            or scan_interval > DEVICE_MAX_SCAN_INTERVAL
2692        ):
2693            raise InvalidArgumentError('scan_interval out of range')
2694        if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
2695            raise InvalidArgumentError('scan_interval out of range')
2696
2697        # Reset the accumulators
2698        self.advertisement_accumulators = {}
2699
2700        # Enable scanning
2701        if not legacy and self.supports_le_extended_advertising:
2702            # Set the scanning parameters
2703            scan_type = (
2704                HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
2705                if active
2706                else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
2707            )
2708            scanning_filter_policy = (
2709                HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
2710            )  # TODO: support other types
2711
2712            scanning_phy_count = 0
2713            scanning_phys_bits = 0
2714            if HCI_LE_1M_PHY in scanning_phys:
2715                scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
2716                scanning_phy_count += 1
2717            if HCI_LE_CODED_PHY in scanning_phys:
2718                if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
2719                    scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
2720                    scanning_phy_count += 1
2721
2722            if scanning_phy_count == 0:
2723                raise InvalidArgumentError('at least one scanning PHY must be enabled')
2724
2725            await self.send_command(
2726                HCI_LE_Set_Extended_Scan_Parameters_Command(
2727                    own_address_type=own_address_type,
2728                    scanning_filter_policy=scanning_filter_policy,
2729                    scanning_phys=scanning_phys_bits,
2730                    scan_types=[scan_type] * scanning_phy_count,
2731                    scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
2732                    scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
2733                ),
2734                check_result=True,
2735            )
2736
2737            # Enable scanning
2738            await self.send_command(
2739                HCI_LE_Set_Extended_Scan_Enable_Command(
2740                    enable=1,
2741                    filter_duplicates=1 if filter_duplicates else 0,
2742                    duration=0,  # TODO allow other values
2743                    period=0,  # TODO allow other values
2744                ),
2745                check_result=True,
2746            )
2747        else:
2748            # Set the scanning parameters
2749            scan_type = (
2750                HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING
2751                if active
2752                else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
2753            )
2754            await self.send_command(
2755                # pylint: disable=line-too-long
2756                HCI_LE_Set_Scan_Parameters_Command(
2757                    le_scan_type=scan_type,
2758                    le_scan_interval=int(scan_window / 0.625),
2759                    le_scan_window=int(scan_window / 0.625),
2760                    own_address_type=own_address_type,
2761                    scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
2762                ),
2763                check_result=True,
2764            )
2765
2766            # Enable scanning
2767            await self.send_command(
2768                HCI_LE_Set_Scan_Enable_Command(
2769                    le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
2770                ),
2771                check_result=True,
2772            )
2773
2774        self.scanning_is_passive = not active
2775        self.scanning = True
2776
2777    async def stop_scanning(self, legacy: bool = False) -> None:
2778        # Disable scanning
2779        if not legacy and self.supports_le_extended_advertising:
2780            await self.send_command(
2781                HCI_LE_Set_Extended_Scan_Enable_Command(
2782                    enable=0, filter_duplicates=0, duration=0, period=0
2783                ),
2784                check_result=True,
2785            )
2786        else:
2787            await self.send_command(
2788                HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
2789                check_result=True,
2790            )
2791
2792        self.scanning = False
2793
2794    @property
2795    def is_scanning(self):
2796        return self.scanning
2797
2798    @host_event_handler
2799    def on_advertising_report(self, report):
2800        if not (accumulator := self.advertisement_accumulators.get(report.address)):
2801            accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
2802            self.advertisement_accumulators[report.address] = accumulator
2803        if advertisement := accumulator.update(report):
2804            self.emit('advertisement', advertisement)
2805
2806    async def create_periodic_advertising_sync(
2807        self,
2808        advertiser_address: Address,
2809        sid: int,
2810        skip: int = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP,
2811        sync_timeout: float = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT,
2812        filter_duplicates: bool = False,
2813    ) -> PeriodicAdvertisingSync:
2814        # Check that the controller supports the feature.
2815        if not self.supports_le_periodic_advertising:
2816            raise NotSupportedError()
2817
2818        # Check that there isn't already an equivalent entry
2819        if any(
2820            sync.advertiser_address == advertiser_address and sync.sid == sid
2821            for sync in self.periodic_advertising_syncs
2822        ):
2823            raise ValueError("equivalent entry already created")
2824
2825        # Create a new entry
2826        sync = PeriodicAdvertisingSync(
2827            device=self,
2828            advertiser_address=advertiser_address,
2829            sid=sid,
2830            skip=skip,
2831            sync_timeout=sync_timeout,
2832            filter_duplicates=filter_duplicates,
2833        )
2834
2835        self.periodic_advertising_syncs.append(sync)
2836
2837        # Check if any sync should be started
2838        await self._update_periodic_advertising_syncs()
2839
2840        return sync
2841
2842    async def _update_periodic_advertising_syncs(self) -> None:
2843        # Check if there's already a pending sync
2844        if any(
2845            sync.state == PeriodicAdvertisingSync.State.PENDING
2846            for sync in self.periodic_advertising_syncs
2847        ):
2848            logger.debug("at least one sync pending, nothing to update yet")
2849            return
2850
2851        # Start the next sync that's waiting to be started
2852        if ready := next(
2853            (
2854                sync
2855                for sync in self.periodic_advertising_syncs
2856                if sync.state == PeriodicAdvertisingSync.State.INIT
2857            ),
2858            None,
2859        ):
2860            await ready.establish()
2861            return
2862
2863    @host_event_handler
2864    def on_periodic_advertising_sync_establishment(
2865        self,
2866        status: int,
2867        sync_handle: int,
2868        advertising_sid: int,
2869        advertiser_address: Address,
2870        advertiser_phy: int,
2871        periodic_advertising_interval: int,
2872        advertiser_clock_accuracy: int,
2873    ) -> None:
2874        for periodic_advertising_sync in self.periodic_advertising_syncs:
2875            if (
2876                periodic_advertising_sync.advertiser_address == advertiser_address
2877                and periodic_advertising_sync.sid == advertising_sid
2878            ):
2879                periodic_advertising_sync.on_establishment(
2880                    status,
2881                    sync_handle,
2882                    advertiser_phy,
2883                    periodic_advertising_interval,
2884                    advertiser_clock_accuracy,
2885                )
2886
2887                AsyncRunner.spawn(self._update_periodic_advertising_syncs())
2888
2889                return
2890
2891        logger.warning(
2892            "periodic advertising sync establishment for unknown address/sid"
2893        )
2894
2895    @host_event_handler
2896    @with_periodic_advertising_sync_from_handle
2897    def on_periodic_advertising_sync_loss(
2898        self, periodic_advertising_sync: PeriodicAdvertisingSync
2899    ):
2900        periodic_advertising_sync.on_loss()
2901
2902    @host_event_handler
2903    @with_periodic_advertising_sync_from_handle
2904    def on_periodic_advertising_report(
2905        self,
2906        periodic_advertising_sync: PeriodicAdvertisingSync,
2907        report: HCI_LE_Periodic_Advertising_Report_Event,
2908    ):
2909        periodic_advertising_sync.on_periodic_advertising_report(report)
2910
2911    @host_event_handler
2912    @with_periodic_advertising_sync_from_handle
2913    def on_biginfo_advertising_report(
2914        self,
2915        periodic_advertising_sync: PeriodicAdvertisingSync,
2916        report: HCI_LE_BIGInfo_Advertising_Report_Event,
2917    ):
2918        periodic_advertising_sync.on_biginfo_advertising_report(report)
2919
2920    async def start_discovery(self, auto_restart: bool = True) -> None:
2921        await self.send_command(
2922            HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
2923            check_result=True,
2924        )
2925
2926        response = await self.send_command(
2927            HCI_Inquiry_Command(
2928                lap=HCI_GENERAL_INQUIRY_LAP,
2929                inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
2930                num_responses=0,  # Unlimited number of responses.
2931            )
2932        )
2933        if response.status != HCI_Command_Status_Event.PENDING:
2934            self.discovering = False
2935            raise HCI_StatusError(response)
2936
2937        self.auto_restart_inquiry = auto_restart
2938        self.discovering = True
2939
2940    async def stop_discovery(self) -> None:
2941        if self.discovering:
2942            await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
2943        self.auto_restart_inquiry = True
2944        self.discovering = False
2945
2946    @host_event_handler
2947    def on_inquiry_result(self, address, class_of_device, data, rssi):
2948        self.emit(
2949            'inquiry_result',
2950            address,
2951            class_of_device,
2952            AdvertisingData.from_bytes(data),
2953            rssi,
2954        )
2955
2956    async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
2957        if inquiry_scan_enabled and page_scan_enabled:
2958            scan_enable = 0x03
2959        elif page_scan_enabled:
2960            scan_enable = 0x02
2961        elif inquiry_scan_enabled:
2962            scan_enable = 0x01
2963        else:
2964            scan_enable = 0x00
2965
2966        return await self.send_command(
2967            HCI_Write_Scan_Enable_Command(scan_enable=scan_enable)
2968        )
2969
2970    async def set_discoverable(self, discoverable: bool = True) -> None:
2971        self.discoverable = discoverable
2972        if self.classic_enabled:
2973            # Synthesize an inquiry response if none is set already
2974            if self.inquiry_response is None:
2975                self.inquiry_response = bytes(
2976                    AdvertisingData(
2977                        [
2978                            (
2979                                AdvertisingData.COMPLETE_LOCAL_NAME,
2980                                bytes(self.name, 'utf-8'),
2981                            )
2982                        ]
2983                    )
2984                )
2985
2986            # Update the controller
2987            await self.send_command(
2988                HCI_Write_Extended_Inquiry_Response_Command(
2989                    fec_required=0, extended_inquiry_response=self.inquiry_response
2990                ),
2991                check_result=True,
2992            )
2993            await self.set_scan_enable(
2994                inquiry_scan_enabled=self.discoverable,
2995                page_scan_enabled=self.connectable,
2996            )
2997
2998    async def set_connectable(self, connectable: bool = True) -> None:
2999        self.connectable = connectable
3000        if self.classic_enabled:
3001            await self.set_scan_enable(
3002                inquiry_scan_enabled=self.discoverable,
3003                page_scan_enabled=self.connectable,
3004            )
3005
3006    async def connect(
3007        self,
3008        peer_address: Union[Address, str],
3009        transport: int = BT_LE_TRANSPORT,
3010        connection_parameters_preferences: Optional[
3011            Dict[int, ConnectionParametersPreferences]
3012        ] = None,
3013        own_address_type: int = OwnAddressType.RANDOM,
3014        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
3015        always_resolve: bool = False,
3016    ) -> Connection:
3017        '''
3018        Request a connection to a peer.
3019
3020        When the transport is BLE, this method cannot be called if there is already a
3021        pending connection.
3022
3023        Args:
3024          peer_address:
3025            Address or name of the device to connect to.
3026            If a string is passed:
3027              If the string is an address followed by a `@` suffix, the `always_resolve`
3028              argument is implicitly set to True, so the connection is made to the
3029              address after resolution.
3030              If the string is any other address, the connection is made to that
3031              address (with or without address resolution, depending on the
3032              `always_resolve` argument).
3033              For any other string, a scan for devices using that string as their name
3034              is initiated, and a connection to the first matching device's address
3035              is made. In that case, `always_resolve` is ignored.
3036
3037          connection_parameters_preferences:
3038            (BLE only, ignored for BR/EDR)
3039            * None: use the 1M PHY with default parameters
3040            * map: each entry has a PHY as key and a ConnectionParametersPreferences
3041              object as value
3042
3043          own_address_type:
3044            (BLE only, ignored for BR/EDR)
3045            OwnAddressType.RANDOM to use this device's random address, or
3046            OwnAddressType.PUBLIC to use this device's public address.
3047
3048          timeout:
3049            Maximum time to wait for a connection to be established, in seconds.
3050            Pass None for an unlimited time.
3051
3052          always_resolve:
3053            (BLE only, ignored for BR/EDR)
3054            If True, always initiate a scan, resolving addresses, and connect to the
3055            address that resolves to `peer_address`.
3056        '''
3057
3058        # Check parameters
3059        if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
3060            raise InvalidArgumentError('invalid transport')
3061
3062        # Adjust the transport automatically if we need to
3063        if transport == BT_LE_TRANSPORT and not self.le_enabled:
3064            transport = BT_BR_EDR_TRANSPORT
3065        elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
3066            transport = BT_LE_TRANSPORT
3067
3068        # Check that there isn't already a pending connection
3069        if transport == BT_LE_TRANSPORT and self.is_le_connecting:
3070            raise InvalidStateError('connection already pending')
3071
3072        if isinstance(peer_address, str):
3073            try:
3074                if transport == BT_LE_TRANSPORT and peer_address.endswith('@'):
3075                    peer_address = Address.from_string_for_transport(
3076                        peer_address[:-1], transport
3077                    )
3078                    always_resolve = True
3079                    logger.debug('forcing address resolution')
3080                else:
3081                    peer_address = Address.from_string_for_transport(
3082                        peer_address, transport
3083                    )
3084            except (InvalidArgumentError, ValueError):
3085                # If the address is not parsable, assume it is a name instead
3086                always_resolve = False
3087                logger.debug('looking for peer by name')
3088                peer_address = await self.find_peer_by_name(
3089                    peer_address, transport
3090                )  # TODO: timeout
3091        else:
3092            # All BR/EDR addresses should be public addresses
3093            if (
3094                transport == BT_BR_EDR_TRANSPORT
3095                and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS
3096            ):
3097                raise InvalidArgumentError('BR/EDR addresses must be PUBLIC')
3098
3099        assert isinstance(peer_address, Address)
3100
3101        if transport == BT_LE_TRANSPORT and always_resolve:
3102            logger.debug('resolving address')
3103            peer_address = await self.find_peer_by_identity_address(
3104                peer_address
3105            )  # TODO: timeout
3106
3107        def on_connection(connection):
3108            if transport == BT_LE_TRANSPORT or (
3109                # match BR/EDR connection event against peer address
3110                connection.transport == transport
3111                and connection.peer_address == peer_address
3112            ):
3113                pending_connection.set_result(connection)
3114
3115        def on_connection_failure(error):
3116            if transport == BT_LE_TRANSPORT or (
3117                # match BR/EDR connection failure event against peer address
3118                error.transport == transport
3119                and error.peer_address == peer_address
3120            ):
3121                pending_connection.set_exception(error)
3122
3123        # Create a future so that we can wait for the connection's result
3124        pending_connection = asyncio.get_running_loop().create_future()
3125        self.on('connection', on_connection)
3126        self.on('connection_failure', on_connection_failure)
3127
3128        try:
3129            # Tell the controller to connect
3130            if transport == BT_LE_TRANSPORT:
3131                if connection_parameters_preferences is None:
3132                    if connection_parameters_preferences is None:
3133                        connection_parameters_preferences = {
3134                            HCI_LE_1M_PHY: ConnectionParametersPreferences.default
3135                        }
3136
3137                self.connect_own_address_type = own_address_type
3138
3139                if self.host.supports_command(
3140                    HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND
3141                ):
3142                    # Only keep supported PHYs
3143                    phys = sorted(
3144                        list(
3145                            set(
3146                                filter(
3147                                    self.supports_le_phy,
3148                                    connection_parameters_preferences.keys(),
3149                                )
3150                            )
3151                        )
3152                    )
3153                    if not phys:
3154                        raise InvalidArgumentError('at least one supported PHY needed')
3155
3156                    phy_count = len(phys)
3157                    initiating_phys = phy_list_to_bits(phys)
3158
3159                    connection_interval_mins = [
3160                        int(
3161                            connection_parameters_preferences[
3162                                phy
3163                            ].connection_interval_min
3164                            / 1.25
3165                        )
3166                        for phy in phys
3167                    ]
3168                    connection_interval_maxs = [
3169                        int(
3170                            connection_parameters_preferences[
3171                                phy
3172                            ].connection_interval_max
3173                            / 1.25
3174                        )
3175                        for phy in phys
3176                    ]
3177                    max_latencies = [
3178                        connection_parameters_preferences[phy].max_latency
3179                        for phy in phys
3180                    ]
3181                    supervision_timeouts = [
3182                        int(
3183                            connection_parameters_preferences[phy].supervision_timeout
3184                            / 10
3185                        )
3186                        for phy in phys
3187                    ]
3188                    min_ce_lengths = [
3189                        int(
3190                            connection_parameters_preferences[phy].min_ce_length / 0.625
3191                        )
3192                        for phy in phys
3193                    ]
3194                    max_ce_lengths = [
3195                        int(
3196                            connection_parameters_preferences[phy].max_ce_length / 0.625
3197                        )
3198                        for phy in phys
3199                    ]
3200
3201                    result = await self.send_command(
3202                        HCI_LE_Extended_Create_Connection_Command(
3203                            initiator_filter_policy=0,
3204                            own_address_type=own_address_type,
3205                            peer_address_type=peer_address.address_type,
3206                            peer_address=peer_address,
3207                            initiating_phys=initiating_phys,
3208                            scan_intervals=(
3209                                int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
3210                            )
3211                            * phy_count,
3212                            scan_windows=(
3213                                int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
3214                            )
3215                            * phy_count,
3216                            connection_interval_mins=connection_interval_mins,
3217                            connection_interval_maxs=connection_interval_maxs,
3218                            max_latencies=max_latencies,
3219                            supervision_timeouts=supervision_timeouts,
3220                            min_ce_lengths=min_ce_lengths,
3221                            max_ce_lengths=max_ce_lengths,
3222                        )
3223                    )
3224                else:
3225                    if HCI_LE_1M_PHY not in connection_parameters_preferences:
3226                        raise InvalidArgumentError('1M PHY preferences required')
3227
3228                    prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
3229                    result = await self.send_command(
3230                        HCI_LE_Create_Connection_Command(
3231                            le_scan_interval=int(
3232                                DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625
3233                            ),
3234                            le_scan_window=int(
3235                                DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625
3236                            ),
3237                            initiator_filter_policy=0,
3238                            peer_address_type=peer_address.address_type,
3239                            peer_address=peer_address,
3240                            own_address_type=own_address_type,
3241                            connection_interval_min=int(
3242                                prefs.connection_interval_min / 1.25
3243                            ),
3244                            connection_interval_max=int(
3245                                prefs.connection_interval_max / 1.25
3246                            ),
3247                            max_latency=prefs.max_latency,
3248                            supervision_timeout=int(prefs.supervision_timeout / 10),
3249                            min_ce_length=int(prefs.min_ce_length / 0.625),
3250                            max_ce_length=int(prefs.max_ce_length / 0.625),
3251                        )
3252                    )
3253            else:
3254                # Save pending connection
3255                self.pending_connections[peer_address] = Connection.incomplete(
3256                    self, peer_address, BT_CENTRAL_ROLE
3257                )
3258
3259                # TODO: allow passing other settings
3260                result = await self.send_command(
3261                    HCI_Create_Connection_Command(
3262                        bd_addr=peer_address,
3263                        packet_type=0xCC18,  # FIXME: change
3264                        page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE,
3265                        clock_offset=0x0000,
3266                        allow_role_switch=0x01,
3267                        reserved=0,
3268                    )
3269                )
3270
3271            if result.status != HCI_Command_Status_Event.PENDING:
3272                raise HCI_StatusError(result)
3273
3274            # Wait for the connection process to complete
3275            if transport == BT_LE_TRANSPORT:
3276                self.le_connecting = True
3277
3278            if timeout is None:
3279                return await self.abort_on('flush', pending_connection)
3280
3281            try:
3282                return await asyncio.wait_for(
3283                    asyncio.shield(pending_connection), timeout
3284                )
3285            except asyncio.TimeoutError:
3286                if transport == BT_LE_TRANSPORT:
3287                    await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
3288                else:
3289                    await self.send_command(
3290                        HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
3291                    )
3292
3293                try:
3294                    return await self.abort_on('flush', pending_connection)
3295                except core.ConnectionError as error:
3296                    raise core.TimeoutError() from error
3297        finally:
3298            self.remove_listener('connection', on_connection)
3299            self.remove_listener('connection_failure', on_connection_failure)
3300            if transport == BT_LE_TRANSPORT:
3301                self.le_connecting = False
3302                self.connect_own_address_type = None
3303            else:
3304                self.pending_connections.pop(peer_address, None)
3305
3306    async def accept(
3307        self,
3308        peer_address: Union[Address, str] = Address.ANY,
3309        role: int = BT_PERIPHERAL_ROLE,
3310        timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
3311    ) -> Connection:
3312        '''
3313        Wait and accept any incoming connection or a connection from `peer_address` when
3314        set.
3315
3316        Notes:
3317          * A `connect` to the same peer will not complete this call.
3318          * The `timeout` parameter is only handled while waiting for the connection
3319            request, once received and accepted, the controller shall issue a connection
3320            complete event.
3321        '''
3322
3323        if isinstance(peer_address, str):
3324            try:
3325                peer_address = Address(peer_address)
3326            except InvalidArgumentError:
3327                # If the address is not parsable, assume it is a name instead
3328                logger.debug('looking for peer by name')
3329                peer_address = await self.find_peer_by_name(
3330                    peer_address, BT_BR_EDR_TRANSPORT
3331                )  # TODO: timeout
3332
3333        assert isinstance(peer_address, Address)
3334
3335        if peer_address == Address.NIL:
3336            raise InvalidArgumentError('accept on nil address')
3337
3338        # Create a future so that we can wait for the request
3339        pending_request_fut = asyncio.get_running_loop().create_future()
3340
3341        if peer_address == Address.ANY:
3342            self.classic_pending_accepts[Address.ANY].append(pending_request_fut)
3343        elif peer_address in self.classic_pending_accepts:
3344            raise InvalidStateError('accept connection already pending')
3345        else:
3346            self.classic_pending_accepts[peer_address] = [pending_request_fut]
3347
3348        try:
3349            # Wait for a request or a completed connection
3350            pending_request = self.abort_on('flush', pending_request_fut)
3351            result = await (
3352                asyncio.wait_for(pending_request, timeout)
3353                if timeout
3354                else pending_request
3355            )
3356        except Exception:
3357            # Remove future from device context
3358            if peer_address == Address.ANY:
3359                self.classic_pending_accepts[Address.ANY].remove(pending_request_fut)
3360            else:
3361                self.classic_pending_accepts.pop(peer_address)
3362            raise
3363
3364        # Result may already be a completed connection,
3365        # see `on_connection` for details
3366        if isinstance(result, Connection):
3367            return result
3368
3369        # Otherwise, result came from `on_connection_request`
3370        peer_address, _class_of_device, _link_type = result
3371        assert isinstance(peer_address, Address)
3372
3373        # Create a future so that we can wait for the connection's result
3374        pending_connection = asyncio.get_running_loop().create_future()
3375
3376        def on_connection(connection):
3377            if (
3378                connection.transport == BT_BR_EDR_TRANSPORT
3379                and connection.peer_address == peer_address
3380            ):
3381                pending_connection.set_result(connection)
3382
3383        def on_connection_failure(error):
3384            if (
3385                error.transport == BT_BR_EDR_TRANSPORT
3386                and error.peer_address == peer_address
3387            ):
3388                pending_connection.set_exception(error)
3389
3390        self.on('connection', on_connection)
3391        self.on('connection_failure', on_connection_failure)
3392
3393        # Save pending connection, with the Peripheral role.
3394        # Even if we requested a role switch in the HCI_Accept_Connection_Request
3395        # command, this connection is still considered Peripheral until an eventual
3396        # role change event.
3397        self.pending_connections[peer_address] = Connection.incomplete(
3398            self, peer_address, BT_PERIPHERAL_ROLE
3399        )
3400
3401        try:
3402            # Accept connection request
3403            await self.send_command(
3404                HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
3405            )
3406
3407            # Wait for connection complete
3408            return await self.abort_on('flush', pending_connection)
3409
3410        finally:
3411            self.remove_listener('connection', on_connection)
3412            self.remove_listener('connection_failure', on_connection_failure)
3413            self.pending_connections.pop(peer_address, None)
3414
3415    @asynccontextmanager
3416    async def connect_as_gatt(self, peer_address):
3417        async with AsyncExitStack() as stack:
3418            connection = await stack.enter_async_context(
3419                await self.connect(peer_address)
3420            )
3421            peer = await stack.enter_async_context(Peer(connection))
3422
3423            yield peer
3424
3425    @property
3426    def is_le_connecting(self):
3427        return self.le_connecting
3428
3429    @property
3430    def is_disconnecting(self):
3431        return self.disconnecting
3432
3433    async def cancel_connection(self, peer_address=None):
3434        # Low-energy: cancel ongoing connection
3435        if peer_address is None:
3436            if not self.is_le_connecting:
3437                return
3438            await self.send_command(
3439                HCI_LE_Create_Connection_Cancel_Command(), check_result=True
3440            )
3441
3442        # BR/EDR: try to cancel to ongoing connection
3443        # NOTE: This API does not prevent from trying to cancel a connection which is
3444        # not currently being created
3445        else:
3446            if isinstance(peer_address, str):
3447                try:
3448                    peer_address = Address(peer_address)
3449                except InvalidArgumentError:
3450                    # If the address is not parsable, assume it is a name instead
3451                    logger.debug('looking for peer by name')
3452                    peer_address = await self.find_peer_by_name(
3453                        peer_address, BT_BR_EDR_TRANSPORT
3454                    )  # TODO: timeout
3455
3456            await self.send_command(
3457                HCI_Create_Connection_Cancel_Command(bd_addr=peer_address),
3458                check_result=True,
3459            )
3460
3461    async def disconnect(
3462        self, connection: Union[Connection, ScoLink, CisLink], reason: int
3463    ) -> None:
3464        # Create a future so that we can wait for the disconnection's result
3465        pending_disconnection = asyncio.get_running_loop().create_future()
3466        connection.on('disconnection', pending_disconnection.set_result)
3467        connection.on('disconnection_failure', pending_disconnection.set_exception)
3468
3469        # Request a disconnection
3470        result = await self.send_command(
3471            HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
3472        )
3473
3474        try:
3475            if result.status != HCI_Command_Status_Event.PENDING:
3476                raise HCI_StatusError(result)
3477
3478            # Wait for the disconnection process to complete
3479            self.disconnecting = True
3480            return await self.abort_on('flush', pending_disconnection)
3481        finally:
3482            connection.remove_listener(
3483                'disconnection', pending_disconnection.set_result
3484            )
3485            connection.remove_listener(
3486                'disconnection_failure', pending_disconnection.set_exception
3487            )
3488            self.disconnecting = False
3489
3490    async def set_data_length(self, connection, tx_octets, tx_time) -> None:
3491        if tx_octets < 0x001B or tx_octets > 0x00FB:
3492            raise InvalidArgumentError('tx_octets must be between 0x001B and 0x00FB')
3493
3494        if tx_time < 0x0148 or tx_time > 0x4290:
3495            raise InvalidArgumentError('tx_time must be between 0x0148 and 0x4290')
3496
3497        return await self.send_command(
3498            HCI_LE_Set_Data_Length_Command(
3499                connection_handle=connection.handle,
3500                tx_octets=tx_octets,
3501                tx_time=tx_time,
3502            ),
3503            check_result=True,
3504        )
3505
3506    async def update_connection_parameters(
3507        self,
3508        connection,
3509        connection_interval_min,
3510        connection_interval_max,
3511        max_latency,
3512        supervision_timeout,
3513        min_ce_length=0,
3514        max_ce_length=0,
3515        use_l2cap=False,
3516    ) -> None:
3517        '''
3518        NOTE: the name of the parameters may look odd, but it just follows the names
3519        used in the Bluetooth spec.
3520        '''
3521
3522        if use_l2cap:
3523            if connection.role != BT_PERIPHERAL_ROLE:
3524                raise InvalidStateError(
3525                    'only peripheral can update connection parameters with l2cap'
3526                )
3527            l2cap_result = (
3528                await self.l2cap_channel_manager.update_connection_parameters(
3529                    connection,
3530                    connection_interval_min,
3531                    connection_interval_max,
3532                    max_latency,
3533                    supervision_timeout,
3534                )
3535            )
3536            if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT:
3537                raise ConnectionParameterUpdateError(l2cap_result)
3538
3539        result = await self.send_command(
3540            HCI_LE_Connection_Update_Command(
3541                connection_handle=connection.handle,
3542                connection_interval_min=connection_interval_min,
3543                connection_interval_max=connection_interval_max,
3544                max_latency=max_latency,
3545                supervision_timeout=supervision_timeout,
3546                min_ce_length=min_ce_length,
3547                max_ce_length=max_ce_length,
3548            )
3549        )
3550        if result.status != HCI_Command_Status_Event.PENDING:
3551            raise HCI_StatusError(result)
3552
3553    async def get_connection_rssi(self, connection):
3554        result = await self.send_command(
3555            HCI_Read_RSSI_Command(handle=connection.handle), check_result=True
3556        )
3557        return result.return_parameters.rssi
3558
3559    async def get_connection_phy(self, connection):
3560        result = await self.send_command(
3561            HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
3562            check_result=True,
3563        )
3564        return (result.return_parameters.tx_phy, result.return_parameters.rx_phy)
3565
3566    async def set_connection_phy(
3567        self, connection, tx_phys=None, rx_phys=None, phy_options=None
3568    ):
3569        if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND):
3570            logger.warning('ignoring request, command not supported')
3571            return
3572
3573        all_phys_bits = (1 if tx_phys is None else 0) | (
3574            (1 if rx_phys is None else 0) << 1
3575        )
3576
3577        result = await self.send_command(
3578            HCI_LE_Set_PHY_Command(
3579                connection_handle=connection.handle,
3580                all_phys=all_phys_bits,
3581                tx_phys=phy_list_to_bits(tx_phys),
3582                rx_phys=phy_list_to_bits(rx_phys),
3583                phy_options=0 if phy_options is None else int(phy_options),
3584            )
3585        )
3586
3587        if result.status != HCI_COMMAND_STATUS_PENDING:
3588            logger.warning(
3589                'HCI_LE_Set_PHY_Command failed: '
3590                f'{HCI_Constant.error_name(result.status)}'
3591            )
3592            raise HCI_StatusError(result)
3593
3594    async def set_default_phy(self, tx_phys=None, rx_phys=None):
3595        all_phys_bits = (1 if tx_phys is None else 0) | (
3596            (1 if rx_phys is None else 0) << 1
3597        )
3598
3599        return await self.send_command(
3600            HCI_LE_Set_Default_PHY_Command(
3601                all_phys=all_phys_bits,
3602                tx_phys=phy_list_to_bits(tx_phys),
3603                rx_phys=phy_list_to_bits(rx_phys),
3604            ),
3605            check_result=True,
3606        )
3607
3608    async def transfer_periodic_sync(
3609        self, connection: Connection, sync_handle: int, service_data: int = 0
3610    ) -> None:
3611        return await self.send_command(
3612            HCI_LE_Periodic_Advertising_Sync_Transfer_Command(
3613                connection_handle=connection.handle,
3614                service_data=service_data,
3615                sync_handle=sync_handle,
3616            ),
3617            check_result=True,
3618        )
3619
3620    async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT):
3621        """
3622        Scan for a peer with a given name and return its address.
3623        """
3624
3625        # Create a future to wait for an address to be found
3626        peer_address = asyncio.get_running_loop().create_future()
3627
3628        def on_peer_found(address, ad_data):
3629            local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
3630            if local_name is None:
3631                local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
3632            if local_name is not None:
3633                if local_name.decode('utf-8') == name:
3634                    peer_address.set_result(address)
3635
3636        listener = None
3637        was_scanning = self.scanning
3638        was_discovering = self.discovering
3639        try:
3640            if transport == BT_LE_TRANSPORT:
3641                event_name = 'advertisement'
3642                listener = self.on(
3643                    event_name,
3644                    lambda advertisement: on_peer_found(
3645                        advertisement.address, advertisement.data
3646                    ),
3647                )
3648
3649                if not self.scanning:
3650                    await self.start_scanning(filter_duplicates=True)
3651
3652            elif transport == BT_BR_EDR_TRANSPORT:
3653                event_name = 'inquiry_result'
3654                listener = self.on(
3655                    event_name,
3656                    lambda address, class_of_device, eir_data, rssi: on_peer_found(
3657                        address, eir_data
3658                    ),
3659                )
3660
3661                if not self.discovering:
3662                    await self.start_discovery()
3663            else:
3664                return None
3665
3666            return await self.abort_on('flush', peer_address)
3667        finally:
3668            if listener is not None:
3669                self.remove_listener(event_name, listener)
3670
3671            if transport == BT_LE_TRANSPORT and not was_scanning:
3672                await self.stop_scanning()
3673            elif transport == BT_BR_EDR_TRANSPORT and not was_discovering:
3674                await self.stop_discovery()
3675
3676    async def find_peer_by_identity_address(self, identity_address: Address) -> Address:
3677        """
3678        Scan for a peer with a resolvable address that can be resolved to a given
3679        identity address.
3680        """
3681
3682        # Create a future to wait for an address to be found
3683        peer_address = asyncio.get_running_loop().create_future()
3684
3685        def on_peer_found(address, _):
3686            if address == identity_address:
3687                if not peer_address.done():
3688                    logger.debug(f'*** Matching public address found for {address}')
3689                    peer_address.set_result(address)
3690                return
3691
3692            if address.is_resolvable:
3693                resolved_address = self.address_resolver.resolve(address)
3694                if resolved_address == identity_address:
3695                    if not peer_address.done():
3696                        logger.debug(f'*** Matching identity found for {address}')
3697                        peer_address.set_result(address)
3698                return
3699
3700        was_scanning = self.scanning
3701        event_name = 'advertisement'
3702        listener = None
3703        try:
3704            listener = self.on(
3705                event_name,
3706                lambda advertisement: on_peer_found(
3707                    advertisement.address, advertisement.data
3708                ),
3709            )
3710
3711            if not self.scanning:
3712                await self.start_scanning(filter_duplicates=True)
3713
3714            return await self.abort_on('flush', peer_address)
3715        finally:
3716            if listener is not None:
3717                self.remove_listener(event_name, listener)
3718
3719            if not was_scanning:
3720                await self.stop_scanning()
3721
3722    @property
3723    def pairing_config_factory(self) -> Callable[[Connection], pairing.PairingConfig]:
3724        return self.smp_manager.pairing_config_factory
3725
3726    @pairing_config_factory.setter
3727    def pairing_config_factory(
3728        self, pairing_config_factory: Callable[[Connection], pairing.PairingConfig]
3729    ) -> None:
3730        self.smp_manager.pairing_config_factory = pairing_config_factory
3731
3732    @property
3733    def smp_session_proxy(self) -> Type[smp.Session]:
3734        return self.smp_manager.session_proxy
3735
3736    @smp_session_proxy.setter
3737    def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None:
3738        self.smp_manager.session_proxy = session_proxy
3739
3740    async def pair(self, connection):
3741        return await self.smp_manager.pair(connection)
3742
3743    def request_pairing(self, connection):
3744        return self.smp_manager.request_pairing(connection)
3745
3746    async def get_long_term_key(
3747        self, connection_handle: int, rand: bytes, ediv: int
3748    ) -> Optional[bytes]:
3749        if (connection := self.lookup_connection(connection_handle)) is None:
3750            return None
3751
3752        # Start by looking for the key in an SMP session
3753        ltk = self.smp_manager.get_long_term_key(connection, rand, ediv)
3754        if ltk is not None:
3755            return ltk
3756
3757        # Then look for the key in the keystore
3758        if self.keystore is not None:
3759            keys = await self.keystore.get(str(connection.peer_address))
3760            if keys is not None:
3761                logger.debug('found keys in the key store')
3762                if keys.ltk:
3763                    return keys.ltk.value
3764
3765                if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
3766                    return keys.ltk_central.value
3767
3768                if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
3769                    return keys.ltk_peripheral.value
3770        return None
3771
3772    async def get_link_key(self, address: Address) -> Optional[bytes]:
3773        if self.keystore is None:
3774            return None
3775
3776        # Look for the key in the keystore
3777        keys = await self.keystore.get(str(address))
3778        if keys is None:
3779            logger.debug(f'no keys found for {address}')
3780            return None
3781
3782        logger.debug('found keys in the key store')
3783        if keys.link_key is None:
3784            logger.warning('no link key')
3785            return None
3786
3787        return keys.link_key.value
3788
3789    # [Classic only]
3790    async def authenticate(self, connection):
3791        # Set up event handlers
3792        pending_authentication = asyncio.get_running_loop().create_future()
3793
3794        def on_authentication():
3795            pending_authentication.set_result(None)
3796
3797        def on_authentication_failure(error_code):
3798            pending_authentication.set_exception(HCI_Error(error_code))
3799
3800        connection.on('connection_authentication', on_authentication)
3801        connection.on('connection_authentication_failure', on_authentication_failure)
3802
3803        # Request the authentication
3804        try:
3805            result = await self.send_command(
3806                HCI_Authentication_Requested_Command(
3807                    connection_handle=connection.handle
3808                )
3809            )
3810            if result.status != HCI_COMMAND_STATUS_PENDING:
3811                logger.warning(
3812                    'HCI_Authentication_Requested_Command failed: '
3813                    f'{HCI_Constant.error_name(result.status)}'
3814                )
3815                raise HCI_StatusError(result)
3816
3817            # Wait for the authentication to complete
3818            await connection.abort_on('disconnection', pending_authentication)
3819        finally:
3820            connection.remove_listener('connection_authentication', on_authentication)
3821            connection.remove_listener(
3822                'connection_authentication_failure', on_authentication_failure
3823            )
3824
3825    async def encrypt(self, connection, enable=True):
3826        if not enable and connection.transport == BT_LE_TRANSPORT:
3827            raise InvalidArgumentError('`enable` parameter is classic only.')
3828
3829        # Set up event handlers
3830        pending_encryption = asyncio.get_running_loop().create_future()
3831
3832        def on_encryption_change():
3833            pending_encryption.set_result(None)
3834
3835        def on_encryption_failure(error_code):
3836            pending_encryption.set_exception(HCI_Error(error_code))
3837
3838        connection.on('connection_encryption_change', on_encryption_change)
3839        connection.on('connection_encryption_failure', on_encryption_failure)
3840
3841        # Request the encryption
3842        try:
3843            if connection.transport == BT_LE_TRANSPORT:
3844                # Look for a key in the key store
3845                if self.keystore is None:
3846                    raise InvalidOperationError('no key store')
3847
3848                logger.debug(f'Looking up key for {connection.peer_address}')
3849                keys = await self.keystore.get(str(connection.peer_address))
3850                if keys is None:
3851                    raise InvalidOperationError('keys not found in key store')
3852
3853                if keys.ltk is not None:
3854                    ltk = keys.ltk.value
3855                    rand = bytes(8)
3856                    ediv = 0
3857                elif keys.ltk_central is not None:
3858                    ltk = keys.ltk_central.value
3859                    rand = keys.ltk_central.rand
3860                    ediv = keys.ltk_central.ediv
3861                else:
3862                    raise InvalidOperationError('no LTK found for peer')
3863
3864                if connection.role != HCI_CENTRAL_ROLE:
3865                    raise InvalidStateError('only centrals can start encryption')
3866
3867                result = await self.send_command(
3868                    HCI_LE_Enable_Encryption_Command(
3869                        connection_handle=connection.handle,
3870                        random_number=rand,
3871                        encrypted_diversifier=ediv,
3872                        long_term_key=ltk,
3873                    )
3874                )
3875
3876                if result.status != HCI_COMMAND_STATUS_PENDING:
3877                    logger.warning(
3878                        'HCI_LE_Enable_Encryption_Command failed: '
3879                        f'{HCI_Constant.error_name(result.status)}'
3880                    )
3881                    raise HCI_StatusError(result)
3882            else:
3883                result = await self.send_command(
3884                    HCI_Set_Connection_Encryption_Command(
3885                        connection_handle=connection.handle,
3886                        encryption_enable=0x01 if enable else 0x00,
3887                    )
3888                )
3889
3890                if result.status != HCI_COMMAND_STATUS_PENDING:
3891                    logger.warning(
3892                        'HCI_Set_Connection_Encryption_Command failed: '
3893                        f'{HCI_Constant.error_name(result.status)}'
3894                    )
3895                    raise HCI_StatusError(result)
3896
3897            # Wait for the result
3898            await connection.abort_on('disconnection', pending_encryption)
3899        finally:
3900            connection.remove_listener(
3901                'connection_encryption_change', on_encryption_change
3902            )
3903            connection.remove_listener(
3904                'connection_encryption_failure', on_encryption_failure
3905            )
3906
3907    async def update_keys(self, address: str, keys: PairingKeys) -> None:
3908        if self.keystore is None:
3909            return
3910
3911        try:
3912            await self.keystore.update(address, keys)
3913            await self.refresh_resolving_list()
3914        except Exception as error:
3915            logger.warning(f'!!! error while storing keys: {error}')
3916        else:
3917            self.emit('key_store_update')
3918
3919    # [Classic only]
3920    async def switch_role(self, connection: Connection, role: int):
3921        pending_role_change = asyncio.get_running_loop().create_future()
3922
3923        def on_role_change(new_role):
3924            pending_role_change.set_result(new_role)
3925
3926        def on_role_change_failure(error_code):
3927            pending_role_change.set_exception(HCI_Error(error_code))
3928
3929        connection.on('role_change', on_role_change)
3930        connection.on('role_change_failure', on_role_change_failure)
3931
3932        try:
3933            result = await self.send_command(
3934                HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
3935            )
3936            if result.status != HCI_COMMAND_STATUS_PENDING:
3937                logger.warning(
3938                    'HCI_Switch_Role_Command failed: '
3939                    f'{HCI_Constant.error_name(result.status)}'
3940                )
3941                raise HCI_StatusError(result)
3942            await connection.abort_on('disconnection', pending_role_change)
3943        finally:
3944            connection.remove_listener('role_change', on_role_change)
3945            connection.remove_listener('role_change_failure', on_role_change_failure)
3946
3947    # [Classic only]
3948    async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
3949        # Set up event handlers
3950        pending_name = asyncio.get_running_loop().create_future()
3951
3952        peer_address = remote if isinstance(remote, Address) else remote.peer_address
3953
3954        handler = self.on(
3955            'remote_name',
3956            lambda address, remote_name: (
3957                pending_name.set_result(remote_name)
3958                if address == peer_address
3959                else None
3960            ),
3961        )
3962        failure_handler = self.on(
3963            'remote_name_failure',
3964            lambda address, error_code: (
3965                pending_name.set_exception(HCI_Error(error_code))
3966                if address == peer_address
3967                else None
3968            ),
3969        )
3970
3971        try:
3972            result = await self.send_command(
3973                HCI_Remote_Name_Request_Command(
3974                    bd_addr=peer_address,
3975                    page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
3976                    reserved=0,
3977                    clock_offset=0,  # TODO investigate non-0 values
3978                )
3979            )
3980
3981            if result.status != HCI_COMMAND_STATUS_PENDING:
3982                logger.warning(
3983                    'HCI_Remote_Name_Request_Command failed: '
3984                    f'{HCI_Constant.error_name(result.status)}'
3985                )
3986                raise HCI_StatusError(result)
3987
3988            # Wait for the result
3989            return await self.abort_on('flush', pending_name)
3990        finally:
3991            self.remove_listener('remote_name', handler)
3992            self.remove_listener('remote_name_failure', failure_handler)
3993
3994    # [LE only]
3995    @experimental('Only for testing.')
3996    async def setup_cig(
3997        self,
3998        cig_id: int,
3999        cis_id: List[int],
4000        sdu_interval: Tuple[int, int],
4001        framing: int,
4002        max_sdu: Tuple[int, int],
4003        retransmission_number: int,
4004        max_transport_latency: Tuple[int, int],
4005    ) -> List[int]:
4006        """Sends HCI_LE_Set_CIG_Parameters_Command.
4007
4008        Args:
4009            cig_id: CIG_ID.
4010            cis_id: CID ID list.
4011            sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
4012            framing: Un-framing(0) or Framing(1).
4013            max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
4014            retransmission_number: retransmission_number.
4015            max_transport_latency: Max transport latencies of
4016                                   (Central->Peripheral, Peripheral->Cental).
4017
4018        Returns:
4019            List of created CIS handles corresponding to the same order of [cid_id].
4020        """
4021        num_cis = len(cis_id)
4022
4023        response = await self.send_command(
4024            HCI_LE_Set_CIG_Parameters_Command(
4025                cig_id=cig_id,
4026                sdu_interval_c_to_p=sdu_interval[0],
4027                sdu_interval_p_to_c=sdu_interval[1],
4028                worst_case_sca=0x00,  # 251-500 ppm
4029                packing=0x00,  # Sequential
4030                framing=framing,
4031                max_transport_latency_c_to_p=max_transport_latency[0],
4032                max_transport_latency_p_to_c=max_transport_latency[1],
4033                cis_id=cis_id,
4034                max_sdu_c_to_p=[max_sdu[0]] * num_cis,
4035                max_sdu_p_to_c=[max_sdu[1]] * num_cis,
4036                phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
4037                phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
4038                rtn_c_to_p=[retransmission_number] * num_cis,
4039                rtn_p_to_c=[retransmission_number] * num_cis,
4040            ),
4041            check_result=True,
4042        )
4043
4044        # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
4045        # Server, so here it only provides a basic functionality for testing.
4046        cis_handles = response.return_parameters.connection_handle[:]
4047        for id, cis_handle in zip(cis_id, cis_handles):
4048            self._pending_cis[cis_handle] = (id, cig_id)
4049
4050        return cis_handles
4051
4052    # [LE only]
4053    @experimental('Only for testing.')
4054    async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
4055        for cis_handle, acl_handle in cis_acl_pairs:
4056            acl_connection = self.lookup_connection(acl_handle)
4057            assert acl_connection
4058            cis_id, cig_id = self._pending_cis.pop(cis_handle)
4059            self.cis_links[cis_handle] = CisLink(
4060                device=self,
4061                acl_connection=acl_connection,
4062                handle=cis_handle,
4063                cis_id=cis_id,
4064                cig_id=cig_id,
4065            )
4066
4067        with closing(EventWatcher()) as watcher:
4068            pending_cis_establishments = {
4069                cis_handle: asyncio.get_running_loop().create_future()
4070                for cis_handle, _ in cis_acl_pairs
4071            }
4072
4073            def on_cis_establishment(cis_link: CisLink) -> None:
4074                if pending_future := pending_cis_establishments.get(cis_link.handle):
4075                    pending_future.set_result(cis_link)
4076
4077            def on_cis_establishment_failure(cis_handle: int, status: int) -> None:
4078                if pending_future := pending_cis_establishments.get(cis_handle):
4079                    pending_future.set_exception(HCI_Error(status))
4080
4081            watcher.on(self, 'cis_establishment', on_cis_establishment)
4082            watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure)
4083            await self.send_command(
4084                HCI_LE_Create_CIS_Command(
4085                    cis_connection_handle=[p[0] for p in cis_acl_pairs],
4086                    acl_connection_handle=[p[1] for p in cis_acl_pairs],
4087                ),
4088                check_result=True,
4089            )
4090
4091            return await asyncio.gather(*pending_cis_establishments.values())
4092
4093    # [LE only]
4094    @experimental('Only for testing.')
4095    async def accept_cis_request(self, handle: int) -> CisLink:
4096        """[LE Only] Accepts an incoming CIS request.
4097
4098        When the specified CIS handle is already created, this method returns the
4099        existed CIS link object immediately.
4100
4101        Args:
4102            handle: CIS handle to accept.
4103
4104        Returns:
4105            CIS link object on the given handle.
4106        """
4107        if not (cis_link := self.cis_links.get(handle)):
4108            raise InvalidStateError(f'No pending CIS request of handle {handle}')
4109
4110        # There might be multiple ASE sharing a CIS channel.
4111        # If one of them has accepted the request, the others should just leverage it.
4112        async with self._cis_lock:
4113            if cis_link.state == CisLink.State.ESTABLISHED:
4114                return cis_link
4115
4116            with closing(EventWatcher()) as watcher:
4117                pending_establishment = asyncio.get_running_loop().create_future()
4118
4119                def on_establishment() -> None:
4120                    pending_establishment.set_result(None)
4121
4122                def on_establishment_failure(status: int) -> None:
4123                    pending_establishment.set_exception(HCI_Error(status))
4124
4125                watcher.on(cis_link, 'establishment', on_establishment)
4126                watcher.on(cis_link, 'establishment_failure', on_establishment_failure)
4127
4128                await self.send_command(
4129                    HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
4130                    check_result=True,
4131                )
4132
4133                await pending_establishment
4134                return cis_link
4135
4136        # Mypy believes this is reachable when context is an ExitStack.
4137        raise UnreachableError()
4138
4139    # [LE only]
4140    @experimental('Only for testing.')
4141    async def reject_cis_request(
4142        self,
4143        handle: int,
4144        reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
4145    ) -> None:
4146        await self.send_command(
4147            HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
4148            check_result=True,
4149        )
4150
4151    async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
4152        """[LE Only] Reads remote LE supported features.
4153
4154        Args:
4155            handle: connection handle to read LE features.
4156
4157        Returns:
4158            LE features supported by the remote device.
4159        """
4160        with closing(EventWatcher()) as watcher:
4161            read_feature_future: asyncio.Future[LeFeatureMask] = (
4162                asyncio.get_running_loop().create_future()
4163            )
4164
4165            def on_le_remote_features(handle: int, features: int):
4166                if handle == connection.handle:
4167                    read_feature_future.set_result(LeFeatureMask(features))
4168
4169            def on_failure(handle: int, status: int):
4170                if handle == connection.handle:
4171                    read_feature_future.set_exception(HCI_Error(status))
4172
4173            watcher.on(self.host, 'le_remote_features', on_le_remote_features)
4174            watcher.on(self.host, 'le_remote_features_failure', on_failure)
4175            await self.send_command(
4176                HCI_LE_Read_Remote_Features_Command(
4177                    connection_handle=connection.handle
4178                ),
4179                check_result=True,
4180            )
4181            return await read_feature_future
4182
4183    @host_event_handler
4184    def on_flush(self):
4185        self.emit('flush')
4186        for _, connection in self.connections.items():
4187            connection.emit('disconnection', 0)
4188        self.connections = {}
4189
4190    # [Classic only]
4191    @host_event_handler
4192    def on_link_key(self, bd_addr, link_key, key_type):
4193        # Store the keys in the key store
4194        if self.keystore:
4195            authenticated = key_type in (
4196                HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
4197                HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
4198            )
4199            pairing_keys = PairingKeys()
4200            pairing_keys.link_key = PairingKeys.Key(
4201                value=link_key, authenticated=authenticated
4202            )
4203
4204            self.abort_on('flush', self.update_keys(str(bd_addr), pairing_keys))
4205
4206        if connection := self.find_connection_by_bd_addr(
4207            bd_addr, transport=BT_BR_EDR_TRANSPORT
4208        ):
4209            connection.link_key_type = key_type
4210
4211    def add_service(self, service):
4212        self.gatt_server.add_service(service)
4213
4214    def add_services(self, services):
4215        self.gatt_server.add_services(services)
4216
4217    def add_default_services(self, generic_access_service=True):
4218        # Add a GAP Service if requested
4219        if generic_access_service:
4220            self.gatt_server.add_service(GenericAccessService(self.name))
4221
4222    async def notify_subscriber(self, connection, attribute, value=None, force=False):
4223        await self.gatt_server.notify_subscriber(connection, attribute, value, force)
4224
4225    async def notify_subscribers(self, attribute, value=None, force=False):
4226        await self.gatt_server.notify_subscribers(attribute, value, force)
4227
4228    async def indicate_subscriber(self, connection, attribute, value=None, force=False):
4229        await self.gatt_server.indicate_subscriber(connection, attribute, value, force)
4230
4231    async def indicate_subscribers(self, attribute, value=None, force=False):
4232        await self.gatt_server.indicate_subscribers(attribute, value, force)
4233
4234    @host_event_handler
4235    def on_advertising_set_termination(
4236        self,
4237        status,
4238        advertising_handle,
4239        connection_handle,
4240        number_of_completed_extended_advertising_events,
4241    ):
4242        # Legacy advertising set is also one of extended advertising sets.
4243        if not (
4244            advertising_set := self.extended_advertising_sets.get(advertising_handle)
4245        ):
4246            logger.warning(f'advertising set {advertising_handle} not found')
4247            return
4248
4249        advertising_set.on_termination(status)
4250
4251        if status != HCI_SUCCESS:
4252            logger.debug(
4253                f'advertising set {advertising_handle} '
4254                f'terminated with status {status}'
4255            )
4256            return
4257
4258        if connection := self.lookup_connection(connection_handle):
4259            # We have already received the connection complete event.
4260            self._complete_le_extended_advertising_connection(
4261                connection, advertising_set
4262            )
4263            return
4264
4265        # Associate the connection handle with the advertising set, the connection
4266        # will complete later.
4267        logger.debug(
4268            f'the connection with handle {connection_handle:04X} will complete later'
4269        )
4270        self.connecting_extended_advertising_sets[connection_handle] = advertising_set
4271
4272    def _complete_le_extended_advertising_connection(
4273        self, connection: Connection, advertising_set: AdvertisingSet
4274    ) -> None:
4275        # Update the connection address.
4276        connection.self_address = (
4277            advertising_set.random_address
4278            if advertising_set.random_address is not None
4279            and advertising_set.advertising_parameters.own_address_type
4280            in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
4281            else self.public_address
4282        )
4283
4284        if advertising_set.advertising_parameters.own_address_type in (
4285            OwnAddressType.RANDOM,
4286            OwnAddressType.PUBLIC,
4287        ):
4288            connection.self_resolvable_address = None
4289
4290        # Setup auto-restart of the advertising set if needed.
4291        if advertising_set.auto_restart:
4292            connection.once(
4293                'disconnection',
4294                lambda _: self.abort_on('flush', advertising_set.start()),
4295            )
4296
4297        self._emit_le_connection(connection)
4298
4299    def _emit_le_connection(self, connection: Connection) -> None:
4300        # If supported, read which PHY we're connected with before
4301        # notifying listeners of the new connection.
4302        if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
4303
4304            async def read_phy():
4305                result = await self.send_command(
4306                    HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
4307                    check_result=True,
4308                )
4309                connection.phy = ConnectionPHY(
4310                    result.return_parameters.tx_phy, result.return_parameters.rx_phy
4311                )
4312                # Emit an event to notify listeners of the new connection
4313                self.emit('connection', connection)
4314
4315            # Do so asynchronously to not block the current event handler
4316            connection.abort_on('disconnection', read_phy())
4317
4318            return
4319
4320        self.emit('connection', connection)
4321
4322    @host_event_handler
    def on_connection(
        self,
        connection_handle: int,
        transport: int,
        peer_address: Address,
        self_resolvable_address: Optional[Address],
        peer_resolvable_address: Optional[Address],
        role: int,
        connection_parameters: ConnectionParameters,
    ) -> None:
        """Handle a new connection reported by the host.

        BR/EDR connections are completed from the matching entry of
        `pending_connections` and announced with a `connection` event right
        away. For other transports, a new `Connection` is created and stored
        in `connections`. It is announced immediately when the local role is
        central or when LE extended advertising is not supported. Otherwise it
        is completed here only if an advertising set was already recorded for
        the handle in `connecting_extended_advertising_sets`.

        Args:
            connection_handle: handle of the new connection.
            transport: transport of the connection.
            peer_address: address of the peer.
            self_resolvable_address: local resolvable address, if any.
            peer_resolvable_address: peer resolvable address, if any.
            role: local role on the connection.
            connection_parameters: parameters of the connection.
        """
        # Convert all-zeros addresses into None.
        if self_resolvable_address == Address.ANY_RANDOM:
            self_resolvable_address = None
        if (
            peer_resolvable_address == Address.ANY_RANDOM
            or not peer_address.is_resolved
        ):
            peer_resolvable_address = None

        logger.debug(
            f'*** Connection: [0x{connection_handle:04X}] '
            f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}'
        )
        # A reused handle is only reported; the existing entry gets overwritten
        # further down.
        if connection_handle in self.connections:
            logger.warning(
                'new connection reuses the same handle as a previous connection'
            )

        if transport == BT_BR_EDR_TRANSPORT:
            # Create a new connection
            connection = self.pending_connections.pop(peer_address)
            connection.complete(connection_handle, connection_parameters)
            self.connections[connection_handle] = connection

            # Emit an event to notify listeners of the new connection
            self.emit('connection', connection)

            return

        if peer_resolvable_address is None:
            # Resolve the peer address if we can
            if self.address_resolver:
                if peer_address.is_resolvable:
                    resolved_address = self.address_resolver.resolve(peer_address)
                    if resolved_address is not None:
                        logger.debug(f'*** Address resolved as {resolved_address}')
                        # Keep the on-air address as the resolvable one and
                        # use the resolved address as the peer address.
                        peer_resolvable_address = peer_address
                        peer_address = resolved_address

        # Work out our own address: either it is set directly below, or it is
        # derived afterwards from `own_address_type`.
        self_address = None
        own_address_type: Optional[int] = None
        if role == HCI_CENTRAL_ROLE:
            own_address_type = self.connect_own_address_type
            assert own_address_type is not None
        else:
            if self.supports_le_extended_advertising:
                # We'll know the address when the advertising set terminates,
                # Use a temporary placeholder value for self_address.
                self_address = Address.ANY_RANDOM
            else:
                # We were connected via a legacy advertisement.
                if self.legacy_advertiser:
                    own_address_type = self.legacy_advertiser.own_address_type
                else:
                    # This should not happen, but just in case, pick a default.
                    logger.warning("connection without an advertiser")
                    self_address = self.random_address

        if self_address is None:
            self_address = (
                self.public_address
                if own_address_type
                in (
                    OwnAddressType.PUBLIC,
                    OwnAddressType.RESOLVABLE_OR_PUBLIC,
                )
                else self.random_address
            )

        # Some controllers may return local resolvable address even not using address
        # generation offloading. Ignore the value to prevent SMP failure.
        if own_address_type in (OwnAddressType.RANDOM, OwnAddressType.PUBLIC):
            self_resolvable_address = None

        # Create a connection.
        connection = Connection(
            self,
            connection_handle,
            transport,
            self_address,
            self_resolvable_address,
            peer_address,
            peer_resolvable_address,
            role,
            connection_parameters,
            ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
        )
        self.connections[connection_handle] = connection

        # As a peripheral, the legacy advertiser is either restarted when this
        # connection ends (auto-restart) or dropped now.
        if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser:
            if self.legacy_advertiser.auto_restart:
                advertiser = self.legacy_advertiser
                connection.once(
                    'disconnection',
                    lambda _: self.abort_on('flush', advertiser.start()),
                )
            else:
                self.legacy_advertiser = None

        if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
            # We can emit now, we have all the info we need
            self._emit_le_connection(connection)
            return

        if role == HCI_PERIPHERAL_ROLE and self.supports_le_extended_advertising:
            if advertising_set := self.connecting_extended_advertising_sets.pop(
                connection_handle, None
            ):
                # We have already received the advertising set termination event.
                self._complete_le_extended_advertising_connection(
                    connection, advertising_set
                )
4445
4446    @host_event_handler
4447    def on_connection_failure(self, transport, peer_address, error_code):
4448        logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')
4449
4450        # For directed advertising, this means a timeout
4451        if (
4452            transport == BT_LE_TRANSPORT
4453            and self.legacy_advertiser
4454            and self.legacy_advertiser.advertising_type.is_directed
4455        ):
4456            self.legacy_advertiser = None
4457
4458        # Notify listeners
4459        error = core.ConnectionError(
4460            error_code,
4461            transport,
4462            peer_address,
4463            'hci',
4464            HCI_Constant.error_name(error_code),
4465        )
4466        self.emit('connection_failure', error)
4467
4468    # FIXME: Explore a delegate-model for BR/EDR wait connection #56.
4469    @host_event_handler
4470    def on_connection_request(self, bd_addr, class_of_device, link_type):
4471        logger.debug(f'*** Connection request: {bd_addr}')
4472
4473        # Handle SCO request.
4474        if link_type in (
4475            HCI_Connection_Complete_Event.SCO_LINK_TYPE,
4476            HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
4477        ):
4478            if connection := self.find_connection_by_bd_addr(
4479                bd_addr, transport=BT_BR_EDR_TRANSPORT
4480            ):
4481                self.emit('sco_request', connection, link_type)
4482            else:
4483                logger.error(f'SCO request from a non-connected device {bd_addr}')
4484            return
4485
4486        # match a pending future using `bd_addr`
4487        elif bd_addr in self.classic_pending_accepts:
4488            future, *_ = self.classic_pending_accepts.pop(bd_addr)
4489            future.set_result((bd_addr, class_of_device, link_type))
4490
4491        # match first pending future for ANY address
4492        elif len(self.classic_pending_accepts[Address.ANY]) > 0:
4493            future = self.classic_pending_accepts[Address.ANY].pop(0)
4494            future.set_result((bd_addr, class_of_device, link_type))
4495
4496        # device configuration is set to accept any incoming connection
4497        elif self.classic_accept_any:
4498            # Save pending connection
4499            self.pending_connections[bd_addr] = Connection.incomplete(
4500                self, bd_addr, BT_PERIPHERAL_ROLE
4501            )
4502
4503            self.host.send_command_sync(
4504                HCI_Accept_Connection_Request_Command(
4505                    bd_addr=bd_addr, role=0x01  # Remain the peripheral
4506                )
4507            )
4508
4509        # reject incoming connection
4510        else:
4511            self.host.send_command_sync(
4512                HCI_Reject_Connection_Request_Command(
4513                    bd_addr=bd_addr,
4514                    reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
4515                )
4516            )
4517
4518    @host_event_handler
4519    def on_disconnection(self, connection_handle: int, reason: int) -> None:
4520        if connection := self.connections.pop(connection_handle, None):
4521            logger.debug(
4522                f'*** Disconnection: [0x{connection.handle:04X}] '
4523                f'{connection.peer_address} as {connection.role_name}, reason={reason}'
4524            )
4525            connection.emit('disconnection', reason)
4526
4527            # Cleanup subsystems that maintain per-connection state
4528            self.gatt_server.on_disconnection(connection)
4529        elif sco_link := self.sco_links.pop(connection_handle, None):
4530            sco_link.emit('disconnection', reason)
4531        elif cis_link := self.cis_links.pop(connection_handle, None):
4532            cis_link.emit('disconnection', reason)
4533        else:
4534            logger.error(
4535                f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
4536            )
4537
4538    @host_event_handler
4539    @with_connection_from_handle
4540    def on_disconnection_failure(self, connection, error_code):
4541        logger.debug(f'*** Disconnection failed: {error_code}')
4542        error = core.ConnectionError(
4543            error_code,
4544            connection.transport,
4545            connection.peer_address,
4546            'hci',
4547            HCI_Constant.error_name(error_code),
4548        )
4549        connection.emit('disconnection_failure', error)
4550
4551    @host_event_handler
4552    @AsyncRunner.run_in_task()
4553    async def on_inquiry_complete(self):
4554        if self.auto_restart_inquiry:
4555            # Inquire again
4556            await self.start_discovery(auto_restart=True)
4557        else:
4558            self.auto_restart_inquiry = True
4559            self.discovering = False
4560            self.emit('inquiry_complete')
4561
4562    @host_event_handler
4563    @with_connection_from_handle
4564    def on_connection_authentication(self, connection):
4565        logger.debug(
4566            f'*** Connection Authentication: [0x{connection.handle:04X}] '
4567            f'{connection.peer_address} as {connection.role_name}'
4568        )
4569        connection.authenticated = True
4570        connection.emit('connection_authentication')
4571
4572    @host_event_handler
4573    @with_connection_from_handle
4574    def on_connection_authentication_failure(self, connection, error):
4575        logger.debug(
4576            f'*** Connection Authentication Failure: [0x{connection.handle:04X}] '
4577            f'{connection.peer_address} as {connection.role_name}, error={error}'
4578        )
4579        connection.emit('connection_authentication_failure', error)
4580
4581    # [Classic only]
4582    @host_event_handler
4583    @with_connection_from_address
4584    def on_authentication_io_capability_request(self, connection):
4585        # Ask what the pairing config should be for this connection
4586        pairing_config = self.pairing_config_factory(connection)
4587
4588        # Compute the authentication requirements
4589        authentication_requirements = (
4590            # No Bonding
4591            (
4592                HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
4593                HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
4594            ),
4595            # General Bonding
4596            (
4597                HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
4598                HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
4599            ),
4600        )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
4601
4602        # Respond
4603        self.host.send_command_sync(
4604            HCI_IO_Capability_Request_Reply_Command(
4605                bd_addr=connection.peer_address,
4606                io_capability=pairing_config.delegate.classic_io_capability,
4607                oob_data_present=0x00,  # Not present
4608                authentication_requirements=authentication_requirements,
4609            )
4610        )
4611
4612    # [Classic only]
4613    @host_event_handler
4614    @with_connection_from_address
4615    def on_authentication_io_capability_response(
4616        self, connection, io_capability, authentication_requirements
4617    ):
4618        connection.peer_pairing_io_capability = io_capability
4619        connection.peer_pairing_authentication_requirements = (
4620            authentication_requirements
4621        )
4622
4623    # [Classic only]
4624    @host_event_handler
4625    @with_connection_from_address
    def on_authentication_user_confirmation_request(self, connection, code) -> None:
        """
        Answer a user confirmation request for `connection`.

        A confirmation coroutine is selected from the peer's IO capability
        (as stored in `connection.peer_pairing_io_capability`) and the local
        delegate's IO capability. It is run in a spawned task: a truthy result
        sends a positive reply, while a falsy result or an exception sends a
        negative reply.

        `code` is the numeric value passed to the delegate for display or
        comparison (with `digits=6`).
        """
        # Ask what the pairing config should be for this connection
        pairing_config = self.pairing_config_factory(connection)
        io_capability = pairing_config.delegate.classic_io_capability
        peer_io_capability = connection.peer_pairing_io_capability

        async def confirm() -> bool:
            # Ask the user to confirm the pairing, without display
            return await pairing_config.delegate.confirm()

        async def auto_confirm() -> bool:
            # Ask the user to auto-confirm the pairing, without display
            return await pairing_config.delegate.confirm(auto=True)

        async def display_confirm() -> bool:
            # Display the code and ask the user to compare
            return await pairing_config.delegate.compare_numbers(code, digits=6)

        async def display_auto_confirm() -> bool:
            # Display the code to the user and ask the delegate to auto-confirm
            await pairing_config.delegate.display_number(code, digits=6)
            return await pairing_config.delegate.confirm(auto=True)

        async def na() -> bool:
            # Placeholder for combinations that are not expected to produce this
            # event: awaiting it raises, which `reply()` below turns into a
            # negative reply (assuming UnreachableError derives from Exception
            # -- TODO confirm).
            raise UnreachableError()

        # See Bluetooth spec @ Vol 3, Part C 5.2.2.6
        # The outer key is the PEER's IO capability, the inner key is the LOCAL
        # one (see the lookup right after the table).
        methods = {
            HCI_DISPLAY_ONLY_IO_CAPABILITY: {
                HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
                HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
            },
            HCI_DISPLAY_YES_NO_IO_CAPABILITY: {
                HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
                HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
            },
            HCI_KEYBOARD_ONLY_IO_CAPABILITY: {
                HCI_DISPLAY_ONLY_IO_CAPABILITY: na,
                HCI_DISPLAY_YES_NO_IO_CAPABILITY: na,
                HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
            },
            HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
                HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm,
                HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm,
                HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm,
                HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
            },
        }

        # NOTE(review): a capability value missing from the table raises
        # KeyError here, before any reply is sent.
        method = methods[peer_io_capability][io_capability]

        async def reply() -> None:
            try:
                # The confirmation is tied to the 'disconnection' event of the
                # connection through `abort_on`.
                if await connection.abort_on('disconnection', method()):
                    await self.host.send_command(
                        HCI_User_Confirmation_Request_Reply_Command(
                            bd_addr=connection.peer_address
                        )
                    )
                    return
            except Exception as error:
                logger.warning(f'exception while confirming: {error}')

            # Reached when the user declined or when anything above failed
            await self.host.send_command(
                HCI_User_Confirmation_Request_Negative_Reply_Command(
                    bd_addr=connection.peer_address
                )
            )

        # Run the exchange in the background; this handler itself is synchronous
        AsyncRunner.spawn(reply())
4701
4702    # [Classic only]
4703    @host_event_handler
4704    @with_connection_from_address
4705    def on_authentication_user_passkey_request(self, connection) -> None:
4706        # Ask what the pairing config should be for this connection
4707        pairing_config = self.pairing_config_factory(connection)
4708
4709        async def reply() -> None:
4710            try:
4711                number = await connection.abort_on(
4712                    'disconnection', pairing_config.delegate.get_number()
4713                )
4714                if number is not None:
4715                    await self.host.send_command(
4716                        HCI_User_Passkey_Request_Reply_Command(
4717                            bd_addr=connection.peer_address, numeric_value=number
4718                        )
4719                    )
4720                    return
4721            except Exception as error:
4722                logger.warning(f'exception while asking for pass-key: {error}')
4723
4724            await self.host.send_command(
4725                HCI_User_Passkey_Request_Negative_Reply_Command(
4726                    bd_addr=connection.peer_address
4727                )
4728            )
4729
4730        AsyncRunner.spawn(reply())
4731
4732    # [Classic only]
4733    @host_event_handler
4734    @with_connection_from_address
4735    def on_pin_code_request(self, connection):
4736        # Classic legacy pairing
4737        # Ask what the pairing config should be for this connection
4738        pairing_config = self.pairing_config_factory(connection)
4739        io_capability = pairing_config.delegate.classic_io_capability
4740
4741        # Respond
4742        if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY:
4743            # Ask the user to enter a string
4744            async def get_pin_code():
4745                pin_code = await connection.abort_on(
4746                    'disconnection', pairing_config.delegate.get_string(16)
4747                )
4748
4749                if pin_code is not None:
4750                    pin_code = bytes(pin_code, encoding='utf-8')
4751                    pin_code_len = len(pin_code)
4752                    assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes"
4753                    await self.host.send_command(
4754                        HCI_PIN_Code_Request_Reply_Command(
4755                            bd_addr=connection.peer_address,
4756                            pin_code_length=pin_code_len,
4757                            pin_code=pin_code,
4758                        )
4759                    )
4760                else:
4761                    logger.debug("delegate.get_string() returned None")
4762                    await self.host.send_command(
4763                        HCI_PIN_Code_Request_Negative_Reply_Command(
4764                            bd_addr=connection.peer_address
4765                        )
4766                    )
4767
4768            asyncio.create_task(get_pin_code())
4769        else:
4770            self.host.send_command_sync(
4771                HCI_PIN_Code_Request_Negative_Reply_Command(
4772                    bd_addr=connection.peer_address
4773                )
4774            )
4775
4776    # [Classic only]
4777    @host_event_handler
4778    @with_connection_from_address
4779    def on_authentication_user_passkey_notification(self, connection, passkey):
4780        # Ask what the pairing config should be for this connection
4781        pairing_config = self.pairing_config_factory(connection)
4782
4783        # Show the passkey to the user
4784        connection.abort_on(
4785            'disconnection', pairing_config.delegate.display_number(passkey)
4786        )
4787
4788    # [Classic only]
4789    @host_event_handler
4790    @try_with_connection_from_address
4791    def on_remote_name(self, connection: Connection, address, remote_name):
4792        # Try to decode the name
4793        try:
4794            remote_name = remote_name.decode('utf-8')
4795            if connection:
4796                connection.peer_name = remote_name
4797                connection.emit('remote_name')
4798            self.emit('remote_name', address, remote_name)
4799        except UnicodeDecodeError as error:
4800            logger.warning('peer name is not valid UTF-8')
4801            if connection:
4802                connection.emit('remote_name_failure', error)
4803            else:
4804                self.emit('remote_name_failure', address, error)
4805
4806    # [Classic only]
4807    @host_event_handler
4808    @try_with_connection_from_address
4809    def on_remote_name_failure(self, connection: Connection, address, error):
4810        if connection:
4811            connection.emit('remote_name_failure', error)
4812        self.emit('remote_name_failure', address, error)
4813
4814    # [Classic only]
4815    @host_event_handler
4816    @with_connection_from_address
4817    @experimental('Only for testing.')
4818    def on_sco_connection(
4819        self, acl_connection: Connection, sco_handle: int, link_type: int
4820    ) -> None:
4821        logger.debug(
4822            f'*** SCO connected: {acl_connection.peer_address}, '
4823            f'sco_handle=[0x{sco_handle:04X}], '
4824            f'link_type=[0x{link_type:02X}] ***'
4825        )
4826        sco_link = self.sco_links[sco_handle] = ScoLink(
4827            device=self,
4828            acl_connection=acl_connection,
4829            handle=sco_handle,
4830            link_type=link_type,
4831        )
4832        self.emit('sco_connection', sco_link)
4833
4834    # [Classic only]
4835    @host_event_handler
4836    @with_connection_from_address
4837    @experimental('Only for testing.')
4838    def on_sco_connection_failure(
4839        self, acl_connection: Connection, status: int
4840    ) -> None:
4841        logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
4842        self.emit('sco_connection_failure')
4843
4844    # [Classic only]
4845    @host_event_handler
4846    @experimental('Only for testing')
4847    def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
4848        if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink:
4849            sco_link.sink(packet)
4850
4851    # [LE only]
4852    @host_event_handler
4853    @with_connection_from_handle
4854    @experimental('Only for testing')
4855    def on_cis_request(
4856        self,
4857        acl_connection: Connection,
4858        cis_handle: int,
4859        cig_id: int,
4860        cis_id: int,
4861    ) -> None:
4862        logger.debug(
4863            f'*** CIS Request '
4864            f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
4865            f'cis_handle=[0x{cis_handle:04X}], '
4866            f'cig_id=[0x{cig_id:02X}], '
4867            f'cis_id=[0x{cis_id:02X}] ***'
4868        )
4869        # LE_CIS_Established event doesn't provide info, so we must store them here.
4870        self.cis_links[cis_handle] = CisLink(
4871            device=self,
4872            acl_connection=acl_connection,
4873            handle=cis_handle,
4874            cig_id=cig_id,
4875            cis_id=cis_id,
4876        )
4877        self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
4878
4879    # [LE only]
4880    @host_event_handler
4881    @experimental('Only for testing')
4882    def on_cis_establishment(self, cis_handle: int) -> None:
4883        cis_link = self.cis_links[cis_handle]
4884        cis_link.state = CisLink.State.ESTABLISHED
4885
4886        assert cis_link.acl_connection
4887
4888        logger.debug(
4889            f'*** CIS Establishment '
4890            f'{cis_link.acl_connection.peer_address}, '
4891            f'cis_handle=[0x{cis_handle:04X}], '
4892            f'cig_id=[0x{cis_link.cig_id:02X}], '
4893            f'cis_id=[0x{cis_link.cis_id:02X}] ***'
4894        )
4895
4896        cis_link.emit('establishment')
4897        self.emit('cis_establishment', cis_link)
4898
4899    # [LE only]
4900    @host_event_handler
4901    @experimental('Only for testing')
4902    def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
4903        logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
4904        if cis_link := self.cis_links.pop(cis_handle):
4905            cis_link.emit('establishment_failure', status)
4906        self.emit('cis_establishment_failure', cis_handle, status)
4907
4908    # [LE only]
4909    @host_event_handler
4910    @experimental('Only for testing')
4911    def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
4912        if (cis_link := self.cis_links.get(handle)) and cis_link.sink:
4913            cis_link.sink(packet)
4914
4915    @host_event_handler
4916    @with_connection_from_handle
4917    def on_connection_encryption_change(self, connection, encryption):
4918        logger.debug(
4919            f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
4920            f'{connection.peer_address} as {connection.role_name}, '
4921            f'encryption={encryption}'
4922        )
4923        connection.encryption = encryption
4924        if (
4925            not connection.authenticated
4926            and connection.transport == BT_BR_EDR_TRANSPORT
4927            and encryption == HCI_Encryption_Change_Event.AES_CCM
4928        ):
4929            connection.authenticated = True
4930            connection.sc = True
4931        if (
4932            not connection.authenticated
4933            and connection.transport == BT_LE_TRANSPORT
4934            and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
4935        ):
4936            connection.authenticated = True
4937            connection.sc = True
4938        connection.emit('connection_encryption_change')
4939
4940    @host_event_handler
4941    @with_connection_from_handle
4942    def on_connection_encryption_failure(self, connection, error):
4943        logger.debug(
4944            f'*** Connection Encryption Failure: [0x{connection.handle:04X}] '
4945            f'{connection.peer_address} as {connection.role_name}, '
4946            f'error={error}'
4947        )
4948        connection.emit('connection_encryption_failure', error)
4949
4950    @host_event_handler
4951    @with_connection_from_handle
4952    def on_connection_encryption_key_refresh(self, connection):
4953        logger.debug(
4954            f'*** Connection Key Refresh: [0x{connection.handle:04X}] '
4955            f'{connection.peer_address} as {connection.role_name}'
4956        )
4957        connection.emit('connection_encryption_key_refresh')
4958
4959    @host_event_handler
4960    @with_connection_from_handle
4961    def on_connection_parameters_update(self, connection, connection_parameters):
4962        logger.debug(
4963            f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
4964            f'{connection.peer_address} as {connection.role_name}, '
4965            f'{connection_parameters}'
4966        )
4967        connection.parameters = connection_parameters
4968        connection.emit('connection_parameters_update')
4969
4970    @host_event_handler
4971    @with_connection_from_handle
4972    def on_connection_parameters_update_failure(self, connection, error):
4973        logger.debug(
4974            f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] '
4975            f'{connection.peer_address} as {connection.role_name}, '
4976            f'error={error}'
4977        )
4978        connection.emit('connection_parameters_update_failure', error)
4979
4980    @host_event_handler
4981    @with_connection_from_handle
4982    def on_connection_phy_update(self, connection, connection_phy):
4983        logger.debug(
4984            f'*** Connection PHY Update: [0x{connection.handle:04X}] '
4985            f'{connection.peer_address} as {connection.role_name}, '
4986            f'{connection_phy}'
4987        )
4988        connection.phy = connection_phy
4989        connection.emit('connection_phy_update')
4990
4991    @host_event_handler
4992    @with_connection_from_handle
4993    def on_connection_phy_update_failure(self, connection, error):
4994        logger.debug(
4995            f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] '
4996            f'{connection.peer_address} as {connection.role_name}, '
4997            f'error={error}'
4998        )
4999        connection.emit('connection_phy_update_failure', error)
5000
5001    @host_event_handler
5002    @with_connection_from_handle
5003    def on_connection_att_mtu_update(self, connection, att_mtu):
5004        logger.debug(
5005            f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] '
5006            f'{connection.peer_address} as {connection.role_name}, '
5007            f'{att_mtu}'
5008        )
5009        connection.att_mtu = att_mtu
5010        connection.emit('connection_att_mtu_update')
5011
5012    @host_event_handler
5013    @with_connection_from_handle
5014    def on_connection_data_length_change(
5015        self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time
5016    ):
5017        logger.debug(
5018            f'*** Connection Data Length Change: [0x{connection.handle:04X}] '
5019            f'{connection.peer_address} as {connection.role_name}'
5020        )
5021        connection.data_length = (
5022            max_tx_octets,
5023            max_tx_time,
5024            max_rx_octets,
5025            max_rx_time,
5026        )
5027        connection.emit('connection_data_length_change')
5028
5029    # [Classic only]
5030    @host_event_handler
5031    @with_connection_from_address
5032    def on_role_change(self, connection, new_role):
5033        connection.role = new_role
5034        connection.emit('role_change', new_role)
5035
5036    # [Classic only]
5037    @host_event_handler
5038    @try_with_connection_from_address
5039    def on_role_change_failure(self, connection, address, error):
5040        if connection:
5041            connection.emit('role_change_failure', error)
5042        self.emit('role_change_failure', address, error)
5043
5044    # [Classic only]
5045    @host_event_handler
5046    @with_connection_from_address
5047    def on_classic_pairing(self, connection: Connection) -> None:
5048        connection.emit('classic_pairing')
5049
5050    # [Classic only]
5051    @host_event_handler
5052    @with_connection_from_address
5053    def on_classic_pairing_failure(self, connection: Connection, status) -> None:
5054        connection.emit('classic_pairing_failure', status)
5055
5056    def on_pairing_start(self, connection: Connection) -> None:
5057        connection.emit('pairing_start')
5058
5059    def on_pairing(
5060        self,
5061        connection: Connection,
5062        identity_address: Optional[Address],
5063        keys: PairingKeys,
5064        sc: bool,
5065    ) -> None:
5066        if identity_address is not None:
5067            connection.peer_resolvable_address = connection.peer_address
5068            connection.peer_address = identity_address
5069        connection.sc = sc
5070        connection.authenticated = True
5071        connection.emit('pairing', keys)
5072
5073    def on_pairing_failure(self, connection: Connection, reason: int) -> None:
5074        connection.emit('pairing_failure', reason)
5075
5076    @with_connection_from_handle
5077    def on_gatt_pdu(self, connection, pdu):
5078        # Parse the L2CAP payload into an ATT PDU object
5079        att_pdu = ATT_PDU.from_bytes(pdu)
5080
5081        # Conveniently, even-numbered op codes are client->server and
5082        # odd-numbered ones are server->client
5083        if att_pdu.op_code & 1:
5084            if connection.gatt_client is None:
5085                logger.warning(
5086                    color('no GATT client for connection 0x{connection_handle:04X}')
5087                )
5088                return
5089            connection.gatt_client.on_gatt_pdu(att_pdu)
5090        else:
5091            if connection.gatt_server is None:
5092                logger.warning(
5093                    color('no GATT server for connection 0x{connection_handle:04X}')
5094                )
5095                return
5096            connection.gatt_server.on_gatt_pdu(connection, att_pdu)
5097
5098    @with_connection_from_handle
5099    def on_smp_pdu(self, connection, pdu):
5100        self.smp_manager.on_smp_pdu(connection, pdu)
5101
5102    @host_event_handler
5103    @with_connection_from_handle
5104    def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes):
5105        self.l2cap_channel_manager.on_pdu(connection, cid, pdu)
5106
5107    def __str__(self):
5108        return (
5109            f'Device(name="{self.name}", '
5110            f'random_address="{self.random_address}", '
5111            f'public_address="{self.public_address}", '
5112            f'static_address="{self.static_address}")'
5113        )
5114