#!/usr/bin/env python3
# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Runs tests for the given input files.

Tries its best to autodetect all tests based on path name without being *too*
aggressive.

In short, there's a small set of directories in which, if you make any change,
all of the tests in those directories get run. Additionally, if you change a
python file named foo, it'll run foo_test.py or foo_unittest.py if either of
those exist.

All tests are run in parallel.
"""

# NOTE: An alternative mentioned on the initial CL for this
# https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/1516414
# is pytest. It looks like that brings some complexity (and makes use outside
# of the chroot a bit more obnoxious?), but might be worth exploring if this
# starts to grow quite complex on its own.


import argparse
import collections
import multiprocessing.pool
import os
import shlex
import signal
import subprocess
import sys
from typing import Iterable, List, Optional, Tuple


# A single test invocation: `command` (an argv list) is executed with
# `directory` as its working directory.
TestSpec = collections.namedtuple("TestSpec", ["directory", "command"])

# Python scripts that match the test naming scheme but are not tests. Paths
# are relative to toolchain-utils.
non_test_py_files = {
    "debug_info_test/debug_info_test.py",
}


def _make_relative_to_toolchain_utils(toolchain_utils: str, path: str) -> str:
    """Cleans & makes a path relative to toolchain_utils.

    Args:
        toolchain_utils: The toolchain-utils directory.
        path: The path to clean up and relativize.

    Returns:
        `path` expressed relative to `toolchain_utils`.

    Raises:
        ValueError: if that path isn't under toolchain_utils.
    """
    # abspath has the nice property that it removes any markers like './'.
    as_abs = os.path.abspath(path)
    result = os.path.relpath(as_abs, start=toolchain_utils)

    # relpath yields exactly ".." for the direct parent of toolchain_utils, so
    # checking only for a "../" prefix would let that one slip through.
    if result == os.pardir or result.startswith(os.pardir + os.sep):
        raise ValueError("Non toolchain-utils directory found: %s" % result)
    return result


def _filter_python_tests(
    test_files: Iterable[str], toolchain_utils: str
) -> List[TestSpec]:
    """Returns TestSpecs for all files that are real python tests.

    Files listed in `non_test_py_files` are reported on stdout and skipped.
    """
    python_tests = []
    for test_file in test_files:
        rel_path = _make_relative_to_toolchain_utils(toolchain_utils, test_file)
        if rel_path not in non_test_py_files:
            python_tests.append(_python_test_to_spec(test_file))
        else:
            print("## %s ... NON_TEST_PY_FILE" % rel_path)
    return python_tests


def _gather_python_tests_in(
    rel_subdir: str, toolchain_utils: str
) -> List[TestSpec]:
    """Returns all files that appear to be Python tests in a given directory.

    Only the directory itself is listed; subdirectories are not walked.
    """
    subdir = os.path.join(toolchain_utils, rel_subdir)
    test_files = (
        os.path.join(subdir, file_name)
        for file_name in os.listdir(subdir)
        if file_name.endswith(("_test.py", "_unittest.py"))
    )
    return _filter_python_tests(test_files, toolchain_utils)


def _run_test(test_spec: TestSpec, timeout: int) -> Tuple[Optional[int], str]:
    """Runs a test.

    Returns a tuple indicating the process' exit code, and the combined
    stdout+stderr of the process. If the exit code is None, the process timed
    out.
    """
    # Each subprocess gets its own process group, since many of these tests
    # spawn subprocesses for a variety of reasons. If these tests time out, we
    # want to be able to clean up all of the children swiftly.
    # pylint: disable=subprocess-popen-preexec-fn
    with subprocess.Popen(
        test_spec.command,
        cwd=test_spec.directory,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
        # TODO(b/296616854): This is unsafe, and we should use
        # process_group=0 when we have upgraded to Python 3.11.
        preexec_fn=lambda: os.setpgid(0, 0),
    ) as p:
        # The child called setpgid(0, 0), so its pid doubles as its pgid.
        child_pgid = p.pid
        try:
            out, _ = p.communicate(timeout=timeout)
            return p.returncode, out
        except BaseException as e:
            # Try to shut the processes down gracefully.
            os.killpg(child_pgid, signal.SIGINT)
            try:
                # 2 seconds is arbitrary, but given that these are unittests,
                # should be plenty of time for them to shut down.
                p.wait(timeout=2)
            except subprocess.TimeoutExpired:
                os.killpg(child_pgid, signal.SIGKILL)
            except BaseException:
                # Anything else (e.g. a KeyboardInterrupt while waiting):
                # kill the group outright and let the exception propagate.
                os.killpg(child_pgid, signal.SIGKILL)
                raise

            if isinstance(e, subprocess.TimeoutExpired):
                # We just killed the entire process group. This should complete
                # ~immediately. If it doesn't, something is very wrong.
                out, _ = p.communicate(timeout=5)
                return (None, out)
            raise


def _python_test_to_spec(test_file: str) -> TestSpec:
    """Given a .py file, convert it to a TestSpec."""
    # Run tests in the directory they exist in, since some of them are sensitive
    # to that.
    test_directory = os.path.dirname(os.path.abspath(test_file))
    file_name = os.path.basename(test_file)

    if os.access(test_file, os.X_OK):
        command = ["./" + file_name]
    else:
        # Assume the user wanted py3.
        command = ["python3", file_name]

    return TestSpec(directory=test_directory, command=command)


def _autodetect_python_tests_for(
    test_file: str, toolchain_utils: str
) -> List[TestSpec]:
    """Given a test file, detect if there may be related tests.

    If `test_file` is itself named like a test, it is the only result.
    Otherwise, for `foo.py`, any existing sibling named `foo_test.py`,
    `foo_unittest.py`, `test_foo.py` or `unittest_foo.py` is returned.
    Non-Python files yield no tests.
    """
    if not test_file.endswith(".py"):
        return []

    test_prefixes = ("test_", "unittest_")
    test_suffixes = ("_test.py", "_unittest.py")

    test_file_name = os.path.basename(test_file)
    test_file_is_a_test = test_file_name.startswith(
        test_prefixes
    ) or test_file_name.endswith(test_suffixes)

    if test_file_is_a_test:
        test_files = [test_file]
    else:
        # Strip the trailing ".py" so the test suffixes can be appended.
        test_file_no_suffix = test_file[:-3]
        candidates = [test_file_no_suffix + x for x in test_suffixes]

        dir_name = os.path.dirname(test_file)
        candidates += (
            os.path.join(dir_name, x + test_file_name) for x in test_prefixes
        )
        test_files = (x for x in candidates if os.path.exists(x))
    return _filter_python_tests(test_files, toolchain_utils)


def _run_test_scripts(pool, all_tests, timeout, show_successful_output=False):
    """Runs a list of TestSpecs. Returns whether all of them succeeded.

    Args:
        pool: Pool providing `apply_async`; every test is submitted up front.
        all_tests: The TestSpecs to run.
        timeout: Per-test timeout, in seconds.
        show_successful_output: If True, print the output of passing tests
            too, rather than only that of failing ones.
    """
    results = [
        pool.apply_async(_run_test, (test, timeout)) for test in all_tests
    ]

    failures = []
    # Results are reported in submission order, regardless of which test
    # happens to finish first.
    for i, (test, future) in enumerate(zip(all_tests, results)):
        # Add a bit more spacing between outputs.
        if show_successful_output and i:
            print("\n")

        pretty_test = shlex.join(test.command)
        pretty_directory = os.path.relpath(test.directory)
        if pretty_directory == ".":
            test_message = pretty_test
        else:
            test_message = "%s in %s/" % (pretty_test, pretty_directory)

        print("## %s ... " % test_message, end="")
        # Be sure that the users sees which test is running.
        sys.stdout.flush()

        exit_code, stdout = future.get()
        if exit_code == 0:
            print("PASS")
            is_failure = False
        else:
            print("TIMEOUT" if exit_code is None else "FAIL")
            failures.append(test_message)
            is_failure = True

        if show_successful_output or is_failure:
            if stdout:
                print("-- Stdout:\n", stdout)
            else:
                print("-- No stdout was produced.")

    if failures:
        word = "tests" if len(failures) > 1 else "test"
        print(f"{len(failures)} {word} failed:")
        for failure in failures:
            print(f"\t{failure}")

    return not failures


def _compress_list(l):
    """Removes consecutive duplicate elements from |l|.

    >>> _compress_list([])
    []
    >>> _compress_list([1, 1])
    [1]
    >>> _compress_list([1, 2, 1])
    [1, 2, 1]
    """
    result = []
    for e in l:
        if result and result[-1] == e:
            continue
        result.append(e)
    return result


def _fix_python_path(toolchain_utils: str) -> None:
    """Prepends toolchain_utils to $PYTHONPATH in this process' environment.

    Child processes inherit the environment, so spawned tests see the change.
    """
    pypath = os.environ.get("PYTHONPATH", "")
    if pypath:
        pypath = os.pathsep + pypath
    os.environ["PYTHONPATH"] = toolchain_utils + pypath


def _find_forced_subdir_python_tests(
    test_paths: List[str], toolchain_utils: str
) -> List[TestSpec]:
    """Returns all tests in the 'run everything' dirs touched by test_paths.

    Args:
        test_paths: Absolute paths of the modified files.
        toolchain_utils: The toolchain-utils directory.

    Raises:
        ValueError: if any path is outside of toolchain_utils.
    """
    assert all(os.path.isabs(path) for path in test_paths)

    # Directories under toolchain_utils for which any change will cause all
    # tests in that directory to be rerun. Includes changes in subdirectories.
    all_dirs = {
        "crosperf",
        "cros_utils",
    }

    relative_paths = [
        _make_relative_to_toolchain_utils(toolchain_utils, path)
        for path in test_paths
    ]

    gather_test_dirs = set()

    for path in relative_paths:
        # relpath joins components with os.sep, so split on the same.
        top_level_dir = path.split(os.sep, 1)[0]
        if top_level_dir in all_dirs:
            gather_test_dirs.add(top_level_dir)

    results = []
    for d in sorted(gather_test_dirs):
        results += _gather_python_tests_in(d, toolchain_utils)
    return results


def _find_go_tests(test_paths: List[str]) -> List[TestSpec]:
    """Returns TestSpecs for the go folders of the given files"""
    assert all(os.path.isabs(path) for path in test_paths)

    dirs_with_gofiles = {
        os.path.dirname(p) for p in test_paths if p.endswith(".go")
    }
    command = ["go", "test", "-vet=all"]
    # Note: We sort the directories to be deterministic.
    return [
        TestSpec(directory=d, command=command)
        for d in sorted(dirs_with_gofiles)
    ]


def main(argv: List[str]) -> int:
    """Runs the tests related to the files named in argv.

    Returns:
        The process exit code: 0 if no files were given or every test
        passed, 1 otherwise.
    """
    default_toolchain_utils = os.path.abspath(os.path.dirname(__file__))

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--show_all_output",
        action="store_true",
        help="show stdout of successful tests",
    )
    parser.add_argument(
        "--toolchain_utils",
        default=default_toolchain_utils,
        help="directory of toolchain-utils. Often auto-detected",
    )
    parser.add_argument(
        "file", nargs="*", help="a file that we should run tests for"
    )
    parser.add_argument(
        "--timeout",
        default=120,
        type=int,
        help="Time to allow a test to execute before timing it out, in "
        "seconds.",
    )
    args = parser.parse_args(argv)

    modified_files = [os.path.abspath(f) for f in args.file]
    show_all_output = args.show_all_output
    # This ends up in PYTHONPATH, and every test runs with its own directory
    # as cwd, so a relative path here would resolve to the wrong place.
    toolchain_utils = os.path.abspath(args.toolchain_utils)

    if not modified_files:
        print("No files given. Exit.")
        return 0

    _fix_python_path(toolchain_utils)

    tests_to_run = _find_forced_subdir_python_tests(
        modified_files, toolchain_utils
    )
    for f in modified_files:
        tests_to_run += _autodetect_python_tests_for(f, toolchain_utils)
    tests_to_run += _find_go_tests(modified_files)

    # TestSpecs have lists, so we can't use a set. We'd likely want to keep them
    # sorted for determinism anyway.
    tests_to_run.sort()
    tests_to_run = _compress_list(tests_to_run)

    with multiprocessing.pool.ThreadPool() as pool:
        success = _run_test_scripts(
            pool, tests_to_run, args.timeout, show_all_output
        )
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))