xref: /aosp_15_r20/external/autotest/autotest_lib/contrib/suite_utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright 2021 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import argparse
8import ast
9from functools import partial
10import os
11import subprocess
12import graphviz
13import common
14
15from server.cros.dynamic_suite.control_file_getter import FileSystemGetter
16from server.cros.dynamic_suite.suite_common import retrieve_for_suite
17
class TestSuite(object):
    """A suite control file together with the tests collected for it."""

    def __init__(self, cf_object, name, file_path):
        """Store the suite's parsed control data, name and control file path.

        @param cf_object: parsed control file object describing the suite.
        @param name: name under which the suite is looked up.
        @param file_path: path of the suite's control file.
        """
        self.name, self.cf_object = name, cf_object
        # Filled in later through add_test(); starts out empty.
        self.tests = list()
        self.file_path = file_path

    def add_test(self, test_object):
        """Append one entry to the suite's test list.

        @param test_object: the entry to record. TestManager in this file
                passes the test's name here.
        """
        self.tests += [test_object]

    def get_tests(self):
        """Return the list of entries added so far (the live list, no copy)."""
        return self.tests
30
31
class TestObject(object):
    """A single (non-suite) control file.

    For control files classified as 'tast', the object can also extract the
    test expressions handed to job.run_test() and expand them into concrete
    test names by invoking the `tast list` command.
    """

    def __init__(self, cf_object, file_path):
        """Record the control data and classify the test.

        @param cf_object: parsed control file object; its `name` and
                `attributes` members are read by this class.
        @param file_path: path of the control file on disk.
        """
        self.name = cf_object.name
        # Any control file with 'tast' in its name is classified as 'tast'.
        self.type = 'tast' if ('tast' in self.name) else 'tauto'
        self.cf_object = cf_object
        self.file_path = file_path
        self.tast_exprs = ''
        self.tast_string = ''

    def get_attributes(self):
        """Return the `attributes` member of the control file object."""
        return self.cf_object.attributes

    def is_tast(self):
        """Return True when this test was classified as 'tast'."""
        return self.type == 'tast'

    @staticmethod
    def _iter_job_run_test_calls(module):
        """Yield every `job.run_test(...)` call statement inside `run()`.

        Only expression statements directly in the body of a top-level
        function named 'run' are considered.

        @param module: ast.Module produced by ast.parse().
        """
        for node in module.body:
            if not (isinstance(node, ast.FunctionDef) and node.name == 'run'):
                continue
            for stmt in node.body:
                if not isinstance(stmt, ast.Expr):
                    continue
                call = stmt.value
                if not isinstance(call, ast.Call):
                    continue
                func = call.func
                # Explicit shape checks replace the old bare `except:`, so
                # unrelated errors are no longer silently swallowed and a
                # call such as job[x](...) cannot raise AttributeError.
                if (isinstance(func, ast.Attribute)
                            and isinstance(func.value, ast.Name)
                            and func.value.id == 'job'
                            and func.attr == 'run_test'):
                    yield call

    def _join_test_exprs(self, elements):
        """Combine the literal strings of a `test_exprs` list into one string.

        Collection stops, with a warning, at the first element that is not a
        plain string literal; elements seen before it are still used.

        @param elements: AST nodes of the list elements.
        @return: the expressions joined by ' ' if any of them contains '(',
                otherwise joined by ', '.
        """
        test_exprs = []
        for elem in elements:
            # ast.Constant.value is used instead of the deprecated `.s`
            # alias, which newer Python releases no longer provide.
            if not (isinstance(elem, ast.Constant)
                    and isinstance(elem.value, str)):
                print('WARNING: Non-standard test found, check '
                      + self.file_path + ' manually')
                break
            test_exprs.append(elem.value)
        has_paren_expr = any('(' in expr for expr in test_exprs)
        separator = ' ' if has_paren_expr else ', '
        return separator.join(test_exprs)

    # use the python syntax tree library to parse the run function
    # and grab the test_expr from the 'tast' run command
    def parse_cf_for_tast_string(self):
        """Set self.tast_string from the control file's job.run_test() call.

        The control file is parsed, never executed. When several matching
        calls or keywords exist, the last one wins. self.tast_string is left
        untouched if no `test_exprs=[...]` list keyword is found.
        """
        with open(self.file_path, 'r') as cf:
            mod = ast.parse(cf.read())
        for call in self._iter_job_run_test_calls(mod):
            for keyword in call.keywords:
                if (keyword.arg == 'test_exprs'
                            and isinstance(keyword.value, ast.List)):
                    self.tast_string = self._join_test_exprs(
                            keyword.value.elts)

    def enumerate_tast_from_test_expr(self):
        """Parse the control file and populate self.tast_exprs.

        self.tast_exprs becomes the list of ', '-separated expressions found
        in self.tast_string. Empty expressions are dropped, so a control
        file without expressions yields [] rather than [''] and no `tast
        list` call is later issued for an empty pattern.
        """
        self.parse_cf_for_tast_string()
        try:
            self.tast_exprs = [
                    expr for expr in self.tast_string.split(', ') if expr
            ]
        except AttributeError:
            print('WARNING: Non-standard test found, check ' +
                  self.file_path + ' manually')

    @staticmethod
    def _tast_list(dut, expr, extra_flags):
        """Run `tast list` once and return its non-empty output lines.

        @param dut: target passed to tast (converted with str()).
        @param expr: test expression passed to tast (converted with str()).
        @param extra_flags: list of flags inserted before the target.
        @raises subprocess.CalledProcessError: if tast exits non-zero.
        """
        cmd = ['tast', 'list'] + list(extra_flags) + [str(dut), str(expr)]
        output = subprocess.check_output(cmd, encoding='utf-8')
        return [line for line in output.split('\n') if line]

    def enumerate_tests_from_tast_exprs(self, dut):
        """Expand self.tast_exprs into test names using `tast list`.

        Each expression is listed twice: once with the default options and
        once with '-buildbundle=crosint'. Results are concatenated in that
        order and are not de-duplicated.

        @param dut: target passed to `tast list`.
        @return: list of test names as printed by tast.
        """
        tests = []
        print(self.tast_exprs)
        for expr in self.tast_exprs:
            for extra_flags in ([], ['-buildbundle=crosint']):
                tests.extend(self._tast_list(dut, expr, extra_flags))

        return tests

    def describe(self):
        """Return a short human-readable description of the test."""
        return 'test named ' + self.name + ' of type ' + self.type
121
122
class TestParser(object):
    """Turns control files found on disk into TestObject/TestSuite objects."""

    def get_all_test_objects(self, locations):
        """Load every control file under `locations` and sort it by kind.

        Control objects whose test_class is 'suite' become TestSuite
        entries; everything else becomes a TestObject. Tast tests have
        their test expressions extracted immediately.

        @param locations: value handed to FileSystemGetter (main() in this
                file passes a list of directory paths).
        @return: tuple (tests, suites), each a dict keyed by control name.
        """
        suites = {}
        tests = {}

        getter = FileSystemGetter(locations)
        # NOTE(review): the empty suite name presumably means "no filtering"
        # -- confirm against retrieve_for_suite.
        control_files = retrieve_for_suite(getter, '')
        for path, control in control_files.items():
            if control.test_class == 'suite':
                suites[control.name] = TestSuite(control, control.name, path)
                continue

            parsed_test = TestObject(control, path)
            tests[control.name] = parsed_test
            if parsed_test.is_tast():
                parsed_test.enumerate_tast_from_test_expr()

        return tests, suites
140
141
class TestManager(object):
    """Indexes tests and suites and answers queries about them.

    Query results are emitted through a list of log functions (print by
    default, optionally a CSV file appender).
    """

    # Number of test nodes grouped into one subgraph by graph_suite_named().
    _NODES_PER_RANK = 7

    def __init__(self):
        self.tests = {}
        self.suites = {}
        self.dut = None
        self.log_functions = [print]
        self.test_parser = TestParser()

    def log(self, log_text, *args):
        """Pass log_text (and any extra args) to every registered logger."""
        for fn in self.log_functions:
            fn(log_text, *args)

    def csv_logger(self, log_text, file_path):
        """Append log_text verbatim (no newline added) to file_path."""
        with open(file_path, 'a') as log:
            log.write(log_text)

    def register_csv_logger(self, file_path):
        """Start logging to file_path, replacing any existing file.

        The header line 'suite,test' is written immediately.
        """
        if os.path.exists(file_path):
            os.remove(file_path)
        print_to_csv = partial(self.csv_logger, file_path=file_path)
        self.log_functions.append(print_to_csv)
        print_to_csv('suite,test\n')

    def initialize_from_fs(self, locations):
        """Load self.tests and self.suites from control files on disk.

        @param locations: forwarded to TestParser.get_all_test_objects().
        """
        self.tests, self.suites = self.test_parser.get_all_test_objects(
                locations)

    def process_all_tests(self):
        """Add each test's name to every known suite listed in its attributes.

        Attributes that do not resolve to a known suite are ignored.
        """
        for test, test_object in self.tests.items():
            for suite in test_object.get_attributes():
                target_suite = self.find_suite_named(suite)
                if target_suite is not None:
                    target_suite.add_test(test)

    def set_dut(self, target):
        """Remember the DUT address used for tast enumeration."""
        self.dut = target

    def get_dut(self):
        """Return the DUT address.

        @raises AttributeError: if no DUT has been set.
        """
        if self.dut is None:
            raise AttributeError(
                    'DUT Address not set, please use the --dut flag to indicate the ip address of the DUT'
            )
        return self.dut

    def find_test_named(self, test_name):
        """Return the test stored under test_name, or None if unknown."""
        return self.tests.get(test_name)

    def find_suite_named(self, suite_name):
        """Return the suite for suite_name, or None if unknown.

        A leading 'suite.' or 'suite:' prefix is stripped before the lookup.
        """
        if suite_name.startswith(('suite.', 'suite:')):
            suite_name = suite_name[len('suite.'):]
        return self.suites.get(suite_name)

    def list_suite_named(self, suite_name, pretty=False):
        """List the tests of a suite, expanding tast tests via the DUT.

        Tast entries are replaced by the tests `tast list` reports for
        them, each prefixed with 'tast.'; other entries are listed by name.

        @param suite_name: suite to list (prefix handling as in
                find_suite_named()).
        @param pretty: if True return one 'suite_name,test' line per test
                as a single string instead of a list.
        @return: list of test names, or a string when pretty is True. For
                an unknown suite: [] or, when pretty, '\n'.
        @raises AttributeError: if a tast test must be expanded but no DUT
                is set.
        """
        suite = self.find_suite_named(suite_name)

        if suite is None:
            return '\n' if pretty else []

        suite_tests = []
        for test in suite.get_tests():
            test_object = self.tests[test]
            if test_object.is_tast():
                found_tests = test_object.enumerate_tests_from_tast_exprs(
                        self.get_dut())
                suite_tests.extend(
                        'tast.' + str(t) for t in found_tests if t != '')
            else:
                suite_tests.append(test)

        if pretty:
            return ''.join(
                    suite_name + ',' + str(test) + '\n'
                    for test in suite_tests)
        return suite_tests

    def gs_query_link(self, suite_name):
        """Return a dashboard URL filtered to the tests of suite_name."""
        test_names = ','.join(
                test for test in self.list_suite_named(suite_name)
                if test != '')

        return ''.join([
                'https://dashboards.corp.google.com/',
                '_86acf8a8_50a5_48e0_829e_fbf1033d3ac6',
                '?f=test_name:in:' + test_names,
                '&f=create_date_7_day_filter:in:Past%207%20Days',
                '&f=test_type:in:Tast,Autotest',
        ])

    def graph_suite_named(self, suite_name, dot_graph=None):
        """Build a graphviz graph linking suite_name to each of its tests.

        Tests are grouped into subgraphs of _NODES_PER_RANK nodes. The
        first test of each group is additionally chained to the first test
        of the previous group (or to the suite node for the first group).

        @param suite_name: suite to draw.
        @param dot_graph: existing graphviz.Digraph to draw into; a new one
                is created when omitted.
        @return: the graph that was drawn into.
        """
        suite_tests = self.list_suite_named(suite_name)

        if dot_graph is None:
            dot_graph = graphviz.Digraph(comment=suite_name)

        dot_graph.node(suite_name, suite_name)
        last_level = suite_name
        child_graph = None
        nodes_at_rank = 0

        for test_name in suite_tests:
            if nodes_at_rank == 0:
                child_graph = graphviz.Digraph()
                dot_graph.edge(last_level, test_name)
                last_level = test_name

            child_graph.node(test_name, test_name)
            dot_graph.edge(suite_name, test_name)

            nodes_at_rank += 1
            if nodes_at_rank == self._NODES_PER_RANK:
                dot_graph.subgraph(child_graph)
                # Forget the flushed group so it is not added a second
                # time after the loop when the last group is exactly full.
                child_graph = None
                nodes_at_rank = 0

        # Flush the final, partially filled group (if any).
        if child_graph is not None:
            dot_graph.subgraph(child_graph)

        return dot_graph

    def diff_test_suites(self, suite_a, suite_b):
        """Describe which tests suite_b adds to or lacks relative to suite_a.

        @return: two lines, 'Suite B (+)[...]' with tests only in suite_b
                and 'Suite B (-)[...]' with tests only in suite_a. Each
                list is sorted so the output is stable between runs.
        """
        suite_a_set = set(self.list_suite_named(suite_a))
        suite_b_set = set(self.list_suite_named(suite_b))
        added = sorted(suite_b_set - suite_a_set)
        removed = sorted(suite_a_set - suite_b_set)
        return 'Suite B (+)' + str(added) + '\n' + 'Suite B (-)' + str(removed)
285
286
def main(args):
    """Load all control files, then run every query selected in `args`.

    Queries are independent; each one whose argument is not None is run,
    in a fixed order, and its result is sent to the manager's loggers.

    @param args: parsed command line namespace with the attributes csv,
            dut, find_test, find_suite, list_suite, list_multiple_suites,
            diff, graph_suite and gs_dashboard.
    """
    manager = TestManager()

    # Control file locations are resolved relative to this script.
    here = os.path.dirname(os.path.abspath(__file__))
    search_roots = [
            here + relative for relative in ('/../test_suites',
                                             '/../server/site_tests',
                                             '/../client/site_tests')
    ]
    manager.initialize_from_fs(search_roots)
    manager.process_all_tests()

    if args.csv is not None:
        manager.register_csv_logger(args.csv)

    if args.dut is not None:
        manager.set_dut(args.dut)

    if args.find_test is not None:
        found_test = manager.find_test_named(args.find_test)
        manager.log('Queried test not found'
                    if found_test is None else found_test.file_path)

    if args.find_suite is not None:
        found_suite = manager.find_suite_named(args.find_suite)
        manager.log('Queried suite not found'
                    if found_suite is None else found_suite.file_path)

    if args.list_suite is not None:
        listing = manager.list_suite_named(args.list_suite, pretty=True)
        manager.log(listing)

    if args.list_multiple_suites is not None:
        for requested in args.list_multiple_suites:
            manager.log(manager.list_suite_named(requested, pretty=True))

    if args.diff is not None:
        report = manager.diff_test_suites(args.diff[0], args.diff[1])
        manager.log(report)

    if args.graph_suite is not None:
        # The rendered image path is relative to the current directory.
        suite_graph = manager.graph_suite_named(args.graph_suite)
        suite_graph.render('./suite_data/suite_viz.gv', format='png')

    if args.gs_dashboard is not None:
        manager.log(manager.gs_query_link(args.gs_dashboard))
325
326
if __name__ == '__main__':
    # Command line entry point. The DUT address given with --dut is what
    # tast enumeration uses. Flags are registered in table order.
    parser = argparse.ArgumentParser()
    for flag, options in (
            ('--csv', {
                    'help': 'supply csv file path for logging output'
            }),
            ('--diff', {
                    'nargs': 2,
                    'help': 'show diff between two suites. Ex: --diff bvt-tast-cq pvs-tast-cq'
            }),
            ('--find_test', {
                    'help': 'find control file for test_name'
            }),
            ('--find_suite', {
                    'help': 'find control file for suite_name'
            }),
            ('--graph_suite', {
                    'help': 'graph test dependencies of suite_name, will output to contrib/suite_data'
            }),
            ('--list_suite', {
                    'help': 'list units in suite_name'
            }),
            ('--list_multiple_suites', {
                    'nargs': '*',
                    'help': 'list units in suite_name_1 suite_name_2 suite_name_n'
            }),
            ('--dut', {
                    'help': 'ip address and port for tast enumeration'
            }),
            ('--gs_dashboard', {
                    'help': 'generate green stainless dashboard for suite_name'
            }),
    ):
        parser.add_argument(flag, **options)
    parsed_args = parser.parse_args()

    main(parsed_args)
360