1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import collections
18import logging
19
20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
21from avatar.pandora_server import AndroidPandoraServer
22from bumble import rfcomm
23from bumble.colors import color
24from bumble.core import (
25    BT_GENERIC_AUDIO_SERVICE,
26    BT_HANDSFREE_AUDIO_GATEWAY_SERVICE,
27    BT_L2CAP_PROTOCOL_ID,
28    BT_RFCOMM_PROTOCOL_ID,
29)
30from bumble.rfcomm import DLC, Server as RfcommServer
31from bumble.sdp import (
32    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
33    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
34    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
35    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
36    DataElement,
37    ServiceAttribute,
38)
39from mobly import base_test, test_runner
40from mobly.asserts import assert_equal  # type: ignore
41from mobly.asserts import assert_in  # type: ignore
42from mobly.asserts import assert_not_equal  # type: ignore
43from mobly.asserts import assert_not_in  # type: ignore
44from pandora.host_pb2 import Connection as PandoraConnection
45from pandora.security_pb2 import LEVEL2
46from typing import Dict, List, Optional, Tuple, Union
47
# SDP attribute ID under which make_bumble_ag_sdp_records() publishes the
# AG supported-features value.
SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311

# AG feature bit for HF indicators; it is also the default AG feature mask
# that HfpAgServer reports in its +BRSF response.
HFP_AG_FEATURE_HF_INDICATORS = 1 << 10
HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS

# HF feature bit checked against the value the DUT sends in AT+BRSF.
HFP_HF_FEATURE_HF_INDICATORS = 1 << 8
# Kept as a hex *string* (not an int) because it is passed verbatim to
# `setprop` as the value of PROPERTY_HF_FEATURES.
HFP_HF_FEATURE_DEFAULT = hex(0x01B5)

# Android system properties read/written on the DUT through adb.
PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled'
PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config'
PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled'

# Profile version value placed in the SDP profile descriptor list.
HFP_VERSION_1_7 = 0x0107


# Stub for Audio Gateway implementation
# NOTE(review): presumably refers to the HfpProtocol / HfpAgServer classes
# defined in this file rather than to the logger directly below - confirm.
# TODO: b/296471045
logger = logging.getLogger(__name__)
66
67
class HfpProtocol:
    """Line-oriented AT transport layered on an RFCOMM DLC.

    Data received from the DLC is accumulated in `buffer`, split on carriage
    returns, and every non-empty stripped line is queued in `lines` until
    `next_line` consumes it.
    """

    dlc: rfcomm.DLC
    buffer: str
    lines: collections.deque[str]
    lines_available: asyncio.Event

    def __init__(self, dlc: rfcomm.DLC) -> None:
        """Attach to `dlc` and install `feed` as its data sink."""
        self.dlc = dlc
        self.buffer = ''
        self.lines = collections.deque()
        self.lines_available = asyncio.Event()

        # Everything the DLC receives from now on is routed into feed().
        dlc.sink = self.feed

    def feed(self, data: Union[bytes, str]) -> None:
        """Append received `data` to the buffer and emit any complete lines."""
        # Bytes are decoded as UTF-8; strings are used as they are.
        text = data.decode('utf-8') if isinstance(data, bytes) else data

        logger.debug(f'<<< Data received: {text}')

        self.buffer += text
        # Peel off one '\r'-terminated chunk at a time; whatever follows the
        # last '\r' stays buffered until more data arrives.
        while '\r' in self.buffer:
            chunk, _, self.buffer = self.buffer.partition('\r')
            stripped = chunk.strip()
            if stripped:
                self.on_line(stripped)

    def on_line(self, line: str) -> None:
        """Queue a complete line and wake up any `next_line` waiter."""
        self.lines.append(line)
        self.lines_available.set()

    def send_command_line(self, line: str) -> None:
        """Write `line` to the DLC terminated by a carriage return."""
        logger.debug(color(f'>>> {line}', 'yellow'))
        framed = line + '\r'
        self.dlc.write(framed)

    def send_response_line(self, line: str) -> None:
        """Write `line` to the DLC wrapped in CR LF on both sides."""
        logger.debug(color(f'>>> {line}', 'yellow'))
        framed = '\r\n' + line + '\r\n'
        self.dlc.write(framed)

    async def next_line(self) -> str:
        """Wait until a line is queued, then pop and return the oldest one."""
        await self.lines_available.wait()
        oldest = self.lines.popleft()
        # Re-arm the event once the queue is drained so the next call blocks.
        if len(self.lines) == 0:
            self.lines_available.clear()
        logger.debug(color(f'<<< {oldest}', 'green'))
        return oldest
116
117
class HfpClientTest(base_test.BaseTestClass):  # type: ignore[misc]
    """Mobly test class for the DUT's HFP client role.

    The reference device is a Bumble device on which the test opens an RFCOMM
    server, publishes Audio Gateway SDP records and answers the DUT's AT
    commands through `HfpAgServer`.
    """

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: BumblePandoraDevice

    def setup_class(self) -> None:
        """Create the Pandora devices and configure both sides for HFP."""
        self.devices = PandoraDevices(self)
        self.dut, bumble_ref, *_ = self.devices
        assert isinstance(bumble_ref, BumblePandoraDevice)
        self.ref = bumble_ref

        # Enable BR/EDR mode and SSP for Bumble devices (only where the
        # configuration does not already provide a value).
        bumble_defaults = (
            ('classic_enabled', True),
            ('classic_ssp_enabled', True),
            ('server', {'io_capability': 'no_output_no_input'}),
        )
        for key, value in bumble_defaults:
            self.ref.config.setdefault(key, value)

        # Only the first Android server (if any) is configured.
        android_server = next(
            (candidate for candidate in self.devices._servers if isinstance(candidate, AndroidPandoraServer)),
            None,
        )
        if android_server is not None:
            self.dut_adb = android_server.device.adb
            # Enable HFP Client
            self.dut_adb.shell(['setprop', PROPERTY_HF_ENABLED, 'true'])  # type: ignore
            # Set HF features if not set yet
            current_features = self.dut_adb.getprop(PROPERTY_HF_FEATURES)  # type: ignore
            if not current_features:
                self.dut_adb.shell(['setprop', PROPERTY_HF_FEATURES, HFP_HF_FEATURE_DEFAULT])  # type: ignore

    def teardown_class(self) -> None:
        """Stop all devices that were started by `setup_class`."""
        if not self.devices:
            return
        self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        """Update the Bumble server config, then reset both devices concurrently."""
        server_config = {'server': {'identity_address_type': 'public'}}
        self.ref._bumble.config.update(server_config)
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts
    async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]:
        """Connect REF to DUT over BR/EDR.

        Returns:
            The (DUT-side, REF-side) connection pair.
        """
        dut_response, ref_response = await asyncio.gather(
            self.dut.aio.host.WaitConnection(address=self.ref.address),
            self.ref.aio.host.Connect(address=self.dut.address),
        )

        for response in (dut_response, ref_response):
            assert_equal(response.result_variant(), 'connection')
        assert dut_response.connection is not None and ref_response.connection is not None

        return dut_response.connection, ref_response.connection

    async def make_classic_bond(self, dut_ref: PandoraConnection, ref_dut: PandoraConnection) -> None:
        """Secure the link at LEVEL2, initiated from the DUT side."""
        security_results = await asyncio.gather(
            self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2),
            self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2),
        )
        for result in security_results:
            assert_equal(result.result_variant(), 'success')

    async def make_hfp_connection(self) -> HfpProtocol:
        """Set up the REF as an AG endpoint and wait for the DUT's RFCOMM DLC.

        Returns:
            An `HfpProtocol` wrapping the DLC opened towards the REF.
        """
        # Listen RFCOMM; the future is resolved with the first accepted DLC.
        pending_dlc = asyncio.get_running_loop().create_future()
        rfcomm_server = RfcommServer(self.ref.device)  # type: ignore
        channel_number = rfcomm_server.listen(pending_dlc.set_result)  # type: ignore

        # Setup SDP records
        sdp_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0)
        self.ref.device.sdp_service_records = sdp_records

        # Connect and pair
        dut_ref, ref_dut = await self.make_classic_connection()
        await self.make_classic_bond(dut_ref, ref_dut)

        # By default, Android HF should auto connect
        accepted = await pending_dlc
        assert isinstance(accepted, DLC)

        return HfpProtocol(accepted)  # type: ignore

    @avatar.parameterized((True,), (False,))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_hf_indicator_setup(self, enhanced_driver_safety_enabled: bool) -> None:
        """Check that indicator 1 is listed in AT+BIND= only when the property is on."""
        property_value = 'true' if enhanced_driver_safety_enabled else 'false'
        self.dut_adb.shell(['setprop', PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY, property_value])  # type: ignore

        ref_dut_hfp_protocol = await self.make_hfp_connection()

        class TestAgServer(HfpAgServer):
            def on_brsf(self, hf_features: int) -> None:
                # HF indicators should be enabled
                indicator_bit = hf_features & HFP_HF_FEATURE_HF_INDICATORS
                assert_not_equal(indicator_bit, 0)
                return super().on_brsf(hf_features)

            def on_bind_list(self, indicators: list[int]) -> None:
                # Indicator 1 must be present exactly when the property is enabled.
                check = assert_in if enhanced_driver_safety_enabled else assert_not_in
                check(1, indicators)
                # Nothing more to verify: stop the serve() loop.
                self.terminated = True

        server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS)
        await server.serve()
230
231
def make_bumble_ag_sdp_records(
    hfp_version: int, rfcomm_channel: int, ag_sdp_features: int
) -> Dict[int, List[ServiceAttribute]]:
    """Build the SDP service records describing a Hands-Free Audio Gateway.

    Args:
        hfp_version: Version stored in the Bluetooth profile descriptor list.
        rfcomm_channel: RFCOMM channel stored in the protocol descriptor list.
        ag_sdp_features: Value stored under the supported-features attribute.

    Returns:
        A mapping with a single record, keyed by its service record handle.
    """
    record_handle = 0x00010001

    # Service classes advertised by the record.
    service_class_ids = DataElement.sequence(
        [
            DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
            DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
        ]
    )

    # Protocol stack: L2CAP, then RFCOMM on the given channel.
    l2cap_descriptor = DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)])
    rfcomm_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
            DataElement.unsigned_integer_8(rfcomm_channel),
        ]
    )
    protocol_descriptors = DataElement.sequence([l2cap_descriptor, rfcomm_descriptor])

    # Profile descriptor: service UUID paired with the profile version.
    profile_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE),
            DataElement.unsigned_integer_16(hfp_version),
        ]
    )
    profile_descriptors = DataElement.sequence([profile_descriptor])

    attributes = [
        ServiceAttribute(
            SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
            DataElement.unsigned_integer_32(record_handle),
        ),
        ServiceAttribute(SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, service_class_ids),
        ServiceAttribute(SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, protocol_descriptors),
        ServiceAttribute(SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, profile_descriptors),
        ServiceAttribute(
            SDP_PROFILE_SUPPORTED_FEATURES_ID,
            DataElement.unsigned_integer_16(ag_sdp_features),
        ),
    ]
    return {record_handle: attributes}
283
284
class HfpAgServer:
    """Minimal stub of an HFP Audio Gateway driven by AT command lines.

    `serve` reads lines from the protocol object and dispatches them to the
    `on_*` handlers, which subclasses may override. The loop runs until a
    handler sets `terminated` to True.
    """

    enabled_hf_indicators: list[int]
    hf_features: int

    def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None:
        """Store the transport and the AG feature mask reported in +BRSF.

        Args:
            protocol: Line transport used to receive commands and send responses.
            ag_features: Value sent back in the `+BRSF:` response.
        """
        self.protocol = protocol
        self.ag_features = ag_features
        self.terminated = False
        self.hf_features = 0  # Unknown
        # Start empty so the AT+BIND=? / AT+BIND? handlers do not raise
        # AttributeError when they run before any AT+BIND=<list> was received.
        self.enabled_hf_indicators = []

    def send_response_line(self, response: str) -> None:
        """Send one response line through the protocol object."""
        self.protocol.send_response_line(response)  # type: ignore

    async def serve(self) -> None:
        """Process incoming AT commands until `terminated` becomes True."""
        while not self.terminated:
            line = await self.protocol.next_line()  # type: ignore

            if line.startswith('AT+BRSF='):
                self.on_brsf(int(line.removeprefix('AT+BRSF=')))
            # 'AT+BIND=?' must be tested before its prefix 'AT+BIND='.
            elif line.startswith('AT+BIND=?'):
                self.on_bind_read_capabilities()
            elif line.startswith('AT+BIND='):
                # Skip empty fields so an empty list does not raise ValueError.
                fields = line.removeprefix('AT+BIND=').split(',')
                self.on_bind_list([int(field) for field in fields if field.strip()])
            elif line.startswith('AT+BIND?'):
                self.on_bind_read_configuration()
            # 'AT+CIND=?' is the test command (indicator mapping) and
            # 'AT+CIND?' is the read command (current indicator values).
            elif line.startswith('AT+CIND=?'):
                self.on_cind_test()
            elif line.startswith('AT+CIND?'):
                self.on_cind_read()
            # TODO(b/286226902): Implement handlers for these commands
            elif line.startswith(
                (
                    'AT+CLIP=',
                    'AT+VGS=',
                    'AT+BIA=',
                    'AT+CMER=',
                    'AT+XEVENT=',
                    'AT+XAPL=',
                )
            ):
                self.send_response_line('OK')
            else:
                # Anything not recognised above is rejected.
                self.send_response_line('ERROR')

    # AT+BRSF=
    def on_brsf(self, hf_features: int) -> None:
        """Record the HF feature mask and reply with the AG feature mask."""
        self.hf_features = hf_features
        self.send_response_line(f'+BRSF: {self.ag_features}')
        self.send_response_line('OK')

    # AT+CIND?
    def on_cind_read(self) -> None:
        """Reply with a fixed set of current indicator values."""
        self.send_response_line('+CIND: 0,0,1,4,1,5,0')
        self.send_response_line('OK')

    # AT+CIND=?
    def on_cind_test(self) -> None:
        """Reply with the fixed indicator names and their value ranges."""
        self.send_response_line(
            '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
            '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
            '("callheld",(0-2))'
        )
        self.send_response_line('OK')

    # AT+BIND=
    def on_bind_list(self, indicators: list[int]) -> None:
        """Remember a copy of the indicators listed by the HF and acknowledge."""
        self.enabled_hf_indicators = indicators[:]
        self.send_response_line('OK')

    # AT+BIND=?
    def on_bind_read_capabilities(self) -> None:
        """Reply with the remembered indicators as a comma-separated list."""
        self.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators)))
        self.send_response_line('OK')

    # AT+BIND?
    def on_bind_read_configuration(self) -> None:
        """Report every remembered indicator as enabled (state 1)."""
        for indicator in self.enabled_hf_indicators:
            self.send_response_line(f'+BIND: {indicator},1')
        self.send_response_line('OK')
365
366
if __name__ == '__main__':
    # Script entry point: turn on DEBUG logging, then hand control to the
    # Mobly test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
370