1# Lint as: python3 2"""Utils for blue tooth tests. 3 4Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py 5""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import logging as log 12import os 13import random 14import string 15import time 16import wave 17from queue import Empty 18from typing import Optional 19 20from mobly.controllers.android_device import AndroidDevice 21 22 23class BtTestUtilsError(Exception): 24 pass 25 26 27def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params): 28 """Converts raw pcm data into wave file. 29 30 Args: 31 pcm_file_path: File path of origin pcm file. 32 wave_file_path: File path of converted wave file. 33 audio_params: A dict with audio configuration. 34 """ 35 with open(pcm_file_path, 'rb') as pcm_file: 36 frames = pcm_file.read() 37 write_record_file(wave_file_path, audio_params, frames) 38 39 40def create_vcf_from_vcard(output_path: str, 41 num_of_contacts: int, 42 first_name: Optional[str] = None, 43 last_name: Optional[str] = None, 44 phone_number: Optional[int] = None) -> str: 45 """Creates a vcf file from vCard. 46 47 Args: 48 output_path: Path of the output vcf file. 49 num_of_contacts: Number of contacts to be generated. 50 first_name: First name of the contacts. 51 last_name: Last name of the contacts. 52 phone_number: Phone number of the contacts. 53 54 Returns: 55 vcf_file_path: Path of the output vcf file. E.g. 56 "/<output_path>/contacts_<time>.vcf". 57 """ 58 file_name = f'contacts_{int(time.time())}.vcf' 59 vcf_file_path = os.path.join(output_path, file_name) 60 with open(vcf_file_path, 'w+') as f: 61 for i in range(num_of_contacts): 62 lines = [] 63 if first_name is None: 64 first_name = 'Person' 65 vcard_last_name = last_name 66 if last_name is None: 67 vcard_last_name = i 68 vcard_phone_number = phone_number 69 if phone_number is None: 70 vcard_phone_number = random.randrange(int(10e10)) 71 lines.append('BEGIN:VCARD\n') 72 lines.append('VERSION:2.1\n') 73 lines.append(f'N:{vcard_last_name};{first_name};;;\n') 74 lines.append(f'FN:{first_name} {vcard_last_name}\n') 75 lines.append(f'TEL;CELL:{vcard_phone_number}\n') 76 lines.append(f'EMAIL;PREF:{first_name}{vcard_last_name}@gmail.com\n') 77 lines.append('END:VCARD\n') 78 f.write(''.join(lines)) 79 return vcf_file_path 80 81 82def generate_id_by_size(size, chars=(string.ascii_lowercase + string.ascii_uppercase + string.digits)): 83 """Generate random ascii characters of input size and input char types. 84 85 Args: 86 size: Input size of string. 87 chars: (Optional) Chars to use in generating a random string. 88 89 Returns: 90 String of random input chars at the input size. 91 """ 92 return ''.join(random.choice(chars) for _ in range(size)) 93 94 95def get_duration_seconds(wav_file_path): 96 """Get duration of most recently recorded file. 97 98 Args: 99 wav_file_path: path of the wave file. 100 101 Returns: 102 duration (float): duration of recorded file in seconds. 103 """ 104 f = wave.open(wav_file_path, 'r') 105 frames = f.getnframes() 106 rate = f.getframerate() 107 duration = (frames / float(rate)) 108 f.close() 109 return duration 110 111 112def wait_until(timeout_sec, condition_func, func_args, expected_value, exception=None, interval_sec=0.5): 113 """Waits until a function returns a expected value or timeout is reached. 114 115 Example usage: 116 ``` 117 def is_bluetooth_enabled(device) -> bool: 118 do something and return something... 119 120 # Waits and checks if Bluetooth is turned on. 121 bt_test_utils.wait_until( 122 timeout_sec=10, 123 condition_func=is_bluetooth_enabled, 124 func_args=[dut], 125 expected_value=True, 126 exception=signals.TestFailure('Failed to turn on Bluetooth.'), 127 interval_sec=1) 128 ``` 129 130 Args: 131 timeout_sec: float, max waiting time in seconds. 132 condition_func: function, when the condiction function returns the expected 133 value, the waiting mechanism will be interrupted. 134 func_args: tuple or list, the arguments for the condition function. 135 expected_value: a expected value that the condition function returns. 136 exception: Exception, an exception will be raised when timed out if needed. 137 interval_sec: float, interval time between calls of the condition function 138 in seconds. 139 140 Returns: 141 True if the function returns the expected value else False. 142 """ 143 start_time = time.time() 144 end_time = start_time + timeout_sec 145 while time.time() < end_time: 146 if condition_func(*func_args) == expected_value: 147 return True 148 time.sleep(interval_sec) 149 args_string = ', '.join(list(map(str, func_args))) 150 log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".', timeout_sec, condition_func.__name__, 151 args_string, expected_value) 152 if exception: 153 raise exception 154 return False 155 156 157def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False): 158 """Verify that the client wrote data to the server Android device correctly. 159 160 Args: 161 client_ad: the Android device to perform the write. 162 server_ad: the Android device to read the data written. 163 msg: the message to write. 164 binary: if the msg arg is binary or not. 165 166 Returns: 167 True if the data written matches the data read, false if not. 168 """ 169 client_ad.log.info('Write message %s.', msg) 170 if binary: 171 client_ad.sl4a.bluetoothSocketConnWriteBinary(msg) 172 else: 173 client_ad.sl4a.bluetoothSocketConnWrite(msg) 174 server_ad.log.info('Read message %s.', msg) 175 if binary: 176 read_msg = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n') 177 else: 178 read_msg = server_ad.sl4a.bluetoothSocketConnRead() 179 log.info('Verify message.') 180 if msg != read_msg: 181 log.error('Mismatch! Read: %s, Expected: %s', read_msg, msg) 182 return False 183 log.info('Matched! Read: %s, Expected: %s', read_msg, msg) 184 return True 185 186 187def write_record_file(file_name, audio_params, frames): 188 """Writes the recorded audio into the file. 189 190 Args: 191 file_name: The file name for writing the recorded audio. 192 audio_params: A dict with audio configuration. 193 frames: Recorded audio frames. 194 """ 195 log.debug('writing frame to %s', file_name) 196 wf = wave.open(file_name, 'wb') 197 wf.setnchannels(audio_params['channel']) 198 wf.setsampwidth(audio_params.get('sample_width', 1)) 199 wf.setframerate(audio_params['sample_rate']) 200 wf.writeframes(frames) 201 wf.close() 202