1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_session.h"
6
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10
11 #include "absl/memory/memory.h"
12 #include "absl/strings/str_cat.h"
13 #include "absl/strings/string_view.h"
14 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
15 #include "quiche/quic/core/frames/quic_window_update_frame.h"
16 #include "quiche/quic/core/quic_connection.h"
17 #include "quiche/quic/core/quic_connection_context.h"
18 #include "quiche/quic/core/quic_error_codes.h"
19 #include "quiche/quic/core/quic_flow_controller.h"
20 #include "quiche/quic/core/quic_stream_priority.h"
21 #include "quiche/quic/core/quic_types.h"
22 #include "quiche/quic/core/quic_utils.h"
23 #include "quiche/quic/core/quic_versions.h"
24 #include "quiche/quic/core/quic_write_blocked_list.h"
25 #include "quiche/quic/platform/api/quic_bug_tracker.h"
26 #include "quiche/quic/platform/api/quic_flag_utils.h"
27 #include "quiche/quic/platform/api/quic_flags.h"
28 #include "quiche/quic/platform/api/quic_logging.h"
29 #include "quiche/quic/platform/api/quic_server_stats.h"
30 #include "quiche/quic/platform/api/quic_stack_trace.h"
31 #include "quiche/common/platform/api/quiche_logging.h"
32 #include "quiche/common/quiche_callbacks.h"
33 #include "quiche/common/quiche_text_utils.h"
34
35 namespace quic {
36
37 namespace {
38
39 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
40 public:
ClosedStreamsCleanUpDelegate(QuicSession * session)41 explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
42 : session_(session) {}
43 ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
44 ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
45 delete;
46
GetConnectionContext()47 QuicConnectionContext* GetConnectionContext() override {
48 return (session_->connection() == nullptr)
49 ? nullptr
50 : session_->connection()->context();
51 }
52
OnAlarm()53 void OnAlarm() override { session_->CleanUpClosedStreams(); }
54
55 private:
56 QuicSession* session_;
57 };
58
59 class StreamCountResetAlarmDelegate : public QuicAlarm::Delegate {
60 public:
StreamCountResetAlarmDelegate(QuicSession * session)61 explicit StreamCountResetAlarmDelegate(QuicSession* session)
62 : session_(session) {}
63 StreamCountResetAlarmDelegate(const StreamCountResetAlarmDelegate&) = delete;
64 StreamCountResetAlarmDelegate& operator=(
65 const StreamCountResetAlarmDelegate&) = delete;
66
GetConnectionContext()67 QuicConnectionContext* GetConnectionContext() override {
68 return (session_->connection() == nullptr)
69 ? nullptr
70 : session_->connection()->context();
71 }
72
OnAlarm()73 void OnAlarm() override { session_->OnStreamCountReset(); }
74
75 private:
76 QuicSession* session_;
77 };
78
79 } // namespace
80
// Log prefix identifying which endpoint is speaking. Expands to a call to
// perspective(), so it can only be used where that name resolves.
#define ENDPOINT \
  (perspective() != Perspective::IS_SERVER ? "Client: " : "Server: ")
83
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)84 QuicSession::QuicSession(
85 QuicConnection* connection, Visitor* owner, const QuicConfig& config,
86 const ParsedQuicVersionVector& supported_versions,
87 QuicStreamCount num_expected_unidirectional_static_streams)
88 : QuicSession(connection, owner, config, supported_versions,
89 num_expected_unidirectional_static_streams, nullptr) {}
90
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams,std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)91 QuicSession::QuicSession(
92 QuicConnection* connection, Visitor* owner, const QuicConfig& config,
93 const ParsedQuicVersionVector& supported_versions,
94 QuicStreamCount num_expected_unidirectional_static_streams,
95 std::unique_ptr<QuicDatagramQueue::Observer> datagram_observer)
96 : connection_(connection),
97 perspective_(connection->perspective()),
98 visitor_(owner),
99 write_blocked_streams_(std::make_unique<QuicWriteBlockedList>()),
100 config_(config),
101 stream_id_manager_(perspective(), connection->transport_version(),
102 kDefaultMaxStreamsPerConnection,
103 config_.GetMaxBidirectionalStreamsToSend()),
104 ietf_streamid_manager_(perspective(), connection->version(), this, 0,
105 num_expected_unidirectional_static_streams,
106 config_.GetMaxBidirectionalStreamsToSend(),
107 config_.GetMaxUnidirectionalStreamsToSend() +
108 num_expected_unidirectional_static_streams),
109 num_draining_streams_(0),
110 num_outgoing_draining_streams_(0),
111 num_static_streams_(0),
112 num_zombie_streams_(0),
113 flow_controller_(
114 this, QuicUtils::GetInvalidStreamId(connection->transport_version()),
115 /*is_connection_flow_controller*/ true,
116 connection->version().AllowsLowFlowControlLimits()
117 ? 0
118 : kMinimumFlowControlSendWindow,
119 config_.GetInitialSessionFlowControlWindowToSend(),
120 kSessionReceiveWindowLimit, perspective() == Perspective::IS_SERVER,
121 nullptr),
122 currently_writing_stream_id_(0),
123 transport_goaway_sent_(false),
124 transport_goaway_received_(false),
125 control_frame_manager_(this),
126 last_message_id_(0),
127 datagram_queue_(this, std::move(datagram_observer)),
128 closed_streams_clean_up_alarm_(nullptr),
129 supported_versions_(supported_versions),
130 is_configured_(false),
131 was_zero_rtt_rejected_(false),
132 liveness_testing_in_progress_(false),
133 stream_count_reset_alarm_(
134 absl::WrapUnique<QuicAlarm>(connection->alarm_factory()->CreateAlarm(
135 new StreamCountResetAlarmDelegate(this)))) {
136 closed_streams_clean_up_alarm_ =
137 absl::WrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
138 new ClosedStreamsCleanUpDelegate(this)));
139 if (VersionHasIetfQuicFrames(transport_version())) {
140 config_.SetMaxUnidirectionalStreamsToSend(
141 config_.GetMaxUnidirectionalStreamsToSend() +
142 num_expected_unidirectional_static_streams);
143 }
144 }
145
Initialize()146 void QuicSession::Initialize() {
147 connection_->set_visitor(this);
148 connection_->SetSessionNotifier(this);
149 connection_->SetDataProducer(this);
150 connection_->SetUnackedMapInitialCapacity();
151 connection_->SetFromConfig(config_);
152 if (perspective_ == Perspective::IS_CLIENT) {
153 if (config_.HasClientRequestedIndependentOption(kAFFE, perspective_) &&
154 version().HasIetfQuicFrames()) {
155 connection_->set_can_receive_ack_frequency_frame();
156 config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
157 }
158 }
159 if (perspective() == Perspective::IS_SERVER &&
160 connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
161 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
162 }
163
164 connection_->CreateConnectionIdManager();
165
166 // On the server side, version negotiation has been done by the dispatcher,
167 // and the server session is created with the right version.
168 if (perspective() == Perspective::IS_SERVER) {
169 connection_->OnSuccessfulVersionNegotiation();
170 }
171
172 if (QuicVersionUsesCryptoFrames(transport_version())) {
173 return;
174 }
175
176 QUICHE_DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
177 GetMutableCryptoStream()->id());
178 }
179
~QuicSession()180 QuicSession::~QuicSession() {
181 if (closed_streams_clean_up_alarm_ != nullptr) {
182 closed_streams_clean_up_alarm_->PermanentCancel();
183 }
184 if (stream_count_reset_alarm_ != nullptr) {
185 stream_count_reset_alarm_->PermanentCancel();
186 }
187 }
188
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)189 PendingStream* QuicSession::PendingStreamOnStreamFrame(
190 const QuicStreamFrame& frame) {
191 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
192 QuicStreamId stream_id = frame.stream_id;
193
194 PendingStream* pending = GetOrCreatePendingStream(stream_id);
195
196 if (!pending) {
197 if (frame.fin) {
198 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
199 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
200 }
201 return nullptr;
202 }
203
204 pending->OnStreamFrame(frame);
205 if (!connection()->connected()) {
206 return nullptr;
207 }
208 return pending;
209 }
210
MaybeProcessPendingStream(PendingStream * pending)211 bool QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
212 QUICHE_DCHECK(pending != nullptr && connection()->connected());
213
214 if (ExceedsPerLoopStreamLimit()) {
215 QUIC_DLOG(INFO) << "Skip processing pending stream " << pending->id()
216 << " because it exceeds per loop limit.";
217 QUIC_CODE_COUNT_N(quic_pending_stream, 1, 3);
218 return false;
219 }
220
221 QuicStreamId stream_id = pending->id();
222 std::optional<QuicResetStreamError> stop_sending_error_code =
223 pending->GetStopSendingErrorCode();
224 QUIC_DLOG(INFO) << "Process pending stream " << pending->id();
225 QuicStream* stream = ProcessPendingStream(pending);
226 if (stream != nullptr) {
227 // The pending stream should now be in the scope of normal streams.
228 QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
229 << "Stream " << stream_id << " not created";
230 if (!stream->pending_duration().IsZero()) {
231 QUIC_SERVER_HISTOGRAM_TIMES("QuicStream.PendingDurationUs",
232 stream->pending_duration().ToMicroseconds(),
233 0, 1000 * 100, 20,
234 "Time a stream has been pending at server.");
235 ++connection()->mutable_stats().num_total_pending_streams;
236 }
237 pending_stream_map_.erase(stream_id);
238 if (stop_sending_error_code) {
239 stream->OnStopSending(*stop_sending_error_code);
240 if (!connection()->connected()) {
241 return false;
242 }
243 }
244 stream->OnStreamCreatedFromPendingStream();
245 return connection()->connected();
246 }
247 // At this point, none of the bytes has been successfully consumed by the
248 // application layer. We should close the pending stream even if it is
249 // bidirectionl as no application will be able to write in a bidirectional
250 // stream with zero byte as input.
251 if (pending->sequencer()->IsClosed()) {
252 ClosePendingStream(stream_id);
253 }
254 return connection()->connected();
255 }
256
PendingStreamOnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)257 void QuicSession::PendingStreamOnWindowUpdateFrame(
258 const QuicWindowUpdateFrame& frame) {
259 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
260 PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
261 if (pending) {
262 pending->OnWindowUpdateFrame(frame);
263 }
264 }
265
PendingStreamOnStopSendingFrame(const QuicStopSendingFrame & frame)266 void QuicSession::PendingStreamOnStopSendingFrame(
267 const QuicStopSendingFrame& frame) {
268 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
269 PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
270 if (pending) {
271 pending->OnStopSending(frame.error());
272 }
273 }
274
OnStreamFrame(const QuicStreamFrame & frame)275 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
276 QuicStreamId stream_id = frame.stream_id;
277 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
278 connection()->CloseConnection(
279 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
280 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
281 return;
282 }
283
284 if (ShouldProcessFrameByPendingStream(STREAM_FRAME, stream_id)) {
285 PendingStream* pending = PendingStreamOnStreamFrame(frame);
286 if (pending != nullptr && IsEncryptionEstablished()) {
287 MaybeProcessPendingStream(pending);
288 }
289 return;
290 }
291
292 QuicStream* stream = GetOrCreateStream(stream_id);
293
294 if (!stream) {
295 // The stream no longer exists, but we may still be interested in the
296 // final stream byte offset sent by the peer. A frame with a FIN can give
297 // us this offset.
298 if (frame.fin) {
299 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
300 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
301 }
302 return;
303 }
304 stream->OnStreamFrame(frame);
305 }
306
OnCryptoFrame(const QuicCryptoFrame & frame)307 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
308 GetMutableCryptoStream()->OnCryptoFrame(frame);
309 }
310
OnStopSendingFrame(const QuicStopSendingFrame & frame)311 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
312 // STOP_SENDING is in IETF QUIC only.
313 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
314 QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
315
316 QuicStreamId stream_id = frame.stream_id;
317 // If Stream ID is invalid then close the connection.
318 // TODO(ianswett): This check is redundant to checks for IsClosedStream,
319 // but removing it requires removing multiple QUICHE_DCHECKs.
320 // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
321 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
322 QUIC_DVLOG(1) << ENDPOINT
323 << "Received STOP_SENDING with invalid stream_id: "
324 << stream_id << " Closing connection";
325 connection()->CloseConnection(
326 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
327 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
328 return;
329 }
330
331 // If stream_id is READ_UNIDIRECTIONAL, close the connection.
332 if (QuicUtils::GetStreamType(stream_id, perspective(),
333 IsIncomingStream(stream_id),
334 version()) == READ_UNIDIRECTIONAL) {
335 QUIC_DVLOG(1) << ENDPOINT
336 << "Received STOP_SENDING for a read-only stream_id: "
337 << stream_id << ".";
338 connection()->CloseConnection(
339 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
340 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
341 return;
342 }
343
344 if (visitor_) {
345 visitor_->OnStopSendingReceived(frame);
346 }
347 if (ShouldProcessFrameByPendingStream(STOP_SENDING_FRAME, stream_id)) {
348 PendingStreamOnStopSendingFrame(frame);
349 return;
350 }
351
352 QuicStream* stream = GetOrCreateStream(stream_id);
353 if (!stream) {
354 // Errors are handled by GetOrCreateStream.
355 return;
356 }
357
358 stream->OnStopSending(frame.error());
359 }
360
OnPacketDecrypted(EncryptionLevel level)361 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
362 GetMutableCryptoStream()->OnPacketDecrypted(level);
363 if (liveness_testing_in_progress_) {
364 liveness_testing_in_progress_ = false;
365 OnCanCreateNewOutgoingStream(/*unidirectional=*/false);
366 }
367 }
368
OnOneRttPacketAcknowledged()369 void QuicSession::OnOneRttPacketAcknowledged() {
370 GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
371 }
372
OnHandshakePacketSent()373 void QuicSession::OnHandshakePacketSent() {
374 GetMutableCryptoStream()->OnHandshakePacketSent();
375 }
376
377 std::unique_ptr<QuicDecrypter>
AdvanceKeysAndCreateCurrentOneRttDecrypter()378 QuicSession::AdvanceKeysAndCreateCurrentOneRttDecrypter() {
379 return GetMutableCryptoStream()->AdvanceKeysAndCreateCurrentOneRttDecrypter();
380 }
381
CreateCurrentOneRttEncrypter()382 std::unique_ptr<QuicEncrypter> QuicSession::CreateCurrentOneRttEncrypter() {
383 return GetMutableCryptoStream()->CreateCurrentOneRttEncrypter();
384 }
385
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)386 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
387 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
388 QuicStreamId stream_id = frame.stream_id;
389
390 PendingStream* pending = GetOrCreatePendingStream(stream_id);
391
392 if (!pending) {
393 HandleRstOnValidNonexistentStream(frame);
394 return;
395 }
396
397 pending->OnRstStreamFrame(frame);
398 // At this point, none of the bytes has been consumed by the application
399 // layer. It is safe to close the pending stream even if it is bidirectionl as
400 // no application will be able to write in a bidirectional stream with zero
401 // byte as input.
402 ClosePendingStream(stream_id);
403 }
404
OnRstStream(const QuicRstStreamFrame & frame)405 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
406 QuicStreamId stream_id = frame.stream_id;
407 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
408 connection()->CloseConnection(
409 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
410 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
411 return;
412 }
413
414 if (VersionHasIetfQuicFrames(transport_version()) &&
415 QuicUtils::GetStreamType(stream_id, perspective(),
416 IsIncomingStream(stream_id),
417 version()) == WRITE_UNIDIRECTIONAL) {
418 connection()->CloseConnection(
419 QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
420 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
421 return;
422 }
423
424 if (visitor_) {
425 visitor_->OnRstStreamReceived(frame);
426 }
427
428 if (ShouldProcessFrameByPendingStream(RST_STREAM_FRAME, stream_id)) {
429 PendingStreamOnRstStream(frame);
430 return;
431 }
432
433 QuicStream* stream = GetOrCreateStream(stream_id);
434
435 if (!stream) {
436 HandleRstOnValidNonexistentStream(frame);
437 return; // Errors are handled by GetOrCreateStream.
438 }
439 stream->OnStreamReset(frame);
440 }
441
OnGoAway(const QuicGoAwayFrame &)442 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
443 QUIC_BUG_IF(quic_bug_12435_1, version().UsesHttp3())
444 << "gQUIC GOAWAY received on version " << version();
445
446 transport_goaway_received_ = true;
447 }
448
OnMessageReceived(absl::string_view message)449 void QuicSession::OnMessageReceived(absl::string_view message) {
450 QUIC_DVLOG(1) << ENDPOINT << "Received message of length "
451 << message.length();
452 QUIC_DVLOG(2) << ENDPOINT << "Contents of message of length "
453 << message.length() << ":" << std::endl
454 << quiche::QuicheTextUtils::HexDump(message);
455 }
456
OnHandshakeDoneReceived()457 void QuicSession::OnHandshakeDoneReceived() {
458 QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
459 GetMutableCryptoStream()->OnHandshakeDoneReceived();
460 }
461
OnNewTokenReceived(absl::string_view token)462 void QuicSession::OnNewTokenReceived(absl::string_view token) {
463 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
464 GetMutableCryptoStream()->OnNewTokenReceived(token);
465 }
466
467 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)468 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
469 ConnectionCloseSource source) {
470 if (error != QUIC_NO_ERROR) {
471 if (source == ConnectionCloseSource::FROM_SELF) {
472 QUIC_SERVER_HISTOGRAM_ENUM(
473 "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
474 "QuicErrorCode for server-closed connections.");
475 } else {
476 QUIC_SERVER_HISTOGRAM_ENUM(
477 "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
478 "QuicErrorCode for client-closed connections.");
479 }
480 }
481 }
482
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)483 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
484 ConnectionCloseSource source) {
485 QUICHE_DCHECK(!connection_->connected());
486 if (perspective() == Perspective::IS_SERVER) {
487 RecordConnectionCloseAtServer(frame.quic_error_code, source);
488 }
489
490 if (on_closed_frame_.quic_error_code == QUIC_NO_ERROR) {
491 // Save all of the connection close information
492 on_closed_frame_ = frame;
493 source_ = source;
494 }
495
496 GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
497
498 PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
499 QuicStreamId id = stream->id();
500 stream->OnConnectionClosed(frame.quic_error_code, source);
501 auto it = stream_map_.find(id);
502 if (it != stream_map_.end()) {
503 QUIC_BUG_IF(quic_bug_12435_2, !it->second->IsZombie())
504 << ENDPOINT << "Non-zombie stream " << id
505 << " failed to close under OnConnectionClosed";
506 }
507 return true;
508 });
509
510 closed_streams_clean_up_alarm_->Cancel();
511 stream_count_reset_alarm_->Cancel();
512
513 if (visitor_) {
514 visitor_->OnConnectionClosed(connection_->GetOneActiveServerConnectionId(),
515 frame.quic_error_code, frame.error_details,
516 source);
517 }
518 }
519
OnWriteBlocked()520 void QuicSession::OnWriteBlocked() {
521 if (!connection_->connected()) {
522 return;
523 }
524 if (visitor_) {
525 visitor_->OnWriteBlocked(connection_);
526 }
527 }
528
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)529 void QuicSession::OnSuccessfulVersionNegotiation(
530 const ParsedQuicVersion& /*version*/) {}
531
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)532 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
533 const QuicSocketAddress& peer_address,
534 bool is_connectivity_probe) {
535 QUICHE_DCHECK(!connection_->ignore_gquic_probing());
536 if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
537 // Server only sends back a connectivity probe after received a
538 // connectivity probe from a new peer address.
539 connection_->SendConnectivityProbingPacket(nullptr, peer_address);
540 }
541 }
542
OnPathDegrading()543 void QuicSession::OnPathDegrading() {
544 if (visitor_) {
545 visitor_->OnPathDegrading();
546 }
547 }
548
OnForwardProgressMadeAfterPathDegrading()549 void QuicSession::OnForwardProgressMadeAfterPathDegrading() {}
550
AllowSelfAddressChange() const551 bool QuicSession::AllowSelfAddressChange() const { return false; }
552
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)553 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
554 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
555 // assume that it still exists.
556 QuicStreamId stream_id = frame.stream_id;
557 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
558 // This is a window update that applies to the connection, rather than an
559 // individual stream.
560 QUIC_DVLOG(1) << ENDPOINT
561 << "Received connection level flow control window "
562 "update with max data: "
563 << frame.max_data;
564 flow_controller_.UpdateSendWindowOffset(frame.max_data);
565 return;
566 }
567
568 if (VersionHasIetfQuicFrames(transport_version()) &&
569 QuicUtils::GetStreamType(stream_id, perspective(),
570 IsIncomingStream(stream_id),
571 version()) == READ_UNIDIRECTIONAL) {
572 connection()->CloseConnection(
573 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
574 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
575 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
576 return;
577 }
578
579 if (ShouldProcessFrameByPendingStream(WINDOW_UPDATE_FRAME, stream_id)) {
580 PendingStreamOnWindowUpdateFrame(frame);
581 return;
582 }
583
584 QuicStream* stream = GetOrCreateStream(stream_id);
585 if (stream != nullptr) {
586 stream->OnWindowUpdateFrame(frame);
587 }
588 }
589
OnBlockedFrame(const QuicBlockedFrame & frame)590 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
591 // TODO(rjshade): Compare our flow control receive windows for specified
592 // streams: if we have a large window then maybe something
593 // had gone wrong with the flow control accounting.
594 QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
595 << frame.stream_id << ", offset: " << frame.offset;
596 }
597
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)598 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
599 uint64_t previous_bytes_written,
600 bool previous_fin_sent) {
601 if ( // Stream should not be closed.
602 !stream->write_side_closed() &&
603 // Not connection flow control blocked.
604 !flow_controller_.IsBlocked() &&
605 // Detect lack of forward progress.
606 previous_bytes_written == stream->stream_bytes_written() &&
607 previous_fin_sent == stream->fin_sent()) {
608 stream->set_busy_counter(stream->busy_counter() + 1);
609 QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
610 << stream->id() << " stream_bytes_written "
611 << stream->stream_bytes_written() << " fin "
612 << stream->fin_sent() << " count " << stream->busy_counter();
613 // Wait a few iterations before firing, the exact count is
614 // arbitrary, more than a few to cover a few test-only false
615 // positives.
616 if (stream->busy_counter() > 20) {
617 QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
618 << stream->id() << " stream_bytes_written "
619 << stream->stream_bytes_written() << " fin "
620 << stream->fin_sent();
621 return false;
622 }
623 } else {
624 stream->set_busy_counter(0);
625 }
626 return true;
627 }
628
CheckStreamWriteBlocked(QuicStream * stream) const629 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
630 if (!stream->write_side_closed() && stream->HasBufferedData() &&
631 !stream->IsFlowControlBlocked() &&
632 !write_blocked_streams_->IsStreamBlocked(stream->id())) {
633 QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
634 << " has buffered " << stream->BufferedDataBytes()
635 << " bytes, and is not flow control blocked, "
636 "but it is not in the write block list.";
637 return false;
638 }
639 return true;
640 }
641
OnCanWrite()642 void QuicSession::OnCanWrite() {
643 if (connection_->framer().is_processing_packet()) {
644 // Do not write data in the middle of packet processing because rest
645 // frames in the packet may change the data to write. For example, lost
646 // data could be acknowledged. Also, connection is going to emit
647 // OnCanWrite signal post packet processing.
648 QUIC_BUG(session_write_mid_packet_processing)
649 << ENDPOINT << "Try to write mid packet processing.";
650 return;
651 }
652 if (!RetransmitLostData()) {
653 // Cannot finish retransmitting lost data, connection is write blocked.
654 QUIC_DVLOG(1) << ENDPOINT
655 << "Cannot finish retransmitting lost data, connection is "
656 "write blocked.";
657 return;
658 }
659 // We limit the number of writes to the number of pending streams. If more
660 // streams become pending, WillingAndAbleToWrite will be true, which will
661 // cause the connection to request resumption before yielding to other
662 // connections.
663 // If we are connection level flow control blocked, then only allow the
664 // crypto and headers streams to try writing as all other streams will be
665 // blocked.
666 size_t num_writes = flow_controller_.IsBlocked()
667 ? write_blocked_streams_->NumBlockedSpecialStreams()
668 : write_blocked_streams_->NumBlockedStreams();
669 if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
670 datagram_queue_.empty() &&
671 (!QuicVersionUsesCryptoFrames(transport_version()) ||
672 !GetCryptoStream()->HasBufferedCryptoFrames())) {
673 return;
674 }
675
676 QuicConnection::ScopedPacketFlusher flusher(connection_);
677 if (QuicVersionUsesCryptoFrames(transport_version())) {
678 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
679 if (crypto_stream->HasBufferedCryptoFrames()) {
680 crypto_stream->WriteBufferedCryptoFrames();
681 }
682 if ((GetQuicReloadableFlag(
683 quic_no_write_control_frame_upon_connection_close) &&
684 !connection_->connected()) ||
685 crypto_stream->HasBufferedCryptoFrames()) {
686 if (!connection_->connected()) {
687 QUIC_RELOADABLE_FLAG_COUNT(
688 quic_no_write_control_frame_upon_connection_close);
689 }
690 // Cannot finish writing buffered crypto frames, connection is either
691 // write blocked or closed.
692 return;
693 }
694 }
695 if (control_frame_manager_.WillingToWrite()) {
696 control_frame_manager_.OnCanWrite();
697 }
698 if (version().UsesTls() && GetHandshakeState() != HANDSHAKE_CONFIRMED &&
699 connection_->in_probe_time_out()) {
700 QUIC_CODE_COUNT(quic_donot_pto_stream_data_before_handshake_confirmed);
701 // Do not PTO stream data before handshake gets confirmed.
702 return;
703 }
704 // TODO(b/147146815): this makes all datagrams go before stream data. We
705 // should have a better priority scheme for this.
706 if (!datagram_queue_.empty()) {
707 size_t written = datagram_queue_.SendDatagrams();
708 QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
709 if (!datagram_queue_.empty()) {
710 return;
711 }
712 }
713 std::vector<QuicStreamId> last_writing_stream_ids;
714 for (size_t i = 0; i < num_writes; ++i) {
715 if (!(write_blocked_streams_->HasWriteBlockedSpecialStream() ||
716 write_blocked_streams_->HasWriteBlockedDataStreams())) {
717 // Writing one stream removed another!? Something's broken.
718 QUIC_BUG(quic_bug_10866_1)
719 << "WriteBlockedStream is missing, num_writes: " << num_writes
720 << ", finished_writes: " << i
721 << ", connected: " << connection_->connected()
722 << ", connection level flow control blocked: "
723 << flow_controller_.IsBlocked();
724 for (QuicStreamId id : last_writing_stream_ids) {
725 QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
726 }
727 connection_->CloseConnection(QUIC_INTERNAL_ERROR,
728 "WriteBlockedStream is missing",
729 ConnectionCloseBehavior::SILENT_CLOSE);
730 return;
731 }
732 if (!CanWriteStreamData()) {
733 return;
734 }
735 currently_writing_stream_id_ = write_blocked_streams_->PopFront();
736 last_writing_stream_ids.push_back(currently_writing_stream_id_);
737 QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
738 << currently_writing_stream_id_ << " from write-blocked list";
739 QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
740 if (stream != nullptr && !stream->IsFlowControlBlocked()) {
741 // If the stream can't write all bytes it'll re-add itself to the blocked
742 // list.
743 uint64_t previous_bytes_written = stream->stream_bytes_written();
744 bool previous_fin_sent = stream->fin_sent();
745 QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
746 << " bytes_written " << previous_bytes_written << " fin "
747 << previous_fin_sent;
748 stream->OnCanWrite();
749 QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
750 QUICHE_DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
751 previous_fin_sent));
752 }
753 currently_writing_stream_id_ = 0;
754 }
755 }
756
WillingAndAbleToWrite() const757 bool QuicSession::WillingAndAbleToWrite() const {
758 // Schedule a write when:
759 // 1) control frame manager has pending or new control frames, or
760 // 2) any stream has pending retransmissions, or
761 // 3) If the crypto or headers streams are blocked, or
762 // 4) connection is not flow control blocked and there are write blocked
763 // streams.
764 if (QuicVersionUsesCryptoFrames(transport_version())) {
765 if (HasPendingHandshake()) {
766 return true;
767 }
768 if (!IsEncryptionEstablished()) {
769 return false;
770 }
771 }
772 if (control_frame_manager_.WillingToWrite() ||
773 !streams_with_pending_retransmission_.empty()) {
774 return true;
775 }
776 if (flow_controller_.IsBlocked()) {
777 if (VersionUsesHttp3(transport_version())) {
778 return false;
779 }
780 // Crypto and headers streams are not blocked by connection level flow
781 // control.
782 return write_blocked_streams_->HasWriteBlockedSpecialStream();
783 }
784 return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
785 write_blocked_streams_->HasWriteBlockedDataStreams();
786 }
787
GetStreamsInfoForLogging() const788 std::string QuicSession::GetStreamsInfoForLogging() const {
789 std::string info = absl::StrCat(
790 "num_active_streams: ", GetNumActiveStreams(),
791 ", num_pending_streams: ", pending_streams_size(),
792 ", num_outgoing_draining_streams: ", num_outgoing_draining_streams(),
793 " ");
794 // Log info for up to 5 streams.
795 size_t i = 5;
796 for (const auto& it : stream_map_) {
797 if (it.second->is_static()) {
798 continue;
799 }
800 // Calculate the stream creation delay.
801 const QuicTime::Delta delay =
802 connection_->clock()->ApproximateNow() - it.second->creation_time();
803 absl::StrAppend(
804 &info, "{", it.second->id(), ":", delay.ToDebuggingValue(), ";",
805 it.second->stream_bytes_written(), ",", it.second->fin_sent(), ",",
806 it.second->HasBufferedData(), ",", it.second->fin_buffered(), ";",
807 it.second->stream_bytes_read(), ",", it.second->fin_received(), "}");
808 --i;
809 if (i == 0) {
810 break;
811 }
812 }
813 return info;
814 }
815
HasPendingHandshake() const816 bool QuicSession::HasPendingHandshake() const {
817 if (QuicVersionUsesCryptoFrames(transport_version())) {
818 return GetCryptoStream()->HasPendingCryptoRetransmission() ||
819 GetCryptoStream()->HasBufferedCryptoFrames();
820 }
821 return streams_with_pending_retransmission_.contains(
822 QuicUtils::GetCryptoStreamId(transport_version())) ||
823 write_blocked_streams_->IsStreamBlocked(
824 QuicUtils::GetCryptoStreamId(transport_version()));
825 }
826
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)827 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
828 const QuicSocketAddress& peer_address,
829 const QuicReceivedPacket& packet) {
830 QuicConnectionContextSwitcher cs(connection_->context());
831 connection_->ProcessUdpPacket(self_address, peer_address, packet);
832 }
833
on_closed_frame_string() const834 std::string QuicSession::on_closed_frame_string() const {
835 std::stringstream ss;
836 ss << on_closed_frame_;
837 if (source_.has_value()) {
838 ss << " " << ConnectionCloseSourceToString(*source_);
839 }
840 return ss.str();
841 }
842
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,EncryptionLevel level)843 QuicConsumedData QuicSession::WritevData(QuicStreamId id, size_t write_length,
844 QuicStreamOffset offset,
845 StreamSendingState state,
846 TransmissionType type,
847 EncryptionLevel level) {
848 QUIC_BUG_IF(session writevdata when disconnected, !connection()->connected())
849 << ENDPOINT << "Try to write stream data when connection is closed: "
850 << on_closed_frame_string();
851 if (!IsEncryptionEstablished() &&
852 !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
853 // Do not let streams write without encryption. The calling stream will end
854 // up write blocked until OnCanWrite is next called.
855 if (was_zero_rtt_rejected_ && !OneRttKeysAvailable()) {
856 QUICHE_DCHECK(version().UsesTls() &&
857 perspective() == Perspective::IS_CLIENT);
858 QUIC_DLOG(INFO) << ENDPOINT
859 << "Suppress the write while 0-RTT gets rejected and "
860 "1-RTT keys are not available. Version: "
861 << ParsedQuicVersionToString(version());
862 } else if (version().UsesTls() || perspective() == Perspective::IS_SERVER) {
863 QUIC_BUG(quic_bug_10866_2)
864 << ENDPOINT << "Try to send data of stream " << id
865 << " before encryption is established. Version: "
866 << ParsedQuicVersionToString(version());
867 } else {
868 // In QUIC crypto, this could happen when the client sends full CHLO and
869 // 0-RTT request, then receives an inchoate REJ and sends an inchoate
870 // CHLO. The client then gets the ACK of the inchoate CHLO or the client
871 // gets the full REJ and needs to verify the proof (before it sends the
872 // full CHLO), such that there is no outstanding crypto data.
873 // Retransmission alarm fires in TLP mode which tries to retransmit the
874 // 0-RTT request (without encryption).
875 QUIC_DLOG(INFO) << ENDPOINT << "Try to send data of stream " << id
876 << " before encryption is established.";
877 }
878 return QuicConsumedData(0, false);
879 }
880
881 SetTransmissionType(type);
882 QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
883
884 QuicConsumedData data =
885 connection_->SendStreamData(id, write_length, offset, state);
886 if (type == NOT_RETRANSMISSION) {
887 // This is new stream data.
888 write_blocked_streams_->UpdateBytesForStream(id, data.bytes_consumed);
889 }
890
891 return data;
892 }
893
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)894 size_t QuicSession::SendCryptoData(EncryptionLevel level, size_t write_length,
895 QuicStreamOffset offset,
896 TransmissionType type) {
897 QUICHE_DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
898 if (!connection()->framer().HasEncrypterOfEncryptionLevel(level)) {
899 const std::string error_details = absl::StrCat(
900 "Try to send crypto data with missing keys of encryption level: ",
901 EncryptionLevelToString(level));
902 QUIC_BUG(quic_bug_10866_3) << ENDPOINT << error_details;
903 connection()->CloseConnection(
904 QUIC_MISSING_WRITE_KEYS, error_details,
905 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
906 return 0;
907 }
908 SetTransmissionType(type);
909 QuicConnection::ScopedEncryptionLevelContext context(connection(), level);
910 const auto bytes_consumed =
911 connection_->SendCryptoData(level, write_length, offset);
912 return bytes_consumed;
913 }
914
OnControlFrameManagerError(QuicErrorCode error_code,std::string error_details)915 void QuicSession::OnControlFrameManagerError(QuicErrorCode error_code,
916 std::string error_details) {
917 connection_->CloseConnection(
918 error_code, error_details,
919 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
920 }
921
WriteControlFrame(const QuicFrame & frame,TransmissionType type)922 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
923 TransmissionType type) {
924 QUIC_BUG_IF(quic_bug_12435_11, !connection()->connected())
925 << ENDPOINT
926 << absl::StrCat("Try to write control frame: ", QuicFrameToString(frame),
927 " when connection is closed: ")
928 << on_closed_frame_string();
929 if (!IsEncryptionEstablished()) {
930 // Suppress the write before encryption gets established.
931 return false;
932 }
933 SetTransmissionType(type);
934 QuicConnection::ScopedEncryptionLevelContext context(
935 connection(), GetEncryptionLevelToSendApplicationData());
936 return connection_->SendControlFrame(frame);
937 }
938
ResetStream(QuicStreamId id,QuicRstStreamErrorCode error)939 void QuicSession::ResetStream(QuicStreamId id, QuicRstStreamErrorCode error) {
940 QuicStream* stream = GetStream(id);
941 if (stream != nullptr && stream->is_static()) {
942 connection()->CloseConnection(
943 QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
944 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
945 return;
946 }
947
948 if (stream != nullptr) {
949 stream->Reset(error);
950 return;
951 }
952
953 QuicConnection::ScopedPacketFlusher flusher(connection());
954 MaybeSendStopSendingFrame(id, QuicResetStreamError::FromInternal(error));
955 MaybeSendRstStreamFrame(id, QuicResetStreamError::FromInternal(error), 0);
956 }
957
MaybeSendRstStreamFrame(QuicStreamId id,QuicResetStreamError error,QuicStreamOffset bytes_written)958 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
959 QuicResetStreamError error,
960 QuicStreamOffset bytes_written) {
961 if (!connection()->connected()) {
962 return;
963 }
964 if (!VersionHasIetfQuicFrames(transport_version()) ||
965 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
966 version()) != READ_UNIDIRECTIONAL) {
967 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
968 }
969
970 connection_->OnStreamReset(id, error.internal_code());
971 }
972
MaybeSendStopSendingFrame(QuicStreamId id,QuicResetStreamError error)973 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
974 QuicResetStreamError error) {
975 if (!connection()->connected()) {
976 return;
977 }
978 if (VersionHasIetfQuicFrames(transport_version()) &&
979 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
980 version()) != WRITE_UNIDIRECTIONAL) {
981 control_frame_manager_.WriteOrBufferStopSending(error, id);
982 }
983 }
984
SendGoAway(QuicErrorCode error_code,const std::string & reason)985 void QuicSession::SendGoAway(QuicErrorCode error_code,
986 const std::string& reason) {
987 // GOAWAY frame is not supported in IETF QUIC.
988 QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
989 if (!IsEncryptionEstablished()) {
990 QUIC_CODE_COUNT(quic_goaway_before_encryption_established);
991 connection_->CloseConnection(
992 error_code, reason,
993 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
994 return;
995 }
996 if (transport_goaway_sent_) {
997 return;
998 }
999 transport_goaway_sent_ = true;
1000
1001 QUICHE_DCHECK_EQ(perspective(), Perspective::IS_SERVER);
1002 control_frame_manager_.WriteOrBufferGoAway(
1003 error_code,
1004 QuicUtils::GetMaxClientInitiatedBidirectionalStreamId(
1005 transport_version()),
1006 reason);
1007 }
1008
SendBlocked(QuicStreamId id,QuicStreamOffset byte_offset)1009 void QuicSession::SendBlocked(QuicStreamId id, QuicStreamOffset byte_offset) {
1010 control_frame_manager_.WriteOrBufferBlocked(id, byte_offset);
1011 }
1012
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)1013 void QuicSession::SendWindowUpdate(QuicStreamId id,
1014 QuicStreamOffset byte_offset) {
1015 control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
1016 }
1017
OnStreamError(QuicErrorCode error_code,std::string error_details)1018 void QuicSession::OnStreamError(QuicErrorCode error_code,
1019 std::string error_details) {
1020 connection_->CloseConnection(
1021 error_code, error_details,
1022 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1023 }
1024
OnStreamError(QuicErrorCode error_code,QuicIetfTransportErrorCodes ietf_error,std::string error_details)1025 void QuicSession::OnStreamError(QuicErrorCode error_code,
1026 QuicIetfTransportErrorCodes ietf_error,
1027 std::string error_details) {
1028 connection_->CloseConnection(
1029 error_code, ietf_error, error_details,
1030 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1031 }
1032
CanSendMaxStreams()1033 bool QuicSession::CanSendMaxStreams() {
1034 return control_frame_manager_.NumBufferedMaxStreams() < 2;
1035 }
1036
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)1037 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
1038 bool unidirectional) {
1039 if (!is_configured_) {
1040 QUIC_BUG(quic_bug_10866_5)
1041 << "Try to send max streams before config negotiated.";
1042 return;
1043 }
1044 control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
1045 }
1046
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)1047 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
1048 const QuicStreamId id, QuicStreamOffset offset) {
1049 locally_closed_streams_highest_offset_[id] = offset;
1050 }
1051
OnStreamClosed(QuicStreamId stream_id)1052 void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
1053 QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
1054 StreamMap::iterator it = stream_map_.find(stream_id);
1055 if (it == stream_map_.end()) {
1056 QUIC_BUG(quic_bug_10866_6)
1057 << ENDPOINT << "Stream is already closed: " << stream_id;
1058 return;
1059 }
1060 QuicStream* stream = it->second.get();
1061 StreamType type = stream->type();
1062
1063 const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
1064 if (stream_waiting_for_acks) {
1065 // The stream needs to be kept alive because it's waiting for acks.
1066 ++num_zombie_streams_;
1067 } else {
1068 closed_streams_.push_back(std::move(it->second));
1069 stream_map_.erase(it);
1070 // Do not retransmit data of a closed stream.
1071 streams_with_pending_retransmission_.erase(stream_id);
1072 if (!closed_streams_clean_up_alarm_->IsSet()) {
1073 closed_streams_clean_up_alarm_->Set(
1074 connection_->clock()->ApproximateNow());
1075 }
1076 connection_->QuicBugIfHasPendingFrames(stream_id);
1077 }
1078
1079 if (!stream->HasReceivedFinalOffset()) {
1080 // If we haven't received a FIN or RST for this stream, we need to keep
1081 // track of the how many bytes the stream's flow controller believes it has
1082 // received, for accurate connection level flow control accounting.
1083 // If this is an outgoing stream, it is technically open from peer's
1084 // perspective. Do not inform stream Id manager yet.
1085 QUICHE_DCHECK(!stream->was_draining());
1086 InsertLocallyClosedStreamsHighestOffset(
1087 stream_id, stream->highest_received_byte_offset());
1088 return;
1089 }
1090
1091 const bool stream_was_draining = stream->was_draining();
1092 QUIC_DVLOG_IF(1, stream_was_draining)
1093 << ENDPOINT << "Stream " << stream_id << " was draining";
1094 if (stream_was_draining) {
1095 QUIC_BUG_IF(quic_bug_12435_4, num_draining_streams_ == 0);
1096 --num_draining_streams_;
1097 if (!IsIncomingStream(stream_id)) {
1098 QUIC_BUG_IF(quic_bug_12435_5, num_outgoing_draining_streams_ == 0);
1099 --num_outgoing_draining_streams_;
1100 }
1101 // Stream Id manager has been informed with draining streams.
1102 return;
1103 }
1104 if (!VersionHasIetfQuicFrames(transport_version())) {
1105 stream_id_manager_.OnStreamClosed(
1106 /*is_incoming=*/IsIncomingStream(stream_id));
1107 }
1108 if (!connection_->connected()) {
1109 return;
1110 }
1111 if (IsIncomingStream(stream_id)) {
1112 // Stream Id manager is only interested in peer initiated stream IDs.
1113 if (VersionHasIetfQuicFrames(transport_version())) {
1114 ietf_streamid_manager_.OnStreamClosed(stream_id);
1115 }
1116 return;
1117 }
1118 if (!VersionHasIetfQuicFrames(transport_version())) {
1119 OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
1120 }
1121 }
1122
ClosePendingStream(QuicStreamId stream_id)1123 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
1124 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
1125 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
1126 pending_stream_map_.erase(stream_id);
1127 if (connection_->connected()) {
1128 ietf_streamid_manager_.OnStreamClosed(stream_id);
1129 }
1130 }
1131
ShouldProcessFrameByPendingStream(QuicFrameType type,QuicStreamId id) const1132 bool QuicSession::ShouldProcessFrameByPendingStream(QuicFrameType type,
1133 QuicStreamId id) const {
1134 return stream_map_.find(id) == stream_map_.end() &&
1135 ((version().HasIetfQuicFrames() && ExceedsPerLoopStreamLimit()) ||
1136 UsesPendingStreamForFrame(type, id));
1137 }
1138
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)1139 void QuicSession::OnFinalByteOffsetReceived(
1140 QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
1141 auto it = locally_closed_streams_highest_offset_.find(stream_id);
1142 if (it == locally_closed_streams_highest_offset_.end()) {
1143 return;
1144 }
1145
1146 QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
1147 << final_byte_offset << " for stream " << stream_id;
1148 QuicByteCount offset_diff = final_byte_offset - it->second;
1149 if (flow_controller_.UpdateHighestReceivedOffset(
1150 flow_controller_.highest_received_byte_offset() + offset_diff)) {
1151 // If the final offset violates flow control, close the connection now.
1152 if (flow_controller_.FlowControlViolation()) {
1153 connection_->CloseConnection(
1154 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
1155 "Connection level flow control violation",
1156 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1157 return;
1158 }
1159 }
1160
1161 flow_controller_.AddBytesConsumed(offset_diff);
1162 locally_closed_streams_highest_offset_.erase(it);
1163 if (!VersionHasIetfQuicFrames(transport_version())) {
1164 stream_id_manager_.OnStreamClosed(
1165 /*is_incoming=*/IsIncomingStream(stream_id));
1166 }
1167 if (IsIncomingStream(stream_id)) {
1168 if (VersionHasIetfQuicFrames(transport_version())) {
1169 ietf_streamid_manager_.OnStreamClosed(stream_id);
1170 }
1171 } else if (!VersionHasIetfQuicFrames(transport_version())) {
1172 OnCanCreateNewOutgoingStream(false);
1173 }
1174 }
1175
IsEncryptionEstablished() const1176 bool QuicSession::IsEncryptionEstablished() const {
1177 if (GetCryptoStream() == nullptr) {
1178 return false;
1179 }
1180 return GetCryptoStream()->encryption_established();
1181 }
1182
OneRttKeysAvailable() const1183 bool QuicSession::OneRttKeysAvailable() const {
1184 if (GetCryptoStream() == nullptr) {
1185 return false;
1186 }
1187 return GetCryptoStream()->one_rtt_keys_available();
1188 }
1189
OnConfigNegotiated()1190 void QuicSession::OnConfigNegotiated() {
1191 // In versions with TLS, the configs will be set twice if 0-RTT is available.
1192 // In the second config setting, 1-RTT keys are guaranteed to be available.
1193 if (version().UsesTls() && is_configured_ &&
1194 connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
1195 QUIC_BUG(quic_bug_12435_6)
1196 << ENDPOINT
1197 << "1-RTT keys missing when config is negotiated for the second time.";
1198 connection_->CloseConnection(
1199 QUIC_INTERNAL_ERROR,
1200 "1-RTT keys missing when config is negotiated for the second time.",
1201 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1202 return;
1203 }
1204
1205 QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
1206 connection_->SetFromConfig(config_);
1207
1208 if (VersionHasIetfQuicFrames(transport_version())) {
1209 uint32_t max_streams = 0;
1210 if (config_.HasReceivedMaxBidirectionalStreams()) {
1211 max_streams = config_.ReceivedMaxBidirectionalStreams();
1212 }
1213 if (was_zero_rtt_rejected_ &&
1214 max_streams <
1215 ietf_streamid_manager_.outgoing_bidirectional_stream_count()) {
1216 connection_->CloseConnection(
1217 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1218 absl::StrCat(
1219 "Server rejected 0-RTT, aborting because new bidirectional "
1220 "initial stream limit ",
1221 max_streams, " is less than current open streams: ",
1222 ietf_streamid_manager_.outgoing_bidirectional_stream_count()),
1223 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1224 return;
1225 }
1226 QUIC_DVLOG(1) << ENDPOINT
1227 << "Setting Bidirectional outgoing_max_streams_ to "
1228 << max_streams;
1229 if (perspective_ == Perspective::IS_CLIENT &&
1230 max_streams <
1231 ietf_streamid_manager_.max_outgoing_bidirectional_streams()) {
1232 connection_->CloseConnection(
1233 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1234 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1235 absl::StrCat(
1236 was_zero_rtt_rejected_
1237 ? "Server rejected 0-RTT, aborting because "
1238 : "",
1239 "new bidirectional limit ", max_streams,
1240 " decreases the current limit: ",
1241 ietf_streamid_manager_.max_outgoing_bidirectional_streams()),
1242 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1243 return;
1244 }
1245 if (ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1246 max_streams)) {
1247 OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1248 }
1249
1250 max_streams = 0;
1251 if (config_.HasReceivedMaxUnidirectionalStreams()) {
1252 max_streams = config_.ReceivedMaxUnidirectionalStreams();
1253 }
1254
1255 if (was_zero_rtt_rejected_ &&
1256 max_streams <
1257 ietf_streamid_manager_.outgoing_unidirectional_stream_count()) {
1258 connection_->CloseConnection(
1259 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1260 absl::StrCat(
1261 "Server rejected 0-RTT, aborting because new unidirectional "
1262 "initial stream limit ",
1263 max_streams, " is less than current open streams: ",
1264 ietf_streamid_manager_.outgoing_unidirectional_stream_count()),
1265 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1266 return;
1267 }
1268
1269 if (max_streams <
1270 ietf_streamid_manager_.max_outgoing_unidirectional_streams()) {
1271 connection_->CloseConnection(
1272 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1273 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1274 absl::StrCat(
1275 was_zero_rtt_rejected_
1276 ? "Server rejected 0-RTT, aborting because "
1277 : "",
1278 "new unidirectional limit ", max_streams,
1279 " decreases the current limit: ",
1280 ietf_streamid_manager_.max_outgoing_unidirectional_streams()),
1281 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1282 return;
1283 }
1284 QUIC_DVLOG(1) << ENDPOINT
1285 << "Setting Unidirectional outgoing_max_streams_ to "
1286 << max_streams;
1287 if (ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1288 max_streams)) {
1289 OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1290 }
1291 } else {
1292 uint32_t max_streams = 0;
1293 if (config_.HasReceivedMaxBidirectionalStreams()) {
1294 max_streams = config_.ReceivedMaxBidirectionalStreams();
1295 }
1296 QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1297 << max_streams;
1298 if (was_zero_rtt_rejected_ &&
1299 max_streams < stream_id_manager_.num_open_outgoing_streams()) {
1300 connection_->CloseConnection(
1301 QUIC_INTERNAL_ERROR,
1302 absl::StrCat(
1303 "Server rejected 0-RTT, aborting because new stream limit ",
1304 max_streams, " is less than current open streams: ",
1305 stream_id_manager_.num_open_outgoing_streams()),
1306 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1307 return;
1308 }
1309 stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1310 }
1311
1312 if (perspective() == Perspective::IS_SERVER) {
1313 if (config_.HasReceivedConnectionOptions()) {
1314 // The following variations change the initial receive flow control
1315 // window sizes.
1316 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1317 AdjustInitialFlowControlWindows(64 * 1024);
1318 }
1319 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1320 AdjustInitialFlowControlWindows(128 * 1024);
1321 }
1322 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1323 AdjustInitialFlowControlWindows(256 * 1024);
1324 }
1325 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1326 AdjustInitialFlowControlWindows(512 * 1024);
1327 }
1328 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1329 AdjustInitialFlowControlWindows(1024 * 1024);
1330 }
1331 }
1332
1333 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1334 }
1335
1336 if (VersionHasIetfQuicFrames(transport_version())) {
1337 ietf_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1338 config_.GetMaxBidirectionalStreamsToSend());
1339 ietf_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1340 config_.GetMaxUnidirectionalStreamsToSend());
1341 } else {
1342 // A small number of additional incoming streams beyond the limit should be
1343 // allowed. This helps avoid early connection termination when FIN/RSTs for
1344 // old streams are lost or arrive out of order.
1345 // Use a minimum number of additional streams, or a percentage increase,
1346 // whichever is larger.
1347 uint32_t max_incoming_streams_to_send =
1348 config_.GetMaxBidirectionalStreamsToSend();
1349 uint32_t max_incoming_streams =
1350 std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1351 static_cast<uint32_t>(max_incoming_streams_to_send *
1352 kMaxStreamsMultiplier));
1353 stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1354 }
1355
1356 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1357 // When using IETF-style TLS transport parameters, inform existing streams
1358 // of new flow-control limits.
1359 if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1360 OnNewStreamOutgoingBidirectionalFlowControlWindow(
1361 config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1362 }
1363 if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1364 OnNewStreamIncomingBidirectionalFlowControlWindow(
1365 config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1366 }
1367 if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1368 OnNewStreamUnidirectionalFlowControlWindow(
1369 config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1370 }
1371 } else { // The version uses Google QUIC Crypto.
1372 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1373 // Streams which were created before the SHLO was received (0-RTT
1374 // requests) are now informed of the peer's initial flow control window.
1375 OnNewStreamFlowControlWindow(
1376 config_.ReceivedInitialStreamFlowControlWindowBytes());
1377 }
1378 }
1379
1380 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1381 OnNewSessionFlowControlWindow(
1382 config_.ReceivedInitialSessionFlowControlWindowBytes());
1383 }
1384
1385 if (perspective_ == Perspective::IS_SERVER && version().HasIetfQuicFrames() &&
1386 connection_->effective_peer_address().IsInitialized()) {
1387 if (config_.SupportsServerPreferredAddress(perspective_)) {
1388 quiche::IpAddressFamily address_family =
1389 connection_->effective_peer_address()
1390 .Normalized()
1391 .host()
1392 .address_family();
1393 std::optional<QuicSocketAddress> expected_preferred_address =
1394 config_.GetMappedAlternativeServerAddress(address_family);
1395 if (expected_preferred_address.has_value()) {
1396 // Set connection ID and token if SPAD has received and a preferred
1397 // address of the same address family is configured.
1398 std::optional<QuicNewConnectionIdFrame> frame =
1399 connection_->MaybeIssueNewConnectionIdForPreferredAddress();
1400 if (frame.has_value()) {
1401 config_.SetPreferredAddressConnectionIdAndTokenToSend(
1402 frame->connection_id, frame->stateless_reset_token);
1403 }
1404 connection_->set_expected_server_preferred_address(
1405 *expected_preferred_address);
1406 }
1407 // Clear the alternative address of the other address family in the
1408 // config.
1409 config_.ClearAlternateServerAddressToSend(
1410 address_family == quiche::IpAddressFamily::IP_V4
1411 ? quiche::IpAddressFamily::IP_V6
1412 : quiche::IpAddressFamily::IP_V4);
1413 } else {
1414 // Clear alternative IPv(4|6) addresses in config if the server hasn't
1415 // received 'SPAD' connection option.
1416 config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V4);
1417 config_.ClearAlternateServerAddressToSend(quiche::IpAddressFamily::IP_V6);
1418 }
1419 }
1420
1421 is_configured_ = true;
1422 connection()->OnConfigNegotiated();
1423
1424 // Ask flow controllers to try again since the config could have unblocked us.
1425 // Or if this session is configured on TLS enabled QUIC versions,
1426 // attempt to retransmit 0-RTT data if there's any.
1427 // TODO(fayang): consider removing this OnCanWrite call.
1428 if (!connection_->framer().is_processing_packet() &&
1429 (connection_->version().AllowsLowFlowControlLimits() ||
1430 version().UsesTls())) {
1431 QUIC_CODE_COUNT(quic_session_on_can_write_on_config_negotiated);
1432 OnCanWrite();
1433 }
1434 }
1435
OnAlpsData(const uint8_t *,size_t)1436 std::optional<std::string> QuicSession::OnAlpsData(const uint8_t* /*alps_data*/,
1437 size_t /*alps_length*/) {
1438 return std::nullopt;
1439 }
1440
AdjustInitialFlowControlWindows(size_t stream_window)1441 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1442 const float session_window_multiplier =
1443 config_.GetInitialStreamFlowControlWindowToSend()
1444 ? static_cast<float>(
1445 config_.GetInitialSessionFlowControlWindowToSend()) /
1446 config_.GetInitialStreamFlowControlWindowToSend()
1447 : 1.5;
1448
1449 QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1450 config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1451
1452 size_t session_window = session_window_multiplier * stream_window;
1453 QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1454 << session_window;
1455 config_.SetInitialSessionFlowControlWindowToSend(session_window);
1456 flow_controller_.UpdateReceiveWindowSize(session_window);
1457 // Inform all existing streams about the new window.
1458 for (auto const& kv : stream_map_) {
1459 kv.second->UpdateReceiveWindowSize(stream_window);
1460 }
1461 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1462 GetMutableCryptoStream()->UpdateReceiveWindowSize(stream_window);
1463 }
1464 }
1465
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1466 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1467 QuicStreamId stream_id) {
1468 QUICHE_DCHECK(!IsClosedStream(stream_id));
1469 // Received a frame for a locally-created stream that is not currently
1470 // active. This is an error.
1471 if (VersionHasIetfQuicFrames(transport_version())) {
1472 connection()->CloseConnection(
1473 QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1474 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1475 return;
1476 }
1477 connection()->CloseConnection(
1478 QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1479 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1480 }
1481
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1482 void QuicSession::HandleRstOnValidNonexistentStream(
1483 const QuicRstStreamFrame& frame) {
1484 // If the stream is neither originally in active streams nor created in
1485 // GetOrCreateStream(), it could be a closed stream in which case its
1486 // final received byte offset need to be updated.
1487 if (IsClosedStream(frame.stream_id)) {
1488 // The RST frame contains the final byte offset for the stream: we can now
1489 // update the connection level flow controller if needed.
1490 OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1491 }
1492 }
1493
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1494 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1495 QUICHE_DCHECK(version().UsesQuicCrypto());
1496 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1497 if (new_window < kMinimumFlowControlSendWindow) {
1498 QUIC_LOG_FIRST_N(ERROR, 1)
1499 << "Peer sent us an invalid stream flow control send window: "
1500 << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1501 connection_->CloseConnection(
1502 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1503 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1504 return;
1505 }
1506
1507 // Inform all existing streams about the new window.
1508 for (auto const& kv : stream_map_) {
1509 QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1510 << " of new stream flow control window " << new_window;
1511 if (!kv.second->MaybeConfigSendWindowOffset(
1512 new_window, /* was_zero_rtt_rejected = */ false)) {
1513 return;
1514 }
1515 }
1516 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1517 QUIC_DVLOG(1)
1518 << ENDPOINT
1519 << "Informing crypto stream of new stream flow control window "
1520 << new_window;
1521 GetMutableCryptoStream()->MaybeConfigSendWindowOffset(
1522 new_window, /* was_zero_rtt_rejected = */ false);
1523 }
1524 }
1525
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1526 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1527 QuicStreamOffset new_window) {
1528 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1529 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1530 << new_window;
1531 // Inform all existing outgoing unidirectional streams about the new window.
1532 for (auto const& kv : stream_map_) {
1533 const QuicStreamId id = kv.first;
1534 if (!version().HasIetfQuicFrames()) {
1535 if (kv.second->type() == BIDIRECTIONAL) {
1536 continue;
1537 }
1538 } else {
1539 if (QuicUtils::IsBidirectionalStreamId(id, version())) {
1540 continue;
1541 }
1542 }
1543 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1544 perspective())) {
1545 continue;
1546 }
1547 QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1548 << " of new stream flow control window " << new_window;
1549 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1550 was_zero_rtt_rejected_)) {
1551 return;
1552 }
1553 }
1554 }
1555
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1556 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1557 QuicStreamOffset new_window) {
1558 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1559 QUIC_DVLOG(1) << ENDPOINT
1560 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1561 << new_window;
1562 // Inform all existing outgoing bidirectional streams about the new window.
1563 for (auto const& kv : stream_map_) {
1564 const QuicStreamId id = kv.first;
1565 if (!version().HasIetfQuicFrames()) {
1566 if (kv.second->type() != BIDIRECTIONAL) {
1567 continue;
1568 }
1569 } else {
1570 if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1571 continue;
1572 }
1573 }
1574 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1575 perspective())) {
1576 continue;
1577 }
1578 QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1579 << id << " of new stream flow control window " << new_window;
1580 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1581 was_zero_rtt_rejected_)) {
1582 return;
1583 }
1584 }
1585 }
1586
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1587 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1588 QuicStreamOffset new_window) {
1589 QUICHE_DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
1590 QUIC_DVLOG(1) << ENDPOINT
1591 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1592 << new_window;
1593 // Inform all existing incoming bidirectional streams about the new window.
1594 for (auto const& kv : stream_map_) {
1595 const QuicStreamId id = kv.first;
1596 if (!version().HasIetfQuicFrames()) {
1597 if (kv.second->type() != BIDIRECTIONAL) {
1598 continue;
1599 }
1600 } else {
1601 if (!QuicUtils::IsBidirectionalStreamId(id, version())) {
1602 continue;
1603 }
1604 }
1605 if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1606 perspective())) {
1607 continue;
1608 }
1609 QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1610 << id << " of new stream flow control window " << new_window;
1611 if (!kv.second->MaybeConfigSendWindowOffset(new_window,
1612 was_zero_rtt_rejected_)) {
1613 return;
1614 }
1615 }
1616 }
1617
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1618 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1619 QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1620
1621 if (was_zero_rtt_rejected_ && new_window < flow_controller_.bytes_sent()) {
1622 std::string error_details = absl::StrCat(
1623 "Server rejected 0-RTT. Aborting because the client received session "
1624 "flow control send window: ",
1625 new_window,
1626 ", which is below currently used: ", flow_controller_.bytes_sent());
1627 QUIC_LOG(ERROR) << error_details;
1628 connection_->CloseConnection(
1629 QUIC_ZERO_RTT_UNRETRANSMITTABLE, error_details,
1630 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1631 return;
1632 }
1633 if (!connection_->version().AllowsLowFlowControlLimits() &&
1634 new_window < kMinimumFlowControlSendWindow) {
1635 std::string error_details = absl::StrCat(
1636 "Peer sent us an invalid session flow control send window: ",
1637 new_window, ", below minimum: ", kMinimumFlowControlSendWindow);
1638 QUIC_LOG_FIRST_N(ERROR, 1) << error_details;
1639 connection_->CloseConnection(
1640 QUIC_FLOW_CONTROL_INVALID_WINDOW, error_details,
1641 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1642 return;
1643 }
1644 if (perspective_ == Perspective::IS_CLIENT &&
1645 new_window < flow_controller_.send_window_offset()) {
1646 // The client receives a lower limit than remembered, violating
1647 // https://tools.ietf.org/html/draft-ietf-quic-transport-27#section-7.3.1
1648 std::string error_details = absl::StrCat(
1649 was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because "
1650 : "",
1651 "new session max data ", new_window,
1652 " decreases current limit: ", flow_controller_.send_window_offset());
1653 QUIC_LOG(ERROR) << error_details;
1654 connection_->CloseConnection(
1655 was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1656 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1657 error_details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1658 return;
1659 }
1660
1661 flow_controller_.UpdateSendWindowOffset(new_window);
1662 }
1663
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1664 bool QuicSession::OnNewDecryptionKeyAvailable(
1665 EncryptionLevel level, std::unique_ptr<QuicDecrypter> decrypter,
1666 bool set_alternative_decrypter, bool latch_once_used) {
1667 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1668 !connection()->framer().HasEncrypterOfEncryptionLevel(
1669 QuicUtils::GetEncryptionLevelToSendAckofSpace(
1670 QuicUtils::GetPacketNumberSpace(level)))) {
1671 // This should never happen because connection should never decrypt a packet
1672 // while an ACK for it cannot be encrypted.
1673 return false;
1674 }
1675 if (connection()->version().KnowsWhichDecrypterToUse()) {
1676 connection()->InstallDecrypter(level, std::move(decrypter));
1677 return true;
1678 }
1679 if (set_alternative_decrypter) {
1680 connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1681 latch_once_used);
1682 return true;
1683 }
1684 connection()->SetDecrypter(level, std::move(decrypter));
1685 return true;
1686 }
1687
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1688 void QuicSession::OnNewEncryptionKeyAvailable(
1689 EncryptionLevel level, std::unique_ptr<QuicEncrypter> encrypter) {
1690 connection()->SetEncrypter(level, std::move(encrypter));
1691 if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
1692 return;
1693 }
1694
1695 bool reset_encryption_level = false;
1696 if (IsEncryptionEstablished() && level == ENCRYPTION_HANDSHAKE) {
1697 // ENCRYPTION_HANDSHAKE keys are only used for the handshake. If
1698 // ENCRYPTION_ZERO_RTT keys exist, it is possible for a client to send
1699 // stream data, which must not be sent at the ENCRYPTION_HANDSHAKE level.
1700 // Therefore, we avoid setting the default encryption level to
1701 // ENCRYPTION_HANDSHAKE.
1702 reset_encryption_level = true;
1703 }
1704 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1705 connection()->SetDefaultEncryptionLevel(level);
1706 if (reset_encryption_level) {
1707 connection()->SetDefaultEncryptionLevel(ENCRYPTION_ZERO_RTT);
1708 }
1709 QUIC_BUG_IF(quic_bug_12435_7,
1710 IsEncryptionEstablished() &&
1711 (connection()->encryption_level() == ENCRYPTION_INITIAL ||
1712 connection()->encryption_level() == ENCRYPTION_HANDSHAKE))
1713 << "Encryption is established, but the encryption level " << level
1714 << " does not support sending stream data";
1715 }
1716
SetDefaultEncryptionLevel(EncryptionLevel level)1717 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1718 QUICHE_DCHECK_EQ(PROTOCOL_QUIC_CRYPTO,
1719 connection_->version().handshake_protocol);
1720 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
1721 connection()->SetDefaultEncryptionLevel(level);
1722
1723 switch (level) {
1724 case ENCRYPTION_INITIAL:
1725 break;
1726 case ENCRYPTION_ZERO_RTT:
1727 if (perspective() == Perspective::IS_CLIENT) {
1728 // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1729 // they can't be decrypted by the server.
1730 connection_->MarkZeroRttPacketsForRetransmission(0);
1731 if (!connection_->framer().is_processing_packet()) {
1732 // TODO(fayang): consider removing this OnCanWrite call.
1733 // Given any streams blocked by encryption a chance to write.
1734 QUIC_CODE_COUNT(
1735 quic_session_on_can_write_set_default_encryption_level);
1736 OnCanWrite();
1737 }
1738 }
1739 break;
1740 case ENCRYPTION_HANDSHAKE:
1741 break;
1742 case ENCRYPTION_FORWARD_SECURE:
1743 QUIC_BUG_IF(quic_bug_12435_8, !config_.negotiated())
1744 << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1745 connection()->mutable_stats().handshake_completion_time =
1746 connection()->clock()->ApproximateNow();
1747 break;
1748 default:
1749 QUIC_BUG(quic_bug_10866_7) << "Unknown encryption level: " << level;
1750 }
1751 }
1752
OnTlsHandshakeComplete()1753 void QuicSession::OnTlsHandshakeComplete() {
1754 QUICHE_DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1755 QUIC_BUG_IF(quic_bug_12435_9,
1756 !GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1757 << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1758 QUIC_BUG_IF(quic_bug_12435_10, !config_.negotiated())
1759 << ENDPOINT << "Handshake completes without parameter negotiation.";
1760 connection()->mutable_stats().handshake_completion_time =
1761 connection()->clock()->ApproximateNow();
1762 if (connection()->version().UsesTls() &&
1763 perspective_ == Perspective::IS_SERVER) {
1764 // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1765 // to the client.
1766 control_frame_manager_.WriteOrBufferHandshakeDone();
1767 if (connection()->version().HasIetfQuicFrames()) {
1768 MaybeSendAddressToken();
1769 }
1770 }
1771 }
1772
MaybeSendAddressToken()1773 bool QuicSession::MaybeSendAddressToken() {
1774 QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER &&
1775 connection()->version().HasIetfQuicFrames());
1776 std::optional<CachedNetworkParameters> cached_network_params =
1777 GenerateCachedNetworkParameters();
1778
1779 std::string address_token = GetCryptoStream()->GetAddressToken(
1780 cached_network_params.has_value() ? &*cached_network_params : nullptr);
1781 if (address_token.empty()) {
1782 return false;
1783 }
1784 const size_t buf_len = address_token.length() + 1;
1785 auto buffer = std::make_unique<char[]>(buf_len);
1786 QuicDataWriter writer(buf_len, buffer.get());
1787 // Add |kAddressTokenPrefix| for token sent in NEW_TOKEN frame.
1788 writer.WriteUInt8(kAddressTokenPrefix);
1789 writer.WriteBytes(address_token.data(), address_token.length());
1790 control_frame_manager_.WriteOrBufferNewToken(
1791 absl::string_view(buffer.get(), buf_len));
1792 if (cached_network_params.has_value()) {
1793 connection()->OnSendConnectionState(*cached_network_params);
1794 }
1795 return true;
1796 }
1797
DiscardOldDecryptionKey(EncryptionLevel level)1798 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1799 if (!connection()->version().KnowsWhichDecrypterToUse()) {
1800 return;
1801 }
1802 connection()->RemoveDecrypter(level);
1803 }
1804
DiscardOldEncryptionKey(EncryptionLevel level)1805 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1806 QUIC_DLOG(INFO) << ENDPOINT << "Discarding " << level << " keys";
1807 if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1808 connection()->RemoveEncrypter(level);
1809 }
1810 switch (level) {
1811 case ENCRYPTION_INITIAL:
1812 NeuterUnencryptedData();
1813 break;
1814 case ENCRYPTION_HANDSHAKE:
1815 NeuterHandshakeData();
1816 break;
1817 case ENCRYPTION_ZERO_RTT:
1818 break;
1819 case ENCRYPTION_FORWARD_SECURE:
1820 QUIC_BUG(quic_bug_10866_8)
1821 << ENDPOINT << "Discarding 1-RTT keys is not allowed";
1822 break;
1823 default:
1824 QUIC_BUG(quic_bug_10866_9)
1825 << ENDPOINT
1826 << "Cannot discard keys for unknown encryption level: " << level;
1827 }
1828 }
1829
NeuterHandshakeData()1830 void QuicSession::NeuterHandshakeData() {
1831 GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(
1832 ENCRYPTION_HANDSHAKE);
1833 connection()->OnHandshakeComplete();
1834 }
1835
OnZeroRttRejected(int reason)1836 void QuicSession::OnZeroRttRejected(int reason) {
1837 was_zero_rtt_rejected_ = true;
1838 connection_->MarkZeroRttPacketsForRetransmission(reason);
1839 if (connection_->encryption_level() == ENCRYPTION_FORWARD_SECURE) {
1840 QUIC_BUG(quic_bug_10866_10)
1841 << "1-RTT keys already available when 0-RTT is rejected.";
1842 connection_->CloseConnection(
1843 QUIC_INTERNAL_ERROR,
1844 "1-RTT keys already available when 0-RTT is rejected.",
1845 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1846 }
1847 }
1848
FillTransportParameters(TransportParameters * params)1849 bool QuicSession::FillTransportParameters(TransportParameters* params) {
1850 if (version().UsesTls()) {
1851 if (perspective() == Perspective::IS_SERVER) {
1852 config_.SetOriginalConnectionIdToSend(
1853 connection_->GetOriginalDestinationConnectionId());
1854 config_.SetInitialSourceConnectionIdToSend(connection_->connection_id());
1855 } else {
1856 config_.SetInitialSourceConnectionIdToSend(
1857 connection_->client_connection_id());
1858 }
1859 }
1860 return config_.FillTransportParameters(params);
1861 }
1862
ProcessTransportParameters(const TransportParameters & params,bool is_resumption,std::string * error_details)1863 QuicErrorCode QuicSession::ProcessTransportParameters(
1864 const TransportParameters& params, bool is_resumption,
1865 std::string* error_details) {
1866 return config_.ProcessTransportParameters(params, is_resumption,
1867 error_details);
1868 }
1869
OnHandshakeCallbackDone()1870 void QuicSession::OnHandshakeCallbackDone() {
1871 if (!connection_->connected()) {
1872 return;
1873 }
1874
1875 if (!connection()->is_processing_packet()) {
1876 connection()->MaybeProcessUndecryptablePackets();
1877 }
1878 }
1879
PacketFlusherAttached() const1880 bool QuicSession::PacketFlusherAttached() const {
1881 QUICHE_DCHECK(connection_->connected());
1882 return connection()->packet_creator().PacketFlusherAttached();
1883 }
1884
OnEncryptedClientHelloSent(absl::string_view client_hello) const1885 void QuicSession::OnEncryptedClientHelloSent(
1886 absl::string_view client_hello) const {
1887 connection()->OnEncryptedClientHelloSent(client_hello);
1888 }
1889
OnEncryptedClientHelloReceived(absl::string_view client_hello) const1890 void QuicSession::OnEncryptedClientHelloReceived(
1891 absl::string_view client_hello) const {
1892 connection()->OnEncryptedClientHelloReceived(client_hello);
1893 }
1894
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1895 void QuicSession::OnCryptoHandshakeMessageSent(
1896 const CryptoHandshakeMessage& /*message*/) {}
1897
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1898 void QuicSession::OnCryptoHandshakeMessageReceived(
1899 const CryptoHandshakeMessage& /*message*/) {}
1900
RegisterStreamPriority(QuicStreamId id,bool is_static,const QuicStreamPriority & priority)1901 void QuicSession::RegisterStreamPriority(QuicStreamId id, bool is_static,
1902 const QuicStreamPriority& priority) {
1903 write_blocked_streams()->RegisterStream(id, is_static, priority);
1904 }
1905
UnregisterStreamPriority(QuicStreamId id)1906 void QuicSession::UnregisterStreamPriority(QuicStreamId id) {
1907 write_blocked_streams()->UnregisterStream(id);
1908 }
1909
UpdateStreamPriority(QuicStreamId id,const QuicStreamPriority & new_priority)1910 void QuicSession::UpdateStreamPriority(QuicStreamId id,
1911 const QuicStreamPriority& new_priority) {
1912 write_blocked_streams()->UpdateStreamPriority(id, new_priority);
1913 }
1914
ActivateStream(std::unique_ptr<QuicStream> stream)1915 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1916 const bool should_keep_alive = ShouldKeepConnectionAlive();
1917 QuicStreamId stream_id = stream->id();
1918 bool is_static = stream->is_static();
1919 QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1920 << ". activating stream " << stream_id;
1921 QUICHE_DCHECK(!stream_map_.contains(stream_id));
1922 stream_map_[stream_id] = std::move(stream);
1923 if (is_static) {
1924 ++num_static_streams_;
1925 return;
1926 }
1927 if (version().HasIetfQuicFrames() && IsIncomingStream(stream_id) &&
1928 max_streams_accepted_per_loop_ != kMaxQuicStreamCount) {
1929 QUICHE_DCHECK(!ExceedsPerLoopStreamLimit());
1930 // Per-loop stream limit is emposed.
1931 ++new_incoming_streams_in_current_loop_;
1932 if (!stream_count_reset_alarm_->IsSet()) {
1933 stream_count_reset_alarm_->Set(connection()->clock()->ApproximateNow());
1934 }
1935 }
1936 if (!VersionHasIetfQuicFrames(transport_version())) {
1937 // Do not inform stream ID manager of static streams.
1938 stream_id_manager_.ActivateStream(
1939 /*is_incoming=*/IsIncomingStream(stream_id));
1940 }
1941 if (perspective() == Perspective::IS_CLIENT &&
1942 connection()->multi_port_stats() != nullptr && !should_keep_alive &&
1943 ShouldKeepConnectionAlive()) {
1944 connection()->MaybeProbeMultiPortPath();
1945 }
1946 }
1947
GetNextOutgoingBidirectionalStreamId()1948 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1949 if (VersionHasIetfQuicFrames(transport_version())) {
1950 return ietf_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1951 }
1952 return stream_id_manager_.GetNextOutgoingStreamId();
1953 }
1954
GetNextOutgoingUnidirectionalStreamId()1955 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1956 if (VersionHasIetfQuicFrames(transport_version())) {
1957 return ietf_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1958 }
1959 return stream_id_manager_.GetNextOutgoingStreamId();
1960 }
1961
CanOpenNextOutgoingBidirectionalStream()1962 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1963 if (liveness_testing_in_progress_) {
1964 QUICHE_DCHECK_EQ(Perspective::IS_CLIENT, perspective());
1965 QUIC_CODE_COUNT(
1966 quic_client_fails_to_create_stream_liveness_testing_in_progress);
1967 return false;
1968 }
1969 if (!VersionHasIetfQuicFrames(transport_version())) {
1970 if (!stream_id_manager_.CanOpenNextOutgoingStream()) {
1971 return false;
1972 }
1973 } else {
1974 if (!ietf_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1975 QUIC_CODE_COUNT(
1976 quic_fails_to_create_stream_close_too_many_streams_created);
1977 if (is_configured_) {
1978 // Send STREAM_BLOCKED after config negotiated.
1979 control_frame_manager_.WriteOrBufferStreamsBlocked(
1980 ietf_streamid_manager_.max_outgoing_bidirectional_streams(),
1981 /*unidirectional=*/false);
1982 }
1983 return false;
1984 }
1985 }
1986 if (perspective() == Perspective::IS_CLIENT &&
1987 connection_->MaybeTestLiveness()) {
1988 // Now is relatively close to the idle timeout having the risk that requests
1989 // could be discarded at the server.
1990 liveness_testing_in_progress_ = true;
1991 QUIC_CODE_COUNT(quic_client_fails_to_create_stream_close_to_idle_timeout);
1992 return false;
1993 }
1994 return true;
1995 }
1996
CanOpenNextOutgoingUnidirectionalStream()1997 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1998 if (!VersionHasIetfQuicFrames(transport_version())) {
1999 return stream_id_manager_.CanOpenNextOutgoingStream();
2000 }
2001 if (ietf_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
2002 return true;
2003 }
2004 if (is_configured_) {
2005 // Send STREAM_BLOCKED after config negotiated.
2006 control_frame_manager_.WriteOrBufferStreamsBlocked(
2007 ietf_streamid_manager_.max_outgoing_unidirectional_streams(),
2008 /*unidirectional=*/true);
2009 }
2010 return false;
2011 }
2012
GetAdvertisedMaxIncomingBidirectionalStreams() const2013 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
2014 const {
2015 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2016 return ietf_streamid_manager_.advertised_max_incoming_bidirectional_streams();
2017 }
2018
GetOrCreateStream(const QuicStreamId stream_id)2019 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
2020 QUICHE_DCHECK(!pending_stream_map_.contains(stream_id));
2021 if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
2022 return GetMutableCryptoStream();
2023 }
2024
2025 StreamMap::iterator it = stream_map_.find(stream_id);
2026 if (it != stream_map_.end()) {
2027 return it->second->IsZombie() ? nullptr : it->second.get();
2028 }
2029
2030 if (IsClosedStream(stream_id)) {
2031 return nullptr;
2032 }
2033
2034 if (!IsIncomingStream(stream_id)) {
2035 HandleFrameOnNonexistentOutgoingStream(stream_id);
2036 return nullptr;
2037 }
2038
2039 // TODO(fkastenholz): If we are creating a new stream and we have sent a
2040 // goaway, we should ignore the stream creation. Need to add code to A) test
2041 // if goaway was sent ("if (transport_goaway_sent_)") and B) reject stream
2042 // creation ("return nullptr")
2043
2044 if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
2045 return nullptr;
2046 }
2047
2048 if (!VersionHasIetfQuicFrames(transport_version()) &&
2049 !stream_id_manager_.CanOpenIncomingStream()) {
2050 // Refuse to open the stream.
2051 ResetStream(stream_id, QUIC_REFUSED_STREAM);
2052 return nullptr;
2053 }
2054
2055 return CreateIncomingStream(stream_id);
2056 }
2057
StreamDraining(QuicStreamId stream_id,bool unidirectional)2058 void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
2059 QUICHE_DCHECK(stream_map_.contains(stream_id));
2060 QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
2061 if (VersionHasIetfQuicFrames(transport_version())) {
2062 ietf_streamid_manager_.OnStreamClosed(stream_id);
2063 } else {
2064 stream_id_manager_.OnStreamClosed(
2065 /*is_incoming=*/IsIncomingStream(stream_id));
2066 }
2067 ++num_draining_streams_;
2068 if (!IsIncomingStream(stream_id)) {
2069 ++num_outgoing_draining_streams_;
2070 if (!VersionHasIetfQuicFrames(transport_version())) {
2071 OnCanCreateNewOutgoingStream(unidirectional);
2072 }
2073 }
2074 }
2075
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)2076 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
2077 const QuicStreamId stream_id) {
2078 if (VersionHasIetfQuicFrames(transport_version())) {
2079 std::string error_details;
2080 if (ietf_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
2081 stream_id, &error_details)) {
2082 return true;
2083 }
2084 connection()->CloseConnection(
2085 QUIC_INVALID_STREAM_ID, error_details,
2086 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2087 return false;
2088 }
2089 if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
2090 connection()->CloseConnection(
2091 QUIC_TOO_MANY_AVAILABLE_STREAMS,
2092 absl::StrCat(stream_id, " exceeds available streams ",
2093 stream_id_manager_.MaxAvailableStreams()),
2094 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2095 return false;
2096 }
2097 return true;
2098 }
2099
ShouldYield(QuicStreamId stream_id)2100 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
2101 if (stream_id == currently_writing_stream_id_) {
2102 return false;
2103 }
2104 return write_blocked_streams()->ShouldYield(stream_id);
2105 }
2106
GetOrCreatePendingStream(QuicStreamId stream_id)2107 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
2108 auto it = pending_stream_map_.find(stream_id);
2109 if (it != pending_stream_map_.end()) {
2110 return it->second.get();
2111 }
2112
2113 if (IsClosedStream(stream_id) ||
2114 !MaybeIncreaseLargestPeerStreamId(stream_id)) {
2115 return nullptr;
2116 }
2117
2118 auto pending = std::make_unique<PendingStream>(stream_id, this);
2119 PendingStream* unowned_pending = pending.get();
2120 pending_stream_map_[stream_id] = std::move(pending);
2121 return unowned_pending;
2122 }
2123
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)2124 void QuicSession::set_largest_peer_created_stream_id(
2125 QuicStreamId largest_peer_created_stream_id) {
2126 QUICHE_DCHECK(!VersionHasIetfQuicFrames(transport_version()));
2127 stream_id_manager_.set_largest_peer_created_stream_id(
2128 largest_peer_created_stream_id);
2129 }
2130
GetLargestPeerCreatedStreamId(bool unidirectional) const2131 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
2132 bool unidirectional) const {
2133 // This method is only used in IETF QUIC.
2134 QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
2135 return ietf_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
2136 }
2137
DeleteConnection()2138 void QuicSession::DeleteConnection() {
2139 if (connection_) {
2140 delete connection_;
2141 connection_ = nullptr;
2142 }
2143 }
2144
MaybeSetStreamPriority(QuicStreamId stream_id,const QuicStreamPriority & priority)2145 bool QuicSession::MaybeSetStreamPriority(QuicStreamId stream_id,
2146 const QuicStreamPriority& priority) {
2147 auto active_stream = stream_map_.find(stream_id);
2148 if (active_stream != stream_map_.end()) {
2149 active_stream->second->SetPriority(priority);
2150 return true;
2151 }
2152
2153 return false;
2154 }
2155
IsClosedStream(QuicStreamId id)2156 bool QuicSession::IsClosedStream(QuicStreamId id) {
2157 QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2158 if (IsOpenStream(id)) {
2159 // Stream is active
2160 return false;
2161 }
2162
2163 if (VersionHasIetfQuicFrames(transport_version())) {
2164 return !ietf_streamid_manager_.IsAvailableStream(id);
2165 }
2166
2167 return !stream_id_manager_.IsAvailableStream(id);
2168 }
2169
IsOpenStream(QuicStreamId id)2170 bool QuicSession::IsOpenStream(QuicStreamId id) {
2171 QUICHE_DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
2172 const StreamMap::iterator it = stream_map_.find(id);
2173 if (it != stream_map_.end()) {
2174 return !it->second->IsZombie();
2175 }
2176 if (pending_stream_map_.contains(id) ||
2177 QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2178 // Stream is active
2179 return true;
2180 }
2181 return false;
2182 }
2183
IsStaticStream(QuicStreamId id) const2184 bool QuicSession::IsStaticStream(QuicStreamId id) const {
2185 auto it = stream_map_.find(id);
2186 if (it == stream_map_.end()) {
2187 return false;
2188 }
2189 return it->second->is_static();
2190 }
2191
GetNumActiveStreams() const2192 size_t QuicSession::GetNumActiveStreams() const {
2193 QUICHE_DCHECK_GE(
2194 static_cast<QuicStreamCount>(stream_map_.size()),
2195 num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
2196 return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
2197 num_zombie_streams_;
2198 }
2199
MarkConnectionLevelWriteBlocked(QuicStreamId id)2200 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
2201 if (GetOrCreateStream(id) == nullptr) {
2202 QUIC_BUG(quic_bug_10866_11)
2203 << "Marking unknown stream " << id << " blocked.";
2204 QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
2205 }
2206
2207 QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
2208 << " to write-blocked list";
2209
2210 write_blocked_streams_->AddStream(id);
2211 }
2212
HasDataToWrite() const2213 bool QuicSession::HasDataToWrite() const {
2214 return write_blocked_streams_->HasWriteBlockedSpecialStream() ||
2215 write_blocked_streams_->HasWriteBlockedDataStreams() ||
2216 connection_->HasQueuedData() ||
2217 !streams_with_pending_retransmission_.empty() ||
2218 control_frame_manager_.WillingToWrite();
2219 }
2220
OnAckNeedsRetransmittableFrame()2221 void QuicSession::OnAckNeedsRetransmittableFrame() {
2222 flow_controller_.SendWindowUpdate();
2223 }
2224
SendAckFrequency(const QuicAckFrequencyFrame & frame)2225 void QuicSession::SendAckFrequency(const QuicAckFrequencyFrame& frame) {
2226 control_frame_manager_.WriteOrBufferAckFrequency(frame);
2227 }
2228
SendNewConnectionId(const QuicNewConnectionIdFrame & frame)2229 void QuicSession::SendNewConnectionId(const QuicNewConnectionIdFrame& frame) {
2230 control_frame_manager_.WriteOrBufferNewConnectionId(
2231 frame.connection_id, frame.sequence_number, frame.retire_prior_to,
2232 frame.stateless_reset_token);
2233 }
2234
SendRetireConnectionId(uint64_t sequence_number)2235 void QuicSession::SendRetireConnectionId(uint64_t sequence_number) {
2236 if (GetQuicReloadableFlag(
2237 quic_no_write_control_frame_upon_connection_close2)) {
2238 QUIC_RELOADABLE_FLAG_COUNT(
2239 quic_no_write_control_frame_upon_connection_close2);
2240 if (!connection_->connected()) {
2241 return;
2242 }
2243 }
2244 control_frame_manager_.WriteOrBufferRetireConnectionId(sequence_number);
2245 }
2246
MaybeReserveConnectionId(const QuicConnectionId & server_connection_id)2247 bool QuicSession::MaybeReserveConnectionId(
2248 const QuicConnectionId& server_connection_id) {
2249 if (visitor_) {
2250 return visitor_->TryAddNewConnectionId(
2251 connection_->GetOneActiveServerConnectionId(), server_connection_id);
2252 }
2253 return true;
2254 }
2255
OnServerConnectionIdRetired(const QuicConnectionId & server_connection_id)2256 void QuicSession::OnServerConnectionIdRetired(
2257 const QuicConnectionId& server_connection_id) {
2258 if (visitor_) {
2259 visitor_->OnConnectionIdRetired(server_connection_id);
2260 }
2261 }
2262
IsConnectionFlowControlBlocked() const2263 bool QuicSession::IsConnectionFlowControlBlocked() const {
2264 return flow_controller_.IsBlocked();
2265 }
2266
IsStreamFlowControlBlocked()2267 bool QuicSession::IsStreamFlowControlBlocked() {
2268 for (auto const& kv : stream_map_) {
2269 if (kv.second->IsFlowControlBlocked()) {
2270 return true;
2271 }
2272 }
2273 if (!QuicVersionUsesCryptoFrames(transport_version()) &&
2274 GetMutableCryptoStream()->IsFlowControlBlocked()) {
2275 return true;
2276 }
2277 return false;
2278 }
2279
MaxAvailableBidirectionalStreams() const2280 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
2281 if (VersionHasIetfQuicFrames(transport_version())) {
2282 return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2283 }
2284 return stream_id_manager_.MaxAvailableStreams();
2285 }
2286
MaxAvailableUnidirectionalStreams() const2287 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
2288 if (VersionHasIetfQuicFrames(transport_version())) {
2289 return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2290 }
2291 return stream_id_manager_.MaxAvailableStreams();
2292 }
2293
IsIncomingStream(QuicStreamId id) const2294 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
2295 if (VersionHasIetfQuicFrames(transport_version())) {
2296 return !QuicUtils::IsOutgoingStreamId(version(), id, perspective_);
2297 }
2298 return stream_id_manager_.IsIncomingStream(id);
2299 }
2300
MaybeCloseZombieStream(QuicStreamId id)2301 void QuicSession::MaybeCloseZombieStream(QuicStreamId id) {
2302 auto it = stream_map_.find(id);
2303 if (it == stream_map_.end()) {
2304 return;
2305 }
2306 --num_zombie_streams_;
2307 closed_streams_.push_back(std::move(it->second));
2308 stream_map_.erase(it);
2309
2310 if (!closed_streams_clean_up_alarm_->IsSet()) {
2311 closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
2312 }
2313 // Do not retransmit data of a closed stream.
2314 streams_with_pending_retransmission_.erase(id);
2315 connection_->QuicBugIfHasPendingFrames(id);
2316 }
2317
GetStream(QuicStreamId id) const2318 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
2319 auto active_stream = stream_map_.find(id);
2320 if (active_stream != stream_map_.end()) {
2321 return active_stream->second.get();
2322 }
2323
2324 if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
2325 return const_cast<QuicCryptoStream*>(GetCryptoStream());
2326 }
2327
2328 return nullptr;
2329 }
2330
GetActiveStream(QuicStreamId id) const2331 QuicStream* QuicSession::GetActiveStream(QuicStreamId id) const {
2332 auto stream = stream_map_.find(id);
2333 if (stream != stream_map_.end() && !stream->second->is_static()) {
2334 return stream->second.get();
2335 }
2336 return nullptr;
2337 }
2338
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)2339 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
2340 QuicTime::Delta ack_delay_time,
2341 QuicTime receive_timestamp) {
2342 if (frame.type == MESSAGE_FRAME) {
2343 OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
2344 return true;
2345 }
2346 if (frame.type == CRYPTO_FRAME) {
2347 return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
2348 ack_delay_time);
2349 }
2350 if (frame.type != STREAM_FRAME) {
2351 bool acked = control_frame_manager_.OnControlFrameAcked(frame);
2352 if (acked && frame.type == MAX_STREAMS_FRAME) {
2353 // Since there is a 2 frame limit on the number of outstanding max_streams
2354 // frames, when an outstanding max_streams frame is ack'd that frees up
2355 // room to potntially send another.
2356 ietf_streamid_manager_.MaybeSendMaxStreamsFrame();
2357 }
2358 return acked;
2359 }
2360 bool new_stream_data_acked = false;
2361 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2362 // Stream can already be reset when sent frame gets acked.
2363 if (stream != nullptr) {
2364 QuicByteCount newly_acked_length = 0;
2365 new_stream_data_acked = stream->OnStreamFrameAcked(
2366 frame.stream_frame.offset, frame.stream_frame.data_length,
2367 frame.stream_frame.fin, ack_delay_time, receive_timestamp,
2368 &newly_acked_length);
2369 if (!stream->HasPendingRetransmission()) {
2370 streams_with_pending_retransmission_.erase(stream->id());
2371 }
2372 }
2373 return new_stream_data_acked;
2374 }
2375
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)2376 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
2377 QuicStream* stream = GetStream(frame.stream_id);
2378 if (stream == nullptr) {
2379 QUIC_BUG(quic_bug_10866_12)
2380 << "Stream: " << frame.stream_id << " is closed when " << frame
2381 << " is retransmitted.";
2382 connection()->CloseConnection(
2383 QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
2384 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2385 return;
2386 }
2387 stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
2388 frame.fin);
2389 }
2390
OnFrameLost(const QuicFrame & frame)2391 void QuicSession::OnFrameLost(const QuicFrame& frame) {
2392 if (frame.type == MESSAGE_FRAME) {
2393 ++total_datagrams_lost_;
2394 OnMessageLost(frame.message_frame->message_id);
2395 return;
2396 }
2397 if (frame.type == CRYPTO_FRAME) {
2398 GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
2399 return;
2400 }
2401 if (frame.type != STREAM_FRAME) {
2402 control_frame_manager_.OnControlFrameLost(frame);
2403 return;
2404 }
2405 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2406 if (stream == nullptr) {
2407 return;
2408 }
2409 stream->OnStreamFrameLost(frame.stream_frame.offset,
2410 frame.stream_frame.data_length,
2411 frame.stream_frame.fin);
2412 if (stream->HasPendingRetransmission() &&
2413 !streams_with_pending_retransmission_.contains(
2414 frame.stream_frame.stream_id)) {
2415 streams_with_pending_retransmission_.insert(
2416 std::make_pair(frame.stream_frame.stream_id, true));
2417 }
2418 }
2419
RetransmitFrames(const QuicFrames & frames,TransmissionType type)2420 bool QuicSession::RetransmitFrames(const QuicFrames& frames,
2421 TransmissionType type) {
2422 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2423 for (const QuicFrame& frame : frames) {
2424 if (frame.type == MESSAGE_FRAME) {
2425 // Do not retransmit MESSAGE frames.
2426 continue;
2427 }
2428 if (frame.type == CRYPTO_FRAME) {
2429 if (!GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type)) {
2430 return false;
2431 }
2432 continue;
2433 }
2434 if (frame.type != STREAM_FRAME) {
2435 if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
2436 return false;
2437 }
2438 continue;
2439 }
2440 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2441 if (stream != nullptr &&
2442 !stream->RetransmitStreamData(frame.stream_frame.offset,
2443 frame.stream_frame.data_length,
2444 frame.stream_frame.fin, type)) {
2445 return false;
2446 }
2447 }
2448 return true;
2449 }
2450
IsFrameOutstanding(const QuicFrame & frame) const2451 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2452 if (frame.type == MESSAGE_FRAME) {
2453 return false;
2454 }
2455 if (frame.type == CRYPTO_FRAME) {
2456 return GetCryptoStream()->IsFrameOutstanding(
2457 frame.crypto_frame->level, frame.crypto_frame->offset,
2458 frame.crypto_frame->data_length);
2459 }
2460 if (frame.type != STREAM_FRAME) {
2461 return control_frame_manager_.IsControlFrameOutstanding(frame);
2462 }
2463 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2464 return stream != nullptr &&
2465 stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2466 frame.stream_frame.data_length,
2467 frame.stream_frame.fin);
2468 }
2469
HasUnackedCryptoData() const2470 bool QuicSession::HasUnackedCryptoData() const {
2471 const QuicCryptoStream* crypto_stream = GetCryptoStream();
2472 return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2473 }
2474
HasUnackedStreamData() const2475 bool QuicSession::HasUnackedStreamData() const {
2476 for (const auto& it : stream_map_) {
2477 if (it.second->IsWaitingForAcks()) {
2478 return true;
2479 }
2480 }
2481 return false;
2482 }
2483
GetHandshakeState() const2484 HandshakeState QuicSession::GetHandshakeState() const {
2485 return GetCryptoStream()->GetHandshakeState();
2486 }
2487
GetFlowControlSendWindowSize(QuicStreamId id)2488 QuicByteCount QuicSession::GetFlowControlSendWindowSize(QuicStreamId id) {
2489 QuicStream* stream = GetActiveStream(id);
2490 if (stream == nullptr) {
2491 // No flow control for invalid or inactive stream ids. Returning uint64max
2492 // allows QuicPacketCreator to write as much data as possible.
2493 return std::numeric_limits<QuicByteCount>::max();
2494 }
2495 return stream->CalculateSendWindowSize();
2496 }
2497
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2498 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2499 QuicStreamOffset offset,
2500 QuicByteCount data_length,
2501 QuicDataWriter* writer) {
2502 QuicStream* stream = GetStream(id);
2503 if (stream == nullptr) {
2504 // This causes the connection to be closed because of failed to serialize
2505 // packet.
2506 QUIC_BUG(quic_bug_10866_13)
2507 << "Stream " << id << " does not exist when trying to write data."
2508 << " version:" << transport_version();
2509 return STREAM_MISSING;
2510 }
2511 if (stream->WriteStreamData(offset, data_length, writer)) {
2512 return WRITE_SUCCESS;
2513 }
2514 return WRITE_FAILED;
2515 }
2516
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2517 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2518 QuicStreamOffset offset,
2519 QuicByteCount data_length,
2520 QuicDataWriter* writer) {
2521 return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2522 writer);
2523 }
2524
GetStatelessResetToken() const2525 StatelessResetToken QuicSession::GetStatelessResetToken() const {
2526 return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2527 }
2528
CanWriteStreamData() const2529 bool QuicSession::CanWriteStreamData() const {
2530 // Don't write stream data if there are queued data packets.
2531 if (connection_->HasQueuedPackets()) {
2532 return false;
2533 }
2534 // Immediately write handshake data.
2535 if (HasPendingHandshake()) {
2536 return true;
2537 }
2538 return connection_->CanWrite(HAS_RETRANSMITTABLE_DATA);
2539 }
2540
RetransmitLostData()2541 bool QuicSession::RetransmitLostData() {
2542 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2543 // Retransmit crypto data first.
2544 bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2545 if (QuicCryptoStream* const crypto_stream = GetMutableCryptoStream();
2546 uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2547 crypto_stream->WritePendingCryptoRetransmission();
2548 }
2549 // Retransmit crypto data in stream 1 frames (version < 47).
2550 if (!uses_crypto_frames &&
2551 streams_with_pending_retransmission_.contains(
2552 QuicUtils::GetCryptoStreamId(transport_version()))) {
2553 // Retransmit crypto data first.
2554 QuicStream* const crypto_stream =
2555 GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2556 crypto_stream->OnCanWrite();
2557 QUICHE_DCHECK(CheckStreamWriteBlocked(crypto_stream));
2558 if (crypto_stream->HasPendingRetransmission()) {
2559 // Connection is write blocked.
2560 return false;
2561 } else {
2562 streams_with_pending_retransmission_.erase(
2563 QuicUtils::GetCryptoStreamId(transport_version()));
2564 }
2565 }
2566 if (control_frame_manager_.HasPendingRetransmission()) {
2567 control_frame_manager_.OnCanWrite();
2568 if (control_frame_manager_.HasPendingRetransmission()) {
2569 return false;
2570 }
2571 }
2572 while (!streams_with_pending_retransmission_.empty()) {
2573 if (!CanWriteStreamData()) {
2574 break;
2575 }
2576 // Retransmit lost data on headers and data streams.
2577 const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2578 QuicStream* stream = GetStream(id);
2579 if (stream != nullptr) {
2580 stream->OnCanWrite();
2581 QUICHE_DCHECK(CheckStreamWriteBlocked(stream));
2582 if (stream->HasPendingRetransmission()) {
2583 // Connection is write blocked.
2584 break;
2585 } else if (!streams_with_pending_retransmission_.empty() &&
2586 streams_with_pending_retransmission_.begin()->first == id) {
2587 // Retransmit lost data may cause connection close. If this stream
2588 // has not yet sent fin, a RST_STREAM will be sent and it will be
2589 // removed from streams_with_pending_retransmission_.
2590 streams_with_pending_retransmission_.pop_front();
2591 }
2592 } else {
2593 QUIC_BUG(quic_bug_10866_14)
2594 << "Try to retransmit data of a closed stream";
2595 streams_with_pending_retransmission_.pop_front();
2596 }
2597 }
2598
2599 return streams_with_pending_retransmission_.empty();
2600 }
2601
NeuterUnencryptedData()2602 void QuicSession::NeuterUnencryptedData() {
2603 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2604 crypto_stream->NeuterUnencryptedStreamData();
2605 if (!crypto_stream->HasPendingRetransmission() &&
2606 !QuicVersionUsesCryptoFrames(transport_version())) {
2607 streams_with_pending_retransmission_.erase(
2608 QuicUtils::GetCryptoStreamId(transport_version()));
2609 }
2610 connection_->NeuterUnencryptedPackets();
2611 }
2612
SetTransmissionType(TransmissionType type)2613 void QuicSession::SetTransmissionType(TransmissionType type) {
2614 connection_->SetTransmissionType(type);
2615 }
2616
SendMessage(absl::Span<quiche::QuicheMemSlice> message)2617 MessageResult QuicSession::SendMessage(
2618 absl::Span<quiche::QuicheMemSlice> message) {
2619 return SendMessage(message, /*flush=*/false);
2620 }
2621
SendMessage(quiche::QuicheMemSlice message)2622 MessageResult QuicSession::SendMessage(quiche::QuicheMemSlice message) {
2623 return SendMessage(absl::MakeSpan(&message, 1), /*flush=*/false);
2624 }
2625
SendMessage(absl::Span<quiche::QuicheMemSlice> message,bool flush)2626 MessageResult QuicSession::SendMessage(
2627 absl::Span<quiche::QuicheMemSlice> message, bool flush) {
2628 QUICHE_DCHECK(connection_->connected())
2629 << ENDPOINT << "Try to write messages when connection is closed.";
2630 if (!IsEncryptionEstablished()) {
2631 return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2632 }
2633 QuicConnection::ScopedEncryptionLevelContext context(
2634 connection(), GetEncryptionLevelToSendApplicationData());
2635 MessageStatus result =
2636 connection_->SendMessage(last_message_id_ + 1, message, flush);
2637 if (result == MESSAGE_STATUS_SUCCESS) {
2638 return {result, ++last_message_id_};
2639 }
2640 return {result, 0};
2641 }
2642
OnMessageAcked(QuicMessageId message_id,QuicTime)2643 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2644 QuicTime /*receive_timestamp*/) {
2645 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2646 }
2647
OnMessageLost(QuicMessageId message_id)2648 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2649 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2650 << " is considered lost";
2651 }
2652
CleanUpClosedStreams()2653 void QuicSession::CleanUpClosedStreams() { closed_streams_.clear(); }
2654
GetCurrentLargestMessagePayload() const2655 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2656 return connection_->GetCurrentLargestMessagePayload();
2657 }
2658
GetGuaranteedLargestMessagePayload() const2659 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2660 return connection_->GetGuaranteedLargestMessagePayload();
2661 }
2662
next_outgoing_bidirectional_stream_id() const2663 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2664 if (VersionHasIetfQuicFrames(transport_version())) {
2665 return ietf_streamid_manager_.next_outgoing_bidirectional_stream_id();
2666 }
2667 return stream_id_manager_.next_outgoing_stream_id();
2668 }
2669
next_outgoing_unidirectional_stream_id() const2670 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2671 if (VersionHasIetfQuicFrames(transport_version())) {
2672 return ietf_streamid_manager_.next_outgoing_unidirectional_stream_id();
2673 }
2674 return stream_id_manager_.next_outgoing_stream_id();
2675 }
2676
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2677 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2678 const bool allow_new_streams =
2679 frame.unidirectional
2680 ? ietf_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2681 frame.stream_count)
2682 : ietf_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2683 frame.stream_count);
2684 if (allow_new_streams) {
2685 OnCanCreateNewOutgoingStream(frame.unidirectional);
2686 }
2687
2688 return true;
2689 }
2690
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2691 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2692 std::string error_details;
2693 if (ietf_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2694 return true;
2695 }
2696 connection_->CloseConnection(
2697 QUIC_STREAMS_BLOCKED_ERROR, error_details,
2698 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2699 return false;
2700 }
2701
max_open_incoming_bidirectional_streams() const2702 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2703 if (VersionHasIetfQuicFrames(transport_version())) {
2704 return ietf_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2705 }
2706 return stream_id_manager_.max_open_incoming_streams();
2707 }
2708
max_open_incoming_unidirectional_streams() const2709 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2710 if (VersionHasIetfQuicFrames(transport_version())) {
2711 return ietf_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2712 }
2713 return stream_id_manager_.max_open_incoming_streams();
2714 }
2715
SelectAlpn(const std::vector<absl::string_view> & alpns) const2716 std::vector<absl::string_view>::const_iterator QuicSession::SelectAlpn(
2717 const std::vector<absl::string_view>& alpns) const {
2718 const std::string alpn = AlpnForVersion(connection()->version());
2719 return std::find(alpns.cbegin(), alpns.cend(), alpn);
2720 }
2721
OnAlpnSelected(absl::string_view alpn)2722 void QuicSession::OnAlpnSelected(absl::string_view alpn) {
2723 QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2724 : "Client: ")
2725 << "ALPN selected: " << alpn;
2726 }
2727
NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level)2728 void QuicSession::NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level) {
2729 GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
2730 }
2731
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action)2732 void QuicSession::PerformActionOnActiveStreams(
2733 quiche::UnretainedCallback<bool(QuicStream*)> action) {
2734 std::vector<QuicStream*> active_streams;
2735 for (const auto& it : stream_map_) {
2736 if (!it.second->is_static() && !it.second->IsZombie()) {
2737 active_streams.push_back(it.second.get());
2738 }
2739 }
2740
2741 for (QuicStream* stream : active_streams) {
2742 if (!action(stream)) {
2743 return;
2744 }
2745 }
2746 }
2747
PerformActionOnActiveStreams(quiche::UnretainedCallback<bool (QuicStream *)> action) const2748 void QuicSession::PerformActionOnActiveStreams(
2749 quiche::UnretainedCallback<bool(QuicStream*)> action) const {
2750 for (const auto& it : stream_map_) {
2751 if (!it.second->is_static() && !it.second->IsZombie() &&
2752 !action(it.second.get())) {
2753 return;
2754 }
2755 }
2756 }
2757
GetEncryptionLevelToSendApplicationData() const2758 EncryptionLevel QuicSession::GetEncryptionLevelToSendApplicationData() const {
2759 return connection_->framer().GetEncryptionLevelToSendApplicationData();
2760 }
2761
ProcessAllPendingStreams()2762 void QuicSession::ProcessAllPendingStreams() {
2763 std::vector<PendingStream*> pending_streams;
2764 pending_streams.reserve(pending_stream_map_.size());
2765 for (auto it = pending_stream_map_.begin(); it != pending_stream_map_.end();
2766 ++it) {
2767 pending_streams.push_back(it->second.get());
2768 }
2769 for (auto* pending_stream : pending_streams) {
2770 if (!MaybeProcessPendingStream(pending_stream)) {
2771 // Defer any further pending stream processing to the next event loop.
2772 return;
2773 }
2774 }
2775 }
2776
ValidatePath(std::unique_ptr<QuicPathValidationContext> context,std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,PathValidationReason reason)2777 void QuicSession::ValidatePath(
2778 std::unique_ptr<QuicPathValidationContext> context,
2779 std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate,
2780 PathValidationReason reason) {
2781 connection_->ValidatePath(std::move(context), std::move(result_delegate),
2782 reason);
2783 }
2784
HasPendingPathValidation() const2785 bool QuicSession::HasPendingPathValidation() const {
2786 return connection_->HasPendingPathValidation();
2787 }
2788
MigratePath(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,QuicPacketWriter * writer,bool owns_writer)2789 bool QuicSession::MigratePath(const QuicSocketAddress& self_address,
2790 const QuicSocketAddress& peer_address,
2791 QuicPacketWriter* writer, bool owns_writer) {
2792 return connection_->MigratePath(self_address, peer_address, writer,
2793 owns_writer);
2794 }
2795
ValidateToken(absl::string_view token)2796 bool QuicSession::ValidateToken(absl::string_view token) {
2797 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_SERVER);
2798 if (GetQuicFlag(quic_reject_retry_token_in_initial_packet)) {
2799 return false;
2800 }
2801 if (token.empty() || token[0] != kAddressTokenPrefix) {
2802 // Validate the prefix for token received in NEW_TOKEN frame.
2803 return false;
2804 }
2805 const bool valid = GetCryptoStream()->ValidateAddressToken(
2806 absl::string_view(token.data() + 1, token.length() - 1));
2807 if (valid) {
2808 const CachedNetworkParameters* cached_network_params =
2809 GetCryptoStream()->PreviousCachedNetworkParams();
2810 if (cached_network_params != nullptr &&
2811 cached_network_params->timestamp() > 0) {
2812 connection()->OnReceiveConnectionState(*cached_network_params);
2813 }
2814 }
2815 return valid;
2816 }
2817
OnServerPreferredAddressAvailable(const QuicSocketAddress & server_preferred_address)2818 void QuicSession::OnServerPreferredAddressAvailable(
2819 const QuicSocketAddress& server_preferred_address) {
2820 QUICHE_DCHECK_EQ(perspective_, Perspective::IS_CLIENT);
2821 if (visitor_ != nullptr) {
2822 visitor_->OnServerPreferredAddressAvailable(server_preferred_address);
2823 }
2824 }
2825
ProcessPendingStream(PendingStream * pending)2826 QuicStream* QuicSession::ProcessPendingStream(PendingStream* pending) {
2827 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
2828 QUICHE_DCHECK(connection()->connected());
2829 QuicStreamId stream_id = pending->id();
2830 QUIC_BUG_IF(bad pending stream, !IsIncomingStream(stream_id))
2831 << "Pending stream " << stream_id << " is not an incoming stream.";
2832 // TODO(b/305051334) check if this stream is incoming stream before making it
2833 // pending. If not, connection should be closed.
2834 StreamType stream_type = QuicUtils::GetStreamType(
2835 stream_id, perspective(), /*peer_initiated=*/true, version());
2836 switch (stream_type) {
2837 case BIDIRECTIONAL: {
2838 return ProcessBidirectionalPendingStream(pending);
2839 }
2840 case READ_UNIDIRECTIONAL: {
2841 return ProcessReadUnidirectionalPendingStream(pending);
2842 }
2843 case WRITE_UNIDIRECTIONAL:
2844 ABSL_FALLTHROUGH_INTENDED;
2845 case CRYPTO:
2846 QUICHE_BUG(unexpected pending stream)
2847 << "Unexpected pending stream " << stream_id << " with type "
2848 << stream_type;
2849 return nullptr;
2850 }
2851 return nullptr; // Unreachable, unless the enum value is out-of-range
2852 // (potentially undefined behavior)
2853 }
2854
ExceedsPerLoopStreamLimit() const2855 bool QuicSession::ExceedsPerLoopStreamLimit() const {
2856 QUICHE_DCHECK(version().HasIetfQuicFrames());
2857 return new_incoming_streams_in_current_loop_ >=
2858 max_streams_accepted_per_loop_;
2859 }
2860
OnStreamCountReset()2861 void QuicSession::OnStreamCountReset() {
2862 const bool exceeded_per_loop_stream_limit = ExceedsPerLoopStreamLimit();
2863 new_incoming_streams_in_current_loop_ = 0;
2864 if (exceeded_per_loop_stream_limit) {
2865 QUIC_CODE_COUNT_N(quic_pending_stream, 2, 3);
2866 // Convert as many leftover pending streams from last loop to active streams
2867 // as allowed.
2868 ProcessAllPendingStreams();
2869 }
2870 }
2871
2872 #undef ENDPOINT // undef for jumbo builds
2873 } // namespace quic
2874