1 // Copyright (C) 2021, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::time::Duration;
28 use std::time::Instant;
29 
30 // When autotuning the receiver window, decide how much
31 // we increase the window.
32 const WINDOW_INCREASE_FACTOR: u64 = 2;
33 
34 // When autotuning the receiver window, check if the last
35 // update is within RTT * this constant.
36 const WINDOW_TRIGGER_FACTOR: u32 = 2;
37 
38 #[derive(Default, Debug)]
39 pub struct FlowControl {
40     /// Total consumed bytes by the receiver.
41     consumed: u64,
42 
43     /// Flow control limit.
44     max_data: u64,
45 
46     /// The receive window. This value is used for updating
47     /// flow control limit.
48     window: u64,
49 
50     /// The maximum receive window.
51     max_window: u64,
52 
53     /// Last update time of max_data for autotuning the window.
54     last_update: Option<Instant>,
55 }
56 
57 impl FlowControl {
new(max_data: u64, window: u64, max_window: u64) -> Self58     pub fn new(max_data: u64, window: u64, max_window: u64) -> Self {
59         Self {
60             max_data,
61 
62             window,
63 
64             max_window,
65 
66             ..Default::default()
67         }
68     }
69 
70     /// Returns the current window size.
window(&self) -> u6471     pub fn window(&self) -> u64 {
72         self.window
73     }
74 
75     /// Returns the current flow limit.
max_data(&self) -> u6476     pub fn max_data(&self) -> u64 {
77         self.max_data
78     }
79 
80     /// Update consumed bytes.
add_consumed(&mut self, consumed: u64)81     pub fn add_consumed(&mut self, consumed: u64) {
82         self.consumed += consumed;
83     }
84 
85     /// Returns true if the flow control needs to update max_data.
86     ///
87     /// This happens when the available window is smaller than the half
88     /// of the current window.
should_update_max_data(&self) -> bool89     pub fn should_update_max_data(&self) -> bool {
90         let available_window = self.max_data - self.consumed;
91 
92         available_window < (self.window / 2)
93     }
94 
95     /// Returns the new max_data limit.
max_data_next(&self) -> u6496     pub fn max_data_next(&self) -> u64 {
97         self.consumed + self.window
98     }
99 
100     /// Commits the new max_data limit.
update_max_data(&mut self, now: Instant)101     pub fn update_max_data(&mut self, now: Instant) {
102         self.max_data = self.max_data_next();
103         self.last_update = Some(now);
104     }
105 
106     /// Autotune the window size. When there is an another update
107     /// within RTT x 2, bump the window x 1.5, capped by
108     /// max_window.
autotune_window(&mut self, now: Instant, rtt: Duration)109     pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
110         if let Some(last_update) = self.last_update {
111             if now - last_update < rtt * WINDOW_TRIGGER_FACTOR {
112                 self.window = std::cmp::min(
113                     self.window * WINDOW_INCREASE_FACTOR,
114                     self.max_window,
115                 );
116             }
117         }
118     }
119 
120     /// Make sure the lower bound of the window is same to
121     /// the current window.
ensure_window_lower_bound(&mut self, min_window: u64)122     pub fn ensure_window_lower_bound(&mut self, min_window: u64) {
123         if min_window > self.window {
124             self.window = min_window;
125         }
126     }
127 }
128 
129 #[cfg(test)]
130 mod tests {
131     use super::*;
132 
133     #[test]
max_data()134     fn max_data() {
135         let fc = FlowControl::new(100, 20, 100);
136 
137         assert_eq!(fc.max_data(), 100);
138     }
139 
140     #[test]
should_update_max_data()141     fn should_update_max_data() {
142         let mut fc = FlowControl::new(100, 20, 100);
143 
144         fc.add_consumed(85);
145         assert_eq!(fc.should_update_max_data(), false);
146 
147         fc.add_consumed(10);
148         assert_eq!(fc.should_update_max_data(), true);
149     }
150 
151     #[test]
max_data_next()152     fn max_data_next() {
153         let mut fc = FlowControl::new(100, 20, 100);
154 
155         let consumed = 95;
156 
157         fc.add_consumed(consumed);
158         assert_eq!(fc.should_update_max_data(), true);
159         assert_eq!(fc.max_data_next(), consumed + 20);
160     }
161 
162     #[test]
update_max_data()163     fn update_max_data() {
164         let mut fc = FlowControl::new(100, 20, 100);
165 
166         let consumed = 95;
167 
168         fc.add_consumed(consumed);
169         assert_eq!(fc.should_update_max_data(), true);
170 
171         let max_data_next = fc.max_data_next();
172         assert_eq!(fc.max_data_next(), consumed + 20);
173 
174         fc.update_max_data(Instant::now());
175         assert_eq!(fc.max_data(), max_data_next);
176     }
177 
178     #[test]
autotune_window()179     fn autotune_window() {
180         let w = 20;
181         let mut fc = FlowControl::new(100, w, 100);
182 
183         let consumed = 95;
184 
185         fc.add_consumed(consumed);
186         assert_eq!(fc.should_update_max_data(), true);
187 
188         let max_data_next = fc.max_data_next();
189         assert_eq!(max_data_next, consumed + w);
190 
191         fc.update_max_data(Instant::now());
192         assert_eq!(fc.max_data(), max_data_next);
193 
194         // Window size should be doubled.
195         fc.autotune_window(Instant::now(), Duration::from_millis(100));
196 
197         let w = w * 2;
198         let consumed_inc = 15;
199 
200         fc.add_consumed(consumed_inc);
201         assert_eq!(fc.should_update_max_data(), true);
202 
203         let max_data_next = fc.max_data_next();
204         assert_eq!(max_data_next, consumed + consumed_inc + w);
205     }
206 
207     #[test]
ensure_window_lower_bound()208     fn ensure_window_lower_bound() {
209         let w = 20;
210         let mut fc = FlowControl::new(100, w, 100);
211 
212         // Window doesn't change.
213         fc.ensure_window_lower_bound(w);
214         assert_eq!(fc.window(), 20);
215 
216         // Window changed to the new value.
217         fc.ensure_window_lower_bound(w * 2);
218         assert_eq!(fc.window(), 40);
219     }
220 }
221