1# 2# Module providing the `Pool` class for managing a process pool 3# 4# multiprocessing/pool.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10__all__ = ['Pool', 'ThreadPool'] 11 12# 13# Imports 14# 15 16import collections 17import itertools 18import os 19import queue 20import threading 21import time 22import traceback 23import types 24import warnings 25 26# If threading is available then ThreadPool should be provided. Therefore 27# we avoid top-level imports which are liable to fail on some systems. 28from . import util 29from . import get_context, TimeoutError 30from .connection import wait 31 32# 33# Constants representing the state of a pool 34# 35 36INIT = "INIT" 37RUN = "RUN" 38CLOSE = "CLOSE" 39TERMINATE = "TERMINATE" 40 41# 42# Miscellaneous 43# 44 45job_counter = itertools.count() 46 47def mapstar(args): 48 return list(map(*args)) 49 50def starmapstar(args): 51 return list(itertools.starmap(args[0], args[1])) 52 53# 54# Hack to embed stringification of remote traceback in local traceback 55# 56 57class RemoteTraceback(Exception): 58 def __init__(self, tb): 59 self.tb = tb 60 def __str__(self): 61 return self.tb 62 63class ExceptionWithTraceback: 64 def __init__(self, exc, tb): 65 tb = traceback.format_exception(type(exc), exc, tb) 66 tb = ''.join(tb) 67 self.exc = exc 68 self.tb = '\n"""\n%s"""' % tb 69 def __reduce__(self): 70 return rebuild_exc, (self.exc, self.tb) 71 72def rebuild_exc(exc, tb): 73 exc.__cause__ = RemoteTraceback(tb) 74 return exc 75 76# 77# Code run by worker processes 78# 79 80class MaybeEncodingError(Exception): 81 """Wraps possible unpickleable errors, so they can be 82 safely sent through the socket.""" 83 84 def __init__(self, exc, value): 85 self.exc = repr(exc) 86 self.value = repr(value) 87 super(MaybeEncodingError, self).__init__(self.exc, self.value) 88 89 def __str__(self): 90 return "Error sending result: '%s'. Reason: '%s'" % (self.value, 91 self.exc) 92 93 def __repr__(self): 94 return "<%s: %s>" % (self.__class__.__name__, self) 95 96 97def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, 98 wrap_exception=False): 99 if (maxtasks is not None) and not (isinstance(maxtasks, int) 100 and maxtasks >= 1): 101 raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks)) 102 put = outqueue.put 103 get = inqueue.get 104 if hasattr(inqueue, '_writer'): 105 inqueue._writer.close() 106 outqueue._reader.close() 107 108 if initializer is not None: 109 initializer(*initargs) 110 111 completed = 0 112 while maxtasks is None or (maxtasks and completed < maxtasks): 113 try: 114 task = get() 115 except (EOFError, OSError): 116 util.debug('worker got EOFError or OSError -- exiting') 117 break 118 119 if task is None: 120 util.debug('worker got sentinel -- exiting') 121 break 122 123 job, i, func, args, kwds = task 124 try: 125 result = (True, func(*args, **kwds)) 126 except Exception as e: 127 if wrap_exception and func is not _helper_reraises_exception: 128 e = ExceptionWithTraceback(e, e.__traceback__) 129 result = (False, e) 130 try: 131 put((job, i, result)) 132 except Exception as e: 133 wrapped = MaybeEncodingError(e, result[1]) 134 util.debug("Possible encoding error while sending result: %s" % ( 135 wrapped)) 136 put((job, i, (False, wrapped))) 137 138 task = job = result = func = args = kwds = None 139 completed += 1 140 util.debug('worker exiting after %d tasks' % completed) 141 142def _helper_reraises_exception(ex): 143 'Pickle-able helper function for use by _guarded_task_generation.' 144 raise ex 145 146# 147# Class representing a process pool 148# 149 150class _PoolCache(dict): 151 """ 152 Class that implements a cache for the Pool class that will notify 153 the pool management threads every time the cache is emptied. The 154 notification is done by the use of a queue that is provided when 155 instantiating the cache. 156 """ 157 def __init__(self, /, *args, notifier=None, **kwds): 158 self.notifier = notifier 159 super().__init__(*args, **kwds) 160 161 def __delitem__(self, item): 162 super().__delitem__(item) 163 164 # Notify that the cache is empty. This is important because the 165 # pool keeps maintaining workers until the cache gets drained. This 166 # eliminates a race condition in which a task is finished after the 167 # the pool's _handle_workers method has enter another iteration of the 168 # loop. In this situation, the only event that can wake up the pool 169 # is the cache to be emptied (no more tasks available). 170 if not self: 171 self.notifier.put(None) 172 173class Pool(object): 174 ''' 175 Class which supports an async version of applying functions to arguments. 176 ''' 177 _wrap_exception = True 178 179 @staticmethod 180 def Process(ctx, *args, **kwds): 181 return ctx.Process(*args, **kwds) 182 183 def __init__(self, processes=None, initializer=None, initargs=(), 184 maxtasksperchild=None, context=None): 185 # Attributes initialized early to make sure that they exist in 186 # __del__() if __init__() raises an exception 187 self._pool = [] 188 self._state = INIT 189 190 self._ctx = context or get_context() 191 self._setup_queues() 192 self._taskqueue = queue.SimpleQueue() 193 # The _change_notifier queue exist to wake up self._handle_workers() 194 # when the cache (self._cache) is empty or when there is a change in 195 # the _state variable of the thread that runs _handle_workers. 196 self._change_notifier = self._ctx.SimpleQueue() 197 self._cache = _PoolCache(notifier=self._change_notifier) 198 self._maxtasksperchild = maxtasksperchild 199 self._initializer = initializer 200 self._initargs = initargs 201 202 if processes is None: 203 processes = os.cpu_count() or 1 204 if processes < 1: 205 raise ValueError("Number of processes must be at least 1") 206 if maxtasksperchild is not None: 207 if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0: 208 raise ValueError("maxtasksperchild must be a positive int or None") 209 210 if initializer is not None and not callable(initializer): 211 raise TypeError('initializer must be a callable') 212 213 self._processes = processes 214 try: 215 self._repopulate_pool() 216 except Exception: 217 for p in self._pool: 218 if p.exitcode is None: 219 p.terminate() 220 for p in self._pool: 221 p.join() 222 raise 223 224 sentinels = self._get_sentinels() 225 226 self._worker_handler = threading.Thread( 227 target=Pool._handle_workers, 228 args=(self._cache, self._taskqueue, self._ctx, self.Process, 229 self._processes, self._pool, self._inqueue, self._outqueue, 230 self._initializer, self._initargs, self._maxtasksperchild, 231 self._wrap_exception, sentinels, self._change_notifier) 232 ) 233 self._worker_handler.daemon = True 234 self._worker_handler._state = RUN 235 self._worker_handler.start() 236 237 238 self._task_handler = threading.Thread( 239 target=Pool._handle_tasks, 240 args=(self._taskqueue, self._quick_put, self._outqueue, 241 self._pool, self._cache) 242 ) 243 self._task_handler.daemon = True 244 self._task_handler._state = RUN 245 self._task_handler.start() 246 247 self._result_handler = threading.Thread( 248 target=Pool._handle_results, 249 args=(self._outqueue, self._quick_get, self._cache) 250 ) 251 self._result_handler.daemon = True 252 self._result_handler._state = RUN 253 self._result_handler.start() 254 255 self._terminate = util.Finalize( 256 self, self._terminate_pool, 257 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 258 self._change_notifier, self._worker_handler, self._task_handler, 259 self._result_handler, self._cache), 260 exitpriority=15 261 ) 262 self._state = RUN 263 264 # Copy globals as function locals to make sure that they are available 265 # during Python shutdown when the Pool is destroyed. 266 def __del__(self, _warn=warnings.warn, RUN=RUN): 267 if self._state == RUN: 268 _warn(f"unclosed running multiprocessing pool {self!r}", 269 ResourceWarning, source=self) 270 if getattr(self, '_change_notifier', None) is not None: 271 self._change_notifier.put(None) 272 273 def __repr__(self): 274 cls = self.__class__ 275 return (f'<{cls.__module__}.{cls.__qualname__} ' 276 f'state={self._state} ' 277 f'pool_size={len(self._pool)}>') 278 279 def _get_sentinels(self): 280 task_queue_sentinels = [self._outqueue._reader] 281 self_notifier_sentinels = [self._change_notifier._reader] 282 return [*task_queue_sentinels, *self_notifier_sentinels] 283 284 @staticmethod 285 def _get_worker_sentinels(workers): 286 return [worker.sentinel for worker in 287 workers if hasattr(worker, "sentinel")] 288 289 @staticmethod 290 def _join_exited_workers(pool): 291 """Cleanup after any worker processes which have exited due to reaching 292 their specified lifetime. Returns True if any workers were cleaned up. 293 """ 294 cleaned = False 295 for i in reversed(range(len(pool))): 296 worker = pool[i] 297 if worker.exitcode is not None: 298 # worker exited 299 util.debug('cleaning up worker %d' % i) 300 worker.join() 301 cleaned = True 302 del pool[i] 303 return cleaned 304 305 def _repopulate_pool(self): 306 return self._repopulate_pool_static(self._ctx, self.Process, 307 self._processes, 308 self._pool, self._inqueue, 309 self._outqueue, self._initializer, 310 self._initargs, 311 self._maxtasksperchild, 312 self._wrap_exception) 313 314 @staticmethod 315 def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, 316 outqueue, initializer, initargs, 317 maxtasksperchild, wrap_exception): 318 """Bring the number of pool processes up to the specified number, 319 for use after reaping workers which have exited. 320 """ 321 for i in range(processes - len(pool)): 322 w = Process(ctx, target=worker, 323 args=(inqueue, outqueue, 324 initializer, 325 initargs, maxtasksperchild, 326 wrap_exception)) 327 w.name = w.name.replace('Process', 'PoolWorker') 328 w.daemon = True 329 w.start() 330 pool.append(w) 331 util.debug('added worker') 332 333 @staticmethod 334 def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, 335 initializer, initargs, maxtasksperchild, 336 wrap_exception): 337 """Clean up any exited workers and start replacements for them. 338 """ 339 if Pool._join_exited_workers(pool): 340 Pool._repopulate_pool_static(ctx, Process, processes, pool, 341 inqueue, outqueue, initializer, 342 initargs, maxtasksperchild, 343 wrap_exception) 344 345 def _setup_queues(self): 346 self._inqueue = self._ctx.SimpleQueue() 347 self._outqueue = self._ctx.SimpleQueue() 348 self._quick_put = self._inqueue._writer.send 349 self._quick_get = self._outqueue._reader.recv 350 351 def _check_running(self): 352 if self._state != RUN: 353 raise ValueError("Pool not running") 354 355 def apply(self, func, args=(), kwds={}): 356 ''' 357 Equivalent of `func(*args, **kwds)`. 358 Pool must be running. 359 ''' 360 return self.apply_async(func, args, kwds).get() 361 362 def map(self, func, iterable, chunksize=None): 363 ''' 364 Apply `func` to each element in `iterable`, collecting the results 365 in a list that is returned. 366 ''' 367 return self._map_async(func, iterable, mapstar, chunksize).get() 368 369 def starmap(self, func, iterable, chunksize=None): 370 ''' 371 Like `map()` method but the elements of the `iterable` are expected to 372 be iterables as well and will be unpacked as arguments. Hence 373 `func` and (a, b) becomes func(a, b). 374 ''' 375 return self._map_async(func, iterable, starmapstar, chunksize).get() 376 377 def starmap_async(self, func, iterable, chunksize=None, callback=None, 378 error_callback=None): 379 ''' 380 Asynchronous version of `starmap()` method. 381 ''' 382 return self._map_async(func, iterable, starmapstar, chunksize, 383 callback, error_callback) 384 385 def _guarded_task_generation(self, result_job, func, iterable): 386 '''Provides a generator of tasks for imap and imap_unordered with 387 appropriate handling for iterables which throw exceptions during 388 iteration.''' 389 try: 390 i = -1 391 for i, x in enumerate(iterable): 392 yield (result_job, i, func, (x,), {}) 393 except Exception as e: 394 yield (result_job, i+1, _helper_reraises_exception, (e,), {}) 395 396 def imap(self, func, iterable, chunksize=1): 397 ''' 398 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. 399 ''' 400 self._check_running() 401 if chunksize == 1: 402 result = IMapIterator(self) 403 self._taskqueue.put( 404 ( 405 self._guarded_task_generation(result._job, func, iterable), 406 result._set_length 407 )) 408 return result 409 else: 410 if chunksize < 1: 411 raise ValueError( 412 "Chunksize must be 1+, not {0:n}".format( 413 chunksize)) 414 task_batches = Pool._get_tasks(func, iterable, chunksize) 415 result = IMapIterator(self) 416 self._taskqueue.put( 417 ( 418 self._guarded_task_generation(result._job, 419 mapstar, 420 task_batches), 421 result._set_length 422 )) 423 return (item for chunk in result for item in chunk) 424 425 def imap_unordered(self, func, iterable, chunksize=1): 426 ''' 427 Like `imap()` method but ordering of results is arbitrary. 428 ''' 429 self._check_running() 430 if chunksize == 1: 431 result = IMapUnorderedIterator(self) 432 self._taskqueue.put( 433 ( 434 self._guarded_task_generation(result._job, func, iterable), 435 result._set_length 436 )) 437 return result 438 else: 439 if chunksize < 1: 440 raise ValueError( 441 "Chunksize must be 1+, not {0!r}".format(chunksize)) 442 task_batches = Pool._get_tasks(func, iterable, chunksize) 443 result = IMapUnorderedIterator(self) 444 self._taskqueue.put( 445 ( 446 self._guarded_task_generation(result._job, 447 mapstar, 448 task_batches), 449 result._set_length 450 )) 451 return (item for chunk in result for item in chunk) 452 453 def apply_async(self, func, args=(), kwds={}, callback=None, 454 error_callback=None): 455 ''' 456 Asynchronous version of `apply()` method. 457 ''' 458 self._check_running() 459 result = ApplyResult(self, callback, error_callback) 460 self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) 461 return result 462 463 def map_async(self, func, iterable, chunksize=None, callback=None, 464 error_callback=None): 465 ''' 466 Asynchronous version of `map()` method. 467 ''' 468 return self._map_async(func, iterable, mapstar, chunksize, callback, 469 error_callback) 470 471 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, 472 error_callback=None): 473 ''' 474 Helper function to implement map, starmap and their async counterparts. 475 ''' 476 self._check_running() 477 if not hasattr(iterable, '__len__'): 478 iterable = list(iterable) 479 480 if chunksize is None: 481 chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 482 if extra: 483 chunksize += 1 484 if len(iterable) == 0: 485 chunksize = 0 486 487 task_batches = Pool._get_tasks(func, iterable, chunksize) 488 result = MapResult(self, chunksize, len(iterable), callback, 489 error_callback=error_callback) 490 self._taskqueue.put( 491 ( 492 self._guarded_task_generation(result._job, 493 mapper, 494 task_batches), 495 None 496 ) 497 ) 498 return result 499 500 @staticmethod 501 def _wait_for_updates(sentinels, change_notifier, timeout=None): 502 wait(sentinels, timeout=timeout) 503 while not change_notifier.empty(): 504 change_notifier.get() 505 506 @classmethod 507 def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, 508 pool, inqueue, outqueue, initializer, initargs, 509 maxtasksperchild, wrap_exception, sentinels, 510 change_notifier): 511 thread = threading.current_thread() 512 513 # Keep maintaining workers until the cache gets drained, unless the pool 514 # is terminated. 515 while thread._state == RUN or (cache and thread._state != TERMINATE): 516 cls._maintain_pool(ctx, Process, processes, pool, inqueue, 517 outqueue, initializer, initargs, 518 maxtasksperchild, wrap_exception) 519 520 current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] 521 522 cls._wait_for_updates(current_sentinels, change_notifier) 523 # send sentinel to stop workers 524 taskqueue.put(None) 525 util.debug('worker handler exiting') 526 527 @staticmethod 528 def _handle_tasks(taskqueue, put, outqueue, pool, cache): 529 thread = threading.current_thread() 530 531 for taskseq, set_length in iter(taskqueue.get, None): 532 task = None 533 try: 534 # iterating taskseq cannot fail 535 for task in taskseq: 536 if thread._state != RUN: 537 util.debug('task handler found thread._state != RUN') 538 break 539 try: 540 put(task) 541 except Exception as e: 542 job, idx = task[:2] 543 try: 544 cache[job]._set(idx, (False, e)) 545 except KeyError: 546 pass 547 else: 548 if set_length: 549 util.debug('doing set_length()') 550 idx = task[1] if task else -1 551 set_length(idx + 1) 552 continue 553 break 554 finally: 555 task = taskseq = job = None 556 else: 557 util.debug('task handler got sentinel') 558 559 try: 560 # tell result handler to finish when cache is empty 561 util.debug('task handler sending sentinel to result handler') 562 outqueue.put(None) 563 564 # tell workers there is no more work 565 util.debug('task handler sending sentinel to workers') 566 for p in pool: 567 put(None) 568 except OSError: 569 util.debug('task handler got OSError when sending sentinels') 570 571 util.debug('task handler exiting') 572 573 @staticmethod 574 def _handle_results(outqueue, get, cache): 575 thread = threading.current_thread() 576 577 while 1: 578 try: 579 task = get() 580 except (OSError, EOFError): 581 util.debug('result handler got EOFError/OSError -- exiting') 582 return 583 584 if thread._state != RUN: 585 assert thread._state == TERMINATE, "Thread not in TERMINATE" 586 util.debug('result handler found thread._state=TERMINATE') 587 break 588 589 if task is None: 590 util.debug('result handler got sentinel') 591 break 592 593 job, i, obj = task 594 try: 595 cache[job]._set(i, obj) 596 except KeyError: 597 pass 598 task = job = obj = None 599 600 while cache and thread._state != TERMINATE: 601 try: 602 task = get() 603 except (OSError, EOFError): 604 util.debug('result handler got EOFError/OSError -- exiting') 605 return 606 607 if task is None: 608 util.debug('result handler ignoring extra sentinel') 609 continue 610 job, i, obj = task 611 try: 612 cache[job]._set(i, obj) 613 except KeyError: 614 pass 615 task = job = obj = None 616 617 if hasattr(outqueue, '_reader'): 618 util.debug('ensuring that outqueue is not full') 619 # If we don't make room available in outqueue then 620 # attempts to add the sentinel (None) to outqueue may 621 # block. There is guaranteed to be no more than 2 sentinels. 622 try: 623 for i in range(10): 624 if not outqueue._reader.poll(): 625 break 626 get() 627 except (OSError, EOFError): 628 pass 629 630 util.debug('result handler exiting: len(cache)=%s, thread._state=%s', 631 len(cache), thread._state) 632 633 @staticmethod 634 def _get_tasks(func, it, size): 635 it = iter(it) 636 while 1: 637 x = tuple(itertools.islice(it, size)) 638 if not x: 639 return 640 yield (func, x) 641 642 def __reduce__(self): 643 raise NotImplementedError( 644 'pool objects cannot be passed between processes or pickled' 645 ) 646 647 def close(self): 648 util.debug('closing pool') 649 if self._state == RUN: 650 self._state = CLOSE 651 self._worker_handler._state = CLOSE 652 self._change_notifier.put(None) 653 654 def terminate(self): 655 util.debug('terminating pool') 656 self._state = TERMINATE 657 self._terminate() 658 659 def join(self): 660 util.debug('joining pool') 661 if self._state == RUN: 662 raise ValueError("Pool is still running") 663 elif self._state not in (CLOSE, TERMINATE): 664 raise ValueError("In unknown state") 665 self._worker_handler.join() 666 self._task_handler.join() 667 self._result_handler.join() 668 for p in self._pool: 669 p.join() 670 671 @staticmethod 672 def _help_stuff_finish(inqueue, task_handler, size): 673 # task_handler may be blocked trying to put items on inqueue 674 util.debug('removing tasks from inqueue until task handler finished') 675 inqueue._rlock.acquire() 676 while task_handler.is_alive() and inqueue._reader.poll(): 677 inqueue._reader.recv() 678 time.sleep(0) 679 680 @classmethod 681 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, 682 worker_handler, task_handler, result_handler, cache): 683 # this is guaranteed to only be called once 684 util.debug('finalizing pool') 685 686 # Notify that the worker_handler state has been changed so the 687 # _handle_workers loop can be unblocked (and exited) in order to 688 # send the finalization sentinel all the workers. 689 worker_handler._state = TERMINATE 690 change_notifier.put(None) 691 692 task_handler._state = TERMINATE 693 694 util.debug('helping task handler/workers to finish') 695 cls._help_stuff_finish(inqueue, task_handler, len(pool)) 696 697 if (not result_handler.is_alive()) and (len(cache) != 0): 698 raise AssertionError( 699 "Cannot have cache with result_hander not alive") 700 701 result_handler._state = TERMINATE 702 change_notifier.put(None) 703 outqueue.put(None) # sentinel 704 705 # We must wait for the worker handler to exit before terminating 706 # workers because we don't want workers to be restarted behind our back. 707 util.debug('joining worker handler') 708 if threading.current_thread() is not worker_handler: 709 worker_handler.join() 710 711 # Terminate workers which haven't already finished. 712 if pool and hasattr(pool[0], 'terminate'): 713 util.debug('terminating workers') 714 for p in pool: 715 if p.exitcode is None: 716 p.terminate() 717 718 util.debug('joining task handler') 719 if threading.current_thread() is not task_handler: 720 task_handler.join() 721 722 util.debug('joining result handler') 723 if threading.current_thread() is not result_handler: 724 result_handler.join() 725 726 if pool and hasattr(pool[0], 'terminate'): 727 util.debug('joining pool workers') 728 for p in pool: 729 if p.is_alive(): 730 # worker has not yet exited 731 util.debug('cleaning up worker %d' % p.pid) 732 p.join() 733 734 def __enter__(self): 735 self._check_running() 736 return self 737 738 def __exit__(self, exc_type, exc_val, exc_tb): 739 self.terminate() 740 741# 742# Class whose instances are returned by `Pool.apply_async()` 743# 744 745class ApplyResult(object): 746 747 def __init__(self, pool, callback, error_callback): 748 self._pool = pool 749 self._event = threading.Event() 750 self._job = next(job_counter) 751 self._cache = pool._cache 752 self._callback = callback 753 self._error_callback = error_callback 754 self._cache[self._job] = self 755 756 def ready(self): 757 return self._event.is_set() 758 759 def successful(self): 760 if not self.ready(): 761 raise ValueError("{0!r} not ready".format(self)) 762 return self._success 763 764 def wait(self, timeout=None): 765 self._event.wait(timeout) 766 767 def get(self, timeout=None): 768 self.wait(timeout) 769 if not self.ready(): 770 raise TimeoutError 771 if self._success: 772 return self._value 773 else: 774 raise self._value 775 776 def _set(self, i, obj): 777 self._success, self._value = obj 778 if self._callback and self._success: 779 self._callback(self._value) 780 if self._error_callback and not self._success: 781 self._error_callback(self._value) 782 self._event.set() 783 del self._cache[self._job] 784 self._pool = None 785 786 __class_getitem__ = classmethod(types.GenericAlias) 787 788AsyncResult = ApplyResult # create alias -- see #17805 789 790# 791# Class whose instances are returned by `Pool.map_async()` 792# 793 794class MapResult(ApplyResult): 795 796 def __init__(self, pool, chunksize, length, callback, error_callback): 797 ApplyResult.__init__(self, pool, callback, 798 error_callback=error_callback) 799 self._success = True 800 self._value = [None] * length 801 self._chunksize = chunksize 802 if chunksize <= 0: 803 self._number_left = 0 804 self._event.set() 805 del self._cache[self._job] 806 else: 807 self._number_left = length//chunksize + bool(length % chunksize) 808 809 def _set(self, i, success_result): 810 self._number_left -= 1 811 success, result = success_result 812 if success and self._success: 813 self._value[i*self._chunksize:(i+1)*self._chunksize] = result 814 if self._number_left == 0: 815 if self._callback: 816 self._callback(self._value) 817 del self._cache[self._job] 818 self._event.set() 819 self._pool = None 820 else: 821 if not success and self._success: 822 # only store first exception 823 self._success = False 824 self._value = result 825 if self._number_left == 0: 826 # only consider the result ready once all jobs are done 827 if self._error_callback: 828 self._error_callback(self._value) 829 del self._cache[self._job] 830 self._event.set() 831 self._pool = None 832 833# 834# Class whose instances are returned by `Pool.imap()` 835# 836 837class IMapIterator(object): 838 839 def __init__(self, pool): 840 self._pool = pool 841 self._cond = threading.Condition(threading.Lock()) 842 self._job = next(job_counter) 843 self._cache = pool._cache 844 self._items = collections.deque() 845 self._index = 0 846 self._length = None 847 self._unsorted = {} 848 self._cache[self._job] = self 849 850 def __iter__(self): 851 return self 852 853 def next(self, timeout=None): 854 with self._cond: 855 try: 856 item = self._items.popleft() 857 except IndexError: 858 if self._index == self._length: 859 self._pool = None 860 raise StopIteration from None 861 self._cond.wait(timeout) 862 try: 863 item = self._items.popleft() 864 except IndexError: 865 if self._index == self._length: 866 self._pool = None 867 raise StopIteration from None 868 raise TimeoutError from None 869 870 success, value = item 871 if success: 872 return value 873 raise value 874 875 __next__ = next # XXX 876 877 def _set(self, i, obj): 878 with self._cond: 879 if self._index == i: 880 self._items.append(obj) 881 self._index += 1 882 while self._index in self._unsorted: 883 obj = self._unsorted.pop(self._index) 884 self._items.append(obj) 885 self._index += 1 886 self._cond.notify() 887 else: 888 self._unsorted[i] = obj 889 890 if self._index == self._length: 891 del self._cache[self._job] 892 self._pool = None 893 894 def _set_length(self, length): 895 with self._cond: 896 self._length = length 897 if self._index == self._length: 898 self._cond.notify() 899 del self._cache[self._job] 900 self._pool = None 901 902# 903# Class whose instances are returned by `Pool.imap_unordered()` 904# 905 906class IMapUnorderedIterator(IMapIterator): 907 908 def _set(self, i, obj): 909 with self._cond: 910 self._items.append(obj) 911 self._index += 1 912 self._cond.notify() 913 if self._index == self._length: 914 del self._cache[self._job] 915 self._pool = None 916 917# 918# 919# 920 921class ThreadPool(Pool): 922 _wrap_exception = False 923 924 @staticmethod 925 def Process(ctx, *args, **kwds): 926 from .dummy import Process 927 return Process(*args, **kwds) 928 929 def __init__(self, processes=None, initializer=None, initargs=()): 930 Pool.__init__(self, processes, initializer, initargs) 931 932 def _setup_queues(self): 933 self._inqueue = queue.SimpleQueue() 934 self._outqueue = queue.SimpleQueue() 935 self._quick_put = self._inqueue.put 936 self._quick_get = self._outqueue.get 937 938 def _get_sentinels(self): 939 return [self._change_notifier._reader] 940 941 @staticmethod 942 def _get_worker_sentinels(workers): 943 return [] 944 945 @staticmethod 946 def _help_stuff_finish(inqueue, task_handler, size): 947 # drain inqueue, and put sentinels at its head to make workers finish 948 try: 949 while True: 950 inqueue.get(block=False) 951 except queue.Empty: 952 pass 953 for i in range(size): 954 inqueue.put(None) 955 956 def _wait_for_updates(self, sentinels, change_notifier, timeout): 957 time.sleep(timeout) 958