1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import grpc
16import logging
17
18from bumble.core import UUID as BumbleUUID, AdvertisingData
19from bumble.device import Connection, Device
20from bumble.gatt import Characteristic, CharacteristicValue, TemplateService
21from bumble.l2cap import LeCreditBasedChannel, LeCreditBasedChannelSpec
22from bumble.pandora import utils
23from google.protobuf.empty_pb2 import Empty
24from pandora_experimental.dck_grpc_aio import DckServicer
25from typing import Optional
26
27
28class DckGattService(TemplateService):
29    CCC_DK_UUID: BumbleUUID = BumbleUUID.from_16_bits(0xFFF5, 'Car Connectivity Consortium, LLC')
30    UUID = CCC_DK_UUID
31    UUID_SPSM = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5F980A3", "Vehicle SPSM")
32    UUID_SPSM_DK_VERSION = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5B780A3", "DK version")
33    UUID_DEVICE_DK_VERSION = BumbleUUID("BD4B9502-3F54-11EC-B919-0242AC120005", "Device Selected DK version")
34    UUID_ANTENNA_IDENTIFIER = BumbleUUID("c6d7d4a1-e2b0-4e95-b576-df983d1a5d9f", "Vehicle Antenna Identifier")
35
36    def __init__(self, device: Device):
37        logger = logging.getLogger(__name__)
38
39        def on_l2cap_channel(channel: LeCreditBasedChannel) -> None:
40            logger.info(f"--- DckGattService on_l2cap_channel {channel}")
41
42        self.device_dk_version_value = None
43        self.l2cap_server = device.create_l2cap_server(
44            spec=LeCreditBasedChannelSpec(),
45            handler=on_l2cap_channel,
46        )
47        self.psm = self.l2cap_server.psm
48
49        def on_device_version_write(connection: Connection, value: bytes) -> None:
50            logger.info(f"--- DK Device Version Write: {value!r}")
51            self.device_dk_version_value = value
52
53        characteristics = [
54            Characteristic(
55                DckGattService.UUID_SPSM,
56                Characteristic.Properties.READ,
57                Characteristic.READABLE,
58                # CCC Specification Digital-Key R3-1.2.3
59                # 19.2.1.6 DK Service
60                self.psm.to_bytes(2, 'big'),
61            ),
62            Characteristic(
63                DckGattService.UUID_SPSM_DK_VERSION,
64                Characteristic.Properties.READ,
65                Characteristic.READ_REQUIRES_ENCRYPTION,
66                b'',
67            ),
68            Characteristic(
69                DckGattService.UUID_DEVICE_DK_VERSION,
70                Characteristic.Properties.WRITE,
71                Characteristic.READ_REQUIRES_ENCRYPTION,
72                CharacteristicValue(write=on_device_version_write),  # type: ignore[no-untyped-call]
73            ),
74            Characteristic(
75                DckGattService.UUID_ANTENNA_IDENTIFIER,
76                Characteristic.READ,
77                Characteristic.READABLE,
78                b'',
79            ),
80        ]
81
82        super().__init__(characteristics)  # type: ignore[no-untyped-call]
83
84    def __del__(self):
85        if self.l2cap_server:
86            self.l2cap_server.close()
87
88    def get_advertising_data(self) -> bytes:
89        # CCC Specification Digital-Key R3-1.2.0-r14
90        # 19.2 LE Procedures AdvData field of ADV_IND
91
92        return bytes(AdvertisingData([(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(DckGattService.CCC_DK_UUID))]))
93
94
95class DckService(DckServicer):
96    device: Device
97    dck_gatt_service: Optional[DckGattService]
98
99    def __init__(self, device: Device) -> None:
100        self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {"service_name": "Dck", "device": device})
101        self.device = device
102        self.dck_gatt_service = None
103
104    @utils.rpc
105    def Register(self, request: Empty, context: grpc.ServicerContext) -> Empty:
106        if self.dck_gatt_service is None:
107            self.dck_gatt_service = DckGattService(self.device)
108            self.device.add_service(self.dck_gatt_service)  # type: ignore[no-untyped-call]
109
110        return Empty()
111