1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14import sys
15import time
16from typing import Set
17from lib import cs
18from lib import ranging_base_test
19from lib import rssi
20from lib import rtt
21from lib import utils
22from lib import uwb
23from lib.params import *
24from lib.ranging_decorator import *
25from mobly import asserts
26from mobly import config_parser
27from mobly import suite_runner
28
29
# Names of the test methods in this module; RangingManagerTest.__init__
# assigns this tuple to `self.tests`.
_TEST_CASES = (
    "test_one_to_one_uwb_ranging",
    "test_one_to_one_uwb_ranging_provisioned_sts",
    "test_one_to_one_uwb_ranging_disable_range_data_ntf",
    "test_one_to_one_rtt_ranging",
    "test_one_to_one_ble_rssi_ranging",
    "test_one_to_one_ble_cs_ranging",
)

# Service UUID handed to the Bluetooth snippet calls made by the BLE helpers
# (advertising on the responder, GATT connect / OOB bonding from the initiator).
SERVICE_UUID = "0000fffb-0000-1000-8000-00805f9b34fc"
40
41class RangingManagerTest(ranging_base_test.RangingBaseTest):
  """Tests for the ranging APIs over UWB, Wi-Fi RTT, BLE RSSI and BLE CS.

  Attributes:
    android_devices: list of android device objects.
  """
48
49  def __init__(self, configs: config_parser.TestRunConfig):
50    """Init method for the test class.
51
52    Args:
53
54    configs: A config_parser.TestRunConfig object.
55    """
56    super().__init__(configs)
57    self.tests = _TEST_CASES
58
59  def setup_class(self):
60    super().setup_class()
61    self.devices = [RangingDecorator(ad) for ad in self.android_devices]
62    self.initiator, self.responder = self.devices
63
64    self.initiator.uwb_address = [1, 2]
65    self.responder.uwb_address = [3, 4]
66
67  def setup_test(self):
68    super().setup_test()
69    for device in self.devices:
70      utils.set_airplane_mode(device.ad, state=False)
71      if device.is_ranging_technology_supported(RangingTechnology.UWB):
72        utils.set_uwb_state_and_verify(device.ad, state=True)
73      utils.set_snippet_foreground_state(device.ad, isForeground=True)
74      utils.set_screen_state(device.ad, on=True)
75
76  def teardown_test(self):
77    super().teardown_test()
78    for device in self.devices:
79      device.clear_ranging_sessions()
80
81  ### Helpers ###
82
83  def _start_mutual_ranging_and_assert_started(
84      self,
85      session_handle: str,
86      initiator_preference: RangingPreference,
87      responder_preference: RangingPreference,
88      technologies: Set[RangingTechnology],
89  ):
90    """Starts one-to-one ranging session between initiator and responder.
91
92    Args:
93        session_id: id to use for the ranging session.
94    """
95    self.initiator.start_ranging_and_assert_opened(
96        session_handle, initiator_preference
97    )
98    if responder_preference is not None:
99        self.responder.start_ranging_and_assert_opened(
100            session_handle, responder_preference
101        )
102
103    asserts.assert_true(
104        self.initiator.verify_received_data_from_peer_using_technologies(
105            session_handle,
106            self.responder.id,
107            technologies,
108        ),
109        f"Initiator did not find responder",
110    )
111    if responder_preference is not None:
112        asserts.assert_true(
113            self.responder.verify_received_data_from_peer_using_technologies(
114                session_handle,
115                self.initiator.id,
116                technologies,
117            ),
118            f"Responder did not find initiator",
119        )
120
121  def _reset_bt_state(self):
122    utils.reset_bt_state(self.initiator.ad)
123    utils.reset_bt_state(self.responder.ad)
124
125
126  def _ble_connect(self):
127    """Create BLE GATT connection between initiator and responder.
128
129    """
130    # Start and advertise regular server
131    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
132    # Connect to the advertisement
133    self.responder.bt_addr = self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID)
134    asserts.assert_true(self.responder.bt_addr, "Server not connected")
135    # Check the target UUID is present
136    asserts.assert_true(self.initiator.ad.bluetooth.containsService(SERVICE_UUID), "Service not found")
137    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
138    asserts.assert_true(connected_devices, "No clients found connected to server")
139    self.initiator.bt_addr = connected_devices[0]
140
141  def _ble_disconnect(self):
142    asserts.assert_true(
143        self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID), "Server not disconnected")
144
145  def _ble_bond(self):
146    """Create BLE GATT connection and bonding between initiator and responder.
147
148    """
149    # Start and advertise regular server
150    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
151    oob_data = self.responder.ad.bluetooth.generateServerLocalOobData()
152    asserts.assert_true(oob_data, "OOB data not generated")
153    # Connect to the advertisement using OOB data generated on responder.
154    self.responder.bt_addr = self.initiator.ad.bluetooth.createBondOob(SERVICE_UUID, oob_data)
155    # Check the target UUID is present
156    asserts.assert_true(self.initiator.ad.bluetooth.containsService(SERVICE_UUID), "Service not found")
157    asserts.assert_true(self.responder.bt_addr, "Server not bonded")
158    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
159    asserts.assert_true(connected_devices, "No clients found connected to server")
160    self.initiator.bt_addr = connected_devices[0]
161
162  def _ble_unbond(self):
163    asserts.assert_true(
164        self.initiator.ad.bluetooth.removeBond(SERVICE_UUID), "Server not unbonded")
165
166  ### Test Cases ###
167
168  def test_one_to_one_uwb_ranging(self):
169    """Verifies uwb ranging with peer device, devices range for 10 seconds."""
170    SESSION_HANDLE = str(uuid4())
171    UWB_SESSION_ID = 5
172    TECHNOLOGIES = {RangingTechnology.UWB}
173
174    asserts.skip_if(
175        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
176        f"UWB not supported by responder",
177    )
178    asserts.skip_if(
179        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
180        f"UWB not supported by initiator",
181    )
182
183    initiator_preference = RangingPreference(
184        device_role=DeviceRole.INITIATOR,
185        ranging_params=RawInitiatorRangingParams(
186            peer_params=[
187                DeviceParams(
188                    peer_id=self.responder.id,
189                    uwb_params=uwb.UwbRangingParams(
190                        session_id=UWB_SESSION_ID,
191                        config_id=uwb.ConfigId.UNICAST_DS_TWR,
192                        device_address=self.initiator.uwb_address,
193                        peer_address=self.responder.uwb_address,
194                    ),
195                )
196            ],
197        ),
198    )
199
200    responder_preference = RangingPreference(
201        device_role=DeviceRole.RESPONDER,
202        ranging_params=RawResponderRangingParams(
203            peer_params=DeviceParams(
204                peer_id=self.initiator.id,
205                uwb_params=uwb.UwbRangingParams(
206                    session_id=UWB_SESSION_ID,
207                    config_id=uwb.ConfigId.UNICAST_DS_TWR,
208                    device_address=self.responder.uwb_address,
209                    peer_address=self.initiator.uwb_address,
210                ),
211            ),
212        ),
213    )
214
215    self._start_mutual_ranging_and_assert_started(
216        SESSION_HANDLE,
217        initiator_preference,
218        responder_preference,
219        TECHNOLOGIES,
220    )
221
222    time.sleep(10)
223
224    asserts.assert_true(
225        self.initiator.verify_received_data_from_peer_using_technologies(
226            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
227        ),
228        "Initiator did not find responder",
229    )
230    asserts.assert_true(
231        self.responder.verify_received_data_from_peer_using_technologies(
232            SESSION_HANDLE,
233            self.initiator.id,
234            TECHNOLOGIES,
235        ),
236        "Responder did not find initiator",
237    )
238
239    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
240    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
241
242  def test_one_to_one_uwb_ranging_provisioned_sts(self):
243    """Verifies uwb ranging with peer device using provisioned sts"""
244    SESSION_HANDLE = str(uuid4())
245    UWB_SESSION_ID = 5
246    TECHNOLOGIES = {RangingTechnology.UWB}
247
248    asserts.skip_if(
249        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
250        f"UWB not supported by responder",
251    )
252    asserts.skip_if(
253      not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
254      f"UWB not supported by initiator",
255  )
256
257    initiator_preference = RangingPreference(
258        device_role=DeviceRole.INITIATOR,
259        ranging_params=RawInitiatorRangingParams(
260            peer_params=[
261                DeviceParams(
262                    peer_id=self.responder.id,
263                    uwb_params=uwb.UwbRangingParams(
264                        session_id=UWB_SESSION_ID,
265                        config_id=uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR,
266                        device_address=self.initiator.uwb_address,
267                        peer_address=self.responder.uwb_address,
268                    ),
269                )
270            ],
271        ),
272    )
273
274    responder_preference = RangingPreference(
275        device_role=DeviceRole.RESPONDER,
276        ranging_params=RawResponderRangingParams(
277            peer_params=DeviceParams(
278                peer_id=self.initiator.id,
279                uwb_params=uwb.UwbRangingParams(
280                    session_id=UWB_SESSION_ID,
281                    config_id=uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR,
282                    device_address=self.responder.uwb_address,
283                    peer_address=self.initiator.uwb_address,
284                ),
285            ),
286        ),
287    )
288
289    self._start_mutual_ranging_and_assert_started(
290        SESSION_HANDLE, initiator_preference, responder_preference, TECHNOLOGIES
291    )
292
293    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
294    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
295
296  def test_one_to_one_uwb_ranging_disable_range_data_ntf(self):
297    """Verifies device does not receive range data after disabling range data notifications"""
298    SESSION_HANDLE = str(uuid4())
299    UWB_SESSION_ID = 5
300    asserts.skip_if(
301        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
302        f"UWB not supported by responder",
303    )
304    asserts.skip_if(
305        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
306        f"UWB not supported by initiator",
307    )
308    initiator_preference = RangingPreference(
309        device_role=DeviceRole.INITIATOR,
310        ranging_params=RawInitiatorRangingParams(
311            peer_params=[
312                DeviceParams(
313                    peer_id=self.responder.id,
314                    uwb_params=uwb.UwbRangingParams(
315                        session_id=UWB_SESSION_ID,
316                        config_id=uwb.ConfigId.UNICAST_DS_TWR,
317                        device_address=self.initiator.uwb_address,
318                        peer_address=self.responder.uwb_address,
319                    ),
320                )
321            ],
322        ),
323        enable_range_data_notifications=False,
324    )
325
326    responder_preference = RangingPreference(
327        device_role=DeviceRole.RESPONDER,
328        ranging_params=RawResponderRangingParams(
329            peer_params=DeviceParams(
330                peer_id=self.initiator.id,
331                uwb_params=uwb.UwbRangingParams(
332                    session_id=UWB_SESSION_ID,
333                    config_id=uwb.ConfigId.UNICAST_DS_TWR,
334                    device_address=self.responder.uwb_address,
335                    peer_address=self.initiator.uwb_address,
336                ),
337            ),
338        ),
339        enable_range_data_notifications=True,
340    )
341
342    self.initiator.start_ranging_and_assert_opened(
343        SESSION_HANDLE, initiator_preference
344    )
345    self.responder.start_ranging_and_assert_opened(
346        SESSION_HANDLE, responder_preference
347    )
348
349    asserts.assert_false(
350        self.initiator.verify_received_data_from_peer(
351            SESSION_HANDLE, self.responder.id
352        ),
353        "Initiator found responder but initiator has range data"
354        " notifications disabled",
355    )
356    asserts.assert_true(
357        self.responder.verify_received_data_from_peer(
358            SESSION_HANDLE, self.initiator.id
359        ),
360        "Responder did not find initiator but responder has range data"
361        " notifications enabled",
362    )
363
364    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
365    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
366
367  def test_one_to_one_rtt_ranging(self):
368    """Verifies uwb ranging with peer device, devices range for 10 seconds."""
369    SESSION_HANDLE = str(uuid4())
370    TECHNOLOGIES = {RangingTechnology.WIFI_RTT}
371
372    asserts.skip_if(
373        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
374        f"Wifi nan rtt not supported by responder",
375    )
376    asserts.skip_if(
377        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
378        f"Wifi nan rtt not supported by initiator",
379    )
380
381    initiator_preference = RangingPreference(
382        device_role=DeviceRole.INITIATOR,
383        ranging_params=RawInitiatorRangingParams(
384            peer_params=[
385                DeviceParams(
386                    peer_id=self.responder.id,
387                    rtt_params=rtt.RttRangingParams(
388                        service_name="test_service_name1",
389                    ),
390                )
391            ],
392        ),
393    )
394
395    responder_preference = RangingPreference(
396        device_role=DeviceRole.RESPONDER,
397        ranging_params=RawResponderRangingParams(
398            peer_params=DeviceParams(
399                peer_id=self.initiator.id,
400                rtt_params=rtt.RttRangingParams(
401                    service_name="test_service_name1",
402                ),
403            ),
404        ),
405    )
406
407    # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data.
408    self.initiator.start_ranging_and_assert_opened(
409        SESSION_HANDLE, initiator_preference
410    )
411    self.responder.start_ranging_and_assert_opened(
412        SESSION_HANDLE, responder_preference
413    )
414
415    time.sleep(10)
416    asserts.assert_true(
417        self.initiator.verify_received_data_from_peer_using_technologies(
418            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
419        ),
420        "Initiator did not find responder",
421    )
422
423    # Enable when this is supported.
424    # asserts.assert_true(
425    #     self.responder.verify_peer_found_with_technologies(
426    #         SESSION_HANDLE,
427    #         self.initiator.id,
428    #         TECHNOLOGIES,
429    #     ),
430    #     "Responder did not find initiator",
431    # )
432
433    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
434    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
435
436  def test_one_to_one_ble_rssi_ranging(self):
437    """Verifies cs ranging with peer device, devices range for 10 seconds."""
438    SESSION_HANDLE = str(uuid4())
439    TECHNOLOGIES = {RangingTechnology.BLE_RSSI}
440
441    asserts.skip_if(
442        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
443        f"BLE RSSI not supported by responder",
444    )
445    asserts.skip_if(
446        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
447        f"BLE RSSI not supported by initiator",
448    )
449    self._reset_bt_state()
450
451    self._ble_connect()
452
453    initiator_preference = RangingPreference(
454        device_role=DeviceRole.INITIATOR,
455        ranging_params=RawInitiatorRangingParams(
456            peer_params=[
457                DeviceParams(
458                    peer_id=self.responder.id,
459                    rssi_params=rssi.BleRssiRangingParams(
460                      peer_address=self.responder.bt_addr,
461                    ),
462                )
463            ],
464        ),
465    )
466
467    responder_preference = RangingPreference(
468        device_role=DeviceRole.RESPONDER,
469        ranging_params=RawResponderRangingParams(
470            peer_params=DeviceParams(
471                peer_id=self.initiator.id,
472                rssi_params=rssi.BleRssiRangingParams(
473                  peer_address=self.initiator.bt_addr,
474                ),
475            ),
476        ),
477    )
478
479    try:
480      self._start_mutual_ranging_and_assert_started(
481          SESSION_HANDLE,
482          initiator_preference,
483          responder_preference,
484          TECHNOLOGIES,
485      )
486
487      time.sleep(10)
488
489      asserts.assert_true(
490          self.initiator.verify_received_data_from_peer_using_technologies(
491              SESSION_HANDLE,
492              self.responder.id,
493              TECHNOLOGIES
494          ),
495          "Initiator did not find responder",
496      )
497      asserts.assert_true(
498          self.responder.verify_received_data_from_peer_using_technologies(
499              SESSION_HANDLE,
500              self.initiator.id,
501              TECHNOLOGIES,
502          ),
503          "Responder did not find initiator",
504      )
505    finally:
506      self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
507      self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
508
509      self._ble_disconnect()
510
511  def test_one_to_one_ble_cs_ranging(self):
512    """
513    Verifies cs ranging with peer device, devices range for 10 seconds.
514    This test is only one way since we don't test if responder also can simultaneously get the data.
515    """
516    SESSION_HANDLE = str(uuid4())
517    TECHNOLOGIES = {RangingTechnology.BLE_CS}
518
519    asserts.skip_if(
520        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
521        f"BLE_CS not supported by responder",
522    )
523    asserts.skip_if(
524        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS),
525        f"BLE CS not supported by initiator",
526    )
527    self._reset_bt_state()
528
529    self._ble_bond()
530
531    initiator_preference = RangingPreference(
532        device_role=DeviceRole.INITIATOR,
533        ranging_params=RawInitiatorRangingParams(
534            peer_params=[
535                DeviceParams(
536                    peer_id=self.responder.id,
537                    cs_params=cs.CsRangingParams(
538                      peer_address=self.responder.bt_addr,
539                    ),
540                )
541            ],
542        ),
543    )
544
545    try:
546      self._start_mutual_ranging_and_assert_started(
547          SESSION_HANDLE,
548          initiator_preference,
549          None,
550          TECHNOLOGIES,
551      )
552
553      time.sleep(10)
554
555      asserts.assert_true(
556          self.initiator.verify_received_data_from_peer_using_technologies(
557              SESSION_HANDLE,
558              self.responder.id,
559              TECHNOLOGIES
560          ),
561          "Initiator did not find responder",
562      )
563    finally:
564      self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
565
566      self._ble_unbond()
567
if __name__ == "__main__":
  # When a "--" separator is present, keep only the program name and the
  # arguments that follow the separator.
  try:
    separator = sys.argv.index("--")
  except ValueError:
    pass
  else:
    sys.argv = sys.argv[:1] + sys.argv[separator + 1:]
  suite_runner.run_suite([RangingManagerTest])
573