xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/test_tools/simple_session_notifier.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/test_tools/simple_session_notifier.h"
6 
7 #include "quiche/quic/core/quic_utils.h"
8 #include "quiche/quic/platform/api/quic_logging.h"
9 #include "quiche/quic/test_tools/quic_test_utils.h"
10 
11 namespace quic {
12 
13 namespace test {
14 
SimpleSessionNotifier(QuicConnection * connection)15 SimpleSessionNotifier::SimpleSessionNotifier(QuicConnection* connection)
16     : last_control_frame_id_(kInvalidControlFrameId),
17       least_unacked_(1),
18       least_unsent_(1),
19       connection_(connection) {}
20 
~SimpleSessionNotifier()21 SimpleSessionNotifier::~SimpleSessionNotifier() {
22   while (!control_frames_.empty()) {
23     DeleteFrame(&control_frames_.front());
24     control_frames_.pop_front();
25   }
26 }
27 
StreamState()28 SimpleSessionNotifier::StreamState::StreamState()
29     : bytes_total(0),
30       bytes_sent(0),
31       fin_buffered(false),
32       fin_sent(false),
33       fin_outstanding(false),
34       fin_lost(false) {}
35 
~StreamState()36 SimpleSessionNotifier::StreamState::~StreamState() {}
37 
WriteOrBufferData(QuicStreamId id,QuicByteCount data_length,StreamSendingState state)38 QuicConsumedData SimpleSessionNotifier::WriteOrBufferData(
39     QuicStreamId id, QuicByteCount data_length, StreamSendingState state) {
40   return WriteOrBufferData(id, data_length, state, NOT_RETRANSMISSION);
41 }
42 
WriteOrBufferData(QuicStreamId id,QuicByteCount data_length,StreamSendingState state,TransmissionType transmission_type)43 QuicConsumedData SimpleSessionNotifier::WriteOrBufferData(
44     QuicStreamId id, QuicByteCount data_length, StreamSendingState state,
45     TransmissionType transmission_type) {
46   if (!stream_map_.contains(id)) {
47     stream_map_[id] = StreamState();
48   }
49   StreamState& stream_state = stream_map_.find(id)->second;
50   const bool had_buffered_data =
51       HasBufferedStreamData() || HasBufferedControlFrames();
52   QuicStreamOffset offset = stream_state.bytes_sent;
53   QUIC_DVLOG(1) << "WriteOrBuffer stream_id: " << id << " [" << offset << ", "
54                 << offset + data_length << "), fin: " << (state != NO_FIN);
55   stream_state.bytes_total += data_length;
56   stream_state.fin_buffered = state != NO_FIN;
57   if (had_buffered_data) {
58     QUIC_DLOG(WARNING) << "Connection is write blocked";
59     return {0, false};
60   }
61   const size_t length = stream_state.bytes_total - stream_state.bytes_sent;
62   connection_->SetTransmissionType(transmission_type);
63   QuicConsumedData consumed =
64       connection_->SendStreamData(id, length, stream_state.bytes_sent, state);
65   QUIC_DVLOG(1) << "consumed: " << consumed;
66   OnStreamDataConsumed(id, stream_state.bytes_sent, consumed.bytes_consumed,
67                        consumed.fin_consumed);
68   return consumed;
69 }
70 
OnStreamDataConsumed(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,bool fin)71 void SimpleSessionNotifier::OnStreamDataConsumed(QuicStreamId id,
72                                                  QuicStreamOffset offset,
73                                                  QuicByteCount data_length,
74                                                  bool fin) {
75   StreamState& state = stream_map_.find(id)->second;
76   if (QuicUtils::IsCryptoStreamId(connection_->transport_version(), id) &&
77       data_length > 0) {
78     crypto_bytes_transferred_[connection_->encryption_level()].Add(
79         offset, offset + data_length);
80   }
81   state.bytes_sent += data_length;
82   state.fin_sent = fin;
83   state.fin_outstanding = fin;
84 }
85 
WriteCryptoData(EncryptionLevel level,QuicByteCount data_length,QuicStreamOffset offset)86 size_t SimpleSessionNotifier::WriteCryptoData(EncryptionLevel level,
87                                               QuicByteCount data_length,
88                                               QuicStreamOffset offset) {
89   crypto_state_[level].bytes_total += data_length;
90   size_t bytes_written =
91       connection_->SendCryptoData(level, data_length, offset);
92   crypto_state_[level].bytes_sent += bytes_written;
93   crypto_bytes_transferred_[level].Add(offset, offset + bytes_written);
94   return bytes_written;
95 }
96 
WriteOrBufferRstStream(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)97 void SimpleSessionNotifier::WriteOrBufferRstStream(
98     QuicStreamId id, QuicRstStreamErrorCode error,
99     QuicStreamOffset bytes_written) {
100   QUIC_DVLOG(1) << "Writing RST_STREAM_FRAME";
101   const bool had_buffered_data =
102       HasBufferedStreamData() || HasBufferedControlFrames();
103   control_frames_.emplace_back((QuicFrame(new QuicRstStreamFrame(
104       ++last_control_frame_id_, id, error, bytes_written))));
105   if (error != QUIC_STREAM_NO_ERROR) {
106     // Delete stream to avoid retransmissions.
107     stream_map_.erase(id);
108   }
109   if (had_buffered_data) {
110     QUIC_DLOG(WARNING) << "Connection is write blocked";
111     return;
112   }
113   WriteBufferedControlFrames();
114 }
115 
WriteOrBufferWindowUpate(QuicStreamId id,QuicStreamOffset byte_offset)116 void SimpleSessionNotifier::WriteOrBufferWindowUpate(
117     QuicStreamId id, QuicStreamOffset byte_offset) {
118   QUIC_DVLOG(1) << "Writing WINDOW_UPDATE";
119   const bool had_buffered_data =
120       HasBufferedStreamData() || HasBufferedControlFrames();
121   QuicControlFrameId control_frame_id = ++last_control_frame_id_;
122   control_frames_.emplace_back(
123       (QuicFrame(QuicWindowUpdateFrame(control_frame_id, id, byte_offset))));
124   if (had_buffered_data) {
125     QUIC_DLOG(WARNING) << "Connection is write blocked";
126     return;
127   }
128   WriteBufferedControlFrames();
129 }
130 
WriteOrBufferPing()131 void SimpleSessionNotifier::WriteOrBufferPing() {
132   QUIC_DVLOG(1) << "Writing PING_FRAME";
133   const bool had_buffered_data =
134       HasBufferedStreamData() || HasBufferedControlFrames();
135   control_frames_.emplace_back(
136       (QuicFrame(QuicPingFrame(++last_control_frame_id_))));
137   if (had_buffered_data) {
138     QUIC_DLOG(WARNING) << "Connection is write blocked";
139     return;
140   }
141   WriteBufferedControlFrames();
142 }
143 
WriteOrBufferAckFrequency(const QuicAckFrequencyFrame & ack_frequency_frame)144 void SimpleSessionNotifier::WriteOrBufferAckFrequency(
145     const QuicAckFrequencyFrame& ack_frequency_frame) {
146   QUIC_DVLOG(1) << "Writing ACK_FREQUENCY";
147   const bool had_buffered_data =
148       HasBufferedStreamData() || HasBufferedControlFrames();
149   QuicControlFrameId control_frame_id = ++last_control_frame_id_;
150   control_frames_.emplace_back((
151       QuicFrame(new QuicAckFrequencyFrame(control_frame_id,
152                                           /*sequence_number=*/control_frame_id,
153                                           ack_frequency_frame.packet_tolerance,
154                                           ack_frequency_frame.max_ack_delay))));
155   if (had_buffered_data) {
156     QUIC_DLOG(WARNING) << "Connection is write blocked";
157     return;
158   }
159   WriteBufferedControlFrames();
160 }
161 
NeuterUnencryptedData()162 void SimpleSessionNotifier::NeuterUnencryptedData() {
163   if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
164     for (const auto& interval : crypto_bytes_transferred_[ENCRYPTION_INITIAL]) {
165       QuicCryptoFrame crypto_frame(ENCRYPTION_INITIAL, interval.min(),
166                                    interval.max() - interval.min());
167       OnFrameAcked(QuicFrame(&crypto_frame), QuicTime::Delta::Zero(),
168                    QuicTime::Zero());
169     }
170     return;
171   }
172   for (const auto& interval : crypto_bytes_transferred_[ENCRYPTION_INITIAL]) {
173     QuicStreamFrame stream_frame(
174         QuicUtils::GetCryptoStreamId(connection_->transport_version()), false,
175         interval.min(), interval.max() - interval.min());
176     OnFrameAcked(QuicFrame(stream_frame), QuicTime::Delta::Zero(),
177                  QuicTime::Zero());
178   }
179 }
180 
OnCanWrite()181 void SimpleSessionNotifier::OnCanWrite() {
182   if (connection_->framer().is_processing_packet()) {
183     // Do not write data in the middle of packet processing because rest
184     // frames in the packet may change the data to write. For example, lost
185     // data could be acknowledged. Also, connection is going to emit
186     // OnCanWrite signal post packet processing.
187     QUIC_BUG(simple_notifier_write_mid_packet_processing)
188         << "Try to write mid packet processing.";
189     return;
190   }
191   if (!RetransmitLostCryptoData() || !RetransmitLostControlFrames() ||
192       !RetransmitLostStreamData()) {
193     return;
194   }
195   if (!WriteBufferedCryptoData() || !WriteBufferedControlFrames()) {
196     return;
197   }
198   // Write new data.
199   for (const auto& pair : stream_map_) {
200     const auto& state = pair.second;
201     if (!StreamHasBufferedData(pair.first)) {
202       continue;
203     }
204 
205     const size_t length = state.bytes_total - state.bytes_sent;
206     const bool can_bundle_fin =
207         state.fin_buffered && (state.bytes_sent + length == state.bytes_total);
208     connection_->SetTransmissionType(NOT_RETRANSMISSION);
209     QuicConnection::ScopedEncryptionLevelContext context(
210         connection_,
211         connection_->framer().GetEncryptionLevelToSendApplicationData());
212     QuicConsumedData consumed = connection_->SendStreamData(
213         pair.first, length, state.bytes_sent, can_bundle_fin ? FIN : NO_FIN);
214     QUIC_DVLOG(1) << "Tries to write stream_id: " << pair.first << " ["
215                   << state.bytes_sent << ", " << state.bytes_sent + length
216                   << "), fin: " << can_bundle_fin
217                   << ", and consumed: " << consumed;
218     OnStreamDataConsumed(pair.first, state.bytes_sent, consumed.bytes_consumed,
219                          consumed.fin_consumed);
220     if (length != consumed.bytes_consumed ||
221         (can_bundle_fin && !consumed.fin_consumed)) {
222       break;
223     }
224   }
225 }
226 
OnStreamReset(QuicStreamId id,QuicRstStreamErrorCode error)227 void SimpleSessionNotifier::OnStreamReset(QuicStreamId id,
228                                           QuicRstStreamErrorCode error) {
229   if (error != QUIC_STREAM_NO_ERROR) {
230     // Delete stream to avoid retransmissions.
231     stream_map_.erase(id);
232   }
233 }
234 
WillingToWrite() const235 bool SimpleSessionNotifier::WillingToWrite() const {
236   QUIC_DVLOG(1) << "has_buffered_control_frames: " << HasBufferedControlFrames()
237                 << " as_lost_control_frames: " << !lost_control_frames_.empty()
238                 << " has_buffered_stream_data: " << HasBufferedStreamData()
239                 << " has_lost_stream_data: " << HasLostStreamData();
240   return HasBufferedControlFrames() || !lost_control_frames_.empty() ||
241          HasBufferedStreamData() || HasLostStreamData();
242 }
243 
StreamBytesSent() const244 QuicByteCount SimpleSessionNotifier::StreamBytesSent() const {
245   QuicByteCount bytes_sent = 0;
246   for (const auto& pair : stream_map_) {
247     const auto& state = pair.second;
248     bytes_sent += state.bytes_sent;
249   }
250   return bytes_sent;
251 }
252 
StreamBytesToSend() const253 QuicByteCount SimpleSessionNotifier::StreamBytesToSend() const {
254   QuicByteCount bytes_to_send = 0;
255   for (const auto& pair : stream_map_) {
256     const auto& state = pair.second;
257     bytes_to_send += (state.bytes_total - state.bytes_sent);
258   }
259   return bytes_to_send;
260 }
261 
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta,QuicTime)262 bool SimpleSessionNotifier::OnFrameAcked(const QuicFrame& frame,
263                                          QuicTime::Delta /*ack_delay_time*/,
264                                          QuicTime /*receive_timestamp*/) {
265   QUIC_DVLOG(1) << "Acking " << frame;
266   if (frame.type == CRYPTO_FRAME) {
267     StreamState* state = &crypto_state_[frame.crypto_frame->level];
268     QuicStreamOffset offset = frame.crypto_frame->offset;
269     QuicByteCount data_length = frame.crypto_frame->data_length;
270     QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
271     newly_acked.Difference(state->bytes_acked);
272     if (newly_acked.Empty()) {
273       return false;
274     }
275     state->bytes_acked.Add(offset, offset + data_length);
276     state->pending_retransmissions.Difference(offset, offset + data_length);
277     return true;
278   }
279   if (frame.type != STREAM_FRAME) {
280     return OnControlFrameAcked(frame);
281   }
282   if (!stream_map_.contains(frame.stream_frame.stream_id)) {
283     return false;
284   }
285   auto* state = &stream_map_.find(frame.stream_frame.stream_id)->second;
286   QuicStreamOffset offset = frame.stream_frame.offset;
287   QuicByteCount data_length = frame.stream_frame.data_length;
288   QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
289   newly_acked.Difference(state->bytes_acked);
290   const bool fin_newly_acked = frame.stream_frame.fin && state->fin_outstanding;
291   if (newly_acked.Empty() && !fin_newly_acked) {
292     return false;
293   }
294   state->bytes_acked.Add(offset, offset + data_length);
295   if (fin_newly_acked) {
296     state->fin_outstanding = false;
297     state->fin_lost = false;
298   }
299   state->pending_retransmissions.Difference(offset, offset + data_length);
300   return true;
301 }
302 
OnFrameLost(const QuicFrame & frame)303 void SimpleSessionNotifier::OnFrameLost(const QuicFrame& frame) {
304   QUIC_DVLOG(1) << "Losting " << frame;
305   if (frame.type == CRYPTO_FRAME) {
306     StreamState* state = &crypto_state_[frame.crypto_frame->level];
307     QuicStreamOffset offset = frame.crypto_frame->offset;
308     QuicByteCount data_length = frame.crypto_frame->data_length;
309     QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
310     bytes_lost.Difference(state->bytes_acked);
311     if (bytes_lost.Empty()) {
312       return;
313     }
314     for (const auto& lost : bytes_lost) {
315       state->pending_retransmissions.Add(lost.min(), lost.max());
316     }
317     return;
318   }
319   if (frame.type != STREAM_FRAME) {
320     OnControlFrameLost(frame);
321     return;
322   }
323   if (!stream_map_.contains(frame.stream_frame.stream_id)) {
324     return;
325   }
326   auto* state = &stream_map_.find(frame.stream_frame.stream_id)->second;
327   QuicStreamOffset offset = frame.stream_frame.offset;
328   QuicByteCount data_length = frame.stream_frame.data_length;
329   QuicIntervalSet<QuicStreamOffset> bytes_lost(offset, offset + data_length);
330   bytes_lost.Difference(state->bytes_acked);
331   const bool fin_lost = state->fin_outstanding && frame.stream_frame.fin;
332   if (bytes_lost.Empty() && !fin_lost) {
333     return;
334   }
335   for (const auto& lost : bytes_lost) {
336     state->pending_retransmissions.Add(lost.min(), lost.max());
337   }
338   state->fin_lost = fin_lost;
339 }
340 
RetransmitFrames(const QuicFrames & frames,TransmissionType type)341 bool SimpleSessionNotifier::RetransmitFrames(const QuicFrames& frames,
342                                              TransmissionType type) {
343   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
344   connection_->SetTransmissionType(type);
345   for (const QuicFrame& frame : frames) {
346     if (frame.type == CRYPTO_FRAME) {
347       const StreamState& state = crypto_state_[frame.crypto_frame->level];
348       const EncryptionLevel current_encryption_level =
349           connection_->encryption_level();
350       QuicIntervalSet<QuicStreamOffset> retransmission(
351           frame.crypto_frame->offset,
352           frame.crypto_frame->offset + frame.crypto_frame->data_length);
353       retransmission.Difference(state.bytes_acked);
354       for (const auto& interval : retransmission) {
355         QuicStreamOffset offset = interval.min();
356         QuicByteCount length = interval.max() - interval.min();
357         connection_->SetDefaultEncryptionLevel(frame.crypto_frame->level);
358         size_t consumed = connection_->SendCryptoData(frame.crypto_frame->level,
359                                                       length, offset);
360         if (consumed < length) {
361           return false;
362         }
363       }
364       connection_->SetDefaultEncryptionLevel(current_encryption_level);
365     }
366     if (frame.type != STREAM_FRAME) {
367       if (GetControlFrameId(frame) == kInvalidControlFrameId) {
368         continue;
369       }
370       QuicFrame copy = CopyRetransmittableControlFrame(frame);
371       if (!connection_->SendControlFrame(copy)) {
372         // Connection is write blocked.
373         DeleteFrame(&copy);
374         return false;
375       }
376       continue;
377     }
378     if (!stream_map_.contains(frame.stream_frame.stream_id)) {
379       continue;
380     }
381     const auto& state = stream_map_.find(frame.stream_frame.stream_id)->second;
382     QuicIntervalSet<QuicStreamOffset> retransmission(
383         frame.stream_frame.offset,
384         frame.stream_frame.offset + frame.stream_frame.data_length);
385     EncryptionLevel retransmission_encryption_level =
386         connection_->encryption_level();
387     if (QuicUtils::IsCryptoStreamId(connection_->transport_version(),
388                                     frame.stream_frame.stream_id)) {
389       for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
390         if (retransmission.Intersects(crypto_bytes_transferred_[i])) {
391           retransmission_encryption_level = static_cast<EncryptionLevel>(i);
392           retransmission.Intersection(crypto_bytes_transferred_[i]);
393           break;
394         }
395       }
396     }
397     retransmission.Difference(state.bytes_acked);
398     bool retransmit_fin = frame.stream_frame.fin && state.fin_outstanding;
399     QuicConsumedData consumed(0, false);
400     for (const auto& interval : retransmission) {
401       QuicStreamOffset retransmission_offset = interval.min();
402       QuicByteCount retransmission_length = interval.max() - interval.min();
403       const bool can_bundle_fin =
404           retransmit_fin &&
405           (retransmission_offset + retransmission_length == state.bytes_sent);
406       QuicConnection::ScopedEncryptionLevelContext context(
407           connection_,
408           QuicUtils::IsCryptoStreamId(connection_->transport_version(),
409                                       frame.stream_frame.stream_id)
410               ? retransmission_encryption_level
411               : connection_->framer()
412                     .GetEncryptionLevelToSendApplicationData());
413       consumed = connection_->SendStreamData(
414           frame.stream_frame.stream_id, retransmission_length,
415           retransmission_offset, can_bundle_fin ? FIN : NO_FIN);
416       QUIC_DVLOG(1) << "stream " << frame.stream_frame.stream_id
417                     << " is forced to retransmit stream data ["
418                     << retransmission_offset << ", "
419                     << retransmission_offset + retransmission_length
420                     << ") and fin: " << can_bundle_fin
421                     << ", consumed: " << consumed;
422       if (can_bundle_fin) {
423         retransmit_fin = !consumed.fin_consumed;
424       }
425       if (consumed.bytes_consumed < retransmission_length ||
426           (can_bundle_fin && !consumed.fin_consumed)) {
427         // Connection is write blocked.
428         return false;
429       }
430     }
431     if (retransmit_fin) {
432       QUIC_DVLOG(1) << "stream " << frame.stream_frame.stream_id
433                     << " retransmits fin only frame.";
434       consumed = connection_->SendStreamData(frame.stream_frame.stream_id, 0,
435                                              state.bytes_sent, FIN);
436       if (!consumed.fin_consumed) {
437         return false;
438       }
439     }
440   }
441   return true;
442 }
443 
IsFrameOutstanding(const QuicFrame & frame) const444 bool SimpleSessionNotifier::IsFrameOutstanding(const QuicFrame& frame) const {
445   if (frame.type == CRYPTO_FRAME) {
446     QuicStreamOffset offset = frame.crypto_frame->offset;
447     QuicByteCount data_length = frame.crypto_frame->data_length;
448     bool ret = data_length > 0 &&
449                !crypto_state_[frame.crypto_frame->level].bytes_acked.Contains(
450                    offset, offset + data_length);
451     return ret;
452   }
453   if (frame.type != STREAM_FRAME) {
454     return IsControlFrameOutstanding(frame);
455   }
456   if (!stream_map_.contains(frame.stream_frame.stream_id)) {
457     return false;
458   }
459   const auto& state = stream_map_.find(frame.stream_frame.stream_id)->second;
460   QuicStreamOffset offset = frame.stream_frame.offset;
461   QuicByteCount data_length = frame.stream_frame.data_length;
462   return (data_length > 0 &&
463           !state.bytes_acked.Contains(offset, offset + data_length)) ||
464          (frame.stream_frame.fin && state.fin_outstanding);
465 }
466 
HasUnackedCryptoData() const467 bool SimpleSessionNotifier::HasUnackedCryptoData() const {
468   if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
469     for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
470       const StreamState& state = crypto_state_[i];
471       if (state.bytes_total > state.bytes_sent) {
472         return true;
473       }
474       QuicIntervalSet<QuicStreamOffset> bytes_to_ack(0, state.bytes_total);
475       bytes_to_ack.Difference(state.bytes_acked);
476       if (!bytes_to_ack.Empty()) {
477         return true;
478       }
479     }
480     return false;
481   }
482   if (!stream_map_.contains(
483           QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
484     return false;
485   }
486   const auto& state =
487       stream_map_
488           .find(QuicUtils::GetCryptoStreamId(connection_->transport_version()))
489           ->second;
490   if (state.bytes_total > state.bytes_sent) {
491     return true;
492   }
493   QuicIntervalSet<QuicStreamOffset> bytes_to_ack(0, state.bytes_total);
494   bytes_to_ack.Difference(state.bytes_acked);
495   return !bytes_to_ack.Empty();
496 }
497 
HasUnackedStreamData() const498 bool SimpleSessionNotifier::HasUnackedStreamData() const {
499   for (const auto& it : stream_map_) {
500     if (StreamIsWaitingForAcks(it.first)) return true;
501   }
502   return false;
503 }
504 
OnControlFrameAcked(const QuicFrame & frame)505 bool SimpleSessionNotifier::OnControlFrameAcked(const QuicFrame& frame) {
506   QuicControlFrameId id = GetControlFrameId(frame);
507   if (id == kInvalidControlFrameId) {
508     return false;
509   }
510   QUICHE_DCHECK(id < least_unacked_ + control_frames_.size());
511   if (id < least_unacked_ ||
512       GetControlFrameId(control_frames_.at(id - least_unacked_)) ==
513           kInvalidControlFrameId) {
514     return false;
515   }
516   SetControlFrameId(kInvalidControlFrameId,
517                     &control_frames_.at(id - least_unacked_));
518   lost_control_frames_.erase(id);
519   while (!control_frames_.empty() &&
520          GetControlFrameId(control_frames_.front()) == kInvalidControlFrameId) {
521     DeleteFrame(&control_frames_.front());
522     control_frames_.pop_front();
523     ++least_unacked_;
524   }
525   return true;
526 }
527 
OnControlFrameLost(const QuicFrame & frame)528 void SimpleSessionNotifier::OnControlFrameLost(const QuicFrame& frame) {
529   QuicControlFrameId id = GetControlFrameId(frame);
530   if (id == kInvalidControlFrameId) {
531     return;
532   }
533   QUICHE_DCHECK(id < least_unacked_ + control_frames_.size());
534   if (id < least_unacked_ ||
535       GetControlFrameId(control_frames_.at(id - least_unacked_)) ==
536           kInvalidControlFrameId) {
537     return;
538   }
539   if (!lost_control_frames_.contains(id)) {
540     lost_control_frames_[id] = true;
541   }
542 }
543 
IsControlFrameOutstanding(const QuicFrame & frame) const544 bool SimpleSessionNotifier::IsControlFrameOutstanding(
545     const QuicFrame& frame) const {
546   QuicControlFrameId id = GetControlFrameId(frame);
547   if (id == kInvalidControlFrameId) {
548     return false;
549   }
550   return id < least_unacked_ + control_frames_.size() && id >= least_unacked_ &&
551          GetControlFrameId(control_frames_.at(id - least_unacked_)) !=
552              kInvalidControlFrameId;
553 }
554 
RetransmitLostControlFrames()555 bool SimpleSessionNotifier::RetransmitLostControlFrames() {
556   while (!lost_control_frames_.empty()) {
557     QuicFrame pending = control_frames_.at(lost_control_frames_.begin()->first -
558                                            least_unacked_);
559     QuicFrame copy = CopyRetransmittableControlFrame(pending);
560     connection_->SetTransmissionType(LOSS_RETRANSMISSION);
561     if (!connection_->SendControlFrame(copy)) {
562       // Connection is write blocked.
563       DeleteFrame(&copy);
564       break;
565     }
566     lost_control_frames_.pop_front();
567   }
568   return lost_control_frames_.empty();
569 }
570 
RetransmitLostCryptoData()571 bool SimpleSessionNotifier::RetransmitLostCryptoData() {
572   if (QuicVersionUsesCryptoFrames(connection_->transport_version())) {
573     for (EncryptionLevel level :
574          {ENCRYPTION_INITIAL, ENCRYPTION_HANDSHAKE, ENCRYPTION_ZERO_RTT,
575           ENCRYPTION_FORWARD_SECURE}) {
576       auto& state = crypto_state_[level];
577       while (!state.pending_retransmissions.Empty()) {
578         connection_->SetTransmissionType(HANDSHAKE_RETRANSMISSION);
579         EncryptionLevel current_encryption_level =
580             connection_->encryption_level();
581         connection_->SetDefaultEncryptionLevel(level);
582         QuicIntervalSet<QuicStreamOffset> retransmission(
583             state.pending_retransmissions.begin()->min(),
584             state.pending_retransmissions.begin()->max());
585         retransmission.Intersection(crypto_bytes_transferred_[level]);
586         QuicStreamOffset retransmission_offset = retransmission.begin()->min();
587         QuicByteCount retransmission_length =
588             retransmission.begin()->max() - retransmission.begin()->min();
589         size_t bytes_consumed = connection_->SendCryptoData(
590             level, retransmission_length, retransmission_offset);
591         // Restore encryption level.
592         connection_->SetDefaultEncryptionLevel(current_encryption_level);
593         state.pending_retransmissions.Difference(
594             retransmission_offset, retransmission_offset + bytes_consumed);
595         if (bytes_consumed < retransmission_length) {
596           return false;
597         }
598       }
599     }
600     return true;
601   }
602   if (!stream_map_.contains(
603           QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
604     return true;
605   }
606   auto& state =
607       stream_map_
608           .find(QuicUtils::GetCryptoStreamId(connection_->transport_version()))
609           ->second;
610   while (!state.pending_retransmissions.Empty()) {
611     connection_->SetTransmissionType(HANDSHAKE_RETRANSMISSION);
612     QuicIntervalSet<QuicStreamOffset> retransmission(
613         state.pending_retransmissions.begin()->min(),
614         state.pending_retransmissions.begin()->max());
615     EncryptionLevel retransmission_encryption_level = ENCRYPTION_INITIAL;
616     for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
617       if (retransmission.Intersects(crypto_bytes_transferred_[i])) {
618         retransmission_encryption_level = static_cast<EncryptionLevel>(i);
619         retransmission.Intersection(crypto_bytes_transferred_[i]);
620         break;
621       }
622     }
623     QuicStreamOffset retransmission_offset = retransmission.begin()->min();
624     QuicByteCount retransmission_length =
625         retransmission.begin()->max() - retransmission.begin()->min();
626     EncryptionLevel current_encryption_level = connection_->encryption_level();
627     // Set appropriate encryption level.
628     connection_->SetDefaultEncryptionLevel(retransmission_encryption_level);
629     QuicConsumedData consumed = connection_->SendStreamData(
630         QuicUtils::GetCryptoStreamId(connection_->transport_version()),
631         retransmission_length, retransmission_offset, NO_FIN);
632     // Restore encryption level.
633     connection_->SetDefaultEncryptionLevel(current_encryption_level);
634     state.pending_retransmissions.Difference(
635         retransmission_offset, retransmission_offset + consumed.bytes_consumed);
636     if (consumed.bytes_consumed < retransmission_length) {
637       break;
638     }
639   }
640   return state.pending_retransmissions.Empty();
641 }
642 
RetransmitLostStreamData()643 bool SimpleSessionNotifier::RetransmitLostStreamData() {
644   for (auto& pair : stream_map_) {
645     StreamState& state = pair.second;
646     QuicConsumedData consumed(0, false);
647     while (!state.pending_retransmissions.Empty() || state.fin_lost) {
648       connection_->SetTransmissionType(LOSS_RETRANSMISSION);
649       if (state.pending_retransmissions.Empty()) {
650         QUIC_DVLOG(1) << "stream " << pair.first
651                       << " retransmits fin only frame.";
652         consumed =
653             connection_->SendStreamData(pair.first, 0, state.bytes_sent, FIN);
654         state.fin_lost = !consumed.fin_consumed;
655         if (state.fin_lost) {
656           QUIC_DLOG(INFO) << "Connection is write blocked";
657           return false;
658         }
659       } else {
660         QuicStreamOffset offset = state.pending_retransmissions.begin()->min();
661         QuicByteCount length = state.pending_retransmissions.begin()->max() -
662                                state.pending_retransmissions.begin()->min();
663         const bool can_bundle_fin =
664             state.fin_lost && (offset + length == state.bytes_sent);
665         consumed = connection_->SendStreamData(pair.first, length, offset,
666                                                can_bundle_fin ? FIN : NO_FIN);
667         QUIC_DVLOG(1) << "stream " << pair.first
668                       << " tries to retransmit stream data [" << offset << ", "
669                       << offset + length << ") and fin: " << can_bundle_fin
670                       << ", consumed: " << consumed;
671         state.pending_retransmissions.Difference(
672             offset, offset + consumed.bytes_consumed);
673         if (consumed.fin_consumed) {
674           state.fin_lost = false;
675         }
676         if (length > consumed.bytes_consumed ||
677             (can_bundle_fin && !consumed.fin_consumed)) {
678           QUIC_DVLOG(1) << "Connection is write blocked";
679           break;
680         }
681       }
682     }
683   }
684   return !HasLostStreamData();
685 }
686 
WriteBufferedCryptoData()687 bool SimpleSessionNotifier::WriteBufferedCryptoData() {
688   for (size_t i = 0; i < NUM_ENCRYPTION_LEVELS; ++i) {
689     const StreamState& state = crypto_state_[i];
690     QuicIntervalSet<QuicStreamOffset> buffered_crypto_data(0,
691                                                            state.bytes_total);
692     buffered_crypto_data.Difference(crypto_bytes_transferred_[i]);
693     for (const auto& interval : buffered_crypto_data) {
694       size_t bytes_written = connection_->SendCryptoData(
695           static_cast<EncryptionLevel>(i), interval.Length(), interval.min());
696       crypto_state_[i].bytes_sent += bytes_written;
697       crypto_bytes_transferred_[i].Add(interval.min(),
698                                        interval.min() + bytes_written);
699       if (bytes_written < interval.Length()) {
700         return false;
701       }
702     }
703   }
704   return true;
705 }
706 
WriteBufferedControlFrames()707 bool SimpleSessionNotifier::WriteBufferedControlFrames() {
708   while (HasBufferedControlFrames()) {
709     QuicFrame frame_to_send =
710         control_frames_.at(least_unsent_ - least_unacked_);
711     QuicFrame copy = CopyRetransmittableControlFrame(frame_to_send);
712     connection_->SetTransmissionType(NOT_RETRANSMISSION);
713     if (!connection_->SendControlFrame(copy)) {
714       // Connection is write blocked.
715       DeleteFrame(&copy);
716       break;
717     }
718     ++least_unsent_;
719   }
720   return !HasBufferedControlFrames();
721 }
722 
HasBufferedControlFrames() const723 bool SimpleSessionNotifier::HasBufferedControlFrames() const {
724   return least_unsent_ < least_unacked_ + control_frames_.size();
725 }
726 
HasBufferedStreamData() const727 bool SimpleSessionNotifier::HasBufferedStreamData() const {
728   for (const auto& pair : stream_map_) {
729     const auto& state = pair.second;
730     if (state.bytes_total > state.bytes_sent ||
731         (state.fin_buffered && !state.fin_sent)) {
732       return true;
733     }
734   }
735   return false;
736 }
737 
StreamIsWaitingForAcks(QuicStreamId id) const738 bool SimpleSessionNotifier::StreamIsWaitingForAcks(QuicStreamId id) const {
739   if (!stream_map_.contains(id)) {
740     return false;
741   }
742   const StreamState& state = stream_map_.find(id)->second;
743   return !state.bytes_acked.Contains(0, state.bytes_sent) ||
744          state.fin_outstanding;
745 }
746 
StreamHasBufferedData(QuicStreamId id) const747 bool SimpleSessionNotifier::StreamHasBufferedData(QuicStreamId id) const {
748   if (!stream_map_.contains(id)) {
749     return false;
750   }
751   const StreamState& state = stream_map_.find(id)->second;
752   return state.bytes_total > state.bytes_sent ||
753          (state.fin_buffered && !state.fin_sent);
754 }
755 
HasLostStreamData() const756 bool SimpleSessionNotifier::HasLostStreamData() const {
757   for (const auto& pair : stream_map_) {
758     const auto& state = pair.second;
759     if (!state.pending_retransmissions.Empty() || state.fin_lost) {
760       return true;
761     }
762   }
763   return false;
764 }
765 
766 }  // namespace test
767 
768 }  // namespace quic
769