1 //! HTTP/1 client connections
2 
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::Unpin;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 
10 use bytes::Bytes;
11 use http::{Request, Response};
12 use httparse::ParserConfig;
13 use tokio::io::{AsyncRead, AsyncWrite};
14 
15 use super::super::dispatch;
16 use crate::body::{Body as IncomingBody, HttpBody as Body};
17 use crate::proto;
18 use crate::upgrade::Upgraded;
19 
20 type Dispatcher<T, B> =
21     proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
22 
23 /// The sender side of an established connection.
24 pub struct SendRequest<B> {
25     dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
26 }
27 
28 /// Deconstructed parts of a `Connection`.
29 ///
30 /// This allows taking apart a `Connection` at a later time, in order to
31 /// reclaim the IO object, and additional related pieces.
32 #[derive(Debug)]
33 pub struct Parts<T> {
34     /// The original IO object used in the handshake.
35     pub io: T,
36     /// A buffer of bytes that have been read but not processed as HTTP.
37     ///
38     /// For instance, if the `Connection` is used for an HTTP upgrade request,
39     /// it is possible the server sent back the first bytes of the new protocol
40     /// along with the response upgrade.
41     ///
42     /// You will want to check for any existing bytes if you plan to continue
43     /// communicating on the IO object.
44     pub read_buf: Bytes,
45     _inner: (),
46 }
47 
48 /// A future that processes all HTTP state for the IO object.
49 ///
50 /// In most cases, this should just be spawned into an executor, so that it
51 /// can process incoming and outgoing messages, notice hangups, and the like.
52 #[must_use = "futures do nothing unless polled"]
53 pub struct Connection<T, B>
54 where
55     T: AsyncRead + AsyncWrite + Send + 'static,
56     B: Body + 'static,
57 {
58     inner: Option<Dispatcher<T, B>>,
59 }
60 
61 impl<T, B> Connection<T, B>
62 where
63     T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
64     B: Body + 'static,
65     B::Error: Into<Box<dyn StdError + Send + Sync>>,
66 {
67     /// Return the inner IO object, and additional information.
68     ///
69     /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
into_parts(self) -> Parts<T>70     pub fn into_parts(self) -> Parts<T> {
71         let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
72         Parts {
73             io,
74             read_buf,
75             _inner: (),
76         }
77     }
78 
79     /// Poll the connection for completion, but without calling `shutdown`
80     /// on the underlying IO.
81     ///
82     /// This is useful to allow running a connection while doing an HTTP
83     /// upgrade. Once the upgrade is completed, the connection would be "done",
84     /// but it is not desired to actually shutdown the IO object. Instead you
85     /// would take it back using `into_parts`.
86     ///
87     /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
88     /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
89     /// to work with this function; or use the `without_shutdown` wrapper.
poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>90     pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
91         self.inner
92             .as_mut()
93             .expect("algready upgraded")
94             .poll_without_shutdown(cx)
95     }
96 
97     /// Prevent shutdown of the underlying IO object at the end of service the request,
98     /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>>99     pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
100         let mut conn = Some(self);
101         futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
102             ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
103             Poll::Ready(Ok(conn.take().unwrap().into_parts()))
104         })
105     }
106 }
107 
108 /// A builder to configure an HTTP connection.
109 ///
110 /// After setting options, the builder is used to create a handshake future.
111 #[derive(Clone, Debug)]
112 pub struct Builder {
113     h09_responses: bool,
114     h1_parser_config: ParserConfig,
115     h1_writev: Option<bool>,
116     h1_title_case_headers: bool,
117     h1_preserve_header_case: bool,
118     #[cfg(feature = "ffi")]
119     h1_preserve_header_order: bool,
120     h1_read_buf_exact_size: Option<usize>,
121     h1_max_buf_size: Option<usize>,
122 }
123 
124 /// Returns a handshake future over some IO.
125 ///
126 /// This is a shortcut for `Builder::new().handshake(io)`.
127 /// See [`client::conn`](crate::client::conn) for more.
handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,128 pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
129 where
130     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
131     B: Body + 'static,
132     B::Data: Send,
133     B::Error: Into<Box<dyn StdError + Send + Sync>>,
134 {
135     Builder::new().handshake(io).await
136 }
137 
138 // ===== impl SendRequest
139 
140 impl<B> SendRequest<B> {
141     /// Polls to determine whether this sender can be used yet for a request.
142     ///
143     /// If the associated connection is closed, this returns an Error.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>144     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
145         self.dispatch.poll_ready(cx)
146     }
147 
148     /// Waits until the dispatcher is ready
149     ///
150     /// If the associated connection is closed, this returns an Error.
ready(&mut self) -> crate::Result<()>151     pub async fn ready(&mut self) -> crate::Result<()> {
152         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
153     }
154 
155     /*
156     pub(super) async fn when_ready(self) -> crate::Result<Self> {
157         let mut me = Some(self);
158         future::poll_fn(move |cx| {
159             ready!(me.as_mut().unwrap().poll_ready(cx))?;
160             Poll::Ready(Ok(me.take().unwrap()))
161         })
162         .await
163     }
164 
165     pub(super) fn is_ready(&self) -> bool {
166         self.dispatch.is_ready()
167     }
168 
169     pub(super) fn is_closed(&self) -> bool {
170         self.dispatch.is_closed()
171     }
172     */
173 }
174 
175 impl<B> SendRequest<B>
176 where
177     B: Body + 'static,
178 {
179     /// Sends a `Request` on the associated connection.
180     ///
181     /// Returns a future that if successful, yields the `Response`.
182     ///
183     /// # Note
184     ///
185     /// There are some key differences in what automatic things the `Client`
186     /// does for you that will not be done here:
187     ///
188     /// - `Client` requires absolute-form `Uri`s, since the scheme and
189     ///   authority are needed to connect. They aren't required here.
190     /// - Since the `Client` requires absolute-form `Uri`s, it can add
191     ///   the `Host` header based on it. You must add a `Host` header yourself
192     ///   before calling this method.
193     /// - Since absolute-form `Uri`s are not required, if received, they will
194     ///   be serialized as-is.
send_request( &mut self, req: Request<B>, ) -> impl Future<Output = crate::Result<Response<IncomingBody>>>195     pub fn send_request(
196         &mut self,
197         req: Request<B>,
198     ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
199         let sent = self.dispatch.send(req);
200 
201         async move {
202             match sent {
203                 Ok(rx) => match rx.await {
204                     Ok(Ok(resp)) => Ok(resp),
205                     Ok(Err(err)) => Err(err),
206                     // this is definite bug if it happens, but it shouldn't happen!
207                     Err(_canceled) => panic!("dispatch dropped without returning error"),
208                 },
209                 Err(_req) => {
210                     tracing::debug!("connection was not ready");
211 
212                     Err(crate::Error::new_canceled().with("connection was not ready"))
213                 }
214             }
215         }
216     }
217 
218     /*
219     pub(super) fn send_request_retryable(
220         &mut self,
221         req: Request<B>,
222     ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
223     where
224         B: Send,
225     {
226         match self.dispatch.try_send(req) {
227             Ok(rx) => {
228                 Either::Left(rx.then(move |res| {
229                     match res {
230                         Ok(Ok(res)) => future::ok(res),
231                         Ok(Err(err)) => future::err(err),
232                         // this is definite bug if it happens, but it shouldn't happen!
233                         Err(_) => panic!("dispatch dropped without returning error"),
234                     }
235                 }))
236             }
237             Err(req) => {
238                 tracing::debug!("connection was not ready");
239                 let err = crate::Error::new_canceled().with("connection was not ready");
240                 Either::Right(future::err((err, Some(req))))
241             }
242         }
243     }
244     */
245 }
246 
247 impl<B> fmt::Debug for SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result248     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249         f.debug_struct("SendRequest").finish()
250     }
251 }
252 
253 // ===== impl Connection
254 
255 impl<T, B> fmt::Debug for Connection<T, B>
256 where
257     T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
258     B: Body + 'static,
259 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result260     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261         f.debug_struct("Connection").finish()
262     }
263 }
264 
265 impl<T, B> Future for Connection<T, B>
266 where
267     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
268     B: Body + Send + 'static,
269     B::Data: Send,
270     B::Error: Into<Box<dyn StdError + Send + Sync>>,
271 {
272     type Output = crate::Result<()>;
273 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>274     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275         match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
276             proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
277             proto::Dispatched::Upgrade(pending) => match self.inner.take() {
278                 Some(h1) => {
279                     let (io, buf, _) = h1.into_inner();
280                     pending.fulfill(Upgraded::new(io, buf));
281                     Poll::Ready(Ok(()))
282                 }
283                 _ => {
284                     drop(pending);
285                     unreachable!("Upgraded twice");
286                 }
287             },
288         }
289     }
290 }
291 
292 // ===== impl Builder
293 
294 impl Builder {
295     /// Creates a new connection builder.
296     #[inline]
new() -> Builder297     pub fn new() -> Builder {
298         Builder {
299             h09_responses: false,
300             h1_writev: None,
301             h1_read_buf_exact_size: None,
302             h1_parser_config: Default::default(),
303             h1_title_case_headers: false,
304             h1_preserve_header_case: false,
305             #[cfg(feature = "ffi")]
306             h1_preserve_header_order: false,
307             h1_max_buf_size: None,
308         }
309     }
310 
311     /// Set whether HTTP/0.9 responses should be tolerated.
312     ///
313     /// Default is false.
http09_responses(&mut self, enabled: bool) -> &mut Builder314     pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
315         self.h09_responses = enabled;
316         self
317     }
318 
319     /// Set whether HTTP/1 connections will accept spaces between header names
320     /// and the colon that follow them in responses.
321     ///
322     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
323     /// to say about it:
324     ///
325     /// > No whitespace is allowed between the header field-name and colon. In
326     /// > the past, differences in the handling of such whitespace have led to
327     /// > security vulnerabilities in request routing and response handling. A
328     /// > server MUST reject any received request message that contains
329     /// > whitespace between a header field-name and colon with a response code
330     /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
331     /// > response message before forwarding the message downstream.
332     ///
333     /// Note that this setting does not affect HTTP/2.
334     ///
335     /// Default is false.
336     ///
337     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder338     pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
339         self.h1_parser_config
340             .allow_spaces_after_header_name_in_responses(enabled);
341         self
342     }
343 
344     /// Set whether HTTP/1 connections will accept obsolete line folding for
345     /// header values.
346     ///
347     /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
348     /// parsing.
349     ///
350     /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
351     /// to say about it:
352     ///
353     /// > A server that receives an obs-fold in a request message that is not
354     /// > within a message/http container MUST either reject the message by
355     /// > sending a 400 (Bad Request), preferably with a representation
356     /// > explaining that obsolete line folding is unacceptable, or replace
357     /// > each received obs-fold with one or more SP octets prior to
358     /// > interpreting the field value or forwarding the message downstream.
359     ///
360     /// > A proxy or gateway that receives an obs-fold in a response message
361     /// > that is not within a message/http container MUST either discard the
362     /// > message and replace it with a 502 (Bad Gateway) response, preferably
363     /// > with a representation explaining that unacceptable line folding was
364     /// > received, or replace each received obs-fold with one or more SP
365     /// > octets prior to interpreting the field value or forwarding the
366     /// > message downstream.
367     ///
368     /// > A user agent that receives an obs-fold in a response message that is
369     /// > not within a message/http container MUST replace each received
370     /// > obs-fold with one or more SP octets prior to interpreting the field
371     /// > value.
372     ///
373     /// Default is false.
374     ///
375     /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder376     pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
377         self.h1_parser_config
378             .allow_obsolete_multiline_headers_in_responses(enabled);
379         self
380     }
381 
382     /// Set whether HTTP/1 connections will silently ignored malformed header lines.
383     ///
384     /// If this is enabled and and a header line does not start with a valid header
385     /// name, or does not include a colon at all, the line will be silently ignored
386     /// and no error will be reported.
387     ///
388     /// Default is false.
ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder389     pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
390         self.h1_parser_config
391             .ignore_invalid_headers_in_responses(enabled);
392         self
393     }
394 
395     /// Set whether HTTP/1 connections should try to use vectored writes,
396     /// or always flatten into a single buffer.
397     ///
398     /// Note that setting this to false may mean more copies of body data,
399     /// but may also improve performance when an IO transport doesn't
400     /// support vectored writes well, such as most TLS implementations.
401     ///
402     /// Setting this to true will force hyper to use queued strategy
403     /// which may eliminate unnecessary cloning on some TLS backends
404     ///
405     /// Default is `auto`. In this mode hyper will try to guess which
406     /// mode to use
writev(&mut self, enabled: bool) -> &mut Builder407     pub fn writev(&mut self, enabled: bool) -> &mut Builder {
408         self.h1_writev = Some(enabled);
409         self
410     }
411 
412     /// Set whether HTTP/1 connections will write header names as title case at
413     /// the socket level.
414     ///
415     /// Default is false.
title_case_headers(&mut self, enabled: bool) -> &mut Builder416     pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
417         self.h1_title_case_headers = enabled;
418         self
419     }
420 
421     /// Set whether to support preserving original header cases.
422     ///
423     /// Currently, this will record the original cases received, and store them
424     /// in a private extension on the `Response`. It will also look for and use
425     /// such an extension in any provided `Request`.
426     ///
427     /// Since the relevant extension is still private, there is no way to
428     /// interact with the original cases. The only effect this can have now is
429     /// to forward the cases in a proxy-like fashion.
430     ///
431     /// Default is false.
preserve_header_case(&mut self, enabled: bool) -> &mut Builder432     pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
433         self.h1_preserve_header_case = enabled;
434         self
435     }
436 
437     /// Set whether to support preserving original header order.
438     ///
439     /// Currently, this will record the order in which headers are received, and store this
440     /// ordering in a private extension on the `Response`. It will also look for and use
441     /// such an extension in any provided `Request`.
442     ///
443     /// Default is false.
444     #[cfg(feature = "ffi")]
preserve_header_order(&mut self, enabled: bool) -> &mut Builder445     pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
446         self.h1_preserve_header_order = enabled;
447         self
448     }
449 
450     /// Sets the exact size of the read buffer to *always* use.
451     ///
452     /// Note that setting this option unsets the `max_buf_size` option.
453     ///
454     /// Default is an adaptive read buffer.
read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder455     pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
456         self.h1_read_buf_exact_size = sz;
457         self.h1_max_buf_size = None;
458         self
459     }
460 
461     /// Set the maximum buffer size for the connection.
462     ///
463     /// Default is ~400kb.
464     ///
465     /// Note that setting this option unsets the `read_exact_buf_size` option.
466     ///
467     /// # Panics
468     ///
469     /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
max_buf_size(&mut self, max: usize) -> &mut Self470     pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
471         assert!(
472             max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
473             "the max_buf_size cannot be smaller than the minimum that h1 specifies."
474         );
475 
476         self.h1_max_buf_size = Some(max);
477         self.h1_read_buf_exact_size = None;
478         self
479     }
480 
481     /// Constructs a connection with the configured options and IO.
482     /// See [`client::conn`](crate::client::conn) for more.
483     ///
484     /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
485     /// do nothing.
handshake<T, B>( &self, io: T, ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,486     pub fn handshake<T, B>(
487         &self,
488         io: T,
489     ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
490     where
491         T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
492         B: Body + 'static,
493         B::Data: Send,
494         B::Error: Into<Box<dyn StdError + Send + Sync>>,
495     {
496         let opts = self.clone();
497 
498         async move {
499             tracing::trace!("client handshake HTTP/1");
500 
501             let (tx, rx) = dispatch::channel();
502             let mut conn = proto::Conn::new(io);
503             conn.set_h1_parser_config(opts.h1_parser_config);
504             if let Some(writev) = opts.h1_writev {
505                 if writev {
506                     conn.set_write_strategy_queue();
507                 } else {
508                     conn.set_write_strategy_flatten();
509                 }
510             }
511             if opts.h1_title_case_headers {
512                 conn.set_title_case_headers();
513             }
514             if opts.h1_preserve_header_case {
515                 conn.set_preserve_header_case();
516             }
517             #[cfg(feature = "ffi")]
518             if opts.h1_preserve_header_order {
519                 conn.set_preserve_header_order();
520             }
521 
522             if opts.h09_responses {
523                 conn.set_h09_responses();
524             }
525 
526             if let Some(sz) = opts.h1_read_buf_exact_size {
527                 conn.set_read_buf_exact_size(sz);
528             }
529             if let Some(max) = opts.h1_max_buf_size {
530                 conn.set_max_buf_size(max);
531             }
532             let cd = proto::h1::dispatch::Client::new(rx);
533             let proto = proto::h1::Dispatcher::new(cd, conn);
534 
535             Ok((
536                 SendRequest { dispatch: tx },
537                 Connection { inner: Some(proto) },
538             ))
539         }
540     }
541 }
542