1 //! Client implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2 handshake]. This returns a future that completes once
15 //! the handshake process is performed and HTTP/2 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`] and the
//! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller. The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust, no_run
68 //!
69 //! use h2::client;
70 //!
71 //! use http::{Request, Method};
72 //! use std::error::Error;
73 //! use tokio::net::TcpStream;
74 //!
75 //! #[tokio::main]
76 //! pub async fn main() -> Result<(), Box<dyn Error>> {
77 //! // Establish TCP connection to the server.
78 //! let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 //! let (h2, connection) = client::handshake(tcp).await?;
80 //! tokio::spawn(async move {
81 //! connection.await.unwrap();
82 //! });
83 //!
84 //! let mut h2 = h2.ready().await?;
85 //! // Prepare the HTTP request to send to the server.
86 //! let request = Request::builder()
87 //! .method(Method::GET)
88 //! .uri("https://www.example.com/")
89 //! .body(())
90 //! .unwrap();
91 //!
92 //! // Send the request. The second tuple item allows the caller
93 //! // to stream a request body.
94 //! let (response, _) = h2.send_request(request, true).unwrap();
95 //!
96 //! let (head, mut body) = response.await?.into_parts();
97 //!
98 //! println!("Received response: {:?}", head);
99 //!
100 //! // The `flow_control` handle allows the caller to manage
101 //! // flow control.
102 //! //
103 //! // Whenever data is received, the caller is responsible for
104 //! // releasing capacity back to the server once it has freed
105 //! // the data from memory.
106 //! let mut flow_control = body.flow_control().clone();
107 //!
108 //! while let Some(chunk) = body.data().await {
109 //! let chunk = chunk?;
110 //! println!("RX: {:?}", chunk);
111 //!
112 //! // Let the server send more data.
113 //! let _ = flow_control.release_capacity(chunk.len());
114 //! }
115 //!
116 //! Ok(())
117 //! }
118 //! ```
119 //!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 //! [`handshake`]: fn.handshake.html
122 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123 //! [`SendRequest`]: struct.SendRequest.html
124 //! [`SendStream`]: ../struct.SendStream.html
125 //! [Making requests]: #making-requests
126 //! [Managing the connection]: #managing-the-connection
127 //! [`Connection`]: struct.Connection.html
128 //! [`Connection::poll`]: struct.Connection.html#method.poll
129 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131 //! [`SendRequest`]: struct.SendRequest.html
132 //! [`ResponseFuture`]: struct.ResponseFuture.html
133 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135 //! [`Builder`]: struct.Builder.html
136 //! [`Error`]: ../struct.Error.html
137
138 use crate::codec::{Codec, SendError, UserError};
139 use crate::ext::Protocol;
140 use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141 use crate::proto::{self, Error};
142 use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144 use bytes::{Buf, Bytes};
145 use http::{uri, HeaderMap, Method, Request, Response, Version};
146 use std::fmt;
147 use std::future::Future;
148 use std::pin::Pin;
149 use std::task::{Context, Poll};
150 use std::time::Duration;
151 use std::usize;
152 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
153 use tracing::Instrument;
154
155 /// Initializes new HTTP/2 streams on a connection by sending a request.
156 ///
157 /// This type does no work itself. Instead, it is a handle to the inner
158 /// connection state held by [`Connection`]. If the associated connection
159 /// instance is dropped, all `SendRequest` functions will return [`Error`].
160 ///
161 /// [`SendRequest`] instances are able to move to and operate on separate tasks
162 /// / threads than their associated [`Connection`] instance. Internally, there
163 /// is a buffer used to stage requests before they get written to the
164 /// connection. There is no guarantee that requests get written to the
165 /// connection in FIFO order as HTTP/2 prioritization logic can play a role.
166 ///
167 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
168 /// instances that are backed by a single connection.
169 ///
170 /// See [module] level documentation for more details.
171 ///
172 /// [module]: index.html
173 /// [`Connection`]: struct.Connection.html
174 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
175 /// [`Error`]: ../struct.Error.html
pub struct SendRequest<B: Buf> {
    /// Handle to the stream state of the underlying connection (see the
    /// type-level docs). It is cloned whenever a `SendRequest` is cloned.
    inner: proto::Streams<B, Peer>,
    /// Stream recorded by `send_request` when the stream it just created is
    /// still pending open and the call reported `is_full`. `poll_ready`
    /// passes it to `poll_pending_open` and resets it to `None` once that
    /// call completes successfully. Clones always start with `None`.
    pending: Option<proto::OpaqueStreamRef>,
}
180
/// Returns a `SendRequest` instance once it is ready to send at least one
/// request.
///
/// Created by `SendRequest::ready`. Polling this future again after it has
/// yielded the `SendRequest` panics.
#[derive(Debug)]
pub struct ReadySendRequest<B: Buf> {
    /// The wrapped handle. It is `Some` until the future resolves
    /// successfully, at which point it is taken and returned to the caller.
    inner: Option<SendRequest<B>>,
}
187
188 /// Manages all state associated with an HTTP/2 client connection.
189 ///
190 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
191 /// implements the HTTP/2 client logic for that connection. It is responsible
192 /// for driving the internal state forward, performing the work requested of the
193 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
194 /// [`RecvStream`]).
195 ///
196 /// `Connection` values are created by calling [`handshake`]. Once a
197 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
198 /// until `Ready` is returned. The easiest way to do this is to submit the
199 /// `Connection` instance to an [executor].
200 ///
201 /// [module]: index.html
202 /// [`handshake`]: fn.handshake.html
203 /// [`SendRequest`]: struct.SendRequest.html
204 /// [`ResponseFuture`]: struct.ResponseFuture.html
205 /// [`SendStream`]: ../struct.SendStream.html
206 /// [`RecvStream`]: ../struct.RecvStream.html
207 /// [`poll`]: #method.poll
208 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
209 ///
210 /// # Examples
211 ///
212 /// ```
213 /// # use tokio::io::{AsyncRead, AsyncWrite};
214 /// # use h2::client;
215 /// # use h2::client::*;
216 /// #
217 /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
218 /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
219 /// # {
220 /// let (send_request, connection) = client::handshake(my_io).await?;
221 /// // Submit the connection handle to an executor.
222 /// tokio::spawn(async { connection.await.expect("connection failed"); });
223 ///
224 /// // Now, use `send_request` to initialize HTTP/2 streams.
225 /// // ...
226 /// # Ok(())
227 /// # }
228 /// #
229 /// # pub fn main() {}
230 /// ```
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B: Buf = Bytes> {
    /// Protocol-level connection wrapped by this type, instantiated with the
    /// client `Peer` marker.
    inner: proto::Connection<T, Peer, B>,
}
235
/// A future of an HTTP response.
///
/// Returned by `SendRequest::send_request` together with the `SendStream`
/// for the same HTTP/2 stream.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    /// Opaque reference to the stream opened by the request.
    inner: proto::OpaqueStreamRef,
    /// Starts out `false` when the future is created by `send_request`.
    /// NOTE(review): presumably flipped once the push promises of this
    /// stream have been handed out -- confirm against the code that sets it.
    push_promise_consumed: bool,
}
243
/// A future of a pushed HTTP response.
///
/// Pushed and non-pushed responses have to be told apart because of the
/// following rule in the spec
/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>:
///
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
/// > that is in either the "open" or "half-closed (remote)" state.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PushedResponseFuture {
    /// The ordinary response future this type wraps.
    inner: ResponseFuture,
}
255
/// A pushed response and the corresponding request headers.
#[derive(Debug)]
pub struct PushPromise {
    /// The promised request; it carries no body (`Request<()>`).
    request: Request<()>,

    /// Future of the pushed response that belongs to `request`.
    response: PushedResponseFuture,
}
265
/// A stream of pushed responses and corresponding promised requests.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct PushPromises {
    /// Opaque stream reference backing this value.
    /// NOTE(review): presumably the request stream whose push promises are
    /// yielded -- confirm where `PushPromises` is constructed.
    inner: proto::OpaqueStreamRef,
}
272
273 /// Builds client connections with custom configuration values.
274 ///
275 /// Methods can be chained in order to set the configuration values.
276 ///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 connection to the server.
279 ///
280 /// New instances of `Builder` are obtained via [`Builder::new`].
281 ///
282 /// See function level documentation for details on the various client
283 /// configuration settings.
284 ///
285 /// [`Builder::new`]: struct.Builder.html#method.new
286 /// [`handshake`]: struct.Builder.html#method.handshake
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// # use tokio::io::{AsyncRead, AsyncWrite};
292 /// # use h2::client::*;
293 /// # use bytes::Bytes;
294 /// #
295 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// #     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297 /// # {
298 /// // `client_fut` is a future representing the completion of the HTTP/2
299 /// // handshake.
300 /// let client_fut = Builder::new()
301 /// .initial_window_size(1_000_000)
302 /// .max_concurrent_streams(1000)
303 /// .handshake(my_io);
304 /// # client_fut.await
305 /// # }
306 /// #
307 /// # pub fn main() {}
308 /// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    reset_stream_duration: Duration,

    /// Initial maximum number of locally initiated (send) streams.
    ///
    /// After receiving a SETTINGS frame from the remote peer, the connection
    /// overwrites this value with the MAX_CONCURRENT_STREAMS specified in
    /// that frame. If the remote peer advertises no value in its initial
    /// SETTINGS frame, it is set to `usize::MAX`.
    initial_max_send_streams: usize,

    /// Initial target window size for new connections.
    ///
    /// Stays `None` unless `initial_connection_window_size` is called.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum number of bytes to "buffer" for writing per stream.
    max_send_buffer_size: usize,

    /// Maximum number of locally reset streams to keep at a time.
    reset_stream_max: usize,

    /// Maximum number of remotely reset streams to allow in the pending
    /// accept queue.
    pending_accept_reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    ///
    /// The window size, frame size, header list size and concurrent stream
    /// setters on this builder write their values into this frame.
    settings: Settings,

    /// The stream ID of the first (lowest) stream. Subsequent streams will use
    /// monotonically increasing stream IDs.
    stream_id: StreamId,

    /// Maximum number of locally reset streams due to protocol error across
    /// the lifetime of the connection.
    ///
    /// When this gets exceeded, we issue GOAWAYs.
    local_max_error_reset_streams: Option<usize>,
}
348
/// Marker type supplied as the peer parameter of `proto::Streams` and
/// `proto::Connection` by the client types in this module.
#[derive(Debug)]
pub(crate) struct Peer;
351
352 // ===== impl SendRequest =====
353
354 impl<B> SendRequest<B>
355 where
356 B: Buf,
357 {
358 /// Returns `Ready` when the connection can initialize a new HTTP/2
359 /// stream.
360 ///
361 /// This function must return `Ready` before `send_request` is called. When
362 /// `Poll::Pending` is returned, the task will be notified once the readiness
363 /// state changes.
364 ///
365 /// See [module] level docs for more details.
366 ///
367 /// [module]: index.html
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>368 pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
369 ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
370 self.pending = None;
371 Poll::Ready(Ok(()))
372 }
373
374 /// Consumes `self`, returning a future that returns `self` back once it is
375 /// ready to send a request.
376 ///
377 /// This function should be called before calling `send_request`.
378 ///
/// This is a functional combinator for [`poll_ready`]. The returned future
/// will call `SendRequest::poll_ready` until `Ready`, then return `self` to
/// the caller.
382 ///
383 /// # Examples
384 ///
385 /// ```rust
386 /// # use h2::client::*;
387 /// # use http::*;
388 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
389 /// # {
390 /// // First, wait until the `send_request` handle is ready to send a new
391 /// // request
392 /// let mut send_request = send_request.ready().await.unwrap();
393 /// // Use `send_request` here.
394 /// # }
395 /// # pub fn main() {}
396 /// ```
397 ///
398 /// See [module] level docs for more details.
399 ///
400 /// [`poll_ready`]: #method.poll_ready
401 /// [module]: index.html
ready(self) -> ReadySendRequest<B>402 pub fn ready(self) -> ReadySendRequest<B> {
403 ReadySendRequest { inner: Some(self) }
404 }
405
406 /// Sends a HTTP/2 request to the server.
407 ///
408 /// `send_request` initializes a new HTTP/2 stream on the associated
409 /// connection, then sends the given request using this new stream. Only the
410 /// request head is sent.
411 ///
412 /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
413 /// are returned. The [`ResponseFuture`] instance is used to get the
414 /// server's response and the [`SendStream`] instance is used to send a
415 /// request body or trailers to the server over the same HTTP/2 stream.
416 ///
417 /// To send a request body or trailers, set `end_of_stream` to `false`.
418 /// Then, use the returned [`SendStream`] instance to stream request body
419 /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
420 /// then attempting to call [`SendStream::send_data`] or
421 /// [`SendStream::send_trailers`] will result in an error.
422 ///
423 /// If no request body or trailers are to be sent, set `end_of_stream` to
424 /// `true` and drop the returned [`SendStream`] instance.
425 ///
426 /// # A note on HTTP versions
427 ///
/// The provided `Request` will be encoded differently depending on the
/// value of its version field. If the version is set to 2.0, then the
/// request is encoded as the specification recommends.
431 ///
432 /// If the version is set to a lower value, then the request is encoded to
433 /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
434 /// headers are permitted and the `:authority` pseudo header is not
435 /// included.
436 ///
437 /// The caller should always set the request's version field to 2.0 unless
438 /// specifically transmitting an HTTP 1.1 request over 2.0.
439 ///
440 /// # Examples
441 ///
442 /// Sending a request with no body
443 ///
444 /// ```rust
445 /// # use h2::client::*;
446 /// # use http::*;
447 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
448 /// # {
449 /// // First, wait until the `send_request` handle is ready to send a new
450 /// // request
451 /// let mut send_request = send_request.ready().await.unwrap();
452 /// // Prepare the HTTP request to send to the server.
453 /// let request = Request::get("https://www.example.com/")
454 /// .body(())
455 /// .unwrap();
456 ///
457 /// // Send the request to the server. Since we are not sending a
458 /// // body or trailers, we can drop the `SendStream` instance.
459 /// let (response, _) = send_request.send_request(request, true).unwrap();
460 /// let response = response.await.unwrap();
461 /// // Process the response
462 /// # }
463 /// # pub fn main() {}
464 /// ```
465 ///
466 /// Sending a request with a body and trailers
467 ///
468 /// ```rust
469 /// # use h2::client::*;
470 /// # use http::*;
471 /// # async fn doc(send_request: SendRequest<&'static [u8]>)
472 /// # {
473 /// // First, wait until the `send_request` handle is ready to send a new
474 /// // request
475 /// let mut send_request = send_request.ready().await.unwrap();
476 ///
477 /// // Prepare the HTTP request to send to the server.
478 /// let request = Request::get("https://www.example.com/")
479 /// .body(())
480 /// .unwrap();
481 ///
/// // Send the request to the server. Since we are going to send a
/// // body and trailers, we keep the `SendStream` instance.
484 /// let (response, mut send_stream) = send_request
485 /// .send_request(request, false).unwrap();
486 ///
487 /// // At this point, one option would be to wait for send capacity.
488 /// // Doing so would allow us to not hold data in memory that
489 /// // cannot be sent. However, this is not a requirement, so this
490 /// // example will skip that step. See `SendStream` documentation
491 /// // for more details.
492 /// send_stream.send_data(b"hello", false).unwrap();
493 /// send_stream.send_data(b"world", false).unwrap();
494 ///
495 /// // Send the trailers.
496 /// let mut trailers = HeaderMap::new();
497 /// trailers.insert(
498 /// header::HeaderName::from_bytes(b"my-trailer").unwrap(),
499 /// header::HeaderValue::from_bytes(b"hello").unwrap());
500 ///
501 /// send_stream.send_trailers(trailers).unwrap();
502 ///
503 /// let response = response.await.unwrap();
504 /// // Process the response
505 /// # }
506 /// # pub fn main() {}
507 /// ```
508 ///
509 /// [`ResponseFuture`]: struct.ResponseFuture.html
510 /// [`SendStream`]: ../struct.SendStream.html
511 /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
512 /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream<B>), crate::Error>513 pub fn send_request(
514 &mut self,
515 request: Request<()>,
516 end_of_stream: bool,
517 ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
518 self.inner
519 .send_request(request, end_of_stream, self.pending.as_ref())
520 .map_err(Into::into)
521 .map(|(stream, is_full)| {
522 if stream.is_pending_open() && is_full {
523 // Only prevent sending another request when the request queue
524 // is not full.
525 self.pending = Some(stream.clone_to_opaque());
526 }
527
528 let response = ResponseFuture {
529 inner: stream.clone_to_opaque(),
530 push_promise_consumed: false,
531 };
532
533 let stream = SendStream::new(stream);
534
535 (response, stream)
536 })
537 }
538
539 /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
540 ///
541 /// This setting is configured by the server peer by sending the
542 /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
543 /// This method returns the currently acknowledged value received from the
544 /// remote.
545 ///
546 /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
547 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
is_extended_connect_protocol_enabled(&self) -> bool548 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
549 self.inner.is_extended_connect_protocol_enabled()
550 }
551 }
552
553 impl<B> fmt::Debug for SendRequest<B>
554 where
555 B: Buf,
556 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result557 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
558 fmt.debug_struct("SendRequest").finish()
559 }
560 }
561
562 impl<B> Clone for SendRequest<B>
563 where
564 B: Buf,
565 {
clone(&self) -> Self566 fn clone(&self) -> Self {
567 SendRequest {
568 inner: self.inner.clone(),
569 pending: None,
570 }
571 }
572 }
573
#[cfg(feature = "unstable")]
impl<B: Buf> SendRequest<B> {
    /// Returns the number of active streams.
    ///
    /// A stream counts as active as long as it has not yet transitioned to
    /// a closed state.
    pub fn num_active_streams(&self) -> usize {
        let streams = &self.inner;
        streams.num_active_streams()
    }

    /// Returns the number of streams that are held in memory.
    ///
    /// A wired stream is one that is either active, or closed but still
    /// required to stay in memory for some reason -- for example because
    /// outstanding userspace handles still point to its slot.
    pub fn num_wired_streams(&self) -> usize {
        let streams = &self.inner;
        streams.num_wired_streams()
    }
}
596
597 // ===== impl ReadySendRequest =====
598
599 impl<B> Future for ReadySendRequest<B>
600 where
601 B: Buf,
602 {
603 type Output = Result<SendRequest<B>, crate::Error>;
604
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>605 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
606 match &mut self.inner {
607 Some(send_request) => {
608 ready!(send_request.poll_ready(cx))?;
609 }
610 None => panic!("called `poll` after future completed"),
611 }
612
613 Poll::Ready(Ok(self.inner.take().unwrap()))
614 }
615 }
616
617 // ===== impl Builder =====
618
619 impl Builder {
620 /// Returns a new client builder instance initialized with default
621 /// configuration values.
622 ///
623 /// Configuration methods can be chained on the return value.
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// # use tokio::io::{AsyncRead, AsyncWrite};
629 /// # use h2::client::*;
630 /// # use bytes::Bytes;
631 /// #
632 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
633 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
634 /// # {
635 /// // `client_fut` is a future representing the completion of the HTTP/2
636 /// // handshake.
637 /// let client_fut = Builder::new()
638 /// .initial_window_size(1_000_000)
639 /// .max_concurrent_streams(1000)
640 /// .handshake(my_io);
641 /// # client_fut.await
642 /// # }
643 /// #
644 /// # pub fn main() {}
645 /// ```
new() -> Builder646 pub fn new() -> Builder {
647 Builder {
648 max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
649 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
650 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
651 pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
652 initial_target_connection_window_size: None,
653 initial_max_send_streams: usize::MAX,
654 settings: Default::default(),
655 stream_id: 1.into(),
656 local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
657 }
658 }
659
660 /// Indicates the initial window size (in octets) for stream-level
661 /// flow control for received data.
662 ///
663 /// The initial window of a stream is used as part of flow control. For more
664 /// details, see [`FlowControl`].
665 ///
666 /// The default value is 65,535.
667 ///
668 /// [`FlowControl`]: ../struct.FlowControl.html
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// # use tokio::io::{AsyncRead, AsyncWrite};
674 /// # use h2::client::*;
675 /// # use bytes::Bytes;
676 /// #
677 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
678 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
679 /// # {
680 /// // `client_fut` is a future representing the completion of the HTTP/2
681 /// // handshake.
682 /// let client_fut = Builder::new()
683 /// .initial_window_size(1_000_000)
684 /// .handshake(my_io);
685 /// # client_fut.await
686 /// # }
687 /// #
688 /// # pub fn main() {}
689 /// ```
initial_window_size(&mut self, size: u32) -> &mut Self690 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
691 self.settings.set_initial_window_size(Some(size));
692 self
693 }
694
695 /// Indicates the initial window size (in octets) for connection-level flow control
696 /// for received data.
697 ///
698 /// The initial window of a connection is used as part of flow control. For more details,
699 /// see [`FlowControl`].
700 ///
701 /// The default value is 65,535.
702 ///
703 /// [`FlowControl`]: ../struct.FlowControl.html
704 ///
705 /// # Examples
706 ///
707 /// ```
708 /// # use tokio::io::{AsyncRead, AsyncWrite};
709 /// # use h2::client::*;
710 /// # use bytes::Bytes;
711 /// #
712 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
713 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
714 /// # {
715 /// // `client_fut` is a future representing the completion of the HTTP/2
716 /// // handshake.
717 /// let client_fut = Builder::new()
718 /// .initial_connection_window_size(1_000_000)
719 /// .handshake(my_io);
720 /// # client_fut.await
721 /// # }
722 /// #
723 /// # pub fn main() {}
724 /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self725 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
726 self.initial_target_connection_window_size = Some(size);
727 self
728 }
729
730 /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
731 /// configured client is able to accept.
732 ///
733 /// The sender may send data frames that are **smaller** than this value,
734 /// but any data larger than `max` will be broken up into multiple `DATA`
735 /// frames.
736 ///
737 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
738 ///
739 /// # Examples
740 ///
741 /// ```
742 /// # use tokio::io::{AsyncRead, AsyncWrite};
743 /// # use h2::client::*;
744 /// # use bytes::Bytes;
745 /// #
746 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
748 /// # {
749 /// // `client_fut` is a future representing the completion of the HTTP/2
750 /// // handshake.
751 /// let client_fut = Builder::new()
752 /// .max_frame_size(1_000_000)
753 /// .handshake(my_io);
754 /// # client_fut.await
755 /// # }
756 /// #
757 /// # pub fn main() {}
758 /// ```
759 ///
760 /// # Panics
761 ///
762 /// This function panics if `max` is not within the legal range specified
763 /// above.
max_frame_size(&mut self, max: u32) -> &mut Self764 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765 self.settings.set_max_frame_size(Some(max));
766 self
767 }
768
769 /// Sets the max size of received header frames.
770 ///
771 /// This advisory setting informs a peer of the maximum size of header list
772 /// that the sender is prepared to accept, in octets. The value is based on
773 /// the uncompressed size of header fields, including the length of the name
774 /// and value in octets plus an overhead of 32 octets for each header field.
775 ///
776 /// This setting is also used to limit the maximum amount of data that is
777 /// buffered to decode HEADERS frames.
778 ///
779 /// # Examples
780 ///
781 /// ```
782 /// # use tokio::io::{AsyncRead, AsyncWrite};
783 /// # use h2::client::*;
784 /// # use bytes::Bytes;
785 /// #
786 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
788 /// # {
789 /// // `client_fut` is a future representing the completion of the HTTP/2
790 /// // handshake.
791 /// let client_fut = Builder::new()
792 /// .max_header_list_size(16 * 1024)
793 /// .handshake(my_io);
794 /// # client_fut.await
795 /// # }
796 /// #
797 /// # pub fn main() {}
798 /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self799 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
800 self.settings.set_max_header_list_size(Some(max));
801 self
802 }
803
804 /// Sets the maximum number of concurrent streams.
805 ///
806 /// The maximum concurrent streams setting only controls the maximum number
807 /// of streams that can be initiated by the remote peer. In other words,
808 /// when this setting is set to 100, this does not limit the number of
809 /// concurrent streams that can be created by the caller.
810 ///
811 /// It is recommended that this value be no smaller than 100, so as to not
812 /// unnecessarily limit parallelism. However, any value is legal, including
813 /// 0. If `max` is set to 0, then the remote will not be permitted to
814 /// initiate streams.
815 ///
816 /// Note that streams in the reserved state, i.e., push promises that have
817 /// been reserved but the stream has not started, do not count against this
818 /// setting.
819 ///
820 /// Also note that if the remote *does* exceed the value set here, it is not
821 /// a protocol level error. Instead, the `h2` library will immediately reset
822 /// the stream.
823 ///
824 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
825 ///
826 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// # use tokio::io::{AsyncRead, AsyncWrite};
832 /// # use h2::client::*;
833 /// # use bytes::Bytes;
834 /// #
835 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
836 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
837 /// # {
838 /// // `client_fut` is a future representing the completion of the HTTP/2
839 /// // handshake.
840 /// let client_fut = Builder::new()
841 /// .max_concurrent_streams(1000)
842 /// .handshake(my_io);
843 /// # client_fut.await
844 /// # }
845 /// #
846 /// # pub fn main() {}
847 /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self848 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
849 self.settings.set_max_concurrent_streams(Some(max));
850 self
851 }
852
853 /// Sets the initial maximum of locally initiated (send) streams.
854 ///
855 /// The initial settings will be overwritten by the remote peer when
856 /// the SETTINGS frame is received. The new value will be set to the
857 /// `max_concurrent_streams()` from the frame. If no value is advertised in
858 /// the initial SETTINGS frame from the remote peer as part of
859 /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
860 ///
861 /// This setting prevents the caller from exceeding this number of
862 /// streams that are counted towards the concurrency limit.
863 ///
864 /// Sending streams past the limit returned by the peer will be treated
865 /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
866 ///
867 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
868 ///
869 /// The default value is `usize::MAX`.
870 ///
871 /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
872 /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
873 ///
874 /// # Examples
875 ///
876 /// ```
877 /// # use tokio::io::{AsyncRead, AsyncWrite};
878 /// # use h2::client::*;
879 /// # use bytes::Bytes;
880 /// #
881 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
882 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
883 /// # {
884 /// // `client_fut` is a future representing the completion of the HTTP/2
885 /// // handshake.
886 /// let client_fut = Builder::new()
887 /// .initial_max_send_streams(1000)
888 /// .handshake(my_io);
889 /// # client_fut.await
890 /// # }
891 /// #
892 /// # pub fn main() {}
893 /// ```
initial_max_send_streams(&mut self, initial: usize) -> &mut Self894 pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
895 self.initial_max_send_streams = initial;
896 self
897 }
898
899 /// Sets the maximum number of concurrent locally reset streams.
900 ///
901 /// When a stream is explicitly reset, the HTTP/2 specification requires
902 /// that any further frames received for that stream must be ignored for
903 /// "some time".
904 ///
905 /// In order to satisfy the specification, internal state must be maintained
906 /// to implement the behavior. This state grows linearly with the number of
907 /// streams that are locally reset.
908 ///
/// The `max_concurrent_reset_streams` setting configures an upper
910 /// bound on the amount of state that is maintained. When this max value is
911 /// reached, the oldest reset stream is purged from memory.
912 ///
913 /// Once the stream has been fully purged from memory, any additional frames
914 /// received for that stream will result in a connection level protocol
915 /// error, forcing the connection to terminate.
916 ///
917 /// The default value is 10.
918 ///
919 /// # Examples
920 ///
921 /// ```
922 /// # use tokio::io::{AsyncRead, AsyncWrite};
923 /// # use h2::client::*;
924 /// # use bytes::Bytes;
925 /// #
926 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
927 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
928 /// # {
929 /// // `client_fut` is a future representing the completion of the HTTP/2
930 /// // handshake.
931 /// let client_fut = Builder::new()
932 /// .max_concurrent_reset_streams(1000)
933 /// .handshake(my_io);
934 /// # client_fut.await
935 /// # }
936 /// #
937 /// # pub fn main() {}
938 /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self939 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
940 self.reset_stream_max = max;
941 self
942 }
943
944 /// Sets the duration to remember locally reset streams.
945 ///
946 /// When a stream is explicitly reset, the HTTP/2 specification requires
947 /// that any further frames received for that stream must be ignored for
948 /// "some time".
949 ///
950 /// In order to satisfy the specification, internal state must be maintained
951 /// to implement the behavior. This state grows linearly with the number of
952 /// streams that are locally reset.
953 ///
954 /// The `reset_stream_duration` setting configures the max amount of time
955 /// this state will be maintained in memory. Once the duration elapses, the
956 /// stream state is purged from memory.
957 ///
958 /// Once the stream has been fully purged from memory, any additional frames
959 /// received for that stream will result in a connection level protocol
960 /// error, forcing the connection to terminate.
961 ///
962 /// The default value is 30 seconds.
963 ///
964 /// # Examples
965 ///
966 /// ```
967 /// # use tokio::io::{AsyncRead, AsyncWrite};
968 /// # use h2::client::*;
969 /// # use std::time::Duration;
970 /// # use bytes::Bytes;
971 /// #
972 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
973 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
974 /// # {
975 /// // `client_fut` is a future representing the completion of the HTTP/2
976 /// // handshake.
977 /// let client_fut = Builder::new()
978 /// .reset_stream_duration(Duration::from_secs(10))
979 /// .handshake(my_io);
980 /// # client_fut.await
981 /// # }
982 /// #
983 /// # pub fn main() {}
984 /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self985 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
986 self.reset_stream_duration = dur;
987 self
988 }
989
990 /// Sets the maximum number of local resets due to protocol errors made by the remote end.
991 ///
992 /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
993 /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
994 /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
995 ///
996 /// When the number of local resets exceeds this threshold, the client will close the connection.
997 ///
998 /// If you really want to disable this, supply [`Option::None`] here.
999 /// Disabling this is not recommended and may expose you to DOS attacks.
1000 ///
1001 /// The default value is currently 1024, but could change.
max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self1002 pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1003 self.local_max_error_reset_streams = max;
1004 self
1005 }
1006
1007 /// Sets the maximum number of pending-accept remotely-reset streams.
1008 ///
1009 /// Streams that have been received by the peer, but not accepted by the
1010 /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1011 /// could send a request and then shortly after, realize it is not needed,
1012 /// sending a CANCEL.
1013 ///
1014 /// However, since those streams are now "closed", they don't count towards
1015 /// the max concurrent streams. So, they will sit in the accept queue,
1016 /// using memory.
1017 ///
1018 /// When the number of remotely-reset streams sitting in the pending-accept
1019 /// queue reaches this maximum value, a connection error with the code of
1020 /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1021 /// `Future`.
1022 ///
1023 /// The default value is currently 20, but could change.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// # use tokio::io::{AsyncRead, AsyncWrite};
1029 /// # use h2::client::*;
1030 /// # use bytes::Bytes;
1031 /// #
1032 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1033 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1034 /// # {
1035 /// // `client_fut` is a future representing the completion of the HTTP/2
1036 /// // handshake.
1037 /// let client_fut = Builder::new()
1038 /// .max_pending_accept_reset_streams(100)
1039 /// .handshake(my_io);
1040 /// # client_fut.await
1041 /// # }
1042 /// #
1043 /// # pub fn main() {}
1044 /// ```
max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self1045 pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1046 self.pending_accept_reset_stream_max = max;
1047 self
1048 }
1049
1050 /// Sets the maximum send buffer size per stream.
1051 ///
1052 /// Once a stream has buffered up to (or over) the maximum, the stream's
1053 /// flow control will not "poll" additional capacity. Once bytes for the
1054 /// stream have been written to the connection, the send buffer capacity
1055 /// will be freed up again.
1056 ///
1057 /// The default is currently ~400KB, but may change.
1058 ///
1059 /// # Panics
1060 ///
1061 /// This function panics if `max` is larger than `u32::MAX`.
max_send_buffer_size(&mut self, max: usize) -> &mut Self1062 pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1063 assert!(max <= std::u32::MAX as usize);
1064 self.max_send_buffer_size = max;
1065 self
1066 }
1067
1068 /// Enables or disables server push promises.
1069 ///
1070 /// This value is included in the initial SETTINGS handshake.
/// Setting this value to `false` in the initial SETTINGS handshake
/// guarantees that the remote server will never send a push promise.
1074 ///
1075 /// This setting can be changed during the life of a single HTTP/2
1076 /// connection by sending another settings frame updating the value.
1077 ///
1078 /// Default value: `true`.
1079 ///
1080 /// # Examples
1081 ///
1082 /// ```
1083 /// # use tokio::io::{AsyncRead, AsyncWrite};
1084 /// # use h2::client::*;
1085 /// # use std::time::Duration;
1086 /// # use bytes::Bytes;
1087 /// #
1088 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1089 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1090 /// # {
1091 /// // `client_fut` is a future representing the completion of the HTTP/2
1092 /// // handshake.
1093 /// let client_fut = Builder::new()
1094 /// .enable_push(false)
1095 /// .handshake(my_io);
1096 /// # client_fut.await
1097 /// # }
1098 /// #
1099 /// # pub fn main() {}
1100 /// ```
enable_push(&mut self, enabled: bool) -> &mut Self1101 pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1102 self.settings.set_enable_push(enabled);
1103 self
1104 }
1105
1106 /// Sets the header table size.
1107 ///
1108 /// This setting informs the peer of the maximum size of the header compression
1109 /// table used to encode header blocks, in octets. The encoder may select any value
1110 /// equal to or less than the header table size specified by the sender.
1111 ///
1112 /// The default value is 4,096.
1113 ///
1114 /// # Examples
1115 ///
1116 /// ```
1117 /// # use tokio::io::{AsyncRead, AsyncWrite};
1118 /// # use h2::client::*;
1119 /// # use bytes::Bytes;
1120 /// #
1121 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1122 /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1123 /// # {
1124 /// // `client_fut` is a future representing the completion of the HTTP/2
1125 /// // handshake.
1126 /// let client_fut = Builder::new()
1127 /// .header_table_size(1_000_000)
1128 /// .handshake(my_io);
1129 /// # client_fut.await
1130 /// # }
1131 /// #
1132 /// # pub fn main() {}
1133 /// ```
header_table_size(&mut self, size: u32) -> &mut Self1134 pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1135 self.settings.set_header_table_size(Some(size));
1136 self
1137 }
1138
1139 /// Sets the first stream ID to something other than 1.
1140 #[cfg(feature = "unstable")]
initial_stream_id(&mut self, stream_id: u32) -> &mut Self1141 pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1142 self.stream_id = stream_id.into();
1143 assert!(
1144 self.stream_id.is_client_initiated(),
1145 "stream id must be odd"
1146 );
1147 self
1148 }
1149
1150 /// Creates a new configured HTTP/2 client backed by `io`.
1151 ///
1152 /// It is expected that `io` already be in an appropriate state to commence
1153 /// the [HTTP/2 handshake]. The handshake is completed once both the connection
/// preface and the initial settings frame are sent by the client.
1155 ///
1156 /// The handshake future does not wait for the initial settings frame from the
1157 /// server.
1158 ///
1159 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1160 /// tuple once the HTTP/2 handshake has been completed.
1161 ///
1162 /// This function also allows the caller to configure the send payload data
1163 /// type. See [Outbound data type] for more details.
1164 ///
1165 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1166 /// [`Connection`]: struct.Connection.html
1167 /// [`SendRequest`]: struct.SendRequest.html
/// [Outbound data type]: ../index.html#outbound-data-type
1169 ///
1170 /// # Examples
1171 ///
1172 /// Basic usage:
1173 ///
1174 /// ```
1175 /// # use tokio::io::{AsyncRead, AsyncWrite};
1176 /// # use h2::client::*;
1177 /// # use bytes::Bytes;
1178 /// #
1179 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// #    -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1181 /// # {
1182 /// // `client_fut` is a future representing the completion of the HTTP/2
1183 /// // handshake.
1184 /// let client_fut = Builder::new()
1185 /// .handshake(my_io);
1186 /// # client_fut.await
1187 /// # }
1188 /// #
1189 /// # pub fn main() {}
1190 /// ```
1191 ///
1192 /// Configures the send-payload data type. In this case, the outbound data
1193 /// type will be `&'static [u8]`.
1194 ///
1195 /// ```
1196 /// # use tokio::io::{AsyncRead, AsyncWrite};
1197 /// # use h2::client::*;
1198 /// #
1199 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1200 /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1201 /// # {
1202 /// // `client_fut` is a future representing the completion of the HTTP/2
1203 /// // handshake.
1204 /// let client_fut = Builder::new()
1205 /// .handshake::<_, &'static [u8]>(my_io);
1206 /// # client_fut.await
1207 /// # }
1208 /// #
1209 /// # pub fn main() {}
1210 /// ```
handshake<T, B>( &self, io: T, ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> where T: AsyncRead + AsyncWrite + Unpin, B: Buf,1211 pub fn handshake<T, B>(
1212 &self,
1213 io: T,
1214 ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1215 where
1216 T: AsyncRead + AsyncWrite + Unpin,
1217 B: Buf,
1218 {
1219 Connection::handshake2(io, self.clone())
1220 }
1221 }
1222
1223 impl Default for Builder {
default() -> Builder1224 fn default() -> Builder {
1225 Builder::new()
1226 }
1227 }
1228
1229 /// Creates a new configured HTTP/2 client with default configuration
1230 /// values backed by `io`.
1231 ///
1232 /// It is expected that `io` already be in an appropriate state to commence
1233 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1234 ///
1235 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1236 /// tuple once the HTTP/2 handshake has been completed. The returned
1237 /// [`Connection`] instance will be using default configuration values. Use
1238 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1239 /// instance.
1240 ///
1241 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1242 /// [Handshake]: ../index.html#handshake
1243 /// [`Connection`]: struct.Connection.html
1244 /// [`SendRequest`]: struct.SendRequest.html
1245 ///
1246 /// # Examples
1247 ///
1248 /// ```
1249 /// # use tokio::io::{AsyncRead, AsyncWrite};
1250 /// # use h2::client;
1251 /// # use h2::client::*;
1252 /// #
1253 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1254 /// # {
1255 /// let (send_request, connection) = client::handshake(my_io).await?;
1256 /// // The HTTP/2 handshake has completed, now start polling
1257 /// // `connection` and use `send_request` to send requests to the
1258 /// // server.
1259 /// # Ok(())
1260 /// # }
1261 /// #
1262 /// # pub fn main() {}
1263 /// ```
handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> where T: AsyncRead + AsyncWrite + Unpin,1264 pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1265 where
1266 T: AsyncRead + AsyncWrite + Unpin,
1267 {
1268 let builder = Builder::new();
1269 builder
1270 .handshake(io)
1271 .instrument(tracing::trace_span!("client_handshake"))
1272 .await
1273 }
1274
1275 // ===== impl Connection =====
1276
bind_connection<T>(io: &mut T) -> Result<(), crate::Error> where T: AsyncRead + AsyncWrite + Unpin,1277 async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1278 where
1279 T: AsyncRead + AsyncWrite + Unpin,
1280 {
1281 tracing::debug!("binding client connection");
1282
1283 let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1284 io.write_all(msg).await.map_err(crate::Error::from_io)?;
1285
1286 tracing::debug!("client connection bound");
1287
1288 Ok(())
1289 }
1290
1291 impl<T, B> Connection<T, B>
1292 where
1293 T: AsyncRead + AsyncWrite + Unpin,
1294 B: Buf,
1295 {
handshake2( mut io: T, builder: Builder, ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error>1296 async fn handshake2(
1297 mut io: T,
1298 builder: Builder,
1299 ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1300 bind_connection(&mut io).await?;
1301
1302 // Create the codec
1303 let mut codec = Codec::new(io);
1304
1305 if let Some(max) = builder.settings.max_frame_size() {
1306 codec.set_max_recv_frame_size(max as usize);
1307 }
1308
1309 if let Some(max) = builder.settings.max_header_list_size() {
1310 codec.set_max_recv_header_list_size(max as usize);
1311 }
1312
1313 // Send initial settings frame
1314 codec
1315 .buffer(builder.settings.clone().into())
1316 .expect("invalid SETTINGS frame");
1317
1318 let inner = proto::Connection::new(
1319 codec,
1320 proto::Config {
1321 next_stream_id: builder.stream_id,
1322 initial_max_send_streams: builder.initial_max_send_streams,
1323 max_send_buffer_size: builder.max_send_buffer_size,
1324 reset_stream_duration: builder.reset_stream_duration,
1325 reset_stream_max: builder.reset_stream_max,
1326 remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1327 local_error_reset_streams_max: builder.local_max_error_reset_streams,
1328 settings: builder.settings.clone(),
1329 },
1330 );
1331 let send_request = SendRequest {
1332 inner: inner.streams().clone(),
1333 pending: None,
1334 };
1335
1336 let mut connection = Connection { inner };
1337 if let Some(sz) = builder.initial_target_connection_window_size {
1338 connection.set_target_window_size(sz);
1339 }
1340
1341 Ok((send_request, connection))
1342 }
1343
1344 /// Sets the target window size for the whole connection.
1345 ///
1346 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1347 /// frame will be immediately sent to the remote, increasing the connection
1348 /// level window by `size - current_value`.
1349 ///
1350 /// If `size` is less than the current value, nothing will happen
1351 /// immediately. However, as window capacity is released by
1352 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1353 /// out until the number of "in flight" bytes drops below `size`.
1354 ///
1355 /// The default value is 65,535.
1356 ///
1357 /// See [`FlowControl`] documentation for more details.
1358 ///
1359 /// [`FlowControl`]: ../struct.FlowControl.html
1360 /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)1361 pub fn set_target_window_size(&mut self, size: u32) {
1362 assert!(size <= proto::MAX_WINDOW_SIZE);
1363 self.inner.set_target_window_size(size);
1364 }
1365
1366 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1367 /// flow control for received data.
1368 ///
1369 /// The `SETTINGS` will be sent to the remote, and only applied once the
1370 /// remote acknowledges the change.
1371 ///
1372 /// This can be used to increase or decrease the window size for existing
1373 /// streams.
1374 ///
1375 /// # Errors
1376 ///
1377 /// Returns an error if a previous call is still pending acknowledgement
1378 /// from the remote endpoint.
set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error>1379 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1380 assert!(size <= proto::MAX_WINDOW_SIZE);
1381 self.inner.set_initial_window_size(size)?;
1382 Ok(())
1383 }
1384
1385 /// Takes a `PingPong` instance from the connection.
1386 ///
1387 /// # Note
1388 ///
1389 /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>1390 pub fn ping_pong(&mut self) -> Option<PingPong> {
1391 self.inner.take_user_pings().map(PingPong::new)
1392 }
1393
1394 /// Returns the maximum number of concurrent streams that may be initiated
1395 /// by this client.
1396 ///
1397 /// This limit is configured by the server peer by sending the
1398 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1399 /// This method returns the currently acknowledged value received from the
1400 /// remote.
1401 ///
1402 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
max_concurrent_send_streams(&self) -> usize1403 pub fn max_concurrent_send_streams(&self) -> usize {
1404 self.inner.max_send_streams()
1405 }
1406 /// Returns the maximum number of concurrent streams that may be initiated
1407 /// by the server on this connection.
1408 ///
1409 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1410 /// parameter][1] sent in a `SETTINGS` frame that has been
1411 /// acknowledged by the remote peer. The value to be sent is configured by
1412 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1413 /// with the remote peer.
1414 ///
1415 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1416 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
max_concurrent_recv_streams(&self) -> usize1417 pub fn max_concurrent_recv_streams(&self) -> usize {
1418 self.inner.max_recv_streams()
1419 }
1420 }
1421
1422 impl<T, B> Future for Connection<T, B>
1423 where
1424 T: AsyncRead + AsyncWrite + Unpin,
1425 B: Buf,
1426 {
1427 type Output = Result<(), crate::Error>;
1428
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1429 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1430 self.inner.maybe_close_connection_if_no_streams();
1431 self.inner.poll(cx).map_err(Into::into)
1432 }
1433 }
1434
1435 impl<T, B> fmt::Debug for Connection<T, B>
1436 where
1437 T: AsyncRead + AsyncWrite,
1438 T: fmt::Debug,
1439 B: fmt::Debug + Buf,
1440 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1441 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1442 fmt::Debug::fmt(&self.inner, fmt)
1443 }
1444 }
1445
1446 // ===== impl ResponseFuture =====
1447
1448 impl Future for ResponseFuture {
1449 type Output = Result<Response<RecvStream>, crate::Error>;
1450
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1451 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1452 let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1453 let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1454
1455 Poll::Ready(Ok(Response::from_parts(parts, body)))
1456 }
1457 }
1458
1459 impl ResponseFuture {
1460 /// Returns the stream ID of the response stream.
1461 ///
1462 /// # Panics
1463 ///
1464 /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1465 pub fn stream_id(&self) -> crate::StreamId {
1466 crate::StreamId::from_internal(self.inner.stream_id())
1467 }
1468 /// Returns a stream of PushPromises
1469 ///
1470 /// # Panics
1471 ///
1472 /// If this method has been called before
1473 /// or the stream was itself was pushed
push_promises(&mut self) -> PushPromises1474 pub fn push_promises(&mut self) -> PushPromises {
1475 if self.push_promise_consumed {
1476 panic!("Reference to push promises stream taken!");
1477 }
1478 self.push_promise_consumed = true;
1479 PushPromises {
1480 inner: self.inner.clone(),
1481 }
1482 }
1483 }
1484
1485 // ===== impl PushPromises =====
1486
1487 impl PushPromises {
1488 /// Get the next `PushPromise`.
push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>>1489 pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1490 futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1491 }
1492
1493 #[doc(hidden)]
poll_push_promise( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<PushPromise, crate::Error>>>1494 pub fn poll_push_promise(
1495 &mut self,
1496 cx: &mut Context<'_>,
1497 ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1498 match self.inner.poll_pushed(cx) {
1499 Poll::Ready(Some(Ok((request, response)))) => {
1500 let response = PushedResponseFuture {
1501 inner: ResponseFuture {
1502 inner: response,
1503 push_promise_consumed: false,
1504 },
1505 };
1506 Poll::Ready(Some(Ok(PushPromise { request, response })))
1507 }
1508 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1509 Poll::Ready(None) => Poll::Ready(None),
1510 Poll::Pending => Poll::Pending,
1511 }
1512 }
1513 }
1514
/// `Stream` adapter over [`PushPromises::poll_push_promise`].
#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let promises = self.get_mut();
        promises.poll_push_promise(cx)
    }
}
1523
1524 // ===== impl PushPromise =====
1525
1526 impl PushPromise {
1527 /// Returns a reference to the push promise's request headers.
request(&self) -> &Request<()>1528 pub fn request(&self) -> &Request<()> {
1529 &self.request
1530 }
1531
1532 /// Returns a mutable reference to the push promise's request headers.
request_mut(&mut self) -> &mut Request<()>1533 pub fn request_mut(&mut self) -> &mut Request<()> {
1534 &mut self.request
1535 }
1536
1537 /// Consumes `self`, returning the push promise's request headers and
1538 /// response future.
into_parts(self) -> (Request<()>, PushedResponseFuture)1539 pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1540 (self.request, self.response)
1541 }
1542 }
1543
1544 // ===== impl PushedResponseFuture =====
1545
1546 impl Future for PushedResponseFuture {
1547 type Output = Result<Response<RecvStream>, crate::Error>;
1548
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1549 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1550 Pin::new(&mut self.inner).poll(cx)
1551 }
1552 }
1553
1554 impl PushedResponseFuture {
1555 /// Returns the stream ID of the response stream.
1556 ///
1557 /// # Panics
1558 ///
1559 /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1560 pub fn stream_id(&self) -> crate::StreamId {
1561 self.inner.stream_id()
1562 }
1563 }
1564
1565 // ===== impl Peer =====
1566
1567 impl Peer {
convert_send_message( id: StreamId, request: Request<()>, protocol: Option<Protocol>, end_of_stream: bool, ) -> Result<Headers, SendError>1568 pub fn convert_send_message(
1569 id: StreamId,
1570 request: Request<()>,
1571 protocol: Option<Protocol>,
1572 end_of_stream: bool,
1573 ) -> Result<Headers, SendError> {
1574 use http::request::Parts;
1575
1576 let (
1577 Parts {
1578 method,
1579 uri,
1580 headers,
1581 version,
1582 ..
1583 },
1584 _,
1585 ) = request.into_parts();
1586
1587 let is_connect = method == Method::CONNECT;
1588
1589 // Build the set pseudo header set. All requests will include `method`
1590 // and `path`.
1591 let mut pseudo = Pseudo::request(method, uri, protocol);
1592
1593 if pseudo.scheme.is_none() {
1594 // If the scheme is not set, then there are a two options.
1595 //
1596 // 1) Authority is not set. In this case, a request was issued with
1597 // a relative URI. This is permitted **only** when forwarding
1598 // HTTP 1.x requests. If the HTTP version is set to 2.0, then
1599 // this is an error.
1600 //
1601 // 2) Authority is set, then the HTTP method *must* be CONNECT.
1602 //
1603 // It is not possible to have a scheme but not an authority set (the
1604 // `http` crate does not allow it).
1605 //
1606 if pseudo.authority.is_none() {
1607 if version == Version::HTTP_2 {
1608 return Err(UserError::MissingUriSchemeAndAuthority.into());
1609 } else {
1610 // This is acceptable as per the above comment. However,
1611 // HTTP/2 requires that a scheme is set. Since we are
1612 // forwarding an HTTP 1.1 request, the scheme is set to
1613 // "http".
1614 pseudo.set_scheme(uri::Scheme::HTTP);
1615 }
1616 } else if !is_connect {
1617 // TODO: Error
1618 }
1619 }
1620
1621 // Create the HEADERS frame
1622 let mut frame = Headers::new(id, pseudo, headers);
1623
1624 if end_of_stream {
1625 frame.set_end_stream()
1626 }
1627
1628 Ok(frame)
1629 }
1630 }
1631
1632 impl proto::Peer for Peer {
1633 type Poll = Response<()>;
1634
1635 const NAME: &'static str = "Client";
1636
1637 fn r#dyn() -> proto::DynPeer {
1638 proto::DynPeer::Client
1639 }
1640
1641 /*
1642 fn is_server() -> bool {
1643 false
1644 }
1645 */
1646
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, Error>1647 fn convert_poll_message(
1648 pseudo: Pseudo,
1649 fields: HeaderMap,
1650 stream_id: StreamId,
1651 ) -> Result<Self::Poll, Error> {
1652 let mut b = Response::builder();
1653
1654 b = b.version(Version::HTTP_2);
1655
1656 if let Some(status) = pseudo.status {
1657 b = b.status(status);
1658 }
1659
1660 let mut response = match b.body(()) {
1661 Ok(response) => response,
1662 Err(_) => {
1663 // TODO: Should there be more specialized handling for different
1664 // kinds of errors
1665 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1666 }
1667 };
1668
1669 *response.headers_mut() = fields;
1670
1671 Ok(response)
1672 }
1673 }
1674