1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_grpc/connection.h"
16
17 #include <cinttypes>
18 #include <cstring>
19 #include <string_view>
20 #include <type_traits>
21
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_grpc_private/hpack.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/try.h"
28
29 namespace pw::grpc {
30 namespace internal {
31
32 // RFC 9113 §6
33 // Enum names left in naming style of RFC
34 enum class FrameType : uint8_t {
35 DATA = 0x00,
36 HEADERS = 0x01,
37 PRIORITY = 0x02,
38 RST_STREAM = 0x03,
39 SETTINGS = 0x04,
40 PUSH_PROMISE = 0x05,
41 PING = 0x06,
42 GOAWAY = 0x07,
43 WINDOW_UPDATE = 0x08,
44 CONTINUATION = 0x09,
45 };
46
47 // RFC 9113 §4.1
48 constexpr size_t kFrameHeaderEncodedSize = 9;
49 struct FrameHeader {
50 uint32_t payload_length;
51 FrameType type;
52 uint8_t flags;
53 StreamId stream_id;
54 };
55
56 // RFC 9113 §7
57 // Enum names left in naming style of RFC
58 enum class Http2Error : uint32_t {
59 NO_ERROR = 0x00,
60 PROTOCOL_ERROR = 0x01,
61 INTERNAL_ERROR = 0x02,
62 FLOW_CONTROL_ERROR = 0x03,
63 SETTINGS_TIMEOUT = 0x04,
64 STREAM_CLOSED = 0x05,
65 FRAME_SIZE_ERROR = 0x06,
66 REFUSED_STREAM = 0x07,
67 CANCEL = 0x08,
68 COMPRESSION_ERROR = 0x09,
69 CONNECT_ERROR = 0x0a,
70 ENHANCE_YOUR_CALM = 0x0b,
71 INADEQUATE_SECURITY = 0x0c,
72 HTTP_1_1_REQUIRED = 0x0d,
73 };
74
75 } // namespace internal
76
77 namespace {
78
79 using internal::FrameHeader;
80 using internal::FrameType;
81 using internal::Http2Error;
82 using internal::kMaxConcurrentStreams;
83 using internal::kMaxGrpcMessageSize;
84
85 // RFC 9113 §3.4
86 constexpr std::string_view kExpectedConnectionPrefaceLiteral(
87 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
88
89 static_assert(kMaxMethodNameSize == kHpackMaxStringSize);
90
91 enum {
92 FLAGS_ACK = 0x01,
93 FLAGS_END_STREAM = 0x01,
94 FLAGS_END_HEADERS = 0x04,
95 FLAGS_PADDED = 0x08,
96 FLAGS_PRIORITY = 0x20,
97 };
98
99 // RFC 9113 §6.5.2
100 enum SettingType : uint16_t {
101 SETTINGS_HEADER_TABLE_SIZE = 0x01,
102 SETTINGS_ENABLE_PUSH = 0x02,
103 SETTINGS_MAX_CONCURRENT_STREAMS = 0x03,
104 SETTINGS_INITIAL_WINDOW_SIZE = 0x04,
105 SETTINGS_MAX_FRAME_SIZE = 0x05,
106 SETTINGS_MAX_HEADER_LIST_SIZE = 0x06,
107 };
108
ReadExactly(stream::Reader & reader,ByteSpan buffer)109 Status ReadExactly(stream::Reader& reader, ByteSpan buffer) {
110 size_t bytes_read = 0;
111 while (bytes_read < buffer.size()) {
112 PW_TRY_ASSIGN(auto out, reader.Read(buffer.subspan(bytes_read)));
113 bytes_read += out.size();
114 }
115 return OkStatus();
116 }
117
ReadFrameHeader(stream::Reader & reader)118 Result<FrameHeader> ReadFrameHeader(stream::Reader& reader) {
119 std::array<std::byte, internal::kFrameHeaderEncodedSize> buffer;
120 PW_TRY(ReadExactly(reader, buffer));
121
122 // RFC 9113 §4.1
123 FrameHeader out;
124 ByteBuilder builder(as_writable_bytes(span{buffer}));
125 auto it = builder.begin();
126 auto type_and_length = it.ReadUint32(endian::big);
127 out.payload_length = type_and_length >> 8;
128 out.type = static_cast<FrameType>(type_and_length & 0xff);
129 out.flags = it.ReadUint8();
130 out.stream_id = it.ReadUint32(endian::big) & 0x7fffffff;
131 return out;
132 }
133
134 template <typename T, std::enable_if_t<std::is_integral_v<T>, bool> = true>
ToNetworkOrder(T value)135 constexpr T ToNetworkOrder(T value) {
136 return bytes::ConvertOrder(/*from=*/endian::native,
137 /*to=*/endian::big,
138 value);
139 }
140
141 template <typename T, std::enable_if_t<std::is_enum_v<T>, bool> = true>
ToNetworkOrder(T value)142 constexpr std::underlying_type_t<T> ToNetworkOrder(T value) {
143 return ToNetworkOrder(static_cast<std::underlying_type_t<T>>(value));
144 }
145
146 // Use this instead of FrameHeader when writing frames.
PW_PACKED(struct)147 PW_PACKED(struct) WireFrameHeader {
148 WireFrameHeader(FrameHeader h)
149 : payload_length_and_type(ToNetworkOrder(h.payload_length << 8 |
150 static_cast<uint32_t>(h.type))),
151 flags(h.flags),
152 stream_id(ToNetworkOrder(h.stream_id)) {}
153
154 uint32_t payload_length_and_type;
155 uint8_t flags;
156 uint32_t stream_id;
157 };
158
159 template <typename T>
AsBytes(T & object)160 ConstByteSpan AsBytes(T& object) {
161 return as_bytes(span<T, 1>{&object, 1});
162 }
163
164 // RFC 9113 §6.1
SendData(SendQueue & send_queue,StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2)165 Status SendData(SendQueue& send_queue,
166 StreamId stream_id,
167 ConstByteSpan payload1,
168 ConstByteSpan payload2) {
169 PW_LOG_DEBUG("Conn.Send DATA with id=%" PRIu32 " len1=%" PRIu32
170 " len2=%" PRIu32,
171 stream_id,
172 static_cast<uint32_t>(payload1.size()),
173 static_cast<uint32_t>(payload2.size()));
174 WireFrameHeader frame(FrameHeader{
175 .payload_length =
176 static_cast<uint32_t>(payload1.size() + payload2.size()),
177 .type = FrameType::DATA,
178 .flags = 0,
179 .stream_id = stream_id,
180 });
181 std::array<ConstByteSpan, 3> data_vector = {AsBytes(frame)};
182 size_t i = 1;
183 if (!payload1.empty()) {
184 data_vector[i++] = payload1;
185 }
186 if (!payload2.empty()) {
187 data_vector[i++] = payload2;
188 }
189 PW_TRY(send_queue.SendBytesVector(span{data_vector.data(), i}));
190 return OkStatus();
191 }
192
193 // RFC 9113 §6.2
SendHeaders(SendQueue & send_queue,StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2,bool end_stream)194 Status SendHeaders(SendQueue& send_queue,
195 StreamId stream_id,
196 ConstByteSpan payload1,
197 ConstByteSpan payload2,
198 bool end_stream) {
199 PW_LOG_DEBUG("Conn.Send HEADERS with id=%" PRIu32 " len1=%" PRIu32
200 " len2=%" PRIu32 " end=%d",
201 stream_id,
202 static_cast<uint32_t>(payload1.size()),
203 static_cast<uint32_t>(payload2.size()),
204 end_stream);
205 WireFrameHeader frame(FrameHeader{
206 .payload_length =
207 static_cast<uint32_t>(payload1.size() + payload2.size()),
208 .type = FrameType::HEADERS,
209 .flags = FLAGS_END_HEADERS,
210 .stream_id = stream_id,
211 });
212
213 if (end_stream) {
214 frame.flags |= FLAGS_END_STREAM;
215 }
216
217 std::array<ConstByteSpan, 3> headers_vector = {AsBytes(frame)};
218 size_t i = 1;
219 if (!payload1.empty()) {
220 headers_vector[i++] = payload1;
221 }
222 if (!payload2.empty()) {
223 headers_vector[i++] = payload2;
224 }
225 PW_TRY(send_queue.SendBytesVector(span{headers_vector.data(), i}));
226
227 return OkStatus();
228 }
229
230 // RFC 9113 §6.4
SendRstStream(SendQueue & send_queue,StreamId stream_id,Http2Error code)231 Status SendRstStream(SendQueue& send_queue,
232 StreamId stream_id,
233 Http2Error code) {
234 PW_PACKED(struct) RstStreamFrame {
235 WireFrameHeader header;
236 uint32_t error_code;
237 };
238 RstStreamFrame frame{
239 .header = WireFrameHeader(FrameHeader{
240 .payload_length = 4,
241 .type = FrameType::RST_STREAM,
242 .flags = 0,
243 .stream_id = stream_id,
244 }),
245 .error_code = ToNetworkOrder(code),
246 };
247 PW_TRY(send_queue.SendBytes(AsBytes(frame)));
248
249 return OkStatus();
250 }
251
252 // RFC 9113 §6.9
SendWindowUpdates(SendQueue & send_queue,StreamId stream_id,uint32_t increment)253 Status SendWindowUpdates(SendQueue& send_queue,
254 StreamId stream_id,
255 uint32_t increment) {
256 // It is illegal to send updates with increment=0.
257 if (increment == 0) {
258 return OkStatus();
259 }
260 if (increment & 0x80000000) {
261 // Upper bit is reserved, error.
262 return Status::InvalidArgument();
263 }
264
265 PW_LOG_DEBUG("Conn.Send WINDOW_UPDATE frames with id=%" PRIu32
266 " increment=%" PRIu32,
267 stream_id,
268 increment);
269
270 PW_PACKED(struct) WindowUpdateFrame {
271 WireFrameHeader header;
272 uint32_t increment;
273 };
274 WindowUpdateFrame frames[2] = {
275 {
276 .header = WireFrameHeader(FrameHeader{
277 .payload_length = 4,
278 .type = FrameType::WINDOW_UPDATE,
279 .flags = 0,
280 .stream_id = 0,
281 }),
282 .increment = ToNetworkOrder(increment),
283 },
284 {
285 .header = WireFrameHeader(FrameHeader{
286 .payload_length = 4,
287 .type = FrameType::WINDOW_UPDATE,
288 .flags = 0,
289 .stream_id = stream_id,
290 }),
291 .increment = ToNetworkOrder(increment),
292 },
293 };
294 PW_TRY(send_queue.SendBytes(as_bytes(span{frames})));
295 return OkStatus();
296 }
297
298 // RFC 9113 §6.5
SendSettingsAck(SendQueue & send_queue)299 Status SendSettingsAck(SendQueue& send_queue) {
300 PW_LOG_DEBUG("Conn.Send SETTINGS ACK");
301 WireFrameHeader frame(FrameHeader{
302 .payload_length = 0,
303 .type = FrameType::SETTINGS,
304 .flags = FLAGS_ACK,
305 .stream_id = 0,
306 });
307 PW_TRY(send_queue.SendBytes(AsBytes(frame)));
308 return OkStatus();
309 }
310
311 } // namespace
312
Connection(stream::ReaderWriter & socket,SendQueue & send_queue,RequestCallbacks & callbacks,allocator::Allocator * message_assembly_allocator)313 Connection::Connection(stream::ReaderWriter& socket,
314 SendQueue& send_queue,
315 RequestCallbacks& callbacks,
316 allocator::Allocator* message_assembly_allocator)
317 : socket_(socket),
318 send_queue_(send_queue),
319 reader_(*this, callbacks),
320 writer_(*this) {
321 LockState()->message_assembly_allocator_ = message_assembly_allocator;
322 }
323
ProcessFrame()324 Status Connection::Reader::ProcessFrame() {
325 if (!received_connection_preface_) {
326 return Status::FailedPrecondition();
327 }
328
329 PW_TRY_ASSIGN(auto frame, ReadFrameHeader(connection_.socket_.as_reader()));
330 switch (frame.type) {
331 // Frames that we handle.
332 case FrameType::DATA:
333 PW_TRY(ProcessDataFrame(frame));
334 break;
335 case FrameType::HEADERS:
336 PW_TRY(ProcessHeadersFrame(frame));
337 break;
338 case FrameType::PRIORITY:
339 PW_TRY(ProcessIgnoredFrame(frame));
340 break;
341 case FrameType::RST_STREAM:
342 PW_TRY(ProcessRstStreamFrame(frame));
343 break;
344 case FrameType::SETTINGS:
345 PW_TRY(ProcessSettingsFrame(frame, /*send_ack=*/true));
346 break;
347 case FrameType::PING:
348 PW_TRY(ProcessPingFrame(frame));
349 break;
350 case FrameType::WINDOW_UPDATE:
351 PW_TRY(ProcessWindowUpdateFrame(frame));
352 break;
353
354 // Frames that trigger an immediate connection close.
355 case FrameType::GOAWAY:
356 PW_LOG_ERROR("Client sent GOAWAY");
357 // don't bother sending GOAWAY in response
358 return Status::Internal();
359 case FrameType::PUSH_PROMISE:
360 PW_LOG_ERROR("Client sent PUSH_PROMISE");
361 SendGoAway(Http2Error::PROTOCOL_ERROR);
362 return Status::Internal();
363 case FrameType::CONTINUATION:
364 PW_LOG_ERROR("Client sent CONTINUATION: unsupported");
365 SendGoAway(Http2Error::INTERNAL_ERROR);
366 return Status::Internal();
367 }
368
369 return OkStatus();
370 }
371
372 pw::Result<std::reference_wrapper<Connection::Stream>>
LookupStream(StreamId id)373 Connection::SharedState::LookupStream(StreamId id) {
374 for (size_t i = 0; i < streams.size(); i++) {
375 if (streams[i].id == id) {
376 return streams[i];
377 }
378 }
379 return Status::NotFound();
380 }
381
SendResponseMessage(StreamId stream_id,ConstByteSpan message)382 Status Connection::Writer::SendResponseMessage(StreamId stream_id,
383 ConstByteSpan message) {
384 auto state = connection_.LockState();
385 auto stream = state->LookupStream(stream_id);
386 if (!stream.ok()) {
387 return Status::NotFound();
388 }
389
390 if (message.size() > kMaxGrpcMessageSize) {
391 PW_LOG_WARN("Message %" PRIu32 " bytes on id=%" PRIu32
392 " exceeds maximum message size",
393 static_cast<uint32_t>(message.size()),
394 stream_id);
395 return Status::InvalidArgument();
396 }
397
398 // This should block until there is enough send window.
399 if (static_cast<int32_t>(message.size()) > stream->get().send_window ||
400 static_cast<int32_t>(message.size()) > state->connection_send_window) {
401 PW_LOG_WARN("Not enough window to send %" PRIu32 " bytes on id=%" PRIu32,
402 static_cast<uint32_t>(message.size()),
403 stream_id);
404 return Status::ResourceExhausted();
405 }
406
407 auto status = OkStatus();
408 if (!stream->get().started_response) {
409 stream->get().started_response = true;
410 status = SendHeaders(connection_.send_queue_,
411 stream_id,
412 ResponseHeadersPayload(),
413 ConstByteSpan(),
414 /*end_stream=*/false);
415 }
416 if (status.ok()) {
417 // Write a Length-Prefixed-Message payload.
418 ByteBuffer<5> prefix;
419 prefix.PutUint8(0);
420 prefix.PutUint32(message.size(), endian::big);
421 status = SendData(connection_.send_queue_, stream_id, prefix, message);
422 }
423 if (!status.ok()) {
424 PW_LOG_WARN("Failed sending response message on id=%" PRIu32 " error=%d",
425 stream_id,
426 status.code());
427 return Status::Unavailable();
428 }
429 stream->get().send_window -= message.size();
430 state->connection_send_window -= message.size();
431 return OkStatus();
432 }
433
SendResponseComplete(StreamId stream_id,Status response_code)434 Status Connection::Writer::SendResponseComplete(StreamId stream_id,
435 Status response_code) {
436 auto state = connection_.LockState();
437 auto stream = state->LookupStream(stream_id);
438 if (!stream.ok()) {
439 return Status::NotFound();
440 }
441
442 Status status;
443 if (!stream->get().started_response) {
444 // If the response has not started yet, we need to include the initial
445 // headers.
446 PW_LOG_DEBUG("Conn.SendResponseWithTrailers id=%" PRIu32 " code=%d",
447 stream_id,
448 response_code.code());
449 status = SendHeaders(connection_.send_queue_,
450 stream_id,
451 ResponseHeadersPayload(),
452 ResponseTrailersPayload(response_code),
453 /*end_stream=*/true);
454 } else {
455 PW_LOG_DEBUG("Conn.SendTrailers id=%" PRIu32 " code=%d",
456 stream_id,
457 response_code.code());
458 status = SendHeaders(connection_.send_queue_,
459 stream_id,
460 ConstByteSpan(),
461 ResponseTrailersPayload(response_code),
462 /*end_stream=*/true);
463 }
464
465 if (!status.ok()) {
466 PW_LOG_WARN("Failed sending response complete on id=%" PRIu32 " error=%d",
467 stream_id,
468 status.code());
469 return Status::Unavailable();
470 }
471
472 PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, stream_id);
473 stream->get().Reset();
474
475 return OkStatus();
476 }
477
CreateStream(StreamId id)478 pw::Status Connection::Reader::CreateStream(StreamId id) {
479 auto state = connection_.LockState();
480 for (size_t i = 0; i < state->streams.size(); i++) {
481 if (state->streams[i].id != 0) {
482 continue;
483 }
484 PW_LOG_DEBUG("Conn.CreateStream id=%" PRIu32 " at slot=%" PRIu32,
485 id,
486 static_cast<uint32_t>(i));
487 state->streams[i].id = id;
488 state->streams[i].half_closed = false;
489 state->streams[i].started_response = false;
490 state->streams[i].send_window = initial_send_window_;
491 return OkStatus();
492 }
493 PW_LOG_WARN("Conn.CreateStream id=%" PRIu32 " OUT OF SPACE", id);
494 return Status::ResourceExhausted();
495 }
496
CloseStream(Connection::Stream & stream)497 void Connection::Reader::CloseStream(Connection::Stream& stream) {
498 StreamId id = stream.id;
499 PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, id);
500 stream.Reset();
501 callbacks_.OnCancel(id);
502 }
503
504 // RFC 9113 §3.4
ProcessConnectionPreface()505 Status Connection::Reader::ProcessConnectionPreface() {
506 if (received_connection_preface_) {
507 return OkStatus();
508 }
509
510 callbacks_.OnNewConnection();
511
512 // The preface starts with a literal string.
513 auto literal = span{payload_scratch_}.subspan(
514 0, kExpectedConnectionPrefaceLiteral.size());
515
516 PW_TRY(ReadExactly(connection_.socket_.as_reader(), literal));
517 if (std::memcmp(literal.data(),
518 kExpectedConnectionPrefaceLiteral.data(),
519 kExpectedConnectionPrefaceLiteral.size()) != 0) {
520 PW_LOG_ERROR("Invalid connection preface literal");
521 return Status::Internal();
522 }
523
524 PW_LOG_DEBUG("Conn.Preface received literal");
525
526 // Client must send a SETTINGS frames.
527 PW_TRY_ASSIGN(auto client_frame,
528 ReadFrameHeader(connection_.socket_.as_reader()));
529 if (client_frame.type != FrameType::SETTINGS) {
530 PW_LOG_ERROR(
531 "Connection preface missing SETTINGS frame, found frame.type=%d",
532 static_cast<int>(client_frame.type));
533 return Status::Internal();
534 }
535
536 // Don't send an ACK yet, we'll do that below.
537 PW_TRY(ProcessSettingsFrame(client_frame, /*send_ack=*/false));
538 PW_LOG_DEBUG("Conn.Preface received SETTINGS");
539
540 // We must send a SETTINGS frame.
541 // RFC 9113 §6.5.2
542 PW_PACKED(struct) Setting {
543 uint16_t id;
544 uint32_t value;
545 };
546 PW_PACKED(struct) SettingsFrame {
547 WireFrameHeader header;
548 Setting settings[2];
549 };
550 SettingsFrame server_frame{
551 .header = WireFrameHeader(FrameHeader{
552 .payload_length = 12,
553 .type = FrameType::SETTINGS,
554 .flags = 0,
555 .stream_id = 0,
556 }),
557 .settings =
558 {
559 {
560 .id = ToNetworkOrder(SETTINGS_HEADER_TABLE_SIZE),
561 .value = ToNetworkOrder(kHpackDynamicHeaderTableSize),
562 },
563 {
564 .id = ToNetworkOrder(SETTINGS_MAX_CONCURRENT_STREAMS),
565 .value = ToNetworkOrder(kMaxConcurrentStreams),
566 },
567 },
568 };
569 PW_LOG_DEBUG("Conn.Send SETTINGS");
570 PW_TRY(connection_.send_queue_.SendBytes(AsBytes(server_frame)));
571
572 // We must ack the client's SETTINGS frame *after* sending our SETTINGS.
573 PW_TRY(SendSettingsAck(connection_.send_queue_));
574
575 received_connection_preface_ = true;
576 PW_LOG_DEBUG("Conn.Preface complete");
577 return OkStatus();
578 }
579
580 // RFC 9113 §6.1
ProcessDataFrame(const FrameHeader & frame)581 Status Connection::Reader::ProcessDataFrame(const FrameHeader& frame) {
582 PW_LOG_DEBUG("Conn.Recv DATA id=%" PRIu32 " flags=0x%x len=%" PRIu32,
583 frame.stream_id,
584 frame.flags,
585 frame.payload_length);
586
587 if (frame.stream_id == 0) {
588 // RFC 9113 §6.1: "If a DATA frame is received whose Stream Identifier field
589 // is 0x00, the recipient MUST respond with a connection error of type
590 // PROTOCOL_ERROR."
591 SendGoAway(Http2Error::PROTOCOL_ERROR);
592 return Status::Internal();
593 }
594
595 // From RFC 9113 §6.9: "A receiver that receives a flow-controlled frame MUST
596 // always account for its contribution against the connection flow-control
597 // window, unless the receiver treats this as a connection error. This is
598 // necessary even if the frame is in error. The sender counts the frame toward
599 // the flow-control window, but if the receiver does not, the flow-control
600 // window at the sender and receiver can become different."
601 //
602 // To simplify this, we send WINDOW_UPDATE frames eagerly.
603 //
604 // In the future we should do something less chatty.
605 PW_TRY(SendWindowUpdates(
606 connection_.send_queue_, frame.stream_id, frame.payload_length));
607
608 {
609 auto state = connection_.LockState();
610 auto stream = state->LookupStream(frame.stream_id);
611 if (!stream.ok()) {
612 PW_LOG_DEBUG("Ignoring DATA on closed stream id=%" PRIu32,
613 frame.stream_id);
614 PW_TRY(ProcessIgnoredFrame(frame));
615 // Stream has been fully closed: silently ignore.
616 return OkStatus();
617 }
618
619 if (stream->get().half_closed) {
620 PW_LOG_ERROR("Recv DATA on half-closed stream id=%" PRIu32,
621 frame.stream_id);
622 PW_TRY(ProcessIgnoredFrame(frame));
623 // RFC 9113 §6.1: "If a DATA frame is received whose stream is not in the
624 // "open" or "half-closed (local)" state, the recipient MUST respond with
625 // a stream error of type STREAM_CLOSED."
626 PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::STREAM_CLOSED));
627 return OkStatus();
628 }
629 }
630
631 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
632
633 // Drop padding.
634 if ((frame.flags & FLAGS_PADDED) != 0) {
635 if (payload.size() < 1) {
636 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
637 // if a frame ... is too small to contain mandatory frame data."
638 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
639 return Status::Internal();
640 }
641
642 uint32_t pad_length = static_cast<uint32_t>(payload[0]);
643 if (pad_length >= frame.payload_length) {
644 // RFC 9113 §6.1: "If the length of the padding is the length of the frame
645 // payload or greater, the recipient MUST treat this as a connection error
646 // of type PROTOCOL_ERROR."
647 SendGoAway(Http2Error::PROTOCOL_ERROR);
648 return Status::Internal();
649 }
650 payload = payload.subspan(1, payload.size() - pad_length - 1);
651 }
652
653 auto state = connection_.LockState();
654 auto maybe_stream = state->LookupStream(frame.stream_id);
655 if (!maybe_stream.ok()) {
656 return OkStatus();
657 }
658 Stream* stream = &maybe_stream->get();
659
660 // Parse repeated grpc Length-Prefix-Message.
661 // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md#requests
662 while (!payload.empty()) {
663 uint32_t message_length;
664
665 // If we aren't reassembling a message, read the next length prefix.
666 if (!stream->assembly_buffer) {
667 size_t read = std::min(5 - static_cast<size_t>(stream->prefix_received),
668 payload.size());
669 std::copy(payload.begin(),
670 payload.begin() + read,
671 stream->prefix_buffer.data() + stream->prefix_received);
672 stream->prefix_received += read;
673 payload = payload.subspan(read);
674
675 // Read the length prefix.
676 if (stream->prefix_received < 5) {
677 continue;
678 }
679 stream->prefix_received = 0;
680
681 ByteBuilder builder(stream->prefix_buffer);
682 auto it = builder.begin();
683 auto message_compressed = it.ReadUint8();
684 message_length = it.ReadUint32(endian::big);
685 if (message_compressed != 0) {
686 PW_LOG_ERROR("Unsupported: grpc message is compressed");
687 PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
688 return OkStatus();
689 }
690
691 if (message_length > payload.size()) {
692 // gRPC message is split across DATA frames, must allocate buffer.
693 if (!state->message_assembly_allocator_) {
694 PW_LOG_ERROR(
695 "Unsupported: split grpc message without allocator provided");
696 PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
697 return OkStatus();
698 }
699
700 stream->assembly_buffer = static_cast<std::byte*>(
701 state->message_assembly_allocator_->Allocate(
702 allocator::Layout(message_length)));
703 if (stream->assembly_buffer == nullptr) {
704 PW_LOG_ERROR("Partial message reassembly buffer allocation failed");
705 PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
706 return OkStatus();
707 }
708 stream->message_length = message_length;
709 stream->message_received = 0;
710 continue;
711 }
712 }
713
714 pw::ByteSpan message;
715
716 // Reading message payload.
717 if (stream->assembly_buffer != nullptr) {
718 uint32_t read =
719 std::min(stream->message_length - stream->message_received,
720 static_cast<uint32_t>(payload.size()));
721 std::copy(payload.begin(),
722 payload.begin() + read,
723 stream->assembly_buffer + stream->message_received);
724 payload = payload.subspan(read);
725 stream->message_received += read;
726 if (stream->message_received < stream->message_length) {
727 continue;
728 }
729 // Fully received message.
730 message = pw::span(stream->assembly_buffer, stream->message_length);
731 } else {
732 message = payload.subspan(0, message_length);
733 payload = payload.subspan(message_length);
734 }
735
736 // Release state lock before callback, reacquire after.
737 connection_.UnlockState(std::move(state));
738 const auto status = callbacks_.OnMessage(frame.stream_id, message);
739 state = connection_.LockState();
740 maybe_stream = state->LookupStream(frame.stream_id);
741 if (!maybe_stream.ok()) {
742 return OkStatus();
743 }
744 stream = &maybe_stream->get();
745
746 if (!status.ok()) {
747 PW_TRY(SendRstStreamAndClose(*stream, Http2Error::INTERNAL_ERROR));
748 return OkStatus();
749 }
750
751 if (stream->assembly_buffer != nullptr) {
752 state->message_assembly_allocator_->Deallocate(stream->assembly_buffer);
753 stream->assembly_buffer = nullptr;
754 stream->message_length = 0;
755 stream->message_received = 0;
756 }
757 }
758
759 // grpc requires every request stream to end with an empty DATA frame with
760 // FLAGS_END_STREAM. If a client sends FLAGS_END_STREAM with a non-empty
761 // payload, it's not specified how the server should respond. We choose to
762 // accept the payload before ending the stream.
763 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
764 if ((frame.flags & FLAGS_END_STREAM) != 0) {
765 stream->half_closed = true;
766 connection_.UnlockState(std::move(state));
767 callbacks_.OnHalfClose(frame.stream_id);
768 }
769
770 return OkStatus();
771 }
772
773 // RFC 9113 §6.2
ProcessHeadersFrame(const FrameHeader & frame)774 Status Connection::Reader::ProcessHeadersFrame(const FrameHeader& frame) {
775 PW_LOG_DEBUG("Conn.Recv HEADERS id=%" PRIu32 " len=%" PRIu32,
776 frame.stream_id,
777 frame.payload_length);
778
779 if (frame.stream_id == 0) {
780 // RFC 9113 §6.2: "If a HEADERS frame is received whose Stream Identifier
781 // field is 0x00, the recipient MUST respond with a connection error of type
782 // PROTOCOL_ERROR."
783 SendGoAway(Http2Error::PROTOCOL_ERROR);
784 return Status::Internal();
785 }
786 if (frame.stream_id % 2 != 1 || frame.stream_id <= last_stream_id_) {
787 // RFC 9113 §5.1.1: "Streams initiated by a client MUST use odd-numbered
788 // stream identifiers ... The identifier of a newly established stream MUST
789 // be numerically greater than all streams that the initiating endpoint has
790 // opened ... An endpoint that receives an unexpected stream identifier MUST
791 // respond with a connection error of type PROTOCOL_ERROR."
792 SendGoAway(Http2Error::PROTOCOL_ERROR);
793 return Status::Internal();
794 }
795
796 last_stream_id_ = frame.stream_id;
797
798 {
799 auto state = connection_.LockState();
800 if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
801 PW_LOG_DEBUG("Client sent HEADERS after the first stream message");
802 PW_TRY(ProcessIgnoredFrame(frame));
803 // grpc requests cannot contain trailers.
804 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
805 PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::PROTOCOL_ERROR));
806 return OkStatus();
807 }
808 }
809
810 if ((frame.flags & FLAGS_END_STREAM) != 0) {
811 PW_LOG_DEBUG("Client sent HEADERS with END_STREAM");
812 PW_TRY(ProcessIgnoredFrame(frame));
813 // grpc requests must send END_STREAM in an empty DATA frame.
814 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
815 PW_TRY(SendRstStream(
816 connection_.send_queue_, frame.stream_id, Http2Error::PROTOCOL_ERROR));
817 return OkStatus();
818 }
819 if ((frame.flags & FLAGS_END_HEADERS) == 0) {
820 PW_LOG_ERROR("Client sent HEADERS frame without END_HEADERS: unsupported");
821 SendGoAway(Http2Error::INTERNAL_ERROR);
822 return Status::Internal();
823 }
824
825 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
826
827 // Drop padding.
828 if ((frame.flags & FLAGS_PADDED) != 0) {
829 if (payload.size() < 1) {
830 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
831 // if a frame ... is too small to contain mandatory frame data. A frame
832 // size error in a frame that could alter the state of the entire
833 // connection MUST be treated as a connection error"
834 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
835 return Status::Internal();
836 }
837 uint32_t pad_length = static_cast<uint32_t>(payload[0]);
838 if (pad_length >= frame.payload_length) {
839 // RFC 9113 §6.2: "If the length of the padding is the length of the frame
840 // payload or greater, the recipient MUST treat this as a connection error
841 // of type PROTOCOL_ERROR."
842 SendGoAway(Http2Error::PROTOCOL_ERROR);
843 return Status::Internal();
844 }
845 payload = payload.subspan(1, payload.size() - pad_length - 1);
846 }
847
848 // Drop priority fields.
849 if ((frame.flags & FLAGS_PRIORITY) != 0) {
850 if (payload.size() < 5) {
851 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
852 // if a frame ... is too small to contain mandatory frame data."
853 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
854 return Status::Internal();
855 }
856 payload = payload.subspan(5);
857 }
858
859 PW_TRY_ASSIGN(auto method_name, HpackParseRequestHeaders(payload));
860 if (!CreateStream(frame.stream_id).ok()) {
861 PW_LOG_WARN("Too many streams, rejecting id=%" PRIu32, frame.stream_id);
862 return SendRstStream(
863 connection_.send_queue_, frame.stream_id, Http2Error::REFUSED_STREAM);
864 }
865
866 if (const auto status = callbacks_.OnNew(frame.stream_id, method_name);
867 !status.ok()) {
868 auto state = connection_.LockState();
869 if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
870 return SendRstStreamAndClose(stream->get(), Http2Error::INTERNAL_ERROR);
871 }
872 }
873
874 return OkStatus();
875 }
876
877 // RFC 9113 §6.4
ProcessRstStreamFrame(const FrameHeader & frame)878 Status Connection::Reader::ProcessRstStreamFrame(const FrameHeader& frame) {
879 PW_LOG_DEBUG("Conn.Recv RST_STREAM id=%" PRIu32 " len=%" PRIu32,
880 frame.stream_id,
881 frame.payload_length);
882
883 if (frame.stream_id == 0) {
884 // RFC 9113 §6.4: "If a RST_STREAM frame is received with a stream
885 // identifier of 0x00, the recipient MUST treat this as a connection error
886 // of type PROTOCOL_ERROR".
887 SendGoAway(Http2Error::PROTOCOL_ERROR);
888 return Status::Internal();
889 }
890 if (frame.stream_id > last_stream_id_) {
891 // RFC 9113 §6.4: "If a RST_STREAM frame identifying an idle stream is
892 // received, the recipient MUST treat this as a connection error of type
893 // PROTOCOL_ERROR."
894 SendGoAway(Http2Error::PROTOCOL_ERROR);
895 return Status::Internal();
896 }
897 if (frame.payload_length != 4) {
898 // RFC 9113 §6.4: "A RST_STREAM frame with a length other than 4 octets MUST
899 // be treated as a connection error of type FRAME_SIZE_ERROR."
900 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
901 return Status::Internal();
902 }
903
904 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
905 ByteBuilder builder(payload);
906 auto error_code = builder.begin().ReadUint32(endian::big);
907
908 PW_LOG_DEBUG("Conn.RstStream id=%" PRIu32 " error=%" PRIu32,
909 frame.stream_id,
910 error_code);
911 auto state = connection_.LockState();
912 if (auto stream = state->LookupStream(frame.stream_id); stream.ok()) {
913 CloseStream(stream->get());
914 }
915 return OkStatus();
916 }
917
918 // RFC 9113 §6.5
ProcessSettingsFrame(const FrameHeader & frame,bool send_ack)919 Status Connection::Reader::ProcessSettingsFrame(const FrameHeader& frame,
920 bool send_ack) {
921 PW_LOG_DEBUG("Conn.Recv SETTINGS len=%" PRIu32 " flags=0x%x",
922 frame.payload_length,
923 frame.flags);
924
925 if ((frame.flags & FLAGS_ACK) != 0) {
926 // RFC 9113 §6.5: "Receipt of a SETTINGS frame with the ACK flag set and a
927 // length field value other than 0 MUST be treated as a connection error of
928 // type FRAME_SIZE_ERROR."
929 if (frame.payload_length != 0) {
930 PW_LOG_ERROR("Invalid SETTINGS frame: has ACK with non-empty payload");
931 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
932 return Status::Internal();
933 }
934 // Don't ACK an ACK.
935 send_ack = false;
936 } else {
937 // RFC 9113 §6.5: "A SETTINGS frame with a length other than a multiple of 6
938 // octets MUST be treated as a connection error of type FRAME_SIZE_ERROR."
939 if (frame.payload_length % 6 != 0) {
940 PW_LOG_ERROR("Invalid SETTINGS frame: payload size invalid");
941 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
942 return Status::Internal();
943 }
944 }
945
946 if (frame.stream_id != 0) {
947 // RFC 9113 §6.5: "If an endpoint receives a SETTINGS frame whose Stream
948 // Identifier field is anything other than 0x00, the endpoint MUST respond
949 // with a connection error of type PROTOCOL_ERROR."
950 SendGoAway(Http2Error::PROTOCOL_ERROR);
951 return Status::Internal();
952 }
953
954 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
955
956 // RFC 9113 §6.5.2
957 ByteBuilder builder(payload);
958 for (auto it = builder.begin(); it != builder.end();) {
959 auto id = it.ReadUint16(endian::big);
960 auto value = it.ReadUint32(endian::big);
961 PW_LOG_DEBUG("Applying SETTING id=%" PRIu16 " value=%" PRIu32, id, value);
962 switch (id) {
963 case SETTINGS_INITIAL_WINDOW_SIZE: {
964 // RFC 9113 §6.5.2: "Values above the maximum flow-control window size
965 // of 2^31-1 MUST be treated as a connection error of type
966 // FLOW_CONTROL_ERROR."
967 if ((value & (1 << 31)) != 0) {
968 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
969 return Status::Internal();
970 }
971 // RFC 9113 §6.9.2: "When the value of SETTINGS_INITIAL_WINDOW_SIZE
972 // changes, a receiver MUST adjust the size of all stream flow-control
973 // windows that it maintains by the difference between the new value and
974 // the old value."
975 int32_t newval = static_cast<int32_t>(value);
976 int32_t delta = newval - initial_send_window_;
977 auto state = connection_.LockState();
978 for (size_t i = 0; i < state->streams.size(); i++) {
979 if (state->streams[i].id == 0) {
980 continue;
981 }
982 if (PW_ADD_OVERFLOW(state->streams[i].send_window,
983 delta,
984 &state->streams[i].send_window)) {
985 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
986 return Status::Internal();
987 }
988 }
989 initial_send_window_ = newval;
990 break;
991 }
992 case SETTINGS_MAX_FRAME_SIZE:
993 // RFC 9113 §6.5.2: "Values outside this range MUST be treated as a
994 // connection error of type PROTOCOL_ERROR".
995 if (value < 16384 || 16777215 < value) {
996 SendGoAway(Http2Error::PROTOCOL_ERROR);
997 return Status::Internal();
998 }
999 // We never send frame payloads larger than 16384, so we don't need to
1000 // track the client's preference.
1001 break;
1002 // Ignore these.
1003 // SETTINGS_HEADER_TABLE_SIZE: our responses don't use the dynamic table
1004 // SETTINGS_ENABLE_PUSH: we don't support push
1005 // SETTINGS_MAX_CONCURRENT_STREAMS: we don't support push
1006 // SETTINGS_MAX_HEADER_LIST_SIZE: we send very tiny response HEADERS
1007 default:
1008 break;
1009 }
1010 }
1011
1012 if (send_ack) {
1013 PW_TRY(SendSettingsAck(connection_.send_queue_));
1014 }
1015
1016 return OkStatus();
1017 }
1018
1019 // RFC 9113 §6.7
ProcessPingFrame(const FrameHeader & frame)1020 Status Connection::Reader::ProcessPingFrame(const FrameHeader& frame) {
1021 PW_LOG_DEBUG("Conn.Recv PING len=%" PRIu32, frame.payload_length);
1022
1023 if (frame.stream_id != 0) {
1024 // RFC 9113 §6.7: "If a PING frame is received with a Stream Identifier
1025 // field value other than 0x00, the recipient MUST respond with a connection
1026 // error of type PROTOCOL_ERROR."
1027 SendGoAway(Http2Error::PROTOCOL_ERROR);
1028 return Status::Internal();
1029 }
1030 if (frame.payload_length != 8) {
1031 // RFC 9113 §6.7: "Receipt of a PING frame with a length field value other
1032 // than 8 MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1033 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1034 return Status::Internal();
1035 }
1036
1037 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1038
1039 // Don't ACK an ACK.
1040 if ((frame.flags & FLAGS_ACK) != 0) {
1041 return OkStatus();
1042 }
1043
1044 // Send an ACK.
1045 PW_PACKED(struct) PingFrame {
1046 WireFrameHeader header;
1047 uint64_t opaque_data;
1048 };
1049 ByteBuilder builder(payload);
1050 PingFrame ack_frame = {
1051 .header = WireFrameHeader(FrameHeader{
1052 .payload_length = 8,
1053 .type = FrameType::PING,
1054 .flags = FLAGS_ACK,
1055 .stream_id = 0,
1056 }),
1057 // Since we're going to echo this, read as native endian so it gets echoed
1058 // exactly as-is.
1059 .opaque_data = builder.begin().ReadUint64(endian::native),
1060 };
1061 PW_TRY(connection_.send_queue_.SendBytes(AsBytes(ack_frame)));
1062 return OkStatus();
1063 }
1064
1065 // RFC 9113 §6.9
ProcessWindowUpdateFrame(const FrameHeader & frame)1066 Status Connection::Reader::ProcessWindowUpdateFrame(const FrameHeader& frame) {
1067 PW_LOG_DEBUG("Conn.Recv WINDOW_UPDATE id=%" PRIu32 " len=%" PRIu32,
1068 frame.stream_id,
1069 frame.payload_length);
1070
1071 if (frame.payload_length != 4) {
1072 // RFC 9113 §6.9: "A WINDOW_UPDATE frame with a length other than 4 octets
1073 // MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1074 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1075 return Status::Internal();
1076 }
1077
1078 // Read window size increment.
1079 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1080 ByteBuilder builder(payload);
1081 int32_t delta = static_cast<int32_t>(builder.begin().ReadUint32(endian::big) &
1082 0x7fffffff);
1083
1084 auto state = connection_.LockState();
1085 auto stream = state->LookupStream(frame.stream_id);
1086
1087 if (delta == 0) {
1088 // RFC 9113 §6.9: "A receiver MUST treat a WINDOW_UPDATE frame with an
1089 // increment of 0 as a stream error of type PROTOCOL_ERROR; errors on the
1090 // connection flow-control window MUST be treated as a connection error."
1091 if (frame.stream_id == 0) {
1092 SendGoAway(Http2Error::PROTOCOL_ERROR);
1093 return Status::Internal();
1094 } else {
1095 if (!stream.ok()) {
1096 // Already closed
1097 return OkStatus();
1098 }
1099 PW_TRY(SendRstStreamAndClose(stream->get(), Http2Error::PROTOCOL_ERROR));
1100 return OkStatus();
1101 }
1102 }
1103
1104 // RFC 9113 §6.9.1: "If a sender receives a WINDOW_UPDATE that causes a
1105 // flow-control window to exceed 2^31-1 bytes, it MUST terminate either the
1106 // stream or the connection, as appropriate ... with an error code of
1107 // FLOW_CONTROL_ERROR"
1108 if (frame.stream_id == 0) {
1109 if (PW_ADD_OVERFLOW(state->connection_send_window,
1110 delta,
1111 &state->connection_send_window)) {
1112 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1113 return Status::Internal();
1114 }
1115 } else if (stream.ok()) {
1116 if (PW_ADD_OVERFLOW(
1117 stream->get().send_window, delta, &stream->get().send_window)) {
1118 PW_TRY(
1119 SendRstStreamAndClose(stream->get(), Http2Error::FLOW_CONTROL_ERROR));
1120 return OkStatus();
1121 }
1122 }
1123
1124 return OkStatus();
1125 }
1126
1127 // Advance past the payload.
ProcessIgnoredFrame(const FrameHeader & frame)1128 Status Connection::Reader::ProcessIgnoredFrame(const FrameHeader& frame) {
1129 PW_TRY(ReadFramePayload(frame));
1130 return OkStatus();
1131 }
1132
ReadFramePayload(const FrameHeader & frame)1133 Result<ByteSpan> Connection::Reader::ReadFramePayload(
1134 const FrameHeader& frame) {
1135 if (frame.payload_length == 0) {
1136 return ByteSpan();
1137 }
1138 if (frame.payload_length > payload_scratch_.size()) {
1139 PW_LOG_ERROR("Frame type=%d payload too large: %" PRIu32 " > %" PRIu32,
1140 static_cast<int>(frame.type),
1141 frame.payload_length,
1142 static_cast<uint32_t>(payload_scratch_.size()));
1143 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1144 return Status::Internal();
1145 }
1146 auto payload = span{payload_scratch_}.subspan(0, frame.payload_length);
1147 PW_TRY(ReadExactly(connection_.socket_.as_reader(), payload));
1148 return payload;
1149 }
1150
1151 // RFC 9113 §6.8
SendGoAway(Http2Error code)1152 void Connection::Reader::SendGoAway(Http2Error code) {
1153 if (!received_connection_preface_) {
1154 // RFC 9113 §3.4: "A GOAWAY frame MAY be omitted in this case, since an
1155 // invalid preface indicates that the peer is not using HTTP/2."
1156 return;
1157 }
1158
1159 // Close all open streams.
1160 {
1161 auto state = connection_.LockState();
1162 for (size_t i = 0; i < state->streams.size(); i++) {
1163 if (state->streams[i].id != 0) {
1164 CloseStream(state->streams[i]);
1165 }
1166 }
1167 }
1168
1169 PW_PACKED(struct) GoAwayFrame {
1170 WireFrameHeader header;
1171 uint32_t last_stream_id;
1172 uint32_t error_code;
1173 };
1174 GoAwayFrame frame{
1175 .header = WireFrameHeader(FrameHeader{
1176 .payload_length = 8,
1177 .type = FrameType::GOAWAY,
1178 .flags = 0,
1179 .stream_id = 0,
1180 }),
1181 .last_stream_id = ToNetworkOrder(last_stream_id_),
1182 .error_code = ToNetworkOrder(code),
1183 };
1184 // Ignore errors since we're about to close the connection anyway.
1185 connection_.send_queue_.SendBytes(AsBytes(frame)).IgnoreError();
1186 }
1187
1188 // RFC 9113 §6.4
SendRstStreamAndClose(Stream & stream,Http2Error code)1189 Status Connection::Reader::SendRstStreamAndClose(Stream& stream,
1190 Http2Error code) {
1191 // Ignore errors as we are closing anyways.
1192 SendRstStream(connection_.send_queue_, stream.id, code).IgnoreError();
1193 CloseStream(stream);
1194 return OkStatus();
1195 }
1196
1197 } // namespace pw::grpc
1198