xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/lib/locking.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# -*- coding: utf-8 -*-
2*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Li"""Basic locking functionality."""
7*9c5db199SXin Li
8*9c5db199SXin Lifrom __future__ import print_function
9*9c5db199SXin Li
10*9c5db199SXin Liimport contextlib
11*9c5db199SXin Liimport os
12*9c5db199SXin Liimport errno
13*9c5db199SXin Liimport fcntl
14*9c5db199SXin Liimport stat
15*9c5db199SXin Liimport tempfile
16*9c5db199SXin Li
17*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import cros_build_lib
18*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
19*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import osutils
20*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import retry_util
21*9c5db199SXin Lifrom autotest_lib.utils.frozen_chromite.lib import timeout_util
22*9c5db199SXin Li
23*9c5db199SXin Li
# Lock type names accepted by the |locktype| argument of the lock classes.
# LOCKF selects fcntl.lockf(); it is the default locktype.
LOCKF = 'lockf'
# FLOCK selects fcntl.flock().
FLOCK = 'flock'
26*9c5db199SXin Li
27*9c5db199SXin Li
class LockNotAcquiredError(Exception):
  """Raised when a lock could not be obtained."""
30*9c5db199SXin Li
31*9c5db199SXin Li
class LockingError(Exception):
  """Raised for miscellaneous failures during the locking process."""
34*9c5db199SXin Li
35*9c5db199SXin Li
36*9c5db199SXin Li@contextlib.contextmanager
37*9c5db199SXin Lidef _optional_timer_context(timeout):
38*9c5db199SXin Li  """Use the timeout_util.Timeout contextmanager if timeout is set."""
39*9c5db199SXin Li  if timeout:
40*9c5db199SXin Li    with timeout_util.Timeout(timeout):
41*9c5db199SXin Li      yield
42*9c5db199SXin Li  else:
43*9c5db199SXin Li    yield
44*9c5db199SXin Li
45*9c5db199SXin Li
class _Lock(cros_build_lib.MasterPidContextManager):
  """Base fcntl based locking.  Derivatives need to override _GetFd.

  The file descriptor backing the lock is opened lazily through the |fd|
  property and is released again by close().
  """

  # Class-level default so close() and __del__ stay safe even if __init__
  # raised before the instance attribute was assigned.
  _fd = None

  # errno values meaning "somebody else holds the lock" for a non-blocking
  # attempt.  The Python fcntl documentation notes that lockf may report
  # either EACCES or EAGAIN depending on the operating system.
  _CONTENDED_ERRNOS = (errno.EAGAIN, errno.EACCES)

  def __init__(self, description=None, verbose=True, locktype=LOCKF,
               blocking=True, blocking_timeout=None):
    """Initialize this instance.

    Two types of locks are available: LOCKF and FLOCK.

    Use LOCKF (POSIX locks) if:
      - you need to lock a file between processes created by the
        parallel/multiprocess libraries

    Use FLOCK (BSD locks) if these scenarios apply:
      - you need to lock a file between shell scripts running the flock program
      - you need the lock to be bound to the fd and thus inheritable across
        execs

    Note: These two locks are completely independent; using one on a path will
          not block using the other on the same path.

    Args:
      description: A description for this lock- what is it protecting?
      verbose: Verbose logging?
      locktype: Type of lock to use (LOCKF or FLOCK).  Any value other than
        FLOCK selects fcntl.lockf.
      blocking: If True, use a blocking lock.
      blocking_timeout: If not None, time is seconds to wait on blocking calls.
    """
    cros_build_lib.MasterPidContextManager.__init__(self)
    self._verbose = verbose
    self.description = description
    self._fd = None
    self.locking_mechanism = fcntl.flock if locktype == FLOCK else fcntl.lockf
    # Store (to log) the locktype string.
    self.locktype = locktype
    self.blocking = blocking
    self.blocking_timeout = blocking_timeout

  @property
  def fd(self):
    """The lock's file descriptor, opened via _GetFd on first access."""
    if self._fd is None:
      self._fd = self._GetFd()
      # Ensure that all derivatives of this lock can't bleed the fd
      # across execs.
      fcntl.fcntl(self._fd, fcntl.F_SETFD,
                  fcntl.fcntl(self._fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
    return self._fd

  def _GetFd(self):
    """Return a new file descriptor to lock on.  Must be overridden."""
    raise NotImplementedError(self, '_GetFd')

  def _enforce_lock(self, flags, message):
    """Acquire the lock described by |flags|, waiting if so configured.

    A non-blocking attempt is made first.  If the lock is contended, the
    instance either gives up (non-blocking mode) or logs |message| and then
    waits, optionally bounded by |self.blocking_timeout|.

    Args:
      flags: The fcntl lock operation (fcntl.LOCK_SH or fcntl.LOCK_EX).
      message: Description of why the lock is being taken; used in logs and
        in the LockNotAcquiredError text.

    Raises:
      LockNotAcquiredError: The lock is held elsewhere and |self.blocking| is
        False.  The fd is closed before raising.
      timeout_util.TimeoutError: The blocking wait timed out.
      EnvironmentError: Any other failure reported by the locking call.
    """
    # Try nonblocking first, if it fails, display the context/message,
    # and then wait on the lock.
    try:
      self.locking_mechanism(self.fd, flags | fcntl.LOCK_NB)
      return
    except EnvironmentError as e:
      if e.errno == errno.EDEADLK:
        # Drop whatever we currently hold so the retry below can proceed.
        self.unlock()
      elif e.errno not in self._CONTENDED_ERRNOS:
        raise
    if self.description:
      message = '%s: blocking (LOCK_NB) (%s) while %s' % (self.description,
                                                          self.locktype,
                                                          message)
    if not self.blocking:
      self.close()
      raise LockNotAcquiredError(message)
    if self._verbose:
      logging.info(message)

    try:
      with _optional_timer_context(self.blocking_timeout):
        self.locking_mechanism(self.fd, flags)
    except timeout_util.TimeoutError:
      description = self.description or 'locking._enforce_lock'
      logging.error(
          'Timed out after waiting %d seconds for blocking lock (%s): %s',
          self.blocking_timeout, self.locktype, description)
      raise
    except EnvironmentError as e:
      if e.errno != errno.EDEADLK:
        # Record the failure before propagating it to the caller.
        logging.error('%s: blocking wait failed errno %s',
                      self.description, e)
        raise
      # Deadlock detected: release our current lock and wait again.
      self.unlock()
      self.locking_mechanism(self.fd, flags)
    logging.debug('%s: lock has been acquired (%s), continuing.',
                  self.description, self.locktype)

  def lock(self, shared=False):
    """Take a lock of type |shared|.

    Any existing lock will be updated if need be.

    Args:
      shared: If True make the lock shared.

    Returns:
      self, allowing it to be used as a `with` target.

    Raises:
      IOError if the operation fails in some way.
      LockNotAcquiredError if the lock couldn't be acquired (non-blocking
        mode only).
    """
    self._enforce_lock(
        fcntl.LOCK_SH if shared else fcntl.LOCK_EX,
        'taking a %s lock' % ('shared' if shared else 'exclusive'))
    return self

  def read_lock(self, message='taking read lock'):
    """Take a read lock (shared), downgrading from write if required.

    Args:
      message: A description of what/why this lock is being taken.

    Returns:
      self, allowing it to be used as a `with` target.

    Raises:
      IOError if the operation fails in some way.
      LockNotAcquiredError if the lock couldn't be acquired (non-blocking
        mode only).
    """
    self._enforce_lock(fcntl.LOCK_SH, message)
    return self

  def write_lock(self, message='taking write lock'):
    """Take a write lock (exclusive), upgrading from read if required.

    Note that if the lock state is being upgraded from read to write,
    a deadlock potential exists- as such we *will* release the lock
    to work around it.  Any consuming code should not assume that
    transitioning from shared to exclusive means no one else has
    gotten at the critical resource in between for this reason.

    Args:
      message: A description of what/why this lock is being taken.

    Returns:
      self, allowing it to be used as a `with` target.

    Raises:
      IOError if the operation fails in some way.
      LockNotAcquiredError if the lock couldn't be acquired (non-blocking
        mode only).
    """
    self._enforce_lock(fcntl.LOCK_EX, message)
    return self

  def unlock(self):
    """Release any locks held.  Noop if no locks are held.

    Raises:
      IOError if the operation fails in some way.
    """
    if self._fd is not None:
      logging.debug('%s: lock is being released (%s).',
                    self.description, self.locktype)
      self.locking_mechanism(self._fd, fcntl.LOCK_UN)

  def __del__(self):
    # TODO(ferringb): Convert this to snakeoil.weakref.WeakRefFinalizer
    # if/when that rebasing occurs.
    self.close()

  def close(self):
    """Release the underlying lock and close the fd."""
    if self._fd is not None:
      self.unlock()
      os.close(self._fd)
      self._fd = None

  def _enter(self):
    """Open the fd (without locking) and return self."""
    # Force the fd to be opened via touching the property.
    # We do this to ensure that even if entering a context w/out a lock
    # held, we can do locking in that critical section if the code requests it.
    # pylint: disable=pointless-statement
    self.fd
    return self

  def _exit(self, exc_type, exc, exc_tb):
    """Release the lock and close the fd, even if unlocking fails."""
    try:
      self.unlock()
    finally:
      self.close()

  def IsLocked(self):
    """Return True if the lock's file descriptor is currently open.

    Note this reports that the fd exists (it is opened when a lock is
    requested or the context is entered), which is what the lock is bound to.
    """
    # Compare against None explicitly: 0 is a valid file descriptor.
    return self._fd is not None
235*9c5db199SXin Li
236*9c5db199SXin Li
class FileLock(_Lock):
  """Use a specified file as a locking mechanism."""

  def __init__(self, path, description=None, verbose=True,
               locktype=LOCKF, world_writable=False, blocking=True,
               blocking_timeout=None):
    """Initializer for FileLock.

    Args:
      path: On disk pathway to lock.  It is stored as an absolute path and
        the file is created on first use if it does not exist.
      description: A description for this lock- what is it protecting?
        Defaults to 'lock <path>'.
      verbose: Verbose logging?
      locktype: Type of lock to use (lockf or flock).
      world_writable: If true, the lock file will be created as root and be made
        writable to all users.
      blocking: If True, use a blocking lock.
      blocking_timeout: If not None, time is seconds to wait on blocking calls.
    """
    if description is None:
      description = 'lock %s' % (path,)
    _Lock.__init__(self, description=description, verbose=verbose,
                   locktype=locktype, blocking=blocking,
                   blocking_timeout=blocking_timeout)
    self.path = os.path.abspath(path)
    self.world_writable = world_writable

  def _GetFd(self):
    """Open (creating if needed) the lock file and return its fd.

    Returns:
      A file descriptor opened read/write on |self.path|.

    Raises:
      OSError: If the file cannot be stat'ed (for reasons other than not
        existing) or opened.
    """
    if self.world_writable:
      # (Re)create the file via sudo unless it already exists with mode 0o666.
      create = True
      try:
        create = stat.S_IMODE(os.stat(self.path).st_mode) != 0o666
      except OSError as e:
        if e.errno != errno.ENOENT:
          raise
      if create:
        osutils.SafeMakedirs(os.path.dirname(self.path), sudo=True)
        cros_build_lib.sudo_run(['touch', self.path], print_cmd=False)
        cros_build_lib.sudo_run(['chmod', '666', self.path], print_cmd=False)

    # If we're on py3.4 and this attribute is exposed, use it to close
    # the threading race between open and fcntl setting; this is
    # extremely paranoid code, but might as well.
    cloexec = getattr(os, 'O_CLOEXEC', 0)
    # There exist race conditions where the lock may be created by
    # root, thus denying subsequent accesses from others. To prevent
    # this, we create the lock with mode 0o666.
    # Clear the umask outside the try block so the restore in `finally`
    # always has a saved value to put back.
    saved_umask = os.umask(0)
    try:
      # os.O_RDWR is the proper open(2) flag; os.W_OK is an access(2) mode
      # constant that merely shares its numeric value on Linux.
      fd = os.open(self.path, os.O_RDWR | os.O_CREAT | cloexec, 0o666)
    finally:
      os.umask(saved_umask)
    return fd
289*9c5db199SXin Li
290*9c5db199SXin Li
class ProcessLock(_Lock):
  """Process level locking visible to parent/child only.

  This lock is basically a more robust version of what
  multiprocessing.Lock does.  That implementation uses semaphores
  internally which require cleanup/deallocation code to run to release
  the lock; a SIGKILL hitting the process holding the lock violates those
  assumptions leading to a stuck lock.

  Thus this implementation is based around locking of a deleted tempfile;
  lockf locks are guaranteed to be released once the process/fd is closed.
  """

  def _GetFd(self):
    """Return a private duplicate of a temporary file's descriptor."""
    # We only need an fd that refers to a temporary inode, not the file
    # object itself.  Duplicate the descriptor so our copy outlives the
    # TemporaryFile, which is closed as soon as we are done here.
    scratch = tempfile.TemporaryFile()
    try:
      return os.dup(scratch.fileno())
    finally:
      scratch.close()
312*9c5db199SXin Li
313*9c5db199SXin Li
class PortableLinkLock(object):
  """A more primitive lock that relies on the atomicity of creating hardlinks.

  Use this lock if you need to be compatible with shadow utils like groupadd
  or useradd.
  """

  def __init__(self, path, max_retry=0, sleep=1):
    """Construct an instance.

    Args:
      path: path to file to lock on.  Multiple processes attempting to lock the
        same path will compete for a system wide lock.
      max_retry: maximum number of times to attempt to acquire the lock.
      sleep: See retry_util.GenericRetry's sleep parameter.
    """
    self._path = path
    self._target_path = None
    # These two poorly named variables are just passed straight through to
    # retry_util.RetryException.
    self._max_retry = max_retry
    self._sleep = sleep

  def __enter__(self):
    """Acquire the lock by hardlinking a temporary file onto the lock path.

    Returns:
      self.

    Raises:
      LockNotAcquiredError: If the hardlink could not be created within the
        configured retries.
    """
    # Create the temporary file in the same directory as the lock path so the
    # hardlink never has to cross a filesystem boundary.  Passing |dir|
    # explicitly keeps this true for relative paths too; a bare prefix only
    # overrides the default temp directory when it is absolute.
    lock_dir, lock_name = os.path.split(self._path)
    fd, self._target_path = tempfile.mkstemp(
        prefix=lock_name + '.chromite.portablelock.',
        dir=lock_dir or os.curdir)
    os.close(fd)
    try:
      retry_util.RetryException(OSError, self._max_retry,
                                os.link, self._target_path, self._path,
                                sleep=self._sleep)
    except OSError:
      raise LockNotAcquiredError('Timeout while trying to lock %s' % self._path)
    finally:
      # The temporary name is only needed as the link source.
      osutils.SafeUnlink(self._target_path)

    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Release the lock by removing the lock path."""
    try:
      if self._target_path:
        osutils.SafeUnlink(self._target_path)
    finally:
      osutils.SafeUnlink(self._path)
358*9c5db199SXin Li
359*9c5db199SXin Li
class PipeLock(object):
  """A simple one-way lock based on pipe().

  This is used when code is calling os.fork() directly and needs to synchronize
  behavior between the two.  The same process should not try to use Wait/Post
  as it will just see its own results.  If you need bidirection locks, you'll
  need to create two yourself.

  Be sure to delete the lock when you're done to prevent fd leakage.
  """

  # Class-level defaults so __del__ is safe even if __init__ failed before the
  # pipe was created.
  read_fd = None
  write_fd = None

  def __init__(self):
    """Create the underlying pipe, close-on-exec where supported."""
    # TODO(vapier): Simplify this when we're Python 3 only.
    # pylint: disable=using-constant-test
    pipe2 = getattr(os, 'pipe2', None)
    if pipe2:
      cloexec = getattr(os, 'O_CLOEXEC', 0)
      # Pylint-1.7 is unable to handle this conditional logic.
      # pylint: disable=not-callable
      pipes = pipe2(cloexec)
    else:
      pipes = os.pipe()
    self.read_fd, self.write_fd = pipes

  def Wait(self, size=1):
    """Read |size| bytes from the pipe.

    Args:
      size: How many bytes to read.  It must match the length of |data| passed
        by the other end during its call to Post.

    Returns:
      The data read back.
    """
    return os.read(self.read_fd, size)

  def Post(self, data=b'!'):
    """Write |data| to the pipe.

    Args:
      data: The data to send to the other side calling Wait.  It must be of the
        exact length that is passed to Wait.
    """
    os.write(self.write_fd, data)

  def __del__(self):
    """Close both pipe ends; safe to run more than once."""
    # Detach the fds first so a repeated call (or a partially constructed
    # instance) never closes a descriptor twice or touches a missing one.
    read_fd, self.read_fd = self.read_fd, None
    write_fd, self.write_fd = self.write_fd, None
    try:
      if read_fd is not None:
        os.close(read_fd)
    finally:
      # Always attempt the write end, even if closing the read end failed.
      if write_fd is not None:
        os.close(write_fd)
408