xref: /aosp_15_r20/external/autotest/server/cros/chaos_lib/chaos_analyzer.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2# Lint as: python2, python3
3# Copyright 2015 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7
8
9import argparse
10import os
11import re
12
13from . import chaos_capture_analyzer
14from . import chaos_log_analyzer
15
class ChaosTestInfo(object):
    """
    Class to gather the relevant test information from a folder.

    Collects the paths of the packet capture traces found directly in the
    folder, the |messages| and |net.log| files found under any "sysinfo"
    folder below it, and the test debug log found under any "debug" folder
    below it. The DUT MAC, AP SSID and AP BSSIDs are parsed out of the test
    debug log.
    """

    MESSAGES_FILE_NAME = "messages"
    NET_LOG_FILE_NAME = "net.log"
    TEST_DEBUG_LOG_FILE_END = "DEBUG"
    SYSINFO_FOLDER_NAME_END = "sysinfo"
    TEST_DEBUG_FOLDER_NAME_END = "debug"

    # Characters that may surround an address inside the stringified
    # 'AP Info' dictionary once the single quotes have been removed.
    _ADDRESS_STRIP_CHARS = ' \t\r\n"{}'

    def __init__(self, dir_name, file_names, failures_only):
        """
        Gathers all the relevant Chaos test results from a given folder.

        @param dir_name: Folder to check for test results.
        @param file_names: Files present in the folder found during os.walk.
        @param failures_only: Flag to indicate whether to analyze only
                              failure test attempts.

        """
        self._meta_info = None
        self._traces = []
        self._message_log = None
        self._net_log = None
        self._test_debug_log = None
        for file_name in file_names:
            if not file_name.endswith('.trc'):
                continue
            if failures_only and 'success' in os.path.basename(file_name):
                continue
            self._traces.append(os.path.join(dir_name, file_name))
        if not self._traces:
            # Without any trace there is nothing to analyze, so do not
            # bother looking for the logs.
            return
        # Distinct loop variable names are used on purpose: the nested walks
        # in the helpers must not rebind the |root| tested here.
        for root, _, walked_file_names in os.walk(dir_name):
            if root.endswith(self.SYSINFO_FOLDER_NAME_END):
                self._find_sysinfo_logs(root, walked_file_names)
            if root.endswith(self.TEST_DEBUG_FOLDER_NAME_END):
                self._find_test_debug_log(root)

    def _find_sysinfo_logs(self, sysinfo_dir, direct_file_names):
        """
        Records the |messages| and |net.log| paths of a sysinfo folder.

        @param sysinfo_dir: Path of the sysinfo folder.
        @param direct_file_names: Names of the files directly inside
                                  |sysinfo_dir|.

        """
        # There are multiple copies of |messages| file under
        # sysinfo tree. We only want the one directly in sysinfo.
        if self.MESSAGES_FILE_NAME in direct_file_names:
            self._message_log = os.path.join(sysinfo_dir,
                                             self.MESSAGES_FILE_NAME)
        # |net.log| may live anywhere below sysinfo; the last one found wins.
        for sub_dir, _, sub_file_names in os.walk(sysinfo_dir):
            if self.NET_LOG_FILE_NAME in sub_file_names:
                self._net_log = os.path.join(sub_dir, self.NET_LOG_FILE_NAME)

    def _find_test_debug_log(self, debug_dir):
        """
        Records the test debug log of a debug folder and parses its meta info.

        If several matching files exist, the last one found wins for both the
        log path and the parsed meta info.

        @param debug_dir: Path of the debug folder.

        """
        for sub_dir, _, sub_file_names in os.walk(debug_dir):
            for file_name in sub_file_names:
                if file_name.endswith(self.TEST_DEBUG_LOG_FILE_END):
                    debug_log = os.path.join(sub_dir, file_name)
                    self._test_debug_log = debug_log
                    self._parse_meta_info(debug_log)

    def _parse_meta_info(self, file):
        """
        Parses the DUT MAC, AP SSID and AP BSSIDs from a test debug log.

        Any previously parsed meta info is discarded.

        @param file: Path of the test debug log to parse.

        """
        dut_mac_prefix = '\'DUT\': '
        ap_bssid_prefix = '\'AP Info\': '
        ap_ssid_prefix = '\'SSID\': '
        self._meta_info = {}
        with open(file) as infile:
            for line in infile:
                line = line.strip()
                if line.startswith(dut_mac_prefix):
                    dut_mac = line[len(dut_mac_prefix):]
                    self._meta_info['dut_mac'] = (
                        dut_mac.replace('\'', '').replace(',', ''))
                if line.startswith(ap_ssid_prefix):
                    ap_ssid = line[len(ap_ssid_prefix):]
                    self._meta_info['ap_ssid'] = (
                        ap_ssid.replace('\'', '').replace(',', ''))
                if line.startswith(ap_bssid_prefix):
                    debug_info = self._parse_debug_info(line)
                    if debug_info:
                        self._meta_info.update(debug_info)

    def _parse_debug_info(self, line):
        """
        Extracts the AP BSSIDs from the 'AP Info' line of a test debug log.

        @param line: The 'AP Info' line.

        @return Dictionary with the list of BSSIDs under 'ap_bssids', or
                None if the line holds no usable address.

        """
        # Example output:
        #'AP Info': "{'2.4 GHz MAC Address': '84:1b:5e:e9:74:ee', \n
        #'5 GHz MAC Address': '84:1b:5e:e9:74:ed', \n
        #'Controller class': 'Netgear3400APConfigurator', \n
        #'Hostname': 'chromeos3-row2-rack2-host12', \n
        #'Router name': 'wndr 3700 v3'}",
        debug_info = line.replace('\'', '')
        address_label = 'Address: '
        bssids = []
        for part in debug_info.split(','):
            address_index = part.find(address_label)
            if address_index < 0:
                continue
            address = part[(address_index + len(address_label)):]
            # An address that is the last entry of the dictionary is still
            # followed by the closing |}"|; drop it so that the BSSID is
            # clean and the 'N/A' check below still matches.
            address = address.strip(self._ADDRESS_STRIP_CHARS)
            if address and address != 'N/A':
                bssids.append(address)
        if not bssids:
            return None
        return {'ap_bssids': bssids}

    def _is_meta_info_valid(self):
        """Returns True if the DUT MAC, AP SSID and AP BSSIDs were parsed."""
        return ((self._meta_info is not None) and
                ('dut_mac' in self._meta_info) and
                ('ap_ssid' in self._meta_info) and
                ('ap_bssids' in self._meta_info))

    @property
    def traces(self):
        """Returns the trace files path in test info."""
        return self._traces

    @property
    def message_log(self):
        """Returns the message log path in test info."""
        return self._message_log

    @property
    def net_log(self):
        """Returns the net log path in test info."""
        return self._net_log

    @property
    def test_debug_log(self):
        """Returns the test debug log path in test info."""
        return self._test_debug_log

    # The three properties below index into the parsed meta info and fail if
    # it is missing, so they should only be read once is_valid() is True.

    @property
    def bssids(self):
        """Returns the BSSID of the AP in test info."""
        return self._meta_info['ap_bssids']

    @property
    def ssid(self):
        """Returns the SSID of the AP in test info."""
        return self._meta_info['ap_ssid']

    @property
    def dut_mac(self):
        """Returns the MAC of the DUT in test info."""
        return self._meta_info['dut_mac']

    def is_valid(self, packet_capture_only):
        """
        Checks if the given folder contains a valid Chaos test results.

        @param packet_capture_only: Flag to indicate whether to analyze only
                                    packet captures.

        @return True if valid chaos results are found; False otherwise.

        """
        # The meta info and at least one trace are always required.
        if not (self._is_meta_info_valid() and self._traces):
            return False
        if packet_capture_only:
            return True
        # The log analysis additionally needs both system logs.
        return bool(self._message_log) and bool(self._net_log)
168
169
class ChaosLogger(object):
    """ Class that writes the analysis, line by line, to a given output. """

    LOG_SECTION_DEMARKER = "--------------------------------------"

    def __init__(self, output):
        """
        Remembers the output to which all the log lines are written.

        @param output: Object whose write() method receives the log text.

        """
        self._output = output

    def log_to_output_file(self, log_msg):
        """
        Writes the provided string, followed by a newline, to the output.

        @param log_msg: String to print to the output file.

        """
        line = log_msg + "\n"
        self._output.write(line)

    def log_start_section(self, section_description):
        """
        Opens a new section: the description framed by two demarker lines.

        @param section_description: String to print in section description.

        """
        demarker = self.LOG_SECTION_DEMARKER
        for line in (demarker, section_description, demarker):
            self.log_to_output_file(line)
197
198
class ChaosAnalyzer(object):
    """ Main Class to analyze the chaos test output from a given folder. """

    LOG_OUTPUT_FILE_NAME_FORMAT = "chaos_analyzer_try_%s.log"
    TRACE_FILE_ATTEMPT_NUM_RE = r'\d+'

    def _get_attempt_number_from_trace(self, trace):
        """
        Extracts the attempt number from the name of a trace file.

        @param trace: Path of the trace file.

        @return The first run of digits in the file name as a string, or
                None if the file name holds no digits.

        """
        file_name = os.path.basename(trace)
        match = re.search(self.TRACE_FILE_ATTEMPT_NUM_RE, file_name)
        if match is None:
            return None
        return match.group(0)

    def _get_all_test_infos(self, dir_name, failures_only, packet_capture_only):
        """
        Collects the valid test infos from every folder below |dir_name|.

        @param dir_name: Directory which contains the chaos test results.
        @param failures_only: Flag to indicate whether to analyze only
                              failure test attempts.
        @param packet_capture_only: Flag to indicate whether to analyze only
                                    packet captures.

        @return List of the valid ChaosTestInfo objects; may be empty.

        """
        test_infos = []
        for root, _, files in os.walk(dir_name):
            test_info = ChaosTestInfo(root, files, failures_only)
            if test_info.is_valid(packet_capture_only):
                test_infos.append(test_info)
        if not test_infos:
            print("Did not find any valid test info!")
        return test_infos

    def _get_output_file_path(self, trace_dir_name, attempt_num,
                              output_dir_name):
        """
        Builds the path of the analysis output file for one attempt.

        @param trace_dir_name: Folder which holds the analyzed trace.
        @param attempt_num: Attempt number of the analyzed trace.
        @param output_dir_name: Explicit output directory, or None/empty to
                                store the analysis next to the trace.

        @return Path of the analysis output file.

        """
        output_file_name = self.LOG_OUTPUT_FILE_NAME_FORMAT % (attempt_num)
        if not output_dir_name:
            return os.path.join(trace_dir_name, output_file_name)
        # Only the name of the trace folder is prepended. Prepending the
        # whole folder path would put path separators in the file name, and
        # os.path.join() would then either point into sub-folders that do
        # not exist or, for an absolute path, drop |output_dir_name|.
        test_name = os.path.basename(os.path.abspath(trace_dir_name))
        return os.path.join(output_dir_name,
                            "_".join([test_name, output_file_name]))

    def _analyze_trace(self, test_info, trace, attempt_num, output_file_path,
                       packet_capture_only):
        """
        Runs the capture (and optionally log) analysis for a single trace.

        @param test_info: ChaosTestInfo the trace belongs to.
        @param trace: Path of the trace file to analyze.
        @param attempt_num: Attempt number of the trace.
        @param output_file_path: File to which the analysis is written.
        @param packet_capture_only: Flag to indicate whether to analyze only
                                    packet captures.

        """
        with open(output_file_path, "w") as output_file:
            logger = ChaosLogger(output_file)
            protocol_analyzer = chaos_capture_analyzer.ChaosCaptureAnalyzer(
                    test_info.bssids, test_info.ssid, test_info.dut_mac,
                    logger)
            protocol_analyzer.analyze(trace)
            if packet_capture_only:
                return
            with open(test_info.message_log, "r") as message_log, \
                 open(test_info.net_log, "r") as net_log:
                log_analyzer = chaos_log_analyzer.ChaosLogAnalyzer(
                        message_log, net_log, logger)
                log_analyzer.analyze(attempt_num)

    def analyze(self, input_dir_name=None, output_dir_name=None,
                failures_only=False, packet_capture_only=False):
        """
        Starts the analysis of the Chaos test logs and packet capture.

        @param input_dir_name: Directory which contains the chaos test results.
                               Defaults to the current directory.
        @param output_dir_name: Directory to which the chaos analysis is output.
        @param failures_only: Flag to indicate whether to analyze only
                              failure test attempts.
        @param packet_capture_only: Flag to indicate whether to analyze only
                                    packet captures.

        """
        if input_dir_name is None:
            # os.walk() cannot handle None; fall back to the same default
            # as the command line.
            input_dir_name = '.'
        for test_info in self._get_all_test_infos(input_dir_name, failures_only,
                                                  packet_capture_only):
            for trace in test_info.traces:
                attempt_num = self._get_attempt_number_from_trace(trace)
                if attempt_num is None:
                    print("Skipping trace without an attempt number: " + trace)
                    continue
                trace_dir_name = os.path.dirname(trace)
                print("Analyzing attempt number: " + attempt_num +
                      " from folder: " + os.path.abspath(trace_dir_name))
                # Store the analysis output in the respective log folder
                # itself unless there is an explicit output directory
                # specified in which case we prepend the |testname_| to the
                # output analysis file name.
                output_file_path = self._get_output_file_path(
                        trace_dir_name, attempt_num, output_dir_name)
                try:
                    self._analyze_trace(test_info, trace, attempt_num,
                                        output_file_path, packet_capture_only)
                except IOError as e:
                    print('Operation failed: %s!' % e.strerror)
270
271
def main():
    """Parses the command line options and starts the Chaos analysis."""
    # With no options given, every set of logs placed under the current
    # directory is parsed and each analysis is written into the directory
    # of the logs it was produced from.
    arg_parser = argparse.ArgumentParser(description='Analyze Chaos logs.')
    boolean_flags = (
            ('-f', '--failures-only', 'analyze only failure logs.'),
            ('-p', '--packet-capture-only', 'analyze only packet captures.'),
    )
    for short_flag, long_flag, help_text in boolean_flags:
        arg_parser.add_argument(short_flag, long_flag, action='store_true',
                                help=help_text)
    arg_parser.add_argument('-i', '--input-dir', action='store', default='.',
                            help='process the logs from directory.')
    arg_parser.add_argument('-o', '--output-dir', action='store',
                            help='output the analysis to directory.')
    options = arg_parser.parse_args()
    ChaosAnalyzer().analyze(
            input_dir_name=options.input_dir,
            output_dir_name=options.output_dir,
            failures_only=options.failures_only,
            packet_capture_only=options.packet_capture_only)


# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
294
295