1 use core::future::Future;
2 use futures_core::Stream;
3 
4 mod all;
5 use all::AllFuture;
6 
7 mod any;
8 use any::AnyFuture;
9 
10 mod chain;
11 pub use chain::Chain;
12 
13 pub(crate) mod collect;
14 use collect::{Collect, FromStream};
15 
16 mod filter;
17 pub use filter::Filter;
18 
19 mod filter_map;
20 pub use filter_map::FilterMap;
21 
22 mod fold;
23 use fold::FoldFuture;
24 
25 mod fuse;
26 pub use fuse::Fuse;
27 
28 mod map;
29 pub use map::Map;
30 
31 mod map_while;
32 pub use map_while::MapWhile;
33 
34 mod merge;
35 pub use merge::Merge;
36 
37 mod next;
38 use next::Next;
39 
40 mod skip;
41 pub use skip::Skip;
42 
43 mod skip_while;
44 pub use skip_while::SkipWhile;
45 
46 mod take;
47 pub use take::Take;
48 
49 mod take_while;
50 pub use take_while::TakeWhile;
51 
52 mod then;
53 pub use then::Then;
54 
55 mod try_next;
56 use try_next::TryNext;
57 
58 mod peekable;
59 pub use peekable::Peekable;
60 
61 cfg_time! {
62     pub(crate) mod timeout;
63     pub(crate) mod timeout_repeating;
64     pub use timeout::Timeout;
65     pub use timeout_repeating::TimeoutRepeating;
66     use tokio::time::{Duration, Interval};
67     mod throttle;
68     use throttle::{throttle, Throttle};
69     mod chunks_timeout;
70     pub use chunks_timeout::ChunksTimeout;
71 }
72 
73 /// An extension trait for the [`Stream`] trait that provides a variety of
74 /// convenient combinator functions.
75 ///
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
/// in the [futures] crate. However, both Tokio and futures provide separate
/// `StreamExt` utility traits, and some utilities are only available on one of
79 /// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
80 /// trait in the futures crate.
81 ///
82 /// If you need utilities from both `StreamExt` traits, you should prefer to
83 /// import one of them, and use the other through the fully qualified call
84 /// syntax. For example:
85 /// ```
86 /// // import one of the traits:
87 /// use futures::stream::StreamExt;
88 /// # #[tokio::main(flavor = "current_thread")]
89 /// # async fn main() {
90 ///
91 /// let a = tokio_stream::iter(vec![1, 3, 5]);
92 /// let b = tokio_stream::iter(vec![2, 4, 6]);
93 ///
94 /// // use the fully qualified call syntax for the other trait:
95 /// let merged = tokio_stream::StreamExt::merge(a, b);
96 ///
97 /// // use normal call notation for futures::stream::StreamExt::collect
98 /// let output: Vec<_> = merged.collect().await;
99 /// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
100 /// # }
101 /// ```
102 ///
103 /// [`Stream`]: crate::Stream
104 /// [futures]: https://docs.rs/futures
105 /// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
106 pub trait StreamExt: Stream {
107     /// Consumes and returns the next value in the stream or `None` if the
108     /// stream is finished.
109     ///
110     /// Equivalent to:
111     ///
112     /// ```ignore
113     /// async fn next(&mut self) -> Option<Self::Item>;
114     /// ```
115     ///
116     /// Note that because `next` doesn't take ownership over the stream,
117     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
118     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
119     /// be done by boxing the stream using [`Box::pin`] or
120     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
121     /// crate.
122     ///
123     /// # Cancel safety
124     ///
125     /// This method is cancel safe. The returned future only
126     /// holds onto a reference to the underlying stream,
127     /// so dropping it will never lose a value.
128     ///
129     /// # Examples
130     ///
131     /// ```
132     /// # #[tokio::main]
133     /// # async fn main() {
134     /// use tokio_stream::{self as stream, StreamExt};
135     ///
136     /// let mut stream = stream::iter(1..=3);
137     ///
138     /// assert_eq!(stream.next().await, Some(1));
139     /// assert_eq!(stream.next().await, Some(2));
140     /// assert_eq!(stream.next().await, Some(3));
141     /// assert_eq!(stream.next().await, None);
142     /// # }
143     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,144     fn next(&mut self) -> Next<'_, Self>
145     where
146         Self: Unpin,
147     {
148         Next::new(self)
149     }
150 
151     /// Consumes and returns the next item in the stream. If an error is
152     /// encountered before the next item, the error is returned instead.
153     ///
154     /// Equivalent to:
155     ///
156     /// ```ignore
157     /// async fn try_next(&mut self) -> Result<Option<T>, E>;
158     /// ```
159     ///
160     /// This is similar to the [`next`](StreamExt::next) combinator,
161     /// but returns a [`Result<Option<T>, E>`](Result) rather than
162     /// an [`Option<Result<T, E>>`](Option), making for easy use
163     /// with the [`?`](std::ops::Try) operator.
164     ///
165     /// # Cancel safety
166     ///
167     /// This method is cancel safe. The returned future only
168     /// holds onto a reference to the underlying stream,
169     /// so dropping it will never lose a value.
170     ///
171     /// # Examples
172     ///
173     /// ```
174     /// # #[tokio::main]
175     /// # async fn main() {
176     /// use tokio_stream::{self as stream, StreamExt};
177     ///
178     /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
179     ///
180     /// assert_eq!(stream.try_next().await, Ok(Some(1)));
181     /// assert_eq!(stream.try_next().await, Ok(Some(2)));
182     /// assert_eq!(stream.try_next().await, Err("nope"));
183     /// # }
184     /// ```
try_next<T, E>(&mut self) -> TryNext<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin,185     fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
186     where
187         Self: Stream<Item = Result<T, E>> + Unpin,
188     {
189         TryNext::new(self)
190     }
191 
192     /// Maps this stream's items to a different type, returning a new stream of
193     /// the resulting type.
194     ///
195     /// The provided closure is executed over all elements of this stream as
196     /// they are made available. It is executed inline with calls to
197     /// [`poll_next`](Stream::poll_next).
198     ///
199     /// Note that this function consumes the stream passed into it and returns a
200     /// wrapped version of it, similar to the existing `map` methods in the
201     /// standard library.
202     ///
203     /// # Examples
204     ///
205     /// ```
206     /// # #[tokio::main]
207     /// # async fn main() {
208     /// use tokio_stream::{self as stream, StreamExt};
209     ///
210     /// let stream = stream::iter(1..=3);
211     /// let mut stream = stream.map(|x| x + 3);
212     ///
213     /// assert_eq!(stream.next().await, Some(4));
214     /// assert_eq!(stream.next().await, Some(5));
215     /// assert_eq!(stream.next().await, Some(6));
216     /// # }
217     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,218     fn map<T, F>(self, f: F) -> Map<Self, F>
219     where
220         F: FnMut(Self::Item) -> T,
221         Self: Sized,
222     {
223         Map::new(self, f)
224     }
225 
226     /// Map this stream's items to a different type for as long as determined by
227     /// the provided closure. A stream of the target type will be returned,
228     /// which will yield elements until the closure returns `None`.
229     ///
230     /// The provided closure is executed over all elements of this stream as
231     /// they are made available, until it returns `None`. It is executed inline
232     /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
233     /// the underlying stream will not be polled again.
234     ///
235     /// Note that this function consumes the stream passed into it and returns a
236     /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
237     /// standard library.
238     ///
239     /// # Examples
240     ///
241     /// ```
242     /// # #[tokio::main]
243     /// # async fn main() {
244     /// use tokio_stream::{self as stream, StreamExt};
245     ///
246     /// let stream = stream::iter(1..=10);
247     /// let mut stream = stream.map_while(|x| {
248     ///     if x < 4 {
249     ///         Some(x + 3)
250     ///     } else {
251     ///         None
252     ///     }
253     /// });
254     /// assert_eq!(stream.next().await, Some(4));
255     /// assert_eq!(stream.next().await, Some(5));
256     /// assert_eq!(stream.next().await, Some(6));
257     /// assert_eq!(stream.next().await, None);
258     /// # }
259     /// ```
map_while<T, F>(self, f: F) -> MapWhile<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,260     fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
261     where
262         F: FnMut(Self::Item) -> Option<T>,
263         Self: Sized,
264     {
265         MapWhile::new(self, f)
266     }
267 
268     /// Maps this stream's items asynchronously to a different type, returning a
269     /// new stream of the resulting type.
270     ///
271     /// The provided closure is executed over all elements of this stream as
272     /// they are made available, and the returned future is executed. Only one
    /// future is executed at a time.
274     ///
275     /// Note that this function consumes the stream passed into it and returns a
276     /// wrapped version of it, similar to the existing `then` methods in the
277     /// standard library.
278     ///
279     /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
280     /// returned by this method. To handle this, you can use `tokio::pin!` as in
281     /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
282     ///
283     /// # Examples
284     ///
285     /// ```
286     /// # #[tokio::main]
287     /// # async fn main() {
288     /// use tokio_stream::{self as stream, StreamExt};
289     ///
290     /// async fn do_async_work(value: i32) -> i32 {
291     ///     value + 3
292     /// }
293     ///
294     /// let stream = stream::iter(1..=3);
295     /// let stream = stream.then(do_async_work);
296     ///
297     /// tokio::pin!(stream);
298     ///
299     /// assert_eq!(stream.next().await, Some(4));
300     /// assert_eq!(stream.next().await, Some(5));
301     /// assert_eq!(stream.next().await, Some(6));
302     /// # }
303     /// ```
then<F, Fut>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,304     fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
305     where
306         F: FnMut(Self::Item) -> Fut,
307         Fut: Future,
308         Self: Sized,
309     {
310         Then::new(self, f)
311     }
312 
313     /// Combine two streams into one by interleaving the output of both as it
314     /// is produced.
315     ///
316     /// Values are produced from the merged stream in the order they arrive from
317     /// the two source streams. If both source streams provide values
318     /// simultaneously, the merge stream alternates between them. This provides
319     /// some level of fairness. You should not chain calls to `merge`, as this
320     /// will break the fairness of the merging.
321     ///
322     /// The merged stream completes once **both** source streams complete. When
323     /// one source stream completes before the other, the merge stream
324     /// exclusively polls the remaining stream.
325     ///
326     /// For merging multiple streams, consider using [`StreamMap`] instead.
327     ///
328     /// [`StreamMap`]: crate::StreamMap
329     ///
330     /// # Examples
331     ///
332     /// ```
333     /// use tokio_stream::{StreamExt, Stream};
334     /// use tokio::sync::mpsc;
335     /// use tokio::time;
336     ///
337     /// use std::time::Duration;
338     /// use std::pin::Pin;
339     ///
340     /// # /*
341     /// #[tokio::main]
342     /// # */
343     /// # #[tokio::main(flavor = "current_thread")]
344     /// async fn main() {
345     /// # time::pause();
346     ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
347     ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
348     ///
349     ///     // Convert the channels to a `Stream`.
350     ///     let rx1 = Box::pin(async_stream::stream! {
351     ///           while let Some(item) = rx1.recv().await {
352     ///               yield item;
353     ///           }
354     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
355     ///
356     ///     let rx2 = Box::pin(async_stream::stream! {
357     ///           while let Some(item) = rx2.recv().await {
358     ///               yield item;
359     ///           }
360     ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
361     ///
362     ///     let mut rx = rx1.merge(rx2);
363     ///
364     ///     tokio::spawn(async move {
365     ///         // Send some values immediately
366     ///         tx1.send(1).await.unwrap();
367     ///         tx1.send(2).await.unwrap();
368     ///
369     ///         // Let the other task send values
370     ///         time::sleep(Duration::from_millis(20)).await;
371     ///
372     ///         tx1.send(4).await.unwrap();
373     ///     });
374     ///
375     ///     tokio::spawn(async move {
376     ///         // Wait for the first task to send values
377     ///         time::sleep(Duration::from_millis(5)).await;
378     ///
379     ///         tx2.send(3).await.unwrap();
380     ///
381     ///         time::sleep(Duration::from_millis(25)).await;
382     ///
383     ///         // Send the final value
384     ///         tx2.send(5).await.unwrap();
385     ///     });
386     ///
387     ///    assert_eq!(1, rx.next().await.unwrap());
388     ///    assert_eq!(2, rx.next().await.unwrap());
389     ///    assert_eq!(3, rx.next().await.unwrap());
390     ///    assert_eq!(4, rx.next().await.unwrap());
391     ///    assert_eq!(5, rx.next().await.unwrap());
392     ///
393     ///    // The merged stream is consumed
394     ///    assert!(rx.next().await.is_none());
395     /// }
396     /// ```
merge<U>(self, other: U) -> Merge<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,397     fn merge<U>(self, other: U) -> Merge<Self, U>
398     where
399         U: Stream<Item = Self::Item>,
400         Self: Sized,
401     {
402         Merge::new(self, other)
403     }
404 
405     /// Filters the values produced by this stream according to the provided
406     /// predicate.
407     ///
408     /// As values of this stream are made available, the provided predicate `f`
409     /// will be run against them. If the predicate
410     /// resolves to `true`, then the stream will yield the value, but if the
411     /// predicate resolves to `false`, then the value
412     /// will be discarded and the next value will be produced.
413     ///
414     /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the [`Iterator::filter`] method in the
416     /// standard library.
417     ///
418     /// # Examples
419     ///
420     /// ```
421     /// # #[tokio::main]
422     /// # async fn main() {
423     /// use tokio_stream::{self as stream, StreamExt};
424     ///
425     /// let stream = stream::iter(1..=8);
426     /// let mut evens = stream.filter(|x| x % 2 == 0);
427     ///
428     /// assert_eq!(Some(2), evens.next().await);
429     /// assert_eq!(Some(4), evens.next().await);
430     /// assert_eq!(Some(6), evens.next().await);
431     /// assert_eq!(Some(8), evens.next().await);
432     /// assert_eq!(None, evens.next().await);
433     /// # }
434     /// ```
filter<F>(self, f: F) -> Filter<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,435     fn filter<F>(self, f: F) -> Filter<Self, F>
436     where
437         F: FnMut(&Self::Item) -> bool,
438         Self: Sized,
439     {
440         Filter::new(self, f)
441     }
442 
443     /// Filters the values produced by this stream while simultaneously mapping
444     /// them to a different type according to the provided closure.
445     ///
    /// As values of this stream are made available, the provided closure `f`
    /// will be run on them. If `f` returns [`Some(item)`](Some), then the stream
    /// will yield the value `item`, but if it returns [`None`], then the value
    /// will be skipped.
450     ///
451     /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
453     /// standard library.
454     ///
    /// # Examples
    ///
456     /// ```
457     /// # #[tokio::main]
458     /// # async fn main() {
459     /// use tokio_stream::{self as stream, StreamExt};
460     ///
461     /// let stream = stream::iter(1..=8);
462     /// let mut evens = stream.filter_map(|x| {
463     ///     if x % 2 == 0 { Some(x + 1) } else { None }
464     /// });
465     ///
466     /// assert_eq!(Some(3), evens.next().await);
467     /// assert_eq!(Some(5), evens.next().await);
468     /// assert_eq!(Some(7), evens.next().await);
469     /// assert_eq!(Some(9), evens.next().await);
470     /// assert_eq!(None, evens.next().await);
471     /// # }
472     /// ```
filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where F: FnMut(Self::Item) -> Option<T>, Self: Sized,473     fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
474     where
475         F: FnMut(Self::Item) -> Option<T>,
476         Self: Sized,
477     {
478         FilterMap::new(self, f)
479     }
480 
481     /// Creates a stream which ends after the first `None`.
482     ///
    /// After a stream returns `None`, its behavior is unspecified. Future calls to
    /// `poll_next` may or may not return `Some(T)` again or they may panic.
485     /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
486     /// return `None` forever.
487     ///
488     /// # Examples
489     ///
490     /// ```
491     /// use tokio_stream::{Stream, StreamExt};
492     ///
493     /// use std::pin::Pin;
494     /// use std::task::{Context, Poll};
495     ///
496     /// // a stream which alternates between Some and None
497     /// struct Alternate {
498     ///     state: i32,
499     /// }
500     ///
501     /// impl Stream for Alternate {
502     ///     type Item = i32;
503     ///
504     ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
505     ///         let val = self.state;
506     ///         self.state = self.state + 1;
507     ///
508     ///         // if it's even, Some(i32), else None
509     ///         if val % 2 == 0 {
510     ///             Poll::Ready(Some(val))
511     ///         } else {
512     ///             Poll::Ready(None)
513     ///         }
514     ///     }
515     /// }
516     ///
517     /// #[tokio::main]
518     /// async fn main() {
519     ///     let mut stream = Alternate { state: 0 };
520     ///
521     ///     // the stream goes back and forth
522     ///     assert_eq!(stream.next().await, Some(0));
523     ///     assert_eq!(stream.next().await, None);
524     ///     assert_eq!(stream.next().await, Some(2));
525     ///     assert_eq!(stream.next().await, None);
526     ///
527     ///     // however, once it is fused
528     ///     let mut stream = stream.fuse();
529     ///
530     ///     assert_eq!(stream.next().await, Some(4));
531     ///     assert_eq!(stream.next().await, None);
532     ///
533     ///     // it will always return `None` after the first time.
534     ///     assert_eq!(stream.next().await, None);
535     ///     assert_eq!(stream.next().await, None);
536     ///     assert_eq!(stream.next().await, None);
537     /// }
538     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,539     fn fuse(self) -> Fuse<Self>
540     where
541         Self: Sized,
542     {
543         Fuse::new(self)
544     }
545 
546     /// Creates a new stream of at most `n` items of the underlying stream.
547     ///
548     /// Once `n` items have been yielded from this stream then it will always
549     /// return that the stream is done.
550     ///
551     /// # Examples
552     ///
553     /// ```
554     /// # #[tokio::main]
555     /// # async fn main() {
556     /// use tokio_stream::{self as stream, StreamExt};
557     ///
558     /// let mut stream = stream::iter(1..=10).take(3);
559     ///
560     /// assert_eq!(Some(1), stream.next().await);
561     /// assert_eq!(Some(2), stream.next().await);
562     /// assert_eq!(Some(3), stream.next().await);
563     /// assert_eq!(None, stream.next().await);
564     /// # }
565     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,566     fn take(self, n: usize) -> Take<Self>
567     where
568         Self: Sized,
569     {
570         Take::new(self, n)
571     }
572 
573     /// Take elements from this stream while the provided predicate
574     /// resolves to `true`.
575     ///
    /// This function, like [`Iterator::take_while`], will take elements from the
577     /// stream until the predicate `f` resolves to `false`. Once one element
578     /// returns false it will always return that the stream is done.
579     ///
580     /// # Examples
581     ///
582     /// ```
583     /// # #[tokio::main]
584     /// # async fn main() {
585     /// use tokio_stream::{self as stream, StreamExt};
586     ///
587     /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
588     ///
589     /// assert_eq!(Some(1), stream.next().await);
590     /// assert_eq!(Some(2), stream.next().await);
591     /// assert_eq!(Some(3), stream.next().await);
592     /// assert_eq!(None, stream.next().await);
593     /// # }
594     /// ```
take_while<F>(self, f: F) -> TakeWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,595     fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
596     where
597         F: FnMut(&Self::Item) -> bool,
598         Self: Sized,
599     {
600         TakeWhile::new(self, f)
601     }
602 
603     /// Creates a new stream that will skip the `n` first items of the
604     /// underlying stream.
605     ///
606     /// # Examples
607     ///
608     /// ```
609     /// # #[tokio::main]
610     /// # async fn main() {
611     /// use tokio_stream::{self as stream, StreamExt};
612     ///
613     /// let mut stream = stream::iter(1..=10).skip(7);
614     ///
615     /// assert_eq!(Some(8), stream.next().await);
616     /// assert_eq!(Some(9), stream.next().await);
617     /// assert_eq!(Some(10), stream.next().await);
618     /// assert_eq!(None, stream.next().await);
619     /// # }
620     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,621     fn skip(self, n: usize) -> Skip<Self>
622     where
623         Self: Sized,
624     {
625         Skip::new(self, n)
626     }
627 
628     /// Skip elements from the underlying stream while the provided predicate
629     /// resolves to `true`.
630     ///
631     /// This function, like [`Iterator::skip_while`], will ignore elements from the
632     /// stream until the predicate `f` resolves to `false`. Once one element
633     /// returns false, the rest of the elements will be yielded.
634     ///
635     /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
636     ///
637     /// # Examples
638     ///
639     /// ```
640     /// # #[tokio::main]
641     /// # async fn main() {
642     /// use tokio_stream::{self as stream, StreamExt};
643     /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
644     ///
645     /// assert_eq!(Some(3), stream.next().await);
646     /// assert_eq!(Some(4), stream.next().await);
647     /// assert_eq!(Some(1), stream.next().await);
648     /// assert_eq!(None, stream.next().await);
649     /// # }
650     /// ```
skip_while<F>(self, f: F) -> SkipWhile<Self, F> where F: FnMut(&Self::Item) -> bool, Self: Sized,651     fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
652     where
653         F: FnMut(&Self::Item) -> bool,
654         Self: Sized,
655     {
656         SkipWhile::new(self, f)
657     }
658 
659     /// Tests if every element of the stream matches a predicate.
660     ///
661     /// Equivalent to:
662     ///
663     /// ```ignore
664     /// async fn all<F>(&mut self, f: F) -> bool;
665     /// ```
666     ///
667     /// `all()` takes a closure that returns `true` or `false`. It applies
668     /// this closure to each element of the stream, and if they all return
    /// `true`, then so does `all`. If any of them return `false`, it
    /// returns `false`.
671     ///
672     /// `all()` is short-circuiting; in other words, it will stop processing
673     /// as soon as it finds a `false`, given that no matter what else happens,
674     /// the result will also be `false`.
675     ///
676     /// An empty stream returns `true`.
677     ///
678     /// # Examples
679     ///
680     /// Basic usage:
681     ///
682     /// ```
683     /// # #[tokio::main]
684     /// # async fn main() {
685     /// use tokio_stream::{self as stream, StreamExt};
686     ///
687     /// let a = [1, 2, 3];
688     ///
689     /// assert!(stream::iter(&a).all(|&x| x > 0).await);
690     ///
691     /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
692     /// # }
693     /// ```
694     ///
695     /// Stopping at the first `false`:
696     ///
697     /// ```
698     /// # #[tokio::main]
699     /// # async fn main() {
700     /// use tokio_stream::{self as stream, StreamExt};
701     ///
702     /// let a = [1, 2, 3];
703     ///
704     /// let mut iter = stream::iter(&a);
705     ///
706     /// assert!(!iter.all(|&x| x != 2).await);
707     ///
708     /// // we can still use `iter`, as there are more elements.
709     /// assert_eq!(iter.next().await, Some(&3));
710     /// # }
711     /// ```
all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,712     fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
713     where
714         Self: Unpin,
715         F: FnMut(Self::Item) -> bool,
716     {
717         AllFuture::new(self, f)
718     }
719 
720     /// Tests if any element of the stream matches a predicate.
721     ///
722     /// Equivalent to:
723     ///
724     /// ```ignore
725     /// async fn any<F>(&mut self, f: F) -> bool;
726     /// ```
727     ///
728     /// `any()` takes a closure that returns `true` or `false`. It applies
729     /// this closure to each element of the stream, and if any of them return
730     /// `true`, then so does `any()`. If they all return `false`, it
731     /// returns `false`.
732     ///
733     /// `any()` is short-circuiting; in other words, it will stop processing
734     /// as soon as it finds a `true`, given that no matter what else happens,
735     /// the result will also be `true`.
736     ///
737     /// An empty stream returns `false`.
738     ///
    /// # Examples
    ///
    /// Basic usage:
740     ///
741     /// ```
742     /// # #[tokio::main]
743     /// # async fn main() {
744     /// use tokio_stream::{self as stream, StreamExt};
745     ///
746     /// let a = [1, 2, 3];
747     ///
748     /// assert!(stream::iter(&a).any(|&x| x > 0).await);
749     ///
750     /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
751     /// # }
752     /// ```
753     ///
754     /// Stopping at the first `true`:
755     ///
756     /// ```
757     /// # #[tokio::main]
758     /// # async fn main() {
759     /// use tokio_stream::{self as stream, StreamExt};
760     ///
761     /// let a = [1, 2, 3];
762     ///
763     /// let mut iter = stream::iter(&a);
764     ///
765     /// assert!(iter.any(|&x| x != 2).await);
766     ///
767     /// // we can still use `iter`, as there are more elements.
768     /// assert_eq!(iter.next().await, Some(&2));
769     /// # }
770     /// ```
any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> bool,771     fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
772     where
773         Self: Unpin,
774         F: FnMut(Self::Item) -> bool,
775     {
776         AnyFuture::new(self, f)
777     }
778 
779     /// Combine two streams into one by first returning all values from the
780     /// first stream then all values from the second stream.
781     ///
782     /// As long as `self` still has values to emit, no values from `other` are
783     /// emitted, even if some are ready.
784     ///
785     /// # Examples
786     ///
787     /// ```
788     /// use tokio_stream::{self as stream, StreamExt};
789     ///
790     /// #[tokio::main]
791     /// async fn main() {
792     ///     let one = stream::iter(vec![1, 2, 3]);
793     ///     let two = stream::iter(vec![4, 5, 6]);
794     ///
795     ///     let mut stream = one.chain(two);
796     ///
797     ///     assert_eq!(stream.next().await, Some(1));
798     ///     assert_eq!(stream.next().await, Some(2));
799     ///     assert_eq!(stream.next().await, Some(3));
800     ///     assert_eq!(stream.next().await, Some(4));
801     ///     assert_eq!(stream.next().await, Some(5));
802     ///     assert_eq!(stream.next().await, Some(6));
803     ///     assert_eq!(stream.next().await, None);
804     /// }
805     /// ```
chain<U>(self, other: U) -> Chain<Self, U> where U: Stream<Item = Self::Item>, Self: Sized,806     fn chain<U>(self, other: U) -> Chain<Self, U>
807     where
808         U: Stream<Item = Self::Item>,
809         Self: Sized,
810     {
811         Chain::new(self, other)
812     }
813 
814     /// A combinator that applies a function to every element in a stream
815     /// producing a single, final value.
816     ///
817     /// Equivalent to:
818     ///
819     /// ```ignore
820     /// async fn fold<B, F>(self, init: B, f: F) -> B;
821     /// ```
822     ///
    /// # Examples
    ///
    /// Basic usage:
    ///
825     /// ```
826     /// # #[tokio::main]
827     /// # async fn main() {
828     /// use tokio_stream::{self as stream, *};
829     ///
830     /// let s = stream::iter(vec![1u8, 2, 3]);
831     /// let sum = s.fold(0, |acc, x| acc + x).await;
832     ///
833     /// assert_eq!(sum, 6);
834     /// # }
835     /// ```
fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where Self: Sized, F: FnMut(B, Self::Item) -> B,836     fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
837     where
838         Self: Sized,
839         F: FnMut(B, Self::Item) -> B,
840     {
841         FoldFuture::new(self, init, f)
842     }
843 
844     /// Drain stream pushing all emitted values into a collection.
845     ///
846     /// Equivalent to:
847     ///
848     /// ```ignore
849     /// async fn collect<T>(self) -> T;
850     /// ```
851     ///
852     /// `collect` streams all values, awaiting as needed. Values are pushed into
853     /// a collection. A number of different target collection types are
854     /// supported, including [`Vec`], [`String`], and [`Bytes`].
855     ///
856     /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
857     ///
858     /// # `Result`
859     ///
860     /// `collect()` can also be used with streams of type `Result<T, E>` where
861     /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
862     /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
863     /// streaming is terminated and `collect()` returns the `Err`.
864     ///
865     /// # Notes
866     ///
867     /// `FromStream` is currently a sealed trait. Stabilization is pending
868     /// enhancements to the Rust language.
869     ///
870     /// # Examples
871     ///
872     /// Basic usage:
873     ///
874     /// ```
875     /// use tokio_stream::{self as stream, StreamExt};
876     ///
877     /// #[tokio::main]
878     /// async fn main() {
879     ///     let doubled: Vec<i32> =
880     ///         stream::iter(vec![1, 2, 3])
881     ///             .map(|x| x * 2)
882     ///             .collect()
883     ///             .await;
884     ///
885     ///     assert_eq!(vec![2, 4, 6], doubled);
886     /// }
887     /// ```
888     ///
889     /// Collecting a stream of `Result` values
890     ///
891     /// ```
892     /// use tokio_stream::{self as stream, StreamExt};
893     ///
894     /// #[tokio::main]
895     /// async fn main() {
896     ///     // A stream containing only `Ok` values will be collected
897     ///     let values: Result<Vec<i32>, &str> =
898     ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
899     ///             .collect()
900     ///             .await;
901     ///
902     ///     assert_eq!(Ok(vec![1, 2, 3]), values);
903     ///
904     ///     // A stream containing `Err` values will return the first error.
905     ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
906     ///
907     ///     let values: Result<Vec<i32>, &str> =
908     ///         stream::iter(results)
909     ///             .collect()
910     ///             .await;
911     ///
912     ///     assert_eq!(Err("no"), values);
913     /// }
914     /// ```
    fn collect<T>(self) -> Collect<Self, T>
    where
        // `T` is the target collection. It does not appear in the arguments,
        // so callers pick it with a type annotation (as in the examples) or a
        // turbofish.
        T: FromStream<Self::Item>,
        Self: Sized,
    {
        // Only the stream is passed along; the collection itself is obtained by
        // awaiting the returned `Collect` future.
        Collect::new(self)
    }
922 
923     /// Applies a per-item timeout to the passed stream.
924     ///
925     /// `timeout()` takes a `Duration` that represents the maximum amount of
926     /// time each element of the stream has to complete before timing out.
927     ///
928     /// If the wrapped stream yields a value before the deadline is reached, the
929     /// value is returned. Otherwise, an error is returned. The caller may decide
930     /// to continue consuming the stream and will eventually get the next source
931     /// stream value once it becomes available. See
932     /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
933     /// where the timeouts will repeat.
934     ///
935     /// # Notes
936     ///
937     /// This function consumes the stream passed into it and returns a
938     /// wrapped version of it.
939     ///
940     /// Polling the returned stream will continue to poll the inner stream even
941     /// if one or more items time out.
942     ///
943     /// # Examples
944     ///
945     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
946     ///
947     /// ```
948     /// # #[tokio::main]
949     /// # async fn main() {
950     /// use tokio_stream::{self as stream, StreamExt};
951     /// use std::time::Duration;
952     /// # let int_stream = stream::iter(1..=3);
953     ///
954     /// let int_stream = int_stream.timeout(Duration::from_secs(1));
955     /// tokio::pin!(int_stream);
956     ///
957     /// // When no items time out, we get the 3 elements in succession:
958     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
959     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
960     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
961     /// assert_eq!(int_stream.try_next().await, Ok(None));
962     ///
963     /// // If the second item times out, we get an error and continue polling the stream:
964     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
965     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
966     /// assert!(int_stream.try_next().await.is_err());
967     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
968     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
969     /// assert_eq!(int_stream.try_next().await, Ok(None));
970     ///
971     /// // If we want to stop consuming the source stream the first time an
972     /// // element times out, we can use the `take_while` operator:
973     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
974     /// let mut int_stream = int_stream.take_while(Result::is_ok);
975     ///
976     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
977     /// assert_eq!(int_stream.try_next().await, Ok(None));
978     /// # }
979     /// ```
980     ///
981     /// Once a timeout error is received, no further events will be received
982     /// unless the wrapped stream yields a value (timeouts do not repeat).
983     ///
984     /// ```
985     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
986     /// # async fn main() {
987     /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
988     /// use std::time::Duration;
989     /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
990     /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
991     /// tokio::pin!(timeout_stream);
992     ///
993     /// // Only one timeout will be received between values in the source stream.
994     /// assert!(timeout_stream.try_next().await.is_ok());
995     /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
996     /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts");
997     /// # }
998     /// ```
999     #[cfg(feature = "time")]
1000     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn timeout(self, duration: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        // `Duration` is the `tokio::time` re-export pulled in by the
        // `cfg_time!` block at the top of this file. The stream is consumed and
        // wrapped, together with the per-item `duration`, in a `Timeout`.
        Timeout::new(self, duration)
    }
1007 
1008     /// Applies a per-item timeout to the passed stream.
1009     ///
1010     /// `timeout_repeating()` takes an [`Interval`] that controls the time each
1011     /// element of the stream has to complete before timing out.
1012     ///
1013     /// If the wrapped stream yields a value before the deadline is reached, the
1014     /// value is returned. Otherwise, an error is returned. The caller may decide
1015     /// to continue consuming the stream and will eventually get the next source
1016     /// stream value once it becomes available. Unlike `timeout()`, if no value
1017     /// becomes available before the deadline is reached, additional errors are
1018     /// returned at the specified interval. See [`timeout`](StreamExt::timeout)
1019     /// for an alternative where the timeouts do not repeat.
1020     ///
1021     /// # Notes
1022     ///
1023     /// This function consumes the stream passed into it and returns a
1024     /// wrapped version of it.
1025     ///
1026     /// Polling the returned stream will continue to poll the inner stream even
1027     /// if one or more items time out.
1028     ///
1029     /// # Examples
1030     ///
1031     /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
1032     ///
1033     /// ```
1034     /// # #[tokio::main]
1035     /// # async fn main() {
1036     /// use tokio_stream::{self as stream, StreamExt};
1037     /// use std::time::Duration;
1038     /// # let int_stream = stream::iter(1..=3);
1039     ///
1040     /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
1041     /// tokio::pin!(int_stream);
1042     ///
1043     /// // When no items time out, we get the 3 elements in succession:
1044     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1045     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1046     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1047     /// assert_eq!(int_stream.try_next().await, Ok(None));
1048     ///
1049     /// // If the second item times out, we get an error and continue polling the stream:
1050     /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1051     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1052     /// assert!(int_stream.try_next().await.is_err());
1053     /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
1054     /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
1055     /// assert_eq!(int_stream.try_next().await, Ok(None));
1056     ///
1057     /// // If we want to stop consuming the source stream the first time an
1058     /// // element times out, we can use the `take_while` operator:
1059     /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
1060     /// let mut int_stream = int_stream.take_while(Result::is_ok);
1061     ///
1062     /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
1063     /// assert_eq!(int_stream.try_next().await, Ok(None));
1064     /// # }
1065     /// ```
1066     ///
1067     /// Timeout errors will be continuously produced at the specified interval
1068     /// until the wrapped stream yields a value.
1069     ///
1070     /// ```
1071     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1072     /// # async fn main() {
1073     /// use tokio_stream::{StreamExt, wrappers::IntervalStream};
1074     /// use std::time::Duration;
1075     /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
1076     /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
1077     /// tokio::pin!(timeout_stream);
1078     ///
1079     /// // Multiple timeouts will be received between values in the source stream.
1080     /// assert!(timeout_stream.try_next().await.is_ok());
1081     /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout");
1082     /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout");
1083     /// // Will eventually receive another value from the source stream...
1084     /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout");
1085     /// # }
1086     /// ```
1087     #[cfg(feature = "time")]
1088     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating<Self>
    where
        Self: Sized,
    {
        // `Interval` is `tokio::time::Interval` (imported in the `cfg_time!`
        // block at the top of this file). Ownership of both the stream and the
        // caller-built interval moves into the `TimeoutRepeating` wrapper.
        TimeoutRepeating::new(self, interval)
    }
1095 
1096     /// Slows down a stream by enforcing a delay between items.
1097     ///
1098     /// The underlying timer behind this utility has a granularity of one millisecond.
1099     ///
1100     /// # Example
1101     ///
1102     /// Create a throttled stream.
1103     /// ```rust,no_run
1104     /// use std::time::Duration;
1105     /// use tokio_stream::StreamExt;
1106     ///
1107     /// # async fn dox() {
1108     /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
1109     /// tokio::pin!(item_stream);
1110     ///
1111     /// loop {
1112     ///     // The string will be produced at most every 2 seconds
1113     ///     println!("{:?}", item_stream.next().await);
1114     /// }
1115     /// # }
1116     /// ```
1117     #[cfg(feature = "time")]
1118     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn throttle(self, duration: Duration) -> Throttle<Self>
    where
        Self: Sized,
    {
        // This is not a recursive call: a bare `throttle(..)` resolves to the
        // free function imported via `use throttle::{throttle, Throttle}`.
        // Note that the free function takes the duration first and the stream
        // second, the reverse of this method's receiver-first order.
        throttle(duration, self)
    }
1125 
1126     /// Batches the items in the given stream using a maximum duration and size for each batch.
1127     ///
1128     /// This stream returns the next batch of items in the following situations:
1129     ///  1. The inner stream has returned at least `max_size` many items since the last batch.
1130     ///  2. The time since the first item of a batch is greater than the given duration.
1131     ///  3. The end of the stream is reached.
1132     ///
    /// The returned vector is never empty and never longer than the maximum size. Empty batches
    /// will not be emitted if no items are received upstream.
1135     ///
1136     /// # Panics
1137     ///
    /// This function panics if `max_size` is zero.
1139     ///
1140     /// # Example
1141     ///
1142     /// ```rust
1143     /// use std::time::Duration;
1144     /// use tokio::time;
1145     /// use tokio_stream::{self as stream, StreamExt};
1146     /// use futures::FutureExt;
1147     ///
1148     /// #[tokio::main]
1149     /// # async fn _unused() {}
1150     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1151     /// async fn main() {
1152     ///     let iter = vec![1, 2, 3, 4].into_iter();
1153     ///     let stream0 = stream::iter(iter);
1154     ///
1155     ///     let iter = vec![5].into_iter();
1156     ///     let stream1 = stream::iter(iter)
1157     ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
1158     ///
1159     ///     let chunk_stream = stream0
1160     ///         .chain(stream1)
1161     ///         .chunks_timeout(3, Duration::from_secs(2));
1162     ///     tokio::pin!(chunk_stream);
1163     ///
1164     ///     // a full batch was received
1165     ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
1166     ///     // deadline was reached before max_size was reached
1167     ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
1168     ///     // last element in the stream
1169     ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
1170     /// }
1171     /// ```
1172     #[cfg(feature = "time")]
1173     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
1174     #[track_caller]
    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
    where
        Self: Sized,
    {
        // Enforce the documented precondition before building the adapter.
        // Because this method is `#[track_caller]`, the panic location reported
        // is the caller's call site rather than this line.
        assert!(max_size > 0, "`max_size` must be non-zero.");
        // The stream is consumed and wrapped together with both batch limits.
        ChunksTimeout::new(self, max_size, duration)
    }
1182 
    /// Turns the stream into a peekable stream, whose next element can be peeked at without being
    /// consumed.
    ///
    /// # Example
    ///
1185     /// ```rust
1186     /// use tokio_stream::{self as stream, StreamExt};
1187     ///
1188     /// #[tokio::main]
1189     /// # async fn _unused() {}
1190     /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
1191     /// async fn main() {
1192     ///     let iter = vec![1, 2, 3, 4].into_iter();
1193     ///     let mut stream = stream::iter(iter).peekable();
1194     ///
1195     ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1196     ///     assert_eq!(*stream.peek().await.unwrap(), 1);
1197     ///     assert_eq!(stream.next().await.unwrap(), 1);
1198     ///     assert_eq!(*stream.peek().await.unwrap(), 2);
1199     /// }
1200     /// ```
    fn peekable(self) -> Peekable<Self>
    where
        Self: Sized,
    {
        // Consumes the stream and wraps it in `Peekable`; the `peek()` calls in
        // the example above are made on that returned wrapper.
        Peekable::new(self)
    }
1207 }
1208 
1209 impl<St: ?Sized> StreamExt for St where St: Stream {}
1210 
/// Combines the `(lower, upper)` size hints of two streams into a single hint.
///
/// The lower bounds are added, saturating at `usize::MAX`. The upper bound is
/// `Some` only when both inputs have an upper bound and their sum does not
/// overflow; in every other case it is `None`.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_high): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    // A missing bound on either side, or an overflowing sum, yields `None`.
    let upper = left_high.and_then(|lhs| right_high.and_then(|rhs| lhs.checked_add(rhs)));

    (left_low.saturating_add(right_low), upper)
}
1223