1 use super::recv::RecvHeaderBlockError;
2 use super::store::{self, Entry, Resolve, Store};
3 use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4 use crate::codec::{Codec, SendError, UserError};
5 use crate::ext::Protocol;
6 use crate::frame::{self, Frame, Reason};
7 use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8 use crate::{client, proto, server};
9 
10 use bytes::{Buf, Bytes};
11 use http::{HeaderMap, Request, Response};
12 use std::task::{Context, Poll, Waker};
13 use tokio::io::AsyncWrite;
14 
15 use std::sync::{Arc, Mutex};
16 use std::{fmt, io};
17 
/// Owning handle to a connection's stream state.
///
/// `B` is the chunk type of the frames held in the send buffer; `P` selects
/// the peer type statically and is only carried as a marker.
#[derive(Debug)]
pub(crate) struct Streams<B, P>
where
    P: Peer,
{
    /// Holds most of the connection and stream related state for processing
    /// HTTP/2 frames associated with streams.
    inner: Arc<Mutex<Inner>>,

    /// This is the queue of frames to be written to the wire. This is split out
    /// to avoid requiring a `B` generic on all public API types even if `B` is
    /// not technically required.
    ///
    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
    /// However, it should be possible to avoid this duplication with a little
    /// bit of unsafe code. This optimization has been postponed until it has
    /// been shown to be necessary.
    send_buffer: Arc<SendBuffer<B>>,

    /// Zero-sized marker that ties this handle to `P` without storing a value.
    _p: ::std::marker::PhantomData<P>,
}
39 
// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
// Ensures that the methods only get one instantiation, instead of two (client and server).
#[derive(Debug)]
pub(crate) struct DynStreams<'a, B> {
    /// Borrowed handle to the shared connection and stream state.
    inner: &'a Mutex<Inner>,

    /// Borrowed buffer of frames to be written to the wire.
    send_buffer: &'a SendBuffer<B>,

    /// Runtime peer value, used in place of the static `P: Peer` parameter.
    peer: peer::Dyn,
}
50 
/// Reference to the stream state
///
/// Pairs an `OpaqueStreamRef` with the send buffer, which is the only part
/// that depends on the data chunk type `B`.
#[derive(Debug)]
pub(crate) struct StreamRef<B> {
    /// The `B`-independent part of the reference (shared state + store key).
    opaque: OpaqueStreamRef,
    /// Shared buffer of frames to be written to the wire.
    send_buffer: Arc<SendBuffer<B>>,
}
57 
/// Reference to the stream state that hides the send data chunk generic
pub(crate) struct OpaqueStreamRef {
    /// Shared connection and stream state.
    inner: Arc<Mutex<Inner>>,
    /// Key used to resolve this stream in the `Store` held by `inner`.
    key: store::Key,
}
63 
/// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy.
///
/// TODO: better name
#[derive(Debug)]
struct Inner {
    /// Tracks send & recv stream concurrency.
    counts: Counts,

    /// Connection level state and performs actions on streams
    actions: Actions,

    /// Stores stream state
    store: Store,

    /// The number of stream refs to this shared state.
    ///
    /// Starts at 1 in `Inner::new` and is incremented by hand wherever a
    /// `StreamRef` is created while the lock is already held.
    refs: usize,
}
82 
/// Connection-level send/receive machinery, grouped so it can be borrowed
/// separately from the `Store` and `Counts` fields of `Inner`.
#[derive(Debug)]
struct Actions {
    /// Manages state transitions initiated by receiving frames
    recv: Recv,

    /// Manages state transitions initiated by sending frames
    send: Send,

    /// Task that calls `poll_complete`.
    task: Option<Waker>,

    /// If the connection errors, a copy is kept for any StreamRefs.
    conn_error: Option<proto::Error>,
}
97 
/// Contains the buffer of frames to be written to the wire.
#[derive(Debug)]
struct SendBuffer<B> {
    /// The frame buffer itself, guarded by its own mutex (separate from the
    /// one protecting `Inner`).
    inner: Mutex<Buffer<Frame<B>>>,
}
103 
104 // ===== impl Streams =====
105 
106 impl<B, P> Streams<B, P>
107 where
108     B: Buf,
109     P: Peer,
110 {
new(config: Config) -> Self111     pub fn new(config: Config) -> Self {
112         let peer = P::r#dyn();
113 
114         Streams {
115             inner: Inner::new(peer, config),
116             send_buffer: Arc::new(SendBuffer::new()),
117             _p: ::std::marker::PhantomData,
118         }
119     }
120 
set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason>121     pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
122         let mut me = self.inner.lock().unwrap();
123         let me = &mut *me;
124 
125         me.actions
126             .recv
127             .set_target_connection_window(size, &mut me.actions.task)
128     }
129 
next_incoming(&mut self) -> Option<StreamRef<B>>130     pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131         let mut me = self.inner.lock().unwrap();
132         let me = &mut *me;
133         me.actions.recv.next_incoming(&mut me.store).map(|key| {
134             let stream = &mut me.store.resolve(key);
135             tracing::trace!(
136                 "next_incoming; id={:?}, state={:?}",
137                 stream.id,
138                 stream.state
139             );
140             // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
141             // the lock, so it can't.
142             me.refs += 1;
143 
144             // Pending-accepted remotely-reset streams are counted.
145             if stream.state.is_remote_reset() {
146                 me.counts.dec_num_remote_reset_streams();
147             }
148 
149             StreamRef {
150                 opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
151                 send_buffer: self.send_buffer.clone(),
152             }
153         })
154     }
155 
send_pending_refusal<T>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin,156     pub fn send_pending_refusal<T>(
157         &mut self,
158         cx: &mut Context,
159         dst: &mut Codec<T, Prioritized<B>>,
160     ) -> Poll<io::Result<()>>
161     where
162         T: AsyncWrite + Unpin,
163     {
164         let mut me = self.inner.lock().unwrap();
165         let me = &mut *me;
166         me.actions.recv.send_pending_refusal(cx, dst)
167     }
168 
clear_expired_reset_streams(&mut self)169     pub fn clear_expired_reset_streams(&mut self) {
170         let mut me = self.inner.lock().unwrap();
171         let me = &mut *me;
172         me.actions
173             .recv
174             .clear_expired_reset_streams(&mut me.store, &mut me.counts);
175     }
176 
poll_complete<T>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin,177     pub fn poll_complete<T>(
178         &mut self,
179         cx: &mut Context,
180         dst: &mut Codec<T, Prioritized<B>>,
181     ) -> Poll<io::Result<()>>
182     where
183         T: AsyncWrite + Unpin,
184     {
185         let mut me = self.inner.lock().unwrap();
186         me.poll_complete(&self.send_buffer, cx, dst)
187     }
188 
apply_remote_settings( &mut self, frame: &frame::Settings, is_initial: bool, ) -> Result<(), Error>189     pub fn apply_remote_settings(
190         &mut self,
191         frame: &frame::Settings,
192         is_initial: bool,
193     ) -> Result<(), Error> {
194         let mut me = self.inner.lock().unwrap();
195         let me = &mut *me;
196 
197         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
198         let send_buffer = &mut *send_buffer;
199 
200         me.counts.apply_remote_settings(frame, is_initial);
201 
202         me.actions.send.apply_remote_settings(
203             frame,
204             send_buffer,
205             &mut me.store,
206             &mut me.counts,
207             &mut me.actions.task,
208         )
209     }
210 
apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error>211     pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
212         let mut me = self.inner.lock().unwrap();
213         let me = &mut *me;
214 
215         me.actions.recv.apply_local_settings(frame, &mut me.store)
216     }
217 
send_request( &mut self, mut request: Request<()>, end_of_stream: bool, pending: Option<&OpaqueStreamRef>, ) -> Result<(StreamRef<B>, bool), SendError>218     pub fn send_request(
219         &mut self,
220         mut request: Request<()>,
221         end_of_stream: bool,
222         pending: Option<&OpaqueStreamRef>,
223     ) -> Result<(StreamRef<B>, bool), SendError> {
224         use super::stream::ContentLength;
225         use http::Method;
226 
227         let protocol = request.extensions_mut().remove::<Protocol>();
228 
229         // Clear before taking lock, incase extensions contain a StreamRef.
230         request.extensions_mut().clear();
231 
232         // TODO: There is a hazard with assigning a stream ID before the
233         // prioritize layer. If prioritization reorders new streams, this
234         // implicitly closes the earlier stream IDs.
235         //
236         // See: hyperium/h2#11
237         let mut me = self.inner.lock().unwrap();
238         let me = &mut *me;
239 
240         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
241         let send_buffer = &mut *send_buffer;
242 
243         me.actions.ensure_no_conn_error()?;
244         me.actions.send.ensure_next_stream_id()?;
245 
246         // The `pending` argument is provided by the `Client`, and holds
247         // a store `Key` of a `Stream` that may have been not been opened
248         // yet.
249         //
250         // If that stream is still pending, the Client isn't allowed to
251         // queue up another pending stream. They should use `poll_ready`.
252         if let Some(stream) = pending {
253             if me.store.resolve(stream.key).is_pending_open {
254                 return Err(UserError::Rejected.into());
255             }
256         }
257 
258         if me.counts.peer().is_server() {
259             // Servers cannot open streams. PushPromise must first be reserved.
260             return Err(UserError::UnexpectedFrameType.into());
261         }
262 
263         let stream_id = me.actions.send.open()?;
264 
265         let mut stream = Stream::new(
266             stream_id,
267             me.actions.send.init_window_sz(),
268             me.actions.recv.init_window_sz(),
269         );
270 
271         if *request.method() == Method::HEAD {
272             stream.content_length = ContentLength::Head;
273         }
274 
275         // Convert the message
276         let headers =
277             client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
278 
279         let mut stream = me.store.insert(stream.id, stream);
280 
281         let sent = me.actions.send.send_headers(
282             headers,
283             send_buffer,
284             &mut stream,
285             &mut me.counts,
286             &mut me.actions.task,
287         );
288 
289         // send_headers can return a UserError, if it does,
290         // we should forget about this stream.
291         if let Err(err) = sent {
292             stream.unlink();
293             stream.remove();
294             return Err(err.into());
295         }
296 
297         // Given that the stream has been initialized, it should not be in the
298         // closed state.
299         debug_assert!(!stream.state.is_closed());
300 
301         // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
302         // the lock, so it can't.
303         me.refs += 1;
304 
305         let is_full = me.counts.next_send_stream_will_reach_capacity();
306         Ok((
307             StreamRef {
308                 opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
309                 send_buffer: self.send_buffer.clone(),
310             },
311             is_full,
312         ))
313     }
314 
is_extended_connect_protocol_enabled(&self) -> bool315     pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
316         self.inner
317             .lock()
318             .unwrap()
319             .actions
320             .send
321             .is_extended_connect_protocol_enabled()
322     }
323 }
324 
325 impl<B> DynStreams<'_, B> {
is_buffer_empty(&self) -> bool326     pub fn is_buffer_empty(&self) -> bool {
327         self.send_buffer.is_empty()
328     }
329 
is_server(&self) -> bool330     pub fn is_server(&self) -> bool {
331         self.peer.is_server()
332     }
333 
recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error>334     pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
335         let mut me = self.inner.lock().unwrap();
336 
337         me.recv_headers(self.peer, self.send_buffer, frame)
338     }
339 
recv_data(&mut self, frame: frame::Data) -> Result<(), Error>340     pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
341         let mut me = self.inner.lock().unwrap();
342         me.recv_data(self.peer, self.send_buffer, frame)
343     }
344 
recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error>345     pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
346         let mut me = self.inner.lock().unwrap();
347 
348         me.recv_reset(self.send_buffer, frame)
349     }
350 
351     /// Notify all streams that a connection-level error happened.
handle_error(&mut self, err: proto::Error) -> StreamId352     pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
353         let mut me = self.inner.lock().unwrap();
354         me.handle_error(self.send_buffer, err)
355     }
356 
recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error>357     pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
358         let mut me = self.inner.lock().unwrap();
359         me.recv_go_away(self.send_buffer, frame)
360     }
361 
last_processed_id(&self) -> StreamId362     pub fn last_processed_id(&self) -> StreamId {
363         self.inner.lock().unwrap().actions.recv.last_processed_id()
364     }
365 
recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error>366     pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
367         let mut me = self.inner.lock().unwrap();
368         me.recv_window_update(self.send_buffer, frame)
369     }
370 
recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error>371     pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
372         let mut me = self.inner.lock().unwrap();
373         me.recv_push_promise(self.send_buffer, frame)
374     }
375 
recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()>376     pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
377         let mut me = self.inner.lock().map_err(|_| ())?;
378         me.recv_eof(self.send_buffer, clear_pending_accept)
379     }
380 
send_reset(&mut self, id: StreamId, reason: Reason)381     pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
382         let mut me = self.inner.lock().unwrap();
383         me.send_reset(self.send_buffer, id, reason)
384     }
385 
send_go_away(&mut self, last_processed_id: StreamId)386     pub fn send_go_away(&mut self, last_processed_id: StreamId) {
387         let mut me = self.inner.lock().unwrap();
388         me.actions.recv.go_away(last_processed_id);
389     }
390 }
391 
392 impl Inner {
new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>>393     fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
394         Arc::new(Mutex::new(Inner {
395             counts: Counts::new(peer, &config),
396             actions: Actions {
397                 recv: Recv::new(peer, &config),
398                 send: Send::new(&config),
399                 task: None,
400                 conn_error: None,
401             },
402             store: Store::new(),
403             refs: 1,
404         }))
405     }
406 
    /// Handles a received HEADERS frame.
    ///
    /// The frame is ignored (`Ok(())`) when its stream ID is above the receive
    /// half's `max_stream_id`, when `Recv::open` declines to open a new stream,
    /// or when the target stream is in a local-error state. Otherwise the
    /// frame is processed as initial headers or as trailers, depending on
    /// `stream.state.is_recv_headers()`, and the result is passed through
    /// `Actions::reset_on_recv_stream_err`.
    ///
    /// `peer` is consulted only when the stream is not in the store.
    /// `send_buffer` is locked only once a stream has been resolved.
    fn recv_headers<B>(
        &mut self,
        peer: peer::Dyn,
        send_buffer: &SendBuffer<B>,
        frame: frame::Headers,
    ) -> Result<(), Error> {
        let id = frame.stream_id();

        // The GOAWAY process has begun. All streams with a greater ID than
        // specified as part of GOAWAY should be ignored.
        if id > self.actions.recv.max_stream_id() {
            tracing::trace!(
                "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
                id,
                self.actions.recv.max_stream_id()
            );
            return Ok(());
        }

        // Find the existing stream, or open a new one for this ID.
        let key = match self.store.find_entry(id) {
            Entry::Occupied(e) => e.key(),
            Entry::Vacant(e) => {
                // Client: it's possible to send a request, and then send
                // a RST_STREAM while the response HEADERS were in transit.
                //
                // Server: we can't reset a stream before having received
                // the request headers, so don't allow.
                if !peer.is_server() {
                    // This may be response headers for a stream we've already
                    // forgotten about...
                    if self.actions.may_have_forgotten_stream(peer, id) {
                        tracing::debug!(
                            "recv_headers for old stream={:?}, sending STREAM_CLOSED",
                            id,
                        );
                        return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
                    }
                }

                // `open` yields `None` when no stream should be created for
                // this frame; in that case there is nothing more to do.
                match self
                    .actions
                    .recv
                    .open(id, Open::Headers, &mut self.counts)?
                {
                    Some(stream_id) => {
                        let stream = Stream::new(
                            stream_id,
                            self.actions.send.init_window_sz(),
                            self.actions.recv.init_window_sz(),
                        );

                        e.insert(stream)
                    }
                    None => return Ok(()),
                }
            }
        };

        let stream = self.store.resolve(key);

        if stream.state.is_local_error() {
            // Locally reset streams must ignore frames "for some time".
            // This is because the remote may have sent trailers before
            // receiving the RST_STREAM frame.
            tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
            return Ok(());
        }

        // Borrow `actions` separately so the closure below can use it while
        // `self.counts` drives the transition.
        let actions = &mut self.actions;
        let mut send_buffer = send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        self.counts.transition(stream, |counts, stream| {
            tracing::trace!(
                "recv_headers; stream={:?}; state={:?}",
                stream.id,
                stream.state
            );

            let res = if stream.state.is_recv_headers() {
                // Initial header block for this stream.
                match actions.recv.recv_headers(frame, stream, counts) {
                    Ok(()) => Ok(()),
                    Err(RecvHeaderBlockError::Oversize(resp)) => {
                        // With a prepared response: send it, schedule an
                        // implicit REFUSED_STREAM reset, and report success.
                        // Without one: reset the stream with REFUSED_STREAM.
                        if let Some(resp) = resp {
                            let sent = actions.send.send_headers(
                                resp, send_buffer, stream, counts, &mut actions.task);
                            debug_assert!(sent.is_ok(), "oversize response should not fail");

                            actions.send.schedule_implicit_reset(
                                stream,
                                Reason::REFUSED_STREAM,
                                counts,
                                &mut actions.task);

                            actions.recv.enqueue_reset_expiration(stream, counts);

                            Ok(())
                        } else {
                            Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
                        }
                    },
                    Err(RecvHeaderBlockError::State(err)) => Err(err),
                }
            } else {
                // Not expecting initial headers, so this frame is trailers.
                if !frame.is_end_stream() {
                    // Receiving trailers that don't set EOS is a "malformed"
                    // message. Malformed messages are a stream error.
                    // NOTE: this early return skips `reset_on_recv_stream_err`.
                    proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
                }

                actions.recv.recv_trailers(frame, stream)
            };

            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
        })
    }
524 
recv_data<B>( &mut self, peer: peer::Dyn, send_buffer: &SendBuffer<B>, frame: frame::Data, ) -> Result<(), Error>525     fn recv_data<B>(
526         &mut self,
527         peer: peer::Dyn,
528         send_buffer: &SendBuffer<B>,
529         frame: frame::Data,
530     ) -> Result<(), Error> {
531         let id = frame.stream_id();
532 
533         let stream = match self.store.find_mut(&id) {
534             Some(stream) => stream,
535             None => {
536                 // The GOAWAY process has begun. All streams with a greater ID
537                 // than specified as part of GOAWAY should be ignored.
538                 if id > self.actions.recv.max_stream_id() {
539                     tracing::trace!(
540                         "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
541                         id,
542                         self.actions.recv.max_stream_id()
543                     );
544                     return Ok(());
545                 }
546 
547                 if self.actions.may_have_forgotten_stream(peer, id) {
548                     tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
549 
550                     let sz = frame.payload().len();
551                     // This should have been enforced at the codec::FramedRead layer, so
552                     // this is just a sanity check.
553                     assert!(sz <= super::MAX_WINDOW_SIZE as usize);
554                     let sz = sz as WindowSize;
555 
556                     self.actions.recv.ignore_data(sz)?;
557                     return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
558                 }
559 
560                 proto_err!(conn: "recv_data: stream not found; id={:?}", id);
561                 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
562             }
563         };
564 
565         let actions = &mut self.actions;
566         let mut send_buffer = send_buffer.inner.lock().unwrap();
567         let send_buffer = &mut *send_buffer;
568 
569         self.counts.transition(stream, |counts, stream| {
570             let sz = frame.payload().len();
571             let res = actions.recv.recv_data(frame, stream);
572 
573             // Any stream error after receiving a DATA frame means
574             // we won't give the data to the user, and so they can't
575             // release the capacity. We do it automatically.
576             if let Err(Error::Reset(..)) = res {
577                 actions
578                     .recv
579                     .release_connection_capacity(sz as WindowSize, &mut None);
580             }
581             actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
582         })
583     }
584 
recv_reset<B>( &mut self, send_buffer: &SendBuffer<B>, frame: frame::Reset, ) -> Result<(), Error>585     fn recv_reset<B>(
586         &mut self,
587         send_buffer: &SendBuffer<B>,
588         frame: frame::Reset,
589     ) -> Result<(), Error> {
590         let id = frame.stream_id();
591 
592         if id.is_zero() {
593             proto_err!(conn: "recv_reset: invalid stream ID 0");
594             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
595         }
596 
597         // The GOAWAY process has begun. All streams with a greater ID than
598         // specified as part of GOAWAY should be ignored.
599         if id > self.actions.recv.max_stream_id() {
600             tracing::trace!(
601                 "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
602                 id,
603                 self.actions.recv.max_stream_id()
604             );
605             return Ok(());
606         }
607 
608         let stream = match self.store.find_mut(&id) {
609             Some(stream) => stream,
610             None => {
611                 // TODO: Are there other error cases?
612                 self.actions
613                     .ensure_not_idle(self.counts.peer(), id)
614                     .map_err(Error::library_go_away)?;
615 
616                 return Ok(());
617             }
618         };
619 
620         let mut send_buffer = send_buffer.inner.lock().unwrap();
621         let send_buffer = &mut *send_buffer;
622 
623         let actions = &mut self.actions;
624 
625         self.counts.transition(stream, |counts, stream| {
626             actions.recv.recv_reset(frame, stream, counts)?;
627             actions.send.handle_error(send_buffer, stream, counts);
628             assert!(stream.state.is_closed());
629             Ok(())
630         })
631     }
632 
recv_window_update<B>( &mut self, send_buffer: &SendBuffer<B>, frame: frame::WindowUpdate, ) -> Result<(), Error>633     fn recv_window_update<B>(
634         &mut self,
635         send_buffer: &SendBuffer<B>,
636         frame: frame::WindowUpdate,
637     ) -> Result<(), Error> {
638         let id = frame.stream_id();
639 
640         let mut send_buffer = send_buffer.inner.lock().unwrap();
641         let send_buffer = &mut *send_buffer;
642 
643         if id.is_zero() {
644             self.actions
645                 .send
646                 .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
647                 .map_err(Error::library_go_away)?;
648         } else {
649             // The remote may send window updates for streams that the local now
650             // considers closed. It's ok...
651             if let Some(mut stream) = self.store.find_mut(&id) {
652                 // This result is ignored as there is nothing to do when there
653                 // is an error. The stream is reset by the function on error and
654                 // the error is informational.
655                 let _ = self.actions.send.recv_stream_window_update(
656                     frame.size_increment(),
657                     send_buffer,
658                     &mut stream,
659                     &mut self.counts,
660                     &mut self.actions.task,
661                 );
662             } else {
663                 self.actions
664                     .ensure_not_idle(self.counts.peer(), id)
665                     .map_err(Error::library_go_away)?;
666             }
667         }
668 
669         Ok(())
670     }
671 
handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId672     fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
673         let actions = &mut self.actions;
674         let counts = &mut self.counts;
675         let mut send_buffer = send_buffer.inner.lock().unwrap();
676         let send_buffer = &mut *send_buffer;
677 
678         let last_processed_id = actions.recv.last_processed_id();
679 
680         self.store.for_each(|stream| {
681             counts.transition(stream, |counts, stream| {
682                 actions.recv.handle_error(&err, &mut *stream);
683                 actions.send.handle_error(send_buffer, stream, counts);
684             })
685         });
686 
687         actions.conn_error = Some(err);
688 
689         last_processed_id
690     }
691 
recv_go_away<B>( &mut self, send_buffer: &SendBuffer<B>, frame: &frame::GoAway, ) -> Result<(), Error>692     fn recv_go_away<B>(
693         &mut self,
694         send_buffer: &SendBuffer<B>,
695         frame: &frame::GoAway,
696     ) -> Result<(), Error> {
697         let actions = &mut self.actions;
698         let counts = &mut self.counts;
699         let mut send_buffer = send_buffer.inner.lock().unwrap();
700         let send_buffer = &mut *send_buffer;
701 
702         let last_stream_id = frame.last_stream_id();
703 
704         actions.send.recv_go_away(last_stream_id)?;
705 
706         let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
707 
708         self.store.for_each(|stream| {
709             if stream.id > last_stream_id {
710                 counts.transition(stream, |counts, stream| {
711                     actions.recv.handle_error(&err, &mut *stream);
712                     actions.send.handle_error(send_buffer, stream, counts);
713                 })
714             }
715         });
716 
717         actions.conn_error = Some(err);
718 
719         Ok(())
720     }
721 
    /// Handles a received PUSH_PROMISE frame.
    ///
    /// The initiating (parent) stream must be in the store and receive-open;
    /// otherwise a connection-level `PROTOCOL_ERROR` is returned. The frame is
    /// ignored when the parent ID is above `max_stream_id` or when
    /// `Recv::open` refuses the promised stream. On success the promised
    /// stream is inserted into the store, appended to the parent's
    /// `pending_push_promises`, and the parent's receiver is notified.
    ///
    /// `send_buffer` is locked only if handling the promised stream fails.
    fn recv_push_promise<B>(
        &mut self,
        send_buffer: &SendBuffer<B>,
        frame: frame::PushPromise,
    ) -> Result<(), Error> {
        let id = frame.stream_id();
        let promised_id = frame.promised_id();

        // First, ensure that the initiating stream is still in a valid state.
        let parent_key = match self.store.find_mut(&id) {
            Some(stream) => {
                // The GOAWAY process has begun. All streams with a greater ID
                // than specified as part of GOAWAY should be ignored.
                if id > self.actions.recv.max_stream_id() {
                    tracing::trace!(
                        "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
                        id,
                        self.actions.recv.max_stream_id()
                    );
                    return Ok(());
                }

                // The stream must be receive open
                if !stream.state.ensure_recv_open()? {
                    proto_err!(conn: "recv_push_promise: initiating stream is not opened");
                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
                }

                stream.key()
            }
            None => {
                proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        };

        // TODO: Streams in the reserved states do not count towards the concurrency
        // limit. However, it seems like there should be a cap otherwise this
        // could grow in memory indefinitely.

        // Ensure that we can reserve streams
        self.actions.recv.ensure_can_reserve()?;

        // Next, open the stream.
        //
        // If `None` is returned, then the stream is being refused. There is no
        // further work to be done.
        if self
            .actions
            .recv
            .open(promised_id, Open::PushPromise, &mut self.counts)?
            .is_none()
        {
            return Ok(());
        }

        // Try to handle the frame and create a corresponding key for the pushed stream
        // this requires a bit of indirection to make the borrow checker happy.
        let child_key: Option<store::Key> = {
            // Create state for the stream
            let stream = self.store.insert(promised_id, {
                Stream::new(
                    promised_id,
                    self.actions.send.init_window_sz(),
                    self.actions.recv.init_window_sz(),
                )
            });

            let actions = &mut self.actions;

            self.counts.transition(stream, |counts, stream| {
                let stream_valid = actions.recv.recv_push_promise(frame, stream);

                match stream_valid {
                    Ok(()) => Ok(Some(stream.key())),
                    _ => {
                        // Failure path: let `reset_on_recv_stream_err` decide
                        // whether the error is absorbed (yielding `None`, so
                        // the child is not linked to the parent) or returned.
                        let mut send_buffer = send_buffer.inner.lock().unwrap();
                        actions
                            .reset_on_recv_stream_err(
                                &mut *send_buffer,
                                stream,
                                counts,
                                stream_valid,
                            )
                            .map(|()| None)
                    }
                }
            })?
        };
        // If we're successful, push the headers and stream...
        if let Some(child) = child_key {
            // Take the parent's queue out so the child can be resolved from
            // the store while pushing, then put the queue back.
            let mut ppp = self.store[parent_key].pending_push_promises.take();
            ppp.push(&mut self.store.resolve(child));

            let parent = &mut self.store.resolve(parent_key);
            parent.pending_push_promises = ppp;
            parent.notify_recv();
        };

        Ok(())
    }
823 
recv_eof<B>( &mut self, send_buffer: &SendBuffer<B>, clear_pending_accept: bool, ) -> Result<(), ()>824     fn recv_eof<B>(
825         &mut self,
826         send_buffer: &SendBuffer<B>,
827         clear_pending_accept: bool,
828     ) -> Result<(), ()> {
829         let actions = &mut self.actions;
830         let counts = &mut self.counts;
831         let mut send_buffer = send_buffer.inner.lock().unwrap();
832         let send_buffer = &mut *send_buffer;
833 
834         if actions.conn_error.is_none() {
835             actions.conn_error = Some(
836                 io::Error::new(
837                     io::ErrorKind::BrokenPipe,
838                     "connection closed because of a broken pipe",
839                 )
840                 .into(),
841             );
842         }
843 
844         tracing::trace!("Streams::recv_eof");
845 
846         self.store.for_each(|stream| {
847             counts.transition(stream, |counts, stream| {
848                 actions.recv.recv_eof(stream);
849 
850                 // This handles resetting send state associated with the
851                 // stream
852                 actions.send.handle_error(send_buffer, stream, counts);
853             })
854         });
855 
856         actions.clear_queues(clear_pending_accept, &mut self.store, counts);
857         Ok(())
858     }
859 
poll_complete<T, B>( &mut self, send_buffer: &SendBuffer<B>, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,860     fn poll_complete<T, B>(
861         &mut self,
862         send_buffer: &SendBuffer<B>,
863         cx: &mut Context,
864         dst: &mut Codec<T, Prioritized<B>>,
865     ) -> Poll<io::Result<()>>
866     where
867         T: AsyncWrite + Unpin,
868         B: Buf,
869     {
870         let mut send_buffer = send_buffer.inner.lock().unwrap();
871         let send_buffer = &mut *send_buffer;
872 
873         // Send WINDOW_UPDATE frames first
874         //
875         // TODO: It would probably be better to interleave updates w/ data
876         // frames.
877         ready!(self
878             .actions
879             .recv
880             .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
881 
882         // Send any other pending frames
883         ready!(self.actions.send.poll_complete(
884             cx,
885             send_buffer,
886             &mut self.store,
887             &mut self.counts,
888             dst
889         ))?;
890 
891         // Nothing else to do, track the task
892         self.actions.task = Some(cx.waker().clone());
893 
894         Poll::Ready(Ok(()))
895     }
896 
send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason)897     fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
898         let key = match self.store.find_entry(id) {
899             Entry::Occupied(e) => e.key(),
900             Entry::Vacant(e) => {
901                 // Resetting a stream we don't know about? That could be OK...
902                 //
903                 // 1. As a server, we just received a request, but that request
904                 //    was bad, so we're resetting before even accepting it.
905                 //    This is totally fine.
906                 //
907                 // 2. The remote may have sent us a frame on new stream that
908                 //    it's *not* supposed to have done, and thus, we don't know
909                 //    the stream. In that case, sending a reset will "open" the
910                 //    stream in our store. Maybe that should be a connection
911                 //    error instead? At least for now, we need to update what
912                 //    our vision of the next stream is.
913                 if self.counts.peer().is_local_init(id) {
914                     // We normally would open this stream, so update our
915                     // next-send-id record.
916                     self.actions.send.maybe_reset_next_stream_id(id);
917                 } else {
918                     // We normally would recv this stream, so update our
919                     // next-recv-id record.
920                     self.actions.recv.maybe_reset_next_stream_id(id);
921                 }
922 
923                 let stream = Stream::new(id, 0, 0);
924 
925                 e.insert(stream)
926             }
927         };
928 
929         let stream = self.store.resolve(key);
930         let mut send_buffer = send_buffer.inner.lock().unwrap();
931         let send_buffer = &mut *send_buffer;
932         self.actions.send_reset(
933             stream,
934             reason,
935             Initiator::Library,
936             &mut self.counts,
937             send_buffer,
938         );
939     }
940 }
941 
942 impl<B> Streams<B, client::Peer>
943 where
944     B: Buf,
945 {
poll_pending_open( &mut self, cx: &Context, pending: Option<&OpaqueStreamRef>, ) -> Poll<Result<(), crate::Error>>946     pub fn poll_pending_open(
947         &mut self,
948         cx: &Context,
949         pending: Option<&OpaqueStreamRef>,
950     ) -> Poll<Result<(), crate::Error>> {
951         let mut me = self.inner.lock().unwrap();
952         let me = &mut *me;
953 
954         me.actions.ensure_no_conn_error()?;
955         me.actions.send.ensure_next_stream_id()?;
956 
957         if let Some(pending) = pending {
958             let mut stream = me.store.resolve(pending.key);
959             tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
960             if stream.is_pending_open {
961                 stream.wait_send(cx);
962                 return Poll::Pending;
963             }
964         }
965         Poll::Ready(Ok(()))
966     }
967 }
968 
969 impl<B, P> Streams<B, P>
970 where
971     P: Peer,
972 {
as_dyn(&self) -> DynStreams<B>973     pub fn as_dyn(&self) -> DynStreams<B> {
974         let Self {
975             inner,
976             send_buffer,
977             _p,
978         } = self;
979         DynStreams {
980             inner,
981             send_buffer,
982             peer: P::r#dyn(),
983         }
984     }
985 
986     /// This function is safe to call multiple times.
987     ///
988     /// A `Result` is returned to avoid panicking if the mutex is poisoned.
recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()>989     pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
990         self.as_dyn().recv_eof(clear_pending_accept)
991     }
992 
max_send_streams(&self) -> usize993     pub(crate) fn max_send_streams(&self) -> usize {
994         self.inner.lock().unwrap().counts.max_send_streams()
995     }
996 
max_recv_streams(&self) -> usize997     pub(crate) fn max_recv_streams(&self) -> usize {
998         self.inner.lock().unwrap().counts.max_recv_streams()
999     }
1000 
1001     #[cfg(feature = "unstable")]
num_active_streams(&self) -> usize1002     pub fn num_active_streams(&self) -> usize {
1003         let me = self.inner.lock().unwrap();
1004         me.store.num_active_streams()
1005     }
1006 
has_streams(&self) -> bool1007     pub fn has_streams(&self) -> bool {
1008         let me = self.inner.lock().unwrap();
1009         me.counts.has_streams()
1010     }
1011 
has_streams_or_other_references(&self) -> bool1012     pub fn has_streams_or_other_references(&self) -> bool {
1013         let me = self.inner.lock().unwrap();
1014         me.counts.has_streams() || me.refs > 1
1015     }
1016 
1017     #[cfg(feature = "unstable")]
num_wired_streams(&self) -> usize1018     pub fn num_wired_streams(&self) -> usize {
1019         let me = self.inner.lock().unwrap();
1020         me.store.num_wired_streams()
1021     }
1022 }
1023 
1024 // no derive because we don't need B and P to be Clone.
1025 impl<B, P> Clone for Streams<B, P>
1026 where
1027     P: Peer,
1028 {
clone(&self) -> Self1029     fn clone(&self) -> Self {
1030         self.inner.lock().unwrap().refs += 1;
1031         Streams {
1032             inner: self.inner.clone(),
1033             send_buffer: self.send_buffer.clone(),
1034             _p: ::std::marker::PhantomData,
1035         }
1036     }
1037 }
1038 
1039 impl<B, P> Drop for Streams<B, P>
1040 where
1041     P: Peer,
1042 {
drop(&mut self)1043     fn drop(&mut self) {
1044         if let Ok(mut inner) = self.inner.lock() {
1045             inner.refs -= 1;
1046             if inner.refs == 1 {
1047                 if let Some(task) = inner.actions.task.take() {
1048                     task.wake();
1049                 }
1050             }
1051         }
1052     }
1053 }
1054 
1055 // ===== impl StreamRef =====
1056 
1057 impl<B> StreamRef<B> {
send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> where B: Buf,1058     pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
1059     where
1060         B: Buf,
1061     {
1062         let mut me = self.opaque.inner.lock().unwrap();
1063         let me = &mut *me;
1064 
1065         let stream = me.store.resolve(self.opaque.key);
1066         let actions = &mut me.actions;
1067         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1068         let send_buffer = &mut *send_buffer;
1069 
1070         me.counts.transition(stream, |counts, stream| {
1071             // Create the data frame
1072             let mut frame = frame::Data::new(stream.id, data);
1073             frame.set_end_stream(end_stream);
1074 
1075             // Send the data frame
1076             actions
1077                 .send
1078                 .send_data(frame, send_buffer, stream, counts, &mut actions.task)
1079         })
1080     }
1081 
send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError>1082     pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
1083         let mut me = self.opaque.inner.lock().unwrap();
1084         let me = &mut *me;
1085 
1086         let stream = me.store.resolve(self.opaque.key);
1087         let actions = &mut me.actions;
1088         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1089         let send_buffer = &mut *send_buffer;
1090 
1091         me.counts.transition(stream, |counts, stream| {
1092             // Create the trailers frame
1093             let frame = frame::Headers::trailers(stream.id, trailers);
1094 
1095             // Send the trailers frame
1096             actions
1097                 .send
1098                 .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
1099         })
1100     }
1101 
send_reset(&mut self, reason: Reason)1102     pub fn send_reset(&mut self, reason: Reason) {
1103         let mut me = self.opaque.inner.lock().unwrap();
1104         let me = &mut *me;
1105 
1106         let stream = me.store.resolve(self.opaque.key);
1107         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1108         let send_buffer = &mut *send_buffer;
1109 
1110         me.actions
1111             .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
1112     }
1113 
send_response( &mut self, mut response: Response<()>, end_of_stream: bool, ) -> Result<(), UserError>1114     pub fn send_response(
1115         &mut self,
1116         mut response: Response<()>,
1117         end_of_stream: bool,
1118     ) -> Result<(), UserError> {
1119         // Clear before taking lock, incase extensions contain a StreamRef.
1120         response.extensions_mut().clear();
1121         let mut me = self.opaque.inner.lock().unwrap();
1122         let me = &mut *me;
1123 
1124         let stream = me.store.resolve(self.opaque.key);
1125         let actions = &mut me.actions;
1126         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1127         let send_buffer = &mut *send_buffer;
1128 
1129         me.counts.transition(stream, |counts, stream| {
1130             let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
1131 
1132             actions
1133                 .send
1134                 .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
1135         })
1136     }
1137 
send_push_promise( &mut self, mut request: Request<()>, ) -> Result<StreamRef<B>, UserError>1138     pub fn send_push_promise(
1139         &mut self,
1140         mut request: Request<()>,
1141     ) -> Result<StreamRef<B>, UserError> {
1142         // Clear before taking lock, incase extensions contain a StreamRef.
1143         request.extensions_mut().clear();
1144         let mut me = self.opaque.inner.lock().unwrap();
1145         let me = &mut *me;
1146 
1147         let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1148         let send_buffer = &mut *send_buffer;
1149 
1150         let actions = &mut me.actions;
1151         let promised_id = actions.send.reserve_local()?;
1152 
1153         let child_key = {
1154             let mut child_stream = me.store.insert(
1155                 promised_id,
1156                 Stream::new(
1157                     promised_id,
1158                     actions.send.init_window_sz(),
1159                     actions.recv.init_window_sz(),
1160                 ),
1161             );
1162             child_stream.state.reserve_local()?;
1163             child_stream.is_pending_push = true;
1164             child_stream.key()
1165         };
1166 
1167         let pushed = {
1168             let mut stream = me.store.resolve(self.opaque.key);
1169 
1170             let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
1171 
1172             actions
1173                 .send
1174                 .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
1175         };
1176 
1177         if let Err(err) = pushed {
1178             let mut child_stream = me.store.resolve(child_key);
1179             child_stream.unlink();
1180             child_stream.remove();
1181             return Err(err);
1182         }
1183 
1184         me.refs += 1;
1185         let opaque =
1186             OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
1187 
1188         Ok(StreamRef {
1189             opaque,
1190             send_buffer: self.send_buffer.clone(),
1191         })
1192     }
1193 
1194     /// Called by the server after the stream is accepted. Given that clients
1195     /// initialize streams by sending HEADERS, the request will always be
1196     /// available.
1197     ///
1198     /// # Panics
1199     ///
1200     /// This function panics if the request isn't present.
take_request(&self) -> Request<()>1201     pub fn take_request(&self) -> Request<()> {
1202         let mut me = self.opaque.inner.lock().unwrap();
1203         let me = &mut *me;
1204 
1205         let mut stream = me.store.resolve(self.opaque.key);
1206         me.actions.recv.take_request(&mut stream)
1207     }
1208 
1209     /// Called by a client to see if the current stream is pending open
is_pending_open(&self) -> bool1210     pub fn is_pending_open(&self) -> bool {
1211         let mut me = self.opaque.inner.lock().unwrap();
1212         me.store.resolve(self.opaque.key).is_pending_open
1213     }
1214 
1215     /// Request capacity to send data
reserve_capacity(&mut self, capacity: WindowSize)1216     pub fn reserve_capacity(&mut self, capacity: WindowSize) {
1217         let mut me = self.opaque.inner.lock().unwrap();
1218         let me = &mut *me;
1219 
1220         let mut stream = me.store.resolve(self.opaque.key);
1221 
1222         me.actions
1223             .send
1224             .reserve_capacity(capacity, &mut stream, &mut me.counts)
1225     }
1226 
1227     /// Returns the stream's current send capacity.
capacity(&self) -> WindowSize1228     pub fn capacity(&self) -> WindowSize {
1229         let mut me = self.opaque.inner.lock().unwrap();
1230         let me = &mut *me;
1231 
1232         let mut stream = me.store.resolve(self.opaque.key);
1233 
1234         me.actions.send.capacity(&mut stream)
1235     }
1236 
1237     /// Request to be notified when the stream's capacity increases
poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>>1238     pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
1239         let mut me = self.opaque.inner.lock().unwrap();
1240         let me = &mut *me;
1241 
1242         let mut stream = me.store.resolve(self.opaque.key);
1243 
1244         me.actions.send.poll_capacity(cx, &mut stream)
1245     }
1246 
1247     /// Request to be notified for if a `RST_STREAM` is received for this stream.
poll_reset( &mut self, cx: &Context, mode: proto::PollReset, ) -> Poll<Result<Reason, crate::Error>>1248     pub(crate) fn poll_reset(
1249         &mut self,
1250         cx: &Context,
1251         mode: proto::PollReset,
1252     ) -> Poll<Result<Reason, crate::Error>> {
1253         let mut me = self.opaque.inner.lock().unwrap();
1254         let me = &mut *me;
1255 
1256         let mut stream = me.store.resolve(self.opaque.key);
1257 
1258         me.actions
1259             .send
1260             .poll_reset(cx, &mut stream, mode)
1261             .map_err(From::from)
1262     }
1263 
clone_to_opaque(&self) -> OpaqueStreamRef1264     pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1265         self.opaque.clone()
1266     }
1267 
stream_id(&self) -> StreamId1268     pub fn stream_id(&self) -> StreamId {
1269         self.opaque.stream_id()
1270     }
1271 }
1272 
1273 impl<B> Clone for StreamRef<B> {
clone(&self) -> Self1274     fn clone(&self) -> Self {
1275         StreamRef {
1276             opaque: self.opaque.clone(),
1277             send_buffer: self.send_buffer.clone(),
1278         }
1279     }
1280 }
1281 
1282 // ===== impl OpaqueStreamRef =====
1283 
1284 impl OpaqueStreamRef {
new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef1285     fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
1286         stream.ref_inc();
1287         OpaqueStreamRef {
1288             inner,
1289             key: stream.key(),
1290         }
1291     }
1292     /// Called by a client to check for a received response.
poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>>1293     pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
1294         let mut me = self.inner.lock().unwrap();
1295         let me = &mut *me;
1296 
1297         let mut stream = me.store.resolve(self.key);
1298 
1299         me.actions.recv.poll_response(cx, &mut stream)
1300     }
1301     /// Called by a client to check for a pushed request.
poll_pushed( &mut self, cx: &Context, ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>>1302     pub fn poll_pushed(
1303         &mut self,
1304         cx: &Context,
1305     ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
1306         let mut me = self.inner.lock().unwrap();
1307         let me = &mut *me;
1308 
1309         let mut stream = me.store.resolve(self.key);
1310         me.actions
1311             .recv
1312             .poll_pushed(cx, &mut stream)
1313             .map_ok(|(h, key)| {
1314                 me.refs += 1;
1315                 let opaque_ref =
1316                     OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
1317                 (h, opaque_ref)
1318             })
1319     }
1320 
is_end_stream(&self) -> bool1321     pub fn is_end_stream(&self) -> bool {
1322         let mut me = self.inner.lock().unwrap();
1323         let me = &mut *me;
1324 
1325         let stream = me.store.resolve(self.key);
1326 
1327         me.actions.recv.is_end_stream(&stream)
1328     }
1329 
poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>>1330     pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
1331         let mut me = self.inner.lock().unwrap();
1332         let me = &mut *me;
1333 
1334         let mut stream = me.store.resolve(self.key);
1335 
1336         me.actions.recv.poll_data(cx, &mut stream)
1337     }
1338 
poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>>1339     pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1340         let mut me = self.inner.lock().unwrap();
1341         let me = &mut *me;
1342 
1343         let mut stream = me.store.resolve(self.key);
1344 
1345         me.actions.recv.poll_trailers(cx, &mut stream)
1346     }
1347 
available_recv_capacity(&self) -> isize1348     pub(crate) fn available_recv_capacity(&self) -> isize {
1349         let me = self.inner.lock().unwrap();
1350         let me = &*me;
1351 
1352         let stream = &me.store[self.key];
1353         stream.recv_flow.available().into()
1354     }
1355 
used_recv_capacity(&self) -> WindowSize1356     pub(crate) fn used_recv_capacity(&self) -> WindowSize {
1357         let me = self.inner.lock().unwrap();
1358         let me = &*me;
1359 
1360         let stream = &me.store[self.key];
1361         stream.in_flight_recv_data
1362     }
1363 
1364     /// Releases recv capacity back to the peer. This may result in sending
1365     /// WINDOW_UPDATE frames on both the stream and connection.
release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError>1366     pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
1367         let mut me = self.inner.lock().unwrap();
1368         let me = &mut *me;
1369 
1370         let mut stream = me.store.resolve(self.key);
1371 
1372         me.actions
1373             .recv
1374             .release_capacity(capacity, &mut stream, &mut me.actions.task)
1375     }
1376 
1377     /// Clear the receive queue and set the status to no longer receive data frames.
clear_recv_buffer(&mut self)1378     pub(crate) fn clear_recv_buffer(&mut self) {
1379         let mut me = self.inner.lock().unwrap();
1380         let me = &mut *me;
1381 
1382         let mut stream = me.store.resolve(self.key);
1383         stream.is_recv = false;
1384         me.actions.recv.clear_recv_buffer(&mut stream);
1385     }
1386 
stream_id(&self) -> StreamId1387     pub fn stream_id(&self) -> StreamId {
1388         self.inner.lock().unwrap().store[self.key].id
1389     }
1390 }
1391 
1392 impl fmt::Debug for OpaqueStreamRef {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1393     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1394         use std::sync::TryLockError::*;
1395 
1396         match self.inner.try_lock() {
1397             Ok(me) => {
1398                 let stream = &me.store[self.key];
1399                 fmt.debug_struct("OpaqueStreamRef")
1400                     .field("stream_id", &stream.id)
1401                     .field("ref_count", &stream.ref_count)
1402                     .finish()
1403             }
1404             Err(Poisoned(_)) => fmt
1405                 .debug_struct("OpaqueStreamRef")
1406                 .field("inner", &"<Poisoned>")
1407                 .finish(),
1408             Err(WouldBlock) => fmt
1409                 .debug_struct("OpaqueStreamRef")
1410                 .field("inner", &"<Locked>")
1411                 .finish(),
1412         }
1413     }
1414 }
1415 
1416 impl Clone for OpaqueStreamRef {
clone(&self) -> Self1417     fn clone(&self) -> Self {
1418         // Increment the ref count
1419         let mut inner = self.inner.lock().unwrap();
1420         inner.store.resolve(self.key).ref_inc();
1421         inner.refs += 1;
1422 
1423         OpaqueStreamRef {
1424             inner: self.inner.clone(),
1425             key: self.key,
1426         }
1427     }
1428 }
1429 
1430 impl Drop for OpaqueStreamRef {
drop(&mut self)1431     fn drop(&mut self) {
1432         drop_stream_ref(&self.inner, self.key);
1433     }
1434 }
1435 
1436 // TODO: Move back in fn above
/// Releases one reference to the stream identified by `key`.
///
/// Decrements both the shared `refs` counter and the stream's own ref count.
/// If the stream is then unreferenced and already closed, the stored task is
/// woken. Afterwards the stream goes through `maybe_cancel`, and once nothing
/// references it anymore its closed recv capacity is released and each of its
/// pending push promises goes through `maybe_cancel` as well.
///
/// # Panics
///
/// Panics if the mutex is poisoned, unless the current thread is already
/// panicking, in which case the function returns without doing anything.
fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
    let mut me = match inner.lock() {
        Ok(inner) => inner,
        Err(_) => {
            // Avoid a second panic while this thread is already unwinding.
            if ::std::thread::panicking() {
                tracing::trace!("StreamRef::drop; mutex poisoned");
                return;
            } else {
                panic!("StreamRef::drop; mutex poisoned");
            }
        }
    };

    let me = &mut *me;
    // One fewer handle on the shared state as a whole.
    me.refs -= 1;
    let mut stream = me.store.resolve(key);

    tracing::trace!("drop_stream_ref; stream={:?}", stream);

    // decrement the stream's ref count by 1.
    stream.ref_dec();

    let actions = &mut me.actions;

    // If the stream is not referenced and it is already
    // closed (does not have to go through logic below
    // of canceling the stream), we should notify the task
    // (connection) so that it can close properly
    if stream.ref_count == 0 && stream.is_closed() {
        if let Some(task) = actions.task.take() {
            task.wake();
        }
    }

    me.counts.transition(stream, |counts, stream| {
        // Schedules an implicit reset if `is_canceled_interest()` holds.
        maybe_cancel(stream, actions, counts);

        if stream.ref_count == 0 {
            // Release any recv window back to connection, no one can access
            // it anymore.
            actions
                .recv
                .release_closed_capacity(stream, &mut actions.task);

            // We won't be able to reach our push promises anymore
            let mut ppp = stream.pending_push_promises.take();
            while let Some(promise) = ppp.pop(stream.store_mut()) {
                // Each promised stream gets its own transition.
                counts.transition(promise, |counts, stream| {
                    maybe_cancel(stream, actions, counts);
                });
            }
        }
    });
}
1491 
maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts)1492 fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1493     if stream.is_canceled_interest() {
1494         // Server is allowed to early respond without fully consuming the client input stream
1495         // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
1496         // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376)
1497         let reason = if counts.peer().is_server()
1498             && stream.state.is_send_closed()
1499             && stream.state.is_recv_streaming()
1500         {
1501             Reason::NO_ERROR
1502         } else {
1503             Reason::CANCEL
1504         };
1505 
1506         actions
1507             .send
1508             .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
1509         actions.recv.enqueue_reset_expiration(stream, counts);
1510     }
1511 }
1512 
1513 // ===== impl SendBuffer =====
1514 
1515 impl<B> SendBuffer<B> {
new() -> Self1516     fn new() -> Self {
1517         let inner = Mutex::new(Buffer::new());
1518         SendBuffer { inner }
1519     }
1520 
is_empty(&self) -> bool1521     pub fn is_empty(&self) -> bool {
1522         let buf = self.inner.lock().unwrap();
1523         buf.is_empty()
1524     }
1525 }
1526 
1527 // ===== impl Actions =====
1528 
1529 impl Actions {
send_reset<B>( &mut self, stream: store::Ptr, reason: Reason, initiator: Initiator, counts: &mut Counts, send_buffer: &mut Buffer<Frame<B>>, )1530     fn send_reset<B>(
1531         &mut self,
1532         stream: store::Ptr,
1533         reason: Reason,
1534         initiator: Initiator,
1535         counts: &mut Counts,
1536         send_buffer: &mut Buffer<Frame<B>>,
1537     ) {
1538         counts.transition(stream, |counts, stream| {
1539             self.send.send_reset(
1540                 reason,
1541                 initiator,
1542                 send_buffer,
1543                 stream,
1544                 counts,
1545                 &mut self.task,
1546             );
1547             self.recv.enqueue_reset_expiration(stream, counts);
1548             // if a RecvStream is parked, ensure it's notified
1549             stream.notify_recv();
1550         });
1551     }
1552 
reset_on_recv_stream_err<B>( &mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, counts: &mut Counts, res: Result<(), Error>, ) -> Result<(), Error>1553     fn reset_on_recv_stream_err<B>(
1554         &mut self,
1555         buffer: &mut Buffer<Frame<B>>,
1556         stream: &mut store::Ptr,
1557         counts: &mut Counts,
1558         res: Result<(), Error>,
1559     ) -> Result<(), Error> {
1560         if let Err(Error::Reset(stream_id, reason, initiator)) = res {
1561             debug_assert_eq!(stream_id, stream.id);
1562 
1563             if counts.can_inc_num_local_error_resets() {
1564                 counts.inc_num_local_error_resets();
1565 
1566                 // Reset the stream.
1567                 self.send
1568                     .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
1569                 Ok(())
1570             } else {
1571                 tracing::warn!(
1572                     "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})",
1573                     counts.max_local_error_resets().unwrap(),
1574                 );
1575                 Err(Error::library_go_away_data(
1576                     Reason::ENHANCE_YOUR_CALM,
1577                     "too_many_internal_resets",
1578                 ))
1579             }
1580         } else {
1581             res
1582         }
1583     }
1584 
ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason>1585     fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
1586         if peer.is_local_init(id) {
1587             self.send.ensure_not_idle(id)
1588         } else {
1589             self.recv.ensure_not_idle(id)
1590         }
1591     }
1592 
ensure_no_conn_error(&self) -> Result<(), proto::Error>1593     fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
1594         if let Some(ref err) = self.conn_error {
1595             Err(err.clone())
1596         } else {
1597             Ok(())
1598         }
1599     }
1600 
1601     /// Check if we possibly could have processed and since forgotten this stream.
1602     ///
1603     /// If we send a RST_STREAM for a stream, we will eventually "forget" about
1604     /// the stream to free up memory. It's possible that the remote peer had
1605     /// frames in-flight, and by the time we receive them, our own state is
1606     /// gone. We *could* tear everything down by sending a GOAWAY, but it
1607     /// is more likely to be latency/memory constraints that caused this,
1608     /// and not a bad actor. So be less catastrophic, the spec allows
1609     /// us to send another RST_STREAM of STREAM_CLOSED.
may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool1610     fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
1611         if id.is_zero() {
1612             return false;
1613         }
1614         if peer.is_local_init(id) {
1615             self.send.may_have_created_stream(id)
1616         } else {
1617             self.recv.may_have_created_stream(id)
1618         }
1619     }
1620 
clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts)1621     fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
1622         self.recv.clear_queues(clear_pending_accept, store, counts);
1623         self.send.clear_queues(store, counts);
1624     }
1625 }
1626