1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for
13
14"""LE Audio - Published Audio Capabilities Service"""
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from __future__ import annotations
20import dataclasses
21import logging
22import struct
23from typing import Optional, Sequence, Union
24
25from bumble.profiles.bap import AudioLocation, CodecSpecificCapabilities, ContextType
26from bumble.profiles import le_audio
27from bumble import gatt
28from bumble import gatt_client
29from bumble import hci
30
31
32# -----------------------------------------------------------------------------
33# Logging
34# -----------------------------------------------------------------------------
35logger = logging.getLogger(__name__)
36
37
38# -----------------------------------------------------------------------------
39@dataclasses.dataclass
40class PacRecord:
41    '''Published Audio Capabilities Service, Table 3.2/3.4.'''
42
43    coding_format: hci.CodingFormat
44    codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
45    metadata: le_audio.Metadata = dataclasses.field(default_factory=le_audio.Metadata)
46
47    @classmethod
48    def from_bytes(cls, data: bytes) -> PacRecord:
49        offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
50        codec_specific_capabilities_size = data[offset]
51
52        offset += 1
53        codec_specific_capabilities_bytes = data[
54            offset : offset + codec_specific_capabilities_size
55        ]
56        offset += codec_specific_capabilities_size
57        metadata_size = data[offset]
58        offset += 1
59        metadata = le_audio.Metadata.from_bytes(data[offset : offset + metadata_size])
60
61        codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
62        if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
63            codec_specific_capabilities = codec_specific_capabilities_bytes
64        else:
65            codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
66                codec_specific_capabilities_bytes
67            )
68
69        return PacRecord(
70            coding_format=coding_format,
71            codec_specific_capabilities=codec_specific_capabilities,
72            metadata=metadata,
73        )
74
75    def __bytes__(self) -> bytes:
76        capabilities_bytes = bytes(self.codec_specific_capabilities)
77        metadata_bytes = bytes(self.metadata)
78        return (
79            bytes(self.coding_format)
80            + bytes([len(capabilities_bytes)])
81            + capabilities_bytes
82            + bytes([len(metadata_bytes)])
83            + metadata_bytes
84        )
85
86
87# -----------------------------------------------------------------------------
88# Server
89# -----------------------------------------------------------------------------
90class PublishedAudioCapabilitiesService(gatt.TemplateService):
91    UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
92
93    sink_pac: Optional[gatt.Characteristic]
94    sink_audio_locations: Optional[gatt.Characteristic]
95    source_pac: Optional[gatt.Characteristic]
96    source_audio_locations: Optional[gatt.Characteristic]
97    available_audio_contexts: gatt.Characteristic
98    supported_audio_contexts: gatt.Characteristic
99
100    def __init__(
101        self,
102        supported_source_context: ContextType,
103        supported_sink_context: ContextType,
104        available_source_context: ContextType,
105        available_sink_context: ContextType,
106        sink_pac: Sequence[PacRecord] = (),
107        sink_audio_locations: Optional[AudioLocation] = None,
108        source_pac: Sequence[PacRecord] = (),
109        source_audio_locations: Optional[AudioLocation] = None,
110    ) -> None:
111        characteristics = []
112
113        self.supported_audio_contexts = gatt.Characteristic(
114            uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
115            properties=gatt.Characteristic.Properties.READ,
116            permissions=gatt.Characteristic.Permissions.READABLE,
117            value=struct.pack('<HH', supported_sink_context, supported_source_context),
118        )
119        characteristics.append(self.supported_audio_contexts)
120
121        self.available_audio_contexts = gatt.Characteristic(
122            uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
123            properties=gatt.Characteristic.Properties.READ
124            | gatt.Characteristic.Properties.NOTIFY,
125            permissions=gatt.Characteristic.Permissions.READABLE,
126            value=struct.pack('<HH', available_sink_context, available_source_context),
127        )
128        characteristics.append(self.available_audio_contexts)
129
130        if sink_pac:
131            self.sink_pac = gatt.Characteristic(
132                uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
133                properties=gatt.Characteristic.Properties.READ,
134                permissions=gatt.Characteristic.Permissions.READABLE,
135                value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
136            )
137            characteristics.append(self.sink_pac)
138
139        if sink_audio_locations is not None:
140            self.sink_audio_locations = gatt.Characteristic(
141                uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
142                properties=gatt.Characteristic.Properties.READ,
143                permissions=gatt.Characteristic.Permissions.READABLE,
144                value=struct.pack('<I', sink_audio_locations),
145            )
146            characteristics.append(self.sink_audio_locations)
147
148        if source_pac:
149            self.source_pac = gatt.Characteristic(
150                uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
151                properties=gatt.Characteristic.Properties.READ,
152                permissions=gatt.Characteristic.Permissions.READABLE,
153                value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
154            )
155            characteristics.append(self.source_pac)
156
157        if source_audio_locations is not None:
158            self.source_audio_locations = gatt.Characteristic(
159                uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
160                properties=gatt.Characteristic.Properties.READ,
161                permissions=gatt.Characteristic.Permissions.READABLE,
162                value=struct.pack('<I', source_audio_locations),
163            )
164            characteristics.append(self.source_audio_locations)
165
166        super().__init__(characteristics)
167
168
169# -----------------------------------------------------------------------------
170# Client
171# -----------------------------------------------------------------------------
172class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
173    SERVICE_CLASS = PublishedAudioCapabilitiesService
174
175    sink_pac: Optional[gatt_client.CharacteristicProxy] = None
176    sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
177    source_pac: Optional[gatt_client.CharacteristicProxy] = None
178    source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
179    available_audio_contexts: gatt_client.CharacteristicProxy
180    supported_audio_contexts: gatt_client.CharacteristicProxy
181
182    def __init__(self, service_proxy: gatt_client.ServiceProxy):
183        self.service_proxy = service_proxy
184
185        self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
186            gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
187        )[0]
188        self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
189            gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
190        )[0]
191
192        if characteristics := service_proxy.get_characteristics_by_uuid(
193            gatt.GATT_SINK_PAC_CHARACTERISTIC
194        ):
195            self.sink_pac = characteristics[0]
196
197        if characteristics := service_proxy.get_characteristics_by_uuid(
198            gatt.GATT_SOURCE_PAC_CHARACTERISTIC
199        ):
200            self.source_pac = characteristics[0]
201
202        if characteristics := service_proxy.get_characteristics_by_uuid(
203            gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
204        ):
205            self.sink_audio_locations = characteristics[0]
206
207        if characteristics := service_proxy.get_characteristics_by_uuid(
208            gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
209        ):
210            self.source_audio_locations = characteristics[0]
211