1 //! Michael-Scott lock-free queue.
2 //!
3 //! Usable with any number of producers and consumers.
4 //!
5 //! Michael and Scott.  Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
6 //! Algorithms.  PODC 1996.  <http://dl.acm.org/citation.cfm?id=248106>
7 //!
8 //! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
9 //! Practical Lock-Free Queue Algorithm. <https://doi.org/10.1007/978-3-540-30232-2_7>
10 
11 use core::mem::MaybeUninit;
12 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
13 
14 use crossbeam_utils::CachePadded;
15 
16 use crate::{unprotected, Atomic, Guard, Owned, Shared};
17 
18 // The representation here is a singly-linked list, with a sentinel node at the front. In general
19 // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or
20 // all `Blocked` (requests for data from blocked threads).
21 #[derive(Debug)]
22 pub(crate) struct Queue<T> {
23     head: CachePadded<Atomic<Node<T>>>,
24     tail: CachePadded<Atomic<Node<T>>>,
25 }
26 
27 struct Node<T> {
28     /// The slot in which a value of type `T` can be stored.
29     ///
30     /// The type of `data` is `MaybeUninit<T>` because a `Node<T>` doesn't always contain a `T`.
31     /// For example, the sentinel node in a queue never contains a value: its slot is always empty.
32     /// Other nodes start their life with a push operation and contain a value until it gets popped
33     /// out. After that such empty nodes get added to the collector for destruction.
34     data: MaybeUninit<T>,
35 
36     next: Atomic<Node<T>>,
37 }
38 
39 // Any particular `T` should never be accessed concurrently, so no need for `Sync`.
40 unsafe impl<T: Send> Sync for Queue<T> {}
41 unsafe impl<T: Send> Send for Queue<T> {}
42 
43 impl<T> Queue<T> {
44     /// Create a new, empty queue.
new() -> Queue<T>45     pub(crate) fn new() -> Queue<T> {
46         let q = Queue {
47             head: CachePadded::new(Atomic::null()),
48             tail: CachePadded::new(Atomic::null()),
49         };
50         let sentinel = Owned::new(Node {
51             data: MaybeUninit::uninit(),
52             next: Atomic::null(),
53         });
54         unsafe {
55             let guard = unprotected();
56             let sentinel = sentinel.into_shared(guard);
57             q.head.store(sentinel, Relaxed);
58             q.tail.store(sentinel, Relaxed);
59             q
60         }
61     }
62 
63     /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on
64     /// success. The queue's `tail` pointer may be updated.
65     #[inline(always)]
push_internal( &self, onto: Shared<'_, Node<T>>, new: Shared<'_, Node<T>>, guard: &Guard, ) -> bool66     fn push_internal(
67         &self,
68         onto: Shared<'_, Node<T>>,
69         new: Shared<'_, Node<T>>,
70         guard: &Guard,
71     ) -> bool {
72         // is `onto` the actual tail?
73         let o = unsafe { onto.deref() };
74         let next = o.next.load(Acquire, guard);
75         if unsafe { next.as_ref().is_some() } {
76             // if not, try to "help" by moving the tail pointer forward
77             let _ = self
78                 .tail
79                 .compare_exchange(onto, next, Release, Relaxed, guard);
80             false
81         } else {
82             // looks like the actual tail; attempt to link in `n`
83             let result = o
84                 .next
85                 .compare_exchange(Shared::null(), new, Release, Relaxed, guard)
86                 .is_ok();
87             if result {
88                 // try to move the tail pointer forward
89                 let _ = self
90                     .tail
91                     .compare_exchange(onto, new, Release, Relaxed, guard);
92             }
93             result
94         }
95     }
96 
97     /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`.
push(&self, t: T, guard: &Guard)98     pub(crate) fn push(&self, t: T, guard: &Guard) {
99         let new = Owned::new(Node {
100             data: MaybeUninit::new(t),
101             next: Atomic::null(),
102         });
103         let new = Owned::into_shared(new, guard);
104 
105         loop {
106             // We push onto the tail, so we'll start optimistically by looking there first.
107             let tail = self.tail.load(Acquire, guard);
108 
109             // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
110             if self.push_internal(tail, new, guard) {
111                 break;
112             }
113         }
114     }
115 
116     /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop.
117     #[inline(always)]
pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()>118     fn pop_internal(&self, guard: &Guard) -> Result<Option<T>, ()> {
119         let head = self.head.load(Acquire, guard);
120         let h = unsafe { head.deref() };
121         let next = h.next.load(Acquire, guard);
122         match unsafe { next.as_ref() } {
123             Some(n) => unsafe {
124                 self.head
125                     .compare_exchange(head, next, Release, Relaxed, guard)
126                     .map(|_| {
127                         let tail = self.tail.load(Relaxed, guard);
128                         // Advance the tail so that we don't retire a pointer to a reachable node.
129                         if head == tail {
130                             let _ = self
131                                 .tail
132                                 .compare_exchange(tail, next, Release, Relaxed, guard);
133                         }
134                         guard.defer_destroy(head);
135                         Some(n.data.assume_init_read())
136                     })
137                     .map_err(|_| ())
138             },
139             None => Ok(None),
140         }
141     }
142 
143     /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue
144     /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop.
145     #[inline(always)]
pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()> where T: Sync, F: Fn(&T) -> bool,146     fn pop_if_internal<F>(&self, condition: F, guard: &Guard) -> Result<Option<T>, ()>
147     where
148         T: Sync,
149         F: Fn(&T) -> bool,
150     {
151         let head = self.head.load(Acquire, guard);
152         let h = unsafe { head.deref() };
153         let next = h.next.load(Acquire, guard);
154         match unsafe { next.as_ref() } {
155             Some(n) if condition(unsafe { &*n.data.as_ptr() }) => unsafe {
156                 self.head
157                     .compare_exchange(head, next, Release, Relaxed, guard)
158                     .map(|_| {
159                         let tail = self.tail.load(Relaxed, guard);
160                         // Advance the tail so that we don't retire a pointer to a reachable node.
161                         if head == tail {
162                             let _ = self
163                                 .tail
164                                 .compare_exchange(tail, next, Release, Relaxed, guard);
165                         }
166                         guard.defer_destroy(head);
167                         Some(n.data.assume_init_read())
168                     })
169                     .map_err(|_| ())
170             },
171             None | Some(_) => Ok(None),
172         }
173     }
174 
175     /// Attempts to dequeue from the front.
176     ///
177     /// Returns `None` if the queue is observed to be empty.
try_pop(&self, guard: &Guard) -> Option<T>178     pub(crate) fn try_pop(&self, guard: &Guard) -> Option<T> {
179         loop {
180             if let Ok(head) = self.pop_internal(guard) {
181                 return head;
182             }
183         }
184     }
185 
186     /// Attempts to dequeue from the front, if the item satisfies the given condition.
187     ///
188     /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given
189     /// condition.
try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T> where T: Sync, F: Fn(&T) -> bool,190     pub(crate) fn try_pop_if<F>(&self, condition: F, guard: &Guard) -> Option<T>
191     where
192         T: Sync,
193         F: Fn(&T) -> bool,
194     {
195         loop {
196             if let Ok(head) = self.pop_if_internal(&condition, guard) {
197                 return head;
198             }
199         }
200     }
201 }
202 
203 impl<T> Drop for Queue<T> {
drop(&mut self)204     fn drop(&mut self) {
205         unsafe {
206             let guard = unprotected();
207 
208             while self.try_pop(guard).is_some() {}
209 
210             // Destroy the remaining sentinel node.
211             let sentinel = self.head.load(Relaxed, guard);
212             drop(sentinel.into_owned());
213         }
214     }
215 }
216 
#[cfg(all(test, not(crossbeam_loom)))]
mod test {
    use super::*;
    use crate::pin;
    use crossbeam_utils::thread;

    /// Test-only wrapper around `super::Queue` that pins the current thread for every operation,
    /// so the tests do not have to pass guards around.
    struct Queue<T> {
        queue: super::Queue<T>,
    }

    impl<T> Queue<T> {
        /// Creates an empty wrapped queue.
        pub(crate) fn new() -> Queue<T> {
            Queue {
                queue: super::Queue::new(),
            }
        }

        /// Pushes `t` under a freshly pinned guard.
        pub(crate) fn push(&self, t: T) {
            let guard = &pin();
            self.queue.push(t, guard);
        }

        /// Returns `true` if the sentinel currently has no successor.
        pub(crate) fn is_empty(&self) -> bool {
            let guard = &pin();
            let head = self.queue.head.load(Acquire, guard);
            let h = unsafe { head.deref() };
            h.next.load(Acquire, guard).is_null()
        }

        /// Pops the front element under a freshly pinned guard, if there is one.
        pub(crate) fn try_pop(&self) -> Option<T> {
            let guard = &pin();
            self.queue.try_pop(guard)
        }

        /// Spins on `try_pop` until an element becomes available and returns it.
        pub(crate) fn pop(&self) -> T {
            loop {
                if let Some(t) = self.try_pop() {
                    return t;
                }
            }
        }
    }

    // Number of elements moved through the queue by the concurrent tests (reduced under Miri).
    #[cfg(miri)]
    const CONC_COUNT: i64 = 1000;
    #[cfg(not(miri))]
    const CONC_COUNT: i64 = 1000000;

    #[test]
    fn push_try_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(37));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_2() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        q.push(48);
        assert_eq!(q.try_pop(), Some(37));
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(48));
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i)
        }
        assert!(!q.is_empty());
        for i in 0..200 {
            assert_eq!(q.try_pop(), Some(i));
        }
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_1() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        q.push(37);
        assert!(!q.is_empty());
        assert_eq!(q.pop(), 37);
        assert!(q.is_empty());
    }

    #[test]
    fn push_pop_2() {
        let q: Queue<i64> = Queue::new();
        q.push(37);
        q.push(48);
        assert_eq!(q.pop(), 37);
        assert_eq!(q.pop(), 48);
    }

    #[test]
    fn push_pop_many_seq() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        for i in 0..200 {
            q.push(i)
        }
        assert!(!q.is_empty());
        for i in 0..200 {
            assert_eq!(q.pop(), i);
        }
        assert!(q.is_empty());
    }

    #[test]
    fn push_try_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            // Consumer: must observe 0, 1, 2, ... in exactly that order.
            scope.spawn(|_| {
                let mut next = 0;

                while next < CONC_COUNT {
                    if let Some(elem) = q.try_pop() {
                        assert_eq!(elem, next);
                        next += 1;
                    }
                }
            });

            // Producer runs on the scope's own thread.
            for i in 0..CONC_COUNT {
                q.push(i)
            }
        })
        .unwrap();
    }

    #[test]
    fn push_try_pop_many_spmc() {
        // Each consumer makes at most `CONC_COUNT` pop attempts and checks that the values it
        // receives are strictly increasing.
        fn recv(_t: i32, q: &Queue<i64>) {
            let mut cur = -1;
            for _i in 0..CONC_COUNT {
                if let Some(elem) = q.try_pop() {
                    assert!(elem > cur);
                    cur = elem;

                    if cur == CONC_COUNT - 1 {
                        break;
                    }
                }
            }
        }

        let q: Queue<i64> = Queue::new();
        assert!(q.is_empty());
        thread::scope(|scope| {
            for i in 0..3 {
                let q = &q;
                scope.spawn(move |_| recv(i, q));
            }

            scope.spawn(|_| {
                for i in 0..CONC_COUNT {
                    q.push(i);
                }
            });
        })
        .unwrap();
    }

    #[test]
    fn push_try_pop_many_mpmc() {
        // Tags each value with the kind of producer that pushed it.
        enum LR {
            Left(i64),
            Right(i64),
        }

        let q: Queue<LR> = Queue::new();
        assert!(q.is_empty());

        thread::scope(|scope| {
            for _t in 0..2 {
                // NOTE(review): `CONC_COUNT - 1..CONC_COUNT` yields a single value, so each
                // producer pushes exactly one element; confirm whether `0..CONC_COUNT` was meant.
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Left(i))
                    }
                });
                scope.spawn(|_| {
                    for i in CONC_COUNT - 1..CONC_COUNT {
                        q.push(LR::Right(i))
                    }
                });
                scope.spawn(|_| {
                    let mut vl = vec![];
                    let mut vr = vec![];
                    for _i in 0..CONC_COUNT {
                        match q.try_pop() {
                            Some(LR::Left(x)) => vl.push(x),
                            Some(LR::Right(x)) => vr.push(x),
                            _ => {}
                        }
                    }

                    // The values received from each producer kind must already be sorted.
                    let mut vl2 = vl.clone();
                    let mut vr2 = vr.clone();
                    vl2.sort_unstable();
                    vr2.sort_unstable();

                    assert_eq!(vl, vl2);
                    assert_eq!(vr, vr2);
                });
            }
        })
        .unwrap();
    }

    #[test]
    fn push_pop_many_spsc() {
        let q: Queue<i64> = Queue::new();

        thread::scope(|scope| {
            // Consumer: spins in `pop` and must observe 0, 1, 2, ... in order.
            scope.spawn(|_| {
                let mut next = 0;
                while next < CONC_COUNT {
                    assert_eq!(q.pop(), next);
                    next += 1;
                }
            });

            for i in 0..CONC_COUNT {
                q.push(i)
            }
        })
        .unwrap();
        assert!(q.is_empty());
    }

    #[test]
    fn is_empty_dont_pop() {
        // `is_empty` must not consume anything: calling it twice leaves the element in place.
        let q: Queue<i64> = Queue::new();
        q.push(20);
        q.push(20);
        assert!(!q.is_empty());
        assert!(!q.is_empty());
        assert!(q.try_pop().is_some());
    }
}
469