1 //! Threads that can borrow variables from the stack.
2 //!
3 //! Create a scope when spawned threads need to access variables on the stack:
4 //!
5 //! ```
6 //! use crossbeam_utils::thread;
7 //!
8 //! let people = vec![
9 //!     "Alice".to_string(),
10 //!     "Bob".to_string(),
11 //!     "Carol".to_string(),
12 //! ];
13 //!
14 //! thread::scope(|s| {
15 //!     for person in &people {
16 //!         s.spawn(move |_| {
17 //!             println!("Hello, {}!", person);
18 //!         });
19 //!     }
20 //! }).unwrap();
21 //! ```
22 //!
23 //! # Why scoped threads?
24 //!
25 //! Suppose we wanted to re-write the previous example using plain threads:
26 //!
27 //! ```compile_fail,E0597
28 //! use std::thread;
29 //!
30 //! let people = vec![
31 //!     "Alice".to_string(),
32 //!     "Bob".to_string(),
33 //!     "Carol".to_string(),
34 //! ];
35 //!
36 //! let mut threads = Vec::new();
37 //!
38 //! for person in &people {
39 //!     threads.push(thread::spawn(move || {
40 //!         println!("Hello, {}!", person);
41 //!     }));
42 //! }
43 //!
44 //! for thread in threads {
45 //!     thread.join().unwrap();
46 //! }
47 //! ```
48 //!
49 //! This doesn't work because the borrow checker complains about `people` not living long enough:
50 //!
51 //! ```text
52 //! error[E0597]: `people` does not live long enough
53 //!   --> src/main.rs:12:20
54 //!    |
55 //! 12 |     for person in &people {
56 //!    |                    ^^^^^^ borrowed value does not live long enough
57 //! ...
58 //! 21 | }
59 //!    | - borrowed value only lives until here
60 //!    |
61 //!    = note: borrowed value must be valid for the static lifetime...
62 //! ```
63 //!
64 //! The problem here is that spawned threads are not allowed to borrow variables on stack because
65 //! the compiler cannot prove they will be joined before `people` is destroyed.
66 //!
67 //! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68 //! before the scope ends.
69 //!
70 //! # How scoped threads work
71 //!
72 //! If a variable is borrowed by a thread, the thread must complete before the variable is
73 //! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74 //! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75 //!
76 //! A scope creates a clear boundary between variables outside the scope and threads inside the
77 //! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78 //! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79 //! can safely access variables outside it.
80 //!
81 //! # Nesting scoped threads
82 //!
83 //! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
84 //! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
85 //! cannot be borrowed by scoped threads:
86 //!
87 //! ```compile_fail,E0521
88 //! use crossbeam_utils::thread;
89 //!
90 //! thread::scope(|s| {
91 //!     s.spawn(|_| {
92 //!         // Not going to compile because we're trying to borrow `s`,
93 //!         // which lives *inside* the scope! :(
94 //!         s.spawn(|_| println!("nested thread"));
95 //!     });
96 //! });
97 //! ```
98 //!
99 //! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100 //! argument, which can be used for spawning nested threads:
101 //!
102 //! ```
103 //! use crossbeam_utils::thread;
104 //!
105 //! thread::scope(|s| {
106 //!     // Note the `|s|` here.
107 //!     s.spawn(|s| {
108 //!         // Yay, this works because we're using a fresh argument `s`! :)
109 //!         s.spawn(|_| println!("nested thread"));
110 //!     });
111 //! }).unwrap();
112 //! ```
113 
114 use std::boxed::Box;
115 use std::fmt;
116 use std::io;
117 use std::marker::PhantomData;
118 use std::mem;
119 use std::panic;
120 use std::string::String;
121 use std::sync::{Arc, Mutex};
122 use std::thread;
123 use std::vec::Vec;
124 
125 use crate::sync::WaitGroup;
126 
127 type SharedVec<T> = Arc<Mutex<Vec<T>>>;
128 type SharedOption<T> = Arc<Mutex<Option<T>>>;
129 
130 /// Creates a new scope for spawning threads.
131 ///
132 /// All child threads that haven't been manually joined will be automatically joined just before
133 /// this function invocation ends. If all joined threads have successfully completed, `Ok` is
134 /// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
135 /// returned containing errors from panicked threads. Note that if panics are implemented by
136 /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
137 ///
138 /// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`].
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use crossbeam_utils::thread;
144 ///
145 /// let var = vec![1, 2, 3];
146 ///
147 /// thread::scope(|s| {
148 ///     s.spawn(|_| {
149 ///         println!("A child thread borrowing `var`: {:?}", var);
150 ///     });
151 /// }).unwrap();
152 /// ```
scope<'env, F, R>(f: F) -> thread::Result<R> where F: FnOnce(&Scope<'env>) -> R,153 pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
154 where
155     F: FnOnce(&Scope<'env>) -> R,
156 {
157     struct AbortOnPanic;
158     impl Drop for AbortOnPanic {
159         fn drop(&mut self) {
160             if thread::panicking() {
161                 std::process::abort();
162             }
163         }
164     }
165 
166     let wg = WaitGroup::new();
167     let scope = Scope::<'env> {
168         handles: SharedVec::default(),
169         wait_group: wg.clone(),
170         _marker: PhantomData,
171     };
172 
173     // Execute the scoped function, but catch any panics.
174     let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
175 
176     // If an unwinding panic occurs before all threads are joined
177     // promote it to an aborting panic to prevent any threads from escaping the scope.
178     let guard = AbortOnPanic;
179 
180     // Wait until all nested scopes are dropped.
181     drop(scope.wait_group);
182     wg.wait();
183 
184     // Join all remaining spawned threads.
185     let panics: Vec<_> = scope
186         .handles
187         .lock()
188         .unwrap()
189         // Filter handles that haven't been joined, join them, and collect errors.
190         .drain(..)
191         .filter_map(|handle| handle.lock().unwrap().take())
192         .filter_map(|handle| handle.join().err())
193         .collect();
194 
195     mem::forget(guard);
196 
197     // If `f` has panicked, resume unwinding.
198     // If any of the child threads have panicked, return the panic errors.
199     // Otherwise, everything is OK and return the result of `f`.
200     match result {
201         Err(err) => panic::resume_unwind(err),
202         Ok(res) => {
203             if panics.is_empty() {
204                 Ok(res)
205             } else {
206                 Err(Box::new(panics))
207             }
208         }
209     }
210 }
211 
212 /// A scope for spawning threads.
213 pub struct Scope<'env> {
214     /// The list of the thread join handles.
215     handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
216 
217     /// Used to wait until all subscopes all dropped.
218     wait_group: WaitGroup,
219 
220     /// Borrows data with invariant lifetime `'env`.
221     _marker: PhantomData<&'env mut &'env ()>,
222 }
223 
224 unsafe impl Sync for Scope<'_> {}
225 
226 impl<'env> Scope<'env> {
227     /// Spawns a scoped thread.
228     ///
229     /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
230     /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
231     /// allowing it to reference variables outside the scope.
232     ///
233     /// The scoped thread is passed a reference to this scope as an argument, which can be used for
234     /// spawning nested threads.
235     ///
236     /// The returned [handle](ScopedJoinHandle) can be used to manually
237     /// [join](ScopedJoinHandle::join) the thread before the scope exits.
238     ///
239     /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
240     /// stack size or the name of the thread, use this API instead.
241     ///
242     /// [`spawn`]: std::thread::spawn
243     ///
244     /// # Panics
245     ///
246     /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
247     /// to recover from such errors.
248     ///
249     /// # Examples
250     ///
251     /// ```
252     /// use crossbeam_utils::thread;
253     ///
254     /// thread::scope(|s| {
255     ///     let handle = s.spawn(|_| {
256     ///         println!("A child thread is running");
257     ///         42
258     ///     });
259     ///
260     ///     // Join the thread and retrieve its result.
261     ///     let res = handle.join().unwrap();
262     ///     assert_eq!(res, 42);
263     /// }).unwrap();
264     /// ```
spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where F: FnOnce(&Scope<'env>) -> T, F: Send + 'env, T: Send + 'env,265     pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
266     where
267         F: FnOnce(&Scope<'env>) -> T,
268         F: Send + 'env,
269         T: Send + 'env,
270     {
271         self.builder()
272             .spawn(f)
273             .expect("failed to spawn scoped thread")
274     }
275 
276     /// Creates a builder that can configure a thread before spawning.
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// use crossbeam_utils::thread;
282     ///
283     /// thread::scope(|s| {
284     ///     s.builder()
285     ///         .spawn(|_| println!("A child thread is running"))
286     ///         .unwrap();
287     /// }).unwrap();
288     /// ```
builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env>289     pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
290         ScopedThreadBuilder {
291             scope: self,
292             builder: thread::Builder::new(),
293         }
294     }
295 }
296 
297 impl fmt::Debug for Scope<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result298     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299         f.pad("Scope { .. }")
300     }
301 }
302 
303 /// Configures the properties of a new thread.
304 ///
305 /// The two configurable properties are:
306 ///
307 /// - [`name`]: Specifies an [associated name for the thread][naming-threads].
308 /// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
309 ///
310 /// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
311 /// thread handle with the given configuration.
312 ///
313 /// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
314 /// value. You may want to use this builder when you want to recover from a failure to launch a
315 /// thread.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// use crossbeam_utils::thread;
321 ///
322 /// thread::scope(|s| {
323 ///     s.builder()
324 ///         .spawn(|_| println!("Running a child thread"))
325 ///         .unwrap();
326 /// }).unwrap();
327 /// ```
328 ///
329 /// [`name`]: ScopedThreadBuilder::name
330 /// [`stack_size`]: ScopedThreadBuilder::stack_size
331 /// [`spawn`]: ScopedThreadBuilder::spawn
332 /// [`io::Result`]: std::io::Result
333 /// [naming-threads]: std::thread#naming-threads
334 /// [stack-size]: std::thread#stack-size
335 #[derive(Debug)]
336 pub struct ScopedThreadBuilder<'scope, 'env> {
337     scope: &'scope Scope<'env>,
338     builder: thread::Builder,
339 }
340 
341 impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
342     /// Sets the name for the new thread.
343     ///
344     /// The name must not contain null bytes (`\0`).
345     ///
346     /// For more information about named threads, see [here][naming-threads].
347     ///
348     /// # Examples
349     ///
350     /// ```
351     /// use crossbeam_utils::thread;
352     /// use std::thread::current;
353     ///
354     /// thread::scope(|s| {
355     ///     s.builder()
356     ///         .name("my thread".to_string())
357     ///         .spawn(|_| assert_eq!(current().name(), Some("my thread")))
358     ///         .unwrap();
359     /// }).unwrap();
360     /// ```
361     ///
362     /// [naming-threads]: std::thread#naming-threads
name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env>363     pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
364         self.builder = self.builder.name(name);
365         self
366     }
367 
368     /// Sets the size of the stack for the new thread.
369     ///
370     /// The stack size is measured in bytes.
371     ///
372     /// For more information about the stack size for threads, see [here][stack-size].
373     ///
374     /// # Examples
375     ///
376     /// ```
377     /// use crossbeam_utils::thread;
378     ///
379     /// thread::scope(|s| {
380     ///     s.builder()
381     ///         .stack_size(32 * 1024)
382     ///         .spawn(|_| println!("Running a child thread"))
383     ///         .unwrap();
384     /// }).unwrap();
385     /// ```
386     ///
387     /// [stack-size]: std::thread#stack-size
stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env>388     pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
389         self.builder = self.builder.stack_size(size);
390         self
391     }
392 
393     /// Spawns a scoped thread with this configuration.
394     ///
395     /// The scoped thread is passed a reference to this scope as an argument, which can be used for
396     /// spawning nested threads.
397     ///
398     /// The returned handle can be used to manually join the thread before the scope exits.
399     ///
400     /// # Errors
401     ///
402     /// Unlike the [`Scope::spawn`] method, this method yields an
403     /// [`io::Result`] to capture any failure to create the thread at
404     /// the OS level.
405     ///
406     /// [`io::Result`]: std::io::Result
407     ///
408     /// # Panics
409     ///
410     /// Panics if a thread name was set and it contained null bytes.
411     ///
412     /// # Examples
413     ///
414     /// ```
415     /// use crossbeam_utils::thread;
416     ///
417     /// thread::scope(|s| {
418     ///     let handle = s.builder()
419     ///         .spawn(|_| {
420     ///             println!("A child thread is running");
421     ///             42
422     ///         })
423     ///         .unwrap();
424     ///
425     ///     // Join the thread and retrieve its result.
426     ///     let res = handle.join().unwrap();
427     ///     assert_eq!(res, 42);
428     /// }).unwrap();
429     /// ```
spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> where F: FnOnce(&Scope<'env>) -> T, F: Send + 'env, T: Send + 'env,430     pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
431     where
432         F: FnOnce(&Scope<'env>) -> T,
433         F: Send + 'env,
434         T: Send + 'env,
435     {
436         // The result of `f` will be stored here.
437         let result = SharedOption::default();
438 
439         // Spawn the thread and grab its join handle and thread handle.
440         let (handle, thread) = {
441             let result = Arc::clone(&result);
442 
443             // A clone of the scope that will be moved into the new thread.
444             let scope = Scope::<'env> {
445                 handles: Arc::clone(&self.scope.handles),
446                 wait_group: self.scope.wait_group.clone(),
447                 _marker: PhantomData,
448             };
449 
450             // Spawn the thread.
451             let handle = {
452                 let closure = move || {
453                     // Make sure the scope is inside the closure with the proper `'env` lifetime.
454                     let scope: Scope<'env> = scope;
455 
456                     // Run the closure.
457                     let res = f(&scope);
458 
459                     // Store the result if the closure didn't panic.
460                     *result.lock().unwrap() = Some(res);
461                 };
462 
463                 // Allocate `closure` on the heap and erase the `'env` bound.
464                 let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
465                 let closure: Box<dyn FnOnce() + Send + 'static> =
466                     unsafe { mem::transmute(closure) };
467 
468                 // Finally, spawn the closure.
469                 self.builder.spawn(closure)?
470             };
471 
472             let thread = handle.thread().clone();
473             let handle = Arc::new(Mutex::new(Some(handle)));
474             (handle, thread)
475         };
476 
477         // Add the handle to the shared list of join handles.
478         self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
479 
480         Ok(ScopedJoinHandle {
481             handle,
482             result,
483             thread,
484             _marker: PhantomData,
485         })
486     }
487 }
488 
489 unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
490 unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
491 
492 /// A handle that can be used to join its scoped thread.
493 ///
494 /// This struct is created by the [`Scope::spawn`] method and the
495 /// [`ScopedThreadBuilder::spawn`] method.
496 pub struct ScopedJoinHandle<'scope, T> {
497     /// A join handle to the spawned thread.
498     handle: SharedOption<thread::JoinHandle<()>>,
499 
500     /// Holds the result of the inner closure.
501     result: SharedOption<T>,
502 
503     /// A handle to the spawned thread.
504     thread: thread::Thread,
505 
506     /// Borrows the parent scope with lifetime `'scope`.
507     _marker: PhantomData<&'scope ()>,
508 }
509 
510 impl<T> ScopedJoinHandle<'_, T> {
511     /// Waits for the thread to finish and returns its result.
512     ///
513     /// If the child thread panics, an error is returned. Note that if panics are implemented by
514     /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
515     ///
516     /// # Panics
517     ///
518     /// This function may panic on some platforms if a thread attempts to join itself or otherwise
519     /// may create a deadlock with joining threads.
520     ///
521     /// # Examples
522     ///
523     /// ```
524     /// use crossbeam_utils::thread;
525     ///
526     /// thread::scope(|s| {
527     ///     let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
528     ///     let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
529     ///
530     ///     // Join the first thread and verify that it succeeded.
531     ///     let res = handle1.join();
532     ///     assert!(res.is_ok());
533     ///
534     ///     // Join the second thread and verify that it panicked.
535     ///     let res = handle2.join();
536     ///     assert!(res.is_err());
537     /// }).unwrap();
538     /// ```
join(self) -> thread::Result<T>539     pub fn join(self) -> thread::Result<T> {
540         // Take out the handle. The handle will surely be available because the root scope waits
541         // for nested scopes before joining remaining threads.
542         let handle = self.handle.lock().unwrap().take().unwrap();
543 
544         // Join the thread and then take the result out of its inner closure.
545         handle
546             .join()
547             .map(|()| self.result.lock().unwrap().take().unwrap())
548     }
549 
550     /// Returns a handle to the underlying thread.
551     ///
552     /// # Examples
553     ///
554     /// ```
555     /// use crossbeam_utils::thread;
556     ///
557     /// thread::scope(|s| {
558     ///     let handle = s.spawn(|_| println!("A child thread is running"));
559     ///     println!("The child thread ID: {:?}", handle.thread().id());
560     /// }).unwrap();
561     /// ```
thread(&self) -> &thread::Thread562     pub fn thread(&self) -> &thread::Thread {
563         &self.thread
564     }
565 }
566 
567 /// Unix-specific extensions.
568 #[cfg(unix)]
569 mod unix {
570     use super::ScopedJoinHandle;
571     use std::os::unix::thread::{JoinHandleExt, RawPthread};
572 
573     impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
as_pthread_t(&self) -> RawPthread574         fn as_pthread_t(&self) -> RawPthread {
575             // Borrow the handle. The handle will surely be available because the root scope waits
576             // for nested scopes before joining remaining threads.
577             let handle = self.handle.lock().unwrap();
578             handle.as_ref().unwrap().as_pthread_t()
579         }
into_pthread_t(self) -> RawPthread580         fn into_pthread_t(self) -> RawPthread {
581             self.as_pthread_t()
582         }
583     }
584 }
585 /// Windows-specific extensions.
586 #[cfg(windows)]
587 mod windows {
588     use super::ScopedJoinHandle;
589     use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
590 
591     impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
as_raw_handle(&self) -> RawHandle592         fn as_raw_handle(&self) -> RawHandle {
593             // Borrow the handle. The handle will surely be available because the root scope waits
594             // for nested scopes before joining remaining threads.
595             let handle = self.handle.lock().unwrap();
596             handle.as_ref().unwrap().as_raw_handle()
597         }
598     }
599 
600     impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
into_raw_handle(self) -> RawHandle601         fn into_raw_handle(self) -> RawHandle {
602             self.as_raw_handle()
603         }
604     }
605 }
606 
607 impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result608     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
609         f.pad("ScopedJoinHandle { .. }")
610     }
611 }
612