1 //! A retry "budget" for allowing only a certain amount of retries over time. 2 3 use std::{ 4 fmt, 5 sync::{ 6 atomic::{AtomicIsize, Ordering}, 7 Mutex, 8 }, 9 time::Duration, 10 }; 11 use tokio::time::Instant; 12 13 /// Represents a "budget" for retrying requests. 14 /// 15 /// This is useful for limiting the amount of retries a service can perform 16 /// over a period of time, or per a certain number of requests attempted. 17 pub struct Budget { 18 bucket: Bucket, 19 deposit_amount: isize, 20 withdraw_amount: isize, 21 } 22 23 /// Indicates that it is not currently allowed to "withdraw" another retry 24 /// from the [`Budget`]. 25 #[derive(Debug)] 26 pub struct Overdrawn { 27 _inner: (), 28 } 29 30 #[derive(Debug)] 31 struct Bucket { 32 generation: Mutex<Generation>, 33 /// Initial budget allowed for every second. 34 reserve: isize, 35 /// Slots of a the TTL divided evenly. 36 slots: Box<[AtomicIsize]>, 37 /// The amount of time represented by each slot. 38 window: Duration, 39 /// The changers for the current slot to be commited 40 /// after the slot expires. 41 writer: AtomicIsize, 42 } 43 44 #[derive(Debug)] 45 struct Generation { 46 /// Slot index of the last generation. 47 index: usize, 48 /// The timestamp since the last generation expired. 49 time: Instant, 50 } 51 52 // ===== impl Budget ===== 53 54 impl Budget { 55 /// Create a [`Budget`] that allows for a certain percent of the total 56 /// requests to be retried. 57 /// 58 /// - The `ttl` is the duration of how long a single `deposit` should be 59 /// considered. Must be between 1 and 60 seconds. 60 /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate 61 /// clients that have just started issuing requests, or clients that do 62 /// not issue many requests per window. 63 /// - The `retry_percent` is the percentage of calls to `deposit` that can 64 /// be retried. This is in addition to any retries allowed for via 65 /// `min_per_sec`. Must be between 0 and 1000. 66 /// 67 /// As an example, if `0.1` is used, then for every 10 calls to `deposit`, 68 /// 1 retry will be allowed. If `2.0` is used, then every `deposit` 69 /// allows for 2 retries. new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self70 pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self { 71 // assertions taken from finagle 72 assert!(ttl >= Duration::from_secs(1)); 73 assert!(ttl <= Duration::from_secs(60)); 74 assert!(retry_percent >= 0.0); 75 assert!(retry_percent <= 1000.0); 76 assert!(min_per_sec < ::std::i32::MAX as u32); 77 78 let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 { 79 // If there is no percent, then you gain nothing from deposits. 80 // Withdrawals can only be made against the reserve, over time. 81 (0, 1) 82 } else if retry_percent <= 1.0 { 83 (1, (1.0 / retry_percent) as isize) 84 } else { 85 // Support for when retry_percent is between 1.0 and 1000.0, 86 // meaning for every deposit D, D*retry_percent withdrawals 87 // can be made. 88 (1000, (1000.0 / retry_percent) as isize) 89 }; 90 let reserve = (min_per_sec as isize) 91 .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds 92 .saturating_mul(withdraw_amount); 93 94 // AtomicIsize isn't clone, so the slots need to be built in a loop... 95 let windows = 10u32; 96 let mut slots = Vec::with_capacity(windows as usize); 97 for _ in 0..windows { 98 slots.push(AtomicIsize::new(0)); 99 } 100 101 Budget { 102 bucket: Bucket { 103 generation: Mutex::new(Generation { 104 index: 0, 105 time: Instant::now(), 106 }), 107 reserve, 108 slots: slots.into_boxed_slice(), 109 window: ttl / windows, 110 writer: AtomicIsize::new(0), 111 }, 112 deposit_amount, 113 withdraw_amount, 114 } 115 } 116 117 /// Store a "deposit" in the budget, which will be used to permit future 118 /// withdrawals. deposit(&self)119 pub fn deposit(&self) { 120 self.bucket.put(self.deposit_amount); 121 } 122 123 /// Check whether there is enough "balance" in the budget to issue a new 124 /// retry. 125 /// 126 /// If there is not enough, an `Err(Overdrawn)` is returned. withdraw(&self) -> Result<(), Overdrawn>127 pub fn withdraw(&self) -> Result<(), Overdrawn> { 128 if self.bucket.try_get(self.withdraw_amount) { 129 Ok(()) 130 } else { 131 Err(Overdrawn { _inner: () }) 132 } 133 } 134 } 135 136 impl Default for Budget { default() -> Budget137 fn default() -> Budget { 138 Budget::new(Duration::from_secs(10), 10, 0.2) 139 } 140 } 141 142 impl fmt::Debug for Budget { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result143 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 144 f.debug_struct("Budget") 145 .field("deposit", &self.deposit_amount) 146 .field("withdraw", &self.withdraw_amount) 147 .field("balance", &self.bucket.sum()) 148 .finish() 149 } 150 } 151 152 // ===== impl Bucket ===== 153 154 impl Bucket { put(&self, amt: isize)155 fn put(&self, amt: isize) { 156 self.expire(); 157 self.writer.fetch_add(amt, Ordering::SeqCst); 158 } 159 try_get(&self, amt: isize) -> bool160 fn try_get(&self, amt: isize) -> bool { 161 debug_assert!(amt >= 0); 162 163 self.expire(); 164 165 let sum = self.sum(); 166 if sum >= amt { 167 self.writer.fetch_add(-amt, Ordering::SeqCst); 168 true 169 } else { 170 false 171 } 172 } 173 expire(&self)174 fn expire(&self) { 175 let mut gen = self.generation.lock().expect("generation lock"); 176 177 let now = Instant::now(); 178 let diff = now.saturating_duration_since(gen.time); 179 if diff < self.window { 180 // not expired yet 181 return; 182 } 183 184 let to_commit = self.writer.swap(0, Ordering::SeqCst); 185 self.slots[gen.index].store(to_commit, Ordering::SeqCst); 186 187 let mut diff = diff; 188 let mut idx = (gen.index + 1) % self.slots.len(); 189 while diff > self.window { 190 self.slots[idx].store(0, Ordering::SeqCst); 191 diff -= self.window; 192 idx = (idx + 1) % self.slots.len(); 193 } 194 195 gen.index = idx; 196 gen.time = now; 197 } 198 sum(&self) -> isize199 fn sum(&self) -> isize { 200 let current = self.writer.load(Ordering::SeqCst); 201 let windowed_sum: isize = self 202 .slots 203 .iter() 204 .map(|slot| slot.load(Ordering::SeqCst)) 205 // fold() is used instead of sum() to determine overflow behavior 206 .fold(0, isize::saturating_add); 207 208 current 209 .saturating_add(windowed_sum) 210 .saturating_add(self.reserve) 211 } 212 } 213 214 #[cfg(test)] 215 mod tests { 216 use super::*; 217 use tokio::time; 218 219 #[test] empty()220 fn empty() { 221 let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); 222 bgt.withdraw().unwrap_err(); 223 } 224 225 #[tokio::test] leaky()226 async fn leaky() { 227 time::pause(); 228 229 let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); 230 bgt.deposit(); 231 232 time::advance(Duration::from_secs(3)).await; 233 234 bgt.withdraw().unwrap_err(); 235 } 236 237 #[tokio::test] slots()238 async fn slots() { 239 time::pause(); 240 241 let bgt = Budget::new(Duration::from_secs(1), 0, 0.5); 242 bgt.deposit(); 243 bgt.deposit(); 244 time::advance(Duration::from_millis(901)).await; 245 // 900ms later, the deposit should still be valid 246 bgt.withdraw().unwrap(); 247 248 // blank slate 249 time::advance(Duration::from_millis(2001)).await; 250 251 bgt.deposit(); 252 time::advance(Duration::from_millis(301)).await; 253 bgt.deposit(); 254 time::advance(Duration::from_millis(801)).await; 255 bgt.deposit(); 256 257 // the first deposit is expired, but the 2nd should still be valid, 258 // combining with the 3rd 259 bgt.withdraw().unwrap(); 260 } 261 262 #[tokio::test] reserve()263 async fn reserve() { 264 let bgt = Budget::new(Duration::from_secs(1), 5, 1.0); 265 bgt.withdraw().unwrap(); 266 bgt.withdraw().unwrap(); 267 bgt.withdraw().unwrap(); 268 bgt.withdraw().unwrap(); 269 bgt.withdraw().unwrap(); 270 271 bgt.withdraw().unwrap_err(); 272 } 273 } 274