1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import numpy
22import os
23import time
24from acts import asserts
25from acts import base_test
26from acts import utils
27from acts.controllers import iperf_server as ipf
28from acts.controllers.utils_lib import ssh
29from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
30from acts_contrib.test_utils.wifi import ota_chamber
31from acts_contrib.test_utils.wifi import ota_sniffer
32from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
33from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
34from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
35from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
36from functools import partial
37
38
39class WifiRvrTest(base_test.BaseTestClass):
40    """Class to test WiFi rate versus range.
41
42    This class implements WiFi rate versus range tests on single AP single STA
    links. The class sets up the AP in the desired configurations, configures
44    and connects the phone to the AP, and runs iperf throughput test while
45    sweeping attenuation. For an example config file to run this test class see
46    example_connectivity_performance_ap_sta.json.
47    """
48
49    TEST_TIMEOUT = 6
50    MAX_CONSECUTIVE_ZEROS = 3
51
52    def __init__(self, controllers):
53        base_test.BaseTestClass.__init__(self, controllers)
54        self.testcase_metric_logger = (
55            BlackboxMappedMetricLogger.for_test_case())
56        self.testclass_metric_logger = (
57            BlackboxMappedMetricLogger.for_test_class())
58        self.publish_testcase_metrics = True
59
60    def setup_class(self):
61        """Initializes common test hardware and parameters.
62
63        This function initializes hardwares and compiles parameters that are
64        common to all tests in this class.
65        """
66        self.sta_dut = self.android_devices[0]
67        req_params = [
68            'RetailAccessPoints', 'rvr_test_params', 'testbed_params',
69            'RemoteServer', 'main_network'
70        ]
71        opt_params = ['golden_files_list', 'OTASniffer']
72        self.unpack_userparams(req_params, opt_params)
73        self.testclass_params = self.rvr_test_params
74        self.num_atten = self.attenuators[0].instrument.num_atten
75        self.iperf_server = self.iperf_servers[0]
76        self.remote_server = ssh.connection.SshConnection(
77            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
78        self.iperf_client = self.iperf_clients[0]
79        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
80        if hasattr(self,
81                   'OTASniffer') and self.testbed_params['sniffer_enable']:
82            try:
83                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
84            except:
85                self.log.warning('Could not start sniffer. Disabling sniffs.')
86                self.testbed_params['sniffer_enable'] = 0
87        self.log.info('Access Point Configuration: {}'.format(
88            self.access_point.ap_settings))
89        self.log_path = os.path.join(logging.log_path, 'results')
90        os.makedirs(self.log_path, exist_ok=True)
91        if not hasattr(self, 'golden_files_list'):
92            if 'golden_results_path' in self.testbed_params:
93                self.golden_files_list = [
94                    os.path.join(self.testbed_params['golden_results_path'],
95                                 file) for file in
96                    os.listdir(self.testbed_params['golden_results_path'])
97                ]
98            else:
99                self.log.warning('No golden files found.')
100                self.golden_files_list = []
101        self.testclass_results = []
102
103        # Turn WiFi ON
104        if self.testclass_params.get('airplane_mode', 1):
105            for dev in self.android_devices:
106                self.log.info('Turning on airplane mode.')
107                asserts.assert_true(utils.force_airplane_mode(dev, True),
108                                    'Can not turn on airplane mode.')
109                wutils.reset_wifi(dev)
110                wutils.wifi_toggle_state(dev, True)
111
112    def teardown_test(self):
113        self.iperf_server.stop()
114
115    def teardown_class(self):
116        # Turn WiFi OFF
117        self.access_point.teardown()
118        for dev in self.android_devices:
119            wutils.wifi_toggle_state(dev, False)
120            dev.go_to_sleep()
121        self.process_testclass_results()
122
123    def process_testclass_results(self):
124        """Saves plot with all test results to enable comparison."""
125        # Plot and save all results
126        plots = collections.OrderedDict()
127        for result in self.testclass_results:
128            plot_id = (result['testcase_params']['channel'],
129                       result['testcase_params']['mode'])
130            if plot_id not in plots:
131                plots[plot_id] = BokehFigure(
132                    title='Channel {} {} ({})'.format(
133                        result['testcase_params']['channel'],
134                        result['testcase_params']['mode'],
135                        result['testcase_params']['traffic_type']),
136                    x_label='Attenuation (dB)',
137                    primary_y_label='Throughput (Mbps)')
138            plots[plot_id].add_line(result['total_attenuation'],
139                                    result['throughput_receive'],
140                                    result['test_name'].strip('test_rvr_'),
141                                    hover_text=result['hover_text'],
142                                    marker='circle')
143            plots[plot_id].add_line(result['total_attenuation'],
144                                    result['rx_phy_rate'],
145                                    result['test_name'].strip('test_rvr_') +
146                                    ' (Rx PHY)',
147                                    hover_text=result['hover_text'],
148                                    style='dashed',
149                                    marker='inverted_triangle')
150            plots[plot_id].add_line(result['total_attenuation'],
151                                    result['tx_phy_rate'],
152                                    result['test_name'].strip('test_rvr_') +
153                                    ' (Tx PHY)',
154                                    hover_text=result['hover_text'],
155                                    style='dashed',
156                                    marker='triangle')
157
158        figure_list = []
159        for plot_id, plot in plots.items():
160            plot.generate_figure()
161            figure_list.append(plot)
162        output_file_path = os.path.join(self.log_path, 'results.html')
163        BokehFigure.save_figures(figure_list, output_file_path)
164
165    def pass_fail_check(self, rvr_result):
166        """Check the test result and decide if it passed or failed.
167
168        Checks the RvR test result and compares to a throughput limites for
169        the same configuration. The pass/fail tolerances are provided in the
170        config file.
171
172        Args:
173            rvr_result: dict containing attenuation, throughput and other data
174        """
175        try:
176            throughput_limits = self.compute_throughput_limits(rvr_result)
177        except:
178            asserts.explicit_pass(
179                'Test passed by default. Golden file not found')
180
181        failure_count = 0
182        for idx, current_throughput in enumerate(
183                rvr_result['throughput_receive']):
184            if (current_throughput < throughput_limits['lower_limit'][idx]
185                    or current_throughput >
186                    throughput_limits['upper_limit'][idx]):
187                failure_count = failure_count + 1
188
189        # Set test metrics
190        rvr_result['metrics']['failure_count'] = failure_count
191        if self.publish_testcase_metrics:
192            self.testcase_metric_logger.add_metric('failure_count',
193                                                   failure_count)
194
195        # Assert pass or fail
196        if failure_count >= self.testclass_params['failure_count_tolerance']:
197            asserts.fail('Test failed. Found {} points outside limits.'.format(
198                failure_count))
199        asserts.explicit_pass(
200            'Test passed. Found {} points outside throughput limits.'.format(
201                failure_count))
202
203    def compute_throughput_limits(self, rvr_result):
204        """Compute throughput limits for current test.
205
206        Checks the RvR test result and compares to a throughput limites for
207        the same configuration. The pass/fail tolerances are provided in the
208        config file.
209
210        Args:
211            rvr_result: dict containing attenuation, throughput and other meta
212            data
213        Returns:
214            throughput_limits: dict containing attenuation and throughput limit data
215        """
216        test_name = self.current_test_name
217        golden_path = next(file_name for file_name in self.golden_files_list
218                           if test_name in file_name)
219        with open(golden_path, 'r') as golden_file:
220            golden_results = json.load(golden_file)
221            golden_attenuation = [
222                att + golden_results['fixed_attenuation']
223                for att in golden_results['attenuation']
224            ]
225        attenuation = []
226        lower_limit = []
227        upper_limit = []
228        for idx, current_throughput in enumerate(
229                rvr_result['throughput_receive']):
230            current_att = rvr_result['attenuation'][idx] + rvr_result[
231                'fixed_attenuation']
232            att_distances = [
233                abs(current_att - golden_att)
234                for golden_att in golden_attenuation
235            ]
236            sorted_distances = sorted(enumerate(att_distances),
237                                      key=lambda x: x[1])
238            closest_indeces = [dist[0] for dist in sorted_distances[0:3]]
239            closest_throughputs = [
240                golden_results['throughput_receive'][index]
241                for index in closest_indeces
242            ]
243            closest_throughputs.sort()
244
245            attenuation.append(current_att)
246            lower_limit.append(
247                max(
248                    closest_throughputs[0] - max(
249                        self.testclass_params['abs_tolerance'],
250                        closest_throughputs[0] *
251                        self.testclass_params['pct_tolerance'] / 100), 0))
252            upper_limit.append(closest_throughputs[-1] + max(
253                self.testclass_params['abs_tolerance'], closest_throughputs[-1]
254                * self.testclass_params['pct_tolerance'] / 100))
255        throughput_limits = {
256            'attenuation': attenuation,
257            'lower_limit': lower_limit,
258            'upper_limit': upper_limit
259        }
260        return throughput_limits
261
262    def plot_rvr_result(self, rvr_result):
263        """Saves plots and JSON formatted results.
264
265        Args:
266            rvr_result: dict containing attenuation, throughput and other meta
267            data
268        """
269        # Save output as text file
270        results_file_path = os.path.join(
271            self.log_path, '{}.json'.format(self.current_test_name))
272        with open(results_file_path, 'w') as results_file:
273            json.dump(wputils.serialize_dict(rvr_result),
274                      results_file,
275                      indent=4)
276        # Plot and save
277        figure = BokehFigure(title=self.current_test_name,
278                             x_label='Attenuation (dB)',
279                             primary_y_label='Throughput (Mbps)')
280        try:
281            golden_path = next(file_name
282                               for file_name in self.golden_files_list
283                               if self.current_test_name in file_name)
284            with open(golden_path, 'r') as golden_file:
285                golden_results = json.load(golden_file)
286            golden_attenuation = [
287                att + golden_results['fixed_attenuation']
288                for att in golden_results['attenuation']
289            ]
290            throughput_limits = self.compute_throughput_limits(rvr_result)
291            shaded_region = {
292                'x_vector': throughput_limits['attenuation'],
293                'lower_limit': throughput_limits['lower_limit'],
294                'upper_limit': throughput_limits['upper_limit']
295            }
296            figure.add_line(golden_attenuation,
297                            golden_results['throughput_receive'],
298                            'Golden Results',
299                            color='green',
300                            marker='circle',
301                            shaded_region=shaded_region)
302        except:
303            self.log.warning('ValueError: Golden file not found')
304
305        # Generate graph annotatios
306        rvr_result['hover_text'] = {
307            'llstats': [
308                'TX MCS = {0} ({1:.1f}%). RX MCS = {2} ({3:.1f}%)'.format(
309                    curr_llstats['summary']['common_tx_mcs'],
310                    curr_llstats['summary']['common_tx_mcs_freq'] * 100,
311                    curr_llstats['summary']['common_rx_mcs'],
312                    curr_llstats['summary']['common_rx_mcs_freq'] * 100)
313                for curr_llstats in rvr_result['llstats']
314            ],
315            'rssi': [
316                '{0:.2f} [{1:.2f},{2:.2f}]'.format(
317                    rssi['signal_poll_rssi'],
318                    rssi['chain_0_rssi'],
319                    rssi['chain_1_rssi'],
320                ) for rssi in rvr_result['rssi']
321            ]
322        }
323
324        figure.add_line(rvr_result['total_attenuation'],
325                        rvr_result['throughput_receive'],
326                        'Measured Throughput',
327                        hover_text=rvr_result['hover_text'],
328                        color='black',
329                        marker='circle')
330        figure.add_line(
331            rvr_result['total_attenuation'][0:len(rvr_result['rx_phy_rate'])],
332            rvr_result['rx_phy_rate'],
333            'Rx PHY Rate',
334            hover_text=rvr_result['hover_text'],
335            color='blue',
336            style='dashed',
337            marker='inverted_triangle')
338        figure.add_line(
339            rvr_result['total_attenuation'][0:len(rvr_result['rx_phy_rate'])],
340            rvr_result['tx_phy_rate'],
341            'Tx PHY Rate',
342            hover_text=rvr_result['hover_text'],
343            color='red',
344            style='dashed',
345            marker='triangle')
346
347        output_file_path = os.path.join(
348            self.log_path, '{}.html'.format(self.current_test_name))
349        figure.generate_figure(output_file_path)
350
351    def compute_test_metrics(self, rvr_result):
352        # Set test metrics
353        rvr_result['metrics'] = {}
354        rvr_result['metrics']['peak_tput'] = max(
355            rvr_result['throughput_receive'])
356        if self.publish_testcase_metrics:
357            self.testcase_metric_logger.add_metric(
358                'peak_tput', rvr_result['metrics']['peak_tput'])
359
360        test_mode = rvr_result['ap_settings'][rvr_result['testcase_params']
361                                              ['band']]['bandwidth']
362        tput_below_limit = [
363            tput <
364            self.testclass_params['tput_metric_targets'][test_mode]['high']
365            for tput in rvr_result['throughput_receive']
366        ]
367        rvr_result['metrics']['high_tput_range'] = -1
368        for idx in range(len(tput_below_limit)):
369            if all(tput_below_limit[idx:]):
370                if idx == 0:
371                    # Throughput was never above limit
372                    rvr_result['metrics']['high_tput_range'] = -1
373                else:
374                    rvr_result['metrics']['high_tput_range'] = rvr_result[
375                        'total_attenuation'][max(idx, 1) - 1]
376                break
377        if self.publish_testcase_metrics:
378            self.testcase_metric_logger.add_metric(
379                'high_tput_range', rvr_result['metrics']['high_tput_range'])
380
381        tput_below_limit = [
382            tput <
383            self.testclass_params['tput_metric_targets'][test_mode]['low']
384            for tput in rvr_result['throughput_receive']
385        ]
386        for idx in range(len(tput_below_limit)):
387            if all(tput_below_limit[idx:]):
388                rvr_result['metrics']['low_tput_range'] = rvr_result[
389                    'total_attenuation'][max(idx, 1) - 1]
390                break
391        else:
392            rvr_result['metrics']['low_tput_range'] = -1
393        if self.publish_testcase_metrics:
394            self.testcase_metric_logger.add_metric(
395                'low_tput_range', rvr_result['metrics']['low_tput_range'])
396
397    def process_test_results(self, rvr_result):
398        self.plot_rvr_result(rvr_result)
399        self.compute_test_metrics(rvr_result)
400
401    def run_rvr_test(self, testcase_params):
402        """Test function to run RvR.
403
404        The function runs an RvR test in the current device/AP configuration.
405        Function is called from another wrapper function that sets up the
406        testbed for the RvR test
407
408        Args:
409            testcase_params: dict containing test-specific parameters
410        Returns:
411            rvr_result: dict containing rvr_results and meta data
412        """
413        self.log.info('Start running RvR')
414        # Refresh link layer stats before test
415        llstats_obj = wputils.LinkLayerStats(
416            self.monitored_dut,
417            self.testclass_params.get('monitor_llstats', 1))
418        zero_counter = 0
419        throughput = []
420        rx_phy_rate = []
421        tx_phy_rate = []
422        llstats = []
423        rssi = []
424        for atten in testcase_params['atten_range']:
425            for dev in self.android_devices:
426                if not wputils.health_check(dev, 5, 50):
427                    asserts.skip('DUT health check failed. Skipping test.')
428            # Set Attenuation
429            for attenuator in self.attenuators:
430                attenuator.set_atten(atten, strict=False, retry=True)
431            # Refresh link layer stats
432            llstats_obj.update_stats()
433            # Setup sniffer
434            if self.testbed_params['sniffer_enable']:
435                self.sniffer.start_capture(
436                    network=testcase_params['test_network'],
437                    chan=testcase_params['channel'],
438                    bw=testcase_params['bandwidth'],
439                    duration=self.testclass_params['iperf_duration'] / 5)
440            # Start iperf session
441            if self.testclass_params.get('monitor_rssi', 1):
442                rssi_future = wputils.get_connected_rssi_nb(
443                    self.monitored_dut,
444                    self.testclass_params['iperf_duration'] - 1,
445                    1,
446                    1,
447                    interface=self.monitored_interface)
448            self.iperf_server.start(tag=str(atten))
449            client_output_path = self.iperf_client.start(
450                testcase_params['iperf_server_address'],
451                testcase_params['iperf_args'], str(atten),
452                self.testclass_params['iperf_duration'] + self.TEST_TIMEOUT)
453            server_output_path = self.iperf_server.stop()
454            if self.testclass_params.get('monitor_rssi', 1):
455                rssi_result = rssi_future.result()
456                current_rssi = {
457                    'signal_poll_rssi':
458                    rssi_result['signal_poll_rssi']['mean'],
459                    'chain_0_rssi': rssi_result['chain_0_rssi']['mean'],
460                    'chain_1_rssi': rssi_result['chain_1_rssi']['mean']
461                }
462            else:
463                current_rssi = {
464                    'signal_poll_rssi': float('nan'),
465                    'chain_0_rssi': float('nan'),
466                    'chain_1_rssi': float('nan')
467                }
468            rssi.append(current_rssi)
469            # Stop sniffer
470            if self.testbed_params['sniffer_enable']:
471                self.sniffer.stop_capture(tag=str(atten))
472            # Parse and log result
473            if testcase_params['use_client_output']:
474                iperf_file = client_output_path
475            else:
476                iperf_file = server_output_path
477            try:
478                iperf_result = ipf.IPerfResult(iperf_file)
479                curr_throughput = numpy.mean(iperf_result.instantaneous_rates[
480                    self.testclass_params['iperf_ignored_interval']:-1]
481                                             ) * 8 * (1.024**2)
482            except:
483                self.log.warning(
484                    'ValueError: Cannot get iperf result. Setting to 0')
485                curr_throughput = 0
486            throughput.append(curr_throughput)
487            llstats_obj.update_stats()
488            curr_llstats = llstats_obj.llstats_incremental.copy()
489            llstats.append(curr_llstats)
490            rx_phy_rate.append(curr_llstats['summary'].get(
491                'mean_rx_phy_rate', 0))
492            tx_phy_rate.append(curr_llstats['summary'].get(
493                'mean_tx_phy_rate', 0))
494            self.log.info(
495                ('Throughput at {0:.2f} dB is {1:.2f} Mbps. '
496                 'RSSI = {2:.2f} [{3:.2f}, {4:.2f}].').format(
497                     atten, curr_throughput, current_rssi['signal_poll_rssi'],
498                     current_rssi['chain_0_rssi'],
499                     current_rssi['chain_1_rssi']))
500            if curr_throughput == 0:
501                zero_counter = zero_counter + 1
502            else:
503                zero_counter = 0
504            if zero_counter == self.MAX_CONSECUTIVE_ZEROS:
505                self.log.info(
506                    'Throughput stable at 0 Mbps. Stopping test now.')
507                zero_padding = len(
508                    testcase_params['atten_range']) - len(throughput)
509                throughput.extend([0] * zero_padding)
510                rx_phy_rate.extend([0] * zero_padding)
511                tx_phy_rate.extend([0] * zero_padding)
512                break
513        for attenuator in self.attenuators:
514            attenuator.set_atten(0, strict=False, retry=True)
515        # Compile test result and meta data
516        rvr_result = collections.OrderedDict()
517        rvr_result['test_name'] = self.current_test_name
518        rvr_result['phone_fold_status'] = wputils.check_fold_status(self.sta_dut)
519        rvr_result['testcase_params'] = testcase_params.copy()
520        rvr_result['ap_settings'] = self.access_point.ap_settings.copy()
521        rvr_result['fixed_attenuation'] = self.testbed_params[
522            'fixed_attenuation'][str(testcase_params['channel'])]
523        rvr_result['attenuation'] = list(testcase_params['atten_range'])
524        rvr_result['total_attenuation'] = [
525            att + rvr_result['fixed_attenuation']
526            for att in rvr_result['attenuation']
527        ]
528        rvr_result['rssi'] = rssi
529        rvr_result['throughput_receive'] = throughput
530        rvr_result['rx_phy_rate'] = rx_phy_rate
531        rvr_result['tx_phy_rate'] = tx_phy_rate
532        rvr_result['llstats'] = llstats
533        return rvr_result
534
535    def setup_ap(self, testcase_params):
536        """Sets up the access point in the configuration required by the test.
537
538        Args:
539            testcase_params: dict containing AP and other test params
540        """
541        band = self.access_point.band_lookup_by_channel(
542            testcase_params['channel'])
543        if '6G' in band:
544            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
545                testcase_params['channel'].strip('6g'))]
546        else:
547            if testcase_params['channel'] < 13:
548                frequency = wutils.WifiEnums.channel_2G_to_freq[
549                    testcase_params['channel']]
550            else:
551                frequency = wutils.WifiEnums.channel_5G_to_freq[
552                    testcase_params['channel']]
553        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
554            self.access_point.set_region(self.testbed_params['DFS_region'])
555        else:
556            self.access_point.set_region(self.testbed_params['default_region'])
557        self.access_point.set_channel_and_bandwidth(testcase_params['band'],
558                                                    testcase_params['channel'],
559                                                    testcase_params['mode'])
560        self.log.info('Access Point Configuration: {}'.format(
561            self.access_point.ap_settings))
562
563    def setup_dut(self, testcase_params):
564        """Sets up the DUT in the configuration required by the test.
565
566        Args:
567            testcase_params: dict containing AP and other test params
568        """
569        # Turn screen off to preserve battery
570        if self.testbed_params.get('screen_on',
571                                   False) or self.testclass_params.get(
572                                       'screen_on', False):
573            self.sta_dut.droid.wakeLockAcquireDim()
574        else:
575            self.sta_dut.go_to_sleep()
576        # Enable Tune Code
577        band = self.access_point.band_lookup_by_channel(testcase_params['channel'])
578        if 'tune_code' in self.testbed_params:
579            if int(self.testbed_params['tune_code']['manual_tune_code']):
580                self.log.info('Tune Code forcing enabled in config file')
581                wputils.write_antenna_tune_code(self.sta_dut, self.testbed_params['tune_code'][band])
582        if (wputils.validate_network(self.sta_dut,
583                                     testcase_params['test_network']['SSID'])
584                and not self.testclass_params.get('force_reconnect', 0)):
585            self.log.info('Already connected to desired network')
586        else:
587            wutils.wifi_toggle_state(self.sta_dut, False)
588            wutils.set_wifi_country_code(self.sta_dut,
589                                         self.testclass_params['country_code'])
590            wutils.wifi_toggle_state(self.sta_dut, True)
591            wutils.reset_wifi(self.sta_dut)
592            if self.testbed_params.get('txbf_off', False):
593                wputils.disable_beamforming(self.sta_dut)
594            wutils.set_wifi_country_code(self.sta_dut,
595                                         self.testclass_params['country_code'])
596            if self.testbed_params['sniffer_enable']:
597                self.sniffer.start_capture(
598                    network={'SSID': testcase_params['test_network']['SSID']},
599                    chan=testcase_params['channel'],
600                    bw=testcase_params['bandwidth'],
601                    duration=180)
602            try:
603                wutils.wifi_connect(self.sta_dut,
604                                    testcase_params['test_network'],
605                                    num_of_tries=5,
606                                    check_connectivity=True)
607                if self.testclass_params.get('num_streams', 2) == 1:
608                    wputils.set_nss_capability(self.sta_dut, 1)
609            finally:
610                if self.testbed_params['sniffer_enable']:
611                    self.sniffer.stop_capture(tag='connection_setup')
612
613    def setup_rvr_test(self, testcase_params):
614        """Function that gets devices ready for the test.
615
616        Args:
617            testcase_params: dict containing test-specific parameters
618        """
619        # Configure AP
620        self.setup_ap(testcase_params)
621        # Set attenuator to 0 dB
622        for attenuator in self.attenuators:
623            attenuator.set_atten(0, strict=False, retry=True)
624        # Reset, configure, and connect DUT
625        self.setup_dut(testcase_params)
626        # Wait before running the first wifi test
627        first_test_delay = self.testclass_params.get('first_test_delay', 600)
628        if first_test_delay > 0 and len(self.testclass_results) == 0:
629            self.log.info('Waiting before the first RvR test.')
630            time.sleep(first_test_delay)
631            self.setup_dut(testcase_params)
632        # Get iperf_server address
633        sta_dut_ip = self.sta_dut.droid.connectivityGetIPv4Addresses(
634            'wlan0')[0]
635        if isinstance(self.iperf_server, ipf.IPerfServerOverAdb):
636            testcase_params['iperf_server_address'] = sta_dut_ip
637        else:
638            if self.testbed_params.get('lan_traffic_only', True):
639                testcase_params[
640                    'iperf_server_address'] = wputils.get_server_address(
641                        self.remote_server, sta_dut_ip, '255.255.255.0')
642            else:
643                testcase_params[
644                    'iperf_server_address'] = wputils.get_server_address(
645                        self.remote_server, sta_dut_ip, 'public')
646        # Set DUT to monitor RSSI and LLStats on
647        self.monitored_dut = self.sta_dut
648        self.monitored_interface = 'wlan0'
649
650    def compile_test_params(self, testcase_params):
651        """Function that completes all test params based on the test name.
652
653        Args:
654            testcase_params: dict containing test-specific parameters
655        """
656        # Check if test should be skipped based on parameters.
657        wputils.check_skip_conditions(testcase_params, self.sta_dut,
658                                      self.access_point,
659                                      getattr(self, 'ota_chamber', None))
660
661        band = wputils.CHANNEL_TO_BAND_MAP[testcase_params['channel']]
662        start_atten = self.testclass_params['atten_start'].get(band, 0)
663        num_atten_steps = int(
664            (self.testclass_params['atten_stop'] - start_atten) /
665            self.testclass_params['atten_step'])
666        testcase_params['atten_range'] = [
667            start_atten + x * self.testclass_params['atten_step']
668            for x in range(0, num_atten_steps)
669        ]
670        band = self.access_point.band_lookup_by_channel(
671            testcase_params['channel'])
672        testcase_params['band'] = band
673        testcase_params['test_network'] = self.main_network[band]
674        if testcase_params['traffic_type'] == 'TCP':
675            testcase_params['iperf_socket_size'] = self.testclass_params.get(
676                'tcp_socket_size', None)
677            testcase_params['iperf_processes'] = self.testclass_params.get(
678                'tcp_processes', 1)
679        elif testcase_params['traffic_type'] == 'UDP':
680            testcase_params['iperf_socket_size'] = self.testclass_params.get(
681                'udp_socket_size', None)
682            testcase_params['iperf_processes'] = self.testclass_params.get(
683                'udp_processes', 1)
684        if (testcase_params['traffic_direction'] == 'DL'
685                and not isinstance(self.iperf_server, ipf.IPerfServerOverAdb)
686            ) or (testcase_params['traffic_direction'] == 'UL'
687                  and isinstance(self.iperf_server, ipf.IPerfServerOverAdb)):
688            testcase_params['iperf_args'] = wputils.get_iperf_arg_string(
689                duration=self.testclass_params['iperf_duration'],
690                reverse_direction=1,
691                traffic_type=testcase_params['traffic_type'],
692                socket_size=testcase_params['iperf_socket_size'],
693                num_processes=testcase_params['iperf_processes'],
694                udp_throughput=self.testclass_params['UDP_rates'][
695                    testcase_params['mode']])
696            testcase_params['use_client_output'] = True
697        else:
698            testcase_params['iperf_args'] = wputils.get_iperf_arg_string(
699                duration=self.testclass_params['iperf_duration'],
700                reverse_direction=0,
701                traffic_type=testcase_params['traffic_type'],
702                socket_size=testcase_params['iperf_socket_size'],
703                num_processes=testcase_params['iperf_processes'],
704                udp_throughput=self.testclass_params['UDP_rates'][
705                    testcase_params['mode']])
706            testcase_params['use_client_output'] = False
707        return testcase_params
708
709    def _test_rvr(self, testcase_params):
710        """ Function that gets called for each test case
711
712        Args:
713            testcase_params: dict containing test-specific parameters
714        """
715        # Compile test parameters from config and test name
716        testcase_params = self.compile_test_params(testcase_params)
717
718        # Prepare devices and run test
719        self.setup_rvr_test(testcase_params)
720        rvr_result = self.run_rvr_test(testcase_params)
721
722        # Post-process results
723        self.testclass_results.append(rvr_result)
724        self.process_test_results(rvr_result)
725        self.pass_fail_check(rvr_result)
726
727    def generate_test_cases(self, channels, modes, traffic_types,
728                            traffic_directions):
729        """Function that auto-generates test cases for a test class."""
730        test_cases = []
731        allowed_configs = {
732            20: [
733                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
734                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
735            ],
736            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
737            80: [36, 100, 149, '6g37', '6g117', '6g213'],
738            160: [36, '6g37', '6g117', '6g213']
739        }
740
741        for channel, mode, traffic_type, traffic_direction in itertools.product(
742                channels, modes, traffic_types, traffic_directions):
743            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
744            if channel not in allowed_configs[bandwidth]:
745                continue
746            test_name = 'test_rvr_{}_{}_ch{}_{}'.format(
747                traffic_type, traffic_direction, channel, mode)
748            test_params = collections.OrderedDict(
749                channel=channel,
750                mode=mode,
751                bandwidth=bandwidth,
752                traffic_type=traffic_type,
753                traffic_direction=traffic_direction)
754            setattr(self, test_name, partial(self._test_rvr, test_params))
755            test_cases.append(test_name)
756        return test_cases
757
758
class WifiRvr_TCP_Test(WifiRvrTest):
    """RvR test class generating TCP DL and UL tests in the 'bw' modes.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        test_modes = ['bw20', 'bw40', 'bw80', 'bw160']
        self.tests = self.generate_test_cases(
            channels=test_channels,
            modes=test_modes,
            traffic_types=['TCP'],
            traffic_directions=['DL', 'UL'])
771
772
class WifiRvr_VHT_TCP_Test(WifiRvrTest):
    """RvR test class generating TCP DL and UL tests in the VHT modes.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        test_modes = ['VHT20', 'VHT40', 'VHT80']
        self.tests = self.generate_test_cases(
            channels=test_channels,
            modes=test_modes,
            traffic_types=['TCP'],
            traffic_directions=['DL', 'UL'])
782
783
class WifiRvr_HE_TCP_Test(WifiRvrTest):
    """RvR test class generating TCP DL and UL tests in the HE modes.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        test_modes = ['HE20', 'HE40', 'HE80', 'HE160']
        self.tests = self.generate_test_cases(
            channels=test_channels,
            modes=test_modes,
            traffic_types=['TCP'],
            traffic_directions=['DL', 'UL'])
796
797
class WifiRvr_SampleUDP_Test(WifiRvrTest):
    """RvR test class generating UDP DL and UL tests on a sample of channels.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        sample_channels = [6, 36, 149, '6g37']
        test_modes = ['bw20', 'bw40', 'bw80', 'bw160']
        self.tests = self.generate_test_cases(
            channels=sample_channels,
            modes=test_modes,
            traffic_types=['UDP'],
            traffic_directions=['DL', 'UL'])
807
808
class WifiRvr_VHT_SampleUDP_Test(WifiRvrTest):
    """RvR test class generating VHT UDP DL and UL tests on sample channels.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        sample_channels = [6, 36, 149]
        test_modes = ['VHT20', 'VHT40', 'VHT80', 'VHT160']
        self.tests = self.generate_test_cases(
            channels=sample_channels,
            modes=test_modes,
            traffic_types=['UDP'],
            traffic_directions=['DL', 'UL'])
818
819
class WifiRvr_HE_SampleUDP_Test(WifiRvrTest):
    """RvR test class generating HE UDP DL and UL tests on sample channels.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        # '6g37' is a channel identifier, so it is listed with the channels.
        # Passed as a mode, its digits would be parsed as bandwidth 637,
        # which generate_test_cases has no entry for (KeyError).
        self.tests = self.generate_test_cases(
            channels=[6, 36, 149, '6g37'],
            modes=['HE20', 'HE40', 'HE80', 'HE160'],
            traffic_types=['UDP'],
            traffic_directions=['DL', 'UL'])
829
830
class WifiRvr_SampleDFS_Test(WifiRvrTest):
    """RvR test class generating TCP DL and UL tests on channels 64 to 140.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        dfs_channels = [64, 100, 116, 132, 140]
        test_modes = ['bw20', 'bw40', 'bw80']
        self.tests = self.generate_test_cases(
            channels=dfs_channels,
            modes=test_modes,
            traffic_types=['TCP'],
            traffic_directions=['DL', 'UL'])
840
841
class WifiRvr_SingleChain_TCP_Test(WifiRvrTest):
    """RvR test class generating TCP tests for each chain setting.

    Every test carries a 'chain' parameter that is applied to the DUT through
    wputils.set_chain_mask before the regular DUT setup runs.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        test_channels = [
            1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
            '6g213'
        ]
        test_modes = ['bw20', 'bw40', 'bw80', 'bw160']
        self.tests = self.generate_test_cases(
            channels=test_channels,
            modes=test_modes,
            traffic_types=['TCP'],
            traffic_directions=['DL', 'UL'],
            chains=[0, 1, '2x2'])

    def setup_dut(self, testcase_params):
        """Applies the chain mask of the test, then runs the base DUT setup.

        Args:
            testcase_params: dict containing test-specific parameters; the
                'chain' entry is passed to wputils.set_chain_mask
        """
        dut = self.android_devices[0]
        self.sta_dut = dut
        wputils.set_chain_mask(dut, testcase_params['chain'])
        WifiRvrTest.setup_dut(self, testcase_params)

    def generate_test_cases(self, channels, modes, traffic_types,
                            traffic_directions, chains):
        """Builds the RvR test cases for every accepted parameter combination.

        Each generated test is attached to this instance as an attribute
        named after its parameters and bound to _test_rvr with those
        parameters.

        Args:
            channels: list of channels to generate tests for
            modes: list of mode strings; the digits found in a mode are read
                as its bandwidth
            traffic_types: list of traffic types to generate tests for
            traffic_directions: list of traffic directions to generate tests
                for
            chains: list of chain settings to generate tests for
        Returns:
            List with the names of the generated test cases.
        """
        # Channels accepted for each bandwidth; any other channel/bandwidth
        # pairing is skipped.
        channels_per_bandwidth = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
            ],
            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
            80: [36, 100, 149, '6g37', '6g117', '6g213'],
            160: [36, '6g37', '6g117', '6g213']
        }

        generated_names = []
        combinations = itertools.product(channels, modes, chains,
                                         traffic_types, traffic_directions)
        for (channel, mode, chain, traffic_type,
             traffic_direction) in combinations:
            bandwidth = int(''.join(filter(str.isdigit, mode)))
            if channel in channels_per_bandwidth[bandwidth]:
                name = 'test_rvr_{}_{}_ch{}_{}_ch{}'.format(
                    traffic_type, traffic_direction, channel, mode, chain)
                params = collections.OrderedDict(
                    channel=channel,
                    mode=mode,
                    bandwidth=bandwidth,
                    traffic_type=traffic_type,
                    traffic_direction=traffic_direction,
                    chain=chain)
                setattr(self, name, partial(self._test_rvr, params))
                generated_names.append(name)
        return generated_names
892
893
# Over-the-air version of RvR tests
class WifiOtaRvrTest(WifiRvrTest):
    """Class to test over-the-air RvR.

    This class implements WiFi RvR tests in an OTA chamber. It enables
    setting turntable orientation and other chamber parameters to study
    performance in varying channel conditions.
    """

    def __init__(self, controllers):
        # NOTE(review): BaseTestClass.__init__ is called directly, so
        # WifiRvrTest.__init__ does not run and the metric loggers are
        # created here. Presumably intentional -- confirm before changing.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the WifiRvrTest class setup, then creates the OTA chamber."""
        WifiRvrTest.setup_class(self)
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Runs the WifiRvrTest class teardown, then resets the chamber."""
        WifiRvrTest.teardown_class(self)
        self.ota_chamber.reset_chamber()

    def extract_test_id(self, testcase_params, id_fields):
        """Builds an ordered test id from selected test parameters.

        Args:
            testcase_params: dict containing test-specific parameters
            id_fields: list of parameter names that make up the id
        Returns:
            OrderedDict mapping each field in id_fields to its value in
            testcase_params, or to None when the parameter is absent.
        """
        test_id = collections.OrderedDict(
            (param, testcase_params.get(param, None)) for param in id_fields)
        return test_id

    def process_testclass_results(self):
        """Saves plot with all test results to enable comparison.

        Results sharing channel, mode, traffic type, traffic direction and
        chain are grouped under one test id. Each test id gets a throughput
        figure and a PHY rate figure holding the individual results plus
        their average (and median throughput). Averaged metrics are added to
        the test class metric logger and all figures are saved to
        results.html in the log path.
        """
        name_prefix = 'test_rvr_'
        phy_suffix = ('PHY', )
        # Plot individual test id results raw data and compile metrics
        plots = collections.OrderedDict()
        compiled_data = collections.OrderedDict()
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = tuple(
                self.extract_test_id(testcase_params, [
                    'channel', 'mode', 'traffic_type', 'traffic_direction',
                    'chain'
                ]).items())
            # The PHY figure key must be derived for every result. Setting it
            # only when a new test id shows up would leave it pointing at the
            # most recently created figure, which may belong to another id.
            test_id_phy = test_id + phy_suffix
            if test_id not in plots:
                # Initialize test id data when not present
                compiled_data[test_id] = {
                    'throughput': [],
                    'rx_phy_rate': [],
                    'tx_phy_rate': [],
                    'metrics': {key: []
                                for key in result['metrics']}
                }
                title_fields = (testcase_params['channel'],
                                testcase_params['mode'],
                                testcase_params['traffic_type'],
                                testcase_params['traffic_direction'])
                plots[test_id] = BokehFigure(
                    title='Channel {} {} ({} {})'.format(*title_fields),
                    x_label='Attenuation (dB)',
                    primary_y_label='Throughput (Mbps)')
                plots[test_id_phy] = BokehFigure(
                    title='Channel {} {} ({} {}) (PHY Rate)'.format(
                        *title_fields),
                    x_label='Attenuation (dB)',
                    primary_y_label='PHY Rate (Mbps)')
            # Compile test id data and metrics
            test_data = compiled_data[test_id]
            test_data['throughput'].append(result['throughput_receive'])
            test_data['rx_phy_rate'].append(result['rx_phy_rate'])
            test_data['tx_phy_rate'].append(result['tx_phy_rate'])
            test_data['total_attenuation'] = result['total_attenuation']
            for metric_key, metric_value in result['metrics'].items():
                test_data['metrics'][metric_key].append(metric_value)
            # Legend label is the test name without its common prefix.
            # str.strip() is not used because it removes a set of characters
            # from both ends rather than a prefix.
            legend_label = result['test_name']
            if legend_label.startswith(name_prefix):
                legend_label = legend_label[len(name_prefix):]
            # Add test id to plots
            plots[test_id].add_line(result['total_attenuation'],
                                    result['throughput_receive'],
                                    legend_label,
                                    hover_text=result['hover_text'],
                                    width=1,
                                    style='dashed',
                                    marker='circle')
            plots[test_id_phy].add_line(result['total_attenuation'],
                                        result['rx_phy_rate'],
                                        legend_label + ' Rx PHY Rate',
                                        hover_text=result['hover_text'],
                                        width=1,
                                        style='dashed',
                                        marker='inverted_triangle')
            plots[test_id_phy].add_line(result['total_attenuation'],
                                        result['tx_phy_rate'],
                                        legend_label + ' Tx PHY Rate',
                                        hover_text=result['hover_text'],
                                        width=1,
                                        style='dashed',
                                        marker='triangle')

        # Compute average RvRs and compute metrics over orientations
        for test_id, test_data in compiled_data.items():
            test_id_dict = dict(test_id)
            metric_tag = '{}_{}_ch{}_{}'.format(
                test_id_dict['traffic_type'],
                test_id_dict['traffic_direction'], test_id_dict['channel'],
                test_id_dict['mode'])
            # Fraction of results whose high_tput_range is not -1.
            high_tput_hit_freq = numpy.mean(
                numpy.not_equal(test_data['metrics']['high_tput_range'], -1))
            self.testclass_metric_logger.add_metric(
                '{}.high_tput_hit_freq'.format(metric_tag), high_tput_hit_freq)
            for metric_key, metric_values in test_data['metrics'].items():
                self.testclass_metric_logger.add_metric(
                    '{}.avg_{}'.format(metric_tag, metric_key),
                    numpy.mean(metric_values))
            test_data['avg_rvr'] = numpy.mean(test_data['throughput'], 0)
            test_data['median_rvr'] = numpy.median(test_data['throughput'], 0)
            test_data['avg_rx_phy_rate'] = numpy.mean(test_data['rx_phy_rate'],
                                                      0)
            test_data['avg_tx_phy_rate'] = numpy.mean(test_data['tx_phy_rate'],
                                                      0)
            plots[test_id].add_line(test_data['total_attenuation'],
                                    test_data['avg_rvr'],
                                    legend='Average Throughput',
                                    marker='circle')
            plots[test_id].add_line(test_data['total_attenuation'],
                                    test_data['median_rvr'],
                                    legend='Median Throughput',
                                    marker='square')
            test_id_phy = test_id + phy_suffix
            plots[test_id_phy].add_line(test_data['total_attenuation'],
                                        test_data['avg_rx_phy_rate'],
                                        legend='Average Rx Rate',
                                        marker='inverted_triangle')
            plots[test_id_phy].add_line(test_data['total_attenuation'],
                                        test_data['avg_tx_phy_rate'],
                                        legend='Average Tx Rate',
                                        marker='triangle')

        figure_list = []
        for plot in plots.values():
            plot.generate_figure()
            figure_list.append(plot)
        output_file_path = os.path.join(self.log_path, 'results.html')
        BokehFigure.save_figures(figure_list, output_file_path)

    def setup_rvr_test(self, testcase_params):
        """Runs the WifiRvrTest test setup, then sets the chamber orientation.

        Args:
            testcase_params: dict containing test-specific parameters; the
                'orientation' entry is passed to the OTA chamber
        """
        # Continue test setup
        WifiRvrTest.setup_rvr_test(self, testcase_params)
        # Set turntable orientation
        self.ota_chamber.set_orientation(testcase_params['orientation'])

    def generate_test_cases(self, channels, modes, angles, traffic_types,
                            directions):
        """Builds the OTA RvR test cases for every accepted combination.

        Each generated test is attached to this instance as an attribute
        named after its parameters and bound to _test_rvr with those
        parameters.

        Args:
            channels: list of channels to generate tests for
            modes: list of mode strings; the digits found in a mode are read
                as its bandwidth
            angles: list of orientations, stored as the 'orientation'
                parameter of each test
            traffic_types: list of traffic types to generate tests for
            directions: list of traffic directions to generate tests for
        Returns:
            List with the names of the generated test cases.
        """
        test_cases = []
        # NOTE(review): the '6g' channels are only accepted at 160 here, so
        # they are skipped for every other bandwidth. Confirm this is
        # intended.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161
            ],
            40: [36, 44, 100, 149, 157],
            80: [36, 100, 149],
            160: [36, '6g37', '6g117', '6g213']
        }
        for channel, mode, angle, traffic_type, direction in itertools.product(
                channels, modes, angles, traffic_types, directions):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = 'test_rvr_{}_{}_ch{}_{}_{}deg'.format(
                traffic_type, direction, channel, mode, angle)
            test_params = collections.OrderedDict(channel=channel,
                                                  mode=mode,
                                                  bandwidth=bandwidth,
                                                  traffic_type=traffic_type,
                                                  traffic_direction=direction,
                                                  orientation=angle)
            setattr(self, testcase_name, partial(self._test_rvr, test_params))
            test_cases.append(testcase_name)
        return test_cases
1081
1082
class WifiOtaRvr_StandardOrientation_Test(WifiOtaRvrTest):
    """OTA RvR test class generating TCP DL and UL tests every 45 degrees.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        WifiOtaRvrTest.__init__(self, controllers)
        test_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37']
        test_modes = ['bw20', 'bw40', 'bw80', 'bw160']
        orientations = list(range(0, 360, 45))
        self.tests = self.generate_test_cases(test_channels, test_modes,
                                              orientations, ['TCP'],
                                              ['DL', 'UL'])
1091
1092
class WifiOtaRvr_SampleChannel_Test(WifiOtaRvrTest):
    """OTA RvR test class generating TCP DL tests on a sample of channels.

    Tests are generated every 45 degrees; channel and mode combinations that
    generate_test_cases does not accept are skipped.
    """

    def __init__(self, controllers):
        WifiOtaRvrTest.__init__(self, controllers)
        # (channels, modes) pairs, generated in this order.
        sweeps = (
            ([6], ['bw20']),
            ([36, 149], ['bw80', 'bw160']),
            (['6g37'], ['bw160']),
        )
        tests = []
        for sweep_channels, sweep_modes in sweeps:
            tests.extend(
                self.generate_test_cases(sweep_channels, sweep_modes,
                                         list(range(0, 360, 45)), ['TCP'],
                                         ['DL']))
        self.tests = tests
1106
class WifiOtaRvr_SampleChannel_UDP_Test(WifiOtaRvrTest):
    """OTA RvR test class generating UDP DL/UL tests on a sample of channels.

    Tests are generated every 45 degrees; channel and mode combinations that
    generate_test_cases does not accept are skipped.
    """

    def __init__(self, controllers):
        WifiOtaRvrTest.__init__(self, controllers)
        # (channels, modes) pairs, generated in this order.
        sweeps = (
            ([6], ['bw20']),
            ([36, 149], ['bw80', 'bw160']),
            (['6g37'], ['bw160']),
        )
        tests = []
        for sweep_channels, sweep_modes in sweeps:
            tests.extend(
                self.generate_test_cases(sweep_channels, sweep_modes,
                                         list(range(0, 360, 45)), ['UDP'],
                                         ['DL', 'UL']))
        self.tests = tests
1120
class WifiOtaRvr_SingleOrientation_Test(WifiOtaRvrTest):
    """OTA RvR test class generating TCP DL and UL tests at orientation 0.

    Channel and mode combinations that generate_test_cases does not accept
    are skipped.
    """

    def __init__(self, controllers):
        WifiOtaRvrTest.__init__(self, controllers)
        test_channels = [6, 36, 40, 44, 48, 149, 153, 157, 161, '6g37']
        test_modes = ['bw20', 'bw40', 'bw80', 'bw160']
        self.tests = self.generate_test_cases(test_channels, test_modes, [0],
                                              ['TCP'], ['DL', 'UL'])
1128
1129
class WifiOtaRvr_SingleChain_Test(WifiOtaRvrTest):
    """OTA RvR test class generating TCP tests for each chain setting.

    Every test carries a 'chain' parameter that is applied to the DUT through
    wputils.set_chain_mask before the regular DUT setup runs. Tests are
    generated every 45 degrees.
    """

    def __init__(self, controllers):
        WifiOtaRvrTest.__init__(self, controllers)
        # (channels, modes, chains) triples, generated in this order.
        sweeps = (
            ([6], ['bw20'], [0, 1]),
            ([36, 149], ['bw20', 'bw80', 'bw160'], [0, 1, '2x2']),
            (['6g37'], ['bw20', 'bw80', 'bw160'], [0, 1, '2x2']),
        )
        tests = []
        for sweep_channels, sweep_modes, sweep_chains in sweeps:
            tests.extend(
                self.generate_test_cases(sweep_channels, sweep_modes,
                                         list(range(0, 360, 45)), ['TCP'],
                                         ['DL', 'UL'], sweep_chains))
        self.tests = tests

    def setup_dut(self, testcase_params):
        """Applies the chain mask of the test, then runs the base DUT setup.

        Args:
            testcase_params: dict containing test-specific parameters; the
                'chain' entry is passed to wputils.set_chain_mask
        """
        dut = self.android_devices[0]
        self.sta_dut = dut
        wputils.set_chain_mask(dut, testcase_params['chain'])
        WifiRvrTest.setup_dut(self, testcase_params)

    def generate_test_cases(self, channels, modes, angles, traffic_types,
                            directions, chains):
        """Builds the OTA RvR test cases for every accepted combination.

        Each generated test is attached to this instance as an attribute
        named after its parameters and bound to _test_rvr with those
        parameters.

        Args:
            channels: list of channels to generate tests for
            modes: list of mode strings; the digits found in a mode are read
                as its bandwidth
            angles: list of orientations, stored as the 'orientation'
                parameter of each test
            traffic_types: list of traffic types to generate tests for
            directions: list of traffic directions to generate tests for
            chains: list of chain settings to generate tests for
        Returns:
            List with the names of the generated test cases.
        """
        # Channels accepted for each bandwidth; any other channel/bandwidth
        # pairing is skipped.
        channels_per_bandwidth = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161
            ],
            40: [36, 44, 100, 149, 157],
            80: [36, 100, 149],
            160: [36, '6g37', '6g117', '6g213']
        }
        generated_names = []
        combinations = itertools.product(channels, modes, chains, angles,
                                         traffic_types, directions)
        for (channel, mode, chain, angle, traffic_type,
             direction) in combinations:
            bandwidth = int(''.join(filter(str.isdigit, mode)))
            if channel in channels_per_bandwidth[bandwidth]:
                name = 'test_rvr_{}_{}_ch{}_{}_ch{}_{}deg'.format(
                    traffic_type, direction, channel, mode, chain, angle)
                params = collections.OrderedDict(channel=channel,
                                                 mode=mode,
                                                 bandwidth=bandwidth,
                                                 chain=chain,
                                                 traffic_type=traffic_type,
                                                 traffic_direction=direction,
                                                 orientation=angle)
                setattr(self, name, partial(self._test_rvr, params))
                generated_names.append(name)
        return generated_names
1180