1 // Copyright (C) 2020-2022, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 //! Delivery rate estimation.
28 //!
29 //! This implements the algorithm for estimating delivery rate as described in
30 //! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-01>
31 
32 use std::time::Duration;
33 use std::time::Instant;
34 
35 use crate::recovery::Acked;
36 use crate::recovery::Sent;
37 
/// Delivery rate estimator state.
///
/// Holds the running delivery counters that are snapshotted onto every sent
/// packet, the app-limited marker, and the most recent rate sample.
#[derive(Debug)]
pub struct Rate {
    // Running total of the sizes of all packets acked so far.
    delivered: usize,

    // Time of the most recent ack; reset to a packet's send time when that
    // packet is sent with nothing in flight.
    delivered_time: Instant,

    // Send time of the packet that started the current sampling window: the
    // packet sent with nothing in flight, or the acked packet that last
    // became the sample's reference.
    first_sent_time: Instant,

    // Packet number of the last packet sent when the app-limited phase was
    // marked (stored as at least 1). 0 means "not app limited".
    end_of_app_limited: u64,

    // Packet number of the last sent packet.
    last_sent_packet: u64,

    // Packet number of the largest acked packet.
    largest_acked: u64,

    // Sample of rate estimation.
    rate_sample: RateSample,
}
58 
59 impl Default for Rate {
default() -> Self60     fn default() -> Self {
61         let now = Instant::now();
62 
63         Rate {
64             delivered: 0,
65 
66             delivered_time: now,
67 
68             first_sent_time: now,
69 
70             end_of_app_limited: 0,
71 
72             last_sent_packet: 0,
73 
74             largest_acked: 0,
75 
76             rate_sample: RateSample::default(),
77         }
78     }
79 }
80 
81 impl Rate {
on_packet_sent(&mut self, pkt: &mut Sent, bytes_in_flight: usize)82     pub fn on_packet_sent(&mut self, pkt: &mut Sent, bytes_in_flight: usize) {
83         // No packets in flight.
84         if bytes_in_flight == 0 {
85             self.first_sent_time = pkt.time_sent;
86             self.delivered_time = pkt.time_sent;
87         }
88 
89         pkt.first_sent_time = self.first_sent_time;
90         pkt.delivered_time = self.delivered_time;
91         pkt.delivered = self.delivered;
92         pkt.is_app_limited = self.app_limited();
93 
94         self.last_sent_packet = pkt.pkt_num;
95     }
96 
97     // Update the delivery rate sample when a packet is acked.
update_rate_sample(&mut self, pkt: &Acked, now: Instant)98     pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) {
99         self.delivered += pkt.size;
100         self.delivered_time = now;
101 
102         // Update info using the newest packet. If rate_sample is not yet
103         // initialized, initialize with the first packet.
104         if self.rate_sample.prior_time.is_none() ||
105             pkt.delivered > self.rate_sample.prior_delivered
106         {
107             self.rate_sample.prior_delivered = pkt.delivered;
108             self.rate_sample.prior_time = Some(pkt.delivered_time);
109             self.rate_sample.is_app_limited = pkt.is_app_limited;
110             self.rate_sample.send_elapsed =
111                 pkt.time_sent.saturating_duration_since(pkt.first_sent_time);
112             self.rate_sample.rtt = pkt.rtt;
113             self.rate_sample.ack_elapsed = self
114                 .delivered_time
115                 .saturating_duration_since(pkt.delivered_time);
116 
117             self.first_sent_time = pkt.time_sent;
118         }
119 
120         self.largest_acked = self.largest_acked.max(pkt.pkt_num);
121     }
122 
generate_rate_sample(&mut self, min_rtt: Duration)123     pub fn generate_rate_sample(&mut self, min_rtt: Duration) {
124         // End app-limited phase if bubble is ACKed and gone.
125         if self.app_limited() && self.largest_acked > self.end_of_app_limited {
126             self.update_app_limited(false);
127         }
128 
129         if self.rate_sample.prior_time.is_some() {
130             let interval = self
131                 .rate_sample
132                 .send_elapsed
133                 .max(self.rate_sample.ack_elapsed);
134 
135             self.rate_sample.delivered =
136                 self.delivered - self.rate_sample.prior_delivered;
137             self.rate_sample.interval = interval;
138 
139             if interval < min_rtt {
140                 self.rate_sample.interval = Duration::ZERO;
141 
142                 // No reliable sample.
143                 return;
144             }
145 
146             if !interval.is_zero() {
147                 // Fill in rate_sample with a rate sample.
148                 self.rate_sample.delivery_rate =
149                     (self.rate_sample.delivered as f64 / interval.as_secs_f64())
150                         as u64;
151             }
152         }
153     }
154 
update_app_limited(&mut self, v: bool)155     pub fn update_app_limited(&mut self, v: bool) {
156         self.end_of_app_limited = if v { self.last_sent_packet.max(1) } else { 0 }
157     }
158 
app_limited(&mut self) -> bool159     pub fn app_limited(&mut self) -> bool {
160         self.end_of_app_limited != 0
161     }
162 
delivered(&self) -> usize163     pub fn delivered(&self) -> usize {
164         self.delivered
165     }
166 
sample_delivery_rate(&self) -> u64167     pub fn sample_delivery_rate(&self) -> u64 {
168         self.rate_sample.delivery_rate
169     }
170 
sample_rtt(&self) -> Duration171     pub fn sample_rtt(&self) -> Duration {
172         self.rate_sample.rtt
173     }
174 
sample_is_app_limited(&self) -> bool175     pub fn sample_is_app_limited(&self) -> bool {
176         self.rate_sample.is_app_limited
177     }
178 }
179 
// Most recent delivery rate sample, together with the reference point
// (taken from an acked packet) that the sample is measured from.
#[derive(Default, Debug)]
struct RateSample {
    // Last computed rate: `delivered` divided by `interval` in seconds,
    // truncated to an integer.
    delivery_rate: u64,

    // App-limited flag carried by the reference packet.
    is_app_limited: bool,

    // Longer of `send_elapsed` and `ack_elapsed`; reset to zero when it is
    // shorter than the min RTT.
    interval: Duration,

    // Amount delivered since the reference point.
    delivered: usize,

    // Delivered count carried by the reference packet.
    prior_delivered: usize,

    // Delivered time carried by the reference packet; `None` until the first
    // packet has been sampled.
    prior_time: Option<Instant>,

    // Time between the reference packet's first-sent time and its own send
    // time.
    send_elapsed: Duration,

    // Time between the reference packet's delivered time and the ack that
    // made it the reference.
    ack_elapsed: Duration,

    // RTT carried by the reference packet.
    rtt: Duration,
}
200 
#[cfg(test)]
mod tests {
    use super::*;

    use crate::recovery::*;

    use smallvec::smallvec;

    // Sends `count` ack-eliciting, in-flight packets numbered from 0 in the
    // application epoch. Every packet is `max_datagram_size()` long and is
    // stamped with `now`.
    fn send_packets(r: &mut Recovery, count: u64, now: Instant) {
        let mss = r.max_datagram_size();

        for pn in 0..count {
            let pkt = Sent {
                pkt_num: pn,
                frames: smallvec![],
                time_sent: now,
                time_acked: None,
                time_lost: None,
                size: mss,
                ack_eliciting: true,
                in_flight: true,
                delivered: 0,
                delivered_time: now,
                first_sent_time: now,
                is_app_limited: false,
                has_data: false,
            };

            r.on_packet_sent(
                pkt,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                "",
            );
        }
    }

    // Acking two packets one RTT after they were sent yields the expected
    // delivered byte count and delivery rate.
    #[test]
    fn rate_check() {
        let config = Config::new(0xbabababa).unwrap();
        let mut r = Recovery::new(&config);

        let now = Instant::now();
        let mss = r.max_datagram_size();

        // Send 2 packets.
        send_packets(&mut r, 2, now);

        let rtt = Duration::from_millis(50);
        let now = now + rtt;

        // Ack 2 packets.
        for pn in 0..2 {
            let acked = Acked {
                pkt_num: pn,
                time_sent: now,
                size: mss,
                rtt,
                delivered: 0,
                delivered_time: now,
                first_sent_time: now.checked_sub(rtt).unwrap(),
                is_app_limited: false,
            };

            r.delivery_rate.update_rate_sample(&acked, now);
        }

        // Update rate sample after 1 rtt.
        r.delivery_rate.generate_rate_sample(rtt);

        // Bytes acked so far.
        assert_eq!(r.delivery_rate.delivered(), 2400);

        // Estimated delivery rate = (1200 x 2) / 0.05s = 48000.
        assert_eq!(r.delivery_rate(), 48000);
    }

    // Filling the congestion window must not mark the connection or the rate
    // sample as app limited.
    #[test]
    fn app_limited_cwnd_full() {
        let config = Config::new(0xbabababa).unwrap();
        let mut r = Recovery::new(&config);

        let now = Instant::now();

        // Send 10 packets to fill cwnd.
        send_packets(&mut r, 10, now);

        assert!(!r.app_limited());
        assert!(!r.delivery_rate.sample_is_app_limited());
    }

    // After everything sent has been acked, recovery reports app limited
    // while the rate sample itself is not.
    #[test]
    fn app_limited_check() {
        let config = Config::new(0xbabababa).unwrap();
        let mut r = Recovery::new(&config);

        let now = Instant::now();

        // Send 5 packets.
        send_packets(&mut r, 5, now);

        let rtt = Duration::from_millis(50);
        let now = now + rtt;

        let mut acked = ranges::RangeSet::default();
        acked.insert(0..5);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                "",
            ),
            Ok((0, 0)),
        );

        assert!(r.app_limited());
        // Rate sample is not app limited (all acked).
        assert!(!r.delivery_rate.sample_is_app_limited());
        assert_eq!(r.delivery_rate.sample_rtt(), rtt);
    }
}
371