1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import re 16import sys 17import textwrap 18import threading 19import uuid 20 21from mmi2grpc._helpers import assert_description, match_description 22from mmi2grpc._proxy import ProfileProxy 23from mmi2grpc._rootcanal import Dongle 24from pandora.host_grpc import Host 25from pandora.host_pb2 import RANDOM 26from pandora.security_grpc import Security 27from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer 28from pandora_experimental.gatt_grpc import GATT 29 30BASE_UUID = uuid.UUID("00000000-0000-1000-8000-00805F9B34FB") 31 32 33def short_uuid(full: uuid.UUID) -> int: 34 return (uuid.UUID(full).int - BASE_UUID.int) >> 96 35 36 37class HOGPProxy(ProfileProxy): 38 39 def __init__(self, channel, rootcanal): 40 super().__init__(channel) 41 self.host = Host(channel) 42 self.security = Security(channel) 43 self.gatt = GATT(channel) 44 self.rootcanal = rootcanal 45 self.connection = None 46 self.pairing_stream = self.security.OnPairing() 47 self.characteristic_reads = {} 48 49 def test_started(self, test: str, **kwargs): 50 self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE) 51 52 return "OK" 53 54 @assert_description 55 def IUT_INITIATE_CONNECTION(self, pts_addr: bytes, **kwargs): 56 """ 57 Please initiate a GATT connection to the PTS. 58 59 Description: Verify that 60 the Implementation Under Test (IUT) can initiate a GATT connect request 61 to the PTS. 62 """ 63 64 self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection 65 66 def secure(): 67 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 68 69 threading.Thread(target=secure).start() 70 71 return "OK" 72 73 @match_description 74 def _mmi_2004(self, pts_addr: bytes, passkey: str, **kwargs): 75 """ 76 Please confirm that 6 digit number is matched with (?P<passkey>[0-9]*). 77 """ 78 received = [] 79 for event in self.pairing_stream: 80 if event.address == pts_addr and event.numeric_comparison == int(passkey): 81 self.pairing_stream.send(PairingEventAnswer( 82 event=event, 83 confirm=True, 84 )) 85 return "OK" 86 received.append(event.numeric_comparison) 87 88 assert False, f"mismatched passcode: expected {passkey}, received {received}" 89 90 @match_description 91 def IUT_SEND_WRITE_REQUEST(self, handle: str, properties: str, **kwargs): 92 r""" 93 Please send write request to handle (?P<handle>\S*) with following value. 94 Client 95 Characteristic Configuration: 96 Properties: \[0x00(?P<properties>\S*)\] 97 """ 98 99 self.gatt.WriteAttFromHandle( 100 connection=self.connection, 101 handle=int(handle, base=16), 102 value=bytes([int(f"0x{properties}", base=16), 0]), 103 ) 104 105 return "OK" 106 107 @match_description 108 def USER_CONFIRM_CHARACTERISTIC(self, body: str, **kwargs): 109 r""" 110 Please verify that following attribute handle/UUID pair was returned 111 containing the UUID for the (.*)\. 112 113 (?P<body>.*) 114 """ 115 116 PATTERN = re.compile( 117 textwrap.dedent(r""" 118 Attribute Handle = (\S*) 119 Characteristic Properties = (?P<properties>\S*) 120 Handle = (?P<handle>\S*) 121 UUID = (?P<uuid>\S*) 122 """).strip().replace("\n", " ")) 123 124 targets = set() 125 126 for match in PATTERN.finditer(body): 127 targets.add(( 128 int(match.group("properties"), base=16), 129 int(match.group("handle"), base=16), 130 int(match.group("uuid"), base=16), 131 )) 132 133 assert len(targets) == body.count("Characteristic Properties"), "safety check that regex is matching something" 134 135 services = self.gatt.DiscoverServices(connection=self.connection).services 136 137 for service in services: 138 for characteristic in service.characteristics: 139 uuid_16 = short_uuid(characteristic.uuid) 140 key = (characteristic.properties, characteristic.handle, uuid_16) 141 if key in targets: 142 targets.remove(key) 143 144 assert not targets, f"could not find handles: {targets}" 145 146 return "OK" 147 148 @match_description 149 def USER_CONFIRM_CHARACTERISTIC_DESCRIPTOR(self, body: str, **kwargs): 150 r""" 151 Please verify that following attribute handle/UUID pair was returned 152 containing the UUID for the (.*)\. 153 154 (?P<body>.*) 155 """ 156 157 PATTERN = re.compile(rf"handle = (?P<handle>\S*)\s* uuid = (?P<uuid>\S*)") 158 159 targets = set() 160 161 for match in PATTERN.finditer(body): 162 targets.add(( 163 int(match.group("handle"), base=16), 164 int(match.group("uuid"), base=16), 165 )) 166 167 assert len(targets) == body.count("uuid = "), "safety check that regex is matching something" 168 169 services = self.gatt.DiscoverServices(connection=self.connection).services 170 171 for service in services: 172 for characteristic in service.characteristics: 173 for descriptor in characteristic.descriptors: 174 uuid_16 = short_uuid(descriptor.uuid) 175 key = (descriptor.handle, uuid_16) 176 if key in targets: 177 targets.remove(key) 178 179 assert not targets, f"could not find handles: {targets}" 180 181 return "OK" 182 183 @match_description 184 def USER_CONFIRM_SERVICE_HANDLE(self, service_name: str, body: str, **kwargs): 185 r""" 186 Please confirm the following handles for (?P<service_name>.*)\. 187 188 (?P<body>.*) 189 """ 190 191 PATTERN = re.compile(r"Start Handle: (?P<start_handle>\S*) End Handle: (?P<end_handle>\S*)") 192 193 SERVICE_UUIDS = { 194 "Device Information": 0x180A, 195 "Battery Service": 0x180F, 196 "Human Interface Device": 0x1812, 197 } 198 199 target_uuid = SERVICE_UUIDS[service_name] 200 201 services = self.gatt.DiscoverServices(connection=self.connection).services 202 203 assert len( 204 PATTERN.findall(body)) == body.count("Start Handle:"), "safety check that regex is matching something" 205 206 for match in PATTERN.finditer(body): 207 start_handle = match.group("start_handle") 208 209 for service in services: 210 if service.handle == int(start_handle, base=16): 211 assert (short_uuid(service.uuid) == target_uuid), "service UUID does not match expected type" 212 break 213 else: 214 assert False, f"cannot find service with start handle {start_handle}" 215 216 return "OK" 217 218 @assert_description 219 def _mmi_1(self, **kwargs): 220 """ 221 Please confirm that the IUT ignored the received Notification and did 222 not report the values to the Upper Tester. 223 """ 224 225 # TODO 226 227 return "OK" 228 229 @match_description 230 def IUT_CONFIG_NOTIFICATION(self, value: str, **kwargs): 231 r""" 232 Please write to Client Characteristic Configuration Descriptor of Report 233 characteristic to enable notification. 234 235 Descriptor handle value: (?P<value>\S*) 236 """ 237 238 self.gatt.WriteAttFromHandle( 239 connection=self.connection, 240 handle=int(value, base=16), 241 value=bytes([0x01, 0x00]), 242 ) 243 244 return "OK" 245 246 @match_description 247 def IUT_READ_CHARACTERISTIC(self, test: str, characteristic_name: str, handle: str, **kwargs): 248 r""" 249 Please send Read Request to read (?P<characteristic_name>.*) characteristic with handle = 250 (?P<handle>\S*). 251 """ 252 253 TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS = [ 254 "HOGP/RH/HGRF/BV-01-I", 255 "HOGP/RH/HGRF/BV-10-I", 256 "HOGP/RH/HGRF/BV-12-I", 257 ] 258 259 action = (self.gatt.ReadCharacteristicFromHandle if test in TESTS_READING_CHARACTERISTIC_NOT_DESCRIPTORS else 260 self.gatt.ReadCharacteristicDescriptorFromHandle) 261 262 handle = int(handle, base=16) 263 self.characteristic_reads[handle] = action( 264 connection=self.connection, 265 handle=handle, 266 ).value.value 267 268 return "OK" 269 270 @match_description 271 def USER_CONFIRM_READ_RESULT(self, characteristic_name: str, body: str, **kwargs): 272 r""" 273 Please verify following (?P<characteristic_name>.*) Characteristic value is Read. 274 275 (?P<body>.*) 276 """ 277 278 blocks = re.split("Handle:", body) 279 280 HEX = "[0-9A-F]" 281 PATTERN = re.compile(f"0x{HEX*2}(?:{HEX*2})?") 282 283 num_checks = 0 284 285 for block in blocks: 286 data = PATTERN.findall(block) 287 if not data: 288 continue 289 290 # first hex value is the handle, rest is the expected data 291 handle, *data = data 292 293 handle = int(handle, base=16) 294 295 actual = self.characteristic_reads[handle] 296 297 expected = [] 298 for word in data: 299 if len(word) == len("0x0000"): 300 first = int(word[2:4], base=16) 301 second = int(word[4:6], base=16) 302 303 if "bytes in LSB order" in body: 304 little = first 305 big = second 306 else: 307 little = second 308 big = first 309 310 expected.append(little) 311 expected.append(big) 312 else: 313 expected.append(int(word, base=16)) 314 315 expected = bytes(expected) 316 317 num_checks += 1 318 assert (expected == actual), f"Got unexpected value for handle {handle}: {repr(expected)} != {repr(actual)}" 319 320 assert (body.count("Handle:") == num_checks), "safety check that regex is matching something" 321 322 return "OK" 323 324 @assert_description 325 def MMI_VERIFY_SECURE_ID(self, pts_addr: bytes, **kwargs): 326 """ 327 Please enter the secure ID. 328 """ 329 330 for event in self.pairing_stream: 331 if event.address == pts_addr and event.passkey_entry_notification: 332 print(f"Got passkey entry {event.passkey_entry_notification}", file=sys.stderr) 333 return str(event.passkey_entry_notification) 334 335 assert False 336