# Lint as: python2, python3
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.network import attenuator
from autotest_lib.server.cros.network import attenuator_hosts

from autotest_lib.utils.frozen_chromite.lib import timeout_util

HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
# Fake entry to deal with attenuator not added to attenuator_hosts.py file
FAKE_HOST = HOST_TO_FIXED_ATTENUATIONS['fake-atten-host']


class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to as rate vs range (RvR) tests in places.

    Fixed attenuations should be recorded in attenuator_hosts.py, else
    TestError will be raised when fixed attenuations are accessed.

    """

    @property
    def supported_attenuators(self):
        """@return list of int attenuators supported on this host."""
        return list(self._fixed_attenuations.keys())


    def __init__(self, hostname):
        """Construct an AttenuatorController.

        Hosts missing from attenuator_hosts.py fall back to the fake entry so
        that the variable attenuator can still be driven; only the methods
        that need fixed attenuations fail for such hosts.

        @param hostname: Hostname representing minicircuits attenuator.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        # The lookup table is keyed by the host name without the '.cros...'
        # domain suffix.
        part = hostname.split('.cros', 1)[0]
        if part not in HOST_TO_FIXED_ATTENUATIONS:
            logging.debug('Attenuator %s not found in attenuator_host list',
                          part)
            self._fixed_attenuations = FAKE_HOST
        else:
            self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
        num_atten = len(self.supported_attenuators)

        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations.
        @raises TestError if attenuator is not in attenuator_hosts.py, or if
                no frequencies are defined for attenuator_num.

        """
        self._fail_if_fake()

        defined_freqs = self._fixed_attenuations[attenuator_num]
        if not defined_freqs:
            raise error.TestError(
                    'No fixed attenuations defined for attenuator %r on %r' %
                    (attenuator_num, self.hostname))

        # min() returns the first of several equally close frequencies, which
        # matches a strict "closer than the best so far" scan.
        approx_freq = min(defined_freqs,
                          key=lambda defined_freq: abs(defined_freq - freq))

        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        The variable attenuation applied to each attenuator is atten_db minus
        that attenuator's fixed loss at the nearest defined frequency.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()

        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten,
                                                      frequency_mhz)
            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def _set_and_verify_attenuation(self, atten, atten_db):
        """Set one attenuator and read the value back to confirm it.

        @param atten: int attenuator to change.
        @param atten_db: int non-negative level of attenuation in dB.
        @raises TestError if the value read back differs from atten_db.

        """
        self._attenuator.set_atten(atten, atten_db)
        if int(self._attenuator.get_atten(atten)) != atten_db:
            raise error.TestError('Attenuation did not set as expected '
                                  'on attenuator %d' % atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        Each attenuator is set and read back.  On a TestError the connection
        is reopened and the operation is retried once.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if the attenuation still can not be confirmed after
                the retry.

        """
        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            try:
                self._set_and_verify_attenuation(atten, atten_db)
            except error.TestError:
                # Retry exactly once on a fresh connection; a second failure
                # propagates to the caller.
                self._attenuator.reopen(self.hostname)
                self._set_and_verify_attenuation(atten, atten_db)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value (0 if no fixed
                attenuations are defined).
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()

        max_atten = 0
        for freq_to_fixed_loss in self._fixed_attenuations.values():
            # Skip attenuators without any defined frequency instead of
            # failing on max() of an empty sequence.
            if freq_to_fixed_loss:
                max_atten = max(max_atten, max(freq_to_fixed_loss.values()))
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        The variable attenuation is stepped 1 dB at a time, starting from
        0 dB, until the signal level reported by the client is within
        tolerance of the requested level.
        NOTE(review): this assumes the attenuators are at 0 dB when the method
        is called (the constructor sets them to 0) - confirm for callers that
        change the attenuation beforehand.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @param timeout: Time limit handed to timeout_util.Timeout for reaching
                the requested signal level (presumably seconds - confirm
                against timeout_util).
        @raises TestError if no signal is detected, the requested level is out
                of the allowed range, or the level can not be reached in time.

        """
        atten_db = 0
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                   starting_sig_level,
                                   min_sig_level_allowed))

        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                            current_sig_level, tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    # Step first, then apply, so the hardware always carries
                    # the value just computed rather than lagging one
                    # iteration behind (which briefly moved the attenuation
                    # the wrong way after an overshoot).
                    if current_sig_level > requested_sig_level:
                        atten_db += 1
                    elif atten_db > 0:
                        atten_db -= 1
                    else:
                        # Attenuation can not go below 0 dB; measure again
                        # until the signal recovers or the timeout expires.
                        logging.debug("Signal below requested level at 0db "
                                      "attenuation; measuring again.")
                        continue
                    self.set_variable_attenuation(atten_db)
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        The accepted window is req_sig_level plus or minus tolerance_percent
        percent of its magnitude, both bounds inclusive.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.

        @returns True if wifi signal is in the desired range.
        """
        # Float division keeps the margin the same under python2 and python3;
        # abs() keeps the window well-formed whatever the sign of the inputs.
        margin = abs(req_sig_level * tolerance_percent) / 100.0
        return (req_sig_level - margin <= curr_sig_level <=
                req_sig_level + margin)


    def _fail_if_fake(self):
        """Raises test error if this attenuator is missing.

        If an attenuator is missing, we use a fake entry.  This function
        will fail the test if the current attenuator is fake.

        @raises TestError if this host uses the fake attenuation table.
        """
        # Identity, not equality: a real host whose table merely has the same
        # values as the fake entry must not be rejected.
        if self._fixed_attenuations is FAKE_HOST:
            raise error.TestError(
                    'Attenuator %r not found in attenuator_hosts.py' %
                    self.hostname)