1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import re
16import sys
17import textwrap
18import threading
19import uuid
20
21from mmi2grpc._helpers import assert_description, match_description
22from mmi2grpc._proxy import ProfileProxy
23from mmi2grpc._rootcanal import Dongle
24from pandora.host_grpc import Host
25from pandora.host_pb2 import RANDOM
26from pandora.security_grpc import Security
27from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer
28from pandora_experimental.gatt_grpc import GATT
29
# Bluetooth Base UUID: 16- and 32-bit short UUIDs occupy its top 32 bits.
BASE_UUID = uuid.UUID("00000000-0000-1000-8000-00805F9B34FB")


def short_uuid(full: "str | uuid.UUID") -> int:
    """Return the short form of a UUID built on ``BASE_UUID``.

    Args:
        full: the 128-bit UUID, either as a string accepted by ``uuid.UUID``
            or as an already constructed ``uuid.UUID`` instance.

    Returns:
        The top 32 bits of the offset between ``full`` and ``BASE_UUID``. For
        a UUID derived from the base UUID this is its 16- or 32-bit short
        value; for any other UUID the result is not a meaningful short UUID.

    Raises:
        ValueError: if ``full`` is a string that ``uuid.UUID`` cannot parse.
    """
    # uuid.UUID() only parses strings, so an existing instance is used as is.
    parsed = full if isinstance(full, uuid.UUID) else uuid.UUID(full)
    return (parsed.int - BASE_UUID.int) >> 96
35
36
class HOGPProxy(ProfileProxy):
    """Proxy answering PTS prompts (MMIs) for the HOGP test cases.

    The handlers drive the implementation under test through the Pandora
    ``Host``, ``Security`` and experimental ``GATT`` gRPC services created
    from the channel given to the constructor.

    NOTE(review): the docstrings of the decorated handlers contain regular
    expression syntax whose named groups mirror the handler parameters, so
    they are presumably consumed at runtime by ``assert_description`` /
    ``match_description``. Treat them as code: do not reword or re-indent
    them.
    """

    def __init__(self, channel, rootcanal):
        """Create the gRPC service stubs and the per-session state.

        Args:
            channel: gRPC channel passed to the parent proxy and to every
                service stub.
            rootcanal: object used by ``test_started`` to select the PTS
                dongle.
        """
        super().__init__(channel)
        self.host = Host(channel)
        self.security = Security(channel)
        self.gatt = GATT(channel)
        self.rootcanal = rootcanal
        # Set by IUT_INITIATE_CONNECTION and used by every GATT operation.
        self.connection = None
        # Pairing event stream shared by the handlers answering pairing prompts.
        self.pairing_stream = self.security.OnPairing()
        # Values stored by IUT_READ_CHARACTERISTIC, keyed by attribute handle,
        # and checked afterwards by USER_CONFIRM_READ_RESULT.
        self.characteristic_reads = {}

    def test_started(self, test: str, **kwargs):
        """Select the CSR RCK PTS dongle on rootcanal and acknowledge the test start."""
        self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)

        return "OK"

    @assert_description
    def IUT_INITIATE_CONNECTION(self, pts_addr: bytes, **kwargs):
        """
        Please initiate a GATT connection to the PTS.

        Description: Verify that
        the Implementation Under Test (IUT) can initiate a GATT connect request
        to the PTS.
        """

        response = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr)
        self.connection = response.connection

        # Secure() runs on its own thread so that this handler returns without
        # waiting for the security procedure to finish.
        def raise_security_level():
            self.security.Secure(connection=self.connection, le=LE_LEVEL3)

        threading.Thread(target=raise_security_level).start()

        return "OK"

    @match_description
    def _mmi_2004(self, pts_addr: bytes, passkey: str, **kwargs):
        """
        Please confirm that 6 digit number is matched with (?P<passkey>[0-9]*).
        """
        # The expected number does not change while consuming the stream.
        expected = int(passkey)
        # Numeric comparison values of the events that did not match, kept for
        # the failure message.
        seen = []

        for event in self.pairing_stream:
            if event.address == pts_addr and event.numeric_comparison == expected:
                self.pairing_stream.send(PairingEventAnswer(event=event, confirm=True))
                return "OK"
            seen.append(event.numeric_comparison)

        # Only reached when the pairing stream ends without a matching event.
        assert False, f"mismatched passcode: expected {passkey}, received {seen}"

    @match_description
    def IUT_SEND_WRITE_REQUEST(self, handle: str, properties: str, **kwargs):
        r"""
        Please send write request to handle (?P<handle>\S*) with following value.
        Client
        Characteristic Configuration:
             Properties: \[0x00(?P<properties>\S*)\]
        """

        # The written value is the captured hex byte followed by a zero byte.
        payload = bytes([int(f"0x{properties}", base=16), 0])

        self.gatt.WriteAttFromHandle(
            connection=self.connection,
            handle=int(handle, base=16),
            value=payload,
        )

        return "OK"

    @match_description
    def USER_CONFIRM_CHARACTERISTIC(self, body: str, **kwargs):
        r"""
        Please verify that following attribute handle/UUID pair was returned
        containing the UUID for the (.*)\.

        (?P<body>.*)
        """

        # The multi-line template is collapsed onto a single space-separated
        # line before being compiled.
        PATTERN = re.compile(
            textwrap.dedent(r"""
                Attribute Handle = (\S*)
                Characteristic Properties = (?P<properties>\S*)
                Handle = (?P<handle>\S*)
                UUID = (?P<uuid>\S*)
                """).strip().replace("\n", " "))

        matches = list(PATTERN.finditer(body))

        # (properties, handle, 16-bit UUID) tuples that must be discovered.
        targets = {(
            int(match.group("properties"), base=16),
            int(match.group("handle"), base=16),
            int(match.group("uuid"), base=16),
        ) for match in matches}

        # Count the regex matches, not the de-duplicated targets: an entry
        # listed twice must not trip the check.
        assert len(matches) == body.count("Characteristic Properties"), "safety check that regex is matching something"

        services = self.gatt.DiscoverServices(connection=self.connection).services

        for service in services:
            for characteristic in service.characteristics:
                uuid_16 = short_uuid(characteristic.uuid)
                targets.discard((characteristic.properties, characteristic.handle, uuid_16))

        # Whatever is left was announced by the prompt but never discovered.
        assert not targets, f"could not find handles: {targets}"

        return "OK"

    @match_description
    def USER_CONFIRM_CHARACTERISTIC_DESCRIPTOR(self, body: str, **kwargs):
        r"""
        Please verify that following attribute handle/UUID pair was returned
        containing the UUID for the (.*)\.

        (?P<body>.*)
        """

        PATTERN = re.compile(r"handle = (?P<handle>\S*)\s* uuid = (?P<uuid>\S*)")

        matches = list(PATTERN.finditer(body))

        # (handle, 16-bit UUID) pairs that must be discovered.
        targets = {(
            int(match.group("handle"), base=16),
            int(match.group("uuid"), base=16),
        ) for match in matches}

        # Count the regex matches, not the de-duplicated targets: an entry
        # listed twice must not trip the check.
        assert len(matches) == body.count("uuid = "), "safety check that regex is matching something"

        services = self.gatt.DiscoverServices(connection=self.connection).services

        for service in services:
            for characteristic in service.characteristics:
                for descriptor in characteristic.descriptors:
                    uuid_16 = short_uuid(descriptor.uuid)
                    targets.discard((descriptor.handle, uuid_16))

        # Whatever is left was announced by the prompt but never discovered.
        assert not targets, f"could not find handles: {targets}"

        return "OK"

    @match_description
    def USER_CONFIRM_SERVICE_HANDLE(self, service_name: str, body: str, **kwargs):
        r"""
        Please confirm the following handles for (?P<service_name>.*)\.

        (?P<body>.*)
        """

        PATTERN = re.compile(r"Start Handle: (?P<start_handle>\S*) End Handle: (?P<end_handle>\S*)")

        # 16-bit service UUID expected for each service name used in the prompt.
        SERVICE_UUIDS = {
            "Device Information": 0x180A,
            "Battery Service": 0x180F,
            "Human Interface Device": 0x1812,
        }

        # An unknown service name raises KeyError here.
        target_uuid = SERVICE_UUIDS[service_name]

        services = self.gatt.DiscoverServices(connection=self.connection).services

        matches = list(PATTERN.finditer(body))

        assert len(matches) == body.count("Start Handle:"), "safety check that regex is matching something"

        for match in matches:
            start_handle = match.group("start_handle")
            wanted = int(start_handle, base=16)

            # First discovered service starting at the announced handle, if any.
            found = next((service for service in services if service.handle == wanted), None)

            assert found is not None, f"cannot find service with start handle {start_handle}"
            assert (short_uuid(found.uuid) == target_uuid), "service UUID does not match expected type"

        return "OK"

    @assert_description
    def _mmi_1(self, **kwargs):
        """
        Please confirm that the IUT ignored the received Notification and did
        not report the values to the Upper Tester.
        """

        # TODO
        # Nothing is verified yet: the prompt is acknowledged unconditionally.

        return "OK"

    @match_description
    def IUT_CONFIG_NOTIFICATION(self, value: str, **kwargs):
        r"""
        Please write to Client Characteristic Configuration Descriptor of Report
        characteristic to enable notification.

        Descriptor handle value: (?P<value>\S*)
        """

        # `value` is the descriptor handle in hex; the bytes 01 00 are written to it.
        self.gatt.WriteAttFromHandle(
            connection=self.connection,
            handle=int(value, base=16),
            value=bytes([0x01, 0x00]),
        )

        return "OK"

    @match_description
    def IUT_READ_CHARACTERISTIC(self, test: str, characteristic_name: str, handle: str, **kwargs):
        r"""
        Please send Read Request to read (?P<characteristic_name>.*) characteristic with handle =
        (?P<handle>\S*).
        """

        # Tests for which the handle is read as a characteristic; every other
        # test reads it as a characteristic descriptor.
        TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS = [
            "HOGP/RH/HGRF/BV-01-I",
            "HOGP/RH/HGRF/BV-10-I",
            "HOGP/RH/HGRF/BV-12-I",
        ]

        if test in TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS:
            read = self.gatt.ReadCharacteristicFromHandle
        else:
            read = self.gatt.ReadCharacteristicDescriptorFromHandle

        attribute_handle = int(handle, base=16)
        response = read(connection=self.connection, handle=attribute_handle)

        # Kept for the later USER_CONFIRM_READ_RESULT prompt.
        self.characteristic_reads[attribute_handle] = response.value.value

        return "OK"

    @match_description
    def USER_CONFIRM_READ_RESULT(self, characteristic_name: str, body: str, **kwargs):
        r"""
        Please verify following (?P<characteristic_name>.*) Characteristic value is Read.

        (?P<body>.*)
        """

        HEX = "[0-9A-F]"
        # Matches one-byte (0xNN) and two-byte (0xNNNN) upper-case hex words.
        PATTERN = re.compile(f"0x{HEX*2}(?:{HEX*2})?")

        # The byte order announcement applies to the whole prompt, so it is
        # evaluated once instead of for every word.
        lsb_first = "bytes in LSB order" in body

        num_checks = 0

        for block in body.split("Handle:"):
            words = PATTERN.findall(block)
            if not words:
                continue

            # first hex value is the handle, rest is the expected data
            handle_word, *data_words = words

            handle = int(handle_word, base=16)

            # Raises KeyError when the handle was never read through
            # IUT_READ_CHARACTERISTIC.
            actual = self.characteristic_reads[handle]

            expected = bytearray()
            for word in data_words:
                if len(word) == len("0x0000"):
                    first = int(word[2:4], base=16)
                    second = int(word[4:6], base=16)
                    # Two-byte words are kept as displayed when the prompt
                    # announces LSB order, and swapped otherwise.
                    expected.extend((first, second) if lsb_first else (second, first))
                else:
                    expected.append(int(word, base=16))

            expected = bytes(expected)

            num_checks += 1
            assert (expected == actual), f"Got unexpected value for handle {handle}: {repr(expected)} != {repr(actual)}"

        assert (body.count("Handle:") == num_checks), "safety check that regex is matching something"

        return "OK"

    @assert_description
    def MMI_VERIFY_SECURE_ID(self, pts_addr: bytes, **kwargs):
        """
        Please enter the secure ID.
        """

        for event in self.pairing_stream:
            if event.address == pts_addr and event.passkey_entry_notification:
                print(f"Got passkey entry {event.passkey_entry_notification}", file=sys.stderr)
                # The passkey is returned as text in answer to the prompt.
                return str(event.passkey_entry_notification)

        # Only reached when the pairing stream ends without a usable event.
        assert False, "pairing stream ended without a passkey entry notification for the PTS address"