1 use super::plumbing::*;
2 use super::ParallelIterator;
3 use super::Try;
4 
5 use std::ops::ControlFlow::{self, Break, Continue};
6 use std::sync::atomic::{AtomicBool, Ordering};
7 
try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T> where PI: ParallelIterator<Item = T>, R: Fn(T::Output, T::Output) -> T + Sync, T: Try + Send,8 pub(super) fn try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T>
9 where
10     PI: ParallelIterator<Item = T>,
11     R: Fn(T::Output, T::Output) -> T + Sync,
12     T: Try + Send,
13 {
14     let full = AtomicBool::new(false);
15     let consumer = TryReduceWithConsumer {
16         reduce_op: &reduce_op,
17         full: &full,
18     };
19     pi.drive_unindexed(consumer)
20 }
21 
22 struct TryReduceWithConsumer<'r, R> {
23     reduce_op: &'r R,
24     full: &'r AtomicBool,
25 }
26 
27 impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {}
28 
29 impl<'r, R> Clone for TryReduceWithConsumer<'r, R> {
clone(&self) -> Self30     fn clone(&self) -> Self {
31         *self
32     }
33 }
34 
35 impl<'r, R, T> Consumer<T> for TryReduceWithConsumer<'r, R>
36 where
37     R: Fn(T::Output, T::Output) -> T + Sync,
38     T: Try + Send,
39 {
40     type Folder = TryReduceWithFolder<'r, R, T>;
41     type Reducer = Self;
42     type Result = Option<T>;
43 
split_at(self, _index: usize) -> (Self, Self, Self)44     fn split_at(self, _index: usize) -> (Self, Self, Self) {
45         (self, self, self)
46     }
47 
into_folder(self) -> Self::Folder48     fn into_folder(self) -> Self::Folder {
49         TryReduceWithFolder {
50             reduce_op: self.reduce_op,
51             opt_control: None,
52             full: self.full,
53         }
54     }
55 
full(&self) -> bool56     fn full(&self) -> bool {
57         self.full.load(Ordering::Relaxed)
58     }
59 }
60 
61 impl<'r, R, T> UnindexedConsumer<T> for TryReduceWithConsumer<'r, R>
62 where
63     R: Fn(T::Output, T::Output) -> T + Sync,
64     T: Try + Send,
65 {
split_off_left(&self) -> Self66     fn split_off_left(&self) -> Self {
67         *self
68     }
69 
to_reducer(&self) -> Self::Reducer70     fn to_reducer(&self) -> Self::Reducer {
71         *self
72     }
73 }
74 
75 impl<'r, R, T> Reducer<Option<T>> for TryReduceWithConsumer<'r, R>
76 where
77     R: Fn(T::Output, T::Output) -> T + Sync,
78     T: Try,
79 {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>80     fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
81         let reduce_op = self.reduce_op;
82         match (left, right) {
83             (Some(left), Some(right)) => match (left.branch(), right.branch()) {
84                 (Continue(left), Continue(right)) => Some(reduce_op(left, right)),
85                 (Break(r), _) | (_, Break(r)) => Some(T::from_residual(r)),
86             },
87             (None, x) | (x, None) => x,
88         }
89     }
90 }
91 
92 struct TryReduceWithFolder<'r, R, T: Try> {
93     reduce_op: &'r R,
94     opt_control: Option<ControlFlow<T::Residual, T::Output>>,
95     full: &'r AtomicBool,
96 }
97 
98 impl<'r, R, T> Folder<T> for TryReduceWithFolder<'r, R, T>
99 where
100     R: Fn(T::Output, T::Output) -> T,
101     T: Try,
102 {
103     type Result = Option<T>;
104 
consume(mut self, item: T) -> Self105     fn consume(mut self, item: T) -> Self {
106         let reduce_op = self.reduce_op;
107         let control = match (self.opt_control, item.branch()) {
108             (Some(Continue(left)), Continue(right)) => reduce_op(left, right).branch(),
109             (Some(control @ Break(_)), _) | (_, control) => control,
110         };
111         if let Break(_) = control {
112             self.full.store(true, Ordering::Relaxed)
113         }
114         self.opt_control = Some(control);
115         self
116     }
117 
complete(self) -> Option<T>118     fn complete(self) -> Option<T> {
119         match self.opt_control {
120             Some(Continue(c)) => Some(T::from_output(c)),
121             Some(Break(r)) => Some(T::from_residual(r)),
122             None => None,
123         }
124     }
125 
full(&self) -> bool126     fn full(&self) -> bool {
127         match self.opt_control {
128             Some(Break(_)) => true,
129             _ => self.full.load(Ordering::Relaxed),
130         }
131     }
132 }
133