1 use crate::Stream; 2 3 use core::future::Future; 4 use core::marker::PhantomPinned; 5 use core::mem; 6 use core::pin::Pin; 7 use core::task::{ready, Context, Poll}; 8 use pin_project_lite::pin_project; 9 10 // Do not export this struct until `FromStream` can be unsealed. 11 pin_project! { 12 /// Future returned by the [`collect`](super::StreamExt::collect) method. 13 #[must_use = "futures do nothing unless you `.await` or poll them"] 14 #[derive(Debug)] 15 pub struct Collect<T, U> 16 where 17 T: Stream, 18 U: FromStream<T::Item>, 19 { 20 #[pin] 21 stream: T, 22 collection: U::InternalCollection, 23 // Make this future `!Unpin` for compatibility with async trait methods. 24 #[pin] 25 _pin: PhantomPinned, 26 } 27 } 28 29 /// Convert from a [`Stream`]. 30 /// 31 /// This trait is not intended to be used directly. Instead, call 32 /// [`StreamExt::collect()`](super::StreamExt::collect). 33 /// 34 /// # Implementing 35 /// 36 /// Currently, this trait may not be implemented by third parties. The trait is 37 /// sealed in order to make changes in the future. Stabilization is pending 38 /// enhancements to the Rust language. 39 pub trait FromStream<T>: sealed::FromStreamPriv<T> {} 40 41 impl<T, U> Collect<T, U> 42 where 43 T: Stream, 44 U: FromStream<T::Item>, 45 { new(stream: T) -> Collect<T, U>46 pub(super) fn new(stream: T) -> Collect<T, U> { 47 let (lower, upper) = stream.size_hint(); 48 let collection = U::initialize(sealed::Internal, lower, upper); 49 50 Collect { 51 stream, 52 collection, 53 _pin: PhantomPinned, 54 } 55 } 56 } 57 58 impl<T, U> Future for Collect<T, U> 59 where 60 T: Stream, 61 U: FromStream<T::Item>, 62 { 63 type Output = U; 64 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U>65 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { 66 use Poll::Ready; 67 68 loop { 69 let me = self.as_mut().project(); 70 71 let item = match ready!(me.stream.poll_next(cx)) { 72 Some(item) => item, 73 None => { 74 return Ready(U::finalize(sealed::Internal, me.collection)); 75 } 76 }; 77 78 if !U::extend(sealed::Internal, me.collection, item) { 79 return Ready(U::finalize(sealed::Internal, me.collection)); 80 } 81 } 82 } 83 } 84 85 // ===== FromStream implementations 86 87 impl FromStream<()> for () {} 88 89 impl sealed::FromStreamPriv<()> for () { 90 type InternalCollection = (); 91 initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>)92 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {} 93 extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool94 fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { 95 true 96 } 97 finalize(_: sealed::Internal, _collection: &mut ())98 fn finalize(_: sealed::Internal, _collection: &mut ()) {} 99 } 100 101 impl<T: AsRef<str>> FromStream<T> for String {} 102 103 impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { 104 type InternalCollection = String; 105 initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String106 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String { 107 String::new() 108 } 109 extend(_: sealed::Internal, collection: &mut String, item: T) -> bool110 fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { 111 collection.push_str(item.as_ref()); 112 true 113 } 114 finalize(_: sealed::Internal, collection: &mut String) -> String115 fn finalize(_: sealed::Internal, collection: &mut String) -> String { 116 mem::take(collection) 117 } 118 } 119 120 impl<T> FromStream<T> for Vec<T> {} 121 122 impl<T> sealed::FromStreamPriv<T> for Vec<T> { 123 type InternalCollection = Vec<T>; 124 initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T>125 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> { 126 Vec::with_capacity(lower) 127 } 128 extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool129 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { 130 collection.push(item); 131 true 132 } 133 finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T>134 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { 135 mem::take(collection) 136 } 137 } 138 139 impl<T> FromStream<T> for Box<[T]> {} 140 141 impl<T> sealed::FromStreamPriv<T> for Box<[T]> { 142 type InternalCollection = Vec<T>; 143 initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T>144 fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> { 145 <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper) 146 } 147 extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool148 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { 149 <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item) 150 } 151 finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]>152 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> { 153 <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection) 154 .into_boxed_slice() 155 } 156 } 157 158 impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} 159 160 impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> 161 where 162 U: FromStream<T>, 163 { 164 type InternalCollection = Result<U::InternalCollection, E>; 165 initialize( _: sealed::Internal, lower: usize, upper: Option<usize>, ) -> Result<U::InternalCollection, E>166 fn initialize( 167 _: sealed::Internal, 168 lower: usize, 169 upper: Option<usize>, 170 ) -> Result<U::InternalCollection, E> { 171 Ok(U::initialize(sealed::Internal, lower, upper)) 172 } 173 extend( _: sealed::Internal, collection: &mut Self::InternalCollection, item: Result<T, E>, ) -> bool174 fn extend( 175 _: sealed::Internal, 176 collection: &mut Self::InternalCollection, 177 item: Result<T, E>, 178 ) -> bool { 179 assert!(collection.is_ok()); 180 match item { 181 Ok(item) => { 182 let collection = collection.as_mut().ok().expect("invalid state"); 183 U::extend(sealed::Internal, collection, item) 184 } 185 Err(err) => { 186 *collection = Err(err); 187 false 188 } 189 } 190 } 191 finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E>192 fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> { 193 if let Ok(collection) = collection.as_mut() { 194 Ok(U::finalize(sealed::Internal, collection)) 195 } else { 196 let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); 197 198 Err(res.map(drop).unwrap_err()) 199 } 200 } 201 } 202 203 pub(crate) mod sealed { 204 #[doc(hidden)] 205 pub trait FromStreamPriv<T> { 206 /// Intermediate type used during collection process 207 /// 208 /// The name of this type is internal and cannot be relied upon. 209 type InternalCollection; 210 211 /// Initialize the collection initialize( internal: Internal, lower: usize, upper: Option<usize>, ) -> Self::InternalCollection212 fn initialize( 213 internal: Internal, 214 lower: usize, 215 upper: Option<usize>, 216 ) -> Self::InternalCollection; 217 218 /// Extend the collection with the received item 219 /// 220 /// Return `true` to continue streaming, `false` complete collection. extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool221 fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; 222 223 /// Finalize collection into target type. finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self224 fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; 225 } 226 227 #[allow(missing_debug_implementations)] 228 pub struct Internal; 229 } 230