xref: /aosp_15_r20/external/autotest/metadata/utils/control_files.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python3
2*9c5db199SXin Li# Copyright 2020 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Liimport ast
7*9c5db199SXin Liimport collections
8*9c5db199SXin Liimport glob
9*9c5db199SXin Liimport logging
10*9c5db199SXin Liimport os
11*9c5db199SXin Liimport pdb
12*9c5db199SXin Lifrom typing import Generator, List, Optional, Tuple
13*9c5db199SXin Li
14*9c5db199SXin Li
15*9c5db199SXin Liclass ControlFileError(Exception):
16*9c5db199SXin Li  """Generic error from this package."""
17*9c5db199SXin Li
18*9c5db199SXin Li
19*9c5db199SXin LiControl = collections.namedtuple('Control',
20*9c5db199SXin Li                                 'path, category, name, suites, main_package')
21*9c5db199SXin Li
22*9c5db199SXin Li
23*9c5db199SXin Lidef load_all() -> List[Control]:
24*9c5db199SXin Li  controls = []
25*9c5db199SXin Li  for path in _enumerate_files():
26*9c5db199SXin Li    logging.debug('Processing %s', path)
27*9c5db199SXin Li    control = _load_one(path)
28*9c5db199SXin Li    logging.debug('  -> %s', control)
29*9c5db199SXin Li    controls.append(control)
30*9c5db199SXin Li  return controls
31*9c5db199SXin Li
32*9c5db199SXin Li
33*9c5db199SXin Li_ROOT_DIR = os.path.realpath(
34*9c5db199SXin Li    os.path.join(os.path.realpath(__file__), "../../.."))
35*9c5db199SXin Li
36*9c5db199SXin Li
37*9c5db199SXin Lidef _enumerate_files() -> Generator[str, None, None]:
38*9c5db199SXin Li  for ttype in ['client', 'server']:
39*9c5db199SXin Li    tpath = os.path.join(_ROOT_DIR, ttype)
40*9c5db199SXin Li    for path in glob.iglob(tpath + '/site_tests/*/control*'):
41*9c5db199SXin Li      # There are some text files with names like control_sequence.txt
42*9c5db199SXin Li      _, ext = os.path.splitext(path)
43*9c5db199SXin Li      if ext:
44*9c5db199SXin Li        continue
45*9c5db199SXin Li      yield path
46*9c5db199SXin Li
47*9c5db199SXin Li
48*9c5db199SXin Lidef _load_one(path: str) -> Control:
49*9c5db199SXin Li  with open(path) as r:
50*9c5db199SXin Li    text = r.read()
51*9c5db199SXin Li  module = ast.parse(text)
52*9c5db199SXin Li  name = _extract_name(module)
53*9c5db199SXin Li  category, name = _categorize_name(name)
54*9c5db199SXin Li  return Control(
55*9c5db199SXin Li      path=path,
56*9c5db199SXin Li      category=category,
57*9c5db199SXin Li      name=name,
58*9c5db199SXin Li      suites=_extract_suites(module),
59*9c5db199SXin Li      main_package=_extract_main_package(path, module) or '',
60*9c5db199SXin Li  )
61*9c5db199SXin Li
62*9c5db199SXin Li
63*9c5db199SXin Lidef _extract_name(module: ast.Module) -> Optional[str]:
64*9c5db199SXin Li  stmt = _find_last_global_assignment(module.body, 'NAME')
65*9c5db199SXin Li  if stmt is None:
66*9c5db199SXin Li    raise ControlFileError('No global NAME assignment')
67*9c5db199SXin Li  name = _extract_str_value(stmt)
68*9c5db199SXin Li  if not name:
69*9c5db199SXin Li    raise ControlFileError('Empty value')
70*9c5db199SXin Li  return name
71*9c5db199SXin Li
72*9c5db199SXin Li
73*9c5db199SXin Lidef _find_last_global_assignment(stmts: List[ast.Assign],
74*9c5db199SXin Li                                 name: str) -> Optional[ast.Assign]:
75*9c5db199SXin Li  found = _find_global_assignments(stmts, name)
76*9c5db199SXin Li  if len(found) > 0:
77*9c5db199SXin Li    return found[-1]
78*9c5db199SXin Li  return None
79*9c5db199SXin Li
80*9c5db199SXin Li
81*9c5db199SXin Lidef _find_global_assignments(stmts: List[ast.Assign],
82*9c5db199SXin Li                             name: str) -> List[ast.Assign]:
83*9c5db199SXin Li  found = []
84*9c5db199SXin Li  for stmt in stmts:
85*9c5db199SXin Li    if isinstance(stmt, ast.Assign) and _contains_name(stmt.targets, name):
86*9c5db199SXin Li      found.append(stmt)
87*9c5db199SXin Li  return found
88*9c5db199SXin Li
89*9c5db199SXin Li
90*9c5db199SXin Lidef _contains_name(targets: List[ast.Expr], want: str) -> bool:
91*9c5db199SXin Li  for target in targets:
92*9c5db199SXin Li    if not isinstance(target, ast.Name):
93*9c5db199SXin Li      # We do not support complex lvalues.
94*9c5db199SXin Li      # In particular, multi-valued assignments are not handled properly.
95*9c5db199SXin Li      continue
96*9c5db199SXin Li    name: ast.Name = target
97*9c5db199SXin Li    if name.id == want:
98*9c5db199SXin Li      return True
99*9c5db199SXin Li  return False
100*9c5db199SXin Li
101*9c5db199SXin Li
102*9c5db199SXin Lidef _extract_str_value(stmt: ast.Assign) -> str:
103*9c5db199SXin Li  if not isinstance(stmt.value, ast.Constant):
104*9c5db199SXin Li    raise ControlFileError(
105*9c5db199SXin Li        'Name assignment value is of type %s, want ast.Constant' %
106*9c5db199SXin Li        type(stmt.value))
107*9c5db199SXin Li  v = str(stmt.value.value)
108*9c5db199SXin Li  return v
109*9c5db199SXin Li
110*9c5db199SXin Li
111*9c5db199SXin Lidef _categorize_name(name: str) -> Tuple[str, str]:
112*9c5db199SXin Li  parts = name.split('_', 1)
113*9c5db199SXin Li  if len(parts) == 2:
114*9c5db199SXin Li    category, rest = parts[0], parts[1]
115*9c5db199SXin Li  else:
116*9c5db199SXin Li    category, rest = '', parts[0]
117*9c5db199SXin Li  return category, rest
118*9c5db199SXin Li
119*9c5db199SXin Li
120*9c5db199SXin Li_SUITE_PREFIX_LEN = len('suite:')
121*9c5db199SXin Li
122*9c5db199SXin Li
123*9c5db199SXin Lidef _extract_suites(module: ast.Module) -> List[str]:
124*9c5db199SXin Li  stmt = _find_last_global_assignment(module.body, 'ATTRIBUTES')
125*9c5db199SXin Li  if stmt is None:
126*9c5db199SXin Li    return []
127*9c5db199SXin Li  v = _extract_str_value(stmt)
128*9c5db199SXin Li  suites = []
129*9c5db199SXin Li  for attr in v.split(','):
130*9c5db199SXin Li    attr = attr.strip()
131*9c5db199SXin Li    if attr.startswith('suite:'):
132*9c5db199SXin Li      suites.append(attr[_SUITE_PREFIX_LEN:])
133*9c5db199SXin Li  return suites
134*9c5db199SXin Li
135*9c5db199SXin Li
136*9c5db199SXin Lidef _extract_main_package(path: str, module: ast.Module) -> Optional[str]:
137*9c5db199SXin Li  fname = _extract_main_file(path, module)
138*9c5db199SXin Li  if fname is None:
139*9c5db199SXin Li    return None
140*9c5db199SXin Li  relpath = os.path.relpath(os.path.dirname(path), _ROOT_DIR)
141*9c5db199SXin Li  assert '.' not in relpath
142*9c5db199SXin Li  return 'autotest_lib.%s.%s' % (relpath.replace('/', '.'), fname)
143*9c5db199SXin Li
144*9c5db199SXin Li
145*9c5db199SXin Lidef _extract_main_file(path: str, module: ast.Module) -> Optional[str]:
146*9c5db199SXin Li  calls = _find_run_test_calls(module)
147*9c5db199SXin Li  if not calls:
148*9c5db199SXin Li    logging.warning('Found no job.run_test() calls in %s', path)
149*9c5db199SXin Li    return None
150*9c5db199SXin Li  if len(calls) > 1:
151*9c5db199SXin Li    logging.warning('Found %d job.run_test() calls in %s, want 1', len(calls),
152*9c5db199SXin Li                    path)
153*9c5db199SXin Li    return None
154*9c5db199SXin Li  return _extract_run_test_target(path, calls[0])
155*9c5db199SXin Li
156*9c5db199SXin Li
157*9c5db199SXin Lidef _find_run_test_calls(module: ast.Module) -> List[ast.Call]:
158*9c5db199SXin Li  calls = []
159*9c5db199SXin Li  for stmt in module.body:
160*9c5db199SXin Li    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
161*9c5db199SXin Li      call = stmt.value
162*9c5db199SXin Li      func = call.func
163*9c5db199SXin Li      if (isinstance(func, ast.Attribute) and func.attr == 'run_test' and
164*9c5db199SXin Li          isinstance(func.value, ast.Name) and func.value.id == 'job'):
165*9c5db199SXin Li        calls.append(call)
166*9c5db199SXin Li  return calls
167*9c5db199SXin Li
168*9c5db199SXin Li
169*9c5db199SXin Lidef _extract_run_test_target(path: str, call: ast.Call) -> Optional[str]:
170*9c5db199SXin Li  if len(call.args) != 1:
171*9c5db199SXin Li    logging.warning('job.run_test() has %d arguments in %s, want 1',
172*9c5db199SXin Li                    len(call.args), path)
173*9c5db199SXin Li    return None
174*9c5db199SXin Li  arg = call.args[0]
175*9c5db199SXin Li  if not isinstance(arg, ast.Constant):
176*9c5db199SXin Li    logging.warning('job.run_test() has a non constant argument in %s', path)
177*9c5db199SXin Li    return None
178*9c5db199SXin Li  return arg.value
179