1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from __future__ import annotations
20
21from collections.abc import Sequence
22import dataclasses
23import enum
24import struct
25import functools
26import logging
27from typing import List
28from typing_extensions import Self
29
30from bumble import core
31from bumble import hci
32from bumble import gatt
33from bumble import utils
34from bumble.profiles import le_audio
35
36
37# -----------------------------------------------------------------------------
38# Logging
39# -----------------------------------------------------------------------------
40logger = logging.getLogger(__name__)
41
42# -----------------------------------------------------------------------------
43# Constants
44# -----------------------------------------------------------------------------
45
46
47class AudioLocation(enum.IntFlag):
48    '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
49
50    # fmt: off
51    NOT_ALLOWED             = 0x00000000
52    FRONT_LEFT              = 0x00000001
53    FRONT_RIGHT             = 0x00000002
54    FRONT_CENTER            = 0x00000004
55    LOW_FREQUENCY_EFFECTS_1 = 0x00000008
56    BACK_LEFT               = 0x00000010
57    BACK_RIGHT              = 0x00000020
58    FRONT_LEFT_OF_CENTER    = 0x00000040
59    FRONT_RIGHT_OF_CENTER   = 0x00000080
60    BACK_CENTER             = 0x00000100
61    LOW_FREQUENCY_EFFECTS_2 = 0x00000200
62    SIDE_LEFT               = 0x00000400
63    SIDE_RIGHT              = 0x00000800
64    TOP_FRONT_LEFT          = 0x00001000
65    TOP_FRONT_RIGHT         = 0x00002000
66    TOP_FRONT_CENTER        = 0x00004000
67    TOP_CENTER              = 0x00008000
68    TOP_BACK_LEFT           = 0x00010000
69    TOP_BACK_RIGHT          = 0x00020000
70    TOP_SIDE_LEFT           = 0x00040000
71    TOP_SIDE_RIGHT          = 0x00080000
72    TOP_BACK_CENTER         = 0x00100000
73    BOTTOM_FRONT_CENTER     = 0x00200000
74    BOTTOM_FRONT_LEFT       = 0x00400000
75    BOTTOM_FRONT_RIGHT      = 0x00800000
76    FRONT_LEFT_WIDE         = 0x01000000
77    FRONT_RIGHT_WIDE        = 0x02000000
78    LEFT_SURROUND           = 0x04000000
79    RIGHT_SURROUND          = 0x08000000
80
81    @property
82    def channel_count(self) -> int:
83        return bin(self.value).count('1')
84
85
86class AudioInputType(enum.IntEnum):
87    '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
88
89    # fmt: off
90    UNSPECIFIED = 0x00
91    BLUETOOTH   = 0x01
92    MICROPHONE  = 0x02
93    ANALOG      = 0x03
94    DIGITAL     = 0x04
95    RADIO       = 0x05
96    STREAMING   = 0x06
97    AMBIENT     = 0x07
98
99
100class ContextType(enum.IntFlag):
101    '''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
102
103    # fmt: off
104    PROHIBITED       = 0x0000
105    CONVERSATIONAL   = 0x0002
106    MEDIA            = 0x0004
107    GAME             = 0x0008
108    INSTRUCTIONAL    = 0x0010
109    VOICE_ASSISTANTS = 0x0020
110    LIVE             = 0x0040
111    SOUND_EFFECTS    = 0x0080
112    NOTIFICATIONS    = 0x0100
113    RINGTONE         = 0x0200
114    ALERTS           = 0x0400
115    EMERGENCY_ALARM  = 0x0800
116
117
118class SamplingFrequency(utils.OpenIntEnum):
119    '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
120
121    # fmt: off
122    FREQ_8000    = 0x01
123    FREQ_11025   = 0x02
124    FREQ_16000   = 0x03
125    FREQ_22050   = 0x04
126    FREQ_24000   = 0x05
127    FREQ_32000   = 0x06
128    FREQ_44100   = 0x07
129    FREQ_48000   = 0x08
130    FREQ_88200   = 0x09
131    FREQ_96000   = 0x0A
132    FREQ_176400  = 0x0B
133    FREQ_192000  = 0x0C
134    FREQ_384000  = 0x0D
135    # fmt: on
136
137    @classmethod
138    def from_hz(cls, frequency: int) -> SamplingFrequency:
139        return {
140            8000: SamplingFrequency.FREQ_8000,
141            11025: SamplingFrequency.FREQ_11025,
142            16000: SamplingFrequency.FREQ_16000,
143            22050: SamplingFrequency.FREQ_22050,
144            24000: SamplingFrequency.FREQ_24000,
145            32000: SamplingFrequency.FREQ_32000,
146            44100: SamplingFrequency.FREQ_44100,
147            48000: SamplingFrequency.FREQ_48000,
148            88200: SamplingFrequency.FREQ_88200,
149            96000: SamplingFrequency.FREQ_96000,
150            176400: SamplingFrequency.FREQ_176400,
151            192000: SamplingFrequency.FREQ_192000,
152            384000: SamplingFrequency.FREQ_384000,
153        }[frequency]
154
155    @property
156    def hz(self) -> int:
157        return {
158            SamplingFrequency.FREQ_8000: 8000,
159            SamplingFrequency.FREQ_11025: 11025,
160            SamplingFrequency.FREQ_16000: 16000,
161            SamplingFrequency.FREQ_22050: 22050,
162            SamplingFrequency.FREQ_24000: 24000,
163            SamplingFrequency.FREQ_32000: 32000,
164            SamplingFrequency.FREQ_44100: 44100,
165            SamplingFrequency.FREQ_48000: 48000,
166            SamplingFrequency.FREQ_88200: 88200,
167            SamplingFrequency.FREQ_96000: 96000,
168            SamplingFrequency.FREQ_176400: 176400,
169            SamplingFrequency.FREQ_192000: 192000,
170            SamplingFrequency.FREQ_384000: 384000,
171        }[self]
172
173
174class SupportedSamplingFrequency(enum.IntFlag):
175    '''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
176
177    # fmt: off
178    FREQ_8000    = 1 << (SamplingFrequency.FREQ_8000 - 1)
179    FREQ_11025   = 1 << (SamplingFrequency.FREQ_11025 - 1)
180    FREQ_16000   = 1 << (SamplingFrequency.FREQ_16000 - 1)
181    FREQ_22050   = 1 << (SamplingFrequency.FREQ_22050 - 1)
182    FREQ_24000   = 1 << (SamplingFrequency.FREQ_24000 - 1)
183    FREQ_32000   = 1 << (SamplingFrequency.FREQ_32000 - 1)
184    FREQ_44100   = 1 << (SamplingFrequency.FREQ_44100 - 1)
185    FREQ_48000   = 1 << (SamplingFrequency.FREQ_48000 - 1)
186    FREQ_88200   = 1 << (SamplingFrequency.FREQ_88200 - 1)
187    FREQ_96000   = 1 << (SamplingFrequency.FREQ_96000 - 1)
188    FREQ_176400  = 1 << (SamplingFrequency.FREQ_176400 - 1)
189    FREQ_192000  = 1 << (SamplingFrequency.FREQ_192000 - 1)
190    FREQ_384000  = 1 << (SamplingFrequency.FREQ_384000 - 1)
191    # fmt: on
192
193    @classmethod
194    def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
195        MAPPING = {
196            8000: SupportedSamplingFrequency.FREQ_8000,
197            11025: SupportedSamplingFrequency.FREQ_11025,
198            16000: SupportedSamplingFrequency.FREQ_16000,
199            22050: SupportedSamplingFrequency.FREQ_22050,
200            24000: SupportedSamplingFrequency.FREQ_24000,
201            32000: SupportedSamplingFrequency.FREQ_32000,
202            44100: SupportedSamplingFrequency.FREQ_44100,
203            48000: SupportedSamplingFrequency.FREQ_48000,
204            88200: SupportedSamplingFrequency.FREQ_88200,
205            96000: SupportedSamplingFrequency.FREQ_96000,
206            176400: SupportedSamplingFrequency.FREQ_176400,
207            192000: SupportedSamplingFrequency.FREQ_192000,
208            384000: SupportedSamplingFrequency.FREQ_384000,
209        }
210
211        return functools.reduce(
212            lambda x, y: x | MAPPING[y],
213            frequencies,
214            cls(0),
215        )
216
217
218class FrameDuration(enum.IntEnum):
219    '''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
220
221    # fmt: off
222    DURATION_7500_US  = 0x00
223    DURATION_10000_US = 0x01
224
225    @property
226    def us(self) -> int:
227        return {
228            FrameDuration.DURATION_7500_US: 7500,
229            FrameDuration.DURATION_10000_US: 10000,
230        }[self]
231
232
233class SupportedFrameDuration(enum.IntFlag):
234    '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
235
236    # fmt: off
237    DURATION_7500_US_SUPPORTED  = 0b0001
238    DURATION_10000_US_SUPPORTED = 0b0010
239    DURATION_7500_US_PREFERRED  = 0b0001
240    DURATION_10000_US_PREFERRED = 0b0010
241
242
243class AnnouncementType(utils.OpenIntEnum):
244    '''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''
245
246    # fmt: off
247    GENERAL  = 0x00
248    TARGETED = 0x01
249
250
251@dataclasses.dataclass
252class UnicastServerAdvertisingData:
253    """Advertising Data for ASCS."""
254
255    announcement_type: AnnouncementType = AnnouncementType.TARGETED
256    available_audio_contexts: ContextType = ContextType.MEDIA
257    metadata: bytes = b''
258
259    def __bytes__(self) -> bytes:
260        return bytes(
261            core.AdvertisingData(
262                [
263                    (
264                        core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
265                        struct.pack(
266                            '<2sBIB',
267                            gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
268                            self.announcement_type,
269                            self.available_audio_contexts,
270                            len(self.metadata),
271                        )
272                        + self.metadata,
273                    )
274                ]
275            )
276        )
277
278
279# -----------------------------------------------------------------------------
280# Utils
281# -----------------------------------------------------------------------------
282
283
284def bits_to_channel_counts(data: int) -> List[int]:
285    pos = 0
286    counts = []
287    while data != 0:
288        # Bit 0 = count 1
289        # Bit 1 = count 2, and so on
290        pos += 1
291        if data & 1:
292            counts.append(pos)
293        data >>= 1
294    return counts
295
296
297def channel_counts_to_bits(counts: Sequence[int]) -> int:
298    return sum(set([1 << (count - 1) for count in counts]))
299
300
301# -----------------------------------------------------------------------------
302# Structures
303# -----------------------------------------------------------------------------
304
305
306@dataclasses.dataclass
307class CodecSpecificCapabilities:
308    '''See:
309    * Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
310    * Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
311    '''
312
313    class Type(enum.IntEnum):
314        # fmt: off
315        SAMPLING_FREQUENCY   = 0x01
316        FRAME_DURATION       = 0x02
317        AUDIO_CHANNEL_COUNT  = 0x03
318        OCTETS_PER_FRAME     = 0x04
319        CODEC_FRAMES_PER_SDU = 0x05
320
321    supported_sampling_frequencies: SupportedSamplingFrequency
322    supported_frame_durations: SupportedFrameDuration
323    supported_audio_channel_count: Sequence[int]
324    min_octets_per_codec_frame: int
325    max_octets_per_codec_frame: int
326    supported_max_codec_frames_per_sdu: int
327
328    @classmethod
329    def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
330        offset = 0
331        # Allowed default values.
332        supported_audio_channel_count = [1]
333        supported_max_codec_frames_per_sdu = 1
334        while offset < len(data):
335            length, type = struct.unpack_from('BB', data, offset)
336            offset += 2
337            value = int.from_bytes(data[offset : offset + length - 1], 'little')
338            offset += length - 1
339
340            if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
341                supported_sampling_frequencies = SupportedSamplingFrequency(value)
342            elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
343                supported_frame_durations = SupportedFrameDuration(value)
344            elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
345                supported_audio_channel_count = bits_to_channel_counts(value)
346            elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
347                min_octets_per_sample = value & 0xFFFF
348                max_octets_per_sample = value >> 16
349            elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
350                supported_max_codec_frames_per_sdu = value
351
352        # It is expected here that if some fields are missing, an error should be raised.
353        return CodecSpecificCapabilities(
354            supported_sampling_frequencies=supported_sampling_frequencies,
355            supported_frame_durations=supported_frame_durations,
356            supported_audio_channel_count=supported_audio_channel_count,
357            min_octets_per_codec_frame=min_octets_per_sample,
358            max_octets_per_codec_frame=max_octets_per_sample,
359            supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
360        )
361
362    def __bytes__(self) -> bytes:
363        return struct.pack(
364            '<BBHBBBBBBBBHHBBB',
365            3,
366            CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
367            self.supported_sampling_frequencies,
368            2,
369            CodecSpecificCapabilities.Type.FRAME_DURATION,
370            self.supported_frame_durations,
371            2,
372            CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
373            channel_counts_to_bits(self.supported_audio_channel_count),
374            5,
375            CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
376            self.min_octets_per_codec_frame,
377            self.max_octets_per_codec_frame,
378            2,
379            CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
380            self.supported_max_codec_frames_per_sdu,
381        )
382
383
384@dataclasses.dataclass
385class CodecSpecificConfiguration:
386    '''See:
387    * Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures
388    * Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
389    '''
390
391    class Type(utils.OpenIntEnum):
392        # fmt: off
393        SAMPLING_FREQUENCY       = 0x01
394        FRAME_DURATION           = 0x02
395        AUDIO_CHANNEL_ALLOCATION = 0x03
396        OCTETS_PER_FRAME         = 0x04
397        CODEC_FRAMES_PER_SDU     = 0x05
398
399    sampling_frequency: SamplingFrequency
400    frame_duration: FrameDuration
401    audio_channel_allocation: AudioLocation
402    octets_per_codec_frame: int
403    codec_frames_per_sdu: int
404
405    @classmethod
406    def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
407        offset = 0
408        # Allowed default values.
409        audio_channel_allocation = AudioLocation.NOT_ALLOWED
410        codec_frames_per_sdu = 1
411        while offset < len(data):
412            length, type = struct.unpack_from('BB', data, offset)
413            offset += 2
414            value = int.from_bytes(data[offset : offset + length - 1], 'little')
415            offset += length - 1
416
417            if type == CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY:
418                sampling_frequency = SamplingFrequency(value)
419            elif type == CodecSpecificConfiguration.Type.FRAME_DURATION:
420                frame_duration = FrameDuration(value)
421            elif type == CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION:
422                audio_channel_allocation = AudioLocation(value)
423            elif type == CodecSpecificConfiguration.Type.OCTETS_PER_FRAME:
424                octets_per_codec_frame = value
425            elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
426                codec_frames_per_sdu = value
427
428        # It is expected here that if some fields are missing, an error should be raised.
429        return CodecSpecificConfiguration(
430            sampling_frequency=sampling_frequency,
431            frame_duration=frame_duration,
432            audio_channel_allocation=audio_channel_allocation,
433            octets_per_codec_frame=octets_per_codec_frame,
434            codec_frames_per_sdu=codec_frames_per_sdu,
435        )
436
437    def __bytes__(self) -> bytes:
438        return struct.pack(
439            '<BBBBBBBBIBBHBBB',
440            2,
441            CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
442            self.sampling_frequency,
443            2,
444            CodecSpecificConfiguration.Type.FRAME_DURATION,
445            self.frame_duration,
446            5,
447            CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
448            self.audio_channel_allocation,
449            3,
450            CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
451            self.octets_per_codec_frame,
452            2,
453            CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
454            self.codec_frames_per_sdu,
455        )
456
457
458@dataclasses.dataclass
459class BroadcastAudioAnnouncement:
460    broadcast_id: int
461
462    @classmethod
463    def from_bytes(cls, data: bytes) -> Self:
464        return cls(int.from_bytes(data[:3], 'little'))
465
466
467@dataclasses.dataclass
468class BasicAudioAnnouncement:
469    @dataclasses.dataclass
470    class BIS:
471        index: int
472        codec_specific_configuration: CodecSpecificConfiguration
473
474    @dataclasses.dataclass
475    class CodecInfo:
476        coding_format: hci.CodecID
477        company_id: int
478        vendor_specific_codec_id: int
479
480        @classmethod
481        def from_bytes(cls, data: bytes) -> Self:
482            coding_format = hci.CodecID(data[0])
483            company_id = int.from_bytes(data[1:3], 'little')
484            vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
485            return cls(coding_format, company_id, vendor_specific_codec_id)
486
487    @dataclasses.dataclass
488    class Subgroup:
489        codec_id: BasicAudioAnnouncement.CodecInfo
490        codec_specific_configuration: CodecSpecificConfiguration
491        metadata: le_audio.Metadata
492        bis: List[BasicAudioAnnouncement.BIS]
493
494    presentation_delay: int
495    subgroups: List[BasicAudioAnnouncement.Subgroup]
496
497    @classmethod
498    def from_bytes(cls, data: bytes) -> Self:
499        presentation_delay = int.from_bytes(data[:3], 'little')
500        subgroups = []
501        offset = 4
502        for _ in range(data[3]):
503            num_bis = data[offset]
504            offset += 1
505            codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
506            offset += 5
507            codec_specific_configuration_length = data[offset]
508            offset += 1
509            codec_specific_configuration = data[
510                offset : offset + codec_specific_configuration_length
511            ]
512            offset += codec_specific_configuration_length
513            metadata_length = data[offset]
514            offset += 1
515            metadata = le_audio.Metadata.from_bytes(
516                data[offset : offset + metadata_length]
517            )
518            offset += metadata_length
519
520            bis = []
521            for _ in range(num_bis):
522                bis_index = data[offset]
523                offset += 1
524                bis_codec_specific_configuration_length = data[offset]
525                offset += 1
526                bis_codec_specific_configuration = data[
527                    offset : offset + bis_codec_specific_configuration_length
528                ]
529                offset += bis_codec_specific_configuration_length
530                bis.append(
531                    cls.BIS(
532                        bis_index,
533                        CodecSpecificConfiguration.from_bytes(
534                            bis_codec_specific_configuration
535                        ),
536                    )
537                )
538
539            subgroups.append(
540                cls.Subgroup(
541                    codec_id,
542                    CodecSpecificConfiguration.from_bytes(codec_specific_configuration),
543                    metadata,
544                    bis,
545                )
546            )
547
548        return cls(presentation_delay, subgroups)
549