1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import logging 7import os 8import random 9import re 10import subprocess 11import time 12 13from autotest_lib.client.bin import test 14from autotest_lib.client.common_lib import error 15from autotest_lib.client.cros import upstart 16from autotest_lib.client.cros.audio import audio_helper 17 18_STREAM_TYPE_INPUT = 0 19_STREAM_TYPE_OUTPUT = 1 20 21class audio_CrasStress(test.test): 22 """Checks if output buffer will drift to super high level.""" 23 version = 2 24 _MAX_STREAMS = 3 25 _LOOP_COUNT = 300 26 _INPUT_BUFFER_LEVEL = '.*?READ_AUDIO.*?hw_level.*?(\d+).*?' 27 _OUTPUT_BUFFER_LEVEL = '.*?FILL_AUDIO.*?hw_level.*?(\d+).*?' 28 _CHECK_PERIOD_TIME_SECS = 1 # Check buffer level every second. 29 30 """ 31 We only run 1024 and 512 block size streams in this test. So buffer level 32 of input device should stay between 0 and 1024. Buffer level of output 33 device should between 1024 to 2048. Sometimes it will be a little more. 34 Therefore, we set input buffer criteria to 2 * 1024 and output buffer 35 criteria to 3 * 1024. 36 """ 37 _RATES = ['48000', '44100'] 38 _BLOCK_SIZES = ['512', '1024'] 39 _INPUT_BUFFER_DRIFT_CRITERIA = 2 * 1024 40 _OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * 1024 41 42 def initialize(self): 43 """Initialize the test""" 44 upstart.stop_job('ui') 45 46 def _new_stream(self, stream_type): 47 """Runs new stream by cras_test_client.""" 48 if stream_type == _STREAM_TYPE_INPUT: 49 cmd = ['cras_test_client', '--capture_file', '/dev/null'] 50 else: 51 cmd = ['cras_test_client', '--playback_file', '/dev/zero'] 52 53 cmd += ['--rate', self._RATES[random.randint(0, 1)], 54 '--block_size', self._BLOCK_SIZES[random.randint(0, 1)]] 55 56 return subprocess.Popen(cmd) 57 58 def _check_buffer_level(self, stream_type): 59 60 buffer_level = self._get_buffer_level(stream_type) 61 62 if stream_type == _STREAM_TYPE_INPUT: 63 logging.debug("Max input buffer level: %d", buffer_level) 64 if buffer_level > self._INPUT_BUFFER_DRIFT_CRITERIA: 65 audio_helper.dump_audio_diagnostics( 66 os.path.join(self.resultsdir, "audio_diagnostics.txt")) 67 raise error.TestFail('Input buffer level %d drift too high' % 68 buffer_level) 69 70 if stream_type == _STREAM_TYPE_OUTPUT: 71 logging.debug("Max output buffer level: %d", buffer_level) 72 if buffer_level > self._OUTPUT_BUFFER_DRIFT_CRITERIA: 73 audio_helper.dump_audio_diagnostics( 74 os.path.join(self.resultsdir, "audio_diagnostics.txt")) 75 raise error.TestFail('Output buffer level %d drift too high' % 76 buffer_level) 77 78 def cleanup(self): 79 """Clean up all streams.""" 80 while len(self._streams) > 0: 81 self._streams[0].kill() 82 self._streams.remove(self._streams[0]) 83 upstart.restart_job('ui') 84 85 def run_once(self, input_stream=True, output_stream=True): 86 """ 87 Repeatedly add output streams of random configurations and 88 remove them to verify if output buffer level would drift. 89 90 @params input_stream: If true, run input stream in the test. 91 @params output_stream: If true, run output stream in the test. 92 """ 93 94 if not input_stream and not output_stream: 95 raise error.TestError('Not supported mode.') 96 97 self._streams = [] 98 99 loop_count = 0 100 past_time = time.time() 101 while loop_count < self._LOOP_COUNT: 102 103 # 1 for adding stream, 0 for removing stream. 104 add = random.randint(0, 1) 105 if not self._streams: 106 add = 1 107 elif len(self._streams) == self._MAX_STREAMS: 108 add = 0 109 110 if add == 1: 111 # 0 for input stream, 1 for output stream. 112 stream_type = random.randint(0, 1) 113 if not input_stream: 114 stream_type = _STREAM_TYPE_OUTPUT 115 elif not output_stream: 116 stream_type = _STREAM_TYPE_INPUT 117 118 self._streams.append(self._new_stream(stream_type)) 119 else: 120 self._streams[0].kill() 121 self._streams.remove(self._streams[0]) 122 time.sleep(0.1) 123 124 now = time.time() 125 126 # Check buffer level. 127 if now - past_time > self._CHECK_PERIOD_TIME_SECS: 128 past_time = now 129 if input_stream: 130 self._check_buffer_level(_STREAM_TYPE_INPUT) 131 if output_stream: 132 self._check_buffer_level(_STREAM_TYPE_OUTPUT) 133 134 loop_count += 1 135 136 def _get_buffer_level(self, stream_type): 137 """Gets a rough number about current buffer level. 138 139 @returns: The current buffer level. 140 141 """ 142 if stream_type == _STREAM_TYPE_INPUT: 143 match_str = self._INPUT_BUFFER_LEVEL 144 else: 145 match_str = self._OUTPUT_BUFFER_LEVEL 146 147 proc = subprocess.Popen(['cras_test_client', '--dump_a'], 148 stdout=subprocess.PIPE) 149 output, err = proc.communicate() 150 buffer_level = 0 151 for line in output.decode().split('\n'): 152 search = re.match(match_str, line) 153 if search: 154 tmp = int(search.group(1)) 155 if tmp > buffer_level: 156 buffer_level = tmp 157 return buffer_level 158