1 use crate::lock::RwLock;
2 use crate::mapref::multiple::{RefMulti, RefMutMulti};
3 use crate::util;
4 use crate::{DashMap, HashMap};
5 use core::hash::{BuildHasher, Hash};
6 use rayon::iter::plumbing::UnindexedConsumer;
7 use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
8 use std::collections::hash_map::RandomState;
9 use std::sync::Arc;
10 
11 impl<K, V, S> ParallelExtend<(K, V)> for DashMap<K, V, S>
12 where
13     K: Send + Sync + Eq + Hash,
14     V: Send + Sync,
15     S: Send + Sync + Clone + BuildHasher,
16 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,17     fn par_extend<I>(&mut self, par_iter: I)
18     where
19         I: IntoParallelIterator<Item = (K, V)>,
20     {
21         (&*self).par_extend(par_iter);
22     }
23 }
24 
25 // Since we don't actually need mutability, we can implement this on a
26 // reference, similar to `io::Write for &File`.
27 impl<K, V, S> ParallelExtend<(K, V)> for &'_ DashMap<K, V, S>
28 where
29     K: Send + Sync + Eq + Hash,
30     V: Send + Sync,
31     S: Send + Sync + Clone + BuildHasher,
32 {
par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,33     fn par_extend<I>(&mut self, par_iter: I)
34     where
35         I: IntoParallelIterator<Item = (K, V)>,
36     {
37         let &mut map = self;
38         par_iter.into_par_iter().for_each(move |(key, value)| {
39             map.insert(key, value);
40         });
41     }
42 }
43 
44 impl<K, V, S> FromParallelIterator<(K, V)> for DashMap<K, V, S>
45 where
46     K: Send + Sync + Eq + Hash,
47     V: Send + Sync,
48     S: Send + Sync + Clone + Default + BuildHasher,
49 {
from_par_iter<I>(par_iter: I) -> Self where I: IntoParallelIterator<Item = (K, V)>,50     fn from_par_iter<I>(par_iter: I) -> Self
51     where
52         I: IntoParallelIterator<Item = (K, V)>,
53     {
54         let map = Self::default();
55         (&map).par_extend(par_iter);
56         map
57     }
58 }
59 
60 // Implementation note: while the shards will iterate in parallel, we flatten
61 // sequentially within each shard (`flat_map_iter`), because the standard
62 // `HashMap` only implements `ParallelIterator` by collecting to a `Vec` first.
63 // There is real parallel support in the `hashbrown/rayon` feature, but we don't
64 // always use that map.
65 
66 impl<K, V, S> IntoParallelIterator for DashMap<K, V, S>
67 where
68     K: Send + Eq + Hash,
69     V: Send,
70     S: Send + Clone + BuildHasher,
71 {
72     type Iter = OwningIter<K, V, S>;
73     type Item = (K, V);
74 
into_par_iter(self) -> Self::Iter75     fn into_par_iter(self) -> Self::Iter {
76         OwningIter {
77             shards: self.shards,
78         }
79     }
80 }
81 
82 pub struct OwningIter<K, V, S = RandomState> {
83     pub(super) shards: Box<[RwLock<HashMap<K, V, S>>]>,
84 }
85 
86 impl<K, V, S> ParallelIterator for OwningIter<K, V, S>
87 where
88     K: Send + Eq + Hash,
89     V: Send,
90     S: Send + Clone + BuildHasher,
91 {
92     type Item = (K, V);
93 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94     fn drive_unindexed<C>(self, consumer: C) -> C::Result
95     where
96         C: UnindexedConsumer<Self::Item>,
97     {
98         Vec::from(self.shards)
99             .into_par_iter()
100             .flat_map_iter(|shard| {
101                 shard
102                     .into_inner()
103                     .into_iter()
104                     .map(|(k, v)| (k, v.into_inner()))
105             })
106             .drive_unindexed(consumer)
107     }
108 }
109 
110 // This impl also enables `IntoParallelRefIterator::par_iter`
111 impl<'a, K, V, S> IntoParallelIterator for &'a DashMap<K, V, S>
112 where
113     K: Send + Sync + Eq + Hash,
114     V: Send + Sync,
115     S: Send + Sync + Clone + BuildHasher,
116 {
117     type Iter = Iter<'a, K, V, S>;
118     type Item = RefMulti<'a, K, V, S>;
119 
into_par_iter(self) -> Self::Iter120     fn into_par_iter(self) -> Self::Iter {
121         Iter {
122             shards: &self.shards,
123         }
124     }
125 }
126 
127 pub struct Iter<'a, K, V, S = RandomState> {
128     pub(super) shards: &'a [RwLock<HashMap<K, V, S>>],
129 }
130 
131 impl<'a, K, V, S> ParallelIterator for Iter<'a, K, V, S>
132 where
133     K: Send + Sync + Eq + Hash,
134     V: Send + Sync,
135     S: Send + Sync + Clone + BuildHasher,
136 {
137     type Item = RefMulti<'a, K, V, S>;
138 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,139     fn drive_unindexed<C>(self, consumer: C) -> C::Result
140     where
141         C: UnindexedConsumer<Self::Item>,
142     {
143         self.shards
144             .into_par_iter()
145             .flat_map_iter(|shard| {
146                 let guard = shard.read();
147                 let sref: &'a HashMap<K, V, S> = unsafe { util::change_lifetime_const(&*guard) };
148 
149                 let guard = Arc::new(guard);
150                 sref.iter().map(move |(k, v)| {
151                     let guard = Arc::clone(&guard);
152                     unsafe { RefMulti::new(guard, k, v.get()) }
153                 })
154             })
155             .drive_unindexed(consumer)
156     }
157 }
158 
159 // This impl also enables `IntoParallelRefMutIterator::par_iter_mut`
160 impl<'a, K, V, S> IntoParallelIterator for &'a mut DashMap<K, V, S>
161 where
162     K: Send + Sync + Eq + Hash,
163     V: Send + Sync,
164     S: Send + Sync + Clone + BuildHasher,
165 {
166     type Iter = IterMut<'a, K, V, S>;
167     type Item = RefMutMulti<'a, K, V, S>;
168 
into_par_iter(self) -> Self::Iter169     fn into_par_iter(self) -> Self::Iter {
170         IterMut {
171             shards: &self.shards,
172         }
173     }
174 }
175 
176 impl<K, V, S> DashMap<K, V, S>
177 where
178     K: Send + Sync + Eq + Hash,
179     V: Send + Sync,
180     S: Send + Sync + Clone + BuildHasher,
181 {
182     // Unlike `IntoParallelRefMutIterator::par_iter_mut`, we only _need_ `&self`.
par_iter_mut(&self) -> IterMut<'_, K, V, S>183     pub fn par_iter_mut(&self) -> IterMut<'_, K, V, S> {
184         IterMut {
185             shards: &self.shards,
186         }
187     }
188 }
189 
190 pub struct IterMut<'a, K, V, S = RandomState> {
191     shards: &'a [RwLock<HashMap<K, V, S>>],
192 }
193 
194 impl<'a, K, V, S> ParallelIterator for IterMut<'a, K, V, S>
195 where
196     K: Send + Sync + Eq + Hash,
197     V: Send + Sync,
198     S: Send + Sync + Clone + BuildHasher,
199 {
200     type Item = RefMutMulti<'a, K, V, S>;
201 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,202     fn drive_unindexed<C>(self, consumer: C) -> C::Result
203     where
204         C: UnindexedConsumer<Self::Item>,
205     {
206         self.shards
207             .into_par_iter()
208             .flat_map_iter(|shard| {
209                 let mut guard = shard.write();
210                 let sref: &'a mut HashMap<K, V, S> =
211                     unsafe { util::change_lifetime_mut(&mut *guard) };
212 
213                 let guard = Arc::new(guard);
214                 sref.iter_mut().map(move |(k, v)| {
215                     let guard = Arc::clone(&guard);
216                     unsafe { RefMutMulti::new(guard, k, v.get_mut()) }
217                 })
218             })
219             .drive_unindexed(consumer)
220     }
221 }
222