1 use super::*;
2 
3 use std::usize;
4 
5 #[derive(Debug)]
6 pub(super) struct Counts {
7     /// Acting as a client or server. This allows us to track which values to
8     /// inc / dec.
9     peer: peer::Dyn,
10 
11     /// Maximum number of locally initiated streams
12     max_send_streams: usize,
13 
14     /// Current number of remote initiated streams
15     num_send_streams: usize,
16 
17     /// Maximum number of remote initiated streams
18     max_recv_streams: usize,
19 
20     /// Current number of locally initiated streams
21     num_recv_streams: usize,
22 
23     /// Maximum number of pending locally reset streams
24     max_local_reset_streams: usize,
25 
26     /// Current number of pending locally reset streams
27     num_local_reset_streams: usize,
28 
29     /// Max number of "pending accept" streams that were remotely reset
30     max_remote_reset_streams: usize,
31 
32     /// Current number of "pending accept" streams that were remotely reset
33     num_remote_reset_streams: usize,
34 
35     /// Maximum number of locally reset streams due to protocol error across
36     /// the lifetime of the connection.
37     ///
38     /// When this gets exceeded, we issue GOAWAYs.
39     max_local_error_reset_streams: Option<usize>,
40 
41     /// Total number of locally reset streams due to protocol error across the
42     /// lifetime of the connection.
43     num_local_error_reset_streams: usize,
44 }
45 
46 impl Counts {
47     /// Create a new `Counts` using the provided configuration values.
new(peer: peer::Dyn, config: &Config) -> Self48     pub fn new(peer: peer::Dyn, config: &Config) -> Self {
49         Counts {
50             peer,
51             max_send_streams: config.initial_max_send_streams,
52             num_send_streams: 0,
53             max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
54             num_recv_streams: 0,
55             max_local_reset_streams: config.local_reset_max,
56             num_local_reset_streams: 0,
57             max_remote_reset_streams: config.remote_reset_max,
58             num_remote_reset_streams: 0,
59             max_local_error_reset_streams: config.local_max_error_reset_streams,
60             num_local_error_reset_streams: 0,
61         }
62     }
63 
64     /// Returns true when the next opened stream will reach capacity of outbound streams
65     ///
66     /// The number of client send streams is incremented in prioritize; send_request has to guess if
67     /// it should wait before allowing another request to be sent.
next_send_stream_will_reach_capacity(&self) -> bool68     pub fn next_send_stream_will_reach_capacity(&self) -> bool {
69         self.max_send_streams <= (self.num_send_streams + 1)
70     }
71 
72     /// Returns the current peer
peer(&self) -> peer::Dyn73     pub fn peer(&self) -> peer::Dyn {
74         self.peer
75     }
76 
has_streams(&self) -> bool77     pub fn has_streams(&self) -> bool {
78         self.num_send_streams != 0 || self.num_recv_streams != 0
79     }
80 
81     /// Returns true if we can issue another local reset due to protocol error.
can_inc_num_local_error_resets(&self) -> bool82     pub fn can_inc_num_local_error_resets(&self) -> bool {
83         if let Some(max) = self.max_local_error_reset_streams {
84             max > self.num_local_error_reset_streams
85         } else {
86             true
87         }
88     }
89 
inc_num_local_error_resets(&mut self)90     pub fn inc_num_local_error_resets(&mut self) {
91         assert!(self.can_inc_num_local_error_resets());
92 
93         // Increment the number of remote initiated streams
94         self.num_local_error_reset_streams += 1;
95     }
96 
max_local_error_resets(&self) -> Option<usize>97     pub(crate) fn max_local_error_resets(&self) -> Option<usize> {
98         self.max_local_error_reset_streams
99     }
100 
101     /// Returns true if the receive stream concurrency can be incremented
can_inc_num_recv_streams(&self) -> bool102     pub fn can_inc_num_recv_streams(&self) -> bool {
103         self.max_recv_streams > self.num_recv_streams
104     }
105 
106     /// Increments the number of concurrent receive streams.
107     ///
108     /// # Panics
109     ///
110     /// Panics on failure as this should have been validated before hand.
inc_num_recv_streams(&mut self, stream: &mut store::Ptr)111     pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
112         assert!(self.can_inc_num_recv_streams());
113         assert!(!stream.is_counted);
114 
115         // Increment the number of remote initiated streams
116         self.num_recv_streams += 1;
117         stream.is_counted = true;
118     }
119 
120     /// Returns true if the send stream concurrency can be incremented
can_inc_num_send_streams(&self) -> bool121     pub fn can_inc_num_send_streams(&self) -> bool {
122         self.max_send_streams > self.num_send_streams
123     }
124 
125     /// Increments the number of concurrent send streams.
126     ///
127     /// # Panics
128     ///
129     /// Panics on failure as this should have been validated before hand.
inc_num_send_streams(&mut self, stream: &mut store::Ptr)130     pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
131         assert!(self.can_inc_num_send_streams());
132         assert!(!stream.is_counted);
133 
134         // Increment the number of remote initiated streams
135         self.num_send_streams += 1;
136         stream.is_counted = true;
137     }
138 
139     /// Returns true if the number of pending reset streams can be incremented.
can_inc_num_reset_streams(&self) -> bool140     pub fn can_inc_num_reset_streams(&self) -> bool {
141         self.max_local_reset_streams > self.num_local_reset_streams
142     }
143 
144     /// Increments the number of pending reset streams.
145     ///
146     /// # Panics
147     ///
148     /// Panics on failure as this should have been validated before hand.
inc_num_reset_streams(&mut self)149     pub fn inc_num_reset_streams(&mut self) {
150         assert!(self.can_inc_num_reset_streams());
151 
152         self.num_local_reset_streams += 1;
153     }
154 
max_remote_reset_streams(&self) -> usize155     pub(crate) fn max_remote_reset_streams(&self) -> usize {
156         self.max_remote_reset_streams
157     }
158 
159     /// Returns true if the number of pending REMOTE reset streams can be
160     /// incremented.
can_inc_num_remote_reset_streams(&self) -> bool161     pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
162         self.max_remote_reset_streams > self.num_remote_reset_streams
163     }
164 
165     /// Increments the number of pending REMOTE reset streams.
166     ///
167     /// # Panics
168     ///
169     /// Panics on failure as this should have been validated before hand.
inc_num_remote_reset_streams(&mut self)170     pub(crate) fn inc_num_remote_reset_streams(&mut self) {
171         assert!(self.can_inc_num_remote_reset_streams());
172 
173         self.num_remote_reset_streams += 1;
174     }
175 
dec_num_remote_reset_streams(&mut self)176     pub(crate) fn dec_num_remote_reset_streams(&mut self) {
177         assert!(self.num_remote_reset_streams > 0);
178 
179         self.num_remote_reset_streams -= 1;
180     }
181 
apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool)182     pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
183         match settings.max_concurrent_streams() {
184             Some(val) => self.max_send_streams = val as usize,
185             None if is_initial => self.max_send_streams = usize::MAX,
186             None => {}
187         }
188     }
189 
190     /// Run a block of code that could potentially transition a stream's state.
191     ///
192     /// If the stream state transitions to closed, this function will perform
193     /// all necessary cleanup.
194     ///
195     /// TODO: Is this function still needed?
transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U,196     pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
197     where
198         F: FnOnce(&mut Self, &mut store::Ptr) -> U,
199     {
200         // TODO: Does this need to be computed before performing the action?
201         let is_pending_reset = stream.is_pending_reset_expiration();
202 
203         // Run the action
204         let ret = f(self, &mut stream);
205 
206         self.transition_after(stream, is_pending_reset);
207 
208         ret
209     }
210 
211     // TODO: move this to macro?
transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool)212     pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
213         tracing::trace!(
214             "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
215              pending_send_empty={:?}; buffered_send_data={}; \
216              num_recv={}; num_send={}",
217             stream.id,
218             stream.state,
219             stream.is_closed(),
220             stream.pending_send.is_empty(),
221             stream.buffered_send_data,
222             self.num_recv_streams,
223             self.num_send_streams
224         );
225 
226         if stream.is_closed() {
227             if !stream.is_pending_reset_expiration() {
228                 stream.unlink();
229                 if is_reset_counted {
230                     self.dec_num_reset_streams();
231                 }
232             }
233 
234             if stream.is_counted {
235                 tracing::trace!("dec_num_streams; stream={:?}", stream.id);
236                 // Decrement the number of active streams.
237                 self.dec_num_streams(&mut stream);
238             }
239         }
240 
241         // Release the stream if it requires releasing
242         if stream.is_released() {
243             stream.remove();
244         }
245     }
246 
247     /// Returns the maximum number of streams that can be initiated by this
248     /// peer.
max_send_streams(&self) -> usize249     pub(crate) fn max_send_streams(&self) -> usize {
250         self.max_send_streams
251     }
252 
253     /// Returns the maximum number of streams that can be initiated by the
254     /// remote peer.
max_recv_streams(&self) -> usize255     pub(crate) fn max_recv_streams(&self) -> usize {
256         self.max_recv_streams
257     }
258 
dec_num_streams(&mut self, stream: &mut store::Ptr)259     fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
260         assert!(stream.is_counted);
261 
262         if self.peer.is_local_init(stream.id) {
263             assert!(self.num_send_streams > 0);
264             self.num_send_streams -= 1;
265             stream.is_counted = false;
266         } else {
267             assert!(self.num_recv_streams > 0);
268             self.num_recv_streams -= 1;
269             stream.is_counted = false;
270         }
271     }
272 
dec_num_reset_streams(&mut self)273     fn dec_num_reset_streams(&mut self) {
274         assert!(self.num_local_reset_streams > 0);
275         self.num_local_reset_streams -= 1;
276     }
277 }
278 
279 impl Drop for Counts {
drop(&mut self)280     fn drop(&mut self) {
281         use std::thread;
282 
283         if !thread::panicking() {
284             debug_assert!(!self.has_streams());
285         }
286     }
287 }
288