1 //! Client implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
//! begin the [HTTP/2 handshake]. This returns a future that completes once
//! the handshake has finished, at which point HTTP/2 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`], and the
//! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
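//!
//! For example (a minimal sketch, assuming `send_request` came from a completed
//! [`handshake`] and the associated [`Connection`] is being driven on another
//! task):
//!
//! ```rust, no_run
//! # use h2::client::SendRequest;
//! # use http::Request;
//! # async fn doc(send_request: SendRequest<bytes::Bytes>) -> Result<(), h2::Error> {
//! // Wait for capacity before initializing a new stream.
//! let mut send_request = send_request.ready().await?;
//!
//! let request = Request::get("https://www.example.com/").body(()).unwrap();
//! // `true` means no request body or trailers will follow.
//! let (response, _) = send_request.send_request(request, true)?;
//! let response = response.await?;
//! println!("status: {}", response.status());
//! # Ok(())
//! # }
//! ```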
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust, no_run
68 //!
69 //! use h2::client;
70 //!
71 //! use http::{Request, Method};
72 //! use std::error::Error;
73 //! use tokio::net::TcpStream;
74 //!
75 //! #[tokio::main]
76 //! pub async fn main() -> Result<(), Box<dyn Error>> {
77 //!     // Establish TCP connection to the server.
78 //!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 //!     let (h2, connection) = client::handshake(tcp).await?;
80 //!     tokio::spawn(async move {
81 //!         connection.await.unwrap();
82 //!     });
83 //!
84 //!     let mut h2 = h2.ready().await?;
85 //!     // Prepare the HTTP request to send to the server.
86 //!     let request = Request::builder()
87 //!                     .method(Method::GET)
88 //!                     .uri("https://www.example.com/")
89 //!                     .body(())
90 //!                     .unwrap();
91 //!
//!     // Send the request. The second tuple item is a `SendStream` that can
//!     // be used to stream a request body; it is dropped here because this
//!     // request has no body.
94 //!     let (response, _) = h2.send_request(request, true).unwrap();
95 //!
96 //!     let (head, mut body) = response.await?.into_parts();
97 //!
98 //!     println!("Received response: {:?}", head);
99 //!
100 //!     // The `flow_control` handle allows the caller to manage
101 //!     // flow control.
102 //!     //
103 //!     // Whenever data is received, the caller is responsible for
104 //!     // releasing capacity back to the server once it has freed
105 //!     // the data from memory.
106 //!     let mut flow_control = body.flow_control().clone();
107 //!
108 //!     while let Some(chunk) = body.data().await {
109 //!         let chunk = chunk?;
110 //!         println!("RX: {:?}", chunk);
111 //!
112 //!         // Let the server send more data.
113 //!         let _ = flow_control.release_capacity(chunk.len());
114 //!     }
115 //!
116 //!     Ok(())
117 //! }
118 //! ```
119 //!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 //! [`handshake`]: fn.handshake.html
122 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123 //! [`SendRequest`]: struct.SendRequest.html
124 //! [`SendStream`]: ../struct.SendStream.html
125 //! [Making requests]: #making-requests
126 //! [Managing the connection]: #managing-the-connection
127 //! [`Connection`]: struct.Connection.html
128 //! [`Connection::poll`]: struct.Connection.html#method.poll
129 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131 //! [`SendRequest`]: struct.SendRequest.html
132 //! [`ResponseFuture`]: struct.ResponseFuture.html
133 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135 //! [`Builder`]: struct.Builder.html
136 //! [`Error`]: ../struct.Error.html
137 
138 use crate::codec::{Codec, SendError, UserError};
139 use crate::ext::Protocol;
140 use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141 use crate::proto::{self, Error};
142 use crate::{FlowControl, PingPong, RecvStream, SendStream};
143 
144 use bytes::{Buf, Bytes};
145 use http::{uri, HeaderMap, Method, Request, Response, Version};
146 use std::fmt;
147 use std::future::Future;
148 use std::pin::Pin;
149 use std::task::{Context, Poll};
150 use std::time::Duration;
151 use std::usize;
152 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
153 use tracing::Instrument;
154 
155 /// Initializes new HTTP/2 streams on a connection by sending a request.
156 ///
157 /// This type does no work itself. Instead, it is a handle to the inner
158 /// connection state held by [`Connection`]. If the associated connection
159 /// instance is dropped, all `SendRequest` functions will return [`Error`].
160 ///
/// [`SendRequest`] instances may be moved to, and used from, tasks or threads
/// other than the one driving their associated [`Connection`] instance.
/// Internally, there is a buffer used to stage requests before they get
/// written to the connection. There is no guarantee that requests are written
/// to the connection in FIFO order, as HTTP/2 prioritization logic can play a
/// role.
166 ///
167 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
168 /// instances that are backed by a single connection.
169 ///
170 /// See [module] level documentation for more details.
171 ///
172 /// [module]: index.html
173 /// [`Connection`]: struct.Connection.html
174 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
175 /// [`Error`]: ../struct.Error.html
176 pub struct SendRequest<B: Buf> {
177     inner: proto::Streams<B, Peer>,
178     pending: Option<proto::OpaqueStreamRef>,
179 }
180 
/// A future that resolves to a `SendRequest` instance once it is ready to
/// send at least one request.
183 #[derive(Debug)]
184 pub struct ReadySendRequest<B: Buf> {
185     inner: Option<SendRequest<B>>,
186 }
187 
188 /// Manages all state associated with an HTTP/2 client connection.
189 ///
190 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
191 /// implements the HTTP/2 client logic for that connection. It is responsible
192 /// for driving the internal state forward, performing the work requested of the
193 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
194 /// [`RecvStream`]).
195 ///
196 /// `Connection` values are created by calling [`handshake`]. Once a
197 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
198 /// until `Ready` is returned. The easiest way to do this is to submit the
199 /// `Connection` instance to an [executor].
200 ///
201 /// [module]: index.html
202 /// [`handshake`]: fn.handshake.html
203 /// [`SendRequest`]: struct.SendRequest.html
204 /// [`ResponseFuture`]: struct.ResponseFuture.html
205 /// [`SendStream`]: ../struct.SendStream.html
206 /// [`RecvStream`]: ../struct.RecvStream.html
207 /// [`poll`]: #method.poll
208 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
209 ///
210 /// # Examples
211 ///
212 /// ```
213 /// # use tokio::io::{AsyncRead, AsyncWrite};
214 /// # use h2::client;
215 /// # use h2::client::*;
216 /// #
217 /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
218 /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
219 /// # {
220 ///     let (send_request, connection) = client::handshake(my_io).await?;
221 ///     // Submit the connection handle to an executor.
222 ///     tokio::spawn(async { connection.await.expect("connection failed"); });
223 ///
224 ///     // Now, use `send_request` to initialize HTTP/2 streams.
225 ///     // ...
226 /// # Ok(())
227 /// # }
228 /// #
229 /// # pub fn main() {}
230 /// ```
231 #[must_use = "futures do nothing unless polled"]
232 pub struct Connection<T, B: Buf = Bytes> {
233     inner: proto::Connection<T, Peer, B>,
234 }
235 
236 /// A future of an HTTP response.
237 #[derive(Debug)]
238 #[must_use = "futures do nothing unless polled"]
239 pub struct ResponseFuture {
240     inner: proto::OpaqueStreamRef,
241     push_promise_consumed: bool,
242 }
243 
244 /// A future of a pushed HTTP response.
245 ///
/// We have to differentiate between pushed and non-pushed responses because of the spec:
247 /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
248 /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
249 /// > that is in either the "open" or "half-closed (remote)" state.
250 #[derive(Debug)]
251 #[must_use = "futures do nothing unless polled"]
252 pub struct PushedResponseFuture {
253     inner: ResponseFuture,
254 }
255 
256 /// A pushed response and corresponding request headers
257 #[derive(Debug)]
258 pub struct PushPromise {
259     /// The request headers
260     request: Request<()>,
261 
262     /// The pushed response
263     response: PushedResponseFuture,
264 }
265 
266 /// A stream of pushed responses and corresponding promised requests
267 #[derive(Debug)]
268 #[must_use = "streams do nothing unless polled"]
269 pub struct PushPromises {
270     inner: proto::OpaqueStreamRef,
271 }
272 
273 /// Builds client connections with custom configuration values.
274 ///
275 /// Methods can be chained in order to set the configuration values.
276 ///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 client connection.
279 ///
280 /// New instances of `Builder` are obtained via [`Builder::new`].
281 ///
282 /// See function level documentation for details on the various client
283 /// configuration settings.
284 ///
285 /// [`Builder::new`]: struct.Builder.html#method.new
286 /// [`handshake`]: struct.Builder.html#method.handshake
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// # use tokio::io::{AsyncRead, AsyncWrite};
292 /// # use h2::client::*;
293 /// # use bytes::Bytes;
294 /// #
295 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
296 ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297 /// # {
298 /// // `client_fut` is a future representing the completion of the HTTP/2
299 /// // handshake.
300 /// let client_fut = Builder::new()
301 ///     .initial_window_size(1_000_000)
302 ///     .max_concurrent_streams(1000)
303 ///     .handshake(my_io);
304 /// # client_fut.await
305 /// # }
306 /// #
307 /// # pub fn main() {}
308 /// ```
309 #[derive(Clone, Debug)]
310 pub struct Builder {
311     /// Time to keep locally reset streams around before reaping.
312     reset_stream_duration: Duration,
313 
314     /// Initial maximum number of locally initiated (send) streams.
315     /// After receiving a SETTINGS frame from the remote peer,
316     /// the connection will overwrite this value with the
317     /// MAX_CONCURRENT_STREAMS specified in the frame.
318     /// If no value is advertised by the remote peer in the initial SETTINGS
319     /// frame, it will be set to usize::MAX.
320     initial_max_send_streams: usize,
321 
322     /// Initial target window size for new connections.
323     initial_target_connection_window_size: Option<u32>,
324 
325     /// Maximum amount of bytes to "buffer" for writing per stream.
326     max_send_buffer_size: usize,
327 
328     /// Maximum number of locally reset streams to keep at a time.
329     reset_stream_max: usize,
330 
331     /// Maximum number of remotely reset streams to allow in the pending
332     /// accept queue.
333     pending_accept_reset_stream_max: usize,
334 
335     /// Initial `Settings` frame to send as part of the handshake.
336     settings: Settings,
337 
338     /// The stream ID of the first (lowest) stream. Subsequent streams will use
339     /// monotonically increasing stream IDs.
340     stream_id: StreamId,
341 
342     /// Maximum number of locally reset streams due to protocol error across
343     /// the lifetime of the connection.
344     ///
345     /// When this gets exceeded, we issue GOAWAYs.
346     local_max_error_reset_streams: Option<usize>,
347 }
348 
349 #[derive(Debug)]
350 pub(crate) struct Peer;
351 
352 // ===== impl SendRequest =====
353 
354 impl<B> SendRequest<B>
355 where
356     B: Buf,
357 {
358     /// Returns `Ready` when the connection can initialize a new HTTP/2
359     /// stream.
360     ///
361     /// This function must return `Ready` before `send_request` is called. When
362     /// `Poll::Pending` is returned, the task will be notified once the readiness
363     /// state changes.
364     ///
365     /// See [module] level docs for more details.
366     ///
367     /// [module]: index.html
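    ///
    /// # Examples
    ///
    /// A sketch of driving `poll_ready` by hand with `std::future::poll_fn`;
    /// most callers will prefer the `ready()` combinator instead.
    ///
    /// ```rust
    /// # use h2::client::SendRequest;
    /// # async fn doc(mut send_request: SendRequest<&'static [u8]>) {
    /// std::future::poll_fn(|cx| send_request.poll_ready(cx)).await.unwrap();
    /// // `send_request` can now be used to send a request.
    /// # }
    /// # pub fn main() {}
    /// ```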
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
369         ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
370         self.pending = None;
371         Poll::Ready(Ok(()))
372     }
373 
374     /// Consumes `self`, returning a future that returns `self` back once it is
375     /// ready to send a request.
376     ///
377     /// This function should be called before calling `send_request`.
378     ///
    /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call [`poll_ready`] until `Ready` is returned, then yield `self`
    /// back to the caller.
382     ///
383     /// # Examples
384     ///
385     /// ```rust
386     /// # use h2::client::*;
387     /// # use http::*;
388     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
389     /// # {
390     /// // First, wait until the `send_request` handle is ready to send a new
391     /// // request
392     /// let mut send_request = send_request.ready().await.unwrap();
393     /// // Use `send_request` here.
394     /// # }
395     /// # pub fn main() {}
396     /// ```
397     ///
398     /// See [module] level docs for more details.
399     ///
400     /// [`poll_ready`]: #method.poll_ready
401     /// [module]: index.html
    pub fn ready(self) -> ReadySendRequest<B> {
403         ReadySendRequest { inner: Some(self) }
404     }
405 
    /// Sends an HTTP/2 request to the server.
407     ///
408     /// `send_request` initializes a new HTTP/2 stream on the associated
409     /// connection, then sends the given request using this new stream. Only the
410     /// request head is sent.
411     ///
412     /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
413     /// are returned. The [`ResponseFuture`] instance is used to get the
414     /// server's response and the [`SendStream`] instance is used to send a
415     /// request body or trailers to the server over the same HTTP/2 stream.
416     ///
417     /// To send a request body or trailers, set `end_of_stream` to `false`.
418     /// Then, use the returned [`SendStream`] instance to stream request body
    /// chunks or send trailers. If `end_of_stream` is set to `true`, then
    /// attempting to call [`SendStream::send_data`] or
    /// [`SendStream::send_trailers`] will result in an error.
422     ///
423     /// If no request body or trailers are to be sent, set `end_of_stream` to
424     /// `true` and drop the returned [`SendStream`] instance.
425     ///
426     /// # A note on HTTP versions
427     ///
428     /// The provided `Request` will be encoded differently depending on the
    /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as the specification recommends.
431     ///
432     /// If the version is set to a lower value, then the request is encoded to
433     /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
434     /// headers are permitted and the `:authority` pseudo header is not
435     /// included.
436     ///
437     /// The caller should always set the request's version field to 2.0 unless
438     /// specifically transmitting an HTTP 1.1 request over 2.0.
439     ///
440     /// # Examples
441     ///
442     /// Sending a request with no body
443     ///
444     /// ```rust
445     /// # use h2::client::*;
446     /// # use http::*;
447     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
448     /// # {
449     /// // First, wait until the `send_request` handle is ready to send a new
450     /// // request
451     /// let mut send_request = send_request.ready().await.unwrap();
452     /// // Prepare the HTTP request to send to the server.
453     /// let request = Request::get("https://www.example.com/")
454     ///     .body(())
455     ///     .unwrap();
456     ///
457     /// // Send the request to the server. Since we are not sending a
458     /// // body or trailers, we can drop the `SendStream` instance.
459     /// let (response, _) = send_request.send_request(request, true).unwrap();
460     /// let response = response.await.unwrap();
461     /// // Process the response
462     /// # }
463     /// # pub fn main() {}
464     /// ```
465     ///
466     /// Sending a request with a body and trailers
467     ///
468     /// ```rust
469     /// # use h2::client::*;
470     /// # use http::*;
471     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
472     /// # {
473     /// // First, wait until the `send_request` handle is ready to send a new
474     /// // request
475     /// let mut send_request = send_request.ready().await.unwrap();
476     ///
477     /// // Prepare the HTTP request to send to the server.
478     /// let request = Request::get("https://www.example.com/")
479     ///     .body(())
480     ///     .unwrap();
481     ///
    /// // Send the request to the server. Since we will be sending a
    /// // body and trailers, we keep the `SendStream` instance.
484     /// let (response, mut send_stream) = send_request
485     ///     .send_request(request, false).unwrap();
486     ///
487     /// // At this point, one option would be to wait for send capacity.
488     /// // Doing so would allow us to not hold data in memory that
489     /// // cannot be sent. However, this is not a requirement, so this
490     /// // example will skip that step. See `SendStream` documentation
491     /// // for more details.
492     /// send_stream.send_data(b"hello", false).unwrap();
493     /// send_stream.send_data(b"world", false).unwrap();
494     ///
495     /// // Send the trailers.
496     /// let mut trailers = HeaderMap::new();
497     /// trailers.insert(
498     ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
499     ///     header::HeaderValue::from_bytes(b"hello").unwrap());
500     ///
501     /// send_stream.send_trailers(trailers).unwrap();
502     ///
503     /// let response = response.await.unwrap();
504     /// // Process the response
505     /// # }
506     /// # pub fn main() {}
507     /// ```
508     ///
509     /// [`ResponseFuture`]: struct.ResponseFuture.html
510     /// [`SendStream`]: ../struct.SendStream.html
511     /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
512     /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
    pub fn send_request(
514         &mut self,
515         request: Request<()>,
516         end_of_stream: bool,
517     ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
518         self.inner
519             .send_request(request, end_of_stream, self.pending.as_ref())
520             .map_err(Into::into)
521             .map(|(stream, is_full)| {
522                 if stream.is_pending_open() && is_full {
                    // Only prevent sending another request when the request
                    // queue is full.
525                     self.pending = Some(stream.clone_to_opaque());
526                 }
527 
528                 let response = ResponseFuture {
529                     inner: stream.clone_to_opaque(),
530                     push_promise_consumed: false,
531                 };
532 
533                 let stream = SendStream::new(stream);
534 
535                 (response, stream)
536             })
537     }
538 
539     /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
540     ///
541     /// This setting is configured by the server peer by sending the
542     /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
543     /// This method returns the currently acknowledged value received from the
544     /// remote.
545     ///
546     /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
547     /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
549         self.inner.is_extended_connect_protocol_enabled()
550     }
551 }
552 
553 impl<B> fmt::Debug for SendRequest<B>
554 where
555     B: Buf,
556 {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
558         fmt.debug_struct("SendRequest").finish()
559     }
560 }
561 
562 impl<B> Clone for SendRequest<B>
563 where
564     B: Buf,
565 {
    fn clone(&self) -> Self {
567         SendRequest {
568             inner: self.inner.clone(),
569             pending: None,
570         }
571     }
572 }
573 
574 #[cfg(feature = "unstable")]
575 impl<B> SendRequest<B>
576 where
577     B: Buf,
578 {
579     /// Returns the number of active streams.
580     ///
581     /// An active stream is a stream that has not yet transitioned to a closed
582     /// state.
    pub fn num_active_streams(&self) -> usize {
584         self.inner.num_active_streams()
585     }
586 
587     /// Returns the number of streams that are held in memory.
588     ///
    /// A wired stream is a stream that is either active, or is closed but must
    /// stay in memory for some reason; for example, there may still be
    /// outstanding userspace handles pointing to the slot.
    pub fn num_wired_streams(&self) -> usize {
593         self.inner.num_wired_streams()
594     }
595 }
596 
597 // ===== impl ReadySendRequest =====
598 
599 impl<B> Future for ReadySendRequest<B>
600 where
601     B: Buf,
602 {
603     type Output = Result<SendRequest<B>, crate::Error>;
604 
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
606         match &mut self.inner {
607             Some(send_request) => {
608                 ready!(send_request.poll_ready(cx))?;
609             }
610             None => panic!("called `poll` after future completed"),
611         }
612 
613         Poll::Ready(Ok(self.inner.take().unwrap()))
614     }
615 }
616 
617 // ===== impl Builder =====
618 
619 impl Builder {
620     /// Returns a new client builder instance initialized with default
621     /// configuration values.
622     ///
623     /// Configuration methods can be chained on the return value.
624     ///
625     /// # Examples
626     ///
627     /// ```
628     /// # use tokio::io::{AsyncRead, AsyncWrite};
629     /// # use h2::client::*;
630     /// # use bytes::Bytes;
631     /// #
632     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
633     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
634     /// # {
635     /// // `client_fut` is a future representing the completion of the HTTP/2
636     /// // handshake.
637     /// let client_fut = Builder::new()
638     ///     .initial_window_size(1_000_000)
639     ///     .max_concurrent_streams(1000)
640     ///     .handshake(my_io);
641     /// # client_fut.await
642     /// # }
643     /// #
644     /// # pub fn main() {}
645     /// ```
    pub fn new() -> Builder {
647         Builder {
648             max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
649             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
650             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
651             pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
652             initial_target_connection_window_size: None,
653             initial_max_send_streams: usize::MAX,
654             settings: Default::default(),
655             stream_id: 1.into(),
656             local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
657         }
658     }
659 
660     /// Indicates the initial window size (in octets) for stream-level
661     /// flow control for received data.
662     ///
663     /// The initial window of a stream is used as part of flow control. For more
664     /// details, see [`FlowControl`].
665     ///
666     /// The default value is 65,535.
667     ///
668     /// [`FlowControl`]: ../struct.FlowControl.html
669     ///
670     /// # Examples
671     ///
672     /// ```
673     /// # use tokio::io::{AsyncRead, AsyncWrite};
674     /// # use h2::client::*;
675     /// # use bytes::Bytes;
676     /// #
677     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
678     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
679     /// # {
680     /// // `client_fut` is a future representing the completion of the HTTP/2
681     /// // handshake.
682     /// let client_fut = Builder::new()
683     ///     .initial_window_size(1_000_000)
684     ///     .handshake(my_io);
685     /// # client_fut.await
686     /// # }
687     /// #
688     /// # pub fn main() {}
689     /// ```
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
691         self.settings.set_initial_window_size(Some(size));
692         self
693     }
694 
695     /// Indicates the initial window size (in octets) for connection-level flow control
696     /// for received data.
697     ///
698     /// The initial window of a connection is used as part of flow control. For more details,
699     /// see [`FlowControl`].
700     ///
701     /// The default value is 65,535.
702     ///
703     /// [`FlowControl`]: ../struct.FlowControl.html
704     ///
705     /// # Examples
706     ///
707     /// ```
708     /// # use tokio::io::{AsyncRead, AsyncWrite};
709     /// # use h2::client::*;
710     /// # use bytes::Bytes;
711     /// #
712     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
713     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
714     /// # {
715     /// // `client_fut` is a future representing the completion of the HTTP/2
716     /// // handshake.
717     /// let client_fut = Builder::new()
718     ///     .initial_connection_window_size(1_000_000)
719     ///     .handshake(my_io);
720     /// # client_fut.await
721     /// # }
722     /// #
723     /// # pub fn main() {}
724     /// ```
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
726         self.initial_target_connection_window_size = Some(size);
727         self
728     }
729 
730     /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
731     /// configured client is able to accept.
732     ///
733     /// The sender may send data frames that are **smaller** than this value,
734     /// but any data larger than `max` will be broken up into multiple `DATA`
735     /// frames.
736     ///
737     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
738     ///
739     /// # Examples
740     ///
741     /// ```
742     /// # use tokio::io::{AsyncRead, AsyncWrite};
743     /// # use h2::client::*;
744     /// # use bytes::Bytes;
745     /// #
746     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
748     /// # {
749     /// // `client_fut` is a future representing the completion of the HTTP/2
750     /// // handshake.
751     /// let client_fut = Builder::new()
752     ///     .max_frame_size(1_000_000)
753     ///     .handshake(my_io);
754     /// # client_fut.await
755     /// # }
756     /// #
757     /// # pub fn main() {}
758     /// ```
759     ///
760     /// # Panics
761     ///
762     /// This function panics if `max` is not within the legal range specified
763     /// above.
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765         self.settings.set_max_frame_size(Some(max));
766         self
767     }
768 
769     /// Sets the max size of received header frames.
770     ///
771     /// This advisory setting informs a peer of the maximum size of header list
772     /// that the sender is prepared to accept, in octets. The value is based on
773     /// the uncompressed size of header fields, including the length of the name
774     /// and value in octets plus an overhead of 32 octets for each header field.
775     ///
776     /// This setting is also used to limit the maximum amount of data that is
777     /// buffered to decode HEADERS frames.
778     ///
779     /// # Examples
780     ///
781     /// ```
782     /// # use tokio::io::{AsyncRead, AsyncWrite};
783     /// # use h2::client::*;
784     /// # use bytes::Bytes;
785     /// #
786     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
788     /// # {
789     /// // `client_fut` is a future representing the completion of the HTTP/2
790     /// // handshake.
791     /// let client_fut = Builder::new()
792     ///     .max_header_list_size(16 * 1024)
793     ///     .handshake(my_io);
794     /// # client_fut.await
795     /// # }
796     /// #
797     /// # pub fn main() {}
798     /// ```
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
800         self.settings.set_max_header_list_size(Some(max));
801         self
802     }
803 
804     /// Sets the maximum number of concurrent streams.
805     ///
806     /// The maximum concurrent streams setting only controls the maximum number
807     /// of streams that can be initiated by the remote peer. In other words,
808     /// when this setting is set to 100, this does not limit the number of
809     /// concurrent streams that can be created by the caller.
810     ///
811     /// It is recommended that this value be no smaller than 100, so as to not
812     /// unnecessarily limit parallelism. However, any value is legal, including
813     /// 0. If `max` is set to 0, then the remote will not be permitted to
814     /// initiate streams.
815     ///
816     /// Note that streams in the reserved state, i.e., push promises that have
817     /// been reserved but the stream has not started, do not count against this
818     /// setting.
819     ///
820     /// Also note that if the remote *does* exceed the value set here, it is not
821     /// a protocol level error. Instead, the `h2` library will immediately reset
822     /// the stream.
823     ///
824     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
825     ///
826     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
827     ///
828     /// # Examples
829     ///
830     /// ```
831     /// # use tokio::io::{AsyncRead, AsyncWrite};
832     /// # use h2::client::*;
833     /// # use bytes::Bytes;
834     /// #
835     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
836     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
837     /// # {
838     /// // `client_fut` is a future representing the completion of the HTTP/2
839     /// // handshake.
840     /// let client_fut = Builder::new()
841     ///     .max_concurrent_streams(1000)
842     ///     .handshake(my_io);
843     /// # client_fut.await
844     /// # }
845     /// #
846     /// # pub fn main() {}
847     /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
849         self.settings.set_max_concurrent_streams(Some(max));
850         self
851     }
852 
    /// Sets the initial maximum number of locally initiated (send) streams.
854     ///
855     /// The initial settings will be overwritten by the remote peer when
856     /// the SETTINGS frame is received. The new value will be set to the
857     /// `max_concurrent_streams()` from the frame. If no value is advertised in
858     /// the initial SETTINGS frame from the remote peer as part of
859     /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
860     ///
    /// This value acts as the concurrency limit for locally initiated streams
    /// until the remote peer advertises its own `MAX_CONCURRENT_STREAMS` value.
863     ///
864     /// Sending streams past the limit returned by the peer will be treated
865     /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
866     ///
867     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
868     ///
869     /// The default value is `usize::MAX`.
870     ///
871     /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
872     /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
873     ///
874     /// # Examples
875     ///
876     /// ```
877     /// # use tokio::io::{AsyncRead, AsyncWrite};
878     /// # use h2::client::*;
879     /// # use bytes::Bytes;
880     /// #
881     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
882     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
883     /// # {
884     /// // `client_fut` is a future representing the completion of the HTTP/2
885     /// // handshake.
886     /// let client_fut = Builder::new()
887     ///     .initial_max_send_streams(1000)
888     ///     .handshake(my_io);
889     /// # client_fut.await
890     /// # }
891     /// #
892     /// # pub fn main() {}
893     /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
895         self.initial_max_send_streams = initial;
896         self
897     }
898 
899     /// Sets the maximum number of concurrent locally reset streams.
900     ///
901     /// When a stream is explicitly reset, the HTTP/2 specification requires
902     /// that any further frames received for that stream must be ignored for
903     /// "some time".
904     ///
905     /// In order to satisfy the specification, internal state must be maintained
906     /// to implement the behavior. This state grows linearly with the number of
907     /// streams that are locally reset.
908     ///
    /// The `max_concurrent_reset_streams` setting configures an upper
    /// bound on the amount of state that is maintained. When this max value is
911     /// reached, the oldest reset stream is purged from memory.
912     ///
913     /// Once the stream has been fully purged from memory, any additional frames
914     /// received for that stream will result in a connection level protocol
915     /// error, forcing the connection to terminate.
916     ///
917     /// The default value is 10.
918     ///
919     /// # Examples
920     ///
921     /// ```
922     /// # use tokio::io::{AsyncRead, AsyncWrite};
923     /// # use h2::client::*;
924     /// # use bytes::Bytes;
925     /// #
926     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
927     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
928     /// # {
929     /// // `client_fut` is a future representing the completion of the HTTP/2
930     /// // handshake.
931     /// let client_fut = Builder::new()
932     ///     .max_concurrent_reset_streams(1000)
933     ///     .handshake(my_io);
934     /// # client_fut.await
935     /// # }
936     /// #
937     /// # pub fn main() {}
938     /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
940         self.reset_stream_max = max;
941         self
942     }
943 
944     /// Sets the duration to remember locally reset streams.
945     ///
946     /// When a stream is explicitly reset, the HTTP/2 specification requires
947     /// that any further frames received for that stream must be ignored for
948     /// "some time".
949     ///
950     /// In order to satisfy the specification, internal state must be maintained
951     /// to implement the behavior. This state grows linearly with the number of
952     /// streams that are locally reset.
953     ///
954     /// The `reset_stream_duration` setting configures the max amount of time
955     /// this state will be maintained in memory. Once the duration elapses, the
956     /// stream state is purged from memory.
957     ///
958     /// Once the stream has been fully purged from memory, any additional frames
959     /// received for that stream will result in a connection level protocol
960     /// error, forcing the connection to terminate.
961     ///
962     /// The default value is 30 seconds.
963     ///
964     /// # Examples
965     ///
966     /// ```
967     /// # use tokio::io::{AsyncRead, AsyncWrite};
968     /// # use h2::client::*;
969     /// # use std::time::Duration;
970     /// # use bytes::Bytes;
971     /// #
972     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
973     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
974     /// # {
975     /// // `client_fut` is a future representing the completion of the HTTP/2
976     /// // handshake.
977     /// let client_fut = Builder::new()
978     ///     .reset_stream_duration(Duration::from_secs(10))
979     ///     .handshake(my_io);
980     /// # client_fut.await
981     /// # }
982     /// #
983     /// # pub fn main() {}
984     /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
986         self.reset_stream_duration = dur;
987         self
988     }
989 
990     /// Sets the maximum number of local resets due to protocol errors made by the remote end.
991     ///
992     /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
    /// Too many of these often indicate a malicious or misbehaving peer, and there are
    /// attacks which can abuse this to exhaust resources (denial of service). This limit
    /// protects against such attacks by capping the number of resets we can be forced to generate.
995     ///
996     /// When the number of local resets exceeds this threshold, the client will close the connection.
997     ///
    /// If you really want to disable this, supply [`Option::None`] here.
    /// Disabling this is not recommended and may expose you to denial-of-service attacks.
1000     ///
1001     /// The default value is currently 1024, but could change.
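    ///
    /// # Examples
    ///
    /// A sketch of raising the limit, following the same pattern as the other
    /// builder examples in this module:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// let client_fut = Builder::new()
    ///     .max_local_error_reset_streams(Some(2048))
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```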
    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1003         self.local_max_error_reset_streams = max;
1004         self
1005     }
1006 
1007     /// Sets the maximum number of pending-accept remotely-reset streams.
1008     ///
1009     /// Streams that have been received by the peer, but not accepted by the
1010     /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1011     /// could send a request and then shortly after, realize it is not needed,
1012     /// sending a CANCEL.
1013     ///
1014     /// However, since those streams are now "closed", they don't count towards
1015     /// the max concurrent streams. So, they will sit in the accept queue,
1016     /// using memory.
1017     ///
1018     /// When the number of remotely-reset streams sitting in the pending-accept
1019     /// queue reaches this maximum value, a connection error with the code of
1020     /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1021     /// `Future`.
1022     ///
1023     /// The default value is currently 20, but could change.
1024     ///
1025     /// # Examples
1026     ///
1027     /// ```
1028     /// # use tokio::io::{AsyncRead, AsyncWrite};
1029     /// # use h2::client::*;
1030     /// # use bytes::Bytes;
1031     /// #
1032     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1033     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1034     /// # {
1035     /// // `client_fut` is a future representing the completion of the HTTP/2
1036     /// // handshake.
1037     /// let client_fut = Builder::new()
1038     ///     .max_pending_accept_reset_streams(100)
1039     ///     .handshake(my_io);
1040     /// # client_fut.await
1041     /// # }
1042     /// #
1043     /// # pub fn main() {}
1044     /// ```
    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1046         self.pending_accept_reset_stream_max = max;
1047         self
1048     }
1049 
1050     /// Sets the maximum send buffer size per stream.
1051     ///
1052     /// Once a stream has buffered up to (or over) the maximum, the stream's
1053     /// flow control will not "poll" additional capacity. Once bytes for the
1054     /// stream have been written to the connection, the send buffer capacity
1055     /// will be freed up again.
1056     ///
1057     /// The default is currently ~400KB, but may change.
1058     ///
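    /// # Examples
    ///
    /// A sketch of lowering the per-stream send buffer, following the same
    /// pattern as the other builder examples in this module:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
    /// # {
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(64 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///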
1059     /// # Panics
1060     ///
1061     /// This function panics if `max` is larger than `u32::MAX`.
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1063         assert!(max <= std::u32::MAX as usize);
1064         self.max_send_buffer_size = max;
1065         self
1066     }
1067 
1068     /// Enables or disables server push promises.
1069     ///
1070     /// This value is included in the initial SETTINGS handshake.
    /// Setting this value to `false` in the initial SETTINGS handshake
    /// guarantees that the remote server will never send a push promise.
1074     ///
1075     /// This setting can be changed during the life of a single HTTP/2
1076     /// connection by sending another settings frame updating the value.
1077     ///
1078     /// Default value: `true`.
1079     ///
1080     /// # Examples
1081     ///
1082     /// ```
1083     /// # use tokio::io::{AsyncRead, AsyncWrite};
1084     /// # use h2::client::*;
1085     /// # use std::time::Duration;
1086     /// # use bytes::Bytes;
1087     /// #
1088     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1089     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1090     /// # {
1091     /// // `client_fut` is a future representing the completion of the HTTP/2
1092     /// // handshake.
1093     /// let client_fut = Builder::new()
1094     ///     .enable_push(false)
1095     ///     .handshake(my_io);
1096     /// # client_fut.await
1097     /// # }
1098     /// #
1099     /// # pub fn main() {}
1100     /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1102         self.settings.set_enable_push(enabled);
1103         self
1104     }
1105 
1106     /// Sets the header table size.
1107     ///
1108     /// This setting informs the peer of the maximum size of the header compression
1109     /// table used to encode header blocks, in octets. The encoder may select any value
1110     /// equal to or less than the header table size specified by the sender.
1111     ///
1112     /// The default value is 4,096.
1113     ///
1114     /// # Examples
1115     ///
1116     /// ```
1117     /// # use tokio::io::{AsyncRead, AsyncWrite};
1118     /// # use h2::client::*;
1119     /// # use bytes::Bytes;
1120     /// #
1121     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1122     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1123     /// # {
1124     /// // `client_fut` is a future representing the completion of the HTTP/2
1125     /// // handshake.
1126     /// let client_fut = Builder::new()
1127     ///     .header_table_size(1_000_000)
1128     ///     .handshake(my_io);
1129     /// # client_fut.await
1130     /// # }
1131     /// #
1132     /// # pub fn main() {}
1133     /// ```
    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1135         self.settings.set_header_table_size(Some(size));
1136         self
1137     }
1138 
1139     /// Sets the first stream ID to something other than 1.
1140     #[cfg(feature = "unstable")]
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1142         self.stream_id = stream_id.into();
1143         assert!(
1144             self.stream_id.is_client_initiated(),
1145             "stream id must be odd"
1146         );
1147         self
1148     }
1149 
1150     /// Creates a new configured HTTP/2 client backed by `io`.
1151     ///
1152     /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2 handshake]. The handshake is completed once the client has
    /// sent both the connection preface and the initial SETTINGS frame.
1155     ///
1156     /// The handshake future does not wait for the initial settings frame from the
1157     /// server.
1158     ///
1159     /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1160     /// tuple once the HTTP/2 handshake has been completed.
1161     ///
1162     /// This function also allows the caller to configure the send payload data
1163     /// type. See [Outbound data type] for more details.
1164     ///
1165     /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1166     /// [`Connection`]: struct.Connection.html
1167     /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1169     ///
1170     /// # Examples
1171     ///
1172     /// Basic usage:
1173     ///
1174     /// ```
1175     /// # use tokio::io::{AsyncRead, AsyncWrite};
1176     /// # use h2::client::*;
1177     /// # use bytes::Bytes;
1178     /// #
1179     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1180     ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1181     /// # {
1182     /// // `client_fut` is a future representing the completion of the HTTP/2
1183     /// // handshake.
1184     /// let client_fut = Builder::new()
1185     ///     .handshake(my_io);
1186     /// # client_fut.await
1187     /// # }
1188     /// #
1189     /// # pub fn main() {}
1190     /// ```
1191     ///
1192     /// Configures the send-payload data type. In this case, the outbound data
1193     /// type will be `&'static [u8]`.
1194     ///
1195     /// ```
1196     /// # use tokio::io::{AsyncRead, AsyncWrite};
1197     /// # use h2::client::*;
1198     /// #
1199     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1200     /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1201     /// # {
1202     /// // `client_fut` is a future representing the completion of the HTTP/2
1203     /// // handshake.
1204     /// let client_fut = Builder::new()
1205     ///     .handshake::<_, &'static [u8]>(my_io);
1206     /// # client_fut.await
1207     /// # }
1208     /// #
1209     /// # pub fn main() {}
1210     /// ```
    pub fn handshake<T, B>(
1212         &self,
1213         io: T,
1214     ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1215     where
1216         T: AsyncRead + AsyncWrite + Unpin,
1217         B: Buf,
1218     {
1219         Connection::handshake2(io, self.clone())
1220     }
1221 }
1222 
1223 impl Default for Builder {
    fn default() -> Builder {
1225         Builder::new()
1226     }
1227 }
1228 
1229 /// Creates a new configured HTTP/2 client with default configuration
1230 /// values backed by `io`.
1231 ///
1232 /// It is expected that `io` already be in an appropriate state to commence
1233 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1234 ///
1235 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1236 /// tuple once the HTTP/2 handshake has been completed. The returned
1237 /// [`Connection`] instance will be using default configuration values. Use
1238 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1239 /// instance.
1240 ///
1241 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1242 /// [Handshake]: ../index.html#handshake
1243 /// [`Connection`]: struct.Connection.html
1244 /// [`SendRequest`]: struct.SendRequest.html
1245 ///
1246 /// # Examples
1247 ///
1248 /// ```
1249 /// # use tokio::io::{AsyncRead, AsyncWrite};
1250 /// # use h2::client;
1251 /// # use h2::client::*;
1252 /// #
1253 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1254 /// # {
1255 /// let (send_request, connection) = client::handshake(my_io).await?;
1256 /// // The HTTP/2 handshake has completed, now start polling
1257 /// // `connection` and use `send_request` to send requests to the
1258 /// // server.
1259 /// # Ok(())
1260 /// # }
1261 /// #
1262 /// # pub fn main() {}
1263 /// ```
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1265 where
1266     T: AsyncRead + AsyncWrite + Unpin,
1267 {
1268     let builder = Builder::new();
1269     builder
1270         .handshake(io)
1271         .instrument(tracing::trace_span!("client_handshake"))
1272         .await
1273 }
1274 
1275 // ===== impl Connection =====
1276 
async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1278 where
1279     T: AsyncRead + AsyncWrite + Unpin,
1280 {
1281     tracing::debug!("binding client connection");
1282 
1283     let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1284     io.write_all(msg).await.map_err(crate::Error::from_io)?;
1285 
1286     tracing::debug!("client connection bound");
1287 
1288     Ok(())
1289 }
1290 
1291 impl<T, B> Connection<T, B>
1292 where
1293     T: AsyncRead + AsyncWrite + Unpin,
1294     B: Buf,
1295 {
    async fn handshake2(
1297         mut io: T,
1298         builder: Builder,
1299     ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1300         bind_connection(&mut io).await?;
1301 
1302         // Create the codec
1303         let mut codec = Codec::new(io);
1304 
1305         if let Some(max) = builder.settings.max_frame_size() {
1306             codec.set_max_recv_frame_size(max as usize);
1307         }
1308 
1309         if let Some(max) = builder.settings.max_header_list_size() {
1310             codec.set_max_recv_header_list_size(max as usize);
1311         }
1312 
1313         // Send initial settings frame
1314         codec
1315             .buffer(builder.settings.clone().into())
1316             .expect("invalid SETTINGS frame");
1317 
1318         let inner = proto::Connection::new(
1319             codec,
1320             proto::Config {
1321                 next_stream_id: builder.stream_id,
1322                 initial_max_send_streams: builder.initial_max_send_streams,
1323                 max_send_buffer_size: builder.max_send_buffer_size,
1324                 reset_stream_duration: builder.reset_stream_duration,
1325                 reset_stream_max: builder.reset_stream_max,
1326                 remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1327                 local_error_reset_streams_max: builder.local_max_error_reset_streams,
1328                 settings: builder.settings.clone(),
1329             },
1330         );
1331         let send_request = SendRequest {
1332             inner: inner.streams().clone(),
1333             pending: None,
1334         };
1335 
1336         let mut connection = Connection { inner };
1337         if let Some(sz) = builder.initial_target_connection_window_size {
1338             connection.set_target_window_size(sz);
1339         }
1340 
1341         Ok((send_request, connection))
1342     }
1343 
1344     /// Sets the target window size for the whole connection.
1345     ///
1346     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1347     /// frame will be immediately sent to the remote, increasing the connection
1348     /// level window by `size - current_value`.
1349     ///
1350     /// If `size` is less than the current value, nothing will happen
1351     /// immediately. However, as window capacity is released by
1352     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1353     /// out until the number of "in flight" bytes drops below `size`.
1354     ///
1355     /// The default value is 65,535.
1356     ///
1357     /// See [`FlowControl`] documentation for more details.
1358     ///
1359     /// [`FlowControl`]: ../struct.FlowControl.html
1360     /// [library level]: ../index.html#flow-control
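    ///
    /// # Examples
    ///
    /// A sketch of raising the connection-level window after the handshake
    /// completes:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // Allow up to 1 MiB of data to be in flight on the whole connection.
    /// connection.set_target_window_size(1024 * 1024);
    /// # Ok(())
    /// # }
    /// # pub fn main() {}
    /// ```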
    pub fn set_target_window_size(&mut self, size: u32) {
1362         assert!(size <= proto::MAX_WINDOW_SIZE);
1363         self.inner.set_target_window_size(size);
1364     }
1365 
1366     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1367     /// flow control for received data.
1368     ///
1369     /// The `SETTINGS` will be sent to the remote, and only applied once the
1370     /// remote acknowledges the change.
1371     ///
1372     /// This can be used to increase or decrease the window size for existing
1373     /// streams.
1374     ///
1375     /// # Errors
1376     ///
1377     /// Returns an error if a previous call is still pending acknowledgement
1378     /// from the remote endpoint.
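    ///
    /// # Examples
    ///
    /// A minimal sketch; the connection setup mirrors the module-level
    /// example and the window size chosen here is arbitrary.
    ///
    /// ```rust, no_run
    /// # use tokio::net::TcpStream;
    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
    /// # let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    /// let (send_request, mut connection) = h2::client::handshake(tcp).await?;
    ///
    /// // Request a 512 KiB receive window per stream. The new value only
    /// // applies once the remote acknowledges the `SETTINGS` frame.
    /// connection.set_initial_window_size(512 * 1024)?;
    /// # Ok(())
    /// # }
    /// ```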
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }

    /// Takes a `PingPong` instance from the connection.
    ///
    /// # Note
    ///
    /// This may only be called once; subsequent calls return `None`.
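    ///
    /// # Examples
    ///
    /// A minimal sketch showing that the handle can only be taken once; the
    /// connection setup mirrors the module-level example.
    ///
    /// ```rust, no_run
    /// # use tokio::net::TcpStream;
    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
    /// # let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    /// let (send_request, mut connection) = h2::client::handshake(tcp).await?;
    ///
    /// let ping_pong = connection.ping_pong().expect("first call returns the handle");
    /// assert!(connection.ping_pong().is_none());
    /// # Ok(())
    /// # }
    /// ```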
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.inner.take_user_pings().map(PingPong::new)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
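    ///
    /// # Examples
    ///
    /// A minimal sketch; the connection setup mirrors the module-level
    /// example.
    ///
    /// ```rust, no_run
    /// # use tokio::net::TcpStream;
    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
    /// # let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    /// let (send_request, connection) = h2::client::handshake(tcp).await?;
    ///
    /// // Reflects the most recently acknowledged value received from the
    /// // server; it may change as later `SETTINGS` frames arrive.
    /// println!("peer allows {} concurrent streams", connection.max_concurrent_send_streams());
    /// # Ok(())
    /// # }
    /// ```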
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
}

impl<T, B> Future for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Output = Result<(), crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.maybe_close_connection_if_no_streams();
        self.inner.poll(cx).map_err(Into::into)
    }
}

impl<T, B> fmt::Debug for Connection<T, B>
where
    T: AsyncRead + AsyncWrite,
    T: fmt::Debug,
    B: fmt::Debug + Buf,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}

// ===== impl ResponseFuture =====

impl Future for ResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
        let body = RecvStream::new(FlowControl::new(self.inner.clone()));

        Poll::Ready(Ok(Response::from_parts(parts, body)))
    }
}

impl ResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        crate::StreamId::from_internal(self.inner.stream_id())
    }

    /// Returns a stream of PushPromises
    ///
    /// # Panics
    ///
    /// If this method has been called before,
    /// or if the stream was itself pushed by the server.
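    ///
    /// # Examples
    ///
    /// A minimal sketch: the request target is a placeholder, and the
    /// connection is driven on a separate task as in the module-level
    /// example.
    ///
    /// ```rust, no_run
    /// # use http::Request;
    /// # use tokio::net::TcpStream;
    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
    /// # let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    /// let (mut send_request, connection) = h2::client::handshake(tcp).await?;
    /// tokio::spawn(async move {
    ///     // Drive the connection so frames are actually exchanged.
    ///     let _ = connection.await;
    /// });
    ///
    /// let request = Request::get("https://www.example.com/").body(())?;
    /// let (mut response_fut, _) = send_request.send_request(request, true)?;
    ///
    /// // Take the push promise stream before awaiting the response.
    /// let mut pushes = response_fut.push_promises();
    /// while let Some(promise) = pushes.push_promise().await {
    ///     let promise = promise?;
    ///     println!("promised: {}", promise.request().uri());
    /// }
    /// # Ok(())
    /// # }
    /// ```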
    pub fn push_promises(&mut self) -> PushPromises {
        if self.push_promise_consumed {
            panic!("Reference to push promises stream taken!");
        }
        self.push_promise_consumed = true;
        PushPromises {
            inner: self.inner.clone(),
        }
    }
}

// ===== impl PushPromises =====

impl PushPromises {
    /// Get the next `PushPromise`.
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
        futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
    }

    #[doc(hidden)]
    pub fn poll_push_promise(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
        match self.inner.poll_pushed(cx) {
            Poll::Ready(Some(Ok((request, response)))) => {
                let response = PushedResponseFuture {
                    inner: ResponseFuture {
                        inner: response,
                        push_promise_consumed: false,
                    },
                };
                Poll::Ready(Some(Ok(PushPromise { request, response })))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_push_promise(cx)
    }
}

// ===== impl PushPromise =====

impl PushPromise {
    /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
        &self.request
    }

    /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
        &mut self.request
    }

    /// Consumes `self`, returning the push promise's request headers and
    /// response future.
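    ///
    /// # Examples
    ///
    /// A minimal sketch showing how a received promise might be split apart;
    /// how the `PushPromise` itself is obtained is elided here.
    ///
    /// ```rust, no_run
    /// # async fn doc(promise: h2::client::PushPromise) -> Result<(), h2::Error> {
    /// let (request, response_future) = promise.into_parts();
    /// println!("promised request: {} {}", request.method(), request.uri());
    ///
    /// let response = response_future.await?;
    /// println!("pushed response status: {}", response.status());
    /// # Ok(())
    /// # }
    /// ```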
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
        (self.request, self.response)
    }
}

// ===== impl PushedResponseFuture =====

impl Future for PushedResponseFuture {
    type Output = Result<Response<RecvStream>, crate::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.inner).poll(cx)
    }
}

impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}

// ===== impl Peer =====

impl Peer {
    pub fn convert_send_message(
        id: StreamId,
        request: Request<()>,
        protocol: Option<Protocol>,
        end_of_stream: bool,
    ) -> Result<Headers, SendError> {
        use http::request::Parts;

        let (
            Parts {
                method,
                uri,
                headers,
                version,
                ..
            },
            _,
        ) = request.into_parts();

        let is_connect = method == Method::CONNECT;

        // Build the pseudo-header set. All requests will include `method`
        // and `path`.
        let mut pseudo = Pseudo::request(method, uri, protocol);

        if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
            //
            // 1) Authority is not set. In this case, a request was issued with
            //    a relative URI. This is permitted **only** when forwarding
            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
            //    this is an error.
            //
            // 2) Authority is set. In this case, the HTTP method *must* be
            //    CONNECT.
            //
            // It is not possible to have a scheme but not an authority set (the
            // `http` crate does not allow it).
            //
            if pseudo.authority.is_none() {
                if version == Version::HTTP_2 {
                    return Err(UserError::MissingUriSchemeAndAuthority.into());
                } else {
                    // This is acceptable as per the above comment. However,
                    // HTTP/2 requires that a scheme is set. Since we are
                    // forwarding an HTTP 1.1 request, the scheme is set to
                    // "http".
                    pseudo.set_scheme(uri::Scheme::HTTP);
                }
            } else if !is_connect {
                // TODO: Error
            }
        }

        // Create the HEADERS frame
        let mut frame = Headers::new(id, pseudo, headers);

        if end_of_stream {
            frame.set_end_stream()
        }

        Ok(frame)
    }
}

impl proto::Peer for Peer {
    type Poll = Response<()>;

    const NAME: &'static str = "Client";

    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Client
    }

    /*
    fn is_server() -> bool {
        false
    }
    */

    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, Error> {
        let mut b = Response::builder();

        b = b.version(Version::HTTP_2);

        if let Some(status) = pseudo.status {
            b = b.status(status);
        }

        let mut response = match b.body(()) {
            Ok(response) => response,
            Err(_) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
            }
        };

        *response.headers_mut() = fields;

        Ok(response)
    }
}