1 use super::plumbing::*;
2 use super::*;
3 use std::sync::atomic::{AtomicBool, Ordering};
4
find<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,5 pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item>
6 where
7 I: ParallelIterator,
8 P: Fn(&I::Item) -> bool + Sync,
9 {
10 let found = AtomicBool::new(false);
11 let consumer = FindConsumer::new(&find_op, &found);
12 pi.drive_unindexed(consumer)
13 }
14
/// Consumer half of the parallel `find` search.
///
/// It owns nothing: both fields are borrows, so handing out another consumer
/// for a split only copies two references.
struct FindConsumer<'p, P> {
    /// Predicate deciding whether an item is a match.
    find_op: &'p P,
    /// Shared flag, set to `true` once some folder has stored a match.
    found: &'p AtomicBool,
}
19
20 impl<'p, P> FindConsumer<'p, P> {
new(find_op: &'p P, found: &'p AtomicBool) -> Self21 fn new(find_op: &'p P, found: &'p AtomicBool) -> Self {
22 FindConsumer { find_op, found }
23 }
24 }
25
26 impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P>
27 where
28 T: Send,
29 P: Fn(&T) -> bool + Sync,
30 {
31 type Folder = FindFolder<'p, T, P>;
32 type Reducer = FindReducer;
33 type Result = Option<T>;
34
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)35 fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
36 (self.split_off_left(), self, FindReducer)
37 }
38
into_folder(self) -> Self::Folder39 fn into_folder(self) -> Self::Folder {
40 FindFolder {
41 find_op: self.find_op,
42 found: self.found,
43 item: None,
44 }
45 }
46
full(&self) -> bool47 fn full(&self) -> bool {
48 self.found.load(Ordering::Relaxed)
49 }
50 }
51
52 impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P>
53 where
54 T: Send,
55 P: Fn(&T) -> bool + Sync,
56 {
split_off_left(&self) -> Self57 fn split_off_left(&self) -> Self {
58 FindConsumer::new(self.find_op, self.found)
59 }
60
to_reducer(&self) -> Self::Reducer61 fn to_reducer(&self) -> Self::Reducer {
62 FindReducer
63 }
64 }
65
/// Sequential half of the parallel `find` search: tests items one by one and
/// remembers a match.
struct FindFolder<'p, T, P> {
    /// Predicate deciding whether an item is a match.
    find_op: &'p P,
    /// Shared flag, set to `true` once some folder has stored a match.
    found: &'p AtomicBool,
    /// The match held by this folder, if it has one.
    item: Option<T>,
}
71
72 impl<'p, T, P> Folder<T> for FindFolder<'p, T, P>
73 where
74 P: Fn(&T) -> bool + 'p,
75 {
76 type Result = Option<T>;
77
consume(mut self, item: T) -> Self78 fn consume(mut self, item: T) -> Self {
79 if (self.find_op)(&item) {
80 self.found.store(true, Ordering::Relaxed);
81 self.item = Some(item);
82 }
83 self
84 }
85
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,86 fn consume_iter<I>(mut self, iter: I) -> Self
87 where
88 I: IntoIterator<Item = T>,
89 {
90 fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ {
91 move |_| !found.load(Ordering::Relaxed)
92 }
93
94 self.item = iter
95 .into_iter()
96 // stop iterating if another thread has found something
97 .take_while(not_full(self.found))
98 .find(self.find_op);
99 if self.item.is_some() {
100 self.found.store(true, Ordering::Relaxed)
101 }
102 self
103 }
104
complete(self) -> Self::Result105 fn complete(self) -> Self::Result {
106 self.item
107 }
108
full(&self) -> bool109 fn full(&self) -> bool {
110 self.found.load(Ordering::Relaxed)
111 }
112 }
113
/// Stateless reducer that merges the partial results of two halves of a
/// `find` search.
struct FindReducer;
115
116 impl<T> Reducer<Option<T>> for FindReducer {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>117 fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
118 left.or(right)
119 }
120 }
121