1 use super::*; 2 use crate::codec::UserError; 3 use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; 4 use crate::proto; 5 6 use http::{HeaderMap, Request, Response}; 7 8 use std::cmp::Ordering; 9 use std::io; 10 use std::task::{Context, Poll, Waker}; 11 use std::time::Instant; 12 13 #[derive(Debug)] 14 pub(super) struct Recv { 15 /// Initial window size of remote initiated streams 16 init_window_sz: WindowSize, 17 18 /// Connection level flow control governing received data 19 flow: FlowControl, 20 21 /// Amount of connection window capacity currently used by outstanding streams. 22 in_flight_data: WindowSize, 23 24 /// The lowest stream ID that is still idle 25 next_stream_id: Result<StreamId, StreamIdOverflow>, 26 27 /// The stream ID of the last processed stream 28 last_processed_id: StreamId, 29 30 /// Any streams with a higher ID are ignored. 31 /// 32 /// This starts as MAX, but is lowered when a GOAWAY is received. 33 /// 34 /// > After sending a GOAWAY frame, the sender can discard frames for 35 /// > streams initiated by the receiver with identifiers higher than 36 /// > the identified last stream. 37 max_stream_id: StreamId, 38 39 /// Streams that have pending window updates 40 pending_window_updates: store::Queue<stream::NextWindowUpdate>, 41 42 /// New streams to be accepted 43 pending_accept: store::Queue<stream::NextAccept>, 44 45 /// Locally reset streams that should be reaped when they expire 46 pending_reset_expired: store::Queue<stream::NextResetExpire>, 47 48 /// How long locally reset streams should ignore received frames 49 reset_duration: Duration, 50 51 /// Holds frames that are waiting to be read 52 buffer: Buffer<Event>, 53 54 /// Refused StreamId, this represents a frame that must be sent out. 55 refused: Option<StreamId>, 56 57 /// If push promises are allowed to be received. 58 is_push_enabled: bool, 59 60 /// If extended connect protocol is enabled. 61 is_extended_connect_protocol_enabled: bool, 62 } 63 64 #[derive(Debug)] 65 pub(super) enum Event { 66 Headers(peer::PollMessage), 67 Data(Bytes), 68 Trailers(HeaderMap), 69 } 70 71 #[derive(Debug)] 72 pub(super) enum RecvHeaderBlockError<T> { 73 Oversize(T), 74 State(Error), 75 } 76 77 #[derive(Debug)] 78 pub(crate) enum Open { 79 PushPromise, 80 Headers, 81 } 82 83 impl Recv { new(peer: peer::Dyn, config: &Config) -> Self84 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 85 let next_stream_id = if peer.is_server() { 1 } else { 2 }; 86 87 let mut flow = FlowControl::new(); 88 89 // connections always have the default window size, regardless of 90 // settings 91 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) 92 .expect("invalid initial remote window size"); 93 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap(); 94 95 Recv { 96 init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, 97 flow, 98 in_flight_data: 0 as WindowSize, 99 next_stream_id: Ok(next_stream_id.into()), 100 pending_window_updates: store::Queue::new(), 101 last_processed_id: StreamId::ZERO, 102 max_stream_id: StreamId::MAX, 103 pending_accept: store::Queue::new(), 104 pending_reset_expired: store::Queue::new(), 105 reset_duration: config.local_reset_duration, 106 buffer: Buffer::new(), 107 refused: None, 108 is_push_enabled: config.local_push_enabled, 109 is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled, 110 } 111 } 112 113 /// Returns the initial receive window size init_window_sz(&self) -> WindowSize114 pub fn init_window_sz(&self) -> WindowSize { 115 self.init_window_sz 116 } 117 118 /// Returns the ID of the last processed stream last_processed_id(&self) -> StreamId119 pub fn last_processed_id(&self) -> StreamId { 120 self.last_processed_id 121 } 122 123 /// Update state reflecting a new, remotely opened stream 124 /// 125 /// Returns the stream state if successful. `None` if refused open( &mut self, id: StreamId, mode: Open, counts: &mut Counts, ) -> Result<Option<StreamId>, Error>126 pub fn open( 127 &mut self, 128 id: StreamId, 129 mode: Open, 130 counts: &mut Counts, 131 ) -> Result<Option<StreamId>, Error> { 132 assert!(self.refused.is_none()); 133 134 counts.peer().ensure_can_open(id, mode)?; 135 136 let next_id = self.next_stream_id()?; 137 if id < next_id { 138 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); 139 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 140 } 141 142 self.next_stream_id = id.next_id(); 143 144 if !counts.can_inc_num_recv_streams() { 145 self.refused = Some(id); 146 return Ok(None); 147 } 148 149 Ok(Some(id)) 150 } 151 152 /// Transition the stream state based on receiving headers 153 /// 154 /// The caller ensures that the frame represents headers and not trailers. recv_headers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, counts: &mut Counts, ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>>155 pub fn recv_headers( 156 &mut self, 157 frame: frame::Headers, 158 stream: &mut store::Ptr, 159 counts: &mut Counts, 160 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> { 161 tracing::trace!("opening stream; init_window={}", self.init_window_sz); 162 let is_initial = stream.state.recv_open(&frame)?; 163 164 if is_initial { 165 // TODO: be smarter about this logic 166 if frame.stream_id() > self.last_processed_id { 167 self.last_processed_id = frame.stream_id(); 168 } 169 170 // Increment the number of concurrent streams 171 counts.inc_num_recv_streams(stream); 172 } 173 174 if !stream.content_length.is_head() { 175 use super::stream::ContentLength; 176 use http::header; 177 178 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { 179 let content_length = match frame::parse_u64(content_length.as_bytes()) { 180 Ok(v) => v, 181 Err(_) => { 182 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); 183 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); 184 } 185 }; 186 187 stream.content_length = ContentLength::Remaining(content_length); 188 } 189 } 190 191 if frame.is_over_size() { 192 // A frame is over size if the decoded header block was bigger than 193 // SETTINGS_MAX_HEADER_LIST_SIZE. 194 // 195 // > A server that receives a larger header block than it is willing 196 // > to handle can send an HTTP 431 (Request Header Fields Too 197 // > Large) status code [RFC6585]. A client can discard responses 198 // > that it cannot process. 199 // 200 // So, if peer is a server, we'll send a 431. In either case, 201 // an error is recorded, which will send a REFUSED_STREAM, 202 // since we don't want any of the data frames either. 203 tracing::debug!( 204 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \ 205 recv_headers: frame is over size; stream={:?}", 206 stream.id 207 ); 208 return if counts.peer().is_server() && is_initial { 209 let mut res = frame::Headers::new( 210 stream.id, 211 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), 212 HeaderMap::new(), 213 ); 214 res.set_end_stream(); 215 Err(RecvHeaderBlockError::Oversize(Some(res))) 216 } else { 217 Err(RecvHeaderBlockError::Oversize(None)) 218 }; 219 } 220 221 let stream_id = frame.stream_id(); 222 let (pseudo, fields) = frame.into_parts(); 223 224 if pseudo.protocol.is_some() 225 && counts.peer().is_server() 226 && !self.is_extended_connect_protocol_enabled 227 { 228 proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id); 229 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); 230 } 231 232 if pseudo.status.is_some() && counts.peer().is_server() { 233 proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id); 234 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); 235 } 236 237 if !pseudo.is_informational() { 238 let message = counts 239 .peer() 240 .convert_poll_message(pseudo, fields, stream_id)?; 241 242 // Push the frame onto the stream's recv buffer 243 stream 244 .pending_recv 245 .push_back(&mut self.buffer, Event::Headers(message)); 246 stream.notify_recv(); 247 248 // Only servers can receive a headers frame that initiates the stream. 249 // This is verified in `Streams` before calling this function. 250 if counts.peer().is_server() { 251 // Correctness: never push a stream to `pending_accept` without having the 252 // corresponding headers frame pushed to `stream.pending_recv`. 253 self.pending_accept.push(stream); 254 } 255 } 256 257 Ok(()) 258 } 259 260 /// Called by the server to get the request 261 /// 262 /// # Panics 263 /// 264 /// Panics if `stream.pending_recv` has no `Event::Headers` queued. 265 /// take_request(&mut self, stream: &mut store::Ptr) -> Request<()>266 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { 267 use super::peer::PollMessage::*; 268 269 match stream.pending_recv.pop_front(&mut self.buffer) { 270 Some(Event::Headers(Server(request))) => request, 271 _ => unreachable!("server stream queue must start with Headers"), 272 } 273 } 274 275 /// Called by the client to get pushed response poll_pushed( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>>276 pub fn poll_pushed( 277 &mut self, 278 cx: &Context, 279 stream: &mut store::Ptr, 280 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> { 281 use super::peer::PollMessage::*; 282 283 let mut ppp = stream.pending_push_promises.take(); 284 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { 285 match pushed.pending_recv.pop_front(&mut self.buffer) { 286 Some(Event::Headers(Server(headers))) => (headers, pushed.key()), 287 // When frames are pushed into the queue, it is verified that 288 // the first frame is a HEADERS frame. 289 _ => panic!("Headers not set on pushed stream"), 290 } 291 }); 292 stream.pending_push_promises = ppp; 293 if let Some(p) = pushed { 294 Poll::Ready(Some(Ok(p))) 295 } else { 296 let is_open = stream.state.ensure_recv_open()?; 297 298 if is_open { 299 stream.recv_task = Some(cx.waker().clone()); 300 Poll::Pending 301 } else { 302 Poll::Ready(None) 303 } 304 } 305 } 306 307 /// Called by the client to get the response poll_response( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Result<Response<()>, proto::Error>>308 pub fn poll_response( 309 &mut self, 310 cx: &Context, 311 stream: &mut store::Ptr, 312 ) -> Poll<Result<Response<()>, proto::Error>> { 313 use super::peer::PollMessage::*; 314 315 // If the buffer is not empty, then the first frame must be a HEADERS 316 // frame or the user violated the contract. 317 match stream.pending_recv.pop_front(&mut self.buffer) { 318 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), 319 Some(_) => panic!("poll_response called after response returned"), 320 None => { 321 if !stream.state.ensure_recv_open()? { 322 proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); 323 return Poll::Ready(Err(Error::library_reset( 324 stream.id, 325 Reason::PROTOCOL_ERROR, 326 ))); 327 } 328 329 stream.recv_task = Some(cx.waker().clone()); 330 Poll::Pending 331 } 332 } 333 } 334 335 /// Transition the stream based on receiving trailers recv_trailers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, ) -> Result<(), Error>336 pub fn recv_trailers( 337 &mut self, 338 frame: frame::Headers, 339 stream: &mut store::Ptr, 340 ) -> Result<(), Error> { 341 // Transition the state 342 stream.state.recv_close()?; 343 344 if stream.ensure_content_length_zero().is_err() { 345 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); 346 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 347 } 348 349 let trailers = frame.into_fields(); 350 351 // Push the frame onto the stream's recv buffer 352 stream 353 .pending_recv 354 .push_back(&mut self.buffer, Event::Trailers(trailers)); 355 stream.notify_recv(); 356 357 Ok(()) 358 } 359 360 /// Releases capacity of the connection release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>)361 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) { 362 tracing::trace!( 363 "release_connection_capacity; size={}, connection in_flight_data={}", 364 capacity, 365 self.in_flight_data, 366 ); 367 368 // Decrement in-flight data 369 self.in_flight_data -= capacity; 370 371 // Assign capacity to connection 372 // TODO: proper error handling 373 let _res = self.flow.assign_capacity(capacity); 374 debug_assert!(_res.is_ok()); 375 376 if self.flow.unclaimed_capacity().is_some() { 377 if let Some(task) = task.take() { 378 task.wake(); 379 } 380 } 381 } 382 383 /// Releases capacity back to the connection & stream release_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option<Waker>, ) -> Result<(), UserError>384 pub fn release_capacity( 385 &mut self, 386 capacity: WindowSize, 387 stream: &mut store::Ptr, 388 task: &mut Option<Waker>, 389 ) -> Result<(), UserError> { 390 tracing::trace!("release_capacity; size={}", capacity); 391 392 if capacity > stream.in_flight_recv_data { 393 return Err(UserError::ReleaseCapacityTooBig); 394 } 395 396 self.release_connection_capacity(capacity, task); 397 398 // Decrement in-flight data 399 stream.in_flight_recv_data -= capacity; 400 401 // Assign capacity to stream 402 // TODO: proper error handling 403 let _res = stream.recv_flow.assign_capacity(capacity); 404 debug_assert!(_res.is_ok()); 405 406 if stream.recv_flow.unclaimed_capacity().is_some() { 407 // Queue the stream for sending the WINDOW_UPDATE frame. 408 self.pending_window_updates.push(stream); 409 410 if let Some(task) = task.take() { 411 task.wake(); 412 } 413 } 414 415 Ok(()) 416 } 417 418 /// Release any unclaimed capacity for a closed stream. release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)419 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 420 debug_assert_eq!(stream.ref_count, 0); 421 422 if stream.in_flight_recv_data == 0 { 423 return; 424 } 425 426 tracing::trace!( 427 "auto-release closed stream ({:?}) capacity: {:?}", 428 stream.id, 429 stream.in_flight_recv_data, 430 ); 431 432 self.release_connection_capacity(stream.in_flight_recv_data, task); 433 stream.in_flight_recv_data = 0; 434 435 self.clear_recv_buffer(stream); 436 } 437 438 /// Set the "target" connection window size. 439 /// 440 /// By default, all new connections start with 64kb of window size. As 441 /// streams used and release capacity, we will send WINDOW_UPDATEs for the 442 /// connection to bring it back up to the initial "target". 443 /// 444 /// Setting a target means that we will try to tell the peer about 445 /// WINDOW_UPDATEs so the peer knows it has about `target` window to use 446 /// for the whole connection. 447 /// 448 /// The `task` is an optional parked task for the `Connection` that might 449 /// be blocked on needing more window capacity. set_target_connection_window( &mut self, target: WindowSize, task: &mut Option<Waker>, ) -> Result<(), Reason>450 pub fn set_target_connection_window( 451 &mut self, 452 target: WindowSize, 453 task: &mut Option<Waker>, 454 ) -> Result<(), Reason> { 455 tracing::trace!( 456 "set_target_connection_window; target={}; available={}, reserved={}", 457 target, 458 self.flow.available(), 459 self.in_flight_data, 460 ); 461 462 // The current target connection window is our `available` plus any 463 // in-flight data reserved by streams. 464 // 465 // Update the flow controller with the difference between the new 466 // target and the current target. 467 let current = self 468 .flow 469 .available() 470 .add(self.in_flight_data)? 471 .checked_size(); 472 if target > current { 473 self.flow.assign_capacity(target - current)?; 474 } else { 475 self.flow.claim_capacity(current - target)?; 476 } 477 478 // If changing the target capacity means we gained a bunch of capacity, 479 // enough that we went over the update threshold, then schedule sending 480 // a connection WINDOW_UPDATE. 481 if self.flow.unclaimed_capacity().is_some() { 482 if let Some(task) = task.take() { 483 task.wake(); 484 } 485 } 486 Ok(()) 487 } 488 apply_local_settings( &mut self, settings: &frame::Settings, store: &mut Store, ) -> Result<(), proto::Error>489 pub(crate) fn apply_local_settings( 490 &mut self, 491 settings: &frame::Settings, 492 store: &mut Store, 493 ) -> Result<(), proto::Error> { 494 if let Some(val) = settings.is_extended_connect_protocol_enabled() { 495 self.is_extended_connect_protocol_enabled = val; 496 } 497 498 if let Some(target) = settings.initial_window_size() { 499 let old_sz = self.init_window_sz; 500 self.init_window_sz = target; 501 502 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); 503 504 // Per RFC 7540 §6.9.2: 505 // 506 // In addition to changing the flow-control window for streams that are 507 // not yet active, a SETTINGS frame can alter the initial flow-control 508 // window size for streams with active flow-control windows (that is, 509 // streams in the "open" or "half-closed (remote)" state). When the 510 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust 511 // the size of all stream flow-control windows that it maintains by the 512 // difference between the new value and the old value. 513 // 514 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available 515 // space in a flow-control window to become negative. A sender MUST 516 // track the negative flow-control window and MUST NOT send new 517 // flow-controlled frames until it receives WINDOW_UPDATE frames that 518 // cause the flow-control window to become positive. 519 520 match target.cmp(&old_sz) { 521 Ordering::Less => { 522 // We must decrease the (local) window on every open stream. 523 let dec = old_sz - target; 524 tracing::trace!("decrementing all windows; dec={}", dec); 525 526 store.try_for_each(|mut stream| { 527 stream 528 .recv_flow 529 .dec_recv_window(dec) 530 .map_err(proto::Error::library_go_away)?; 531 Ok::<_, proto::Error>(()) 532 })?; 533 } 534 Ordering::Greater => { 535 // We must increase the (local) window on every open stream. 536 let inc = target - old_sz; 537 tracing::trace!("incrementing all windows; inc={}", inc); 538 store.try_for_each(|mut stream| { 539 // XXX: Shouldn't the peer have already noticed our 540 // overflow and sent us a GOAWAY? 541 stream 542 .recv_flow 543 .inc_window(inc) 544 .map_err(proto::Error::library_go_away)?; 545 stream 546 .recv_flow 547 .assign_capacity(inc) 548 .map_err(proto::Error::library_go_away)?; 549 Ok::<_, proto::Error>(()) 550 })?; 551 } 552 Ordering::Equal => (), 553 } 554 } 555 556 Ok(()) 557 } 558 is_end_stream(&self, stream: &store::Ptr) -> bool559 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { 560 if !stream.state.is_recv_closed() { 561 return false; 562 } 563 564 stream.pending_recv.is_empty() 565 } 566 recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error>567 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> { 568 let sz = frame.payload().len(); 569 570 // This should have been enforced at the codec::FramedRead layer, so 571 // this is just a sanity check. 572 assert!(sz <= MAX_WINDOW_SIZE as usize); 573 574 let sz = sz as WindowSize; 575 576 let is_ignoring_frame = stream.state.is_local_error(); 577 578 if !is_ignoring_frame && !stream.state.is_recv_streaming() { 579 // TODO: There are cases where this can be a stream error of 580 // STREAM_CLOSED instead... 581 582 // Receiving a DATA frame when not expecting one is a protocol 583 // error. 584 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); 585 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 586 } 587 588 tracing::trace!( 589 "recv_data; size={}; connection={}; stream={}", 590 sz, 591 self.flow.window_size(), 592 stream.recv_flow.window_size() 593 ); 594 595 if is_ignoring_frame { 596 tracing::trace!( 597 "recv_data; frame ignored on locally reset {:?} for some time", 598 stream.id, 599 ); 600 return self.ignore_data(sz); 601 } 602 603 // Ensure that there is enough capacity on the connection before acting 604 // on the stream. 605 self.consume_connection_window(sz)?; 606 607 if stream.recv_flow.window_size() < sz { 608 // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE 609 // > A receiver MAY respond with a stream error (Section 5.4.2) or 610 // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if 611 // > it is unable to accept a frame. 612 // 613 // So, for violating the **stream** window, we can send either a 614 // stream or connection error. We've opted to send a stream 615 // error. 616 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR)); 617 } 618 619 if stream.dec_content_length(frame.payload().len()).is_err() { 620 proto_err!(stream: 621 "recv_data: content-length overflow; stream={:?}; len={:?}", 622 stream.id, 623 frame.payload().len(), 624 ); 625 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 626 } 627 628 if frame.is_end_stream() { 629 if stream.ensure_content_length_zero().is_err() { 630 proto_err!(stream: 631 "recv_data: content-length underflow; stream={:?}; len={:?}", 632 stream.id, 633 frame.payload().len(), 634 ); 635 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 636 } 637 638 if stream.state.recv_close().is_err() { 639 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); 640 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 641 } 642 } 643 644 // Received a frame, but no one cared about it. fix issue#648 645 if !stream.is_recv { 646 tracing::trace!( 647 "recv_data; frame ignored on stream release {:?} for some time", 648 stream.id, 649 ); 650 self.release_connection_capacity(sz, &mut None); 651 return Ok(()); 652 } 653 654 // Update stream level flow control 655 stream 656 .recv_flow 657 .send_data(sz) 658 .map_err(proto::Error::library_go_away)?; 659 660 // Track the data as in-flight 661 stream.in_flight_recv_data += sz; 662 663 let event = Event::Data(frame.into_payload()); 664 665 // Push the frame onto the recv buffer 666 stream.pending_recv.push_back(&mut self.buffer, event); 667 stream.notify_recv(); 668 669 Ok(()) 670 } 671 ignore_data(&mut self, sz: WindowSize) -> Result<(), Error>672 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> { 673 // Ensure that there is enough capacity on the connection... 674 self.consume_connection_window(sz)?; 675 676 // Since we are ignoring this frame, 677 // we aren't returning the frame to the user. That means they 678 // have no way to release the capacity back to the connection. So 679 // we have to release it automatically. 680 // 681 // This call doesn't send a WINDOW_UPDATE immediately, just marks 682 // the capacity as available to be reclaimed. When the available 683 // capacity meets a threshold, a WINDOW_UPDATE is then sent. 684 self.release_connection_capacity(sz, &mut None); 685 Ok(()) 686 } 687 consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error>688 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> { 689 if self.flow.window_size() < sz { 690 tracing::debug!( 691 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", 692 self.flow.window_size(), 693 sz, 694 ); 695 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR)); 696 } 697 698 // Update connection level flow control 699 self.flow.send_data(sz).map_err(Error::library_go_away)?; 700 701 // Track the data as in-flight 702 self.in_flight_data += sz; 703 Ok(()) 704 } 705 recv_push_promise( &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), Error>706 pub fn recv_push_promise( 707 &mut self, 708 frame: frame::PushPromise, 709 stream: &mut store::Ptr, 710 ) -> Result<(), Error> { 711 stream.state.reserve_remote()?; 712 if frame.is_over_size() { 713 // A frame is over size if the decoded header block was bigger than 714 // SETTINGS_MAX_HEADER_LIST_SIZE. 715 // 716 // > A server that receives a larger header block than it is willing 717 // > to handle can send an HTTP 431 (Request Header Fields Too 718 // > Large) status code [RFC6585]. A client can discard responses 719 // > that it cannot process. 720 // 721 // So, if peer is a server, we'll send a 431. In either case, 722 // an error is recorded, which will send a REFUSED_STREAM, 723 // since we don't want any of the data frames either. 724 tracing::debug!( 725 "stream error REFUSED_STREAM -- recv_push_promise: \ 726 headers frame is over size; promised_id={:?};", 727 frame.promised_id(), 728 ); 729 return Err(Error::library_reset( 730 frame.promised_id(), 731 Reason::REFUSED_STREAM, 732 )); 733 } 734 735 let promised_id = frame.promised_id(); 736 let (pseudo, fields) = frame.into_parts(); 737 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; 738 739 if let Err(e) = frame::PushPromise::validate_request(&req) { 740 use PushPromiseHeaderError::*; 741 match e { 742 NotSafeAndCacheable => proto_err!( 743 stream: 744 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}", 745 req.method(), 746 promised_id, 747 ), 748 InvalidContentLength(e) => proto_err!( 749 stream: 750 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}", 751 e, 752 promised_id, 753 ), 754 } 755 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR)); 756 } 757 758 use super::peer::PollMessage::*; 759 stream 760 .pending_recv 761 .push_back(&mut self.buffer, Event::Headers(Server(req))); 762 stream.notify_recv(); 763 Ok(()) 764 } 765 766 /// Ensures that `id` is not in the `Idle` state. ensure_not_idle(&self, id: StreamId) -> Result<(), Reason>767 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { 768 if let Ok(next) = self.next_stream_id { 769 if id >= next { 770 tracing::debug!( 771 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", 772 id 773 ); 774 return Err(Reason::PROTOCOL_ERROR); 775 } 776 } 777 // if next_stream_id is overflowed, that's ok. 778 779 Ok(()) 780 } 781 782 /// Handle remote sending an explicit RST_STREAM. recv_reset( &mut self, frame: frame::Reset, stream: &mut Stream, counts: &mut Counts, ) -> Result<(), Error>783 pub fn recv_reset( 784 &mut self, 785 frame: frame::Reset, 786 stream: &mut Stream, 787 counts: &mut Counts, 788 ) -> Result<(), Error> { 789 // Reseting a stream that the user hasn't accepted is possible, 790 // but should be done with care. These streams will continue 791 // to take up memory in the accept queue, but will no longer be 792 // counted as "concurrent" streams. 793 // 794 // So, we have a separate limit for these. 795 // 796 // See https://github.com/hyperium/hyper/issues/2877 797 if stream.is_pending_accept { 798 if counts.can_inc_num_remote_reset_streams() { 799 counts.inc_num_remote_reset_streams(); 800 } else { 801 tracing::warn!( 802 "recv_reset; remotely-reset pending-accept streams reached limit ({:?})", 803 counts.max_remote_reset_streams(), 804 ); 805 return Err(Error::library_go_away_data( 806 Reason::ENHANCE_YOUR_CALM, 807 "too_many_resets", 808 )); 809 } 810 } 811 812 // Notify the stream 813 stream.state.recv_reset(frame, stream.is_pending_send); 814 815 stream.notify_send(); 816 stream.notify_recv(); 817 818 Ok(()) 819 } 820 821 /// Handle a connection-level error handle_error(&mut self, err: &proto::Error, stream: &mut Stream)822 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) { 823 // Receive an error 824 stream.state.handle_error(err); 825 826 // If a receiver is waiting, notify it 827 stream.notify_send(); 828 stream.notify_recv(); 829 } 830 go_away(&mut self, last_processed_id: StreamId)831 pub fn go_away(&mut self, last_processed_id: StreamId) { 832 assert!(self.max_stream_id >= last_processed_id); 833 self.max_stream_id = last_processed_id; 834 } 835 recv_eof(&mut self, stream: &mut Stream)836 pub fn recv_eof(&mut self, stream: &mut Stream) { 837 stream.state.recv_eof(); 838 stream.notify_send(); 839 stream.notify_recv(); 840 } 841 clear_recv_buffer(&mut self, stream: &mut Stream)842 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { 843 while stream.pending_recv.pop_front(&mut self.buffer).is_some() { 844 // drop it 845 } 846 } 847 848 /// Get the max ID of streams we can receive. 849 /// 850 /// This gets lowered if we send a GOAWAY frame. max_stream_id(&self) -> StreamId851 pub fn max_stream_id(&self) -> StreamId { 852 self.max_stream_id 853 } 854 next_stream_id(&self) -> Result<StreamId, Error>855 pub fn next_stream_id(&self) -> Result<StreamId, Error> { 856 if let Ok(id) = self.next_stream_id { 857 Ok(id) 858 } else { 859 Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) 860 } 861 } 862 may_have_created_stream(&self, id: StreamId) -> bool863 pub fn may_have_created_stream(&self, id: StreamId) -> bool { 864 if let Ok(next_id) = self.next_stream_id { 865 // Peer::is_local_init should have been called beforehand 866 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); 867 id < next_id 868 } else { 869 true 870 } 871 } 872 maybe_reset_next_stream_id(&mut self, id: StreamId)873 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { 874 if let Ok(next_id) = self.next_stream_id { 875 // !Peer::is_local_init should have been called beforehand 876 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); 877 if id >= next_id { 878 self.next_stream_id = id.next_id(); 879 } 880 } 881 } 882 883 /// Returns true if the remote peer can reserve a stream with the given ID. ensure_can_reserve(&self) -> Result<(), Error>884 pub fn ensure_can_reserve(&self) -> Result<(), Error> { 885 if !self.is_push_enabled { 886 proto_err!(conn: "recv_push_promise: push is disabled"); 887 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 888 } 889 890 Ok(()) 891 } 892 893 /// Add a locally reset stream to queue to be eventually reaped. enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts)894 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 895 if !stream.state.is_local_error() || stream.is_pending_reset_expiration() { 896 return; 897 } 898 899 tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); 900 901 if counts.can_inc_num_reset_streams() { 902 counts.inc_num_reset_streams(); 903 self.pending_reset_expired.push(stream); 904 } 905 } 906 907 /// Send any pending refusals. send_pending_refusal<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,908 pub fn send_pending_refusal<T, B>( 909 &mut self, 910 cx: &mut Context, 911 dst: &mut Codec<T, Prioritized<B>>, 912 ) -> Poll<io::Result<()>> 913 where 914 T: AsyncWrite + Unpin, 915 B: Buf, 916 { 917 if let Some(stream_id) = self.refused { 918 ready!(dst.poll_ready(cx))?; 919 920 // Create the RST_STREAM frame 921 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); 922 923 // Buffer the frame 924 dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); 925 } 926 927 self.refused = None; 928 929 Poll::Ready(Ok(())) 930 } 931 clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)932 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 933 if !self.pending_reset_expired.is_empty() { 934 let now = Instant::now(); 935 let reset_duration = self.reset_duration; 936 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { 937 let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); 938 // rust-lang/rust#86470 tracks a bug in the standard library where `Instant` 939 // subtraction can panic (because, on some platforms, `Instant` isn't actually 940 // monotonic). We use a saturating operation to avoid this panic here. 941 now.saturating_duration_since(reset_at) > reset_duration 942 }) { 943 counts.transition_after(stream, true); 944 } 945 } 946 } 947 clear_queues( &mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts, )948 pub fn clear_queues( 949 &mut self, 950 clear_pending_accept: bool, 951 store: &mut Store, 952 counts: &mut Counts, 953 ) { 954 self.clear_stream_window_update_queue(store, counts); 955 self.clear_all_reset_streams(store, counts); 956 957 if clear_pending_accept { 958 self.clear_all_pending_accept(store, counts); 959 } 960 } 961 clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts)962 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { 963 while let Some(stream) = self.pending_window_updates.pop(store) { 964 counts.transition(stream, |_, stream| { 965 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id); 966 }) 967 } 968 } 969 970 /// Called on EOF clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)971 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 972 while let Some(stream) = self.pending_reset_expired.pop(store) { 973 counts.transition_after(stream, true); 974 } 975 } 976 clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts)977 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { 978 while let Some(stream) = self.pending_accept.pop(store) { 979 counts.transition_after(stream, false); 980 } 981 } 982 poll_complete<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,983 pub fn poll_complete<T, B>( 984 &mut self, 985 cx: &mut Context, 986 store: &mut Store, 987 counts: &mut Counts, 988 dst: &mut Codec<T, Prioritized<B>>, 989 ) -> Poll<io::Result<()>> 990 where 991 T: AsyncWrite + Unpin, 992 B: Buf, 993 { 994 // Send any pending connection level window updates 995 ready!(self.send_connection_window_update(cx, dst))?; 996 997 // Send any pending stream level window updates 998 ready!(self.send_stream_window_updates(cx, store, counts, dst))?; 999 1000 Poll::Ready(Ok(())) 1001 } 1002 1003 /// Send connection level window update send_connection_window_update<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,1004 fn send_connection_window_update<T, B>( 1005 &mut self, 1006 cx: &mut Context, 1007 dst: &mut Codec<T, Prioritized<B>>, 1008 ) -> Poll<io::Result<()>> 1009 where 1010 T: AsyncWrite + Unpin, 1011 B: Buf, 1012 { 1013 if let Some(incr) = self.flow.unclaimed_capacity() { 1014 let frame = frame::WindowUpdate::new(StreamId::zero(), incr); 1015 1016 // Ensure the codec has capacity 1017 ready!(dst.poll_ready(cx))?; 1018 1019 // Buffer the WINDOW_UPDATE frame 1020 dst.buffer(frame.into()) 1021 .expect("invalid WINDOW_UPDATE frame"); 1022 1023 // Update flow control 1024 self.flow 1025 .inc_window(incr) 1026 .expect("unexpected flow control state"); 1027 } 1028 1029 Poll::Ready(Ok(())) 1030 } 1031 1032 /// Send stream level window update send_stream_window_updates<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,1033 pub fn send_stream_window_updates<T, B>( 1034 &mut self, 1035 cx: &mut Context, 1036 store: &mut Store, 1037 counts: &mut Counts, 1038 dst: &mut Codec<T, Prioritized<B>>, 1039 ) -> Poll<io::Result<()>> 1040 where 1041 T: AsyncWrite + Unpin, 1042 B: Buf, 1043 { 1044 loop { 1045 // Ensure the codec has capacity 1046 ready!(dst.poll_ready(cx))?; 1047 1048 // Get the next stream 1049 let stream = match self.pending_window_updates.pop(store) { 1050 Some(stream) => stream, 1051 None => return Poll::Ready(Ok(())), 1052 }; 1053 1054 counts.transition(stream, |_, stream| { 1055 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id); 1056 debug_assert!(!stream.is_pending_window_update); 1057 1058 if !stream.state.is_recv_streaming() { 1059 // No need to send window updates on the stream if the stream is 1060 // no longer receiving data. 1061 // 1062 // TODO: is this correct? We could possibly send a window 1063 // update on a ReservedRemote stream if we already know 1064 // we want to stream the data faster... 1065 return; 1066 } 1067 1068 // TODO: de-dup 1069 if let Some(incr) = stream.recv_flow.unclaimed_capacity() { 1070 // Create the WINDOW_UPDATE frame 1071 let frame = frame::WindowUpdate::new(stream.id, incr); 1072 1073 // Buffer it 1074 dst.buffer(frame.into()) 1075 .expect("invalid WINDOW_UPDATE frame"); 1076 1077 // Update flow control 1078 stream 1079 .recv_flow 1080 .inc_window(incr) 1081 .expect("unexpected flow control state"); 1082 } 1083 }) 1084 } 1085 } 1086 next_incoming(&mut self, store: &mut Store) -> Option<store::Key>1087 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> { 1088 self.pending_accept.pop(store).map(|ptr| ptr.key()) 1089 } 1090 poll_data( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<Bytes, proto::Error>>>1091 pub fn poll_data( 1092 &mut self, 1093 cx: &Context, 1094 stream: &mut Stream, 1095 ) -> Poll<Option<Result<Bytes, proto::Error>>> { 1096 match stream.pending_recv.pop_front(&mut self.buffer) { 1097 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), 1098 Some(event) => { 1099 // Frame is trailer 1100 stream.pending_recv.push_front(&mut self.buffer, event); 1101 1102 // Notify the recv task. This is done just in case 1103 // `poll_trailers` was called. 1104 // 1105 // It is very likely that `notify_recv` will just be a no-op (as 1106 // the task will be None), so this isn't really much of a 1107 // performance concern. It also means we don't have to track 1108 // state to see if `poll_trailers` was called before `poll_data` 1109 // returned `None`. 1110 stream.notify_recv(); 1111 1112 // No more data frames 1113 Poll::Ready(None) 1114 } 1115 None => self.schedule_recv(cx, stream), 1116 } 1117 } 1118 poll_trailers( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<HeaderMap, proto::Error>>>1119 pub fn poll_trailers( 1120 &mut self, 1121 cx: &Context, 1122 stream: &mut Stream, 1123 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> { 1124 match stream.pending_recv.pop_front(&mut self.buffer) { 1125 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), 1126 Some(event) => { 1127 // Frame is not trailers.. not ready to poll trailers yet. 1128 stream.pending_recv.push_front(&mut self.buffer, event); 1129 1130 Poll::Pending 1131 } 1132 None => self.schedule_recv(cx, stream), 1133 } 1134 } 1135 schedule_recv<T>( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<T, proto::Error>>>1136 fn schedule_recv<T>( 1137 &mut self, 1138 cx: &Context, 1139 stream: &mut Stream, 1140 ) -> Poll<Option<Result<T, proto::Error>>> { 1141 if stream.state.ensure_recv_open()? { 1142 // Request to get notified once more frames arrive 1143 stream.recv_task = Some(cx.waker().clone()); 1144 Poll::Pending 1145 } else { 1146 // No more frames will be received 1147 Poll::Ready(None) 1148 } 1149 } 1150 } 1151 1152 // ===== impl Open ===== 1153 1154 impl Open { is_push_promise(&self) -> bool1155 pub fn is_push_promise(&self) -> bool { 1156 matches!(*self, Self::PushPromise) 1157 } 1158 } 1159 1160 // ===== impl RecvHeaderBlockError ===== 1161 1162 impl<T> From<Error> for RecvHeaderBlockError<T> { from(err: Error) -> Self1163 fn from(err: Error) -> Self { 1164 RecvHeaderBlockError::State(err) 1165 } 1166 } 1167