import collections
import numpy
import paramiko
import time
from acts_contrib.test_utils.wifi.wifi_retail_ap import WifiRetailAP
from acts_contrib.test_utils.wifi.wifi_retail_ap import BlockingBrowser

BROWSER_WAIT_SHORT = 1
BROWSER_WAIT_MED = 3
BROWSER_WAIT_LONG = 10
BROWSER_WAIT_EXTRA_LONG = 60
SSH_WAIT_SHORT = 0.1
SSH_READ_BYTES = 600000


class BrcmRefAP(WifiRetailAP):
    """Class that implements the BrcmRefAP (presumably a Broadcom reference AP).

    The AP exposes two configurable interfaces, '2G_5G' and '6G'. Band,
    bandwidth and channel are read and written through the AP's web GUI,
    while power, rate, RTS and beamforming settings are applied by sending
    `wl` commands over an SSH shell.
    """

    def __init__(self, ap_settings):
        super().__init__(ap_settings)
        self.init_gui_data()
        # Initialize SSH connection
        self.init_ssh_connection()
        # Read and update AP settings
        self.read_ap_settings()
        self.update_ap_settings(ap_settings)

    def teardown(self):
        """Function to perform destroy operations."""
        if self.ap_settings.get('lock_ap', 0):
            self._unlock_ap()
        self.close_ssh_connection()

    def init_ssh_connection(self):
        """Function to open the SSH connection used to send wl commands.

        Connects with the admin credentials in ap_settings. Unknown host
        keys are accepted automatically, and neither local key files nor
        an SSH agent are used.
        """
        self.ssh_client = paramiko.SSHClient()
        self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_client.connect(hostname=self.ap_settings['ip_address'],
                                username=self.ap_settings['admin_username'],
                                password=self.ap_settings['admin_password'],
                                look_for_keys=False,
                                allow_agent=False)

    def close_ssh_connection(self):
        """Function to close the SSH connection to the AP."""
        self.ssh_client.close()

    def run_ssh_cmd(self, command):
        """Function to run a command on the AP over an interactive shell.

        Args:
            command: string containing the command to run
        Returns:
            list of response lines. The first line received (presumably
            the echoed command) and bare '# ' prompt lines are dropped.
        """
        with self.ssh_client.invoke_shell() as shell:
            # Enter sh and discard whatever the shell printed so far
            shell.send('sh\n')
            time.sleep(SSH_WAIT_SHORT)
            shell.recv(SSH_READ_BYTES)
            shell.send('{}\n'.format(command))
            time.sleep(SSH_WAIT_SHORT)
            response = shell.recv(SSH_READ_BYTES).decode('utf-8').splitlines()
            response = [line for line in response[1:] if line != '# ']
        return response

    def init_gui_data(self):
        """Function to initialize GUI URLs, capabilities and lookup tables."""
        self.config_page = ('{protocol}://{username}:{password}@'
                            '{ip_address}:{port}/info.html').format(
                                protocol=self.ap_settings['protocol'],
                                username=self.ap_settings['admin_username'],
                                password=self.ap_settings['admin_password'],
                                ip_address=self.ap_settings['ip_address'],
                                port=self.ap_settings['port'])
        self.config_page_nologin = (
            '{protocol}://{ip_address}:{port}/'
            'wlrouter/radio.asp').format(
                protocol=self.ap_settings['protocol'],
                ip_address=self.ap_settings['ip_address'],
                port=self.ap_settings['port'])

        self.capabilities = {
            'interfaces': ['2G_5G', '6G'],
            'channels': {
                '2G_5G': [
                    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 56,
                    60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 136,
                    140, 144, 149, 153, 157, 161, 165
                ],
                '6G': ['6g' + str(ch) for ch in numpy.arange(1, 222, 4)]
            },
            'modes': {
                '2G_5G': [
                    'VHT20', 'VHT40', 'VHT80', 'VHT160', 'HE20', 'HE40',
                    'HE80', 'HE160'
                ],
                '6G': [
                    'VHT20', 'VHT40', 'VHT80', 'VHT160', 'HE20', 'HE40',
                    'HE80', 'HE160'
                ]
            },
            'default_mode': 'HE'
        }
        self.ap_settings['region'] = 'United States'
        for interface in self.capabilities['interfaces']:
            self.ap_settings[interface] = {
                'ssid': 'BrcmAP0' if interface == '6G' else 'BrcmAP1',
                'security_type': 'Open',
                'password': '1234567890'
            }
        # Maps (interface, setting) to the name of the GUI form field. For
        # 'interface' entries the value is (field name, value to select).
        self.config_page_fields = collections.OrderedDict({
            ('2G_5G', 'interface'): ('wl_unit', 1),
            ('2G_5G', 'band'):
            'wl_nband',
            ('2G_5G', 'bandwidth'):
            'wl_bw_cap',
            ('2G_5G', 'channel'):
            'wl_chanspec',
            ('6G', 'interface'): ('wl_unit', 0),
            ('6G', 'band'):
            'wl_nband',
            ('6G', 'bandwidth'):
            'wl_bw_cap',
            ('6G', 'channel'):
            'wl_chanspec',
        })

        self.band_mode_values = {'1': '5 GHz', '2': '2.4 GHz', '4': '6 GHz'}

        self.band_values = {'5 GHz': 1, '2.4 GHz': 2, '6 GHz': 4}

        self.bandwidth_mode_values = {
            '1': 'HE20',
            '3': 'HE40',
            '7': 'HE80',
            '15': 'HE160'
        }

    def _get_wl_interface(self, interface):
        """Function to map an interface identifier to its wl interface name.

        Args:
            interface: string containing interface identifier (2G_5G, 6G)
        Returns:
            'wl0' for the 6G interface, 'wl1' otherwise.
        """
        return 'wl0' if interface == '6G' else 'wl1'

    def _get_band_rate(self, interface):
        """Function to pick the wl rate sub-command for an interface.

        Args:
            interface: string containing interface identifier (2G_5G, 6G)
        Returns:
            '6g_rate' for 6G; otherwise '2g_rate' or '5g_rate' depending on
            whether the configured 2G_5G channel is below 13.
        """
        if interface == '6G':
            return '6g_rate'
        if self.ap_settings['2G_5G']['channel'] < 13:
            return '2g_rate'
        return '5g_rate'

    def _decode_channel_string(self, channel_string):
        """Function to convert a GUI channel value to a channel setting.

        Args:
            channel_string: channel value read from the GUI, e.g. '0',
                '36l', '36/80' or '6g37/80'
        Returns:
            'Auto' for '0', the '6g'-prefixed string for 6 GHz channels,
            and an integer channel number otherwise.
        """
        if channel_string == '0':
            return 'Auto'
        if 'u' in channel_string or 'l' in channel_string:
            # Drop the trailing upper/lower sideband marker
            channel_string = channel_string[0:-1]
        elif len(channel_string.split('/')) > 1:
            # Drop the '/<bandwidth>' suffix
            channel_string = channel_string.split('/')[0]
        if '6g' in channel_string:
            return channel_string
        else:
            return int(channel_string)

    def _get_channel_str(self, interface, channel, bandwidth):
        """Function to build the channel text to select in the GUI.

        Args:
            interface: string containing interface identifier (2G_5G, 6G)
            channel: channel number or '6g'-prefixed channel string
            bandwidth: mode string whose digits give the bandwidth in MHz,
                e.g. 'HE80'
        Returns:
            channel string as listed in the GUI, e.g. '36', '36l', '36/80'.
        Raises:
            ValueError: if the interface/bandwidth combination is not
                supported.
        """
        bandwidth = int(''.join([x for x in bandwidth if x.isdigit()]))
        if bandwidth == 20:
            channel_str = str(channel)
        elif bandwidth in [80, 160]:
            channel_str = str(channel) + '/' + str(bandwidth)
        elif interface == '6G' and bandwidth == 40:
            channel_str = str(channel) + '/' + str(bandwidth)
        elif interface == '2G_5G' and bandwidth == 40:
            # Channels listed here get the 'l' suffix, all others get 'u'
            lower_lookup = [
                36, 44, 52, 60, 100, 108, 116, 124, 132, 140, 149, 157
            ]
            if int(channel) in lower_lookup:
                channel_str = str(channel) + 'l'
            else:
                channel_str = str(channel) + 'u'
        else:
            # Previously fell through with channel_str unbound
            raise ValueError(
                'Unsupported interface/bandwidth combination: {}, {}'.format(
                    interface, bandwidth))
        return channel_str

    def read_ap_settings(self):
        """Function to read band, bandwidth and channel from the AP GUI.

        Walks config_page_fields in order. 'interface' entries select the
        radio in the GUI; the entries that follow are read into
        ap_settings for that interface.
        """
        with BlockingBrowser(self.ap_settings['headless_browser'],
                             900) as browser:
            # Visit URL
            browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10)
            browser.visit_persistent(self.config_page_nologin,
                                     BROWSER_WAIT_MED, 10, self.config_page)

            for key in self.config_page_fields.keys():
                # key is an (interface, setting) tuple, so the checks below
                # are exact tuple-membership tests, not substring tests
                if 'interface' in key:
                    config_item = browser.find_by_name(
                        self.config_page_fields[key][0]).first
                    config_item.select(self.config_page_fields[key][1])
                    time.sleep(BROWSER_WAIT_SHORT)
                else:
                    config_item = browser.find_by_name(
                        self.config_page_fields[key]).first
                    if 'band' in key:
                        self.ap_settings[key[0]][
                            key[1]] = self.band_mode_values[config_item.value]
                    elif 'bandwidth' in key:
                        self.ap_settings[key[0]][key[
                            1]] = self.bandwidth_mode_values[config_item.value]
                    elif 'channel' in key:
                        self.ap_settings[key[0]][
                            key[1]] = self._decode_channel_string(
                                config_item.value)
                    else:
                        self.ap_settings[key[0]][key[1]] = config_item.value

    def update_ap_settings(self, dict_settings=None, **named_settings):
        """Function to update settings of existing AP.

        Function copies arguments into ap_settings and calls configure_ap
        to apply them.

        Args:
            dict_settings: single dictionary of settings to update
            **named_settings: named settings to update
            Note: dict and named_settings cannot contain the same settings.
        Raises:
            KeyError: if a setting is passed in both dict_settings and
                named_settings.
        """
        # Avoid a shared mutable default argument
        if dict_settings is None:
            dict_settings = {}

        settings_to_update = dict(dict_settings, **named_settings)
        if len(settings_to_update) != len(dict_settings) + len(named_settings):
            raise KeyError('The following keys were passed twice: {}'.format(
                (set(dict_settings.keys()).intersection(
                    set(named_settings.keys())))))

        updating_6G = '6G' in settings_to_update
        updating_2G_5G = '2G_5G' in settings_to_update

        if updating_2G_5G:
            if 'channel' in settings_to_update['2G_5G']:
                # Copy so the derived band/bandwidth entries are not written
                # back into the caller's dictionary
                settings_2g_5g = dict(settings_to_update['2G_5G'])
                band = '2.4 GHz' if int(
                    settings_2g_5g['channel']) < 13 else '5 GHz'
                if band == '2.4 GHz':
                    # 2.4 GHz channels are always configured as HE20
                    settings_2g_5g['bandwidth'] = 'HE20'
                settings_2g_5g['band'] = band
                settings_to_update['2G_5G'] = settings_2g_5g
        self.ap_settings, updates_requested, _ = self._update_settings_dict(
            self.ap_settings, settings_to_update)
        if updates_requested:
            self.configure_ap(updating_2G_5G, updating_6G)

    def configure_ap(self, updating_2G_5G, updating_6G):
        """Function to apply ap_settings to the AP through the web GUI.

        Args:
            updating_2G_5G: boolean indicating the 2G_5G interface should
                be configured
            updating_6G: boolean indicating the 6G interface should be
                configured
        """
        with BlockingBrowser(self.ap_settings['headless_browser'],
                             900) as browser:

            interfaces_to_update = []
            if updating_2G_5G:
                interfaces_to_update.append('2G_5G')
            if updating_6G:
                interfaces_to_update.append('6G')
            for interface in interfaces_to_update:
                # Visit URL
                browser.visit_persistent(self.config_page, BROWSER_WAIT_MED,
                                         10)
                browser.visit_persistent(self.config_page_nologin,
                                         BROWSER_WAIT_MED, 10,
                                         self.config_page)

                # Select the radio to configure
                interface_field = self.config_page_fields[(interface,
                                                           'interface')]
                config_item = browser.find_by_name(interface_field[0]).first
                config_item.select(interface_field[1])
                time.sleep(BROWSER_WAIT_SHORT)

                for key, field_name in self.config_page_fields.items():
                    # Skip the selector entry and the other interface's fields
                    if 'interface' in key or interface not in key:
                        continue
                    config_item = browser.find_by_name(field_name).first
                    if 'band' in key:
                        config_item.select(
                            self.band_values[self.ap_settings[key[0]][key[1]]])
                    elif 'bandwidth' in key:
                        # Mode strings look like 'HE80'; the GUI lists '80 MHz'
                        config_item.select_by_text(
                            str(self.ap_settings[key[0]][key[1]])[2:] + ' MHz')
                    elif 'channel' in key:
                        channel_str = self._get_channel_str(
                            interface, self.ap_settings[interface][key[1]],
                            self.ap_settings[interface]['bandwidth'])
                        config_item.select_by_text(channel_str)
                    else:
                        self.ap_settings[key[0]][key[1]] = config_item.value
                    time.sleep(BROWSER_WAIT_SHORT)
                # Apply
                config_item = browser.find_by_name('action')
                config_item.first.click()
                time.sleep(BROWSER_WAIT_MED)
                config_item = browser.find_by_name('action')
                time.sleep(BROWSER_WAIT_SHORT)
                config_item.first.click()
                time.sleep(BROWSER_WAIT_LONG)
                browser.visit_persistent(self.config_page, BROWSER_WAIT_LONG,
                                         10)

    def set_power(self, interface, power):
        """Function that sets interface transmit power.

        Args:
            interface: string containing interface identifier (2G_5G, 6G)
            power: power level in dBm, or 'auto'
        """
        wl_interface = self._get_wl_interface(interface)

        if power == 'auto':
            self.run_ssh_cmd('wl -i {} txpwr1 -1'.format(wl_interface))
            # Previously fell through to power_qdbm / 4 with power_qdbm unset
            self.ap_settings[interface]['power'] = 'auto'
        else:
            # Power is sent in quarter-dBm steps, truncated to an integer
            power_qdbm = int(power * 4)
            self.run_ssh_cmd('wl -i {} txpwr1 -o -q {}'.format(
                wl_interface, power_qdbm))
            self.ap_settings[interface]['power'] = power_qdbm / 4

    def get_power(self, interface):
        """Function to get power used by AP

        Args:
            interface: interface to get power on (2G_5G, 6G)
        Returns:
            list of response lines returned by the AP.
        """
        wl_interface = self._get_wl_interface(interface)
        return self.run_ssh_cmd('wl -i {} txpwr1'.format(wl_interface))

    def set_rate(self,
                 interface,
                 mode=None,
                 num_streams=None,
                 rate='auto',
                 short_gi=0,
                 tx_expansion=0):
        """Function that sets rate.

        Args:
            interface: string containing interface identifier (2G_5G, 6G)
            mode: string indicating the WiFi standard to use. Must contain
                'legacy', 'ht', 'vht' or 'he' unless rate is 'auto'
            num_streams: number of MIMO streams. used only for VHT and HE
            rate: data rate of MCS index to use, or 'auto'
            short_gi: controls the use of short guard interval. For HE the
                value itself is passed to the -i option
            tx_expansion: value passed to the -x option
        Raises:
            ValueError: if rate is not 'auto' and mode is missing or not
                recognized.
        """
        wl_interface = self._get_wl_interface(interface)
        band_rate = self._get_band_rate(interface)

        if rate == 'auto':
            cmd_string = 'wl -i {} {} auto'.format(wl_interface, band_rate)
        else:
            if mode is None:
                raise ValueError('mode must be specified to set a fixed rate')
            mode_lower = mode.lower()
            if 'legacy' in mode_lower:
                cmd_string = 'wl -i {} {} -r {} -x {}'.format(
                    wl_interface, band_rate, rate, tx_expansion)
            # 'vht' must be tested before 'ht': every VHT mode string also
            # contains 'ht' and would otherwise take the HT branch.
            elif 'vht' in mode_lower:
                cmd_string = 'wl -i {} {} -v {}x{} -x {}'.format(
                    wl_interface, band_rate, rate, num_streams, tx_expansion)
                if short_gi:
                    cmd_string = cmd_string + ' --sgi'
            elif 'ht' in mode_lower:
                cmd_string = 'wl -i {} {} -h {} -x {}'.format(
                    wl_interface, band_rate, rate, tx_expansion)
                if short_gi:
                    cmd_string = cmd_string + ' --sgi'
            elif 'he' in mode_lower:
                cmd_string = 'wl -i {} {} -e {}x{} -l -x {}'.format(
                    wl_interface, band_rate, rate, num_streams, tx_expansion)
                if short_gi:
                    cmd_string = cmd_string + ' -i {}'.format(short_gi)
            else:
                raise ValueError('Unsupported mode: {}'.format(mode))

        self.run_ssh_cmd(cmd_string)

        self.ap_settings[interface]['mode'] = mode
        self.ap_settings[interface]['num_streams'] = num_streams
        self.ap_settings[interface]['rate'] = rate
        self.ap_settings[interface]['short_gi'] = short_gi

    def get_rate(self, interface):
        """Function to get rate used by AP

        Args:
            interface: interface to get rate on (2G_5G, 6G)
        Returns:
            list of response lines returned by the AP.
        """
        wl_interface = self._get_wl_interface(interface)
        band_rate = self._get_band_rate(interface)
        return self.run_ssh_cmd('wl -i {} {}'.format(wl_interface, band_rate))

    def set_rts_enable(self, interface, enable):
        """Function to enable or disable RTS/CTS

        Args:
            interface: interface to be configured (2G_5G, 6G)
            enable: boolean controlling RTS/CTS behavior
        """
        wl_interface = self._get_wl_interface(interface)
        if enable:
            self.run_ssh_cmd('wl -i {} ampdu_rts 1'.format(wl_interface))
            self.run_ssh_cmd('wl -i {} rtsthresh 2437'.format(wl_interface))
        else:
            self.run_ssh_cmd('wl -i {} ampdu_rts 0'.format(wl_interface))
            self.run_ssh_cmd('wl -i {} rtsthresh 15000'.format(wl_interface))

    def set_tx_beamformer(self, interface, enable):
        """Function to enable or disable transmit beamforming

        Args:
            interface: interface to be configured (2G_5G, 6G)
            enable: boolean controlling beamformer behavior
        """
        wl_interface = self._get_wl_interface(interface)

        # NOTE(review): 'wl down' / 'wl up' are issued without -i, unlike
        # the txbf command between them -- confirm this is intended.
        self.run_ssh_cmd('wl down')
        self.run_ssh_cmd('wl -i {} txbf {}'.format(wl_interface, int(enable)))
        self.run_ssh_cmd('wl up')

    def get_sta_rssi(self, interface, sta_macaddr):
        """Function to get RSSI from connected STA

        Args:
            interface: interface to be queried (2G_5G, 6G)
            sta_macaddr: mac address of STA of interest
        Returns:
            list of response lines returned by the AP.
        """
        wl_interface = self._get_wl_interface(interface)

        return self.run_ssh_cmd('wl -i {} phy_rssi_ant {}'.format(
            wl_interface, sta_macaddr))