# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# List of pools that have been created but not yet waited on. Stays None
# until the first pool is created.
_all_pools = None
_is_child_process = False
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None


class _ImmediateResult:
  """Result object for a value that was computed synchronously.

  Exposes the get() / wait() / ready() / successful() methods that
  _WrappedResult calls on the result it wraps.
  """

  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


class _ExceptionWrapper:
  """Used to marshal exception messages back to main process."""

  def __init__(self, msg, exception_type=None):
    # |msg| is the formatted traceback. |exception_type| is the name of a
    # builtin exception class, or None when the type was not recorded.
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the recorded builtin exception, when there is one.

    Returns normally when no exception type was recorded, when the name does
    not resolve to a builtin exception class, or when that class cannot be
    built from a single message. The caller then falls back to logging.
    """
    if not self.exception_type:
      return
    # Look the name up on the |builtins| module. The |__builtins__| global is
    # a plain dict inside imported modules, so getattr() on it fails.
    exception_class = getattr(builtins, self.exception_type, None)
    if not (isinstance(exception_class, type)
            and issubclass(exception_class, BaseException)):
      return
    try:
      exception = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtin exceptions (e.g. UnicodeDecodeError) need more arguments.
      return
    raise exception


class _FuncWrapper:
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # Instances are constructed by ForkAndCall() / BulkForkAndCall() in the
    # parent process, so nothing child-specific may happen here.
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the parameters stored at |index|.

    Returns:
      The function's return value, or an _ExceptionWrapper describing the
      exception that it raised.
    """
    global _fork_kwargs
    global _is_child_process
    # The pool invokes this in the worker process, which makes it the right
    # place to record that this process is a child.
    _is_child_process = True
    try:
      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
        _fork_kwargs = {}
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      type_name = type(e).__name__
      if getattr(builtins, type_name, None) is type(e):
        exception_type = type_name
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult:
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      # Forget the pool so that a second wait() does not remove it again.
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    try:
      pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for i, pool in enumerate(list(_all_pools or ())):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    try:
      thread.start()
    except RuntimeError:
      # Python 3.12+ refuses to start threads during interpreter shutdown.
      # Stop here rather than risk the blocking call on this thread;
      # presumably multiprocessing's own exit handling cleans up the pools.
      return


def _CheckForException(value):
  """Handles |value| if it is an exception marshalled from a child process.

  Re-raises builtin exceptions via MaybeThrow(). Otherwise logs the traceback
  (unless exceptions were already silenced) and exits with status 1. Does
  nothing for any other kind of value.
  """
  global _silence_exceptions
  if isinstance(value, _ExceptionWrapper):
    if not _silence_exceptions:
      value.MaybeThrow()
      _silence_exceptions = True
      logging.error('Subprocess raised an exception:\n%s', value.msg)
    sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool whose workers inherit the given parameters.

  Args:
    job_params: Sequence of positional-argument tuples, one per job.
    job_kwargs: Keyword arguments shared by every job.

  Returns:
    The new pool, which has also been added to _all_pools.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    # Workers read _fork_params / _fork_kwargs from memory copied at fork()
    # time, so request the "fork" start method explicitly rather than relying
    # on the platform default.
    ret = multiprocessing.get_context('fork').Pool(pool_size)
  finally:
    # Reset even on failure so that later calls do not trip the asserts above.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called right away in the
  current process.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  When DISABLE_ASYNC is set, the calls are instead made one after another in
  the current process.

  Args:
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    # Workers look their arguments up by index in the inherited parameters,
    # so only the indices are sent to them.
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)