# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import fnmatch
import hashlib
import logging
import os
import signal
try:
  import _thread as thread
except ImportError:
  import thread  # Python 2 fallback.
import threading

from devil import base_error
from devil.android import crash_handler
from devil.android import device_errors
from devil.android.sdk import version_codes
from devil.android.tools import device_recovery
from devil.utils import signal_handler
from pylib.base import base_test_result
from pylib.base import test_collection
from pylib.base import test_exception
from pylib.base import test_run
from pylib.utils import device_dependencies
from pylib.local.device import local_device_environment

from lib.proto import exception_recorder


# Log message attached to tests whose outcome is unknown when the suite is
# killed by SIGTERM (e.g. a swarming timeout).
_SIGTERM_TEST_LOG = (
    ' Suite execution terminated, probably due to swarming timeout.\n'
    ' Your test may not have run.')


class TestsTerminated(Exception):
  """Raised from the SIGTERM handler to unwind out of the test loop."""


class LocalDeviceTestRun(test_run.TestRun):
  """Base class for test runs executed on locally attached devices.

  Subclasses provide the test discovery (_GetTests), per-test execution
  (_RunTest) and sharding policy (_ShouldShardTestsForDevices).
  """

  def __init__(self, env, test_instance):
    super().__init__(env, test_instance)
    # This is intended to be filled by a child class.
    self._installed_packages = []
    env.SetPreferredAbis(test_instance.GetPreferredAbis())

  #override
  def RunTests(self, results, raw_logs_fh=None):
    """Runs the tests, retrying failures up to |env.max_tries| times.

    Args:
      results: List that one TestRunResults object per try is appended to.
      raw_logs_fh: Unused here; accepted for interface compatibility.
    """
    tests = self._GetTests()

    exit_now = threading.Event()

    @local_device_environment.handle_shard_failures
    def run_tests_on_device(dev, tests, results):
      # This is performed here instead of during setup because restarting the
      # device clears app compatibility flags, which will happen if a device
      # needs to be recovered.
      SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
      consecutive_device_errors = 0
      for test in tests:
        if not test:
          logging.warning('No tests in shard. Continuing.')
          tests.test_completed()
          continue
        if exit_now.is_set():
          # Kill this worker thread; the SIGTERM handler has already fired.
          thread.exit()

        result = None
        rerun = None
        try:
          result, rerun = crash_handler.RetryOnSystemCrash(
              lambda d, t=test: self._RunTest(d, t),
              device=dev)
          consecutive_device_errors = 0
          if isinstance(result, base_test_result.BaseTestResult):
            results.AddResult(result)
          elif isinstance(result, list):
            results.AddResults(result)
          else:
            raise Exception(
                'Unexpected result type: %s' % type(result).__name__)
        except device_errors.CommandTimeoutError as e:
          exception_recorder.register(e)
          # Test timeouts don't count as device errors for the purpose
          # of bad device detection.
          consecutive_device_errors = 0

          if isinstance(test, list):
            result_log = ''
            if len(test) > 1:
              result_log = ('The test command timed out when running multiple '
                            'tests including this test. It does not '
                            'necessarily mean this specific test timed out.')
              # Ensure instrumentation tests not batched at env level retries.
              for t in test:
                # |dict| type infers it's an instrumentation test.
                if isinstance(t, dict) and t['annotations']:
                  t['annotations'].pop('Batch', None)

            results.AddResults(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(t),
                    base_test_result.ResultType.TIMEOUT,
                    log=result_log) for t in test)
          else:
            results.AddResult(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(test),
                    base_test_result.ResultType.TIMEOUT))
        except device_errors.DeviceUnreachableError as e:
          exception_recorder.register(e)
          # If the device is no longer reachable then terminate this
          # run_tests_on_device call.
          raise
        except base_error.BaseError as e:
          exception_recorder.register(e)
          # If we get a device error but believe the device is still
          # reachable, attempt to continue using it.
          if isinstance(tests, test_collection.TestCollection):
            rerun = test

          consecutive_device_errors += 1
          if consecutive_device_errors >= 3:
            # We believe the device is still reachable and may still be usable,
            # but if it fails repeatedly, we shouldn't attempt to keep using
            # it.
            logging.error('Repeated failures on device %s. Abandoning.',
                          str(dev))
            raise

          logging.exception(
              'Attempting to continue using device %s despite failure (%d/3).',
              str(dev), consecutive_device_errors)

        finally:
          # Put the test back into the shared collection if it should be
          # rerun on another device, and mark this attempt completed.
          if isinstance(tests, test_collection.TestCollection):
            if rerun:
              tests.add(rerun)
            tests.test_completed()

      logging.info('Finished running tests on this device.')

    def stop_tests(_signum, _frame):
      logging.critical('Received SIGTERM. Stopping test execution.')
      exit_now.set()
      raise TestsTerminated()

    try:
      with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
        self._env.ResetCurrentTry()
        while self._env.current_try < self._env.max_tries and tests:
          tries = self._env.current_try
          tests = self._SortTests(tests)
          grouped_tests = self._GroupTestsAfterSharding(tests)
          logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
          if tries > 0 and self._env.recover_devices:
            if any(d.build_version_sdk == version_codes.LOLLIPOP_MR1
                   for d in self._env.devices):
              logging.info(
                  'Attempting to recover devices due to known issue on L MR1. '
                  'See crbug.com/787056 for details.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
            elif tries + 1 == self._env.max_tries:
              logging.info(
                  'Attempting to recover devices prior to last test attempt.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
          logging.info(
              'Will run %d tests, grouped into %d groups, on %d devices: %s',
              len(tests), len(grouped_tests), len(self._env.devices),
              ', '.join(str(d) for d in self._env.devices))
          for t in tests:
            logging.debug('  %s', t)

          try_results = base_test_result.TestRunResults()
          test_names = (self._GetUniqueTestName(t) for t in tests)
          # Wildcard names are skipped here; their concrete results are
          # matched up later in _GetTestsToRetry via fnmatch.
          try_results.AddResults(
              base_test_result.BaseTestResult(
                  t, base_test_result.ResultType.NOTRUN)
              for t in test_names if not t.endswith('*'))

          # As soon as we know the names of the tests, we populate |results|.
          # The tests in try_results will have their results updated by
          # try_results.AddResult() as they are run.
          results.append(try_results)

          try:
            if self._ShouldShardTestsForDevices():
              tc = test_collection.TestCollection(
                  self._CreateShardsForDevices(grouped_tests))
              self._env.parallel_devices.pMap(
                  run_tests_on_device, tc, try_results).pGet(None)
            else:
              self._env.parallel_devices.pMap(run_tests_on_device,
                                              grouped_tests,
                                              try_results).pGet(None)
          except TestsTerminated:
            # Mark everything that never reported a result as timed out so
            # the summary reflects the SIGTERM.
            for unknown_result in try_results.GetUnknown():
              try_results.AddResult(
                  base_test_result.BaseTestResult(
                      unknown_result.GetName(),
                      base_test_result.ResultType.TIMEOUT,
                      log=_SIGTERM_TEST_LOG))
            raise

          self._env.IncrementCurrentTry()
          tests = self._GetTestsToRetry(tests, try_results)

          logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
          if tests:
            logging.info('%d failed tests remain.', len(tests))
          else:
            logging.info('All tests completed.')
    except TestsTerminated:
      pass

  def _GetTestsToRetry(self, tests, try_results):
    """Returns the subset of |tests| that failed in |try_results| and
    should be retried (per _ShouldRetry), plus any PRE_ tests appended by
    _AppendPreTestsForRetry."""

    def is_failure_result(test_result):
      # A missing result (None) counts as a failure; grouped results fail
      # if any member failed.
      if isinstance(test_result, list):
        return any(is_failure_result(r) for r in test_result)
      return (
          test_result is None
          or test_result.GetType() not in (
              base_test_result.ResultType.PASS,
              base_test_result.ResultType.SKIP))

    all_test_results = {r.GetName(): r for r in try_results.GetAll()}

    tests_and_names = ((t, self._GetUniqueTestName(t)) for t in tests)

    tests_and_results = {}
    for test, name in tests_and_names:
      if name.endswith('*'):
        # Wildcard test names collect every result whose name matches.
        tests_and_results[name] = (test, [
            r for n, r in all_test_results.items() if fnmatch.fnmatch(n, name)
        ])
      else:
        tests_and_results[name] = (test, all_test_results.get(name))

    failed_tests_and_results = ((test, result)
                                for test, result in tests_and_results.values()
                                if is_failure_result(result))

    failed_tests = [
        t for t, r in failed_tests_and_results if self._ShouldRetry(t, r)
    ]
    return self._AppendPreTestsForRetry(failed_tests, tests)

  def _ApplyExternalSharding(self, tests, shard_index, total_shards):
    """Returns the flattened list of tests assigned to |shard_index| out of
    |total_shards|.

    Raises:
      test_exception.InvalidShardingSettings: if the shard settings are
        out of range.
    """
    logging.info('Using external sharding settings. This is shard %d/%d',
                 shard_index, total_shards)

    if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
      raise test_exception.InvalidShardingSettings(shard_index, total_shards)

    sharded_tests = []

    # Sort tests by hash.
    # TODO(crbug.com/40200835): Add sorting logic back to _PartitionTests.
    tests = self._SortTests(tests)

    # Group tests by tests that should run in the same test invocation - either
    # unit tests or batched tests.
    grouped_tests = self._GroupTests(tests)

    # Partition grouped tests approximately evenly across shards.
    partitioned_tests = self._PartitionTests(grouped_tests, total_shards,
                                             float('inf'))
    if len(partitioned_tests) <= shard_index:
      return []
    for t in partitioned_tests[shard_index]:
      if isinstance(t, list):
        sharded_tests.extend(t)
      else:
        sharded_tests.append(t)
    return sharded_tests

  # Sort by hash so we don't put all tests in a slow suite in the same
  # partition.
  def _SortTests(self, tests):
    # For grouped tests the group's first member determines the sort key.
    return sorted(tests,
                  key=lambda t: hashlib.sha256(
                      self._GetUniqueTestName(t[0] if isinstance(t, list) else t
                                              ).encode()).hexdigest())
batched tests), we cannot perfectly fill all paritions as that 298 # would require breaking up groups. 299 def _PartitionTests(self, tests, num_desired_partitions, max_partition_size): 300 # pylint: disable=no-self-use 301 partitions = [] 302 303 304 num_not_yet_allocated = sum( 305 [len(test) - 1 for test in tests if self._CountTestsIndividually(test)]) 306 num_not_yet_allocated += len(tests) 307 308 # Fast linear partition approximation capped by max_partition_size. We 309 # cannot round-robin or otherwise re-order tests dynamically because we want 310 # test order to remain stable. 311 partition_size = min(num_not_yet_allocated // num_desired_partitions, 312 max_partition_size) 313 partitions.append([]) 314 last_partition_size = 0 315 for test in tests: 316 test_count = len(test) if self._CountTestsIndividually(test) else 1 317 # Make a new shard whenever we would overfill the previous one. However, 318 # if the size of the test group is larger than the max partition size on 319 # its own, just put the group in its own shard instead of splitting up the 320 # group. 321 # TODO(crbug.com/40200835): Add logic to support PRE_ test recognition but 322 # it may hurt performance in most scenarios. Currently all PRE_ tests are 323 # partitioned into the last shard. Unless the number of PRE_ tests are 324 # larger than the partition size, the PRE_ test may get assigned into a 325 # different shard and cause test failure. 326 if (last_partition_size + test_count > partition_size 327 and last_partition_size > 0): 328 num_desired_partitions -= 1 329 if num_desired_partitions <= 0: 330 # Too many tests for number of partitions, just fill all partitions 331 # beyond num_desired_partitions. 332 partition_size = max_partition_size 333 else: 334 # Re-balance remaining partitions. 
335 partition_size = min(num_not_yet_allocated // num_desired_partitions, 336 max_partition_size) 337 partitions.append([]) 338 partitions[-1].append(test) 339 last_partition_size = test_count 340 else: 341 partitions[-1].append(test) 342 last_partition_size += test_count 343 344 num_not_yet_allocated -= test_count 345 346 if not partitions[-1]: 347 partitions.pop() 348 return partitions 349 350 def _CountTestsIndividually(self, test): 351 # pylint: disable=no-self-use 352 if not isinstance(test, list): 353 return False 354 annotations = test[0]['annotations'] 355 # UnitTests tests are really fast, so to balance shards better, count 356 # UnitTests Batches as single tests. 357 return ('Batch' not in annotations 358 or annotations['Batch']['value'] != 'UnitTests') 359 360 def _CreateShardsForDevices(self, tests): 361 raise NotImplementedError 362 363 def _GetUniqueTestName(self, test): 364 # pylint: disable=no-self-use 365 return test 366 367 def _ShouldRetry(self, test, result): 368 # pylint: disable=no-self-use,unused-argument 369 return True 370 371 #override 372 def GetTestsForListing(self): 373 ret = self._GetTests() 374 ret = FlattenTestList(ret) 375 ret.sort() 376 return ret 377 378 def GetDataDepsForListing(self): 379 device_root = '$CHROMIUM_TESTS_ROOT' 380 host_device_tuples = self._test_instance.GetDataDependencies() 381 host_device_tuples = device_dependencies.SubstituteDeviceRoot( 382 host_device_tuples, device_root) 383 host_device_tuples = device_dependencies.ExpandDataDependencies( 384 host_device_tuples) 385 386 return sorted(f'{d} <- {os.path.relpath(h)}' for h, d in host_device_tuples) 387 388 def _GetTests(self): 389 raise NotImplementedError 390 391 def _GroupTests(self, tests): 392 # pylint: disable=no-self-use 393 return tests 394 395 def _GroupTestsAfterSharding(self, tests): 396 # pylint: disable=no-self-use 397 return tests 398 399 def _AppendPreTestsForRetry(self, failed_tests, tests): 400 # pylint: disable=no-self-use,unused-argument 401 
return failed_tests 402 403 def _RunTest(self, device, test): 404 raise NotImplementedError 405 406 def _ShouldShardTestsForDevices(self): 407 raise NotImplementedError 408 409 410def FlattenTestList(values): 411 """Returns a list with all nested lists (shard groupings) expanded.""" 412 ret = [] 413 for v in values: 414 if isinstance(v, list): 415 ret += v 416 else: 417 ret.append(v) 418 return ret 419 420 421def SetAppCompatibilityFlagsIfNecessary(packages, device): 422 """Sets app compatibility flags on the given packages and device. 423 424 Args: 425 packages: A list of strings containing package names to apply flags to. 426 device: A DeviceUtils instance to apply the flags on. 427 """ 428 429 def set_flag_for_packages(flag, enable): 430 enable_str = 'enable' if enable else 'disable' 431 for p in packages: 432 cmd = ['am', 'compat', enable_str, flag, p] 433 device.RunShellCommand(cmd) 434 435 sdk_version = device.build_version_sdk 436 if sdk_version >= version_codes.R: 437 # These flags are necessary to use the legacy storage permissions on R+. 438 # See crbug.com/1173699 for more information. 439 set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False) 440 set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False) 441 442 443class NoTestsError(Exception): 444 """Error for when no tests are found.""" 445