# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# Pools that were created and not yet waited on. Stays None until the first
# pool is created, which is also when the atexit hook is registered.
_all_pools = None
# Set to True inside worker processes the first time a job runs there.
_is_child_process = False
# Once True, child exceptions are no longer re-raised or logged.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None


class _ImmediateResult:
  """Result-like object wrapping a value that has already been computed."""

  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


class _ExceptionWrapper:
  """Used to marshal exception messages back to main process."""

  def __init__(self, msg, exception_type=None):
    """Stores the traceback text and, optionally, a builtin exception name.

    Args:
      msg: Formatted traceback of the exception raised in the child.
      exception_type: Name of a builtin exception class, or None.
    """
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the marshalled exception when it can be reconstructed.

    Does nothing when no exception type was recorded, when the recorded name
    is not a builtin exception class, or when that class cannot be built from
    a single message string.
    """
    if not self.exception_type:
      return
    # Look the name up in the |builtins| module: |__builtins__| is a plain dict
    # in imported modules, so attribute access on it is unreliable.
    exception_class = getattr(builtins, self.exception_type, None)
    if not (isinstance(exception_class, type)
            and issubclass(exception_class, BaseException)):
      return
    try:
      exception = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtin exceptions (e.g. UnicodeDecodeError) require several
      # constructor arguments; leave reporting to the caller in that case.
      return
    raise exception


class _FuncWrapper:
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # NOTE: This constructor runs in the parent process (the instance is
    # pickled and sent to the worker), so it must not mark the process as a
    # child.
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the parameters stored at |index|.

    Args:
      index: Position within |_fork_params| of the positional args to use.
      _: Unused.

    Returns:
      The function's return value, or an _ExceptionWrapper describing the
      exception that it raised.
    """
    global _fork_kwargs
    global _is_child_process
    # __call__ is what actually executes inside the worker process.
    _is_child_process = True
    try:
      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
        _fork_kwargs = {}
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      type_name = type(e).__name__
      if getattr(builtins, type_name, None) is type(e):
        exception_type = type_name
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult:
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for the result, checks it for a child exception and returns it."""
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    """Blocks until the result is available, then unregisters the pool."""
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    # Best-effort: failures while shutting down are deliberately ignored.
    try:
      pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for i, pool in enumerate(_all_pools):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    thread.start()


def _CheckForException(value):
  """Raises or exits if |value| is an exception marshalled from a child.

  The first failure is re-raised when its type could be marshalled; otherwise
  it is logged. In every non-raising case the process exits with status 1.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if not _silence_exceptions:
    value.MaybeThrow()
    _silence_exceptions = True
    logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)


def _GetPoolContext():
  """Returns the object to create pools from, preferring the fork context."""
  try:
    # Job parameters are handed to workers through module globals, which only
    # works when workers are created with fork().
    return multiprocessing.get_context('fork')
  except ValueError:
    # fork is unavailable on this platform; use the default start method.
    return multiprocessing


def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool whose workers inherit the given job parameters.

  Args:
    job_params: List of positional-argument tuples, one per job.
    **job_kwargs: Keyword arguments shared by every job.

  Returns:
    The new pool, which has also been registered in |_all_pools|.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = _GetPoolContext().Pool(pool_size)
  finally:
    # Always reset, so that a failed pool creation does not trip the asserts
    # above on the next call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  Args:
    func: Callable to run.
    args: Sequence of positional arguments for |func|.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Args:
    func: Callable to run.
    arg_tuples: Iterable of positional-argument tuples, one per call.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    # Workers receive only the index and read the arguments from the globals
    # they inherited when the pool was created.
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)