1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# SMP - Security Manager Protocol 17# 18# See Bluetooth spec @ Vol 3, Part H 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26import logging 27import asyncio 28import enum 29import secrets 30from dataclasses import dataclass 31from typing import ( 32 TYPE_CHECKING, 33 Any, 34 Awaitable, 35 Callable, 36 Dict, 37 List, 38 Optional, 39 Tuple, 40 Type, 41 cast, 42) 43 44from pyee import EventEmitter 45 46from .colors import color 47from .hci import ( 48 Address, 49 HCI_LE_Enable_Encryption_Command, 50 HCI_Object, 51 key_with_value, 52) 53from .core import ( 54 BT_BR_EDR_TRANSPORT, 55 BT_CENTRAL_ROLE, 56 BT_LE_TRANSPORT, 57 AdvertisingData, 58 InvalidArgumentError, 59 ProtocolError, 60 name_or_number, 61) 62from .keys import PairingKeys 63from . import crypto 64 65if TYPE_CHECKING: 66 from bumble.device import Connection, Device 67 from bumble.pairing import PairingConfig 68 69 70# ----------------------------------------------------------------------------- 71# Logging 72# ----------------------------------------------------------------------------- 73logger = logging.getLogger(__name__) 74 75 76# ----------------------------------------------------------------------------- 77# Constants 78# ----------------------------------------------------------------------------- 79# fmt: off 80# pylint: disable=line-too-long 81 82SMP_CID = 0x06 83SMP_BR_CID = 0x07 84 85SMP_PAIRING_REQUEST_COMMAND = 0x01 86SMP_PAIRING_RESPONSE_COMMAND = 0x02 87SMP_PAIRING_CONFIRM_COMMAND = 0x03 88SMP_PAIRING_RANDOM_COMMAND = 0x04 89SMP_PAIRING_FAILED_COMMAND = 0x05 90SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 91SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 92SMP_IDENTITY_INFORMATION_COMMAND = 0x08 93SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 94SMP_SIGNING_INFORMATION_COMMAND = 0x0A 95SMP_SECURITY_REQUEST_COMMAND = 0x0B 96SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C 97SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D 98SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E 99 100SMP_COMMAND_NAMES = { 101 SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND', 102 SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND', 103 SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND', 104 SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND', 105 SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND', 106 SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND', 107 SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND', 108 SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND', 109 SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND', 110 SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND', 111 SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND', 112 SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND', 113 SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND', 114 SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND' 115} 116 117SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 118SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 119SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 120SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 121SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 122 123SMP_IO_CAPABILITY_NAMES = { 124 SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', 125 SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', 126 SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', 127 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', 128 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY' 129} 130 131SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 132SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 133SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 134SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 135SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 136SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 137SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 138SMP_UNSPECIFIED_REASON_ERROR = 0x08 139SMP_REPEATED_ATTEMPTS_ERROR = 0x09 140SMP_INVALID_PARAMETERS_ERROR = 0x0A 141SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B 142SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C 143SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D 144SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E 145 146SMP_ERROR_NAMES = { 147 SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', 148 SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', 149 SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', 150 SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', 151 SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', 152 SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', 153 SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', 154 SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', 155 SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', 156 SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', 157 SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', 158 SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', 159 SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', 160 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR' 161} 162 163SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 164SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 165SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 166SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 167SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 168 169SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = { 170 SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE', 171 SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE', 172 SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE', 173 SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE', 174 SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE' 175} 176 177# Bit flags for key distribution/generation 178SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 179SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 180SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 181SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 182 183# AuthReq fields 184SMP_BONDING_AUTHREQ = 0b00000001 185SMP_MITM_AUTHREQ = 0b00000100 186SMP_SC_AUTHREQ = 0b00001000 187SMP_KEYPRESS_AUTHREQ = 0b00010000 188SMP_CT2_AUTHREQ = 0b00100000 189 190# Crypto salt 191SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') 192SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032') 193 194# fmt: on 195# pylint: enable=line-too-long 196# pylint: disable=invalid-name 197 198 199# ----------------------------------------------------------------------------- 200# Utils 201# ----------------------------------------------------------------------------- 202def error_name(error_code: int) -> str: 203 return name_or_number(SMP_ERROR_NAMES, error_code) 204 205 206# ----------------------------------------------------------------------------- 207# Classes 208# ----------------------------------------------------------------------------- 209class SMP_Command: 210 ''' 211 See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL 212 ''' 213 214 smp_classes: Dict[int, Type[SMP_Command]] = {} 215 fields: Any 216 code = 0 217 name = '' 218 219 @staticmethod 220 def from_bytes(pdu: bytes) -> "SMP_Command": 221 code = pdu[0] 222 223 cls = SMP_Command.smp_classes.get(code) 224 if cls is None: 225 instance = SMP_Command(pdu) 226 instance.name = SMP_Command.command_name(code) 227 instance.code = code 228 return instance 229 self = cls.__new__(cls) 230 SMP_Command.__init__(self, pdu) 231 if hasattr(self, 'fields'): 232 self.init_from_bytes(pdu, 1) 233 return self 234 235 @staticmethod 236 def command_name(code: int) -> str: 237 return name_or_number(SMP_COMMAND_NAMES, code) 238 239 @staticmethod 240 def auth_req_str(value: int) -> str: 241 bonding_flags = value & 3 242 mitm = (value >> 2) & 1 243 sc = (value >> 3) & 1 244 keypress = (value >> 4) & 1 245 ct2 = (value >> 5) & 1 246 247 return ( 248 f'bonding_flags={bonding_flags}, ' 249 f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}' 250 ) 251 252 @staticmethod 253 def io_capability_name(io_capability: int) -> str: 254 return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability) 255 256 @staticmethod 257 def key_distribution_str(value: int) -> str: 258 key_types: List[str] = [] 259 if value & SMP_ENC_KEY_DISTRIBUTION_FLAG: 260 key_types.append('ENC') 261 if value & SMP_ID_KEY_DISTRIBUTION_FLAG: 262 key_types.append('ID') 263 if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 264 key_types.append('SIGN') 265 if value & SMP_LINK_KEY_DISTRIBUTION_FLAG: 266 key_types.append('LINK') 267 return ','.join(key_types) 268 269 @staticmethod 270 def keypress_notification_type_name(notification_type: int) -> str: 271 return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) 272 273 @staticmethod 274 def subclass(fields): 275 def inner(cls): 276 cls.name = cls.__name__.upper() 277 cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name) 278 if cls.code is None: 279 raise KeyError( 280 f'Command name {cls.name} not found in SMP_COMMAND_NAMES' 281 ) 282 cls.fields = fields 283 284 # Register a factory for this class 285 SMP_Command.smp_classes[cls.code] = cls 286 287 return cls 288 289 return inner 290 291 def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None: 292 if hasattr(self, 'fields') and kwargs: 293 HCI_Object.init_from_fields(self, self.fields, kwargs) 294 if pdu is None: 295 pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) 296 self.pdu = pdu 297 298 def init_from_bytes(self, pdu: bytes, offset: int) -> None: 299 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 300 301 def to_bytes(self): 302 return self.pdu 303 304 def __bytes__(self): 305 return self.to_bytes() 306 307 def __str__(self): 308 result = color(self.name, 'yellow') 309 if fields := getattr(self, 'fields', None): 310 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 311 else: 312 if len(self.pdu) > 1: 313 result += f': {self.pdu.hex()}' 314 return result 315 316 317# ----------------------------------------------------------------------------- 318@SMP_Command.subclass( 319 [ 320 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 321 ('oob_data_flag', 1), 322 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 323 ('maximum_encryption_key_size', 1), 324 ( 325 'initiator_key_distribution', 326 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 327 ), 328 ( 329 'responder_key_distribution', 330 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 331 ), 332 ] 333) 334class SMP_Pairing_Request_Command(SMP_Command): 335 ''' 336 See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request 337 ''' 338 339 io_capability: int 340 oob_data_flag: int 341 auth_req: int 342 maximum_encryption_key_size: int 343 initiator_key_distribution: int 344 responder_key_distribution: int 345 346 347# ----------------------------------------------------------------------------- 348@SMP_Command.subclass( 349 [ 350 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 351 ('oob_data_flag', 1), 352 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 353 ('maximum_encryption_key_size', 1), 354 ( 355 'initiator_key_distribution', 356 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 357 ), 358 ( 359 'responder_key_distribution', 360 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 361 ), 362 ] 363) 364class SMP_Pairing_Response_Command(SMP_Command): 365 ''' 366 See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response 367 ''' 368 369 io_capability: int 370 oob_data_flag: int 371 auth_req: int 372 maximum_encryption_key_size: int 373 initiator_key_distribution: int 374 responder_key_distribution: int 375 376 377# ----------------------------------------------------------------------------- 378@SMP_Command.subclass([('confirm_value', 16)]) 379class SMP_Pairing_Confirm_Command(SMP_Command): 380 ''' 381 See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm 382 ''' 383 384 confirm_value: bytes 385 386 387# ----------------------------------------------------------------------------- 388@SMP_Command.subclass([('random_value', 16)]) 389class SMP_Pairing_Random_Command(SMP_Command): 390 ''' 391 See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random 392 ''' 393 394 random_value: bytes 395 396 397# ----------------------------------------------------------------------------- 398@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})]) 399class SMP_Pairing_Failed_Command(SMP_Command): 400 ''' 401 See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed 402 ''' 403 404 reason: int 405 406 407# ----------------------------------------------------------------------------- 408@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)]) 409class SMP_Pairing_Public_Key_Command(SMP_Command): 410 ''' 411 See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key 412 ''' 413 414 public_key_x: bytes 415 public_key_y: bytes 416 417 418# ----------------------------------------------------------------------------- 419@SMP_Command.subclass( 420 [ 421 ('dhkey_check', 16), 422 ] 423) 424class SMP_Pairing_DHKey_Check_Command(SMP_Command): 425 ''' 426 See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check 427 ''' 428 429 dhkey_check: bytes 430 431 432# ----------------------------------------------------------------------------- 433@SMP_Command.subclass( 434 [ 435 ( 436 'notification_type', 437 {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}, 438 ), 439 ] 440) 441class SMP_Pairing_Keypress_Notification_Command(SMP_Command): 442 ''' 443 See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification 444 ''' 445 446 notification_type: int 447 448 449# ----------------------------------------------------------------------------- 450@SMP_Command.subclass([('long_term_key', 16)]) 451class SMP_Encryption_Information_Command(SMP_Command): 452 ''' 453 See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information 454 ''' 455 456 long_term_key: bytes 457 458 459# ----------------------------------------------------------------------------- 460@SMP_Command.subclass([('ediv', 2), ('rand', 8)]) 461class SMP_Master_Identification_Command(SMP_Command): 462 ''' 463 See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification 464 ''' 465 466 ediv: int 467 rand: bytes 468 469 470# ----------------------------------------------------------------------------- 471@SMP_Command.subclass([('identity_resolving_key', 16)]) 472class SMP_Identity_Information_Command(SMP_Command): 473 ''' 474 See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information 475 ''' 476 477 identity_resolving_key: bytes 478 479 480# ----------------------------------------------------------------------------- 481@SMP_Command.subclass( 482 [ 483 ('addr_type', Address.ADDRESS_TYPE_SPEC), 484 ('bd_addr', Address.parse_address_preceded_by_type), 485 ] 486) 487class SMP_Identity_Address_Information_Command(SMP_Command): 488 ''' 489 See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information 490 ''' 491 492 addr_type: int 493 bd_addr: Address 494 495 496# ----------------------------------------------------------------------------- 497@SMP_Command.subclass([('signature_key', 16)]) 498class SMP_Signing_Information_Command(SMP_Command): 499 ''' 500 See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information 501 ''' 502 503 signature_key: bytes 504 505 506# ----------------------------------------------------------------------------- 507@SMP_Command.subclass( 508 [ 509 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 510 ] 511) 512class SMP_Security_Request_Command(SMP_Command): 513 ''' 514 See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request 515 ''' 516 517 auth_req: int 518 519 520# ----------------------------------------------------------------------------- 521def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int: 522 value = 0 523 if bonding: 524 value |= SMP_BONDING_AUTHREQ 525 if mitm: 526 value |= SMP_MITM_AUTHREQ 527 if sc: 528 value |= SMP_SC_AUTHREQ 529 if keypress: 530 value |= SMP_KEYPRESS_AUTHREQ 531 if ct2: 532 value |= SMP_CT2_AUTHREQ 533 return value 534 535 536# ----------------------------------------------------------------------------- 537class AddressResolver: 538 def __init__(self, resolving_keys): 539 self.resolving_keys = resolving_keys 540 541 def resolve(self, address): 542 address_bytes = bytes(address) 543 hash_part = address_bytes[0:3] 544 prand = address_bytes[3:6] 545 for irk, resolved_address in self.resolving_keys: 546 local_hash = crypto.ah(irk, prand) 547 if local_hash == hash_part: 548 # Match! 549 if resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS: 550 resolved_address_type = Address.PUBLIC_IDENTITY_ADDRESS 551 else: 552 resolved_address_type = Address.RANDOM_IDENTITY_ADDRESS 553 return Address( 554 address=str(resolved_address), address_type=resolved_address_type 555 ) 556 557 return None 558 559 560# ----------------------------------------------------------------------------- 561class PairingMethod(enum.IntEnum): 562 JUST_WORKS = 0 563 NUMERIC_COMPARISON = 1 564 PASSKEY = 2 565 OOB = 3 566 CTKD_OVER_CLASSIC = 4 567 568 569# ----------------------------------------------------------------------------- 570class OobContext: 571 """Cryptographic context for LE SC OOB pairing.""" 572 573 ecc_key: crypto.EccKey 574 r: bytes 575 576 def __init__( 577 self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None 578 ) -> None: 579 self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key 580 self.r = crypto.r() if r is None else r 581 582 def share(self) -> OobSharedData: 583 pkx = self.ecc_key.x[::-1] 584 return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) 585 586 587# ----------------------------------------------------------------------------- 588class OobLegacyContext: 589 """Cryptographic context for LE Legacy OOB pairing.""" 590 591 tk: bytes 592 593 def __init__(self, tk: Optional[bytes] = None) -> None: 594 self.tk = crypto.r() if tk is None else tk 595 596 597# ----------------------------------------------------------------------------- 598@dataclass 599class OobSharedData: 600 """Shareable data for LE SC OOB pairing.""" 601 602 c: bytes 603 r: bytes 604 605 def to_ad(self) -> AdvertisingData: 606 return AdvertisingData( 607 [ 608 (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c), 609 (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r), 610 ] 611 ) 612 613 def __str__(self) -> str: 614 return f'OOB(C={self.c.hex()}, R={self.r.hex()})' 615 616 617# ----------------------------------------------------------------------------- 618class Session: 619 # I/O Capability to pairing method decision matrix 620 # 621 # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key 622 # Generation Method 623 # 624 # Map: initiator -> responder -> <method> 625 # where <method> may be a simple entry or a 2-element tuple, with the first element 626 # for legacy pairing and the second for secure connections, when the two are 627 # different. Each entry is either a method name, or, for PASSKEY, a tuple: 628 # (method, initiator_displays, responder_displays) 629 # to specify if the initiator and responder should display (True) or input a code 630 # (False). 631 PAIRING_METHODS = { 632 SMP_DISPLAY_ONLY_IO_CAPABILITY: { 633 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 634 SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS, 635 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 636 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 637 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 638 }, 639 SMP_DISPLAY_YES_NO_IO_CAPABILITY: { 640 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 641 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ( 642 PairingMethod.JUST_WORKS, 643 PairingMethod.NUMERIC_COMPARISON, 644 ), 645 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 646 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 647 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 648 (PairingMethod.PASSKEY, True, False), 649 PairingMethod.NUMERIC_COMPARISON, 650 ), 651 }, 652 SMP_KEYBOARD_ONLY_IO_CAPABILITY: { 653 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 654 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 655 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, False), 656 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 657 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 658 }, 659 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 660 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 661 SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS, 662 SMP_KEYBOARD_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 663 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 664 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 665 }, 666 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: { 667 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 668 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ( 669 (PairingMethod.PASSKEY, False, True), 670 PairingMethod.NUMERIC_COMPARISON, 671 ), 672 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 673 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 674 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 675 (PairingMethod.PASSKEY, True, False), 676 PairingMethod.NUMERIC_COMPARISON, 677 ), 678 }, 679 } 680 681 ea: bytes 682 eb: bytes 683 ltk: bytes 684 preq: bytes 685 pres: bytes 686 tk: bytes 687 688 def __init__( 689 self, 690 manager: Manager, 691 connection: Connection, 692 pairing_config: PairingConfig, 693 is_initiator: bool, 694 ) -> None: 695 self.manager = manager 696 self.connection = connection 697 self.stk = None 698 self.ltk_ediv = 0 699 self.ltk_rand = bytes(8) 700 self.link_key: Optional[bytes] = None 701 self.initiator_key_distribution: int = 0 702 self.responder_key_distribution: int = 0 703 self.peer_random_value: Optional[bytes] = None 704 self.peer_public_key_x: bytes = bytes(32) 705 self.peer_public_key_y = bytes(32) 706 self.peer_ltk = None 707 self.peer_ediv = None 708 self.peer_rand: Optional[bytes] = None 709 self.peer_identity_resolving_key = None 710 self.peer_bd_addr: Optional[Address] = None 711 self.peer_signature_key = None 712 self.peer_expected_distributions: List[Type[SMP_Command]] = [] 713 self.dh_key = b'' 714 self.confirm_value = None 715 self.passkey: Optional[int] = None 716 self.passkey_ready = asyncio.Event() 717 self.passkey_step = 0 718 self.passkey_display = False 719 self.pairing_method: PairingMethod = PairingMethod.JUST_WORKS 720 self.pairing_config = pairing_config 721 self.wait_before_continuing: Optional[asyncio.Future[None]] = None 722 self.completed = False 723 self.ctkd_task: Optional[Awaitable[None]] = None 724 725 # Decide if we're the initiator or the responder 726 self.is_initiator = is_initiator 727 self.is_responder = not self.is_initiator 728 729 # Listen for connection events 730 connection.on('disconnection', self.on_disconnection) 731 connection.on( 732 'connection_encryption_change', self.on_connection_encryption_change 733 ) 734 connection.on( 735 'connection_encryption_key_refresh', 736 self.on_connection_encryption_key_refresh, 737 ) 738 739 # Create a future that can be used to wait for the session to complete 740 if self.is_initiator: 741 self.pairing_result: Optional[asyncio.Future[None]] = ( 742 asyncio.get_running_loop().create_future() 743 ) 744 else: 745 self.pairing_result = None 746 747 # Key Distribution (default values before negotiation) 748 self.initiator_key_distribution = ( 749 pairing_config.delegate.local_initiator_key_distribution 750 ) 751 self.responder_key_distribution = ( 752 pairing_config.delegate.local_responder_key_distribution 753 ) 754 755 # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 756 self.bonding: bool = pairing_config.bonding 757 self.sc: bool = pairing_config.sc 758 self.mitm: bool = pairing_config.mitm 759 self.keypress = False 760 self.ct2: bool = False 761 762 # I/O Capabilities 763 self.io_capability = pairing_config.delegate.io_capability 764 self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 765 766 # OOB 767 self.oob_data_flag = ( 768 1 if pairing_config.oob and pairing_config.oob.peer_data else 0 769 ) 770 771 # Set up addresses 772 self_address = connection.self_resolvable_address or connection.self_address 773 peer_address = connection.peer_resolvable_address or connection.peer_address 774 logger.debug( 775 f"pairing with self_address={self_address}, peer_address={peer_address}" 776 ) 777 if self.is_initiator: 778 self.ia = bytes(self_address) 779 self.iat = 1 if self_address.is_random else 0 780 self.ra = bytes(peer_address) 781 self.rat = 1 if peer_address.is_random else 0 782 else: 783 self.ra = bytes(self_address) 784 self.rat = 1 if self_address.is_random else 0 785 self.ia = bytes(peer_address) 786 self.iat = 1 if peer_address.is_random else 0 787 788 # Select the ECC key, TK and r initial value 789 if pairing_config.oob: 790 self.peer_oob_data = pairing_config.oob.peer_data 791 if pairing_config.sc: 792 if pairing_config.oob.our_context is None: 793 raise InvalidArgumentError( 794 "oob pairing config requires a context when sc is True" 795 ) 796 self.r = pairing_config.oob.our_context.r 797 self.ecc_key = pairing_config.oob.our_context.ecc_key 798 if pairing_config.oob.legacy_context is not None: 799 self.tk = pairing_config.oob.legacy_context.tk 800 else: 801 if pairing_config.oob.legacy_context is None: 802 raise InvalidArgumentError( 803 "oob pairing config requires a legacy context when sc is False" 804 ) 805 self.r = bytes(16) 806 self.ecc_key = manager.ecc_key 807 self.tk = pairing_config.oob.legacy_context.tk 808 else: 809 self.peer_oob_data = None 810 self.r = bytes(16) 811 self.ecc_key = manager.ecc_key 812 self.tk = bytes(16) 813 814 @property 815 def pkx(self) -> Tuple[bytes, bytes]: 816 return (self.ecc_key.x[::-1], self.peer_public_key_x) 817 818 @property 819 def pka(self) -> bytes: 820 return self.pkx[0 if self.is_initiator else 1] 821 822 @property 823 def pkb(self) -> bytes: 824 return self.pkx[0 if self.is_responder else 1] 825 826 @property 827 def nx(self) -> Tuple[bytes, bytes]: 828 assert self.peer_random_value 829 return (self.r, self.peer_random_value) 830 831 @property 832 def na(self) -> bytes: 833 return self.nx[0 if self.is_initiator else 1] 834 835 @property 836 def nb(self) -> bytes: 837 return self.nx[0 if self.is_responder else 1] 838 839 @property 840 def auth_req(self) -> int: 841 return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) 842 843 def get_long_term_key(self, rand: bytes, ediv: int) -> Optional[bytes]: 844 if not self.sc and not self.completed: 845 if rand == self.ltk_rand and ediv == self.ltk_ediv: 846 return self.stk 847 else: 848 return self.ltk 849 850 return None 851 852 def decide_pairing_method( 853 self, 854 auth_req: int, 855 initiator_io_capability: int, 856 responder_io_capability: int, 857 ) -> None: 858 if self.connection.transport == BT_BR_EDR_TRANSPORT: 859 self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC 860 return 861 if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): 862 self.pairing_method = PairingMethod.JUST_WORKS 863 return 864 865 details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability] # type: ignore[index] 866 if isinstance(details, tuple) and len(details) == 2: 867 # One entry for legacy pairing and one for secure connections 868 details = details[1 if self.sc else 0] 869 if isinstance(details, PairingMethod): 870 # Just a method ID 871 self.pairing_method = details 872 else: 873 # PASSKEY method, with a method ID and display/input flags 874 assert isinstance(details[0], PairingMethod) 875 self.pairing_method = details[0] 876 self.passkey_display = details[1 if self.is_initiator else 2] 877 878 def check_expected_value( 879 self, expected: bytes, received: bytes, error: int 880 ) -> bool: 881 logger.debug(f'expected={expected.hex()} got={received.hex()}') 882 if expected != received: 883 logger.info(color('pairing confirm/check mismatch', 'red')) 884 self.send_pairing_failed(error) 885 return False 886 return True 887 888 def prompt_user_for_confirmation(self, next_steps: Callable[[], None]) -> None: 889 async def prompt() -> None: 890 logger.debug('ask for confirmation') 891 try: 892 response = await self.pairing_config.delegate.confirm() 893 if response: 894 next_steps() 895 return 896 except Exception as error: 897 logger.warning(f'exception while confirm: {error}') 898 899 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 900 901 self.connection.abort_on('disconnection', prompt()) 902 903 def prompt_user_for_numeric_comparison( 904 self, code: int, next_steps: Callable[[], None] 905 ) -> None: 906 async def prompt() -> None: 907 logger.debug(f'verification code: {code}') 908 try: 909 response = await self.pairing_config.delegate.compare_numbers( 910 code, digits=6 911 ) 912 if response: 913 next_steps() 914 return 915 except Exception as error: 916 logger.warning(f'exception while prompting: {error}') 917 918 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 919 920 self.connection.abort_on('disconnection', prompt()) 921 922 def prompt_user_for_number(self, next_steps: Callable[[int], None]) -> None: 923 async def prompt() -> None: 924 logger.debug('prompting user for passkey') 925 try: 926 passkey = await self.pairing_config.delegate.get_number() 927 if passkey is None: 928 logger.debug('Passkey request rejected') 929 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 930 return 931 logger.debug(f'user input: {passkey}') 932 next_steps(passkey) 933 except Exception as error: 934 logger.warning(f'exception while prompting: {error}') 935 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 936 937 self.connection.abort_on('disconnection', prompt()) 938 939 def display_passkey(self) -> None: 940 # Generate random Passkey/PIN code 941 self.passkey = secrets.randbelow(1000000) 942 assert self.passkey is not None 943 logger.debug(f'Pairing PIN CODE: {self.passkey:06}') 944 self.passkey_ready.set() 945 946 # The value of TK is computed from the PIN code 947 if not self.sc: 948 self.tk = self.passkey.to_bytes(16, byteorder='little') 949 logger.debug(f'TK from passkey = {self.tk.hex()}') 950 951 try: 952 self.connection.abort_on( 953 'disconnection', 954 self.pairing_config.delegate.display_number(self.passkey, digits=6), 955 ) 956 except Exception as error: 957 logger.warning(f'exception while displaying number: {error}') 958 959 def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None: 960 # Prompt the user for the passkey displayed on the peer 961 def after_input(passkey: int) -> None: 962 self.passkey = passkey 963 964 if not self.sc: 965 self.tk = passkey.to_bytes(16, byteorder='little') 966 logger.debug(f'TK from passkey = {self.tk.hex()}') 967 968 self.passkey_ready.set() 969 970 if next_steps is not None: 971 next_steps() 972 973 self.prompt_user_for_number(after_input) 974 975 def display_or_input_passkey( 976 self, next_steps: Optional[Callable[[], None]] = None 977 ) -> None: 978 if self.passkey_display: 979 self.display_passkey() 980 if next_steps is not None: 981 next_steps() 982 else: 983 self.input_passkey(next_steps) 984 985 def send_command(self, command: SMP_Command) -> None: 986 self.manager.send_command(self.connection, command) 987 988 def send_pairing_failed(self, error: int) -> None: 989 self.send_command(SMP_Pairing_Failed_Command(reason=error)) 990 self.on_pairing_failure(error) 991 992 def send_pairing_request_command(self) -> None: 993 self.manager.on_session_start(self) 994 995 command = SMP_Pairing_Request_Command( 996 io_capability=self.io_capability, 997 oob_data_flag=self.oob_data_flag, 998 auth_req=self.auth_req, 999 maximum_encryption_key_size=16, 1000 initiator_key_distribution=self.initiator_key_distribution, 1001 responder_key_distribution=self.responder_key_distribution, 1002 ) 1003 self.preq = bytes(command) 1004 self.send_command(command) 1005 1006 def send_pairing_response_command(self) -> None: 1007 response = SMP_Pairing_Response_Command( 1008 io_capability=self.io_capability, 1009 oob_data_flag=self.oob_data_flag, 1010 auth_req=self.auth_req, 1011 maximum_encryption_key_size=16, 1012 initiator_key_distribution=self.initiator_key_distribution, 1013 responder_key_distribution=self.responder_key_distribution, 1014 ) 1015 self.pres = bytes(response) 1016 self.send_command(response) 1017 1018 def send_pairing_confirm_command(self) -> None: 1019 1020 if self.pairing_method != PairingMethod.OOB: 1021 self.r = crypto.r() 1022 logger.debug(f'generated random: {self.r.hex()}') 1023 1024 if self.sc: 1025 1026 async def next_steps() -> None: 1027 if self.pairing_method in ( 1028 PairingMethod.JUST_WORKS, 1029 PairingMethod.NUMERIC_COMPARISON, 1030 ): 1031 z = 0 1032 elif self.pairing_method == PairingMethod.PASSKEY: 1033 # We need a passkey 1034 await self.passkey_ready.wait() 1035 assert self.passkey 1036 1037 z = 0x80 + ((self.passkey >> self.passkey_step) & 1) 1038 else: 1039 return 1040 1041 if self.is_initiator: 1042 confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z])) 1043 else: 1044 confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z])) 1045 1046 self.send_command( 1047 SMP_Pairing_Confirm_Command(confirm_value=confirm_value) 1048 ) 1049 1050 # Perform the next steps asynchronously in case we need to wait for input 1051 self.connection.abort_on('disconnection', next_steps()) 1052 else: 1053 confirm_value = crypto.c1( 1054 self.tk, 1055 self.r, 1056 self.preq, 1057 self.pres, 1058 self.iat, 1059 self.rat, 1060 self.ia, 1061 self.ra, 1062 ) 1063 1064 self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value)) 1065 1066 def send_pairing_random_command(self) -> None: 1067 self.send_command(SMP_Pairing_Random_Command(random_value=self.r)) 1068 1069 def send_public_key_command(self) -> None: 1070 self.send_command( 1071 SMP_Pairing_Public_Key_Command( 1072 public_key_x=self.ecc_key.x[::-1], 1073 public_key_y=self.ecc_key.y[::-1], 1074 ) 1075 ) 1076 1077 def send_pairing_dhkey_check_command(self) -> None: 1078 self.send_command( 1079 SMP_Pairing_DHKey_Check_Command( 1080 dhkey_check=self.ea if self.is_initiator else self.eb 1081 ) 1082 ) 1083 1084 def send_identity_address_command(self) -> None: 1085 if self.pairing_config.identity_address_type == Address.PUBLIC_DEVICE_ADDRESS: 1086 identity_address = self.manager.device.public_address 1087 elif self.pairing_config.identity_address_type == Address.RANDOM_DEVICE_ADDRESS: 1088 identity_address = self.manager.device.static_address 1089 else: 1090 # No identity address type set. If the controller has a public address, it 1091 # will be more responsible to be the identity address. 1092 if self.manager.device.public_address != Address.ANY: 1093 logger.debug("No identity address type set, using PUBLIC") 1094 identity_address = self.manager.device.public_address 1095 else: 1096 logger.debug("No identity address type set, using RANDOM") 1097 identity_address = self.manager.device.static_address 1098 self.send_command( 1099 SMP_Identity_Address_Information_Command( 1100 addr_type=identity_address.address_type, 1101 bd_addr=identity_address, 1102 ) 1103 ) 1104 1105 def start_encryption(self, key: bytes) -> None: 1106 # We can now encrypt the connection with the short term key, so that we can 1107 # distribute the long term and/or other keys over an encrypted connection 1108 self.manager.device.host.send_command_sync( 1109 HCI_LE_Enable_Encryption_Command( 1110 connection_handle=self.connection.handle, 1111 random_number=bytes(8), 1112 encrypted_diversifier=0, 1113 long_term_key=key, 1114 ) 1115 ) 1116 1117 @classmethod 1118 def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes: 1119 '''Derives Long Term Key from Link Key. 1120 1121 Args: 1122 link_key: BR/EDR Link Key bytes in little-endian. 1123 ct2: whether ct2 is supported on both devices. 1124 Returns: 1125 LE Long Tern Key bytes in little-endian. 1126 ''' 1127 ilk = ( 1128 crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) 1129 if ct2 1130 else crypto.h6(link_key, b'tmp2') 1131 ) 1132 return crypto.h6(ilk, b'brle') 1133 1134 @classmethod 1135 def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes: 1136 '''Derives Link Key from Long Term Key. 1137 1138 Args: 1139 ltk: LE Long Term Key bytes in little-endian. 1140 ct2: whether ct2 is supported on both devices. 1141 Returns: 1142 BR/EDR Link Key bytes in little-endian. 1143 ''' 1144 ilk = ( 1145 crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk) 1146 if ct2 1147 else crypto.h6(ltk, b'tmp1') 1148 ) 1149 return crypto.h6(ilk, b'lebr') 1150 1151 async def get_link_key_and_derive_ltk(self) -> None: 1152 '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.''' 1153 self.link_key = await self.manager.device.get_link_key( 1154 self.connection.peer_address 1155 ) 1156 if self.link_key is None: 1157 logging.warning( 1158 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' 1159 ) 1160 self.send_pairing_failed( 1161 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR 1162 ) 1163 else: 1164 self.ltk = self.derive_ltk(self.link_key, self.ct2) 1165 1166 def distribute_keys(self) -> None: 1167 # Distribute the keys as required 1168 if self.is_initiator: 1169 # CTKD: Derive LTK from LinkKey 1170 if ( 1171 self.connection.transport == BT_BR_EDR_TRANSPORT 1172 and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1173 ): 1174 self.ctkd_task = self.connection.abort_on( 1175 'disconnection', self.get_link_key_and_derive_ltk() 1176 ) 1177 elif not self.sc: 1178 # Distribute the LTK, EDIV and RAND 1179 if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1180 self.send_command( 1181 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1182 ) 1183 self.send_command( 1184 SMP_Master_Identification_Command( 1185 ediv=self.ltk_ediv, rand=self.ltk_rand 1186 ) 1187 ) 1188 1189 # Distribute IRK & BD ADDR 1190 if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1191 self.send_command( 1192 SMP_Identity_Information_Command( 1193 identity_resolving_key=self.manager.device.irk 1194 ) 1195 ) 1196 self.send_identity_address_command() 1197 1198 # Distribute CSRK 1199 csrk = bytes(16) # FIXME: testing 1200 if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1201 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1202 1203 # CTKD, calculate BR/EDR link key 1204 if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1205 self.link_key = self.derive_link_key(self.ltk, self.ct2) 1206 1207 else: 1208 # CTKD: Derive LTK from LinkKey 1209 if ( 1210 self.connection.transport == BT_BR_EDR_TRANSPORT 1211 and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1212 ): 1213 self.ctkd_task = self.connection.abort_on( 1214 'disconnection', self.get_link_key_and_derive_ltk() 1215 ) 1216 # Distribute the LTK, EDIV and RAND 1217 elif not self.sc: 1218 if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1219 self.send_command( 1220 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1221 ) 1222 self.send_command( 1223 SMP_Master_Identification_Command( 1224 ediv=self.ltk_ediv, rand=self.ltk_rand 1225 ) 1226 ) 1227 1228 # Distribute IRK & BD ADDR 1229 if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1230 self.send_command( 1231 SMP_Identity_Information_Command( 1232 identity_resolving_key=self.manager.device.irk 1233 ) 1234 ) 1235 self.send_identity_address_command() 1236 1237 # Distribute CSRK 1238 csrk = bytes(16) # FIXME: testing 1239 if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1240 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1241 1242 # CTKD, calculate BR/EDR link key 1243 if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1244 self.link_key = self.derive_link_key(self.ltk, self.ct2) 1245 1246 def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: 1247 # Set our expectations for what to wait for in the key distribution phase 1248 self.peer_expected_distributions = [] 1249 if not self.sc and self.connection.transport == BT_LE_TRANSPORT: 1250 if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0: 1251 self.peer_expected_distributions.append( 1252 SMP_Encryption_Information_Command 1253 ) 1254 self.peer_expected_distributions.append( 1255 SMP_Master_Identification_Command 1256 ) 1257 if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0: 1258 self.peer_expected_distributions.append(SMP_Identity_Information_Command) 1259 self.peer_expected_distributions.append( 1260 SMP_Identity_Address_Information_Command 1261 ) 1262 if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0: 1263 self.peer_expected_distributions.append(SMP_Signing_Information_Command) 1264 logger.debug( 1265 'expecting distributions: ' 1266 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1267 ) 1268 1269 def check_key_distribution(self, command_class: Type[SMP_Command]) -> None: 1270 # First, check that the connection is encrypted 1271 if not self.connection.is_encrypted: 1272 logger.warning( 1273 color('received key distribution on a non-encrypted connection', 'red') 1274 ) 1275 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1276 return 1277 1278 # Check that this command class is expected 1279 if command_class in self.peer_expected_distributions: 1280 self.peer_expected_distributions.remove(command_class) 1281 logger.debug( 1282 'remaining distributions: ' 1283 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1284 ) 1285 if not self.peer_expected_distributions: 1286 self.on_peer_key_distribution_complete() 1287 else: 1288 logger.warning( 1289 color( 1290 '!!! unexpected key distribution command: ' 1291 f'{command_class.__name__}', 1292 'red', 1293 ) 1294 ) 1295 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1296 1297 async def pair(self) -> None: 1298 # Start pairing as an initiator 1299 # TODO: check that this session isn't already active 1300 1301 # Send the pairing request to start the process 1302 self.send_pairing_request_command() 1303 1304 # Wait for the pairing process to finish 1305 assert self.pairing_result 1306 await self.connection.abort_on('disconnection', self.pairing_result) 1307 1308 def on_disconnection(self, _: int) -> None: 1309 self.connection.remove_listener('disconnection', self.on_disconnection) 1310 self.connection.remove_listener( 1311 'connection_encryption_change', self.on_connection_encryption_change 1312 ) 1313 self.connection.remove_listener( 1314 'connection_encryption_key_refresh', 1315 self.on_connection_encryption_key_refresh, 1316 ) 1317 self.manager.on_session_end(self) 1318 1319 def on_peer_key_distribution_complete(self) -> None: 1320 # The initiator can now send its keys 1321 if self.is_initiator: 1322 self.distribute_keys() 1323 1324 self.connection.abort_on('disconnection', self.on_pairing()) 1325 1326 def on_connection_encryption_change(self) -> None: 1327 if self.connection.is_encrypted: 1328 if self.is_responder: 1329 # The responder distributes its keys first, the initiator later 1330 self.distribute_keys() 1331 1332 # If we're not expecting key distributions from the peer, we're done 1333 if not self.peer_expected_distributions: 1334 self.on_peer_key_distribution_complete() 1335 1336 def on_connection_encryption_key_refresh(self) -> None: 1337 # Do as if the connection had just been encrypted 1338 self.on_connection_encryption_change() 1339 1340 async def on_pairing(self) -> None: 1341 logger.debug('pairing complete') 1342 1343 if self.completed: 1344 return 1345 1346 self.completed = True 1347 1348 if self.pairing_result is not None and not self.pairing_result.done(): 1349 self.pairing_result.set_result(None) 1350 1351 # Use the peer address from the pairing protocol or the connection 1352 if self.peer_bd_addr is not None: 1353 peer_address = self.peer_bd_addr 1354 else: 1355 peer_address = self.connection.peer_address 1356 1357 # Wait for link key fetch and key derivation 1358 if self.ctkd_task is not None: 1359 await self.ctkd_task 1360 self.ctkd_task = None 1361 1362 # Create an object to hold the keys 1363 keys = PairingKeys() 1364 keys.address_type = peer_address.address_type 1365 authenticated = self.pairing_method != PairingMethod.JUST_WORKS 1366 if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT: 1367 keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated) 1368 else: 1369 our_ltk_key = PairingKeys.Key( 1370 value=self.ltk, 1371 authenticated=authenticated, 1372 ediv=self.ltk_ediv, 1373 rand=self.ltk_rand, 1374 ) 1375 peer_ltk_key = PairingKeys.Key( 1376 value=self.peer_ltk, 1377 authenticated=authenticated, 1378 ediv=self.peer_ediv, 1379 rand=self.peer_rand, 1380 ) 1381 if self.is_initiator: 1382 keys.ltk_central = peer_ltk_key 1383 keys.ltk_peripheral = our_ltk_key 1384 else: 1385 keys.ltk_central = our_ltk_key 1386 keys.ltk_peripheral = peer_ltk_key 1387 if self.peer_identity_resolving_key is not None: 1388 keys.irk = PairingKeys.Key( 1389 value=self.peer_identity_resolving_key, authenticated=authenticated 1390 ) 1391 if self.peer_signature_key is not None: 1392 keys.csrk = PairingKeys.Key( 1393 value=self.peer_signature_key, authenticated=authenticated 1394 ) 1395 if self.link_key is not None: 1396 keys.link_key = PairingKeys.Key( 1397 value=self.link_key, authenticated=authenticated 1398 ) 1399 await self.manager.on_pairing(self, peer_address, keys) 1400 1401 def on_pairing_failure(self, reason: int) -> None: 1402 logger.warning(f'pairing failure ({error_name(reason)})') 1403 1404 if self.completed: 1405 return 1406 1407 self.completed = True 1408 1409 error = ProtocolError(reason, 'smp', error_name(reason)) 1410 if self.pairing_result is not None and not self.pairing_result.done(): 1411 self.pairing_result.set_exception(error) 1412 self.manager.on_pairing_failure(self, reason) 1413 1414 def on_smp_command(self, command: SMP_Command) -> None: 1415 # Find the handler method 1416 handler_name = f'on_{command.name.lower()}' 1417 handler = getattr(self, handler_name, None) 1418 if handler is not None: 1419 try: 1420 handler(command) 1421 except Exception as error: 1422 logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') 1423 response = SMP_Pairing_Failed_Command( 1424 reason=SMP_UNSPECIFIED_REASON_ERROR 1425 ) 1426 self.send_command(response) 1427 else: 1428 logger.error(color('SMP command not handled???', 'red')) 1429 1430 def on_smp_pairing_request_command( 1431 self, command: SMP_Pairing_Request_Command 1432 ) -> None: 1433 self.connection.abort_on( 1434 'disconnection', self.on_smp_pairing_request_command_async(command) 1435 ) 1436 1437 async def on_smp_pairing_request_command_async( 1438 self, command: SMP_Pairing_Request_Command 1439 ) -> None: 1440 # Check if the request should proceed 1441 try: 1442 accepted = await self.pairing_config.delegate.accept() 1443 except Exception as error: 1444 logger.warning(f'exception while accepting: {error}') 1445 accepted = False 1446 if not accepted: 1447 logger.debug('pairing rejected by delegate') 1448 self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) 1449 return 1450 1451 # Save the request 1452 self.preq = bytes(command) 1453 1454 # Bonding and SC require both sides to request/support it 1455 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1456 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1457 self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) 1458 1459 # Infer the pairing method 1460 if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( 1461 not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) 1462 ): 1463 # Use OOB 1464 self.pairing_method = PairingMethod.OOB 1465 if not self.sc and self.tk is None: 1466 # For legacy OOB, TK is required. 1467 logger.warning("legacy OOB without TK") 1468 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1469 return 1470 if command.oob_data_flag == 0: 1471 # The peer doesn't have OOB data, use r=0 1472 self.r = bytes(16) 1473 else: 1474 # Decide which pairing method to use from the IO capability 1475 self.decide_pairing_method( 1476 command.auth_req, 1477 command.io_capability, 1478 self.io_capability, 1479 ) 1480 1481 logger.debug(f'pairing method: {self.pairing_method.name}') 1482 1483 # Key distribution 1484 ( 1485 self.initiator_key_distribution, 1486 self.responder_key_distribution, 1487 ) = await self.pairing_config.delegate.key_distribution_response( 1488 command.initiator_key_distribution, command.responder_key_distribution 1489 ) 1490 self.compute_peer_expected_distributions(self.initiator_key_distribution) 1491 1492 # The pairing is now starting 1493 self.manager.on_session_start(self) 1494 1495 # Display a passkey if we need to 1496 if not self.sc: 1497 if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display: 1498 self.display_passkey() 1499 1500 # Respond 1501 self.send_pairing_response_command() 1502 1503 # Vol 3, Part C, 5.2.2.1.3 1504 # CTKD over BR/EDR should happen after the connection has been encrypted, 1505 # so when receiving pairing requests, responder should start distributing keys 1506 if ( 1507 self.connection.transport == BT_BR_EDR_TRANSPORT 1508 and self.connection.is_encrypted 1509 and self.is_responder 1510 and accepted 1511 ): 1512 self.distribute_keys() 1513 1514 def on_smp_pairing_response_command( 1515 self, command: SMP_Pairing_Response_Command 1516 ) -> None: 1517 if self.is_responder: 1518 logger.warning(color('received pairing response as a responder', 'red')) 1519 return 1520 1521 # Save the response 1522 self.pres = bytes(command) 1523 self.peer_io_capability = command.io_capability 1524 1525 # Bonding and SC require both sides to request/support it 1526 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1527 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1528 1529 # Infer the pairing method 1530 if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( 1531 not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) 1532 ): 1533 # Use OOB 1534 self.pairing_method = PairingMethod.OOB 1535 if not self.sc and self.tk is None: 1536 # For legacy OOB, TK is required. 1537 logger.warning("legacy OOB without TK") 1538 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1539 return 1540 if command.oob_data_flag == 0: 1541 # The peer doesn't have OOB data, use r=0 1542 self.r = bytes(16) 1543 else: 1544 # Decide which pairing method to use from the IO capability 1545 self.decide_pairing_method( 1546 command.auth_req, self.io_capability, command.io_capability 1547 ) 1548 1549 logger.debug(f'pairing method: {self.pairing_method.name}') 1550 1551 # Key distribution 1552 if ( 1553 command.initiator_key_distribution & ~self.initiator_key_distribution != 0 1554 ) or ( 1555 command.responder_key_distribution & ~self.responder_key_distribution != 0 1556 ): 1557 # The response isn't a subset of the request 1558 self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) 1559 return 1560 self.initiator_key_distribution = command.initiator_key_distribution 1561 self.responder_key_distribution = command.responder_key_distribution 1562 self.compute_peer_expected_distributions(self.responder_key_distribution) 1563 1564 # Start phase 2 1565 if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC: 1566 # Authentication is already done in SMP, so remote shall start keys distribution immediately 1567 return 1568 elif self.sc: 1569 if self.pairing_method == PairingMethod.PASSKEY: 1570 self.display_or_input_passkey() 1571 1572 self.send_public_key_command() 1573 else: 1574 if self.pairing_method == PairingMethod.PASSKEY: 1575 self.display_or_input_passkey(self.send_pairing_confirm_command) 1576 else: 1577 self.send_pairing_confirm_command() 1578 1579 def on_smp_pairing_confirm_command_legacy( 1580 self, _: SMP_Pairing_Confirm_Command 1581 ) -> None: 1582 if self.is_initiator: 1583 self.send_pairing_random_command() 1584 else: 1585 # If the method is PASSKEY, now is the time to input the code 1586 if ( 1587 self.pairing_method == PairingMethod.PASSKEY 1588 and not self.passkey_display 1589 ): 1590 self.input_passkey(self.send_pairing_confirm_command) 1591 else: 1592 self.send_pairing_confirm_command() 1593 1594 def on_smp_pairing_confirm_command_secure_connections( 1595 self, _: SMP_Pairing_Confirm_Command 1596 ) -> None: 1597 if self.pairing_method in ( 1598 PairingMethod.JUST_WORKS, 1599 PairingMethod.NUMERIC_COMPARISON, 1600 ): 1601 if self.is_initiator: 1602 self.r = crypto.r() 1603 self.send_pairing_random_command() 1604 elif self.pairing_method == PairingMethod.PASSKEY: 1605 if self.is_initiator: 1606 self.send_pairing_random_command() 1607 else: 1608 self.send_pairing_confirm_command() 1609 1610 def on_smp_pairing_confirm_command( 1611 self, command: SMP_Pairing_Confirm_Command 1612 ) -> None: 1613 self.confirm_value = command.confirm_value 1614 if self.sc: 1615 self.on_smp_pairing_confirm_command_secure_connections(command) 1616 else: 1617 self.on_smp_pairing_confirm_command_legacy(command) 1618 1619 def on_smp_pairing_random_command_legacy( 1620 self, command: SMP_Pairing_Random_Command 1621 ) -> None: 1622 # Check that the confirmation values match 1623 confirm_verifier = crypto.c1( 1624 self.tk, 1625 command.random_value, 1626 self.preq, 1627 self.pres, 1628 self.iat, 1629 self.rat, 1630 self.ia, 1631 self.ra, 1632 ) 1633 assert self.confirm_value 1634 if not self.check_expected_value( 1635 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1636 ): 1637 return 1638 1639 # Compute STK 1640 if self.is_initiator: 1641 mrand = self.r 1642 srand = command.random_value 1643 else: 1644 srand = self.r 1645 mrand = command.random_value 1646 self.stk = crypto.s1(self.tk, srand, mrand) 1647 logger.debug(f'STK = {self.stk.hex()}') 1648 1649 # Generate LTK 1650 self.ltk = crypto.r() 1651 1652 if self.is_initiator: 1653 self.start_encryption(self.stk) 1654 else: 1655 self.send_pairing_random_command() 1656 1657 def on_smp_pairing_random_command_secure_connections( 1658 self, command: SMP_Pairing_Random_Command 1659 ) -> None: 1660 if self.pairing_method == PairingMethod.PASSKEY and self.passkey is None: 1661 logger.warning('no passkey entered, ignoring command') 1662 return 1663 1664 # pylint: disable=too-many-return-statements 1665 if self.is_initiator: 1666 if self.pairing_method in ( 1667 PairingMethod.JUST_WORKS, 1668 PairingMethod.NUMERIC_COMPARISON, 1669 ): 1670 assert self.confirm_value 1671 # Check that the random value matches what was committed to earlier 1672 confirm_verifier = crypto.f4( 1673 self.pkb, self.pka, command.random_value, bytes([0]) 1674 ) 1675 if not self.check_expected_value( 1676 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1677 ): 1678 return 1679 elif self.pairing_method == PairingMethod.PASSKEY: 1680 assert self.passkey and self.confirm_value 1681 # Check that the random value matches what was committed to earlier 1682 confirm_verifier = crypto.f4( 1683 self.pkb, 1684 self.pka, 1685 command.random_value, 1686 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1687 ) 1688 if not self.check_expected_value( 1689 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1690 ): 1691 return 1692 1693 # Move on to the next iteration 1694 self.passkey_step += 1 1695 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1696 if self.passkey_step < 20: 1697 self.send_pairing_confirm_command() 1698 return 1699 elif self.pairing_method != PairingMethod.OOB: 1700 return 1701 else: 1702 if self.pairing_method in ( 1703 PairingMethod.JUST_WORKS, 1704 PairingMethod.NUMERIC_COMPARISON, 1705 PairingMethod.OOB, 1706 ): 1707 self.send_pairing_random_command() 1708 elif self.pairing_method == PairingMethod.PASSKEY: 1709 assert self.passkey and self.confirm_value 1710 # Check that the random value matches what was committed to earlier 1711 confirm_verifier = crypto.f4( 1712 self.pka, 1713 self.pkb, 1714 command.random_value, 1715 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1716 ) 1717 if not self.check_expected_value( 1718 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1719 ): 1720 return 1721 1722 self.send_pairing_random_command() 1723 1724 # Move on to the next iteration 1725 self.passkey_step += 1 1726 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1727 if self.passkey_step < 20: 1728 self.r = crypto.r() 1729 return 1730 else: 1731 return 1732 1733 # Compute the MacKey and LTK 1734 a = self.ia + bytes([self.iat]) 1735 b = self.ra + bytes([self.rat]) 1736 (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b) 1737 1738 # Compute the DH Key checks 1739 if self.pairing_method in ( 1740 PairingMethod.JUST_WORKS, 1741 PairingMethod.NUMERIC_COMPARISON, 1742 ): 1743 ra = bytes(16) 1744 rb = ra 1745 elif self.pairing_method == PairingMethod.PASSKEY: 1746 assert self.passkey 1747 ra = self.passkey.to_bytes(16, byteorder='little') 1748 rb = ra 1749 elif self.pairing_method == PairingMethod.OOB: 1750 if self.is_initiator: 1751 if self.peer_oob_data: 1752 rb = self.peer_oob_data.r 1753 ra = self.r 1754 else: 1755 rb = bytes(16) 1756 ra = self.r 1757 else: 1758 if self.peer_oob_data: 1759 ra = self.peer_oob_data.r 1760 rb = self.r 1761 else: 1762 ra = bytes(16) 1763 rb = self.r 1764 1765 else: 1766 return 1767 1768 assert self.preq and self.pres 1769 io_cap_a = self.preq[1:4] 1770 io_cap_b = self.pres[1:4] 1771 self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b) 1772 self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a) 1773 1774 # Next steps to be performed after possible user confirmation 1775 def next_steps() -> None: 1776 # The initiator sends the DH Key check to the responder 1777 if self.is_initiator: 1778 self.send_pairing_dhkey_check_command() 1779 else: 1780 if self.wait_before_continuing: 1781 self.wait_before_continuing.set_result(None) 1782 1783 # Prompt the user for confirmation if needed 1784 if self.pairing_method in ( 1785 PairingMethod.JUST_WORKS, 1786 PairingMethod.NUMERIC_COMPARISON, 1787 ): 1788 # Compute the 6-digit code 1789 code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000 1790 1791 # Ask for user confirmation 1792 self.wait_before_continuing = asyncio.get_running_loop().create_future() 1793 if self.pairing_method == PairingMethod.JUST_WORKS: 1794 self.prompt_user_for_confirmation(next_steps) 1795 else: 1796 self.prompt_user_for_numeric_comparison(code, next_steps) 1797 else: 1798 next_steps() 1799 1800 def on_smp_pairing_random_command( 1801 self, command: SMP_Pairing_Random_Command 1802 ) -> None: 1803 self.peer_random_value = command.random_value 1804 if self.sc: 1805 self.on_smp_pairing_random_command_secure_connections(command) 1806 else: 1807 self.on_smp_pairing_random_command_legacy(command) 1808 1809 def on_smp_pairing_public_key_command( 1810 self, command: SMP_Pairing_Public_Key_Command 1811 ) -> None: 1812 # Store the public key so that we can compute the confirmation value later 1813 self.peer_public_key_x = command.public_key_x 1814 self.peer_public_key_y = command.public_key_y 1815 1816 # Compute the DH key 1817 self.dh_key = self.ecc_key.dh( 1818 command.public_key_x[::-1], 1819 command.public_key_y[::-1], 1820 )[::-1] 1821 logger.debug(f'DH key: {self.dh_key.hex()}') 1822 1823 if self.pairing_method == PairingMethod.OOB: 1824 # Check against shared OOB data 1825 if self.peer_oob_data: 1826 confirm_verifier = crypto.f4( 1827 self.peer_public_key_x, 1828 self.peer_public_key_x, 1829 self.peer_oob_data.r, 1830 bytes(1), 1831 ) 1832 if not self.check_expected_value( 1833 self.peer_oob_data.c, 1834 confirm_verifier, 1835 SMP_CONFIRM_VALUE_FAILED_ERROR, 1836 ): 1837 return 1838 1839 if self.is_initiator: 1840 if self.pairing_method == PairingMethod.OOB: 1841 self.send_pairing_random_command() 1842 else: 1843 self.send_pairing_confirm_command() 1844 else: 1845 if self.pairing_method == PairingMethod.PASSKEY: 1846 self.display_or_input_passkey() 1847 1848 # Send our public key back to the initiator 1849 self.send_public_key_command() 1850 1851 if self.pairing_method in ( 1852 PairingMethod.JUST_WORKS, 1853 PairingMethod.NUMERIC_COMPARISON, 1854 PairingMethod.OOB, 1855 ): 1856 # We can now send the confirmation value 1857 self.send_pairing_confirm_command() 1858 1859 def on_smp_pairing_dhkey_check_command( 1860 self, command: SMP_Pairing_DHKey_Check_Command 1861 ) -> None: 1862 # Check that what we received matches what we computed earlier 1863 expected = self.eb if self.is_initiator else self.ea 1864 assert expected 1865 if not self.check_expected_value( 1866 expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR 1867 ): 1868 return 1869 1870 if self.is_responder: 1871 if self.wait_before_continuing is not None: 1872 1873 async def next_steps() -> None: 1874 assert self.wait_before_continuing 1875 await self.wait_before_continuing 1876 self.wait_before_continuing = None 1877 self.send_pairing_dhkey_check_command() 1878 1879 self.connection.abort_on('disconnection', next_steps()) 1880 else: 1881 self.send_pairing_dhkey_check_command() 1882 else: 1883 self.start_encryption(self.ltk) 1884 1885 def on_smp_pairing_failed_command( 1886 self, command: SMP_Pairing_Failed_Command 1887 ) -> None: 1888 self.on_pairing_failure(command.reason) 1889 1890 def on_smp_encryption_information_command( 1891 self, command: SMP_Encryption_Information_Command 1892 ) -> None: 1893 self.peer_ltk = command.long_term_key 1894 self.check_key_distribution(SMP_Encryption_Information_Command) 1895 1896 def on_smp_master_identification_command( 1897 self, command: SMP_Master_Identification_Command 1898 ) -> None: 1899 self.peer_ediv = command.ediv 1900 self.peer_rand = command.rand 1901 self.check_key_distribution(SMP_Master_Identification_Command) 1902 1903 def on_smp_identity_information_command( 1904 self, command: SMP_Identity_Information_Command 1905 ) -> None: 1906 self.peer_identity_resolving_key = command.identity_resolving_key 1907 self.check_key_distribution(SMP_Identity_Information_Command) 1908 1909 def on_smp_identity_address_information_command( 1910 self, command: SMP_Identity_Address_Information_Command 1911 ) -> None: 1912 self.peer_bd_addr = command.bd_addr 1913 self.check_key_distribution(SMP_Identity_Address_Information_Command) 1914 1915 def on_smp_signing_information_command( 1916 self, command: SMP_Signing_Information_Command 1917 ) -> None: 1918 self.peer_signature_key = command.signature_key 1919 self.check_key_distribution(SMP_Signing_Information_Command) 1920 1921 1922# ----------------------------------------------------------------------------- 1923class Manager(EventEmitter): 1924 ''' 1925 Implements the Initiator and Responder roles of the Security Manager Protocol 1926 ''' 1927 1928 device: Device 1929 sessions: Dict[int, Session] 1930 pairing_config_factory: Callable[[Connection], PairingConfig] 1931 session_proxy: Type[Session] 1932 _ecc_key: Optional[crypto.EccKey] 1933 1934 def __init__( 1935 self, 1936 device: Device, 1937 pairing_config_factory: Callable[[Connection], PairingConfig], 1938 ) -> None: 1939 super().__init__() 1940 self.device = device 1941 self.sessions = {} 1942 self._ecc_key = None 1943 self.pairing_config_factory = pairing_config_factory 1944 self.session_proxy = Session 1945 1946 def send_command(self, connection: Connection, command: SMP_Command) -> None: 1947 logger.debug( 1948 f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] ' 1949 f'{connection.peer_address}: {command}' 1950 ) 1951 cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID 1952 connection.send_l2cap_pdu(cid, command.to_bytes()) 1953 1954 def on_smp_security_request_command( 1955 self, connection: Connection, request: SMP_Security_Request_Command 1956 ) -> None: 1957 connection.emit('security_request', request.auth_req) 1958 1959 def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None: 1960 # Parse the L2CAP payload into an SMP Command object 1961 command = SMP_Command.from_bytes(pdu) 1962 logger.debug( 1963 f'<<< Received SMP Command on connection [0x{connection.handle:04X}] ' 1964 f'{connection.peer_address}: {command}' 1965 ) 1966 1967 # Security request is more than just pairing, so let applications handle them 1968 if command.code == SMP_SECURITY_REQUEST_COMMAND: 1969 self.on_smp_security_request_command( 1970 connection, cast(SMP_Security_Request_Command, command) 1971 ) 1972 return 1973 1974 # Look for a session with this connection, and create one if none exists 1975 if not (session := self.sessions.get(connection.handle)): 1976 if connection.role == BT_CENTRAL_ROLE: 1977 logger.warning('Remote starts pairing as Peripheral!') 1978 pairing_config = self.pairing_config_factory(connection) 1979 session = self.session_proxy( 1980 self, connection, pairing_config, is_initiator=False 1981 ) 1982 self.sessions[connection.handle] = session 1983 1984 # Delegate the handling of the command to the session 1985 session.on_smp_command(command) 1986 1987 @property 1988 def ecc_key(self) -> crypto.EccKey: 1989 if self._ecc_key is None: 1990 self._ecc_key = crypto.EccKey.generate() 1991 assert self._ecc_key 1992 return self._ecc_key 1993 1994 async def pair(self, connection: Connection) -> None: 1995 # TODO: check if there's already a session for this connection 1996 if connection.role != BT_CENTRAL_ROLE: 1997 logger.warning('Start pairing as Peripheral!') 1998 pairing_config = self.pairing_config_factory(connection) 1999 session = self.session_proxy( 2000 self, connection, pairing_config, is_initiator=True 2001 ) 2002 self.sessions[connection.handle] = session 2003 return await session.pair() 2004 2005 def request_pairing(self, connection: Connection) -> None: 2006 pairing_config = self.pairing_config_factory(connection) 2007 if pairing_config: 2008 auth_req = smp_auth_req( 2009 pairing_config.bonding, 2010 pairing_config.mitm, 2011 pairing_config.sc, 2012 False, 2013 False, 2014 ) 2015 else: 2016 auth_req = 0 2017 self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) 2018 2019 def on_session_start(self, session: Session) -> None: 2020 self.device.on_pairing_start(session.connection) 2021 2022 async def on_pairing( 2023 self, session: Session, identity_address: Optional[Address], keys: PairingKeys 2024 ) -> None: 2025 # Store the keys in the key store 2026 if self.device.keystore and identity_address is not None: 2027 # Make sure on_pairing emits after key update. 2028 await self.device.update_keys(str(identity_address), keys) 2029 # Notify the device 2030 self.device.on_pairing(session.connection, identity_address, keys, session.sc) 2031 2032 def on_pairing_failure(self, session: Session, reason: int) -> None: 2033 self.device.on_pairing_failure(session.connection, reason) 2034 2035 def on_session_end(self, session: Session) -> None: 2036 logger.debug(f'session end for connection 0x{session.connection.handle:04X}') 2037 if session.connection.handle in self.sessions: 2038 del self.sessions[session.connection.handle] 2039 2040 def get_long_term_key( 2041 self, connection: Connection, rand: bytes, ediv: int 2042 ) -> Optional[bytes]: 2043 if session := self.sessions.get(connection.handle): 2044 return session.get_long_term_key(rand, ediv) 2045 2046 return None 2047