#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import collections
import itertools
import os
import queue
import threading
import time
import traceback
import types
import warnings

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
from .connection import wait

#
# Constants representing the state of a pool
#

INIT = "INIT"
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))
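
# A minimal illustration of the two helpers above (values are only for
# demonstration): map-style calls batch the iterable into chunks, and each
# chunk reaches a worker as a single (func, args) task:
#
#     mapstar((abs, (-1, -2, 3)))             # -> [1, 2, 3]
#     starmapstar((pow, [(2, 3), (3, 2)]))    # -> [8, 9]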

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    exc.__cause__ = RemoteTraceback(tb)
    return exc
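
# How the round trip works: the worker pickles an ExceptionWithTraceback in
# place of the raised exception; unpickling in the parent process invokes
# rebuild_exc(), which chains the formatted remote traceback onto the
# exception as its __cause__, so it is displayed when the parent re-raises
# the exception (e.g. from AsyncResult.get()).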

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex

#
# Class representing a process pool
#

class _PoolCache(dict):
    """
    Class that implements a cache for the Pool class that will notify
    the pool management threads every time the cache is emptied. The
    notification is done by the use of a queue that is provided when
    instantiating the cache.
    """
    def __init__(self, /, *args, notifier=None, **kwds):
        self.notifier = notifier
        super().__init__(*args, **kwds)

    def __delitem__(self, item):
        super().__delitem__(item)
        # Notify that the cache is empty. This is important because the
        # pool keeps maintaining workers until the cache gets drained. This
        # eliminates a race condition in which a task finishes after the
        # pool's _handle_workers method has entered another iteration of its
        # loop. In this situation, the only event that can wake up the pool
        # is the cache being emptied (no more tasks available).
        if not self:
            self.notifier.put(None)

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
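    # Typical usage, as a minimal sketch (the context manager calls
    # terminate() on exit):
    #
    #     with Pool(processes=4) as pool:
    #         results = pool.map(str, range(10))   # ['0', '1', ..., '9']
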
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        return ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exists to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")
        if maxtasksperchild is not None:
            if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
                raise ValueError("maxtasksperchild must be a positive int or None")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
        self._state = RUN

    # Copy globals as function locals to make sure that they are available
    # during Python shutdown when the Pool is destroyed.
    def __del__(self, _warn=warnings.warn, RUN=RUN):
        if self._state == RUN:
            _warn(f"unclosed running multiprocessing pool {self!r}",
                  ResourceWarning, source=self)
            if getattr(self, '_change_notifier', None) is not None:
                self._change_notifier.put(None)

    def __repr__(self):
        cls = self.__class__
        return (f'<{cls.__module__}.{cls.__qualname__} '
                f'state={self._state} '
                f'pool_size={len(self._pool)}>')

    def _get_sentinels(self):
        task_queue_sentinels = [self._outqueue._reader]
        self_notifier_sentinels = [self._change_notifier._reader]
        return [*task_queue_sentinels, *self_notifier_sentinels]

    @staticmethod
    def _get_worker_sentinels(workers):
        return [worker.sentinel for worker in
                workers if hasattr(worker, "sentinel")]

    @staticmethod
    def _join_exited_workers(pool):
        """Clean up after any worker processes which have exited due to
        reaching their specified lifetime.  Returns True if any workers were
        cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(pool))):
            worker = pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del pool[i]
        return cleaned

    def _repopulate_pool(self):
        return self._repopulate_pool_static(self._ctx, self.Process,
                                            self._processes,
                                            self._pool, self._inqueue,
                                            self._outqueue, self._initializer,
                                            self._initargs,
                                            self._maxtasksperchild,
                                            self._wrap_exception)

    @staticmethod
    def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
                                outqueue, initializer, initargs,
                                maxtasksperchild, wrap_exception):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(processes - len(pool)):
            w = Process(ctx, target=worker,
                        args=(inqueue, outqueue,
                              initializer,
                              initargs, maxtasksperchild,
                              wrap_exception))
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            pool.append(w)
            util.debug('added worker')

    @staticmethod
    def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
                       initializer, initargs, maxtasksperchild,
                       wrap_exception):
        """Clean up any exited workers and start replacements for them.
        """
        if Pool._join_exited_workers(pool):
            Pool._repopulate_pool_static(ctx, Process, processes, pool,
                                         inqueue, outqueue, initializer,
                                         initargs, maxtasksperchild,
                                         wrap_exception)

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def _check_running(self):
        if self._state != RUN:
            raise ValueError("Pool not running")

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

    def starmap(self, func, iterable, chunksize=None):
        '''
        Like the `map()` method, but the elements of `iterable` are expected
        to be iterables as well and will be unpacked as arguments. Hence an
        element `(a, b)` becomes the call `func(a, b)`.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})
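
    # If the user-supplied iterable raises while it is being consumed, the
    # generator above emits one final task whose "function" is
    # _helper_reraises_exception, so the exception is raised inside a worker
    # and flows back through the normal result path instead of killing the
    # task handler thread.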

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        self._check_running()
        if chunksize == 1:
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        self._check_running()
        result = ApplyResult(self, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None,
            error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
            error_callback)

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
            error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        self._check_running()
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result

    @staticmethod
    def _wait_for_updates(sentinels, change_notifier, timeout=None):
        wait(sentinels, timeout=timeout)
        while not change_notifier.empty():
            change_notifier.get()
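
    # wait() returns when any sentinel becomes ready (for example, a worker
    # process died) or when something was put on the change notifier; the
    # notifier queue is then drained so stale wake-ups do not cause extra
    # iterations of the handler loop.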

    @classmethod
    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                        pool, inqueue, outqueue, initializer, initargs,
                        maxtasksperchild, wrap_exception, sentinels,
                        change_notifier):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (cache and thread._state != TERMINATE):
            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                               outqueue, initializer, initargs,
                               maxtasksperchild, wrap_exception)

            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

            cls._wait_for_updates(current_sentinels, change_notifier)
        # send sentinel to stop workers
        taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state != RUN:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                task = taskseq = job = None
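        # This else clause belongs to the for-loop: it runs only when
        # iter(taskqueue.get, None) was exhausted by the None sentinel,
        # i.e. the loop ended without hitting the break above.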
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state != RUN:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
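
    # For instance (illustrative values only), _get_tasks(f, range(5), 2)
    # yields (f, (0, 1)), (f, (2, 3)) and finally (f, (4,)).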

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE
            self._change_notifier.put(None)

    def terminate(self):
        util.debug('terminating pool')
        self._state = TERMINATE
        self._terminate()

    def join(self):
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        # Notify that the worker_handler state has been changed so the
        # _handle_workers loop can be unblocked (and exited) in order to
        # send the finalization sentinel to all the workers.
        worker_handler._state = TERMINATE
        change_notifier.put(None)

        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_handler not alive")

        result_handler._state = TERMINATE
        change_notifier.put(None)
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        self._check_running()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, pool, callback, error_callback):
        self._pool = pool
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = pool._cache
        self._callback = callback
        self._error_callback = error_callback
        self._cache[self._job] = self

    def ready(self):
        return self._event.is_set()

    def successful(self):
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        self._event.wait(timeout)

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]
        self._pool = None

    __class_getitem__ = classmethod(types.GenericAlias)
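
    # types.GenericAlias support allows parameterized use in annotations,
    # e.g. ApplyResult[int] (PEP 585-style runtime generics).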

AsyncResult = ApplyResult       # create alias -- see #17805

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, pool, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)
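            # The expression above performs a ceiling division: for example,
            # length=10 and chunksize=3 gives 10 // 3 + bool(10 % 3) == 4
            # outstanding chunk results.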

    def _set(self, i, success_result):
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, pool):
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        self._cache[self._job] = self
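
    # Results can arrive out of order; _set() below parks any early arrivals
    # in self._unsorted (keyed by index) until all preceding items have been
    # appended to self._items for the consumer.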

    def __iter__(self):
        return self

    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

#
#
#

class ThreadPool(Pool):
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)
    @staticmethod
    def _wait_for_updates(sentinels, change_notifier, timeout=None):
        # Thread workers expose no waitable sentinels, so fall back to a
        # short poll rather than blocking in connection.wait() as the base
        # class does.  The signature must accept the two positional arguments
        # passed by Pool._handle_workers via cls._wait_for_updates(...).
        time.sleep(timeout if timeout is not None else 0.1)
        # Drain stale notifications, mirroring the base class behaviour.
        while not change_notifier.empty():
            change_notifier.get()