1 use crate::codec::UserError;
2 use crate::frame::{Reason, StreamId};
3 use crate::{client, server};
4 
5 use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
6 use crate::proto::*;
7 
8 use bytes::Bytes;
9 use futures_core::Stream;
10 use std::io;
11 use std::marker::PhantomData;
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14 use std::time::Duration;
15 use tokio::io::AsyncRead;
16 
17 /// An H2 connection
18 #[derive(Debug)]
19 pub(crate) struct Connection<T, P, B: Buf = Bytes>
20 where
21     P: Peer,
22 {
23     /// Read / write frame values
24     codec: Codec<T, Prioritized<B>>,
25 
26     inner: ConnectionInner<P, B>,
27 }
28 
29 // Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
30 // method instantiations.
31 #[derive(Debug)]
32 struct ConnectionInner<P, B: Buf = Bytes>
33 where
34     P: Peer,
35 {
36     /// Tracks the connection level state transitions.
37     state: State,
38 
39     /// An error to report back once complete.
40     ///
41     /// This exists separately from State in order to support
42     /// graceful shutdown.
43     error: Option<frame::GoAway>,
44 
45     /// Pending GOAWAY frames to write.
46     go_away: GoAway,
47 
48     /// Ping/pong handler
49     ping_pong: PingPong,
50 
51     /// Connection settings
52     settings: Settings,
53 
54     /// Stream state handler
55     streams: Streams<B, P>,
56 
57     /// A `tracing` span tracking the lifetime of the connection.
58     span: tracing::Span,
59 
60     /// Client or server
61     _phantom: PhantomData<P>,
62 }
63 
64 struct DynConnection<'a, B: Buf = Bytes> {
65     state: &'a mut State,
66 
67     go_away: &'a mut GoAway,
68 
69     streams: DynStreams<'a, B>,
70 
71     error: &'a mut Option<frame::GoAway>,
72 
73     ping_pong: &'a mut PingPong,
74 }
75 
76 #[derive(Debug, Clone)]
77 pub(crate) struct Config {
78     pub next_stream_id: StreamId,
79     pub initial_max_send_streams: usize,
80     pub max_send_buffer_size: usize,
81     pub reset_stream_duration: Duration,
82     pub reset_stream_max: usize,
83     pub remote_reset_stream_max: usize,
84     pub local_error_reset_streams_max: Option<usize>,
85     pub settings: frame::Settings,
86 }
87 
88 #[derive(Debug)]
89 enum State {
90     /// Currently open in a sane state
91     Open,
92 
93     /// The codec must be flushed
94     Closing(Reason, Initiator),
95 
96     /// In a closed state
97     Closed(Reason, Initiator),
98 }
99 
100 impl<T, P, B> Connection<T, P, B>
101 where
102     T: AsyncRead + AsyncWrite + Unpin,
103     P: Peer,
104     B: Buf,
105 {
new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B>106     pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107         fn streams_config(config: &Config) -> streams::Config {
108             streams::Config {
109                 initial_max_send_streams: config.initial_max_send_streams,
110                 local_max_buffer_size: config.max_send_buffer_size,
111                 local_next_stream_id: config.next_stream_id,
112                 local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
113                 extended_connect_protocol_enabled: config
114                     .settings
115                     .is_extended_connect_protocol_enabled()
116                     .unwrap_or(false),
117                 local_reset_duration: config.reset_stream_duration,
118                 local_reset_max: config.reset_stream_max,
119                 remote_reset_max: config.remote_reset_stream_max,
120                 remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
121                 remote_max_initiated: config
122                     .settings
123                     .max_concurrent_streams()
124                     .map(|max| max as usize),
125                 local_max_error_reset_streams: config.local_error_reset_streams_max,
126             }
127         }
128         let streams = Streams::new(streams_config(&config));
129         Connection {
130             codec,
131             inner: ConnectionInner {
132                 state: State::Open,
133                 error: None,
134                 go_away: GoAway::new(),
135                 ping_pong: PingPong::new(),
136                 settings: Settings::new(config.settings),
137                 streams,
138                 span: tracing::debug_span!("Connection", peer = %P::NAME),
139                 _phantom: PhantomData,
140             },
141         }
142     }
143 
144     /// connection flow control
set_target_window_size(&mut self, size: WindowSize)145     pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
146         let _res = self.inner.streams.set_target_connection_window_size(size);
147         // TODO: proper error handling
148         debug_assert!(_res.is_ok());
149     }
150 
151     /// Send a new SETTINGS frame with an updated initial window size.
set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError>152     pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
153         let mut settings = frame::Settings::default();
154         settings.set_initial_window_size(Some(size));
155         self.inner.settings.send_settings(settings)
156     }
157 
158     /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
set_enable_connect_protocol(&mut self) -> Result<(), UserError>159     pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
160         let mut settings = frame::Settings::default();
161         settings.set_enable_connect_protocol(Some(1));
162         self.inner.settings.send_settings(settings)
163     }
164 
165     /// Returns the maximum number of concurrent streams that may be initiated
166     /// by this peer.
max_send_streams(&self) -> usize167     pub(crate) fn max_send_streams(&self) -> usize {
168         self.inner.streams.max_send_streams()
169     }
170 
171     /// Returns the maximum number of concurrent streams that may be initiated
172     /// by the remote peer.
max_recv_streams(&self) -> usize173     pub(crate) fn max_recv_streams(&self) -> usize {
174         self.inner.streams.max_recv_streams()
175     }
176 
177     #[cfg(feature = "unstable")]
num_wired_streams(&self) -> usize178     pub fn num_wired_streams(&self) -> usize {
179         self.inner.streams.num_wired_streams()
180     }
181 
182     /// Returns `Ready` when the connection is ready to receive a frame.
183     ///
184     /// Returns `Error` as this may raise errors that are caused by delayed
185     /// processing of received frames.
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>186     fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
187         let _e = self.inner.span.enter();
188         let span = tracing::trace_span!("poll_ready");
189         let _e = span.enter();
190         // The order of these calls don't really matter too much
191         ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
192         ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
193         ready!(self
194             .inner
195             .settings
196             .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
197         ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
198 
199         Poll::Ready(Ok(()))
200     }
201 
202     /// Send any pending GOAWAY frames.
203     ///
204     /// This will return `Some(reason)` if the connection should be closed
205     /// afterwards. If this is a graceful shutdown, this returns `None`.
poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>>206     fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
207         self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
208     }
209 
go_away_from_user(&mut self, e: Reason)210     pub fn go_away_from_user(&mut self, e: Reason) {
211         self.inner.as_dyn().go_away_from_user(e)
212     }
213 
take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error>214     fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
215         let (debug_data, theirs) = self
216             .inner
217             .error
218             .take()
219             .as_ref()
220             .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
221                 (frame.debug_data().clone(), frame.reason())
222             });
223 
224         match (ours, theirs) {
225             (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
226             (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
227             // If both sides reported an error, give their
228             // error back to th user. We assume our error
229             // was a consequence of their error, and less
230             // important.
231             (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
232         }
233     }
234 
235     /// Closes the connection by transitioning to a GOAWAY state
236     /// iff there are no streams or references
maybe_close_connection_if_no_streams(&mut self)237     pub fn maybe_close_connection_if_no_streams(&mut self) {
238         // If we poll() and realize that there are no streams or references
239         // then we can close the connection by transitioning to GOAWAY
240         if !self.inner.streams.has_streams_or_other_references() {
241             self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
242         }
243     }
244 
take_user_pings(&mut self) -> Option<UserPings>245     pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
246         self.inner.ping_pong.take_user_pings()
247     }
248 
249     /// Advances the internal state of the connection.
poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>250     pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
251         // XXX(eliza): cloning the span is unfortunately necessary here in
252         // order to placate the borrow checker — `self` is mutably borrowed by
253         // `poll2`, which means that we can't borrow `self.span` to enter it.
254         // The clone is just an atomic ref bump.
255         let span = self.inner.span.clone();
256         let _e = span.enter();
257         let span = tracing::trace_span!("poll");
258         let _e = span.enter();
259 
260         loop {
261             tracing::trace!(connection.state = ?self.inner.state);
262             // TODO: probably clean up this glob of code
263             match self.inner.state {
264                 // When open, continue to poll a frame
265                 State::Open => {
266                     let result = match self.poll2(cx) {
267                         Poll::Ready(result) => result,
268                         // The connection is not ready to make progress
269                         Poll::Pending => {
270                             // Ensure all window updates have been sent.
271                             //
272                             // This will also handle flushing `self.codec`
273                             ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
274 
275                             if (self.inner.error.is_some()
276                                 || self.inner.go_away.should_close_on_idle())
277                                 && !self.inner.streams.has_streams()
278                             {
279                                 self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
280                                 continue;
281                             }
282 
283                             return Poll::Pending;
284                         }
285                     };
286 
287                     self.inner.as_dyn().handle_poll2_result(result)?
288                 }
289                 State::Closing(reason, initiator) => {
290                     tracing::trace!("connection closing after flush");
291                     // Flush/shutdown the codec
292                     ready!(self.codec.shutdown(cx))?;
293 
294                     // Transition the state to error
295                     self.inner.state = State::Closed(reason, initiator);
296                 }
297                 State::Closed(reason, initiator) => {
298                     return Poll::Ready(self.take_error(reason, initiator));
299                 }
300             }
301         }
302     }
303 
poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>>304     fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
305         // This happens outside of the loop to prevent needing to do a clock
306         // check and then comparison of the queue possibly multiple times a
307         // second (and thus, the clock wouldn't have changed enough to matter).
308         self.clear_expired_reset_streams();
309 
310         loop {
311             // First, ensure that the `Connection` is able to receive a frame
312             //
313             // The order here matters:
314             // - poll_go_away may buffer a graceful shutdown GOAWAY frame
315             // - If it has, we've also added a PING to be sent in poll_ready
316             if let Some(reason) = ready!(self.poll_go_away(cx)?) {
317                 if self.inner.go_away.should_close_now() {
318                     if self.inner.go_away.is_user_initiated() {
319                         // A user initiated abrupt shutdown shouldn't return
320                         // the same error back to the user.
321                         return Poll::Ready(Ok(()));
322                     } else {
323                         return Poll::Ready(Err(Error::library_go_away(reason)));
324                     }
325                 }
326                 // Only NO_ERROR should be waiting for idle
327                 debug_assert_eq!(
328                     reason,
329                     Reason::NO_ERROR,
330                     "graceful GOAWAY should be NO_ERROR"
331                 );
332             }
333             ready!(self.poll_ready(cx))?;
334 
335             match self
336                 .inner
337                 .as_dyn()
338                 .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
339             {
340                 ReceivedFrame::Settings(frame) => {
341                     self.inner.settings.recv_settings(
342                         frame,
343                         &mut self.codec,
344                         &mut self.inner.streams,
345                     )?;
346                 }
347                 ReceivedFrame::Continue => (),
348                 ReceivedFrame::Done => {
349                     return Poll::Ready(Ok(()));
350                 }
351             }
352         }
353     }
354 
clear_expired_reset_streams(&mut self)355     fn clear_expired_reset_streams(&mut self) {
356         self.inner.streams.clear_expired_reset_streams();
357     }
358 }
359 
360 impl<P, B> ConnectionInner<P, B>
361 where
362     P: Peer,
363     B: Buf,
364 {
as_dyn(&mut self) -> DynConnection<'_, B>365     fn as_dyn(&mut self) -> DynConnection<'_, B> {
366         let ConnectionInner {
367             state,
368             go_away,
369             streams,
370             error,
371             ping_pong,
372             ..
373         } = self;
374         let streams = streams.as_dyn();
375         DynConnection {
376             state,
377             go_away,
378             streams,
379             error,
380             ping_pong,
381         }
382     }
383 }
384 
385 impl<B> DynConnection<'_, B>
386 where
387     B: Buf,
388 {
go_away(&mut self, id: StreamId, e: Reason)389     fn go_away(&mut self, id: StreamId, e: Reason) {
390         let frame = frame::GoAway::new(id, e);
391         self.streams.send_go_away(id);
392         self.go_away.go_away(frame);
393     }
394 
go_away_now(&mut self, e: Reason)395     fn go_away_now(&mut self, e: Reason) {
396         let last_processed_id = self.streams.last_processed_id();
397         let frame = frame::GoAway::new(last_processed_id, e);
398         self.go_away.go_away_now(frame);
399     }
400 
go_away_now_data(&mut self, e: Reason, data: Bytes)401     fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
402         let last_processed_id = self.streams.last_processed_id();
403         let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
404         self.go_away.go_away_now(frame);
405     }
406 
go_away_from_user(&mut self, e: Reason)407     fn go_away_from_user(&mut self, e: Reason) {
408         let last_processed_id = self.streams.last_processed_id();
409         let frame = frame::GoAway::new(last_processed_id, e);
410         self.go_away.go_away_from_user(frame);
411 
412         // Notify all streams of reason we're abruptly closing.
413         self.streams.handle_error(Error::user_go_away(e));
414     }
415 
handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error>416     fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
417         match result {
418             // The connection has shutdown normally
419             Ok(()) => {
420                 *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
421                 Ok(())
422             }
423             // Attempting to read a frame resulted in a connection level
424             // error. This is handled by setting a GOAWAY frame followed by
425             // terminating the connection.
426             Err(Error::GoAway(debug_data, reason, initiator)) => {
427                 let e = Error::GoAway(debug_data.clone(), reason, initiator);
428                 tracing::debug!(error = ?e, "Connection::poll; connection error");
429 
430                 // We may have already sent a GOAWAY for this error,
431                 // if so, don't send another, just flush and close up.
432                 if self
433                     .go_away
434                     .going_away()
435                     .map_or(false, |frame| frame.reason() == reason)
436                 {
437                     tracing::trace!("    -> already going away");
438                     *self.state = State::Closing(reason, initiator);
439                     return Ok(());
440                 }
441 
442                 // Reset all active streams
443                 self.streams.handle_error(e);
444                 self.go_away_now_data(reason, debug_data);
445                 Ok(())
446             }
447             // Attempting to read a frame resulted in a stream level error.
448             // This is handled by resetting the frame then trying to read
449             // another frame.
450             Err(Error::Reset(id, reason, initiator)) => {
451                 debug_assert_eq!(initiator, Initiator::Library);
452                 tracing::trace!(?id, ?reason, "stream error");
453                 self.streams.send_reset(id, reason);
454                 Ok(())
455             }
456             // Attempting to read a frame resulted in an I/O error. All
457             // active streams must be reset.
458             //
459             // TODO: Are I/O errors recoverable?
460             Err(Error::Io(kind, inner)) => {
461                 tracing::debug!(error = ?kind, "Connection::poll; IO error");
462                 let e = Error::Io(kind, inner);
463 
464                 // Reset all active streams
465                 self.streams.handle_error(e.clone());
466 
467                 // Some client implementations drop the connections without notifying its peer
468                 // Attempting to read after the client dropped the connection results in UnexpectedEof
469                 // If as a server, we don't have anything more to send, just close the connection
470                 // without error
471                 //
472                 // See https://github.com/hyperium/hyper/issues/3427
473                 if self.streams.is_server()
474                     && self.streams.is_buffer_empty()
475                     && matches!(kind, io::ErrorKind::UnexpectedEof)
476                 {
477                     *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
478                     return Ok(());
479                 }
480 
481                 // Return the error
482                 Err(e)
483             }
484         }
485     }
486 
recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error>487     fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
488         use crate::frame::Frame::*;
489         match frame {
490             Some(Headers(frame)) => {
491                 tracing::trace!(?frame, "recv HEADERS");
492                 self.streams.recv_headers(frame)?;
493             }
494             Some(Data(frame)) => {
495                 tracing::trace!(?frame, "recv DATA");
496                 self.streams.recv_data(frame)?;
497             }
498             Some(Reset(frame)) => {
499                 tracing::trace!(?frame, "recv RST_STREAM");
500                 self.streams.recv_reset(frame)?;
501             }
502             Some(PushPromise(frame)) => {
503                 tracing::trace!(?frame, "recv PUSH_PROMISE");
504                 self.streams.recv_push_promise(frame)?;
505             }
506             Some(Settings(frame)) => {
507                 tracing::trace!(?frame, "recv SETTINGS");
508                 return Ok(ReceivedFrame::Settings(frame));
509             }
510             Some(GoAway(frame)) => {
511                 tracing::trace!(?frame, "recv GOAWAY");
512                 // This should prevent starting new streams,
513                 // but should allow continuing to process current streams
514                 // until they are all EOS. Once they are, State should
515                 // transition to GoAway.
516                 self.streams.recv_go_away(&frame)?;
517                 *self.error = Some(frame);
518             }
519             Some(Ping(frame)) => {
520                 tracing::trace!(?frame, "recv PING");
521                 let status = self.ping_pong.recv_ping(frame);
522                 if status.is_shutdown() {
523                     assert!(
524                         self.go_away.is_going_away(),
525                         "received unexpected shutdown ping"
526                     );
527 
528                     let last_processed_id = self.streams.last_processed_id();
529                     self.go_away(last_processed_id, Reason::NO_ERROR);
530                 }
531             }
532             Some(WindowUpdate(frame)) => {
533                 tracing::trace!(?frame, "recv WINDOW_UPDATE");
534                 self.streams.recv_window_update(frame)?;
535             }
536             Some(Priority(frame)) => {
537                 tracing::trace!(?frame, "recv PRIORITY");
538                 // TODO: handle
539             }
540             None => {
541                 tracing::trace!("codec closed");
542                 self.streams.recv_eof(false).expect("mutex poisoned");
543                 return Ok(ReceivedFrame::Done);
544             }
545         }
546         Ok(ReceivedFrame::Continue)
547     }
548 }
549 
550 enum ReceivedFrame {
551     Settings(frame::Settings),
552     Continue,
553     Done,
554 }
555 
556 impl<T, B> Connection<T, client::Peer, B>
557 where
558     T: AsyncRead + AsyncWrite,
559     B: Buf,
560 {
streams(&self) -> &Streams<B, client::Peer>561     pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
562         &self.inner.streams
563     }
564 }
565 
566 impl<T, B> Connection<T, server::Peer, B>
567 where
568     T: AsyncRead + AsyncWrite + Unpin,
569     B: Buf,
570 {
next_incoming(&mut self) -> Option<StreamRef<B>>571     pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
572         self.inner.streams.next_incoming()
573     }
574 
575     // Graceful shutdown only makes sense for server peers.
go_away_gracefully(&mut self)576     pub fn go_away_gracefully(&mut self) {
577         if self.inner.go_away.is_going_away() {
578             // No reason to start a new one.
579             return;
580         }
581 
582         // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
583         //
584         // > A server that is attempting to gracefully shut down a connection
585         // > SHOULD send an initial GOAWAY frame with the last stream
586         // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
587         // > client that a shutdown is imminent and that initiating further
588         // > requests is prohibited. After allowing time for any in-flight
589         // > stream creation (at least one round-trip time), the server can
590         // > send another GOAWAY frame with an updated last stream identifier.
591         // > This ensures that a connection can be cleanly shut down without
592         // > losing requests.
593         self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
594 
595         // We take the advice of waiting 1 RTT literally, and wait
596         // for a pong before proceeding.
597         self.inner.ping_pong.ping_shutdown();
598     }
599 }
600 
601 impl<T, P, B> Drop for Connection<T, P, B>
602 where
603     P: Peer,
604     B: Buf,
605 {
drop(&mut self)606     fn drop(&mut self) {
607         // Ignore errors as this indicates that the mutex is poisoned.
608         let _ = self.inner.streams.recv_eof(true);
609     }
610 }
611