xref: /aosp_15_r20/external/autotest/client/cros/audio/audio_analysis.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2015 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Li"""This module provides utilities to do audio data analysis."""
7*9c5db199SXin Li
8*9c5db199SXin Lifrom __future__ import absolute_import
9*9c5db199SXin Lifrom __future__ import division
10*9c5db199SXin Lifrom __future__ import print_function
11*9c5db199SXin Liimport logging
12*9c5db199SXin Liimport numpy
13*9c5db199SXin Liimport operator
14*9c5db199SXin Lifrom six.moves import range
15*9c5db199SXin Li
16*9c5db199SXin Li# Only peaks with coefficient greater than 0.01 of the first peak should be
17*9c5db199SXin Li# considered. Note that this correspond to -40dB in the spectrum.
18*9c5db199SXin LiDEFAULT_MIN_PEAK_RATIO = 0.01
19*9c5db199SXin Li
20*9c5db199SXin LiPEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
21*9c5db199SXin Li
22*9c5db199SXin Li# The minimum RMS value of meaningful audio data.
23*9c5db199SXin LiMEANINGFUL_RMS_THRESHOLD = 0.001
24*9c5db199SXin Li
25*9c5db199SXin Liclass RMSTooSmallError(Exception):
26*9c5db199SXin Li    """Error when signal RMS is too small."""
27*9c5db199SXin Li    pass
28*9c5db199SXin Li
29*9c5db199SXin Li
30*9c5db199SXin Liclass EmptyDataError(Exception):
31*9c5db199SXin Li    """Error when signal is empty."""
32*9c5db199SXin Li
33*9c5db199SXin Li
34*9c5db199SXin Lidef normalize_signal(signal, saturate_value):
35*9c5db199SXin Li    """Normalizes the signal with respect to the saturate value.
36*9c5db199SXin Li
37*9c5db199SXin Li    @param signal: A list for one-channel PCM data.
38*9c5db199SXin Li    @param saturate_value: The maximum value that the PCM data might be.
39*9c5db199SXin Li
40*9c5db199SXin Li    @returns: A numpy array containing normalized signal. The normalized signal
41*9c5db199SXin Li              has value -1 and 1 when it saturates.
42*9c5db199SXin Li
43*9c5db199SXin Li    """
44*9c5db199SXin Li    signal = numpy.array(signal)
45*9c5db199SXin Li    return signal / float(saturate_value)
46*9c5db199SXin Li
47*9c5db199SXin Li
48*9c5db199SXin Lidef spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
49*9c5db199SXin Li                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
50*9c5db199SXin Li    """Gets the dominant frequencies by spectral analysis.
51*9c5db199SXin Li
52*9c5db199SXin Li    @param signal: A list of numbers for one-channel PCM data. This should be
53*9c5db199SXin Li                   normalized to [-1, 1] so the function can check if signal RMS
54*9c5db199SXin Li                   is too small to be meaningful.
55*9c5db199SXin Li    @param rate: Sampling rate.
56*9c5db199SXin Li    @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
57*9c5db199SXin Li                           peaks other than the greatest one should be
58*9c5db199SXin Li                           considered.
59*9c5db199SXin Li                           This is to ignore peaks that are too small compared
60*9c5db199SXin Li                           to the first peak peak_0.
61*9c5db199SXin Li    @param peak_window_size_hz: The window size in Hz to find the peaks.
62*9c5db199SXin Li                                The minimum differences between found peaks will
63*9c5db199SXin Li                                be half of this value.
64*9c5db199SXin Li
65*9c5db199SXin Li    @returns: A list of tuples:
66*9c5db199SXin Li              [(peak_frequency_0, peak_coefficient_0),
67*9c5db199SXin Li               (peak_frequency_1, peak_coefficient_1),
68*9c5db199SXin Li               (peak_frequency_2, peak_coefficient_2), ...]
69*9c5db199SXin Li              where the tuples are sorted by coefficients.
70*9c5db199SXin Li              The last peak_coefficient will be no less than
71*9c5db199SXin Li              peak_coefficient * min_peak_ratio.
72*9c5db199SXin Li              If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].
73*9c5db199SXin Li
74*9c5db199SXin Li    """
75*9c5db199SXin Li    # Checks the signal is meaningful.
76*9c5db199SXin Li    if len(signal) == 0:
77*9c5db199SXin Li        raise EmptyDataError('Signal data is empty')
78*9c5db199SXin Li
79*9c5db199SXin Li    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
80*9c5db199SXin Li    logging.debug('signal RMS = %s', signal_rms)
81*9c5db199SXin Li
82*9c5db199SXin Li    # If RMS is too small, set dominant frequency and coefficient to 0.
83*9c5db199SXin Li    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
84*9c5db199SXin Li        logging.warning(
85*9c5db199SXin Li                'RMS %s is too small to be meaningful. Set frequency to 0.',
86*9c5db199SXin Li                signal_rms)
87*9c5db199SXin Li        return [(0, 0)]
88*9c5db199SXin Li
89*9c5db199SXin Li    logging.debug('Doing spectral analysis ...')
90*9c5db199SXin Li
91*9c5db199SXin Li    # First, pass signal through a window function to mitigate spectral leakage.
92*9c5db199SXin Li    y_conv_w = signal * numpy.hanning(len(signal))
93*9c5db199SXin Li
94*9c5db199SXin Li    length = len(y_conv_w)
95*9c5db199SXin Li
96*9c5db199SXin Li    # x_f is the frequency in Hz, y_f is the transformed coefficient.
97*9c5db199SXin Li    x_f = _rfft_freq(length, rate)
98*9c5db199SXin Li    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
99*9c5db199SXin Li
100*9c5db199SXin Li    # y_f is complex so consider its absolute value for magnitude.
101*9c5db199SXin Li    abs_y_f = numpy.abs(y_f)
102*9c5db199SXin Li    threshold = max(abs_y_f) * min_peak_ratio
103*9c5db199SXin Li
104*9c5db199SXin Li    # Suppresses all coefficients that are below threshold.
105*9c5db199SXin Li    for i in range(len(abs_y_f)):
106*9c5db199SXin Li        if abs_y_f[i] < threshold:
107*9c5db199SXin Li            abs_y_f[i] = 0
108*9c5db199SXin Li
109*9c5db199SXin Li    # Gets the peak detection window size in indice.
110*9c5db199SXin Li    # x_f[1] is the frequency difference per index.
111*9c5db199SXin Li    peak_window_size = int(peak_window_size_hz / x_f[1])
112*9c5db199SXin Li
113*9c5db199SXin Li    # Detects peaks.
114*9c5db199SXin Li    peaks = peak_detection(abs_y_f, peak_window_size)
115*9c5db199SXin Li
116*9c5db199SXin Li    # Transform back the peak location from index to frequency.
117*9c5db199SXin Li    results = []
118*9c5db199SXin Li    for index, value in peaks:
119*9c5db199SXin Li        results.append((x_f[index], value))
120*9c5db199SXin Li    return results
121*9c5db199SXin Li
122*9c5db199SXin Li
123*9c5db199SXin Lidef _rfft_freq(length, rate):
124*9c5db199SXin Li    """Gets the frequency at each index of real FFT.
125*9c5db199SXin Li
126*9c5db199SXin Li    @param length: The window length of FFT.
127*9c5db199SXin Li    @param rate: Sampling rate.
128*9c5db199SXin Li
129*9c5db199SXin Li    @returns: A numpy array containing frequency corresponding to
130*9c5db199SXin Li              numpy.fft.rfft result at each index.
131*9c5db199SXin Li
132*9c5db199SXin Li    """
133*9c5db199SXin Li    # The difference in Hz between each index.
134*9c5db199SXin Li    val = rate / float(length)
135*9c5db199SXin Li    # Only care half of frequencies for FFT on real signal.
136*9c5db199SXin Li    result_length = length // 2 + 1
137*9c5db199SXin Li    return numpy.linspace(0, (result_length - 1) * val, result_length)
138*9c5db199SXin Li
139*9c5db199SXin Li
140*9c5db199SXin Lidef peak_detection(array, window_size):
141*9c5db199SXin Li    """Detects peaks in an array.
142*9c5db199SXin Li
143*9c5db199SXin Li    A point (i, array[i]) is a peak if array[i] is the maximum among
144*9c5db199SXin Li    array[i - half_window_size] to array[i + half_window_size].
145*9c5db199SXin Li    If array[i - half_window_size] to array[i + half_window_size] are all equal,
146*9c5db199SXin Li    then there is no peak in this window.
147*9c5db199SXin Li    Note that we only consider peak with value greater than 0.
148*9c5db199SXin Li
149*9c5db199SXin Li    @param window_size: The window to detect peaks.
150*9c5db199SXin Li
151*9c5db199SXin Li    @returns: A list of tuples:
152*9c5db199SXin Li              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
153*9c5db199SXin Li              where the tuples are sorted by peak values.
154*9c5db199SXin Li
155*9c5db199SXin Li    """
156*9c5db199SXin Li    half_window_size = window_size // 2
157*9c5db199SXin Li    length = len(array)
158*9c5db199SXin Li
159*9c5db199SXin Li    def mid_is_peak(array, mid, left, right):
160*9c5db199SXin Li        """Checks if value at mid is the largest among left to right in array.
161*9c5db199SXin Li
162*9c5db199SXin Li        @param array: A list of numbers.
163*9c5db199SXin Li        @param mid: The mid index.
164*9c5db199SXin Li        @param left: The left index.
165*9c5db199SXin Li        @param rigth: The right index.
166*9c5db199SXin Li
167*9c5db199SXin Li        @returns: A tuple (is_peak, next_candidate)
168*9c5db199SXin Li                  is_peak is True if array[index] is the maximum among numbers
169*9c5db199SXin Li                  in array between index [left, right] inclusively.
170*9c5db199SXin Li                  next_candidate is the index of next candidate for peak if
171*9c5db199SXin Li                  is_peak is False. It is the index of maximum value in
172*9c5db199SXin Li                  [mid + 1, right]. If is_peak is True, next_candidate is
173*9c5db199SXin Li                  right + 1.
174*9c5db199SXin Li
175*9c5db199SXin Li        """
176*9c5db199SXin Li        value_mid = array[mid]
177*9c5db199SXin Li        is_peak = True
178*9c5db199SXin Li        next_peak_candidate_index = None
179*9c5db199SXin Li
180*9c5db199SXin Li        # Check the left half window.
181*9c5db199SXin Li        for index in range(left, mid):
182*9c5db199SXin Li            if array[index] >= value_mid:
183*9c5db199SXin Li                is_peak = False
184*9c5db199SXin Li                break
185*9c5db199SXin Li
186*9c5db199SXin Li        # Mid is at the end of array.
187*9c5db199SXin Li        if mid == right:
188*9c5db199SXin Li            return is_peak, right + 1
189*9c5db199SXin Li
190*9c5db199SXin Li        # Check the right half window and also record next candidate.
191*9c5db199SXin Li        # Favor the larger index for next_peak_candidate_index.
192*9c5db199SXin Li        for index in range(right, mid, -1):
193*9c5db199SXin Li            if (next_peak_candidate_index is None or
194*9c5db199SXin Li                array[index] > array[next_peak_candidate_index]):
195*9c5db199SXin Li                next_peak_candidate_index = index
196*9c5db199SXin Li
197*9c5db199SXin Li        if array[next_peak_candidate_index] >= value_mid:
198*9c5db199SXin Li            is_peak = False
199*9c5db199SXin Li
200*9c5db199SXin Li        if is_peak:
201*9c5db199SXin Li            next_peak_candidate_index = right + 1
202*9c5db199SXin Li
203*9c5db199SXin Li        return is_peak, next_peak_candidate_index
204*9c5db199SXin Li
205*9c5db199SXin Li    results = []
206*9c5db199SXin Li    mid = 0
207*9c5db199SXin Li    next_candidate_idx = None
208*9c5db199SXin Li    while mid < length:
209*9c5db199SXin Li        left = max(0, mid - half_window_size)
210*9c5db199SXin Li        right = min(length - 1, mid + half_window_size)
211*9c5db199SXin Li
212*9c5db199SXin Li        # Only consider value greater than 0.
213*9c5db199SXin Li        if array[mid] == 0:
214*9c5db199SXin Li            mid = mid + 1
215*9c5db199SXin Li            continue;
216*9c5db199SXin Li
217*9c5db199SXin Li        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
218*9c5db199SXin Li
219*9c5db199SXin Li        if is_peak:
220*9c5db199SXin Li            results.append((mid, array[mid]))
221*9c5db199SXin Li
222*9c5db199SXin Li        # Use the next candidate found in [mid + 1, right], or right + 1.
223*9c5db199SXin Li        mid = next_candidate_idx
224*9c5db199SXin Li
225*9c5db199SXin Li    # Sort the peaks by values.
226*9c5db199SXin Li    return sorted(results, key=lambda x: x[1], reverse=True)
227*9c5db199SXin Li
228*9c5db199SXin Li
229*9c5db199SXin Li# The default pattern mathing threshold. By experiment, this threshold
230*9c5db199SXin Li# can tolerate normal noise of 0.3 amplitude when sine wave signal
231*9c5db199SXin Li# amplitude is 1.
232*9c5db199SXin LiPATTERN_MATCHING_THRESHOLD = 0.85
233*9c5db199SXin Li
234*9c5db199SXin Li# The default block size of pattern matching.
235*9c5db199SXin LiANOMALY_DETECTION_BLOCK_SIZE = 120
236*9c5db199SXin Li
237*9c5db199SXin Lidef anomaly_detection(signal, rate, freq,
238*9c5db199SXin Li                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
239*9c5db199SXin Li                      threshold=PATTERN_MATCHING_THRESHOLD):
240*9c5db199SXin Li    """Detects anomaly in a sine wave signal.
241*9c5db199SXin Li
242*9c5db199SXin Li    This method detects anomaly in a sine wave signal by matching
243*9c5db199SXin Li    patterns of each block.
244*9c5db199SXin Li    For each moving window of block in the test signal, checks if there
245*9c5db199SXin Li    is any block in golden signal that is similar to this block of test signal.
246*9c5db199SXin Li    If there is such a block in golden signal, then this block of test
247*9c5db199SXin Li    signal is matched and there is no anomaly in this block of test signal.
248*9c5db199SXin Li    If there is any block in test signal that is not matched, then this block
249*9c5db199SXin Li    covers an anomaly.
250*9c5db199SXin Li    The block of test signal starts from index 0, and proceeds in steps of
251*9c5db199SXin Li    half block size. The overlapping of test signal blocks makes sure there must
252*9c5db199SXin Li    be at least one block covering the transition from sine wave to anomaly.
253*9c5db199SXin Li
254*9c5db199SXin Li    @param signal: A 1-D array-like object for 1-channel PCM data.
255*9c5db199SXin Li    @param rate: The sampling rate.
256*9c5db199SXin Li    @param freq: The expected frequency of signal.
257*9c5db199SXin Li    @param block_size: The block size in samples to detect anomaly.
258*9c5db199SXin Li    @param threshold: The threshold of correlation index to be judge as matched.
259*9c5db199SXin Li
260*9c5db199SXin Li    @returns: A list containing detected anomaly time in seconds.
261*9c5db199SXin Li
262*9c5db199SXin Li    """
263*9c5db199SXin Li    if len(signal) == 0:
264*9c5db199SXin Li        raise EmptyDataError('Signal data is empty')
265*9c5db199SXin Li
266*9c5db199SXin Li    golden_y = _generate_golden_pattern(rate, freq, block_size)
267*9c5db199SXin Li
268*9c5db199SXin Li    results = []
269*9c5db199SXin Li
270*9c5db199SXin Li    for start in range(0, len(signal), block_size // 2):
271*9c5db199SXin Li        end = start + block_size
272*9c5db199SXin Li        test_signal = signal[start:end]
273*9c5db199SXin Li        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
274*9c5db199SXin Li        if not matched:
275*9c5db199SXin Li            results.append(start)
276*9c5db199SXin Li
277*9c5db199SXin Li    results = [float(x) / rate for x in results]
278*9c5db199SXin Li
279*9c5db199SXin Li    return results
280*9c5db199SXin Li
281*9c5db199SXin Li
282*9c5db199SXin Lidef _generate_golden_pattern(rate, freq, block_size):
283*9c5db199SXin Li    """Generates a golden pattern of certain frequency.
284*9c5db199SXin Li
285*9c5db199SXin Li    The golden pattern must cover all the possibilities of waveforms in a
286*9c5db199SXin Li    block. So, we need a golden pattern covering 1 period + 1 block size,
287*9c5db199SXin Li    such that the test block can start anywhere in a period, and extends
288*9c5db199SXin Li    a block size.
289*9c5db199SXin Li
290*9c5db199SXin Li    |period |1 bk|
291*9c5db199SXin Li    |       |    |
292*9c5db199SXin Li     . .     . .
293*9c5db199SXin Li    .   .   .   .
294*9c5db199SXin Li         . .     .
295*9c5db199SXin Li
296*9c5db199SXin Li    @param rate: The sampling rate.
297*9c5db199SXin Li    @param freq: The frequency of golden pattern.
298*9c5db199SXin Li    @param block_size: The block size in samples to detect anomaly.
299*9c5db199SXin Li
300*9c5db199SXin Li    @returns: A 1-D array for golden pattern.
301*9c5db199SXin Li
302*9c5db199SXin Li    """
303*9c5db199SXin Li    samples_in_a_period = int(rate / freq) + 1
304*9c5db199SXin Li    samples_in_golden_pattern = samples_in_a_period + block_size
305*9c5db199SXin Li    golden_x = numpy.linspace(
306*9c5db199SXin Li            0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
307*9c5db199SXin Li            samples_in_golden_pattern)
308*9c5db199SXin Li    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
309*9c5db199SXin Li    return golden_y
310*9c5db199SXin Li
311*9c5db199SXin Li
312*9c5db199SXin Lidef _moving_pattern_matching(golden_signal, test_signal, threshold):
313*9c5db199SXin Li    """Checks if test_signal is similar to any block of golden_signal.
314*9c5db199SXin Li
315*9c5db199SXin Li    Compares test signal with each block of golden signal by correlation
316*9c5db199SXin Li    index. If there is any block of golden signal that is similar to
317*9c5db199SXin Li    test signal, then it is matched.
318*9c5db199SXin Li
319*9c5db199SXin Li    @param golden_signal: A 1-D array for golden signal.
320*9c5db199SXin Li    @param test_signal: A 1-D array for test signal.
321*9c5db199SXin Li    @param threshold: The threshold of correlation index to be judge as matched.
322*9c5db199SXin Li
323*9c5db199SXin Li    @returns: True if there is a match. False otherwise.
324*9c5db199SXin Li
325*9c5db199SXin Li    @raises: ValueError: if test signal is longer than golden signal.
326*9c5db199SXin Li
327*9c5db199SXin Li    """
328*9c5db199SXin Li    if len(golden_signal) < len(test_signal):
329*9c5db199SXin Li        raise ValueError('Test signal is longer than golden signal')
330*9c5db199SXin Li
331*9c5db199SXin Li    block_length = len(test_signal)
332*9c5db199SXin Li    number_of_movings = len(golden_signal) - block_length + 1
333*9c5db199SXin Li    correlation_indices = []
334*9c5db199SXin Li    for moving_index in range(number_of_movings):
335*9c5db199SXin Li        # Cuts one block of golden signal from start index.
336*9c5db199SXin Li        # The block length is the same as test signal.
337*9c5db199SXin Li        start = moving_index
338*9c5db199SXin Li        end = start + block_length
339*9c5db199SXin Li        golden_signal_block = golden_signal[start:end]
340*9c5db199SXin Li        try:
341*9c5db199SXin Li            correlation_index = _get_correlation_index(
342*9c5db199SXin Li                    golden_signal_block, test_signal)
343*9c5db199SXin Li        except TestSignalNormTooSmallError:
344*9c5db199SXin Li            logging.info('Caught one block of test signal that has no meaningful norm')
345*9c5db199SXin Li            return False
346*9c5db199SXin Li        correlation_indices.append(correlation_index)
347*9c5db199SXin Li
348*9c5db199SXin Li    # Checks if the maximum correlation index is high enough.
349*9c5db199SXin Li    max_corr = max(correlation_indices)
350*9c5db199SXin Li    if max_corr < threshold:
351*9c5db199SXin Li        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
352*9c5db199SXin Li        return False
353*9c5db199SXin Li    return True
354*9c5db199SXin Li
355*9c5db199SXin Li
356*9c5db199SXin Liclass GoldenSignalNormTooSmallError(Exception):
357*9c5db199SXin Li    """Exception when golden signal norm is too small."""
358*9c5db199SXin Li    pass
359*9c5db199SXin Li
360*9c5db199SXin Li
361*9c5db199SXin Liclass TestSignalNormTooSmallError(Exception):
362*9c5db199SXin Li    """Exception when test signal norm is too small."""
363*9c5db199SXin Li    pass
364*9c5db199SXin Li
365*9c5db199SXin Li
366*9c5db199SXin Li_MINIMUM_SIGNAL_NORM = 0.001
367*9c5db199SXin Li
368*9c5db199SXin Lidef _get_correlation_index(golden_signal, test_signal):
369*9c5db199SXin Li    """Computes correlation index of two signal of same length.
370*9c5db199SXin Li
371*9c5db199SXin Li    @param golden_signal: An 1-D array-like object.
372*9c5db199SXin Li    @param test_signal: An 1-D array-like object.
373*9c5db199SXin Li
374*9c5db199SXin Li    @raises: ValueError: if two signal have different lengths.
375*9c5db199SXin Li    @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
376*9c5db199SXin Li    @raises: TestSignalNormTooSmallError: if test signal norm is too small.
377*9c5db199SXin Li
378*9c5db199SXin Li    @returns: The correlation index.
379*9c5db199SXin Li    """
380*9c5db199SXin Li    if len(golden_signal) != len(test_signal):
381*9c5db199SXin Li        raise ValueError(
382*9c5db199SXin Li                'Only accepts signal of same length: %s, %s' % (
383*9c5db199SXin Li                        len(golden_signal), len(test_signal)))
384*9c5db199SXin Li
385*9c5db199SXin Li    norm_golden = numpy.linalg.norm(golden_signal)
386*9c5db199SXin Li    norm_test = numpy.linalg.norm(test_signal)
387*9c5db199SXin Li    if norm_golden <= _MINIMUM_SIGNAL_NORM:
388*9c5db199SXin Li        raise GoldenSignalNormTooSmallError(
389*9c5db199SXin Li                'No meaningful data as norm is too small.')
390*9c5db199SXin Li    if norm_test <= _MINIMUM_SIGNAL_NORM:
391*9c5db199SXin Li        raise TestSignalNormTooSmallError(
392*9c5db199SXin Li                'No meaningful data as norm is too small.')
393*9c5db199SXin Li
394*9c5db199SXin Li    # The 'valid' cross correlation result of two signals of same length will
395*9c5db199SXin Li    # contain only one number.
396*9c5db199SXin Li    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
397*9c5db199SXin Li    return correlation / (norm_golden * norm_test)
398