1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import grpc 18import logging 19 20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices 21from bumble import pandora as bumble_server 22from bumble.gatt import (Characteristic, Service, GATT_VOLUME_CONTROL_SERVICE, GATT_AUDIO_INPUT_CONTROL_SERVICE, 23 GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, UUID) 24from bumble.l2cap import L2CAP_Control_Frame 25from bumble.pairing import PairingConfig 26from bumble_experimental.gatt import GATTService 27from mobly import base_test, signals, test_runner 28from mobly.asserts import assert_equal # type: ignore 29from mobly.asserts import assert_in # type: ignore 30from mobly.asserts import assert_is_not_none # type: ignore 31from mobly.asserts import assert_not_in # type: ignore 32from mobly.asserts import assert_true # type: ignore 33from pandora.host_pb2 import RANDOM, Connection, DataTypes 34from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse 35from pandora_experimental.gatt_grpc import GATT 36from pandora_experimental.gatt_grpc_aio import GATT as AioGATT, add_GATTServicer_to_server 37from pandora_experimental.gatt_pb2 import SUCCESS, ReadCharacteristicsFromUuidResponse 38from typing import Optional, Tuple 39 40 41class GattTest(base_test.BaseTestClass): # type: ignore[misc] 42 devices: Optional[PandoraDevices] = None 43 44 # pandora devices. 45 dut: PandoraDevice 46 ref: PandoraDevice 47 48 def setup_class(self) -> None: 49 # Register experimental bumble servicers hook. 50 bumble_server.register_servicer_hook( 51 lambda bumble, _, server: add_GATTServicer_to_server(GATTService(bumble.device), server)) 52 53 self.devices = PandoraDevices(self) 54 self.dut, self.ref, *_ = self.devices 55 56 def teardown_class(self) -> None: 57 if self.devices: 58 self.devices.stop_all() 59 60 @avatar.asynchronous 61 async def setup_test(self) -> None: 62 await asyncio.gather(self.dut.reset(), self.ref.reset()) 63 64 def test_print_dut_gatt_services(self) -> None: 65 advertise = self.ref.host.Advertise(legacy=True, connectable=True) 66 dut_ref = self.dut.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM).connection 67 assert_is_not_none(dut_ref) 68 assert dut_ref 69 advertise.cancel() 70 71 gatt = GATT(self.dut.channel) 72 services = gatt.DiscoverServices(dut_ref) 73 self.dut.log.info(f'DUT services: {services}') 74 75 def test_print_ref_gatt_services(self) -> None: 76 advertise = self.dut.host.Advertise( 77 legacy=True, 78 connectable=True, 79 own_address_type=RANDOM, 80 data=DataTypes(manufacturer_specific_data=b'pause cafe'), 81 ) 82 83 scan = self.ref.host.Scan() 84 dut = next((x for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) 85 scan.cancel() 86 87 ref_dut = self.ref.host.ConnectLE(own_address_type=RANDOM, **dut.address_asdict()).connection 88 assert_is_not_none(ref_dut) 89 assert ref_dut 90 advertise.cancel() 91 92 gatt = GATT(self.ref.channel) 93 services = gatt.DiscoverServices(ref_dut) 94 self.ref.log.info(f'REF services: {services}') 95 96 async def connect_dut_to_ref(self) -> Tuple[Connection, Connection]: 97 ref_advertisement = self.ref.aio.host.Advertise( 98 legacy=True, 99 connectable=True, 100 ) 101 102 dut_connection_to_ref = (await self.dut.aio.host.ConnectLE(public=self.ref.address, 103 own_address_type=RANDOM)).connection 104 assert_is_not_none(dut_connection_to_ref) 105 assert dut_connection_to_ref 106 107 ref_connection_to_dut = (await anext(aiter(ref_advertisement))).connection 108 ref_advertisement.cancel() 109 110 return dut_connection_to_ref, ref_connection_to_dut 111 112 @avatar.asynchronous 113 async def test_read_characteristic_while_pairing(self) -> None: 114 if isinstance(self.dut, BumblePandoraDevice): 115 raise signals.TestSkip('TODO: b/273941061') 116 if not isinstance(self.ref, BumblePandoraDevice): 117 raise signals.TestSkip('Test require Bumble as reference device(s)') 118 119 # arrange: set up GATT service on REF side with a characteristic 120 # that can only be read after pairing 121 SERVICE_UUID = "00005A00-0000-1000-8000-00805F9B34FB" 122 CHARACTERISTIC_UUID = "00006A00-0000-1000-8000-00805F9B34FB" 123 service = Service( 124 SERVICE_UUID, 125 [ 126 Characteristic( 127 CHARACTERISTIC_UUID, 128 Characteristic.READ, 129 Characteristic.READ_REQUIRES_ENCRYPTION, 130 b"Hello, world!", 131 ), 132 ], 133 ) 134 self.ref.device.add_service(service) # type:ignore 135 # disable MITM requirement on REF side (since it only does just works) 136 self.ref.device.pairing_config_factory = lambda _: PairingConfig( # type:ignore 137 sc=True, mitm=False, bonding=True) 138 # manually handle pairing on the DUT side 139 dut_pairing_events = self.dut.aio.security.OnPairing() 140 # set up connection 141 dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref() 142 143 # act: initiate pairing from REF side (send a security request) 144 async def ref_secure() -> SecureResponse: 145 return await self.ref.aio.security.Secure(connection=ref_connection_to_dut, le=LE_LEVEL3) 146 147 ref_secure_task = asyncio.create_task(ref_secure()) 148 149 # wait for pairing to start 150 event = await anext(dut_pairing_events) 151 152 # before acknowledging pairing, start a GATT read 153 dut_gatt = AioGATT(self.dut.aio.channel) 154 155 async def dut_read() -> ReadCharacteristicsFromUuidResponse: 156 return await dut_gatt.ReadCharacteristicsFromUuid(dut_connection_to_ref, CHARACTERISTIC_UUID, 1, 0xFFFF) 157 158 dut_read_task = asyncio.create_task(dut_read()) 159 160 await asyncio.sleep(3) 161 162 # now continue with pairing 163 dut_pairing_events.send_nowait(PairingEventAnswer(event=event, confirm=True)) 164 165 # android pops up a second pairing notification for some reason, accept it 166 event = await anext(dut_pairing_events) 167 dut_pairing_events.send_nowait(PairingEventAnswer(event=event, confirm=True)) 168 169 # assert: that the read succeeded (so Android re-tried the read after pairing) 170 read_response = await dut_read_task 171 self.ref.log.info(read_response) 172 assert_equal(read_response.characteristics_read[0].status, SUCCESS) 173 assert_equal(read_response.characteristics_read[0].value.value, b"Hello, world!") 174 175 # make sure pairing was successful 176 ref_secure_res = await ref_secure_task 177 assert_equal(ref_secure_res.result_variant(), 'success') 178 179 @avatar.asynchronous 180 async def test_rediscover_whenever_unbonded(self) -> None: 181 if not isinstance(self.ref, BumblePandoraDevice): 182 raise signals.TestSkip('Test require Bumble as reference device(s)') 183 184 # arrange: set up one GATT service on REF side 185 dut_gatt = AioGATT(self.dut.aio.channel) 186 SERVICE_UUID_1 = "00005A00-0000-1000-8000-00805F9B34FB" 187 SERVICE_UUID_2 = "00005A01-0000-1000-8000-00805F9B34FB" 188 self.ref.device.add_service(Service(SERVICE_UUID_1, [])) # type:ignore 189 # connect both devices 190 dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref() 191 192 # act: perform service discovery, disconnect, add the second service, reconnect, and try discovery again 193 first_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref) 194 await self.ref.aio.host.Disconnect(ref_connection_to_dut) 195 self.ref.device.add_service(Service(SERVICE_UUID_2, [])) # type:ignore 196 dut_connection_to_ref, _ = await self.connect_dut_to_ref() 197 second_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref) 198 199 # assert: that we found only one service in the first discovery 200 assert_in(SERVICE_UUID_1, (service.uuid for service in first_discovery.services)) 201 assert_not_in(SERVICE_UUID_2, (service.uuid for service in first_discovery.services)) 202 # assert: but found both in the second discovery 203 assert_in(SERVICE_UUID_1, (service.uuid for service in second_discovery.services)) 204 assert_in(SERVICE_UUID_2, (service.uuid for service in second_discovery.services)) 205 206 @avatar.asynchronous 207 async def test_do_not_discover_when_bonded(self) -> None: 208 # NOTE: if service change indication is ever enabled in Bumble, both this test + the previous test must DISABLE IT 209 # otherwise this test will fail, and the previous test will pass even on a broken implementation 210 211 raise signals.TestSkip('TODO(aryarahul): b/276757181') 212 if not isinstance(self.ref, BumblePandoraDevice): 213 raise signals.TestSkip('Test require Bumble as reference device(s)') 214 215 # arrange: set up one GATT service on REF side 216 dut_gatt = AioGATT(self.dut.aio.channel) 217 SERVICE_UUID_1 = "00005A00-0000-1000-8000-00805F9B34FB" 218 SERVICE_UUID_2 = "00005A01-0000-1000-8000-00805F9B34FB" 219 self.ref.device.add_service(Service(SERVICE_UUID_1, [])) # type:ignore 220 # connect both devices 221 dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref() 222 # bond devices and disconnect 223 await self.dut.aio.security.Secure(connection=dut_connection_to_ref, le=LE_LEVEL3) 224 await self.ref.aio.host.Disconnect(ref_connection_to_dut) 225 226 # act: connect, perform service discovery, disconnect, add the second service, reconnect, and try discovery again 227 dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref() 228 first_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref) 229 await self.ref.aio.host.Disconnect(ref_connection_to_dut) 230 231 self.ref.device.add_service(Service(SERVICE_UUID_2, [])) # type:ignore 232 dut_connection_to_ref, _ = await self.connect_dut_to_ref() 233 second_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref) 234 235 # assert: that we found only one service in the first discovery 236 assert_in(SERVICE_UUID_1, (service.uuid for service in first_discovery.services)) 237 assert_not_in(SERVICE_UUID_2, (service.uuid for service in first_discovery.services)) 238 # assert: but found both in the second discovery 239 assert_in(SERVICE_UUID_1, (service.uuid for service in second_discovery.services)) 240 assert_in(SERVICE_UUID_2, (service.uuid for service in second_discovery.services)) 241 242 @avatar.asynchronous 243 async def test_eatt_when_not_encrypted_no_timeout(self) -> None: 244 if not isinstance(self.ref, BumblePandoraDevice): 245 raise signals.TestSkip('Test require Bumble as reference device(s)') 246 advertise = self.dut.aio.host.Advertise( 247 legacy=True, 248 connectable=True, 249 own_address_type=RANDOM, 250 data=DataTypes(manufacturer_specific_data=b'pause cafe'), 251 ) 252 253 scan = self.ref.aio.host.Scan() 254 dut = await anext((x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)) 255 scan.cancel() 256 257 ref_dut = (await self.ref.aio.host.ConnectLE(own_address_type=RANDOM, **dut.address_asdict())).connection 258 assert_is_not_none(ref_dut) 259 assert ref_dut 260 advertise.cancel() 261 262 connection = self.ref.device.lookup_connection(int.from_bytes(ref_dut.cookie.value, 'big')) 263 assert connection 264 265 connection_request = L2CAP_Control_Frame.from_bytes(( 266 b"\x17" # code of L2CAP_CREDIT_BASED_CONNECTION_REQ 267 b"\x01" # identifier 268 b"\x0a\x00" # data length 269 b"\x27\x00" # psm(EATT) 270 b"\x64\x00" # MTU 271 b"\x64\x00" # MPS 272 b"\x64\x00" # initial credit 273 b"\x40\x00" # source cid[0] 274 )) 275 276 fut = asyncio.get_running_loop().create_future() 277 setattr(self.ref.device.l2cap_channel_manager, "on_[0x18]", lambda _, _1, frame: fut.set_result(frame)) 278 self.ref.device.l2cap_channel_manager.send_control_frame( # type:ignore 279 connection, 0x05, connection_request) 280 control_frame = await fut 281 282 assert_equal(bytes(control_frame)[10], 0x05) # All connections refused – insufficient authentication 283 assert_true(await is_connected(self.ref, ref_dut), "Device is no longer connected") 284 285 @avatar.parameterized( 286 ('primary_service',), 287 ('secondary_service',), 288 ) 289 def test_discover_included_service(self, attribute_type: str) -> None: 290 PRIMARY_SERVICE_UUID = GATT_VOLUME_CONTROL_SERVICE 291 INCLUDED_SERVICE_UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE 292 293 is_primary_service = True if attribute_type == 'primary_service' else False 294 included_service = Service(INCLUDED_SERVICE_UUID, [], primary=is_primary_service) 295 primary_service = Service(PRIMARY_SERVICE_UUID, [], included_services=[included_service]) 296 self.ref.device.add_service(included_service) # type: ignore 297 self.ref.device.add_service(primary_service) # type: ignore 298 299 advertise = self.ref.host.Advertise(legacy=True, connectable=True) 300 dut_ref_connection = self.dut.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM).connection 301 assert dut_ref_connection 302 advertise.cancel() # type: ignore 303 304 dut_gatt = GATT(self.dut.channel) # type: ignore 305 services = dut_gatt.DiscoverServices(dut_ref_connection).services 306 307 filtered_services = [service for service in services if service.uuid == PRIMARY_SERVICE_UUID] 308 assert len(filtered_services) == 1 309 primary_service = filtered_services[0] 310 311 included_services_uuids = [included_service.uuid for included_service in primary_service.included_services] 312 assert_in(INCLUDED_SERVICE_UUID, included_services_uuids) 313 314 315async def is_connected(device: PandoraDevice, connection: Connection) -> bool: 316 try: 317 await device.aio.host.WaitDisconnection(connection=connection, timeout=5) 318 return False 319 except grpc.RpcError as e: 320 assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED) # type: ignore 321 return True 322 323 324if __name__ == '__main__': 325 logging.basicConfig(level=logging.DEBUG) 326 test_runner.main() # type: ignore 327