1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import grpc 16import logging 17 18from bumble.core import UUID as BumbleUUID, AdvertisingData 19from bumble.device import Connection, Device 20from bumble.gatt import Characteristic, CharacteristicValue, TemplateService 21from bumble.l2cap import LeCreditBasedChannel, LeCreditBasedChannelSpec 22from bumble.pandora import utils 23from google.protobuf.empty_pb2 import Empty 24from pandora_experimental.dck_grpc_aio import DckServicer 25from typing import Optional 26 27 28class DckGattService(TemplateService): 29 CCC_DK_UUID: BumbleUUID = BumbleUUID.from_16_bits(0xFFF5, 'Car Connectivity Consortium, LLC') 30 UUID = CCC_DK_UUID 31 UUID_SPSM = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5F980A3", "Vehicle SPSM") 32 UUID_SPSM_DK_VERSION = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5B780A3", "DK version") 33 UUID_DEVICE_DK_VERSION = BumbleUUID("BD4B9502-3F54-11EC-B919-0242AC120005", "Device Selected DK version") 34 UUID_ANTENNA_IDENTIFIER = BumbleUUID("c6d7d4a1-e2b0-4e95-b576-df983d1a5d9f", "Vehicle Antenna Identifier") 35 36 def __init__(self, device: Device): 37 logger = logging.getLogger(__name__) 38 39 def on_l2cap_channel(channel: LeCreditBasedChannel) -> None: 40 logger.info(f"--- DckGattService on_l2cap_channel {channel}") 41 42 self.device_dk_version_value = None 43 self.l2cap_server = device.create_l2cap_server( 44 spec=LeCreditBasedChannelSpec(), 45 handler=on_l2cap_channel, 46 ) 47 self.psm = self.l2cap_server.psm 48 49 def on_device_version_write(connection: Connection, value: bytes) -> None: 50 logger.info(f"--- DK Device Version Write: {value!r}") 51 self.device_dk_version_value = value 52 53 characteristics = [ 54 Characteristic( 55 DckGattService.UUID_SPSM, 56 Characteristic.Properties.READ, 57 Characteristic.READABLE, 58 # CCC Specification Digital-Key R3-1.2.3 59 # 19.2.1.6 DK Service 60 self.psm.to_bytes(2, 'big'), 61 ), 62 Characteristic( 63 DckGattService.UUID_SPSM_DK_VERSION, 64 Characteristic.Properties.READ, 65 Characteristic.READ_REQUIRES_ENCRYPTION, 66 b'', 67 ), 68 Characteristic( 69 DckGattService.UUID_DEVICE_DK_VERSION, 70 Characteristic.Properties.WRITE, 71 Characteristic.READ_REQUIRES_ENCRYPTION, 72 CharacteristicValue(write=on_device_version_write), # type: ignore[no-untyped-call] 73 ), 74 Characteristic( 75 DckGattService.UUID_ANTENNA_IDENTIFIER, 76 Characteristic.READ, 77 Characteristic.READABLE, 78 b'', 79 ), 80 ] 81 82 super().__init__(characteristics) # type: ignore[no-untyped-call] 83 84 def __del__(self): 85 if self.l2cap_server: 86 self.l2cap_server.close() 87 88 def get_advertising_data(self) -> bytes: 89 # CCC Specification Digital-Key R3-1.2.0-r14 90 # 19.2 LE Procedures AdvData field of ADV_IND 91 92 return bytes(AdvertisingData([(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(DckGattService.CCC_DK_UUID))])) 93 94 95class DckService(DckServicer): 96 device: Device 97 dck_gatt_service: Optional[DckGattService] 98 99 def __init__(self, device: Device) -> None: 100 self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {"service_name": "Dck", "device": device}) 101 self.device = device 102 self.dck_gatt_service = None 103 104 @utils.rpc 105 def Register(self, request: Empty, context: grpc.ServicerContext) -> Empty: 106 if self.dck_gatt_service is None: 107 self.dck_gatt_service = DckGattService(self.device) 108 self.device.add_service(self.dck_gatt_service) # type: ignore[no-untyped-call] 109 110 return Empty() 111