1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19 20import collections 21import collections.abc 22import logging 23import asyncio 24import dataclasses 25import enum 26import traceback 27import pyee 28import re 29from typing import ( 30 Dict, 31 List, 32 Union, 33 Set, 34 Any, 35 Optional, 36 Type, 37 Tuple, 38 ClassVar, 39 Iterable, 40 TYPE_CHECKING, 41) 42from typing_extensions import Self 43 44from bumble import at 45from bumble import device 46from bumble import rfcomm 47from bumble import sdp 48from bumble.colors import color 49from bumble.core import ( 50 ProtocolError, 51 BT_GENERIC_AUDIO_SERVICE, 52 BT_HANDSFREE_SERVICE, 53 BT_HANDSFREE_AUDIO_GATEWAY_SERVICE, 54 BT_L2CAP_PROTOCOL_ID, 55 BT_RFCOMM_PROTOCOL_ID, 56) 57from bumble.hci import ( 58 HCI_Enhanced_Setup_Synchronous_Connection_Command, 59 CodingFormat, 60 CodecID, 61) 62 63 64# ----------------------------------------------------------------------------- 65# Logging 66# ----------------------------------------------------------------------------- 67logger = logging.getLogger(__name__) 68 69# ----------------------------------------------------------------------------- 70# Error 71# ----------------------------------------------------------------------------- 72 73 74class HfpProtocolError(ProtocolError): 75 def __init__(self, error_name: str = '', details: str = ''): 76 super().__init__(None, 'hfp', error_name, details) 77 78 79# ----------------------------------------------------------------------------- 80# Protocol Support 81# ----------------------------------------------------------------------------- 82 83 84# ----------------------------------------------------------------------------- 85class HfpProtocol: 86 dlc: rfcomm.DLC 87 buffer: str 88 lines: collections.deque 89 lines_available: asyncio.Event 90 91 def __init__(self, dlc: rfcomm.DLC) -> None: 92 self.dlc = dlc 93 self.buffer = '' 94 self.lines = collections.deque() 95 self.lines_available = asyncio.Event() 96 97 dlc.sink = self.feed 98 99 def feed(self, data: Union[bytes, str]) -> None: 100 # Convert the data to a string if needed 101 if isinstance(data, bytes): 102 data = data.decode('utf-8') 103 104 logger.debug(f'<<< Data received: {data}') 105 106 # Add to the buffer and look for lines 107 self.buffer += data 108 while (separator := self.buffer.find('\r')) >= 0: 109 line = self.buffer[:separator].strip() 110 self.buffer = self.buffer[separator + 1 :] 111 if len(line) > 0: 112 self.on_line(line) 113 114 def on_line(self, line: str) -> None: 115 self.lines.append(line) 116 self.lines_available.set() 117 118 def send_command_line(self, line: str) -> None: 119 logger.debug(color(f'>>> {line}', 'yellow')) 120 self.dlc.write(line + '\r') 121 122 def send_response_line(self, line: str) -> None: 123 logger.debug(color(f'>>> {line}', 'yellow')) 124 self.dlc.write('\r\n' + line + '\r\n') 125 126 async def next_line(self) -> str: 127 await self.lines_available.wait() 128 line = self.lines.popleft() 129 if not self.lines: 130 self.lines_available.clear() 131 logger.debug(color(f'<<< {line}', 'green')) 132 return line 133 134 135# ----------------------------------------------------------------------------- 136# Normative protocol definitions 137# ----------------------------------------------------------------------------- 138 139 140class HfFeature(enum.IntFlag): 141 """ 142 HF supported features (AT+BRSF=) (normative). 143 144 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 145 """ 146 147 EC_NR = 0x001 # Echo Cancel & Noise reduction 148 THREE_WAY_CALLING = 0x002 149 CLI_PRESENTATION_CAPABILITY = 0x004 150 VOICE_RECOGNITION_ACTIVATION = 0x008 151 REMOTE_VOLUME_CONTROL = 0x010 152 ENHANCED_CALL_STATUS = 0x020 153 ENHANCED_CALL_CONTROL = 0x040 154 CODEC_NEGOTIATION = 0x080 155 HF_INDICATORS = 0x100 156 ESCO_S4_SETTINGS_SUPPORTED = 0x200 157 ENHANCED_VOICE_RECOGNITION_STATUS = 0x400 158 VOICE_RECOGNITION_TEST = 0x800 159 160 161class AgFeature(enum.IntFlag): 162 """ 163 AG supported features (+BRSF:) (normative). 164 165 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 166 """ 167 168 THREE_WAY_CALLING = 0x001 169 EC_NR = 0x002 # Echo Cancel & Noise reduction 170 VOICE_RECOGNITION_FUNCTION = 0x004 171 IN_BAND_RING_TONE_CAPABILITY = 0x008 172 VOICE_TAG = 0x010 # Attach a number to voice tag 173 REJECT_CALL = 0x020 # Ability to reject a call 174 ENHANCED_CALL_STATUS = 0x040 175 ENHANCED_CALL_CONTROL = 0x080 176 EXTENDED_ERROR_RESULT_CODES = 0x100 177 CODEC_NEGOTIATION = 0x200 178 HF_INDICATORS = 0x400 179 ESCO_S4_SETTINGS_SUPPORTED = 0x800 180 ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000 181 VOICE_RECOGNITION_TEST = 0x2000 182 183 184class AudioCodec(enum.IntEnum): 185 """ 186 Audio Codec IDs (normative). 187 188 Hands-Free Profile v1.9, 11 Appendix B 189 """ 190 191 CVSD = 0x01 # Support for CVSD audio codec 192 MSBC = 0x02 # Support for mSBC audio codec 193 LC3_SWB = 0x03 # Support for LC3-SWB audio codec 194 195 196class HfIndicator(enum.IntEnum): 197 """ 198 HF Indicators (normative). 199 200 Bluetooth Assigned Numbers, 6.10.1 HF Indicators. 201 """ 202 203 ENHANCED_SAFETY = 0x01 # Enhanced safety feature 204 BATTERY_LEVEL = 0x02 # Battery level feature 205 206 207class CallHoldOperation(enum.Enum): 208 """ 209 Call Hold supported operations (normative). 210 211 AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services. 212 """ 213 214 RELEASE_ALL_HELD_CALLS = "0" # Release all held calls 215 RELEASE_ALL_ACTIVE_CALLS = "1" # Release all active calls, accept other 216 RELEASE_SPECIFIC_CALL = "1x" # Release a specific call X 217 HOLD_ALL_ACTIVE_CALLS = "2" # Place all active calls on hold, accept other 218 HOLD_ALL_CALLS_EXCEPT = "2x" # Place all active calls except call X 219 ADD_HELD_CALL = "3" # Adds a held call to conversation 220 CONNECT_TWO_CALLS = ( 221 "4" # Connects the two calls and disconnects the subscriber from both calls 222 ) 223 224 225class ResponseHoldStatus(enum.IntEnum): 226 """ 227 Response Hold status (normative). 228 229 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 230 """ 231 232 INC_CALL_HELD = 0 # Put incoming call on hold 233 HELD_CALL_ACC = 1 # Accept a held incoming call 234 HELD_CALL_REJ = 2 # Reject a held incoming call 235 236 237class AgIndicator(enum.Enum): 238 """ 239 Values for the AG indicator (normative). 240 241 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 242 """ 243 244 SERVICE = 'service' 245 CALL = 'call' 246 CALL_SETUP = 'callsetup' 247 CALL_HELD = 'callheld' 248 SIGNAL = 'signal' 249 ROAM = 'roam' 250 BATTERY_CHARGE = 'battchg' 251 252 253class CallSetupAgIndicator(enum.IntEnum): 254 """ 255 Values for the Call Setup AG indicator (normative). 256 257 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 258 """ 259 260 NOT_IN_CALL_SETUP = 0 261 INCOMING_CALL_PROCESS = 1 262 OUTGOING_CALL_SETUP = 2 263 REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call 264 265 266class CallHeldAgIndicator(enum.IntEnum): 267 """ 268 Values for the Call Held AG indicator (normative). 269 270 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 271 """ 272 273 NO_CALLS_HELD = 0 274 # Call is placed on hold or active/held calls swapped 275 # (The AG has both an active AND a held call) 276 CALL_ON_HOLD_AND_ACTIVE_CALL = 1 277 CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call 278 279 280class CallInfoDirection(enum.IntEnum): 281 """ 282 Call Info direction (normative). 283 284 AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls. 285 """ 286 287 MOBILE_ORIGINATED_CALL = 0 288 MOBILE_TERMINATED_CALL = 1 289 290 291class CallInfoStatus(enum.IntEnum): 292 """ 293 Call Info status (normative). 294 295 AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls. 296 """ 297 298 ACTIVE = 0 299 HELD = 1 300 DIALING = 2 301 ALERTING = 3 302 INCOMING = 4 303 WAITING = 5 304 305 306class CallInfoMode(enum.IntEnum): 307 """ 308 Call Info mode (normative). 309 310 AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls. 311 """ 312 313 VOICE = 0 314 DATA = 1 315 FAX = 2 316 UNKNOWN = 9 317 318 319class CallInfoMultiParty(enum.IntEnum): 320 """ 321 Call Info Multi-Party state (normative). 322 323 AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls. 324 """ 325 326 NOT_IN_CONFERENCE = 0 327 IN_CONFERENCE = 1 328 329 330@dataclasses.dataclass 331class CallInfo: 332 """ 333 Enhanced call status. 334 335 AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls. 336 """ 337 338 index: int 339 direction: CallInfoDirection 340 status: CallInfoStatus 341 mode: CallInfoMode 342 multi_party: CallInfoMultiParty 343 number: Optional[str] = None 344 type: Optional[int] = None 345 346 347@dataclasses.dataclass 348class CallLineIdentification: 349 """ 350 Calling Line Identification notification. 351 352 TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only 353 number, type and alpha are meaningful in HFP. 354 355 Attributes: 356 number: String type phone number of format specified by `type`. 357 type: Type of address octet in integer format (refer TS 24.008 [8] subclause 358 10.5.4.7). 359 subaddr: String type subaddress of format specified by `satype`. 360 satype: Type of subaddress octet in integer format (refer TS 24.008 [8] 361 subclause 10.5.4.8). 362 alpha: Optional string type alphanumeric representation of number corresponding 363 to the entry found in phonebook; used character set should be the one selected 364 with command Select TE Character Set +CSCS. 365 cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is 366 not available due to interworking problems or limitations of originating 367 network. 368 """ 369 370 number: str 371 type: int 372 subaddr: Optional[str] = None 373 satype: Optional[int] = None 374 alpha: Optional[str] = None 375 cli_validity: Optional[int] = None 376 377 @classmethod 378 def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self: 379 return cls( 380 number=parameters[0].decode(), 381 type=int(parameters[1]), 382 subaddr=parameters[2].decode() if len(parameters) >= 3 else None, 383 satype=( 384 int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None 385 ), 386 alpha=parameters[4].decode() if len(parameters) >= 5 else None, 387 cli_validity=( 388 int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None 389 ), 390 ) 391 392 def to_clip_string(self) -> str: 393 return ','.join( 394 str(arg) if arg else '' 395 for arg in [ 396 self.number, 397 self.type, 398 self.subaddr, 399 self.satype, 400 self.alpha, 401 self.cli_validity, 402 ] 403 ) 404 405 406class VoiceRecognitionState(enum.IntEnum): 407 """ 408 vrec values provided in AT+BVRA command. 409 410 Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. 411 """ 412 413 DISABLE = 0 414 ENABLE = 1 415 # (Enhanced Voice Recognition Status only) HF is ready to accept audio. 416 ENHANCED_READY = 2 417 418 419class CmeError(enum.IntEnum): 420 """ 421 CME ERROR codes (partial listed). 422 423 TS 127 007 - V6.8.0, 9.2.1 General errors 424 """ 425 426 PHONE_FAILURE = 0 427 OPERATION_NOT_ALLOWED = 3 428 OPERATION_NOT_SUPPORTED = 4 429 MEMORY_FULL = 20 430 INVALID_INDEX = 21 431 NOT_FOUND = 22 432 433 434# ----------------------------------------------------------------------------- 435# Hands-Free Control Interoperability Requirements 436# ----------------------------------------------------------------------------- 437 438# Response codes. 439RESPONSE_CODES = { 440 "+APLSIRI", 441 "+BAC", 442 "+BCC", 443 "+BCS", 444 "+BIA", 445 "+BIEV", 446 "+BIND", 447 "+BINP", 448 "+BLDN", 449 "+BRSF", 450 "+BTRH", 451 "+BVRA", 452 "+CCWA", 453 "+CHLD", 454 "+CHUP", 455 "+CIND", 456 "+CLCC", 457 "+CLIP", 458 "+CMEE", 459 "+CMER", 460 "+CNUM", 461 "+COPS", 462 "+IPHONEACCEV", 463 "+NREC", 464 "+VGM", 465 "+VGS", 466 "+VTS", 467 "+XAPL", 468 "A", 469 "D", 470} 471 472# Unsolicited responses and statuses. 473UNSOLICITED_CODES = { 474 "+APLSIRI", 475 "+BCS", 476 "+BIND", 477 "+BSIR", 478 "+BTRH", 479 "+BVRA", 480 "+CCWA", 481 "+CIEV", 482 "+CLIP", 483 "+VGM", 484 "+VGS", 485 "BLACKLISTED", 486 "BUSY", 487 "DELAYED", 488 "NO ANSWER", 489 "NO CARRIER", 490 "RING", 491} 492 493# Status codes 494STATUS_CODES = { 495 "+CME ERROR", 496 "BLACKLISTED", 497 "BUSY", 498 "DELAYED", 499 "ERROR", 500 "NO ANSWER", 501 "NO CARRIER", 502 "OK", 503} 504 505 506@dataclasses.dataclass 507class HfConfiguration: 508 supported_hf_features: List[HfFeature] 509 supported_hf_indicators: List[HfIndicator] 510 supported_audio_codecs: List[AudioCodec] 511 512 513@dataclasses.dataclass 514class AgConfiguration: 515 supported_ag_features: Iterable[AgFeature] 516 supported_ag_indicators: collections.abc.Sequence[AgIndicatorState] 517 supported_hf_indicators: Iterable[HfIndicator] 518 supported_ag_call_hold_operations: Iterable[CallHoldOperation] 519 supported_audio_codecs: Iterable[AudioCodec] 520 521 522class AtResponseType(enum.Enum): 523 """ 524 Indicates if a response is expected from an AT command, and if multiple responses are accepted. 525 """ 526 527 NONE = 0 528 SINGLE = 1 529 MULTIPLE = 2 530 531 532@dataclasses.dataclass 533class AtResponse: 534 code: str 535 parameters: list 536 537 @classmethod 538 def parse_from(cls: Type[Self], buffer: bytearray) -> Self: 539 code_and_parameters = buffer.split(b':') 540 parameters = ( 541 code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray() 542 ) 543 return cls( 544 code=code_and_parameters[0].decode(), 545 parameters=at.parse_parameters(parameters), 546 ) 547 548 549@dataclasses.dataclass 550class AtCommand: 551 class SubCode(str, enum.Enum): 552 NONE = '' 553 SET = '=' 554 TEST = '=?' 555 READ = '?' 556 557 code: str 558 sub_code: SubCode 559 parameters: list 560 561 _PARSE_PATTERN: ClassVar[re.Pattern] = re.compile( 562 r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)' 563 ) 564 565 @classmethod 566 def parse_from(cls: Type[Self], buffer: bytearray) -> Self: 567 if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())): 568 if buffer.startswith(b'ATA'): 569 return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[]) 570 if buffer.startswith(b'ATD'): 571 return cls( 572 code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]] 573 ) 574 raise HfpProtocolError('Invalid command') 575 576 parameters = [] 577 if parameters_text := match.group('parameters'): 578 parameters = at.parse_parameters(parameters_text.encode()) 579 580 return cls( 581 code=match.group('code'), 582 sub_code=AtCommand.SubCode(match.group('sub_code') or ''), 583 parameters=parameters, 584 ) 585 586 587@dataclasses.dataclass 588class AgIndicatorState: 589 """State wrapper of AG indicator. 590 591 Attributes: 592 indicator: Indicator of this indicator state. 593 supported_values: Supported values of this indicator. 594 current_status: Current status of this indicator. 595 index: (HF only) Index of this indicator. 596 enabled: (AG only) Whether this indicator is enabled to report. 597 on_test_text: Text message reported in AT+CIND=? of this indicator. 598 """ 599 600 indicator: AgIndicator 601 supported_values: Set[int] 602 current_status: int 603 index: Optional[int] = None 604 enabled: bool = True 605 606 @property 607 def on_test_text(self) -> str: 608 min_value = min(self.supported_values) 609 max_value = max(self.supported_values) 610 if len(self.supported_values) == (max_value - min_value + 1): 611 supported_values_text = f'({min_value}-{max_value})' 612 else: 613 supported_values_text = ( 614 f'({",".join(str(v) for v in self.supported_values)})' 615 ) 616 return f'(\"{self.indicator.value}\",{supported_values_text})' 617 618 @classmethod 619 def call(cls: Type[Self]) -> Self: 620 """Default call indicator state.""" 621 return cls( 622 indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0 623 ) 624 625 @classmethod 626 def callsetup(cls: Type[Self]) -> Self: 627 """Default callsetup indicator state.""" 628 return cls( 629 indicator=AgIndicator.CALL_SETUP, 630 supported_values={0, 1, 2, 3}, 631 current_status=0, 632 ) 633 634 @classmethod 635 def callheld(cls: Type[Self]) -> Self: 636 """Default call indicator state.""" 637 return cls( 638 indicator=AgIndicator.CALL_HELD, 639 supported_values={0, 1, 2}, 640 current_status=0, 641 ) 642 643 @classmethod 644 def service(cls: Type[Self]) -> Self: 645 """Default service indicator state.""" 646 return cls( 647 indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0 648 ) 649 650 @classmethod 651 def signal(cls: Type[Self]) -> Self: 652 """Default signal indicator state.""" 653 return cls( 654 indicator=AgIndicator.SIGNAL, 655 supported_values={0, 1, 2, 3, 4, 5}, 656 current_status=0, 657 ) 658 659 @classmethod 660 def roam(cls: Type[Self]) -> Self: 661 """Default roam indicator state.""" 662 return cls( 663 indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0 664 ) 665 666 @classmethod 667 def battchg(cls: Type[Self]) -> Self: 668 """Default battery charge indicator state.""" 669 return cls( 670 indicator=AgIndicator.BATTERY_CHARGE, 671 supported_values={0, 1, 2, 3, 4, 5}, 672 current_status=0, 673 ) 674 675 676@dataclasses.dataclass 677class HfIndicatorState: 678 """State wrapper of HF indicator. 679 680 Attributes: 681 indicator: Indicator of this indicator state. 682 supported: Whether this indicator is supported. 683 enabled: Whether this indicator is enabled. 684 current_status: Current (last-reported) status value of this indicaotr. 685 """ 686 687 indicator: HfIndicator 688 supported: bool = False 689 enabled: bool = False 690 current_status: int = 0 691 692 693class HfProtocol(pyee.EventEmitter): 694 """ 695 Implementation for the Hands-Free side of the Hands-Free profile. 696 697 Reference specification Hands-Free Profile v1.8. 698 699 Emitted events: 700 codec_negotiation: When codec is renegotiated, notify the new codec. 701 Args: 702 active_codec: AudioCodec 703 ag_indicator: When AG update their indicators, notify the new state. 704 Args: 705 ag_indicator: AgIndicator 706 speaker_volume: Emitted when AG update speaker volume autonomously. 707 Args: 708 volume: Int 709 microphone_volume: Emitted when AG update microphone volume autonomously. 710 Args: 711 volume: Int 712 microphone_volume: Emitted when AG sends a ringtone request. 713 Args: 714 None 715 cli_notification: Emitted when notify the call metadata on line. 716 Args: 717 cli_notification: CallLineIdentification 718 voice_recognition: Emitted when AG starts voice recognition autonomously. 719 Args: 720 vrec: VoiceRecognitionState 721 """ 722 723 class HfLoopTermination(HfpProtocolError): 724 """Termination signal for run() loop.""" 725 726 supported_hf_features: int 727 supported_audio_codecs: List[AudioCodec] 728 729 supported_ag_features: int 730 supported_ag_call_hold_operations: List[CallHoldOperation] 731 732 ag_indicators: List[AgIndicatorState] 733 hf_indicators: Dict[HfIndicator, HfIndicatorState] 734 735 dlc: rfcomm.DLC 736 command_lock: asyncio.Lock 737 if TYPE_CHECKING: 738 response_queue: asyncio.Queue[AtResponse] 739 unsolicited_queue: asyncio.Queue[Optional[AtResponse]] 740 else: 741 response_queue: asyncio.Queue 742 unsolicited_queue: asyncio.Queue 743 read_buffer: bytearray 744 active_codec: AudioCodec 745 746 def __init__( 747 self, 748 dlc: rfcomm.DLC, 749 configuration: HfConfiguration, 750 ) -> None: 751 super().__init__() 752 753 # Configure internal state. 754 self.dlc = dlc 755 self.command_lock = asyncio.Lock() 756 self.response_queue = asyncio.Queue() 757 self.unsolicited_queue = asyncio.Queue() 758 self.read_buffer = bytearray() 759 self.active_codec = AudioCodec.CVSD 760 self._slc_initialized = False 761 762 # Build local features. 763 self.supported_hf_features = sum(configuration.supported_hf_features) 764 self.supported_audio_codecs = configuration.supported_audio_codecs 765 766 self.hf_indicators = { 767 indicator: HfIndicatorState(indicator=indicator) 768 for indicator in configuration.supported_hf_indicators 769 } 770 771 # Clear remote features. 772 self.supported_ag_features = 0 773 self.supported_ag_call_hold_operations = [] 774 self.ag_indicators = [] 775 776 # Bind the AT reader to the RFCOMM channel. 777 self.dlc.sink = self._read_at 778 # Stop the run() loop when L2CAP is closed. 779 self.dlc.multiplexer.l2cap_channel.on( 780 'close', lambda: self.unsolicited_queue.put_nowait(None) 781 ) 782 783 def supports_hf_feature(self, feature: HfFeature) -> bool: 784 return (self.supported_hf_features & feature) != 0 785 786 def supports_ag_feature(self, feature: AgFeature) -> bool: 787 return (self.supported_ag_features & feature) != 0 788 789 def _read_at(self, data: bytes): 790 """ 791 Reads AT messages from the RFCOMM channel. 792 793 Enqueues AT commands, responses, unsolicited responses to their respective queues, and set the corresponding event. 794 """ 795 # Append to the read buffer. 796 self.read_buffer.extend(data) 797 798 # Locate header and trailer. 799 header = self.read_buffer.find(b'\r\n') 800 trailer = self.read_buffer.find(b'\r\n', header + 2) 801 if header == -1 or trailer == -1: 802 return 803 804 # Isolate the AT response code and parameters. 805 raw_response = self.read_buffer[header + 2 : trailer] 806 response = AtResponse.parse_from(raw_response) 807 logger.debug(f"<<< {raw_response.decode()}") 808 809 # Consume the response bytes. 810 self.read_buffer = self.read_buffer[trailer + 2 :] 811 812 # Forward the received code to the correct queue. 813 if self.command_lock.locked() and ( 814 response.code in STATUS_CODES or response.code in RESPONSE_CODES 815 ): 816 self.response_queue.put_nowait(response) 817 elif response.code in UNSOLICITED_CODES: 818 self.unsolicited_queue.put_nowait(response) 819 else: 820 logger.warning(f"dropping unexpected response with code '{response.code}'") 821 822 async def execute_command( 823 self, 824 cmd: str, 825 timeout: float = 1.0, 826 response_type: AtResponseType = AtResponseType.NONE, 827 ) -> Union[None, AtResponse, List[AtResponse]]: 828 """ 829 Sends an AT command and wait for the peer response. 830 Wait for the AT responses sent by the peer, to the status code. 831 832 Args: 833 cmd: the AT command in string to execute. 834 timeout: timeout in float seconds. 835 response_type: type of response. 836 837 Raises: 838 asyncio.TimeoutError: the status is not received after a timeout (default 1 second). 839 ProtocolError: the status is not OK. 840 """ 841 async with self.command_lock: 842 logger.debug(f">>> {cmd}") 843 self.dlc.write(cmd + '\r') 844 responses: List[AtResponse] = [] 845 846 while True: 847 result = await asyncio.wait_for( 848 self.response_queue.get(), timeout=timeout 849 ) 850 if result.code == 'OK': 851 if response_type == AtResponseType.SINGLE and len(responses) != 1: 852 raise HfpProtocolError("NO ANSWER") 853 854 if response_type == AtResponseType.MULTIPLE: 855 return responses 856 if response_type == AtResponseType.SINGLE: 857 return responses[0] 858 return None 859 if result.code in STATUS_CODES: 860 raise HfpProtocolError(result.code) 861 responses.append(result) 862 863 async def initiate_slc(self): 864 """4.2.1 Service Level Connection Initialization.""" 865 866 # 4.2.1.1 Supported features exchange 867 # First, in the initialization procedure, the HF shall send the 868 # AT+BRSF=<HF supported features> command to the AG to both notify 869 # the AG of the supported features in the HF, as well as to retrieve the 870 # supported features in the AG using the +BRSF result code. 871 response = await self.execute_command( 872 f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE 873 ) 874 875 self.supported_ag_features = int(response.parameters[0]) 876 logger.info(f"supported AG features: {self.supported_ag_features}") 877 for feature in AgFeature: 878 if self.supports_ag_feature(feature): 879 logger.info(f" - {feature.name}") 880 881 # 4.2.1.2 Codec Negotiation 882 # Secondly, in the initialization procedure, if the HF supports the 883 # Codec Negotiation feature, it shall check if the AT+BRSF command 884 # response from the AG has indicated that it supports the Codec 885 # Negotiation feature. 886 if self.supports_hf_feature( 887 HfFeature.CODEC_NEGOTIATION 888 ) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): 889 # If both the HF and AG do support the Codec Negotiation feature 890 # then the HF shall send the AT+BAC=<HF available codecs> command to 891 # the AG to notify the AG of the available codecs in the HF. 892 codecs = [str(c.value) for c in self.supported_audio_codecs] 893 await self.execute_command(f"AT+BAC={','.join(codecs)}") 894 895 # 4.2.1.3 AG Indicators 896 # After having retrieved the supported features in the AG, the HF shall 897 # determine which indicators are supported by the AG, as well as the 898 # ordering of the supported indicators. This is because, according to 899 # the 3GPP 27.007 specification [2], the AG may support additional 900 # indicators not provided for by the Hands-Free Profile, and because the 901 # ordering of the indicators is implementation specific. The HF uses 902 # the AT+CIND=? Test command to retrieve information about the supported 903 # indicators and their ordering. 904 response = await self.execute_command( 905 "AT+CIND=?", response_type=AtResponseType.SINGLE 906 ) 907 908 self.ag_indicators = [] 909 for index, indicator in enumerate(response.parameters): 910 description = AgIndicator(indicator[0].decode()) 911 supported_values = [] 912 for value in indicator[1]: 913 value = value.split(b'-') 914 value = [int(v) for v in value] 915 value_min = value[0] 916 value_max = value[1] if len(value) > 1 else value[0] 917 supported_values.extend([v for v in range(value_min, value_max + 1)]) 918 919 self.ag_indicators.append( 920 AgIndicatorState(description, index, set(supported_values), 0) 921 ) 922 923 # Once the HF has the necessary supported indicator and ordering 924 # information, it shall retrieve the current status of the indicators 925 # in the AG using the AT+CIND? Read command. 926 response = await self.execute_command( 927 "AT+CIND?", response_type=AtResponseType.SINGLE 928 ) 929 930 for index, indicator in enumerate(response.parameters): 931 self.ag_indicators[index].current_status = int(indicator) 932 933 # After having retrieved the status of the indicators in the AG, the HF 934 # shall then enable the "Indicators status update" function in the AG by 935 # issuing the AT+CMER command, to which the AG shall respond with OK. 936 await self.execute_command("AT+CMER=3,,,1") 937 938 if self.supports_hf_feature( 939 HfFeature.THREE_WAY_CALLING 940 ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): 941 # After the HF has enabled the “Indicators status update” function in 942 # the AG, and if the “Call waiting and 3-way calling” bit was set in the 943 # supported features bitmap by both the HF and the AG, the HF shall 944 # issue the AT+CHLD=? test command to retrieve the information about how 945 # the call hold and multiparty services are supported in the AG. The HF 946 # shall not issue the AT+CHLD=? test command in case either the HF or 947 # the AG does not support the "Three-way calling" feature. 948 response = await self.execute_command( 949 "AT+CHLD=?", response_type=AtResponseType.SINGLE 950 ) 951 952 self.supported_ag_call_hold_operations = [ 953 CallHoldOperation(operation.decode()) 954 for operation in response.parameters[0] 955 ] 956 957 # 4.2.1.4 HF Indicators 958 # If the HF supports the HF indicator feature, it shall check the +BRSF 959 # response to see if the AG also supports the HF Indicator feature. 960 if self.supports_hf_feature( 961 HfFeature.HF_INDICATORS 962 ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): 963 # If both the HF and AG support the HF Indicator feature, then the HF 964 # shall send the AT+BIND=<HF supported HF indicators> command to the AG 965 # to notify the AG of the supported indicators’ assigned numbers in the 966 # HF. The AG shall respond with OK 967 indicators = [str(i.value) for i in self.hf_indicators] 968 await self.execute_command(f"AT+BIND={','.join(indicators)}") 969 970 # After having provided the AG with the HF indicators it supports, 971 # the HF shall send the AT+BIND=? to request HF indicators supported 972 # by the AG. The AG shall reply with the +BIND response listing all 973 # HF indicators that it supports followed by an OK. 974 response = await self.execute_command( 975 "AT+BIND=?", response_type=AtResponseType.SINGLE 976 ) 977 978 logger.info("supported HF indicators:") 979 for indicator in response.parameters[0]: 980 indicator = HfIndicator(int(indicator)) 981 logger.info(f" - {indicator.name}") 982 if indicator in self.hf_indicators: 983 self.hf_indicators[indicator].supported = True 984 985 # Once the HF receives the supported HF indicators list from the AG, 986 # the HF shall send the AT+BIND? command to determine which HF 987 # indicators are enabled. The AG shall respond with one or more 988 # +BIND responses. The AG shall terminate the list with OK. 989 # (See Section 4.36.1.3). 990 responses = await self.execute_command( 991 "AT+BIND?", response_type=AtResponseType.MULTIPLE 992 ) 993 994 logger.info("enabled HF indicators:") 995 for response in responses: 996 indicator = HfIndicator(int(response.parameters[0])) 997 enabled = int(response.parameters[1]) != 0 998 logger.info(f" - {indicator.name}: {enabled}") 999 if indicator in self.hf_indicators: 1000 self.hf_indicators[indicator].enabled = True 1001 1002 logger.info("SLC setup completed") 1003 self._slc_initialized = True 1004 1005 async def setup_audio_connection(self): 1006 """4.11.2 Audio Connection Setup by HF.""" 1007 1008 # When the HF triggers the establishment of the Codec Connection it 1009 # shall send the AT command AT+BCC to the AG. The AG shall respond with 1010 # OK if it will start the Codec Connection procedure, and with ERROR 1011 # if it cannot start the Codec Connection procedure. 1012 await self.execute_command("AT+BCC") 1013 1014 async def setup_codec_connection(self, codec_id: int): 1015 """4.11.3 Codec Connection Setup.""" 1016 # The AG shall send a +BCS=<Codec ID> unsolicited response to the HF. 1017 # The HF shall then respond to the incoming unsolicited response with 1018 # the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the 1019 # unsolicited response code as long as the ID is supported. 1020 # If the received ID is not available, the HF shall respond with 1021 # AT+BAC with its available codecs. 1022 if codec_id not in self.supported_audio_codecs: 1023 codecs = [str(c) for c in self.supported_audio_codecs] 1024 await self.execute_command(f"AT+BAC={','.join(codecs)}") 1025 return 1026 1027 await self.execute_command(f"AT+BCS={codec_id}") 1028 1029 # After sending the OK response, the AG shall open the 1030 # Synchronous Connection with the settings that are determined by the 1031 # ID. The HF shall be ready to accept the synchronous connection 1032 # establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>. 1033 self.active_codec = AudioCodec(codec_id) 1034 self.emit('codec_negotiation', self.active_codec) 1035 1036 logger.info("codec connection setup completed") 1037 1038 async def answer_incoming_call(self): 1039 """4.13.1 Answer Incoming Call from the HF - In-Band Ringing.""" 1040 # The user accepts the incoming voice call by using the proper means 1041 # provided by the HF. The HF shall then send the ATA command 1042 # (see Section 4.34) to the AG. The AG shall then begin the procedure for 1043 # accepting the incoming call. 1044 await self.execute_command("ATA") 1045 1046 async def reject_incoming_call(self): 1047 """4.14.1 Reject an Incoming Call from the HF.""" 1048 # The user rejects the incoming call by using the User Interface on the 1049 # Hands-Free unit. The HF shall then send the AT+CHUP command 1050 # (see Section 4.34) to the AG. This may happen at any time during the 1051 # procedures described in Sections 4.13.1 and 4.13.2. 1052 await self.execute_command("AT+CHUP") 1053 1054 async def terminate_call(self): 1055 """4.15.1 Terminate a Call Process from the HF.""" 1056 # The user may abort the ongoing call process using whatever means 1057 # provided by the Hands-Free unit. The HF shall send AT+CHUP command 1058 # (see Section 4.34) to the AG, and the AG shall then start the 1059 # procedure to terminate or interrupt the current call procedure. 1060 # The AG shall then send the OK indication followed by the +CIEV result 1061 # code, with the value indicating (call=0). 1062 await self.execute_command("AT+CHUP") 1063 1064 async def query_current_calls(self) -> List[CallInfo]: 1065 """4.32.1 Query List of Current Calls in AG. 1066 1067 Return: 1068 List of current calls in AG. 1069 """ 1070 responses = await self.execute_command( 1071 "AT+CLCC", response_type=AtResponseType.MULTIPLE 1072 ) 1073 assert isinstance(responses, list) 1074 1075 calls = [] 1076 for response in responses: 1077 call_info = CallInfo( 1078 index=int(response.parameters[0]), 1079 direction=CallInfoDirection(int(response.parameters[1])), 1080 status=CallInfoStatus(int(response.parameters[2])), 1081 mode=CallInfoMode(int(response.parameters[3])), 1082 multi_party=CallInfoMultiParty(int(response.parameters[4])), 1083 ) 1084 if len(response.parameters) >= 6: 1085 call_info.number = response.parameters[5].decode() 1086 if len(response.parameters) >= 7: 1087 call_info.type = int(response.parameters[6]) 1088 calls.append(call_info) 1089 return calls 1090 1091 async def update_ag_indicator(self, index: int, value: int): 1092 # CIEV is in 1-index, while ag_indicators is in 0-index. 1093 ag_indicator = self.ag_indicators[index - 1] 1094 ag_indicator.current_status = value 1095 self.emit('ag_indicator', ag_indicator) 1096 logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}") 1097 1098 async def handle_unsolicited(self): 1099 """Handle unsolicited result codes sent by the audio gateway.""" 1100 result = await self.unsolicited_queue.get() 1101 if not result: 1102 raise HfProtocol.HfLoopTermination() 1103 if result.code == "+BCS": 1104 await self.setup_codec_connection(int(result.parameters[0])) 1105 elif result.code == "+CIEV": 1106 await self.update_ag_indicator( 1107 int(result.parameters[0]), int(result.parameters[1]) 1108 ) 1109 elif result.code == "+VGS": 1110 self.emit('speaker_volume', int(result.parameters[0])) 1111 elif result.code == "+VGM": 1112 self.emit('microphone_volume', int(result.parameters[0])) 1113 elif result.code == "RING": 1114 self.emit('ring') 1115 elif result.code == "+CLIP": 1116 self.emit( 1117 'cli_notification', CallLineIdentification.parse_from(result.parameters) 1118 ) 1119 elif result.code == "+BVRA": 1120 # TODO: Support Enhanced Voice Recognition. 1121 self.emit( 1122 'voice_recognition', VoiceRecognitionState(int(result.parameters[0])) 1123 ) 1124 else: 1125 logging.info(f"unhandled unsolicited response {result.code}") 1126 1127 async def run(self): 1128 """ 1129 Main routine for the Hands-Free side of the HFP protocol. 1130 1131 Initiates the service level connection then loops handling unsolicited AG responses. 1132 """ 1133 1134 try: 1135 if not self._slc_initialized: 1136 await self.initiate_slc() 1137 while True: 1138 await self.handle_unsolicited() 1139 except HfProtocol.HfLoopTermination: 1140 logger.info('Loop terminated') 1141 except Exception: 1142 logger.error("HFP-HF protocol failed with the following error:") 1143 logger.error(traceback.format_exc()) 1144 1145 1146class AgProtocol(pyee.EventEmitter): 1147 """ 1148 Implementation for the Audio-Gateway side of the Hands-Free profile. 1149 1150 Reference specification Hands-Free Profile v1.8. 1151 1152 Emitted events: 1153 slc_complete: Emit when SLC procedure is completed. 1154 codec_negotiation: When codec is renegotiated, notify the new codec. 1155 Args: 1156 active_codec: AudioCodec 1157 hf_indicator: When HF update their indicators, notify the new state. 1158 Args: 1159 hf_indicator: HfIndicatorState 1160 codec_connection_request: Emit when HF sends AT+BCC to request codec connection. 1161 answer: Emit when HF sends ATA to answer phone call. 1162 hang_up: Emit when HF sends AT+CHUP to hang up phone call. 1163 dial: Emit when HF sends ATD to dial phone call. 1164 voice_recognition: Emit when HF requests voice recognition state. 1165 Args: 1166 vrec: VoiceRecognitionState 1167 call_hold: Emit when HF requests call hold operation. 1168 Args: 1169 operation: CallHoldOperation 1170 call_index: Optional[int] 1171 speaker_volume: Emitted when AG update speaker volume autonomously. 1172 Args: 1173 volume: Int 1174 microphone_volume: Emitted when AG update microphone volume autonomously. 1175 Args: 1176 volume: Int 1177 """ 1178 1179 supported_hf_features: int 1180 supported_hf_indicators: Set[HfIndicator] 1181 supported_audio_codecs: List[AudioCodec] 1182 1183 supported_ag_features: int 1184 supported_ag_call_hold_operations: List[CallHoldOperation] 1185 1186 ag_indicators: List[AgIndicatorState] 1187 hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState] 1188 1189 dlc: rfcomm.DLC 1190 1191 read_buffer: bytearray 1192 active_codec: AudioCodec 1193 calls: List[CallInfo] 1194 1195 indicator_report_enabled: bool 1196 inband_ringtone_enabled: bool 1197 cme_error_enabled: bool 1198 cli_notification_enabled: bool 1199 call_waiting_enabled: bool 1200 _remained_slc_setup_features: Set[HfFeature] 1201 1202 def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: 1203 super().__init__() 1204 1205 # Configure internal state. 1206 self.dlc = dlc 1207 self.read_buffer = bytearray() 1208 self.active_codec = AudioCodec.CVSD 1209 self.calls = [] 1210 1211 # Build local features. 1212 self.supported_ag_features = sum(configuration.supported_ag_features) 1213 self.supported_ag_call_hold_operations = list( 1214 configuration.supported_ag_call_hold_operations 1215 ) 1216 self.ag_indicators = list(configuration.supported_ag_indicators) 1217 self.supported_hf_indicators = set(configuration.supported_hf_indicators) 1218 self.inband_ringtone_enabled = True 1219 self._remained_slc_setup_features = set() 1220 1221 # Clear remote features. 1222 self.supported_hf_features = 0 1223 self.supported_audio_codecs = [] 1224 self.indicator_report_enabled = False 1225 self.cme_error_enabled = False 1226 self.cli_notification_enabled = False 1227 self.call_waiting_enabled = False 1228 1229 self.hf_indicators = collections.OrderedDict() 1230 1231 # Bind the AT reader to the RFCOMM channel. 1232 self.dlc.sink = self._read_at 1233 1234 def supports_hf_feature(self, feature: HfFeature) -> bool: 1235 return (self.supported_hf_features & feature) != 0 1236 1237 def supports_ag_feature(self, feature: AgFeature) -> bool: 1238 return (self.supported_ag_features & feature) != 0 1239 1240 def _read_at(self, data: bytes): 1241 """ 1242 Reads AT messages from the RFCOMM channel. 1243 """ 1244 # Append to the read buffer. 1245 self.read_buffer.extend(data) 1246 1247 # Locate the trailer. 1248 trailer = self.read_buffer.find(b'\r') 1249 if trailer == -1: 1250 return 1251 1252 # Isolate the AT response code and parameters. 1253 raw_command = self.read_buffer[:trailer] 1254 command = AtCommand.parse_from(raw_command) 1255 logger.debug(f"<<< {raw_command.decode()}") 1256 1257 # Consume the response bytes. 1258 self.read_buffer = self.read_buffer[trailer + 1 :] 1259 1260 if command.sub_code == AtCommand.SubCode.TEST: 1261 handler_name = f'_on_{command.code.lower()}_test' 1262 elif command.sub_code == AtCommand.SubCode.READ: 1263 handler_name = f'_on_{command.code.lower()}_read' 1264 else: 1265 handler_name = f'_on_{command.code.lower()}' 1266 1267 if handler := getattr(self, handler_name, None): 1268 handler(*command.parameters) 1269 else: 1270 logger.warning('Handler %s not found', handler_name) 1271 self.send_response('ERROR') 1272 1273 def send_response(self, response: str) -> None: 1274 """Sends an AT response.""" 1275 self.dlc.write(f'\r\n{response}\r\n') 1276 1277 def send_cme_error(self, error_code: CmeError) -> None: 1278 """Sends an CME ERROR response. 1279 1280 If CME Error is not enabled by HF, sends ERROR instead. 1281 """ 1282 if self.cme_error_enabled: 1283 self.send_response(f'+CME ERROR: {error_code.value}') 1284 else: 1285 self.send_error() 1286 1287 def send_ok(self) -> None: 1288 """Sends an OK response.""" 1289 self.send_response('OK') 1290 1291 def send_error(self) -> None: 1292 """Sends an ERROR response.""" 1293 self.send_response('ERROR') 1294 1295 def set_inband_ringtone_enabled(self, enabled: bool) -> None: 1296 """Enables or disables in-band ringtone.""" 1297 1298 self.inband_ringtone_enabled = enabled 1299 self.send_response(f'+BSIR: {1 if enabled else 0}') 1300 1301 def set_speaker_volume(self, level: int) -> None: 1302 """Reports speaker volume.""" 1303 1304 self.send_response(f'+VGS: {level}') 1305 1306 def set_microphone_volume(self, level: int) -> None: 1307 """Reports microphone volume.""" 1308 1309 self.send_response(f'+VGM: {level}') 1310 1311 def send_ring(self) -> None: 1312 """Sends RING command to trigger ringtone on HF.""" 1313 1314 self.send_response('RING') 1315 1316 def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None: 1317 """Updates AG indicator. 1318 1319 Args: 1320 indicator: Name of the indicator. 1321 value: new value of the indicator. 1322 """ 1323 1324 search_result = next( 1325 ( 1326 (index, state) 1327 for index, state in enumerate(self.ag_indicators) 1328 if state.indicator == indicator 1329 ), 1330 None, 1331 ) 1332 if not search_result: 1333 raise KeyError(f'{indicator} is not supported.') 1334 1335 index, indicator_state = search_result 1336 if not self.indicator_report_enabled: 1337 logger.warning('AG indicator report is disabled') 1338 if not indicator_state.enabled: 1339 logger.warning(f'AG indicator {indicator} is disabled') 1340 1341 indicator_state.current_status = value 1342 self.send_response(f'+CIEV: {index+1},{value}') 1343 1344 async def negotiate_codec(self, codec: AudioCodec) -> None: 1345 """Starts codec negotiation.""" 1346 1347 if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): 1348 logger.warning('Local does not support Codec Negotiation') 1349 if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION): 1350 logger.warning('Peer does not support Codec Negotiation') 1351 if codec not in self.supported_audio_codecs: 1352 logger.warning(f'{codec} is not supported by peer') 1353 1354 at_bcs_future = asyncio.get_running_loop().create_future() 1355 self.once('codec_negotiation', at_bcs_future.set_result) 1356 self.send_response(f'+BCS: {codec.value}') 1357 if (new_codec := await at_bcs_future) != codec: 1358 raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}') 1359 1360 def send_cli_notification(self, cli: CallLineIdentification) -> None: 1361 """Sends +CLIP CLI notification.""" 1362 1363 if not self.cli_notification_enabled: 1364 logger.warning('Try to send CLIP while CLI notification is not enabled') 1365 1366 self.send_response(f'+CLIP: {cli.to_clip_string()}') 1367 1368 def _check_remained_slc_commands(self) -> None: 1369 if not self._remained_slc_setup_features: 1370 self.emit('slc_complete') 1371 1372 def _on_brsf(self, hf_features: bytes) -> None: 1373 self.supported_hf_features = int(hf_features) 1374 self.send_response(f'+BRSF: {self.supported_ag_features}') 1375 self.send_ok() 1376 1377 if self.supports_hf_feature( 1378 HfFeature.HF_INDICATORS 1379 ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): 1380 self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS) 1381 1382 if self.supports_hf_feature( 1383 HfFeature.THREE_WAY_CALLING 1384 ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): 1385 self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING) 1386 1387 def _on_bac(self, *args) -> None: 1388 self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] 1389 self.send_ok() 1390 1391 def _on_bcs(self, codec: bytes) -> None: 1392 self.active_codec = AudioCodec(int(codec)) 1393 self.send_ok() 1394 self.emit('codec_negotiation', self.active_codec) 1395 1396 def _on_bvra(self, vrec: bytes) -> None: 1397 self.send_ok() 1398 self.emit('voice_recognition', VoiceRecognitionState(int(vrec))) 1399 1400 def _on_chld(self, operation_code: bytes) -> None: 1401 call_index: Optional[int] = None 1402 if len(operation_code) > 1: 1403 call_index = int(operation_code[1:]) 1404 operation_code = operation_code[:1] + b'x' 1405 try: 1406 operation = CallHoldOperation(operation_code.decode()) 1407 except: 1408 logger.error(f'Invalid operation: {operation_code.decode()}') 1409 self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) 1410 return 1411 1412 if operation not in self.supported_ag_call_hold_operations: 1413 logger.error(f'Unsupported operation: {operation_code.decode()}') 1414 self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) 1415 1416 if call_index is not None and not any( 1417 call.index == call_index for call in self.calls 1418 ): 1419 logger.error(f'No matching call {call_index}') 1420 self.send_cme_error(CmeError.INVALID_INDEX) 1421 1422 # Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :) 1423 1424 self.send_ok() 1425 self.emit('call_hold', operation, call_index) 1426 1427 def _on_chld_test(self) -> None: 1428 if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): 1429 self.send_error() 1430 return 1431 1432 self.send_response( 1433 '+CHLD: ({})'.format( 1434 ','.join( 1435 operation.value 1436 for operation in self.supported_ag_call_hold_operations 1437 ) 1438 ) 1439 ) 1440 self.send_ok() 1441 self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING) 1442 self._check_remained_slc_commands() 1443 1444 def _on_cind_test(self) -> None: 1445 if not self.ag_indicators: 1446 self.send_cme_error(CmeError.NOT_FOUND) 1447 return 1448 1449 indicator_list_str = ",".join( 1450 indicator.on_test_text for indicator in self.ag_indicators 1451 ) 1452 self.send_response(f'+CIND: {indicator_list_str}') 1453 self.send_ok() 1454 1455 def _on_cind_read(self) -> None: 1456 if not self.ag_indicators: 1457 self.send_cme_error(CmeError.NOT_FOUND) 1458 return 1459 1460 indicator_list_str = ",".join( 1461 str(indicator.current_status) for indicator in self.ag_indicators 1462 ) 1463 self.send_response(f'+CIND: {indicator_list_str}') 1464 self.send_ok() 1465 1466 self._check_remained_slc_commands() 1467 1468 def _on_cmer( 1469 self, 1470 mode: bytes, 1471 keypad: Optional[bytes] = None, 1472 display: Optional[bytes] = None, 1473 indicator: bytes = b'', 1474 ) -> None: 1475 if ( 1476 int(mode) != 3 1477 or (keypad and int(keypad)) 1478 or (display and int(display)) 1479 or int(indicator) not in (0, 1) 1480 ): 1481 logger.error( 1482 f'Unexpected values: mode={mode!r}, keypad={keypad!r}, ' 1483 f'display={display!r}, indicator={indicator!r}' 1484 ) 1485 self.send_cme_error(CmeError.INVALID_INDEX) 1486 1487 self.indicator_report_enabled = bool(int(indicator)) 1488 self.send_ok() 1489 1490 def _on_cmee(self, enabled: bytes) -> None: 1491 self.cme_error_enabled = bool(int(enabled)) 1492 self.send_ok() 1493 1494 def _on_ccwa(self, enabled: bytes) -> None: 1495 self.call_waiting_enabled = bool(int(enabled)) 1496 self.send_ok() 1497 1498 def _on_bind(self, *args) -> None: 1499 if not self.supports_ag_feature(AgFeature.HF_INDICATORS): 1500 self.send_error() 1501 return 1502 1503 peer_supported_indicators = set( 1504 HfIndicator(int(indicator)) for indicator in args 1505 ) 1506 self.hf_indicators = collections.OrderedDict( 1507 { 1508 indicator: HfIndicatorState(indicator=indicator) 1509 for indicator in self.supported_hf_indicators.intersection( 1510 peer_supported_indicators 1511 ) 1512 } 1513 ) 1514 self.send_ok() 1515 1516 def _on_bind_test(self) -> None: 1517 if not self.supports_ag_feature(AgFeature.HF_INDICATORS): 1518 self.send_error() 1519 return 1520 1521 hf_indicator_list_str = ",".join( 1522 str(indicator.value) for indicator in self.supported_hf_indicators 1523 ) 1524 self.send_response(f'+BIND: ({hf_indicator_list_str})') 1525 self.send_ok() 1526 1527 def _on_bind_read(self) -> None: 1528 if not self.supports_ag_feature(AgFeature.HF_INDICATORS): 1529 self.send_error() 1530 return 1531 1532 for indicator in self.hf_indicators: 1533 self.send_response(f'+BIND: {indicator.value},1') 1534 1535 self.send_ok() 1536 1537 self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS) 1538 self._check_remained_slc_commands() 1539 1540 def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None: 1541 if not self.supports_ag_feature(AgFeature.HF_INDICATORS): 1542 self.send_error() 1543 return 1544 1545 index = HfIndicator(int(index_bytes)) 1546 if index not in self.hf_indicators: 1547 self.send_error() 1548 return 1549 1550 self.hf_indicators[index].current_status = int(value_bytes) 1551 self.emit('hf_indicator', self.hf_indicators[index]) 1552 self.send_ok() 1553 1554 def _on_bia(self, *args) -> None: 1555 for enabled, state in zip(args, self.ag_indicators): 1556 state.enabled = bool(int(enabled)) 1557 self.send_ok() 1558 1559 def _on_bcc(self) -> None: 1560 self.emit('codec_connection_request') 1561 self.send_ok() 1562 1563 def _on_a(self) -> None: 1564 """ATA handler.""" 1565 self.emit('answer') 1566 self.send_ok() 1567 1568 def _on_d(self, number: bytes) -> None: 1569 """ATD handler.""" 1570 self.emit('dial', number.decode()) 1571 self.send_ok() 1572 1573 def _on_chup(self) -> None: 1574 self.emit('hang_up') 1575 self.send_ok() 1576 1577 def _on_clcc(self) -> None: 1578 for call in self.calls: 1579 number_text = f',\"{call.number}\"' if call.number is not None else '' 1580 type_text = f',{call.type}' if call.type is not None else '' 1581 response = ( 1582 f'+CLCC: {call.index}' 1583 f',{call.direction.value}' 1584 f',{call.status.value}' 1585 f',{call.mode.value}' 1586 f',{call.multi_party.value}' 1587 f'{number_text}' 1588 f'{type_text}' 1589 ) 1590 self.send_response(response) 1591 self.send_ok() 1592 1593 def _on_clip(self, enabled: bytes) -> None: 1594 if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY): 1595 logger.error('Remote doesn not support CLI but sends AT+CLIP') 1596 self.cli_notification_enabled = True if enabled == b'1' else False 1597 self.send_ok() 1598 1599 def _on_vgs(self, level: bytes) -> None: 1600 self.emit('speaker_volume', int(level)) 1601 self.send_ok() 1602 1603 def _on_vgm(self, level: bytes) -> None: 1604 self.emit('microphone_volume', int(level)) 1605 self.send_ok() 1606 1607 1608# ----------------------------------------------------------------------------- 1609# Normative SDP definitions 1610# ----------------------------------------------------------------------------- 1611 1612 1613class ProfileVersion(enum.IntEnum): 1614 """ 1615 Profile version (normative). 1616 1617 Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. 1618 """ 1619 1620 V1_5 = 0x0105 1621 V1_6 = 0x0106 1622 V1_7 = 0x0107 1623 V1_8 = 0x0108 1624 V1_9 = 0x0109 1625 1626 1627class HfSdpFeature(enum.IntFlag): 1628 """ 1629 HF supported features (normative). 1630 1631 Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. 1632 """ 1633 1634 EC_NR = 0x01 # Echo Cancel & Noise reduction 1635 THREE_WAY_CALLING = 0x02 1636 CLI_PRESENTATION_CAPABILITY = 0x04 1637 VOICE_RECOGNITION_ACTIVATION = 0x08 1638 REMOTE_VOLUME_CONTROL = 0x10 1639 WIDE_BAND = 0x20 # Wide band speech 1640 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 1641 VOICE_RECOGNITION_TEST = 0x80 1642 1643 1644class AgSdpFeature(enum.IntFlag): 1645 """ 1646 AG supported features (normative). 1647 1648 Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. 1649 """ 1650 1651 THREE_WAY_CALLING = 0x01 1652 EC_NR = 0x02 # Echo Cancel & Noise reduction 1653 VOICE_RECOGNITION_FUNCTION = 0x04 1654 IN_BAND_RING_TONE_CAPABILITY = 0x08 1655 VOICE_TAG = 0x10 # Attach a number to voice tag 1656 WIDE_BAND = 0x20 # Wide band speech 1657 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 1658 VOICE_RECOGNITION_TEST = 0x80 1659 1660 1661def make_hf_sdp_records( 1662 service_record_handle: int, 1663 rfcomm_channel: int, 1664 configuration: HfConfiguration, 1665 version: ProfileVersion = ProfileVersion.V1_8, 1666) -> List[sdp.ServiceAttribute]: 1667 """ 1668 Generates the SDP record for HFP Hands-Free support. 1669 1670 The record exposes the features supported in the input configuration, 1671 and the allocated RFCOMM channel. 1672 """ 1673 1674 hf_supported_features = 0 1675 1676 if HfFeature.EC_NR in configuration.supported_hf_features: 1677 hf_supported_features |= HfSdpFeature.EC_NR 1678 if HfFeature.THREE_WAY_CALLING in configuration.supported_hf_features: 1679 hf_supported_features |= HfSdpFeature.THREE_WAY_CALLING 1680 if HfFeature.CLI_PRESENTATION_CAPABILITY in configuration.supported_hf_features: 1681 hf_supported_features |= HfSdpFeature.CLI_PRESENTATION_CAPABILITY 1682 if HfFeature.VOICE_RECOGNITION_ACTIVATION in configuration.supported_hf_features: 1683 hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_ACTIVATION 1684 if HfFeature.REMOTE_VOLUME_CONTROL in configuration.supported_hf_features: 1685 hf_supported_features |= HfSdpFeature.REMOTE_VOLUME_CONTROL 1686 if ( 1687 HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS 1688 in configuration.supported_hf_features 1689 ): 1690 hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS 1691 if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features: 1692 hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST 1693 1694 if AudioCodec.MSBC in configuration.supported_audio_codecs: 1695 hf_supported_features |= HfSdpFeature.WIDE_BAND 1696 1697 return [ 1698 sdp.ServiceAttribute( 1699 sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 1700 sdp.DataElement.unsigned_integer_32(service_record_handle), 1701 ), 1702 sdp.ServiceAttribute( 1703 sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 1704 sdp.DataElement.sequence( 1705 [ 1706 sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), 1707 sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), 1708 ] 1709 ), 1710 ), 1711 sdp.ServiceAttribute( 1712 sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1713 sdp.DataElement.sequence( 1714 [ 1715 sdp.DataElement.sequence( 1716 [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] 1717 ), 1718 sdp.DataElement.sequence( 1719 [ 1720 sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), 1721 sdp.DataElement.unsigned_integer_8(rfcomm_channel), 1722 ] 1723 ), 1724 ] 1725 ), 1726 ), 1727 sdp.ServiceAttribute( 1728 sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1729 sdp.DataElement.sequence( 1730 [ 1731 sdp.DataElement.sequence( 1732 [ 1733 sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), 1734 sdp.DataElement.unsigned_integer_16(version), 1735 ] 1736 ) 1737 ] 1738 ), 1739 ), 1740 sdp.ServiceAttribute( 1741 sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 1742 sdp.DataElement.unsigned_integer_16(hf_supported_features), 1743 ), 1744 ] 1745 1746 1747def make_ag_sdp_records( 1748 service_record_handle: int, 1749 rfcomm_channel: int, 1750 configuration: AgConfiguration, 1751 version: ProfileVersion = ProfileVersion.V1_8, 1752) -> List[sdp.ServiceAttribute]: 1753 """ 1754 Generates the SDP record for HFP Audio-Gateway support. 1755 1756 The record exposes the features supported in the input configuration, 1757 and the allocated RFCOMM channel. 1758 """ 1759 1760 ag_supported_features = 0 1761 1762 if AgFeature.EC_NR in configuration.supported_ag_features: 1763 ag_supported_features |= AgSdpFeature.EC_NR 1764 if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features: 1765 ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING 1766 if ( 1767 AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS 1768 in configuration.supported_ag_features 1769 ): 1770 ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS 1771 if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: 1772 ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST 1773 if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: 1774 ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY 1775 if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: 1776 ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION 1777 if AudioCodec.MSBC in configuration.supported_audio_codecs: 1778 ag_supported_features |= AgSdpFeature.WIDE_BAND 1779 1780 return [ 1781 sdp.ServiceAttribute( 1782 sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 1783 sdp.DataElement.unsigned_integer_32(service_record_handle), 1784 ), 1785 sdp.ServiceAttribute( 1786 sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 1787 sdp.DataElement.sequence( 1788 [ 1789 sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), 1790 sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), 1791 ] 1792 ), 1793 ), 1794 sdp.ServiceAttribute( 1795 sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1796 sdp.DataElement.sequence( 1797 [ 1798 sdp.DataElement.sequence( 1799 [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] 1800 ), 1801 sdp.DataElement.sequence( 1802 [ 1803 sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), 1804 sdp.DataElement.unsigned_integer_8(rfcomm_channel), 1805 ] 1806 ), 1807 ] 1808 ), 1809 ), 1810 sdp.ServiceAttribute( 1811 sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1812 sdp.DataElement.sequence( 1813 [ 1814 sdp.DataElement.sequence( 1815 [ 1816 sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), 1817 sdp.DataElement.unsigned_integer_16(version), 1818 ] 1819 ) 1820 ] 1821 ), 1822 ), 1823 sdp.ServiceAttribute( 1824 sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 1825 sdp.DataElement.unsigned_integer_16(ag_supported_features), 1826 ), 1827 ] 1828 1829 1830async def find_hf_sdp_record( 1831 connection: device.Connection, 1832) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]: 1833 """Searches a Hands-Free SDP record from remote device. 1834 1835 Args: 1836 connection: ACL connection to make SDP search. 1837 1838 Returns: 1839 Tuple of (<RFCOMM channel>, <Profile Version>, <HF SDP features>) 1840 """ 1841 async with sdp.Client(connection) as sdp_client: 1842 search_result = await sdp_client.search_attributes( 1843 uuids=[BT_HANDSFREE_SERVICE], 1844 attribute_ids=[ 1845 sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1846 sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1847 sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 1848 sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 1849 ], 1850 ) 1851 for attribute_lists in search_result: 1852 channel: Optional[int] = None 1853 version: Optional[ProfileVersion] = None 1854 features: Optional[HfSdpFeature] = None 1855 for attribute in attribute_lists: 1856 # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. 1857 if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 1858 protocol_descriptor_list = attribute.value.value 1859 channel = protocol_descriptor_list[1].value[1].value 1860 elif ( 1861 attribute.id 1862 == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID 1863 ): 1864 profile_descriptor_list = attribute.value.value 1865 version = ProfileVersion(profile_descriptor_list[0].value[1].value) 1866 elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: 1867 features = HfSdpFeature(attribute.value.value) 1868 elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: 1869 class_id_list = attribute.value.value 1870 uuid = class_id_list[0].value 1871 # AG record may also contain HF UUID in its profile descriptor list. 1872 # If found, skip this record. 1873 if uuid == BT_HANDSFREE_AUDIO_GATEWAY_SERVICE: 1874 channel, version, features = (None, None, None) 1875 break 1876 1877 if channel is not None and version is not None and features is not None: 1878 return (channel, version, features) 1879 return None 1880 1881 1882async def find_ag_sdp_record( 1883 connection: device.Connection, 1884) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]: 1885 """Searches an Audio-Gateway SDP record from remote device. 1886 1887 Args: 1888 connection: ACL connection to make SDP search. 1889 1890 Returns: 1891 Tuple of (<RFCOMM channel>, <Profile Version>, <AG SDP features>) 1892 """ 1893 async with sdp.Client(connection) as sdp_client: 1894 search_result = await sdp_client.search_attributes( 1895 uuids=[BT_HANDSFREE_AUDIO_GATEWAY_SERVICE], 1896 attribute_ids=[ 1897 sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1898 sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 1899 sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 1900 ], 1901 ) 1902 for attribute_lists in search_result: 1903 channel: Optional[int] = None 1904 version: Optional[ProfileVersion] = None 1905 features: Optional[AgSdpFeature] = None 1906 for attribute in attribute_lists: 1907 # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. 1908 if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: 1909 protocol_descriptor_list = attribute.value.value 1910 channel = protocol_descriptor_list[1].value[1].value 1911 elif ( 1912 attribute.id 1913 == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID 1914 ): 1915 profile_descriptor_list = attribute.value.value 1916 version = ProfileVersion(profile_descriptor_list[0].value[1].value) 1917 elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: 1918 features = AgSdpFeature(attribute.value.value) 1919 if not channel or not version or features is None: 1920 logger.warning(f"Bad result {attribute_lists}.") 1921 return None 1922 return (channel, version, features) 1923 return None 1924 1925 1926# ----------------------------------------------------------------------------- 1927# ESCO Codec Default Parameters 1928# ----------------------------------------------------------------------------- 1929 1930 1931# Hands-Free Profile v1.8, 5.7 Codec Interoperability Requirements 1932class DefaultCodecParameters(enum.IntEnum): 1933 SCO_CVSD_D0 = enum.auto() 1934 SCO_CVSD_D1 = enum.auto() 1935 ESCO_CVSD_S1 = enum.auto() 1936 ESCO_CVSD_S2 = enum.auto() 1937 ESCO_CVSD_S3 = enum.auto() 1938 ESCO_CVSD_S4 = enum.auto() 1939 ESCO_MSBC_T1 = enum.auto() 1940 ESCO_MSBC_T2 = enum.auto() 1941 1942 1943@dataclasses.dataclass 1944class EscoParameters: 1945 # Codec specific 1946 transmit_coding_format: CodingFormat 1947 receive_coding_format: CodingFormat 1948 packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType 1949 retransmission_effort: ( 1950 HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort 1951 ) 1952 max_latency: int 1953 1954 # Common 1955 input_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) 1956 output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) 1957 input_coded_data_size: int = 16 1958 output_coded_data_size: int = 16 1959 input_pcm_data_format: ( 1960 HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat 1961 ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT 1962 output_pcm_data_format: ( 1963 HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat 1964 ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT 1965 input_pcm_sample_payload_msb_position: int = 0 1966 output_pcm_sample_payload_msb_position: int = 0 1967 input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = ( 1968 HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI 1969 ) 1970 output_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = ( 1971 HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath.HCI 1972 ) 1973 input_transport_unit_size: int = 0 1974 output_transport_unit_size: int = 0 1975 input_bandwidth: int = 16000 1976 output_bandwidth: int = 16000 1977 transmit_bandwidth: int = 8000 1978 receive_bandwidth: int = 8000 1979 transmit_codec_frame_size: int = 60 1980 receive_codec_frame_size: int = 60 1981 1982 def asdict(self) -> Dict[str, Any]: 1983 # dataclasses.asdict() will recursively deep-copy the entire object, 1984 # which is expensive and breaks CodingFormat object, so let it simply copy here. 1985 return self.__dict__ 1986 1987 1988_ESCO_PARAMETERS_CVSD_D0 = EscoParameters( 1989 transmit_coding_format=CodingFormat(CodecID.CVSD), 1990 receive_coding_format=CodingFormat(CodecID.CVSD), 1991 max_latency=0xFFFF, 1992 packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV1, 1993 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, 1994) 1995 1996_ESCO_PARAMETERS_CVSD_D1 = EscoParameters( 1997 transmit_coding_format=CodingFormat(CodecID.CVSD), 1998 receive_coding_format=CodingFormat(CodecID.CVSD), 1999 max_latency=0xFFFF, 2000 packet_type=HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.HV3, 2001 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.NO_RETRANSMISSION, 2002) 2003 2004_ESCO_PARAMETERS_CVSD_S1 = EscoParameters( 2005 transmit_coding_format=CodingFormat(CodecID.CVSD), 2006 receive_coding_format=CodingFormat(CodecID.CVSD), 2007 max_latency=0x0007, 2008 packet_type=( 2009 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2010 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3 2011 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2012 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2013 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2014 ), 2015 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER, 2016) 2017 2018_ESCO_PARAMETERS_CVSD_S2 = EscoParameters( 2019 transmit_coding_format=CodingFormat(CodecID.CVSD), 2020 receive_coding_format=CodingFormat(CodecID.CVSD), 2021 max_latency=0x0007, 2022 packet_type=( 2023 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2024 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2025 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2026 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2027 ), 2028 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER, 2029) 2030 2031_ESCO_PARAMETERS_CVSD_S3 = EscoParameters( 2032 transmit_coding_format=CodingFormat(CodecID.CVSD), 2033 receive_coding_format=CodingFormat(CodecID.CVSD), 2034 max_latency=0x000A, 2035 packet_type=( 2036 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2037 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2038 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2039 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2040 ), 2041 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_POWER, 2042) 2043 2044_ESCO_PARAMETERS_CVSD_S4 = EscoParameters( 2045 transmit_coding_format=CodingFormat(CodecID.CVSD), 2046 receive_coding_format=CodingFormat(CodecID.CVSD), 2047 max_latency=0x000C, 2048 packet_type=( 2049 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2050 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2051 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2052 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2053 ), 2054 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, 2055) 2056 2057_ESCO_PARAMETERS_MSBC_T1 = EscoParameters( 2058 transmit_coding_format=CodingFormat(CodecID.MSBC), 2059 receive_coding_format=CodingFormat(CodecID.MSBC), 2060 max_latency=0x0008, 2061 packet_type=( 2062 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2063 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2064 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2065 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2066 ), 2067 input_bandwidth=32000, 2068 output_bandwidth=32000, 2069 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, 2070) 2071 2072_ESCO_PARAMETERS_MSBC_T2 = EscoParameters( 2073 transmit_coding_format=CodingFormat(CodecID.MSBC), 2074 receive_coding_format=CodingFormat(CodecID.MSBC), 2075 max_latency=0x000D, 2076 packet_type=( 2077 HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.EV3 2078 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV3 2079 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV3 2080 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 2081 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 2082 ), 2083 input_bandwidth=32000, 2084 output_bandwidth=32000, 2085 retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, 2086) 2087 2088ESCO_PARAMETERS = { 2089 DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0, 2090 DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1, 2091 DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1, 2092 DefaultCodecParameters.ESCO_CVSD_S2: _ESCO_PARAMETERS_CVSD_S2, 2093 DefaultCodecParameters.ESCO_CVSD_S3: _ESCO_PARAMETERS_CVSD_S3, 2094 DefaultCodecParameters.ESCO_CVSD_S4: _ESCO_PARAMETERS_CVSD_S4, 2095 DefaultCodecParameters.ESCO_MSBC_T1: _ESCO_PARAMETERS_MSBC_T1, 2096 DefaultCodecParameters.ESCO_MSBC_T2: _ESCO_PARAMETERS_MSBC_T2, 2097} 2098