1 use crate::Stream; 2 3 use std::borrow::Borrow; 4 use std::future::poll_fn; 5 use std::hash::Hash; 6 use std::pin::Pin; 7 use std::task::{ready, Context, Poll}; 8 9 /// Combine many streams into one, indexing each source stream with a unique 10 /// key. 11 /// 12 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source 13 /// streams into a single merged stream that yields values in the order that 14 /// they arrive from the source streams. However, `StreamMap` has a lot more 15 /// flexibility in usage patterns. 16 /// 17 /// `StreamMap` can: 18 /// 19 /// * Merge an arbitrary number of streams. 20 /// * Track which source stream the value was received from. 21 /// * Handle inserting and removing streams from the set of managed streams at 22 /// any point during iteration. 23 /// 24 /// All source streams held by `StreamMap` are indexed using a key. This key is 25 /// included with the value when a source stream yields a value. The key is also 26 /// used to remove the stream from the `StreamMap` before the stream has 27 /// completed streaming. 28 /// 29 /// # `Unpin` 30 /// 31 /// Because the `StreamMap` API moves streams during runtime, both streams and 32 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a 33 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to 34 /// pin the stream in the heap. 35 /// 36 /// # Implementation 37 /// 38 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this 39 /// internal implementation detail will persist in future versions, but it is 40 /// important to know the runtime implications. In general, `StreamMap` works 41 /// best with a "smallish" number of streams as all entries are scanned on 42 /// insert, remove, and polling. In cases where a large number of streams need 43 /// to be merged, it may be advisable to use tasks sending values on a shared 44 /// [`mpsc`] channel. 45 /// 46 /// # Notes 47 /// 48 /// `StreamMap` removes finished streams automatically, without alerting the user. 49 /// In some scenarios, the caller would want to know on closed streams. 50 /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream. 51 /// It will return None when the stream is closed. 52 /// 53 /// [`StreamExt::merge`]: crate::StreamExt::merge 54 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html 55 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html 56 /// [`Box::pin`]: std::boxed::Box::pin 57 /// [`StreamNotifyClose`]: crate::StreamNotifyClose 58 /// 59 /// # Examples 60 /// 61 /// Merging two streams, then remove them after receiving the first value 62 /// 63 /// ``` 64 /// use tokio_stream::{StreamExt, StreamMap, Stream}; 65 /// use tokio::sync::mpsc; 66 /// use std::pin::Pin; 67 /// 68 /// #[tokio::main] 69 /// async fn main() { 70 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); 71 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); 72 /// 73 /// // Convert the channels to a `Stream`. 74 /// let rx1 = Box::pin(async_stream::stream! { 75 /// while let Some(item) = rx1.recv().await { 76 /// yield item; 77 /// } 78 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 79 /// 80 /// let rx2 = Box::pin(async_stream::stream! { 81 /// while let Some(item) = rx2.recv().await { 82 /// yield item; 83 /// } 84 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 85 /// 86 /// tokio::spawn(async move { 87 /// tx1.send(1).await.unwrap(); 88 /// 89 /// // This value will never be received. The send may or may not return 90 /// // `Err` depending on if the remote end closed first or not. 91 /// let _ = tx1.send(2).await; 92 /// }); 93 /// 94 /// tokio::spawn(async move { 95 /// tx2.send(3).await.unwrap(); 96 /// let _ = tx2.send(4).await; 97 /// }); 98 /// 99 /// let mut map = StreamMap::new(); 100 /// 101 /// // Insert both streams 102 /// map.insert("one", rx1); 103 /// map.insert("two", rx2); 104 /// 105 /// // Read twice 106 /// for _ in 0..2 { 107 /// let (key, val) = map.next().await.unwrap(); 108 /// 109 /// if key == "one" { 110 /// assert_eq!(val, 1); 111 /// } else { 112 /// assert_eq!(val, 3); 113 /// } 114 /// 115 /// // Remove the stream to prevent reading the next value 116 /// map.remove(key); 117 /// } 118 /// } 119 /// ``` 120 /// 121 /// This example models a read-only client to a chat system with channels. The 122 /// client sends commands to join and leave channels. `StreamMap` is used to 123 /// manage active channel subscriptions. 124 /// 125 /// For simplicity, messages are displayed with `println!`, but they could be 126 /// sent to the client over a socket. 127 /// 128 /// ```no_run 129 /// use tokio_stream::{Stream, StreamExt, StreamMap}; 130 /// 131 /// enum Command { 132 /// Join(String), 133 /// Leave(String), 134 /// } 135 /// 136 /// fn commands() -> impl Stream<Item = Command> { 137 /// // Streams in user commands by parsing `stdin`. 138 /// # tokio_stream::pending() 139 /// } 140 /// 141 /// // Join a channel, returns a stream of messages received on the channel. 142 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { 143 /// // left as an exercise to the reader 144 /// # tokio_stream::pending() 145 /// } 146 /// 147 /// #[tokio::main] 148 /// async fn main() { 149 /// let mut channels = StreamMap::new(); 150 /// 151 /// // Input commands (join / leave channels). 152 /// let cmds = commands(); 153 /// tokio::pin!(cmds); 154 /// 155 /// loop { 156 /// tokio::select! { 157 /// Some(cmd) = cmds.next() => { 158 /// match cmd { 159 /// Command::Join(chan) => { 160 /// // Join the channel and add it to the `channels` 161 /// // stream map 162 /// let msgs = join(&chan); 163 /// channels.insert(chan, msgs); 164 /// } 165 /// Command::Leave(chan) => { 166 /// channels.remove(&chan); 167 /// } 168 /// } 169 /// } 170 /// Some((chan, msg)) = channels.next() => { 171 /// // Received a message, display it on stdout with the channel 172 /// // it originated from. 173 /// println!("{}: {}", chan, msg); 174 /// } 175 /// // Both the `commands` stream and the `channels` stream are 176 /// // complete. There is no more work to do, so leave the loop. 177 /// else => break, 178 /// } 179 /// } 180 /// } 181 /// ``` 182 /// 183 /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. 184 /// 185 /// ``` 186 /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; 187 /// 188 /// #[tokio::main] 189 /// async fn main() { 190 /// let mut map = StreamMap::new(); 191 /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 192 /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 193 /// map.insert(0, stream); 194 /// map.insert(1, stream2); 195 /// while let Some((key, val)) = map.next().await { 196 /// match val { 197 /// Some(val) => println!("got {val:?} from stream {key:?}"), 198 /// None => println!("stream {key:?} closed"), 199 /// } 200 /// } 201 /// } 202 /// ``` 203 204 #[derive(Debug)] 205 pub struct StreamMap<K, V> { 206 /// Streams stored in the map 207 entries: Vec<(K, V)>, 208 } 209 210 impl<K, V> StreamMap<K, V> { 211 /// An iterator visiting all key-value pairs in arbitrary order. 212 /// 213 /// The iterator element type is `&'a (K, V)`. 214 /// 215 /// # Examples 216 /// 217 /// ``` 218 /// use tokio_stream::{StreamMap, pending}; 219 /// 220 /// let mut map = StreamMap::new(); 221 /// 222 /// map.insert("a", pending::<i32>()); 223 /// map.insert("b", pending()); 224 /// map.insert("c", pending()); 225 /// 226 /// for (key, stream) in map.iter() { 227 /// println!("({}, {:?})", key, stream); 228 /// } 229 /// ``` iter(&self) -> impl Iterator<Item = &(K, V)>230 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { 231 self.entries.iter() 232 } 233 234 /// An iterator visiting all key-value pairs mutably in arbitrary order. 235 /// 236 /// The iterator element type is `&'a mut (K, V)`. 237 /// 238 /// # Examples 239 /// 240 /// ``` 241 /// use tokio_stream::{StreamMap, pending}; 242 /// 243 /// let mut map = StreamMap::new(); 244 /// 245 /// map.insert("a", pending::<i32>()); 246 /// map.insert("b", pending()); 247 /// map.insert("c", pending()); 248 /// 249 /// for (key, stream) in map.iter_mut() { 250 /// println!("({}, {:?})", key, stream); 251 /// } 252 /// ``` iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)>253 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { 254 self.entries.iter_mut() 255 } 256 257 /// Creates an empty `StreamMap`. 258 /// 259 /// The stream map is initially created with a capacity of `0`, so it will 260 /// not allocate until it is first inserted into. 261 /// 262 /// # Examples 263 /// 264 /// ``` 265 /// use tokio_stream::{StreamMap, Pending}; 266 /// 267 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); 268 /// ``` new() -> StreamMap<K, V>269 pub fn new() -> StreamMap<K, V> { 270 StreamMap { entries: vec![] } 271 } 272 273 /// Creates an empty `StreamMap` with the specified capacity. 274 /// 275 /// The stream map will be able to hold at least `capacity` elements without 276 /// reallocating. If `capacity` is 0, the stream map will not allocate. 277 /// 278 /// # Examples 279 /// 280 /// ``` 281 /// use tokio_stream::{StreamMap, Pending}; 282 /// 283 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); 284 /// ``` with_capacity(capacity: usize) -> StreamMap<K, V>285 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { 286 StreamMap { 287 entries: Vec::with_capacity(capacity), 288 } 289 } 290 291 /// Returns an iterator visiting all keys in arbitrary order. 292 /// 293 /// The iterator element type is `&'a K`. 294 /// 295 /// # Examples 296 /// 297 /// ``` 298 /// use tokio_stream::{StreamMap, pending}; 299 /// 300 /// let mut map = StreamMap::new(); 301 /// 302 /// map.insert("a", pending::<i32>()); 303 /// map.insert("b", pending()); 304 /// map.insert("c", pending()); 305 /// 306 /// for key in map.keys() { 307 /// println!("{}", key); 308 /// } 309 /// ``` keys(&self) -> impl Iterator<Item = &K>310 pub fn keys(&self) -> impl Iterator<Item = &K> { 311 self.iter().map(|(k, _)| k) 312 } 313 314 /// An iterator visiting all values in arbitrary order. 315 /// 316 /// The iterator element type is `&'a V`. 317 /// 318 /// # Examples 319 /// 320 /// ``` 321 /// use tokio_stream::{StreamMap, pending}; 322 /// 323 /// let mut map = StreamMap::new(); 324 /// 325 /// map.insert("a", pending::<i32>()); 326 /// map.insert("b", pending()); 327 /// map.insert("c", pending()); 328 /// 329 /// for stream in map.values() { 330 /// println!("{:?}", stream); 331 /// } 332 /// ``` values(&self) -> impl Iterator<Item = &V>333 pub fn values(&self) -> impl Iterator<Item = &V> { 334 self.iter().map(|(_, v)| v) 335 } 336 337 /// An iterator visiting all values mutably in arbitrary order. 338 /// 339 /// The iterator element type is `&'a mut V`. 340 /// 341 /// # Examples 342 /// 343 /// ``` 344 /// use tokio_stream::{StreamMap, pending}; 345 /// 346 /// let mut map = StreamMap::new(); 347 /// 348 /// map.insert("a", pending::<i32>()); 349 /// map.insert("b", pending()); 350 /// map.insert("c", pending()); 351 /// 352 /// for stream in map.values_mut() { 353 /// println!("{:?}", stream); 354 /// } 355 /// ``` values_mut(&mut self) -> impl Iterator<Item = &mut V>356 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { 357 self.iter_mut().map(|(_, v)| v) 358 } 359 360 /// Returns the number of streams the map can hold without reallocating. 361 /// 362 /// This number is a lower bound; the `StreamMap` might be able to hold 363 /// more, but is guaranteed to be able to hold at least this many. 364 /// 365 /// # Examples 366 /// 367 /// ``` 368 /// use tokio_stream::{StreamMap, Pending}; 369 /// 370 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); 371 /// assert!(map.capacity() >= 100); 372 /// ``` capacity(&self) -> usize373 pub fn capacity(&self) -> usize { 374 self.entries.capacity() 375 } 376 377 /// Returns the number of streams in the map. 378 /// 379 /// # Examples 380 /// 381 /// ``` 382 /// use tokio_stream::{StreamMap, pending}; 383 /// 384 /// let mut a = StreamMap::new(); 385 /// assert_eq!(a.len(), 0); 386 /// a.insert(1, pending::<i32>()); 387 /// assert_eq!(a.len(), 1); 388 /// ``` len(&self) -> usize389 pub fn len(&self) -> usize { 390 self.entries.len() 391 } 392 393 /// Returns `true` if the map contains no elements. 394 /// 395 /// # Examples 396 /// 397 /// ``` 398 /// use tokio_stream::{StreamMap, pending}; 399 /// 400 /// let mut a = StreamMap::new(); 401 /// assert!(a.is_empty()); 402 /// a.insert(1, pending::<i32>()); 403 /// assert!(!a.is_empty()); 404 /// ``` is_empty(&self) -> bool405 pub fn is_empty(&self) -> bool { 406 self.entries.is_empty() 407 } 408 409 /// Clears the map, removing all key-stream pairs. Keeps the allocated 410 /// memory for reuse. 411 /// 412 /// # Examples 413 /// 414 /// ``` 415 /// use tokio_stream::{StreamMap, pending}; 416 /// 417 /// let mut a = StreamMap::new(); 418 /// a.insert(1, pending::<i32>()); 419 /// a.clear(); 420 /// assert!(a.is_empty()); 421 /// ``` clear(&mut self)422 pub fn clear(&mut self) { 423 self.entries.clear(); 424 } 425 426 /// Insert a key-stream pair into the map. 427 /// 428 /// If the map did not have this key present, `None` is returned. 429 /// 430 /// If the map did have this key present, the new `stream` replaces the old 431 /// one and the old stream is returned. 432 /// 433 /// # Examples 434 /// 435 /// ``` 436 /// use tokio_stream::{StreamMap, pending}; 437 /// 438 /// let mut map = StreamMap::new(); 439 /// 440 /// assert!(map.insert(37, pending::<i32>()).is_none()); 441 /// assert!(!map.is_empty()); 442 /// 443 /// map.insert(37, pending()); 444 /// assert!(map.insert(37, pending()).is_some()); 445 /// ``` insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,446 pub fn insert(&mut self, k: K, stream: V) -> Option<V> 447 where 448 K: Hash + Eq, 449 { 450 let ret = self.remove(&k); 451 self.entries.push((k, stream)); 452 453 ret 454 } 455 456 /// Removes a key from the map, returning the stream at the key if the key was previously in the map. 457 /// 458 /// The key may be any borrowed form of the map's key type, but `Hash` and 459 /// `Eq` on the borrowed form must match those for the key type. 460 /// 461 /// # Examples 462 /// 463 /// ``` 464 /// use tokio_stream::{StreamMap, pending}; 465 /// 466 /// let mut map = StreamMap::new(); 467 /// map.insert(1, pending::<i32>()); 468 /// assert!(map.remove(&1).is_some()); 469 /// assert!(map.remove(&1).is_none()); 470 /// ``` remove<Q>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq + ?Sized,471 pub fn remove<Q>(&mut self, k: &Q) -> Option<V> 472 where 473 K: Borrow<Q>, 474 Q: Hash + Eq + ?Sized, 475 { 476 for i in 0..self.entries.len() { 477 if self.entries[i].0.borrow() == k { 478 return Some(self.entries.swap_remove(i).1); 479 } 480 } 481 482 None 483 } 484 485 /// Returns `true` if the map contains a stream for the specified key. 486 /// 487 /// The key may be any borrowed form of the map's key type, but `Hash` and 488 /// `Eq` on the borrowed form must match those for the key type. 489 /// 490 /// # Examples 491 /// 492 /// ``` 493 /// use tokio_stream::{StreamMap, pending}; 494 /// 495 /// let mut map = StreamMap::new(); 496 /// map.insert(1, pending::<i32>()); 497 /// assert_eq!(map.contains_key(&1), true); 498 /// assert_eq!(map.contains_key(&2), false); 499 /// ``` contains_key<Q>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq + ?Sized,500 pub fn contains_key<Q>(&self, k: &Q) -> bool 501 where 502 K: Borrow<Q>, 503 Q: Hash + Eq + ?Sized, 504 { 505 for i in 0..self.entries.len() { 506 if self.entries[i].0.borrow() == k { 507 return true; 508 } 509 } 510 511 false 512 } 513 } 514 515 impl<K, V> StreamMap<K, V> 516 where 517 K: Unpin, 518 V: Stream + Unpin, 519 { 520 /// Polls the next value, includes the vec entry index poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>521 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { 522 let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; 523 let mut idx = start; 524 525 for _ in 0..self.entries.len() { 526 let (_, stream) = &mut self.entries[idx]; 527 528 match Pin::new(stream).poll_next(cx) { 529 Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))), 530 Poll::Ready(None) => { 531 // Remove the entry 532 self.entries.swap_remove(idx); 533 534 // Check if this was the last entry, if so the cursor needs 535 // to wrap 536 if idx == self.entries.len() { 537 idx = 0; 538 } else if idx < start && start <= self.entries.len() { 539 // The stream being swapped into the current index has 540 // already been polled, so skip it. 541 idx = idx.wrapping_add(1) % self.entries.len(); 542 } 543 } 544 Poll::Pending => { 545 idx = idx.wrapping_add(1) % self.entries.len(); 546 } 547 } 548 } 549 550 // If the map is empty, then the stream is complete. 551 if self.entries.is_empty() { 552 Poll::Ready(None) 553 } else { 554 Poll::Pending 555 } 556 } 557 } 558 559 impl<K, V> Default for StreamMap<K, V> { default() -> Self560 fn default() -> Self { 561 Self::new() 562 } 563 } 564 565 impl<K, V> StreamMap<K, V> 566 where 567 K: Clone + Unpin, 568 V: Stream + Unpin, 569 { 570 /// Receives multiple items on this [`StreamMap`], extending the provided `buffer`. 571 /// 572 /// This method returns the number of items that is appended to the `buffer`. 573 /// 574 /// Note that this method does not guarantee that exactly `limit` items 575 /// are received. Rather, if at least one item is available, it returns 576 /// as many items as it can up to the given limit. This method returns 577 /// zero only if the `StreamMap` is empty (or if `limit` is zero). 578 /// 579 /// # Cancel safety 580 /// 581 /// This method is cancel safe. If `next_many` is used as the event in a 582 /// [`tokio::select!`](tokio::select) statement and some other branch 583 /// completes first, it is guaranteed that no items were received on any of 584 /// the underlying streams. next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize585 pub async fn next_many(&mut self, buffer: &mut Vec<(K, V::Item)>, limit: usize) -> usize { 586 poll_fn(|cx| self.poll_next_many(cx, buffer, limit)).await 587 } 588 589 /// Polls to receive multiple items on this `StreamMap`, extending the provided `buffer`. 590 /// 591 /// This method returns: 592 /// * `Poll::Pending` if no items are available but the `StreamMap` is not empty. 593 /// * `Poll::Ready(count)` where `count` is the number of items successfully received and 594 /// stored in `buffer`. This can be less than, or equal to, `limit`. 595 /// * `Poll::Ready(0)` if `limit` is set to zero or when the `StreamMap` is empty. 596 /// 597 /// Note that this method does not guarantee that exactly `limit` items 598 /// are received. Rather, if at least one item is available, it returns 599 /// as many items as it can up to the given limit. This method returns 600 /// zero only if the `StreamMap` is empty (or if `limit` is zero). poll_next_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<(K, V::Item)>, limit: usize, ) -> Poll<usize>601 pub fn poll_next_many( 602 &mut self, 603 cx: &mut Context<'_>, 604 buffer: &mut Vec<(K, V::Item)>, 605 limit: usize, 606 ) -> Poll<usize> { 607 if limit == 0 || self.entries.is_empty() { 608 return Poll::Ready(0); 609 } 610 611 let mut added = 0; 612 613 let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; 614 let mut idx = start; 615 616 while added < limit { 617 // Indicates whether at least one stream returned a value when polled or not 618 let mut should_loop = false; 619 620 for _ in 0..self.entries.len() { 621 let (_, stream) = &mut self.entries[idx]; 622 623 match Pin::new(stream).poll_next(cx) { 624 Poll::Ready(Some(val)) => { 625 added += 1; 626 627 let key = self.entries[idx].0.clone(); 628 buffer.push((key, val)); 629 630 should_loop = true; 631 632 idx = idx.wrapping_add(1) % self.entries.len(); 633 } 634 Poll::Ready(None) => { 635 // Remove the entry 636 self.entries.swap_remove(idx); 637 638 // Check if this was the last entry, if so the cursor needs 639 // to wrap 640 if idx == self.entries.len() { 641 idx = 0; 642 } else if idx < start && start <= self.entries.len() { 643 // The stream being swapped into the current index has 644 // already been polled, so skip it. 645 idx = idx.wrapping_add(1) % self.entries.len(); 646 } 647 } 648 Poll::Pending => { 649 idx = idx.wrapping_add(1) % self.entries.len(); 650 } 651 } 652 } 653 654 if !should_loop { 655 break; 656 } 657 } 658 659 if added > 0 { 660 Poll::Ready(added) 661 } else if self.entries.is_empty() { 662 Poll::Ready(0) 663 } else { 664 Poll::Pending 665 } 666 } 667 } 668 669 impl<K, V> Stream for StreamMap<K, V> 670 where 671 K: Clone + Unpin, 672 V: Stream + Unpin, 673 { 674 type Item = (K, V::Item); 675 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>676 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 677 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { 678 let key = self.entries[idx].0.clone(); 679 Poll::Ready(Some((key, val))) 680 } else { 681 Poll::Ready(None) 682 } 683 } 684 size_hint(&self) -> (usize, Option<usize>)685 fn size_hint(&self) -> (usize, Option<usize>) { 686 let mut ret = (0, Some(0)); 687 688 for (_, stream) in &self.entries { 689 let hint = stream.size_hint(); 690 691 ret.0 += hint.0; 692 693 match (ret.1, hint.1) { 694 (Some(a), Some(b)) => ret.1 = Some(a + b), 695 (Some(_), None) => ret.1 = None, 696 _ => {} 697 } 698 } 699 700 ret 701 } 702 } 703 704 impl<K, V> FromIterator<(K, V)> for StreamMap<K, V> 705 where 706 K: Hash + Eq, 707 { from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self708 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { 709 let iterator = iter.into_iter(); 710 let (lower_bound, _) = iterator.size_hint(); 711 let mut stream_map = Self::with_capacity(lower_bound); 712 713 for (key, value) in iterator { 714 stream_map.insert(key, value); 715 } 716 717 stream_map 718 } 719 } 720 721 impl<K, V> Extend<(K, V)> for StreamMap<K, V> { extend<T>(&mut self, iter: T) where T: IntoIterator<Item = (K, V)>,722 fn extend<T>(&mut self, iter: T) 723 where 724 T: IntoIterator<Item = (K, V)>, 725 { 726 self.entries.extend(iter); 727 } 728 } 729 730 mod rand { 731 use std::cell::Cell; 732 733 mod loom { 734 #[cfg(not(loom))] 735 pub(crate) mod rand { 736 use std::collections::hash_map::RandomState; 737 use std::hash::{BuildHasher, Hash, Hasher}; 738 use std::sync::atomic::AtomicU32; 739 use std::sync::atomic::Ordering::Relaxed; 740 741 static COUNTER: AtomicU32 = AtomicU32::new(1); 742 seed() -> u64743 pub(crate) fn seed() -> u64 { 744 let rand_state = RandomState::new(); 745 746 let mut hasher = rand_state.build_hasher(); 747 748 // Hash some unique-ish data to generate some new state 749 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); 750 751 // Get the seed 752 hasher.finish() 753 } 754 } 755 756 #[cfg(loom)] 757 pub(crate) mod rand { seed() -> u64758 pub(crate) fn seed() -> u64 { 759 1 760 } 761 } 762 } 763 764 /// Fast random number generate 765 /// 766 /// Implement `xorshift64+`: 2 32-bit `xorshift` sequences added together. 767 /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's 768 /// `Xorshift` paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> 769 /// This generator passes the SmallCrush suite, part of TestU01 framework: 770 /// <http://simul.iro.umontreal.ca/testu01/tu01.html> 771 #[derive(Debug)] 772 pub(crate) struct FastRand { 773 one: Cell<u32>, 774 two: Cell<u32>, 775 } 776 777 impl FastRand { 778 /// Initialize a new, thread-local, fast random number generator. new(seed: u64) -> FastRand779 pub(crate) fn new(seed: u64) -> FastRand { 780 let one = (seed >> 32) as u32; 781 let mut two = seed as u32; 782 783 if two == 0 { 784 // This value cannot be zero 785 two = 1; 786 } 787 788 FastRand { 789 one: Cell::new(one), 790 two: Cell::new(two), 791 } 792 } 793 fastrand_n(&self, n: u32) -> u32794 pub(crate) fn fastrand_n(&self, n: u32) -> u32 { 795 // This is similar to fastrand() % n, but faster. 796 // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ 797 let mul = (self.fastrand() as u64).wrapping_mul(n as u64); 798 (mul >> 32) as u32 799 } 800 fastrand(&self) -> u32801 fn fastrand(&self) -> u32 { 802 let mut s1 = self.one.get(); 803 let s0 = self.two.get(); 804 805 s1 ^= s1 << 17; 806 s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; 807 808 self.one.set(s0); 809 self.two.set(s1); 810 811 s0.wrapping_add(s1) 812 } 813 } 814 815 // Used by `StreamMap` thread_rng_n(n: u32) -> u32816 pub(crate) fn thread_rng_n(n: u32) -> u32 { 817 thread_local! { 818 static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); 819 } 820 821 THREAD_RNG.with(|rng| rng.fastrand_n(n)) 822 } 823 } 824