xref: /aosp_15_r20/external/autotest/server/cros/network/wpa_cli_proxy.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import collections
7import logging
8import re
9import time
10
11import six
12
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.common_lib import utils
15from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
16
17
18# Used to represent stations we parse out of scan results.
19Station = collections.namedtuple('Station',
20                                 ['bssid', 'frequency', 'signal', 'ssid'])
21
22class WpaCliProxy(object):
23    """Interacts with a DUT through wpa_cli rather than shill."""
24
25    SCANNING_INTERVAL_SECONDS = 5
26    POLLING_INTERVAL_SECONDS = 0.5
27    # From wpa_supplicant.c:wpa_supplicant_state_txt()
28    WPA_SUPPLICANT_ASSOCIATING_STATES = (
29            'AUTHENTICATING',
30            'ASSOCIATING',
31            'ASSOCIATED',
32            '4WAY_HANDSHAKE',
33            'GROUP_HANDSHAKE')
34    WPA_SUPPLICANT_ASSOCIATED_STATES = (
35            'COMPLETED',)
36    ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}'
37    BRILLO_CMD_FORMAT = 'su system /system/bin/wpa_cli -i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}'
38    CROS_CMD_FORMAT = ('su wpa -s /bin/bash '
39                       '-c "/usr/bin/wpa_cli -i {0[ifname]} {0[cmd]}"')
40    CAST_CMD_FORMAT = '/system/bin/wpa_cli -i {0[ifname]} {0[cmd]}'
41    RPI_CMD_FORMAT = '/sbin/wpa_cli -i {0[ifname]} {0[cmd]}'
42
43
44    def __init__(self, host, wifi_if, RPi=False):
45        self._host = host
46        self._wifi_if = wifi_if
47        self._created_networks = {}
48        if RPi:
49            self._wpa_cli_cmd_format = self.RPI_CMD_FORMAT
50        # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions.
51        #             we'll need to discover this parameter as it becomes more
52        #             generally useful.
53        elif host.get_os_type() == 'android':
54            self._wpa_cli_cmd_format = self.ANDROID_CMD_FORMAT
55        elif host.get_os_type() == 'brillo':
56            self._wpa_cli_cmd_format = self.BRILLO_CMD_FORMAT
57        elif host.get_os_type() == 'cros':
58            self._wpa_cli_cmd_format = self.CROS_CMD_FORMAT
59        elif host.get_os_type() == 'cast_os':
60            self._wpa_cli_cmd_format = self.CAST_CMD_FORMAT
61
62
63    def _add_network(self, ssid):
64        """
65        Add a wpa_supplicant network for ssid.
66
67        @param ssid string: name of network to add.
68        @return int network id of added network.
69
70        """
71        add_result = self.run_wpa_cli_cmd('add_network', check_result=False)
72        network_id = int(add_result.stdout.splitlines()[-1])
73        self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' %
74                             (network_id, ssid))
75        self._created_networks[ssid] = network_id
76        logging.debug('Added network %s=%d', ssid, network_id)
77        return network_id
78
79
80    def run_wpa_cli_cmd(self, command, if_name=None, check_result=True):
81        """
82        Run a wpa_cli command and optionally check the result.
83
84        Note: if you're using this function to do things like initiating scans,
85        consider initating those through Shill instead, to avoid collisions.
86
87        @param command string: suffix of a command to be prefixed with
88                an appropriate wpa_cli for this host.
89        @param if_name string: interface name. The wifi interface (self._wifi_if)
90                would be used, if the if_name was not specified.
91        @param check_result bool: True iff we want to check that the
92                command comes back with an 'OK' response.
93        @return result object returned by host.run.
94
95        """
96        iface = if_name if if_name else self._wifi_if
97        cmd = self._wpa_cli_cmd_format.format({
98                'ifname': iface,
99                'cmd': command
100        })
101        result = self._host.run(cmd)
102        if check_result and not result.stdout.strip().endswith('OK'):
103            raise error.TestFail('wpa_cli command failed: %s' % command)
104
105        return result
106
107
108    def _get_status_dict(self):
109        """
110        Gets the status output for a WiFi interface.
111
112        Get the output of wpa_cli status.  This summarizes what wpa_supplicant
113        is doing with respect to the WiFi interface.
114
115        Example output:
116
117            Using interface 'wlan0'
118            wpa_state=INACTIVE
119            p2p_device_address=32:76:6f:f2:a6:c4
120            address=30:76:6f:f2:a6:c4
121
122        @return dict of key/value pairs parsed from output using = as divider.
123
124        """
125        status_result = self.run_wpa_cli_cmd('status', check_result=False)
126        return dict([line.strip().split('=', 1)
127                     for line in status_result.stdout.splitlines()
128                     if line.find('=') > 0])
129
130
131    def _is_associating_or_associated(self):
132        """@return True if the DUT is assocating or associated with a BSS."""
133        state = self._get_status_dict().get('wpa_state', None)
134        return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES +
135                         self.WPA_SUPPLICANT_ASSOCIATED_STATES)
136
137
138    def _is_associated(self, ssid):
139        """
140        Check if the DUT is associated to a given SSID.
141
142        @param ssid string: SSID of the network we're concerned about.
143        @return True if we're associated with the specified SSID.
144
145        """
146        status_dict = self._get_status_dict()
147        return (status_dict.get('ssid', None) == ssid and
148                status_dict.get('wpa_state', None) in
149                        self.WPA_SUPPLICANT_ASSOCIATED_STATES)
150
151
152    def _is_connected(self, ssid):
153        """
154        Check that we're connected to |ssid| and have an IP address.
155
156        @param ssid string: SSID of the network we're concerned about.
157        @return True if we have an IP and we're associated with |ssid|.
158
159        """
160        status_dict = self._get_status_dict()
161        return (status_dict.get('ssid', None) == ssid and
162                status_dict.get('ip_address', None))
163
164
165    def clean_profiles(self):
166        """Remove state associated with past networks we've connected to."""
167        # list_networks output looks like:
168        # Using interface 'wlan0'^M
169        # network id / ssid / bssid / flags^M
170        # 0    SimpleConnect_jstja_ch1 any     [DISABLED]^M
171        # 1    SimpleConnect_gjji2_ch6 any     [DISABLED]^M
172        # 2    SimpleConnect_xe9d1_ch11        any     [DISABLED]^M
173        list_networks_result = self.run_wpa_cli_cmd(
174                'list_networks', check_result=False)
175        start_parsing = False
176        for line in list_networks_result.stdout.splitlines():
177            if not start_parsing:
178                if line.startswith('network id'):
179                    start_parsing = True
180                continue
181
182            network_id = int(line.split()[0])
183            self.run_wpa_cli_cmd('remove_network %d' % network_id)
184        self._created_networks = {}
185
186
187    def create_profile(self, _):
188        """
189        This is a no op, since we don't have profiles.
190
191        @param _ ignored.
192
193        """
194        logging.info('Skipping create_profile on %s', self.__class__.__name__)
195
196
197    def pop_profile(self, _):
198        """
199        This is a no op, since we don't have profiles.
200
201        @param _ ignored.
202
203        """
204        logging.info('Skipping pop_profile on %s', self.__class__.__name__)
205
206
207    def push_profile(self, _):
208        """
209        This is a no op, since we don't have profiles.
210
211        @param _ ignored.
212
213        """
214        logging.info('Skipping push_profile on %s', self.__class__.__name__)
215
216
217    def remove_profile(self, _):
218        """
219        This is a no op, since we don't have profiles.
220
221        @param _ ignored.
222
223        """
224        logging.info('Skipping remove_profile on %s', self.__class__.__name__)
225
226
227    def init_test_network_state(self):
228        """Create a clean slate for tests with respect to remembered networks.
229
230        For wpa_cli hosts, this means removing all remembered networks.
231
232        @return True iff operation succeeded, False otherwise.
233
234        """
235        self.clean_profiles()
236        return True
237
238
239    def connect_wifi(self, assoc_params):
240        """
241        Connect to the WiFi network described by AssociationParameters.
242
243        @param assoc_params AssociationParameters object.
244        @return serialized AssociationResult object.
245
246        """
247        logging.debug('connect_wifi()')
248        # Ouptut should look like:
249        #   Using interface 'wlan0'
250        #   0
251        assoc_result = xmlrpc_datatypes.AssociationResult()
252        network_id = self._add_network(assoc_params.ssid)
253        if assoc_params.is_hidden:
254            self.run_wpa_cli_cmd('set_network %d %s %s' %
255                                 (network_id, 'scan_ssid', '1'))
256
257        sec_config = assoc_params.security_config
258        for field, value in six.iteritems(sec_config.get_wpa_cli_properties()):
259            self.run_wpa_cli_cmd('set_network %d %s %s' %
260                                 (network_id, field, value))
261        self.run_wpa_cli_cmd('select_network %d' % network_id)
262
263        # Wait for an appropriate BSS to appear in scan results.
264        scan_results_pattern = '\t'.join(['([0-9a-f:]{17})', # BSSID
265                                          '([0-9]+)',  # Frequency
266                                          '(-[0-9]+)',  # Signal level
267                                          '(.*)',  # Encryption types
268                                          '(.*)'])  # SSID
269        last_scan_time = -1.0
270        start_time = time.time()
271        while time.time() - start_time < assoc_params.discovery_timeout:
272            assoc_result.discovery_time = time.time() - start_time
273            if self._is_associating_or_associated():
274                # Internally, wpa_supplicant writes its scan_results response
275                # to a 4kb buffer.  When there are many BSS's, the buffer fills
276                # up, and we'll never see the BSS we care about in some cases.
277                break
278
279            scan_result = self.run_wpa_cli_cmd('scan_results',
280                                               check_result=False)
281            found_stations = []
282            for line in scan_result.stdout.strip().splitlines():
283                match = re.match(scan_results_pattern, line)
284                if match is None:
285                    continue
286                found_stations.append(
287                        Station(bssid=match.group(1), frequency=match.group(2),
288                                signal=match.group(3), ssid=match.group(5)))
289            logging.debug('Found stations: %r',
290                          [station.ssid for station in found_stations])
291            if [station for station in found_stations
292                    if station.ssid == assoc_params.ssid]:
293                break
294
295            if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS:
296                # Sometimes this might fail with a FAIL-BUSY if the previous
297                # scan hasn't finished.
298                scan_result = self.run_wpa_cli_cmd('scan', check_result=False)
299                if scan_result.stdout.strip().endswith('OK'):
300                    last_scan_time = time.time()
301            time.sleep(self.POLLING_INTERVAL_SECONDS)
302        else:
303            assoc_result.failure_reason = 'Discovery timed out'
304            return assoc_result.serialize()
305
306        # Wait on association to finish.
307        start_time = time.time()
308        success = utils.poll_for_condition(
309                condition=lambda: self._is_associated(assoc_params.ssid),
310                timeout=assoc_params.association_timeout,
311                sleep_interval=self.POLLING_INTERVAL_SECONDS,
312                desc='Wait on association to finish')
313        assoc_result.association_time = time.time() - start_time
314        if not success:
315            assoc_result.failure_reason = 'Association timed out'
316            return assoc_result.serialize()
317
318        # Then wait for ip configuration to finish.
319        start_time = time.time()
320        success = utils.poll_for_condition(
321                condition=lambda: self._is_connected(assoc_params.ssid),
322                timeout=assoc_params.configuration_timeout,
323                sleep_interval=self.POLLING_INTERVAL_SECONDS,
324                desc='Wait for ip configuration to finish')
325        assoc_result.configuration_time = time.time() - start_time
326        if not success:
327            assoc_result.failure_reason = 'DHCP negotiation timed out'
328            return assoc_result.serialize()
329
330        assoc_result.success = True
331        logging.info('Connected to %s', assoc_params.ssid)
332        return assoc_result.serialize()
333
334
335    def disconnect(self, ssid):
336        """
337        Disconnect from a WiFi network named |ssid|.
338
339        @param ssid string: name of network to disable in wpa_supplicant.
340
341        """
342        logging.debug('disconnect()')
343        if ssid not in self._created_networks:
344            return False
345        self.run_wpa_cli_cmd('disable_network %d' %
346                             self._created_networks[ssid])
347        return True
348
349
350    def delete_entries_for_ssid(self, ssid):
351        """Delete a profile entry.
352
353        @param ssid string of WiFi service for which to delete entries.
354        @return True on success, False otherwise.
355        """
356        return self.disconnect(ssid)
357
358
359    def set_device_enabled(self, wifi_interface, enabled):
360        """Enable or disable the WiFi device.
361
362        @param wifi_interface: string name of interface being modified.
363        @param enabled: boolean; true if this device should be enabled,
364                false if this device should be disabled.
365        @return True if it worked; false, otherwise
366
367        """
368        return False
369
370
371    def sync_time_to(self, epoch_seconds):
372        """
373        Sync time on the DUT to |epoch_seconds| from the epoch.
374
375        @param epoch_seconds float: number of seconds since the epoch.
376
377        """
378        # This will claim to fail, but will work anyway.
379        self._host.run('date -u %f' % epoch_seconds, ignore_status=True)
380