1# Lint as: python2, python3
2# Copyright 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""This is a server side noise cancellation test using the Chameleon board."""
6
7import logging
8import os
9import time
10
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros.audio import audio_test_data
13from autotest_lib.client.cros.audio import sox_utils
14from autotest_lib.client.cros.audio import visqol_utils
15from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import (
16        download_file_from_bucket, get_visqol_binary)
17from autotest_lib.client.cros.chameleon import audio_test_utils
18from autotest_lib.client.cros.chameleon import chameleon_audio_ids
19from autotest_lib.client.cros.chameleon import chameleon_audio_helper
20from autotest_lib.server.cros.audio import audio_test
21
22DIST_FILES_DIR = 'gs://chromeos-localmirror/distfiles/test_noise_cancellation'
23DATA_DIR = '/tmp'
24
25
26# Verification steps for the Noise Cancellation processing (NC):
27# 1. Prepare the audio source file and reference file.
28# 2. Play the source file by Chameleon.
29# 3. Record by DUT Internal Mic when NC is on and get ViSQOL score A.
30# 4. Repeat step 2.
31# 5. Record by DUT Internal Mic when NC is off and get ViSQOL score B.
32# 6. Check if A - B >= threshold
33#
# In practice, ViSQOL is not the most suitable metric for NC due to its
# intrusive design (reference: go/visqol). However, it is good enough for
# comparing the relative gain (or degradation) before and after de-noising.
37#
38# TODO(johnylin): replace ViSQOL with other metrics if applicable.
39# TODO(johnylin): add more speech and noise test inputs for inclusion.
40class audio_AudioNoiseCancellation(audio_test.AudioTest):
41    """Server side input audio noise cancellation test.
42
43    This test talks to a Chameleon board and a Cros device to verify
44    input audio noise cancellation function of the Cros device.
45
46    """
47    version = 1
48    DELAY_BEFORE_PLAYBACK_SECONDS = 3.0
49    DELAY_AFTER_PLAYBACK_SECONDS = 2.0
50    DELAY_AFTER_BINDING = 0.5
51    DELAY_AFTER_NC_TOGGLED = 0.5
52
53    cleanup_files = []
54
55    def cleanup(self):
56        # Restore the default state of bypass blocking mechanism in Cras.
57        # Restarting Cras is only way because we are not able to know the
58        # default state.
59        self.host.run('restart cras')
60
61        # Start Chrome UI.
62        self.host.run('start ui')
63
64        # Remove downloaded files and the temporary generated files.
65        for cleanup_file in self.cleanup_files:
66            if os.path.isfile(cleanup_file):
67                os.remove(cleanup_file)
68
69    def download_file_from_bucket(self, file):
70        """Download the file from GS bucket.
71
72        @param file: the file name for download.
73
74        @raises: error.TestError if failed.
75
76        @returns: the local path of the downloaded file.
77        """
78        remote_path = os.path.join(DIST_FILES_DIR, file)
79        if not download_file_from_bucket(
80                DATA_DIR, remote_path, lambda _, __, p: p.returncode == 0):
81            logging.error('Failed to download %s to %s', remote_path, DATA_DIR)
82            raise error.TestError('Failed to download file %s from bucket.' %
83                                  file)
84
85        return os.path.join(DATA_DIR, file)
86
87    def generate_noisy_speech_file(self, speech_path, noise_path):
88        """Generate the mixed audio file of speech and noise data.
89
90        @param speech_path: the file path of the pure speech audio.
91        @param noise_path: the file path of the noise audio.
92
93        @raises: error.TestError if failed.
94
95        @returns: the file path of the mixed audio.
96        """
97        mixed_wav_path = os.path.join(DATA_DIR, 'speech_noise_mixed.wav')
98        if os.path.exists(mixed_wav_path):
99            os.remove(mixed_wav_path)
100        sox_utils.mix_two_wav_files(speech_path,
101                                    noise_path,
102                                    mixed_wav_path,
103                                    input_volume=1.0)
104        if not os.path.isfile(mixed_wav_path):
105            logging.error('WAV file %s does not exist.', mixed_wav_path)
106            raise error.TestError('Failed to mix %s and %s by sox commands.' %
107                                  (speech_path, noise_path))
108
109        return mixed_wav_path
110
    def run_once(self, test_data):
        """Runs Audio Noise Cancellation test.

        The test audio is played from Chameleon and recorded by the DUT
        internal mic twice, first with NC enabled and then with NC disabled.
        Each recording is scored by ViSQOL against the pure speech file, and
        the test fails when (NC-on score - NC-off score) is below the
        threshold.

        Test scenarios can be distinguished by the elements (keys) in test_data.
        Noisy environment test:
            test_data = dict(
                speech_file: the WAV file for the pure speech data.
                noise_file: the WAV file for the noise data.
                threshold: the min required score gain for NC effect.)
        Quiet environment test:
            test_data = dict(
                speech_file: the WAV file for the pure speech data.
                threshold: the min score diff tolerance for NC effect.)

        @param test_data: the dict for files and threshold as mentioned above.

        @raises: error.TestWarn if the DUT reports that NC is not supported.
        @raises: error.TestError if a file cannot be downloaded, mixed,
                 inspected or found after recording.
        @raises: error.TestFail if the score difference is below
                 test_data['threshold'].
        """
        if not self.facade.get_noise_cancellation_supported():
            logging.warning('Noise Cancellation is not supported.')
            raise error.TestWarn('Noise Cancellation is not supported.')

        def _remove_at_cleanup(filepath):
            """Register filepath in the list that cleanup() removes."""
            self.cleanup_files.append(filepath)

        # Download the files from bucket.
        speech_path = self.download_file_from_bucket(test_data['speech_file'])
        _remove_at_cleanup(speech_path)

        # Infos of the pure speech file. They are reused below as the target
        # format and duration when post-processing each recording.
        ref_infos = sox_utils.get_infos_from_wav_file(speech_path)
        if ref_infos is None:
            raise error.TestError('Failed to get infos from wav file %s.' %
                                  speech_path)

        if 'noise_file' in test_data:
            # Noisy environment test when 'noise_file' is given.
            noise_path = self.download_file_from_bucket(
                    test_data['noise_file'])
            _remove_at_cleanup(noise_path)

            # The played audio is speech mixed with noise, while the scoring
            # reference stays the pure speech file.
            test_audio_path = self.generate_noisy_speech_file(
                    speech_path, noise_path)
            _remove_at_cleanup(test_audio_path)

            test_infos = sox_utils.get_infos_from_wav_file(test_audio_path)
            if test_infos is None:
                raise error.TestError('Failed to get infos from wav file %s.' %
                                      test_audio_path)
        else:
            # Quiet environment test.
            test_audio_path = speech_path
            test_infos = ref_infos

        # Describe the playback file with the values reported by
        # sox_utils.get_infos_from_wav_file(), e.g. bits == 16 gives the
        # sample format 'S16_LE'.
        playback_testdata = audio_test_data.AudioTestData(
                path=test_audio_path,
                data_format=dict(file_type='wav',
                                 sample_format='S{}_LE'.format(
                                         test_infos['bits']),
                                 channel=test_infos['channels'],
                                 rate=test_infos['rate']),
                duration_secs=test_infos['duration'])

        # Get and set VISQOL working environment.
        get_visqol_binary()

        # Bypass blocking mechanism in Cras to make sure Noise Cancellation is
        # enabled.
        self.facade.set_bypass_block_noise_cancellation(bypass=True)

        # Playback path: Chameleon line-out bound to the peripheral speaker.
        source = self.widget_factory.create_widget(
                chameleon_audio_ids.ChameleonIds.LINEOUT)
        sink = self.widget_factory.create_widget(
                chameleon_audio_ids.PeripheralIds.SPEAKER)
        binder = self.widget_factory.create_binder(source, sink)

        # Recording path: the internal mic of the Cros device.
        recorder = self.widget_factory.create_widget(
                chameleon_audio_ids.CrosIds.INTERNAL_MIC)

        # Select and check the node selected by cras is correct.
        # NOTE(review): the second argument (None) is presumably the output
        # node type, i.e. only the input node is constrained - confirm.
        audio_test_utils.check_and_set_chrome_active_node_types(
                self.facade, None,
                audio_test_utils.get_internal_mic_node(self.host))

        # Adjust the proper input gain.
        self.facade.set_chrome_active_input_gain(50)

        # Stop Chrome UI to avoid NC state preference intervened by Chrome.
        # The UI is started again in cleanup().
        self.host.run('stop ui')
        logging.info(
                'UI is stopped to avoid NC preference intervention from Chrome'
        )

        def _run_routine(recorded_filename, nc_enabled):
            """Record one playback pass with the given NC state and score it.

            @param recorded_filename: the base name (without extension) of
                                      the recording saved under
                                      self.resultsdir.
            @param nc_enabled: True to record with NC enabled, False to record
                               with NC disabled.

            @raises: error.TestError if the recorded WAV file is missing or
                     its infos cannot be read.

            @returns: the ViSQOL score of the processed recording against the
                      pure speech file.
            """
            # Set NC state via D-Bus control.
            self.facade.set_noise_cancellation_enabled(nc_enabled)
            time.sleep(self.DELAY_AFTER_NC_TOGGLED)

            with chameleon_audio_helper.bind_widgets(binder):
                time.sleep(self.DELAY_AFTER_BINDING)

                # The suffix keeps the audio logs of the two passes apart.
                logfile_suffix = 'nc_on' if nc_enabled else 'nc_off'
                audio_test_utils.dump_cros_audio_logs(
                        self.host, self.facade, self.resultsdir,
                        'after_binding.{}'.format(logfile_suffix))

                logging.info('Set playback data on Chameleon')
                source.set_playback_data(playback_testdata)

                # Start recording, wait a few seconds, and then start playback.
                # Make sure the recorded data has silent samples in the
                # beginning to trim, and includes the entire playback content.
                logging.info('Start recording from Cros device')
                recorder.start_recording()
                time.sleep(self.DELAY_BEFORE_PLAYBACK_SECONDS)

                logging.info('Start playing %s from Chameleon',
                             playback_testdata.path)
                source.start_playback()

                # Wait for the whole test audio plus a margin before stopping
                # the recording.
                time.sleep(test_infos['duration'] +
                           self.DELAY_AFTER_PLAYBACK_SECONDS)

                recorder.stop_recording()
                logging.info('Stopped recording from Cros device.')

                audio_test_utils.dump_cros_audio_logs(
                        self.host, self.facade, self.resultsdir,
                        'after_recording.{}'.format(logfile_suffix))

                recorder.read_recorded_binary()
                logging.info('Read recorded binary from Cros device.')

            # Remove the beginning of recorded data. This is to avoid artifact
            # caused by Cros device codec initialization in the beginning of
            # recording.
            recorder.remove_head(1.0)

            recorded_file = os.path.join(self.resultsdir,
                                         recorded_filename + '.raw')
            logging.info('Saving recorded data to %s', recorded_file)
            recorder.save_file(recorded_file)
            _remove_at_cleanup(recorded_file)

            # WAV file is also saved by recorder.save_file().
            recorded_wav_path = recorded_file + '.wav'
            if not os.path.isfile(recorded_wav_path):
                logging.error('WAV file %s does not exist.', recorded_wav_path)
                raise error.TestError('Failed to find recorded wav file.')
            _remove_at_cleanup(recorded_wav_path)

            rec_infos = sox_utils.get_infos_from_wav_file(recorded_wav_path)
            if rec_infos is None:
                raise error.TestError('Failed to get infos from wav file %s.' %
                                      recorded_wav_path)

            # Downsample the recorded data from 48k to 16k rate. It is required
            # for getting ViSQOL score in speech mode.
            # The target channels, bits and rate are taken from the pure speech
            # file (ref_infos) rather than hard-coded.
            # NOTE(review): this assumes the speech file itself is 16k - confirm.
            # NOTE(review): the converted file and the trimmed file below are
            # not registered with _remove_at_cleanup(), so they stay in
            # self.resultsdir - confirm this is intended.
            recorded_16k_path = '{}_16k{}'.format(
                    *os.path.splitext(recorded_wav_path))
            sox_utils.convert_format(recorded_wav_path,
                                     rec_infos['channels'],
                                     rec_infos['bits'],
                                     rec_infos['rate'],
                                     recorded_16k_path,
                                     ref_infos['channels'],
                                     ref_infos['bits'],
                                     ref_infos['rate'],
                                     1.0,
                                     use_src_header=True,
                                     use_dst_header=True)

            # Remove the silence in the beginning and trim to the same duration
            # as the reference file.
            trimmed_recorded_16k_path = '{}_trim{}'.format(
                    *os.path.splitext(recorded_16k_path))
            sox_utils.trim_silence_from_wav_file(recorded_16k_path,
                                                 trimmed_recorded_16k_path,
                                                 ref_infos['duration'],
                                                 duration_threshold=0.05)

            # The reference is always the pure speech file, also in the noisy
            # scenario where the played audio was the speech/noise mix.
            score = visqol_utils.get_visqol_score(
                    ref_file=speech_path,
                    deg_file=trimmed_recorded_16k_path,
                    log_dir=self.resultsdir,
                    speech_mode=True)

            logging.info('Recorded audio %s got ViSQOL score: %f',
                         recorded_filename, score)
            return score

        logging.info('Run routine with NC enabled...')
        nc_on_score = _run_routine('record_nc_enabled', nc_enabled=True)
        logging.info('Run routine with NC disabled...')
        nc_off_score = _run_routine('record_nc_disabled', nc_enabled=False)

        # Positive when the recording scored higher with NC enabled.
        score_diff = nc_on_score - nc_off_score

        # Track ViSQOL performance score
        test_desc = 'internal_mic_noise_cancellation_visqol_diff'
        self.write_perf_keyval({test_desc: score_diff})

        if score_diff < test_data['threshold']:
            raise error.TestFail(
                    'ViSQOL score diff for NC(=%f) is lower than threshold(=%f)'
                    % (score_diff, test_data['threshold']))
314