import copy
import dataclasses
from enum import IntEnum, StrEnum
from typing import Set
from uuid import uuid4

from lib.params import RangingPreference
from mobly.controllers import android_device


# Default time, in seconds, to wait for a ranging callback event.
CALLBACK_WAIT_TIME_SEC = 5


class RangingTechnology(IntEnum):
    """Integer identifiers for the ranging technologies used by the tests."""

    UWB = 0
    BLE_CS = 1
    WIFI_RTT = 2
    BLE_RSSI = 3


class Event(StrEnum):
    """Names of the ranging session callback events the tests wait on."""

    OPENED = "OPENED"
    OPEN_FAILED = "OPEN_FAILED"
    STARTED = "STARTED"
    DATA = "DATA"
    STOPPED = "STOPPED"
    CLOSED = "CLOSED"


class RangingDecorator:
    """Wraps an Android device and tracks its ranging sessions.

    Each started session's callback handler is stored under its session
    handle so later calls can wait for events belonging to that session.
    """

    def __init__(self, ad: android_device.AndroidDevice):
        """Initialize the ranging device.

        Args:
            ad: android device object
        """
        self.id = str(uuid4())
        self.ad = ad
        # Maps session handle -> callback handler returned by startRanging.
        self._event_handlers = {}
        self.log = self.ad.log
        # Addresses are not assigned by this class; they start out unset.
        self.uwb_address = None
        self.bt_addr = None

    def start_ranging_and_assert_opened(
        self, session_handle: str, preference: RangingPreference
    ) -> None:
        """Start ranging with the given preference and wait for OPENED.

        The handler returned by startRanging is registered under
        `session_handle` before waiting, so it stays registered even if the
        OPENED event never arrives.

        Args:
            session_handle: identifier for the ranging session.
            preference: ranging preference; sent to the device as a dict via
                dataclasses.asdict.

        Raises:
            CallbackHandlerTimeoutError if ranging does not successfully
            open.
        """
        handler = self.ad.ranging.startRanging(
            session_handle, dataclasses.asdict(preference)
        )
        self._event_handlers[session_handle] = handler
        self.assert_ranging_event_received(session_handle, Event.OPENED)

    def is_ranging_technology_supported(
        self, ranging_technology: RangingTechnology
    ) -> bool:
        """Checks whether a specific ranging technology is supported by the device."""
        return self.ad.ranging.isTechnologySupported(ranging_technology)

    def stop_ranging_and_assert_closed(self, session_handle: str) -> None:
        """Stop ranging and wait for the CLOSED event.

        The session's handler is only forgotten once CLOSED was received; on
        failure it stays registered (and is still covered by
        clear_ranging_sessions).

        Args:
            session_handle: identifier for the ranging session.

        Raises:
            CallbackHandlerTimeoutError if ranging was not successfully
            closed.
            KeyError if no handler is registered for `session_handle`.
        """
        self.ad.ranging.stopRanging(session_handle)
        self.assert_ranging_event_received(session_handle, Event.CLOSED)
        self._event_handlers.pop(session_handle)

    def assert_ranging_event_received(
        self,
        session_handle: str,
        event: Event,
        timeout_s: int = CALLBACK_WAIT_TIME_SEC,
    ) -> None:
        """Asserts that the expected event is received before a timeout.

        Args:
            session_handle: identifier for the ranging session.
            event: expected ranging event.
            timeout_s: timeout in seconds.

        Raises:
            CallbackHandlerTimeoutError if the expected event was not
            received.
            KeyError if no handler is registered for `session_handle`.
        """
        handler = self._event_handlers[session_handle]
        handler.waitAndGet(event, timeout=timeout_s)

    def _wait_for_data_event(
        self, session_handle: str, predicate, timeout_s: int
    ) -> bool:
        """Wait for a DATA event accepted by `predicate` on a session.

        Args:
            session_handle: identifier for the ranging session.
            predicate: callable taking an event and returning a truthy value
                once the wait is satisfied.
            timeout_s: timeout in seconds.

        Returns:
            True if waitForEvent returned normally, False if it raised.

        Raises:
            KeyError if no handler is registered for `session_handle`.
        """
        # Looked up outside the try block so an unknown session handle is
        # reported as an error instead of being folded into a False result.
        handler = self._event_handlers[session_handle]
        try:
            handler.waitForEvent(Event.DATA, predicate, timeout=timeout_s)
        except Exception as e:  # Best effort: any failure means "not verified".
            # Logged so that a faulty predicate (e.g. a missing event field)
            # is distinguishable from a plain timeout when debugging.
            self.log.debug(
                "No matching %s event for session %s: %r",
                Event.DATA,
                session_handle,
                e,
            )
            return False
        return True

    def verify_received_data_from_peer_using_technologies(
        self,
        session_handle: str,
        peer_id: str,
        technologies: Set[RangingTechnology],
        timeout_s: int = CALLBACK_WAIT_TIME_SEC,
    ) -> bool:
        """Verifies that the peer sends us data from all provided technologies before a timeout.

        Note that the timeout applies to a single wait covering all
        technologies. If `technologies` is empty, the first DATA event seen
        (from any peer) satisfies the wait.

        Args:
            session_handle: unique identifier for the ranging session.
            peer_id: UUID of the peer.
            technologies: set of ranging technologies over which we want to
                receive data. The caller's collection is not modified.
            timeout_s: timeout in seconds.

        Returns:
            True if data from the peer was seen for every requested
            technology, False otherwise.
        """
        # Work on a private copy so the caller's collection stays intact. A
        # shallow copy is sufficient because only the container is mutated.
        remaining_technologies = copy.copy(technologies)

        def predicate(event):
            peer = event.data["peer"]
            technology = event.data["technology"]

            # Tick off each requested technology the first time the peer
            # reports data over it.
            if peer == peer_id and technology in remaining_technologies:
                remaining_technologies.remove(technology)

            # Satisfied once nothing is left to wait for.
            return not remaining_technologies

        return self._wait_for_data_event(session_handle, predicate, timeout_s)

    def verify_received_data_from_peer(
        self,
        session_handle: str,
        peer_id: str,
        timeout_s: int = CALLBACK_WAIT_TIME_SEC,
    ) -> bool:
        """Verifies that the peer sends us data using any technology before a timeout.

        Args:
            session_handle: unique identifier for the ranging session.
            peer_id: UUID of the peer.
            timeout_s: timeout in seconds.

        Returns:
            True if data from the peer was received, False otherwise.
        """

        def predicate(event):
            return event.data["peer"] == peer_id

        return self._wait_for_data_event(session_handle, predicate, timeout_s)

    def clear_ranging_sessions(self) -> None:
        """Stop all ranging sessions and clear callback events."""
        for session_handle in self._event_handlers:
            self.ad.ranging.stopRanging(session_handle)

        self._event_handlers.clear()