1 use std::fmt;
2 use std::io;
3 use std::marker::PhantomData;
4 use std::marker::Unpin;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 #[cfg(all(feature = "server", feature = "runtime"))]
8 use std::time::Duration;
9 
10 use bytes::{Buf, Bytes};
11 use http::header::{HeaderValue, CONNECTION};
12 use http::{HeaderMap, Method, Version};
13 use httparse::ParserConfig;
14 use tokio::io::{AsyncRead, AsyncWrite};
15 #[cfg(all(feature = "server", feature = "runtime"))]
16 use tokio::time::Sleep;
17 use tracing::{debug, error, trace};
18 
19 use super::io::Buffered;
20 use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21 use crate::body::DecodedLength;
22 use crate::headers::connection_keep_alive;
23 use crate::proto::{BodyLength, MessageHead};
24 
25 const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
26 
27 /// This handles a connection, which will have been established over an
28 /// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
29 /// `Transaction`s over HTTP.
30 ///
31 /// The connection will determine when a message begins and ends as well as
32 /// determine if this connection can be kept alive after the message,
33 /// or if it is complete.
34 pub(crate) struct Conn<I, B, T> {
35     io: Buffered<I, EncodedBuf<B>>,
36     state: State,
37     _marker: PhantomData<fn(T)>,
38 }
39 
40 impl<I, B, T> Conn<I, B, T>
41 where
42     I: AsyncRead + AsyncWrite + Unpin,
43     B: Buf,
44     T: Http1Transaction,
45 {
new(io: I) -> Conn<I, B, T>46     pub(crate) fn new(io: I) -> Conn<I, B, T> {
47         Conn {
48             io: Buffered::new(io),
49             state: State {
50                 allow_half_close: false,
51                 cached_headers: None,
52                 error: None,
53                 keep_alive: KA::Busy,
54                 method: None,
55                 h1_parser_config: ParserConfig::default(),
56                 #[cfg(all(feature = "server", feature = "runtime"))]
57                 h1_header_read_timeout: None,
58                 #[cfg(all(feature = "server", feature = "runtime"))]
59                 h1_header_read_timeout_fut: None,
60                 #[cfg(all(feature = "server", feature = "runtime"))]
61                 h1_header_read_timeout_running: false,
62                 preserve_header_case: false,
63                 #[cfg(feature = "ffi")]
64                 preserve_header_order: false,
65                 title_case_headers: false,
66                 h09_responses: false,
67                 #[cfg(feature = "ffi")]
68                 on_informational: None,
69                 #[cfg(feature = "ffi")]
70                 raw_headers: false,
71                 notify_read: false,
72                 reading: Reading::Init,
73                 writing: Writing::Init,
74                 upgrade: None,
75                 // We assume a modern world where the remote speaks HTTP/1.1.
76                 // If they tell us otherwise, we'll downgrade in `read_head`.
77                 version: Version::HTTP_11,
78             },
79             _marker: PhantomData,
80         }
81     }
82 
83     #[cfg(feature = "server")]
set_flush_pipeline(&mut self, enabled: bool)84     pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
85         self.io.set_flush_pipeline(enabled);
86     }
87 
set_write_strategy_queue(&mut self)88     pub(crate) fn set_write_strategy_queue(&mut self) {
89         self.io.set_write_strategy_queue();
90     }
91 
set_max_buf_size(&mut self, max: usize)92     pub(crate) fn set_max_buf_size(&mut self, max: usize) {
93         self.io.set_max_buf_size(max);
94     }
95 
96     #[cfg(feature = "client")]
set_read_buf_exact_size(&mut self, sz: usize)97     pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
98         self.io.set_read_buf_exact_size(sz);
99     }
100 
set_write_strategy_flatten(&mut self)101     pub(crate) fn set_write_strategy_flatten(&mut self) {
102         self.io.set_write_strategy_flatten();
103     }
104 
105     #[cfg(feature = "client")]
set_h1_parser_config(&mut self, parser_config: ParserConfig)106     pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
107         self.state.h1_parser_config = parser_config;
108     }
109 
set_title_case_headers(&mut self)110     pub(crate) fn set_title_case_headers(&mut self) {
111         self.state.title_case_headers = true;
112     }
113 
set_preserve_header_case(&mut self)114     pub(crate) fn set_preserve_header_case(&mut self) {
115         self.state.preserve_header_case = true;
116     }
117 
118     #[cfg(feature = "ffi")]
set_preserve_header_order(&mut self)119     pub(crate) fn set_preserve_header_order(&mut self) {
120         self.state.preserve_header_order = true;
121     }
122 
123     #[cfg(feature = "client")]
set_h09_responses(&mut self)124     pub(crate) fn set_h09_responses(&mut self) {
125         self.state.h09_responses = true;
126     }
127 
128     #[cfg(all(feature = "server", feature = "runtime"))]
set_http1_header_read_timeout(&mut self, val: Duration)129     pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
130         self.state.h1_header_read_timeout = Some(val);
131     }
132 
133     #[cfg(feature = "server")]
set_allow_half_close(&mut self)134     pub(crate) fn set_allow_half_close(&mut self) {
135         self.state.allow_half_close = true;
136     }
137 
138     #[cfg(feature = "ffi")]
set_raw_headers(&mut self, enabled: bool)139     pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
140         self.state.raw_headers = enabled;
141     }
142 
into_inner(self) -> (I, Bytes)143     pub(crate) fn into_inner(self) -> (I, Bytes) {
144         self.io.into_inner()
145     }
146 
pending_upgrade(&mut self) -> Option<crate::upgrade::Pending>147     pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
148         self.state.upgrade.take()
149     }
150 
is_read_closed(&self) -> bool151     pub(crate) fn is_read_closed(&self) -> bool {
152         self.state.is_read_closed()
153     }
154 
is_write_closed(&self) -> bool155     pub(crate) fn is_write_closed(&self) -> bool {
156         self.state.is_write_closed()
157     }
158 
can_read_head(&self) -> bool159     pub(crate) fn can_read_head(&self) -> bool {
160         if !matches!(self.state.reading, Reading::Init) {
161             return false;
162         }
163 
164         if T::should_read_first() {
165             return true;
166         }
167 
168         !matches!(self.state.writing, Writing::Init)
169     }
170 
can_read_body(&self) -> bool171     pub(crate) fn can_read_body(&self) -> bool {
172         match self.state.reading {
173             Reading::Body(..) | Reading::Continue(..) => true,
174             _ => false,
175         }
176     }
177 
should_error_on_eof(&self) -> bool178     fn should_error_on_eof(&self) -> bool {
179         // If we're idle, it's probably just the connection closing gracefully.
180         T::should_error_on_parse_eof() && !self.state.is_idle()
181     }
182 
has_h2_prefix(&self) -> bool183     fn has_h2_prefix(&self) -> bool {
184         let read_buf = self.io.read_buf();
185         read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
186     }
187 
poll_read_head( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>>188     pub(super) fn poll_read_head(
189         &mut self,
190         cx: &mut Context<'_>,
191     ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
192         debug_assert!(self.can_read_head());
193         trace!("Conn::read_head");
194 
195         let msg = match ready!(self.io.parse::<T>(
196             cx,
197             ParseContext {
198                 cached_headers: &mut self.state.cached_headers,
199                 req_method: &mut self.state.method,
200                 h1_parser_config: self.state.h1_parser_config.clone(),
201                 #[cfg(all(feature = "server", feature = "runtime"))]
202                 h1_header_read_timeout: self.state.h1_header_read_timeout,
203                 #[cfg(all(feature = "server", feature = "runtime"))]
204                 h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
205                 #[cfg(all(feature = "server", feature = "runtime"))]
206                 h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
207                 preserve_header_case: self.state.preserve_header_case,
208                 #[cfg(feature = "ffi")]
209                 preserve_header_order: self.state.preserve_header_order,
210                 h09_responses: self.state.h09_responses,
211                 #[cfg(feature = "ffi")]
212                 on_informational: &mut self.state.on_informational,
213                 #[cfg(feature = "ffi")]
214                 raw_headers: self.state.raw_headers,
215             }
216         )) {
217             Ok(msg) => msg,
218             Err(e) => return self.on_read_head_error(e),
219         };
220 
221         // Note: don't deconstruct `msg` into local variables, it appears
222         // the optimizer doesn't remove the extra copies.
223 
224         debug!("incoming body is {}", msg.decode);
225 
226         // Prevent accepting HTTP/0.9 responses after the initial one, if any.
227         self.state.h09_responses = false;
228 
229         // Drop any OnInformational callbacks, we're done there!
230         #[cfg(feature = "ffi")]
231         {
232             self.state.on_informational = None;
233         }
234 
235         self.state.busy();
236         self.state.keep_alive &= msg.keep_alive;
237         self.state.version = msg.head.version;
238 
239         let mut wants = if msg.wants_upgrade {
240             Wants::UPGRADE
241         } else {
242             Wants::EMPTY
243         };
244 
245         if msg.decode == DecodedLength::ZERO {
246             if msg.expect_continue {
247                 debug!("ignoring expect-continue since body is empty");
248             }
249             self.state.reading = Reading::KeepAlive;
250             if !T::should_read_first() {
251                 self.try_keep_alive(cx);
252             }
253         } else if msg.expect_continue {
254             self.state.reading = Reading::Continue(Decoder::new(msg.decode));
255             wants = wants.add(Wants::EXPECT);
256         } else {
257             self.state.reading = Reading::Body(Decoder::new(msg.decode));
258         }
259 
260         Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
261     }
262 
on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>>263     fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
264         // If we are currently waiting on a message, then an empty
265         // message should be reported as an error. If not, it is just
266         // the connection closing gracefully.
267         let must_error = self.should_error_on_eof();
268         self.close_read();
269         self.io.consume_leading_lines();
270         let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
271         if was_mid_parse || must_error {
272             // We check if the buf contains the h2 Preface
273             debug!(
274                 "parse error ({}) with {} bytes",
275                 e,
276                 self.io.read_buf().len()
277             );
278             match self.on_parse_error(e) {
279                 Ok(()) => Poll::Pending, // XXX: wat?
280                 Err(e) => Poll::Ready(Some(Err(e))),
281             }
282         } else {
283             debug!("read eof");
284             self.close_write();
285             Poll::Ready(None)
286         }
287     }
288 
poll_read_body( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<io::Result<Bytes>>>289     pub(crate) fn poll_read_body(
290         &mut self,
291         cx: &mut Context<'_>,
292     ) -> Poll<Option<io::Result<Bytes>>> {
293         debug_assert!(self.can_read_body());
294 
295         let (reading, ret) = match self.state.reading {
296             Reading::Body(ref mut decoder) => {
297                 match ready!(decoder.decode(cx, &mut self.io)) {
298                     Ok(slice) => {
299                         let (reading, chunk) = if decoder.is_eof() {
300                             debug!("incoming body completed");
301                             (
302                                 Reading::KeepAlive,
303                                 if !slice.is_empty() {
304                                     Some(Ok(slice))
305                                 } else {
306                                     None
307                                 },
308                             )
309                         } else if slice.is_empty() {
310                             error!("incoming body unexpectedly ended");
311                             // This should be unreachable, since all 3 decoders
312                             // either set eof=true or return an Err when reading
313                             // an empty slice...
314                             (Reading::Closed, None)
315                         } else {
316                             return Poll::Ready(Some(Ok(slice)));
317                         };
318                         (reading, Poll::Ready(chunk))
319                     }
320                     Err(e) => {
321                         debug!("incoming body decode error: {}", e);
322                         (Reading::Closed, Poll::Ready(Some(Err(e))))
323                     }
324                 }
325             }
326             Reading::Continue(ref decoder) => {
327                 // Write the 100 Continue if not already responded...
328                 if let Writing::Init = self.state.writing {
329                     trace!("automatically sending 100 Continue");
330                     let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
331                     self.io.headers_buf().extend_from_slice(cont);
332                 }
333 
334                 // And now recurse once in the Reading::Body state...
335                 self.state.reading = Reading::Body(decoder.clone());
336                 return self.poll_read_body(cx);
337             }
338             _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
339         };
340 
341         self.state.reading = reading;
342         self.try_keep_alive(cx);
343         ret
344     }
345 
wants_read_again(&mut self) -> bool346     pub(crate) fn wants_read_again(&mut self) -> bool {
347         let ret = self.state.notify_read;
348         self.state.notify_read = false;
349         ret
350     }
351 
poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>352     pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
353         debug_assert!(!self.can_read_head() && !self.can_read_body());
354 
355         if self.is_read_closed() {
356             Poll::Pending
357         } else if self.is_mid_message() {
358             self.mid_message_detect_eof(cx)
359         } else {
360             self.require_empty_read(cx)
361         }
362     }
363 
is_mid_message(&self) -> bool364     fn is_mid_message(&self) -> bool {
365         !matches!(
366             (&self.state.reading, &self.state.writing),
367             (&Reading::Init, &Writing::Init)
368         )
369     }
370 
371     // This will check to make sure the io object read is empty.
372     //
373     // This should only be called for Clients wanting to enter the idle
374     // state.
require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>375     fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
376         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
377         debug_assert!(!self.is_mid_message());
378         debug_assert!(T::is_client());
379 
380         if !self.io.read_buf().is_empty() {
381             debug!("received an unexpected {} bytes", self.io.read_buf().len());
382             return Poll::Ready(Err(crate::Error::new_unexpected_message()));
383         }
384 
385         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
386 
387         if num_read == 0 {
388             let ret = if self.should_error_on_eof() {
389                 trace!("found unexpected EOF on busy connection: {:?}", self.state);
390                 Poll::Ready(Err(crate::Error::new_incomplete()))
391             } else {
392                 trace!("found EOF on idle connection, closing");
393                 Poll::Ready(Ok(()))
394             };
395 
396             // order is important: should_error needs state BEFORE close_read
397             self.state.close_read();
398             return ret;
399         }
400 
401         debug!(
402             "received unexpected {} bytes on an idle connection",
403             num_read
404         );
405         Poll::Ready(Err(crate::Error::new_unexpected_message()))
406     }
407 
mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>408     fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
409         debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
410         debug_assert!(self.is_mid_message());
411 
412         if self.state.allow_half_close || !self.io.read_buf().is_empty() {
413             return Poll::Pending;
414         }
415 
416         let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
417 
418         if num_read == 0 {
419             trace!("found unexpected EOF on busy connection: {:?}", self.state);
420             self.state.close_read();
421             Poll::Ready(Err(crate::Error::new_incomplete()))
422         } else {
423             Poll::Ready(Ok(()))
424         }
425     }
426 
force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>>427     fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
428         debug_assert!(!self.state.is_read_closed());
429 
430         let result = ready!(self.io.poll_read_from_io(cx));
431         Poll::Ready(result.map_err(|e| {
432             trace!("force_io_read; io error = {:?}", e);
433             self.state.close();
434             e
435         }))
436     }
437 
maybe_notify(&mut self, cx: &mut Context<'_>)438     fn maybe_notify(&mut self, cx: &mut Context<'_>) {
439         // its possible that we returned NotReady from poll() without having
440         // exhausted the underlying Io. We would have done this when we
441         // determined we couldn't keep reading until we knew how writing
442         // would finish.
443 
444         match self.state.reading {
445             Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
446                 return
447             }
448             Reading::Init => (),
449         };
450 
451         match self.state.writing {
452             Writing::Body(..) => return,
453             Writing::Init | Writing::KeepAlive | Writing::Closed => (),
454         }
455 
456         if !self.io.is_read_blocked() {
457             if self.io.read_buf().is_empty() {
458                 match self.io.poll_read_from_io(cx) {
459                     Poll::Ready(Ok(n)) => {
460                         if n == 0 {
461                             trace!("maybe_notify; read eof");
462                             if self.state.is_idle() {
463                                 self.state.close();
464                             } else {
465                                 self.close_read()
466                             }
467                             return;
468                         }
469                     }
470                     Poll::Pending => {
471                         trace!("maybe_notify; read_from_io blocked");
472                         return;
473                     }
474                     Poll::Ready(Err(e)) => {
475                         trace!("maybe_notify; read_from_io error: {}", e);
476                         self.state.close();
477                         self.state.error = Some(crate::Error::new_io(e));
478                     }
479                 }
480             }
481             self.state.notify_read = true;
482         }
483     }
484 
try_keep_alive(&mut self, cx: &mut Context<'_>)485     fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
486         self.state.try_keep_alive::<T>();
487         self.maybe_notify(cx);
488     }
489 
can_write_head(&self) -> bool490     pub(crate) fn can_write_head(&self) -> bool {
491         if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
492             return false;
493         }
494 
495         match self.state.writing {
496             Writing::Init => self.io.can_headers_buf(),
497             _ => false,
498         }
499     }
500 
can_write_body(&self) -> bool501     pub(crate) fn can_write_body(&self) -> bool {
502         match self.state.writing {
503             Writing::Body(..) => true,
504             Writing::Init | Writing::KeepAlive | Writing::Closed => false,
505         }
506     }
507 
can_buffer_body(&self) -> bool508     pub(crate) fn can_buffer_body(&self) -> bool {
509         self.io.can_buffer()
510     }
511 
write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>)512     pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
513         if let Some(encoder) = self.encode_head(head, body) {
514             self.state.writing = if !encoder.is_eof() {
515                 Writing::Body(encoder)
516             } else if encoder.is_last() {
517                 Writing::Closed
518             } else {
519                 Writing::KeepAlive
520             };
521         }
522     }
523 
write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B)524     pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
525         if let Some(encoder) =
526             self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
527         {
528             let is_last = encoder.is_last();
529             // Make sure we don't write a body if we weren't actually allowed
530             // to do so, like because its a HEAD request.
531             if !encoder.is_eof() {
532                 encoder.danger_full_buf(body, self.io.write_buf());
533             }
534             self.state.writing = if is_last {
535                 Writing::Closed
536             } else {
537                 Writing::KeepAlive
538             }
539         }
540     }
541 
encode_head( &mut self, mut head: MessageHead<T::Outgoing>, body: Option<BodyLength>, ) -> Option<Encoder>542     fn encode_head(
543         &mut self,
544         mut head: MessageHead<T::Outgoing>,
545         body: Option<BodyLength>,
546     ) -> Option<Encoder> {
547         debug_assert!(self.can_write_head());
548 
549         if !T::should_read_first() {
550             self.state.busy();
551         }
552 
553         self.enforce_version(&mut head);
554 
555         let buf = self.io.headers_buf();
556         match super::role::encode_headers::<T>(
557             Encode {
558                 head: &mut head,
559                 body,
560                 #[cfg(feature = "server")]
561                 keep_alive: self.state.wants_keep_alive(),
562                 req_method: &mut self.state.method,
563                 title_case_headers: self.state.title_case_headers,
564             },
565             buf,
566         ) {
567             Ok(encoder) => {
568                 debug_assert!(self.state.cached_headers.is_none());
569                 debug_assert!(head.headers.is_empty());
570                 self.state.cached_headers = Some(head.headers);
571 
572                 #[cfg(feature = "ffi")]
573                 {
574                     self.state.on_informational =
575                         head.extensions.remove::<crate::ffi::OnInformational>();
576                 }
577 
578                 Some(encoder)
579             }
580             Err(err) => {
581                 self.state.error = Some(err);
582                 self.state.writing = Writing::Closed;
583                 None
584             }
585         }
586     }
587 
588     // Fix keep-alive when Connection: keep-alive header is not present
fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>)589     fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
590         let outgoing_is_keep_alive = head
591             .headers
592             .get(CONNECTION)
593             .map(connection_keep_alive)
594             .unwrap_or(false);
595 
596         if !outgoing_is_keep_alive {
597             match head.version {
598                 // If response is version 1.0 and keep-alive is not present in the response,
599                 // disable keep-alive so the server closes the connection
600                 Version::HTTP_10 => self.state.disable_keep_alive(),
601                 // If response is version 1.1 and keep-alive is wanted, add
602                 // Connection: keep-alive header when not present
603                 Version::HTTP_11 => {
604                     if self.state.wants_keep_alive() {
605                         head.headers
606                             .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
607                     }
608                 }
609                 _ => (),
610             }
611         }
612     }
613 
614     // If we know the remote speaks an older version, we try to fix up any messages
615     // to work with our older peer.
enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>)616     fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
617         if let Version::HTTP_10 = self.state.version {
618             // Fixes response or connection when keep-alive header is not present
619             self.fix_keep_alive(head);
620             // If the remote only knows HTTP/1.0, we should force ourselves
621             // to do only speak HTTP/1.0 as well.
622             head.version = Version::HTTP_10;
623         }
624         // If the remote speaks HTTP/1.1, then it *should* be fine with
625         // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
626         // the user's headers be.
627     }
628 
write_body(&mut self, chunk: B)629     pub(crate) fn write_body(&mut self, chunk: B) {
630         debug_assert!(self.can_write_body() && self.can_buffer_body());
631         // empty chunks should be discarded at Dispatcher level
632         debug_assert!(chunk.remaining() != 0);
633 
634         let state = match self.state.writing {
635             Writing::Body(ref mut encoder) => {
636                 self.io.buffer(encoder.encode(chunk));
637 
638                 if !encoder.is_eof() {
639                     return;
640                 }
641 
642                 if encoder.is_last() {
643                     Writing::Closed
644                 } else {
645                     Writing::KeepAlive
646                 }
647             }
648             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
649         };
650 
651         self.state.writing = state;
652     }
653 
write_body_and_end(&mut self, chunk: B)654     pub(crate) fn write_body_and_end(&mut self, chunk: B) {
655         debug_assert!(self.can_write_body() && self.can_buffer_body());
656         // empty chunks should be discarded at Dispatcher level
657         debug_assert!(chunk.remaining() != 0);
658 
659         let state = match self.state.writing {
660             Writing::Body(ref encoder) => {
661                 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
662                 if can_keep_alive {
663                     Writing::KeepAlive
664                 } else {
665                     Writing::Closed
666                 }
667             }
668             _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
669         };
670 
671         self.state.writing = state;
672     }
673 
end_body(&mut self) -> crate::Result<()>674     pub(crate) fn end_body(&mut self) -> crate::Result<()> {
675         debug_assert!(self.can_write_body());
676 
677         let encoder = match self.state.writing {
678             Writing::Body(ref mut enc) => enc,
679             _ => return Ok(()),
680         };
681 
682         // end of stream, that means we should try to eof
683         match encoder.end() {
684             Ok(end) => {
685                 if let Some(end) = end {
686                     self.io.buffer(end);
687                 }
688 
689                 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
690                     Writing::Closed
691                 } else {
692                     Writing::KeepAlive
693                 };
694 
695                 Ok(())
696             }
697             Err(not_eof) => {
698                 self.state.writing = Writing::Closed;
699                 Err(crate::Error::new_body_write_aborted().with(not_eof))
700             }
701         }
702     }
703 
704     // When we get a parse error, depending on what side we are, we might be able
705     // to write a response before closing the connection.
706     //
707     // - Client: there is nothing we can do
708     // - Server: if Response hasn't been written yet, we can send a 4xx response
on_parse_error(&mut self, err: crate::Error) -> crate::Result<()>709     fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
710         if let Writing::Init = self.state.writing {
711             if self.has_h2_prefix() {
712                 return Err(crate::Error::new_version_h2());
713             }
714             if let Some(msg) = T::on_error(&err) {
715                 // Drop the cached headers so as to not trigger a debug
716                 // assert in `write_head`...
717                 self.state.cached_headers.take();
718                 self.write_head(msg, None);
719                 self.state.error = Some(err);
720                 return Ok(());
721             }
722         }
723 
724         // fallback is pass the error back up
725         Err(err)
726     }
727 
poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>728     pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
729         ready!(Pin::new(&mut self.io).poll_flush(cx))?;
730         self.try_keep_alive(cx);
731         trace!("flushed({}): {:?}", T::LOG, self.state);
732         Poll::Ready(Ok(()))
733     }
734 
poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>>735     pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
736         match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
737             Ok(()) => {
738                 trace!("shut down IO complete");
739                 Poll::Ready(Ok(()))
740             }
741             Err(e) => {
742                 debug!("error shutting down IO: {}", e);
743                 Poll::Ready(Err(e))
744             }
745         }
746     }
747 
748     /// If the read side can be cheaply drained, do so. Otherwise, close.
poll_drain_or_close_read(&mut self, cx: &mut Context<'_>)749     pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
750         if let Reading::Continue(ref decoder) = self.state.reading {
751             // skip sending the 100-continue
752             // just move forward to a read, in case a tiny body was included
753             self.state.reading = Reading::Body(decoder.clone());
754         }
755 
756         let _ = self.poll_read_body(cx);
757 
758         // If still in Reading::Body, just give up
759         match self.state.reading {
760             Reading::Init | Reading::KeepAlive => trace!("body drained"),
761             _ => self.close_read(),
762         }
763     }
764 
close_read(&mut self)765     pub(crate) fn close_read(&mut self) {
766         self.state.close_read();
767     }
768 
close_write(&mut self)769     pub(crate) fn close_write(&mut self) {
770         self.state.close_write();
771     }
772 
773     #[cfg(feature = "server")]
disable_keep_alive(&mut self)774     pub(crate) fn disable_keep_alive(&mut self) {
775         if self.state.is_idle() {
776             trace!("disable_keep_alive; closing idle connection");
777             self.state.close();
778         } else {
779             trace!("disable_keep_alive; in-progress connection");
780             self.state.disable_keep_alive();
781         }
782     }
783 
take_error(&mut self) -> crate::Result<()>784     pub(crate) fn take_error(&mut self) -> crate::Result<()> {
785         if let Some(err) = self.state.error.take() {
786             Err(err)
787         } else {
788             Ok(())
789         }
790     }
791 
on_upgrade(&mut self) -> crate::upgrade::OnUpgrade792     pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
793         trace!("{}: prepare possible HTTP upgrade", T::LOG);
794         self.state.prepare_upgrade()
795     }
796 }
797 
798 impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result799     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800         f.debug_struct("Conn")
801             .field("state", &self.state)
802             .field("io", &self.io)
803             .finish()
804     }
805 }
806 
807 // B and T are never pinned
808 impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
809 
810 struct State {
811     allow_half_close: bool,
812     /// Re-usable HeaderMap to reduce allocating new ones.
813     cached_headers: Option<HeaderMap>,
814     /// If an error occurs when there wasn't a direct way to return it
815     /// back to the user, this is set.
816     error: Option<crate::Error>,
817     /// Current keep-alive status.
818     keep_alive: KA,
819     /// If mid-message, the HTTP Method that started it.
820     ///
821     /// This is used to know things such as if the message can include
822     /// a body or not.
823     method: Option<Method>,
824     h1_parser_config: ParserConfig,
825     #[cfg(all(feature = "server", feature = "runtime"))]
826     h1_header_read_timeout: Option<Duration>,
827     #[cfg(all(feature = "server", feature = "runtime"))]
828     h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
829     #[cfg(all(feature = "server", feature = "runtime"))]
830     h1_header_read_timeout_running: bool,
831     preserve_header_case: bool,
832     #[cfg(feature = "ffi")]
833     preserve_header_order: bool,
834     title_case_headers: bool,
835     h09_responses: bool,
836     /// If set, called with each 1xx informational response received for
837     /// the current request. MUST be unset after a non-1xx response is
838     /// received.
839     #[cfg(feature = "ffi")]
840     on_informational: Option<crate::ffi::OnInformational>,
841     #[cfg(feature = "ffi")]
842     raw_headers: bool,
843     /// Set to true when the Dispatcher should poll read operations
844     /// again. See the `maybe_notify` method for more.
845     notify_read: bool,
846     /// State of allowed reads
847     reading: Reading,
848     /// State of allowed writes
849     writing: Writing,
850     /// An expected pending HTTP upgrade.
851     upgrade: Option<crate::upgrade::Pending>,
852     /// Either HTTP/1.0 or 1.1 connection
853     version: Version,
854 }
855 
856 #[derive(Debug)]
857 enum Reading {
858     Init,
859     Continue(Decoder),
860     Body(Decoder),
861     KeepAlive,
862     Closed,
863 }
864 
865 enum Writing {
866     Init,
867     Body(Encoder),
868     KeepAlive,
869     Closed,
870 }
871 
872 impl fmt::Debug for State {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result873     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874         let mut builder = f.debug_struct("State");
875         builder
876             .field("reading", &self.reading)
877             .field("writing", &self.writing)
878             .field("keep_alive", &self.keep_alive);
879 
880         // Only show error field if it's interesting...
881         if let Some(ref error) = self.error {
882             builder.field("error", error);
883         }
884 
885         if self.allow_half_close {
886             builder.field("allow_half_close", &true);
887         }
888 
889         // Purposefully leaving off other fields..
890 
891         builder.finish()
892     }
893 }
894 
895 impl fmt::Debug for Writing {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result896     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
897         match *self {
898             Writing::Init => f.write_str("Init"),
899             Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
900             Writing::KeepAlive => f.write_str("KeepAlive"),
901             Writing::Closed => f.write_str("Closed"),
902         }
903     }
904 }
905 
906 impl std::ops::BitAndAssign<bool> for KA {
bitand_assign(&mut self, enabled: bool)907     fn bitand_assign(&mut self, enabled: bool) {
908         if !enabled {
909             trace!("remote disabling keep-alive");
910             *self = KA::Disabled;
911         }
912     }
913 }
914 
915 #[derive(Clone, Copy, Debug)]
916 enum KA {
917     Idle,
918     Busy,
919     Disabled,
920 }
921 
922 impl Default for KA {
default() -> KA923     fn default() -> KA {
924         KA::Busy
925     }
926 }
927 
928 impl KA {
idle(&mut self)929     fn idle(&mut self) {
930         *self = KA::Idle;
931     }
932 
busy(&mut self)933     fn busy(&mut self) {
934         *self = KA::Busy;
935     }
936 
disable(&mut self)937     fn disable(&mut self) {
938         *self = KA::Disabled;
939     }
940 
status(&self) -> KA941     fn status(&self) -> KA {
942         *self
943     }
944 }
945 
946 impl State {
close(&mut self)947     fn close(&mut self) {
948         trace!("State::close()");
949         self.reading = Reading::Closed;
950         self.writing = Writing::Closed;
951         self.keep_alive.disable();
952     }
953 
close_read(&mut self)954     fn close_read(&mut self) {
955         trace!("State::close_read()");
956         self.reading = Reading::Closed;
957         self.keep_alive.disable();
958     }
959 
close_write(&mut self)960     fn close_write(&mut self) {
961         trace!("State::close_write()");
962         self.writing = Writing::Closed;
963         self.keep_alive.disable();
964     }
965 
wants_keep_alive(&self) -> bool966     fn wants_keep_alive(&self) -> bool {
967         if let KA::Disabled = self.keep_alive.status() {
968             false
969         } else {
970             true
971         }
972     }
973 
try_keep_alive<T: Http1Transaction>(&mut self)974     fn try_keep_alive<T: Http1Transaction>(&mut self) {
975         match (&self.reading, &self.writing) {
976             (&Reading::KeepAlive, &Writing::KeepAlive) => {
977                 if let KA::Busy = self.keep_alive.status() {
978                     self.idle::<T>();
979                 } else {
980                     trace!(
981                         "try_keep_alive({}): could keep-alive, but status = {:?}",
982                         T::LOG,
983                         self.keep_alive
984                     );
985                     self.close();
986                 }
987             }
988             (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
989                 self.close()
990             }
991             _ => (),
992         }
993     }
994 
disable_keep_alive(&mut self)995     fn disable_keep_alive(&mut self) {
996         self.keep_alive.disable()
997     }
998 
busy(&mut self)999     fn busy(&mut self) {
1000         if let KA::Disabled = self.keep_alive.status() {
1001             return;
1002         }
1003         self.keep_alive.busy();
1004     }
1005 
idle<T: Http1Transaction>(&mut self)1006     fn idle<T: Http1Transaction>(&mut self) {
1007         debug_assert!(!self.is_idle(), "State::idle() called while idle");
1008 
1009         self.method = None;
1010         self.keep_alive.idle();
1011 
1012         if !self.is_idle() {
1013             self.close();
1014             return;
1015         }
1016 
1017         self.reading = Reading::Init;
1018         self.writing = Writing::Init;
1019 
1020         // !T::should_read_first() means Client.
1021         //
1022         // If Client connection has just gone idle, the Dispatcher
1023         // should try the poll loop one more time, so as to poll the
1024         // pending requests stream.
1025         if !T::should_read_first() {
1026             self.notify_read = true;
1027         }
1028     }
1029 
is_idle(&self) -> bool1030     fn is_idle(&self) -> bool {
1031         matches!(self.keep_alive.status(), KA::Idle)
1032     }
1033 
is_read_closed(&self) -> bool1034     fn is_read_closed(&self) -> bool {
1035         matches!(self.reading, Reading::Closed)
1036     }
1037 
is_write_closed(&self) -> bool1038     fn is_write_closed(&self) -> bool {
1039         matches!(self.writing, Writing::Closed)
1040     }
1041 
prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade1042     fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1043         let (tx, rx) = crate::upgrade::pending();
1044         self.upgrade = Some(tx);
1045         rx
1046     }
1047 }
1048 
1049 #[cfg(test)]
1050 mod tests {
1051     #[cfg(feature = "nightly")]
1052     #[bench]
bench_read_head_short(b: &mut ::test::Bencher)1053     fn bench_read_head_short(b: &mut ::test::Bencher) {
1054         use super::*;
1055         let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1056         let len = s.len();
1057         b.bytes = len as u64;
1058 
1059         // an empty IO, we'll be skipping and using the read buffer anyways
1060         let io = tokio_test::io::Builder::new().build();
1061         let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1062         *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1063         conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1064 
1065         let rt = tokio::runtime::Builder::new_current_thread()
1066             .enable_all()
1067             .build()
1068             .unwrap();
1069 
1070         b.iter(|| {
1071             rt.block_on(futures_util::future::poll_fn(|cx| {
1072                 match conn.poll_read_head(cx) {
1073                     Poll::Ready(Some(Ok(x))) => {
1074                         ::test::black_box(&x);
1075                         let mut headers = x.0.headers;
1076                         headers.clear();
1077                         conn.state.cached_headers = Some(headers);
1078                     }
1079                     f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1080                 }
1081 
1082                 conn.io.read_buf_mut().reserve(1);
1083                 unsafe {
1084                     conn.io.read_buf_mut().set_len(len);
1085                 }
1086                 conn.state.reading = Reading::Init;
1087                 Poll::Ready(())
1088             }));
1089         });
1090     }
1091 
1092     /*
1093     //TODO: rewrite these using dispatch... someday...
1094     use futures::{Async, Future, Stream, Sink};
1095     use futures::future;
1096 
1097     use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1098     use super::super::Encoder;
1099     use mock::AsyncIo;
1100 
1101     use super::{Conn, Decoder, Reading, Writing};
1102     use ::uri::Uri;
1103 
1104     use std::str::FromStr;
1105 
1106     #[test]
1107     fn test_conn_init_read() {
1108         let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1109         let len = good_message.len();
1110         let io = AsyncIo::new_buf(good_message, len);
1111         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1112 
1113         match conn.poll().unwrap() {
1114             Async::Ready(Some(Frame::Message { message, body: false })) => {
1115                 assert_eq!(message, MessageHead {
1116                     subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1117                     .. MessageHead::default()
1118                 })
1119             },
1120             f => panic!("frame is not Frame::Message: {:?}", f)
1121         }
1122     }
1123 
1124     #[test]
1125     fn test_conn_parse_partial() {
1126         let _: Result<(), ()> = future::lazy(|| {
1127             let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1128             let io = AsyncIo::new_buf(good_message, 10);
1129             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1130             assert!(conn.poll().unwrap().is_not_ready());
1131             conn.io.io_mut().block_in(50);
1132             let async = conn.poll().unwrap();
1133             assert!(async.is_ready());
1134             match async {
1135                 Async::Ready(Some(Frame::Message { .. })) => (),
1136                 f => panic!("frame is not Message: {:?}", f),
1137             }
1138             Ok(())
1139         }).wait();
1140     }
1141 
1142     #[test]
1143     fn test_conn_init_read_eof_idle() {
1144         let io = AsyncIo::new_buf(vec![], 1);
1145         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1146         conn.state.idle();
1147 
1148         match conn.poll().unwrap() {
1149             Async::Ready(None) => {},
1150             other => panic!("frame is not None: {:?}", other)
1151         }
1152     }
1153 
1154     #[test]
1155     fn test_conn_init_read_eof_idle_partial_parse() {
1156         let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1157         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1158         conn.state.idle();
1159 
1160         match conn.poll() {
1161             Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1162             other => panic!("unexpected frame: {:?}", other)
1163         }
1164     }
1165 
1166     #[test]
1167     fn test_conn_init_read_eof_busy() {
1168         let _: Result<(), ()> = future::lazy(|| {
1169             // server ignores
1170             let io = AsyncIo::new_eof();
1171             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1172             conn.state.busy();
1173 
1174             match conn.poll().unwrap() {
1175                 Async::Ready(None) => {},
1176                 other => panic!("unexpected frame: {:?}", other)
1177             }
1178 
1179             // client
1180             let io = AsyncIo::new_eof();
1181             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1182             conn.state.busy();
1183 
1184             match conn.poll() {
1185                 Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1186                 other => panic!("unexpected frame: {:?}", other)
1187             }
1188             Ok(())
1189         }).wait();
1190     }
1191 
1192     #[test]
1193     fn test_conn_body_finish_read_eof() {
1194         let _: Result<(), ()> = future::lazy(|| {
1195             let io = AsyncIo::new_eof();
1196             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1197             conn.state.busy();
1198             conn.state.writing = Writing::KeepAlive;
1199             conn.state.reading = Reading::Body(Decoder::length(0));
1200 
1201             match conn.poll() {
1202                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1203                 other => panic!("unexpected frame: {:?}", other)
1204             }
1205 
1206             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1207             // the conn eof in this case is perfectly fine
1208 
1209             match conn.poll() {
1210                 Ok(Async::Ready(None)) => (),
1211                 other => panic!("unexpected frame: {:?}", other)
1212             }
1213             Ok(())
1214         }).wait();
1215     }
1216 
1217     #[test]
1218     fn test_conn_message_empty_body_read_eof() {
1219         let _: Result<(), ()> = future::lazy(|| {
1220             let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1221             let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1222             conn.state.busy();
1223             conn.state.writing = Writing::KeepAlive;
1224 
1225             match conn.poll() {
1226                 Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1227                 other => panic!("unexpected frame: {:?}", other)
1228             }
1229 
1230             // conn eofs, but tokio-proto will call poll() again, before calling flush()
1231             // the conn eof in this case is perfectly fine
1232 
1233             match conn.poll() {
1234                 Ok(Async::Ready(None)) => (),
1235                 other => panic!("unexpected frame: {:?}", other)
1236             }
1237             Ok(())
1238         }).wait();
1239     }
1240 
1241     #[test]
1242     fn test_conn_read_body_end() {
1243         let _: Result<(), ()> = future::lazy(|| {
1244             let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1245             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1246             conn.state.busy();
1247 
1248             match conn.poll() {
1249                 Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1250                 other => panic!("unexpected frame: {:?}", other)
1251             }
1252 
1253             match conn.poll() {
1254                 Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1255                 other => panic!("unexpected frame: {:?}", other)
1256             }
1257 
1258             // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1259             match conn.poll() {
1260                 Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1261                 other => panic!("unexpected frame: {:?}", other)
1262             }
1263 
1264             match conn.poll() {
1265                 Ok(Async::NotReady) => (),
1266                 other => panic!("unexpected frame: {:?}", other)
1267             }
1268             Ok(())
1269         }).wait();
1270     }
1271 
1272     #[test]
1273     fn test_conn_closed_read() {
1274         let io = AsyncIo::new_buf(vec![], 0);
1275         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1276         conn.state.close();
1277 
1278         match conn.poll().unwrap() {
1279             Async::Ready(None) => {},
1280             other => panic!("frame is not None: {:?}", other)
1281         }
1282     }
1283 
1284     #[test]
1285     fn test_conn_body_write_length() {
1286         let _ = pretty_env_logger::try_init();
1287         let _: Result<(), ()> = future::lazy(|| {
1288             let io = AsyncIo::new_buf(vec![], 0);
1289             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1290             let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1291             conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1292 
1293             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1294             assert!(!conn.can_buffer_body());
1295 
1296             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1297 
1298             conn.io.io_mut().block_in(1024 * 3);
1299             assert!(conn.poll_complete().unwrap().is_not_ready());
1300             conn.io.io_mut().block_in(1024 * 3);
1301             assert!(conn.poll_complete().unwrap().is_not_ready());
1302             conn.io.io_mut().block_in(max * 2);
1303             assert!(conn.poll_complete().unwrap().is_ready());
1304 
1305             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1306             Ok(())
1307         }).wait();
1308     }
1309 
1310     #[test]
1311     fn test_conn_body_write_chunked() {
1312         let _: Result<(), ()> = future::lazy(|| {
1313             let io = AsyncIo::new_buf(vec![], 4096);
1314             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1315             conn.state.writing = Writing::Body(Encoder::chunked());
1316 
1317             assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1318             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1319             Ok(())
1320         }).wait();
1321     }
1322 
1323     #[test]
1324     fn test_conn_body_flush() {
1325         let _: Result<(), ()> = future::lazy(|| {
1326             let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1327             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1328             conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1329             assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1330             assert!(!conn.can_buffer_body());
1331             conn.io.io_mut().block_in(1024 * 1024 * 5);
1332             assert!(conn.poll_complete().unwrap().is_ready());
1333             assert!(conn.can_buffer_body());
1334             assert!(conn.io.io_mut().flushed());
1335 
1336             Ok(())
1337         }).wait();
1338     }
1339 
1340     #[test]
1341     fn test_conn_parking() {
1342         use std::sync::Arc;
1343         use futures::executor::Notify;
1344         use futures::executor::NotifyHandle;
1345 
1346         struct Car {
1347             permit: bool,
1348         }
1349         impl Notify for Car {
1350             fn notify(&self, _id: usize) {
1351                 assert!(self.permit, "unparked without permit");
1352             }
1353         }
1354 
1355         fn car(permit: bool) -> NotifyHandle {
1356             Arc::new(Car {
1357                 permit: permit,
1358             }).into()
1359         }
1360 
1361         // test that once writing is done, unparks
1362         let f = future::lazy(|| {
1363             let io = AsyncIo::new_buf(vec![], 4096);
1364             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1365             conn.state.reading = Reading::KeepAlive;
1366             assert!(conn.poll().unwrap().is_not_ready());
1367 
1368             conn.state.writing = Writing::KeepAlive;
1369             assert!(conn.poll_complete().unwrap().is_ready());
1370             Ok::<(), ()>(())
1371         });
1372         ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1373 
1374 
1375         // test that flushing when not waiting on read doesn't unpark
1376         let f = future::lazy(|| {
1377             let io = AsyncIo::new_buf(vec![], 4096);
1378             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1379             conn.state.writing = Writing::KeepAlive;
1380             assert!(conn.poll_complete().unwrap().is_ready());
1381             Ok::<(), ()>(())
1382         });
1383         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1384 
1385 
1386         // test that flushing and writing isn't done doesn't unpark
1387         let f = future::lazy(|| {
1388             let io = AsyncIo::new_buf(vec![], 4096);
1389             let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1390             conn.state.reading = Reading::KeepAlive;
1391             assert!(conn.poll().unwrap().is_not_ready());
1392             conn.state.writing = Writing::Body(Encoder::length(5_000));
1393             assert!(conn.poll_complete().unwrap().is_ready());
1394             Ok::<(), ()>(())
1395         });
1396         ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1397     }
1398 
1399     #[test]
1400     fn test_conn_closed_write() {
1401         let io = AsyncIo::new_buf(vec![], 0);
1402         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1403         conn.state.close();
1404 
1405         match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1406             Err(_e) => {},
1407             other => panic!("did not return Err: {:?}", other)
1408         }
1409 
1410         assert!(conn.state.is_write_closed());
1411     }
1412 
1413     #[test]
1414     fn test_conn_write_empty_chunk() {
1415         let io = AsyncIo::new_buf(vec![], 0);
1416         let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1417         conn.state.writing = Writing::KeepAlive;
1418 
1419         assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1420         assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1421         conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1422     }
1423     */
1424 }
1425