//! Asynchronous streams.

use core::ops::DerefMut;
use core::pin::Pin;
use core::task::{Context, Poll};

/// An owned dynamically typed [`Stream`] for use in cases where you can't
/// statically type your result or need to add some indirection.
///
/// This type is often created by the [`boxed`] method on [`StreamExt`]. See its documentation for more.
///
/// [`boxed`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed
/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
#[cfg(feature = "alloc")]
pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;

/// `BoxStream`, but without the `Send` requirement.
///
/// This type is often created by the [`boxed_local`] method on [`StreamExt`]. See its documentation for more.
///
/// [`boxed_local`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed_local
/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
#[cfg(feature = "alloc")]
pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;

/// A stream of values produced asynchronously.
///
/// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item
/// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream
/// represents a sequence of value-producing events that occur asynchronously to
/// the caller.
///
/// The trait is modeled after `Future`, but allows `poll_next` to be called
/// even after a value has been produced, yielding `None` once the stream has
/// been fully exhausted.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// Values yielded by the stream.
    type Item;

    /// Attempt to pull out the next value of this stream, registering the
    /// current task for wakeup if the value is not yet available, and returning
    /// `None` if the stream is exhausted.
    ///
    /// # Return value
    ///
    /// There are several possible return values, each indicating a distinct
    /// stream state:
    ///
    /// - `Poll::Pending` means that this stream's next value is not ready
    ///   yet. Implementations will ensure that the current task will be notified
    ///   when the next value may be ready.
    ///
    /// - `Poll::Ready(Some(val))` means that the stream has successfully
    ///   produced a value, `val`, and may produce further values on subsequent
    ///   `poll_next` calls.
    ///
    /// - `Poll::Ready(None)` means that the stream has terminated, and
    ///   `poll_next` should not be invoked again.
    ///
    /// # Panics
    ///
    /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
    /// `poll_next` method again may panic, block forever, or cause other kinds of
    /// problems; the `Stream` trait places no requirements on the effects of
    /// such a call. However, as the `poll_next` method is not marked `unsafe`,
    /// Rust's usual rules apply: calls must never cause undefined behavior
    /// (memory corruption, incorrect use of `unsafe` functions, or the like),
    /// regardless of the stream's state.
    ///
    /// If this is difficult to guard against then the [`fuse`] adapter can be used
    /// to ensure that `poll_next` always returns `Ready(None)` in subsequent
    /// calls.
    ///
    /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Returns the bounds on the remaining length of the stream.
    ///
    /// Specifically, `size_hint()` returns a tuple where the first element
    /// is the lower bound, and the second element is the upper bound.
    ///
    /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
    /// A [`None`] here means that either there is no known upper bound, or the
    /// upper bound is larger than [`usize`].
    ///
    /// # Implementation notes
    ///
    /// It is not enforced that a stream implementation yields the declared
    /// number of elements. A buggy stream may yield less than the lower bound
    /// or more than the upper bound of elements.
    ///
    /// `size_hint()` is primarily intended to be used for optimizations such as
    /// reserving space for the elements of the stream, but must not be
    /// trusted to e.g., omit bounds checks in unsafe code. An incorrect
    /// implementation of `size_hint()` should not lead to memory safety
    /// violations.
    ///
    /// That said, the implementation should provide a correct estimation,
    /// because otherwise it would be a violation of the trait's protocol.
    ///
    /// The default implementation returns `(0, `[`None`]`)` which is correct for any
    /// stream.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

// A mutable reference to an `Unpin` stream is itself a stream: every call is
// forwarded to the referenced stream.
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `S: Unpin`, so the reborrowed inner stream can be re-pinned with the
        // safe `Pin::new`.
        S::poll_next(Pin::new(&mut **self), cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (**self).size_hint()
    }
}

// A pinned pointer to a stream is itself a stream: every call is forwarded to
// the pointee.
impl<P> Stream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Stream,
{
    type Item = <P::Target as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `P: Unpin` makes `Pin<P>` itself `Unpin`, which is what allows
        // `get_mut`; `as_mut` then yields `Pin<&mut P::Target>` for the pointee.
        self.get_mut().as_mut().poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (**self).size_hint()
    }
}

/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a stream should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream: Stream {
    /// Returns `true` if the stream should no longer be polled.
    fn is_terminated(&self) -> bool;
}

// Termination state of a `&mut F` is that of the referenced stream.
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
    fn is_terminated(&self) -> bool {
        <F as FusedStream>::is_terminated(&**self)
    }
}

// Termination state of a `Pin<P>` is that of the pointee.
impl<P> FusedStream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: FusedStream,
{
    fn is_terminated(&self) -> bool {
        <P::Target as FusedStream>::is_terminated(&**self)
    }
}

// Sealing helper for `TryStream`: the module is private, and `Sealed` is
// implemented for exactly the streams whose items are `Result`s, i.e. the same
// set of types covered by the blanket `TryStream` impl below.
mod private_try_stream {
    use super::Stream;

    pub trait Sealed {}

    impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
}

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such streams.
pub trait TryStream: Stream + private_try_stream::Sealed {
    /// The type of successful values yielded by this stream
    type Ok;

    /// The type of failures yielded by this stream
    type Error;

    /// Poll this `TryStream` as if it were a `Stream`.
    ///
    /// This method is a stopgap for a compiler limitation that prevents us from
    /// directly inheriting from the `Stream` trait; in the future it won't be
    /// needed.
    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

// Every stream of `Result<T, E>` items is a `TryStream` with `Ok = T` and
// `Error = E`; `try_poll_next` simply delegates to `poll_next`.
impl<S, T, E> TryStream for S
where
    S: ?Sized + Stream<Item = Result<T, E>>,
{
    type Ok = T;
    type Error = E;

    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
        self.poll_next(cx)
    }
}

// Impls that are only available when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
mod if_alloc {
    use super::*;
    use alloc::boxed::Box;

    // A boxed `Unpin` stream forwards every call to its contents.
    impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
        type Item = S::Item;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            Pin::new(&mut **self).poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            (**self).size_hint()
        }
    }

    #[cfg(feature = "std")]
    impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            // SAFETY: this is a pin projection onto the wrapper's field `0`.
            // The closure only reborrows that field, so the inner stream is
            // never moved out of the pinned wrapper.
            unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.0.size_hint()
        }
    }

    // Termination state of a `Box<S>` is that of its contents.
    impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
        fn is_terminated(&self) -> bool {
            <S as FusedStream>::is_terminated(&**self)
        }
    }
}