1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import collections 18import logging 19 20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices 21from avatar.pandora_server import AndroidPandoraServer 22from bumble import rfcomm 23from bumble.colors import color 24from bumble.core import ( 25 BT_GENERIC_AUDIO_SERVICE, 26 BT_HANDSFREE_AUDIO_GATEWAY_SERVICE, 27 BT_L2CAP_PROTOCOL_ID, 28 BT_RFCOMM_PROTOCOL_ID, 29) 30from bumble.rfcomm import DLC, Server as RfcommServer 31from bumble.sdp import ( 32 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 33 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 34 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 35 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 36 DataElement, 37 ServiceAttribute, 38) 39from mobly import base_test, test_runner 40from mobly.asserts import assert_equal # type: ignore 41from mobly.asserts import assert_in # type: ignore 42from mobly.asserts import assert_not_equal # type: ignore 43from mobly.asserts import assert_not_in # type: ignore 44from pandora.host_pb2 import Connection as PandoraConnection 45from pandora.security_pb2 import LEVEL2 46from typing import Dict, List, Optional, Tuple, Union 47 48SDP_PROFILE_SUPPORTED_FEATURES_ID = 0x0311 49 50HFP_AG_FEATURE_HF_INDICATORS = 1 << 10 51HFP_AG_FEATURE_DEFAULT = HFP_AG_FEATURE_HF_INDICATORS 52 53HFP_HF_FEATURE_HF_INDICATORS = 1 << 8 54HFP_HF_FEATURE_DEFAULT = hex(0x01B5) 55 56PROPERTY_HF_ENABLED = 'bluetooth.profile.hfp.hf.enabled' 57PROPERTY_HF_FEATURES = 'bluetooth.hfp.hf_client_features.config' 58PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY = 'bluetooth.headset_client.indicator.enhanced_driver_safety.enabled' 59 60HFP_VERSION_1_7 = 0x0107 61 62 63# Stub for Audio Gateway implementation 64# TODO: b/296471045 65logger = logging.getLogger(__name__) 66 67 68class HfpProtocol: 69 dlc: rfcomm.DLC 70 buffer: str 71 lines: collections.deque[str] 72 lines_available: asyncio.Event 73 74 def __init__(self, dlc: rfcomm.DLC) -> None: 75 self.dlc = dlc 76 self.buffer = '' 77 self.lines = collections.deque() 78 self.lines_available = asyncio.Event() 79 80 dlc.sink = self.feed 81 82 def feed(self, data: Union[bytes, str]) -> None: 83 # Convert the data to a string if needed 84 if isinstance(data, bytes): 85 data = data.decode('utf-8') 86 87 logger.debug(f'<<< Data received: {data}') 88 89 # Add to the buffer and look for lines 90 self.buffer += data 91 while (separator := self.buffer.find('\r')) >= 0: 92 line = self.buffer[:separator].strip() 93 self.buffer = self.buffer[separator + 1 :] 94 if len(line) > 0: 95 self.on_line(line) 96 97 def on_line(self, line: str) -> None: 98 self.lines.append(line) 99 self.lines_available.set() 100 101 def send_command_line(self, line: str) -> None: 102 logger.debug(color(f'>>> {line}', 'yellow')) 103 self.dlc.write(line + '\r') 104 105 def send_response_line(self, line: str) -> None: 106 logger.debug(color(f'>>> {line}', 'yellow')) 107 self.dlc.write('\r\n' + line + '\r\n') 108 109 async def next_line(self) -> str: 110 await self.lines_available.wait() 111 line = self.lines.popleft() 112 if not self.lines: 113 self.lines_available.clear() 114 logger.debug(color(f'<<< {line}', 'green')) 115 return line 116 117 118class HfpClientTest(base_test.BaseTestClass): # type: ignore[misc] 119 devices: Optional[PandoraDevices] = None 120 121 # pandora devices. 122 dut: PandoraDevice 123 ref: BumblePandoraDevice 124 125 def setup_class(self) -> None: 126 self.devices = PandoraDevices(self) 127 self.dut, ref, *_ = self.devices 128 assert isinstance(ref, BumblePandoraDevice) 129 self.ref = ref 130 131 # Enable BR/EDR mode and SSP for Bumble devices. 132 self.ref.config.setdefault('classic_enabled', True) 133 self.ref.config.setdefault('classic_ssp_enabled', True) 134 self.ref.config.setdefault( 135 'server', 136 { 137 'io_capability': 'no_output_no_input', 138 }, 139 ) 140 141 for server in self.devices._servers: 142 if isinstance(server, AndroidPandoraServer): 143 self.dut_adb = server.device.adb 144 # Enable HFP Client 145 self.dut_adb.shell(['setprop', PROPERTY_HF_ENABLED, 'true']) # type: ignore 146 # Set HF features if not set yet 147 hf_feature_text = self.dut_adb.getprop(PROPERTY_HF_FEATURES) # type: ignore 148 if len(hf_feature_text) == 0: 149 self.dut_adb.shell(['setprop', PROPERTY_HF_FEATURES, HFP_HF_FEATURE_DEFAULT]) # type: ignore 150 break 151 152 def teardown_class(self) -> None: 153 if self.devices: 154 self.devices.stop_all() 155 156 @avatar.asynchronous 157 async def setup_test(self) -> None: 158 self.ref._bumble.config.update({'server': {'identity_address_type': 'public'}}) 159 await asyncio.gather(self.dut.reset(), self.ref.reset()) 160 161 # TODO(b/286338264): Moving connecting and bonding methods to a shared util scripts 162 async def make_classic_connection(self) -> Tuple[PandoraConnection, PandoraConnection]: 163 dut_ref, ref_dut = await asyncio.gather( 164 self.dut.aio.host.WaitConnection(address=self.ref.address), 165 self.ref.aio.host.Connect(address=self.dut.address), 166 ) 167 168 assert_equal(dut_ref.result_variant(), 'connection') 169 assert_equal(ref_dut.result_variant(), 'connection') 170 assert dut_ref.connection is not None and ref_dut.connection is not None 171 172 return dut_ref.connection, ref_dut.connection 173 174 async def make_classic_bond(self, dut_ref: PandoraConnection, ref_dut: PandoraConnection) -> None: 175 dut_ref_sec, ref_dut_sec = await asyncio.gather( 176 self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2), 177 self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2), 178 ) 179 assert_equal(dut_ref_sec.result_variant(), 'success') 180 assert_equal(ref_dut_sec.result_variant(), 'success') 181 182 async def make_hfp_connection(self) -> HfpProtocol: 183 # Listen RFCOMM 184 dlc_connected = asyncio.get_running_loop().create_future() 185 186 def on_dlc(dlc: DLC) -> None: 187 dlc_connected.set_result(dlc) 188 189 rfcomm_server = RfcommServer(self.ref.device) # type: ignore 190 channel_number = rfcomm_server.listen(on_dlc) # type: ignore 191 192 # Setup SDP records 193 self.ref.device.sdp_service_records = make_bumble_ag_sdp_records(HFP_VERSION_1_7, channel_number, 0) 194 195 # Connect and pair 196 dut_ref, ref_dut = await self.make_classic_connection() 197 await self.make_classic_bond(dut_ref, ref_dut) 198 199 # By default, Android HF should auto connect 200 dlc = await dlc_connected 201 assert isinstance(dlc, DLC) 202 203 return HfpProtocol(dlc) # type: ignore 204 205 @avatar.parameterized((True,), (False,)) # type: ignore[misc] 206 @avatar.asynchronous 207 async def test_hf_indicator_setup(self, enhanced_driver_safety_enabled: bool) -> None: 208 if enhanced_driver_safety_enabled: 209 self.dut_adb.shell(['setprop', PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY, 'true']) # type: ignore 210 else: 211 self.dut_adb.shell(['setprop', PROPERTY_HF_INDICATOR_ENHANCED_DRIVER_SAFETY, 'false']) # type: ignore 212 213 ref_dut_hfp_protocol = await self.make_hfp_connection() 214 215 class TestAgServer(HfpAgServer): 216 def on_brsf(self, hf_features: int) -> None: 217 # HF indicators should be enabled 218 assert_not_equal(hf_features & HFP_HF_FEATURE_HF_INDICATORS, 0) 219 return super().on_brsf(hf_features) 220 221 def on_bind_list(self, indicators: list[int]) -> None: 222 if enhanced_driver_safety_enabled: 223 assert_in(1, indicators) 224 else: 225 assert_not_in(1, indicators) 226 self.terminated = True 227 228 server = TestAgServer(ref_dut_hfp_protocol, ag_features=HFP_AG_FEATURE_HF_INDICATORS) 229 await server.serve() 230 231 232def make_bumble_ag_sdp_records( 233 hfp_version: int, rfcomm_channel: int, ag_sdp_features: int 234) -> Dict[int, List[ServiceAttribute]]: 235 return { 236 0x00010001: [ 237 ServiceAttribute( 238 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 239 DataElement.unsigned_integer_32(0x00010001), 240 ), 241 ServiceAttribute( 242 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 243 DataElement.sequence( 244 [ 245 DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), 246 DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), 247 ] 248 ), 249 ), 250 ServiceAttribute( 251 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 252 DataElement.sequence( 253 [ 254 DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), 255 DataElement.sequence( 256 [ 257 DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), 258 DataElement.unsigned_integer_8(rfcomm_channel), 259 ] 260 ), 261 ] 262 ), 263 ), 264 ServiceAttribute( 265 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 266 DataElement.sequence( 267 [ 268 DataElement.sequence( 269 [ 270 DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), 271 DataElement.unsigned_integer_16(hfp_version), 272 ] 273 ) 274 ] 275 ), 276 ), 277 ServiceAttribute( 278 SDP_PROFILE_SUPPORTED_FEATURES_ID, 279 DataElement.unsigned_integer_16(ag_sdp_features), 280 ), 281 ] 282 } 283 284 285class HfpAgServer: 286 enabled_hf_indicators: list[int] 287 hf_features: int 288 289 def __init__(self, protocol: HfpProtocol, ag_features: int = HFP_AG_FEATURE_DEFAULT) -> None: 290 self.protocol = protocol 291 self.ag_features = ag_features 292 self.terminated = False 293 self.hf_features = 0 # Unknown 294 295 def send_response_line(self, response: str) -> None: 296 self.protocol.send_response_line(response) # type: ignore 297 298 async def serve(self) -> None: 299 while not self.terminated: 300 line = await self.protocol.next_line() # type: ignore 301 302 if line.startswith('AT+BRSF='): 303 hf_features = int(line[len('AT+BRSF=') :]) 304 self.on_brsf(hf_features) 305 elif line.startswith('AT+BIND=?'): 306 self.on_bind_read_capabilities() 307 elif line.startswith('AT+BIND='): 308 indicators = [int(i) for i in line[len('AT+BIND=') :].split(',')] 309 self.on_bind_list(indicators) 310 elif line.startswith('AT+BIND?'): 311 self.on_bind_read_configuration() 312 elif line.startswith('AT+CIND=?'): 313 self.on_cind_read() 314 elif line.startswith('AT+CIND?'): 315 self.on_cind_test() 316 # TODO(b/286226902): Implement handlers for these commands 317 elif line.startswith( 318 ( 319 'AT+CLIP=', 320 'AT+VGS=', 321 'AT+BIA=', 322 'AT+CMER=', 323 'AT+XEVENT=', 324 'AT+XAPL=', 325 ) 326 ): 327 self.send_response_line('OK') 328 else: 329 self.send_response_line('ERROR') 330 331 def on_brsf(self, hf_features: int) -> None: 332 self.hf_features = hf_features 333 self.send_response_line(f'+BRSF: {self.ag_features}') 334 self.send_response_line('OK') 335 336 # AT+CIND? 337 def on_cind_read(self) -> None: 338 self.send_response_line('+CIND: 0,0,1,4,1,5,0') 339 self.send_response_line('OK') 340 341 # AT+CIND=? 342 def on_cind_test(self) -> None: 343 self.send_response_line( 344 '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' 345 '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' 346 '("callheld",(0-2))' 347 ) 348 self.send_response_line('OK') 349 350 # AT+BIND= 351 def on_bind_list(self, indicators: list[int]) -> None: 352 self.enabled_hf_indicators = indicators[:] 353 self.send_response_line('OK') 354 355 # AT+BIND=? 356 def on_bind_read_capabilities(self) -> None: 357 self.send_response_line('+BIND: ' + ','.join(map(str, self.enabled_hf_indicators))) 358 self.send_response_line('OK') 359 360 # AT+BIND? 361 def on_bind_read_configuration(self) -> None: 362 for i in self.enabled_hf_indicators: 363 self.send_response_line(f'+BIND: {i},1') 364 self.send_response_line('OK') 365 366 367if __name__ == '__main__': 368 logging.basicConfig(level=logging.DEBUG) 369 test_runner.main() # type: ignore 370