xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/client_transport.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 #include <stdio.h>
22 
23 #include <cstdint>
24 #include <initializer_list>  // IWYU pragma: keep
25 #include <map>
26 #include <memory>
27 #include <tuple>
28 #include <type_traits>
29 #include <utility>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/container/flat_hash_map.h"
33 #include "absl/random/random.h"
34 #include "absl/status/status.h"
35 #include "absl/types/optional.h"
36 #include "absl/types/variant.h"
37 
38 #include <grpc/event_engine/event_engine.h>
39 #include <grpc/event_engine/memory_allocator.h>
40 #include <grpc/grpc.h>
41 
42 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
43 #include "src/core/ext/transport/chaotic_good/frame.h"
44 #include "src/core/ext/transport/chaotic_good/frame_header.h"
45 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
46 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "src/core/lib/promise/activity.h"
49 #include "src/core/lib/promise/context.h"
50 #include "src/core/lib/promise/for_each.h"
51 #include "src/core/lib/promise/if.h"
52 #include "src/core/lib/promise/inter_activity_pipe.h"
53 #include "src/core/lib/promise/loop.h"
54 #include "src/core/lib/promise/mpsc.h"
55 #include "src/core/lib/promise/pipe.h"
56 #include "src/core/lib/promise/poll.h"
57 #include "src/core/lib/promise/try_join.h"
58 #include "src/core/lib/promise/try_seq.h"
59 #include "src/core/lib/resource_quota/arena.h"
60 #include "src/core/lib/resource_quota/memory_quota.h"
61 #include "src/core/lib/slice/slice_buffer.h"
62 #include "src/core/lib/transport/metadata_batch.h"  // IWYU pragma: keep
63 #include "src/core/lib/transport/promise_endpoint.h"
64 #include "src/core/lib/transport/transport.h"
65 
66 namespace grpc_core {
67 namespace chaotic_good {
68 
69 class ChaoticGoodClientTransport final : public Transport,
70                                          public ClientTransport {
71  public:
72   ChaoticGoodClientTransport(
73       PromiseEndpoint control_endpoint, PromiseEndpoint data_endpoint,
74       const ChannelArgs& channel_args,
75       std::shared_ptr<grpc_event_engine::experimental::EventEngine>
76           event_engine,
77       HPackParser hpack_parser, HPackCompressor hpack_encoder);
78   ~ChaoticGoodClientTransport() override;
79 
filter_stack_transport()80   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()81   ClientTransport* client_transport() override { return this; }
server_transport()82   ServerTransport* server_transport() override { return nullptr; }
GetTransportName()83   absl::string_view GetTransportName() const override { return "chaotic_good"; }
SetPollset(grpc_stream *,grpc_pollset *)84   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)85   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
86   void PerformOp(grpc_transport_op*) override;
GetEndpoint()87   grpc_endpoint* GetEndpoint() override { return nullptr; }
Orphan()88   void Orphan() override {
89     AbortWithError();
90     delete this;
91   }
92 
93   void StartCall(CallHandler call_handler) override;
94   void AbortWithError();
95 
96  private:
97   // Queue size of each stream pipe is set to 2, so that for each stream read it
98   // will queue at most 2 frames.
99   static const size_t kServerFrameQueueSize = 2;
100   using StreamMap = absl::flat_hash_map<uint32_t, CallHandler>;
101 
102   uint32_t MakeStream(CallHandler call_handler);
103   absl::optional<CallHandler> LookupStream(uint32_t stream_id);
104   auto CallOutboundLoop(uint32_t stream_id, CallHandler call_handler);
105   auto OnTransportActivityDone();
106   auto TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport);
107   auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
108   // Push one frame into a call
109   auto PushFrameIntoCall(ServerFragmentFrame frame, CallHandler call_handler);
110 
111   grpc_event_engine::experimental::MemoryAllocator allocator_;
112   // Max buffer is set to 4, so that for stream writes each time it will queue
113   // at most 2 frames.
114   MpscReceiver<ClientFrame> outgoing_frames_;
115   // Assigned aligned bytes from setting frame.
116   size_t aligned_bytes_ = 64;
117   Mutex mu_;
118   uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
119   // Map of stream incoming server frames, key is stream_id.
120   StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
121   ActivityPtr writer_;
122   ActivityPtr reader_;
ABSL_GUARDED_BY(mu_)123   ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
124       "chaotic_good_client", GRPC_CHANNEL_READY};
125 };
126 
127 }  // namespace chaotic_good
128 }  // namespace grpc_core
129 
130 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H
131