1 use crate::Stream;
2 
3 use core::future::Future;
4 use core::marker::PhantomPinned;
5 use core::mem;
6 use core::pin::Pin;
7 use core::task::{ready, Context, Poll};
8 use pin_project_lite::pin_project;
9 
// Do not export this struct until `FromStream` can be unsealed.
pin_project! {
    /// Future returned by the [`collect`](super::StreamExt::collect) method.
    ///
    /// Polling this future pulls items out of the wrapped stream and feeds
    /// them into an intermediate collection. It resolves to a `U` once the
    /// stream is exhausted or the collection asks to stop early.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    pub struct Collect<T, U>
    where
        T: Stream,
        U: FromStream<T::Item>,
    {
        // Source of items; structurally pinned so it can be polled in place.
        #[pin]
        stream: T,
        // Intermediate state accumulated so far. It is created by
        // `U::initialize` and turned into the final `U` by `U::finalize`.
        collection: U::InternalCollection,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
28 
29 /// Convert from a [`Stream`].
30 ///
31 /// This trait is not intended to be used directly. Instead, call
32 /// [`StreamExt::collect()`](super::StreamExt::collect).
33 ///
34 /// # Implementing
35 ///
36 /// Currently, this trait may not be implemented by third parties. The trait is
37 /// sealed in order to make changes in the future. Stabilization is pending
38 /// enhancements to the Rust language.
39 pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
40 
41 impl<T, U> Collect<T, U>
42 where
43     T: Stream,
44     U: FromStream<T::Item>,
45 {
new(stream: T) -> Collect<T, U>46     pub(super) fn new(stream: T) -> Collect<T, U> {
47         let (lower, upper) = stream.size_hint();
48         let collection = U::initialize(sealed::Internal, lower, upper);
49 
50         Collect {
51             stream,
52             collection,
53             _pin: PhantomPinned,
54         }
55     }
56 }
57 
58 impl<T, U> Future for Collect<T, U>
59 where
60     T: Stream,
61     U: FromStream<T::Item>,
62 {
63     type Output = U;
64 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U>65     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
66         use Poll::Ready;
67 
68         loop {
69             let me = self.as_mut().project();
70 
71             let item = match ready!(me.stream.poll_next(cx)) {
72                 Some(item) => item,
73                 None => {
74                     return Ready(U::finalize(sealed::Internal, me.collection));
75                 }
76             };
77 
78             if !U::extend(sealed::Internal, me.collection, item) {
79                 return Ready(U::finalize(sealed::Internal, me.collection));
80             }
81         }
82     }
83 }
84 
85 // ===== FromStream implementations
86 
87 impl FromStream<()> for () {}
88 
89 impl sealed::FromStreamPriv<()> for () {
90     type InternalCollection = ();
91 
initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>)92     fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
93 
extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool94     fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
95         true
96     }
97 
finalize(_: sealed::Internal, _collection: &mut ())98     fn finalize(_: sealed::Internal, _collection: &mut ()) {}
99 }
100 
101 impl<T: AsRef<str>> FromStream<T> for String {}
102 
103 impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
104     type InternalCollection = String;
105 
initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String106     fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
107         String::new()
108     }
109 
extend(_: sealed::Internal, collection: &mut String, item: T) -> bool110     fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
111         collection.push_str(item.as_ref());
112         true
113     }
114 
finalize(_: sealed::Internal, collection: &mut String) -> String115     fn finalize(_: sealed::Internal, collection: &mut String) -> String {
116         mem::take(collection)
117     }
118 }
119 
120 impl<T> FromStream<T> for Vec<T> {}
121 
122 impl<T> sealed::FromStreamPriv<T> for Vec<T> {
123     type InternalCollection = Vec<T>;
124 
initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T>125     fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
126         Vec::with_capacity(lower)
127     }
128 
extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool129     fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
130         collection.push(item);
131         true
132     }
133 
finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T>134     fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
135         mem::take(collection)
136     }
137 }
138 
139 impl<T> FromStream<T> for Box<[T]> {}
140 
141 impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
142     type InternalCollection = Vec<T>;
143 
initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T>144     fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
145         <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
146     }
147 
extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool148     fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
149         <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
150     }
151 
finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]>152     fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
153         <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
154             .into_boxed_slice()
155     }
156 }
157 
158 impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
159 
160 impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
161 where
162     U: FromStream<T>,
163 {
164     type InternalCollection = Result<U::InternalCollection, E>;
165 
initialize( _: sealed::Internal, lower: usize, upper: Option<usize>, ) -> Result<U::InternalCollection, E>166     fn initialize(
167         _: sealed::Internal,
168         lower: usize,
169         upper: Option<usize>,
170     ) -> Result<U::InternalCollection, E> {
171         Ok(U::initialize(sealed::Internal, lower, upper))
172     }
173 
extend( _: sealed::Internal, collection: &mut Self::InternalCollection, item: Result<T, E>, ) -> bool174     fn extend(
175         _: sealed::Internal,
176         collection: &mut Self::InternalCollection,
177         item: Result<T, E>,
178     ) -> bool {
179         assert!(collection.is_ok());
180         match item {
181             Ok(item) => {
182                 let collection = collection.as_mut().ok().expect("invalid state");
183                 U::extend(sealed::Internal, collection, item)
184             }
185             Err(err) => {
186                 *collection = Err(err);
187                 false
188             }
189         }
190     }
191 
finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E>192     fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
193         if let Ok(collection) = collection.as_mut() {
194             Ok(U::finalize(sealed::Internal, collection))
195         } else {
196             let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
197 
198             Err(res.map(drop).unwrap_err())
199         }
200     }
201 }
202 
pub(crate) mod sealed {
    /// Token that every [`FromStreamPriv`] method takes as its first argument.
    ///
    /// This module is `pub(crate)`, so the token is not nameable through it
    /// from outside the crate.
    // Unit struct with no state worth formatting, so `Debug` is omitted.
    #[allow(missing_debug_implementations)]
    pub struct Internal;

    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate state held while items are being collected.
        ///
        /// The name of this type is internal and cannot be relied upon.
        type InternalCollection;

        /// Creates the intermediate state.
        ///
        /// `lower` and `upper` are the bounds reported by the stream's size
        /// hint.
        fn initialize(
            internal: Internal,
            lower: usize,
            upper: Option<usize>,
        ) -> Self::InternalCollection;

        /// Feeds one received item into the intermediate state.
        ///
        /// Returns `true` to continue streaming, or `false` to complete
        /// collection.
        fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

        /// Converts the intermediate state into the target type.
        fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
    }
}
230