1 use std::fmt; 2 use std::io; 3 use std::marker::PhantomData; 4 use std::marker::Unpin; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 #[cfg(all(feature = "server", feature = "runtime"))] 8 use std::time::Duration; 9 10 use bytes::{Buf, Bytes}; 11 use http::header::{HeaderValue, CONNECTION}; 12 use http::{HeaderMap, Method, Version}; 13 use httparse::ParserConfig; 14 use tokio::io::{AsyncRead, AsyncWrite}; 15 #[cfg(all(feature = "server", feature = "runtime"))] 16 use tokio::time::Sleep; 17 use tracing::{debug, error, trace}; 18 19 use super::io::Buffered; 20 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; 21 use crate::body::DecodedLength; 22 use crate::headers::connection_keep_alive; 23 use crate::proto::{BodyLength, MessageHead}; 24 25 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 26 27 /// This handles a connection, which will have been established over an 28 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple 29 /// `Transaction`s over HTTP. 30 /// 31 /// The connection will determine when a message begins and ends as well as 32 /// determine if this connection can be kept alive after the message, 33 /// or if it is complete. 
pub(crate) struct Conn<I, B, T> {
    /// The transport, wrapped in the buffering layer by `new`.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write progress, keep-alive status and per-connection options.
    state: State,
    /// Binds the transaction type `T` to the connection without storing one.
    _marker: PhantomData<fn(T)>,
}

impl<I, B, T> Conn<I, B, T>
where
    I: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
    T: Http1Transaction,
{
    /// Builds a connection around `io`.
    ///
    /// Every optional behaviour starts switched off, both the read and the
    /// write side start in `Init`, and the keep-alive status starts as
    /// `KA::Busy`.
    pub(crate) fn new(io: I) -> Conn<I, B, T> {
        let io = Buffered::new(io);

        let state = State {
            allow_half_close: false,
            cached_headers: None,
            error: None,
            keep_alive: KA::Busy,
            method: None,
            h1_parser_config: ParserConfig::default(),
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout: None,
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout_fut: None,
            #[cfg(all(feature = "server", feature = "runtime"))]
            h1_header_read_timeout_running: false,
            preserve_header_case: false,
            #[cfg(feature = "ffi")]
            preserve_header_order: false,
            title_case_headers: false,
            h09_responses: false,
            #[cfg(feature = "ffi")]
            on_informational: None,
            #[cfg(feature = "ffi")]
            raw_headers: false,
            notify_read: false,
            reading: Reading::Init,
            writing: Writing::Init,
            upgrade: None,
            // We assume a modern world where the remote speaks HTTP/1.1.
            // If they tell us otherwise, we'll downgrade in `read_head`.
            version: Version::HTTP_11,
        };

        Conn {
            io,
            state,
            _marker: PhantomData,
        }
    }

    /// Forwards the pipeline-flush switch to the buffered IO layer.
    #[cfg(feature = "server")]
    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
        self.io.set_flush_pipeline(enabled);
    }

    /// Selects the "queue" write strategy on the buffered IO layer.
    pub(crate) fn set_write_strategy_queue(&mut self) {
        self.io.set_write_strategy_queue();
    }

    /// Forwards the maximum buffer size `max` to the buffered IO layer.
    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
        self.io.set_max_buf_size(max);
    }

    /// Forwards the exact read-buffer size `sz` to the buffered IO layer.
    #[cfg(feature = "client")]
    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.io.set_read_buf_exact_size(sz);
    }

    /// Selects the "flatten" write strategy on the buffered IO layer.
    pub(crate) fn set_write_strategy_flatten(&mut self) {
        self.io.set_write_strategy_flatten();
    }

    /// Replaces the parser configuration handed to every head parse.
    #[cfg(feature = "client")]
    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
        self.state.h1_parser_config = parser_config;
    }

    /// Turns on the `title_case_headers` option used when encoding heads.
    pub(crate) fn set_title_case_headers(&mut self) {
        self.state.title_case_headers = true;
    }

    /// Turns on the `preserve_header_case` option passed to the parser.
    pub(crate) fn set_preserve_header_case(&mut self) {
        self.state.preserve_header_case = true;
    }

    /// Turns on the `preserve_header_order` option passed to the parser.
    #[cfg(feature = "ffi")]
    pub(crate) fn set_preserve_header_order(&mut self) {
        self.state.preserve_header_order = true;
    }

    /// Turns on the `h09_responses` option; `poll_read_head` switches it
    /// back off after the first successfully parsed head.
    #[cfg(feature = "client")]
    pub(crate) fn set_h09_responses(&mut self) {
        self.state.h09_responses = true;
    }

    /// Stores the header read timeout that is passed to the parser.
    #[cfg(all(feature = "server", feature = "runtime"))]
    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
        self.state.h1_header_read_timeout = Some(val);
    }

    #[cfg(feature = "server")]
set_allow_half_close(&mut self)134 pub(crate) fn set_allow_half_close(&mut self) { 135 self.state.allow_half_close = true; 136 } 137 138 #[cfg(feature = "ffi")] set_raw_headers(&mut self, enabled: bool)139 pub(crate) fn set_raw_headers(&mut self, enabled: bool) { 140 self.state.raw_headers = enabled; 141 } 142 into_inner(self) -> (I, Bytes)143 pub(crate) fn into_inner(self) -> (I, Bytes) { 144 self.io.into_inner() 145 } 146 pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>147 pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { 148 self.state.upgrade.take() 149 } 150 is_read_closed(&self) -> bool151 pub(crate) fn is_read_closed(&self) -> bool { 152 self.state.is_read_closed() 153 } 154 is_write_closed(&self) -> bool155 pub(crate) fn is_write_closed(&self) -> bool { 156 self.state.is_write_closed() 157 } 158 can_read_head(&self) -> bool159 pub(crate) fn can_read_head(&self) -> bool { 160 if !matches!(self.state.reading, Reading::Init) { 161 return false; 162 } 163 164 if T::should_read_first() { 165 return true; 166 } 167 168 !matches!(self.state.writing, Writing::Init) 169 } 170 can_read_body(&self) -> bool171 pub(crate) fn can_read_body(&self) -> bool { 172 match self.state.reading { 173 Reading::Body(..) | Reading::Continue(..) => true, 174 _ => false, 175 } 176 } 177 should_error_on_eof(&self) -> bool178 fn should_error_on_eof(&self) -> bool { 179 // If we're idle, it's probably just the connection closing gracefully. 
180 T::should_error_on_parse_eof() && !self.state.is_idle() 181 } 182 has_h2_prefix(&self) -> bool183 fn has_h2_prefix(&self) -> bool { 184 let read_buf = self.io.read_buf(); 185 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE 186 } 187 poll_read_head( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>188 pub(super) fn poll_read_head( 189 &mut self, 190 cx: &mut Context<'_>, 191 ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { 192 debug_assert!(self.can_read_head()); 193 trace!("Conn::read_head"); 194 195 let msg = match ready!(self.io.parse::<T>( 196 cx, 197 ParseContext { 198 cached_headers: &mut self.state.cached_headers, 199 req_method: &mut self.state.method, 200 h1_parser_config: self.state.h1_parser_config.clone(), 201 #[cfg(all(feature = "server", feature = "runtime"))] 202 h1_header_read_timeout: self.state.h1_header_read_timeout, 203 #[cfg(all(feature = "server", feature = "runtime"))] 204 h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut, 205 #[cfg(all(feature = "server", feature = "runtime"))] 206 h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running, 207 preserve_header_case: self.state.preserve_header_case, 208 #[cfg(feature = "ffi")] 209 preserve_header_order: self.state.preserve_header_order, 210 h09_responses: self.state.h09_responses, 211 #[cfg(feature = "ffi")] 212 on_informational: &mut self.state.on_informational, 213 #[cfg(feature = "ffi")] 214 raw_headers: self.state.raw_headers, 215 } 216 )) { 217 Ok(msg) => msg, 218 Err(e) => return self.on_read_head_error(e), 219 }; 220 221 // Note: don't deconstruct `msg` into local variables, it appears 222 // the optimizer doesn't remove the extra copies. 223 224 debug!("incoming body is {}", msg.decode); 225 226 // Prevent accepting HTTP/0.9 responses after the initial one, if any. 
227 self.state.h09_responses = false; 228 229 // Drop any OnInformational callbacks, we're done there! 230 #[cfg(feature = "ffi")] 231 { 232 self.state.on_informational = None; 233 } 234 235 self.state.busy(); 236 self.state.keep_alive &= msg.keep_alive; 237 self.state.version = msg.head.version; 238 239 let mut wants = if msg.wants_upgrade { 240 Wants::UPGRADE 241 } else { 242 Wants::EMPTY 243 }; 244 245 if msg.decode == DecodedLength::ZERO { 246 if msg.expect_continue { 247 debug!("ignoring expect-continue since body is empty"); 248 } 249 self.state.reading = Reading::KeepAlive; 250 if !T::should_read_first() { 251 self.try_keep_alive(cx); 252 } 253 } else if msg.expect_continue { 254 self.state.reading = Reading::Continue(Decoder::new(msg.decode)); 255 wants = wants.add(Wants::EXPECT); 256 } else { 257 self.state.reading = Reading::Body(Decoder::new(msg.decode)); 258 } 259 260 Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) 261 } 262 on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>263 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { 264 // If we are currently waiting on a message, then an empty 265 // message should be reported as an error. If not, it is just 266 // the connection closing gracefully. 267 let must_error = self.should_error_on_eof(); 268 self.close_read(); 269 self.io.consume_leading_lines(); 270 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); 271 if was_mid_parse || must_error { 272 // We check if the buf contains the h2 Preface 273 debug!( 274 "parse error ({}) with {} bytes", 275 e, 276 self.io.read_buf().len() 277 ); 278 match self.on_parse_error(e) { 279 Ok(()) => Poll::Pending, // XXX: wat? 
280 Err(e) => Poll::Ready(Some(Err(e))), 281 } 282 } else { 283 debug!("read eof"); 284 self.close_write(); 285 Poll::Ready(None) 286 } 287 } 288 poll_read_body( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>289 pub(crate) fn poll_read_body( 290 &mut self, 291 cx: &mut Context<'_>, 292 ) -> Poll<Option<io::Result<Bytes>>> { 293 debug_assert!(self.can_read_body()); 294 295 let (reading, ret) = match self.state.reading { 296 Reading::Body(ref mut decoder) => { 297 match ready!(decoder.decode(cx, &mut self.io)) { 298 Ok(slice) => { 299 let (reading, chunk) = if decoder.is_eof() { 300 debug!("incoming body completed"); 301 ( 302 Reading::KeepAlive, 303 if !slice.is_empty() { 304 Some(Ok(slice)) 305 } else { 306 None 307 }, 308 ) 309 } else if slice.is_empty() { 310 error!("incoming body unexpectedly ended"); 311 // This should be unreachable, since all 3 decoders 312 // either set eof=true or return an Err when reading 313 // an empty slice... 314 (Reading::Closed, None) 315 } else { 316 return Poll::Ready(Some(Ok(slice))); 317 }; 318 (reading, Poll::Ready(chunk)) 319 } 320 Err(e) => { 321 debug!("incoming body decode error: {}", e); 322 (Reading::Closed, Poll::Ready(Some(Err(e)))) 323 } 324 } 325 } 326 Reading::Continue(ref decoder) => { 327 // Write the 100 Continue if not already responded... 328 if let Writing::Init = self.state.writing { 329 trace!("automatically sending 100 Continue"); 330 let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; 331 self.io.headers_buf().extend_from_slice(cont); 332 } 333 334 // And now recurse once in the Reading::Body state... 
335 self.state.reading = Reading::Body(decoder.clone()); 336 return self.poll_read_body(cx); 337 } 338 _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading), 339 }; 340 341 self.state.reading = reading; 342 self.try_keep_alive(cx); 343 ret 344 } 345 wants_read_again(&mut self) -> bool346 pub(crate) fn wants_read_again(&mut self) -> bool { 347 let ret = self.state.notify_read; 348 self.state.notify_read = false; 349 ret 350 } 351 poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>352 pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 353 debug_assert!(!self.can_read_head() && !self.can_read_body()); 354 355 if self.is_read_closed() { 356 Poll::Pending 357 } else if self.is_mid_message() { 358 self.mid_message_detect_eof(cx) 359 } else { 360 self.require_empty_read(cx) 361 } 362 } 363 is_mid_message(&self) -> bool364 fn is_mid_message(&self) -> bool { 365 !matches!( 366 (&self.state.reading, &self.state.writing), 367 (&Reading::Init, &Writing::Init) 368 ) 369 } 370 371 // This will check to make sure the io object read is empty. 372 // 373 // This should only be called for Clients wanting to enter the idle 374 // state. 
require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>375 fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 376 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 377 debug_assert!(!self.is_mid_message()); 378 debug_assert!(T::is_client()); 379 380 if !self.io.read_buf().is_empty() { 381 debug!("received an unexpected {} bytes", self.io.read_buf().len()); 382 return Poll::Ready(Err(crate::Error::new_unexpected_message())); 383 } 384 385 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 386 387 if num_read == 0 { 388 let ret = if self.should_error_on_eof() { 389 trace!("found unexpected EOF on busy connection: {:?}", self.state); 390 Poll::Ready(Err(crate::Error::new_incomplete())) 391 } else { 392 trace!("found EOF on idle connection, closing"); 393 Poll::Ready(Ok(())) 394 }; 395 396 // order is important: should_error needs state BEFORE close_read 397 self.state.close_read(); 398 return ret; 399 } 400 401 debug!( 402 "received unexpected {} bytes on an idle connection", 403 num_read 404 ); 405 Poll::Ready(Err(crate::Error::new_unexpected_message())) 406 } 407 mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>408 fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { 409 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); 410 debug_assert!(self.is_mid_message()); 411 412 if self.state.allow_half_close || !self.io.read_buf().is_empty() { 413 return Poll::Pending; 414 } 415 416 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; 417 418 if num_read == 0 { 419 trace!("found unexpected EOF on busy connection: {:?}", self.state); 420 self.state.close_read(); 421 Poll::Ready(Err(crate::Error::new_incomplete())) 422 } else { 423 Poll::Ready(Ok(())) 424 } 425 } 426 force_io_read(&mut self, cx: &mut Context<'_>) -> 
Poll<io::Result<usize>>427 fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> { 428 debug_assert!(!self.state.is_read_closed()); 429 430 let result = ready!(self.io.poll_read_from_io(cx)); 431 Poll::Ready(result.map_err(|e| { 432 trace!("force_io_read; io error = {:?}", e); 433 self.state.close(); 434 e 435 })) 436 } 437 maybe_notify(&mut self, cx: &mut Context<'_>)438 fn maybe_notify(&mut self, cx: &mut Context<'_>) { 439 // its possible that we returned NotReady from poll() without having 440 // exhausted the underlying Io. We would have done this when we 441 // determined we couldn't keep reading until we knew how writing 442 // would finish. 443 444 match self.state.reading { 445 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { 446 return 447 } 448 Reading::Init => (), 449 }; 450 451 match self.state.writing { 452 Writing::Body(..) => return, 453 Writing::Init | Writing::KeepAlive | Writing::Closed => (), 454 } 455 456 if !self.io.is_read_blocked() { 457 if self.io.read_buf().is_empty() { 458 match self.io.poll_read_from_io(cx) { 459 Poll::Ready(Ok(n)) => { 460 if n == 0 { 461 trace!("maybe_notify; read eof"); 462 if self.state.is_idle() { 463 self.state.close(); 464 } else { 465 self.close_read() 466 } 467 return; 468 } 469 } 470 Poll::Pending => { 471 trace!("maybe_notify; read_from_io blocked"); 472 return; 473 } 474 Poll::Ready(Err(e)) => { 475 trace!("maybe_notify; read_from_io error: {}", e); 476 self.state.close(); 477 self.state.error = Some(crate::Error::new_io(e)); 478 } 479 } 480 } 481 self.state.notify_read = true; 482 } 483 } 484 try_keep_alive(&mut self, cx: &mut Context<'_>)485 fn try_keep_alive(&mut self, cx: &mut Context<'_>) { 486 self.state.try_keep_alive::<T>(); 487 self.maybe_notify(cx); 488 } 489 can_write_head(&self) -> bool490 pub(crate) fn can_write_head(&self) -> bool { 491 if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) { 492 return false; 493 } 
494 495 match self.state.writing { 496 Writing::Init => self.io.can_headers_buf(), 497 _ => false, 498 } 499 } 500 can_write_body(&self) -> bool501 pub(crate) fn can_write_body(&self) -> bool { 502 match self.state.writing { 503 Writing::Body(..) => true, 504 Writing::Init | Writing::KeepAlive | Writing::Closed => false, 505 } 506 } 507 can_buffer_body(&self) -> bool508 pub(crate) fn can_buffer_body(&self) -> bool { 509 self.io.can_buffer() 510 } 511 write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)512 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { 513 if let Some(encoder) = self.encode_head(head, body) { 514 self.state.writing = if !encoder.is_eof() { 515 Writing::Body(encoder) 516 } else if encoder.is_last() { 517 Writing::Closed 518 } else { 519 Writing::KeepAlive 520 }; 521 } 522 } 523 write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)524 pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { 525 if let Some(encoder) = 526 self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) 527 { 528 let is_last = encoder.is_last(); 529 // Make sure we don't write a body if we weren't actually allowed 530 // to do so, like because its a HEAD request. 
531 if !encoder.is_eof() { 532 encoder.danger_full_buf(body, self.io.write_buf()); 533 } 534 self.state.writing = if is_last { 535 Writing::Closed 536 } else { 537 Writing::KeepAlive 538 } 539 } 540 } 541 encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>542 fn encode_head( 543 &mut self, 544 mut head: MessageHead<T::Outgoing>, 545 body: Option<BodyLength>, 546 ) -> Option<Encoder> { 547 debug_assert!(self.can_write_head()); 548 549 if !T::should_read_first() { 550 self.state.busy(); 551 } 552 553 self.enforce_version(&mut head); 554 555 let buf = self.io.headers_buf(); 556 match super::role::encode_headers::<T>( 557 Encode { 558 head: &mut head, 559 body, 560 #[cfg(feature = "server")] 561 keep_alive: self.state.wants_keep_alive(), 562 req_method: &mut self.state.method, 563 title_case_headers: self.state.title_case_headers, 564 }, 565 buf, 566 ) { 567 Ok(encoder) => { 568 debug_assert!(self.state.cached_headers.is_none()); 569 debug_assert!(head.headers.is_empty()); 570 self.state.cached_headers = Some(head.headers); 571 572 #[cfg(feature = "ffi")] 573 { 574 self.state.on_informational = 575 head.extensions.remove::<crate::ffi::OnInformational>(); 576 } 577 578 Some(encoder) 579 } 580 Err(err) => { 581 self.state.error = Some(err); 582 self.state.writing = Writing::Closed; 583 None 584 } 585 } 586 } 587 588 // Fix keep-alive when Connection: keep-alive header is not present fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)589 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { 590 let outgoing_is_keep_alive = head 591 .headers 592 .get(CONNECTION) 593 .map(connection_keep_alive) 594 .unwrap_or(false); 595 596 if !outgoing_is_keep_alive { 597 match head.version { 598 // If response is version 1.0 and keep-alive is not present in the response, 599 // disable keep-alive so the server closes the connection 600 Version::HTTP_10 => self.state.disable_keep_alive(), 601 // If response is 
version 1.1 and keep-alive is wanted, add 602 // Connection: keep-alive header when not present 603 Version::HTTP_11 => { 604 if self.state.wants_keep_alive() { 605 head.headers 606 .insert(CONNECTION, HeaderValue::from_static("keep-alive")); 607 } 608 } 609 _ => (), 610 } 611 } 612 } 613 614 // If we know the remote speaks an older version, we try to fix up any messages 615 // to work with our older peer. enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)616 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { 617 if let Version::HTTP_10 = self.state.version { 618 // Fixes response or connection when keep-alive header is not present 619 self.fix_keep_alive(head); 620 // If the remote only knows HTTP/1.0, we should force ourselves 621 // to do only speak HTTP/1.0 as well. 622 head.version = Version::HTTP_10; 623 } 624 // If the remote speaks HTTP/1.1, then it *should* be fine with 625 // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let 626 // the user's headers be. 
627 } 628 write_body(&mut self, chunk: B)629 pub(crate) fn write_body(&mut self, chunk: B) { 630 debug_assert!(self.can_write_body() && self.can_buffer_body()); 631 // empty chunks should be discarded at Dispatcher level 632 debug_assert!(chunk.remaining() != 0); 633 634 let state = match self.state.writing { 635 Writing::Body(ref mut encoder) => { 636 self.io.buffer(encoder.encode(chunk)); 637 638 if !encoder.is_eof() { 639 return; 640 } 641 642 if encoder.is_last() { 643 Writing::Closed 644 } else { 645 Writing::KeepAlive 646 } 647 } 648 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 649 }; 650 651 self.state.writing = state; 652 } 653 write_body_and_end(&mut self, chunk: B)654 pub(crate) fn write_body_and_end(&mut self, chunk: B) { 655 debug_assert!(self.can_write_body() && self.can_buffer_body()); 656 // empty chunks should be discarded at Dispatcher level 657 debug_assert!(chunk.remaining() != 0); 658 659 let state = match self.state.writing { 660 Writing::Body(ref encoder) => { 661 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); 662 if can_keep_alive { 663 Writing::KeepAlive 664 } else { 665 Writing::Closed 666 } 667 } 668 _ => unreachable!("write_body invalid state: {:?}", self.state.writing), 669 }; 670 671 self.state.writing = state; 672 } 673 end_body(&mut self) -> crate::Result<()>674 pub(crate) fn end_body(&mut self) -> crate::Result<()> { 675 debug_assert!(self.can_write_body()); 676 677 let encoder = match self.state.writing { 678 Writing::Body(ref mut enc) => enc, 679 _ => return Ok(()), 680 }; 681 682 // end of stream, that means we should try to eof 683 match encoder.end() { 684 Ok(end) => { 685 if let Some(end) = end { 686 self.io.buffer(end); 687 } 688 689 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() { 690 Writing::Closed 691 } else { 692 Writing::KeepAlive 693 }; 694 695 Ok(()) 696 } 697 Err(not_eof) => { 698 self.state.writing = Writing::Closed; 699 
Err(crate::Error::new_body_write_aborted().with(not_eof)) 700 } 701 } 702 } 703 704 // When we get a parse error, depending on what side we are, we might be able 705 // to write a response before closing the connection. 706 // 707 // - Client: there is nothing we can do 708 // - Server: if Response hasn't been written yet, we can send a 4xx response on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>709 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { 710 if let Writing::Init = self.state.writing { 711 if self.has_h2_prefix() { 712 return Err(crate::Error::new_version_h2()); 713 } 714 if let Some(msg) = T::on_error(&err) { 715 // Drop the cached headers so as to not trigger a debug 716 // assert in `write_head`... 717 self.state.cached_headers.take(); 718 self.write_head(msg, None); 719 self.state.error = Some(err); 720 return Ok(()); 721 } 722 } 723 724 // fallback is pass the error back up 725 Err(err) 726 } 727 poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>728 pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 729 ready!(Pin::new(&mut self.io).poll_flush(cx))?; 730 self.try_keep_alive(cx); 731 trace!("flushed({}): {:?}", T::LOG, self.state); 732 Poll::Ready(Ok(())) 733 } 734 poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>735 pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 736 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { 737 Ok(()) => { 738 trace!("shut down IO complete"); 739 Poll::Ready(Ok(())) 740 } 741 Err(e) => { 742 debug!("error shutting down IO: {}", e); 743 Poll::Ready(Err(e)) 744 } 745 } 746 } 747 748 /// If the read side can be cheaply drained, do so. Otherwise, close. 
poll_drain_or_close_read(&mut self, cx: &mut Context<'_>)749 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) { 750 if let Reading::Continue(ref decoder) = self.state.reading { 751 // skip sending the 100-continue 752 // just move forward to a read, in case a tiny body was included 753 self.state.reading = Reading::Body(decoder.clone()); 754 } 755 756 let _ = self.poll_read_body(cx); 757 758 // If still in Reading::Body, just give up 759 match self.state.reading { 760 Reading::Init | Reading::KeepAlive => trace!("body drained"), 761 _ => self.close_read(), 762 } 763 } 764 close_read(&mut self)765 pub(crate) fn close_read(&mut self) { 766 self.state.close_read(); 767 } 768 close_write(&mut self)769 pub(crate) fn close_write(&mut self) { 770 self.state.close_write(); 771 } 772 773 #[cfg(feature = "server")] disable_keep_alive(&mut self)774 pub(crate) fn disable_keep_alive(&mut self) { 775 if self.state.is_idle() { 776 trace!("disable_keep_alive; closing idle connection"); 777 self.state.close(); 778 } else { 779 trace!("disable_keep_alive; in-progress connection"); 780 self.state.disable_keep_alive(); 781 } 782 } 783 take_error(&mut self) -> crate::Result<()>784 pub(crate) fn take_error(&mut self) -> crate::Result<()> { 785 if let Some(err) = self.state.error.take() { 786 Err(err) 787 } else { 788 Ok(()) 789 } 790 } 791 on_upgrade(&mut self) -> crate::upgrade::OnUpgrade792 pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { 793 trace!("{}: prepare possible HTTP upgrade", T::LOG); 794 self.state.prepare_upgrade() 795 } 796 } 797 798 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result799 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 800 f.debug_struct("Conn") 801 .field("state", &self.state) 802 .field("io", &self.io) 803 .finish() 804 } 805 } 806 807 // B and T are never pinned 808 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} 809 810 struct State { 811 
allow_half_close: bool, 812 /// Re-usable HeaderMap to reduce allocating new ones. 813 cached_headers: Option<HeaderMap>, 814 /// If an error occurs when there wasn't a direct way to return it 815 /// back to the user, this is set. 816 error: Option<crate::Error>, 817 /// Current keep-alive status. 818 keep_alive: KA, 819 /// If mid-message, the HTTP Method that started it. 820 /// 821 /// This is used to know things such as if the message can include 822 /// a body or not. 823 method: Option<Method>, 824 h1_parser_config: ParserConfig, 825 #[cfg(all(feature = "server", feature = "runtime"))] 826 h1_header_read_timeout: Option<Duration>, 827 #[cfg(all(feature = "server", feature = "runtime"))] 828 h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>, 829 #[cfg(all(feature = "server", feature = "runtime"))] 830 h1_header_read_timeout_running: bool, 831 preserve_header_case: bool, 832 #[cfg(feature = "ffi")] 833 preserve_header_order: bool, 834 title_case_headers: bool, 835 h09_responses: bool, 836 /// If set, called with each 1xx informational response received for 837 /// the current request. MUST be unset after a non-1xx response is 838 /// received. 839 #[cfg(feature = "ffi")] 840 on_informational: Option<crate::ffi::OnInformational>, 841 #[cfg(feature = "ffi")] 842 raw_headers: bool, 843 /// Set to true when the Dispatcher should poll read operations 844 /// again. See the `maybe_notify` method for more. 845 notify_read: bool, 846 /// State of allowed reads 847 reading: Reading, 848 /// State of allowed writes 849 writing: Writing, 850 /// An expected pending HTTP upgrade. 
upgrade: Option<crate::upgrade::Pending>,
    /// Either HTTP/1.0 or 1.1 connection
    version: Version,
}

/// State of the read half of the connection.
#[derive(Debug)]
enum Reading {
    /// Initial state; `State::idle` resets the read half to this.
    Init,
    /// Carries a body `Decoder`.
    // NOTE(review): presumably the "awaiting 100-continue" state — confirm
    // against the read path, which is outside this part of the file.
    Continue(Decoder),
    /// Carries the `Decoder` for a message body.
    Body(Decoder),
    /// The read half is done with the current message; once the write half is
    /// `KeepAlive` too, `State::try_keep_alive` may return the connection to idle.
    KeepAlive,
    /// The read half is closed (see `State::close` / `State::close_read`).
    Closed,
}

/// State of the write half of the connection.
///
/// `Debug` is implemented by hand further down rather than derived.
enum Writing {
    /// Initial state; `State::idle` resets the write half to this.
    Init,
    /// Carries the `Encoder` for a message body.
    Body(Encoder),
    /// The write half is done with the current message; once the read half is
    /// `KeepAlive` too, `State::try_keep_alive` may return the connection to idle.
    KeepAlive,
    /// The write half is closed (see `State::close` / `State::close_write`).
    Closed,
}

impl fmt::Debug for State {
    /// Formats only the fields that are useful when debugging the state
    /// machine: `reading`, `writing` and `keep_alive` always, `error` only when
    /// one is set, and `allow_half_close` only when it is enabled.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("State");
        builder
            .field("reading", &self.reading)
            .field("writing", &self.writing)
            .field("keep_alive", &self.keep_alive);

        // Only show error field if it's interesting...
        if let Some(error) = &self.error {
            builder.field("error", error);
        }

        if self.allow_half_close {
            builder.field("allow_half_close", &true);
        }

        // Purposefully leaving off other fields..

        builder.finish()
    }
}

impl fmt::Debug for Writing {
    /// Writes the variant name, plus the encoder for `Body`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Writing::Init => f.write_str("Init"),
            Writing::Body(enc) => f.debug_tuple("Body").field(enc).finish(),
            Writing::KeepAlive => f.write_str("KeepAlive"),
            Writing::Closed => f.write_str("Closed"),
        }
    }
}

impl std::ops::BitAndAssign<bool> for KA {
    /// `ka &= enabled`: a `false` operand switches keep-alive to
    /// `KA::Disabled`; a `true` operand leaves the current status untouched.
    fn bitand_assign(&mut self, enabled: bool) {
        if !enabled {
            trace!("remote disabling keep-alive");
            *self = KA::Disabled;
        }
    }
}

/// Keep-alive status of the connection.
#[derive(Clone, Copy, Debug)]
enum KA {
    /// Set by `State::idle` when the connection is reset for another message.
    Idle,
    /// The default status; `State::try_keep_alive` only goes idle from here.
    Busy,
    /// Keep-alive is off. `State::busy` will not leave this status, and
    /// `State::try_keep_alive` closes the connection instead of idling.
    Disabled,
}

impl Default for KA {
    /// Returns `KA::Busy`.
    fn default() -> KA {
        KA::Busy
    }
}

impl KA {
    /// Unconditionally sets the status to `Idle`.
    fn idle(&mut self) {
        *self = KA::Idle;
    }

    /// Unconditionally sets the status to `Busy`.
    fn busy(&mut self) {
        *self = KA::Busy;
    }

    /// Unconditionally sets the status to `Disabled`.
    fn disable(&mut self) {
        *self = KA::Disabled;
    }

    /// Returns a copy of the current status.
    fn status(&self) -> KA {
        *self
    }
}

impl State {
    /// Closes both halves of the connection and disables keep-alive.
    fn close(&mut self) {
        trace!("State::close()");
        self.reading = Reading::Closed;
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    /// Closes only the read half and disables keep-alive.
    fn close_read(&mut self) {
        trace!("State::close_read()");
        self.reading = Reading::Closed;
        self.keep_alive.disable();
    }

    /// Closes only the write half and disables keep-alive.
    fn close_write(&mut self) {
        trace!("State::close_write()");
        self.writing = Writing::Closed;
        self.keep_alive.disable();
    }

    /// Returns `true` unless keep-alive has been disabled.
    fn wants_keep_alive(&self) -> bool {
        !matches!(self.keep_alive.status(), KA::Disabled)
    }

    /// Decides what happens to the connection based on the two halves:
    ///
    /// * both halves in `KeepAlive`: go idle if the keep-alive status is
    ///   `Busy`, otherwise close the connection;
    /// * one half `Closed` while the other is in `KeepAlive`: close the
    ///   connection entirely;
    /// * any other combination: nothing changes.
    fn try_keep_alive<T: Http1Transaction>(&mut self) {
        match (&self.reading, &self.writing) {
            (&Reading::KeepAlive, &Writing::KeepAlive) => {
                if let KA::Busy = self.keep_alive.status() {
                    self.idle::<T>();
                } else {
                    trace!(
                        "try_keep_alive({}): could keep-alive, but status = {:?}",
                        T::LOG,
                        self.keep_alive
                    );
                    self.close();
                }
            }
            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
                self.close()
            }
            _ => (),
        }
    }

    /// Disables keep-alive without touching the read/write states.
    fn disable_keep_alive(&mut self) {
        self.keep_alive.disable()
    }

    /// Marks the connection busy, unless keep-alive is disabled, in which
    /// case the `Disabled` status is preserved.
    fn busy(&mut self) {
        if self.wants_keep_alive() {
            self.keep_alive.busy();
        }
    }

    /// Resets the connection so that it is ready for the next message.
    ///
    /// Clears the stored method, marks keep-alive idle and puts both halves
    /// back into their `Init` state. In debug builds this asserts that the
    /// connection was not idle already.
    fn idle<T: Http1Transaction>(&mut self) {
        debug_assert!(!self.is_idle(), "State::idle() called while idle");

        self.method = None;
        self.keep_alive.idle();

        // Defensive check: `KA::idle` above sets the status unconditionally, so
        // with the current `KA` this branch is not expected to be taken.
        if !self.is_idle() {
            self.close();
            return;
        }

        self.reading = Reading::Init;
        self.writing = Writing::Init;

        // !T::should_read_first() means Client.
        //
        // If Client connection has just gone idle, the Dispatcher
        // should try the poll loop one more time, so as to poll the
        // pending requests stream.
        if !T::should_read_first() {
            self.notify_read = true;
        }
    }

    /// Returns `true` when the keep-alive status is `Idle`.
    fn is_idle(&self) -> bool {
        matches!(self.keep_alive.status(), KA::Idle)
    }

    /// Returns `true` when the read half is `Closed`.
    fn is_read_closed(&self) -> bool {
        matches!(self.reading, Reading::Closed)
    }

    /// Returns `true` when the write half is `Closed`.
    fn is_write_closed(&self) -> bool {
        matches!(self.writing, Writing::Closed)
    }

    /// Creates a pending upgrade via `crate::upgrade::pending()`, stores the
    /// first half (`tx`) in `self.upgrade` and returns the `OnUpgrade` half.
    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
        let (tx, rx) = crate::upgrade::pending();
        self.upgrade = Some(tx);
        rx
    }
}

#[cfg(test)]
mod tests {
    /// Benchmarks parsing a short request head straight out of the read
    /// buffer, reusing the same `HeaderMap` between iterations.
    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        b.bytes = len as u64;

        // an empty IO, we'll be skipping and using the read buffer anyways
        let io = tokio_test::io::Builder::new().build();
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        // Hand the (cleared) header map back so the next
                        // iteration reuses it.
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                conn.io.read_buf_mut().reserve(1);
                // SAFETY (NOTE(review): assumption, please confirm): this relies
                // on the `reserve(1)` above leaving the buffer with at least
                // `len` bytes of initialized capacity that still hold the
                // original request, so that `set_len(len)` re-exposes it for
                // the next iteration.
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }

    /*
    //TODO: rewrite these using dispatch... someday...
    use futures::{Async, Future, Stream, Sink};
    use futures::future;

    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
    use super::super::Encoder;
    use mock::AsyncIo;

    use super::{Conn, Decoder, Reading, Writing};
    use ::uri::Uri;

    use std::str::FromStr;

    #[test]
    fn test_conn_init_read() {
        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
        let len = good_message.len();
        let io = AsyncIo::new_buf(good_message, len);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);

        match conn.poll().unwrap() {
            Async::Ready(Some(Frame::Message { message, body: false })) => {
                assert_eq!(message, MessageHead {
                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
                    .. MessageHead::default()
                })
            },
            f => panic!("frame is not Frame::Message: {:?}", f)
        }
    }

    #[test]
    fn test_conn_parse_partial() {
        let _: Result<(), ()> = future::lazy(|| {
            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
            let io = AsyncIo::new_buf(good_message, 10);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            assert!(conn.poll().unwrap().is_not_ready());
            conn.io.io_mut().block_in(50);
            let async = conn.poll().unwrap();
            assert!(async.is_ready());
            match async {
                Async::Ready(Some(Frame::Message { .. })) => (),
                f => panic!("frame is not Message: {:?}", f),
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_init_read_eof_idle() {
        let io = AsyncIo::new_buf(vec![], 1);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_idle_partial_parse() {
        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.idle();

        match conn.poll() {
            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
            other => panic!("unexpected frame: {:?}", other)
        }
    }

    #[test]
    fn test_conn_init_read_eof_busy() {
        let _: Result<(), ()> = future::lazy(|| {
            // server ignores
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll().unwrap() {
                Async::Ready(None) => {},
                other => panic!("unexpected frame: {:?}", other)
            }

            // client
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_finish_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_eof();
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;
            conn.state.reading = Reading::Body(Decoder::length(0));

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_message_empty_body_read_eof() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
            conn.state.busy();
            conn.state.writing = Writing::KeepAlive;

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // conn eofs, but tokio-proto will call poll() again, before calling flush()
            // the conn eof in this case is perfectly fine

            match conn.poll() {
                Ok(Async::Ready(None)) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_read_body_end() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.busy();

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
            match conn.poll() {
                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
                other => panic!("unexpected frame: {:?}", other)
            }

            match conn.poll() {
                Ok(Async::NotReady) => (),
                other => panic!("unexpected frame: {:?}", other)
            }
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_closed_read() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.poll().unwrap() {
            Async::Ready(None) => {},
            other => panic!("frame is not None: {:?}", other)
        }
    }

    #[test]
    fn test_conn_body_write_length() {
        let _ = pretty_env_logger::try_init();
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 0);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());

            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(1024 * 3);
            assert!(conn.poll_complete().unwrap().is_not_ready());
            conn.io.io_mut().block_in(max * 2);
            assert!(conn.poll_complete().unwrap().is_ready());

            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_write_chunked() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::chunked());

            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_body_flush() {
        let _: Result<(), ()> = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
            assert!(!conn.can_buffer_body());
            conn.io.io_mut().block_in(1024 * 1024 * 5);
            assert!(conn.poll_complete().unwrap().is_ready());
            assert!(conn.can_buffer_body());
            assert!(conn.io.io_mut().flushed());

            Ok(())
        }).wait();
    }

    #[test]
    fn test_conn_parking() {
        use std::sync::Arc;
        use futures::executor::Notify;
        use futures::executor::NotifyHandle;

        struct Car {
            permit: bool,
        }
        impl Notify for Car {
            fn notify(&self, _id: usize) {
                assert!(self.permit, "unparked without permit");
            }
        }

        fn car(permit: bool) -> NotifyHandle {
            Arc::new(Car {
                permit: permit,
            }).into()
        }

        // test that once writing is done, unparks
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());

            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();


        // test that flushing when not waiting on read doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.writing = Writing::KeepAlive;
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();


        // test that flushing and writing isn't done doesn't unpark
        let f = future::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 4096);
            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
            conn.state.reading = Reading::KeepAlive;
            assert!(conn.poll().unwrap().is_not_ready());
            conn.state.writing = Writing::Body(Encoder::length(5_000));
            assert!(conn.poll_complete().unwrap().is_ready());
            Ok::<(), ()>(())
        });
        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
    }

    #[test]
    fn test_conn_closed_write() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.close();

        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
            Err(_e) => {},
            other => panic!("did not return Err: {:?}", other)
        }

        assert!(conn.state.is_write_closed());
    }

    #[test]
    fn test_conn_write_empty_chunk() {
        let io = AsyncIo::new_buf(vec![], 0);
        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
        conn.state.writing = Writing::KeepAlive;

        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
    }
    */
}