# Lint as: python2, python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This is a server side noise cancellation test using the Chameleon board."""

import logging
import os
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.audio import audio_test_data
from autotest_lib.client.cros.audio import sox_utils
from autotest_lib.client.cros.audio import visqol_utils
from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import (
        download_file_from_bucket, get_visqol_binary)
from autotest_lib.client.cros.chameleon import audio_test_utils
from autotest_lib.client.cros.chameleon import chameleon_audio_ids
from autotest_lib.client.cros.chameleon import chameleon_audio_helper
from autotest_lib.server.cros.audio import audio_test

# Remote directory holding the speech/noise WAV inputs, and the local directory
# they are downloaded to (and where the mixed file is generated).
DIST_FILES_DIR = 'gs://chromeos-localmirror/distfiles/test_noise_cancellation'
DATA_DIR = '/tmp'


# Verification steps for the Noise Cancellation processing (NC):
# 1. Prepare the audio source file and reference file.
# 2. Play the source file by Chameleon.
# 3. Record by DUT Internal Mic when NC is on and get ViSQOL score A.
# 4. Repeat step 2.
# 5. Record by DUT Internal Mic when NC is off and get ViSQOL score B.
# 6. Check if A - B >= threshold
#
# In practice, ViSQOL is not the most suitable metric for NC due to its
# intrusive design (reference: go/visqol). However, it is fair enough to compare
# the relative gain (or degradation) between before and after de-noising.
#
# TODO(johnylin): replace ViSQOL with other metrics if applicable.
# TODO(johnylin): add more speech and noise test inputs for inclusion.
class audio_AudioNoiseCancellation(audio_test.AudioTest):
    """Server side input audio noise cancellation test.

    This test talks to a Chameleon board and a Cros device to verify
    input audio noise cancellation function of the Cros device.

    """
    version = 1

    # Recording starts this many seconds before playback, so the recording
    # begins with silence that can be trimmed afterwards.
    DELAY_BEFORE_PLAYBACK_SECONDS = 3.0
    # Extra recording time after the playback duration has elapsed, so the
    # recording includes the entire playback content.
    DELAY_AFTER_PLAYBACK_SECONDS = 2.0
    # Pauses (in seconds) after binding the widgets and after toggling NC.
    DELAY_AFTER_BINDING = 0.5
    DELAY_AFTER_NC_TOGGLED = 0.5

    # Class-level default only; it is never appended to. The first call to
    # _remove_at_cleanup() rebinds it to a list owned by the instance.
    cleanup_files = []

    def cleanup(self):
        """Restores the DUT state and removes the temporary local files.

        Every step runs even if an earlier one raises, so a failure to restart
        Cras neither leaves the Chrome UI stopped nor leaks the downloaded and
        generated files. An error raised by a step is still propagated.
        """
        try:
            # Restore the default state of bypass blocking mechanism in Cras.
            # Restarting Cras is only way because we are not able to know the
            # default state.
            self.host.run('restart cras')
        finally:
            try:
                # Start Chrome UI.
                # NOTE(review): this also runs when run_once() bailed out
                # before 'stop ui'; confirm 'start ui' is harmless when the UI
                # is already up.
                self.host.run('start ui')
            finally:
                # Remove downloaded files and the temporary generated files.
                for cleanup_file in self.cleanup_files:
                    if os.path.isfile(cleanup_file):
                        os.remove(cleanup_file)

    def _remove_at_cleanup(self, filepath):
        """Registers a local file to be deleted by cleanup().

        @param filepath: the path of the file to remove at cleanup.
        """
        # cleanup_files is declared on the class, so appending to it directly
        # would mutate one list shared by every instance of this test. Rebind
        # it to an instance-owned copy on first use instead.
        if 'cleanup_files' not in self.__dict__:
            self.cleanup_files = list(self.cleanup_files)
        self.cleanup_files.append(filepath)

    def download_file_from_bucket(self, file):
        """Download the file from GS bucket.

        @param file: the file name for download.

        @raises: error.TestError if failed.

        @returns: the local path of the downloaded file.
        """
        remote_path = os.path.join(DIST_FILES_DIR, file)
        # The bare name below resolves to the module-level helper imported from
        # bluetooth_audio_test_data, not to this method. The lambda treats the
        # download as successful when the process return code is 0.
        if not download_file_from_bucket(
                DATA_DIR, remote_path, lambda _, __, p: p.returncode == 0):
            logging.error('Failed to download %s to %s', remote_path, DATA_DIR)
            raise error.TestError('Failed to download file %s from bucket.' %
                                  file)

        return os.path.join(DATA_DIR, file)

    def generate_noisy_speech_file(self, speech_path, noise_path):
        """Generate the mixed audio file of speech and noise data.

        @param speech_path: the file path of the pure speech audio.
        @param noise_path: the file path of the noise audio.

        @raises: error.TestError if failed.

        @returns: the file path of the mixed audio.
        """
        mixed_wav_path = os.path.join(DATA_DIR, 'speech_noise_mixed.wav')
        # Drop any stale output first, so the existence check after mixing
        # only passes for a file produced by this call.
        if os.path.exists(mixed_wav_path):
            os.remove(mixed_wav_path)
        sox_utils.mix_two_wav_files(speech_path,
                                    noise_path,
                                    mixed_wav_path,
                                    input_volume=1.0)
        if not os.path.isfile(mixed_wav_path):
            logging.error('WAV file %s does not exist.', mixed_wav_path)
            raise error.TestError('Failed to mix %s and %s by sox commands.' %
                                  (speech_path, noise_path))

        return mixed_wav_path

    def run_once(self, test_data):
        """Runs Audio Noise Cancellation test.

        Test scenarios can be distinguished by the elements (keys) in test_data.
        Noisy environment test:
          test_data = dict(
              speech_file: the WAV file for the pure speech data.
              noise_file: the WAV file for the noise data.
              threshold: the min required score gain for NC effect.)
        Quiet environment test:
          test_data = dict(
              speech_file: the WAV file for the pure speech data.
              threshold: the min score diff tolerance for NC effect.)

        @param test_data: the dict for files and threshold as mentioned above.

        @raises: error.TestWarn if NC is not supported by the device.
        @raises: error.TestError if an input or recorded file cannot be
                 downloaded, generated, found or inspected.
        @raises: error.TestFail if the ViSQOL score difference (NC on minus
                 NC off) is lower than test_data['threshold'].
        """
        if not self.facade.get_noise_cancellation_supported():
            logging.warning('Noise Cancellation is not supported.')
            raise error.TestWarn('Noise Cancellation is not supported.')

        # Download the files from bucket.
        speech_path = self.download_file_from_bucket(test_data['speech_file'])
        self._remove_at_cleanup(speech_path)

        # The pure speech file is the ViSQOL reference; its format is also the
        # target format the recordings are converted to before scoring.
        ref_infos = sox_utils.get_infos_from_wav_file(speech_path)
        if ref_infos is None:
            raise error.TestError('Failed to get infos from wav file %s.' %
                                  speech_path)

        if 'noise_file' in test_data:
            # Noisy environment test when 'noise_file' is given.
            noise_path = self.download_file_from_bucket(
                    test_data['noise_file'])
            self._remove_at_cleanup(noise_path)

            test_audio_path = self.generate_noisy_speech_file(
                    speech_path, noise_path)
            self._remove_at_cleanup(test_audio_path)

            test_infos = sox_utils.get_infos_from_wav_file(test_audio_path)
            if test_infos is None:
                raise error.TestError('Failed to get infos from wav file %s.' %
                                      test_audio_path)
        else:
            # Quiet environment test.
            test_audio_path = speech_path
            test_infos = ref_infos

        playback_testdata = audio_test_data.AudioTestData(
                path=test_audio_path,
                data_format=dict(file_type='wav',
                                 sample_format='S{}_LE'.format(
                                         test_infos['bits']),
                                 channel=test_infos['channels'],
                                 rate=test_infos['rate']),
                duration_secs=test_infos['duration'])

        # Get and set VISQOL working environment.
        get_visqol_binary()

        # Bypass blocking mechanism in Cras to make sure Noise Cancellation is
        # enabled.
        self.facade.set_bypass_block_noise_cancellation(bypass=True)

        # Playback path: Chameleon line-out bound to the peripheral speaker.
        source = self.widget_factory.create_widget(
                chameleon_audio_ids.ChameleonIds.LINEOUT)
        sink = self.widget_factory.create_widget(
                chameleon_audio_ids.PeripheralIds.SPEAKER)
        binder = self.widget_factory.create_binder(source, sink)

        # Recording path: the internal microphone of the Cros device.
        recorder = self.widget_factory.create_widget(
                chameleon_audio_ids.CrosIds.INTERNAL_MIC)

        # Select and check the node selected by cras is correct.
        audio_test_utils.check_and_set_chrome_active_node_types(
                self.facade, None,
                audio_test_utils.get_internal_mic_node(self.host))

        # Adjust the proper input gain.
        self.facade.set_chrome_active_input_gain(50)

        # Stop Chrome UI to avoid NC state preference intervened by Chrome.
        self.host.run('stop ui')
        logging.info(
                'UI is stopped to avoid NC preference intervention from Chrome'
        )

        def _run_routine(recorded_filename, nc_enabled):
            """Plays the test audio, records it on the DUT and scores it.

            @param recorded_filename: base name (without extension) of the
                                      recording saved under the results dir.
            @param nc_enabled: True to record with NC on, False with NC off.

            @raises: error.TestError if the recorded WAV file is missing or
                     its infos cannot be read.

            @returns: the ViSQOL score of the processed recording against the
                      pure speech reference file.
            """
            # Set NC state via D-Bus control.
            self.facade.set_noise_cancellation_enabled(nc_enabled)
            time.sleep(self.DELAY_AFTER_NC_TOGGLED)

            with chameleon_audio_helper.bind_widgets(binder):
                time.sleep(self.DELAY_AFTER_BINDING)

                logfile_suffix = 'nc_on' if nc_enabled else 'nc_off'
                audio_test_utils.dump_cros_audio_logs(
                        self.host, self.facade, self.resultsdir,
                        'after_binding.{}'.format(logfile_suffix))

                logging.info('Set playback data on Chameleon')
                source.set_playback_data(playback_testdata)

                # Start recording, wait a few seconds, and then start playback.
                # Make sure the recorded data has silent samples in the
                # beginning to trim, and includes the entire playback content.
                logging.info('Start recording from Cros device')
                recorder.start_recording()
                time.sleep(self.DELAY_BEFORE_PLAYBACK_SECONDS)

                logging.info('Start playing %s from Chameleon',
                             playback_testdata.path)
                source.start_playback()

                time.sleep(test_infos['duration'] +
                           self.DELAY_AFTER_PLAYBACK_SECONDS)

                recorder.stop_recording()
                logging.info('Stopped recording from Cros device.')

                audio_test_utils.dump_cros_audio_logs(
                        self.host, self.facade, self.resultsdir,
                        'after_recording.{}'.format(logfile_suffix))

                recorder.read_recorded_binary()
                logging.info('Read recorded binary from Cros device.')

            # Remove the beginning of recorded data. This is to avoid artifact
            # caused by Cros device codec initialization in the beginning of
            # recording.
            recorder.remove_head(1.0)

            recorded_file = os.path.join(self.resultsdir,
                                         recorded_filename + '.raw')
            logging.info('Saving recorded data to %s', recorded_file)
            recorder.save_file(recorded_file)
            self._remove_at_cleanup(recorded_file)

            # WAV file is also saved by recorder.save_file().
            recorded_wav_path = recorded_file + '.wav'
            if not os.path.isfile(recorded_wav_path):
                logging.error('WAV file %s does not exist.', recorded_wav_path)
                raise error.TestError('Failed to find recorded wav file.')
            self._remove_at_cleanup(recorded_wav_path)

            rec_infos = sox_utils.get_infos_from_wav_file(recorded_wav_path)
            if rec_infos is None:
                raise error.TestError('Failed to get infos from wav file %s.' %
                                      recorded_wav_path)

            # Convert the recorded data to the channel count, bit depth and
            # rate of the reference file. The original author describes this
            # as downsampling from 48k to 16k rate, which is required for
            # getting ViSQOL score in speech mode.
            recorded_16k_path = '{}_16k{}'.format(
                    *os.path.splitext(recorded_wav_path))
            sox_utils.convert_format(recorded_wav_path,
                                     rec_infos['channels'],
                                     rec_infos['bits'],
                                     rec_infos['rate'],
                                     recorded_16k_path,
                                     ref_infos['channels'],
                                     ref_infos['bits'],
                                     ref_infos['rate'],
                                     1.0,
                                     use_src_header=True,
                                     use_dst_header=True)

            # Remove the silence in the beginning and trim to the same duration
            # as the reference file.
            trimmed_recorded_16k_path = '{}_trim{}'.format(
                    *os.path.splitext(recorded_16k_path))
            sox_utils.trim_silence_from_wav_file(recorded_16k_path,
                                                 trimmed_recorded_16k_path,
                                                 ref_infos['duration'],
                                                 duration_threshold=0.05)

            score = visqol_utils.get_visqol_score(
                    ref_file=speech_path,
                    deg_file=trimmed_recorded_16k_path,
                    log_dir=self.resultsdir,
                    speech_mode=True)

            logging.info('Recorded audio %s got ViSQOL score: %f',
                         recorded_filename, score)
            return score

        logging.info('Run routine with NC enabled...')
        nc_on_score = _run_routine('record_nc_enabled', nc_enabled=True)
        logging.info('Run routine with NC disabled...')
        nc_off_score = _run_routine('record_nc_disabled', nc_enabled=False)

        score_diff = nc_on_score - nc_off_score

        # Track ViSQOL performance score
        test_desc = 'internal_mic_noise_cancellation_visqol_diff'
        self.write_perf_keyval({test_desc: score_diff})

        if score_diff < test_data['threshold']:
            raise error.TestFail(
                    'ViSQOL score diff for NC(=%f) is lower than threshold(=%f)'
                    % (score_diff, test_data['threshold']))