1# Copyright 2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for 13 14"""LE Audio - Published Audio Capabilities Service""" 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from __future__ import annotations 20import dataclasses 21import logging 22import struct 23from typing import Optional, Sequence, Union 24 25from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType 26from bumble.profiles import le_audio 27from bumble import gatt 28from bumble import gatt_client 29from bumble import hci 30 31 32# ----------------------------------------------------------------------------- 33# Logging 34# ----------------------------------------------------------------------------- 35logger = logging.getLogger(__name__) 36 37 38# ----------------------------------------------------------------------------- 39@dataclasses.dataclass 40class PacRecord: 41 '''Published Audio Capabilities Service, Table 3.2/3.4.''' 42 43 coding_format: hci.CodingFormat 44 codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] 45 metadata: le_audio.Metadata = dataclasses.field(default_factory=le_audio.Metadata) 46 47 @classmethod 48 def from_bytes(cls, data: bytes) -> PacRecord: 49 offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0) 50 codec_specific_capabilities_size = data[offset] 51 52 offset += 1 53 codec_specific_capabilities_bytes = data[ 54 offset : offset + codec_specific_capabilities_size 55 ] 56 offset += codec_specific_capabilities_size 57 metadata_size = data[offset] 58 offset += 1 59 metadata = le_audio.Metadata.from_bytes(data[offset : offset + metadata_size]) 60 61 codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] 62 if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC: 63 codec_specific_capabilities = codec_specific_capabilities_bytes 64 else: 65 codec_specific_capabilities = CodecSpecificCapabilities.from_bytes( 66 codec_specific_capabilities_bytes 67 ) 68 69 return PacRecord( 70 coding_format=coding_format, 71 codec_specific_capabilities=codec_specific_capabilities, 72 metadata=metadata, 73 ) 74 75 def __bytes__(self) -> bytes: 76 capabilities_bytes = bytes(self.codec_specific_capabilities) 77 metadata_bytes = bytes(self.metadata) 78 return ( 79 bytes(self.coding_format) 80 + bytes([len(capabilities_bytes)]) 81 + capabilities_bytes 82 + bytes([len(metadata_bytes)]) 83 + metadata_bytes 84 ) 85 86 87# ----------------------------------------------------------------------------- 88# Server 89# ----------------------------------------------------------------------------- 90class PublishedAudioCapabilitiesService(gatt.TemplateService): 91 UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE 92 93 sink_pac: Optional[gatt.Characteristic] 94 sink_audio_locations: Optional[gatt.Characteristic] 95 source_pac: Optional[gatt.Characteristic] 96 source_audio_locations: Optional[gatt.Characteristic] 97 available_audio_contexts: gatt.Characteristic 98 supported_audio_contexts: gatt.Characteristic 99 100 def __init__( 101 self, 102 supported_source_context: ContextType, 103 supported_sink_context: ContextType, 104 available_source_context: ContextType, 105 available_sink_context: ContextType, 106 sink_pac: Sequence[PacRecord] = (), 107 sink_audio_locations: Optional[AudioLocation] = None, 108 source_pac: Sequence[PacRecord] = (), 109 source_audio_locations: Optional[AudioLocation] = None, 110 ) -> None: 111 characteristics = [] 112 113 self.supported_audio_contexts = gatt.Characteristic( 114 uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC, 115 properties=gatt.Characteristic.Properties.READ, 116 permissions=gatt.Characteristic.Permissions.READABLE, 117 value=struct.pack('<HH', supported_sink_context, supported_source_context), 118 ) 119 characteristics.append(self.supported_audio_contexts) 120 121 self.available_audio_contexts = gatt.Characteristic( 122 uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC, 123 properties=gatt.Characteristic.Properties.READ 124 | gatt.Characteristic.Properties.NOTIFY, 125 permissions=gatt.Characteristic.Permissions.READABLE, 126 value=struct.pack('<HH', available_sink_context, available_source_context), 127 ) 128 characteristics.append(self.available_audio_contexts) 129 130 if sink_pac: 131 self.sink_pac = gatt.Characteristic( 132 uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC, 133 properties=gatt.Characteristic.Properties.READ, 134 permissions=gatt.Characteristic.Permissions.READABLE, 135 value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)), 136 ) 137 characteristics.append(self.sink_pac) 138 139 if sink_audio_locations is not None: 140 self.sink_audio_locations = gatt.Characteristic( 141 uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC, 142 properties=gatt.Characteristic.Properties.READ, 143 permissions=gatt.Characteristic.Permissions.READABLE, 144 value=struct.pack('<I', sink_audio_locations), 145 ) 146 characteristics.append(self.sink_audio_locations) 147 148 if source_pac: 149 self.source_pac = gatt.Characteristic( 150 uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC, 151 properties=gatt.Characteristic.Properties.READ, 152 permissions=gatt.Characteristic.Permissions.READABLE, 153 value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)), 154 ) 155 characteristics.append(self.source_pac) 156 157 if source_audio_locations is not None: 158 self.source_audio_locations = gatt.Characteristic( 159 uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC, 160 properties=gatt.Characteristic.Properties.READ, 161 permissions=gatt.Characteristic.Permissions.READABLE, 162 value=struct.pack('<I', source_audio_locations), 163 ) 164 characteristics.append(self.source_audio_locations) 165 166 super().__init__(characteristics) 167 168 169# ----------------------------------------------------------------------------- 170# Client 171# ----------------------------------------------------------------------------- 172class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy): 173 SERVICE_CLASS = PublishedAudioCapabilitiesService 174 175 sink_pac: Optional[gatt_client.CharacteristicProxy] = None 176 sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None 177 source_pac: Optional[gatt_client.CharacteristicProxy] = None 178 source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None 179 available_audio_contexts: gatt_client.CharacteristicProxy 180 supported_audio_contexts: gatt_client.CharacteristicProxy 181 182 def __init__(self, service_proxy: gatt_client.ServiceProxy): 183 self.service_proxy = service_proxy 184 185 self.available_audio_contexts = service_proxy.get_characteristics_by_uuid( 186 gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC 187 )[0] 188 self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid( 189 gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC 190 )[0] 191 192 if characteristics := service_proxy.get_characteristics_by_uuid( 193 gatt.GATT_SINK_PAC_CHARACTERISTIC 194 ): 195 self.sink_pac = characteristics[0] 196 197 if characteristics := service_proxy.get_characteristics_by_uuid( 198 gatt.GATT_SOURCE_PAC_CHARACTERISTIC 199 ): 200 self.source_pac = characteristics[0] 201 202 if characteristics := service_proxy.get_characteristics_by_uuid( 203 gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC 204 ): 205 self.sink_audio_locations = characteristics[0] 206 207 if characteristics := service_proxy.get_characteristics_by_uuid( 208 gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC 209 ): 210 self.source_audio_locations = characteristics[0] 211