1#! /usr/bin/env python 2# Copyright 2014 The Chromium Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""Unit tests for the contents of parallelizer.py.""" 6 7# pylint: disable=protected-access 8# pylint: disable=unused-argument 9 10import contextlib 11import os 12import tempfile 13import time 14import sys 15import unittest 16 17if __name__ == '__main__': 18 sys.path.append( 19 os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) 20 21from devil.utils import parallelizer 22from devil.base_error import BaseError 23 24 25class ParallelizerTestObject(object): 26 """Class used to test parallelizer.Parallelizer.""" 27 28 parallel = parallelizer.Parallelizer 29 30 def __init__(self, thing, completion_file_name=None): 31 self._thing = thing 32 self._completion_file_name = completion_file_name 33 self.helper = ParallelizerTestObjectHelper(thing) 34 35 @staticmethod 36 def doReturn(what): 37 return what 38 39 @classmethod 40 def doRaise(cls, what): 41 raise what 42 43 def doSetTheThing(self, new_thing): 44 self._thing = new_thing 45 46 def doReturnTheThing(self): 47 return self._thing 48 49 def doRaiseTheThing(self): 50 raise self._thing 51 52 def doRaiseIfExceptionElseSleepFor(self, sleep_duration): 53 if isinstance(self._thing, Exception): 54 raise self._thing 55 time.sleep(sleep_duration) 56 self._write_completion_file() 57 return self._thing 58 59 def _write_completion_file(self): 60 if self._completion_file_name and len(self._completion_file_name): 61 with open(self._completion_file_name, 'w+b') as completion_file: 62 completion_file.write(b'complete') 63 64 def __getitem__(self, index): 65 return self._thing[index] 66 67 def __str__(self): 68 return type(self).__name__ 69 70 71class ParallelizerTestObjectHelper(object): 72 def __init__(self, thing): 73 self._thing = thing 74 75 def doReturnStringThing(self): 76 return str(self._thing) 77 78 79class ParallelizerTest(unittest.TestCase): 80 def testInitEmptyList(self): 81 r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1) 82 self.assertEquals([], r) 83 84 def testMethodCall(self): 85 test_data = ['abc_foo', 'def_foo', 'ghi_foo'] 86 expected = ['abc_bar', 'def_bar', 'ghi_bar'] 87 r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1) 88 self.assertEquals(expected, r) 89 90 def testMutate(self): 91 devices = [ParallelizerTestObject(True) for _ in range(0, 10)] 92 self.assertTrue(all(d.doReturnTheThing() for d in devices)) 93 ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1) 94 self.assertTrue(not any(d.doReturnTheThing() for d in devices)) 95 96 def testAllReturn(self): 97 devices = [ParallelizerTestObject(True) for _ in range(0, 10)] 98 results = ParallelizerTestObject.parallel(devices).doReturnTheThing().pGet( 99 1) 100 self.assertTrue(isinstance(results, list)) 101 self.assertEquals(10, len(results)) 102 self.assertTrue(all(results)) 103 104 def testAllRaise(self): 105 devices = [ 106 ParallelizerTestObject(Exception('thing %d' % i)) 107 for i in range(0, 10) 108 ] 109 p = ParallelizerTestObject.parallel(devices).doRaiseTheThing() 110 with self.assertRaises(Exception): 111 p.pGet(1) 112 113 def testOneFailOthersComplete(self): 114 parallel_device_count = 10 115 exception_index = 7 116 exception_msg = 'thing %d' % exception_index 117 118 try: 119 completion_files = [ 120 tempfile.NamedTemporaryFile(delete=False) 121 for _ in range(0, parallel_device_count) 122 ] 123 devices = [ 124 ParallelizerTestObject( 125 i if i != exception_index else BaseError(exception_msg), 126 completion_files[i].name) 127 for i in range(0, parallel_device_count) 128 ] 129 for f in completion_files: 130 f.close() 131 p = ParallelizerTestObject.parallel(devices) 132 with self.assertRaises(BaseError) as e: 133 p.doRaiseIfExceptionElseSleepFor(2).pGet(3) 134 self.assertTrue(exception_msg in str(e.exception)) 135 for i in range(0, parallel_device_count): 136 with open(completion_files[i].name) as f: 137 if i == exception_index: 138 self.assertEquals('', f.read()) 139 else: 140 self.assertEquals('complete', f.read()) 141 finally: 142 for f in completion_files: 143 os.remove(f.name) 144 145 def testReusable(self): 146 devices = [ParallelizerTestObject(True) for _ in range(0, 10)] 147 p = ParallelizerTestObject.parallel(devices) 148 results = p.doReturn(True).pGet(1) 149 self.assertTrue(all(results)) 150 results = p.doReturn(True).pGet(1) 151 self.assertTrue(all(results)) 152 with self.assertRaises(Exception): 153 results = p.doRaise(Exception('reusableTest')).pGet(1) 154 155 def testContained(self): 156 devices = [ParallelizerTestObject(i) for i in range(0, 10)] 157 results = (ParallelizerTestObject.parallel(devices).helper. 158 doReturnStringThing().pGet(1)) 159 self.assertTrue(isinstance(results, list)) 160 self.assertEquals(10, len(results)) 161 for i in range(0, 10): 162 self.assertEquals(str(i), results[i]) 163 164 def testGetItem(self): 165 devices = [ParallelizerTestObject(range(i, i + 10)) for i in range(0, 10)] 166 results = ParallelizerTestObject.parallel(devices)[9].pGet(1) 167 self.assertEquals(list(range(9, 19)), results) 168 169 170class SyncParallelizerTest(unittest.TestCase): 171 def testContextManager(self): 172 in_context = [False for i in range(10)] 173 174 @contextlib.contextmanager 175 def enter_into_context(i): 176 in_context[i] = True 177 try: 178 yield 179 finally: 180 in_context[i] = False 181 182 parallelized_context = parallelizer.SyncParallelizer( 183 [enter_into_context(i) for i in range(10)]) 184 185 with parallelized_context: 186 self.assertTrue(all(in_context)) 187 self.assertFalse(any(in_context)) 188 189 190if __name__ == '__main__': 191 unittest.main(verbosity=2) 192