1 use super::*;
2 use crate::codec::UserError;
3 use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
4 use crate::proto;
5 
6 use http::{HeaderMap, Request, Response};
7 
8 use std::cmp::Ordering;
9 use std::io;
10 use std::task::{Context, Poll, Waker};
11 use std::time::Instant;
12 
13 #[derive(Debug)]
14 pub(super) struct Recv {
15     /// Initial window size of remote initiated streams
16     init_window_sz: WindowSize,
17 
18     /// Connection level flow control governing received data
19     flow: FlowControl,
20 
21     /// Amount of connection window capacity currently used by outstanding streams.
22     in_flight_data: WindowSize,
23 
24     /// The lowest stream ID that is still idle
25     next_stream_id: Result<StreamId, StreamIdOverflow>,
26 
27     /// The stream ID of the last processed stream
28     last_processed_id: StreamId,
29 
30     /// Any streams with a higher ID are ignored.
31     ///
32     /// This starts as MAX, but is lowered when a GOAWAY is received.
33     ///
34     /// > After sending a GOAWAY frame, the sender can discard frames for
35     /// > streams initiated by the receiver with identifiers higher than
36     /// > the identified last stream.
37     max_stream_id: StreamId,
38 
39     /// Streams that have pending window updates
40     pending_window_updates: store::Queue<stream::NextWindowUpdate>,
41 
42     /// New streams to be accepted
43     pending_accept: store::Queue<stream::NextAccept>,
44 
45     /// Locally reset streams that should be reaped when they expire
46     pending_reset_expired: store::Queue<stream::NextResetExpire>,
47 
48     /// How long locally reset streams should ignore received frames
49     reset_duration: Duration,
50 
51     /// Holds frames that are waiting to be read
52     buffer: Buffer<Event>,
53 
54     /// Refused StreamId, this represents a frame that must be sent out.
55     refused: Option<StreamId>,
56 
57     /// If push promises are allowed to be received.
58     is_push_enabled: bool,
59 
60     /// If extended connect protocol is enabled.
61     is_extended_connect_protocol_enabled: bool,
62 }
63 
64 #[derive(Debug)]
65 pub(super) enum Event {
66     Headers(peer::PollMessage),
67     Data(Bytes),
68     Trailers(HeaderMap),
69 }
70 
71 #[derive(Debug)]
72 pub(super) enum RecvHeaderBlockError<T> {
73     Oversize(T),
74     State(Error),
75 }
76 
77 #[derive(Debug)]
78 pub(crate) enum Open {
79     PushPromise,
80     Headers,
81 }
82 
83 impl Recv {
new(peer: peer::Dyn, config: &Config) -> Self84     pub fn new(peer: peer::Dyn, config: &Config) -> Self {
85         let next_stream_id = if peer.is_server() { 1 } else { 2 };
86 
87         let mut flow = FlowControl::new();
88 
89         // connections always have the default window size, regardless of
90         // settings
91         flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
92             .expect("invalid initial remote window size");
93         flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
94 
95         Recv {
96             init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
97             flow,
98             in_flight_data: 0 as WindowSize,
99             next_stream_id: Ok(next_stream_id.into()),
100             pending_window_updates: store::Queue::new(),
101             last_processed_id: StreamId::ZERO,
102             max_stream_id: StreamId::MAX,
103             pending_accept: store::Queue::new(),
104             pending_reset_expired: store::Queue::new(),
105             reset_duration: config.local_reset_duration,
106             buffer: Buffer::new(),
107             refused: None,
108             is_push_enabled: config.local_push_enabled,
109             is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
110         }
111     }
112 
113     /// Returns the initial receive window size
init_window_sz(&self) -> WindowSize114     pub fn init_window_sz(&self) -> WindowSize {
115         self.init_window_sz
116     }
117 
118     /// Returns the ID of the last processed stream
last_processed_id(&self) -> StreamId119     pub fn last_processed_id(&self) -> StreamId {
120         self.last_processed_id
121     }
122 
123     /// Update state reflecting a new, remotely opened stream
124     ///
125     /// Returns the stream state if successful. `None` if refused
open( &mut self, id: StreamId, mode: Open, counts: &mut Counts, ) -> Result<Option<StreamId>, Error>126     pub fn open(
127         &mut self,
128         id: StreamId,
129         mode: Open,
130         counts: &mut Counts,
131     ) -> Result<Option<StreamId>, Error> {
132         assert!(self.refused.is_none());
133 
134         counts.peer().ensure_can_open(id, mode)?;
135 
136         let next_id = self.next_stream_id()?;
137         if id < next_id {
138             proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
139             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
140         }
141 
142         self.next_stream_id = id.next_id();
143 
144         if !counts.can_inc_num_recv_streams() {
145             self.refused = Some(id);
146             return Ok(None);
147         }
148 
149         Ok(Some(id))
150     }
151 
152     /// Transition the stream state based on receiving headers
153     ///
154     /// The caller ensures that the frame represents headers and not trailers.
recv_headers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, counts: &mut Counts, ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>>155     pub fn recv_headers(
156         &mut self,
157         frame: frame::Headers,
158         stream: &mut store::Ptr,
159         counts: &mut Counts,
160     ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
161         tracing::trace!("opening stream; init_window={}", self.init_window_sz);
162         let is_initial = stream.state.recv_open(&frame)?;
163 
164         if is_initial {
165             // TODO: be smarter about this logic
166             if frame.stream_id() > self.last_processed_id {
167                 self.last_processed_id = frame.stream_id();
168             }
169 
170             // Increment the number of concurrent streams
171             counts.inc_num_recv_streams(stream);
172         }
173 
174         if !stream.content_length.is_head() {
175             use super::stream::ContentLength;
176             use http::header;
177 
178             if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
179                 let content_length = match frame::parse_u64(content_length.as_bytes()) {
180                     Ok(v) => v,
181                     Err(_) => {
182                         proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
183                         return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
184                     }
185                 };
186 
187                 stream.content_length = ContentLength::Remaining(content_length);
188             }
189         }
190 
191         if frame.is_over_size() {
192             // A frame is over size if the decoded header block was bigger than
193             // SETTINGS_MAX_HEADER_LIST_SIZE.
194             //
195             // > A server that receives a larger header block than it is willing
196             // > to handle can send an HTTP 431 (Request Header Fields Too
197             // > Large) status code [RFC6585]. A client can discard responses
198             // > that it cannot process.
199             //
200             // So, if peer is a server, we'll send a 431. In either case,
201             // an error is recorded, which will send a REFUSED_STREAM,
202             // since we don't want any of the data frames either.
203             tracing::debug!(
204                 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
205                  recv_headers: frame is over size; stream={:?}",
206                 stream.id
207             );
208             return if counts.peer().is_server() && is_initial {
209                 let mut res = frame::Headers::new(
210                     stream.id,
211                     frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
212                     HeaderMap::new(),
213                 );
214                 res.set_end_stream();
215                 Err(RecvHeaderBlockError::Oversize(Some(res)))
216             } else {
217                 Err(RecvHeaderBlockError::Oversize(None))
218             };
219         }
220 
221         let stream_id = frame.stream_id();
222         let (pseudo, fields) = frame.into_parts();
223 
224         if pseudo.protocol.is_some()
225             && counts.peer().is_server()
226             && !self.is_extended_connect_protocol_enabled
227         {
228             proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
229             return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
230         }
231 
232         if pseudo.status.is_some() && counts.peer().is_server() {
233             proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
234             return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
235         }
236 
237         if !pseudo.is_informational() {
238             let message = counts
239                 .peer()
240                 .convert_poll_message(pseudo, fields, stream_id)?;
241 
242             // Push the frame onto the stream's recv buffer
243             stream
244                 .pending_recv
245                 .push_back(&mut self.buffer, Event::Headers(message));
246             stream.notify_recv();
247 
248             // Only servers can receive a headers frame that initiates the stream.
249             // This is verified in `Streams` before calling this function.
250             if counts.peer().is_server() {
251                 // Correctness: never push a stream to `pending_accept` without having the
252                 // corresponding headers frame pushed to `stream.pending_recv`.
253                 self.pending_accept.push(stream);
254             }
255         }
256 
257         Ok(())
258     }
259 
260     /// Called by the server to get the request
261     ///
262     /// # Panics
263     ///
264     /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
265     ///
take_request(&mut self, stream: &mut store::Ptr) -> Request<()>266     pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
267         use super::peer::PollMessage::*;
268 
269         match stream.pending_recv.pop_front(&mut self.buffer) {
270             Some(Event::Headers(Server(request))) => request,
271             _ => unreachable!("server stream queue must start with Headers"),
272         }
273     }
274 
275     /// Called by the client to get pushed response
poll_pushed( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>>276     pub fn poll_pushed(
277         &mut self,
278         cx: &Context,
279         stream: &mut store::Ptr,
280     ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
281         use super::peer::PollMessage::*;
282 
283         let mut ppp = stream.pending_push_promises.take();
284         let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
285             match pushed.pending_recv.pop_front(&mut self.buffer) {
286                 Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
287                 // When frames are pushed into the queue, it is verified that
288                 // the first frame is a HEADERS frame.
289                 _ => panic!("Headers not set on pushed stream"),
290             }
291         });
292         stream.pending_push_promises = ppp;
293         if let Some(p) = pushed {
294             Poll::Ready(Some(Ok(p)))
295         } else {
296             let is_open = stream.state.ensure_recv_open()?;
297 
298             if is_open {
299                 stream.recv_task = Some(cx.waker().clone());
300                 Poll::Pending
301             } else {
302                 Poll::Ready(None)
303             }
304         }
305     }
306 
307     /// Called by the client to get the response
poll_response( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Result<Response<()>, proto::Error>>308     pub fn poll_response(
309         &mut self,
310         cx: &Context,
311         stream: &mut store::Ptr,
312     ) -> Poll<Result<Response<()>, proto::Error>> {
313         use super::peer::PollMessage::*;
314 
315         // If the buffer is not empty, then the first frame must be a HEADERS
316         // frame or the user violated the contract.
317         match stream.pending_recv.pop_front(&mut self.buffer) {
318             Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
319             Some(_) => panic!("poll_response called after response returned"),
320             None => {
321                 if !stream.state.ensure_recv_open()? {
322                     proto_err!(stream: "poll_response: stream={:?} is not opened;",  stream.id);
323                     return Poll::Ready(Err(Error::library_reset(
324                         stream.id,
325                         Reason::PROTOCOL_ERROR,
326                     )));
327                 }
328 
329                 stream.recv_task = Some(cx.waker().clone());
330                 Poll::Pending
331             }
332         }
333     }
334 
335     /// Transition the stream based on receiving trailers
recv_trailers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, ) -> Result<(), Error>336     pub fn recv_trailers(
337         &mut self,
338         frame: frame::Headers,
339         stream: &mut store::Ptr,
340     ) -> Result<(), Error> {
341         // Transition the state
342         stream.state.recv_close()?;
343 
344         if stream.ensure_content_length_zero().is_err() {
345             proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};",  stream.id);
346             return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
347         }
348 
349         let trailers = frame.into_fields();
350 
351         // Push the frame onto the stream's recv buffer
352         stream
353             .pending_recv
354             .push_back(&mut self.buffer, Event::Trailers(trailers));
355         stream.notify_recv();
356 
357         Ok(())
358     }
359 
360     /// Releases capacity of the connection
release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>)361     pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
362         tracing::trace!(
363             "release_connection_capacity; size={}, connection in_flight_data={}",
364             capacity,
365             self.in_flight_data,
366         );
367 
368         // Decrement in-flight data
369         self.in_flight_data -= capacity;
370 
371         // Assign capacity to connection
372         // TODO: proper error handling
373         let _res = self.flow.assign_capacity(capacity);
374         debug_assert!(_res.is_ok());
375 
376         if self.flow.unclaimed_capacity().is_some() {
377             if let Some(task) = task.take() {
378                 task.wake();
379             }
380         }
381     }
382 
383     /// Releases capacity back to the connection & stream
release_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option<Waker>, ) -> Result<(), UserError>384     pub fn release_capacity(
385         &mut self,
386         capacity: WindowSize,
387         stream: &mut store::Ptr,
388         task: &mut Option<Waker>,
389     ) -> Result<(), UserError> {
390         tracing::trace!("release_capacity; size={}", capacity);
391 
392         if capacity > stream.in_flight_recv_data {
393             return Err(UserError::ReleaseCapacityTooBig);
394         }
395 
396         self.release_connection_capacity(capacity, task);
397 
398         // Decrement in-flight data
399         stream.in_flight_recv_data -= capacity;
400 
401         // Assign capacity to stream
402         // TODO: proper error handling
403         let _res = stream.recv_flow.assign_capacity(capacity);
404         debug_assert!(_res.is_ok());
405 
406         if stream.recv_flow.unclaimed_capacity().is_some() {
407             // Queue the stream for sending the WINDOW_UPDATE frame.
408             self.pending_window_updates.push(stream);
409 
410             if let Some(task) = task.take() {
411                 task.wake();
412             }
413         }
414 
415         Ok(())
416     }
417 
418     /// Release any unclaimed capacity for a closed stream.
release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)419     pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
420         debug_assert_eq!(stream.ref_count, 0);
421 
422         if stream.in_flight_recv_data == 0 {
423             return;
424         }
425 
426         tracing::trace!(
427             "auto-release closed stream ({:?}) capacity: {:?}",
428             stream.id,
429             stream.in_flight_recv_data,
430         );
431 
432         self.release_connection_capacity(stream.in_flight_recv_data, task);
433         stream.in_flight_recv_data = 0;
434 
435         self.clear_recv_buffer(stream);
436     }
437 
438     /// Set the "target" connection window size.
439     ///
440     /// By default, all new connections start with 64kb of window size. As
441     /// streams used and release capacity, we will send WINDOW_UPDATEs for the
442     /// connection to bring it back up to the initial "target".
443     ///
444     /// Setting a target means that we will try to tell the peer about
445     /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
446     /// for the whole connection.
447     ///
448     /// The `task` is an optional parked task for the `Connection` that might
449     /// be blocked on needing more window capacity.
set_target_connection_window( &mut self, target: WindowSize, task: &mut Option<Waker>, ) -> Result<(), Reason>450     pub fn set_target_connection_window(
451         &mut self,
452         target: WindowSize,
453         task: &mut Option<Waker>,
454     ) -> Result<(), Reason> {
455         tracing::trace!(
456             "set_target_connection_window; target={}; available={}, reserved={}",
457             target,
458             self.flow.available(),
459             self.in_flight_data,
460         );
461 
462         // The current target connection window is our `available` plus any
463         // in-flight data reserved by streams.
464         //
465         // Update the flow controller with the difference between the new
466         // target and the current target.
467         let current = self
468             .flow
469             .available()
470             .add(self.in_flight_data)?
471             .checked_size();
472         if target > current {
473             self.flow.assign_capacity(target - current)?;
474         } else {
475             self.flow.claim_capacity(current - target)?;
476         }
477 
478         // If changing the target capacity means we gained a bunch of capacity,
479         // enough that we went over the update threshold, then schedule sending
480         // a connection WINDOW_UPDATE.
481         if self.flow.unclaimed_capacity().is_some() {
482             if let Some(task) = task.take() {
483                 task.wake();
484             }
485         }
486         Ok(())
487     }
488 
apply_local_settings( &mut self, settings: &frame::Settings, store: &mut Store, ) -> Result<(), proto::Error>489     pub(crate) fn apply_local_settings(
490         &mut self,
491         settings: &frame::Settings,
492         store: &mut Store,
493     ) -> Result<(), proto::Error> {
494         if let Some(val) = settings.is_extended_connect_protocol_enabled() {
495             self.is_extended_connect_protocol_enabled = val;
496         }
497 
498         if let Some(target) = settings.initial_window_size() {
499             let old_sz = self.init_window_sz;
500             self.init_window_sz = target;
501 
502             tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
503 
504             // Per RFC 7540 §6.9.2:
505             //
506             // In addition to changing the flow-control window for streams that are
507             // not yet active, a SETTINGS frame can alter the initial flow-control
508             // window size for streams with active flow-control windows (that is,
509             // streams in the "open" or "half-closed (remote)" state). When the
510             // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
511             // the size of all stream flow-control windows that it maintains by the
512             // difference between the new value and the old value.
513             //
514             // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
515             // space in a flow-control window to become negative. A sender MUST
516             // track the negative flow-control window and MUST NOT send new
517             // flow-controlled frames until it receives WINDOW_UPDATE frames that
518             // cause the flow-control window to become positive.
519 
520             match target.cmp(&old_sz) {
521                 Ordering::Less => {
522                     // We must decrease the (local) window on every open stream.
523                     let dec = old_sz - target;
524                     tracing::trace!("decrementing all windows; dec={}", dec);
525 
526                     store.try_for_each(|mut stream| {
527                         stream
528                             .recv_flow
529                             .dec_recv_window(dec)
530                             .map_err(proto::Error::library_go_away)?;
531                         Ok::<_, proto::Error>(())
532                     })?;
533                 }
534                 Ordering::Greater => {
535                     // We must increase the (local) window on every open stream.
536                     let inc = target - old_sz;
537                     tracing::trace!("incrementing all windows; inc={}", inc);
538                     store.try_for_each(|mut stream| {
539                         // XXX: Shouldn't the peer have already noticed our
540                         // overflow and sent us a GOAWAY?
541                         stream
542                             .recv_flow
543                             .inc_window(inc)
544                             .map_err(proto::Error::library_go_away)?;
545                         stream
546                             .recv_flow
547                             .assign_capacity(inc)
548                             .map_err(proto::Error::library_go_away)?;
549                         Ok::<_, proto::Error>(())
550                     })?;
551                 }
552                 Ordering::Equal => (),
553             }
554         }
555 
556         Ok(())
557     }
558 
is_end_stream(&self, stream: &store::Ptr) -> bool559     pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
560         if !stream.state.is_recv_closed() {
561             return false;
562         }
563 
564         stream.pending_recv.is_empty()
565     }
566 
recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error>567     pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
568         let sz = frame.payload().len();
569 
570         // This should have been enforced at the codec::FramedRead layer, so
571         // this is just a sanity check.
572         assert!(sz <= MAX_WINDOW_SIZE as usize);
573 
574         let sz = sz as WindowSize;
575 
576         let is_ignoring_frame = stream.state.is_local_error();
577 
578         if !is_ignoring_frame && !stream.state.is_recv_streaming() {
579             // TODO: There are cases where this can be a stream error of
580             // STREAM_CLOSED instead...
581 
582             // Receiving a DATA frame when not expecting one is a protocol
583             // error.
584             proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
585             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
586         }
587 
588         tracing::trace!(
589             "recv_data; size={}; connection={}; stream={}",
590             sz,
591             self.flow.window_size(),
592             stream.recv_flow.window_size()
593         );
594 
595         if is_ignoring_frame {
596             tracing::trace!(
597                 "recv_data; frame ignored on locally reset {:?} for some time",
598                 stream.id,
599             );
600             return self.ignore_data(sz);
601         }
602 
603         // Ensure that there is enough capacity on the connection before acting
604         // on the stream.
605         self.consume_connection_window(sz)?;
606 
607         if stream.recv_flow.window_size() < sz {
608             // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
609             // > A receiver MAY respond with a stream error (Section 5.4.2) or
610             // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
611             // > it is unable to accept a frame.
612             //
613             // So, for violating the **stream** window, we can send either a
614             // stream or connection error. We've opted to send a stream
615             // error.
616             return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
617         }
618 
619         if stream.dec_content_length(frame.payload().len()).is_err() {
620             proto_err!(stream:
621                 "recv_data: content-length overflow; stream={:?}; len={:?}",
622                 stream.id,
623                 frame.payload().len(),
624             );
625             return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
626         }
627 
628         if frame.is_end_stream() {
629             if stream.ensure_content_length_zero().is_err() {
630                 proto_err!(stream:
631                     "recv_data: content-length underflow; stream={:?}; len={:?}",
632                     stream.id,
633                     frame.payload().len(),
634                 );
635                 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
636             }
637 
638             if stream.state.recv_close().is_err() {
639                 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
640                 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
641             }
642         }
643 
644         // Received a frame, but no one cared about it. fix issue#648
645         if !stream.is_recv {
646             tracing::trace!(
647                 "recv_data; frame ignored on stream release {:?} for some time",
648                 stream.id,
649             );
650             self.release_connection_capacity(sz, &mut None);
651             return Ok(());
652         }
653 
654         // Update stream level flow control
655         stream
656             .recv_flow
657             .send_data(sz)
658             .map_err(proto::Error::library_go_away)?;
659 
660         // Track the data as in-flight
661         stream.in_flight_recv_data += sz;
662 
663         let event = Event::Data(frame.into_payload());
664 
665         // Push the frame onto the recv buffer
666         stream.pending_recv.push_back(&mut self.buffer, event);
667         stream.notify_recv();
668 
669         Ok(())
670     }
671 
ignore_data(&mut self, sz: WindowSize) -> Result<(), Error>672     pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
673         // Ensure that there is enough capacity on the connection...
674         self.consume_connection_window(sz)?;
675 
676         // Since we are ignoring this frame,
677         // we aren't returning the frame to the user. That means they
678         // have no way to release the capacity back to the connection. So
679         // we have to release it automatically.
680         //
681         // This call doesn't send a WINDOW_UPDATE immediately, just marks
682         // the capacity as available to be reclaimed. When the available
683         // capacity meets a threshold, a WINDOW_UPDATE is then sent.
684         self.release_connection_capacity(sz, &mut None);
685         Ok(())
686     }
687 
consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error>688     pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
689         if self.flow.window_size() < sz {
690             tracing::debug!(
691                 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
692                 self.flow.window_size(),
693                 sz,
694             );
695             return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
696         }
697 
698         // Update connection level flow control
699         self.flow.send_data(sz).map_err(Error::library_go_away)?;
700 
701         // Track the data as in-flight
702         self.in_flight_data += sz;
703         Ok(())
704     }
705 
recv_push_promise( &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), Error>706     pub fn recv_push_promise(
707         &mut self,
708         frame: frame::PushPromise,
709         stream: &mut store::Ptr,
710     ) -> Result<(), Error> {
711         stream.state.reserve_remote()?;
712         if frame.is_over_size() {
713             // A frame is over size if the decoded header block was bigger than
714             // SETTINGS_MAX_HEADER_LIST_SIZE.
715             //
716             // > A server that receives a larger header block than it is willing
717             // > to handle can send an HTTP 431 (Request Header Fields Too
718             // > Large) status code [RFC6585]. A client can discard responses
719             // > that it cannot process.
720             //
721             // So, if peer is a server, we'll send a 431. In either case,
722             // an error is recorded, which will send a REFUSED_STREAM,
723             // since we don't want any of the data frames either.
724             tracing::debug!(
725                 "stream error REFUSED_STREAM -- recv_push_promise: \
726                  headers frame is over size; promised_id={:?};",
727                 frame.promised_id(),
728             );
729             return Err(Error::library_reset(
730                 frame.promised_id(),
731                 Reason::REFUSED_STREAM,
732             ));
733         }
734 
735         let promised_id = frame.promised_id();
736         let (pseudo, fields) = frame.into_parts();
737         let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
738 
739         if let Err(e) = frame::PushPromise::validate_request(&req) {
740             use PushPromiseHeaderError::*;
741             match e {
742                 NotSafeAndCacheable => proto_err!(
743                     stream:
744                     "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
745                     req.method(),
746                     promised_id,
747                 ),
748                 InvalidContentLength(e) => proto_err!(
749                     stream:
750                     "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
751                     e,
752                     promised_id,
753                 ),
754             }
755             return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
756         }
757 
758         use super::peer::PollMessage::*;
759         stream
760             .pending_recv
761             .push_back(&mut self.buffer, Event::Headers(Server(req)));
762         stream.notify_recv();
763         Ok(())
764     }
765 
766     /// Ensures that `id` is not in the `Idle` state.
ensure_not_idle(&self, id: StreamId) -> Result<(), Reason>767     pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
768         if let Ok(next) = self.next_stream_id {
769             if id >= next {
770                 tracing::debug!(
771                     "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
772                     id
773                 );
774                 return Err(Reason::PROTOCOL_ERROR);
775             }
776         }
777         // if next_stream_id is overflowed, that's ok.
778 
779         Ok(())
780     }
781 
782     /// Handle remote sending an explicit RST_STREAM.
recv_reset( &mut self, frame: frame::Reset, stream: &mut Stream, counts: &mut Counts, ) -> Result<(), Error>783     pub fn recv_reset(
784         &mut self,
785         frame: frame::Reset,
786         stream: &mut Stream,
787         counts: &mut Counts,
788     ) -> Result<(), Error> {
789         // Reseting a stream that the user hasn't accepted is possible,
790         // but should be done with care. These streams will continue
791         // to take up memory in the accept queue, but will no longer be
792         // counted as "concurrent" streams.
793         //
794         // So, we have a separate limit for these.
795         //
796         // See https://github.com/hyperium/hyper/issues/2877
797         if stream.is_pending_accept {
798             if counts.can_inc_num_remote_reset_streams() {
799                 counts.inc_num_remote_reset_streams();
800             } else {
801                 tracing::warn!(
802                     "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
803                     counts.max_remote_reset_streams(),
804                 );
805                 return Err(Error::library_go_away_data(
806                     Reason::ENHANCE_YOUR_CALM,
807                     "too_many_resets",
808                 ));
809             }
810         }
811 
812         // Notify the stream
813         stream.state.recv_reset(frame, stream.is_pending_send);
814 
815         stream.notify_send();
816         stream.notify_recv();
817 
818         Ok(())
819     }
820 
821     /// Handle a connection-level error
handle_error(&mut self, err: &proto::Error, stream: &mut Stream)822     pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
823         // Receive an error
824         stream.state.handle_error(err);
825 
826         // If a receiver is waiting, notify it
827         stream.notify_send();
828         stream.notify_recv();
829     }
830 
go_away(&mut self, last_processed_id: StreamId)831     pub fn go_away(&mut self, last_processed_id: StreamId) {
832         assert!(self.max_stream_id >= last_processed_id);
833         self.max_stream_id = last_processed_id;
834     }
835 
recv_eof(&mut self, stream: &mut Stream)836     pub fn recv_eof(&mut self, stream: &mut Stream) {
837         stream.state.recv_eof();
838         stream.notify_send();
839         stream.notify_recv();
840     }
841 
clear_recv_buffer(&mut self, stream: &mut Stream)842     pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
843         while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
844             // drop it
845         }
846     }
847 
848     /// Get the max ID of streams we can receive.
849     ///
850     /// This gets lowered if we send a GOAWAY frame.
max_stream_id(&self) -> StreamId851     pub fn max_stream_id(&self) -> StreamId {
852         self.max_stream_id
853     }
854 
next_stream_id(&self) -> Result<StreamId, Error>855     pub fn next_stream_id(&self) -> Result<StreamId, Error> {
856         if let Ok(id) = self.next_stream_id {
857             Ok(id)
858         } else {
859             Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
860         }
861     }
862 
may_have_created_stream(&self, id: StreamId) -> bool863     pub fn may_have_created_stream(&self, id: StreamId) -> bool {
864         if let Ok(next_id) = self.next_stream_id {
865             // Peer::is_local_init should have been called beforehand
866             debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
867             id < next_id
868         } else {
869             true
870         }
871     }
872 
maybe_reset_next_stream_id(&mut self, id: StreamId)873     pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
874         if let Ok(next_id) = self.next_stream_id {
875             // !Peer::is_local_init should have been called beforehand
876             debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
877             if id >= next_id {
878                 self.next_stream_id = id.next_id();
879             }
880         }
881     }
882 
883     /// Returns true if the remote peer can reserve a stream with the given ID.
ensure_can_reserve(&self) -> Result<(), Error>884     pub fn ensure_can_reserve(&self) -> Result<(), Error> {
885         if !self.is_push_enabled {
886             proto_err!(conn: "recv_push_promise: push is disabled");
887             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
888         }
889 
890         Ok(())
891     }
892 
893     /// Add a locally reset stream to queue to be eventually reaped.
enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts)894     pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
895         if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
896             return;
897         }
898 
899         tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
900 
901         if counts.can_inc_num_reset_streams() {
902             counts.inc_num_reset_streams();
903             self.pending_reset_expired.push(stream);
904         }
905     }
906 
907     /// Send any pending refusals.
send_pending_refusal<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,908     pub fn send_pending_refusal<T, B>(
909         &mut self,
910         cx: &mut Context,
911         dst: &mut Codec<T, Prioritized<B>>,
912     ) -> Poll<io::Result<()>>
913     where
914         T: AsyncWrite + Unpin,
915         B: Buf,
916     {
917         if let Some(stream_id) = self.refused {
918             ready!(dst.poll_ready(cx))?;
919 
920             // Create the RST_STREAM frame
921             let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
922 
923             // Buffer the frame
924             dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
925         }
926 
927         self.refused = None;
928 
929         Poll::Ready(Ok(()))
930     }
931 
clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)932     pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
933         if !self.pending_reset_expired.is_empty() {
934             let now = Instant::now();
935             let reset_duration = self.reset_duration;
936             while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
937                 let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
938                 // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
939                 // subtraction can panic (because, on some platforms, `Instant` isn't actually
940                 // monotonic). We use a saturating operation to avoid this panic here.
941                 now.saturating_duration_since(reset_at) > reset_duration
942             }) {
943                 counts.transition_after(stream, true);
944             }
945         }
946     }
947 
clear_queues( &mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts, )948     pub fn clear_queues(
949         &mut self,
950         clear_pending_accept: bool,
951         store: &mut Store,
952         counts: &mut Counts,
953     ) {
954         self.clear_stream_window_update_queue(store, counts);
955         self.clear_all_reset_streams(store, counts);
956 
957         if clear_pending_accept {
958             self.clear_all_pending_accept(store, counts);
959         }
960     }
961 
clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts)962     fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
963         while let Some(stream) = self.pending_window_updates.pop(store) {
964             counts.transition(stream, |_, stream| {
965                 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
966             })
967         }
968     }
969 
970     /// Called on EOF
clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)971     fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
972         while let Some(stream) = self.pending_reset_expired.pop(store) {
973             counts.transition_after(stream, true);
974         }
975     }
976 
clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts)977     fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
978         while let Some(stream) = self.pending_accept.pop(store) {
979             counts.transition_after(stream, false);
980         }
981     }
982 
poll_complete<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,983     pub fn poll_complete<T, B>(
984         &mut self,
985         cx: &mut Context,
986         store: &mut Store,
987         counts: &mut Counts,
988         dst: &mut Codec<T, Prioritized<B>>,
989     ) -> Poll<io::Result<()>>
990     where
991         T: AsyncWrite + Unpin,
992         B: Buf,
993     {
994         // Send any pending connection level window updates
995         ready!(self.send_connection_window_update(cx, dst))?;
996 
997         // Send any pending stream level window updates
998         ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
999 
1000         Poll::Ready(Ok(()))
1001     }
1002 
1003     /// Send connection level window update
send_connection_window_update<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,1004     fn send_connection_window_update<T, B>(
1005         &mut self,
1006         cx: &mut Context,
1007         dst: &mut Codec<T, Prioritized<B>>,
1008     ) -> Poll<io::Result<()>>
1009     where
1010         T: AsyncWrite + Unpin,
1011         B: Buf,
1012     {
1013         if let Some(incr) = self.flow.unclaimed_capacity() {
1014             let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
1015 
1016             // Ensure the codec has capacity
1017             ready!(dst.poll_ready(cx))?;
1018 
1019             // Buffer the WINDOW_UPDATE frame
1020             dst.buffer(frame.into())
1021                 .expect("invalid WINDOW_UPDATE frame");
1022 
1023             // Update flow control
1024             self.flow
1025                 .inc_window(incr)
1026                 .expect("unexpected flow control state");
1027         }
1028 
1029         Poll::Ready(Ok(()))
1030     }
1031 
1032     /// Send stream level window update
send_stream_window_updates<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,1033     pub fn send_stream_window_updates<T, B>(
1034         &mut self,
1035         cx: &mut Context,
1036         store: &mut Store,
1037         counts: &mut Counts,
1038         dst: &mut Codec<T, Prioritized<B>>,
1039     ) -> Poll<io::Result<()>>
1040     where
1041         T: AsyncWrite + Unpin,
1042         B: Buf,
1043     {
1044         loop {
1045             // Ensure the codec has capacity
1046             ready!(dst.poll_ready(cx))?;
1047 
1048             // Get the next stream
1049             let stream = match self.pending_window_updates.pop(store) {
1050                 Some(stream) => stream,
1051                 None => return Poll::Ready(Ok(())),
1052             };
1053 
1054             counts.transition(stream, |_, stream| {
1055                 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
1056                 debug_assert!(!stream.is_pending_window_update);
1057 
1058                 if !stream.state.is_recv_streaming() {
1059                     // No need to send window updates on the stream if the stream is
1060                     // no longer receiving data.
1061                     //
1062                     // TODO: is this correct? We could possibly send a window
1063                     // update on a ReservedRemote stream if we already know
1064                     // we want to stream the data faster...
1065                     return;
1066                 }
1067 
1068                 // TODO: de-dup
1069                 if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
1070                     // Create the WINDOW_UPDATE frame
1071                     let frame = frame::WindowUpdate::new(stream.id, incr);
1072 
1073                     // Buffer it
1074                     dst.buffer(frame.into())
1075                         .expect("invalid WINDOW_UPDATE frame");
1076 
1077                     // Update flow control
1078                     stream
1079                         .recv_flow
1080                         .inc_window(incr)
1081                         .expect("unexpected flow control state");
1082                 }
1083             })
1084         }
1085     }
1086 
next_incoming(&mut self, store: &mut Store) -> Option<store::Key>1087     pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
1088         self.pending_accept.pop(store).map(|ptr| ptr.key())
1089     }
1090 
poll_data( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<Bytes, proto::Error>>>1091     pub fn poll_data(
1092         &mut self,
1093         cx: &Context,
1094         stream: &mut Stream,
1095     ) -> Poll<Option<Result<Bytes, proto::Error>>> {
1096         match stream.pending_recv.pop_front(&mut self.buffer) {
1097             Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
1098             Some(event) => {
1099                 // Frame is trailer
1100                 stream.pending_recv.push_front(&mut self.buffer, event);
1101 
1102                 // Notify the recv task. This is done just in case
1103                 // `poll_trailers` was called.
1104                 //
1105                 // It is very likely that `notify_recv` will just be a no-op (as
1106                 // the task will be None), so this isn't really much of a
1107                 // performance concern. It also means we don't have to track
1108                 // state to see if `poll_trailers` was called before `poll_data`
1109                 // returned `None`.
1110                 stream.notify_recv();
1111 
1112                 // No more data frames
1113                 Poll::Ready(None)
1114             }
1115             None => self.schedule_recv(cx, stream),
1116         }
1117     }
1118 
poll_trailers( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<HeaderMap, proto::Error>>>1119     pub fn poll_trailers(
1120         &mut self,
1121         cx: &Context,
1122         stream: &mut Stream,
1123     ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1124         match stream.pending_recv.pop_front(&mut self.buffer) {
1125             Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
1126             Some(event) => {
1127                 // Frame is not trailers.. not ready to poll trailers yet.
1128                 stream.pending_recv.push_front(&mut self.buffer, event);
1129 
1130                 Poll::Pending
1131             }
1132             None => self.schedule_recv(cx, stream),
1133         }
1134     }
1135 
schedule_recv<T>( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<T, proto::Error>>>1136     fn schedule_recv<T>(
1137         &mut self,
1138         cx: &Context,
1139         stream: &mut Stream,
1140     ) -> Poll<Option<Result<T, proto::Error>>> {
1141         if stream.state.ensure_recv_open()? {
1142             // Request to get notified once more frames arrive
1143             stream.recv_task = Some(cx.waker().clone());
1144             Poll::Pending
1145         } else {
1146             // No more frames will be received
1147             Poll::Ready(None)
1148         }
1149     }
1150 }
1151 
1152 // ===== impl Open =====
1153 
1154 impl Open {
is_push_promise(&self) -> bool1155     pub fn is_push_promise(&self) -> bool {
1156         matches!(*self, Self::PushPromise)
1157     }
1158 }
1159 
1160 // ===== impl RecvHeaderBlockError =====
1161 
1162 impl<T> From<Error> for RecvHeaderBlockError<T> {
from(err: Error) -> Self1163     fn from(err: Error) -> Self {
1164         RecvHeaderBlockError::State(err)
1165     }
1166 }
1167