xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/http/quic_spdy_stream.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/http/quic_spdy_stream.h"
6 
7 #include <limits>
8 #include <memory>
9 #include <optional>
10 #include <string>
11 #include <utility>
12 
13 #include "absl/base/macros.h"
14 #include "absl/strings/numbers.h"
15 #include "absl/strings/str_cat.h"
16 #include "absl/strings/string_view.h"
17 #include "quiche/http2/adapter/header_validator.h"
18 #include "quiche/http2/http2_constants.h"
19 #include "quiche/quic/core/http/http_constants.h"
20 #include "quiche/quic/core/http/http_decoder.h"
21 #include "quiche/quic/core/http/http_frames.h"
22 #include "quiche/quic/core/http/quic_spdy_session.h"
23 #include "quiche/quic/core/http/spdy_utils.h"
24 #include "quiche/quic/core/http/web_transport_http3.h"
25 #include "quiche/quic/core/qpack/qpack_decoder.h"
26 #include "quiche/quic/core/qpack/qpack_encoder.h"
27 #include "quiche/quic/core/quic_error_codes.h"
28 #include "quiche/quic/core/quic_stream_priority.h"
29 #include "quiche/quic/core/quic_types.h"
30 #include "quiche/quic/core/quic_utils.h"
31 #include "quiche/quic/core/quic_versions.h"
32 #include "quiche/quic/core/quic_write_blocked_list.h"
33 #include "quiche/quic/core/web_transport_interface.h"
34 #include "quiche/quic/platform/api/quic_bug_tracker.h"
35 #include "quiche/quic/platform/api/quic_flag_utils.h"
36 #include "quiche/quic/platform/api/quic_flags.h"
37 #include "quiche/quic/platform/api/quic_logging.h"
38 #include "quiche/quic/platform/api/quic_testvalue.h"
39 #include "quiche/common/capsule.h"
40 #include "quiche/common/platform/api/quiche_flag_utils.h"
41 #include "quiche/common/platform/api/quiche_logging.h"
42 #include "quiche/common/quiche_mem_slice_storage.h"
43 #include "quiche/common/quiche_text_utils.h"
44 #include "quiche/spdy/core/spdy_protocol.h"
45 
46 using ::quiche::Capsule;
47 using ::quiche::CapsuleType;
48 using ::spdy::Http2HeaderBlock;
49 
50 namespace quic {
51 
52 // Visitor of HttpDecoder that passes data frame to QuicSpdyStream and closes
53 // the connection on unexpected frames.
54 class QuicSpdyStream::HttpDecoderVisitor : public HttpDecoder::Visitor {
55  public:
HttpDecoderVisitor(QuicSpdyStream * stream)56   explicit HttpDecoderVisitor(QuicSpdyStream* stream) : stream_(stream) {}
57   HttpDecoderVisitor(const HttpDecoderVisitor&) = delete;
58   HttpDecoderVisitor& operator=(const HttpDecoderVisitor&) = delete;
59 
OnError(HttpDecoder * decoder)60   void OnError(HttpDecoder* decoder) override {
61     stream_->OnUnrecoverableError(decoder->error(), decoder->error_detail());
62   }
63 
OnMaxPushIdFrame()64   bool OnMaxPushIdFrame() override {
65     CloseConnectionOnWrongFrame("Max Push Id");
66     return false;
67   }
68 
OnGoAwayFrame(const GoAwayFrame &)69   bool OnGoAwayFrame(const GoAwayFrame& /*frame*/) override {
70     CloseConnectionOnWrongFrame("Goaway");
71     return false;
72   }
73 
OnSettingsFrameStart(QuicByteCount)74   bool OnSettingsFrameStart(QuicByteCount /*header_length*/) override {
75     CloseConnectionOnWrongFrame("Settings");
76     return false;
77   }
78 
OnSettingsFrame(const SettingsFrame &)79   bool OnSettingsFrame(const SettingsFrame& /*frame*/) override {
80     CloseConnectionOnWrongFrame("Settings");
81     return false;
82   }
83 
OnDataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)84   bool OnDataFrameStart(QuicByteCount header_length,
85                         QuicByteCount payload_length) override {
86     return stream_->OnDataFrameStart(header_length, payload_length);
87   }
88 
OnDataFramePayload(absl::string_view payload)89   bool OnDataFramePayload(absl::string_view payload) override {
90     QUICHE_DCHECK(!payload.empty());
91     return stream_->OnDataFramePayload(payload);
92   }
93 
OnDataFrameEnd()94   bool OnDataFrameEnd() override { return stream_->OnDataFrameEnd(); }
95 
OnHeadersFrameStart(QuicByteCount header_length,QuicByteCount payload_length)96   bool OnHeadersFrameStart(QuicByteCount header_length,
97                            QuicByteCount payload_length) override {
98     if (!VersionUsesHttp3(stream_->transport_version())) {
99       CloseConnectionOnWrongFrame("Headers");
100       return false;
101     }
102     return stream_->OnHeadersFrameStart(header_length, payload_length);
103   }
104 
OnHeadersFramePayload(absl::string_view payload)105   bool OnHeadersFramePayload(absl::string_view payload) override {
106     QUICHE_DCHECK(!payload.empty());
107     if (!VersionUsesHttp3(stream_->transport_version())) {
108       CloseConnectionOnWrongFrame("Headers");
109       return false;
110     }
111     return stream_->OnHeadersFramePayload(payload);
112   }
113 
OnHeadersFrameEnd()114   bool OnHeadersFrameEnd() override {
115     if (!VersionUsesHttp3(stream_->transport_version())) {
116       CloseConnectionOnWrongFrame("Headers");
117       return false;
118     }
119     return stream_->OnHeadersFrameEnd();
120   }
121 
OnPriorityUpdateFrameStart(QuicByteCount)122   bool OnPriorityUpdateFrameStart(QuicByteCount /*header_length*/) override {
123     CloseConnectionOnWrongFrame("Priority update");
124     return false;
125   }
126 
OnPriorityUpdateFrame(const PriorityUpdateFrame &)127   bool OnPriorityUpdateFrame(const PriorityUpdateFrame& /*frame*/) override {
128     CloseConnectionOnWrongFrame("Priority update");
129     return false;
130   }
131 
OnAcceptChFrameStart(QuicByteCount)132   bool OnAcceptChFrameStart(QuicByteCount /*header_length*/) override {
133     CloseConnectionOnWrongFrame("ACCEPT_CH");
134     return false;
135   }
136 
OnAcceptChFrame(const AcceptChFrame &)137   bool OnAcceptChFrame(const AcceptChFrame& /*frame*/) override {
138     CloseConnectionOnWrongFrame("ACCEPT_CH");
139     return false;
140   }
141 
OnWebTransportStreamFrameType(QuicByteCount header_length,WebTransportSessionId session_id)142   void OnWebTransportStreamFrameType(
143       QuicByteCount header_length, WebTransportSessionId session_id) override {
144     stream_->OnWebTransportStreamFrameType(header_length, session_id);
145   }
146 
OnMetadataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)147   bool OnMetadataFrameStart(QuicByteCount header_length,
148                             QuicByteCount payload_length) override {
149     if (!VersionUsesHttp3(stream_->transport_version())) {
150       CloseConnectionOnWrongFrame("Metadata");
151       return false;
152     }
153     return stream_->OnMetadataFrameStart(header_length, payload_length);
154   }
155 
OnMetadataFramePayload(absl::string_view payload)156   bool OnMetadataFramePayload(absl::string_view payload) override {
157     QUICHE_DCHECK(!payload.empty());
158     if (!VersionUsesHttp3(stream_->transport_version())) {
159       CloseConnectionOnWrongFrame("Metadata");
160       return false;
161     }
162     return stream_->OnMetadataFramePayload(payload);
163   }
164 
OnMetadataFrameEnd()165   bool OnMetadataFrameEnd() override {
166     if (!VersionUsesHttp3(stream_->transport_version())) {
167       CloseConnectionOnWrongFrame("Metadata");
168       return false;
169     }
170     return stream_->OnMetadataFrameEnd();
171   }
172 
OnUnknownFrameStart(uint64_t frame_type,QuicByteCount header_length,QuicByteCount payload_length)173   bool OnUnknownFrameStart(uint64_t frame_type, QuicByteCount header_length,
174                            QuicByteCount payload_length) override {
175     return stream_->OnUnknownFrameStart(frame_type, header_length,
176                                         payload_length);
177   }
178 
OnUnknownFramePayload(absl::string_view payload)179   bool OnUnknownFramePayload(absl::string_view payload) override {
180     return stream_->OnUnknownFramePayload(payload);
181   }
182 
OnUnknownFrameEnd()183   bool OnUnknownFrameEnd() override { return stream_->OnUnknownFrameEnd(); }
184 
185  private:
CloseConnectionOnWrongFrame(absl::string_view frame_type)186   void CloseConnectionOnWrongFrame(absl::string_view frame_type) {
187     stream_->OnUnrecoverableError(
188         QUIC_HTTP_FRAME_UNEXPECTED_ON_SPDY_STREAM,
189         absl::StrCat(frame_type, " frame received on data stream"));
190   }
191 
192   QuicSpdyStream* stream_;
193 };
194 
195 #define ENDPOINT                                                   \
196   (session()->perspective() == Perspective::IS_SERVER ? "Server: " \
197                                                       : "Client:"  \
198                                                         " ")
199 
QuicSpdyStream(QuicStreamId id,QuicSpdySession * spdy_session,StreamType type)200 QuicSpdyStream::QuicSpdyStream(QuicStreamId id, QuicSpdySession* spdy_session,
201                                StreamType type)
202     : QuicStream(id, spdy_session, /*is_static=*/false, type),
203       spdy_session_(spdy_session),
204       on_body_available_called_because_sequencer_is_closed_(false),
205       visitor_(nullptr),
206       blocked_on_decoding_headers_(false),
207       headers_decompressed_(false),
208       header_list_size_limit_exceeded_(false),
209       headers_payload_length_(0),
210       trailers_decompressed_(false),
211       trailers_consumed_(false),
212       http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)),
213       decoder_(http_decoder_visitor_.get()),
214       sequencer_offset_(0),
215       is_decoder_processing_input_(false),
216       ack_listener_(nullptr),
217       last_sent_priority_(
218           QuicStreamPriority::Default(spdy_session->priority_type())) {
219   QUICHE_DCHECK_EQ(session()->connection(), spdy_session->connection());
220   QUICHE_DCHECK_EQ(transport_version(), spdy_session->transport_version());
221   QUICHE_DCHECK(!QuicUtils::IsCryptoStreamId(transport_version(), id));
222   QUICHE_DCHECK_EQ(0u, sequencer()->NumBytesConsumed());
223   // If headers are sent on the headers stream, then do not receive any
224   // callbacks from the sequencer until headers are complete.
225   if (!VersionUsesHttp3(transport_version())) {
226     sequencer()->SetBlockedUntilFlush();
227   }
228 
229   if (VersionUsesHttp3(transport_version())) {
230     sequencer()->set_level_triggered(true);
231   }
232 
233   spdy_session_->OnStreamCreated(this);
234 }
235 
QuicSpdyStream(PendingStream * pending,QuicSpdySession * spdy_session)236 QuicSpdyStream::QuicSpdyStream(PendingStream* pending,
237                                QuicSpdySession* spdy_session)
238     : QuicStream(pending, spdy_session, /*is_static=*/false),
239       spdy_session_(spdy_session),
240       on_body_available_called_because_sequencer_is_closed_(false),
241       visitor_(nullptr),
242       blocked_on_decoding_headers_(false),
243       headers_decompressed_(false),
244       header_list_size_limit_exceeded_(false),
245       headers_payload_length_(0),
246       trailers_decompressed_(false),
247       trailers_consumed_(false),
248       http_decoder_visitor_(std::make_unique<HttpDecoderVisitor>(this)),
249       decoder_(http_decoder_visitor_.get()),
250       sequencer_offset_(sequencer()->NumBytesConsumed()),
251       is_decoder_processing_input_(false),
252       ack_listener_(nullptr),
253       last_sent_priority_(
254           QuicStreamPriority::Default(spdy_session->priority_type())) {
255   QUICHE_DCHECK_EQ(session()->connection(), spdy_session->connection());
256   QUICHE_DCHECK_EQ(transport_version(), spdy_session->transport_version());
257   QUICHE_DCHECK(!QuicUtils::IsCryptoStreamId(transport_version(), id()));
258   // If headers are sent on the headers stream, then do not receive any
259   // callbacks from the sequencer until headers are complete.
260   if (!VersionUsesHttp3(transport_version())) {
261     sequencer()->SetBlockedUntilFlush();
262   }
263 
264   if (VersionUsesHttp3(transport_version())) {
265     sequencer()->set_level_triggered(true);
266   }
267 
268   spdy_session_->OnStreamCreated(this);
269 }
270 
~QuicSpdyStream()271 QuicSpdyStream::~QuicSpdyStream() {}
272 
WriteHeaders(Http2HeaderBlock header_block,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)273 size_t QuicSpdyStream::WriteHeaders(
274     Http2HeaderBlock header_block, bool fin,
275     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
276         ack_listener) {
277   if (!AssertNotWebTransportDataStream("writing headers")) {
278     return 0;
279   }
280 
281   QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
282 
283   MaybeProcessSentWebTransportHeaders(header_block);
284 
285   if (web_transport_ != nullptr &&
286       spdy_session_->perspective() == Perspective::IS_SERVER &&
287       spdy_session_->SupportedWebTransportVersion() ==
288           WebTransportHttp3Version::kDraft02) {
289     header_block["sec-webtransport-http3-draft"] = "draft02";
290   }
291 
292   size_t bytes_written =
293       WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
294   if (!VersionUsesHttp3(transport_version()) && fin) {
295     // If HEADERS are sent on the headers stream, then |fin_sent_| needs to be
296     // set and write side needs to be closed without actually sending a FIN on
297     // this stream.
298     // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent.
299     SetFinSent();
300     CloseWriteSide();
301   }
302 
303   if (web_transport_ != nullptr &&
304       session()->perspective() == Perspective::IS_CLIENT) {
305     WriteGreaseCapsule();
306     if (spdy_session_->http_datagram_support() ==
307         HttpDatagramSupport::kDraft04) {
308       // Send a REGISTER_DATAGRAM_NO_CONTEXT capsule to support servers that
309       // are running draft-ietf-masque-h3-datagram-04 or -05.
310       uint64_t capsule_type = 0xff37a2;  // REGISTER_DATAGRAM_NO_CONTEXT
311       constexpr unsigned char capsule_data[4] = {
312           0x80, 0xff, 0x7c, 0x00,  // WEBTRANSPORT datagram format type
313       };
314       WriteCapsule(Capsule::Unknown(
315           capsule_type,
316           absl::string_view(reinterpret_cast<const char*>(capsule_data),
317                             sizeof(capsule_data))));
318       WriteGreaseCapsule();
319     }
320   }
321 
322   if (connect_ip_visitor_ != nullptr) {
323     connect_ip_visitor_->OnHeadersWritten();
324   }
325 
326   return bytes_written;
327 }
328 
WriteOrBufferBody(absl::string_view data,bool fin)329 void QuicSpdyStream::WriteOrBufferBody(absl::string_view data, bool fin) {
330   if (!AssertNotWebTransportDataStream("writing body data")) {
331     return;
332   }
333   if (!VersionUsesHttp3(transport_version()) || data.length() == 0) {
334     WriteOrBufferData(data, fin, nullptr);
335     return;
336   }
337   QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
338 
339   const bool success =
340       WriteDataFrameHeader(data.length(), /*force_write=*/true);
341   QUICHE_DCHECK(success);
342 
343   // Write body.
344   QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
345                 << " is writing DATA frame payload of length " << data.length()
346                 << " with fin " << fin;
347   WriteOrBufferData(data, fin, nullptr);
348 }
349 
WriteTrailers(Http2HeaderBlock trailer_block,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)350 size_t QuicSpdyStream::WriteTrailers(
351     Http2HeaderBlock trailer_block,
352     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
353         ack_listener) {
354   if (fin_sent()) {
355     QUIC_BUG(quic_bug_10410_1)
356         << "Trailers cannot be sent after a FIN, on stream " << id();
357     return 0;
358   }
359 
360   if (!VersionUsesHttp3(transport_version())) {
361     // The header block must contain the final offset for this stream, as the
362     // trailers may be processed out of order at the peer.
363     const QuicStreamOffset final_offset =
364         stream_bytes_written() + BufferedDataBytes();
365     QUIC_DVLOG(1) << ENDPOINT << "Inserting trailer: (" << kFinalOffsetHeaderKey
366                   << ", " << final_offset << ")";
367     trailer_block.insert(
368         std::make_pair(kFinalOffsetHeaderKey, absl::StrCat(final_offset)));
369   }
370 
371   // Write the trailing headers with a FIN, and close stream for writing:
372   // trailers are the last thing to be sent on a stream.
373   const bool kFin = true;
374   size_t bytes_written =
375       WriteHeadersImpl(std::move(trailer_block), kFin, std::move(ack_listener));
376 
377   // If trailers are sent on the headers stream, then |fin_sent_| needs to be
378   // set without actually sending a FIN on this stream.
379   if (!VersionUsesHttp3(transport_version())) {
380     SetFinSent();
381 
382     // Also, write side of this stream needs to be closed.  However, only do
383     // this if there is no more buffered data, otherwise it will never be sent.
384     if (BufferedDataBytes() == 0) {
385       CloseWriteSide();
386     }
387   }
388 
389   return bytes_written;
390 }
391 
WritevBody(const struct iovec * iov,int count,bool fin)392 QuicConsumedData QuicSpdyStream::WritevBody(const struct iovec* iov, int count,
393                                             bool fin) {
394   quiche::QuicheMemSliceStorage storage(
395       iov, count,
396       session()->connection()->helper()->GetStreamSendBufferAllocator(),
397       GetQuicFlag(quic_send_buffer_max_data_slice_size));
398   return WriteBodySlices(storage.ToSpan(), fin);
399 }
400 
WriteDataFrameHeader(QuicByteCount data_length,bool force_write)401 bool QuicSpdyStream::WriteDataFrameHeader(QuicByteCount data_length,
402                                           bool force_write) {
403   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
404   QUICHE_DCHECK_GT(data_length, 0u);
405   quiche::QuicheBuffer header = HttpEncoder::SerializeDataFrameHeader(
406       data_length,
407       spdy_session_->connection()->helper()->GetStreamSendBufferAllocator());
408   const bool can_write = CanWriteNewDataAfterData(header.size());
409   if (!can_write && !force_write) {
410     return false;
411   }
412 
413   if (spdy_session_->debug_visitor()) {
414     spdy_session_->debug_visitor()->OnDataFrameSent(id(), data_length);
415   }
416 
417   unacked_frame_headers_offsets_.Add(
418       send_buffer().stream_offset(),
419       send_buffer().stream_offset() + header.size());
420   QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
421                 << " is writing DATA frame header of length " << header.size();
422   if (can_write) {
423     // Save one copy and allocation if send buffer can accomodate the header.
424     quiche::QuicheMemSlice header_slice(std::move(header));
425     WriteMemSlices(absl::MakeSpan(&header_slice, 1), false);
426   } else {
427     QUICHE_DCHECK(force_write);
428     WriteOrBufferData(header.AsStringView(), false, nullptr);
429   }
430   return true;
431 }
432 
WriteBodySlices(absl::Span<quiche::QuicheMemSlice> slices,bool fin)433 QuicConsumedData QuicSpdyStream::WriteBodySlices(
434     absl::Span<quiche::QuicheMemSlice> slices, bool fin) {
435   if (!VersionUsesHttp3(transport_version()) || slices.empty()) {
436     return WriteMemSlices(slices, fin);
437   }
438 
439   QuicConnection::ScopedPacketFlusher flusher(spdy_session_->connection());
440   const QuicByteCount data_size = MemSliceSpanTotalSize(slices);
441   if (!WriteDataFrameHeader(data_size, /*force_write=*/false)) {
442     return {0, false};
443   }
444 
445   QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
446                 << " is writing DATA frame payload of length " << data_size;
447   return WriteMemSlices(slices, fin);
448 }
449 
Readv(const struct iovec * iov,size_t iov_len)450 size_t QuicSpdyStream::Readv(const struct iovec* iov, size_t iov_len) {
451   QUICHE_DCHECK(FinishedReadingHeaders());
452   if (!VersionUsesHttp3(transport_version())) {
453     return sequencer()->Readv(iov, iov_len);
454   }
455   size_t bytes_read = 0;
456   sequencer()->MarkConsumed(body_manager_.ReadBody(iov, iov_len, &bytes_read));
457 
458   return bytes_read;
459 }
460 
GetReadableRegions(iovec * iov,size_t iov_len) const461 int QuicSpdyStream::GetReadableRegions(iovec* iov, size_t iov_len) const {
462   QUICHE_DCHECK(FinishedReadingHeaders());
463   if (!VersionUsesHttp3(transport_version())) {
464     return sequencer()->GetReadableRegions(iov, iov_len);
465   }
466   return body_manager_.PeekBody(iov, iov_len);
467 }
468 
MarkConsumed(size_t num_bytes)469 void QuicSpdyStream::MarkConsumed(size_t num_bytes) {
470   QUICHE_DCHECK(FinishedReadingHeaders());
471   if (!VersionUsesHttp3(transport_version())) {
472     sequencer()->MarkConsumed(num_bytes);
473     return;
474   }
475 
476   sequencer()->MarkConsumed(body_manager_.OnBodyConsumed(num_bytes));
477 }
478 
IsDoneReading() const479 bool QuicSpdyStream::IsDoneReading() const {
480   bool done_reading_headers = FinishedReadingHeaders();
481   bool done_reading_body = sequencer()->IsClosed();
482   bool done_reading_trailers = FinishedReadingTrailers();
483   return done_reading_headers && done_reading_body && done_reading_trailers;
484 }
485 
HasBytesToRead() const486 bool QuicSpdyStream::HasBytesToRead() const {
487   if (!VersionUsesHttp3(transport_version())) {
488     return sequencer()->HasBytesToRead();
489   }
490   return body_manager_.HasBytesToRead();
491 }
492 
ReadableBytes() const493 QuicByteCount QuicSpdyStream::ReadableBytes() const {
494   if (!VersionUsesHttp3(transport_version())) {
495     return sequencer()->ReadableBytes();
496   }
497   return body_manager_.ReadableBytes();
498 }
499 
MarkTrailersConsumed()500 void QuicSpdyStream::MarkTrailersConsumed() { trailers_consumed_ = true; }
501 
total_body_bytes_read() const502 uint64_t QuicSpdyStream::total_body_bytes_read() const {
503   if (VersionUsesHttp3(transport_version())) {
504     return body_manager_.total_body_bytes_received();
505   }
506   return sequencer()->NumBytesConsumed();
507 }
508 
ConsumeHeaderList()509 void QuicSpdyStream::ConsumeHeaderList() {
510   header_list_.Clear();
511 
512   if (!FinishedReadingHeaders()) {
513     return;
514   }
515 
516   if (!VersionUsesHttp3(transport_version())) {
517     sequencer()->SetUnblocked();
518     return;
519   }
520 
521   if (body_manager_.HasBytesToRead()) {
522     HandleBodyAvailable();
523     return;
524   }
525 
526   if (sequencer()->IsClosed() &&
527       !on_body_available_called_because_sequencer_is_closed_) {
528     on_body_available_called_because_sequencer_is_closed_ = true;
529     HandleBodyAvailable();
530   }
531 }
532 
OnStreamHeadersPriority(const spdy::SpdyStreamPrecedence & precedence)533 void QuicSpdyStream::OnStreamHeadersPriority(
534     const spdy::SpdyStreamPrecedence& precedence) {
535   QUICHE_DCHECK_EQ(Perspective::IS_SERVER,
536                    session()->connection()->perspective());
537   if (session()->priority_type() != QuicPriorityType::kHttp) {
538     return;
539   }
540   SetPriority(QuicStreamPriority(HttpStreamPriority{
541       precedence.spdy3_priority(), HttpStreamPriority::kDefaultIncremental}));
542 }
543 
OnStreamHeaderList(bool fin,size_t frame_len,const QuicHeaderList & header_list)544 void QuicSpdyStream::OnStreamHeaderList(bool fin, size_t frame_len,
545                                         const QuicHeaderList& header_list) {
546   if (!spdy_session()->user_agent_id().has_value()) {
547     std::string uaid;
548     for (const auto& kv : header_list) {
549       if (quiche::QuicheTextUtils::ToLower(kv.first) == kUserAgentHeaderName) {
550         uaid = kv.second;
551         break;
552       }
553     }
554     spdy_session()->SetUserAgentId(std::move(uaid));
555   }
556 
557   // TODO(b/134706391): remove |fin| argument.
558   // When using Google QUIC, an empty header list indicates that the size limit
559   // has been exceeded.
560   // When using IETF QUIC, there is an explicit signal from
561   // QpackDecodedHeadersAccumulator.
562   if ((VersionUsesHttp3(transport_version()) &&
563        header_list_size_limit_exceeded_) ||
564       (!VersionUsesHttp3(transport_version()) && header_list.empty())) {
565     OnHeadersTooLarge();
566     if (IsDoneReading()) {
567       return;
568     }
569   }
570   if (!NextHeaderIsTrailer()) {
571     OnInitialHeadersComplete(fin, frame_len, header_list);
572   } else {
573     OnTrailingHeadersComplete(fin, frame_len, header_list);
574   }
575 }
576 
OnHeadersDecoded(QuicHeaderList headers,bool header_list_size_limit_exceeded)577 void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers,
578                                       bool header_list_size_limit_exceeded) {
579   header_list_size_limit_exceeded_ = header_list_size_limit_exceeded;
580   qpack_decoded_headers_accumulator_.reset();
581 
582   QuicSpdySession::LogHeaderCompressionRatioHistogram(
583       /* using_qpack = */ true,
584       /* is_sent = */ false, headers.compressed_header_bytes(),
585       headers.uncompressed_header_bytes());
586 
587   Http3DebugVisitor* const debug_visitor = spdy_session()->debug_visitor();
588   if (debug_visitor) {
589     debug_visitor->OnHeadersDecoded(id(), headers);
590   }
591 
592   OnStreamHeaderList(/* fin = */ false, headers_payload_length_, headers);
593 
594   if (blocked_on_decoding_headers_) {
595     blocked_on_decoding_headers_ = false;
596     // Continue decoding HTTP/3 frames.
597     OnDataAvailable();
598   }
599 }
600 
OnHeaderDecodingError(QuicErrorCode error_code,absl::string_view error_message)601 void QuicSpdyStream::OnHeaderDecodingError(QuicErrorCode error_code,
602                                            absl::string_view error_message) {
603   qpack_decoded_headers_accumulator_.reset();
604 
605   std::string connection_close_error_message = absl::StrCat(
606       "Error decoding ", headers_decompressed_ ? "trailers" : "headers",
607       " on stream ", id(), ": ", error_message);
608   OnUnrecoverableError(error_code, connection_close_error_message);
609 }
610 
MaybeSendPriorityUpdateFrame()611 void QuicSpdyStream::MaybeSendPriorityUpdateFrame() {
612   if (!VersionUsesHttp3(transport_version()) ||
613       session()->perspective() != Perspective::IS_CLIENT) {
614     return;
615   }
616   if (spdy_session_->priority_type() != QuicPriorityType::kHttp) {
617     return;
618   }
619 
620   if (last_sent_priority_ == priority()) {
621     return;
622   }
623   last_sent_priority_ = priority();
624 
625   spdy_session_->WriteHttp3PriorityUpdate(id(), priority().http());
626 }
627 
OnHeadersTooLarge()628 void QuicSpdyStream::OnHeadersTooLarge() { Reset(QUIC_HEADERS_TOO_LARGE); }
629 
OnInitialHeadersComplete(bool fin,size_t,const QuicHeaderList & header_list)630 void QuicSpdyStream::OnInitialHeadersComplete(
631     bool fin, size_t /*frame_len*/, const QuicHeaderList& header_list) {
632   // TODO(b/134706391): remove |fin| argument.
633   headers_decompressed_ = true;
634   header_list_ = header_list;
635   bool header_too_large = VersionUsesHttp3(transport_version())
636                               ? header_list_size_limit_exceeded_
637                               : header_list.empty();
638   if (!AreHeaderFieldValuesValid(header_list)) {
639     OnInvalidHeaders();
640     return;
641   }
642   // Validate request headers if it did not exceed size limit. If it did,
643   // OnHeadersTooLarge() should have already handled it previously.
644   if (!header_too_large && !ValidateReceivedHeaders(header_list)) {
645     QUIC_CODE_COUNT_N(quic_validate_request_header, 1, 2);
646     QUICHE_DCHECK(!invalid_request_details().empty())
647         << "ValidatedRequestHeaders() returns false without populating "
648            "invalid_request_details_";
649     if (GetQuicReloadableFlag(quic_act_upon_invalid_header)) {
650       QUIC_RELOADABLE_FLAG_COUNT(quic_act_upon_invalid_header);
651       OnInvalidHeaders();
652       return;
653     }
654   }
655   QUIC_CODE_COUNT_N(quic_validate_request_header, 2, 2);
656 
657   if (!header_too_large) {
658     MaybeProcessReceivedWebTransportHeaders();
659   }
660 
661   if (VersionUsesHttp3(transport_version())) {
662     if (fin) {
663       OnStreamFrame(QuicStreamFrame(id(), /* fin = */ true,
664                                     highest_received_byte_offset(),
665                                     absl::string_view()));
666     }
667     return;
668   }
669 
670   if (fin && !rst_sent()) {
671     OnStreamFrame(
672         QuicStreamFrame(id(), fin, /* offset = */ 0, absl::string_view()));
673   }
674   if (FinishedReadingHeaders()) {
675     sequencer()->SetUnblocked();
676   }
677 }
678 
CopyAndValidateTrailers(const QuicHeaderList & header_list,bool expect_final_byte_offset,size_t * final_byte_offset,spdy::Http2HeaderBlock * trailers)679 bool QuicSpdyStream::CopyAndValidateTrailers(const QuicHeaderList& header_list,
680                                              bool expect_final_byte_offset,
681                                              size_t* final_byte_offset,
682                                              spdy::Http2HeaderBlock* trailers) {
683   return SpdyUtils::CopyAndValidateTrailers(
684       header_list, expect_final_byte_offset, final_byte_offset, trailers);
685 }
686 
OnTrailingHeadersComplete(bool fin,size_t,const QuicHeaderList & header_list)687 void QuicSpdyStream::OnTrailingHeadersComplete(
688     bool fin, size_t /*frame_len*/, const QuicHeaderList& header_list) {
689   // TODO(b/134706391): remove |fin| argument.
690   QUICHE_DCHECK(!trailers_decompressed_);
691   if (!VersionUsesHttp3(transport_version()) && fin_received()) {
692     QUIC_DLOG(INFO) << ENDPOINT
693                     << "Received Trailers after FIN, on stream: " << id();
694     stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
695                                      "Trailers after fin");
696     return;
697   }
698 
699   if (!VersionUsesHttp3(transport_version()) && !fin) {
700     QUIC_DLOG(INFO) << ENDPOINT
701                     << "Trailers must have FIN set, on stream: " << id();
702     stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
703                                      "Fin missing from trailers");
704     return;
705   }
706 
707   size_t final_byte_offset = 0;
708   const bool expect_final_byte_offset = !VersionUsesHttp3(transport_version());
709   if (!CopyAndValidateTrailers(header_list, expect_final_byte_offset,
710                                &final_byte_offset, &received_trailers_)) {
711     QUIC_DLOG(ERROR) << ENDPOINT << "Trailers for stream " << id()
712                      << " are malformed.";
713     stream_delegate()->OnStreamError(QUIC_INVALID_HEADERS_STREAM_DATA,
714                                      "Trailers are malformed");
715     return;
716   }
717   trailers_decompressed_ = true;
718   if (fin) {
719     const QuicStreamOffset offset = VersionUsesHttp3(transport_version())
720                                         ? highest_received_byte_offset()
721                                         : final_byte_offset;
722     OnStreamFrame(QuicStreamFrame(id(), fin, offset, absl::string_view()));
723   }
724 }
725 
RegisterMetadataVisitor(MetadataVisitor * visitor)726 void QuicSpdyStream::RegisterMetadataVisitor(MetadataVisitor* visitor) {
727   QUIC_BUG_IF(Metadata visitor requires http3 metadata flag,
728               !GetQuicReloadableFlag(quic_enable_http3_metadata_decoding));
729   metadata_visitor_ = visitor;
730 }
731 
UnregisterMetadataVisitor()732 void QuicSpdyStream::UnregisterMetadataVisitor() {
733   metadata_visitor_ = nullptr;
734 }
735 
OnPriorityFrame(const spdy::SpdyStreamPrecedence & precedence)736 void QuicSpdyStream::OnPriorityFrame(
737     const spdy::SpdyStreamPrecedence& precedence) {
738   QUICHE_DCHECK_EQ(Perspective::IS_SERVER,
739                    session()->connection()->perspective());
740   if (session()->priority_type() != QuicPriorityType::kHttp) {
741     return;
742   }
743   SetPriority(QuicStreamPriority(HttpStreamPriority{
744       precedence.spdy3_priority(), HttpStreamPriority::kDefaultIncremental}));
745 }
746 
OnStreamReset(const QuicRstStreamFrame & frame)747 void QuicSpdyStream::OnStreamReset(const QuicRstStreamFrame& frame) {
748   if (web_transport_data_ != nullptr) {
749     WebTransportStreamVisitor* webtransport_visitor =
750         web_transport_data_->adapter.visitor();
751     if (webtransport_visitor != nullptr) {
752       webtransport_visitor->OnResetStreamReceived(
753           Http3ErrorToWebTransportOrDefault(frame.ietf_error_code));
754     }
755     QuicStream::OnStreamReset(frame);
756     return;
757   }
758 
759   if (VersionUsesHttp3(transport_version()) && !fin_received() &&
760       spdy_session_->qpack_decoder()) {
761     spdy_session_->qpack_decoder()->OnStreamReset(id());
762     qpack_decoded_headers_accumulator_.reset();
763   }
764 
765   if (VersionUsesHttp3(transport_version()) ||
766       frame.error_code != QUIC_STREAM_NO_ERROR) {
767     QuicStream::OnStreamReset(frame);
768     return;
769   }
770 
771   QUIC_DVLOG(1) << ENDPOINT
772                 << "Received QUIC_STREAM_NO_ERROR, not discarding response";
773   set_rst_received(true);
774   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
775   set_stream_error(frame.error());
776   CloseWriteSide();
777 }
778 
ResetWithError(QuicResetStreamError error)779 void QuicSpdyStream::ResetWithError(QuicResetStreamError error) {
780   if (VersionUsesHttp3(transport_version()) && !fin_received() &&
781       spdy_session_->qpack_decoder() && web_transport_data_ == nullptr) {
782     spdy_session_->qpack_decoder()->OnStreamReset(id());
783     qpack_decoded_headers_accumulator_.reset();
784   }
785 
786   QuicStream::ResetWithError(error);
787 }
788 
OnStopSending(QuicResetStreamError error)789 bool QuicSpdyStream::OnStopSending(QuicResetStreamError error) {
790   if (web_transport_data_ != nullptr) {
791     WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
792     if (visitor != nullptr) {
793       visitor->OnStopSendingReceived(
794           Http3ErrorToWebTransportOrDefault(error.ietf_application_code()));
795     }
796   }
797 
798   return QuicStream::OnStopSending(error);
799 }
800 
OnWriteSideInDataRecvdState()801 void QuicSpdyStream::OnWriteSideInDataRecvdState() {
802   if (web_transport_data_ != nullptr) {
803     WebTransportStreamVisitor* visitor = web_transport_data_->adapter.visitor();
804     if (visitor != nullptr) {
805       visitor->OnWriteSideInDataRecvdState();
806     }
807   }
808 
809   QuicStream::OnWriteSideInDataRecvdState();
810 }
811 
OnDataAvailable()812 void QuicSpdyStream::OnDataAvailable() {
813   if (!VersionUsesHttp3(transport_version())) {
814     // Sequencer must be blocked until headers are consumed.
815     QUICHE_DCHECK(FinishedReadingHeaders());
816   }
817 
818   if (!VersionUsesHttp3(transport_version())) {
819     HandleBodyAvailable();
820     return;
821   }
822 
823   if (web_transport_data_ != nullptr) {
824     web_transport_data_->adapter.OnDataAvailable();
825     return;
826   }
827 
828   if (!spdy_session()->ShouldProcessIncomingRequests()) {
829     spdy_session()->OnStreamWaitingForClientSettings(id());
830     return;
831   }
832 
833   if (is_decoder_processing_input_) {
834     // Let the outermost nested OnDataAvailable() call do the work.
835     return;
836   }
837 
838   if (blocked_on_decoding_headers_) {
839     return;
840   }
841 
842   if (spdy_session_->SupportsWebTransport()) {
843     // We do this here, since at this point, we have passed the
844     // ShouldProcessIncomingRequests() check above, meaning we know for a fact
845     // if we should be parsing WEBTRANSPORT_STREAM or not.
846     decoder_.EnableWebTransportStreamParsing();
847   }
848 
849   iovec iov;
850   while (session()->connection()->connected() && !reading_stopped() &&
851          decoder_.error() == QUIC_NO_ERROR) {
852     QUICHE_DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
853     if (!sequencer()->PeekRegion(sequencer_offset_, &iov)) {
854       break;
855     }
856 
857     QUICHE_DCHECK(!sequencer()->IsClosed());
858     is_decoder_processing_input_ = true;
859     QuicByteCount processed_bytes = decoder_.ProcessInput(
860         reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
861     is_decoder_processing_input_ = false;
862     if (!session()->connection()->connected()) {
863       return;
864     }
865     sequencer_offset_ += processed_bytes;
866     if (blocked_on_decoding_headers_) {
867       return;
868     }
869     if (web_transport_data_ != nullptr) {
870       return;
871     }
872   }
873 
874   // Do not call HandleBodyAvailable() until headers are consumed.
875   if (!FinishedReadingHeaders()) {
876     return;
877   }
878 
879   if (body_manager_.HasBytesToRead()) {
880     HandleBodyAvailable();
881     return;
882   }
883 
884   if (sequencer()->IsClosed() &&
885       !on_body_available_called_because_sequencer_is_closed_) {
886     on_body_available_called_because_sequencer_is_closed_ = true;
887     HandleBodyAvailable();
888   }
889 }
890 
OnClose()891 void QuicSpdyStream::OnClose() {
892   QuicStream::OnClose();
893 
894   qpack_decoded_headers_accumulator_.reset();
895 
896   if (visitor_) {
897     Visitor* visitor = visitor_;
898     // Calling Visitor::OnClose() may result the destruction of the visitor,
899     // so we need to ensure we don't call it again.
900     visitor_ = nullptr;
901     visitor->OnClose(this);
902   }
903 
904   if (web_transport_ != nullptr) {
905     web_transport_->OnConnectStreamClosing();
906   }
907   if (web_transport_data_ != nullptr) {
908     WebTransportHttp3* web_transport =
909         spdy_session_->GetWebTransportSession(web_transport_data_->session_id);
910     if (web_transport == nullptr) {
911       // Since there is no guaranteed destruction order for streams, the session
912       // could be already removed from the stream map by the time we reach here.
913       QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
914                          << " attempted to notify parent session "
915                          << web_transport_data_->session_id
916                          << ", but the session could not be found.";
917       return;
918     }
919     web_transport->OnStreamClosed(id());
920   }
921 }
922 
OnCanWrite()923 void QuicSpdyStream::OnCanWrite() {
924   QuicStream::OnCanWrite();
925 
926   // Trailers (and hence a FIN) may have been sent ahead of queued body bytes.
927   if (!HasBufferedData() && fin_sent()) {
928     CloseWriteSide();
929   }
930 }
931 
FinishedReadingHeaders() const932 bool QuicSpdyStream::FinishedReadingHeaders() const {
933   return headers_decompressed_ && header_list_.empty();
934 }
935 
ParseHeaderStatusCode(const Http2HeaderBlock & header,int * status_code)936 bool QuicSpdyStream::ParseHeaderStatusCode(const Http2HeaderBlock& header,
937                                            int* status_code) {
938   Http2HeaderBlock::const_iterator it = header.find(spdy::kHttp2StatusHeader);
939   if (it == header.end()) {
940     return false;
941   }
942   const absl::string_view status(it->second);
943   return ParseHeaderStatusCode(status, status_code);
944 }
945 
ParseHeaderStatusCode(absl::string_view status,int * status_code)946 bool QuicSpdyStream::ParseHeaderStatusCode(absl::string_view status,
947                                            int* status_code) {
948   if (status.size() != 3) {
949     return false;
950   }
951   // First character must be an integer in range [1,5].
952   if (status[0] < '1' || status[0] > '5') {
953     return false;
954   }
955   // The remaining two characters must be integers.
956   if (!isdigit(status[1]) || !isdigit(status[2])) {
957     return false;
958   }
959   return absl::SimpleAtoi(status, status_code);
960 }
961 
FinishedReadingTrailers() const962 bool QuicSpdyStream::FinishedReadingTrailers() const {
963   // If no further trailing headers are expected, and the decompressed trailers
964   // (if any) have been consumed, then reading of trailers is finished.
965   if (!fin_received()) {
966     return false;
967   } else if (!trailers_decompressed_) {
968     return true;
969   } else {
970     return trailers_consumed_;
971   }
972 }
973 
OnDataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)974 bool QuicSpdyStream::OnDataFrameStart(QuicByteCount header_length,
975                                       QuicByteCount payload_length) {
976   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
977 
978   if (spdy_session_->debug_visitor()) {
979     spdy_session_->debug_visitor()->OnDataFrameReceived(id(), payload_length);
980   }
981 
982   if (!headers_decompressed_ || trailers_decompressed_) {
983     QUICHE_LOG(INFO) << ENDPOINT << "stream_id: " << id()
984                      << ", headers_decompressed: "
985                      << (headers_decompressed_ ? "true" : "false")
986                      << ", trailers_decompressed: "
987                      << (trailers_decompressed_ ? "true" : "false")
988                      << ", NumBytesConsumed: "
989                      << sequencer()->NumBytesConsumed()
990                      << ", total_body_bytes_received: "
991                      << body_manager_.total_body_bytes_received()
992                      << ", header_length: " << header_length
993                      << ", payload_length: " << payload_length;
994     stream_delegate()->OnStreamError(
995         QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
996         "Unexpected DATA frame received.");
997     return false;
998   }
999 
1000   sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1001 
1002   return true;
1003 }
1004 
OnDataFramePayload(absl::string_view payload)1005 bool QuicSpdyStream::OnDataFramePayload(absl::string_view payload) {
1006   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1007 
1008   body_manager_.OnBody(payload);
1009 
1010   return true;
1011 }
1012 
OnDataFrameEnd()1013 bool QuicSpdyStream::OnDataFrameEnd() {
1014   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1015 
1016   QUIC_DVLOG(1) << ENDPOINT
1017                 << "Reaches the end of a data frame. Total bytes received are "
1018                 << body_manager_.total_body_bytes_received();
1019   return true;
1020 }
1021 
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp,QuicByteCount * newly_acked_length)1022 bool QuicSpdyStream::OnStreamFrameAcked(QuicStreamOffset offset,
1023                                         QuicByteCount data_length,
1024                                         bool fin_acked,
1025                                         QuicTime::Delta ack_delay_time,
1026                                         QuicTime receive_timestamp,
1027                                         QuicByteCount* newly_acked_length) {
1028   const bool new_data_acked = QuicStream::OnStreamFrameAcked(
1029       offset, data_length, fin_acked, ack_delay_time, receive_timestamp,
1030       newly_acked_length);
1031 
1032   const QuicByteCount newly_acked_header_length =
1033       GetNumFrameHeadersInInterval(offset, data_length);
1034   QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
1035   unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
1036   if (ack_listener_ != nullptr && new_data_acked) {
1037     ack_listener_->OnPacketAcked(
1038         *newly_acked_length - newly_acked_header_length, ack_delay_time);
1039   }
1040   return new_data_acked;
1041 }
1042 
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1043 void QuicSpdyStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1044                                                 QuicByteCount data_length,
1045                                                 bool fin_retransmitted) {
1046   QuicStream::OnStreamFrameRetransmitted(offset, data_length,
1047                                          fin_retransmitted);
1048 
1049   const QuicByteCount retransmitted_header_length =
1050       GetNumFrameHeadersInInterval(offset, data_length);
1051   QUICHE_DCHECK_LE(retransmitted_header_length, data_length);
1052 
1053   if (ack_listener_ != nullptr) {
1054     ack_listener_->OnPacketRetransmitted(data_length -
1055                                          retransmitted_header_length);
1056   }
1057 }
1058 
GetNumFrameHeadersInInterval(QuicStreamOffset offset,QuicByteCount data_length) const1059 QuicByteCount QuicSpdyStream::GetNumFrameHeadersInInterval(
1060     QuicStreamOffset offset, QuicByteCount data_length) const {
1061   QuicByteCount header_acked_length = 0;
1062   QuicIntervalSet<QuicStreamOffset> newly_acked(offset, offset + data_length);
1063   newly_acked.Intersection(unacked_frame_headers_offsets_);
1064   for (const auto& interval : newly_acked) {
1065     header_acked_length += interval.Length();
1066   }
1067   return header_acked_length;
1068 }
1069 
OnHeadersFrameStart(QuicByteCount header_length,QuicByteCount payload_length)1070 bool QuicSpdyStream::OnHeadersFrameStart(QuicByteCount header_length,
1071                                          QuicByteCount payload_length) {
1072   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1073   QUICHE_DCHECK(!qpack_decoded_headers_accumulator_);
1074 
1075   if (spdy_session_->debug_visitor()) {
1076     spdy_session_->debug_visitor()->OnHeadersFrameReceived(id(),
1077                                                            payload_length);
1078   }
1079 
1080   headers_payload_length_ = payload_length;
1081 
1082   if (trailers_decompressed_) {
1083     QUICHE_LOG(INFO) << ENDPOINT << "stream_id: " << id()
1084                      << ", headers_decompressed: "
1085                      << (headers_decompressed_ ? "true" : "false")
1086                      << ", NumBytesConsumed: "
1087                      << sequencer()->NumBytesConsumed()
1088                      << ", total_body_bytes_received: "
1089                      << body_manager_.total_body_bytes_received();
1090     stream_delegate()->OnStreamError(
1091         QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1092         "HEADERS frame received after trailing HEADERS.");
1093     return false;
1094   }
1095 
1096   sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1097 
1098   qpack_decoded_headers_accumulator_ =
1099       std::make_unique<QpackDecodedHeadersAccumulator>(
1100           id(), spdy_session_->qpack_decoder(), this,
1101           spdy_session_->max_inbound_header_list_size());
1102 
1103   return true;
1104 }
1105 
OnHeadersFramePayload(absl::string_view payload)1106 bool QuicSpdyStream::OnHeadersFramePayload(absl::string_view payload) {
1107   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1108 
1109   if (!qpack_decoded_headers_accumulator_) {
1110     QUIC_BUG(b215142466_OnHeadersFramePayload);
1111     OnHeaderDecodingError(QUIC_INTERNAL_ERROR,
1112                           "qpack_decoded_headers_accumulator_ is nullptr");
1113     return false;
1114   }
1115 
1116   qpack_decoded_headers_accumulator_->Decode(payload);
1117 
1118   // |qpack_decoded_headers_accumulator_| is reset if an error is detected.
1119   if (!qpack_decoded_headers_accumulator_) {
1120     return false;
1121   }
1122 
1123   sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1124   return true;
1125 }
1126 
OnHeadersFrameEnd()1127 bool QuicSpdyStream::OnHeadersFrameEnd() {
1128   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
1129 
1130   if (!qpack_decoded_headers_accumulator_) {
1131     QUIC_BUG(b215142466_OnHeadersFrameEnd);
1132     OnHeaderDecodingError(QUIC_INTERNAL_ERROR,
1133                           "qpack_decoded_headers_accumulator_ is nullptr");
1134     return false;
1135   }
1136 
1137   qpack_decoded_headers_accumulator_->EndHeaderBlock();
1138 
1139   // If decoding is complete or an error is detected, then
1140   // |qpack_decoded_headers_accumulator_| is already reset.
1141   if (qpack_decoded_headers_accumulator_) {
1142     blocked_on_decoding_headers_ = true;
1143     return false;
1144   }
1145 
1146   return !sequencer()->IsClosed() && !reading_stopped();
1147 }
1148 
OnWebTransportStreamFrameType(QuicByteCount header_length,WebTransportSessionId session_id)1149 void QuicSpdyStream::OnWebTransportStreamFrameType(
1150     QuicByteCount header_length, WebTransportSessionId session_id) {
1151   QUIC_DVLOG(1) << ENDPOINT << " Received WEBTRANSPORT_STREAM on stream "
1152                 << id() << " for session " << session_id;
1153   QuicStreamOffset offset = sequencer()->NumBytesConsumed();
1154   sequencer()->MarkConsumed(header_length);
1155 
1156   std::optional<WebTransportHttp3Version> version =
1157       spdy_session_->SupportedWebTransportVersion();
1158   QUICHE_DCHECK(version.has_value());
1159   if (version == WebTransportHttp3Version::kDraft02) {
1160     if (headers_payload_length_ > 0 || headers_decompressed_) {
1161       std::string error =
1162           absl::StrCat("Stream ", id(),
1163                        " attempted to convert itself into a WebTransport data "
1164                        "stream, but it already has HTTP data on it");
1165       QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on HTTP request)
1166           << ENDPOINT << error;
1167       OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1168                            error);
1169       return;
1170     }
1171   } else {
1172     if (offset > 0) {
1173       std::string error =
1174           absl::StrCat("Stream ", id(),
1175                        " received WEBTRANSPORT_STREAM at a non-zero offset");
1176       QUIC_DLOG(ERROR) << ENDPOINT << error;
1177       OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1178                            error);
1179       return;
1180     }
1181   }
1182 
1183   if (QuicUtils::IsOutgoingStreamId(spdy_session_->version(), id(),
1184                                     spdy_session_->perspective())) {
1185     std::string error = absl::StrCat(
1186         "Stream ", id(),
1187         " attempted to convert itself into a WebTransport data stream, but "
1188         "only the initiator of the stream can do that");
1189     QUIC_PEER_BUG(WEBTRANSPORT_STREAM received on outgoing request)
1190         << ENDPOINT << error;
1191     OnUnrecoverableError(QUIC_HTTP_INVALID_FRAME_SEQUENCE_ON_SPDY_STREAM,
1192                          error);
1193     return;
1194   }
1195 
1196   QUICHE_DCHECK(web_transport_ == nullptr);
1197   web_transport_data_ =
1198       std::make_unique<WebTransportDataStream>(this, session_id);
1199   spdy_session_->AssociateIncomingWebTransportStreamWithSession(session_id,
1200                                                                 id());
1201 }
1202 
OnMetadataFrameStart(QuicByteCount header_length,QuicByteCount payload_length)1203 bool QuicSpdyStream::OnMetadataFrameStart(QuicByteCount header_length,
1204                                           QuicByteCount payload_length) {
1205   if (metadata_visitor_ == nullptr) {
1206     return OnUnknownFrameStart(
1207         static_cast<uint64_t>(quic::HttpFrameType::METADATA), header_length,
1208         payload_length);
1209   }
1210 
1211   QUIC_BUG_IF(Invalid METADATA state, metadata_decoder_ != nullptr);
1212   constexpr size_t kMaxMetadataBlockSize = 1 << 20;  // 1 MB
1213   metadata_decoder_ = std::make_unique<MetadataDecoder>(
1214       id(), kMaxMetadataBlockSize, header_length, payload_length);
1215 
1216   // Consume the frame header.
1217   QUIC_DVLOG(1) << ENDPOINT << "Consuming " << header_length
1218                 << " byte long frame header of METADATA.";
1219   sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1220   return true;
1221 }
1222 
OnMetadataFramePayload(absl::string_view payload)1223 bool QuicSpdyStream::OnMetadataFramePayload(absl::string_view payload) {
1224   if (metadata_visitor_ == nullptr) {
1225     return OnUnknownFramePayload(payload);
1226   }
1227 
1228   if (!metadata_decoder_->Decode(payload)) {
1229     OnUnrecoverableError(QUIC_DECOMPRESSION_FAILURE,
1230                          metadata_decoder_->error_message());
1231     return false;
1232   }
1233 
1234   // Consume the frame payload.
1235   QUIC_DVLOG(1) << ENDPOINT << "Consuming " << payload.size()
1236                 << " bytes of payload of METADATA.";
1237   sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1238   return true;
1239 }
1240 
OnMetadataFrameEnd()1241 bool QuicSpdyStream::OnMetadataFrameEnd() {
1242   if (metadata_visitor_ == nullptr) {
1243     return OnUnknownFrameEnd();
1244   }
1245 
1246   if (!metadata_decoder_->EndHeaderBlock()) {
1247     OnUnrecoverableError(QUIC_DECOMPRESSION_FAILURE,
1248                          metadata_decoder_->error_message());
1249     return false;
1250   }
1251 
1252   metadata_visitor_->OnMetadataComplete(metadata_decoder_->frame_len(),
1253                                         metadata_decoder_->headers());
1254   metadata_decoder_.reset();
1255   return !sequencer()->IsClosed() && !reading_stopped();
1256 }
1257 
OnUnknownFrameStart(uint64_t frame_type,QuicByteCount header_length,QuicByteCount payload_length)1258 bool QuicSpdyStream::OnUnknownFrameStart(uint64_t frame_type,
1259                                          QuicByteCount header_length,
1260                                          QuicByteCount payload_length) {
1261   if (spdy_session_->debug_visitor()) {
1262     spdy_session_->debug_visitor()->OnUnknownFrameReceived(id(), frame_type,
1263                                                            payload_length);
1264   }
1265   spdy_session_->OnUnknownFrameStart(id(), frame_type, header_length,
1266                                      payload_length);
1267 
1268   // Consume the frame header.
1269   QUIC_DVLOG(1) << ENDPOINT << "Consuming " << header_length
1270                 << " byte long frame header of frame of unknown type "
1271                 << frame_type << ".";
1272   sequencer()->MarkConsumed(body_manager_.OnNonBody(header_length));
1273   return true;
1274 }
1275 
OnUnknownFramePayload(absl::string_view payload)1276 bool QuicSpdyStream::OnUnknownFramePayload(absl::string_view payload) {
1277   spdy_session_->OnUnknownFramePayload(id(), payload);
1278 
1279   // Consume the frame payload.
1280   QUIC_DVLOG(1) << ENDPOINT << "Consuming " << payload.size()
1281                 << " bytes of payload of frame of unknown type.";
1282   sequencer()->MarkConsumed(body_manager_.OnNonBody(payload.size()));
1283   return true;
1284 }
1285 
OnUnknownFrameEnd()1286 bool QuicSpdyStream::OnUnknownFrameEnd() { return true; }
1287 
WriteHeadersImpl(spdy::Http2HeaderBlock header_block,bool fin,quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface> ack_listener)1288 size_t QuicSpdyStream::WriteHeadersImpl(
1289     spdy::Http2HeaderBlock header_block, bool fin,
1290     quiche::QuicheReferenceCountedPointer<QuicAckListenerInterface>
1291         ack_listener) {
1292   if (!VersionUsesHttp3(transport_version())) {
1293     return spdy_session_->WriteHeadersOnHeadersStream(
1294         id(), std::move(header_block), fin,
1295         spdy::SpdyStreamPrecedence(priority().http().urgency),
1296         std::move(ack_listener));
1297   }
1298 
1299   // Encode header list.
1300   QuicByteCount encoder_stream_sent_byte_count;
1301   std::string encoded_headers =
1302       spdy_session_->qpack_encoder()->EncodeHeaderList(
1303           id(), header_block, &encoder_stream_sent_byte_count);
1304 
1305   if (spdy_session_->debug_visitor()) {
1306     spdy_session_->debug_visitor()->OnHeadersFrameSent(id(), header_block);
1307   }
1308 
1309   // Write HEADERS frame.
1310   std::string headers_frame_header =
1311       HttpEncoder::SerializeHeadersFrameHeader(encoded_headers.size());
1312   unacked_frame_headers_offsets_.Add(
1313       send_buffer().stream_offset(),
1314       send_buffer().stream_offset() + headers_frame_header.length());
1315 
1316   QUIC_DVLOG(1) << ENDPOINT << "Stream " << id()
1317                 << " is writing HEADERS frame header of length "
1318                 << headers_frame_header.length() << ", and payload of length "
1319                 << encoded_headers.length() << " with fin " << fin;
1320   WriteOrBufferData(absl::StrCat(headers_frame_header, encoded_headers), fin,
1321                     /*ack_listener=*/nullptr);
1322 
1323   QuicSpdySession::LogHeaderCompressionRatioHistogram(
1324       /* using_qpack = */ true,
1325       /* is_sent = */ true,
1326       encoded_headers.size() + encoder_stream_sent_byte_count,
1327       header_block.TotalBytesUsed());
1328 
1329   return encoded_headers.size();
1330 }
1331 
CanWriteNewBodyData(QuicByteCount write_size) const1332 bool QuicSpdyStream::CanWriteNewBodyData(QuicByteCount write_size) const {
1333   QUICHE_DCHECK_NE(0u, write_size);
1334   if (!VersionUsesHttp3(transport_version())) {
1335     return CanWriteNewData();
1336   }
1337 
1338   return CanWriteNewDataAfterData(
1339       HttpEncoder::GetDataFrameHeaderLength(write_size));
1340 }
1341 
MaybeProcessReceivedWebTransportHeaders()1342 void QuicSpdyStream::MaybeProcessReceivedWebTransportHeaders() {
1343   if (!spdy_session_->SupportsWebTransport()) {
1344     return;
1345   }
1346   if (session()->perspective() != Perspective::IS_SERVER) {
1347     return;
1348   }
1349   QUICHE_DCHECK(IsValidWebTransportSessionId(id(), version()));
1350 
1351   std::string method;
1352   std::string protocol;
1353   for (const auto& [header_name, header_value] : header_list_) {
1354     if (header_name == ":method") {
1355       if (!method.empty() || header_value.empty()) {
1356         return;
1357       }
1358       method = header_value;
1359     }
1360     if (header_name == ":protocol") {
1361       if (!protocol.empty() || header_value.empty()) {
1362         return;
1363       }
1364       protocol = header_value;
1365     }
1366     if (header_name == "datagram-flow-id") {
1367       QUIC_DLOG(ERROR) << ENDPOINT
1368                        << "Rejecting WebTransport due to unexpected "
1369                           "Datagram-Flow-Id header";
1370       return;
1371     }
1372   }
1373 
1374   if (method != "CONNECT" || protocol != "webtransport") {
1375     return;
1376   }
1377 
1378   web_transport_ =
1379       std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
1380 }
1381 
MaybeProcessSentWebTransportHeaders(spdy::Http2HeaderBlock & headers)1382 void QuicSpdyStream::MaybeProcessSentWebTransportHeaders(
1383     spdy::Http2HeaderBlock& headers) {
1384   if (!spdy_session_->SupportsWebTransport()) {
1385     return;
1386   }
1387   if (session()->perspective() != Perspective::IS_CLIENT) {
1388     return;
1389   }
1390   QUICHE_DCHECK(IsValidWebTransportSessionId(id(), version()));
1391 
1392   const auto method_it = headers.find(":method");
1393   const auto protocol_it = headers.find(":protocol");
1394   if (method_it == headers.end() || protocol_it == headers.end()) {
1395     return;
1396   }
1397   if (method_it->second != "CONNECT" && protocol_it->second != "webtransport") {
1398     return;
1399   }
1400 
1401   if (spdy_session_->SupportedWebTransportVersion() ==
1402       WebTransportHttp3Version::kDraft02) {
1403     headers["sec-webtransport-http3-draft02"] = "1";
1404   }
1405 
1406   web_transport_ =
1407       std::make_unique<WebTransportHttp3>(spdy_session_, this, id());
1408 }
1409 
OnCanWriteNewData()1410 void QuicSpdyStream::OnCanWriteNewData() {
1411   if (web_transport_data_ != nullptr) {
1412     web_transport_data_->adapter.OnCanWriteNewData();
1413   }
1414 }
1415 
AssertNotWebTransportDataStream(absl::string_view operation)1416 bool QuicSpdyStream::AssertNotWebTransportDataStream(
1417     absl::string_view operation) {
1418   if (web_transport_data_ != nullptr) {
1419     QUIC_BUG(Invalid operation on WebTransport stream)
1420         << "Attempted to " << operation << " on WebTransport data stream "
1421         << id() << " associated with session "
1422         << web_transport_data_->session_id;
1423     OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1424                          absl::StrCat("Attempted to ", operation,
1425                                       " on WebTransport data stream"));
1426     return false;
1427   }
1428   return true;
1429 }
1430 
ConvertToWebTransportDataStream(WebTransportSessionId session_id)1431 void QuicSpdyStream::ConvertToWebTransportDataStream(
1432     WebTransportSessionId session_id) {
1433   if (send_buffer().stream_offset() != 0) {
1434     QUIC_BUG(Sending WEBTRANSPORT_STREAM when data already sent)
1435         << "Attempted to send a WEBTRANSPORT_STREAM frame when other data has "
1436            "already been sent on the stream.";
1437     OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1438                          "Attempted to send a WEBTRANSPORT_STREAM frame when "
1439                          "other data has already been sent on the stream.");
1440     return;
1441   }
1442 
1443   std::string header =
1444       HttpEncoder::SerializeWebTransportStreamFrameHeader(session_id);
1445   if (header.empty()) {
1446     QUIC_BUG(Failed to serialize WEBTRANSPORT_STREAM)
1447         << "Failed to serialize a WEBTRANSPORT_STREAM frame.";
1448     OnUnrecoverableError(QUIC_INTERNAL_ERROR,
1449                          "Failed to serialize a WEBTRANSPORT_STREAM frame.");
1450     return;
1451   }
1452 
1453   WriteOrBufferData(header, /*fin=*/false, nullptr);
1454   web_transport_data_ =
1455       std::make_unique<WebTransportDataStream>(this, session_id);
1456   QUIC_DVLOG(1) << ENDPOINT << "Successfully opened WebTransport data stream "
1457                 << id() << " for session " << session_id;
1458 }
1459 
WebTransportDataStream(QuicSpdyStream * stream,WebTransportSessionId session_id)1460 QuicSpdyStream::WebTransportDataStream::WebTransportDataStream(
1461     QuicSpdyStream* stream, WebTransportSessionId session_id)
1462     : session_id(session_id),
1463       adapter(stream->spdy_session_, stream, stream->sequencer()) {}
1464 
HandleReceivedDatagram(absl::string_view payload)1465 void QuicSpdyStream::HandleReceivedDatagram(absl::string_view payload) {
1466   if (datagram_visitor_ == nullptr) {
1467     QUIC_DLOG(ERROR) << ENDPOINT << "Received datagram without any visitor";
1468     return;
1469   }
1470   datagram_visitor_->OnHttp3Datagram(id(), payload);
1471 }
1472 
OnCapsule(const Capsule & capsule)1473 bool QuicSpdyStream::OnCapsule(const Capsule& capsule) {
1474   QUIC_DLOG(INFO) << ENDPOINT << "Stream " << id() << " received capsule "
1475                   << capsule;
1476   if (!headers_decompressed_) {
1477     QUIC_PEER_BUG(capsule before headers)
1478         << ENDPOINT << "Stream " << id() << " received capsule " << capsule
1479         << " before headers";
1480     return false;
1481   }
1482   if (web_transport_ != nullptr && web_transport_->close_received()) {
1483     QUIC_PEER_BUG(capsule after close)
1484         << ENDPOINT << "Stream " << id() << " received capsule " << capsule
1485         << " after CLOSE_WEBTRANSPORT_SESSION.";
1486     return false;
1487   }
1488   switch (capsule.capsule_type()) {
1489     case CapsuleType::DATAGRAM:
1490       HandleReceivedDatagram(capsule.datagram_capsule().http_datagram_payload);
1491       return true;
1492     case CapsuleType::LEGACY_DATAGRAM:
1493       HandleReceivedDatagram(
1494           capsule.legacy_datagram_capsule().http_datagram_payload);
1495       return true;
1496     case CapsuleType::LEGACY_DATAGRAM_WITHOUT_CONTEXT:
1497       HandleReceivedDatagram(capsule.legacy_datagram_without_context_capsule()
1498                                  .http_datagram_payload);
1499       return true;
1500     case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
1501       if (web_transport_ == nullptr) {
1502         QUIC_DLOG(ERROR) << ENDPOINT << "Received capsule " << capsule
1503                          << " for a non-WebTransport stream.";
1504         return false;
1505       }
1506       web_transport_->OnCloseReceived(
1507           capsule.close_web_transport_session_capsule().error_code,
1508           capsule.close_web_transport_session_capsule().error_message);
1509       return true;
1510     case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
1511       if (web_transport_ == nullptr) {
1512         QUIC_DLOG(ERROR) << ENDPOINT << "Received capsule " << capsule
1513                          << " for a non-WebTransport stream.";
1514         return false;
1515       }
1516       web_transport_->OnDrainSessionReceived();
1517       return true;
1518     case CapsuleType::ADDRESS_ASSIGN:
1519       if (connect_ip_visitor_ == nullptr) {
1520         return true;
1521       }
1522       return connect_ip_visitor_->OnAddressAssignCapsule(
1523           capsule.address_assign_capsule());
1524     case CapsuleType::ADDRESS_REQUEST:
1525       if (connect_ip_visitor_ == nullptr) {
1526         return true;
1527       }
1528       return connect_ip_visitor_->OnAddressRequestCapsule(
1529           capsule.address_request_capsule());
1530     case CapsuleType::ROUTE_ADVERTISEMENT:
1531       if (connect_ip_visitor_ == nullptr) {
1532         return true;
1533       }
1534       return connect_ip_visitor_->OnRouteAdvertisementCapsule(
1535           capsule.route_advertisement_capsule());
1536 
1537     // Ignore WebTransport over HTTP/2 capsules.
1538     case CapsuleType::WT_RESET_STREAM:
1539     case CapsuleType::WT_STOP_SENDING:
1540     case CapsuleType::WT_STREAM:
1541     case CapsuleType::WT_STREAM_WITH_FIN:
1542     case CapsuleType::WT_MAX_STREAM_DATA:
1543     case CapsuleType::WT_MAX_STREAMS_BIDI:
1544     case CapsuleType::WT_MAX_STREAMS_UNIDI:
1545       return true;
1546   }
1547   if (datagram_visitor_) {
1548     datagram_visitor_->OnUnknownCapsule(id(), capsule.unknown_capsule());
1549   }
1550   return true;
1551 }
1552 
OnCapsuleParseFailure(absl::string_view error_message)1553 void QuicSpdyStream::OnCapsuleParseFailure(absl::string_view error_message) {
1554   QUIC_DLOG(ERROR) << ENDPOINT << "Capsule parse failure: " << error_message;
1555   Reset(QUIC_BAD_APPLICATION_PAYLOAD);
1556 }
1557 
WriteCapsule(const Capsule & capsule,bool fin)1558 void QuicSpdyStream::WriteCapsule(const Capsule& capsule, bool fin) {
1559   QUIC_DLOG(INFO) << ENDPOINT << "Stream " << id() << " sending capsule "
1560                   << capsule;
1561   quiche::QuicheBuffer serialized_capsule = SerializeCapsule(
1562       capsule,
1563       spdy_session_->connection()->helper()->GetStreamSendBufferAllocator());
1564   QUICHE_DCHECK_GT(serialized_capsule.size(), 0u);
1565   WriteOrBufferBody(serialized_capsule.AsStringView(), /*fin=*/fin);
1566 }
1567 
WriteGreaseCapsule()1568 void QuicSpdyStream::WriteGreaseCapsule() {
1569   // GREASE capsulde IDs have a form of 41 * N + 23.
1570   QuicRandom* random = spdy_session_->connection()->random_generator();
1571   uint64_t type = random->InsecureRandUint64() >> 4;
1572   type = (type / 41) * 41 + 23;
1573   QUICHE_DCHECK_EQ((type - 23) % 41, 0u);
1574 
1575   constexpr size_t kMaxLength = 64;
1576   size_t length = random->InsecureRandUint64() % kMaxLength;
1577   std::string bytes(length, '\0');
1578   random->InsecureRandBytes(&bytes[0], bytes.size());
1579   Capsule capsule = Capsule::Unknown(type, bytes);
1580   WriteCapsule(capsule, /*fin=*/false);
1581 }
1582 
SendHttp3Datagram(absl::string_view payload)1583 MessageStatus QuicSpdyStream::SendHttp3Datagram(absl::string_view payload) {
1584   return spdy_session_->SendHttp3Datagram(id(), payload);
1585 }
1586 
RegisterHttp3DatagramVisitor(Http3DatagramVisitor * visitor)1587 void QuicSpdyStream::RegisterHttp3DatagramVisitor(
1588     Http3DatagramVisitor* visitor) {
1589   if (visitor == nullptr) {
1590     QUIC_BUG(null datagram visitor)
1591         << ENDPOINT << "Null datagram visitor for stream ID " << id();
1592     return;
1593   }
1594   QUIC_DLOG(INFO) << ENDPOINT << "Registering datagram visitor with stream ID "
1595                   << id();
1596 
1597   if (datagram_visitor_ != nullptr) {
1598     QUIC_BUG(h3 datagram double registration)
1599         << ENDPOINT
1600         << "Attempted to doubly register HTTP/3 datagram with stream ID "
1601         << id();
1602     return;
1603   }
1604   datagram_visitor_ = visitor;
1605   QUICHE_DCHECK(!capsule_parser_);
1606   capsule_parser_ = std::make_unique<quiche::CapsuleParser>(this);
1607 }
1608 
UnregisterHttp3DatagramVisitor()1609 void QuicSpdyStream::UnregisterHttp3DatagramVisitor() {
1610   if (datagram_visitor_ == nullptr) {
1611     QUIC_BUG(datagram visitor empty during unregistration)
1612         << ENDPOINT << "Cannot unregister datagram visitor for stream ID "
1613         << id();
1614     return;
1615   }
1616   QUIC_DLOG(INFO) << ENDPOINT << "Unregistering datagram visitor for stream ID "
1617                   << id();
1618   datagram_visitor_ = nullptr;
1619 }
1620 
ReplaceHttp3DatagramVisitor(Http3DatagramVisitor * visitor)1621 void QuicSpdyStream::ReplaceHttp3DatagramVisitor(
1622     Http3DatagramVisitor* visitor) {
1623   QUIC_BUG_IF(h3 datagram unknown move, datagram_visitor_ == nullptr)
1624       << "Attempted to move missing datagram visitor on HTTP/3 stream ID "
1625       << id();
1626   datagram_visitor_ = visitor;
1627 }
1628 
RegisterConnectIpVisitor(ConnectIpVisitor * visitor)1629 void QuicSpdyStream::RegisterConnectIpVisitor(ConnectIpVisitor* visitor) {
1630   if (visitor == nullptr) {
1631     QUIC_BUG(null connect - ip visitor)
1632         << ENDPOINT << "Null connect-ip visitor for stream ID " << id();
1633     return;
1634   }
1635   QUIC_DLOG(INFO) << ENDPOINT
1636                   << "Registering CONNECT-IP visitor with stream ID " << id();
1637 
1638   if (connect_ip_visitor_ != nullptr) {
1639     QUIC_BUG(connect - ip double registration)
1640         << ENDPOINT << "Attempted to doubly register CONNECT-IP with stream ID "
1641         << id();
1642     return;
1643   }
1644   connect_ip_visitor_ = visitor;
1645 }
1646 
UnregisterConnectIpVisitor()1647 void QuicSpdyStream::UnregisterConnectIpVisitor() {
1648   if (connect_ip_visitor_ == nullptr) {
1649     QUIC_BUG(connect - ip visitor empty during unregistration)
1650         << ENDPOINT << "Cannot unregister CONNECT-IP visitor for stream ID "
1651         << id();
1652     return;
1653   }
1654   QUIC_DLOG(INFO) << ENDPOINT
1655                   << "Unregistering CONNECT-IP visitor for stream ID " << id();
1656   connect_ip_visitor_ = nullptr;
1657 }
1658 
ReplaceConnectIpVisitor(ConnectIpVisitor * visitor)1659 void QuicSpdyStream::ReplaceConnectIpVisitor(ConnectIpVisitor* visitor) {
1660   QUIC_BUG_IF(connect - ip unknown move, connect_ip_visitor_ == nullptr)
1661       << "Attempted to move missing CONNECT-IP visitor on HTTP/3 stream ID "
1662       << id();
1663   connect_ip_visitor_ = visitor;
1664 }
1665 
SetMaxDatagramTimeInQueue(QuicTime::Delta max_time_in_queue)1666 void QuicSpdyStream::SetMaxDatagramTimeInQueue(
1667     QuicTime::Delta max_time_in_queue) {
1668   spdy_session_->SetMaxDatagramTimeInQueueForStreamId(id(), max_time_in_queue);
1669 }
1670 
OnDatagramReceived(QuicDataReader * reader)1671 void QuicSpdyStream::OnDatagramReceived(QuicDataReader* reader) {
1672   if (!headers_decompressed_) {
1673     QUIC_DLOG(INFO) << "Dropping datagram received before headers on stream ID "
1674                     << id();
1675     return;
1676   }
1677   HandleReceivedDatagram(reader->ReadRemainingPayload());
1678 }
1679 
GetMaxDatagramSize() const1680 QuicByteCount QuicSpdyStream::GetMaxDatagramSize() const {
1681   QuicByteCount prefix_size = 0;
1682   switch (spdy_session_->http_datagram_support()) {
1683     case HttpDatagramSupport::kDraft04:
1684     case HttpDatagramSupport::kRfc:
1685       prefix_size =
1686           QuicDataWriter::GetVarInt62Len(id() / kHttpDatagramStreamIdDivisor);
1687       break;
1688     case HttpDatagramSupport::kNone:
1689     case HttpDatagramSupport::kRfcAndDraft04:
1690       QUIC_BUG(GetMaxDatagramSize called with no datagram support)
1691           << "GetMaxDatagramSize() called when no HTTP/3 datagram support has "
1692              "been negotiated.  Support value: "
1693           << spdy_session_->http_datagram_support();
1694       break;
1695   }
1696   // If the logic above fails, use the largest possible value as the safe one.
1697   if (prefix_size == 0) {
1698     prefix_size = 8;
1699   }
1700 
1701   QuicByteCount max_datagram_size =
1702       session()->GetGuaranteedLargestMessagePayload();
1703   if (max_datagram_size < prefix_size) {
1704     QUIC_BUG(max_datagram_size smaller than prefix_size)
1705         << "GetGuaranteedLargestMessagePayload() returned a datagram size that "
1706            "is not sufficient to fit stream ID into it.";
1707     return 0;
1708   }
1709   return max_datagram_size - prefix_size;
1710 }
1711 
HandleBodyAvailable()1712 void QuicSpdyStream::HandleBodyAvailable() {
1713   if (!capsule_parser_) {
1714     OnBodyAvailable();
1715     return;
1716   }
1717   while (body_manager_.HasBytesToRead()) {
1718     iovec iov;
1719     int num_iov = GetReadableRegions(&iov, /*iov_len=*/1);
1720     if (num_iov == 0) {
1721       break;
1722     }
1723     if (!capsule_parser_->IngestCapsuleFragment(absl::string_view(
1724             reinterpret_cast<const char*>(iov.iov_base), iov.iov_len))) {
1725       break;
1726     }
1727     MarkConsumed(iov.iov_len);
1728   }
1729   // If we received a FIN, make sure that there isn't a partial capsule buffered
1730   // in the capsule parser.
1731   if (sequencer()->IsClosed()) {
1732     capsule_parser_->ErrorIfThereIsRemainingBufferedData();
1733     if (web_transport_ != nullptr) {
1734       web_transport_->OnConnectStreamFinReceived();
1735     }
1736     OnFinRead();
1737   }
1738 }
1739 
1740 namespace {
1741 
1742 // Return true if `name` only has allowed characters.
IsValidHeaderName(absl::string_view name)1743 bool IsValidHeaderName(absl::string_view name) {
1744   if (name.empty()) {
1745     return true;
1746   }
1747 
1748   // Remove leading colon of pseudo-headers.
1749   // This is the only position where colon is allowed.
1750   if (name[0] == ':') {
1751     name.remove_prefix(1);
1752   }
1753 
1754   return http2::adapter::HeaderValidator::IsValidHeaderName(name);
1755 }
1756 
1757 }  // namespace
1758 
ValidateReceivedHeaders(const QuicHeaderList & header_list)1759 bool QuicSpdyStream::ValidateReceivedHeaders(
1760     const QuicHeaderList& header_list) {
1761   bool force_fail_validation = false;
1762   AdjustTestValue("quic::QuicSpdyStream::request_header_validation_adjust",
1763                   &force_fail_validation);
1764   if (force_fail_validation) {
1765     invalid_request_details_ =
1766         "request_header_validation_adjust force failed the validation.";
1767     QUIC_DLOG(ERROR) << invalid_request_details_;
1768     return false;
1769   }
1770   bool is_response = false;
1771   for (const std::pair<std::string, std::string>& pair : header_list) {
1772     const std::string& name = pair.first;
1773     if (!IsValidHeaderName(name)) {
1774       invalid_request_details_ =
1775           absl::StrCat("Invalid character in header name ", name);
1776       QUIC_DLOG(ERROR) << invalid_request_details_;
1777       return false;
1778     }
1779     if (name == ":status") {
1780       is_response = !pair.second.empty();
1781     }
1782     if (is_response && name == "host") {
1783       // Host header is allowed in response.
1784       continue;
1785     }
1786     if (http2::GetInvalidHttp2HeaderSet().contains(name)) {
1787       invalid_request_details_ = absl::StrCat(name, " header is not allowed");
1788       QUIC_DLOG(ERROR) << invalid_request_details_;
1789       return false;
1790     }
1791   }
1792   return true;
1793 }
1794 
set_invalid_request_details(std::string invalid_request_details)1795 void QuicSpdyStream::set_invalid_request_details(
1796     std::string invalid_request_details) {
1797   QUIC_BUG_IF(
1798       empty invalid request detail,
1799       !invalid_request_details_.empty() || invalid_request_details.empty());
1800   invalid_request_details_ = std::move(invalid_request_details);
1801 }
1802 
AreHeaderFieldValuesValid(const QuicHeaderList & header_list) const1803 bool QuicSpdyStream::AreHeaderFieldValuesValid(
1804     const QuicHeaderList& header_list) const {
1805   if (!VersionUsesHttp3(transport_version())) {
1806     return true;
1807   }
1808   // According to https://www.rfc-editor.org/rfc/rfc9114.html#section-10.3
1809   // "[...] HTTP/3 can transport field values that are not valid. While most
1810   // values that can be encoded will not alter field parsing, carriage return
1811   // (ASCII 0x0d), line feed (ASCII 0x0a), and the null character (ASCII 0x00)
1812   // might be exploited by an attacker if they are translated verbatim. Any
1813   // request or response that contains a character not permitted in a field
1814   // value MUST be treated as malformed.
1815   // [...]"
1816   for (const std::pair<std::string, std::string>& pair : header_list) {
1817     const std::string& value = pair.second;
1818     for (const auto c : value) {
1819       if (c == '\0' || c == '\n' || c == '\r') {
1820         return false;
1821       }
1822     }
1823   }
1824   return true;
1825 }
1826 
OnInvalidHeaders()1827 void QuicSpdyStream::OnInvalidHeaders() { Reset(QUIC_BAD_APPLICATION_PAYLOAD); }
1828 
CloseReadSide()1829 void QuicSpdyStream::CloseReadSide() {
1830   QuicStream::CloseReadSide();
1831 
1832   // QuicStream::CloseReadSide() releases buffered read data from
1833   // QuicStreamSequencer, invalidating every reference held by `body_manager_`.
1834   body_manager_.Clear();
1835 }
1836 
1837 #undef ENDPOINT  // undef for jumbo builds
1838 }  // namespace quic
1839