1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4 
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::Worker;
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9 
10 #[test]
smoke()11 fn smoke() {
12     let w = Worker::new_lifo();
13     let s = w.stealer();
14     assert_eq!(w.pop(), None);
15     assert_eq!(s.steal(), Empty);
16 
17     w.push(1);
18     assert_eq!(w.pop(), Some(1));
19     assert_eq!(w.pop(), None);
20     assert_eq!(s.steal(), Empty);
21 
22     w.push(2);
23     assert_eq!(s.steal(), Success(2));
24     assert_eq!(s.steal(), Empty);
25     assert_eq!(w.pop(), None);
26 
27     w.push(3);
28     w.push(4);
29     w.push(5);
30     assert_eq!(s.steal(), Success(3));
31     assert_eq!(s.steal(), Success(4));
32     assert_eq!(s.steal(), Success(5));
33     assert_eq!(s.steal(), Empty);
34 
35     w.push(6);
36     w.push(7);
37     w.push(8);
38     w.push(9);
39     assert_eq!(w.pop(), Some(9));
40     assert_eq!(s.steal(), Success(6));
41     assert_eq!(w.pop(), Some(8));
42     assert_eq!(w.pop(), Some(7));
43     assert_eq!(w.pop(), None);
44 }
45 
46 #[test]
is_empty()47 fn is_empty() {
48     let w = Worker::new_lifo();
49     let s = w.stealer();
50 
51     assert!(w.is_empty());
52     w.push(1);
53     assert!(!w.is_empty());
54     w.push(2);
55     assert!(!w.is_empty());
56     let _ = w.pop();
57     assert!(!w.is_empty());
58     let _ = w.pop();
59     assert!(w.is_empty());
60 
61     assert!(s.is_empty());
62     w.push(1);
63     assert!(!s.is_empty());
64     w.push(2);
65     assert!(!s.is_empty());
66     let _ = s.steal();
67     assert!(!s.is_empty());
68     let _ = s.steal();
69     assert!(s.is_empty());
70 }
71 
72 #[test]
spsc()73 fn spsc() {
74     #[cfg(miri)]
75     const STEPS: usize = 500;
76     #[cfg(not(miri))]
77     const STEPS: usize = 50_000;
78 
79     let w = Worker::new_lifo();
80     let s = w.stealer();
81 
82     scope(|scope| {
83         scope.spawn(|_| {
84             for i in 0..STEPS {
85                 loop {
86                     if let Success(v) = s.steal() {
87                         assert_eq!(i, v);
88                         break;
89                     }
90                     #[cfg(miri)]
91                     std::hint::spin_loop();
92                 }
93             }
94 
95             assert_eq!(s.steal(), Empty);
96         });
97 
98         for i in 0..STEPS {
99             w.push(i);
100         }
101     })
102     .unwrap();
103 }
104 
105 #[test]
stampede()106 fn stampede() {
107     const THREADS: usize = 8;
108     #[cfg(miri)]
109     const COUNT: usize = 500;
110     #[cfg(not(miri))]
111     const COUNT: usize = 50_000;
112 
113     let w = Worker::new_lifo();
114 
115     for i in 0..COUNT {
116         w.push(Box::new(i + 1));
117     }
118     let remaining = Arc::new(AtomicUsize::new(COUNT));
119 
120     scope(|scope| {
121         for _ in 0..THREADS {
122             let s = w.stealer();
123             let remaining = remaining.clone();
124 
125             scope.spawn(move |_| {
126                 let mut last = 0;
127                 while remaining.load(SeqCst) > 0 {
128                     if let Success(x) = s.steal() {
129                         assert!(last < *x);
130                         last = *x;
131                         remaining.fetch_sub(1, SeqCst);
132                     }
133                 }
134             });
135         }
136 
137         let mut last = COUNT + 1;
138         while remaining.load(SeqCst) > 0 {
139             if let Some(x) = w.pop() {
140                 assert!(last > *x);
141                 last = *x;
142                 remaining.fetch_sub(1, SeqCst);
143             }
144         }
145     })
146     .unwrap();
147 }
148 
149 #[test]
stress()150 fn stress() {
151     const THREADS: usize = 8;
152     #[cfg(miri)]
153     const COUNT: usize = 500;
154     #[cfg(not(miri))]
155     const COUNT: usize = 50_000;
156 
157     let w = Worker::new_lifo();
158     let done = Arc::new(AtomicBool::new(false));
159     let hits = Arc::new(AtomicUsize::new(0));
160 
161     scope(|scope| {
162         for _ in 0..THREADS {
163             let s = w.stealer();
164             let done = done.clone();
165             let hits = hits.clone();
166 
167             scope.spawn(move |_| {
168                 let w2 = Worker::new_lifo();
169 
170                 while !done.load(SeqCst) {
171                     if let Success(_) = s.steal() {
172                         hits.fetch_add(1, SeqCst);
173                     }
174 
175                     let _ = s.steal_batch(&w2);
176 
177                     if let Success(_) = s.steal_batch_and_pop(&w2) {
178                         hits.fetch_add(1, SeqCst);
179                     }
180 
181                     while w2.pop().is_some() {
182                         hits.fetch_add(1, SeqCst);
183                     }
184                 }
185             });
186         }
187 
188         let mut rng = rand::thread_rng();
189         let mut expected = 0;
190         while expected < COUNT {
191             if rng.gen_range(0..3) == 0 {
192                 while w.pop().is_some() {
193                     hits.fetch_add(1, SeqCst);
194                 }
195             } else {
196                 w.push(expected);
197                 expected += 1;
198             }
199         }
200 
201         while hits.load(SeqCst) < COUNT {
202             while w.pop().is_some() {
203                 hits.fetch_add(1, SeqCst);
204             }
205         }
206         done.store(true, SeqCst);
207     })
208     .unwrap();
209 }
210 
211 #[cfg_attr(miri, ignore)] // Miri is too slow
212 #[test]
no_starvation()213 fn no_starvation() {
214     const THREADS: usize = 8;
215     const COUNT: usize = 50_000;
216 
217     let w = Worker::new_lifo();
218     let done = Arc::new(AtomicBool::new(false));
219     let mut all_hits = Vec::new();
220 
221     scope(|scope| {
222         for _ in 0..THREADS {
223             let s = w.stealer();
224             let done = done.clone();
225             let hits = Arc::new(AtomicUsize::new(0));
226             all_hits.push(hits.clone());
227 
228             scope.spawn(move |_| {
229                 let w2 = Worker::new_lifo();
230 
231                 while !done.load(SeqCst) {
232                     if let Success(_) = s.steal() {
233                         hits.fetch_add(1, SeqCst);
234                     }
235 
236                     let _ = s.steal_batch(&w2);
237 
238                     if let Success(_) = s.steal_batch_and_pop(&w2) {
239                         hits.fetch_add(1, SeqCst);
240                     }
241 
242                     while w2.pop().is_some() {
243                         hits.fetch_add(1, SeqCst);
244                     }
245                 }
246             });
247         }
248 
249         let mut rng = rand::thread_rng();
250         let mut my_hits = 0;
251         loop {
252             for i in 0..rng.gen_range(0..COUNT) {
253                 if rng.gen_range(0..3) == 0 && my_hits == 0 {
254                     while w.pop().is_some() {
255                         my_hits += 1;
256                     }
257                 } else {
258                     w.push(i);
259                 }
260             }
261 
262             if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
263                 break;
264             }
265         }
266         done.store(true, SeqCst);
267     })
268     .unwrap();
269 }
270 
271 #[test]
destructors()272 fn destructors() {
273     #[cfg(miri)]
274     const THREADS: usize = 2;
275     #[cfg(not(miri))]
276     const THREADS: usize = 8;
277     #[cfg(miri)]
278     const COUNT: usize = 500;
279     #[cfg(not(miri))]
280     const COUNT: usize = 50_000;
281     #[cfg(miri)]
282     const STEPS: usize = 100;
283     #[cfg(not(miri))]
284     const STEPS: usize = 1000;
285 
286     struct Elem(usize, Arc<Mutex<Vec<usize>>>);
287 
288     impl Drop for Elem {
289         fn drop(&mut self) {
290             self.1.lock().unwrap().push(self.0);
291         }
292     }
293 
294     let w = Worker::new_lifo();
295     let dropped = Arc::new(Mutex::new(Vec::new()));
296     let remaining = Arc::new(AtomicUsize::new(COUNT));
297 
298     for i in 0..COUNT {
299         w.push(Elem(i, dropped.clone()));
300     }
301 
302     scope(|scope| {
303         for _ in 0..THREADS {
304             let remaining = remaining.clone();
305             let s = w.stealer();
306 
307             scope.spawn(move |_| {
308                 let w2 = Worker::new_lifo();
309                 let mut cnt = 0;
310 
311                 while cnt < STEPS {
312                     if let Success(_) = s.steal() {
313                         cnt += 1;
314                         remaining.fetch_sub(1, SeqCst);
315                     }
316 
317                     let _ = s.steal_batch(&w2);
318 
319                     if let Success(_) = s.steal_batch_and_pop(&w2) {
320                         cnt += 1;
321                         remaining.fetch_sub(1, SeqCst);
322                     }
323 
324                     while w2.pop().is_some() {
325                         cnt += 1;
326                         remaining.fetch_sub(1, SeqCst);
327                     }
328                 }
329             });
330         }
331 
332         for _ in 0..STEPS {
333             if w.pop().is_some() {
334                 remaining.fetch_sub(1, SeqCst);
335             }
336         }
337     })
338     .unwrap();
339 
340     let rem = remaining.load(SeqCst);
341     assert!(rem > 0);
342 
343     {
344         let mut v = dropped.lock().unwrap();
345         assert_eq!(v.len(), COUNT - rem);
346         v.clear();
347     }
348 
349     drop(w);
350 
351     {
352         let mut v = dropped.lock().unwrap();
353         assert_eq!(v.len(), rem);
354         v.sort_unstable();
355         for pair in v.windows(2) {
356             assert_eq!(pair[0] + 1, pair[1]);
357         }
358     }
359 }
360