1 use std::cmp::min;
2 
3 use super::plumbing::*;
4 use super::*;
5 use crate::math::div_round_up;
6 
7 /// `Chunks` is an iterator that groups elements of an underlying iterator.
8 ///
9 /// This struct is created by the [`chunks()`] method on [`IndexedParallelIterator`]
10 ///
11 /// [`chunks()`]: trait.IndexedParallelIterator.html#method.chunks
12 /// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
13 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14 #[derive(Debug, Clone)]
15 pub struct Chunks<I>
16 where
17     I: IndexedParallelIterator,
18 {
19     size: usize,
20     i: I,
21 }
22 
23 impl<I> Chunks<I>
24 where
25     I: IndexedParallelIterator,
26 {
27     /// Creates a new `Chunks` iterator
new(i: I, size: usize) -> Self28     pub(super) fn new(i: I, size: usize) -> Self {
29         Chunks { i, size }
30     }
31 }
32 
33 impl<I> ParallelIterator for Chunks<I>
34 where
35     I: IndexedParallelIterator,
36 {
37     type Item = Vec<I::Item>;
38 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: Consumer<Vec<I::Item>>,39     fn drive_unindexed<C>(self, consumer: C) -> C::Result
40     where
41         C: Consumer<Vec<I::Item>>,
42     {
43         bridge(self, consumer)
44     }
45 
opt_len(&self) -> Option<usize>46     fn opt_len(&self) -> Option<usize> {
47         Some(self.len())
48     }
49 }
50 
51 impl<I> IndexedParallelIterator for Chunks<I>
52 where
53     I: IndexedParallelIterator,
54 {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,55     fn drive<C>(self, consumer: C) -> C::Result
56     where
57         C: Consumer<Self::Item>,
58     {
59         bridge(self, consumer)
60     }
61 
len(&self) -> usize62     fn len(&self) -> usize {
63         div_round_up(self.i.len(), self.size)
64     }
65 
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,66     fn with_producer<CB>(self, callback: CB) -> CB::Output
67     where
68         CB: ProducerCallback<Self::Item>,
69     {
70         let len = self.i.len();
71         return self.i.with_producer(Callback {
72             size: self.size,
73             len,
74             callback,
75         });
76 
77         struct Callback<CB> {
78             size: usize,
79             len: usize,
80             callback: CB,
81         }
82 
83         impl<T, CB> ProducerCallback<T> for Callback<CB>
84         where
85             CB: ProducerCallback<Vec<T>>,
86         {
87             type Output = CB::Output;
88 
89             fn callback<P>(self, base: P) -> CB::Output
90             where
91                 P: Producer<Item = T>,
92             {
93                 let producer = ChunkProducer::new(self.size, self.len, base, Vec::from_iter);
94                 self.callback.callback(producer)
95             }
96         }
97     }
98 }
99 
/// Producer adaptor that groups the items of a base producer into chunks and
/// passes each chunk's sequential iterator through a mapping function.
pub(super) struct ChunkProducer<P, F> {
    /// Number of base items per chunk (the final chunk may hold fewer).
    chunk_size: usize,
    /// Number of base items covered by this producer.
    len: usize,
    /// The underlying producer being chunked.
    base: P,
    /// Function applied to each chunk's iterator to build the yielded item.
    map: F,
}
106 
107 impl<P, F> ChunkProducer<P, F> {
new(chunk_size: usize, len: usize, base: P, map: F) -> Self108     pub(super) fn new(chunk_size: usize, len: usize, base: P, map: F) -> Self {
109         Self {
110             chunk_size,
111             len,
112             base,
113             map,
114         }
115     }
116 }
117 
118 impl<P, F, T> Producer for ChunkProducer<P, F>
119 where
120     P: Producer,
121     F: Fn(P::IntoIter) -> T + Send + Clone,
122 {
123     type Item = T;
124     type IntoIter = std::iter::Map<ChunkSeq<P>, F>;
125 
into_iter(self) -> Self::IntoIter126     fn into_iter(self) -> Self::IntoIter {
127         let chunks = ChunkSeq {
128             chunk_size: self.chunk_size,
129             len: self.len,
130             inner: if self.len > 0 { Some(self.base) } else { None },
131         };
132         chunks.map(self.map)
133     }
134 
split_at(self, index: usize) -> (Self, Self)135     fn split_at(self, index: usize) -> (Self, Self) {
136         let elem_index = min(index * self.chunk_size, self.len);
137         let (left, right) = self.base.split_at(elem_index);
138         (
139             ChunkProducer {
140                 chunk_size: self.chunk_size,
141                 len: elem_index,
142                 base: left,
143                 map: self.map.clone(),
144             },
145             ChunkProducer {
146                 chunk_size: self.chunk_size,
147                 len: self.len - elem_index,
148                 base: right,
149                 map: self.map,
150             },
151         )
152     }
153 
min_len(&self) -> usize154     fn min_len(&self) -> usize {
155         div_round_up(self.base.min_len(), self.chunk_size)
156     }
157 
max_len(&self) -> usize158     fn max_len(&self) -> usize {
159         self.base.max_len() / self.chunk_size
160     }
161 }
162 
/// Sequential iterator over the chunks of a producer; every item it yields
/// is the sequential iterator of one chunk.
pub(super) struct ChunkSeq<P> {
    /// Number of items per chunk (the final chunk may hold fewer).
    chunk_size: usize,
    /// Number of items that have not been handed out yet.
    len: usize,
    /// Producer holding the remaining items; `None` once the last chunk has
    /// been taken, or from the start when there were no items.
    inner: Option<P>,
}
168 
169 impl<P> Iterator for ChunkSeq<P>
170 where
171     P: Producer,
172 {
173     type Item = P::IntoIter;
174 
next(&mut self) -> Option<Self::Item>175     fn next(&mut self) -> Option<Self::Item> {
176         let producer = self.inner.take()?;
177         if self.len > self.chunk_size {
178             let (left, right) = producer.split_at(self.chunk_size);
179             self.inner = Some(right);
180             self.len -= self.chunk_size;
181             Some(left.into_iter())
182         } else {
183             debug_assert!(self.len > 0);
184             self.len = 0;
185             Some(producer.into_iter())
186         }
187     }
188 
size_hint(&self) -> (usize, Option<usize>)189     fn size_hint(&self) -> (usize, Option<usize>) {
190         let len = self.len();
191         (len, Some(len))
192     }
193 }
194 
195 impl<P> ExactSizeIterator for ChunkSeq<P>
196 where
197     P: Producer,
198 {
199     #[inline]
len(&self) -> usize200     fn len(&self) -> usize {
201         div_round_up(self.len, self.chunk_size)
202     }
203 }
204 
205 impl<P> DoubleEndedIterator for ChunkSeq<P>
206 where
207     P: Producer,
208 {
next_back(&mut self) -> Option<Self::Item>209     fn next_back(&mut self) -> Option<Self::Item> {
210         let producer = self.inner.take()?;
211         if self.len > self.chunk_size {
212             let mut size = self.len % self.chunk_size;
213             if size == 0 {
214                 size = self.chunk_size;
215             }
216             let (left, right) = producer.split_at(self.len - size);
217             self.inner = Some(left);
218             self.len -= size;
219             Some(right.into_iter())
220         } else {
221             debug_assert!(self.len > 0);
222             self.len = 0;
223             Some(producer.into_iter())
224         }
225     }
226 }
227