1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import time
22
23import click
24
25from bumble.company_ids import COMPANY_IDENTIFIERS
26from bumble.colors import color
27from bumble.core import name_or_number
28from bumble.hci import (
29    map_null_terminated_utf8_string,
30    CodecID,
31    LeFeature,
32    HCI_SUCCESS,
33    HCI_VERSION_NAMES,
34    LMP_VERSION_NAMES,
35    HCI_Command,
36    HCI_Command_Complete_Event,
37    HCI_Command_Status_Event,
38    HCI_READ_BUFFER_SIZE_COMMAND,
39    HCI_Read_Buffer_Size_Command,
40    HCI_READ_BD_ADDR_COMMAND,
41    HCI_Read_BD_ADDR_Command,
42    HCI_READ_LOCAL_NAME_COMMAND,
43    HCI_Read_Local_Name_Command,
44    HCI_LE_READ_BUFFER_SIZE_COMMAND,
45    HCI_LE_Read_Buffer_Size_Command,
46    HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND,
47    HCI_LE_Read_Maximum_Data_Length_Command,
48    HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND,
49    HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command,
50    HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND,
51    HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
52    HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
53    HCI_LE_Read_Suggested_Default_Data_Length_Command,
54    HCI_Read_Local_Supported_Codecs_Command,
55    HCI_Read_Local_Supported_Codecs_V2_Command,
56    HCI_Read_Local_Version_Information_Command,
57)
58from bumble.host import Host
59from bumble.transport import open_transport_or_link
60
61
62# -----------------------------------------------------------------------------
def command_succeeded(response):
    """Tell whether an HCI response event reports a successful status.

    Returns True when ``response`` is an ``HCI_Command_Status_Event`` whose
    ``status`` equals ``HCI_SUCCESS``, or an ``HCI_Command_Complete_Event``
    whose ``return_parameters.status`` equals ``HCI_SUCCESS``. Any other kind
    of object yields False.
    """
    # The status lives in a different place depending on the event type
    if isinstance(response, HCI_Command_Status_Event):
        status = response.status
    elif isinstance(response, HCI_Command_Complete_Event):
        status = response.return_parameters.status
    else:
        return False

    return status == HCI_SUCCESS
69
70
71# -----------------------------------------------------------------------------
async def get_classic_info(host: Host) -> None:
    """Print the Classic address and the local name of the controller.

    Each item is only queried when ``host`` reports support for the
    corresponding HCI command, and only printed when the command succeeded
    (as judged by ``command_succeeded``).
    """
    # Classic (BR/EDR) address
    if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
        address_response = await host.send_command(HCI_Read_BD_ADDR_Command())
        if command_succeeded(address_response):
            print()
            label = color('Classic Address:', 'yellow')
            address = address_response.return_parameters.bd_addr
            print(label, address.to_string(False))

    # Local name, stored by the controller as a null-terminated UTF-8 string
    if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
        name_response = await host.send_command(HCI_Read_Local_Name_Command())
        if command_succeeded(name_response):
            print()
            label = color('Local Name:', 'yellow')
            raw_name = name_response.return_parameters.local_name
            print(label, map_null_terminated_utf8_string(raw_name))
90
91
92# -----------------------------------------------------------------------------
async def get_le_info(host: Host) -> None:
    """Print the LE capabilities of the controller.

    For each of the following, the value is queried only when ``host``
    reports support for the matching HCI command, and printed only when the
    command succeeded: number of supported advertising sets, maximum
    advertising data length, maximum data length, and suggested default data
    length. The supported LE features are always listed at the end.
    """
    print()

    # Number of supported advertising sets
    if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
        adv_sets_response = await host.send_command(
            HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
        )
        if command_succeeded(adv_sets_response):
            label = color('LE Number Of Supported Advertising Sets:', 'yellow')
            params = adv_sets_response.return_parameters
            print(label, params.num_supported_advertising_sets, '\n')

    # Maximum advertising data length
    if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
        adv_length_response = await host.send_command(
            HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
        )
        if command_succeeded(adv_length_response):
            label = color('LE Maximum Advertising Data Length:', 'yellow')
            params = adv_length_response.return_parameters
            print(label, params.max_advertising_data_length, '\n')

    # Maximum data length, shown as octets/time for each direction
    if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
        max_length_response = await host.send_command(
            HCI_LE_Read_Maximum_Data_Length_Command()
        )
        if command_succeeded(max_length_response):
            label = color('Maximum Data Length:', 'yellow')
            params = max_length_response.return_parameters
            tx = f'{params.supported_max_tx_octets}/{params.supported_max_tx_time}'
            rx = f'{params.supported_max_rx_octets}/{params.supported_max_rx_time}'
            print(label, f'tx:{tx}, rx:{rx}', '\n')

    # Suggested default data length, shown as octets/time
    if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
        default_length_response = await host.send_command(
            HCI_LE_Read_Suggested_Default_Data_Length_Command()
        )
        if command_succeeded(default_length_response):
            label = color('Suggested Default Data Length:', 'yellow')
            params = default_length_response.return_parameters
            suggested = (
                f'{params.suggested_max_tx_octets}/{params.suggested_max_tx_time}'
            )
            print(label, suggested, '\n')

    # Supported LE features, one per line
    print(color('LE Features:', 'yellow'))
    for feature in host.supported_le_features:
        feature_name = LeFeature(feature).name
        print(f'  {feature_name}')
147
148
149# -----------------------------------------------------------------------------
async def get_acl_flow_control_info(host: Host) -> None:
    """Print the ACL buffer counts and sizes reported by the controller.

    The Classic and the LE buffer information are each queried only when
    ``host`` reports support for the corresponding HCI command. Commands are
    sent with ``check_result=True``.
    NOTE(review): presumably this makes ``send_command`` raise on a failed
    status, which is why no explicit success check is done here -- confirm
    against ``Host.send_command``.
    """
    print()

    # Classic ACL buffers
    if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
        command = HCI_Read_Buffer_Size_Command()
        response = await host.send_command(command, check_result=True)
        label = color('ACL Flow Control:', 'yellow')
        packet_count = response.return_parameters.hc_total_num_acl_data_packets
        packet_size = response.return_parameters.hc_acl_data_packet_length
        print(label, f'{packet_count} packets of size {packet_size}')

    # LE ACL buffers
    if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
        command = HCI_LE_Read_Buffer_Size_Command()
        response = await host.send_command(command, check_result=True)
        label = color('LE ACL Flow Control:', 'yellow')
        packet_count = response.return_parameters.hc_total_num_le_acl_data_packets
        packet_size = response.return_parameters.hc_le_acl_data_packet_length
        print(label, f'{packet_count} packets of size {packet_size}')
172
173
174# -----------------------------------------------------------------------------
async def get_codecs_info(host: Host) -> None:
    """Print the codecs supported by the controller.

    When the "Read Local Supported Codecs V2" command is supported, each
    standard and vendor-specific codec is printed with its transport. When
    the original "Read Local Supported Codecs" command is supported, the
    codecs are printed again under a BR/EDR heading, without transport.
    Vendor-specific codec IDs are displayed as a company (upper bits, above
    bit 16) and a 16-bit codec number (lower bits).
    """
    print()

    # V2 command: codec lists come with a parallel list of transports
    if host.supports_command(HCI_Read_Local_Supported_Codecs_V2_Command.op_code):
        response = await host.send_command(
            HCI_Read_Local_Supported_Codecs_V2_Command(), check_result=True
        )
        params = response.return_parameters
        transport_type = HCI_Read_Local_Supported_Codecs_V2_Command.Transport
        print(color('Codecs:', 'yellow'))

        standard_codecs = zip(
            params.standard_codec_ids, params.standard_codec_transports
        )
        for codec_id, transport in standard_codecs:
            transport_name = transport_type(transport).name
            codec_name = CodecID(codec_id).name
            print(f'  {codec_name} - {transport_name}')

        vendor_codecs = zip(
            params.vendor_specific_codec_ids,
            params.vendor_specific_codec_transports,
        )
        for codec_id, transport in vendor_codecs:
            transport_name = transport_type(transport).name
            company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
            print(f'  {company} / {codec_id & 0xFFFF} - {transport_name}')

        if not params.standard_codec_ids:
            print('  No standard codecs')
        if not params.vendor_specific_codec_ids:
            print('  No Vendor-specific codecs')

    # Original command: codec lists only
    if host.supports_command(HCI_Read_Local_Supported_Codecs_Command.op_code):
        response = await host.send_command(
            HCI_Read_Local_Supported_Codecs_Command(), check_result=True
        )
        params = response.return_parameters
        print(color('Codecs (BR/EDR):', 'yellow'))

        for codec_id in params.standard_codec_ids:
            codec_name = CodecID(codec_id).name
            print(f'  {codec_name}')

        for codec_id in params.vendor_specific_codec_ids:
            company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
            print(f'  {company} / {codec_id & 0xFFFF}')

        if not params.standard_codec_ids:
            print('  No standard codecs')
        if not params.vendor_specific_codec_ids:
            print('  No Vendor-specific codecs')
226
227
228# -----------------------------------------------------------------------------
async def async_main(latency_probes, transport):
    """Connect to an HCI controller and print information about it.

    After resetting the host, this optionally measures the HCI command
    latency, then prints the version information, the Classic info, the LE
    info, the ACL flow control info, the codec info and the list of
    supported commands.

    Args:
        latency_probes: number of HCI commands to send in order to measure
            the command round-trip latency. No measurement is done when this
            is None, zero or negative.
        transport: transport specification, passed as-is to
            ``open_transport_or_link``.
    """
    print('<<< connecting to HCI...')
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):
        print('<<< connected')

        host = Host(hci_source, hci_sink)
        await host.reset()

        # Measure the latency if requested.
        # A non-positive probe count would produce no samples, and min()/max()
        # raise ValueError on an empty list, so require at least one probe.
        if latency_probes and latency_probes > 0:
            latencies = []
            for _ in range(latency_probes):
                # perf_counter() is a monotonic, high-resolution clock, which
                # makes it suitable for measuring short intervals (time.time()
                # can jump and has a coarse resolution on some platforms).
                start = time.perf_counter()
                await host.send_command(HCI_Read_Local_Version_Information_Command())
                # Elapsed seconds converted to milliseconds
                latencies.append(1000 * (time.perf_counter() - start))
            print(
                color('HCI Command Latency:', 'yellow'),
                (
                    f'min={min(latencies):.2f}, '
                    f'max={max(latencies):.2f}, '
                    f'average={sum(latencies)/len(latencies):.2f}'
                ),
                '\n',
            )

        # Print version
        local_version = host.local_version
        print(color('Version:', 'yellow'))
        print(
            color('  Manufacturer:  ', 'green'),
            name_or_number(COMPANY_IDENTIFIERS, local_version.company_identifier),
        )
        print(
            color('  HCI Version:   ', 'green'),
            name_or_number(HCI_VERSION_NAMES, local_version.hci_version),
        )
        print(color('  HCI Subversion:', 'green'), local_version.hci_subversion)
        print(
            color('  LMP Version:   ', 'green'),
            name_or_number(LMP_VERSION_NAMES, local_version.lmp_version),
        )
        print(color('  LMP Subversion:', 'green'), local_version.lmp_subversion)

        # Get the Classic info
        await get_classic_info(host)

        # Get the LE info
        await get_le_info(host)

        # Print the ACL flow control info
        await get_acl_flow_control_info(host)

        # Get codec info
        await get_codecs_info(host)

        # Print the list of commands supported by the controller
        print()
        print(color('Supported Commands:', 'yellow'))
        for command in host.supported_commands:
            print(f'  {HCI_Command.command_name(command)}')
288
289
290# -----------------------------------------------------------------------------
@click.command()
@click.option(
    '--latency-probes',
    metavar='N',
    type=int,
    help='Send N commands to measure HCI transport latency statistics',
)
@click.argument('transport')
def main(latency_probes, transport):
    # Command-line entry point: set up logging, then run the async tool.
    # (Kept as comments rather than a docstring, since click would display a
    # docstring as the command's help text.)

    # The log level comes from the BUMBLE_LOGLEVEL environment variable
    # (case-insensitive), with WARNING as the default.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()
    logging.basicConfig(level=log_level)

    # Run the coroutine to completion on a fresh event loop
    asyncio.run(async_main(latency_probes, transport))
302
303
304# -----------------------------------------------------------------------------
# Invoke the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
307