1 use super::*; 2 3 use std::usize; 4 5 #[derive(Debug)] 6 pub(super) struct Counts { 7 /// Acting as a client or server. This allows us to track which values to 8 /// inc / dec. 9 peer: peer::Dyn, 10 11 /// Maximum number of locally initiated streams 12 max_send_streams: usize, 13 14 /// Current number of remote initiated streams 15 num_send_streams: usize, 16 17 /// Maximum number of remote initiated streams 18 max_recv_streams: usize, 19 20 /// Current number of locally initiated streams 21 num_recv_streams: usize, 22 23 /// Maximum number of pending locally reset streams 24 max_local_reset_streams: usize, 25 26 /// Current number of pending locally reset streams 27 num_local_reset_streams: usize, 28 29 /// Max number of "pending accept" streams that were remotely reset 30 max_remote_reset_streams: usize, 31 32 /// Current number of "pending accept" streams that were remotely reset 33 num_remote_reset_streams: usize, 34 35 /// Maximum number of locally reset streams due to protocol error across 36 /// the lifetime of the connection. 37 /// 38 /// When this gets exceeded, we issue GOAWAYs. 39 max_local_error_reset_streams: Option<usize>, 40 41 /// Total number of locally reset streams due to protocol error across the 42 /// lifetime of the connection. 43 num_local_error_reset_streams: usize, 44 } 45 46 impl Counts { 47 /// Create a new `Counts` using the provided configuration values. new(peer: peer::Dyn, config: &Config) -> Self48 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 49 Counts { 50 peer, 51 max_send_streams: config.initial_max_send_streams, 52 num_send_streams: 0, 53 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), 54 num_recv_streams: 0, 55 max_local_reset_streams: config.local_reset_max, 56 num_local_reset_streams: 0, 57 max_remote_reset_streams: config.remote_reset_max, 58 num_remote_reset_streams: 0, 59 max_local_error_reset_streams: config.local_max_error_reset_streams, 60 num_local_error_reset_streams: 0, 61 } 62 } 63 64 /// Returns true when the next opened stream will reach capacity of outbound streams 65 /// 66 /// The number of client send streams is incremented in prioritize; send_request has to guess if 67 /// it should wait before allowing another request to be sent. next_send_stream_will_reach_capacity(&self) -> bool68 pub fn next_send_stream_will_reach_capacity(&self) -> bool { 69 self.max_send_streams <= (self.num_send_streams + 1) 70 } 71 72 /// Returns the current peer peer(&self) -> peer::Dyn73 pub fn peer(&self) -> peer::Dyn { 74 self.peer 75 } 76 has_streams(&self) -> bool77 pub fn has_streams(&self) -> bool { 78 self.num_send_streams != 0 || self.num_recv_streams != 0 79 } 80 81 /// Returns true if we can issue another local reset due to protocol error. can_inc_num_local_error_resets(&self) -> bool82 pub fn can_inc_num_local_error_resets(&self) -> bool { 83 if let Some(max) = self.max_local_error_reset_streams { 84 max > self.num_local_error_reset_streams 85 } else { 86 true 87 } 88 } 89 inc_num_local_error_resets(&mut self)90 pub fn inc_num_local_error_resets(&mut self) { 91 assert!(self.can_inc_num_local_error_resets()); 92 93 // Increment the number of remote initiated streams 94 self.num_local_error_reset_streams += 1; 95 } 96 max_local_error_resets(&self) -> Option<usize>97 pub(crate) fn max_local_error_resets(&self) -> Option<usize> { 98 self.max_local_error_reset_streams 99 } 100 101 /// Returns true if the receive stream concurrency can be incremented can_inc_num_recv_streams(&self) -> bool102 pub fn can_inc_num_recv_streams(&self) -> bool { 103 self.max_recv_streams > self.num_recv_streams 104 } 105 106 /// Increments the number of concurrent receive streams. 107 /// 108 /// # Panics 109 /// 110 /// Panics on failure as this should have been validated before hand. inc_num_recv_streams(&mut self, stream: &mut store::Ptr)111 pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { 112 assert!(self.can_inc_num_recv_streams()); 113 assert!(!stream.is_counted); 114 115 // Increment the number of remote initiated streams 116 self.num_recv_streams += 1; 117 stream.is_counted = true; 118 } 119 120 /// Returns true if the send stream concurrency can be incremented can_inc_num_send_streams(&self) -> bool121 pub fn can_inc_num_send_streams(&self) -> bool { 122 self.max_send_streams > self.num_send_streams 123 } 124 125 /// Increments the number of concurrent send streams. 126 /// 127 /// # Panics 128 /// 129 /// Panics on failure as this should have been validated before hand. inc_num_send_streams(&mut self, stream: &mut store::Ptr)130 pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { 131 assert!(self.can_inc_num_send_streams()); 132 assert!(!stream.is_counted); 133 134 // Increment the number of remote initiated streams 135 self.num_send_streams += 1; 136 stream.is_counted = true; 137 } 138 139 /// Returns true if the number of pending reset streams can be incremented. can_inc_num_reset_streams(&self) -> bool140 pub fn can_inc_num_reset_streams(&self) -> bool { 141 self.max_local_reset_streams > self.num_local_reset_streams 142 } 143 144 /// Increments the number of pending reset streams. 145 /// 146 /// # Panics 147 /// 148 /// Panics on failure as this should have been validated before hand. inc_num_reset_streams(&mut self)149 pub fn inc_num_reset_streams(&mut self) { 150 assert!(self.can_inc_num_reset_streams()); 151 152 self.num_local_reset_streams += 1; 153 } 154 max_remote_reset_streams(&self) -> usize155 pub(crate) fn max_remote_reset_streams(&self) -> usize { 156 self.max_remote_reset_streams 157 } 158 159 /// Returns true if the number of pending REMOTE reset streams can be 160 /// incremented. can_inc_num_remote_reset_streams(&self) -> bool161 pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool { 162 self.max_remote_reset_streams > self.num_remote_reset_streams 163 } 164 165 /// Increments the number of pending REMOTE reset streams. 166 /// 167 /// # Panics 168 /// 169 /// Panics on failure as this should have been validated before hand. inc_num_remote_reset_streams(&mut self)170 pub(crate) fn inc_num_remote_reset_streams(&mut self) { 171 assert!(self.can_inc_num_remote_reset_streams()); 172 173 self.num_remote_reset_streams += 1; 174 } 175 dec_num_remote_reset_streams(&mut self)176 pub(crate) fn dec_num_remote_reset_streams(&mut self) { 177 assert!(self.num_remote_reset_streams > 0); 178 179 self.num_remote_reset_streams -= 1; 180 } 181 apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool)182 pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { 183 match settings.max_concurrent_streams() { 184 Some(val) => self.max_send_streams = val as usize, 185 None if is_initial => self.max_send_streams = usize::MAX, 186 None => {} 187 } 188 } 189 190 /// Run a block of code that could potentially transition a stream's state. 191 /// 192 /// If the stream state transitions to closed, this function will perform 193 /// all necessary cleanup. 194 /// 195 /// TODO: Is this function still needed? transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U,196 pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U 197 where 198 F: FnOnce(&mut Self, &mut store::Ptr) -> U, 199 { 200 // TODO: Does this need to be computed before performing the action? 201 let is_pending_reset = stream.is_pending_reset_expiration(); 202 203 // Run the action 204 let ret = f(self, &mut stream); 205 206 self.transition_after(stream, is_pending_reset); 207 208 ret 209 } 210 211 // TODO: move this to macro? transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool)212 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { 213 tracing::trace!( 214 "transition_after; stream={:?}; state={:?}; is_closed={:?}; \ 215 pending_send_empty={:?}; buffered_send_data={}; \ 216 num_recv={}; num_send={}", 217 stream.id, 218 stream.state, 219 stream.is_closed(), 220 stream.pending_send.is_empty(), 221 stream.buffered_send_data, 222 self.num_recv_streams, 223 self.num_send_streams 224 ); 225 226 if stream.is_closed() { 227 if !stream.is_pending_reset_expiration() { 228 stream.unlink(); 229 if is_reset_counted { 230 self.dec_num_reset_streams(); 231 } 232 } 233 234 if stream.is_counted { 235 tracing::trace!("dec_num_streams; stream={:?}", stream.id); 236 // Decrement the number of active streams. 237 self.dec_num_streams(&mut stream); 238 } 239 } 240 241 // Release the stream if it requires releasing 242 if stream.is_released() { 243 stream.remove(); 244 } 245 } 246 247 /// Returns the maximum number of streams that can be initiated by this 248 /// peer. max_send_streams(&self) -> usize249 pub(crate) fn max_send_streams(&self) -> usize { 250 self.max_send_streams 251 } 252 253 /// Returns the maximum number of streams that can be initiated by the 254 /// remote peer. max_recv_streams(&self) -> usize255 pub(crate) fn max_recv_streams(&self) -> usize { 256 self.max_recv_streams 257 } 258 dec_num_streams(&mut self, stream: &mut store::Ptr)259 fn dec_num_streams(&mut self, stream: &mut store::Ptr) { 260 assert!(stream.is_counted); 261 262 if self.peer.is_local_init(stream.id) { 263 assert!(self.num_send_streams > 0); 264 self.num_send_streams -= 1; 265 stream.is_counted = false; 266 } else { 267 assert!(self.num_recv_streams > 0); 268 self.num_recv_streams -= 1; 269 stream.is_counted = false; 270 } 271 } 272 dec_num_reset_streams(&mut self)273 fn dec_num_reset_streams(&mut self) { 274 assert!(self.num_local_reset_streams > 0); 275 self.num_local_reset_streams -= 1; 276 } 277 } 278 279 impl Drop for Counts { drop(&mut self)280 fn drop(&mut self) { 281 use std::thread; 282 283 if !thread::panicking() { 284 debug_assert!(!self.has_streams()); 285 } 286 } 287 } 288