1 //! A retry "budget" for allowing only a certain amount of retries over time.
2 
3 use std::{
4     fmt,
5     sync::{
6         atomic::{AtomicIsize, Ordering},
7         Mutex,
8     },
9     time::Duration,
10 };
11 use tokio::time::Instant;
12 
13 /// Represents a "budget" for retrying requests.
14 ///
15 /// This is useful for limiting the amount of retries a service can perform
16 /// over a period of time, or per a certain number of requests attempted.
17 pub struct Budget {
18     bucket: Bucket,
19     deposit_amount: isize,
20     withdraw_amount: isize,
21 }
22 
23 /// Indicates that it is not currently allowed to "withdraw" another retry
24 /// from the [`Budget`].
25 #[derive(Debug)]
26 pub struct Overdrawn {
27     _inner: (),
28 }
29 
30 #[derive(Debug)]
31 struct Bucket {
32     generation: Mutex<Generation>,
33     /// Initial budget allowed for every second.
34     reserve: isize,
35     /// Slots of a the TTL divided evenly.
36     slots: Box<[AtomicIsize]>,
37     /// The amount of time represented by each slot.
38     window: Duration,
39     /// The changers for the current slot to be commited
40     /// after the slot expires.
41     writer: AtomicIsize,
42 }
43 
44 #[derive(Debug)]
45 struct Generation {
46     /// Slot index of the last generation.
47     index: usize,
48     /// The timestamp since the last generation expired.
49     time: Instant,
50 }
51 
52 // ===== impl Budget =====
53 
54 impl Budget {
55     /// Create a [`Budget`] that allows for a certain percent of the total
56     /// requests to be retried.
57     ///
58     /// - The `ttl` is the duration of how long a single `deposit` should be
59     ///   considered. Must be between 1 and 60 seconds.
60     /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate
61     ///   clients that have just started issuing requests, or clients that do
62     ///   not issue many requests per window.
63     /// - The `retry_percent` is the percentage of calls to `deposit` that can
64     ///   be retried. This is in addition to any retries allowed for via
65     ///   `min_per_sec`. Must be between 0 and 1000.
66     ///
67     ///   As an example, if `0.1` is used, then for every 10 calls to `deposit`,
68     ///   1 retry will be allowed. If `2.0` is used, then every `deposit`
69     ///   allows for 2 retries.
new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self70     pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
71         // assertions taken from finagle
72         assert!(ttl >= Duration::from_secs(1));
73         assert!(ttl <= Duration::from_secs(60));
74         assert!(retry_percent >= 0.0);
75         assert!(retry_percent <= 1000.0);
76         assert!(min_per_sec < ::std::i32::MAX as u32);
77 
78         let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
79             // If there is no percent, then you gain nothing from deposits.
80             // Withdrawals can only be made against the reserve, over time.
81             (0, 1)
82         } else if retry_percent <= 1.0 {
83             (1, (1.0 / retry_percent) as isize)
84         } else {
85             // Support for when retry_percent is between 1.0 and 1000.0,
86             // meaning for every deposit D, D*retry_percent withdrawals
87             // can be made.
88             (1000, (1000.0 / retry_percent) as isize)
89         };
90         let reserve = (min_per_sec as isize)
91             .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
92             .saturating_mul(withdraw_amount);
93 
94         // AtomicIsize isn't clone, so the slots need to be built in a loop...
95         let windows = 10u32;
96         let mut slots = Vec::with_capacity(windows as usize);
97         for _ in 0..windows {
98             slots.push(AtomicIsize::new(0));
99         }
100 
101         Budget {
102             bucket: Bucket {
103                 generation: Mutex::new(Generation {
104                     index: 0,
105                     time: Instant::now(),
106                 }),
107                 reserve,
108                 slots: slots.into_boxed_slice(),
109                 window: ttl / windows,
110                 writer: AtomicIsize::new(0),
111             },
112             deposit_amount,
113             withdraw_amount,
114         }
115     }
116 
117     /// Store a "deposit" in the budget, which will be used to permit future
118     /// withdrawals.
deposit(&self)119     pub fn deposit(&self) {
120         self.bucket.put(self.deposit_amount);
121     }
122 
123     /// Check whether there is enough "balance" in the budget to issue a new
124     /// retry.
125     ///
126     /// If there is not enough, an `Err(Overdrawn)` is returned.
withdraw(&self) -> Result<(), Overdrawn>127     pub fn withdraw(&self) -> Result<(), Overdrawn> {
128         if self.bucket.try_get(self.withdraw_amount) {
129             Ok(())
130         } else {
131             Err(Overdrawn { _inner: () })
132         }
133     }
134 }
135 
136 impl Default for Budget {
default() -> Budget137     fn default() -> Budget {
138         Budget::new(Duration::from_secs(10), 10, 0.2)
139     }
140 }
141 
142 impl fmt::Debug for Budget {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result143     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144         f.debug_struct("Budget")
145             .field("deposit", &self.deposit_amount)
146             .field("withdraw", &self.withdraw_amount)
147             .field("balance", &self.bucket.sum())
148             .finish()
149     }
150 }
151 
152 // ===== impl Bucket =====
153 
154 impl Bucket {
put(&self, amt: isize)155     fn put(&self, amt: isize) {
156         self.expire();
157         self.writer.fetch_add(amt, Ordering::SeqCst);
158     }
159 
try_get(&self, amt: isize) -> bool160     fn try_get(&self, amt: isize) -> bool {
161         debug_assert!(amt >= 0);
162 
163         self.expire();
164 
165         let sum = self.sum();
166         if sum >= amt {
167             self.writer.fetch_add(-amt, Ordering::SeqCst);
168             true
169         } else {
170             false
171         }
172     }
173 
expire(&self)174     fn expire(&self) {
175         let mut gen = self.generation.lock().expect("generation lock");
176 
177         let now = Instant::now();
178         let diff = now.saturating_duration_since(gen.time);
179         if diff < self.window {
180             // not expired yet
181             return;
182         }
183 
184         let to_commit = self.writer.swap(0, Ordering::SeqCst);
185         self.slots[gen.index].store(to_commit, Ordering::SeqCst);
186 
187         let mut diff = diff;
188         let mut idx = (gen.index + 1) % self.slots.len();
189         while diff > self.window {
190             self.slots[idx].store(0, Ordering::SeqCst);
191             diff -= self.window;
192             idx = (idx + 1) % self.slots.len();
193         }
194 
195         gen.index = idx;
196         gen.time = now;
197     }
198 
sum(&self) -> isize199     fn sum(&self) -> isize {
200         let current = self.writer.load(Ordering::SeqCst);
201         let windowed_sum: isize = self
202             .slots
203             .iter()
204             .map(|slot| slot.load(Ordering::SeqCst))
205             // fold() is used instead of sum() to determine overflow behavior
206             .fold(0, isize::saturating_add);
207 
208         current
209             .saturating_add(windowed_sum)
210             .saturating_add(self.reserve)
211     }
212 }
213 
214 #[cfg(test)]
215 mod tests {
216     use super::*;
217     use tokio::time;
218 
219     #[test]
empty()220     fn empty() {
221         let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
222         bgt.withdraw().unwrap_err();
223     }
224 
225     #[tokio::test]
leaky()226     async fn leaky() {
227         time::pause();
228 
229         let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
230         bgt.deposit();
231 
232         time::advance(Duration::from_secs(3)).await;
233 
234         bgt.withdraw().unwrap_err();
235     }
236 
237     #[tokio::test]
slots()238     async fn slots() {
239         time::pause();
240 
241         let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
242         bgt.deposit();
243         bgt.deposit();
244         time::advance(Duration::from_millis(901)).await;
245         // 900ms later, the deposit should still be valid
246         bgt.withdraw().unwrap();
247 
248         // blank slate
249         time::advance(Duration::from_millis(2001)).await;
250 
251         bgt.deposit();
252         time::advance(Duration::from_millis(301)).await;
253         bgt.deposit();
254         time::advance(Duration::from_millis(801)).await;
255         bgt.deposit();
256 
257         // the first deposit is expired, but the 2nd should still be valid,
258         // combining with the 3rd
259         bgt.withdraw().unwrap();
260     }
261 
262     #[tokio::test]
reserve()263     async fn reserve() {
264         let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
265         bgt.withdraw().unwrap();
266         bgt.withdraw().unwrap();
267         bgt.withdraw().unwrap();
268         bgt.withdraw().unwrap();
269         bgt.withdraw().unwrap();
270 
271         bgt.withdraw().unwrap_err();
272     }
273 }
274