1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <array> 17 #include <cstdint> 18 19 #include "pw_allocator/allocator.h" 20 #include "pw_bytes/byte_builder.h" 21 #include "pw_bytes/span.h" 22 #include "pw_function/function.h" 23 #include "pw_grpc/send_queue.h" 24 #include "pw_result/result.h" 25 #include "pw_status/status.h" 26 #include "pw_stream/stream.h" 27 #include "pw_string/string.h" 28 #include "pw_sync/inline_borrowable.h" 29 #include "pw_thread/thread.h" 30 #include "pw_thread/thread_core.h" 31 32 namespace pw::grpc { 33 namespace internal { 34 35 struct FrameHeader; 36 enum class Http2Error : uint32_t; 37 38 // Parameters of this implementation. 39 // RFC 9113 §5.1.2 40 inline constexpr uint32_t kMaxConcurrentStreams = 16; 41 42 // RFC 9113 §4.2 and §6.5.2 43 inline constexpr uint32_t kMaxFramePayloadSize = 16384; 44 45 // Limits on grpc message sizes. The length prefix includes the compressed byte 46 // and 32-bit length from Length-Prefixed-Message. 47 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md. 48 inline constexpr uint32_t kMaxGrpcMessageSizeWithLengthPrefix = 49 kMaxFramePayloadSize; 50 inline constexpr uint32_t kMaxGrpcMessageSize = 51 kMaxGrpcMessageSizeWithLengthPrefix - 5; 52 53 } // namespace internal 54 55 // RFC 9113 §5.1.1: Streams are identified by unsigned 31-bit integers. 56 using StreamId = uint32_t; 57 58 inline constexpr uint32_t kMaxMethodNameSize = 127; 59 60 // Implements a gRPC over HTTP2 server. 61 // 62 // Basic usage: 63 // * Provide a Connection::RequestCallbacks implementation that handles RPC 64 // events. 65 // * Provide a readable/writeable stream object that will be used like a 66 // socket over which the HTTP2 frames are read/written. When the underlying 67 // stream should be closed, the provided connection_close_callback will be 68 // called. 69 // * Drive the connection by calling ProcessConnectionPreface then ProcessFrame 70 // in a loop while status is Ok on one thread. 71 // * RPC responses can be sent from any thread by calling 72 // SendResponseMessage/SendResponseComplete. The SendQueue object will 73 // handle concurrent access. 74 // 75 // One thread should be dedicated to driving reads (ProcessFrame calls), while 76 // another thread (implemented by SendQueue) handles all writes. Refer to 77 // the ConnectionThread class for an implementation of this. 78 // 79 // By default, each gRPC message must be entirely contained within a single 80 // HTTP2 DATA frame, as supporting fragmented messages requires buffering 81 // up to the maximum message size per stream. To support fragmented messages, 82 // provide a message_assembly_allocator, which will be used to allocate 83 // temporary storage for fragmented gRPC messages when required. If no 84 // allocator is provided, or allocation fails, the stream will be closed. 85 class Connection { 86 public: 87 // Callbacks invoked on requests from the client. Called on same thread as 88 // ProcessFrame is being called on. 89 class RequestCallbacks { 90 public: 91 virtual ~RequestCallbacks() = default; 92 93 // Called on startup of connection. 94 virtual void OnNewConnection() = 0; 95 96 // Called on a new RPC. full_method_name is "<ServiceName>/<MethodName>". 97 // This is guaranteed to be called before any other method with the same id. 98 virtual Status OnNew(StreamId id, 99 InlineString<kMaxMethodNameSize> full_method_name) = 0; 100 101 // Called on a new request message for an RPC. The `message` must not be 102 // accessed after this method returns. 103 // 104 // Return an error status to cause the stream to be closed with RST_STREAM 105 // frame. 106 virtual Status OnMessage(StreamId id, ByteSpan message) = 0; 107 108 // Called after the client has sent all request messages for an RPC. 109 virtual void OnHalfClose(StreamId id) = 0; 110 111 // Called when an RPC has been canceled. 112 virtual void OnCancel(StreamId id) = 0; 113 }; 114 115 Connection(stream::ReaderWriter& stream, 116 SendQueue& send_queue, 117 RequestCallbacks& callbacks, 118 allocator::Allocator* message_assembly_allocator = nullptr); 119 120 // Reads from stream and processes required connection preface frames. Should 121 // be called before ProcessFrame(). Return OK if connection preface was found. ProcessConnectionPreface()122 Status ProcessConnectionPreface() { 123 return reader_.ProcessConnectionPreface(); 124 } 125 126 // Reads from stream and processes next frame on connection. Returns OK 127 // as long as connection is open. Should be called from a single thread. ProcessFrame()128 Status ProcessFrame() { return reader_.ProcessFrame(); } 129 130 // Sends a response message for an RPC. The `message` will not be accessed 131 // after this method returns. Thread safe. 132 // 133 // Errors are: 134 // 135 // * NOT_FOUND if stream_id does not reference an active stream, including 136 // RPCs that have already completed and IDs that do not refer to any prior 137 // RPC. 138 // * RESOURCE_EXHAUSTED if the flow control window is not large enough to send 139 // this RPC immediately. In this case, no response will be send. 140 // * UNAVAILABLE if the connection is closed. SendResponseMessage(StreamId stream_id,pw::ConstByteSpan message)141 Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message) { 142 return writer_.SendResponseMessage(stream_id, message); 143 } 144 145 // Completes an RPC with the given status code. Thread safe. Pigweed status 146 // codes happen to align exactly with grpc status codes. Compare: 147 // https://grpc.github.io/grpc/core/md_doc_statuscodes.html 148 // https://pigweed.dev/pw_status/#quick-reference 149 // 150 // Errors are: 151 // 152 // * NOT_FOUND if stream_id does not reference an active stream, including 153 // RPCs that have already completed, or if stream_id does not refer to any 154 // prior RPC. 155 // * UNAVAILABLE if the connection is closed. SendResponseComplete(StreamId stream_id,pw::Status response_code)156 Status SendResponseComplete(StreamId stream_id, pw::Status response_code) { 157 return writer_.SendResponseComplete(stream_id, response_code); 158 } 159 160 private: 161 // RFC 9113 §6.9.2. Flow control windows are unsigned 31-bit numbers, but 162 // because of the following requirement from §6.9.2, we track flow control 163 // windows with signed integers. "A change to SETTINGS_INITIAL_WINDOW_SIZE can 164 // cause the available space in a flow-control window to become negative. A 165 // sender MUST track the negative flow-control window ..." 166 static inline constexpr int32_t kDefaultInitialWindowSize = 65535; 167 168 // From RFC 9113 §5.1, we use only the following states: 169 // * idle, which have `id > last_stream_id_` 170 // * open, which are in `streams_` with `half_closed = false` 171 // * half-closed (remote), which are in `streams_` with `half_closed = true` 172 // 173 // Regarding other states: 174 // * reserved is ignored because we do not sent PUSH_PROMISE 175 // * half-closed (local) is merged into close, because once a grpc server has 176 // sent a response, the RPC is complete 177 struct Stream { 178 StreamId id; 179 bool half_closed; 180 bool started_response; 181 int32_t send_window; 182 183 // Fragmented gRPC message assembly, nullptr if not assembling a message. 184 std::byte* assembly_buffer; 185 union { 186 struct { 187 // Buffer for the length-prefix, if fragmented. 188 std::array<std::byte, 5> prefix_buffer; 189 // Bytes of the prefix received so far. 190 uint8_t prefix_received; 191 }; 192 struct { 193 // Total length of the message. 194 uint32_t message_length; 195 // Length of the message received so far (during assembly). 196 uint32_t message_received; 197 }; 198 }; 199 ResetStream200 void Reset() { 201 id = 0; 202 half_closed = false; 203 started_response = false; 204 send_window = 0; 205 206 assembly_buffer = nullptr; 207 message_length = 0; 208 message_received = 0; 209 prefix_received = 0; 210 } 211 }; 212 213 // Internal state is divided into what is needed for reading/writing/shared to 214 // both. 215 216 struct SharedState { 217 pw::Result<std::reference_wrapper<Stream>> LookupStream(StreamId id); 218 219 // Stream state 220 std::array<Stream, internal::kMaxConcurrentStreams> streams{}; 221 int32_t connection_send_window = kDefaultInitialWindowSize; 222 223 // Allocator for fragmented grpc message reassembly 224 allocator::Allocator* message_assembly_allocator_; 225 }; 226 227 class Writer { 228 public: Writer(Connection & connection)229 Writer(Connection& connection) : connection_(connection) {} 230 231 Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message); 232 Status SendResponseComplete(StreamId stream_id, pw::Status response_code); 233 234 private: 235 Connection& connection_; 236 }; 237 238 class Reader { 239 public: Reader(Connection & connection,RequestCallbacks & callbacks)240 Reader(Connection& connection, RequestCallbacks& callbacks) 241 : connection_(connection), callbacks_(callbacks) {} 242 243 Status ProcessConnectionPreface(); 244 Status ProcessFrame(); 245 246 private: 247 pw::Status CreateStream(StreamId id); 248 void CloseStream(Stream& stream); 249 250 Status ProcessDataFrame(const internal::FrameHeader&); 251 Status ProcessHeadersFrame(const internal::FrameHeader&); 252 Status ProcessRstStreamFrame(const internal::FrameHeader&); 253 Status ProcessSettingsFrame(const internal::FrameHeader&, bool send_ack); 254 Status ProcessPingFrame(const internal::FrameHeader&); 255 Status ProcessWindowUpdateFrame(const internal::FrameHeader&); 256 Status ProcessIgnoredFrame(const internal::FrameHeader&); 257 Result<ByteSpan> ReadFramePayload(const internal::FrameHeader&); 258 259 // Send GOAWAY frame and signal connection should be closed. 260 void SendGoAway(internal::Http2Error code); 261 Status SendRstStreamAndClose(Stream& stream, internal::Http2Error code); 262 263 Connection& connection_; 264 RequestCallbacks& callbacks_; 265 int32_t initial_send_window_ = kDefaultInitialWindowSize; 266 bool received_connection_preface_ = false; 267 268 std::array<std::byte, internal::kMaxFramePayloadSize> payload_scratch_{}; 269 StreamId last_stream_id_ = 0; 270 }; 271 LockState()272 sync::BorrowedPointer<SharedState> LockState() { 273 return shared_state_.acquire(); 274 } 275 UnlockState(sync::BorrowedPointer<SharedState> && state)276 void UnlockState(sync::BorrowedPointer<SharedState>&& state) { 277 sync::BorrowedPointer<SharedState> moved_state = std::move(state); 278 static_cast<void>(moved_state); 279 } 280 281 // Shared state that is thread-safe. 282 stream::ReaderWriter& socket_; 283 SendQueue& send_queue_; 284 285 sync::InlineBorrowable<SharedState> shared_state_; 286 Reader reader_; 287 Writer writer_; 288 }; 289 290 class ConnectionThread : public Connection, public thread::ThreadCore { 291 public: 292 // The ConnectionCloseCallback will be called when this thread is shutting 293 // down and all data has finished sending. It will be called from this 294 // ConnectionThread. 295 using ConnectionCloseCallback = Function<void()>; 296 297 ConnectionThread(stream::NonSeekableReaderWriter& stream, 298 const thread::Options& send_thread_options, 299 RequestCallbacks& callbacks, 300 ConnectionCloseCallback&& connection_close_callback, 301 allocator::Allocator* message_assembly_allocator = nullptr) Connection(stream,send_queue_,callbacks,message_assembly_allocator)302 : Connection(stream, send_queue_, callbacks, message_assembly_allocator), 303 send_queue_(stream), 304 send_queue_thread_options_(send_thread_options), 305 connection_close_callback_(std::move(connection_close_callback)) {} 306 307 // Process the connection. Does not return until the connection is closed. Run()308 void Run() override { 309 Thread send_thread(send_queue_thread_options_, send_queue_); 310 Status status = ProcessConnectionPreface(); 311 while (status.ok()) { 312 status = ProcessFrame(); 313 } 314 send_queue_.RequestStop(); 315 send_thread.join(); 316 if (connection_close_callback_) { 317 connection_close_callback_(); 318 } 319 }; 320 321 private: 322 SendQueue send_queue_; 323 const thread::Options& send_queue_thread_options_; 324 ConnectionCloseCallback connection_close_callback_; 325 }; 326 327 } // namespace pw::grpc 328