// Copyright (C) 2022, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! Pacer provides the timestamp for the next packet to be sent based on the
//! current send_quantum, pacing rate and last updated time.
//!
//! It's a kind of leaky bucket algorithm (RFC9002, 7.7 Pacing) but it considers
//! max burst (send_quantum, in bytes) and provides the same timestamp for the
//! same sized packets (except the last one) to be GSO friendly, assuming we
//! send packets using multiple sendmsg(), a sendmmsg(), or sendmsg() with GSO
//! without waiting for new I/O events.
//!
//! After sending a burst of packets, the next timestamp will be updated based
//! on the current pacing rate. This keeps the actual send timestamp and the
//! recorded timestamp (Sent.time_sent) as close as possible. If GSO is not
//! used, it will still try to provide a close timestamp if the send burst is
//! implemented.

use std::time::Duration;
use std::time::Instant;

#[derive(Debug)]
pub struct Pacer {
    /// Whether pacing is enabled.
    enabled: bool,

    /// Bucket capacity (bytes).
    capacity: usize,

    /// Bucket used (bytes).
    used: usize,

    /// Sending pacing rate (bytes/sec).
    rate: u64,

    /// Timestamp of the last packet sent time update.
    last_update: Instant,

    /// Timestamp of the next packet to be sent.
    next_time: Instant,

    /// Current MSS.
    max_datagram_size: usize,

    /// Last packet size.
    last_packet_size: Option<usize>,

    /// Interval to be added in next burst.
    iv: Duration,
}

impl Pacer {
    /// Creates a pacer with an empty bucket.
    ///
    /// `capacity` (bytes) is rounded down to a whole multiple of
    /// `max_datagram_size`; `rate` is in bytes/sec. If `max_datagram_size` is
    /// zero the capacity is kept as given instead of dividing by zero.
    pub fn new(
        enabled: bool, capacity: usize, rate: u64, max_datagram_size: usize,
    ) -> Self {
        // One clock reading so both timestamps start from the same instant.
        let now = Instant::now();

        Pacer {
            enabled,

            capacity: Self::round_to_mss(capacity, max_datagram_size),

            used: 0,

            rate,

            last_update: now,

            next_time: now,

            max_datagram_size,

            last_packet_size: None,

            iv: Duration::ZERO,
        }
    }

    /// Rounds `capacity` down to a multiple of `max_datagram_size`, leaving it
    /// untouched when `max_datagram_size` is zero.
    fn round_to_mss(capacity: usize, max_datagram_size: usize) -> usize {
        match max_datagram_size {
            0 => capacity,
            mss => capacity / mss * mss,
        }
    }

    /// Returns whether pacing is enabled.
    pub fn enabled(&self) -> bool {
        self.enabled
    }

    /// Returns the current pacing rate.
    pub fn rate(&self) -> u64 {
        self.rate
    }

    /// Updates the bucket capacity or pacing_rate.
    ///
    /// `capacity` is rounded down to a multiple of the datagram size given at
    /// construction. When the rounded capacity differs from the current one,
    /// the pacer is reset at `now` before the new values are stored.
    pub fn update(&mut self, capacity: usize, rate: u64, now: Instant) {
        let capacity = Self::round_to_mss(capacity, self.max_datagram_size);

        if self.capacity != capacity {
            self.reset(now);
        }

        self.capacity = capacity;

        self.rate = rate;
    }

    /// Resets the pacer for the next burst.
    ///
    /// Clears the used bytes, the remembered packet size and the pending
    /// interval, and records `now` as the last update. `next_time` is only
    /// ever moved forward: it becomes the later of its current value and
    /// `now`.
    pub fn reset(&mut self, now: Instant) {
        self.used = 0;

        self.last_update = now;

        self.next_time = self.next_time.max(now);

        self.last_packet_size = None;

        self.iv = Duration::ZERO;
    }

    /// Updates the timestamp for the packet to send.
    ///
    /// Accounts for `packet_size` bytes sent at `now`. A burst ends once the
    /// used bytes reach the capacity or the packet size differs from the
    /// previous packet's; the time the burst takes at the current rate is then
    /// stored and added to `next_time` on the following call.
    pub fn send(&mut self, packet_size: usize, now: Instant) {
        // A zero rate cannot be turned into an interval; just reset.
        if self.rate == 0 {
            self.reset(now);

            return;
        }

        // Apply the interval left pending by the previous burst.
        if !self.iv.is_zero() {
            self.next_time = self.next_time.max(now) + self.iv;

            self.iv = Duration::ZERO;
        }

        // Time needed to drain a full bucket at the current rate.
        let interval =
            Duration::from_secs_f64(self.capacity as f64 / self.rate as f64);

        let elapsed = now.saturating_duration_since(self.last_update);

        // If too old, reset it.
        if elapsed > interval {
            self.reset(now);
        }

        self.used += packet_size;

        // The first packet of a burst has nothing to differ from.
        let same_size = self
            .last_packet_size
            .map_or(true, |last_packet_size| last_packet_size == packet_size);

        self.last_packet_size = Some(packet_size);

        if self.used >= self.capacity || !same_size {
            // Burst finished: remember how long it takes at the current rate.
            self.iv =
                Duration::from_secs_f64(self.used as f64 / self.rate as f64);

            self.used = 0;

            self.last_update = now;

            self.last_packet_size = None;
        }
    }

    /// Returns the timestamp for the next packet.
    pub fn next_time(&self) -> Instant {
        self.next_time
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pacer_update() {
        let datagram_size = 1200;
        let max_burst = datagram_size * 10;
        let pacing_rate = 100_000;

        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size);

        let now = Instant::now();

        // Send 6000 (half of max_burst) -> no timestamp change yet.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Send 6000 bytes -> max_burst filled.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Start of a new burst.
        let now = now + Duration::from_millis(5);

        // Send 1000 bytes and next_time is updated.
        p.send(1000, now);

        let interval = max_burst as f64 / pacing_rate as f64;

        assert_eq!(p.next_time() - now, Duration::from_secs_f64(interval));
    }

    /// Same as pacer_update() but adds some idle time between transfers to
    /// trigger a reset.
    #[test]
    fn pacer_idle() {
        let datagram_size = 1200;
        let max_burst = datagram_size * 10;
        let pacing_rate = 100_000;

        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size);

        let now = Instant::now();

        // Send 6000 (half of max_burst) -> no timestamp change yet.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Sleep 200ms to reset the idle pacer (at least 120ms).
        let now = now + Duration::from_millis(200);

        // Send 6000 bytes -> idle reset and a new burst is started.
        p.send(6000, now);

        assert_eq!(p.next_time(), now);
    }

    /// A zero datagram size must not cause a division by zero.
    #[test]
    fn pacer_zero_datagram_size() {
        let pacing_rate = 100_000;

        let mut p = Pacer::new(true, 12_000, pacing_rate, 0);

        let now = Instant::now();

        p.update(6000, pacing_rate, now);

        // Send 6000 bytes -> the (unrounded) capacity is filled.
        p.send(6000, now);

        let now = now + Duration::from_millis(5);

        // Next burst picks up the interval of the previous one.
        p.send(1000, now);

        let interval = 6000_f64 / pacing_rate as f64;

        assert_eq!(p.next_time() - now, Duration::from_secs_f64(interval));
    }
}