1 //! Michael-Scott lock-free queue. 2 //! 3 //! Usable with any number of producers and consumers. 4 //! 5 //! Michael and Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue 6 //! Algorithms. PODC 1996. <http://dl.acm.org/citation.cfm?id=248106> 7 //! 8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a 9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7> 10 11 use core::mem::MaybeUninit; 12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release}; 13 14 use crossbeam_utils::CachePadded; 15 16 use crate::{unprotected, Atomic, Guard, Owned, Shared}; 17 18 // The representation here is a singly-linked list, with a sentinel node at the front. In general 19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or 20 // all `Blocked` (requests for data from blocked threads). 21 #[derive(Debug)] 22 pub(crate) struct Queue<T> { 23 head: CachePadded<Atomic<Node<T>>>, 24 tail: CachePadded<Atomic<Node<T>>>, 25 } 26 27 struct Node<T> { 28 /// The slot in which a value of type `T` can be stored. 29 /// 30 /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`. 31 /// For example, the sentinel node in a queue never contains a value: its slot is always empty. 32 /// Other nodes start their life with a push operation and contain a value until it gets popped 33 /// out. After that such empty nodes get added to the collector for destruction. 34 data: MaybeUninit<T>, 35 36 next: Atomic<Node<T>>, 37 } 38 39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`. 40 unsafe impl<T: Send> Sync for Queue<T> {} 41 unsafe impl<T: Send> Send for Queue<T> {} 42 43 impl<T> Queue<T> { 44 /// Create a new, empty queue. new() -> Queue<T>45 pub(crate) fn new() -> Queue<T> { 46 let q = Queue { 47 head: CachePadded::new(Atomic::null()), 48 tail: CachePadded::new(Atomic::null()), 49 }; 50 let sentinel = Owned::new(Node { 51 data: MaybeUninit::uninit(), 52 next: Atomic::null(), 53 }); 54 unsafe { 55 let guard = unprotected(); 56 let sentinel = sentinel.into_shared(guard); 57 q.head.store(sentinel, Relaxed); 58 q.tail.store(sentinel, Relaxed); 59 q 60 } 61 } 62 63 /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on 64 /// success. The queue's `tail` pointer may be updated. 65 #[inline(always)] push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66 fn push_internal( 67 &self, 68 onto: Shared<'_, Node<T>>, 69 new: Shared<'_, Node<T>>, 70 guard: &Guard, 71 ) -> bool { 72 // is `onto` the actual tail? 73 let o = unsafe { onto.deref() }; 74 let next = o.next.load(Acquire, guard); 75 if unsafe { next.as_ref().is_some() } { 76 // if not, try to "help" by moving the tail pointer forward 77 let _ = self 78 .tail 79 .compare_exchange(onto, next, Release, Relaxed, guard); 80 false 81 } else { 82 // looks like the actual tail; attempt to link in `n` 83 let result = o 84 .next 85 .compare_exchange(Shared::null(), new, Release, Relaxed, guard) 86 .is_ok(); 87 if result { 88 // try to move the tail pointer forward 89 let _ = self 90 .tail 91 .compare_exchange(onto, new, Release, Relaxed, guard); 92 } 93 result 94 } 95 } 96 97 /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. push(&self, t: T, guard: &Guard)98 pub(crate) fn push(&self, t: T, guard: &Guard) { 99 let new = Owned::new(Node { 100 data: MaybeUninit::new(t), 101 next: Atomic::null(), 102 }); 103 let new = Owned::into_shared(new, guard); 104 105 loop { 106 // We push onto the tail, so we'll start optimistically by looking there first. 107 let tail = self.tail.load(Acquire, guard); 108 109 // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. 110 if self.push_internal(tail, new, guard) { 111 break; 112 } 113 } 114 } 115 116 /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. 117 #[inline(always)] pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118 fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> { 119 let head = self.head.load(Acquire, guard); 120 let h = unsafe { head.deref() }; 121 let next = h.next.load(Acquire, guard); 122 match unsafe { next.as_ref() } { 123 Some(n) => unsafe { 124 self.head 125 .compare_exchange(head, next, Release, Relaxed, guard) 126 .map(|_| { 127 let tail = self.tail.load(Relaxed, guard); 128 // Advance the tail so that we don't retire a pointer to a reachable node. 129 if head == tail { 130 let _ = self 131 .tail 132 .compare_exchange(tail, next, Release, Relaxed, guard); 133 } 134 guard.defer_destroy(head); 135 Some(n.data.assume_init_read()) 136 }) 137 .map_err(|_| ()) 138 }, 139 None => Ok(None), 140 } 141 } 142 143 /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue 144 /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. 145 #[inline(always)] pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,146 fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> 147 where 148 T: Sync, 149 F: Fn(&T) -> bool, 150 { 151 let head = self.head.load(Acquire, guard); 152 let h = unsafe { head.deref() }; 153 let next = h.next.load(Acquire, guard); 154 match unsafe { next.as_ref() } { 155 Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe { 156 self.head 157 .compare_exchange(head, next, Release, Relaxed, guard) 158 .map(|_| { 159 let tail = self.tail.load(Relaxed, guard); 160 // Advance the tail so that we don't retire a pointer to a reachable node. 161 if head == tail { 162 let _ = self 163 .tail 164 .compare_exchange(tail, next, Release, Relaxed, guard); 165 } 166 guard.defer_destroy(head); 167 Some(n.data.assume_init_read()) 168 }) 169 .map_err(|_| ()) 170 }, 171 None | Some(_) => Ok(None), 172 } 173 } 174 175 /// Attempts to dequeue from the front. 176 /// 177 /// Returns `None` if the queue is observed to be empty. try_pop(&self, guard: &Guard) -> Option<T>178 pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> { 179 loop { 180 if let Ok(head) = self.pop_internal(guard) { 181 return head; 182 } 183 } 184 } 185 186 /// Attempts to dequeue from the front, if the item satisfies the given condition. 187 /// 188 /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given 189 /// condition. try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,190 pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> 191 where 192 T: Sync, 193 F: Fn(&T) -> bool, 194 { 195 loop { 196 if let Ok(head) = self.pop_if_internal(&condition, guard) { 197 return head; 198 } 199 } 200 } 201 } 202 203 impl<T> Drop for Queue<T> { drop(&mut self)204 fn drop(&mut self) { 205 unsafe { 206 let guard = unprotected(); 207 208 while self.try_pop(guard).is_some() {} 209 210 // Destroy the remaining sentinel node. 211 let sentinel = self.head.load(Relaxed, guard); 212 drop(sentinel.into_owned()); 213 } 214 } 215 } 216 217 #[cfg(all(test, not(crossbeam_loom)))] 218 mod test { 219 use super::*; 220 use crate::pin; 221 use crossbeam_utils::thread; 222 223 struct Queue<T> { 224 queue: super::Queue<T>, 225 } 226 227 impl<T> Queue<T> { new() -> Queue<T>228 pub(crate) fn new() -> Queue<T> { 229 Queue { 230 queue: super::Queue::new(), 231 } 232 } 233 push(&self, t: T)234 pub(crate) fn push(&self, t: T) { 235 let guard = &pin(); 236 self.queue.push(t, guard); 237 } 238 is_empty(&self) -> bool239 pub(crate) fn is_empty(&self) -> bool { 240 let guard = &pin(); 241 let head = self.queue.head.load(Acquire, guard); 242 let h = unsafe { head.deref() }; 243 h.next.load(Acquire, guard).is_null() 244 } 245 try_pop(&self) -> Option<T>246 pub(crate) fn try_pop(&self) -> Option<T> { 247 let guard = &pin(); 248 self.queue.try_pop(guard) 249 } 250 pop(&self) -> T251 pub(crate) fn pop(&self) -> T { 252 loop { 253 match self.try_pop() { 254 None => continue, 255 Some(t) => return t, 256 } 257 } 258 } 259 } 260 261 #[cfg(miri)] 262 const CONC_COUNT: i64 = 1000; 263 #[cfg(not(miri))] 264 const CONC_COUNT: i64 = 1000000; 265 266 #[test] push_try_pop_1()267 fn push_try_pop_1() { 268 let q: Queue<i64> = Queue::new(); 269 assert!(q.is_empty()); 270 q.push(37); 271 assert!(!q.is_empty()); 272 assert_eq!(q.try_pop(), Some(37)); 273 assert!(q.is_empty()); 274 } 275 276 #[test] push_try_pop_2()277 fn push_try_pop_2() { 278 let q: Queue<i64> = Queue::new(); 279 assert!(q.is_empty()); 280 q.push(37); 281 q.push(48); 282 assert_eq!(q.try_pop(), Some(37)); 283 assert!(!q.is_empty()); 284 assert_eq!(q.try_pop(), Some(48)); 285 assert!(q.is_empty()); 286 } 287 288 #[test] push_try_pop_many_seq()289 fn push_try_pop_many_seq() { 290 let q: Queue<i64> = Queue::new(); 291 assert!(q.is_empty()); 292 for i in 0..200 { 293 q.push(i) 294 } 295 assert!(!q.is_empty()); 296 for i in 0..200 { 297 assert_eq!(q.try_pop(), Some(i)); 298 } 299 assert!(q.is_empty()); 300 } 301 302 #[test] push_pop_1()303 fn push_pop_1() { 304 let q: Queue<i64> = Queue::new(); 305 assert!(q.is_empty()); 306 q.push(37); 307 assert!(!q.is_empty()); 308 assert_eq!(q.pop(), 37); 309 assert!(q.is_empty()); 310 } 311 312 #[test] push_pop_2()313 fn push_pop_2() { 314 let q: Queue<i64> = Queue::new(); 315 q.push(37); 316 q.push(48); 317 assert_eq!(q.pop(), 37); 318 assert_eq!(q.pop(), 48); 319 } 320 321 #[test] push_pop_many_seq()322 fn push_pop_many_seq() { 323 let q: Queue<i64> = Queue::new(); 324 assert!(q.is_empty()); 325 for i in 0..200 { 326 q.push(i) 327 } 328 assert!(!q.is_empty()); 329 for i in 0..200 { 330 assert_eq!(q.pop(), i); 331 } 332 assert!(q.is_empty()); 333 } 334 335 #[test] push_try_pop_many_spsc()336 fn push_try_pop_many_spsc() { 337 let q: Queue<i64> = Queue::new(); 338 assert!(q.is_empty()); 339 340 thread::scope(|scope| { 341 scope.spawn(|_| { 342 let mut next = 0; 343 344 while next < CONC_COUNT { 345 if let Some(elem) = q.try_pop() { 346 assert_eq!(elem, next); 347 next += 1; 348 } 349 } 350 }); 351 352 for i in 0..CONC_COUNT { 353 q.push(i) 354 } 355 }) 356 .unwrap(); 357 } 358 359 #[test] push_try_pop_many_spmc()360 fn push_try_pop_many_spmc() { 361 fn recv(_t: i32, q: &Queue<i64>) { 362 let mut cur = -1; 363 for _i in 0..CONC_COUNT { 364 if let Some(elem) = q.try_pop() { 365 assert!(elem > cur); 366 cur = elem; 367 368 if cur == CONC_COUNT - 1 { 369 break; 370 } 371 } 372 } 373 } 374 375 let q: Queue<i64> = Queue::new(); 376 assert!(q.is_empty()); 377 thread::scope(|scope| { 378 for i in 0..3 { 379 let q = &q; 380 scope.spawn(move |_| recv(i, q)); 381 } 382 383 scope.spawn(|_| { 384 for i in 0..CONC_COUNT { 385 q.push(i); 386 } 387 }); 388 }) 389 .unwrap(); 390 } 391 392 #[test] push_try_pop_many_mpmc()393 fn push_try_pop_many_mpmc() { 394 enum LR { 395 Left(i64), 396 Right(i64), 397 } 398 399 let q: Queue<LR> = Queue::new(); 400 assert!(q.is_empty()); 401 402 thread::scope(|scope| { 403 for _t in 0..2 { 404 scope.spawn(|_| { 405 for i in CONC_COUNT - 1..CONC_COUNT { 406 q.push(LR::Left(i)) 407 } 408 }); 409 scope.spawn(|_| { 410 for i in CONC_COUNT - 1..CONC_COUNT { 411 q.push(LR::Right(i)) 412 } 413 }); 414 scope.spawn(|_| { 415 let mut vl = vec![]; 416 let mut vr = vec![]; 417 for _i in 0..CONC_COUNT { 418 match q.try_pop() { 419 Some(LR::Left(x)) => vl.push(x), 420 Some(LR::Right(x)) => vr.push(x), 421 _ => {} 422 } 423 } 424 425 let mut vl2 = vl.clone(); 426 let mut vr2 = vr.clone(); 427 vl2.sort_unstable(); 428 vr2.sort_unstable(); 429 430 assert_eq!(vl, vl2); 431 assert_eq!(vr, vr2); 432 }); 433 } 434 }) 435 .unwrap(); 436 } 437 438 #[test] push_pop_many_spsc()439 fn push_pop_many_spsc() { 440 let q: Queue<i64> = Queue::new(); 441 442 thread::scope(|scope| { 443 scope.spawn(|_| { 444 let mut next = 0; 445 while next < CONC_COUNT { 446 assert_eq!(q.pop(), next); 447 next += 1; 448 } 449 }); 450 451 for i in 0..CONC_COUNT { 452 q.push(i) 453 } 454 }) 455 .unwrap(); 456 assert!(q.is_empty()); 457 } 458 459 #[test] is_empty_dont_pop()460 fn is_empty_dont_pop() { 461 let q: Queue<i64> = Queue::new(); 462 q.push(20); 463 q.push(20); 464 assert!(!q.is_empty()); 465 assert!(!q.is_empty()); 466 assert!(q.try_pop().is_some()); 467 } 468 } 469