1 use std::io; 2 3 use crate::codec::UserError; 4 use crate::frame::{self, Reason, StreamId}; 5 use crate::proto::{self, Error, Initiator, PollReset}; 6 7 use self::Inner::*; 8 use self::Peer::*; 9 10 /// Represents the state of an H2 stream 11 /// 12 /// ```not_rust 13 /// +--------+ 14 /// send PP | | recv PP 15 /// ,--------| idle |--------. 16 /// / | | \ 17 /// v +--------+ v 18 /// +----------+ | +----------+ 19 /// | | | send H / | | 20 /// ,------| reserved | | recv H | reserved |------. 21 /// | | (local) | | | (remote) | | 22 /// | +----------+ v +----------+ | 23 /// | | +--------+ | | 24 /// | | recv ES | | send ES | | 25 /// | send H | ,-------| open |-------. | recv H | 26 /// | | / | | \ | | 27 /// | v v +--------+ v v | 28 /// | +----------+ | +----------+ | 29 /// | | half | | | half | | 30 /// | | closed | | send R / | closed | | 31 /// | | (remote) | | recv R | (local) | | 32 /// | +----------+ | +----------+ | 33 /// | | | | | 34 /// | | send ES / | recv ES / | | 35 /// | | send R / v send R / | | 36 /// | | recv R +--------+ recv R | | 37 /// | send R / `----------->| |<-----------' send R / | 38 /// | recv R | closed | recv R | 39 /// `----------------------->| |<----------------------' 40 /// +--------+ 41 /// 42 /// send: endpoint sends this frame 43 /// recv: endpoint receives this frame 44 /// 45 /// H: HEADERS frame (with implied CONTINUATIONs) 46 /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) 47 /// ES: END_STREAM flag 48 /// R: RST_STREAM frame 49 /// ``` 50 #[derive(Debug, Clone)] 51 pub struct State { 52 inner: Inner, 53 } 54 55 #[derive(Debug, Clone)] 56 enum Inner { 57 Idle, 58 // TODO: these states shouldn't count against concurrency limits: 59 ReservedLocal, 60 ReservedRemote, 61 Open { local: Peer, remote: Peer }, 62 HalfClosedLocal(Peer), // TODO: explicitly name this value 63 HalfClosedRemote(Peer), 64 Closed(Cause), 65 } 66 67 #[derive(Debug, Copy, Clone, Default)] 68 enum Peer { 69 #[default] 70 AwaitingHeaders, 71 Streaming, 72 } 73 74 #[derive(Debug, Clone)] 75 enum Cause { 76 EndStream, 77 Error(Error), 78 79 /// This indicates to the connection that a reset frame must be sent out 80 /// once the send queue has been flushed. 81 /// 82 /// Examples of when this could happen: 83 /// - User drops all references to a stream, so we want to CANCEL the it. 84 /// - Header block size was too large, so we want to REFUSE, possibly 85 /// after sending a 431 response frame. 86 ScheduledLibraryReset(Reason), 87 } 88 89 impl State { 90 /// Opens the send-half of a stream if it is not already open. send_open(&mut self, eos: bool) -> Result<(), UserError>91 pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { 92 let local = Streaming; 93 94 self.inner = match self.inner { 95 Idle => { 96 if eos { 97 HalfClosedLocal(AwaitingHeaders) 98 } else { 99 Open { 100 local, 101 remote: AwaitingHeaders, 102 } 103 } 104 } 105 Open { 106 local: AwaitingHeaders, 107 remote, 108 } => { 109 if eos { 110 HalfClosedLocal(remote) 111 } else { 112 Open { local, remote } 113 } 114 } 115 HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { 116 if eos { 117 Closed(Cause::EndStream) 118 } else { 119 HalfClosedRemote(local) 120 } 121 } 122 _ => { 123 // All other transitions result in a protocol error 124 return Err(UserError::UnexpectedFrameType); 125 } 126 }; 127 128 Ok(()) 129 } 130 131 /// Opens the receive-half of the stream when a HEADERS frame is received. 132 /// 133 /// Returns true if this transitions the state to Open. recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error>134 pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { 135 let mut initial = false; 136 let eos = frame.is_end_stream(); 137 138 self.inner = match self.inner { 139 Idle => { 140 initial = true; 141 142 if eos { 143 HalfClosedRemote(AwaitingHeaders) 144 } else { 145 Open { 146 local: AwaitingHeaders, 147 remote: if frame.is_informational() { 148 tracing::trace!("skipping 1xx response headers"); 149 AwaitingHeaders 150 } else { 151 Streaming 152 }, 153 } 154 } 155 } 156 ReservedRemote => { 157 initial = true; 158 159 if eos { 160 Closed(Cause::EndStream) 161 } else if frame.is_informational() { 162 tracing::trace!("skipping 1xx response headers"); 163 ReservedRemote 164 } else { 165 HalfClosedLocal(Streaming) 166 } 167 } 168 Open { 169 local, 170 remote: AwaitingHeaders, 171 } => { 172 if eos { 173 HalfClosedRemote(local) 174 } else { 175 Open { 176 local, 177 remote: if frame.is_informational() { 178 tracing::trace!("skipping 1xx response headers"); 179 AwaitingHeaders 180 } else { 181 Streaming 182 }, 183 } 184 } 185 } 186 HalfClosedLocal(AwaitingHeaders) => { 187 if eos { 188 Closed(Cause::EndStream) 189 } else if frame.is_informational() { 190 tracing::trace!("skipping 1xx response headers"); 191 HalfClosedLocal(AwaitingHeaders) 192 } else { 193 HalfClosedLocal(Streaming) 194 } 195 } 196 ref state => { 197 // All other transitions result in a protocol error 198 proto_err!(conn: "recv_open: in unexpected state {:?}", state); 199 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 200 } 201 }; 202 203 Ok(initial) 204 } 205 206 /// Transition from Idle -> ReservedRemote reserve_remote(&mut self) -> Result<(), Error>207 pub fn reserve_remote(&mut self) -> Result<(), Error> { 208 match self.inner { 209 Idle => { 210 self.inner = ReservedRemote; 211 Ok(()) 212 } 213 ref state => { 214 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); 215 Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) 216 } 217 } 218 } 219 220 /// Transition from Idle -> ReservedLocal reserve_local(&mut self) -> Result<(), UserError>221 pub fn reserve_local(&mut self) -> Result<(), UserError> { 222 match self.inner { 223 Idle => { 224 self.inner = ReservedLocal; 225 Ok(()) 226 } 227 _ => Err(UserError::UnexpectedFrameType), 228 } 229 } 230 231 /// Indicates that the remote side will not send more data to the local. recv_close(&mut self) -> Result<(), Error>232 pub fn recv_close(&mut self) -> Result<(), Error> { 233 match self.inner { 234 Open { local, .. } => { 235 // The remote side will continue to receive data. 236 tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); 237 self.inner = HalfClosedRemote(local); 238 Ok(()) 239 } 240 HalfClosedLocal(..) => { 241 tracing::trace!("recv_close: HalfClosedLocal => Closed"); 242 self.inner = Closed(Cause::EndStream); 243 Ok(()) 244 } 245 ref state => { 246 proto_err!(conn: "recv_close: in unexpected state {:?}", state); 247 Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) 248 } 249 } 250 } 251 252 /// The remote explicitly sent a RST_STREAM. 253 /// 254 /// # Arguments 255 /// - `frame`: the received RST_STREAM frame. 256 /// - `queued`: true if this stream has frames in the pending send queue. recv_reset(&mut self, frame: frame::Reset, queued: bool)257 pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { 258 match self.inner { 259 // If the stream is already in a `Closed` state, do nothing, 260 // provided that there are no frames still in the send queue. 261 Closed(..) if !queued => {} 262 // A notionally `Closed` stream may still have queued frames in 263 // the following cases: 264 // 265 // - if the cause is `Cause::Scheduled(..)` (i.e. we have not 266 // actually closed the stream yet). 267 // - if the cause is `Cause::EndStream`: we transition to this 268 // state when an EOS frame is *enqueued* (so that it's invalid 269 // to enqueue more frames), not when the EOS frame is *sent*; 270 // therefore, there may still be frames ahead of the EOS frame 271 // in the send queue. 272 // 273 // In either of these cases, we want to overwrite the stream's 274 // previous state with the received RST_STREAM, so that the queue 275 // will be cleared by `Prioritize::pop_frame`. 276 ref state => { 277 tracing::trace!( 278 "recv_reset; frame={:?}; state={:?}; queued={:?}", 279 frame, 280 state, 281 queued 282 ); 283 self.inner = Closed(Cause::Error(Error::remote_reset( 284 frame.stream_id(), 285 frame.reason(), 286 ))); 287 } 288 } 289 } 290 291 /// Handle a connection-level error. handle_error(&mut self, err: &proto::Error)292 pub fn handle_error(&mut self, err: &proto::Error) { 293 match self.inner { 294 Closed(..) => {} 295 _ => { 296 tracing::trace!("handle_error; err={:?}", err); 297 self.inner = Closed(Cause::Error(err.clone())); 298 } 299 } 300 } 301 recv_eof(&mut self)302 pub fn recv_eof(&mut self) { 303 match self.inner { 304 Closed(..) => {} 305 ref state => { 306 tracing::trace!("recv_eof; state={:?}", state); 307 self.inner = Closed(Cause::Error( 308 io::Error::new( 309 io::ErrorKind::BrokenPipe, 310 "stream closed because of a broken pipe", 311 ) 312 .into(), 313 )); 314 } 315 } 316 } 317 318 /// Indicates that the local side will not send more data to the local. send_close(&mut self)319 pub fn send_close(&mut self) { 320 match self.inner { 321 Open { remote, .. } => { 322 // The remote side will continue to receive data. 323 tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); 324 self.inner = HalfClosedLocal(remote); 325 } 326 HalfClosedRemote(..) => { 327 tracing::trace!("send_close: HalfClosedRemote => Closed"); 328 self.inner = Closed(Cause::EndStream); 329 } 330 ref state => panic!("send_close: unexpected state {:?}", state), 331 } 332 } 333 334 /// Set the stream state to reset locally. set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator)335 pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { 336 self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); 337 } 338 339 /// Set the stream state to a scheduled reset. set_scheduled_reset(&mut self, reason: Reason)340 pub fn set_scheduled_reset(&mut self, reason: Reason) { 341 debug_assert!(!self.is_closed()); 342 self.inner = Closed(Cause::ScheduledLibraryReset(reason)); 343 } 344 get_scheduled_reset(&self) -> Option<Reason>345 pub fn get_scheduled_reset(&self) -> Option<Reason> { 346 match self.inner { 347 Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), 348 _ => None, 349 } 350 } 351 is_scheduled_reset(&self) -> bool352 pub fn is_scheduled_reset(&self) -> bool { 353 matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) 354 } 355 is_local_error(&self) -> bool356 pub fn is_local_error(&self) -> bool { 357 match self.inner { 358 Closed(Cause::Error(ref e)) => e.is_local(), 359 Closed(Cause::ScheduledLibraryReset(..)) => true, 360 _ => false, 361 } 362 } 363 is_remote_reset(&self) -> bool364 pub fn is_remote_reset(&self) -> bool { 365 matches!( 366 self.inner, 367 Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) 368 ) 369 } 370 371 /// Returns true if the stream is already reset. is_reset(&self) -> bool372 pub fn is_reset(&self) -> bool { 373 match self.inner { 374 Closed(Cause::EndStream) => false, 375 Closed(_) => true, 376 _ => false, 377 } 378 } 379 is_send_streaming(&self) -> bool380 pub fn is_send_streaming(&self) -> bool { 381 matches!( 382 self.inner, 383 Open { 384 local: Streaming, 385 .. 386 } | HalfClosedRemote(Streaming) 387 ) 388 } 389 390 /// Returns true when the stream is in a state to receive headers is_recv_headers(&self) -> bool391 pub fn is_recv_headers(&self) -> bool { 392 matches!( 393 self.inner, 394 Idle | Open { 395 remote: AwaitingHeaders, 396 .. 397 } | HalfClosedLocal(AwaitingHeaders) 398 | ReservedRemote 399 ) 400 } 401 is_recv_streaming(&self) -> bool402 pub fn is_recv_streaming(&self) -> bool { 403 matches!( 404 self.inner, 405 Open { 406 remote: Streaming, 407 .. 408 } | HalfClosedLocal(Streaming) 409 ) 410 } 411 is_closed(&self) -> bool412 pub fn is_closed(&self) -> bool { 413 matches!(self.inner, Closed(_)) 414 } 415 is_recv_closed(&self) -> bool416 pub fn is_recv_closed(&self) -> bool { 417 matches!( 418 self.inner, 419 Closed(..) | HalfClosedRemote(..) | ReservedLocal 420 ) 421 } 422 is_send_closed(&self) -> bool423 pub fn is_send_closed(&self) -> bool { 424 matches!( 425 self.inner, 426 Closed(..) | HalfClosedLocal(..) | ReservedRemote 427 ) 428 } 429 is_idle(&self) -> bool430 pub fn is_idle(&self) -> bool { 431 matches!(self.inner, Idle) 432 } 433 ensure_recv_open(&self) -> Result<bool, proto::Error>434 pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { 435 // TODO: Is this correct? 436 match self.inner { 437 Closed(Cause::Error(ref e)) => Err(e.clone()), 438 Closed(Cause::ScheduledLibraryReset(reason)) => { 439 Err(proto::Error::library_go_away(reason)) 440 } 441 Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), 442 _ => Ok(true), 443 } 444 } 445 446 /// Returns a reason if the stream has been reset. ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error>447 pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { 448 match self.inner { 449 Closed(Cause::Error(Error::Reset(_, reason, _))) 450 | Closed(Cause::Error(Error::GoAway(_, reason, _))) 451 | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), 452 Closed(Cause::Error(ref e)) => Err(e.clone().into()), 453 Open { 454 local: Streaming, .. 455 } 456 | HalfClosedRemote(Streaming) => match mode { 457 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), 458 PollReset::Streaming => Ok(None), 459 }, 460 _ => Ok(None), 461 } 462 } 463 } 464 465 impl Default for State { default() -> State466 fn default() -> State { 467 State { inner: Inner::Idle } 468 } 469 } 470