1 //! A ticket-based mutex. 2 //! 3 //! Waiting threads take a 'ticket' from the lock in the order they arrive and gain access to the lock when their 4 //! ticket is next in the queue. Best-case latency is slightly worse than a regular spinning mutex, but worse-case 5 //! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the 6 //! queue to finish. 7 8 use crate::{ 9 atomic::{AtomicUsize, Ordering}, 10 RelaxStrategy, Spin, 11 }; 12 use core::{ 13 cell::UnsafeCell, 14 fmt, 15 marker::PhantomData, 16 ops::{Deref, DerefMut}, 17 }; 18 19 /// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data. 20 /// 21 /// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it 22 /// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the 23 /// next ticket will be processed. 24 /// 25 /// Ticket locks significantly reduce the worse-case performance of locking at the cost of slightly higher average-time 26 /// overhead. 27 /// 28 /// # Example 29 /// 30 /// ``` 31 /// use spin; 32 /// 33 /// let lock = spin::mutex::TicketMutex::<_>::new(0); 34 /// 35 /// // Modify the data 36 /// *lock.lock() = 2; 37 /// 38 /// // Read the data 39 /// let answer = *lock.lock(); 40 /// assert_eq!(answer, 2); 41 /// ``` 42 /// 43 /// # Thread safety example 44 /// 45 /// ``` 46 /// use spin; 47 /// use std::sync::{Arc, Barrier}; 48 /// 49 /// let thread_count = 1000; 50 /// let spin_mutex = Arc::new(spin::mutex::TicketMutex::<_>::new(0)); 51 /// 52 /// // We use a barrier to ensure the readout happens after all writing 53 /// let barrier = Arc::new(Barrier::new(thread_count + 1)); 54 /// 55 /// for _ in (0..thread_count) { 56 /// let my_barrier = barrier.clone(); 57 /// let my_lock = spin_mutex.clone(); 58 /// std::thread::spawn(move || { 59 /// let mut guard = my_lock.lock(); 60 /// *guard += 1; 61 /// 62 /// // Release the lock to prevent a deadlock 63 /// drop(guard); 64 /// my_barrier.wait(); 65 /// }); 66 /// } 67 /// 68 /// barrier.wait(); 69 /// 70 /// let answer = { *spin_mutex.lock() }; 71 /// assert_eq!(answer, thread_count); 72 /// ``` 73 pub struct TicketMutex<T: ?Sized, R = Spin> { 74 phantom: PhantomData<R>, 75 next_ticket: AtomicUsize, 76 next_serving: AtomicUsize, 77 data: UnsafeCell<T>, 78 } 79 80 /// A guard that protects some data. 81 /// 82 /// When the guard is dropped, the next ticket will be processed. 83 pub struct TicketMutexGuard<'a, T: ?Sized + 'a> { 84 next_serving: &'a AtomicUsize, 85 ticket: usize, 86 data: &'a mut T, 87 } 88 89 unsafe impl<T: ?Sized + Send, R> Sync for TicketMutex<T, R> {} 90 unsafe impl<T: ?Sized + Send, R> Send for TicketMutex<T, R> {} 91 92 impl<T, R> TicketMutex<T, R> { 93 /// Creates a new [`TicketMutex`] wrapping the supplied data. 94 /// 95 /// # Example 96 /// 97 /// ``` 98 /// use spin::mutex::TicketMutex; 99 /// 100 /// static MUTEX: TicketMutex<()> = TicketMutex::<_>::new(()); 101 /// 102 /// fn demo() { 103 /// let lock = MUTEX.lock(); 104 /// // do something with lock 105 /// drop(lock); 106 /// } 107 /// ``` 108 #[inline(always)] new(data: T) -> Self109 pub const fn new(data: T) -> Self { 110 Self { 111 phantom: PhantomData, 112 next_ticket: AtomicUsize::new(0), 113 next_serving: AtomicUsize::new(0), 114 data: UnsafeCell::new(data), 115 } 116 } 117 118 /// Consumes this [`TicketMutex`] and unwraps the underlying data. 119 /// 120 /// # Example 121 /// 122 /// ``` 123 /// let lock = spin::mutex::TicketMutex::<_>::new(42); 124 /// assert_eq!(42, lock.into_inner()); 125 /// ``` 126 #[inline(always)] into_inner(self) -> T127 pub fn into_inner(self) -> T { 128 self.data.into_inner() 129 } 130 /// Returns a mutable pointer to the underying data. 131 /// 132 /// This is mostly meant to be used for applications which require manual unlocking, but where 133 /// storing both the lock and the pointer to the inner data gets inefficient. 134 /// 135 /// # Example 136 /// ``` 137 /// let lock = spin::mutex::SpinMutex::<_>::new(42); 138 /// 139 /// unsafe { 140 /// core::mem::forget(lock.lock()); 141 /// 142 /// assert_eq!(lock.as_mut_ptr().read(), 42); 143 /// lock.as_mut_ptr().write(58); 144 /// 145 /// lock.force_unlock(); 146 /// } 147 /// 148 /// assert_eq!(*lock.lock(), 58); 149 /// 150 /// ``` 151 #[inline(always)] as_mut_ptr(&self) -> *mut T152 pub fn as_mut_ptr(&self) -> *mut T { 153 self.data.get() 154 } 155 } 156 157 impl<T: ?Sized + fmt::Debug, R> fmt::Debug for TicketMutex<T, R> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 159 match self.try_lock() { 160 Some(guard) => write!(f, "Mutex {{ data: ") 161 .and_then(|()| (&*guard).fmt(f)) 162 .and_then(|()| write!(f, "}}")), 163 None => write!(f, "Mutex {{ <locked> }}"), 164 } 165 } 166 } 167 168 impl<T: ?Sized, R: RelaxStrategy> TicketMutex<T, R> { 169 /// Locks the [`TicketMutex`] and returns a guard that permits access to the inner data. 170 /// 171 /// The returned data may be dereferenced for data access 172 /// and the lock will be dropped when the guard falls out of scope. 173 /// 174 /// ``` 175 /// let lock = spin::mutex::TicketMutex::<_>::new(0); 176 /// { 177 /// let mut data = lock.lock(); 178 /// // The lock is now locked and the data can be accessed 179 /// *data += 1; 180 /// // The lock is implicitly dropped at the end of the scope 181 /// } 182 /// ``` 183 #[inline(always)] lock(&self) -> TicketMutexGuard<T>184 pub fn lock(&self) -> TicketMutexGuard<T> { 185 let ticket = self.next_ticket.fetch_add(1, Ordering::Relaxed); 186 187 while self.next_serving.load(Ordering::Acquire) != ticket { 188 R::relax(); 189 } 190 191 TicketMutexGuard { 192 next_serving: &self.next_serving, 193 ticket, 194 // Safety 195 // We know that we are the next ticket to be served, 196 // so there's no other thread accessing the data. 197 // 198 // Every other thread has another ticket number so it's 199 // definitely stuck in the spin loop above. 200 data: unsafe { &mut *self.data.get() }, 201 } 202 } 203 } 204 205 impl<T: ?Sized, R> TicketMutex<T, R> { 206 /// Returns `true` if the lock is currently held. 207 /// 208 /// # Safety 209 /// 210 /// This function provides no synchronization guarantees and so its result should be considered 'out of date' 211 /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. 212 #[inline(always)] is_locked(&self) -> bool213 pub fn is_locked(&self) -> bool { 214 let ticket = self.next_ticket.load(Ordering::Relaxed); 215 self.next_serving.load(Ordering::Relaxed) != ticket 216 } 217 218 /// Force unlock this [`TicketMutex`], by serving the next ticket. 219 /// 220 /// # Safety 221 /// 222 /// This is *extremely* unsafe if the lock is not held by the current 223 /// thread. However, this can be useful in some instances for exposing the 224 /// lock to FFI that doesn't know how to deal with RAII. 225 #[inline(always)] force_unlock(&self)226 pub unsafe fn force_unlock(&self) { 227 self.next_serving.fetch_add(1, Ordering::Release); 228 } 229 230 /// Try to lock this [`TicketMutex`], returning a lock guard if successful. 231 /// 232 /// # Example 233 /// 234 /// ``` 235 /// let lock = spin::mutex::TicketMutex::<_>::new(42); 236 /// 237 /// let maybe_guard = lock.try_lock(); 238 /// assert!(maybe_guard.is_some()); 239 /// 240 /// // `maybe_guard` is still held, so the second call fails 241 /// let maybe_guard2 = lock.try_lock(); 242 /// assert!(maybe_guard2.is_none()); 243 /// ``` 244 #[inline(always)] try_lock(&self) -> Option<TicketMutexGuard<T>>245 pub fn try_lock(&self) -> Option<TicketMutexGuard<T>> { 246 let ticket = self 247 .next_ticket 248 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |ticket| { 249 if self.next_serving.load(Ordering::Acquire) == ticket { 250 Some(ticket + 1) 251 } else { 252 None 253 } 254 }); 255 256 ticket.ok().map(|ticket| TicketMutexGuard { 257 next_serving: &self.next_serving, 258 ticket, 259 // Safety 260 // We have a ticket that is equal to the next_serving ticket, so we know: 261 // - that no other thread can have the same ticket id as this thread 262 // - that we are the next one to be served so we have exclusive access to the data 263 data: unsafe { &mut *self.data.get() }, 264 }) 265 } 266 267 /// Returns a mutable reference to the underlying data. 268 /// 269 /// Since this call borrows the [`TicketMutex`] mutably, and a mutable reference is guaranteed to be exclusive in 270 /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As 271 /// such, this is a 'zero-cost' operation. 272 /// 273 /// # Example 274 /// 275 /// ``` 276 /// let mut lock = spin::mutex::TicketMutex::<_>::new(0); 277 /// *lock.get_mut() = 10; 278 /// assert_eq!(*lock.lock(), 10); 279 /// ``` 280 #[inline(always)] get_mut(&mut self) -> &mut T281 pub fn get_mut(&mut self) -> &mut T { 282 // Safety: 283 // We know that there are no other references to `self`, 284 // so it's safe to return a exclusive reference to the data. 285 unsafe { &mut *self.data.get() } 286 } 287 } 288 289 impl<T: ?Sized + Default, R> Default for TicketMutex<T, R> { default() -> Self290 fn default() -> Self { 291 Self::new(Default::default()) 292 } 293 } 294 295 impl<T, R> From<T> for TicketMutex<T, R> { from(data: T) -> Self296 fn from(data: T) -> Self { 297 Self::new(data) 298 } 299 } 300 301 impl<'a, T: ?Sized> TicketMutexGuard<'a, T> { 302 /// Leak the lock guard, yielding a mutable reference to the underlying data. 303 /// 304 /// Note that this function will permanently lock the original [`TicketMutex`]. 305 /// 306 /// ``` 307 /// let mylock = spin::mutex::TicketMutex::<_>::new(0); 308 /// 309 /// let data: &mut i32 = spin::mutex::TicketMutexGuard::leak(mylock.lock()); 310 /// 311 /// *data = 1; 312 /// assert_eq!(*data, 1); 313 /// ``` 314 #[inline(always)] leak(this: Self) -> &'a mut T315 pub fn leak(this: Self) -> &'a mut T { 316 let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing 317 core::mem::forget(this); 318 unsafe { &mut *data } 319 } 320 } 321 322 impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for TicketMutexGuard<'a, T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result323 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 324 fmt::Debug::fmt(&**self, f) 325 } 326 } 327 328 impl<'a, T: ?Sized + fmt::Display> fmt::Display for TicketMutexGuard<'a, T> { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 330 fmt::Display::fmt(&**self, f) 331 } 332 } 333 334 impl<'a, T: ?Sized> Deref for TicketMutexGuard<'a, T> { 335 type Target = T; deref(&self) -> &T336 fn deref(&self) -> &T { 337 self.data 338 } 339 } 340 341 impl<'a, T: ?Sized> DerefMut for TicketMutexGuard<'a, T> { deref_mut(&mut self) -> &mut T342 fn deref_mut(&mut self) -> &mut T { 343 self.data 344 } 345 } 346 347 impl<'a, T: ?Sized> Drop for TicketMutexGuard<'a, T> { drop(&mut self)348 fn drop(&mut self) { 349 let new_ticket = self.ticket + 1; 350 self.next_serving.store(new_ticket, Ordering::Release); 351 } 352 } 353 354 #[cfg(feature = "lock_api")] 355 unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for TicketMutex<(), R> { 356 type GuardMarker = lock_api_crate::GuardSend; 357 358 const INIT: Self = Self::new(()); 359 lock(&self)360 fn lock(&self) { 361 // Prevent guard destructor running 362 core::mem::forget(Self::lock(self)); 363 } 364 try_lock(&self) -> bool365 fn try_lock(&self) -> bool { 366 // Prevent guard destructor running 367 Self::try_lock(self).map(core::mem::forget).is_some() 368 } 369 unlock(&self)370 unsafe fn unlock(&self) { 371 self.force_unlock(); 372 } 373 is_locked(&self) -> bool374 fn is_locked(&self) -> bool { 375 Self::is_locked(self) 376 } 377 } 378 379 #[cfg(test)] 380 mod tests { 381 use std::prelude::v1::*; 382 383 use std::sync::atomic::{AtomicUsize, Ordering}; 384 use std::sync::mpsc::channel; 385 use std::sync::Arc; 386 use std::thread; 387 388 type TicketMutex<T> = super::TicketMutex<T>; 389 390 #[derive(Eq, PartialEq, Debug)] 391 struct NonCopy(i32); 392 393 #[test] smoke()394 fn smoke() { 395 let m = TicketMutex::<_>::new(()); 396 drop(m.lock()); 397 drop(m.lock()); 398 } 399 400 #[test] lots_and_lots()401 fn lots_and_lots() { 402 static M: TicketMutex<()> = TicketMutex::<_>::new(()); 403 static mut CNT: u32 = 0; 404 const J: u32 = 1000; 405 const K: u32 = 3; 406 407 fn inc() { 408 for _ in 0..J { 409 unsafe { 410 let _g = M.lock(); 411 CNT += 1; 412 } 413 } 414 } 415 416 let (tx, rx) = channel(); 417 for _ in 0..K { 418 let tx2 = tx.clone(); 419 thread::spawn(move || { 420 inc(); 421 tx2.send(()).unwrap(); 422 }); 423 let tx2 = tx.clone(); 424 thread::spawn(move || { 425 inc(); 426 tx2.send(()).unwrap(); 427 }); 428 } 429 430 drop(tx); 431 for _ in 0..2 * K { 432 rx.recv().unwrap(); 433 } 434 assert_eq!(unsafe { CNT }, J * K * 2); 435 } 436 437 #[test] try_lock()438 fn try_lock() { 439 let mutex = TicketMutex::<_>::new(42); 440 441 // First lock succeeds 442 let a = mutex.try_lock(); 443 assert_eq!(a.as_ref().map(|r| **r), Some(42)); 444 445 // Additional lock fails 446 let b = mutex.try_lock(); 447 assert!(b.is_none()); 448 449 // After dropping lock, it succeeds again 450 ::core::mem::drop(a); 451 let c = mutex.try_lock(); 452 assert_eq!(c.as_ref().map(|r| **r), Some(42)); 453 } 454 455 #[test] test_into_inner()456 fn test_into_inner() { 457 let m = TicketMutex::<_>::new(NonCopy(10)); 458 assert_eq!(m.into_inner(), NonCopy(10)); 459 } 460 461 #[test] test_into_inner_drop()462 fn test_into_inner_drop() { 463 struct Foo(Arc<AtomicUsize>); 464 impl Drop for Foo { 465 fn drop(&mut self) { 466 self.0.fetch_add(1, Ordering::SeqCst); 467 } 468 } 469 let num_drops = Arc::new(AtomicUsize::new(0)); 470 let m = TicketMutex::<_>::new(Foo(num_drops.clone())); 471 assert_eq!(num_drops.load(Ordering::SeqCst), 0); 472 { 473 let _inner = m.into_inner(); 474 assert_eq!(num_drops.load(Ordering::SeqCst), 0); 475 } 476 assert_eq!(num_drops.load(Ordering::SeqCst), 1); 477 } 478 479 #[test] test_mutex_arc_nested()480 fn test_mutex_arc_nested() { 481 // Tests nested mutexes and access 482 // to underlying data. 483 let arc = Arc::new(TicketMutex::<_>::new(1)); 484 let arc2 = Arc::new(TicketMutex::<_>::new(arc)); 485 let (tx, rx) = channel(); 486 let _t = thread::spawn(move || { 487 let lock = arc2.lock(); 488 let lock2 = lock.lock(); 489 assert_eq!(*lock2, 1); 490 tx.send(()).unwrap(); 491 }); 492 rx.recv().unwrap(); 493 } 494 495 #[test] 496 #[ignore = "Android uses panic_abort"] test_mutex_arc_access_in_unwind()497 fn test_mutex_arc_access_in_unwind() { 498 let arc = Arc::new(TicketMutex::<_>::new(1)); 499 let arc2 = arc.clone(); 500 let _ = thread::spawn(move || -> () { 501 struct Unwinder { 502 i: Arc<TicketMutex<i32>>, 503 } 504 impl Drop for Unwinder { 505 fn drop(&mut self) { 506 *self.i.lock() += 1; 507 } 508 } 509 let _u = Unwinder { i: arc2 }; 510 panic!(); 511 }) 512 .join(); 513 let lock = arc.lock(); 514 assert_eq!(*lock, 2); 515 } 516 517 #[test] test_mutex_unsized()518 fn test_mutex_unsized() { 519 let mutex: &TicketMutex<[i32]> = &TicketMutex::<_>::new([1, 2, 3]); 520 { 521 let b = &mut *mutex.lock(); 522 b[0] = 4; 523 b[2] = 5; 524 } 525 let comp: &[i32] = &[4, 2, 5]; 526 assert_eq!(&*mutex.lock(), comp); 527 } 528 529 #[test] is_locked()530 fn is_locked() { 531 let mutex = TicketMutex::<_>::new(()); 532 assert!(!mutex.is_locked()); 533 let lock = mutex.lock(); 534 assert!(mutex.is_locked()); 535 drop(lock); 536 assert!(!mutex.is_locked()); 537 } 538 } 539