xref: /aosp_15_r20/external/autotest/client/site_tests/audio_CrasAec/audio_CrasAec.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging
7import os
8import subprocess
9import time
10
11from autotest_lib.client.bin import test
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros.audio import audio_helper
15from autotest_lib.client.cros.audio import sox_utils
16
17
class audio_CrasAec(test.test):
    """Verifies echo cancellation functions well.

    For every entry in TEST_DATA the test plays a voice sample with
    cras_test_client, captures a loopback stream with the AEC effect
    requested, and checks that the RMS of the captured audio reported by
    sox stays at or below the configured threshold.
    """
    version = 1

    INT_SPK_CRAS_NODE_TYPE = 'INTERNAL_SPEAKER'
    INT_MIC_CRAS_NODE_TYPE = 'INTERNAL_MIC'

    # (sample rate, channels, rms threshold)
    # The rms_threshold value is determined by experiments.
    TEST_DATA = [
            (48000, 1, 0.015),
            (44100, 1, 0.015),
            (16000, 1, 0.015),
            (44100, 2, 0.015),
            (48000, 2, 0.015),
            (16000, 2, 0.015),
    ]

    # Positions of the fields read from a space-separated 'Summary:' line
    # printed by `cras_test_client --dump_a`, and the effects value that
    # marks the stream this test is looking for.
    _SUMMARY_STREAM_ID_INDEX = 3
    _SUMMARY_EFFECTS_INDEX = 8
    _AEC_EFFECTS = '0x0001'

    def play_sound(self):
        """Starts playing the bundled voice sample in the background.

        Launches cras_test_client to play 'human-voice.raw' from the test's
        bindir. The process handle is kept in self._play_sound_proc so that
        cleanup_test_procs() can stop it later.
        """
        cmd = [
                'cras_test_client', '--playback_file',
                os.path.join(self.bindir, 'human-voice.raw')
        ]
        self._play_sound_proc = subprocess.Popen(cmd)

    def record_aec(self, rate, channels):
        """Starts recording the looped audio with AEC processing.

        Launches cras_test_client in the background; the process handle is
        kept in self._record_aec_proc.

        @param rate: the sample rate of the capture stream
        @param channels: the number of channels of the capture stream

        @returns: path of the raw file the recording is written to, located
                  in the test's resultsdir.
        """
        file_name = os.path.join(self.resultsdir,
                                 'record-%d-ch%d.raw' % (rate, channels))
        cmd = [
                'cras_test_client', '--loopback_file', file_name, '--effects',
                'aec', '--rate',
                str(rate), '--post_dsp', '2', '--num_channels',
                str(channels)
        ]
        self._record_aec_proc = subprocess.Popen(cmd)
        return file_name

    def aecdump(self, stream_id, rate, channels, duration=10):
        """Starts an AEC dump of the given stream in parallel.

        Launches cras_test_client in the background; the process handle is
        kept in self._dump_aec_proc.

        @param stream_id: id of the stream to dump
        @param rate: sample rate, only used to name the dump file
        @param channels: channel count, only used to name the dump file
        @param duration: value passed to cras_test_client's --duration
                         option (presumably seconds)
        """
        file_name = os.path.join(self.resultsdir,
                                 'aecdump-%d-ch%d.raw' % (rate, channels))
        cmd = [
                'cras_test_client', '--aecdump', file_name, '--stream_id',
                str(stream_id), '--duration',
                str(duration)
        ]
        self._dump_aec_proc = subprocess.Popen(cmd)

    def setup_test_procs(self):
        """Initializes process variables for this test."""
        self._dump_aec_proc = None
        self._record_aec_proc = None
        self._play_sound_proc = None

    @staticmethod
    def _stop_proc(proc):
        """Kills a child process if it is still running and reaps it.

        @param proc: a subprocess.Popen object, or None if the process was
                     never started.
        """
        if proc is None:
            return
        # Only signal a process that is still alive; poll() also reaps one
        # that has already exited on its own.
        if proc.poll() is None:
            proc.kill()
        # Wait for the exit so no zombie is left behind and so the files the
        # process was writing are no longer being modified.
        proc.wait()

    def cleanup_test_procs(self):
        """Cleans up all cras_test_client processes used in test.

        Each process is killed and waited for, and its handle is cleared so
        that calling this method again is harmless.
        """
        self._stop_proc(self._dump_aec_proc)
        self._dump_aec_proc = None
        self._stop_proc(self._record_aec_proc)
        self._record_aec_proc = None
        self._stop_proc(self._play_sound_proc)
        self._play_sound_proc = None

    def get_aec_stream_id(self):
        """Gets the first AEC stream id in decimal.

        Parses the output of `cras_test_client --dump_a` and looks at the
        lines starting with 'Summary:'.

        @returns: the stream id as an int, or None if no summary line with
                  the expected effects value is present.
        """
        proc = subprocess.Popen(['cras_test_client', '--dump_a'],
                                stdout=subprocess.PIPE)
        output, _ = proc.communicate()
        # Filter through the summary lines by effects 0x0001 to find
        # the stream id.
        for line in output.decode().split('\n'):
            words = line.split(' ')
            if words[0] != 'Summary:':
                continue

            logging.debug("audio dump summaries: %s", line)
            # A truncated or unexpectedly short summary line is treated as
            # "no match" instead of raising IndexError.
            if len(words) <= self._SUMMARY_EFFECTS_INDEX:
                continue
            if words[self._SUMMARY_EFFECTS_INDEX] == self._AEC_EFFECTS:
                # The id is printed in hexadecimal.
                return int(words[self._SUMMARY_STREAM_ID_INDEX], 16)

        return None

    def test_sample_rate_and_channels(self, rate, channels):
        """
        Plays the given audio content then records the looped audio with
        AEC enabled.
        Expects the AEC cancels well because the two-way data
        are the same except scaling and time shift.

        @param rate: the sample rate to create capture stream
        @param channels: the number of channels to create capture stream

        @returns: the rms value reported by sox util.

        @raises error.TestFail: if no AEC stream id shows up in time.
        """
        self.setup_test_procs()

        try:
            self.play_sound()
            recorded_file = self.record_aec(rate, channels)

            # Wait at most 2 seconds for AEC stream to be ready for aecdump.
            stream_id = utils.poll_for_condition(self.get_aec_stream_id,
                                                 timeout=2,
                                                 sleep_interval=0.1)

            self.aecdump(stream_id, rate, channels)
            # Let playback, capture and dump run together for a while.
            time.sleep(3)
        except utils.TimeoutError:
            # Possibly error has occurred in capture process.
            audio_helper.dump_audio_diagnostics(
                    os.path.join(self.resultsdir, "audio_diagnostics.txt"))
            raise error.TestFail("Fail to find aec stream's id")
        finally:
            # Always stop the helper processes, whether or not the run
            # succeeded, before the recording is analysed.
            self.cleanup_test_procs()

        sox_stat = sox_utils.get_stat(recorded_file,
                                      channels=channels,
                                      rate=rate)
        return sox_stat.rms

    def run_once(self):
        """Entry point of this test.

        Runs every configuration in TEST_DATA, even after one of them
        exceeds its threshold, so that all rms values get reported.

        @raises error.TestFail: if the rms of any configuration is above
                                its threshold.
        """
        rms_results = []
        test_pass = True
        try:
            for sample_rate, channels, rms_threshold in self.TEST_DATA:
                rms = self.test_sample_rate_and_channels(sample_rate, channels)
                if rms > rms_threshold:
                    logging.warning(
                            "rms %s exceeds threshold %s at rate %d, "
                            "channels %d", rms, rms_threshold, sample_rate,
                            channels)
                    test_pass = False
                rms_results.append(rms)
        finally:
            # Log whatever was collected even if a case raised midway.
            logging.debug("rms results: %s", rms_results)

        if not test_pass:
            raise error.TestFail("rms too high in at least one case %s" %
                                 rms_results)
157