1# Copyright 2021-2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from __future__ import annotations 20import enum 21 22from bumble import att 23from bumble import device 24from bumble import gatt 25from bumble import gatt_client 26 27from typing import Optional, Sequence 28 29# ----------------------------------------------------------------------------- 30# Constants 31# ----------------------------------------------------------------------------- 32 33MIN_VOLUME = 0 34MAX_VOLUME = 255 35 36 37class ErrorCode(enum.IntEnum): 38 ''' 39 See Volume Control Service 1.6. Application error codes. 40 ''' 41 42 INVALID_CHANGE_COUNTER = 0x80 43 OPCODE_NOT_SUPPORTED = 0x81 44 45 46class VolumeFlags(enum.IntFlag): 47 ''' 48 See Volume Control Service 3.3. Volume Flags. 49 ''' 50 51 VOLUME_SETTING_PERSISTED = 0x01 52 # RFU 53 54 55class VolumeControlPointOpcode(enum.IntEnum): 56 ''' 57 See Volume Control Service Table 3.3: Volume Control Point procedure requirements. 58 ''' 59 60 # fmt: off 61 RELATIVE_VOLUME_DOWN = 0x00 62 RELATIVE_VOLUME_UP = 0x01 63 UNMUTE_RELATIVE_VOLUME_DOWN = 0x02 64 UNMUTE_RELATIVE_VOLUME_UP = 0x03 65 SET_ABSOLUTE_VOLUME = 0x04 66 UNMUTE = 0x05 67 MUTE = 0x06 68 69 70# ----------------------------------------------------------------------------- 71# Server 72# ----------------------------------------------------------------------------- 73class VolumeControlService(gatt.TemplateService): 74 UUID = gatt.GATT_VOLUME_CONTROL_SERVICE 75 76 volume_state: gatt.Characteristic 77 volume_control_point: gatt.Characteristic 78 volume_flags: gatt.Characteristic 79 80 volume_setting: int 81 muted: int 82 change_counter: int 83 84 def __init__( 85 self, 86 step_size: int = 16, 87 volume_setting: int = 0, 88 muted: int = 0, 89 change_counter: int = 0, 90 volume_flags: int = 0, 91 included_services: Sequence[gatt.Service] = (), 92 ) -> None: 93 self.step_size = step_size 94 self.volume_setting = volume_setting 95 self.muted = muted 96 self.change_counter = change_counter 97 98 self.volume_state = gatt.Characteristic( 99 uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC, 100 properties=( 101 gatt.Characteristic.Properties.READ 102 | gatt.Characteristic.Properties.NOTIFY 103 ), 104 permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, 105 value=gatt.CharacteristicValue(read=self._on_read_volume_state), 106 ) 107 self.volume_control_point = gatt.Characteristic( 108 uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC, 109 properties=gatt.Characteristic.Properties.WRITE, 110 permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, 111 value=gatt.CharacteristicValue(write=self._on_write_volume_control_point), 112 ) 113 self.volume_flags = gatt.Characteristic( 114 uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC, 115 properties=gatt.Characteristic.Properties.READ, 116 permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, 117 value=bytes([volume_flags]), 118 ) 119 120 super().__init__( 121 characteristics=[ 122 self.volume_state, 123 self.volume_control_point, 124 self.volume_flags, 125 ], 126 included_services=list(included_services), 127 ) 128 129 @property 130 def volume_state_bytes(self) -> bytes: 131 return bytes([self.volume_setting, self.muted, self.change_counter]) 132 133 @volume_state_bytes.setter 134 def volume_state_bytes(self, new_value: bytes) -> None: 135 self.volume_setting, self.muted, self.change_counter = new_value 136 137 def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes: 138 return self.volume_state_bytes 139 140 def _on_write_volume_control_point( 141 self, connection: Optional[device.Connection], value: bytes 142 ) -> None: 143 assert connection 144 145 opcode = VolumeControlPointOpcode(value[0]) 146 change_counter = value[1] 147 148 if change_counter != self.change_counter: 149 raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER) 150 151 handler = getattr(self, '_on_' + opcode.name.lower()) 152 if handler(*value[2:]): 153 self.change_counter = (self.change_counter + 1) % 256 154 connection.abort_on( 155 'disconnection', 156 connection.device.notify_subscribers( 157 attribute=self.volume_state, 158 value=self.volume_state_bytes, 159 ), 160 ) 161 self.emit( 162 'volume_state', self.volume_setting, self.muted, self.change_counter 163 ) 164 165 def _on_relative_volume_down(self) -> bool: 166 old_volume = self.volume_setting 167 self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME) 168 return self.volume_setting != old_volume 169 170 def _on_relative_volume_up(self) -> bool: 171 old_volume = self.volume_setting 172 self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME) 173 return self.volume_setting != old_volume 174 175 def _on_unmute_relative_volume_down(self) -> bool: 176 old_volume, old_muted_state = self.volume_setting, self.muted 177 self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME) 178 self.muted = 0 179 return (self.volume_setting, self.muted) != (old_volume, old_muted_state) 180 181 def _on_unmute_relative_volume_up(self) -> bool: 182 old_volume, old_muted_state = self.volume_setting, self.muted 183 self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME) 184 self.muted = 0 185 return (self.volume_setting, self.muted) != (old_volume, old_muted_state) 186 187 def _on_set_absolute_volume(self, volume_setting: int) -> bool: 188 old_volume_setting = self.volume_setting 189 self.volume_setting = volume_setting 190 return old_volume_setting != self.volume_setting 191 192 def _on_unmute(self) -> bool: 193 old_muted_state = self.muted 194 self.muted = 0 195 return self.muted != old_muted_state 196 197 def _on_mute(self) -> bool: 198 old_muted_state = self.muted 199 self.muted = 1 200 return self.muted != old_muted_state 201 202 203# ----------------------------------------------------------------------------- 204# Client 205# ----------------------------------------------------------------------------- 206class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy): 207 SERVICE_CLASS = VolumeControlService 208 209 volume_control_point: gatt_client.CharacteristicProxy 210 211 def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: 212 self.service_proxy = service_proxy 213 214 self.volume_state = gatt.PackedCharacteristicAdapter( 215 service_proxy.get_characteristics_by_uuid( 216 gatt.GATT_VOLUME_STATE_CHARACTERISTIC 217 )[0], 218 'BBB', 219 ) 220 221 self.volume_control_point = service_proxy.get_characteristics_by_uuid( 222 gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC 223 )[0] 224 225 self.volume_flags = gatt.PackedCharacteristicAdapter( 226 service_proxy.get_characteristics_by_uuid( 227 gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC 228 )[0], 229 'B', 230 ) 231