1 //! This module contains the parallel iterator types for double-ended queues 2 //! (`VecDeque<T>`). You will rarely need to interact with it directly 3 //! unless you have need to name one of the iterator types. 4 5 use std::collections::VecDeque; 6 use std::ops::{Range, RangeBounds}; 7 8 use crate::iter::plumbing::*; 9 use crate::iter::*; 10 use crate::math::simplify_range; 11 12 use crate::slice; 13 use crate::vec; 14 15 /// Parallel iterator over a double-ended queue 16 #[derive(Debug, Clone)] 17 pub struct IntoIter<T: Send> { 18 inner: vec::IntoIter<T>, 19 } 20 21 impl<T: Send> IntoParallelIterator for VecDeque<T> { 22 type Item = T; 23 type Iter = IntoIter<T>; 24 into_par_iter(self) -> Self::Iter25 fn into_par_iter(self) -> Self::Iter { 26 // NOTE: requires data movement if the deque doesn't start at offset 0. 27 let inner = Vec::from(self).into_par_iter(); 28 IntoIter { inner } 29 } 30 } 31 32 delegate_indexed_iterator! { 33 IntoIter<T> => T, 34 impl<T: Send> 35 } 36 37 /// Parallel iterator over an immutable reference to a double-ended queue 38 #[derive(Debug)] 39 pub struct Iter<'a, T: Sync> { 40 inner: Chain<slice::Iter<'a, T>, slice::Iter<'a, T>>, 41 } 42 43 impl<'a, T: Sync> Clone for Iter<'a, T> { clone(&self) -> Self44 fn clone(&self) -> Self { 45 Iter { 46 inner: self.inner.clone(), 47 } 48 } 49 } 50 51 impl<'a, T: Sync> IntoParallelIterator for &'a VecDeque<T> { 52 type Item = &'a T; 53 type Iter = Iter<'a, T>; 54 into_par_iter(self) -> Self::Iter55 fn into_par_iter(self) -> Self::Iter { 56 let (a, b) = self.as_slices(); 57 Iter { 58 inner: a.into_par_iter().chain(b), 59 } 60 } 61 } 62 63 delegate_indexed_iterator! { 64 Iter<'a, T> => &'a T, 65 impl<'a, T: Sync + 'a> 66 } 67 68 /// Parallel iterator over a mutable reference to a double-ended queue 69 #[derive(Debug)] 70 pub struct IterMut<'a, T: Send> { 71 inner: Chain<slice::IterMut<'a, T>, slice::IterMut<'a, T>>, 72 } 73 74 impl<'a, T: Send> IntoParallelIterator for &'a mut VecDeque<T> { 75 type Item = &'a mut T; 76 type Iter = IterMut<'a, T>; 77 into_par_iter(self) -> Self::Iter78 fn into_par_iter(self) -> Self::Iter { 79 let (a, b) = self.as_mut_slices(); 80 IterMut { 81 inner: a.into_par_iter().chain(b), 82 } 83 } 84 } 85 86 delegate_indexed_iterator! { 87 IterMut<'a, T> => &'a mut T, 88 impl<'a, T: Send + 'a> 89 } 90 91 /// Draining parallel iterator that moves a range out of a double-ended queue, 92 /// but keeps the total capacity. 93 #[derive(Debug)] 94 pub struct Drain<'a, T: Send> { 95 deque: &'a mut VecDeque<T>, 96 range: Range<usize>, 97 orig_len: usize, 98 } 99 100 impl<'a, T: Send> ParallelDrainRange<usize> for &'a mut VecDeque<T> { 101 type Iter = Drain<'a, T>; 102 type Item = T; 103 par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter104 fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter { 105 Drain { 106 orig_len: self.len(), 107 range: simplify_range(range, self.len()), 108 deque: self, 109 } 110 } 111 } 112 113 impl<'a, T: Send> ParallelIterator for Drain<'a, T> { 114 type Item = T; 115 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,116 fn drive_unindexed<C>(self, consumer: C) -> C::Result 117 where 118 C: UnindexedConsumer<Self::Item>, 119 { 120 bridge(self, consumer) 121 } 122 opt_len(&self) -> Option<usize>123 fn opt_len(&self) -> Option<usize> { 124 Some(self.len()) 125 } 126 } 127 128 impl<'a, T: Send> IndexedParallelIterator for Drain<'a, T> { drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,129 fn drive<C>(self, consumer: C) -> C::Result 130 where 131 C: Consumer<Self::Item>, 132 { 133 bridge(self, consumer) 134 } 135 len(&self) -> usize136 fn len(&self) -> usize { 137 self.range.len() 138 } 139 with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,140 fn with_producer<CB>(self, callback: CB) -> CB::Output 141 where 142 CB: ProducerCallback<Self::Item>, 143 { 144 // NOTE: requires data movement if the deque doesn't start at offset 0. 145 super::DrainGuard::new(self.deque) 146 .par_drain(self.range.clone()) 147 .with_producer(callback) 148 } 149 } 150 151 impl<'a, T: Send> Drop for Drain<'a, T> { drop(&mut self)152 fn drop(&mut self) { 153 if self.deque.len() != self.orig_len - self.range.len() { 154 // We must not have produced, so just call a normal drain to remove the items. 155 assert_eq!(self.deque.len(), self.orig_len); 156 self.deque.drain(self.range.clone()); 157 } 158 } 159 } 160