1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import grpc 17import logging 18 19from bumble.att import Attribute 20from bumble.core import ProtocolError 21from bumble.device import Connection as BumbleConnection, Device, Peer 22from bumble.gatt import Characteristic, Descriptor, Service, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE 23from bumble.gatt_client import CharacteristicProxy, ServiceProxy 24from bumble.pandora import utils 25from pandora_experimental.gatt_grpc_aio import GATTServicer 26from pandora_experimental.gatt_pb2 import ( 27 ATTRIBUTE_NOT_FOUND, 28 SUCCESS, 29 AttStatusCode, 30 AttValue, 31 ClearCacheRequest, 32 ClearCacheResponse, 33 DiscoverServiceByUuidRequest, 34 DiscoverServicesRequest, 35 DiscoverServicesResponse, 36 ExchangeMTURequest, 37 ExchangeMTUResponse, 38 GattCharacteristic, 39 GattCharacteristicDescriptor, 40 GattService, 41 IndicateOnCharacteristicRequest, 42 IndicateOnCharacteristicResponse, 43 NotifyOnCharacteristicRequest, 44 NotifyOnCharacteristicResponse, 45 PRIMARY as PRIMARY_SERVICE, 46 ReadCharacteristicDescriptorRequest, 47 ReadCharacteristicDescriptorResponse, 48 ReadCharacteristicRequest, 49 ReadCharacteristicResponse, 50 ReadCharacteristicsFromUuidRequest, 51 ReadCharacteristicsFromUuidResponse, 52 RegisterServiceRequest, 53 RegisterServiceResponse, 54 SECONDARY as SECONDARY_SERVICE, 55 ServiceType, 56 WriteRequest, 57 WriteResponse, 58) 59from typing import Dict, List 60 61 
class GATTService(GATTServicer):
    """gRPC servicer exposing GATT client and server operations of a Bumble device.

    One `Peer` is kept per connection handle: it is created when the device
    emits a 'connection' event and dropped on a 'disconnection' event. Client
    side RPCs identify the connection through `request.connection.cookie.value`,
    which is decoded as a big-endian connection handle.
    """

    device: Device
    peers: Dict[int, Peer]

    def __init__(self, device: Device) -> None:
        """Store `device` and subscribe to its connection/disconnection events."""
        super().__init__()
        self.device = device
        self.peers = {}
        self.device.on('connection', self.on_connection)  # type: ignore
        self.device.on('disconnection', self.on_disconnection)  # type: ignore

    def __del__(self) -> None:
        """Unsubscribe the listeners registered in `__init__`."""
        self.device.remove_listener('connection', self.on_connection)  # type: ignore
        self.device.remove_listener('disconnection', self.on_disconnection)  # type: ignore

    def on_connection(self, connection: BumbleConnection) -> None:
        """Create and remember a `Peer` for the new connection, keyed by its handle."""
        self.peers[connection.handle] = Peer(connection)  # type: ignore[no-untyped-call]

    def on_disconnection(self, connection: BumbleConnection) -> None:
        """Forget the `Peer` associated with `connection`.

        A handle that is not tracked is ignored instead of raising `KeyError`
        from inside the event listener.
        """
        self.peers.pop(connection.handle, None)

    def _lookup_peer(self, cookie: bytes, rpc_name: str) -> Peer:
        """Resolve a connection cookie to the `Peer` tracked for that connection.

        Args:
            cookie: big-endian encoded connection handle taken from the request.
            rpc_name: name of the calling RPC, used as prefix of the log line.

        Returns:
            The `Peer` registered for the connection.

        Raises:
            AssertionError: the device knows no connection with that handle.
            KeyError: the connection exists but no `Peer` is tracked for it.
        """
        connection_handle = int.from_bytes(cookie, 'big')
        logging.info(f"{rpc_name}: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        if not connection:
            # Raised explicitly (rather than with `assert`) so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(f"{rpc_name}: no connection with handle {connection_handle}")
        return self.peers[connection.handle]

    @staticmethod
    async def _discover_attributes(peer: Peer, services: List[ServiceProxy]) -> None:
        """Discover the characteristics of `services` and their descriptors.

        Services are processed concurrently; within one service the descriptor
        discovery of each characteristic is awaited in turn.
        """

        async def feed_service(service: ServiceProxy) -> None:
            characteristic: CharacteristicProxy
            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
                await characteristic.discover_descriptors()  # type: ignore[no-untyped-call]

        await asyncio.gather(*(feed_service(service) for service in services))

    @staticmethod
    def _characteristics_of(service: ServiceProxy) -> List[GattCharacteristic]:
        """Convert the characteristics of `service` (and their descriptors) to protobuf messages."""
        return [
            GattCharacteristic(
                properties=characteristic.properties,  # type: ignore
                permissions=0,  # TODO
                uuid=characteristic.uuid.to_hex_str('-'),  # type: ignore
                handle=characteristic.handle,  # type: ignore
                descriptors=[
                    GattCharacteristicDescriptor(
                        handle=descriptor.handle,  # type: ignore
                        permissions=0,  # TODO
                        uuid=str(descriptor.type),  # type: ignore
                    ) for descriptor in characteristic.descriptors  # type: ignore
                ],
            ) for characteristic in service.characteristics  # type: ignore
        ]

    @utils.rpc
    async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse:
        """Request `request.mtu` on the connection and require that exact value back.

        Raises:
            AssertionError: the connection is unknown, or the value returned by
                `Peer.request_mtu` differs from the requested MTU.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "ExchangeMTU")

        mtu = await peer.request_mtu(request.mtu)  # type: ignore
        if mtu != request.mtu:
            # Explicit raise so the check also runs under -O.
            raise AssertionError(f"ExchangeMTU: got MTU {mtu}, expected {request.mtu}")

        return ExchangeMTUResponse()

    @utils.rpc
    async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse:
        """Write `request.value` to `request.handle` with response.

        A `ProtocolError` is not propagated: its `error_code` is reported as
        the response status, otherwise the status is `SUCCESS`.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "WriteAttFromHandle")

        try:
            await peer.write_value(request.handle, request.value, with_response=True)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            status = e.error_code  # type: ignore

        return WriteResponse(handle=request.handle, status=status)

    @utils.rpc
    async def DiscoverServiceByUuid(self, request: DiscoverServiceByUuidRequest,
                                    context: grpc.ServicerContext) -> DiscoverServicesResponse:
        """Discover the services matching `request.uuid`, with their characteristics and descriptors."""
        peer = self._lookup_peer(request.connection.cookie.value, "DiscoverServiceByUuid")

        services: List[ServiceProxy] = await peer.discover_service(request.uuid)  # type: ignore
        await self._discover_attributes(peer, services)

        # NOTE(review): this RPC fills `type` with the little-endian integer
        # value of the service attribute type, while `DiscoverServices` fills
        # `service_type` instead; kept as-is, confirm against the proto.
        return DiscoverServicesResponse(services=[
            GattService(
                handle=service.handle,
                type=int.from_bytes(bytes(service.type), 'little'),
                uuid=service.uuid.to_hex_str('-'),  # type: ignore
                characteristics=self._characteristics_of(service),
            ) for service in services
        ])

    @utils.rpc
    async def DiscoverServices(self, request: DiscoverServicesRequest,
                               context: grpc.ServicerContext) -> DiscoverServicesResponse:
        """Discover all services of the peer, with their characteristics and descriptors.

        `service_type` is `PRIMARY_SERVICE` when the service attribute type
        equals `GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE`, `SECONDARY_SERVICE`
        otherwise.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "DiscoverServices")

        services: List[ServiceProxy] = await peer.discover_services()  # type: ignore
        await self._discover_attributes(peer, services)

        return DiscoverServicesResponse(services=[
            GattService(
                handle=service.handle,
                service_type=PRIMARY_SERVICE
                if service.type == GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE else SECONDARY_SERVICE,
                uuid=service.uuid.to_hex_str('-'),  # type: ignore
                characteristics=self._characteristics_of(service),
            ) for service in services
        ])

    # TODO: implement `DiscoverServicesSdp`

    @utils.rpc
    async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse:
        """Log the call and return an empty response; nothing is cleared here."""
        logging.info("ClearCache")
        return ClearCacheResponse()

    @utils.rpc
    async def ReadCharacteristicFromHandle(self, request: ReadCharacteristicRequest,
                                           context: grpc.ServicerContext) -> ReadCharacteristicResponse:
        """Read the value at `request.handle`.

        On `ProtocolError` the value is empty and the status carries the
        error code of the exception.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "ReadCharacteristicFromHandle")

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code  # type: ignore

        return ReadCharacteristicResponse(value=AttValue(value=value), status=status)

    @utils.rpc
    async def ReadCharacteristicsFromUuid(self, request: ReadCharacteristicsFromUuidRequest,
                                          context: grpc.ServicerContext) -> ReadCharacteristicsFromUuidResponse:
        """Read the characteristics matching `request.uuid` within the requested handle range.

        On success one entry per `(handle, value)` pair is returned with status
        `SUCCESS`; on `ProtocolError` a single entry carrying the error code is
        returned.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "ReadCharacteristicsFromUuid")

        # Anonymous stand-in object exposing only the `handle` and
        # `end_group_handle` attributes, used to pass the handle range to
        # `read_characteristics_by_uuid` in place of a service.
        service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})()

        try:
            characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock)  # type: ignore

            return ReadCharacteristicsFromUuidResponse(characteristics_read=[
                ReadCharacteristicResponse(
                    value=AttValue(value=value, handle=handle),  # type: ignore
                    status=SUCCESS,
                ) for handle, value in characteristics  # type: ignore
            ])

        except ProtocolError as e:
            return ReadCharacteristicsFromUuidResponse(
                characteristics_read=[ReadCharacteristicResponse(status=e.error_code)]  # type: ignore
            )

    @utils.rpc
    async def ReadCharacteristicDescriptorFromHandle(
            self, request: ReadCharacteristicDescriptorRequest,
            context: grpc.ServicerContext) -> ReadCharacteristicDescriptorResponse:
        """Read the descriptor value at `request.handle`.

        On `ProtocolError` the value is empty and the status carries the
        error code of the exception.
        """
        peer = self._lookup_peer(request.connection.cookie.value, "ReadCharacteristicDescriptorFromHandle")

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code  # type: ignore

        return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status)

    # NOTE(review): this is the only RPC declared with a plain `def`; every
    # other handler is `async def`. Left unchanged - confirm it is intentional.
    @utils.rpc
    def RegisterService(self, request: RegisterServiceRequest,
                        context: grpc.ServicerContext) -> RegisterServiceResponse:
        """Build a `Service` from `request.service` and add it to the device.

        Each requested characteristic is converted to a `Characteristic` with
        its properties, permissions, UUID and descriptors.
        """
        logging.info("RegisterService")

        service_uuid = request.service.uuid
        characteristics = [
            Characteristic(
                properties=Characteristic.Properties(characteristic_param.properties),
                permissions=Attribute.Permissions(characteristic_param.permissions),
                uuid=characteristic_param.uuid,
                descriptors=[
                    Descriptor(
                        attribute_type=descriptor_param.uuid,
                        permissions=Attribute.Permissions(descriptor_param.permissions),
                    ) for descriptor_param in characteristic_param.descriptors
                ],
            ) for characteristic_param in request.service.characteristics
        ]
        service = Service(service_uuid, characteristics)
        self.device.add_service(service)  # type: ignore[no-untyped-call]

        logging.info("RegisterService complete")
        return RegisterServiceResponse()

    @utils.rpc
    async def NotifyOnCharacteristic(self, request: NotifyOnCharacteristicRequest,
                                     context: grpc.ServicerContext) -> NotifyOnCharacteristicResponse:
        """Notify subscribers of the local attribute at `request.handle` with `request.value`.

        Returns `ATTRIBUTE_NOT_FOUND` when the GATT server has no attribute at
        that handle, `SUCCESS` otherwise.
        """
        logging.info("NotifyOnCharacteristic")

        attr = self.device.gatt_server.get_attribute(request.handle)
        if not attr:
            return NotifyOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
        await self.device.notify_subscribers(attr, request.value)
        return NotifyOnCharacteristicResponse(status=SUCCESS)

    @utils.rpc
    async def IndicateOnCharacteristic(self, request: IndicateOnCharacteristicRequest,
                                       context: grpc.ServicerContext) -> IndicateOnCharacteristicResponse:
        """Indicate to subscribers of the local attribute at `request.handle` with `request.value`.

        Returns `ATTRIBUTE_NOT_FOUND` when the GATT server has no attribute at
        that handle, `SUCCESS` otherwise.
        """
        logging.info("IndicateOnCharacteristic")

        attr = self.device.gatt_server.get_attribute(request.handle)
        if not attr:
            return IndicateOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
        await self.device.indicate_subscribers(attr, request.value)
        return IndicateOnCharacteristicResponse(status=SUCCESS)