//! A retry "budget" for allowing only a certain amount of retries over time. use std::{ fmt, sync::{ atomic::{AtomicIsize, Ordering}, Mutex, }, time::Duration, }; use tokio::time::Instant; /// Represents a "budget" for retrying requests. /// /// This is useful for limiting the amount of retries a service can perform /// over a period of time, or per a certain number of requests attempted. pub struct Budget { bucket: Bucket, deposit_amount: isize, withdraw_amount: isize, } /// Indicates that it is not currently allowed to "withdraw" another retry /// from the [`Budget`]. #[derive(Debug)] pub struct Overdrawn { _inner: (), } #[derive(Debug)] struct Bucket { generation: Mutex, /// Initial budget allowed for every second. reserve: isize, /// Slots of a the TTL divided evenly. slots: Box<[AtomicIsize]>, /// The amount of time represented by each slot. window: Duration, /// The changers for the current slot to be commited /// after the slot expires. writer: AtomicIsize, } #[derive(Debug)] struct Generation { /// Slot index of the last generation. index: usize, /// The timestamp since the last generation expired. time: Instant, } // ===== impl Budget ===== impl Budget { /// Create a [`Budget`] that allows for a certain percent of the total /// requests to be retried. /// /// - The `ttl` is the duration of how long a single `deposit` should be /// considered. Must be between 1 and 60 seconds. /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate /// clients that have just started issuing requests, or clients that do /// not issue many requests per window. /// - The `retry_percent` is the percentage of calls to `deposit` that can /// be retried. This is in addition to any retries allowed for via /// `min_per_sec`. Must be between 0 and 1000. /// /// As an example, if `0.1` is used, then for every 10 calls to `deposit`, /// 1 retry will be allowed. If `2.0` is used, then every `deposit` /// allows for 2 retries. pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self { // assertions taken from finagle assert!(ttl >= Duration::from_secs(1)); assert!(ttl <= Duration::from_secs(60)); assert!(retry_percent >= 0.0); assert!(retry_percent <= 1000.0); assert!(min_per_sec < ::std::i32::MAX as u32); let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 { // If there is no percent, then you gain nothing from deposits. // Withdrawals can only be made against the reserve, over time. (0, 1) } else if retry_percent <= 1.0 { (1, (1.0 / retry_percent) as isize) } else { // Support for when retry_percent is between 1.0 and 1000.0, // meaning for every deposit D, D*retry_percent withdrawals // can be made. (1000, (1000.0 / retry_percent) as isize) }; let reserve = (min_per_sec as isize) .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds .saturating_mul(withdraw_amount); // AtomicIsize isn't clone, so the slots need to be built in a loop... let windows = 10u32; let mut slots = Vec::with_capacity(windows as usize); for _ in 0..windows { slots.push(AtomicIsize::new(0)); } Budget { bucket: Bucket { generation: Mutex::new(Generation { index: 0, time: Instant::now(), }), reserve, slots: slots.into_boxed_slice(), window: ttl / windows, writer: AtomicIsize::new(0), }, deposit_amount, withdraw_amount, } } /// Store a "deposit" in the budget, which will be used to permit future /// withdrawals. pub fn deposit(&self) { self.bucket.put(self.deposit_amount); } /// Check whether there is enough "balance" in the budget to issue a new /// retry. /// /// If there is not enough, an `Err(Overdrawn)` is returned. pub fn withdraw(&self) -> Result<(), Overdrawn> { if self.bucket.try_get(self.withdraw_amount) { Ok(()) } else { Err(Overdrawn { _inner: () }) } } } impl Default for Budget { fn default() -> Budget { Budget::new(Duration::from_secs(10), 10, 0.2) } } impl fmt::Debug for Budget { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Budget") .field("deposit", &self.deposit_amount) .field("withdraw", &self.withdraw_amount) .field("balance", &self.bucket.sum()) .finish() } } // ===== impl Bucket ===== impl Bucket { fn put(&self, amt: isize) { self.expire(); self.writer.fetch_add(amt, Ordering::SeqCst); } fn try_get(&self, amt: isize) -> bool { debug_assert!(amt >= 0); self.expire(); let sum = self.sum(); if sum >= amt { self.writer.fetch_add(-amt, Ordering::SeqCst); true } else { false } } fn expire(&self) { let mut gen = self.generation.lock().expect("generation lock"); let now = Instant::now(); let diff = now.saturating_duration_since(gen.time); if diff < self.window { // not expired yet return; } let to_commit = self.writer.swap(0, Ordering::SeqCst); self.slots[gen.index].store(to_commit, Ordering::SeqCst); let mut diff = diff; let mut idx = (gen.index + 1) % self.slots.len(); while diff > self.window { self.slots[idx].store(0, Ordering::SeqCst); diff -= self.window; idx = (idx + 1) % self.slots.len(); } gen.index = idx; gen.time = now; } fn sum(&self) -> isize { let current = self.writer.load(Ordering::SeqCst); let windowed_sum: isize = self .slots .iter() .map(|slot| slot.load(Ordering::SeqCst)) // fold() is used instead of sum() to determine overflow behavior .fold(0, isize::saturating_add); current .saturating_add(windowed_sum) .saturating_add(self.reserve) } } #[cfg(test)] mod tests { use super::*; use tokio::time; #[test] fn empty() { let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); bgt.withdraw().unwrap_err(); } #[tokio::test] async fn leaky() { time::pause(); let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); bgt.deposit(); time::advance(Duration::from_secs(3)).await; bgt.withdraw().unwrap_err(); } #[tokio::test] async fn slots() { time::pause(); let bgt = Budget::new(Duration::from_secs(1), 0, 0.5); bgt.deposit(); bgt.deposit(); time::advance(Duration::from_millis(901)).await; // 900ms later, the deposit should still be valid bgt.withdraw().unwrap(); // blank slate time::advance(Duration::from_millis(2001)).await; bgt.deposit(); time::advance(Duration::from_millis(301)).await; bgt.deposit(); time::advance(Duration::from_millis(801)).await; bgt.deposit(); // the first deposit is expired, but the 2nd should still be valid, // combining with the 3rd bgt.withdraw().unwrap(); } #[tokio::test] async fn reserve() { let bgt = Budget::new(Duration::from_secs(1), 5, 1.0); bgt.withdraw().unwrap(); bgt.withdraw().unwrap(); bgt.withdraw().unwrap(); bgt.withdraw().unwrap(); bgt.withdraw().unwrap(); bgt.withdraw().unwrap_err(); } }