xref: /aosp_15_r20/external/chromium-trace/catapult/devil/devil/utils/parallelizer_test.py (revision 1fa4b3da657c0e9ad43c0220bacf9731820715a5)
1#! /usr/bin/env python
2# Copyright 2014 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Unit tests for the contents of parallelizer.py."""
6
7# pylint: disable=protected-access
8# pylint: disable=unused-argument
9
10import contextlib
11import os
12import tempfile
13import time
14import sys
15import unittest
16
if __name__ == '__main__':
  # When this file is executed directly, add the directory two levels above
  # it to the import path before the `devil` imports that follow.
  _this_dir = os.path.dirname(__file__)
  sys.path.append(os.path.abspath(os.path.join(_this_dir, '..', '..')))
20
21from devil.utils import parallelizer
22from devil.base_error import BaseError
23
24
class ParallelizerTestObject(object):
  """Class used to test parallelizer.Parallelizer.

  Wraps an arbitrary "thing" and exposes small methods that return it, raise
  it, replace it or index into it, so tests can observe the outcome of calls
  made through a parallelizer over a list of these objects.
  """

  parallel = parallelizer.Parallelizer

  def __init__(self, thing, completion_file_name=None):
    """Stores the wrapped value and the optional completion file path.

    Args:
      thing: The value that the do* methods return, raise or index into.
      completion_file_name: Optional path of a file that
        doRaiseIfExceptionElseSleepFor writes to once it has finished
        sleeping.
    """
    self._thing = thing
    self._completion_file_name = completion_file_name
    self.helper = ParallelizerTestObjectHelper(thing)

  @staticmethod
  def doReturn(what):
    """Returns |what| unchanged."""
    return what

  @classmethod
  def doRaise(cls, what):
    """Raises |what|."""
    raise what

  def doSetTheThing(self, new_thing):
    """Replaces the wrapped value with |new_thing|."""
    self._thing = new_thing

  def doReturnTheThing(self):
    """Returns the wrapped value."""
    return self._thing

  def doRaiseTheThing(self):
    """Raises the wrapped value."""
    raise self._thing

  def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
    """Raises the wrapped value if it is an Exception, else sleeps.

    Args:
      sleep_duration: Number of seconds to sleep when not raising.

    Returns:
      The wrapped value, after sleeping and writing the completion file.

    Raises:
      The wrapped value itself, when it is an Exception instance.
    """
    if isinstance(self._thing, Exception):
      raise self._thing
    time.sleep(sleep_duration)
    self._write_completion_file()
    return self._thing

  def _write_completion_file(self):
    """Writes b'complete' to the completion file, if one was configured."""
    # A None or empty name means no completion file was requested; plain
    # truthiness covers both cases.
    if self._completion_file_name:
      with open(self._completion_file_name, 'w+b') as completion_file:
        completion_file.write(b'complete')

  def __getitem__(self, index):
    """Returns the element of the wrapped value at |index|."""
    return self._thing[index]

  def __str__(self):
    """Returns the class name, not the wrapped value."""
    return type(self).__name__
70
class ParallelizerTestObjectHelper(object):
  """Helper object exposed as the `helper` attribute of the test object."""

  def __init__(self, thing):
    """Stores |thing| for later conversion to a string."""
    self._thing = thing

  def doReturnStringThing(self):
    """Returns str() of the stored value."""
    as_text = str(self._thing)
    return as_text
78
class ParallelizerTest(unittest.TestCase):
  """Tests for parallelizer.Parallelizer."""

  def testInitEmptyList(self):
    """Calling through a parallelizer over an empty list gives []."""
    r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
    self.assertEqual([], r)

  def testMethodCall(self):
    """A method call is applied to every wrapped object, in order."""
    test_data = ['abc_foo', 'def_foo', 'ghi_foo']
    expected = ['abc_bar', 'def_bar', 'ghi_bar']
    r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
    self.assertEqual(expected, r)

  def testMutate(self):
    """Mutating calls made through the parallelizer reach every object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    self.assertTrue(all(d.doReturnTheThing() for d in devices))
    ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
    self.assertFalse(any(d.doReturnTheThing() for d in devices))

  def testAllReturn(self):
    """pGet returns a list holding one result per wrapped object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    results = ParallelizerTestObject.parallel(devices).doReturnTheThing().pGet(
        1)
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    self.assertTrue(all(results))

  def testAllRaise(self):
    """pGet raises when every wrapped object raises."""
    devices = [
        ParallelizerTestObject(Exception('thing %d' % i)) for i in range(10)
    ]
    p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
    with self.assertRaises(Exception):
      p.pGet(1)

  def testOneFailOthersComplete(self):
    """One object raising does not stop the others from completing."""
    parallel_device_count = 10
    exception_index = 7
    exception_msg = 'thing %d' % exception_index

    # Bound before the try block so the cleanup in `finally` always has a
    # list to walk, even if creating a temporary file fails part-way through.
    completion_file_names = []
    try:
      for _ in range(parallel_device_count):
        # Only the path is needed; the handle is closed right away and the
        # object under test reopens the file by name.
        with tempfile.NamedTemporaryFile(delete=False) as f:
          completion_file_names.append(f.name)
      devices = [
          ParallelizerTestObject(
              i if i != exception_index else BaseError(exception_msg),
              completion_file_names[i])
          for i in range(parallel_device_count)
      ]
      p = ParallelizerTestObject.parallel(devices)
      with self.assertRaises(BaseError) as e:
        p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
      self.assertIn(exception_msg, str(e.exception))
      for i, name in enumerate(completion_file_names):
        with open(name) as f:
          contents = f.read()
        if i == exception_index:
          # The raising object never reaches the completion-file write.
          self.assertEqual('', contents)
        else:
          self.assertEqual('complete', contents)
    finally:
      for name in completion_file_names:
        os.remove(name)

  def testReusable(self):
    """The same parallelizer can be used for several successive calls."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    p = ParallelizerTestObject.parallel(devices)
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    with self.assertRaises(Exception):
      p.doRaise(Exception('reusableTest')).pGet(1)

  def testContained(self):
    """Attribute access is forwarded, so methods on members can be called."""
    devices = [ParallelizerTestObject(i) for i in range(10)]
    results = (ParallelizerTestObject.parallel(devices).helper.
               doReturnStringThing().pGet(1))
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    for i in range(10):
      self.assertEqual(str(i), results[i])

  def testGetItem(self):
    """Indexing the parallelizer indexes every wrapped object."""
    devices = [ParallelizerTestObject(range(i, i + 10)) for i in range(10)]
    results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
    self.assertEqual(list(range(9, 19)), results)
169
170class SyncParallelizerTest(unittest.TestCase):
171  def testContextManager(self):
172    in_context = [False for i in range(10)]
173
174    @contextlib.contextmanager
175    def enter_into_context(i):
176      in_context[i] = True
177      try:
178        yield
179      finally:
180        in_context[i] = False
181
182    parallelized_context = parallelizer.SyncParallelizer(
183        [enter_into_context(i) for i in range(10)])
184
185    with parallelized_context:
186      self.assertTrue(all(in_context))
187    self.assertFalse(any(in_context))
188
189
if __name__ == '__main__':
  # Run the tests in this module with verbosity level 2 when executed
  # directly as a script.
  unittest.main(verbosity=2)
192