1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19import enum
20import struct
21import logging
22from typing import List, Optional, Callable, Union, Any
23
24from bumble import l2cap
25from bumble import utils
26from bumble import gatt
27from bumble import gatt_client
28from bumble.core import AdvertisingData
29from bumble.device import Device, Connection
30
31# -----------------------------------------------------------------------------
32# Logging
33# -----------------------------------------------------------------------------
34_logger = logging.getLogger(__name__)
35
36
37# -----------------------------------------------------------------------------
38# Constants
39# -----------------------------------------------------------------------------
class DeviceCapabilities(enum.IntFlag):
    """Bit flags describing a device's capabilities.

    Presumably the values combined into the ``capability`` argument of
    ``AshaService`` -- verify against callers.
    """

    IS_RIGHT = 1 << 0
    IS_DUAL = 1 << 1
    CSIS_SUPPORTED = 1 << 2
44
45
class FeatureMap(enum.IntFlag):
    """Feature bit flags; the default ``feature_map`` of ``AshaService``."""

    LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED = 1 << 0
48
49
class AudioType(utils.OpenIntEnum):
    """Audio type identifiers (decoded from byte 2 of a START command)."""

    UNKNOWN = 0
    RINGTONE = 1
    PHONE_CALL = 2
    MEDIA = 3
55
56
class OpCode(utils.OpenIntEnum):
    """Command opcodes: the first byte written to the audio control point."""

    START = 0x01
    STOP = 0x02
    STATUS = 0x03
61
62
class Codec(utils.OpenIntEnum):
    """Codec identifiers (decoded from byte 1 of a START command)."""

    G_722_16KHZ = 0x01
65
66
class SupportedCodecs(enum.IntFlag):
    """Codec bit mask: bit ``n`` is set for the codec whose ``Codec`` ID is ``n``."""

    G_722_16KHZ = 1 << int(Codec.G_722_16KHZ)
69
70
class PeripheralStatus(utils.OpenIntEnum):
    """Status update on the other peripheral.

    Byte 1 of a STATUS command is decoded as one of these values.
    """

    OTHER_PERIPHERAL_DISCONNECTED = 0x01
    OTHER_PERIPHERAL_CONNECTED = 0x02
    CONNECTION_PARAMETER_UPDATED = 0x03
77
78
class AudioStatus(utils.OpenIntEnum):
    """Status report field for the audio control point.

    ``OK`` is the initial value of the audio status characteristic created by
    ``AshaService``.  The error codes are negative, so unlike ``OK`` they
    cannot be passed to ``bytes([...])`` directly; they would first have to be
    converted to a single-byte representation.
    """

    # Success.
    OK = 0
    # Error codes (negative values).
    UNKNOWN_COMMAND = -1
    ILLEGAL_PARAMETERS = -2
85
86
87# -----------------------------------------------------------------------------
class AshaService(gatt.TemplateService):
    """Server-side ASHA GATT service.

    Exposes the read-only properties, audio control point, audio status,
    volume and LE PSM characteristics, and registers an L2CAP LE credit-based
    channel server whose incoming data is forwarded to ``audio_sink``.

    Events emitted through ``self.emit``:
        * ``'started'``: a START command has been processed.
        * ``'stopped'``: a STOP command has been processed.
        * ``'volume_changed'``: the volume characteristic has been written.
        * ``'disconnected'``: the tracked connection has been disconnected.
    """

    UUID = gatt.GATT_ASHA_SERVICE

    # Callback receiving the data delivered on the L2CAP channel (may be None).
    audio_sink: Optional[Callable[[bytes], Any]]
    # Stream state: set by START (and, for `volume`, by volume writes), reset
    # to None by STOP and on disconnection.
    active_codec: Optional[Codec] = None
    audio_type: Optional[AudioType] = None
    volume: Optional[int] = None
    other_state: Optional[int] = None
    # First connection seen writing to the audio control point; reset to None
    # when that connection is disconnected.
    connection: Optional[Connection] = None

    def __init__(
        self,
        capability: int,
        hisyncid: Union[List[int], bytes],
        device: Device,
        psm: int = 0,
        audio_sink: Optional[Callable[[bytes], Any]] = None,
        feature_map: int = FeatureMap.LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED,
        protocol_version: int = 0x01,
        render_delay_milliseconds: int = 0,
        supported_codecs: int = SupportedCodecs.G_722_16KHZ,
    ) -> None:
        """Create the characteristics and the L2CAP server of the service.

        Args:
            capability: capability byte, published in the read-only properties
                and in the advertising data.
            hisyncid: HiSyncId, expected to be 8 bytes long (a warning is
                logged otherwise).
            device: device on which the L2CAP server is created and through
                which status notifications are sent.
            psm: PSM requested for the L2CAP server.  With the default of 0 the
                server picks a free PSM; a non-zero value is mainly for
                testing.  ``self.psm`` ends up holding the PSM reported by the
                created server.
            audio_sink: optional callback invoked with each chunk of data
                received on the L2CAP channel.
            feature_map: feature byte published in the read-only properties.
            protocol_version: version byte published in the read-only
                properties and in the advertising data.
            render_delay_milliseconds: 16-bit render delay published in the
                read-only properties.
            supported_codecs: 16-bit codec mask published in the read-only
                properties.
        """
        if len(hisyncid) != 8:
            _logger.warning('HiSyncId should have a length of 8, got %d', len(hisyncid))

        self.hisyncid = bytes(hisyncid)
        self.capability = capability
        self.device = device
        self.audio_out_data = b''
        self.psm = psm  # a non-zero psm is mainly for testing purpose
        self.audio_sink = audio_sink
        self.protocol_version = protocol_version

        # Little-endian layout: version, capability, HiSyncId (8 bytes),
        # feature map, render delay (u16), 2 zero bytes, supported codecs (u16).
        self.read_only_properties_characteristic = gatt.Characteristic(
            gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
            gatt.Characteristic.Properties.READ,
            gatt.Characteristic.READABLE,
            struct.pack(
                "<BB8sBH2sH",
                protocol_version,
                capability,
                self.hisyncid,
                feature_map,
                render_delay_milliseconds,
                b'\x00\x00',
                supported_codecs,
            ),
        )

        self.audio_control_point_characteristic = gatt.Characteristic(
            gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
            gatt.Characteristic.Properties.WRITE
            | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
            gatt.Characteristic.WRITEABLE,
            gatt.CharacteristicValue(write=self._on_audio_control_point_write),
        )
        self.audio_status_characteristic = gatt.Characteristic(
            gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
            gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY,
            gatt.Characteristic.READABLE,
            bytes([AudioStatus.OK]),
        )
        self.volume_characteristic = gatt.Characteristic(
            gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
            gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
            gatt.Characteristic.WRITEABLE,
            gatt.CharacteristicValue(write=self._on_volume_write),
        )

        # let the server find a free PSM
        self.psm = device.create_l2cap_server(
            spec=l2cap.LeCreditBasedChannelSpec(psm=self.psm, max_credits=8),
            handler=self._on_connection,
        ).psm
        self.le_psm_out_characteristic = gatt.Characteristic(
            gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
            gatt.Characteristic.Properties.READ,
            gatt.Characteristic.READABLE,
            struct.pack('<H', self.psm),
        )

        characteristics = [
            self.read_only_properties_characteristic,
            self.audio_control_point_characteristic,
            self.audio_status_characteristic,
            self.volume_characteristic,
            self.le_psm_out_characteristic,
        ]

        super().__init__(characteristics)

    def get_advertising_data(self) -> bytes:
        """Return the serialized service-data advertising payload.

        The payload is the service UUID followed by the protocol version, the
        capability byte and the first four bytes of ``self.hisyncid``.
        """
        # Advertisement only uses 4 least significant bytes of the HiSyncId.
        return bytes(
            AdvertisingData(
                [
                    (
                        AdvertisingData.SERVICE_DATA_16_BIT_UUID,
                        bytes(gatt.GATT_ASHA_SERVICE)
                        + bytes([self.protocol_version, self.capability])
                        + self.hisyncid[:4],
                    ),
                ]
            )
        )

    def _clear_stream_state(self) -> None:
        """Reset the per-stream attributes to their idle (``None``) values."""
        self.active_codec = None
        self.audio_type = None
        self.volume = None
        self.other_state = None

    # Handler for audio control commands
    async def _on_audio_control_point_write(
        self, connection: Optional[Connection], value: bytes
    ) -> None:
        """Process a command written to the audio control point.

        ``value[0]`` is the opcode.  START carries the codec, audio type,
        volume and other-state bytes at indexes 1 to 4; STATUS carries a
        peripheral status at index 1.  After START and STOP the subscribers of
        the audio status characteristic are notified.

        Raises:
            IndexError: if ``value`` is too short for its opcode.  No stream
                state is modified in that case.
        """
        _logger.debug('--- AUDIO CONTROL POINT Write:%s', value.hex())
        opcode = value[0]
        if opcode == OpCode.START:
            # Decode every field before touching any state, so that a truncated
            # payload fails without leaving the service half-updated.
            codec = Codec(value[1])
            audio_type = AudioType(value[2])
            volume = value[3]
            other_state = value[4]

            self.active_codec = codec
            self.audio_type = audio_type
            self.volume = volume
            self.other_state = other_state
            _logger.debug(
                '### START: codec=%s, audio_type=%s, volume=%s, other_state=%s',
                codec.name,
                audio_type.name,
                volume,
                other_state,
            )
            self.emit('started')
        elif opcode == OpCode.STOP:
            _logger.debug('### STOP')
            self._clear_stream_state()
            self.emit('stopped')
        elif opcode == OpCode.STATUS:
            _logger.debug('### STATUS: %s', PeripheralStatus(value[1]).name)

        # Start tracking the first connection that writes a command, and clean
        # up when it goes away.
        if self.connection is None and connection:
            self.connection = connection

            def on_disconnection(_reason) -> None:
                self.connection = None
                self._clear_stream_state()
                self.emit('disconnected')

            connection.once('disconnection', on_disconnection)

        # OPCODE_STATUS does not need audio status point update
        if opcode != OpCode.STATUS:
            await self.device.notify_subscribers(
                self.audio_status_characteristic, force=True
            )

    # Handler for volume control
    def _on_volume_write(self, connection: Optional[Connection], value: bytes) -> None:
        """Store the written volume byte and emit ``'volume_changed'``.

        Raises:
            IndexError: if ``value`` is empty.
        """
        # NOTE(review): the byte is stored as an unsigned value (0..255);
        # confirm whether the volume should be decoded as a signed int8.
        _logger.debug('--- VOLUME Write:%s', value[0])
        self.volume = value[0]
        self.emit('volume_changed')

    # Handler for incoming L2CAP CoC channels (the server itself is registered
    # in __init__)
    def _on_connection(self, channel: l2cap.LeCreditBasedChannel) -> None:
        """Route the data received on ``channel`` to ``self.audio_sink``."""

        def on_data(data: bytes) -> None:
            # Looked up on every call, so a sink assigned later is honored.
            if self.audio_sink:  # pylint: disable=not-callable
                self.audio_sink(data)

        channel.sink = on_data
255
256
257# -----------------------------------------------------------------------------
class AshaServiceProxy(gatt_client.ProfileServiceProxy):
    """Client-side accessor for the characteristics of a remote ASHA service.

    Each annotated attribute is bound, at construction time, to the first
    characteristic found for the corresponding ASHA characteristic UUID.
    """

    SERVICE_CLASS = AshaService
    read_only_properties_characteristic: gatt_client.CharacteristicProxy
    audio_control_point_characteristic: gatt_client.CharacteristicProxy
    audio_status_point_characteristic: gatt_client.CharacteristicProxy
    volume_characteristic: gatt_client.CharacteristicProxy
    psm_characteristic: gatt_client.CharacteristicProxy

    def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
        """Look up every required characteristic on ``service_proxy``.

        Raises:
            gatt.InvalidServiceError: if one of the characteristics is missing.
        """
        self.service_proxy = service_proxy

        # Attribute name -> UUID of the characteristic it is bound to, in the
        # order in which the lookups are performed.
        uuid_by_attribute_name = {
            'read_only_properties_characteristic': (
                gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC
            ),
            'audio_control_point_characteristic': (
                gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC
            ),
            'audio_status_point_characteristic': (
                gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC
            ),
            'volume_characteristic': gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
            'psm_characteristic': gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
        }

        for attribute_name, uuid in uuid_by_attribute_name.items():
            matches = self.service_proxy.get_characteristics_by_uuid(uuid)
            if not matches:
                raise gatt.InvalidServiceError(f"Missing {uuid} Characteristic")
            setattr(self, attribute_name, matches[0])
296