1 use std::borrow::Cow; 2 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 3 use std::convert::Infallible; 4 #[cfg(feature = "stream")] 5 use std::error::Error as StdError; 6 use std::fmt; 7 use std::future::Future; 8 use std::pin::Pin; 9 use std::task::{Context, Poll}; 10 11 use bytes::Bytes; 12 use futures_channel::mpsc; 13 use futures_channel::oneshot; 14 use futures_core::Stream; // for mpsc::Receiver 15 #[cfg(feature = "stream")] 16 use futures_util::TryStreamExt; 17 use http::HeaderMap; 18 use http_body::{Body as HttpBody, SizeHint}; 19 20 use super::DecodedLength; 21 #[cfg(feature = "stream")] 22 use crate::common::sync_wrapper::SyncWrapper; 23 use crate::common::watch; 24 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 25 use crate::proto::h2::ping; 26 27 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; 28 type TrailersSender = oneshot::Sender<HeaderMap>; 29 30 /// A stream of `Bytes`, used when receiving bodies. 31 /// 32 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many 33 /// applications. 34 /// 35 /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes()) 36 /// or [`body::aggregate`](crate::body::aggregate()). 37 #[must_use = "streams do nothing unless polled"] 38 pub struct Body { 39 kind: Kind, 40 /// Keep the extra bits in an `Option<Box<Extra>>`, so that 41 /// Body stays small in the common case (no extras needed). 42 extra: Option<Box<Extra>>, 43 } 44 45 enum Kind { 46 Once(Option<Bytes>), 47 Chan { 48 content_length: DecodedLength, 49 want_tx: watch::Sender, 50 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>, 51 trailers_rx: oneshot::Receiver<HeaderMap>, 52 }, 53 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 54 H2 { 55 ping: ping::Recorder, 56 content_length: DecodedLength, 57 recv: h2::RecvStream, 58 }, 59 #[cfg(feature = "ffi")] 60 Ffi(crate::ffi::UserBody), 61 #[cfg(feature = "stream")] 62 Wrapped( 63 SyncWrapper< 64 Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, 65 >, 66 ), 67 } 68 69 struct Extra { 70 /// Allow the client to pass a future to delay the `Body` from returning 71 /// EOF. This allows the `Client` to try to put the idle connection 72 /// back into the pool before the body is "finished". 73 /// 74 /// The reason for this is so that creating a new request after finishing 75 /// streaming the body of a response could sometimes result in creating 76 /// a brand new connection, since the pool didn't know about the idle 77 /// connection yet. 78 delayed_eof: Option<DelayEof>, 79 } 80 81 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 82 type DelayEofUntil = oneshot::Receiver<Infallible>; 83 84 enum DelayEof { 85 /// Initial state, stream hasn't seen EOF yet. 86 #[cfg(any(feature = "http1", feature = "http2"))] 87 #[cfg(feature = "client")] 88 NotEof(DelayEofUntil), 89 /// Transitions to this state once we've seen `poll` try to 90 /// return EOF (`None`). This future is then polled, and 91 /// when it completes, the Body finally returns EOF (`None`). 92 #[cfg(any(feature = "http1", feature = "http2"))] 93 #[cfg(feature = "client")] 94 Eof(DelayEofUntil), 95 } 96 97 /// A sender half created through [`Body::channel()`]. 98 /// 99 /// Useful when wanting to stream chunks from another thread. 100 /// 101 /// ## Body Closing 102 /// 103 /// Note that the request body will always be closed normally when the sender is dropped (meaning 104 /// that the empty terminating chunk will be sent to the remote). If you desire to close the 105 /// connection with an incomplete response (e.g. in the case of an error during asynchronous 106 /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. 107 /// 108 /// [`Body::channel()`]: struct.Body.html#method.channel 109 /// [`Sender::abort()`]: struct.Sender.html#method.abort 110 #[must_use = "Sender does nothing unless sent on"] 111 pub struct Sender { 112 want_rx: watch::Receiver, 113 data_tx: BodySender, 114 trailers_tx: Option<TrailersSender>, 115 } 116 117 const WANT_PENDING: usize = 1; 118 const WANT_READY: usize = 2; 119 120 impl Body { 121 /// Create an empty `Body` stream. 122 /// 123 /// # Example 124 /// 125 /// ``` 126 /// use hyper::{Body, Request}; 127 /// 128 /// // create a `GET /` request 129 /// let get = Request::new(Body::empty()); 130 /// ``` 131 #[inline] empty() -> Body132 pub fn empty() -> Body { 133 Body::new(Kind::Once(None)) 134 } 135 136 /// Create a `Body` stream with an associated sender half. 137 /// 138 /// Useful when wanting to stream chunks from another thread. 139 #[inline] channel() -> (Sender, Body)140 pub fn channel() -> (Sender, Body) { 141 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) 142 } 143 new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)144 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { 145 let (data_tx, data_rx) = mpsc::channel(0); 146 let (trailers_tx, trailers_rx) = oneshot::channel(); 147 148 // If wanter is true, `Sender::poll_ready()` won't becoming ready 149 // until the `Body` has been polled for data once. 150 let want = if wanter { WANT_PENDING } else { WANT_READY }; 151 152 let (want_tx, want_rx) = watch::channel(want); 153 154 let tx = Sender { 155 want_rx, 156 data_tx, 157 trailers_tx: Some(trailers_tx), 158 }; 159 let rx = Body::new(Kind::Chan { 160 content_length, 161 want_tx, 162 data_rx, 163 trailers_rx, 164 }); 165 166 (tx, rx) 167 } 168 169 /// Wrap a futures `Stream` in a box inside `Body`. 170 /// 171 /// # Example 172 /// 173 /// ``` 174 /// # use hyper::Body; 175 /// let chunks: Vec<Result<_, std::io::Error>> = vec![ 176 /// Ok("hello"), 177 /// Ok(" "), 178 /// Ok("world"), 179 /// ]; 180 /// 181 /// let stream = futures_util::stream::iter(chunks); 182 /// 183 /// let body = Body::wrap_stream(stream); 184 /// ``` 185 /// 186 /// # Optional 187 /// 188 /// This function requires enabling the `stream` feature in your 189 /// `Cargo.toml`. 190 #[cfg(feature = "stream")] 191 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,192 pub fn wrap_stream<S, O, E>(stream: S) -> Body 193 where 194 S: Stream<Item = Result<O, E>> + Send + 'static, 195 O: Into<Bytes> + 'static, 196 E: Into<Box<dyn StdError + Send + Sync>> + 'static, 197 { 198 let mapped = stream.map_ok(Into::into).map_err(Into::into); 199 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) 200 } 201 new(kind: Kind) -> Body202 fn new(kind: Kind) -> Body { 203 Body { kind, extra: None } 204 } 205 206 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] h2( recv: h2::RecvStream, mut content_length: DecodedLength, ping: ping::Recorder, ) -> Self207 pub(crate) fn h2( 208 recv: h2::RecvStream, 209 mut content_length: DecodedLength, 210 ping: ping::Recorder, 211 ) -> Self { 212 // If the stream is already EOS, then the "unknown length" is clearly 213 // actually ZERO. 214 if !content_length.is_exact() && recv.is_end_stream() { 215 content_length = DecodedLength::ZERO; 216 } 217 let body = Body::new(Kind::H2 { 218 ping, 219 content_length, 220 recv, 221 }); 222 223 body 224 } 225 226 #[cfg(any(feature = "http1", feature = "http2"))] 227 #[cfg(feature = "client")] delayed_eof(&mut self, fut: DelayEofUntil)228 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { 229 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); 230 } 231 take_delayed_eof(&mut self) -> Option<DelayEof>232 fn take_delayed_eof(&mut self) -> Option<DelayEof> { 233 self.extra 234 .as_mut() 235 .and_then(|extra| extra.delayed_eof.take()) 236 } 237 238 #[cfg(any(feature = "http1", feature = "http2"))] extra_mut(&mut self) -> &mut Extra239 fn extra_mut(&mut self) -> &mut Extra { 240 self.extra 241 .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) 242 } 243 poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>244 fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 245 match self.take_delayed_eof() { 246 #[cfg(any(feature = "http1", feature = "http2"))] 247 #[cfg(feature = "client")] 248 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { 249 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { 250 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); 251 ok 252 } 253 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { 254 Poll::Ready(Ok(never)) => match never {}, 255 Poll::Pending => { 256 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 257 Poll::Pending 258 } 259 Poll::Ready(Err(_done)) => Poll::Ready(None), 260 }, 261 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), 262 }, 263 #[cfg(any(feature = "http1", feature = "http2"))] 264 #[cfg(feature = "client")] 265 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { 266 Poll::Ready(Ok(never)) => match never {}, 267 Poll::Pending => { 268 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 269 Poll::Pending 270 } 271 Poll::Ready(Err(_done)) => Poll::Ready(None), 272 }, 273 #[cfg(any( 274 not(any(feature = "http1", feature = "http2")), 275 not(feature = "client") 276 ))] 277 Some(delay_eof) => match delay_eof {}, 278 None => self.poll_inner(cx), 279 } 280 } 281 282 #[cfg(feature = "ffi")] as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody283 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { 284 match self.kind { 285 Kind::Ffi(ref mut body) => return body, 286 _ => { 287 self.kind = Kind::Ffi(crate::ffi::UserBody::new()); 288 } 289 } 290 291 match self.kind { 292 Kind::Ffi(ref mut body) => body, 293 _ => unreachable!(), 294 } 295 } 296 poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>297 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 298 match self.kind { 299 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), 300 Kind::Chan { 301 content_length: ref mut len, 302 ref mut data_rx, 303 ref mut want_tx, 304 .. 305 } => { 306 want_tx.send(WANT_READY); 307 308 match ready!(Pin::new(data_rx).poll_next(cx)?) { 309 Some(chunk) => { 310 len.sub_if(chunk.len() as u64); 311 Poll::Ready(Some(Ok(chunk))) 312 } 313 None => Poll::Ready(None), 314 } 315 } 316 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 317 Kind::H2 { 318 ref ping, 319 recv: ref mut h2, 320 content_length: ref mut len, 321 } => match ready!(h2.poll_data(cx)) { 322 Some(Ok(bytes)) => { 323 let _ = h2.flow_control().release_capacity(bytes.len()); 324 len.sub_if(bytes.len() as u64); 325 ping.record_data(bytes.len()); 326 Poll::Ready(Some(Ok(bytes))) 327 } 328 Some(Err(e)) => match e.reason() { 329 // These reasons should cause stop of body reading, but nor fail it. 330 // The same logic as for `AsyncRead for H2Upgraded` is applied here. 331 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None), 332 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), 333 }, 334 None => Poll::Ready(None), 335 }, 336 337 #[cfg(feature = "ffi")] 338 Kind::Ffi(ref mut body) => body.poll_data(cx), 339 340 #[cfg(feature = "stream")] 341 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { 342 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), 343 None => Poll::Ready(None), 344 }, 345 } 346 } 347 348 #[cfg(feature = "http1")] take_full_data(&mut self) -> Option<Bytes>349 pub(super) fn take_full_data(&mut self) -> Option<Bytes> { 350 if let Kind::Once(ref mut chunk) = self.kind { 351 chunk.take() 352 } else { 353 None 354 } 355 } 356 } 357 358 impl Default for Body { 359 /// Returns `Body::empty()`. 360 #[inline] default() -> Body361 fn default() -> Body { 362 Body::empty() 363 } 364 } 365 366 impl HttpBody for Body { 367 type Data = Bytes; 368 type Error = crate::Error; 369 poll_data( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>370 fn poll_data( 371 mut self: Pin<&mut Self>, 372 cx: &mut Context<'_>, 373 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 374 self.poll_eof(cx) 375 } 376 poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>377 fn poll_trailers( 378 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, 379 #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>, 380 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { 381 match self.kind { 382 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 383 Kind::H2 { 384 recv: ref mut h2, 385 ref ping, 386 .. 387 } => match ready!(h2.poll_trailers(cx)) { 388 Ok(t) => { 389 ping.record_non_data(); 390 Poll::Ready(Ok(t)) 391 } 392 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), 393 }, 394 Kind::Chan { 395 ref mut trailers_rx, 396 .. 397 } => match ready!(Pin::new(trailers_rx).poll(cx)) { 398 Ok(t) => Poll::Ready(Ok(Some(t))), 399 Err(_) => Poll::Ready(Ok(None)), 400 }, 401 #[cfg(feature = "ffi")] 402 Kind::Ffi(ref mut body) => body.poll_trailers(cx), 403 _ => Poll::Ready(Ok(None)), 404 } 405 } 406 is_end_stream(&self) -> bool407 fn is_end_stream(&self) -> bool { 408 match self.kind { 409 Kind::Once(ref val) => val.is_none(), 410 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, 411 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 412 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), 413 #[cfg(feature = "ffi")] 414 Kind::Ffi(..) => false, 415 #[cfg(feature = "stream")] 416 Kind::Wrapped(..) => false, 417 } 418 } 419 size_hint(&self) -> SizeHint420 fn size_hint(&self) -> SizeHint { 421 macro_rules! opt_len { 422 ($content_length:expr) => {{ 423 let mut hint = SizeHint::default(); 424 425 if let Some(content_length) = $content_length.into_opt() { 426 hint.set_exact(content_length); 427 } 428 429 hint 430 }}; 431 } 432 433 match self.kind { 434 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), 435 Kind::Once(None) => SizeHint::with_exact(0), 436 #[cfg(feature = "stream")] 437 Kind::Wrapped(..) => SizeHint::default(), 438 Kind::Chan { content_length, .. } => opt_len!(content_length), 439 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 440 Kind::H2 { content_length, .. } => opt_len!(content_length), 441 #[cfg(feature = "ffi")] 442 Kind::Ffi(..) => SizeHint::default(), 443 } 444 } 445 } 446 447 impl fmt::Debug for Body { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 449 #[derive(Debug)] 450 struct Streaming; 451 #[derive(Debug)] 452 struct Empty; 453 #[derive(Debug)] 454 struct Full<'a>(&'a Bytes); 455 456 let mut builder = f.debug_tuple("Body"); 457 match self.kind { 458 Kind::Once(None) => builder.field(&Empty), 459 Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), 460 _ => builder.field(&Streaming), 461 }; 462 463 builder.finish() 464 } 465 } 466 467 /// # Optional 468 /// 469 /// This function requires enabling the `stream` feature in your 470 /// `Cargo.toml`. 471 #[cfg(feature = "stream")] 472 impl Stream for Body { 473 type Item = crate::Result<Bytes>; 474 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>475 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 476 HttpBody::poll_data(self, cx) 477 } 478 } 479 480 /// # Optional 481 /// 482 /// This function requires enabling the `stream` feature in your 483 /// `Cargo.toml`. 484 #[cfg(feature = "stream")] 485 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { 486 #[inline] from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body487 fn from( 488 stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, 489 ) -> Body { 490 Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) 491 } 492 } 493 494 impl From<Bytes> for Body { 495 #[inline] from(chunk: Bytes) -> Body496 fn from(chunk: Bytes) -> Body { 497 if chunk.is_empty() { 498 Body::empty() 499 } else { 500 Body::new(Kind::Once(Some(chunk))) 501 } 502 } 503 } 504 505 impl From<Vec<u8>> for Body { 506 #[inline] from(vec: Vec<u8>) -> Body507 fn from(vec: Vec<u8>) -> Body { 508 Body::from(Bytes::from(vec)) 509 } 510 } 511 512 impl From<&'static [u8]> for Body { 513 #[inline] from(slice: &'static [u8]) -> Body514 fn from(slice: &'static [u8]) -> Body { 515 Body::from(Bytes::from(slice)) 516 } 517 } 518 519 impl From<Cow<'static, [u8]>> for Body { 520 #[inline] from(cow: Cow<'static, [u8]>) -> Body521 fn from(cow: Cow<'static, [u8]>) -> Body { 522 match cow { 523 Cow::Borrowed(b) => Body::from(b), 524 Cow::Owned(o) => Body::from(o), 525 } 526 } 527 } 528 529 impl From<String> for Body { 530 #[inline] from(s: String) -> Body531 fn from(s: String) -> Body { 532 Body::from(Bytes::from(s.into_bytes())) 533 } 534 } 535 536 impl From<&'static str> for Body { 537 #[inline] from(slice: &'static str) -> Body538 fn from(slice: &'static str) -> Body { 539 Body::from(Bytes::from(slice.as_bytes())) 540 } 541 } 542 543 impl From<Cow<'static, str>> for Body { 544 #[inline] from(cow: Cow<'static, str>) -> Body545 fn from(cow: Cow<'static, str>) -> Body { 546 match cow { 547 Cow::Borrowed(b) => Body::from(b), 548 Cow::Owned(o) => Body::from(o), 549 } 550 } 551 } 552 553 impl Sender { 554 /// Check to see if this `Sender` can send more data. poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>555 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 556 // Check if the receiver end has tried polling for the body yet 557 ready!(self.poll_want(cx)?); 558 self.data_tx 559 .poll_ready(cx) 560 .map_err(|_| crate::Error::new_closed()) 561 } 562 poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>563 fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 564 match self.want_rx.load(cx) { 565 WANT_READY => Poll::Ready(Ok(())), 566 WANT_PENDING => Poll::Pending, 567 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), 568 unexpected => unreachable!("want_rx value: {}", unexpected), 569 } 570 } 571 ready(&mut self) -> crate::Result<()>572 async fn ready(&mut self) -> crate::Result<()> { 573 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await 574 } 575 576 /// Send data on data channel when it is ready. send_data(&mut self, chunk: Bytes) -> crate::Result<()>577 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { 578 self.ready().await?; 579 self.data_tx 580 .try_send(Ok(chunk)) 581 .map_err(|_| crate::Error::new_closed()) 582 } 583 584 /// Send trailers on trailers channel. send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>585 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { 586 let tx = match self.trailers_tx.take() { 587 Some(tx) => tx, 588 None => return Err(crate::Error::new_closed()), 589 }; 590 tx.send(trailers).map_err(|_| crate::Error::new_closed()) 591 } 592 593 /// Try to send data on this channel. 594 /// 595 /// # Errors 596 /// 597 /// Returns `Err(Bytes)` if the channel could not (currently) accept 598 /// another `Bytes`. 599 /// 600 /// # Note 601 /// 602 /// This is mostly useful for when trying to send from some other thread 603 /// that doesn't have an async context. If in an async context, prefer 604 /// `send_data()` instead. try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>605 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { 606 self.data_tx 607 .try_send(Ok(chunk)) 608 .map_err(|err| err.into_inner().expect("just sent Ok")) 609 } 610 611 /// Aborts the body in an abnormal fashion. abort(mut self)612 pub fn abort(mut self) { 613 self.send_error(crate::Error::new_body_write_aborted()); 614 } 615 send_error(&mut self, err: crate::Error)616 pub(crate) fn send_error(&mut self, err: crate::Error) { 617 let _ = self 618 .data_tx 619 // clone so the send works even if buffer is full 620 .clone() 621 .try_send(Err(err)); 622 } 623 } 624 625 impl fmt::Debug for Sender { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result626 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 627 #[derive(Debug)] 628 struct Open; 629 #[derive(Debug)] 630 struct Closed; 631 632 let mut builder = f.debug_tuple("Sender"); 633 match self.want_rx.peek() { 634 watch::CLOSED => builder.field(&Closed), 635 _ => builder.field(&Open), 636 }; 637 638 builder.finish() 639 } 640 } 641 642 #[cfg(test)] 643 mod tests { 644 use std::mem; 645 use std::task::Poll; 646 647 use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; 648 649 #[test] test_size_of()650 fn test_size_of() { 651 // These are mostly to help catch *accidentally* increasing 652 // the size by too much. 653 654 let body_size = mem::size_of::<Body>(); 655 let body_expected_size = mem::size_of::<u64>() * 6; 656 assert!( 657 body_size <= body_expected_size, 658 "Body size = {} <= {}", 659 body_size, 660 body_expected_size, 661 ); 662 663 assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>"); 664 665 assert_eq!( 666 mem::size_of::<Sender>(), 667 mem::size_of::<usize>() * 5, 668 "Sender" 669 ); 670 671 assert_eq!( 672 mem::size_of::<Sender>(), 673 mem::size_of::<Option<Sender>>(), 674 "Option<Sender>" 675 ); 676 } 677 678 #[test] size_hint()679 fn size_hint() { 680 fn eq(body: Body, b: SizeHint, note: &str) { 681 let a = body.size_hint(); 682 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); 683 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); 684 } 685 686 eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); 687 688 eq(Body::empty(), SizeHint::with_exact(0), "empty"); 689 690 eq(Body::channel().1, SizeHint::new(), "channel"); 691 692 eq( 693 Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, 694 SizeHint::with_exact(4), 695 "channel with length", 696 ); 697 } 698 699 #[tokio::test] channel_abort()700 async fn channel_abort() { 701 let (tx, mut rx) = Body::channel(); 702 703 tx.abort(); 704 705 let err = rx.data().await.unwrap().unwrap_err(); 706 assert!(err.is_body_write_aborted(), "{:?}", err); 707 } 708 709 #[tokio::test] channel_abort_when_buffer_is_full()710 async fn channel_abort_when_buffer_is_full() { 711 let (mut tx, mut rx) = Body::channel(); 712 713 tx.try_send_data("chunk 1".into()).expect("send 1"); 714 // buffer is full, but can still send abort 715 tx.abort(); 716 717 let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); 718 assert_eq!(chunk1, "chunk 1"); 719 720 let err = rx.data().await.unwrap().unwrap_err(); 721 assert!(err.is_body_write_aborted(), "{:?}", err); 722 } 723 724 #[test] channel_buffers_one()725 fn channel_buffers_one() { 726 let (mut tx, _rx) = Body::channel(); 727 728 tx.try_send_data("chunk 1".into()).expect("send 1"); 729 730 // buffer is now full 731 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2"); 732 assert_eq!(chunk2, "chunk 2"); 733 } 734 735 #[tokio::test] channel_empty()736 async fn channel_empty() { 737 let (_, mut rx) = Body::channel(); 738 739 assert!(rx.data().await.is_none()); 740 } 741 742 #[test] channel_ready()743 fn channel_ready() { 744 let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); 745 746 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 747 748 assert!(tx_ready.poll().is_ready(), "tx is ready immediately"); 749 } 750 751 #[test] channel_wanter()752 fn channel_wanter() { 753 let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 754 755 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 756 let mut rx_data = tokio_test::task::spawn(rx.data()); 757 758 assert!( 759 tx_ready.poll().is_pending(), 760 "tx isn't ready before rx has been polled" 761 ); 762 763 assert!(rx_data.poll().is_pending(), "poll rx.data"); 764 assert!(tx_ready.is_woken(), "rx poll wakes tx"); 765 766 assert!( 767 tx_ready.poll().is_ready(), 768 "tx is ready after rx has been polled" 769 ); 770 } 771 772 #[test] channel_notices_closure()773 fn channel_notices_closure() { 774 let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 775 776 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 777 778 assert!( 779 tx_ready.poll().is_pending(), 780 "tx isn't ready before rx has been polled" 781 ); 782 783 drop(rx); 784 assert!(tx_ready.is_woken(), "dropping rx wakes tx"); 785 786 match tx_ready.poll() { 787 Poll::Ready(Err(ref e)) if e.is_closed() => (), 788 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected), 789 } 790 } 791 } 792