1 // Copyright (C) 2020-2022, Cloudflare, Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright notice, 9 // this list of conditions and the following disclaimer. 10 // 11 // * Redistributions in binary form must reproduce the above copyright 12 // notice, this list of conditions and the following disclaimer in the 13 // documentation and/or other materials provided with the distribution. 14 // 15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27 //! Delivery rate estimation. 28 //! 29 //! This implements the algorithm for estimating delivery rate as described in 30 //! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-01> 31 32 use std::time::Duration; 33 use std::time::Instant; 34 35 use crate::recovery::Acked; 36 use crate::recovery::Sent; 37 38 #[derive(Debug)] 39 pub struct Rate { 40 delivered: usize, 41 42 delivered_time: Instant, 43 44 first_sent_time: Instant, 45 46 // Packet number of the last sent packet with app limited. 47 end_of_app_limited: u64, 48 49 // Packet number of the last sent packet. 50 last_sent_packet: u64, 51 52 // Packet number of the largest acked packet. 53 largest_acked: u64, 54 55 // Sample of rate estimation. 56 rate_sample: RateSample, 57 } 58 59 impl Default for Rate { default() -> Self60 fn default() -> Self { 61 let now = Instant::now(); 62 63 Rate { 64 delivered: 0, 65 66 delivered_time: now, 67 68 first_sent_time: now, 69 70 end_of_app_limited: 0, 71 72 last_sent_packet: 0, 73 74 largest_acked: 0, 75 76 rate_sample: RateSample::default(), 77 } 78 } 79 } 80 81 impl Rate { on_packet_sent(&mut self, pkt: &mut Sent, bytes_in_flight: usize)82 pub fn on_packet_sent(&mut self, pkt: &mut Sent, bytes_in_flight: usize) { 83 // No packets in flight. 84 if bytes_in_flight == 0 { 85 self.first_sent_time = pkt.time_sent; 86 self.delivered_time = pkt.time_sent; 87 } 88 89 pkt.first_sent_time = self.first_sent_time; 90 pkt.delivered_time = self.delivered_time; 91 pkt.delivered = self.delivered; 92 pkt.is_app_limited = self.app_limited(); 93 94 self.last_sent_packet = pkt.pkt_num; 95 } 96 97 // Update the delivery rate sample when a packet is acked. update_rate_sample(&mut self, pkt: &Acked, now: Instant)98 pub fn update_rate_sample(&mut self, pkt: &Acked, now: Instant) { 99 self.delivered += pkt.size; 100 self.delivered_time = now; 101 102 // Update info using the newest packet. If rate_sample is not yet 103 // initialized, initialize with the first packet. 104 if self.rate_sample.prior_time.is_none() || 105 pkt.delivered > self.rate_sample.prior_delivered 106 { 107 self.rate_sample.prior_delivered = pkt.delivered; 108 self.rate_sample.prior_time = Some(pkt.delivered_time); 109 self.rate_sample.is_app_limited = pkt.is_app_limited; 110 self.rate_sample.send_elapsed = 111 pkt.time_sent.saturating_duration_since(pkt.first_sent_time); 112 self.rate_sample.rtt = pkt.rtt; 113 self.rate_sample.ack_elapsed = self 114 .delivered_time 115 .saturating_duration_since(pkt.delivered_time); 116 117 self.first_sent_time = pkt.time_sent; 118 } 119 120 self.largest_acked = self.largest_acked.max(pkt.pkt_num); 121 } 122 generate_rate_sample(&mut self, min_rtt: Duration)123 pub fn generate_rate_sample(&mut self, min_rtt: Duration) { 124 // End app-limited phase if bubble is ACKed and gone. 125 if self.app_limited() && self.largest_acked > self.end_of_app_limited { 126 self.update_app_limited(false); 127 } 128 129 if self.rate_sample.prior_time.is_some() { 130 let interval = self 131 .rate_sample 132 .send_elapsed 133 .max(self.rate_sample.ack_elapsed); 134 135 self.rate_sample.delivered = 136 self.delivered - self.rate_sample.prior_delivered; 137 self.rate_sample.interval = interval; 138 139 if interval < min_rtt { 140 self.rate_sample.interval = Duration::ZERO; 141 142 // No reliable sample. 143 return; 144 } 145 146 if !interval.is_zero() { 147 // Fill in rate_sample with a rate sample. 148 self.rate_sample.delivery_rate = 149 (self.rate_sample.delivered as f64 / interval.as_secs_f64()) 150 as u64; 151 } 152 } 153 } 154 update_app_limited(&mut self, v: bool)155 pub fn update_app_limited(&mut self, v: bool) { 156 self.end_of_app_limited = if v { self.last_sent_packet.max(1) } else { 0 } 157 } 158 app_limited(&mut self) -> bool159 pub fn app_limited(&mut self) -> bool { 160 self.end_of_app_limited != 0 161 } 162 delivered(&self) -> usize163 pub fn delivered(&self) -> usize { 164 self.delivered 165 } 166 sample_delivery_rate(&self) -> u64167 pub fn sample_delivery_rate(&self) -> u64 { 168 self.rate_sample.delivery_rate 169 } 170 sample_rtt(&self) -> Duration171 pub fn sample_rtt(&self) -> Duration { 172 self.rate_sample.rtt 173 } 174 sample_is_app_limited(&self) -> bool175 pub fn sample_is_app_limited(&self) -> bool { 176 self.rate_sample.is_app_limited 177 } 178 } 179 180 #[derive(Default, Debug)] 181 struct RateSample { 182 delivery_rate: u64, 183 184 is_app_limited: bool, 185 186 interval: Duration, 187 188 delivered: usize, 189 190 prior_delivered: usize, 191 192 prior_time: Option<Instant>, 193 194 send_elapsed: Duration, 195 196 ack_elapsed: Duration, 197 198 rtt: Duration, 199 } 200 201 #[cfg(test)] 202 mod tests { 203 use super::*; 204 205 use crate::recovery::*; 206 207 use smallvec::smallvec; 208 209 #[test] rate_check()210 fn rate_check() { 211 let config = Config::new(0xbabababa).unwrap(); 212 let mut r = Recovery::new(&config); 213 214 let now = Instant::now(); 215 let mss = r.max_datagram_size(); 216 217 // Send 2 packets. 218 for pn in 0..2 { 219 let pkt = Sent { 220 pkt_num: pn, 221 frames: smallvec![], 222 time_sent: now, 223 time_acked: None, 224 time_lost: None, 225 size: mss, 226 ack_eliciting: true, 227 in_flight: true, 228 delivered: 0, 229 delivered_time: now, 230 first_sent_time: now, 231 is_app_limited: false, 232 has_data: false, 233 }; 234 235 r.on_packet_sent( 236 pkt, 237 packet::Epoch::Application, 238 HandshakeStatus::default(), 239 now, 240 "", 241 ); 242 } 243 244 let rtt = Duration::from_millis(50); 245 let now = now + rtt; 246 247 // Ack 2 packets. 248 for pn in 0..2 { 249 let acked = Acked { 250 pkt_num: pn, 251 time_sent: now, 252 size: mss, 253 rtt, 254 delivered: 0, 255 delivered_time: now, 256 first_sent_time: now.checked_sub(rtt).unwrap(), 257 is_app_limited: false, 258 }; 259 260 r.delivery_rate.update_rate_sample(&acked, now); 261 } 262 263 // Update rate sample after 1 rtt. 264 r.delivery_rate.generate_rate_sample(rtt); 265 266 // Bytes acked so far. 267 assert_eq!(r.delivery_rate.delivered(), 2400); 268 269 // Estimated delivery rate = (1200 x 2) / 0.05s = 48000. 270 assert_eq!(r.delivery_rate(), 48000); 271 } 272 273 #[test] app_limited_cwnd_full()274 fn app_limited_cwnd_full() { 275 let config = Config::new(0xbabababa).unwrap(); 276 let mut r = Recovery::new(&config); 277 278 let now = Instant::now(); 279 let mss = r.max_datagram_size(); 280 281 // Send 10 packets to fill cwnd. 282 for pn in 0..10 { 283 let pkt = Sent { 284 pkt_num: pn, 285 frames: smallvec![], 286 time_sent: now, 287 time_acked: None, 288 time_lost: None, 289 size: mss, 290 ack_eliciting: true, 291 in_flight: true, 292 delivered: 0, 293 delivered_time: now, 294 first_sent_time: now, 295 is_app_limited: false, 296 has_data: false, 297 }; 298 299 r.on_packet_sent( 300 pkt, 301 packet::Epoch::Application, 302 HandshakeStatus::default(), 303 now, 304 "", 305 ); 306 } 307 308 assert_eq!(r.app_limited(), false); 309 assert_eq!(r.delivery_rate.sample_is_app_limited(), false); 310 } 311 312 #[test] app_limited_check()313 fn app_limited_check() { 314 let config = Config::new(0xbabababa).unwrap(); 315 let mut r = Recovery::new(&config); 316 317 let now = Instant::now(); 318 let mss = r.max_datagram_size(); 319 320 // Send 5 packets. 321 for pn in 0..5 { 322 let pkt = Sent { 323 pkt_num: pn, 324 frames: smallvec![], 325 time_sent: now, 326 time_acked: None, 327 time_lost: None, 328 size: mss, 329 ack_eliciting: true, 330 in_flight: true, 331 delivered: 0, 332 delivered_time: now, 333 first_sent_time: now, 334 is_app_limited: false, 335 has_data: false, 336 }; 337 338 r.on_packet_sent( 339 pkt, 340 packet::Epoch::Application, 341 HandshakeStatus::default(), 342 now, 343 "", 344 ); 345 } 346 347 let rtt = Duration::from_millis(50); 348 let now = now + rtt; 349 350 let mut acked = ranges::RangeSet::default(); 351 acked.insert(0..5); 352 353 assert_eq!( 354 r.on_ack_received( 355 &acked, 356 25, 357 packet::Epoch::Application, 358 HandshakeStatus::default(), 359 now, 360 "", 361 ), 362 Ok((0, 0)), 363 ); 364 365 assert_eq!(r.app_limited(), true); 366 // Rate sample is not app limited (all acked). 367 assert_eq!(r.delivery_rate.sample_is_app_limited(), false); 368 assert_eq!(r.delivery_rate.sample_rtt(), rtt); 369 } 370 } 371