1 //! HTTP/2 client connections
2
3 use std::error::Error as StdError;
4 use std::fmt;
5 use std::future::Future;
6 use std::marker::PhantomData;
7 use std::marker::Unpin;
8 use std::pin::Pin;
9 use std::sync::Arc;
10 use std::task::{Context, Poll};
11 use std::time::Duration;
12
13 use http::{Request, Response};
14 use tokio::io::{AsyncRead, AsyncWrite};
15
16 use super::super::dispatch;
17 use crate::body::{Body as IncomingBody, HttpBody as Body};
18 use crate::common::exec::{BoxSendFuture, Exec};
19 use crate::proto;
20 use crate::rt::Executor;
21
22 /// The sender side of an established connection.
23 pub struct SendRequest<B> {
24 dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
25 }
26
27 impl<B> Clone for SendRequest<B> {
clone(&self) -> SendRequest<B>28 fn clone(&self) -> SendRequest<B> {
29 SendRequest {
30 dispatch: self.dispatch.clone(),
31 }
32 }
33 }
34
35 /// A future that processes all HTTP state for the IO object.
36 ///
37 /// In most cases, this should just be spawned into an executor, so that it
38 /// can process incoming and outgoing messages, notice hangups, and the like.
39 #[must_use = "futures do nothing unless polled"]
40 pub struct Connection<T, B>
41 where
42 T: AsyncRead + AsyncWrite + Send + 'static,
43 B: Body + 'static,
44 {
45 inner: (PhantomData<T>, proto::h2::ClientTask<B>),
46 }
47
48 /// A builder to configure an HTTP connection.
49 ///
50 /// After setting options, the builder is used to create a handshake future.
51 #[derive(Clone, Debug)]
52 pub struct Builder {
53 pub(super) exec: Exec,
54 h2_builder: proto::h2::client::Config,
55 }
56
57 /// Returns a handshake future over some IO.
58 ///
59 /// This is a shortcut for `Builder::new().handshake(io)`.
60 /// See [`client::conn`](crate::client::conn) for more.
handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)> where E: Executor<BoxSendFuture> + Send + Sync + 'static, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,61 pub async fn handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
62 where
63 E: Executor<BoxSendFuture> + Send + Sync + 'static,
64 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
65 B: Body + 'static,
66 B::Data: Send,
67 B::Error: Into<Box<dyn StdError + Send + Sync>>,
68 {
69 Builder::new(exec).handshake(io).await
70 }
71
72 // ===== impl SendRequest
73
74 impl<B> SendRequest<B> {
75 /// Polls to determine whether this sender can be used yet for a request.
76 ///
77 /// If the associated connection is closed, this returns an Error.
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>>78 pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
79 if self.is_closed() {
80 Poll::Ready(Err(crate::Error::new_closed()))
81 } else {
82 Poll::Ready(Ok(()))
83 }
84 }
85
86 /// Waits until the dispatcher is ready
87 ///
88 /// If the associated connection is closed, this returns an Error.
ready(&mut self) -> crate::Result<()>89 pub async fn ready(&mut self) -> crate::Result<()> {
90 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
91 }
92
93 /*
94 pub(super) async fn when_ready(self) -> crate::Result<Self> {
95 let mut me = Some(self);
96 future::poll_fn(move |cx| {
97 ready!(me.as_mut().unwrap().poll_ready(cx))?;
98 Poll::Ready(Ok(me.take().unwrap()))
99 })
100 .await
101 }
102
103 pub(super) fn is_ready(&self) -> bool {
104 self.dispatch.is_ready()
105 }
106 */
107
is_closed(&self) -> bool108 pub(super) fn is_closed(&self) -> bool {
109 self.dispatch.is_closed()
110 }
111 }
112
113 impl<B> SendRequest<B>
114 where
115 B: Body + 'static,
116 {
117 /// Sends a `Request` on the associated connection.
118 ///
119 /// Returns a future that if successful, yields the `Response`.
120 ///
121 /// # Note
122 ///
123 /// There are some key differences in what automatic things the `Client`
124 /// does for you that will not be done here:
125 ///
126 /// - `Client` requires absolute-form `Uri`s, since the scheme and
127 /// authority are needed to connect. They aren't required here.
128 /// - Since the `Client` requires absolute-form `Uri`s, it can add
129 /// the `Host` header based on it. You must add a `Host` header yourself
130 /// before calling this method.
131 /// - Since absolute-form `Uri`s are not required, if received, they will
132 /// be serialized as-is.
send_request( &mut self, req: Request<B>, ) -> impl Future<Output = crate::Result<Response<IncomingBody>>>133 pub fn send_request(
134 &mut self,
135 req: Request<B>,
136 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
137 let sent = self.dispatch.send(req);
138
139 async move {
140 match sent {
141 Ok(rx) => match rx.await {
142 Ok(Ok(resp)) => Ok(resp),
143 Ok(Err(err)) => Err(err),
144 // this is definite bug if it happens, but it shouldn't happen!
145 Err(_canceled) => panic!("dispatch dropped without returning error"),
146 },
147 Err(_req) => {
148 tracing::debug!("connection was not ready");
149
150 Err(crate::Error::new_canceled().with("connection was not ready"))
151 }
152 }
153 }
154 }
155
156 /*
157 pub(super) fn send_request_retryable(
158 &mut self,
159 req: Request<B>,
160 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
161 where
162 B: Send,
163 {
164 match self.dispatch.try_send(req) {
165 Ok(rx) => {
166 Either::Left(rx.then(move |res| {
167 match res {
168 Ok(Ok(res)) => future::ok(res),
169 Ok(Err(err)) => future::err(err),
170 // this is definite bug if it happens, but it shouldn't happen!
171 Err(_) => panic!("dispatch dropped without returning error"),
172 }
173 }))
174 }
175 Err(req) => {
176 tracing::debug!("connection was not ready");
177 let err = crate::Error::new_canceled().with("connection was not ready");
178 Either::Right(future::err((err, Some(req))))
179 }
180 }
181 }
182 */
183 }
184
185 impl<B> fmt::Debug for SendRequest<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 f.debug_struct("SendRequest").finish()
188 }
189 }
190
191 // ===== impl Connection
192
193 impl<T, B> Connection<T, B>
194 where
195 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
196 B: Body + Unpin + Send + 'static,
197 B::Data: Send,
198 B::Error: Into<Box<dyn StdError + Send + Sync>>,
199 {
200 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
201 ///
202 /// This setting is configured by the server peer by sending the
203 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
204 /// This method returns the currently acknowledged value received from the
205 /// remote.
206 ///
207 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
208 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
is_extended_connect_protocol_enabled(&self) -> bool209 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
210 self.inner.1.is_extended_connect_protocol_enabled()
211 }
212 }
213
214 impl<T, B> fmt::Debug for Connection<T, B>
215 where
216 T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
217 B: Body + 'static,
218 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 f.debug_struct("Connection").finish()
221 }
222 }
223
224 impl<T, B> Future for Connection<T, B>
225 where
226 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
227 B: Body + Send + 'static,
228 B::Data: Send,
229 B::Error: Into<Box<dyn StdError + Send + Sync>>,
230 {
231 type Output = crate::Result<()>;
232
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>233 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234 match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
235 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
236 #[cfg(feature = "http1")]
237 proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
238 }
239 }
240 }
241
242 // ===== impl Builder
243
244 impl Builder {
245 /// Creates a new connection builder.
246 #[inline]
new<E>(exec: E) -> Builder where E: Executor<BoxSendFuture> + Send + Sync + 'static,247 pub fn new<E>(exec: E) -> Builder
248 where
249 E: Executor<BoxSendFuture> + Send + Sync + 'static,
250 {
251 use std::sync::Arc;
252 Builder {
253 exec: Exec::Executor(Arc::new(exec)),
254 h2_builder: Default::default(),
255 }
256 }
257
258 /// Provide an executor to execute background HTTP2 tasks.
executor<E>(&mut self, exec: E) -> &mut Builder where E: Executor<BoxSendFuture> + Send + Sync + 'static,259 pub fn executor<E>(&mut self, exec: E) -> &mut Builder
260 where
261 E: Executor<BoxSendFuture> + Send + Sync + 'static,
262 {
263 self.exec = Exec::Executor(Arc::new(exec));
264 self
265 }
266
267 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
268 /// stream-level flow control.
269 ///
270 /// Passing `None` will do nothing.
271 ///
272 /// If not set, hyper will use a default.
273 ///
274 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self275 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
276 if let Some(sz) = sz.into() {
277 self.h2_builder.adaptive_window = false;
278 self.h2_builder.initial_stream_window_size = sz;
279 }
280 self
281 }
282
283 /// Sets the max connection-level flow control for HTTP2
284 ///
285 /// Passing `None` will do nothing.
286 ///
287 /// If not set, hyper will use a default.
initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self288 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
289 if let Some(sz) = sz.into() {
290 self.h2_builder.adaptive_window = false;
291 self.h2_builder.initial_conn_window_size = sz;
292 }
293 self
294 }
295
296 /// Sets whether to use an adaptive flow control.
297 ///
298 /// Enabling this will override the limits set in
299 /// `initial_stream_window_size` and
300 /// `initial_connection_window_size`.
adaptive_window(&mut self, enabled: bool) -> &mut Self301 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
302 use proto::h2::SPEC_WINDOW_SIZE;
303
304 self.h2_builder.adaptive_window = enabled;
305 if enabled {
306 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
307 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
308 }
309 self
310 }
311
312 /// Sets the maximum frame size to use for HTTP2.
313 ///
314 /// Passing `None` will do nothing.
315 ///
316 /// If not set, hyper will use a default.
max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self317 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
318 if let Some(sz) = sz.into() {
319 self.h2_builder.max_frame_size = sz;
320 }
321 self
322 }
323
324 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
325 /// connection alive.
326 ///
327 /// Pass `None` to disable HTTP2 keep-alive.
328 ///
329 /// Default is currently disabled.
330 #[cfg(feature = "runtime")]
keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self331 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
332 self.h2_builder.keep_alive_interval = interval.into();
333 self
334 }
335
336 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
337 ///
338 /// If the ping is not acknowledged within the timeout, the connection will
339 /// be closed. Does nothing if `keep_alive_interval` is disabled.
340 ///
341 /// Default is 20 seconds.
342 #[cfg(feature = "runtime")]
keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self343 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
344 self.h2_builder.keep_alive_timeout = timeout;
345 self
346 }
347
348 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
349 ///
350 /// If disabled, keep-alive pings are only sent while there are open
351 /// request/responses streams. If enabled, pings are also sent when no
352 /// streams are active. Does nothing if `keep_alive_interval` is
353 /// disabled.
354 ///
355 /// Default is `false`.
356 #[cfg(feature = "runtime")]
keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self357 pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
358 self.h2_builder.keep_alive_while_idle = enabled;
359 self
360 }
361
362 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
363 ///
364 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
365 /// details.
366 ///
367 /// The default value is determined by the `h2` crate.
368 ///
369 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self370 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
371 self.h2_builder.max_concurrent_reset_streams = Some(max);
372 self
373 }
374
375 /// Set the maximum write buffer size for each HTTP/2 stream.
376 ///
377 /// Default is currently 1MB, but may change.
378 ///
379 /// # Panics
380 ///
381 /// The value must be no larger than `u32::MAX`.
max_send_buf_size(&mut self, max: usize) -> &mut Self382 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
383 assert!(max <= std::u32::MAX as usize);
384 self.h2_builder.max_send_buffer_size = max;
385 self
386 }
387
388 /// Constructs a connection with the configured options and IO.
389 /// See [`client::conn`](crate::client::conn) for more.
390 ///
391 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
392 /// do nothing.
handshake<T, B>( &self, io: T, ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, B::Data: Send, B::Error: Into<Box<dyn StdError + Send + Sync>>,393 pub fn handshake<T, B>(
394 &self,
395 io: T,
396 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
397 where
398 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
399 B: Body + 'static,
400 B::Data: Send,
401 B::Error: Into<Box<dyn StdError + Send + Sync>>,
402 {
403 let opts = self.clone();
404
405 async move {
406 tracing::trace!("client handshake HTTP/1");
407
408 let (tx, rx) = dispatch::channel();
409 let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec).await?;
410 Ok((
411 SendRequest {
412 dispatch: tx.unbound(),
413 },
414 Connection {
415 inner: (PhantomData, h2),
416 },
417 ))
418 }
419 }
420 }
421