1# Lint as: python2, python3 2# Copyright 2021 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import logging 7import os 8import subprocess 9import time 10 11from autotest_lib.client.bin import test 12from autotest_lib.client.bin import utils 13from autotest_lib.client.common_lib import error 14from autotest_lib.client.cros.audio import audio_helper 15from autotest_lib.client.cros.audio import sox_utils 16 17 18class audio_CrasAec(test.test): 19 """Verifies echo cancellation functions well.""" 20 version = 1 21 22 INT_SPK_CRAS_NODE_TYPE = 'INTERNAL_SPEAKER' 23 INT_MIC_CRAS_NODE_TYPE = 'INTERNAL_MIC' 24 25 # (sample rate, channels, rms threshold) 26 # The rms_threshold value is determined by experiments. 27 TEST_DATA = [ 28 (48000, 1, 0.015), 29 (44100, 1, 0.015), 30 (16000, 1, 0.015), 31 (44100, 2, 0.015), 32 (48000, 2, 0.015), 33 (16000, 2, 0.015), 34 ] 35 36 def play_sound(self): 37 """Plays the given audio content.""" 38 cmd = [ 39 'cras_test_client', '--playback_file', 40 os.path.join(self.bindir, 'human-voice.raw') 41 ] 42 self._play_sound_proc = subprocess.Popen(cmd) 43 44 def record_aec(self, rate, channels): 45 """Records the looped audio with AEC processing. """ 46 file_name = os.path.join(self.resultsdir, 47 'record-%d-ch%d.raw' % (rate, channels)) 48 cmd = [ 49 'cras_test_client', '--loopback_file', file_name, '--effects', 50 'aec', '--rate', 51 str(rate), '--post_dsp', '2', '--num_channels', 52 str(channels) 53 ] 54 self._record_aec_proc = subprocess.Popen(cmd) 55 return file_name 56 57 def aecdump(self, stream_id, rate, channels): 58 """Do the AEC dump parallelly.""" 59 60 file_name = os.path.join(self.resultsdir, 61 'aecdump-%d-ch%d.raw' % (rate, channels)) 62 cmd = [ 63 'cras_test_client', '--aecdump', file_name, '--stream_id', 64 str(stream_id), '--duration', 65 str(10) 66 ] 67 self._dump_aec_proc = subprocess.Popen(cmd) 68 69 def setup_test_procs(self): 70 """Initializes process variables for this test.""" 71 self._dump_aec_proc = None 72 self._record_aec_proc = None 73 self._play_sound_proc = None 74 75 def cleanup_test_procs(self): 76 """Cleans up all cras_test_client processes used in test.""" 77 if self._dump_aec_proc: 78 self._dump_aec_proc.kill() 79 if self._record_aec_proc: 80 self._record_aec_proc.kill() 81 if self._play_sound_proc: 82 self._play_sound_proc.kill() 83 84 def get_aec_stream_id(self): 85 """Gets the first AEC stream id in decimal. """ 86 proc = subprocess.Popen(['cras_test_client', '--dump_a'], 87 stdout=subprocess.PIPE) 88 output, err = proc.communicate() 89 lines = output.decode().split('\n') 90 # Filter through the summary lines by effects 0x0001 to find 91 # the stream id. 92 for line in lines: 93 words = line.split(' ') 94 if words[0] != 'Summary:': 95 continue 96 97 logging.debug("audio dump summaries: %s", line) 98 if words[8] == '0x0001': 99 return int(words[3], 16) 100 101 return None 102 103 def test_sample_rate_and_channels(self, rate, channels): 104 """ 105 Configures CRAS to use aloop as input and output option. 106 Plays the given audio content then record through aloop. 107 Expects the AEC cancels well because the two-way data 108 are the same except scaling and time shift. 109 110 @param rarte: the sample rate to create capture stream 111 @param channels: the number of channels to create capture stream 112 113 @returns: the rms value reported by sox util. 114 """ 115 self.setup_test_procs() 116 117 try: 118 self.play_sound() 119 recorded_file = self.record_aec(rate, channels) 120 121 # Wait at most 2 seconds for AEC stream to be ready for aecdump. 122 stream_id = utils.poll_for_condition(self.get_aec_stream_id, 123 timeout=2, 124 sleep_interval=0.1) 125 126 self.aecdump(stream_id, rate, channels) 127 time.sleep(3) 128 except utils.TimeoutError: 129 # Possibly error has occurred in capture proess. 130 audio_helper.dump_audio_diagnostics( 131 os.path.join(self.resultsdir, "audio_diagnostics.txt")) 132 raise error.TestFail("Fail to find aec stream's id") 133 finally: 134 self.cleanup_test_procs() 135 136 sox_stat = sox_utils.get_stat(recorded_file, 137 channels=channels, 138 rate=rate) 139 return sox_stat.rms 140 141 def run_once(self): 142 """Entry point of this test.""" 143 rms_results = [] 144 test_pass = True 145 try: 146 for sample_rate, channels, rms_threshold in self.TEST_DATA: 147 rms = self.test_sample_rate_and_channels(sample_rate, channels) 148 if rms > rms_threshold: 149 test_pass = False 150 rms_results.append(rms) 151 finally: 152 logging.debug("rms results: %s", rms_results) 153 154 if not test_pass: 155 raise error.TestFail("rms too high in at least one case %s" % 156 rms_results) 157