1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4 
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::{Injector, Worker};
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9 
10 #[test]
smoke()11 fn smoke() {
12     let q = Injector::new();
13     assert_eq!(q.steal(), Empty);
14 
15     q.push(1);
16     q.push(2);
17     assert_eq!(q.steal(), Success(1));
18     assert_eq!(q.steal(), Success(2));
19     assert_eq!(q.steal(), Empty);
20 
21     q.push(3);
22     assert_eq!(q.steal(), Success(3));
23     assert_eq!(q.steal(), Empty);
24 }
25 
26 #[test]
is_empty()27 fn is_empty() {
28     let q = Injector::new();
29     assert!(q.is_empty());
30 
31     q.push(1);
32     assert!(!q.is_empty());
33     q.push(2);
34     assert!(!q.is_empty());
35 
36     let _ = q.steal();
37     assert!(!q.is_empty());
38     let _ = q.steal();
39     assert!(q.is_empty());
40 
41     q.push(3);
42     assert!(!q.is_empty());
43     let _ = q.steal();
44     assert!(q.is_empty());
45 }
46 
47 #[test]
spsc()48 fn spsc() {
49     #[cfg(miri)]
50     const COUNT: usize = 500;
51     #[cfg(not(miri))]
52     const COUNT: usize = 100_000;
53 
54     let q = Injector::new();
55 
56     scope(|scope| {
57         scope.spawn(|_| {
58             for i in 0..COUNT {
59                 loop {
60                     if let Success(v) = q.steal() {
61                         assert_eq!(i, v);
62                         break;
63                     }
64                     #[cfg(miri)]
65                     std::hint::spin_loop();
66                 }
67             }
68 
69             assert_eq!(q.steal(), Empty);
70         });
71 
72         for i in 0..COUNT {
73             q.push(i);
74         }
75     })
76     .unwrap();
77 }
78 
79 #[test]
mpmc()80 fn mpmc() {
81     #[cfg(miri)]
82     const COUNT: usize = 500;
83     #[cfg(not(miri))]
84     const COUNT: usize = 25_000;
85     const THREADS: usize = 4;
86 
87     let q = Injector::new();
88     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
89 
90     scope(|scope| {
91         for _ in 0..THREADS {
92             scope.spawn(|_| {
93                 for i in 0..COUNT {
94                     q.push(i);
95                 }
96             });
97         }
98 
99         for _ in 0..THREADS {
100             scope.spawn(|_| {
101                 for _ in 0..COUNT {
102                     loop {
103                         if let Success(n) = q.steal() {
104                             v[n].fetch_add(1, SeqCst);
105                             break;
106                         }
107                         #[cfg(miri)]
108                         std::hint::spin_loop();
109                     }
110                 }
111             });
112         }
113     })
114     .unwrap();
115 
116     for c in v {
117         assert_eq!(c.load(SeqCst), THREADS);
118     }
119 }
120 
121 #[test]
stampede()122 fn stampede() {
123     const THREADS: usize = 8;
124     #[cfg(miri)]
125     const COUNT: usize = 500;
126     #[cfg(not(miri))]
127     const COUNT: usize = 50_000;
128 
129     let q = Injector::new();
130 
131     for i in 0..COUNT {
132         q.push(Box::new(i + 1));
133     }
134     let remaining = Arc::new(AtomicUsize::new(COUNT));
135 
136     scope(|scope| {
137         for _ in 0..THREADS {
138             let remaining = remaining.clone();
139             let q = &q;
140 
141             scope.spawn(move |_| {
142                 let mut last = 0;
143                 while remaining.load(SeqCst) > 0 {
144                     if let Success(x) = q.steal() {
145                         assert!(last < *x);
146                         last = *x;
147                         remaining.fetch_sub(1, SeqCst);
148                     }
149                 }
150             });
151         }
152 
153         let mut last = 0;
154         while remaining.load(SeqCst) > 0 {
155             if let Success(x) = q.steal() {
156                 assert!(last < *x);
157                 last = *x;
158                 remaining.fetch_sub(1, SeqCst);
159             }
160         }
161     })
162     .unwrap();
163 }
164 
165 #[test]
stress()166 fn stress() {
167     const THREADS: usize = 8;
168     #[cfg(miri)]
169     const COUNT: usize = 500;
170     #[cfg(not(miri))]
171     const COUNT: usize = 50_000;
172 
173     let q = Injector::new();
174     let done = Arc::new(AtomicBool::new(false));
175     let hits = Arc::new(AtomicUsize::new(0));
176 
177     scope(|scope| {
178         for _ in 0..THREADS {
179             let done = done.clone();
180             let hits = hits.clone();
181             let q = &q;
182 
183             scope.spawn(move |_| {
184                 let w2 = Worker::new_fifo();
185 
186                 while !done.load(SeqCst) {
187                     if let Success(_) = q.steal() {
188                         hits.fetch_add(1, SeqCst);
189                     }
190 
191                     let _ = q.steal_batch(&w2);
192 
193                     if let Success(_) = q.steal_batch_and_pop(&w2) {
194                         hits.fetch_add(1, SeqCst);
195                     }
196 
197                     while w2.pop().is_some() {
198                         hits.fetch_add(1, SeqCst);
199                     }
200                 }
201             });
202         }
203 
204         let mut rng = rand::thread_rng();
205         let mut expected = 0;
206         while expected < COUNT {
207             if rng.gen_range(0..3) == 0 {
208                 while let Success(_) = q.steal() {
209                     hits.fetch_add(1, SeqCst);
210                 }
211             } else {
212                 q.push(expected);
213                 expected += 1;
214             }
215         }
216 
217         while hits.load(SeqCst) < COUNT {
218             while let Success(_) = q.steal() {
219                 hits.fetch_add(1, SeqCst);
220             }
221         }
222         done.store(true, SeqCst);
223     })
224     .unwrap();
225 }
226 
227 #[cfg_attr(miri, ignore)] // Miri is too slow
228 #[test]
no_starvation()229 fn no_starvation() {
230     const THREADS: usize = 8;
231     const COUNT: usize = 50_000;
232 
233     let q = Injector::new();
234     let done = Arc::new(AtomicBool::new(false));
235     let mut all_hits = Vec::new();
236 
237     scope(|scope| {
238         for _ in 0..THREADS {
239             let done = done.clone();
240             let hits = Arc::new(AtomicUsize::new(0));
241             all_hits.push(hits.clone());
242             let q = &q;
243 
244             scope.spawn(move |_| {
245                 let w2 = Worker::new_fifo();
246 
247                 while !done.load(SeqCst) {
248                     if let Success(_) = q.steal() {
249                         hits.fetch_add(1, SeqCst);
250                     }
251 
252                     let _ = q.steal_batch(&w2);
253 
254                     if let Success(_) = q.steal_batch_and_pop(&w2) {
255                         hits.fetch_add(1, SeqCst);
256                     }
257 
258                     while w2.pop().is_some() {
259                         hits.fetch_add(1, SeqCst);
260                     }
261                 }
262             });
263         }
264 
265         let mut rng = rand::thread_rng();
266         let mut my_hits = 0;
267         loop {
268             for i in 0..rng.gen_range(0..COUNT) {
269                 if rng.gen_range(0..3) == 0 && my_hits == 0 {
270                     while let Success(_) = q.steal() {
271                         my_hits += 1;
272                     }
273                 } else {
274                     q.push(i);
275                 }
276             }
277 
278             if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
279                 break;
280             }
281         }
282         done.store(true, SeqCst);
283     })
284     .unwrap();
285 }
286 
287 #[test]
destructors()288 fn destructors() {
289     #[cfg(miri)]
290     const THREADS: usize = 2;
291     #[cfg(not(miri))]
292     const THREADS: usize = 8;
293     #[cfg(miri)]
294     const COUNT: usize = 500;
295     #[cfg(not(miri))]
296     const COUNT: usize = 50_000;
297     #[cfg(miri)]
298     const STEPS: usize = 100;
299     #[cfg(not(miri))]
300     const STEPS: usize = 1000;
301 
302     struct Elem(usize, Arc<Mutex<Vec<usize>>>);
303 
304     impl Drop for Elem {
305         fn drop(&mut self) {
306             self.1.lock().unwrap().push(self.0);
307         }
308     }
309 
310     let q = Injector::new();
311     let dropped = Arc::new(Mutex::new(Vec::new()));
312     let remaining = Arc::new(AtomicUsize::new(COUNT));
313 
314     for i in 0..COUNT {
315         q.push(Elem(i, dropped.clone()));
316     }
317 
318     scope(|scope| {
319         for _ in 0..THREADS {
320             let remaining = remaining.clone();
321             let q = &q;
322 
323             scope.spawn(move |_| {
324                 let w2 = Worker::new_fifo();
325                 let mut cnt = 0;
326 
327                 while cnt < STEPS {
328                     if let Success(_) = q.steal() {
329                         cnt += 1;
330                         remaining.fetch_sub(1, SeqCst);
331                     }
332 
333                     let _ = q.steal_batch(&w2);
334 
335                     if let Success(_) = q.steal_batch_and_pop(&w2) {
336                         cnt += 1;
337                         remaining.fetch_sub(1, SeqCst);
338                     }
339 
340                     while w2.pop().is_some() {
341                         cnt += 1;
342                         remaining.fetch_sub(1, SeqCst);
343                     }
344                 }
345             });
346         }
347 
348         for _ in 0..STEPS {
349             if let Success(_) = q.steal() {
350                 remaining.fetch_sub(1, SeqCst);
351             }
352         }
353     })
354     .unwrap();
355 
356     let rem = remaining.load(SeqCst);
357     assert!(rem > 0);
358 
359     {
360         let mut v = dropped.lock().unwrap();
361         assert_eq!(v.len(), COUNT - rem);
362         v.clear();
363     }
364 
365     drop(q);
366 
367     {
368         let mut v = dropped.lock().unwrap();
369         assert_eq!(v.len(), rem);
370         v.sort_unstable();
371         for pair in v.windows(2) {
372             assert_eq!(pair[0] + 1, pair[1]);
373         }
374     }
375 }
376