1 #[cfg(not(feature = "web_spin_lock"))]
2 use std::sync::Mutex;
3 
4 #[cfg(feature = "web_spin_lock")]
5 use wasm_sync::Mutex;
6 
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 
9 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
10 use crate::iter::ParallelIterator;
11 use crate::{current_num_threads, current_thread_index};
12 
/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
/// parallelize by other means, like channels or file or network I/O.
///
/// Iterator items are pulled by `next()` one at a time, synchronized from each thread that is
/// ready for work, so this may become a bottleneck if the serial iterator can't keep up with the
/// parallel demand. The items are not buffered by `IterBridge`, so it's fine to use this with
/// large or even unbounded iterators.
///
/// The resulting iterator is not guaranteed to keep the order of the original iterator.
///
/// # Examples
///
/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
/// use any of the `ParallelIterator` methods:
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
/// use std::sync::mpsc::channel;
///
/// let rx = {
///     let (tx, rx) = channel();
///
///     tx.send("one!");
///     tx.send("two!");
///     tx.send("three!");
///
///     rx
/// };
///
/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
/// output.sort_unstable();
///
/// assert_eq!(&*output, &["one!", "three!", "two!"]);
/// ```
pub trait ParallelBridge: Sized {
    /// Creates a bridge from this type to a `ParallelIterator`.
    ///
    /// Consumes the sequential iterator and wraps it in an [`IterBridge`],
    /// which implements `ParallelIterator` by pulling items through a mutex.
    fn par_bridge(self) -> IterBridge<Self>;
}
57 
// Blanket impl: any `Send` iterator whose items are `Send` can be bridged.
// Both bounds are required so the iterator (behind the shared mutex) and the
// items it yields can cross thread boundaries in the pool.
impl<T: Iterator + Send> ParallelBridge for T
where
    T::Item: Send,
{
    fn par_bridge(self) -> IterBridge<Self> {
        // Just wrap the iterator; all of the real work happens when the
        // resulting `IterBridge` is driven as a `ParallelIterator`.
        IterBridge { iter: self }
    }
}
66 
/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
    // The wrapped sequential iterator; not consumed until the bridge is driven.
    iter: Iter,
}
77 
78 impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
79 where
80     Iter::Item: Send,
81 {
82     type Item = Iter::Item;
83 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,84     fn drive_unindexed<C>(self, consumer: C) -> C::Result
85     where
86         C: UnindexedConsumer<Self::Item>,
87     {
88         let num_threads = current_num_threads();
89         let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();
90 
91         bridge_unindexed(
92             &IterParallelProducer {
93                 split_count: AtomicUsize::new(num_threads),
94                 iter: Mutex::new(self.iter.fuse()),
95                 threads_started: &threads_started,
96             },
97             consumer,
98         )
99     }
100 }
101 
// Shared state backing the `par_bridge` producer; every worker holds a `&`
// reference to one instance for the duration of the bridge.
struct IterParallelProducer<'a, Iter> {
    // Remaining number of times `split` may hand out another copy of the
    // producer; decremented atomically until it reaches zero.
    split_count: AtomicUsize,
    // The sequential source, fused so exhaustion is sticky, and guarded by a
    // mutex since multiple worker threads pull items from it.
    iter: Mutex<std::iter::Fuse<Iter>>,
    // Per-thread "already inside fold_with" flags, indexed by
    // `current_thread_index()`, used to avoid re-entrant locking of `iter`.
    threads_started: &'a [AtomicBool],
}
107 
108 impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
109     type Item = Iter::Item;
110 
split(self) -> (Self, Option<Self>)111     fn split(self) -> (Self, Option<Self>) {
112         let mut count = self.split_count.load(Ordering::SeqCst);
113 
114         loop {
115             // Check if the iterator is exhausted
116             if let Some(new_count) = count.checked_sub(1) {
117                 match self.split_count.compare_exchange_weak(
118                     count,
119                     new_count,
120                     Ordering::SeqCst,
121                     Ordering::SeqCst,
122                 ) {
123                     Ok(_) => return (self, Some(self)),
124                     Err(last_count) => count = last_count,
125                 }
126             } else {
127                 return (self, None);
128             }
129         }
130     }
131 
fold_with<F>(self, mut folder: F) -> F where F: Folder<Self::Item>,132     fn fold_with<F>(self, mut folder: F) -> F
133     where
134         F: Folder<Self::Item>,
135     {
136         // Guard against work-stealing-induced recursion, in case `Iter::next()`
137         // calls rayon internally, so we don't deadlock our mutex. We might also
138         // be recursing via `folder` methods, which doesn't present a mutex hazard,
139         // but it's lower overhead for us to just check this once, rather than
140         // updating additional shared state on every mutex lock/unlock.
141         // (If this isn't a rayon thread, then there's no work-stealing anyway...)
142         if let Some(i) = current_thread_index() {
143             // Note: If the number of threads in the pool ever grows dynamically, then
144             // we'll end up sharing flags and may falsely detect recursion -- that's
145             // still fine for overall correctness, just not optimal for parallelism.
146             let thread_started = &self.threads_started[i % self.threads_started.len()];
147             if thread_started.swap(true, Ordering::Relaxed) {
148                 // We can't make progress with a nested mutex, so just return and let
149                 // the outermost loop continue with the rest of the iterator items.
150                 return folder;
151             }
152         }
153 
154         loop {
155             if let Ok(mut iter) = self.iter.lock() {
156                 if let Some(it) = iter.next() {
157                     drop(iter);
158                     folder = folder.consume(it);
159                     if folder.full() {
160                         return folder;
161                     }
162                 } else {
163                     return folder;
164                 }
165             } else {
166                 // any panics from other threads will have been caught by the pool,
167                 // and will be re-thrown when joined - just exit
168                 return folder;
169             }
170         }
171     }
172 }
173