xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/lib/parallel.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for running cbuildbot stages in the background."""

from __future__ import print_function

import collections
import contextlib
import ctypes
import ctypes.util  # For ctypes.util.find_library() in ExitWithParent().
import errno
import functools
import multiprocessing
from multiprocessing.managers import SyncManager
import os
import signal
import sys
import time
import traceback

import six
from six.moves import queue as Queue

from autotest_lib.utils.frozen_chromite.lib import failures_lib
from autotest_lib.utils.frozen_chromite.lib import results_lib
from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
from autotest_lib.utils.frozen_chromite.lib import osutils
from autotest_lib.utils.frozen_chromite.lib import signals
from autotest_lib.utils.frozen_chromite.lib import timeout_util


_BUFSIZE = 1024


class HackTimeoutSyncManager(SyncManager):
  """Increase the process join timeout in SyncManager.

  The timeout for the manager process to join in the core library is
  too low. The process is often killed before shutting down properly,
  resulting in temporary directories (pymp-xxx) not being cleaned
  up. This class increases the default timeout.
  """

  @staticmethod
  def _finalize_manager(process, *args, **kwargs):
    """Shutdown the manager process."""

    def _join(functor, *args, **kwargs):
      timeout = kwargs.get('timeout')
      if timeout is not None and timeout < 1:
        kwargs['timeout'] = 1

      functor(*args, **kwargs)

    process.join = functools.partial(_join, process.join)
    SyncManager._finalize_manager(process, *args, **kwargs)


def IgnoreSigintAndSigterm():
  """Ignores any future SIGINTs and SIGTERMs."""
  signal.signal(signal.SIGINT, signal.SIG_IGN)
  signal.signal(signal.SIGTERM, signal.SIG_IGN)


def Manager():
  """Create a background process for managing interprocess communication.

  This manager wraps multiprocessing.Manager() and ensures that any sockets
  created during initialization are created under the /tmp tree rather than in a
  custom temp directory. This is needed because TMPDIR might be really long, and
  named sockets are limited to 108 characters.

  Examples:
    with Manager() as manager:
      queue = manager.Queue()
      ...

  Returns:
    The return value of multiprocessing.Manager()
  """
  # Use a short directory in /tmp. Do not use /tmp directly to keep these
  # temporary files together and because certain environments do not like too
  # many top-level paths in /tmp (see crbug.com/945523).
  # Make it mode 1777 to mirror /tmp, so that we don't have failures when root
  # calls parallel first, and some other user calls it later.
  tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
  osutils.SafeMakedirs(tmp_dir, mode=0o1777)
  old_tempdir_value, old_tempdir_env = osutils.SetGlobalTempDir(tmp_dir)
  try:
    m = HackTimeoutSyncManager()
    # SyncManager doesn't handle KeyboardInterrupt exceptions well; pipes get
    # broken and E_NOENT or E_PIPE errors are thrown from various places. We
    # can just ignore SIGINT in the SyncManager and things will close properly
    # when the enclosing with-statement exits.
    m.start(IgnoreSigintAndSigterm)
    return m
  finally:
    osutils.SetGlobalTempDir(old_tempdir_value, old_tempdir_env)


class BackgroundFailure(failures_lib.CompoundFailure):
  """Exception to show a step failed while running in a background process."""


class ProcessExitTimeout(Exception):
  """Raised if a process cannot exit within the timeout."""


class ProcessUnexpectedExit(Exception):
  """Raised if a process exits unexpectedly."""


class ProcessSilentTimeout(Exception):
  """Raised when there is no output for a prolonged period of time."""


class UnexpectedException(Exception):
  """Raised when exception occurs at an unexpected place."""


class _BackgroundTask(multiprocessing.Process):
  """Run a task in the background.

  This task may be the 'Run' function from a buildbot stage or just a plain
  function. It will be run in the background. Output from this task is saved
  to a temporary file and is printed when the 'Wait' function is called.
  """

  # The time we give Python to startup and exit.
  STARTUP_TIMEOUT = 60 * 5
  EXIT_TIMEOUT = 60 * 10

  # The time we allow processes to be silent. This is in place so that we
  # eventually catch hanging processes, and print the remainder of our output.
  # Do not increase this. Instead, adjust your program to print regular progress
  # updates, so that cbuildbot (and buildbot) can know that it has not hung.
  SILENT_TIMEOUT = 60 * 145

  # The amount by which we reduce the SILENT_TIMEOUT every time we launch
  # a subprocess. This helps ensure that children get a chance to enforce the
  # SILENT_TIMEOUT prior to the parents enforcing it.
  SILENT_TIMEOUT_STEP = 30
  MINIMUM_SILENT_TIMEOUT = 60 * 135

  # The time before terminating or killing a task.
  SIGTERM_TIMEOUT = 30
  SIGKILL_TIMEOUT = 60

  # How long we allow debug commands to run (so we don't hang while trying to
  # recover from a hang).
  DEBUG_CMD_TIMEOUT = 60

  # Interval we check for updates from print statements.
  PRINT_INTERVAL = 1

  def __init__(self, task, queue, semaphore=None, task_args=None,
               task_kwargs=None):
    """Create a new _BackgroundTask object.

    If semaphore is supplied, it will be acquired for the duration of the
    steps that are run in the background. This can be used to limit the
    number of simultaneous parallel tasks.

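    Examples:
      # Minimal sketch; |PrintGreeting| is a hypothetical functor supplied by
      # the caller. This mirrors how ParallelTasks() uses the class: construct
      # it with a managed queue, start() it, then Wait() for its errors.
      with Manager() as manager:
        task = _BackgroundTask(PrintGreeting, queue=manager.Queue())
        task.start()
        errors = task.Wait()
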
    Args:
      task: The task (a functor) to run in the background.
      queue: A queue to be used for managing communication between the parent
        and child process. This queue must be valid for the length of the
        life of the child process, until the parent has collected its status.
      semaphore: The lock to hold while |task| runs.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    multiprocessing.Process.__init__(self)
    self._task = task
    self._queue = queue
    self._semaphore = semaphore
    self._started = multiprocessing.Event()
    self._killing = multiprocessing.Event()
    self._output = None
    self._parent_pid = None
    self._task_args = task_args if task_args else ()
    self._task_kwargs = task_kwargs if task_kwargs else {}

  def _WaitForStartup(self):
    # TODO(davidjames): Use python-2.7 syntax to simplify this.
    self._started.wait(self.STARTUP_TIMEOUT)
    msg = 'Process failed to start in %d seconds' % self.STARTUP_TIMEOUT
    assert self._started.is_set(), msg

  @classmethod
  def _DebugRunCommand(cls, cmd, **kwargs):
    """Swallow any exception run raises.

    Since these commands are for purely informational purposes, we don't want
    random issues causing the bot to die.

    Returns:
      Stdout on success
    """
    log_level = kwargs['debug_level']
    try:
      with timeout_util.Timeout(cls.DEBUG_CMD_TIMEOUT):
        return cros_build_lib.run(cmd, **kwargs).output
    except (cros_build_lib.RunCommandError, timeout_util.TimeoutError) as e:
      logging.log(log_level, 'Running %s failed: %s', cmd[0], str(e))
      return ''

  # Debug commands to run in gdb.  A class member so tests can stub it out.
  GDB_COMMANDS = (
      'info proc all',
      'info threads',
      'thread apply all py-list',
      'thread apply all py-bt',
      'thread apply all bt',
      'detach',
  )

  @classmethod
  def _DumpDebugPid(cls, log_level, pid):
    """Dump debug info about the hanging |pid|."""
    pid = str(pid)
    commands = (
        ('pstree', '-Apals', pid),
        ('lsof', '-p', pid),
    )
    for cmd in commands:
      cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                           log_output=True, encoding='utf-8')

    stdin = '\n'.join(['echo \\n>>> %s\\n\n%s' % (x, x)
                       for x in cls.GDB_COMMANDS])
    cmd = ('gdb', '--nx', '-q', '-p', pid, '-ex', 'set prompt',)
    cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                         log_output=True, input=stdin, encoding='utf-8')

  def Kill(self, sig, log_level, first=False):
    """Kill process with signal, ignoring if the process is dead.

    Args:
      sig: Signal to send.
      log_level: The log level of log messages.
      first: Whether this is the first signal we've sent.
    """
    self._killing.set()
    self._WaitForStartup()
    if logging.getLogger().isEnabledFor(log_level):
      # Dump debug information about the hanging process.
      logging.log(log_level, 'Killing %r (sig=%r %s)', self.pid, sig,
                  signals.StrSignal(sig))

      if first:
        ppid = str(self.pid)
        output = self._DebugRunCommand(
            ('pgrep', '-P', ppid), debug_level=log_level, print_cmd=False,
            check=False, capture_output=True)
        for pid in [ppid] + output.splitlines():
          self._DumpDebugPid(log_level, pid)

    try:
      os.kill(self.pid, sig)
    except OSError as ex:
      if ex.errno != errno.ESRCH:
        raise

  def Cleanup(self, silent=False):
    """Wait for a process to exit."""
    if os.getpid() != self._parent_pid or self._output is None:
      return
    try:
      # Print output from subprocess.
      if not silent and logging.getLogger().isEnabledFor(logging.DEBUG):
        with open(self._output.name, 'r') as f:
          for line in f:
            logging.debug(line.rstrip('\n'))
    finally:
      # Clean up our temporary file.
      osutils.SafeUnlink(self._output.name)
      self._output.close()
      self._output = None

  def Wait(self):
    """Wait for the task to complete.

    Output from the task is printed as it runs.

    If exceptions occur, returns a list of exception info for them.
290*9c5db199SXin Li    """
291*9c5db199SXin Li    try:
292*9c5db199SXin Li      # Flush stdout and stderr to be sure no output is interleaved.
293*9c5db199SXin Li      sys.stdout.flush()
294*9c5db199SXin Li      sys.stderr.flush()
295*9c5db199SXin Li
296*9c5db199SXin Li      # File position pointers are shared across processes, so we must open
297*9c5db199SXin Li      # our own file descriptor to ensure output is not lost.
298*9c5db199SXin Li      self._WaitForStartup()
299*9c5db199SXin Li      silent_death_time = time.time() + self.SILENT_TIMEOUT
300*9c5db199SXin Li      results = []
301*9c5db199SXin Li      with open(self._output.name, 'r') as output:
302*9c5db199SXin Li        pos = 0
303*9c5db199SXin Li        running, exited_cleanly, task_errors, run_errors = (True, False, [], [])
304*9c5db199SXin Li        while running:
305*9c5db199SXin Li          # Check whether the process is still alive.
306*9c5db199SXin Li          running = self.is_alive()
307*9c5db199SXin Li
308*9c5db199SXin Li          try:
309*9c5db199SXin Li            errors, results = \
310*9c5db199SXin Li                self._queue.get(True, self.PRINT_INTERVAL)
311*9c5db199SXin Li            if errors:
312*9c5db199SXin Li              task_errors.extend(errors)
313*9c5db199SXin Li
314*9c5db199SXin Li            running = False
315*9c5db199SXin Li            exited_cleanly = True
316*9c5db199SXin Li          except Queue.Empty:
317*9c5db199SXin Li            pass
318*9c5db199SXin Li
319*9c5db199SXin Li          if not running:
320*9c5db199SXin Li            # Wait for the process to actually exit. If the child doesn't exit
321*9c5db199SXin Li            # in a timely fashion, kill it.
322*9c5db199SXin Li            self.join(self.EXIT_TIMEOUT)
323*9c5db199SXin Li            if self.exitcode is None:
324*9c5db199SXin Li              msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
325*9c5db199SXin Li              run_errors.extend(
326*9c5db199SXin Li                  failures_lib.CreateExceptInfo(ProcessExitTimeout(msg), ''))
327*9c5db199SXin Li              self._KillChildren([self])
328*9c5db199SXin Li            elif not exited_cleanly:
329*9c5db199SXin Li              msg = ('%r exited unexpectedly with code %s'
330*9c5db199SXin Li                     % (self, self.exitcode))
331*9c5db199SXin Li              run_errors.extend(
332*9c5db199SXin Li                  failures_lib.CreateExceptInfo(ProcessUnexpectedExit(msg), ''))
333*9c5db199SXin Li
334*9c5db199SXin Li          # Read output from process.
335*9c5db199SXin Li          output.seek(pos)
336*9c5db199SXin Li          buf = output.read(_BUFSIZE)
337*9c5db199SXin Li
338*9c5db199SXin Li          if buf:
339*9c5db199SXin Li            silent_death_time = time.time() + self.SILENT_TIMEOUT
340*9c5db199SXin Li          elif running and time.time() > silent_death_time:
341*9c5db199SXin Li            msg = ('No output from %r for %r seconds' %
342*9c5db199SXin Li                   (self, self.SILENT_TIMEOUT))
343*9c5db199SXin Li            run_errors.extend(
344*9c5db199SXin Li                failures_lib.CreateExceptInfo(ProcessSilentTimeout(msg), ''))
345*9c5db199SXin Li            self._KillChildren([self])
346*9c5db199SXin Li
347*9c5db199SXin Li            # Read remaining output from the process.
348*9c5db199SXin Li            output.seek(pos)
349*9c5db199SXin Li            buf = output.read(_BUFSIZE)
350*9c5db199SXin Li            running = False
351*9c5db199SXin Li
352*9c5db199SXin Li          # Print output so far.
353*9c5db199SXin Li          while buf:
354*9c5db199SXin Li            sys.stdout.write(buf)
355*9c5db199SXin Li            pos += len(buf)
356*9c5db199SXin Li            if len(buf) < _BUFSIZE:
357*9c5db199SXin Li              break
358*9c5db199SXin Li            buf = output.read(_BUFSIZE)
359*9c5db199SXin Li
360*9c5db199SXin Li          # Print error messages if anything exceptional occurred.
361*9c5db199SXin Li          if run_errors:
362*9c5db199SXin Li            logging.PrintBuildbotStepFailure()
363*9c5db199SXin Li            traceback.print_stack()
364*9c5db199SXin Li            logging.warning('\n'.join(x.str for x in run_errors if x))
365*9c5db199SXin Li            logging.info('\n'.join(x.str for x in task_errors if x))
366*9c5db199SXin Li
367*9c5db199SXin Li          sys.stdout.flush()
368*9c5db199SXin Li          sys.stderr.flush()
369*9c5db199SXin Li
370*9c5db199SXin Li      # Propagate any results.
371*9c5db199SXin Li      for result in results:
372*9c5db199SXin Li        results_lib.Results.Record(*result)
373*9c5db199SXin Li
374*9c5db199SXin Li    finally:
375*9c5db199SXin Li      self.Cleanup(silent=True)
376*9c5db199SXin Li
377*9c5db199SXin Li    # If an error occurred, return it.
378*9c5db199SXin Li    return run_errors + task_errors
379*9c5db199SXin Li
380*9c5db199SXin Li  def start(self):
381*9c5db199SXin Li    """Invoke multiprocessing.Process.start after flushing output/err."""
382*9c5db199SXin Li    if self.SILENT_TIMEOUT < self.MINIMUM_SILENT_TIMEOUT:
383*9c5db199SXin Li      raise AssertionError('Maximum recursion depth exceeded in %r' % self)
384*9c5db199SXin Li
385*9c5db199SXin Li    sys.stdout.flush()
386*9c5db199SXin Li    sys.stderr.flush()
387*9c5db199SXin Li    tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
388*9c5db199SXin Li    osutils.SafeMakedirs(tmp_dir, mode=0o1777)
389*9c5db199SXin Li    self._output = cros_build_lib.UnbufferedNamedTemporaryFile(
390*9c5db199SXin Li        delete=False, dir=tmp_dir, prefix='chromite-parallel-')
391*9c5db199SXin Li    self._parent_pid = os.getpid()
392*9c5db199SXin Li    return multiprocessing.Process.start(self)
393*9c5db199SXin Li
394*9c5db199SXin Li  def run(self):
395*9c5db199SXin Li    """Run the list of steps."""
396*9c5db199SXin Li    if self._semaphore is not None:
397*9c5db199SXin Li      self._semaphore.acquire()
398*9c5db199SXin Li
399*9c5db199SXin Li    errors = failures_lib.CreateExceptInfo(
400*9c5db199SXin Li        UnexpectedException('Unexpected exception in %r' % self), '')
401*9c5db199SXin Li    pid = os.getpid()
402*9c5db199SXin Li    try:
403*9c5db199SXin Li      errors = self._Run()
404*9c5db199SXin Li    finally:
405*9c5db199SXin Li      if not self._killing.is_set() and os.getpid() == pid:
406*9c5db199SXin Li        results = results_lib.Results.Get()
407*9c5db199SXin Li        self._queue.put((errors, results))
408*9c5db199SXin Li        if self._semaphore is not None:
409*9c5db199SXin Li          self._semaphore.release()
410*9c5db199SXin Li
411*9c5db199SXin Li  def _Run(self):
412*9c5db199SXin Li    """Internal method for running the list of steps."""
413*9c5db199SXin Li    # Register a handler for a signal that is rarely used.
414*9c5db199SXin Li    def trigger_bt(_sig_num, frame):
415*9c5db199SXin Li      logging.error('pre-kill notification (SIGXCPU); traceback:\n%s',
416*9c5db199SXin Li                    ''.join(traceback.format_stack(frame)))
417*9c5db199SXin Li    signal.signal(signal.SIGXCPU, trigger_bt)
418*9c5db199SXin Li
419*9c5db199SXin Li    sys.stdout.flush()
420*9c5db199SXin Li    sys.stderr.flush()
421*9c5db199SXin Li    errors = []
422*9c5db199SXin Li    # Send all output to a named temporary file.
423*9c5db199SXin Li    with open(self._output.name, 'wb', 0) as output:
424*9c5db199SXin Li      # Back up sys.std{err,out}. These aren't used, but we keep a copy so
425*9c5db199SXin Li      # that they aren't garbage collected. We intentionally don't restore
426*9c5db199SXin Li      # the old stdout and stderr at the end, because we want shutdown errors
427*9c5db199SXin Li      # to also be sent to the same log file.
428*9c5db199SXin Li      _orig_stdout, _orig_stderr = sys.stdout, sys.stderr
429*9c5db199SXin Li
430*9c5db199SXin Li      # Replace std{out,err} with unbuffered file objects.
431*9c5db199SXin Li      os.dup2(output.fileno(), sys.__stdout__.fileno())
432*9c5db199SXin Li      os.dup2(output.fileno(), sys.__stderr__.fileno())
433*9c5db199SXin Li      # The API of these funcs changed between versions.
434*9c5db199SXin Li      if sys.version_info.major < 3:
435*9c5db199SXin Li        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', 0)
436*9c5db199SXin Li        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', 0)
437*9c5db199SXin Li      else:
438*9c5db199SXin Li        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', closefd=False)
439*9c5db199SXin Li        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', closefd=False)
440*9c5db199SXin Li
441*9c5db199SXin Li      try:
442*9c5db199SXin Li        self._started.set()
443*9c5db199SXin Li        results_lib.Results.Clear()
444*9c5db199SXin Li
445*9c5db199SXin Li        # Reduce the silent timeout by the prescribed amount.
446*9c5db199SXin Li        cls = self.__class__
447*9c5db199SXin Li        cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP
448*9c5db199SXin Li
449*9c5db199SXin Li        # Actually launch the task.
450*9c5db199SXin Li        self._task(*self._task_args, **self._task_kwargs)
451*9c5db199SXin Li      except failures_lib.StepFailure as ex:
452*9c5db199SXin Li        errors.extend(failures_lib.CreateExceptInfo(
453*9c5db199SXin Li            ex, traceback.format_exc()))
454*9c5db199SXin Li      except BaseException as ex:
455*9c5db199SXin Li        errors.extend(failures_lib.CreateExceptInfo(
456*9c5db199SXin Li            ex, traceback.format_exc()))
457*9c5db199SXin Li        if self._killing.is_set():
458*9c5db199SXin Li          traceback.print_exc()
459*9c5db199SXin Li      finally:
460*9c5db199SXin Li        sys.stdout.flush()
461*9c5db199SXin Li        sys.stderr.flush()
462*9c5db199SXin Li
463*9c5db199SXin Li    return errors
464*9c5db199SXin Li
465*9c5db199SXin Li  @classmethod
466*9c5db199SXin Li  def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
467*9c5db199SXin Li    """Kill a deque of background tasks.
468*9c5db199SXin Li
469*9c5db199SXin Li    This is needed to prevent hangs in the case where child processes refuse
470*9c5db199SXin Li    to exit.
471*9c5db199SXin Li
472*9c5db199SXin Li    Args:
473*9c5db199SXin Li      bg_tasks: A list filled with _BackgroundTask objects.
474*9c5db199SXin Li      log_level: The log level of log messages.
475*9c5db199SXin Li    """
476*9c5db199SXin Li    logging.log(log_level, 'Killing tasks: %r', bg_tasks)
477*9c5db199SXin Li    siglist = (
478*9c5db199SXin Li        (signal.SIGXCPU, cls.SIGTERM_TIMEOUT),
479*9c5db199SXin Li        (signal.SIGTERM, cls.SIGKILL_TIMEOUT),
480*9c5db199SXin Li        (signal.SIGKILL, None),
481*9c5db199SXin Li    )
482*9c5db199SXin Li    first = True
483*9c5db199SXin Li    for sig, timeout in siglist:
484*9c5db199SXin Li      # Send signal to all tasks.
485*9c5db199SXin Li      for task in bg_tasks:
486*9c5db199SXin Li        task.Kill(sig, log_level, first)
487*9c5db199SXin Li      first = False
488*9c5db199SXin Li
489*9c5db199SXin Li      # Wait for all tasks to exit, if requested.
490*9c5db199SXin Li      if timeout is None:
491*9c5db199SXin Li        for task in bg_tasks:
492*9c5db199SXin Li          task.join()
493*9c5db199SXin Li          task.Cleanup()
494*9c5db199SXin Li        break
495*9c5db199SXin Li
496*9c5db199SXin Li      # Wait until timeout expires.
497*9c5db199SXin Li      end_time = time.time() + timeout
498*9c5db199SXin Li      while bg_tasks:
499*9c5db199SXin Li        time_left = end_time - time.time()
500*9c5db199SXin Li        if time_left <= 0:
501*9c5db199SXin Li          break
502*9c5db199SXin Li        task = bg_tasks[-1]
503*9c5db199SXin Li        task.join(time_left)
504*9c5db199SXin Li        if task.exitcode is not None:
505*9c5db199SXin Li          task.Cleanup()
506*9c5db199SXin Li          bg_tasks.pop()
507*9c5db199SXin Li
508*9c5db199SXin Li  @classmethod
509*9c5db199SXin Li  @contextlib.contextmanager
510*9c5db199SXin Li  def ParallelTasks(cls, steps, max_parallel=None, halt_on_error=False):
511*9c5db199SXin Li    """Run a list of functions in parallel.
512*9c5db199SXin Li
513*9c5db199SXin Li    This function launches the provided functions in the background, yields,
514*9c5db199SXin Li    and then waits for the functions to exit.
515*9c5db199SXin Li
516*9c5db199SXin Li    The output from the functions is saved to a temporary file and printed as if
517*9c5db199SXin Li    they were run in sequence.
518*9c5db199SXin Li
519*9c5db199SXin Li    If exceptions occur in the steps, we join together the tracebacks and print
520*9c5db199SXin Li    them after all parallel tasks have finished running. Further, a
521*9c5db199SXin Li    BackgroundFailure is raised with full stack traces of all exceptions.
522*9c5db199SXin Li
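    Examples:
      # Minimal sketch; |BuildStep|, |TestStep| and DoForegroundWork() are
      # hypothetical callables supplied by the caller. The steps run in
      # background processes while the body of the with-block executes.
      with _BackgroundTask.ParallelTasks([BuildStep, TestStep]):
        DoForegroundWork()
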
    Args:
      steps: A list of functions to run.
      max_parallel: The maximum number of simultaneous tasks to run in parallel.
        By default, run all tasks in parallel.
      halt_on_error: After the first exception occurs, halt any running steps,
        and squelch any further output, including any exceptions that might
        occur.
    """

    semaphore = None
    if max_parallel is not None:
      semaphore = multiprocessing.Semaphore(max_parallel)

    # First, start all the steps.
    with Manager() as manager:
      bg_tasks = collections.deque()
      for step in steps:
        task = cls(step, queue=manager.Queue(), semaphore=semaphore)
        task.start()
        bg_tasks.append(task)

      foreground_except = None
      try:
        yield
      except BaseException:
        foreground_except = sys.exc_info()
      finally:
        errors = []
        skip_bg_wait = halt_on_error and foreground_except is not None
        # Wait for each step to complete.
        while not skip_bg_wait and bg_tasks:
          task = bg_tasks.popleft()
          task_errors = task.Wait()
          if task_errors:
            errors.extend(task_errors)
            if halt_on_error:
              break

        # If there are still tasks left, kill them.
        if bg_tasks:
          cls._KillChildren(bg_tasks, log_level=logging.DEBUG)

        # Propagate any exceptions; foreground exceptions take precedence.
        if foreground_except is not None:
          # contextlib ignores caught exceptions unless explicitly re-raised.
          six.reraise(foreground_except[0], foreground_except[1],
                      foreground_except[2])
        if errors:
          raise BackgroundFailure(exc_infos=errors)

  @staticmethod
  def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):
    """Run task(*input) for each input in the queue.

    Returns when it encounters an _AllTasksComplete object on the queue.
    If exceptions occur, save them off and re-raise them as a
    BackgroundFailure once we've finished processing the items in the queue.

    Args:
      queue: A queue of tasks to run. Add tasks to this queue, and they will
        be run.
      task: Function to run on each queued input.
      onexit: Function to run after all inputs are processed.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    if task_args is None:
      task_args = []
    elif not isinstance(task_args, list):
      task_args = list(task_args)
    if task_kwargs is None:
      task_kwargs = {}

    errors = []
    while True:
      # Wait for a new item to show up on the queue. This is a blocking wait,
      # so if there's nothing to do, we just sit here.
      x = queue.get()
      if isinstance(x, _AllTasksComplete):
        # All tasks are complete, so we should exit.
        break
      elif not isinstance(x, list):
        x = task_args + list(x)
      else:
        x = task_args + x

      # If no tasks failed yet, process the remaining tasks.
      if not errors:
        try:
          task(*x, **task_kwargs)
        except BaseException as ex:
          errors.extend(
              failures_lib.CreateExceptInfo(ex, traceback.format_exc()))

    # Run exit handlers.
    if onexit:
      onexit()

    # Propagate any exceptions.
    if errors:
      raise BackgroundFailure(exc_infos=errors)


def RunParallelSteps(steps, max_parallel=None, halt_on_error=False,
                     return_values=False):
  """Run a list of functions in parallel.

  This function blocks until all steps are completed.

  The output from the functions is saved to a temporary file and printed as if
  they were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc()
    #   anotherfunc()
    #   funcfunc()
    steps = [somefunc, anotherfunc, funcfunc]
    RunParallelSteps(steps)
    # Blocks until all calls have completed.

  Args:
    steps: A list of functions to run.
    max_parallel: The maximum number of simultaneous tasks to run in parallel.
      By default, run all tasks in parallel.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might occur.
    return_values: If set to True, RunParallelSteps returns a list containing
      the return values of the steps.  Defaults to False.

  Returns:
    If |return_values| is True, the function will return a list containing the
    return values of the steps.
  """
  def ReturnWrapper(queue, fn):
    """Put the return value of |fn| into |queue|."""
    queue.put(fn())

  full_steps = []
  queues = []
  with cros_build_lib.ContextManagerStack() as stack:
    if return_values:
      # We use a managed queue here, because the child process will wait for the
      # queue (pipe) to be flushed (i.e., when items are read from the queue)
      # before exiting, and with a regular queue this may result in hangs for
      # large return values.  But with a managed queue, the manager process will
      # read the items and hold on to them until the managed queue goes out of
      # scope and is cleaned up.
      manager = stack.Add(Manager)
      for step in steps:
        queue = manager.Queue()
        queues.append(queue)
        full_steps.append(functools.partial(ReturnWrapper, queue, step))
    else:
      full_steps = steps

    with _BackgroundTask.ParallelTasks(full_steps, max_parallel=max_parallel,
                                       halt_on_error=halt_on_error):
      pass

    if return_values:
      return [queue.get_nowait() for queue in queues]


class _AllTasksComplete(object):
  """Sentinel object to indicate that all tasks are complete."""


@contextlib.contextmanager
def BackgroundTaskRunner(task, *args, **kwargs):
  """Run the specified task on each queued input in a pool of processes.

  This context manager starts a set of workers in the background, who each
  wait for input on the specified queue. For each input on the queue, these
  workers run task(*args + *input, **kwargs). Note that certain kwargs will
  not pass through to the task (see Args below for the list).

  The output from these tasks is saved to a temporary file. When control
  returns to the context manager, the background output is printed in order,
  as if the tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This will run somefunc(1, 'small', 'cow', foo='bar') in the background
    # as soon as data is added to the queue (i.e. queue.put() is called).

    def somefunc(arg1, arg2, arg3, foo=None):
      ...

    with BackgroundTaskRunner(somefunc, 1, foo='bar') as queue:
      ... do random stuff ...
      queue.put(['small', 'cow'])
      ... do more random stuff while somefunc() runs ...
    # Exiting the with statement will block until all calls have completed.

  Args:
    task: Function to run on each queued input.
    queue: A queue of tasks to run. Add tasks to this queue, and they will
      be run in the background.  If None, one will be created on the fly.
    processes: Number of processes to launch.
    onexit: Function to run in each background process after all inputs are
      processed.
    halt_on_error: After the first exception occurs, halt any running steps, and
      squelch any further output, including any exceptions that might occur.
      Halts on exceptions in any of the background processes, or in the
      foreground process using the BackgroundTaskRunner.
  """

  queue = kwargs.pop('queue', None)
  processes = kwargs.pop('processes', None)
  onexit = kwargs.pop('onexit', None)
  halt_on_error = kwargs.pop('halt_on_error', False)

  with cros_build_lib.ContextManagerStack() as stack:
    if queue is None:
      manager = stack.Add(Manager)
      queue = manager.Queue()

    if not processes:
      processes = multiprocessing.cpu_count()

    child = functools.partial(_BackgroundTask.TaskRunner, queue, task,
                              onexit=onexit, task_args=args,
                              task_kwargs=kwargs)
    steps = [child] * processes
    with _BackgroundTask.ParallelTasks(steps, halt_on_error=halt_on_error):
      try:
        yield queue
      finally:
        for _ in range(processes):
          queue.put(_AllTasksComplete())


def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
  """Run the specified function with each supplied input in a pool of processes.

  This function runs task(*x) for x in inputs in a pool of processes. This
  function blocks until all tasks are completed.

  The output from these tasks is saved to a temporary file. When control
  returns to the context manager, the background output is printed in order,
  as if the tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc('hi', 'fat', 'code')
    #   somefunc('foo', 'bar', 'cow')

    def somefunc(arg1, arg2, arg3):
      ...
    ...
    inputs = [
      ['hi', 'fat', 'code'],
      ['foo', 'bar', 'cow'],
    ]
    RunTasksInProcessPool(somefunc, inputs)
    # Blocks until all calls have completed.

  Args:
    task: Function to run on each input.
    inputs: List of inputs.
    processes: Number of processes, at most, to launch.
    onexit: Function to run in each background process after all inputs are
      processed.

  Returns:
    Returns a list containing the return values of the task for each input.
  """
  if not processes:
    # - Use >=16 processes by default, in case it's a network-bound operation.
    # - Try to use all of the CPUs, in case it's a CPU-bound operation.
    processes = min(max(16, multiprocessing.cpu_count()), len(inputs))

  with Manager() as manager:
    # Set up output queue.
    out_queue = manager.Queue()
    fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))

    # Micro-optimization: Setup the queue so that BackgroundTaskRunner
    # doesn't have to set up another Manager process.
    queue = manager.Queue()

    with BackgroundTaskRunner(fn, queue=queue, processes=processes,
                              onexit=onexit) as queue:
      for idx, input_args in enumerate(inputs):
        queue.put((idx, input_args))

    return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]


PR_SET_PDEATHSIG = 1


def ExitWithParent(sig=signal.SIGHUP):
  """Sets this process to receive |sig| when the parent dies.

  Note: this uses libc, so it only works on linux.

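  Examples:
    # Minimal sketch: ask the kernel to send this process SIGTERM if its
    # parent dies, so a forked child does not linger as an orphan.
    if not ExitWithParent(signal.SIGTERM):
      logging.warning('PR_SET_PDEATHSIG not available on this platform')
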
  Args:
    sig: Signal to receive. Defaults to SIGHUP.

  Returns:
    Whether we were successful in setting the deathsignal flag
  """
  libc_name = ctypes.util.find_library('c')
  if not libc_name:
    return False
  try:
    libc = ctypes.CDLL(libc_name)
    libc.prctl(PR_SET_PDEATHSIG, sig)
    return True
  # We might not be able to load the library (OSError), or prctl might be
  # missing (AttributeError)
  except (OSError, AttributeError):
    return False