1 //! Streams 2 //! 3 //! This module contains a number of functions for working with `Streams`s 4 //! that return `Result`s, allowing for short-circuiting computations. 5 6 #[cfg(feature = "compat")] 7 use crate::compat::Compat; 8 use crate::fns::{ 9 inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn, 10 IntoFn, MapErrFn, MapOkFn, 11 }; 12 use crate::future::assert_future; 13 use crate::stream::assert_stream; 14 use crate::stream::{Inspect, Map}; 15 #[cfg(feature = "alloc")] 16 use alloc::vec::Vec; 17 use core::pin::Pin; 18 19 use futures_core::{ 20 future::{Future, TryFuture}, 21 stream::TryStream, 22 task::{Context, Poll}, 23 }; 24 25 mod and_then; 26 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 27 pub use self::and_then::AndThen; 28 29 delegate_all!( 30 /// Stream for the [`err_into`](super::TryStreamExt::err_into) method. 31 ErrInto<St, E>( 32 MapErr<St, IntoFn<E>> 33 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())] 34 ); 35 36 delegate_all!( 37 /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. 38 InspectOk<St, F>( 39 Inspect<IntoStream<St>, InspectOkFn<F>> 40 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] 41 ); 42 43 delegate_all!( 44 /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. 45 InspectErr<St, F>( 46 Inspect<IntoStream<St>, InspectErrFn<F>> 47 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] 48 ); 49 50 mod into_stream; 51 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 52 pub use self::into_stream::IntoStream; 53 54 delegate_all!( 55 /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. 56 MapOk<St, F>( 57 Map<IntoStream<St>, MapOkFn<F>> 58 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] 59 ); 60 61 delegate_all!( 62 /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. 63 MapErr<St, F>( 64 Map<IntoStream<St>, MapErrFn<F>> 65 ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] 66 ); 67 68 mod or_else; 69 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 70 pub use self::or_else::OrElse; 71 72 mod try_next; 73 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 74 pub use self::try_next::TryNext; 75 76 mod try_for_each; 77 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 78 pub use self::try_for_each::TryForEach; 79 80 mod try_filter; 81 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 82 pub use self::try_filter::TryFilter; 83 84 mod try_filter_map; 85 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 86 pub use self::try_filter_map::TryFilterMap; 87 88 mod try_flatten; 89 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 90 pub use self::try_flatten::TryFlatten; 91 92 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 93 #[cfg(feature = "alloc")] 94 mod try_flatten_unordered; 95 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 96 #[cfg(feature = "alloc")] 97 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 98 pub use self::try_flatten_unordered::TryFlattenUnordered; 99 100 mod try_collect; 101 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 102 pub use self::try_collect::TryCollect; 103 104 mod try_concat; 105 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 106 pub use self::try_concat::TryConcat; 107 108 #[cfg(feature = "alloc")] 109 mod try_chunks; 110 #[cfg(feature = "alloc")] 111 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 112 pub use self::try_chunks::{TryChunks, TryChunksError}; 113 114 #[cfg(feature = "alloc")] 115 mod try_ready_chunks; 116 #[cfg(feature = "alloc")] 117 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 118 pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError}; 119 120 mod try_fold; 121 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 122 pub use self::try_fold::TryFold; 123 124 mod try_unfold; 125 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 126 pub use self::try_unfold::{try_unfold, TryUnfold}; 127 128 mod try_skip_while; 129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 130 pub use self::try_skip_while::TrySkipWhile; 131 132 mod try_take_while; 133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 134 pub use self::try_take_while::TryTakeWhile; 135 136 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 137 #[cfg(feature = "alloc")] 138 mod try_buffer_unordered; 139 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 140 #[cfg(feature = "alloc")] 141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 142 pub use self::try_buffer_unordered::TryBufferUnordered; 143 144 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 145 #[cfg(feature = "alloc")] 146 mod try_buffered; 147 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 148 #[cfg(feature = "alloc")] 149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 150 pub use self::try_buffered::TryBuffered; 151 152 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 153 #[cfg(feature = "alloc")] 154 mod try_for_each_concurrent; 155 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 156 #[cfg(feature = "alloc")] 157 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 158 pub use self::try_for_each_concurrent::TryForEachConcurrent; 159 160 #[cfg(feature = "io")] 161 #[cfg(feature = "std")] 162 mod into_async_read; 163 #[cfg(feature = "io")] 164 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 165 #[cfg(feature = "std")] 166 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 167 pub use self::into_async_read::IntoAsyncRead; 168 169 mod try_all; 170 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 171 pub use self::try_all::TryAll; 172 173 mod try_any; 174 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 175 pub use self::try_any::TryAny; 176 177 impl<S: ?Sized + TryStream> TryStreamExt for S {} 178 179 /// Adapters specific to `Result`-returning streams 180 pub trait TryStreamExt: TryStream { 181 /// Wraps the current stream in a new stream which converts the error type 182 /// into the one provided. 183 /// 184 /// # Examples 185 /// 186 /// ``` 187 /// # futures::executor::block_on(async { 188 /// use futures::stream::{self, TryStreamExt}; 189 /// 190 /// let mut stream = 191 /// stream::iter(vec![Ok(()), Err(5i32)]) 192 /// .err_into::<i64>(); 193 /// 194 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 195 /// assert_eq!(stream.try_next().await, Err(5i64)); 196 /// # }) 197 /// ``` err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,198 fn err_into<E>(self) -> ErrInto<Self, E> 199 where 200 Self: Sized, 201 Self::Error: Into<E>, 202 { 203 assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self)) 204 } 205 206 /// Wraps the current stream in a new stream which maps the success value 207 /// using the provided closure. 208 /// 209 /// # Examples 210 /// 211 /// ``` 212 /// # futures::executor::block_on(async { 213 /// use futures::stream::{self, TryStreamExt}; 214 /// 215 /// let mut stream = 216 /// stream::iter(vec![Ok(5), Err(0)]) 217 /// .map_ok(|x| x + 2); 218 /// 219 /// assert_eq!(stream.try_next().await, Ok(Some(7))); 220 /// assert_eq!(stream.try_next().await, Err(0)); 221 /// # }) 222 /// ``` map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,223 fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> 224 where 225 Self: Sized, 226 F: FnMut(Self::Ok) -> T, 227 { 228 assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f)) 229 } 230 231 /// Wraps the current stream in a new stream which maps the error value 232 /// using the provided closure. 233 /// 234 /// # Examples 235 /// 236 /// ``` 237 /// # futures::executor::block_on(async { 238 /// use futures::stream::{self, TryStreamExt}; 239 /// 240 /// let mut stream = 241 /// stream::iter(vec![Ok(5), Err(0)]) 242 /// .map_err(|x| x + 2); 243 /// 244 /// assert_eq!(stream.try_next().await, Ok(Some(5))); 245 /// assert_eq!(stream.try_next().await, Err(2)); 246 /// # }) 247 /// ``` map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,248 fn map_err<E, F>(self, f: F) -> MapErr<Self, F> 249 where 250 Self: Sized, 251 F: FnMut(Self::Error) -> E, 252 { 253 assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f)) 254 } 255 256 /// Chain on a computation for when a value is ready, passing the successful 257 /// results to the provided closure `f`. 258 /// 259 /// This function can be used to run a unit of work when the next successful 260 /// value on a stream is ready. The closure provided will be yielded a value 261 /// when ready, and the returned future will then be run to completion to 262 /// produce the next value on this stream. 263 /// 264 /// Any errors produced by this stream will not be passed to the closure, 265 /// and will be passed through. 266 /// 267 /// The returned value of the closure must implement the `TryFuture` trait 268 /// and can represent some more work to be done before the composed stream 269 /// is finished. 270 /// 271 /// Note that this function consumes the receiving stream and returns a 272 /// wrapped version of it. 273 /// 274 /// To process the entire stream and return a single future representing 275 /// success or error, use `try_for_each` instead. 276 /// 277 /// # Examples 278 /// 279 /// ``` 280 /// use futures::channel::mpsc; 281 /// use futures::future; 282 /// use futures::stream::TryStreamExt; 283 /// 284 /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1); 285 /// 286 /// let rx = rx.and_then(|result| { 287 /// future::ok(if result % 2 == 0 { 288 /// Some(result) 289 /// } else { 290 /// None 291 /// }) 292 /// }); 293 /// ``` and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,294 fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> 295 where 296 F: FnMut(Self::Ok) -> Fut, 297 Fut: TryFuture<Error = Self::Error>, 298 Self: Sized, 299 { 300 assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f)) 301 } 302 303 /// Chain on a computation for when an error happens, passing the 304 /// erroneous result to the provided closure `f`. 305 /// 306 /// This function can be used to run a unit of work and attempt to recover from 307 /// an error if one happens. The closure provided will be yielded an error 308 /// when one appears, and the returned future will then be run to completion 309 /// to produce the next value on this stream. 310 /// 311 /// Any successful values produced by this stream will not be passed to the 312 /// closure, and will be passed through. 313 /// 314 /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait 315 /// and can represent some more work to be done before the composed stream 316 /// is finished. 317 /// 318 /// Note that this function consumes the receiving stream and returns a 319 /// wrapped version of it. or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,320 fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> 321 where 322 F: FnMut(Self::Error) -> Fut, 323 Fut: TryFuture<Ok = Self::Ok>, 324 Self: Sized, 325 { 326 assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f)) 327 } 328 329 /// Do something with the success value of this stream, afterwards passing 330 /// it on. 331 /// 332 /// This is similar to the `StreamExt::inspect` method where it allows 333 /// easily inspecting the success value as it passes through the stream, for 334 /// example to debug what's going on. inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,335 fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> 336 where 337 F: FnMut(&Self::Ok), 338 Self: Sized, 339 { 340 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f)) 341 } 342 343 /// Do something with the error value of this stream, afterwards passing it on. 344 /// 345 /// This is similar to the `StreamExt::inspect` method where it allows 346 /// easily inspecting the error value as it passes through the stream, for 347 /// example to debug what's going on. inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,348 fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> 349 where 350 F: FnMut(&Self::Error), 351 Self: Sized, 352 { 353 assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f)) 354 } 355 356 /// Wraps a [`TryStream`] into a type that implements 357 /// [`Stream`](futures_core::stream::Stream) 358 /// 359 /// [`TryStream`]s currently do not implement the 360 /// [`Stream`](futures_core::stream::Stream) trait because of limitations 361 /// of the compiler. 362 /// 363 /// # Examples 364 /// 365 /// ``` 366 /// use futures::stream::{Stream, TryStream, TryStreamExt}; 367 /// 368 /// # type T = i32; 369 /// # type E = (); 370 /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... } 371 /// # futures::stream::empty() 372 /// # } 373 /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ } 374 /// 375 /// take_stream(make_try_stream().into_stream()); 376 /// ``` into_stream(self) -> IntoStream<Self> where Self: Sized,377 fn into_stream(self) -> IntoStream<Self> 378 where 379 Self: Sized, 380 { 381 assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self)) 382 } 383 384 /// Creates a future that attempts to resolve the next item in the stream. 385 /// If an error is encountered before the next item, the error is returned 386 /// instead. 387 /// 388 /// This is similar to the `Stream::next` combinator, but returns a 389 /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making 390 /// for easy use with the `?` operator. 391 /// 392 /// # Examples 393 /// 394 /// ``` 395 /// # futures::executor::block_on(async { 396 /// use futures::stream::{self, TryStreamExt}; 397 /// 398 /// let mut stream = stream::iter(vec![Ok(()), Err(())]); 399 /// 400 /// assert_eq!(stream.try_next().await, Ok(Some(()))); 401 /// assert_eq!(stream.try_next().await, Err(())); 402 /// # }) 403 /// ``` try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,404 fn try_next(&mut self) -> TryNext<'_, Self> 405 where 406 Self: Unpin, 407 { 408 assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self)) 409 } 410 411 /// Attempts to run this stream to completion, executing the provided 412 /// asynchronous closure for each element on the stream. 413 /// 414 /// The provided closure will be called for each item this stream produces, 415 /// yielding a future. That future will then be executed to completion 416 /// before moving on to the next item. 417 /// 418 /// The returned value is a [`Future`](futures_core::future::Future) where the 419 /// [`Output`](futures_core::future::Future::Output) type is 420 /// `Result<(), Self::Error>`. If any of the intermediate 421 /// futures or the stream returns an error, this future will return 422 /// immediately with an error. 423 /// 424 /// # Examples 425 /// 426 /// ``` 427 /// # futures::executor::block_on(async { 428 /// use futures::future; 429 /// use futures::stream::{self, TryStreamExt}; 430 /// 431 /// let mut x = 0i32; 432 /// 433 /// { 434 /// let fut = stream::repeat(Ok(1)).try_for_each(|item| { 435 /// x += item; 436 /// future::ready(if x == 3 { Err(()) } else { Ok(()) }) 437 /// }); 438 /// assert_eq!(fut.await, Err(())); 439 /// } 440 /// 441 /// assert_eq!(x, 3); 442 /// # }) 443 /// ``` try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,444 fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> 445 where 446 F: FnMut(Self::Ok) -> Fut, 447 Fut: TryFuture<Ok = (), Error = Self::Error>, 448 Self: Sized, 449 { 450 assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f)) 451 } 452 453 /// Skip elements on this stream while the provided asynchronous predicate 454 /// resolves to `true`. 455 /// 456 /// This function is similar to 457 /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits 458 /// early if an error occurs. 459 /// 460 /// # Examples 461 /// 462 /// ``` 463 /// # futures::executor::block_on(async { 464 /// use futures::future; 465 /// use futures::stream::{self, TryStreamExt}; 466 /// 467 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]); 468 /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3))); 469 /// 470 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 471 /// assert_eq!(output, Ok(vec![3, 2])); 472 /// # }) 473 /// ``` try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,474 fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> 475 where 476 F: FnMut(&Self::Ok) -> Fut, 477 Fut: TryFuture<Ok = bool, Error = Self::Error>, 478 Self: Sized, 479 { 480 assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f)) 481 } 482 483 /// Take elements on this stream while the provided asynchronous predicate 484 /// resolves to `true`. 485 /// 486 /// This function is similar to 487 /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits 488 /// early if an error occurs. 489 /// 490 /// # Examples 491 /// 492 /// ``` 493 /// # futures::executor::block_on(async { 494 /// use futures::future; 495 /// use futures::stream::{self, TryStreamExt}; 496 /// 497 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]); 498 /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3))); 499 /// 500 /// let output: Result<Vec<i32>, i32> = stream.try_collect().await; 501 /// assert_eq!(output, Ok(vec![1, 2])); 502 /// # }) 503 /// ``` try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,504 fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> 505 where 506 F: FnMut(&Self::Ok) -> Fut, 507 Fut: TryFuture<Ok = bool, Error = Self::Error>, 508 Self: Sized, 509 { 510 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) 511 } 512 513 /// Attempts to run this stream to completion, executing the provided asynchronous 514 /// closure for each element on the stream concurrently as elements become 515 /// available, exiting as soon as an error occurs. 516 /// 517 /// This is similar to 518 /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent), 519 /// but will resolve to an error immediately if the underlying stream or the provided 520 /// closure return an error. 521 /// 522 /// This method is only available when the `std` or `alloc` feature of this 523 /// library is activated, and it is activated by default. 524 /// 525 /// # Examples 526 /// 527 /// ``` 528 /// # futures::executor::block_on(async { 529 /// use futures::channel::oneshot; 530 /// use futures::stream::{self, StreamExt, TryStreamExt}; 531 /// 532 /// let (tx1, rx1) = oneshot::channel(); 533 /// let (tx2, rx2) = oneshot::channel(); 534 /// let (_tx3, rx3) = oneshot::channel(); 535 /// 536 /// let stream = stream::iter(vec![rx1, rx2, rx3]); 537 /// let fut = stream.map(Ok).try_for_each_concurrent( 538 /// /* limit */ 2, 539 /// |rx| async move { 540 /// let res: Result<(), oneshot::Canceled> = rx.await; 541 /// res 542 /// } 543 /// ); 544 /// 545 /// tx1.send(()).unwrap(); 546 /// // Drop the second sender so that `rx2` resolves to `Canceled`. 547 /// drop(tx2); 548 /// 549 /// // The final result is an error because the second future 550 /// // resulted in an error. 551 /// assert_eq!(Err(oneshot::Canceled), fut.await); 552 /// # }) 553 /// ``` 554 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 555 #[cfg(feature = "alloc")] try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,556 fn try_for_each_concurrent<Fut, F>( 557 self, 558 limit: impl Into<Option<usize>>, 559 f: F, 560 ) -> TryForEachConcurrent<Self, Fut, F> 561 where 562 F: FnMut(Self::Ok) -> Fut, 563 Fut: Future<Output = Result<(), Self::Error>>, 564 Self: Sized, 565 { 566 assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new( 567 self, 568 limit.into(), 569 f, 570 )) 571 } 572 573 /// Attempt to transform a stream into a collection, 574 /// returning a future representing the result of that computation. 575 /// 576 /// This combinator will collect all successful results of this stream and 577 /// collect them into the specified collection type. If an error happens then all 578 /// collected elements will be dropped and the error will be returned. 579 /// 580 /// The returned future will be resolved when the stream terminates. 581 /// 582 /// # Examples 583 /// 584 /// ``` 585 /// # futures::executor::block_on(async { 586 /// use futures::channel::mpsc; 587 /// use futures::stream::TryStreamExt; 588 /// use std::thread; 589 /// 590 /// let (tx, rx) = mpsc::unbounded(); 591 /// 592 /// thread::spawn(move || { 593 /// for i in 1..=5 { 594 /// tx.unbounded_send(Ok(i)).unwrap(); 595 /// } 596 /// tx.unbounded_send(Err(6)).unwrap(); 597 /// }); 598 /// 599 /// let output: Result<Vec<i32>, i32> = rx.try_collect().await; 600 /// assert_eq!(output, Err(6)); 601 /// # }) 602 /// ``` try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,603 fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> 604 where 605 Self: Sized, 606 { 607 assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) 608 } 609 610 /// An adaptor for chunking up successful items of the stream inside a vector. 611 /// 612 /// This combinator will attempt to pull successful items from this stream and buffer 613 /// them into a local vector. At most `capacity` items will get buffered 614 /// before they're yielded from the returned stream. 615 /// 616 /// Note that the vectors returned from this iterator may not always have 617 /// `capacity` elements. If the underlying stream ended and only a partial 618 /// vector was created, it'll be returned. Additionally if an error happens 619 /// from the underlying stream then the currently buffered items will be 620 /// yielded. 621 /// 622 /// This method is only available when the `std` or `alloc` feature of this 623 /// library is activated, and it is activated by default. 624 /// 625 /// This function is similar to 626 /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits 627 /// early if an error occurs. 628 /// 629 /// # Examples 630 /// 631 /// ``` 632 /// # futures::executor::block_on(async { 633 /// use futures::stream::{self, TryChunksError, TryStreamExt}; 634 /// 635 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); 636 /// let mut stream = stream.try_chunks(2); 637 /// 638 /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); 639 /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4))); 640 /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); 641 /// # }) 642 /// ``` 643 /// 644 /// # Panics 645 /// 646 /// This method will panic if `capacity` is zero. 647 #[cfg(feature = "alloc")] try_chunks(self, capacity: usize) -> TryChunks<Self> where Self: Sized,648 fn try_chunks(self, capacity: usize) -> TryChunks<Self> 649 where 650 Self: Sized, 651 { 652 assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>( 653 TryChunks::new(self, capacity), 654 ) 655 } 656 657 /// An adaptor for chunking up successful, ready items of the stream inside a vector. 658 /// 659 /// This combinator will attempt to pull successful items from this stream and buffer 660 /// them into a local vector. At most `capacity` items will get buffered 661 /// before they're yielded from the returned stream. If the underlying stream 662 /// returns `Poll::Pending`, and the collected chunk is not empty, it will 663 /// be immediately returned. 664 /// 665 /// Note that the vectors returned from this iterator may not always have 666 /// `capacity` elements. If the underlying stream ended and only a partial 667 /// vector was created, it'll be returned. Additionally if an error happens 668 /// from the underlying stream then the currently buffered items will be 669 /// yielded. 670 /// 671 /// This method is only available when the `std` or `alloc` feature of this 672 /// library is activated, and it is activated by default. 673 /// 674 /// This function is similar to 675 /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits 676 /// early if an error occurs. 677 /// 678 /// # Examples 679 /// 680 /// ``` 681 /// # futures::executor::block_on(async { 682 /// use futures::stream::{self, TryReadyChunksError, TryStreamExt}; 683 /// 684 /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); 685 /// let mut stream = stream.try_ready_chunks(2); 686 /// 687 /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); 688 /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4))); 689 /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); 690 /// # }) 691 /// ``` 692 /// 693 /// # Panics 694 /// 695 /// This method will panic if `capacity` is zero. 696 #[cfg(feature = "alloc")] try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self> where Self: Sized,697 fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self> 698 where 699 Self: Sized, 700 { 701 assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>( 702 TryReadyChunks::new(self, capacity), 703 ) 704 } 705 706 /// Attempt to filter the values produced by this stream according to the 707 /// provided asynchronous closure. 708 /// 709 /// As values of this stream are made available, the provided predicate `f` 710 /// will be run on them. If the predicate returns a `Future` which resolves 711 /// to `true`, then the stream will yield the value, but if the predicate 712 /// return a `Future` which resolves to `false`, then the value will be 713 /// discarded and the next value will be produced. 714 /// 715 /// All errors are passed through without filtering in this combinator. 716 /// 717 /// Note that this function consumes the stream passed into it and returns a 718 /// wrapped version of it, similar to the existing `filter` methods in 719 /// the standard library. 720 /// 721 /// # Examples 722 /// ``` 723 /// # futures::executor::block_on(async { 724 /// use futures::future; 725 /// use futures::stream::{self, StreamExt, TryStreamExt}; 726 /// 727 /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]); 728 /// let mut evens = stream.try_filter(|x| { 729 /// future::ready(x % 2 == 0) 730 /// }); 731 /// 732 /// assert_eq!(evens.next().await, Some(Ok(2))); 733 /// assert_eq!(evens.next().await, Some(Err("error"))); 734 /// # }) 735 /// ``` try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,736 fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> 737 where 738 Fut: Future<Output = bool>, 739 F: FnMut(&Self::Ok) -> Fut, 740 Self: Sized, 741 { 742 assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f)) 743 } 744 745 /// Attempt to filter the values produced by this stream while 746 /// simultaneously mapping them to a different type according to the 747 /// provided asynchronous closure. 748 /// 749 /// As values of this stream are made available, the provided function will 750 /// be run on them. If the future returned by the predicate `f` resolves to 751 /// [`Some(item)`](Some) then the stream will yield the value `item`, but if 752 /// it resolves to [`None`] then the next value will be produced. 753 /// 754 /// All errors are passed through without filtering in this combinator. 755 /// 756 /// Note that this function consumes the stream passed into it and returns a 757 /// wrapped version of it, similar to the existing `filter_map` methods in 758 /// the standard library. 759 /// 760 /// # Examples 761 /// ``` 762 /// # futures::executor::block_on(async { 763 /// use futures::stream::{self, StreamExt, TryStreamExt}; 764 /// use futures::pin_mut; 765 /// 766 /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]); 767 /// let halves = stream.try_filter_map(|x| async move { 768 /// let ret = if x % 2 == 0 { Some(x / 2) } else { None }; 769 /// Ok(ret) 770 /// }); 771 /// 772 /// pin_mut!(halves); 773 /// assert_eq!(halves.next().await, Some(Ok(3))); 774 /// assert_eq!(halves.next().await, Some(Err("error"))); 775 /// # }) 776 /// ``` try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,777 fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> 778 where 779 Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, 780 F: FnMut(Self::Ok) -> Fut, 781 Self: Sized, 782 { 783 assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) 784 } 785 786 /// Flattens a stream of streams into just one continuous stream. Produced streams 787 /// will be polled concurrently and any errors will be passed through without looking at them. 788 /// If the underlying base stream returns an error, it will be **immediately** propagated. 789 /// 790 /// The only argument is an optional limit on the number of concurrently 791 /// polled streams. If this limit is not `None`, no more than `limit` streams 792 /// will be polled at the same time. The `limit` argument is of type 793 /// `Into<Option<usize>>`, and so can be provided as either `None`, 794 /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as 795 /// no limit at all, and will have the same result as passing in `None`. 796 /// 797 /// # Examples 798 /// 799 /// ``` 800 /// # futures::executor::block_on(async { 801 /// use futures::channel::mpsc; 802 /// use futures::stream::{StreamExt, TryStreamExt}; 803 /// use std::thread; 804 /// 805 /// let (tx1, rx1) = mpsc::unbounded(); 806 /// let (tx2, rx2) = mpsc::unbounded(); 807 /// let (tx3, rx3) = mpsc::unbounded(); 808 /// 809 /// thread::spawn(move || { 810 /// tx1.unbounded_send(Ok(1)).unwrap(); 811 /// }); 812 /// thread::spawn(move || { 813 /// tx2.unbounded_send(Ok(2)).unwrap(); 814 /// tx2.unbounded_send(Err(3)).unwrap(); 815 /// tx2.unbounded_send(Ok(4)).unwrap(); 816 /// }); 817 /// thread::spawn(move || { 818 /// tx3.unbounded_send(Ok(rx1)).unwrap(); 819 /// tx3.unbounded_send(Ok(rx2)).unwrap(); 820 /// tx3.unbounded_send(Err(5)).unwrap(); 821 /// }); 822 /// 823 /// let stream = rx3.try_flatten_unordered(None); 824 /// let mut values: Vec<_> = stream.collect().await; 825 /// values.sort(); 826 /// 827 /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); 828 /// # }); 829 /// ``` 830 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 831 #[cfg(feature = "alloc")] try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> where Self::Ok: TryStream + Unpin, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,832 fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> 833 where 834 Self::Ok: TryStream + Unpin, 835 <Self::Ok as TryStream>::Error: From<Self::Error>, 836 Self: Sized, 837 { 838 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( 839 TryFlattenUnordered::new(self, limit), 840 ) 841 } 842 843 /// Flattens a stream of streams into just one continuous stream. 844 /// 845 /// If this stream's elements are themselves streams then this combinator 846 /// will flatten out the entire stream to one long chain of elements. Any 847 /// errors are passed through without looking at them, but otherwise each 848 /// individual stream will get exhausted before moving on to the next. 849 /// 850 /// # Examples 851 /// 852 /// ``` 853 /// # futures::executor::block_on(async { 854 /// use futures::channel::mpsc; 855 /// use futures::stream::{StreamExt, TryStreamExt}; 856 /// use std::thread; 857 /// 858 /// let (tx1, rx1) = mpsc::unbounded(); 859 /// let (tx2, rx2) = mpsc::unbounded(); 860 /// let (tx3, rx3) = mpsc::unbounded(); 861 /// 862 /// thread::spawn(move || { 863 /// tx1.unbounded_send(Ok(1)).unwrap(); 864 /// }); 865 /// thread::spawn(move || { 866 /// tx2.unbounded_send(Ok(2)).unwrap(); 867 /// tx2.unbounded_send(Err(3)).unwrap(); 868 /// tx2.unbounded_send(Ok(4)).unwrap(); 869 /// }); 870 /// thread::spawn(move || { 871 /// tx3.unbounded_send(Ok(rx1)).unwrap(); 872 /// tx3.unbounded_send(Ok(rx2)).unwrap(); 873 /// tx3.unbounded_send(Err(5)).unwrap(); 874 /// }); 875 /// 876 /// let mut stream = rx3.try_flatten(); 877 /// assert_eq!(stream.next().await, Some(Ok(1))); 878 /// assert_eq!(stream.next().await, Some(Ok(2))); 879 /// assert_eq!(stream.next().await, Some(Err(3))); 880 /// assert_eq!(stream.next().await, Some(Ok(4))); 881 /// assert_eq!(stream.next().await, Some(Err(5))); 882 /// assert_eq!(stream.next().await, None); 883 /// # }); 884 /// ``` try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,885 fn try_flatten(self) -> TryFlatten<Self> 886 where 887 Self::Ok: TryStream, 888 <Self::Ok as TryStream>::Error: From<Self::Error>, 889 Self: Sized, 890 { 891 assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( 892 TryFlatten::new(self), 893 ) 894 } 895 896 /// Attempt to execute an accumulating asynchronous computation over a 897 /// stream, collecting all the values into one final result. 898 /// 899 /// This combinator will accumulate all values returned by this stream 900 /// according to the closure provided. The initial state is also provided to 901 /// this method and then is returned again by each execution of the closure. 902 /// Once the entire stream has been exhausted the returned future will 903 /// resolve to this value. 904 /// 905 /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will 906 /// exit early if an error is encountered in either the stream or the 907 /// provided closure. 908 /// 909 /// # Examples 910 /// 911 /// ``` 912 /// # futures::executor::block_on(async { 913 /// use futures::stream::{self, TryStreamExt}; 914 /// 915 /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]); 916 /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) }); 917 /// assert_eq!(sum.await, Ok(3)); 918 /// 919 /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]); 920 /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) }); 921 /// assert_eq!(sum.await, Err(2)); 922 /// # }) 923 /// ``` try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,924 fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> 925 where 926 F: FnMut(T, Self::Ok) -> Fut, 927 Fut: TryFuture<Ok = T, Error = Self::Error>, 928 Self: Sized, 929 { 930 assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init)) 931 } 932 933 /// Attempt to concatenate all items of a stream into a single 934 /// extendable destination, returning a future representing the end result. 935 /// 936 /// This combinator will extend the first item with the contents of all 937 /// the subsequent successful results of the stream. If the stream is empty, 938 /// the default value will be returned. 939 /// 940 /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait. 941 /// 942 /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will 943 /// exit early if an error is encountered in the stream. 944 /// 945 /// # Examples 946 /// 947 /// ``` 948 /// # futures::executor::block_on(async { 949 /// use futures::channel::mpsc; 950 /// use futures::stream::TryStreamExt; 951 /// use std::thread; 952 /// 953 /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>(); 954 /// 955 /// thread::spawn(move || { 956 /// for i in (0..3).rev() { 957 /// let n = i * 3; 958 /// tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap(); 959 /// } 960 /// }); 961 /// 962 /// let result = rx.try_concat().await; 963 /// 964 /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); 965 /// # }); 966 /// ``` try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,967 fn try_concat(self) -> TryConcat<Self> 968 where 969 Self: Sized, 970 Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default, 971 { 972 assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) 973 } 974 975 /// Attempt to execute several futures from a stream concurrently (unordered). 976 /// 977 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 978 /// that matches the stream's `Error` type. 979 /// 980 /// This adaptor will buffer up to `n` futures and then return their 981 /// outputs in the order in which they complete. If the underlying stream 982 /// returns an error, it will be immediately propagated. 983 /// 984 /// The returned stream will be a stream of results, each containing either 985 /// an error or a future's output. An error can be produced either by the 986 /// underlying stream itself or by one of the futures it yielded. 987 /// 988 /// This method is only available when the `std` or `alloc` feature of this 989 /// library is activated, and it is activated by default. 990 /// 991 /// # Examples 992 /// 993 /// Results are returned in the order of completion: 994 /// ``` 995 /// # futures::executor::block_on(async { 996 /// use futures::channel::oneshot; 997 /// use futures::stream::{self, StreamExt, TryStreamExt}; 998 /// 999 /// let (send_one, recv_one) = oneshot::channel(); 1000 /// let (send_two, recv_two) = oneshot::channel(); 1001 /// 1002 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 1003 /// 1004 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 1005 /// 1006 /// send_two.send(2i32)?; 1007 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1008 /// 1009 /// send_one.send(1i32)?; 1010 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1011 /// 1012 /// assert_eq!(buffered.next().await, None); 1013 /// # Ok::<(), i32>(()) }).unwrap(); 1014 /// ``` 1015 /// 1016 /// Errors from the underlying stream itself are propagated: 1017 /// ``` 1018 /// # futures::executor::block_on(async { 1019 /// use futures::channel::mpsc; 1020 /// use futures::stream::{StreamExt, TryStreamExt}; 1021 /// 1022 /// let (sink, stream_of_futures) = mpsc::unbounded(); 1023 /// let mut buffered = stream_of_futures.try_buffer_unordered(10); 1024 /// 1025 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 1026 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 1027 /// 1028 /// sink.unbounded_send(Err("error in the stream"))?; 1029 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 1030 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 1031 /// ``` 1032 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1033 #[cfg(feature = "alloc")] try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,1034 fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> 1035 where 1036 Self::Ok: TryFuture<Error = Self::Error>, 1037 Self: Sized, 1038 { 1039 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>( 1040 TryBufferUnordered::new(self, n), 1041 ) 1042 } 1043 1044 /// Attempt to execute several futures from a stream concurrently. 1045 /// 1046 /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type 1047 /// that matches the stream's `Error` type. 1048 /// 1049 /// This adaptor will buffer up to `n` futures and then return their 1050 /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will 1051 /// be immediately propagated. 1052 /// 1053 /// The returned stream will be a stream of results, each containing either 1054 /// an error or a future's output. An error can be produced either by the 1055 /// underlying stream itself or by one of the futures it yielded. 1056 /// 1057 /// This method is only available when the `std` or `alloc` feature of this 1058 /// library is activated, and it is activated by default. 1059 /// 1060 /// # Examples 1061 /// 1062 /// Results are returned in the order of addition: 1063 /// ``` 1064 /// # futures::executor::block_on(async { 1065 /// use futures::channel::oneshot; 1066 /// use futures::future::lazy; 1067 /// use futures::stream::{self, StreamExt, TryStreamExt}; 1068 /// 1069 /// let (send_one, recv_one) = oneshot::channel(); 1070 /// let (send_two, recv_two) = oneshot::channel(); 1071 /// 1072 /// let mut buffered = lazy(move |cx| { 1073 /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); 1074 /// 1075 /// let mut buffered = stream_of_futures.try_buffered(10); 1076 /// 1077 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 1078 /// 1079 /// send_two.send(2i32)?; 1080 /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); 1081 /// Ok::<_, i32>(buffered) 1082 /// }).await?; 1083 /// 1084 /// send_one.send(1i32)?; 1085 /// assert_eq!(buffered.next().await, Some(Ok(1i32))); 1086 /// assert_eq!(buffered.next().await, Some(Ok(2i32))); 1087 /// 1088 /// assert_eq!(buffered.next().await, None); 1089 /// # Ok::<(), i32>(()) }).unwrap(); 1090 /// ``` 1091 /// 1092 /// Errors from the underlying stream itself are propagated: 1093 /// ``` 1094 /// # futures::executor::block_on(async { 1095 /// use futures::channel::mpsc; 1096 /// use futures::stream::{StreamExt, TryStreamExt}; 1097 /// 1098 /// let (sink, stream_of_futures) = mpsc::unbounded(); 1099 /// let mut buffered = stream_of_futures.try_buffered(10); 1100 /// 1101 /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; 1102 /// assert_eq!(buffered.next().await, Some(Ok(7i32))); 1103 /// 1104 /// sink.unbounded_send(Err("error in the stream"))?; 1105 /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); 1106 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); 1107 /// ``` 1108 #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] 1109 #[cfg(feature = "alloc")] try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,1110 fn try_buffered(self, n: usize) -> TryBuffered<Self> 1111 where 1112 Self::Ok: TryFuture<Error = Self::Error>, 1113 Self: Sized, 1114 { 1115 assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new( 1116 self, n, 1117 )) 1118 } 1119 1120 // TODO: false positive warning from rustdoc. Verify once #43466 settles 1121 // 1122 /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] 1123 /// stream types. try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,1124 fn try_poll_next_unpin( 1125 &mut self, 1126 cx: &mut Context<'_>, 1127 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> 1128 where 1129 Self: Unpin, 1130 { 1131 Pin::new(self).try_poll_next(cx) 1132 } 1133 1134 /// Wraps a [`TryStream`] into a stream compatible with libraries using 1135 /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. 1136 /// ``` 1137 /// # if cfg!(miri) { return; } // Miri does not support epoll 1138 /// use futures::future::{FutureExt, TryFutureExt}; 1139 /// # let (tx, rx) = futures::channel::oneshot::channel(); 1140 /// 1141 /// let future03 = async { 1142 /// println!("Running on the pool"); 1143 /// tx.send(42).unwrap(); 1144 /// }; 1145 /// 1146 /// let future01 = future03 1147 /// .unit_error() // Make it a TryFuture 1148 /// .boxed() // Make it Unpin 1149 /// .compat(); 1150 /// 1151 /// tokio::run(future01); 1152 /// # assert_eq!(42, futures::executor::block_on(rx).unwrap()); 1153 /// ``` 1154 #[cfg(feature = "compat")] 1155 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] compat(self) -> Compat<Self> where Self: Sized + Unpin,1156 fn compat(self) -> Compat<Self> 1157 where 1158 Self: Sized + Unpin, 1159 { 1160 Compat::new(self) 1161 } 1162 1163 /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). 1164 /// 1165 /// This method is only available when the `std` feature of this 1166 /// library is activated, and it is activated by default. 1167 /// 1168 /// # Examples 1169 /// 1170 /// ``` 1171 /// # futures::executor::block_on(async { 1172 /// use futures::stream::{self, TryStreamExt}; 1173 /// use futures::io::AsyncReadExt; 1174 /// 1175 /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); 1176 /// let mut reader = stream.into_async_read(); 1177 /// 1178 /// let mut buf = Vec::new(); 1179 /// reader.read_to_end(&mut buf).await.unwrap(); 1180 /// assert_eq!(buf, [1, 2, 3, 4, 5]); 1181 /// # }) 1182 /// ``` 1183 #[cfg(feature = "io")] 1184 #[cfg_attr(docsrs, doc(cfg(feature = "io")))] 1185 #[cfg(feature = "std")] into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>,1186 fn into_async_read(self) -> IntoAsyncRead<Self> 1187 where 1188 Self: Sized + TryStreamExt<Error = std::io::Error>, 1189 Self::Ok: AsRef<[u8]>, 1190 { 1191 crate::io::assert_read(IntoAsyncRead::new(self)) 1192 } 1193 1194 /// Attempt to execute a predicate over an asynchronous stream and evaluate if all items 1195 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found 1196 /// that does not satisfy the predicate. 1197 /// 1198 /// # Examples 1199 /// 1200 /// ``` 1201 /// # futures::executor::block_on(async { 1202 /// use futures::stream::{self, StreamExt, TryStreamExt}; 1203 /// use std::convert::Infallible; 1204 /// 1205 /// let number_stream = stream::iter(1..10).map(Ok::<_, Infallible>); 1206 /// let positive = number_stream.try_all(|i| async move { i > 0 }); 1207 /// assert_eq!(positive.await, Ok(true)); 1208 /// 1209 /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); 1210 /// let positive = stream_with_errors.try_all(|i| async move { i > 0 }); 1211 /// assert_eq!(positive.await, Err("err")); 1212 /// # }); 1213 /// ``` try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F> where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,1214 fn try_all<Fut, F>(self, f: F) -> TryAll<Self, Fut, F> 1215 where 1216 Self: Sized, 1217 F: FnMut(Self::Ok) -> Fut, 1218 Fut: Future<Output = bool>, 1219 { 1220 assert_future::<Result<bool, Self::Error>, _>(TryAll::new(self, f)) 1221 } 1222 1223 /// Attempt to execute a predicate over an asynchronous stream and evaluate if any items 1224 /// satisfy the predicate. Exits early if an `Err` is encountered or if an `Ok` item is found 1225 /// that satisfies the predicate. 1226 /// 1227 /// # Examples 1228 /// 1229 /// ``` 1230 /// # futures::executor::block_on(async { 1231 /// use futures::stream::{self, StreamExt, TryStreamExt}; 1232 /// use std::convert::Infallible; 1233 /// 1234 /// let number_stream = stream::iter(0..10).map(Ok::<_, Infallible>); 1235 /// let contain_three = number_stream.try_any(|i| async move { i == 3 }); 1236 /// assert_eq!(contain_three.await, Ok(true)); 1237 /// 1238 /// let stream_with_errors = stream::iter([Ok(1), Err("err"), Ok(3)]); 1239 /// let contain_three = stream_with_errors.try_any(|i| async move { i == 3 }); 1240 /// assert_eq!(contain_three.await, Err("err")); 1241 /// # }); 1242 /// ``` try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F> where Self: Sized, F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = bool>,1243 fn try_any<Fut, F>(self, f: F) -> TryAny<Self, Fut, F> 1244 where 1245 Self: Sized, 1246 F: FnMut(Self::Ok) -> Fut, 1247 Fut: Future<Output = bool>, 1248 { 1249 assert_future::<Result<bool, Self::Error>, _>(TryAny::new(self, f)) 1250 } 1251 } 1252