xref: /aosp_15_r20/external/cronet/build/android/gyp/util/parallel.py (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1*6777b538SAndroid Build Coastguard Worker# Copyright 2020 The Chromium Authors
2*6777b538SAndroid Build Coastguard Worker# Use of this source code is governed by a BSD-style license that can be
3*6777b538SAndroid Build Coastguard Worker# found in the LICENSE file.
4*6777b538SAndroid Build Coastguard Worker"""Helpers related to multiprocessing.
5*6777b538SAndroid Build Coastguard Worker
6*6777b538SAndroid Build Coastguard WorkerBased on: //tools/binary_size/libsupersize/parallel.py
7*6777b538SAndroid Build Coastguard Worker"""
8*6777b538SAndroid Build Coastguard Worker
9*6777b538SAndroid Build Coastguard Workerimport atexit
10*6777b538SAndroid Build Coastguard Workerimport logging
11*6777b538SAndroid Build Coastguard Workerimport multiprocessing
12*6777b538SAndroid Build Coastguard Workerimport os
13*6777b538SAndroid Build Coastguard Workerimport sys
14*6777b538SAndroid Build Coastguard Workerimport threading
15*6777b538SAndroid Build Coastguard Workerimport traceback
16*6777b538SAndroid Build Coastguard Worker
17*6777b538SAndroid Build Coastguard WorkerDISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
18*6777b538SAndroid Build Coastguard Workerif DISABLE_ASYNC:
19*6777b538SAndroid Build Coastguard Worker  logging.warning('Running in synchronous mode.')
20*6777b538SAndroid Build Coastguard Worker
21*6777b538SAndroid Build Coastguard Worker_all_pools = None
22*6777b538SAndroid Build Coastguard Worker_is_child_process = False
23*6777b538SAndroid Build Coastguard Worker_silence_exceptions = False
24*6777b538SAndroid Build Coastguard Worker
25*6777b538SAndroid Build Coastguard Worker# Used to pass parameters to forked processes without pickling.
26*6777b538SAndroid Build Coastguard Worker_fork_params = None
27*6777b538SAndroid Build Coastguard Worker_fork_kwargs = None
28*6777b538SAndroid Build Coastguard Worker
29*6777b538SAndroid Build Coastguard Worker
30*6777b538SAndroid Build Coastguard Workerclass _ImmediateResult:
31*6777b538SAndroid Build Coastguard Worker  def __init__(self, value):
32*6777b538SAndroid Build Coastguard Worker    self._value = value
33*6777b538SAndroid Build Coastguard Worker
34*6777b538SAndroid Build Coastguard Worker  def get(self):
35*6777b538SAndroid Build Coastguard Worker    return self._value
36*6777b538SAndroid Build Coastguard Worker
37*6777b538SAndroid Build Coastguard Worker  def wait(self):
38*6777b538SAndroid Build Coastguard Worker    pass
39*6777b538SAndroid Build Coastguard Worker
40*6777b538SAndroid Build Coastguard Worker  def ready(self):
41*6777b538SAndroid Build Coastguard Worker    return True
42*6777b538SAndroid Build Coastguard Worker
43*6777b538SAndroid Build Coastguard Worker  def successful(self):
44*6777b538SAndroid Build Coastguard Worker    return True
45*6777b538SAndroid Build Coastguard Worker
46*6777b538SAndroid Build Coastguard Worker
47*6777b538SAndroid Build Coastguard Workerclass _ExceptionWrapper:
48*6777b538SAndroid Build Coastguard Worker  """Used to marshal exception messages back to main process."""
49*6777b538SAndroid Build Coastguard Worker
50*6777b538SAndroid Build Coastguard Worker  def __init__(self, msg, exception_type=None):
51*6777b538SAndroid Build Coastguard Worker    self.msg = msg
52*6777b538SAndroid Build Coastguard Worker    self.exception_type = exception_type
53*6777b538SAndroid Build Coastguard Worker
54*6777b538SAndroid Build Coastguard Worker  def MaybeThrow(self):
55*6777b538SAndroid Build Coastguard Worker    if self.exception_type:
56*6777b538SAndroid Build Coastguard Worker      raise getattr(__builtins__,
57*6777b538SAndroid Build Coastguard Worker                    self.exception_type)('Originally caused by: ' + self.msg)
58*6777b538SAndroid Build Coastguard Worker
59*6777b538SAndroid Build Coastguard Worker
60*6777b538SAndroid Build Coastguard Workerclass _FuncWrapper:
61*6777b538SAndroid Build Coastguard Worker  """Runs on the fork()'ed side to catch exceptions and spread *args."""
62*6777b538SAndroid Build Coastguard Worker
63*6777b538SAndroid Build Coastguard Worker  def __init__(self, func):
64*6777b538SAndroid Build Coastguard Worker    global _is_child_process
65*6777b538SAndroid Build Coastguard Worker    _is_child_process = True
66*6777b538SAndroid Build Coastguard Worker    self._func = func
67*6777b538SAndroid Build Coastguard Worker
68*6777b538SAndroid Build Coastguard Worker  def __call__(self, index, _=None):
69*6777b538SAndroid Build Coastguard Worker    global _fork_kwargs
70*6777b538SAndroid Build Coastguard Worker    try:
71*6777b538SAndroid Build Coastguard Worker      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
72*6777b538SAndroid Build Coastguard Worker        _fork_kwargs = {}
73*6777b538SAndroid Build Coastguard Worker      return self._func(*_fork_params[index], **_fork_kwargs)
74*6777b538SAndroid Build Coastguard Worker    except Exception as e:
75*6777b538SAndroid Build Coastguard Worker      # Only keep the exception type for builtin exception types or else risk
76*6777b538SAndroid Build Coastguard Worker      # further marshalling exceptions.
77*6777b538SAndroid Build Coastguard Worker      exception_type = None
78*6777b538SAndroid Build Coastguard Worker      if hasattr(__builtins__, type(e).__name__):
79*6777b538SAndroid Build Coastguard Worker        exception_type = type(e).__name__
80*6777b538SAndroid Build Coastguard Worker      # multiprocessing is supposed to catch and return exceptions automatically
81*6777b538SAndroid Build Coastguard Worker      # but it doesn't seem to work properly :(.
82*6777b538SAndroid Build Coastguard Worker      return _ExceptionWrapper(traceback.format_exc(), exception_type)
83*6777b538SAndroid Build Coastguard Worker    except:  # pylint: disable=bare-except
84*6777b538SAndroid Build Coastguard Worker      return _ExceptionWrapper(traceback.format_exc())
85*6777b538SAndroid Build Coastguard Worker
86*6777b538SAndroid Build Coastguard Worker
87*6777b538SAndroid Build Coastguard Workerclass _WrappedResult:
88*6777b538SAndroid Build Coastguard Worker  """Allows for host-side logic to be run after child process has terminated.
89*6777b538SAndroid Build Coastguard Worker
90*6777b538SAndroid Build Coastguard Worker  * Unregisters associated pool _all_pools.
91*6777b538SAndroid Build Coastguard Worker  * Raises exception caught by _FuncWrapper.
92*6777b538SAndroid Build Coastguard Worker  """
93*6777b538SAndroid Build Coastguard Worker
94*6777b538SAndroid Build Coastguard Worker  def __init__(self, result, pool=None):
95*6777b538SAndroid Build Coastguard Worker    self._result = result
96*6777b538SAndroid Build Coastguard Worker    self._pool = pool
97*6777b538SAndroid Build Coastguard Worker
98*6777b538SAndroid Build Coastguard Worker  def get(self):
99*6777b538SAndroid Build Coastguard Worker    self.wait()
100*6777b538SAndroid Build Coastguard Worker    value = self._result.get()
101*6777b538SAndroid Build Coastguard Worker    _CheckForException(value)
102*6777b538SAndroid Build Coastguard Worker    return value
103*6777b538SAndroid Build Coastguard Worker
104*6777b538SAndroid Build Coastguard Worker  def wait(self):
105*6777b538SAndroid Build Coastguard Worker    self._result.wait()
106*6777b538SAndroid Build Coastguard Worker    if self._pool:
107*6777b538SAndroid Build Coastguard Worker      _all_pools.remove(self._pool)
108*6777b538SAndroid Build Coastguard Worker      self._pool = None
109*6777b538SAndroid Build Coastguard Worker
110*6777b538SAndroid Build Coastguard Worker  def ready(self):
111*6777b538SAndroid Build Coastguard Worker    return self._result.ready()
112*6777b538SAndroid Build Coastguard Worker
113*6777b538SAndroid Build Coastguard Worker  def successful(self):
114*6777b538SAndroid Build Coastguard Worker    return self._result.successful()
115*6777b538SAndroid Build Coastguard Worker
116*6777b538SAndroid Build Coastguard Worker
117*6777b538SAndroid Build Coastguard Workerdef _TerminatePools():
118*6777b538SAndroid Build Coastguard Worker  """Calls .terminate() on all active process pools.
119*6777b538SAndroid Build Coastguard Worker
120*6777b538SAndroid Build Coastguard Worker  Not supposed to be necessary according to the docs, but seems to be required
121*6777b538SAndroid Build Coastguard Worker  when child process throws an exception or Ctrl-C is hit.
122*6777b538SAndroid Build Coastguard Worker  """
123*6777b538SAndroid Build Coastguard Worker  global _silence_exceptions
124*6777b538SAndroid Build Coastguard Worker  _silence_exceptions = True
125*6777b538SAndroid Build Coastguard Worker  # Child processes cannot have pools, but atexit runs this function because
126*6777b538SAndroid Build Coastguard Worker  # it was registered before fork()ing.
127*6777b538SAndroid Build Coastguard Worker  if _is_child_process:
128*6777b538SAndroid Build Coastguard Worker    return
129*6777b538SAndroid Build Coastguard Worker
130*6777b538SAndroid Build Coastguard Worker  def close_pool(pool):
131*6777b538SAndroid Build Coastguard Worker    try:
132*6777b538SAndroid Build Coastguard Worker      pool.terminate()
133*6777b538SAndroid Build Coastguard Worker    except:  # pylint: disable=bare-except
134*6777b538SAndroid Build Coastguard Worker      pass
135*6777b538SAndroid Build Coastguard Worker
136*6777b538SAndroid Build Coastguard Worker  for i, pool in enumerate(_all_pools):
137*6777b538SAndroid Build Coastguard Worker    # Without calling terminate() on a separate thread, the call can block
138*6777b538SAndroid Build Coastguard Worker    # forever.
139*6777b538SAndroid Build Coastguard Worker    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
140*6777b538SAndroid Build Coastguard Worker                              target=close_pool,
141*6777b538SAndroid Build Coastguard Worker                              args=(pool, ))
142*6777b538SAndroid Build Coastguard Worker    thread.daemon = True
143*6777b538SAndroid Build Coastguard Worker    thread.start()
144*6777b538SAndroid Build Coastguard Worker
145*6777b538SAndroid Build Coastguard Worker
146*6777b538SAndroid Build Coastguard Workerdef _CheckForException(value):
147*6777b538SAndroid Build Coastguard Worker  if isinstance(value, _ExceptionWrapper):
148*6777b538SAndroid Build Coastguard Worker    global _silence_exceptions
149*6777b538SAndroid Build Coastguard Worker    if not _silence_exceptions:
150*6777b538SAndroid Build Coastguard Worker      value.MaybeThrow()
151*6777b538SAndroid Build Coastguard Worker      _silence_exceptions = True
152*6777b538SAndroid Build Coastguard Worker      logging.error('Subprocess raised an exception:\n%s', value.msg)
153*6777b538SAndroid Build Coastguard Worker    sys.exit(1)
154*6777b538SAndroid Build Coastguard Worker
155*6777b538SAndroid Build Coastguard Worker
156*6777b538SAndroid Build Coastguard Workerdef _MakeProcessPool(job_params, **job_kwargs):
157*6777b538SAndroid Build Coastguard Worker  global _all_pools
158*6777b538SAndroid Build Coastguard Worker  global _fork_params
159*6777b538SAndroid Build Coastguard Worker  global _fork_kwargs
160*6777b538SAndroid Build Coastguard Worker  assert _fork_params is None
161*6777b538SAndroid Build Coastguard Worker  assert _fork_kwargs is None
162*6777b538SAndroid Build Coastguard Worker  pool_size = min(len(job_params), multiprocessing.cpu_count())
163*6777b538SAndroid Build Coastguard Worker  _fork_params = job_params
164*6777b538SAndroid Build Coastguard Worker  _fork_kwargs = job_kwargs
165*6777b538SAndroid Build Coastguard Worker  ret = multiprocessing.Pool(pool_size)
166*6777b538SAndroid Build Coastguard Worker  _fork_params = None
167*6777b538SAndroid Build Coastguard Worker  _fork_kwargs = None
168*6777b538SAndroid Build Coastguard Worker  if _all_pools is None:
169*6777b538SAndroid Build Coastguard Worker    _all_pools = []
170*6777b538SAndroid Build Coastguard Worker    atexit.register(_TerminatePools)
171*6777b538SAndroid Build Coastguard Worker  _all_pools.append(ret)
172*6777b538SAndroid Build Coastguard Worker  return ret
173*6777b538SAndroid Build Coastguard Worker
174*6777b538SAndroid Build Coastguard Worker
175*6777b538SAndroid Build Coastguard Workerdef ForkAndCall(func, args):
176*6777b538SAndroid Build Coastguard Worker  """Runs |func| in a fork'ed process.
177*6777b538SAndroid Build Coastguard Worker
178*6777b538SAndroid Build Coastguard Worker  Returns:
179*6777b538SAndroid Build Coastguard Worker    A Result object (call .get() to get the return value)
180*6777b538SAndroid Build Coastguard Worker  """
181*6777b538SAndroid Build Coastguard Worker  if DISABLE_ASYNC:
182*6777b538SAndroid Build Coastguard Worker    pool = None
183*6777b538SAndroid Build Coastguard Worker    result = _ImmediateResult(func(*args))
184*6777b538SAndroid Build Coastguard Worker  else:
185*6777b538SAndroid Build Coastguard Worker    pool = _MakeProcessPool([args])  # Omit |kwargs|.
186*6777b538SAndroid Build Coastguard Worker    result = pool.apply_async(_FuncWrapper(func), (0, ))
187*6777b538SAndroid Build Coastguard Worker    pool.close()
188*6777b538SAndroid Build Coastguard Worker  return _WrappedResult(result, pool=pool)
189*6777b538SAndroid Build Coastguard Worker
190*6777b538SAndroid Build Coastguard Worker
191*6777b538SAndroid Build Coastguard Workerdef BulkForkAndCall(func, arg_tuples, **kwargs):
192*6777b538SAndroid Build Coastguard Worker  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
193*6777b538SAndroid Build Coastguard Worker
194*6777b538SAndroid Build Coastguard Worker  Args:
195*6777b538SAndroid Build Coastguard Worker    kwargs: Common keyword arguments to be passed to |func|.
196*6777b538SAndroid Build Coastguard Worker
197*6777b538SAndroid Build Coastguard Worker  Yields the return values in order.
198*6777b538SAndroid Build Coastguard Worker  """
199*6777b538SAndroid Build Coastguard Worker  arg_tuples = list(arg_tuples)
200*6777b538SAndroid Build Coastguard Worker  if not arg_tuples:
201*6777b538SAndroid Build Coastguard Worker    return
202*6777b538SAndroid Build Coastguard Worker
203*6777b538SAndroid Build Coastguard Worker  if DISABLE_ASYNC:
204*6777b538SAndroid Build Coastguard Worker    for args in arg_tuples:
205*6777b538SAndroid Build Coastguard Worker      yield func(*args, **kwargs)
206*6777b538SAndroid Build Coastguard Worker    return
207*6777b538SAndroid Build Coastguard Worker
208*6777b538SAndroid Build Coastguard Worker  pool = _MakeProcessPool(arg_tuples, **kwargs)
209*6777b538SAndroid Build Coastguard Worker  wrapped_func = _FuncWrapper(func)
210*6777b538SAndroid Build Coastguard Worker  try:
211*6777b538SAndroid Build Coastguard Worker    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
212*6777b538SAndroid Build Coastguard Worker      _CheckForException(result)
213*6777b538SAndroid Build Coastguard Worker      yield result
214*6777b538SAndroid Build Coastguard Worker  finally:
215*6777b538SAndroid Build Coastguard Worker    pool.close()
216*6777b538SAndroid Build Coastguard Worker    pool.join()
217*6777b538SAndroid Build Coastguard Worker    _all_pools.remove(pool)
218