xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_flow_controller.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_flow_controller.h"
6 
7 #include <cstdint>
8 
9 #include "absl/strings/str_cat.h"
10 #include "quiche/quic/core/quic_connection.h"
11 #include "quiche/quic/core/quic_packets.h"
12 #include "quiche/quic/core/quic_session.h"
13 #include "quiche/quic/core/quic_utils.h"
14 #include "quiche/quic/platform/api/quic_bug_tracker.h"
15 #include "quiche/quic/platform/api/quic_flag_utils.h"
16 #include "quiche/quic/platform/api/quic_flags.h"
17 #include "quiche/quic/platform/api/quic_logging.h"
18 
19 namespace quic {
20 
// Prefix for log lines identifying which side of the connection is logging.
// Expands to an expression that reads |perspective_|, so it can only be used
// where a |perspective_| is in scope.
#define ENDPOINT \
  (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
23 
LogLabel()24 std::string QuicFlowController::LogLabel() {
25   if (is_connection_flow_controller_) {
26     return "connection";
27   }
28   return absl::StrCat("stream ", id_);
29 }
30 
// Builds a flow controller for either a single stream or the whole connection.
//
// |session| supplies the connection and the perspective (client/server) and
// is dereferenced here, so it must not be null.
// |id| is the stream this controller belongs to; for a connection-level
// controller it is expected to be the invalid stream id (checked below).
// |send_window_offset| and |receive_window_offset| are the initial offsets.
// |receive_window_size_limit| bounds how far the receive window may grow.
// |should_auto_tune_receive_window| enables receive window auto-tuning.
// |session_flow_controller| may be null; when set, it is asked to grow
// whenever this controller's receive window grows.
QuicFlowController::QuicFlowController(
    QuicSession* session, QuicStreamId id, bool is_connection_flow_controller,
    QuicStreamOffset send_window_offset, QuicStreamOffset receive_window_offset,
    QuicByteCount receive_window_size_limit,
    bool should_auto_tune_receive_window,
    QuicFlowControllerInterface* session_flow_controller)
    : session_(session),
      connection_(session->connection()),
      id_(id),
      is_connection_flow_controller_(is_connection_flow_controller),
      perspective_(session->perspective()),
      bytes_sent_(0),
      send_window_offset_(send_window_offset),
      bytes_consumed_(0),
      highest_received_byte_offset_(0),
      receive_window_offset_(receive_window_offset),
      // The initial window size is the same value as the initial offset.
      receive_window_size_(receive_window_offset),
      receive_window_size_limit_(receive_window_size_limit),
      auto_tune_receive_window_(should_auto_tune_receive_window),
      session_flow_controller_(session_flow_controller),
      last_blocked_send_window_offset_(0),
      // Zero marks "no window update yet"; see the IsInitialized() checks.
      prev_window_update_time_(QuicTime::Zero()) {
  QUICHE_DCHECK_LE(receive_window_size_, receive_window_size_limit_);
  // A controller is connection-level exactly when its id is the invalid
  // stream id for this transport version.
  QUICHE_DCHECK_EQ(
      is_connection_flow_controller_,
      QuicUtils::GetInvalidStreamId(session_->transport_version()) == id_);

  QUIC_DVLOG(1) << ENDPOINT << "Created flow controller for " << LogLabel()
                << ", setting initial receive window offset to: "
                << receive_window_offset_
                << ", max receive window to: " << receive_window_size_
                << ", max receive window limit to: "
                << receive_window_size_limit_
                << ", setting send window offset to: " << send_window_offset_;
}
66 
AddBytesConsumed(QuicByteCount bytes_consumed)67 void QuicFlowController::AddBytesConsumed(QuicByteCount bytes_consumed) {
68   bytes_consumed_ += bytes_consumed;
69   QUIC_DVLOG(1) << ENDPOINT << LogLabel() << " consumed " << bytes_consumed_
70                 << " bytes.";
71 
72   MaybeSendWindowUpdate();
73 }
74 
UpdateHighestReceivedOffset(QuicStreamOffset new_offset)75 bool QuicFlowController::UpdateHighestReceivedOffset(
76     QuicStreamOffset new_offset) {
77   // Only update if offset has increased.
78   if (new_offset <= highest_received_byte_offset_) {
79     return false;
80   }
81 
82   QUIC_DVLOG(1) << ENDPOINT << LogLabel()
83                 << " highest byte offset increased from "
84                 << highest_received_byte_offset_ << " to " << new_offset;
85   highest_received_byte_offset_ = new_offset;
86   return true;
87 }
88 
AddBytesSent(QuicByteCount bytes_sent)89 void QuicFlowController::AddBytesSent(QuicByteCount bytes_sent) {
90   if (bytes_sent_ + bytes_sent > send_window_offset_) {
91     QUIC_BUG(quic_bug_10836_1)
92         << ENDPOINT << LogLabel() << " Trying to send an extra " << bytes_sent
93         << " bytes, when bytes_sent = " << bytes_sent_
94         << ", and send_window_offset_ = " << send_window_offset_;
95     bytes_sent_ = send_window_offset_;
96 
97     // This is an error on our side, close the connection as soon as possible.
98     connection_->CloseConnection(
99         QUIC_FLOW_CONTROL_SENT_TOO_MUCH_DATA,
100         absl::StrCat(send_window_offset_ - (bytes_sent_ + bytes_sent),
101                      "bytes over send window offset"),
102         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
103     return;
104   }
105 
106   bytes_sent_ += bytes_sent;
107   QUIC_DVLOG(1) << ENDPOINT << LogLabel() << " sent " << bytes_sent_
108                 << " bytes.";
109 }
110 
FlowControlViolation()111 bool QuicFlowController::FlowControlViolation() {
112   if (highest_received_byte_offset_ > receive_window_offset_) {
113     QUIC_DLOG(INFO) << ENDPOINT << "Flow control violation on " << LogLabel()
114                     << ", receive window offset: " << receive_window_offset_
115                     << ", highest received byte offset: "
116                     << highest_received_byte_offset_;
117     return true;
118   }
119   return false;
120 }
121 
MaybeIncreaseMaxWindowSize()122 void QuicFlowController::MaybeIncreaseMaxWindowSize() {
123   // Core of receive window auto tuning.  This method should be called before a
124   // WINDOW_UPDATE frame is sent.  Ideally, window updates should occur close to
125   // once per RTT.  If a window update happens much faster than RTT, it implies
126   // that the flow control window is imposing a bottleneck.  To prevent this,
127   // this method will increase the receive window size (subject to a reasonable
128   // upper bound).  For simplicity this algorithm is deliberately asymmetric, in
129   // that it may increase window size but never decreases.
130 
131   // Keep track of timing between successive window updates.
132   QuicTime now = connection_->clock()->ApproximateNow();
133   QuicTime prev = prev_window_update_time_;
134   prev_window_update_time_ = now;
135   if (!prev.IsInitialized()) {
136     QUIC_DVLOG(1) << ENDPOINT << "first window update for " << LogLabel();
137     return;
138   }
139 
140   if (!auto_tune_receive_window_) {
141     return;
142   }
143 
144   // Get outbound RTT.
145   QuicTime::Delta rtt =
146       connection_->sent_packet_manager().GetRttStats()->smoothed_rtt();
147   if (rtt.IsZero()) {
148     QUIC_DVLOG(1) << ENDPOINT << "rtt zero for " << LogLabel();
149     return;
150   }
151 
152   // Now we can compare timing of window updates with RTT.
153   QuicTime::Delta since_last = now - prev;
154   QuicTime::Delta two_rtt = 2 * rtt;
155 
156   if (since_last >= two_rtt) {
157     // If interval between window updates is sufficiently large, there
158     // is no need to increase receive_window_size_.
159     return;
160   }
161   QuicByteCount old_window = receive_window_size_;
162   IncreaseWindowSize();
163 
164   if (receive_window_size_ > old_window) {
165     QUIC_DVLOG(1) << ENDPOINT << "New max window increase for " << LogLabel()
166                   << " after " << since_last.ToMicroseconds()
167                   << " us, and RTT is " << rtt.ToMicroseconds()
168                   << "us. max wndw: " << receive_window_size_;
169     if (session_flow_controller_ != nullptr) {
170       session_flow_controller_->EnsureWindowAtLeast(
171           kSessionFlowControlMultiplier * receive_window_size_);
172     }
173   } else {
174     // TODO(ckrasic) - add a varz to track this (?).
175     QUIC_LOG_FIRST_N(INFO, 1)
176         << ENDPOINT << "Max window at limit for " << LogLabel() << " after "
177         << since_last.ToMicroseconds() << " us, and RTT is "
178         << rtt.ToMicroseconds() << "us. Limit size: " << receive_window_size_;
179   }
180 }
181 
IncreaseWindowSize()182 void QuicFlowController::IncreaseWindowSize() {
183   receive_window_size_ *= 2;
184   receive_window_size_ =
185       std::min(receive_window_size_, receive_window_size_limit_);
186 }
187 
WindowUpdateThreshold()188 QuicByteCount QuicFlowController::WindowUpdateThreshold() {
189   return receive_window_size_ / 2;
190 }
191 
MaybeSendWindowUpdate()192 void QuicFlowController::MaybeSendWindowUpdate() {
193   if (!session_->connection()->connected()) {
194     return;
195   }
196   // Send WindowUpdate to increase receive window if
197   // (receive window offset - consumed bytes) < (max window / 2).
198   // This is behaviour copied from SPDY.
199   QUICHE_DCHECK_LE(bytes_consumed_, receive_window_offset_);
200   QuicStreamOffset available_window = receive_window_offset_ - bytes_consumed_;
201   QuicByteCount threshold = WindowUpdateThreshold();
202 
203   if (!prev_window_update_time_.IsInitialized()) {
204     // Treat the initial window as if it is a window update, so if 1/2 the
205     // window is used in less than 2 RTTs, the window is increased.
206     prev_window_update_time_ = connection_->clock()->ApproximateNow();
207   }
208 
209   if (available_window >= threshold) {
210     QUIC_DVLOG(1) << ENDPOINT << "Not sending WindowUpdate for " << LogLabel()
211                   << ", available window: " << available_window
212                   << " >= threshold: " << threshold;
213     return;
214   }
215 
216   MaybeIncreaseMaxWindowSize();
217   UpdateReceiveWindowOffsetAndSendWindowUpdate(available_window);
218 }
219 
UpdateReceiveWindowOffsetAndSendWindowUpdate(QuicStreamOffset available_window)220 void QuicFlowController::UpdateReceiveWindowOffsetAndSendWindowUpdate(
221     QuicStreamOffset available_window) {
222   // Update our receive window.
223   receive_window_offset_ += (receive_window_size_ - available_window);
224 
225   QUIC_DVLOG(1) << ENDPOINT << "Sending WindowUpdate frame for " << LogLabel()
226                 << ", consumed bytes: " << bytes_consumed_
227                 << ", available window: " << available_window
228                 << ", and threshold: " << WindowUpdateThreshold()
229                 << ", and receive window size: " << receive_window_size_
230                 << ". New receive window offset is: " << receive_window_offset_;
231 
232   SendWindowUpdate();
233 }
234 
MaybeSendBlocked()235 void QuicFlowController::MaybeSendBlocked() {
236   if (SendWindowSize() != 0 ||
237       last_blocked_send_window_offset_ >= send_window_offset_) {
238     return;
239   }
240   QUIC_DLOG(INFO) << ENDPOINT << LogLabel() << " is flow control blocked. "
241                   << "Send window: " << SendWindowSize()
242                   << ", bytes sent: " << bytes_sent_
243                   << ", send limit: " << send_window_offset_;
244   // The entire send_window has been consumed, we are now flow control
245   // blocked.
246 
247   // Keep track of when we last sent a BLOCKED frame so that we only send one
248   // at a given send offset.
249   last_blocked_send_window_offset_ = send_window_offset_;
250   session_->SendBlocked(id_, last_blocked_send_window_offset_);
251 }
252 
UpdateSendWindowOffset(QuicStreamOffset new_send_window_offset)253 bool QuicFlowController::UpdateSendWindowOffset(
254     QuicStreamOffset new_send_window_offset) {
255   // Only update if send window has increased.
256   if (new_send_window_offset <= send_window_offset_) {
257     return false;
258   }
259 
260   QUIC_DVLOG(1) << ENDPOINT << "UpdateSendWindowOffset for " << LogLabel()
261                 << " with new offset " << new_send_window_offset
262                 << " current offset: " << send_window_offset_
263                 << " bytes_sent: " << bytes_sent_;
264 
265   // The flow is now unblocked but could have also been unblocked
266   // before.  Return true iff this update caused a change from blocked
267   // to unblocked.
268   const bool was_previously_blocked = IsBlocked();
269   send_window_offset_ = new_send_window_offset;
270   return was_previously_blocked;
271 }
272 
EnsureWindowAtLeast(QuicByteCount window_size)273 void QuicFlowController::EnsureWindowAtLeast(QuicByteCount window_size) {
274   if (receive_window_size_limit_ >= window_size) {
275     return;
276   }
277 
278   QuicStreamOffset available_window = receive_window_offset_ - bytes_consumed_;
279   IncreaseWindowSize();
280   UpdateReceiveWindowOffsetAndSendWindowUpdate(available_window);
281 }
282 
IsBlocked() const283 bool QuicFlowController::IsBlocked() const { return SendWindowSize() == 0; }
284 
SendWindowSize() const285 uint64_t QuicFlowController::SendWindowSize() const {
286   if (bytes_sent_ > send_window_offset_) {
287     return 0;
288   }
289   return send_window_offset_ - bytes_sent_;
290 }
291 
UpdateReceiveWindowSize(QuicStreamOffset size)292 void QuicFlowController::UpdateReceiveWindowSize(QuicStreamOffset size) {
293   QUICHE_DCHECK_LE(size, receive_window_size_limit_);
294   QUIC_DVLOG(1) << ENDPOINT << "UpdateReceiveWindowSize for " << LogLabel()
295                 << ": " << size;
296   if (receive_window_size_ != receive_window_offset_) {
297     QUIC_BUG(quic_bug_10836_2)
298         << "receive_window_size_:" << receive_window_size_
299         << " != receive_window_offset:" << receive_window_offset_;
300     return;
301   }
302   receive_window_size_ = size;
303   receive_window_offset_ = size;
304 }
305 
SendWindowUpdate()306 void QuicFlowController::SendWindowUpdate() {
307   QuicStreamId id = id_;
308   if (is_connection_flow_controller_) {
309     id = QuicUtils::GetInvalidStreamId(connection_->transport_version());
310   }
311   session_->SendWindowUpdate(id, receive_window_offset_);
312 }
313 
314 }  // namespace quic
315