1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13
14"""LE Audio - Audio Stream Control Service"""
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from __future__ import annotations
20import enum
21import logging
22import struct
23from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
24
25from bumble import colors
26from bumble.profiles.bap import CodecSpecificConfiguration
27from bumble.profiles import le_audio
28from bumble import device
29from bumble import gatt
30from bumble import gatt_client
31from bumble import hci
32
33# -----------------------------------------------------------------------------
34# Logging
35# -----------------------------------------------------------------------------
36logger = logging.getLogger(__name__)
37
38
39# -----------------------------------------------------------------------------
40# ASE Operations
41# -----------------------------------------------------------------------------
42
43
class ASE_Operation:
    '''
    Base class for the operations written to the ASE Control Point.

    See Audio Stream Control Service - 5 ASE Control operations.

    Concrete operations are declared with the `ASE_Operation.subclass`
    decorator, which derives the opcode from the class name and registers the
    class in `classes` so that `from_bytes` can instantiate it.
    '''

    # Registered operation classes, by opcode
    classes: Dict[int, Type[ASE_Operation]] = {}
    op_code: int
    name: str
    # Field specification used to parse/serialize the operation parameters
    fields: Optional[Sequence[Any]] = None
    ase_id: List[int]

    class Opcode(enum.IntEnum):
        # fmt: off
        CONFIG_CODEC         = 0x01
        CONFIG_QOS           = 0x02
        ENABLE               = 0x03
        RECEIVER_START_READY = 0x04
        DISABLE              = 0x05
        RECEIVER_STOP_READY  = 0x06
        UPDATE_METADATA      = 0x07
        RELEASE              = 0x08

    @staticmethod
    def from_bytes(pdu: bytes) -> ASE_Operation:
        '''
        Parse an ASE Control Point PDU.

        Args:
            pdu: raw PDU, starting with the opcode byte.

        Returns:
            An instance of the class registered for the opcode, with its fields
            parsed from the PDU, or a generic `ASE_Operation` carrying only the
            raw PDU when no class is registered for the opcode.

        Raises:
            IndexError: if `pdu` is empty.
        '''
        op_code = pdu[0]

        cls = ASE_Operation.classes.get(op_code)
        if cls is None:
            instance = ASE_Operation(pdu)
            try:
                instance.name = ASE_Operation.Opcode(op_code).name
            except ValueError:
                # The opcode is not a known one: keep a printable placeholder
                # rather than failing, so that the caller can still inspect
                # `op_code` and the raw PDU.
                instance.name = f'ASE_OPERATION[0x{op_code:02X}]'
            instance.op_code = op_code
            return instance
        # Bypass the subclass constructor: the fields come from the PDU.
        self = cls.__new__(cls)
        ASE_Operation.__init__(self, pdu)
        if self.fields is not None:
            self.init_from_bytes(pdu, 1)
        return self

    @staticmethod
    def subclass(fields):
        '''
        Class decorator that declares and registers an operation.

        The opcode is looked up in `Opcode` from the decorated class name, with
        its first 4 characters (the `ASE_` prefix) removed and the remainder
        upper-cased.

        Args:
            fields: field specification for the operation parameters.

        Raises:
            KeyError: (when decorating) if the class name doesn't map to a
                member of `Opcode`.
        '''

        def inner(cls: Type[ASE_Operation]):
            try:
                operation = ASE_Operation.Opcode[cls.__name__[4:].upper()]
            except KeyError as error:
                # NOTE: `cls.name` is not set at this point, report the class
                # name instead.
                raise KeyError(
                    f'PDU name {cls.__name__} not found in Ase_Operation.Opcode'
                ) from error
            cls.name = operation.name
            cls.op_code = operation
            cls.fields = fields

            # Register a factory for this class
            ASE_Operation.classes[cls.op_code] = cls

            return cls

        return inner

    def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
        '''
        Args:
            pdu: raw PDU. When omitted, it is built from the opcode and the
                keyword arguments, serialized according to `fields`.
            **kwargs: field values, set as attributes when `fields` is defined.
        '''
        if self.fields is not None and kwargs:
            hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            pdu = bytes([self.op_code]) + hci.HCI_Object.dict_to_bytes(
                kwargs, self.fields
            )
        self.pdu = pdu

    def init_from_bytes(self, pdu: bytes, offset: int):
        '''Parse the fields of this operation from `pdu`, starting at `offset`.'''
        return hci.HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def __bytes__(self) -> bytes:
        return self.pdu

    def __str__(self) -> str:
        result = f'{colors.color(self.name, "yellow")} '
        if fields := getattr(self, 'fields', None):
            result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, '  ')
        else:
            # No field specification: dump the raw PDU when it carries
            # anything more than the opcode.
            if len(self.pdu) > 1:
                result += f': {self.pdu.hex()}'
        return result
123
124
@ASE_Operation.subclass([
    [
        ('ase_id', 1),
        ('target_latency', 1),
        ('target_phy', 1),
        ('codec_id', hci.CodingFormat.parse_from_bytes),
        ('codec_specific_configuration', 'v'),
    ],
])
class ASE_Config_Codec(ASE_Operation):
    '''
    Config Codec operation.

    See Audio Stream Control Service 5.1 - Config Codec Operation

    Each attribute holds one entry per ASE listed in the operation.
    '''

    target_latency: List[int]
    target_phy: List[int]
    codec_id: List[hci.CodingFormat]
    codec_specific_configuration: List[bytes]
145
146
@ASE_Operation.subclass([
    [
        ('ase_id', 1),
        ('cig_id', 1),
        ('cis_id', 1),
        ('sdu_interval', 3),
        ('framing', 1),
        ('phy', 1),
        ('max_sdu', 2),
        ('retransmission_number', 1),
        ('max_transport_latency', 2),
        ('presentation_delay', 3),
    ],
])
class ASE_Config_QOS(ASE_Operation):
    '''
    Config QoS operation.

    See Audio Stream Control Service 5.2 - Config Qos Operation

    Each attribute holds one entry per ASE listed in the operation.
    '''

    cig_id: List[int]
    cis_id: List[int]
    sdu_interval: List[int]
    framing: List[int]
    phy: List[int]
    max_sdu: List[int]
    retransmission_number: List[int]
    max_transport_latency: List[int]
    presentation_delay: List[int]
177
178
@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
class ASE_Enable(ASE_Operation):
    '''
    Enable operation.

    See Audio Stream Control Service 5.3 - Enable Operation
    '''

    # One metadata blob per ASE listed in the operation (the field belongs to
    # the per-ASE array, like `ase_id`).
    metadata: List[bytes]
186
187
@ASE_Operation.subclass(
    [
        [('ase_id', 1)],
    ]
)
class ASE_Receiver_Start_Ready(ASE_Operation):
    '''
    Receiver Start Ready operation, carrying only a list of ASE IDs.

    See Audio Stream Control Service 5.4 - Receiver Start Ready Operation
    '''
193
194
@ASE_Operation.subclass(
    [
        [('ase_id', 1)],
    ]
)
class ASE_Disable(ASE_Operation):
    '''
    Disable operation, carrying only a list of ASE IDs.

    See Audio Stream Control Service 5.5 - Disable Operation
    '''
200
201
@ASE_Operation.subclass(
    [
        [('ase_id', 1)],
    ]
)
class ASE_Receiver_Stop_Ready(ASE_Operation):
    '''
    Receiver Stop Ready operation, carrying only a list of ASE IDs.

    See Audio Stream Control Service 5.6 - Receiver Stop Ready Operation
    '''
207
208
@ASE_Operation.subclass(
    [
        [
            ('ase_id', 1),
            ('metadata', 'v'),
        ],
    ]
)
class ASE_Update_Metadata(ASE_Operation):
    '''
    Update Metadata operation.

    See Audio Stream Control Service 5.7 - Update Metadata Operation
    '''

    # One metadata blob per ASE listed in the operation
    metadata: List[bytes]
216
217
@ASE_Operation.subclass(
    [
        [('ase_id', 1)],
    ]
)
class ASE_Release(ASE_Operation):
    '''
    Release operation, carrying only a list of ASE IDs.

    See Audio Stream Control Service 5.8 - Release Operation
    '''
223
224
class AseResponseCode(enum.IntEnum):
    '''Response codes reported for an ASE Control Point operation.'''

    SUCCESS = 0x00
    UNSUPPORTED_OPCODE = 0x01
    INVALID_LENGTH = 0x02
    INVALID_ASE_ID = 0x03
    INVALID_ASE_STATE_MACHINE_TRANSITION = 0x04
    INVALID_ASE_DIRECTION = 0x05
    UNSUPPORTED_AUDIO_CAPABILITIES = 0x06
    UNSUPPORTED_CONFIGURATION_PARAMETER_VALUE = 0x07
    REJECTED_CONFIGURATION_PARAMETER_VALUE = 0x08
    INVALID_CONFIGURATION_PARAMETER_VALUE = 0x09
    UNSUPPORTED_METADATA = 0x0A
    REJECTED_METADATA = 0x0B
    INVALID_METADATA = 0x0C
    INSUFFICIENT_RESOURCES = 0x0D
    UNSPECIFIED_ERROR = 0x0E
242
243
class AseReasonCode(enum.IntEnum):
    '''Reason codes qualifying the response to an ASE Control Point operation.'''

    NONE = 0x00
    CODEC_ID = 0x01
    CODEC_SPECIFIC_CONFIGURATION = 0x02
    SDU_INTERVAL = 0x03
    FRAMING = 0x04
    PHY = 0x05
    MAXIMUM_SDU_SIZE = 0x06
    RETRANSMISSION_NUMBER = 0x07
    MAX_TRANSPORT_LATENCY = 0x08
    PRESENTATION_DELAY = 0x09
    INVALID_ASE_CIS_MAPPING = 0x0A
257
258
259# -----------------------------------------------------------------------------
class AudioRole(enum.IntEnum):
    '''
    Role of an ASE.

    The values are those of the HCI "LE Setup ISO Data Path" direction, so that
    a role can be passed directly as a `data_path_direction`.
    '''

    # A sink ASE maps to the controller-to-host direction
    SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
    # A source ASE maps to the host-to-controller direction
    SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
263
264
265# -----------------------------------------------------------------------------
class AseStateMachine(gatt.Characteristic):
    '''
    Sink or Source ASE characteristic, with the state machine of that ASE.

    The `on_<operation>` methods are the handlers for the ASE Control Point
    operations. Each of them returns a `(response code, reason code)` tuple, and
    changes the state only when the operation is accepted.
    '''

    class State(enum.IntEnum):
        # fmt: off
        IDLE             = 0x00
        CODEC_CONFIGURED = 0x01
        QOS_CONFIGURED   = 0x02
        ENABLING         = 0x03
        STREAMING        = 0x04
        DISABLING        = 0x05
        RELEASING        = 0x06

    # CIS currently associated with this ASE, if any
    cis_link: Optional[device.CisLink] = None

    # Additional parameters in CODEC_CONFIGURED State
    preferred_framing = 0  # Unframed PDU supported
    preferred_phy = 0
    preferred_retransmission_number = 13
    preferred_max_transport_latency = 100
    supported_presentation_delay_min = 0
    supported_presentation_delay_max = 0
    preferred_presentation_delay_min = 0
    preferred_presentation_delay_max = 0
    codec_id = hci.CodingFormat(hci.CodecID.LC3)
    codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b''

    # Additional parameters in QOS_CONFIGURED State
    cig_id = 0
    cis_id = 0
    sdu_interval = 0
    framing = 0
    phy = 0
    max_sdu = 0
    retransmission_number = 0
    max_transport_latency = 0
    presentation_delay = 0

    # Additional parameters in ENABLING, STREAMING, DISABLING State
    metadata = le_audio.Metadata()

    def __init__(
        self,
        role: AudioRole,
        ase_id: int,
        service: AudioStreamControlService,
    ) -> None:
        '''
        Args:
            role: whether this is a sink or a source ASE; selects the
                characteristic UUID.
            ase_id: ID of the ASE.
            service: the service that owns this ASE; its `device` is used to
                listen for CIS events, send HCI commands and notify subscribers.
        '''
        self.service = service
        self.ase_id = ase_id
        self._state = AseStateMachine.State.IDLE
        self.role = role

        uuid = (
            gatt.GATT_SINK_ASE_CHARACTERISTIC
            if role == AudioRole.SINK
            else gatt.GATT_SOURCE_ASE_CHARACTERISTIC
        )
        super().__init__(
            uuid=uuid,
            properties=gatt.Characteristic.Properties.READ
            | gatt.Characteristic.Properties.NOTIFY,
            permissions=gatt.Characteristic.Permissions.READABLE,
            value=gatt.CharacteristicValue(read=self.on_read),
        )

        self.service.device.on('cis_request', self.on_cis_request)
        self.service.device.on('cis_establishment', self.on_cis_establishment)

    def on_cis_request(
        self,
        acl_connection: device.Connection,
        cis_handle: int,
        cig_id: int,
        cis_id: int,
    ) -> None:
        '''
        Accept a CIS request when it is for the CIG/CIS configured for this ASE
        and the ASE is in the ENABLING state. Other requests are ignored here.
        '''
        if (
            cig_id == self.cig_id
            and cis_id == self.cis_id
            and self.state == self.State.ENABLING
        ):
            acl_connection.abort_on(
                'flush', self.service.device.accept_cis_request(cis_handle)
            )

    def on_cis_establishment(self, cis_link: device.CisLink) -> None:
        '''
        Set up the ISO data path when the CIS configured for this ASE has been
        established while the ASE is in the ENABLING state.
        '''
        if (
            cis_link.cig_id == self.cig_id
            and cis_link.cis_id == self.cis_id
            and self.state == self.State.ENABLING
        ):
            cis_link.on('disconnection', self.on_cis_disconnection)

            async def post_cis_established():
                await self.service.device.send_command(
                    hci.HCI_LE_Setup_ISO_Data_Path_Command(
                        connection_handle=cis_link.handle,
                        data_path_direction=self.role,
                        data_path_id=0x00,  # Fixed HCI
                        codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
                        controller_delay=0,
                        codec_configuration=b'',
                    )
                )
                # Only a sink ASE moves to STREAMING here; a source ASE stays
                # in ENABLING until `on_receiver_start_ready`.
                if self.role == AudioRole.SINK:
                    self.state = self.State.STREAMING
                await self.service.device.notify_subscribers(self, self.value)

            cis_link.acl_connection.abort_on('flush', post_cis_established())
            self.cis_link = cis_link

    def on_cis_disconnection(self, _reason) -> None:
        '''Forget the CIS associated with this ASE.'''
        self.cis_link = None

    def on_config_codec(
        self,
        target_latency: int,
        target_phy: int,
        codec_id: hci.CodingFormat,
        codec_specific_configuration: bytes,
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Config Codec operation.

        Accepted in the IDLE, CODEC_CONFIGURED and QOS_CONFIGURED states; moves
        the ASE to CODEC_CONFIGURED.
        '''
        if self.state not in (
            self.State.IDLE,
            self.State.CODEC_CONFIGURED,
            self.State.QOS_CONFIGURED,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        self.max_transport_latency = target_latency
        self.phy = target_phy
        self.codec_id = codec_id
        # A vendor-specific configuration is kept as raw bytes, anything else
        # is parsed.
        if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC:
            self.codec_specific_configuration = codec_specific_configuration
        else:
            self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes(
                codec_specific_configuration
            )

        self.state = self.State.CODEC_CONFIGURED

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_config_qos(
        self,
        cig_id: int,
        cis_id: int,
        sdu_interval: int,
        framing: int,
        phy: int,
        max_sdu: int,
        retransmission_number: int,
        max_transport_latency: int,
        presentation_delay: int,
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Config QoS operation.

        Accepted in the CODEC_CONFIGURED and QOS_CONFIGURED states; stores the
        parameters and moves the ASE to QOS_CONFIGURED.
        '''
        if self.state not in (
            AseStateMachine.State.CODEC_CONFIGURED,
            AseStateMachine.State.QOS_CONFIGURED,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        self.cig_id = cig_id
        self.cis_id = cis_id
        self.sdu_interval = sdu_interval
        self.framing = framing
        self.phy = phy
        self.max_sdu = max_sdu
        self.retransmission_number = retransmission_number
        self.max_transport_latency = max_transport_latency
        self.presentation_delay = presentation_delay

        self.state = self.State.QOS_CONFIGURED

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle an Enable operation.

        Accepted in the QOS_CONFIGURED state only; parses the metadata and
        moves the ASE to ENABLING.
        '''
        if self.state != AseStateMachine.State.QOS_CONFIGURED:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        self.metadata = le_audio.Metadata.from_bytes(metadata)
        self.state = self.State.ENABLING

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_receiver_start_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Receiver Start Ready operation.

        Accepted in the ENABLING state only; moves the ASE to STREAMING.
        '''
        if self.state != AseStateMachine.State.ENABLING:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.STREAMING
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Disable operation.

        Accepted in the ENABLING and STREAMING states. A sink ASE goes back to
        QOS_CONFIGURED, a source ASE moves to DISABLING.
        '''
        if self.state not in (
            AseStateMachine.State.ENABLING,
            AseStateMachine.State.STREAMING,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        if self.role == AudioRole.SINK:
            self.state = self.State.QOS_CONFIGURED
        else:
            self.state = self.State.DISABLING
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Receiver Stop Ready operation.

        Accepted only for a source ASE in the DISABLING state; moves the ASE
        back to QOS_CONFIGURED.
        '''
        if (
            self.role != AudioRole.SOURCE
            or self.state != AseStateMachine.State.DISABLING
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.QOS_CONFIGURED
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_update_metadata(
        self, metadata: bytes
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle an Update Metadata operation.

        Accepted in the ENABLING and STREAMING states; replaces the metadata
        without changing the state.
        '''
        if self.state not in (
            AseStateMachine.State.ENABLING,
            AseStateMachine.State.STREAMING,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.metadata = le_audio.Metadata.from_bytes(metadata)
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''
        Handle a Release operation.

        Rejected in the IDLE state. Otherwise the ASE moves to RELEASING and a
        background task removes the ISO data path (when a CIS is associated
        with the ASE), moves the ASE to IDLE and notifies the subscribers.
        '''
        if self.state == AseStateMachine.State.IDLE:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.RELEASING

        async def remove_cis_async():
            # The ASE may be released before any CIS was established (or after
            # it was disconnected): there is then no data path to remove, but
            # the ASE must still reach the IDLE state.
            cis_link = self.cis_link
            if cis_link is not None:
                await self.service.device.send_command(
                    hci.HCI_LE_Remove_ISO_Data_Path_Command(
                        connection_handle=cis_link.handle,
                        data_path_direction=self.role,
                    )
                )
            self.state = self.State.IDLE
            await self.service.device.notify_subscribers(self, self.value)

        self.service.device.abort_on('flush', remove_cis_async())
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    @property
    def state(self) -> State:
        '''Current state. Setting it logs the change and emits `state_change`.'''
        return self._state

    @state.setter
    def state(self, new_state: State) -> None:
        logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
        self._state = new_state
        self.emit('state_change')

    @property
    def value(self):
        '''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.'''

        if self.state == self.State.CODEC_CONFIGURED:
            codec_specific_configuration_bytes = bytes(
                self.codec_specific_configuration
            )
            additional_parameters = (
                struct.pack(
                    '<BBBH',
                    self.preferred_framing,
                    self.preferred_phy,
                    self.preferred_retransmission_number,
                    self.preferred_max_transport_latency,
                )
                + self.supported_presentation_delay_min.to_bytes(3, 'little')
                + self.supported_presentation_delay_max.to_bytes(3, 'little')
                + self.preferred_presentation_delay_min.to_bytes(3, 'little')
                + self.preferred_presentation_delay_max.to_bytes(3, 'little')
                + bytes(self.codec_id)
                + bytes([len(codec_specific_configuration_bytes)])
                + codec_specific_configuration_bytes
            )
        elif self.state == self.State.QOS_CONFIGURED:
            additional_parameters = (
                bytes([self.cig_id, self.cis_id])
                + self.sdu_interval.to_bytes(3, 'little')
                + struct.pack(
                    '<BBHBH',
                    self.framing,
                    self.phy,
                    self.max_sdu,
                    self.retransmission_number,
                    self.max_transport_latency,
                )
                + self.presentation_delay.to_bytes(3, 'little')
            )
        elif self.state in (
            self.State.ENABLING,
            self.State.STREAMING,
            self.State.DISABLING,
        ):
            metadata_bytes = bytes(self.metadata)
            additional_parameters = (
                bytes([self.cig_id, self.cis_id, len(metadata_bytes)]) + metadata_bytes
            )
        else:
            # IDLE and RELEASING: no additional parameters
            additional_parameters = b''

        return bytes([self.ase_id, self.state]) + additional_parameters

    @value.setter
    def value(self, _new_value):
        # Readonly. Do nothing in the setter.
        pass

    def on_read(self, _: Optional[device.Connection]) -> bytes:
        '''Read handler for the characteristic: returns the current `value`.'''
        return self.value

    def __str__(self) -> str:
        return (
            f'AseStateMachine(id={self.ase_id}, role={self.role.name} '
            f'state={self._state.name})'
        )
601
602
603# -----------------------------------------------------------------------------
class AudioStreamControlService(gatt.TemplateService):
    '''
    Audio Stream Control Service.

    Exposes one ASE characteristic per sink/source ASE ID, plus the ASE Control
    Point characteristic through which the operations are written.
    '''

    UUID = gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE

    ase_state_machines: Dict[int, AseStateMachine]
    ase_control_point: gatt.Characteristic
    # First connection that wrote to the control point, until it disconnects
    _active_client: Optional[device.Connection] = None

    def __init__(
        self,
        device: device.Device,
        source_ase_id: Sequence[int] = (),
        sink_ase_id: Sequence[int] = (),
    ) -> None:
        '''
        Args:
            device: device hosting the service.
            source_ase_id: IDs of the source ASEs to expose.
            sink_ase_id: IDs of the sink ASEs to expose.
        '''
        self.device = device
        # NOTE: sinks first, then sources, so that a source wins if an ID is
        # listed in both sequences.
        self.ase_state_machines = {
            **{
                ase_id: AseStateMachine(
                    role=AudioRole.SINK, ase_id=ase_id, service=self
                )
                for ase_id in sink_ase_id
            },
            **{
                ase_id: AseStateMachine(
                    role=AudioRole.SOURCE, ase_id=ase_id, service=self
                )
                for ase_id in source_ase_id
            },
        }  # ASE state machines, by ASE ID

        self.ase_control_point = gatt.Characteristic(
            uuid=gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC,
            properties=gatt.Characteristic.Properties.WRITE
            | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
            | gatt.Characteristic.Properties.NOTIFY,
            permissions=gatt.Characteristic.Permissions.WRITEABLE,
            value=gatt.CharacteristicValue(write=self.on_write_ase_control_point),
        )

        super().__init__([self.ase_control_point, *self.ase_state_machines.values()])

    def on_operation(self, opcode: ASE_Operation.Opcode, ase_id: int, args):
        '''
        Apply one operation to one ASE.

        The handler is the `on_<lower-cased opcode name>` method of the ASE
        state machine, called with `args`.

        Returns:
            An `(ase_id, response code, reason code)` tuple;
            `INVALID_ASE_ID` is reported when there is no such ASE.
        '''
        if ase := self.ase_state_machines.get(ase_id):
            handler = getattr(ase, 'on_' + opcode.name.lower())
            return (ase_id, *handler(*args))
        else:
            return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE)

    def _on_client_disconnected(self, _reason: int) -> None:
        '''Reset every ASE to IDLE when the active client disconnects.'''
        for ase in self.ase_state_machines.values():
            ase.state = AseStateMachine.State.IDLE
        self._active_client = None

    def on_write_ase_control_point(self, connection, data):
        '''
        Write handler for the ASE Control Point.

        Parses the operation, applies it to each of the ASEs it lists, then
        notifies the control point with the per-ASE responses and notifies each
        of the ASE characteristics that were addressed.

        An opcode that is not in `ASE_Operation.Opcode` is answered with an
        Unsupported Opcode response, without touching any ASE.
        '''
        if not self._active_client and connection:
            self._active_client = connection
            connection.once('disconnection', self._on_client_disconnected)

        op_code = data[0]
        try:
            ASE_Operation.Opcode(op_code)
        except ValueError:
            # Unsupported opcode: the response has Number_of_ASEs set to 0xFF
            # and a single entry with ASE_ID 0x00.
            # NOTE(review): format taken from ASCS section 5 (ASE Control Point
            # notification) - confirm against the specification.
            logger.debug(f'*** ASCS Write: unsupported opcode 0x{op_code:02X} ***')
            self.device.abort_on(
                'flush',
                self.device.notify_subscribers(
                    self.ase_control_point,
                    bytes(
                        [
                            op_code,
                            0xFF,
                            0x00,
                            AseResponseCode.UNSUPPORTED_OPCODE,
                            AseReasonCode.NONE,
                        ]
                    ),
                ),
            )
            return

        operation = ASE_Operation.from_bytes(data)
        responses = []
        logger.debug(f'*** ASCS Write {operation} ***')

        # The operation fields are parallel lists, with one entry per ASE.
        if operation.op_code == ASE_Operation.Opcode.CONFIG_CODEC:
            for ase_id, *args in zip(
                operation.ase_id,
                operation.target_latency,
                operation.target_phy,
                operation.codec_id,
                operation.codec_specific_configuration,
            ):
                responses.append(self.on_operation(operation.op_code, ase_id, args))
        elif operation.op_code == ASE_Operation.Opcode.CONFIG_QOS:
            for ase_id, *args in zip(
                operation.ase_id,
                operation.cig_id,
                operation.cis_id,
                operation.sdu_interval,
                operation.framing,
                operation.phy,
                operation.max_sdu,
                operation.retransmission_number,
                operation.max_transport_latency,
                operation.presentation_delay,
            ):
                responses.append(self.on_operation(operation.op_code, ase_id, args))
        elif operation.op_code in (
            ASE_Operation.Opcode.ENABLE,
            ASE_Operation.Opcode.UPDATE_METADATA,
        ):
            for ase_id, *args in zip(
                operation.ase_id,
                operation.metadata,
            ):
                responses.append(self.on_operation(operation.op_code, ase_id, args))
        elif operation.op_code in (
            ASE_Operation.Opcode.RECEIVER_START_READY,
            ASE_Operation.Opcode.DISABLE,
            ASE_Operation.Opcode.RECEIVER_STOP_READY,
            ASE_Operation.Opcode.RELEASE,
        ):
            for ase_id in operation.ase_id:
                responses.append(self.on_operation(operation.op_code, ase_id, []))

        # Each response is an (ase_id, response code, reason code) tuple of
        # small integers, serialized as 3 bytes.
        control_point_notification = bytes(
            [operation.op_code, len(responses)]
        ) + b''.join(map(bytes, responses))
        self.device.abort_on(
            'flush',
            self.device.notify_subscribers(
                self.ase_control_point, control_point_notification
            ),
        )

        for ase_id, *_ in responses:
            if ase := self.ase_state_machines.get(ase_id):
                self.device.abort_on(
                    'flush',
                    self.device.notify_subscribers(ase, ase.value),
                )
718
719
720# -----------------------------------------------------------------------------
class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy):
    '''
    Client-side proxy for the Audio Stream Control Service.

    Collects the Sink ASE and Source ASE characteristics of the remote service,
    along with its ASE Control Point characteristic.
    '''

    SERVICE_CLASS = AudioStreamControlService

    sink_ase: List[gatt_client.CharacteristicProxy]
    source_ase: List[gatt_client.CharacteristicProxy]
    ase_control_point: gatt_client.CharacteristicProxy

    def __init__(self, service_proxy: gatt_client.ServiceProxy):
        self.service_proxy = service_proxy

        find_characteristics = service_proxy.get_characteristics_by_uuid
        self.sink_ase = find_characteristics(gatt.GATT_SINK_ASE_CHARACTERISTIC)
        self.source_ase = find_characteristics(gatt.GATT_SOURCE_ASE_CHARACTERISTIC)

        # The first matching control point characteristic is the one used.
        control_points = find_characteristics(
            gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC
        )
        self.ase_control_point = control_points[0]
740