xref: /aosp_15_r20/external/angle/build/android/pylib/local/device/local_device_test_run.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import fnmatch
import hashlib
import logging
import os
import signal
try:
  import _thread as thread
except ImportError:
  import thread
import threading

from devil import base_error
from devil.android import crash_handler
from devil.android import device_errors
from devil.android.sdk import version_codes
from devil.android.tools import device_recovery
from devil.utils import signal_handler
from pylib.base import base_test_result
from pylib.base import test_collection
from pylib.base import test_exception
from pylib.base import test_run
from pylib.utils import device_dependencies
from pylib.local.device import local_device_environment

from lib.proto import exception_recorder


_SIGTERM_TEST_LOG = (
  '  Suite execution terminated, probably due to swarming timeout.\n'
  '  Your test may not have run.')


class TestsTerminated(Exception):
  pass


class LocalDeviceTestRun(test_run.TestRun):

  def __init__(self, env, test_instance):
    super().__init__(env, test_instance)
    # This is intended to be filled by a child class.
    self._installed_packages = []
    env.SetPreferredAbis(test_instance.GetPreferredAbis())

  #override
  def RunTests(self, results, raw_logs_fh=None):
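    """Runs the tests, retrying failed tests until out of tries.

    Args:
      results: A list to which one TestRunResults object is appended per try.
      raw_logs_fh: Optional file handle for raw logs; unused by this
        implementation.
    """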
    tests = self._GetTests()

    exit_now = threading.Event()

    @local_device_environment.handle_shard_failures
    def run_tests_on_device(dev, tests, results):
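      """Runs the tests assigned to a single device, recording results."""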
      # This is performed here instead of during setup because restarting the
      # device clears app compatibility flags, which will happen if a device
      # needs to be recovered.
      SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
      consecutive_device_errors = 0
      for test in tests:
        if not test:
          logging.warning('No tests in shard. Continuing.')
          tests.test_completed()
          continue
        if exit_now.isSet():
          thread.exit()

        result = None
        rerun = None
        try:
          result, rerun = crash_handler.RetryOnSystemCrash(
              lambda d, t=test: self._RunTest(d, t),
              device=dev)
          consecutive_device_errors = 0
          if isinstance(result, base_test_result.BaseTestResult):
            results.AddResult(result)
          elif isinstance(result, list):
            results.AddResults(result)
          else:
            raise Exception(
                'Unexpected result type: %s' % type(result).__name__)
        except device_errors.CommandTimeoutError as e:
          exception_recorder.register(e)
          # Test timeouts don't count as device errors for the purpose
          # of bad device detection.
          consecutive_device_errors = 0

          if isinstance(test, list):
            result_log = ''
            if len(test) > 1:
              result_log = ('The test command timed out when running multiple '
                            'tests including this test. It does not '
                            'necessarily mean this specific test timed out.')
              # Ensure instrumentation tests are not batched at the env level
              # on retries.
              for t in test:
                # A |dict| test implies an instrumentation test.
                if isinstance(t, dict) and t['annotations']:
                  t['annotations'].pop('Batch', None)

            results.AddResults(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(t),
                    base_test_result.ResultType.TIMEOUT,
                    log=result_log) for t in test)
          else:
            results.AddResult(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(test),
                    base_test_result.ResultType.TIMEOUT))
        except device_errors.DeviceUnreachableError as e:
          exception_recorder.register(e)
          # If the device is no longer reachable then terminate this
          # run_tests_on_device call.
          raise
        except base_error.BaseError as e:
          exception_recorder.register(e)
          # If we get a device error but believe the device is still
          # reachable, attempt to continue using it.
          if isinstance(tests, test_collection.TestCollection):
            rerun = test

          consecutive_device_errors += 1
          if consecutive_device_errors >= 3:
            # We believe the device is still reachable and may still be usable,
            # but if it fails repeatedly, we shouldn't attempt to keep using
            # it.
            logging.error('Repeated failures on device %s. Abandoning.',
                          str(dev))
            raise

          logging.exception(
              'Attempting to continue using device %s despite failure (%d/3).',
              str(dev), consecutive_device_errors)

        finally:
          if isinstance(tests, test_collection.TestCollection):
            if rerun:
              tests.add(rerun)
            tests.test_completed()

      logging.info('Finished running tests on this device.')

    def stop_tests(_signum, _frame):
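      """SIGTERM handler: tells worker threads to exit and aborts the run."""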
      logging.critical('Received SIGTERM. Stopping test execution.')
      exit_now.set()
      raise TestsTerminated()

    try:
      with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
        self._env.ResetCurrentTry()
        while self._env.current_try < self._env.max_tries and tests:
          tries = self._env.current_try
          tests = self._SortTests(tests)
          grouped_tests = self._GroupTestsAfterSharding(tests)
          logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
          if tries > 0 and self._env.recover_devices:
            if any(d.build_version_sdk == version_codes.LOLLIPOP_MR1
                   for d in self._env.devices):
              logging.info(
                  'Attempting to recover devices due to known issue on L MR1. '
                  'See crbug.com/787056 for details.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
            elif tries + 1 == self._env.max_tries:
              logging.info(
                  'Attempting to recover devices prior to last test attempt.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
          logging.info(
              'Will run %d tests, grouped into %d groups, on %d devices: %s',
              len(tests), len(grouped_tests), len(self._env.devices),
              ', '.join(str(d) for d in self._env.devices))
          for t in tests:
            logging.debug('  %s', t)

          try_results = base_test_result.TestRunResults()
          test_names = (self._GetUniqueTestName(t) for t in tests)
          try_results.AddResults(
              base_test_result.BaseTestResult(
                  t, base_test_result.ResultType.NOTRUN)
              for t in test_names if not t.endswith('*'))

          # As soon as we know the names of the tests, we populate |results|.
          # The tests in try_results will have their results updated by
          # try_results.AddResult() as they are run.
          results.append(try_results)

          try:
            if self._ShouldShardTestsForDevices():
              tc = test_collection.TestCollection(
                  self._CreateShardsForDevices(grouped_tests))
              self._env.parallel_devices.pMap(
                  run_tests_on_device, tc, try_results).pGet(None)
            else:
              self._env.parallel_devices.pMap(run_tests_on_device,
                                              grouped_tests,
                                              try_results).pGet(None)
          except TestsTerminated:
            for unknown_result in try_results.GetUnknown():
              try_results.AddResult(
                  base_test_result.BaseTestResult(
                      unknown_result.GetName(),
                      base_test_result.ResultType.TIMEOUT,
                      log=_SIGTERM_TEST_LOG))
            raise

          self._env.IncrementCurrentTry()
          tests = self._GetTestsToRetry(tests, try_results)

          logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
          if tests:
            logging.info('%d failed tests remain.', len(tests))
          else:
            logging.info('All tests completed.')
    except TestsTerminated:
      pass

  def _GetTestsToRetry(self, tests, try_results):
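    """Returns the tests from |tests| that failed and should be retried.

    Wildcard test names (ending in '*') are matched against result names with
    fnmatch before deciding failure.
    """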
    def is_failure_result(test_result):
      if isinstance(test_result, list):
        return any(is_failure_result(r) for r in test_result)
      return (
          test_result is None
          or test_result.GetType() not in (
              base_test_result.ResultType.PASS,
              base_test_result.ResultType.SKIP))

    all_test_results = {r.GetName(): r for r in try_results.GetAll()}

    tests_and_names = ((t, self._GetUniqueTestName(t)) for t in tests)

    tests_and_results = {}
    for test, name in tests_and_names:
      if name.endswith('*'):
        tests_and_results[name] = (test, [
            r for n, r in all_test_results.items() if fnmatch.fnmatch(n, name)
        ])
      else:
        tests_and_results[name] = (test, all_test_results.get(name))

    failed_tests_and_results = ((test, result)
                                for test, result in tests_and_results.values()
                                if is_failure_result(result))

    failed_tests = [
        t for t, r in failed_tests_and_results if self._ShouldRetry(t, r)
    ]
    return self._AppendPreTestsForRetry(failed_tests, tests)

  def _ApplyExternalSharding(self, tests, shard_index, total_shards):
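    """Returns the subset of |tests| assigned to external shard |shard_index|.

    Tests are sorted by hash, grouped, and partitioned across |total_shards|
    partitions; this shard's partition is then flattened and returned.
    """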
    logging.info('Using external sharding settings. This is shard %d/%d',
                 shard_index, total_shards)

    if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
      raise test_exception.InvalidShardingSettings(shard_index, total_shards)

    sharded_tests = []

    # Sort tests by hash.
    # TODO(crbug.com/40200835): Add sorting logic back to _PartitionTests.
    tests = self._SortTests(tests)

    # Group together tests that should run in the same test invocation -
    # either unit tests or batched tests.
    grouped_tests = self._GroupTests(tests)

    # Partition grouped tests approximately evenly across shards.
    partitioned_tests = self._PartitionTests(grouped_tests, total_shards,
                                             float('inf'))
    if len(partitioned_tests) <= shard_index:
      return []
    for t in partitioned_tests[shard_index]:
      if isinstance(t, list):
        sharded_tests.extend(t)
      else:
        sharded_tests.append(t)
    return sharded_tests

  # Sort by hash so we don't put all tests in a slow suite in the same
  # partition.
  def _SortTests(self, tests):
    return sorted(tests,
                  key=lambda t: hashlib.sha256(
                      self._GetUniqueTestName(t[0] if isinstance(t, list) else t
                                              ).encode()).hexdigest())

  # Partition tests evenly into |num_desired_partitions| partitions where
  # possible. However, many constraints can make perfect partitioning
  # impossible. If |max_partition_size| isn't large enough, extra partitions
  # may be created (an infinite max size should always return precisely the
  # desired number of partitions). Even if |max_partition_size| is technically
  # large enough to hold all of the tests in |num_desired_partitions|, we
  # attempt to keep test order relatively stable to minimize flakes, so when
  # tests are grouped (e.g. batched tests), we cannot perfectly fill all
  # partitions as that would require breaking up groups.
  def _PartitionTests(self, tests, num_desired_partitions, max_partition_size):
    # pylint: disable=no-self-use
    partitions = []

    num_not_yet_allocated = sum(
        [len(test) - 1 for test in tests if self._CountTestsIndividually(test)])
    num_not_yet_allocated += len(tests)

    # Fast linear partition approximation capped by max_partition_size. We
    # cannot round-robin or otherwise re-order tests dynamically because we
    # want test order to remain stable.
    partition_size = min(num_not_yet_allocated // num_desired_partitions,
                         max_partition_size)
    partitions.append([])
    last_partition_size = 0
    for test in tests:
      test_count = len(test) if self._CountTestsIndividually(test) else 1
      # Make a new shard whenever we would overfill the previous one. However,
      # if a test group is larger than the max partition size on its own, put
      # the group in its own shard instead of splitting it up.
      # TODO(crbug.com/40200835): Add logic to support PRE_ test recognition,
      # though it may hurt performance in most scenarios. Currently all PRE_
      # tests are partitioned into the last shard. If the number of PRE_ tests
      # is larger than the partition size, a PRE_ test may get assigned to a
      # different shard and cause test failures.
      if (last_partition_size + test_count > partition_size
          and last_partition_size > 0):
        num_desired_partitions -= 1
        if num_desired_partitions <= 0:
          # Too many tests for the number of partitions; just fill all
          # partitions beyond num_desired_partitions.
          partition_size = max_partition_size
        else:
          # Re-balance remaining partitions.
          partition_size = min(num_not_yet_allocated // num_desired_partitions,
                               max_partition_size)
        partitions.append([])
        partitions[-1].append(test)
        last_partition_size = test_count
      else:
        partitions[-1].append(test)
        last_partition_size += test_count

      num_not_yet_allocated -= test_count

    if not partitions[-1]:
      partitions.pop()
    return partitions

  def _CountTestsIndividually(self, test):
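    """Returns True if each test in the group |test| counts individually.

    Used when balancing partitions; UnitTests batches count as a single test.
    """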
    # pylint: disable=no-self-use
    if not isinstance(test, list):
      return False
    annotations = test[0]['annotations']
    # UnitTests tests are really fast, so to balance shards better, count
    # UnitTests Batches as single tests.
    return ('Batch' not in annotations
            or annotations['Batch']['value'] != 'UnitTests')

  def _CreateShardsForDevices(self, tests):
    raise NotImplementedError

  def _GetUniqueTestName(self, test):
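    """Returns a unique name for |test|; identity in the base class."""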
    # pylint: disable=no-self-use
    return test

  def _ShouldRetry(self, test, result):
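    """Returns whether a failed |test| with |result| should be retried."""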
    # pylint: disable=no-self-use,unused-argument
    return True

  #override
  def GetTestsForListing(self):
    ret = self._GetTests()
    ret = FlattenTestList(ret)
    ret.sort()
    return ret

  def GetDataDepsForListing(self):
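    """Returns sorted 'device_path <- host_path' data dependency strings."""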
    device_root = '$CHROMIUM_TESTS_ROOT'
    host_device_tuples = self._test_instance.GetDataDependencies()
    host_device_tuples = device_dependencies.SubstituteDeviceRoot(
        host_device_tuples, device_root)
    host_device_tuples = device_dependencies.ExpandDataDependencies(
        host_device_tuples)

    return sorted(f'{d} <- {os.path.relpath(h)}' for h, d in host_device_tuples)

  def _GetTests(self):
    raise NotImplementedError

  def _GroupTests(self, tests):
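    """Groups tests that run in the same invocation; identity by default."""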
    # pylint: disable=no-self-use
    return tests

  def _GroupTestsAfterSharding(self, tests):
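    """Groups tests after sharding; identity by default."""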
    # pylint: disable=no-self-use
    return tests

  def _AppendPreTestsForRetry(self, failed_tests, tests):
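    """Returns |failed_tests| plus any PRE_ tests a subclass needs to add."""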
    # pylint: disable=no-self-use,unused-argument
    return failed_tests

  def _RunTest(self, device, test):
    raise NotImplementedError

  def _ShouldShardTestsForDevices(self):
    raise NotImplementedError


def FlattenTestList(values):
  """Returns a list with all nested lists (shard groupings) expanded."""
  ret = []
  for v in values:
    if isinstance(v, list):
      ret += v
    else:
      ret.append(v)
  return ret


def SetAppCompatibilityFlagsIfNecessary(packages, device):
  """Sets app compatibility flags on the given packages and device.

  Args:
    packages: A list of strings containing package names to apply flags to.
    device: A DeviceUtils instance to apply the flags on.
  """

  def set_flag_for_packages(flag, enable):
    enable_str = 'enable' if enable else 'disable'
    for p in packages:
      cmd = ['am', 'compat', enable_str, flag, p]
      device.RunShellCommand(cmd)

  sdk_version = device.build_version_sdk
  if sdk_version >= version_codes.R:
    # These flags are necessary to use the legacy storage permissions on R+.
    # See crbug.com/1173699 for more information.
    set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False)
    set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False)


class NoTestsError(Exception):
  """Error for when no tests are found."""
445