1 mod level;
2 pub(crate) use self::level::Expiration;
3 use self::level::Level;
4 
5 mod stack;
6 pub(crate) use self::stack::Stack;
7 
8 use std::borrow::Borrow;
9 use std::fmt::Debug;
10 
/// Timing wheel implementation.
///
/// This type provides the hashed timing wheel implementation that backs `Timer`
/// and `DelayQueue`.
///
/// The structure is generic over `T: Stack`. This allows handling timeout data
/// being stored on the heap or in a slab. In order to support the latter case,
/// the slab must be passed into each function, allowing the implementation to
/// look up timer entries.
///
/// See `Timer` documentation for some implementation notes.
#[derive(Debug)]
pub(crate) struct Wheel<T> {
    /// The number of milliseconds elapsed since the wheel started.
    ///
    /// Starts at zero and is only ever moved forward.
    elapsed: u64,

    /// Timer wheel.
    ///
    /// Holds `NUM_LEVELS` levels, ordered from the finest resolution (index 0)
    /// to the coarsest.
    ///
    /// Levels:
    ///
    /// * 1 ms slots / 64 ms range
    /// * 64 ms slots / ~ 4 sec range
    /// * ~ 4 sec slots / ~ 4 min range
    /// * ~ 4 min slots / ~ 4 hr range
    /// * ~ 4 hr slots / ~ 12 day range
    /// * ~ 12 day slots / ~ 2 yr range
    levels: Box<[Level<T>]>,
}
39 
/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
/// each, the timer is able to track time up to 2 years into the future with a
/// precision of 1 millisecond.
const NUM_LEVELS: usize = 6;

/// The maximum duration of a delay, in milliseconds.
///
/// Each level contributes 6 bits (64 slots), so with `NUM_LEVELS` levels this
/// is `2^36 - 1`. An insertion whose deadline lies further than this past the
/// wheel's elapsed time is rejected.
const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
47 
/// Reason an entry could not be inserted into the wheel.
#[derive(Debug)]
pub(crate) enum InsertError {
    /// The requested instant is not later than the wheel's elapsed time, i.e.
    /// the deadline has already been reached.
    Elapsed,
    /// The requested instant lies more than `MAX_DURATION` milliseconds past
    /// the wheel's elapsed time and cannot be represented.
    Invalid,
}
53 
54 impl<T> Wheel<T>
55 where
56     T: Stack,
57 {
58     /// Create a new timing wheel
new() -> Wheel<T>59     pub(crate) fn new() -> Wheel<T> {
60         let levels = (0..NUM_LEVELS).map(Level::new).collect();
61 
62         Wheel { elapsed: 0, levels }
63     }
64 
65     /// Return the number of milliseconds that have elapsed since the timing
66     /// wheel's creation.
elapsed(&self) -> u6467     pub(crate) fn elapsed(&self) -> u64 {
68         self.elapsed
69     }
70 
71     /// Insert an entry into the timing wheel.
72     ///
73     /// # Arguments
74     ///
75     /// * `when`: is the instant at which the entry should be fired. It is
76     ///           represented as the number of milliseconds since the creation
77     ///           of the timing wheel.
78     ///
79     /// * `item`: The item to insert into the wheel.
80     ///
81     /// * `store`: The slab or `()` when using heap storage.
82     ///
83     /// # Return
84     ///
85     /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
86     ///
87     /// `Err(Elapsed)` indicates that `when` represents an instant that has
88     /// already passed. In this case, the caller should fire the timeout
89     /// immediately.
90     ///
91     /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
insert( &mut self, when: u64, item: T::Owned, store: &mut T::Store, ) -> Result<(), (T::Owned, InsertError)>92     pub(crate) fn insert(
93         &mut self,
94         when: u64,
95         item: T::Owned,
96         store: &mut T::Store,
97     ) -> Result<(), (T::Owned, InsertError)> {
98         if when <= self.elapsed {
99             return Err((item, InsertError::Elapsed));
100         } else if when - self.elapsed > MAX_DURATION {
101             return Err((item, InsertError::Invalid));
102         }
103 
104         // Get the level at which the entry should be stored
105         let level = self.level_for(when);
106 
107         self.levels[level].add_entry(when, item, store);
108 
109         debug_assert!({
110             self.levels[level]
111                 .next_expiration(self.elapsed)
112                 .map(|e| e.deadline >= self.elapsed)
113                 .unwrap_or(true)
114         });
115 
116         Ok(())
117     }
118 
119     /// Remove `item` from the timing wheel.
120     #[track_caller]
remove(&mut self, item: &T::Borrowed, store: &mut T::Store)121     pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
122         let when = T::when(item, store);
123 
124         assert!(
125             self.elapsed <= when,
126             "elapsed={}; when={}",
127             self.elapsed,
128             when
129         );
130 
131         let level = self.level_for(when);
132 
133         self.levels[level].remove_entry(when, item, store);
134     }
135 
136     /// Instant at which to poll
poll_at(&self) -> Option<u64>137     pub(crate) fn poll_at(&self) -> Option<u64> {
138         self.next_expiration().map(|expiration| expiration.deadline)
139     }
140 
141     /// Next key that will expire
peek(&self) -> Option<T::Owned>142     pub(crate) fn peek(&self) -> Option<T::Owned> {
143         self.next_expiration()
144             .and_then(|expiration| self.peek_entry(&expiration))
145     }
146 
147     /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned>148     pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
149         loop {
150             let expiration = self.next_expiration().and_then(|expiration| {
151                 if expiration.deadline > now {
152                     None
153                 } else {
154                     Some(expiration)
155                 }
156             });
157 
158             match expiration {
159                 Some(ref expiration) => {
160                     if let Some(item) = self.poll_expiration(expiration, store) {
161                         return Some(item);
162                     }
163 
164                     self.set_elapsed(expiration.deadline);
165                 }
166                 None => {
167                     // in this case the poll did not indicate an expiration
168                     // _and_ we were not able to find a next expiration in
169                     // the current list of timers.  advance to the poll's
170                     // current time and do nothing else.
171                     self.set_elapsed(now);
172                     return None;
173                 }
174             }
175         }
176     }
177 
178     /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>179     fn next_expiration(&self) -> Option<Expiration> {
180         // Check all levels
181         for level in 0..NUM_LEVELS {
182             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
183                 // There cannot be any expirations at a higher level that happen
184                 // before this one.
185                 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
186 
187                 return Some(expiration);
188             }
189         }
190 
191         None
192     }
193 
194     /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool195     fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
196         let mut res = true;
197 
198         for l2 in start_level..NUM_LEVELS {
199             if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
200                 if e2.deadline < before {
201                     res = false;
202                 }
203             }
204         }
205 
206         res
207     }
208 
209     /// iteratively find entries that are between the wheel's current
210     /// time and the expiration time.  for each in that population either
211     /// return it for notification (in the case of the last level) or tier
212     /// it down to the next level (in all other cases).
poll_expiration( &mut self, expiration: &Expiration, store: &mut T::Store, ) -> Option<T::Owned>213     pub(crate) fn poll_expiration(
214         &mut self,
215         expiration: &Expiration,
216         store: &mut T::Store,
217     ) -> Option<T::Owned> {
218         while let Some(item) = self.pop_entry(expiration, store) {
219             if expiration.level == 0 {
220                 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
221 
222                 return Some(item);
223             } else {
224                 let when = T::when(item.borrow(), store);
225 
226                 let next_level = expiration.level - 1;
227 
228                 self.levels[next_level].add_entry(when, item, store);
229             }
230         }
231 
232         None
233     }
234 
set_elapsed(&mut self, when: u64)235     fn set_elapsed(&mut self, when: u64) {
236         assert!(
237             self.elapsed <= when,
238             "elapsed={:?}; when={:?}",
239             self.elapsed,
240             when
241         );
242 
243         if when > self.elapsed {
244             self.elapsed = when;
245         }
246     }
247 
pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned>248     fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
249         self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
250     }
251 
peek_entry(&self, expiration: &Expiration) -> Option<T::Owned>252     fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
253         self.levels[expiration.level].peek_entry_slot(expiration.slot)
254     }
255 
level_for(&self, when: u64) -> usize256     fn level_for(&self, when: u64) -> usize {
257         level_for(self.elapsed, when)
258     }
259 }
260 
level_for(elapsed: u64, when: u64) -> usize261 fn level_for(elapsed: u64, when: u64) -> usize {
262     const SLOT_MASK: u64 = (1 << 6) - 1;
263 
264     // Mask in the trailing bits ignored by the level calculation in order to cap
265     // the possible leading zeros
266     let mut masked = elapsed ^ when | SLOT_MASK;
267     if masked >= MAX_DURATION {
268         // Fudge the timer into the top level
269         masked = MAX_DURATION - 1;
270     }
271     let leading_zeros = masked.leading_zeros() as usize;
272     let significant = 63 - leading_zeros;
273     significant / 6
274 }
275 
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Checks `level_for` at the start of every slot of every level, as well
    /// as one millisecond either side of each slot boundary.
    #[test]
    fn test_level_for() {
        // Any deadline within the first 64 ms belongs to the finest level.
        for pos in 0..64 {
            assert_eq!(0, level_for(0, pos), "level_for({pos}) -- binary = {pos:b}");
        }

        // Cover every level above 0, including the top one. The arithmetic is
        // done in `u64` so the largest offsets cannot overflow a 32-bit
        // `usize`.
        for level in 1..NUM_LEVELS {
            let slot_span = 64_u64.pow(level as u32);

            for pos in 1..64_u64 {
                // First millisecond of slot `pos` at this level.
                let a = pos * slot_span;
                assert_eq!(level, level_for(0, a), "level_for({a}) -- binary = {a:b}");

                // The millisecond just before slot 1 still belongs to the
                // level below, so only check it from slot 2 onwards.
                if pos > 1 {
                    let a = a - 1;
                    assert_eq!(level, level_for(0, a), "level_for({a}) -- binary = {a:b}");
                }

                // One millisecond into the slot stays in the same level.
                let a = a + 1;
                assert_eq!(level, level_for(0, a), "level_for({a}) -- binary = {a:b}");
            }
        }

        // Offsets at or beyond the representable range are fudged into the
        // top level rather than producing an out-of-bounds level index.
        for a in [MAX_DURATION, u64::MAX] {
            assert_eq!(
                NUM_LEVELS - 1,
                level_for(0, a),
                "level_for({a}) -- binary = {a:b}"
            );
        }
    }
}
316