1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import sys 20import os 21import logging 22 23from bumble.colors import color 24 25import bumble.core 26from bumble.device import Device 27from bumble.transport import open_transport_or_link 28from bumble.core import ( 29 BT_L2CAP_PROTOCOL_ID, 30 BT_RFCOMM_PROTOCOL_ID, 31 BT_BR_EDR_TRANSPORT, 32) 33from bumble.rfcomm import Client 34from bumble.sdp import ( 35 Client as SDP_Client, 36 DataElement, 37 ServiceAttribute, 38 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 39 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 40 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 41) 42 43 44# ----------------------------------------------------------------------------- 45async def list_rfcomm_channels(connection): 46 # Connect to the SDP Server 47 sdp_client = SDP_Client(connection) 48 await sdp_client.connect() 49 50 # Search for services with an L2CAP service attribute 51 search_result = await sdp_client.search_attributes( 52 [BT_L2CAP_PROTOCOL_ID], 53 [ 54 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 55 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 56 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 57 ], 58 ) 59 print(color('==================================', 'blue')) 60 print(color('RFCOMM Services:', 'yellow')) 61 # pylint: disable-next=too-many-nested-blocks 62 for attribute_list in search_result: 63 # Look for the RFCOMM Channel number 64 protocol_descriptor_list = ServiceAttribute.find_attribute_in_list( 65 attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID 66 ) 67 if protocol_descriptor_list: 68 for protocol_descriptor in protocol_descriptor_list.value: 69 if len(protocol_descriptor.value) >= 2: 70 if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID: 71 print(color('SERVICE:', 'green')) 72 print( 73 color(' RFCOMM Channel:', 'cyan'), 74 protocol_descriptor.value[1].value, 75 ) 76 77 # List profiles 78 bluetooth_profile_descriptor_list = ( 79 ServiceAttribute.find_attribute_in_list( 80 attribute_list, 81 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 82 ) 83 ) 84 if bluetooth_profile_descriptor_list: 85 if bluetooth_profile_descriptor_list.value: 86 if ( 87 bluetooth_profile_descriptor_list.value[0].type 88 == DataElement.SEQUENCE 89 ): 90 bluetooth_profile_descriptors = ( 91 bluetooth_profile_descriptor_list.value 92 ) 93 else: 94 # Sometimes, instead of a list of lists, we just 95 # find a list. Fix that 96 bluetooth_profile_descriptors = [ 97 bluetooth_profile_descriptor_list 98 ] 99 100 print(color(' Profiles:', 'green')) 101 for ( 102 bluetooth_profile_descriptor 103 ) in bluetooth_profile_descriptors: 104 version_major = ( 105 bluetooth_profile_descriptor.value[1].value >> 8 106 ) 107 version_minor = ( 108 bluetooth_profile_descriptor.value[1].value 109 & 0xFF 110 ) 111 print( 112 ' ' 113 f'{bluetooth_profile_descriptor.value[0].value}' 114 f' - version {version_major}.{version_minor}' 115 ) 116 117 # List service classes 118 service_class_id_list = ServiceAttribute.find_attribute_in_list( 119 attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID 120 ) 121 if service_class_id_list: 122 if service_class_id_list.value: 123 print(color(' Service Classes:', 'green')) 124 for service_class_id in service_class_id_list.value: 125 print(' ', service_class_id.value) 126 127 await sdp_client.disconnect() 128 129 130# ----------------------------------------------------------------------------- 131class TcpServerProtocol(asyncio.Protocol): 132 def __init__(self, rfcomm_session): 133 self.rfcomm_session = rfcomm_session 134 self.transport = None 135 136 def connection_made(self, transport): 137 peer_name = transport.get_extra_info('peer_name') 138 print(f'<<< TCP Server: connection from {peer_name}') 139 self.transport = transport 140 self.rfcomm_session.sink = self.rfcomm_data_received 141 142 def rfcomm_data_received(self, data): 143 print(f'<<< RFCOMM Data: {data.hex()}') 144 if self.transport: 145 self.transport.write(data) 146 else: 147 print('!!! no TCP connection, dropping data') 148 149 def data_received(self, data): 150 print(f'<<< TCP Server: data received: {len(data)} bytes - {data.hex()}') 151 self.rfcomm_session.write(data) 152 153 154# ----------------------------------------------------------------------------- 155async def tcp_server(tcp_port, rfcomm_session): 156 print(f'$$$ Starting TCP server on port {tcp_port}') 157 158 server = await asyncio.get_running_loop().create_server( 159 lambda: TcpServerProtocol(rfcomm_session), '127.0.0.1', tcp_port 160 ) 161 await asyncio.get_running_loop().create_future() 162 163 async with server: 164 await server.serve_forever() 165 166 167# ----------------------------------------------------------------------------- 168async def main() -> None: 169 if len(sys.argv) < 5: 170 print( 171 'Usage: run_rfcomm_client.py <device-config> <transport-spec> ' 172 '<bluetooth-address> <channel>|discover [tcp-port]' 173 ) 174 print( 175 ' specifying a channel number, or "discover" to list all RFCOMM channels' 176 ) 177 print('example: run_rfcomm_client.py classic1.json usb:0 E1:CA:72:48:C4:E8 8') 178 return 179 180 print('<<< connecting to HCI...') 181 async with await open_transport_or_link(sys.argv[2]) as hci_transport: 182 print('<<< connected') 183 184 # Create a device 185 device = Device.from_config_file_with_hci( 186 sys.argv[1], hci_transport.source, hci_transport.sink 187 ) 188 device.classic_enabled = True 189 await device.power_on() 190 191 # Connect to a peer 192 target_address = sys.argv[3] 193 print(f'=== Connecting to {target_address}...') 194 connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) 195 print(f'=== Connected to {connection.peer_address}!') 196 197 channel_str = sys.argv[4] 198 if channel_str == 'discover': 199 await list_rfcomm_channels(connection) 200 return 201 202 # Request authentication 203 print('*** Authenticating...') 204 await connection.authenticate() 205 print('*** Authenticated') 206 207 # Enable encryption 208 print('*** Enabling encryption...') 209 await connection.encrypt() 210 print('*** Encryption on') 211 212 # Create a client and start it 213 print('@@@ Starting RFCOMM client...') 214 rfcomm_client = Client(connection) 215 rfcomm_mux = await rfcomm_client.start() 216 print('@@@ Started') 217 218 channel = int(channel_str) 219 print(f'### Opening session for channel {channel}...') 220 try: 221 session = await rfcomm_mux.open_dlc(channel) 222 print('### Session open', session) 223 except bumble.core.ConnectionError as error: 224 print(f'### Session open failed: {error}') 225 await rfcomm_mux.disconnect() 226 print('@@@ Disconnected from RFCOMM server') 227 return 228 229 if len(sys.argv) == 6: 230 # A TCP port was specified, start listening 231 tcp_port = int(sys.argv[5]) 232 asyncio.create_task(tcp_server(tcp_port, session)) 233 234 await hci_transport.source.wait_for_termination() 235 236 237# ----------------------------------------------------------------------------- 238logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 239asyncio.run(main()) 240