xref: /aosp_15_r20/external/autotest/server/cros/network/wpa_mon.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from collections import namedtuple
7import os
8import re
9import time
10
class WpaMon(object):
    """
    wpa_supplicant event monitor.

    Used as a context manager. On entry it starts a background socat process
    on the host that connects a named pipe (used to send requests) to the
    wpa_supplicant control socket of the given interface, redirects socat's
    output to a log file, and sends ATTACH. On exit it sends DETACH and
    kills the socat process group. Events are parsed out of the log file
    with the regular expressions in EVENT_MATCH_DICT.
    """

    WPAS_CTRL_DIR = '/var/run/wpa_supplicant/'
    # File names created inside the per-session temporary directory.
    LOCAL_CTRL = 'local_ctrl'
    REQUEST_PIPE = 'request_pipe'
    WPAS_EVENT_LOG = 'wpa_event.log'

    # Event names as they are searched for in the log.
    CTRL_EVENT_DO_ROAM = 'CTRL-EVENT-DO-ROAM'
    CTRL_EVENT_SKIP_ROAM = 'CTRL-EVENT-SKIP-ROAM'
    CTRL_EVENT_DISCONNECTED = 'CTRL-EVENT-DISCONNECTED'
    CTRL_EVENT_SCAN_RESULTS = 'CTRL-EVENT-SCAN-RESULTS'
    CTRL_EVENT_BSS_ADDED = 'CTRL-EVENT-BSS-ADDED'

    # Regular expression suffixes appended to the event name. The capture
    # groups line up, in order, with the fields of the matching namedtuple.
    ROAM_MATCH = (r' cur_bssid=([\da-fA-F:]+) cur_freq=(\d+) '
                  r'cur_level=([\d-]+) cur_est=(\d+) '
                  r'sel_bssid=([\da-fA-F:]+) sel_freq=(\d+) '
                  r'sel_level=([\d-]+) sel_est=(\d+)')
    # locally_generated is optional; re.findall reports it as '' when absent.
    DISCONNECT_MATCH = (r' bssid=([\da-fA-F:]+) reason=(\d+)'
                        r'(?: locally_generated=(1))?')
    # A single empty group: findall yields '' per occurrence, which unpacks
    # to zero arguments for the field-less ScanResults tuple.
    SCAN_RESULTS_MATCH = r'()'
    BSS_ADDED_MATCH = r' ([\d]+) ([\da-fA-F:]+)'

    Roam = namedtuple('Roam',
                      ['cur_bssid', 'cur_freq', 'cur_level', 'cur_est',
                       'sel_bssid', 'sel_freq', 'sel_level', 'sel_est'])
    Disconnect = namedtuple('Disconnect', ['bssid', 'reason',
                                           'locally_generated'])
    ScanResults = namedtuple('ScanResults', [])
    Bss = namedtuple('Bss', ['id', 'bssid'])

    MatchFields = namedtuple('MatchFields', ['match_str', 'obj'])

    # Maps an event name to its regex suffix and the namedtuple built from
    # each match.
    EVENT_MATCH_DICT = {
        CTRL_EVENT_DO_ROAM: MatchFields(ROAM_MATCH, Roam),
        CTRL_EVENT_SKIP_ROAM: MatchFields(ROAM_MATCH, Roam),
        CTRL_EVENT_DISCONNECTED: MatchFields(DISCONNECT_MATCH, Disconnect),
        CTRL_EVENT_SCAN_RESULTS: MatchFields(SCAN_RESULTS_MATCH, ScanResults),
        CTRL_EVENT_BSS_ADDED: MatchFields(BSS_ADDED_MATCH, Bss),
    }

    def __init__(self, host, wifi_if):
        """
        @param host object: host to run commands on; must provide run() and
                get_tmp_dir().
        @param wifi_if string: name of the WiFi interface whose control
                socket lives under WPAS_CTRL_DIR.
        """
        self._host = host
        self._dest = os.path.join(self.WPAS_CTRL_DIR, wifi_if)
        self._pgid = None
        self._started = False
        self._capture_index = 0

    def __enter__(self):
        """
        Connect to wpa_supplicant control interface.

        @return self, attached to the control interface.
        """
        # A temporary directory nested inside another temporary directory.
        tmp_dir = self._host.get_tmp_dir()
        tmp_dir = self._host.get_tmp_dir(parent=tmp_dir)
        # Relax permissions for tmp_dir so that socat (run as wpa user)
        # can create files in this directory.
        self._host.run('chmod 777 %s' % tmp_dir)
        local = os.path.join(tmp_dir, self.LOCAL_CTRL)
        self._pipe = os.path.join(tmp_dir, self.REQUEST_PIPE)
        self._log_path = os.path.join(tmp_dir, self.WPAS_EVENT_LOG)
        # Run socat as wpa user so that the socket we bind to can be written to
        # by wpa_supplicant. We use a `tail -f` on a named pipe to send requests
        # to wpa_supplicant because `tail -f` continues to read even after it
        # encounters an EOF. Using `cat` or the PIPE address type would close
        # the input stream after the first write, instructing socat to tear
        # everything else down.
        command = (r"nohup sudo -u wpa -g wpa socat SYSTEM:'mkfifo %s; "
                   r"tail -f %s'\!\!STDOUT UNIX-CONNECT:%s,type=2,bind=%s "
                   r"</dev/null >%s 2>&1 & echo $!" %
                   (self._pipe, self._pipe, self._dest, local,
                    self._log_path))
        # `echo $!` prints the PID of the backgrounded command on the first
        # line of output.
        out_lines = self._host.run(command).stdout.splitlines()
        pid = int(out_lines[0])
        self._pgid = \
            int(self._host.run('ps -p %d -o pgid=' % pid).stdout.strip())
        self._capture_index = 0
        try:
            self._start()
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so clean up the
            # background socat process group here instead of leaking it.
            self._host.run('kill -- -%d' % self._pgid)
            self._pgid = None
            raise
        return self

    def __exit__(self, exception, value, traceback):
        """Disconnect from wpa_supplicant control interface."""
        try:
            self._stop()
        finally:
            # Tear down even if DETACH failed, otherwise the background
            # socat process would be left running on the host.
            self._started = False
            if self._pgid is not None:
                # socat spawns a subprocess with the SYSTEM address type, so
                # we must kill the process group in order to properly clean
                # up.
                self._host.run('kill -- -%d' % self._pgid)
            self._pgid = None
            self._capture_index = 0

    def _start(self):
        """
        Attach to the wpa_supplicant control interface to start subscribing to
        events.

        @return False if already attached, True otherwise.
        """
        if self._started:
            return False
        self._request('ATTACH')
        self._started = True
        return True

    def _stop(self):
        """
        Detach from the wpa_supplicant control interface to no longer receive
        events.

        @return False if not currently attached, True otherwise.
        """
        if not self._started:
            return False
        self._request('DETACH')
        self._started = False
        return True

    def _request(self, cmd):
        """
        Send a request to the control interface by writing to the named pipe.

        We use the -n option because wpa_supplicant expects there to be no
        newline character after the command.

        @param cmd string: command to run
        """
        self._host.run('echo -n "%s" > %s' % (cmd, self._pipe))

    def get_log_entries(self):
        """
        Get all event log entries and command replies.

        @return string event log, with trailing whitespace stripped.
        """
        return self._host.run('cat %s' % self._log_path).stdout.rstrip()

    def start_event_capture(self):
        """
        Set _capture_index to mark the point in the logs at which an event
        capture was started.
        """
        self._capture_index = len(self.get_log_entries())

    def wait_for_event(self, event, timeout=10, sleep_interval=1.0,
                       attrs=None):
        """
        Wait for a wpa_supplicant event. start_event_capture should be called
        before this.

        Only events logged after the last start_event_capture call are
        considered.

        @param event string: the wpa_supplicant event to wait for.
        @param timeout int: timeout in seconds.
        @param sleep_interval float: sleep interval in seconds.
        @param attrs dict: optional mapping of namedtuple field name to the
                value an event must have to be returned. Parsed field values
                are strings.
        @return list of namedtuples of all matching event occurrences, or an
                empty list on timeout.
        """
        start_time = time.time()
        while True:
            objs = self.get_events(event, True, attrs)
            if objs:
                return objs
            # Give up now if the next sleep would run past the deadline.
            if time.time() + sleep_interval - start_time > timeout:
                return []
            time.sleep(sleep_interval)

    def get_events(self, event, captured_events=False, attrs=None):
        """
        Get all wpa_supplicant events of type |event|.

        @param event string: the wpa_supplicant event to get; must be a key
                of EVENT_MATCH_DICT.
        @param captured_events boolean: True to get events starting from the
        last start_event_capture call, False to get all events.
        @param attrs dict: optional mapping of namedtuple field name to the
                value an event must have to be returned. Parsed field values
                are strings.
        @return list of namedtuples corresponding to the event.
        """
        fields = self.EVENT_MATCH_DICT[event]
        wpa_log = self.get_log_entries()
        if captured_events:
            wpa_log = wpa_log[self._capture_index:]
        wanted = attrs or {}
        objs = []
        # re.findall is used deliberately: it reports unmatched optional
        # groups as '' and, for single-group patterns, returns plain strings.
        # SCAN_RESULTS_MATCH depends on the latter ('' unpacks to no args).
        for match in re.findall(event + fields.match_str, wpa_log):
            obj = fields.obj(*match)
            if any(getattr(obj, attr) != val
                   for attr, val in wanted.items()):
                continue
            objs.append(obj)
        return objs
192