xref: /aosp_15_r20/external/autotest/client/cros/audio/audio_analysis_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2from __future__ import absolute_import
3from __future__ import division
4from __future__ import print_function
5
6import logging
7import numpy
8import os
9import unittest
10
11import common
12from autotest_lib.client.cros.audio import audio_analysis
13from autotest_lib.client.cros.audio import audio_data
14from six.moves import range
15
16class SpectralAnalysisTest(unittest.TestCase):
17    def setUp(self):
18        """Uses the same seed to generate noise for each test."""
19        numpy.random.seed(0)
20
21
22    def stub_peak_detection(self, array, window_size):
23        """Detects peaks in an array in simple way.
24
25        A point (i, array[i]) is a peak if array[i] is the maximum among
26        array[i - half_window_size] to array[i + half_window_size].
27        If array[i - half_window_size] to array[i + half_window_size] are all
28        equal, then there is no peak in this window.
29
30        @param window_size: The window to detect peaks.
31
32        @returns: A list of tuples:
33                  [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
34                   ...]
35                  where the tuples are sorted by peak values.
36
37        """
38        half_window_size = window_size // 2
39        length = len(array)
40
41        def mid_is_peak(array, mid, left, right):
42            """Checks if value at mid is the largest among left to right.
43
44            @param array: A list of numbers.
45            @param mid: The mid index.
46            @param left: The left index.
47            @param rigth: The right index.
48
49            @returns: True if array[index] is the maximum among numbers in array
50                      between index [left, right] inclusively.
51
52            """
53            value_mid = array[mid]
54            for index in range(left, right + 1):
55                if index == mid:
56                    continue
57                if array[index] >= value_mid:
58                    return False
59            return True
60
61        results = []
62        for mid in range(length):
63            left = max(0, mid - half_window_size)
64            right = min(length - 1, mid + half_window_size)
65            if mid_is_peak(array, mid, left, right):
66                results.append((mid, array[mid]))
67
68        # Sort the peaks by values.
69        return sorted(results, key=lambda x: x[1], reverse=True)
70
71
72    def testPeakDetection(self):
73        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
74        result = audio_analysis.peak_detection(array, 4)
75        golden_answer = [(12, 5), (4, 4)]
76        self.assertEqual(result, golden_answer)
77
78
79    def testPeakDetectionLarge(self):
80        array = numpy.random.uniform(0, 1, 1000000)
81        window_size = 100
82        logging.debug('Test large array using stub peak detection')
83        stub_answer = self.stub_peak_detection(array, window_size)
84        logging.debug('Test large array using improved peak detection')
85        improved_answer = audio_analysis.peak_detection(array, window_size)
86        logging.debug('Compare the result')
87        self.assertEqual(stub_answer, improved_answer)
88
89
90    def testSpectralAnalysis(self):
91        rate = 48000
92        length_in_secs = 0.5
93        freq_1 = 490.0
94        freq_2 = 60.0
95        coeff_1 = 1
96        coeff_2 = 0.3
97        samples = int(length_in_secs * rate)
98        noise = numpy.random.standard_normal(samples) * 0.005
99        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
100        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
101             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
102        results = audio_analysis.spectral_analysis(y, rate)
103        # Results should contains
104        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
105        # frequency with coefficient 1, 60Hz is the second dominant frequency
106        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
107        # around 0.1. The k constant is resulted from window function.
108        logging.debug('Results: %s', results)
109        self.assertTrue(abs(results[0][0]-freq_1) < 1)
110        self.assertTrue(abs(results[1][0]-freq_2) < 1)
111        self.assertTrue(
112                abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
113
114
115    def testSpectralAnalysisRealData(self):
116        """This unittest checks the spectral analysis works on real data."""
117        file_path = os.path.join(
118                os.path.dirname(__file__), 'test_data', '1k_2k.raw')
119        binary = open(file_path, 'rb').read()
120        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
121        saturate_value = audio_data.get_maximum_value_from_sample_format(
122                'S32_LE')
123        golden_frequency = [1000, 2000]
124        for channel in [0, 1]:
125            normalized_signal = audio_analysis.normalize_signal(
126                    data.channel_data[channel],saturate_value)
127            spectral = audio_analysis.spectral_analysis(
128                    normalized_signal, 48000, 0.02)
129            logging.debug('channel %s: %s', channel, spectral)
130            self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
131                            'Dominant frequency is not correct')
132
133
134    def testNotMeaningfulData(self):
135        """Checks that sepectral analysis handles un-meaningful data."""
136        rate = 48000
137        length_in_secs = 0.5
138        samples = int(length_in_secs * rate)
139        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
140        noise = numpy.random.standard_normal(samples) * noise_amplitude
141        results = audio_analysis.spectral_analysis(noise, rate)
142        self.assertEqual([(0, 0)], results)
143
144
145    def testEmptyData(self):
146        """Checks that sepectral analysis rejects empty data."""
147        with self.assertRaises(audio_analysis.EmptyDataError):
148            results = audio_analysis.spectral_analysis([], 100)
149
150
151class NormalizeTest(unittest.TestCase):
152    def testNormalize(self):
153        y = [1, 2, 3, 4, 5]
154        normalized_y = audio_analysis.normalize_signal(y, 10)
155        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
156        for i in range(len(y)):
157            self.assertEqual(expected[i], normalized_y[i])
158
159
160class AnomalyTest(unittest.TestCase):
161    def setUp(self):
162        """Creates a test signal of sine wave."""
163        # Use the same seed for each test case.
164        numpy.random.seed(0)
165
166        self.block_size = 120
167        self.rate = 48000
168        self.freq = 440
169        length_in_secs = 0.25
170        self.samples = int(length_in_secs * self.rate)
171        x = numpy.linspace(
172                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
173        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
174
175
176    def add_noise(self):
177        """Add noise to the test signal."""
178        noise_amplitude = 0.3
179        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
180        self.y = self.y + noise
181
182
183    def insert_anomaly(self):
184        """Inserts an anomaly to the test signal.
185
186        The anomaly self.anomaly_samples should be created before calling this
187        method.
188
189        """
190        self.anomaly_start_secs = 0.1
191        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
192                              self.anomaly_samples)
193
194
195    def generate_skip_anomaly(self):
196        """Skips a section of test signal."""
197        self.anomaly_start_secs = 0.1
198        self.anomaly_duration_secs = 0.005
199        anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
200        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
201        anomaly_append_index = int(anomaly_append_secs * self.rate)
202        self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])
203
204
205    def create_constant_anomaly(self, amplitude):
206        """Creates an anomaly of constant samples.
207
208        @param amplitude: The amplitude of the constant samples.
209
210        """
211        self.anomaly_duration_secs = 0.005
212        self.anomaly_samples = (
213                [amplitude] * int(self.anomaly_duration_secs * self.rate))
214
215
216    def run_analysis(self):
217        """Runs the anomaly detection."""
218        self.results = audio_analysis.anomaly_detection(
219                self.y, self.rate, self.freq, self.block_size)
220        logging.debug('Results: %s', self.results)
221
222
223    def check_no_anomaly(self):
224        """Verifies that there is no anomaly in detection result."""
225        self.run_analysis()
226        self.assertFalse(self.results)
227
228
229    def check_anomaly(self):
230        """Verifies that there is anomaly in detection result.
231
232        The detection result should contain anomaly time stamps that are
233        close to where anomaly was inserted. There can be multiple anomalies
234        since the detection depends on the block size.
235
236        """
237        self.run_analysis()
238        self.assertTrue(self.results)
239        # Anomaly can be detected as long as the detection window of block size
240        # overlaps with anomaly.
241        expected_detected_range_secs = (
242                self.anomaly_start_secs - float(self.block_size) / self.rate,
243                self.anomaly_start_secs + self.anomaly_duration_secs)
244        for detected_secs in self.results:
245            self.assertTrue(detected_secs <= expected_detected_range_secs[1])
246            self.assertTrue(detected_secs >= expected_detected_range_secs[0] )
247
248
249    def testGoodSignal(self):
250        """Sine wave signal with no noise or anomaly."""
251        self.check_no_anomaly()
252
253
254    def testGoodSignalNoise(self):
255        """Sine wave signal with noise."""
256        self.add_noise()
257        self.check_no_anomaly()
258
259
260    def testZeroAnomaly(self):
261        """Sine wave signal with no noise but with anomaly.
262
263        This test case simulates underrun in digital data where there will be
264        one block of samples with 0 amplitude.
265
266        """
267        self.create_constant_anomaly(0)
268        self.insert_anomaly()
269        self.check_anomaly()
270
271
272    def testZeroAnomalyNoise(self):
273        """Sine wave signal with noise and anomaly.
274
275        This test case simulates underrun in analog data where there will be
276        one block of samples with amplitudes close to 0.
277
278        """
279        self.create_constant_anomaly(0)
280        self.insert_anomaly()
281        self.add_noise()
282        self.check_anomaly()
283
284
285    def testLowConstantAnomaly(self):
286        """Sine wave signal with low constant anomaly.
287
288        The anomaly is one block of constant values.
289
290        """
291        self.create_constant_anomaly(0.05)
292        self.insert_anomaly()
293        self.check_anomaly()
294
295
296    def testLowConstantAnomalyNoise(self):
297        """Sine wave signal with low constant anomaly and noise.
298
299        The anomaly is one block of constant values.
300
301        """
302        self.create_constant_anomaly(0.05)
303        self.insert_anomaly()
304        self.add_noise()
305        self.check_anomaly()
306
307
308    def testHighConstantAnomaly(self):
309        """Sine wave signal with high constant anomaly.
310
311        The anomaly is one block of constant values.
312
313        """
314        self.create_constant_anomaly(2)
315        self.insert_anomaly()
316        self.check_anomaly()
317
318
319    def testHighConstantAnomalyNoise(self):
320        """Sine wave signal with high constant anomaly and noise.
321
322        The anomaly is one block of constant values.
323
324        """
325        self.create_constant_anomaly(2)
326        self.insert_anomaly()
327        self.add_noise()
328        self.check_anomaly()
329
330
331    def testSkippedAnomaly(self):
332        """Sine wave signal with skipped anomaly.
333
334        The anomaly simulates the symptom where a block is skipped.
335
336        """
337        self.generate_skip_anomaly()
338        self.check_anomaly()
339
340
341    def testSkippedAnomalyNoise(self):
342        """Sine wave signal with skipped anomaly with noise.
343
344        The anomaly simulates the symptom where a block is skipped.
345
346        """
347        self.generate_skip_anomaly()
348        self.add_noise()
349        self.check_anomaly()
350
351
352    def testEmptyData(self):
353        """Checks that anomaly detection rejects empty data."""
354        self.y = []
355        with self.assertRaises(audio_analysis.EmptyDataError):
356            self.check_anomaly()
357
358
359if __name__ == '__main__':
360    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
361    unittest.main()
362