1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 #include <stdio.h> 22 23 #include <cstdint> 24 #include <initializer_list> // IWYU pragma: keep 25 #include <map> 26 #include <memory> 27 #include <tuple> 28 #include <type_traits> 29 #include <utility> 30 31 #include "absl/base/thread_annotations.h" 32 #include "absl/container/flat_hash_map.h" 33 #include "absl/random/random.h" 34 #include "absl/status/status.h" 35 #include "absl/types/optional.h" 36 #include "absl/types/variant.h" 37 38 #include <grpc/event_engine/event_engine.h> 39 #include <grpc/event_engine/memory_allocator.h> 40 #include <grpc/grpc.h> 41 42 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" 43 #include "src/core/ext/transport/chaotic_good/frame.h" 44 #include "src/core/ext/transport/chaotic_good/frame_header.h" 45 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" 46 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" 47 #include "src/core/lib/gprpp/sync.h" 48 #include "src/core/lib/promise/activity.h" 49 #include "src/core/lib/promise/context.h" 50 #include "src/core/lib/promise/for_each.h" 51 #include "src/core/lib/promise/if.h" 52 #include "src/core/lib/promise/inter_activity_pipe.h" 53 #include "src/core/lib/promise/loop.h" 54 #include "src/core/lib/promise/mpsc.h" 55 #include "src/core/lib/promise/pipe.h" 56 #include "src/core/lib/promise/poll.h" 57 #include "src/core/lib/promise/try_join.h" 58 #include "src/core/lib/promise/try_seq.h" 59 #include "src/core/lib/resource_quota/arena.h" 60 #include "src/core/lib/resource_quota/memory_quota.h" 61 #include "src/core/lib/slice/slice_buffer.h" 62 #include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep 63 #include "src/core/lib/transport/promise_endpoint.h" 64 #include "src/core/lib/transport/transport.h" 65 66 namespace grpc_core { 67 namespace chaotic_good { 68 69 class ChaoticGoodClientTransport final : public Transport, 70 public ClientTransport { 71 public: 72 ChaoticGoodClientTransport( 73 PromiseEndpoint control_endpoint, PromiseEndpoint data_endpoint, 74 const ChannelArgs& channel_args, 75 std::shared_ptr<grpc_event_engine::experimental::EventEngine> 76 event_engine, 77 HPackParser hpack_parser, HPackCompressor hpack_encoder); 78 ~ChaoticGoodClientTransport() override; 79 filter_stack_transport()80 FilterStackTransport* filter_stack_transport() override { return nullptr; } client_transport()81 ClientTransport* client_transport() override { return this; } server_transport()82 ServerTransport* server_transport() override { return nullptr; } GetTransportName()83 absl::string_view GetTransportName() const override { return "chaotic_good"; } SetPollset(grpc_stream *,grpc_pollset *)84 void SetPollset(grpc_stream*, grpc_pollset*) override {} SetPollsetSet(grpc_stream *,grpc_pollset_set *)85 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} 86 void PerformOp(grpc_transport_op*) override; GetEndpoint()87 grpc_endpoint* GetEndpoint() override { return nullptr; } Orphan()88 void Orphan() override { 89 AbortWithError(); 90 delete this; 91 } 92 93 void StartCall(CallHandler call_handler) override; 94 void AbortWithError(); 95 96 private: 97 // Queue size of each stream pipe is set to 2, so that for each stream read it 98 // will queue at most 2 frames. 99 static const size_t kServerFrameQueueSize = 2; 100 using StreamMap = absl::flat_hash_map<uint32_t, CallHandler>; 101 102 uint32_t MakeStream(CallHandler call_handler); 103 absl::optional<CallHandler> LookupStream(uint32_t stream_id); 104 auto CallOutboundLoop(uint32_t stream_id, CallHandler call_handler); 105 auto OnTransportActivityDone(); 106 auto TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport); 107 auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport); 108 // Push one frame into a call 109 auto PushFrameIntoCall(ServerFragmentFrame frame, CallHandler call_handler); 110 111 grpc_event_engine::experimental::MemoryAllocator allocator_; 112 // Max buffer is set to 4, so that for stream writes each time it will queue 113 // at most 2 frames. 114 MpscReceiver<ClientFrame> outgoing_frames_; 115 // Assigned aligned bytes from setting frame. 116 size_t aligned_bytes_ = 64; 117 Mutex mu_; 118 uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; 119 // Map of stream incoming server frames, key is stream_id. 120 StreamMap stream_map_ ABSL_GUARDED_BY(mu_); 121 ActivityPtr writer_; 122 ActivityPtr reader_; ABSL_GUARDED_BY(mu_)123 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){ 124 "chaotic_good_client", GRPC_CHANNEL_READY}; 125 }; 126 127 } // namespace chaotic_good 128 } // namespace grpc_core 129 130 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H 131