1"""Test utils for UWB."""
2
3import logging
4import random
5import time
6from typing import List, Optional
7from lib import uwb_ranging_decorator
8from mobly import asserts
9from mobly.controllers import android_device
10from mobly.controllers.android_device_lib import adb
11from mobly.controllers.android_device_lib import callback_handler_v2
12
# Default wait budget, in seconds, used by the polling helpers in this module
# (default `timeout` for callback/peer checks and the airplane-mode limit).
WAIT_TIME_SEC = 3
14
15
def verify_uwb_state_callback(
    ad: android_device.AndroidDevice,
    uwb_event: str,
    handler: Optional[callback_handler_v2.CallbackHandlerV2] = None,
    timeout: int = WAIT_TIME_SEC,
) -> bool:
  """Verifies expected UWB callback is received.

  Polls the handler's "UwbAdapterStateCallback" events about every 0.1s until
  one of them carries the expected state or the timeout elapses. When no
  handler is supplied, a temporary adapter state callback is registered on
  `ad` for the duration of the wait and is always unregistered before this
  function exits, even if polling raises.

  Args:
    ad: android device object.
    uwb_event: expected callback event.
    handler: callback handler. If None, a temporary one is registered.
    timeout: timeout for callback event, in seconds.

  Returns:
    True if expected callback is received, False if not.
  """
  callback_status = False
  callback_key = None
  start_time = time.time()
  if handler is None:
    callback_key = "uwb_state_%s" % random.randint(1, 100)
    handler = ad.uwb.registerUwbAdapterStateCallback(callback_key)
  try:
    # wait until expected callback is received.
    while time.time() - start_time < timeout and not callback_status:
      time.sleep(0.1)
      for event in handler.getAll("UwbAdapterStateCallback"):
        event_received = event.data["uwbAdapterStateEvent"]
        logging.debug("Received event - %s", event_received)
        if event_received == uwb_event:
          logging.debug("Received the '%s' callback in %ss", uwb_event,
                        round(time.time() - start_time, 2))
          callback_status = True
          break
  finally:
    # Only callbacks registered here are released; a caller-provided handler
    # stays registered. Doing this in `finally` prevents leaking the
    # registration when polling raises (e.g. an RPC error).
    if callback_key is not None:
      ad.uwb.unregisterUwbAdapterStateCallback(callback_key)
  return callback_status
54
55
def get_uwb_state(ad: android_device.AndroidDevice) -> bool:
  """Reports whether UWB is currently enabled on the device.

  Devices whose build id starts with "S" are queried through
  getAdapterState(), with the result coerced to bool. All other builds are
  queried through isUwbEnabled().

  Args:
    ad: android device object.

  Returns:
    UWB state, True if enabled, False if not.
  """
  is_s_build = ad.build_info["build_id"].startswith("S")
  if not is_s_build:
    return ad.uwb.isUwbEnabled()
  return bool(ad.uwb.getAdapterState())
70
71
def set_uwb_state_and_verify(
    ad: android_device.AndroidDevice,
    state: bool,
    handler: Optional[callback_handler_v2.CallbackHandlerV2] = None,
):
  """Toggles UWB and asserts that the matching state callback arrives.

  Enabling waits for the "Inactive" adapter state event; disabling waits for
  the "Disabled" event. The test is failed through asserts.assert_true when
  the expected event is not observed.

  Args:
    ad: android device object.
    state: bool, True for UWB on, False for off.
    handler: callback_handler.
  """
  if state:
    expected_event, state_desc = "Inactive", "enabled"
  else:
    expected_event, state_desc = "Disabled", "disabled"
  ad.uwb.setUwbEnabled(state)
  callback_seen = verify_uwb_state_callback(ad, expected_event, handler)
  asserts.assert_true(callback_seen, "Uwb is not %s" % state_desc)
89
90
def verify_peer_found(ranging_dut: uwb_ranging_decorator.UwbRangingDecorator,
                      peer_addr: List[int], session: int = 0,
                      timeout: int = WAIT_TIME_SEC):
  """Waits until the ranging device reports the given UWB peer.

  Repeatedly calls ranging_dut.is_uwb_peer_found(peer_addr, session) until it
  returns a truthy value. If the peer is still missing once more than
  `timeout` seconds have elapsed, the test is failed through asserts.fail.

  Args:
    ranging_dut: uwb ranging device.
    peer_addr: uwb peer device address.
    session: session id.
    timeout: timeout for peer to be found.
  """
  search_msg = "Look for peer: %s" % peer_addr
  ranging_dut.ad.log.info(search_msg)
  began_at = time.time()
  while True:
    if ranging_dut.is_uwb_peer_found(peer_addr, session):
      break
    # Peer not reported yet: give up once the time budget is exceeded.
    if time.time() - began_at > timeout:
      asserts.fail("UWB peer with address %s not found" % peer_addr)
  elapsed_sec = round(time.time() - began_at, 2)
  logging.info("Peer %s found in %s seconds", peer_addr, elapsed_sec)
109
110
def set_airplane_mode(ad: android_device.AndroidDevice, state: bool):
  """Switches airplane mode and waits for the setting to take effect.

  After requesting the change, the current airplane mode is re-read every
  0.5s. If it still differs from `state` after more than WAIT_TIME_SEC
  seconds, the test is failed through asserts.fail.

  Args:
    ad: android device object.
    state: bool, True for Airplane mode on, False for off.
  """
  ad.uwb.setAirplaneMode(state)
  requested_at = time.time()
  while True:
    if get_airplane_mode(ad) == state:
      return
    time.sleep(0.5)
    waited = time.time() - requested_at
    if waited > WAIT_TIME_SEC:
      asserts.fail("Failed to set airplane mode to: %s" % state)
124
125
def get_airplane_mode(ad: android_device.AndroidDevice) -> bool:
  """Reads the airplane mode setting from the device.

  Runs `settings get global airplane_mode_on` over adb and interprets the
  decoded, whitespace-stripped output as an integer flag.

  Args:
    ad: android device object.

  Returns:
    True if airplane mode On, False for Off.
  """
  raw_output = ad.adb.shell(["settings", "get", "global", "airplane_mode_on"])
  setting_text = raw_output.decode().strip()
  return int(setting_text) != 0
137
138
def set_screen_rotation(ad: android_device.AndroidDevice, val: int):
  """Sets screen orientation to landscape or portrait mode.

  Writes `accelerometer_rotation` = 0 and then `user_rotation` = `val` to the
  system settings over adb.

  Args:
    ad: android device object.
    val: 0 (or False) for portrait, 1 (or True) for landscape mode.
  """
  # Normalise through int() so a bool argument is written as "0"/"1" rather
  # than the literal "False"/"True", which is not a valid rotation value.
  rotation = str(int(val))
  ad.adb.shell(["settings", "put", "system", "accelerometer_rotation", "0"])
  ad.adb.shell(["settings", "put", "system", "user_rotation", rotation])
148
149
def initialize_uwb_country_code_if_not_set(
    ad: android_device.AndroidDevice,
    handler: Optional[callback_handler_v2.CallbackHandlerV2] = None,
):
  """Sets UWB country code to US if the device does not have it set.

  Waits up to 120s for the "Inactive" adapter state event. If it does not
  arrive, forces the country code to US through a shell command and waits up
  to another 120s; the test is failed if the event is still not received.

  Note: This intentionally relies on an unstable API (shell command) since we
  don't want to expose an API that allows users to circumvent the UWB
  regulatory requirements.

  Args:
    ad: android device object.
    handler: callback handler.
  """
  # Wait to see if UWB state is reported as enabled. If not, this could be
  # because the country code is not set. Try forcing the country code in that
  # case.
  state = verify_uwb_state_callback(
      ad=ad, uwb_event="Inactive", handler=handler, timeout=120
  )

  # Country code already available, nothing to do.
  if state:
    return
  try:
    ad.adb.shell(["cmd", "uwb", "force-country-code", "enabled", "US"])
  except adb.AdbError:
    # Best effort: the check below decides whether the test can continue.
    logging.warning("Unable to force country code")

  # Unable to get UWB enabled even after setting country code, abort!
  # asserts.fail() takes (msg, extras) and fails unconditionally, so the
  # condition must go through assert_true to let the success path continue.
  asserts.assert_true(
      verify_uwb_state_callback(
          ad=ad, uwb_event="Inactive", handler=handler, timeout=120
      ),
      "Uwb is not enabled",
  )
186