1# Copyright (C) 2024 The Android Open Source Project
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5
6# http://www.apache.org/licenses/LICENSE-2.0
7
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11# See the License for the specific language governing permissions and
12# limitations under the License.
13
14import asyncio
15
16from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
17from bumble.gatt import GATT_HEARING_ACCESS_SERVICE, GATT_AUDIO_STREAM_CONTROL_SERVICE, GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
18from bumble.profiles import hap
19from bumble.profiles.hap import DynamicPresets, HearingAccessService, HearingAidFeatures, HearingAidType, IndependentPresets, PresetChangedOperation, PresetChangedOperationAvailable, PresetRecord, PresetSynchronizationSupport, WritablePresetsSupport
20
21from pandora_experimental.os_grpc_aio import Os as OsAio
22from pandora_experimental.gatt_grpc_aio import GATT
23from pandora_experimental.hap_grpc_aio import HAP
24from pandora_experimental.hap_pb2 import PresetRecord as grpcPresetRecord  # type: ignore
25from pandora._utils import AioStream
26from pandora.security_pb2 import LE_LEVEL3
27from pandora.host_pb2 import RANDOM, AdvertiseResponse, Connection, DataTypes, ScanningResponse
28from mobly import base_test, signals
29from truth.truth import AssertThat  # type: ignore
30from typing import List, Tuple
31
# Local name the reference device puts in its advertising data.
COMPLETE_LOCAL_NAME: str = "Bumble"

# Service UUIDs in the string form produced by ``to_hex_str('-')``.
HAP_UUID = GATT_HEARING_ACCESS_SERVICE.to_hex_str('-')
ASCS_UUID = GATT_AUDIO_STREAM_CONTROL_SERVICE.to_hex_str('-')
PACS_UUID = GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE.to_hex_str('-')

# Filler text from which the long preset name below is cut.
long_name = ("Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
             "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.")

# Preset records registered on the reference HearingAccessService in setup_test.
foo_preset = PresetRecord(1, "foo preset")
bar_preset = PresetRecord(50, "bar preset")
# Name is the first 38 characters of long_name wrapped in square brackets.
longname_preset = PresetRecord(5, '[' + long_name[:38] + ']')
unavailable_preset = PresetRecord(
    7,
    "unavailable preset",
    PresetRecord.Property(
        PresetRecord.Property.Writable.CANNOT_BE_WRITTEN,
        PresetRecord.Property.IsAvailable.IS_UNAVAILABLE,
    ),
)
45
46
def toBumblePreset(grpc_preset: grpcPresetRecord) -> PresetRecord:
    """Convert a gRPC preset record into a Bumble ``PresetRecord``.

    Args:
        grpc_preset: record exposing ``index``, ``name``, ``isWritable`` and
            ``isAvailable``.

    Returns:
        A ``PresetRecord`` with the same index and name, whose property is
        built from the ``isWritable`` and ``isAvailable`` values.
    """
    property_cls = PresetRecord.Property
    writable = property_cls.Writable(grpc_preset.isWritable)  # type: ignore
    available = property_cls.IsAvailable(grpc_preset.isAvailable)  # type: ignore
    return PresetRecord(
        grpc_preset.index,
        grpc_preset.name,  # type: ignore
        property_cls(writable, available))
54
55
def toBumblePresetList(grpc_preset_list: List[grpcPresetRecord]) -> List[PresetRecord]:  # type: ignore
    """Convert every gRPC preset record of the list with ``toBumblePreset``.

    Args:
        grpc_preset_list: gRPC preset records to convert.

    Returns:
        A new list of Bumble ``PresetRecord`` objects, in the input order.
    """
    converted: List[PresetRecord] = []
    for grpc_record in grpc_preset_list:  # type: ignore
        converted.append(toBumblePreset(grpc_record))  # type: ignore
    return converted
58
59
def get_server_preset_sorted(has: HearingAccessService) -> List[PresetRecord]:
    """Return the preset records of the service ordered by ascending key.

    Args:
        has: service whose ``preset_records`` mapping is read.

    Returns:
        The values of ``has.preset_records``, sorted by their key.
    """
    records = has.preset_records
    return [records[index] for index in sorted(records)]
62
63
class HapTest(base_test.BaseTestClass):
    """Hearing Access Profile tests between a DUT and a Bumble reference device.

    A ``HearingAccessService`` is registered on the Bumble reference device
    before every test; the DUT is driven through the Pandora gRPC interfaces
    and its view of the service is compared with the reference's state.
    """
    devices: PandoraDevices
    dut: PandoraDevice
    ref_left: BumblePandoraDevice
    hap_grpc: HAP
    has: HearingAccessService
    logcat: OsAio
    dut_gatt: GATT

    def setup_class(self) -> None:
        """Create the devices; abort the class unless the DUT is not Bumble and the reference is."""
        self.devices = PandoraDevices(self)
        dut, ref_left, *_ = self.devices

        if isinstance(dut, BumblePandoraDevice):
            raise signals.TestAbortClass('DUT Bumble does not support HAP')
        self.dut = dut
        if not isinstance(ref_left, BumblePandoraDevice):
            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
        self.ref_left = ref_left

    def teardown_class(self) -> None:
        """Stop every device created in ``setup_class``."""
        self.devices.stop_all()

    @asynchronous
    async def setup_test(self) -> None:
        """Reset both devices, create the DUT gRPC stubs and register a fresh HAS on the reference."""
        await asyncio.gather(self.dut.reset(), self.ref_left.reset())
        self.logcat = OsAio(channel=self.dut.aio.channel)
        await self.logcat.Log("setup test")
        self.hap_grpc = HAP(channel=self.dut.aio.channel)
        device_features = HearingAidFeatures(HearingAidType.MONAURAL_HEARING_AID,
                                             PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED,
                                             IndependentPresets.IDENTICAL_PRESET_RECORD,
                                             DynamicPresets.PRESET_RECORDS_DOES_NOT_CHANGE,
                                             WritablePresetsSupport.WRITABLE_PRESET_RECORDS_SUPPORTED)
        self.has = HearingAccessService(self.ref_left.device, device_features,
                                        [foo_preset, bar_preset, longname_preset, unavailable_preset])
        self.dut_gatt = GATT(channel=self.dut.aio.channel)

        self.ref_left.device.add_service(self.has)  # type: ignore

    async def advertise_hap(self, device: PandoraDevice) -> AioStream[AdvertiseResponse]:
        """
        Start a legacy connectable advertisement carrying the HAP service UUID.
        :return: the advertising stream; the caller is responsible for cancelling it
        """
        return device.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=RANDOM,
            data=DataTypes(
                complete_local_name=COMPLETE_LOCAL_NAME,
                incomplete_service_class_uuids16=[HAP_UUID],
            ),
        )

    async def dut_scan_for_hap(self) -> ScanningResponse:
        """
        DUT starts to scan for the Ref device.
        :return: the first ScanningResponse advertising the HAP service UUID
        """
        dut_scan = self.dut.aio.host.Scan(RANDOM)  # type: ignore
        try:
            return await anext(
                (x async for x in dut_scan if HAP_UUID in x.data.incomplete_service_class_uuids16))
        finally:
            # Always stop the scan, including when the wait fails or is cancelled.
            dut_scan.cancel()

    async def dut_connect_to_ref(self, advertisement: AioStream[AdvertiseResponse],
                                 ref: ScanningResponse) -> Tuple[Connection, Connection]:
        """
        Helper method for Dut connects to Ref
        :param advertisement: advertising stream of the Ref device; cancelled once connected
        :param ref: scan result identifying the Ref device
        :return: a Tuple (DUT to REF connection, REF to DUT connection)
        """
        # The DUT initiates the connection while the Ref waits for the first
        # response on its advertising stream.
        (dut_ref_res, ref_dut_res) = await asyncio.gather(
            self.dut.aio.host.ConnectLE(own_address_type=RANDOM, **ref.address_asdict()),
            anext(aiter(advertisement)),
        )
        AssertThat(dut_ref_res.result_variant()).IsEqualTo('connection')  # type: ignore
        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
        AssertThat(dut_ref).IsNotNone()  # type: ignore
        assert dut_ref
        advertisement.cancel()
        return dut_ref, ref_dut

    async def setupHapConnection(self) -> Connection:
        """
        Connect the DUT to the Ref, exchange MTU, pair at LE_LEVEL3 and wait for the HAP peripheral.
        :return: the DUT to REF connection
        """
        advertisement = await self.advertise_hap(self.ref_left)
        try:
            scan_response = await self.dut_scan_for_hap()
            dut_connection_to_ref, ref_connection_to_dut = await self.dut_connect_to_ref(
                advertisement, scan_response)

            await self.dut_gatt.ExchangeMTU(mtu=512, connection=dut_connection_to_ref)

            (secure, wait_security) = await asyncio.gather(
                self.dut.aio.security.Secure(connection=dut_connection_to_ref, le=LE_LEVEL3),
                self.ref_left.aio.security.WaitSecurity(connection=ref_connection_to_dut, le=LE_LEVEL3),
            )

            AssertThat(secure.result_variant()).IsEqualTo('success')  # type: ignore
            AssertThat(wait_security.result_variant()).IsEqualTo('success')  # type: ignore

            await self.hap_grpc.WaitPeripheral(connection=dut_connection_to_ref)  # type: ignore
        finally:
            # Do not leave the Ref advertising if any step above fails.
            advertisement.cancel()

        return dut_connection_to_ref

    async def assertIdenticalPreset(self, dut_connection_to_ref: Connection) -> None:
        """Assert the DUT reports exactly the reference's preset records, in index order."""
        remote_preset = toBumblePresetList(
            (await self.hap_grpc.GetAllPresetRecords(connection=dut_connection_to_ref)).preset_record_list)
        AssertThat(remote_preset).ContainsExactlyElementsIn(  # type: ignore
            get_server_preset_sorted(self.has)).InOrder()  # type: ignore

    async def _assert_active_preset(self, dut_connection_to_ref: Connection, expected: PresetRecord) -> None:
        """Assert the active preset record reported by the DUT equals ``expected``."""
        response = await self.hap_grpc.GetActivePresetRecord(connection=dut_connection_to_ref)
        AssertThat(toBumblePreset(response.preset_record)).IsEqualTo(expected)  # type: ignore

    async def verify_no_crash(self, dut_connection_to_ref: Connection) -> None:
        """Periodically check that there is no android crash."""
        # Ten checks, 0.3 s apart: each one requires the DUT to still answer
        # with the reference's preset list.
        for _ in range(10):
            await asyncio.sleep(.3)
            await self.assertIdenticalPreset(dut_connection_to_ref)

    @asynchronous
    async def test_get_features(self) -> None:
        """The features read through the DUT match the reference service's features."""
        await self.logcat.Log("test_get_features")
        dut_connection_to_ref = await self.setupHapConnection()

        features = hap.HearingAidFeatures_from_bytes(
            (await self.hap_grpc.GetFeatures(connection=dut_connection_to_ref)).features)
        AssertThat(features).IsEqualTo(self.has.server_features)  # type: ignore

    @asynchronous
    async def test_get_preset(self) -> None:
        """The preset records read through the DUT match the reference service's records."""
        await self.logcat.Log("test_get_preset")
        dut_connection_to_ref = await self.setupHapConnection()

        await self.assertIdenticalPreset(dut_connection_to_ref)

    @asynchronous
    async def test_preset__remove_preset__verify_dut_is_updated(self) -> None:
        """After a preset is deleted on the reference, the DUT's preset list follows."""
        await self.logcat.Log("test_preset__remove_preset__verify_dut_is_updated")
        dut_connection_to_ref = await self.setupHapConnection()

        await self.assertIdenticalPreset(dut_connection_to_ref)

        await self.logcat.Log("Remove preset in server")
        await self.has.delete_preset(unavailable_preset.index)
        await asyncio.sleep(1)  # wait event

        await self.assertIdenticalPreset(dut_connection_to_ref)

    @asynchronous
    async def test__add_preset__verify_dut_is_updated(self) -> None:
        """After a preset is added on the reference and notified, the DUT's preset list follows."""
        await self.logcat.Log("test__add_preset__verify_dut_is_updated")
        dut_connection_to_ref = await self.setupHapConnection()

        await self.assertIdenticalPreset(dut_connection_to_ref)

        # Insert the new record directly into the service, then notify it
        # explicitly with a generic update.
        added_preset = PresetRecord(bar_preset.index + 3, "added_preset")
        self.has.preset_records[added_preset.index] = added_preset

        await self.logcat.Log("Preset added in server. Notify now")
        await self.has.generic_update(
            PresetChangedOperation(PresetChangedOperation.ChangeId.GENERIC_UPDATE,
                                   PresetChangedOperation.Generic(bar_preset.index, added_preset)))
        await asyncio.sleep(1)  # wait event

        await self.assertIdenticalPreset(dut_connection_to_ref)

    @asynchronous
    async def test__set_non_existing_preset_as_active__verify_no_crash_and_no_update(self) -> None:
        """Notifying an unknown active preset index must not crash the DUT nor change its active preset."""
        await self.logcat.Log("test__set_non_existing_preset_as_active__verify_no_crash_and_no_update")
        non_existing_preset_index = 79
        AssertThat(non_existing_preset_index).IsNotIn(self.has.preset_records.keys())  # type: ignore
        dut_connection_to_ref = await self.setupHapConnection()
        await self._assert_active_preset(dut_connection_to_ref, foo_preset)

        await self.logcat.Log("Notify active update to non existing index")
        # bypass the set_active_preset checks by sending an invalid index on purpose
        self.has.active_preset_index = non_existing_preset_index
        await self.has.notify_active_preset()

        await self.verify_no_crash(dut_connection_to_ref)
        await self._assert_active_preset(dut_connection_to_ref, foo_preset)

    @asynchronous
    async def test__set_non_existing_preset_as_available__verify_no_crash_and_no_update(self) -> None:
        """Notifying availability of an unknown preset index must not crash the DUT nor change its presets."""
        await self.logcat.Log("test__set_non_existing_preset_as_available__verify_no_crash_and_no_update")
        non_existing_preset_index = 79
        AssertThat(non_existing_preset_index).IsNotIn(self.has.preset_records.keys())  # type: ignore
        dut_connection_to_ref = await self.setupHapConnection()

        await self.logcat.Log("Notify available preset to non existing index")
        await self.has.generic_update(PresetChangedOperationAvailable(non_existing_preset_index))

        await self.verify_no_crash(dut_connection_to_ref)
252