1import collections
2import numpy
3import paramiko
4import time
5from acts_contrib.test_utils.wifi.wifi_retail_ap import WifiRetailAP
6from acts_contrib.test_utils.wifi.wifi_retail_ap import BlockingBrowser
7
# Wait times used while driving the AP web GUI. They are passed to
# BlockingBrowser.visit_persistent and used as time.sleep() durations
# (seconds) between GUI interactions.
BROWSER_WAIT_SHORT = 1
BROWSER_WAIT_MED = 3
BROWSER_WAIT_LONG = 10
BROWSER_WAIT_EXTRA_LONG = 60
# Seconds to sleep after each write to the interactive SSH shell.
SSH_WAIT_SHORT = 0.1
# Maximum number of bytes requested from the SSH shell per recv() call.
SSH_READ_BYTES = 600000
14
15
16class BrcmRefAP(WifiRetailAP):
    """Class that implements the Broadcom reference AP.

    Band, bandwidth and channel are configured through the AP's web GUI,
    while power, rate, RTS and beamforming are controlled by running wl
    commands on the AP over SSH.
    """
22
    def __init__(self, ap_settings):
        """Initializes the AP object and syncs its settings with the device.

        Args:
            ap_settings: dictionary of AP settings; it is handed to the parent
                constructor and then applied through update_ap_settings.
        """
        super().__init__(ap_settings)
        # Build GUI URLs, capabilities and default per-interface settings
        self.init_gui_data()
        # Initialize SSH connection
        self.init_ssh_connection()
        # Read and update AP settings
        self.read_ap_settings()
        self.update_ap_settings(ap_settings)
31
32    def teardown(self):
33        """Function to perform destroy operations."""
34        if self.ap_settings.get('lock_ap', 0):
35            self._unlock_ap()
36        self.close_ssh_connection()
37
38    def init_ssh_connection(self):
39        self.ssh_client = paramiko.SSHClient()
40        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
41        self.ssh_client.connect(hostname=self.ap_settings['ip_address'],
42                                username=self.ap_settings['admin_username'],
43                                password=self.ap_settings['admin_password'],
44                                look_for_keys=False,
45                                allow_agent=False)
46
    def close_ssh_connection(self):
        """Closes the SSH connection opened by init_ssh_connection."""
        self.ssh_client.close()
49
50    def run_ssh_cmd(self, command):
51        with self.ssh_client.invoke_shell() as shell:
52            shell.send('sh\n')
53            time.sleep(SSH_WAIT_SHORT)
54            shell.recv(SSH_READ_BYTES)
55            shell.send('{}\n'.format(command))
56            time.sleep(SSH_WAIT_SHORT)
57            response = shell.recv(SSH_READ_BYTES).decode('utf-8').splitlines()
58            response = [line for line in response[1:] if line != '# ']
59        return response
60
61    def init_gui_data(self):
62        self.config_page = ('{protocol}://{username}:{password}@'
63                            '{ip_address}:{port}/info.html').format(
64                                protocol=self.ap_settings['protocol'],
65                                username=self.ap_settings['admin_username'],
66                                password=self.ap_settings['admin_password'],
67                                ip_address=self.ap_settings['ip_address'],
68                                port=self.ap_settings['port'])
69        self.config_page_nologin = (
70            '{protocol}://{ip_address}:{port}/'
71            'wlrouter/radio.asp').format(
72                protocol=self.ap_settings['protocol'],
73                ip_address=self.ap_settings['ip_address'],
74                port=self.ap_settings['port'])
75
76        self.capabilities = {
77            'interfaces': ['2G_5G', '6G'],
78            'channels': {
79                '2G_5G': [
80                    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 56,
81                    60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136,
82                    140, 144, 149, 153, 157, 161, 165
83                ],
84                '6G': ['6g' + str(ch) for ch in numpy.arange(1, 222, 4)]
85            },
86            'modes': {
87                '2G_5G': [
88                    'VHT20', 'VHT40', 'VHT80', 'VHT160', 'HE20', 'HE40',
89                    'HE80', 'HE160'
90                ],
91                '6G': [
92                    'VHT20', 'VHT40', 'VHT80', 'VHT160', 'HE20', 'HE40',
93                    'HE80', 'HE160'
94                ]
95            },
96            'default_mode': 'HE'
97        }
98        self.ap_settings['region'] = 'United States'
99        for interface in self.capabilities['interfaces']:
100            self.ap_settings[interface] = {
101                'ssid': 'BrcmAP0' if interface == '6G' else 'BrcmAP1',
102                'security_type': 'Open',
103                'password': '1234567890'
104            }
105        self.config_page_fields = collections.OrderedDict({
106            ('2G_5G', 'interface'): ('wl_unit', 1),
107            ('2G_5G', 'band'):
108            'wl_nband',
109            ('2G_5G', 'bandwidth'):
110            'wl_bw_cap',
111            ('2G_5G', 'channel'):
112            'wl_chanspec',
113            ('6G', 'interface'): ('wl_unit', 0),
114            ('6G', 'band'):
115            'wl_nband',
116            ('6G', 'bandwidth'):
117            'wl_bw_cap',
118            ('6G', 'channel'):
119            'wl_chanspec',
120        })
121
122        self.band_mode_values = {'1': '5 GHz', '2': '2.4 GHz', '4': '6 GHz'}
123
124        self.band_values = {'5 GHz': 1, '2.4 GHz': 2, '6 GHz': 4}
125
126        self.bandwidth_mode_values = {
127            '1': 'HE20',
128            '3': 'HE40',
129            '7': 'HE80',
130            '15': 'HE160'
131        }
132
133    def _decode_channel_string(self, channel_string):
134        if channel_string == '0':
135            return 'Auto'
136        if 'u' in channel_string or 'l' in channel_string:
137            channel_string = channel_string[0:-1]
138        elif len(channel_string.split('/')) > 1:
139            channel_string = channel_string.split('/')[0]
140        if '6g' in channel_string:
141            return channel_string
142        else:
143            return int(channel_string)
144
145    def _get_channel_str(self, interface, channel, bandwidth):
146        bandwidth = int(''.join([x for x in bandwidth if x.isdigit()]))
147        if bandwidth == 20:
148            channel_str = str(channel)
149        elif bandwidth in [80, 160]:
150            channel_str = str(channel) + '/' + str(bandwidth)
151        elif interface == '6G' and bandwidth == 40:
152            channel_str = str(channel) + '/' + str(bandwidth)
153        elif interface == '2G_5G' and bandwidth == 40:
154            lower_lookup = [
155                36, 44, 52, 60, 100, 108, 116, 124, 132, 140, 149, 157
156            ]
157            if int(channel) in lower_lookup:
158                channel_str = str(channel) + 'l'
159            else:
160                channel_str = str(channel) + 'u'
161        return channel_str
162
163    def read_ap_settings(self):
164        with BlockingBrowser(self.ap_settings['headless_browser'],
165                             900) as browser:
166            # Visit URL
167            browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10)
168            browser.visit_persistent(self.config_page_nologin,
169                                     BROWSER_WAIT_MED, 10, self.config_page)
170
171            for key in self.config_page_fields.keys():
172                if 'interface' in key:
173                    config_item = browser.find_by_name(
174                        self.config_page_fields[key][0]).first
175                    config_item.select(self.config_page_fields[key][1])
176                    time.sleep(BROWSER_WAIT_SHORT)
177                else:
178                    config_item = browser.find_by_name(
179                        self.config_page_fields[key]).first
180                    if 'band' in key:
181                        self.ap_settings[key[0]][
182                            key[1]] = self.band_mode_values[config_item.value]
183                    elif 'bandwidth' in key:
184                        self.ap_settings[key[0]][key[
185                            1]] = self.bandwidth_mode_values[config_item.value]
186                    elif 'channel' in key:
187                        self.ap_settings[key[0]][
188                            key[1]] = self._decode_channel_string(
189                                config_item.value)
190                    else:
191                        self.ap_settings[key[0]][key[1]] = config_item.value
192
193    def update_ap_settings(self, dict_settings={}, **named_settings):
194        """Function to update settings of existing AP.
195
196        Function copies arguments into ap_settings and calls configure_ap
197        to apply them.
198
199        Args:
200            dict_settings: single dictionary of settings to update
201            **named_settings: named settings to update
202            Note: dict and named_settings cannot contain the same settings.
203        """
204
205        settings_to_update = dict(dict_settings, **named_settings)
206        if len(settings_to_update) != len(dict_settings) + len(named_settings):
207            raise KeyError('The following keys were passed twice: {}'.format(
208                (set(dict_settings.keys()).intersection(
209                    set(named_settings.keys())))))
210
211        updating_6G = '6G' in settings_to_update.keys()
212        updating_2G_5G = '2G_5G' in settings_to_update.keys()
213
214        if updating_2G_5G:
215            if 'channel' in settings_to_update['2G_5G']:
216                band = '2.4 GHz' if int(
217                    settings_to_update['2G_5G']['channel']) < 13 else '5 GHz'
218                if band == '2.4 GHz':
219                    settings_to_update['2G_5G']['bandwidth'] = 'HE20'
220                settings_to_update['2G_5G']['band'] = band
221        self.ap_settings, updates_requested, status_toggle_flag = self._update_settings_dict(
222            self.ap_settings, settings_to_update)
223        if updates_requested:
224            self.configure_ap(updating_2G_5G, updating_6G)
225
226    def configure_ap(self, updating_2G_5G, updating_6G):
227
228        with BlockingBrowser(self.ap_settings['headless_browser'],
229                             900) as browser:
230
231            interfaces_to_update = []
232            if updating_2G_5G:
233                interfaces_to_update.append('2G_5G')
234            if updating_6G:
235                interfaces_to_update.append('6G')
236            for interface in interfaces_to_update:
237                # Visit URL
238                browser.visit_persistent(self.config_page, BROWSER_WAIT_MED,
239                                         10)
240                browser.visit_persistent(self.config_page_nologin,
241                                         BROWSER_WAIT_MED, 10,
242                                         self.config_page)
243
244                config_item = browser.find_by_name(
245                    self.config_page_fields[(interface, 'interface')][0]).first
246                config_item.select(self.config_page_fields[(interface,
247                                                            'interface')][1])
248                time.sleep(BROWSER_WAIT_SHORT)
249
250                for key, value in self.config_page_fields.items():
251                    if 'interface' in key or interface not in key:
252                        continue
253                    config_item = browser.find_by_name(
254                        self.config_page_fields[key]).first
255                    if 'band' in key:
256                        config_item.select(
257                            self.band_values[self.ap_settings[key[0]][key[1]]])
258                    elif 'bandwidth' in key:
259                        config_item.select_by_text(
260                            str(self.ap_settings[key[0]][key[1]])[2:] + ' MHz')
261                    elif 'channel' in key:
262                        channel_str = self._get_channel_str(
263                            interface, self.ap_settings[interface][key[1]],
264                            self.ap_settings[interface]['bandwidth'])
265                        config_item.select_by_text(channel_str)
266                    else:
267                        self.ap_settings[key[0]][key[1]] = config_item.value
268                    time.sleep(BROWSER_WAIT_SHORT)
269                # Apply
270                config_item = browser.find_by_name('action')
271                config_item.first.click()
272                time.sleep(BROWSER_WAIT_MED)
273                config_item = browser.find_by_name('action')
274                time.sleep(BROWSER_WAIT_SHORT)
275                config_item.first.click()
276                time.sleep(BROWSER_WAIT_LONG)
277                browser.visit_persistent(self.config_page, BROWSER_WAIT_LONG,
278                                         10)
279
280    def set_power(self, interface, power):
281        """Function that sets interface transmit power.
282
283        Args:
284            interface: string containing interface identifier (2G_5G, 6G)
285            power: power level in dBm
286        """
287        wl_interface = 'wl0' if interface == '6G' else 'wl1'
288
289        if power == 'auto':
290            response = self.run_ssh_cmd(
291                'wl -i {} txpwr1 -1'.format(wl_interface))
292        else:
293            power_qdbm = int(power * 4)
294            response = self.run_ssh_cmd('wl -i {} txpwr1 -o -q {}'.format(
295                wl_interface, power_qdbm))
296
297        self.ap_settings[interface]['power'] = power_qdbm / 4
298
299    def get_power(self, interface):
300        """Function to get power used by AP
301
302        Args:
303            interface: interface to get rate on (2G_5G, 6G)
304        Returns:
305            power string returned by AP.
306        """
307        wl_interface = 'wl0' if interface == '6G' else 'wl1'
308        return self.run_ssh_cmd('wl -i {} txpwr1'.format(wl_interface))
309
310    def set_rate(self,
311                 interface,
312                 mode=None,
313                 num_streams=None,
314                 rate='auto',
315                 short_gi=0,
316                 tx_expansion=0):
317        """Function that sets rate.
318
319        Args:
320            interface: string containing interface identifier (2G, 5G_1)
321            mode: string indicating the WiFi standard to use
322            num_streams: number of MIMO streams. used only for VHT
323            rate: data rate of MCS index to use
324            short_gi: boolean controlling the use of short guard interval
325        """
326        wl_interface = 'wl0' if interface == '6G' else 'wl1'
327
328        if interface == '6G':
329            band_rate = '6g_rate'
330        elif self.ap_settings['2G_5G']['channel'] < 13:
331            band_rate = '2g_rate'
332        else:
333            band_rate = '5g_rate'
334
335        if rate == 'auto':
336            cmd_string = 'wl -i {} {} auto'.format(wl_interface, band_rate)
337        elif 'legacy' in mode.lower():
338            cmd_string = 'wl -i {} {} -r {} -x {}'.format(
339                wl_interface, band_rate, rate, tx_expansion)
340        elif 'ht' in mode.lower():
341            cmd_string = 'wl -i {} {} -h {} -x {}'.format(
342                wl_interface, band_rate, rate, tx_expansion)
343            if short_gi:
344                cmd_string = cmd_string + '--sgi'
345        elif 'vht' in mode.lower():
346            cmd_string = 'wl -i {} {} -v {}x{} -x {}'.format(
347                wl_interface, band_rate, rate, num_streams, tx_expansion)
348            if short_gi:
349                cmd_string = cmd_string + '--sgi'
350        elif 'he' in mode.lower():
351            cmd_string = 'wl -i {} {} -e {}x{} -l -x {}'.format(
352                wl_interface, band_rate, rate, num_streams, tx_expansion)
353            if short_gi:
354                cmd_string = cmd_string + '-i {}'.format(short_gi)
355
356        response = self.run_ssh_cmd(cmd_string)
357
358        self.ap_settings[interface]['mode'] = mode
359        self.ap_settings[interface]['num_streams'] = num_streams
360        self.ap_settings[interface]['rate'] = rate
361        self.ap_settings[interface]['short_gi'] = short_gi
362
363    def get_rate(self, interface):
364        """Function to get rate used by AP
365
366        Args:
367            interface: interface to get rate on (2G_5G, 6G)
368        Returns:
369            rate string returned by AP.
370        """
371
372        wl_interface = 'wl0' if interface == '6G' else 'wl1'
373
374        if interface == '6G':
375            band_rate = '6g_rate'
376        elif self.ap_settings['2G_5G']['channel'] < 13:
377            band_rate = '2g_rate'
378        else:
379            band_rate = '5g_rate'
380        return self.run_ssh_cmd('wl -i {} {}'.format(wl_interface, band_rate))
381
382    def set_rts_enable(self, interface, enable):
383        """Function to enable or disable RTS/CTS
384
385        Args:
386            interface: interface to be configured (2G_5G, 6G)
387            enable: boolean controlling RTS/CTS behavior
388        """
389        wl_interface = 'wl0' if interface == '6G' else 'wl1'
390        if enable:
391            self.run_ssh_cmd('wl -i {} ampdu_rts 1'.format(wl_interface))
392            self.run_ssh_cmd('wl -i {} rtsthresh 2437'.format(wl_interface))
393        else:
394            self.run_ssh_cmd('wl -i {} ampdu_rts 0'.format(wl_interface))
395            self.run_ssh_cmd('wl -i {} rtsthresh 15000'.format(wl_interface))
396
397    def set_tx_beamformer(self, interface, enable):
398        """Function to enable or disable transmit beamforming
399
400        Args:
401            interface: interface to be configured (2G_5G, 6G)
402            enable: boolean controlling beamformer behavior
403        """
404        wl_interface = 'wl0' if interface == '6G' else 'wl1'
405
406        self.run_ssh_cmd('wl down')
407        self.run_ssh_cmd('wl -i {} txbf {}'.format(wl_interface, int(enable)))
408        self.run_ssh_cmd('wl up')
409
410    def get_sta_rssi(self, interface, sta_macaddr):
411        """Function to get RSSI from connected STA
412
413        Args:
414            interface: interface to be configured (2G_5G, 6G)
415            sta_macaddr: mac address of STA of interest
416        """
417        wl_interface = 'wl0' if interface == '6G' else 'wl1'
418
419        return self.run_ssh_cmd('wl -i {} phy_rssi_ant {}'.format(
420            wl_interface, sta_macaddr))
421