1 use super::*; 2 3 use indexmap::{self, IndexMap}; 4 5 use std::convert::Infallible; 6 use std::fmt; 7 use std::marker::PhantomData; 8 use std::ops; 9 10 /// Storage for streams 11 #[derive(Debug)] 12 pub(super) struct Store { 13 slab: slab::Slab<Stream>, 14 ids: IndexMap<StreamId, SlabIndex>, 15 } 16 17 /// "Pointer" to an entry in the store 18 pub(super) struct Ptr<'a> { 19 key: Key, 20 store: &'a mut Store, 21 } 22 23 /// References an entry in the store. 24 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 25 pub(crate) struct Key { 26 index: SlabIndex, 27 /// Keep the stream ID in the key as an ABA guard, since slab indices 28 /// could be re-used with a new stream. 29 stream_id: StreamId, 30 } 31 32 // We can never have more than `StreamId::MAX` streams in the store, 33 // so we can save a smaller index (u32 vs usize). 34 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 35 struct SlabIndex(u32); 36 37 #[derive(Debug)] 38 pub(super) struct Queue<N> { 39 indices: Option<store::Indices>, 40 _p: PhantomData<N>, 41 } 42 43 pub(super) trait Next { next(stream: &Stream) -> Option<Key>44 fn next(stream: &Stream) -> Option<Key>; 45 set_next(stream: &mut Stream, key: Option<Key>)46 fn set_next(stream: &mut Stream, key: Option<Key>); 47 take_next(stream: &mut Stream) -> Option<Key>48 fn take_next(stream: &mut Stream) -> Option<Key>; 49 is_queued(stream: &Stream) -> bool50 fn is_queued(stream: &Stream) -> bool; 51 set_queued(stream: &mut Stream, val: bool)52 fn set_queued(stream: &mut Stream, val: bool); 53 } 54 55 /// A linked list 56 #[derive(Debug, Clone, Copy)] 57 struct Indices { 58 pub head: Key, 59 pub tail: Key, 60 } 61 62 pub(super) enum Entry<'a> { 63 Occupied(OccupiedEntry<'a>), 64 Vacant(VacantEntry<'a>), 65 } 66 67 pub(super) struct OccupiedEntry<'a> { 68 ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>, 69 } 70 71 pub(super) struct VacantEntry<'a> { 72 ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>, 73 slab: &'a mut slab::Slab<Stream>, 74 } 75 76 pub(super) trait Resolve { resolve(&mut self, key: Key) -> Ptr77 fn resolve(&mut self, key: Key) -> Ptr; 78 } 79 80 // ===== impl Store ===== 81 82 impl Store { new() -> Self83 pub fn new() -> Self { 84 Store { 85 slab: slab::Slab::new(), 86 ids: IndexMap::new(), 87 } 88 } 89 find_mut(&mut self, id: &StreamId) -> Option<Ptr>90 pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> { 91 let index = match self.ids.get(id) { 92 Some(key) => *key, 93 None => return None, 94 }; 95 96 Some(Ptr { 97 key: Key { 98 index, 99 stream_id: *id, 100 }, 101 store: self, 102 }) 103 } 104 insert(&mut self, id: StreamId, val: Stream) -> Ptr105 pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { 106 let index = SlabIndex(self.slab.insert(val) as u32); 107 assert!(self.ids.insert(id, index).is_none()); 108 109 Ptr { 110 key: Key { 111 index, 112 stream_id: id, 113 }, 114 store: self, 115 } 116 } 117 find_entry(&mut self, id: StreamId) -> Entry118 pub fn find_entry(&mut self, id: StreamId) -> Entry { 119 use self::indexmap::map::Entry::*; 120 121 match self.ids.entry(id) { 122 Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }), 123 Vacant(e) => Entry::Vacant(VacantEntry { 124 ids: e, 125 slab: &mut self.slab, 126 }), 127 } 128 } 129 130 #[allow(clippy::blocks_in_conditions)] for_each<F>(&mut self, mut f: F) where F: FnMut(Ptr),131 pub(crate) fn for_each<F>(&mut self, mut f: F) 132 where 133 F: FnMut(Ptr), 134 { 135 match self.try_for_each(|ptr| { 136 f(ptr); 137 Ok::<_, Infallible>(()) 138 }) { 139 Ok(()) => (), 140 Err(infallible) => match infallible {}, 141 } 142 } 143 try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E> where F: FnMut(Ptr) -> Result<(), E>,144 pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E> 145 where 146 F: FnMut(Ptr) -> Result<(), E>, 147 { 148 let mut len = self.ids.len(); 149 let mut i = 0; 150 151 while i < len { 152 // Get the key by index, this makes the borrow checker happy 153 let (stream_id, index) = { 154 let entry = self.ids.get_index(i).unwrap(); 155 (*entry.0, *entry.1) 156 }; 157 158 f(Ptr { 159 key: Key { index, stream_id }, 160 store: self, 161 })?; 162 163 // TODO: This logic probably could be better... 164 let new_len = self.ids.len(); 165 166 if new_len < len { 167 debug_assert!(new_len == len - 1); 168 len -= 1; 169 } else { 170 i += 1; 171 } 172 } 173 174 Ok(()) 175 } 176 } 177 178 impl Resolve for Store { resolve(&mut self, key: Key) -> Ptr179 fn resolve(&mut self, key: Key) -> Ptr { 180 Ptr { key, store: self } 181 } 182 } 183 184 impl ops::Index<Key> for Store { 185 type Output = Stream; 186 index(&self, key: Key) -> &Self::Output187 fn index(&self, key: Key) -> &Self::Output { 188 self.slab 189 .get(key.index.0 as usize) 190 .filter(|s| s.id == key.stream_id) 191 .unwrap_or_else(|| { 192 panic!("dangling store key for stream_id={:?}", key.stream_id); 193 }) 194 } 195 } 196 197 impl ops::IndexMut<Key> for Store { index_mut(&mut self, key: Key) -> &mut Self::Output198 fn index_mut(&mut self, key: Key) -> &mut Self::Output { 199 self.slab 200 .get_mut(key.index.0 as usize) 201 .filter(|s| s.id == key.stream_id) 202 .unwrap_or_else(|| { 203 panic!("dangling store key for stream_id={:?}", key.stream_id); 204 }) 205 } 206 } 207 208 impl Store { 209 #[cfg(feature = "unstable")] num_active_streams(&self) -> usize210 pub fn num_active_streams(&self) -> usize { 211 self.ids.len() 212 } 213 214 #[cfg(feature = "unstable")] num_wired_streams(&self) -> usize215 pub fn num_wired_streams(&self) -> usize { 216 self.slab.len() 217 } 218 } 219 220 // While running h2 unit/integration tests, enable this debug assertion. 221 // 222 // In practice, we don't need to ensure this. But the integration tests 223 // help to make sure we've cleaned up in cases where we could (like, the 224 // runtime isn't suddenly dropping the task for unknown reasons). 225 #[cfg(feature = "unstable")] 226 impl Drop for Store { drop(&mut self)227 fn drop(&mut self) { 228 use std::thread; 229 230 if !thread::panicking() { 231 debug_assert!(self.slab.is_empty()); 232 } 233 } 234 } 235 236 // ===== impl Queue ===== 237 238 impl<N> Queue<N> 239 where 240 N: Next, 241 { new() -> Self242 pub fn new() -> Self { 243 Queue { 244 indices: None, 245 _p: PhantomData, 246 } 247 } 248 take(&mut self) -> Self249 pub fn take(&mut self) -> Self { 250 Queue { 251 indices: self.indices.take(), 252 _p: PhantomData, 253 } 254 } 255 256 /// Queue the stream. 257 /// 258 /// If the stream is already contained by the list, return `false`. push(&mut self, stream: &mut store::Ptr) -> bool259 pub fn push(&mut self, stream: &mut store::Ptr) -> bool { 260 tracing::trace!("Queue::push_back"); 261 262 if N::is_queued(stream) { 263 tracing::trace!(" -> already queued"); 264 return false; 265 } 266 267 N::set_queued(stream, true); 268 269 // The next pointer shouldn't be set 270 debug_assert!(N::next(stream).is_none()); 271 272 // Queue the stream 273 match self.indices { 274 Some(ref mut idxs) => { 275 tracing::trace!(" -> existing entries"); 276 277 // Update the current tail node to point to `stream` 278 let key = stream.key(); 279 N::set_next(&mut stream.resolve(idxs.tail), Some(key)); 280 281 // Update the tail pointer 282 idxs.tail = stream.key(); 283 } 284 None => { 285 tracing::trace!(" -> first entry"); 286 self.indices = Some(store::Indices { 287 head: stream.key(), 288 tail: stream.key(), 289 }); 290 } 291 } 292 293 true 294 } 295 296 /// Queue the stream 297 /// 298 /// If the stream is already contained by the list, return `false`. push_front(&mut self, stream: &mut store::Ptr) -> bool299 pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool { 300 tracing::trace!("Queue::push_front"); 301 302 if N::is_queued(stream) { 303 tracing::trace!(" -> already queued"); 304 return false; 305 } 306 307 N::set_queued(stream, true); 308 309 // The next pointer shouldn't be set 310 debug_assert!(N::next(stream).is_none()); 311 312 // Queue the stream 313 match self.indices { 314 Some(ref mut idxs) => { 315 tracing::trace!(" -> existing entries"); 316 317 // Update the provided stream to point to the head node 318 let head_key = stream.resolve(idxs.head).key(); 319 N::set_next(stream, Some(head_key)); 320 321 // Update the head pointer 322 idxs.head = stream.key(); 323 } 324 None => { 325 tracing::trace!(" -> first entry"); 326 self.indices = Some(store::Indices { 327 head: stream.key(), 328 tail: stream.key(), 329 }); 330 } 331 } 332 333 true 334 } 335 pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>> where R: Resolve,336 pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>> 337 where 338 R: Resolve, 339 { 340 if let Some(mut idxs) = self.indices { 341 let mut stream = store.resolve(idxs.head); 342 343 if idxs.head == idxs.tail { 344 assert!(N::next(&stream).is_none()); 345 self.indices = None; 346 } else { 347 idxs.head = N::take_next(&mut stream).unwrap(); 348 self.indices = Some(idxs); 349 } 350 351 debug_assert!(N::is_queued(&stream)); 352 N::set_queued(&mut stream, false); 353 354 return Some(stream); 355 } 356 357 None 358 } 359 is_empty(&self) -> bool360 pub fn is_empty(&self) -> bool { 361 self.indices.is_none() 362 } 363 pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>> where R: Resolve, F: Fn(&Stream) -> bool,364 pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>> 365 where 366 R: Resolve, 367 F: Fn(&Stream) -> bool, 368 { 369 if let Some(idxs) = self.indices { 370 let should_pop = f(&store.resolve(idxs.head)); 371 if should_pop { 372 return self.pop(store); 373 } 374 } 375 376 None 377 } 378 } 379 380 // ===== impl Ptr ===== 381 382 impl<'a> Ptr<'a> { 383 /// Returns the Key associated with the stream key(&self) -> Key384 pub fn key(&self) -> Key { 385 self.key 386 } 387 store_mut(&mut self) -> &mut Store388 pub fn store_mut(&mut self) -> &mut Store { 389 self.store 390 } 391 392 /// Remove the stream from the store remove(self) -> StreamId393 pub fn remove(self) -> StreamId { 394 // The stream must have been unlinked before this point 395 debug_assert!(!self.store.ids.contains_key(&self.key.stream_id)); 396 397 // Remove the stream state 398 let stream = self.store.slab.remove(self.key.index.0 as usize); 399 assert_eq!(stream.id, self.key.stream_id); 400 stream.id 401 } 402 403 /// Remove the StreamId -> stream state association. 404 /// 405 /// This will effectively remove the stream as far as the H2 protocol is 406 /// concerned. unlink(&mut self)407 pub fn unlink(&mut self) { 408 let id = self.key.stream_id; 409 self.store.ids.swap_remove(&id); 410 } 411 } 412 413 impl<'a> Resolve for Ptr<'a> { resolve(&mut self, key: Key) -> Ptr414 fn resolve(&mut self, key: Key) -> Ptr { 415 Ptr { 416 key, 417 store: &mut *self.store, 418 } 419 } 420 } 421 422 impl<'a> ops::Deref for Ptr<'a> { 423 type Target = Stream; 424 deref(&self) -> &Stream425 fn deref(&self) -> &Stream { 426 &self.store[self.key] 427 } 428 } 429 430 impl<'a> ops::DerefMut for Ptr<'a> { deref_mut(&mut self) -> &mut Stream431 fn deref_mut(&mut self) -> &mut Stream { 432 &mut self.store[self.key] 433 } 434 } 435 436 impl<'a> fmt::Debug for Ptr<'a> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result437 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 438 (**self).fmt(fmt) 439 } 440 } 441 442 // ===== impl OccupiedEntry ===== 443 444 impl<'a> OccupiedEntry<'a> { key(&self) -> Key445 pub fn key(&self) -> Key { 446 let stream_id = *self.ids.key(); 447 let index = *self.ids.get(); 448 Key { index, stream_id } 449 } 450 } 451 452 // ===== impl VacantEntry ===== 453 454 impl<'a> VacantEntry<'a> { insert(self, value: Stream) -> Key455 pub fn insert(self, value: Stream) -> Key { 456 // Insert the value in the slab 457 let stream_id = value.id; 458 let index = SlabIndex(self.slab.insert(value) as u32); 459 460 // Insert the handle in the ID map 461 self.ids.insert(index); 462 463 Key { index, stream_id } 464 } 465 } 466