//! Streams
//!
//! This module contains a number of functions for working with `Stream`s,
//! including the `StreamExt` trait which adds methods to `Stream` types.

use crate::future::{assert_future, Either};
use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "sink")]
use futures_core::stream::TryStream;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
use futures_core::{
    future::Future,
    stream::{FusedStream, Stream},
    task::{Context, Poll},
};
#[cfg(feature = "sink")]
use futures_sink::Sink;

use crate::fns::{inspect_fn, InspectFn};

// Each combinator lives in its own private submodule; the public type it
// defines is re-exported from this module right after the `mod` declaration.

mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chain::Chain;

mod collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::collect::Collect;

mod unzip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::unzip::Unzip;

mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;

mod count;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::count::Count;

mod cycle;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::cycle::Cycle;

mod enumerate;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::enumerate::Enumerate;

mod filter;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::filter::Filter;

mod filter_map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::filter_map::FilterMap;

mod flatten;

// `Flatten` is a public wrapper generated by `delegate_all!` around the
// private `flatten::Flatten`, whose second type parameter is fixed to
// `St::Item` here.
delegate_all!(
    /// Stream for the [`flatten`](StreamExt::flatten) method.
    Flatten<St>(
        flatten::Flatten<St, St::Item>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
    where St: Stream
);

mod fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::fold::Fold;

mod any;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::any::Any;

mod all;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::all::All;

#[cfg(feature = "sink")]
mod forward;

// Only available with the `sink` feature, like the `forward` module itself.
#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(
        forward::Forward<St, Si, St::Ok>
    ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where St: TryStream
);

mod for_each;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::for_each::ForEach;

mod fuse;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::fuse::Fuse;

mod into_future;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_future::StreamFuture;

// `Inspect` is expressed in terms of `Map`: the user closure is adapted with
// `inspect_fn` and handed to `map::Map::new`.
delegate_all!(
    /// Stream for the [`inspect`](StreamExt::inspect) method.
    Inspect<St, F>(
        map::Map<St, InspectFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
);

mod map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::map::Map;

// `FlatMap` is `Map` followed by `Flatten`, composed at construction time.
delegate_all!(
    /// Stream for the [`flat_map`](StreamExt::flat_map) method.
    FlatMap<St, U, F>(
        flatten::Flatten<Map<St, F>, U>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
);

mod next;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::next::Next;

mod select_next_some;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::select_next_some::SelectNextSome;

mod peek;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};

mod skip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::skip::Skip;

mod skip_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::skip_while::SkipWhile;

mod take;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take::Take;

mod take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take_while::TakeWhile;

mod take_until;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::take_until::TakeUntil;

mod then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::then::Then;

mod zip;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::zip::Zip;

#[cfg(feature = "alloc")]
mod chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::chunks::Chunks;

#[cfg(feature = "alloc")]
mod ready_chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::ready_chunks::ReadyChunks;

mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;

// The items below are additionally gated on pointer-sized atomics when
// building for `target_os = "none"`.

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod buffer_unordered;
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffer_unordered::BufferUnordered;

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod buffered;
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffered::Buffered;

// Unlike its siblings this module is `pub(crate)`, so other parts of the
// crate can reach into it directly.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub(crate) mod flatten_unordered;

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // presumably the same rustc issue as the other re-exports (#57411)
pub use self::flatten_unordered::FlattenUnordered;

// `FlatMapUnordered` is `Map` followed by `FlattenUnordered`, with the
// concurrency limit forwarded to `FlattenUnordered::new`.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
delegate_all!(
    /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
    FlatMapUnordered<St, U, F>(
        FlattenUnordered<Map<St, F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
    where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
);

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
mod for_each_concurrent;
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::for_each_concurrent::ForEachConcurrent;

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
mod split;
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::split::{ReuniteError, SplitSink, SplitStream};

#[cfg(feature = "std")]
mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;

// Blanket implementation: every `Stream`, including unsized ones, gets the
// provided methods of `StreamExt`.
impl<T: ?Sized> StreamExt for T where T: Stream {}

/// An extension trait for `Stream`s that provides a variety of convenient
/// combinator functions.
pub trait StreamExt: Stream {
    /// Creates a future that resolves to the next item in the stream.
    ///
    /// Note that because `next` doesn't take ownership over the stream,
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
    /// be done by boxing the stream using [`Box::pin`] or
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
    /// crate.
258 /// 259 /// # Examples 260 /// 261 /// ``` 262 /// # futures::executor::block_on(async { 263 /// use futures::stream::{self, StreamExt}; 264 /// 265 /// let mut stream = stream::iter(1..=3); 266 /// 267 /// assert_eq!(stream.next().await, Some(1)); 268 /// assert_eq!(stream.next().await, Some(2)); 269 /// assert_eq!(stream.next().await, Some(3)); 270 /// assert_eq!(stream.next().await, None); 271 /// # }); 272 /// ``` next(&mut self) -> Next<'_, Self> where Self: Unpin,273 fn next(&mut self) -> Next<'_, Self> 274 where 275 Self: Unpin, 276 { 277 assert_future::<Option<Self::Item>, _>(Next::new(self)) 278 } 279 280 /// Converts this stream into a future of `(next_item, tail_of_stream)`. 281 /// If the stream terminates, then the next item is [`None`]. 282 /// 283 /// The returned future can be used to compose streams and futures together 284 /// by placing everything into the "world of futures". 285 /// 286 /// Note that because `into_future` moves the stream, the [`Stream`] type 287 /// must be [`Unpin`]. If you want to use `into_future` with a 288 /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can 289 /// be done by boxing the stream using [`Box::pin`] or 290 /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` 291 /// crate. 
292 /// 293 /// # Examples 294 /// 295 /// ``` 296 /// # futures::executor::block_on(async { 297 /// use futures::stream::{self, StreamExt}; 298 /// 299 /// let stream = stream::iter(1..=3); 300 /// 301 /// let (item, stream) = stream.into_future().await; 302 /// assert_eq!(Some(1), item); 303 /// 304 /// let (item, stream) = stream.into_future().await; 305 /// assert_eq!(Some(2), item); 306 /// # }); 307 /// ``` into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,308 fn into_future(self) -> StreamFuture<Self> 309 where 310 Self: Sized + Unpin, 311 { 312 assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self)) 313 } 314 315 /// Maps this stream's items to a different type, returning a new stream of 316 /// the resulting type. 317 /// 318 /// The provided closure is executed over all elements of this stream as 319 /// they are made available. It is executed inline with calls to 320 /// [`poll_next`](Stream::poll_next). 321 /// 322 /// Note that this function consumes the stream passed into it and returns a 323 /// wrapped version of it, similar to the existing `map` methods in the 324 /// standard library. 325 /// 326 /// See [`StreamExt::then`](Self::then) if you want to use a closure that 327 /// returns a future instead of a value. 328 /// 329 /// # Examples 330 /// 331 /// ``` 332 /// # futures::executor::block_on(async { 333 /// use futures::stream::{self, StreamExt}; 334 /// 335 /// let stream = stream::iter(1..=3); 336 /// let stream = stream.map(|x| x + 3); 337 /// 338 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 339 /// # }); 340 /// ``` map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,341 fn map<T, F>(self, f: F) -> Map<Self, F> 342 where 343 F: FnMut(Self::Item) -> T, 344 Self: Sized, 345 { 346 assert_stream::<T, _>(Map::new(self, f)) 347 } 348 349 /// Creates a stream which gives the current iteration count as well as 350 /// the next value. 
351 /// 352 /// The stream returned yields pairs `(i, val)`, where `i` is the 353 /// current index of iteration and `val` is the value returned by the 354 /// stream. 355 /// 356 /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a 357 /// different sized integer, the [`zip`](StreamExt::zip) function provides similar 358 /// functionality. 359 /// 360 /// # Overflow Behavior 361 /// 362 /// The method does no guarding against overflows, so enumerating more than 363 /// [`usize::MAX`] elements either produces the wrong result or panics. If 364 /// debug assertions are enabled, a panic is guaranteed. 365 /// 366 /// # Panics 367 /// 368 /// The returned stream might panic if the to-be-returned index would 369 /// overflow a [`usize`]. 370 /// 371 /// # Examples 372 /// 373 /// ``` 374 /// # futures::executor::block_on(async { 375 /// use futures::stream::{self, StreamExt}; 376 /// 377 /// let stream = stream::iter(vec!['a', 'b', 'c']); 378 /// 379 /// let mut stream = stream.enumerate(); 380 /// 381 /// assert_eq!(stream.next().await, Some((0, 'a'))); 382 /// assert_eq!(stream.next().await, Some((1, 'b'))); 383 /// assert_eq!(stream.next().await, Some((2, 'c'))); 384 /// assert_eq!(stream.next().await, None); 385 /// # }); 386 /// ``` enumerate(self) -> Enumerate<Self> where Self: Sized,387 fn enumerate(self) -> Enumerate<Self> 388 where 389 Self: Sized, 390 { 391 assert_stream::<(usize, Self::Item), _>(Enumerate::new(self)) 392 } 393 394 /// Filters the values produced by this stream according to the provided 395 /// asynchronous predicate. 396 /// 397 /// As values of this stream are made available, the provided predicate `f` 398 /// will be run against them. If the predicate returns a `Future` which 399 /// resolves to `true`, then the stream will yield the value, but if the 400 /// predicate returns a `Future` which resolves to `false`, then the value 401 /// will be discarded and the next value will be produced. 
402 /// 403 /// Note that this function consumes the stream passed into it and returns a 404 /// wrapped version of it, similar to the existing `filter` methods in the 405 /// standard library. 406 /// 407 /// # Examples 408 /// 409 /// ``` 410 /// # futures::executor::block_on(async { 411 /// use futures::future; 412 /// use futures::stream::{self, StreamExt}; 413 /// 414 /// let stream = stream::iter(1..=10); 415 /// let events = stream.filter(|x| future::ready(x % 2 == 0)); 416 /// 417 /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); 418 /// # }); 419 /// ``` filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,420 fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> 421 where 422 F: FnMut(&Self::Item) -> Fut, 423 Fut: Future<Output = bool>, 424 Self: Sized, 425 { 426 assert_stream::<Self::Item, _>(Filter::new(self, f)) 427 } 428 429 /// Filters the values produced by this stream while simultaneously mapping 430 /// them to a different type according to the provided asynchronous closure. 431 /// 432 /// As values of this stream are made available, the provided function will 433 /// be run on them. If the future returned by the predicate `f` resolves to 434 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 435 /// it resolves to [`None`] then the next value will be produced. 436 /// 437 /// Note that this function consumes the stream passed into it and returns a 438 /// wrapped version of it, similar to the existing `filter_map` methods in 439 /// the standard library. 
440 /// 441 /// # Examples 442 /// ``` 443 /// # futures::executor::block_on(async { 444 /// use futures::stream::{self, StreamExt}; 445 /// 446 /// let stream = stream::iter(1..=10); 447 /// let events = stream.filter_map(|x| async move { 448 /// if x % 2 == 0 { Some(x + 1) } else { None } 449 /// }); 450 /// 451 /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); 452 /// # }); 453 /// ``` filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,454 fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> 455 where 456 F: FnMut(Self::Item) -> Fut, 457 Fut: Future<Output = Option<T>>, 458 Self: Sized, 459 { 460 assert_stream::<T, _>(FilterMap::new(self, f)) 461 } 462 463 /// Computes from this stream's items new items of a different type using 464 /// an asynchronous closure. 465 /// 466 /// The provided closure `f` will be called with an `Item` once a value is 467 /// ready, it returns a future which will then be run to completion 468 /// to produce the next value on this stream. 469 /// 470 /// Note that this function consumes the stream passed into it and returns a 471 /// wrapped version of it. 472 /// 473 /// See [`StreamExt::map`](Self::map) if you want to use a closure that 474 /// returns a value instead of a future. 
475 /// 476 /// # Examples 477 /// 478 /// ``` 479 /// # futures::executor::block_on(async { 480 /// use futures::stream::{self, StreamExt}; 481 /// 482 /// let stream = stream::iter(1..=3); 483 /// let stream = stream.then(|x| async move { x + 3 }); 484 /// 485 /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await); 486 /// # }); 487 /// ``` then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,488 fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> 489 where 490 F: FnMut(Self::Item) -> Fut, 491 Fut: Future, 492 Self: Sized, 493 { 494 assert_stream::<Fut::Output, _>(Then::new(self, f)) 495 } 496 497 /// Transforms a stream into a collection, returning a 498 /// future representing the result of that computation. 499 /// 500 /// The returned future will be resolved when the stream terminates. 501 /// 502 /// # Examples 503 /// 504 /// ``` 505 /// # futures::executor::block_on(async { 506 /// use futures::channel::mpsc; 507 /// use futures::stream::StreamExt; 508 /// use std::thread; 509 /// 510 /// let (tx, rx) = mpsc::unbounded(); 511 /// 512 /// thread::spawn(move || { 513 /// for i in 1..=5 { 514 /// tx.unbounded_send(i).unwrap(); 515 /// } 516 /// }); 517 /// 518 /// let output = rx.collect::<Vec<i32>>().await; 519 /// assert_eq!(output, vec![1, 2, 3, 4, 5]); 520 /// # }); 521 /// ``` collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,522 fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> 523 where 524 Self: Sized, 525 { 526 assert_future::<C, _>(Collect::new(self)) 527 } 528 529 /// Converts a stream of pairs into a future, which 530 /// resolves to pair of containers. 531 /// 532 /// `unzip()` produces a future, which resolves to two 533 /// collections: one from the left elements of the pairs, 534 /// and one from the right elements. 535 /// 536 /// The returned future will be resolved when the stream terminates. 
537 /// 538 /// # Examples 539 /// 540 /// ``` 541 /// # futures::executor::block_on(async { 542 /// use futures::channel::mpsc; 543 /// use futures::stream::StreamExt; 544 /// use std::thread; 545 /// 546 /// let (tx, rx) = mpsc::unbounded(); 547 /// 548 /// thread::spawn(move || { 549 /// tx.unbounded_send((1, 2)).unwrap(); 550 /// tx.unbounded_send((3, 4)).unwrap(); 551 /// tx.unbounded_send((5, 6)).unwrap(); 552 /// }); 553 /// 554 /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; 555 /// assert_eq!(o1, vec![1, 3, 5]); 556 /// assert_eq!(o2, vec![2, 4, 6]); 557 /// # }); 558 /// ``` unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,559 fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> 560 where 561 FromA: Default + Extend<A>, 562 FromB: Default + Extend<B>, 563 Self: Sized + Stream<Item = (A, B)>, 564 { 565 assert_future::<(FromA, FromB), _>(Unzip::new(self)) 566 } 567 568 /// Concatenate all items of a stream into a single extendable 569 /// destination, returning a future representing the end result. 570 /// 571 /// This combinator will extend the first item with the contents 572 /// of all the subsequent results of the stream. If the stream is 573 /// empty, the default value will be returned. 574 /// 575 /// Works with all collections that implement the 576 /// [`Extend`](std::iter::Extend) trait. 
577 /// 578 /// # Examples 579 /// 580 /// ``` 581 /// # futures::executor::block_on(async { 582 /// use futures::channel::mpsc; 583 /// use futures::stream::StreamExt; 584 /// use std::thread; 585 /// 586 /// let (tx, rx) = mpsc::unbounded(); 587 /// 588 /// thread::spawn(move || { 589 /// for i in (0..3).rev() { 590 /// let n = i * 3; 591 /// tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap(); 592 /// } 593 /// }); 594 /// 595 /// let result = rx.concat().await; 596 /// 597 /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); 598 /// # }); 599 /// ``` concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,600 fn concat(self) -> Concat<Self> 601 where 602 Self: Sized, 603 Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, 604 { 605 assert_future::<Self::Item, _>(Concat::new(self)) 606 } 607 608 /// Drives the stream to completion, counting the number of items. 609 /// 610 /// # Overflow Behavior 611 /// 612 /// The method does no guarding against overflows, so counting elements of a 613 /// stream with more than [`usize::MAX`] elements either produces the wrong 614 /// result or panics. If debug assertions are enabled, a panic is guaranteed. 615 /// 616 /// # Panics 617 /// 618 /// This function might panic if the iterator has more than [`usize::MAX`] 619 /// elements. 620 /// 621 /// # Examples 622 /// 623 /// ``` 624 /// # futures::executor::block_on(async { 625 /// use futures::stream::{self, StreamExt}; 626 /// 627 /// let stream = stream::iter(1..=10); 628 /// let count = stream.count().await; 629 /// 630 /// assert_eq!(count, 10); 631 /// # }); 632 /// ``` count(self) -> Count<Self> where Self: Sized,633 fn count(self) -> Count<Self> 634 where 635 Self: Sized, 636 { 637 assert_future::<usize, _>(Count::new(self)) 638 } 639 640 /// Repeats a stream endlessly. 641 /// 642 /// The stream never terminates. 
Note that you likely want to avoid 643 /// usage of `collect` or such on the returned stream as it will exhaust 644 /// available memory as it tries to just fill up all RAM. 645 /// 646 /// # Examples 647 /// 648 /// ``` 649 /// # futures::executor::block_on(async { 650 /// use futures::stream::{self, StreamExt}; 651 /// let a = [1, 2, 3]; 652 /// let mut s = stream::iter(a.iter()).cycle(); 653 /// 654 /// assert_eq!(s.next().await, Some(&1)); 655 /// assert_eq!(s.next().await, Some(&2)); 656 /// assert_eq!(s.next().await, Some(&3)); 657 /// assert_eq!(s.next().await, Some(&1)); 658 /// assert_eq!(s.next().await, Some(&2)); 659 /// assert_eq!(s.next().await, Some(&3)); 660 /// assert_eq!(s.next().await, Some(&1)); 661 /// # }); 662 /// ``` cycle(self) -> Cycle<Self> where Self: Sized + Clone,663 fn cycle(self) -> Cycle<Self> 664 where 665 Self: Sized + Clone, 666 { 667 assert_stream::<Self::Item, _>(Cycle::new(self)) 668 } 669 670 /// Execute an accumulating asynchronous computation over a stream, 671 /// collecting all the values into one final result. 672 /// 673 /// This combinator will accumulate all values returned by this stream 674 /// according to the closure provided. The initial state is also provided to 675 /// this method and then is returned again by each execution of the closure. 676 /// Once the entire stream has been exhausted the returned future will 677 /// resolve to this value. 
678 /// 679 /// # Examples 680 /// 681 /// ``` 682 /// # futures::executor::block_on(async { 683 /// use futures::stream::{self, StreamExt}; 684 /// 685 /// let number_stream = stream::iter(0..6); 686 /// let sum = number_stream.fold(0, |acc, x| async move { acc + x }); 687 /// assert_eq!(sum.await, 15); 688 /// # }); 689 /// ``` fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,690 fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> 691 where 692 F: FnMut(T, Self::Item) -> Fut, 693 Fut: Future<Output = T>, 694 Self: Sized, 695 { 696 assert_future::<T, _>(Fold::new(self, f, init)) 697 } 698 699 /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. 700 /// 701 /// # Examples 702 /// 703 /// ``` 704 /// # futures::executor::block_on(async { 705 /// use futures::stream::{self, StreamExt}; 706 /// 707 /// let number_stream = stream::iter(0..10); 708 /// let contain_three = number_stream.any(|i| async move { i == 3 }); 709 /// assert_eq!(contain_three.await, true); 710 /// # }); 711 /// ``` any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,712 fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F> 713 where 714 F: FnMut(Self::Item) -> Fut, 715 Fut: Future<Output = bool>, 716 Self: Sized, 717 { 718 assert_future::<bool, _>(Any::new(self, f)) 719 } 720 721 /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. 
722 /// 723 /// # Examples 724 /// 725 /// ``` 726 /// # futures::executor::block_on(async { 727 /// use futures::stream::{self, StreamExt}; 728 /// 729 /// let number_stream = stream::iter(0..10); 730 /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); 731 /// assert_eq!(less_then_twenty.await, true); 732 /// # }); 733 /// ``` all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,734 fn all<Fut, F>(self, f: F) -> All<Self, Fut, F> 735 where 736 F: FnMut(Self::Item) -> Fut, 737 Fut: Future<Output = bool>, 738 Self: Sized, 739 { 740 assert_future::<bool, _>(All::new(self, f)) 741 } 742 743 /// Flattens a stream of streams into just one continuous stream. 744 /// 745 /// # Examples 746 /// 747 /// ``` 748 /// # futures::executor::block_on(async { 749 /// use futures::channel::mpsc; 750 /// use futures::stream::StreamExt; 751 /// use std::thread; 752 /// 753 /// let (tx1, rx1) = mpsc::unbounded(); 754 /// let (tx2, rx2) = mpsc::unbounded(); 755 /// let (tx3, rx3) = mpsc::unbounded(); 756 /// 757 /// thread::spawn(move || { 758 /// tx1.unbounded_send(1).unwrap(); 759 /// tx1.unbounded_send(2).unwrap(); 760 /// }); 761 /// thread::spawn(move || { 762 /// tx2.unbounded_send(3).unwrap(); 763 /// tx2.unbounded_send(4).unwrap(); 764 /// }); 765 /// thread::spawn(move || { 766 /// tx3.unbounded_send(rx1).unwrap(); 767 /// tx3.unbounded_send(rx2).unwrap(); 768 /// }); 769 /// 770 /// let output = rx3.flatten().collect::<Vec<i32>>().await; 771 /// assert_eq!(output, vec![1, 2, 3, 4]); 772 /// # }); 773 /// ``` flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,774 fn flatten(self) -> Flatten<Self> 775 where 776 Self::Item: Stream, 777 Self: Sized, 778 { 779 assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) 780 } 781 782 /// Flattens a stream of streams into just one continuous stream. Polls 783 /// inner streams produced by the base stream concurrently. 
784 /// 785 /// The only argument is an optional limit on the number of concurrently 786 /// polled streams. If this limit is not `None`, no more than `limit` streams 787 /// will be polled at the same time. The `limit` argument is of type 788 /// `Into<Option<usize>>`, and so can be provided as either `None`, 789 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 790 /// no limit at all, and will have the same result as passing in `None`. 791 /// 792 /// # Examples 793 /// 794 /// ``` 795 /// # futures::executor::block_on(async { 796 /// use futures::channel::mpsc; 797 /// use futures::stream::StreamExt; 798 /// use std::thread; 799 /// 800 /// let (tx1, rx1) = mpsc::unbounded(); 801 /// let (tx2, rx2) = mpsc::unbounded(); 802 /// let (tx3, rx3) = mpsc::unbounded(); 803 /// 804 /// thread::spawn(move || { 805 /// tx1.unbounded_send(1).unwrap(); 806 /// tx1.unbounded_send(2).unwrap(); 807 /// }); 808 /// thread::spawn(move || { 809 /// tx2.unbounded_send(3).unwrap(); 810 /// tx2.unbounded_send(4).unwrap(); 811 /// }); 812 /// thread::spawn(move || { 813 /// tx3.unbounded_send(rx1).unwrap(); 814 /// tx3.unbounded_send(rx2).unwrap(); 815 /// }); 816 /// 817 /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; 818 /// output.sort(); 819 /// 820 /// assert_eq!(output, vec![1, 2, 3, 4]); 821 /// # }); 822 /// ``` 823 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 824 #[cfg(feature = "alloc")] flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> where Self::Item: Stream + Unpin, Self: Sized,825 fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> 826 where 827 Self::Item: Stream + Unpin, 828 Self: Sized, 829 { 830 assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into())) 831 } 832 833 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. 
834 /// 835 /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, 836 /// you would have to chain combinators like `.map(f).flatten()` while this 837 /// combinator provides ability to write `.flat_map(f)` instead of chaining. 838 /// 839 /// The provided closure which produces inner streams is executed over all elements 840 /// of stream as last inner stream is terminated and next stream item is available. 841 /// 842 /// Note that this function consumes the stream passed into it and returns a 843 /// wrapped version of it, similar to the existing `flat_map` methods in the 844 /// standard library. 845 /// 846 /// # Examples 847 /// 848 /// ``` 849 /// # futures::executor::block_on(async { 850 /// use futures::stream::{self, StreamExt}; 851 /// 852 /// let stream = stream::iter(1..=3); 853 /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x])); 854 /// 855 /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await); 856 /// # }); 857 /// ``` flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,858 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> 859 where 860 F: FnMut(Self::Item) -> U, 861 U: Stream, 862 Self: Sized, 863 { 864 assert_stream::<U::Item, _>(FlatMap::new(self, f)) 865 } 866 867 /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s 868 /// and polls them concurrently, yielding items in any order, as they made 869 /// available. 870 /// 871 /// [`StreamExt::map`] is very useful, but if it produces `Stream`s 872 /// instead, and you need to poll all of them concurrently, you would 873 /// have to use something like `for_each_concurrent` and merge values 874 /// by hand. This combinator provides ability to collect all values 875 /// from concurrently polled streams into one stream. 876 /// 877 /// The first argument is an optional limit on the number of concurrently 878 /// polled streams. 
If this limit is not `None`, no more than `limit` streams 879 /// will be polled at the same time. The `limit` argument is of type 880 /// `Into<Option<usize>>`, and so can be provided as either `None`, 881 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 882 /// no limit at all, and will have the same result as passing in `None`. 883 /// 884 /// The provided closure which produces inner streams is executed over 885 /// all elements of stream as next stream item is available and limit 886 /// of concurrently processed streams isn't exceeded. 887 /// 888 /// Note that this function consumes the stream passed into it and 889 /// returns a wrapped version of it. 890 /// 891 /// # Examples 892 /// 893 /// ``` 894 /// # futures::executor::block_on(async { 895 /// use futures::stream::{self, StreamExt}; 896 /// 897 /// let stream = stream::iter(1..5); 898 /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); 899 /// let mut values = stream.collect::<Vec<_>>().await; 900 /// values.sort(); 901 /// 902 /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); 903 /// # }); 904 /// ``` 905 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 906 #[cfg(feature = "alloc")] flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F> where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized,907 fn flat_map_unordered<U, F>( 908 self, 909 limit: impl Into<Option<usize>>, 910 f: F, 911 ) -> FlatMapUnordered<Self, U, F> 912 where 913 U: Stream + Unpin, 914 F: FnMut(Self::Item) -> U, 915 Self: Sized, 916 { 917 assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f)) 918 } 919 920 /// Combinator similar to [`StreamExt::fold`] that holds internal state 921 /// and produces a new stream. 922 /// 923 /// Accepts initial state and closure which will be applied to each element 924 /// of the stream until provided closure returns `None`. 
Once `None` is 925 /// returned, stream will be terminated. 926 /// 927 /// # Examples 928 /// 929 /// ``` 930 /// # futures::executor::block_on(async { 931 /// use futures::future; 932 /// use futures::stream::{self, StreamExt}; 933 /// 934 /// let stream = stream::iter(1..=10); 935 /// 936 /// let stream = stream.scan(0, |state, x| { 937 /// *state += x; 938 /// future::ready(if *state < 10 { Some(x) } else { None }) 939 /// }); 940 /// 941 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 942 /// # }); 943 /// ``` scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,944 fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> 945 where 946 F: FnMut(&mut S, Self::Item) -> Fut, 947 Fut: Future<Output = Option<B>>, 948 Self: Sized, 949 { 950 assert_stream::<B, _>(Scan::new(self, initial_state, f)) 951 } 952 953 /// Skip elements on this stream while the provided asynchronous predicate 954 /// resolves to `true`. 955 /// 956 /// This function, like `Iterator::skip_while`, will skip elements on the 957 /// stream until the predicate `f` resolves to `false`. Once one element 958 /// returns `false`, all future elements will be returned from the underlying 959 /// stream. 
960 /// 961 /// # Examples 962 /// 963 /// ``` 964 /// # futures::executor::block_on(async { 965 /// use futures::future; 966 /// use futures::stream::{self, StreamExt}; 967 /// 968 /// let stream = stream::iter(1..=10); 969 /// 970 /// let stream = stream.skip_while(|x| future::ready(*x <= 5)); 971 /// 972 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 973 /// # }); 974 /// ``` skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,975 fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> 976 where 977 F: FnMut(&Self::Item) -> Fut, 978 Fut: Future<Output = bool>, 979 Self: Sized, 980 { 981 assert_stream::<Self::Item, _>(SkipWhile::new(self, f)) 982 } 983 984 /// Take elements from this stream while the provided asynchronous predicate 985 /// resolves to `true`. 986 /// 987 /// This function, like `Iterator::take_while`, will take elements from the 988 /// stream until the predicate `f` resolves to `false`. Once one element 989 /// returns `false`, it will always return that the stream is done. 990 /// 991 /// # Examples 992 /// 993 /// ``` 994 /// # futures::executor::block_on(async { 995 /// use futures::future; 996 /// use futures::stream::{self, StreamExt}; 997 /// 998 /// let stream = stream::iter(1..=10); 999 /// 1000 /// let stream = stream.take_while(|x| future::ready(*x <= 5)); 1001 /// 1002 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 1003 /// # }); 1004 /// ``` take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,1005 fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> 1006 where 1007 F: FnMut(&Self::Item) -> Fut, 1008 Fut: Future<Output = bool>, 1009 Self: Sized, 1010 { 1011 assert_stream::<Self::Item, _>(TakeWhile::new(self, f)) 1012 } 1013 1014 /// Take elements from this stream until the provided future resolves. 
1015 /// 1016 /// This function will take elements from the stream until the provided 1017 /// stopping future `fut` resolves. Once the `fut` future becomes ready, 1018 /// this stream combinator will always return that the stream is done. 1019 /// 1020 /// The stopping future may return any type. Once the stream is stopped 1021 /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. 1022 /// The stream may also be resumed with `TakeUntil::take_future()`. 1023 /// See the documentation of [`TakeUntil`] for more information. 1024 /// 1025 /// # Examples 1026 /// 1027 /// ``` 1028 /// # futures::executor::block_on(async { 1029 /// use futures::future; 1030 /// use futures::stream::{self, StreamExt}; 1031 /// use futures::task::Poll; 1032 /// 1033 /// let stream = stream::iter(1..=10); 1034 /// 1035 /// let mut i = 0; 1036 /// let stop_fut = future::poll_fn(|_cx| { 1037 /// i += 1; 1038 /// if i <= 5 { 1039 /// Poll::Pending 1040 /// } else { 1041 /// Poll::Ready(()) 1042 /// } 1043 /// }); 1044 /// 1045 /// let stream = stream.take_until(stop_fut); 1046 /// 1047 /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await); 1048 /// # }); 1049 /// ``` take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,1050 fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> 1051 where 1052 Fut: Future, 1053 Self: Sized, 1054 { 1055 assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) 1056 } 1057 1058 /// Runs this stream to completion, executing the provided asynchronous 1059 /// closure for each element on the stream. 1060 /// 1061 /// The closure provided will be called for each item this stream produces, 1062 /// yielding a future. That future will then be executed to completion 1063 /// before moving on to the next item. 1064 /// 1065 /// The returned value is a `Future` where the `Output` type is `()`; it is 1066 /// executed entirely for its side effects. 
1067 /// 1068 /// To process each item in the stream and produce another stream instead 1069 /// of a single future, use `then` instead. 1070 /// 1071 /// # Examples 1072 /// 1073 /// ``` 1074 /// # futures::executor::block_on(async { 1075 /// use futures::future; 1076 /// use futures::stream::{self, StreamExt}; 1077 /// 1078 /// let mut x = 0; 1079 /// 1080 /// { 1081 /// let fut = stream::repeat(1).take(3).for_each(|item| { 1082 /// x += item; 1083 /// future::ready(()) 1084 /// }); 1085 /// fut.await; 1086 /// } 1087 /// 1088 /// assert_eq!(x, 3); 1089 /// # }); 1090 /// ``` for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1091 fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> 1092 where 1093 F: FnMut(Self::Item) -> Fut, 1094 Fut: Future<Output = ()>, 1095 Self: Sized, 1096 { 1097 assert_future::<(), _>(ForEach::new(self, f)) 1098 } 1099 1100 /// Runs this stream to completion, executing the provided asynchronous 1101 /// closure for each element on the stream concurrently as elements become 1102 /// available. 1103 /// 1104 /// This is similar to [`StreamExt::for_each`], but the futures 1105 /// produced by the closure are run concurrently (but not in parallel-- 1106 /// this combinator does not introduce any threads). 1107 /// 1108 /// The closure provided will be called for each item this stream produces, 1109 /// yielding a future. That future will then be executed to completion 1110 /// concurrently with the other futures produced by the closure. 1111 /// 1112 /// The first argument is an optional limit on the number of concurrent 1113 /// futures. If this limit is not `None`, no more than `limit` futures 1114 /// will be run concurrently. The `limit` argument is of type 1115 /// `Into<Option<usize>>`, and so can be provided as either `None`, 1116 /// `Some(10)`, or just `10`. 
Note: a limit of zero is interpreted as 1117 /// no limit at all, and will have the same result as passing in `None`. 1118 /// 1119 /// This method is only available when the `std` or `alloc` feature of this 1120 /// library is activated, and it is activated by default. 1121 /// 1122 /// # Examples 1123 /// 1124 /// ``` 1125 /// # futures::executor::block_on(async { 1126 /// use futures::channel::oneshot; 1127 /// use futures::stream::{self, StreamExt}; 1128 /// 1129 /// let (tx1, rx1) = oneshot::channel(); 1130 /// let (tx2, rx2) = oneshot::channel(); 1131 /// let (tx3, rx3) = oneshot::channel(); 1132 /// 1133 /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent( 1134 /// /* limit */ 2, 1135 /// |rx| async move { 1136 /// rx.await.unwrap(); 1137 /// } 1138 /// ); 1139 /// tx1.send(()).unwrap(); 1140 /// tx2.send(()).unwrap(); 1141 /// tx3.send(()).unwrap(); 1142 /// fut.await; 1143 /// # }) 1144 /// ``` 1145 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1146 #[cfg(feature = "alloc")] for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1147 fn for_each_concurrent<Fut, F>( 1148 self, 1149 limit: impl Into<Option<usize>>, 1150 f: F, 1151 ) -> ForEachConcurrent<Self, Fut, F> 1152 where 1153 F: FnMut(Self::Item) -> Fut, 1154 Fut: Future<Output = ()>, 1155 Self: Sized, 1156 { 1157 assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f)) 1158 } 1159 1160 /// Creates a new stream of at most `n` items of the underlying stream. 1161 /// 1162 /// Once `n` items have been yielded from this stream then it will always 1163 /// return that the stream is done. 
1164 /// 1165 /// # Examples 1166 /// 1167 /// ``` 1168 /// # futures::executor::block_on(async { 1169 /// use futures::stream::{self, StreamExt}; 1170 /// 1171 /// let stream = stream::iter(1..=10).take(3); 1172 /// 1173 /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await); 1174 /// # }); 1175 /// ``` take(self, n: usize) -> Take<Self> where Self: Sized,1176 fn take(self, n: usize) -> Take<Self> 1177 where 1178 Self: Sized, 1179 { 1180 assert_stream::<Self::Item, _>(Take::new(self, n)) 1181 } 1182 1183 /// Creates a new stream which skips `n` items of the underlying stream. 1184 /// 1185 /// Once `n` items have been skipped from this stream then it will always 1186 /// return the remaining items on this stream. 1187 /// 1188 /// # Examples 1189 /// 1190 /// ``` 1191 /// # futures::executor::block_on(async { 1192 /// use futures::stream::{self, StreamExt}; 1193 /// 1194 /// let stream = stream::iter(1..=10).skip(5); 1195 /// 1196 /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await); 1197 /// # }); 1198 /// ``` skip(self, n: usize) -> Skip<Self> where Self: Sized,1199 fn skip(self, n: usize) -> Skip<Self> 1200 where 1201 Self: Sized, 1202 { 1203 assert_stream::<Self::Item, _>(Skip::new(self, n)) 1204 } 1205 1206 /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never 1207 /// again be called once it has finished. This method can be used to turn 1208 /// any `Stream` into a `FusedStream`. 1209 /// 1210 /// Normally, once a stream has returned [`None`] from 1211 /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad 1212 /// behavior such as block forever, panic, never return, etc. If it is known 1213 /// that [`poll_next`](Stream::poll_next) may be called after stream 1214 /// has already finished, then this method can be used to ensure that it has 1215 /// defined semantics. 
1216 /// 1217 /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream 1218 /// is guaranteed to return [`None`] after the underlying stream has 1219 /// finished. 1220 /// 1221 /// # Examples 1222 /// 1223 /// ``` 1224 /// use futures::executor::block_on_stream; 1225 /// use futures::stream::{self, StreamExt}; 1226 /// use futures::task::Poll; 1227 /// 1228 /// let mut x = 0; 1229 /// let stream = stream::poll_fn(|_| { 1230 /// x += 1; 1231 /// match x { 1232 /// 0..=2 => Poll::Ready(Some(x)), 1233 /// 3 => Poll::Ready(None), 1234 /// _ => panic!("should not happen") 1235 /// } 1236 /// }).fuse(); 1237 /// 1238 /// let mut iter = block_on_stream(stream); 1239 /// assert_eq!(Some(1), iter.next()); 1240 /// assert_eq!(Some(2), iter.next()); 1241 /// assert_eq!(None, iter.next()); 1242 /// assert_eq!(None, iter.next()); 1243 /// // ... 1244 /// ``` fuse(self) -> Fuse<Self> where Self: Sized,1245 fn fuse(self) -> Fuse<Self> 1246 where 1247 Self: Sized, 1248 { 1249 assert_stream::<Self::Item, _>(Fuse::new(self)) 1250 } 1251 1252 /// Borrows a stream, rather than consuming it. 1253 /// 1254 /// This is useful to allow applying stream adaptors while still retaining 1255 /// ownership of the original stream. 1256 /// 1257 /// # Examples 1258 /// 1259 /// ``` 1260 /// # futures::executor::block_on(async { 1261 /// use futures::stream::{self, StreamExt}; 1262 /// 1263 /// let mut stream = stream::iter(1..5); 1264 /// 1265 /// let sum = stream.by_ref() 1266 /// .take(2) 1267 /// .fold(0, |a, b| async move { a + b }) 1268 /// .await; 1269 /// assert_eq!(sum, 3); 1270 /// 1271 /// // You can use the stream again 1272 /// let sum = stream.take(2) 1273 /// .fold(0, |a, b| async move { a + b }) 1274 /// .await; 1275 /// assert_eq!(sum, 7); 1276 /// # }); 1277 /// ``` by_ref(&mut self) -> &mut Self1278 fn by_ref(&mut self) -> &mut Self { 1279 self 1280 } 1281 1282 /// Catches unwinding panics while polling the stream. 
1283 /// 1284 /// Caught panic (if any) will be the last element of the resulting stream. 1285 /// 1286 /// In general, panics within a stream can propagate all the way out to the 1287 /// task level. This combinator makes it possible to halt unwinding within 1288 /// the stream itself. It's most commonly used within task executors. This 1289 /// method should not be used for error handling. 1290 /// 1291 /// Note that this method requires the `UnwindSafe` bound from the standard 1292 /// library. This isn't always applied automatically, and the standard 1293 /// library provides an `AssertUnwindSafe` wrapper type to apply it 1294 /// after-the fact. To assist using this method, the [`Stream`] trait is 1295 /// also implemented for `AssertUnwindSafe<St>` where `St` implements 1296 /// [`Stream`]. 1297 /// 1298 /// This method is only available when the `std` feature of this 1299 /// library is activated, and it is activated by default. 1300 /// 1301 /// # Examples 1302 /// 1303 /// ``` 1304 /// # futures::executor::block_on(async { 1305 /// use futures::stream::{self, StreamExt}; 1306 /// 1307 /// let stream = stream::iter(vec![Some(10), None, Some(11)]); 1308 /// // Panic on second element 1309 /// let stream_panicking = stream.map(|o| o.unwrap()); 1310 /// // Collect all the results 1311 /// let stream = stream_panicking.catch_unwind(); 1312 /// 1313 /// let results: Vec<Result<i32, _>> = stream.collect().await; 1314 /// match results[0] { 1315 /// Ok(10) => {} 1316 /// _ => panic!("unexpected result!"), 1317 /// } 1318 /// assert!(results[1].is_err()); 1319 /// assert_eq!(results.len(), 2); 1320 /// # }); 1321 /// ``` 1322 #[cfg(feature = "std")] catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1323 fn catch_unwind(self) -> CatchUnwind<Self> 1324 where 1325 Self: Sized + std::panic::UnwindSafe, 1326 { 1327 assert_stream(CatchUnwind::new(self)) 1328 } 1329 1330 /// Wrap the stream in a Box, pinning it. 
1331 /// 1332 /// This method is only available when the `std` or `alloc` feature of this 1333 /// library is activated, and it is activated by default. 1334 #[cfg(feature = "alloc")] boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1335 fn boxed<'a>(self) -> BoxStream<'a, Self::Item> 1336 where 1337 Self: Sized + Send + 'a, 1338 { 1339 assert_stream::<Self::Item, _>(Box::pin(self)) 1340 } 1341 1342 /// Wrap the stream in a Box, pinning it. 1343 /// 1344 /// Similar to `boxed`, but without the `Send` requirement. 1345 /// 1346 /// This method is only available when the `std` or `alloc` feature of this 1347 /// library is activated, and it is activated by default. 1348 #[cfg(feature = "alloc")] boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1349 fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> 1350 where 1351 Self: Sized + 'a, 1352 { 1353 assert_stream::<Self::Item, _>(Box::pin(self)) 1354 } 1355 1356 /// An adaptor for creating a buffered list of pending futures. 1357 /// 1358 /// If this stream's item can be converted into a future, then this adaptor 1359 /// will buffer up to at most `n` futures and then return the outputs in the 1360 /// same order as the underlying stream. No more than `n` futures will be 1361 /// buffered at any point in time, and less than `n` may also be buffered 1362 /// depending on the state of each future. 1363 /// 1364 /// The returned stream will be a stream of each future's output. 1365 /// 1366 /// This method is only available when the `std` or `alloc` feature of this 1367 /// library is activated, and it is activated by default. 
1368 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1369 #[cfg(feature = "alloc")] buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1370 fn buffered(self, n: usize) -> Buffered<Self> 1371 where 1372 Self::Item: Future, 1373 Self: Sized, 1374 { 1375 assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n)) 1376 } 1377 1378 /// An adaptor for creating a buffered list of pending futures (unordered). 1379 /// 1380 /// If this stream's item can be converted into a future, then this adaptor 1381 /// will buffer up to `n` futures and then return the outputs in the order 1382 /// in which they complete. No more than `n` futures will be buffered at 1383 /// any point in time, and less than `n` may also be buffered depending on 1384 /// the state of each future. 1385 /// 1386 /// The returned stream will be a stream of each future's output. 1387 /// 1388 /// This method is only available when the `std` or `alloc` feature of this 1389 /// library is activated, and it is activated by default. 
1390 /// 1391 /// # Examples 1392 /// 1393 /// ``` 1394 /// # futures::executor::block_on(async { 1395 /// use futures::channel::oneshot; 1396 /// use futures::stream::{self, StreamExt}; 1397 /// 1398 /// let (send_one, recv_one) = oneshot::channel(); 1399 /// let (send_two, recv_two) = oneshot::channel(); 1400 /// 1401 /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]); 1402 /// let mut buffered = stream_of_futures.buffer_unordered(10); 1403 /// 1404 /// send_two.send(2i32)?; 1405 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1406 /// 1407 /// send_one.send(1i32)?; 1408 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1409 /// 1410 /// assert_eq!(buffered.next().await, None); 1411 /// # Ok::<(), i32>(()) }).unwrap(); 1412 /// ``` 1413 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1414 #[cfg(feature = "alloc")] buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1415 fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> 1416 where 1417 Self::Item: Future, 1418 Self: Sized, 1419 { 1420 assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n)) 1421 } 1422 1423 /// An adapter for zipping two streams together. 1424 /// 1425 /// The zipped stream waits for both streams to produce an item, and then 1426 /// returns that pair. If either stream ends then the zipped stream will 1427 /// also end. 
1428 /// 1429 /// # Examples 1430 /// 1431 /// ``` 1432 /// # futures::executor::block_on(async { 1433 /// use futures::stream::{self, StreamExt}; 1434 /// 1435 /// let stream1 = stream::iter(1..=3); 1436 /// let stream2 = stream::iter(5..=10); 1437 /// 1438 /// let vec = stream1.zip(stream2) 1439 /// .collect::<Vec<_>>() 1440 /// .await; 1441 /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec); 1442 /// # }); 1443 /// ``` 1444 /// zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1445 fn zip<St>(self, other: St) -> Zip<Self, St> 1446 where 1447 St: Stream, 1448 Self: Sized, 1449 { 1450 assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other)) 1451 } 1452 1453 /// Adapter for chaining two streams. 1454 /// 1455 /// The resulting stream emits elements from the first stream, and when 1456 /// first stream reaches the end, emits the elements from the second stream. 1457 /// 1458 /// ``` 1459 /// # futures::executor::block_on(async { 1460 /// use futures::stream::{self, StreamExt}; 1461 /// 1462 /// let stream1 = stream::iter(vec![Ok(10), Err(false)]); 1463 /// let stream2 = stream::iter(vec![Err(true), Ok(20)]); 1464 /// 1465 /// let stream = stream1.chain(stream2); 1466 /// 1467 /// let result: Vec<_> = stream.collect().await; 1468 /// assert_eq!(result, vec![ 1469 /// Ok(10), 1470 /// Err(false), 1471 /// Err(true), 1472 /// Ok(20), 1473 /// ]); 1474 /// # }); 1475 /// ``` chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1476 fn chain<St>(self, other: St) -> Chain<Self, St> 1477 where 1478 St: Stream<Item = Self::Item>, 1479 Self: Sized, 1480 { 1481 assert_stream::<Self::Item, _>(Chain::new(self, other)) 1482 } 1483 1484 /// Creates a new stream which exposes a `peek` method. 1485 /// 1486 /// Calling `peek` returns a reference to the next item in the stream. 
peekable(self) -> Peekable<Self> where Self: Sized,1487 fn peekable(self) -> Peekable<Self> 1488 where 1489 Self: Sized, 1490 { 1491 assert_stream::<Self::Item, _>(Peekable::new(self)) 1492 } 1493 1494 /// An adaptor for chunking up items of the stream inside a vector. 1495 /// 1496 /// This combinator will attempt to pull items from this stream and buffer 1497 /// them into a local vector. At most `capacity` items will get buffered 1498 /// before they're yielded from the returned stream. 1499 /// 1500 /// Note that the vectors returned from this iterator may not always have 1501 /// `capacity` elements. If the underlying stream ended and only a partial 1502 /// vector was created, it'll be returned. Additionally if an error happens 1503 /// from the underlying stream then the currently buffered items will be 1504 /// yielded. 1505 /// 1506 /// This method is only available when the `std` or `alloc` feature of this 1507 /// library is activated, and it is activated by default. 1508 /// 1509 /// # Panics 1510 /// 1511 /// This method will panic if `capacity` is zero. 1512 #[cfg(feature = "alloc")] chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1513 fn chunks(self, capacity: usize) -> Chunks<Self> 1514 where 1515 Self: Sized, 1516 { 1517 assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) 1518 } 1519 1520 /// An adaptor for chunking up ready items of the stream inside a vector. 1521 /// 1522 /// This combinator will attempt to pull ready items from this stream and 1523 /// buffer them into a local vector. At most `capacity` items will get 1524 /// buffered before they're yielded from the returned stream. If underlying 1525 /// stream returns `Poll::Pending`, and collected chunk is not empty, it will 1526 /// be immediately returned. 1527 /// 1528 /// If the underlying stream ended and only a partial vector was created, 1529 /// it will be returned. 
1530 /// 1531 /// This method is only available when the `std` or `alloc` feature of this 1532 /// library is activated, and it is activated by default. 1533 /// 1534 /// # Panics 1535 /// 1536 /// This method will panic if `capacity` is zero. 1537 #[cfg(feature = "alloc")] ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1538 fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> 1539 where 1540 Self: Sized, 1541 { 1542 assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) 1543 } 1544 1545 /// A future that completes after the given stream has been fully processed 1546 /// into the sink and the sink has been flushed and closed. 1547 /// 1548 /// This future will drive the stream to keep producing items until it is 1549 /// exhausted, sending each item to the sink. It will complete once the 1550 /// stream is exhausted, the sink has received and flushed all items, and 1551 /// the sink is closed. Note that neither the original stream nor provided 1552 /// sink will be output by this future. Pass the sink by `Pin<&mut S>` 1553 /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in 1554 /// order to preserve access to the `Sink`. If the stream produces an error, 1555 /// that error will be returned by this future without flushing/closing the sink. 
1556 #[cfg(feature = "sink")] 1557 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] forward<S>(self, sink: S) -> Forward<Self, S> where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized,1558 fn forward<S>(self, sink: S) -> Forward<Self, S> 1559 where 1560 S: Sink<Self::Ok, Error = Self::Error>, 1561 Self: TryStream + Sized, 1562 // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, 1563 { 1564 // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` 1565 // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) 1566 Forward::new(self, sink) 1567 } 1568 1569 /// Splits this `Stream + Sink` object into separate `Sink` and `Stream` 1570 /// objects. 1571 /// 1572 /// This can be useful when you want to split ownership between tasks, or 1573 /// allow direct interaction between the two objects (e.g. via 1574 /// `Sink::send_all`). 1575 /// 1576 /// This method is only available when the `std` or `alloc` feature of this 1577 /// library is activated, and it is activated by default. 1578 #[cfg(feature = "sink")] 1579 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] 1580 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1581 #[cfg(feature = "alloc")] split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1582 fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) 1583 where 1584 Self: Sink<Item> + Sized, 1585 { 1586 let (sink, stream) = split::split(self); 1587 ( 1588 crate::sink::assert_sink::<Item, Self::Error, _>(sink), 1589 assert_stream::<Self::Item, _>(stream), 1590 ) 1591 } 1592 1593 /// Do something with each item of this stream, afterwards passing it on. 
1594 /// 1595 /// This is similar to the `Iterator::inspect` method in the standard 1596 /// library where it allows easily inspecting each value as it passes 1597 /// through the stream, for example to debug what's going on. inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1598 fn inspect<F>(self, f: F) -> Inspect<Self, F> 1599 where 1600 F: FnMut(&Self::Item), 1601 Self: Sized, 1602 { 1603 assert_stream::<Self::Item, _>(Inspect::new(self, f)) 1604 } 1605 1606 /// Wrap this stream in an `Either` stream, making it the left-hand variant 1607 /// of that `Either`. 1608 /// 1609 /// This can be used in combination with the `right_stream` method to write `if` 1610 /// statements that evaluate to different streams in different branches. left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1611 fn left_stream<B>(self) -> Either<Self, B> 1612 where 1613 B: Stream<Item = Self::Item>, 1614 Self: Sized, 1615 { 1616 assert_stream::<Self::Item, _>(Either::Left(self)) 1617 } 1618 1619 /// Wrap this stream in an `Either` stream, making it the right-hand variant 1620 /// of that `Either`. 1621 /// 1622 /// This can be used in combination with the `left_stream` method to write `if` 1623 /// statements that evaluate to different streams in different branches. right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1624 fn right_stream<B>(self) -> Either<B, Self> 1625 where 1626 B: Stream<Item = Self::Item>, 1627 Self: Sized, 1628 { 1629 assert_stream::<Self::Item, _>(Either::Right(self)) 1630 } 1631 1632 /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`] 1633 /// stream types. 
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1634 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> 1635 where 1636 Self: Unpin, 1637 { 1638 Pin::new(self).poll_next(cx) 1639 } 1640 1641 /// Returns a [`Future`] that resolves when the next item in this stream is 1642 /// ready. 1643 /// 1644 /// This is similar to the [`next`][StreamExt::next] method, but it won't 1645 /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the 1646 /// returned future type will return `true` from 1647 /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing 1648 /// [`select_next_some`][StreamExt::select_next_some] to be easily used with 1649 /// the [`select!`] macro. 1650 /// 1651 /// If the future is polled after this [`Stream`] is empty it will panic. 1652 /// Using the future with a [`FusedFuture`][]-aware primitive like the 1653 /// [`select!`] macro will prevent this. 1654 /// 1655 /// [`FusedFuture`]: futures_core::future::FusedFuture 1656 /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated 1657 /// 1658 /// # Examples 1659 /// 1660 /// ``` 1661 /// # futures::executor::block_on(async { 1662 /// use futures::{future, select}; 1663 /// use futures::stream::{StreamExt, FuturesUnordered}; 1664 /// 1665 /// let mut fut = future::ready(1); 1666 /// let mut async_tasks = FuturesUnordered::new(); 1667 /// let mut total = 0; 1668 /// loop { 1669 /// select! { 1670 /// num = fut => { 1671 /// // First, the `ready` future completes. 1672 /// total += num; 1673 /// // Then we spawn a new task onto `async_tasks`, 1674 /// async_tasks.push(async { 5 }); 1675 /// }, 1676 /// // On the next iteration of the loop, the task we spawned 1677 /// // completes. 
1678 /// num = async_tasks.select_next_some() => { 1679 /// total += num; 1680 /// } 1681 /// // Finally, both the `ready` future and `async_tasks` have 1682 /// // finished, so we enter the `complete` branch. 1683 /// complete => break, 1684 /// } 1685 /// } 1686 /// assert_eq!(total, 6); 1687 /// # }); 1688 /// ``` 1689 /// 1690 /// [`select!`]: crate::select select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1691 fn select_next_some(&mut self) -> SelectNextSome<'_, Self> 1692 where 1693 Self: Unpin + FusedStream, 1694 { 1695 assert_future::<Self::Item, _>(SelectNextSome::new(self)) 1696 } 1697 } 1698