1 use super::plumbing::*;
2 use super::*;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 
5 /// `SkipAny` is an iterator that skips over `n` elements from anywhere in `I`.
6 /// This struct is created by the [`skip_any()`] method on [`ParallelIterator`]
7 ///
8 /// [`skip_any()`]: trait.ParallelIterator.html#method.skip_any
9 /// [`ParallelIterator`]: trait.ParallelIterator.html
10 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11 #[derive(Clone, Debug)]
12 pub struct SkipAny<I: ParallelIterator> {
13     base: I,
14     count: usize,
15 }
16 
17 impl<I> SkipAny<I>
18 where
19     I: ParallelIterator,
20 {
21     /// Creates a new `SkipAny` iterator.
new(base: I, count: usize) -> Self22     pub(super) fn new(base: I, count: usize) -> Self {
23         SkipAny { base, count }
24     }
25 }
26 
27 impl<I> ParallelIterator for SkipAny<I>
28 where
29     I: ParallelIterator,
30 {
31     type Item = I::Item;
32 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,33     fn drive_unindexed<C>(self, consumer: C) -> C::Result
34     where
35         C: UnindexedConsumer<Self::Item>,
36     {
37         let consumer1 = SkipAnyConsumer {
38             base: consumer,
39             count: &AtomicUsize::new(self.count),
40         };
41         self.base.drive_unindexed(consumer1)
42     }
43 }
44 
45 /// ////////////////////////////////////////////////////////////////////////
46 /// Consumer implementation
47 
48 struct SkipAnyConsumer<'f, C> {
49     base: C,
50     count: &'f AtomicUsize,
51 }
52 
53 impl<'f, T, C> Consumer<T> for SkipAnyConsumer<'f, C>
54 where
55     C: Consumer<T>,
56     T: Send,
57 {
58     type Folder = SkipAnyFolder<'f, C::Folder>;
59     type Reducer = C::Reducer;
60     type Result = C::Result;
61 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)62     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
63         let (left, right, reducer) = self.base.split_at(index);
64         (
65             SkipAnyConsumer { base: left, ..self },
66             SkipAnyConsumer {
67                 base: right,
68                 ..self
69             },
70             reducer,
71         )
72     }
73 
into_folder(self) -> Self::Folder74     fn into_folder(self) -> Self::Folder {
75         SkipAnyFolder {
76             base: self.base.into_folder(),
77             count: self.count,
78         }
79     }
80 
full(&self) -> bool81     fn full(&self) -> bool {
82         self.base.full()
83     }
84 }
85 
86 impl<'f, T, C> UnindexedConsumer<T> for SkipAnyConsumer<'f, C>
87 where
88     C: UnindexedConsumer<T>,
89     T: Send,
90 {
split_off_left(&self) -> Self91     fn split_off_left(&self) -> Self {
92         SkipAnyConsumer {
93             base: self.base.split_off_left(),
94             ..*self
95         }
96     }
97 
to_reducer(&self) -> Self::Reducer98     fn to_reducer(&self) -> Self::Reducer {
99         self.base.to_reducer()
100     }
101 }
102 
103 struct SkipAnyFolder<'f, C> {
104     base: C,
105     count: &'f AtomicUsize,
106 }
107 
checked_decrement(u: &AtomicUsize) -> bool108 fn checked_decrement(u: &AtomicUsize) -> bool {
109     u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
110         .is_ok()
111 }
112 
113 impl<'f, T, C> Folder<T> for SkipAnyFolder<'f, C>
114 where
115     C: Folder<T>,
116 {
117     type Result = C::Result;
118 
consume(mut self, item: T) -> Self119     fn consume(mut self, item: T) -> Self {
120         if !checked_decrement(self.count) {
121             self.base = self.base.consume(item);
122         }
123         self
124     }
125 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,126     fn consume_iter<I>(mut self, iter: I) -> Self
127     where
128         I: IntoIterator<Item = T>,
129     {
130         self.base = self.base.consume_iter(
131             iter.into_iter()
132                 .skip_while(move |_| checked_decrement(self.count)),
133         );
134         self
135     }
136 
complete(self) -> C::Result137     fn complete(self) -> C::Result {
138         self.base.complete()
139     }
140 
full(&self) -> bool141     fn full(&self) -> bool {
142         self.base.full()
143     }
144 }
145