1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import enum
18import grpc
19import itertools
20import logging
21import random
22
23from avatar import BumblePandoraDevice
24from avatar import PandoraDevice
25from avatar import PandoraDevices
26from mobly import base_test
27from mobly import test_runner
28from mobly.asserts import assert_equal  # type: ignore
29from mobly.asserts import assert_false  # type: ignore
30from mobly.asserts import assert_is_not_none  # type: ignore
31from mobly.asserts import assert_true  # type: ignore
32from mobly.asserts import explicit_pass  # type: ignore
33from pandora.host_pb2 import PUBLIC
34from pandora.host_pb2 import RANDOM
35from pandora.host_pb2 import Connection
36from pandora.host_pb2 import DataTypes
37from pandora.host_pb2 import OwnAddressType
38from typing import Any, Dict, Literal, Optional, Union
39
40
41class AdvertisingEventProperties(enum.IntEnum):
42    ADV_IND = 0x13
43    ADV_DIRECT_IND = 0x15
44    ADV_SCAN_IND = 0x12
45    ADV_NONCONN_IND = 0x10
46
47    CONNECTABLE = 0x01
48    SCANNABLE = 0x02
49    DIRECTED = 0x04
50    LEGACY = 0x10
51    ANONYMOUS = 0x20
52
53
54class LeHostTest(base_test.BaseTestClass):  # type: ignore[misc]
55    devices: Optional[PandoraDevices] = None
56
57    # pandora devices.
58    dut: PandoraDevice
59    ref: PandoraDevice
60
61    def setup_class(self) -> None:
62        self.devices = PandoraDevices(self)
63        self.dut, self.ref, *_ = self.devices
64
65        # Enable BR/EDR mode for Bumble devices.
66        for device in self.devices:
67            if isinstance(device, BumblePandoraDevice):
68                device.config.setdefault('classic_enabled', True)
69
70    def teardown_class(self) -> None:
71        if self.devices:
72            self.devices.stop_all()
73
74    @avatar.asynchronous
75    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
76        await asyncio.gather(self.dut.reset(), self.ref.reset())
77
78    @avatar.parameterized(
79        *itertools.product(
80            ('connectable', 'non_connectable'),
81            ('scannable', 'non_scannable'),
82            ('directed', 'undirected'),
83            (0, 31),
84        )
85    )  # type: ignore[misc]
86    def test_scan(
87        self,
88        connectable: Union[Literal['connectable'], Literal['non_connectable']],
89        scannable: Union[Literal['scannable'], Literal['non_scannable']],
90        directed: Union[Literal['directed'], Literal['undirected']],
91        data_len: int,
92    ) -> None:
93        '''
94        Advertise from the REF device with the specified legacy advertising
95        event properties. Use the manufacturer specific data to pad the advertising data to the
96        desired length. The scan response data must always be provided when
97        scannable but it is defaulted.
98        '''
99        man_specific_data_length = max(0, data_len - 5)  # Flags (3) + LV (2)
100        man_specific_data = bytes([random.randint(1, 255) for _ in range(man_specific_data_length)])
101        data = DataTypes(manufacturer_specific_data=man_specific_data) if data_len > 0 else None
102
103        is_connectable = True if connectable == 'connectable' else False
104        scan_response_data = DataTypes() if scannable == 'scannable' else None
105        target = self.dut.address if directed == 'directed' else None
106
107        advertise = self.ref.host.Advertise(
108            legacy=True,
109            connectable=is_connectable,
110            data=data,  # type: ignore[arg-type]
111            scan_response_data=scan_response_data,  # type: ignore[arg-type]
112            public=target,
113            own_address_type=PUBLIC,
114        )
115
116        scan = self.dut.host.Scan(legacy=False, passive=False, timeout=5.0)
117        report = next((x for x in scan if x.public == self.ref.address))
118        try:
119            report = next((x for x in scan if x.public == self.ref.address))
120
121            # TODO: scannable is not set by the android server
122            # TODO: direct_address is not set by the android server
123            assert_true(report.legacy, msg='expected legacy advertising report')
124            assert_equal(report.connectable, is_connectable or directed == 'directed')
125            assert_equal(
126                report.data.manufacturer_specific_data, man_specific_data if directed == 'undirected' else b''
127            )
128            assert_false(report.truncated, msg='expected non-truncated advertising report')
129        except grpc.aio.AioRpcError as e:
130            if (
131                e.code() == grpc.StatusCode.DEADLINE_EXCEEDED
132                and scannable == 'non_scannable'
133                and directed == 'undirected'
134            ):
135                explicit_pass('')
136            raise e
137        finally:
138            scan.cancel()
139            advertise.cancel()
140
141    @avatar.parameterized(
142        (dict(incomplete_service_class_uuids16=["183A", "181F"]),),
143        (dict(incomplete_service_class_uuids32=["FFFF183A", "FFFF181F"]),),
144        (dict(incomplete_service_class_uuids128=["FFFF181F-FFFF-1000-8000-00805F9B34FB"]),),
145        (dict(shortened_local_name="avatar"),),
146        (dict(complete_local_name="avatar_the_last_test_blender"),),
147        (dict(tx_power_level=20),),
148        (dict(class_of_device=0x40680),),
149        (dict(peripheral_connection_interval_min=0x0006, peripheral_connection_interval_max=0x0C80),),
150        (dict(service_solicitation_uuids16=["183A", "181F"]),),
151        (dict(service_solicitation_uuids32=["FFFF183A", "FFFF181F"]),),
152        (dict(service_solicitation_uuids128=["FFFF183A-FFFF-1000-8000-00805F9B34FB"]),),
153        (dict(service_data_uuid16={"183A": bytes([1, 2, 3, 4])}),),
154        (dict(service_data_uuid32={"FFFF183A": bytes([1, 2, 3, 4])}),),
155        (dict(service_data_uuid128={"FFFF181F-FFFF-1000-8000-00805F9B34FB": bytes([1, 2, 3, 4])}),),
156        (dict(appearance=0x0591),),
157        (dict(advertising_interval=0x1000),),
158        (dict(uri="https://www.google.com"),),
159        (dict(le_supported_features=bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x10, 0x9F])),),
160        (dict(manufacturer_specific_data=bytes([0, 1, 2, 3, 4])),),
161    )  # type: ignore[misc]
162    def test_scan_response_data(self, data: Dict[str, Any]) -> None:
163        '''
164        Advertise from the REF device with the specified advertising data.
165        Validate that the REF generates the correct advertising data,
166        and that the dut presents the correct advertising data in the scan
167        result.
168        '''
169        advertise = self.ref.host.Advertise(
170            legacy=True,
171            connectable=True,
172            data=DataTypes(**data),
173            own_address_type=PUBLIC,
174        )
175
176        scan = self.dut.host.Scan(legacy=False, passive=False)
177        report = next((x for x in scan if x.public == self.ref.address))
178
179        scan.cancel()
180        advertise.cancel()
181
182        assert_true(report.legacy, msg='expected legacy advertising report')
183        assert_equal(report.connectable, True)
184        for key, value in data.items():
185            assert_equal(getattr(report.data, key), value)  # type: ignore[misc]
186        assert_false(report.truncated, msg='expected non-truncated advertising report')
187
188    @avatar.parameterized(
189        (RANDOM,),
190        (PUBLIC,),
191    )  # type: ignore[misc]
192    @avatar.asynchronous
193    async def test_connect(self, ref_address_type: OwnAddressType) -> None:
194        advertise = self.ref.aio.host.Advertise(
195            legacy=True,
196            connectable=True,
197            own_address_type=ref_address_type,
198            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
199        )
200
201        scan = self.dut.aio.host.Scan(own_address_type=RANDOM)
202        ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe'))
203        scan.cancel()
204
205        ref_dut_res, dut_ref_res = await asyncio.gather(
206            anext(aiter(advertise)),
207            self.dut.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=RANDOM),
208        )
209        assert_equal(dut_ref_res.result_variant(), 'connection')
210        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
211        assert_is_not_none(dut_ref)
212        assert dut_ref
213        advertise.cancel()
214        assert_true(await self.is_connected(self.ref, ref_dut), "")
215
216    @avatar.parameterized(
217        (RANDOM,),
218        (PUBLIC,),
219    )  # type: ignore[misc]
220    @avatar.asynchronous
221    async def test_disconnect(self, ref_address_type: OwnAddressType) -> None:
222        advertise = self.ref.aio.host.Advertise(
223            legacy=True,
224            connectable=True,
225            own_address_type=ref_address_type,
226            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
227        )
228
229        scan = self.dut.aio.host.Scan(own_address_type=RANDOM)
230        ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe'))
231        scan.cancel()
232
233        ref_dut_res, dut_ref_res = await asyncio.gather(
234            anext(aiter(advertise)),
235            self.dut.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=RANDOM),
236        )
237        assert_equal(dut_ref_res.result_variant(), 'connection')
238        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
239        assert_is_not_none(dut_ref)
240        assert dut_ref
241        advertise.cancel()
242        assert_true(await self.is_connected(self.ref, ref_dut), "")
243        await self.dut.aio.host.Disconnect(connection=dut_ref)
244        assert_false(await self.is_connected(self.ref, ref_dut), "")
245
246    async def is_connected(self, device: PandoraDevice, connection: Connection) -> bool:
247        try:
248            await device.aio.host.WaitDisconnection(connection=connection, timeout=5)
249            return False
250        except grpc.RpcError as e:
251            assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED)  # type: ignore
252            return True
253
254
255if __name__ == '__main__':
256    logging.basicConfig(level=logging.DEBUG)
257    test_runner.main()  # type: ignore
258