1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import sys 15import time 16from typing import Set 17from lib import cs 18from lib import ranging_base_test 19from lib import rssi 20from lib import rtt 21from lib import utils 22from lib import uwb 23from lib.params import * 24from lib.ranging_decorator import * 25from mobly import asserts 26from mobly import config_parser 27from mobly import suite_runner 28 29 30_TEST_CASES = ( 31 "test_one_to_one_uwb_ranging", 32 "test_one_to_one_uwb_ranging_provisioned_sts", 33 "test_one_to_one_uwb_ranging_disable_range_data_ntf", 34 "test_one_to_one_rtt_ranging", 35 "test_one_to_one_ble_rssi_ranging", 36 "test_one_to_one_ble_cs_ranging", 37) 38 39SERVICE_UUID = "0000fffb-0000-1000-8000-00805f9b34fc" 40 41class RangingManagerTest(ranging_base_test.RangingBaseTest): 42 """Tests for UWB Ranging APIs. 43 44 Attributes: 45 46 android_devices: list of android device objects. 47 """ 48 49 def __init__(self, configs: config_parser.TestRunConfig): 50 """Init method for the test class. 51 52 Args: 53 54 configs: A config_parser.TestRunConfig object. 55 """ 56 super().__init__(configs) 57 self.tests = _TEST_CASES 58 59 def setup_class(self): 60 super().setup_class() 61 self.devices = [RangingDecorator(ad) for ad in self.android_devices] 62 self.initiator, self.responder = self.devices 63 64 self.initiator.uwb_address = [1, 2] 65 self.responder.uwb_address = [3, 4] 66 67 def setup_test(self): 68 super().setup_test() 69 for device in self.devices: 70 utils.set_airplane_mode(device.ad, state=False) 71 if device.is_ranging_technology_supported(RangingTechnology.UWB): 72 utils.set_uwb_state_and_verify(device.ad, state=True) 73 utils.set_snippet_foreground_state(device.ad, isForeground=True) 74 utils.set_screen_state(device.ad, on=True) 75 76 def teardown_test(self): 77 super().teardown_test() 78 for device in self.devices: 79 device.clear_ranging_sessions() 80 81 ### Helpers ### 82 83 def _start_mutual_ranging_and_assert_started( 84 self, 85 session_handle: str, 86 initiator_preference: RangingPreference, 87 responder_preference: RangingPreference, 88 technologies: Set[RangingTechnology], 89 ): 90 """Starts one-to-one ranging session between initiator and responder. 91 92 Args: 93 session_id: id to use for the ranging session. 94 """ 95 self.initiator.start_ranging_and_assert_opened( 96 session_handle, initiator_preference 97 ) 98 if responder_preference is not None: 99 self.responder.start_ranging_and_assert_opened( 100 session_handle, responder_preference 101 ) 102 103 asserts.assert_true( 104 self.initiator.verify_received_data_from_peer_using_technologies( 105 session_handle, 106 self.responder.id, 107 technologies, 108 ), 109 f"Initiator did not find responder", 110 ) 111 if responder_preference is not None: 112 asserts.assert_true( 113 self.responder.verify_received_data_from_peer_using_technologies( 114 session_handle, 115 self.initiator.id, 116 technologies, 117 ), 118 f"Responder did not find initiator", 119 ) 120 121 def _reset_bt_state(self): 122 utils.reset_bt_state(self.initiator.ad) 123 utils.reset_bt_state(self.responder.ad) 124 125 126 def _ble_connect(self): 127 """Create BLE GATT connection between initiator and responder. 128 129 """ 130 # Start and advertise regular server 131 self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID) 132 # Connect to the advertisement 133 self.responder.bt_addr = self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID) 134 asserts.assert_true(self.responder.bt_addr, "Server not connected") 135 # Check the target UUID is present 136 asserts.assert_true(self.initiator.ad.bluetooth.containsService(SERVICE_UUID), "Service not found") 137 connected_devices = self.responder.ad.bluetooth.getConnectedDevices() 138 asserts.assert_true(connected_devices, "No clients found connected to server") 139 self.initiator.bt_addr = connected_devices[0] 140 141 def _ble_disconnect(self): 142 asserts.assert_true( 143 self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID), "Server not disconnected") 144 145 def _ble_bond(self): 146 """Create BLE GATT connection and bonding between initiator and responder. 147 148 """ 149 # Start and advertise regular server 150 self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID) 151 oob_data = self.responder.ad.bluetooth.generateServerLocalOobData() 152 asserts.assert_true(oob_data, "OOB data not generated") 153 # Connect to the advertisement using OOB data generated on responder. 154 self.responder.bt_addr = self.initiator.ad.bluetooth.createBondOob(SERVICE_UUID, oob_data) 155 # Check the target UUID is present 156 asserts.assert_true(self.initiator.ad.bluetooth.containsService(SERVICE_UUID), "Service not found") 157 asserts.assert_true(self.responder.bt_addr, "Server not bonded") 158 connected_devices = self.responder.ad.bluetooth.getConnectedDevices() 159 asserts.assert_true(connected_devices, "No clients found connected to server") 160 self.initiator.bt_addr = connected_devices[0] 161 162 def _ble_unbond(self): 163 asserts.assert_true( 164 self.initiator.ad.bluetooth.removeBond(SERVICE_UUID), "Server not unbonded") 165 166 ### Test Cases ### 167 168 def test_one_to_one_uwb_ranging(self): 169 """Verifies uwb ranging with peer device, devices range for 10 seconds.""" 170 SESSION_HANDLE = str(uuid4()) 171 UWB_SESSION_ID = 5 172 TECHNOLOGIES = {RangingTechnology.UWB} 173 174 asserts.skip_if( 175 not self.responder.is_ranging_technology_supported(RangingTechnology.UWB), 176 f"UWB not supported by responder", 177 ) 178 asserts.skip_if( 179 not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB), 180 f"UWB not supported by initiator", 181 ) 182 183 initiator_preference = RangingPreference( 184 device_role=DeviceRole.INITIATOR, 185 ranging_params=RawInitiatorRangingParams( 186 peer_params=[ 187 DeviceParams( 188 peer_id=self.responder.id, 189 uwb_params=uwb.UwbRangingParams( 190 session_id=UWB_SESSION_ID, 191 config_id=uwb.ConfigId.UNICAST_DS_TWR, 192 device_address=self.initiator.uwb_address, 193 peer_address=self.responder.uwb_address, 194 ), 195 ) 196 ], 197 ), 198 ) 199 200 responder_preference = RangingPreference( 201 device_role=DeviceRole.RESPONDER, 202 ranging_params=RawResponderRangingParams( 203 peer_params=DeviceParams( 204 peer_id=self.initiator.id, 205 uwb_params=uwb.UwbRangingParams( 206 session_id=UWB_SESSION_ID, 207 config_id=uwb.ConfigId.UNICAST_DS_TWR, 208 device_address=self.responder.uwb_address, 209 peer_address=self.initiator.uwb_address, 210 ), 211 ), 212 ), 213 ) 214 215 self._start_mutual_ranging_and_assert_started( 216 SESSION_HANDLE, 217 initiator_preference, 218 responder_preference, 219 TECHNOLOGIES, 220 ) 221 222 time.sleep(10) 223 224 asserts.assert_true( 225 self.initiator.verify_received_data_from_peer_using_technologies( 226 SESSION_HANDLE, self.responder.id, TECHNOLOGIES 227 ), 228 "Initiator did not find responder", 229 ) 230 asserts.assert_true( 231 self.responder.verify_received_data_from_peer_using_technologies( 232 SESSION_HANDLE, 233 self.initiator.id, 234 TECHNOLOGIES, 235 ), 236 "Responder did not find initiator", 237 ) 238 239 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 240 self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE) 241 242 def test_one_to_one_uwb_ranging_provisioned_sts(self): 243 """Verifies uwb ranging with peer device using provisioned sts""" 244 SESSION_HANDLE = str(uuid4()) 245 UWB_SESSION_ID = 5 246 TECHNOLOGIES = {RangingTechnology.UWB} 247 248 asserts.skip_if( 249 not self.responder.is_ranging_technology_supported(RangingTechnology.UWB), 250 f"UWB not supported by responder", 251 ) 252 asserts.skip_if( 253 not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB), 254 f"UWB not supported by initiator", 255 ) 256 257 initiator_preference = RangingPreference( 258 device_role=DeviceRole.INITIATOR, 259 ranging_params=RawInitiatorRangingParams( 260 peer_params=[ 261 DeviceParams( 262 peer_id=self.responder.id, 263 uwb_params=uwb.UwbRangingParams( 264 session_id=UWB_SESSION_ID, 265 config_id=uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR, 266 device_address=self.initiator.uwb_address, 267 peer_address=self.responder.uwb_address, 268 ), 269 ) 270 ], 271 ), 272 ) 273 274 responder_preference = RangingPreference( 275 device_role=DeviceRole.RESPONDER, 276 ranging_params=RawResponderRangingParams( 277 peer_params=DeviceParams( 278 peer_id=self.initiator.id, 279 uwb_params=uwb.UwbRangingParams( 280 session_id=UWB_SESSION_ID, 281 config_id=uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR, 282 device_address=self.responder.uwb_address, 283 peer_address=self.initiator.uwb_address, 284 ), 285 ), 286 ), 287 ) 288 289 self._start_mutual_ranging_and_assert_started( 290 SESSION_HANDLE, initiator_preference, responder_preference, TECHNOLOGIES 291 ) 292 293 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 294 self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE) 295 296 def test_one_to_one_uwb_ranging_disable_range_data_ntf(self): 297 """Verifies device does not receive range data after disabling range data notifications""" 298 SESSION_HANDLE = str(uuid4()) 299 UWB_SESSION_ID = 5 300 asserts.skip_if( 301 not self.responder.is_ranging_technology_supported(RangingTechnology.UWB), 302 f"UWB not supported by responder", 303 ) 304 asserts.skip_if( 305 not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB), 306 f"UWB not supported by initiator", 307 ) 308 initiator_preference = RangingPreference( 309 device_role=DeviceRole.INITIATOR, 310 ranging_params=RawInitiatorRangingParams( 311 peer_params=[ 312 DeviceParams( 313 peer_id=self.responder.id, 314 uwb_params=uwb.UwbRangingParams( 315 session_id=UWB_SESSION_ID, 316 config_id=uwb.ConfigId.UNICAST_DS_TWR, 317 device_address=self.initiator.uwb_address, 318 peer_address=self.responder.uwb_address, 319 ), 320 ) 321 ], 322 ), 323 enable_range_data_notifications=False, 324 ) 325 326 responder_preference = RangingPreference( 327 device_role=DeviceRole.RESPONDER, 328 ranging_params=RawResponderRangingParams( 329 peer_params=DeviceParams( 330 peer_id=self.initiator.id, 331 uwb_params=uwb.UwbRangingParams( 332 session_id=UWB_SESSION_ID, 333 config_id=uwb.ConfigId.UNICAST_DS_TWR, 334 device_address=self.responder.uwb_address, 335 peer_address=self.initiator.uwb_address, 336 ), 337 ), 338 ), 339 enable_range_data_notifications=True, 340 ) 341 342 self.initiator.start_ranging_and_assert_opened( 343 SESSION_HANDLE, initiator_preference 344 ) 345 self.responder.start_ranging_and_assert_opened( 346 SESSION_HANDLE, responder_preference 347 ) 348 349 asserts.assert_false( 350 self.initiator.verify_received_data_from_peer( 351 SESSION_HANDLE, self.responder.id 352 ), 353 "Initiator found responder but initiator has range data" 354 " notifications disabled", 355 ) 356 asserts.assert_true( 357 self.responder.verify_received_data_from_peer( 358 SESSION_HANDLE, self.initiator.id 359 ), 360 "Responder did not find initiator but responder has range data" 361 " notifications enabled", 362 ) 363 364 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 365 self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE) 366 367 def test_one_to_one_rtt_ranging(self): 368 """Verifies uwb ranging with peer device, devices range for 10 seconds.""" 369 SESSION_HANDLE = str(uuid4()) 370 TECHNOLOGIES = {RangingTechnology.WIFI_RTT} 371 372 asserts.skip_if( 373 not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT), 374 f"Wifi nan rtt not supported by responder", 375 ) 376 asserts.skip_if( 377 not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT), 378 f"Wifi nan rtt not supported by initiator", 379 ) 380 381 initiator_preference = RangingPreference( 382 device_role=DeviceRole.INITIATOR, 383 ranging_params=RawInitiatorRangingParams( 384 peer_params=[ 385 DeviceParams( 386 peer_id=self.responder.id, 387 rtt_params=rtt.RttRangingParams( 388 service_name="test_service_name1", 389 ), 390 ) 391 ], 392 ), 393 ) 394 395 responder_preference = RangingPreference( 396 device_role=DeviceRole.RESPONDER, 397 ranging_params=RawResponderRangingParams( 398 peer_params=DeviceParams( 399 peer_id=self.initiator.id, 400 rtt_params=rtt.RttRangingParams( 401 service_name="test_service_name1", 402 ), 403 ), 404 ), 405 ) 406 407 # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data. 408 self.initiator.start_ranging_and_assert_opened( 409 SESSION_HANDLE, initiator_preference 410 ) 411 self.responder.start_ranging_and_assert_opened( 412 SESSION_HANDLE, responder_preference 413 ) 414 415 time.sleep(10) 416 asserts.assert_true( 417 self.initiator.verify_received_data_from_peer_using_technologies( 418 SESSION_HANDLE, self.responder.id, TECHNOLOGIES 419 ), 420 "Initiator did not find responder", 421 ) 422 423 # Enable when this is supported. 424 # asserts.assert_true( 425 # self.responder.verify_peer_found_with_technologies( 426 # SESSION_HANDLE, 427 # self.initiator.id, 428 # TECHNOLOGIES, 429 # ), 430 # "Responder did not find initiator", 431 # ) 432 433 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 434 self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE) 435 436 def test_one_to_one_ble_rssi_ranging(self): 437 """Verifies cs ranging with peer device, devices range for 10 seconds.""" 438 SESSION_HANDLE = str(uuid4()) 439 TECHNOLOGIES = {RangingTechnology.BLE_RSSI} 440 441 asserts.skip_if( 442 not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI), 443 f"BLE RSSI not supported by responder", 444 ) 445 asserts.skip_if( 446 not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI), 447 f"BLE RSSI not supported by initiator", 448 ) 449 self._reset_bt_state() 450 451 self._ble_connect() 452 453 initiator_preference = RangingPreference( 454 device_role=DeviceRole.INITIATOR, 455 ranging_params=RawInitiatorRangingParams( 456 peer_params=[ 457 DeviceParams( 458 peer_id=self.responder.id, 459 rssi_params=rssi.BleRssiRangingParams( 460 peer_address=self.responder.bt_addr, 461 ), 462 ) 463 ], 464 ), 465 ) 466 467 responder_preference = RangingPreference( 468 device_role=DeviceRole.RESPONDER, 469 ranging_params=RawResponderRangingParams( 470 peer_params=DeviceParams( 471 peer_id=self.initiator.id, 472 rssi_params=rssi.BleRssiRangingParams( 473 peer_address=self.initiator.bt_addr, 474 ), 475 ), 476 ), 477 ) 478 479 try: 480 self._start_mutual_ranging_and_assert_started( 481 SESSION_HANDLE, 482 initiator_preference, 483 responder_preference, 484 TECHNOLOGIES, 485 ) 486 487 time.sleep(10) 488 489 asserts.assert_true( 490 self.initiator.verify_received_data_from_peer_using_technologies( 491 SESSION_HANDLE, 492 self.responder.id, 493 TECHNOLOGIES 494 ), 495 "Initiator did not find responder", 496 ) 497 asserts.assert_true( 498 self.responder.verify_received_data_from_peer_using_technologies( 499 SESSION_HANDLE, 500 self.initiator.id, 501 TECHNOLOGIES, 502 ), 503 "Responder did not find initiator", 504 ) 505 finally: 506 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 507 self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE) 508 509 self._ble_disconnect() 510 511 def test_one_to_one_ble_cs_ranging(self): 512 """ 513 Verifies cs ranging with peer device, devices range for 10 seconds. 514 This test is only one way since we don't test if responder also can simultaneously get the data. 515 """ 516 SESSION_HANDLE = str(uuid4()) 517 TECHNOLOGIES = {RangingTechnology.BLE_CS} 518 519 asserts.skip_if( 520 not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS), 521 f"BLE_CS not supported by responder", 522 ) 523 asserts.skip_if( 524 not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS), 525 f"BLE CS not supported by initiator", 526 ) 527 self._reset_bt_state() 528 529 self._ble_bond() 530 531 initiator_preference = RangingPreference( 532 device_role=DeviceRole.INITIATOR, 533 ranging_params=RawInitiatorRangingParams( 534 peer_params=[ 535 DeviceParams( 536 peer_id=self.responder.id, 537 cs_params=cs.CsRangingParams( 538 peer_address=self.responder.bt_addr, 539 ), 540 ) 541 ], 542 ), 543 ) 544 545 try: 546 self._start_mutual_ranging_and_assert_started( 547 SESSION_HANDLE, 548 initiator_preference, 549 None, 550 TECHNOLOGIES, 551 ) 552 553 time.sleep(10) 554 555 asserts.assert_true( 556 self.initiator.verify_received_data_from_peer_using_technologies( 557 SESSION_HANDLE, 558 self.responder.id, 559 TECHNOLOGIES 560 ), 561 "Initiator did not find responder", 562 ) 563 finally: 564 self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE) 565 566 self._ble_unbond() 567 568if __name__ == "__main__": 569 if "--" in sys.argv: 570 index = sys.argv.index("--") 571 sys.argv = sys.argv[:1] + sys.argv[index + 1 :] 572 suite_runner.run_suite([RangingManagerTest]) 573