xref: /aosp_15_r20/external/cronet/build/android/gyp/util/parallel.py (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1# Copyright 2020 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Helpers related to multiprocessing.
5
6Based on: //tools/binary_size/libsupersize/parallel.py
7"""
8
9import atexit
10import logging
11import multiprocessing
12import os
13import sys
14import threading
15import traceback
16
17DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
18if DISABLE_ASYNC:
19  logging.warning('Running in synchronous mode.')
20
21_all_pools = None
22_is_child_process = False
23_silence_exceptions = False
24
25# Used to pass parameters to forked processes without pickling.
26_fork_params = None
27_fork_kwargs = None
28
29
30class _ImmediateResult:
31  def __init__(self, value):
32    self._value = value
33
34  def get(self):
35    return self._value
36
37  def wait(self):
38    pass
39
40  def ready(self):
41    return True
42
43  def successful(self):
44    return True
45
46
47class _ExceptionWrapper:
48  """Used to marshal exception messages back to main process."""
49
50  def __init__(self, msg, exception_type=None):
51    self.msg = msg
52    self.exception_type = exception_type
53
54  def MaybeThrow(self):
55    if self.exception_type:
56      raise getattr(__builtins__,
57                    self.exception_type)('Originally caused by: ' + self.msg)
58
59
60class _FuncWrapper:
61  """Runs on the fork()'ed side to catch exceptions and spread *args."""
62
63  def __init__(self, func):
64    global _is_child_process
65    _is_child_process = True
66    self._func = func
67
68  def __call__(self, index, _=None):
69    global _fork_kwargs
70    try:
71      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
72        _fork_kwargs = {}
73      return self._func(*_fork_params[index], **_fork_kwargs)
74    except Exception as e:
75      # Only keep the exception type for builtin exception types or else risk
76      # further marshalling exceptions.
77      exception_type = None
78      if hasattr(__builtins__, type(e).__name__):
79        exception_type = type(e).__name__
80      # multiprocessing is supposed to catch and return exceptions automatically
81      # but it doesn't seem to work properly :(.
82      return _ExceptionWrapper(traceback.format_exc(), exception_type)
83    except:  # pylint: disable=bare-except
84      return _ExceptionWrapper(traceback.format_exc())
85
86
87class _WrappedResult:
88  """Allows for host-side logic to be run after child process has terminated.
89
90  * Unregisters associated pool _all_pools.
91  * Raises exception caught by _FuncWrapper.
92  """
93
94  def __init__(self, result, pool=None):
95    self._result = result
96    self._pool = pool
97
98  def get(self):
99    self.wait()
100    value = self._result.get()
101    _CheckForException(value)
102    return value
103
104  def wait(self):
105    self._result.wait()
106    if self._pool:
107      _all_pools.remove(self._pool)
108      self._pool = None
109
110  def ready(self):
111    return self._result.ready()
112
113  def successful(self):
114    return self._result.successful()
115
116
117def _TerminatePools():
118  """Calls .terminate() on all active process pools.
119
120  Not supposed to be necessary according to the docs, but seems to be required
121  when child process throws an exception or Ctrl-C is hit.
122  """
123  global _silence_exceptions
124  _silence_exceptions = True
125  # Child processes cannot have pools, but atexit runs this function because
126  # it was registered before fork()ing.
127  if _is_child_process:
128    return
129
130  def close_pool(pool):
131    try:
132      pool.terminate()
133    except:  # pylint: disable=bare-except
134      pass
135
136  for i, pool in enumerate(_all_pools):
137    # Without calling terminate() on a separate thread, the call can block
138    # forever.
139    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
140                              target=close_pool,
141                              args=(pool, ))
142    thread.daemon = True
143    thread.start()
144
145
146def _CheckForException(value):
147  if isinstance(value, _ExceptionWrapper):
148    global _silence_exceptions
149    if not _silence_exceptions:
150      value.MaybeThrow()
151      _silence_exceptions = True
152      logging.error('Subprocess raised an exception:\n%s', value.msg)
153    sys.exit(1)
154
155
156def _MakeProcessPool(job_params, **job_kwargs):
157  global _all_pools
158  global _fork_params
159  global _fork_kwargs
160  assert _fork_params is None
161  assert _fork_kwargs is None
162  pool_size = min(len(job_params), multiprocessing.cpu_count())
163  _fork_params = job_params
164  _fork_kwargs = job_kwargs
165  ret = multiprocessing.Pool(pool_size)
166  _fork_params = None
167  _fork_kwargs = None
168  if _all_pools is None:
169    _all_pools = []
170    atexit.register(_TerminatePools)
171  _all_pools.append(ret)
172  return ret
173
174
175def ForkAndCall(func, args):
176  """Runs |func| in a fork'ed process.
177
178  Returns:
179    A Result object (call .get() to get the return value)
180  """
181  if DISABLE_ASYNC:
182    pool = None
183    result = _ImmediateResult(func(*args))
184  else:
185    pool = _MakeProcessPool([args])  # Omit |kwargs|.
186    result = pool.apply_async(_FuncWrapper(func), (0, ))
187    pool.close()
188  return _WrappedResult(result, pool=pool)
189
190
191def BulkForkAndCall(func, arg_tuples, **kwargs):
192  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
193
194  Args:
195    kwargs: Common keyword arguments to be passed to |func|.
196
197  Yields the return values in order.
198  """
199  arg_tuples = list(arg_tuples)
200  if not arg_tuples:
201    return
202
203  if DISABLE_ASYNC:
204    for args in arg_tuples:
205      yield func(*args, **kwargs)
206    return
207
208  pool = _MakeProcessPool(arg_tuples, **kwargs)
209  wrapped_func = _FuncWrapper(func)
210  try:
211    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
212      _CheckForException(result)
213      yield result
214  finally:
215    pool.close()
216    pool.join()
217    _all_pools.remove(pool)
218