1*cda5da8dSAndroid Build Coastguard Worker# 2*cda5da8dSAndroid Build Coastguard Worker# Module implementing synchronization primitives 3*cda5da8dSAndroid Build Coastguard Worker# 4*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/synchronize.py 5*cda5da8dSAndroid Build Coastguard Worker# 6*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk 7*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 8*cda5da8dSAndroid Build Coastguard Worker# 9*cda5da8dSAndroid Build Coastguard Worker 10*cda5da8dSAndroid Build Coastguard Worker__all__ = [ 11*cda5da8dSAndroid Build Coastguard Worker 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' 12*cda5da8dSAndroid Build Coastguard Worker ] 13*cda5da8dSAndroid Build Coastguard Worker 14*cda5da8dSAndroid Build Coastguard Workerimport threading 15*cda5da8dSAndroid Build Coastguard Workerimport sys 16*cda5da8dSAndroid Build Coastguard Workerimport tempfile 17*cda5da8dSAndroid Build Coastguard Workerimport _multiprocessing 18*cda5da8dSAndroid Build Coastguard Workerimport time 19*cda5da8dSAndroid Build Coastguard Worker 20*cda5da8dSAndroid Build Coastguard Workerfrom . import context 21*cda5da8dSAndroid Build Coastguard Workerfrom . import process 22*cda5da8dSAndroid Build Coastguard Workerfrom . import util 23*cda5da8dSAndroid Build Coastguard Worker 24*cda5da8dSAndroid Build Coastguard Worker# Try to import the mp.synchronize module cleanly, if it fails 25*cda5da8dSAndroid Build Coastguard Worker# raise ImportError for platforms lacking a working sem_open implementation. 26*cda5da8dSAndroid Build Coastguard Worker# See issue 3770 27*cda5da8dSAndroid Build Coastguard Workertry: 28*cda5da8dSAndroid Build Coastguard Worker from _multiprocessing import SemLock, sem_unlink 29*cda5da8dSAndroid Build Coastguard Workerexcept (ImportError): 30*cda5da8dSAndroid Build Coastguard Worker raise ImportError("This platform lacks a functioning sem_open" + 31*cda5da8dSAndroid Build Coastguard Worker " implementation, therefore, the required" + 32*cda5da8dSAndroid Build Coastguard Worker " synchronization primitives needed will not" + 33*cda5da8dSAndroid Build Coastguard Worker " function, see issue 3770.") 34*cda5da8dSAndroid Build Coastguard Worker 35*cda5da8dSAndroid Build Coastguard Worker# 36*cda5da8dSAndroid Build Coastguard Worker# Constants 37*cda5da8dSAndroid Build Coastguard Worker# 38*cda5da8dSAndroid Build Coastguard Worker 39*cda5da8dSAndroid Build Coastguard WorkerRECURSIVE_MUTEX, SEMAPHORE = list(range(2)) 40*cda5da8dSAndroid Build Coastguard WorkerSEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX 41*cda5da8dSAndroid Build Coastguard Worker 42*cda5da8dSAndroid Build Coastguard Worker# 43*cda5da8dSAndroid Build Coastguard Worker# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` 44*cda5da8dSAndroid Build Coastguard Worker# 45*cda5da8dSAndroid Build Coastguard Worker 46*cda5da8dSAndroid Build Coastguard Workerclass SemLock(object): 47*cda5da8dSAndroid Build Coastguard Worker 48*cda5da8dSAndroid Build Coastguard Worker _rand = tempfile._RandomNameSequence() 49*cda5da8dSAndroid Build Coastguard Worker 50*cda5da8dSAndroid Build Coastguard Worker def __init__(self, kind, value, maxvalue, *, ctx): 51*cda5da8dSAndroid Build Coastguard Worker if ctx is None: 52*cda5da8dSAndroid Build Coastguard Worker ctx = context._default_context.get_context() 53*cda5da8dSAndroid Build Coastguard Worker name = ctx.get_start_method() 54*cda5da8dSAndroid Build Coastguard Worker unlink_now = sys.platform == 'win32' or name == 'fork' 55*cda5da8dSAndroid Build Coastguard Worker for i in range(100): 56*cda5da8dSAndroid Build Coastguard Worker try: 57*cda5da8dSAndroid Build Coastguard Worker sl = self._semlock = _multiprocessing.SemLock( 58*cda5da8dSAndroid Build Coastguard Worker kind, value, maxvalue, self._make_name(), 59*cda5da8dSAndroid Build Coastguard Worker unlink_now) 60*cda5da8dSAndroid Build Coastguard Worker except FileExistsError: 61*cda5da8dSAndroid Build Coastguard Worker pass 62*cda5da8dSAndroid Build Coastguard Worker else: 63*cda5da8dSAndroid Build Coastguard Worker break 64*cda5da8dSAndroid Build Coastguard Worker else: 65*cda5da8dSAndroid Build Coastguard Worker raise FileExistsError('cannot find name for semaphore') 66*cda5da8dSAndroid Build Coastguard Worker 67*cda5da8dSAndroid Build Coastguard Worker util.debug('created semlock with handle %s' % sl.handle) 68*cda5da8dSAndroid Build Coastguard Worker self._make_methods() 69*cda5da8dSAndroid Build Coastguard Worker 70*cda5da8dSAndroid Build Coastguard Worker if sys.platform != 'win32': 71*cda5da8dSAndroid Build Coastguard Worker def _after_fork(obj): 72*cda5da8dSAndroid Build Coastguard Worker obj._semlock._after_fork() 73*cda5da8dSAndroid Build Coastguard Worker util.register_after_fork(self, _after_fork) 74*cda5da8dSAndroid Build Coastguard Worker 75*cda5da8dSAndroid Build Coastguard Worker if self._semlock.name is not None: 76*cda5da8dSAndroid Build Coastguard Worker # We only get here if we are on Unix with forking 77*cda5da8dSAndroid Build Coastguard Worker # disabled. When the object is garbage collected or the 78*cda5da8dSAndroid Build Coastguard Worker # process shuts down we unlink the semaphore name 79*cda5da8dSAndroid Build Coastguard Worker from .resource_tracker import register 80*cda5da8dSAndroid Build Coastguard Worker register(self._semlock.name, "semaphore") 81*cda5da8dSAndroid Build Coastguard Worker util.Finalize(self, SemLock._cleanup, (self._semlock.name,), 82*cda5da8dSAndroid Build Coastguard Worker exitpriority=0) 83*cda5da8dSAndroid Build Coastguard Worker 84*cda5da8dSAndroid Build Coastguard Worker @staticmethod 85*cda5da8dSAndroid Build Coastguard Worker def _cleanup(name): 86*cda5da8dSAndroid Build Coastguard Worker from .resource_tracker import unregister 87*cda5da8dSAndroid Build Coastguard Worker sem_unlink(name) 88*cda5da8dSAndroid Build Coastguard Worker unregister(name, "semaphore") 89*cda5da8dSAndroid Build Coastguard Worker 90*cda5da8dSAndroid Build Coastguard Worker def _make_methods(self): 91*cda5da8dSAndroid Build Coastguard Worker self.acquire = self._semlock.acquire 92*cda5da8dSAndroid Build Coastguard Worker self.release = self._semlock.release 93*cda5da8dSAndroid Build Coastguard Worker 94*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 95*cda5da8dSAndroid Build Coastguard Worker return self._semlock.__enter__() 96*cda5da8dSAndroid Build Coastguard Worker 97*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, *args): 98*cda5da8dSAndroid Build Coastguard Worker return self._semlock.__exit__(*args) 99*cda5da8dSAndroid Build Coastguard Worker 100*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 101*cda5da8dSAndroid Build Coastguard Worker context.assert_spawning(self) 102*cda5da8dSAndroid Build Coastguard Worker sl = self._semlock 103*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 104*cda5da8dSAndroid Build Coastguard Worker h = context.get_spawning_popen().duplicate_for_child(sl.handle) 105*cda5da8dSAndroid Build Coastguard Worker else: 106*cda5da8dSAndroid Build Coastguard Worker h = sl.handle 107*cda5da8dSAndroid Build Coastguard Worker return (h, sl.kind, sl.maxvalue, sl.name) 108*cda5da8dSAndroid Build Coastguard Worker 109*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 110*cda5da8dSAndroid Build Coastguard Worker self._semlock = _multiprocessing.SemLock._rebuild(*state) 111*cda5da8dSAndroid Build Coastguard Worker util.debug('recreated blocker with handle %r' % state[0]) 112*cda5da8dSAndroid Build Coastguard Worker self._make_methods() 113*cda5da8dSAndroid Build Coastguard Worker 114*cda5da8dSAndroid Build Coastguard Worker @staticmethod 115*cda5da8dSAndroid Build Coastguard Worker def _make_name(): 116*cda5da8dSAndroid Build Coastguard Worker return '%s-%s' % (process.current_process()._config['semprefix'], 117*cda5da8dSAndroid Build Coastguard Worker next(SemLock._rand)) 118*cda5da8dSAndroid Build Coastguard Worker 119*cda5da8dSAndroid Build Coastguard Worker# 120*cda5da8dSAndroid Build Coastguard Worker# Semaphore 121*cda5da8dSAndroid Build Coastguard Worker# 122*cda5da8dSAndroid Build Coastguard Worker 123*cda5da8dSAndroid Build Coastguard Workerclass Semaphore(SemLock): 124*cda5da8dSAndroid Build Coastguard Worker 125*cda5da8dSAndroid Build Coastguard Worker def __init__(self, value=1, *, ctx): 126*cda5da8dSAndroid Build Coastguard Worker SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx) 127*cda5da8dSAndroid Build Coastguard Worker 128*cda5da8dSAndroid Build Coastguard Worker def get_value(self): 129*cda5da8dSAndroid Build Coastguard Worker return self._semlock._get_value() 130*cda5da8dSAndroid Build Coastguard Worker 131*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 132*cda5da8dSAndroid Build Coastguard Worker try: 133*cda5da8dSAndroid Build Coastguard Worker value = self._semlock._get_value() 134*cda5da8dSAndroid Build Coastguard Worker except Exception: 135*cda5da8dSAndroid Build Coastguard Worker value = 'unknown' 136*cda5da8dSAndroid Build Coastguard Worker return '<%s(value=%s)>' % (self.__class__.__name__, value) 137*cda5da8dSAndroid Build Coastguard Worker 138*cda5da8dSAndroid Build Coastguard Worker# 139*cda5da8dSAndroid Build Coastguard Worker# Bounded semaphore 140*cda5da8dSAndroid Build Coastguard Worker# 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Workerclass BoundedSemaphore(Semaphore): 143*cda5da8dSAndroid Build Coastguard Worker 144*cda5da8dSAndroid Build Coastguard Worker def __init__(self, value=1, *, ctx): 145*cda5da8dSAndroid Build Coastguard Worker SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx) 146*cda5da8dSAndroid Build Coastguard Worker 147*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 148*cda5da8dSAndroid Build Coastguard Worker try: 149*cda5da8dSAndroid Build Coastguard Worker value = self._semlock._get_value() 150*cda5da8dSAndroid Build Coastguard Worker except Exception: 151*cda5da8dSAndroid Build Coastguard Worker value = 'unknown' 152*cda5da8dSAndroid Build Coastguard Worker return '<%s(value=%s, maxvalue=%s)>' % \ 153*cda5da8dSAndroid Build Coastguard Worker (self.__class__.__name__, value, self._semlock.maxvalue) 154*cda5da8dSAndroid Build Coastguard Worker 155*cda5da8dSAndroid Build Coastguard Worker# 156*cda5da8dSAndroid Build Coastguard Worker# Non-recursive lock 157*cda5da8dSAndroid Build Coastguard Worker# 158*cda5da8dSAndroid Build Coastguard Worker 159*cda5da8dSAndroid Build Coastguard Workerclass Lock(SemLock): 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *, ctx): 162*cda5da8dSAndroid Build Coastguard Worker SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) 163*cda5da8dSAndroid Build Coastguard Worker 164*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 165*cda5da8dSAndroid Build Coastguard Worker try: 166*cda5da8dSAndroid Build Coastguard Worker if self._semlock._is_mine(): 167*cda5da8dSAndroid Build Coastguard Worker name = process.current_process().name 168*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread().name != 'MainThread': 169*cda5da8dSAndroid Build Coastguard Worker name += '|' + threading.current_thread().name 170*cda5da8dSAndroid Build Coastguard Worker elif self._semlock._get_value() == 1: 171*cda5da8dSAndroid Build Coastguard Worker name = 'None' 172*cda5da8dSAndroid Build Coastguard Worker elif self._semlock._count() > 0: 173*cda5da8dSAndroid Build Coastguard Worker name = 'SomeOtherThread' 174*cda5da8dSAndroid Build Coastguard Worker else: 175*cda5da8dSAndroid Build Coastguard Worker name = 'SomeOtherProcess' 176*cda5da8dSAndroid Build Coastguard Worker except Exception: 177*cda5da8dSAndroid Build Coastguard Worker name = 'unknown' 178*cda5da8dSAndroid Build Coastguard Worker return '<%s(owner=%s)>' % (self.__class__.__name__, name) 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Worker# 181*cda5da8dSAndroid Build Coastguard Worker# Recursive lock 182*cda5da8dSAndroid Build Coastguard Worker# 183*cda5da8dSAndroid Build Coastguard Worker 184*cda5da8dSAndroid Build Coastguard Workerclass RLock(SemLock): 185*cda5da8dSAndroid Build Coastguard Worker 186*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *, ctx): 187*cda5da8dSAndroid Build Coastguard Worker SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx) 188*cda5da8dSAndroid Build Coastguard Worker 189*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 190*cda5da8dSAndroid Build Coastguard Worker try: 191*cda5da8dSAndroid Build Coastguard Worker if self._semlock._is_mine(): 192*cda5da8dSAndroid Build Coastguard Worker name = process.current_process().name 193*cda5da8dSAndroid Build Coastguard Worker if threading.current_thread().name != 'MainThread': 194*cda5da8dSAndroid Build Coastguard Worker name += '|' + threading.current_thread().name 195*cda5da8dSAndroid Build Coastguard Worker count = self._semlock._count() 196*cda5da8dSAndroid Build Coastguard Worker elif self._semlock._get_value() == 1: 197*cda5da8dSAndroid Build Coastguard Worker name, count = 'None', 0 198*cda5da8dSAndroid Build Coastguard Worker elif self._semlock._count() > 0: 199*cda5da8dSAndroid Build Coastguard Worker name, count = 'SomeOtherThread', 'nonzero' 200*cda5da8dSAndroid Build Coastguard Worker else: 201*cda5da8dSAndroid Build Coastguard Worker name, count = 'SomeOtherProcess', 'nonzero' 202*cda5da8dSAndroid Build Coastguard Worker except Exception: 203*cda5da8dSAndroid Build Coastguard Worker name, count = 'unknown', 'unknown' 204*cda5da8dSAndroid Build Coastguard Worker return '<%s(%s, %s)>' % (self.__class__.__name__, name, count) 205*cda5da8dSAndroid Build Coastguard Worker 206*cda5da8dSAndroid Build Coastguard Worker# 207*cda5da8dSAndroid Build Coastguard Worker# Condition variable 208*cda5da8dSAndroid Build Coastguard Worker# 209*cda5da8dSAndroid Build Coastguard Worker 210*cda5da8dSAndroid Build Coastguard Workerclass Condition(object): 211*cda5da8dSAndroid Build Coastguard Worker 212*cda5da8dSAndroid Build Coastguard Worker def __init__(self, lock=None, *, ctx): 213*cda5da8dSAndroid Build Coastguard Worker self._lock = lock or ctx.RLock() 214*cda5da8dSAndroid Build Coastguard Worker self._sleeping_count = ctx.Semaphore(0) 215*cda5da8dSAndroid Build Coastguard Worker self._woken_count = ctx.Semaphore(0) 216*cda5da8dSAndroid Build Coastguard Worker self._wait_semaphore = ctx.Semaphore(0) 217*cda5da8dSAndroid Build Coastguard Worker self._make_methods() 218*cda5da8dSAndroid Build Coastguard Worker 219*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 220*cda5da8dSAndroid Build Coastguard Worker context.assert_spawning(self) 221*cda5da8dSAndroid Build Coastguard Worker return (self._lock, self._sleeping_count, 222*cda5da8dSAndroid Build Coastguard Worker self._woken_count, self._wait_semaphore) 223*cda5da8dSAndroid Build Coastguard Worker 224*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 225*cda5da8dSAndroid Build Coastguard Worker (self._lock, self._sleeping_count, 226*cda5da8dSAndroid Build Coastguard Worker self._woken_count, self._wait_semaphore) = state 227*cda5da8dSAndroid Build Coastguard Worker self._make_methods() 228*cda5da8dSAndroid Build Coastguard Worker 229*cda5da8dSAndroid Build Coastguard Worker def __enter__(self): 230*cda5da8dSAndroid Build Coastguard Worker return self._lock.__enter__() 231*cda5da8dSAndroid Build Coastguard Worker 232*cda5da8dSAndroid Build Coastguard Worker def __exit__(self, *args): 233*cda5da8dSAndroid Build Coastguard Worker return self._lock.__exit__(*args) 234*cda5da8dSAndroid Build Coastguard Worker 235*cda5da8dSAndroid Build Coastguard Worker def _make_methods(self): 236*cda5da8dSAndroid Build Coastguard Worker self.acquire = self._lock.acquire 237*cda5da8dSAndroid Build Coastguard Worker self.release = self._lock.release 238*cda5da8dSAndroid Build Coastguard Worker 239*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 240*cda5da8dSAndroid Build Coastguard Worker try: 241*cda5da8dSAndroid Build Coastguard Worker num_waiters = (self._sleeping_count._semlock._get_value() - 242*cda5da8dSAndroid Build Coastguard Worker self._woken_count._semlock._get_value()) 243*cda5da8dSAndroid Build Coastguard Worker except Exception: 244*cda5da8dSAndroid Build Coastguard Worker num_waiters = 'unknown' 245*cda5da8dSAndroid Build Coastguard Worker return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters) 246*cda5da8dSAndroid Build Coastguard Worker 247*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 248*cda5da8dSAndroid Build Coastguard Worker assert self._lock._semlock._is_mine(), \ 249*cda5da8dSAndroid Build Coastguard Worker 'must acquire() condition before using wait()' 250*cda5da8dSAndroid Build Coastguard Worker 251*cda5da8dSAndroid Build Coastguard Worker # indicate that this thread is going to sleep 252*cda5da8dSAndroid Build Coastguard Worker self._sleeping_count.release() 253*cda5da8dSAndroid Build Coastguard Worker 254*cda5da8dSAndroid Build Coastguard Worker # release lock 255*cda5da8dSAndroid Build Coastguard Worker count = self._lock._semlock._count() 256*cda5da8dSAndroid Build Coastguard Worker for i in range(count): 257*cda5da8dSAndroid Build Coastguard Worker self._lock.release() 258*cda5da8dSAndroid Build Coastguard Worker 259*cda5da8dSAndroid Build Coastguard Worker try: 260*cda5da8dSAndroid Build Coastguard Worker # wait for notification or timeout 261*cda5da8dSAndroid Build Coastguard Worker return self._wait_semaphore.acquire(True, timeout) 262*cda5da8dSAndroid Build Coastguard Worker finally: 263*cda5da8dSAndroid Build Coastguard Worker # indicate that this thread has woken 264*cda5da8dSAndroid Build Coastguard Worker self._woken_count.release() 265*cda5da8dSAndroid Build Coastguard Worker 266*cda5da8dSAndroid Build Coastguard Worker # reacquire lock 267*cda5da8dSAndroid Build Coastguard Worker for i in range(count): 268*cda5da8dSAndroid Build Coastguard Worker self._lock.acquire() 269*cda5da8dSAndroid Build Coastguard Worker 270*cda5da8dSAndroid Build Coastguard Worker def notify(self, n=1): 271*cda5da8dSAndroid Build Coastguard Worker assert self._lock._semlock._is_mine(), 'lock is not owned' 272*cda5da8dSAndroid Build Coastguard Worker assert not self._wait_semaphore.acquire( 273*cda5da8dSAndroid Build Coastguard Worker False), ('notify: Should not have been able to acquire ' 274*cda5da8dSAndroid Build Coastguard Worker + '_wait_semaphore') 275*cda5da8dSAndroid Build Coastguard Worker 276*cda5da8dSAndroid Build Coastguard Worker # to take account of timeouts since last notify*() we subtract 277*cda5da8dSAndroid Build Coastguard Worker # woken_count from sleeping_count and rezero woken_count 278*cda5da8dSAndroid Build Coastguard Worker while self._woken_count.acquire(False): 279*cda5da8dSAndroid Build Coastguard Worker res = self._sleeping_count.acquire(False) 280*cda5da8dSAndroid Build Coastguard Worker assert res, ('notify: Bug in sleeping_count.acquire' 281*cda5da8dSAndroid Build Coastguard Worker + '- res should not be False') 282*cda5da8dSAndroid Build Coastguard Worker 283*cda5da8dSAndroid Build Coastguard Worker sleepers = 0 284*cda5da8dSAndroid Build Coastguard Worker while sleepers < n and self._sleeping_count.acquire(False): 285*cda5da8dSAndroid Build Coastguard Worker self._wait_semaphore.release() # wake up one sleeper 286*cda5da8dSAndroid Build Coastguard Worker sleepers += 1 287*cda5da8dSAndroid Build Coastguard Worker 288*cda5da8dSAndroid Build Coastguard Worker if sleepers: 289*cda5da8dSAndroid Build Coastguard Worker for i in range(sleepers): 290*cda5da8dSAndroid Build Coastguard Worker self._woken_count.acquire() # wait for a sleeper to wake 291*cda5da8dSAndroid Build Coastguard Worker 292*cda5da8dSAndroid Build Coastguard Worker # rezero wait_semaphore in case some timeouts just happened 293*cda5da8dSAndroid Build Coastguard Worker while self._wait_semaphore.acquire(False): 294*cda5da8dSAndroid Build Coastguard Worker pass 295*cda5da8dSAndroid Build Coastguard Worker 296*cda5da8dSAndroid Build Coastguard Worker def notify_all(self): 297*cda5da8dSAndroid Build Coastguard Worker self.notify(n=sys.maxsize) 298*cda5da8dSAndroid Build Coastguard Worker 299*cda5da8dSAndroid Build Coastguard Worker def wait_for(self, predicate, timeout=None): 300*cda5da8dSAndroid Build Coastguard Worker result = predicate() 301*cda5da8dSAndroid Build Coastguard Worker if result: 302*cda5da8dSAndroid Build Coastguard Worker return result 303*cda5da8dSAndroid Build Coastguard Worker if timeout is not None: 304*cda5da8dSAndroid Build Coastguard Worker endtime = time.monotonic() + timeout 305*cda5da8dSAndroid Build Coastguard Worker else: 306*cda5da8dSAndroid Build Coastguard Worker endtime = None 307*cda5da8dSAndroid Build Coastguard Worker waittime = None 308*cda5da8dSAndroid Build Coastguard Worker while not result: 309*cda5da8dSAndroid Build Coastguard Worker if endtime is not None: 310*cda5da8dSAndroid Build Coastguard Worker waittime = endtime - time.monotonic() 311*cda5da8dSAndroid Build Coastguard Worker if waittime <= 0: 312*cda5da8dSAndroid Build Coastguard Worker break 313*cda5da8dSAndroid Build Coastguard Worker self.wait(waittime) 314*cda5da8dSAndroid Build Coastguard Worker result = predicate() 315*cda5da8dSAndroid Build Coastguard Worker return result 316*cda5da8dSAndroid Build Coastguard Worker 317*cda5da8dSAndroid Build Coastguard Worker# 318*cda5da8dSAndroid Build Coastguard Worker# Event 319*cda5da8dSAndroid Build Coastguard Worker# 320*cda5da8dSAndroid Build Coastguard Worker 321*cda5da8dSAndroid Build Coastguard Workerclass Event(object): 322*cda5da8dSAndroid Build Coastguard Worker 323*cda5da8dSAndroid Build Coastguard Worker def __init__(self, *, ctx): 324*cda5da8dSAndroid Build Coastguard Worker self._cond = ctx.Condition(ctx.Lock()) 325*cda5da8dSAndroid Build Coastguard Worker self._flag = ctx.Semaphore(0) 326*cda5da8dSAndroid Build Coastguard Worker 327*cda5da8dSAndroid Build Coastguard Worker def is_set(self): 328*cda5da8dSAndroid Build Coastguard Worker with self._cond: 329*cda5da8dSAndroid Build Coastguard Worker if self._flag.acquire(False): 330*cda5da8dSAndroid Build Coastguard Worker self._flag.release() 331*cda5da8dSAndroid Build Coastguard Worker return True 332*cda5da8dSAndroid Build Coastguard Worker return False 333*cda5da8dSAndroid Build Coastguard Worker 334*cda5da8dSAndroid Build Coastguard Worker def set(self): 335*cda5da8dSAndroid Build Coastguard Worker with self._cond: 336*cda5da8dSAndroid Build Coastguard Worker self._flag.acquire(False) 337*cda5da8dSAndroid Build Coastguard Worker self._flag.release() 338*cda5da8dSAndroid Build Coastguard Worker self._cond.notify_all() 339*cda5da8dSAndroid Build Coastguard Worker 340*cda5da8dSAndroid Build Coastguard Worker def clear(self): 341*cda5da8dSAndroid Build Coastguard Worker with self._cond: 342*cda5da8dSAndroid Build Coastguard Worker self._flag.acquire(False) 343*cda5da8dSAndroid Build Coastguard Worker 344*cda5da8dSAndroid Build Coastguard Worker def wait(self, timeout=None): 345*cda5da8dSAndroid Build Coastguard Worker with self._cond: 346*cda5da8dSAndroid Build Coastguard Worker if self._flag.acquire(False): 347*cda5da8dSAndroid Build Coastguard Worker self._flag.release() 348*cda5da8dSAndroid Build Coastguard Worker else: 349*cda5da8dSAndroid Build Coastguard Worker self._cond.wait(timeout) 350*cda5da8dSAndroid Build Coastguard Worker 351*cda5da8dSAndroid Build Coastguard Worker if self._flag.acquire(False): 352*cda5da8dSAndroid Build Coastguard Worker self._flag.release() 353*cda5da8dSAndroid Build Coastguard Worker return True 354*cda5da8dSAndroid Build Coastguard Worker return False 355*cda5da8dSAndroid Build Coastguard Worker 356*cda5da8dSAndroid Build Coastguard Worker def __repr__(self) -> str: 357*cda5da8dSAndroid Build Coastguard Worker set_status = 'set' if self.is_set() else 'unset' 358*cda5da8dSAndroid Build Coastguard Worker return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>" 359*cda5da8dSAndroid Build Coastguard Worker# 360*cda5da8dSAndroid Build Coastguard Worker# Barrier 361*cda5da8dSAndroid Build Coastguard Worker# 362*cda5da8dSAndroid Build Coastguard Worker 363*cda5da8dSAndroid Build Coastguard Workerclass Barrier(threading.Barrier): 364*cda5da8dSAndroid Build Coastguard Worker 365*cda5da8dSAndroid Build Coastguard Worker def __init__(self, parties, action=None, timeout=None, *, ctx): 366*cda5da8dSAndroid Build Coastguard Worker import struct 367*cda5da8dSAndroid Build Coastguard Worker from .heap import BufferWrapper 368*cda5da8dSAndroid Build Coastguard Worker wrapper = BufferWrapper(struct.calcsize('i') * 2) 369*cda5da8dSAndroid Build Coastguard Worker cond = ctx.Condition() 370*cda5da8dSAndroid Build Coastguard Worker self.__setstate__((parties, action, timeout, cond, wrapper)) 371*cda5da8dSAndroid Build Coastguard Worker self._state = 0 372*cda5da8dSAndroid Build Coastguard Worker self._count = 0 373*cda5da8dSAndroid Build Coastguard Worker 374*cda5da8dSAndroid Build Coastguard Worker def __setstate__(self, state): 375*cda5da8dSAndroid Build Coastguard Worker (self._parties, self._action, self._timeout, 376*cda5da8dSAndroid Build Coastguard Worker self._cond, self._wrapper) = state 377*cda5da8dSAndroid Build Coastguard Worker self._array = self._wrapper.create_memoryview().cast('i') 378*cda5da8dSAndroid Build Coastguard Worker 379*cda5da8dSAndroid Build Coastguard Worker def __getstate__(self): 380*cda5da8dSAndroid Build Coastguard Worker return (self._parties, self._action, self._timeout, 381*cda5da8dSAndroid Build Coastguard Worker self._cond, self._wrapper) 382*cda5da8dSAndroid Build Coastguard Worker 383*cda5da8dSAndroid Build Coastguard Worker @property 384*cda5da8dSAndroid Build Coastguard Worker def _state(self): 385*cda5da8dSAndroid Build Coastguard Worker return self._array[0] 386*cda5da8dSAndroid Build Coastguard Worker 387*cda5da8dSAndroid Build Coastguard Worker @_state.setter 388*cda5da8dSAndroid Build Coastguard Worker def _state(self, value): 389*cda5da8dSAndroid Build Coastguard Worker self._array[0] = value 390*cda5da8dSAndroid Build Coastguard Worker 391*cda5da8dSAndroid Build Coastguard Worker @property 392*cda5da8dSAndroid Build Coastguard Worker def _count(self): 393*cda5da8dSAndroid Build Coastguard Worker return self._array[1] 394*cda5da8dSAndroid Build Coastguard Worker 395*cda5da8dSAndroid Build Coastguard Worker @_count.setter 396*cda5da8dSAndroid Build Coastguard Worker def _count(self, value): 397*cda5da8dSAndroid Build Coastguard Worker self._array[1] = value 398