1 // Copyright (c) 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/test_tools/simple_session_notifier.h"
6
7 #include "quiche/quic/core/quic_utils.h"
8 #include "quiche/quic/platform/api/quic_logging.h"
9 #include "quiche/quic/test_tools/quic_test_utils.h"
10
11 namespace quic {
12
13 namespace test {
14
SimpleSessionNotifier(QuicConnection * connection)15 SimpleSessionNotifier::SimpleSessionNotifier(QuicConnection* connection)
16 : last_control_frame_id_(kInvalidControlFrameId),
17 least_unacked_(1),
18 least_unsent_(1),
19 connection_(connection) {}
20
~SimpleSessionNotifier()21 SimpleSessionNotifier::~SimpleSessionNotifier() {
22 while (!control_frames_.empty()) {
23 DeleteFrame(&control_frames_.front());
24 control_frames_.pop_front();
25 }
26 }
27
StreamState()28 SimpleSessionNotifier::StreamState::StreamState()
29 : bytes_total(0),
30 bytes_sent(0),
31 fin_buffered(false),
32 fin_sent(false),
33 fin_outstanding(false),
34 fin_lost(false) {}
35
~StreamState()36 SimpleSessionNotifier::StreamState::~StreamState() {}
37
WriteOrBufferData(QuicStreamId id,QuicByteCount data_length,StreamSendingState state)38 QuicConsumedData SimpleSessionNotifier::WriteOrBufferData(
39 QuicStreamId id, QuicByteCount data_length, StreamSendingState state) {
40 return WriteOrBufferData(id, data_length, state, NOT_RETRANSMISSION);
41 }
42
WriteOrBufferData(QuicStreamId id,QuicByteCount data_length,StreamSendingState state,TransmissionType transmission_type)43 QuicConsumedData SimpleSessionNotifier::WriteOrBufferData(
44 QuicStreamId id, QuicByteCount data_length, StreamSendingState state,
45 TransmissionType transmission_type) {
46 if (!stream_map_.contains(id)) {
47 stream_map_[id] = StreamState();
48 }
49 StreamState& stream_state = stream_map_.find(id)->second;
50 const bool had_buffered_data =
51 HasBufferedStreamData() || HasBufferedControlFrames();
52 QuicStreamOffset offset = stream_state.bytes_sent;
53 QUIC_DVLOG(1) << "WriteOrBuffer stream_id: " << id << " [" << offset << ", "
54 << offset + data_length << "), fin: " << (state != NO_FIN);
55 stream_state.bytes_total += data_length;
56 stream_state.fin_buffered = state != NO_FIN;
57 if (had_buffered_data) {
58 QUIC_DLOG(WARNING) << "Connection is write blocked";
59 return {0, false};
60 }
61 const size_t length = stream_state.bytes_total - stream_state.bytes_sent;
62 connection_->SetTransmissionType(transmission_type);
63 QuicConsumedData consumed =
64 connection_->SendStreamData(id, length, stream_state.bytes_sent, state);
65 QUIC_DVLOG(1) << "consumed: " << consumed;
66 OnStreamDataConsumed(id, stream_state.bytes_sent, consumed.bytes_consumed,
67 consumed.fin_consumed);
68 return consumed;
69 }
70
OnStreamDataConsumed(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,bool fin)71 void SimpleSessionNotifier::OnStreamDataConsumed(QuicStreamId id,
72 QuicStreamOffset offset,
73 QuicByteCount data_length,
74 bool fin) {
75 StreamState& state = stream_map_.find(id)->second;
76 if (QuicUtils::IsCryptoStreamId(connection_->transport_version(), id) &&
77 data_length > 0) {
78 crypto_bytes_transferred_[connection_->encryption_level()].Add(
79 offset, offset + data_length);
80 }
81 state.bytes_sent += data_length;
82 state.fin_sent = fin;
83 state.fin_outstanding = fin;
84 }
85
WriteCryptoData(EncryptionLevel level,QuicByteCount data_length,QuicStreamOffset offset)86 size_t SimpleSessionNotifier::WriteCryptoData(EncryptionLevel level,
87 QuicByteCount data_length,
88 QuicStreamOffset offset) {
89 crypto_state_[level].bytes_total += data_length;
90 size_t bytes_written =
91 connection_->SendCryptoData(level, data_length, offset);
92 crypto_state_[level].bytes_sent += bytes_written;
93 crypto_bytes_transferred_[level].Add(offset, offset + bytes_written);
94 return bytes_written;
95 }
96
WriteOrBufferRstStream(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)97 void SimpleSessionNotifier::WriteOrBufferRstStream(
98 QuicStreamId id, QuicRstStreamErrorCode error,
99 QuicStreamOffset bytes_written) {
100 QUIC_DVLOG(1) << "Writing RST_STREAM_FRAME";
101 const bool had_buffered_data =
102 HasBufferedStreamData() || HasBufferedControlFrames();
103 control_frames_.emplace_back((QuicFrame(new QuicRstStreamFrame(
104 ++last_control_frame_id_, id, error, bytes_written))));
105 if (error != QUIC_STREAM_NO_ERROR) {
106 // Delete stream to avoid retransmissions.
107 stream_map_.erase(id);
108 }
109 if (had_buffered_data) {
110 QUIC_DLOG(WARNING) << "Connection is write blocked";
111 return;
112 }
113 WriteBufferedControlFrames();
114 }
115
WriteOrBufferWindowUpate(QuicStreamId id,QuicStreamOffset byte_offset)116 void SimpleSessionNotifier::WriteOrBufferWindowUpate(
117 QuicStreamId id, QuicStreamOffset byte_offset) {
118 QUIC_DVLOG(1) << "Writing WINDOW_UPDATE";
119 const bool had_buffered_data =
120 HasBufferedStreamData() || HasBufferedControlFrames();
121 QuicControlFrameId control_frame_id = ++last_control_frame_id_;
122 control_frames_.emplace_back(
123 (QuicFrame(QuicWindowUpdateFrame(control_frame_id, id, byte_offset))));
124 if (had_buffered_data) {
125 QUIC_DLOG(WARNING) << "Connection is write blocked";
126 return;
127 }
128 WriteBufferedControlFrames();
129 }
130
WriteOrBufferPing()131 void SimpleSessionNotifier::WriteOrBufferPing() {
132 QUIC_DVLOG(1) << "Writing PING_FRAME";
133 const bool had_buffered_data =
134 HasBufferedStreamData() || HasBufferedControlFrames();
135 control_frames_.emplace_back(
136 (QuicFrame(QuicPingFrame(++last_control_frame_id_))));
137 if (had_buffered_data) {
138 QUIC_DLOG(WARNING) << "Connection is write blocked";
139 return;
140 }
141 WriteBufferedControlFrames();
142 }
143
WriteOrBufferAckFrequency(const QuicAckFrequencyFrame & ack_frequency_frame)144 void SimpleSessionNotifier::WriteOrBufferAckFrequency(
145 const QuicAckFrequencyFrame& ack_frequency_frame) {
146 QUIC_DVLOG(1) << "Writing ACK_FREQUENCY";
147 const bool had_buffered_data =
148 HasBufferedStreamData() || HasBufferedControlFrames();
149 QuicControlFrameId control_frame_id = ++last_control_frame_id_;
150 control_frames_.emplace_back((
151 QuicFrame(new QuicAckFrequencyFrame(control_frame_id,
152 /*sequence_number=*/control_frame_id,
153 ack_frequency_frame.packet_tolerance,
154 ack_frequency_frame.max_ack_delay))));
155 if (had_buffered_data) {
156 QUIC_DLOG(WARNING) << "Connection is write blocked";
157 return;
158 }
159 WriteBufferedControlFrames();
160 }
161
NeuterUnencryptedData()162 void SimpleSessionNotifier::NeuterUnencryptedData() {
163 if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
164 for (const auto& interval : crypto_bytes_transferred_[ENCRYPTION_INITIAL]) {
165 QuicCryptoFrame crypto_frame(ENCRYPTION_INITIAL, interval.min(),
166 interval.max() - interval.min());
167 OnFrameAcked(QuicFrame(&crypto_frame), QuicTime::Delta::Zero(),
168 QuicTime::Zero());
169 }
170 return;
171 }
172 for (const auto& interval : crypto_bytes_transferred_[ENCRYPTION_INITIAL]) {
173 QuicStreamFrame stream_frame(
174 QuicUtils::GetCryptoStreamId(connection_->transport_version()), false,
175 interval.min(), interval.max() - interval.min());
176 OnFrameAcked(QuicFrame(stream_frame), QuicTime::Delta::Zero(),
177 QuicTime::Zero());
178 }
179 }
180
OnCanWrite()181 void SimpleSessionNotifier::OnCanWrite() {
182 if (connection_->framer().is_processing_packet()) {
183 // Do not write data in the middle of packet processing because rest
184 // frames in the packet may change the data to write. For example, lost
185 // data could be acknowledged. Also, connection is going to emit
186 // OnCanWrite signal post packet processing.
187 QUIC_BUG(simple_notifier_write_mid_packet_processing)
188 << "Try to write mid packet processing.";
189 return;
190 }
191 if (!RetransmitLostCryptoData() || !RetransmitLostControlFrames() ||
192 !RetransmitLostStreamData()) {
193 return;
194 }
195 if (!WriteBufferedCryptoData() || !WriteBufferedControlFrames()) {
196 return;
197 }
198 // Write new data.
199 for (const auto& pair : stream_map_) {
200 const auto& state = pair.second;
201 if (!StreamHasBufferedData(pair.first)) {
202 continue;
203 }
204
205 const size_t length = state.bytes_total - state.bytes_sent;
206 const bool can_bundle_fin =
207 state.fin_buffered && (state.bytes_sent + length == state.bytes_total);
208 connection_->SetTransmissionType(NOT_RETRANSMISSION);
209 QuicConnection::ScopedEncryptionLevelContext context(
210 connection_,
211 connection_->framer().GetEncryptionLevelToSendApplicationData());
212 QuicConsumedData consumed = connection_->SendStreamData(
213 pair.first, length, state.bytes_sent, can_bundle_fin ? FIN : NO_FIN);
214 QUIC_DVLOG(1) << "Tries to write stream_id: " << pair.first << " ["
215 << state.bytes_sent << ", " << state.bytes_sent + length
216 << "), fin: " << can_bundle_fin
217 << ", and consumed: " << consumed;
218 OnStreamDataConsumed(pair.first, state.bytes_sent, consumed.bytes_consumed,
219 consumed.fin_consumed);
220 if (length != consumed.bytes_consumed ||
221 (can_bundle_fin && !consumed.fin_consumed)) {
222 break;
223 }
224 }
225 }
226
OnStreamReset(QuicStreamId id,QuicRstStreamErrorCode error)227 void SimpleSessionNotifier::OnStreamReset(QuicStreamId id,
228 QuicRstStreamErrorCode error) {
229 if (error != QUIC_STREAM_NO_ERROR) {
230 // Delete stream to avoid retransmissions.
231 stream_map_.erase(id);
232 }
233 }
234
WillingToWrite() const235 bool SimpleSessionNotifier::WillingToWrite() const {
236 QUIC_DVLOG(1) << "has_buffered_control_frames: " << HasBufferedControlFrames()
237 << " as_lost_control_frames: " << !lost_control_frames_.empty()
238 << " has_buffered_stream_data: " << HasBufferedStreamData()
239 << " has_lost_stream_data: " << HasLostStreamData();
240 return HasBufferedControlFrames() || !lost_control_frames_.empty() ||
241 HasBufferedStreamData() || HasLostStreamData();
242 }
243
StreamBytesSent() const244 QuicByteCount SimpleSessionNotifier::StreamBytesSent() const {
245 QuicByteCount bytes_sent = 0;
246 for (const auto& pair : stream_map_) {
247 const auto& state = pair.second;
248 bytes_sent += state.bytes_sent;
249 }
250 return bytes_sent;
251 }
252
StreamBytesToSend() const253 QuicByteCount SimpleSessionNotifier::StreamBytesToSend() const {
254 QuicByteCount bytes_to_send = 0;
255 for (const auto& pair : stream_map_) {
256 const auto& state = pair.second;
257 bytes_to_send += (state.bytes_total - state.bytes_sent);
258 }
259 return bytes_to_send;
260 }
261
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta,QuicTime)262 bool SimpleSessionNotifier::OnFrameAcked(const QuicFrame& frame,
263 QuicTime::Delta /*ack_delay_time*/,
264 QuicTime /*receive_timestamp*/) {
265 QUIC_DVLOG(1) << "Acking " << frame;
266 if (frame.type == CRYPTO_FRAME) {
267 StreamState* state = &crypto_state_[frame.crypto_frame->level];
268 QuicStreamOffset offset = frame.crypto_frame->offset;
269 QuicByteCount data_length = frame.crypto_frame->data_length;
270 QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
271 newly_acked.Difference(state->bytes_acked);
272 if (newly_acked.Empty()) {
273 return false;
274 }
275 state->bytes_acked.Add(offset, offset + data_length);
276 state->pending_retransmissions.Difference(offset, offset + data_length);
277 return true;
278 }
279 if (frame.type != STREAM_FRAME) {
280 return OnControlFrameAcked(frame);
281 }
282 if (!stream_map_.contains(frame.stream_frame.stream_id)) {
283 return false;
284 }
285 auto* state = &stream_map_.find(frame.stream_frame.stream_id)->second;
286 QuicStreamOffset offset = frame.stream_frame.offset;
287 QuicByteCount data_length = frame.stream_frame.data_length;
288 QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
289 newly_acked.Difference(state->bytes_acked);
290 const bool fin_newly_acked = frame.stream_frame.fin && state->fin_outstanding;
291 if (newly_acked.Empty() && !fin_newly_acked) {
292 return false;
293 }
294 state->bytes_acked.Add(offset, offset + data_length);
295 if (fin_newly_acked) {
296 state->fin_outstanding = false;
297 state->fin_lost = false;
298 }
299 state->pending_retransmissions.Difference(offset, offset + data_length);
300 return true;
301 }
302
OnFrameLost(const QuicFrame & frame)303 void SimpleSessionNotifier::OnFrameLost(const QuicFrame& frame) {
304 QUIC_DVLOG(1) << "Losting " << frame;
305 if (frame.type == CRYPTO_FRAME) {
306 StreamState* state = &crypto_state_[frame.crypto_frame->level];
307 QuicStreamOffset offset = frame.crypto_frame->offset;
308 QuicByteCount data_length = frame.crypto_frame->data_length;
309 QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
310 bytes_lost.Difference(state->bytes_acked);
311 if (bytes_lost.Empty()) {
312 return;
313 }
314 for (const auto& lost : bytes_lost) {
315 state->pending_retransmissions.Add(lost.min(), lost.max());
316 }
317 return;
318 }
319 if (frame.type != STREAM_FRAME) {
320 OnControlFrameLost(frame);
321 return;
322 }
323 if (!stream_map_.contains(frame.stream_frame.stream_id)) {
324 return;
325 }
326 auto* state = &stream_map_.find(frame.stream_frame.stream_id)->second;
327 QuicStreamOffset offset = frame.stream_frame.offset;
328 QuicByteCount data_length = frame.stream_frame.data_length;
329 QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
330 bytes_lost.Difference(state->bytes_acked);
331 const bool fin_lost = state->fin_outstanding && frame.stream_frame.fin;
332 if (bytes_lost.Empty() && !fin_lost) {
333 return;
334 }
335 for (const auto& lost : bytes_lost) {
336 state->pending_retransmissions.Add(lost.min(), lost.max());
337 }
338 state->fin_lost = fin_lost;
339 }
340
RetransmitFrames(const QuicFrames & frames,TransmissionType type)341 bool SimpleSessionNotifier::RetransmitFrames(const QuicFrames& frames,
342 TransmissionType type) {
343 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
344 connection_->SetTransmissionType(type);
345 for (const QuicFrame& frame : frames) {
346 if (frame.type == CRYPTO_FRAME) {
347 const StreamState& state = crypto_state_[frame.crypto_frame->level];
348 const EncryptionLevel current_encryption_level =
349 connection_->encryption_level();
350 QuicIntervalSet<QuicStreamOffset> retransmission(
351 frame.crypto_frame->offset,
352 frame.crypto_frame->offset + frame.crypto_frame->data_length);
353 retransmission.Difference(state.bytes_acked);
354 for (const auto& interval : retransmission) {
355 QuicStreamOffset offset = interval.min();
356 QuicByteCount length = interval.max() - interval.min();
357 connection_->SetDefaultEncryptionLevel(frame.crypto_frame->level);
358 size_t consumed = connection_->SendCryptoData(frame.crypto_frame->level,
359 length, offset);
360 if (consumed < length) {
361 return false;
362 }
363 }
364 connection_->SetDefaultEncryptionLevel(current_encryption_level);
365 }
366 if (frame.type != STREAM_FRAME) {
367 if (GetControlFrameId(frame) == kInvalidControlFrameId) {
368 continue;
369 }
370 QuicFrame copy = CopyRetransmittableControlFrame(frame);
371 if (!connection_->SendControlFrame(copy)) {
372 // Connection is write blocked.
373 DeleteFrame(©);
374 return false;
375 }
376 continue;
377 }
378 if (!stream_map_.contains(frame.stream_frame.stream_id)) {
379 continue;
380 }
381 const auto& state = stream_map_.find(frame.stream_frame.stream_id)->second;
382 QuicIntervalSet<QuicStreamOffset> retransmission(
383 frame.stream_frame.offset,
384 frame.stream_frame.offset + frame.stream_frame.data_length);
385 EncryptionLevel retransmission_encryption_level =
386 connection_->encryption_level();
387 if (QuicUtils::IsCryptoStreamId(connection_->transport_version(),
388 frame.stream_frame.stream_id)) {
389 for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
390 if (retransmission.Intersects(crypto_bytes_transferred_[i])) {
391 retransmission_encryption_level = static_cast<EncryptionLevel>(i);
392 retransmission.Intersection(crypto_bytes_transferred_[i]);
393 break;
394 }
395 }
396 }
397 retransmission.Difference(state.bytes_acked);
398 bool retransmit_fin = frame.stream_frame.fin && state.fin_outstanding;
399 QuicConsumedData consumed(0, false);
400 for (const auto& interval : retransmission) {
401 QuicStreamOffset retransmission_offset = interval.min();
402 QuicByteCount retransmission_length = interval.max() - interval.min();
403 const bool can_bundle_fin =
404 retransmit_fin &&
405 (retransmission_offset + retransmission_length == state.bytes_sent);
406 QuicConnection::ScopedEncryptionLevelContext context(
407 connection_,
408 QuicUtils::IsCryptoStreamId(connection_->transport_version(),
409 frame.stream_frame.stream_id)
410 ? retransmission_encryption_level
411 : connection_->framer()
412 .GetEncryptionLevelToSendApplicationData());
413 consumed = connection_->SendStreamData(
414 frame.stream_frame.stream_id, retransmission_length,
415 retransmission_offset, can_bundle_fin ? FIN : NO_FIN);
416 QUIC_DVLOG(1) << "stream " << frame.stream_frame.stream_id
417 << " is forced to retransmit stream data ["
418 << retransmission_offset << ", "
419 << retransmission_offset + retransmission_length
420 << ") and fin: " << can_bundle_fin
421 << ", consumed: " << consumed;
422 if (can_bundle_fin) {
423 retransmit_fin = !consumed.fin_consumed;
424 }
425 if (consumed.bytes_consumed < retransmission_length ||
426 (can_bundle_fin && !consumed.fin_consumed)) {
427 // Connection is write blocked.
428 return false;
429 }
430 }
431 if (retransmit_fin) {
432 QUIC_DVLOG(1) << "stream " << frame.stream_frame.stream_id
433 << " retransmits fin only frame.";
434 consumed = connection_->SendStreamData(frame.stream_frame.stream_id, 0,
435 state.bytes_sent, FIN);
436 if (!consumed.fin_consumed) {
437 return false;
438 }
439 }
440 }
441 return true;
442 }
443
IsFrameOutstanding(const QuicFrame & frame) const444 bool SimpleSessionNotifier::IsFrameOutstanding(const QuicFrame& frame) const {
445 if (frame.type == CRYPTO_FRAME) {
446 QuicStreamOffset offset = frame.crypto_frame->offset;
447 QuicByteCount data_length = frame.crypto_frame->data_length;
448 bool ret = data_length > 0 &&
449 !crypto_state_[frame.crypto_frame->level].bytes_acked.Contains(
450 offset, offset + data_length);
451 return ret;
452 }
453 if (frame.type != STREAM_FRAME) {
454 return IsControlFrameOutstanding(frame);
455 }
456 if (!stream_map_.contains(frame.stream_frame.stream_id)) {
457 return false;
458 }
459 const auto& state = stream_map_.find(frame.stream_frame.stream_id)->second;
460 QuicStreamOffset offset = frame.stream_frame.offset;
461 QuicByteCount data_length = frame.stream_frame.data_length;
462 return (data_length > 0 &&
463 !state.bytes_acked.Contains(offset, offset + data_length)) ||
464 (frame.stream_frame.fin && state.fin_outstanding);
465 }
466
HasUnackedCryptoData() const467 bool SimpleSessionNotifier::HasUnackedCryptoData() const {
468 if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
469 for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
470 const StreamState& state = crypto_state_[i];
471 if (state.bytes_total > state.bytes_sent) {
472 return true;
473 }
474 QuicIntervalSet<QuicStreamOffset> bytes_to_ack(0, state.bytes_total);
475 bytes_to_ack.Difference(state.bytes_acked);
476 if (!bytes_to_ack.Empty()) {
477 return true;
478 }
479 }
480 return false;
481 }
482 if (!stream_map_.contains(
483 QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
484 return false;
485 }
486 const auto& state =
487 stream_map_
488 .find(QuicUtils::GetCryptoStreamId(connection_->transport_version()))
489 ->second;
490 if (state.bytes_total > state.bytes_sent) {
491 return true;
492 }
493 QuicIntervalSet<QuicStreamOffset> bytes_to_ack(0, state.bytes_total);
494 bytes_to_ack.Difference(state.bytes_acked);
495 return !bytes_to_ack.Empty();
496 }
497
HasUnackedStreamData() const498 bool SimpleSessionNotifier::HasUnackedStreamData() const {
499 for (const auto& it : stream_map_) {
500 if (StreamIsWaitingForAcks(it.first)) return true;
501 }
502 return false;
503 }
504
OnControlFrameAcked(const QuicFrame & frame)505 bool SimpleSessionNotifier::OnControlFrameAcked(const QuicFrame& frame) {
506 QuicControlFrameId id = GetControlFrameId(frame);
507 if (id == kInvalidControlFrameId) {
508 return false;
509 }
510 QUICHE_DCHECK(id < least_unacked_ + control_frames_.size());
511 if (id < least_unacked_ ||
512 GetControlFrameId(control_frames_.at(id - least_unacked_)) ==
513 kInvalidControlFrameId) {
514 return false;
515 }
516 SetControlFrameId(kInvalidControlFrameId,
517 &control_frames_.at(id - least_unacked_));
518 lost_control_frames_.erase(id);
519 while (!control_frames_.empty() &&
520 GetControlFrameId(control_frames_.front()) == kInvalidControlFrameId) {
521 DeleteFrame(&control_frames_.front());
522 control_frames_.pop_front();
523 ++least_unacked_;
524 }
525 return true;
526 }
527
OnControlFrameLost(const QuicFrame & frame)528 void SimpleSessionNotifier::OnControlFrameLost(const QuicFrame& frame) {
529 QuicControlFrameId id = GetControlFrameId(frame);
530 if (id == kInvalidControlFrameId) {
531 return;
532 }
533 QUICHE_DCHECK(id < least_unacked_ + control_frames_.size());
534 if (id < least_unacked_ ||
535 GetControlFrameId(control_frames_.at(id - least_unacked_)) ==
536 kInvalidControlFrameId) {
537 return;
538 }
539 if (!lost_control_frames_.contains(id)) {
540 lost_control_frames_[id] = true;
541 }
542 }
543
IsControlFrameOutstanding(const QuicFrame & frame) const544 bool SimpleSessionNotifier::IsControlFrameOutstanding(
545 const QuicFrame& frame) const {
546 QuicControlFrameId id = GetControlFrameId(frame);
547 if (id == kInvalidControlFrameId) {
548 return false;
549 }
550 return id < least_unacked_ + control_frames_.size() && id >= least_unacked_ &&
551 GetControlFrameId(control_frames_.at(id - least_unacked_)) !=
552 kInvalidControlFrameId;
553 }
554
RetransmitLostControlFrames()555 bool SimpleSessionNotifier::RetransmitLostControlFrames() {
556 while (!lost_control_frames_.empty()) {
557 QuicFrame pending = control_frames_.at(lost_control_frames_.begin()->first -
558 least_unacked_);
559 QuicFrame copy = CopyRetransmittableControlFrame(pending);
560 connection_->SetTransmissionType(LOSS_RETRANSMISSION);
561 if (!connection_->SendControlFrame(copy)) {
562 // Connection is write blocked.
563 DeleteFrame(©);
564 break;
565 }
566 lost_control_frames_.pop_front();
567 }
568 return lost_control_frames_.empty();
569 }
570
RetransmitLostCryptoData()571 bool SimpleSessionNotifier::RetransmitLostCryptoData() {
572 if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
573 for (EncryptionLevel level :
574 {ENCRYPTION_INITIAL, ENCRYPTION_HANDSHAKE, ENCRYPTION_ZERO_RTT,
575 ENCRYPTION_FORWARD_SECURE}) {
576 auto& state = crypto_state_[level];
577 while (!state.pending_retransmissions.Empty()) {
578 connection_->SetTransmissionType(HANDSHAKE_RETRANSMISSION);
579 EncryptionLevel current_encryption_level =
580 connection_->encryption_level();
581 connection_->SetDefaultEncryptionLevel(level);
582 QuicIntervalSet<QuicStreamOffset> retransmission(
583 state.pending_retransmissions.begin()->min(),
584 state.pending_retransmissions.begin()->max());
585 retransmission.Intersection(crypto_bytes_transferred_[level]);
586 QuicStreamOffset retransmission_offset = retransmission.begin()->min();
587 QuicByteCount retransmission_length =
588 retransmission.begin()->max() - retransmission.begin()->min();
589 size_t bytes_consumed = connection_->SendCryptoData(
590 level, retransmission_length, retransmission_offset);
591 // Restore encryption level.
592 connection_->SetDefaultEncryptionLevel(current_encryption_level);
593 state.pending_retransmissions.Difference(
594 retransmission_offset, retransmission_offset + bytes_consumed);
595 if (bytes_consumed < retransmission_length) {
596 return false;
597 }
598 }
599 }
600 return true;
601 }
602 if (!stream_map_.contains(
603 QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
604 return true;
605 }
606 auto& state =
607 stream_map_
608 .find(QuicUtils::GetCryptoStreamId(connection_->transport_version()))
609 ->second;
610 while (!state.pending_retransmissions.Empty()) {
611 connection_->SetTransmissionType(HANDSHAKE_RETRANSMISSION);
612 QuicIntervalSet<QuicStreamOffset> retransmission(
613 state.pending_retransmissions.begin()->min(),
614 state.pending_retransmissions.begin()->max());
615 EncryptionLevel retransmission_encryption_level = ENCRYPTION_INITIAL;
616 for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
617 if (retransmission.Intersects(crypto_bytes_transferred_[i])) {
618 retransmission_encryption_level = static_cast<EncryptionLevel>(i);
619 retransmission.Intersection(crypto_bytes_transferred_[i]);
620 break;
621 }
622 }
623 QuicStreamOffset retransmission_offset = retransmission.begin()->min();
624 QuicByteCount retransmission_length =
625 retransmission.begin()->max() - retransmission.begin()->min();
626 EncryptionLevel current_encryption_level = connection_->encryption_level();
627 // Set appropriate encryption level.
628 connection_->SetDefaultEncryptionLevel(retransmission_encryption_level);
629 QuicConsumedData consumed = connection_->SendStreamData(
630 QuicUtils::GetCryptoStreamId(connection_->transport_version()),
631 retransmission_length, retransmission_offset, NO_FIN);
632 // Restore encryption level.
633 connection_->SetDefaultEncryptionLevel(current_encryption_level);
634 state.pending_retransmissions.Difference(
635 retransmission_offset, retransmission_offset + consumed.bytes_consumed);
636 if (consumed.bytes_consumed < retransmission_length) {
637 break;
638 }
639 }
640 return state.pending_retransmissions.Empty();
641 }
642
RetransmitLostStreamData()643 bool SimpleSessionNotifier::RetransmitLostStreamData() {
644 for (auto& pair : stream_map_) {
645 StreamState& state = pair.second;
646 QuicConsumedData consumed(0, false);
647 while (!state.pending_retransmissions.Empty() || state.fin_lost) {
648 connection_->SetTransmissionType(LOSS_RETRANSMISSION);
649 if (state.pending_retransmissions.Empty()) {
650 QUIC_DVLOG(1) << "stream " << pair.first
651 << " retransmits fin only frame.";
652 consumed =
653 connection_->SendStreamData(pair.first, 0, state.bytes_sent, FIN);
654 state.fin_lost = !consumed.fin_consumed;
655 if (state.fin_lost) {
656 QUIC_DLOG(INFO) << "Connection is write blocked";
657 return false;
658 }
659 } else {
660 QuicStreamOffset offset = state.pending_retransmissions.begin()->min();
661 QuicByteCount length = state.pending_retransmissions.begin()->max() -
662 state.pending_retransmissions.begin()->min();
663 const bool can_bundle_fin =
664 state.fin_lost && (offset + length == state.bytes_sent);
665 consumed = connection_->SendStreamData(pair.first, length, offset,
666 can_bundle_fin ? FIN : NO_FIN);
667 QUIC_DVLOG(1) << "stream " << pair.first
668 << " tries to retransmit stream data [" << offset << ", "
669 << offset + length << ") and fin: " << can_bundle_fin
670 << ", consumed: " << consumed;
671 state.pending_retransmissions.Difference(
672 offset, offset + consumed.bytes_consumed);
673 if (consumed.fin_consumed) {
674 state.fin_lost = false;
675 }
676 if (length > consumed.bytes_consumed ||
677 (can_bundle_fin && !consumed.fin_consumed)) {
678 QUIC_DVLOG(1) << "Connection is write blocked";
679 break;
680 }
681 }
682 }
683 }
684 return !HasLostStreamData();
685 }
686
WriteBufferedCryptoData()687 bool SimpleSessionNotifier::WriteBufferedCryptoData() {
688 for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
689 const StreamState& state = crypto_state_[i];
690 QuicIntervalSet<QuicStreamOffset> buffered_crypto_data(0,
691 state.bytes_total);
692 buffered_crypto_data.Difference(crypto_bytes_transferred_[i]);
693 for (const auto& interval : buffered_crypto_data) {
694 size_t bytes_written = connection_->SendCryptoData(
695 static_cast<EncryptionLevel>(i), interval.Length(), interval.min());
696 crypto_state_[i].bytes_sent += bytes_written;
697 crypto_bytes_transferred_[i].Add(interval.min(),
698 interval.min() + bytes_written);
699 if (bytes_written < interval.Length()) {
700 return false;
701 }
702 }
703 }
704 return true;
705 }
706
WriteBufferedControlFrames()707 bool SimpleSessionNotifier::WriteBufferedControlFrames() {
708 while (HasBufferedControlFrames()) {
709 QuicFrame frame_to_send =
710 control_frames_.at(least_unsent_ - least_unacked_);
711 QuicFrame copy = CopyRetransmittableControlFrame(frame_to_send);
712 connection_->SetTransmissionType(NOT_RETRANSMISSION);
713 if (!connection_->SendControlFrame(copy)) {
714 // Connection is write blocked.
715 DeleteFrame(©);
716 break;
717 }
718 ++least_unsent_;
719 }
720 return !HasBufferedControlFrames();
721 }
722
HasBufferedControlFrames() const723 bool SimpleSessionNotifier::HasBufferedControlFrames() const {
724 return least_unsent_ < least_unacked_ + control_frames_.size();
725 }
726
HasBufferedStreamData() const727 bool SimpleSessionNotifier::HasBufferedStreamData() const {
728 for (const auto& pair : stream_map_) {
729 const auto& state = pair.second;
730 if (state.bytes_total > state.bytes_sent ||
731 (state.fin_buffered && !state.fin_sent)) {
732 return true;
733 }
734 }
735 return false;
736 }
737
StreamIsWaitingForAcks(QuicStreamId id) const738 bool SimpleSessionNotifier::StreamIsWaitingForAcks(QuicStreamId id) const {
739 if (!stream_map_.contains(id)) {
740 return false;
741 }
742 const StreamState& state = stream_map_.find(id)->second;
743 return !state.bytes_acked.Contains(0, state.bytes_sent) ||
744 state.fin_outstanding;
745 }
746
StreamHasBufferedData(QuicStreamId id) const747 bool SimpleSessionNotifier::StreamHasBufferedData(QuicStreamId id) const {
748 if (!stream_map_.contains(id)) {
749 return false;
750 }
751 const StreamState& state = stream_map_.find(id)->second;
752 return state.bytes_total > state.bytes_sent ||
753 (state.fin_buffered && !state.fin_sent);
754 }
755
HasLostStreamData() const756 bool SimpleSessionNotifier::HasLostStreamData() const {
757 for (const auto& pair : stream_map_) {
758 const auto& state = pair.second;
759 if (!state.pending_retransmissions.Empty() || state.fin_lost) {
760 return true;
761 }
762 }
763 return false;
764 }
765
766 } // namespace test
767
768 } // namespace quic
769