1 //! HTTP/2 client connections
2 
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::PhantomData;
7 use std::marker::Unpin;
8 use std::pin::Pin;
9 use std::sync::Arc;
10 use std::task::{Context, Poll};
11 use std::time::Duration;
12 
13 use http::{Request, Response};
14 use tokio::io::{AsyncRead, AsyncWrite};
15 
16 use super::super::dispatch;
17 use crate::body::{Body as IncomingBody, HttpBody as Body};
18 use crate::common::exec::{BoxSendFuture, Exec};
19 use crate::proto;
20 use crate::rt::Executor;
21 
22 /// The sender side of an established connection.
23 pub struct SendRequest<B> {
24     dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
25 }
26 
27 impl<B> Clone for SendRequest<B> {
clone(&self) -> SendRequest<B>28     fn clone(&self) -> SendRequest<B> {
29         SendRequest {
30             dispatch: self.dispatch.clone(),
31         }
32     }
33 }
34 
35 /// A future that processes all HTTP state for the IO object.
36 ///
37 /// In most cases, this should just be spawned into an executor, so that it
38 /// can process incoming and outgoing messages, notice hangups, and the like.
39 #[must_use = "futures do nothing unless polled"]
40 pub struct Connection<T, B>
41 where
42     T: AsyncRead + AsyncWrite + Send + 'static,
43     B: Body + 'static,
44 {
45     inner: (PhantomData<T>, proto::h2::ClientTask<B>),
46 }
47 
48 /// A builder to configure an HTTP connection.
49 ///
50 /// After setting options, the builder is used to create a handshake future.
51 #[derive(Clone, Debug)]
52 pub struct Builder {
53     pub(super) exec: Exec,
54     h2_builder: proto::h2::client::Config,
55 }
56 
57 /// Returns a handshake future over some IO.
58 ///
59 /// This is a shortcut for `Builder::new().handshake(io)`.
60 /// See [`client::conn`](crate::client::conn) for more.
handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)> where E: Executor<BoxSendFuture> + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,61 pub async fn handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
62 where
63     E: Executor<BoxSendFuture> + Send + Sync + 'static,
64     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
65     B: Body + 'static,
66     B::Data: Send,
67     B::Error: Into<Box<dyn StdError + Send + Sync>>,
68 {
69     Builder::new(exec).handshake(io).await
70 }
71 
72 // ===== impl SendRequest
73 
74 impl<B> SendRequest<B> {
75     /// Polls to determine whether this sender can be used yet for a request.
76     ///
77     /// If the associated connection is closed, this returns an Error.
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>>78     pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
79         if self.is_closed() {
80             Poll::Ready(Err(crate::Error::new_closed()))
81         } else {
82             Poll::Ready(Ok(()))
83         }
84     }
85 
86     /// Waits until the dispatcher is ready
87     ///
88     /// If the associated connection is closed, this returns an Error.
ready(&mut self) -> crate::Result<()>89     pub async fn ready(&mut self) -> crate::Result<()> {
90         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
91     }
92 
93     /*
94     pub(super) async fn when_ready(self) -> crate::Result<Self> {
95         let mut me = Some(self);
96         future::poll_fn(move |cx| {
97             ready!(me.as_mut().unwrap().poll_ready(cx))?;
98             Poll::Ready(Ok(me.take().unwrap()))
99         })
100         .await
101     }
102 
103     pub(super) fn is_ready(&self) -> bool {
104         self.dispatch.is_ready()
105     }
106     */
107 
is_closed(&self) -> bool108     pub(super) fn is_closed(&self) -> bool {
109         self.dispatch.is_closed()
110     }
111 }
112 
113 impl<B> SendRequest<B>
114 where
115     B: Body + 'static,
116 {
117     /// Sends a `Request` on the associated connection.
118     ///
119     /// Returns a future that if successful, yields the `Response`.
120     ///
121     /// # Note
122     ///
123     /// There are some key differences in what automatic things the `Client`
124     /// does for you that will not be done here:
125     ///
126     /// - `Client` requires absolute-form `Uri`s, since the scheme and
127     ///   authority are needed to connect. They aren't required here.
128     /// - Since the `Client` requires absolute-form `Uri`s, it can add
129     ///   the `Host` header based on it. You must add a `Host` header yourself
130     ///   before calling this method.
131     /// - Since absolute-form `Uri`s are not required, if received, they will
132     ///   be serialized as-is.
send_request( &mut self, req: Request<B>, ) -> impl Future<Output = crate::Result<Response<IncomingBody>>>133     pub fn send_request(
134         &mut self,
135         req: Request<B>,
136     ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
137         let sent = self.dispatch.send(req);
138 
139         async move {
140             match sent {
141                 Ok(rx) => match rx.await {
142                     Ok(Ok(resp)) => Ok(resp),
143                     Ok(Err(err)) => Err(err),
144                     // this is definite bug if it happens, but it shouldn't happen!
145                     Err(_canceled) => panic!("dispatch dropped without returning error"),
146                 },
147                 Err(_req) => {
148                     tracing::debug!("connection was not ready");
149 
150                     Err(crate::Error::new_canceled().with("connection was not ready"))
151                 }
152             }
153         }
154     }
155 
156     /*
157     pub(super) fn send_request_retryable(
158         &mut self,
159         req: Request<B>,
160     ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
161     where
162         B: Send,
163     {
164         match self.dispatch.try_send(req) {
165             Ok(rx) => {
166                 Either::Left(rx.then(move |res| {
167                     match res {
168                         Ok(Ok(res)) => future::ok(res),
169                         Ok(Err(err)) => future::err(err),
170                         // this is definite bug if it happens, but it shouldn't happen!
171                         Err(_) => panic!("dispatch dropped without returning error"),
172                     }
173                 }))
174             }
175             Err(req) => {
176                 tracing::debug!("connection was not ready");
177                 let err = crate::Error::new_canceled().with("connection was not ready");
178                 Either::Right(future::err((err, Some(req))))
179             }
180         }
181     }
182     */
183 }
184 
185 impl<B> fmt::Debug for SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result186     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187         f.debug_struct("SendRequest").finish()
188     }
189 }
190 
191 // ===== impl Connection
192 
193 impl<T, B> Connection<T, B>
194 where
195     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
196     B: Body + Unpin + Send + 'static,
197     B::Data: Send,
198     B::Error: Into<Box<dyn StdError + Send + Sync>>,
199 {
200     /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
201     ///
202     /// This setting is configured by the server peer by sending the
203     /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
204     /// This method returns the currently acknowledged value received from the
205     /// remote.
206     ///
207     /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
208     /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
is_extended_connect_protocol_enabled(&self) -> bool209     pub fn is_extended_connect_protocol_enabled(&self) -> bool {
210         self.inner.1.is_extended_connect_protocol_enabled()
211     }
212 }
213 
214 impl<T, B> fmt::Debug for Connection<T, B>
215 where
216     T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
217     B: Body + 'static,
218 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result219     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220         f.debug_struct("Connection").finish()
221     }
222 }
223 
224 impl<T, B> Future for Connection<T, B>
225 where
226     T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
227     B: Body + Send + 'static,
228     B::Data: Send,
229     B::Error: Into<Box<dyn StdError + Send + Sync>>,
230 {
231     type Output = crate::Result<()>;
232 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>233     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234         match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
235             proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
236             #[cfg(feature = "http1")]
237             proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
238         }
239     }
240 }
241 
242 // ===== impl Builder
243 
244 impl Builder {
245     /// Creates a new connection builder.
246     #[inline]
new<E>(exec: E) -> Builder where E: Executor<BoxSendFuture> + Send + Sync + 'static,247     pub fn new<E>(exec: E) -> Builder
248     where
249         E: Executor<BoxSendFuture> + Send + Sync + 'static,
250     {
251         use std::sync::Arc;
252         Builder {
253             exec: Exec::Executor(Arc::new(exec)),
254             h2_builder: Default::default(),
255         }
256     }
257 
258     /// Provide an executor to execute background HTTP2 tasks.
executor<E>(&mut self, exec: E) -> &mut Builder where E: Executor<BoxSendFuture> + Send + Sync + 'static,259     pub fn executor<E>(&mut self, exec: E) -> &mut Builder
260     where
261         E: Executor<BoxSendFuture> + Send + Sync + 'static,
262     {
263         self.exec = Exec::Executor(Arc::new(exec));
264         self
265     }
266 
267     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
268     /// stream-level flow control.
269     ///
270     /// Passing `None` will do nothing.
271     ///
272     /// If not set, hyper will use a default.
273     ///
274     /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self275     pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
276         if let Some(sz) = sz.into() {
277             self.h2_builder.adaptive_window = false;
278             self.h2_builder.initial_stream_window_size = sz;
279         }
280         self
281     }
282 
283     /// Sets the max connection-level flow control for HTTP2
284     ///
285     /// Passing `None` will do nothing.
286     ///
287     /// If not set, hyper will use a default.
initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self288     pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
289         if let Some(sz) = sz.into() {
290             self.h2_builder.adaptive_window = false;
291             self.h2_builder.initial_conn_window_size = sz;
292         }
293         self
294     }
295 
296     /// Sets whether to use an adaptive flow control.
297     ///
298     /// Enabling this will override the limits set in
299     /// `initial_stream_window_size` and
300     /// `initial_connection_window_size`.
adaptive_window(&mut self, enabled: bool) -> &mut Self301     pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
302         use proto::h2::SPEC_WINDOW_SIZE;
303 
304         self.h2_builder.adaptive_window = enabled;
305         if enabled {
306             self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
307             self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
308         }
309         self
310     }
311 
312     /// Sets the maximum frame size to use for HTTP2.
313     ///
314     /// Passing `None` will do nothing.
315     ///
316     /// If not set, hyper will use a default.
max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self317     pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
318         if let Some(sz) = sz.into() {
319             self.h2_builder.max_frame_size = sz;
320         }
321         self
322     }
323 
324     /// Sets an interval for HTTP2 Ping frames should be sent to keep a
325     /// connection alive.
326     ///
327     /// Pass `None` to disable HTTP2 keep-alive.
328     ///
329     /// Default is currently disabled.
330     #[cfg(feature = "runtime")]
keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self331     pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
332         self.h2_builder.keep_alive_interval = interval.into();
333         self
334     }
335 
336     /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
337     ///
338     /// If the ping is not acknowledged within the timeout, the connection will
339     /// be closed. Does nothing if `keep_alive_interval` is disabled.
340     ///
341     /// Default is 20 seconds.
342     #[cfg(feature = "runtime")]
keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self343     pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
344         self.h2_builder.keep_alive_timeout = timeout;
345         self
346     }
347 
348     /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
349     ///
350     /// If disabled, keep-alive pings are only sent while there are open
351     /// request/responses streams. If enabled, pings are also sent when no
352     /// streams are active. Does nothing if `keep_alive_interval` is
353     /// disabled.
354     ///
355     /// Default is `false`.
356     #[cfg(feature = "runtime")]
keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self357     pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
358         self.h2_builder.keep_alive_while_idle = enabled;
359         self
360     }
361 
362     /// Sets the maximum number of HTTP2 concurrent locally reset streams.
363     ///
364     /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
365     /// details.
366     ///
367     /// The default value is determined by the `h2` crate.
368     ///
369     /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self370     pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
371         self.h2_builder.max_concurrent_reset_streams = Some(max);
372         self
373     }
374 
375     /// Set the maximum write buffer size for each HTTP/2 stream.
376     ///
377     /// Default is currently 1MB, but may change.
378     ///
379     /// # Panics
380     ///
381     /// The value must be no larger than `u32::MAX`.
max_send_buf_size(&mut self, max: usize) -> &mut Self382     pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
383         assert!(max <= std::u32::MAX as usize);
384         self.h2_builder.max_send_buffer_size = max;
385         self
386     }
387 
388     /// Constructs a connection with the configured options and IO.
389     /// See [`client::conn`](crate::client::conn) for more.
390     ///
391     /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
392     /// do nothing.
handshake<T, B>( &self, io: T, ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,393     pub fn handshake<T, B>(
394         &self,
395         io: T,
396     ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
397     where
398         T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
399         B: Body + 'static,
400         B::Data: Send,
401         B::Error: Into<Box<dyn StdError + Send + Sync>>,
402     {
403         let opts = self.clone();
404 
405         async move {
406             tracing::trace!("client handshake HTTP/1");
407 
408             let (tx, rx) = dispatch::channel();
409             let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec).await?;
410             Ok((
411                 SendRequest {
412                     dispatch: tx.unbound(),
413                 },
414                 Connection {
415                     inner: (PhantomData, h2),
416                 },
417             ))
418         }
419     }
420 }
421