xref: /aosp_15_r20/external/autotest/server/cros/network/attenuator_controller.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.server.cros.network import attenuator
10from autotest_lib.server.cros.network import attenuator_hosts
11
12from autotest_lib.utils.frozen_chromite.lib import timeout_util
13
14HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
15# Fake entry to deal with attenuator not added to attenuator_hosts.py file
16FAKE_HOST = HOST_TO_FIXED_ATTENUATIONS['fake-atten-host']
17
18
19class AttenuatorController(object):
20    """Represents a minicircuits variable attenuator.
21
22    This device is used to vary the attenuation between a router and a client.
23    This allows us to measure throughput as a function of signal strength and
24    test some roaming situations.  The throughput vs signal strength tests
25    are referred to rate vs range (RvR) tests in places.
26
27    Fixed attenuatations should be recorded in attenuator_hosts.py else
28    TestError will be raised when fixed attentuations are accessed.
29
30    """
31
32    @property
33    def supported_attenuators(self):
34        """@return iterable of int attenuators supported on this host."""
35        return list(self._fixed_attenuations.keys())
36
37
38    def __init__(self, hostname):
39        """Construct a AttenuatorController.
40
41        @param hostname: Hostname representing minicircuits attenuator.
42
43        """
44        self.hostname = hostname
45        super(AttenuatorController, self).__init__()
46        part = hostname.split('.cros', 1)[0]
47        if part not in list(HOST_TO_FIXED_ATTENUATIONS.keys()):
48            logging.debug('Attenuator %s not found in attenuator_host list',
49                          part)
50            self._fixed_attenuations = FAKE_HOST
51        else:
52            self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
53        num_atten = len(self.supported_attenuators)
54
55        self._attenuator = attenuator.Attenuator(hostname, num_atten)
56        self.set_variable_attenuation(0)
57
58
59    def _approximate_frequency(self, attenuator_num, freq):
60        """Finds an approximate frequency to freq.
61
62        In case freq is not present in self._fixed_attenuations, we use a value
63        from a nearby channel as an approximation.
64
65        @param attenuator_num: attenuator in question on the remote host.  Each
66                attenuator has a different fixed path loss per frequency.
67        @param freq: int frequency in MHz.
68        @returns int approximate frequency from self._fixed_attenuations.
69        @raises TestError if attenuator is not in attenuator_hosts.py
70
71        """
72        self._fail_if_fake()
73
74        old_offset = None
75        approx_freq = None
76        for defined_freq in list(self._fixed_attenuations[attenuator_num].keys()):
77            new_offset = abs(defined_freq - freq)
78            if old_offset is None or new_offset < old_offset:
79                old_offset = new_offset
80                approx_freq = defined_freq
81
82        logging.debug('Approximating attenuation for frequency %d with '
83                      'constants for frequency %d.', freq, approx_freq)
84        return approx_freq
85
86
87    def close(self):
88        """Close variable attenuator connection."""
89        self._attenuator.close()
90
91
92    def set_total_attenuation(self, atten_db, frequency_mhz,
93                              attenuator_num=None):
94        """Set the total attenuation on one or all attenuators.
95
96        @param atten_db: int level of attenuation in dB.  This must be
97                higher than the fixed attenuation level of the affected
98                attenuators.
99        @param frequency_mhz: int frequency for which to calculate the
100                total attenuation.  The fixed component of attenuation
101                varies with frequency.
102        @param attenuator_num: int attenuator to change, or None to
103                set all variable attenuators.
104        @raises TestError if attenuator is not in attenuator_hosts.py
105
106        """
107        self._fail_if_fake()
108
109        affected_attenuators = self.supported_attenuators
110        if attenuator_num is not None:
111            affected_attenuators = [attenuator_num]
112        for atten in affected_attenuators:
113            freq_to_fixed_loss = self._fixed_attenuations[atten]
114            approx_freq = self._approximate_frequency(atten,
115                                                      frequency_mhz)
116            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
117            self.set_variable_attenuation(variable_atten_db,
118                                          attenuator_num=atten)
119
120
121    def set_variable_attenuation(self, atten_db, attenuator_num=None):
122        """Set the variable attenuation on one or all attenuators.
123
124        @param atten_db: int non-negative level of attenuation in dB.
125        @param attenuator_num: int attenuator to change, or None to
126                set all variable attenuators.
127
128        """
129        affected_attenuators = self.supported_attenuators
130        if attenuator_num is not None:
131            affected_attenuators = [attenuator_num]
132        for atten in affected_attenuators:
133            try:
134                self._attenuator.set_atten(atten, atten_db)
135                if int(self._attenuator.get_atten(atten)) != atten_db:
136                    raise error.TestError('Attenuation did not set as expected '
137                                          'on attenuator %d' % atten)
138            except error.TestError:
139                self._attenuator.reopen(self.hostname)
140                self._attenuator.set_atten(atten, atten_db)
141                if int(self._attenuator.get_atten(atten)) != atten_db:
142                    raise error.TestError('Attenuation did not set as expected '
143                                          'on attenuator %d' % atten)
144            logging.info('%ddb attenuation set successfully on attenautor %d',
145                         atten_db, atten)
146
147
148    def get_minimal_total_attenuation(self):
149        """Get attenuator's maximum fixed attenuation value.
150
151        This is pulled from the current attenuator's lines and becomes the
152        minimal total attenuation when stepping through attenuation levels.
153
154        @return maximum starting attenuation value
155        @raises TestError if attenuator is not in attenuator_hosts.py
156
157        """
158        self._fail_if_fake()
159
160        max_atten = 0
161        for atten_num in self._fixed_attenuations.keys():
162            atten_values = list(self._fixed_attenuations[atten_num].values())
163            max_atten = max(max(atten_values), max_atten)
164        return max_atten
165
166
167    def set_signal_level(self, client_context, requested_sig_level,
168            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
169        """Set wifi signal to desired level by changing attenuation.
170
171        @param client_context: Client context object.
172        @param requested_sig_level: Negative int value in dBm for wifi signal
173                level to be set.
174        @param min_sig_level_allowed: Minimum signal level allowed; this is to
175                ensure that we don't set a signal that is too weak and DUT can
176                not associate.
177        @param tolerance_percent: Percentage to be used to calculate the desired
178                range for the wifi signal level.
179        """
180        atten_db = 0
181        starting_sig_level = client_context.wifi_signal_level
182        if not starting_sig_level:
183            raise error.TestError("No signal detected.")
184        if not (min_sig_level_allowed <= requested_sig_level <=
185                starting_sig_level):
186            raise error.TestError("Requested signal level (%d) is either "
187                                  "higher than current signal level (%r) with "
188                                  "0db attenuation or lower than minimum "
189                                  "signal level (%d) allowed." %
190                                  (requested_sig_level,
191                                  starting_sig_level,
192                                  min_sig_level_allowed))
193
194        try:
195            with timeout_util.Timeout(timeout):
196                while True:
197                    client_context.reassociate(timeout_seconds=1)
198                    current_sig_level = client_context.wifi_signal_level
199                    logging.info("Current signal level %r", current_sig_level)
200                    if not current_sig_level:
201                        raise error.TestError("No signal detected.")
202                    if self.signal_in_range(requested_sig_level,
203                            current_sig_level, tolerance_percent):
204                        logging.info("Signal level set to %r.",
205                                     current_sig_level)
206                        break
207                    if current_sig_level > requested_sig_level:
208                        self.set_variable_attenuation(atten_db)
209                        atten_db +=1
210                    if current_sig_level < requested_sig_level:
211                        self.set_variable_attenuation(atten_db)
212                        atten_db -= 1
213        except (timeout_util.TimeoutError, error.TestError,
214                error.TestFail) as e:
215            raise error.TestError("Not able to set wifi signal to requested "
216                                  "level. \n%s" % e)
217
218
219    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
220        """Check if wifi signal is within the threshold of requested signal.
221
222        @param req_sig_level: Negative int value in dBm for wifi signal
223                level to be set.
224        @param curr_sig_level: Current wifi signal level seen by the DUT.
225        @param tolerance_percent: Percentage to be used to calculate the desired
226                range for the wifi signal level.
227
228        @returns True if wifi signal is in the desired range.
229        """
230        min_sig = req_sig_level + (req_sig_level * tolerance_percent / 100)
231        max_sig = req_sig_level - (req_sig_level * tolerance_percent / 100)
232        if min_sig <= curr_sig_level <= max_sig:
233            return True
234        return False
235
236    def _fail_if_fake(self):
237        """ Raises test error if this attenuator is missing
238
239        If an attenuator is missing, we use use a fake entry. This function
240        will fail the test if the current attenuator is fake.
241        """
242        if self._fixed_attenuations == FAKE_HOST:
243            raise error.TestError(
244                    'Attenuator %r  not found in attenuator_hosts.py' %
245                    self.hostname)
246