#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright 2021 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import argparse import ast from functools import partial import os import subprocess import graphviz import common from server.cros.dynamic_suite.control_file_getter import FileSystemGetter from server.cros.dynamic_suite.suite_common import retrieve_for_suite class TestSuite(object): def __init__(self, cf_object, name, file_path): self.name = name self.cf_object = cf_object self.tests = [] self.file_path = file_path def add_test(self, test_object): self.tests.append(test_object) def get_tests(self): return self.tests class TestObject(object): def __init__(self, cf_object, file_path): self.name = cf_object.name self.type = 'tast' if ('tast' in self.name) else 'tauto' self.cf_object = cf_object self.file_path = file_path self.tast_exprs = '' self.tast_string = '' def get_attributes(self): return self.cf_object.attributes def is_tast(self): return self.type == 'tast' # use the python syntax tree library to parse the run function # and grab the test_expr from the 'tast' run command def parse_cf_for_tast_string(self): with open(self.file_path, 'r') as cf: mod = ast.parse(cf.read()) for n in mod.body: if n.__class__ != ast.FunctionDef: continue if n.name != 'run': continue for sub_node in n.body: if sub_node.__class__ != ast.Expr: continue try: fn_name = sub_node.value.func.value.id if fn_name != 'job': continue except: continue if sub_node.value.func.attr != 'run_test': continue for keyword in sub_node.value.keywords: if keyword.arg == 'test_exprs' and keyword.value.__class__ == ast.List: test_exprs = [] regex_list = False for elem in keyword.value.elts: try: test_exprs.append(elem.s) regex_list = ('(' in elem.s or regex_list) except AttributeError: print('WARNING: Non-standard test found, check ' + self.file_path + ' manually') break if regex_list: self.tast_string = ' '.join(test_exprs) else: for it in range(len(test_exprs) - 1): test_exprs[it] = test_exprs[it] + ',' self.tast_string = ' '.join(test_exprs) def enumerate_tast_from_test_expr(self): self.parse_cf_for_tast_string() try: self.tast_exprs = self.tast_string.split(', ') except AttributeError: print('WARNING: Non-standard test found, check' + self.file_path + ' manually') def enumerate_tests_from_tast_exprs(self, dut): tests = [] print(self.tast_exprs) for expr in self.tast_exprs: en = subprocess.check_output( ['tast', 'list', str(dut), str(expr)], encoding='utf-8') for t in en.split('\n'): if t == '': continue tests.append(t) en = subprocess.check_output([ 'tast', 'list', '-buildbundle=crosint', str(dut), str(expr) ], encoding='utf-8') for t in en.split('\n'): if t == '': continue tests.append(t) return tests def describe(self): return 'test named ' + self.name + ' of type ' + self.type class TestParser(object): def get_all_test_objects(self, locations): tests = {} suites = {} cf_getter = FileSystemGetter(locations) for (file_path, cf_object) in retrieve_for_suite(cf_getter, '').items(): if cf_object.test_class == 'suite': suites[cf_object.name] = (TestSuite(cf_object, cf_object.name, file_path)) else: tests[cf_object.name] = (TestObject(cf_object, file_path)) if tests[cf_object.name].is_tast(): tests[cf_object.name].enumerate_tast_from_test_expr() return tests, suites class TestManager(object): def __init__(self): self.tests = {} self.suites = {} self.dut = None self.log_functions = [partial(print)] self.test_parser = TestParser() def log(self, log_text, *args): for fn in self.log_functions: fn(log_text, *args) def csv_logger(self, log_text, file_path): with open(file_path, 'a') as log: log.write(log_text) def register_csv_logger(self, file_path): if os.path.exists(file_path): os.remove(file_path) print_to_csv = partial(self.csv_logger, file_path=file_path) self.log_functions.append(print_to_csv) print_to_csv('suite,test\n') def initialize_from_fs(self, locations): self.tests, self.suites = self.test_parser.get_all_test_objects( locations) def process_all_tests(self): for test, test_object in self.tests.items(): for suite in test_object.get_attributes(): target_suite = self.find_suite_named(suite) if target_suite is not None: target_suite.add_test(test) def set_dut(self, target): self.dut = target def get_dut(self): if self.dut is not None: return self.dut else: raise AttributeError( 'DUT Address not set, please use the --dut flag to indicate the ip address of the DUT' ) def find_test_named(self, test_name): try: queried_test = self.tests[test_name] return queried_test except KeyError: return None def find_suite_named(self, suite_name): try: if suite_name[0:6] == 'suite.': queried_suite = self.suites[suite_name[6:]] elif suite_name[0:6] == 'suite:': queried_suite = self.suites[suite_name[6:]] else: queried_suite = self.suites[suite_name] return queried_suite except KeyError: return None def list_suite_named(self, suite_name, pretty=False): suite_tests = [] suite = self.find_suite_named(suite_name) if suite is None: if pretty: return '\n' return suite_tests for test in suite.get_tests(): if self.tests[test].is_tast(): found_tests = self.tests[test].enumerate_tests_from_tast_exprs( self.get_dut()) for t in found_tests: if t == '': continue suite_tests.append('tast.' + str(t)) else: suite_tests.append(test) if pretty: out_as_string = '' for test in suite_tests: out_as_string += suite_name + ',' + str(test) + '\n' return out_as_string return suite_tests def gs_query_link(self, suite_name): test_names = ','.join([ test for test in self.list_suite_named(suite_name) if test != '' ]) query = 'https://dashboards.corp.google.com/' query += '_86acf8a8_50a5_48e0_829e_fbf1033d3ac6' query += '?f=test_name:in:' + test_names query += '&f=create_date_7_day_filter:in:Past%207%20Days' query += '&f=test_type:in:Tast,Autotest' return query def graph_suite_named(self, suite_name, dot_graph=None): suite_tests = self.list_suite_named(suite_name) nodes_at_rank = 0 if dot_graph is None: dot_graph = graphviz.Digraph(comment=suite_name) dot_graph.node(suite_name, suite_name) last_level = suite_name child_graph = None for test_name in suite_tests: if nodes_at_rank == 0: child_graph = graphviz.Digraph() dot_graph.edge(last_level, test_name) last_level = test_name child_graph.node(test_name, test_name) dot_graph.edge(suite_name, test_name) if nodes_at_rank == 6: dot_graph.subgraph(child_graph) nodes_at_rank += 1 nodes_at_rank %= 7 dot_graph.subgraph(child_graph) return dot_graph def diff_test_suites(self, suite_a, suite_b): res = '' suite_a_set = set(self.list_suite_named(suite_a)) suite_b_set = set(self.list_suite_named(suite_b)) res = res + ('Suite B (+)' + str(list(suite_b_set - suite_a_set))) res = res + '\n' res = res + ('Suite B (-)' + str(list(suite_a_set - suite_b_set))) return res def main(args): tests = TestManager() basepath = os.path.dirname(os.path.abspath(__file__)) tests.initialize_from_fs([(basepath + '/../test_suites'), (basepath + '/../server/site_tests'), (basepath + '/../client/site_tests')]) tests.process_all_tests() if args.csv is not None: tests.register_csv_logger(args.csv) if args.dut is not None: tests.set_dut(args.dut) if args.find_test is not None: test = tests.find_test_named(args.find_test) if test is not None: tests.log(test.file_path) else: tests.log('Queried test not found') if args.find_suite is not None: suite = tests.find_suite_named(args.find_suite) if suite is not None: tests.log(suite.file_path) else: tests.log('Queried suite not found') if args.list_suite is not None: tests.log(tests.list_suite_named(args.list_suite, pretty=True)) if args.list_multiple_suites is not None: for suite_name in args.list_multiple_suites: tests.log(tests.list_suite_named(suite_name, pretty=True)) if args.diff is not None: tests.log(tests.diff_test_suites(args.diff[0], args.diff[1])) if args.graph_suite is not None: graph = tests.graph_suite_named(args.graph_suite) graph.render('./suite_data/suite_viz.gv', format='png') if args.gs_dashboard is not None: link = tests.gs_query_link(args.gs_dashboard) tests.log(link) if __name__ == '__main__': # pass in the url for the DUT via ssh parser = argparse.ArgumentParser() parser.add_argument('--csv', help='supply csv file path for logging output') parser.add_argument( '--diff', nargs=2, help= 'show diff between two suites. Ex: --diff bvt-tast-cq pvs-tast-cq') parser.add_argument('--find_test', help='find control file for test_name') parser.add_argument('--find_suite', help='find control file for suite_name') parser.add_argument( '--graph_suite', help= 'graph test dependencies of suite_name, will output to contrib/suite_data' ) parser.add_argument('--list_suite', help='list units in suite_name') parser.add_argument( '--list_multiple_suites', nargs='*', help='list units in suite_name_1 suite_name_2 suite_name_n') parser.add_argument('--dut', help='ip address and port for tast enumeration') parser.add_argument( '--gs_dashboard', help='generate green stainless dashboard for suite_name') parsed_args = parser.parse_args() main(parsed_args)