1 //! Rayon-core houses the core stable APIs of Rayon.
2 //!
3 //! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
4 //!
5 //! [`join`] is used to take two closures and potentially run them in parallel.
6 //!   - It will run in parallel if task B gets stolen before task A can finish.
7 //!   - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
8 //!
9 //! [`scope`] creates a scope in which you can run any number of parallel tasks.
10 //! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
11 //! The scope will exist until all tasks spawned within the scope have been completed.
12 //!
//! [`spawn`] adds a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
14 //!
15 //! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
//! where they become available for work stealing from other threads in the local threadpool.
18 //!
19 //! [`join`]: fn.join.html
20 //! [`scope`]: fn.scope.html
21 //! [`scope()`]: fn.scope.html
22 //! [`spawn`]: fn.spawn.html
//! [`ThreadPool`]: struct.ThreadPool.html
24 //! [`install()`]: struct.ThreadPool.html#method.install
25 //! [`spawn()`]: struct.ThreadPool.html#method.spawn
26 //! [`join()`]: struct.ThreadPool.html#method.join
27 //! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
28 //!
29 //! # Global fallback when threading is unsupported
30 //!
31 //! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
32 //! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
33 //! targets are notable examples of this. Rather than panicking on the unsupported error when
34 //! creating the implicit global threadpool, Rayon configures a fallback mode instead.
35 //!
36 //! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
37 //! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
38 //! there is no other thread to share the work. However, since the pool is not running independent
39 //! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
40 //! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
41 //! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
42 //! can also volunteer execution time.
43 //!
44 //! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
45 //!
46 //! # Restricting multiple versions
47 //!
48 //! In order to ensure proper coordination between threadpools, and especially
49 //! to make sure there's only one global threadpool, `rayon-core` is actively
50 //! restricted from building multiple versions of itself into a single target.
51 //! You may see a build error like this in violation:
52 //!
53 //! ```text
54 //! error: native library `rayon-core` is being linked to by more
55 //! than one package, and can only be linked to by one package
56 //! ```
57 //!
58 //! While we strive to keep `rayon-core` semver-compatible, it's still
59 //! possible to arrive at this situation if different crates have overly
60 //! restrictive tilde or inequality requirements for `rayon-core`.  The
61 //! conflicting requirements will need to be resolved before the build will
62 //! succeed.
63 
64 #![deny(missing_debug_implementations)]
65 #![deny(missing_docs)]
66 #![deny(unreachable_pub)]
67 #![warn(rust_2018_idioms)]
68 
69 use std::any::Any;
70 use std::env;
71 use std::error::Error;
72 use std::fmt;
73 use std::io;
74 use std::marker::PhantomData;
75 use std::str::FromStr;
76 use std::thread;
77 
78 #[macro_use]
79 mod private;
80 
81 mod broadcast;
82 mod job;
83 mod join;
84 mod latch;
85 mod registry;
86 mod scope;
87 mod sleep;
88 mod spawn;
89 mod thread_pool;
90 mod unwind;
91 
92 mod compile_fail;
93 mod test;
94 
95 pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
96 pub use self::join::{join, join_context};
97 pub use self::registry::ThreadBuilder;
98 pub use self::scope::{in_place_scope, scope, Scope};
99 pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
100 pub use self::spawn::{spawn, spawn_fifo};
101 pub use self::thread_pool::current_thread_has_pending_tasks;
102 pub use self::thread_pool::current_thread_index;
103 pub use self::thread_pool::ThreadPool;
104 pub use self::thread_pool::{yield_local, yield_now, Yield};
105 
106 #[cfg(not(feature = "web_spin_lock"))]
107 use std::sync;
108 
109 #[cfg(feature = "web_spin_lock")]
110 use wasm_sync as sync;
111 
112 use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
113 
114 /// Returns the maximum number of threads that Rayon supports in a single thread-pool.
115 ///
116 /// If a higher thread count is requested by calling `ThreadPoolBuilder::num_threads` or by setting
117 /// the `RAYON_NUM_THREADS` environment variable, then it will be reduced to this maximum.
118 ///
119 /// The value may vary between different targets, and is subject to change in new Rayon versions.
max_num_threads() -> usize120 pub fn max_num_threads() -> usize {
121     // We are limited by the bits available in the sleep counter's `AtomicUsize`.
122     crate::sleep::THREADS_MAX
123 }
124 
125 /// Returns the number of threads in the current registry. If this
126 /// code is executing within a Rayon thread-pool, then this will be
127 /// the number of threads for the thread-pool of the current
128 /// thread. Otherwise, it will be the number of threads for the global
129 /// thread-pool.
130 ///
131 /// This can be useful when trying to judge how many times to split
132 /// parallel work (the parallel iterator traits use this value
133 /// internally for this purpose).
134 ///
135 /// # Future compatibility note
136 ///
137 /// Note that unless this thread-pool was created with a
138 /// builder that specifies the number of threads, then this
139 /// number may vary over time in future versions (see [the
140 /// `num_threads()` method for details][snt]).
141 ///
142 /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
current_num_threads() -> usize143 pub fn current_num_threads() -> usize {
144     crate::registry::Registry::current_num_threads()
145 }
146 
/// Error when initializing a thread pool.
///
/// The concrete cause is kept in the private `kind` field.
#[derive(Debug)]
pub struct ThreadPoolBuildError {
    /// The underlying reason the build failed.
    kind: ErrorKind,
}
152 
/// The specific cause carried by a `ThreadPoolBuildError`.
#[derive(Debug)]
enum ErrorKind {
    /// NOTE(review): presumably raised when the global pool is initialized a
    /// second time (see `build_global`) -- confirm against the registry code.
    GlobalPoolAlreadyInitialized,
    /// NOTE(review): presumably related to `use_current_thread` being requested
    /// from a thread that already belongs to a pool -- confirm against the
    /// registry code.
    CurrentThreadAlreadyInPool,
    /// An I/O error. `ThreadPoolBuildError::is_unsupported` checks this variant
    /// for `io::ErrorKind::Unsupported`.
    IOError(io::Error),
}
159 
/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
///
/// ## Creating a ThreadPool
///
/// The following creates a thread pool with 22 threads.
///
/// ```rust
/// # use rayon_core as rayon;
/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
/// ```
///
/// To instead configure the global thread pool, use [`build_global()`]:
///
/// ```rust
/// # use rayon_core as rayon;
/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
/// ```
///
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
    /// The number of threads in the rayon thread pool.
    /// If zero, the `RAYON_NUM_THREADS` environment variable is used.
    /// If `RAYON_NUM_THREADS` is unset or invalid, the deprecated
    /// `RAYON_RS_NUM_CPUS` variable is tried instead. If no non-zero
    /// number is obtained, the default (available parallelism) is used.
    num_threads: usize,

    /// The thread we're building *from* will also be part of the pool.
    use_current_thread: bool,

    /// Custom closure, if any, to handle a panic that we cannot propagate
    /// anywhere else.
    panic_handler: Option<Box<PanicHandler>>,

    /// Closure, if any, to compute the name of a thread from its index.
    get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,

    /// The stack size for the created worker threads, if one was requested.
    stack_size: Option<usize>,

    /// Closure, if any, invoked on worker thread start.
    start_handler: Option<Box<StartHandler>>,

    /// Closure, if any, invoked on worker thread exit.
    exit_handler: Option<Box<ExitHandler>>,

    /// Closure invoked to spawn threads.
    spawn_handler: S,

    /// If false, worker threads will execute spawned jobs in a
    /// "depth-first" fashion. If true, they will do a "breadth-first"
    /// fashion. Depth-first is the default.
    breadth_first: bool,
}
211 
/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
///
/// This deprecated type is a thin wrapper: its methods forward to the
/// `ThreadPoolBuilder` it holds.
///
/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
#[deprecated(note = "Use `ThreadPoolBuilder`")]
#[derive(Default)]
pub struct Configuration {
    /// The builder that actually stores the configuration.
    builder: ThreadPoolBuilder,
}
220 
/// The type for a panic handling closure. Note that this same closure
/// may be invoked multiple times in parallel.
///
/// The closure receives a `Box<dyn Any + Send>`; see
/// `ThreadPoolBuilder::panic_handler` for when it is invoked.
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
224 
/// The type for a closure that gets invoked when a thread starts. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
///
/// Installed through `ThreadPoolBuilder::start_handler`.
type StartHandler = dyn Fn(usize) + Send + Sync;
229 
/// The type for a closure that gets invoked when a thread exits. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
///
/// Installed through `ThreadPoolBuilder::exit_handler`.
type ExitHandler = dyn Fn(usize) + Send + Sync;
234 
235 // NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
236 impl Default for ThreadPoolBuilder {
default() -> Self237     fn default() -> Self {
238         ThreadPoolBuilder {
239             num_threads: 0,
240             use_current_thread: false,
241             panic_handler: None,
242             get_thread_name: None,
243             stack_size: None,
244             start_handler: None,
245             exit_handler: None,
246             spawn_handler: DefaultSpawn,
247             breadth_first: false,
248         }
249     }
250 }
251 
252 impl ThreadPoolBuilder {
253     /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
new() -> Self254     pub fn new() -> Self {
255         Self::default()
256     }
257 }
258 
259 /// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
260 /// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
261 impl<S> ThreadPoolBuilder<S>
262 where
263     S: ThreadSpawn,
264 {
265     /// Creates a new `ThreadPool` initialized using this configuration.
build(self) -> Result<ThreadPool, ThreadPoolBuildError>266     pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
267         ThreadPool::build(self)
268     }
269 
270     /// Initializes the global thread pool. This initialization is
271     /// **optional**.  If you do not call this function, the thread pool
272     /// will be automatically initialized with the default
273     /// configuration. Calling `build_global` is not recommended, except
274     /// in two scenarios:
275     ///
276     /// - You wish to change the default configuration.
277     /// - You are running a benchmark, in which case initializing may
278     ///   yield slightly more consistent results, since the worker threads
279     ///   will already be ready to go even in the first iteration.  But
280     ///   this cost is minimal.
281     ///
282     /// Initialization of the global thread pool happens exactly
283     /// once. Once started, the configuration cannot be
284     /// changed. Therefore, if you call `build_global` a second time, it
285     /// will return an error. An `Ok` result indicates that this
286     /// is the first initialization of the thread pool.
build_global(self) -> Result<(), ThreadPoolBuildError>287     pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
288         let registry = registry::init_global_registry(self)?;
289         registry.wait_until_primed();
290         Ok(())
291     }
292 }
293 
294 impl ThreadPoolBuilder {
295     /// Creates a scoped `ThreadPool` initialized using this configuration.
296     ///
297     /// This is a convenience function for building a pool using [`std::thread::scope`]
298     /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
299     /// The threads in this pool will start by calling `wrapper`, which should
300     /// do initialization and continue by calling `ThreadBuilder::run()`.
301     ///
302     /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
303     ///
304     /// # Examples
305     ///
306     /// A scoped pool may be useful in combination with scoped thread-local variables.
307     ///
308     /// ```
309     /// # use rayon_core as rayon;
310     ///
311     /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
312     ///
313     /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
314     ///     let pool_data = vec![1, 2, 3];
315     ///
316     ///     // We haven't assigned any TLS data yet.
317     ///     assert!(!POOL_DATA.is_set());
318     ///
319     ///     rayon::ThreadPoolBuilder::new()
320     ///         .build_scoped(
321     ///             // Borrow `pool_data` in TLS for each thread.
322     ///             |thread| POOL_DATA.set(&pool_data, || thread.run()),
323     ///             // Do some work that needs the TLS data.
324     ///             |pool| pool.install(|| assert!(POOL_DATA.is_set())),
325     ///         )?;
326     ///
327     ///     // Once we've returned, `pool_data` is no longer borrowed.
328     ///     drop(pool_data);
329     ///     Ok(())
330     /// }
331     /// ```
build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError> where W: Fn(ThreadBuilder) + Sync, F: FnOnce(&ThreadPool) -> R,332     pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
333     where
334         W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
335         F: FnOnce(&ThreadPool) -> R,
336     {
337         std::thread::scope(|scope| {
338             let pool = self
339                 .spawn_handler(|thread| {
340                     let mut builder = std::thread::Builder::new();
341                     if let Some(name) = thread.name() {
342                         builder = builder.name(name.to_string());
343                     }
344                     if let Some(size) = thread.stack_size() {
345                         builder = builder.stack_size(size);
346                     }
347                     builder.spawn_scoped(scope, || wrapper(thread))?;
348                     Ok(())
349                 })
350                 .build()?;
351             Ok(with_pool(&pool))
352         })
353     }
354 }
355 
356 impl<S> ThreadPoolBuilder<S> {
357     /// Sets a custom function for spawning threads.
358     ///
359     /// Note that the threads will not exit until after the pool is dropped. It
360     /// is up to the caller to wait for thread termination if that is important
361     /// for any invariants. For instance, threads created in [`std::thread::scope`]
362     /// will be joined before that scope returns, and this will block indefinitely
363     /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
364     /// until the entire process exits!
365     ///
366     /// # Examples
367     ///
368     /// A minimal spawn handler just needs to call `run()` from an independent thread.
369     ///
370     /// ```
371     /// # use rayon_core as rayon;
372     /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
373     ///     let pool = rayon::ThreadPoolBuilder::new()
374     ///         .spawn_handler(|thread| {
375     ///             std::thread::spawn(|| thread.run());
376     ///             Ok(())
377     ///         })
378     ///         .build()?;
379     ///
380     ///     pool.install(|| println!("Hello from my custom thread!"));
381     ///     Ok(())
382     /// }
383     /// ```
384     ///
385     /// The default spawn handler sets the name and stack size if given, and propagates
386     /// any errors from the thread builder.
387     ///
388     /// ```
389     /// # use rayon_core as rayon;
390     /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
391     ///     let pool = rayon::ThreadPoolBuilder::new()
392     ///         .spawn_handler(|thread| {
393     ///             let mut b = std::thread::Builder::new();
394     ///             if let Some(name) = thread.name() {
395     ///                 b = b.name(name.to_owned());
396     ///             }
397     ///             if let Some(stack_size) = thread.stack_size() {
398     ///                 b = b.stack_size(stack_size);
399     ///             }
400     ///             b.spawn(|| thread.run())?;
401     ///             Ok(())
402     ///         })
403     ///         .build()?;
404     ///
405     ///     pool.install(|| println!("Hello from my fully custom thread!"));
406     ///     Ok(())
407     /// }
408     /// ```
409     ///
410     /// This can also be used for a pool of scoped threads like [`crossbeam::scope`],
411     /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
412     /// [`build_scoped`](#method.build_scoped).
413     ///
414     /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
415     /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
416     ///
417     /// ```
418     /// # use rayon_core as rayon;
419     /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
420     ///     std::thread::scope(|scope| {
421     ///         let pool = rayon::ThreadPoolBuilder::new()
422     ///             .spawn_handler(|thread| {
423     ///                 let mut builder = std::thread::Builder::new();
424     ///                 if let Some(name) = thread.name() {
425     ///                     builder = builder.name(name.to_string());
426     ///                 }
427     ///                 if let Some(size) = thread.stack_size() {
428     ///                     builder = builder.stack_size(size);
429     ///                 }
430     ///                 builder.spawn_scoped(scope, || {
431     ///                     // Add any scoped initialization here, then run!
432     ///                     thread.run()
433     ///                 })?;
434     ///                 Ok(())
435     ///             })
436     ///             .build()?;
437     ///
438     ///         pool.install(|| println!("Hello from my custom scoped thread!"));
439     ///         Ok(())
440     ///     })
441     /// }
442     /// ```
spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>> where F: FnMut(ThreadBuilder) -> io::Result<()>,443     pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
444     where
445         F: FnMut(ThreadBuilder) -> io::Result<()>,
446     {
447         ThreadPoolBuilder {
448             spawn_handler: CustomSpawn::new(spawn),
449             // ..self
450             num_threads: self.num_threads,
451             use_current_thread: self.use_current_thread,
452             panic_handler: self.panic_handler,
453             get_thread_name: self.get_thread_name,
454             stack_size: self.stack_size,
455             start_handler: self.start_handler,
456             exit_handler: self.exit_handler,
457             breadth_first: self.breadth_first,
458         }
459     }
460 
461     /// Returns a reference to the current spawn handler.
get_spawn_handler(&mut self) -> &mut S462     fn get_spawn_handler(&mut self) -> &mut S {
463         &mut self.spawn_handler
464     }
465 
466     /// Get the number of threads that will be used for the thread
467     /// pool. See `num_threads()` for more information.
get_num_threads(&self) -> usize468     fn get_num_threads(&self) -> usize {
469         if self.num_threads > 0 {
470             self.num_threads
471         } else {
472             let default = || {
473                 thread::available_parallelism()
474                     .map(|n| n.get())
475                     .unwrap_or(1)
476             };
477 
478             match env::var("RAYON_NUM_THREADS")
479                 .ok()
480                 .and_then(|s| usize::from_str(&s).ok())
481             {
482                 Some(x @ 1..) => return x,
483                 Some(0) => return default(),
484                 _ => {}
485             }
486 
487             // Support for deprecated `RAYON_RS_NUM_CPUS`.
488             match env::var("RAYON_RS_NUM_CPUS")
489                 .ok()
490                 .and_then(|s| usize::from_str(&s).ok())
491             {
492                 Some(x @ 1..) => x,
493                 _ => default(),
494             }
495         }
496     }
497 
498     /// Get the thread name for the thread with the given index.
get_thread_name(&mut self, index: usize) -> Option<String>499     fn get_thread_name(&mut self, index: usize) -> Option<String> {
500         let f = self.get_thread_name.as_mut()?;
501         Some(f(index))
502     }
503 
504     /// Sets a closure which takes a thread index and returns
505     /// the thread's name.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static,506     pub fn thread_name<F>(mut self, closure: F) -> Self
507     where
508         F: FnMut(usize) -> String + 'static,
509     {
510         self.get_thread_name = Some(Box::new(closure));
511         self
512     }
513 
514     /// Sets the number of threads to be used in the rayon threadpool.
515     ///
516     /// If you specify a non-zero number of threads using this
517     /// function, then the resulting thread-pools are guaranteed to
518     /// start at most this number of threads.
519     ///
520     /// If `num_threads` is 0, or you do not call this function, then
521     /// the Rayon runtime will select the number of threads
522     /// automatically. At present, this is based on the
523     /// `RAYON_NUM_THREADS` environment variable (if set),
524     /// or the number of logical CPUs (otherwise).
525     /// In the future, however, the default behavior may
526     /// change to dynamically add or remove threads as needed.
527     ///
528     /// **Future compatibility warning:** Given the default behavior
529     /// may change in the future, if you wish to rely on a fixed
530     /// number of threads, you should use this function to specify
531     /// that number. To reproduce the current default behavior, you
532     /// may wish to use [`std::thread::available_parallelism`]
533     /// to query the number of CPUs dynamically.
534     ///
535     /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
536     /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
537     /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
538     /// be preferred.
num_threads(mut self, num_threads: usize) -> Self539     pub fn num_threads(mut self, num_threads: usize) -> Self {
540         self.num_threads = num_threads;
541         self
542     }
543 
544     /// Use the current thread as one of the threads in the pool.
545     ///
546     /// The current thread is guaranteed to be at index 0, and since the thread is not managed by
547     /// rayon, the spawn and exit handlers do not run for that thread.
548     ///
549     /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
550     /// the thread-pool will generally not be picked up automatically by this thread unless you
551     /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
552     ///
553     /// # Local thread-pools
554     ///
555     /// Using this in a local thread-pool means the registry will be leaked. In future versions
556     /// there might be a way of cleaning up the current-thread state.
use_current_thread(mut self) -> Self557     pub fn use_current_thread(mut self) -> Self {
558         self.use_current_thread = true;
559         self
560     }
561 
562     /// Returns a copy of the current panic handler.
take_panic_handler(&mut self) -> Option<Box<PanicHandler>>563     fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
564         self.panic_handler.take()
565     }
566 
567     /// Normally, whenever Rayon catches a panic, it tries to
568     /// propagate it to someplace sensible, to try and reflect the
569     /// semantics of sequential execution. But in some cases,
570     /// particularly with the `spawn()` APIs, there is no
571     /// obvious place where we should propagate the panic to.
572     /// In that case, this panic handler is invoked.
573     ///
574     /// If no panic handler is set, the default is to abort the
575     /// process, under the principle that panics should not go
576     /// unobserved.
577     ///
578     /// If the panic handler itself panics, this will abort the
579     /// process. To prevent this, wrap the body of your panic handler
580     /// in a call to `std::panic::catch_unwind()`.
panic_handler<H>(mut self, panic_handler: H) -> Self where H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,581     pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
582     where
583         H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
584     {
585         self.panic_handler = Some(Box::new(panic_handler));
586         self
587     }
588 
589     /// Get the stack size of the worker threads
get_stack_size(&self) -> Option<usize>590     fn get_stack_size(&self) -> Option<usize> {
591         self.stack_size
592     }
593 
594     /// Sets the stack size of the worker threads
stack_size(mut self, stack_size: usize) -> Self595     pub fn stack_size(mut self, stack_size: usize) -> Self {
596         self.stack_size = Some(stack_size);
597         self
598     }
599 
600     /// **(DEPRECATED)** Suggest to worker threads that they execute
601     /// spawned jobs in a "breadth-first" fashion.
602     ///
603     /// Typically, when a worker thread is idle or blocked, it will
604     /// attempt to execute the job from the *top* of its local deque of
605     /// work (i.e., the job most recently spawned). If this flag is set
606     /// to true, however, workers will prefer to execute in a
607     /// *breadth-first* fashion -- that is, they will search for jobs at
608     /// the *bottom* of their local deque. (At present, workers *always*
609     /// steal from the bottom of other workers' deques, regardless of
610     /// the setting of this flag.)
611     ///
612     /// If you think of the tasks as a tree, where a parent task
613     /// spawns its children in the tree, then this flag loosely
614     /// corresponds to doing a breadth-first traversal of the tree,
615     /// whereas the default would be to do a depth-first traversal.
616     ///
617     /// **Note that this is an "execution hint".** Rayon's task
618     /// execution is highly dynamic and the precise order in which
619     /// independent tasks are executed is not intended to be
620     /// guaranteed.
621     ///
622     /// This `breadth_first()` method is now deprecated per [RFC #1],
623     /// and in the future its effect may be removed. Consider using
624     /// [`scope_fifo()`] for a similar effect.
625     ///
626     /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
627     /// [`scope_fifo()`]: fn.scope_fifo.html
628     #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
breadth_first(mut self) -> Self629     pub fn breadth_first(mut self) -> Self {
630         self.breadth_first = true;
631         self
632     }
633 
get_breadth_first(&self) -> bool634     fn get_breadth_first(&self) -> bool {
635         self.breadth_first
636     }
637 
638     /// Takes the current thread start callback, leaving `None`.
take_start_handler(&mut self) -> Option<Box<StartHandler>>639     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
640         self.start_handler.take()
641     }
642 
643     /// Sets a callback to be invoked on thread start.
644     ///
645     /// The closure is passed the index of the thread on which it is invoked.
646     /// Note that this same closure may be invoked multiple times in parallel.
647     /// If this closure panics, the panic will be passed to the panic handler.
648     /// If that handler returns, then startup will continue normally.
start_handler<H>(mut self, start_handler: H) -> Self where H: Fn(usize) + Send + Sync + 'static,649     pub fn start_handler<H>(mut self, start_handler: H) -> Self
650     where
651         H: Fn(usize) + Send + Sync + 'static,
652     {
653         self.start_handler = Some(Box::new(start_handler));
654         self
655     }
656 
657     /// Returns a current thread exit callback, leaving `None`.
take_exit_handler(&mut self) -> Option<Box<ExitHandler>>658     fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
659         self.exit_handler.take()
660     }
661 
662     /// Sets a callback to be invoked on thread exit.
663     ///
664     /// The closure is passed the index of the thread on which it is invoked.
665     /// Note that this same closure may be invoked multiple times in parallel.
666     /// If this closure panics, the panic will be passed to the panic handler.
667     /// If that handler returns, then the thread will exit normally.
exit_handler<H>(mut self, exit_handler: H) -> Self where H: Fn(usize) + Send + Sync + 'static,668     pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
669     where
670         H: Fn(usize) + Send + Sync + 'static,
671     {
672         self.exit_handler = Some(Box::new(exit_handler));
673         self
674     }
675 }
676 
677 #[allow(deprecated)]
678 impl Configuration {
679     /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
new() -> Configuration680     pub fn new() -> Configuration {
681         Configuration {
682             builder: ThreadPoolBuilder::new(),
683         }
684     }
685 
686     /// Deprecated in favor of `ThreadPoolBuilder::build`.
build(self) -> Result<ThreadPool, Box<dyn Error + 'static>>687     pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
688         self.builder.build().map_err(Box::from)
689     }
690 
691     /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
thread_name<F>(mut self, closure: F) -> Self where F: FnMut(usize) -> String + 'static,692     pub fn thread_name<F>(mut self, closure: F) -> Self
693     where
694         F: FnMut(usize) -> String + 'static,
695     {
696         self.builder = self.builder.thread_name(closure);
697         self
698     }
699 
700     /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
num_threads(mut self, num_threads: usize) -> Configuration701     pub fn num_threads(mut self, num_threads: usize) -> Configuration {
702         self.builder = self.builder.num_threads(num_threads);
703         self
704     }
705 
706     /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
panic_handler<H>(mut self, panic_handler: H) -> Configuration where H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,707     pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
708     where
709         H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
710     {
711         self.builder = self.builder.panic_handler(panic_handler);
712         self
713     }
714 
715     /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
stack_size(mut self, stack_size: usize) -> Self716     pub fn stack_size(mut self, stack_size: usize) -> Self {
717         self.builder = self.builder.stack_size(stack_size);
718         self
719     }
720 
721     /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
breadth_first(mut self) -> Self722     pub fn breadth_first(mut self) -> Self {
723         self.builder = self.builder.breadth_first();
724         self
725     }
726 
727     /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
start_handler<H>(mut self, start_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static,728     pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
729     where
730         H: Fn(usize) + Send + Sync + 'static,
731     {
732         self.builder = self.builder.start_handler(start_handler);
733         self
734     }
735 
736     /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
exit_handler<H>(mut self, exit_handler: H) -> Configuration where H: Fn(usize) + Send + Sync + 'static,737     pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
738     where
739         H: Fn(usize) + Send + Sync + 'static,
740     {
741         self.builder = self.builder.exit_handler(exit_handler);
742         self
743     }
744 
745     /// Returns a ThreadPoolBuilder with identical parameters.
into_builder(self) -> ThreadPoolBuilder746     fn into_builder(self) -> ThreadPoolBuilder {
747         self.builder
748     }
749 }
750 
751 impl ThreadPoolBuildError {
new(kind: ErrorKind) -> ThreadPoolBuildError752     fn new(kind: ErrorKind) -> ThreadPoolBuildError {
753         ThreadPoolBuildError { kind }
754     }
755 
is_unsupported(&self) -> bool756     fn is_unsupported(&self) -> bool {
757         matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
758     }
759 }
760 
/// Message reported for `ErrorKind::GlobalPoolAlreadyInitialized`, both by
/// `description()` and by the `Display` output of `ThreadPoolBuildError`.
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
    "The global thread pool has already been initialized.";

/// Message reported for `ErrorKind::CurrentThreadAlreadyInPool`, both by
/// `description()` and by the `Display` output of `ThreadPoolBuildError`.
const CURRENT_THREAD_ALREADY_IN_POOL: &str =
    "The current thread is already part of another thread pool.";
766 
767 impl Error for ThreadPoolBuildError {
768     #[allow(deprecated)]
description(&self) -> &str769     fn description(&self) -> &str {
770         match self.kind {
771             ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
772             ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
773             ErrorKind::IOError(ref e) => e.description(),
774         }
775     }
776 
source(&self) -> Option<&(dyn Error + 'static)>777     fn source(&self) -> Option<&(dyn Error + 'static)> {
778         match &self.kind {
779             ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
780             ErrorKind::IOError(e) => Some(e),
781         }
782     }
783 }
784 
785 impl fmt::Display for ThreadPoolBuildError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result786     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
787         match &self.kind {
788             ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
789             ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
790             ErrorKind::IOError(e) => e.fmt(f),
791         }
792     }
793 }
794 
795 /// Deprecated in favor of `ThreadPoolBuilder::build_global`.
796 #[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
797 #[allow(deprecated)]
initialize(config: Configuration) -> Result<(), Box<dyn Error>>798 pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
799     config.into_builder().build_global().map_err(Box::from)
800 }
801 
802 impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result803     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
804         let ThreadPoolBuilder {
805             ref num_threads,
806             ref use_current_thread,
807             ref get_thread_name,
808             ref panic_handler,
809             ref stack_size,
810             ref start_handler,
811             ref exit_handler,
812             spawn_handler: _,
813             ref breadth_first,
814         } = *self;
815 
816         // Just print `Some(<closure>)` or `None` to the debug
817         // output.
818         struct ClosurePlaceholder;
819         impl fmt::Debug for ClosurePlaceholder {
820             fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
821                 f.write_str("<closure>")
822             }
823         }
824         let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
825         let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
826         let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
827         let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
828 
829         f.debug_struct("ThreadPoolBuilder")
830             .field("num_threads", num_threads)
831             .field("use_current_thread", use_current_thread)
832             .field("get_thread_name", &get_thread_name)
833             .field("panic_handler", &panic_handler)
834             .field("stack_size", &stack_size)
835             .field("start_handler", &start_handler)
836             .field("exit_handler", &exit_handler)
837             .field("breadth_first", &breadth_first)
838             .finish()
839     }
840 }
841 
842 #[allow(deprecated)]
843 impl fmt::Debug for Configuration {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result844     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
845         self.builder.fmt(f)
846     }
847 }
848 
/// Provides the calling context to a closure called by `join_context`.
#[derive(Debug)]
pub struct FnContext {
    /// Flag handed to `new` and reported back by `migrated()`.
    migrated: bool,

    /// disable `Send` and `Sync`, just for a little future-proofing.
    _marker: PhantomData<*mut ()>,
}

impl FnContext {
    /// Builds a context carrying the given `migrated` flag.
    #[inline]
    fn new(migrated: bool) -> Self {
        Self {
            _marker: PhantomData,
            migrated,
        }
    }

    /// Returns `true` if the closure was called from a different thread
    /// than it was provided from.
    #[inline]
    pub fn migrated(&self) -> bool {
        self.migrated
    }
}
876