1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21import time 22 23import click 24 25from bumble.company_ids import COMPANY_IDENTIFIERS 26from bumble.colors import color 27from bumble.core import name_or_number 28from bumble.hci import ( 29 map_null_terminated_utf8_string, 30 CodecID, 31 LeFeature, 32 HCI_SUCCESS, 33 HCI_VERSION_NAMES, 34 LMP_VERSION_NAMES, 35 HCI_Command, 36 HCI_Command_Complete_Event, 37 HCI_Command_Status_Event, 38 HCI_READ_BUFFER_SIZE_COMMAND, 39 HCI_Read_Buffer_Size_Command, 40 HCI_READ_BD_ADDR_COMMAND, 41 HCI_Read_BD_ADDR_Command, 42 HCI_READ_LOCAL_NAME_COMMAND, 43 HCI_Read_Local_Name_Command, 44 HCI_LE_READ_BUFFER_SIZE_COMMAND, 45 HCI_LE_Read_Buffer_Size_Command, 46 HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND, 47 HCI_LE_Read_Maximum_Data_Length_Command, 48 HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND, 49 HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command, 50 HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND, 51 HCI_LE_Read_Maximum_Advertising_Data_Length_Command, 52 HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, 53 HCI_LE_Read_Suggested_Default_Data_Length_Command, 54 HCI_Read_Local_Supported_Codecs_Command, 55 HCI_Read_Local_Supported_Codecs_V2_Command, 56 HCI_Read_Local_Version_Information_Command, 57) 58from bumble.host import Host 59from bumble.transport import open_transport_or_link 60 61 62# ----------------------------------------------------------------------------- 63def command_succeeded(response): 64 if isinstance(response, HCI_Command_Status_Event): 65 return response.status == HCI_SUCCESS 66 if isinstance(response, HCI_Command_Complete_Event): 67 return response.return_parameters.status == HCI_SUCCESS 68 return False 69 70 71# ----------------------------------------------------------------------------- 72async def get_classic_info(host: Host) -> None: 73 if host.supports_command(HCI_READ_BD_ADDR_COMMAND): 74 response = await host.send_command(HCI_Read_BD_ADDR_Command()) 75 if command_succeeded(response): 76 print() 77 print( 78 color('Classic Address:', 'yellow'), 79 response.return_parameters.bd_addr.to_string(False), 80 ) 81 82 if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND): 83 response = await host.send_command(HCI_Read_Local_Name_Command()) 84 if command_succeeded(response): 85 print() 86 print( 87 color('Local Name:', 'yellow'), 88 map_null_terminated_utf8_string(response.return_parameters.local_name), 89 ) 90 91 92# ----------------------------------------------------------------------------- 93async def get_le_info(host: Host) -> None: 94 print() 95 96 if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND): 97 response = await host.send_command( 98 HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command() 99 ) 100 if command_succeeded(response): 101 print( 102 color('LE Number Of Supported Advertising Sets:', 'yellow'), 103 response.return_parameters.num_supported_advertising_sets, 104 '\n', 105 ) 106 107 if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND): 108 response = await host.send_command( 109 HCI_LE_Read_Maximum_Advertising_Data_Length_Command() 110 ) 111 if command_succeeded(response): 112 print( 113 color('LE Maximum Advertising Data Length:', 'yellow'), 114 response.return_parameters.max_advertising_data_length, 115 '\n', 116 ) 117 118 if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND): 119 response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command()) 120 if command_succeeded(response): 121 print( 122 color('Maximum Data Length:', 'yellow'), 123 ( 124 f'tx:{response.return_parameters.supported_max_tx_octets}/' 125 f'{response.return_parameters.supported_max_tx_time}, ' 126 f'rx:{response.return_parameters.supported_max_rx_octets}/' 127 f'{response.return_parameters.supported_max_rx_time}' 128 ), 129 '\n', 130 ) 131 132 if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND): 133 response = await host.send_command( 134 HCI_LE_Read_Suggested_Default_Data_Length_Command() 135 ) 136 if command_succeeded(response): 137 print( 138 color('Suggested Default Data Length:', 'yellow'), 139 f'{response.return_parameters.suggested_max_tx_octets}/' 140 f'{response.return_parameters.suggested_max_tx_time}', 141 '\n', 142 ) 143 144 print(color('LE Features:', 'yellow')) 145 for feature in host.supported_le_features: 146 print(f' {LeFeature(feature).name}') 147 148 149# ----------------------------------------------------------------------------- 150async def get_acl_flow_control_info(host: Host) -> None: 151 print() 152 153 if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND): 154 response = await host.send_command( 155 HCI_Read_Buffer_Size_Command(), check_result=True 156 ) 157 print( 158 color('ACL Flow Control:', 'yellow'), 159 f'{response.return_parameters.hc_total_num_acl_data_packets} ' 160 f'packets of size {response.return_parameters.hc_acl_data_packet_length}', 161 ) 162 163 if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND): 164 response = await host.send_command( 165 HCI_LE_Read_Buffer_Size_Command(), check_result=True 166 ) 167 print( 168 color('LE ACL Flow Control:', 'yellow'), 169 f'{response.return_parameters.hc_total_num_le_acl_data_packets} ' 170 f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}', 171 ) 172 173 174# ----------------------------------------------------------------------------- 175async def get_codecs_info(host: Host) -> None: 176 print() 177 178 if host.supports_command(HCI_Read_Local_Supported_Codecs_V2_Command.op_code): 179 response = await host.send_command( 180 HCI_Read_Local_Supported_Codecs_V2_Command(), check_result=True 181 ) 182 print(color('Codecs:', 'yellow')) 183 184 for codec_id, transport in zip( 185 response.return_parameters.standard_codec_ids, 186 response.return_parameters.standard_codec_transports, 187 ): 188 transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport( 189 transport 190 ).name 191 codec_name = CodecID(codec_id).name 192 print(f' {codec_name} - {transport_name}') 193 194 for codec_id, transport in zip( 195 response.return_parameters.vendor_specific_codec_ids, 196 response.return_parameters.vendor_specific_codec_transports, 197 ): 198 transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport( 199 transport 200 ).name 201 company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16) 202 print(f' {company} / {codec_id & 0xFFFF} - {transport_name}') 203 204 if not response.return_parameters.standard_codec_ids: 205 print(' No standard codecs') 206 if not response.return_parameters.vendor_specific_codec_ids: 207 print(' No Vendor-specific codecs') 208 209 if host.supports_command(HCI_Read_Local_Supported_Codecs_Command.op_code): 210 response = await host.send_command( 211 HCI_Read_Local_Supported_Codecs_Command(), check_result=True 212 ) 213 print(color('Codecs (BR/EDR):', 'yellow')) 214 for codec_id in response.return_parameters.standard_codec_ids: 215 codec_name = CodecID(codec_id).name 216 print(f' {codec_name}') 217 218 for codec_id in response.return_parameters.vendor_specific_codec_ids: 219 company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16) 220 print(f' {company} / {codec_id & 0xFFFF}') 221 222 if not response.return_parameters.standard_codec_ids: 223 print(' No standard codecs') 224 if not response.return_parameters.vendor_specific_codec_ids: 225 print(' No Vendor-specific codecs') 226 227 228# ----------------------------------------------------------------------------- 229async def async_main(latency_probes, transport): 230 print('<<< connecting to HCI...') 231 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 232 print('<<< connected') 233 234 host = Host(hci_source, hci_sink) 235 await host.reset() 236 237 # Measure the latency if requested 238 latencies = [] 239 if latency_probes: 240 for _ in range(latency_probes): 241 start = time.time() 242 await host.send_command(HCI_Read_Local_Version_Information_Command()) 243 latencies.append(1000 * (time.time() - start)) 244 print( 245 color('HCI Command Latency:', 'yellow'), 246 ( 247 f'min={min(latencies):.2f}, ' 248 f'max={max(latencies):.2f}, ' 249 f'average={sum(latencies)/len(latencies):.2f}' 250 ), 251 '\n', 252 ) 253 254 # Print version 255 print(color('Version:', 'yellow')) 256 print( 257 color(' Manufacturer: ', 'green'), 258 name_or_number(COMPANY_IDENTIFIERS, host.local_version.company_identifier), 259 ) 260 print( 261 color(' HCI Version: ', 'green'), 262 name_or_number(HCI_VERSION_NAMES, host.local_version.hci_version), 263 ) 264 print(color(' HCI Subversion:', 'green'), host.local_version.hci_subversion) 265 print( 266 color(' LMP Version: ', 'green'), 267 name_or_number(LMP_VERSION_NAMES, host.local_version.lmp_version), 268 ) 269 print(color(' LMP Subversion:', 'green'), host.local_version.lmp_subversion) 270 271 # Get the Classic info 272 await get_classic_info(host) 273 274 # Get the LE info 275 await get_le_info(host) 276 277 # Print the ACL flow control info 278 await get_acl_flow_control_info(host) 279 280 # Get codec info 281 await get_codecs_info(host) 282 283 # Print the list of commands supported by the controller 284 print() 285 print(color('Supported Commands:', 'yellow')) 286 for command in host.supported_commands: 287 print(f' {HCI_Command.command_name(command)}') 288 289 290# ----------------------------------------------------------------------------- 291@click.command() 292@click.option( 293 '--latency-probes', 294 metavar='N', 295 type=int, 296 help='Send N commands to measure HCI transport latency statistics', 297) 298@click.argument('transport') 299def main(latency_probes, transport): 300 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) 301 asyncio.run(async_main(latency_probes, transport)) 302 303 304# ----------------------------------------------------------------------------- 305if __name__ == '__main__': 306 main() 307