1 use std::io;
2 
3 use crate::codec::UserError;
4 use crate::frame::{self, Reason, StreamId};
5 use crate::proto::{self, Error, Initiator, PollReset};
6 
7 use self::Inner::*;
8 use self::Peer::*;
9 
10 /// Represents the state of an H2 stream
11 ///
12 /// ```not_rust
13 ///                              +--------+
14 ///                      send PP |        | recv PP
15 ///                     ,--------|  idle  |--------.
16 ///                    /         |        |         \
17 ///                   v          +--------+          v
18 ///            +----------+          |           +----------+
19 ///            |          |          | send H /  |          |
20 ///     ,------| reserved |          | recv H    | reserved |------.
21 ///     |      | (local)  |          |           | (remote) |      |
22 ///     |      +----------+          v           +----------+      |
23 ///     |          |             +--------+             |          |
24 ///     |          |     recv ES |        | send ES     |          |
25 ///     |   send H |     ,-------|  open  |-------.     | recv H   |
26 ///     |          |    /        |        |        \    |          |
27 ///     |          v   v         +--------+         v   v          |
28 ///     |      +----------+          |           +----------+      |
29 ///     |      |   half   |          |           |   half   |      |
30 ///     |      |  closed  |          | send R /  |  closed  |      |
31 ///     |      | (remote) |          | recv R    | (local)  |      |
32 ///     |      +----------+          |           +----------+      |
33 ///     |           |                |                 |           |
34 ///     |           | send ES /      |       recv ES / |           |
35 ///     |           | send R /       v        send R / |           |
36 ///     |           | recv R     +--------+   recv R   |           |
37 ///     | send R /  `----------->|        |<-----------'  send R / |
38 ///     | recv R                 | closed |               recv R   |
39 ///     `----------------------->|        |<----------------------'
40 ///                              +--------+
41 ///
42 ///        send:   endpoint sends this frame
43 ///        recv:   endpoint receives this frame
44 ///
45 ///        H:  HEADERS frame (with implied CONTINUATIONs)
46 ///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
47 ///        ES: END_STREAM flag
48 ///        R:  RST_STREAM frame
49 /// ```
50 #[derive(Debug, Clone)]
51 pub struct State {
52     inner: Inner,
53 }
54 
55 #[derive(Debug, Clone)]
56 enum Inner {
57     Idle,
58     // TODO: these states shouldn't count against concurrency limits:
59     ReservedLocal,
60     ReservedRemote,
61     Open { local: Peer, remote: Peer },
62     HalfClosedLocal(Peer), // TODO: explicitly name this value
63     HalfClosedRemote(Peer),
64     Closed(Cause),
65 }
66 
67 #[derive(Debug, Copy, Clone, Default)]
68 enum Peer {
69     #[default]
70     AwaitingHeaders,
71     Streaming,
72 }
73 
74 #[derive(Debug, Clone)]
75 enum Cause {
76     EndStream,
77     Error(Error),
78 
79     /// This indicates to the connection that a reset frame must be sent out
80     /// once the send queue has been flushed.
81     ///
82     /// Examples of when this could happen:
83     /// - User drops all references to a stream, so we want to CANCEL the it.
84     /// - Header block size was too large, so we want to REFUSE, possibly
85     ///   after sending a 431 response frame.
86     ScheduledLibraryReset(Reason),
87 }
88 
89 impl State {
90     /// Opens the send-half of a stream if it is not already open.
send_open(&mut self, eos: bool) -> Result<(), UserError>91     pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
92         let local = Streaming;
93 
94         self.inner = match self.inner {
95             Idle => {
96                 if eos {
97                     HalfClosedLocal(AwaitingHeaders)
98                 } else {
99                     Open {
100                         local,
101                         remote: AwaitingHeaders,
102                     }
103                 }
104             }
105             Open {
106                 local: AwaitingHeaders,
107                 remote,
108             } => {
109                 if eos {
110                     HalfClosedLocal(remote)
111                 } else {
112                     Open { local, remote }
113                 }
114             }
115             HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
116                 if eos {
117                     Closed(Cause::EndStream)
118                 } else {
119                     HalfClosedRemote(local)
120                 }
121             }
122             _ => {
123                 // All other transitions result in a protocol error
124                 return Err(UserError::UnexpectedFrameType);
125             }
126         };
127 
128         Ok(())
129     }
130 
131     /// Opens the receive-half of the stream when a HEADERS frame is received.
132     ///
133     /// Returns true if this transitions the state to Open.
recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error>134     pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
135         let mut initial = false;
136         let eos = frame.is_end_stream();
137 
138         self.inner = match self.inner {
139             Idle => {
140                 initial = true;
141 
142                 if eos {
143                     HalfClosedRemote(AwaitingHeaders)
144                 } else {
145                     Open {
146                         local: AwaitingHeaders,
147                         remote: if frame.is_informational() {
148                             tracing::trace!("skipping 1xx response headers");
149                             AwaitingHeaders
150                         } else {
151                             Streaming
152                         },
153                     }
154                 }
155             }
156             ReservedRemote => {
157                 initial = true;
158 
159                 if eos {
160                     Closed(Cause::EndStream)
161                 } else if frame.is_informational() {
162                     tracing::trace!("skipping 1xx response headers");
163                     ReservedRemote
164                 } else {
165                     HalfClosedLocal(Streaming)
166                 }
167             }
168             Open {
169                 local,
170                 remote: AwaitingHeaders,
171             } => {
172                 if eos {
173                     HalfClosedRemote(local)
174                 } else {
175                     Open {
176                         local,
177                         remote: if frame.is_informational() {
178                             tracing::trace!("skipping 1xx response headers");
179                             AwaitingHeaders
180                         } else {
181                             Streaming
182                         },
183                     }
184                 }
185             }
186             HalfClosedLocal(AwaitingHeaders) => {
187                 if eos {
188                     Closed(Cause::EndStream)
189                 } else if frame.is_informational() {
190                     tracing::trace!("skipping 1xx response headers");
191                     HalfClosedLocal(AwaitingHeaders)
192                 } else {
193                     HalfClosedLocal(Streaming)
194                 }
195             }
196             ref state => {
197                 // All other transitions result in a protocol error
198                 proto_err!(conn: "recv_open: in unexpected state {:?}", state);
199                 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
200             }
201         };
202 
203         Ok(initial)
204     }
205 
206     /// Transition from Idle -> ReservedRemote
reserve_remote(&mut self) -> Result<(), Error>207     pub fn reserve_remote(&mut self) -> Result<(), Error> {
208         match self.inner {
209             Idle => {
210                 self.inner = ReservedRemote;
211                 Ok(())
212             }
213             ref state => {
214                 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
215                 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
216             }
217         }
218     }
219 
220     /// Transition from Idle -> ReservedLocal
reserve_local(&mut self) -> Result<(), UserError>221     pub fn reserve_local(&mut self) -> Result<(), UserError> {
222         match self.inner {
223             Idle => {
224                 self.inner = ReservedLocal;
225                 Ok(())
226             }
227             _ => Err(UserError::UnexpectedFrameType),
228         }
229     }
230 
231     /// Indicates that the remote side will not send more data to the local.
recv_close(&mut self) -> Result<(), Error>232     pub fn recv_close(&mut self) -> Result<(), Error> {
233         match self.inner {
234             Open { local, .. } => {
235                 // The remote side will continue to receive data.
236                 tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
237                 self.inner = HalfClosedRemote(local);
238                 Ok(())
239             }
240             HalfClosedLocal(..) => {
241                 tracing::trace!("recv_close: HalfClosedLocal => Closed");
242                 self.inner = Closed(Cause::EndStream);
243                 Ok(())
244             }
245             ref state => {
246                 proto_err!(conn: "recv_close: in unexpected state {:?}", state);
247                 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
248             }
249         }
250     }
251 
252     /// The remote explicitly sent a RST_STREAM.
253     ///
254     /// # Arguments
255     /// - `frame`: the received RST_STREAM frame.
256     /// - `queued`: true if this stream has frames in the pending send queue.
recv_reset(&mut self, frame: frame::Reset, queued: bool)257     pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
258         match self.inner {
259             // If the stream is already in a `Closed` state, do nothing,
260             // provided that there are no frames still in the send queue.
261             Closed(..) if !queued => {}
262             // A notionally `Closed` stream may still have queued frames in
263             // the following cases:
264             //
265             // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
266             //   actually closed the stream yet).
267             // - if the cause is `Cause::EndStream`: we transition to this
268             //   state when an EOS frame is *enqueued* (so that it's invalid
269             //   to enqueue more frames), not when the EOS frame is *sent*;
270             //   therefore, there may still be frames ahead of the EOS frame
271             //   in the send queue.
272             //
273             // In either of these cases, we want to overwrite the stream's
274             // previous state with the received RST_STREAM, so that the queue
275             // will be cleared by `Prioritize::pop_frame`.
276             ref state => {
277                 tracing::trace!(
278                     "recv_reset; frame={:?}; state={:?}; queued={:?}",
279                     frame,
280                     state,
281                     queued
282                 );
283                 self.inner = Closed(Cause::Error(Error::remote_reset(
284                     frame.stream_id(),
285                     frame.reason(),
286                 )));
287             }
288         }
289     }
290 
291     /// Handle a connection-level error.
handle_error(&mut self, err: &proto::Error)292     pub fn handle_error(&mut self, err: &proto::Error) {
293         match self.inner {
294             Closed(..) => {}
295             _ => {
296                 tracing::trace!("handle_error; err={:?}", err);
297                 self.inner = Closed(Cause::Error(err.clone()));
298             }
299         }
300     }
301 
recv_eof(&mut self)302     pub fn recv_eof(&mut self) {
303         match self.inner {
304             Closed(..) => {}
305             ref state => {
306                 tracing::trace!("recv_eof; state={:?}", state);
307                 self.inner = Closed(Cause::Error(
308                     io::Error::new(
309                         io::ErrorKind::BrokenPipe,
310                         "stream closed because of a broken pipe",
311                     )
312                     .into(),
313                 ));
314             }
315         }
316     }
317 
318     /// Indicates that the local side will not send more data to the local.
send_close(&mut self)319     pub fn send_close(&mut self) {
320         match self.inner {
321             Open { remote, .. } => {
322                 // The remote side will continue to receive data.
323                 tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
324                 self.inner = HalfClosedLocal(remote);
325             }
326             HalfClosedRemote(..) => {
327                 tracing::trace!("send_close: HalfClosedRemote => Closed");
328                 self.inner = Closed(Cause::EndStream);
329             }
330             ref state => panic!("send_close: unexpected state {:?}", state),
331         }
332     }
333 
334     /// Set the stream state to reset locally.
set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator)335     pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
336         self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
337     }
338 
339     /// Set the stream state to a scheduled reset.
set_scheduled_reset(&mut self, reason: Reason)340     pub fn set_scheduled_reset(&mut self, reason: Reason) {
341         debug_assert!(!self.is_closed());
342         self.inner = Closed(Cause::ScheduledLibraryReset(reason));
343     }
344 
get_scheduled_reset(&self) -> Option<Reason>345     pub fn get_scheduled_reset(&self) -> Option<Reason> {
346         match self.inner {
347             Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
348             _ => None,
349         }
350     }
351 
is_scheduled_reset(&self) -> bool352     pub fn is_scheduled_reset(&self) -> bool {
353         matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
354     }
355 
is_local_error(&self) -> bool356     pub fn is_local_error(&self) -> bool {
357         match self.inner {
358             Closed(Cause::Error(ref e)) => e.is_local(),
359             Closed(Cause::ScheduledLibraryReset(..)) => true,
360             _ => false,
361         }
362     }
363 
is_remote_reset(&self) -> bool364     pub fn is_remote_reset(&self) -> bool {
365         matches!(
366             self.inner,
367             Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
368         )
369     }
370 
371     /// Returns true if the stream is already reset.
is_reset(&self) -> bool372     pub fn is_reset(&self) -> bool {
373         match self.inner {
374             Closed(Cause::EndStream) => false,
375             Closed(_) => true,
376             _ => false,
377         }
378     }
379 
is_send_streaming(&self) -> bool380     pub fn is_send_streaming(&self) -> bool {
381         matches!(
382             self.inner,
383             Open {
384                 local: Streaming,
385                 ..
386             } | HalfClosedRemote(Streaming)
387         )
388     }
389 
390     /// Returns true when the stream is in a state to receive headers
is_recv_headers(&self) -> bool391     pub fn is_recv_headers(&self) -> bool {
392         matches!(
393             self.inner,
394             Idle | Open {
395                 remote: AwaitingHeaders,
396                 ..
397             } | HalfClosedLocal(AwaitingHeaders)
398                 | ReservedRemote
399         )
400     }
401 
is_recv_streaming(&self) -> bool402     pub fn is_recv_streaming(&self) -> bool {
403         matches!(
404             self.inner,
405             Open {
406                 remote: Streaming,
407                 ..
408             } | HalfClosedLocal(Streaming)
409         )
410     }
411 
is_closed(&self) -> bool412     pub fn is_closed(&self) -> bool {
413         matches!(self.inner, Closed(_))
414     }
415 
is_recv_closed(&self) -> bool416     pub fn is_recv_closed(&self) -> bool {
417         matches!(
418             self.inner,
419             Closed(..) | HalfClosedRemote(..) | ReservedLocal
420         )
421     }
422 
is_send_closed(&self) -> bool423     pub fn is_send_closed(&self) -> bool {
424         matches!(
425             self.inner,
426             Closed(..) | HalfClosedLocal(..) | ReservedRemote
427         )
428     }
429 
is_idle(&self) -> bool430     pub fn is_idle(&self) -> bool {
431         matches!(self.inner, Idle)
432     }
433 
ensure_recv_open(&self) -> Result<bool, proto::Error>434     pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
435         // TODO: Is this correct?
436         match self.inner {
437             Closed(Cause::Error(ref e)) => Err(e.clone()),
438             Closed(Cause::ScheduledLibraryReset(reason)) => {
439                 Err(proto::Error::library_go_away(reason))
440             }
441             Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
442             _ => Ok(true),
443         }
444     }
445 
446     /// Returns a reason if the stream has been reset.
ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error>447     pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
448         match self.inner {
449             Closed(Cause::Error(Error::Reset(_, reason, _)))
450             | Closed(Cause::Error(Error::GoAway(_, reason, _)))
451             | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
452             Closed(Cause::Error(ref e)) => Err(e.clone().into()),
453             Open {
454                 local: Streaming, ..
455             }
456             | HalfClosedRemote(Streaming) => match mode {
457                 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
458                 PollReset::Streaming => Ok(None),
459             },
460             _ => Ok(None),
461         }
462     }
463 }
464 
465 impl Default for State {
default() -> State466     fn default() -> State {
467         State { inner: Inner::Idle }
468     }
469 }
470