1 //! Streams
2 //!
//! This module contains a number of functions for working with `Stream`s
4 //! that return `Result`s, allowing for short-circuiting computations.
5 
6 #[cfg(feature = "compat")]
7 use crate::compat::Compat;
8 use crate::fns::{
9     inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10     IntoFn, MapErrFn, MapOkFn,
11 };
12 use crate::future::assert_future;
13 use crate::stream::assert_stream;
14 use crate::stream::{Inspect, Map};
15 #[cfg(feature = "alloc")]
16 use alloc::vec::Vec;
17 use core::pin::Pin;
18 
19 use futures_core::{
20     future::{Future, TryFuture},
21     stream::TryStream,
22     task::{Context, Poll},
23 };
24 
25 mod and_then;
26 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
27 pub use self::and_then::AndThen;
28 
29 delegate_all!(
30     /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
31     ErrInto<St, E>(
32         MapErr<St, IntoFn<E>>
33     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
34 );
35 
36 delegate_all!(
37     /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
38     InspectOk<St, F>(
39         Inspect<IntoStream<St>, InspectOkFn<F>>
40     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
41 );
42 
43 delegate_all!(
44     /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
45     InspectErr<St, F>(
46         Inspect<IntoStream<St>, InspectErrFn<F>>
47     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
48 );
49 
50 mod into_stream;
51 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
52 pub use self::into_stream::IntoStream;
53 
54 delegate_all!(
55     /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
56     MapOk<St, F>(
57         Map<IntoStream<St>, MapOkFn<F>>
58     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
59 );
60 
61 delegate_all!(
62     /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
63     MapErr<St, F>(
64         Map<IntoStream<St>, MapErrFn<F>>
65     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
66 );
67 
68 mod or_else;
69 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
70 pub use self::or_else::OrElse;
71 
72 mod try_next;
73 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
74 pub use self::try_next::TryNext;
75 
76 mod try_for_each;
77 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
78 pub use self::try_for_each::TryForEach;
79 
80 mod try_filter;
81 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
82 pub use self::try_filter::TryFilter;
83 
84 mod try_filter_map;
85 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
86 pub use self::try_filter_map::TryFilterMap;
87 
88 mod try_flatten;
89 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
90 pub use self::try_flatten::TryFlatten;
91 
92 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
93 #[cfg(feature = "alloc")]
94 mod try_flatten_unordered;
95 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
96 #[cfg(feature = "alloc")]
97 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
98 pub use self::try_flatten_unordered::TryFlattenUnordered;
99 
100 mod try_collect;
101 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
102 pub use self::try_collect::TryCollect;
103 
104 mod try_concat;
105 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
106 pub use self::try_concat::TryConcat;
107 
108 #[cfg(feature = "alloc")]
109 mod try_chunks;
110 #[cfg(feature = "alloc")]
111 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
112 pub use self::try_chunks::{TryChunks, TryChunksError};
113 
114 #[cfg(feature = "alloc")]
115 mod try_ready_chunks;
116 #[cfg(feature = "alloc")]
117 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
118 pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};
119 
120 mod try_fold;
121 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
122 pub use self::try_fold::TryFold;
123 
124 mod try_unfold;
125 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126 pub use self::try_unfold::{try_unfold, TryUnfold};
127 
128 mod try_skip_while;
129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130 pub use self::try_skip_while::TrySkipWhile;
131 
132 mod try_take_while;
133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134 pub use self::try_take_while::TryTakeWhile;
135 
136 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
137 #[cfg(feature = "alloc")]
138 mod try_buffer_unordered;
139 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
140 #[cfg(feature = "alloc")]
141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142 pub use self::try_buffer_unordered::TryBufferUnordered;
143 
144 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
145 #[cfg(feature = "alloc")]
146 mod try_buffered;
147 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
148 #[cfg(feature = "alloc")]
149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150 pub use self::try_buffered::TryBuffered;
151 
152 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
153 #[cfg(feature = "alloc")]
154 mod try_for_each_concurrent;
155 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
156 #[cfg(feature = "alloc")]
157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
158 pub use self::try_for_each_concurrent::TryForEachConcurrent;
159 
160 #[cfg(feature = "io")]
161 #[cfg(feature = "std")]
162 mod into_async_read;
163 #[cfg(feature = "io")]
164 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
165 #[cfg(feature = "std")]
166 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
167 pub use self::into_async_read::IntoAsyncRead;
168 
169 mod try_all;
170 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
171 pub use self::try_all::TryAll;
172 
173 mod try_any;
174 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
175 pub use self::try_any::TryAny;
176 
177 impl<S: ?Sized + TryStream> TryStreamExt for S {}
178 
179 /// Adapters specific to `Result`-returning streams
180 pub trait TryStreamExt: TryStream {
181     /// Wraps the current stream in a new stream which converts the error type
182     /// into the one provided.
183     ///
184     /// # Examples
185     ///
186     /// ```
187     /// # futures::executor::block_on(async {
188     /// use futures::stream::{self, TryStreamExt};
189     ///
190     /// let mut stream =
191     ///     stream::iter(vec![Ok(()), Err(5i32)])
192     ///         .err_into::<i64>();
193     ///
194     /// assert_eq!(stream.try_next().await, Ok(Some(())));
195     /// assert_eq!(stream.try_next().await, Err(5i64));
196     /// # })
197     /// ```
err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,198     fn err_into<E>(self) -> ErrInto<Self, E>
199     where
200         Self: Sized,
201         Self::Error: Into<E>,
202     {
203         assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
204     }
205 
206     /// Wraps the current stream in a new stream which maps the success value
207     /// using the provided closure.
208     ///
209     /// # Examples
210     ///
211     /// ```
212     /// # futures::executor::block_on(async {
213     /// use futures::stream::{self, TryStreamExt};
214     ///
215     /// let mut stream =
216     ///     stream::iter(vec![Ok(5), Err(0)])
217     ///         .map_ok(|x| x + 2);
218     ///
219     /// assert_eq!(stream.try_next().await, Ok(Some(7)));
220     /// assert_eq!(stream.try_next().await, Err(0));
221     /// # })
222     /// ```
map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,223     fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
224     where
225         Self: Sized,
226         F: FnMut(Self::Ok) -> T,
227     {
228         assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
229     }
230 
231     /// Wraps the current stream in a new stream which maps the error value
232     /// using the provided closure.
233     ///
234     /// # Examples
235     ///
236     /// ```
237     /// # futures::executor::block_on(async {
238     /// use futures::stream::{self, TryStreamExt};
239     ///
240     /// let mut stream =
241     ///     stream::iter(vec![Ok(5), Err(0)])
242     ///         .map_err(|x| x + 2);
243     ///
244     /// assert_eq!(stream.try_next().await, Ok(Some(5)));
245     /// assert_eq!(stream.try_next().await, Err(2));
246     /// # })
247     /// ```
map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,248     fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
249     where
250         Self: Sized,
251         F: FnMut(Self::Error) -> E,
252     {
253         assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
254     }
255 
256     /// Chain on a computation for when a value is ready, passing the successful
257     /// results to the provided closure `f`.
258     ///
259     /// This function can be used to run a unit of work when the next successful
260     /// value on a stream is ready. The closure provided will be yielded a value
261     /// when ready, and the returned future will then be run to completion to
262     /// produce the next value on this stream.
263     ///
264     /// Any errors produced by this stream will not be passed to the closure,
265     /// and will be passed through.
266     ///
267     /// The returned value of the closure must implement the `TryFuture` trait
268     /// and can represent some more work to be done before the composed stream
269     /// is finished.
270     ///
271     /// Note that this function consumes the receiving stream and returns a
272     /// wrapped version of it.
273     ///
274     /// To process the entire stream and return a single future representing
275     /// success or error, use `try_for_each` instead.
276     ///
277     /// # Examples
278     ///
279     /// ```
280     /// use futures::channel::mpsc;
281     /// use futures::future;
282     /// use futures::stream::TryStreamExt;
283     ///
284     /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
285     ///
286     /// let rx = rx.and_then(|result| {
287     ///     future::ok(if result % 2 == 0 {
288     ///         Some(result)
289     ///     } else {
290     ///         None
291     ///     })
292     /// });
293     /// ```
and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,294     fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
295     where
296         F: FnMut(Self::Ok) -> Fut,
297         Fut: TryFuture<Error = Self::Error>,
298         Self: Sized,
299     {
300         assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
301     }
302 
303     /// Chain on a computation for when an error happens, passing the
304     /// erroneous result to the provided closure `f`.
305     ///
306     /// This function can be used to run a unit of work and attempt to recover from
307     /// an error if one happens. The closure provided will be yielded an error
308     /// when one appears, and the returned future will then be run to completion
309     /// to produce the next value on this stream.
310     ///
311     /// Any successful values produced by this stream will not be passed to the
312     /// closure, and will be passed through.
313     ///
314     /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
315     /// and can represent some more work to be done before the composed stream
316     /// is finished.
317     ///
318     /// Note that this function consumes the receiving stream and returns a
319     /// wrapped version of it.
or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,320     fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
321     where
322         F: FnMut(Self::Error) -> Fut,
323         Fut: TryFuture<Ok = Self::Ok>,
324         Self: Sized,
325     {
326         assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
327     }
328 
329     /// Do something with the success value of this stream, afterwards passing
330     /// it on.
331     ///
332     /// This is similar to the `StreamExt::inspect` method where it allows
333     /// easily inspecting the success value as it passes through the stream, for
334     /// example to debug what's going on.
inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,335     fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
336     where
337         F: FnMut(&Self::Ok),
338         Self: Sized,
339     {
340         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
341     }
342 
343     /// Do something with the error value of this stream, afterwards passing it on.
344     ///
345     /// This is similar to the `StreamExt::inspect` method where it allows
346     /// easily inspecting the error value as it passes through the stream, for
347     /// example to debug what's going on.
inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,348     fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
349     where
350         F: FnMut(&Self::Error),
351         Self: Sized,
352     {
353         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
354     }
355 
356     /// Wraps a [`TryStream`] into a type that implements
357     /// [`Stream`](futures_core::stream::Stream)
358     ///
359     /// [`TryStream`]s currently do not implement the
360     /// [`Stream`](futures_core::stream::Stream) trait because of limitations
361     /// of the compiler.
362     ///
363     /// # Examples
364     ///
365     /// ```
366     /// use futures::stream::{Stream, TryStream, TryStreamExt};
367     ///
368     /// # type T = i32;
369     /// # type E = ();
370     /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
371     /// # futures::stream::empty()
372     /// # }
373     /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
374     ///
375     /// take_stream(make_try_stream().into_stream());
376     /// ```
into_stream(self) -> IntoStream<Self> where Self: Sized,377     fn into_stream(self) -> IntoStream<Self>
378     where
379         Self: Sized,
380     {
381         assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
382     }
383 
384     /// Creates a future that attempts to resolve the next item in the stream.
385     /// If an error is encountered before the next item, the error is returned
386     /// instead.
387     ///
388     /// This is similar to the `Stream::next` combinator, but returns a
389     /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
390     /// for easy use with the `?` operator.
391     ///
392     /// # Examples
393     ///
394     /// ```
395     /// # futures::executor::block_on(async {
396     /// use futures::stream::{self, TryStreamExt};
397     ///
398     /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
399     ///
400     /// assert_eq!(stream.try_next().await, Ok(Some(())));
401     /// assert_eq!(stream.try_next().await, Err(()));
402     /// # })
403     /// ```
try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,404     fn try_next(&mut self) -> TryNext<'_, Self>
405     where
406         Self: Unpin,
407     {
408         assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
409     }
410 
411     /// Attempts to run this stream to completion, executing the provided
412     /// asynchronous closure for each element on the stream.
413     ///
414     /// The provided closure will be called for each item this stream produces,
415     /// yielding a future. That future will then be executed to completion
416     /// before moving on to the next item.
417     ///
418     /// The returned value is a [`Future`](futures_core::future::Future) where the
419     /// [`Output`](futures_core::future::Future::Output) type is
420     /// `Result<(), Self::Error>`. If any of the intermediate
421     /// futures or the stream returns an error, this future will return
422     /// immediately with an error.
423     ///
424     /// # Examples
425     ///
426     /// ```
427     /// # futures::executor::block_on(async {
428     /// use futures::future;
429     /// use futures::stream::{self, TryStreamExt};
430     ///
431     /// let mut x = 0i32;
432     ///
433     /// {
434     ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
435     ///         x += item;
436     ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
437     ///     });
438     ///     assert_eq!(fut.await, Err(()));
439     /// }
440     ///
441     /// assert_eq!(x, 3);
442     /// # })
443     /// ```
try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,444     fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
445     where
446         F: FnMut(Self::Ok) -> Fut,
447         Fut: TryFuture<Ok = (), Error = Self::Error>,
448         Self: Sized,
449     {
450         assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
451     }
452 
453     /// Skip elements on this stream while the provided asynchronous predicate
454     /// resolves to `true`.
455     ///
456     /// This function is similar to
457     /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
458     /// early if an error occurs.
459     ///
460     /// # Examples
461     ///
462     /// ```
463     /// # futures::executor::block_on(async {
464     /// use futures::future;
465     /// use futures::stream::{self, TryStreamExt};
466     ///
467     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
468     /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
469     ///
470     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
471     /// assert_eq!(output, Ok(vec![3, 2]));
472     /// # })
473     /// ```
try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,474     fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
475     where
476         F: FnMut(&Self::Ok) -> Fut,
477         Fut: TryFuture<Ok = bool, Error = Self::Error>,
478         Self: Sized,
479     {
480         assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
481     }
482 
483     /// Take elements on this stream while the provided asynchronous predicate
484     /// resolves to `true`.
485     ///
486     /// This function is similar to
487     /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
488     /// early if an error occurs.
489     ///
490     /// # Examples
491     ///
492     /// ```
493     /// # futures::executor::block_on(async {
494     /// use futures::future;
495     /// use futures::stream::{self, TryStreamExt};
496     ///
497     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
498     /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
499     ///
500     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
501     /// assert_eq!(output, Ok(vec![1, 2]));
502     /// # })
503     /// ```
try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,504     fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
505     where
506         F: FnMut(&Self::Ok) -> Fut,
507         Fut: TryFuture<Ok = bool, Error = Self::Error>,
508         Self: Sized,
509     {
510         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
511     }
512 
513     /// Attempts to run this stream to completion, executing the provided asynchronous
514     /// closure for each element on the stream concurrently as elements become
515     /// available, exiting as soon as an error occurs.
516     ///
517     /// This is similar to
518     /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
519     /// but will resolve to an error immediately if the underlying stream or the provided
520     /// closure return an error.
521     ///
522     /// This method is only available when the `std` or `alloc` feature of this
523     /// library is activated, and it is activated by default.
524     ///
525     /// # Examples
526     ///
527     /// ```
528     /// # futures::executor::block_on(async {
529     /// use futures::channel::oneshot;
530     /// use futures::stream::{self, StreamExt, TryStreamExt};
531     ///
532     /// let (tx1, rx1) = oneshot::channel();
533     /// let (tx2, rx2) = oneshot::channel();
534     /// let (_tx3, rx3) = oneshot::channel();
535     ///
536     /// let stream = stream::iter(vec![rx1, rx2, rx3]);
537     /// let fut = stream.map(Ok).try_for_each_concurrent(
538     ///     /* limit */ 2,
539     ///     |rx| async move {
540     ///         let res: Result<(), oneshot::Canceled> = rx.await;
541     ///         res
542     ///     }
543     /// );
544     ///
545     /// tx1.send(()).unwrap();
546     /// // Drop the second sender so that `rx2` resolves to `Canceled`.
547     /// drop(tx2);
548     ///
549     /// // The final result is an error because the second future
550     /// // resulted in an error.
551     /// assert_eq!(Err(oneshot::Canceled), fut.await);
552     /// # })
553     /// ```
554     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
555     #[cfg(feature = "alloc")]
try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,556     fn try_for_each_concurrent<Fut, F>(
557         self,
558         limit: impl Into<Option<usize>>,
559         f: F,
560     ) -> TryForEachConcurrent<Self, Fut, F>
561     where
562         F: FnMut(Self::Ok) -> Fut,
563         Fut: Future<Output = Result<(), Self::Error>>,
564         Self: Sized,
565     {
566         assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
567             self,
568             limit.into(),
569             f,
570         ))
571     }
572 
573     /// Attempt to transform a stream into a collection,
574     /// returning a future representing the result of that computation.
575     ///
576     /// This combinator will collect all successful results of this stream and
577     /// collect them into the specified collection type. If an error happens then all
578     /// collected elements will be dropped and the error will be returned.
579     ///
580     /// The returned future will be resolved when the stream terminates.
581     ///
582     /// # Examples
583     ///
584     /// ```
585     /// # futures::executor::block_on(async {
586     /// use futures::channel::mpsc;
587     /// use futures::stream::TryStreamExt;
588     /// use std::thread;
589     ///
590     /// let (tx, rx) = mpsc::unbounded();
591     ///
592     /// thread::spawn(move || {
593     ///     for i in 1..=5 {
594     ///         tx.unbounded_send(Ok(i)).unwrap();
595     ///     }
596     ///     tx.unbounded_send(Err(6)).unwrap();
597     /// });
598     ///
599     /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
600     /// assert_eq!(output, Err(6));
601     /// # })
602     /// ```
try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,603     fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
604     where
605         Self: Sized,
606     {
607         assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
608     }
609 
610     /// An adaptor for chunking up successful items of the stream inside a vector.
611     ///
612     /// This combinator will attempt to pull successful items from this stream and buffer
613     /// them into a local vector. At most `capacity` items will get buffered
614     /// before they're yielded from the returned stream.
615     ///
    /// Note that the vectors returned from this stream may not always have
617     /// `capacity` elements. If the underlying stream ended and only a partial
618     /// vector was created, it'll be returned. Additionally if an error happens
619     /// from the underlying stream then the currently buffered items will be
620     /// yielded.
621     ///
622     /// This method is only available when the `std` or `alloc` feature of this
623     /// library is activated, and it is activated by default.
624     ///
625     /// This function is similar to
626     /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
627     /// early if an error occurs.
628     ///
629     /// # Examples
630     ///
631     /// ```
632     /// # futures::executor::block_on(async {
633     /// use futures::stream::{self, TryChunksError, TryStreamExt};
634     ///
635     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
636     /// let mut stream = stream.try_chunks(2);
637     ///
638     /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
639     /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
640     /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
641     /// # })
642     /// ```
643     ///
644     /// # Panics
645     ///
646     /// This method will panic if `capacity` is zero.
647     #[cfg(feature = "alloc")]
try_chunks(self, capacity: usize) -> TryChunks<Self> where Self: Sized,648     fn try_chunks(self, capacity: usize) -> TryChunks<Self>
649     where
650         Self: Sized,
651     {
652         assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
653             TryChunks::new(self, capacity),
654         )
655     }
656 
657     /// An adaptor for chunking up successful, ready items of the stream inside a vector.
658     ///
659     /// This combinator will attempt to pull successful items from this stream and buffer
660     /// them into a local vector. At most `capacity` items will get buffered
661     /// before they're yielded from the returned stream. If the underlying stream
662     /// returns `Poll::Pending`, and the collected chunk is not empty, it will
663     /// be immediately returned.
664     ///
    /// Note that the vectors returned from this stream may not always have
666     /// `capacity` elements. If the underlying stream ended and only a partial
667     /// vector was created, it'll be returned. Additionally if an error happens
668     /// from the underlying stream then the currently buffered items will be
669     /// yielded.
670     ///
671     /// This method is only available when the `std` or `alloc` feature of this
672     /// library is activated, and it is activated by default.
673     ///
674     /// This function is similar to
675     /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits
676     /// early if an error occurs.
677     ///
678     /// # Examples
679     ///
680     /// ```
681     /// # futures::executor::block_on(async {
682     /// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
683     ///
684     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
685     /// let mut stream = stream.try_ready_chunks(2);
686     ///
687     /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
688     /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
689     /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
690     /// # })
691     /// ```
692     ///
693     /// # Panics
694     ///
695     /// This method will panic if `capacity` is zero.
696     #[cfg(feature = "alloc")]
try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self> where Self: Sized,697     fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
698     where
699         Self: Sized,
700     {
701         assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(
702             TryReadyChunks::new(self, capacity),
703         )
704     }
705 
706     /// Attempt to filter the values produced by this stream according to the
707     /// provided asynchronous closure.
708     ///
709     /// As values of this stream are made available, the provided predicate `f`
710     /// will be run on them. If the predicate returns a `Future` which resolves
711     /// to `true`, then the stream will yield the value, but if the predicate
    /// returns a `Future` which resolves to `false`, then the value will be
713     /// discarded and the next value will be produced.
714     ///
715     /// All errors are passed through without filtering in this combinator.
716     ///
717     /// Note that this function consumes the stream passed into it and returns a
718     /// wrapped version of it, similar to the existing `filter` methods in
719     /// the standard library.
720     ///
721     /// # Examples
722     /// ```
723     /// # futures::executor::block_on(async {
724     /// use futures::future;
725     /// use futures::stream::{self, StreamExt, TryStreamExt};
726     ///
727     /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
728     /// let mut evens = stream.try_filter(|x| {
729     ///     future::ready(x % 2 == 0)
730     /// });
731     ///
732     /// assert_eq!(evens.next().await, Some(Ok(2)));
733     /// assert_eq!(evens.next().await, Some(Err("error")));
734     /// # })
735     /// ```
try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,736     fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
737     where
738         Fut: Future<Output = bool>,
739         F: FnMut(&Self::Ok) -> Fut,
740         Self: Sized,
741     {
742         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
743     }
744 
745     /// Attempt to filter the values produced by this stream while
746     /// simultaneously mapping them to a different type according to the
747     /// provided asynchronous closure.
748     ///
749     /// As values of this stream are made available, the provided function will
750     /// be run on them. If the future returned by the predicate `f` resolves to
751     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
752     /// it resolves to [`None`] then the next value will be produced.
753     ///
754     /// All errors are passed through without filtering in this combinator.
755     ///
756     /// Note that this function consumes the stream passed into it and returns a
757     /// wrapped version of it, similar to the existing `filter_map` methods in
758     /// the standard library.
759     ///
760     /// # Examples
761     /// ```
762     /// # futures::executor::block_on(async {
763     /// use futures::stream::{self, StreamExt, TryStreamExt};
764     /// use futures::pin_mut;
765     ///
766     /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
767     /// let halves = stream.try_filter_map(|x| async move {
768     ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
769     ///     Ok(ret)
770     /// });
771     ///
772     /// pin_mut!(halves);
773     /// assert_eq!(halves.next().await, Some(Ok(3)));
774     /// assert_eq!(halves.next().await, Some(Err("error")));
775     /// # })
776     /// ```
try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,777     fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
778     where
779         Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
780         F: FnMut(Self::Ok) -> Fut,
781         Self: Sized,
782     {
783         assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
784     }
785 
786     /// Flattens a stream of streams into just one continuous stream. Produced streams
787     /// will be polled concurrently and any errors will be passed through without looking at them.
788     /// If the underlying base stream returns an error, it will be **immediately** propagated.
789     ///
790     /// The only argument is an optional limit on the number of concurrently
791     /// polled streams. If this limit is not `None`, no more than `limit` streams
792     /// will be polled at the same time. The `limit` argument is of type
793     /// `Into<Option<usize>>`, and so can be provided as either `None`,
794     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
795     /// no limit at all, and will have the same result as passing in `None`.
796     ///
797     /// # Examples
798     ///
799     /// ```
800     /// # futures::executor::block_on(async {
801     /// use futures::channel::mpsc;
802     /// use futures::stream::{StreamExt, TryStreamExt};
803     /// use std::thread;
804     ///
805     /// let (tx1, rx1) = mpsc::unbounded();
806     /// let (tx2, rx2) = mpsc::unbounded();
807     /// let (tx3, rx3) = mpsc::unbounded();
808     ///
809     /// thread::spawn(move || {
810     ///     tx1.unbounded_send(Ok(1)).unwrap();
811     /// });
812     /// thread::spawn(move || {
813     ///     tx2.unbounded_send(Ok(2)).unwrap();
814     ///     tx2.unbounded_send(Err(3)).unwrap();
815     ///     tx2.unbounded_send(Ok(4)).unwrap();
816     /// });
817     /// thread::spawn(move || {
818     ///     tx3.unbounded_send(Ok(rx1)).unwrap();
819     ///     tx3.unbounded_send(Ok(rx2)).unwrap();
820     ///     tx3.unbounded_send(Err(5)).unwrap();
821     /// });
822     ///
823     /// let stream = rx3.try_flatten_unordered(None);
824     /// let mut values: Vec<_> = stream.collect().await;
825     /// values.sort();
826     ///
827     /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
828     /// # });
829     /// ```
830     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
831     #[cfg(feature = "alloc")]
try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> where Self::Ok: TryStream + Unpin, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,832     fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
833     where
834         Self::Ok: TryStream + Unpin,
835         <Self::Ok as TryStream>::Error: From<Self::Error>,
836         Self: Sized,
837     {
838         assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
839             TryFlattenUnordered::new(self, limit),
840         )
841     }
842 
843     /// Flattens a stream of streams into just one continuous stream.
844     ///
845     /// If this stream's elements are themselves streams then this combinator
846     /// will flatten out the entire stream to one long chain of elements. Any
847     /// errors are passed through without looking at them, but otherwise each
848     /// individual stream will get exhausted before moving on to the next.
849     ///
850     /// # Examples
851     ///
852     /// ```
853     /// # futures::executor::block_on(async {
854     /// use futures::channel::mpsc;
855     /// use futures::stream::{StreamExt, TryStreamExt};
856     /// use std::thread;
857     ///
858     /// let (tx1, rx1) = mpsc::unbounded();
859     /// let (tx2, rx2) = mpsc::unbounded();
860     /// let (tx3, rx3) = mpsc::unbounded();
861     ///
862     /// thread::spawn(move || {
863     ///     tx1.unbounded_send(Ok(1)).unwrap();
864     /// });
865     /// thread::spawn(move || {
866     ///     tx2.unbounded_send(Ok(2)).unwrap();
867     ///     tx2.unbounded_send(Err(3)).unwrap();
868     ///     tx2.unbounded_send(Ok(4)).unwrap();
869     /// });
870     /// thread::spawn(move || {
871     ///     tx3.unbounded_send(Ok(rx1)).unwrap();
872     ///     tx3.unbounded_send(Ok(rx2)).unwrap();
873     ///     tx3.unbounded_send(Err(5)).unwrap();
874     /// });
875     ///
876     /// let mut stream = rx3.try_flatten();
877     /// assert_eq!(stream.next().await, Some(Ok(1)));
878     /// assert_eq!(stream.next().await, Some(Ok(2)));
879     /// assert_eq!(stream.next().await, Some(Err(3)));
880     /// assert_eq!(stream.next().await, Some(Ok(4)));
881     /// assert_eq!(stream.next().await, Some(Err(5)));
882     /// assert_eq!(stream.next().await, None);
883     /// # });
884     /// ```
try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,885     fn try_flatten(self) -> TryFlatten<Self>
886     where
887         Self::Ok: TryStream,
888         <Self::Ok as TryStream>::Error: From<Self::Error>,
889         Self: Sized,
890     {
891         assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
892             TryFlatten::new(self),
893         )
894     }
895 
896     /// Attempt to execute an accumulating asynchronous computation over a
897     /// stream, collecting all the values into one final result.
898     ///
899     /// This combinator will accumulate all values returned by this stream
900     /// according to the closure provided. The initial state is also provided to
901     /// this method and then is returned again by each execution of the closure.
902     /// Once the entire stream has been exhausted the returned future will
903     /// resolve to this value.
904     ///
905     /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
906     /// exit early if an error is encountered in either the stream or the
907     /// provided closure.
908     ///
909     /// # Examples
910     ///
911     /// ```
912     /// # futures::executor::block_on(async {
913     /// use futures::stream::{self, TryStreamExt};
914     ///
915     /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
916     /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
917     /// assert_eq!(sum.await, Ok(3));
918     ///
919     /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
920     /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
921     /// assert_eq!(sum.await, Err(2));
922     /// # })
923     /// ```
try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,924     fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
925     where
926         F: FnMut(T, Self::Ok) -> Fut,
927         Fut: TryFuture<Ok = T, Error = Self::Error>,
928         Self: Sized,
929     {
930         assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
931     }
932 
933     /// Attempt to concatenate all items of a stream into a single
934     /// extendable destination, returning a future representing the end result.
935     ///
936     /// This combinator will extend the first item with the contents of all
937     /// the subsequent successful results of the stream. If the stream is empty,
938     /// the default value will be returned.
939     ///
940     /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
941     ///
942     /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
943     /// exit early if an error is encountered in the stream.
944     ///
945     /// # Examples
946     ///
947     /// ```
948     /// # futures::executor::block_on(async {
949     /// use futures::channel::mpsc;
950     /// use futures::stream::TryStreamExt;
951     /// use std::thread;
952     ///
953     /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
954     ///
955     /// thread::spawn(move || {
956     ///     for i in (0..3).rev() {
957     ///         let n = i * 3;
958     ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
959     ///     }
960     /// });
961     ///
962     /// let result = rx.try_concat().await;
963     ///
964     /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
965     /// # });
966     /// ```
try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,967     fn try_concat(self) -> TryConcat<Self>
968     where
969         Self: Sized,
970         Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
971     {
972         assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
973     }
974 
975     /// Attempt to execute several futures from a stream concurrently (unordered).
976     ///
977     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
978     /// that matches the stream's `Error` type.
979     ///
980     /// This adaptor will buffer up to `n` futures and then return their
981     /// outputs in the order in which they complete. If the underlying stream
982     /// returns an error, it will be immediately propagated.
983     ///
984     /// The returned stream will be a stream of results, each containing either
985     /// an error or a future's output. An error can be produced either by the
986     /// underlying stream itself or by one of the futures it yielded.
987     ///
988     /// This method is only available when the `std` or `alloc` feature of this
989     /// library is activated, and it is activated by default.
990     ///
991     /// # Examples
992     ///
993     /// Results are returned in the order of completion:
994     /// ```
995     /// # futures::executor::block_on(async {
996     /// use futures::channel::oneshot;
997     /// use futures::stream::{self, StreamExt, TryStreamExt};
998     ///
999     /// let (send_one, recv_one) = oneshot::channel();
1000     /// let (send_two, recv_two) = oneshot::channel();
1001     ///
1002     /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1003     ///
1004     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1005     ///
1006     /// send_two.send(2i32)?;
1007     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1008     ///
1009     /// send_one.send(1i32)?;
1010     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1011     ///
1012     /// assert_eq!(buffered.next().await, None);
1013     /// # Ok::<(), i32>(()) }).unwrap();
1014     /// ```
1015     ///
1016     /// Errors from the underlying stream itself are propagated:
1017     /// ```
1018     /// # futures::executor::block_on(async {
1019     /// use futures::channel::mpsc;
1020     /// use futures::stream::{StreamExt, TryStreamExt};
1021     ///
1022     /// let (sink, stream_of_futures) = mpsc::unbounded();
1023     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
1024     ///
1025     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1026     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1027     ///
1028     /// sink.unbounded_send(Err("error in the stream"))?;
1029     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1030     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1031     /// ```
1032     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1033     #[cfg(feature = "alloc")]
try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,1034     fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
1035     where
1036         Self::Ok: TryFuture<Error = Self::Error>,
1037         Self: Sized,
1038     {
1039         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
1040             TryBufferUnordered::new(self, n),
1041         )
1042     }
1043 
1044     /// Attempt to execute several futures from a stream concurrently.
1045     ///
1046     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
1047     /// that matches the stream's `Error` type.
1048     ///
1049     /// This adaptor will buffer up to `n` futures and then return their
1050     /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
1051     /// be immediately propagated.
1052     ///
1053     /// The returned stream will be a stream of results, each containing either
1054     /// an error or a future's output. An error can be produced either by the
1055     /// underlying stream itself or by one of the futures it yielded.
1056     ///
1057     /// This method is only available when the `std` or `alloc` feature of this
1058     /// library is activated, and it is activated by default.
1059     ///
1060     /// # Examples
1061     ///
1062     /// Results are returned in the order of addition:
1063     /// ```
1064     /// # futures::executor::block_on(async {
1065     /// use futures::channel::oneshot;
1066     /// use futures::future::lazy;
1067     /// use futures::stream::{self, StreamExt, TryStreamExt};
1068     ///
1069     /// let (send_one, recv_one) = oneshot::channel();
1070     /// let (send_two, recv_two) = oneshot::channel();
1071     ///
1072     /// let mut buffered = lazy(move |cx| {
1073     ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
1074     ///
1075     ///     let mut buffered = stream_of_futures.try_buffered(10);
1076     ///
1077     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1078     ///
1079     ///     send_two.send(2i32)?;
1080     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
1081     ///     Ok::<_, i32>(buffered)
1082     /// }).await?;
1083     ///
1084     /// send_one.send(1i32)?;
1085     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1086     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1087     ///
1088     /// assert_eq!(buffered.next().await, None);
1089     /// # Ok::<(), i32>(()) }).unwrap();
1090     /// ```
1091     ///
1092     /// Errors from the underlying stream itself are propagated:
1093     /// ```
1094     /// # futures::executor::block_on(async {
1095     /// use futures::channel::mpsc;
1096     /// use futures::stream::{StreamExt, TryStreamExt};
1097     ///
1098     /// let (sink, stream_of_futures) = mpsc::unbounded();
1099     /// let mut buffered = stream_of_futures.try_buffered(10);
1100     ///
1101     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
1102     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
1103     ///
1104     /// sink.unbounded_send(Err("error in the stream"))?;
1105     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
1106     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
1107     /// ```
1108     #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
1109     #[cfg(feature = "alloc")]
try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,1110     fn try_buffered(self, n: usize) -> TryBuffered<Self>
1111     where
1112         Self::Ok: TryFuture<Error = Self::Error>,
1113         Self: Sized,
1114     {
1115         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
1116             self, n,
1117         ))
1118     }
1119 
1120     // TODO: false positive warning from rustdoc. Verify once #43466 settles
1121     //
1122     /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
1123     /// stream types.
try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,1124     fn try_poll_next_unpin(
1125         &mut self,
1126         cx: &mut Context<'_>,
1127     ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
1128     where
1129         Self: Unpin,
1130     {
1131         Pin::new(self).try_poll_next(cx)
1132     }
1133 
1134     /// Wraps a [`TryStream`] into a stream compatible with libraries using
1135     /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1136     /// ```
1137     /// # if cfg!(miri) { return; } // Miri does not support epoll
1138     /// use futures::future::{FutureExt, TryFutureExt};
1139     /// # let (tx, rx) = futures::channel::oneshot::channel();
1140     ///
1141     /// let future03 = async {
1142     ///     println!("Running on the pool");
1143     ///     tx.send(42).unwrap();
1144     /// };
1145     ///
1146     /// let future01 = future03
1147     ///     .unit_error() // Make it a TryFuture
1148     ///     .boxed()  // Make it Unpin
1149     ///     .compat();
1150     ///
1151     /// tokio::run(future01);
1152     /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1153     /// ```
1154     #[cfg(feature = "compat")]
1155     #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,1156     fn compat(self) -> Compat<Self>
1157     where
1158         Self: Sized + Unpin,
1159     {
1160         Compat::new(self)
1161     }
1162 
1163     /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1164     ///
1165     /// This method is only available when the `std` feature of this
1166     /// library is activated, and it is activated by default.
1167     ///
1168     /// # Examples
1169     ///
1170     /// ```
1171     /// # futures::executor::block_on(async {
1172     /// use futures::stream::{self, TryStreamExt};
1173     /// use futures::io::AsyncReadExt;
1174     ///
1175     /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1176     /// let mut reader = stream.into_async_read();
1177     ///
1178     /// let mut buf = Vec::new();
1179     /// reader.read_to_end(&mut buf).await.unwrap();
1180     /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1181     /// # })
1182     /// ```
1183     #[cfg(feature = "io")]
1184     #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1185     #[cfg(feature = "std")]
into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>,1186     fn into_async_read(self) -> IntoAsyncRead<Self>
1187     where
1188         Self: Sized + TryStreamExt<Error = std::io::Error>,
1189         Self::Ok: AsRef<[u8]>,
1190     {
1191         crate::io::assert_read(IntoAsyncRead::new(self))
1192     }
1193 
1194     /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items
1195     /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1196     /// that does not satisfy the predicate.
1197     ///
1198     /// # Examples
1199     ///
1200     /// ```
1201     /// # futures::executor::block_on(async {
1202     /// use futures::stream::{self, StreamExt, TryStreamExt};
1203     /// use std::convert::Infallible;
1204     ///
1205     /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>);
1206     /// let positive = number_stream.try_all(|i| async move { i > 0 });
1207     /// assert_eq!(positive.await, Ok(true));
1208     ///
1209     /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1210     /// let positive = stream_with_errors.try_all(|i| async move { i > 0 });
1211     /// assert_eq!(positive.await, Err("err"));
1212     /// # });
1213     /// ```
try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F> where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,1214     fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F>
1215     where
1216         Self: Sized,
1217         F: FnMut(Self::Ok) -> Fut,
1218         Fut: Future<Output = bool>,
1219     {
1220         assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f))
1221     }
1222 
1223     /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items
1224     /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found
1225     /// that satisfies the predicate.
1226     ///
1227     /// # Examples
1228     ///
1229     /// ```
1230     /// # futures::executor::block_on(async {
1231     /// use futures::stream::{self, StreamExt, TryStreamExt};
1232     /// use std::convert::Infallible;
1233     ///
1234     /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>);
1235     /// let contain_three = number_stream.try_any(|i| async move { i == 3 });
1236     /// assert_eq!(contain_three.await, Ok(true));
1237     ///
1238     /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]);
1239     /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 });
1240     /// assert_eq!(contain_three.await, Err("err"));
1241     /// # });
1242     /// ```
try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F> where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,1243     fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F>
1244     where
1245         Self: Sized,
1246         F: FnMut(Self::Ok) -> Fut,
1247         Fut: Future<Output = bool>,
1248     {
1249         assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f))
1250     }
1251 }
1252