1 use std::borrow::Cow;
2 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
3 use std::convert::Infallible;
4 #[cfg(feature = "stream")]
5 use std::error::Error as StdError;
6 use std::fmt;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 
11 use bytes::Bytes;
12 use futures_channel::mpsc;
13 use futures_channel::oneshot;
14 use futures_core::Stream; // for mpsc::Receiver
15 #[cfg(feature = "stream")]
16 use futures_util::TryStreamExt;
17 use http::HeaderMap;
18 use http_body::{Body as HttpBody, SizeHint};
19 
20 use super::DecodedLength;
21 #[cfg(feature = "stream")]
22 use crate::common::sync_wrapper::SyncWrapper;
23 use crate::common::watch;
24 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
25 use crate::proto::h2::ping;
26 
27 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
28 type TrailersSender = oneshot::Sender<HeaderMap>;
29 
30 /// A stream of `Bytes`, used when receiving bodies.
31 ///
32 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many
33 /// applications.
34 ///
35 /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes())
36 /// or [`body::aggregate`](crate::body::aggregate()).
37 #[must_use = "streams do nothing unless polled"]
38 pub struct Body {
39     kind: Kind,
40     /// Keep the extra bits in an `Option<Box<Extra>>`, so that
41     /// Body stays small in the common case (no extras needed).
42     extra: Option<Box<Extra>>,
43 }
44 
45 enum Kind {
46     Once(Option<Bytes>),
47     Chan {
48         content_length: DecodedLength,
49         want_tx: watch::Sender,
50         data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
51         trailers_rx: oneshot::Receiver<HeaderMap>,
52     },
53     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
54     H2 {
55         ping: ping::Recorder,
56         content_length: DecodedLength,
57         recv: h2::RecvStream,
58     },
59     #[cfg(feature = "ffi")]
60     Ffi(crate::ffi::UserBody),
61     #[cfg(feature = "stream")]
62     Wrapped(
63         SyncWrapper<
64             Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
65         >,
66     ),
67 }
68 
69 struct Extra {
70     /// Allow the client to pass a future to delay the `Body` from returning
71     /// EOF. This allows the `Client` to try to put the idle connection
72     /// back into the pool before the body is "finished".
73     ///
74     /// The reason for this is so that creating a new request after finishing
75     /// streaming the body of a response could sometimes result in creating
76     /// a brand new connection, since the pool didn't know about the idle
77     /// connection yet.
78     delayed_eof: Option<DelayEof>,
79 }
80 
81 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
82 type DelayEofUntil = oneshot::Receiver<Infallible>;
83 
84 enum DelayEof {
85     /// Initial state, stream hasn't seen EOF yet.
86     #[cfg(any(feature = "http1", feature = "http2"))]
87     #[cfg(feature = "client")]
88     NotEof(DelayEofUntil),
89     /// Transitions to this state once we've seen `poll` try to
90     /// return EOF (`None`). This future is then polled, and
91     /// when it completes, the Body finally returns EOF (`None`).
92     #[cfg(any(feature = "http1", feature = "http2"))]
93     #[cfg(feature = "client")]
94     Eof(DelayEofUntil),
95 }
96 
97 /// A sender half created through [`Body::channel()`].
98 ///
99 /// Useful when wanting to stream chunks from another thread.
100 ///
101 /// ## Body Closing
102 ///
103 /// Note that the request body will always be closed normally when the sender is dropped (meaning
104 /// that the empty terminating chunk will be sent to the remote). If you desire to close the
105 /// connection with an incomplete response (e.g. in the case of an error during asynchronous
106 /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
107 ///
108 /// [`Body::channel()`]: struct.Body.html#method.channel
109 /// [`Sender::abort()`]: struct.Sender.html#method.abort
110 #[must_use = "Sender does nothing unless sent on"]
111 pub struct Sender {
112     want_rx: watch::Receiver,
113     data_tx: BodySender,
114     trailers_tx: Option<TrailersSender>,
115 }
116 
117 const WANT_PENDING: usize = 1;
118 const WANT_READY: usize = 2;
119 
120 impl Body {
121     /// Create an empty `Body` stream.
122     ///
123     /// # Example
124     ///
125     /// ```
126     /// use hyper::{Body, Request};
127     ///
128     /// // create a `GET /` request
129     /// let get = Request::new(Body::empty());
130     /// ```
131     #[inline]
empty() -> Body132     pub fn empty() -> Body {
133         Body::new(Kind::Once(None))
134     }
135 
136     /// Create a `Body` stream with an associated sender half.
137     ///
138     /// Useful when wanting to stream chunks from another thread.
139     #[inline]
channel() -> (Sender, Body)140     pub fn channel() -> (Sender, Body) {
141         Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
142     }
143 
new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)144     pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
145         let (data_tx, data_rx) = mpsc::channel(0);
146         let (trailers_tx, trailers_rx) = oneshot::channel();
147 
148         // If wanter is true, `Sender::poll_ready()` won't becoming ready
149         // until the `Body` has been polled for data once.
150         let want = if wanter { WANT_PENDING } else { WANT_READY };
151 
152         let (want_tx, want_rx) = watch::channel(want);
153 
154         let tx = Sender {
155             want_rx,
156             data_tx,
157             trailers_tx: Some(trailers_tx),
158         };
159         let rx = Body::new(Kind::Chan {
160             content_length,
161             want_tx,
162             data_rx,
163             trailers_rx,
164         });
165 
166         (tx, rx)
167     }
168 
169     /// Wrap a futures `Stream` in a box inside `Body`.
170     ///
171     /// # Example
172     ///
173     /// ```
174     /// # use hyper::Body;
175     /// let chunks: Vec<Result<_, std::io::Error>> = vec![
176     ///     Ok("hello"),
177     ///     Ok(" "),
178     ///     Ok("world"),
179     /// ];
180     ///
181     /// let stream = futures_util::stream::iter(chunks);
182     ///
183     /// let body = Body::wrap_stream(stream);
184     /// ```
185     ///
186     /// # Optional
187     ///
188     /// This function requires enabling the `stream` feature in your
189     /// `Cargo.toml`.
190     #[cfg(feature = "stream")]
191     #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,192     pub fn wrap_stream<S, O, E>(stream: S) -> Body
193     where
194         S: Stream<Item = Result<O, E>> + Send + 'static,
195         O: Into<Bytes> + 'static,
196         E: Into<Box<dyn StdError + Send + Sync>> + 'static,
197     {
198         let mapped = stream.map_ok(Into::into).map_err(Into::into);
199         Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
200     }
201 
new(kind: Kind) -> Body202     fn new(kind: Kind) -> Body {
203         Body { kind, extra: None }
204     }
205 
206     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
h2( recv: h2::RecvStream, mut content_length: DecodedLength, ping: ping::Recorder, ) -> Self207     pub(crate) fn h2(
208         recv: h2::RecvStream,
209         mut content_length: DecodedLength,
210         ping: ping::Recorder,
211     ) -> Self {
212         // If the stream is already EOS, then the "unknown length" is clearly
213         // actually ZERO.
214         if !content_length.is_exact() && recv.is_end_stream() {
215             content_length = DecodedLength::ZERO;
216         }
217         let body = Body::new(Kind::H2 {
218             ping,
219             content_length,
220             recv,
221         });
222 
223         body
224     }
225 
226     #[cfg(any(feature = "http1", feature = "http2"))]
227     #[cfg(feature = "client")]
delayed_eof(&mut self, fut: DelayEofUntil)228     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
229         self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
230     }
231 
take_delayed_eof(&mut self) -> Option<DelayEof>232     fn take_delayed_eof(&mut self) -> Option<DelayEof> {
233         self.extra
234             .as_mut()
235             .and_then(|extra| extra.delayed_eof.take())
236     }
237 
238     #[cfg(any(feature = "http1", feature = "http2"))]
extra_mut(&mut self) -> &mut Extra239     fn extra_mut(&mut self) -> &mut Extra {
240         self.extra
241             .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
242     }
243 
poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>244     fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
245         match self.take_delayed_eof() {
246             #[cfg(any(feature = "http1", feature = "http2"))]
247             #[cfg(feature = "client")]
248             Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
249                 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
250                     self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
251                     ok
252                 }
253                 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
254                     Poll::Ready(Ok(never)) => match never {},
255                     Poll::Pending => {
256                         self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
257                         Poll::Pending
258                     }
259                     Poll::Ready(Err(_done)) => Poll::Ready(None),
260                 },
261                 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
262             },
263             #[cfg(any(feature = "http1", feature = "http2"))]
264             #[cfg(feature = "client")]
265             Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
266                 Poll::Ready(Ok(never)) => match never {},
267                 Poll::Pending => {
268                     self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
269                     Poll::Pending
270                 }
271                 Poll::Ready(Err(_done)) => Poll::Ready(None),
272             },
273             #[cfg(any(
274                 not(any(feature = "http1", feature = "http2")),
275                 not(feature = "client")
276             ))]
277             Some(delay_eof) => match delay_eof {},
278             None => self.poll_inner(cx),
279         }
280     }
281 
282     #[cfg(feature = "ffi")]
as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody283     pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
284         match self.kind {
285             Kind::Ffi(ref mut body) => return body,
286             _ => {
287                 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
288             }
289         }
290 
291         match self.kind {
292             Kind::Ffi(ref mut body) => body,
293             _ => unreachable!(),
294         }
295     }
296 
poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>297     fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
298         match self.kind {
299             Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
300             Kind::Chan {
301                 content_length: ref mut len,
302                 ref mut data_rx,
303                 ref mut want_tx,
304                 ..
305             } => {
306                 want_tx.send(WANT_READY);
307 
308                 match ready!(Pin::new(data_rx).poll_next(cx)?) {
309                     Some(chunk) => {
310                         len.sub_if(chunk.len() as u64);
311                         Poll::Ready(Some(Ok(chunk)))
312                     }
313                     None => Poll::Ready(None),
314                 }
315             }
316             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
317             Kind::H2 {
318                 ref ping,
319                 recv: ref mut h2,
320                 content_length: ref mut len,
321             } => match ready!(h2.poll_data(cx)) {
322                 Some(Ok(bytes)) => {
323                     let _ = h2.flow_control().release_capacity(bytes.len());
324                     len.sub_if(bytes.len() as u64);
325                     ping.record_data(bytes.len());
326                     Poll::Ready(Some(Ok(bytes)))
327                 }
328                 Some(Err(e)) => match e.reason() {
329                     // These reasons should cause stop of body reading, but nor fail it.
330                     // The same logic as for `AsyncRead for H2Upgraded` is applied here.
331                     Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None),
332                     _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
333                 },
334                 None => Poll::Ready(None),
335             },
336 
337             #[cfg(feature = "ffi")]
338             Kind::Ffi(ref mut body) => body.poll_data(cx),
339 
340             #[cfg(feature = "stream")]
341             Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
342                 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
343                 None => Poll::Ready(None),
344             },
345         }
346     }
347 
348     #[cfg(feature = "http1")]
take_full_data(&mut self) -> Option<Bytes>349     pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
350         if let Kind::Once(ref mut chunk) = self.kind {
351             chunk.take()
352         } else {
353             None
354         }
355     }
356 }
357 
358 impl Default for Body {
359     /// Returns `Body::empty()`.
360     #[inline]
default() -> Body361     fn default() -> Body {
362         Body::empty()
363     }
364 }
365 
366 impl HttpBody for Body {
367     type Data = Bytes;
368     type Error = crate::Error;
369 
poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>370     fn poll_data(
371         mut self: Pin<&mut Self>,
372         cx: &mut Context<'_>,
373     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
374         self.poll_eof(cx)
375     }
376 
poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>377     fn poll_trailers(
378         #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
379         #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>,
380     ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
381         match self.kind {
382             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
383             Kind::H2 {
384                 recv: ref mut h2,
385                 ref ping,
386                 ..
387             } => match ready!(h2.poll_trailers(cx)) {
388                 Ok(t) => {
389                     ping.record_non_data();
390                     Poll::Ready(Ok(t))
391                 }
392                 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
393             },
394             Kind::Chan {
395                 ref mut trailers_rx,
396                 ..
397             } => match ready!(Pin::new(trailers_rx).poll(cx)) {
398                 Ok(t) => Poll::Ready(Ok(Some(t))),
399                 Err(_) => Poll::Ready(Ok(None)),
400             },
401             #[cfg(feature = "ffi")]
402             Kind::Ffi(ref mut body) => body.poll_trailers(cx),
403             _ => Poll::Ready(Ok(None)),
404         }
405     }
406 
is_end_stream(&self) -> bool407     fn is_end_stream(&self) -> bool {
408         match self.kind {
409             Kind::Once(ref val) => val.is_none(),
410             Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
411             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
412             Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
413             #[cfg(feature = "ffi")]
414             Kind::Ffi(..) => false,
415             #[cfg(feature = "stream")]
416             Kind::Wrapped(..) => false,
417         }
418     }
419 
size_hint(&self) -> SizeHint420     fn size_hint(&self) -> SizeHint {
421         macro_rules! opt_len {
422             ($content_length:expr) => {{
423                 let mut hint = SizeHint::default();
424 
425                 if let Some(content_length) = $content_length.into_opt() {
426                     hint.set_exact(content_length);
427                 }
428 
429                 hint
430             }};
431         }
432 
433         match self.kind {
434             Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
435             Kind::Once(None) => SizeHint::with_exact(0),
436             #[cfg(feature = "stream")]
437             Kind::Wrapped(..) => SizeHint::default(),
438             Kind::Chan { content_length, .. } => opt_len!(content_length),
439             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
440             Kind::H2 { content_length, .. } => opt_len!(content_length),
441             #[cfg(feature = "ffi")]
442             Kind::Ffi(..) => SizeHint::default(),
443         }
444     }
445 }
446 
447 impl fmt::Debug for Body {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result448     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449         #[derive(Debug)]
450         struct Streaming;
451         #[derive(Debug)]
452         struct Empty;
453         #[derive(Debug)]
454         struct Full<'a>(&'a Bytes);
455 
456         let mut builder = f.debug_tuple("Body");
457         match self.kind {
458             Kind::Once(None) => builder.field(&Empty),
459             Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
460             _ => builder.field(&Streaming),
461         };
462 
463         builder.finish()
464     }
465 }
466 
467 /// # Optional
468 ///
469 /// This function requires enabling the `stream` feature in your
470 /// `Cargo.toml`.
471 #[cfg(feature = "stream")]
472 impl Stream for Body {
473     type Item = crate::Result<Bytes>;
474 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>475     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
476         HttpBody::poll_data(self, cx)
477     }
478 }
479 
480 /// # Optional
481 ///
482 /// This function requires enabling the `stream` feature in your
483 /// `Cargo.toml`.
484 #[cfg(feature = "stream")]
485 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
486     #[inline]
from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body487     fn from(
488         stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
489     ) -> Body {
490         Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
491     }
492 }
493 
494 impl From<Bytes> for Body {
495     #[inline]
from(chunk: Bytes) -> Body496     fn from(chunk: Bytes) -> Body {
497         if chunk.is_empty() {
498             Body::empty()
499         } else {
500             Body::new(Kind::Once(Some(chunk)))
501         }
502     }
503 }
504 
505 impl From<Vec<u8>> for Body {
506     #[inline]
from(vec: Vec<u8>) -> Body507     fn from(vec: Vec<u8>) -> Body {
508         Body::from(Bytes::from(vec))
509     }
510 }
511 
512 impl From<&'static [u8]> for Body {
513     #[inline]
from(slice: &'static [u8]) -> Body514     fn from(slice: &'static [u8]) -> Body {
515         Body::from(Bytes::from(slice))
516     }
517 }
518 
519 impl From<Cow<'static, [u8]>> for Body {
520     #[inline]
from(cow: Cow<'static, [u8]>) -> Body521     fn from(cow: Cow<'static, [u8]>) -> Body {
522         match cow {
523             Cow::Borrowed(b) => Body::from(b),
524             Cow::Owned(o) => Body::from(o),
525         }
526     }
527 }
528 
529 impl From<String> for Body {
530     #[inline]
from(s: String) -> Body531     fn from(s: String) -> Body {
532         Body::from(Bytes::from(s.into_bytes()))
533     }
534 }
535 
536 impl From<&'static str> for Body {
537     #[inline]
from(slice: &'static str) -> Body538     fn from(slice: &'static str) -> Body {
539         Body::from(Bytes::from(slice.as_bytes()))
540     }
541 }
542 
543 impl From<Cow<'static, str>> for Body {
544     #[inline]
from(cow: Cow<'static, str>) -> Body545     fn from(cow: Cow<'static, str>) -> Body {
546         match cow {
547             Cow::Borrowed(b) => Body::from(b),
548             Cow::Owned(o) => Body::from(o),
549         }
550     }
551 }
552 
553 impl Sender {
554     /// Check to see if this `Sender` can send more data.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>555     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
556         // Check if the receiver end has tried polling for the body yet
557         ready!(self.poll_want(cx)?);
558         self.data_tx
559             .poll_ready(cx)
560             .map_err(|_| crate::Error::new_closed())
561     }
562 
poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>563     fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
564         match self.want_rx.load(cx) {
565             WANT_READY => Poll::Ready(Ok(())),
566             WANT_PENDING => Poll::Pending,
567             watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
568             unexpected => unreachable!("want_rx value: {}", unexpected),
569         }
570     }
571 
ready(&mut self) -> crate::Result<()>572     async fn ready(&mut self) -> crate::Result<()> {
573         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
574     }
575 
576     /// Send data on data channel when it is ready.
send_data(&mut self, chunk: Bytes) -> crate::Result<()>577     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
578         self.ready().await?;
579         self.data_tx
580             .try_send(Ok(chunk))
581             .map_err(|_| crate::Error::new_closed())
582     }
583 
584     /// Send trailers on trailers channel.
send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>585     pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
586         let tx = match self.trailers_tx.take() {
587             Some(tx) => tx,
588             None => return Err(crate::Error::new_closed()),
589         };
590         tx.send(trailers).map_err(|_| crate::Error::new_closed())
591     }
592 
593     /// Try to send data on this channel.
594     ///
595     /// # Errors
596     ///
597     /// Returns `Err(Bytes)` if the channel could not (currently) accept
598     /// another `Bytes`.
599     ///
600     /// # Note
601     ///
602     /// This is mostly useful for when trying to send from some other thread
603     /// that doesn't have an async context. If in an async context, prefer
604     /// `send_data()` instead.
try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>605     pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
606         self.data_tx
607             .try_send(Ok(chunk))
608             .map_err(|err| err.into_inner().expect("just sent Ok"))
609     }
610 
611     /// Aborts the body in an abnormal fashion.
abort(mut self)612     pub fn abort(mut self) {
613         self.send_error(crate::Error::new_body_write_aborted());
614     }
615 
send_error(&mut self, err: crate::Error)616     pub(crate) fn send_error(&mut self, err: crate::Error) {
617         let _ = self
618             .data_tx
619             // clone so the send works even if buffer is full
620             .clone()
621             .try_send(Err(err));
622     }
623 }
624 
625 impl fmt::Debug for Sender {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result626     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627         #[derive(Debug)]
628         struct Open;
629         #[derive(Debug)]
630         struct Closed;
631 
632         let mut builder = f.debug_tuple("Sender");
633         match self.want_rx.peek() {
634             watch::CLOSED => builder.field(&Closed),
635             _ => builder.field(&Open),
636         };
637 
638         builder.finish()
639     }
640 }
641 
642 #[cfg(test)]
643 mod tests {
644     use std::mem;
645     use std::task::Poll;
646 
647     use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
648 
649     #[test]
test_size_of()650     fn test_size_of() {
651         // These are mostly to help catch *accidentally* increasing
652         // the size by too much.
653 
654         let body_size = mem::size_of::<Body>();
655         let body_expected_size = mem::size_of::<u64>() * 6;
656         assert!(
657             body_size <= body_expected_size,
658             "Body size = {} <= {}",
659             body_size,
660             body_expected_size,
661         );
662 
663         assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
664 
665         assert_eq!(
666             mem::size_of::<Sender>(),
667             mem::size_of::<usize>() * 5,
668             "Sender"
669         );
670 
671         assert_eq!(
672             mem::size_of::<Sender>(),
673             mem::size_of::<Option<Sender>>(),
674             "Option<Sender>"
675         );
676     }
677 
678     #[test]
size_hint()679     fn size_hint() {
680         fn eq(body: Body, b: SizeHint, note: &str) {
681             let a = body.size_hint();
682             assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
683             assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
684         }
685 
686         eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
687 
688         eq(Body::empty(), SizeHint::with_exact(0), "empty");
689 
690         eq(Body::channel().1, SizeHint::new(), "channel");
691 
692         eq(
693             Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
694             SizeHint::with_exact(4),
695             "channel with length",
696         );
697     }
698 
699     #[tokio::test]
channel_abort()700     async fn channel_abort() {
701         let (tx, mut rx) = Body::channel();
702 
703         tx.abort();
704 
705         let err = rx.data().await.unwrap().unwrap_err();
706         assert!(err.is_body_write_aborted(), "{:?}", err);
707     }
708 
709     #[tokio::test]
channel_abort_when_buffer_is_full()710     async fn channel_abort_when_buffer_is_full() {
711         let (mut tx, mut rx) = Body::channel();
712 
713         tx.try_send_data("chunk 1".into()).expect("send 1");
714         // buffer is full, but can still send abort
715         tx.abort();
716 
717         let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
718         assert_eq!(chunk1, "chunk 1");
719 
720         let err = rx.data().await.unwrap().unwrap_err();
721         assert!(err.is_body_write_aborted(), "{:?}", err);
722     }
723 
724     #[test]
channel_buffers_one()725     fn channel_buffers_one() {
726         let (mut tx, _rx) = Body::channel();
727 
728         tx.try_send_data("chunk 1".into()).expect("send 1");
729 
730         // buffer is now full
731         let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
732         assert_eq!(chunk2, "chunk 2");
733     }
734 
735     #[tokio::test]
channel_empty()736     async fn channel_empty() {
737         let (_, mut rx) = Body::channel();
738 
739         assert!(rx.data().await.is_none());
740     }
741 
742     #[test]
channel_ready()743     fn channel_ready() {
744         let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
745 
746         let mut tx_ready = tokio_test::task::spawn(tx.ready());
747 
748         assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
749     }
750 
751     #[test]
channel_wanter()752     fn channel_wanter() {
753         let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
754 
755         let mut tx_ready = tokio_test::task::spawn(tx.ready());
756         let mut rx_data = tokio_test::task::spawn(rx.data());
757 
758         assert!(
759             tx_ready.poll().is_pending(),
760             "tx isn't ready before rx has been polled"
761         );
762 
763         assert!(rx_data.poll().is_pending(), "poll rx.data");
764         assert!(tx_ready.is_woken(), "rx poll wakes tx");
765 
766         assert!(
767             tx_ready.poll().is_ready(),
768             "tx is ready after rx has been polled"
769         );
770     }
771 
772     #[test]
channel_notices_closure()773     fn channel_notices_closure() {
774         let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
775 
776         let mut tx_ready = tokio_test::task::spawn(tx.ready());
777 
778         assert!(
779             tx_ready.poll().is_pending(),
780             "tx isn't ready before rx has been polled"
781         );
782 
783         drop(rx);
784         assert!(tx_ready.is_woken(), "dropping rx wakes tx");
785 
786         match tx_ready.poll() {
787             Poll::Ready(Err(ref e)) if e.is_closed() => (),
788             unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
789         }
790     }
791 }
792