xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/quic_session.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_session.h"
6 
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10 
11 #include "absl/memory/memory.h"
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
15 #include "quiche/quic/core/frames/quic_window_update_frame.h"
16 #include "quiche/quic/core/quic_connection.h"
17 #include "quiche/quic/core/quic_connection_context.h"
18 #include "quiche/quic/core/quic_error_codes.h"
19 #include "quiche/quic/core/quic_flow_controller.h"
20 #include "quiche/quic/core/quic_stream_priority.h"
21 #include "quiche/quic/core/quic_types.h"
22 #include "quiche/quic/core/quic_utils.h"
23 #include "quiche/quic/core/quic_versions.h"
24 #include "quiche/quic/core/quic_write_blocked_list.h"
25 #include "quiche/quic/platform/api/quic_bug_tracker.h"
26 #include "quiche/quic/platform/api/quic_flag_utils.h"
27 #include "quiche/quic/platform/api/quic_flags.h"
28 #include "quiche/quic/platform/api/quic_logging.h"
29 #include "quiche/quic/platform/api/quic_server_stats.h"
30 #include "quiche/quic/platform/api/quic_stack_trace.h"
31 #include "quiche/common/platform/api/quiche_logging.h"
32 #include "quiche/common/quiche_callbacks.h"
33 #include "quiche/common/quiche_text_utils.h"
34 
35 namespace quic {
36 
37 namespace {
38 
39 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
40  public:
ClosedStreamsCleanUpDelegate(QuicSession * session)41   explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
42       : session_(session) {}
43   ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
44   ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
45       delete;
46 
GetConnectionContext()47   QuicConnectionContext* GetConnectionContext() override {
48     return (session_->connection() == nullptr)
49                ? nullptr
50                : session_->connection()->context();
51   }
52 
OnAlarm()53   void OnAlarm() override { session_->CleanUpClosedStreams(); }
54 
55  private:
56   QuicSession* session_;
57 };
58 
59 class StreamCountResetAlarmDelegate : public QuicAlarm::Delegate {
60  public:
StreamCountResetAlarmDelegate(QuicSession * session)61   explicit StreamCountResetAlarmDelegate(QuicSession* session)
62       : session_(session) {}
63   StreamCountResetAlarmDelegate(const StreamCountResetAlarmDelegate&) = delete;
64   StreamCountResetAlarmDelegate& operator=(
65       const StreamCountResetAlarmDelegate&) = delete;
66 
GetConnectionContext()67   QuicConnectionContext* GetConnectionContext() override {
68     return (session_->connection() == nullptr)
69                ? nullptr
70                : session_->connection()->context();
71   }
72 
OnAlarm()73   void OnAlarm() override { session_->OnStreamCountReset(); }
74 
75  private:
76   QuicSession* session_;
77 };
78 
79 }  // namespace
80 
// Log prefix naming the local endpoint's role. Expands to an expression that
// calls perspective(), so it can only be used where that name is in scope.
#define ENDPOINT \
  (perspective() != Perspective::IS_SERVER ? "Client: " : "Server: ")
83 
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)84 QuicSession::QuicSession(
85     QuicConnection* connection, Visitor* owner, const QuicConfig& config,
86     const ParsedQuicVersionVector& supported_versions,
87     QuicStreamCount num_expected_unidirectional_static_streams)
88     : QuicSession(connection, owner, config, supported_versions,
89                   num_expected_unidirectional_static_streams, nullptr) {}
90 
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams,std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)91 QuicSession::QuicSession(
92     QuicConnection* connection, Visitor* owner, const QuicConfig& config,
93     const ParsedQuicVersionVector& supported_versions,
94     QuicStreamCount num_expected_unidirectional_static_streams,
95     std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
96     : connection_(connection),
97       perspective_(connection->perspective()),
98       visitor_(owner),
99       write_blocked_streams_(std::make_unique<QuicWriteBlockedList>()),
100       config_(config),
101       stream_id_manager_(perspective(), connection->transport_version(),
102                          kDefaultMaxStreamsPerConnection,
103                          config_.GetMaxBidirectionalStreamsToSend()),
104       ietf_streamid_manager_(perspective(), connection->version(), this, 0,
105                              num_expected_unidirectional_static_streams,
106                              config_.GetMaxBidirectionalStreamsToSend(),
107                              config_.GetMaxUnidirectionalStreamsToSend() +
108                                  num_expected_unidirectional_static_streams),
109       num_draining_streams_(0),
110       num_outgoing_draining_streams_(0),
111       num_static_streams_(0),
112       num_zombie_streams_(0),
113       flow_controller_(
114           this, QuicUtils::GetInvalidStreamId(connection->transport_version()),
115           /*is_connection_flow_controller*/ true,
116           connection->version().AllowsLowFlowControlLimits()
117               ? 0
118               : kMinimumFlowControlSendWindow,
119           config_.GetInitialSessionFlowControlWindowToSend(),
120           kSessionReceiveWindowLimit, perspective() == Perspective::IS_SERVER,
121           nullptr),
122       currently_writing_stream_id_(0),
123       transport_goaway_sent_(false),
124       transport_goaway_received_(false),
125       control_frame_manager_(this),
126       last_message_id_(0),
127       datagram_queue_(this, std::move(datagram_observer)),
128       closed_streams_clean_up_alarm_(nullptr),
129       supported_versions_(supported_versions),
130       is_configured_(false),
131       was_zero_rtt_rejected_(false),
132       liveness_testing_in_progress_(false),
133       stream_count_reset_alarm_(
134           absl::WrapUnique<QuicAlarm>(connection->alarm_factory()->CreateAlarm(
135               new StreamCountResetAlarmDelegate(this)))) {
136   closed_streams_clean_up_alarm_ =
137       absl::WrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
138           new ClosedStreamsCleanUpDelegate(this)));
139   if (VersionHasIetfQuicFrames(transport_version())) {
140     config_.SetMaxUnidirectionalStreamsToSend(
141         config_.GetMaxUnidirectionalStreamsToSend() +
142         num_expected_unidirectional_static_streams);
143   }
144 }
145 
Initialize()146 void QuicSession::Initialize() {
147   connection_->set_visitor(this);
148   connection_->SetSessionNotifier(this);
149   connection_->SetDataProducer(this);
150   connection_->SetUnackedMapInitialCapacity();
151   connection_->SetFromConfig(config_);
152   if (perspective_ == Perspective::IS_CLIENT) {
153     if (config_.HasClientRequestedIndependentOption(kAFFE, perspective_) &&
154         version().HasIetfQuicFrames()) {
155       connection_->set_can_receive_ack_frequency_frame();
156       config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
157     }
158   }
159   if (perspective() == Perspective::IS_SERVER &&
160       connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
161     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
162   }
163 
164   connection_->CreateConnectionIdManager();
165 
166   // On the server side, version negotiation has been done by the dispatcher,
167   // and the server session is created with the right version.
168   if (perspective() == Perspective::IS_SERVER) {
169     connection_->OnSuccessfulVersionNegotiation();
170   }
171 
172   if (QuicVersionUsesCryptoFrames(transport_version())) {
173     return;
174   }
175 
176   QUICHE_DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
177                    GetMutableCryptoStream()->id());
178 }
179 
~QuicSession()180 QuicSession::~QuicSession() {
181   if (closed_streams_clean_up_alarm_ != nullptr) {
182     closed_streams_clean_up_alarm_->PermanentCancel();
183   }
184   if (stream_count_reset_alarm_ != nullptr) {
185     stream_count_reset_alarm_->PermanentCancel();
186   }
187 }
188 
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)189 PendingStream* QuicSession::PendingStreamOnStreamFrame(
190     const QuicStreamFrame& frame) {
191   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
192   QuicStreamId stream_id = frame.stream_id;
193 
194   PendingStream* pending = GetOrCreatePendingStream(stream_id);
195 
196   if (!pending) {
197     if (frame.fin) {
198       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
199       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
200     }
201     return nullptr;
202   }
203 
204   pending->OnStreamFrame(frame);
205   if (!connection()->connected()) {
206     return nullptr;
207   }
208   return pending;
209 }
210 
MaybeProcessPendingStream(PendingStream * pending)211 bool QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
212   QUICHE_DCHECK(pending != nullptr && connection()->connected());
213 
214   if (ExceedsPerLoopStreamLimit()) {
215     QUIC_DLOG(INFO) << "Skip processing pending stream " << pending->id()
216                     << " because it exceeds per loop limit.";
217     QUIC_CODE_COUNT_N(quic_pending_stream, 1, 3);
218     return false;
219   }
220 
221   QuicStreamId stream_id = pending->id();
222   std::optional<QuicResetStreamError> stop_sending_error_code =
223       pending->GetStopSendingErrorCode();
224   QUIC_DLOG(INFO) << "Process pending stream " << pending->id();
225   QuicStream* stream = ProcessPendingStream(pending);
226   if (stream != nullptr) {
227     // The pending stream should now be in the scope of normal streams.
228     QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
229         << "Stream " << stream_id << " not created";
230     if (!stream->pending_duration().IsZero()) {
231       QUIC_SERVER_HISTOGRAM_TIMES("QuicStream.PendingDurationUs",
232                                   stream->pending_duration().ToMicroseconds(),
233                                   0, 1000 * 100, 20,
234                                   "Time a stream has been pending at server.");
235       ++connection()->mutable_stats().num_total_pending_streams;
236     }
237     pending_stream_map_.erase(stream_id);
238     if (stop_sending_error_code) {
239       stream->OnStopSending(*stop_sending_error_code);
240       if (!connection()->connected()) {
241         return false;
242       }
243     }
244     stream->OnStreamCreatedFromPendingStream();
245     return connection()->connected();
246   }
247   // At this point, none of the bytes has been successfully consumed by the
248   // application layer. We should close the pending stream even if it is
249   // bidirectionl as no application will be able to write in a bidirectional
250   // stream with zero byte as input.
251   if (pending->sequencer()->IsClosed()) {
252     ClosePendingStream(stream_id);
253   }
254   return connection()->connected();
255 }
256 
PendingStreamOnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)257 void QuicSession::PendingStreamOnWindowUpdateFrame(
258     const QuicWindowUpdateFrame& frame) {
259   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
260   PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
261   if (pending) {
262     pending->OnWindowUpdateFrame(frame);
263   }
264 }
265 
PendingStreamOnStopSendingFrame(const QuicStopSendingFrame & frame)266 void QuicSession::PendingStreamOnStopSendingFrame(
267     const QuicStopSendingFrame& frame) {
268   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
269   PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
270   if (pending) {
271     pending->OnStopSending(frame.error());
272   }
273 }
274 
OnStreamFrame(const QuicStreamFrame & frame)275 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
276   QuicStreamId stream_id = frame.stream_id;
277   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
278     connection()->CloseConnection(
279         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
280         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
281     return;
282   }
283 
284   if (ShouldProcessFrameByPendingStream(STREAM_FRAME, stream_id)) {
285     PendingStream* pending = PendingStreamOnStreamFrame(frame);
286     if (pending != nullptr && IsEncryptionEstablished()) {
287       MaybeProcessPendingStream(pending);
288     }
289     return;
290   }
291 
292   QuicStream* stream = GetOrCreateStream(stream_id);
293 
294   if (!stream) {
295     // The stream no longer exists, but we may still be interested in the
296     // final stream byte offset sent by the peer. A frame with a FIN can give
297     // us this offset.
298     if (frame.fin) {
299       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
300       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
301     }
302     return;
303   }
304   stream->OnStreamFrame(frame);
305 }
306 
OnCryptoFrame(const QuicCryptoFrame & frame)307 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
308   GetMutableCryptoStream()->OnCryptoFrame(frame);
309 }
310 
OnStopSendingFrame(const QuicStopSendingFrame & frame)311 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
312   // STOP_SENDING is in IETF QUIC only.
313   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
314   QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
315 
316   QuicStreamId stream_id = frame.stream_id;
317   // If Stream ID is invalid then close the connection.
318   // TODO(ianswett): This check is redundant to checks for IsClosedStream,
319   // but removing it requires removing multiple QUICHE_DCHECKs.
320   // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
321   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
322     QUIC_DVLOG(1) << ENDPOINT
323                   << "Received STOP_SENDING with invalid stream_id: "
324                   << stream_id << " Closing connection";
325     connection()->CloseConnection(
326         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
327         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
328     return;
329   }
330 
331   // If stream_id is READ_UNIDIRECTIONAL, close the connection.
332   if (QuicUtils::GetStreamType(stream_id, perspective(),
333                                IsIncomingStream(stream_id),
334                                version()) == READ_UNIDIRECTIONAL) {
335     QUIC_DVLOG(1) << ENDPOINT
336                   << "Received STOP_SENDING for a read-only stream_id: "
337                   << stream_id << ".";
338     connection()->CloseConnection(
339         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
340         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
341     return;
342   }
343 
344   if (visitor_) {
345     visitor_->OnStopSendingReceived(frame);
346   }
347   if (ShouldProcessFrameByPendingStream(STOP_SENDING_FRAME, stream_id)) {
348     PendingStreamOnStopSendingFrame(frame);
349     return;
350   }
351 
352   QuicStream* stream = GetOrCreateStream(stream_id);
353   if (!stream) {
354     // Errors are handled by GetOrCreateStream.
355     return;
356   }
357 
358   stream->OnStopSending(frame.error());
359 }
360 
OnPacketDecrypted(EncryptionLevel level)361 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
362   GetMutableCryptoStream()->OnPacketDecrypted(level);
363   if (liveness_testing_in_progress_) {
364     liveness_testing_in_progress_ = false;
365     OnCanCreateNewOutgoingStream(/*unidirectional=*/false);
366   }
367 }
368 
OnOneRttPacketAcknowledged()369 void QuicSession::OnOneRttPacketAcknowledged() {
370   GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
371 }
372 
OnHandshakePacketSent()373 void QuicSession::OnHandshakePacketSent() {
374   GetMutableCryptoStream()->OnHandshakePacketSent();
375 }
376 
377 std::unique_ptr<QuicDecrypter>
AdvanceKeysAndCreateCurrentOneRttDecrypter()378 QuicSession::AdvanceKeysAndCreateCurrentOneRttDecrypter() {
379   return GetMutableCryptoStream()->AdvanceKeysAndCreateCurrentOneRttDecrypter();
380 }
381 
CreateCurrentOneRttEncrypter()382 std::unique_ptr<QuicEncrypter> QuicSession::CreateCurrentOneRttEncrypter() {
383   return GetMutableCryptoStream()->CreateCurrentOneRttEncrypter();
384 }
385 
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)386 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
387   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
388   QuicStreamId stream_id = frame.stream_id;
389 
390   PendingStream* pending = GetOrCreatePendingStream(stream_id);
391 
392   if (!pending) {
393     HandleRstOnValidNonexistentStream(frame);
394     return;
395   }
396 
397   pending->OnRstStreamFrame(frame);
398   // At this point, none of the bytes has been consumed by the application
399   // layer. It is safe to close the pending stream even if it is bidirectionl as
400   // no application will be able to write in a bidirectional stream with zero
401   // byte as input.
402   ClosePendingStream(stream_id);
403 }
404 
OnRstStream(const QuicRstStreamFrame & frame)405 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
406   QuicStreamId stream_id = frame.stream_id;
407   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
408     connection()->CloseConnection(
409         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
410         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
411     return;
412   }
413 
414   if (VersionHasIetfQuicFrames(transport_version()) &&
415       QuicUtils::GetStreamType(stream_id, perspective(),
416                                IsIncomingStream(stream_id),
417                                version()) == WRITE_UNIDIRECTIONAL) {
418     connection()->CloseConnection(
419         QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
420         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
421     return;
422   }
423 
424   if (visitor_) {
425     visitor_->OnRstStreamReceived(frame);
426   }
427 
428   if (ShouldProcessFrameByPendingStream(RST_STREAM_FRAME, stream_id)) {
429     PendingStreamOnRstStream(frame);
430     return;
431   }
432 
433   QuicStream* stream = GetOrCreateStream(stream_id);
434 
435   if (!stream) {
436     HandleRstOnValidNonexistentStream(frame);
437     return;  // Errors are handled by GetOrCreateStream.
438   }
439   stream->OnStreamReset(frame);
440 }
441 
OnGoAway(const QuicGoAwayFrame &)442 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
443   QUIC_BUG_IF(quic_bug_12435_1, version().UsesHttp3())
444       << "gQUIC GOAWAY received on version " << version();
445 
446   transport_goaway_received_ = true;
447 }
448 
OnMessageReceived(absl::string_view message)449 void QuicSession::OnMessageReceived(absl::string_view message) {
450   QUIC_DVLOG(1) << ENDPOINT << "Received message of length "
451                 << message.length();
452   QUIC_DVLOG(2) << ENDPOINT << "Contents of message of length "
453                 << message.length() << ":" << std::endl
454                 << quiche::QuicheTextUtils::HexDump(message);
455 }
456 
OnHandshakeDoneReceived()457 void QuicSession::OnHandshakeDoneReceived() {
458   QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
459   GetMutableCryptoStream()->OnHandshakeDoneReceived();
460 }
461 
OnNewTokenReceived(absl::string_view token)462 void QuicSession::OnNewTokenReceived(absl::string_view token) {
463   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
464   GetMutableCryptoStream()->OnNewTokenReceived(token);
465 }
466 
467 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)468 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
469                                                 ConnectionCloseSource source) {
470   if (error != QUIC_NO_ERROR) {
471     if (source == ConnectionCloseSource::FROM_SELF) {
472       QUIC_SERVER_HISTOGRAM_ENUM(
473           "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
474           "QuicErrorCode for server-closed connections.");
475     } else {
476       QUIC_SERVER_HISTOGRAM_ENUM(
477           "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
478           "QuicErrorCode for client-closed connections.");
479     }
480   }
481 }
482 
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)483 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
484                                      ConnectionCloseSource source) {
485   QUICHE_DCHECK(!connection_->connected());
486   if (perspective() == Perspective::IS_SERVER) {
487     RecordConnectionCloseAtServer(frame.quic_error_code, source);
488   }
489 
490   if (on_closed_frame_.quic_error_code == QUIC_NO_ERROR) {
491     // Save all of the connection close information
492     on_closed_frame_ = frame;
493     source_ = source;
494   }
495 
496   GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
497 
498   PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
499     QuicStreamId id = stream->id();
500     stream->OnConnectionClosed(frame.quic_error_code, source);
501     auto it = stream_map_.find(id);
502     if (it != stream_map_.end()) {
503       QUIC_BUG_IF(quic_bug_12435_2, !it->second->IsZombie())
504           << ENDPOINT << "Non-zombie stream " << id
505           << " failed to close under OnConnectionClosed";
506     }
507     return true;
508   });
509 
510   closed_streams_clean_up_alarm_->Cancel();
511   stream_count_reset_alarm_->Cancel();
512 
513   if (visitor_) {
514     visitor_->OnConnectionClosed(connection_->GetOneActiveServerConnectionId(),
515                                  frame.quic_error_code, frame.error_details,
516                                  source);
517   }
518 }
519 
OnWriteBlocked()520 void QuicSession::OnWriteBlocked() {
521   if (!connection_->connected()) {
522     return;
523   }
524   if (visitor_) {
525     visitor_->OnWriteBlocked(connection_);
526   }
527 }
528 
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)529 void QuicSession::OnSuccessfulVersionNegotiation(
530     const ParsedQuicVersion& /*version*/) {}
531 
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)532 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
533                                    const QuicSocketAddress& peer_address,
534                                    bool is_connectivity_probe) {
535   QUICHE_DCHECK(!connection_->ignore_gquic_probing());
536   if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
537     // Server only sends back a connectivity probe after received a
538     // connectivity probe from a new peer address.
539     connection_->SendConnectivityProbingPacket(nullptr, peer_address);
540   }
541 }
542 
OnPathDegrading()543 void QuicSession::OnPathDegrading() {
544   if (visitor_) {
545     visitor_->OnPathDegrading();
546   }
547 }
548 
OnForwardProgressMadeAfterPathDegrading()549 void QuicSession::OnForwardProgressMadeAfterPathDegrading() {}
550 
AllowSelfAddressChange() const551 bool QuicSession::AllowSelfAddressChange() const { return false; }
552 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)553 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
554   // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
555   // assume that it still exists.
556   QuicStreamId stream_id = frame.stream_id;
557   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
558     // This is a window update that applies to the connection, rather than an
559     // individual stream.
560     QUIC_DVLOG(1) << ENDPOINT
561                   << "Received connection level flow control window "
562                      "update with max data: "
563                   << frame.max_data;
564     flow_controller_.UpdateSendWindowOffset(frame.max_data);
565     return;
566   }
567 
568   if (VersionHasIetfQuicFrames(transport_version()) &&
569       QuicUtils::GetStreamType(stream_id, perspective(),
570                                IsIncomingStream(stream_id),
571                                version()) == READ_UNIDIRECTIONAL) {
572     connection()->CloseConnection(
573         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
574         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
575         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
576     return;
577   }
578 
579   if (ShouldProcessFrameByPendingStream(WINDOW_UPDATE_FRAME, stream_id)) {
580     PendingStreamOnWindowUpdateFrame(frame);
581     return;
582   }
583 
584   QuicStream* stream = GetOrCreateStream(stream_id);
585   if (stream != nullptr) {
586     stream->OnWindowUpdateFrame(frame);
587   }
588 }
589 
OnBlockedFrame(const QuicBlockedFrame & frame)590 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
591   // TODO(rjshade): Compare our flow control receive windows for specified
592   //                streams: if we have a large window then maybe something
593   //                had gone wrong with the flow control accounting.
594   QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
595                   << frame.stream_id << ", offset: " << frame.offset;
596 }
597 
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)598 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
599                                             uint64_t previous_bytes_written,
600                                             bool previous_fin_sent) {
601   if (  // Stream should not be closed.
602       !stream->write_side_closed() &&
603       // Not connection flow control blocked.
604       !flow_controller_.IsBlocked() &&
605       // Detect lack of forward progress.
606       previous_bytes_written == stream->stream_bytes_written() &&
607       previous_fin_sent == stream->fin_sent()) {
608     stream->set_busy_counter(stream->busy_counter() + 1);
609     QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
610                   << stream->id() << " stream_bytes_written "
611                   << stream->stream_bytes_written() << " fin "
612                   << stream->fin_sent() << " count " << stream->busy_counter();
613     // Wait a few iterations before firing, the exact count is
614     // arbitrary, more than a few to cover a few test-only false
615     // positives.
616     if (stream->busy_counter() > 20) {
617       QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
618                       << stream->id() << " stream_bytes_written "
619                       << stream->stream_bytes_written() << " fin "
620                       << stream->fin_sent();
621       return false;
622     }
623   } else {
624     stream->set_busy_counter(0);
625   }
626   return true;
627 }
628 
CheckStreamWriteBlocked(QuicStream * stream) const629 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
630   if (!stream->write_side_closed() && stream->HasBufferedData() &&
631       !stream->IsFlowControlBlocked() &&
632       !write_blocked_streams_->IsStreamBlocked(stream->id())) {
633     QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
634                      << " has buffered " << stream->BufferedDataBytes()
635                      << " bytes, and is not flow control blocked, "
636                         "but it is not in the write block list.";
637     return false;
638   }
639   return true;
640 }
641 
OnCanWrite()642 void QuicSession::OnCanWrite() {
643   if (connection_->framer().is_processing_packet()) {
644     // Do not write data in the middle of packet processing because rest
645     // frames in the packet may change the data to write. For example, lost
646     // data could be acknowledged. Also, connection is going to emit
647     // OnCanWrite signal post packet processing.
648     QUIC_BUG(session_write_mid_packet_processing)
649         << ENDPOINT << "Try to write mid packet processing.";
650     return;
651   }
652   if (!RetransmitLostData()) {
653     // Cannot finish retransmitting lost data, connection is write blocked.
654     QUIC_DVLOG(1) << ENDPOINT
655                   << "Cannot finish retransmitting lost data, connection is "
656                      "write blocked.";
657     return;
658   }
659   // We limit the number of writes to the number of pending streams. If more
660   // streams become pending, WillingAndAbleToWrite will be true, which will
661   // cause the connection to request resumption before yielding to other
662   // connections.
663   // If we are connection level flow control blocked, then only allow the
664   // crypto and headers streams to try writing as all other streams will be
665   // blocked.
666   size_t num_writes = flow_controller_.IsBlocked()
667                           ? write_blocked_streams_->NumBlockedSpecialStreams()
668                           : write_blocked_streams_->NumBlockedStreams();
669   if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
670       datagram_queue_.empty() &&
671       (!QuicVersionUsesCryptoFrames(transport_version()) ||
672        !GetCryptoStream()->HasBufferedCryptoFrames())) {
673     return;
674   }
675 
676   QuicConnection::ScopedPacketFlusher flusher(connection_);
677   if (QuicVersionUsesCryptoFrames(transport_version())) {
678     QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
679     if (crypto_stream->HasBufferedCryptoFrames()) {
680       crypto_stream->WriteBufferedCryptoFrames();
681     }
682     if ((GetQuicReloadableFlag(
683              quic_no_write_control_frame_upon_connection_close) &&
684          !connection_->connected()) ||
685         crypto_stream->HasBufferedCryptoFrames()) {
686       if (!connection_->connected()) {
687         QUIC_RELOADABLE_FLAG_COUNT(
688             quic_no_write_control_frame_upon_connection_close);
689       }
690       // Cannot finish writing buffered crypto frames, connection is either
691       // write blocked or closed.
692       return;
693     }
694   }
695   if (control_frame_manager_.WillingToWrite()) {
696     control_frame_manager_.OnCanWrite();
697   }
698   if (version().UsesTls() && GetHandshakeState() != HANDSHAKE_CONFIRMED &&
699       connection_->in_probe_time_out()) {
700     QUIC_CODE_COUNT(quic_donot_pto_stream_data_before_handshake_confirmed);
701     // Do not PTO stream data before handshake gets confirmed.
702     return;
703   }
704   // TODO(b/147146815): this makes all datagrams go before stream data.  We
705   // should have a better priority scheme for this.
706   if (!datagram_queue_.empty()) {
707     size_t written = datagram_queue_.SendDatagrams();
708     QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
709     if (!datagram_queue_.empty()) {
710       return;
711     }
712   }
713   std::vector<QuicStreamId> last_writing_stream_ids;
714   for (size_t i = 0; i < num_writes; ++i) {
715     if (!(write_blocked_streams_->HasWriteBlockedSpecialStream() ||
716           write_blocked_streams_->HasWriteBlockedDataStreams())) {
717       // Writing one stream removed another!? Something's broken.
718       QUIC_BUG(quic_bug_10866_1)
719           << "WriteBlockedStream is missing, num_writes: " << num_writes
720           << ", finished_writes: " << i
721           << ", connected: " << connection_->connected()
722           << ", connection level flow control blocked: "
723           << flow_controller_.IsBlocked();
724       for (QuicStreamId id : last_writing_stream_ids) {
725         QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
726       }
727       connection_->CloseConnection(QUIC_INTERNAL_ERROR,
728                                    "WriteBlockedStream is missing",
729                                    ConnectionCloseBehavior::SILENT_CLOSE);
730       return;
731     }
732     if (!CanWriteStreamData()) {
733       return;
734     }
735     currently_writing_stream_id_ = write_blocked_streams_->PopFront();
736     last_writing_stream_ids.push_back(currently_writing_stream_id_);
737     QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
738                   << currently_writing_stream_id_ << " from write-blocked list";
739     QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
740     if (stream != nullptr && !stream->IsFlowControlBlocked()) {
741       // If the stream can't write all bytes it'll re-add itself to the blocked
742       // list.
743       uint64_t previous_bytes_written = stream->stream_bytes_written();
744       bool previous_fin_sent = stream->fin_sent();
745       QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
746                     << " bytes_written " << previous_bytes_written << " fin "
747                     << previous_fin_sent;
748       stream->OnCanWrite();
749       QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
750       QUICHE_DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
751                                               previous_fin_sent));
752     }
753     currently_writing_stream_id_ = 0;
754   }
755 }
756 
WillingAndAbleToWrite() const757 bool QuicSession::WillingAndAbleToWrite() const {
758   // Schedule a write when:
759   // 1) control frame manager has pending or new control frames, or
760   // 2) any stream has pending retransmissions, or
761   // 3) If the crypto or headers streams are blocked, or
762   // 4) connection is not flow control blocked and there are write blocked
763   // streams.
764   if (QuicVersionUsesCryptoFrames(transport_version())) {
765     if (HasPendingHandshake()) {
766       return true;
767     }
768     if (!IsEncryptionEstablished()) {
769       return false;
770     }
771   }
772   if (control_frame_manager_.WillingToWrite() ||
773       !streams_with_pending_retransmission_.empty()) {
774     return true;
775   }
776   if (flow_controller_.IsBlocked()) {
777     if (VersionUsesHttp3(transport_version())) {
778       return false;
779     }
780     // Crypto and headers streams are not blocked by connection level flow
781     // control.
782     return write_blocked_streams_->HasWriteBlockedSpecialStream();
783   }
784   return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
785          write_blocked_streams_->HasWriteBlockedDataStreams();
786 }
787 
GetStreamsInfoForLogging() const788 std::string QuicSession::GetStreamsInfoForLogging() const {
789   std::string info = absl::StrCat(
790       "num_active_streams: ", GetNumActiveStreams(),
791       ", num_pending_streams: ", pending_streams_size(),
792       ", num_outgoing_draining_streams: ", num_outgoing_draining_streams(),
793       " ");
794   // Log info for up to 5 streams.
795   size_t i = 5;
796   for (const auto& it : stream_map_) {
797     if (it.second->is_static()) {
798       continue;
799     }
800     // Calculate the stream creation delay.
801     const QuicTime::Delta delay =
802         connection_->clock()->ApproximateNow() - it.second->creation_time();
803     absl::StrAppend(
804         &info, "{", it.second->id(), ":", delay.ToDebuggingValue(), ";",
805         it.second->stream_bytes_written(), ",", it.second->fin_sent(), ",",
806         it.second->HasBufferedData(), ",", it.second->fin_buffered(), ";",
807         it.second->stream_bytes_read(), ",", it.second->fin_received(), "}");
808     --i;
809     if (i == 0) {
810       break;
811     }
812   }
813   return info;
814 }
815 
HasPendingHandshake() const816 bool QuicSession::HasPendingHandshake() const {
817   if (QuicVersionUsesCryptoFrames(transport_version())) {
818     return GetCryptoStream()->HasPendingCryptoRetransmission() ||
819            GetCryptoStream()->HasBufferedCryptoFrames();
820   }
821   return streams_with_pending_retransmission_.contains(
822              QuicUtils::GetCryptoStreamId(transport_version())) ||
823          write_blocked_streams_->IsStreamBlocked(
824              QuicUtils::GetCryptoStreamId(transport_version()));
825 }
826 
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)827 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
828                                    const QuicSocketAddress& peer_address,
829                                    const QuicReceivedPacket& packet) {
830   QuicConnectionContextSwitcher cs(connection_->context());
831   connection_->ProcessUdpPacket(self_address, peer_address, packet);
832 }
833 
on_closed_frame_string() const834 std::string QuicSession::on_closed_frame_string() const {
835   std::stringstream ss;
836   ss << on_closed_frame_;
837   if (source_.has_value()) {
838     ss << " " << ConnectionCloseSourceToString(*source_);
839   }
840   return ss.str();
841 }
842 
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,EncryptionLevel level)843 QuicConsumedData QuicSession::WritevData(QuicStreamId id, size_t write_length,
844                                          QuicStreamOffset offset,
845                                          StreamSendingState state,
846                                          TransmissionType type,
847                                          EncryptionLevel level) {
848   QUIC_BUG_IF(session writevdata when disconnected, !connection()->connected())
849       << ENDPOINT << "Try to write stream data when connection is closed: "
850       << on_closed_frame_string();
851   if (!IsEncryptionEstablished() &&
852       !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
853     // Do not let streams write without encryption. The calling stream will end
854     // up write blocked until OnCanWrite is next called.
855     if (was_zero_rtt_rejected_ && !OneRttKeysAvailable()) {
856       QUICHE_DCHECK(version().UsesTls() &&
857                     perspective() == Perspective::IS_CLIENT);
858       QUIC_DLOG(INFO) << ENDPOINT
859                       << "Suppress the write while 0-RTT gets rejected and "
860                          "1-RTT keys are not available. Version: "
861                       << ParsedQuicVersionToString(version());
862     } else if (version().UsesTls() || perspective() == Perspective::IS_SERVER) {
863       QUIC_BUG(quic_bug_10866_2)
864           << ENDPOINT << "Try to send data of stream " << id
865           << " before encryption is established. Version: "
866           << ParsedQuicVersionToString(version());
867     } else {
868       // In QUIC crypto, this could happen when the client sends full CHLO and
869       // 0-RTT request, then receives an inchoate REJ and sends an inchoate
870       // CHLO. The client then gets the ACK of the inchoate CHLO or the client
871       // gets the full REJ and needs to verify the proof (before it sends the
872       // full CHLO), such that there is no outstanding crypto data.
873       // Retransmission alarm fires in TLP mode which tries to retransmit the
874       // 0-RTT request (without encryption).
875       QUIC_DLOG(INFO) << ENDPOINT << "Try to send data of stream " << id
876                       << " before encryption is established.";
877     }
878     return QuicConsumedData(0, false);
879   }
880 
881   SetTransmissionType(type);
882   QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
883 
884   QuicConsumedData data =
885       connection_->SendStreamData(id, write_length, offset, state);
886   if (type == NOT_RETRANSMISSION) {
887     // This is new stream data.
888     write_blocked_streams_->UpdateBytesForStream(id, data.bytes_consumed);
889   }
890 
891   return data;
892 }
893 
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)894 size_t QuicSession::SendCryptoData(EncryptionLevel level, size_t write_length,
895                                    QuicStreamOffset offset,
896                                    TransmissionType type) {
897   QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
898   if (!connection()->framer().HasEncrypterOfEncryptionLevel(level)) {
899     const std::string error_details = absl::StrCat(
900         "Try to send crypto data with missing keys of encryption level: ",
901         EncryptionLevelToString(level));
902     QUIC_BUG(quic_bug_10866_3) << ENDPOINT << error_details;
903     connection()->CloseConnection(
904         QUIC_MISSING_WRITE_KEYS, error_details,
905         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
906     return 0;
907   }
908   SetTransmissionType(type);
909   QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
910   const auto bytes_consumed =
911       connection_->SendCryptoData(level, write_length, offset);
912   return bytes_consumed;
913 }
914 
OnControlFrameManagerError(QuicErrorCode error_code,std::string error_details)915 void QuicSession::OnControlFrameManagerError(QuicErrorCode error_code,
916                                              std::string error_details) {
917   connection_->CloseConnection(
918       error_code, error_details,
919       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
920 }
921 
WriteControlFrame(const QuicFrame & frame,TransmissionType type)922 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
923                                     TransmissionType type) {
924   QUIC_BUG_IF(quic_bug_12435_11, !connection()->connected())
925       << ENDPOINT
926       << absl::StrCat("Try to write control frame: ", QuicFrameToString(frame),
927                       " when connection is closed: ")
928       << on_closed_frame_string();
929   if (!IsEncryptionEstablished()) {
930     // Suppress the write before encryption gets established.
931     return false;
932   }
933   SetTransmissionType(type);
934   QuicConnection::ScopedEncryptionLevelContext context(
935       connection(), GetEncryptionLevelToSendApplicationData());
936   return connection_->SendControlFrame(frame);
937 }
938 
ResetStream(QuicStreamId id,QuicRstStreamErrorCode error)939 void QuicSession::ResetStream(QuicStreamId id, QuicRstStreamErrorCode error) {
940   QuicStream* stream = GetStream(id);
941   if (stream != nullptr && stream->is_static()) {
942     connection()->CloseConnection(
943         QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
944         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
945     return;
946   }
947 
948   if (stream != nullptr) {
949     stream->Reset(error);
950     return;
951   }
952 
953   QuicConnection::ScopedPacketFlusher flusher(connection());
954   MaybeSendStopSendingFrame(id, QuicResetStreamError::FromInternal(error));
955   MaybeSendRstStreamFrame(id, QuicResetStreamError::FromInternal(error), 0);
956 }
957 
MaybeSendRstStreamFrame(QuicStreamId id,QuicResetStreamError error,QuicStreamOffset bytes_written)958 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
959                                           QuicResetStreamError error,
960                                           QuicStreamOffset bytes_written) {
961   if (!connection()->connected()) {
962     return;
963   }
964   if (!VersionHasIetfQuicFrames(transport_version()) ||
965       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
966                                version()) != READ_UNIDIRECTIONAL) {
967     control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
968   }
969 
970   connection_->OnStreamReset(id, error.internal_code());
971 }
972 
MaybeSendStopSendingFrame(QuicStreamId id,QuicResetStreamError error)973 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
974                                             QuicResetStreamError error) {
975   if (!connection()->connected()) {
976     return;
977   }
978   if (VersionHasIetfQuicFrames(transport_version()) &&
979       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
980                                version()) != WRITE_UNIDIRECTIONAL) {
981     control_frame_manager_.WriteOrBufferStopSending(error, id);
982   }
983 }
984 
SendGoAway(QuicErrorCode error_code,const std::string & reason)985 void QuicSession::SendGoAway(QuicErrorCode error_code,
986                              const std::string& reason) {
987   // GOAWAY frame is not supported in IETF QUIC.
988   QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
989   if (!IsEncryptionEstablished()) {
990     QUIC_CODE_COUNT(quic_goaway_before_encryption_established);
991     connection_->CloseConnection(
992         error_code, reason,
993         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
994     return;
995   }
996   if (transport_goaway_sent_) {
997     return;
998   }
999   transport_goaway_sent_ = true;
1000 
1001   QUICHE_DCHECK_EQ(perspective(), Perspective::IS_SERVER);
1002   control_frame_manager_.WriteOrBufferGoAway(
1003       error_code,
1004       QuicUtils::GetMaxClientInitiatedBidirectionalStreamId(
1005           transport_version()),
1006       reason);
1007 }
1008 
SendBlocked(QuicStreamId id,QuicStreamOffset byte_offset)1009 void QuicSession::SendBlocked(QuicStreamId id, QuicStreamOffset byte_offset) {
1010   control_frame_manager_.WriteOrBufferBlocked(id, byte_offset);
1011 }
1012 
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)1013 void QuicSession::SendWindowUpdate(QuicStreamId id,
1014                                    QuicStreamOffset byte_offset) {
1015   control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
1016 }
1017 
OnStreamError(QuicErrorCode error_code,std::string error_details)1018 void QuicSession::OnStreamError(QuicErrorCode error_code,
1019                                 std::string error_details) {
1020   connection_->CloseConnection(
1021       error_code, error_details,
1022       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1023 }
1024 
OnStreamError(QuicErrorCode error_code,QuicIetfTransportErrorCodes ietf_error,std::string error_details)1025 void QuicSession::OnStreamError(QuicErrorCode error_code,
1026                                 QuicIetfTransportErrorCodes ietf_error,
1027                                 std::string error_details) {
1028   connection_->CloseConnection(
1029       error_code, ietf_error, error_details,
1030       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1031 }
1032 
CanSendMaxStreams()1033 bool QuicSession::CanSendMaxStreams() {
1034   return control_frame_manager_.NumBufferedMaxStreams() < 2;
1035 }
1036 
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)1037 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
1038                                  bool unidirectional) {
1039   if (!is_configured_) {
1040     QUIC_BUG(quic_bug_10866_5)
1041         << "Try to send max streams before config negotiated.";
1042     return;
1043   }
1044   control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
1045 }
1046 
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)1047 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
1048     const QuicStreamId id, QuicStreamOffset offset) {
1049   locally_closed_streams_highest_offset_[id] = offset;
1050 }
1051 
OnStreamClosed(QuicStreamId stream_id)1052 void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
1053   QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
1054   StreamMap::iterator it = stream_map_.find(stream_id);
1055   if (it == stream_map_.end()) {
1056     QUIC_BUG(quic_bug_10866_6)
1057         << ENDPOINT << "Stream is already closed: " << stream_id;
1058     return;
1059   }
1060   QuicStream* stream = it->second.get();
1061   StreamType type = stream->type();
1062 
1063   const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
1064   if (stream_waiting_for_acks) {
1065     // The stream needs to be kept alive because it's waiting for acks.
1066     ++num_zombie_streams_;
1067   } else {
1068     closed_streams_.push_back(std::move(it->second));
1069     stream_map_.erase(it);
1070     // Do not retransmit data of a closed stream.
1071     streams_with_pending_retransmission_.erase(stream_id);
1072     if (!closed_streams_clean_up_alarm_->IsSet()) {
1073       closed_streams_clean_up_alarm_->Set(
1074           connection_->clock()->ApproximateNow());
1075     }
1076     connection_->QuicBugIfHasPendingFrames(stream_id);
1077   }
1078 
1079   if (!stream->HasReceivedFinalOffset()) {
1080     // If we haven't received a FIN or RST for this stream, we need to keep
1081     // track of the how many bytes the stream's flow controller believes it has
1082     // received, for accurate connection level flow control accounting.
1083     // If this is an outgoing stream, it is technically open from peer's
1084     // perspective. Do not inform stream Id manager yet.
1085     QUICHE_DCHECK(!stream->was_draining());
1086     InsertLocallyClosedStreamsHighestOffset(
1087         stream_id, stream->highest_received_byte_offset());
1088     return;
1089   }
1090 
1091   const bool stream_was_draining = stream->was_draining();
1092   QUIC_DVLOG_IF(1, stream_was_draining)
1093       << ENDPOINT << "Stream " << stream_id << " was draining";
1094   if (stream_was_draining) {
1095     QUIC_BUG_IF(quic_bug_12435_4, num_draining_streams_ == 0);
1096     --num_draining_streams_;
1097     if (!IsIncomingStream(stream_id)) {
1098       QUIC_BUG_IF(quic_bug_12435_5, num_outgoing_draining_streams_ == 0);
1099       --num_outgoing_draining_streams_;
1100     }
1101     // Stream Id manager has been informed with draining streams.
1102     return;
1103   }
1104   if (!VersionHasIetfQuicFrames(transport_version())) {
1105     stream_id_manager_.OnStreamClosed(
1106         /*is_incoming=*/IsIncomingStream(stream_id));
1107   }
1108   if (!connection_->connected()) {
1109     return;
1110   }
1111   if (IsIncomingStream(stream_id)) {
1112     // Stream Id manager is only interested in peer initiated stream IDs.
1113     if (VersionHasIetfQuicFrames(transport_version())) {
1114       ietf_streamid_manager_.OnStreamClosed(stream_id);
1115     }
1116     return;
1117   }
1118   if (!VersionHasIetfQuicFrames(transport_version())) {
1119     OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
1120   }
1121 }
1122 
ClosePendingStream(QuicStreamId stream_id)1123 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
1124   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
1125   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1126   pending_stream_map_.erase(stream_id);
1127   if (connection_->connected()) {
1128     ietf_streamid_manager_.OnStreamClosed(stream_id);
1129   }
1130 }
1131 
ShouldProcessFrameByPendingStream(QuicFrameType type,QuicStreamId id) const1132 bool QuicSession::ShouldProcessFrameByPendingStream(QuicFrameType type,
1133                                                     QuicStreamId id) const {
1134   return stream_map_.find(id) == stream_map_.end() &&
1135          ((version().HasIetfQuicFrames() && ExceedsPerLoopStreamLimit()) ||
1136           UsesPendingStreamForFrame(type, id));
1137 }
1138 
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)1139 void QuicSession::OnFinalByteOffsetReceived(
1140     QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
1141   auto it = locally_closed_streams_highest_offset_.find(stream_id);
1142   if (it == locally_closed_streams_highest_offset_.end()) {
1143     return;
1144   }
1145 
1146   QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
1147                 << final_byte_offset << " for stream " << stream_id;
1148   QuicByteCount offset_diff = final_byte_offset - it->second;
1149   if (flow_controller_.UpdateHighestReceivedOffset(
1150           flow_controller_.highest_received_byte_offset() + offset_diff)) {
1151     // If the final offset violates flow control, close the connection now.
1152     if (flow_controller_.FlowControlViolation()) {
1153       connection_->CloseConnection(
1154           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
1155           "Connection level flow control violation",
1156           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1157       return;
1158     }
1159   }
1160 
1161   flow_controller_.AddBytesConsumed(offset_diff);
1162   locally_closed_streams_highest_offset_.erase(it);
1163   if (!VersionHasIetfQuicFrames(transport_version())) {
1164     stream_id_manager_.OnStreamClosed(
1165         /*is_incoming=*/IsIncomingStream(stream_id));
1166   }
1167   if (IsIncomingStream(stream_id)) {
1168     if (VersionHasIetfQuicFrames(transport_version())) {
1169       ietf_streamid_manager_.OnStreamClosed(stream_id);
1170     }
1171   } else if (!VersionHasIetfQuicFrames(transport_version())) {
1172     OnCanCreateNewOutgoingStream(false);
1173   }
1174 }
1175 
IsEncryptionEstablished() const1176 bool QuicSession::IsEncryptionEstablished() const {
1177   if (GetCryptoStream() == nullptr) {
1178     return false;
1179   }
1180   return GetCryptoStream()->encryption_established();
1181 }
1182 
OneRttKeysAvailable() const1183 bool QuicSession::OneRttKeysAvailable() const {
1184   if (GetCryptoStream() == nullptr) {
1185     return false;
1186   }
1187   return GetCryptoStream()->one_rtt_keys_available();
1188 }
1189 
OnConfigNegotiated()1190 void QuicSession::OnConfigNegotiated() {
1191   // In versions with TLS, the configs will be set twice if 0-RTT is available.
1192   // In the second config setting, 1-RTT keys are guaranteed to be available.
1193   if (version().UsesTls() && is_configured_ &&
1194       connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
1195     QUIC_BUG(quic_bug_12435_6)
1196         << ENDPOINT
1197         << "1-RTT keys missing when config is negotiated for the second time.";
1198     connection_->CloseConnection(
1199         QUIC_INTERNAL_ERROR,
1200         "1-RTT keys missing when config is negotiated for the second time.",
1201         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1202     return;
1203   }
1204 
1205   QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
1206   connection_->SetFromConfig(config_);
1207 
1208   if (VersionHasIetfQuicFrames(transport_version())) {
1209     uint32_t max_streams = 0;
1210     if (config_.HasReceivedMaxBidirectionalStreams()) {
1211       max_streams = config_.ReceivedMaxBidirectionalStreams();
1212     }
1213     if (was_zero_rtt_rejected_ &&
1214         max_streams <
1215             ietf_streamid_manager_.outgoing_bidirectional_stream_count()) {
1216       connection_->CloseConnection(
1217           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1218           absl::StrCat(
1219               "Server rejected 0-RTT, aborting because new bidirectional "
1220               "initial stream limit ",
1221               max_streams, " is less than current open streams: ",
1222               ietf_streamid_manager_.outgoing_bidirectional_stream_count()),
1223           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1224       return;
1225     }
1226     QUIC_DVLOG(1) << ENDPOINT
1227                   << "Setting Bidirectional outgoing_max_streams_ to "
1228                   << max_streams;
1229     if (perspective_ == Perspective::IS_CLIENT &&
1230         max_streams <
1231             ietf_streamid_manager_.max_outgoing_bidirectional_streams()) {
1232       connection_->CloseConnection(
1233           was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1234                                  : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1235           absl::StrCat(
1236               was_zero_rtt_rejected_
1237                   ? "Server rejected 0-RTT, aborting because "
1238                   : "",
1239               "new bidirectional limit ", max_streams,
1240               " decreases the current limit: ",
1241               ietf_streamid_manager_.max_outgoing_bidirectional_streams()),
1242           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1243       return;
1244     }
1245     if (ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1246             max_streams)) {
1247       OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1248     }
1249 
1250     max_streams = 0;
1251     if (config_.HasReceivedMaxUnidirectionalStreams()) {
1252       max_streams = config_.ReceivedMaxUnidirectionalStreams();
1253     }
1254 
1255     if (was_zero_rtt_rejected_ &&
1256         max_streams <
1257             ietf_streamid_manager_.outgoing_unidirectional_stream_count()) {
1258       connection_->CloseConnection(
1259           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1260           absl::StrCat(
1261               "Server rejected 0-RTT, aborting because new unidirectional "
1262               "initial stream limit ",
1263               max_streams, " is less than current open streams: ",
1264               ietf_streamid_manager_.outgoing_unidirectional_stream_count()),
1265           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1266       return;
1267     }
1268 
1269     if (max_streams <
1270         ietf_streamid_manager_.max_outgoing_unidirectional_streams()) {
1271       connection_->CloseConnection(
1272           was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1273                                  : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1274           absl::StrCat(
1275               was_zero_rtt_rejected_
1276                   ? "Server rejected 0-RTT, aborting because "
1277                   : "",
1278               "new unidirectional limit ", max_streams,
1279               " decreases the current limit: ",
1280               ietf_streamid_manager_.max_outgoing_unidirectional_streams()),
1281           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1282       return;
1283     }
1284     QUIC_DVLOG(1) << ENDPOINT
1285                   << "Setting Unidirectional outgoing_max_streams_ to "
1286                   << max_streams;
1287     if (ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1288             max_streams)) {
1289       OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1290     }
1291   } else {
1292     uint32_t max_streams = 0;
1293     if (config_.HasReceivedMaxBidirectionalStreams()) {
1294       max_streams = config_.ReceivedMaxBidirectionalStreams();
1295     }
1296     QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1297                   << max_streams;
1298     if (was_zero_rtt_rejected_ &&
1299         max_streams < stream_id_manager_.num_open_outgoing_streams()) {
1300       connection_->CloseConnection(
1301           QUIC_INTERNAL_ERROR,
1302           absl::StrCat(
1303               "Server rejected 0-RTT, aborting because new stream limit ",
1304               max_streams, " is less than current open streams: ",
1305               stream_id_manager_.num_open_outgoing_streams()),
1306           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1307       return;
1308     }
1309     stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1310   }
1311 
1312   if (perspective() == Perspective::IS_SERVER) {
1313     if (config_.HasReceivedConnectionOptions()) {
1314       // The following variations change the initial receive flow control
1315       // window sizes.
1316       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1317         AdjustInitialFlowControlWindows(64 * 1024);
1318       }
1319       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1320         AdjustInitialFlowControlWindows(128 * 1024);
1321       }
1322       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1323         AdjustInitialFlowControlWindows(256 * 1024);
1324       }
1325       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1326         AdjustInitialFlowControlWindows(512 * 1024);
1327       }
1328       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1329         AdjustInitialFlowControlWindows(1024 * 1024);
1330       }
1331     }
1332 
1333     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1334   }
1335 
1336   if (VersionHasIetfQuicFrames(transport_version())) {
1337     ietf_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1338         config_.GetMaxBidirectionalStreamsToSend());
1339     ietf_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1340         config_.GetMaxUnidirectionalStreamsToSend());
1341   } else {
1342     // A small number of additional incoming streams beyond the limit should be
1343     // allowed. This helps avoid early connection termination when FIN/RSTs for
1344     // old streams are lost or arrive out of order.
1345     // Use a minimum number of additional streams, or a percentage increase,
1346     // whichever is larger.
1347     uint32_t max_incoming_streams_to_send =
1348         config_.GetMaxBidirectionalStreamsToSend();
1349     uint32_t max_incoming_streams =
1350         std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1351                  static_cast<uint32_t>(max_incoming_streams_to_send *
1352                                        kMaxStreamsMultiplier));
1353     stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1354   }
1355 
1356   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1357     // When using IETF-style TLS transport parameters, inform existing streams
1358     // of new flow-control limits.
1359     if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1360       OnNewStreamOutgoingBidirectionalFlowControlWindow(
1361           config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1362     }
1363     if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1364       OnNewStreamIncomingBidirectionalFlowControlWindow(
1365           config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1366     }
1367     if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1368       OnNewStreamUnidirectionalFlowControlWindow(
1369           config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1370     }
1371   } else {  // The version uses Google QUIC Crypto.
1372     if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1373       // Streams which were created before the SHLO was received (0-RTT
1374       // requests) are now informed of the peer's initial flow control window.
1375       OnNewStreamFlowControlWindow(
1376           config_.ReceivedInitialStreamFlowControlWindowBytes());
1377     }
1378   }
1379 
1380   if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1381     OnNewSessionFlowControlWindow(
1382         config_.ReceivedInitialSessionFlowControlWindowBytes());
1383   }
1384 
1385   if (perspective_ == Perspective::IS_SERVER && version().HasIetfQuicFrames() &&
1386       connection_->effective_peer_address().IsInitialized()) {
1387     if (config_.SupportsServerPreferredAddress(perspective_)) {
1388       quiche::IpAddressFamily address_family =
1389           connection_->effective_peer_address()
1390               .Normalized()
1391               .host()
1392               .address_family();
1393       std::optional<QuicSocketAddress> expected_preferred_address =
1394           config_.GetMappedAlternativeServerAddress(address_family);
1395       if (expected_preferred_address.has_value()) {
1396         // Set connection ID and token if SPAD has received and a preferred
1397         // address of the same address family is configured.
1398         std::optional<QuicNewConnectionIdFrame> frame =
1399             connection_->MaybeIssueNewConnectionIdForPreferredAddress();
1400         if (frame.has_value()) {
1401           config_.SetPreferredAddressConnectionIdAndTokenToSend(
1402               frame->connection_id, frame->stateless_reset_token);
1403         }
1404         connection_->set_expected_server_preferred_address(
1405             *expected_preferred_address);
1406       }
1407       // Clear the alternative address of the other address family in the
1408       // config.
1409       config_.ClearAlternateServerAddressToSend(
1410           address_family == quiche::IpAddressFamily::IP_V4
1411               ? quiche::IpAddressFamily::IP_V6
1412               : quiche::IpAddressFamily::IP_V4);
1413     } else {
1414       // Clear alternative IPv(4|6) addresses in config if the server hasn't
1415       // received 'SPAD' connection option.
1416       config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V4);
1417       config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V6);
1418     }
1419   }
1420 
1421   is_configured_ = true;
1422   connection()->OnConfigNegotiated();
1423 
1424   // Ask flow controllers to try again since the config could have unblocked us.
1425   // Or if this session is configured on TLS enabled QUIC versions,
1426   // attempt to retransmit 0-RTT data if there's any.
1427   // TODO(fayang): consider removing this OnCanWrite call.
1428   if (!connection_->framer().is_processing_packet() &&
1429       (connection_->version().AllowsLowFlowControlLimits() ||
1430        version().UsesTls())) {
1431     QUIC_CODE_COUNT(quic_session_on_can_write_on_config_negotiated);
1432     OnCanWrite();
1433   }
1434 }
1435 
OnAlpsData(const uint8_t *,size_t)1436 std::optional<std::string> QuicSession::OnAlpsData(const uint8_t* /*alps_data*/,
1437                                                    size_t /*alps_length*/) {
1438   return std::nullopt;
1439 }
1440 
AdjustInitialFlowControlWindows(size_t stream_window)1441 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1442   const float session_window_multiplier =
1443       config_.GetInitialStreamFlowControlWindowToSend()
1444           ? static_cast<float>(
1445                 config_.GetInitialSessionFlowControlWindowToSend()) /
1446                 config_.GetInitialStreamFlowControlWindowToSend()
1447           : 1.5;
1448 
1449   QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1450   config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1451 
1452   size_t session_window = session_window_multiplier * stream_window;
1453   QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1454                 << session_window;
1455   config_.SetInitialSessionFlowControlWindowToSend(session_window);
1456   flow_controller_.UpdateReceiveWindowSize(session_window);
1457   // Inform all existing streams about the new window.
1458   for (auto const& kv : stream_map_) {
1459     kv.second->UpdateReceiveWindowSize(stream_window);
1460   }
1461   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1462     GetMutableCryptoStream()->UpdateReceiveWindowSize(stream_window);
1463   }
1464 }
1465 
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1466 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1467     QuicStreamId stream_id) {
1468   QUICHE_DCHECK(!IsClosedStream(stream_id));
1469   // Received a frame for a locally-created stream that is not currently
1470   // active. This is an error.
1471   if (VersionHasIetfQuicFrames(transport_version())) {
1472     connection()->CloseConnection(
1473         QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1474         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1475     return;
1476   }
1477   connection()->CloseConnection(
1478       QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1479       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1480 }
1481 
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1482 void QuicSession::HandleRstOnValidNonexistentStream(
1483     const QuicRstStreamFrame& frame) {
1484   // If the stream is neither originally in active streams nor created in
1485   // GetOrCreateStream(), it could be a closed stream in which case its
1486   // final received byte offset need to be updated.
1487   if (IsClosedStream(frame.stream_id)) {
1488     // The RST frame contains the final byte offset for the stream: we can now
1489     // update the connection level flow controller if needed.
1490     OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1491   }
1492 }
1493 
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1494 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1495   QUICHE_DCHECK(version().UsesQuicCrypto());
1496   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1497   if (new_window < kMinimumFlowControlSendWindow) {
1498     QUIC_LOG_FIRST_N(ERROR, 1)
1499         << "Peer sent us an invalid stream flow control send window: "
1500         << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1501     connection_->CloseConnection(
1502         QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1503         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1504     return;
1505   }
1506 
1507   // Inform all existing streams about the new window.
1508   for (auto const& kv : stream_map_) {
1509     QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1510                   << " of new stream flow control window " << new_window;
1511     if (!kv.second->MaybeConfigSendWindowOffset(
1512             new_window, /* was_zero_rtt_rejected = */ false)) {
1513       return;
1514     }
1515   }
1516   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1517     QUIC_DVLOG(1)
1518         << ENDPOINT
1519         << "Informing crypto stream of new stream flow control window "
1520         << new_window;
1521     GetMutableCryptoStream()->MaybeConfigSendWindowOffset(
1522         new_window, /* was_zero_rtt_rejected = */ false);
1523   }
1524 }
1525 
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1526 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1527     QuicStreamOffset new_window) {
1528   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1529   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1530                 << new_window;
1531   // Inform all existing outgoing unidirectional streams about the new window.
1532   for (auto const& kv : stream_map_) {
1533     const QuicStreamId id = kv.first;
1534     if (!version().HasIetfQuicFrames()) {
1535       if (kv.second->type() == BIDIRECTIONAL) {
1536         continue;
1537       }
1538     } else {
1539       if (QuicUtils::IsBidirectionalStreamId(id, version())) {
1540         continue;
1541       }
1542     }
1543     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1544                                        perspective())) {
1545       continue;
1546     }
1547     QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1548                   << " of new stream flow control window " << new_window;
1549     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1550                                                 was_zero_rtt_rejected_)) {
1551       return;
1552     }
1553   }
1554 }
1555 
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1556 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1557     QuicStreamOffset new_window) {
1558   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1559   QUIC_DVLOG(1) << ENDPOINT
1560                 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1561                 << new_window;
1562   // Inform all existing outgoing bidirectional streams about the new window.
1563   for (auto const& kv : stream_map_) {
1564     const QuicStreamId id = kv.first;
1565     if (!version().HasIetfQuicFrames()) {
1566       if (kv.second->type() != BIDIRECTIONAL) {
1567         continue;
1568       }
1569     } else {
1570       if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1571         continue;
1572       }
1573     }
1574     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1575                                        perspective())) {
1576       continue;
1577     }
1578     QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1579                   << id << " of new stream flow control window " << new_window;
1580     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1581                                                 was_zero_rtt_rejected_)) {
1582       return;
1583     }
1584   }
1585 }
1586 
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1587 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1588     QuicStreamOffset new_window) {
1589   QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1590   QUIC_DVLOG(1) << ENDPOINT
1591                 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1592                 << new_window;
1593   // Inform all existing incoming bidirectional streams about the new window.
1594   for (auto const& kv : stream_map_) {
1595     const QuicStreamId id = kv.first;
1596     if (!version().HasIetfQuicFrames()) {
1597       if (kv.second->type() != BIDIRECTIONAL) {
1598         continue;
1599       }
1600     } else {
1601       if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1602         continue;
1603       }
1604     }
1605     if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1606                                       perspective())) {
1607       continue;
1608     }
1609     QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1610                   << id << " of new stream flow control window " << new_window;
1611     if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1612                                                 was_zero_rtt_rejected_)) {
1613       return;
1614     }
1615   }
1616 }
1617 
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1618 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1619   QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1620 
1621   if (was_zero_rtt_rejected_ && new_window < flow_controller_.bytes_sent()) {
1622     std::string error_details = absl::StrCat(
1623         "Server rejected 0-RTT. Aborting because the client received session "
1624         "flow control send window: ",
1625         new_window,
1626         ", which is below currently used: ", flow_controller_.bytes_sent());
1627     QUIC_LOG(ERROR) << error_details;
1628     connection_->CloseConnection(
1629         QUIC_ZERO_RTT_UNRETRANSMITTABLE, error_details,
1630         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1631     return;
1632   }
1633   if (!connection_->version().AllowsLowFlowControlLimits() &&
1634       new_window < kMinimumFlowControlSendWindow) {
1635     std::string error_details = absl::StrCat(
1636         "Peer sent us an invalid session flow control send window: ",
1637         new_window, ", below minimum: ", kMinimumFlowControlSendWindow);
1638     QUIC_LOG_FIRST_N(ERROR, 1) << error_details;
1639     connection_->CloseConnection(
1640         QUIC_FLOW_CONTROL_INVALID_WINDOW, error_details,
1641         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1642     return;
1643   }
1644   if (perspective_ == Perspective::IS_CLIENT &&
1645       new_window < flow_controller_.send_window_offset()) {
1646     // The client receives a lower limit than remembered, violating
1647     // https://tools.ietf.org/html/draft-ietf-quic-transport-27#section-7.3.1
1648     std::string error_details = absl::StrCat(
1649         was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because "
1650                                : "",
1651         "new session max data ", new_window,
1652         " decreases current limit: ", flow_controller_.send_window_offset());
1653     QUIC_LOG(ERROR) << error_details;
1654     connection_->CloseConnection(
1655         was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1656                                : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1657         error_details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1658     return;
1659   }
1660 
1661   flow_controller_.UpdateSendWindowOffset(new_window);
1662 }
1663 
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1664 bool QuicSession::OnNewDecryptionKeyAvailable(
1665     EncryptionLevel level, std::unique_ptr<QuicDecrypter> decrypter,
1666     bool set_alternative_decrypter, bool latch_once_used) {
1667   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1668       !connection()->framer().HasEncrypterOfEncryptionLevel(
1669           QuicUtils::GetEncryptionLevelToSendAckofSpace(
1670               QuicUtils::GetPacketNumberSpace(level)))) {
1671     // This should never happen because connection should never decrypt a packet
1672     // while an ACK for it cannot be encrypted.
1673     return false;
1674   }
1675   if (connection()->version().KnowsWhichDecrypterToUse()) {
1676     connection()->InstallDecrypter(level, std::move(decrypter));
1677     return true;
1678   }
1679   if (set_alternative_decrypter) {
1680     connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1681                                           latch_once_used);
1682     return true;
1683   }
1684   connection()->SetDecrypter(level, std::move(decrypter));
1685   return true;
1686 }
1687 
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1688 void QuicSession::OnNewEncryptionKeyAvailable(
1689     EncryptionLevel level, std::unique_ptr<QuicEncrypter> encrypter) {
1690   connection()->SetEncrypter(level, std::move(encrypter));
1691   if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
1692     return;
1693   }
1694 
1695   bool reset_encryption_level = false;
1696   if (IsEncryptionEstablished() && level == ENCRYPTION_HANDSHAKE) {
1697     // ENCRYPTION_HANDSHAKE keys are only used for the handshake. If
1698     // ENCRYPTION_ZERO_RTT keys exist, it is possible for a client to send
1699     // stream data, which must not be sent at the ENCRYPTION_HANDSHAKE level.
1700     // Therefore, we avoid setting the default encryption level to
1701     // ENCRYPTION_HANDSHAKE.
1702     reset_encryption_level = true;
1703   }
1704   QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1705   connection()->SetDefaultEncryptionLevel(level);
1706   if (reset_encryption_level) {
1707     connection()->SetDefaultEncryptionLevel(ENCRYPTION_ZERO_RTT);
1708   }
1709   QUIC_BUG_IF(quic_bug_12435_7,
1710               IsEncryptionEstablished() &&
1711                   (connection()->encryption_level() == ENCRYPTION_INITIAL ||
1712                    connection()->encryption_level() == ENCRYPTION_HANDSHAKE))
1713       << "Encryption is established, but the encryption level " << level
1714       << " does not support sending stream data";
1715 }
1716 
SetDefaultEncryptionLevel(EncryptionLevel level)1717 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1718   QUICHE_DCHECK_EQ(PROTOCOL_QUIC_CRYPTO,
1719                    connection_->version().handshake_protocol);
1720   QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1721   connection()->SetDefaultEncryptionLevel(level);
1722 
1723   switch (level) {
1724     case ENCRYPTION_INITIAL:
1725       break;
1726     case ENCRYPTION_ZERO_RTT:
1727       if (perspective() == Perspective::IS_CLIENT) {
1728         // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1729         // they can't be decrypted by the server.
1730         connection_->MarkZeroRttPacketsForRetransmission(0);
1731         if (!connection_->framer().is_processing_packet()) {
1732           // TODO(fayang): consider removing this OnCanWrite call.
1733           // Given any streams blocked by encryption a chance to write.
1734           QUIC_CODE_COUNT(
1735               quic_session_on_can_write_set_default_encryption_level);
1736           OnCanWrite();
1737         }
1738       }
1739       break;
1740     case ENCRYPTION_HANDSHAKE:
1741       break;
1742     case ENCRYPTION_FORWARD_SECURE:
1743       QUIC_BUG_IF(quic_bug_12435_8, !config_.negotiated())
1744           << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1745       connection()->mutable_stats().handshake_completion_time =
1746           connection()->clock()->ApproximateNow();
1747       break;
1748     default:
1749       QUIC_BUG(quic_bug_10866_7) << "Unknown encryption level: " << level;
1750   }
1751 }
1752 
OnTlsHandshakeComplete()1753 void QuicSession::OnTlsHandshakeComplete() {
1754   QUICHE_DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1755   QUIC_BUG_IF(quic_bug_12435_9,
1756               !GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1757       << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1758   QUIC_BUG_IF(quic_bug_12435_10, !config_.negotiated())
1759       << ENDPOINT << "Handshake completes without parameter negotiation.";
1760   connection()->mutable_stats().handshake_completion_time =
1761       connection()->clock()->ApproximateNow();
1762   if (connection()->version().UsesTls() &&
1763       perspective_ == Perspective::IS_SERVER) {
1764     // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1765     // to the client.
1766     control_frame_manager_.WriteOrBufferHandshakeDone();
1767     if (connection()->version().HasIetfQuicFrames()) {
1768       MaybeSendAddressToken();
1769     }
1770   }
1771 }
1772 
MaybeSendAddressToken()1773 bool QuicSession::MaybeSendAddressToken() {
1774   QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER &&
1775                 connection()->version().HasIetfQuicFrames());
1776   std::optional<CachedNetworkParameters> cached_network_params =
1777       GenerateCachedNetworkParameters();
1778 
1779   std::string address_token = GetCryptoStream()->GetAddressToken(
1780       cached_network_params.has_value() ? &*cached_network_params : nullptr);
1781   if (address_token.empty()) {
1782     return false;
1783   }
1784   const size_t buf_len = address_token.length() + 1;
1785   auto buffer = std::make_unique<char[]>(buf_len);
1786   QuicDataWriter writer(buf_len, buffer.get());
1787   // Add |kAddressTokenPrefix| for token sent in NEW_TOKEN frame.
1788   writer.WriteUInt8(kAddressTokenPrefix);
1789   writer.WriteBytes(address_token.data(), address_token.length());
1790   control_frame_manager_.WriteOrBufferNewToken(
1791       absl::string_view(buffer.get(), buf_len));
1792   if (cached_network_params.has_value()) {
1793     connection()->OnSendConnectionState(*cached_network_params);
1794   }
1795   return true;
1796 }
1797 
DiscardOldDecryptionKey(EncryptionLevel level)1798 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1799   if (!connection()->version().KnowsWhichDecrypterToUse()) {
1800     return;
1801   }
1802   connection()->RemoveDecrypter(level);
1803 }
1804 
DiscardOldEncryptionKey(EncryptionLevel level)1805 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1806   QUIC_DLOG(INFO) << ENDPOINT << "Discarding " << level << " keys";
1807   if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1808     connection()->RemoveEncrypter(level);
1809   }
1810   switch (level) {
1811     case ENCRYPTION_INITIAL:
1812       NeuterUnencryptedData();
1813       break;
1814     case ENCRYPTION_HANDSHAKE:
1815       NeuterHandshakeData();
1816       break;
1817     case ENCRYPTION_ZERO_RTT:
1818       break;
1819     case ENCRYPTION_FORWARD_SECURE:
1820       QUIC_BUG(quic_bug_10866_8)
1821           << ENDPOINT << "Discarding 1-RTT keys is not allowed";
1822       break;
1823     default:
1824       QUIC_BUG(quic_bug_10866_9)
1825           << ENDPOINT
1826           << "Cannot discard keys for unknown encryption level: " << level;
1827   }
1828 }
1829 
NeuterHandshakeData()1830 void QuicSession::NeuterHandshakeData() {
1831   GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(
1832       ENCRYPTION_HANDSHAKE);
1833   connection()->OnHandshakeComplete();
1834 }
1835 
OnZeroRttRejected(int reason)1836 void QuicSession::OnZeroRttRejected(int reason) {
1837   was_zero_rtt_rejected_ = true;
1838   connection_->MarkZeroRttPacketsForRetransmission(reason);
1839   if (connection_->encryption_level() == ENCRYPTION_FORWARD_SECURE) {
1840     QUIC_BUG(quic_bug_10866_10)
1841         << "1-RTT keys already available when 0-RTT is rejected.";
1842     connection_->CloseConnection(
1843         QUIC_INTERNAL_ERROR,
1844         "1-RTT keys already available when 0-RTT is rejected.",
1845         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1846   }
1847 }
1848 
FillTransportParameters(TransportParameters * params)1849 bool QuicSession::FillTransportParameters(TransportParameters* params) {
1850   if (version().UsesTls()) {
1851     if (perspective() == Perspective::IS_SERVER) {
1852       config_.SetOriginalConnectionIdToSend(
1853           connection_->GetOriginalDestinationConnectionId());
1854       config_.SetInitialSourceConnectionIdToSend(connection_->connection_id());
1855     } else {
1856       config_.SetInitialSourceConnectionIdToSend(
1857           connection_->client_connection_id());
1858     }
1859   }
1860   return config_.FillTransportParameters(params);
1861 }
1862 
ProcessTransportParameters(const TransportParameters & params,bool is_resumption,std::string * error_details)1863 QuicErrorCode QuicSession::ProcessTransportParameters(
1864     const TransportParameters& params, bool is_resumption,
1865     std::string* error_details) {
1866   return config_.ProcessTransportParameters(params, is_resumption,
1867                                             error_details);
1868 }
1869 
OnHandshakeCallbackDone()1870 void QuicSession::OnHandshakeCallbackDone() {
1871   if (!connection_->connected()) {
1872     return;
1873   }
1874 
1875   if (!connection()->is_processing_packet()) {
1876     connection()->MaybeProcessUndecryptablePackets();
1877   }
1878 }
1879 
PacketFlusherAttached() const1880 bool QuicSession::PacketFlusherAttached() const {
1881   QUICHE_DCHECK(connection_->connected());
1882   return connection()->packet_creator().PacketFlusherAttached();
1883 }
1884 
OnEncryptedClientHelloSent(absl::string_view client_hello) const1885 void QuicSession::OnEncryptedClientHelloSent(
1886     absl::string_view client_hello) const {
1887   connection()->OnEncryptedClientHelloSent(client_hello);
1888 }
1889 
OnEncryptedClientHelloReceived(absl::string_view client_hello) const1890 void QuicSession::OnEncryptedClientHelloReceived(
1891     absl::string_view client_hello) const {
1892   connection()->OnEncryptedClientHelloReceived(client_hello);
1893 }
1894 
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1895 void QuicSession::OnCryptoHandshakeMessageSent(
1896     const CryptoHandshakeMessage& /*message*/) {}
1897 
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1898 void QuicSession::OnCryptoHandshakeMessageReceived(
1899     const CryptoHandshakeMessage& /*message*/) {}
1900 
RegisterStreamPriority(QuicStreamId id,bool is_static,const QuicStreamPriority & priority)1901 void QuicSession::RegisterStreamPriority(QuicStreamId id, bool is_static,
1902                                          const QuicStreamPriority& priority) {
1903   write_blocked_streams()->RegisterStream(id, is_static, priority);
1904 }
1905 
UnregisterStreamPriority(QuicStreamId id)1906 void QuicSession::UnregisterStreamPriority(QuicStreamId id) {
1907   write_blocked_streams()->UnregisterStream(id);
1908 }
1909 
UpdateStreamPriority(QuicStreamId id,const QuicStreamPriority & new_priority)1910 void QuicSession::UpdateStreamPriority(QuicStreamId id,
1911                                        const QuicStreamPriority& new_priority) {
1912   write_blocked_streams()->UpdateStreamPriority(id, new_priority);
1913 }
1914 
ActivateStream(std::unique_ptr<QuicStream> stream)1915 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1916   const bool should_keep_alive = ShouldKeepConnectionAlive();
1917   QuicStreamId stream_id = stream->id();
1918   bool is_static = stream->is_static();
1919   QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1920                 << ". activating stream " << stream_id;
1921   QUICHE_DCHECK(!stream_map_.contains(stream_id));
1922   stream_map_[stream_id] = std::move(stream);
1923   if (is_static) {
1924     ++num_static_streams_;
1925     return;
1926   }
1927   if (version().HasIetfQuicFrames() && IsIncomingStream(stream_id) &&
1928       max_streams_accepted_per_loop_ != kMaxQuicStreamCount) {
1929     QUICHE_DCHECK(!ExceedsPerLoopStreamLimit());
1930     // Per-loop stream limit is emposed.
1931     ++new_incoming_streams_in_current_loop_;
1932     if (!stream_count_reset_alarm_->IsSet()) {
1933       stream_count_reset_alarm_->Set(connection()->clock()->ApproximateNow());
1934     }
1935   }
1936   if (!VersionHasIetfQuicFrames(transport_version())) {
1937     // Do not inform stream ID manager of static streams.
1938     stream_id_manager_.ActivateStream(
1939         /*is_incoming=*/IsIncomingStream(stream_id));
1940   }
1941   if (perspective() == Perspective::IS_CLIENT &&
1942       connection()->multi_port_stats() != nullptr && !should_keep_alive &&
1943       ShouldKeepConnectionAlive()) {
1944     connection()->MaybeProbeMultiPortPath();
1945   }
1946 }
1947 
GetNextOutgoingBidirectionalStreamId()1948 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1949   if (VersionHasIetfQuicFrames(transport_version())) {
1950     return ietf_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1951   }
1952   return stream_id_manager_.GetNextOutgoingStreamId();
1953 }
1954 
GetNextOutgoingUnidirectionalStreamId()1955 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1956   if (VersionHasIetfQuicFrames(transport_version())) {
1957     return ietf_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1958   }
1959   return stream_id_manager_.GetNextOutgoingStreamId();
1960 }
1961 
CanOpenNextOutgoingBidirectionalStream()1962 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1963   if (liveness_testing_in_progress_) {
1964     QUICHE_DCHECK_EQ(Perspective::IS_CLIENT, perspective());
1965     QUIC_CODE_COUNT(
1966         quic_client_fails_to_create_stream_liveness_testing_in_progress);
1967     return false;
1968   }
1969   if (!VersionHasIetfQuicFrames(transport_version())) {
1970     if (!stream_id_manager_.CanOpenNextOutgoingStream()) {
1971       return false;
1972     }
1973   } else {
1974     if (!ietf_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1975       QUIC_CODE_COUNT(
1976           quic_fails_to_create_stream_close_too_many_streams_created);
1977       if (is_configured_) {
1978         // Send STREAM_BLOCKED after config negotiated.
1979         control_frame_manager_.WriteOrBufferStreamsBlocked(
1980             ietf_streamid_manager_.max_outgoing_bidirectional_streams(),
1981             /*unidirectional=*/false);
1982       }
1983       return false;
1984     }
1985   }
1986   if (perspective() == Perspective::IS_CLIENT &&
1987       connection_->MaybeTestLiveness()) {
1988     // Now is relatively close to the idle timeout having the risk that requests
1989     // could be discarded at the server.
1990     liveness_testing_in_progress_ = true;
1991     QUIC_CODE_COUNT(quic_client_fails_to_create_stream_close_to_idle_timeout);
1992     return false;
1993   }
1994   return true;
1995 }
1996 
CanOpenNextOutgoingUnidirectionalStream()1997 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1998   if (!VersionHasIetfQuicFrames(transport_version())) {
1999     return stream_id_manager_.CanOpenNextOutgoingStream();
2000   }
2001   if (ietf_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
2002     return true;
2003   }
2004   if (is_configured_) {
2005     // Send STREAM_BLOCKED after config negotiated.
2006     control_frame_manager_.WriteOrBufferStreamsBlocked(
2007         ietf_streamid_manager_.max_outgoing_unidirectional_streams(),
2008         /*unidirectional=*/true);
2009   }
2010   return false;
2011 }
2012 
GetAdvertisedMaxIncomingBidirectionalStreams() const2013 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
2014     const {
2015   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2016   return ietf_streamid_manager_.advertised_max_incoming_bidirectional_streams();
2017 }
2018 
GetOrCreateStream(const QuicStreamId stream_id)2019 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
2020   QUICHE_DCHECK(!pending_stream_map_.contains(stream_id));
2021   if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
2022     return GetMutableCryptoStream();
2023   }
2024 
2025   StreamMap::iterator it = stream_map_.find(stream_id);
2026   if (it != stream_map_.end()) {
2027     return it->second->IsZombie() ? nullptr : it->second.get();
2028   }
2029 
2030   if (IsClosedStream(stream_id)) {
2031     return nullptr;
2032   }
2033 
2034   if (!IsIncomingStream(stream_id)) {
2035     HandleFrameOnNonexistentOutgoingStream(stream_id);
2036     return nullptr;
2037   }
2038 
2039   // TODO(fkastenholz): If we are creating a new stream and we have sent a
2040   // goaway, we should ignore the stream creation. Need to add code to A) test
2041   // if goaway was sent ("if (transport_goaway_sent_)") and B) reject stream
2042   // creation ("return nullptr")
2043 
2044   if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
2045     return nullptr;
2046   }
2047 
2048   if (!VersionHasIetfQuicFrames(transport_version()) &&
2049       !stream_id_manager_.CanOpenIncomingStream()) {
2050     // Refuse to open the stream.
2051     ResetStream(stream_id, QUIC_REFUSED_STREAM);
2052     return nullptr;
2053   }
2054 
2055   return CreateIncomingStream(stream_id);
2056 }
2057 
StreamDraining(QuicStreamId stream_id,bool unidirectional)2058 void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
2059   QUICHE_DCHECK(stream_map_.contains(stream_id));
2060   QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
2061   if (VersionHasIetfQuicFrames(transport_version())) {
2062     ietf_streamid_manager_.OnStreamClosed(stream_id);
2063   } else {
2064     stream_id_manager_.OnStreamClosed(
2065         /*is_incoming=*/IsIncomingStream(stream_id));
2066   }
2067   ++num_draining_streams_;
2068   if (!IsIncomingStream(stream_id)) {
2069     ++num_outgoing_draining_streams_;
2070     if (!VersionHasIetfQuicFrames(transport_version())) {
2071       OnCanCreateNewOutgoingStream(unidirectional);
2072     }
2073   }
2074 }
2075 
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)2076 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
2077     const QuicStreamId stream_id) {
2078   if (VersionHasIetfQuicFrames(transport_version())) {
2079     std::string error_details;
2080     if (ietf_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
2081             stream_id, &error_details)) {
2082       return true;
2083     }
2084     connection()->CloseConnection(
2085         QUIC_INVALID_STREAM_ID, error_details,
2086         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2087     return false;
2088   }
2089   if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
2090     connection()->CloseConnection(
2091         QUIC_TOO_MANY_AVAILABLE_STREAMS,
2092         absl::StrCat(stream_id, " exceeds available streams ",
2093                      stream_id_manager_.MaxAvailableStreams()),
2094         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2095     return false;
2096   }
2097   return true;
2098 }
2099 
ShouldYield(QuicStreamId stream_id)2100 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
2101   if (stream_id == currently_writing_stream_id_) {
2102     return false;
2103   }
2104   return write_blocked_streams()->ShouldYield(stream_id);
2105 }
2106 
GetOrCreatePendingStream(QuicStreamId stream_id)2107 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
2108   auto it = pending_stream_map_.find(stream_id);
2109   if (it != pending_stream_map_.end()) {
2110     return it->second.get();
2111   }
2112 
2113   if (IsClosedStream(stream_id) ||
2114       !MaybeIncreaseLargestPeerStreamId(stream_id)) {
2115     return nullptr;
2116   }
2117 
2118   auto pending = std::make_unique<PendingStream>(stream_id, this);
2119   PendingStream* unowned_pending = pending.get();
2120   pending_stream_map_[stream_id] = std::move(pending);
2121   return unowned_pending;
2122 }
2123 
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)2124 void QuicSession::set_largest_peer_created_stream_id(
2125     QuicStreamId largest_peer_created_stream_id) {
2126   QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
2127   stream_id_manager_.set_largest_peer_created_stream_id(
2128       largest_peer_created_stream_id);
2129 }
2130 
GetLargestPeerCreatedStreamId(bool unidirectional) const2131 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
2132     bool unidirectional) const {
2133   // This method is only used in IETF QUIC.
2134   QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2135   return ietf_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
2136 }
2137 
DeleteConnection()2138 void QuicSession::DeleteConnection() {
2139   if (connection_) {
2140     delete connection_;
2141     connection_ = nullptr;
2142   }
2143 }
2144 
MaybeSetStreamPriority(QuicStreamId stream_id,const QuicStreamPriority & priority)2145 bool QuicSession::MaybeSetStreamPriority(QuicStreamId stream_id,
2146                                          const QuicStreamPriority& priority) {
2147   auto active_stream = stream_map_.find(stream_id);
2148   if (active_stream != stream_map_.end()) {
2149     active_stream->second->SetPriority(priority);
2150     return true;
2151   }
2152 
2153   return false;
2154 }
2155 
IsClosedStream(QuicStreamId id)2156 bool QuicSession::IsClosedStream(QuicStreamId id) {
2157   QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2158   if (IsOpenStream(id)) {
2159     // Stream is active
2160     return false;
2161   }
2162 
2163   if (VersionHasIetfQuicFrames(transport_version())) {
2164     return !ietf_streamid_manager_.IsAvailableStream(id);
2165   }
2166 
2167   return !stream_id_manager_.IsAvailableStream(id);
2168 }
2169 
IsOpenStream(QuicStreamId id)2170 bool QuicSession::IsOpenStream(QuicStreamId id) {
2171   QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2172   const StreamMap::iterator it = stream_map_.find(id);
2173   if (it != stream_map_.end()) {
2174     return !it->second->IsZombie();
2175   }
2176   if (pending_stream_map_.contains(id) ||
2177       QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2178     // Stream is active
2179     return true;
2180   }
2181   return false;
2182 }
2183 
IsStaticStream(QuicStreamId id) const2184 bool QuicSession::IsStaticStream(QuicStreamId id) const {
2185   auto it = stream_map_.find(id);
2186   if (it == stream_map_.end()) {
2187     return false;
2188   }
2189   return it->second->is_static();
2190 }
2191 
GetNumActiveStreams() const2192 size_t QuicSession::GetNumActiveStreams() const {
2193   QUICHE_DCHECK_GE(
2194       static_cast<QuicStreamCount>(stream_map_.size()),
2195       num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
2196   return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
2197          num_zombie_streams_;
2198 }
2199 
MarkConnectionLevelWriteBlocked(QuicStreamId id)2200 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
2201   if (GetOrCreateStream(id) == nullptr) {
2202     QUIC_BUG(quic_bug_10866_11)
2203         << "Marking unknown stream " << id << " blocked.";
2204     QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
2205   }
2206 
2207   QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
2208                 << " to write-blocked list";
2209 
2210   write_blocked_streams_->AddStream(id);
2211 }
2212 
HasDataToWrite() const2213 bool QuicSession::HasDataToWrite() const {
2214   return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
2215          write_blocked_streams_->HasWriteBlockedDataStreams() ||
2216          connection_->HasQueuedData() ||
2217          !streams_with_pending_retransmission_.empty() ||
2218          control_frame_manager_.WillingToWrite();
2219 }
2220 
OnAckNeedsRetransmittableFrame()2221 void QuicSession::OnAckNeedsRetransmittableFrame() {
2222   flow_controller_.SendWindowUpdate();
2223 }
2224 
SendAckFrequency(const QuicAckFrequencyFrame & frame)2225 void QuicSession::SendAckFrequency(const QuicAckFrequencyFrame& frame) {
2226   control_frame_manager_.WriteOrBufferAckFrequency(frame);
2227 }
2228 
SendNewConnectionId(const QuicNewConnectionIdFrame & frame)2229 void QuicSession::SendNewConnectionId(const QuicNewConnectionIdFrame& frame) {
2230   control_frame_manager_.WriteOrBufferNewConnectionId(
2231       frame.connection_id, frame.sequence_number, frame.retire_prior_to,
2232       frame.stateless_reset_token);
2233 }
2234 
SendRetireConnectionId(uint64_t sequence_number)2235 void QuicSession::SendRetireConnectionId(uint64_t sequence_number) {
2236   if (GetQuicReloadableFlag(
2237           quic_no_write_control_frame_upon_connection_close2)) {
2238     QUIC_RELOADABLE_FLAG_COUNT(
2239         quic_no_write_control_frame_upon_connection_close2);
2240     if (!connection_->connected()) {
2241       return;
2242     }
2243   }
2244   control_frame_manager_.WriteOrBufferRetireConnectionId(sequence_number);
2245 }
2246 
MaybeReserveConnectionId(const QuicConnectionId & server_connection_id)2247 bool QuicSession::MaybeReserveConnectionId(
2248     const QuicConnectionId& server_connection_id) {
2249   if (visitor_) {
2250     return visitor_->TryAddNewConnectionId(
2251         connection_->GetOneActiveServerConnectionId(), server_connection_id);
2252   }
2253   return true;
2254 }
2255 
OnServerConnectionIdRetired(const QuicConnectionId & server_connection_id)2256 void QuicSession::OnServerConnectionIdRetired(
2257     const QuicConnectionId& server_connection_id) {
2258   if (visitor_) {
2259     visitor_->OnConnectionIdRetired(server_connection_id);
2260   }
2261 }
2262 
IsConnectionFlowControlBlocked() const2263 bool QuicSession::IsConnectionFlowControlBlocked() const {
2264   return flow_controller_.IsBlocked();
2265 }
2266 
IsStreamFlowControlBlocked()2267 bool QuicSession::IsStreamFlowControlBlocked() {
2268   for (auto const& kv : stream_map_) {
2269     if (kv.second->IsFlowControlBlocked()) {
2270       return true;
2271     }
2272   }
2273   if (!QuicVersionUsesCryptoFrames(transport_version()) &&
2274       GetMutableCryptoStream()->IsFlowControlBlocked()) {
2275     return true;
2276   }
2277   return false;
2278 }
2279 
MaxAvailableBidirectionalStreams() const2280 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
2281   if (VersionHasIetfQuicFrames(transport_version())) {
2282     return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2283   }
2284   return stream_id_manager_.MaxAvailableStreams();
2285 }
2286 
MaxAvailableUnidirectionalStreams() const2287 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
2288   if (VersionHasIetfQuicFrames(transport_version())) {
2289     return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2290   }
2291   return stream_id_manager_.MaxAvailableStreams();
2292 }
2293 
IsIncomingStream(QuicStreamId id) const2294 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
2295   if (VersionHasIetfQuicFrames(transport_version())) {
2296     return !QuicUtils::IsOutgoingStreamId(version(), id, perspective_);
2297   }
2298   return stream_id_manager_.IsIncomingStream(id);
2299 }
2300 
MaybeCloseZombieStream(QuicStreamId id)2301 void QuicSession::MaybeCloseZombieStream(QuicStreamId id) {
2302   auto it = stream_map_.find(id);
2303   if (it == stream_map_.end()) {
2304     return;
2305   }
2306   --num_zombie_streams_;
2307   closed_streams_.push_back(std::move(it->second));
2308   stream_map_.erase(it);
2309 
2310   if (!closed_streams_clean_up_alarm_->IsSet()) {
2311     closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
2312   }
2313   // Do not retransmit data of a closed stream.
2314   streams_with_pending_retransmission_.erase(id);
2315   connection_->QuicBugIfHasPendingFrames(id);
2316 }
2317 
GetStream(QuicStreamId id) const2318 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
2319   auto active_stream = stream_map_.find(id);
2320   if (active_stream != stream_map_.end()) {
2321     return active_stream->second.get();
2322   }
2323 
2324   if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2325     return const_cast<QuicCryptoStream*>(GetCryptoStream());
2326   }
2327 
2328   return nullptr;
2329 }
2330 
GetActiveStream(QuicStreamId id) const2331 QuicStream* QuicSession::GetActiveStream(QuicStreamId id) const {
2332   auto stream = stream_map_.find(id);
2333   if (stream != stream_map_.end() && !stream->second->is_static()) {
2334     return stream->second.get();
2335   }
2336   return nullptr;
2337 }
2338 
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)2339 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
2340                                QuicTime::Delta ack_delay_time,
2341                                QuicTime receive_timestamp) {
2342   if (frame.type == MESSAGE_FRAME) {
2343     OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
2344     return true;
2345   }
2346   if (frame.type == CRYPTO_FRAME) {
2347     return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
2348                                                         ack_delay_time);
2349   }
2350   if (frame.type != STREAM_FRAME) {
2351     bool acked = control_frame_manager_.OnControlFrameAcked(frame);
2352     if (acked && frame.type == MAX_STREAMS_FRAME) {
2353       // Since there is a 2 frame limit on the number of outstanding max_streams
2354       // frames, when an outstanding max_streams frame is ack'd that frees up
2355       // room to potntially send another.
2356       ietf_streamid_manager_.MaybeSendMaxStreamsFrame();
2357     }
2358     return acked;
2359   }
2360   bool new_stream_data_acked = false;
2361   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2362   // Stream can already be reset when sent frame gets acked.
2363   if (stream != nullptr) {
2364     QuicByteCount newly_acked_length = 0;
2365     new_stream_data_acked = stream->OnStreamFrameAcked(
2366         frame.stream_frame.offset, frame.stream_frame.data_length,
2367         frame.stream_frame.fin, ack_delay_time, receive_timestamp,
2368         &newly_acked_length);
2369     if (!stream->HasPendingRetransmission()) {
2370       streams_with_pending_retransmission_.erase(stream->id());
2371     }
2372   }
2373   return new_stream_data_acked;
2374 }
2375 
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)2376 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
2377   QuicStream* stream = GetStream(frame.stream_id);
2378   if (stream == nullptr) {
2379     QUIC_BUG(quic_bug_10866_12)
2380         << "Stream: " << frame.stream_id << " is closed when " << frame
2381         << " is retransmitted.";
2382     connection()->CloseConnection(
2383         QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
2384         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2385     return;
2386   }
2387   stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
2388                                      frame.fin);
2389 }
2390 
OnFrameLost(const QuicFrame & frame)2391 void QuicSession::OnFrameLost(const QuicFrame& frame) {
2392   if (frame.type == MESSAGE_FRAME) {
2393     ++total_datagrams_lost_;
2394     OnMessageLost(frame.message_frame->message_id);
2395     return;
2396   }
2397   if (frame.type == CRYPTO_FRAME) {
2398     GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
2399     return;
2400   }
2401   if (frame.type != STREAM_FRAME) {
2402     control_frame_manager_.OnControlFrameLost(frame);
2403     return;
2404   }
2405   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2406   if (stream == nullptr) {
2407     return;
2408   }
2409   stream->OnStreamFrameLost(frame.stream_frame.offset,
2410                             frame.stream_frame.data_length,
2411                             frame.stream_frame.fin);
2412   if (stream->HasPendingRetransmission() &&
2413       !streams_with_pending_retransmission_.contains(
2414           frame.stream_frame.stream_id)) {
2415     streams_with_pending_retransmission_.insert(
2416         std::make_pair(frame.stream_frame.stream_id, true));
2417   }
2418 }
2419 
RetransmitFrames(const QuicFrames & frames,TransmissionType type)2420 bool QuicSession::RetransmitFrames(const QuicFrames& frames,
2421                                    TransmissionType type) {
2422   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2423   for (const QuicFrame& frame : frames) {
2424     if (frame.type == MESSAGE_FRAME) {
2425       // Do not retransmit MESSAGE frames.
2426       continue;
2427     }
2428     if (frame.type == CRYPTO_FRAME) {
2429       if (!GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type)) {
2430         return false;
2431       }
2432       continue;
2433     }
2434     if (frame.type != STREAM_FRAME) {
2435       if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
2436         return false;
2437       }
2438       continue;
2439     }
2440     QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2441     if (stream != nullptr &&
2442         !stream->RetransmitStreamData(frame.stream_frame.offset,
2443                                       frame.stream_frame.data_length,
2444                                       frame.stream_frame.fin, type)) {
2445       return false;
2446     }
2447   }
2448   return true;
2449 }
2450 
IsFrameOutstanding(const QuicFrame & frame) const2451 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2452   if (frame.type == MESSAGE_FRAME) {
2453     return false;
2454   }
2455   if (frame.type == CRYPTO_FRAME) {
2456     return GetCryptoStream()->IsFrameOutstanding(
2457         frame.crypto_frame->level, frame.crypto_frame->offset,
2458         frame.crypto_frame->data_length);
2459   }
2460   if (frame.type != STREAM_FRAME) {
2461     return control_frame_manager_.IsControlFrameOutstanding(frame);
2462   }
2463   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2464   return stream != nullptr &&
2465          stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2466                                           frame.stream_frame.data_length,
2467                                           frame.stream_frame.fin);
2468 }
2469 
HasUnackedCryptoData() const2470 bool QuicSession::HasUnackedCryptoData() const {
2471   const QuicCryptoStream* crypto_stream = GetCryptoStream();
2472   return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2473 }
2474 
HasUnackedStreamData() const2475 bool QuicSession::HasUnackedStreamData() const {
2476   for (const auto& it : stream_map_) {
2477     if (it.second->IsWaitingForAcks()) {
2478       return true;
2479     }
2480   }
2481   return false;
2482 }
2483 
GetHandshakeState() const2484 HandshakeState QuicSession::GetHandshakeState() const {
2485   return GetCryptoStream()->GetHandshakeState();
2486 }
2487 
GetFlowControlSendWindowSize(QuicStreamId id)2488 QuicByteCount QuicSession::GetFlowControlSendWindowSize(QuicStreamId id) {
2489   QuicStream* stream = GetActiveStream(id);
2490   if (stream == nullptr) {
2491     // No flow control for invalid or inactive stream ids. Returning uint64max
2492     // allows QuicPacketCreator to write as much data as possible.
2493     return std::numeric_limits<QuicByteCount>::max();
2494   }
2495   return stream->CalculateSendWindowSize();
2496 }
2497 
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2498 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2499                                                    QuicStreamOffset offset,
2500                                                    QuicByteCount data_length,
2501                                                    QuicDataWriter* writer) {
2502   QuicStream* stream = GetStream(id);
2503   if (stream == nullptr) {
2504     // This causes the connection to be closed because of failed to serialize
2505     // packet.
2506     QUIC_BUG(quic_bug_10866_13)
2507         << "Stream " << id << " does not exist when trying to write data."
2508         << " version:" << transport_version();
2509     return STREAM_MISSING;
2510   }
2511   if (stream->WriteStreamData(offset, data_length, writer)) {
2512     return WRITE_SUCCESS;
2513   }
2514   return WRITE_FAILED;
2515 }
2516 
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2517 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2518                                   QuicStreamOffset offset,
2519                                   QuicByteCount data_length,
2520                                   QuicDataWriter* writer) {
2521   return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2522                                                     writer);
2523 }
2524 
GetStatelessResetToken() const2525 StatelessResetToken QuicSession::GetStatelessResetToken() const {
2526   return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2527 }
2528 
CanWriteStreamData() const2529 bool QuicSession::CanWriteStreamData() const {
2530   // Don't write stream data if there are queued data packets.
2531   if (connection_->HasQueuedPackets()) {
2532     return false;
2533   }
2534   // Immediately write handshake data.
2535   if (HasPendingHandshake()) {
2536     return true;
2537   }
2538   return connection_->CanWrite(HAS_RETRANSMITTABLE_DATA);
2539 }
2540 
RetransmitLostData()2541 bool QuicSession::RetransmitLostData() {
2542   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2543   // Retransmit crypto data first.
2544   bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2545   if (QuicCryptoStream* const crypto_stream = GetMutableCryptoStream();
2546       uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2547     crypto_stream->WritePendingCryptoRetransmission();
2548   }
2549   // Retransmit crypto data in stream 1 frames (version < 47).
2550   if (!uses_crypto_frames &&
2551       streams_with_pending_retransmission_.contains(
2552           QuicUtils::GetCryptoStreamId(transport_version()))) {
2553     // Retransmit crypto data first.
2554     QuicStream* const crypto_stream =
2555         GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2556     crypto_stream->OnCanWrite();
2557     QUICHE_DCHECK(CheckStreamWriteBlocked(crypto_stream));
2558     if (crypto_stream->HasPendingRetransmission()) {
2559       // Connection is write blocked.
2560       return false;
2561     } else {
2562       streams_with_pending_retransmission_.erase(
2563           QuicUtils::GetCryptoStreamId(transport_version()));
2564     }
2565   }
2566   if (control_frame_manager_.HasPendingRetransmission()) {
2567     control_frame_manager_.OnCanWrite();
2568     if (control_frame_manager_.HasPendingRetransmission()) {
2569       return false;
2570     }
2571   }
2572   while (!streams_with_pending_retransmission_.empty()) {
2573     if (!CanWriteStreamData()) {
2574       break;
2575     }
2576     // Retransmit lost data on headers and data streams.
2577     const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2578     QuicStream* stream = GetStream(id);
2579     if (stream != nullptr) {
2580       stream->OnCanWrite();
2581       QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
2582       if (stream->HasPendingRetransmission()) {
2583         // Connection is write blocked.
2584         break;
2585       } else if (!streams_with_pending_retransmission_.empty() &&
2586                  streams_with_pending_retransmission_.begin()->first == id) {
2587         // Retransmit lost data may cause connection close. If this stream
2588         // has not yet sent fin, a RST_STREAM will be sent and it will be
2589         // removed from streams_with_pending_retransmission_.
2590         streams_with_pending_retransmission_.pop_front();
2591       }
2592     } else {
2593       QUIC_BUG(quic_bug_10866_14)
2594           << "Try to retransmit data of a closed stream";
2595       streams_with_pending_retransmission_.pop_front();
2596     }
2597   }
2598 
2599   return streams_with_pending_retransmission_.empty();
2600 }
2601 
NeuterUnencryptedData()2602 void QuicSession::NeuterUnencryptedData() {
2603   QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2604   crypto_stream->NeuterUnencryptedStreamData();
2605   if (!crypto_stream->HasPendingRetransmission() &&
2606       !QuicVersionUsesCryptoFrames(transport_version())) {
2607     streams_with_pending_retransmission_.erase(
2608         QuicUtils::GetCryptoStreamId(transport_version()));
2609   }
2610   connection_->NeuterUnencryptedPackets();
2611 }
2612 
SetTransmissionType(TransmissionType type)2613 void QuicSession::SetTransmissionType(TransmissionType type) {
2614   connection_->SetTransmissionType(type);
2615 }
2616 
SendMessage(absl::Span<quiche::QuicheMemSlice> message)2617 MessageResult QuicSession::SendMessage(
2618     absl::Span<quiche::QuicheMemSlice> message) {
2619   return SendMessage(message, /*flush=*/false);
2620 }
2621 
SendMessage(quiche::QuicheMemSlice message)2622 MessageResult QuicSession::SendMessage(quiche::QuicheMemSlice message) {
2623   return SendMessage(absl::MakeSpan(&message, 1), /*flush=*/false);
2624 }
2625 
SendMessage(absl::Span<quiche::QuicheMemSlice> message,bool flush)2626 MessageResult QuicSession::SendMessage(
2627     absl::Span<quiche::QuicheMemSlice> message, bool flush) {
2628   QUICHE_DCHECK(connection_->connected())
2629       << ENDPOINT << "Try to write messages when connection is closed.";
2630   if (!IsEncryptionEstablished()) {
2631     return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2632   }
2633   QuicConnection::ScopedEncryptionLevelContext context(
2634       connection(), GetEncryptionLevelToSendApplicationData());
2635   MessageStatus result =
2636       connection_->SendMessage(last_message_id_ + 1, message, flush);
2637   if (result == MESSAGE_STATUS_SUCCESS) {
2638     return {result, ++last_message_id_};
2639   }
2640   return {result, 0};
2641 }
2642 
OnMessageAcked(QuicMessageId message_id,QuicTime)2643 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2644                                  QuicTime /*receive_timestamp*/) {
2645   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2646 }
2647 
OnMessageLost(QuicMessageId message_id)2648 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2649   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2650                 << " is considered lost";
2651 }
2652 
CleanUpClosedStreams()2653 void QuicSession::CleanUpClosedStreams() { closed_streams_.clear(); }
2654 
GetCurrentLargestMessagePayload() const2655 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2656   return connection_->GetCurrentLargestMessagePayload();
2657 }
2658 
GetGuaranteedLargestMessagePayload() const2659 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2660   return connection_->GetGuaranteedLargestMessagePayload();
2661 }
2662 
next_outgoing_bidirectional_stream_id() const2663 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2664   if (VersionHasIetfQuicFrames(transport_version())) {
2665     return ietf_streamid_manager_.next_outgoing_bidirectional_stream_id();
2666   }
2667   return stream_id_manager_.next_outgoing_stream_id();
2668 }
2669 
next_outgoing_unidirectional_stream_id() const2670 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2671   if (VersionHasIetfQuicFrames(transport_version())) {
2672     return ietf_streamid_manager_.next_outgoing_unidirectional_stream_id();
2673   }
2674   return stream_id_manager_.next_outgoing_stream_id();
2675 }
2676 
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2677 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2678   const bool allow_new_streams =
2679       frame.unidirectional
2680           ? ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2681                 frame.stream_count)
2682           : ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2683                 frame.stream_count);
2684   if (allow_new_streams) {
2685     OnCanCreateNewOutgoingStream(frame.unidirectional);
2686   }
2687 
2688   return true;
2689 }
2690 
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2691 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2692   std::string error_details;
2693   if (ietf_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2694     return true;
2695   }
2696   connection_->CloseConnection(
2697       QUIC_STREAMS_BLOCKED_ERROR, error_details,
2698       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2699   return false;
2700 }
2701 
max_open_incoming_bidirectional_streams() const2702 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2703   if (VersionHasIetfQuicFrames(transport_version())) {
2704     return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2705   }
2706   return stream_id_manager_.max_open_incoming_streams();
2707 }
2708 
max_open_incoming_unidirectional_streams() const2709 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2710   if (VersionHasIetfQuicFrames(transport_version())) {
2711     return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2712   }
2713   return stream_id_manager_.max_open_incoming_streams();
2714 }
2715 
SelectAlpn(const std::vector<absl::string_view> & alpns) const2716 std::vector<absl::string_view>::const_iterator QuicSession::SelectAlpn(
2717     const std::vector<absl::string_view>& alpns) const {
2718   const std::string alpn = AlpnForVersion(connection()->version());
2719   return std::find(alpns.cbegin(), alpns.cend(), alpn);
2720 }
2721 
OnAlpnSelected(absl::string_view alpn)2722 void QuicSession::OnAlpnSelected(absl::string_view alpn) {
2723   QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2724                                                               : "Client: ")
2725                   << "ALPN selected: " << alpn;
2726 }
2727 
NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level)2728 void QuicSession::NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level) {
2729   GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
2730 }
2731 
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action)2732 void QuicSession::PerformActionOnActiveStreams(
2733     quiche::UnretainedCallback<bool(QuicStream*)> action) {
2734   std::vector<QuicStream*> active_streams;
2735   for (const auto& it : stream_map_) {
2736     if (!it.second->is_static() && !it.second->IsZombie()) {
2737       active_streams.push_back(it.second.get());
2738     }
2739   }
2740 
2741   for (QuicStream* stream : active_streams) {
2742     if (!action(stream)) {
2743       return;
2744     }
2745   }
2746 }
2747 
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action) const2748 void QuicSession::PerformActionOnActiveStreams(
2749     quiche::UnretainedCallback<bool(QuicStream*)> action) const {
2750   for (const auto& it : stream_map_) {
2751     if (!it.second->is_static() && !it.second->IsZombie() &&
2752         !action(it.second.get())) {
2753       return;
2754     }
2755   }
2756 }
2757 
GetEncryptionLevelToSendApplicationData() const2758 EncryptionLevel QuicSession::GetEncryptionLevelToSendApplicationData() const {
2759   return connection_->framer().GetEncryptionLevelToSendApplicationData();
2760 }
2761 
ProcessAllPendingStreams()2762 void QuicSession::ProcessAllPendingStreams() {
2763   std::vector<PendingStream*> pending_streams;
2764   pending_streams.reserve(pending_stream_map_.size());
2765   for (auto it = pending_stream_map_.begin(); it != pending_stream_map_.end();
2766        ++it) {
2767     pending_streams.push_back(it->second.get());
2768   }
2769   for (auto* pending_stream : pending_streams) {
2770     if (!MaybeProcessPendingStream(pending_stream)) {
2771       // Defer any further pending stream processing to the next event loop.
2772       return;
2773     }
2774   }
2775 }
2776 
ValidatePath(std::unique_ptr<QuicPathValidationContext> context,std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,PathValidationReason reason)2777 void QuicSession::ValidatePath(
2778     std::unique_ptr<QuicPathValidationContext> context,
2779     std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,
2780     PathValidationReason reason) {
2781   connection_->ValidatePath(std::move(context), std::move(result_delegate),
2782                             reason);
2783 }
2784 
HasPendingPathValidation() const2785 bool QuicSession::HasPendingPathValidation() const {
2786   return connection_->HasPendingPathValidation();
2787 }
2788 
MigratePath(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,QuicPacketWriter * writer,bool owns_writer)2789 bool QuicSession::MigratePath(const QuicSocketAddress& self_address,
2790                               const QuicSocketAddress& peer_address,
2791                               QuicPacketWriter* writer, bool owns_writer) {
2792   return connection_->MigratePath(self_address, peer_address, writer,
2793                                   owns_writer);
2794 }
2795 
ValidateToken(absl::string_view token)2796 bool QuicSession::ValidateToken(absl::string_view token) {
2797   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_SERVER);
2798   if (GetQuicFlag(quic_reject_retry_token_in_initial_packet)) {
2799     return false;
2800   }
2801   if (token.empty() || token[0] != kAddressTokenPrefix) {
2802     // Validate the prefix for token received in NEW_TOKEN frame.
2803     return false;
2804   }
2805   const bool valid = GetCryptoStream()->ValidateAddressToken(
2806       absl::string_view(token.data() + 1, token.length() - 1));
2807   if (valid) {
2808     const CachedNetworkParameters* cached_network_params =
2809         GetCryptoStream()->PreviousCachedNetworkParams();
2810     if (cached_network_params != nullptr &&
2811         cached_network_params->timestamp() > 0) {
2812       connection()->OnReceiveConnectionState(*cached_network_params);
2813     }
2814   }
2815   return valid;
2816 }
2817 
OnServerPreferredAddressAvailable(const QuicSocketAddress & server_preferred_address)2818 void QuicSession::OnServerPreferredAddressAvailable(
2819     const QuicSocketAddress& server_preferred_address) {
2820   QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
2821   if (visitor_ != nullptr) {
2822     visitor_->OnServerPreferredAddressAvailable(server_preferred_address);
2823   }
2824 }
2825 
ProcessPendingStream(PendingStream * pending)2826 QuicStream* QuicSession::ProcessPendingStream(PendingStream* pending) {
2827   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
2828   QUICHE_DCHECK(connection()->connected());
2829   QuicStreamId stream_id = pending->id();
2830   QUIC_BUG_IF(bad pending stream, !IsIncomingStream(stream_id))
2831       << "Pending stream " << stream_id << " is not an incoming stream.";
2832   // TODO(b/305051334) check if this stream is incoming stream before making it
2833   // pending. If not, connection should be closed.
2834   StreamType stream_type = QuicUtils::GetStreamType(
2835       stream_id, perspective(), /*peer_initiated=*/true, version());
2836   switch (stream_type) {
2837     case BIDIRECTIONAL: {
2838       return ProcessBidirectionalPendingStream(pending);
2839     }
2840     case READ_UNIDIRECTIONAL: {
2841       return ProcessReadUnidirectionalPendingStream(pending);
2842     }
2843     case WRITE_UNIDIRECTIONAL:
2844       ABSL_FALLTHROUGH_INTENDED;
2845     case CRYPTO:
2846       QUICHE_BUG(unexpected pending stream)
2847           << "Unexpected pending stream " << stream_id << " with type "
2848           << stream_type;
2849       return nullptr;
2850   }
2851   return nullptr;  // Unreachable, unless the enum value is out-of-range
2852                    // (potentially undefined behavior)
2853 }
2854 
ExceedsPerLoopStreamLimit() const2855 bool QuicSession::ExceedsPerLoopStreamLimit() const {
2856   QUICHE_DCHECK(version().HasIetfQuicFrames());
2857   return new_incoming_streams_in_current_loop_ >=
2858          max_streams_accepted_per_loop_;
2859 }
2860 
OnStreamCountReset()2861 void QuicSession::OnStreamCountReset() {
2862   const bool exceeded_per_loop_stream_limit = ExceedsPerLoopStreamLimit();
2863   new_incoming_streams_in_current_loop_ = 0;
2864   if (exceeded_per_loop_stream_limit) {
2865     QUIC_CODE_COUNT_N(quic_pending_stream, 2, 3);
2866     // Convert as many leftover pending streams from last loop to active streams
2867     // as allowed.
2868     ProcessAllPendingStreams();
2869   }
2870 }
2871 
2872 #undef ENDPOINT  // undef for jumbo builds
2873 }  // namespace quic
2874