1 use super::*;
2 
3 use indexmap::{self, IndexMap};
4 
5 use std::convert::Infallible;
6 use std::fmt;
7 use std::marker::PhantomData;
8 use std::ops;
9 
10 /// Storage for streams
11 #[derive(Debug)]
12 pub(super) struct Store {
13     slab: slab::Slab<Stream>,
14     ids: IndexMap<StreamId, SlabIndex>,
15 }
16 
17 /// "Pointer" to an entry in the store
18 pub(super) struct Ptr<'a> {
19     key: Key,
20     store: &'a mut Store,
21 }
22 
23 /// References an entry in the store.
24 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
25 pub(crate) struct Key {
26     index: SlabIndex,
27     /// Keep the stream ID in the key as an ABA guard, since slab indices
28     /// could be re-used with a new stream.
29     stream_id: StreamId,
30 }
31 
// The store can never hold more than `StreamId::MAX` streams, so a `u32`
// is enough for the slot number and keeps the index smaller than a `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlabIndex(u32);
36 
37 #[derive(Debug)]
38 pub(super) struct Queue<N> {
39     indices: Option<store::Indices>,
40     _p: PhantomData<N>,
41 }
42 
43 pub(super) trait Next {
next(stream: &Stream) -> Option<Key>44     fn next(stream: &Stream) -> Option<Key>;
45 
set_next(stream: &mut Stream, key: Option<Key>)46     fn set_next(stream: &mut Stream, key: Option<Key>);
47 
take_next(stream: &mut Stream) -> Option<Key>48     fn take_next(stream: &mut Stream) -> Option<Key>;
49 
is_queued(stream: &Stream) -> bool50     fn is_queued(stream: &Stream) -> bool;
51 
set_queued(stream: &mut Stream, val: bool)52     fn set_queued(stream: &mut Stream, val: bool);
53 }
54 
55 /// A linked list
56 #[derive(Debug, Clone, Copy)]
57 struct Indices {
58     pub head: Key,
59     pub tail: Key,
60 }
61 
62 pub(super) enum Entry<'a> {
63     Occupied(OccupiedEntry<'a>),
64     Vacant(VacantEntry<'a>),
65 }
66 
67 pub(super) struct OccupiedEntry<'a> {
68     ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
69 }
70 
71 pub(super) struct VacantEntry<'a> {
72     ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
73     slab: &'a mut slab::Slab<Stream>,
74 }
75 
76 pub(super) trait Resolve {
resolve(&mut self, key: Key) -> Ptr77     fn resolve(&mut self, key: Key) -> Ptr;
78 }
79 
80 // ===== impl Store =====
81 
82 impl Store {
new() -> Self83     pub fn new() -> Self {
84         Store {
85             slab: slab::Slab::new(),
86             ids: IndexMap::new(),
87         }
88     }
89 
find_mut(&mut self, id: &StreamId) -> Option<Ptr>90     pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91         let index = match self.ids.get(id) {
92             Some(key) => *key,
93             None => return None,
94         };
95 
96         Some(Ptr {
97             key: Key {
98                 index,
99                 stream_id: *id,
100             },
101             store: self,
102         })
103     }
104 
insert(&mut self, id: StreamId, val: Stream) -> Ptr105     pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106         let index = SlabIndex(self.slab.insert(val) as u32);
107         assert!(self.ids.insert(id, index).is_none());
108 
109         Ptr {
110             key: Key {
111                 index,
112                 stream_id: id,
113             },
114             store: self,
115         }
116     }
117 
find_entry(&mut self, id: StreamId) -> Entry118     pub fn find_entry(&mut self, id: StreamId) -> Entry {
119         use self::indexmap::map::Entry::*;
120 
121         match self.ids.entry(id) {
122             Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123             Vacant(e) => Entry::Vacant(VacantEntry {
124                 ids: e,
125                 slab: &mut self.slab,
126             }),
127         }
128     }
129 
130     #[allow(clippy::blocks_in_conditions)]
for_each<F>(&mut self, mut f: F) where F: FnMut(Ptr),131     pub(crate) fn for_each<F>(&mut self, mut f: F)
132     where
133         F: FnMut(Ptr),
134     {
135         match self.try_for_each(|ptr| {
136             f(ptr);
137             Ok::<_, Infallible>(())
138         }) {
139             Ok(()) => (),
140             Err(infallible) => match infallible {},
141         }
142     }
143 
try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E> where F: FnMut(Ptr) -> Result<(), E>,144     pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
145     where
146         F: FnMut(Ptr) -> Result<(), E>,
147     {
148         let mut len = self.ids.len();
149         let mut i = 0;
150 
151         while i < len {
152             // Get the key by index, this makes the borrow checker happy
153             let (stream_id, index) = {
154                 let entry = self.ids.get_index(i).unwrap();
155                 (*entry.0, *entry.1)
156             };
157 
158             f(Ptr {
159                 key: Key { index, stream_id },
160                 store: self,
161             })?;
162 
163             // TODO: This logic probably could be better...
164             let new_len = self.ids.len();
165 
166             if new_len < len {
167                 debug_assert!(new_len == len - 1);
168                 len -= 1;
169             } else {
170                 i += 1;
171             }
172         }
173 
174         Ok(())
175     }
176 }
177 
178 impl Resolve for Store {
resolve(&mut self, key: Key) -> Ptr179     fn resolve(&mut self, key: Key) -> Ptr {
180         Ptr { key, store: self }
181     }
182 }
183 
184 impl ops::Index<Key> for Store {
185     type Output = Stream;
186 
index(&self, key: Key) -> &Self::Output187     fn index(&self, key: Key) -> &Self::Output {
188         self.slab
189             .get(key.index.0 as usize)
190             .filter(|s| s.id == key.stream_id)
191             .unwrap_or_else(|| {
192                 panic!("dangling store key for stream_id={:?}", key.stream_id);
193             })
194     }
195 }
196 
197 impl ops::IndexMut<Key> for Store {
index_mut(&mut self, key: Key) -> &mut Self::Output198     fn index_mut(&mut self, key: Key) -> &mut Self::Output {
199         self.slab
200             .get_mut(key.index.0 as usize)
201             .filter(|s| s.id == key.stream_id)
202             .unwrap_or_else(|| {
203                 panic!("dangling store key for stream_id={:?}", key.stream_id);
204             })
205     }
206 }
207 
208 impl Store {
209     #[cfg(feature = "unstable")]
num_active_streams(&self) -> usize210     pub fn num_active_streams(&self) -> usize {
211         self.ids.len()
212     }
213 
214     #[cfg(feature = "unstable")]
num_wired_streams(&self) -> usize215     pub fn num_wired_streams(&self) -> usize {
216         self.slab.len()
217     }
218 }
219 
220 // While running h2 unit/integration tests, enable this debug assertion.
221 //
222 // In practice, we don't need to ensure this. But the integration tests
223 // help to make sure we've cleaned up in cases where we could (like, the
224 // runtime isn't suddenly dropping the task for unknown reasons).
#[cfg(feature = "unstable")]
impl Drop for Store {
    /// In debug builds, checks that the slab is empty when the store goes
    /// away. The check is skipped while the thread is already panicking.
    fn drop(&mut self) {
        if std::thread::panicking() {
            return;
        }

        debug_assert!(self.slab.is_empty());
    }
}
235 
236 // ===== impl Queue =====
237 
238 impl<N> Queue<N>
239 where
240     N: Next,
241 {
new() -> Self242     pub fn new() -> Self {
243         Queue {
244             indices: None,
245             _p: PhantomData,
246         }
247     }
248 
take(&mut self) -> Self249     pub fn take(&mut self) -> Self {
250         Queue {
251             indices: self.indices.take(),
252             _p: PhantomData,
253         }
254     }
255 
256     /// Queue the stream.
257     ///
258     /// If the stream is already contained by the list, return `false`.
push(&mut self, stream: &mut store::Ptr) -> bool259     pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
260         tracing::trace!("Queue::push_back");
261 
262         if N::is_queued(stream) {
263             tracing::trace!(" -> already queued");
264             return false;
265         }
266 
267         N::set_queued(stream, true);
268 
269         // The next pointer shouldn't be set
270         debug_assert!(N::next(stream).is_none());
271 
272         // Queue the stream
273         match self.indices {
274             Some(ref mut idxs) => {
275                 tracing::trace!(" -> existing entries");
276 
277                 // Update the current tail node to point to `stream`
278                 let key = stream.key();
279                 N::set_next(&mut stream.resolve(idxs.tail), Some(key));
280 
281                 // Update the tail pointer
282                 idxs.tail = stream.key();
283             }
284             None => {
285                 tracing::trace!(" -> first entry");
286                 self.indices = Some(store::Indices {
287                     head: stream.key(),
288                     tail: stream.key(),
289                 });
290             }
291         }
292 
293         true
294     }
295 
296     /// Queue the stream
297     ///
298     /// If the stream is already contained by the list, return `false`.
push_front(&mut self, stream: &mut store::Ptr) -> bool299     pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
300         tracing::trace!("Queue::push_front");
301 
302         if N::is_queued(stream) {
303             tracing::trace!(" -> already queued");
304             return false;
305         }
306 
307         N::set_queued(stream, true);
308 
309         // The next pointer shouldn't be set
310         debug_assert!(N::next(stream).is_none());
311 
312         // Queue the stream
313         match self.indices {
314             Some(ref mut idxs) => {
315                 tracing::trace!(" -> existing entries");
316 
317                 // Update the provided stream to point to the head node
318                 let head_key = stream.resolve(idxs.head).key();
319                 N::set_next(stream, Some(head_key));
320 
321                 // Update the head pointer
322                 idxs.head = stream.key();
323             }
324             None => {
325                 tracing::trace!(" -> first entry");
326                 self.indices = Some(store::Indices {
327                     head: stream.key(),
328                     tail: stream.key(),
329                 });
330             }
331         }
332 
333         true
334     }
335 
pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>> where R: Resolve,336     pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
337     where
338         R: Resolve,
339     {
340         if let Some(mut idxs) = self.indices {
341             let mut stream = store.resolve(idxs.head);
342 
343             if idxs.head == idxs.tail {
344                 assert!(N::next(&stream).is_none());
345                 self.indices = None;
346             } else {
347                 idxs.head = N::take_next(&mut stream).unwrap();
348                 self.indices = Some(idxs);
349             }
350 
351             debug_assert!(N::is_queued(&stream));
352             N::set_queued(&mut stream, false);
353 
354             return Some(stream);
355         }
356 
357         None
358     }
359 
is_empty(&self) -> bool360     pub fn is_empty(&self) -> bool {
361         self.indices.is_none()
362     }
363 
pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>> where R: Resolve, F: Fn(&Stream) -> bool,364     pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
365     where
366         R: Resolve,
367         F: Fn(&Stream) -> bool,
368     {
369         if let Some(idxs) = self.indices {
370             let should_pop = f(&store.resolve(idxs.head));
371             if should_pop {
372                 return self.pop(store);
373             }
374         }
375 
376         None
377     }
378 }
379 
380 // ===== impl Ptr =====
381 
382 impl<'a> Ptr<'a> {
383     /// Returns the Key associated with the stream
key(&self) -> Key384     pub fn key(&self) -> Key {
385         self.key
386     }
387 
store_mut(&mut self) -> &mut Store388     pub fn store_mut(&mut self) -> &mut Store {
389         self.store
390     }
391 
392     /// Remove the stream from the store
remove(self) -> StreamId393     pub fn remove(self) -> StreamId {
394         // The stream must have been unlinked before this point
395         debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
396 
397         // Remove the stream state
398         let stream = self.store.slab.remove(self.key.index.0 as usize);
399         assert_eq!(stream.id, self.key.stream_id);
400         stream.id
401     }
402 
403     /// Remove the StreamId -> stream state association.
404     ///
405     /// This will effectively remove the stream as far as the H2 protocol is
406     /// concerned.
unlink(&mut self)407     pub fn unlink(&mut self) {
408         let id = self.key.stream_id;
409         self.store.ids.swap_remove(&id);
410     }
411 }
412 
413 impl<'a> Resolve for Ptr<'a> {
resolve(&mut self, key: Key) -> Ptr414     fn resolve(&mut self, key: Key) -> Ptr {
415         Ptr {
416             key,
417             store: &mut *self.store,
418         }
419     }
420 }
421 
422 impl<'a> ops::Deref for Ptr<'a> {
423     type Target = Stream;
424 
deref(&self) -> &Stream425     fn deref(&self) -> &Stream {
426         &self.store[self.key]
427     }
428 }
429 
430 impl<'a> ops::DerefMut for Ptr<'a> {
deref_mut(&mut self) -> &mut Stream431     fn deref_mut(&mut self) -> &mut Stream {
432         &mut self.store[self.key]
433     }
434 }
435 
436 impl<'a> fmt::Debug for Ptr<'a> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result437     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
438         (**self).fmt(fmt)
439     }
440 }
441 
442 // ===== impl OccupiedEntry =====
443 
444 impl<'a> OccupiedEntry<'a> {
key(&self) -> Key445     pub fn key(&self) -> Key {
446         let stream_id = *self.ids.key();
447         let index = *self.ids.get();
448         Key { index, stream_id }
449     }
450 }
451 
452 // ===== impl VacantEntry =====
453 
454 impl<'a> VacantEntry<'a> {
insert(self, value: Stream) -> Key455     pub fn insert(self, value: Stream) -> Key {
456         // Insert the value in the slab
457         let stream_id = value.id;
458         let index = SlabIndex(self.slab.insert(value) as u32);
459 
460         // Insert the handle in the ID map
461         self.ids.insert(index);
462 
463         Key { index, stream_id }
464     }
465 }
466