# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module for running cbuildbot stages in the background."""

from __future__ import print_function

import collections
import contextlib
import ctypes
import ctypes.util
import errno
import functools
import multiprocessing
from multiprocessing.managers import SyncManager
import os
import signal
import sys
import time
import traceback

import six
from six.moves import queue as Queue

from autotest_lib.utils.frozen_chromite.lib import failures_lib
from autotest_lib.utils.frozen_chromite.lib import results_lib
from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
from autotest_lib.utils.frozen_chromite.lib import osutils
from autotest_lib.utils.frozen_chromite.lib import signals
from autotest_lib.utils.frozen_chromite.lib import timeout_util


_BUFSIZE = 1024


class HackTimeoutSyncManager(SyncManager):
  """Increase the process join timeout in SyncManager.

  The timeout for the manager process to join in the core library is
  too low. The process is often killed before shutting down properly,
  resulting in temporary directories (pymp-xxx) not being cleaned
  up. This class increases the default timeout.
  """

  @staticmethod
  def _finalize_manager(process, *args, **kwargs):
    """Shutdown the manager process."""

    def _join(functor, *args, **kwargs):
      timeout = kwargs.get('timeout')
      if timeout is not None and timeout < 1:
        kwargs['timeout'] = 1

      functor(*args, **kwargs)

    process.join = functools.partial(_join, process.join)
    SyncManager._finalize_manager(process, *args, **kwargs)


def IgnoreSigintAndSigterm():
  """Ignores any future SIGINTs and SIGTERMs."""
  signal.signal(signal.SIGINT, signal.SIG_IGN)
  signal.signal(signal.SIGTERM, signal.SIG_IGN)


def Manager():
  """Create a background process for managing interprocess communication.

  This manager wraps multiprocessing.Manager() and ensures that any sockets
  created during initialization are created under the /tmp tree rather than
  in a custom temp directory.
  This is needed because TMPDIR might be really long, and named sockets are
  limited to 108 characters.

  Examples:
    with Manager() as manager:
      queue = manager.Queue()
      ...

  Returns:
    The return value of multiprocessing.Manager()
  """
  # Use a short directory in /tmp. Do not use /tmp directly to keep these
  # temporary files together and because certain environments do not like too
  # many top-level paths in /tmp (see crbug.com/945523).
  # Make it mode 1777 to mirror /tmp, so that we don't have failures when root
  # calls parallel first, and some other user calls it later.
  tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
  osutils.SafeMakedirs(tmp_dir, mode=0o1777)
  old_tempdir_value, old_tempdir_env = osutils.SetGlobalTempDir(tmp_dir)
  try:
    m = HackTimeoutSyncManager()
    # SyncManager doesn't handle KeyboardInterrupt exceptions well; pipes get
    # broken and E_NOENT or E_PIPE errors are thrown from various places. We
    # can just ignore SIGINT in the SyncManager and things will close properly
    # when the enclosing with-statement exits.
    m.start(IgnoreSigintAndSigterm)
    return m
  finally:
    osutils.SetGlobalTempDir(old_tempdir_value, old_tempdir_env)


class BackgroundFailure(failures_lib.CompoundFailure):
  """Exception to show a step failed while running in a background process."""


class ProcessExitTimeout(Exception):
  """Raised if a process cannot exit within the timeout."""


class ProcessUnexpectedExit(Exception):
  """Raised if a process exits unexpectedly."""


class ProcessSilentTimeout(Exception):
  """Raised when there is no output for a prolonged period of time."""


class UnexpectedException(Exception):
  """Raised when an exception occurs at an unexpected place."""


class _BackgroundTask(multiprocessing.Process):
  """Run a task in the background.

  This task may be the 'Run' function from a buildbot stage or just a plain
  function. It will be run in the background. Output from this task is saved
  to a temporary file and is printed when the 'Wait' function is called.
  """

  # The time we give Python to startup and exit.
  STARTUP_TIMEOUT = 60 * 5
  EXIT_TIMEOUT = 60 * 10

  # The time we allow processes to be silent. This is in place so that we
  # eventually catch hanging processes, and print the remainder of our output.
  # Do not increase this.
  # Instead, adjust your program to print regular progress updates, so that
  # cbuildbot (and buildbot) can know that it has not hung.
  SILENT_TIMEOUT = 60 * 145

  # The amount by which we reduce the SILENT_TIMEOUT every time we launch
  # a subprocess. This helps ensure that children get a chance to enforce the
  # SILENT_TIMEOUT prior to the parents enforcing it.
  SILENT_TIMEOUT_STEP = 30
  MINIMUM_SILENT_TIMEOUT = 60 * 135

  # The time before terminating or killing a task.
  SIGTERM_TIMEOUT = 30
  SIGKILL_TIMEOUT = 60

  # How long we allow debug commands to run (so we don't hang while trying to
  # recover from a hang).
  DEBUG_CMD_TIMEOUT = 60

  # Interval we check for updates from print statements.
  PRINT_INTERVAL = 1

  def __init__(self, task, queue, semaphore=None, task_args=None,
               task_kwargs=None):
    """Create a new _BackgroundTask object.

    If semaphore is supplied, it will be acquired for the duration of the
    steps that are run in the background. This can be used to limit the
    number of simultaneous parallel tasks.

    Args:
      task: The task (a functor) to run in the background.
      queue: A queue to be used for managing communication between the parent
        and child process. This queue must be valid for the length of the
        life of the child process, until the parent has collected its status.
      semaphore: The lock to hold while |task| runs.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    multiprocessing.Process.__init__(self)
    self._task = task
    self._queue = queue
    self._semaphore = semaphore
    self._started = multiprocessing.Event()
    self._killing = multiprocessing.Event()
    self._output = None
    self._parent_pid = None
    self._task_args = task_args if task_args else ()
    self._task_kwargs = task_kwargs if task_kwargs else {}

  def _WaitForStartup(self):
    # TODO(davidjames): Use python-2.7 syntax to simplify this.
    self._started.wait(self.STARTUP_TIMEOUT)
    msg = 'Process failed to start in %d seconds' % self.STARTUP_TIMEOUT
    assert self._started.is_set(), msg

  @classmethod
  def _DebugRunCommand(cls, cmd, **kwargs):
    """Swallow any exception run raises.

    Since these commands are for purely informational purposes, we don't want
    random issues causing the bot to die.

    Returns:
      Stdout on success
    """
    log_level = kwargs['debug_level']
    try:
      with timeout_util.Timeout(cls.DEBUG_CMD_TIMEOUT):
        return cros_build_lib.run(cmd, **kwargs).output
    except (cros_build_lib.RunCommandError, timeout_util.TimeoutError) as e:
      logging.log(log_level, 'Running %s failed: %s', cmd[0], str(e))
      return ''

  # Debug commands to run in gdb. A class member so tests can stub it out.
  GDB_COMMANDS = (
      'info proc all',
      'info threads',
      'thread apply all py-list',
      'thread apply all py-bt',
      'thread apply all bt',
      'detach',
  )

  @classmethod
  def _DumpDebugPid(cls, log_level, pid):
    """Dump debug info about the hanging |pid|."""
    pid = str(pid)
    commands = (
        ('pstree', '-Apals', pid),
        ('lsof', '-p', pid),
    )
    for cmd in commands:
      cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                           log_output=True, encoding='utf-8')

    stdin = '\n'.join(['echo \\n>>> %s\\n\n%s' % (x, x)
                       for x in cls.GDB_COMMANDS])
    cmd = ('gdb', '--nx', '-q', '-p', pid, '-ex', 'set prompt',)
    cls._DebugRunCommand(cmd, debug_level=log_level, check=False,
                         log_output=True, input=stdin, encoding='utf-8')

  def Kill(self, sig, log_level, first=False):
    """Kill process with signal, ignoring if the process is dead.

    Args:
      sig: Signal to send.
      log_level: The log level of log messages.
      first: Whether this is the first signal we've sent.
    """
    self._killing.set()
    self._WaitForStartup()
    if logging.getLogger().isEnabledFor(log_level):
      # Dump debug information about the hanging process.
      logging.log(log_level, 'Killing %r (sig=%r %s)', self.pid, sig,
                  signals.StrSignal(sig))

      if first:
        ppid = str(self.pid)
        output = self._DebugRunCommand(
            ('pgrep', '-P', ppid), debug_level=log_level, print_cmd=False,
            check=False, capture_output=True)
        for pid in [ppid] + output.splitlines():
          self._DumpDebugPid(log_level, pid)

    try:
      os.kill(self.pid, sig)
    except OSError as ex:
      if ex.errno != errno.ESRCH:
        raise

  def Cleanup(self, silent=False):
    """Wait for a process to exit."""
    if os.getpid() != self._parent_pid or self._output is None:
      return
    try:
      # Print output from subprocess.
      if not silent and logging.getLogger().isEnabledFor(logging.DEBUG):
        with open(self._output.name, 'r') as f:
          for line in f:
            logging.debug(line.rstrip('\n'))
    finally:
      # Clean up our temporary file.
      osutils.SafeUnlink(self._output.name)
      self._output.close()
      self._output = None

  def Wait(self):
    """Wait for the task to complete.

    Output from the task is printed as it runs.

    If exceptions occur, return a list with information about them
    (an empty list if none occurred).
    """
    try:
      # Flush stdout and stderr to be sure no output is interleaved.
      sys.stdout.flush()
      sys.stderr.flush()

      # File position pointers are shared across processes, so we must open
      # our own file descriptor to ensure output is not lost.
      self._WaitForStartup()
      silent_death_time = time.time() + self.SILENT_TIMEOUT
      results = []
      with open(self._output.name, 'r') as output:
        pos = 0
        running, exited_cleanly, task_errors, run_errors = (True, False, [], [])
        while running:
          # Check whether the process is still alive.
          running = self.is_alive()

          try:
            errors, results = \
                self._queue.get(True, self.PRINT_INTERVAL)
            if errors:
              task_errors.extend(errors)

            running = False
            exited_cleanly = True
          except Queue.Empty:
            pass

          if not running:
            # Wait for the process to actually exit. If the child doesn't exit
            # in a timely fashion, kill it.
            self.join(self.EXIT_TIMEOUT)
            if self.exitcode is None:
              msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
              run_errors.extend(
                  failures_lib.CreateExceptInfo(ProcessExitTimeout(msg), ''))
              self._KillChildren([self])
            elif not exited_cleanly:
              msg = ('%r exited unexpectedly with code %s'
                     % (self, self.exitcode))
              run_errors.extend(
                  failures_lib.CreateExceptInfo(ProcessUnexpectedExit(msg), ''))

          # Read output from process.
          output.seek(pos)
          buf = output.read(_BUFSIZE)

          if buf:
            silent_death_time = time.time() + self.SILENT_TIMEOUT
          elif running and time.time() > silent_death_time:
            msg = ('No output from %r for %r seconds' %
                   (self, self.SILENT_TIMEOUT))
            run_errors.extend(
                failures_lib.CreateExceptInfo(ProcessSilentTimeout(msg), ''))
            self._KillChildren([self])

            # Read remaining output from the process.
            output.seek(pos)
            buf = output.read(_BUFSIZE)
            running = False

          # Print output so far.
          while buf:
            sys.stdout.write(buf)
            pos += len(buf)
            if len(buf) < _BUFSIZE:
              break
            buf = output.read(_BUFSIZE)

      # Print error messages if anything exceptional occurred.
      if run_errors:
        logging.PrintBuildbotStepFailure()
        traceback.print_stack()
        logging.warning('\n'.join(x.str for x in run_errors if x))
        logging.info('\n'.join(x.str for x in task_errors if x))

      sys.stdout.flush()
      sys.stderr.flush()

      # Propagate any results.
      for result in results:
        results_lib.Results.Record(*result)

    finally:
      self.Cleanup(silent=True)

    # If an error occurred, return it.
    return run_errors + task_errors

  def start(self):
    """Invoke multiprocessing.Process.start after flushing output/err."""
    if self.SILENT_TIMEOUT < self.MINIMUM_SILENT_TIMEOUT:
      raise AssertionError('Maximum recursion depth exceeded in %r' % self)

    sys.stdout.flush()
    sys.stderr.flush()
    tmp_dir = '/tmp/chromite.parallel.%d' % os.geteuid()
    osutils.SafeMakedirs(tmp_dir, mode=0o1777)
    self._output = cros_build_lib.UnbufferedNamedTemporaryFile(
        delete=False, dir=tmp_dir, prefix='chromite-parallel-')
    self._parent_pid = os.getpid()
    return multiprocessing.Process.start(self)

  def run(self):
    """Run the list of steps."""
    if self._semaphore is not None:
      self._semaphore.acquire()

    errors = failures_lib.CreateExceptInfo(
        UnexpectedException('Unexpected exception in %r' % self), '')
    pid = os.getpid()
    try:
      errors = self._Run()
    finally:
      if not self._killing.is_set() and os.getpid() == pid:
        results = results_lib.Results.Get()
        self._queue.put((errors, results))
        if self._semaphore is not None:
          self._semaphore.release()

  def _Run(self):
    """Internal method for running the list of steps."""
    # Register a handler for a signal that is rarely used.
    def trigger_bt(_sig_num, frame):
      logging.error('pre-kill notification (SIGXCPU); traceback:\n%s',
                    ''.join(traceback.format_stack(frame)))
    signal.signal(signal.SIGXCPU, trigger_bt)

    sys.stdout.flush()
    sys.stderr.flush()
    errors = []
    # Send all output to a named temporary file.
    with open(self._output.name, 'wb', 0) as output:
      # Back up sys.std{err,out}. These aren't used, but we keep a copy so
      # that they aren't garbage collected. We intentionally don't restore
      # the old stdout and stderr at the end, because we want shutdown errors
      # to also be sent to the same log file.
      _orig_stdout, _orig_stderr = sys.stdout, sys.stderr

      # Replace std{out,err} with unbuffered file objects.
      os.dup2(output.fileno(), sys.__stdout__.fileno())
      os.dup2(output.fileno(), sys.__stderr__.fileno())
      # The API of these funcs changed between versions.
      if sys.version_info.major < 3:
        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', 0)
        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', 0)
      else:
        sys.stdout = os.fdopen(sys.__stdout__.fileno(), 'w', closefd=False)
        sys.stderr = os.fdopen(sys.__stderr__.fileno(), 'w', closefd=False)

      try:
        self._started.set()
        results_lib.Results.Clear()

        # Reduce the silent timeout by the prescribed amount.
        cls = self.__class__
        cls.SILENT_TIMEOUT -= cls.SILENT_TIMEOUT_STEP

        # Actually launch the task.
        self._task(*self._task_args, **self._task_kwargs)
      except failures_lib.StepFailure as ex:
        errors.extend(failures_lib.CreateExceptInfo(
            ex, traceback.format_exc()))
      except BaseException as ex:
        errors.extend(failures_lib.CreateExceptInfo(
            ex, traceback.format_exc()))
        if self._killing.is_set():
          traceback.print_exc()
      finally:
        sys.stdout.flush()
        sys.stderr.flush()

    return errors

  @classmethod
  def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
    """Kill a deque of background tasks.

    This is needed to prevent hangs in the case where child processes refuse
    to exit.

    Args:
      bg_tasks: A list filled with _BackgroundTask objects.
      log_level: The log level of log messages.
    """
    logging.log(log_level, 'Killing tasks: %r', bg_tasks)
    siglist = (
        (signal.SIGXCPU, cls.SIGTERM_TIMEOUT),
        (signal.SIGTERM, cls.SIGKILL_TIMEOUT),
        (signal.SIGKILL, None),
    )
    first = True
    for sig, timeout in siglist:
      # Send signal to all tasks.
      for task in bg_tasks:
        task.Kill(sig, log_level, first)
      first = False

      # Wait for all tasks to exit, if requested.
      if timeout is None:
        for task in bg_tasks:
          task.join()
          task.Cleanup()
        break

      # Wait until timeout expires.
      end_time = time.time() + timeout
      while bg_tasks:
        time_left = end_time - time.time()
        if time_left <= 0:
          break
        task = bg_tasks[-1]
        task.join(time_left)
        if task.exitcode is not None:
          task.Cleanup()
          bg_tasks.pop()

  @classmethod
  @contextlib.contextmanager
  def ParallelTasks(cls, steps, max_parallel=None, halt_on_error=False):
    """Run a list of functions in parallel.

    This function launches the provided functions in the background, yields,
    and then waits for the functions to exit.

    The output from the functions is saved to a temporary file and printed
    as if they were run in sequence.

    If exceptions occur in the steps, we join together the tracebacks and
    print them after all parallel tasks have finished running. Further, a
    BackgroundFailure is raised with full stack traces of all exceptions.

    Args:
      steps: A list of functions to run.
      max_parallel: The maximum number of simultaneous tasks to run in
        parallel. By default, run all tasks in parallel.
      halt_on_error: After the first exception occurs, halt any running steps,
        and squelch any further output, including any exceptions that might
        occur.
    """

    semaphore = None
    if max_parallel is not None:
      semaphore = multiprocessing.Semaphore(max_parallel)

    # First, start all the steps.
    with Manager() as manager:
      bg_tasks = collections.deque()
      for step in steps:
        task = cls(step, queue=manager.Queue(), semaphore=semaphore)
        task.start()
        bg_tasks.append(task)

      foreground_except = None
      try:
        yield
      except BaseException:
        foreground_except = sys.exc_info()
      finally:
        errors = []
        skip_bg_wait = halt_on_error and foreground_except is not None
        # Wait for each step to complete.
        while not skip_bg_wait and bg_tasks:
          task = bg_tasks.popleft()
          task_errors = task.Wait()
          if task_errors:
            errors.extend(task_errors)
            if halt_on_error:
              break

        # If there are still tasks left, kill them.
        if bg_tasks:
          cls._KillChildren(bg_tasks, log_level=logging.DEBUG)

        # Propagate any exceptions; foreground exceptions take precedence.
        if foreground_except is not None:
          # contextlib ignores caught exceptions unless explicitly re-raised.
          six.reraise(foreground_except[0], foreground_except[1],
                      foreground_except[2])
        if errors:
          raise BackgroundFailure(exc_infos=errors)

  @staticmethod
  def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):
    """Run task(*input) for each input in the queue.

    Returns when it encounters an _AllTasksComplete object on the queue.
    If exceptions occur, save them off and re-raise them as a
    BackgroundFailure once we've finished processing the items in the queue.

    Args:
      queue: A queue of tasks to run. Add tasks to this queue, and they will
        be run.
      task: Function to run on each queued input.
      onexit: Function to run after all inputs are processed.
      task_args: A list of args to pass to the |task|.
      task_kwargs: A dict of optional args to pass to the |task|.
    """
    if task_args is None:
      task_args = []
    elif not isinstance(task_args, list):
      task_args = list(task_args)
    if task_kwargs is None:
      task_kwargs = {}

    errors = []
    while True:
      # Wait for a new item to show up on the queue. This is a blocking wait,
      # so if there's nothing to do, we just sit here.
      x = queue.get()
      if isinstance(x, _AllTasksComplete):
        # All tasks are complete, so we should exit.
        break
      elif not isinstance(x, list):
        x = task_args + list(x)
      else:
        x = task_args + x

      # If no tasks failed yet, process the remaining tasks.
      if not errors:
        try:
          task(*x, **task_kwargs)
        except BaseException as ex:
          errors.extend(
              failures_lib.CreateExceptInfo(ex, traceback.format_exc()))

    # Run exit handlers.
    if onexit:
      onexit()

    # Propagate any exceptions.
    if errors:
      raise BackgroundFailure(exc_infos=errors)


def RunParallelSteps(steps, max_parallel=None, halt_on_error=False,
                     return_values=False):
  """Run a list of functions in parallel.

  This function blocks until all steps are completed.

  The output from the functions is saved to a temporary file and printed as
  if they were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc()
    #   anotherfunc()
    #   funcfunc()
    steps = [somefunc, anotherfunc, funcfunc]
    RunParallelSteps(steps)
    # Blocks until all calls have completed.

  Args:
    steps: A list of functions to run.
    max_parallel: The maximum number of simultaneous tasks to run in parallel.
      By default, run all tasks in parallel.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might
      occur.
    return_values: If set to True, RunParallelSteps returns a list containing
      the return values of the steps. Defaults to False.

  Returns:
    If |return_values| is True, the function will return a list containing
    the return values of the steps.
  """
  def ReturnWrapper(queue, fn):
    """Put the return value of |fn| into |queue|."""
    queue.put(fn())

  full_steps = []
  queues = []
  with cros_build_lib.ContextManagerStack() as stack:
    if return_values:
      # We use a managed queue here, because the child process will wait for
      # the queue (pipe) to be flushed (i.e., when items are read from the
      # queue) before exiting, and with a regular queue this may result in
      # hangs for large return values. But with a managed queue, the manager
      # process will read the items and hold on to them until the managed
      # queue goes out of scope and is cleaned up.
      manager = stack.Add(Manager)
      for step in steps:
        queue = manager.Queue()
        queues.append(queue)
        full_steps.append(functools.partial(ReturnWrapper, queue, step))
    else:
      full_steps = steps

    with _BackgroundTask.ParallelTasks(full_steps, max_parallel=max_parallel,
                                       halt_on_error=halt_on_error):
      pass

    if return_values:
      return [queue.get_nowait() for queue in queues]


class _AllTasksComplete(object):
  """Sentinel object to indicate that all tasks are complete."""


@contextlib.contextmanager
def BackgroundTaskRunner(task, *args, **kwargs):
  """Run the specified task on each queued input in a pool of processes.

  This context manager starts a set of workers in the background, who each
  wait for input on the specified queue. For each input on the queue, these
  workers run task(*args + *input, **kwargs). Note that certain kwargs will
  not pass through to the task (see Args below for the list).

  The output from these tasks is saved to a temporary file. When control
  returns to the context manager, the background output is printed in order,
  as if the tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This will run somefunc(1, 'small', 'cow', foo='bar') in the background
    # as soon as data is added to the queue (i.e. queue.put() is called).

    def somefunc(arg1, arg2, arg3, foo=None):
      ...

    with BackgroundTaskRunner(somefunc, 1, foo='bar') as queue:
      ... do random stuff ...
      queue.put(['small', 'cow'])
      ... do more random stuff while somefunc() runs ...
    # Exiting the with statement will block until all calls have completed.

  Args:
    task: Function to run on each queued input.
    queue: A queue of tasks to run. Add tasks to this queue, and they will
      be run in the background. If None, one will be created on the fly.
    processes: Number of processes to launch.
    onexit: Function to run in each background process after all inputs are
      processed.
    halt_on_error: After the first exception occurs, halt any running steps,
      and squelch any further output, including any exceptions that might
      occur. Halts on exceptions in any of the background processes, or in
      the foreground process using the BackgroundTaskRunner.
  """

  queue = kwargs.pop('queue', None)
  processes = kwargs.pop('processes', None)
  onexit = kwargs.pop('onexit', None)
  halt_on_error = kwargs.pop('halt_on_error', False)

  with cros_build_lib.ContextManagerStack() as stack:
    if queue is None:
      manager = stack.Add(Manager)
      queue = manager.Queue()

    if not processes:
      processes = multiprocessing.cpu_count()

    child = functools.partial(_BackgroundTask.TaskRunner, queue, task,
                              onexit=onexit, task_args=args,
                              task_kwargs=kwargs)
    steps = [child] * processes
    with _BackgroundTask.ParallelTasks(steps, halt_on_error=halt_on_error):
      try:
        yield queue
      finally:
        for _ in range(processes):
          queue.put(_AllTasksComplete())


def RunTasksInProcessPool(task, inputs, processes=None, onexit=None):
  """Run the specified function with each supplied input in a pool of processes.

  This function runs task(*x) for x in inputs in a pool of processes. This
  function blocks until all tasks are completed.

  The output from these tasks is saved to a temporary file. When this
  function returns, the background output is printed in order, as if the
  tasks were run in sequence.

  If exceptions occur in the steps, we join together the tracebacks and print
  them after all parallel tasks have finished running. Further, a
  BackgroundFailure is raised with full stack traces of all exceptions.

  Examples:
    # This snippet will execute in parallel:
    #   somefunc('hi', 'fat', 'code')
    #   somefunc('foo', 'bar', 'cow')

    def somefunc(arg1, arg2, arg3):
      ...
    ...
    inputs = [
        ['hi', 'fat', 'code'],
        ['foo', 'bar', 'cow'],
    ]
    RunTasksInProcessPool(somefunc, inputs)
    # Blocks until all calls have completed.

  Args:
    task: Function to run on each input.
    inputs: List of inputs.
    processes: Number of processes, at most, to launch.
    onexit: Function to run in each background process after all inputs are
      processed.

  Returns:
    Returns a list containing the return values of the task for each input.
  """
  if not processes:
    # - Use >=16 processes by default, in case it's a network-bound operation.
    # - Try to use all of the CPUs, in case it's a CPU-bound operation.
    processes = min(max(16, multiprocessing.cpu_count()), len(inputs))

  with Manager() as manager:
    # Set up output queue.
    out_queue = manager.Queue()
    fn = lambda idx, task_args: out_queue.put((idx, task(*task_args)))

    # Micro-optimization: Setup the queue so that BackgroundTaskRunner
    # doesn't have to set up another Manager process.
    queue = manager.Queue()

    with BackgroundTaskRunner(fn, queue=queue, processes=processes,
                              onexit=onexit) as queue:
      for idx, input_args in enumerate(inputs):
        queue.put((idx, input_args))

    return [x[1] for x in sorted(out_queue.get() for _ in range(len(inputs)))]


PR_SET_PDEATHSIG = 1


def ExitWithParent(sig=signal.SIGHUP):
  """Sets this process to receive |sig| when the parent dies.

  Note: this uses libc, so it only works on Linux.

  Args:
    sig: Signal to receive. Defaults to SIGHUP.

  Returns:
    Whether we were successful in setting the deathsignal flag.
  """
  libc_name = ctypes.util.find_library('c')
  if not libc_name:
    return False
  try:
    libc = ctypes.CDLL(libc_name)
    libc.prctl(PR_SET_PDEATHSIG, sig)
    return True
  # We might not be able to load the library (OSError), or prctl might be
  # missing (AttributeError).
  except (OSError, AttributeError):
    return False
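

# Illustrative usage sketch (not part of the original chromite module): the
# guarded demo below shows how the three main entry points are typically
# called, assuming the frozen_chromite imports above resolve. The helper
# functions (_StepOne, _StepTwo, _Square) are hypothetical examples only and
# run only when this module is executed directly.
if __name__ == '__main__':
  def _StepOne():
    return 'one'

  def _StepTwo():
    return 'two'

  def _Square(x):
    """Toy task: print and return the square of |x|."""
    print('squaring %d' % x)
    return x * x

  # Run two functions in parallel and collect their return values.
  print(RunParallelSteps([_StepOne, _StepTwo], return_values=True))

  # Feed work items to a pool of background workers as they become available.
  with BackgroundTaskRunner(_Square, processes=2) as work_queue:
    work_queue.put([3])
    work_queue.put([4])

  # Map a fixed list of inputs across a process pool and collect the results.
  print(RunTasksInProcessPool(_Square, [[5], [6]]))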