xref: /aosp_15_r20/external/pigweed/pw_grpc/public/pw_grpc/connection.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <array>
17 #include <cstdint>
18 
19 #include "pw_allocator/allocator.h"
20 #include "pw_bytes/byte_builder.h"
21 #include "pw_bytes/span.h"
22 #include "pw_function/function.h"
23 #include "pw_grpc/send_queue.h"
24 #include "pw_result/result.h"
25 #include "pw_status/status.h"
26 #include "pw_stream/stream.h"
27 #include "pw_string/string.h"
28 #include "pw_sync/inline_borrowable.h"
29 #include "pw_thread/thread.h"
30 #include "pw_thread/thread_core.h"
31 
32 namespace pw::grpc {
namespace internal {

// Forward declarations only; neither type is defined in this header.
struct FrameHeader;
enum class Http2Error : uint32_t;

// Fixed limits of this implementation.

// Number of Stream slots the connection keeps. See RFC 9113 §5.1.2.
inline constexpr uint32_t kMaxConcurrentStreams{16};

// Frame payload size limit in bytes. See RFC 9113 §4.2 and §6.5.2.
inline constexpr uint32_t kMaxFramePayloadSize{16384};

// Limits on grpc message sizes. A Length-Prefixed-Message starts with a
// 5-byte prefix: the compressed byte followed by a 32-bit length.
// See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
inline constexpr uint32_t kMaxGrpcMessageSizeWithLengthPrefix{
    kMaxFramePayloadSize};
// Largest message body once the 5-byte prefix has been subtracted.
inline constexpr uint32_t kMaxGrpcMessageSize{
    kMaxGrpcMessageSizeWithLengthPrefix - 5};

}  // namespace internal
54 
// Identifier of an HTTP2 stream. RFC 9113 §5.1.1 defines stream identifiers
// as unsigned 31-bit integers, so every value fits in 32 unsigned bits.
using StreamId = uint32_t;

// Capacity of the InlineString that carries an RPC's full method name.
inline constexpr uint32_t kMaxMethodNameSize{127};
59 
60 // Implements a gRPC over HTTP2 server.
61 //
62 // Basic usage:
63 // * Provide a Connection::RequestCallbacks implementation that handles RPC
64 //   events.
65 // * Provide a readable/writeable stream object that will be used like a
66 //   socket over which the HTTP2 frames are read/written. When the underlying
67 //   stream should be closed, the provided connection_close_callback will be
68 //   called.
69 // * Drive the connection by calling ProcessConnectionPreface then ProcessFrame
70 //   in a loop while status is Ok on one thread.
71 // * RPC responses can be sent from any thread by calling
72 //   SendResponseMessage/SendResponseComplete. The SendQueue object will
73 //   handle concurrent access.
74 //
75 // One thread should be dedicated to driving reads (ProcessFrame calls), while
76 // another thread (implemented by SendQueue) handles all writes. Refer to
77 // the ConnectionThread class for an implementation of this.
78 //
79 // By default, each gRPC message must be entirely contained within a single
80 // HTTP2 DATA frame, as supporting fragmented messages requires buffering
81 // up to the maximum message size per stream. To support fragmented messages,
82 // provide a message_assembly_allocator, which will be used to allocate
83 // temporary storage for fragmented gRPC messages when required. If no
84 // allocator is provided, or allocation fails, the stream will be closed.
85 class Connection {
86  public:
  // Interface through which the connection reports client request events.
  // Callbacks are invoked on the same thread that ProcessFrame is being called
  // on.
  class RequestCallbacks {
   public:
    virtual ~RequestCallbacks() = default;

    // Called on startup of connection.
    virtual void OnNewConnection() = 0;

    // Called on a new RPC. full_method_name is "<ServiceName>/<MethodName>".
    // This is guaranteed to be called before any other method with the same id.
    //
    // NOTE(review): the effect of returning a non-OK status is not described in
    // this header; confirm against the implementation.
    virtual Status OnNew(StreamId id,
                         InlineString<kMaxMethodNameSize> full_method_name) = 0;

    // Called on a new request message for an RPC. The `message` must not be
    // accessed after this method returns, so copy out anything that has to
    // outlive the call.
    //
    // Return an error status to cause the stream to be closed with RST_STREAM
    // frame.
    virtual Status OnMessage(StreamId id, ByteSpan message) = 0;

    // Called after the client has sent all request messages for an RPC.
    virtual void OnHalfClose(StreamId id) = 0;

    // Called when an RPC has been canceled.
    virtual void OnCancel(StreamId id) = 0;
  };
114 
  // Constructs a connection. The definition is not in this header.
  //
  // * `stream`: readable/writeable object used like a socket, over which the
  //   HTTP2 frames are read/written.
  // * `send_queue`: SendQueue object that handles writes and concurrent access
  //   from responding threads.
  // * `callbacks`: receives the RPC events described by RequestCallbacks.
  // * `message_assembly_allocator`: optional. When provided, it is used to
  //   allocate temporary storage for fragmented gRPC messages. When it is
  //   nullptr, or allocation fails, the stream is closed.
  //
  // NOTE(review): all three references are presumably retained for the life of
  // the connection and must outlive it; confirm against the implementation.
  Connection(stream::ReaderWriter& stream,
             SendQueue& send_queue,
             RequestCallbacks& callbacks,
             allocator::Allocator* message_assembly_allocator = nullptr);
119 
120   // Reads from stream and processes required connection preface frames. Should
121   // be called before ProcessFrame(). Return OK if connection preface was found.
ProcessConnectionPreface()122   Status ProcessConnectionPreface() {
123     return reader_.ProcessConnectionPreface();
124   }
125 
126   // Reads from stream and processes next frame on connection. Returns OK
127   // as long as connection is open. Should be called from a single thread.
ProcessFrame()128   Status ProcessFrame() { return reader_.ProcessFrame(); }
129 
130   // Sends a response message for an RPC. The `message` will not be accessed
131   // after this method returns. Thread safe.
132   //
133   // Errors are:
134   //
135   // * NOT_FOUND if stream_id does not reference an active stream, including
136   //   RPCs that have already completed and IDs that do not refer to any prior
137   //   RPC.
138   // * RESOURCE_EXHAUSTED if the flow control window is not large enough to send
139   //   this RPC immediately. In this case, no response will be send.
140   // * UNAVAILABLE if the connection is closed.
SendResponseMessage(StreamId stream_id,pw::ConstByteSpan message)141   Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message) {
142     return writer_.SendResponseMessage(stream_id, message);
143   }
144 
145   // Completes an RPC with the given status code. Thread safe. Pigweed status
146   // codes happen to align exactly with grpc status codes. Compare:
147   // https://grpc.github.io/grpc/core/md_doc_statuscodes.html
148   // https://pigweed.dev/pw_status/#quick-reference
149   //
150   // Errors are:
151   //
152   // * NOT_FOUND if stream_id does not reference an active stream, including
153   //   RPCs that have already completed, or if stream_id does not refer to any
154   //   prior RPC.
155   // * UNAVAILABLE if the connection is closed.
SendResponseComplete(StreamId stream_id,pw::Status response_code)156   Status SendResponseComplete(StreamId stream_id, pw::Status response_code) {
157     return writer_.SendResponseComplete(stream_id, response_code);
158   }
159 
160  private:
161   // RFC 9113 §6.9.2. Flow control windows are unsigned 31-bit numbers, but
162   // because of the following requirement from §6.9.2, we track flow control
163   // windows with signed integers. "A change to SETTINGS_INITIAL_WINDOW_SIZE can
164   // cause the available space in a flow-control window to become negative. A
165   // sender MUST track the negative flow-control window ..."
166   static inline constexpr int32_t kDefaultInitialWindowSize = 65535;
167 
168   // From RFC 9113 §5.1, we use only the following states:
169   // * idle, which have `id > last_stream_id_`
170   // * open, which are in `streams_` with `half_closed = false`
171   // * half-closed (remote), which are in `streams_` with `half_closed = true`
172   //
173   // Regarding other states:
174   // * reserved is ignored because we do not sent PUSH_PROMISE
175   // * half-closed (local) is merged into close, because once a grpc server has
176   //   sent a response, the RPC is complete
177   struct Stream {
178     StreamId id;
179     bool half_closed;
180     bool started_response;
181     int32_t send_window;
182 
183     // Fragmented gRPC message assembly, nullptr if not assembling a message.
184     std::byte* assembly_buffer;
185     union {
186       struct {
187         // Buffer for the length-prefix, if fragmented.
188         std::array<std::byte, 5> prefix_buffer;
189         // Bytes of the prefix received so far.
190         uint8_t prefix_received;
191       };
192       struct {
193         // Total length of the message.
194         uint32_t message_length;
195         // Length of the message received so far (during assembly).
196         uint32_t message_received;
197       };
198     };
199 
ResetStream200     void Reset() {
201       id = 0;
202       half_closed = false;
203       started_response = false;
204       send_window = 0;
205 
206       assembly_buffer = nullptr;
207       message_length = 0;
208       message_received = 0;
209       prefix_received = 0;
210     }
211   };
212 
213   // Internal state is divided into what is needed for reading/writing/shared to
214   // both.
215 
216   struct SharedState {
217     pw::Result<std::reference_wrapper<Stream>> LookupStream(StreamId id);
218 
219     // Stream state
220     std::array<Stream, internal::kMaxConcurrentStreams> streams{};
221     int32_t connection_send_window = kDefaultInitialWindowSize;
222 
223     // Allocator for fragmented grpc message reassembly
224     allocator::Allocator* message_assembly_allocator_;
225   };
226 
227   class Writer {
228    public:
Writer(Connection & connection)229     Writer(Connection& connection) : connection_(connection) {}
230 
231     Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message);
232     Status SendResponseComplete(StreamId stream_id, pw::Status response_code);
233 
234    private:
235     Connection& connection_;
236   };
237 
238   class Reader {
239    public:
Reader(Connection & connection,RequestCallbacks & callbacks)240     Reader(Connection& connection, RequestCallbacks& callbacks)
241         : connection_(connection), callbacks_(callbacks) {}
242 
243     Status ProcessConnectionPreface();
244     Status ProcessFrame();
245 
246    private:
247     pw::Status CreateStream(StreamId id);
248     void CloseStream(Stream& stream);
249 
250     Status ProcessDataFrame(const internal::FrameHeader&);
251     Status ProcessHeadersFrame(const internal::FrameHeader&);
252     Status ProcessRstStreamFrame(const internal::FrameHeader&);
253     Status ProcessSettingsFrame(const internal::FrameHeader&, bool send_ack);
254     Status ProcessPingFrame(const internal::FrameHeader&);
255     Status ProcessWindowUpdateFrame(const internal::FrameHeader&);
256     Status ProcessIgnoredFrame(const internal::FrameHeader&);
257     Result<ByteSpan> ReadFramePayload(const internal::FrameHeader&);
258 
259     // Send GOAWAY frame and signal connection should be closed.
260     void SendGoAway(internal::Http2Error code);
261     Status SendRstStreamAndClose(Stream& stream, internal::Http2Error code);
262 
263     Connection& connection_;
264     RequestCallbacks& callbacks_;
265     int32_t initial_send_window_ = kDefaultInitialWindowSize;
266     bool received_connection_preface_ = false;
267 
268     std::array<std::byte, internal::kMaxFramePayloadSize> payload_scratch_{};
269     StreamId last_stream_id_ = 0;
270   };
271 
LockState()272   sync::BorrowedPointer<SharedState> LockState() {
273     return shared_state_.acquire();
274   }
275 
UnlockState(sync::BorrowedPointer<SharedState> && state)276   void UnlockState(sync::BorrowedPointer<SharedState>&& state) {
277     sync::BorrowedPointer<SharedState> moved_state = std::move(state);
278     static_cast<void>(moved_state);
279   }
280 
  // Shared state that is thread-safe.
  // NOTE(review): socket_ and send_queue_ are presumably bound to the
  // constructor's `stream` and `send_queue` arguments; confirm against the
  // constructor definition.
  stream::ReaderWriter& socket_;
  SendQueue& send_queue_;

  // SharedState is only reached through LockState(), which acquires it.
  sync::InlineBorrowable<SharedState> shared_state_;
  // Read and write halves. They are declared after the members above, so they
  // are constructed after them.
  Reader reader_;
  Writer writer_;
288 };
289 
290 class ConnectionThread : public Connection, public thread::ThreadCore {
291  public:
292   // The ConnectionCloseCallback will be called when this thread is shutting
293   // down and all data has finished sending. It will be called from this
294   // ConnectionThread.
295   using ConnectionCloseCallback = Function<void()>;
296 
297   ConnectionThread(stream::NonSeekableReaderWriter& stream,
298                    const thread::Options& send_thread_options,
299                    RequestCallbacks& callbacks,
300                    ConnectionCloseCallback&& connection_close_callback,
301                    allocator::Allocator* message_assembly_allocator = nullptr)
Connection(stream,send_queue_,callbacks,message_assembly_allocator)302       : Connection(stream, send_queue_, callbacks, message_assembly_allocator),
303         send_queue_(stream),
304         send_queue_thread_options_(send_thread_options),
305         connection_close_callback_(std::move(connection_close_callback)) {}
306 
307   // Process the connection. Does not return until the connection is closed.
Run()308   void Run() override {
309     Thread send_thread(send_queue_thread_options_, send_queue_);
310     Status status = ProcessConnectionPreface();
311     while (status.ok()) {
312       status = ProcessFrame();
313     }
314     send_queue_.RequestStop();
315     send_thread.join();
316     if (connection_close_callback_) {
317       connection_close_callback_();
318     }
319   };
320 
321  private:
322   SendQueue send_queue_;
323   const thread::Options& send_queue_thread_options_;
324   ConnectionCloseCallback connection_close_callback_;
325 };
326 
327 }  // namespace pw::grpc
328