1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import enum 18import grpc 19import itertools 20import logging 21import random 22 23from avatar import BumblePandoraDevice 24from avatar import PandoraDevice 25from avatar import PandoraDevices 26from mobly import base_test 27from mobly import test_runner 28from mobly.asserts import assert_equal # type: ignore 29from mobly.asserts import assert_false # type: ignore 30from mobly.asserts import assert_is_not_none # type: ignore 31from mobly.asserts import assert_true # type: ignore 32from mobly.asserts import explicit_pass # type: ignore 33from pandora.host_pb2 import PUBLIC 34from pandora.host_pb2 import RANDOM 35from pandora.host_pb2 import Connection 36from pandora.host_pb2 import DataTypes 37from pandora.host_pb2 import OwnAddressType 38from typing import Any, Dict, Literal, Optional, Union 39 40 41class AdvertisingEventProperties(enum.IntEnum): 42 ADV_IND = 0x13 43 ADV_DIRECT_IND = 0x15 44 ADV_SCAN_IND = 0x12 45 ADV_NONCONN_IND = 0x10 46 47 CONNECTABLE = 0x01 48 SCANNABLE = 0x02 49 DIRECTED = 0x04 50 LEGACY = 0x10 51 ANONYMOUS = 0x20 52 53 54class LeHostTest(base_test.BaseTestClass): # type: ignore[misc] 55 devices: Optional[PandoraDevices] = None 56 57 # pandora devices. 58 dut: PandoraDevice 59 ref: PandoraDevice 60 61 def setup_class(self) -> None: 62 self.devices = PandoraDevices(self) 63 self.dut, self.ref, *_ = self.devices 64 65 # Enable BR/EDR mode for Bumble devices. 66 for device in self.devices: 67 if isinstance(device, BumblePandoraDevice): 68 device.config.setdefault('classic_enabled', True) 69 70 def teardown_class(self) -> None: 71 if self.devices: 72 self.devices.stop_all() 73 74 @avatar.asynchronous 75 async def setup_test(self) -> None: # pytype: disable=wrong-arg-types 76 await asyncio.gather(self.dut.reset(), self.ref.reset()) 77 78 @avatar.parameterized( 79 *itertools.product( 80 ('connectable', 'non_connectable'), 81 ('scannable', 'non_scannable'), 82 ('directed', 'undirected'), 83 (0, 31), 84 ) 85 ) # type: ignore[misc] 86 def test_scan( 87 self, 88 connectable: Union[Literal['connectable'], Literal['non_connectable']], 89 scannable: Union[Literal['scannable'], Literal['non_scannable']], 90 directed: Union[Literal['directed'], Literal['undirected']], 91 data_len: int, 92 ) -> None: 93 ''' 94 Advertise from the REF device with the specified legacy advertising 95 event properties. Use the manufacturer specific data to pad the advertising data to the 96 desired length. The scan response data must always be provided when 97 scannable but it is defaulted. 98 ''' 99 man_specific_data_length = max(0, data_len - 5) # Flags (3) + LV (2) 100 man_specific_data = bytes([random.randint(1, 255) for _ in range(man_specific_data_length)]) 101 data = DataTypes(manufacturer_specific_data=man_specific_data) if data_len > 0 else None 102 103 is_connectable = True if connectable == 'connectable' else False 104 scan_response_data = DataTypes() if scannable == 'scannable' else None 105 target = self.dut.address if directed == 'directed' else None 106 107 advertise = self.ref.host.Advertise( 108 legacy=True, 109 connectable=is_connectable, 110 data=data, # type: ignore[arg-type] 111 scan_response_data=scan_response_data, # type: ignore[arg-type] 112 public=target, 113 own_address_type=PUBLIC, 114 ) 115 116 scan = self.dut.host.Scan(legacy=False, passive=False, timeout=5.0) 117 report = next((x for x in scan if x.public == self.ref.address)) 118 try: 119 report = next((x for x in scan if x.public == self.ref.address)) 120 121 # TODO: scannable is not set by the android server 122 # TODO: direct_address is not set by the android server 123 assert_true(report.legacy, msg='expected legacy advertising report') 124 assert_equal(report.connectable, is_connectable or directed == 'directed') 125 assert_equal( 126 report.data.manufacturer_specific_data, man_specific_data if directed == 'undirected' else b'' 127 ) 128 assert_false(report.truncated, msg='expected non-truncated advertising report') 129 except grpc.aio.AioRpcError as e: 130 if ( 131 e.code() == grpc.StatusCode.DEADLINE_EXCEEDED 132 and scannable == 'non_scannable' 133 and directed == 'undirected' 134 ): 135 explicit_pass('') 136 raise e 137 finally: 138 scan.cancel() 139 advertise.cancel() 140 141 @avatar.parameterized( 142 (dict(incomplete_service_class_uuids16=["183A", "181F"]),), 143 (dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),), 144 (dict(incomplete_service_class_uuids128=["FFFF181F-FFFF-1000-8000-00805F9B34FB"]),), 145 (dict(shortened_local_name="avatar"),), 146 (dict(complete_local_name="avatar_the_last_test_blender"),), 147 (dict(tx_power_level=20),), 148 (dict(class_of_device=0x40680),), 149 (dict(peripheral_connection_interval_min=0x0006, peripheral_connection_interval_max=0x0C80),), 150 (dict(service_solicitation_uuids16=["183A", "181F"]),), 151 (dict(service_solicitation_uuids32=["FFFF183A", "FFFF181F"]),), 152 (dict(service_solicitation_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),), 153 (dict(service_data_uuid16={"183A": bytes([1, 2, 3, 4])}),), 154 (dict(service_data_uuid32={"FFFF183A": bytes([1, 2, 3, 4])}),), 155 (dict(service_data_uuid128={"FFFF181F-FFFF-1000-8000-00805F9B34FB": bytes([1, 2, 3, 4])}),), 156 (dict(appearance=0x0591),), 157 (dict(advertising_interval=0x1000),), 158 (dict(uri="https://www.google.com"),), 159 (dict(le_supported_features=bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x10, 0x9F])),), 160 (dict(manufacturer_specific_data=bytes([0, 1, 2, 3, 4])),), 161 ) # type: ignore[misc] 162 def test_scan_response_data(self, data: Dict[str, Any]) -> None: 163 ''' 164 Advertise from the REF device with the specified advertising data. 165 Validate that the REF generates the correct advertising data, 166 and that the dut presents the correct advertising data in the scan 167 result. 168 ''' 169 advertise = self.ref.host.Advertise( 170 legacy=True, 171 connectable=True, 172 data=DataTypes(**data), 173 own_address_type=PUBLIC, 174 ) 175 176 scan = self.dut.host.Scan(legacy=False, passive=False) 177 report = next((x for x in scan if x.public == self.ref.address)) 178 179 scan.cancel() 180 advertise.cancel() 181 182 assert_true(report.legacy, msg='expected legacy advertising report') 183 assert_equal(report.connectable, True) 184 for key, value in data.items(): 185 assert_equal(getattr(report.data, key), value) # type: ignore[misc] 186 assert_false(report.truncated, msg='expected non-truncated advertising report') 187 188 @avatar.parameterized( 189 (RANDOM,), 190 (PUBLIC,), 191 ) # type: ignore[misc] 192 @avatar.asynchronous 193 async def test_connect(self, ref_address_type: OwnAddressType) -> None: 194 advertise = self.ref.aio.host.Advertise( 195 legacy=True, 196 connectable=True, 197 own_address_type=ref_address_type, 198 data=DataTypes(manufacturer_specific_data=b'pause cafe'), 199 ) 200 201 scan = self.dut.aio.host.Scan(own_address_type=RANDOM) 202 ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe')) 203 scan.cancel() 204 205 ref_dut_res, dut_ref_res = await asyncio.gather( 206 anext(aiter(advertise)), 207 self.dut.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=RANDOM), 208 ) 209 assert_equal(dut_ref_res.result_variant(), 'connection') 210 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 211 assert_is_not_none(dut_ref) 212 assert dut_ref 213 advertise.cancel() 214 assert_true(await self.is_connected(self.ref, ref_dut), "") 215 216 @avatar.parameterized( 217 (RANDOM,), 218 (PUBLIC,), 219 ) # type: ignore[misc] 220 @avatar.asynchronous 221 async def test_disconnect(self, ref_address_type: OwnAddressType) -> None: 222 advertise = self.ref.aio.host.Advertise( 223 legacy=True, 224 connectable=True, 225 own_address_type=ref_address_type, 226 data=DataTypes(manufacturer_specific_data=b'pause cafe'), 227 ) 228 229 scan = self.dut.aio.host.Scan(own_address_type=RANDOM) 230 ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe')) 231 scan.cancel() 232 233 ref_dut_res, dut_ref_res = await asyncio.gather( 234 anext(aiter(advertise)), 235 self.dut.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=RANDOM), 236 ) 237 assert_equal(dut_ref_res.result_variant(), 'connection') 238 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 239 assert_is_not_none(dut_ref) 240 assert dut_ref 241 advertise.cancel() 242 assert_true(await self.is_connected(self.ref, ref_dut), "") 243 await self.dut.aio.host.Disconnect(connection=dut_ref) 244 assert_false(await self.is_connected(self.ref, ref_dut), "") 245 246 async def is_connected(self, device: PandoraDevice, connection: Connection) -> bool: 247 try: 248 await device.aio.host.WaitDisconnection(connection=connection, timeout=5) 249 return False 250 except grpc.RpcError as e: 251 assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED) # type: ignore 252 return True 253 254 255if __name__ == '__main__': 256 logging.basicConfig(level=logging.DEBUG) 257 test_runner.main() # type: ignore 258