1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import functools
21from bumble import att, gatt, gatt_client
22from bumble.core import InvalidArgumentError, InvalidStateError
23from bumble.device import Device, Connection
24from bumble.utils import AsyncRunner, OpenIntEnum
25from bumble.hci import Address
26from dataclasses import dataclass, field
27import logging
28from typing import Any, Dict, List, Optional, Set, Union
29
30
31# -----------------------------------------------------------------------------
32# Constants
33# -----------------------------------------------------------------------------
class ErrorCode(OpenIntEnum):
    '''
    Application-level ATT error codes used by the Hearing Access Service.

    See Hearing Access Service 2.4. Attribute Profile error codes.
    '''

    INVALID_OPCODE = 0x80
    # Raised by the server when a preset is missing or is not writable
    WRITE_NAME_NOT_ALLOWED = 0x81
    PRESET_SYNCHRONIZATION_NOT_SUPPORTED = 0x82
    # Raised by the server when the requested preset cannot become active
    PRESET_OPERATION_NOT_POSSIBLE = 0x83
    INVALID_PARAMETERS_LENGTH = 0x84
42
43
class HearingAidType(OpenIntEnum):
    '''
    Kind of hearing aid, stored in bits 0-1 of the Hearing Aid Features byte.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    BINAURAL_HEARING_AID = 0b00
    MONAURAL_HEARING_AID = 0b01
    BANDED_HEARING_AID = 0b10
50
51
class PresetSynchronizationSupport(OpenIntEnum):
    '''
    Preset synchronization flag, stored in bit 2 of the Hearing Aid Features
    byte.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED = 0b0
    PRESET_SYNCHRONIZATION_IS_SUPPORTED = 0b1
57
58
class IndependentPresets(OpenIntEnum):
    '''
    Independent presets flag, stored in bit 3 of the Hearing Aid Features byte.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    IDENTICAL_PRESET_RECORD = 0b0
    DIFFERENT_PRESET_RECORD = 0b1
64
65
class DynamicPresets(OpenIntEnum):
    '''
    Dynamic presets flag, stored in bit 4 of the Hearing Aid Features byte.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    PRESET_RECORDS_DOES_NOT_CHANGE = 0b0
    PRESET_RECORDS_MAY_CHANGE = 0b1
71
72
class WritablePresetsSupport(OpenIntEnum):
    '''
    Writable presets flag, stored in bit 5 of the Hearing Aid Features byte.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    WRITABLE_PRESET_RECORDS_NOT_SUPPORTED = 0b0
    WRITABLE_PRESET_RECORDS_SUPPORTED = 0b1
78
79
class HearingAidPresetControlPointOpcode(OpenIntEnum):
    '''
    Opcode carried in the first byte of a Hearing Aid Preset Control Point
    value.

    See Hearing Access Service 3.3.1 Hearing Aid Preset Control Point operation
    requirements.
    '''

    # Written by the client
    READ_PRESETS_REQUEST = 0x01

    # Indicated by the server
    READ_PRESET_RESPONSE = 0x02
    PRESET_CHANGED = 0x03

    # Written by the client
    WRITE_PRESET_NAME = 0x04
    SET_ACTIVE_PRESET = 0x05
    SET_NEXT_PRESET = 0x06
    SET_PREVIOUS_PRESET = 0x07
    SET_ACTIVE_PRESET_SYNCHRONIZED_LOCALLY = 0x08
    SET_NEXT_PRESET_SYNCHRONIZED_LOCALLY = 0x09
    SET_PREVIOUS_PRESET_SYNCHRONIZED_LOCALLY = 0x0A
94
95
@dataclass
class HearingAidFeatures:
    '''
    Decoded fields of the one-byte Hearing Aid Features characteristic value.

    See Hearing Access Service 3.1. Hearing Aid Features.
    '''

    hearing_aid_type: HearingAidType  # bits 0-1
    preset_synchronization_support: PresetSynchronizationSupport  # bit 2
    independent_presets: IndependentPresets  # bit 3
    dynamic_presets: DynamicPresets  # bit 4
    writable_presets_support: WritablePresetsSupport  # bit 5

    def __bytes__(self) -> bytes:
        '''Pack every field at its bit offset and return the resulting single byte.'''
        # (field value, bit offset of the field inside the features byte)
        layout = (
            (self.hearing_aid_type, 0),
            (self.preset_synchronization_support, 2),
            (self.independent_presets, 3),
            (self.dynamic_presets, 4),
            (self.writable_presets_support, 5),
        )
        packed = 0
        for field_value, bit_offset in layout:
            packed |= field_value << bit_offset
        return bytes([packed])
116
117
def HearingAidFeatures_from_bytes(data: int) -> HearingAidFeatures:
    '''
    Decode the Hearing Aid Features value into a HearingAidFeatures object.

    Despite the name, `data` is the characteristic value already unpacked as
    an integer, not a bytes object.
    '''

    def extract(bit_offset: int, mask: int) -> int:
        # Isolate one bit field from the features value
        return (data >> bit_offset) & mask

    return HearingAidFeatures(
        hearing_aid_type=HearingAidType(extract(0, 0b11)),
        preset_synchronization_support=PresetSynchronizationSupport(extract(2, 0b1)),
        independent_presets=IndependentPresets(extract(3, 0b1)),
        dynamic_presets=DynamicPresets(extract(4, 0b1)),
        writable_presets_support=WritablePresetsSupport(extract(5, 0b1)),
    )
126
127
@dataclass
class PresetChangedOperation:
    '''
    A Preset Changed operation that the server indicates to its clients.

    See Hearing Access Service 3.2.2.2. Preset Changed operation.
    '''

    class ChangeId(OpenIntEnum):
        '''Identifies what kind of change is being reported.'''

        GENERIC_UPDATE = 0x00
        PRESET_RECORD_DELETED = 0x01
        PRESET_RECORD_AVAILABLE = 0x02
        PRESET_RECORD_UNAVAILABLE = 0x03

    @dataclass
    class Generic:
        '''Additional parameters used with a GENERIC_UPDATE change.'''

        prev_index: int
        preset_record: PresetRecord

        def __bytes__(self) -> bytes:
            '''Serialize as the previous index followed by the preset record.'''
            return bytes((self.prev_index,)) + bytes(self.preset_record)

    change_id: ChangeId
    # Either a Generic payload, or the plain index of the affected preset
    additional_parameters: Union[Generic, int]

    def to_bytes(self, is_last: bool) -> bytes:
        '''
        Serialize the operation as a control point value.

        Args:
            is_last: value of the "is last" byte that follows the change id.

        Returns:
            The opcode, change id and is_last bytes, followed by the serialized
            additional parameters.
        '''
        parameters = self.additional_parameters
        if isinstance(parameters, PresetChangedOperation.Generic):
            payload = bytes(parameters)
        else:
            # A bare preset index is sent as a single byte
            payload = bytes([parameters])

        header = bytes(
            [HearingAidPresetControlPointOpcode.PRESET_CHANGED, self.change_id, is_last]
        )
        return header + payload
166
167
class PresetChangedOperationDeleted(PresetChangedOperation):
    '''Preset Changed operation reporting that the preset at `index` was deleted.'''

    def __init__(self, index: int) -> None:
        # Delegate to the dataclass-generated initializer of the base class
        super().__init__(
            change_id=PresetChangedOperation.ChangeId.PRESET_RECORD_DELETED,
            additional_parameters=index,
        )
172
173
class PresetChangedOperationAvailable(PresetChangedOperation):
    '''Preset Changed operation reporting that the preset at `index` is available.'''

    def __init__(self, index: int) -> None:
        # Delegate to the dataclass-generated initializer of the base class
        super().__init__(
            change_id=PresetChangedOperation.ChangeId.PRESET_RECORD_AVAILABLE,
            additional_parameters=index,
        )
178
179
class PresetChangedOperationUnavailable(PresetChangedOperation):
    '''Preset Changed operation reporting that the preset at `index` is unavailable.'''

    def __init__(self, index: int) -> None:
        # Delegate to the dataclass-generated initializer of the base class
        super().__init__(
            change_id=PresetChangedOperation.ChangeId.PRESET_RECORD_UNAVAILABLE,
            additional_parameters=index,
        )
184
185
@dataclass
class PresetRecord:
    '''
    A single preset: its index, its properties and its name.

    See Hearing Access Service 2.8. Preset record.
    '''

    @dataclass
    class Property:
        '''Property flags of a preset record, serialized as a single byte.'''

        class Writable(OpenIntEnum):
            CANNOT_BE_WRITTEN = 0b0
            CAN_BE_WRITTEN = 0b1

        class IsAvailable(OpenIntEnum):
            IS_UNAVAILABLE = 0b0
            IS_AVAILABLE = 0b1

        writable: Writable = Writable.CAN_BE_WRITTEN  # bit 0
        is_available: IsAvailable = IsAvailable.IS_AVAILABLE  # bit 1

        def __bytes__(self) -> bytes:
            '''Pack both flags into one byte.'''
            flags = self.writable | (self.is_available << 1)
            return bytes([flags])

    index: int
    name: str
    properties: Property = field(default_factory=Property)

    def __bytes__(self) -> bytes:
        '''Serialize as the index byte, the properties byte, then the UTF-8 name.'''
        return b''.join(
            (
                bytes([self.index]),
                bytes(self.properties),
                self.name.encode('utf-8'),
            )
        )

    def is_available(self) -> bool:
        '''Return True when the properties mark this preset as available.'''
        available = PresetRecord.Property.IsAvailable.IS_AVAILABLE
        return self.properties.is_available == available
218
219
220# -----------------------------------------------------------------------------
221# Server
222# -----------------------------------------------------------------------------
class HearingAccessService(gatt.TemplateService):
    '''
    GATT server side of the Hearing Access Service.

    The service exposes three characteristics: the Hearing Aid Features (read),
    the Hearing Aid Preset Control Point (write + indicate) and the Active
    Preset Index (read + notify). It keeps the preset records, the currently
    active preset index, and, for every known peer address, the list of Preset
    Changed operations that still have to be indicated to that peer.
    '''

    UUID = gatt.GATT_HEARING_ACCESS_SERVICE

    hearing_aid_features_characteristic: gatt.Characteristic
    hearing_aid_preset_control_point: gatt.Characteristic
    active_preset_index_characteristic: gatt.Characteristic
    active_preset_index: int
    # Last active preset index that was notified to each peer address
    active_preset_index_per_device: Dict[Address, int]

    device: Device

    server_features: HearingAidFeatures
    preset_records: Dict[int, PresetRecord]  # key is the preset index
    read_presets_request_in_progress: bool

    # Preset Changed operations not yet indicated, per peer address
    preset_changed_operations_history_per_device: Dict[
        Address, List[PresetChangedOperation]
    ]

    # Keep an up-to-date set of connected clients to send notifications to
    currently_connected_clients: Set[Connection]

    def __init__(
        self, device: Device, features: HearingAidFeatures, presets: List[PresetRecord]
    ) -> None:
        '''
        Create the service and register for connection events on `device`.

        Args:
            device: device on which incoming connections are tracked.
            features: value exposed by the Hearing Aid Features characteristic.
            presets: initial preset records; at least one is required and each
                name must encode to between 1 and 40 bytes.

        Raises:
            InvalidArgumentError: if `presets` is empty or a name has an
                invalid length.
        '''
        self.active_preset_index_per_device = {}
        self.read_presets_request_in_progress = False
        self.preset_changed_operations_history_per_device = {}
        self.currently_connected_clients = set()

        self.device = device
        self.server_features = features
        if len(presets) < 1:
            raise InvalidArgumentError(f'Invalid presets: {presets}')

        self.preset_records = {}
        for preset in presets:
            if not 1 <= len(preset.name.encode()) <= 40:
                raise InvalidArgumentError(f'Invalid name: {preset.name}')

            self.preset_records[preset.index] = preset

        # associate the lowest index as the current active preset at startup
        self.active_preset_index = min(self.preset_records)

        @device.on('connection')  # type: ignore
        def on_connection(connection: Connection) -> None:
            @connection.on('disconnection')  # type: ignore
            def on_disconnection(_reason) -> None:
                # A connection is only added to the set once it is paired, so
                # it may be absent here: discard() does not raise in that case.
                self.currently_connected_clients.discard(connection)

            @connection.on('pairing')  # type: ignore
            def on_pairing(*_: Any) -> None:
                self.on_incoming_paired_connection(connection)

            if connection.peer_resolvable_address:
                self.on_incoming_paired_connection(connection)

        self.hearing_aid_features_characteristic = gatt.Characteristic(
            uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC,
            properties=gatt.Characteristic.Properties.READ,
            permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
            value=bytes(self.server_features),
        )
        self.hearing_aid_preset_control_point = gatt.Characteristic(
            uuid=gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC,
            properties=(
                gatt.Characteristic.Properties.WRITE
                | gatt.Characteristic.Properties.INDICATE
            ),
            permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
            value=gatt.CharacteristicValue(
                write=self._on_write_hearing_aid_preset_control_point
            ),
        )
        self.active_preset_index_characteristic = gatt.Characteristic(
            uuid=gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC,
            properties=(
                gatt.Characteristic.Properties.READ
                | gatt.Characteristic.Properties.NOTIFY
            ),
            permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
            value=gatt.CharacteristicValue(read=self._on_read_active_preset_index),
        )

        super().__init__(
            [
                self.hearing_aid_features_characteristic,
                self.hearing_aid_preset_control_point,
                self.active_preset_index_characteristic,
            ]
        )

    def on_incoming_paired_connection(self, connection: Connection):
        '''Setup initial operations to handle a remote bonded HAP device'''
        # TODO Should we filter on HAP device only ?
        self.currently_connected_clients.add(connection)
        if (
            connection.peer_address
            not in self.preset_changed_operations_history_per_device
        ):
            # First time this peer is seen: start an empty history, there is
            # nothing to replay yet.
            self.preset_changed_operations_history_per_device[
                connection.peer_address
            ] = []
            return

        async def on_connection_async() -> None:
            # Send all the PresetChangedOperation that occur when not connected
            await self._preset_changed_operation(connection)
            # Update the active preset index if needed
            await self.notify_active_preset_for_connection(connection)

        connection.abort_on('disconnection', on_connection_async())

    def _on_read_active_preset_index(
        self, __connection__: Optional[Connection]
    ) -> bytes:
        '''Read handler of the Active Preset Index characteristic.'''
        return bytes([self.active_preset_index])

    # TODO: this needs to be triggered when the device is unbonded
    def on_forget(self, addr: Address) -> None:
        '''
        Drop the state kept for the peer at `addr`.

        Unknown addresses are ignored instead of raising KeyError.
        '''
        self.preset_changed_operations_history_per_device.pop(addr, None)
        self.active_preset_index_per_device.pop(addr, None)

    async def _on_write_hearing_aid_preset_control_point(
        self, connection: Optional[Connection], value: bytes
    ):
        '''
        Write handler of the control point: dispatch on the opcode byte.

        The handler for an opcode is the method named `_on_<opcode name>`.

        Raises:
            att.ATT_Error: INVALID_PARAMETERS_LENGTH when `value` is empty,
                INVALID_OPCODE when no handler exists for the opcode (unknown
                opcodes and the opcodes that only the server sends).
        '''
        assert connection

        if not value:
            raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)

        opcode = HearingAidPresetControlPointOpcode(value[0])
        handler = getattr(self, '_on_' + opcode.name.lower(), None)
        if handler is None:
            raise att.ATT_Error(ErrorCode.INVALID_OPCODE)
        await handler(connection, value)

    async def _on_read_presets_request(
        self, connection: Optional[Connection], value: bytes
    ):
        '''
        Handle a Read Presets Request: validate it, then spawn the indications.

        `value[1]` is the start index and `value[2]` the number of presets.

        Raises:
            att.ATT_Error: PROCEDURE_ALREADY_IN_PROGRESS if a previous request
                is still being answered, INVALID_PARAMETERS_LENGTH if a
                parameter is missing, OUT_OF_RANGE if a parameter is zero or
                no preset matches.
        '''
        assert connection
        if connection.att_mtu < 49:  # 2.5. GATT sub-procedure requirements
            logging.warning(f'HAS require MTU >= 49: {connection}')

        if self.read_presets_request_in_progress:
            raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS)

        if len(value) < 3:
            raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)

        start_index = value[1]
        if start_index == 0x00:
            raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)

        num_presets = value[2]
        if num_presets == 0x00:
            raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)

        # Sending `num_presets` presets ordered by increasing index field, starting from start_index
        presets = [
            self.preset_records[key]
            for key in sorted(self.preset_records)
            if self.preset_records[key].index >= start_index
        ][:num_presets]
        if not presets:
            raise att.ATT_Error(att.ErrorCode.OUT_OF_RANGE)

        # Only mark the procedure as in progress once the request is accepted:
        # a rejected request must not block the following ones.
        self.read_presets_request_in_progress = True
        AsyncRunner.spawn(self._read_preset_response(connection, presets))

    async def _read_preset_response(
        self, connection: Connection, presets: List[PresetRecord]
    ):
        '''Indicate one Read Preset Response per preset, flagging the last one.'''
        # If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Read Presets Request operation aborted and shall not either continue or restart the operation when the client reconnects.
        try:
            last_position = len(presets) - 1
            for position, preset in enumerate(presets):
                await connection.device.indicate_subscriber(
                    connection,
                    self.hearing_aid_preset_control_point,
                    value=bytes(
                        [
                            HearingAidPresetControlPointOpcode.READ_PRESET_RESPONSE,
                            position == last_position,
                        ]
                    )
                    + bytes(preset),
                )

        finally:
            # indicate_subscriber can raise a TimeoutError, we need to gracefully terminate the operation
            self.read_presets_request_in_progress = False

    async def generic_update(self, op: PresetChangedOperation) -> None:
        '''Server API to perform a generic update. It is the responsibility of the caller to modify the preset_records to match the PresetChangedOperation being sent'''
        await self._notifyPresetOperations(op)

    async def delete_preset(self, index: int) -> None:
        '''Server API to delete a preset. It should not be the current active preset'''

        if index == self.active_preset_index:
            raise InvalidStateError('Cannot delete active preset')

        del self.preset_records[index]
        await self._notifyPresetOperations(PresetChangedOperationDeleted(index))

    async def available_preset(self, index: int) -> None:
        '''Server API to make a preset available'''

        preset = self.preset_records[index]
        preset.properties.is_available = PresetRecord.Property.IsAvailable.IS_AVAILABLE
        await self._notifyPresetOperations(PresetChangedOperationAvailable(index))

    async def unavailable_preset(self, index: int) -> None:
        '''Server API to make a preset unavailable. It should not be the current active preset'''

        if index == self.active_preset_index:
            raise InvalidStateError('Cannot set active preset as unavailable')

        preset = self.preset_records[index]
        preset.properties.is_available = (
            PresetRecord.Property.IsAvailable.IS_UNAVAILABLE
        )
        await self._notifyPresetOperations(PresetChangedOperationUnavailable(index))

    async def _preset_changed_operation(self, connection: Connection) -> None:
        '''Send all PresetChangedOperation saved for a given connection'''
        op_list = self.preset_changed_operations_history_per_device.get(
            connection.peer_address, []
        )

        # Notification will be sent in index order
        def get_op_index(op: PresetChangedOperation) -> int:
            if isinstance(op.additional_parameters, PresetChangedOperation.Generic):
                return op.additional_parameters.prev_index
            return op.additional_parameters

        op_list.sort(key=get_op_index)
        # If the ATT bearer is terminated before all notifications or indications are sent, then the server shall consider the Preset Changed operation aborted and shall continue the operation when the client reconnects.
        while len(op_list) > 0:
            try:
                await connection.device.indicate_subscriber(
                    connection,
                    self.hearing_aid_preset_control_point,
                    value=op_list[0].to_bytes(len(op_list) == 1),
                )
                # Remove item once sent, and keep the non sent item in the list
                op_list.pop(0)
            except (TimeoutError, asyncio.TimeoutError):
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError since Python 3.11, so both are listed.
                break

    async def _notifyPresetOperations(self, op: PresetChangedOperation) -> None:
        '''Queue `op` for every known peer, then flush to the connected clients.'''
        for history in self.preset_changed_operations_history_per_device.values():
            history.append(op)

        # Iterate over a snapshot: the set can be modified by a connection or
        # disconnection event while an indication is being awaited.
        for connection in list(self.currently_connected_clients):
            await self._preset_changed_operation(connection)

    async def _on_write_preset_name(
        self, connection: Optional[Connection], value: bytes
    ):
        '''
        Handle a Write Preset Name: rename a preset and report a generic update.

        `value[1]` is the preset index and `value[2:]` the UTF-8 encoded name.

        Raises:
            att.ATT_Error: PROCEDURE_ALREADY_IN_PROGRESS while a Read Presets
                Request is being answered, WRITE_NAME_NOT_ALLOWED if the preset
                does not exist or is not writable, INVALID_PARAMETERS_LENGTH if
                the index is missing or the name is not 1 to 40 bytes long.
        '''
        assert connection

        if self.read_presets_request_in_progress:
            raise att.ATT_Error(att.ErrorCode.PROCEDURE_ALREADY_IN_PROGRESS)

        if len(value) < 2:
            raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)

        index = value[1]
        preset = self.preset_records.get(index, None)
        if (
            not preset
            or preset.properties.writable
            == PresetRecord.Property.Writable.CANNOT_BE_WRITTEN
        ):
            raise att.ATT_Error(ErrorCode.WRITE_NAME_NOT_ALLOWED)

        # The limit applies to the encoded name, exactly like the check done
        # in __init__, not to the number of decoded characters.
        encoded_name = value[2:]
        if not 1 <= len(encoded_name) <= 40:
            raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)

        preset.name = encoded_name.decode('utf-8')

        await self.generic_update(
            PresetChangedOperation(
                PresetChangedOperation.ChangeId.GENERIC_UPDATE,
                PresetChangedOperation.Generic(index, preset),
            )
        )

    async def notify_active_preset_for_connection(self, connection: Connection) -> None:
        '''Notify the active preset index to `connection` unless it already has it.'''
        if (
            self.active_preset_index_per_device.get(connection.peer_address, 0x00)
            == self.active_preset_index
        ):
            # Nothing to do, peer is already updated
            return

        await connection.device.notify_subscriber(
            connection,
            attribute=self.active_preset_index_characteristic,
            value=bytes([self.active_preset_index]),
        )
        self.active_preset_index_per_device[connection.peer_address] = (
            self.active_preset_index
        )

    async def notify_active_preset(self) -> None:
        '''Notify the active preset index to every connected client.'''
        # Iterate over a snapshot: the set can be modified by a connection or
        # disconnection event while a notification is being awaited.
        for connection in list(self.currently_connected_clients):
            await self.notify_active_preset_for_connection(connection)

    async def set_active_preset(
        self, connection: Optional[Connection], value: bytes
    ) -> None:
        '''
        Make the preset whose index is `value[1]` the active one.

        Raises:
            att.ATT_Error: INVALID_PARAMETERS_LENGTH if the index is missing,
                PRESET_OPERATION_NOT_POSSIBLE if the preset does not exist or
                is not available.
        '''
        assert connection

        if len(value) < 2:
            raise att.ATT_Error(ErrorCode.INVALID_PARAMETERS_LENGTH)

        index = value[1]
        preset = self.preset_records.get(index, None)
        if (
            not preset
            or preset.properties.is_available
            != PresetRecord.Property.IsAvailable.IS_AVAILABLE
        ):
            raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)

        if index == self.active_preset_index:
            # Already at correct value
            return

        self.active_preset_index = index
        await self.notify_active_preset()

    async def _on_set_active_preset(
        self, connection: Optional[Connection], value: bytes
    ):
        '''Control point handler for SET_ACTIVE_PRESET.'''
        await self.set_active_preset(connection, value)

    async def set_next_or_previous_preset(
        self, connection: Optional[Connection], is_previous
    ):
        '''Set the next or the previous preset as active'''
        assert connection

        if self.active_preset_index == 0x00:
            raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)

        # Available presets, in the direction we are moving
        candidates = [
            (index, record)
            for index, record in sorted(
                self.preset_records.items(), reverse=is_previous
            )
            if record.is_available()
        ]
        if not candidates:  # If no other preset are available
            raise att.ATT_Error(ErrorCode.PRESET_OPERATION_NOT_POSSIBLE)

        current = self.active_preset_index
        # First available preset past the active one; when there is none, wrap
        # around to the first available preset in iteration order.
        target = next(
            (
                record
                for index, record in candidates
                if (index < current if is_previous else index > current)
            ),
            candidates[0][1],
        )

        self.active_preset_index = target.index
        await self.notify_active_preset()

    async def _on_set_next_preset(
        self, connection: Optional[Connection], __value__: bytes
    ) -> None:
        '''Control point handler for SET_NEXT_PRESET.'''
        await self.set_next_or_previous_preset(connection, False)

    async def _on_set_previous_preset(
        self, connection: Optional[Connection], __value__: bytes
    ) -> None:
        '''Control point handler for SET_PREVIOUS_PRESET.'''
        await self.set_next_or_previous_preset(connection, True)

    def _require_preset_synchronization_support(self) -> None:
        '''Raise PRESET_SYNCHRONIZATION_NOT_SUPPORTED unless the feature is supported.'''
        if (
            self.server_features.preset_synchronization_support
            != PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
        ):
            raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)

    async def _on_set_active_preset_synchronized_locally(
        self, connection: Optional[Connection], value: bytes
    ):
        '''Control point handler for SET_ACTIVE_PRESET_SYNCHRONIZED_LOCALLY.'''
        self._require_preset_synchronization_support()
        await self.set_active_preset(connection, value)
        # TODO (low priority) inform other server of the change

    async def _on_set_next_preset_synchronized_locally(
        self, connection: Optional[Connection], __value__: bytes
    ):
        '''Control point handler for SET_NEXT_PRESET_SYNCHRONIZED_LOCALLY.'''
        self._require_preset_synchronization_support()
        await self.set_next_or_previous_preset(connection, False)
        # TODO (low priority) inform other server of the change

    async def _on_set_previous_preset_synchronized_locally(
        self, connection: Optional[Connection], __value__: bytes
    ):
        '''Control point handler for SET_PREVIOUS_PRESET_SYNCHRONIZED_LOCALLY.'''
        self._require_preset_synchronization_support()
        await self.set_next_or_previous_preset(connection, True)
        # TODO (low priority) inform other server of the change
624
625
626# -----------------------------------------------------------------------------
627# Client
628# -----------------------------------------------------------------------------
class HearingAccessServiceProxy(gatt_client.ProfileServiceProxy):
    '''
    Client-side proxy for a remote Hearing Access Service.

    Call `setup_subscription` to start collecting control point indications
    and active preset index notifications into queues.
    '''

    SERVICE_CLASS = HearingAccessService

    hearing_aid_preset_control_point: gatt_client.CharacteristicProxy
    preset_control_point_indications: asyncio.Queue

    def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
        '''Look up the three characteristics of the service on `service_proxy`.'''
        self.service_proxy = service_proxy

        def first_characteristic(uuid):
            # The first characteristic found with the given UUID is used
            return service_proxy.get_characteristics_by_uuid(uuid)[0]

        # Single-byte values are exposed through a packed adapter ('B' format)
        self.server_features = gatt.PackedCharacteristicAdapter(
            first_characteristic(gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC),
            'B',
        )

        self.hearing_aid_preset_control_point = first_characteristic(
            gatt.GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC
        )

        self.active_preset_index = gatt.PackedCharacteristicAdapter(
            first_characteristic(gatt.GATT_ACTIVE_PRESET_INDEX_CHARACTERISTIC),
            'B',
        )

    async def setup_subscription(self):
        '''
        Create the receive queues and subscribe to both characteristics.

        Values received on the control point are put in
        `preset_control_point_indications`, and values received on the active
        preset index in `active_preset_index_notification`.
        '''
        self.preset_control_point_indications = asyncio.Queue()
        self.active_preset_index_notification = asyncio.Queue()

        def enqueue_active_preset_index(data: bytes):
            self.active_preset_index_notification.put_nowait(data)

        def enqueue_control_point_indication(data: bytes):
            self.preset_control_point_indications.put_nowait(data)

        control_point_subscriber = functools.partial(enqueue_control_point_indication)
        # prefer_notify=False is passed for the control point subscription
        await self.hearing_aid_preset_control_point.subscribe(
            control_point_subscriber, prefer_notify=False
        )

        active_index_subscriber = functools.partial(enqueue_active_preset_index)
        await self.active_preset_index.subscribe(active_index_subscriber)
675