# Lint as: python3 # Copyright 2022 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Module to execute hcitool commands according to Bluetooth Core Spec v5.2.""" import btsocket import logging import struct from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error class Hcitool(object): """Executes hcitool commands according to Bluetooth Core Spec v5.2.""" CONTROLLER_PASS_CODE_VALUE = 0 HCI_COMMAND_COMPLETE_EVENT = '0x0e' def _execute_hcitool_cmd(self, ogf, ocf, *parameter): """Executes hcitool commands using 'hcitool cmd ... ' NOTE: return list depend on the Bluetooth Core Spec documentation. @param ogf: btsocket.OGF_... (int value). @param ocf: btsocket.OCF_... (int value). @param *parameter: parameter as hex string, e.g., ...,'1A','FA'. @return: list of the hcitool output. In case of failure, returns [hcitool status]. """ params = ['hcitool', 'cmd', hex(ogf), hex(ocf)] params.extend(parameter) cmd = ' '.join(params) logging.debug('Running "%s"', cmd) # Output format of hcitool command: # < HCI Command: ogf 0xXX, ocf 0xXXXX, plen X # > HCI Event: 0xXX plen XX # XX XX XX XX XX XX XX XX XX XX ... output = utils.system_output(cmd, retain_output=True) output_parse_value = HciToolParser.parse_output(output) event_type, plen_value, status, event_bytearray = output_parse_value if event_type != self.HCI_COMMAND_COMPLETE_EVENT: raise error.TestError( 'Expect Command complete event with value: ' + self.HCI_COMMAND_COMPLETE_EVENT + ' but got ' + event_type) if len(event_bytearray) != plen_value: raise error.TestError('Expect plen value of ' + str(plen_value) + 'but got ' + str(len(event_bytearray))) if status != self.CONTROLLER_PASS_CODE_VALUE: return [status] return HciToolParser.parse_payload(event_bytearray, ogf, ocf) @staticmethod def filter_with_mask(names, mask): """Picks the supported names base on the given mask. @param names: List of names like feature,commands,... @param mask: A bitmask (8 bit little-endian) or a list of bitmasks. @return: List of supported names (features/commands/...). """ if isinstance(mask, list): # Convert masks to bitstring in little-endian. mask = ''.join('{0:08b}'.format(m)[::-1] for m in mask) else: mask = '{:b}'.format(mask) mask = mask[::-1] return [names[i] for i, m in enumerate(mask) if m == '1'] def _execute_hcitool_cmd_or_raise(self, ogf, ocf, *parameter): result = self._execute_hcitool_cmd(ogf, ocf, *parameter) status = result[0] if status != self.CONTROLLER_PASS_CODE_VALUE: raise error.TestError( 'Unexpected command output, the status code is ' + str(status)) return result def read_buffer_size(self): """Reads the buffer size of the BT controller. @returns: (status, acl_data_packet_length, synchronous_data_packet_length, total_num_acl_data_packets, total_num_synchronous_data_packets). """ return self._execute_hcitool_cmd_or_raise( btsocket.OGF_INFO_PARAM, btsocket.OCF_READ_BUFFER_SIZE) def read_local_supported_features(self): """Reads local supported features for BR/EDR. @returns: (status, [features_name_list]). """ execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_INFO_PARAM, btsocket.OCF_READ_LOCAL_FEATURES) status = execute_command_result[0] lmp_features_mask = execute_command_result[1] supported_features = SupportedFeatures.SUPPORTED_FEATURES_PAGE_ZERO final_result = self.filter_with_mask(supported_features, lmp_features_mask) return status, final_result def read_local_extended_features(self, page_number): """Reads local supported extended features for BR/EDR. @param: page number (0,1,2). @returns: (status, return_page_number, maximum_page_number, [features_name_list]). """ if page_number not in (0, 1, 2): raise error.TestError( 'Invalid page_number: want (0, 1, 2), actual: ' + str(page_number)) execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_INFO_PARAM, btsocket.OCF_READ_LOCAL_EXT_FEATURES, str(page_number)) status = execute_command_result[0] return_page_number = execute_command_result[1] maximum_page_number = execute_command_result[2] extended_mask = execute_command_result[3] supported_features = [] if page_number == 0: supported_features = SupportedFeatures.SUPPORTED_FEATURES_PAGE_ZERO elif page_number == 1: supported_features = SupportedFeatures.SUPPORTED_FEATURES_PAGE_ONE elif page_number == 2: supported_features = SupportedFeatures.SUPPORTED_FEATURES_PAGE_TWO final_result = self.filter_with_mask(supported_features, extended_mask) return status, return_page_number, maximum_page_number, final_result def read_le_local_supported_features(self): """Reads LE (Low Energy) supported features. @return: (status, [LE_features_name_list]). """ execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_LE_CTL, btsocket.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES) status = execute_command_result[0] le_features_mask = execute_command_result[1] le_supported_features = SupportedFeatures.LE_SUPPORTED_FEATURE final_result = self.filter_with_mask(le_supported_features, le_features_mask) return status, final_result def set_event_filter(self, filter_type, filter_condition_type, condition): """Sets event filter. @param filter_type: filter type. @param filter_condition_type: filter condition type. @param condition: condition. @return: [status]. """ execute_command_result = self._execute_hcitool_cmd( btsocket.OGF_HOST_CTL, btsocket.OCF_SET_EVENT_FLT, filter_type, filter_condition_type, condition) return execute_command_result def read_local_supported_commands(self): """Reads local supported commands. @return: (status, [supported_commands_name_list]). """ execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_INFO_PARAM, btsocket.OCF_READ_LOCAL_COMMANDS) status = execute_command_result[0] commands_mask = list(execute_command_result[1:]) commands = SupportedCommands.SUPPORTED_COMMANDS final_result = self.filter_with_mask(commands, commands_mask) return status, final_result def check_command_supported(self, command_name): """Check if the given command name is supported. @param: command_name as string, e.g., HCI_Inquiry. @return: True if the command is supported, False otherwise. """ supported_commands = self.read_local_supported_commands()[1] return command_name in supported_commands def le_read_accept_list_size(self): """Reads accept list size of the BT LE controller. @returns: (status, accept_list_size). """ return self._execute_hcitool_cmd_or_raise( btsocket.OGF_LE_CTL, btsocket.OCF_LE_READ_ACCEPT_LIST_SIZE) def le_read_maximum_data_length(self): """Reads packet data length of the BT LE controller. @returns: (status, supported_max_tx_octets, supported_max_tx_time, supported_max_rx_octets, supported_max_rx_time). """ return self._execute_hcitool_cmd_or_raise( btsocket.OGF_LE_CTL, HciToolParser.OCF_LE_READ_MAXIMUM_DATA_LENGTH) def le_read_resolving_list_size(self): """Reads resolving list size of the BT LE controller. @returns: (status, resolving_list_size). """ return self._execute_hcitool_cmd_or_raise( btsocket.OGF_LE_CTL, HciToolParser.OCF_LE_READ_RESOLVING_LIST_SIZE) def le_read_number_of_supported_advertising_sets(self): """Reads number of supported advertisement sets. @returns: (status, num_supported_advertising_sets). """ return self._execute_hcitool_cmd_or_raise( btsocket.OGF_LE_CTL, HciToolParser.OCF_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS) def vs_msft_read_supported_features(self, msft_ocf): """Reads VS MSFT supported features. @param msft_ocf: The msft_ocf for different chipset. @returns: (status, subcommand_opcode, [vs_msft_features_name_list], microsoft_event_prefix_length, microsoft_event_prefix) """ VS_MSFT_READ_SUPPORTED_FEATURES_SUBCOMMAND_OPCODE = '00' execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_VENDOR_CMD, msft_ocf, VS_MSFT_READ_SUPPORTED_FEATURES_SUBCOMMAND_OPCODE) status = execute_command_result[0] vs_msft_features_mask = execute_command_result[2] vs_msft_supported_features = ( SupportedFeatures.VS_MSFT_SUPPORTED_FEATURES) final_result = self.filter_with_mask(vs_msft_supported_features, vs_msft_features_mask) (_, subcommand_opcode, _, microsoft_event_prefix_length, microsoft_event_prefix) = execute_command_result return (status, subcommand_opcode, final_result, microsoft_event_prefix_length, microsoft_event_prefix) def le_get_vendor_capabilities_command(self): """Gets AOSP LE vendor capabilities. @returns: (status, max_advt_instances(deprecated), offloaded_resolution_of_private-address(deprecated), total_scan_results_storage, max_irk_list_sz, filtering_support, max_filter, activity_energy_info_support, version_supported, total_num_of_advt_tracked, extended_scan_support, debug_logging_supported, LE_address_generation_offloading_support(deprecated), A2DP_source_offload_capability_mask, bluetooth_quality_report_support, dynamic_audio_buffer_support). """ execute_command_result = self._execute_hcitool_cmd_or_raise( btsocket.OGF_VENDOR_CMD, HciToolParser.OCF_LE_GET_VENDOR_CAPABILITIES_COMMAND) pack_format = '<{}B'.format(len(execute_command_result)) execute_command_result = struct.pack(pack_format, execute_command_result) aosp_formats = [ '