1*9c5db199SXin Li# -*- coding: utf-8 -*- 2*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Li"""Basic locking functionality.""" 7*9c5db199SXin Li 8*9c5db199SXin Lifrom __future__ import print_function 9*9c5db199SXin Li 10*9c5db199SXin Liimport contextlib 11*9c5db199SXin Liimport os 12*9c5db199SXin Liimport errno 13*9c5db199SXin Liimport fcntl 14*9c5db199SXin Liimport stat 15*9c5db199SXin Liimport tempfile 16*9c5db199SXin Li 17*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import cros_build_lib 18*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import cros_logging as logging 19*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import osutils 20*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import retry_util 21*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import timeout_util 22*9c5db199SXin Li 23*9c5db199SXin Li 24*9c5db199SXin LiLOCKF = 'lockf' 25*9c5db199SXin LiFLOCK = 'flock' 26*9c5db199SXin Li 27*9c5db199SXin Li 28*9c5db199SXin Liclass LockNotAcquiredError(Exception): 29*9c5db199SXin Li """Signals that the lock was not acquired.""" 30*9c5db199SXin Li 31*9c5db199SXin Li 32*9c5db199SXin Liclass LockingError(Exception): 33*9c5db199SXin Li """Signals miscellaneous problems in the locking process.""" 34*9c5db199SXin Li 35*9c5db199SXin Li 36*9c5db199SXin Li@contextlib.contextmanager 37*9c5db199SXin Lidef _optional_timer_context(timeout): 38*9c5db199SXin Li """Use the timeout_util.Timeout contextmanager if timeout is set.""" 39*9c5db199SXin Li if timeout: 40*9c5db199SXin Li with timeout_util.Timeout(timeout): 41*9c5db199SXin Li yield 42*9c5db199SXin Li else: 43*9c5db199SXin Li yield 44*9c5db199SXin Li 45*9c5db199SXin Li 46*9c5db199SXin Liclass _Lock(cros_build_lib.MasterPidContextManager): 47*9c5db199SXin Li """Base lockf based locking. Derivatives need to override _GetFd""" 48*9c5db199SXin Li 49*9c5db199SXin Li def __init__(self, description=None, verbose=True, locktype=LOCKF, 50*9c5db199SXin Li blocking=True, blocking_timeout=None): 51*9c5db199SXin Li """Initialize this instance. 52*9c5db199SXin Li 53*9c5db199SXin Li Two types of locks are available: LOCKF and FLOCK. 54*9c5db199SXin Li 55*9c5db199SXin Li Use LOCKF (POSIX locks) if: 56*9c5db199SXin Li - you need to lock a file between processes created by the 57*9c5db199SXin Li parallel/multiprocess libraries 58*9c5db199SXin Li 59*9c5db199SXin Li Use FLOCK (BSD locks) if these scenarios apply: 60*9c5db199SXin Li - you need to lock a file between shell scripts running the flock program 61*9c5db199SXin Li - you need the lock to be bound to the fd and thus inheritable across 62*9c5db199SXin Li execs 63*9c5db199SXin Li 64*9c5db199SXin Li Note: These two locks are completely independent; using one on a path will 65*9c5db199SXin Li not block using the other on the same path. 66*9c5db199SXin Li 67*9c5db199SXin Li Args: 68*9c5db199SXin Li path: On disk pathway to lock. Can be a directory or a file. 69*9c5db199SXin Li description: A description for this lock- what is it protecting? 70*9c5db199SXin Li verbose: Verbose logging? 71*9c5db199SXin Li locktype: Type of lock to use (lockf or flock). 72*9c5db199SXin Li blocking: If True, use a blocking lock. 73*9c5db199SXin Li blocking_timeout: If not None, time is seconds to wait on blocking calls. 74*9c5db199SXin Li """ 75*9c5db199SXin Li cros_build_lib.MasterPidContextManager.__init__(self) 76*9c5db199SXin Li self._verbose = verbose 77*9c5db199SXin Li self.description = description 78*9c5db199SXin Li self._fd = None 79*9c5db199SXin Li self.locking_mechanism = fcntl.flock if locktype == FLOCK else fcntl.lockf 80*9c5db199SXin Li # Store (to log) the locktype string. 81*9c5db199SXin Li self.locktype = locktype 82*9c5db199SXin Li self.blocking = blocking 83*9c5db199SXin Li self.blocking_timeout = blocking_timeout 84*9c5db199SXin Li 85*9c5db199SXin Li @property 86*9c5db199SXin Li def fd(self): 87*9c5db199SXin Li if self._fd is None: 88*9c5db199SXin Li self._fd = self._GetFd() 89*9c5db199SXin Li # Ensure that all derivatives of this lock can't bleed the fd 90*9c5db199SXin Li # across execs. 91*9c5db199SXin Li fcntl.fcntl(self._fd, fcntl.F_SETFD, 92*9c5db199SXin Li fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) 93*9c5db199SXin Li return self._fd 94*9c5db199SXin Li 95*9c5db199SXin Li def _GetFd(self): 96*9c5db199SXin Li raise NotImplementedError(self, '_GetFd') 97*9c5db199SXin Li 98*9c5db199SXin Li def _enforce_lock(self, flags, message): 99*9c5db199SXin Li # Try nonblocking first, if it fails, display the context/message, 100*9c5db199SXin Li # and then wait on the lock. 101*9c5db199SXin Li try: 102*9c5db199SXin Li self.locking_mechanism(self.fd, flags|fcntl.LOCK_NB) 103*9c5db199SXin Li return 104*9c5db199SXin Li except EnvironmentError as e: 105*9c5db199SXin Li if e.errno == errno.EDEADLK: 106*9c5db199SXin Li self.unlock() 107*9c5db199SXin Li elif e.errno != errno.EAGAIN: 108*9c5db199SXin Li raise 109*9c5db199SXin Li if self.description: 110*9c5db199SXin Li message = '%s: blocking (LOCK_NB) (%s) while %s' % (self.description, 111*9c5db199SXin Li self.locktype, 112*9c5db199SXin Li message) 113*9c5db199SXin Li if not self.blocking: 114*9c5db199SXin Li self.close() 115*9c5db199SXin Li raise LockNotAcquiredError(message) 116*9c5db199SXin Li if self._verbose: 117*9c5db199SXin Li logging.info(message) 118*9c5db199SXin Li 119*9c5db199SXin Li try: 120*9c5db199SXin Li with _optional_timer_context(self.blocking_timeout): 121*9c5db199SXin Li self.locking_mechanism(self.fd, flags) 122*9c5db199SXin Li except timeout_util.TimeoutError: 123*9c5db199SXin Li description = self.description or 'locking._enforce_lock' 124*9c5db199SXin Li logging.error( 125*9c5db199SXin Li 'Timed out after waiting %d seconds for blocking lock (%s): %s', 126*9c5db199SXin Li self.blocking_timeout, self.locktype, description) 127*9c5db199SXin Li raise 128*9c5db199SXin Li except EnvironmentError as e: 129*9c5db199SXin Li if e.errno != errno.EDEADLK: 130*9c5db199SXin Li message = ('%s: blocking wait failed errno %s' 131*9c5db199SXin Li % (self.description, e)) 132*9c5db199SXin Li raise 133*9c5db199SXin Li self.unlock() 134*9c5db199SXin Li self.locking_mechanism(self.fd, flags) 135*9c5db199SXin Li logging.debug('%s: lock has been acquired (%s), continuing.', 136*9c5db199SXin Li self.description, self.locktype) 137*9c5db199SXin Li 138*9c5db199SXin Li def lock(self, shared=False): 139*9c5db199SXin Li """Take a lock of type |shared|. 140*9c5db199SXin Li 141*9c5db199SXin Li Any existing lock will be updated if need be. 142*9c5db199SXin Li 143*9c5db199SXin Li Args: 144*9c5db199SXin Li shared: If True make the lock shared. 145*9c5db199SXin Li 146*9c5db199SXin Li Returns: 147*9c5db199SXin Li self, allowing it to be used as a `with` target. 148*9c5db199SXin Li 149*9c5db199SXin Li Raises: 150*9c5db199SXin Li IOError if the operation fails in some way. 151*9c5db199SXin Li LockNotAcquiredError if the lock couldn't be acquired (non-blocking 152*9c5db199SXin Li mode only). 153*9c5db199SXin Li """ 154*9c5db199SXin Li self._enforce_lock( 155*9c5db199SXin Li fcntl.LOCK_SH if shared else fcntl.LOCK_EX, 156*9c5db199SXin Li 'taking a %s lock' % ('shared' if shared else 'exclusive')) 157*9c5db199SXin Li return self 158*9c5db199SXin Li 159*9c5db199SXin Li def read_lock(self, message='taking read lock'): 160*9c5db199SXin Li """Take a read lock (shared), downgrading from write if required. 161*9c5db199SXin Li 162*9c5db199SXin Li Args: 163*9c5db199SXin Li message: A description of what/why this lock is being taken. 164*9c5db199SXin Li 165*9c5db199SXin Li Returns: 166*9c5db199SXin Li self, allowing it to be used as a `with` target. 167*9c5db199SXin Li 168*9c5db199SXin Li Raises: 169*9c5db199SXin Li IOError if the operation fails in some way. 170*9c5db199SXin Li """ 171*9c5db199SXin Li self._enforce_lock(fcntl.LOCK_SH, message) 172*9c5db199SXin Li return self 173*9c5db199SXin Li 174*9c5db199SXin Li def write_lock(self, message='taking write lock'): 175*9c5db199SXin Li """Take a write lock (exclusive), upgrading from read if required. 176*9c5db199SXin Li 177*9c5db199SXin Li Note that if the lock state is being upgraded from read to write, 178*9c5db199SXin Li a deadlock potential exists- as such we *will* release the lock 179*9c5db199SXin Li to work around it. Any consuming code should not assume that 180*9c5db199SXin Li transitioning from shared to exclusive means no one else has 181*9c5db199SXin Li gotten at the critical resource in between for this reason. 182*9c5db199SXin Li 183*9c5db199SXin Li Args: 184*9c5db199SXin Li message: A description of what/why this lock is being taken. 185*9c5db199SXin Li 186*9c5db199SXin Li Returns: 187*9c5db199SXin Li self, allowing it to be used as a `with` target. 188*9c5db199SXin Li 189*9c5db199SXin Li Raises: 190*9c5db199SXin Li IOError if the operation fails in some way. 191*9c5db199SXin Li """ 192*9c5db199SXin Li self._enforce_lock(fcntl.LOCK_EX, message) 193*9c5db199SXin Li return self 194*9c5db199SXin Li 195*9c5db199SXin Li def unlock(self): 196*9c5db199SXin Li """Release any locks held. Noop if no locks are held. 197*9c5db199SXin Li 198*9c5db199SXin Li Raises: 199*9c5db199SXin Li IOError if the operation fails in some way. 200*9c5db199SXin Li """ 201*9c5db199SXin Li if self._fd is not None: 202*9c5db199SXin Li logging.debug('%s: lock is being released (%s).', 203*9c5db199SXin Li self.description, self.locktype) 204*9c5db199SXin Li self.locking_mechanism(self._fd, fcntl.LOCK_UN) 205*9c5db199SXin Li 206*9c5db199SXin Li def __del__(self): 207*9c5db199SXin Li # TODO(ferringb): Convert this to snakeoil.weakref.WeakRefFinalizer 208*9c5db199SXin Li # if/when that rebasing occurs. 209*9c5db199SXin Li self.close() 210*9c5db199SXin Li 211*9c5db199SXin Li def close(self): 212*9c5db199SXin Li """Release the underlying lock and close the fd.""" 213*9c5db199SXin Li if self._fd is not None: 214*9c5db199SXin Li self.unlock() 215*9c5db199SXin Li os.close(self._fd) 216*9c5db199SXin Li self._fd = None 217*9c5db199SXin Li 218*9c5db199SXin Li def _enter(self): 219*9c5db199SXin Li # Force the fd to be opened via touching the property. 220*9c5db199SXin Li # We do this to ensure that even if entering a context w/out a lock 221*9c5db199SXin Li # held, we can do locking in that critical section if the code requests it. 222*9c5db199SXin Li # pylint: disable=pointless-statement 223*9c5db199SXin Li self.fd 224*9c5db199SXin Li return self 225*9c5db199SXin Li 226*9c5db199SXin Li def _exit(self, exc_type, exc, exc_tb): 227*9c5db199SXin Li try: 228*9c5db199SXin Li self.unlock() 229*9c5db199SXin Li finally: 230*9c5db199SXin Li self.close() 231*9c5db199SXin Li 232*9c5db199SXin Li def IsLocked(self): 233*9c5db199SXin Li """Return True if the lock is grabbed.""" 234*9c5db199SXin Li return bool(self._fd) 235*9c5db199SXin Li 236*9c5db199SXin Li 237*9c5db199SXin Liclass FileLock(_Lock): 238*9c5db199SXin Li """Use a specified file as a locking mechanism.""" 239*9c5db199SXin Li 240*9c5db199SXin Li def __init__(self, path, description=None, verbose=True, 241*9c5db199SXin Li locktype=LOCKF, world_writable=False, blocking=True, 242*9c5db199SXin Li blocking_timeout=None): 243*9c5db199SXin Li """Initializer for FileLock. 244*9c5db199SXin Li 245*9c5db199SXin Li Args: 246*9c5db199SXin Li path: On disk pathway to lock. Can be a directory or a file. 247*9c5db199SXin Li description: A description for this lock- what is it protecting? 248*9c5db199SXin Li verbose: Verbose logging? 249*9c5db199SXin Li locktype: Type of lock to use (lockf or flock). 250*9c5db199SXin Li world_writable: If true, the lock file will be created as root and be made 251*9c5db199SXin Li writable to all users. 252*9c5db199SXin Li blocking: If True, use a blocking lock. 253*9c5db199SXin Li blocking_timeout: If not None, time is seconds to wait on blocking calls. 254*9c5db199SXin Li """ 255*9c5db199SXin Li if description is None: 256*9c5db199SXin Li description = 'lock %s' % (path,) 257*9c5db199SXin Li _Lock.__init__(self, description=description, verbose=verbose, 258*9c5db199SXin Li locktype=locktype, blocking=blocking, 259*9c5db199SXin Li blocking_timeout=blocking_timeout) 260*9c5db199SXin Li self.path = os.path.abspath(path) 261*9c5db199SXin Li self.world_writable = world_writable 262*9c5db199SXin Li 263*9c5db199SXin Li def _GetFd(self): 264*9c5db199SXin Li if self.world_writable: 265*9c5db199SXin Li create = True 266*9c5db199SXin Li try: 267*9c5db199SXin Li create = stat.S_IMODE(os.stat(self.path).st_mode) != 0o666 268*9c5db199SXin Li except OSError as e: 269*9c5db199SXin Li if e.errno != errno.ENOENT: 270*9c5db199SXin Li raise 271*9c5db199SXin Li if create: 272*9c5db199SXin Li osutils.SafeMakedirs(os.path.dirname(self.path), sudo=True) 273*9c5db199SXin Li cros_build_lib.sudo_run(['touch', self.path], print_cmd=False) 274*9c5db199SXin Li cros_build_lib.sudo_run(['chmod', '666', self.path], print_cmd=False) 275*9c5db199SXin Li 276*9c5db199SXin Li # If we're on py3.4 and this attribute is exposed, use it to close 277*9c5db199SXin Li # the threading race between open and fcntl setting; this is 278*9c5db199SXin Li # extremely paranoid code, but might as well. 279*9c5db199SXin Li cloexec = getattr(os, 'O_CLOEXEC', 0) 280*9c5db199SXin Li # There exist race conditions where the lock may be created by 281*9c5db199SXin Li # root, thus denying subsequent accesses from others. To prevent 282*9c5db199SXin Li # this, we create the lock with mode 0o666. 283*9c5db199SXin Li try: 284*9c5db199SXin Li value = os.umask(000) 285*9c5db199SXin Li fd = os.open(self.path, os.W_OK|os.O_CREAT|cloexec, 0o666) 286*9c5db199SXin Li finally: 287*9c5db199SXin Li os.umask(value) 288*9c5db199SXin Li return fd 289*9c5db199SXin Li 290*9c5db199SXin Li 291*9c5db199SXin Liclass ProcessLock(_Lock): 292*9c5db199SXin Li """Process level locking visible to parent/child only. 293*9c5db199SXin Li 294*9c5db199SXin Li This lock is basically a more robust version of what 295*9c5db199SXin Li multiprocessing.Lock does. That implementation uses semaphores 296*9c5db199SXin Li internally which require cleanup/deallocation code to run to release 297*9c5db199SXin Li the lock; a SIGKILL hitting the process holding the lock violates those 298*9c5db199SXin Li assumptions leading to a stuck lock. 299*9c5db199SXin Li 300*9c5db199SXin Li Thus this implementation is based around locking of a deleted tempfile; 301*9c5db199SXin Li lockf locks are guranteed to be released once the process/fd is closed. 302*9c5db199SXin Li """ 303*9c5db199SXin Li 304*9c5db199SXin Li def _GetFd(self): 305*9c5db199SXin Li with tempfile.TemporaryFile() as f: 306*9c5db199SXin Li # We don't want to hold onto the object indefinitely; we just want 307*9c5db199SXin Li # the fd to a temporary inode, preferably one that isn't vfs accessible. 308*9c5db199SXin Li # Since TemporaryFile closes the fd once the object is GC'd, we just 309*9c5db199SXin Li # dupe the fd so we retain a copy, while the original TemporaryFile 310*9c5db199SXin Li # goes away. 311*9c5db199SXin Li return os.dup(f.fileno()) 312*9c5db199SXin Li 313*9c5db199SXin Li 314*9c5db199SXin Liclass PortableLinkLock(object): 315*9c5db199SXin Li """A more primitive lock that relies on the atomicity of creating hardlinks. 316*9c5db199SXin Li 317*9c5db199SXin Li Use this lock if you need to be compatible with shadow utils like groupadd 318*9c5db199SXin Li or useradd. 319*9c5db199SXin Li """ 320*9c5db199SXin Li 321*9c5db199SXin Li def __init__(self, path, max_retry=0, sleep=1): 322*9c5db199SXin Li """Construct an instance. 323*9c5db199SXin Li 324*9c5db199SXin Li Args: 325*9c5db199SXin Li path: path to file to lock on. Multiple processes attempting to lock the 326*9c5db199SXin Li same path will compete for a system wide lock. 327*9c5db199SXin Li max_retry: maximum number of times to attempt to acquire the lock. 328*9c5db199SXin Li sleep: See retry_util.GenericRetry's sleep parameter. 329*9c5db199SXin Li """ 330*9c5db199SXin Li self._path = path 331*9c5db199SXin Li self._target_path = None 332*9c5db199SXin Li # These two poorly named variables are just passed straight through to 333*9c5db199SXin Li # retry_util.RetryException. 334*9c5db199SXin Li self._max_retry = max_retry 335*9c5db199SXin Li self._sleep = sleep 336*9c5db199SXin Li 337*9c5db199SXin Li def __enter__(self): 338*9c5db199SXin Li fd, self._target_path = tempfile.mkstemp( 339*9c5db199SXin Li prefix=self._path + '.chromite.portablelock.') 340*9c5db199SXin Li os.close(fd) 341*9c5db199SXin Li try: 342*9c5db199SXin Li retry_util.RetryException(OSError, self._max_retry, 343*9c5db199SXin Li os.link, self._target_path, self._path, 344*9c5db199SXin Li sleep=self._sleep) 345*9c5db199SXin Li except OSError: 346*9c5db199SXin Li raise LockNotAcquiredError('Timeout while trying to lock %s' % self._path) 347*9c5db199SXin Li finally: 348*9c5db199SXin Li osutils.SafeUnlink(self._target_path) 349*9c5db199SXin Li 350*9c5db199SXin Li return self 351*9c5db199SXin Li 352*9c5db199SXin Li def __exit__(self, exc_type, exc_val, exc_tb): 353*9c5db199SXin Li try: 354*9c5db199SXin Li if self._target_path: 355*9c5db199SXin Li osutils.SafeUnlink(self._target_path) 356*9c5db199SXin Li finally: 357*9c5db199SXin Li osutils.SafeUnlink(self._path) 358*9c5db199SXin Li 359*9c5db199SXin Li 360*9c5db199SXin Liclass PipeLock(object): 361*9c5db199SXin Li """A simple one-way lock based on pipe(). 362*9c5db199SXin Li 363*9c5db199SXin Li This is used when code is calling os.fork() directly and needs to synchronize 364*9c5db199SXin Li behavior between the two. The same process should not try to use Wait/Post 365*9c5db199SXin Li as it will just see its own results. If you need bidirection locks, you'll 366*9c5db199SXin Li need to create two yourself. 367*9c5db199SXin Li 368*9c5db199SXin Li Be sure to delete the lock when you're done to prevent fd leakage. 369*9c5db199SXin Li """ 370*9c5db199SXin Li 371*9c5db199SXin Li def __init__(self): 372*9c5db199SXin Li # TODO(vapier): Simplify this when we're Python 3 only. 373*9c5db199SXin Li # pylint: disable=using-constant-test 374*9c5db199SXin Li pipe2 = getattr(os, 'pipe2', None) 375*9c5db199SXin Li if pipe2: 376*9c5db199SXin Li cloexec = getattr(os, 'O_CLOEXEC', 0) 377*9c5db199SXin Li # Pylint-1.7 is unable to handle this conditional logic. 378*9c5db199SXin Li # pylint: disable=not-callable 379*9c5db199SXin Li pipes = pipe2(cloexec) 380*9c5db199SXin Li else: 381*9c5db199SXin Li pipes = os.pipe() 382*9c5db199SXin Li self.read_fd, self.write_fd = pipes 383*9c5db199SXin Li 384*9c5db199SXin Li def Wait(self, size=1): 385*9c5db199SXin Li """Read |size| bytes from the pipe. 386*9c5db199SXin Li 387*9c5db199SXin Li Args: 388*9c5db199SXin Li size: How many bytes to read. It must match the length of |data| passed 389*9c5db199SXin Li by the other end during its call to Post. 390*9c5db199SXin Li 391*9c5db199SXin Li Returns: 392*9c5db199SXin Li The data read back. 393*9c5db199SXin Li """ 394*9c5db199SXin Li return os.read(self.read_fd, size) 395*9c5db199SXin Li 396*9c5db199SXin Li def Post(self, data=b'!'): 397*9c5db199SXin Li """Write |data| to the pipe. 398*9c5db199SXin Li 399*9c5db199SXin Li Args: 400*9c5db199SXin Li data: The data to send to the other side calling Wait. It must be of the 401*9c5db199SXin Li exact length that is passed to Wait. 402*9c5db199SXin Li """ 403*9c5db199SXin Li os.write(self.write_fd, data) 404*9c5db199SXin Li 405*9c5db199SXin Li def __del__(self): 406*9c5db199SXin Li os.close(self.read_fd) 407*9c5db199SXin Li os.close(self.write_fd) 408