1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright 2021 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import argparse 8import ast 9from functools import partial 10import os 11import subprocess 12import graphviz 13import common 14 15from server.cros.dynamic_suite.control_file_getter import FileSystemGetter 16from server.cros.dynamic_suite.suite_common import retrieve_for_suite 17 18class TestSuite(object): 19 def __init__(self, cf_object, name, file_path): 20 self.name = name 21 self.cf_object = cf_object 22 self.tests = [] 23 self.file_path = file_path 24 25 def add_test(self, test_object): 26 self.tests.append(test_object) 27 28 def get_tests(self): 29 return self.tests 30 31 32class TestObject(object): 33 def __init__(self, cf_object, file_path): 34 self.name = cf_object.name 35 self.type = 'tast' if ('tast' in self.name) else 'tauto' 36 self.cf_object = cf_object 37 self.file_path = file_path 38 self.tast_exprs = '' 39 self.tast_string = '' 40 41 def get_attributes(self): 42 return self.cf_object.attributes 43 44 def is_tast(self): 45 return self.type == 'tast' 46 47 # use the python syntax tree library to parse the run function 48 # and grab the test_expr from the 'tast' run command 49 def parse_cf_for_tast_string(self): 50 with open(self.file_path, 'r') as cf: 51 mod = ast.parse(cf.read()) 52 for n in mod.body: 53 if n.__class__ != ast.FunctionDef: 54 continue 55 if n.name != 'run': 56 continue 57 for sub_node in n.body: 58 if sub_node.__class__ != ast.Expr: 59 continue 60 try: 61 fn_name = sub_node.value.func.value.id 62 if fn_name != 'job': 63 continue 64 except: 65 continue 66 if sub_node.value.func.attr != 'run_test': 67 continue 68 for keyword in sub_node.value.keywords: 69 if keyword.arg == 'test_exprs' and keyword.value.__class__ == ast.List: 70 test_exprs = [] 71 regex_list = False 72 for elem in keyword.value.elts: 73 try: 74 test_exprs.append(elem.s) 75 regex_list = ('(' in elem.s or regex_list) 76 except AttributeError: 77 print('WARNING: Non-standard test found, check ' 78 + self.file_path + ' manually') 79 break 80 if regex_list: 81 self.tast_string = ' '.join(test_exprs) 82 else: 83 for it in range(len(test_exprs) - 1): 84 test_exprs[it] = test_exprs[it] + ',' 85 self.tast_string = ' '.join(test_exprs) 86 87 def enumerate_tast_from_test_expr(self): 88 self.parse_cf_for_tast_string() 89 try: 90 self.tast_exprs = self.tast_string.split(', ') 91 except AttributeError: 92 print('WARNING: Non-standard test found, check' + self.file_path + 93 ' manually') 94 95 def enumerate_tests_from_tast_exprs(self, dut): 96 tests = [] 97 print(self.tast_exprs) 98 for expr in self.tast_exprs: 99 en = subprocess.check_output( 100 ['tast', 'list', str(dut), 101 str(expr)], encoding='utf-8') 102 for t in en.split('\n'): 103 if t == '': 104 continue 105 tests.append(t) 106 en = subprocess.check_output([ 107 'tast', 'list', '-buildbundle=crosint', 108 str(dut), 109 str(expr) 110 ], 111 encoding='utf-8') 112 for t in en.split('\n'): 113 if t == '': 114 continue 115 tests.append(t) 116 117 return tests 118 119 def describe(self): 120 return 'test named ' + self.name + ' of type ' + self.type 121 122 123class TestParser(object): 124 def get_all_test_objects(self, locations): 125 tests = {} 126 suites = {} 127 128 cf_getter = FileSystemGetter(locations) 129 for (file_path, cf_object) in retrieve_for_suite(cf_getter, 130 '').items(): 131 if cf_object.test_class == 'suite': 132 suites[cf_object.name] = (TestSuite(cf_object, cf_object.name, 133 file_path)) 134 else: 135 tests[cf_object.name] = (TestObject(cf_object, file_path)) 136 if tests[cf_object.name].is_tast(): 137 tests[cf_object.name].enumerate_tast_from_test_expr() 138 139 return tests, suites 140 141 142class TestManager(object): 143 def __init__(self): 144 self.tests = {} 145 self.suites = {} 146 self.dut = None 147 self.log_functions = [partial(print)] 148 self.test_parser = TestParser() 149 150 def log(self, log_text, *args): 151 for fn in self.log_functions: 152 fn(log_text, *args) 153 154 def csv_logger(self, log_text, file_path): 155 with open(file_path, 'a') as log: 156 log.write(log_text) 157 158 def register_csv_logger(self, file_path): 159 if os.path.exists(file_path): 160 os.remove(file_path) 161 print_to_csv = partial(self.csv_logger, file_path=file_path) 162 self.log_functions.append(print_to_csv) 163 print_to_csv('suite,test\n') 164 165 def initialize_from_fs(self, locations): 166 self.tests, self.suites = self.test_parser.get_all_test_objects( 167 locations) 168 169 def process_all_tests(self): 170 for test, test_object in self.tests.items(): 171 for suite in test_object.get_attributes(): 172 target_suite = self.find_suite_named(suite) 173 if target_suite is not None: 174 target_suite.add_test(test) 175 176 def set_dut(self, target): 177 self.dut = target 178 179 def get_dut(self): 180 if self.dut is not None: 181 return self.dut 182 else: 183 raise AttributeError( 184 'DUT Address not set, please use the --dut flag to indicate the ip address of the DUT' 185 ) 186 187 def find_test_named(self, test_name): 188 try: 189 queried_test = self.tests[test_name] 190 return queried_test 191 except KeyError: 192 return None 193 194 def find_suite_named(self, suite_name): 195 try: 196 if suite_name[0:6] == 'suite.': 197 queried_suite = self.suites[suite_name[6:]] 198 elif suite_name[0:6] == 'suite:': 199 queried_suite = self.suites[suite_name[6:]] 200 else: 201 queried_suite = self.suites[suite_name] 202 return queried_suite 203 except KeyError: 204 return None 205 206 def list_suite_named(self, suite_name, pretty=False): 207 suite_tests = [] 208 suite = self.find_suite_named(suite_name) 209 210 if suite is None: 211 if pretty: 212 return '\n' 213 return suite_tests 214 215 for test in suite.get_tests(): 216 if self.tests[test].is_tast(): 217 found_tests = self.tests[test].enumerate_tests_from_tast_exprs( 218 self.get_dut()) 219 for t in found_tests: 220 if t == '': 221 continue 222 suite_tests.append('tast.' + str(t)) 223 else: 224 suite_tests.append(test) 225 226 if pretty: 227 out_as_string = '' 228 for test in suite_tests: 229 out_as_string += suite_name + ',' + str(test) + '\n' 230 return out_as_string 231 return suite_tests 232 233 def gs_query_link(self, suite_name): 234 test_names = ','.join([ 235 test for test in self.list_suite_named(suite_name) 236 if test != '' 237 ]) 238 239 query = 'https://dashboards.corp.google.com/' 240 query += '_86acf8a8_50a5_48e0_829e_fbf1033d3ac6' 241 query += '?f=test_name:in:' + test_names 242 query += '&f=create_date_7_day_filter:in:Past%207%20Days' 243 query += '&f=test_type:in:Tast,Autotest' 244 245 return query 246 247 def graph_suite_named(self, suite_name, dot_graph=None): 248 suite_tests = self.list_suite_named(suite_name) 249 nodes_at_rank = 0 250 251 if dot_graph is None: 252 dot_graph = graphviz.Digraph(comment=suite_name) 253 254 dot_graph.node(suite_name, suite_name) 255 last_level = suite_name 256 child_graph = None 257 258 for test_name in suite_tests: 259 if nodes_at_rank == 0: 260 child_graph = graphviz.Digraph() 261 dot_graph.edge(last_level, test_name) 262 last_level = test_name 263 264 child_graph.node(test_name, test_name) 265 dot_graph.edge(suite_name, test_name) 266 267 if nodes_at_rank == 6: 268 dot_graph.subgraph(child_graph) 269 270 nodes_at_rank += 1 271 nodes_at_rank %= 7 272 273 dot_graph.subgraph(child_graph) 274 275 return dot_graph 276 277 def diff_test_suites(self, suite_a, suite_b): 278 res = '' 279 suite_a_set = set(self.list_suite_named(suite_a)) 280 suite_b_set = set(self.list_suite_named(suite_b)) 281 res = res + ('Suite B (+)' + str(list(suite_b_set - suite_a_set))) 282 res = res + '\n' 283 res = res + ('Suite B (-)' + str(list(suite_a_set - suite_b_set))) 284 return res 285 286 287def main(args): 288 tests = TestManager() 289 290 basepath = os.path.dirname(os.path.abspath(__file__)) 291 tests.initialize_from_fs([(basepath + '/../test_suites'), 292 (basepath + '/../server/site_tests'), 293 (basepath + '/../client/site_tests')]) 294 tests.process_all_tests() 295 296 if args.csv is not None: 297 tests.register_csv_logger(args.csv) 298 if args.dut is not None: 299 tests.set_dut(args.dut) 300 if args.find_test is not None: 301 test = tests.find_test_named(args.find_test) 302 if test is not None: 303 tests.log(test.file_path) 304 else: 305 tests.log('Queried test not found') 306 if args.find_suite is not None: 307 suite = tests.find_suite_named(args.find_suite) 308 if suite is not None: 309 tests.log(suite.file_path) 310 else: 311 tests.log('Queried suite not found') 312 if args.list_suite is not None: 313 tests.log(tests.list_suite_named(args.list_suite, pretty=True)) 314 if args.list_multiple_suites is not None: 315 for suite_name in args.list_multiple_suites: 316 tests.log(tests.list_suite_named(suite_name, pretty=True)) 317 if args.diff is not None: 318 tests.log(tests.diff_test_suites(args.diff[0], args.diff[1])) 319 if args.graph_suite is not None: 320 graph = tests.graph_suite_named(args.graph_suite) 321 graph.render('./suite_data/suite_viz.gv', format='png') 322 if args.gs_dashboard is not None: 323 link = tests.gs_query_link(args.gs_dashboard) 324 tests.log(link) 325 326 327if __name__ == '__main__': 328 # pass in the url for the DUT via ssh 329 parser = argparse.ArgumentParser() 330 parser.add_argument('--csv', 331 help='supply csv file path for logging output') 332 parser.add_argument( 333 '--diff', 334 nargs=2, 335 help= 336 'show diff between two suites. Ex: --diff bvt-tast-cq pvs-tast-cq') 337 parser.add_argument('--find_test', 338 help='find control file for test_name') 339 parser.add_argument('--find_suite', 340 help='find control file for suite_name') 341 parser.add_argument( 342 '--graph_suite', 343 help= 344 'graph test dependencies of suite_name, will output to contrib/suite_data' 345 ) 346 parser.add_argument('--list_suite', 347 help='list units in suite_name') 348 parser.add_argument( 349 '--list_multiple_suites', 350 nargs='*', 351 help='list units in suite_name_1 suite_name_2 suite_name_n') 352 parser.add_argument('--dut', 353 help='ip address and port for tast enumeration') 354 parser.add_argument( 355 '--gs_dashboard', 356 help='generate green stainless dashboard for suite_name') 357 parsed_args = parser.parse_args() 358 359 main(parsed_args) 360