1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4 
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::Worker;
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9 
#[test]
fn smoke() {
    // Single-threaded sanity check of push / pop / steal on a FIFO worker.
    let worker = Worker::new_fifo();
    let stealer = worker.stealer();

    // A freshly created queue yields nothing from either handle.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item, taken back by the worker itself.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item, taken by the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Several items come back out of the stealer in insertion order.
    for item in 3..=5 {
        worker.push(item);
    }
    for item in 3..=5 {
        assert_eq!(stealer.steal(), Success(item));
    }
    assert_eq!(stealer.steal(), Empty);

    // Interleaving pop and steal still hands items out in insertion order.
    for item in 6..=9 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(6));
    assert_eq!(stealer.steal(), Success(7));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(worker.pop(), None);
}
45 
#[test]
fn is_empty() {
    // `is_empty` must track the queue contents from both the worker handle
    // and the stealer handle.
    let worker = Worker::new_fifo();
    let stealer = worker.stealer();

    // Observed through the worker, draining with `pop`.
    assert!(worker.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    let _ = worker.pop();
    assert!(!worker.is_empty());
    let _ = worker.pop();
    assert!(worker.is_empty());

    // Observed through the stealer, draining with `steal`.
    assert!(stealer.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    let _ = stealer.steal();
    assert!(!stealer.is_empty());
    let _ = stealer.steal();
    assert!(stealer.is_empty());
}
71 
#[test]
fn spsc() {
    // Fewer iterations when running under Miri.
    #[cfg(miri)]
    const STEPS: usize = 500;
    #[cfg(not(miri))]
    const STEPS: usize = 50_000;

    let worker = Worker::new_fifo();
    let stealer = worker.stealer();

    scope(|sc| {
        // Consumer: a single stealer must observe 0, 1, 2, ... in order.
        sc.spawn(|_| {
            for expected in 0..STEPS {
                // Spin until a steal actually succeeds; other outcomes are
                // simply retried.
                let got = loop {
                    if let Success(v) = stealer.steal() {
                        break v;
                    }
                };
                assert_eq!(expected, got);
            }

            // Everything has been consumed by now.
            assert_eq!(stealer.steal(), Empty);
        });

        // Producer: the current thread pushes the sequence.
        for item in 0..STEPS {
            worker.push(item);
        }
    })
    .unwrap();
}
102 
#[test]
fn stampede() {
    const THREADS: usize = 8;
    // Fewer items when running under Miri.
    #[cfg(miri)]
    const COUNT: usize = 500;
    #[cfg(not(miri))]
    const COUNT: usize = 50_000;

    // Pre-fill the queue with boxed values 1..=COUNT.
    let worker = Worker::new_fifo();
    for value in 1..=COUNT {
        worker.push(Box::new(value));
    }
    // Number of items not yet taken by any thread.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let remaining = Arc::clone(&remaining);

            sc.spawn(move |_| {
                // Every consumer must see strictly increasing values.
                let mut prev = 0;
                while remaining.load(SeqCst) > 0 {
                    if let Success(boxed) = stealer.steal() {
                        assert!(prev < *boxed);
                        prev = *boxed;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owning thread competes with the stealers via `pop`.
        let mut prev = 0;
        while remaining.load(SeqCst) > 0 {
            if let Some(boxed) = worker.pop() {
                assert!(prev < *boxed);
                prev = *boxed;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
146 
#[test]
fn stress() {
    const THREADS: usize = 8;
    // Fewer items when running under Miri.
    #[cfg(miri)]
    const COUNT: usize = 500;
    #[cfg(not(miri))]
    const COUNT: usize = 50_000;

    let worker = Worker::new_fifo();
    // Set by the main thread once every pushed item has been accounted for.
    let done = Arc::new(AtomicBool::new(false));
    // Total number of items taken out of the queue by any thread.
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::clone(&hits);

            sc.spawn(move |_| {
                // Destination queue for the batch-steal operations.
                let local = Worker::new_fifo();

                while !done.load(SeqCst) {
                    if stealer.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if stealer.steal_batch_and_pop(&local).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        // Pops from the main queue until it reports nothing, counting each item.
        let drain_main_queue = || {
            while worker.pop().is_some() {
                hits.fetch_add(1, SeqCst);
            }
        };

        // Randomly interleave pushing new items with draining the queue.
        let mut rng = rand::thread_rng();
        let mut pushed = 0;
        while pushed < COUNT {
            if rng.gen_range(0..3) == 0 {
                drain_main_queue();
            } else {
                worker.push(pushed);
                pushed += 1;
            }
        }

        // Keep draining until every pushed item has been counted somewhere.
        while hits.load(SeqCst) < COUNT {
            drain_main_queue();
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
208 
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_fifo();
    // Tells the stealer threads to stop.
    let done = Arc::new(AtomicBool::new(false));
    // One hit counter per stealer thread.
    let mut per_thread_hits = Vec::new();

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::new(AtomicUsize::new(0));
            per_thread_hits.push(Arc::clone(&hits));

            sc.spawn(move |_| {
                // Destination queue for the batch-steal operations.
                let local = Worker::new_fifo();

                while !done.load(SeqCst) {
                    if stealer.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if stealer.steal_batch_and_pop(&local).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        // Items the main thread managed to pop itself.
        let mut own_hits = 0;
        loop {
            // One round: a random number of steps, each either a drain
            // (only while the main thread has no hit yet) or a push.
            for item in 0..rng.gen_range(0..COUNT) {
                if rng.gen_range(0..3) == 0 && own_hits == 0 {
                    while worker.pop().is_some() {
                        own_hits += 1;
                    }
                } else {
                    worker.push(item);
                }
            }

            // Finish only once the main thread and every stealer got something.
            if own_hits == 0 {
                continue;
            }
            if per_thread_hits.iter().all(|h| h.load(SeqCst) > 0) {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
268 
#[test]
fn destructors() {
    // Smaller workload when running under Miri.
    #[cfg(miri)]
    const THREADS: usize = 2;
    #[cfg(not(miri))]
    const THREADS: usize = 8;
    #[cfg(miri)]
    const COUNT: usize = 500;
    #[cfg(not(miri))]
    const COUNT: usize = 50_000;
    #[cfg(miri)]
    const STEPS: usize = 100;
    #[cfg(not(miri))]
    const STEPS: usize = 1000;

    // An element that appends its id to a shared log when it is dropped.
    struct Elem(usize, Arc<Mutex<Vec<usize>>>);

    impl Drop for Elem {
        fn drop(&mut self) {
            let mut log = self.1.lock().unwrap();
            log.push(self.0);
        }
    }

    let worker = Worker::new_fifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    // Number of elements that have not been taken out of the queue.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for id in 0..COUNT {
        worker.push(Elem(id, Arc::clone(&dropped)));
    }

    scope(|sc| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let stealer = worker.stealer();

            sc.spawn(move |_| {
                // Destination queue for the batch-steal operations.
                let local = Worker::new_fifo();
                let mut taken = 0;

                // Each stealer stops after taking at least STEPS elements.
                while taken < STEPS {
                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // Elements moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes STEPS pop attempts of its own.
        for _ in 0..STEPS {
            if worker.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    // Some elements must still be sitting in the queue.
    let rem = remaining.load(SeqCst);
    assert!(rem > 0);

    // Every element that was taken out has been dropped exactly once so far.
    let mut taken_log = dropped.lock().unwrap();
    assert_eq!(taken_log.len(), COUNT - rem);
    taken_log.clear();
    // Release the lock before dropping the queue: `Elem::drop` locks it too.
    drop(taken_log);

    drop(worker);

    // Dropping the worker drops the leftover elements, and their ids form
    // one contiguous run.
    let mut leftover_log = dropped.lock().unwrap();
    assert_eq!(leftover_log.len(), rem);
    leftover_log.sort_unstable();
    for pair in leftover_log.windows(2) {
        let (lower, upper) = (pair[0], pair[1]);
        assert_eq!(lower + 1, upper);
    }
}
358