# Lint as: python3
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Static inspection of test control files.

Control files are parsed with `ast` (never executed) to pull out the global
NAME and ATTRIBUTES assignments and the target of the top-level
job.run_test() call.
"""

import ast
import collections
import glob
import logging
import os
import pdb
from typing import Generator, List, Optional, Tuple


class ControlFileError(Exception):
    """Generic error from this package."""


# One parsed control file:
#   path: path of the control file as it was enumerated / passed in.
#   category: part of NAME before the first underscore ('' if none).
#   name: part of NAME after the first underscore (all of NAME if none).
#   suites: suite names listed as 'suite:<name>' in ATTRIBUTES.
#   main_package: dotted package of the test run by job.run_test(), or ''
#       when it could not be determined.
Control = collections.namedtuple('Control',
                                 'path, category, name, suites, main_package')


def load_all() -> List[Control]:
    """Load every control file found under the client and server trees.

    Returns:
        One Control per enumerated control file, in enumeration order.

    Raises:
        ControlFileError: if a control file lacks a usable NAME assignment
            or a package name cannot be derived for it.
    """
    controls = []
    for path in _enumerate_files():
        logging.debug('Processing %s', path)
        control = _load_one(path)
        logging.debug('  -> %s', control)
        controls.append(control)
    return controls


# Directory two levels above the directory that contains this file.
_ROOT_DIR = os.path.realpath(
        os.path.join(os.path.realpath(__file__), "../../.."))


def _enumerate_files() -> Generator[str, None, None]:
    """Yield paths matching <root>/{client,server}/site_tests/*/control*.

    Paths that have a file extension are skipped.
    """
    for ttype in ['client', 'server']:
        tpath = os.path.join(_ROOT_DIR, ttype)
        for path in glob.iglob(tpath + '/site_tests/*/control*'):
            # There are some text files with names like control_sequence.txt
            _, ext = os.path.splitext(path)
            if ext:
                continue
            yield path


def _load_one(path: str) -> Control:
    """Parse the control file at `path` into a Control.

    Raises:
        ControlFileError: if NAME is missing, empty or not a constant, or
            if a package name cannot be derived from the file location.
        SyntaxError: if the file is not valid Python.
    """
    # Read bytes rather than text so that decoding follows Python's source
    # rules (UTF-8 by default, PEP 263 coding cookies) instead of the
    # locale's preferred encoding.
    with open(path, 'rb') as r:
        source = r.read()
    # Passing the filename makes syntax errors point at the offending file.
    module = ast.parse(source, filename=path)
    category, name = _categorize_name(_extract_name(module))
    return Control(
            path=path,
            category=category,
            name=name,
            suites=_extract_suites(module),
            main_package=_extract_main_package(path, module) or '',
    )


def _extract_name(module: ast.Module) -> str:
    """Return the value of the last global NAME assignment in `module`.

    Raises:
        ControlFileError: if there is no global NAME assignment, its value
            is not a constant, or the value is empty.
    """
    stmt = _find_last_global_assignment(module.body, 'NAME')
    if stmt is None:
        raise ControlFileError('No global NAME assignment')
    name = _extract_str_value(stmt)
    if not name:
        raise ControlFileError('Empty value')
    return name


def _find_last_global_assignment(stmts: List[ast.stmt],
                                 name: str) -> Optional[ast.Assign]:
    """Return the last statement in `stmts` assigning to `name`, or None."""
    found = _find_global_assignments(stmts, name)
    return found[-1] if found else None


def _find_global_assignments(stmts: List[ast.stmt],
                             name: str) -> List[ast.Assign]:
    """Return, in order, the plain assignments in `stmts` targeting `name`."""
    return [
            stmt for stmt in stmts
            if isinstance(stmt, ast.Assign)
            and _contains_name(stmt.targets, name)
    ]


def _contains_name(targets: List[ast.expr], want: str) -> bool:
    """Return True if any target is the bare name `want`."""
    # We do not support complex lvalues.
    # In particular, multi-valued assignments are not handled properly.
    return any(
            isinstance(target, ast.Name) and target.id == want
            for target in targets)


def _extract_str_value(stmt: ast.Assign) -> str:
    """Return the constant assigned by `stmt`, converted with str().

    NOTE: non-string constants are stringified rather than rejected, so
    e.g. `NAME = None` yields 'None'.

    Raises:
        ControlFileError: if the assigned value is not an ast.Constant.
    """
    if not isinstance(stmt.value, ast.Constant):
        raise ControlFileError(
                'Name assignment value is of type %s, want ast.Constant' %
                type(stmt.value))
    return str(stmt.value.value)


def _categorize_name(name: str) -> Tuple[str, str]:
    """Split `name` at its first underscore into (category, rest).

    A name without an underscore gets the empty category.
    """
    parts = name.split('_', 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    return '', parts[0]


_SUITE_PREFIX_LEN = len('suite:')


def _extract_suites(module: ast.Module) -> List[str]:
    """Return the suite names listed in the last global ATTRIBUTES value.

    ATTRIBUTES is split on commas; each item starting with 'suite:' (after
    stripping whitespace) contributes the text following that prefix.
    Returns [] when there is no global ATTRIBUTES assignment.

    Raises:
        ControlFileError: if the ATTRIBUTES value is not a constant.
    """
    stmt = _find_last_global_assignment(module.body, 'ATTRIBUTES')
    if stmt is None:
        return []
    attrs = (attr.strip() for attr in _extract_str_value(stmt).split(','))
    return [
            attr[_SUITE_PREFIX_LEN:] for attr in attrs
            if attr.startswith('suite:')
    ]


def _extract_main_package(path: str, module: ast.Module) -> Optional[str]:
    """Return the dotted package name of the test run by this control file.

    The result is 'autotest_lib.<dir of path relative to the root, dotted>.
    <run_test target>'. Returns None when the run_test target cannot be
    determined.

    Raises:
        ControlFileError: if the relative directory contains a '.', which
            would make the dotted package name ambiguous. This also covers
            paths outside the root ('..') and the root itself ('.').
    """
    fname = _extract_main_file(path, module)
    if fname is None:
        return None
    relpath = os.path.relpath(os.path.dirname(path), _ROOT_DIR)
    # Raised explicitly instead of asserted so the check survives `python -O`.
    if '.' in relpath:
        raise ControlFileError(
                'Cannot derive a package name for %s: relative path %r '
                'contains a dot' % (path, relpath))
    return 'autotest_lib.%s.%s' % (relpath.replace(os.sep, '.'), fname)


def _extract_main_file(path: str, module: ast.Module) -> Optional[str]:
    """Return the target of the sole top-level job.run_test() call.

    Logs a warning and returns None when there is not exactly one such call
    or its target cannot be extracted. `path` is used only in log messages.
    """
    calls = _find_run_test_calls(module)
    if not calls:
        logging.warning('Found no job.run_test() calls in %s', path)
        return None
    if len(calls) > 1:
        logging.warning('Found %d job.run_test() calls in %s, want 1',
                        len(calls), path)
        return None
    return _extract_run_test_target(path, calls[0])


def _find_run_test_calls(module: ast.Module) -> List[ast.Call]:
    """Return the top-level expression statements of form job.run_test(...).

    Only statements directly in the module body are considered; calls nested
    inside functions, conditionals or other expressions are not found.
    """
    calls = []
    for stmt in module.body:
        if not (isinstance(stmt, ast.Expr)
                and isinstance(stmt.value, ast.Call)):
            continue
        call = stmt.value
        func = call.func
        if (isinstance(func, ast.Attribute) and func.attr == 'run_test'
                    and isinstance(func.value, ast.Name)
                    and func.value.id == 'job'):
            calls.append(call)
    return calls


def _extract_run_test_target(path: str, call: ast.Call) -> Optional[str]:
    """Return the single positional string argument of a run_test call.

    Logs a warning and returns None when the call does not have exactly one
    positional argument or that argument is not a string constant. `path`
    is used only in log messages.
    """
    if len(call.args) != 1:
        logging.warning('job.run_test() has %d arguments in %s, want 1',
                        len(call.args), path)
        return None
    arg = call.args[0]
    if not isinstance(arg, ast.Constant):
        logging.warning('job.run_test() has a non constant argument in %s',
                        path)
        return None
    # Constants such as numbers or None cannot name a test module.
    if not isinstance(arg.value, str):
        logging.warning('job.run_test() has a non string argument in %s',
                        path)
        return None
    return arg.value