1 use crate::Stream;
2 
3 use std::borrow::Borrow;
4 use std::future::poll_fn;
5 use std::hash::Hash;
6 use std::pin::Pin;
7 use std::task::{ready, Context, Poll};
8 
9 /// Combine many streams into one, indexing each source stream with a unique
10 /// key.
11 ///
12 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
13 /// streams into a single merged stream that yields values in the order that
14 /// they arrive from the source streams. However, `StreamMap` has a lot more
15 /// flexibility in usage patterns.
16 ///
17 /// `StreamMap` can:
18 ///
19 /// * Merge an arbitrary number of streams.
20 /// * Track which source stream the value was received from.
21 /// * Handle inserting and removing streams from the set of managed streams at
22 ///   any point during iteration.
23 ///
24 /// All source streams held by `StreamMap` are indexed using a key. This key is
25 /// included with the value when a source stream yields a value. The key is also
26 /// used to remove the stream from the `StreamMap` before the stream has
27 /// completed streaming.
28 ///
29 /// # `Unpin`
30 ///
31 /// Because the `StreamMap` API moves streams during runtime, both streams and
32 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
33 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
34 /// pin the stream in the heap.
35 ///
36 /// # Implementation
37 ///
38 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
39 /// internal implementation detail will persist in future versions, but it is
40 /// important to know the runtime implications. In general, `StreamMap` works
41 /// best with a "smallish" number of streams as all entries are scanned on
42 /// insert, remove, and polling. In cases where a large number of streams need
43 /// to be merged, it may be advisable to use tasks sending values on a shared
44 /// [`mpsc`] channel.
45 ///
46 /// # Notes
47 ///
48 /// `StreamMap` removes finished streams automatically, without alerting the user.
49 /// In some scenarios, the caller would want to know on closed streams.
50 /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
51 /// It will return None when the stream is closed.
52 ///
53 /// [`StreamExt::merge`]: crate::StreamExt::merge
54 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
55 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
56 /// [`Box::pin`]: std::boxed::Box::pin
57 /// [`StreamNotifyClose`]: crate::StreamNotifyClose
58 ///
59 /// # Examples
60 ///
61 /// Merging two streams, then remove them after receiving the first value
62 ///
63 /// ```
64 /// use tokio_stream::{StreamExt, StreamMap, Stream};
65 /// use tokio::sync::mpsc;
66 /// use std::pin::Pin;
67 ///
68 /// #[tokio::main]
69 /// async fn main() {
70 ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
71 ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
72 ///
73 ///     // Convert the channels to a `Stream`.
74 ///     let rx1 = Box::pin(async_stream::stream! {
75 ///           while let Some(item) = rx1.recv().await {
76 ///               yield item;
77 ///           }
78 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
79 ///
80 ///     let rx2 = Box::pin(async_stream::stream! {
81 ///           while let Some(item) = rx2.recv().await {
82 ///               yield item;
83 ///           }
84 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
85 ///
86 ///     tokio::spawn(async move {
87 ///         tx1.send(1).await.unwrap();
88 ///
89 ///         // This value will never be received. The send may or may not return
90 ///         // `Err` depending on if the remote end closed first or not.
91 ///         let _ = tx1.send(2).await;
92 ///     });
93 ///
94 ///     tokio::spawn(async move {
95 ///         tx2.send(3).await.unwrap();
96 ///         let _ = tx2.send(4).await;
97 ///     });
98 ///
99 ///     let mut map = StreamMap::new();
100 ///
101 ///     // Insert both streams
102 ///     map.insert("one", rx1);
103 ///     map.insert("two", rx2);
104 ///
105 ///     // Read twice
106 ///     for _ in 0..2 {
107 ///         let (key, val) = map.next().await.unwrap();
108 ///
109 ///         if key == "one" {
110 ///             assert_eq!(val, 1);
111 ///         } else {
112 ///             assert_eq!(val, 3);
113 ///         }
114 ///
115 ///         // Remove the stream to prevent reading the next value
116 ///         map.remove(key);
117 ///     }
118 /// }
119 /// ```
120 ///
121 /// This example models a read-only client to a chat system with channels. The
122 /// client sends commands to join and leave channels. `StreamMap` is used to
123 /// manage active channel subscriptions.
124 ///
125 /// For simplicity, messages are displayed with `println!`, but they could be
126 /// sent to the client over a socket.
127 ///
128 /// ```no_run
129 /// use tokio_stream::{Stream, StreamExt, StreamMap};
130 ///
131 /// enum Command {
132 ///     Join(String),
133 ///     Leave(String),
134 /// }
135 ///
136 /// fn commands() -> impl Stream<Item = Command> {
137 ///     // Streams in user commands by parsing `stdin`.
138 /// # tokio_stream::pending()
139 /// }
140 ///
141 /// // Join a channel, returns a stream of messages received on the channel.
142 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
143 ///     // left as an exercise to the reader
144 /// # tokio_stream::pending()
145 /// }
146 ///
147 /// #[tokio::main]
148 /// async fn main() {
149 ///     let mut channels = StreamMap::new();
150 ///
151 ///     // Input commands (join / leave channels).
152 ///     let cmds = commands();
153 ///     tokio::pin!(cmds);
154 ///
155 ///     loop {
156 ///         tokio::select! {
157 ///             Some(cmd) = cmds.next() => {
158 ///                 match cmd {
159 ///                     Command::Join(chan) => {
160 ///                         // Join the channel and add it to the `channels`
161 ///                         // stream map
162 ///                         let msgs = join(&chan);
163 ///                         channels.insert(chan, msgs);
164 ///                     }
165 ///                     Command::Leave(chan) => {
166 ///                         channels.remove(&chan);
167 ///                     }
168 ///                 }
169 ///             }
170 ///             Some((chan, msg)) = channels.next() => {
171 ///                 // Received a message, display it on stdout with the channel
172 ///                 // it originated from.
173 ///                 println!("{}: {}", chan, msg);
174 ///             }
175 ///             // Both the `commands` stream and the `channels` stream are
176 ///             // complete. There is no more work to do, so leave the loop.
177 ///             else => break,
178 ///         }
179 ///     }
180 /// }
181 /// ```
182 ///
183 /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
184 ///
185 /// ```
186 /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
187 ///
188 /// #[tokio::main]
189 /// async fn main() {
190 ///     let mut map = StreamMap::new();
191 ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192 ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
193 ///     map.insert(0, stream);
194 ///     map.insert(1, stream2);
195 ///     while let Some((key, val)) = map.next().await {
196 ///         match val {
197 ///             Some(val) => println!("got {val:?} from stream {key:?}"),
198 ///             None => println!("stream {key:?} closed"),
199 ///         }
200 ///     }
201 /// }
202 /// ```
203 
204 #[derive(Debug)]
205 pub struct StreamMap<K, V> {
206     /// Streams stored in the map
207     entries: Vec<(K, V)>,
208 }
209 
210 impl<K, V> StreamMap<K, V> {
211     /// An iterator visiting all key-value pairs in arbitrary order.
212     ///
213     /// The iterator element type is `&'a (K, V)`.
214     ///
215     /// # Examples
216     ///
217     /// ```
218     /// use tokio_stream::{StreamMap, pending};
219     ///
220     /// let mut map = StreamMap::new();
221     ///
222     /// map.insert("a", pending::<i32>());
223     /// map.insert("b", pending());
224     /// map.insert("c", pending());
225     ///
226     /// for (key, stream) in map.iter() {
227     ///     println!("({}, {:?})", key, stream);
228     /// }
229     /// ```
iter(&self) -> impl Iterator<Item = &(K, V)>230     pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
231         self.entries.iter()
232     }
233 
234     /// An iterator visiting all key-value pairs mutably in arbitrary order.
235     ///
236     /// The iterator element type is `&'a mut (K, V)`.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// use tokio_stream::{StreamMap, pending};
242     ///
243     /// let mut map = StreamMap::new();
244     ///
245     /// map.insert("a", pending::<i32>());
246     /// map.insert("b", pending());
247     /// map.insert("c", pending());
248     ///
249     /// for (key, stream) in map.iter_mut() {
250     ///     println!("({}, {:?})", key, stream);
251     /// }
252     /// ```
iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)>253     pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
254         self.entries.iter_mut()
255     }
256 
257     /// Creates an empty `StreamMap`.
258     ///
259     /// The stream map is initially created with a capacity of `0`, so it will
260     /// not allocate until it is first inserted into.
261     ///
262     /// # Examples
263     ///
264     /// ```
265     /// use tokio_stream::{StreamMap, Pending};
266     ///
267     /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
268     /// ```
new() -> StreamMap<K, V>269     pub fn new() -> StreamMap<K, V> {
270         StreamMap { entries: vec![] }
271     }
272 
273     /// Creates an empty `StreamMap` with the specified capacity.
274     ///
275     /// The stream map will be able to hold at least `capacity` elements without
276     /// reallocating. If `capacity` is 0, the stream map will not allocate.
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// use tokio_stream::{StreamMap, Pending};
282     ///
283     /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
284     /// ```
with_capacity(capacity: usize) -> StreamMap<K, V>285     pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
286         StreamMap {
287             entries: Vec::with_capacity(capacity),
288         }
289     }
290 
291     /// Returns an iterator visiting all keys in arbitrary order.
292     ///
293     /// The iterator element type is `&'a K`.
294     ///
295     /// # Examples
296     ///
297     /// ```
298     /// use tokio_stream::{StreamMap, pending};
299     ///
300     /// let mut map = StreamMap::new();
301     ///
302     /// map.insert("a", pending::<i32>());
303     /// map.insert("b", pending());
304     /// map.insert("c", pending());
305     ///
306     /// for key in map.keys() {
307     ///     println!("{}", key);
308     /// }
309     /// ```
keys(&self) -> impl Iterator<Item = &K>310     pub fn keys(&self) -> impl Iterator<Item = &K> {
311         self.iter().map(|(k, _)| k)
312     }
313 
314     /// An iterator visiting all values in arbitrary order.
315     ///
316     /// The iterator element type is `&'a V`.
317     ///
318     /// # Examples
319     ///
320     /// ```
321     /// use tokio_stream::{StreamMap, pending};
322     ///
323     /// let mut map = StreamMap::new();
324     ///
325     /// map.insert("a", pending::<i32>());
326     /// map.insert("b", pending());
327     /// map.insert("c", pending());
328     ///
329     /// for stream in map.values() {
330     ///     println!("{:?}", stream);
331     /// }
332     /// ```
values(&self) -> impl Iterator<Item = &V>333     pub fn values(&self) -> impl Iterator<Item = &V> {
334         self.iter().map(|(_, v)| v)
335     }
336 
337     /// An iterator visiting all values mutably in arbitrary order.
338     ///
339     /// The iterator element type is `&'a mut V`.
340     ///
341     /// # Examples
342     ///
343     /// ```
344     /// use tokio_stream::{StreamMap, pending};
345     ///
346     /// let mut map = StreamMap::new();
347     ///
348     /// map.insert("a", pending::<i32>());
349     /// map.insert("b", pending());
350     /// map.insert("c", pending());
351     ///
352     /// for stream in map.values_mut() {
353     ///     println!("{:?}", stream);
354     /// }
355     /// ```
values_mut(&mut self) -> impl Iterator<Item = &mut V>356     pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
357         self.iter_mut().map(|(_, v)| v)
358     }
359 
360     /// Returns the number of streams the map can hold without reallocating.
361     ///
362     /// This number is a lower bound; the `StreamMap` might be able to hold
363     /// more, but is guaranteed to be able to hold at least this many.
364     ///
365     /// # Examples
366     ///
367     /// ```
368     /// use tokio_stream::{StreamMap, Pending};
369     ///
370     /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
371     /// assert!(map.capacity() >= 100);
372     /// ```
capacity(&self) -> usize373     pub fn capacity(&self) -> usize {
374         self.entries.capacity()
375     }
376 
377     /// Returns the number of streams in the map.
378     ///
379     /// # Examples
380     ///
381     /// ```
382     /// use tokio_stream::{StreamMap, pending};
383     ///
384     /// let mut a = StreamMap::new();
385     /// assert_eq!(a.len(), 0);
386     /// a.insert(1, pending::<i32>());
387     /// assert_eq!(a.len(), 1);
388     /// ```
len(&self) -> usize389     pub fn len(&self) -> usize {
390         self.entries.len()
391     }
392 
393     /// Returns `true` if the map contains no elements.
394     ///
395     /// # Examples
396     ///
397     /// ```
398     /// use tokio_stream::{StreamMap, pending};
399     ///
400     /// let mut a = StreamMap::new();
401     /// assert!(a.is_empty());
402     /// a.insert(1, pending::<i32>());
403     /// assert!(!a.is_empty());
404     /// ```
is_empty(&self) -> bool405     pub fn is_empty(&self) -> bool {
406         self.entries.is_empty()
407     }
408 
409     /// Clears the map, removing all key-stream pairs. Keeps the allocated
410     /// memory for reuse.
411     ///
412     /// # Examples
413     ///
414     /// ```
415     /// use tokio_stream::{StreamMap, pending};
416     ///
417     /// let mut a = StreamMap::new();
418     /// a.insert(1, pending::<i32>());
419     /// a.clear();
420     /// assert!(a.is_empty());
421     /// ```
clear(&mut self)422     pub fn clear(&mut self) {
423         self.entries.clear();
424     }
425 
426     /// Insert a key-stream pair into the map.
427     ///
428     /// If the map did not have this key present, `None` is returned.
429     ///
430     /// If the map did have this key present, the new `stream` replaces the old
431     /// one and the old stream is returned.
432     ///
433     /// # Examples
434     ///
435     /// ```
436     /// use tokio_stream::{StreamMap, pending};
437     ///
438     /// let mut map = StreamMap::new();
439     ///
440     /// assert!(map.insert(37, pending::<i32>()).is_none());
441     /// assert!(!map.is_empty());
442     ///
443     /// map.insert(37, pending());
444     /// assert!(map.insert(37, pending()).is_some());
445     /// ```
insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,446     pub fn insert(&mut self, k: K, stream: V) -> Option<V>
447     where
448         K: Hash + Eq,
449     {
450         let ret = self.remove(&k);
451         self.entries.push((k, stream));
452 
453         ret
454     }
455 
456     /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
457     ///
458     /// The key may be any borrowed form of the map's key type, but `Hash` and
459     /// `Eq` on the borrowed form must match those for the key type.
460     ///
461     /// # Examples
462     ///
463     /// ```
464     /// use tokio_stream::{StreamMap, pending};
465     ///
466     /// let mut map = StreamMap::new();
467     /// map.insert(1, pending::<i32>());
468     /// assert!(map.remove(&1).is_some());
469     /// assert!(map.remove(&1).is_none());
470     /// ```
remove<Q>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq + ?Sized,471     pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
472     where
473         K: Borrow<Q>,
474         Q: Hash + Eq + ?Sized,
475     {
476         for i in 0..self.entries.len() {
477             if self.entries[i].0.borrow() == k {
478                 return Some(self.entries.swap_remove(i).1);
479             }
480         }
481 
482         None
483     }
484 
485     /// Returns `true` if the map contains a stream for the specified key.
486     ///
487     /// The key may be any borrowed form of the map's key type, but `Hash` and
488     /// `Eq` on the borrowed form must match those for the key type.
489     ///
490     /// # Examples
491     ///
492     /// ```
493     /// use tokio_stream::{StreamMap, pending};
494     ///
495     /// let mut map = StreamMap::new();
496     /// map.insert(1, pending::<i32>());
497     /// assert_eq!(map.contains_key(&1), true);
498     /// assert_eq!(map.contains_key(&2), false);
499     /// ```
contains_key<Q>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq + ?Sized,500     pub fn contains_key<Q>(&self, k: &Q) -> bool
501     where
502         K: Borrow<Q>,
503         Q: Hash + Eq + ?Sized,
504     {
505         for i in 0..self.entries.len() {
506             if self.entries[i].0.borrow() == k {
507                 return true;
508             }
509         }
510 
511         false
512     }
513 }
514 
515 impl<K, V> StreamMap<K, V>
516 where
517     K: Unpin,
518     V: Stream + Unpin,
519 {
520     /// Polls the next value, includes the vec entry index
poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>521     fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
522         let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
523         let mut idx = start;
524 
525         for _ in 0..self.entries.len() {
526             let (_, stream) = &mut self.entries[idx];
527 
528             match Pin::new(stream).poll_next(cx) {
529                 Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
530                 Poll::Ready(None) => {
531                     // Remove the entry
532                     self.entries.swap_remove(idx);
533 
534                     // Check if this was the last entry, if so the cursor needs
535                     // to wrap
536                     if idx == self.entries.len() {
537                         idx = 0;
538                     } else if idx < start && start <= self.entries.len() {
539                         // The stream being swapped into the current index has
540                         // already been polled, so skip it.
541                         idx = idx.wrapping_add(1) % self.entries.len();
542                     }
543                 }
544                 Poll::Pending => {
545                     idx = idx.wrapping_add(1) % self.entries.len();
546                 }
547             }
548         }
549 
550         // If the map is empty, then the stream is complete.
551         if self.entries.is_empty() {
552             Poll::Ready(None)
553         } else {
554             Poll::Pending
555         }
556     }
557 }
558 
559 impl<K, V> Default for StreamMap<K, V> {
default() -> Self560     fn default() -> Self {
561         Self::new()
562     }
563 }
564 
565 impl<K, V> StreamMap<K, V>
566 where
567     K: Clone + Unpin,
568     V: Stream + Unpin,
569 {
570     /// Receives multiple items on this [`StreamMap`], extending the provided `buffer`.
571     ///
572     /// This method returns the number of items that is appended to the `buffer`.
573     ///
574     /// Note that this method does not guarantee that exactly `limit` items
575     /// are received. Rather, if at least one item is available, it returns
576     /// as many items as it can up to the given limit. This method returns
577     /// zero only if the `StreamMap` is empty (or if `limit` is zero).
578     ///
579     /// # Cancel safety
580     ///
581     /// This method is cancel safe. If `next_many` is used as the event in a
582     /// [`tokio::select!`](tokio::select) statement and some other branch
583     /// completes first, it is guaranteed that no items were received on any of
584     /// the underlying streams.
next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize585     pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize {
586         poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await
587     }
588 
589     /// Polls to receive multiple items on this `StreamMap`, extending the provided `buffer`.
590     ///
591     /// This method returns:
592     /// * `Poll::Pending` if no items are available but the `StreamMap` is not empty.
593     /// * `Poll::Ready(count)` where `count` is the number of items successfully received and
594     ///   stored in `buffer`. This can be less than, or equal to, `limit`.
595     /// * `Poll::Ready(0)` if `limit` is set to zero or when the `StreamMap` is empty.
596     ///
597     /// Note that this method does not guarantee that exactly `limit` items
598     /// are received. Rather, if at least one item is available, it returns
599     /// as many items as it can up to the given limit. This method returns
600     /// zero only if the `StreamMap` is empty (or if `limit` is zero).
poll_next_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<(K, V::Item)>, limit: usize, ) -> Poll<usize>601     pub fn poll_next_many(
602         &mut self,
603         cx: &mut Context<'_>,
604         buffer: &mut Vec<(K, V::Item)>,
605         limit: usize,
606     ) -> Poll<usize> {
607         if limit == 0 || self.entries.is_empty() {
608             return Poll::Ready(0);
609         }
610 
611         let mut added = 0;
612 
613         let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
614         let mut idx = start;
615 
616         while added < limit {
617             // Indicates whether at least one stream returned a value when polled or not
618             let mut should_loop = false;
619 
620             for _ in 0..self.entries.len() {
621                 let (_, stream) = &mut self.entries[idx];
622 
623                 match Pin::new(stream).poll_next(cx) {
624                     Poll::Ready(Some(val)) => {
625                         added += 1;
626 
627                         let key = self.entries[idx].0.clone();
628                         buffer.push((key, val));
629 
630                         should_loop = true;
631 
632                         idx = idx.wrapping_add(1) % self.entries.len();
633                     }
634                     Poll::Ready(None) => {
635                         // Remove the entry
636                         self.entries.swap_remove(idx);
637 
638                         // Check if this was the last entry, if so the cursor needs
639                         // to wrap
640                         if idx == self.entries.len() {
641                             idx = 0;
642                         } else if idx < start && start <= self.entries.len() {
643                             // The stream being swapped into the current index has
644                             // already been polled, so skip it.
645                             idx = idx.wrapping_add(1) % self.entries.len();
646                         }
647                     }
648                     Poll::Pending => {
649                         idx = idx.wrapping_add(1) % self.entries.len();
650                     }
651                 }
652             }
653 
654             if !should_loop {
655                 break;
656             }
657         }
658 
659         if added > 0 {
660             Poll::Ready(added)
661         } else if self.entries.is_empty() {
662             Poll::Ready(0)
663         } else {
664             Poll::Pending
665         }
666     }
667 }
668 
669 impl<K, V> Stream for StreamMap<K, V>
670 where
671     K: Clone + Unpin,
672     V: Stream + Unpin,
673 {
674     type Item = (K, V::Item);
675 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>676     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
677         if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
678             let key = self.entries[idx].0.clone();
679             Poll::Ready(Some((key, val)))
680         } else {
681             Poll::Ready(None)
682         }
683     }
684 
size_hint(&self) -> (usize, Option<usize>)685     fn size_hint(&self) -> (usize, Option<usize>) {
686         let mut ret = (0, Some(0));
687 
688         for (_, stream) in &self.entries {
689             let hint = stream.size_hint();
690 
691             ret.0 += hint.0;
692 
693             match (ret.1, hint.1) {
694                 (Some(a), Some(b)) => ret.1 = Some(a + b),
695                 (Some(_), None) => ret.1 = None,
696                 _ => {}
697             }
698         }
699 
700         ret
701     }
702 }
703 
704 impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
705 where
706     K: Hash + Eq,
707 {
from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self708     fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
709         let iterator = iter.into_iter();
710         let (lower_bound, _) = iterator.size_hint();
711         let mut stream_map = Self::with_capacity(lower_bound);
712 
713         for (key, value) in iterator {
714             stream_map.insert(key, value);
715         }
716 
717         stream_map
718     }
719 }
720 
721 impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
extend<T>(&mut self, iter: T) where T: IntoIterator<Item = (K, V)>,722     fn extend<T>(&mut self, iter: T)
723     where
724         T: IntoIterator<Item = (K, V)>,
725     {
726         self.entries.extend(iter);
727     }
728 }
729 
730 mod rand {
731     use std::cell::Cell;
732 
733     mod loom {
734         #[cfg(not(loom))]
735         pub(crate) mod rand {
736             use std::collections::hash_map::RandomState;
737             use std::hash::{BuildHasher, Hash, Hasher};
738             use std::sync::atomic::AtomicU32;
739             use std::sync::atomic::Ordering::Relaxed;
740 
741             static COUNTER: AtomicU32 = AtomicU32::new(1);
742 
seed() -> u64743             pub(crate) fn seed() -> u64 {
744                 let rand_state = RandomState::new();
745 
746                 let mut hasher = rand_state.build_hasher();
747 
748                 // Hash some unique-ish data to generate some new state
749                 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
750 
751                 // Get the seed
752                 hasher.finish()
753             }
754         }
755 
756         #[cfg(loom)]
757         pub(crate) mod rand {
seed() -> u64758             pub(crate) fn seed() -> u64 {
759                 1
760             }
761         }
762     }
763 
764     /// Fast random number generate
765     ///
766     /// Implement `xorshift64+`: 2 32-bit `xorshift` sequences added together.
767     /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
768     /// `Xorshift` paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
769     /// This generator passes the SmallCrush suite, part of TestU01 framework:
770     /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
771     #[derive(Debug)]
772     pub(crate) struct FastRand {
773         one: Cell<u32>,
774         two: Cell<u32>,
775     }
776 
777     impl FastRand {
778         /// Initialize a new, thread-local, fast random number generator.
new(seed: u64) -> FastRand779         pub(crate) fn new(seed: u64) -> FastRand {
780             let one = (seed >> 32) as u32;
781             let mut two = seed as u32;
782 
783             if two == 0 {
784                 // This value cannot be zero
785                 two = 1;
786             }
787 
788             FastRand {
789                 one: Cell::new(one),
790                 two: Cell::new(two),
791             }
792         }
793 
fastrand_n(&self, n: u32) -> u32794         pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
795             // This is similar to fastrand() % n, but faster.
796             // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
797             let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
798             (mul >> 32) as u32
799         }
800 
fastrand(&self) -> u32801         fn fastrand(&self) -> u32 {
802             let mut s1 = self.one.get();
803             let s0 = self.two.get();
804 
805             s1 ^= s1 << 17;
806             s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
807 
808             self.one.set(s0);
809             self.two.set(s1);
810 
811             s0.wrapping_add(s1)
812         }
813     }
814 
815     // Used by `StreamMap`
thread_rng_n(n: u32) -> u32816     pub(crate) fn thread_rng_n(n: u32) -> u32 {
817         thread_local! {
818             static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
819         }
820 
821         THREAD_RNG.with(|rng| rng.fastrand_n(n))
822     }
823 }
824