1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from __future__ import annotations 20 21from collections.abc import Sequence 22import dataclasses 23import enum 24import struct 25import functools 26import logging 27from typing import List 28from typing_extensions import Self 29 30from bumble import core 31from bumble import hci 32from bumble import gatt 33from bumble import utils 34from bumble.profiles import le_audio 35 36 37# ----------------------------------------------------------------------------- 38# Logging 39# ----------------------------------------------------------------------------- 40logger = logging.getLogger(__name__) 41 42# ----------------------------------------------------------------------------- 43# Constants 44# ----------------------------------------------------------------------------- 45 46 47class AudioLocation(enum.IntFlag): 48 '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location''' 49 50 # fmt: off 51 NOT_ALLOWED = 0x00000000 52 FRONT_LEFT = 0x00000001 53 FRONT_RIGHT = 0x00000002 54 FRONT_CENTER = 0x00000004 55 LOW_FREQUENCY_EFFECTS_1 = 0x00000008 56 BACK_LEFT = 0x00000010 57 BACK_RIGHT = 0x00000020 58 FRONT_LEFT_OF_CENTER = 0x00000040 59 FRONT_RIGHT_OF_CENTER = 0x00000080 60 BACK_CENTER = 0x00000100 61 
LOW_FREQUENCY_EFFECTS_2 = 0x00000200 62 SIDE_LEFT = 0x00000400 63 SIDE_RIGHT = 0x00000800 64 TOP_FRONT_LEFT = 0x00001000 65 TOP_FRONT_RIGHT = 0x00002000 66 TOP_FRONT_CENTER = 0x00004000 67 TOP_CENTER = 0x00008000 68 TOP_BACK_LEFT = 0x00010000 69 TOP_BACK_RIGHT = 0x00020000 70 TOP_SIDE_LEFT = 0x00040000 71 TOP_SIDE_RIGHT = 0x00080000 72 TOP_BACK_CENTER = 0x00100000 73 BOTTOM_FRONT_CENTER = 0x00200000 74 BOTTOM_FRONT_LEFT = 0x00400000 75 BOTTOM_FRONT_RIGHT = 0x00800000 76 FRONT_LEFT_WIDE = 0x01000000 77 FRONT_RIGHT_WIDE = 0x02000000 78 LEFT_SURROUND = 0x04000000 79 RIGHT_SURROUND = 0x08000000 80 81 @property 82 def channel_count(self) -> int: 83 return bin(self.value).count('1') 84 85 86class AudioInputType(enum.IntEnum): 87 '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type''' 88 89 # fmt: off 90 UNSPECIFIED = 0x00 91 BLUETOOTH = 0x01 92 MICROPHONE = 0x02 93 ANALOG = 0x03 94 DIGITAL = 0x04 95 RADIO = 0x05 96 STREAMING = 0x06 97 AMBIENT = 0x07 98 99 100class ContextType(enum.IntFlag): 101 '''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type''' 102 103 # fmt: off 104 PROHIBITED = 0x0000 105 CONVERSATIONAL = 0x0002 106 MEDIA = 0x0004 107 GAME = 0x0008 108 INSTRUCTIONAL = 0x0010 109 VOICE_ASSISTANTS = 0x0020 110 LIVE = 0x0040 111 SOUND_EFFECTS = 0x0080 112 NOTIFICATIONS = 0x0100 113 RINGTONE = 0x0200 114 ALERTS = 0x0400 115 EMERGENCY_ALARM = 0x0800 116 117 118class SamplingFrequency(utils.OpenIntEnum): 119 '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency''' 120 121 # fmt: off 122 FREQ_8000 = 0x01 123 FREQ_11025 = 0x02 124 FREQ_16000 = 0x03 125 FREQ_22050 = 0x04 126 FREQ_24000 = 0x05 127 FREQ_32000 = 0x06 128 FREQ_44100 = 0x07 129 FREQ_48000 = 0x08 130 FREQ_88200 = 0x09 131 FREQ_96000 = 0x0A 132 FREQ_176400 = 0x0B 133 FREQ_192000 = 0x0C 134 FREQ_384000 = 0x0D 135 # fmt: on 136 137 @classmethod 138 def from_hz(cls, frequency: int) -> SamplingFrequency: 139 return { 140 8000: SamplingFrequency.FREQ_8000, 141 11025: 
SamplingFrequency.FREQ_11025, 142 16000: SamplingFrequency.FREQ_16000, 143 22050: SamplingFrequency.FREQ_22050, 144 24000: SamplingFrequency.FREQ_24000, 145 32000: SamplingFrequency.FREQ_32000, 146 44100: SamplingFrequency.FREQ_44100, 147 48000: SamplingFrequency.FREQ_48000, 148 88200: SamplingFrequency.FREQ_88200, 149 96000: SamplingFrequency.FREQ_96000, 150 176400: SamplingFrequency.FREQ_176400, 151 192000: SamplingFrequency.FREQ_192000, 152 384000: SamplingFrequency.FREQ_384000, 153 }[frequency] 154 155 @property 156 def hz(self) -> int: 157 return { 158 SamplingFrequency.FREQ_8000: 8000, 159 SamplingFrequency.FREQ_11025: 11025, 160 SamplingFrequency.FREQ_16000: 16000, 161 SamplingFrequency.FREQ_22050: 22050, 162 SamplingFrequency.FREQ_24000: 24000, 163 SamplingFrequency.FREQ_32000: 32000, 164 SamplingFrequency.FREQ_44100: 44100, 165 SamplingFrequency.FREQ_48000: 48000, 166 SamplingFrequency.FREQ_88200: 88200, 167 SamplingFrequency.FREQ_96000: 96000, 168 SamplingFrequency.FREQ_176400: 176400, 169 SamplingFrequency.FREQ_192000: 192000, 170 SamplingFrequency.FREQ_384000: 384000, 171 }[self] 172 173 174class SupportedSamplingFrequency(enum.IntFlag): 175 '''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency''' 176 177 # fmt: off 178 FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1) 179 FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1) 180 FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1) 181 FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1) 182 FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1) 183 FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1) 184 FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1) 185 FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1) 186 FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1) 187 FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1) 188 FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1) 189 FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1) 190 FREQ_384000 = 1 << 
(SamplingFrequency.FREQ_384000 - 1) 191 # fmt: on 192 193 @classmethod 194 def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency: 195 MAPPING = { 196 8000: SupportedSamplingFrequency.FREQ_8000, 197 11025: SupportedSamplingFrequency.FREQ_11025, 198 16000: SupportedSamplingFrequency.FREQ_16000, 199 22050: SupportedSamplingFrequency.FREQ_22050, 200 24000: SupportedSamplingFrequency.FREQ_24000, 201 32000: SupportedSamplingFrequency.FREQ_32000, 202 44100: SupportedSamplingFrequency.FREQ_44100, 203 48000: SupportedSamplingFrequency.FREQ_48000, 204 88200: SupportedSamplingFrequency.FREQ_88200, 205 96000: SupportedSamplingFrequency.FREQ_96000, 206 176400: SupportedSamplingFrequency.FREQ_176400, 207 192000: SupportedSamplingFrequency.FREQ_192000, 208 384000: SupportedSamplingFrequency.FREQ_384000, 209 } 210 211 return functools.reduce( 212 lambda x, y: x | MAPPING[y], 213 frequencies, 214 cls(0), 215 ) 216 217 218class FrameDuration(enum.IntEnum): 219 '''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration''' 220 221 # fmt: off 222 DURATION_7500_US = 0x00 223 DURATION_10000_US = 0x01 224 225 @property 226 def us(self) -> int: 227 return { 228 FrameDuration.DURATION_7500_US: 7500, 229 FrameDuration.DURATION_10000_US: 10000, 230 }[self] 231 232 233class SupportedFrameDuration(enum.IntFlag): 234 '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration''' 235 236 # fmt: off 237 DURATION_7500_US_SUPPORTED = 0b0001 238 DURATION_10000_US_SUPPORTED = 0b0010 239 DURATION_7500_US_PREFERRED = 0b0001 240 DURATION_10000_US_PREFERRED = 0b0010 241 242 243class AnnouncementType(utils.OpenIntEnum): 244 '''Basic Audio Profile, 3.5.3. 
Additional Audio Stream Control Service requirements''' 245 246 # fmt: off 247 GENERAL = 0x00 248 TARGETED = 0x01 249 250 251@dataclasses.dataclass 252class UnicastServerAdvertisingData: 253 """Advertising Data for ASCS.""" 254 255 announcement_type: AnnouncementType = AnnouncementType.TARGETED 256 available_audio_contexts: ContextType = ContextType.MEDIA 257 metadata: bytes = b'' 258 259 def __bytes__(self) -> bytes: 260 return bytes( 261 core.AdvertisingData( 262 [ 263 ( 264 core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, 265 struct.pack( 266 '<2sBIB', 267 gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(), 268 self.announcement_type, 269 self.available_audio_contexts, 270 len(self.metadata), 271 ) 272 + self.metadata, 273 ) 274 ] 275 ) 276 ) 277 278 279# ----------------------------------------------------------------------------- 280# Utils 281# ----------------------------------------------------------------------------- 282 283 284def bits_to_channel_counts(data: int) -> List[int]: 285 pos = 0 286 counts = [] 287 while data != 0: 288 # Bit 0 = count 1 289 # Bit 1 = count 2, and so on 290 pos += 1 291 if data & 1: 292 counts.append(pos) 293 data >>= 1 294 return counts 295 296 297def channel_counts_to_bits(counts: Sequence[int]) -> int: 298 return sum(set([1 << (count - 1) for count in counts])) 299 300 301# ----------------------------------------------------------------------------- 302# Structures 303# ----------------------------------------------------------------------------- 304 305 306@dataclasses.dataclass 307class CodecSpecificCapabilities: 308 '''See: 309 * Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures 310 * Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements 311 ''' 312 313 class Type(enum.IntEnum): 314 # fmt: off 315 SAMPLING_FREQUENCY = 0x01 316 FRAME_DURATION = 0x02 317 AUDIO_CHANNEL_COUNT = 0x03 318 OCTETS_PER_FRAME = 0x04 319 CODEC_FRAMES_PER_SDU = 0x05 320 321 
supported_sampling_frequencies: SupportedSamplingFrequency 322 supported_frame_durations: SupportedFrameDuration 323 supported_audio_channel_count: Sequence[int] 324 min_octets_per_codec_frame: int 325 max_octets_per_codec_frame: int 326 supported_max_codec_frames_per_sdu: int 327 328 @classmethod 329 def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities: 330 offset = 0 331 # Allowed default values. 332 supported_audio_channel_count = [1] 333 supported_max_codec_frames_per_sdu = 1 334 while offset < len(data): 335 length, type = struct.unpack_from('BB', data, offset) 336 offset += 2 337 value = int.from_bytes(data[offset : offset + length - 1], 'little') 338 offset += length - 1 339 340 if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY: 341 supported_sampling_frequencies = SupportedSamplingFrequency(value) 342 elif type == CodecSpecificCapabilities.Type.FRAME_DURATION: 343 supported_frame_durations = SupportedFrameDuration(value) 344 elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT: 345 supported_audio_channel_count = bits_to_channel_counts(value) 346 elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME: 347 min_octets_per_sample = value & 0xFFFF 348 max_octets_per_sample = value >> 16 349 elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU: 350 supported_max_codec_frames_per_sdu = value 351 352 # It is expected here that if some fields are missing, an error should be raised. 
353 return CodecSpecificCapabilities( 354 supported_sampling_frequencies=supported_sampling_frequencies, 355 supported_frame_durations=supported_frame_durations, 356 supported_audio_channel_count=supported_audio_channel_count, 357 min_octets_per_codec_frame=min_octets_per_sample, 358 max_octets_per_codec_frame=max_octets_per_sample, 359 supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu, 360 ) 361 362 def __bytes__(self) -> bytes: 363 return struct.pack( 364 '<BBHBBBBBBBBHHBBB', 365 3, 366 CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY, 367 self.supported_sampling_frequencies, 368 2, 369 CodecSpecificCapabilities.Type.FRAME_DURATION, 370 self.supported_frame_durations, 371 2, 372 CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT, 373 channel_counts_to_bits(self.supported_audio_channel_count), 374 5, 375 CodecSpecificCapabilities.Type.OCTETS_PER_FRAME, 376 self.min_octets_per_codec_frame, 377 self.max_octets_per_codec_frame, 378 2, 379 CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU, 380 self.supported_max_codec_frames_per_sdu, 381 ) 382 383 384@dataclasses.dataclass 385class CodecSpecificConfiguration: 386 '''See: 387 * Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures 388 * Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements 389 ''' 390 391 class Type(utils.OpenIntEnum): 392 # fmt: off 393 SAMPLING_FREQUENCY = 0x01 394 FRAME_DURATION = 0x02 395 AUDIO_CHANNEL_ALLOCATION = 0x03 396 OCTETS_PER_FRAME = 0x04 397 CODEC_FRAMES_PER_SDU = 0x05 398 399 sampling_frequency: SamplingFrequency 400 frame_duration: FrameDuration 401 audio_channel_allocation: AudioLocation 402 octets_per_codec_frame: int 403 codec_frames_per_sdu: int 404 405 @classmethod 406 def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration: 407 offset = 0 408 # Allowed default values. 
409 audio_channel_allocation = AudioLocation.NOT_ALLOWED 410 codec_frames_per_sdu = 1 411 while offset < len(data): 412 length, type = struct.unpack_from('BB', data, offset) 413 offset += 2 414 value = int.from_bytes(data[offset : offset + length - 1], 'little') 415 offset += length - 1 416 417 if type == CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY: 418 sampling_frequency = SamplingFrequency(value) 419 elif type == CodecSpecificConfiguration.Type.FRAME_DURATION: 420 frame_duration = FrameDuration(value) 421 elif type == CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION: 422 audio_channel_allocation = AudioLocation(value) 423 elif type == CodecSpecificConfiguration.Type.OCTETS_PER_FRAME: 424 octets_per_codec_frame = value 425 elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU: 426 codec_frames_per_sdu = value 427 428 # It is expected here that if some fields are missing, an error should be raised. 429 return CodecSpecificConfiguration( 430 sampling_frequency=sampling_frequency, 431 frame_duration=frame_duration, 432 audio_channel_allocation=audio_channel_allocation, 433 octets_per_codec_frame=octets_per_codec_frame, 434 codec_frames_per_sdu=codec_frames_per_sdu, 435 ) 436 437 def __bytes__(self) -> bytes: 438 return struct.pack( 439 '<BBBBBBBBIBBHBBB', 440 2, 441 CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY, 442 self.sampling_frequency, 443 2, 444 CodecSpecificConfiguration.Type.FRAME_DURATION, 445 self.frame_duration, 446 5, 447 CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION, 448 self.audio_channel_allocation, 449 3, 450 CodecSpecificConfiguration.Type.OCTETS_PER_FRAME, 451 self.octets_per_codec_frame, 452 2, 453 CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU, 454 self.codec_frames_per_sdu, 455 ) 456 457 458@dataclasses.dataclass 459class BroadcastAudioAnnouncement: 460 broadcast_id: int 461 462 @classmethod 463 def from_bytes(cls, data: bytes) -> Self: 464 return cls(int.from_bytes(data[:3], 'little')) 465 466 
@dataclasses.dataclass
class BasicAudioAnnouncement:
    '''Decoded Basic Audio Announcement: a presentation delay and its subgroups.'''

    @dataclasses.dataclass
    class BIS:
        # Single-octet BIS index, followed by that BIS's own codec configuration.
        index: int
        codec_specific_configuration: CodecSpecificConfiguration

    @dataclasses.dataclass
    class CodecInfo:
        coding_format: hci.CodecID
        company_id: int
        vendor_specific_codec_id: int

        @classmethod
        def from_bytes(cls, data: bytes) -> Self:
            '''Decode a codec ID: 1 octet coding format, then two little-endian
            16-bit fields (company ID and vendor-specific codec ID).'''
            return cls(
                coding_format=hci.CodecID(data[0]),
                company_id=int.from_bytes(data[1:3], 'little'),
                vendor_specific_codec_id=int.from_bytes(data[3:5], 'little'),
            )

    @dataclasses.dataclass
    class Subgroup:
        codec_id: BasicAudioAnnouncement.CodecInfo
        codec_specific_configuration: CodecSpecificConfiguration
        metadata: le_audio.Metadata
        bis: List[BasicAudioAnnouncement.BIS]

    presentation_delay: int
    subgroups: List[BasicAudioAnnouncement.Subgroup]

    @classmethod
    def from_bytes(cls, data: bytes) -> Self:
        '''Decode an announcement from its raw payload.

        Layout, as consumed below: a 3-octet little-endian presentation delay,
        a 1-octet subgroup count, then per subgroup: BIS count, 5-octet codec
        ID, length-prefixed codec configuration, length-prefixed metadata and
        finally one (index, length-prefixed configuration) record per BIS.

        Raises:
            IndexError: if `data` ends where a single octet is expected.
        '''
        delay = int.from_bytes(data[:3], 'little')
        # Octets 0-2 hold the delay and octet 3 the subgroup count.
        cursor = 4

        def next_octet() -> int:
            # Read one octet at the cursor and step past it.
            nonlocal cursor
            octet = data[cursor]
            cursor += 1
            return octet

        def next_chunk(size: int) -> bytes:
            # Slice `size` octets at the cursor and step past them.
            nonlocal cursor
            chunk = data[cursor : cursor + size]
            cursor += size
            return chunk

        parsed_subgroups = []
        for _ in range(data[3]):
            bis_count = next_octet()
            codec_info = cls.CodecInfo.from_bytes(next_chunk(5))
            subgroup_config_raw = next_chunk(next_octet())
            subgroup_metadata = le_audio.Metadata.from_bytes(next_chunk(next_octet()))

            bis_entries = []
            for _ in range(bis_count):
                bis_index = next_octet()
                bis_config_raw = next_chunk(next_octet())
                bis_entries.append(
                    cls.BIS(
                        bis_index,
                        CodecSpecificConfiguration.from_bytes(bis_config_raw),
                    )
                )

            # The subgroup-level configuration is decoded only after its BIS
            # records, keeping the order in which parse errors would surface.
            subgroup_config = CodecSpecificConfiguration.from_bytes(
                subgroup_config_raw
            )
            parsed_subgroups.append(
                cls.Subgroup(codec_info, subgroup_config, subgroup_metadata, bis_entries)
            )

        return cls(delay, parsed_subgroups)