1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# SMP - Security Manager Protocol
17#
18# See Bluetooth spec @ Vol 3, Part H
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import logging
27import asyncio
28import enum
29import secrets
30from dataclasses import dataclass
31from typing import (
32    TYPE_CHECKING,
33    Any,
34    Awaitable,
35    Callable,
36    Dict,
37    List,
38    Optional,
39    Tuple,
40    Type,
41    cast,
42)
43
44from pyee import EventEmitter
45
46from .colors import color
47from .hci import (
48    Address,
49    HCI_LE_Enable_Encryption_Command,
50    HCI_Object,
51    key_with_value,
52)
53from .core import (
54    BT_BR_EDR_TRANSPORT,
55    BT_CENTRAL_ROLE,
56    BT_LE_TRANSPORT,
57    AdvertisingData,
58    InvalidArgumentError,
59    ProtocolError,
60    name_or_number,
61)
62from .keys import PairingKeys
63from . import crypto
64
65if TYPE_CHECKING:
66    from bumble.device import Connection, Device
67    from bumble.pairing import PairingConfig
68
69
70# -----------------------------------------------------------------------------
71# Logging
72# -----------------------------------------------------------------------------
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
74
75
76# -----------------------------------------------------------------------------
77# Constants
78# -----------------------------------------------------------------------------
79# fmt: off
80# pylint: disable=line-too-long
81
# Channel identifiers for SMP traffic.
# NOTE(review): presumably the L2CAP fixed channel IDs used for SMP over LE
# (SMP_CID) and over BR/EDR (SMP_BR_CID) — confirm against the code that uses them.
SMP_CID = 0x06
SMP_BR_CID = 0x07
84
# SMP command codes. SMP_Command.from_bytes reads the code from the first byte
# of a PDU and uses it to select the command class to instantiate.
SMP_PAIRING_REQUEST_COMMAND               = 0x01
SMP_PAIRING_RESPONSE_COMMAND              = 0x02
SMP_PAIRING_CONFIRM_COMMAND               = 0x03
SMP_PAIRING_RANDOM_COMMAND                = 0x04
SMP_PAIRING_FAILED_COMMAND                = 0x05
SMP_ENCRYPTION_INFORMATION_COMMAND        = 0x06
SMP_MASTER_IDENTIFICATION_COMMAND         = 0x07
SMP_IDENTITY_INFORMATION_COMMAND          = 0x08
SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND  = 0x09
SMP_SIGNING_INFORMATION_COMMAND           = 0x0A
SMP_SECURITY_REQUEST_COMMAND              = 0x0B
SMP_PAIRING_PUBLIC_KEY_COMMAND            = 0x0C
SMP_PAIRING_DHKEY_CHECK_COMMAND           = 0x0D
SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E

# Command code -> symbolic name.
# Used by SMP_Command.command_name for display, and by the SMP_Command.subclass
# decorator, which looks up a class's code from its upper-cased class name. The
# names here must therefore match the command class names exactly (ignoring case).
SMP_COMMAND_NAMES = {
    SMP_PAIRING_REQUEST_COMMAND:               'SMP_PAIRING_REQUEST_COMMAND',
    SMP_PAIRING_RESPONSE_COMMAND:              'SMP_PAIRING_RESPONSE_COMMAND',
    SMP_PAIRING_CONFIRM_COMMAND:               'SMP_PAIRING_CONFIRM_COMMAND',
    SMP_PAIRING_RANDOM_COMMAND:                'SMP_PAIRING_RANDOM_COMMAND',
    SMP_PAIRING_FAILED_COMMAND:                'SMP_PAIRING_FAILED_COMMAND',
    SMP_ENCRYPTION_INFORMATION_COMMAND:        'SMP_ENCRYPTION_INFORMATION_COMMAND',
    SMP_MASTER_IDENTIFICATION_COMMAND:         'SMP_MASTER_IDENTIFICATION_COMMAND',
    SMP_IDENTITY_INFORMATION_COMMAND:          'SMP_IDENTITY_INFORMATION_COMMAND',
    SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND:  'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND',
    SMP_SIGNING_INFORMATION_COMMAND:           'SMP_SIGNING_INFORMATION_COMMAND',
    SMP_SECURITY_REQUEST_COMMAND:              'SMP_SECURITY_REQUEST_COMMAND',
    SMP_PAIRING_PUBLIC_KEY_COMMAND:            'SMP_PAIRING_PUBLIC_KEY_COMMAND',
    SMP_PAIRING_DHKEY_CHECK_COMMAND:           'SMP_PAIRING_DHKEY_CHECK_COMMAND',
    SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND'
}
116
# I/O capability values. They are the keys of Session.PAIRING_METHODS and the
# values carried in the 'io_capability' field of the pairing request/response.
SMP_DISPLAY_ONLY_IO_CAPABILITY       = 0x00
SMP_DISPLAY_YES_NO_IO_CAPABILITY     = 0x01
SMP_KEYBOARD_ONLY_IO_CAPABILITY      = 0x02
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY   = 0x04

# I/O capability value -> symbolic name (used by SMP_Command.io_capability_name).
SMP_IO_CAPABILITY_NAMES = {
    SMP_DISPLAY_ONLY_IO_CAPABILITY:       'SMP_DISPLAY_ONLY_IO_CAPABILITY',
    SMP_DISPLAY_YES_NO_IO_CAPABILITY:     'SMP_DISPLAY_YES_NO_IO_CAPABILITY',
    SMP_KEYBOARD_ONLY_IO_CAPABILITY:      'SMP_KEYBOARD_ONLY_IO_CAPABILITY',
    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY',
    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY'
}
130
# SMP error codes. These are the values displayed for the 'reason' field of
# SMP_Pairing_Failed_Command (via error_name).
SMP_PASSKEY_ENTRY_FAILED_ERROR                       = 0x01
SMP_OOB_NOT_AVAILABLE_ERROR                          = 0x02
SMP_AUTHENTICATION_REQUIREMENTS_ERROR                = 0x03
SMP_CONFIRM_VALUE_FAILED_ERROR                       = 0x04
SMP_PAIRING_NOT_SUPPORTED_ERROR                      = 0x05
SMP_ENCRYPTION_KEY_SIZE_ERROR                        = 0x06
SMP_COMMAND_NOT_SUPPORTED_ERROR                      = 0x07
SMP_UNSPECIFIED_REASON_ERROR                         = 0x08
SMP_REPEATED_ATTEMPTS_ERROR                          = 0x09
SMP_INVALID_PARAMETERS_ERROR                         = 0x0A
SMP_DHKEY_CHECK_FAILED_ERROR                         = 0x0B
SMP_NUMERIC_COMPARISON_FAILED_ERROR                  = 0x0C
# NOTE(review): "BD_EDR" looks like a typo for "BR_EDR"; the name is kept as-is
# because it is part of the module's public interface.
SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR                 = 0x0D
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E

# Error code -> symbolic name (used by error_name).
SMP_ERROR_NAMES = {
    SMP_PASSKEY_ENTRY_FAILED_ERROR:                       'SMP_PASSKEY_ENTRY_FAILED_ERROR',
    SMP_OOB_NOT_AVAILABLE_ERROR:                          'SMP_OOB_NOT_AVAILABLE_ERROR',
    SMP_AUTHENTICATION_REQUIREMENTS_ERROR:                'SMP_AUTHENTICATION_REQUIREMENTS_ERROR',
    SMP_CONFIRM_VALUE_FAILED_ERROR:                       'SMP_CONFIRM_VALUE_FAILED_ERROR',
    SMP_PAIRING_NOT_SUPPORTED_ERROR:                      'SMP_PAIRING_NOT_SUPPORTED_ERROR',
    SMP_ENCRYPTION_KEY_SIZE_ERROR:                        'SMP_ENCRYPTION_KEY_SIZE_ERROR',
    SMP_COMMAND_NOT_SUPPORTED_ERROR:                      'SMP_COMMAND_NOT_SUPPORTED_ERROR',
    SMP_UNSPECIFIED_REASON_ERROR:                         'SMP_UNSPECIFIED_REASON_ERROR',
    SMP_REPEATED_ATTEMPTS_ERROR:                          'SMP_REPEATED_ATTEMPTS_ERROR',
    SMP_INVALID_PARAMETERS_ERROR:                         'SMP_INVALID_PARAMETERS_ERROR',
    SMP_DHKEY_CHECK_FAILED_ERROR:                         'SMP_DHKEY_CHECK_FAILED_ERROR',
    SMP_NUMERIC_COMPARISON_FAILED_ERROR:                  'SMP_NUMERIC_COMPARISON_FAILED_ERROR',
    SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR:                 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR',
    SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR'
}
162
# Values of the 'notification_type' field of
# SMP_Pairing_Keypress_Notification_Command.
SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE   = 0
SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE   = 1
SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE    = 2
SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE         = 3
SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4

# Notification type -> symbolic name
# (used by SMP_Command.keypress_notification_type_name).
SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = {
    SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE:    'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE:         'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE'
}
176
# Bit flags for key distribution/generation.
# One bit per key type; SMP_Command.key_distribution_str renders them as
# 'ENC', 'ID', 'SIGN' and 'LINK' respectively.
SMP_ENC_KEY_DISTRIBUTION_FLAG  = 0b0001
SMP_ID_KEY_DISTRIBUTION_FLAG   = 0b0010
SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100
SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000
182
# AuthReq fields.
# Bit masks combined by smp_auth_req() into a single AuthReq byte. The bit
# positions match what SMP_Command.auth_req_str decodes (bonding in the two
# low bits, then MITM, SC, keypress and CT2 at bits 2 to 5).
SMP_BONDING_AUTHREQ  = 0b00000001
SMP_MITM_AUTHREQ     = 0b00000100
SMP_SC_AUTHREQ       = 0b00001000
SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ      = 0b00100000
189
# Crypto salt.
# 16-byte values whose last four bytes are the ASCII strings 'tmp1' and 'tmp2'.
# NOTE(review): judging by the names, these are the salts for the h7 function in
# cross-transport key derivation (LE -> BR/EDR and BR/EDR -> LE) — confirm at the
# call sites.
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
193
194# fmt: on
195# pylint: enable=line-too-long
196# pylint: disable=invalid-name
197
198
199# -----------------------------------------------------------------------------
200# Utils
201# -----------------------------------------------------------------------------
def error_name(error_code: int) -> str:
    """Return the display name for an SMP error code.

    The name is looked up in SMP_ERROR_NAMES through name_or_number.
    NOTE(review): name_or_number presumably falls back to a numeric rendering
    for codes that are not in the table — confirm in core.
    """
    known_errors = SMP_ERROR_NAMES
    return name_or_number(known_errors, error_code)
204
205
206# -----------------------------------------------------------------------------
207# Classes
208# -----------------------------------------------------------------------------
class SMP_Command:
    '''
    See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL

    Base class for SMP commands. Concrete commands are declared with the
    `SMP_Command.subclass` decorator, which attaches the field layout and
    registers the class in `smp_classes` so that `from_bytes` can find it.
    '''

    # Registry of command classes, keyed by command code
    smp_classes: Dict[int, Type[SMP_Command]] = {}
    # Field layout; only set on classes decorated with `subclass`
    fields: Any
    code = 0
    name = ''

    @staticmethod
    def from_bytes(pdu: bytes) -> "SMP_Command":
        '''
        Parse a PDU into a command object.

        The first byte selects the command class. When no class is registered
        for that code, a plain SMP_Command carrying the raw PDU is returned,
        with its `name` and `code` set from the first byte.
        '''
        opcode = pdu[0]
        command_class = SMP_Command.smp_classes.get(opcode)

        if command_class is not None:
            # Bypass the subclass constructor: the object is populated from
            # the PDU bytes rather than from keyword arguments.
            command = command_class.__new__(command_class)
            SMP_Command.__init__(command, pdu)
            if hasattr(command, 'fields'):
                # Fields start right after the one-byte command code
                command.init_from_bytes(pdu, 1)
            return command

        command = SMP_Command(pdu)
        command.name = SMP_Command.command_name(opcode)
        command.code = opcode
        return command

    @staticmethod
    def command_name(code: int) -> str:
        '''Return the display name for a command code.'''
        return name_or_number(SMP_COMMAND_NAMES, code)

    @staticmethod
    def auth_req_str(value: int) -> str:
        '''Render an AuthReq byte as a string of named bit fields.'''
        # The two low bits are the bonding flags; bits 2 to 5 are single flags
        bonding_flags = value & 0b11
        mitm, sc, keypress, ct2 = ((value >> bit) & 1 for bit in range(2, 6))

        return (
            f'bonding_flags={bonding_flags}, '
            f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}'
        )

    @staticmethod
    def io_capability_name(io_capability: int) -> str:
        '''Return the display name for an I/O capability value.'''
        return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability)

    @staticmethod
    def key_distribution_str(value: int) -> str:
        '''Return a comma-separated list of the key types set in `value`.'''
        labelled_flags = (
            (SMP_ENC_KEY_DISTRIBUTION_FLAG, 'ENC'),
            (SMP_ID_KEY_DISTRIBUTION_FLAG, 'ID'),
            (SMP_SIGN_KEY_DISTRIBUTION_FLAG, 'SIGN'),
            (SMP_LINK_KEY_DISTRIBUTION_FLAG, 'LINK'),
        )
        key_types: List[str] = [
            label for flag, label in labelled_flags if value & flag
        ]
        return ','.join(key_types)

    @staticmethod
    def keypress_notification_type_name(notification_type: int) -> str:
        '''Return the display name for a keypress notification type.'''
        return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator factory that declares a concrete SMP command.

        The decorated class gets its `name` (upper-cased class name), its
        `code` (looked up by name in SMP_COMMAND_NAMES) and its `fields`, and
        is registered in `smp_classes`. A KeyError is raised when the name has
        no entry in SMP_COMMAND_NAMES.
        '''

        def register(command_class):
            command_name = command_class.__name__.upper()
            command_class.name = command_name

            command_code = key_with_value(SMP_COMMAND_NAMES, command_name)
            command_class.code = command_code
            if command_code is None:
                raise KeyError(
                    f'Command name {command_name} not found in SMP_COMMAND_NAMES'
                )

            command_class.fields = fields

            # Register a factory for this class
            SMP_Command.smp_classes[command_code] = command_class

            return command_class

        return register

    def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None:
        '''
        Build a command from raw PDU bytes or from field values.

        Field values given as keyword arguments are set on the instance when
        the class declares `fields`. When `pdu` is omitted, it is built from
        the command code followed by the serialized keyword arguments.
        '''
        if kwargs and hasattr(self, 'fields'):
            HCI_Object.init_from_fields(self, self.fields, kwargs)

        if pdu is not None:
            self.pdu = pdu
            return

        self.pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)

    def init_from_bytes(self, pdu: bytes, offset: int) -> None:
        '''Populate the declared fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the PDU bytes of this command.'''
        return self.pdu

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        title = color(self.name, 'yellow')

        fields = getattr(self, 'fields', None)
        if fields:
            return title + (
                ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
            )

        # No declared fields: show the raw PDU when it has more than the code
        if len(self.pdu) > 1:
            return title + f': {self.pdu.hex()}'
        return title
315
316
317# -----------------------------------------------------------------------------
# Field layout: six fields, each declared with size 1, in this order. The
# 'mapper' entries are the functions used to render a field value for display.
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        (
            'initiator_key_distribution',
            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
        ),
        (
            'responder_key_distribution',
            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
        ),
    ]
)
class SMP_Pairing_Request_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request

    Has the same field layout as SMP_Pairing_Response_Command.
    '''

    io_capability: int  # one of the SMP_*_IO_CAPABILITY values
    oob_data_flag: int
    auth_req: int  # bit field, see the SMP_*_AUTHREQ masks
    maximum_encryption_key_size: int
    initiator_key_distribution: int  # SMP_*_KEY_DISTRIBUTION_FLAG bits
    responder_key_distribution: int  # SMP_*_KEY_DISTRIBUTION_FLAG bits
346
347# -----------------------------------------------------------------------------
# Field layout: six fields, each declared with size 1, in this order. The
# 'mapper' entries are the functions used to render a field value for display.
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        (
            'initiator_key_distribution',
            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
        ),
        (
            'responder_key_distribution',
            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
        ),
    ]
)
class SMP_Pairing_Response_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response

    Has the same field layout as SMP_Pairing_Request_Command.
    '''

    io_capability: int  # one of the SMP_*_IO_CAPABILITY values
    oob_data_flag: int
    auth_req: int  # bit field, see the SMP_*_AUTHREQ masks
    maximum_encryption_key_size: int
    initiator_key_distribution: int  # SMP_*_KEY_DISTRIBUTION_FLAG bits
    responder_key_distribution: int  # SMP_*_KEY_DISTRIBUTION_FLAG bits
376
377# -----------------------------------------------------------------------------
@SMP_Command.subclass([('confirm_value', 16)])
class SMP_Pairing_Confirm_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm

    Carries a single field, 'confirm_value', declared with size 16.
    '''

    confirm_value: bytes
386
387# -----------------------------------------------------------------------------
@SMP_Command.subclass([('random_value', 16)])
class SMP_Pairing_Random_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random

    Carries a single field, 'random_value', declared with size 16.
    '''

    random_value: bytes
396
397# -----------------------------------------------------------------------------
@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})])
class SMP_Pairing_Failed_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed

    Carries a single field, 'reason', declared with size 1 and rendered for
    display with error_name.
    '''

    reason: int  # one of the SMP_*_ERROR codes
406
407# -----------------------------------------------------------------------------
@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)])
class SMP_Pairing_Public_Key_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key

    Carries two fields, 'public_key_x' then 'public_key_y', each declared with
    size 32.
    '''

    public_key_x: bytes
    public_key_y: bytes
416
417
418# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('dhkey_check', 16),
    ]
)
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check

    Carries a single field, 'dhkey_check', declared with size 16.
    '''

    dhkey_check: bytes
431
432# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        (
            'notification_type',
            {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name},
        ),
    ]
)
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification

    Carries a single field, 'notification_type', declared with size 1 and
    rendered for display with SMP_Command.keypress_notification_type_name.
    '''

    notification_type: int  # one of the SMP_*_KEYPRESS_NOTIFICATION_TYPE values
448
449# -----------------------------------------------------------------------------
@SMP_Command.subclass([('long_term_key', 16)])
class SMP_Encryption_Information_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information

    Carries a single field, 'long_term_key', declared with size 16.
    '''

    long_term_key: bytes
458
459# -----------------------------------------------------------------------------
@SMP_Command.subclass([('ediv', 2), ('rand', 8)])
class SMP_Master_Identification_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification

    Carries two fields: 'ediv' (declared with size 2) then 'rand' (declared
    with size 8).
    '''

    ediv: int
    rand: bytes
468
469
470# -----------------------------------------------------------------------------
@SMP_Command.subclass([('identity_resolving_key', 16)])
class SMP_Identity_Information_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information

    Carries a single field, 'identity_resolving_key', declared with size 16.
    '''

    identity_resolving_key: bytes
479
480# -----------------------------------------------------------------------------
# Both field specs come from hci.Address rather than being plain sizes.
# NOTE(review): parse_address_preceded_by_type presumably uses the addr_type
# field parsed just before it to build the Address — confirm in hci.Address.
@SMP_Command.subclass(
    [
        ('addr_type', Address.ADDRESS_TYPE_SPEC),
        ('bd_addr', Address.parse_address_preceded_by_type),
    ]
)
class SMP_Identity_Address_Information_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information

    Carries two fields: 'addr_type' then 'bd_addr'.
    '''

    addr_type: int
    bd_addr: Address
495
496# -----------------------------------------------------------------------------
@SMP_Command.subclass([('signature_key', 16)])
class SMP_Signing_Information_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information

    Carries a single field, 'signature_key', declared with size 16.
    '''

    signature_key: bytes
505
506# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
    ]
)
class SMP_Security_Request_Command(SMP_Command):
    '''
    See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request

    Carries a single field, 'auth_req', declared with size 1 and rendered for
    display with SMP_Command.auth_req_str.
    '''

    auth_req: int  # bit field, see the SMP_*_AUTHREQ masks
519
520# -----------------------------------------------------------------------------
def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int:
    """Build an AuthReq value from individual boolean requirements.

    Each truthy argument sets the matching SMP_*_AUTHREQ bit mask in the
    result; falsy arguments contribute nothing.

    Returns:
        The bitwise OR of the masks for all requested requirements (0 when
        none is requested).
    """
    requested_masks = (
        (bonding, SMP_BONDING_AUTHREQ),
        (mitm, SMP_MITM_AUTHREQ),
        (sc, SMP_SC_AUTHREQ),
        (keypress, SMP_KEYPRESS_AUTHREQ),
        (ct2, SMP_CT2_AUTHREQ),
    )

    value = 0
    for requested, mask in requested_masks:
        if requested:
            value |= mask
    return value
534
535
536# -----------------------------------------------------------------------------
class AddressResolver:
    """Resolve an address against a collection of (IRK, address) pairs."""

    def __init__(self, resolving_keys):
        # Iterable of (irk, resolved_address) pairs, stored as given
        self.resolving_keys = resolving_keys

    def resolve(self, address):
        """Return the identity address matching `address`, or None.

        The byte form of `address` is split into a hash (bytes 0 to 2) and a
        prand (bytes 3 to 5). For each known IRK, `crypto.ah(irk, prand)` is
        compared with the hash; the first match wins.

        Returns:
            A new Address built from the matching entry's address string, typed
            as a public identity address when the entry is a public device
            address and as a random identity address otherwise; None when no
            key matches.
        """
        raw_address = bytes(address)
        hash_part, prand = raw_address[0:3], raw_address[3:6]

        for irk, identity_address in self.resolving_keys:
            is_match = crypto.ah(irk, prand) == hash_part
            if not is_match:
                continue

            # Match!
            identity_type = (
                Address.PUBLIC_IDENTITY_ADDRESS
                if identity_address.address_type == Address.PUBLIC_DEVICE_ADDRESS
                else Address.RANDOM_IDENTITY_ADDRESS
            )
            return Address(
                address=str(identity_address), address_type=identity_type
            )

        return None
558
559
560# -----------------------------------------------------------------------------
class PairingMethod(enum.IntEnum):
    """Pairing methods that a Session can select.

    The first three are the values used in Session.PAIRING_METHODS;
    CTKD_OVER_CLASSIC is selected by Session.decide_pairing_method when the
    connection transport is BR/EDR.
    """

    JUST_WORKS = 0
    NUMERIC_COMPARISON = 1
    PASSKEY = 2
    OOB = 3
    CTKD_OVER_CLASSIC = 4
567
568
569# -----------------------------------------------------------------------------
class OobContext:
    """Local cryptographic material for LE SC OOB pairing.

    Holds an ECC key and a random value `r`. Either can be supplied by the
    caller; whichever is omitted is freshly generated.
    """

    ecc_key: crypto.EccKey
    r: bytes

    def __init__(
        self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
    ) -> None:
        if ecc_key is None:
            ecc_key = crypto.EccKey.generate()
        self.ecc_key = ecc_key

        if r is None:
            r = crypto.r()
        self.r = r

    def share(self) -> OobSharedData:
        """Return the data to hand to the peer out of band.

        The confirmation value is `crypto.f4` applied to the byte-reversed X
        coordinate of the local key (passed as both key arguments), the random
        value `r`, and a single zero byte.
        """
        local_pkx = self.ecc_key.x[::-1]
        confirmation = crypto.f4(local_pkx, local_pkx, self.r, bytes(1))
        return OobSharedData(c=confirmation, r=self.r)
585
586
587# -----------------------------------------------------------------------------
class OobLegacyContext:
    """Local cryptographic material for LE Legacy OOB pairing.

    Holds the temporary key `tk`, generated with `crypto.r()` when the caller
    does not supply one.
    """

    tk: bytes

    def __init__(self, tk: Optional[bytes] = None) -> None:
        if tk is None:
            tk = crypto.r()
        self.tk = tk
595
596
597# -----------------------------------------------------------------------------
@dataclass
class OobSharedData:
    """Confirmation and random values exchanged out of band for LE SC pairing."""

    c: bytes  # confirmation value
    r: bytes  # random value

    def to_ad(self) -> AdvertisingData:
        """Return the two values packaged as advertising data structures."""
        confirmation_entry = (
            AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE,
            self.c,
        )
        random_entry = (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r)
        return AdvertisingData([confirmation_entry, random_entry])

    def __str__(self) -> str:
        c_hex = self.c.hex()
        r_hex = self.r.hex()
        return f'OOB(C={c_hex}, R={r_hex})'
615
616
617# -----------------------------------------------------------------------------
618class Session:
    # I/O Capability to pairing method decision matrix
    #
    # See Bluetooth spec @ Vol 3, Part H - Table 2.8: Mapping of IO Capabilities to Key
    # Generation Method
    #
    # Map: initiator -> responder -> <method>
    # where <method> may be a simple entry or a 2-element tuple, with the first element
    # for legacy pairing and the second for secure connections, when the two are
    # different. Each entry is either a method name, or, for PASSKEY, a 3-element
    # tuple:
    # (method, initiator_displays, responder_displays)
    # to specify if the initiator and responder should display (True) or input a code
    # (False).
    #
    # decide_pairing_method tells the two tuple shapes apart by length: a tuple of
    # length 2 is a (legacy, secure connections) pair, anything else is a single
    # entry.
    PAIRING_METHODS = {
        SMP_DISPLAY_ONLY_IO_CAPABILITY: {
            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
        },
        SMP_DISPLAY_YES_NO_IO_CAPABILITY: {
            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
                PairingMethod.JUST_WORKS,
                PairingMethod.NUMERIC_COMPARISON,
            ),
            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
                (PairingMethod.PASSKEY, True, False),
                PairingMethod.NUMERIC_COMPARISON,
            ),
        },
        SMP_KEYBOARD_ONLY_IO_CAPABILITY: {
            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, False),
            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
        },
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
        },
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: {
            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
                (PairingMethod.PASSKEY, False, True),
                PairingMethod.NUMERIC_COMPARISON,
            ),
            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
                (PairingMethod.PASSKEY, True, False),
                PairingMethod.NUMERIC_COMPARISON,
            ),
        },
    }

    # Attributes declared for type checking only; no value is assigned here.
    # Of these, __init__ assigns only `tk` (and not on every path).
    ea: bytes
    eb: bytes
    ltk: bytes
    preq: bytes
    pres: bytes
    tk: bytes
687
    def __init__(
        self,
        manager: Manager,
        connection: Connection,
        pairing_config: PairingConfig,
        is_initiator: bool,
    ) -> None:
        """Set up the pairing state for one connection.

        Args:
            manager: the owning Manager. Its `ecc_key` is used unless an OOB
                secure connections context supplies one.
            connection: the connection being paired. The session subscribes to
                its 'disconnection', 'connection_encryption_change' and
                'connection_encryption_key_refresh' events.
            pairing_config: source of the bonding/sc/mitm flags, of the delegate
                (I/O capability and default key distributions) and of the
                optional OOB data.
            is_initiator: True when the local side initiates the pairing.

        Raises:
            InvalidArgumentError: if OOB is configured without the context the
                selected mode needs (`our_context` when `sc` is set,
                `legacy_context` otherwise).

        When `is_initiator` is True, a running asyncio event loop is required,
        because the result future is created from it.
        """
        self.manager = manager
        self.connection = connection
        self.stk = None
        self.ltk_ediv = 0
        self.ltk_rand = bytes(8)
        self.link_key: Optional[bytes] = None
        # Placeholders; overwritten below with the delegate's default values
        self.initiator_key_distribution: int = 0
        self.responder_key_distribution: int = 0
        self.peer_random_value: Optional[bytes] = None
        self.peer_public_key_x: bytes = bytes(32)
        self.peer_public_key_y = bytes(32)
        self.peer_ltk = None
        self.peer_ediv = None
        self.peer_rand: Optional[bytes] = None
        self.peer_identity_resolving_key = None
        self.peer_bd_addr: Optional[Address] = None
        self.peer_signature_key = None
        self.peer_expected_distributions: List[Type[SMP_Command]] = []
        self.dh_key = b''
        self.confirm_value = None
        self.passkey: Optional[int] = None
        self.passkey_ready = asyncio.Event()
        self.passkey_step = 0
        self.passkey_display = False
        self.pairing_method: PairingMethod = PairingMethod.JUST_WORKS
        self.pairing_config = pairing_config
        self.wait_before_continuing: Optional[asyncio.Future[None]] = None
        self.completed = False
        self.ctkd_task: Optional[Awaitable[None]] = None

        # Decide if we're the initiator or the responder
        self.is_initiator = is_initiator
        self.is_responder = not self.is_initiator

        # Listen for connection events
        connection.on('disconnection', self.on_disconnection)
        connection.on(
            'connection_encryption_change', self.on_connection_encryption_change
        )
        connection.on(
            'connection_encryption_key_refresh',
            self.on_connection_encryption_key_refresh,
        )

        # Create a future that can be used to wait for the session to complete
        # (only the initiator gets one; it stays None for the responder)
        if self.is_initiator:
            self.pairing_result: Optional[asyncio.Future[None]] = (
                asyncio.get_running_loop().create_future()
            )
        else:
            self.pairing_result = None

        # Key Distribution (default values before negotiation)
        self.initiator_key_distribution = (
            pairing_config.delegate.local_initiator_key_distribution
        )
        self.responder_key_distribution = (
            pairing_config.delegate.local_responder_key_distribution
        )

        # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
        self.bonding: bool = pairing_config.bonding
        self.sc: bool = pairing_config.sc
        self.mitm: bool = pairing_config.mitm
        self.keypress = False
        self.ct2: bool = False

        # I/O Capabilities
        self.io_capability = pairing_config.delegate.io_capability
        self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY

        # OOB
        # The flag is 1 only when OOB is configured and peer data is present
        self.oob_data_flag = (
            1 if pairing_config.oob and pairing_config.oob.peer_data else 0
        )

        # Set up addresses
        # The resolvable address is preferred over the plain one when it is set
        self_address = connection.self_resolvable_address or connection.self_address
        peer_address = connection.peer_resolvable_address or connection.peer_address
        logger.debug(
            f"pairing with self_address={self_address}, peer_address={peer_address}"
        )
        # ia/iat are the initiator's address and type, ra/rat the responder's;
        # the type is 1 for a random address and 0 otherwise
        if self.is_initiator:
            self.ia = bytes(self_address)
            self.iat = 1 if self_address.is_random else 0
            self.ra = bytes(peer_address)
            self.rat = 1 if peer_address.is_random else 0
        else:
            self.ra = bytes(self_address)
            self.rat = 1 if self_address.is_random else 0
            self.ia = bytes(peer_address)
            self.iat = 1 if peer_address.is_random else 0

        # Select the ECC key, TK and r initial value
        if pairing_config.oob:
            self.peer_oob_data = pairing_config.oob.peer_data
            if pairing_config.sc:
                if pairing_config.oob.our_context is None:
                    raise InvalidArgumentError(
                        "oob pairing config requires a context when sc is True"
                    )
                self.r = pairing_config.oob.our_context.r
                self.ecc_key = pairing_config.oob.our_context.ecc_key
                # NOTE(review): when no legacy context is given, self.tk is left
                # unassigned on this path, so a later read of self.tk would
                # raise AttributeError — confirm that this cannot happen.
                if pairing_config.oob.legacy_context is not None:
                    self.tk = pairing_config.oob.legacy_context.tk
            else:
                if pairing_config.oob.legacy_context is None:
                    raise InvalidArgumentError(
                        "oob pairing config requires a legacy context when sc is False"
                    )
                self.r = bytes(16)
                self.ecc_key = manager.ecc_key
                self.tk = pairing_config.oob.legacy_context.tk
        else:
            # No OOB: all-zero r and TK, and the manager's ECC key
            self.peer_oob_data = None
            self.r = bytes(16)
            self.ecc_key = manager.ecc_key
            self.tk = bytes(16)
813
814    @property
815    def pkx(self) -> Tuple[bytes, bytes]:
816        return (self.ecc_key.x[::-1], self.peer_public_key_x)
817
818    @property
819    def pka(self) -> bytes:
820        return self.pkx[0 if self.is_initiator else 1]
821
822    @property
823    def pkb(self) -> bytes:
824        return self.pkx[0 if self.is_responder else 1]
825
826    @property
827    def nx(self) -> Tuple[bytes, bytes]:
828        assert self.peer_random_value
829        return (self.r, self.peer_random_value)
830
831    @property
832    def na(self) -> bytes:
833        return self.nx[0 if self.is_initiator else 1]
834
835    @property
836    def nb(self) -> bytes:
837        return self.nx[0 if self.is_responder else 1]
838
839    @property
840    def auth_req(self) -> int:
841        return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2)
842
843    def get_long_term_key(self, rand: bytes, ediv: int) -> Optional[bytes]:
844        if not self.sc and not self.completed:
845            if rand == self.ltk_rand and ediv == self.ltk_ediv:
846                return self.stk
847        else:
848            return self.ltk
849
850        return None
851
852    def decide_pairing_method(
853        self,
854        auth_req: int,
855        initiator_io_capability: int,
856        responder_io_capability: int,
857    ) -> None:
858        if self.connection.transport == BT_BR_EDR_TRANSPORT:
859            self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
860            return
861        if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0):
862            self.pairing_method = PairingMethod.JUST_WORKS
863            return
864
865        details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability]  # type: ignore[index]
866        if isinstance(details, tuple) and len(details) == 2:
867            # One entry for legacy pairing and one for secure connections
868            details = details[1 if self.sc else 0]
869        if isinstance(details, PairingMethod):
870            # Just a method ID
871            self.pairing_method = details
872        else:
873            # PASSKEY method, with a method ID and display/input flags
874            assert isinstance(details[0], PairingMethod)
875            self.pairing_method = details[0]
876            self.passkey_display = details[1 if self.is_initiator else 2]
877
878    def check_expected_value(
879        self, expected: bytes, received: bytes, error: int
880    ) -> bool:
881        logger.debug(f'expected={expected.hex()} got={received.hex()}')
882        if expected != received:
883            logger.info(color('pairing confirm/check mismatch', 'red'))
884            self.send_pairing_failed(error)
885            return False
886        return True
887
888    def prompt_user_for_confirmation(self, next_steps: Callable[[], None]) -> None:
889        async def prompt() -> None:
890            logger.debug('ask for confirmation')
891            try:
892                response = await self.pairing_config.delegate.confirm()
893                if response:
894                    next_steps()
895                    return
896            except Exception as error:
897                logger.warning(f'exception while confirm: {error}')
898
899            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
900
901        self.connection.abort_on('disconnection', prompt())
902
903    def prompt_user_for_numeric_comparison(
904        self, code: int, next_steps: Callable[[], None]
905    ) -> None:
906        async def prompt() -> None:
907            logger.debug(f'verification code: {code}')
908            try:
909                response = await self.pairing_config.delegate.compare_numbers(
910                    code, digits=6
911                )
912                if response:
913                    next_steps()
914                    return
915            except Exception as error:
916                logger.warning(f'exception while prompting: {error}')
917
918            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
919
920        self.connection.abort_on('disconnection', prompt())
921
922    def prompt_user_for_number(self, next_steps: Callable[[int], None]) -> None:
923        async def prompt() -> None:
924            logger.debug('prompting user for passkey')
925            try:
926                passkey = await self.pairing_config.delegate.get_number()
927                if passkey is None:
928                    logger.debug('Passkey request rejected')
929                    self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)
930                    return
931                logger.debug(f'user input: {passkey}')
932                next_steps(passkey)
933            except Exception as error:
934                logger.warning(f'exception while prompting: {error}')
935                self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)
936
937        self.connection.abort_on('disconnection', prompt())
938
939    def display_passkey(self) -> None:
940        # Generate random Passkey/PIN code
941        self.passkey = secrets.randbelow(1000000)
942        assert self.passkey is not None
943        logger.debug(f'Pairing PIN CODE: {self.passkey:06}')
944        self.passkey_ready.set()
945
946        # The value of TK is computed from the PIN code
947        if not self.sc:
948            self.tk = self.passkey.to_bytes(16, byteorder='little')
949            logger.debug(f'TK from passkey = {self.tk.hex()}')
950
951        try:
952            self.connection.abort_on(
953                'disconnection',
954                self.pairing_config.delegate.display_number(self.passkey, digits=6),
955            )
956        except Exception as error:
957            logger.warning(f'exception while displaying number: {error}')
958
959    def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
960        # Prompt the user for the passkey displayed on the peer
961        def after_input(passkey: int) -> None:
962            self.passkey = passkey
963
964            if not self.sc:
965                self.tk = passkey.to_bytes(16, byteorder='little')
966                logger.debug(f'TK from passkey = {self.tk.hex()}')
967
968            self.passkey_ready.set()
969
970            if next_steps is not None:
971                next_steps()
972
973        self.prompt_user_for_number(after_input)
974
975    def display_or_input_passkey(
976        self, next_steps: Optional[Callable[[], None]] = None
977    ) -> None:
978        if self.passkey_display:
979            self.display_passkey()
980            if next_steps is not None:
981                next_steps()
982        else:
983            self.input_passkey(next_steps)
984
985    def send_command(self, command: SMP_Command) -> None:
986        self.manager.send_command(self.connection, command)
987
988    def send_pairing_failed(self, error: int) -> None:
989        self.send_command(SMP_Pairing_Failed_Command(reason=error))
990        self.on_pairing_failure(error)
991
992    def send_pairing_request_command(self) -> None:
993        self.manager.on_session_start(self)
994
995        command = SMP_Pairing_Request_Command(
996            io_capability=self.io_capability,
997            oob_data_flag=self.oob_data_flag,
998            auth_req=self.auth_req,
999            maximum_encryption_key_size=16,
1000            initiator_key_distribution=self.initiator_key_distribution,
1001            responder_key_distribution=self.responder_key_distribution,
1002        )
1003        self.preq = bytes(command)
1004        self.send_command(command)
1005
1006    def send_pairing_response_command(self) -> None:
1007        response = SMP_Pairing_Response_Command(
1008            io_capability=self.io_capability,
1009            oob_data_flag=self.oob_data_flag,
1010            auth_req=self.auth_req,
1011            maximum_encryption_key_size=16,
1012            initiator_key_distribution=self.initiator_key_distribution,
1013            responder_key_distribution=self.responder_key_distribution,
1014        )
1015        self.pres = bytes(response)
1016        self.send_command(response)
1017
1018    def send_pairing_confirm_command(self) -> None:
1019
1020        if self.pairing_method != PairingMethod.OOB:
1021            self.r = crypto.r()
1022            logger.debug(f'generated random: {self.r.hex()}')
1023
1024        if self.sc:
1025
1026            async def next_steps() -> None:
1027                if self.pairing_method in (
1028                    PairingMethod.JUST_WORKS,
1029                    PairingMethod.NUMERIC_COMPARISON,
1030                ):
1031                    z = 0
1032                elif self.pairing_method == PairingMethod.PASSKEY:
1033                    # We need a passkey
1034                    await self.passkey_ready.wait()
1035                    assert self.passkey
1036
1037                    z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
1038                else:
1039                    return
1040
1041                if self.is_initiator:
1042                    confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z]))
1043                else:
1044                    confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z]))
1045
1046                self.send_command(
1047                    SMP_Pairing_Confirm_Command(confirm_value=confirm_value)
1048                )
1049
1050            # Perform the next steps asynchronously in case we need to wait for input
1051            self.connection.abort_on('disconnection', next_steps())
1052        else:
1053            confirm_value = crypto.c1(
1054                self.tk,
1055                self.r,
1056                self.preq,
1057                self.pres,
1058                self.iat,
1059                self.rat,
1060                self.ia,
1061                self.ra,
1062            )
1063
1064            self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
1065
1066    def send_pairing_random_command(self) -> None:
1067        self.send_command(SMP_Pairing_Random_Command(random_value=self.r))
1068
1069    def send_public_key_command(self) -> None:
1070        self.send_command(
1071            SMP_Pairing_Public_Key_Command(
1072                public_key_x=self.ecc_key.x[::-1],
1073                public_key_y=self.ecc_key.y[::-1],
1074            )
1075        )
1076
1077    def send_pairing_dhkey_check_command(self) -> None:
1078        self.send_command(
1079            SMP_Pairing_DHKey_Check_Command(
1080                dhkey_check=self.ea if self.is_initiator else self.eb
1081            )
1082        )
1083
1084    def send_identity_address_command(self) -> None:
1085        if self.pairing_config.identity_address_type == Address.PUBLIC_DEVICE_ADDRESS:
1086            identity_address = self.manager.device.public_address
1087        elif self.pairing_config.identity_address_type == Address.RANDOM_DEVICE_ADDRESS:
1088            identity_address = self.manager.device.static_address
1089        else:
1090            # No identity address type set. If the controller has a public address, it
1091            # will be more responsible to be the identity address.
1092            if self.manager.device.public_address != Address.ANY:
1093                logger.debug("No identity address type set, using PUBLIC")
1094                identity_address = self.manager.device.public_address
1095            else:
1096                logger.debug("No identity address type set, using RANDOM")
1097                identity_address = self.manager.device.static_address
1098        self.send_command(
1099            SMP_Identity_Address_Information_Command(
1100                addr_type=identity_address.address_type,
1101                bd_addr=identity_address,
1102            )
1103        )
1104
1105    def start_encryption(self, key: bytes) -> None:
1106        # We can now encrypt the connection with the short term key, so that we can
1107        # distribute the long term and/or other keys over an encrypted connection
1108        self.manager.device.host.send_command_sync(
1109            HCI_LE_Enable_Encryption_Command(
1110                connection_handle=self.connection.handle,
1111                random_number=bytes(8),
1112                encrypted_diversifier=0,
1113                long_term_key=key,
1114            )
1115        )
1116
1117    @classmethod
1118    def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
1119        '''Derives Long Term Key from Link Key.
1120
1121        Args:
1122            link_key: BR/EDR Link Key bytes in little-endian.
1123            ct2: whether ct2 is supported on both devices.
1124        Returns:
1125            LE Long Tern Key bytes in little-endian.
1126        '''
1127        ilk = (
1128            crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
1129            if ct2
1130            else crypto.h6(link_key, b'tmp2')
1131        )
1132        return crypto.h6(ilk, b'brle')
1133
1134    @classmethod
1135    def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
1136        '''Derives Link Key from Long Term Key.
1137
1138        Args:
1139            ltk: LE Long Term Key bytes in little-endian.
1140            ct2: whether ct2 is supported on both devices.
1141        Returns:
1142            BR/EDR Link Key bytes in little-endian.
1143        '''
1144        ilk = (
1145            crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
1146            if ct2
1147            else crypto.h6(ltk, b'tmp1')
1148        )
1149        return crypto.h6(ilk, b'lebr')
1150
1151    async def get_link_key_and_derive_ltk(self) -> None:
1152        '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
1153        self.link_key = await self.manager.device.get_link_key(
1154            self.connection.peer_address
1155        )
1156        if self.link_key is None:
1157            logging.warning(
1158                'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
1159            )
1160            self.send_pairing_failed(
1161                SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
1162            )
1163        else:
1164            self.ltk = self.derive_ltk(self.link_key, self.ct2)
1165
1166    def distribute_keys(self) -> None:
1167        # Distribute the keys as required
1168        if self.is_initiator:
1169            # CTKD: Derive LTK from LinkKey
1170            if (
1171                self.connection.transport == BT_BR_EDR_TRANSPORT
1172                and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1173            ):
1174                self.ctkd_task = self.connection.abort_on(
1175                    'disconnection', self.get_link_key_and_derive_ltk()
1176                )
1177            elif not self.sc:
1178                # Distribute the LTK, EDIV and RAND
1179                if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1180                    self.send_command(
1181                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1182                    )
1183                    self.send_command(
1184                        SMP_Master_Identification_Command(
1185                            ediv=self.ltk_ediv, rand=self.ltk_rand
1186                        )
1187                    )
1188
1189            # Distribute IRK & BD ADDR
1190            if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1191                self.send_command(
1192                    SMP_Identity_Information_Command(
1193                        identity_resolving_key=self.manager.device.irk
1194                    )
1195                )
1196                self.send_identity_address_command()
1197
1198            # Distribute CSRK
1199            csrk = bytes(16)  # FIXME: testing
1200            if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1201                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1202
1203            # CTKD, calculate BR/EDR link key
1204            if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1205                self.link_key = self.derive_link_key(self.ltk, self.ct2)
1206
1207        else:
1208            # CTKD: Derive LTK from LinkKey
1209            if (
1210                self.connection.transport == BT_BR_EDR_TRANSPORT
1211                and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1212            ):
1213                self.ctkd_task = self.connection.abort_on(
1214                    'disconnection', self.get_link_key_and_derive_ltk()
1215                )
1216            # Distribute the LTK, EDIV and RAND
1217            elif not self.sc:
1218                if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1219                    self.send_command(
1220                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1221                    )
1222                    self.send_command(
1223                        SMP_Master_Identification_Command(
1224                            ediv=self.ltk_ediv, rand=self.ltk_rand
1225                        )
1226                    )
1227
1228            # Distribute IRK & BD ADDR
1229            if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1230                self.send_command(
1231                    SMP_Identity_Information_Command(
1232                        identity_resolving_key=self.manager.device.irk
1233                    )
1234                )
1235                self.send_identity_address_command()
1236
1237            # Distribute CSRK
1238            csrk = bytes(16)  # FIXME: testing
1239            if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1240                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1241
1242            # CTKD, calculate BR/EDR link key
1243            if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1244                self.link_key = self.derive_link_key(self.ltk, self.ct2)
1245
1246    def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
1247        # Set our expectations for what to wait for in the key distribution phase
1248        self.peer_expected_distributions = []
1249        if not self.sc and self.connection.transport == BT_LE_TRANSPORT:
1250            if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0:
1251                self.peer_expected_distributions.append(
1252                    SMP_Encryption_Information_Command
1253                )
1254                self.peer_expected_distributions.append(
1255                    SMP_Master_Identification_Command
1256                )
1257        if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0:
1258            self.peer_expected_distributions.append(SMP_Identity_Information_Command)
1259            self.peer_expected_distributions.append(
1260                SMP_Identity_Address_Information_Command
1261            )
1262        if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0:
1263            self.peer_expected_distributions.append(SMP_Signing_Information_Command)
1264        logger.debug(
1265            'expecting distributions: '
1266            f'{[c.__name__ for c in self.peer_expected_distributions]}'
1267        )
1268
1269    def check_key_distribution(self, command_class: Type[SMP_Command]) -> None:
1270        # First, check that the connection is encrypted
1271        if not self.connection.is_encrypted:
1272            logger.warning(
1273                color('received key distribution on a non-encrypted connection', 'red')
1274            )
1275            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1276            return
1277
1278        # Check that this command class is expected
1279        if command_class in self.peer_expected_distributions:
1280            self.peer_expected_distributions.remove(command_class)
1281            logger.debug(
1282                'remaining distributions: '
1283                f'{[c.__name__ for c in self.peer_expected_distributions]}'
1284            )
1285            if not self.peer_expected_distributions:
1286                self.on_peer_key_distribution_complete()
1287        else:
1288            logger.warning(
1289                color(
1290                    '!!! unexpected key distribution command: '
1291                    f'{command_class.__name__}',
1292                    'red',
1293                )
1294            )
1295            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1296
1297    async def pair(self) -> None:
1298        # Start pairing as an initiator
1299        # TODO: check that this session isn't already active
1300
1301        # Send the pairing request to start the process
1302        self.send_pairing_request_command()
1303
1304        # Wait for the pairing process to finish
1305        assert self.pairing_result
1306        await self.connection.abort_on('disconnection', self.pairing_result)
1307
1308    def on_disconnection(self, _: int) -> None:
1309        self.connection.remove_listener('disconnection', self.on_disconnection)
1310        self.connection.remove_listener(
1311            'connection_encryption_change', self.on_connection_encryption_change
1312        )
1313        self.connection.remove_listener(
1314            'connection_encryption_key_refresh',
1315            self.on_connection_encryption_key_refresh,
1316        )
1317        self.manager.on_session_end(self)
1318
1319    def on_peer_key_distribution_complete(self) -> None:
1320        # The initiator can now send its keys
1321        if self.is_initiator:
1322            self.distribute_keys()
1323
1324        self.connection.abort_on('disconnection', self.on_pairing())
1325
1326    def on_connection_encryption_change(self) -> None:
1327        if self.connection.is_encrypted:
1328            if self.is_responder:
1329                # The responder distributes its keys first, the initiator later
1330                self.distribute_keys()
1331
1332            # If we're not expecting key distributions from the peer, we're done
1333            if not self.peer_expected_distributions:
1334                self.on_peer_key_distribution_complete()
1335
1336    def on_connection_encryption_key_refresh(self) -> None:
1337        # Do as if the connection had just been encrypted
1338        self.on_connection_encryption_change()
1339
1340    async def on_pairing(self) -> None:
1341        logger.debug('pairing complete')
1342
1343        if self.completed:
1344            return
1345
1346        self.completed = True
1347
1348        if self.pairing_result is not None and not self.pairing_result.done():
1349            self.pairing_result.set_result(None)
1350
1351        # Use the peer address from the pairing protocol or the connection
1352        if self.peer_bd_addr is not None:
1353            peer_address = self.peer_bd_addr
1354        else:
1355            peer_address = self.connection.peer_address
1356
1357        # Wait for link key fetch and key derivation
1358        if self.ctkd_task is not None:
1359            await self.ctkd_task
1360            self.ctkd_task = None
1361
1362        # Create an object to hold the keys
1363        keys = PairingKeys()
1364        keys.address_type = peer_address.address_type
1365        authenticated = self.pairing_method != PairingMethod.JUST_WORKS
1366        if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT:
1367            keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated)
1368        else:
1369            our_ltk_key = PairingKeys.Key(
1370                value=self.ltk,
1371                authenticated=authenticated,
1372                ediv=self.ltk_ediv,
1373                rand=self.ltk_rand,
1374            )
1375            peer_ltk_key = PairingKeys.Key(
1376                value=self.peer_ltk,
1377                authenticated=authenticated,
1378                ediv=self.peer_ediv,
1379                rand=self.peer_rand,
1380            )
1381            if self.is_initiator:
1382                keys.ltk_central = peer_ltk_key
1383                keys.ltk_peripheral = our_ltk_key
1384            else:
1385                keys.ltk_central = our_ltk_key
1386                keys.ltk_peripheral = peer_ltk_key
1387        if self.peer_identity_resolving_key is not None:
1388            keys.irk = PairingKeys.Key(
1389                value=self.peer_identity_resolving_key, authenticated=authenticated
1390            )
1391        if self.peer_signature_key is not None:
1392            keys.csrk = PairingKeys.Key(
1393                value=self.peer_signature_key, authenticated=authenticated
1394            )
1395        if self.link_key is not None:
1396            keys.link_key = PairingKeys.Key(
1397                value=self.link_key, authenticated=authenticated
1398            )
1399        await self.manager.on_pairing(self, peer_address, keys)
1400
1401    def on_pairing_failure(self, reason: int) -> None:
1402        logger.warning(f'pairing failure ({error_name(reason)})')
1403
1404        if self.completed:
1405            return
1406
1407        self.completed = True
1408
1409        error = ProtocolError(reason, 'smp', error_name(reason))
1410        if self.pairing_result is not None and not self.pairing_result.done():
1411            self.pairing_result.set_exception(error)
1412        self.manager.on_pairing_failure(self, reason)
1413
1414    def on_smp_command(self, command: SMP_Command) -> None:
1415        # Find the handler method
1416        handler_name = f'on_{command.name.lower()}'
1417        handler = getattr(self, handler_name, None)
1418        if handler is not None:
1419            try:
1420                handler(command)
1421            except Exception as error:
1422                logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
1423                response = SMP_Pairing_Failed_Command(
1424                    reason=SMP_UNSPECIFIED_REASON_ERROR
1425                )
1426                self.send_command(response)
1427        else:
1428            logger.error(color('SMP command not handled???', 'red'))
1429
1430    def on_smp_pairing_request_command(
1431        self, command: SMP_Pairing_Request_Command
1432    ) -> None:
1433        self.connection.abort_on(
1434            'disconnection', self.on_smp_pairing_request_command_async(command)
1435        )
1436
    async def on_smp_pairing_request_command_async(
        self, command: SMP_Pairing_Request_Command
    ) -> None:
        '''Process a Pairing Request received from the peer.

        The delegate is first asked whether to accept the request. If so, the
        request is saved, the bonding/SC/CT2 flags are combined with the peer's,
        the pairing method and key distribution are decided, and a Pairing
        Response is sent.

        Args:
            command: the received Pairing Request.
        '''
        # Check if the request should proceed
        # (an exception raised by the delegate is treated as a rejection)
        try:
            accepted = await self.pairing_config.delegate.accept()
        except Exception as error:
            logger.warning(f'exception while accepting: {error}')
            accepted = False
        if not accepted:
            logger.debug('pairing rejected by delegate')
            self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
            return

        # Save the request
        self.preq = bytes(command)

        # Bonding and SC require both sides to request/support it
        # (the same rule is applied to CT2)
        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
        self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)

        # Infer the pairing method
        # With SC, OOB is used when either side has OOB data; with legacy
        # pairing, only when both sides have it.
        if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
            not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
        ):
            # Use OOB
            self.pairing_method = PairingMethod.OOB
            if not self.sc and self.tk is None:
                # For legacy OOB, TK is required.
                logger.warning("legacy OOB without TK")
                self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
                return
            if command.oob_data_flag == 0:
                # The peer doesn't have OOB data, use r=0
                self.r = bytes(16)
        else:
            # Decide which pairing method to use from the IO capability
            # (the peer is the initiator here, we are the responder)
            self.decide_pairing_method(
                command.auth_req,
                command.io_capability,
                self.io_capability,
            )

        logger.debug(f'pairing method: {self.pairing_method.name}')

        # Key distribution
        # The delegate returns the (initiator, responder) flags to use, given
        # the flags from the request.
        (
            self.initiator_key_distribution,
            self.responder_key_distribution,
        ) = await self.pairing_config.delegate.key_distribution_response(
            command.initiator_key_distribution, command.responder_key_distribution
        )
        # The peer is the initiator, so its keys follow the initiator flags
        self.compute_peer_expected_distributions(self.initiator_key_distribution)

        # The pairing is now starting
        self.manager.on_session_start(self)

        # Display a passkey if we need to
        if not self.sc:
            if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display:
                self.display_passkey()

        # Respond
        self.send_pairing_response_command()

        # Vol 3, Part C, 5.2.2.1.3
        # CTKD over BR/EDR should happen after the connection has been encrypted,
        # so when receiving pairing requests, responder should start distributing keys
        # NOTE: `accepted` is always truthy here, since the rejected case
        # returned earlier.
        if (
            self.connection.transport == BT_BR_EDR_TRANSPORT
            and self.connection.is_encrypted
            and self.is_responder
            and accepted
        ):
            self.distribute_keys()
1513
1514    def on_smp_pairing_response_command(
1515        self, command: SMP_Pairing_Response_Command
1516    ) -> None:
1517        if self.is_responder:
1518            logger.warning(color('received pairing response as a responder', 'red'))
1519            return
1520
1521        # Save the response
1522        self.pres = bytes(command)
1523        self.peer_io_capability = command.io_capability
1524
1525        # Bonding and SC require both sides to request/support it
1526        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
1527        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
1528
1529        # Infer the pairing method
1530        if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
1531            not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
1532        ):
1533            # Use OOB
1534            self.pairing_method = PairingMethod.OOB
1535            if not self.sc and self.tk is None:
1536                # For legacy OOB, TK is required.
1537                logger.warning("legacy OOB without TK")
1538                self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
1539                return
1540            if command.oob_data_flag == 0:
1541                # The peer doesn't have OOB data, use r=0
1542                self.r = bytes(16)
1543        else:
1544            # Decide which pairing method to use from the IO capability
1545            self.decide_pairing_method(
1546                command.auth_req, self.io_capability, command.io_capability
1547            )
1548
1549        logger.debug(f'pairing method: {self.pairing_method.name}')
1550
1551        # Key distribution
1552        if (
1553            command.initiator_key_distribution & ~self.initiator_key_distribution != 0
1554        ) or (
1555            command.responder_key_distribution & ~self.responder_key_distribution != 0
1556        ):
1557            # The response isn't a subset of the request
1558            self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR)
1559            return
1560        self.initiator_key_distribution = command.initiator_key_distribution
1561        self.responder_key_distribution = command.responder_key_distribution
1562        self.compute_peer_expected_distributions(self.responder_key_distribution)
1563
1564        # Start phase 2
1565        if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
1566            # Authentication is already done in SMP, so remote shall start keys distribution immediately
1567            return
1568        elif self.sc:
1569            if self.pairing_method == PairingMethod.PASSKEY:
1570                self.display_or_input_passkey()
1571
1572            self.send_public_key_command()
1573        else:
1574            if self.pairing_method == PairingMethod.PASSKEY:
1575                self.display_or_input_passkey(self.send_pairing_confirm_command)
1576            else:
1577                self.send_pairing_confirm_command()
1578
1579    def on_smp_pairing_confirm_command_legacy(
1580        self, _: SMP_Pairing_Confirm_Command
1581    ) -> None:
1582        if self.is_initiator:
1583            self.send_pairing_random_command()
1584        else:
1585            # If the method is PASSKEY, now is the time to input the code
1586            if (
1587                self.pairing_method == PairingMethod.PASSKEY
1588                and not self.passkey_display
1589            ):
1590                self.input_passkey(self.send_pairing_confirm_command)
1591            else:
1592                self.send_pairing_confirm_command()
1593
1594    def on_smp_pairing_confirm_command_secure_connections(
1595        self, _: SMP_Pairing_Confirm_Command
1596    ) -> None:
1597        if self.pairing_method in (
1598            PairingMethod.JUST_WORKS,
1599            PairingMethod.NUMERIC_COMPARISON,
1600        ):
1601            if self.is_initiator:
1602                self.r = crypto.r()
1603                self.send_pairing_random_command()
1604        elif self.pairing_method == PairingMethod.PASSKEY:
1605            if self.is_initiator:
1606                self.send_pairing_random_command()
1607            else:
1608                self.send_pairing_confirm_command()
1609
1610    def on_smp_pairing_confirm_command(
1611        self, command: SMP_Pairing_Confirm_Command
1612    ) -> None:
1613        self.confirm_value = command.confirm_value
1614        if self.sc:
1615            self.on_smp_pairing_confirm_command_secure_connections(command)
1616        else:
1617            self.on_smp_pairing_confirm_command_legacy(command)
1618
1619    def on_smp_pairing_random_command_legacy(
1620        self, command: SMP_Pairing_Random_Command
1621    ) -> None:
1622        # Check that the confirmation values match
1623        confirm_verifier = crypto.c1(
1624            self.tk,
1625            command.random_value,
1626            self.preq,
1627            self.pres,
1628            self.iat,
1629            self.rat,
1630            self.ia,
1631            self.ra,
1632        )
1633        assert self.confirm_value
1634        if not self.check_expected_value(
1635            self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1636        ):
1637            return
1638
1639        # Compute STK
1640        if self.is_initiator:
1641            mrand = self.r
1642            srand = command.random_value
1643        else:
1644            srand = self.r
1645            mrand = command.random_value
1646        self.stk = crypto.s1(self.tk, srand, mrand)
1647        logger.debug(f'STK = {self.stk.hex()}')
1648
1649        # Generate LTK
1650        self.ltk = crypto.r()
1651
1652        if self.is_initiator:
1653            self.start_encryption(self.stk)
1654        else:
1655            self.send_pairing_random_command()
1656
1657    def on_smp_pairing_random_command_secure_connections(
1658        self, command: SMP_Pairing_Random_Command
1659    ) -> None:
1660        if self.pairing_method == PairingMethod.PASSKEY and self.passkey is None:
1661            logger.warning('no passkey entered, ignoring command')
1662            return
1663
1664        # pylint: disable=too-many-return-statements
1665        if self.is_initiator:
1666            if self.pairing_method in (
1667                PairingMethod.JUST_WORKS,
1668                PairingMethod.NUMERIC_COMPARISON,
1669            ):
1670                assert self.confirm_value
1671                # Check that the random value matches what was committed to earlier
1672                confirm_verifier = crypto.f4(
1673                    self.pkb, self.pka, command.random_value, bytes([0])
1674                )
1675                if not self.check_expected_value(
1676                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1677                ):
1678                    return
1679            elif self.pairing_method == PairingMethod.PASSKEY:
1680                assert self.passkey and self.confirm_value
1681                # Check that the random value matches what was committed to earlier
1682                confirm_verifier = crypto.f4(
1683                    self.pkb,
1684                    self.pka,
1685                    command.random_value,
1686                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
1687                )
1688                if not self.check_expected_value(
1689                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1690                ):
1691                    return
1692
1693                # Move on to the next iteration
1694                self.passkey_step += 1
1695                logger.debug(f'passkey finished step {self.passkey_step} of 20')
1696                if self.passkey_step < 20:
1697                    self.send_pairing_confirm_command()
1698                    return
1699            elif self.pairing_method != PairingMethod.OOB:
1700                return
1701        else:
1702            if self.pairing_method in (
1703                PairingMethod.JUST_WORKS,
1704                PairingMethod.NUMERIC_COMPARISON,
1705                PairingMethod.OOB,
1706            ):
1707                self.send_pairing_random_command()
1708            elif self.pairing_method == PairingMethod.PASSKEY:
1709                assert self.passkey and self.confirm_value
1710                # Check that the random value matches what was committed to earlier
1711                confirm_verifier = crypto.f4(
1712                    self.pka,
1713                    self.pkb,
1714                    command.random_value,
1715                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
1716                )
1717                if not self.check_expected_value(
1718                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1719                ):
1720                    return
1721
1722                self.send_pairing_random_command()
1723
1724                # Move on to the next iteration
1725                self.passkey_step += 1
1726                logger.debug(f'passkey finished step {self.passkey_step} of 20')
1727                if self.passkey_step < 20:
1728                    self.r = crypto.r()
1729                    return
1730            else:
1731                return
1732
1733        # Compute the MacKey and LTK
1734        a = self.ia + bytes([self.iat])
1735        b = self.ra + bytes([self.rat])
1736        (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b)
1737
1738        # Compute the DH Key checks
1739        if self.pairing_method in (
1740            PairingMethod.JUST_WORKS,
1741            PairingMethod.NUMERIC_COMPARISON,
1742        ):
1743            ra = bytes(16)
1744            rb = ra
1745        elif self.pairing_method == PairingMethod.PASSKEY:
1746            assert self.passkey
1747            ra = self.passkey.to_bytes(16, byteorder='little')
1748            rb = ra
1749        elif self.pairing_method == PairingMethod.OOB:
1750            if self.is_initiator:
1751                if self.peer_oob_data:
1752                    rb = self.peer_oob_data.r
1753                    ra = self.r
1754                else:
1755                    rb = bytes(16)
1756                    ra = self.r
1757            else:
1758                if self.peer_oob_data:
1759                    ra = self.peer_oob_data.r
1760                    rb = self.r
1761                else:
1762                    ra = bytes(16)
1763                    rb = self.r
1764
1765        else:
1766            return
1767
1768        assert self.preq and self.pres
1769        io_cap_a = self.preq[1:4]
1770        io_cap_b = self.pres[1:4]
1771        self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b)
1772        self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a)
1773
1774        # Next steps to be performed after possible user confirmation
1775        def next_steps() -> None:
1776            # The initiator sends the DH Key check to the responder
1777            if self.is_initiator:
1778                self.send_pairing_dhkey_check_command()
1779            else:
1780                if self.wait_before_continuing:
1781                    self.wait_before_continuing.set_result(None)
1782
1783        # Prompt the user for confirmation if needed
1784        if self.pairing_method in (
1785            PairingMethod.JUST_WORKS,
1786            PairingMethod.NUMERIC_COMPARISON,
1787        ):
1788            # Compute the 6-digit code
1789            code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
1790
1791            # Ask for user confirmation
1792            self.wait_before_continuing = asyncio.get_running_loop().create_future()
1793            if self.pairing_method == PairingMethod.JUST_WORKS:
1794                self.prompt_user_for_confirmation(next_steps)
1795            else:
1796                self.prompt_user_for_numeric_comparison(code, next_steps)
1797        else:
1798            next_steps()
1799
1800    def on_smp_pairing_random_command(
1801        self, command: SMP_Pairing_Random_Command
1802    ) -> None:
1803        self.peer_random_value = command.random_value
1804        if self.sc:
1805            self.on_smp_pairing_random_command_secure_connections(command)
1806        else:
1807            self.on_smp_pairing_random_command_legacy(command)
1808
1809    def on_smp_pairing_public_key_command(
1810        self, command: SMP_Pairing_Public_Key_Command
1811    ) -> None:
1812        # Store the public key so that we can compute the confirmation value later
1813        self.peer_public_key_x = command.public_key_x
1814        self.peer_public_key_y = command.public_key_y
1815
1816        # Compute the DH key
1817        self.dh_key = self.ecc_key.dh(
1818            command.public_key_x[::-1],
1819            command.public_key_y[::-1],
1820        )[::-1]
1821        logger.debug(f'DH key: {self.dh_key.hex()}')
1822
1823        if self.pairing_method == PairingMethod.OOB:
1824            # Check against shared OOB data
1825            if self.peer_oob_data:
1826                confirm_verifier = crypto.f4(
1827                    self.peer_public_key_x,
1828                    self.peer_public_key_x,
1829                    self.peer_oob_data.r,
1830                    bytes(1),
1831                )
1832                if not self.check_expected_value(
1833                    self.peer_oob_data.c,
1834                    confirm_verifier,
1835                    SMP_CONFIRM_VALUE_FAILED_ERROR,
1836                ):
1837                    return
1838
1839        if self.is_initiator:
1840            if self.pairing_method == PairingMethod.OOB:
1841                self.send_pairing_random_command()
1842            else:
1843                self.send_pairing_confirm_command()
1844        else:
1845            if self.pairing_method == PairingMethod.PASSKEY:
1846                self.display_or_input_passkey()
1847
1848            # Send our public key back to the initiator
1849            self.send_public_key_command()
1850
1851            if self.pairing_method in (
1852                PairingMethod.JUST_WORKS,
1853                PairingMethod.NUMERIC_COMPARISON,
1854                PairingMethod.OOB,
1855            ):
1856                # We can now send the confirmation value
1857                self.send_pairing_confirm_command()
1858
1859    def on_smp_pairing_dhkey_check_command(
1860        self, command: SMP_Pairing_DHKey_Check_Command
1861    ) -> None:
1862        # Check that what we received matches what we computed earlier
1863        expected = self.eb if self.is_initiator else self.ea
1864        assert expected
1865        if not self.check_expected_value(
1866            expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR
1867        ):
1868            return
1869
1870        if self.is_responder:
1871            if self.wait_before_continuing is not None:
1872
1873                async def next_steps() -> None:
1874                    assert self.wait_before_continuing
1875                    await self.wait_before_continuing
1876                    self.wait_before_continuing = None
1877                    self.send_pairing_dhkey_check_command()
1878
1879                self.connection.abort_on('disconnection', next_steps())
1880            else:
1881                self.send_pairing_dhkey_check_command()
1882        else:
1883            self.start_encryption(self.ltk)
1884
1885    def on_smp_pairing_failed_command(
1886        self, command: SMP_Pairing_Failed_Command
1887    ) -> None:
1888        self.on_pairing_failure(command.reason)
1889
1890    def on_smp_encryption_information_command(
1891        self, command: SMP_Encryption_Information_Command
1892    ) -> None:
1893        self.peer_ltk = command.long_term_key
1894        self.check_key_distribution(SMP_Encryption_Information_Command)
1895
1896    def on_smp_master_identification_command(
1897        self, command: SMP_Master_Identification_Command
1898    ) -> None:
1899        self.peer_ediv = command.ediv
1900        self.peer_rand = command.rand
1901        self.check_key_distribution(SMP_Master_Identification_Command)
1902
1903    def on_smp_identity_information_command(
1904        self, command: SMP_Identity_Information_Command
1905    ) -> None:
1906        self.peer_identity_resolving_key = command.identity_resolving_key
1907        self.check_key_distribution(SMP_Identity_Information_Command)
1908
1909    def on_smp_identity_address_information_command(
1910        self, command: SMP_Identity_Address_Information_Command
1911    ) -> None:
1912        self.peer_bd_addr = command.bd_addr
1913        self.check_key_distribution(SMP_Identity_Address_Information_Command)
1914
1915    def on_smp_signing_information_command(
1916        self, command: SMP_Signing_Information_Command
1917    ) -> None:
1918        self.peer_signature_key = command.signature_key
1919        self.check_key_distribution(SMP_Signing_Information_Command)
1920
1921
1922# -----------------------------------------------------------------------------
class Manager(EventEmitter):
    '''
    Security Manager: implements both the Initiator and the Responder roles of
    the Security Manager Protocol, with one pairing session per connection
    handle.
    '''

    device: Device
    sessions: Dict[int, Session]
    pairing_config_factory: Callable[[Connection], PairingConfig]
    session_proxy: Type[Session]
    _ecc_key: Optional[crypto.EccKey]

    def __init__(
        self,
        device: Device,
        pairing_config_factory: Callable[[Connection], PairingConfig],
    ) -> None:
        '''
        Create a manager for `device`.

        `pairing_config_factory` is called with a connection each time a
        pairing configuration is needed for that connection.
        '''
        super().__init__()
        self.device = device
        self.pairing_config_factory = pairing_config_factory
        # Sessions in progress, keyed by connection handle
        self.sessions = {}
        # Class used to instantiate sessions
        self.session_proxy = Session
        # Generated lazily, see the `ecc_key` property
        self._ecc_key = None

    def send_command(self, connection: Connection, command: SMP_Command) -> None:
        '''Send an SMP command over the fixed SMP channel of `connection`.'''
        logger.debug(
            f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] '
            f'{connection.peer_address}: {command}'
        )
        # The channel ID depends on the transport
        if connection.transport == BT_BR_EDR_TRANSPORT:
            cid = SMP_BR_CID
        else:
            cid = SMP_CID
        connection.send_l2cap_pdu(cid, command.to_bytes())

    def on_smp_security_request_command(
        self, connection: Connection, request: SMP_Security_Request_Command
    ) -> None:
        '''Emit a 'security_request' event on the connection.'''
        connection.emit('security_request', request.auth_req)

    def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None:
        '''
        Parse an incoming SMP PDU and route it.

        Security requests are emitted on the connection; every other command
        is handed to the connection's session, which is created (not as
        initiator) if none exists yet.
        '''
        # Parse the L2CAP payload into an SMP Command object
        command = SMP_Command.from_bytes(pdu)
        logger.debug(
            f'<<< Received SMP Command on connection [0x{connection.handle:04X}] '
            f'{connection.peer_address}: {command}'
        )

        # Security request is more than just pairing, so let applications handle them
        if command.code == SMP_SECURITY_REQUEST_COMMAND:
            self.on_smp_security_request_command(
                connection, cast(SMP_Security_Request_Command, command)
            )
            return

        # Look for a session with this connection, and create one if none exists
        session = self.sessions.get(connection.handle)
        if not session:
            if connection.role == BT_CENTRAL_ROLE:
                logger.warning('Remote starts pairing as Peripheral!')
            pairing_config = self.pairing_config_factory(connection)
            session = self.session_proxy(
                self, connection, pairing_config, is_initiator=False
            )
            self.sessions[connection.handle] = session

        # Delegate the handling of the command to the session
        session.on_smp_command(command)

    @property
    def ecc_key(self) -> crypto.EccKey:
        '''The manager's ECC key, generated on first access and then reused.'''
        key = self._ecc_key
        if key is None:
            key = crypto.EccKey.generate()
            self._ecc_key = key
        assert key
        return key

    async def pair(self, connection: Connection) -> None:
        '''
        Start pairing on `connection` as the initiator and wait for the
        session's `pair()` to complete.
        '''
        # TODO: check if there's already a session for this connection
        if connection.role != BT_CENTRAL_ROLE:
            logger.warning('Start pairing as Peripheral!')
        pairing_config = self.pairing_config_factory(connection)
        new_session = self.session_proxy(
            self, connection, pairing_config, is_initiator=True
        )
        self.sessions[connection.handle] = new_session
        return await new_session.pair()

    def request_pairing(self, connection: Connection) -> None:
        '''
        Send a Security Request on `connection`, with auth requirements taken
        from the pairing config (or 0 when there is no config).
        '''
        pairing_config = self.pairing_config_factory(connection)
        auth_req = (
            smp_auth_req(
                pairing_config.bonding,
                pairing_config.mitm,
                pairing_config.sc,
                False,
                False,
            )
            if pairing_config
            else 0
        )
        self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req))

    def on_session_start(self, session: Session) -> None:
        '''Notify the device that pairing has started on the session's connection.'''
        self.device.on_pairing_start(session.connection)

    async def on_pairing(
        self, session: Session, identity_address: Optional[Address], keys: PairingKeys
    ) -> None:
        '''
        Called when pairing completed: store the keys if possible, then
        notify the device.
        '''
        # Store the keys in the key store
        can_store_keys = self.device.keystore and identity_address is not None
        if can_store_keys:
            # Make sure on_pairing emits after key update.
            await self.device.update_keys(str(identity_address), keys)

        # Notify the device
        self.device.on_pairing(session.connection, identity_address, keys, session.sc)

    def on_pairing_failure(self, session: Session, reason: int) -> None:
        '''Notify the device that pairing failed, with the given reason.'''
        self.device.on_pairing_failure(session.connection, reason)

    def on_session_end(self, session: Session) -> None:
        '''Forget the session registered for the session's connection, if any.'''
        logger.debug(f'session end for connection 0x{session.connection.handle:04X}')
        self.sessions.pop(session.connection.handle, None)

    def get_long_term_key(
        self, connection: Connection, rand: bytes, ediv: int
    ) -> Optional[bytes]:
        '''
        Ask the connection's session for a long term key matching
        `rand`/`ediv`. Returns None when the connection has no session.
        '''
        session = self.sessions.get(connection.handle)
        if not session:
            return None
        return session.get_long_term_key(rand, ediv)
2047