1 #![cfg(test)]
2 
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::mpsc::channel;
5 use std::sync::{Arc, Mutex};
6 
7 use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
8 
9 #[test]
10 #[should_panic(expected = "Hello, world!")]
panic_propagate()11 fn panic_propagate() {
12     let thread_pool = ThreadPoolBuilder::new().build().unwrap();
13     thread_pool.install(|| {
14         panic!("Hello, world!");
15     });
16 }
17 
18 #[test]
19 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
workers_stop()20 fn workers_stop() {
21     let registry;
22 
23     {
24         // once we exit this block, thread-pool will be dropped
25         let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
26         registry = thread_pool.install(|| {
27             // do some work on these threads
28             join_a_lot(22);
29 
30             Arc::clone(&thread_pool.registry)
31         });
32         assert_eq!(registry.num_threads(), 22);
33     }
34 
35     // once thread-pool is dropped, registry should terminate, which
36     // should lead to worker threads stopping
37     registry.wait_until_stopped();
38 }
39 
join_a_lot(n: usize)40 fn join_a_lot(n: usize) {
41     if n > 0 {
42         join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
43     }
44 }
45 
46 #[test]
47 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
sleeper_stop()48 fn sleeper_stop() {
49     use std::{thread, time};
50 
51     let registry;
52 
53     {
54         // once we exit this block, thread-pool will be dropped
55         let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
56         registry = Arc::clone(&thread_pool.registry);
57 
58         // Give time for at least some of the thread pool to fall asleep.
59         thread::sleep(time::Duration::from_secs(1));
60     }
61 
62     // once thread-pool is dropped, registry should terminate, which
63     // should lead to worker threads stopping
64     registry.wait_until_stopped();
65 }
66 
67 /// Creates a start/exit handler that increments an atomic counter.
count_handler() -> (Arc<AtomicUsize>, impl Fn(usize))68 fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
69     let count = Arc::new(AtomicUsize::new(0));
70     (Arc::clone(&count), move |_| {
71         count.fetch_add(1, Ordering::SeqCst);
72     })
73 }
74 
75 /// Wait until a counter is no longer shared, then return its value.
wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize76 fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
77     use std::{thread, time};
78 
79     for _ in 0..60 {
80         counter = match Arc::try_unwrap(counter) {
81             Ok(counter) => return counter.into_inner(),
82             Err(counter) => {
83                 thread::sleep(time::Duration::from_secs(1));
84                 counter
85             }
86         };
87     }
88 
89     // That's too long!
90     panic!("Counter is still shared!");
91 }
92 
93 #[test]
94 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
failed_thread_stack()95 fn failed_thread_stack() {
96     // Note: we first tried to force failure with a `usize::MAX` stack, but
97     // macOS and Windows weren't fazed, or at least didn't fail the way we want.
98     // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
99     // 2GB stack, so it might not fail until the second thread.
100     let stack_size = ::std::isize::MAX as usize;
101 
102     let (start_count, start_handler) = count_handler();
103     let (exit_count, exit_handler) = count_handler();
104     let builder = ThreadPoolBuilder::new()
105         .num_threads(10)
106         .stack_size(stack_size)
107         .start_handler(start_handler)
108         .exit_handler(exit_handler);
109 
110     let pool = builder.build();
111     assert!(pool.is_err(), "thread stack should have failed!");
112 
113     // With such a huge stack, 64-bit will probably fail on the first thread;
114     // 32-bit might manage the first 2GB, but certainly fail the second.
115     let start_count = wait_for_counter(start_count);
116     assert!(start_count <= 1);
117     assert_eq!(start_count, wait_for_counter(exit_count));
118 }
119 
120 #[test]
121 #[cfg_attr(not(panic = "unwind"), ignore)]
panic_thread_name()122 fn panic_thread_name() {
123     let (start_count, start_handler) = count_handler();
124     let (exit_count, exit_handler) = count_handler();
125     let builder = ThreadPoolBuilder::new()
126         .num_threads(10)
127         .start_handler(start_handler)
128         .exit_handler(exit_handler)
129         .thread_name(|i| {
130             if i >= 5 {
131                 panic!();
132             }
133             format!("panic_thread_name#{}", i)
134         });
135 
136     let pool = crate::unwind::halt_unwinding(|| builder.build());
137     assert!(pool.is_err(), "thread-name panic should propagate!");
138 
139     // Assuming they're created in order, threads 0 through 4 should have
140     // been started already, and then terminated by the panic.
141     assert_eq!(5, wait_for_counter(start_count));
142     assert_eq!(5, wait_for_counter(exit_count));
143 }
144 
145 #[test]
146 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
self_install()147 fn self_install() {
148     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
149 
150     // If the inner `install` blocks, then nothing will actually run it!
151     assert!(pool.install(|| pool.install(|| true)));
152 }
153 
154 #[test]
155 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
mutual_install()156 fn mutual_install() {
157     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
158     let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
159 
160     let ok = pool1.install(|| {
161         // This creates a dependency from `pool1` -> `pool2`
162         pool2.install(|| {
163             // This creates a dependency from `pool2` -> `pool1`
164             pool1.install(|| {
165                 // If they blocked on inter-pool installs, there would be no
166                 // threads left to run this!
167                 true
168             })
169         })
170     });
171     assert!(ok);
172 }
173 
174 #[test]
175 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
mutual_install_sleepy()176 fn mutual_install_sleepy() {
177     use std::{thread, time};
178 
179     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
180     let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
181 
182     let ok = pool1.install(|| {
183         // This creates a dependency from `pool1` -> `pool2`
184         pool2.install(|| {
185             // Give `pool1` time to fall asleep.
186             thread::sleep(time::Duration::from_secs(1));
187 
188             // This creates a dependency from `pool2` -> `pool1`
189             pool1.install(|| {
190                 // Give `pool2` time to fall asleep.
191                 thread::sleep(time::Duration::from_secs(1));
192 
193                 // If they blocked on inter-pool installs, there would be no
194                 // threads left to run this!
195                 true
196             })
197         })
198     });
199     assert!(ok);
200 }
201 
202 #[test]
203 #[allow(deprecated)]
204 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
check_thread_pool_new()205 fn check_thread_pool_new() {
206     let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
207     assert_eq!(pool.current_num_threads(), 22);
208 }
209 
210 macro_rules! test_scope_order {
211     ($scope:ident => $spawn:ident) => {{
212         let builder = ThreadPoolBuilder::new().num_threads(1);
213         let pool = builder.build().unwrap();
214         pool.install(|| {
215             let vec = Mutex::new(vec![]);
216             pool.$scope(|scope| {
217                 let vec = &vec;
218                 for i in 0..10 {
219                     scope.$spawn(move |_| {
220                         vec.lock().unwrap().push(i);
221                     });
222                 }
223             });
224             vec.into_inner().unwrap()
225         })
226     }};
227 }
228 
229 #[test]
230 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
scope_lifo_order()231 fn scope_lifo_order() {
232     let vec = test_scope_order!(scope => spawn);
233     let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
234     assert_eq!(vec, expected);
235 }
236 
237 #[test]
238 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
scope_fifo_order()239 fn scope_fifo_order() {
240     let vec = test_scope_order!(scope_fifo => spawn_fifo);
241     let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
242     assert_eq!(vec, expected);
243 }
244 
245 macro_rules! test_spawn_order {
246     ($spawn:ident) => {{
247         let builder = ThreadPoolBuilder::new().num_threads(1);
248         let pool = &builder.build().unwrap();
249         let (tx, rx) = channel();
250         pool.install(move || {
251             for i in 0..10 {
252                 let tx = tx.clone();
253                 pool.$spawn(move || {
254                     tx.send(i).unwrap();
255                 });
256             }
257         });
258         rx.iter().collect::<Vec<i32>>()
259     }};
260 }
261 
262 #[test]
263 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_lifo_order()264 fn spawn_lifo_order() {
265     let vec = test_spawn_order!(spawn);
266     let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
267     assert_eq!(vec, expected);
268 }
269 
270 #[test]
271 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
spawn_fifo_order()272 fn spawn_fifo_order() {
273     let vec = test_spawn_order!(spawn_fifo);
274     let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
275     assert_eq!(vec, expected);
276 }
277 
278 #[test]
279 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
nested_scopes()280 fn nested_scopes() {
281     // Create matching scopes for every thread pool.
282     fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
283     where
284         OP: FnOnce(&[&Scope<'scope>]) + Send,
285     {
286         if let Some((pool, tail)) = pools.split_first() {
287             pool.scope(move |s| {
288                 // This move reduces the reference lifetimes by variance to match s,
289                 // but the actual scopes are still tied to the invariant 'scope.
290                 let mut scopes = scopes;
291                 scopes.push(s);
292                 nest(tail, scopes, op)
293             })
294         } else {
295             (op)(&scopes)
296         }
297     }
298 
299     let pools: Vec<_> = (0..10)
300         .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
301         .collect();
302 
303     let counter = AtomicUsize::new(0);
304     nest(&pools, vec![], |scopes| {
305         for &s in scopes {
306             s.spawn(|_| {
307                 // Our 'scope lets us borrow the counter in every pool.
308                 counter.fetch_add(1, Ordering::Relaxed);
309             });
310         }
311     });
312     assert_eq!(counter.into_inner(), pools.len());
313 }
314 
315 #[test]
316 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
nested_fifo_scopes()317 fn nested_fifo_scopes() {
318     // Create matching fifo scopes for every thread pool.
319     fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
320     where
321         OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
322     {
323         if let Some((pool, tail)) = pools.split_first() {
324             pool.scope_fifo(move |s| {
325                 // This move reduces the reference lifetimes by variance to match s,
326                 // but the actual scopes are still tied to the invariant 'scope.
327                 let mut scopes = scopes;
328                 scopes.push(s);
329                 nest(tail, scopes, op)
330             })
331         } else {
332             (op)(&scopes)
333         }
334     }
335 
336     let pools: Vec<_> = (0..10)
337         .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
338         .collect();
339 
340     let counter = AtomicUsize::new(0);
341     nest(&pools, vec![], |scopes| {
342         for &s in scopes {
343             s.spawn_fifo(|_| {
344                 // Our 'scope lets us borrow the counter in every pool.
345                 counter.fetch_add(1, Ordering::Relaxed);
346             });
347         }
348     });
349     assert_eq!(counter.into_inner(), pools.len());
350 }
351 
352 #[test]
353 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
in_place_scope_no_deadlock()354 fn in_place_scope_no_deadlock() {
355     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
356     let (tx, rx) = channel();
357     let rx_ref = &rx;
358     pool.in_place_scope(move |s| {
359         // With regular scopes this closure would never run because this scope op
360         // itself would block the only worker thread.
361         s.spawn(move |_| {
362             tx.send(()).unwrap();
363         });
364         rx_ref.recv().unwrap();
365     });
366 }
367 
368 #[test]
369 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
in_place_scope_fifo_no_deadlock()370 fn in_place_scope_fifo_no_deadlock() {
371     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
372     let (tx, rx) = channel();
373     let rx_ref = &rx;
374     pool.in_place_scope_fifo(move |s| {
375         // With regular scopes this closure would never run because this scope op
376         // itself would block the only worker thread.
377         s.spawn_fifo(move |_| {
378             tx.send(()).unwrap();
379         });
380         rx_ref.recv().unwrap();
381     });
382 }
383 
384 #[test]
yield_now_to_spawn()385 fn yield_now_to_spawn() {
386     let (tx, rx) = channel();
387 
388     // Queue a regular spawn.
389     crate::spawn(move || tx.send(22).unwrap());
390 
391     // The single-threaded fallback mode (for wasm etc.) won't
392     // get a chance to run the spawn if we never yield to it.
393     crate::registry::in_worker(move |_, _| {
394         crate::yield_now();
395     });
396 
397     // The spawn **must** have started by now, but we still might have to wait
398     // for it to finish if a different thread stole it first.
399     assert_eq!(22, rx.recv().unwrap());
400 }
401 
402 #[test]
yield_local_to_spawn()403 fn yield_local_to_spawn() {
404     let (tx, rx) = channel();
405 
406     // Queue a regular spawn.
407     crate::spawn(move || tx.send(22).unwrap());
408 
409     // The single-threaded fallback mode (for wasm etc.) won't
410     // get a chance to run the spawn if we never yield to it.
411     crate::registry::in_worker(move |_, _| {
412         crate::yield_local();
413     });
414 
415     // The spawn **must** have started by now, but we still might have to wait
416     // for it to finish if a different thread stole it first.
417     assert_eq!(22, rx.recv().unwrap());
418 }
419