1 //! HTTP/1 client connections
2
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::Unpin;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9
10 use bytes::Bytes;
11 use http::{Request, Response};
12 use httparse::ParserConfig;
13 use tokio::io::{AsyncRead, AsyncWrite};
14
15 use super::super::dispatch;
16 use crate::body::{Body as IncomingBody, HttpBody as Body};
17 use crate::proto;
18 use crate::upgrade::Upgraded;
19
// Concrete dispatcher type stored inside `Connection`: the generic protocol
// dispatcher specialised with the client side of the dispatch channel, the
// request body type `B`, the IO type `T`, and the HTTP/1 client transaction.
type Dispatcher<T, B> =
    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
22
/// The sender side of an established connection.
///
/// Requests passed to `send_request` are handed to the wrapped dispatch
/// sender, which yields the matching `Response` (or an error) later on.
pub struct SendRequest<B> {
    // Sending half of the dispatch channel: accepts `Request<B>` values and
    // pairs each one with a `Response<IncomingBody>`.
    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
}
27
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
#[derive(Debug)]
pub struct Parts<T> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// For instance, if the `Connection` is used for an HTTP upgrade request,
    /// it is possible the server sent back the first bytes of the new protocol
    /// along with the response upgrade.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    // Private unit field: because it is not `pub`, code outside this module
    // cannot build a `Parts` with a struct literal.
    _inner: (),
}
47
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Send + 'static,
    B: Body + 'static,
{
    // The HTTP/1 dispatcher driving the IO. The `Future` impl takes it out
    // (leaving `None`) when the connection finishes with an upgrade, so that
    // the IO object can be handed to the upgraded protocol.
    inner: Option<Dispatcher<T, B>>,
}
60
61 impl<T, B> Connection<T, B>
62 where
63 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
64 B: Body + 'static,
65 B::Error: Into<Box<dyn StdError + Send + Sync>>,
66 {
67 /// Return the inner IO object, and additional information.
68 ///
69 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
into_parts(self) -> Parts<T>70 pub fn into_parts(self) -> Parts<T> {
71 let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
72 Parts {
73 io,
74 read_buf,
75 _inner: (),
76 }
77 }
78
79 /// Poll the connection for completion, but without calling `shutdown`
80 /// on the underlying IO.
81 ///
82 /// This is useful to allow running a connection while doing an HTTP
83 /// upgrade. Once the upgrade is completed, the connection would be "done",
84 /// but it is not desired to actually shutdown the IO object. Instead you
85 /// would take it back using `into_parts`.
86 ///
87 /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
88 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
89 /// to work with this function; or use the `without_shutdown` wrapper.
poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>90 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
91 self.inner
92 .as_mut()
93 .expect("algready upgraded")
94 .poll_without_shutdown(cx)
95 }
96
97 /// Prevent shutdown of the underlying IO object at the end of service the request,
98 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>>99 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
100 let mut conn = Some(self);
101 futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
102 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
103 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
104 })
105 }
106 }
107
/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
#[derive(Clone, Debug)]
pub struct Builder {
    // Whether HTTP/0.9 responses are tolerated (`http09_responses`).
    h09_responses: bool,
    // Response parser options; handed to the connection during `handshake`.
    h1_parser_config: ParserConfig,
    // Write strategy: `Some(true)` selects the queue strategy, `Some(false)`
    // the flatten strategy, and `None` leaves the connection's own choice.
    h1_writev: Option<bool>,
    // Write header names in title case (`title_case_headers`).
    h1_title_case_headers: bool,
    // Preserve the original header name casing (`preserve_header_case`).
    h1_preserve_header_case: bool,
    // Preserve the original header order; only exists with the `ffi` feature.
    #[cfg(feature = "ffi")]
    h1_preserve_header_order: bool,
    // Exact read buffer size, if one was requested. Cleared by `max_buf_size`.
    h1_read_buf_exact_size: Option<usize>,
    // Maximum buffer size, if one was requested. Cleared by
    // `read_buf_exact_size`.
    h1_max_buf_size: Option<usize>,
}
123
124 /// Returns a handshake future over some IO.
125 ///
126 /// This is a shortcut for `Builder::new().handshake(io)`.
127 /// See [`client::conn`](crate::client::conn) for more.
handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,128 pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
129 where
130 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
131 B: Body + 'static,
132 B::Data: Send,
133 B::Error: Into<Box<dyn StdError + Send + Sync>>,
134 {
135 Builder::new().handshake(io).await
136 }
137
138 // ===== impl SendRequest
139
140 impl<B> SendRequest<B> {
141 /// Polls to determine whether this sender can be used yet for a request.
142 ///
143 /// If the associated connection is closed, this returns an Error.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>144 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
145 self.dispatch.poll_ready(cx)
146 }
147
148 /// Waits until the dispatcher is ready
149 ///
150 /// If the associated connection is closed, this returns an Error.
ready(&mut self) -> crate::Result<()>151 pub async fn ready(&mut self) -> crate::Result<()> {
152 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
153 }
154
155 /*
156 pub(super) async fn when_ready(self) -> crate::Result<Self> {
157 let mut me = Some(self);
158 future::poll_fn(move |cx| {
159 ready!(me.as_mut().unwrap().poll_ready(cx))?;
160 Poll::Ready(Ok(me.take().unwrap()))
161 })
162 .await
163 }
164
165 pub(super) fn is_ready(&self) -> bool {
166 self.dispatch.is_ready()
167 }
168
169 pub(super) fn is_closed(&self) -> bool {
170 self.dispatch.is_closed()
171 }
172 */
173 }
174
175 impl<B> SendRequest<B>
176 where
177 B: Body + 'static,
178 {
179 /// Sends a `Request` on the associated connection.
180 ///
181 /// Returns a future that if successful, yields the `Response`.
182 ///
183 /// # Note
184 ///
185 /// There are some key differences in what automatic things the `Client`
186 /// does for you that will not be done here:
187 ///
188 /// - `Client` requires absolute-form `Uri`s, since the scheme and
189 /// authority are needed to connect. They aren't required here.
190 /// - Since the `Client` requires absolute-form `Uri`s, it can add
191 /// the `Host` header based on it. You must add a `Host` header yourself
192 /// before calling this method.
193 /// - Since absolute-form `Uri`s are not required, if received, they will
194 /// be serialized as-is.
send_request( &mut self, req: Request<B>, ) -> impl Future<Output = crate::Result<Response<IncomingBody>>>195 pub fn send_request(
196 &mut self,
197 req: Request<B>,
198 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
199 let sent = self.dispatch.send(req);
200
201 async move {
202 match sent {
203 Ok(rx) => match rx.await {
204 Ok(Ok(resp)) => Ok(resp),
205 Ok(Err(err)) => Err(err),
206 // this is definite bug if it happens, but it shouldn't happen!
207 Err(_canceled) => panic!("dispatch dropped without returning error"),
208 },
209 Err(_req) => {
210 tracing::debug!("connection was not ready");
211
212 Err(crate::Error::new_canceled().with("connection was not ready"))
213 }
214 }
215 }
216 }
217
218 /*
219 pub(super) fn send_request_retryable(
220 &mut self,
221 req: Request<B>,
222 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
223 where
224 B: Send,
225 {
226 match self.dispatch.try_send(req) {
227 Ok(rx) => {
228 Either::Left(rx.then(move |res| {
229 match res {
230 Ok(Ok(res)) => future::ok(res),
231 Ok(Err(err)) => future::err(err),
232 // this is definite bug if it happens, but it shouldn't happen!
233 Err(_) => panic!("dispatch dropped without returning error"),
234 }
235 }))
236 }
237 Err(req) => {
238 tracing::debug!("connection was not ready");
239 let err = crate::Error::new_canceled().with("connection was not ready");
240 Either::Right(future::err((err, Some(req))))
241 }
242 }
243 }
244 */
245 }
246
247 impl<B> fmt::Debug for SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 f.debug_struct("SendRequest").finish()
250 }
251 }
252
253 // ===== impl Connection
254
255 impl<T, B> fmt::Debug for Connection<T, B>
256 where
257 T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
258 B: Body + 'static,
259 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.debug_struct("Connection").finish()
262 }
263 }
264
265 impl<T, B> Future for Connection<T, B>
266 where
267 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
268 B: Body + Send + 'static,
269 B::Data: Send,
270 B::Error: Into<Box<dyn StdError + Send + Sync>>,
271 {
272 type Output = crate::Result<()>;
273
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>274 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275 match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
276 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
277 proto::Dispatched::Upgrade(pending) => match self.inner.take() {
278 Some(h1) => {
279 let (io, buf, _) = h1.into_inner();
280 pending.fulfill(Upgraded::new(io, buf));
281 Poll::Ready(Ok(()))
282 }
283 _ => {
284 drop(pending);
285 unreachable!("Upgraded twice");
286 }
287 },
288 }
289 }
290 }
291
292 // ===== impl Builder
293
294 impl Builder {
295 /// Creates a new connection builder.
296 #[inline]
new() -> Builder297 pub fn new() -> Builder {
298 Builder {
299 h09_responses: false,
300 h1_writev: None,
301 h1_read_buf_exact_size: None,
302 h1_parser_config: Default::default(),
303 h1_title_case_headers: false,
304 h1_preserve_header_case: false,
305 #[cfg(feature = "ffi")]
306 h1_preserve_header_order: false,
307 h1_max_buf_size: None,
308 }
309 }
310
311 /// Set whether HTTP/0.9 responses should be tolerated.
312 ///
313 /// Default is false.
http09_responses(&mut self, enabled: bool) -> &mut Builder314 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
315 self.h09_responses = enabled;
316 self
317 }
318
319 /// Set whether HTTP/1 connections will accept spaces between header names
320 /// and the colon that follow them in responses.
321 ///
322 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
323 /// to say about it:
324 ///
325 /// > No whitespace is allowed between the header field-name and colon. In
326 /// > the past, differences in the handling of such whitespace have led to
327 /// > security vulnerabilities in request routing and response handling. A
328 /// > server MUST reject any received request message that contains
329 /// > whitespace between a header field-name and colon with a response code
330 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
331 /// > response message before forwarding the message downstream.
332 ///
333 /// Note that this setting does not affect HTTP/2.
334 ///
335 /// Default is false.
336 ///
337 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder338 pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
339 self.h1_parser_config
340 .allow_spaces_after_header_name_in_responses(enabled);
341 self
342 }
343
344 /// Set whether HTTP/1 connections will accept obsolete line folding for
345 /// header values.
346 ///
347 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
348 /// parsing.
349 ///
350 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
351 /// to say about it:
352 ///
353 /// > A server that receives an obs-fold in a request message that is not
354 /// > within a message/http container MUST either reject the message by
355 /// > sending a 400 (Bad Request), preferably with a representation
356 /// > explaining that obsolete line folding is unacceptable, or replace
357 /// > each received obs-fold with one or more SP octets prior to
358 /// > interpreting the field value or forwarding the message downstream.
359 ///
360 /// > A proxy or gateway that receives an obs-fold in a response message
361 /// > that is not within a message/http container MUST either discard the
362 /// > message and replace it with a 502 (Bad Gateway) response, preferably
363 /// > with a representation explaining that unacceptable line folding was
364 /// > received, or replace each received obs-fold with one or more SP
365 /// > octets prior to interpreting the field value or forwarding the
366 /// > message downstream.
367 ///
368 /// > A user agent that receives an obs-fold in a response message that is
369 /// > not within a message/http container MUST replace each received
370 /// > obs-fold with one or more SP octets prior to interpreting the field
371 /// > value.
372 ///
373 /// Default is false.
374 ///
375 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder376 pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
377 self.h1_parser_config
378 .allow_obsolete_multiline_headers_in_responses(enabled);
379 self
380 }
381
382 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
383 ///
384 /// If this is enabled and and a header line does not start with a valid header
385 /// name, or does not include a colon at all, the line will be silently ignored
386 /// and no error will be reported.
387 ///
388 /// Default is false.
ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder389 pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
390 self.h1_parser_config
391 .ignore_invalid_headers_in_responses(enabled);
392 self
393 }
394
395 /// Set whether HTTP/1 connections should try to use vectored writes,
396 /// or always flatten into a single buffer.
397 ///
398 /// Note that setting this to false may mean more copies of body data,
399 /// but may also improve performance when an IO transport doesn't
400 /// support vectored writes well, such as most TLS implementations.
401 ///
402 /// Setting this to true will force hyper to use queued strategy
403 /// which may eliminate unnecessary cloning on some TLS backends
404 ///
405 /// Default is `auto`. In this mode hyper will try to guess which
406 /// mode to use
writev(&mut self, enabled: bool) -> &mut Builder407 pub fn writev(&mut self, enabled: bool) -> &mut Builder {
408 self.h1_writev = Some(enabled);
409 self
410 }
411
412 /// Set whether HTTP/1 connections will write header names as title case at
413 /// the socket level.
414 ///
415 /// Default is false.
title_case_headers(&mut self, enabled: bool) -> &mut Builder416 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
417 self.h1_title_case_headers = enabled;
418 self
419 }
420
421 /// Set whether to support preserving original header cases.
422 ///
423 /// Currently, this will record the original cases received, and store them
424 /// in a private extension on the `Response`. It will also look for and use
425 /// such an extension in any provided `Request`.
426 ///
427 /// Since the relevant extension is still private, there is no way to
428 /// interact with the original cases. The only effect this can have now is
429 /// to forward the cases in a proxy-like fashion.
430 ///
431 /// Default is false.
preserve_header_case(&mut self, enabled: bool) -> &mut Builder432 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
433 self.h1_preserve_header_case = enabled;
434 self
435 }
436
437 /// Set whether to support preserving original header order.
438 ///
439 /// Currently, this will record the order in which headers are received, and store this
440 /// ordering in a private extension on the `Response`. It will also look for and use
441 /// such an extension in any provided `Request`.
442 ///
443 /// Default is false.
444 #[cfg(feature = "ffi")]
preserve_header_order(&mut self, enabled: bool) -> &mut Builder445 pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
446 self.h1_preserve_header_order = enabled;
447 self
448 }
449
450 /// Sets the exact size of the read buffer to *always* use.
451 ///
452 /// Note that setting this option unsets the `max_buf_size` option.
453 ///
454 /// Default is an adaptive read buffer.
read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder455 pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
456 self.h1_read_buf_exact_size = sz;
457 self.h1_max_buf_size = None;
458 self
459 }
460
461 /// Set the maximum buffer size for the connection.
462 ///
463 /// Default is ~400kb.
464 ///
465 /// Note that setting this option unsets the `read_exact_buf_size` option.
466 ///
467 /// # Panics
468 ///
469 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
max_buf_size(&mut self, max: usize) -> &mut Self470 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
471 assert!(
472 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
473 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
474 );
475
476 self.h1_max_buf_size = Some(max);
477 self.h1_read_buf_exact_size = None;
478 self
479 }
480
481 /// Constructs a connection with the configured options and IO.
482 /// See [`client::conn`](crate::client::conn) for more.
483 ///
484 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
485 /// do nothing.
handshake<T, B>( &self, io: T, ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,486 pub fn handshake<T, B>(
487 &self,
488 io: T,
489 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
490 where
491 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
492 B: Body + 'static,
493 B::Data: Send,
494 B::Error: Into<Box<dyn StdError + Send + Sync>>,
495 {
496 let opts = self.clone();
497
498 async move {
499 tracing::trace!("client handshake HTTP/1");
500
501 let (tx, rx) = dispatch::channel();
502 let mut conn = proto::Conn::new(io);
503 conn.set_h1_parser_config(opts.h1_parser_config);
504 if let Some(writev) = opts.h1_writev {
505 if writev {
506 conn.set_write_strategy_queue();
507 } else {
508 conn.set_write_strategy_flatten();
509 }
510 }
511 if opts.h1_title_case_headers {
512 conn.set_title_case_headers();
513 }
514 if opts.h1_preserve_header_case {
515 conn.set_preserve_header_case();
516 }
517 #[cfg(feature = "ffi")]
518 if opts.h1_preserve_header_order {
519 conn.set_preserve_header_order();
520 }
521
522 if opts.h09_responses {
523 conn.set_h09_responses();
524 }
525
526 if let Some(sz) = opts.h1_read_buf_exact_size {
527 conn.set_read_buf_exact_size(sz);
528 }
529 if let Some(max) = opts.h1_max_buf_size {
530 conn.set_max_buf_size(max);
531 }
532 let cd = proto::h1::dispatch::Client::new(rx);
533 let proto = proto::h1::Dispatcher::new(cd, conn);
534
535 Ok((
536 SendRequest { dispatch: tx },
537 Connection { inner: Some(proto) },
538 ))
539 }
540 }
541 }
542