1# Lint as: python2, python3 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import collections 7import logging 8import re 9import time 10 11import six 12 13from autotest_lib.client.common_lib import error 14from autotest_lib.client.common_lib import utils 15from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 16 17 18# Used to represent stations we parse out of scan results. 19Station = collections.namedtuple('Station', 20 ['bssid', 'frequency', 'signal', 'ssid']) 21 22class WpaCliProxy(object): 23 """Interacts with a DUT through wpa_cli rather than shill.""" 24 25 SCANNING_INTERVAL_SECONDS = 5 26 POLLING_INTERVAL_SECONDS = 0.5 27 # From wpa_supplicant.c:wpa_supplicant_state_txt() 28 WPA_SUPPLICANT_ASSOCIATING_STATES = ( 29 'AUTHENTICATING', 30 'ASSOCIATING', 31 'ASSOCIATED', 32 '4WAY_HANDSHAKE', 33 'GROUP_HANDSHAKE') 34 WPA_SUPPLICANT_ASSOCIATED_STATES = ( 35 'COMPLETED',) 36 ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}' 37 BRILLO_CMD_FORMAT = 'su system /system/bin/wpa_cli -i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}' 38 CROS_CMD_FORMAT = ('su wpa -s /bin/bash ' 39 '-c "/usr/bin/wpa_cli -i {0[ifname]} {0[cmd]}"') 40 CAST_CMD_FORMAT = '/system/bin/wpa_cli -i {0[ifname]} {0[cmd]}' 41 RPI_CMD_FORMAT = '/sbin/wpa_cli -i {0[ifname]} {0[cmd]}' 42 43 44 def __init__(self, host, wifi_if, RPi=False): 45 self._host = host 46 self._wifi_if = wifi_if 47 self._created_networks = {} 48 if RPi: 49 self._wpa_cli_cmd_format = self.RPI_CMD_FORMAT 50 # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions. 51 # we'll need to discover this parameter as it becomes more 52 # generally useful. 53 elif host.get_os_type() == 'android': 54 self._wpa_cli_cmd_format = self.ANDROID_CMD_FORMAT 55 elif host.get_os_type() == 'brillo': 56 self._wpa_cli_cmd_format = self.BRILLO_CMD_FORMAT 57 elif host.get_os_type() == 'cros': 58 self._wpa_cli_cmd_format = self.CROS_CMD_FORMAT 59 elif host.get_os_type() == 'cast_os': 60 self._wpa_cli_cmd_format = self.CAST_CMD_FORMAT 61 62 63 def _add_network(self, ssid): 64 """ 65 Add a wpa_supplicant network for ssid. 66 67 @param ssid string: name of network to add. 68 @return int network id of added network. 69 70 """ 71 add_result = self.run_wpa_cli_cmd('add_network', check_result=False) 72 network_id = int(add_result.stdout.splitlines()[-1]) 73 self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' % 74 (network_id, ssid)) 75 self._created_networks[ssid] = network_id 76 logging.debug('Added network %s=%d', ssid, network_id) 77 return network_id 78 79 80 def run_wpa_cli_cmd(self, command, if_name=None, check_result=True): 81 """ 82 Run a wpa_cli command and optionally check the result. 83 84 Note: if you're using this function to do things like initiating scans, 85 consider initating those through Shill instead, to avoid collisions. 86 87 @param command string: suffix of a command to be prefixed with 88 an appropriate wpa_cli for this host. 89 @param if_name string: interface name. The wifi interface (self._wifi_if) 90 would be used, if the if_name was not specified. 91 @param check_result bool: True iff we want to check that the 92 command comes back with an 'OK' response. 93 @return result object returned by host.run. 94 95 """ 96 iface = if_name if if_name else self._wifi_if 97 cmd = self._wpa_cli_cmd_format.format({ 98 'ifname': iface, 99 'cmd': command 100 }) 101 result = self._host.run(cmd) 102 if check_result and not result.stdout.strip().endswith('OK'): 103 raise error.TestFail('wpa_cli command failed: %s' % command) 104 105 return result 106 107 108 def _get_status_dict(self): 109 """ 110 Gets the status output for a WiFi interface. 111 112 Get the output of wpa_cli status. This summarizes what wpa_supplicant 113 is doing with respect to the WiFi interface. 114 115 Example output: 116 117 Using interface 'wlan0' 118 wpa_state=INACTIVE 119 p2p_device_address=32:76:6f:f2:a6:c4 120 address=30:76:6f:f2:a6:c4 121 122 @return dict of key/value pairs parsed from output using = as divider. 123 124 """ 125 status_result = self.run_wpa_cli_cmd('status', check_result=False) 126 return dict([line.strip().split('=', 1) 127 for line in status_result.stdout.splitlines() 128 if line.find('=') > 0]) 129 130 131 def _is_associating_or_associated(self): 132 """@return True if the DUT is assocating or associated with a BSS.""" 133 state = self._get_status_dict().get('wpa_state', None) 134 return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES + 135 self.WPA_SUPPLICANT_ASSOCIATED_STATES) 136 137 138 def _is_associated(self, ssid): 139 """ 140 Check if the DUT is associated to a given SSID. 141 142 @param ssid string: SSID of the network we're concerned about. 143 @return True if we're associated with the specified SSID. 144 145 """ 146 status_dict = self._get_status_dict() 147 return (status_dict.get('ssid', None) == ssid and 148 status_dict.get('wpa_state', None) in 149 self.WPA_SUPPLICANT_ASSOCIATED_STATES) 150 151 152 def _is_connected(self, ssid): 153 """ 154 Check that we're connected to |ssid| and have an IP address. 155 156 @param ssid string: SSID of the network we're concerned about. 157 @return True if we have an IP and we're associated with |ssid|. 158 159 """ 160 status_dict = self._get_status_dict() 161 return (status_dict.get('ssid', None) == ssid and 162 status_dict.get('ip_address', None)) 163 164 165 def clean_profiles(self): 166 """Remove state associated with past networks we've connected to.""" 167 # list_networks output looks like: 168 # Using interface 'wlan0'^M 169 # network id / ssid / bssid / flags^M 170 # 0 SimpleConnect_jstja_ch1 any [DISABLED]^M 171 # 1 SimpleConnect_gjji2_ch6 any [DISABLED]^M 172 # 2 SimpleConnect_xe9d1_ch11 any [DISABLED]^M 173 list_networks_result = self.run_wpa_cli_cmd( 174 'list_networks', check_result=False) 175 start_parsing = False 176 for line in list_networks_result.stdout.splitlines(): 177 if not start_parsing: 178 if line.startswith('network id'): 179 start_parsing = True 180 continue 181 182 network_id = int(line.split()[0]) 183 self.run_wpa_cli_cmd('remove_network %d' % network_id) 184 self._created_networks = {} 185 186 187 def create_profile(self, _): 188 """ 189 This is a no op, since we don't have profiles. 190 191 @param _ ignored. 192 193 """ 194 logging.info('Skipping create_profile on %s', self.__class__.__name__) 195 196 197 def pop_profile(self, _): 198 """ 199 This is a no op, since we don't have profiles. 200 201 @param _ ignored. 202 203 """ 204 logging.info('Skipping pop_profile on %s', self.__class__.__name__) 205 206 207 def push_profile(self, _): 208 """ 209 This is a no op, since we don't have profiles. 210 211 @param _ ignored. 212 213 """ 214 logging.info('Skipping push_profile on %s', self.__class__.__name__) 215 216 217 def remove_profile(self, _): 218 """ 219 This is a no op, since we don't have profiles. 220 221 @param _ ignored. 222 223 """ 224 logging.info('Skipping remove_profile on %s', self.__class__.__name__) 225 226 227 def init_test_network_state(self): 228 """Create a clean slate for tests with respect to remembered networks. 229 230 For wpa_cli hosts, this means removing all remembered networks. 231 232 @return True iff operation succeeded, False otherwise. 233 234 """ 235 self.clean_profiles() 236 return True 237 238 239 def connect_wifi(self, assoc_params): 240 """ 241 Connect to the WiFi network described by AssociationParameters. 242 243 @param assoc_params AssociationParameters object. 244 @return serialized AssociationResult object. 245 246 """ 247 logging.debug('connect_wifi()') 248 # Ouptut should look like: 249 # Using interface 'wlan0' 250 # 0 251 assoc_result = xmlrpc_datatypes.AssociationResult() 252 network_id = self._add_network(assoc_params.ssid) 253 if assoc_params.is_hidden: 254 self.run_wpa_cli_cmd('set_network %d %s %s' % 255 (network_id, 'scan_ssid', '1')) 256 257 sec_config = assoc_params.security_config 258 for field, value in six.iteritems(sec_config.get_wpa_cli_properties()): 259 self.run_wpa_cli_cmd('set_network %d %s %s' % 260 (network_id, field, value)) 261 self.run_wpa_cli_cmd('select_network %d' % network_id) 262 263 # Wait for an appropriate BSS to appear in scan results. 264 scan_results_pattern = '\t'.join(['([0-9a-f:]{17})', # BSSID 265 '([0-9]+)', # Frequency 266 '(-[0-9]+)', # Signal level 267 '(.*)', # Encryption types 268 '(.*)']) # SSID 269 last_scan_time = -1.0 270 start_time = time.time() 271 while time.time() - start_time < assoc_params.discovery_timeout: 272 assoc_result.discovery_time = time.time() - start_time 273 if self._is_associating_or_associated(): 274 # Internally, wpa_supplicant writes its scan_results response 275 # to a 4kb buffer. When there are many BSS's, the buffer fills 276 # up, and we'll never see the BSS we care about in some cases. 277 break 278 279 scan_result = self.run_wpa_cli_cmd('scan_results', 280 check_result=False) 281 found_stations = [] 282 for line in scan_result.stdout.strip().splitlines(): 283 match = re.match(scan_results_pattern, line) 284 if match is None: 285 continue 286 found_stations.append( 287 Station(bssid=match.group(1), frequency=match.group(2), 288 signal=match.group(3), ssid=match.group(5))) 289 logging.debug('Found stations: %r', 290 [station.ssid for station in found_stations]) 291 if [station for station in found_stations 292 if station.ssid == assoc_params.ssid]: 293 break 294 295 if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS: 296 # Sometimes this might fail with a FAIL-BUSY if the previous 297 # scan hasn't finished. 298 scan_result = self.run_wpa_cli_cmd('scan', check_result=False) 299 if scan_result.stdout.strip().endswith('OK'): 300 last_scan_time = time.time() 301 time.sleep(self.POLLING_INTERVAL_SECONDS) 302 else: 303 assoc_result.failure_reason = 'Discovery timed out' 304 return assoc_result.serialize() 305 306 # Wait on association to finish. 307 start_time = time.time() 308 success = utils.poll_for_condition( 309 condition=lambda: self._is_associated(assoc_params.ssid), 310 timeout=assoc_params.association_timeout, 311 sleep_interval=self.POLLING_INTERVAL_SECONDS, 312 desc='Wait on association to finish') 313 assoc_result.association_time = time.time() - start_time 314 if not success: 315 assoc_result.failure_reason = 'Association timed out' 316 return assoc_result.serialize() 317 318 # Then wait for ip configuration to finish. 319 start_time = time.time() 320 success = utils.poll_for_condition( 321 condition=lambda: self._is_connected(assoc_params.ssid), 322 timeout=assoc_params.configuration_timeout, 323 sleep_interval=self.POLLING_INTERVAL_SECONDS, 324 desc='Wait for ip configuration to finish') 325 assoc_result.configuration_time = time.time() - start_time 326 if not success: 327 assoc_result.failure_reason = 'DHCP negotiation timed out' 328 return assoc_result.serialize() 329 330 assoc_result.success = True 331 logging.info('Connected to %s', assoc_params.ssid) 332 return assoc_result.serialize() 333 334 335 def disconnect(self, ssid): 336 """ 337 Disconnect from a WiFi network named |ssid|. 338 339 @param ssid string: name of network to disable in wpa_supplicant. 340 341 """ 342 logging.debug('disconnect()') 343 if ssid not in self._created_networks: 344 return False 345 self.run_wpa_cli_cmd('disable_network %d' % 346 self._created_networks[ssid]) 347 return True 348 349 350 def delete_entries_for_ssid(self, ssid): 351 """Delete a profile entry. 352 353 @param ssid string of WiFi service for which to delete entries. 354 @return True on success, False otherwise. 355 """ 356 return self.disconnect(ssid) 357 358 359 def set_device_enabled(self, wifi_interface, enabled): 360 """Enable or disable the WiFi device. 361 362 @param wifi_interface: string name of interface being modified. 363 @param enabled: boolean; true if this device should be enabled, 364 false if this device should be disabled. 365 @return True if it worked; false, otherwise 366 367 """ 368 return False 369 370 371 def sync_time_to(self, epoch_seconds): 372 """ 373 Sync time on the DUT to |epoch_seconds| from the epoch. 374 375 @param epoch_seconds float: number of seconds since the epoch. 376 377 """ 378 # This will claim to fail, but will work anyway. 379 self._host.run('date -u %f' % epoch_seconds, ignore_status=True) 380