1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19import json
20import logging
21import os
22import statistics
23from acts import asserts
24from acts import context
25from acts import base_test
26from acts import utils
27from acts.controllers.utils_lib import ssh
28from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
29from acts_contrib.test_utils.wifi import ota_chamber
30from acts_contrib.test_utils.wifi import ota_sniffer
31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
32from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
33from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
34from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
35from functools import partial
36
37
38class WifiPingTest(base_test.BaseTestClass):
39    """Class for ping-based Wifi performance tests.
40
41    This class implements WiFi ping performance tests such as range and RTT.
    The class sets up the AP in the desired configurations, configures
    and connects the phone to the AP, and runs ping tests. For an example
    config file to run this test class see
    example_connectivity_performance_ap_sta.json.
45    """
46
    # Extra time added to the sniffer capture duration on top of the total
    # ping time of a sweep (presumably seconds -- TODO confirm).
    TEST_TIMEOUT = 10
    # Interval between RSSI polls handed to wputils.get_connected_rssi_nb.
    RSSI_POLL_INTERVAL = 0.2
    # SHORT_SLEEP and MED_SLEEP are not referenced in the visible part of
    # this file.
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    # Number of consecutive attenuation points with 100% ping loss (or a
    # disconnected DUT) after which run_ping_test stops the sweep.
    MAX_CONSECUTIVE_ZEROS = 5
    # Placeholder ping result used by run_ping_test to pad the attenuation
    # points that are skipped once a sweep stops early.
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }
59
60    def __init__(self, controllers):
61        base_test.BaseTestClass.__init__(self, controllers)
62        self.testcase_metric_logger = (
63            BlackboxMappedMetricLogger.for_test_case())
64        self.testclass_metric_logger = (
65            BlackboxMappedMetricLogger.for_test_class())
66        self.publish_testcase_metrics = True
67
68    def setup_class(self):
69        self.dut = self.android_devices[-1]
70        req_params = [
71            'ping_test_params', 'testbed_params', 'main_network',
72            'RetailAccessPoints', 'RemoteServer'
73        ]
74        opt_params = ['OTASniffer']
75        self.unpack_userparams(req_params, opt_params)
76        self.testclass_params = self.ping_test_params
77        self.num_atten = self.attenuators[0].instrument.num_atten
78        self.ping_server = ssh.connection.SshConnection(
79            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
80        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
81        if hasattr(self,
82                   'OTASniffer') and self.testbed_params['sniffer_enable']:
83            try:
84                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
85            except:
86                self.log.warning('Could not start sniffer. Disabling sniffs.')
87                self.testbed_params['sniffer_enable'] = 0
88        self.log.info('Access Point Configuration: {}'.format(
89            self.access_point.ap_settings))
90        self.log_path = os.path.join(logging.log_path, 'results')
91        os.makedirs(self.log_path, exist_ok=True)
92        self.atten_dut_chain_map = {}
93        self.testclass_results = []
94
95        # Turn WiFi ON
96        if self.testclass_params.get('airplane_mode', 1):
97            self.log.info('Turning on airplane mode.')
98            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
99                                'Can not turn on airplane mode.')
100        wutils.wifi_toggle_state(self.dut, True)
101
102        # Configure test retries
103        self.user_params['retry_tests'] = [self.__class__.__name__]
104
105    def teardown_class(self):
106        for attenuator in self.attenuators:
107            attenuator.set_atten(0, strict=False, retry=True)
108        # Turn WiFi OFF and reset AP
109        self.access_point.teardown()
110        for dev in self.android_devices:
111            wutils.wifi_toggle_state(dev, False)
112            dev.go_to_sleep()
113        self.process_testclass_results()
114
    def setup_test(self):
        """Resets retry_flag to False; on_retry sets it to True on retries."""
        self.retry_flag = False
117
    def teardown_test(self):
        """Resets retry_flag to False once a test case has finished."""
        self.retry_flag = False
120
121    def on_retry(self):
122        """Function to control test logic on retried tests.
123
124        This function is automatically executed on tests that are being
125        retried. In this case the function resets wifi, toggles it off and on
126        and sets a retry_flag to enable further tweaking the test logic on
127        second attempts.
128        """
129        self.retry_flag = True
130        for dev in self.android_devices:
131            wutils.reset_wifi(dev)
132            wutils.toggle_wifi_off_and_on(dev)
133
134    def process_testclass_results(self):
135        """Saves all test results to enable comparison."""
136        testclass_summary = {}
137        for test in self.testclass_results:
138            if 'range' in test['test_name']:
139                testclass_summary[test['test_name']] = test['range']
140        # Save results
141        results_file_path = os.path.join(self.log_path,
142                                         'testclass_summary.json')
143        with open(results_file_path, 'w') as results_file:
144            json.dump(wputils.serialize_dict(testclass_summary),
145                      results_file,
146                      indent=4)
147
148    def pass_fail_check_ping_rtt(self, result):
149        """Check the test result and decide if it passed or failed.
150
151        The function computes RTT statistics and fails any tests in which the
152        tail of the ping latency results exceeds the threshold defined in the
153        configuration file.
154
155        Args:
156            result: dict containing ping results and other meta data
157        """
158        ignored_fraction = (self.testclass_params['rtt_ignored_interval'] /
159                            self.testclass_params['rtt_ping_duration'])
160        sorted_rtt = [
161            sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
162            for x in result['ping_results']
163        ]
164        disconnected = any([len(x) == 0 for x in sorted_rtt])
165        if disconnected:
166            asserts.fail('Test failed. DUT disconnected at least once.')
167
168        rtt_at_test_percentile = [
169            x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
170                  len(x))] for x in sorted_rtt
171        ]
172        # Set blackbox metric
173        if self.publish_testcase_metrics:
174            self.testcase_metric_logger.add_metric('ping_rtt',
175                                                   max(rtt_at_test_percentile))
176        # Evaluate test pass/fail
177        rtt_failed = any([
178            rtt > self.testclass_params['rtt_threshold'] * 1000
179            for rtt in rtt_at_test_percentile
180        ])
181        if rtt_failed:
182            #TODO: figure out how to cleanly exclude RTT tests from retry
183            asserts.explicit_pass(
184                'Test failed. RTTs at test percentile = {}'.format(
185                    rtt_at_test_percentile))
186        else:
187            asserts.explicit_pass(
188                'Test Passed. RTTs at test percentile = {}'.format(
189                    rtt_at_test_percentile))
190
191    def pass_fail_check_ping_range(self, result):
192        """Check the test result and decide if it passed or failed.
193
194        Checks whether the attenuation at which ping packet losses begin to
195        exceed the threshold matches the range derived from golden
196        rate-vs-range result files. The test fails is ping range is
197        range_gap_threshold worse than RvR range.
198
199        Args:
200            result: dict containing ping results and meta data
201        """
202        # Evaluate test pass/fail
203        test_message = ('Attenuation at range is {}dB. '
204                        'LLStats at Range: {}'.format(
205                            result['range'], result['llstats_at_range']))
206        if result['peak_throughput_pct'] < 95:
207            asserts.fail('(RESULT NOT RELIABLE) {}'.format(test_message))
208
209        # If pass, set Blackbox metric
210        if self.publish_testcase_metrics:
211            self.testcase_metric_logger.add_metric('ping_range',
212                                                   result['range'])
213        asserts.explicit_pass(test_message)
214
215    def pass_fail_check(self, result):
216        if 'range' in result['testcase_params']['test_type']:
217            self.pass_fail_check_ping_range(result)
218        else:
219            self.pass_fail_check_ping_rtt(result)
220
221    def process_ping_results(self, testcase_params, ping_range_result):
222        """Saves and plots ping results.
223
224        Args:
225            ping_range_result: dict containing ping results and metadata
226        """
227        # Compute range
228        ping_loss_over_att = [
229            x['packet_loss_percentage']
230            for x in ping_range_result['ping_results']
231        ]
232        ping_loss_above_threshold = [
233            x > self.testclass_params['range_ping_loss_threshold']
234            for x in ping_loss_over_att
235        ]
236        for idx in range(len(ping_loss_above_threshold)):
237            if all(ping_loss_above_threshold[idx:]):
238                range_index = max(idx, 1) - 1
239                break
240        else:
241            range_index = -1
242        ping_range_result['atten_at_range'] = testcase_params['atten_range'][
243            range_index]
244        ping_range_result['peak_throughput_pct'] = 100 - min(
245            ping_loss_over_att)
246        ping_range_result['total_attenuation'] = [
247            ping_range_result['fixed_attenuation'] + att
248            for att in testcase_params['atten_range']
249        ]
250        ping_range_result['range'] = (ping_range_result['atten_at_range'] +
251                                      ping_range_result['fixed_attenuation'])
252        ping_range_result['llstats_at_range'] = (
253            'TX MCS = {0} ({1:.1f}%). '
254            'RX MCS = {2} ({3:.1f}%)'.format(
255                ping_range_result['llstats'][range_index]['summary']
256                ['common_tx_mcs'], ping_range_result['llstats'][range_index]
257                ['summary']['common_tx_mcs_freq'] * 100,
258                ping_range_result['llstats'][range_index]['summary']
259                ['common_rx_mcs'], ping_range_result['llstats'][range_index]
260                ['summary']['common_rx_mcs_freq'] * 100))
261
262        # Save results
263        results_file_path = os.path.join(
264            self.log_path, '{}.json'.format(self.current_test_name))
265        with open(results_file_path, 'w') as results_file:
266            json.dump(wputils.serialize_dict(ping_range_result),
267                      results_file,
268                      indent=4)
269
270        # Plot results
271        if 'rtt' in self.current_test_name:
272            figure = BokehFigure(self.current_test_name,
273                                 x_label='Timestamp (s)',
274                                 primary_y_label='Round Trip Time (ms)')
275            for idx, result in enumerate(ping_range_result['ping_results']):
276                if len(result['rtt']) > 1:
277                    x_data = [
278                        t - result['time_stamp'][0]
279                        for t in result['time_stamp']
280                    ]
281                    figure.add_line(
282                        x_data, result['rtt'], 'RTT @ {}dB'.format(
283                            ping_range_result['attenuation'][idx]))
284
285            output_file_path = os.path.join(
286                self.log_path, '{}.html'.format(self.current_test_name))
287            figure.generate_figure(output_file_path)
288
289    def run_ping_test(self, testcase_params):
290        """Main function to test ping.
291
292        The function sets up the AP in the correct channel and mode
293        configuration and calls get_ping_stats while sweeping attenuation
294
295        Args:
296            testcase_params: dict containing all test parameters
297        Returns:
298            test_result: dict containing ping results and other meta data
299        """
300        # Prepare results dict
301        llstats_obj = wputils.LinkLayerStats(
302            self.dut, self.testclass_params.get('llstats_enabled', True))
303        test_result = collections.OrderedDict()
304        test_result['testcase_params'] = testcase_params.copy()
305        test_result['test_name'] = self.current_test_name
306        test_result['ap_config'] = self.access_point.ap_settings.copy()
307        test_result['attenuation'] = testcase_params['atten_range']
308        test_result['fixed_attenuation'] = self.testbed_params[
309            'fixed_attenuation'][str(testcase_params['channel'])]
310        test_result['rssi_results'] = []
311        test_result['ping_results'] = []
312        test_result['llstats'] = []
313        # Setup sniffer
314        if self.testbed_params['sniffer_enable']:
315            self.sniffer.start_capture(
316                testcase_params['test_network'],
317                chan=testcase_params['channel'],
318                bw=testcase_params['bandwidth'],
319                duration=testcase_params['ping_duration'] *
320                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
321        # Run ping and sweep attenuation as needed
322        zero_counter = 0
323        pending_first_ping = 1
324        for atten in testcase_params['atten_range']:
325            for attenuator in self.attenuators:
326                attenuator.set_atten(atten, strict=False, retry=True)
327            if self.testclass_params.get('monitor_rssi', 1):
328                rssi_future = wputils.get_connected_rssi_nb(
329                    self.dut,
330                    int(testcase_params['ping_duration'] / 2 /
331                        self.RSSI_POLL_INTERVAL), self.RSSI_POLL_INTERVAL,
332                    testcase_params['ping_duration'] / 2)
333            # Refresh link layer stats
334            llstats_obj.update_stats()
335            if testcase_params.get('ping_from_dut', False):
336                current_ping_stats = wputils.get_ping_stats(
337                    self.dut,
338                    wputils.get_server_address(self.ping_server, self.dut_ip,
339                                               '255.255.255.0'),
340                    testcase_params['ping_duration'],
341                    testcase_params['ping_interval'],
342                    testcase_params['ping_size'])
343            else:
344                current_ping_stats = wputils.get_ping_stats(
345                    self.ping_server, self.dut_ip,
346                    testcase_params['ping_duration'],
347                    testcase_params['ping_interval'],
348                    testcase_params['ping_size'])
349            if self.testclass_params.get('monitor_rssi', 1):
350                current_rssi = rssi_future.result()
351            else:
352                current_rssi = collections.OrderedDict([
353                    ('time_stamp', []), ('bssid', []), ('ssid', []),
354                    ('frequency', []),
355                    ('signal_poll_rssi', wputils.empty_rssi_result()),
356                    ('signal_poll_avg_rssi', wputils.empty_rssi_result()),
357                    ('chain_0_rssi', wputils.empty_rssi_result()),
358                    ('chain_1_rssi', wputils.empty_rssi_result())
359                ])
360            test_result['rssi_results'].append(current_rssi)
361            llstats_obj.update_stats()
362            curr_llstats = llstats_obj.llstats_incremental.copy()
363            test_result['llstats'].append(curr_llstats)
364            if current_ping_stats['connected']:
365                llstats_str = 'TX MCS = {0} ({1:.1f}%). RX MCS = {2} ({3:.1f}%)'.format(
366                    curr_llstats['summary']['common_tx_mcs'],
367                    curr_llstats['summary']['common_tx_mcs_freq'] * 100,
368                    curr_llstats['summary']['common_rx_mcs'],
369                    curr_llstats['summary']['common_rx_mcs_freq'] * 100)
370                self.log.info(
371                    'Attenuation = {0}dB\tPacket Loss = {1:.1f}%\t'
372                    'Avg RTT = {2:.2f}ms\tRSSI = {3:.1f} [{4:.1f},{5:.1f}]\t{6}\t'
373                    .format(atten,
374                            current_ping_stats['packet_loss_percentage'],
375                            statistics.mean(current_ping_stats['rtt']),
376                            current_rssi['signal_poll_rssi']['mean'],
377                            current_rssi['chain_0_rssi']['mean'],
378                            current_rssi['chain_1_rssi']['mean'], llstats_str))
379                if current_ping_stats['packet_loss_percentage'] == 100:
380                    zero_counter = zero_counter + 1
381                else:
382                    zero_counter = 0
383                    pending_first_ping = 0
384            else:
385                self.log.info(
386                    'Attenuation = {}dB. Disconnected.'.format(atten))
387                zero_counter = zero_counter + 1
388            test_result['ping_results'].append(current_ping_stats.as_dict())
389            # Test ends when ping loss stable at 0. If test has successfully
390            # started, test ends on MAX_CONSECUTIVE_ZEROS. In case of a restry
391            # extra zeros are allowed to ensure a test properly starts.
392            if self.retry_flag and pending_first_ping:
393                allowable_zeros = self.MAX_CONSECUTIVE_ZEROS**2
394            else:
395                allowable_zeros = self.MAX_CONSECUTIVE_ZEROS
396            if zero_counter == allowable_zeros:
397                self.log.info('Ping loss stable at 100%. Stopping test now.')
398                for idx in range(
399                        len(testcase_params['atten_range']) -
400                        len(test_result['ping_results'])):
401                    test_result['ping_results'].append(
402                        self.DISCONNECTED_PING_RESULT)
403                break
404        # Set attenuator to initial setting
405        for attenuator in self.attenuators:
406            attenuator.set_atten(testcase_params['atten_range'][0],
407                                 strict=False,
408                                 retry=True)
409        if self.testbed_params['sniffer_enable']:
410            self.sniffer.stop_capture()
411        return test_result
412
413    def setup_ap(self, testcase_params):
414        """Sets up the access point in the configuration required by the test.
415
416        Args:
417            testcase_params: dict containing AP and other test params
418        """
419        band = self.access_point.band_lookup_by_channel(
420            testcase_params['channel'])
421        if '6G' in band:
422            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
423                testcase_params['channel'].strip('6g'))]
424        else:
425            if testcase_params['channel'] < 13:
426                frequency = wutils.WifiEnums.channel_2G_to_freq[
427                    testcase_params['channel']]
428            else:
429                frequency = wutils.WifiEnums.channel_5G_to_freq[
430                    testcase_params['channel']]
431        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
432            self.access_point.set_region(self.testbed_params['DFS_region'])
433        else:
434            self.access_point.set_region(self.testbed_params['default_region'])
435        self.access_point.set_channel(band, testcase_params['channel'])
436        self.access_point.set_bandwidth(band, testcase_params['mode'])
437        if 'low' in testcase_params['ap_power']:
438            self.log.info('Setting low AP power.')
439            self.access_point.set_power(
440                band, self.testclass_params['low_ap_tx_power'])
441        self.log.info('Access Point Configuration: {}'.format(
442            self.access_point.ap_settings))
443
444    def validate_and_connect(self, testcase_params):
445        if wputils.validate_network(self.dut,
446                                    testcase_params['test_network']['SSID']):
447            self.log.info('Already connected to desired network')
448        else:
449            current_country = wputils.get_country_code(self.dut)
450            if current_country != self.testclass_params['country_code']:
451                self.log.warning(
452                    'Requested CC: {}, Current CC: {}. Resetting WiFi'.format(
453                        self.testclass_params['country_code'],
454                        current_country))
455                wutils.wifi_toggle_state(self.dut, False)
456                wutils.set_wifi_country_code(
457                    self.dut, self.testclass_params['country_code'])
458                wutils.wifi_toggle_state(self.dut, True)
459                wutils.reset_wifi(self.dut)
460                wutils.set_wifi_country_code(
461                    self.dut, self.testclass_params['country_code'])
462            if self.testbed_params.get('txbf_off', False):
463                wputils.disable_beamforming(self.dut)
464            testcase_params['test_network']['channel'] = testcase_params[
465                'channel']
466            wutils.wifi_connect(self.dut,
467                                testcase_params['test_network'],
468                                num_of_tries=5,
469                                check_connectivity=True)
470
471    def setup_dut(self, testcase_params):
472        """Sets up the DUT in the configuration required by the test.
473
474        Args:
475            testcase_params: dict containing AP and other test params
476        """
477        # Turn screen off to preserve battery
478        if self.testbed_params.get('screen_on',
479                                   False) or self.testclass_params.get(
480                                       'screen_on', False):
481            self.dut.droid.wakeLockAcquireDim()
482        else:
483            self.dut.go_to_sleep()
484        self.validate_and_connect(testcase_params)
485        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
486        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
487            self.atten_dut_chain_map[testcase_params[
488                'channel']] = wputils.get_current_atten_dut_chain_map(
489                    self.attenuators, self.dut, self.ping_server)
490        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
491            self.atten_dut_chain_map[testcase_params['channel']]))
492        for idx, atten in enumerate(self.attenuators):
493            if self.atten_dut_chain_map[testcase_params['channel']][
494                    idx] == testcase_params['attenuated_chain']:
495                atten.offset = atten.instrument.max_atten
496            else:
497                atten.offset = 0
498
499    def setup_ping_test(self, testcase_params):
500        """Function that gets devices ready for the test.
501
502        Args:
503            testcase_params: dict containing test-specific parameters
504        """
505        # Configure AP
506        self.setup_ap(testcase_params)
507        # Set attenuator to starting attenuation
508        band = wputils.CHANNEL_TO_BAND_MAP[testcase_params['channel']]
509        for attenuator in self.attenuators:
510            attenuator.set_atten(
511                self.testclass_params['range_atten_start'].get(band, 0),
512                strict=False,
513                retry=True)
514        # Reset, configure, and connect DUT
515        self.setup_dut(testcase_params)
516
517    def get_range_start_atten(self, testcase_params):
518        """Gets the starting attenuation for this ping test.
519
520        The function gets the starting attenuation by checking whether a test
521        at the same configuration has executed. If so it sets the starting
522        point a configurable number of dBs below the reference test.
523
524        Args:
525            testcase_params: dict containing all test parameters
526        Returns:
527            start_atten: starting attenuation for current test
528        """
529        band = wputils.CHANNEL_TO_BAND_MAP[testcase_params['channel']]
530        # If the test is being retried, start from the beginning
531        if self.retry_flag:
532            self.log.info('Retry flag set. Setting attenuation to minimum.')
533            return self.testclass_params['range_atten_start'].get(band, 0)
534        # Get the current and reference test config. The reference test is the
535        # one performed at the current MCS+1
536        ref_test_params = wputils.extract_sub_dict(
537            testcase_params, testcase_params['reference_params'])
538        # Check if reference test has been run and set attenuation accordingly
539        previous_params = [
540            wputils.extract_sub_dict(result['testcase_params'],
541                                     testcase_params['reference_params'])
542            for result in self.testclass_results
543        ]
544        try:
545            ref_index = previous_params[::-1].index(ref_test_params)
546            ref_index = len(previous_params) - 1 - ref_index
547            start_atten = self.testclass_results[ref_index][
548                'atten_at_range'] - (
549                    self.testclass_params['adjacent_range_test_gap'])
550        except ValueError:
551            start_atten = self.testclass_params['range_atten_start'].get(
552                band, 0)
553            self.log.info(
554                'Reference test not found. Starting from {} dB'.format(
555                    start_atten))
556        return start_atten
557
558    def compile_test_params(self, testcase_params):
559        # Check if test should be skipped.
560        wputils.check_skip_conditions(testcase_params, self.dut,
561                                      self.access_point,
562                                      getattr(self, 'ota_chamber', None))
563
564        band = self.access_point.band_lookup_by_channel(
565            testcase_params['channel'])
566        testcase_params['test_network'] = self.main_network[band]
567        testcase_params['band'] = band
568        if testcase_params['chain_mask'] in ['0', '1']:
569            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
570                1 if testcase_params['chain_mask'] == '0' else 0)
571        else:
572            # Set attenuated chain to -1. Do not set to None as this will be
573            # compared to RF chain map which may include None
574            testcase_params['attenuated_chain'] = -1
575        if testcase_params['test_type'] == 'test_ping_range':
576            testcase_params.update(
577                ping_interval=self.testclass_params['range_ping_interval'],
578                ping_duration=self.testclass_params['range_ping_duration'],
579                ping_size=self.testclass_params['ping_size'],
580            )
581        elif testcase_params['test_type'] == 'test_fast_ping_rtt':
582            testcase_params.update(
583                ping_interval=self.testclass_params['rtt_ping_interval']
584                ['fast'],
585                ping_duration=self.testclass_params['rtt_ping_duration'],
586                ping_size=self.testclass_params['ping_size'],
587            )
588        elif testcase_params['test_type'] == 'test_slow_ping_rtt':
589            testcase_params.update(
590                ping_interval=self.testclass_params['rtt_ping_interval']
591                ['slow'],
592                ping_duration=self.testclass_params['rtt_ping_duration'],
593                ping_size=self.testclass_params['ping_size'])
594
595        if testcase_params['test_type'] == 'test_ping_range':
596            start_atten = self.get_range_start_atten(testcase_params)
597            num_atten_steps = int(
598                (self.testclass_params['range_atten_stop'] - start_atten) /
599                self.testclass_params['range_atten_step'])
600            testcase_params['atten_range'] = [
601                start_atten + x * self.testclass_params['range_atten_step']
602                for x in range(0, num_atten_steps)
603            ]
604        else:
605            testcase_params['atten_range'] = self.testclass_params[
606                'rtt_test_attenuation']
607        return testcase_params
608
609    def _test_ping(self, testcase_params):
610        """ Function that gets called for each range test case
611
612        The function gets called in each range test case. It customizes the
613        range test based on the test name of the test that called it
614
615        Args:
616            testcase_params: dict containing preliminary set of parameters
617        """
618        # Compile test parameters from config and test name
619        testcase_params = self.compile_test_params(testcase_params)
620        # Run ping test
621        self.setup_ping_test(testcase_params)
622        ping_result = self.run_ping_test(testcase_params)
623        # Postprocess results
624        self.process_ping_results(testcase_params, ping_result)
625        self.testclass_results.append(ping_result)
626        self.pass_fail_check(ping_result)
627
628    def generate_test_cases(self, ap_power, channels, modes, chain_mask,
629                            test_types, **kwargs):
630        """Function that auto-generates test cases for a test class."""
631        test_cases = []
632        allowed_configs = {
633            20: [
634                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
635                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
636            ],
637            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
638            80: [36, 100, 149, '6g37', '6g117', '6g213'],
639            160: [36, '6g37', '6g117', '6g213']
640        }
641
642        for channel, mode, chain, test_type in itertools.product(
643                channels, modes, chain_mask, test_types):
644            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
645            if channel not in allowed_configs[bandwidth]:
646                continue
647            testcase_name = '{}_ch{}_{}_ch{}'.format(test_type, channel, mode,
648                                                     chain)
649            testcase_params = collections.OrderedDict(test_type=test_type,
650                                                      ap_power=ap_power,
651                                                      channel=channel,
652                                                      mode=mode,
653                                                      bandwidth=bandwidth,
654                                                      chain_mask=chain,
655                                                      **kwargs)
656            setattr(self, testcase_name,
657                    partial(self._test_ping, testcase_params))
658            test_cases.append(testcase_name)
659        return test_cases
660
661
class WifiPing_TwoChain_Test(WifiPingTest):
    """Ping range and RTT test cases using the '2x2' chain mask."""

    def __init__(self, controllers):
        super().__init__(controllers)
        numeric_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        channels_6g = ['6g37', '6g117', '6g213']
        ping_test_types = [
            'test_ping_range', 'test_fast_ping_rtt', 'test_slow_ping_rtt'
        ]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=numeric_channels + channels_6g,
            modes=['bw20', 'bw80', 'bw160'],
            chain_mask=['2x2'],
            test_types=ping_test_types,
            reference_params=['band', 'chain_mask'])
678
679
class WifiPing_PerChainRange_Test(WifiPingTest):
    """Ping range test cases for chain masks '0', '1' and '2x2'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        numeric_channels = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
        channels_6g = ['6g37', '6g117', '6g213']
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=numeric_channels + channels_6g,
            modes=['bw20', 'bw80', 'bw160'],
            chain_mask=['0', '1', '2x2'],
            test_types=['test_ping_range'],
            reference_params=['band', 'chain_mask'])
694
695
class WifiPing_LowPowerAP_Test(WifiPingTest):
    """Ping range test cases per chain mask with ap_power 'low_power'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        generation_args = {
            'ap_power': 'low_power',
            'channels': [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161],
            'modes': ['bw20', 'bw80'],
            'chain_mask': ['0', '1', '2x2'],
            'test_types': ['test_ping_range'],
            'reference_params': ['band', 'chain_mask'],
        }
        self.tests = self.generate_test_cases(**generation_args)
707
708
# Over-the-air version of ping tests
class WifiOtaPingTest(WifiPingTest):
    """Class to test over-the-air ping

    This class tests WiFi ping performance in an OTA chamber. It enables
    setting turntable orientation and other chamber parameters to study
    performance in varying channel conditions
    """

    def __init__(self, controllers):
        """Initializes the test class and its metric loggers.

        BaseTestClass.__init__ is invoked directly, i.e. WifiPingTest.__init__
        is not run for this class.

        Args:
            controllers: passed through to BaseTestClass.__init__
        """
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        """Runs the WifiPingTest class setup, then creates the OTA chamber.

        The chamber is the first object created from the 'OTAChamber' entry
        of the user params.
        """
        WifiPingTest.setup_class(self)
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        """Tears down the class, processes results and resets the chamber.

        The chamber is reset even when processing the test class results
        raises, so that a reporting failure does not skip the reset.
        """
        WifiPingTest.teardown_class(self)
        try:
            self.process_testclass_results()
        finally:
            self.ota_chamber.reset_chamber()

    def process_testclass_results(self):
        """Saves all test results to enable comparison.

        After the WifiPingTest processing, results are grouped by
        (channel, mode, chain_mask). For each group the range is plotted
        against position, and the average range is logged and recorded as a
        test class metric. The plot (results.html) and the grouped data
        (testclass_summary.json) are written to the current context's output
        path. Nothing is plotted or saved when there are no results.
        """
        WifiPingTest.process_testclass_results(self)

        # Without results there is nothing to group and no chamber mode to
        # read; bail out instead of failing on testclass_results[0] below.
        if not self.testclass_results:
            self.log.warning('No OTA ping results available to process.')
            return

        range_vs_angle = collections.OrderedDict()
        for test in self.testclass_results:
            curr_params = test['testcase_params']
            curr_config = wputils.extract_sub_dict(
                curr_params, ['channel', 'mode', 'chain_mask'])
            curr_config_id = tuple(curr_config.items())
            if curr_config_id not in range_vs_angle:
                range_vs_angle[curr_config_id] = {
                    'position': [],
                    'range': [],
                    'llstats_at_range': []
                }
            config_data = range_vs_angle[curr_config_id]
            position = curr_params['position']
            if position in config_data['position']:
                # A repeated position replaces the entry recorded earlier for
                # that same position, wherever it sits in the list.
                index = config_data['position'].index(position)
                config_data['range'][index] = test['range']
                config_data['llstats_at_range'][index] = test[
                    'llstats_at_range']
            else:
                config_data['position'].append(position)
                config_data['range'].append(test['range'])
                config_data['llstats_at_range'].append(
                    test['llstats_at_range'])

        # All tests of a class are generated with the same chamber mode, so
        # the first result is used to pick the x-axis label.
        chamber_mode = self.testclass_results[0]['testcase_params'][
            'chamber_mode']
        x_labels = {
            'orientation': 'Angle (deg)',
            'stepped stirrers': 'Position Index'
        }
        # Fall back to a generic label so an unrecognized chamber mode does
        # not leave x_label undefined.
        x_label = x_labels.get(chamber_mode, 'Position')
        figure = BokehFigure(
            title='Range vs. Position',
            x_label=x_label,
            primary_y_label='Range (dB)',
        )
        for curr_config_id, curr_config_data in range_vs_angle.items():
            curr_config = collections.OrderedDict(curr_config_id)
            figure.add_line(x_data=curr_config_data['position'],
                            y_data=curr_config_data['range'],
                            hover_text=curr_config_data['llstats_at_range'],
                            legend='{}'.format(curr_config_id))
            average_range = sum(curr_config_data['range']) / len(
                curr_config_data['range'])
            self.log.info('Average range for {} is: {}dB'.format(
                curr_config_id, average_range))
            metric_name = 'ota_summary_ch{}_{}_ch{}.avg_range'.format(
                curr_config['channel'], curr_config['mode'],
                curr_config['chain_mask'])
            self.testclass_metric_logger.add_metric(metric_name, average_range)
        current_context = context.get_current_context().get_full_output_path()
        plot_file_path = os.path.join(current_context, 'results.html')
        figure.generate_figure(plot_file_path)

        # Save results
        results_file_path = os.path.join(current_context,
                                         'testclass_summary.json')
        with open(results_file_path, 'w') as results_file:
            json.dump(wputils.serialize_dict(range_vs_angle),
                      results_file,
                      indent=4)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        wputils.set_chain_mask(self.dut, testcase_params['chain_mask'])
        # Keep the screen dimly on only when 'screen_on' is set in the testbed
        # or test class params; otherwise turn it off to preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        self.validate_and_connect(testcase_params)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_ping_test(self, testcase_params):
        """Positions the chamber, then runs the WifiPingTest ping setup.

        In 'orientation' mode the chamber is set to the test's position; in
        'stepped stirrers' mode the stirrers are stepped using the total
        number of positions. Any other chamber mode leaves the chamber as is.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Setup turntable
        if testcase_params['chamber_mode'] == 'orientation':
            self.ota_chamber.set_orientation(testcase_params['position'])
        elif testcase_params['chamber_mode'] == 'stepped stirrers':
            self.ota_chamber.step_stirrers(testcase_params['total_positions'])
        # Continue setting up ping test
        WifiPingTest.setup_ping_test(self, testcase_params)

    def generate_test_cases(self, ap_power, channels, modes, chain_masks,
                            chamber_mode, positions, **kwargs):
        """Function that auto-generates OTA ping range test cases.

        One 'test_ping_range' test is generated per (channel, mode,
        chain mask, position) combination whose channel is allowed for the
        bandwidth encoded in the mode string. Each test is attached to this
        instance under its test name.

        Args:
            ap_power: value stored under 'ap_power' in each test's params
            channels: iterable of channels to generate tests for
            modes: iterable of mode strings; the digits of each string are
                read as the bandwidth (e.g. 'bw80' gives 80)
            chain_masks: iterable of chain mask settings
            chamber_mode: chamber mode stored in each test's params
            positions: sequence of chamber positions; its length is stored
                as 'total_positions', so it must support len()
            **kwargs: extra entries appended to the params of every test
        Returns:
            list of the generated test case names, in generation order
        """
        test_cases = []
        # Channels permitted for each bandwidth
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 64, 100,
                116, 132, 140, 149, 153, 157, 161, '6g37', '6g117', '6g213'
            ],
            40: [36, 44, 100, 149, 157, '6g37', '6g117', '6g213'],
            80: [36, 100, 149, '6g37', '6g117', '6g213'],
            160: [36, '6g37', '6g117', '6g213']
        }
        for channel, mode, chain_mask, position in itertools.product(
                channels, modes, chain_masks, positions):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = 'test_ping_range_ch{}_{}_ch{}_pos{}'.format(
                channel, mode, chain_mask, position)
            testcase_params = collections.OrderedDict(
                test_type='test_ping_range',
                ap_power=ap_power,
                channel=channel,
                mode=mode,
                bandwidth=bandwidth,
                chain_mask=chain_mask,
                chamber_mode=chamber_mode,
                total_positions=len(positions),
                position=position,
                **kwargs)
            # Bind the params now so the test runs without arguments
            setattr(self, testcase_name,
                    partial(self._test_ping, testcase_params))
            test_cases.append(testcase_name)
        return test_cases
862
863
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range test cases at orientations 0 to 350 in steps of 10."""

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        # Orientations 0, 10, ..., 350
        orientations = [10 * step for step in range(36)]
        channels_6g = ['6g37', '6g117', '6g213']
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149] + channels_6g,
            modes=['bw20', 'bw80', 'bw160'],
            chain_masks=['2x2'],
            chamber_mode='orientation',
            positions=orientations,
            reference_params=['channel', 'mode', 'chain_mask'])
876
877
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
    """OTA ping range test cases over 100 stepped stirrer positions."""

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        # Position indices 0 through 99
        stirrer_positions = [index for index in range(100)]
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=['2x2'],
            chamber_mode='stepped stirrers',
            positions=stirrer_positions,
            reference_params=['channel', 'mode', 'chain_mask'])
890
891
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
    """Low power AP OTA ping range test cases at 10 degree orientations."""

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        generation_args = {
            'ap_power': 'low_power',
            'channels': [6, 36, 149],
            'modes': ['bw20'],
            'chain_masks': ['2x2'],
            'chamber_mode': 'orientation',
            # Orientations 0, 10, ..., 350
            'positions': [10 * step for step in range(36)],
            'reference_params': ['channel', 'mode', 'chain_mask'],
        }
        self.tests = self.generate_test_cases(**generation_args)
904
905
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
    """Low power AP OTA ping range test cases over 100 stirrer positions."""

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        generation_args = {
            'ap_power': 'low_power',
            'channels': [6, 36, 149],
            'modes': ['bw20'],
            'chain_masks': ['2x2'],
            'chamber_mode': 'stepped stirrers',
            # Position indices 0 through 99
            'positions': [index for index in range(100)],
            'reference_params': ['channel', 'mode', 'chain_mask'],
        }
        self.tests = self.generate_test_cases(**generation_args)
918
919
class WifiOtaPing_LowPowerAP_PerChain_TenDegree_Test(WifiOtaPingTest):
    """Low power AP OTA ping range test cases per chain mask and angle.

    Covers chain masks 0, 1 and '2x2' at orientations 0 to 350 in steps
    of 10.
    """

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        # Orientations 0, 10, ..., 350
        orientations = [10 * step for step in range(36)]
        per_chain_masks = [0, 1, '2x2']
        self.tests = self.generate_test_cases(
            ap_power='low_power',
            channels=[6, 36, 149],
            modes=['bw20'],
            chain_masks=per_chain_masks,
            chamber_mode='orientation',
            positions=orientations,
            reference_params=['channel', 'mode', 'chain_mask'])
932
933
class WifiOtaPing_PerChain_TenDegree_Test(WifiOtaPingTest):
    """OTA ping range test cases per chain mask and angle.

    Covers chain masks 0, 1 and '2x2' at orientations 0 to 350 in steps
    of 10.
    """

    def __init__(self, controllers):
        WifiOtaPingTest.__init__(self, controllers)
        # Orientations 0, 10, ..., 350
        orientations = [10 * step for step in range(36)]
        channels_6g = ['6g37', '6g117', '6g213']
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 149] + channels_6g,
            modes=['bw20'],
            chain_masks=[0, 1, '2x2'],
            chamber_mode='orientation',
            positions=orientations,
            reference_params=['channel', 'mode', 'chain_mask'])
946