1import copy
2import dataclasses
3from enum import IntEnum, StrEnum
4from typing import Set
5from uuid import uuid4
6from lib.params import RangingPreference
7from mobly.controllers import android_device
8
9
10CALLBACK_WAIT_TIME_SEC = 5
11
12
13class RangingTechnology(IntEnum):
14  UWB = 0
15  BLE_CS = 1
16  WIFI_RTT = 2
17  BLE_RSSI = 3
18
19
20class Event(StrEnum):
21  OPENED = "OPENED"
22  OPEN_FAILED = "OPEN_FAILED"
23  STARTED = "STARTED"
24  DATA = "DATA"
25  STOPPED = "STOPPED"
26  CLOSED = "CLOSED"
27
28
29class RangingDecorator:
30
31  def __init__(self, ad: android_device.AndroidDevice):
32    """Initialize the ranging device.
33
34    Args:
35        ad: android device object
36    """
37    self.id = str(uuid4())
38    self.ad = ad
39    self._event_handlers = {}
40    self.log = self.ad.log
41    self.uwb_address = None
42    self.bt_addr = None
43
44  def start_ranging_and_assert_opened(
45      self, session_handle: str, preference: RangingPreference
46  ):
47    """Start ranging with the specified preference and wait for onStarted event.
48
49    Throws:
50      CallbackHandlerTimeoutError if ranging does not successfully start.
51    """
52    handler = self.ad.ranging.startRanging(
53        session_handle, dataclasses.asdict(preference)
54    )
55    self._event_handlers[session_handle] = handler
56    self.assert_ranging_event_received(session_handle, Event.OPENED)
57
58  def is_ranging_technology_supported(self, ranging_technology : RangingTechnology) -> bool:
59
60    """Checks whether a specific ranging technology is supported by the device"""
61    return self.ad.ranging.isTechnologySupported(ranging_technology)
62
63
64  def stop_ranging_and_assert_closed(self, session_handle: str):
65    """Stop ranging and wait for onStopped event.
66
67    Throws:
68      CallbackHandlerTimeoutError if ranging was not successfully stopped.
69    """
70    self.ad.ranging.stopRanging(session_handle)
71    self.assert_ranging_event_received(session_handle, Event.CLOSED)
72    self._event_handlers.pop(session_handle)
73
74  def assert_ranging_event_received(
75      self,
76      session_handle: str,
77      event: Event,
78      timeout_s: int = CALLBACK_WAIT_TIME_SEC,
79  ):
80    """Asserts that the expected event is received before a timeout.
81
82    Args:
83      session_handle: identifier for the ranging session.
84      event: expected ranging event.
85      timeout_s: timeout in seconds.
86
87    Throws:
88      CallbackHandlerTimeoutError if the expected event was not received.
89    """
90    handler = self._event_handlers[session_handle]
91    handler.waitAndGet(event, timeout=timeout_s)
92
93  def verify_received_data_from_peer_using_technologies(
94      self,
95      session_handle: str,
96      peer_id: str,
97      technologies: Set[RangingTechnology],
98      timeout_s: int = CALLBACK_WAIT_TIME_SEC,
99  ) -> bool:
100    """Verifies that the peer sends us data from all provided technologies before a timeout.
101
102    Args:
103      session_handle: unique identifier for the ranging session.
104      peer_id: UUID of the peer.
105      technologies: set of ranging technologies over which we want to receive
106        data.
107      timeout_s: timeout in seconds.
108
109    Returns:
110        True if peer was found, False otherwise
111    """
112    remaining_technologies = copy.deepcopy(technologies)
113
114    def predicate(event):
115      peer = event.data["peer"]
116      technology = event.data["technology"]
117
118      if peer == peer_id and technology in copy.deepcopy(
119          remaining_technologies
120      ):
121        remaining_technologies.remove(technology)
122
123      return not bool(remaining_technologies)
124
125    handler = self._event_handlers[session_handle]
126    try:
127      handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s)
128      return True
129    except Exception:
130      return False
131
132  def verify_received_data_from_peer(
133      self,
134      session_handle: str,
135      peer_id: str,
136      timeout_s: int = CALLBACK_WAIT_TIME_SEC,
137  ) -> bool:
138    """Verifies that the peer sends us data using any technology before a timeout.
139
140    Args:
141      session_handle: unique identifier for the ranging session.
142      peer_id: UUID of the peer.
143      timeout_s: timeout in seconds.
144
145    Returns:
146        True if peer received data, False otherwise
147    """
148
149    def predicate(event):
150      return event.data["peer"] == peer_id
151
152    handler = self._event_handlers[session_handle]
153    try:
154      handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s)
155      return True
156    except Exception:
157      return False
158
159  def clear_ranging_sessions(self):
160    """Stop all ranging sessions and clear callback events"""
161    for session_handle in self._event_handlers.keys():
162      self.ad.ranging.stopRanging(session_handle)
163
164    self._event_handlers.clear()
165