1 // Copyright (c) 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_stream_send_buffer.h"
6
7 #include <algorithm>
8
9 #include "quiche/quic/core/quic_data_writer.h"
10 #include "quiche/quic/core/quic_interval.h"
11 #include "quiche/quic/core/quic_utils.h"
12 #include "quiche/quic/platform/api/quic_bug_tracker.h"
13 #include "quiche/quic/platform/api/quic_flag_utils.h"
14 #include "quiche/quic/platform/api/quic_flags.h"
15 #include "quiche/quic/platform/api/quic_logging.h"
16 #include "quiche/common/platform/api/quiche_mem_slice.h"
17
18 namespace quic {
19
20 namespace {
21
22 struct CompareOffset {
operator ()quic::__anonddb8548f0111::CompareOffset23 bool operator()(const BufferedSlice& slice, QuicStreamOffset offset) const {
24 return slice.offset + slice.slice.length() < offset;
25 }
26 };
27
28 } // namespace
29
BufferedSlice(quiche::QuicheMemSlice mem_slice,QuicStreamOffset offset)30 BufferedSlice::BufferedSlice(quiche::QuicheMemSlice mem_slice,
31 QuicStreamOffset offset)
32 : slice(std::move(mem_slice)), offset(offset) {}
33
34 BufferedSlice::BufferedSlice(BufferedSlice&& other) = default;
35
36 BufferedSlice& BufferedSlice::operator=(BufferedSlice&& other) = default;
37
~BufferedSlice()38 BufferedSlice::~BufferedSlice() {}
39
interval() const40 QuicInterval<std::size_t> BufferedSlice::interval() const {
41 const std::size_t length = slice.length();
42 return QuicInterval<std::size_t>(offset, offset + length);
43 }
44
operator ==(const StreamPendingRetransmission & other) const45 bool StreamPendingRetransmission::operator==(
46 const StreamPendingRetransmission& other) const {
47 return offset == other.offset && length == other.length;
48 }
49
QuicStreamSendBuffer(quiche::QuicheBufferAllocator * allocator)50 QuicStreamSendBuffer::QuicStreamSendBuffer(
51 quiche::QuicheBufferAllocator* allocator)
52 : current_end_offset_(0),
53 stream_offset_(0),
54 allocator_(allocator),
55 stream_bytes_written_(0),
56 stream_bytes_outstanding_(0),
57 write_index_(-1) {}
58
~QuicStreamSendBuffer()59 QuicStreamSendBuffer::~QuicStreamSendBuffer() {}
60
SaveStreamData(absl::string_view data)61 void QuicStreamSendBuffer::SaveStreamData(absl::string_view data) {
62 QUICHE_DCHECK(!data.empty());
63
64 // Latch the maximum data slice size.
65 const QuicByteCount max_data_slice_size =
66 GetQuicFlag(quic_send_buffer_max_data_slice_size);
67 while (!data.empty()) {
68 auto slice_len = std::min<absl::string_view::size_type>(
69 data.length(), max_data_slice_size);
70 auto buffer =
71 quiche::QuicheBuffer::Copy(allocator_, data.substr(0, slice_len));
72 SaveMemSlice(quiche::QuicheMemSlice(std::move(buffer)));
73
74 data = data.substr(slice_len);
75 }
76 }
77
SaveMemSlice(quiche::QuicheMemSlice slice)78 void QuicStreamSendBuffer::SaveMemSlice(quiche::QuicheMemSlice slice) {
79 QUIC_DVLOG(2) << "Save slice offset " << stream_offset_ << " length "
80 << slice.length();
81 if (slice.empty()) {
82 QUIC_BUG(quic_bug_10853_1) << "Try to save empty MemSlice to send buffer.";
83 return;
84 }
85 size_t length = slice.length();
86 // Need to start the offsets at the right interval.
87 if (interval_deque_.Empty()) {
88 const QuicStreamOffset end = stream_offset_ + length;
89 current_end_offset_ = std::max(current_end_offset_, end);
90 }
91 BufferedSlice bs = BufferedSlice(std::move(slice), stream_offset_);
92 interval_deque_.PushBack(std::move(bs));
93 stream_offset_ += length;
94 }
95
SaveMemSliceSpan(absl::Span<quiche::QuicheMemSlice> span)96 QuicByteCount QuicStreamSendBuffer::SaveMemSliceSpan(
97 absl::Span<quiche::QuicheMemSlice> span) {
98 QuicByteCount total = 0;
99 for (quiche::QuicheMemSlice& slice : span) {
100 if (slice.length() == 0) {
101 // Skip empty slices.
102 continue;
103 }
104 total += slice.length();
105 SaveMemSlice(std::move(slice));
106 }
107 return total;
108 }
109
OnStreamDataConsumed(size_t bytes_consumed)110 void QuicStreamSendBuffer::OnStreamDataConsumed(size_t bytes_consumed) {
111 stream_bytes_written_ += bytes_consumed;
112 stream_bytes_outstanding_ += bytes_consumed;
113 }
114
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)115 bool QuicStreamSendBuffer::WriteStreamData(QuicStreamOffset offset,
116 QuicByteCount data_length,
117 QuicDataWriter* writer) {
118 QUIC_BUG_IF(quic_bug_12823_1, current_end_offset_ < offset)
119 << "Tried to write data out of sequence. last_offset_end:"
120 << current_end_offset_ << ", offset:" << offset;
121 // The iterator returned from |interval_deque_| will automatically advance
122 // the internal write index for the QuicIntervalDeque. The incrementing is
123 // done in operator++.
124 for (auto slice_it = interval_deque_.DataAt(offset);
125 slice_it != interval_deque_.DataEnd(); ++slice_it) {
126 if (data_length == 0 || offset < slice_it->offset) {
127 break;
128 }
129
130 QuicByteCount slice_offset = offset - slice_it->offset;
131 QuicByteCount available_bytes_in_slice =
132 slice_it->slice.length() - slice_offset;
133 QuicByteCount copy_length = std::min(data_length, available_bytes_in_slice);
134 if (!writer->WriteBytes(slice_it->slice.data() + slice_offset,
135 copy_length)) {
136 QUIC_BUG(quic_bug_10853_2) << "Writer fails to write.";
137 return false;
138 }
139 offset += copy_length;
140 data_length -= copy_length;
141 const QuicStreamOffset new_end =
142 slice_it->offset + slice_it->slice.length();
143 current_end_offset_ = std::max(current_end_offset_, new_end);
144 }
145 return data_length == 0;
146 }
147
OnStreamDataAcked(QuicStreamOffset offset,QuicByteCount data_length,QuicByteCount * newly_acked_length)148 bool QuicStreamSendBuffer::OnStreamDataAcked(
149 QuicStreamOffset offset, QuicByteCount data_length,
150 QuicByteCount* newly_acked_length) {
151 *newly_acked_length = 0;
152 if (data_length == 0) {
153 return true;
154 }
155 if (bytes_acked_.Empty() || offset >= bytes_acked_.rbegin()->max() ||
156 bytes_acked_.IsDisjoint(
157 QuicInterval<QuicStreamOffset>(offset, offset + data_length))) {
158 // Optimization for the typical case, when all data is newly acked.
159 if (stream_bytes_outstanding_ < data_length) {
160 return false;
161 }
162 bytes_acked_.AddOptimizedForAppend(offset, offset + data_length);
163 *newly_acked_length = data_length;
164 stream_bytes_outstanding_ -= data_length;
165 pending_retransmissions_.Difference(offset, offset + data_length);
166 if (!FreeMemSlices(offset, offset + data_length)) {
167 return false;
168 }
169 CleanUpBufferedSlices();
170 return true;
171 }
172 // Exit if no new data gets acked.
173 if (bytes_acked_.Contains(offset, offset + data_length)) {
174 return true;
175 }
176 // Execute the slow path if newly acked data fill in existing holes.
177 QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
178 newly_acked.Difference(bytes_acked_);
179 for (const auto& interval : newly_acked) {
180 *newly_acked_length += (interval.max() - interval.min());
181 }
182 if (stream_bytes_outstanding_ < *newly_acked_length) {
183 return false;
184 }
185 stream_bytes_outstanding_ -= *newly_acked_length;
186 bytes_acked_.Add(offset, offset + data_length);
187 pending_retransmissions_.Difference(offset, offset + data_length);
188 if (newly_acked.Empty()) {
189 return true;
190 }
191 if (!FreeMemSlices(newly_acked.begin()->min(), newly_acked.rbegin()->max())) {
192 return false;
193 }
194 CleanUpBufferedSlices();
195 return true;
196 }
197
OnStreamDataLost(QuicStreamOffset offset,QuicByteCount data_length)198 void QuicStreamSendBuffer::OnStreamDataLost(QuicStreamOffset offset,
199 QuicByteCount data_length) {
200 if (data_length == 0) {
201 return;
202 }
203 QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
204 bytes_lost.Difference(bytes_acked_);
205 if (bytes_lost.Empty()) {
206 return;
207 }
208 for (const auto& lost : bytes_lost) {
209 pending_retransmissions_.Add(lost.min(), lost.max());
210 }
211 }
212
OnStreamDataRetransmitted(QuicStreamOffset offset,QuicByteCount data_length)213 void QuicStreamSendBuffer::OnStreamDataRetransmitted(
214 QuicStreamOffset offset, QuicByteCount data_length) {
215 if (data_length == 0) {
216 return;
217 }
218 pending_retransmissions_.Difference(offset, offset + data_length);
219 }
220
HasPendingRetransmission() const221 bool QuicStreamSendBuffer::HasPendingRetransmission() const {
222 return !pending_retransmissions_.Empty();
223 }
224
NextPendingRetransmission() const225 StreamPendingRetransmission QuicStreamSendBuffer::NextPendingRetransmission()
226 const {
227 if (HasPendingRetransmission()) {
228 const auto pending = pending_retransmissions_.begin();
229 return {pending->min(), pending->max() - pending->min()};
230 }
231 QUIC_BUG(quic_bug_10853_3)
232 << "NextPendingRetransmission is called unexpected with no "
233 "pending retransmissions.";
234 return {0, 0};
235 }
236
FreeMemSlices(QuicStreamOffset start,QuicStreamOffset end)237 bool QuicStreamSendBuffer::FreeMemSlices(QuicStreamOffset start,
238 QuicStreamOffset end) {
239 auto it = interval_deque_.DataBegin();
240 if (it == interval_deque_.DataEnd() || it->slice.empty()) {
241 QUIC_BUG(quic_bug_10853_4)
242 << "Trying to ack stream data [" << start << ", " << end << "), "
243 << (it == interval_deque_.DataEnd()
244 ? "and there is no outstanding data."
245 : "and the first slice is empty.");
246 return false;
247 }
248 if (!it->interval().Contains(start)) {
249 // Slow path that not the earliest outstanding data gets acked.
250 it = std::lower_bound(interval_deque_.DataBegin(),
251 interval_deque_.DataEnd(), start, CompareOffset());
252 }
253 if (it == interval_deque_.DataEnd() || it->slice.empty()) {
254 QUIC_BUG(quic_bug_10853_5)
255 << "Offset " << start << " with iterator offset: " << it->offset
256 << (it == interval_deque_.DataEnd() ? " does not exist."
257 : " has already been acked.");
258 return false;
259 }
260 for (; it != interval_deque_.DataEnd(); ++it) {
261 if (it->offset >= end) {
262 break;
263 }
264 if (!it->slice.empty() &&
265 bytes_acked_.Contains(it->offset, it->offset + it->slice.length())) {
266 it->slice.Reset();
267 }
268 }
269 return true;
270 }
271
CleanUpBufferedSlices()272 void QuicStreamSendBuffer::CleanUpBufferedSlices() {
273 while (!interval_deque_.Empty() &&
274 interval_deque_.DataBegin()->slice.empty()) {
275 QUIC_BUG_IF(quic_bug_12823_2,
276 interval_deque_.DataBegin()->offset > current_end_offset_)
277 << "Fail to pop front from interval_deque_. Front element contained "
278 "a slice whose data has not all be written. Front offset "
279 << interval_deque_.DataBegin()->offset << " length "
280 << interval_deque_.DataBegin()->slice.length();
281 interval_deque_.PopFront();
282 }
283 }
284
IsStreamDataOutstanding(QuicStreamOffset offset,QuicByteCount data_length) const285 bool QuicStreamSendBuffer::IsStreamDataOutstanding(
286 QuicStreamOffset offset, QuicByteCount data_length) const {
287 return data_length > 0 &&
288 !bytes_acked_.Contains(offset, offset + data_length);
289 }
290
size() const291 size_t QuicStreamSendBuffer::size() const { return interval_deque_.Size(); }
292
293 } // namespace quic
294