1 use std::{
2     collections::VecDeque,
3     future::Future,
4     pin::Pin,
5     task::{Context, Poll},
6 };
7 
8 use super::Body;
9 
10 use bytes::{Buf, Bytes};
11 use http::HeaderMap;
12 use pin_project_lite::pin_project;
13 
14 pin_project! {
15     /// Future that resolves into a [`Collected`].
16     pub struct Collect<T>
17     where
18         T: Body,
19     {
20         #[pin]
21         body: T,
22         collected: Option<Collected<T::Data>>,
23         is_data_done: bool,
24     }
25 }
26 
27 impl<T: Body> Collect<T> {
new(body: T) -> Self28     pub(crate) fn new(body: T) -> Self {
29         Self {
30             body,
31             collected: Some(Collected::default()),
32             is_data_done: false,
33         }
34     }
35 }
36 
37 impl<T: Body> Future for Collect<T> {
38     type Output = Result<Collected<T::Data>, T::Error>;
39 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>40     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41         let mut me = self.project();
42 
43         loop {
44             if !*me.is_data_done {
45                 match me.body.as_mut().poll_data(cx) {
46                     Poll::Ready(Some(Ok(data))) => {
47                         me.collected.as_mut().unwrap().push_data(data);
48                     }
49                     Poll::Ready(Some(Err(err))) => {
50                         return Poll::Ready(Err(err));
51                     }
52                     Poll::Ready(None) => {
53                         *me.is_data_done = true;
54                     }
55                     Poll::Pending => return Poll::Pending,
56                 }
57             } else {
58                 match me.body.as_mut().poll_trailers(cx) {
59                     Poll::Ready(Ok(Some(trailers))) => {
60                         me.collected.as_mut().unwrap().push_trailers(trailers);
61                         break;
62                     }
63                     Poll::Ready(Err(err)) => {
64                         return Poll::Ready(Err(err));
65                     }
66                     Poll::Ready(Ok(None)) => break,
67                     Poll::Pending => return Poll::Pending,
68                 }
69             }
70         }
71 
72         Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
73     }
74 }
75 
76 /// A collected body produced by [`Body::collect`] which collects all the DATA frames
77 /// and trailers.
78 #[derive(Debug)]
79 pub struct Collected<B> {
80     bufs: BufList<B>,
81     trailers: Option<HeaderMap>,
82 }
83 
84 impl<B: Buf> Collected<B> {
85     /// If there is a trailers frame buffered, returns a reference to it.
86     ///
87     /// Returns `None` if the body contained no trailers.
trailers(&self) -> Option<&HeaderMap>88     pub fn trailers(&self) -> Option<&HeaderMap> {
89         self.trailers.as_ref()
90     }
91 
92     /// Aggregate this buffered into a [`Buf`].
aggregate(self) -> impl Buf93     pub fn aggregate(self) -> impl Buf {
94         self.bufs
95     }
96 
97     /// Convert this body into a [`Bytes`].
to_bytes(mut self) -> Bytes98     pub fn to_bytes(mut self) -> Bytes {
99         self.bufs.copy_to_bytes(self.bufs.remaining())
100     }
101 
push_data(&mut self, data: B)102     fn push_data(&mut self, data: B) {
103         // Only push this frame if it has some data in it, to avoid crashing on
104         // `BufList::push`.
105         if data.has_remaining() {
106             self.bufs.push(data);
107         }
108     }
109 
push_trailers(&mut self, trailers: HeaderMap)110     fn push_trailers(&mut self, trailers: HeaderMap) {
111         if let Some(current) = &mut self.trailers {
112             current.extend(trailers);
113         } else {
114             self.trailers = Some(trailers);
115         }
116     }
117 }
118 
119 impl<B> Default for Collected<B> {
default() -> Self120     fn default() -> Self {
121         Self {
122             bufs: BufList::default(),
123             trailers: None,
124         }
125     }
126 }
127 
128 impl<B> Unpin for Collected<B> {}
129 
130 #[derive(Debug)]
131 struct BufList<T> {
132     bufs: VecDeque<T>,
133 }
134 
135 impl<T: Buf> BufList<T> {
136     #[inline]
push(&mut self, buf: T)137     pub(crate) fn push(&mut self, buf: T) {
138         debug_assert!(buf.has_remaining());
139         self.bufs.push_back(buf);
140     }
141 
142     /*
143     #[inline]
144     pub(crate) fn pop(&mut self) -> Option<T> {
145         self.bufs.pop_front()
146     }
147     */
148 }
149 
150 impl<T: Buf> Buf for BufList<T> {
151     #[inline]
remaining(&self) -> usize152     fn remaining(&self) -> usize {
153         self.bufs.iter().map(|buf| buf.remaining()).sum()
154     }
155 
156     #[inline]
chunk(&self) -> &[u8]157     fn chunk(&self) -> &[u8] {
158         self.bufs.front().map(Buf::chunk).unwrap_or_default()
159     }
160 
161     #[inline]
advance(&mut self, mut cnt: usize)162     fn advance(&mut self, mut cnt: usize) {
163         while cnt > 0 {
164             {
165                 let front = &mut self.bufs[0];
166                 let rem = front.remaining();
167                 if rem > cnt {
168                     front.advance(cnt);
169                     return;
170                 } else {
171                     front.advance(rem);
172                     cnt -= rem;
173                 }
174             }
175             self.bufs.pop_front();
176         }
177     }
178 
179     #[inline]
chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize180     fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
181         if dst.is_empty() {
182             return 0;
183         }
184         let mut vecs = 0;
185         for buf in &self.bufs {
186             vecs += buf.chunks_vectored(&mut dst[vecs..]);
187             if vecs == dst.len() {
188                 break;
189             }
190         }
191         vecs
192     }
193 
194     #[inline]
copy_to_bytes(&mut self, len: usize) -> Bytes195     fn copy_to_bytes(&mut self, len: usize) -> Bytes {
196         use bytes::{BufMut, BytesMut};
197         // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
198         // request can be fulfilled by the front buffer, we can take advantage.
199         match self.bufs.front_mut() {
200             Some(front) if front.remaining() == len => {
201                 let b = front.copy_to_bytes(len);
202                 self.bufs.pop_front();
203                 b
204             }
205             Some(front) if front.remaining() > len => front.copy_to_bytes(len),
206             _ => {
207                 assert!(len <= self.remaining(), "`len` greater than remaining");
208                 let mut bm = BytesMut::with_capacity(len);
209                 bm.put(self.take(len));
210                 bm.freeze()
211             }
212         }
213     }
214 }
215 
216 impl<T> Default for BufList<T> {
default() -> Self217     fn default() -> Self {
218         BufList {
219             bufs: VecDeque::new(),
220         }
221     }
222 }
223