1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import grpc
17import logging
18
19from bumble.att import Attribute
20from bumble.core import ProtocolError
21from bumble.device import Connection as BumbleConnection, Device, Peer
22from bumble.gatt import Characteristic, Descriptor, Service, GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
23from bumble.gatt_client import CharacteristicProxy, ServiceProxy
24from bumble.pandora import utils
25from pandora_experimental.gatt_grpc_aio import GATTServicer
26from pandora_experimental.gatt_pb2 import (
27    ATTRIBUTE_NOT_FOUND,
28    SUCCESS,
29    AttStatusCode,
30    AttValue,
31    ClearCacheRequest,
32    ClearCacheResponse,
33    DiscoverServiceByUuidRequest,
34    DiscoverServicesRequest,
35    DiscoverServicesResponse,
36    ExchangeMTURequest,
37    ExchangeMTUResponse,
38    GattCharacteristic,
39    GattCharacteristicDescriptor,
40    GattService,
41    IndicateOnCharacteristicRequest,
42    IndicateOnCharacteristicResponse,
43    NotifyOnCharacteristicRequest,
44    NotifyOnCharacteristicResponse,
45    PRIMARY as PRIMARY_SERVICE,
46    ReadCharacteristicDescriptorRequest,
47    ReadCharacteristicDescriptorResponse,
48    ReadCharacteristicRequest,
49    ReadCharacteristicResponse,
50    ReadCharacteristicsFromUuidRequest,
51    ReadCharacteristicsFromUuidResponse,
52    RegisterServiceRequest,
53    RegisterServiceResponse,
54    SECONDARY as SECONDARY_SERVICE,
55    ServiceType,
56    WriteRequest,
57    WriteResponse,
58)
59from typing import Dict, List
60
61
class GATTService(GATTServicer):
    """Experimental Pandora GATT gRPC servicer backed by a Bumble `Device`.

    Client-side RPCs act on the `Peer` tracked for the connection named in the
    request; server-side RPCs act on the device's local GATT server.
    """
    # Bumble device on which every RPC of this servicer operates.
    device: Device
    # GATT client peers keyed by connection handle; entries are added in
    # `on_connection` and removed in `on_disconnection`.
    peers: Dict[int, Peer]
65
66    def __init__(self, device: Device) -> None:
67        super().__init__()
68        self.device = device
69        self.peers: Dict[int, Peer] = {}
70        self.device.on('connection', self.on_connection)  # type: ignore
71        self.device.on('disconnection', self.on_disconnection)  # type: ignore
72
73    def __del__(self) -> None:
74        self.device.remove_listener('connection', self.on_connection)  # type: ignore
75        self.device.remove_listener('disconnection', self.on_disconnection)  # type: ignore
76
77    def on_connection(self, connection: BumbleConnection) -> None:
78        self.peers[connection.handle] = Peer(connection)  # type: ignore[no-untyped-call]
79
80    def on_disconnection(self, connection: BumbleConnection) -> None:
81        del self.peers[connection.handle]
82
83    @utils.rpc
84    async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse:
85        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
86        logging.info(f"ExchangeMTU: {connection_handle}")
87
88        connection = self.device.lookup_connection(connection_handle)
89        assert connection
90        peer = self.peers[connection.handle]
91
92        mtu = await peer.request_mtu(request.mtu)  # type: ignore
93        assert mtu == request.mtu
94
95        return ExchangeMTUResponse()
96
97    @utils.rpc
98    async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse:
99        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
100        logging.info(f"WriteAttFromHandle: {connection_handle}")
101
102        connection = self.device.lookup_connection(connection_handle)
103        assert connection
104        peer = self.peers[connection.handle]
105
106        try:
107            await peer.write_value(request.handle, request.value, with_response=True)  # type: ignore
108            status: AttStatusCode = SUCCESS
109        except ProtocolError as e:
110            status = e.error_code  # type: ignore
111
112        return WriteResponse(handle=request.handle, status=status)
113
114    @utils.rpc
115    async def DiscoverServiceByUuid(self, request: DiscoverServiceByUuidRequest,
116                                    context: grpc.ServicerContext) -> DiscoverServicesResponse:
117        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
118        logging.info(f"DiscoverServiceByUuid: {connection_handle}")
119
120        connection = self.device.lookup_connection(connection_handle)
121        assert connection
122        peer = self.peers[connection.handle]
123
124        services: List[ServiceProxy] = await peer.discover_service(request.uuid)  # type: ignore
125
126        async def feed_service(service: ServiceProxy) -> None:
127            characteristic: CharacteristicProxy
128            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
129                await characteristic.discover_descriptors()  # type: ignore[no-untyped-call]
130
131        await asyncio.gather(*(feed_service(service) for service in services))
132
133        return DiscoverServicesResponse(services=[
134            GattService(
135                handle=service.handle,
136                type=int.from_bytes(bytes(service.type), 'little'),
137                uuid=service.uuid.to_hex_str('-'),  # type: ignore
138                characteristics=[
139                    GattCharacteristic(
140                        properties=characteristic.properties,  # type: ignore
141                        permissions=0,  # TODO
142                        uuid=characteristic.uuid.to_hex_str('-'),  # type: ignore
143                        handle=characteristic.handle,  # type: ignore
144                        descriptors=[
145                            GattCharacteristicDescriptor(
146                                handle=descriptor.handle,  # type: ignore
147                                permissions=0,  # TODO
148                                uuid=str(descriptor.type),  # type: ignore
149                            ) for descriptor in characteristic.descriptors  # type: ignore
150                        ],
151                    ) for characteristic in service.characteristics  # type: ignore
152                ],
153            ) for service in services
154        ])
155
156    @utils.rpc
157    async def DiscoverServices(self, request: DiscoverServicesRequest,
158                               context: grpc.ServicerContext) -> DiscoverServicesResponse:
159        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
160        logging.info(f"DiscoverServices: {connection_handle}")
161
162        connection = self.device.lookup_connection(connection_handle)
163        assert connection
164        peer = self.peers[connection.handle]
165
166        services: List[ServiceProxy] = await peer.discover_services()  # type: ignore
167
168        async def feed_service(service: ServiceProxy) -> None:
169            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
170                await characteristic.discover_descriptors()  # type: ignore
171
172        await asyncio.gather(*(feed_service(service) for service in services))
173
174        return DiscoverServicesResponse(services=[
175            GattService(
176                handle=service.handle,
177                service_type=PRIMARY_SERVICE if service.type ==
178                GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE else SECONDARY_SERVICE,
179                uuid=service.uuid.to_hex_str('-'),  # type: ignore
180                characteristics=[
181                    GattCharacteristic(
182                        properties=characteristic.properties,  # type: ignore
183                        permissions=0,  # TODO
184                        uuid=characteristic.uuid.to_hex_str('-'),  # type: ignore
185                        handle=characteristic.handle,  # type: ignore
186                        descriptors=[
187                            GattCharacteristicDescriptor(
188                                handle=descriptor.handle,  # type: ignore
189                                permissions=0,  # TODO
190                                uuid=str(descriptor.type),  # type: ignore
191                            ) for descriptor in characteristic.descriptors  # type: ignore
192                        ],
193                    ) for characteristic in service.characteristics  # type: ignore
194                ],
195            ) for service in services
196        ])
197
198    # TODO: implement `DiscoverServicesSdp`
199
200    @utils.rpc
201    async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse:
202        logging.info("ClearCache")
203        return ClearCacheResponse()
204
205    @utils.rpc
206    async def ReadCharacteristicFromHandle(self, request: ReadCharacteristicRequest,
207                                           context: grpc.ServicerContext) -> ReadCharacteristicResponse:
208        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
209        logging.info(f"ReadCharacteristicFromHandle: {connection_handle}")
210
211        connection = self.device.lookup_connection(connection_handle)
212        assert connection
213        peer = self.peers[connection.handle]
214
215        try:
216            value = await peer.read_value(request.handle)  # type: ignore
217            status: AttStatusCode = SUCCESS
218        except ProtocolError as e:
219            value = bytes()
220            status = e.error_code  # type: ignore
221
222        return ReadCharacteristicResponse(value=AttValue(value=value), status=status)
223
224    @utils.rpc
225    async def ReadCharacteristicsFromUuid(self, request: ReadCharacteristicsFromUuidRequest,
226                                          context: grpc.ServicerContext) -> ReadCharacteristicsFromUuidResponse:
227        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
228        logging.info(f"ReadCharacteristicsFromUuid: {connection_handle}")
229
230        connection = self.device.lookup_connection(connection_handle)
231        assert connection
232        peer = self.peers[connection.handle]
233
234        service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})()
235
236        try:
237            characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock)  # type: ignore
238
239            return ReadCharacteristicsFromUuidResponse(characteristics_read=[
240                ReadCharacteristicResponse(
241                    value=AttValue(value=value, handle=handle),  # type: ignore
242                    status=SUCCESS,
243                ) for handle, value in characteristics  # type: ignore
244            ])
245
246        except ProtocolError as e:
247            return ReadCharacteristicsFromUuidResponse(
248                characteristics_read=[ReadCharacteristicResponse(status=e.error_code)]  # type: ignore
249            )
250
251    @utils.rpc
252    async def ReadCharacteristicDescriptorFromHandle(
253            self, request: ReadCharacteristicDescriptorRequest,
254            context: grpc.ServicerContext) -> ReadCharacteristicDescriptorResponse:
255        connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
256        logging.info(f"ReadCharacteristicDescriptorFromHandle: {connection_handle}")
257
258        connection = self.device.lookup_connection(connection_handle)
259        assert connection
260        peer = self.peers[connection.handle]
261
262        try:
263            value = await peer.read_value(request.handle)  # type: ignore
264            status: AttStatusCode = SUCCESS
265        except ProtocolError as e:
266            value = bytes()
267            status = e.error_code  # type: ignore
268
269        return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status)
270
271    @utils.rpc
272    def RegisterService(self, request: RegisterServiceRequest,
273                        context: grpc.ServicerContext) -> RegisterServiceResponse:
274        logging.info(f"RegisterService")
275
276        serviceUUID = request.service.uuid
277        characteristics = [
278            Characteristic(
279                properties=Characteristic.Properties(characteristicParam.properties),
280                permissions=Attribute.Permissions(characteristicParam.permissions),
281                uuid=characteristicParam.uuid,
282                descriptors=[
283                    Descriptor(
284                        attribute_type=descParam.uuid,
285                        permissions=Attribute.Permissions(descParam.permissions),
286                    ) for descParam in characteristicParam.descriptors
287                ],
288            ) for characteristicParam in request.service.characteristics
289        ]
290        service = Service(serviceUUID, characteristics)
291        self.device.add_service(service)  # type: ignore[no-untyped-call]
292
293        logging.info(f"RegisterService complete")
294        return RegisterServiceResponse()
295
296    @utils.rpc
297    async def NotifyOnCharacteristic(self, request: NotifyOnCharacteristicRequest,
298                                     context: grpc.ServicerContext) -> NotifyOnCharacteristicResponse:
299        logging.info(f"NotifyOnCharacteristic")
300
301        attr = self.device.gatt_server.get_attribute(request.handle)
302        if not attr:
303            return NotifyOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
304        await self.device.notify_subscribers(attr, request.value)
305        return NotifyOnCharacteristicResponse(status=SUCCESS)
306
307    @utils.rpc
308    async def IndicateOnCharacteristic(self, request: IndicateOnCharacteristicRequest,
309                                       context: grpc.ServicerContext) -> IndicateOnCharacteristicResponse:
310        logging.info(f"IndicateOnCharacteristic")
311
312        attr = self.device.gatt_server.get_attribute(request.handle)
313        if not attr:
314            return IndicateOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
315        await self.device.indicate_subscribers(attr, request.value)
316        return IndicateOnCharacteristicResponse(status=SUCCESS)
317