xref: /aosp_15_r20/external/angle/build/android/gyp/util/parallel.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1# Copyright 2020 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Helpers related to multiprocessing.
5
6Based on: //tools/binary_size/libsupersize/parallel.py
7"""
8
9import atexit
10import logging
11import multiprocessing
12import os
13import sys
14import threading
15import traceback
16
# When the DISABLE_ASYNC environment variable is exactly '1', ForkAndCall and
# BulkForkAndCall invoke the function directly in this process instead of
# handing it to a process pool.
DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# List of live process pools; stays None until the first pool is created.
_all_pools = None
# Set by _FuncWrapper; when True, _TerminatePools returns without touching any
# pool.
_is_child_process = False
# Once True, _CheckForException exits without re-raising or logging again.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None

# Ensure fork is used for multiprocessing: the _fork_* globals above are only
# visible to workers that inherit this process's memory.
# Starting from Python 3.8, the "spawn" method is the default on MacOS.
# On hosts where fork is already the default this is a no-op.
try:
  multiprocessing.set_start_method('fork')
except RuntimeError:
  # set_start_method() raises if the context was already fixed (e.g. another
  # module used multiprocessing first). That is harmless as long as the
  # method in effect is the one required here.
  if multiprocessing.get_start_method() != 'fork':
    raise
33
class _ImmediateResult:
  """Result object for a value that has already been computed.

  Offers the get/wait/ready/successful methods that callers use on pool
  results, all of which answer immediately.
  """

  def __init__(self, value):
    self._value = value

  def get(self):
    """Returns the value given at construction."""
    return self._value

  def wait(self):
    """Does nothing; the value is already available."""
    return None

  def ready(self):
    """Always True."""
    return True

  def successful(self):
    """Always True."""
    return True
49
50
class _ExceptionWrapper:
  """Used to marshal exception messages back to main process.

  Attributes:
    msg: Formatted traceback text of the original exception.
    exception_type: Name of a builtin exception class to re-raise, or None
        when the original type should not (or cannot) be reconstructed.
  """

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the recorded builtin exception type, if there is one.

    Returns without raising when no type was recorded, when the recorded name
    is not a builtin exception class, or when that class cannot be built from
    a single message string. Callers then fall back to reporting |msg|.
    """
    if not self.exception_type:
      return
    # Use the builtins module rather than __builtins__: the latter is a plain
    # dict in imported modules, so attribute lookups on it do not work.
    import builtins
    exception_class = getattr(builtins, self.exception_type, None)
    if not (isinstance(exception_class, type)
            and issubclass(exception_class, BaseException)):
      return
    try:
      exception = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtin exceptions (e.g. UnicodeDecodeError) require more than a
      # message argument.
      return
    raise exception


class _FuncWrapper:
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # Constructed in the parent process, so no process-wide state is touched
    # here; see __call__.
    self._func = func

  def __call__(self, index, _=None):
    """Calls the wrapped function with the |index|-th set of fork params.

    Args:
      index: Position within _fork_params of the positional args to use.
      _: Ignored.

    Returns:
      The function's return value, or an _ExceptionWrapper describing the
      exception it raised.
    """
    global _fork_kwargs
    global _is_child_process
    # Mark the process that actually executes jobs. Unpickling in the worker
    # does not run __init__, so this is the only place that runs child-side.
    _is_child_process = True
    try:
      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
        _fork_kwargs = {}
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      import builtins
      exception_type = None
      type_name = type(e).__name__
      # Identity check so that a user-defined class which merely shares a
      # name with a builtin is not mistaken for it.
      if getattr(builtins, type_name, None) is type(e):
        exception_type = type_name
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())
89
90
class _WrappedResult:
  """Result proxy that runs host-side bookkeeping once the work is done.

  * Drops the associated pool from _all_pools after waiting.
  * Routes the fetched value through _CheckForException.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    """Waits for the result, checks it for a wrapped exception, returns it."""
    self.wait()
    fetched = self._result.get()
    _CheckForException(fetched)
    return fetched

  def wait(self):
    """Blocks until the result is available, then unregisters the pool."""
    self._result.wait()
    if not self._pool:
      return
    # Clear the reference so a second wait() does not remove the pool again.
    finished_pool, self._pool = self._pool, None
    _all_pools.remove(finished_pool)

  def ready(self):
    """Forwards to the underlying result's ready()."""
    return self._result.ready()

  def successful(self):
    """Forwards to the underlying result's successful()."""
    return self._result.successful()
119
120
def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def terminate_quietly(target_pool):
    # Best effort: any failure while terminating is ignored.
    try:
      target_pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  # Without calling terminate() on a separate thread, the call can block
  # forever.
  for index, active_pool in enumerate(_all_pools):
    terminator = threading.Thread(target=terminate_quietly,
                                  args=(active_pool, ),
                                  name='Pool-Terminate-{}'.format(index),
                                  daemon=True)
    terminator.start()
148
149
def _CheckForException(value):
  """Handles |value| if it is an exception marshalled from a subprocess.

  Values that are not _ExceptionWrapper instances are ignored. Otherwise the
  first such value is re-raised when its type was recorded, or else logged;
  in every non-raising case the process exits with status 1.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if not _silence_exceptions:
    value.MaybeThrow()
    _silence_exceptions = True
    logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)
158
159
def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool whose workers inherit the given job arguments.

  The arguments are published through the _fork_params / _fork_kwargs globals
  only while the pool is being created, so forked workers see them without
  pickling.

  Args:
    job_params: Sequence of positional-argument tuples, one per job.
    **job_kwargs: Keyword arguments shared by every job.

  Returns:
    The new multiprocessing.Pool, already registered in _all_pools.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Reset even when pool creation fails; otherwise the asserts above would
    # make every later call fail and hide the original error.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret
177
178
def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called right away in this
  process.

  Args:
    func: Callable to run.
    args: Positional arguments for |func|.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    return _WrappedResult(_ImmediateResult(func(*args)), pool=None)

  single_job_pool = _MakeProcessPool([args])  # Omit |kwargs|.
  pending = single_job_pool.apply_async(_FuncWrapper(func), (0, ))
  # No further work will be submitted to this pool.
  single_job_pool.close()
  return _WrappedResult(pending, pool=single_job_pool)
193
194
def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  When DISABLE_ASYNC is set, the calls are made one after another in this
  process instead.

  Args:
    func: Callable to run for each job.
    arg_tuples: Iterable of positional-argument tuples, one per job.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  jobs = list(arg_tuples)
  if not jobs:
    return

  if DISABLE_ASYNC:
    for job_args in jobs:
      yield func(*job_args, **kwargs)
    return

  pool = _MakeProcessPool(jobs, **kwargs)
  worker = _FuncWrapper(func)
  job_indices = range(len(jobs))
  try:
    for outcome in pool.imap(worker, job_indices):
      _CheckForException(outcome)
      yield outcome
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)
222