1 //! Asynchronous streams.
2 
3 use core::ops::DerefMut;
4 use core::pin::Pin;
5 use core::task::{Context, Poll};
6 
/// An owned dynamically typed [`Stream`] for use in cases where you can't
/// statically type your result or need to add some indirection.
///
/// The alias expands to a pinned, heap-allocated `dyn Stream<Item = T>` trait
/// object that is additionally `Send` and valid for the lifetime `'a`.
///
/// This type is often created by the [`boxed`] method on [`StreamExt`]. See its documentation for more.
///
/// [`boxed`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed
/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
#[cfg(feature = "alloc")]
pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;
16 
/// `BoxStream`, but without the `Send` requirement.
///
/// The alias expands to a pinned, heap-allocated `dyn Stream<Item = T>` trait
/// object valid for the lifetime `'a`; no `Send` bound is placed on it.
///
/// This type is often created by the [`boxed_local`] method on [`StreamExt`]. See its documentation for more.
///
/// [`boxed_local`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed_local
/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
#[cfg(feature = "alloc")]
pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;
25 
/// A source of values that become available asynchronously.
///
/// `Stream<Item = T>` relates to `Iterator<Item = T>` the same way that
/// `Future<Output = T>` relates to a plain `T`: it describes a sequence of
/// values, each of which may be produced at some later point, independently
/// of the caller.
///
/// Polling follows the `Future` model, with one difference: `poll_next` may be
/// called again after it has produced a value. The end of the sequence is
/// signalled by `None`.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// The type of each value produced by the stream.
    type Item;

    /// Tries to obtain the next value of the stream.
    ///
    /// When no value is available yet, the current task is registered to be
    /// woken up once one may be. When the stream is exhausted, `None` is
    /// returned.
    ///
    /// # Return value
    ///
    /// Each possible result corresponds to a different stream state:
    ///
    /// - `Poll::Pending`: the next value is not ready yet. Implementations
    ///   ensure that the current task is notified when the next value may
    ///   be ready.
    ///
    /// - `Poll::Ready(Some(val))`: the stream produced the value `val`, and
    ///   later `poll_next` calls may produce more values.
    ///
    /// - `Poll::Ready(None)`: the stream has terminated and `poll_next`
    ///   should not be called again.
    ///
    /// # Panics
    ///
    /// After a stream has finished (that is, `poll_next` returned
    /// `Ready(None)`), the `Stream` trait places no requirements on what a
    /// further `poll_next` call does: it may panic, block forever, or
    /// misbehave in some other way. Because `poll_next` is not an `unsafe`
    /// method, Rust's usual rules still hold: whatever state the stream is
    /// in, a call must never cause undefined behavior (memory corruption,
    /// incorrect use of `unsafe` functions, or the like).
    ///
    /// When this is hard to guard against, the [`fuse`] adapter can be used
    /// to make every later `poll_next` call return `Ready(None)`.
    ///
    /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Reports bounds on how many values the stream has left to yield.
    ///
    /// The result is a `(lower, upper)` pair. `lower` is the lower bound.
    /// `upper` is an [`Option`]`<`[`usize`]`>` holding the upper bound, where
    /// [`None`] means that no upper bound is known or that the bound does not
    /// fit in a [`usize`].
    ///
    /// # Implementation notes
    ///
    /// Nothing enforces that a stream actually yields the declared number of
    /// elements; a buggy stream may yield fewer than the lower bound or more
    /// than the upper bound.
    ///
    /// `size_hint()` is mainly meant for optimizations such as reserving
    /// space for the stream's elements. It must not be trusted to, e.g., omit
    /// bounds checks in unsafe code: an incorrect `size_hint()` should never
    /// lead to memory safety violations.
    ///
    /// Even so, implementations should report a correct estimate, since
    /// anything else violates the trait's protocol.
    ///
    /// The default implementation returns `(0, `[`None`]`)`, which is correct
    /// for any stream.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Nothing is known about an arbitrary stream: it may yield no values
        // at all, and there is no known limit on how many it may yield.
        let lower_bound = 0;
        let upper_bound = None;
        (lower_bound, upper_bound)
    }
}
109 
110 impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
111     type Item = S::Item;
112 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>113     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114         S::poll_next(Pin::new(&mut **self), cx)
115     }
116 
size_hint(&self) -> (usize, Option<usize>)117     fn size_hint(&self) -> (usize, Option<usize>) {
118         (**self).size_hint()
119     }
120 }
121 
122 impl<P> Stream for Pin<P>
123 where
124     P: DerefMut + Unpin,
125     P::Target: Stream,
126 {
127     type Item = <P::Target as Stream>::Item;
128 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>129     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130         self.get_mut().as_mut().poll_next(cx)
131     }
132 
size_hint(&self) -> (usize, Option<usize>)133     fn size_hint(&self) -> (usize, Option<usize>) {
134         (**self).size_hint()
135     }
136 }
137 
/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a stream should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream: Stream {
    /// Returns `true` if the stream should no longer be polled.
    fn is_terminated(&self) -> bool;
}
150 
151 impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
is_terminated(&self) -> bool152     fn is_terminated(&self) -> bool {
153         <F as FusedStream>::is_terminated(&**self)
154     }
155 }
156 
157 impl<P> FusedStream for Pin<P>
158 where
159     P: DerefMut + Unpin,
160     P::Target: FusedStream,
161 {
is_terminated(&self) -> bool162     fn is_terminated(&self) -> bool {
163         <P::Target as FusedStream>::is_terminated(&**self)
164     }
165 }
166 
// Private module holding the `Sealed` marker trait. `TryStream` lists
// `Sealed` as a supertrait, and because this module is not `pub`, code
// outside the parent module cannot name `Sealed` to implement it.
mod private_try_stream {
    use super::Stream;

    /// Marker trait with no items; it exists only to be a supertrait bound.
    pub trait Sealed {}

    // Every (possibly unsized) stream whose items are `Result<T, E>` is
    // `Sealed`.
    impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
}
174 
/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such streams.
///
/// This trait is implemented for every `Stream` whose `Item` is a
/// `Result<T, E>`, with `Ok = T` and `Error = E`.
pub trait TryStream: Stream + private_try_stream::Sealed {
    /// The type of successful values yielded by this stream
    type Ok;

    /// The type of failures yielded by this stream
    type Error;

    /// Poll this `TryStream` as if it were a `Stream`.
    ///
    /// This method is a stopgap for a compiler limitation that prevents us from
    /// directly inheriting from the `Stream` trait; in the future it won't be
    /// needed.
    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}
194 
195 impl<S, T, E> TryStream for S
196 where
197     S: ?Sized + Stream<Item = Result<T, E>>,
198 {
199     type Ok = T;
200     type Error = E;
201 
try_poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>>202     fn try_poll_next(
203         self: Pin<&mut Self>,
204         cx: &mut Context<'_>,
205     ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
206         self.poll_next(cx)
207     }
208 }
209 
// Forwarding impls that are only compiled when the `alloc` feature is on.
#[cfg(feature = "alloc")]
mod if_alloc {
    use super::*;
    use alloc::boxed::Box;

    /// A boxed `Unpin` stream forwards every call to the stream it owns.
    impl<S> Stream for Box<S>
    where
        S: ?Sized + Stream + Unpin,
    {
        type Item = S::Item;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Reborrow the boxed stream and pin it again for the inner call.
            let inner: &mut S = &mut **self;
            <S as Stream>::poll_next(Pin::new(inner), cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            <S as Stream>::size_hint(&**self)
        }
    }

    /// `AssertUnwindSafe` around a stream forwards every call to the wrapped
    /// stream. Only compiled when the `std` feature is also on.
    #[cfg(feature = "std")]
    impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            // SAFETY: the closure only projects to the wrapper's field, and
            // the resulting reference stays pinned; the wrapped stream is not
            // moved out of the wrapper here.
            let wrapped = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
            <S as Stream>::poll_next(wrapped, cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            <S as Stream>::size_hint(&self.0)
        }
    }

    /// A boxed `Unpin` fused stream reports the termination state of the
    /// stream it owns.
    impl<S> FusedStream for Box<S>
    where
        S: ?Sized + FusedStream + Unpin,
    {
        fn is_terminated(&self) -> bool {
            let inner: &S = &**self;
            S::is_terminated(inner)
        }
    }
}
246