# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import contextlib
import sys
import os
import logging
import json
import websockets
import functools
from typing import Optional

from bumble import rfcomm
from bumble import hci
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble import hfp
from bumble.hfp import HfProtocol

# Most recently connected UI websocket client, or None when no client is
# connected. Written by `serve()` and read by the AG indicator handler.
ws: Optional[websockets.WebSocketServerProtocol] = None

# HFP protocol instance for the most recent RFCOMM DLC, or None until a DLC has
# been established. Written by `on_dlc()` and read by `serve()`.
hf_protocol: Optional[HfProtocol] = None


# -----------------------------------------------------------------------------
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
    """Start the Hands-Free protocol on a newly connected RFCOMM DLC.

    Creates an `HfProtocol` for the DLC, stores it in the module-level
    `hf_protocol`, schedules its `run()` coroutine, and registers listeners
    for incoming SCO requests and AG indicator updates.

    Args:
        dlc: The RFCOMM DLC that was just connected.
        configuration: The Hands-Free configuration passed to `HfProtocol`.
    """
    print('*** DLC connected', dlc)
    global hf_protocol
    hf_protocol = HfProtocol(dlc, configuration)
    asyncio.create_task(hf_protocol.run())

    def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
        """Accept an SCO/eSCO request received on the connection carrying `protocol`.

        Requests for any other connection are ignored. The eSCO parameter set
        is chosen from the link type and the protocol's active codec.
        """
        if connection != protocol.dlc.multiplexer.l2cap_channel.connection:
            # The request is for a connection this DLC does not run on.
            return

        if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE:
            esco_parameters = hfp.ESCO_PARAMETERS[
                hfp.DefaultCodecParameters.SCO_CVSD_D1
            ]
        elif protocol.active_codec == hfp.AudioCodec.MSBC:
            esco_parameters = hfp.ESCO_PARAMETERS[
                hfp.DefaultCodecParameters.ESCO_MSBC_T2
            ]
        elif protocol.active_codec == hfp.AudioCodec.CVSD:
            esco_parameters = hfp.ESCO_PARAMETERS[
                hfp.DefaultCodecParameters.ESCO_CVSD_S4
            ]
        else:
            # No parameter set is defined here for this codec. Without this
            # branch `esco_parameters` would be unbound below and the handler
            # would fail with UnboundLocalError.
            print(
                f'!!! unsupported codec {protocol.active_codec}, '
                'ignoring SCO request'
            )
            return

        # NOTE(review): `abort_on` presumably abandons the pending command if
        # the connection emits 'disconnection' first - confirm against the
        # library documentation.
        connection.abort_on(
            'disconnection',
            connection.device.send_command(
                hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
                    bd_addr=connection.peer_address, **esco_parameters.asdict()
                )
            ),
        )

    # Bind the protocol created for *this* DLC, so the handler does not depend
    # on the module-level `hf_protocol`, which a later DLC may overwrite.
    handler = functools.partial(on_sco_request, protocol=hf_protocol)
    dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler)
    # Unregister the SCO handler once the underlying L2CAP channel closes.
    dlc.multiplexer.l2cap_channel.once(
        'close',
        lambda: dlc.multiplexer.l2cap_channel.connection.device.remove_listener(
            'sco_request', handler
        ),
    )

    def on_ag_indicator(indicator):
        """Forward an AG indicator update to the UI websocket client, if any."""
        if ws is not None:
            asyncio.create_task(ws.send(str(indicator)))

    hf_protocol.on('ag_indicator', on_ag_indicator)


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the Hands-Free example.

    Expects `sys.argv[1]` to be a device configuration file and `sys.argv[2]`
    a transport spec. Opens the transport, sets up an RFCOMM server with HFP
    SDP records, makes the device discoverable and connectable, starts a
    websocket server on localhost:8989, then waits for the transport source
    to terminate.
    """
    if len(sys.argv) < 3:
        print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
        print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< connected')

        # Hands-Free profile configuration.
        # TODO: load configuration from file.
        configuration = hfp.HfConfiguration(
            supported_hf_features=[
                hfp.HfFeature.THREE_WAY_CALLING,
                hfp.HfFeature.REMOTE_VOLUME_CONTROL,
                hfp.HfFeature.ENHANCED_CALL_STATUS,
                hfp.HfFeature.ENHANCED_CALL_CONTROL,
                hfp.HfFeature.CODEC_NEGOTIATION,
                hfp.HfFeature.HF_INDICATORS,
                hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
            ],
            supported_hf_indicators=[
                hfp.HfIndicator.BATTERY_LEVEL,
            ],
            supported_audio_codecs=[
                hfp.AudioCodec.CVSD,
                hfp.AudioCodec.MSBC,
            ],
        )

        # Create a device
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.classic_enabled = True

        # Create and register a server
        rfcomm_server = rfcomm.Server(device)

        # Listen for incoming DLC connections
        channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
        print(f'### Listening for connection on channel {channel_number}')

        # Advertise the HFP RFComm channel in the SDP
        device.sdp_service_records = {
            0x00010001: hfp.make_hf_sdp_records(
                0x00010001, channel_number, configuration
            )
        }

        # Let's go!
        await device.power_on()

        # Start being discoverable and connectable
        await device.set_discoverable(True)
        await device.set_connectable(True)

        # Start the UI websocket server to offer a few buttons and input boxes
        async def serve(websocket: websockets.WebSocketServerProtocol, _path):
            """Handle one UI websocket client.

            Each message is parsed as JSON. An 'at_command' message executes
            `parsed['command']` on the HF protocol and a 'query_call' message
            queries the current calls; in both cases the stringified result
            is sent back to the client.
            """
            global ws
            ws = websocket
            try:
                async for message in websocket:
                    with contextlib.suppress(websockets.exceptions.ConnectionClosedOK):
                        print('Received: ', str(message))

                        parsed = json.loads(message)
                        message_type = parsed['type']
                        if message_type == 'at_command':
                            if hf_protocol is not None:
                                response = str(
                                    await hf_protocol.execute_command(
                                        parsed['command'],
                                        response_type=hfp.AtResponseType.MULTIPLE,
                                    )
                                )
                                await websocket.send(response)
                        elif message_type == 'query_call':
                            if hf_protocol is not None:
                                response = str(await hf_protocol.query_current_calls())
                                await websocket.send(response)
            finally:
                # Drop the reference once this client is gone so AG indicator
                # updates are not sent to a closed connection. Leave `ws`
                # alone if a newer client has already replaced this one.
                if ws is websocket:
                    ws = None

        await websockets.serve(serve, 'localhost', 8989)

        await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())