# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# List of live process pools; created lazily by _MakeProcessPool().
_all_pools = None
# True only inside pool worker processes (set by _FuncWrapper.__call__).
_is_child_process = False
# Once True, _CheckForException() logs failures instead of re-raising them.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None

# Ensure fork is used on MacOS for multiprocessing compatibility.
# Starting from Python 3.8, the "spawn" method is the default on MacOS.
# set_start_method() raises RuntimeError once a context has been chosen, even
# when that context is already 'fork' (e.g. this module imported twice under
# different names, or multiprocessing used before this import), so only set it
# when the current method is not already 'fork'.
if multiprocessing.get_start_method(allow_none=True) != 'fork':
  multiprocessing.set_start_method('fork')


class _ImmediateResult:
  """Result-like wrapper around a value computed synchronously.

  Mirrors the subset of the AsyncResult interface used by _WrappedResult.
  """

  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


def _BuiltinExceptionType(name):
  """Returns the builtin exception class called |name|, or None.

  Uses the `builtins` module rather than `__builtins__`: in imported modules
  CPython binds `__builtins__` to a dict, so attribute lookups on it fail.
  """
  candidate = getattr(builtins, name, None) if name else None
  if isinstance(candidate, type) and issubclass(candidate, BaseException):
    return candidate
  return None


class _ExceptionWrapper:
  """Used to marshal exception messages back to main process.

  Attributes (as set by __init__):
    msg: Formatted traceback text from the child process.
    exception_type: Name of a builtin exception type to re-raise, or None.
  """

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Raises |exception_type| (with |msg| attached) when it can be rebuilt.

    Returns normally when no builtin exception type was recorded, or when the
    builtin cannot be constructed from a single message argument (e.g.
    UnicodeDecodeError), leaving the caller to report the traceback instead.
    """
    exception_class = _BuiltinExceptionType(self.exception_type)
    if exception_class is None:
      return
    try:
      exception = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      return
    raise exception


class _FuncWrapper:
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # Note: this runs in the parent process, so it must not touch
    # _is_child_process (doing so would disable _TerminatePools in the parent).
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the |index|-th inherited arg tuple.

    Returns:
      The function's return value, or an _ExceptionWrapper describing any
      exception it raised.
    """
    global _fork_kwargs
    global _is_child_process
    # Within this module, instances are only invoked by pool workers, so this
    # is where a process learns it is a child.
    _is_child_process = True
    try:
      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
        _fork_kwargs = {}
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      if _BuiltinExceptionType(type(e).__name__) is not None:
        exception_type = type(e).__name__
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except BaseException:
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult:
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters the associated pool from _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for and returns the result, surfacing child exceptions."""
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    """Blocks until the result is ready, then unregisters the pool once."""
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Registered with atexit by _MakeProcessPool(). Not supposed to be necessary
  according to the docs, but seems to be required when child process throws an
  exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    try:
      pool.terminate()
    except BaseException:  # Best-effort cleanup during interpreter exit.
      pass

  for i, pool in enumerate(_all_pools):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    thread.start()


def _CheckForException(value):
  """Surfaces an exception marshalled back from a child process.

  Non-_ExceptionWrapper values are ignored. For an _ExceptionWrapper, unless
  exceptions are being silenced, MaybeThrow() may re-raise it as its builtin
  type. Otherwise the child's traceback is logged and the process exits with
  status 1 (and later failures are only logged).
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if not _silence_exceptions:
    value.MaybeThrow()
    _silence_exceptions = True
  logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a registered process pool whose workers inherit the job params.

  Args:
    job_params: Sequence of argument tuples; workers read them by index.
    job_kwargs: Keyword arguments shared by every job.

  Returns:
    A multiprocessing.Pool with min(len(job_params), cpu_count()) workers.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    # Workers are fork()ed while the globals above are set, so they inherit
    # the parameters without pickling.
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Always reset; otherwise a failed Pool() would trip the asserts above on
    # every subsequent call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  Runs synchronously in-process when DISABLE_ASYNC is set.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Runs synchronously in-process when DISABLE_ASYNC is set.

  Args:
    func: The function to call.
    arg_tuples: Iterable of positional-argument tuples, one per call.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)