1# Copyright (C) 2024 The Android Open Source Project 2# Licensed under the Apache License, Version 2.0 (the "License"); 3# you may not use this file except in compliance with the License. 4# You may obtain a copy of the License at 5 6# http://www.apache.org/licenses/LICENSE-2.0 7 8# Unless required by applicable law or agreed to in writing, software 9# distributed under the License is distributed on an "AS IS" BASIS, 10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11# See the License for the specific language governing permissions and 12# limitations under the License. 13 14import asyncio 15 16from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous 17from bumble.gatt import GATT_HEARING_ACCESS_SERVICE, GATT_AUDIO_STREAM_CONTROL_SERVICE, GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE 18from bumble.profiles import hap 19from bumble.profiles.hap import DynamicPresets, HearingAccessService, HearingAidFeatures, HearingAidType, IndependentPresets, PresetChangedOperation, PresetChangedOperationAvailable, PresetRecord, PresetSynchronizationSupport, WritablePresetsSupport 20 21from pandora_experimental.os_grpc_aio import Os as OsAio 22from pandora_experimental.gatt_grpc_aio import GATT 23from pandora_experimental.hap_grpc_aio import HAP 24from pandora_experimental.hap_pb2 import PresetRecord as grpcPresetRecord # type: ignore 25from pandora._utils import AioStream 26from pandora.security_pb2 import LE_LEVEL3 27from pandora.host_pb2 import RANDOM, AdvertiseResponse, Connection, DataTypes, ScanningResponse 28from mobly import base_test, signals 29from truth.truth import AssertThat # type: ignore 30from typing import List, Tuple 31 32COMPLETE_LOCAL_NAME: str = "Bumble" 33HAP_UUID = GATT_HEARING_ACCESS_SERVICE.to_hex_str('-') 34ASCS_UUID = GATT_AUDIO_STREAM_CONTROL_SERVICE.to_hex_str('-') 35PACS_UUID = GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE.to_hex_str('-') 36 37long_name = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." 38foo_preset = PresetRecord(1, "foo preset") 39bar_preset = PresetRecord(50, "bar preset") 40longname_preset = PresetRecord(5, f'[{long_name[:38]}]') 41unavailable_preset = PresetRecord( 42 7, "unavailable preset", 43 PresetRecord.Property(PresetRecord.Property.Writable.CANNOT_BE_WRITTEN, 44 PresetRecord.Property.IsAvailable.IS_UNAVAILABLE)) 45 46 47def toBumblePreset(grpc_preset: grpcPresetRecord) -> PresetRecord: 48 return PresetRecord( 49 grpc_preset.index, 50 grpc_preset.name, # type: ignore 51 PresetRecord.Property( 52 PresetRecord.Property.Writable(grpc_preset.isWritable), # type: ignore 53 PresetRecord.Property.IsAvailable(grpc_preset.isAvailable))) # type: ignore 54 55 56def toBumblePresetList(grpc_preset_list: List[grpcPresetRecord]) -> List[PresetRecord]: # type: ignore 57 return [toBumblePreset(grpc_preset) for grpc_preset in grpc_preset_list] # type: ignore 58 59 60def get_server_preset_sorted(has: HearingAccessService) -> List[PresetRecord]: 61 return [has.preset_records[key] for key in sorted(has.preset_records.keys())] 62 63 64class HapTest(base_test.BaseTestClass): 65 devices: PandoraDevices 66 dut: PandoraDevice 67 ref_left: BumblePandoraDevice 68 hap_grpc: HAP 69 has: HearingAccessService 70 71 def setup_class(self): 72 self.devices = PandoraDevices(self) 73 dut, ref_left, *_ = self.devices 74 75 if isinstance(dut, BumblePandoraDevice): 76 raise signals.TestAbortClass('DUT Bumble does not support HAP') 77 self.dut = dut 78 if not isinstance(ref_left, BumblePandoraDevice): 79 raise signals.TestAbortClass('Test require Bumble as reference device(s)') 80 self.ref_left = ref_left 81 82 def teardown_class(self): 83 self.devices.stop_all() 84 85 @asynchronous 86 async def setup_test(self) -> None: 87 await asyncio.gather(self.dut.reset(), self.ref_left.reset()) 88 self.logcat = OsAio(channel=self.dut.aio.channel) 89 await self.logcat.Log("setup test") 90 self.hap_grpc = HAP(channel=self.dut.aio.channel) 91 device_features = HearingAidFeatures(HearingAidType.MONAURAL_HEARING_AID, 92 PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED, 93 IndependentPresets.IDENTICAL_PRESET_RECORD, 94 DynamicPresets.PRESET_RECORDS_DOES_NOT_CHANGE, 95 WritablePresetsSupport.WRITABLE_PRESET_RECORDS_SUPPORTED) 96 self.has = HearingAccessService(self.ref_left.device, device_features, 97 [foo_preset, bar_preset, longname_preset, unavailable_preset]) 98 self.dut_gatt = GATT(channel=self.dut.aio.channel) 99 100 self.ref_left.device.add_service(self.has) # type: ignore 101 102 async def advertise_hap(self, device: PandoraDevice) -> AioStream[AdvertiseResponse]: 103 return device.aio.host.Advertise( 104 legacy=True, 105 connectable=True, 106 own_address_type=RANDOM, 107 data=DataTypes( 108 complete_local_name=COMPLETE_LOCAL_NAME, 109 incomplete_service_class_uuids16=[HAP_UUID], 110 ), 111 ) 112 113 async def dut_scan_for_hap(self) -> ScanningResponse: 114 """ 115 DUT starts to scan for the Ref device. 116 :return: ScanningResponse for ASHA 117 """ 118 dut_scan = self.dut.aio.host.Scan(RANDOM) # type: ignore 119 scan_response = await anext((x async for x in dut_scan if HAP_UUID in x.data.incomplete_service_class_uuids16)) 120 dut_scan.cancel() 121 return scan_response 122 123 async def dut_connect_to_ref(self, advertisement: AioStream[AdvertiseResponse], 124 ref: ScanningResponse) -> Tuple[Connection, Connection]: 125 """ 126 Helper method for Dut connects to Ref 127 :return: a Tuple (DUT to REF connection, REF to DUT connection) 128 """ 129 (dut_ref_res, ref_dut_res) = await asyncio.gather( 130 self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()), 131 anext(aiter(advertisement)), 132 ) 133 AssertThat(dut_ref_res.result_variant()).IsEqualTo('connection') # type: ignore 134 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 135 AssertThat(dut_ref).IsNotNone() # type: ignore 136 assert dut_ref 137 advertisement.cancel() 138 return dut_ref, ref_dut 139 140 async def setupHapConnection(self): 141 advertisement = await self.advertise_hap(self.ref_left) 142 scan_response = await self.dut_scan_for_hap() 143 dut_connection_to_ref, ref_connection_to_dut = await self.dut_connect_to_ref(advertisement, scan_response) 144 145 await self.dut_gatt.ExchangeMTU(mtu=512, connection=dut_connection_to_ref) 146 147 (secure, wait_security) = await asyncio.gather( 148 self.dut.aio.security.Secure(connection=dut_connection_to_ref, le=LE_LEVEL3), 149 self.ref_left.aio.security.WaitSecurity(connection=ref_connection_to_dut, le=LE_LEVEL3), 150 ) 151 152 AssertThat(secure.result_variant()).IsEqualTo('success') # type: ignore 153 AssertThat(wait_security.result_variant()).IsEqualTo('success') # type: ignore 154 155 await self.hap_grpc.WaitPeripheral(connection=dut_connection_to_ref) # type: ignore 156 advertisement.cancel() 157 158 return dut_connection_to_ref 159 160 async def assertIdenticalPreset(self, dut_connection_to_ref: Connection) -> None: 161 remote_preset = toBumblePresetList( 162 (await self.hap_grpc.GetAllPresetRecords(connection=dut_connection_to_ref)).preset_record_list) 163 AssertThat(remote_preset).ContainsExactlyElementsIn( # type: ignore 164 get_server_preset_sorted(self.has)).InOrder() # type: ignore 165 166 async def verify_no_crash(self, dut_connection_to_ref: Connection) -> None: 167 ''' Periodically check that there is no android crash ''' 168 for __i__ in range(10): 169 await asyncio.sleep(.3) 170 await self.assertIdenticalPreset(dut_connection_to_ref) 171 172 @asynchronous 173 async def test_get_features(self) -> None: 174 await self.logcat.Log("test_get_features") 175 dut_connection_to_ref = await self.setupHapConnection() 176 177 features = hap.HearingAidFeatures_from_bytes( 178 (await self.hap_grpc.GetFeatures(connection=dut_connection_to_ref)).features) 179 AssertThat(features).IsEqualTo(self.has.server_features) # type: ignore 180 181 @asynchronous 182 async def test_get_preset(self) -> None: 183 await self.logcat.Log("test_get_preset") 184 dut_connection_to_ref = await self.setupHapConnection() 185 186 await self.assertIdenticalPreset(dut_connection_to_ref) 187 188 @asynchronous 189 async def test_preset__remove_preset__verify_dut_is_updated(self) -> None: 190 await self.logcat.Log("test_preset__remove_preset__verify_dut_is_updated") 191 dut_connection_to_ref = await self.setupHapConnection() 192 193 await self.assertIdenticalPreset(dut_connection_to_ref) 194 195 await self.logcat.Log("Remove preset in server") 196 await self.has.delete_preset(unavailable_preset.index) 197 await asyncio.sleep(1) # wait event 198 199 await self.assertIdenticalPreset(dut_connection_to_ref) 200 201 @asynchronous 202 async def test__add_preset__verify_dut_is_updated(self) -> None: 203 await self.logcat.Log("test__add_preset__verify_dut_is_updated") 204 dut_connection_to_ref = await self.setupHapConnection() 205 206 await self.assertIdenticalPreset(dut_connection_to_ref) 207 208 added_preset = PresetRecord(bar_preset.index + 3, "added_preset") 209 self.has.preset_records[added_preset.index] = added_preset 210 211 await self.logcat.Log("Preset added in server. Notify now") 212 await self.has.generic_update( 213 PresetChangedOperation(PresetChangedOperation.ChangeId.GENERIC_UPDATE, 214 PresetChangedOperation.Generic(bar_preset.index, added_preset))) 215 await asyncio.sleep(1) # wait event 216 217 await self.assertIdenticalPreset(dut_connection_to_ref) 218 219 @asynchronous 220 async def test__set_non_existing_preset_as_active__verify_no_crash_and_no_update(self) -> None: 221 await self.logcat.Log("test__set_non_existing_preset_as_active__verify_no_crash_and_no_update") 222 non_existing_preset_index = 79 223 AssertThat(non_existing_preset_index).IsNotIn(self.has.preset_records.keys()) # type: ignore 224 dut_connection_to_ref = await self.setupHapConnection() 225 AssertThat( 226 toBumblePreset( # type: ignore 227 (await self.hap_grpc.GetActivePresetRecord(connection=dut_connection_to_ref 228 )).preset_record)).IsEqualTo(foo_preset) 229 230 await self.logcat.Log("Notify active update to non existing index") 231 # bypass the set_active_preset checks by sending an invalid index on purpose 232 self.has.active_preset_index = non_existing_preset_index 233 await self.has.notify_active_preset() 234 235 await self.verify_no_crash(dut_connection_to_ref) 236 AssertThat( 237 toBumblePreset( # type: ignore 238 (await self.hap_grpc.GetActivePresetRecord(connection=dut_connection_to_ref 239 )).preset_record)).IsEqualTo(foo_preset) 240 241 @asynchronous 242 async def test__set_non_existing_preset_as_available__verify_no_crash_and_no_update(self) -> None: 243 await self.logcat.Log("test__set_non_existing_preset_as_available__verify_no_crash_and_no_update") 244 non_existing_preset_index = 79 245 AssertThat(non_existing_preset_index).IsNotIn(self.has.preset_records.keys()) # type: ignore 246 dut_connection_to_ref = await self.setupHapConnection() 247 248 await self.logcat.Log("Notify available preset to non existing index") 249 await self.has.generic_update(PresetChangedOperationAvailable(non_existing_preset_index)) 250 251 await self.verify_no_crash(dut_connection_to_ref) 252