# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# List of pools that have not been waited on yet. Created lazily by
# _MakeProcessPool(), which also registers _TerminatePools() with atexit.
_all_pools = None
# Set to True by _FuncWrapper.__call__(), i.e. only inside pool workers.
_is_child_process = False
# Once True, _CheckForException() exits without re-reporting the failure.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None

# Ensure fork is used on MacOS for multiprocessing compatibility.
# Starting from Python 3.8, the "spawn" method is the default on MacOS.
# On Linux hosts this line will be a no-op.
try:
  multiprocessing.set_start_method('fork')
except RuntimeError:
  # The start method can only be set once per process. Tolerate an earlier
  # call as long as it also selected 'fork'.
  if multiprocessing.get_start_method() != 'fork':
    raise


class _ImmediateResult:
  """Result-like object holding a value that was computed synchronously."""

  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


class _ExceptionWrapper:
  """Used to marshal exception messages back to main process."""

  def __init__(self, msg, exception_type=None):
    """Stores the failure description.

    Args:
      msg: Formatted traceback text of the original exception.
      exception_type: Name of a builtin exception class, or None when the
          original exception type should not be re-created.
    """
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the wrapped failure as its builtin exception type.

    Returns without raising when no exception type was recorded, when the
    recorded name is not a builtin exception class, or when that class cannot
    be constructed from a single message argument.
    """
    if not self.exception_type:
      return
    # Use the |builtins| module rather than |__builtins__|: the latter is a
    # dict (not a module) inside imported modules, so getattr() on it fails.
    exception_class = getattr(builtins, self.exception_type, None)
    if not (isinstance(exception_class, type)
            and issubclass(exception_class, BaseException)):
      return
    try:
      exception = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtin exceptions (e.g. UnicodeDecodeError) require more than a
      # message to be constructed. Let the caller report the failure instead.
      return
    raise exception


class _FuncWrapper:
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # NOTE: This runs in the parent process. The instance is then pickled and
    # sent to the worker, where __init__ is not invoked again. Do not mark the
    # process as a child here.
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the parameters stored at |index|.

    Returns:
      The function's return value, or an _ExceptionWrapper describing the
      exception that it raised.
    """
    global _fork_kwargs
    global _is_child_process
    # __call__ is what the pool worker executes, so this marks the child.
    _is_child_process = True
    try:
      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
        _fork_kwargs = {}
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      type_name = type(e).__name__
      if getattr(builtins, type_name, None) is type(e):
        exception_type = type_name
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult:
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for the result and returns it, surfacing any wrapped failure."""
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    """Blocks until the result is available, then unregisters the pool."""
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      # Cleared so that a second wait() does not remove the pool again.
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    # Best-effort: failures while tearing down at exit are ignored on purpose.
    try:
      pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for i, pool in enumerate(_all_pools):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    thread.start()


def _CheckForException(value):
  """Raises or exits if |value| is an _ExceptionWrapper; otherwise no-op.

  The first failure is re-raised as its builtin exception type when possible.
  Otherwise it is logged and the process exits with status 1. Later failures
  exit with status 1 without logging.
  """
  global _silence_exceptions
  if isinstance(value, _ExceptionWrapper):
    if not _silence_exceptions:
      value.MaybeThrow()
      _silence_exceptions = True
      logging.error('Subprocess raised an exception:\n%s', value.msg)
    sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  """Creates and registers a pool whose workers inherit the job parameters.

  Args:
    job_params: Sequence of positional-argument tuples, one per job.
    job_kwargs: Keyword arguments shared by all jobs.

  Returns:
    The new multiprocessing.Pool.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  # The workers are forked inside Pool(), so they see these globals as they
  # are set here.
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Always reset, so that a failed Pool() does not trip the asserts above on
    # the next call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  Args:
    func: Callable to run. It is sent to the worker by pickling.
    args: Positional arguments to call |func| with.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Args:
    func: Callable to run. It is sent to the workers by pickling.
    arg_tuples: Iterable of positional-argument tuples, one per call.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    # Workers look their arguments up by index in the inherited _fork_params,
    # so only the indices are sent to them.
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    try:
      pool.close()
      pool.join()
    finally:
      # Unregister even if shutting the pool down failed.
      _all_pools.remove(pool)