1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 #include <stdio.h> 22 23 #include <cstdint> 24 #include <initializer_list> // IWYU pragma: keep 25 #include <iostream> 26 #include <map> 27 #include <memory> 28 #include <optional> 29 #include <string> 30 #include <tuple> 31 #include <type_traits> 32 #include <utility> 33 34 #include "absl/base/thread_annotations.h" 35 #include "absl/container/flat_hash_map.h" 36 #include "absl/functional/any_invocable.h" 37 #include "absl/random/random.h" 38 #include "absl/status/status.h" 39 #include "absl/status/statusor.h" 40 #include "absl/types/optional.h" 41 #include "absl/types/variant.h" 42 43 #include <grpc/event_engine/event_engine.h> 44 #include <grpc/event_engine/memory_allocator.h> 45 #include <grpc/grpc.h> 46 #include <grpc/slice.h> 47 #include <grpc/support/log.h> 48 49 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" 50 #include "src/core/ext/transport/chaotic_good/frame.h" 51 #include "src/core/ext/transport/chaotic_good/frame_header.h" 52 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" 53 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" 54 #include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep 55 #include "src/core/lib/gprpp/ref_counted_ptr.h" 56 #include "src/core/lib/gprpp/sync.h" 57 #include "src/core/lib/promise/activity.h" 58 #include "src/core/lib/promise/context.h" 59 #include "src/core/lib/promise/if.h" 60 #include "src/core/lib/promise/inter_activity_latch.h" 61 #include "src/core/lib/promise/inter_activity_pipe.h" 62 #include "src/core/lib/promise/loop.h" 63 #include "src/core/lib/promise/mpsc.h" 64 #include "src/core/lib/promise/party.h" 65 #include "src/core/lib/promise/pipe.h" 66 #include "src/core/lib/promise/poll.h" 67 #include "src/core/lib/promise/seq.h" 68 #include "src/core/lib/promise/try_join.h" 69 #include "src/core/lib/promise/try_seq.h" 70 #include "src/core/lib/resource_quota/arena.h" 71 #include "src/core/lib/resource_quota/memory_quota.h" 72 #include "src/core/lib/slice/slice.h" 73 #include "src/core/lib/slice/slice_buffer.h" 74 #include "src/core/lib/slice/slice_internal.h" 75 #include "src/core/lib/transport/metadata_batch.h" 76 #include "src/core/lib/transport/promise_endpoint.h" 77 #include "src/core/lib/transport/transport.h" 78 79 namespace grpc_core { 80 namespace chaotic_good { 81 82 class ChaoticGoodServerTransport final : public Transport, 83 public ServerTransport { 84 public: 85 ChaoticGoodServerTransport( 86 const ChannelArgs& args, PromiseEndpoint control_endpoint, 87 PromiseEndpoint data_endpoint, 88 std::shared_ptr<grpc_event_engine::experimental::EventEngine> 89 event_engine, 90 HPackParser hpack_parser, HPackCompressor hpack_encoder); 91 ~ChaoticGoodServerTransport() override; 92 filter_stack_transport()93 FilterStackTransport* filter_stack_transport() override { return nullptr; } client_transport()94 ClientTransport* client_transport() override { return nullptr; } server_transport()95 ServerTransport* server_transport() override { return this; } GetTransportName()96 absl::string_view GetTransportName() const override { return "chaotic_good"; } SetPollset(grpc_stream *,grpc_pollset *)97 void SetPollset(grpc_stream*, grpc_pollset*) override {} SetPollsetSet(grpc_stream *,grpc_pollset_set *)98 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} 99 void PerformOp(grpc_transport_op*) override; GetEndpoint()100 grpc_endpoint* GetEndpoint() override { return nullptr; } Orphan()101 void Orphan() override { delete this; } 102 103 void SetAcceptor(Acceptor* acceptor) override; 104 void AbortWithError(); 105 106 private: 107 using StreamMap = absl::flat_hash_map<uint32_t, CallInitiator>; 108 109 absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator); 110 absl::optional<CallInitiator> LookupStream(uint32_t stream_id); 111 absl::optional<CallInitiator> ExtractStream(uint32_t stream_id); 112 auto SendCallInitialMetadataAndBody(uint32_t stream_id, 113 MpscSender<ServerFrame> outgoing_frames, 114 CallInitiator call_initiator); 115 auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames, 116 CallInitiator call_initiator); 117 static auto SendFragment(ServerFragmentFrame frame, 118 MpscSender<ServerFrame> outgoing_frames, 119 CallInitiator call_initiator); 120 auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); 121 auto OnTransportActivityDone(absl::string_view activity); 122 auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport); 123 auto ReadOneFrame(ChaoticGoodTransport& transport); 124 auto TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport); 125 // Read different parts of the server frame from control/data endpoints 126 // based on frame header. 127 // Resolves to a StatusOr<tuple<SliceBuffer, SliceBuffer>> 128 auto ReadFrameBody(Slice read_buffer); 129 void SendCancel(uint32_t stream_id, absl::Status why); 130 auto DeserializeAndPushFragmentToNewCall(FrameHeader frame_header, 131 BufferPair buffers, 132 ChaoticGoodTransport& transport); 133 auto DeserializeAndPushFragmentToExistingCall( 134 FrameHeader frame_header, BufferPair buffers, 135 ChaoticGoodTransport& transport); 136 auto MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator, 137 absl::Status error, ClientFragmentFrame frame, 138 uint32_t stream_id); 139 auto PushFragmentIntoCall(CallInitiator call_initiator, 140 ClientFragmentFrame frame, uint32_t stream_id); 141 142 Acceptor* acceptor_ = nullptr; 143 InterActivityLatch<void> got_acceptor_; 144 MpscReceiver<ServerFrame> outgoing_frames_; 145 // Assigned aligned bytes from setting frame. 146 size_t aligned_bytes_ = 64; 147 Mutex mu_; 148 // Map of stream incoming server frames, key is stream_id. 149 StreamMap stream_map_ ABSL_GUARDED_BY(mu_); 150 uint32_t last_seen_new_stream_id_ = 0; 151 grpc_event_engine::experimental::MemoryAllocator allocator_; 152 ActivityPtr writer_ ABSL_GUARDED_BY(mu_); 153 ActivityPtr reader_ ABSL_GUARDED_BY(mu_); ABSL_GUARDED_BY(mu_)154 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){ 155 "chaotic_good_server", GRPC_CHANNEL_READY}; 156 }; 157 158 } // namespace chaotic_good 159 } // namespace grpc_core 160 161 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H