1 use crate::lock::RwLock; 2 use crate::mapref::multiple::{RefMulti, RefMutMulti}; 3 use crate::util; 4 use crate::{DashMap, HashMap}; 5 use core::hash::{BuildHasher, Hash}; 6 use rayon::iter::plumbing::UnindexedConsumer; 7 use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; 8 use std::collections::hash_map::RandomState; 9 use std::sync::Arc; 10 11 impl<K, V, S> ParallelExtend<(K, V)> for DashMap<K, V, S> 12 where 13 K: Send + Sync + Eq + Hash, 14 V: Send + Sync, 15 S: Send + Sync + Clone + BuildHasher, 16 { par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,17 fn par_extend<I>(&mut self, par_iter: I) 18 where 19 I: IntoParallelIterator<Item = (K, V)>, 20 { 21 (&*self).par_extend(par_iter); 22 } 23 } 24 25 // Since we don't actually need mutability, we can implement this on a 26 // reference, similar to `io::Write for &File`. 27 impl<K, V, S> ParallelExtend<(K, V)> for &'_ DashMap<K, V, S> 28 where 29 K: Send + Sync + Eq + Hash, 30 V: Send + Sync, 31 S: Send + Sync + Clone + BuildHasher, 32 { par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = (K, V)>,33 fn par_extend<I>(&mut self, par_iter: I) 34 where 35 I: IntoParallelIterator<Item = (K, V)>, 36 { 37 let &mut map = self; 38 par_iter.into_par_iter().for_each(move |(key, value)| { 39 map.insert(key, value); 40 }); 41 } 42 } 43 44 impl<K, V, S> FromParallelIterator<(K, V)> for DashMap<K, V, S> 45 where 46 K: Send + Sync + Eq + Hash, 47 V: Send + Sync, 48 S: Send + Sync + Clone + Default + BuildHasher, 49 { from_par_iter<I>(par_iter: I) -> Self where I: IntoParallelIterator<Item = (K, V)>,50 fn from_par_iter<I>(par_iter: I) -> Self 51 where 52 I: IntoParallelIterator<Item = (K, V)>, 53 { 54 let map = Self::default(); 55 (&map).par_extend(par_iter); 56 map 57 } 58 } 59 60 // Implementation note: while the shards will iterate in parallel, we flatten 61 // sequentially within each shard (`flat_map_iter`), because the standard 62 // `HashMap` only implements `ParallelIterator` by collecting to a `Vec` first. 63 // There is real parallel support in the `hashbrown/rayon` feature, but we don't 64 // always use that map. 65 66 impl<K, V, S> IntoParallelIterator for DashMap<K, V, S> 67 where 68 K: Send + Eq + Hash, 69 V: Send, 70 S: Send + Clone + BuildHasher, 71 { 72 type Iter = OwningIter<K, V, S>; 73 type Item = (K, V); 74 into_par_iter(self) -> Self::Iter75 fn into_par_iter(self) -> Self::Iter { 76 OwningIter { 77 shards: self.shards, 78 } 79 } 80 } 81 82 pub struct OwningIter<K, V, S = RandomState> { 83 pub(super) shards: Box<[RwLock<HashMap<K, V, S>>]>, 84 } 85 86 impl<K, V, S> ParallelIterator for OwningIter<K, V, S> 87 where 88 K: Send + Eq + Hash, 89 V: Send, 90 S: Send + Clone + BuildHasher, 91 { 92 type Item = (K, V); 93 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94 fn drive_unindexed<C>(self, consumer: C) -> C::Result 95 where 96 C: UnindexedConsumer<Self::Item>, 97 { 98 Vec::from(self.shards) 99 .into_par_iter() 100 .flat_map_iter(|shard| { 101 shard 102 .into_inner() 103 .into_iter() 104 .map(|(k, v)| (k, v.into_inner())) 105 }) 106 .drive_unindexed(consumer) 107 } 108 } 109 110 // This impl also enables `IntoParallelRefIterator::par_iter` 111 impl<'a, K, V, S> IntoParallelIterator for &'a DashMap<K, V, S> 112 where 113 K: Send + Sync + Eq + Hash, 114 V: Send + Sync, 115 S: Send + Sync + Clone + BuildHasher, 116 { 117 type Iter = Iter<'a, K, V, S>; 118 type Item = RefMulti<'a, K, V, S>; 119 into_par_iter(self) -> Self::Iter120 fn into_par_iter(self) -> Self::Iter { 121 Iter { 122 shards: &self.shards, 123 } 124 } 125 } 126 127 pub struct Iter<'a, K, V, S = RandomState> { 128 pub(super) shards: &'a [RwLock<HashMap<K, V, S>>], 129 } 130 131 impl<'a, K, V, S> ParallelIterator for Iter<'a, K, V, S> 132 where 133 K: Send + Sync + Eq + Hash, 134 V: Send + Sync, 135 S: Send + Sync + Clone + BuildHasher, 136 { 137 type Item = RefMulti<'a, K, V, S>; 138 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,139 fn drive_unindexed<C>(self, consumer: C) -> C::Result 140 where 141 C: UnindexedConsumer<Self::Item>, 142 { 143 self.shards 144 .into_par_iter() 145 .flat_map_iter(|shard| { 146 let guard = shard.read(); 147 let sref: &'a HashMap<K, V, S> = unsafe { util::change_lifetime_const(&*guard) }; 148 149 let guard = Arc::new(guard); 150 sref.iter().map(move |(k, v)| { 151 let guard = Arc::clone(&guard); 152 unsafe { RefMulti::new(guard, k, v.get()) } 153 }) 154 }) 155 .drive_unindexed(consumer) 156 } 157 } 158 159 // This impl also enables `IntoParallelRefMutIterator::par_iter_mut` 160 impl<'a, K, V, S> IntoParallelIterator for &'a mut DashMap<K, V, S> 161 where 162 K: Send + Sync + Eq + Hash, 163 V: Send + Sync, 164 S: Send + Sync + Clone + BuildHasher, 165 { 166 type Iter = IterMut<'a, K, V, S>; 167 type Item = RefMutMulti<'a, K, V, S>; 168 into_par_iter(self) -> Self::Iter169 fn into_par_iter(self) -> Self::Iter { 170 IterMut { 171 shards: &self.shards, 172 } 173 } 174 } 175 176 impl<K, V, S> DashMap<K, V, S> 177 where 178 K: Send + Sync + Eq + Hash, 179 V: Send + Sync, 180 S: Send + Sync + Clone + BuildHasher, 181 { 182 // Unlike `IntoParallelRefMutIterator::par_iter_mut`, we only _need_ `&self`. par_iter_mut(&self) -> IterMut<'_, K, V, S>183 pub fn par_iter_mut(&self) -> IterMut<'_, K, V, S> { 184 IterMut { 185 shards: &self.shards, 186 } 187 } 188 } 189 190 pub struct IterMut<'a, K, V, S = RandomState> { 191 shards: &'a [RwLock<HashMap<K, V, S>>], 192 } 193 194 impl<'a, K, V, S> ParallelIterator for IterMut<'a, K, V, S> 195 where 196 K: Send + Sync + Eq + Hash, 197 V: Send + Sync, 198 S: Send + Sync + Clone + BuildHasher, 199 { 200 type Item = RefMutMulti<'a, K, V, S>; 201 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,202 fn drive_unindexed<C>(self, consumer: C) -> C::Result 203 where 204 C: UnindexedConsumer<Self::Item>, 205 { 206 self.shards 207 .into_par_iter() 208 .flat_map_iter(|shard| { 209 let mut guard = shard.write(); 210 let sref: &'a mut HashMap<K, V, S> = 211 unsafe { util::change_lifetime_mut(&mut *guard) }; 212 213 let guard = Arc::new(guard); 214 sref.iter_mut().map(move |(k, v)| { 215 let guard = Arc::clone(&guard); 216 unsafe { RefMutMulti::new(guard, k, v.get_mut()) } 217 }) 218 }) 219 .drive_unindexed(consumer) 220 } 221 } 222