// Copyright (C) 2021, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;
use std::time::Instant;

// When autotuning the receiver window, decide how much
// we increase the window.
const WINDOW_INCREASE_FACTOR: u64 = 2;

// When autotuning the receiver window, check if the last
// update is within RTT * this constant.
const WINDOW_TRIGGER_FACTOR: u32 = 2;

/// Receiver-side flow control bookkeeping.
///
/// Tracks how many bytes have been consumed against the advertised
/// limit (`max_data`) and the window used to compute the next limit.
#[derive(Default, Debug)]
pub struct FlowControl {
    /// Total consumed bytes by the receiver.
    consumed: u64,

    /// Flow control limit.
    max_data: u64,

    /// The receive window. This value is used for updating
    /// flow control limit.
    window: u64,

    /// The maximum receive window.
    max_window: u64,

    /// Last update time of max_data for autotuning the window.
    last_update: Option<Instant>,
}

impl FlowControl {
    /// Creates a flow controller with the given initial limit, receive
    /// window and maximum receive window.
    ///
    /// The consumed byte count starts at zero and no limit update has
    /// been recorded yet.
    pub fn new(max_data: u64, window: u64, max_window: u64) -> Self {
        Self {
            max_data,

            window,

            max_window,

            ..Default::default()
        }
    }

    /// Returns the current window size.
    pub fn window(&self) -> u64 {
        self.window
    }

    /// Returns the current flow limit.
    pub fn max_data(&self) -> u64 {
        self.max_data
    }

    /// Update consumed bytes.
    ///
    /// The running total saturates at `u64::MAX` instead of overflowing.
    pub fn add_consumed(&mut self, consumed: u64) {
        self.consumed = self.consumed.saturating_add(consumed);
    }

    /// Returns true if the flow control needs to update max_data.
    ///
    /// This happens when the available window is smaller than the half
    /// of the current window.
    pub fn should_update_max_data(&self) -> bool {
        // If more has been consumed than the limit allows, no window is
        // left; saturate to zero rather than underflowing.
        let available_window = self.max_data.saturating_sub(self.consumed);

        available_window < (self.window / 2)
    }

    /// Returns the new max_data limit.
    ///
    /// This is the consumed byte count plus the current window,
    /// saturating at `u64::MAX`.
    pub fn max_data_next(&self) -> u64 {
        self.consumed.saturating_add(self.window)
    }

    /// Commits the new max_data limit.
    ///
    /// `now` is recorded as the time of the last update and is later
    /// used by `autotune_window()`.
    pub fn update_max_data(&mut self, now: Instant) {
        self.max_data = self.max_data_next();
        self.last_update = Some(now);
    }

    /// Autotune the window size. When there is another update
    /// within RTT x `WINDOW_TRIGGER_FACTOR` of the last one, multiply
    /// the window by `WINDOW_INCREASE_FACTOR`, capped by max_window.
    ///
    /// Does nothing if max_data has never been updated.
    pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
        if let Some(last_update) = self.last_update {
            // A `now` that is earlier than the last update counts as
            // zero elapsed time.
            let elapsed = now.saturating_duration_since(last_update);

            // A trigger interval too large to represent is longer than
            // any elapsed time, so the update is within it.
            let within_trigger = rtt
                .checked_mul(WINDOW_TRIGGER_FACTOR)
                .map_or(true, |trigger| elapsed < trigger);

            if within_trigger {
                self.window = std::cmp::min(
                    self.window.saturating_mul(WINDOW_INCREASE_FACTOR),
                    self.max_window,
                );
            }
        }
    }

    /// Makes sure the window is at least `min_window`.
    ///
    /// The window is raised to `min_window` when it is currently
    /// smaller; otherwise it is left unchanged.
    pub fn ensure_window_lower_bound(&mut self, min_window: u64) {
        if min_window > self.window {
            self.window = min_window;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn max_data() {
        let fc = FlowControl::new(100, 20, 100);

        assert_eq!(fc.max_data(), 100);
    }

    #[test]
    fn should_update_max_data() {
        let mut fc = FlowControl::new(100, 20, 100);

        fc.add_consumed(85);
        assert!(!fc.should_update_max_data());

        fc.add_consumed(10);
        assert!(fc.should_update_max_data());
    }

    #[test]
    fn should_update_max_data_past_limit() {
        let mut fc = FlowControl::new(100, 20, 100);

        // Consuming beyond the limit must not underflow.
        fc.add_consumed(120);
        assert!(fc.should_update_max_data());
    }

    #[test]
    fn max_data_next() {
        let mut fc = FlowControl::new(100, 20, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());
        assert_eq!(fc.max_data_next(), consumed + 20);
    }

    #[test]
    fn update_max_data() {
        let mut fc = FlowControl::new(100, 20, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(fc.max_data_next(), consumed + 20);

        fc.update_max_data(Instant::now());
        assert_eq!(fc.max_data(), max_data_next);
    }

    #[test]
    fn autotune_window() {
        let w = 20;
        let mut fc = FlowControl::new(100, w, 100);

        let consumed = 95;

        fc.add_consumed(consumed);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(max_data_next, consumed + w);

        fc.update_max_data(Instant::now());
        assert_eq!(fc.max_data(), max_data_next);

        // Window size should be doubled.
        fc.autotune_window(Instant::now(), Duration::from_millis(100));

        let w = w * 2;
        let consumed_inc = 15;

        fc.add_consumed(consumed_inc);
        assert!(fc.should_update_max_data());

        let max_data_next = fc.max_data_next();
        assert_eq!(max_data_next, consumed + consumed_inc + w);
    }

    #[test]
    fn autotune_window_capped() {
        let mut fc = FlowControl::new(100, 20, 30);

        let now = Instant::now();
        fc.update_max_data(now);

        // Doubling would give 40, but max_window is 30.
        fc.autotune_window(now, Duration::from_millis(100));
        assert_eq!(fc.window(), 30);
    }

    #[test]
    fn autotune_window_stale_update() {
        let mut fc = FlowControl::new(100, 20, 100);

        let now = Instant::now();
        fc.update_max_data(now);

        // Last update is older than RTT x 2, window doesn't change.
        let rtt = Duration::from_millis(100);
        fc.autotune_window(now + rtt * 3, rtt);
        assert_eq!(fc.window(), 20);
    }

    #[test]
    fn ensure_window_lower_bound() {
        let w = 20;
        let mut fc = FlowControl::new(100, w, 100);

        // Window doesn't change.
        fc.ensure_window_lower_bound(w);
        assert_eq!(fc.window(), 20);

        // Window changed to the new value.
        fc.ensure_window_lower_bound(w * 2);
        assert_eq!(fc.window(), 40);
    }
}