# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan ([email protected])'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception


# Maps each live executor manager thread to its _ThreadWakeup so that
# _python_exit() can wake and join every manager at interpreter shutdown.
_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    """One-way pipe used to interrupt the executor manager thread's wait.

    wakeup() writes an empty message to the pipe, clear() drains it, and
    close() releases both ends. Once closed, wakeup() and clear() are no-ops.
    """

    def __init__(self):
        self._closed = False
        # Serializes close() and wakeup() so a wakeup can never slip between
        # the _closed check and a concurrent close(). This matters for
        # _python_exit(), which calls wakeup() without holding the executor's
        # _shutdown_lock.
        self._lock = threading.Lock()
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        with self._lock:
            if not self._closed:
                self._closed = True
                self._writer.close()
                self._reader.close()

    def wakeup(self):
        with self._lock:
            if not self._closed:
                self._writer.send_bytes(b"")

    def clear(self):
        # Deliberately does not take self._lock. A wakeup() blocked on a
        # full pipe holds the lock, and draining the pipe is what unblocks
        # it. Taking the lock here could therefore deadlock.
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()


def _python_exit():
    """Flag interpreter shutdown, then wake and join every manager thread."""
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # Not protected by ProcessPoolExecutor._shutdown_lock; the wakeup's
        # own lock keeps this safe against a concurrent close().
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    """Exception whose str() is a pre-formatted (remote) traceback text."""
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    """Picklable wrapper pairing an exception with its formatted traceback.

    When unpickled (see __reduce__), it turns back into the original
    exception with a _RemoteTraceback attached as its __cause__.
    """
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    # Unpickling hook used by _ExceptionWithTraceback.__reduce__: chain the
    # remote traceback text onto the exception and return the exception.
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

class _WorkItem(object):
    # A submitted call plus the Future that will receive its outcome.
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    # Outcome of one call sent back by a worker. exit_pid is set when the
    # worker is about to exit after this call (max_tasks_per_child reached).
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    # The picklable part of a _WorkItem that is shipped to a worker.
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Call queue that fails the matching future when a call cannot be fed.

    If the feeder thread fails to send a _CallItem (e.g. pickling fails),
    the error is set on the corresponding future instead of being lost.
    """
    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            with self.shutdown_lock:
                self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        # Sending failed (e.g. the result could not be pickled): report that
        # failure for this work item instead.
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A None item tells the worker to exit.
        result_queue: A ctx.SimpleQueue that will be written to by the
            worker: one _ResultItem per call, or this worker's PID when it
            exits because it received a None sentinel.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        max_tasks: Number of calls after which the worker exits, flagging
            its last _ResultItem with exit_pid; None means no limit.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return


class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A dict mapping pids to the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal sent. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        # Returns (result_item, is_broken, cause); a worker sentinel becoming
        # ready without a result or wakeup means the pool is broken.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        with self.shutdown_lock:
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem.
        # NOTE(review): run() reads result_item.exit_pid right after this
        # call, which an int PID does not have. In this module the None
        # sentinels that produce PIDs are only sent from
        # join_executor_internals(), just before the manager thread stops
        # reading results, so the int branch appears unreachable here.

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            try:
                work_item.future.set_exception(bpe)
            except _base.InvalidStateError:
                # A future still waiting in work_ids_queue may already have
                # been cancelled by the user; set_exception() rejects it.
                # Checking cancelled() first would race with a concurrent
                # cancel(), so tolerate the error and keep going so the
                # remaining futures are failed and the workers terminated.
                pass
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        # Send one None sentinel per live worker, retrying while the call
        # queue is full and workers are still alive.
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if this platform cannot run the executor.

    The verdict is computed once and cached in module globals; later calls
    re-raise a cached failure or return immediately on a cached success.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already found adequate: don't probe the platform again.
        return
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process. Useful
                to allow specific multiprocessing start methods.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker process will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
                self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # Spawn one more worker unless an idle one exists or the pool is full.
        # shutdown(wait=False) resets self._processes to None while the
        # manager thread may still call this (e.g. after a worker exits due
        # to max_tasks_per_child); len(None) would crash that thread.
        if self._processes is None:
            return

        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__