1 use super::*; 2 3 use std::task::{Context, Waker}; 4 use std::time::Instant; 5 use std::usize; 6 7 /// Tracks Stream related state 8 /// 9 /// # Reference counting 10 /// 11 /// There can be a number of outstanding handles to a single Stream. These are 12 /// tracked using reference counting. The `ref_count` field represents the 13 /// number of outstanding userspace handles that can reach this stream. 14 /// 15 /// It's important to note that when the stream is placed in an internal queue 16 /// (such as an accept queue), this is **not** tracked by a reference count. 17 /// Thus, `ref_count` can be zero and the stream still has to be kept around. 18 #[derive(Debug)] 19 pub(super) struct Stream { 20 /// The h2 stream identifier 21 pub id: StreamId, 22 23 /// Current state of the stream 24 pub state: State, 25 26 /// Set to `true` when the stream is counted against the connection's max 27 /// concurrent streams. 28 pub is_counted: bool, 29 30 /// Number of outstanding handles pointing to this stream 31 pub ref_count: usize, 32 33 // ===== Fields related to sending ===== 34 /// Next node in the accept linked list 35 pub next_pending_send: Option<store::Key>, 36 37 /// Set to true when the stream is pending accept 38 pub is_pending_send: bool, 39 40 /// Send data flow control 41 pub send_flow: FlowControl, 42 43 /// Amount of send capacity that has been requested, but not yet allocated. 44 pub requested_send_capacity: WindowSize, 45 46 /// Amount of data buffered at the prioritization layer. 47 /// TODO: Technically this could be greater than the window size... 48 pub buffered_send_data: usize, 49 50 /// Task tracking additional send capacity (i.e. window updates). 51 send_task: Option<Waker>, 52 53 /// Frames pending for this stream being sent to the socket 54 pub pending_send: buffer::Deque, 55 56 /// Next node in the linked list of streams waiting for additional 57 /// connection level capacity. 58 pub next_pending_send_capacity: Option<store::Key>, 59 60 /// True if the stream is waiting for outbound connection capacity 61 pub is_pending_send_capacity: bool, 62 63 /// Set to true when the send capacity has been incremented 64 pub send_capacity_inc: bool, 65 66 /// Next node in the open linked list 67 pub next_open: Option<store::Key>, 68 69 /// Set to true when the stream is pending to be opened 70 pub is_pending_open: bool, 71 72 /// Set to true when a push is pending for this stream 73 pub is_pending_push: bool, 74 75 // ===== Fields related to receiving ===== 76 /// Next node in the accept linked list 77 pub next_pending_accept: Option<store::Key>, 78 79 /// Set to true when the stream is pending accept 80 pub is_pending_accept: bool, 81 82 /// Receive data flow control 83 pub recv_flow: FlowControl, 84 85 pub in_flight_recv_data: WindowSize, 86 87 /// Next node in the linked list of streams waiting to send window updates. 88 pub next_window_update: Option<store::Key>, 89 90 /// True if the stream is waiting to send a window update 91 pub is_pending_window_update: bool, 92 93 /// The time when this stream may have been locally reset. 94 pub reset_at: Option<Instant>, 95 96 /// Next node in list of reset streams that should expire eventually 97 pub next_reset_expire: Option<store::Key>, 98 99 /// Frames pending for this stream to read 100 pub pending_recv: buffer::Deque, 101 102 /// When the RecvStream drop occurs, no data should be received. 103 pub is_recv: bool, 104 105 /// Task tracking receiving frames 106 pub recv_task: Option<Waker>, 107 108 /// The stream's pending push promises 109 pub pending_push_promises: store::Queue<NextAccept>, 110 111 /// Validate content-length headers 112 pub content_length: ContentLength, 113 } 114 115 /// State related to validating a stream's content-length 116 #[derive(Debug)] 117 pub enum ContentLength { 118 Omitted, 119 Head, 120 Remaining(u64), 121 } 122 123 #[derive(Debug)] 124 pub(super) struct NextAccept; 125 126 #[derive(Debug)] 127 pub(super) struct NextSend; 128 129 #[derive(Debug)] 130 pub(super) struct NextSendCapacity; 131 132 #[derive(Debug)] 133 pub(super) struct NextWindowUpdate; 134 135 #[derive(Debug)] 136 pub(super) struct NextOpen; 137 138 #[derive(Debug)] 139 pub(super) struct NextResetExpire; 140 141 impl Stream { new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream142 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { 143 let mut send_flow = FlowControl::new(); 144 let mut recv_flow = FlowControl::new(); 145 146 recv_flow 147 .inc_window(init_recv_window) 148 .expect("invalid initial receive window"); 149 // TODO: proper error handling? 150 let _res = recv_flow.assign_capacity(init_recv_window); 151 debug_assert!(_res.is_ok()); 152 153 send_flow 154 .inc_window(init_send_window) 155 .expect("invalid initial send window size"); 156 157 Stream { 158 id, 159 state: State::default(), 160 ref_count: 0, 161 is_counted: false, 162 163 // ===== Fields related to sending ===== 164 next_pending_send: None, 165 is_pending_send: false, 166 send_flow, 167 requested_send_capacity: 0, 168 buffered_send_data: 0, 169 send_task: None, 170 pending_send: buffer::Deque::new(), 171 is_pending_send_capacity: false, 172 next_pending_send_capacity: None, 173 send_capacity_inc: false, 174 is_pending_open: false, 175 next_open: None, 176 is_pending_push: false, 177 178 // ===== Fields related to receiving ===== 179 next_pending_accept: None, 180 is_pending_accept: false, 181 recv_flow, 182 in_flight_recv_data: 0, 183 next_window_update: None, 184 is_pending_window_update: false, 185 reset_at: None, 186 next_reset_expire: None, 187 pending_recv: buffer::Deque::new(), 188 is_recv: true, 189 recv_task: None, 190 pending_push_promises: store::Queue::new(), 191 content_length: ContentLength::Omitted, 192 } 193 } 194 195 /// Increment the stream's ref count ref_inc(&mut self)196 pub fn ref_inc(&mut self) { 197 assert!(self.ref_count < usize::MAX); 198 self.ref_count += 1; 199 } 200 201 /// Decrements the stream's ref count ref_dec(&mut self)202 pub fn ref_dec(&mut self) { 203 assert!(self.ref_count > 0); 204 self.ref_count -= 1; 205 } 206 207 /// Returns true if stream is currently being held for some time because of 208 /// a local reset. is_pending_reset_expiration(&self) -> bool209 pub fn is_pending_reset_expiration(&self) -> bool { 210 self.reset_at.is_some() 211 } 212 213 /// Returns true if frames for this stream are ready to be sent over the wire is_send_ready(&self) -> bool214 pub fn is_send_ready(&self) -> bool { 215 // Why do we check pending_open? 216 // 217 // We allow users to call send_request() which schedules a stream to be pending_open 218 // if there is no room according to the concurrency limit (max_send_streams), and we 219 // also allow data to be buffered for send with send_data() if there is no capacity for 220 // the stream to send the data, which attempts to place the stream in pending_send. 221 // If the stream is not open, we don't want the stream to be scheduled for 222 // execution (pending_send). Note that if the stream is in pending_open, it will be 223 // pushed to pending_send when there is room for an open stream. 224 // 225 // In pending_push we track whether a PushPromise still needs to be sent 226 // from a different stream before we can start sending frames on this one. 227 // This is different from the "open" check because reserved streams don't count 228 // toward the concurrency limit. 229 // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 230 !self.is_pending_open && !self.is_pending_push 231 } 232 233 /// Returns true if the stream is closed is_closed(&self) -> bool234 pub fn is_closed(&self) -> bool { 235 // The state has fully transitioned to closed. 236 self.state.is_closed() && 237 // Because outbound frames transition the stream state before being 238 // buffered, we have to ensure that all frames have been flushed. 239 self.pending_send.is_empty() && 240 // Sometimes large data frames are sent out in chunks. After a chunk 241 // of the frame is sent, the remainder is pushed back onto the send 242 // queue to be rescheduled. 243 // 244 // Checking for additional buffered data lets us catch this case. 245 self.buffered_send_data == 0 246 } 247 248 /// Returns true if the stream is no longer in use is_released(&self) -> bool249 pub fn is_released(&self) -> bool { 250 // The stream is closed and fully flushed 251 self.is_closed() && 252 // There are no more outstanding references to the stream 253 self.ref_count == 0 && 254 // The stream is not in any queue 255 !self.is_pending_send && !self.is_pending_send_capacity && 256 !self.is_pending_accept && !self.is_pending_window_update && 257 !self.is_pending_open && self.reset_at.is_none() 258 } 259 260 /// Returns true when the consumer of the stream has dropped all handles 261 /// (indicating no further interest in the stream) and the stream state is 262 /// not actually closed. 263 /// 264 /// In this case, a reset should be sent. is_canceled_interest(&self) -> bool265 pub fn is_canceled_interest(&self) -> bool { 266 self.ref_count == 0 && !self.state.is_closed() 267 } 268 269 /// Current available stream send capacity capacity(&self, max_buffer_size: usize) -> WindowSize270 pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { 271 let available = self.send_flow.available().as_size() as usize; 272 let buffered = self.buffered_send_data; 273 274 available.min(max_buffer_size).saturating_sub(buffered) as WindowSize 275 } 276 assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize)277 pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { 278 let prev_capacity = self.capacity(max_buffer_size); 279 debug_assert!(capacity > 0); 280 // TODO: proper error handling 281 let _res = self.send_flow.assign_capacity(capacity); 282 debug_assert!(_res.is_ok()); 283 284 tracing::trace!( 285 " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", 286 self.send_flow.available(), 287 self.buffered_send_data, 288 self.id, 289 max_buffer_size, 290 prev_capacity, 291 ); 292 293 if prev_capacity < self.capacity(max_buffer_size) { 294 self.notify_capacity(); 295 } 296 } 297 send_data(&mut self, len: WindowSize, max_buffer_size: usize)298 pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { 299 let prev_capacity = self.capacity(max_buffer_size); 300 301 // TODO: proper error handling 302 let _res = self.send_flow.send_data(len); 303 debug_assert!(_res.is_ok()); 304 305 // Decrement the stream's buffered data counter 306 debug_assert!(self.buffered_send_data >= len as usize); 307 self.buffered_send_data -= len as usize; 308 self.requested_send_capacity -= len; 309 310 tracing::trace!( 311 " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", 312 self.send_flow.available(), 313 self.buffered_send_data, 314 self.id, 315 max_buffer_size, 316 prev_capacity, 317 ); 318 319 if prev_capacity < self.capacity(max_buffer_size) { 320 self.notify_capacity(); 321 } 322 } 323 324 /// If the capacity was limited because of the max_send_buffer_size, 325 /// then consider waking the send task again... notify_capacity(&mut self)326 pub fn notify_capacity(&mut self) { 327 self.send_capacity_inc = true; 328 tracing::trace!(" notifying task"); 329 self.notify_send(); 330 } 331 332 /// Returns `Err` when the decrement cannot be completed due to overflow. dec_content_length(&mut self, len: usize) -> Result<(), ()>333 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { 334 match self.content_length { 335 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { 336 Some(val) => *rem = val, 337 None => return Err(()), 338 }, 339 ContentLength::Head => { 340 if len != 0 { 341 return Err(()); 342 } 343 } 344 _ => {} 345 } 346 347 Ok(()) 348 } 349 ensure_content_length_zero(&self) -> Result<(), ()>350 pub fn ensure_content_length_zero(&self) -> Result<(), ()> { 351 match self.content_length { 352 ContentLength::Remaining(0) => Ok(()), 353 ContentLength::Remaining(_) => Err(()), 354 _ => Ok(()), 355 } 356 } 357 notify_send(&mut self)358 pub fn notify_send(&mut self) { 359 if let Some(task) = self.send_task.take() { 360 task.wake(); 361 } 362 } 363 wait_send(&mut self, cx: &Context)364 pub fn wait_send(&mut self, cx: &Context) { 365 self.send_task = Some(cx.waker().clone()); 366 } 367 notify_recv(&mut self)368 pub fn notify_recv(&mut self) { 369 if let Some(task) = self.recv_task.take() { 370 task.wake(); 371 } 372 } 373 } 374 375 impl store::Next for NextAccept { next(stream: &Stream) -> Option<store::Key>376 fn next(stream: &Stream) -> Option<store::Key> { 377 stream.next_pending_accept 378 } 379 set_next(stream: &mut Stream, key: Option<store::Key>)380 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 381 stream.next_pending_accept = key; 382 } 383 take_next(stream: &mut Stream) -> Option<store::Key>384 fn take_next(stream: &mut Stream) -> Option<store::Key> { 385 stream.next_pending_accept.take() 386 } 387 is_queued(stream: &Stream) -> bool388 fn is_queued(stream: &Stream) -> bool { 389 stream.is_pending_accept 390 } 391 set_queued(stream: &mut Stream, val: bool)392 fn set_queued(stream: &mut Stream, val: bool) { 393 stream.is_pending_accept = val; 394 } 395 } 396 397 impl store::Next for NextSend { next(stream: &Stream) -> Option<store::Key>398 fn next(stream: &Stream) -> Option<store::Key> { 399 stream.next_pending_send 400 } 401 set_next(stream: &mut Stream, key: Option<store::Key>)402 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 403 stream.next_pending_send = key; 404 } 405 take_next(stream: &mut Stream) -> Option<store::Key>406 fn take_next(stream: &mut Stream) -> Option<store::Key> { 407 stream.next_pending_send.take() 408 } 409 is_queued(stream: &Stream) -> bool410 fn is_queued(stream: &Stream) -> bool { 411 stream.is_pending_send 412 } 413 set_queued(stream: &mut Stream, val: bool)414 fn set_queued(stream: &mut Stream, val: bool) { 415 if val { 416 // ensure that stream is not queued for being opened 417 // if it's being put into queue for sending data 418 debug_assert!(!stream.is_pending_open); 419 } 420 stream.is_pending_send = val; 421 } 422 } 423 424 impl store::Next for NextSendCapacity { next(stream: &Stream) -> Option<store::Key>425 fn next(stream: &Stream) -> Option<store::Key> { 426 stream.next_pending_send_capacity 427 } 428 set_next(stream: &mut Stream, key: Option<store::Key>)429 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 430 stream.next_pending_send_capacity = key; 431 } 432 take_next(stream: &mut Stream) -> Option<store::Key>433 fn take_next(stream: &mut Stream) -> Option<store::Key> { 434 stream.next_pending_send_capacity.take() 435 } 436 is_queued(stream: &Stream) -> bool437 fn is_queued(stream: &Stream) -> bool { 438 stream.is_pending_send_capacity 439 } 440 set_queued(stream: &mut Stream, val: bool)441 fn set_queued(stream: &mut Stream, val: bool) { 442 stream.is_pending_send_capacity = val; 443 } 444 } 445 446 impl store::Next for NextWindowUpdate { next(stream: &Stream) -> Option<store::Key>447 fn next(stream: &Stream) -> Option<store::Key> { 448 stream.next_window_update 449 } 450 set_next(stream: &mut Stream, key: Option<store::Key>)451 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 452 stream.next_window_update = key; 453 } 454 take_next(stream: &mut Stream) -> Option<store::Key>455 fn take_next(stream: &mut Stream) -> Option<store::Key> { 456 stream.next_window_update.take() 457 } 458 is_queued(stream: &Stream) -> bool459 fn is_queued(stream: &Stream) -> bool { 460 stream.is_pending_window_update 461 } 462 set_queued(stream: &mut Stream, val: bool)463 fn set_queued(stream: &mut Stream, val: bool) { 464 stream.is_pending_window_update = val; 465 } 466 } 467 468 impl store::Next for NextOpen { next(stream: &Stream) -> Option<store::Key>469 fn next(stream: &Stream) -> Option<store::Key> { 470 stream.next_open 471 } 472 set_next(stream: &mut Stream, key: Option<store::Key>)473 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 474 stream.next_open = key; 475 } 476 take_next(stream: &mut Stream) -> Option<store::Key>477 fn take_next(stream: &mut Stream) -> Option<store::Key> { 478 stream.next_open.take() 479 } 480 is_queued(stream: &Stream) -> bool481 fn is_queued(stream: &Stream) -> bool { 482 stream.is_pending_open 483 } 484 set_queued(stream: &mut Stream, val: bool)485 fn set_queued(stream: &mut Stream, val: bool) { 486 if val { 487 // ensure that stream is not queued for being sent 488 // if it's being put into queue for opening the stream 489 debug_assert!(!stream.is_pending_send); 490 } 491 stream.is_pending_open = val; 492 } 493 } 494 495 impl store::Next for NextResetExpire { next(stream: &Stream) -> Option<store::Key>496 fn next(stream: &Stream) -> Option<store::Key> { 497 stream.next_reset_expire 498 } 499 set_next(stream: &mut Stream, key: Option<store::Key>)500 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 501 stream.next_reset_expire = key; 502 } 503 take_next(stream: &mut Stream) -> Option<store::Key>504 fn take_next(stream: &mut Stream) -> Option<store::Key> { 505 stream.next_reset_expire.take() 506 } 507 is_queued(stream: &Stream) -> bool508 fn is_queued(stream: &Stream) -> bool { 509 stream.reset_at.is_some() 510 } 511 set_queued(stream: &mut Stream, val: bool)512 fn set_queued(stream: &mut Stream, val: bool) { 513 if val { 514 stream.reset_at = Some(Instant::now()); 515 } else { 516 stream.reset_at = None; 517 } 518 } 519 } 520 521 // ===== impl ContentLength ===== 522 523 impl ContentLength { is_head(&self) -> bool524 pub fn is_head(&self) -> bool { 525 matches!(*self, Self::Head) 526 } 527 } 528