xref: /aosp_15_r20/external/angle/build/android/gyp/util/parallel.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1*8975f5c5SAndroid Build Coastguard Worker# Copyright 2020 The Chromium Authors
2*8975f5c5SAndroid Build Coastguard Worker# Use of this source code is governed by a BSD-style license that can be
3*8975f5c5SAndroid Build Coastguard Worker# found in the LICENSE file.
4*8975f5c5SAndroid Build Coastguard Worker"""Helpers related to multiprocessing.
5*8975f5c5SAndroid Build Coastguard Worker
6*8975f5c5SAndroid Build Coastguard WorkerBased on: //tools/binary_size/libsupersize/parallel.py
7*8975f5c5SAndroid Build Coastguard Worker"""
8*8975f5c5SAndroid Build Coastguard Worker
9*8975f5c5SAndroid Build Coastguard Workerimport atexit
10*8975f5c5SAndroid Build Coastguard Workerimport logging
11*8975f5c5SAndroid Build Coastguard Workerimport multiprocessing
12*8975f5c5SAndroid Build Coastguard Workerimport os
13*8975f5c5SAndroid Build Coastguard Workerimport sys
14*8975f5c5SAndroid Build Coastguard Workerimport threading
15*8975f5c5SAndroid Build Coastguard Workerimport traceback
16*8975f5c5SAndroid Build Coastguard Worker
17*8975f5c5SAndroid Build Coastguard WorkerDISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
18*8975f5c5SAndroid Build Coastguard Workerif DISABLE_ASYNC:
19*8975f5c5SAndroid Build Coastguard Worker  logging.warning('Running in synchronous mode.')
20*8975f5c5SAndroid Build Coastguard Worker
21*8975f5c5SAndroid Build Coastguard Worker_all_pools = None
22*8975f5c5SAndroid Build Coastguard Worker_is_child_process = False
23*8975f5c5SAndroid Build Coastguard Worker_silence_exceptions = False
24*8975f5c5SAndroid Build Coastguard Worker
25*8975f5c5SAndroid Build Coastguard Worker# Used to pass parameters to forked processes without pickling.
26*8975f5c5SAndroid Build Coastguard Worker_fork_params = None
27*8975f5c5SAndroid Build Coastguard Worker_fork_kwargs = None
28*8975f5c5SAndroid Build Coastguard Worker
29*8975f5c5SAndroid Build Coastguard Worker# Ensure fork is used on MacOS for multiprocessing compatibility.
30*8975f5c5SAndroid Build Coastguard Worker# Starting from Python 3.8, the "spawn" method is the default on MacOS.
31*8975f5c5SAndroid Build Coastguard Worker# On Linux hosts this line will be a no-op.
32*8975f5c5SAndroid Build Coastguard Workermultiprocessing.set_start_method('fork')
33*8975f5c5SAndroid Build Coastguard Worker
34*8975f5c5SAndroid Build Coastguard Workerclass _ImmediateResult:
35*8975f5c5SAndroid Build Coastguard Worker  def __init__(self, value):
36*8975f5c5SAndroid Build Coastguard Worker    self._value = value
37*8975f5c5SAndroid Build Coastguard Worker
38*8975f5c5SAndroid Build Coastguard Worker  def get(self):
39*8975f5c5SAndroid Build Coastguard Worker    return self._value
40*8975f5c5SAndroid Build Coastguard Worker
41*8975f5c5SAndroid Build Coastguard Worker  def wait(self):
42*8975f5c5SAndroid Build Coastguard Worker    pass
43*8975f5c5SAndroid Build Coastguard Worker
44*8975f5c5SAndroid Build Coastguard Worker  def ready(self):
45*8975f5c5SAndroid Build Coastguard Worker    return True
46*8975f5c5SAndroid Build Coastguard Worker
47*8975f5c5SAndroid Build Coastguard Worker  def successful(self):
48*8975f5c5SAndroid Build Coastguard Worker    return True
49*8975f5c5SAndroid Build Coastguard Worker
50*8975f5c5SAndroid Build Coastguard Worker
51*8975f5c5SAndroid Build Coastguard Workerclass _ExceptionWrapper:
52*8975f5c5SAndroid Build Coastguard Worker  """Used to marshal exception messages back to main process."""
53*8975f5c5SAndroid Build Coastguard Worker
54*8975f5c5SAndroid Build Coastguard Worker  def __init__(self, msg, exception_type=None):
55*8975f5c5SAndroid Build Coastguard Worker    self.msg = msg
56*8975f5c5SAndroid Build Coastguard Worker    self.exception_type = exception_type
57*8975f5c5SAndroid Build Coastguard Worker
58*8975f5c5SAndroid Build Coastguard Worker  def MaybeThrow(self):
59*8975f5c5SAndroid Build Coastguard Worker    if self.exception_type:
60*8975f5c5SAndroid Build Coastguard Worker      raise getattr(__builtins__,
61*8975f5c5SAndroid Build Coastguard Worker                    self.exception_type)('Originally caused by: ' + self.msg)
62*8975f5c5SAndroid Build Coastguard Worker
63*8975f5c5SAndroid Build Coastguard Worker
64*8975f5c5SAndroid Build Coastguard Workerclass _FuncWrapper:
65*8975f5c5SAndroid Build Coastguard Worker  """Runs on the fork()'ed side to catch exceptions and spread *args."""
66*8975f5c5SAndroid Build Coastguard Worker
67*8975f5c5SAndroid Build Coastguard Worker  def __init__(self, func):
68*8975f5c5SAndroid Build Coastguard Worker    global _is_child_process
69*8975f5c5SAndroid Build Coastguard Worker    _is_child_process = True
70*8975f5c5SAndroid Build Coastguard Worker    self._func = func
71*8975f5c5SAndroid Build Coastguard Worker
72*8975f5c5SAndroid Build Coastguard Worker  def __call__(self, index, _=None):
73*8975f5c5SAndroid Build Coastguard Worker    global _fork_kwargs
74*8975f5c5SAndroid Build Coastguard Worker    try:
75*8975f5c5SAndroid Build Coastguard Worker      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
76*8975f5c5SAndroid Build Coastguard Worker        _fork_kwargs = {}
77*8975f5c5SAndroid Build Coastguard Worker      return self._func(*_fork_params[index], **_fork_kwargs)
78*8975f5c5SAndroid Build Coastguard Worker    except Exception as e:
79*8975f5c5SAndroid Build Coastguard Worker      # Only keep the exception type for builtin exception types or else risk
80*8975f5c5SAndroid Build Coastguard Worker      # further marshalling exceptions.
81*8975f5c5SAndroid Build Coastguard Worker      exception_type = None
82*8975f5c5SAndroid Build Coastguard Worker      if hasattr(__builtins__, type(e).__name__):
83*8975f5c5SAndroid Build Coastguard Worker        exception_type = type(e).__name__
84*8975f5c5SAndroid Build Coastguard Worker      # multiprocessing is supposed to catch and return exceptions automatically
85*8975f5c5SAndroid Build Coastguard Worker      # but it doesn't seem to work properly :(.
86*8975f5c5SAndroid Build Coastguard Worker      return _ExceptionWrapper(traceback.format_exc(), exception_type)
87*8975f5c5SAndroid Build Coastguard Worker    except:  # pylint: disable=bare-except
88*8975f5c5SAndroid Build Coastguard Worker      return _ExceptionWrapper(traceback.format_exc())
89*8975f5c5SAndroid Build Coastguard Worker
90*8975f5c5SAndroid Build Coastguard Worker
91*8975f5c5SAndroid Build Coastguard Workerclass _WrappedResult:
92*8975f5c5SAndroid Build Coastguard Worker  """Allows for host-side logic to be run after child process has terminated.
93*8975f5c5SAndroid Build Coastguard Worker
94*8975f5c5SAndroid Build Coastguard Worker  * Unregisters associated pool _all_pools.
95*8975f5c5SAndroid Build Coastguard Worker  * Raises exception caught by _FuncWrapper.
96*8975f5c5SAndroid Build Coastguard Worker  """
97*8975f5c5SAndroid Build Coastguard Worker
98*8975f5c5SAndroid Build Coastguard Worker  def __init__(self, result, pool=None):
99*8975f5c5SAndroid Build Coastguard Worker    self._result = result
100*8975f5c5SAndroid Build Coastguard Worker    self._pool = pool
101*8975f5c5SAndroid Build Coastguard Worker
102*8975f5c5SAndroid Build Coastguard Worker  def get(self):
103*8975f5c5SAndroid Build Coastguard Worker    self.wait()
104*8975f5c5SAndroid Build Coastguard Worker    value = self._result.get()
105*8975f5c5SAndroid Build Coastguard Worker    _CheckForException(value)
106*8975f5c5SAndroid Build Coastguard Worker    return value
107*8975f5c5SAndroid Build Coastguard Worker
108*8975f5c5SAndroid Build Coastguard Worker  def wait(self):
109*8975f5c5SAndroid Build Coastguard Worker    self._result.wait()
110*8975f5c5SAndroid Build Coastguard Worker    if self._pool:
111*8975f5c5SAndroid Build Coastguard Worker      _all_pools.remove(self._pool)
112*8975f5c5SAndroid Build Coastguard Worker      self._pool = None
113*8975f5c5SAndroid Build Coastguard Worker
114*8975f5c5SAndroid Build Coastguard Worker  def ready(self):
115*8975f5c5SAndroid Build Coastguard Worker    return self._result.ready()
116*8975f5c5SAndroid Build Coastguard Worker
117*8975f5c5SAndroid Build Coastguard Worker  def successful(self):
118*8975f5c5SAndroid Build Coastguard Worker    return self._result.successful()
119*8975f5c5SAndroid Build Coastguard Worker
120*8975f5c5SAndroid Build Coastguard Worker
121*8975f5c5SAndroid Build Coastguard Workerdef _TerminatePools():
122*8975f5c5SAndroid Build Coastguard Worker  """Calls .terminate() on all active process pools.
123*8975f5c5SAndroid Build Coastguard Worker
124*8975f5c5SAndroid Build Coastguard Worker  Not supposed to be necessary according to the docs, but seems to be required
125*8975f5c5SAndroid Build Coastguard Worker  when child process throws an exception or Ctrl-C is hit.
126*8975f5c5SAndroid Build Coastguard Worker  """
127*8975f5c5SAndroid Build Coastguard Worker  global _silence_exceptions
128*8975f5c5SAndroid Build Coastguard Worker  _silence_exceptions = True
129*8975f5c5SAndroid Build Coastguard Worker  # Child processes cannot have pools, but atexit runs this function because
130*8975f5c5SAndroid Build Coastguard Worker  # it was registered before fork()ing.
131*8975f5c5SAndroid Build Coastguard Worker  if _is_child_process:
132*8975f5c5SAndroid Build Coastguard Worker    return
133*8975f5c5SAndroid Build Coastguard Worker
134*8975f5c5SAndroid Build Coastguard Worker  def close_pool(pool):
135*8975f5c5SAndroid Build Coastguard Worker    try:
136*8975f5c5SAndroid Build Coastguard Worker      pool.terminate()
137*8975f5c5SAndroid Build Coastguard Worker    except:  # pylint: disable=bare-except
138*8975f5c5SAndroid Build Coastguard Worker      pass
139*8975f5c5SAndroid Build Coastguard Worker
140*8975f5c5SAndroid Build Coastguard Worker  for i, pool in enumerate(_all_pools):
141*8975f5c5SAndroid Build Coastguard Worker    # Without calling terminate() on a separate thread, the call can block
142*8975f5c5SAndroid Build Coastguard Worker    # forever.
143*8975f5c5SAndroid Build Coastguard Worker    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
144*8975f5c5SAndroid Build Coastguard Worker                              target=close_pool,
145*8975f5c5SAndroid Build Coastguard Worker                              args=(pool, ))
146*8975f5c5SAndroid Build Coastguard Worker    thread.daemon = True
147*8975f5c5SAndroid Build Coastguard Worker    thread.start()
148*8975f5c5SAndroid Build Coastguard Worker
149*8975f5c5SAndroid Build Coastguard Worker
150*8975f5c5SAndroid Build Coastguard Workerdef _CheckForException(value):
151*8975f5c5SAndroid Build Coastguard Worker  if isinstance(value, _ExceptionWrapper):
152*8975f5c5SAndroid Build Coastguard Worker    global _silence_exceptions
153*8975f5c5SAndroid Build Coastguard Worker    if not _silence_exceptions:
154*8975f5c5SAndroid Build Coastguard Worker      value.MaybeThrow()
155*8975f5c5SAndroid Build Coastguard Worker      _silence_exceptions = True
156*8975f5c5SAndroid Build Coastguard Worker      logging.error('Subprocess raised an exception:\n%s', value.msg)
157*8975f5c5SAndroid Build Coastguard Worker    sys.exit(1)
158*8975f5c5SAndroid Build Coastguard Worker
159*8975f5c5SAndroid Build Coastguard Worker
160*8975f5c5SAndroid Build Coastguard Workerdef _MakeProcessPool(job_params, **job_kwargs):
161*8975f5c5SAndroid Build Coastguard Worker  global _all_pools
162*8975f5c5SAndroid Build Coastguard Worker  global _fork_params
163*8975f5c5SAndroid Build Coastguard Worker  global _fork_kwargs
164*8975f5c5SAndroid Build Coastguard Worker  assert _fork_params is None
165*8975f5c5SAndroid Build Coastguard Worker  assert _fork_kwargs is None
166*8975f5c5SAndroid Build Coastguard Worker  pool_size = min(len(job_params), multiprocessing.cpu_count())
167*8975f5c5SAndroid Build Coastguard Worker  _fork_params = job_params
168*8975f5c5SAndroid Build Coastguard Worker  _fork_kwargs = job_kwargs
169*8975f5c5SAndroid Build Coastguard Worker  ret = multiprocessing.Pool(pool_size)
170*8975f5c5SAndroid Build Coastguard Worker  _fork_params = None
171*8975f5c5SAndroid Build Coastguard Worker  _fork_kwargs = None
172*8975f5c5SAndroid Build Coastguard Worker  if _all_pools is None:
173*8975f5c5SAndroid Build Coastguard Worker    _all_pools = []
174*8975f5c5SAndroid Build Coastguard Worker    atexit.register(_TerminatePools)
175*8975f5c5SAndroid Build Coastguard Worker  _all_pools.append(ret)
176*8975f5c5SAndroid Build Coastguard Worker  return ret
177*8975f5c5SAndroid Build Coastguard Worker
178*8975f5c5SAndroid Build Coastguard Worker
179*8975f5c5SAndroid Build Coastguard Workerdef ForkAndCall(func, args):
180*8975f5c5SAndroid Build Coastguard Worker  """Runs |func| in a fork'ed process.
181*8975f5c5SAndroid Build Coastguard Worker
182*8975f5c5SAndroid Build Coastguard Worker  Returns:
183*8975f5c5SAndroid Build Coastguard Worker    A Result object (call .get() to get the return value)
184*8975f5c5SAndroid Build Coastguard Worker  """
185*8975f5c5SAndroid Build Coastguard Worker  if DISABLE_ASYNC:
186*8975f5c5SAndroid Build Coastguard Worker    pool = None
187*8975f5c5SAndroid Build Coastguard Worker    result = _ImmediateResult(func(*args))
188*8975f5c5SAndroid Build Coastguard Worker  else:
189*8975f5c5SAndroid Build Coastguard Worker    pool = _MakeProcessPool([args])  # Omit |kwargs|.
190*8975f5c5SAndroid Build Coastguard Worker    result = pool.apply_async(_FuncWrapper(func), (0, ))
191*8975f5c5SAndroid Build Coastguard Worker    pool.close()
192*8975f5c5SAndroid Build Coastguard Worker  return _WrappedResult(result, pool=pool)
193*8975f5c5SAndroid Build Coastguard Worker
194*8975f5c5SAndroid Build Coastguard Worker
195*8975f5c5SAndroid Build Coastguard Workerdef BulkForkAndCall(func, arg_tuples, **kwargs):
196*8975f5c5SAndroid Build Coastguard Worker  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
197*8975f5c5SAndroid Build Coastguard Worker
198*8975f5c5SAndroid Build Coastguard Worker  Args:
199*8975f5c5SAndroid Build Coastguard Worker    kwargs: Common keyword arguments to be passed to |func|.
200*8975f5c5SAndroid Build Coastguard Worker
201*8975f5c5SAndroid Build Coastguard Worker  Yields the return values in order.
202*8975f5c5SAndroid Build Coastguard Worker  """
203*8975f5c5SAndroid Build Coastguard Worker  arg_tuples = list(arg_tuples)
204*8975f5c5SAndroid Build Coastguard Worker  if not arg_tuples:
205*8975f5c5SAndroid Build Coastguard Worker    return
206*8975f5c5SAndroid Build Coastguard Worker
207*8975f5c5SAndroid Build Coastguard Worker  if DISABLE_ASYNC:
208*8975f5c5SAndroid Build Coastguard Worker    for args in arg_tuples:
209*8975f5c5SAndroid Build Coastguard Worker      yield func(*args, **kwargs)
210*8975f5c5SAndroid Build Coastguard Worker    return
211*8975f5c5SAndroid Build Coastguard Worker
212*8975f5c5SAndroid Build Coastguard Worker  pool = _MakeProcessPool(arg_tuples, **kwargs)
213*8975f5c5SAndroid Build Coastguard Worker  wrapped_func = _FuncWrapper(func)
214*8975f5c5SAndroid Build Coastguard Worker  try:
215*8975f5c5SAndroid Build Coastguard Worker    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
216*8975f5c5SAndroid Build Coastguard Worker      _CheckForException(result)
217*8975f5c5SAndroid Build Coastguard Worker      yield result
218*8975f5c5SAndroid Build Coastguard Worker  finally:
219*8975f5c5SAndroid Build Coastguard Worker    pool.close()
220*8975f5c5SAndroid Build Coastguard Worker    pool.join()
221*8975f5c5SAndroid Build Coastguard Worker    _all_pools.remove(pool)
222