xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/server_transport.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 #include <stdio.h>
22 
23 #include <cstdint>
24 #include <initializer_list>  // IWYU pragma: keep
25 #include <iostream>
26 #include <map>
27 #include <memory>
28 #include <optional>
29 #include <string>
30 #include <tuple>
31 #include <type_traits>
32 #include <utility>
33 
34 #include "absl/base/thread_annotations.h"
35 #include "absl/container/flat_hash_map.h"
36 #include "absl/functional/any_invocable.h"
37 #include "absl/random/random.h"
38 #include "absl/status/status.h"
39 #include "absl/status/statusor.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42 
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/event_engine/memory_allocator.h>
45 #include <grpc/grpc.h>
46 #include <grpc/slice.h>
47 #include <grpc/support/log.h>
48 
49 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
50 #include "src/core/ext/transport/chaotic_good/frame.h"
51 #include "src/core/ext/transport/chaotic_good/frame_header.h"
52 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
53 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
54 #include "src/core/lib/event_engine/default_event_engine.h"  // IWYU pragma: keep
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/gprpp/sync.h"
57 #include "src/core/lib/promise/activity.h"
58 #include "src/core/lib/promise/context.h"
59 #include "src/core/lib/promise/if.h"
60 #include "src/core/lib/promise/inter_activity_latch.h"
61 #include "src/core/lib/promise/inter_activity_pipe.h"
62 #include "src/core/lib/promise/loop.h"
63 #include "src/core/lib/promise/mpsc.h"
64 #include "src/core/lib/promise/party.h"
65 #include "src/core/lib/promise/pipe.h"
66 #include "src/core/lib/promise/poll.h"
67 #include "src/core/lib/promise/seq.h"
68 #include "src/core/lib/promise/try_join.h"
69 #include "src/core/lib/promise/try_seq.h"
70 #include "src/core/lib/resource_quota/arena.h"
71 #include "src/core/lib/resource_quota/memory_quota.h"
72 #include "src/core/lib/slice/slice.h"
73 #include "src/core/lib/slice/slice_buffer.h"
74 #include "src/core/lib/slice/slice_internal.h"
75 #include "src/core/lib/transport/metadata_batch.h"
76 #include "src/core/lib/transport/promise_endpoint.h"
77 #include "src/core/lib/transport/transport.h"
78 
79 namespace grpc_core {
80 namespace chaotic_good {
81 
82 class ChaoticGoodServerTransport final : public Transport,
83                                          public ServerTransport {
84  public:
85   ChaoticGoodServerTransport(
86       const ChannelArgs& args, PromiseEndpoint control_endpoint,
87       PromiseEndpoint data_endpoint,
88       std::shared_ptr<grpc_event_engine::experimental::EventEngine>
89           event_engine,
90       HPackParser hpack_parser, HPackCompressor hpack_encoder);
91   ~ChaoticGoodServerTransport() override;
92 
filter_stack_transport()93   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()94   ClientTransport* client_transport() override { return nullptr; }
server_transport()95   ServerTransport* server_transport() override { return this; }
GetTransportName()96   absl::string_view GetTransportName() const override { return "chaotic_good"; }
SetPollset(grpc_stream *,grpc_pollset *)97   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)98   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
99   void PerformOp(grpc_transport_op*) override;
GetEndpoint()100   grpc_endpoint* GetEndpoint() override { return nullptr; }
Orphan()101   void Orphan() override { delete this; }
102 
103   void SetAcceptor(Acceptor* acceptor) override;
104   void AbortWithError();
105 
106  private:
107   using StreamMap = absl::flat_hash_map<uint32_t, CallInitiator>;
108 
109   absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator);
110   absl::optional<CallInitiator> LookupStream(uint32_t stream_id);
111   absl::optional<CallInitiator> ExtractStream(uint32_t stream_id);
112   auto SendCallInitialMetadataAndBody(uint32_t stream_id,
113                                       MpscSender<ServerFrame> outgoing_frames,
114                                       CallInitiator call_initiator);
115   auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
116                     CallInitiator call_initiator);
117   static auto SendFragment(ServerFragmentFrame frame,
118                            MpscSender<ServerFrame> outgoing_frames,
119                            CallInitiator call_initiator);
120   auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
121   auto OnTransportActivityDone(absl::string_view activity);
122   auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
123   auto ReadOneFrame(ChaoticGoodTransport& transport);
124   auto TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport);
125   // Read different parts of the server frame from control/data endpoints
126   // based on frame header.
127   // Resolves to a StatusOr<tuple<SliceBuffer, SliceBuffer>>
128   auto ReadFrameBody(Slice read_buffer);
129   void SendCancel(uint32_t stream_id, absl::Status why);
130   auto DeserializeAndPushFragmentToNewCall(FrameHeader frame_header,
131                                            BufferPair buffers,
132                                            ChaoticGoodTransport& transport);
133   auto DeserializeAndPushFragmentToExistingCall(
134       FrameHeader frame_header, BufferPair buffers,
135       ChaoticGoodTransport& transport);
136   auto MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator,
137                                  absl::Status error, ClientFragmentFrame frame,
138                                  uint32_t stream_id);
139   auto PushFragmentIntoCall(CallInitiator call_initiator,
140                             ClientFragmentFrame frame, uint32_t stream_id);
141 
142   Acceptor* acceptor_ = nullptr;
143   InterActivityLatch<void> got_acceptor_;
144   MpscReceiver<ServerFrame> outgoing_frames_;
145   // Assigned aligned bytes from setting frame.
146   size_t aligned_bytes_ = 64;
147   Mutex mu_;
148   // Map of stream incoming server frames, key is stream_id.
149   StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
150   uint32_t last_seen_new_stream_id_ = 0;
151   grpc_event_engine::experimental::MemoryAllocator allocator_;
152   ActivityPtr writer_ ABSL_GUARDED_BY(mu_);
153   ActivityPtr reader_ ABSL_GUARDED_BY(mu_);
ABSL_GUARDED_BY(mu_)154   ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
155       "chaotic_good_server", GRPC_CHANNEL_READY};
156 };
157 
158 }  // namespace chaotic_good
159 }  // namespace grpc_core
160 
161 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H