1# Copyright 2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for 13 14"""LE Audio - Audio Stream Control Service""" 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from __future__ import annotations 20import enum 21import logging 22import struct 23from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union 24 25from bumble import colors 26from bumble.profiles.bap import CodecSpecificConfiguration 27from bumble.profiles import le_audio 28from bumble import device 29from bumble import gatt 30from bumble import gatt_client 31from bumble import hci 32 33# ----------------------------------------------------------------------------- 34# Logging 35# ----------------------------------------------------------------------------- 36logger = logging.getLogger(__name__) 37 38 39# ----------------------------------------------------------------------------- 40# ASE Operations 41# ----------------------------------------------------------------------------- 42 43 44class ASE_Operation: 45 ''' 46 See Audio Stream Control Service - 5 ASE Control operations. 47 ''' 48 49 classes: Dict[int, Type[ASE_Operation]] = {} 50 op_code: int 51 name: str 52 fields: Optional[Sequence[Any]] = None 53 ase_id: List[int] 54 55 class Opcode(enum.IntEnum): 56 # fmt: off 57 CONFIG_CODEC = 0x01 58 CONFIG_QOS = 0x02 59 ENABLE = 0x03 60 RECEIVER_START_READY = 0x04 61 DISABLE = 0x05 62 RECEIVER_STOP_READY = 0x06 63 UPDATE_METADATA = 0x07 64 RELEASE = 0x08 65 66 @staticmethod 67 def from_bytes(pdu: bytes) -> ASE_Operation: 68 op_code = pdu[0] 69 70 cls = ASE_Operation.classes.get(op_code) 71 if cls is None: 72 instance = ASE_Operation(pdu) 73 instance.name = ASE_Operation.Opcode(op_code).name 74 instance.op_code = op_code 75 return instance 76 self = cls.__new__(cls) 77 ASE_Operation.__init__(self, pdu) 78 if self.fields is not None: 79 self.init_from_bytes(pdu, 1) 80 return self 81 82 @staticmethod 83 def subclass(fields): 84 def inner(cls: Type[ASE_Operation]): 85 try: 86 operation = ASE_Operation.Opcode[cls.__name__[4:].upper()] 87 cls.name = operation.name 88 cls.op_code = operation 89 except: 90 raise KeyError(f'PDU name {cls.name} not found in Ase_Operation.Opcode') 91 cls.fields = fields 92 93 # Register a factory for this class 94 ASE_Operation.classes[cls.op_code] = cls 95 96 return cls 97 98 return inner 99 100 def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None: 101 if self.fields is not None and kwargs: 102 hci.HCI_Object.init_from_fields(self, self.fields, kwargs) 103 if pdu is None: 104 pdu = bytes([self.op_code]) + hci.HCI_Object.dict_to_bytes( 105 kwargs, self.fields 106 ) 107 self.pdu = pdu 108 109 def init_from_bytes(self, pdu: bytes, offset: int): 110 return hci.HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 111 112 def __bytes__(self) -> bytes: 113 return self.pdu 114 115 def __str__(self) -> str: 116 result = f'{colors.color(self.name, "yellow")} ' 117 if fields := getattr(self, 'fields', None): 118 result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ') 119 else: 120 if len(self.pdu) > 1: 121 result += f': {self.pdu.hex()}' 122 return result 123 124 125@ASE_Operation.subclass( 126 [ 127 [ 128 ('ase_id', 1), 129 ('target_latency', 1), 130 ('target_phy', 1), 131 ('codec_id', hci.CodingFormat.parse_from_bytes), 132 ('codec_specific_configuration', 'v'), 133 ], 134 ] 135) 136class ASE_Config_Codec(ASE_Operation): 137 ''' 138 See Audio Stream Control Service 5.1 - Config Codec Operation 139 ''' 140 141 target_latency: List[int] 142 target_phy: List[int] 143 codec_id: List[hci.CodingFormat] 144 codec_specific_configuration: List[bytes] 145 146 147@ASE_Operation.subclass( 148 [ 149 [ 150 ('ase_id', 1), 151 ('cig_id', 1), 152 ('cis_id', 1), 153 ('sdu_interval', 3), 154 ('framing', 1), 155 ('phy', 1), 156 ('max_sdu', 2), 157 ('retransmission_number', 1), 158 ('max_transport_latency', 2), 159 ('presentation_delay', 3), 160 ], 161 ] 162) 163class ASE_Config_QOS(ASE_Operation): 164 ''' 165 See Audio Stream Control Service 5.2 - Config Qos Operation 166 ''' 167 168 cig_id: List[int] 169 cis_id: List[int] 170 sdu_interval: List[int] 171 framing: List[int] 172 phy: List[int] 173 max_sdu: List[int] 174 retransmission_number: List[int] 175 max_transport_latency: List[int] 176 presentation_delay: List[int] 177 178 179@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]]) 180class ASE_Enable(ASE_Operation): 181 ''' 182 See Audio Stream Control Service 5.3 - Enable Operation 183 ''' 184 185 metadata: bytes 186 187 188@ASE_Operation.subclass([[('ase_id', 1)]]) 189class ASE_Receiver_Start_Ready(ASE_Operation): 190 ''' 191 See Audio Stream Control Service 5.4 - Receiver Start Ready Operation 192 ''' 193 194 195@ASE_Operation.subclass([[('ase_id', 1)]]) 196class ASE_Disable(ASE_Operation): 197 ''' 198 See Audio Stream Control Service 5.5 - Disable Operation 199 ''' 200 201 202@ASE_Operation.subclass([[('ase_id', 1)]]) 203class ASE_Receiver_Stop_Ready(ASE_Operation): 204 ''' 205 See Audio Stream Control Service 5.6 - Receiver Stop Ready Operation 206 ''' 207 208 209@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]]) 210class ASE_Update_Metadata(ASE_Operation): 211 ''' 212 See Audio Stream Control Service 5.7 - Update Metadata Operation 213 ''' 214 215 metadata: List[bytes] 216 217 218@ASE_Operation.subclass([[('ase_id', 1)]]) 219class ASE_Release(ASE_Operation): 220 ''' 221 See Audio Stream Control Service 5.8 - Release Operation 222 ''' 223 224 225class AseResponseCode(enum.IntEnum): 226 # fmt: off 227 SUCCESS = 0x00 228 UNSUPPORTED_OPCODE = 0x01 229 INVALID_LENGTH = 0x02 230 INVALID_ASE_ID = 0x03 231 INVALID_ASE_STATE_MACHINE_TRANSITION = 0x04 232 INVALID_ASE_DIRECTION = 0x05 233 UNSUPPORTED_AUDIO_CAPABILITIES = 0x06 234 UNSUPPORTED_CONFIGURATION_PARAMETER_VALUE = 0x07 235 REJECTED_CONFIGURATION_PARAMETER_VALUE = 0x08 236 INVALID_CONFIGURATION_PARAMETER_VALUE = 0x09 237 UNSUPPORTED_METADATA = 0x0A 238 REJECTED_METADATA = 0x0B 239 INVALID_METADATA = 0x0C 240 INSUFFICIENT_RESOURCES = 0x0D 241 UNSPECIFIED_ERROR = 0x0E 242 243 244class AseReasonCode(enum.IntEnum): 245 # fmt: off 246 NONE = 0x00 247 CODEC_ID = 0x01 248 CODEC_SPECIFIC_CONFIGURATION = 0x02 249 SDU_INTERVAL = 0x03 250 FRAMING = 0x04 251 PHY = 0x05 252 MAXIMUM_SDU_SIZE = 0x06 253 RETRANSMISSION_NUMBER = 0x07 254 MAX_TRANSPORT_LATENCY = 0x08 255 PRESENTATION_DELAY = 0x09 256 INVALID_ASE_CIS_MAPPING = 0x0A 257 258 259# ----------------------------------------------------------------------------- 260class AudioRole(enum.IntEnum): 261 SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST 262 SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER 263 264 265# ----------------------------------------------------------------------------- 266class AseStateMachine(gatt.Characteristic): 267 class State(enum.IntEnum): 268 # fmt: off 269 IDLE = 0x00 270 CODEC_CONFIGURED = 0x01 271 QOS_CONFIGURED = 0x02 272 ENABLING = 0x03 273 STREAMING = 0x04 274 DISABLING = 0x05 275 RELEASING = 0x06 276 277 cis_link: Optional[device.CisLink] = None 278 279 # Additional parameters in CODEC_CONFIGURED State 280 preferred_framing = 0 # Unframed PDU supported 281 preferred_phy = 0 282 preferred_retransmission_number = 13 283 preferred_max_transport_latency = 100 284 supported_presentation_delay_min = 0 285 supported_presentation_delay_max = 0 286 preferred_presentation_delay_min = 0 287 preferred_presentation_delay_max = 0 288 codec_id = hci.CodingFormat(hci.CodecID.LC3) 289 codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b'' 290 291 # Additional parameters in QOS_CONFIGURED State 292 cig_id = 0 293 cis_id = 0 294 sdu_interval = 0 295 framing = 0 296 phy = 0 297 max_sdu = 0 298 retransmission_number = 0 299 max_transport_latency = 0 300 presentation_delay = 0 301 302 # Additional parameters in ENABLING, STREAMING, DISABLING State 303 metadata = le_audio.Metadata() 304 305 def __init__( 306 self, 307 role: AudioRole, 308 ase_id: int, 309 service: AudioStreamControlService, 310 ) -> None: 311 self.service = service 312 self.ase_id = ase_id 313 self._state = AseStateMachine.State.IDLE 314 self.role = role 315 316 uuid = ( 317 gatt.GATT_SINK_ASE_CHARACTERISTIC 318 if role == AudioRole.SINK 319 else gatt.GATT_SOURCE_ASE_CHARACTERISTIC 320 ) 321 super().__init__( 322 uuid=uuid, 323 properties=gatt.Characteristic.Properties.READ 324 | gatt.Characteristic.Properties.NOTIFY, 325 permissions=gatt.Characteristic.Permissions.READABLE, 326 value=gatt.CharacteristicValue(read=self.on_read), 327 ) 328 329 self.service.device.on('cis_request', self.on_cis_request) 330 self.service.device.on('cis_establishment', self.on_cis_establishment) 331 332 def on_cis_request( 333 self, 334 acl_connection: device.Connection, 335 cis_handle: int, 336 cig_id: int, 337 cis_id: int, 338 ) -> None: 339 if ( 340 cig_id == self.cig_id 341 and cis_id == self.cis_id 342 and self.state == self.State.ENABLING 343 ): 344 acl_connection.abort_on( 345 'flush', self.service.device.accept_cis_request(cis_handle) 346 ) 347 348 def on_cis_establishment(self, cis_link: device.CisLink) -> None: 349 if ( 350 cis_link.cig_id == self.cig_id 351 and cis_link.cis_id == self.cis_id 352 and self.state == self.State.ENABLING 353 ): 354 cis_link.on('disconnection', self.on_cis_disconnection) 355 356 async def post_cis_established(): 357 await self.service.device.send_command( 358 hci.HCI_LE_Setup_ISO_Data_Path_Command( 359 connection_handle=cis_link.handle, 360 data_path_direction=self.role, 361 data_path_id=0x00, # Fixed HCI 362 codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT), 363 controller_delay=0, 364 codec_configuration=b'', 365 ) 366 ) 367 if self.role == AudioRole.SINK: 368 self.state = self.State.STREAMING 369 await self.service.device.notify_subscribers(self, self.value) 370 371 cis_link.acl_connection.abort_on('flush', post_cis_established()) 372 self.cis_link = cis_link 373 374 def on_cis_disconnection(self, _reason) -> None: 375 self.cis_link = None 376 377 def on_config_codec( 378 self, 379 target_latency: int, 380 target_phy: int, 381 codec_id: hci.CodingFormat, 382 codec_specific_configuration: bytes, 383 ) -> Tuple[AseResponseCode, AseReasonCode]: 384 if self.state not in ( 385 self.State.IDLE, 386 self.State.CODEC_CONFIGURED, 387 self.State.QOS_CONFIGURED, 388 ): 389 return ( 390 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 391 AseReasonCode.NONE, 392 ) 393 394 self.max_transport_latency = target_latency 395 self.phy = target_phy 396 self.codec_id = codec_id 397 if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC: 398 self.codec_specific_configuration = codec_specific_configuration 399 else: 400 self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes( 401 codec_specific_configuration 402 ) 403 404 self.state = self.State.CODEC_CONFIGURED 405 406 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 407 408 def on_config_qos( 409 self, 410 cig_id: int, 411 cis_id: int, 412 sdu_interval: int, 413 framing: int, 414 phy: int, 415 max_sdu: int, 416 retransmission_number: int, 417 max_transport_latency: int, 418 presentation_delay: int, 419 ) -> Tuple[AseResponseCode, AseReasonCode]: 420 if self.state not in ( 421 AseStateMachine.State.CODEC_CONFIGURED, 422 AseStateMachine.State.QOS_CONFIGURED, 423 ): 424 return ( 425 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 426 AseReasonCode.NONE, 427 ) 428 429 self.cig_id = cig_id 430 self.cis_id = cis_id 431 self.sdu_interval = sdu_interval 432 self.framing = framing 433 self.phy = phy 434 self.max_sdu = max_sdu 435 self.retransmission_number = retransmission_number 436 self.max_transport_latency = max_transport_latency 437 self.presentation_delay = presentation_delay 438 439 self.state = self.State.QOS_CONFIGURED 440 441 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 442 443 def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]: 444 if self.state != AseStateMachine.State.QOS_CONFIGURED: 445 return ( 446 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 447 AseReasonCode.NONE, 448 ) 449 450 self.metadata = le_audio.Metadata.from_bytes(metadata) 451 self.state = self.State.ENABLING 452 453 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 454 455 def on_receiver_start_ready(self) -> Tuple[AseResponseCode, AseReasonCode]: 456 if self.state != AseStateMachine.State.ENABLING: 457 return ( 458 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 459 AseReasonCode.NONE, 460 ) 461 self.state = self.State.STREAMING 462 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 463 464 def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]: 465 if self.state not in ( 466 AseStateMachine.State.ENABLING, 467 AseStateMachine.State.STREAMING, 468 ): 469 return ( 470 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 471 AseReasonCode.NONE, 472 ) 473 if self.role == AudioRole.SINK: 474 self.state = self.State.QOS_CONFIGURED 475 else: 476 self.state = self.State.DISABLING 477 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 478 479 def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]: 480 if ( 481 self.role != AudioRole.SOURCE 482 or self.state != AseStateMachine.State.DISABLING 483 ): 484 return ( 485 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 486 AseReasonCode.NONE, 487 ) 488 self.state = self.State.QOS_CONFIGURED 489 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 490 491 def on_update_metadata( 492 self, metadata: bytes 493 ) -> Tuple[AseResponseCode, AseReasonCode]: 494 if self.state not in ( 495 AseStateMachine.State.ENABLING, 496 AseStateMachine.State.STREAMING, 497 ): 498 return ( 499 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 500 AseReasonCode.NONE, 501 ) 502 self.metadata = le_audio.Metadata.from_bytes(metadata) 503 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 504 505 def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]: 506 if self.state == AseStateMachine.State.IDLE: 507 return ( 508 AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, 509 AseReasonCode.NONE, 510 ) 511 self.state = self.State.RELEASING 512 513 async def remove_cis_async(): 514 await self.service.device.send_command( 515 hci.HCI_LE_Remove_ISO_Data_Path_Command( 516 connection_handle=self.cis_link.handle, 517 data_path_direction=self.role, 518 ) 519 ) 520 self.state = self.State.IDLE 521 await self.service.device.notify_subscribers(self, self.value) 522 523 self.service.device.abort_on('flush', remove_cis_async()) 524 return (AseResponseCode.SUCCESS, AseReasonCode.NONE) 525 526 @property 527 def state(self) -> State: 528 return self._state 529 530 @state.setter 531 def state(self, new_state: State) -> None: 532 logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}') 533 self._state = new_state 534 self.emit('state_change') 535 536 @property 537 def value(self): 538 '''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.''' 539 540 if self.state == self.State.CODEC_CONFIGURED: 541 codec_specific_configuration_bytes = bytes( 542 self.codec_specific_configuration 543 ) 544 additional_parameters = ( 545 struct.pack( 546 '<BBBH', 547 self.preferred_framing, 548 self.preferred_phy, 549 self.preferred_retransmission_number, 550 self.preferred_max_transport_latency, 551 ) 552 + self.supported_presentation_delay_min.to_bytes(3, 'little') 553 + self.supported_presentation_delay_max.to_bytes(3, 'little') 554 + self.preferred_presentation_delay_min.to_bytes(3, 'little') 555 + self.preferred_presentation_delay_max.to_bytes(3, 'little') 556 + bytes(self.codec_id) 557 + bytes([len(codec_specific_configuration_bytes)]) 558 + codec_specific_configuration_bytes 559 ) 560 elif self.state == self.State.QOS_CONFIGURED: 561 additional_parameters = ( 562 bytes([self.cig_id, self.cis_id]) 563 + self.sdu_interval.to_bytes(3, 'little') 564 + struct.pack( 565 '<BBHBH', 566 self.framing, 567 self.phy, 568 self.max_sdu, 569 self.retransmission_number, 570 self.max_transport_latency, 571 ) 572 + self.presentation_delay.to_bytes(3, 'little') 573 ) 574 elif self.state in ( 575 self.State.ENABLING, 576 self.State.STREAMING, 577 self.State.DISABLING, 578 ): 579 metadata_bytes = bytes(self.metadata) 580 additional_parameters = ( 581 bytes([self.cig_id, self.cis_id, len(metadata_bytes)]) + metadata_bytes 582 ) 583 else: 584 additional_parameters = b'' 585 586 return bytes([self.ase_id, self.state]) + additional_parameters 587 588 @value.setter 589 def value(self, _new_value): 590 # Readonly. Do nothing in the setter. 591 pass 592 593 def on_read(self, _: Optional[device.Connection]) -> bytes: 594 return self.value 595 596 def __str__(self) -> str: 597 return ( 598 f'AseStateMachine(id={self.ase_id}, role={self.role.name} ' 599 f'state={self._state.name})' 600 ) 601 602 603# ----------------------------------------------------------------------------- 604class AudioStreamControlService(gatt.TemplateService): 605 UUID = gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE 606 607 ase_state_machines: Dict[int, AseStateMachine] 608 ase_control_point: gatt.Characteristic 609 _active_client: Optional[device.Connection] = None 610 611 def __init__( 612 self, 613 device: device.Device, 614 source_ase_id: Sequence[int] = (), 615 sink_ase_id: Sequence[int] = (), 616 ) -> None: 617 self.device = device 618 self.ase_state_machines = { 619 **{ 620 id: AseStateMachine(role=AudioRole.SINK, ase_id=id, service=self) 621 for id in sink_ase_id 622 }, 623 **{ 624 id: AseStateMachine(role=AudioRole.SOURCE, ase_id=id, service=self) 625 for id in source_ase_id 626 }, 627 } # ASE state machines, by ASE ID 628 629 self.ase_control_point = gatt.Characteristic( 630 uuid=gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC, 631 properties=gatt.Characteristic.Properties.WRITE 632 | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE 633 | gatt.Characteristic.Properties.NOTIFY, 634 permissions=gatt.Characteristic.Permissions.WRITEABLE, 635 value=gatt.CharacteristicValue(write=self.on_write_ase_control_point), 636 ) 637 638 super().__init__([self.ase_control_point, *self.ase_state_machines.values()]) 639 640 def on_operation(self, opcode: ASE_Operation.Opcode, ase_id: int, args): 641 if ase := self.ase_state_machines.get(ase_id): 642 handler = getattr(ase, 'on_' + opcode.name.lower()) 643 return (ase_id, *handler(*args)) 644 else: 645 return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE) 646 647 def _on_client_disconnected(self, _reason: int) -> None: 648 for ase in self.ase_state_machines.values(): 649 ase.state = AseStateMachine.State.IDLE 650 self._active_client = None 651 652 def on_write_ase_control_point(self, connection, data): 653 if not self._active_client and connection: 654 self._active_client = connection 655 connection.once('disconnection', self._on_client_disconnected) 656 657 operation = ASE_Operation.from_bytes(data) 658 responses = [] 659 logger.debug(f'*** ASCS Write {operation} ***') 660 661 if operation.op_code == ASE_Operation.Opcode.CONFIG_CODEC: 662 for ase_id, *args in zip( 663 operation.ase_id, 664 operation.target_latency, 665 operation.target_phy, 666 operation.codec_id, 667 operation.codec_specific_configuration, 668 ): 669 responses.append(self.on_operation(operation.op_code, ase_id, args)) 670 elif operation.op_code == ASE_Operation.Opcode.CONFIG_QOS: 671 for ase_id, *args in zip( 672 operation.ase_id, 673 operation.cig_id, 674 operation.cis_id, 675 operation.sdu_interval, 676 operation.framing, 677 operation.phy, 678 operation.max_sdu, 679 operation.retransmission_number, 680 operation.max_transport_latency, 681 operation.presentation_delay, 682 ): 683 responses.append(self.on_operation(operation.op_code, ase_id, args)) 684 elif operation.op_code in ( 685 ASE_Operation.Opcode.ENABLE, 686 ASE_Operation.Opcode.UPDATE_METADATA, 687 ): 688 for ase_id, *args in zip( 689 operation.ase_id, 690 operation.metadata, 691 ): 692 responses.append(self.on_operation(operation.op_code, ase_id, args)) 693 elif operation.op_code in ( 694 ASE_Operation.Opcode.RECEIVER_START_READY, 695 ASE_Operation.Opcode.DISABLE, 696 ASE_Operation.Opcode.RECEIVER_STOP_READY, 697 ASE_Operation.Opcode.RELEASE, 698 ): 699 for ase_id in operation.ase_id: 700 responses.append(self.on_operation(operation.op_code, ase_id, [])) 701 702 control_point_notification = bytes( 703 [operation.op_code, len(responses)] 704 ) + b''.join(map(bytes, responses)) 705 self.device.abort_on( 706 'flush', 707 self.device.notify_subscribers( 708 self.ase_control_point, control_point_notification 709 ), 710 ) 711 712 for ase_id, *_ in responses: 713 if ase := self.ase_state_machines.get(ase_id): 714 self.device.abort_on( 715 'flush', 716 self.device.notify_subscribers(ase, ase.value), 717 ) 718 719 720# ----------------------------------------------------------------------------- 721class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy): 722 SERVICE_CLASS = AudioStreamControlService 723 724 sink_ase: List[gatt_client.CharacteristicProxy] 725 source_ase: List[gatt_client.CharacteristicProxy] 726 ase_control_point: gatt_client.CharacteristicProxy 727 728 def __init__(self, service_proxy: gatt_client.ServiceProxy): 729 self.service_proxy = service_proxy 730 731 self.sink_ase = service_proxy.get_characteristics_by_uuid( 732 gatt.GATT_SINK_ASE_CHARACTERISTIC 733 ) 734 self.source_ase = service_proxy.get_characteristics_by_uuid( 735 gatt.GATT_SOURCE_ASE_CHARACTERISTIC 736 ) 737 self.ase_control_point = service_proxy.get_characteristics_by_uuid( 738 gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC 739 )[0] 740