1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/http/quic_spdy_stream.h"
6
7 #include <limits>
8 #include <memory>
9 #include <optional>
10 #include <string>
11 #include <utility>
12
13 #include "absl/base/macros.h"
14 #include "absl/strings/numbers.h"
15 #include "absl/strings/str_cat.h"
16 #include "absl/strings/string_view.h"
17 #include "quiche/http2/adapter/header_validator.h"
18 #include "quiche/http2/http2_constants.h"
19 #include "quiche/quic/core/http/http_constants.h"
20 #include "quiche/quic/core/http/http_decoder.h"
21 #include "quiche/quic/core/http/http_frames.h"
22 #include "quiche/quic/core/http/quic_spdy_session.h"
23 #include "quiche/quic/core/http/spdy_utils.h"
24 #include "quiche/quic/core/http/web_transport_http3.h"
25 #include "quiche/quic/core/qpack/qpack_decoder.h"
26 #include "quiche/quic/core/qpack/qpack_encoder.h"
27 #include "quiche/quic/core/quic_error_codes.h"
28 #include "quiche/quic/core/quic_stream_priority.h"
29 #include "quiche/quic/core/quic_types.h"
30 #include "quiche/quic/core/quic_utils.h"
31 #include "quiche/quic/core/quic_versions.h"
32 #include "quiche/quic/core/quic_write_blocked_list.h"
33 #include "quiche/quic/core/web_transport_interface.h"
34 #include "quiche/quic/platform/api/quic_bug_tracker.h"
35 #include "quiche/quic/platform/api/quic_flag_utils.h"
36 #include "quiche/quic/platform/api/quic_flags.h"
37 #include "quiche/quic/platform/api/quic_logging.h"
38 #include "quiche/quic/platform/api/quic_testvalue.h"
39 #include "quiche/common/capsule.h"
40 #include "quiche/common/platform/api/quiche_flag_utils.h"
41 #include "quiche/common/platform/api/quiche_logging.h"
42 #include "quiche/common/quiche_mem_slice_storage.h"
43 #include "quiche/common/quiche_text_utils.h"
44 #include "quiche/spdy/core/spdy_protocol.h"
45
46 using ::quiche::Capsule;
47 using ::quiche::CapsuleType;
48 using ::spdy::Http2HeaderBlock;
49
50 namespace quic {
51
52 // Visitor of HttpDecoder that passes data frame to QuicSpdyStream and closes
53 // the connection on unexpected frames.
54 class QuicSpdyStream::HttpDecoderVisitor : public HttpDecoder::Visitor {
55 public:
HttpDecoderVisitor(QuicSpdyStream * stream)56 explicit HttpDecoderVisitor(QuicSpdyStream* stream) : stream_(stream) {}
57 HttpDecoderVisitor(const HttpDecoderVisitor&) = delete;
58 HttpDecoderVisitor& operator=(const HttpDecoderVisitor&) = delete;
59
OnError(HttpDecoder * decoder)60 void OnError(HttpDecoder* decoder) override {
61 stream_->OnUnrecoverableError(decoder->error(), decoder->error_detail());
62 }
63
OnMaxPushIdFrame()64 bool OnMaxPushIdFrame() override {
65 CloseConnectionOnWrongFrame("Max Push Id");
66 return false;
67 }
68
OnGoAwayFrame(const GoAwayFrame &)69 bool OnGoAwayFrame(const GoAwayFrame& /*frame*/) override {
70 CloseConnectionOnWrongFrame("Goaway");
71 return false;
72 }
73
OnSettingsFrameStart(QuicByteCount)74 bool OnSettingsFrameStart(QuicByteCount /*header_length*/) override {
75 CloseConnectionOnWrongFrame("Settings");
76 return false;
77 }
78
OnSettingsFrame(const SettingsFrame &)79 bool OnSettingsFrame(const SettingsFrame& /*frame*/) override {
80 CloseConnectionOnWrongFrame("Settings");
81 return false;
82 }
83
OnDataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)84 bool OnDataFrameStart(QuicByteCount header_length,
85 QuicByteCount payload_length) override {
86 return stream_->OnDataFrameStart(header_length, payload_length);
87 }
88
OnDataFramePayload(absl::string_view payload)89 bool OnDataFramePayload(absl::string_view payload) override {
90 QUICHE_DCHECK(!payload.empty());
91 return stream_->OnDataFramePayload(payload);
92 }
93
OnDataFrameEnd()94 bool OnDataFrameEnd() override { return stream_->OnDataFrameEnd(); }
95
OnHeadersFrameStart(QuicByteCount header_length,QuicByteCount payload_length)96 bool OnHeadersFrameStart(QuicByteCount header_length,
97 QuicByteCount payload_length) override {
98 if (!VersionUsesHttp3(stream_->transport_version())) {
99 CloseConnectionOnWrongFrame("Headers");
100 return false;
101 }
102 return stream_->OnHeadersFrameStart(header_length, payload_length);
103 }
104
OnHeadersFramePayload(absl::string_view payload)105 bool OnHeadersFramePayload(absl::string_view payload) override {
106 QUICHE_DCHECK(!payload.empty());
107 if (!VersionUsesHttp3(stream_->transport_version())) {
108 CloseConnectionOnWrongFrame("Headers");
109 return false;
110 }
111 return stream_->OnHeadersFramePayload(payload);
112 }
113
OnHeadersFrameEnd()114 bool OnHeadersFrameEnd() override {
115 if (!VersionUsesHttp3(stream_->transport_version())) {
116 CloseConnectionOnWrongFrame("Headers");
117 return false;
118 }
119 return stream_->OnHeadersFrameEnd();
120 }
121
OnPriorityUpdateFrameStart(QuicByteCount)122 bool OnPriorityUpdateFrameStart(QuicByteCount /*header_length*/) override {
123 CloseConnectionOnWrongFrame("Priority update");
124 return false;
125 }
126
OnPriorityUpdateFrame(const PriorityUpdateFrame &)127 bool OnPriorityUpdateFrame(const PriorityUpdateFrame& /*frame*/) override {
128 CloseConnectionOnWrongFrame("Priority update");
129 return false;
130 }
131
OnAcceptChFrameStart(QuicByteCount)132 bool OnAcceptChFrameStart(QuicByteCount /*header_length*/) override {
133 CloseConnectionOnWrongFrame("ACCEPT_CH");
134 return false;
135 }
136
OnAcceptChFrame(const AcceptChFrame &)137 bool OnAcceptChFrame(const AcceptChFrame& /*frame*/) override {
138 CloseConnectionOnWrongFrame("ACCEPT_CH");
139 return false;
140 }
141
OnWebTransportStreamFrameType(QuicByteCount header_length,WebTransportSessionId session_id)142 void OnWebTransportStreamFrameType(
143 QuicByteCount header_length, WebTransportSessionId session_id) override {
144 stream_->OnWebTransportStreamFrameType(header_length, session_id);
145 }
146
OnMetadataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)147 bool OnMetadataFrameStart(QuicByteCount header_length,
148 QuicByteCount payload_length) override {
149 if (!VersionUsesHttp3(stream_->transport_version())) {
150 CloseConnectionOnWrongFrame("Metadata");
151 return false;
152 }
153 return stream_->OnMetadataFrameStart(header_length, payload_length);
154 }
155
OnMetadataFramePayload(absl::string_view payload)156 bool OnMetadataFramePayload(absl::string_view payload) override {
157 QUICHE_DCHECK(!payload.empty());
158 if (!VersionUsesHttp3(stream_->transport_version())) {
159 CloseConnectionOnWrongFrame("Metadata");
160 return false;
161 }
162 return stream_->OnMetadataFramePayload(payload);
163 }
164
OnMetadataFrameEnd()165 bool OnMetadataFrameEnd() override {
166 if (!VersionUsesHttp3(stream_->transport_version())) {
167 CloseConnectionOnWrongFrame("Metadata");
168 return false;
169 }
170 return stream_->OnMetadataFrameEnd();
171 }
172
OnUnknownFrameStart(uint64_t frame_type,QuicByteCount header_length,QuicByteCount payload_length)173 bool OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length,
174 QuicByteCount payload_length) override {
175 return stream_->OnUnknownFrameStart(frame_type, header_length,
176 payload_length);
177 }
178
OnUnknownFramePayload(absl::string_view payload)179 bool OnUnknownFramePayload(absl::string_view payload) override {
180 return stream_->OnUnknownFramePayload(payload);
181 }
182
OnUnknownFrameEnd()183 bool OnUnknownFrameEnd() override { return stream_->OnUnknownFrameEnd(); }
184
185 private:
CloseConnectionOnWrongFrame(absl::string_view frame_type)186 void CloseConnectionOnWrongFrame(absl::string_view frame_type) {
187 stream_->OnUnrecoverableError(
188 QUIC_HTTP_FRAME_UNEXPECTED_ON_SPDY_STREAM,
189 absl::StrCat(frame_type, " frame received on data stream"));
190 }
191
192 QuicSpdyStream* stream_;
193 };
194
// Log prefix identifying this endpoint's perspective. Expands to an
// expression that calls session(), so it is only usable where session() is
// in scope.
#define ENDPOINT                                      \
  (session()->perspective() == Perspective::IS_SERVER \
       ? "Server: "                                   \
       : "Client: ")
199
QuicSpdyStream(QuicStreamId id,QuicSpdySession * spdy_session,StreamType type)200 QuicSpdyStream::QuicSpdyStream(QuicStreamId id, QuicSpdySession* spdy_session,
201 StreamType type)
202 : QuicStream(id, spdy_session, /*is_static=*/false, type),
203 spdy_session_(spdy_session),
204 on_body_available_called_because_sequencer_is_closed_(false),
205 visitor_(nullptr),
206 blocked_on_decoding_headers_(false),
207 headers_decompressed_(false),
208 header_list_size_limit_exceeded_(false),
209 headers_payload_length_(0),
210 trailers_decompressed_(false),
211 trailers_consumed_(false),
212 http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)),
213 decoder_(http_decoder_visitor_.get()),
214 sequencer_offset_(0),
215 is_decoder_processing_input_(false),
216 ack_listener_(nullptr),
217 last_sent_priority_(
218 QuicStreamPriority::Default(spdy_session->priority_type())) {
219 QUICHE_DCHECK_EQ(session()->connection(), spdy_session->connection());
220 QUICHE_DCHECK_EQ(transport_version(), spdy_session->transport_version());
221 QUICHE_DCHECK(!QuicUtils::IsCryptoStreamId(transport_version(), id));
222 QUICHE_DCHECK_EQ(0u, sequencer()->NumBytesConsumed());
223 // If headers are sent on the headers stream, then do not receive any
224 // callbacks from the sequencer until headers are complete.
225 if (!VersionUsesHttp3(transport_version())) {
226 sequencer()->SetBlockedUntilFlush();
227 }
228
229 if (VersionUsesHttp3(transport_version())) {
230 sequencer()->set_level_triggered(true);
231 }
232
233 spdy_session_->OnStreamCreated(this);
234 }
235
QuicSpdyStream(PendingStream * pending,QuicSpdySession * spdy_session)236 QuicSpdyStream::QuicSpdyStream(PendingStream* pending,
237 QuicSpdySession* spdy_session)
238 : QuicStream(pending, spdy_session, /*is_static=*/false),
239 spdy_session_(spdy_session),
240 on_body_available_called_because_sequencer_is_closed_(false),
241 visitor_(nullptr),
242 blocked_on_decoding_headers_(false),
243 headers_decompressed_(false),
244 header_list_size_limit_exceeded_(false),
245 headers_payload_length_(0),
246 trailers_decompressed_(false),
247 trailers_consumed_(false),
248 http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)),
249 decoder_(http_decoder_visitor_.get()),
250 sequencer_offset_(sequencer()->NumBytesConsumed()),
251 is_decoder_processing_input_(false),
252 ack_listener_(nullptr),
253 last_sent_priority_(
254 QuicStreamPriority::Default(spdy_session->priority_type())) {
255 QUICHE_DCHECK_EQ(session()->connection(), spdy_session->connection());
256 QUICHE_DCHECK_EQ(transport_version(), spdy_session->transport_version());
257 QUICHE_DCHECK(!QuicUtils::IsCryptoStreamId(transport_version(), id()));
258 // If headers are sent on the headers stream, then do not receive any
259 // callbacks from the sequencer until headers are complete.
260 if (!VersionUsesHttp3(transport_version())) {
261 sequencer()->SetBlockedUntilFlush();
262 }
263
264 if (VersionUsesHttp3(transport_version())) {
265 sequencer()->set_level_triggered(true);
266 }
267
268 spdy_session_->OnStreamCreated(this);
269 }
270
~QuicSpdyStream()271 QuicSpdyStream::~QuicSpdyStream() {}
272
WriteHeaders(Http2HeaderBlock header_block,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)273 size_t QuicSpdyStream::WriteHeaders(
274 Http2HeaderBlock header_block, bool fin,
275 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
276 ack_listener) {
277 if (!AssertNotWebTransportDataStream("writing headers")) {
278 return 0;
279 }
280
281 QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
282
283 MaybeProcessSentWebTransportHeaders(header_block);
284
285 if (web_transport_ != nullptr &&
286 spdy_session_->perspective() == Perspective::IS_SERVER &&
287 spdy_session_->SupportedWebTransportVersion() ==
288 WebTransportHttp3Version::kDraft02) {
289 header_block["sec-webtransport-http3-draft"] = "draft02";
290 }
291
292 size_t bytes_written =
293 WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
294 if (!VersionUsesHttp3(transport_version()) && fin) {
295 // If HEADERS are sent on the headers stream, then |fin_sent_| needs to be
296 // set and write side needs to be closed without actually sending a FIN on
297 // this stream.
298 // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent.
299 SetFinSent();
300 CloseWriteSide();
301 }
302
303 if (web_transport_ != nullptr &&
304 session()->perspective() == Perspective::IS_CLIENT) {
305 WriteGreaseCapsule();
306 if (spdy_session_->http_datagram_support() ==
307 HttpDatagramSupport::kDraft04) {
308 // Send a REGISTER_DATAGRAM_NO_CONTEXT capsule to support servers that
309 // are running draft-ietf-masque-h3-datagram-04 or -05.
310 uint64_t capsule_type = 0xff37a2; // REGISTER_DATAGRAM_NO_CONTEXT
311 constexpr unsigned char capsule_data[4] = {
312 0x80, 0xff, 0x7c, 0x00, // WEBTRANSPORT datagram format type
313 };
314 WriteCapsule(Capsule::Unknown(
315 capsule_type,
316 absl::string_view(reinterpret_cast<const char*>(capsule_data),
317 sizeof(capsule_data))));
318 WriteGreaseCapsule();
319 }
320 }
321
322 if (connect_ip_visitor_ != nullptr) {
323 connect_ip_visitor_->OnHeadersWritten();
324 }
325
326 return bytes_written;
327 }
328
WriteOrBufferBody(absl::string_view data,bool fin)329 void QuicSpdyStream::WriteOrBufferBody(absl::string_view data, bool fin) {
330 if (!AssertNotWebTransportDataStream("writing body data")) {
331 return;
332 }
333 if (!VersionUsesHttp3(transport_version()) || data.length() == 0) {
334 WriteOrBufferData(data, fin, nullptr);
335 return;
336 }
337 QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
338
339 const bool success =
340 WriteDataFrameHeader(data.length(), /*force_write=*/true);
341 QUICHE_DCHECK(success);
342
343 // Write body.
344 QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
345 << " is writing DATA frame payload of length " << data.length()
346 << " with fin " << fin;
347 WriteOrBufferData(data, fin, nullptr);
348 }
349
WriteTrailers(Http2HeaderBlock trailer_block,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)350 size_t QuicSpdyStream::WriteTrailers(
351 Http2HeaderBlock trailer_block,
352 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
353 ack_listener) {
354 if (fin_sent()) {
355 QUIC_BUG(quic_bug_10410_1)
356 << "Trailers cannot be sent after a FIN, on stream " << id();
357 return 0;
358 }
359
360 if (!VersionUsesHttp3(transport_version())) {
361 // The header block must contain the final offset for this stream, as the
362 // trailers may be processed out of order at the peer.
363 const QuicStreamOffset final_offset =
364 stream_bytes_written() + BufferedDataBytes();
365 QUIC_DVLOG(1) << ENDPOINT << "Inserting trailer: (" << kFinalOffsetHeaderKey
366 << ", " << final_offset << ")";
367 trailer_block.insert(
368 std::make_pair(kFinalOffsetHeaderKey, absl::StrCat(final_offset)));
369 }
370
371 // Write the trailing headers with a FIN, and close stream for writing:
372 // trailers are the last thing to be sent on a stream.
373 const bool kFin = true;
374 size_t bytes_written =
375 WriteHeadersImpl(std::move(trailer_block), kFin, std::move(ack_listener));
376
377 // If trailers are sent on the headers stream, then |fin_sent_| needs to be
378 // set without actually sending a FIN on this stream.
379 if (!VersionUsesHttp3(transport_version())) {
380 SetFinSent();
381
382 // Also, write side of this stream needs to be closed. However, only do
383 // this if there is no more buffered data, otherwise it will never be sent.
384 if (BufferedDataBytes() == 0) {
385 CloseWriteSide();
386 }
387 }
388
389 return bytes_written;
390 }
391
WritevBody(const struct iovec * iov,int count,bool fin)392 QuicConsumedData QuicSpdyStream::WritevBody(const struct iovec* iov, int count,
393 bool fin) {
394 quiche::QuicheMemSliceStorage storage(
395 iov, count,
396 session()->connection()->helper()->GetStreamSendBufferAllocator(),
397 GetQuicFlag(quic_send_buffer_max_data_slice_size));
398 return WriteBodySlices(storage.ToSpan(), fin);
399 }
400
WriteDataFrameHeader(QuicByteCount data_length,bool force_write)401 bool QuicSpdyStream::WriteDataFrameHeader(QuicByteCount data_length,
402 bool force_write) {
403 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
404 QUICHE_DCHECK_GT(data_length, 0u);
405 quiche::QuicheBuffer header = HttpEncoder::SerializeDataFrameHeader(
406 data_length,
407 spdy_session_->connection()->helper()->GetStreamSendBufferAllocator());
408 const bool can_write = CanWriteNewDataAfterData(header.size());
409 if (!can_write && !force_write) {
410 return false;
411 }
412
413 if (spdy_session_->debug_visitor()) {
414 spdy_session_->debug_visitor()->OnDataFrameSent(id(), data_length);
415 }
416
417 unacked_frame_headers_offsets_.Add(
418 send_buffer().stream_offset(),
419 send_buffer().stream_offset() + header.size());
420 QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
421 << " is writing DATA frame header of length " << header.size();
422 if (can_write) {
423 // Save one copy and allocation if send buffer can accomodate the header.
424 quiche::QuicheMemSlice header_slice(std::move(header));
425 WriteMemSlices(absl::MakeSpan(&header_slice, 1), false);
426 } else {
427 QUICHE_DCHECK(force_write);
428 WriteOrBufferData(header.AsStringView(), false, nullptr);
429 }
430 return true;
431 }
432
WriteBodySlices(absl::Span<quiche::QuicheMemSlice> slices,bool fin)433 QuicConsumedData QuicSpdyStream::WriteBodySlices(
434 absl::Span<quiche::QuicheMemSlice> slices, bool fin) {
435 if (!VersionUsesHttp3(transport_version()) || slices.empty()) {
436 return WriteMemSlices(slices, fin);
437 }
438
439 QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
440 const QuicByteCount data_size = MemSliceSpanTotalSize(slices);
441 if (!WriteDataFrameHeader(data_size, /*force_write=*/false)) {
442 return {0, false};
443 }
444
445 QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
446 << " is writing DATA frame payload of length " << data_size;
447 return WriteMemSlices(slices, fin);
448 }
449
Readv(const struct iovec * iov,size_t iov_len)450 size_t QuicSpdyStream::Readv(const struct iovec* iov, size_t iov_len) {
451 QUICHE_DCHECK(FinishedReadingHeaders());
452 if (!VersionUsesHttp3(transport_version())) {
453 return sequencer()->Readv(iov, iov_len);
454 }
455 size_t bytes_read = 0;
456 sequencer()->MarkConsumed(body_manager_.ReadBody(iov, iov_len, &bytes_read));
457
458 return bytes_read;
459 }
460
GetReadableRegions(iovec * iov,size_t iov_len) const461 int QuicSpdyStream::GetReadableRegions(iovec* iov, size_t iov_len) const {
462 QUICHE_DCHECK(FinishedReadingHeaders());
463 if (!VersionUsesHttp3(transport_version())) {
464 return sequencer()->GetReadableRegions(iov, iov_len);
465 }
466 return body_manager_.PeekBody(iov, iov_len);
467 }
468
MarkConsumed(size_t num_bytes)469 void QuicSpdyStream::MarkConsumed(size_t num_bytes) {
470 QUICHE_DCHECK(FinishedReadingHeaders());
471 if (!VersionUsesHttp3(transport_version())) {
472 sequencer()->MarkConsumed(num_bytes);
473 return;
474 }
475
476 sequencer()->MarkConsumed(body_manager_.OnBodyConsumed(num_bytes));
477 }
478
IsDoneReading() const479 bool QuicSpdyStream::IsDoneReading() const {
480 bool done_reading_headers = FinishedReadingHeaders();
481 bool done_reading_body = sequencer()->IsClosed();
482 bool done_reading_trailers = FinishedReadingTrailers();
483 return done_reading_headers && done_reading_body && done_reading_trailers;
484 }
485
HasBytesToRead() const486 bool QuicSpdyStream::HasBytesToRead() const {
487 if (!VersionUsesHttp3(transport_version())) {
488 return sequencer()->HasBytesToRead();
489 }
490 return body_manager_.HasBytesToRead();
491 }
492
ReadableBytes() const493 QuicByteCount QuicSpdyStream::ReadableBytes() const {
494 if (!VersionUsesHttp3(transport_version())) {
495 return sequencer()->ReadableBytes();
496 }
497 return body_manager_.ReadableBytes();
498 }
499
MarkTrailersConsumed()500 void QuicSpdyStream::MarkTrailersConsumed() { trailers_consumed_ = true; }
501
total_body_bytes_read() const502 uint64_t QuicSpdyStream::total_body_bytes_read() const {
503 if (VersionUsesHttp3(transport_version())) {
504 return body_manager_.total_body_bytes_received();
505 }
506 return sequencer()->NumBytesConsumed();
507 }
508
ConsumeHeaderList()509 void QuicSpdyStream::ConsumeHeaderList() {
510 header_list_.Clear();
511
512 if (!FinishedReadingHeaders()) {
513 return;
514 }
515
516 if (!VersionUsesHttp3(transport_version())) {
517 sequencer()->SetUnblocked();
518 return;
519 }
520
521 if (body_manager_.HasBytesToRead()) {
522 HandleBodyAvailable();
523 return;
524 }
525
526 if (sequencer()->IsClosed() &&
527 !on_body_available_called_because_sequencer_is_closed_) {
528 on_body_available_called_because_sequencer_is_closed_ = true;
529 HandleBodyAvailable();
530 }
531 }
532
OnStreamHeadersPriority(const spdy::SpdyStreamPrecedence & precedence)533 void QuicSpdyStream::OnStreamHeadersPriority(
534 const spdy::SpdyStreamPrecedence& precedence) {
535 QUICHE_DCHECK_EQ(Perspective::IS_SERVER,
536 session()->connection()->perspective());
537 if (session()->priority_type() != QuicPriorityType::kHttp) {
538 return;
539 }
540 SetPriority(QuicStreamPriority(HttpStreamPriority{
541 precedence.spdy3_priority(), HttpStreamPriority::kDefaultIncremental}));
542 }
543
OnStreamHeaderList(bool fin,size_t frame_len,const QuicHeaderList & header_list)544 void QuicSpdyStream::OnStreamHeaderList(bool fin, size_t frame_len,
545 const QuicHeaderList& header_list) {
546 if (!spdy_session()->user_agent_id().has_value()) {
547 std::string uaid;
548 for (const auto& kv : header_list) {
549 if (quiche::QuicheTextUtils::ToLower(kv.first) == kUserAgentHeaderName) {
550 uaid = kv.second;
551 break;
552 }
553 }
554 spdy_session()->SetUserAgentId(std::move(uaid));
555 }
556
557 // TODO(b/134706391): remove |fin| argument.
558 // When using Google QUIC, an empty header list indicates that the size limit
559 // has been exceeded.
560 // When using IETF QUIC, there is an explicit signal from
561 // QpackDecodedHeadersAccumulator.
562 if ((VersionUsesHttp3(transport_version()) &&
563 header_list_size_limit_exceeded_) ||
564 (!VersionUsesHttp3(transport_version()) && header_list.empty())) {
565 OnHeadersTooLarge();
566 if (IsDoneReading()) {
567 return;
568 }
569 }
570 if (!NextHeaderIsTrailer()) {
571 OnInitialHeadersComplete(fin, frame_len, header_list);
572 } else {
573 OnTrailingHeadersComplete(fin, frame_len, header_list);
574 }
575 }
576
OnHeadersDecoded(QuicHeaderList headers,bool header_list_size_limit_exceeded)577 void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers,
578 bool header_list_size_limit_exceeded) {
579 header_list_size_limit_exceeded_ = header_list_size_limit_exceeded;
580 qpack_decoded_headers_accumulator_.reset();
581
582 QuicSpdySession::LogHeaderCompressionRatioHistogram(
583 /* using_qpack = */ true,
584 /* is_sent = */ false, headers.compressed_header_bytes(),
585 headers.uncompressed_header_bytes());
586
587 Http3DebugVisitor* const debug_visitor = spdy_session()->debug_visitor();
588 if (debug_visitor) {
589 debug_visitor->OnHeadersDecoded(id(), headers);
590 }
591
592 OnStreamHeaderList(/* fin = */ false, headers_payload_length_, headers);
593
594 if (blocked_on_decoding_headers_) {
595 blocked_on_decoding_headers_ = false;
596 // Continue decoding HTTP/3 frames.
597 OnDataAvailable();
598 }
599 }
600
OnHeaderDecodingError(QuicErrorCode error_code,absl::string_view error_message)601 void QuicSpdyStream::OnHeaderDecodingError(QuicErrorCode error_code,
602 absl::string_view error_message) {
603 qpack_decoded_headers_accumulator_.reset();
604
605 std::string connection_close_error_message = absl::StrCat(
606 "Error decoding ", headers_decompressed_ ? "trailers" : "headers",
607 " on stream ", id(), ": ", error_message);
608 OnUnrecoverableError(error_code, connection_close_error_message);
609 }
610
MaybeSendPriorityUpdateFrame()611 void QuicSpdyStream::MaybeSendPriorityUpdateFrame() {
612 if (!VersionUsesHttp3(transport_version()) ||
613 session()->perspective() != Perspective::IS_CLIENT) {
614 return;
615 }
616 if (spdy_session_->priority_type() != QuicPriorityType::kHttp) {
617 return;
618 }
619
620 if (last_sent_priority_ == priority()) {
621 return;
622 }
623 last_sent_priority_ = priority();
624
625 spdy_session_->WriteHttp3PriorityUpdate(id(), priority().http());
626 }
627
OnHeadersTooLarge()628 void QuicSpdyStream::OnHeadersTooLarge() { Reset(QUIC_HEADERS_TOO_LARGE); }
629
OnInitialHeadersComplete(bool fin,size_t,const QuicHeaderList & header_list)630 void QuicSpdyStream::OnInitialHeadersComplete(
631 bool fin, size_t /*frame_len*/, const QuicHeaderList& header_list) {
632 // TODO(b/134706391): remove |fin| argument.
633 headers_decompressed_ = true;
634 header_list_ = header_list;
635 bool header_too_large = VersionUsesHttp3(transport_version())
636 ? header_list_size_limit_exceeded_
637 : header_list.empty();
638 if (!AreHeaderFieldValuesValid(header_list)) {
639 OnInvalidHeaders();
640 return;
641 }
642 // Validate request headers if it did not exceed size limit. If it did,
643 // OnHeadersTooLarge() should have already handled it previously.
644 if (!header_too_large && !ValidateReceivedHeaders(header_list)) {
645 QUIC_CODE_COUNT_N(quic_validate_request_header, 1, 2);
646 QUICHE_DCHECK(!invalid_request_details().empty())
647 << "ValidatedRequestHeaders() returns false without populating "
648 "invalid_request_details_";
649 if (GetQuicReloadableFlag(quic_act_upon_invalid_header)) {
650 QUIC_RELOADABLE_FLAG_COUNT(quic_act_upon_invalid_header);
651 OnInvalidHeaders();
652 return;
653 }
654 }
655 QUIC_CODE_COUNT_N(quic_validate_request_header, 2, 2);
656
657 if (!header_too_large) {
658 MaybeProcessReceivedWebTransportHeaders();
659 }
660
661 if (VersionUsesHttp3(transport_version())) {
662 if (fin) {
663 OnStreamFrame(QuicStreamFrame(id(), /* fin = */ true,
664 highest_received_byte_offset(),
665 absl::string_view()));
666 }
667 return;
668 }
669
670 if (fin && !rst_sent()) {
671 OnStreamFrame(
672 QuicStreamFrame(id(), fin, /* offset = */ 0, absl::string_view()));
673 }
674 if (FinishedReadingHeaders()) {
675 sequencer()->SetUnblocked();
676 }
677 }
678
CopyAndValidateTrailers(const QuicHeaderList & header_list,bool expect_final_byte_offset,size_t * final_byte_offset,spdy::Http2HeaderBlock * trailers)679 bool QuicSpdyStream::CopyAndValidateTrailers(const QuicHeaderList& header_list,
680 bool expect_final_byte_offset,
681 size_t* final_byte_offset,
682 spdy::Http2HeaderBlock* trailers) {
683 return SpdyUtils::CopyAndValidateTrailers(
684 header_list, expect_final_byte_offset, final_byte_offset, trailers);
685 }
686
OnTrailingHeadersComplete(bool fin,size_t,const QuicHeaderList & header_list)687 void QuicSpdyStream::OnTrailingHeadersComplete(
688 bool fin, size_t /*frame_len*/, const QuicHeaderList& header_list) {
689 // TODO(b/134706391): remove |fin| argument.
690 QUICHE_DCHECK(!trailers_decompressed_);
691 if (!VersionUsesHttp3(transport_version()) && fin_received()) {
692 QUIC_DLOG(INFO) << ENDPOINT
693 << "Received Trailers after FIN, on stream: " << id();
694 stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
695 "Trailers after fin");
696 return;
697 }
698
699 if (!VersionUsesHttp3(transport_version()) && !fin) {
700 QUIC_DLOG(INFO) << ENDPOINT
701 << "Trailers must have FIN set, on stream: " << id();
702 stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
703 "Fin missing from trailers");
704 return;
705 }
706
707 size_t final_byte_offset = 0;
708 const bool expect_final_byte_offset = !VersionUsesHttp3(transport_version());
709 if (!CopyAndValidateTrailers(header_list, expect_final_byte_offset,
710 &final_byte_offset, &received_trailers_)) {
711 QUIC_DLOG(ERROR) << ENDPOINT << "Trailers for stream " << id()
712 << " are malformed.";
713 stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
714 "Trailers are malformed");
715 return;
716 }
717 trailers_decompressed_ = true;
718 if (fin) {
719 const QuicStreamOffset offset = VersionUsesHttp3(transport_version())
720 ? highest_received_byte_offset()
721 : final_byte_offset;
722 OnStreamFrame(QuicStreamFrame(id(), fin, offset, absl::string_view()));
723 }
724 }
725
RegisterMetadataVisitor(MetadataVisitor * visitor)726 void QuicSpdyStream::RegisterMetadataVisitor(MetadataVisitor* visitor) {
727 QUIC_BUG_IF(Metadata visitor requires http3 metadata flag,
728 !GetQuicReloadableFlag(quic_enable_http3_metadata_decoding));
729 metadata_visitor_ = visitor;
730 }
731
UnregisterMetadataVisitor()732 void QuicSpdyStream::UnregisterMetadataVisitor() {
733 metadata_visitor_ = nullptr;
734 }
735
OnPriorityFrame(const spdy::SpdyStreamPrecedence & precedence)736 void QuicSpdyStream::OnPriorityFrame(
737 const spdy::SpdyStreamPrecedence& precedence) {
738 QUICHE_DCHECK_EQ(Perspective::IS_SERVER,
739 session()->connection()->perspective());
740 if (session()->priority_type() != QuicPriorityType::kHttp) {
741 return;
742 }
743 SetPriority(QuicStreamPriority(HttpStreamPriority{
744 precedence.spdy3_priority(), HttpStreamPriority::kDefaultIncremental}));
745 }
746
OnStreamReset(const QuicRstStreamFrame & frame)747 void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) {
748 if (web_transport_data_ != nullptr) {
749 WebTransportStreamVisitor* webtransport_visitor =
750 web_transport_data_->adapter.visitor();
751 if (webtransport_visitor != nullptr) {
752 webtransport_visitor->OnResetStreamReceived(
753 Http3ErrorToWebTransportOrDefault(frame.ietf_error_code));
754 }
755 QuicStream::OnStreamReset(frame);
756 return;
757 }
758
759 if (VersionUsesHttp3(transport_version()) && !fin_received() &&
760 spdy_session_->qpack_decoder()) {
761 spdy_session_->qpack_decoder()->OnStreamReset(id());
762 qpack_decoded_headers_accumulator_.reset();
763 }
764
765 if (VersionUsesHttp3(transport_version()) ||
766 frame.error_code != QUIC_STREAM_NO_ERROR) {
767 QuicStream::OnStreamReset(frame);
768 return;
769 }
770
771 QUIC_DVLOG(1) << ENDPOINT
772 << "Received QUIC_STREAM_NO_ERROR, not discarding response";
773 set_rst_received(true);
774 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
775 set_stream_error(frame.error());
776 CloseWriteSide();
777 }
778
ResetWithError(QuicResetStreamError error)779 void QuicSpdyStream::ResetWithError(QuicResetStreamError error) {
780 if (VersionUsesHttp3(transport_version()) && !fin_received() &&
781 spdy_session_->qpack_decoder() && web_transport_data_ == nullptr) {
782 spdy_session_->qpack_decoder()->OnStreamReset(id());
783 qpack_decoded_headers_accumulator_.reset();
784 }
785
786 QuicStream::ResetWithError(error);
787 }
788
OnStopSending(QuicResetStreamError error)789 bool QuicSpdyStream::OnStopSending(QuicResetStreamError error) {
790 if (web_transport_data_ != nullptr) {
791 WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
792 if (visitor != nullptr) {
793 visitor->OnStopSendingReceived(
794 Http3ErrorToWebTransportOrDefault(error.ietf_application_code()));
795 }
796 }
797
798 return QuicStream::OnStopSending(error);
799 }
800
OnWriteSideInDataRecvdState()801 void QuicSpdyStream::OnWriteSideInDataRecvdState() {
802 if (web_transport_data_ != nullptr) {
803 WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
804 if (visitor != nullptr) {
805 visitor->OnWriteSideInDataRecvdState();
806 }
807 }
808
809 QuicStream::OnWriteSideInDataRecvdState();
810 }
811
// Called when the sequencer has new data for this stream.
//
// Pre-HTTP/3 versions hand the data straight to HandleBodyAvailable().
// For HTTP/3 the readable regions are fed to |decoder_|; once headers have
// been fully consumed, HandleBodyAvailable() is invoked when body bytes are
// readable or, exactly once, when the sequencer is closed.
void QuicSpdyStream::OnDataAvailable() {
  if (!VersionUsesHttp3(transport_version())) {
    // Sequencer must be blocked until headers are consumed.
    QUICHE_DCHECK(FinishedReadingHeaders());
  }

  if (!VersionUsesHttp3(transport_version())) {
    HandleBodyAvailable();
    return;
  }

  // WebTransport data streams bypass HTTP/3 frame parsing entirely; the
  // adapter handles the data.
  if (web_transport_data_ != nullptr) {
    web_transport_data_->adapter.OnDataAvailable();
    return;
  }

  // Processing is deferred while the session says incoming requests should
  // not be processed yet; the session is told that this stream is waiting.
  if (!spdy_session()->ShouldProcessIncomingRequests()) {
    spdy_session()->OnStreamWaitingForClientSettings(id());
    return;
  }

  if (is_decoder_processing_input_) {
    // Let the outermost nested OnDataAvailable() call do the work.
    return;
  }

  // Nothing is parsed while header decoding is blocked.
  if (blocked_on_decoding_headers_) {
    return;
  }

  if (spdy_session_->SupportsWebTransport()) {
    // We do this here, since at this point, we have passed the
    // ShouldProcessIncomingRequests() check above, meaning we know for a fact
    // if we should be parsing WEBTRANSPORT_STREAM or not.
    decoder_.EnableWebTransportStreamParsing();
  }

  iovec iov;
  // Feed the decoder one readable region at a time, starting at
  // |sequencer_offset_|, for as long as the connection is up, reading has not
  // been stopped and the decoder has not reported an error.
  while (session()->connection()->connected() && !reading_stopped() &&
         decoder_.error() == QUIC_NO_ERROR) {
    QUICHE_DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
    if (!sequencer()->PeekRegion(sequencer_offset_, &iov)) {
      break;
    }

    QUICHE_DCHECK(!sequencer()->IsClosed());
    // The flag brackets ProcessInput() so that a nested OnDataAvailable()
    // call made during parsing returns early (see the check above).
    is_decoder_processing_input_ = true;
    QuicByteCount processed_bytes = decoder_.ProcessInput(
        reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
    is_decoder_processing_input_ = false;
    // Parsing may have closed the connection; stop without touching further
    // state in that case.
    if (!session()->connection()->connected()) {
      return;
    }
    sequencer_offset_ += processed_bytes;
    if (blocked_on_decoding_headers_) {
      return;
    }
    // The stream may have turned into a WebTransport data stream while
    // parsing; stop HTTP/3 frame processing if so.
    if (web_transport_data_ != nullptr) {
      return;
    }
  }

  // Do not call HandleBodyAvailable() until headers are consumed.
  if (!FinishedReadingHeaders()) {
    return;
  }

  if (body_manager_.HasBytesToRead()) {
    HandleBodyAvailable();
    return;
  }

  // With no body bytes left, report the closed sequencer once; the flag
  // prevents a second HandleBodyAvailable() call for the same condition.
  if (sequencer()->IsClosed() &&
      !on_body_available_called_because_sequencer_is_closed_) {
    on_body_available_called_because_sequencer_is_closed_ = true;
    HandleBodyAvailable();
  }
}
890
OnClose()891 void QuicSpdyStream::OnClose() {
892 QuicStream::OnClose();
893
894 qpack_decoded_headers_accumulator_.reset();
895
896 if (visitor_) {
897 Visitor* visitor = visitor_;
898 // Calling Visitor::OnClose() may result the destruction of the visitor,
899 // so we need to ensure we don't call it again.
900 visitor_ = nullptr;
901 visitor->OnClose(this);
902 }
903
904 if (web_transport_ != nullptr) {
905 web_transport_->OnConnectStreamClosing();
906 }
907 if (web_transport_data_ != nullptr) {
908 WebTransportHttp3* web_transport =
909 spdy_session_->GetWebTransportSession(web_transport_data_->session_id);
910 if (web_transport == nullptr) {
911 // Since there is no guaranteed destruction order for streams, the session
912 // could be already removed from the stream map by the time we reach here.
913 QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
914 << " attempted to notify parent session "
915 << web_transport_data_->session_id
916 << ", but the session could not be found.";
917 return;
918 }
919 web_transport->OnStreamClosed(id());
920 }
921 }
922
OnCanWrite()923 void QuicSpdyStream::OnCanWrite() {
924 QuicStream::OnCanWrite();
925
926 // Trailers (and hence a FIN) may have been sent ahead of queued body bytes.
927 if (!HasBufferedData() && fin_sent()) {
928 CloseWriteSide();
929 }
930 }
931
FinishedReadingHeaders() const932 bool QuicSpdyStream::FinishedReadingHeaders() const {
933 return headers_decompressed_ && header_list_.empty();
934 }
935
ParseHeaderStatusCode(const Http2HeaderBlock & header,int * status_code)936 bool QuicSpdyStream::ParseHeaderStatusCode(const Http2HeaderBlock& header,
937 int* status_code) {
938 Http2HeaderBlock::const_iterator it = header.find(spdy::kHttp2StatusHeader);
939 if (it == header.end()) {
940 return false;
941 }
942 const absl::string_view status(it->second);
943 return ParseHeaderStatusCode(status, status_code);
944 }
945
ParseHeaderStatusCode(absl::string_view status,int * status_code)946 bool QuicSpdyStream::ParseHeaderStatusCode(absl::string_view status,
947 int* status_code) {
948 if (status.size() != 3) {
949 return false;
950 }
951 // First character must be an integer in range [1,5].
952 if (status[0] < '1' || status[0] > '5') {
953 return false;
954 }
955 // The remaining two characters must be integers.
956 if (!isdigit(status[1]) || !isdigit(status[2])) {
957 return false;
958 }
959 return absl::SimpleAtoi(status, status_code);
960 }
961
FinishedReadingTrailers() const962 bool QuicSpdyStream::FinishedReadingTrailers() const {
963 // If no further trailing headers are expected, and the decompressed trailers
964 // (if any) have been consumed, then reading of trailers is finished.
965 if (!fin_received()) {
966 return false;
967 } else if (!trailers_decompressed_) {
968 return true;
969 } else {
970 return trailers_consumed_;
971 }
972 }
973
OnDataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)974 bool QuicSpdyStream::OnDataFrameStart(QuicByteCount header_length,
975 QuicByteCount payload_length) {
976 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
977
978 if (spdy_session_->debug_visitor()) {
979 spdy_session_->debug_visitor()->OnDataFrameReceived(id(), payload_length);
980 }
981
982 if (!headers_decompressed_ || trailers_decompressed_) {
983 QUICHE_LOG(INFO) << ENDPOINT << "stream_id: " << id()
984 << ", headers_decompressed: "
985 << (headers_decompressed_ ? "true" : "false")
986 << ", trailers_decompressed: "
987 << (trailers_decompressed_ ? "true" : "false")
988 << ", NumBytesConsumed: "
989 << sequencer()->NumBytesConsumed()
990 << ", total_body_bytes_received: "
991 << body_manager_.total_body_bytes_received()
992 << ", header_length: " << header_length
993 << ", payload_length: " << payload_length;
994 stream_delegate()->OnStreamError(
995 QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
996 "Unexpected DATA frame received.");
997 return false;
998 }
999
1000 sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1001
1002 return true;
1003 }
1004
OnDataFramePayload(absl::string_view payload)1005 bool QuicSpdyStream::OnDataFramePayload(absl::string_view payload) {
1006 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1007
1008 body_manager_.OnBody(payload);
1009
1010 return true;
1011 }
1012
OnDataFrameEnd()1013 bool QuicSpdyStream::OnDataFrameEnd() {
1014 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1015
1016 QUIC_DVLOG(1) << ENDPOINT
1017 << "Reaches the end of a data frame. Total bytes received are "
1018 << body_manager_.total_body_bytes_received();
1019 return true;
1020 }
1021
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp,QuicByteCount * newly_acked_length)1022 bool QuicSpdyStream::OnStreamFrameAcked(QuicStreamOffset offset,
1023 QuicByteCount data_length,
1024 bool fin_acked,
1025 QuicTime::Delta ack_delay_time,
1026 QuicTime receive_timestamp,
1027 QuicByteCount* newly_acked_length) {
1028 const bool new_data_acked = QuicStream::OnStreamFrameAcked(
1029 offset, data_length, fin_acked, ack_delay_time, receive_timestamp,
1030 newly_acked_length);
1031
1032 const QuicByteCount newly_acked_header_length =
1033 GetNumFrameHeadersInInterval(offset, data_length);
1034 QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
1035 unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
1036 if (ack_listener_ != nullptr && new_data_acked) {
1037 ack_listener_->OnPacketAcked(
1038 *newly_acked_length - newly_acked_header_length, ack_delay_time);
1039 }
1040 return new_data_acked;
1041 }
1042
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1043 void QuicSpdyStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1044 QuicByteCount data_length,
1045 bool fin_retransmitted) {
1046 QuicStream::OnStreamFrameRetransmitted(offset, data_length,
1047 fin_retransmitted);
1048
1049 const QuicByteCount retransmitted_header_length =
1050 GetNumFrameHeadersInInterval(offset, data_length);
1051 QUICHE_DCHECK_LE(retransmitted_header_length, data_length);
1052
1053 if (ack_listener_ != nullptr) {
1054 ack_listener_->OnPacketRetransmitted(data_length -
1055 retransmitted_header_length);
1056 }
1057 }
1058
GetNumFrameHeadersInInterval(QuicStreamOffset offset,QuicByteCount data_length) const1059 QuicByteCount QuicSpdyStream::GetNumFrameHeadersInInterval(
1060 QuicStreamOffset offset, QuicByteCount data_length) const {
1061 QuicByteCount header_acked_length = 0;
1062 QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
1063 newly_acked.Intersection(unacked_frame_headers_offsets_);
1064 for (const auto& interval : newly_acked) {
1065 header_acked_length += interval.Length();
1066 }
1067 return header_acked_length;
1068 }
1069
OnHeadersFrameStart(QuicByteCount header_length,QuicByteCount payload_length)1070 bool QuicSpdyStream::OnHeadersFrameStart(QuicByteCount header_length,
1071 QuicByteCount payload_length) {
1072 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1073 QUICHE_DCHECK(!qpack_decoded_headers_accumulator_);
1074
1075 if (spdy_session_->debug_visitor()) {
1076 spdy_session_->debug_visitor()->OnHeadersFrameReceived(id(),
1077 payload_length);
1078 }
1079
1080 headers_payload_length_ = payload_length;
1081
1082 if (trailers_decompressed_) {
1083 QUICHE_LOG(INFO) << ENDPOINT << "stream_id: " << id()
1084 << ", headers_decompressed: "
1085 << (headers_decompressed_ ? "true" : "false")
1086 << ", NumBytesConsumed: "
1087 << sequencer()->NumBytesConsumed()
1088 << ", total_body_bytes_received: "
1089 << body_manager_.total_body_bytes_received();
1090 stream_delegate()->OnStreamError(
1091 QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1092 "HEADERS frame received after trailing HEADERS.");
1093 return false;
1094 }
1095
1096 sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1097
1098 qpack_decoded_headers_accumulator_ =
1099 std::make_unique<QpackDecodedHeadersAccumulator>(
1100 id(), spdy_session_->qpack_decoder(), this,
1101 spdy_session_->max_inbound_header_list_size());
1102
1103 return true;
1104 }
1105
OnHeadersFramePayload(absl::string_view payload)1106 bool QuicSpdyStream::OnHeadersFramePayload(absl::string_view payload) {
1107 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1108
1109 if (!qpack_decoded_headers_accumulator_) {
1110 QUIC_BUG(b215142466_OnHeadersFramePayload);
1111 OnHeaderDecodingError(QUIC_INTERNAL_ERROR,
1112 "qpack_decoded_headers_accumulator_ is nullptr");
1113 return false;
1114 }
1115
1116 qpack_decoded_headers_accumulator_->Decode(payload);
1117
1118 // |qpack_decoded_headers_accumulator_| is reset if an error is detected.
1119 if (!qpack_decoded_headers_accumulator_) {
1120 return false;
1121 }
1122
1123 sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1124 return true;
1125 }
1126
OnHeadersFrameEnd()1127 bool QuicSpdyStream::OnHeadersFrameEnd() {
1128 QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1129
1130 if (!qpack_decoded_headers_accumulator_) {
1131 QUIC_BUG(b215142466_OnHeadersFrameEnd);
1132 OnHeaderDecodingError(QUIC_INTERNAL_ERROR,
1133 "qpack_decoded_headers_accumulator_ is nullptr");
1134 return false;
1135 }
1136
1137 qpack_decoded_headers_accumulator_->EndHeaderBlock();
1138
1139 // If decoding is complete or an error is detected, then
1140 // |qpack_decoded_headers_accumulator_| is already reset.
1141 if (qpack_decoded_headers_accumulator_) {
1142 blocked_on_decoding_headers_ = true;
1143 return false;
1144 }
1145
1146 return !sequencer()->IsClosed() && !reading_stopped();
1147 }
1148
OnWebTransportStreamFrameType(QuicByteCount header_length,WebTransportSessionId session_id)1149 void QuicSpdyStream::OnWebTransportStreamFrameType(
1150 QuicByteCount header_length, WebTransportSessionId session_id) {
1151 QUIC_DVLOG(1) << ENDPOINT << " Received WEBTRANSPORT_STREAM on stream "
1152 << id() << " for session " << session_id;
1153 QuicStreamOffset offset = sequencer()->NumBytesConsumed();
1154 sequencer()->MarkConsumed(header_length);
1155
1156 std::optional<WebTransportHttp3Version> version =
1157 spdy_session_->SupportedWebTransportVersion();
1158 QUICHE_DCHECK(version.has_value());
1159 if (version == WebTransportHttp3Version::kDraft02) {
1160 if (headers_payload_length_ > 0 || headers_decompressed_) {
1161 std::string error =
1162 absl::StrCat("Stream ", id(),
1163 " attempted to convert itself into a WebTransport data "
1164 "stream, but it already has HTTP data on it");
1165 QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on HTTP request)
1166 << ENDPOINT << error;
1167 OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1168 error);
1169 return;
1170 }
1171 } else {
1172 if (offset > 0) {
1173 std::string error =
1174 absl::StrCat("Stream ", id(),
1175 " received WEBTRANSPORT_STREAM at a non-zero offset");
1176 QUIC_DLOG(ERROR) << ENDPOINT << error;
1177 OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1178 error);
1179 return;
1180 }
1181 }
1182
1183 if (QuicUtils::IsOutgoingStreamId(spdy_session_->version(), id(),
1184 spdy_session_->perspective())) {
1185 std::string error = absl::StrCat(
1186 "Stream ", id(),
1187 " attempted to convert itself into a WebTransport data stream, but "
1188 "only the initiator of the stream can do that");
1189 QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on outgoing request)
1190 << ENDPOINT << error;
1191 OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1192 error);
1193 return;
1194 }
1195
1196 QUICHE_DCHECK(web_transport_ == nullptr);
1197 web_transport_data_ =
1198 std::make_unique<WebTransportDataStream>(this, session_id);
1199 spdy_session_->AssociateIncomingWebTransportStreamWithSession(session_id,
1200 id());
1201 }
1202
OnMetadataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)1203 bool QuicSpdyStream::OnMetadataFrameStart(QuicByteCount header_length,
1204 QuicByteCount payload_length) {
1205 if (metadata_visitor_ == nullptr) {
1206 return OnUnknownFrameStart(
1207 static_cast<uint64_t>(quic::HttpFrameType::METADATA), header_length,
1208 payload_length);
1209 }
1210
1211 QUIC_BUG_IF(Invalid METADATA state, metadata_decoder_ != nullptr);
1212 constexpr size_t kMaxMetadataBlockSize = 1 << 20; // 1 MB
1213 metadata_decoder_ = std::make_unique<MetadataDecoder>(
1214 id(), kMaxMetadataBlockSize, header_length, payload_length);
1215
1216 // Consume the frame header.
1217 QUIC_DVLOG(1) << ENDPOINT << "Consuming " << header_length
1218 << " byte long frame header of METADATA.";
1219 sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1220 return true;
1221 }
1222
OnMetadataFramePayload(absl::string_view payload)1223 bool QuicSpdyStream::OnMetadataFramePayload(absl::string_view payload) {
1224 if (metadata_visitor_ == nullptr) {
1225 return OnUnknownFramePayload(payload);
1226 }
1227
1228 if (!metadata_decoder_->Decode(payload)) {
1229 OnUnrecoverableError(QUIC_DECOMPRESSION_FAILURE,
1230 metadata_decoder_->error_message());
1231 return false;
1232 }
1233
1234 // Consume the frame payload.
1235 QUIC_DVLOG(1) << ENDPOINT << "Consuming " << payload.size()
1236 << " bytes of payload of METADATA.";
1237 sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1238 return true;
1239 }
1240
OnMetadataFrameEnd()1241 bool QuicSpdyStream::OnMetadataFrameEnd() {
1242 if (metadata_visitor_ == nullptr) {
1243 return OnUnknownFrameEnd();
1244 }
1245
1246 if (!metadata_decoder_->EndHeaderBlock()) {
1247 OnUnrecoverableError(QUIC_DECOMPRESSION_FAILURE,
1248 metadata_decoder_->error_message());
1249 return false;
1250 }
1251
1252 metadata_visitor_->OnMetadataComplete(metadata_decoder_->frame_len(),
1253 metadata_decoder_->headers());
1254 metadata_decoder_.reset();
1255 return !sequencer()->IsClosed() && !reading_stopped();
1256 }
1257
OnUnknownFrameStart(uint64_t frame_type,QuicByteCount header_length,QuicByteCount payload_length)1258 bool QuicSpdyStream::OnUnknownFrameStart(uint64_t frame_type,
1259 QuicByteCount header_length,
1260 QuicByteCount payload_length) {
1261 if (spdy_session_->debug_visitor()) {
1262 spdy_session_->debug_visitor()->OnUnknownFrameReceived(id(), frame_type,
1263 payload_length);
1264 }
1265 spdy_session_->OnUnknownFrameStart(id(), frame_type, header_length,
1266 payload_length);
1267
1268 // Consume the frame header.
1269 QUIC_DVLOG(1) << ENDPOINT << "Consuming " << header_length
1270 << " byte long frame header of frame of unknown type "
1271 << frame_type << ".";
1272 sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1273 return true;
1274 }
1275
OnUnknownFramePayload(absl::string_view payload)1276 bool QuicSpdyStream::OnUnknownFramePayload(absl::string_view payload) {
1277 spdy_session_->OnUnknownFramePayload(id(), payload);
1278
1279 // Consume the frame payload.
1280 QUIC_DVLOG(1) << ENDPOINT << "Consuming " << payload.size()
1281 << " bytes of payload of frame of unknown type.";
1282 sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1283 return true;
1284 }
1285
OnUnknownFrameEnd()1286 bool QuicSpdyStream::OnUnknownFrameEnd() { return true; }
1287
WriteHeadersImpl(spdy::Http2HeaderBlock header_block,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)1288 size_t QuicSpdyStream::WriteHeadersImpl(
1289 spdy::Http2HeaderBlock header_block, bool fin,
1290 quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
1291 ack_listener) {
1292 if (!VersionUsesHttp3(transport_version())) {
1293 return spdy_session_->WriteHeadersOnHeadersStream(
1294 id(), std::move(header_block), fin,
1295 spdy::SpdyStreamPrecedence(priority().http().urgency),
1296 std::move(ack_listener));
1297 }
1298
1299 // Encode header list.
1300 QuicByteCount encoder_stream_sent_byte_count;
1301 std::string encoded_headers =
1302 spdy_session_->qpack_encoder()->EncodeHeaderList(
1303 id(), header_block, &encoder_stream_sent_byte_count);
1304
1305 if (spdy_session_->debug_visitor()) {
1306 spdy_session_->debug_visitor()->OnHeadersFrameSent(id(), header_block);
1307 }
1308
1309 // Write HEADERS frame.
1310 std::string headers_frame_header =
1311 HttpEncoder::SerializeHeadersFrameHeader(encoded_headers.size());
1312 unacked_frame_headers_offsets_.Add(
1313 send_buffer().stream_offset(),
1314 send_buffer().stream_offset() + headers_frame_header.length());
1315
1316 QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
1317 << " is writing HEADERS frame header of length "
1318 << headers_frame_header.length() << ", and payload of length "
1319 << encoded_headers.length() << " with fin " << fin;
1320 WriteOrBufferData(absl::StrCat(headers_frame_header, encoded_headers), fin,
1321 /*ack_listener=*/nullptr);
1322
1323 QuicSpdySession::LogHeaderCompressionRatioHistogram(
1324 /* using_qpack = */ true,
1325 /* is_sent = */ true,
1326 encoded_headers.size() + encoder_stream_sent_byte_count,
1327 header_block.TotalBytesUsed());
1328
1329 return encoded_headers.size();
1330 }
1331
CanWriteNewBodyData(QuicByteCount write_size) const1332 bool QuicSpdyStream::CanWriteNewBodyData(QuicByteCount write_size) const {
1333 QUICHE_DCHECK_NE(0u, write_size);
1334 if (!VersionUsesHttp3(transport_version())) {
1335 return CanWriteNewData();
1336 }
1337
1338 return CanWriteNewDataAfterData(
1339 HttpEncoder::GetDataFrameHeaderLength(write_size));
1340 }
1341
MaybeProcessReceivedWebTransportHeaders()1342 void QuicSpdyStream::MaybeProcessReceivedWebTransportHeaders() {
1343 if (!spdy_session_->SupportsWebTransport()) {
1344 return;
1345 }
1346 if (session()->perspective() != Perspective::IS_SERVER) {
1347 return;
1348 }
1349 QUICHE_DCHECK(IsValidWebTransportSessionId(id(), version()));
1350
1351 std::string method;
1352 std::string protocol;
1353 for (const auto& [header_name, header_value] : header_list_) {
1354 if (header_name == ":method") {
1355 if (!method.empty() || header_value.empty()) {
1356 return;
1357 }
1358 method = header_value;
1359 }
1360 if (header_name == ":protocol") {
1361 if (!protocol.empty() || header_value.empty()) {
1362 return;
1363 }
1364 protocol = header_value;
1365 }
1366 if (header_name == "datagram-flow-id") {
1367 QUIC_DLOG(ERROR) << ENDPOINT
1368 << "Rejecting WebTransport due to unexpected "
1369 "Datagram-Flow-Id header";
1370 return;
1371 }
1372 }
1373
1374 if (method != "CONNECT" || protocol != "webtransport") {
1375 return;
1376 }
1377
1378 web_transport_ =
1379 std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
1380 }
1381
MaybeProcessSentWebTransportHeaders(spdy::Http2HeaderBlock & headers)1382 void QuicSpdyStream::MaybeProcessSentWebTransportHeaders(
1383 spdy::Http2HeaderBlock& headers) {
1384 if (!spdy_session_->SupportsWebTransport()) {
1385 return;
1386 }
1387 if (session()->perspective() != Perspective::IS_CLIENT) {
1388 return;
1389 }
1390 QUICHE_DCHECK(IsValidWebTransportSessionId(id(), version()));
1391
1392 const auto method_it = headers.find(":method");
1393 const auto protocol_it = headers.find(":protocol");
1394 if (method_it == headers.end() || protocol_it == headers.end()) {
1395 return;
1396 }
1397 if (method_it->second != "CONNECT" && protocol_it->second != "webtransport") {
1398 return;
1399 }
1400
1401 if (spdy_session_->SupportedWebTransportVersion() ==
1402 WebTransportHttp3Version::kDraft02) {
1403 headers["sec-webtransport-http3-draft02"] = "1";
1404 }
1405
1406 web_transport_ =
1407 std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
1408 }
1409
OnCanWriteNewData()1410 void QuicSpdyStream::OnCanWriteNewData() {
1411 if (web_transport_data_ != nullptr) {
1412 web_transport_data_->adapter.OnCanWriteNewData();
1413 }
1414 }
1415
AssertNotWebTransportDataStream(absl::string_view operation)1416 bool QuicSpdyStream::AssertNotWebTransportDataStream(
1417 absl::string_view operation) {
1418 if (web_transport_data_ != nullptr) {
1419 QUIC_BUG(Invalid operation on WebTransport stream)
1420 << "Attempted to " << operation << " on WebTransport data stream "
1421 << id() << " associated with session "
1422 << web_transport_data_->session_id;
1423 OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1424 absl::StrCat("Attempted to ", operation,
1425 " on WebTransport data stream"));
1426 return false;
1427 }
1428 return true;
1429 }
1430
ConvertToWebTransportDataStream(WebTransportSessionId session_id)1431 void QuicSpdyStream::ConvertToWebTransportDataStream(
1432 WebTransportSessionId session_id) {
1433 if (send_buffer().stream_offset() != 0) {
1434 QUIC_BUG(Sending WEBTRANSPORT_STREAM when data already sent)
1435 << "Attempted to send a WEBTRANSPORT_STREAM frame when other data has "
1436 "already been sent on the stream.";
1437 OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1438 "Attempted to send a WEBTRANSPORT_STREAM frame when "
1439 "other data has already been sent on the stream.");
1440 return;
1441 }
1442
1443 std::string header =
1444 HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id);
1445 if (header.empty()) {
1446 QUIC_BUG(Failed to serialize WEBTRANSPORT_STREAM)
1447 << "Failed to serialize a WEBTRANSPORT_STREAM frame.";
1448 OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1449 "Failed to serialize a WEBTRANSPORT_STREAM frame.");
1450 return;
1451 }
1452
1453 WriteOrBufferData(header, /*fin=*/false, nullptr);
1454 web_transport_data_ =
1455 std::make_unique<WebTransportDataStream>(this, session_id);
1456 QUIC_DVLOG(1) << ENDPOINT << "Successfully opened WebTransport data stream "
1457 << id() << " for session " << session_id;
1458 }
1459
WebTransportDataStream(QuicSpdyStream * stream,WebTransportSessionId session_id)1460 QuicSpdyStream::WebTransportDataStream::WebTransportDataStream(
1461 QuicSpdyStream* stream, WebTransportSessionId session_id)
1462 : session_id(session_id),
1463 adapter(stream->spdy_session_, stream, stream->sequencer()) {}
1464
HandleReceivedDatagram(absl::string_view payload)1465 void QuicSpdyStream::HandleReceivedDatagram(absl::string_view payload) {
1466 if (datagram_visitor_ == nullptr) {
1467 QUIC_DLOG(ERROR) << ENDPOINT << "Received datagram without any visitor";
1468 return;
1469 }
1470 datagram_visitor_->OnHttp3Datagram(id(), payload);
1471 }
1472
// Capsule parser callback: dispatches a fully parsed |capsule| by type.
//
// Returns false when the capsule is rejected: it arrived before headers were
// decompressed, it arrived after a WebTransport close was received, a
// WebTransport session capsule arrived on a stream with no WebTransport
// session, or a CONNECT-IP visitor callback returned false. Returns true
// otherwise.
bool QuicSpdyStream::OnCapsule(const Capsule& capsule) {
  QUIC_DLOG(INFO) << ENDPOINT << "Stream " << id() << " received capsule "
                  << capsule;
  // Capsules are not accepted until the headers have been decompressed.
  if (!headers_decompressed_) {
    QUIC_PEER_BUG(capsule before headers)
        << ENDPOINT << "Stream " << id() << " received capsule " << capsule
        << " before headers";
    return false;
  }
  // Nothing may follow CLOSE_WEBTRANSPORT_SESSION on a WebTransport stream.
  if (web_transport_ != nullptr && web_transport_->close_received()) {
    QUIC_PEER_BUG(capsule after close)
        << ENDPOINT << "Stream " << id() << " received capsule " << capsule
        << " after CLOSE_WEBTRANSPORT_SESSION.";
    return false;
  }
  switch (capsule.capsule_type()) {
    // All datagram capsule flavors are delivered through the same path.
    case CapsuleType::DATAGRAM:
      HandleReceivedDatagram(capsule.datagram_capsule().http_datagram_payload);
      return true;
    case CapsuleType::LEGACY_DATAGRAM:
      HandleReceivedDatagram(
          capsule.legacy_datagram_capsule().http_datagram_payload);
      return true;
    case CapsuleType::LEGACY_DATAGRAM_WITHOUT_CONTEXT:
      HandleReceivedDatagram(capsule.legacy_datagram_without_context_capsule()
                                 .http_datagram_payload);
      return true;
    // WebTransport session capsules require a WebTransport session on this
    // stream.
    case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
      if (web_transport_ == nullptr) {
        QUIC_DLOG(ERROR) << ENDPOINT << "Received capsule " << capsule
                         << " for a non-WebTransport stream.";
        return false;
      }
      web_transport_->OnCloseReceived(
          capsule.close_web_transport_session_capsule().error_code,
          capsule.close_web_transport_session_capsule().error_message);
      return true;
    case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
      if (web_transport_ == nullptr) {
        QUIC_DLOG(ERROR) << ENDPOINT << "Received capsule " << capsule
                         << " for a non-WebTransport stream.";
        return false;
      }
      web_transport_->OnDrainSessionReceived();
      return true;
    // CONNECT-IP capsules are silently accepted when no CONNECT-IP visitor is
    // registered; otherwise the visitor's return value decides.
    case CapsuleType::ADDRESS_ASSIGN:
      if (connect_ip_visitor_ == nullptr) {
        return true;
      }
      return connect_ip_visitor_->OnAddressAssignCapsule(
          capsule.address_assign_capsule());
    case CapsuleType::ADDRESS_REQUEST:
      if (connect_ip_visitor_ == nullptr) {
        return true;
      }
      return connect_ip_visitor_->OnAddressRequestCapsule(
          capsule.address_request_capsule());
    case CapsuleType::ROUTE_ADVERTISEMENT:
      if (connect_ip_visitor_ == nullptr) {
        return true;
      }
      return connect_ip_visitor_->OnRouteAdvertisementCapsule(
          capsule.route_advertisement_capsule());

    // Ignore WebTransport over HTTP/2 capsules.
    case CapsuleType::WT_RESET_STREAM:
    case CapsuleType::WT_STOP_SENDING:
    case CapsuleType::WT_STREAM:
    case CapsuleType::WT_STREAM_WITH_FIN:
    case CapsuleType::WT_MAX_STREAM_DATA:
    case CapsuleType::WT_MAX_STREAMS_BIDI:
    case CapsuleType::WT_MAX_STREAMS_UNIDI:
      return true;
  }
  // Reached only for capsule types not listed in the switch above; these are
  // reported to the datagram visitor, if any, as unknown capsules.
  if (datagram_visitor_) {
    datagram_visitor_->OnUnknownCapsule(id(), capsule.unknown_capsule());
  }
  return true;
}
1552
OnCapsuleParseFailure(absl::string_view error_message)1553 void QuicSpdyStream::OnCapsuleParseFailure(absl::string_view error_message) {
1554 QUIC_DLOG(ERROR) << ENDPOINT << "Capsule parse failure: " << error_message;
1555 Reset(QUIC_BAD_APPLICATION_PAYLOAD);
1556 }
1557
WriteCapsule(const Capsule & capsule,bool fin)1558 void QuicSpdyStream::WriteCapsule(const Capsule& capsule, bool fin) {
1559 QUIC_DLOG(INFO) << ENDPOINT << "Stream " << id() << " sending capsule "
1560 << capsule;
1561 quiche::QuicheBuffer serialized_capsule = SerializeCapsule(
1562 capsule,
1563 spdy_session_->connection()->helper()->GetStreamSendBufferAllocator());
1564 QUICHE_DCHECK_GT(serialized_capsule.size(), 0u);
1565 WriteOrBufferBody(serialized_capsule.AsStringView(), /*fin=*/fin);
1566 }
1567
WriteGreaseCapsule()1568 void QuicSpdyStream::WriteGreaseCapsule() {
1569 // GREASE capsulde IDs have a form of 41 * N + 23.
1570 QuicRandom* random = spdy_session_->connection()->random_generator();
1571 uint64_t type = random->InsecureRandUint64() >> 4;
1572 type = (type / 41) * 41 + 23;
1573 QUICHE_DCHECK_EQ((type - 23) % 41, 0u);
1574
1575 constexpr size_t kMaxLength = 64;
1576 size_t length = random->InsecureRandUint64() % kMaxLength;
1577 std::string bytes(length, '\0');
1578 random->InsecureRandBytes(&bytes[0], bytes.size());
1579 Capsule capsule = Capsule::Unknown(type, bytes);
1580 WriteCapsule(capsule, /*fin=*/false);
1581 }
1582
SendHttp3Datagram(absl::string_view payload)1583 MessageStatus QuicSpdyStream::SendHttp3Datagram(absl::string_view payload) {
1584 return spdy_session_->SendHttp3Datagram(id(), payload);
1585 }
1586
RegisterHttp3DatagramVisitor(Http3DatagramVisitor * visitor)1587 void QuicSpdyStream::RegisterHttp3DatagramVisitor(
1588 Http3DatagramVisitor* visitor) {
1589 if (visitor == nullptr) {
1590 QUIC_BUG(null datagram visitor)
1591 << ENDPOINT << "Null datagram visitor for stream ID " << id();
1592 return;
1593 }
1594 QUIC_DLOG(INFO) << ENDPOINT << "Registering datagram visitor with stream ID "
1595 << id();
1596
1597 if (datagram_visitor_ != nullptr) {
1598 QUIC_BUG(h3 datagram double registration)
1599 << ENDPOINT
1600 << "Attempted to doubly register HTTP/3 datagram with stream ID "
1601 << id();
1602 return;
1603 }
1604 datagram_visitor_ = visitor;
1605 QUICHE_DCHECK(!capsule_parser_);
1606 capsule_parser_ = std::make_unique<quiche::CapsuleParser>(this);
1607 }
1608
UnregisterHttp3DatagramVisitor()1609 void QuicSpdyStream::UnregisterHttp3DatagramVisitor() {
1610 if (datagram_visitor_ == nullptr) {
1611 QUIC_BUG(datagram visitor empty during unregistration)
1612 << ENDPOINT << "Cannot unregister datagram visitor for stream ID "
1613 << id();
1614 return;
1615 }
1616 QUIC_DLOG(INFO) << ENDPOINT << "Unregistering datagram visitor for stream ID "
1617 << id();
1618 datagram_visitor_ = nullptr;
1619 }
1620
ReplaceHttp3DatagramVisitor(Http3DatagramVisitor * visitor)1621 void QuicSpdyStream::ReplaceHttp3DatagramVisitor(
1622 Http3DatagramVisitor* visitor) {
1623 QUIC_BUG_IF(h3 datagram unknown move, datagram_visitor_ == nullptr)
1624 << "Attempted to move missing datagram visitor on HTTP/3 stream ID "
1625 << id();
1626 datagram_visitor_ = visitor;
1627 }
1628
RegisterConnectIpVisitor(ConnectIpVisitor * visitor)1629 void QuicSpdyStream::RegisterConnectIpVisitor(ConnectIpVisitor* visitor) {
1630 if (visitor == nullptr) {
1631 QUIC_BUG(null connect - ip visitor)
1632 << ENDPOINT << "Null connect-ip visitor for stream ID " << id();
1633 return;
1634 }
1635 QUIC_DLOG(INFO) << ENDPOINT
1636 << "Registering CONNECT-IP visitor with stream ID " << id();
1637
1638 if (connect_ip_visitor_ != nullptr) {
1639 QUIC_BUG(connect - ip double registration)
1640 << ENDPOINT << "Attempted to doubly register CONNECT-IP with stream ID "
1641 << id();
1642 return;
1643 }
1644 connect_ip_visitor_ = visitor;
1645 }
1646
UnregisterConnectIpVisitor()1647 void QuicSpdyStream::UnregisterConnectIpVisitor() {
1648 if (connect_ip_visitor_ == nullptr) {
1649 QUIC_BUG(connect - ip visitor empty during unregistration)
1650 << ENDPOINT << "Cannot unregister CONNECT-IP visitor for stream ID "
1651 << id();
1652 return;
1653 }
1654 QUIC_DLOG(INFO) << ENDPOINT
1655 << "Unregistering CONNECT-IP visitor for stream ID " << id();
1656 connect_ip_visitor_ = nullptr;
1657 }
1658
ReplaceConnectIpVisitor(ConnectIpVisitor * visitor)1659 void QuicSpdyStream::ReplaceConnectIpVisitor(ConnectIpVisitor* visitor) {
1660 QUIC_BUG_IF(connect - ip unknown move, connect_ip_visitor_ == nullptr)
1661 << "Attempted to move missing CONNECT-IP visitor on HTTP/3 stream ID "
1662 << id();
1663 connect_ip_visitor_ = visitor;
1664 }
1665
SetMaxDatagramTimeInQueue(QuicTime::Delta max_time_in_queue)1666 void QuicSpdyStream::SetMaxDatagramTimeInQueue(
1667 QuicTime::Delta max_time_in_queue) {
1668 spdy_session_->SetMaxDatagramTimeInQueueForStreamId(id(), max_time_in_queue);
1669 }
1670
OnDatagramReceived(QuicDataReader * reader)1671 void QuicSpdyStream::OnDatagramReceived(QuicDataReader* reader) {
1672 if (!headers_decompressed_) {
1673 QUIC_DLOG(INFO) << "Dropping datagram received before headers on stream ID "
1674 << id();
1675 return;
1676 }
1677 HandleReceivedDatagram(reader->ReadRemainingPayload());
1678 }
1679
GetMaxDatagramSize() const1680 QuicByteCount QuicSpdyStream::GetMaxDatagramSize() const {
1681 QuicByteCount prefix_size = 0;
1682 switch (spdy_session_->http_datagram_support()) {
1683 case HttpDatagramSupport::kDraft04:
1684 case HttpDatagramSupport::kRfc:
1685 prefix_size =
1686 QuicDataWriter::GetVarInt62Len(id() / kHttpDatagramStreamIdDivisor);
1687 break;
1688 case HttpDatagramSupport::kNone:
1689 case HttpDatagramSupport::kRfcAndDraft04:
1690 QUIC_BUG(GetMaxDatagramSize called with no datagram support)
1691 << "GetMaxDatagramSize() called when no HTTP/3 datagram support has "
1692 "been negotiated. Support value: "
1693 << spdy_session_->http_datagram_support();
1694 break;
1695 }
1696 // If the logic above fails, use the largest possible value as the safe one.
1697 if (prefix_size == 0) {
1698 prefix_size = 8;
1699 }
1700
1701 QuicByteCount max_datagram_size =
1702 session()->GetGuaranteedLargestMessagePayload();
1703 if (max_datagram_size < prefix_size) {
1704 QUIC_BUG(max_datagram_size smaller than prefix_size)
1705 << "GetGuaranteedLargestMessagePayload() returned a datagram size that "
1706 "is not sufficient to fit stream ID into it.";
1707 return 0;
1708 }
1709 return max_datagram_size - prefix_size;
1710 }
1711
HandleBodyAvailable()1712 void QuicSpdyStream::HandleBodyAvailable() {
1713 if (!capsule_parser_) {
1714 OnBodyAvailable();
1715 return;
1716 }
1717 while (body_manager_.HasBytesToRead()) {
1718 iovec iov;
1719 int num_iov = GetReadableRegions(&iov, /*iov_len=*/1);
1720 if (num_iov == 0) {
1721 break;
1722 }
1723 if (!capsule_parser_->IngestCapsuleFragment(absl::string_view(
1724 reinterpret_cast<const char*>(iov.iov_base), iov.iov_len))) {
1725 break;
1726 }
1727 MarkConsumed(iov.iov_len);
1728 }
1729 // If we received a FIN, make sure that there isn't a partial capsule buffered
1730 // in the capsule parser.
1731 if (sequencer()->IsClosed()) {
1732 capsule_parser_->ErrorIfThereIsRemainingBufferedData();
1733 if (web_transport_ != nullptr) {
1734 web_transport_->OnConnectStreamFinReceived();
1735 }
1736 OnFinRead();
1737 }
1738 }
1739
1740 namespace {
1741
1742 // Return true if `name` only has allowed characters.
IsValidHeaderName(absl::string_view name)1743 bool IsValidHeaderName(absl::string_view name) {
1744 if (name.empty()) {
1745 return true;
1746 }
1747
1748 // Remove leading colon of pseudo-headers.
1749 // This is the only position where colon is allowed.
1750 if (name[0] == ':') {
1751 name.remove_prefix(1);
1752 }
1753
1754 return http2::adapter::HeaderValidator::IsValidHeaderName(name);
1755 }
1756
1757 } // namespace
1758
ValidateReceivedHeaders(const QuicHeaderList & header_list)1759 bool QuicSpdyStream::ValidateReceivedHeaders(
1760 const QuicHeaderList& header_list) {
1761 bool force_fail_validation = false;
1762 AdjustTestValue("quic::QuicSpdyStream::request_header_validation_adjust",
1763 &force_fail_validation);
1764 if (force_fail_validation) {
1765 invalid_request_details_ =
1766 "request_header_validation_adjust force failed the validation.";
1767 QUIC_DLOG(ERROR) << invalid_request_details_;
1768 return false;
1769 }
1770 bool is_response = false;
1771 for (const std::pair<std::string, std::string>& pair : header_list) {
1772 const std::string& name = pair.first;
1773 if (!IsValidHeaderName(name)) {
1774 invalid_request_details_ =
1775 absl::StrCat("Invalid character in header name ", name);
1776 QUIC_DLOG(ERROR) << invalid_request_details_;
1777 return false;
1778 }
1779 if (name == ":status") {
1780 is_response = !pair.second.empty();
1781 }
1782 if (is_response && name == "host") {
1783 // Host header is allowed in response.
1784 continue;
1785 }
1786 if (http2::GetInvalidHttp2HeaderSet().contains(name)) {
1787 invalid_request_details_ = absl::StrCat(name, " header is not allowed");
1788 QUIC_DLOG(ERROR) << invalid_request_details_;
1789 return false;
1790 }
1791 }
1792 return true;
1793 }
1794
set_invalid_request_details(std::string invalid_request_details)1795 void QuicSpdyStream::set_invalid_request_details(
1796 std::string invalid_request_details) {
1797 QUIC_BUG_IF(
1798 empty invalid request detail,
1799 !invalid_request_details_.empty() || invalid_request_details.empty());
1800 invalid_request_details_ = std::move(invalid_request_details);
1801 }
1802
AreHeaderFieldValuesValid(const QuicHeaderList & header_list) const1803 bool QuicSpdyStream::AreHeaderFieldValuesValid(
1804 const QuicHeaderList& header_list) const {
1805 if (!VersionUsesHttp3(transport_version())) {
1806 return true;
1807 }
1808 // According to https://www.rfc-editor.org/rfc/rfc9114.html#section-10.3
1809 // "[...] HTTP/3 can transport field values that are not valid. While most
1810 // values that can be encoded will not alter field parsing, carriage return
1811 // (ASCII 0x0d), line feed (ASCII 0x0a), and the null character (ASCII 0x00)
1812 // might be exploited by an attacker if they are translated verbatim. Any
1813 // request or response that contains a character not permitted in a field
1814 // value MUST be treated as malformed.
1815 // [...]"
1816 for (const std::pair<std::string, std::string>& pair : header_list) {
1817 const std::string& value = pair.second;
1818 for (const auto c : value) {
1819 if (c == '\0' || c == '\n' || c == '\r') {
1820 return false;
1821 }
1822 }
1823 }
1824 return true;
1825 }
1826
OnInvalidHeaders()1827 void QuicSpdyStream::OnInvalidHeaders() { Reset(QUIC_BAD_APPLICATION_PAYLOAD); }
1828
// Closes the read side via the base class and then clears `body_manager_`.
void QuicSpdyStream::CloseReadSide() {
  // The base-class call must come first; the clean-up below exists because of
  // what that call does to the buffered read data.
  QuicStream::CloseReadSide();

  // QuicStream::CloseReadSide() releases buffered read data from
  // QuicStreamSequencer, invalidating every reference held by `body_manager_`.
  // Clear those references so they are not used afterwards.
  body_manager_.Clear();
}
1836
1837 #undef ENDPOINT // undef for jumbo builds
1838 } // namespace quic
1839