1 // Copyright (C) 2022, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 //! Pacer provides the timestamp for the next packet to be sent based on the
28 //! current send_quantum, pacing rate and last updated time.
29 //!
30 //! It's a kind of leaky bucket algorithm (RFC9002, 7.7 Pacing) but it considers
31 //! max burst (send_quantum, in bytes) and provide the same timestamp for the
32 //! same sized packets (except last one) to be GSO friendly, assuming we send
33 //! packets using multiple sendmsg(), a sendmmsg(), or sendmsg() with GSO
34 //! without waiting for new I/O events.
35 //!
36 //! After sending a burst of packets, the next timestamp will be updated based
37 //! on the current pacing rate. It will make actual timestamp sent and recorded
38 //! timestamp (Sent.time_sent) as close as possible. If GSO is not used, it will
39 //! still try to provide close timestamp if the send burst is implemented.
40 
41 use std::time::Duration;
42 use std::time::Instant;
43 
44 #[derive(Debug)]
45 pub struct Pacer {
46     /// Whether pacing is enabled.
47     enabled: bool,
48 
49     /// Bucket capacity (bytes).
50     capacity: usize,
51 
52     /// Bucket used (bytes).
53     used: usize,
54 
55     /// Sending pacing rate (bytes/sec).
56     rate: u64,
57 
58     /// Timestamp of the last packet sent time update.
59     last_update: Instant,
60 
61     /// Timestamp of the next packet to be sent.
62     next_time: Instant,
63 
64     /// Current MSS.
65     max_datagram_size: usize,
66 
67     /// Last packet size.
68     last_packet_size: Option<usize>,
69 
70     /// Interval to be added in next burst.
71     iv: Duration,
72 }
73 
74 impl Pacer {
new( enabled: bool, capacity: usize, rate: u64, max_datagram_size: usize, ) -> Self75     pub fn new(
76         enabled: bool, capacity: usize, rate: u64, max_datagram_size: usize,
77     ) -> Self {
78         // Round capacity to MSS.
79         let capacity = capacity / max_datagram_size * max_datagram_size;
80 
81         Pacer {
82             enabled,
83 
84             capacity,
85 
86             used: 0,
87 
88             rate,
89 
90             last_update: Instant::now(),
91 
92             next_time: Instant::now(),
93 
94             max_datagram_size,
95 
96             last_packet_size: None,
97 
98             iv: Duration::ZERO,
99         }
100     }
101 
102     /// Returns whether pacing is enabled.
enabled(&self) -> bool103     pub fn enabled(&self) -> bool {
104         self.enabled
105     }
106 
107     /// Returns the current pacing rate.
rate(&self) -> u64108     pub fn rate(&self) -> u64 {
109         self.rate
110     }
111 
112     /// Updates the bucket capacity or pacing_rate.
update(&mut self, capacity: usize, rate: u64, now: Instant)113     pub fn update(&mut self, capacity: usize, rate: u64, now: Instant) {
114         let capacity = capacity / self.max_datagram_size * self.max_datagram_size;
115 
116         if self.capacity != capacity {
117             self.reset(now);
118         }
119 
120         self.capacity = capacity;
121 
122         self.rate = rate;
123     }
124 
125     /// Resets the pacer for the next burst.
reset(&mut self, now: Instant)126     pub fn reset(&mut self, now: Instant) {
127         self.used = 0;
128 
129         self.last_update = now;
130 
131         self.next_time = self.next_time.max(now);
132 
133         self.last_packet_size = None;
134 
135         self.iv = Duration::ZERO;
136     }
137 
138     /// Updates the timestamp for the packet to send.
send(&mut self, packet_size: usize, now: Instant)139     pub fn send(&mut self, packet_size: usize, now: Instant) {
140         if self.rate == 0 {
141             self.reset(now);
142 
143             return;
144         }
145 
146         if !self.iv.is_zero() {
147             self.next_time = self.next_time.max(now) + self.iv;
148 
149             self.iv = Duration::ZERO;
150         }
151 
152         let interval =
153             Duration::from_secs_f64(self.capacity as f64 / self.rate as f64);
154 
155         let elapsed = now.saturating_duration_since(self.last_update);
156 
157         // If too old, reset it.
158         if elapsed > interval {
159             self.reset(now);
160         }
161 
162         self.used += packet_size;
163 
164         let same_size = if let Some(last_packet_size) = self.last_packet_size {
165             last_packet_size == packet_size
166         } else {
167             true
168         };
169 
170         self.last_packet_size = Some(packet_size);
171 
172         if self.used >= self.capacity || !same_size {
173             self.iv =
174                 Duration::from_secs_f64(self.used as f64 / self.rate as f64);
175 
176             self.used = 0;
177 
178             self.last_update = now;
179 
180             self.last_packet_size = None;
181         };
182     }
183 
184     /// Returns the timestamp for the next packet.
next_time(&self) -> Instant185     pub fn next_time(&self) -> Instant {
186         self.next_time
187     }
188 }
189 
190 #[cfg(test)]
191 mod tests {
192     use super::*;
193 
194     #[test]
pacer_update()195     fn pacer_update() {
196         let datagram_size = 1200;
197         let max_burst = datagram_size * 10;
198         let pacing_rate = 100_000;
199 
200         let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size);
201 
202         let now = Instant::now();
203 
204         // Send 6000 (half of max_burst) -> no timestamp change yet.
205         p.send(6000, now);
206 
207         assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
208 
209         // Send 6000 bytes -> max_burst filled.
210         p.send(6000, now);
211 
212         assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
213 
214         // Start of a new burst.
215         let now = now + Duration::from_millis(5);
216 
217         // Send 1000 bytes and next_time is updated.
218         p.send(1000, now);
219 
220         let interval = max_burst as f64 / pacing_rate as f64;
221 
222         assert_eq!(p.next_time() - now, Duration::from_secs_f64(interval));
223     }
224 
225     #[test]
226     /// Same as pacer_update() but adds some idle time between transfers to
227     /// trigger a reset.
pacer_idle()228     fn pacer_idle() {
229         let datagram_size = 1200;
230         let max_burst = datagram_size * 10;
231         let pacing_rate = 100_000;
232 
233         let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size);
234 
235         let now = Instant::now();
236 
237         // Send 6000 (half of max_burst) -> no timestamp change yet.
238         p.send(6000, now);
239 
240         assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));
241 
242         // Sleep 200ms to reset the idle pacer (at least 120ms).
243         let now = now + Duration::from_millis(200);
244 
245         // Send 6000 bytes -> idle reset and a new burst  isstarted.
246         p.send(6000, now);
247 
248         assert_eq!(p.next_time(), now);
249     }
250 }
251