1 use std::{ 2 collections::VecDeque, 3 future::Future, 4 pin::Pin, 5 task::{Context, Poll}, 6 }; 7 8 use super::Body; 9 10 use bytes::{Buf, Bytes}; 11 use http::HeaderMap; 12 use pin_project_lite::pin_project; 13 14 pin_project! { 15 /// Future that resolves into a [`Collected`]. 16 pub struct Collect<T> 17 where 18 T: Body, 19 { 20 #[pin] 21 body: T, 22 collected: Option<Collected<T::Data>>, 23 is_data_done: bool, 24 } 25 } 26 27 impl<T: Body> Collect<T> { new(body: T) -> Self28 pub(crate) fn new(body: T) -> Self { 29 Self { 30 body, 31 collected: Some(Collected::default()), 32 is_data_done: false, 33 } 34 } 35 } 36 37 impl<T: Body> Future for Collect<T> { 38 type Output = Result<Collected<T::Data>, T::Error>; 39 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>40 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 41 let mut me = self.project(); 42 43 loop { 44 if !*me.is_data_done { 45 match me.body.as_mut().poll_data(cx) { 46 Poll::Ready(Some(Ok(data))) => { 47 me.collected.as_mut().unwrap().push_data(data); 48 } 49 Poll::Ready(Some(Err(err))) => { 50 return Poll::Ready(Err(err)); 51 } 52 Poll::Ready(None) => { 53 *me.is_data_done = true; 54 } 55 Poll::Pending => return Poll::Pending, 56 } 57 } else { 58 match me.body.as_mut().poll_trailers(cx) { 59 Poll::Ready(Ok(Some(trailers))) => { 60 me.collected.as_mut().unwrap().push_trailers(trailers); 61 break; 62 } 63 Poll::Ready(Err(err)) => { 64 return Poll::Ready(Err(err)); 65 } 66 Poll::Ready(Ok(None)) => break, 67 Poll::Pending => return Poll::Pending, 68 } 69 } 70 } 71 72 Poll::Ready(Ok(me.collected.take().expect("polled after complete"))) 73 } 74 } 75 76 /// A collected body produced by [`Body::collect`] which collects all the DATA frames 77 /// and trailers. 78 #[derive(Debug)] 79 pub struct Collected<B> { 80 bufs: BufList<B>, 81 trailers: Option<HeaderMap>, 82 } 83 84 impl<B: Buf> Collected<B> { 85 /// If there is a trailers frame buffered, returns a reference to it. 86 /// 87 /// Returns `None` if the body contained no trailers. trailers(&self) -> Option<&HeaderMap>88 pub fn trailers(&self) -> Option<&HeaderMap> { 89 self.trailers.as_ref() 90 } 91 92 /// Aggregate this buffered into a [`Buf`]. aggregate(self) -> impl Buf93 pub fn aggregate(self) -> impl Buf { 94 self.bufs 95 } 96 97 /// Convert this body into a [`Bytes`]. to_bytes(mut self) -> Bytes98 pub fn to_bytes(mut self) -> Bytes { 99 self.bufs.copy_to_bytes(self.bufs.remaining()) 100 } 101 push_data(&mut self, data: B)102 fn push_data(&mut self, data: B) { 103 // Only push this frame if it has some data in it, to avoid crashing on 104 // `BufList::push`. 105 if data.has_remaining() { 106 self.bufs.push(data); 107 } 108 } 109 push_trailers(&mut self, trailers: HeaderMap)110 fn push_trailers(&mut self, trailers: HeaderMap) { 111 if let Some(current) = &mut self.trailers { 112 current.extend(trailers); 113 } else { 114 self.trailers = Some(trailers); 115 } 116 } 117 } 118 119 impl<B> Default for Collected<B> { default() -> Self120 fn default() -> Self { 121 Self { 122 bufs: BufList::default(), 123 trailers: None, 124 } 125 } 126 } 127 128 impl<B> Unpin for Collected<B> {} 129 130 #[derive(Debug)] 131 struct BufList<T> { 132 bufs: VecDeque<T>, 133 } 134 135 impl<T: Buf> BufList<T> { 136 #[inline] push(&mut self, buf: T)137 pub(crate) fn push(&mut self, buf: T) { 138 debug_assert!(buf.has_remaining()); 139 self.bufs.push_back(buf); 140 } 141 142 /* 143 #[inline] 144 pub(crate) fn pop(&mut self) -> Option<T> { 145 self.bufs.pop_front() 146 } 147 */ 148 } 149 150 impl<T: Buf> Buf for BufList<T> { 151 #[inline] remaining(&self) -> usize152 fn remaining(&self) -> usize { 153 self.bufs.iter().map(|buf| buf.remaining()).sum() 154 } 155 156 #[inline] chunk(&self) -> &[u8]157 fn chunk(&self) -> &[u8] { 158 self.bufs.front().map(Buf::chunk).unwrap_or_default() 159 } 160 161 #[inline] advance(&mut self, mut cnt: usize)162 fn advance(&mut self, mut cnt: usize) { 163 while cnt > 0 { 164 { 165 let front = &mut self.bufs[0]; 166 let rem = front.remaining(); 167 if rem > cnt { 168 front.advance(cnt); 169 return; 170 } else { 171 front.advance(rem); 172 cnt -= rem; 173 } 174 } 175 self.bufs.pop_front(); 176 } 177 } 178 179 #[inline] chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize180 fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { 181 if dst.is_empty() { 182 return 0; 183 } 184 let mut vecs = 0; 185 for buf in &self.bufs { 186 vecs += buf.chunks_vectored(&mut dst[vecs..]); 187 if vecs == dst.len() { 188 break; 189 } 190 } 191 vecs 192 } 193 194 #[inline] copy_to_bytes(&mut self, len: usize) -> Bytes195 fn copy_to_bytes(&mut self, len: usize) -> Bytes { 196 use bytes::{BufMut, BytesMut}; 197 // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole 198 // request can be fulfilled by the front buffer, we can take advantage. 199 match self.bufs.front_mut() { 200 Some(front) if front.remaining() == len => { 201 let b = front.copy_to_bytes(len); 202 self.bufs.pop_front(); 203 b 204 } 205 Some(front) if front.remaining() > len => front.copy_to_bytes(len), 206 _ => { 207 assert!(len <= self.remaining(), "`len` greater than remaining"); 208 let mut bm = BytesMut::with_capacity(len); 209 bm.put(self.take(len)); 210 bm.freeze() 211 } 212 } 213 } 214 } 215 216 impl<T> Default for BufList<T> { default() -> Self217 fn default() -> Self { 218 BufList { 219 bufs: VecDeque::new(), 220 } 221 } 222 } 223