1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import sys 20import os 21import logging 22 23from bumble.colors import color 24from bumble.device import Device 25from bumble.transport import open_transport_or_link 26from bumble.core import ( 27 BT_BR_EDR_TRANSPORT, 28 BT_AVDTP_PROTOCOL_ID, 29 BT_AUDIO_SINK_SERVICE, 30 BT_L2CAP_PROTOCOL_ID, 31) 32from bumble.avdtp import Protocol as AVDTP_Protocol 33from bumble.a2dp import make_audio_source_service_sdp_records 34from bumble.sdp import ( 35 Client as SDP_Client, 36 ServiceAttribute, 37 DataElement, 38 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 39 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 40 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 41) 42 43 44# ----------------------------------------------------------------------------- 45def sdp_records(): 46 service_record_handle = 0x00010001 47 return { 48 service_record_handle: make_audio_source_service_sdp_records( 49 service_record_handle 50 ) 51 } 52 53 54# ----------------------------------------------------------------------------- 55# pylint: disable-next=too-many-nested-blocks 56async def find_a2dp_service(connection): 57 # Connect to the SDP Server 58 sdp_client = SDP_Client(connection) 59 await sdp_client.connect() 60 61 # Search for services with an Audio Sink service class 62 search_result = await sdp_client.search_attributes( 63 [BT_AUDIO_SINK_SERVICE], 64 [ 65 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 66 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 67 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 68 ], 69 ) 70 71 print(color('==================================', 'blue')) 72 print(color('A2DP Sink Services:', 'yellow')) 73 74 service_version = None 75 76 for attribute_list in search_result: 77 print(color('SERVICE:', 'green')) 78 79 # Service classes 80 service_class_id_list = ServiceAttribute.find_attribute_in_list( 81 attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID 82 ) 83 if service_class_id_list: 84 if service_class_id_list.value: 85 print(color(' Service Classes:', 'green')) 86 for service_class_id in service_class_id_list.value: 87 print(' ', service_class_id.value) 88 89 # Protocol info 90 protocol_descriptor_list = ServiceAttribute.find_attribute_in_list( 91 attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID 92 ) 93 if protocol_descriptor_list: 94 print(color(' Protocol:', 'green')) 95 for protocol_descriptor in protocol_descriptor_list.value: 96 if protocol_descriptor.value[0].value == BT_L2CAP_PROTOCOL_ID: 97 if len(protocol_descriptor.value) >= 2: 98 psm = protocol_descriptor.value[1].value 99 print(f'{color(" L2CAP PSM:", "cyan")} {psm}') 100 elif protocol_descriptor.value[0].value == BT_AVDTP_PROTOCOL_ID: 101 if len(protocol_descriptor.value) >= 2: 102 avdtp_version_major = protocol_descriptor.value[1].value >> 8 103 avdtp_version_minor = protocol_descriptor.value[1].value & 0xFF 104 print( 105 f'{color(" AVDTP Version:", "cyan")} ' 106 f'{avdtp_version_major}.{avdtp_version_minor}' 107 ) 108 service_version = (avdtp_version_major, avdtp_version_minor) 109 110 # Profile info 111 bluetooth_profile_descriptor_list = ServiceAttribute.find_attribute_in_list( 112 attribute_list, SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID 113 ) 114 if bluetooth_profile_descriptor_list: 115 if bluetooth_profile_descriptor_list.value: 116 if ( 117 bluetooth_profile_descriptor_list.value[0].type 118 == DataElement.SEQUENCE 119 ): 120 bluetooth_profile_descriptors = ( 121 bluetooth_profile_descriptor_list.value 122 ) 123 else: 124 # Sometimes, instead of a list of lists, we just find a list. 125 # Fix that. 126 bluetooth_profile_descriptors = [bluetooth_profile_descriptor_list] 127 128 print(color(' Profiles:', 'green')) 129 for bluetooth_profile_descriptor in bluetooth_profile_descriptors: 130 version_major = bluetooth_profile_descriptor.value[1].value >> 8 131 version_minor = bluetooth_profile_descriptor.value[1].value & 0xFF 132 print( 133 f' {bluetooth_profile_descriptor.value[0].value}' 134 f' - version {version_major}.{version_minor}' 135 ) 136 137 await sdp_client.disconnect() 138 return service_version 139 140 141# ----------------------------------------------------------------------------- 142async def main() -> None: 143 if len(sys.argv) < 4: 144 print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>') 145 print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8') 146 return 147 148 print('<<< connecting to HCI...') 149 async with await open_transport_or_link(sys.argv[2]) as hci_transport: 150 print('<<< connected') 151 152 # Create a device 153 device = Device.from_config_file_with_hci( 154 sys.argv[1], hci_transport.source, hci_transport.sink 155 ) 156 device.classic_enabled = True 157 158 # Start the controller 159 await device.power_on() 160 161 # Setup the SDP to expose a SRC service, in case the remote device queries us 162 # back 163 device.sdp_service_records = sdp_records() 164 165 # Connect to a peer 166 target_address = sys.argv[3] 167 print(f'=== Connecting to {target_address}...') 168 connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) 169 print(f'=== Connected to {connection.peer_address}!') 170 171 # Request authentication 172 print('*** Authenticating...') 173 await connection.authenticate() 174 print('*** Authenticated') 175 176 # Enable encryption 177 print('*** Enabling encryption...') 178 await connection.encrypt() 179 print('*** Encryption on') 180 181 # Look for an A2DP service 182 avdtp_version = await find_a2dp_service(connection) 183 if not avdtp_version: 184 print(color('!!! no AVDTP service found')) 185 return 186 print(f'AVDTP version: {avdtp_version[0]}.{avdtp_version[1]}') 187 188 # Create a client to interact with the remote device 189 client = await AVDTP_Protocol.connect(connection, avdtp_version) 190 191 # Discover all endpoints on the remote device 192 endpoints = list(await client.discover_remote_endpoints()) 193 print(f'@@@ Found {len(endpoints)} endpoints') 194 for endpoint in endpoints: 195 print('@@@', endpoint) 196 197 198# ----------------------------------------------------------------------------- 199logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 200asyncio.run(main()) 201