1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import logging
21import numpy
22import os
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers import iperf_client
28from acts.controllers.utils_lib import ssh
29from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
30from acts_contrib.test_utils.wifi import ota_chamber
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
33from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
34from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
35from acts_contrib.test_utils.wifi import ota_sniffer
36from functools import partial
37from WifiRvrTest import WifiRvrTest
38from WifiPingTest import WifiPingTest
39
40
class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
    """Class to test WiFi sensitivity.

    This class measures WiFi sensitivity per rate. It heavily leverages the
    WifiRvrTest class and introduces minor differences to set specific rates
    on the access point, and implements a different pass/fail check. For an
    example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    # NOTE(review): the two constants below are not referenced by any method
    # of this class; presumably they are read by the parent test classes --
    # confirm against WifiRvrTest/WifiPingTest.
    MAX_CONSECUTIVE_ZEROS = 5
    RSSI_POLL_INTERVAL = 0.2
    # Maps a channel number to the modes that generate_test_cases is allowed
    # to schedule on that channel. Requested modes missing from a channel's
    # list are silently dropped.
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    # Describes one testable rate. For legacy modes the mcs field holds the
    # rate value itself (get_start_atten looks entries up as
    # RateTuple(rate, 1, rate)); data_rate is what gets written to the
    # 'Rate (Mbps)' column of results.csv.
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
    #yapf:disable
    # Rates available per mode, keyed by mode name (legacy rates are split by
    # band). The list order matters for legacy modes: get_start_atten uses
    # the previous list entry as the reference rate.
    # NOTE(review): the VHT40 and VHT80 lists repeat the VHT20 data rates, and
    # HT20 lists 26 for MCS3 / 43.4 for MCS10 where the VHT20 list has
    # 28.9 / 43.3 -- confirm whether these values are intended.
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable
128
129    def __init__(self, controllers):
130        base_test.BaseTestClass.__init__(self, controllers)
131        self.testcase_metric_logger = (
132            BlackboxMappedMetricLogger.for_test_case())
133        self.testclass_metric_logger = (
134            BlackboxMappedMetricLogger.for_test_class())
135        self.publish_testcase_metrics = True
136
137    def setup_class(self):
138        """Initializes common test hardware and parameters.
139
140        This function initializes hardwares and compiles parameters that are
141        common to all tests in this class.
142        """
143        self.dut = self.android_devices[-1]
144        self.sta_dut = self.android_devices[-1]
145        req_params = [
146            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
147            'RemoteServer'
148        ]
149        opt_params = ['main_network', 'OTASniffer']
150        self.unpack_userparams(req_params, opt_params)
151        self.testclass_params = self.sensitivity_test_params
152        self.num_atten = self.attenuators[0].instrument.num_atten
153        self.ping_server = ssh.connection.SshConnection(
154            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
155        if hasattr(self,
156                   'OTASniffer') and self.testbed_params['sniffer_enable']:
157            try:
158                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
159            except:
160                self.log.warning('Could not start sniffer. Disabling sniffs.')
161                self.testbed_params['sniffer_enable'] = 0
162        self.remote_server = self.ping_server
163        self.iperf_server = self.iperf_servers[0]
164        self.iperf_client = self.iperf_clients[0]
165        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
166        self.log.info('Access Point Configuration: {}'.format(
167            self.access_point.ap_settings))
168        self.log_path = os.path.join(logging.log_path, 'results')
169        os.makedirs(self.log_path, exist_ok=True)
170        self.atten_dut_chain_map = {}
171        self.testclass_results = []
172
173        # Turn WiFi ON
174        if self.testclass_params.get('airplane_mode', 1):
175            self.log.info('Turning on airplane mode.')
176            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
177                                'Can not turn on airplane mode.')
178        wutils.wifi_toggle_state(self.dut, True)
179
180        # Configure test retries
181        self.user_params['retry_tests'] = [self.__class__.__name__]
182
183    def teardown_class(self):
184        self.access_point.teardown()
185        # Turn WiFi OFF
186        for dev in self.android_devices:
187            wutils.wifi_toggle_state(dev, False)
188            dev.go_to_sleep()
189        self.process_testclass_results()
190
191    def setup_test(self):
192        self.retry_flag = False
193
194    def teardown_test(self):
195        self.retry_flag = False
196
197    def on_retry(self):
198        """Function to control test logic on retried tests.
199
200        This function is automatically executed on tests that are being
201        retried. In this case the function resets wifi, toggles it off and on
202        and sets a retry_flag to enable further tweaking the test logic on
203        second attempts.
204        """
205        self.retry_flag = True
206        for dev in self.android_devices:
207            wutils.reset_wifi(dev)
208            wutils.toggle_wifi_off_and_on(dev)
209
210    def pass_fail_check(self, result):
211        """Checks sensitivity results and decides on pass/fail.
212
213        Args:
214            result: dict containing attenuation, throughput and other meta
215                data
216        """
217        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
218            result['peak_throughput_pct'], result['sensitivity']))
219        if result['peak_throughput_pct'] < 95:
220            asserts.fail('Result unreliable. {}'.format(result_string))
221        else:
222            asserts.explicit_pass('Test Passed. {}'.format(result_string))
223
224    def plot_per_curves(self):
225        """Plots PER curves to help debug sensitivity."""
226
227        plots = collections.OrderedDict()
228        id_fields = ['channel', 'mode', 'num_streams']
229        for result in self.testclass_results:
230            testcase_params = result['testcase_params']
231            plot_id = self.extract_test_id(testcase_params, id_fields)
232            plot_id = tuple(plot_id.items())
233            if plot_id not in plots:
234                plots[plot_id] = BokehFigure(
235                    title='Channel {} {} Nss{}'.format(
236                        result['testcase_params']['channel'],
237                        result['testcase_params']['mode'],
238                        result['testcase_params']['num_streams']),
239                    x_label='Attenuation (dB)',
240                    primary_y_label='PER (%)')
241            per = [stat['summary']['rx_per'] for stat in result['llstats']]
242            if len(per) < len(result['total_attenuation']):
243                per.extend([100] *
244                           (len(result['total_attenuation']) - len(per)))
245            plots[plot_id].add_line(result['total_attenuation'], per,
246                                    result['test_name'])
247        figure_list = []
248        for plot_id, plot in plots.items():
249            plot.generate_figure()
250            figure_list.append(plot)
251        output_file_path = os.path.join(self.log_path, 'PER_curves.html')
252        BokehFigure.save_figures(figure_list, output_file_path)
253
254    def process_testclass_results(self):
255        """Saves and plots test results from all executed test cases."""
256        # write json output
257        self.plot_per_curves()
258        testclass_results_dict = collections.OrderedDict()
259        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
260        channels_tested = []
261        for result in self.testclass_results:
262            testcase_params = result['testcase_params']
263            test_id = self.extract_test_id(testcase_params, id_fields)
264            test_id = tuple(test_id.items())
265            if test_id not in testclass_results_dict:
266                testclass_results_dict[test_id] = collections.OrderedDict()
267            channel = testcase_params['channel']
268            if channel not in channels_tested:
269                channels_tested.append(channel)
270            if result['peak_throughput_pct'] >= 95:
271                testclass_results_dict[test_id][channel] = result[
272                    'sensitivity']
273            else:
274                testclass_results_dict[test_id][channel] = ''
275
276        # calculate average metrics
277        metrics_dict = collections.OrderedDict()
278        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
279        for test_id in testclass_results_dict.keys():
280            for channel in testclass_results_dict[test_id].keys():
281                metric_tag = collections.OrderedDict(test_id, channel=channel)
282                metric_tag = self.extract_test_id(metric_tag, id_fields)
283                metric_tag = tuple(metric_tag.items())
284                metrics_dict.setdefault(metric_tag, [])
285                sensitivity_result = testclass_results_dict[test_id][channel]
286                if sensitivity_result != '':
287                    metrics_dict[metric_tag].append(sensitivity_result)
288        for metric_tag_tuple, metric_data in metrics_dict.items():
289            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
290            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
291                metric_tag_dict['channel'], metric_tag_dict['mode'],
292                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
293            metric_key = '{}.avg_sensitivity'.format(metric_tag)
294            metric_value = numpy.mean(metric_data)
295            self.testclass_metric_logger.add_metric(metric_key, metric_value)
296
297        # write csv
298        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
299        for channel in channels_tested:
300            csv_header.append('Ch. ' + str(channel))
301        results_file_path = os.path.join(self.log_path, 'results.csv')
302        with open(results_file_path, mode='w') as csv_file:
303            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
304            writer.writeheader()
305            for test_id, test_results in testclass_results_dict.items():
306                test_id_dict = dict(test_id)
307                if 'legacy' in test_id_dict['mode']:
308                    rate_list = self.VALID_RATES['legacy_2GHz']
309                else:
310                    rate_list = self.VALID_RATES[test_id_dict['mode']]
311                data_rate = next(rate.data_rate for rate in rate_list
312                                 if rate[:-1] == (test_id_dict['rate'],
313                                                  test_id_dict['num_streams']))
314                row_value = {
315                    'Mode': test_id_dict['mode'],
316                    'MCS': test_id_dict['rate'],
317                    'Streams': test_id_dict['num_streams'],
318                    'Chain': test_id_dict['chain_mask'],
319                    'Rate (Mbps)': data_rate,
320                }
321                for channel in channels_tested:
322                    row_value['Ch. ' + str(channel)] = test_results.pop(
323                        channel, ' ')
324                writer.writerow(row_value)
325
326        if not self.testclass_params['traffic_type'].lower() == 'ping':
327            WifiRvrTest.process_testclass_results(self)
328
329    def process_rvr_test_results(self, testcase_params, rvr_result):
330        """Post processes RvR results to compute sensitivity.
331
332        Takes in the results of the RvR tests and computes the sensitivity of
333        the current rate by looking at the point at which throughput drops
334        below the percentage specified in the config file. The function then
335        calls on its parent class process_test_results to plot the result.
336
337        Args:
338            rvr_result: dict containing attenuation, throughput and other meta
339            data
340        """
341        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
342        rvr_result['peak_throughput_pct'] = 100
343        throughput_check = [
344            throughput < rvr_result['peak_throughput'] *
345            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
346            for throughput in rvr_result['throughput_receive']
347        ]
348        consistency_check = [
349            idx for idx in range(len(throughput_check))
350            if all(throughput_check[idx:])
351        ]
352        rvr_result['atten_at_range'] = rvr_result['attenuation'][
353            consistency_check[0] - 1]
354        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
355            rvr_result['atten_at_range'])
356        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
357            self.testbed_params['ap_tx_power_offset'][str(
358                testcase_params['channel'])] - rvr_result['range'])
359        WifiRvrTest.process_test_results(self, rvr_result)
360
361    def process_ping_test_results(self, testcase_params, ping_result):
362        """Post processes RvR results to compute sensitivity.
363
364        Takes in the results of the RvR tests and computes the sensitivity of
365        the current rate by looking at the point at which throughput drops
366        below the percentage specified in the config file. The function then
367        calls on its parent class process_test_results to plot the result.
368
369        Args:
370            rvr_result: dict containing attenuation, throughput and other meta
371            data
372        """
373        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
374        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
375            self.testbed_params['ap_tx_power_offset'][str(
376                testcase_params['channel'])] - ping_result['range'])
377
378    def setup_ping_test(self, testcase_params):
379        """Function that gets devices ready for the test.
380
381        Args:
382            testcase_params: dict containing test-specific parameters
383        """
384        # Configure AP
385        self.setup_ap(testcase_params)
386        # Set attenuator to starting attenuation
387        for attenuator in self.attenuators:
388            attenuator.set_atten(testcase_params['atten_start'],
389                                 strict=False,
390                                 retry=True)
391        # Reset, configure, and connect DUT
392        self.setup_dut(testcase_params)
393
394    def setup_ap(self, testcase_params):
395        """Sets up the AP and attenuator to compensate for AP chain imbalance.
396
397        Args:
398            testcase_params: dict containing AP and other test params
399        """
400        band = self.access_point.band_lookup_by_channel(
401            testcase_params['channel'])
402        if '2G' in band:
403            frequency = wutils.WifiEnums.channel_2G_to_freq[
404                testcase_params['channel']]
405        else:
406            frequency = wutils.WifiEnums.channel_5G_to_freq[
407                testcase_params['channel']]
408        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
409            self.access_point.set_region(self.testbed_params['DFS_region'])
410        else:
411            self.access_point.set_region(self.testbed_params['default_region'])
412        self.access_point.set_channel(band, testcase_params['channel'])
413        self.access_point.set_bandwidth(band, testcase_params['mode'])
414        self.access_point.set_power(band, testcase_params['ap_tx_power'])
415        self.access_point.set_rate(band, testcase_params['mode'],
416                                   testcase_params['num_streams'],
417                                   testcase_params['rate'],
418                                   testcase_params['short_gi'])
419        # Set attenuator offsets and set attenuators to initial condition
420        atten_offsets = self.testbed_params['chain_offset'][str(
421            testcase_params['channel'])]
422        for atten in self.attenuators:
423            if 'AP-Chain-0' in atten.path:
424                atten.offset = atten_offsets[0]
425            elif 'AP-Chain-1' in atten.path:
426                atten.offset = atten_offsets[1]
427            else:
428                atten.offset = 0
429        self.log.info('Access Point Configuration: {}'.format(
430            self.access_point.ap_settings))
431
432    def setup_dut(self, testcase_params):
433        """Sets up the DUT in the configuration required by the test.
434
435        Args:
436            testcase_params: dict containing AP and other test params
437        """
438        # Turn screen off to preserve battery
439        if self.testbed_params.get('screen_on',
440                                   False) or self.testclass_params.get(
441                                       'screen_on', False):
442            self.dut.droid.wakeLockAcquireDim()
443        else:
444            self.dut.go_to_sleep()
445        if wputils.validate_network(self.dut,
446                                    testcase_params['test_network']['SSID']):
447            self.log.info('Already connected to desired network')
448        else:
449            wutils.wifi_toggle_state(self.dut, False)
450            wutils.set_wifi_country_code(self.dut,
451                                         self.testclass_params['country_code'])
452            wutils.wifi_toggle_state(self.dut, True)
453            wutils.reset_wifi(self.dut)
454            wutils.set_wifi_country_code(self.dut,
455                                         self.testclass_params['country_code'])
456            testcase_params['test_network']['channel'] = testcase_params[
457                'channel']
458            wutils.wifi_connect(self.dut,
459                                testcase_params['test_network'],
460                                num_of_tries=5,
461                                check_connectivity=False)
462        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
463        # Activate/attenuate the correct chains
464        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
465            self.atten_dut_chain_map[testcase_params[
466                'channel']] = wputils.get_current_atten_dut_chain_map(
467                    self.attenuators, self.dut, self.ping_server)
468        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
469            self.atten_dut_chain_map[testcase_params['channel']]))
470        for idx, atten in enumerate(self.attenuators):
471            if self.atten_dut_chain_map[testcase_params['channel']][
472                    idx] == testcase_params['attenuated_chain']:
473                atten.offset = atten.instrument.max_atten
474
475    def extract_test_id(self, testcase_params, id_fields):
476        test_id = collections.OrderedDict(
477            (param, testcase_params[param]) for param in id_fields)
478        return test_id
479
480    def get_start_atten(self, testcase_params):
481        """Gets the starting attenuation for this sensitivity test.
482
483        The function gets the starting attenuation by checking whether a test
484        as the next higher MCS has been executed. If so it sets the starting
485        point a configurable number of dBs below the next MCS's sensitivity.
486
487        Returns:
488            start_atten: starting attenuation for current test
489        """
490        # If the test is being retried, start from the beginning
491        if self.retry_flag:
492            self.log.info('Retry flag set. Setting attenuation to minimum.')
493            return self.testclass_params['atten_start']
494        # Get the current and reference test config. The reference test is the
495        # one performed at the current MCS+1
496        current_rate = testcase_params['rate']
497        ref_test_params = self.extract_test_id(
498            testcase_params,
499            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
500        if 'legacy' in testcase_params['mode']:
501            if testcase_params['channel'] <= 13:
502                rate_list = self.VALID_RATES['legacy_2GHz']
503            else:
504                rate_list = self.VALID_RATES['legacy_5GHz']
505            ref_index = max(
506                0,
507                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
508                - 1)
509            ref_test_params['rate'] = rate_list[ref_index].mcs
510        else:
511            ref_test_params['rate'] = current_rate + 1
512
513        # Check if reference test has been run and set attenuation accordingly
514        previous_params = [
515            self.extract_test_id(
516                result['testcase_params'],
517                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
518            for result in self.testclass_results
519        ]
520
521        try:
522            ref_index = previous_params.index(ref_test_params)
523            start_atten = self.testclass_results[ref_index][
524                'atten_at_range'] - (
525                    self.testclass_params['adjacent_mcs_range_gap'])
526        except ValueError:
527            self.log.warning(
528                'Reference test not found. Starting from {} dB'.format(
529                    self.testclass_params['atten_start']))
530            start_atten = self.testclass_params['atten_start']
531            start_atten = max(start_atten, 0)
532        return start_atten
533
534    def compile_test_params(self, testcase_params):
535        """Function that generates test params based on the test name."""
536        # Check if test should be skipped.
537        wputils.check_skip_conditions(testcase_params, self.dut,
538                                      self.access_point,
539                                      getattr(self, 'ota_chamber', None))
540
541        band = self.access_point.band_lookup_by_channel(
542            testcase_params['channel'])
543        testcase_params['band'] = band
544        testcase_params['test_network'] = self.main_network[band]
545        if testcase_params['chain_mask'] in ['0', '1']:
546            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
547                1 if testcase_params['chain_mask'] == '0' else 0)
548        else:
549            # Set attenuated chain to -1. Do not set to None as this will be
550            # compared to RF chain map which may include None
551            testcase_params['attenuated_chain'] = -1
552
553        self.testclass_params[
554            'range_ping_loss_threshold'] = 100 - self.testclass_params[
555                'throughput_pct_at_sensitivity']
556        if self.testclass_params['traffic_type'] == 'UDP':
557            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
558                self.testclass_params['iperf_duration'],
559                self.testclass_params['UDP_rates'][testcase_params['mode']])
560        elif self.testclass_params['traffic_type'] == 'TCP':
561            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
562                self.testclass_params['iperf_duration'])
563
564        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
565                self.iperf_client, iperf_client.IPerfClientOverAdb):
566            testcase_params['iperf_args'] += ' -R'
567            testcase_params['use_client_output'] = True
568        else:
569            testcase_params['use_client_output'] = False
570
571        return testcase_params
572
573    def _test_sensitivity(self, testcase_params):
574        """Function that gets called for each test case
575
576        The function gets called in each rvr test case. The function customizes
577        the rvr test based on the test name of the test that called it
578        """
579        # Compile test parameters from config and test name
580        testcase_params = self.compile_test_params(testcase_params)
581        testcase_params.update(self.testclass_params)
582        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
583        num_atten_steps = int(
584            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
585            testcase_params['atten_step'])
586        testcase_params['atten_range'] = [
587            testcase_params['atten_start'] + x * testcase_params['atten_step']
588            for x in range(0, num_atten_steps)
589        ]
590
591        # Prepare devices and run test
592        if testcase_params['traffic_type'].lower() == 'ping':
593            self.setup_ping_test(testcase_params)
594            result = self.run_ping_test(testcase_params)
595            self.process_ping_test_results(testcase_params, result)
596        else:
597            self.setup_rvr_test(testcase_params)
598            result = self.run_rvr_test(testcase_params)
599            self.process_rvr_test_results(testcase_params, result)
600
601        # Post-process results
602        self.testclass_results.append(result)
603        self.pass_fail_check(result)
604
605    def generate_test_cases(self, channels, modes, chain_mask):
606        """Function that auto-generates test cases for a test class."""
607        test_cases = []
608        for channel in channels:
609            requested_modes = [
610                mode for mode in modes
611                if mode in self.VALID_TEST_CONFIGS[channel]
612            ]
613            for mode in requested_modes:
614                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
615                if 'VHT' in mode:
616                    rates = self.VALID_RATES[mode]
617                elif 'HT' in mode:
618                    rates = self.VALID_RATES[mode]
619                elif 'legacy' in mode and channel < 14:
620                    rates = self.VALID_RATES['legacy_2GHz']
621                elif 'legacy' in mode and channel > 14:
622                    rates = self.VALID_RATES['legacy_5GHz']
623                else:
624                    raise ValueError('Invalid test mode.')
625                for chain, rate in itertools.product(chain_mask, rates):
626                    testcase_params = collections.OrderedDict(
627                        channel=channel,
628                        mode=mode,
629                        bandwidth=bandwidth,
630                        rate=rate.mcs,
631                        num_streams=rate.streams,
632                        short_gi=1,
633                        chain_mask=chain)
634                    if chain in ['0', '1'] and rate[1] == 2:
635                        # Do not test 2-stream rates in single chain mode
636                        continue
637                    if 'legacy' in mode:
638                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
639                                         '_ch{}'.format(
640                                             channel, mode,
641                                             str(rate.mcs).replace('.', 'p'),
642                                             rate.streams, chain))
643                    else:
644                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
645                                         '_ch{}'.format(
646                                             channel, mode, rate.mcs,
647                                             rate.streams, chain))
648                    setattr(self, testcase_name,
649                            partial(self._test_sensitivity, testcase_params))
650                    test_cases.append(testcase_name)
651        return test_cases
652
653
class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 6, 36-48 and 149-161.

    Requests VHT20, VHT40 and VHT80 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
661
662
class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
    """Sensitivity tests on the sample channels 6, 36 and 149.

    Requests VHT20, VHT40 and VHT80 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [6, 36, 149]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
670
671
class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 1, 2, 6, 10 and 11.

    Requests VHT20 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [1, 2, 6, 10, 11]
        modes = ['VHT20']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
678
679
class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36-48 and 149-161.

    Requests VHT20, VHT40 and VHT80 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48, 149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
687
688
class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 36, 40, 44 and 48.

    Requests VHT20, VHT40 and VHT80 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [36, 40, 44, 48]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
696
697
class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
    """Sensitivity tests on channels 149, 153, 157 and 161.

    Requests VHT20, VHT40 and VHT80 test cases for chain masks 0, 1 and 2x2.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        channels = [149, 153, 157, 161]
        modes = ['VHT20', 'VHT40', 'VHT80']
        chain_masks = ['0', '1', '2x2']
        self.tests = self.generate_test_cases(channels, modes, chain_masks)
705
706
# Over-the-air version of sensitivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
    """Class to test over-the-air sensitivity.

    This class implements WiFi sensitivity tests in an OTA chamber. It allows
    setting orientation and other chamber parameters to study performance in
    varying channel conditions.
    """

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the base sensitivity setup, then creates the OTA chamber.

        The chamber is created from the 'OTAChamber' entry of the user
        params; the first created chamber object is used.
        """
        WifiSensitivityTest.setup_class(self)
        self.current_chain_mask = '2x2'
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Runs the base sensitivity teardown, then resets the chamber."""
        WifiSensitivityTest.teardown_class(self)
        self.ota_chamber.reset_chamber()

    def setup_sensitivity_test(self, testcase_params):
        """Sets the chamber orientation, then runs the base test setup.

        Args:
            testcase_params: dict containing test params, including the
                'orientation' to apply to the chamber.
        """
        # Setup turntable
        self.ota_chamber.set_orientation(testcase_params['orientation'])
        # Continue test setup
        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Configure the right INI settings
        wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
        # Keep the screen dimmed-on only if the testbed or test class params
        # request it; otherwise turn the screen off to preserve battery.
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        self.validate_and_connect(testcase_params)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases.

        Results are grouped per (channel, mode, rate) into one figure, with
        one line per (chain_mask, num_streams) showing sensitivity versus
        orientation. The NaN-ignoring average sensitivity of each line is
        published as a test class metric. Results whose peak throughput
        percentage is below 95 are recorded as NaN.
        """
        self.plot_per_curves()
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'rate']
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = tuple(
                self.extract_test_id(testcase_params, id_fields).items())
            line_id = (testcase_params['chain_mask'],
                       testcase_params['num_streams'])
            line_results = testclass_results_dict.setdefault(
                test_id, collections.OrderedDict()).setdefault(
                    line_id, {
                        'orientation': [],
                        'sensitivity': []
                    })
            orientation = testcase_params['orientation']
            if result['peak_throughput_pct'] >= 95:
                sensitivity = result['sensitivity']
            else:
                sensitivity = float('nan')
            if orientation in line_results['orientation']:
                # A repeated (e.g. retried) test replaces the earlier value
                # recorded for that same orientation.
                orientation_index = line_results['orientation'].index(
                    orientation)
                line_results['sensitivity'][orientation_index] = sensitivity
            else:
                line_results['orientation'].append(orientation)
                line_results['sensitivity'].append(sensitivity)

        if not testclass_results_dict:
            # Nothing to plot; avoid building paths/figures from no data.
            self.log.warning('No OTA sensitivity results to process.')
            return

        output_dir = context.get_current_context().get_full_output_path()
        plots = []
        for test_id, test_data in testclass_results_dict.items():
            test_id_dict = dict(test_id)
            is_legacy = 'legacy' in test_id_dict['mode']
            if is_legacy:
                test_id_str = 'Channel {} - {} {}Mbps'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            else:
                test_id_str = 'Channel {} - {} MCS{}'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            curr_plot = BokehFigure(title=str(test_id_str),
                                    x_label='Orientation (deg)',
                                    primary_y_label='Sensitivity (dBm)')
            for line_id, line_results in test_data.items():
                chain_mask, num_streams = line_id
                curr_plot.add_line(line_results['orientation'],
                                   line_results['sensitivity'],
                                   legend='Nss{} - Chain Mask {}'.format(
                                       num_streams, chain_mask),
                                   marker='circle')
                if is_legacy:
                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], chain_mask)
                else:
                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], num_streams, chain_mask)

                metric_name = metric_tag + '.avg_sensitivity'
                metric_value = numpy.nanmean(line_results['sensitivity'])
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)
                self.log.info(('Average Sensitivity for {}: {:.1f}').format(
                    metric_tag, metric_value))
            output_file_path = os.path.join(output_dir,
                                            str(test_id_str) + '.html')
            curr_plot.generate_figure(output_file_path)
            plots.append(curr_plot)
        output_file_path = os.path.join(output_dir, 'results.html')
        BokehFigure.save_figures(plots, output_file_path)

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the same rate configuration has executed. If so it sets the starting
        point a configurable number of dBs below the reference test.

        Args:
            testcase_params: dict containing the current test params
        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # The reference test is the most recent executed test with the same
        # channel, mode, rate, stream count and chain mask as the current one
        # (orientation is not part of the comparison).
        id_fields = ['channel', 'mode', 'rate', 'num_streams', 'chain_mask']
        ref_test_params = self.extract_test_id(testcase_params, id_fields)
        # Check if reference test has been run and set attenuation accordingly
        for result in reversed(self.testclass_results):
            previous_params = self.extract_test_id(result['testcase_params'],
                                                   id_fields)
            if previous_params == ref_test_params:
                start_atten = result['atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
                break
        else:
            self.log.info(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        # Attenuation cannot be negative
        return max(start_atten, 0)

    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
                            angles):
        """Function that auto-generates test cases for a test class.

        A test case is generated for every combination of channel, chain
        mask, mode, rate and angle, keeping only modes listed in
        VALID_TEST_CONFIGS for the channel and rates present in
        requested_rates. Each test case is attached to the instance as a
        partial of _test_sensitivity.

        Args:
            channels: list of channels to test
            modes: list of mode names to test (e.g. 'VHT20')
            requested_rates: list of rate tuples to test
            chain_mask: list of chain masks to test
            angles: list of chamber orientations to test
        Returns:
            test_cases: list of generated test case names
        Raises:
            ValueError: if a mode does not map to any known rate table
        """
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for chain, mode in itertools.product(chain_mask, requested_modes):
                # NOTE(review): a mode name with no digits would make int('')
                # raise ValueError here - confirm all mode names carry a
                # bandwidth.
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'HT' in mode:
                    # Covers both 'HT' and 'VHT' mode names
                    valid_rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    valid_rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    valid_rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                single_chain = str(chain) in ['0', '1']
                # Keep only requested rates, and do not test 2-stream rates
                # in single chain mode
                eligible_rates = [
                    rate for rate in valid_rates
                    if rate in requested_rates
                    and not (single_chain and rate.streams == 2)
                ]
                for rate, angle in itertools.product(eligible_rates, angles):
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain,
                        orientation=angle)
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain, angle))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain, angle))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases
926
927
class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests on channels 6, 36 and 149 in 10 degree steps.

    Requests VHT20 and VHT80 test cases on chain mask 2x2 for the rates
    listed below, at orientations 0 to 350 degrees.
    """

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        # Each entry holds the positional arguments of one RateTuple
        rate_specs = [
            (8, 1, 86.7),
            (6, 1, 65),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (6, 2, 130.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20', 'VHT80']
        angles = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, ['2x2'],
                                              angles)
945
946
class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
    """Per-chain OTA sensitivity tests on channels 6, 36 and 149.

    Requests VHT20 and VHT80 test cases on chain masks 0, 1 and 2x2 for the
    rates listed below, at orientations 0 to 350 degrees in 10 degree steps.
    """

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        # Each entry holds the positional arguments of one RateTuple
        rate_specs = [
            (9, 1, 96),
            (9, 2, 192),
            (6, 1, 65),
            (6, 2, 130.3),
            (2, 1, 21.7),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20', 'VHT80']
        chain_masks = [0, 1, '2x2']
        angles = list(range(0, 360, 10))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, chain_masks,
                                              angles)
964
965
class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests on channels 6, 36 and 149 in 30 degree steps.

    Requests VHT20 and VHT80 test cases on chain mask 2x2 for the rates
    listed below, at orientations 0 to 330 degrees.
    """

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        # Each entry holds the positional arguments of one RateTuple
        rate_specs = [
            (9, 1, 96),
            (8, 1, 86.7),
            (7, 1, 72.2),
            (4, 1, 43.3),
            (2, 1, 21.7),
            (0, 1, 7.2),
            (9, 2, 192),
            (8, 2, 173.3),
            (7, 2, 144.4),
            (4, 2, 86.7),
            (2, 2, 43.3),
            (0, 2, 14.4),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [6, 36, 149]
        requested_modes = ['VHT20', 'VHT80']
        angles = list(range(0, 360, 30))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, ['2x2'],
                                              angles)
989
990
class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
    """OTA sensitivity tests in 45 degree steps.

    Requests VHT20 and VHT80 test cases on chain mask 2x2 for the channels
    and rates listed below, at orientations 0 to 315 degrees.
    """

    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        # Each entry holds the positional arguments of one RateTuple
        rate_specs = [
            (8, 1, 86.7),
            (2, 1, 21.7),
            (8, 2, 173.3),
            (2, 2, 43.3),
        ]
        requested_rates = [self.RateTuple(*spec) for spec in rate_specs]
        requested_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        requested_modes = ['VHT20', 'VHT80']
        angles = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(requested_channels,
                                              requested_modes,
                                              requested_rates, ['2x2'],
                                              angles)
1004