// Copyright (C) 2022, Cloudflare, Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // * Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. //! BBR Congestion Control //! //! This implementation is based on the following draft: //! use crate::minmax::Minmax; use crate::packet; use crate::recovery::*; use std::time::Duration; use std::time::Instant; pub static BBR: CongestionControlOps = CongestionControlOps { on_init, reset, on_packet_sent, on_packets_acked, congestion_event, collapse_cwnd, checkpoint, rollback, has_custom_pacing, debug_fmt, }; /// A constant specifying the length of the BBR.BtlBw max filter window for /// BBR.BtlBwFilter, BtlBwFilterLen is 10 packet-timed round trips. const BTLBW_FILTER_LEN: Duration = Duration::from_secs(10); /// A constant specifying the minimum time interval between ProbeRTT states: 10 /// secs. const PROBE_RTT_INTERVAL: Duration = Duration::from_secs(10); /// A constant specifying the length of the RTProp min filter window. const RTPROP_FILTER_LEN: Duration = PROBE_RTT_INTERVAL; /// A constant specifying the minimum gain value that will allow the sending /// rate to double each round (2/ln(2) ~= 2.89), used in Startup mode for both /// BBR.pacing_gain and BBR.cwnd_gain. const BBR_HIGH_GAIN: f64 = 2.89; /// The minimal cwnd value BBR tries to target using: 4 packets, or 4 * SMSS const BBR_MIN_PIPE_CWND_PKTS: usize = 4; /// The number of phases in the BBR ProbeBW gain cycle: 8. const BBR_GAIN_CYCLE_LEN: usize = 8; /// A constant specifying the minimum duration for which ProbeRTT state holds /// inflight to BBRMinPipeCwnd or fewer packets: 200 ms. const PROBE_RTT_DURATION: Duration = Duration::from_millis(200); /// Pacing Gain Cycle. const PACING_GAIN_CYCLE: [f64; BBR_GAIN_CYCLE_LEN] = [5.0 / 4.0, 3.0 / 4.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]; /// A constant to check BBR.BtlBW is still growing. const BTLBW_GROWTH_TARGET: f64 = 1.25; /// BBR Internal State Machine. #[derive(Debug, PartialEq, Eq)] enum BBRStateMachine { Startup, Drain, ProbeBW, ProbeRTT, } /// BBR Specific State Variables. pub struct State { // The current state of a BBR flow in the BBR state machine. state: BBRStateMachine, // The current pacing rate for a BBR flow, which controls inter-packet // spacing. pacing_rate: u64, // BBR's estimated bottleneck bandwidth available to the transport flow, // estimated from the maximum delivery rate sample in a sliding window. btlbw: u64, // The max filter used to estimate BBR.BtlBw. btlbwfilter: Minmax, // BBR's estimated two-way round-trip propagation delay of the path, // estimated from the windowed minimum recent round-trip delay sample. rtprop: Duration, // The wall clock time at which the current BBR.RTProp sample was obtained. rtprop_stamp: Instant, // A boolean recording whether the BBR.RTprop has expired and is due for a // refresh with an application idle period or a transition into ProbeRTT // state. rtprop_expired: bool, // The dynamic gain factor used to scale BBR.BtlBw to produce // BBR.pacing_rate. pacing_gain: f64, // The dynamic gain factor used to scale the estimated BDP to produce a // congestion window (cwnd). cwnd_gain: f64, // A boolean that records whether BBR estimates that it has ever fully // utilized its available bandwidth ("filled the pipe"). filled_pipe: bool, // Count of packet-timed round trips elapsed so far. round_count: u64, // A boolean that BBR sets to true once per packet-timed round trip, // on ACKs that advance BBR.round_count. round_start: bool, // packet.delivered value denoting the end of a packet-timed round trip. next_round_delivered: usize, // Timestamp when ProbeRTT state ends. probe_rtt_done_stamp: Option, // Checking if a roundtrip in ProbeRTT state ends. probe_rtt_round_done: bool, // Checking if in the packet conservation mode during recovery. packet_conservation: bool, // Saved cwnd before loss recovery. prior_cwnd: usize, // Checking if restarting from idle. idle_restart: bool, // Baseline level delivery rate for full pipe estimator. full_bw: u64, // The number of round for full pipe estimator without much growth. full_bw_count: usize, // Last time cycle_index is updated. cycle_stamp: Instant, // Current index of pacing_gain_cycle[]. cycle_index: usize, // The upper bound on the volume of data BBR allows in flight. target_cwnd: usize, // Whether in the recovery episode. in_recovery: bool, // Start time of the connection. start_time: Instant, // Newly marked lost data size in bytes. newly_lost_bytes: usize, // Newly acked data size in bytes. newly_acked_bytes: usize, // bytes_in_flight before processing this ACK. prior_bytes_in_flight: usize, } impl State { pub fn new() -> Self { let now = Instant::now(); State { state: BBRStateMachine::Startup, pacing_rate: 0, btlbw: 0, btlbwfilter: Minmax::new(0), rtprop: Duration::ZERO, rtprop_stamp: now, rtprop_expired: false, pacing_gain: 0.0, cwnd_gain: 0.0, filled_pipe: false, round_count: 0, round_start: false, next_round_delivered: 0, probe_rtt_done_stamp: None, probe_rtt_round_done: false, packet_conservation: false, prior_cwnd: 0, idle_restart: false, full_bw: 0, full_bw_count: 0, cycle_stamp: now, cycle_index: 0, target_cwnd: 0, in_recovery: false, start_time: now, newly_lost_bytes: 0, newly_acked_bytes: 0, prior_bytes_in_flight: 0, } } } // When entering the recovery episode. fn bbr_enter_recovery(r: &mut Recovery, now: Instant) { r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r); r.congestion_window = r.bytes_in_flight + r.bbr_state.newly_acked_bytes.max(r.max_datagram_size); r.congestion_recovery_start_time = Some(now); r.bbr_state.packet_conservation = true; r.bbr_state.in_recovery = true; // Start round now. r.bbr_state.next_round_delivered = r.delivery_rate.delivered(); } // When exiting the recovery episode. fn bbr_exit_recovery(r: &mut Recovery) { r.congestion_recovery_start_time = None; r.bbr_state.packet_conservation = false; r.bbr_state.in_recovery = false; per_ack::bbr_restore_cwnd(r); } // Congestion Control Hooks. // fn on_init(r: &mut Recovery) { init::bbr_init(r); } fn reset(r: &mut Recovery) { r.bbr_state = State::new(); init::bbr_init(r); } fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, _now: Instant) { r.bytes_in_flight += sent_bytes; per_transmit::bbr_on_transmit(r); } fn on_packets_acked( r: &mut Recovery, packets: &[Acked], _epoch: packet::Epoch, now: Instant, ) { r.bbr_state.newly_acked_bytes = packets.iter().fold(0, |acked_bytes, p| { r.bbr_state.prior_bytes_in_flight = r.bytes_in_flight; per_ack::bbr_update_model_and_state(r, p, now); r.bytes_in_flight = r.bytes_in_flight.saturating_sub(p.size); acked_bytes + p.size }); if let Some(pkt) = packets.last() { if !r.in_congestion_recovery(pkt.time_sent) { // Upon exiting loss recovery. bbr_exit_recovery(r); } } per_ack::bbr_update_control_parameters(r, now); r.bbr_state.newly_lost_bytes = 0; } fn congestion_event( r: &mut Recovery, lost_bytes: usize, time_sent: Instant, _epoch: packet::Epoch, now: Instant, ) { r.bbr_state.newly_lost_bytes = lost_bytes; // Upon entering Fast Recovery. if !r.in_congestion_recovery(time_sent) { // Upon entering Fast Recovery. bbr_enter_recovery(r, now); } } fn collapse_cwnd(r: &mut Recovery) { r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r); reno::collapse_cwnd(r); } fn checkpoint(_r: &mut Recovery) {} fn rollback(_r: &mut Recovery) -> bool { false } fn has_custom_pacing() -> bool { true } fn debug_fmt(r: &Recovery, f: &mut std::fmt::Formatter) -> std::fmt::Result { let bbr = &r.bbr_state; write!( f, "bbr={{ state={:?} btlbw={} rtprop={:?} pacing_rate={} pacing_gain={} cwnd_gain={} target_cwnd={} send_quantum={} filled_pipe={} round_count={} }}", bbr.state, bbr.btlbw, bbr.rtprop, bbr.pacing_rate, bbr.pacing_gain, bbr.cwnd_gain, bbr.target_cwnd, r.send_quantum(), bbr.filled_pipe, bbr.round_count ) } #[cfg(test)] mod tests { use super::*; use crate::recovery; use smallvec::smallvec; #[test] fn bbr_init() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); // on_init() is called in Connection::new(), so it need to be // called manually here. r.on_init(); assert_eq!(r.cwnd(), r.max_datagram_size * INITIAL_WINDOW_PACKETS); assert_eq!(r.bytes_in_flight, 0); assert_eq!(r.bbr_state.state, BBRStateMachine::Startup); } #[test] fn bbr_send() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); r.on_init(); r.on_packet_sent_cc(1000, now); assert_eq!(r.bytes_in_flight, 1000); } #[test] fn bbr_startup() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); let mss = r.max_datagram_size; r.on_init(); // Send 5 packets. for pn in 0..5 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: 0, delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); } let rtt = Duration::from_millis(50); let now = now + rtt; let cwnd_prev = r.cwnd(); let mut acked = ranges::RangeSet::default(); acked.insert(0..5); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); assert_eq!(r.bbr_state.state, BBRStateMachine::Startup); assert_eq!(r.cwnd(), cwnd_prev + mss * 5); assert_eq!(r.bytes_in_flight, 0); assert_eq!( r.delivery_rate(), ((mss * 5) as f64 / rtt.as_secs_f64()) as u64 ); assert_eq!(r.bbr_state.btlbw, r.delivery_rate()); } #[test] fn bbr_congestion_event() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); let mss = r.max_datagram_size; r.on_init(); // Send 5 packets. for pn in 0..5 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: 0, delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); } let rtt = Duration::from_millis(50); let now = now + rtt; // Make a packet loss to trigger a congestion event. let mut acked = ranges::RangeSet::default(); acked.insert(4..5); // 2 acked, 2 x MSS lost. assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((2, 2400)), ); // Sent: 0, 1, 2, 3, 4, Acked 4. assert_eq!(r.cwnd(), mss * 4); // Stil in flight: 2, 3. assert_eq!(r.bytes_in_flight, mss * 2); } #[test] fn bbr_drain() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); let mss = r.max_datagram_size; r.on_init(); let mut pn = 0; // Stop right before filled_pipe=true. for _ in 0..3 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: r.delivery_rate.delivered(), delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); pn += 1; let rtt = Duration::from_millis(50); let now = now + rtt; let mut acked = ranges::RangeSet::default(); acked.insert(0..pn); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); } // Stop at right before filled_pipe=true. for _ in 0..5 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: r.delivery_rate.delivered(), delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); pn += 1; } let rtt = Duration::from_millis(50); let now = now + rtt; let mut acked = ranges::RangeSet::default(); // We sent 5 packets, but ack only one, to stay // in Drain state. acked.insert(0..pn - 4); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); // Now we are in Drain state. assert_eq!(r.bbr_state.filled_pipe, true); assert_eq!(r.bbr_state.state, BBRStateMachine::Drain); assert!(r.bbr_state.pacing_gain < 1.0); } #[test] fn bbr_probe_bw() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); let mss = r.max_datagram_size; r.on_init(); let mut pn = 0; // At 4th roundtrip, filled_pipe=true and switch to Drain, // but move to ProbeBW immediately because bytes_in_flight is // smaller than BBRInFlight(1). for _ in 0..4 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: r.delivery_rate.delivered(), delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); pn += 1; let rtt = Duration::from_millis(50); let now = now + rtt; let mut acked = ranges::RangeSet::default(); acked.insert(0..pn); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); } // Now we are in ProbeBW state. assert_eq!(r.bbr_state.filled_pipe, true); assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeBW); // In the first ProbeBW cycle, pacing_gain should be >= 1.0. assert!(r.bbr_state.pacing_gain >= 1.0); } #[test] fn bbr_probe_rtt() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); let mut r = Recovery::new(&cfg); let now = Instant::now(); let mss = r.max_datagram_size; r.on_init(); let mut pn = 0; // At 4th roundtrip, filled_pipe=true and switch to Drain, // but move to ProbeBW immediately because bytes_in_flight is // smaller than BBRInFlight(1). for _ in 0..4 { let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: r.delivery_rate.delivered(), delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); pn += 1; let rtt = Duration::from_millis(50); let now = now + rtt; let mut acked = ranges::RangeSet::default(); acked.insert(0..pn); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); } // Now we are in ProbeBW state. assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeBW); // After RTPROP_FILTER_LEN (10s), switch to ProbeRTT. let now = now + RTPROP_FILTER_LEN; let pkt = Sent { pkt_num: pn, frames: smallvec![], time_sent: now, time_acked: None, time_lost: None, size: mss, ack_eliciting: true, in_flight: true, delivered: r.delivery_rate.delivered(), delivered_time: now, first_sent_time: now, is_app_limited: false, has_data: false, }; r.on_packet_sent( pkt, packet::Epoch::Application, HandshakeStatus::default(), now, "", ); pn += 1; // Don't update rtprop by giving larger rtt than before. // If rtprop is updated, rtprop expiry check is reset. let rtt = Duration::from_millis(100); let now = now + rtt; let mut acked = ranges::RangeSet::default(); acked.insert(0..pn); assert_eq!( r.on_ack_received( &acked, 25, packet::Epoch::Application, HandshakeStatus::default(), now, "", ), Ok((0, 0)), ); assert_eq!(r.bbr_state.state, BBRStateMachine::ProbeRTT); assert_eq!(r.bbr_state.pacing_gain, 1.0); } } mod init; mod pacing; mod per_ack; mod per_transmit;