1#!/usr/bin/python3 2from __future__ import absolute_import 3from __future__ import division 4from __future__ import print_function 5 6import logging 7import numpy 8import os 9import unittest 10 11import common 12from autotest_lib.client.cros.audio import audio_analysis 13from autotest_lib.client.cros.audio import audio_data 14from six.moves import range 15 16class SpectralAnalysisTest(unittest.TestCase): 17 def setUp(self): 18 """Uses the same seed to generate noise for each test.""" 19 numpy.random.seed(0) 20 21 22 def stub_peak_detection(self, array, window_size): 23 """Detects peaks in an array in simple way. 24 25 A point (i, array[i]) is a peak if array[i] is the maximum among 26 array[i - half_window_size] to array[i + half_window_size]. 27 If array[i - half_window_size] to array[i + half_window_size] are all 28 equal, then there is no peak in this window. 29 30 @param window_size: The window to detect peaks. 31 32 @returns: A list of tuples: 33 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), 34 ...] 35 where the tuples are sorted by peak values. 36 37 """ 38 half_window_size = window_size // 2 39 length = len(array) 40 41 def mid_is_peak(array, mid, left, right): 42 """Checks if value at mid is the largest among left to right. 43 44 @param array: A list of numbers. 45 @param mid: The mid index. 46 @param left: The left index. 47 @param rigth: The right index. 48 49 @returns: True if array[index] is the maximum among numbers in array 50 between index [left, right] inclusively. 51 52 """ 53 value_mid = array[mid] 54 for index in range(left, right + 1): 55 if index == mid: 56 continue 57 if array[index] >= value_mid: 58 return False 59 return True 60 61 results = [] 62 for mid in range(length): 63 left = max(0, mid - half_window_size) 64 right = min(length - 1, mid + half_window_size) 65 if mid_is_peak(array, mid, left, right): 66 results.append((mid, array[mid])) 67 68 # Sort the peaks by values. 
69 return sorted(results, key=lambda x: x[1], reverse=True) 70 71 72 def testPeakDetection(self): 73 array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1] 74 result = audio_analysis.peak_detection(array, 4) 75 golden_answer = [(12, 5), (4, 4)] 76 self.assertEqual(result, golden_answer) 77 78 79 def testPeakDetectionLarge(self): 80 array = numpy.random.uniform(0, 1, 1000000) 81 window_size = 100 82 logging.debug('Test large array using stub peak detection') 83 stub_answer = self.stub_peak_detection(array, window_size) 84 logging.debug('Test large array using improved peak detection') 85 improved_answer = audio_analysis.peak_detection(array, window_size) 86 logging.debug('Compare the result') 87 self.assertEqual(stub_answer, improved_answer) 88 89 90 def testSpectralAnalysis(self): 91 rate = 48000 92 length_in_secs = 0.5 93 freq_1 = 490.0 94 freq_2 = 60.0 95 coeff_1 = 1 96 coeff_2 = 0.3 97 samples = int(length_in_secs * rate) 98 noise = numpy.random.standard_normal(samples) * 0.005 99 x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples) 100 y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + 101 coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise 102 results = audio_analysis.spectral_analysis(y, rate) 103 # Results should contains 104 # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant 105 # frequency with coefficient 1, 60Hz is the second dominant frequency 106 # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient 107 # around 0.1. The k constant is resulted from window function. 
108 logging.debug('Results: %s', results) 109 self.assertTrue(abs(results[0][0]-freq_1) < 1) 110 self.assertTrue(abs(results[1][0]-freq_2) < 1) 111 self.assertTrue( 112 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01) 113 114 115 def testSpectralAnalysisRealData(self): 116 """This unittest checks the spectral analysis works on real data.""" 117 file_path = os.path.join( 118 os.path.dirname(__file__), 'test_data', '1k_2k.raw') 119 binary = open(file_path, 'rb').read() 120 data = audio_data.AudioRawData(binary, 2, 'S32_LE') 121 saturate_value = audio_data.get_maximum_value_from_sample_format( 122 'S32_LE') 123 golden_frequency = [1000, 2000] 124 for channel in [0, 1]: 125 normalized_signal = audio_analysis.normalize_signal( 126 data.channel_data[channel],saturate_value) 127 spectral = audio_analysis.spectral_analysis( 128 normalized_signal, 48000, 0.02) 129 logging.debug('channel %s: %s', channel, spectral) 130 self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5, 131 'Dominant frequency is not correct') 132 133 134 def testNotMeaningfulData(self): 135 """Checks that sepectral analysis handles un-meaningful data.""" 136 rate = 48000 137 length_in_secs = 0.5 138 samples = int(length_in_secs * rate) 139 noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5 140 noise = numpy.random.standard_normal(samples) * noise_amplitude 141 results = audio_analysis.spectral_analysis(noise, rate) 142 self.assertEqual([(0, 0)], results) 143 144 145 def testEmptyData(self): 146 """Checks that sepectral analysis rejects empty data.""" 147 with self.assertRaises(audio_analysis.EmptyDataError): 148 results = audio_analysis.spectral_analysis([], 100) 149 150 151class NormalizeTest(unittest.TestCase): 152 def testNormalize(self): 153 y = [1, 2, 3, 4, 5] 154 normalized_y = audio_analysis.normalize_signal(y, 10) 155 expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5]) 156 for i in range(len(y)): 157 self.assertEqual(expected[i], normalized_y[i]) 158 159 
class AnomalyTest(unittest.TestCase):
    """Tests for audio_analysis.anomaly_detection on a sine wave signal."""

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(
                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)


    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise


    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)


    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
                self.anomaly_start_secs + self.anomaly_duration_secs)
        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
        anomaly_append_index = int(anomaly_append_secs * self.rate)
        # Drop the samples in [anomaly_start_index, anomaly_append_index).
        self.y = numpy.append(self.y[:anomaly_start_index],
                              self.y[anomaly_append_index:])


    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        @param amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = (
                [amplitude] * int(self.anomaly_duration_secs * self.rate))


    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
                self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)


    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)


    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        range_start_secs = (
                self.anomaly_start_secs - float(self.block_size) / self.rate)
        range_end_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        for detected_secs in self.results:
            # The dedicated comparison asserts report both operands on failure.
            self.assertLessEqual(detected_secs, range_end_secs)
            self.assertGreaterEqual(detected_secs, range_start_secs)


    def testGoodSignal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()


    def testGoodSignalNoise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()


    def testZeroAnomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()


    def testZeroAnomalyNoise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testLowConstantAnomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()


    def testLowConstantAnomalyNoise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testHighConstantAnomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()


    def testHighConstantAnomalyNoise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testSkippedAnomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()


    def testSkippedAnomalyNoise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()


    def testEmptyData(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Call run_analysis directly: the checks that follow it in
        # check_anomaly read anomaly attributes this test never sets.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()


if __name__ == '__main__':
    logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    unittest.main()