xref: /aosp_15_r20/external/pigweed/pw_grpc/connection.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_grpc/connection.h"
16 
17 #include <cinttypes>
18 #include <cstring>
19 #include <string_view>
20 #include <type_traits>
21 
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_grpc_private/hpack.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/try.h"
28 
29 namespace pw::grpc {
30 namespace internal {
31 
32 // RFC 9113 §6
33 // Enum names left in naming style of RFC
// HTTP/2 frame type codes. ReadFrameHeader() produces one of these by casting
// the low byte of the first 32-bit word of a frame header, so a value read
// from the wire is not guaranteed to be one of the enumerators listed here.
34 enum class FrameType : uint8_t {
35   DATA = 0x00,
36   HEADERS = 0x01,
37   PRIORITY = 0x02,
38   RST_STREAM = 0x03,
39   SETTINGS = 0x04,
40   PUSH_PROMISE = 0x05,
41   PING = 0x06,
42   GOAWAY = 0x07,
43   WINDOW_UPDATE = 0x08,
44   CONTINUATION = 0x09,
45 };
46 
47 // RFC 9113 §4.1
// Number of bytes a frame header occupies on the wire; ReadFrameHeader() reads
// exactly this many bytes before decoding.
48 constexpr size_t kFrameHeaderEncodedSize = 9;
// Decoded, host-byte-order view of a frame header. Use WireFrameHeader when a
// header has to be serialized for sending.
49 struct FrameHeader {
// Upper 24 bits of the first 32-bit word of the encoded header.
50   uint32_t payload_length;
// Low 8 bits of the first 32-bit word of the encoded header.
51   FrameType type;
// Raw flag bits; their meaning depends on `type`.
52   uint8_t flags;
// Stream identifier; ReadFrameHeader() clears the top bit when decoding.
53   StreamId stream_id;
54 };
55 
56 // RFC 9113 §7
57 // Enum names left in naming style of RFC
// HTTP/2 error codes. In this file they are serialized in network byte order
// into RST_STREAM frames (SendRstStream) and passed to SendGoAway() when a
// connection-level error is detected.
58 enum class Http2Error : uint32_t {
59   NO_ERROR = 0x00,
60   PROTOCOL_ERROR = 0x01,
61   INTERNAL_ERROR = 0x02,
62   FLOW_CONTROL_ERROR = 0x03,
63   SETTINGS_TIMEOUT = 0x04,
64   STREAM_CLOSED = 0x05,
65   FRAME_SIZE_ERROR = 0x06,
66   REFUSED_STREAM = 0x07,
67   CANCEL = 0x08,
68   COMPRESSION_ERROR = 0x09,
69   CONNECT_ERROR = 0x0a,
70   ENHANCE_YOUR_CALM = 0x0b,
71   INADEQUATE_SECURITY = 0x0c,
72   HTTP_1_1_REQUIRED = 0x0d,
73 };
74 
75 }  // namespace internal
76 
77 namespace {
78 
79 using internal::FrameHeader;
80 using internal::FrameType;
81 using internal::Http2Error;
82 using internal::kMaxConcurrentStreams;
83 using internal::kMaxGrpcMessageSize;
84 
85 // RFC 9113 §3.4
// Exact byte sequence a client must send first on a new connection.
// ProcessConnectionPreface() compares the first bytes read from the socket
// against this literal and fails the connection on any mismatch.
86 constexpr std::string_view kExpectedConnectionPrefaceLiteral(
87     "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
88 
// NOTE(review): presumably required because method names are extracted from
// HPACK-decoded strings and must fit the same buffer size -- confirm against
// the hpack header.
89 static_assert(kMaxMethodNameSize == kHpackMaxStringSize);
90 
// Bit masks for the `flags` byte of a frame header. The meaning of a bit
// depends on the frame type, which is why FLAGS_ACK and FLAGS_END_STREAM share
// the value 0x01: in this file FLAGS_ACK is used on SETTINGS frames and
// FLAGS_END_STREAM on DATA and HEADERS frames.
91 enum {
92   FLAGS_ACK = 0x01,
93   FLAGS_END_STREAM = 0x01,
94   FLAGS_END_HEADERS = 0x04,
95   FLAGS_PADDED = 0x08,
96   FLAGS_PRIORITY = 0x20,
97 };
98 
99 // RFC 9113 §6.5.2
// Identifiers of individual settings carried in a SETTINGS frame. The server
// SETTINGS frame built in ProcessConnectionPreface() sends
// SETTINGS_HEADER_TABLE_SIZE and SETTINGS_MAX_CONCURRENT_STREAMS.
100 enum SettingType : uint16_t {
101   SETTINGS_HEADER_TABLE_SIZE = 0x01,
102   SETTINGS_ENABLE_PUSH = 0x02,
103   SETTINGS_MAX_CONCURRENT_STREAMS = 0x03,
104   SETTINGS_INITIAL_WINDOW_SIZE = 0x04,
105   SETTINGS_MAX_FRAME_SIZE = 0x05,
106   SETTINGS_MAX_HEADER_LIST_SIZE = 0x06,
107 };
108 
ReadExactly(stream::Reader & reader,ByteSpan buffer)109 Status ReadExactly(stream::Reader& reader, ByteSpan buffer) {
110   size_t bytes_read = 0;
111   while (bytes_read < buffer.size()) {
112     PW_TRY_ASSIGN(auto out, reader.Read(buffer.subspan(bytes_read)));
113     bytes_read += out.size();
114   }
115   return OkStatus();
116 }
117 
ReadFrameHeader(stream::Reader & reader)118 Result<FrameHeader> ReadFrameHeader(stream::Reader& reader) {
119   std::array<std::byte, internal::kFrameHeaderEncodedSize> buffer;
120   PW_TRY(ReadExactly(reader, buffer));
121 
122   // RFC 9113 §4.1
123   FrameHeader out;
124   ByteBuilder builder(as_writable_bytes(span{buffer}));
125   auto it = builder.begin();
126   auto type_and_length = it.ReadUint32(endian::big);
127   out.payload_length = type_and_length >> 8;
128   out.type = static_cast<FrameType>(type_and_length & 0xff);
129   out.flags = it.ReadUint8();
130   out.stream_id = it.ReadUint32(endian::big) & 0x7fffffff;
131   return out;
132 }
133 
134 template <typename T, std::enable_if_t<std::is_integral_v<T>, bool> = true>
ToNetworkOrder(T value)135 constexpr T ToNetworkOrder(T value) {
136   return bytes::ConvertOrder(/*from=*/endian::native,
137                              /*to=*/endian::big,
138                              value);
139 }
140 
141 template <typename T, std::enable_if_t<std::is_enum_v<T>, bool> = true>
ToNetworkOrder(T value)142 constexpr std::underlying_type_t<T> ToNetworkOrder(T value) {
143   return ToNetworkOrder(static_cast<std::underlying_type_t<T>>(value));
144 }
145 
146 // Use this instead of FrameHeader when writing frames.
PW_PACKED(struct)147 PW_PACKED(struct) WireFrameHeader {
148   WireFrameHeader(FrameHeader h)
149       : payload_length_and_type(ToNetworkOrder(h.payload_length << 8 |
150                                                static_cast<uint32_t>(h.type))),
151         flags(h.flags),
152         stream_id(ToNetworkOrder(h.stream_id)) {}
153 
154   uint32_t payload_length_and_type;
155   uint8_t flags;
156   uint32_t stream_id;
157 };
158 
159 template <typename T>
AsBytes(T & object)160 ConstByteSpan AsBytes(T& object) {
161   return as_bytes(span<T, 1>{&object, 1});
162 }
163 
164 // RFC 9113 §6.1
SendData(SendQueue & send_queue,StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2)165 Status SendData(SendQueue& send_queue,
166                 StreamId stream_id,
167                 ConstByteSpan payload1,
168                 ConstByteSpan payload2) {
169   PW_LOG_DEBUG("Conn.Send DATA with id=%" PRIu32 " len1=%" PRIu32
170                " len2=%" PRIu32,
171                stream_id,
172                static_cast<uint32_t>(payload1.size()),
173                static_cast<uint32_t>(payload2.size()));
174   WireFrameHeader frame(FrameHeader{
175       .payload_length =
176           static_cast<uint32_t>(payload1.size() + payload2.size()),
177       .type = FrameType::DATA,
178       .flags = 0,
179       .stream_id = stream_id,
180   });
181   std::array<ConstByteSpan, 3> data_vector = {AsBytes(frame)};
182   size_t i = 1;
183   if (!payload1.empty()) {
184     data_vector[i++] = payload1;
185   }
186   if (!payload2.empty()) {
187     data_vector[i++] = payload2;
188   }
189   PW_TRY(send_queue.SendBytesVector(span{data_vector.data(), i}));
190   return OkStatus();
191 }
192 
193 // RFC 9113 §6.2
SendHeaders(SendQueue & send_queue,StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2,bool end_stream)194 Status SendHeaders(SendQueue& send_queue,
195                    StreamId stream_id,
196                    ConstByteSpan payload1,
197                    ConstByteSpan payload2,
198                    bool end_stream) {
199   PW_LOG_DEBUG("Conn.Send HEADERS with id=%" PRIu32 " len1=%" PRIu32
200                " len2=%" PRIu32 " end=%d",
201                stream_id,
202                static_cast<uint32_t>(payload1.size()),
203                static_cast<uint32_t>(payload2.size()),
204                end_stream);
205   WireFrameHeader frame(FrameHeader{
206       .payload_length =
207           static_cast<uint32_t>(payload1.size() + payload2.size()),
208       .type = FrameType::HEADERS,
209       .flags = FLAGS_END_HEADERS,
210       .stream_id = stream_id,
211   });
212 
213   if (end_stream) {
214     frame.flags |= FLAGS_END_STREAM;
215   }
216 
217   std::array<ConstByteSpan, 3> headers_vector = {AsBytes(frame)};
218   size_t i = 1;
219   if (!payload1.empty()) {
220     headers_vector[i++] = payload1;
221   }
222   if (!payload2.empty()) {
223     headers_vector[i++] = payload2;
224   }
225   PW_TRY(send_queue.SendBytesVector(span{headers_vector.data(), i}));
226 
227   return OkStatus();
228 }
229 
230 // RFC 9113 §6.4
SendRstStream(SendQueue & send_queue,StreamId stream_id,Http2Error code)231 Status SendRstStream(SendQueue& send_queue,
232                      StreamId stream_id,
233                      Http2Error code) {
234   PW_PACKED(struct) RstStreamFrame {
235     WireFrameHeader header;
236     uint32_t error_code;
237   };
238   RstStreamFrame frame{
239       .header = WireFrameHeader(FrameHeader{
240           .payload_length = 4,
241           .type = FrameType::RST_STREAM,
242           .flags = 0,
243           .stream_id = stream_id,
244       }),
245       .error_code = ToNetworkOrder(code),
246   };
247   PW_TRY(send_queue.SendBytes(AsBytes(frame)));
248 
249   return OkStatus();
250 }
251 
252 // RFC 9113 §6.9
SendWindowUpdates(SendQueue & send_queue,StreamId stream_id,uint32_t increment)253 Status SendWindowUpdates(SendQueue& send_queue,
254                          StreamId stream_id,
255                          uint32_t increment) {
256   // It is illegal to send updates with increment=0.
257   if (increment == 0) {
258     return OkStatus();
259   }
260   if (increment & 0x80000000) {
261     // Upper bit is reserved, error.
262     return Status::InvalidArgument();
263   }
264 
265   PW_LOG_DEBUG("Conn.Send WINDOW_UPDATE frames with id=%" PRIu32
266                " increment=%" PRIu32,
267                stream_id,
268                increment);
269 
270   PW_PACKED(struct) WindowUpdateFrame {
271     WireFrameHeader header;
272     uint32_t increment;
273   };
274   WindowUpdateFrame frames[2] = {
275       {
276           .header = WireFrameHeader(FrameHeader{
277               .payload_length = 4,
278               .type = FrameType::WINDOW_UPDATE,
279               .flags = 0,
280               .stream_id = 0,
281           }),
282           .increment = ToNetworkOrder(increment),
283       },
284       {
285           .header = WireFrameHeader(FrameHeader{
286               .payload_length = 4,
287               .type = FrameType::WINDOW_UPDATE,
288               .flags = 0,
289               .stream_id = stream_id,
290           }),
291           .increment = ToNetworkOrder(increment),
292       },
293   };
294   PW_TRY(send_queue.SendBytes(as_bytes(span{frames})));
295   return OkStatus();
296 }
297 
298 // RFC 9113 §6.5
SendSettingsAck(SendQueue & send_queue)299 Status SendSettingsAck(SendQueue& send_queue) {
300   PW_LOG_DEBUG("Conn.Send SETTINGS ACK");
301   WireFrameHeader frame(FrameHeader{
302       .payload_length = 0,
303       .type = FrameType::SETTINGS,
304       .flags = FLAGS_ACK,
305       .stream_id = 0,
306   });
307   PW_TRY(send_queue.SendBytes(AsBytes(frame)));
308   return OkStatus();
309 }
310 
311 }  // namespace
312 
Connection(stream::ReaderWriter & socket,SendQueue & send_queue,RequestCallbacks & callbacks,allocator::Allocator * message_assembly_allocator)313 Connection::Connection(stream::ReaderWriter& socket,
314                        SendQueue& send_queue,
315                        RequestCallbacks& callbacks,
316                        allocator::Allocator* message_assembly_allocator)
317     : socket_(socket),
318       send_queue_(send_queue),
319       reader_(*this, callbacks),
320       writer_(*this) {
321   LockState()->message_assembly_allocator_ = message_assembly_allocator;
322 }
323 
ProcessFrame()324 Status Connection::Reader::ProcessFrame() {
325   if (!received_connection_preface_) {
326     return Status::FailedPrecondition();
327   }
328 
329   PW_TRY_ASSIGN(auto frame, ReadFrameHeader(connection_.socket_.as_reader()));
330   switch (frame.type) {
331     // Frames that we handle.
332     case FrameType::DATA:
333       PW_TRY(ProcessDataFrame(frame));
334       break;
335     case FrameType::HEADERS:
336       PW_TRY(ProcessHeadersFrame(frame));
337       break;
338     case FrameType::PRIORITY:
339       PW_TRY(ProcessIgnoredFrame(frame));
340       break;
341     case FrameType::RST_STREAM:
342       PW_TRY(ProcessRstStreamFrame(frame));
343       break;
344     case FrameType::SETTINGS:
345       PW_TRY(ProcessSettingsFrame(frame, /*send_ack=*/true));
346       break;
347     case FrameType::PING:
348       PW_TRY(ProcessPingFrame(frame));
349       break;
350     case FrameType::WINDOW_UPDATE:
351       PW_TRY(ProcessWindowUpdateFrame(frame));
352       break;
353 
354     // Frames that trigger an immediate connection close.
355     case FrameType::GOAWAY:
356       PW_LOG_ERROR("Client sent GOAWAY");
357       // don't bother sending GOAWAY in response
358       return Status::Internal();
359     case FrameType::PUSH_PROMISE:
360       PW_LOG_ERROR("Client sent PUSH_PROMISE");
361       SendGoAway(Http2Error::PROTOCOL_ERROR);
362       return Status::Internal();
363     case FrameType::CONTINUATION:
364       PW_LOG_ERROR("Client sent CONTINUATION: unsupported");
365       SendGoAway(Http2Error::INTERNAL_ERROR);
366       return Status::Internal();
367   }
368 
369   return OkStatus();
370 }
371 
372 pw::Result<std::reference_wrapper<Connection::Stream>>
LookupStream(StreamId id)373 Connection::SharedState::LookupStream(StreamId id) {
374   for (size_t i = 0; i < streams.size(); i++) {
375     if (streams[i].id == id) {
376       return streams[i];
377     }
378   }
379   return Status::NotFound();
380 }
381 
SendResponseMessage(StreamId stream_id,ConstByteSpan message)382 Status Connection::Writer::SendResponseMessage(StreamId stream_id,
383                                                ConstByteSpan message) {
384   auto state = connection_.LockState();
385   auto stream = state->LookupStream(stream_id);
386   if (!stream.ok()) {
387     return Status::NotFound();
388   }
389 
390   if (message.size() > kMaxGrpcMessageSize) {
391     PW_LOG_WARN("Message %" PRIu32 " bytes on id=%" PRIu32
392                 " exceeds maximum message size",
393                 static_cast<uint32_t>(message.size()),
394                 stream_id);
395     return Status::InvalidArgument();
396   }
397 
398   // This should block until there is enough send window.
399   if (static_cast<int32_t>(message.size()) > stream->get().send_window ||
400       static_cast<int32_t>(message.size()) > state->connection_send_window) {
401     PW_LOG_WARN("Not enough window to send %" PRIu32 " bytes on id=%" PRIu32,
402                 static_cast<uint32_t>(message.size()),
403                 stream_id);
404     return Status::ResourceExhausted();
405   }
406 
407   auto status = OkStatus();
408   if (!stream->get().started_response) {
409     stream->get().started_response = true;
410     status = SendHeaders(connection_.send_queue_,
411                          stream_id,
412                          ResponseHeadersPayload(),
413                          ConstByteSpan(),
414                          /*end_stream=*/false);
415   }
416   if (status.ok()) {
417     // Write a Length-Prefixed-Message payload.
418     ByteBuffer<5> prefix;
419     prefix.PutUint8(0);
420     prefix.PutUint32(message.size(), endian::big);
421     status = SendData(connection_.send_queue_, stream_id, prefix, message);
422   }
423   if (!status.ok()) {
424     PW_LOG_WARN("Failed sending response message on id=%" PRIu32 " error=%d",
425                 stream_id,
426                 status.code());
427     return Status::Unavailable();
428   }
429   stream->get().send_window -= message.size();
430   state->connection_send_window -= message.size();
431   return OkStatus();
432 }
433 
SendResponseComplete(StreamId stream_id,Status response_code)434 Status Connection::Writer::SendResponseComplete(StreamId stream_id,
435                                                 Status response_code) {
436   auto state = connection_.LockState();
437   auto stream = state->LookupStream(stream_id);
438   if (!stream.ok()) {
439     return Status::NotFound();
440   }
441 
442   Status status;
443   if (!stream->get().started_response) {
444     // If the response has not started yet, we need to include the initial
445     // headers.
446     PW_LOG_DEBUG("Conn.SendResponseWithTrailers id=%" PRIu32 " code=%d",
447                  stream_id,
448                  response_code.code());
449     status = SendHeaders(connection_.send_queue_,
450                          stream_id,
451                          ResponseHeadersPayload(),
452                          ResponseTrailersPayload(response_code),
453                          /*end_stream=*/true);
454   } else {
455     PW_LOG_DEBUG("Conn.SendTrailers id=%" PRIu32 " code=%d",
456                  stream_id,
457                  response_code.code());
458     status = SendHeaders(connection_.send_queue_,
459                          stream_id,
460                          ConstByteSpan(),
461                          ResponseTrailersPayload(response_code),
462                          /*end_stream=*/true);
463   }
464 
465   if (!status.ok()) {
466     PW_LOG_WARN("Failed sending response complete on id=%" PRIu32 " error=%d",
467                 stream_id,
468                 status.code());
469     return Status::Unavailable();
470   }
471 
472   PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, stream_id);
473   stream->get().Reset();
474 
475   return OkStatus();
476 }
477 
CreateStream(StreamId id)478 pw::Status Connection::Reader::CreateStream(StreamId id) {
479   auto state = connection_.LockState();
480   for (size_t i = 0; i < state->streams.size(); i++) {
481     if (state->streams[i].id != 0) {
482       continue;
483     }
484     PW_LOG_DEBUG("Conn.CreateStream id=%" PRIu32 " at slot=%" PRIu32,
485                  id,
486                  static_cast<uint32_t>(i));
487     state->streams[i].id = id;
488     state->streams[i].half_closed = false;
489     state->streams[i].started_response = false;
490     state->streams[i].send_window = initial_send_window_;
491     return OkStatus();
492   }
493   PW_LOG_WARN("Conn.CreateStream id=%" PRIu32 " OUT OF SPACE", id);
494   return Status::ResourceExhausted();
495 }
496 
CloseStream(Connection::Stream & stream)497 void Connection::Reader::CloseStream(Connection::Stream& stream) {
498   StreamId id = stream.id;
499   PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, id);
500   stream.Reset();
501   callbacks_.OnCancel(id);
502 }
503 
504 // RFC 9113 §3.4
ProcessConnectionPreface()505 Status Connection::Reader::ProcessConnectionPreface() {
506   if (received_connection_preface_) {
507     return OkStatus();
508   }
509 
510   callbacks_.OnNewConnection();
511 
512   // The preface starts with a literal string.
513   auto literal = span{payload_scratch_}.subspan(
514       0, kExpectedConnectionPrefaceLiteral.size());
515 
516   PW_TRY(ReadExactly(connection_.socket_.as_reader(), literal));
517   if (std::memcmp(literal.data(),
518                   kExpectedConnectionPrefaceLiteral.data(),
519                   kExpectedConnectionPrefaceLiteral.size()) != 0) {
520     PW_LOG_ERROR("Invalid connection preface literal");
521     return Status::Internal();
522   }
523 
524   PW_LOG_DEBUG("Conn.Preface received literal");
525 
526   // Client must send a SETTINGS frames.
527   PW_TRY_ASSIGN(auto client_frame,
528                 ReadFrameHeader(connection_.socket_.as_reader()));
529   if (client_frame.type != FrameType::SETTINGS) {
530     PW_LOG_ERROR(
531         "Connection preface missing SETTINGS frame, found frame.type=%d",
532         static_cast<int>(client_frame.type));
533     return Status::Internal();
534   }
535 
536   // Don't send an ACK yet, we'll do that below.
537   PW_TRY(ProcessSettingsFrame(client_frame, /*send_ack=*/false));
538   PW_LOG_DEBUG("Conn.Preface received SETTINGS");
539 
540   // We must send a SETTINGS frame.
541   // RFC 9113 §6.5.2
542   PW_PACKED(struct) Setting {
543     uint16_t id;
544     uint32_t value;
545   };
546   PW_PACKED(struct) SettingsFrame {
547     WireFrameHeader header;
548     Setting settings[2];
549   };
550   SettingsFrame server_frame{
551       .header = WireFrameHeader(FrameHeader{
552           .payload_length = 12,
553           .type = FrameType::SETTINGS,
554           .flags = 0,
555           .stream_id = 0,
556       }),
557       .settings =
558           {
559               {
560                   .id = ToNetworkOrder(SETTINGS_HEADER_TABLE_SIZE),
561                   .value = ToNetworkOrder(kHpackDynamicHeaderTableSize),
562               },
563               {
564                   .id = ToNetworkOrder(SETTINGS_MAX_CONCURRENT_STREAMS),
565                   .value = ToNetworkOrder(kMaxConcurrentStreams),
566               },
567           },
568   };
569   PW_LOG_DEBUG("Conn.Send SETTINGS");
570   PW_TRY(connection_.send_queue_.SendBytes(AsBytes(server_frame)));
571 
572   // We must ack the client's SETTINGS frame *after* sending our SETTINGS.
573   PW_TRY(SendSettingsAck(connection_.send_queue_));
574 
575   received_connection_preface_ = true;
576   PW_LOG_DEBUG("Conn.Preface complete");
577   return OkStatus();
578 }
579 
580 // RFC 9113 §6.1
// Handles one DATA frame whose header has already been read.
//
// Steps, in order:
//   1. Stream id 0 is a connection error: GOAWAY(PROTOCOL_ERROR) is sent and
//      Internal is returned.
//   2. WINDOW_UPDATE frames covering the full payload length are sent before
//      any validation of the stream.
//   3. If the stream is unknown the payload is skipped via
//      ProcessIgnoredFrame; if the stream is half-closed the payload is
//      skipped and the stream is reset with STREAM_CLOSED.
//   4. The payload is read, padding is stripped, and the remaining bytes are
//      parsed as a sequence of gRPC Length-Prefixed-Messages. A 5-byte prefix
//      may straddle frames (kept in stream->prefix_buffer); a message that
//      straddles frames is copied into a buffer obtained from
//      message_assembly_allocator_.
//   5. callbacks_.OnMessage is invoked for every complete message, with the
//      state lock released for the duration of the call.
//   6. If END_STREAM is set, the stream is marked half-closed and
//      callbacks_.OnHalfClose is invoked.
//
// Stream-level failures (compressed message, missing allocator, allocation
// failure, OnMessage error) reset only the stream and return OK. A non-OK
// return comes from the connection-level errors above or from a failed
// read/send propagated by PW_TRY.
ProcessDataFrame(const FrameHeader & frame)581 Status Connection::Reader::ProcessDataFrame(const FrameHeader& frame) {
582   PW_LOG_DEBUG("Conn.Recv DATA id=%" PRIu32 " flags=0x%x len=%" PRIu32,
583                frame.stream_id,
584                frame.flags,
585                frame.payload_length);
586 
587   if (frame.stream_id == 0) {
588     // RFC 9113 §6.1: "If a DATA frame is received whose Stream Identifier field
589     // is 0x00, the recipient MUST respond with a connection error of type
590     // PROTOCOL_ERROR."
591     SendGoAway(Http2Error::PROTOCOL_ERROR);
592     return Status::Internal();
593   }
594 
595   // From RFC 9113 §6.9: "A receiver that receives a flow-controlled frame MUST
596   // always account for its contribution against the connection flow-control
597   // window, unless the receiver treats this as a connection error. This is
598   // necessary even if the frame is in error. The sender counts the frame toward
599   // the flow-control window, but if the receiver does not, the flow-control
600   // window at the sender and receiver can become different."
601   //
602   // To simplify this, we send WINDOW_UPDATE frames eagerly.
603   //
604   // In the future we should do something less chatty.
605   PW_TRY(SendWindowUpdates(
606       connection_.send_queue_, frame.stream_id, frame.payload_length));
607 
// Scoped so the state lock taken here is dropped before the payload is read.
608   {
609     auto state = connection_.LockState();
610     auto stream = state->LookupStream(frame.stream_id);
611     if (!stream.ok()) {
612       PW_LOG_DEBUG("Ignoring DATA on closed stream id=%" PRIu32,
613                    frame.stream_id);
614       PW_TRY(ProcessIgnoredFrame(frame));
615       // Stream has been fully closed: silently ignore.
616       return OkStatus();
617     }
618 
619     if (stream->get().half_closed) {
620       PW_LOG_ERROR("Recv DATA on half-closed stream id=%" PRIu32,
621                    frame.stream_id);
622       PW_TRY(ProcessIgnoredFrame(frame));
623       // RFC 9113 §6.1: "If a DATA frame is received whose stream is not in the
624       // "open" or "half-closed (local)" state, the recipient MUST respond with
625       // a stream error of type STREAM_CLOSED."
626       PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::STREAM_CLOSED));
627       return OkStatus();
628     }
629   }
630 
631   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
632 
633   // Drop padding.
634   if ((frame.flags & FLAGS_PADDED) != 0) {
635     if (payload.size() < 1) {
636       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
637       // if a frame ... is too small to contain mandatory frame data."
638       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
639       return Status::Internal();
640     }
641 
// The first payload byte is the Pad Length field.
642     uint32_t pad_length = static_cast<uint32_t>(payload[0]);
643     if (pad_length >= frame.payload_length) {
644       // RFC 9113 §6.1: "If the length of the padding is the length of the frame
645       // payload or greater, the recipient MUST treat this as a connection error
646       // of type PROTOCOL_ERROR."
647       SendGoAway(Http2Error::PROTOCOL_ERROR);
648       return Status::Internal();
649     }
// Skip the Pad Length byte and trim the trailing padding.
650     payload = payload.subspan(1, payload.size() - pad_length - 1);
651   }
652 
// The lock was released above, so the stream is looked up again here.
653   auto state = connection_.LockState();
654   auto maybe_stream = state->LookupStream(frame.stream_id);
655   if (!maybe_stream.ok()) {
656     return OkStatus();
657   }
658   Stream* stream = &maybe_stream->get();
659 
660   // Parse repeated grpc Length-Prefix-Message.
661   // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md#requests
662   while (!payload.empty()) {
// Only assigned, and only read, on iterations that parse a new prefix (i.e.
// when no assembly buffer is active).
663     uint32_t message_length;
664 
665     // If we aren't reassembling a message, read the next length prefix.
666     if (!stream->assembly_buffer) {
// Accumulate up to 5 prefix bytes; some may already have been stored by an
// earlier pass over a previous payload.
667       size_t read = std::min(5 - static_cast<size_t>(stream->prefix_received),
668                              payload.size());
669       std::copy(payload.begin(),
670                 payload.begin() + read,
671                 stream->prefix_buffer.data() + stream->prefix_received);
672       stream->prefix_received += read;
673       payload = payload.subspan(read);
674 
675       // Read the length prefix.
676       if (stream->prefix_received < 5) {
677         continue;
678       }
679       stream->prefix_received = 0;
680 
681       ByteBuilder builder(stream->prefix_buffer);
682       auto it = builder.begin();
683       auto message_compressed = it.ReadUint8();
684       message_length = it.ReadUint32(endian::big);
685       if (message_compressed != 0) {
686         PW_LOG_ERROR("Unsupported: grpc message is compressed");
687         PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
688         return OkStatus();
689       }
690 
691       if (message_length > payload.size()) {
692         // gRPC message is split across DATA frames, must allocate buffer.
693         if (!state->message_assembly_allocator_) {
694           PW_LOG_ERROR(
695               "Unsupported: split grpc message without allocator provided");
696           PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
697           return OkStatus();
698         }
699 
700         stream->assembly_buffer = static_cast<std::byte*>(
701             state->message_assembly_allocator_->Allocate(
702                 allocator::Layout(message_length)));
703         if (stream->assembly_buffer == nullptr) {
704           PW_LOG_ERROR("Partial message reassembly buffer allocation failed");
705           PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
706           return OkStatus();
707         }
708         stream->message_length = message_length;
709         stream->message_received = 0;
// The next iteration takes the reassembly branch below.
710         continue;
711       }
712     }
713 
714     pw::ByteSpan message;
715 
716     // Reading message payload.
717     if (stream->assembly_buffer != nullptr) {
718       uint32_t read =
719           std::min(stream->message_length - stream->message_received,
720                    static_cast<uint32_t>(payload.size()));
721       std::copy(payload.begin(),
722                 payload.begin() + read,
723                 stream->assembly_buffer + stream->message_received);
724       payload = payload.subspan(read);
725       stream->message_received += read;
726       if (stream->message_received < stream->message_length) {
727         continue;
728       }
729       // Fully received message.
730       message = pw::span(stream->assembly_buffer, stream->message_length);
731     } else {
// The whole message is inside this frame: hand out a view of the payload.
732       message = payload.subspan(0, message_length);
733       payload = payload.subspan(message_length);
734     }
735 
736     // Release state lock before callback, reacquire after.
737     connection_.UnlockState(std::move(state));
738     const auto status = callbacks_.OnMessage(frame.stream_id, message);
739     state = connection_.LockState();
// The stream may no longer exist after the unlocked callback, so it is looked
// up again and the loop stops if it is gone.
740     maybe_stream = state->LookupStream(frame.stream_id);
741     if (!maybe_stream.ok()) {
742       return OkStatus();
743     }
744     stream = &maybe_stream->get();
745 
746     if (!status.ok()) {
747       PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
748       return OkStatus();
749     }
750 
// The callback has returned, so the reassembly buffer can be released.
751     if (stream->assembly_buffer != nullptr) {
752       state->message_assembly_allocator_->Deallocate(stream->assembly_buffer);
753       stream->assembly_buffer = nullptr;
754       stream->message_length = 0;
755       stream->message_received = 0;
756     }
757   }
758 
759   // grpc requires every request stream to end with an empty DATA frame with
760   // FLAGS_END_STREAM. If a client sends FLAGS_END_STREAM with a non-empty
761   // payload, it's not specified how the server should respond. We choose to
762   // accept the payload before ending the stream.
763   // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
764   if ((frame.flags & FLAGS_END_STREAM) != 0) {
765     stream->half_closed = true;
// The lock is released before the callback, as with OnMessage above.
766     connection_.UnlockState(std::move(state));
767     callbacks_.OnHalfClose(frame.stream_id);
768   }
769 
770   return OkStatus();
771 }
772 
773 // RFC 9113 §6.2
ProcessHeadersFrame(const FrameHeader & frame)774 Status Connection::Reader::ProcessHeadersFrame(const FrameHeader& frame) {
775   PW_LOG_DEBUG("Conn.Recv HEADERS id=%" PRIu32 " len=%" PRIu32,
776                frame.stream_id,
777                frame.payload_length);
778 
779   if (frame.stream_id == 0) {
780     // RFC 9113 §6.2: "If a HEADERS frame is received whose Stream Identifier
781     // field is 0x00, the recipient MUST respond with a connection error of type
782     // PROTOCOL_ERROR."
783     SendGoAway(Http2Error::PROTOCOL_ERROR);
784     return Status::Internal();
785   }
786   if (frame.stream_id % 2 != 1 || frame.stream_id <= last_stream_id_) {
787     // RFC 9113 §5.1.1: "Streams initiated by a client MUST use odd-numbered
788     // stream identifiers ... The identifier of a newly established stream MUST
789     // be numerically greater than all streams that the initiating endpoint has
790     // opened ... An endpoint that receives an unexpected stream identifier MUST
791     // respond with a connection error of type PROTOCOL_ERROR."
792     SendGoAway(Http2Error::PROTOCOL_ERROR);
793     return Status::Internal();
794   }
795 
796   last_stream_id_ = frame.stream_id;
797 
798   {
799     auto state = connection_.LockState();
800     if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
801       PW_LOG_DEBUG("Client sent HEADERS after the first stream message");
802       PW_TRY(ProcessIgnoredFrame(frame));
803       // grpc requests cannot contain trailers.
804       // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
805       PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::PROTOCOL_ERROR));
806       return OkStatus();
807     }
808   }
809 
810   if ((frame.flags & FLAGS_END_STREAM) != 0) {
811     PW_LOG_DEBUG("Client sent HEADERS with END_STREAM");
812     PW_TRY(ProcessIgnoredFrame(frame));
813     // grpc requests must send END_STREAM in an empty DATA frame.
814     // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
815     PW_TRY(SendRstStream(
816         connection_.send_queue_, frame.stream_id, Http2Error::PROTOCOL_ERROR));
817     return OkStatus();
818   }
819   if ((frame.flags & FLAGS_END_HEADERS) == 0) {
820     PW_LOG_ERROR("Client sent HEADERS frame without END_HEADERS: unsupported");
821     SendGoAway(Http2Error::INTERNAL_ERROR);
822     return Status::Internal();
823   }
824 
825   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
826 
827   // Drop padding.
828   if ((frame.flags & FLAGS_PADDED) != 0) {
829     if (payload.size() < 1) {
830       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
831       // if a frame ... is too small to contain mandatory frame data. A frame
832       // size error in a frame that could alter the state of the entire
833       // connection MUST be treated as a connection error"
834       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
835       return Status::Internal();
836     }
837     uint32_t pad_length = static_cast<uint32_t>(payload[0]);
838     if (pad_length >= frame.payload_length) {
839       // RFC 9113 §6.2: "If the length of the padding is the length of the frame
840       // payload or greater, the recipient MUST treat this as a connection error
841       // of type PROTOCOL_ERROR."
842       SendGoAway(Http2Error::PROTOCOL_ERROR);
843       return Status::Internal();
844     }
845     payload = payload.subspan(1, payload.size() - pad_length - 1);
846   }
847 
848   // Drop priority fields.
849   if ((frame.flags & FLAGS_PRIORITY) != 0) {
850     if (payload.size() < 5) {
851       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
852       // if a frame ... is too small to contain mandatory frame data."
853       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
854       return Status::Internal();
855     }
856     payload = payload.subspan(5);
857   }
858 
859   PW_TRY_ASSIGN(auto method_name, HpackParseRequestHeaders(payload));
860   if (!CreateStream(frame.stream_id).ok()) {
861     PW_LOG_WARN("Too many streams, rejecting id=%" PRIu32, frame.stream_id);
862     return SendRstStream(
863         connection_.send_queue_, frame.stream_id, Http2Error::REFUSED_STREAM);
864   }
865 
866   if (const auto status = callbacks_.OnNew(frame.stream_id, method_name);
867       !status.ok()) {
868     auto state = connection_.LockState();
869     if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
870       return SendRstStreamAndClose(stream->get(), Http2Error::INTERNAL_ERROR);
871     }
872   }
873 
874   return OkStatus();
875 }
876 
877 // RFC 9113 §6.4
ProcessRstStreamFrame(const FrameHeader & frame)878 Status Connection::Reader::ProcessRstStreamFrame(const FrameHeader& frame) {
879   PW_LOG_DEBUG("Conn.Recv RST_STREAM id=%" PRIu32 " len=%" PRIu32,
880                frame.stream_id,
881                frame.payload_length);
882 
883   if (frame.stream_id == 0) {
884     // RFC 9113 §6.4: "If a RST_STREAM frame is received with a stream
885     // identifier of 0x00, the recipient MUST treat this as a connection error
886     // of type PROTOCOL_ERROR".
887     SendGoAway(Http2Error::PROTOCOL_ERROR);
888     return Status::Internal();
889   }
890   if (frame.stream_id > last_stream_id_) {
891     // RFC 9113 §6.4: "If a RST_STREAM frame identifying an idle stream is
892     // received, the recipient MUST treat this as a connection error of type
893     // PROTOCOL_ERROR."
894     SendGoAway(Http2Error::PROTOCOL_ERROR);
895     return Status::Internal();
896   }
897   if (frame.payload_length != 4) {
898     // RFC 9113 §6.4: "A RST_STREAM frame with a length other than 4 octets MUST
899     // be treated as a connection error of type FRAME_SIZE_ERROR."
900     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
901     return Status::Internal();
902   }
903 
904   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
905   ByteBuilder builder(payload);
906   auto error_code = builder.begin().ReadUint32(endian::big);
907 
908   PW_LOG_DEBUG("Conn.RstStream id=%" PRIu32 " error=%" PRIu32,
909                frame.stream_id,
910                error_code);
911   auto state = connection_.LockState();
912   if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
913     CloseStream(stream->get());
914   }
915   return OkStatus();
916 }
917 
918 // RFC 9113 §6.5
ProcessSettingsFrame(const FrameHeader & frame,bool send_ack)919 Status Connection::Reader::ProcessSettingsFrame(const FrameHeader& frame,
920                                                 bool send_ack) {
921   PW_LOG_DEBUG("Conn.Recv SETTINGS len=%" PRIu32 " flags=0x%x",
922                frame.payload_length,
923                frame.flags);
924 
925   if ((frame.flags & FLAGS_ACK) != 0) {
926     // RFC 9113 §6.5: "Receipt of a SETTINGS frame with the ACK flag set and a
927     // length field value other than 0 MUST be treated as a connection error of
928     // type FRAME_SIZE_ERROR."
929     if (frame.payload_length != 0) {
930       PW_LOG_ERROR("Invalid SETTINGS frame: has ACK with non-empty payload");
931       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
932       return Status::Internal();
933     }
934     // Don't ACK an ACK.
935     send_ack = false;
936   } else {
937     // RFC 9113 §6.5: "A SETTINGS frame with a length other than a multiple of 6
938     // octets MUST be treated as a connection error of type FRAME_SIZE_ERROR."
939     if (frame.payload_length % 6 != 0) {
940       PW_LOG_ERROR("Invalid SETTINGS frame: payload size invalid");
941       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
942       return Status::Internal();
943     }
944   }
945 
946   if (frame.stream_id != 0) {
947     // RFC 9113 §6.5: "If an endpoint receives a SETTINGS frame whose Stream
948     // Identifier field is anything other than 0x00, the endpoint MUST respond
949     // with a connection error of type PROTOCOL_ERROR."
950     SendGoAway(Http2Error::PROTOCOL_ERROR);
951     return Status::Internal();
952   }
953 
954   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
955 
956   // RFC 9113 §6.5.2
957   ByteBuilder builder(payload);
958   for (auto it = builder.begin(); it != builder.end();) {
959     auto id = it.ReadUint16(endian::big);
960     auto value = it.ReadUint32(endian::big);
961     PW_LOG_DEBUG("Applying SETTING id=%" PRIu16 " value=%" PRIu32, id, value);
962     switch (id) {
963       case SETTINGS_INITIAL_WINDOW_SIZE: {
964         // RFC 9113 §6.5.2: "Values above the maximum flow-control window size
965         // of 2^31-1 MUST be treated as a connection error of type
966         // FLOW_CONTROL_ERROR."
967         if ((value & (1 << 31)) != 0) {
968           SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
969           return Status::Internal();
970         }
971         // RFC 9113 §6.9.2: "When the value of SETTINGS_INITIAL_WINDOW_SIZE
972         // changes, a receiver MUST adjust the size of all stream flow-control
973         // windows that it maintains by the difference between the new value and
974         // the old value."
975         int32_t newval = static_cast<int32_t>(value);
976         int32_t delta = newval - initial_send_window_;
977         auto state = connection_.LockState();
978         for (size_t i = 0; i < state->streams.size(); i++) {
979           if (state->streams[i].id == 0) {
980             continue;
981           }
982           if (PW_ADD_OVERFLOW(state->streams[i].send_window,
983                               delta,
984                               &state->streams[i].send_window)) {
985             SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
986             return Status::Internal();
987           }
988         }
989         initial_send_window_ = newval;
990         break;
991       }
992       case SETTINGS_MAX_FRAME_SIZE:
993         // RFC 9113 §6.5.2: "Values outside this range MUST be treated as a
994         // connection error of type PROTOCOL_ERROR".
995         if (value < 16384 || 16777215 < value) {
996           SendGoAway(Http2Error::PROTOCOL_ERROR);
997           return Status::Internal();
998         }
999         // We never send frame payloads larger than 16384, so we don't need to
1000         // track the client's preference.
1001         break;
1002       // Ignore these.
1003       // SETTINGS_HEADER_TABLE_SIZE: our responses don't use the dynamic table
1004       // SETTINGS_ENABLE_PUSH: we don't support push
1005       // SETTINGS_MAX_CONCURRENT_STREAMS: we don't support push
1006       // SETTINGS_MAX_HEADER_LIST_SIZE: we send very tiny response HEADERS
1007       default:
1008         break;
1009     }
1010   }
1011 
1012   if (send_ack) {
1013     PW_TRY(SendSettingsAck(connection_.send_queue_));
1014   }
1015 
1016   return OkStatus();
1017 }
1018 
1019 // RFC 9113 §6.7
ProcessPingFrame(const FrameHeader & frame)1020 Status Connection::Reader::ProcessPingFrame(const FrameHeader& frame) {
1021   PW_LOG_DEBUG("Conn.Recv PING len=%" PRIu32, frame.payload_length);
1022 
1023   if (frame.stream_id != 0) {
1024     // RFC 9113 §6.7: "If a PING frame is received with a Stream Identifier
1025     // field value other than 0x00, the recipient MUST respond with a connection
1026     // error of type PROTOCOL_ERROR."
1027     SendGoAway(Http2Error::PROTOCOL_ERROR);
1028     return Status::Internal();
1029   }
1030   if (frame.payload_length != 8) {
1031     // RFC 9113 §6.7: "Receipt of a PING frame with a length field value other
1032     // than 8 MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1033     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1034     return Status::Internal();
1035   }
1036 
1037   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1038 
1039   // Don't ACK an ACK.
1040   if ((frame.flags & FLAGS_ACK) != 0) {
1041     return OkStatus();
1042   }
1043 
1044   // Send an ACK.
1045   PW_PACKED(struct) PingFrame {
1046     WireFrameHeader header;
1047     uint64_t opaque_data;
1048   };
1049   ByteBuilder builder(payload);
1050   PingFrame ack_frame = {
1051       .header = WireFrameHeader(FrameHeader{
1052           .payload_length = 8,
1053           .type = FrameType::PING,
1054           .flags = FLAGS_ACK,
1055           .stream_id = 0,
1056       }),
1057       // Since we're going to echo this, read as native endian so it gets echoed
1058       // exactly as-is.
1059       .opaque_data = builder.begin().ReadUint64(endian::native),
1060   };
1061   PW_TRY(connection_.send_queue_.SendBytes(AsBytes(ack_frame)));
1062   return OkStatus();
1063 }
1064 
1065 // RFC 9113 §6.9
ProcessWindowUpdateFrame(const FrameHeader & frame)1066 Status Connection::Reader::ProcessWindowUpdateFrame(const FrameHeader& frame) {
1067   PW_LOG_DEBUG("Conn.Recv WINDOW_UPDATE id=%" PRIu32 " len=%" PRIu32,
1068                frame.stream_id,
1069                frame.payload_length);
1070 
1071   if (frame.payload_length != 4) {
1072     // RFC 9113 §6.9: "A WINDOW_UPDATE frame with a length other than 4 octets
1073     // MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1074     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1075     return Status::Internal();
1076   }
1077 
1078   // Read window size increment.
1079   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1080   ByteBuilder builder(payload);
1081   int32_t delta = static_cast<int32_t>(builder.begin().ReadUint32(endian::big) &
1082                                        0x7fffffff);
1083 
1084   auto state = connection_.LockState();
1085   auto stream = state->LookupStream(frame.stream_id);
1086 
1087   if (delta == 0) {
1088     // RFC 9113 §6.9: "A receiver MUST treat a WINDOW_UPDATE frame with an
1089     // increment of 0 as a stream error of type PROTOCOL_ERROR; errors on the
1090     // connection flow-control window MUST be treated as a connection error."
1091     if (frame.stream_id == 0) {
1092       SendGoAway(Http2Error::PROTOCOL_ERROR);
1093       return Status::Internal();
1094     } else {
1095       if (!stream.ok()) {
1096         // Already closed
1097         return OkStatus();
1098       }
1099       PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::PROTOCOL_ERROR));
1100       return OkStatus();
1101     }
1102   }
1103 
1104   // RFC 9113 §6.9.1: "If a sender receives a WINDOW_UPDATE that causes a
1105   // flow-control window to exceed 2^31-1 bytes, it MUST terminate either the
1106   // stream or the connection, as appropriate ... with an error code of
1107   // FLOW_CONTROL_ERROR"
1108   if (frame.stream_id == 0) {
1109     if (PW_ADD_OVERFLOW(state->connection_send_window,
1110                         delta,
1111                         &state->connection_send_window)) {
1112       SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1113       return Status::Internal();
1114     }
1115   } else if (stream.ok()) {
1116     if (PW_ADD_OVERFLOW(
1117             stream->get().send_window, delta, &stream->get().send_window)) {
1118       PW_TRY(
1119           SendRstStreamAndClose(stream->get(), Http2Error::FLOW_CONTROL_ERROR));
1120       return OkStatus();
1121     }
1122   }
1123 
1124   return OkStatus();
1125 }
1126 
1127 // Advance past the payload.
ProcessIgnoredFrame(const FrameHeader & frame)1128 Status Connection::Reader::ProcessIgnoredFrame(const FrameHeader& frame) {
1129   PW_TRY(ReadFramePayload(frame));
1130   return OkStatus();
1131 }
1132 
ReadFramePayload(const FrameHeader & frame)1133 Result<ByteSpan> Connection::Reader::ReadFramePayload(
1134     const FrameHeader& frame) {
1135   if (frame.payload_length == 0) {
1136     return ByteSpan();
1137   }
1138   if (frame.payload_length > payload_scratch_.size()) {
1139     PW_LOG_ERROR("Frame type=%d payload too large: %" PRIu32 " > %" PRIu32,
1140                  static_cast<int>(frame.type),
1141                  frame.payload_length,
1142                  static_cast<uint32_t>(payload_scratch_.size()));
1143     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1144     return Status::Internal();
1145   }
1146   auto payload = span{payload_scratch_}.subspan(0, frame.payload_length);
1147   PW_TRY(ReadExactly(connection_.socket_.as_reader(), payload));
1148   return payload;
1149 }
1150 
1151 // RFC 9113 §6.8
SendGoAway(Http2Error code)1152 void Connection::Reader::SendGoAway(Http2Error code) {
1153   if (!received_connection_preface_) {
1154     // RFC 9113 §3.4: "A GOAWAY frame MAY be omitted in this case, since an
1155     // invalid preface indicates that the peer is not using HTTP/2."
1156     return;
1157   }
1158 
1159   // Close all open streams.
1160   {
1161     auto state = connection_.LockState();
1162     for (size_t i = 0; i < state->streams.size(); i++) {
1163       if (state->streams[i].id != 0) {
1164         CloseStream(state->streams[i]);
1165       }
1166     }
1167   }
1168 
1169   PW_PACKED(struct) GoAwayFrame {
1170     WireFrameHeader header;
1171     uint32_t last_stream_id;
1172     uint32_t error_code;
1173   };
1174   GoAwayFrame frame{
1175       .header = WireFrameHeader(FrameHeader{
1176           .payload_length = 8,
1177           .type = FrameType::GOAWAY,
1178           .flags = 0,
1179           .stream_id = 0,
1180       }),
1181       .last_stream_id = ToNetworkOrder(last_stream_id_),
1182       .error_code = ToNetworkOrder(code),
1183   };
1184   // Ignore errors since we're about to close the connection anyway.
1185   connection_.send_queue_.SendBytes(AsBytes(frame)).IgnoreError();
1186 }
1187 
1188 // RFC 9113 §6.4
SendRstStreamAndClose(Stream & stream,Http2Error code)1189 Status Connection::Reader::SendRstStreamAndClose(Stream& stream,
1190                                                  Http2Error code) {
1191   // Ignore errors as we are closing anyways.
1192   SendRstStream(connection_.send_queue_, stream.id, code).IgnoreError();
1193   CloseStream(stream);
1194   return OkStatus();
1195 }
1196 
1197 }  // namespace pw::grpc
1198