1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s,
4 //! including the `StreamExt` trait which adds methods to `Stream` types.
5 
6 use crate::future::{assert_future, Either};
7 use crate::stream::assert_stream;
8 #[cfg(feature = "alloc")]
9 use alloc::boxed::Box;
10 #[cfg(feature = "alloc")]
11 use alloc::vec::Vec;
12 use core::pin::Pin;
13 #[cfg(feature = "sink")]
14 use futures_core::stream::TryStream;
15 #[cfg(feature = "alloc")]
16 use futures_core::stream::{BoxStream, LocalBoxStream};
17 use futures_core::{
18     future::Future,
19     stream::{FusedStream, Stream},
20     task::{Context, Poll},
21 };
22 #[cfg(feature = "sink")]
23 use futures_sink::Sink;
24 
25 use crate::fns::{inspect_fn, InspectFn};
26 
27 mod chain;
28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29 pub use self::chain::Chain;
30 
31 mod collect;
32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33 pub use self::collect::Collect;
34 
35 mod unzip;
36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37 pub use self::unzip::Unzip;
38 
39 mod concat;
40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41 pub use self::concat::Concat;
42 
43 mod count;
44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45 pub use self::count::Count;
46 
47 mod cycle;
48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49 pub use self::cycle::Cycle;
50 
51 mod enumerate;
52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53 pub use self::enumerate::Enumerate;
54 
55 mod filter;
56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57 pub use self::filter::Filter;
58 
59 mod filter_map;
60 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
61 pub use self::filter_map::FilterMap;
62 
63 mod flatten;
64 
// NOTE(review): `delegate_all!` is defined elsewhere in the crate. Presumably
// it declares the public `Flatten<St>` wrapper around the private
// `flatten::Flatten<St, St::Item>` and forwards the listed traits to it;
// confirm against the macro definition.
delegate_all!(
    /// Stream for the [`flatten`](StreamExt::flatten) method.
    Flatten<St>(
        flatten::Flatten<St, St::Item>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
    where St: Stream
);
72 
73 mod fold;
74 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
75 pub use self::fold::Fold;
76 
77 mod any;
78 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
79 pub use self::any::Any;
80 
81 mod all;
82 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
83 pub use self::all::All;
84 
85 #[cfg(feature = "sink")]
86 mod forward;
87 
// NOTE(review): `delegate_all!` is defined elsewhere in the crate. Presumably
// it declares the public `Forward<St, Si>` wrapper around the private
// `forward::Forward<St, Si, St::Ok>` and forwards the listed traits to it;
// confirm against the macro definition. Only compiled with the `sink` feature.
#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(
        forward::Forward<St, Si, St::Ok>
    ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where St: TryStream
);
97 
98 mod for_each;
99 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
100 pub use self::for_each::ForEach;
101 
102 mod fuse;
103 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
104 pub use self::fuse::Fuse;
105 
106 mod into_future;
107 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
108 pub use self::into_future::StreamFuture;
109 
// NOTE(review): `delegate_all!` is defined elsewhere in the crate. Presumably
// it declares the public `Inspect<St, F>` wrapper around
// `map::Map<St, InspectFn<F>>`; confirm against the macro definition. The
// constructor wraps the user closure with `inspect_fn` before handing it to
// `map::Map::new`.
delegate_all!(
    /// Stream for the [`inspect`](StreamExt::inspect) method.
    Inspect<St, F>(
        map::Map<St, InspectFn<F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
);
116 
117 mod map;
118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119 pub use self::map::Map;
120 
// NOTE(review): `delegate_all!` is defined elsewhere in the crate. Presumably
// it declares the public `FlatMap<St, U, F>` wrapper around
// `flatten::Flatten<Map<St, F>, U>`; confirm against the macro definition. The
// constructor composes `Map::new` with `flatten::Flatten::new`, i.e. a map
// followed by a flatten.
delegate_all!(
    /// Stream for the [`flat_map`](StreamExt::flat_map) method.
    FlatMap<St, U, F>(
        flatten::Flatten<Map<St, F>, U>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
);
127 
128 mod next;
129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130 pub use self::next::Next;
131 
132 mod select_next_some;
133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134 pub use self::select_next_some::SelectNextSome;
135 
136 mod peek;
137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138 pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
139 
140 mod skip;
141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142 pub use self::skip::Skip;
143 
144 mod skip_while;
145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146 pub use self::skip_while::SkipWhile;
147 
148 mod take;
149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150 pub use self::take::Take;
151 
152 mod take_while;
153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154 pub use self::take_while::TakeWhile;
155 
156 mod take_until;
157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158 pub use self::take_until::TakeUntil;
159 
160 mod then;
161 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
162 pub use self::then::Then;
163 
164 mod zip;
165 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
166 pub use self::zip::Zip;
167 
168 #[cfg(feature = "alloc")]
169 mod chunks;
170 #[cfg(feature = "alloc")]
171 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
172 pub use self::chunks::Chunks;
173 
174 #[cfg(feature = "alloc")]
175 mod ready_chunks;
176 #[cfg(feature = "alloc")]
177 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
178 pub use self::ready_chunks::ReadyChunks;
179 
180 mod scan;
181 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
182 pub use self::scan::Scan;
183 
184 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
185 #[cfg(feature = "alloc")]
186 mod buffer_unordered;
187 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
188 #[cfg(feature = "alloc")]
189 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
190 pub use self::buffer_unordered::BufferUnordered;
191 
192 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
193 #[cfg(feature = "alloc")]
194 mod buffered;
195 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
196 #[cfg(feature = "alloc")]
197 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
198 pub use self::buffered::Buffered;
199 
200 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
201 #[cfg(feature = "alloc")]
202 pub(crate) mod flatten_unordered;
203 
204 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
205 #[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::flatten_unordered::FlattenUnordered;
208 
// NOTE(review): `delegate_all!` is defined elsewhere in the crate. Presumably
// it declares the public `FlatMapUnordered<St, U, F>` wrapper around
// `FlattenUnordered<Map<St, F>>`; confirm against the macro definition. The
// constructor composes `Map::new` with `FlattenUnordered::new`, passing the
// optional concurrency limit through unchanged.
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
delegate_all!(
    /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
    FlatMapUnordered<St, U, F>(
        FlattenUnordered<Map<St, F>>
    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
    where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
);
218 
219 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
220 #[cfg(feature = "alloc")]
221 mod for_each_concurrent;
222 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
223 #[cfg(feature = "alloc")]
224 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
225 pub use self::for_each_concurrent::ForEachConcurrent;
226 
227 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
228 #[cfg(feature = "sink")]
229 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
230 #[cfg(feature = "alloc")]
231 mod split;
232 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
233 #[cfg(feature = "sink")]
234 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
235 #[cfg(feature = "alloc")]
236 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
237 pub use self::split::{ReuniteError, SplitSink, SplitStream};
238 
239 #[cfg(feature = "std")]
240 mod catch_unwind;
241 #[cfg(feature = "std")]
242 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
243 pub use self::catch_unwind::CatchUnwind;
244 
245 impl<T: ?Sized> StreamExt for T where T: Stream {}
246 
247 /// An extension trait for `Stream`s that provides a variety of convenient
248 /// combinator functions.
249 pub trait StreamExt: Stream {
250     /// Creates a future that resolves to the next item in the stream.
251     ///
252     /// Note that because `next` doesn't take ownership over the stream,
253     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
254     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
255     /// be done by boxing the stream using [`Box::pin`] or
256     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
257     /// crate.
258     ///
259     /// # Examples
260     ///
261     /// ```
262     /// # futures::executor::block_on(async {
263     /// use futures::stream::{self, StreamExt};
264     ///
265     /// let mut stream = stream::iter(1..=3);
266     ///
267     /// assert_eq!(stream.next().await, Some(1));
268     /// assert_eq!(stream.next().await, Some(2));
269     /// assert_eq!(stream.next().await, Some(3));
270     /// assert_eq!(stream.next().await, None);
271     /// # });
272     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,273     fn next(&mut self) -> Next<'_, Self>
274     where
275         Self: Unpin,
276     {
277         assert_future::<Option<Self::Item>, _>(Next::new(self))
278     }
279 
280     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
281     /// If the stream terminates, then the next item is [`None`].
282     ///
283     /// The returned future can be used to compose streams and futures together
284     /// by placing everything into the "world of futures".
285     ///
286     /// Note that because `into_future` moves the stream, the [`Stream`] type
287     /// must be [`Unpin`]. If you want to use `into_future` with a
288     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
289     /// be done by boxing the stream using [`Box::pin`] or
290     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
291     /// crate.
292     ///
293     /// # Examples
294     ///
295     /// ```
296     /// # futures::executor::block_on(async {
297     /// use futures::stream::{self, StreamExt};
298     ///
299     /// let stream = stream::iter(1..=3);
300     ///
301     /// let (item, stream) = stream.into_future().await;
302     /// assert_eq!(Some(1), item);
303     ///
304     /// let (item, stream) = stream.into_future().await;
305     /// assert_eq!(Some(2), item);
306     /// # });
307     /// ```
into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,308     fn into_future(self) -> StreamFuture<Self>
309     where
310         Self: Sized + Unpin,
311     {
312         assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
313     }
314 
315     /// Maps this stream's items to a different type, returning a new stream of
316     /// the resulting type.
317     ///
318     /// The provided closure is executed over all elements of this stream as
319     /// they are made available. It is executed inline with calls to
320     /// [`poll_next`](Stream::poll_next).
321     ///
322     /// Note that this function consumes the stream passed into it and returns a
323     /// wrapped version of it, similar to the existing `map` methods in the
324     /// standard library.
325     ///
326     /// See [`StreamExt::then`](Self::then) if you want to use a closure that
327     /// returns a future instead of a value.
328     ///
329     /// # Examples
330     ///
331     /// ```
332     /// # futures::executor::block_on(async {
333     /// use futures::stream::{self, StreamExt};
334     ///
335     /// let stream = stream::iter(1..=3);
336     /// let stream = stream.map(|x| x + 3);
337     ///
338     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
339     /// # });
340     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,341     fn map<T, F>(self, f: F) -> Map<Self, F>
342     where
343         F: FnMut(Self::Item) -> T,
344         Self: Sized,
345     {
346         assert_stream::<T, _>(Map::new(self, f))
347     }
348 
349     /// Creates a stream which gives the current iteration count as well as
350     /// the next value.
351     ///
352     /// The stream returned yields pairs `(i, val)`, where `i` is the
353     /// current index of iteration and `val` is the value returned by the
354     /// stream.
355     ///
356     /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
357     /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
358     /// functionality.
359     ///
360     /// # Overflow Behavior
361     ///
362     /// The method does no guarding against overflows, so enumerating more than
363     /// [`usize::MAX`] elements either produces the wrong result or panics. If
364     /// debug assertions are enabled, a panic is guaranteed.
365     ///
366     /// # Panics
367     ///
368     /// The returned stream might panic if the to-be-returned index would
369     /// overflow a [`usize`].
370     ///
371     /// # Examples
372     ///
373     /// ```
374     /// # futures::executor::block_on(async {
375     /// use futures::stream::{self, StreamExt};
376     ///
377     /// let stream = stream::iter(vec!['a', 'b', 'c']);
378     ///
379     /// let mut stream = stream.enumerate();
380     ///
381     /// assert_eq!(stream.next().await, Some((0, 'a')));
382     /// assert_eq!(stream.next().await, Some((1, 'b')));
383     /// assert_eq!(stream.next().await, Some((2, 'c')));
384     /// assert_eq!(stream.next().await, None);
385     /// # });
386     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,387     fn enumerate(self) -> Enumerate<Self>
388     where
389         Self: Sized,
390     {
391         assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
392     }
393 
394     /// Filters the values produced by this stream according to the provided
395     /// asynchronous predicate.
396     ///
397     /// As values of this stream are made available, the provided predicate `f`
398     /// will be run against them. If the predicate returns a `Future` which
399     /// resolves to `true`, then the stream will yield the value, but if the
400     /// predicate returns a `Future` which resolves to `false`, then the value
401     /// will be discarded and the next value will be produced.
402     ///
403     /// Note that this function consumes the stream passed into it and returns a
404     /// wrapped version of it, similar to the existing `filter` methods in the
405     /// standard library.
406     ///
407     /// # Examples
408     ///
409     /// ```
410     /// # futures::executor::block_on(async {
411     /// use futures::future;
412     /// use futures::stream::{self, StreamExt};
413     ///
414     /// let stream = stream::iter(1..=10);
415     /// let events = stream.filter(|x| future::ready(x % 2 == 0));
416     ///
417     /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
418     /// # });
419     /// ```
filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,420     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
421     where
422         F: FnMut(&Self::Item) -> Fut,
423         Fut: Future<Output = bool>,
424         Self: Sized,
425     {
426         assert_stream::<Self::Item, _>(Filter::new(self, f))
427     }
428 
429     /// Filters the values produced by this stream while simultaneously mapping
430     /// them to a different type according to the provided asynchronous closure.
431     ///
432     /// As values of this stream are made available, the provided function will
433     /// be run on them. If the future returned by the predicate `f` resolves to
434     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
435     /// it resolves to [`None`] then the next value will be produced.
436     ///
437     /// Note that this function consumes the stream passed into it and returns a
438     /// wrapped version of it, similar to the existing `filter_map` methods in
439     /// the standard library.
440     ///
441     /// # Examples
442     /// ```
443     /// # futures::executor::block_on(async {
444     /// use futures::stream::{self, StreamExt};
445     ///
446     /// let stream = stream::iter(1..=10);
447     /// let events = stream.filter_map(|x| async move {
448     ///     if x % 2 == 0 { Some(x + 1) } else { None }
449     /// });
450     ///
451     /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
452     /// # });
453     /// ```
filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,454     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
455     where
456         F: FnMut(Self::Item) -> Fut,
457         Fut: Future<Output = Option<T>>,
458         Self: Sized,
459     {
460         assert_stream::<T, _>(FilterMap::new(self, f))
461     }
462 
463     /// Computes from this stream's items new items of a different type using
464     /// an asynchronous closure.
465     ///
466     /// The provided closure `f` will be called with an `Item` once a value is
467     /// ready, it returns a future which will then be run to completion
468     /// to produce the next value on this stream.
469     ///
470     /// Note that this function consumes the stream passed into it and returns a
471     /// wrapped version of it.
472     ///
473     /// See [`StreamExt::map`](Self::map) if you want to use a closure that
474     /// returns a value instead of a future.
475     ///
476     /// # Examples
477     ///
478     /// ```
479     /// # futures::executor::block_on(async {
480     /// use futures::stream::{self, StreamExt};
481     ///
482     /// let stream = stream::iter(1..=3);
483     /// let stream = stream.then(|x| async move { x + 3 });
484     ///
485     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
486     /// # });
487     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,488     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
489     where
490         F: FnMut(Self::Item) -> Fut,
491         Fut: Future,
492         Self: Sized,
493     {
494         assert_stream::<Fut::Output, _>(Then::new(self, f))
495     }
496 
497     /// Transforms a stream into a collection, returning a
498     /// future representing the result of that computation.
499     ///
500     /// The returned future will be resolved when the stream terminates.
501     ///
502     /// # Examples
503     ///
504     /// ```
505     /// # futures::executor::block_on(async {
506     /// use futures::channel::mpsc;
507     /// use futures::stream::StreamExt;
508     /// use std::thread;
509     ///
510     /// let (tx, rx) = mpsc::unbounded();
511     ///
512     /// thread::spawn(move || {
513     ///     for i in 1..=5 {
514     ///         tx.unbounded_send(i).unwrap();
515     ///     }
516     /// });
517     ///
518     /// let output = rx.collect::<Vec<i32>>().await;
519     /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
520     /// # });
521     /// ```
collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,522     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
523     where
524         Self: Sized,
525     {
526         assert_future::<C, _>(Collect::new(self))
527     }
528 
529     /// Converts a stream of pairs into a future, which
530     /// resolves to pair of containers.
531     ///
532     /// `unzip()` produces a future, which resolves to two
533     /// collections: one from the left elements of the pairs,
534     /// and one from the right elements.
535     ///
536     /// The returned future will be resolved when the stream terminates.
537     ///
538     /// # Examples
539     ///
540     /// ```
541     /// # futures::executor::block_on(async {
542     /// use futures::channel::mpsc;
543     /// use futures::stream::StreamExt;
544     /// use std::thread;
545     ///
546     /// let (tx, rx) = mpsc::unbounded();
547     ///
548     /// thread::spawn(move || {
549     ///     tx.unbounded_send((1, 2)).unwrap();
550     ///     tx.unbounded_send((3, 4)).unwrap();
551     ///     tx.unbounded_send((5, 6)).unwrap();
552     /// });
553     ///
554     /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
555     /// assert_eq!(o1, vec![1, 3, 5]);
556     /// assert_eq!(o2, vec![2, 4, 6]);
557     /// # });
558     /// ```
unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,559     fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
560     where
561         FromA: Default + Extend<A>,
562         FromB: Default + Extend<B>,
563         Self: Sized + Stream<Item = (A, B)>,
564     {
565         assert_future::<(FromA, FromB), _>(Unzip::new(self))
566     }
567 
568     /// Concatenate all items of a stream into a single extendable
569     /// destination, returning a future representing the end result.
570     ///
571     /// This combinator will extend the first item with the contents
572     /// of all the subsequent results of the stream. If the stream is
573     /// empty, the default value will be returned.
574     ///
575     /// Works with all collections that implement the
576     /// [`Extend`](std::iter::Extend) trait.
577     ///
578     /// # Examples
579     ///
580     /// ```
581     /// # futures::executor::block_on(async {
582     /// use futures::channel::mpsc;
583     /// use futures::stream::StreamExt;
584     /// use std::thread;
585     ///
586     /// let (tx, rx) = mpsc::unbounded();
587     ///
588     /// thread::spawn(move || {
589     ///     for i in (0..3).rev() {
590     ///         let n = i * 3;
591     ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
592     ///     }
593     /// });
594     ///
595     /// let result = rx.concat().await;
596     ///
597     /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
598     /// # });
599     /// ```
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,600     fn concat(self) -> Concat<Self>
601     where
602         Self: Sized,
603         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
604     {
605         assert_future::<Self::Item, _>(Concat::new(self))
606     }
607 
608     /// Drives the stream to completion, counting the number of items.
609     ///
610     /// # Overflow Behavior
611     ///
612     /// The method does no guarding against overflows, so counting elements of a
613     /// stream with more than [`usize::MAX`] elements either produces the wrong
614     /// result or panics. If debug assertions are enabled, a panic is guaranteed.
615     ///
616     /// # Panics
617     ///
618     /// This function might panic if the iterator has more than [`usize::MAX`]
619     /// elements.
620     ///
621     /// # Examples
622     ///
623     /// ```
624     /// # futures::executor::block_on(async {
625     /// use futures::stream::{self, StreamExt};
626     ///
627     /// let stream = stream::iter(1..=10);
628     /// let count = stream.count().await;
629     ///
630     /// assert_eq!(count, 10);
631     /// # });
632     /// ```
count(self) -> Count<Self> where Self: Sized,633     fn count(self) -> Count<Self>
634     where
635         Self: Sized,
636     {
637         assert_future::<usize, _>(Count::new(self))
638     }
639 
640     /// Repeats a stream endlessly.
641     ///
642     /// The stream never terminates. Note that you likely want to avoid
643     /// usage of `collect` or such on the returned stream as it will exhaust
644     /// available memory as it tries to just fill up all RAM.
645     ///
646     /// # Examples
647     ///
648     /// ```
649     /// # futures::executor::block_on(async {
650     /// use futures::stream::{self, StreamExt};
651     /// let a = [1, 2, 3];
652     /// let mut s = stream::iter(a.iter()).cycle();
653     ///
654     /// assert_eq!(s.next().await, Some(&1));
655     /// assert_eq!(s.next().await, Some(&2));
656     /// assert_eq!(s.next().await, Some(&3));
657     /// assert_eq!(s.next().await, Some(&1));
658     /// assert_eq!(s.next().await, Some(&2));
659     /// assert_eq!(s.next().await, Some(&3));
660     /// assert_eq!(s.next().await, Some(&1));
661     /// # });
662     /// ```
cycle(self) -> Cycle<Self> where Self: Sized + Clone,663     fn cycle(self) -> Cycle<Self>
664     where
665         Self: Sized + Clone,
666     {
667         assert_stream::<Self::Item, _>(Cycle::new(self))
668     }
669 
670     /// Execute an accumulating asynchronous computation over a stream,
671     /// collecting all the values into one final result.
672     ///
673     /// This combinator will accumulate all values returned by this stream
674     /// according to the closure provided. The initial state is also provided to
675     /// this method and then is returned again by each execution of the closure.
676     /// Once the entire stream has been exhausted the returned future will
677     /// resolve to this value.
678     ///
679     /// # Examples
680     ///
681     /// ```
682     /// # futures::executor::block_on(async {
683     /// use futures::stream::{self, StreamExt};
684     ///
685     /// let number_stream = stream::iter(0..6);
686     /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
687     /// assert_eq!(sum.await, 15);
688     /// # });
689     /// ```
fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,690     fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
691     where
692         F: FnMut(T, Self::Item) -> Fut,
693         Fut: Future<Output = T>,
694         Self: Sized,
695     {
696         assert_future::<T, _>(Fold::new(self, f, init))
697     }
698 
699     /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
700     ///
701     /// # Examples
702     ///
703     /// ```
704     /// # futures::executor::block_on(async {
705     /// use futures::stream::{self, StreamExt};
706     ///
707     /// let number_stream = stream::iter(0..10);
708     /// let contain_three = number_stream.any(|i| async move { i == 3 });
709     /// assert_eq!(contain_three.await, true);
710     /// # });
711     /// ```
any<Fut, F>(self, f: F) -> Any<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,712     fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
713     where
714         F: FnMut(Self::Item) -> Fut,
715         Fut: Future<Output = bool>,
716         Self: Sized,
717     {
718         assert_future::<bool, _>(Any::new(self, f))
719     }
720 
721     /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
722     ///
723     /// # Examples
724     ///
725     /// ```
726     /// # futures::executor::block_on(async {
727     /// use futures::stream::{self, StreamExt};
728     ///
729     /// let number_stream = stream::iter(0..10);
730     /// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
731     /// assert_eq!(less_then_twenty.await, true);
732     /// # });
733     /// ```
all<Fut, F>(self, f: F) -> All<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,734     fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
735     where
736         F: FnMut(Self::Item) -> Fut,
737         Fut: Future<Output = bool>,
738         Self: Sized,
739     {
740         assert_future::<bool, _>(All::new(self, f))
741     }
742 
743     /// Flattens a stream of streams into just one continuous stream.
744     ///
745     /// # Examples
746     ///
747     /// ```
748     /// # futures::executor::block_on(async {
749     /// use futures::channel::mpsc;
750     /// use futures::stream::StreamExt;
751     /// use std::thread;
752     ///
753     /// let (tx1, rx1) = mpsc::unbounded();
754     /// let (tx2, rx2) = mpsc::unbounded();
755     /// let (tx3, rx3) = mpsc::unbounded();
756     ///
757     /// thread::spawn(move || {
758     ///     tx1.unbounded_send(1).unwrap();
759     ///     tx1.unbounded_send(2).unwrap();
760     /// });
761     /// thread::spawn(move || {
762     ///     tx2.unbounded_send(3).unwrap();
763     ///     tx2.unbounded_send(4).unwrap();
764     /// });
765     /// thread::spawn(move || {
766     ///     tx3.unbounded_send(rx1).unwrap();
767     ///     tx3.unbounded_send(rx2).unwrap();
768     /// });
769     ///
770     /// let output = rx3.flatten().collect::<Vec<i32>>().await;
771     /// assert_eq!(output, vec![1, 2, 3, 4]);
772     /// # });
773     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,774     fn flatten(self) -> Flatten<Self>
775     where
776         Self::Item: Stream,
777         Self: Sized,
778     {
779         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
780     }
781 
782     /// Flattens a stream of streams into just one continuous stream. Polls
783     /// inner streams produced by the base stream concurrently.
784     ///
785     /// The only argument is an optional limit on the number of concurrently
786     /// polled streams. If this limit is not `None`, no more than `limit` streams
787     /// will be polled at the same time. The `limit` argument is of type
788     /// `Into<Option<usize>>`, and so can be provided as either `None`,
789     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
790     /// no limit at all, and will have the same result as passing in `None`.
791     ///
792     /// # Examples
793     ///
794     /// ```
795     /// # futures::executor::block_on(async {
796     /// use futures::channel::mpsc;
797     /// use futures::stream::StreamExt;
798     /// use std::thread;
799     ///
800     /// let (tx1, rx1) = mpsc::unbounded();
801     /// let (tx2, rx2) = mpsc::unbounded();
802     /// let (tx3, rx3) = mpsc::unbounded();
803     ///
804     /// thread::spawn(move || {
805     ///     tx1.unbounded_send(1).unwrap();
806     ///     tx1.unbounded_send(2).unwrap();
807     /// });
808     /// thread::spawn(move || {
809     ///     tx2.unbounded_send(3).unwrap();
810     ///     tx2.unbounded_send(4).unwrap();
811     /// });
812     /// thread::spawn(move || {
813     ///     tx3.unbounded_send(rx1).unwrap();
814     ///     tx3.unbounded_send(rx2).unwrap();
815     /// });
816     ///
817     /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
818     /// output.sort();
819     ///
820     /// assert_eq!(output, vec![1, 2, 3, 4]);
821     /// # });
822     /// ```
823     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
824     #[cfg(feature = "alloc")]
flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> where Self::Item: Stream + Unpin, Self: Sized,825     fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
826     where
827         Self::Item: Stream + Unpin,
828         Self: Sized,
829     {
830         assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
831     }
832 
833     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
834     ///
835     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
836     /// you would have to chain combinators like `.map(f).flatten()` while this
837     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
838     ///
    /// The provided closure, which produces the inner streams, is called on each
    /// element of the stream once the previous inner stream has terminated and the
    /// next stream item is available.
841     ///
842     /// Note that this function consumes the stream passed into it and returns a
843     /// wrapped version of it, similar to the existing `flat_map` methods in the
844     /// standard library.
845     ///
846     /// # Examples
847     ///
848     /// ```
849     /// # futures::executor::block_on(async {
850     /// use futures::stream::{self, StreamExt};
851     ///
852     /// let stream = stream::iter(1..=3);
853     /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
854     ///
855     /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
856     /// # });
857     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,858     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
859     where
860         F: FnMut(Self::Item) -> U,
861         U: Stream,
862         Self: Sized,
863     {
864         assert_stream::<U::Item, _>(FlatMap::new(self, f))
865     }
866 
    /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
    /// and polls them concurrently, yielding items in any order, as they are
    /// made available.
870     ///
871     /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
872     /// instead, and you need to poll all of them concurrently, you would
873     /// have to use something like `for_each_concurrent` and merge values
874     /// by hand. This combinator provides ability to collect all values
875     /// from concurrently polled streams into one stream.
876     ///
877     /// The first argument is an optional limit on the number of concurrently
878     /// polled streams. If this limit is not `None`, no more than `limit` streams
879     /// will be polled at the same time. The `limit` argument is of type
880     /// `Into<Option<usize>>`, and so can be provided as either `None`,
881     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
882     /// no limit at all, and will have the same result as passing in `None`.
883     ///
    /// The provided closure, which produces the inner streams, is called on
    /// each element of the stream as soon as the next stream item is available
    /// and the limit of concurrently processed streams isn't exceeded.
887     ///
888     /// Note that this function consumes the stream passed into it and
889     /// returns a wrapped version of it.
890     ///
891     /// # Examples
892     ///
893     /// ```
894     /// # futures::executor::block_on(async {
895     /// use futures::stream::{self, StreamExt};
896     ///
897     /// let stream = stream::iter(1..5);
898     /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
899     /// let mut values = stream.collect::<Vec<_>>().await;
900     /// values.sort();
901     ///
902     /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
903     /// # });
904     /// ```
905     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
906     #[cfg(feature = "alloc")]
flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F> where U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized,907     fn flat_map_unordered<U, F>(
908         self,
909         limit: impl Into<Option<usize>>,
910         f: F,
911     ) -> FlatMapUnordered<Self, U, F>
912     where
913         U: Stream + Unpin,
914         F: FnMut(Self::Item) -> U,
915         Self: Sized,
916     {
917         assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
918     }
919 
920     /// Combinator similar to [`StreamExt::fold`] that holds internal state
921     /// and produces a new stream.
922     ///
923     /// Accepts initial state and closure which will be applied to each element
924     /// of the stream until provided closure returns `None`. Once `None` is
925     /// returned, stream will be terminated.
926     ///
927     /// # Examples
928     ///
929     /// ```
930     /// # futures::executor::block_on(async {
931     /// use futures::future;
932     /// use futures::stream::{self, StreamExt};
933     ///
934     /// let stream = stream::iter(1..=10);
935     ///
936     /// let stream = stream.scan(0, |state, x| {
937     ///     *state += x;
938     ///     future::ready(if *state < 10 { Some(x) } else { None })
939     /// });
940     ///
941     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
942     /// # });
943     /// ```
scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,944     fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
945     where
946         F: FnMut(&mut S, Self::Item) -> Fut,
947         Fut: Future<Output = Option<B>>,
948         Self: Sized,
949     {
950         assert_stream::<B, _>(Scan::new(self, initial_state, f))
951     }
952 
953     /// Skip elements on this stream while the provided asynchronous predicate
954     /// resolves to `true`.
955     ///
956     /// This function, like `Iterator::skip_while`, will skip elements on the
957     /// stream until the predicate `f` resolves to `false`. Once one element
958     /// returns `false`, all future elements will be returned from the underlying
959     /// stream.
960     ///
961     /// # Examples
962     ///
963     /// ```
964     /// # futures::executor::block_on(async {
965     /// use futures::future;
966     /// use futures::stream::{self, StreamExt};
967     ///
968     /// let stream = stream::iter(1..=10);
969     ///
970     /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
971     ///
972     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
973     /// # });
974     /// ```
skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,975     fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
976     where
977         F: FnMut(&Self::Item) -> Fut,
978         Fut: Future<Output = bool>,
979         Self: Sized,
980     {
981         assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
982     }
983 
984     /// Take elements from this stream while the provided asynchronous predicate
985     /// resolves to `true`.
986     ///
987     /// This function, like `Iterator::take_while`, will take elements from the
988     /// stream until the predicate `f` resolves to `false`. Once one element
989     /// returns `false`, it will always return that the stream is done.
990     ///
991     /// # Examples
992     ///
993     /// ```
994     /// # futures::executor::block_on(async {
995     /// use futures::future;
996     /// use futures::stream::{self, StreamExt};
997     ///
998     /// let stream = stream::iter(1..=10);
999     ///
1000     /// let stream = stream.take_while(|x| future::ready(*x <= 5));
1001     ///
1002     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1003     /// # });
1004     /// ```
take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,1005     fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
1006     where
1007         F: FnMut(&Self::Item) -> Fut,
1008         Fut: Future<Output = bool>,
1009         Self: Sized,
1010     {
1011         assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
1012     }
1013 
1014     /// Take elements from this stream until the provided future resolves.
1015     ///
1016     /// This function will take elements from the stream until the provided
1017     /// stopping future `fut` resolves. Once the `fut` future becomes ready,
1018     /// this stream combinator will always return that the stream is done.
1019     ///
1020     /// The stopping future may return any type. Once the stream is stopped
1021     /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
1022     /// The stream may also be resumed with `TakeUntil::take_future()`.
1023     /// See the documentation of [`TakeUntil`] for more information.
1024     ///
1025     /// # Examples
1026     ///
1027     /// ```
1028     /// # futures::executor::block_on(async {
1029     /// use futures::future;
1030     /// use futures::stream::{self, StreamExt};
1031     /// use futures::task::Poll;
1032     ///
1033     /// let stream = stream::iter(1..=10);
1034     ///
1035     /// let mut i = 0;
1036     /// let stop_fut = future::poll_fn(|_cx| {
1037     ///     i += 1;
1038     ///     if i <= 5 {
1039     ///         Poll::Pending
1040     ///     } else {
1041     ///         Poll::Ready(())
1042     ///     }
1043     /// });
1044     ///
1045     /// let stream = stream.take_until(stop_fut);
1046     ///
1047     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
1048     /// # });
1049     /// ```
take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,1050     fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
1051     where
1052         Fut: Future,
1053         Self: Sized,
1054     {
1055         assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
1056     }
1057 
1058     /// Runs this stream to completion, executing the provided asynchronous
1059     /// closure for each element on the stream.
1060     ///
1061     /// The closure provided will be called for each item this stream produces,
1062     /// yielding a future. That future will then be executed to completion
1063     /// before moving on to the next item.
1064     ///
1065     /// The returned value is a `Future` where the `Output` type is `()`; it is
1066     /// executed entirely for its side effects.
1067     ///
1068     /// To process each item in the stream and produce another stream instead
1069     /// of a single future, use `then` instead.
1070     ///
1071     /// # Examples
1072     ///
1073     /// ```
1074     /// # futures::executor::block_on(async {
1075     /// use futures::future;
1076     /// use futures::stream::{self, StreamExt};
1077     ///
1078     /// let mut x = 0;
1079     ///
1080     /// {
1081     ///     let fut = stream::repeat(1).take(3).for_each(|item| {
1082     ///         x += item;
1083     ///         future::ready(())
1084     ///     });
1085     ///     fut.await;
1086     /// }
1087     ///
1088     /// assert_eq!(x, 3);
1089     /// # });
1090     /// ```
for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1091     fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
1092     where
1093         F: FnMut(Self::Item) -> Fut,
1094         Fut: Future<Output = ()>,
1095         Self: Sized,
1096     {
1097         assert_future::<(), _>(ForEach::new(self, f))
1098     }
1099 
1100     /// Runs this stream to completion, executing the provided asynchronous
1101     /// closure for each element on the stream concurrently as elements become
1102     /// available.
1103     ///
1104     /// This is similar to [`StreamExt::for_each`], but the futures
1105     /// produced by the closure are run concurrently (but not in parallel--
1106     /// this combinator does not introduce any threads).
1107     ///
1108     /// The closure provided will be called for each item this stream produces,
1109     /// yielding a future. That future will then be executed to completion
1110     /// concurrently with the other futures produced by the closure.
1111     ///
1112     /// The first argument is an optional limit on the number of concurrent
1113     /// futures. If this limit is not `None`, no more than `limit` futures
1114     /// will be run concurrently. The `limit` argument is of type
1115     /// `Into<Option<usize>>`, and so can be provided as either `None`,
1116     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
1117     /// no limit at all, and will have the same result as passing in `None`.
1118     ///
1119     /// This method is only available when the `std` or `alloc` feature of this
1120     /// library is activated, and it is activated by default.
1121     ///
1122     /// # Examples
1123     ///
1124     /// ```
1125     /// # futures::executor::block_on(async {
1126     /// use futures::channel::oneshot;
1127     /// use futures::stream::{self, StreamExt};
1128     ///
1129     /// let (tx1, rx1) = oneshot::channel();
1130     /// let (tx2, rx2) = oneshot::channel();
1131     /// let (tx3, rx3) = oneshot::channel();
1132     ///
1133     /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
1134     ///     /* limit */ 2,
1135     ///     |rx| async move {
1136     ///         rx.await.unwrap();
1137     ///     }
1138     /// );
1139     /// tx1.send(()).unwrap();
1140     /// tx2.send(()).unwrap();
1141     /// tx3.send(()).unwrap();
1142     /// fut.await;
1143     /// # })
1144     /// ```
1145     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1146     #[cfg(feature = "alloc")]
for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,1147     fn for_each_concurrent<Fut, F>(
1148         self,
1149         limit: impl Into<Option<usize>>,
1150         f: F,
1151     ) -> ForEachConcurrent<Self, Fut, F>
1152     where
1153         F: FnMut(Self::Item) -> Fut,
1154         Fut: Future<Output = ()>,
1155         Self: Sized,
1156     {
1157         assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
1158     }
1159 
1160     /// Creates a new stream of at most `n` items of the underlying stream.
1161     ///
1162     /// Once `n` items have been yielded from this stream then it will always
1163     /// return that the stream is done.
1164     ///
1165     /// # Examples
1166     ///
1167     /// ```
1168     /// # futures::executor::block_on(async {
1169     /// use futures::stream::{self, StreamExt};
1170     ///
1171     /// let stream = stream::iter(1..=10).take(3);
1172     ///
1173     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
1174     /// # });
1175     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,1176     fn take(self, n: usize) -> Take<Self>
1177     where
1178         Self: Sized,
1179     {
1180         assert_stream::<Self::Item, _>(Take::new(self, n))
1181     }
1182 
1183     /// Creates a new stream which skips `n` items of the underlying stream.
1184     ///
1185     /// Once `n` items have been skipped from this stream then it will always
1186     /// return the remaining items on this stream.
1187     ///
1188     /// # Examples
1189     ///
1190     /// ```
1191     /// # futures::executor::block_on(async {
1192     /// use futures::stream::{self, StreamExt};
1193     ///
1194     /// let stream = stream::iter(1..=10).skip(5);
1195     ///
1196     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
1197     /// # });
1198     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,1199     fn skip(self, n: usize) -> Skip<Self>
1200     where
1201         Self: Sized,
1202     {
1203         assert_stream::<Self::Item, _>(Skip::new(self, n))
1204     }
1205 
1206     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
1207     /// again be called once it has finished. This method can be used to turn
1208     /// any `Stream` into a `FusedStream`.
1209     ///
1210     /// Normally, once a stream has returned [`None`] from
1211     /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
    /// behavior such as blocking forever, panicking, never returning, etc. If it is known
1213     /// that [`poll_next`](Stream::poll_next) may be called after stream
1214     /// has already finished, then this method can be used to ensure that it has
1215     /// defined semantics.
1216     ///
1217     /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
1218     /// is guaranteed to return [`None`] after the underlying stream has
1219     /// finished.
1220     ///
1221     /// # Examples
1222     ///
1223     /// ```
1224     /// use futures::executor::block_on_stream;
1225     /// use futures::stream::{self, StreamExt};
1226     /// use futures::task::Poll;
1227     ///
1228     /// let mut x = 0;
1229     /// let stream = stream::poll_fn(|_| {
1230     ///     x += 1;
1231     ///     match x {
1232     ///         0..=2 => Poll::Ready(Some(x)),
1233     ///         3 => Poll::Ready(None),
1234     ///         _ => panic!("should not happen")
1235     ///     }
1236     /// }).fuse();
1237     ///
1238     /// let mut iter = block_on_stream(stream);
1239     /// assert_eq!(Some(1), iter.next());
1240     /// assert_eq!(Some(2), iter.next());
1241     /// assert_eq!(None, iter.next());
1242     /// assert_eq!(None, iter.next());
1243     /// // ...
1244     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1245     fn fuse(self) -> Fuse<Self>
1246     where
1247         Self: Sized,
1248     {
1249         assert_stream::<Self::Item, _>(Fuse::new(self))
1250     }
1251 
1252     /// Borrows a stream, rather than consuming it.
1253     ///
1254     /// This is useful to allow applying stream adaptors while still retaining
1255     /// ownership of the original stream.
1256     ///
1257     /// # Examples
1258     ///
1259     /// ```
1260     /// # futures::executor::block_on(async {
1261     /// use futures::stream::{self, StreamExt};
1262     ///
1263     /// let mut stream = stream::iter(1..5);
1264     ///
1265     /// let sum = stream.by_ref()
1266     ///                 .take(2)
1267     ///                 .fold(0, |a, b| async move { a + b })
1268     ///                 .await;
1269     /// assert_eq!(sum, 3);
1270     ///
1271     /// // You can use the stream again
1272     /// let sum = stream.take(2)
1273     ///                 .fold(0, |a, b| async move { a + b })
1274     ///                 .await;
1275     /// assert_eq!(sum, 7);
1276     /// # });
1277     /// ```
    fn by_ref(&mut self) -> &mut Self {
        // Hand back the very same mutable borrow; nothing is wrapped or moved.
        self
    }
1281 
1282     /// Catches unwinding panics while polling the stream.
1283     ///
1284     /// Caught panic (if any) will be the last element of the resulting stream.
1285     ///
1286     /// In general, panics within a stream can propagate all the way out to the
1287     /// task level. This combinator makes it possible to halt unwinding within
1288     /// the stream itself. It's most commonly used within task executors. This
1289     /// method should not be used for error handling.
1290     ///
1291     /// Note that this method requires the `UnwindSafe` bound from the standard
1292     /// library. This isn't always applied automatically, and the standard
1293     /// library provides an `AssertUnwindSafe` wrapper type to apply it
    /// after the fact. To assist using this method, the [`Stream`] trait is
1295     /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1296     /// [`Stream`].
1297     ///
1298     /// This method is only available when the `std` feature of this
1299     /// library is activated, and it is activated by default.
1300     ///
1301     /// # Examples
1302     ///
1303     /// ```
1304     /// # futures::executor::block_on(async {
1305     /// use futures::stream::{self, StreamExt};
1306     ///
1307     /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1308     /// // Panic on second element
1309     /// let stream_panicking = stream.map(|o| o.unwrap());
1310     /// // Collect all the results
1311     /// let stream = stream_panicking.catch_unwind();
1312     ///
1313     /// let results: Vec<Result<i32, _>> = stream.collect().await;
1314     /// match results[0] {
1315     ///     Ok(10) => {}
1316     ///     _ => panic!("unexpected result!"),
1317     /// }
1318     /// assert!(results[1].is_err());
1319     /// assert_eq!(results.len(), 2);
1320     /// # });
1321     /// ```
1322     #[cfg(feature = "std")]
catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + std::panic::UnwindSafe,1323     fn catch_unwind(self) -> CatchUnwind<Self>
1324     where
1325         Self: Sized + std::panic::UnwindSafe,
1326     {
1327         assert_stream(CatchUnwind::new(self))
1328     }
1329 
1330     /// Wrap the stream in a Box, pinning it.
1331     ///
1332     /// This method is only available when the `std` or `alloc` feature of this
1333     /// library is activated, and it is activated by default.
1334     #[cfg(feature = "alloc")]
boxed<'a>(self) -> BoxStream<'a, Self::Item> where Self: Sized + Send + 'a,1335     fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
1336     where
1337         Self: Sized + Send + 'a,
1338     {
1339         assert_stream::<Self::Item, _>(Box::pin(self))
1340     }
1341 
1342     /// Wrap the stream in a Box, pinning it.
1343     ///
1344     /// Similar to `boxed`, but without the `Send` requirement.
1345     ///
1346     /// This method is only available when the `std` or `alloc` feature of this
1347     /// library is activated, and it is activated by default.
1348     #[cfg(feature = "alloc")]
boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item> where Self: Sized + 'a,1349     fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
1350     where
1351         Self: Sized + 'a,
1352     {
1353         assert_stream::<Self::Item, _>(Box::pin(self))
1354     }
1355 
1356     /// An adaptor for creating a buffered list of pending futures.
1357     ///
1358     /// If this stream's item can be converted into a future, then this adaptor
1359     /// will buffer up to at most `n` futures and then return the outputs in the
1360     /// same order as the underlying stream. No more than `n` futures will be
1361     /// buffered at any point in time, and less than `n` may also be buffered
1362     /// depending on the state of each future.
1363     ///
1364     /// The returned stream will be a stream of each future's output.
1365     ///
1366     /// This method is only available when the `std` or `alloc` feature of this
1367     /// library is activated, and it is activated by default.
1368     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1369     #[cfg(feature = "alloc")]
buffered(self, n: usize) -> Buffered<Self> where Self::Item: Future, Self: Sized,1370     fn buffered(self, n: usize) -> Buffered<Self>
1371     where
1372         Self::Item: Future,
1373         Self: Sized,
1374     {
1375         assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
1376     }
1377 
1378     /// An adaptor for creating a buffered list of pending futures (unordered).
1379     ///
1380     /// If this stream's item can be converted into a future, then this adaptor
1381     /// will buffer up to `n` futures and then return the outputs in the order
1382     /// in which they complete. No more than `n` futures will be buffered at
1383     /// any point in time, and less than `n` may also be buffered depending on
1384     /// the state of each future.
1385     ///
1386     /// The returned stream will be a stream of each future's output.
1387     ///
1388     /// This method is only available when the `std` or `alloc` feature of this
1389     /// library is activated, and it is activated by default.
1390     ///
1391     /// # Examples
1392     ///
1393     /// ```
1394     /// # futures::executor::block_on(async {
1395     /// use futures::channel::oneshot;
1396     /// use futures::stream::{self, StreamExt};
1397     ///
1398     /// let (send_one, recv_one) = oneshot::channel();
1399     /// let (send_two, recv_two) = oneshot::channel();
1400     ///
1401     /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1402     /// let mut buffered = stream_of_futures.buffer_unordered(10);
1403     ///
1404     /// send_two.send(2i32)?;
1405     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1406     ///
1407     /// send_one.send(1i32)?;
1408     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1409     ///
1410     /// assert_eq!(buffered.next().await, None);
1411     /// # Ok::<(), i32>(()) }).unwrap();
1412     /// ```
1413     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1414     #[cfg(feature = "alloc")]
buffer_unordered(self, n: usize) -> BufferUnordered<Self> where Self::Item: Future, Self: Sized,1415     fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
1416     where
1417         Self::Item: Future,
1418         Self: Sized,
1419     {
1420         assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
1421     }
1422 
1423     /// An adapter for zipping two streams together.
1424     ///
1425     /// The zipped stream waits for both streams to produce an item, and then
1426     /// returns that pair. If either stream ends then the zipped stream will
1427     /// also end.
1428     ///
1429     /// # Examples
1430     ///
1431     /// ```
1432     /// # futures::executor::block_on(async {
1433     /// use futures::stream::{self, StreamExt};
1434     ///
1435     /// let stream1 = stream::iter(1..=3);
1436     /// let stream2 = stream::iter(5..=10);
1437     ///
1438     /// let vec = stream1.zip(stream2)
1439     ///                  .collect::<Vec<_>>()
1440     ///                  .await;
1441     /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1442     /// # });
1443     /// ```
1444     ///
zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1445     fn zip<St>(self, other: St) -> Zip<Self, St>
1446     where
1447         St: Stream,
1448         Self: Sized,
1449     {
1450         assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1451     }
1452 
1453     /// Adapter for chaining two streams.
1454     ///
1455     /// The resulting stream emits elements from the first stream, and when
1456     /// first stream reaches the end, emits the elements from the second stream.
1457     ///
1458     /// ```
1459     /// # futures::executor::block_on(async {
1460     /// use futures::stream::{self, StreamExt};
1461     ///
1462     /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1463     /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1464     ///
1465     /// let stream = stream1.chain(stream2);
1466     ///
1467     /// let result: Vec<_> = stream.collect().await;
1468     /// assert_eq!(result, vec![
1469     ///     Ok(10),
1470     ///     Err(false),
1471     ///     Err(true),
1472     ///     Ok(20),
1473     /// ]);
1474     /// # });
1475     /// ```
chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1476     fn chain<St>(self, other: St) -> Chain<Self, St>
1477     where
1478         St: Stream<Item = Self::Item>,
1479         Self: Sized,
1480     {
1481         assert_stream::<Self::Item, _>(Chain::new(self, other))
1482     }
1483 
1484     /// Creates a new stream which exposes a `peek` method.
1485     ///
1486     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized,1487     fn peekable(self) -> Peekable<Self>
1488     where
1489         Self: Sized,
1490     {
1491         assert_stream::<Self::Item, _>(Peekable::new(self))
1492     }
1493 
1494     /// An adaptor for chunking up items of the stream inside a vector.
1495     ///
1496     /// This combinator will attempt to pull items from this stream and buffer
1497     /// them into a local vector. At most `capacity` items will get buffered
1498     /// before they're yielded from the returned stream.
1499     ///
    /// Note that the vectors returned from this stream may not always have
1501     /// `capacity` elements. If the underlying stream ended and only a partial
1502     /// vector was created, it'll be returned. Additionally if an error happens
1503     /// from the underlying stream then the currently buffered items will be
1504     /// yielded.
1505     ///
1506     /// This method is only available when the `std` or `alloc` feature of this
1507     /// library is activated, and it is activated by default.
1508     ///
1509     /// # Panics
1510     ///
1511     /// This method will panic if `capacity` is zero.
1512     #[cfg(feature = "alloc")]
chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,1513     fn chunks(self, capacity: usize) -> Chunks<Self>
1514     where
1515         Self: Sized,
1516     {
1517         assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
1518     }
1519 
1520     /// An adaptor for chunking up ready items of the stream inside a vector.
1521     ///
1522     /// This combinator will attempt to pull ready items from this stream and
1523     /// buffer them into a local vector. At most `capacity` items will get
1524     /// buffered before they're yielded from the returned stream. If underlying
1525     /// stream returns `Poll::Pending`, and collected chunk is not empty, it will
1526     /// be immediately returned.
1527     ///
1528     /// If the underlying stream ended and only a partial vector was created,
1529     /// it will be returned.
1530     ///
1531     /// This method is only available when the `std` or `alloc` feature of this
1532     /// library is activated, and it is activated by default.
1533     ///
1534     /// # Panics
1535     ///
1536     /// This method will panic if `capacity` is zero.
1537     #[cfg(feature = "alloc")]
ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,1538     fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
1539     where
1540         Self: Sized,
1541     {
1542         assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
1543     }
1544 
1545     /// A future that completes after the given stream has been fully processed
1546     /// into the sink and the sink has been flushed and closed.
1547     ///
1548     /// This future will drive the stream to keep producing items until it is
1549     /// exhausted, sending each item to the sink. It will complete once the
1550     /// stream is exhausted, the sink has received and flushed all items, and
1551     /// the sink is closed. Note that neither the original stream nor provided
1552     /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1553     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1554     /// order to preserve access to the `Sink`. If the stream produces an error,
1555     /// that error will be returned by this future without flushing/closing the sink.
1556     #[cfg(feature = "sink")]
1557     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    fn forward<S>(self, sink: S) -> Forward<Self, S>
    where
        S: Sink<Self::Ok, Error = Self::Error>,
        Self: TryStream + Sized,
        // Candidate stricter bound, currently disabled:
        // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
    {
        // TODO: the result is returned without the `assert_future` wrapper that
        // `for_each` uses. The disabled call below apparently fails with:
        // type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
        // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
        Forward::new(self, sink)
    }
1568 
1569     /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1570     /// objects.
1571     ///
1572     /// This can be useful when you want to split ownership between tasks, or
1573     /// allow direct interaction between the two objects (e.g. via
1574     /// `Sink::send_all`).
1575     ///
1576     /// This method is only available when the `std` or `alloc` feature of this
1577     /// library is activated, and it is activated by default.
1578     #[cfg(feature = "sink")]
1579     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
1580     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1581     #[cfg(feature = "alloc")]
split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where Self: Sink<Item> + Sized,1582     fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
1583     where
1584         Self: Sink<Item> + Sized,
1585     {
1586         let (sink, stream) = split::split(self);
1587         (
1588             crate::sink::assert_sink::<Item, Self::Error, _>(sink),
1589             assert_stream::<Self::Item, _>(stream),
1590         )
1591     }
1592 
1593     /// Do something with each item of this stream, afterwards passing it on.
1594     ///
1595     /// This is similar to the `Iterator::inspect` method in the standard
1596     /// library where it allows easily inspecting each value as it passes
1597     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1598     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1599     where
1600         F: FnMut(&Self::Item),
1601         Self: Sized,
1602     {
1603         assert_stream::<Self::Item, _>(Inspect::new(self, f))
1604     }
1605 
1606     /// Wrap this stream in an `Either` stream, making it the left-hand variant
1607     /// of that `Either`.
1608     ///
1609     /// This can be used in combination with the `right_stream` method to write `if`
1610     /// statements that evaluate to different streams in different branches.
left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1611     fn left_stream<B>(self) -> Either<Self, B>
1612     where
1613         B: Stream<Item = Self::Item>,
1614         Self: Sized,
1615     {
1616         assert_stream::<Self::Item, _>(Either::Left(self))
1617     }
1618 
1619     /// Wrap this stream in an `Either` stream, making it the right-hand variant
1620     /// of that `Either`.
1621     ///
1622     /// This can be used in combination with the `left_stream` method to write `if`
1623     /// statements that evaluate to different streams in different branches.
right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1624     fn right_stream<B>(self) -> Either<B, Self>
1625     where
1626         B: Stream<Item = Self::Item>,
1627         Self: Sized,
1628     {
1629         assert_stream::<Self::Item, _>(Either::Right(self))
1630     }
1631 
1632     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1633     /// stream types.
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1634     fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1635     where
1636         Self: Unpin,
1637     {
1638         Pin::new(self).poll_next(cx)
1639     }
1640 
1641     /// Returns a [`Future`] that resolves when the next item in this stream is
1642     /// ready.
1643     ///
1644     /// This is similar to the [`next`][StreamExt::next] method, but it won't
1645     /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1646     /// returned future type will return `true` from
1647     /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1648     /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1649     /// the [`select!`] macro.
1650     ///
1651     /// If the future is polled after this [`Stream`] is empty it will panic.
1652     /// Using the future with a [`FusedFuture`][]-aware primitive like the
1653     /// [`select!`] macro will prevent this.
1654     ///
1655     /// [`FusedFuture`]: futures_core::future::FusedFuture
1656     /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1657     ///
1658     /// # Examples
1659     ///
1660     /// ```
1661     /// # futures::executor::block_on(async {
1662     /// use futures::{future, select};
1663     /// use futures::stream::{StreamExt, FuturesUnordered};
1664     ///
1665     /// let mut fut = future::ready(1);
1666     /// let mut async_tasks = FuturesUnordered::new();
1667     /// let mut total = 0;
1668     /// loop {
1669     ///     select! {
1670     ///         num = fut => {
1671     ///             // First, the `ready` future completes.
1672     ///             total += num;
1673     ///             // Then we spawn a new task onto `async_tasks`,
1674     ///             async_tasks.push(async { 5 });
1675     ///         },
1676     ///         // On the next iteration of the loop, the task we spawned
1677     ///         // completes.
1678     ///         num = async_tasks.select_next_some() => {
1679     ///             total += num;
1680     ///         }
1681     ///         // Finally, both the `ready` future and `async_tasks` have
1682     ///         // finished, so we enter the `complete` branch.
1683     ///         complete => break,
1684     ///     }
1685     /// }
1686     /// assert_eq!(total, 6);
1687     /// # });
1688     /// ```
1689     ///
1690     /// [`select!`]: crate::select
select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1691     fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1692     where
1693         Self: Unpin + FusedStream,
1694     {
1695         assert_future::<Self::Item, _>(SelectNextSome::new(self))
1696     }
1697 }
1698