xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/concurrent/futures/process.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
6The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+     +----------+       +--------+     +-----------+    +---------+
11|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
12|          |     +----------+       |        |     +-----------+    |  Pool   |
13|          |     | ...      |       |        |     | ...       |    +---------+
14|          |     | 6        |    => |        |  => | 5, call() | => |         |
15|          |     | 7        |       |        |     | ...       |    |         |
16| Process  |     | ...      |       | Local  |     +-----------+    | Process |
17|  Pool    |     +----------+       | Worker |                      |  #1..n  |
18| Executor |                        | Thread |                      |         |
19|          |     +----------- +     |        |     +-----------+    |         |
20|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
21|          |     +------------+     |        |     +-----------+    |         |
22|          |     | 6: call()  |     |        |     | ...       |    |         |
23|          |     |    future  |     |        |     | 4, result |    |         |
24|          |     | ...        |     |        |     | 3, except |    |         |
25+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33  WorkItem from the "Work Items" dict: if the work item has been cancelled then
34  it is simply removed from the dict, otherwise it is repackaged as a
35  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39  "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43  _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan ([email protected])'
47
48import os
49from concurrent.futures import _base
50import queue
51import multiprocessing as mp
52import multiprocessing.connection
53from multiprocessing.queues import Queue
54import threading
55import weakref
56from functools import partial
57import itertools
58import sys
59from traceback import format_exception
60
61
62_threads_wakeups = weakref.WeakKeyDictionary()
63_global_shutdown = False
64
65
66class _ThreadWakeup:
67    def __init__(self):
68        self._closed = False
69        self._reader, self._writer = mp.Pipe(duplex=False)
70
71    def close(self):
72        if not self._closed:
73            self._closed = True
74            self._writer.close()
75            self._reader.close()
76
77    def wakeup(self):
78        if not self._closed:
79            self._writer.send_bytes(b"")
80
81    def clear(self):
82        if not self._closed:
83            while self._reader.poll():
84                self._reader.recv_bytes()
85
86
87def _python_exit():
88    global _global_shutdown
89    _global_shutdown = True
90    items = list(_threads_wakeups.items())
91    for _, thread_wakeup in items:
92        # call not protected by ProcessPoolExecutor._shutdown_lock
93        thread_wakeup.wakeup()
94    for t, _ in items:
95        t.join()
96
97# Register for `_python_exit()` to be called just before joining all
98# non-daemon threads. This is used instead of `atexit.register()` for
99# compatibility with subinterpreters, which no longer support daemon threads.
100# See bpo-39812 for context.
101threading._register_atexit(_python_exit)
102
103# Controls how many more calls than processes will be queued in the call queue.
104# A smaller number will mean that processes spend more time idle waiting for
105# work while a larger number will make Future.cancel() succeed less frequently
106# (Futures in the call queue cannot be cancelled).
107EXTRA_QUEUED_CALLS = 1
108
109
110# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
111# It can wait on, at most, 63 objects. There is an overhead of two objects:
112# - the result queue reader
113# - the thread wakeup reader
114_MAX_WINDOWS_WORKERS = 63 - 2
115
116# Hack to embed stringification of remote traceback in local traceback
117
118class _RemoteTraceback(Exception):
119    def __init__(self, tb):
120        self.tb = tb
121    def __str__(self):
122        return self.tb
123
124class _ExceptionWithTraceback:
125    def __init__(self, exc, tb):
126        tb = ''.join(format_exception(type(exc), exc, tb))
127        self.exc = exc
128        # Traceback object needs to be garbage-collected as its frames
129        # contain references to all the objects in the exception scope
130        self.exc.__traceback__ = None
131        self.tb = '\n"""\n%s"""' % tb
132    def __reduce__(self):
133        return _rebuild_exc, (self.exc, self.tb)
134
135def _rebuild_exc(exc, tb):
136    exc.__cause__ = _RemoteTraceback(tb)
137    return exc
138
139class _WorkItem(object):
140    def __init__(self, future, fn, args, kwargs):
141        self.future = future
142        self.fn = fn
143        self.args = args
144        self.kwargs = kwargs
145
146class _ResultItem(object):
147    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
148        self.work_id = work_id
149        self.exception = exception
150        self.result = result
151        self.exit_pid = exit_pid
152
153class _CallItem(object):
154    def __init__(self, work_id, fn, args, kwargs):
155        self.work_id = work_id
156        self.fn = fn
157        self.args = args
158        self.kwargs = kwargs
159
160
161class _SafeQueue(Queue):
162    """Safe Queue set exception to the future object linked to a job"""
163    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
164                 thread_wakeup):
165        self.pending_work_items = pending_work_items
166        self.shutdown_lock = shutdown_lock
167        self.thread_wakeup = thread_wakeup
168        super().__init__(max_size, ctx=ctx)
169
170    def _on_queue_feeder_error(self, e, obj):
171        if isinstance(obj, _CallItem):
172            tb = format_exception(type(e), e, e.__traceback__)
173            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
174            work_item = self.pending_work_items.pop(obj.work_id, None)
175            with self.shutdown_lock:
176                self.thread_wakeup.wakeup()
177            # work_item can be None if another process terminated. In this
178            # case, the executor_manager_thread fails all work_items
179            # with BrokenProcessPool
180            if work_item is not None:
181                work_item.future.set_exception(e)
182        else:
183            super()._on_queue_feeder_error(e, obj)
184
185
186def _get_chunks(*iterables, chunksize):
187    """ Iterates over zip()ed iterables in chunks. """
188    it = zip(*iterables)
189    while True:
190        chunk = tuple(itertools.islice(it, chunksize))
191        if not chunk:
192            return
193        yield chunk
194
195
196def _process_chunk(fn, chunk):
197    """ Processes a chunk of an iterable passed to map.
198
199    Runs the function passed to map() on a chunk of the
200    iterable passed to map.
201
202    This function is run in a separate process.
203
204    """
205    return [fn(*args) for args in chunk]
206
207
208def _sendback_result(result_queue, work_id, result=None, exception=None,
209                     exit_pid=None):
210    """Safely send back the given result or exception"""
211    try:
212        result_queue.put(_ResultItem(work_id, result=result,
213                                     exception=exception, exit_pid=exit_pid))
214    except BaseException as e:
215        exc = _ExceptionWithTraceback(e, e.__traceback__)
216        result_queue.put(_ResultItem(work_id, exception=exc,
217                                     exit_pid=exit_pid))
218
219
220def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
221    """Evaluates calls from call_queue and places the results in result_queue.
222
223    This worker is run in a separate process.
224
225    Args:
226        call_queue: A ctx.Queue of _CallItems that will be read and
227            evaluated by the worker.
228        result_queue: A ctx.Queue of _ResultItems that will written
229            to by the worker.
230        initializer: A callable initializer, or None
231        initargs: A tuple of args for the initializer
232    """
233    if initializer is not None:
234        try:
235            initializer(*initargs)
236        except BaseException:
237            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
238            # The parent will notice that the process stopped and
239            # mark the pool broken
240            return
241    num_tasks = 0
242    exit_pid = None
243    while True:
244        call_item = call_queue.get(block=True)
245        if call_item is None:
246            # Wake up queue management thread
247            result_queue.put(os.getpid())
248            return
249
250        if max_tasks is not None:
251            num_tasks += 1
252            if num_tasks >= max_tasks:
253                exit_pid = os.getpid()
254
255        try:
256            r = call_item.fn(*call_item.args, **call_item.kwargs)
257        except BaseException as e:
258            exc = _ExceptionWithTraceback(e, e.__traceback__)
259            _sendback_result(result_queue, call_item.work_id, exception=exc,
260                             exit_pid=exit_pid)
261        else:
262            _sendback_result(result_queue, call_item.work_id, result=r,
263                             exit_pid=exit_pid)
264            del r
265
266        # Liberate the resource as soon as possible, to avoid holding onto
267        # open files or shared memory that is not needed anymore
268        del call_item
269
270        if exit_pid is not None:
271            return
272
273
274class _ExecutorManagerThread(threading.Thread):
275    """Manages the communication between this process and the worker processes.
276
277    The manager is run in a local thread.
278
279    Args:
280        executor: A reference to the ProcessPoolExecutor that owns
281            this thread. A weakref will be own by the manager as well as
282            references to internal objects used to introspect the state of
283            the executor.
284    """
285
286    def __init__(self, executor):
287        # Store references to necessary internals of the executor.
288
289        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
290        # main Thread and avoid deadlocks caused by permanently locked queues.
291        self.thread_wakeup = executor._executor_manager_thread_wakeup
292        self.shutdown_lock = executor._shutdown_lock
293
294        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
295        # to determine if the ProcessPoolExecutor has been garbage collected
296        # and that the manager can exit.
297        # When the executor gets garbage collected, the weakref callback
298        # will wake up the queue management thread so that it can terminate
299        # if there is no pending work item.
300        def weakref_cb(_,
301                       thread_wakeup=self.thread_wakeup,
302                       shutdown_lock=self.shutdown_lock):
303            mp.util.debug('Executor collected: triggering callback for'
304                          ' QueueManager wakeup')
305            with shutdown_lock:
306                thread_wakeup.wakeup()
307
308        self.executor_reference = weakref.ref(executor, weakref_cb)
309
310        # A list of the ctx.Process instances used as workers.
311        self.processes = executor._processes
312
313        # A ctx.Queue that will be filled with _CallItems derived from
314        # _WorkItems for processing by the process workers.
315        self.call_queue = executor._call_queue
316
317        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
318        self.result_queue = executor._result_queue
319
320        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
321        self.work_ids_queue = executor._work_ids
322
323        # Maximum number of tasks a worker process can execute before
324        # exiting safely
325        self.max_tasks_per_child = executor._max_tasks_per_child
326
327        # A dict mapping work ids to _WorkItems e.g.
328        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
329        self.pending_work_items = executor._pending_work_items
330
331        super().__init__()
332
333    def run(self):
334        # Main loop for the executor manager thread.
335
336        while True:
337            self.add_call_item_to_queue()
338
339            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
340
341            if is_broken:
342                self.terminate_broken(cause)
343                return
344            if result_item is not None:
345                self.process_result_item(result_item)
346
347                process_exited = result_item.exit_pid is not None
348                if process_exited:
349                    p = self.processes.pop(result_item.exit_pid)
350                    p.join()
351
352                # Delete reference to result_item to avoid keeping references
353                # while waiting on new results.
354                del result_item
355
356                if executor := self.executor_reference():
357                    if process_exited:
358                        with self.shutdown_lock:
359                            executor._adjust_process_count()
360                    else:
361                        executor._idle_worker_semaphore.release()
362                    del executor
363
364            if self.is_shutting_down():
365                self.flag_executor_shutting_down()
366
367                # When only canceled futures remain in pending_work_items, our
368                # next call to wait_result_broken_or_wakeup would hang forever.
369                # This makes sure we have some running futures or none at all.
370                self.add_call_item_to_queue()
371
372                # Since no new work items can be added, it is safe to shutdown
373                # this thread if there are no pending work items.
374                if not self.pending_work_items:
375                    self.join_executor_internals()
376                    return
377
378    def add_call_item_to_queue(self):
379        # Fills call_queue with _WorkItems from pending_work_items.
380        # This function never blocks.
381        while True:
382            if self.call_queue.full():
383                return
384            try:
385                work_id = self.work_ids_queue.get(block=False)
386            except queue.Empty:
387                return
388            else:
389                work_item = self.pending_work_items[work_id]
390
391                if work_item.future.set_running_or_notify_cancel():
392                    self.call_queue.put(_CallItem(work_id,
393                                                  work_item.fn,
394                                                  work_item.args,
395                                                  work_item.kwargs),
396                                        block=True)
397                else:
398                    del self.pending_work_items[work_id]
399                    continue
400
401    def wait_result_broken_or_wakeup(self):
402        # Wait for a result to be ready in the result_queue while checking
403        # that all worker processes are still running, or for a wake up
404        # signal send. The wake up signals come either from new tasks being
405        # submitted, from the executor being shutdown/gc-ed, or from the
406        # shutdown of the python interpreter.
407        result_reader = self.result_queue._reader
408        assert not self.thread_wakeup._closed
409        wakeup_reader = self.thread_wakeup._reader
410        readers = [result_reader, wakeup_reader]
411        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
412        ready = mp.connection.wait(readers + worker_sentinels)
413
414        cause = None
415        is_broken = True
416        result_item = None
417        if result_reader in ready:
418            try:
419                result_item = result_reader.recv()
420                is_broken = False
421            except BaseException as e:
422                cause = format_exception(type(e), e, e.__traceback__)
423
424        elif wakeup_reader in ready:
425            is_broken = False
426
427        with self.shutdown_lock:
428            self.thread_wakeup.clear()
429
430        return result_item, is_broken, cause
431
432    def process_result_item(self, result_item):
433        # Process the received a result_item. This can be either the PID of a
434        # worker that exited gracefully or a _ResultItem
435
436        if isinstance(result_item, int):
437            # Clean shutdown of a worker using its PID
438            # (avoids marking the executor broken)
439            assert self.is_shutting_down()
440            p = self.processes.pop(result_item)
441            p.join()
442            if not self.processes:
443                self.join_executor_internals()
444                return
445        else:
446            # Received a _ResultItem so mark the future as completed.
447            work_item = self.pending_work_items.pop(result_item.work_id, None)
448            # work_item can be None if another process terminated (see above)
449            if work_item is not None:
450                if result_item.exception:
451                    work_item.future.set_exception(result_item.exception)
452                else:
453                    work_item.future.set_result(result_item.result)
454
455    def is_shutting_down(self):
456        # Check whether we should start shutting down the executor.
457        executor = self.executor_reference()
458        # No more work items can be added if:
459        #   - The interpreter is shutting down OR
460        #   - The executor that owns this worker has been collected OR
461        #   - The executor that owns this worker has been shutdown.
462        return (_global_shutdown or executor is None
463                or executor._shutdown_thread)
464
465    def terminate_broken(self, cause):
466        # Terminate the executor because it is in a broken state. The cause
467        # argument can be used to display more information on the error that
468        # lead the executor into becoming broken.
469
470        # Mark the process pool broken so that submits fail right now.
471        executor = self.executor_reference()
472        if executor is not None:
473            executor._broken = ('A child process terminated '
474                                'abruptly, the process pool is not '
475                                'usable anymore')
476            executor._shutdown_thread = True
477            executor = None
478
479        # All pending tasks are to be marked failed with the following
480        # BrokenProcessPool error
481        bpe = BrokenProcessPool("A process in the process pool was "
482                                "terminated abruptly while the future was "
483                                "running or pending.")
484        if cause is not None:
485            bpe.__cause__ = _RemoteTraceback(
486                f"\n'''\n{''.join(cause)}'''")
487
488        # Mark pending tasks as failed.
489        for work_id, work_item in self.pending_work_items.items():
490            work_item.future.set_exception(bpe)
491            # Delete references to object. See issue16284
492            del work_item
493        self.pending_work_items.clear()
494
495        # Terminate remaining workers forcibly: the queues or their
496        # locks may be in a dirty state and block forever.
497        for p in self.processes.values():
498            p.terminate()
499
500        # clean up resources
501        self.join_executor_internals()
502
503    def flag_executor_shutting_down(self):
504        # Flag the executor as shutting down and cancel remaining tasks if
505        # requested as early as possible if it is not gc-ed yet.
506        executor = self.executor_reference()
507        if executor is not None:
508            executor._shutdown_thread = True
509            # Cancel pending work items if requested.
510            if executor._cancel_pending_futures:
511                # Cancel all pending futures and update pending_work_items
512                # to only have futures that are currently running.
513                new_pending_work_items = {}
514                for work_id, work_item in self.pending_work_items.items():
515                    if not work_item.future.cancel():
516                        new_pending_work_items[work_id] = work_item
517                self.pending_work_items = new_pending_work_items
518                # Drain work_ids_queue since we no longer need to
519                # add items to the call queue.
520                while True:
521                    try:
522                        self.work_ids_queue.get_nowait()
523                    except queue.Empty:
524                        break
525                # Make sure we do this only once to not waste time looping
526                # on running processes over and over.
527                executor._cancel_pending_futures = False
528
529    def shutdown_workers(self):
530        n_children_to_stop = self.get_n_children_alive()
531        n_sentinels_sent = 0
532        # Send the right number of sentinels, to make sure all children are
533        # properly terminated.
534        while (n_sentinels_sent < n_children_to_stop
535                and self.get_n_children_alive() > 0):
536            for i in range(n_children_to_stop - n_sentinels_sent):
537                try:
538                    self.call_queue.put_nowait(None)
539                    n_sentinels_sent += 1
540                except queue.Full:
541                    break
542
543    def join_executor_internals(self):
544        self.shutdown_workers()
545        # Release the queue's resources as soon as possible.
546        self.call_queue.close()
547        self.call_queue.join_thread()
548        with self.shutdown_lock:
549            self.thread_wakeup.close()
550        # If .join() is not called on the created processes then
551        # some ctx.Queue methods may deadlock on Mac OS X.
552        for p in self.processes.values():
553            p.join()
554
555    def get_n_children_alive(self):
556        # This is an upper bound on the number of children alive.
557        return sum(p.is_alive() for p in self.processes.values())
558
559
560_system_limits_checked = False
561_system_limited = None
562
563
564def _check_system_limits():
565    global _system_limits_checked, _system_limited
566    if _system_limits_checked:
567        if _system_limited:
568            raise NotImplementedError(_system_limited)
569    _system_limits_checked = True
570    try:
571        import multiprocessing.synchronize
572    except ImportError:
573        _system_limited = (
574            "This Python build lacks multiprocessing.synchronize, usually due "
575            "to named semaphores being unavailable on this platform."
576        )
577        raise NotImplementedError(_system_limited)
578    try:
579        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
580    except (AttributeError, ValueError):
581        # sysconf not available or setting not available
582        return
583    if nsems_max == -1:
584        # indetermined limit, assume that limit is determined
585        # by available memory only
586        return
587    if nsems_max >= 256:
588        # minimum number of semaphores available
589        # according to POSIX
590        return
591    _system_limited = ("system provides too few semaphores (%d"
592                       " available, 256 necessary)" % nsems_max)
593    raise NotImplementedError(_system_limited)
594
595
596def _chain_from_iterable_of_lists(iterable):
597    """
598    Specialized implementation of itertools.chain.from_iterable.
599    Each item in *iterable* should be a list.  This function is
600    careful not to keep references to yielded objects.
601    """
602    for element in iterable:
603        element.reverse()
604        while element:
605            yield element.pop()
606
607
608class BrokenProcessPool(_base.BrokenExecutor):
609    """
610    Raised when a process in a ProcessPoolExecutor terminated abruptly
611    while a future was in the running state.
612    """
613
614
615class ProcessPoolExecutor(_base.Executor):
616    def __init__(self, max_workers=None, mp_context=None,
617                 initializer=None, initargs=(), *, max_tasks_per_child=None):
618        """Initializes a new ProcessPoolExecutor instance.
619
620        Args:
621            max_workers: The maximum number of processes that can be used to
622                execute the given calls. If None or not given then as many
623                worker processes will be created as the machine has processors.
624            mp_context: A multiprocessing context to launch the workers. This
625                object should provide SimpleQueue, Queue and Process. Useful
626                to allow specific multiprocessing start methods.
627            initializer: A callable used to initialize worker processes.
628            initargs: A tuple of arguments to pass to the initializer.
629            max_tasks_per_child: The maximum number of tasks a worker process
630                can complete before it will exit and be replaced with a fresh
631                worker process. The default of None means worker process will
632                live as long as the executor. Requires a non-'fork' mp_context
633                start method. When given, we default to using 'spawn' if no
634                mp_context is supplied.
635        """
636        _check_system_limits()
637
638        if max_workers is None:
639            self._max_workers = os.cpu_count() or 1
640            if sys.platform == 'win32':
641                self._max_workers = min(_MAX_WINDOWS_WORKERS,
642                                        self._max_workers)
643        else:
644            if max_workers <= 0:
645                raise ValueError("max_workers must be greater than 0")
646            elif (sys.platform == 'win32' and
647                max_workers > _MAX_WINDOWS_WORKERS):
648                raise ValueError(
649                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
650
651            self._max_workers = max_workers
652
653        if mp_context is None:
654            if max_tasks_per_child is not None:
655                mp_context = mp.get_context("spawn")
656            else:
657                mp_context = mp.get_context()
658        self._mp_context = mp_context
659
660        # https://github.com/python/cpython/issues/90622
661        self._safe_to_dynamically_spawn_children = (
662                self._mp_context.get_start_method(allow_none=False) != "fork")
663
664        if initializer is not None and not callable(initializer):
665            raise TypeError("initializer must be a callable")
666        self._initializer = initializer
667        self._initargs = initargs
668
669        if max_tasks_per_child is not None:
670            if not isinstance(max_tasks_per_child, int):
671                raise TypeError("max_tasks_per_child must be an integer")
672            elif max_tasks_per_child <= 0:
673                raise ValueError("max_tasks_per_child must be >= 1")
674            if self._mp_context.get_start_method(allow_none=False) == "fork":
675                # https://github.com/python/cpython/issues/90622
676                raise ValueError("max_tasks_per_child is incompatible with"
677                                 " the 'fork' multiprocessing start method;"
678                                 " supply a different mp_context.")
679        self._max_tasks_per_child = max_tasks_per_child
680
681        # Management thread
682        self._executor_manager_thread = None
683
684        # Map of pids to processes
685        self._processes = {}
686
687        # Shutdown is a two-step process.
688        self._shutdown_thread = False
689        self._shutdown_lock = threading.Lock()
690        self._idle_worker_semaphore = threading.Semaphore(0)
691        self._broken = False
692        self._queue_count = 0
693        self._pending_work_items = {}
694        self._cancel_pending_futures = False
695
696        # _ThreadWakeup is a communication channel used to interrupt the wait
697        # of the main loop of executor_manager_thread from another thread (e.g.
698        # when calling executor.submit or executor.shutdown). We do not use the
699        # _result_queue to send wakeup signals to the executor_manager_thread
700        # as it could result in a deadlock if a worker process dies with the
701        # _result_queue write lock still acquired.
702        #
703        # _shutdown_lock must be locked to access _ThreadWakeup.
704        self._executor_manager_thread_wakeup = _ThreadWakeup()
705
706        # Create communication channels for the executor
707        # Make the call queue slightly larger than the number of processes to
708        # prevent the worker processes from idling. But don't make it too big
709        # because futures in the call queue cannot be cancelled.
710        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
711        self._call_queue = _SafeQueue(
712            max_size=queue_size, ctx=self._mp_context,
713            pending_work_items=self._pending_work_items,
714            shutdown_lock=self._shutdown_lock,
715            thread_wakeup=self._executor_manager_thread_wakeup)
716        # Killed worker processes can produce spurious "broken pipe"
717        # tracebacks in the queue's own worker thread. But we detect killed
718        # processes anyway, so silence the tracebacks.
719        self._call_queue._ignore_epipe = True
720        self._result_queue = mp_context.SimpleQueue()
721        self._work_ids = queue.Queue()
722
723    def _start_executor_manager_thread(self):
724        if self._executor_manager_thread is None:
725            # Start the processes so that their sentinels are known.
726            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
727                self._launch_processes()
728            self._executor_manager_thread = _ExecutorManagerThread(self)
729            self._executor_manager_thread.start()
730            _threads_wakeups[self._executor_manager_thread] = \
731                self._executor_manager_thread_wakeup
732
733    def _adjust_process_count(self):
734        # if there's an idle process, we don't need to spawn a new one.
735        if self._idle_worker_semaphore.acquire(blocking=False):
736            return
737
738        process_count = len(self._processes)
739        if process_count < self._max_workers:
740            # Assertion disabled as this codepath is also used to replace a
741            # worker that unexpectedly dies, even when using the 'fork' start
742            # method. That means there is still a potential deadlock bug. If a
743            # 'fork' mp_context worker dies, we'll be forking a new one when
744            # we know a thread is running (self._executor_manager_thread).
745            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
746            self._spawn_process()
747
748    def _launch_processes(self):
749        # https://github.com/python/cpython/issues/90622
750        assert not self._executor_manager_thread, (
751                'Processes cannot be fork()ed after the thread has started, '
752                'deadlock in the child processes could result.')
753        for _ in range(len(self._processes), self._max_workers):
754            self._spawn_process()
755
756    def _spawn_process(self):
757        p = self._mp_context.Process(
758            target=_process_worker,
759            args=(self._call_queue,
760                  self._result_queue,
761                  self._initializer,
762                  self._initargs,
763                  self._max_tasks_per_child))
764        p.start()
765        self._processes[p.pid] = p
766
767    def submit(self, fn, /, *args, **kwargs):
768        with self._shutdown_lock:
769            if self._broken:
770                raise BrokenProcessPool(self._broken)
771            if self._shutdown_thread:
772                raise RuntimeError('cannot schedule new futures after shutdown')
773            if _global_shutdown:
774                raise RuntimeError('cannot schedule new futures after '
775                                   'interpreter shutdown')
776
777            f = _base.Future()
778            w = _WorkItem(f, fn, args, kwargs)
779
780            self._pending_work_items[self._queue_count] = w
781            self._work_ids.put(self._queue_count)
782            self._queue_count += 1
783            # Wake up queue management thread
784            self._executor_manager_thread_wakeup.wakeup()
785
786            if self._safe_to_dynamically_spawn_children:
787                self._adjust_process_count()
788            self._start_executor_manager_thread()
789            return f
790    submit.__doc__ = _base.Executor.submit.__doc__
791
792    def map(self, fn, *iterables, timeout=None, chunksize=1):
793        """Returns an iterator equivalent to map(fn, iter).
794
795        Args:
796            fn: A callable that will take as many arguments as there are
797                passed iterables.
798            timeout: The maximum number of seconds to wait. If None, then there
799                is no limit on the wait time.
800            chunksize: If greater than one, the iterables will be chopped into
801                chunks of size chunksize and submitted to the process pool.
802                If set to one, the items in the list will be sent one at a time.
803
804        Returns:
805            An iterator equivalent to: map(func, *iterables) but the calls may
806            be evaluated out-of-order.
807
808        Raises:
809            TimeoutError: If the entire result iterator could not be generated
810                before the given timeout.
811            Exception: If fn(*args) raises for any values.
812        """
813        if chunksize < 1:
814            raise ValueError("chunksize must be >= 1.")
815
816        results = super().map(partial(_process_chunk, fn),
817                              _get_chunks(*iterables, chunksize=chunksize),
818                              timeout=timeout)
819        return _chain_from_iterable_of_lists(results)
820
821    def shutdown(self, wait=True, *, cancel_futures=False):
822        with self._shutdown_lock:
823            self._cancel_pending_futures = cancel_futures
824            self._shutdown_thread = True
825            if self._executor_manager_thread_wakeup is not None:
826                # Wake up queue management thread
827                self._executor_manager_thread_wakeup.wakeup()
828
829        if self._executor_manager_thread is not None and wait:
830            self._executor_manager_thread.join()
831        # To reduce the risk of opening too many files, remove references to
832        # objects that use file descriptors.
833        self._executor_manager_thread = None
834        self._call_queue = None
835        if self._result_queue is not None and wait:
836            self._result_queue.close()
837        self._result_queue = None
838        self._processes = None
839        self._executor_manager_thread_wakeup = None
840
841    shutdown.__doc__ = _base.Executor.shutdown.__doc__
842