xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/inproc/inproc_transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/inproc/inproc_transport.h"
18 
19 #include <atomic>
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/log.h>
23 
24 #include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
25 #include "src/core/lib/config/core_configuration.h"
26 #include "src/core/lib/experiments/experiments.h"
27 #include "src/core/lib/gprpp/crash.h"
28 #include "src/core/lib/promise/promise.h"
29 #include "src/core/lib/promise/try_seq.h"
30 #include "src/core/lib/surface/channel_create.h"
31 #include "src/core/lib/surface/server.h"
32 #include "src/core/lib/transport/transport.h"
33 
34 namespace grpc_core {
35 
36 namespace {
37 class InprocServerTransport final : public RefCounted<InprocServerTransport>,
38                                     public Transport,
39                                     public ServerTransport {
40  public:
SetAcceptor(Acceptor * acceptor)41   void SetAcceptor(Acceptor* acceptor) override {
42     acceptor_ = acceptor;
43     ConnectionState expect = ConnectionState::kInitial;
44     state_.compare_exchange_strong(expect, ConnectionState::kReady,
45                                    std::memory_order_acq_rel,
46                                    std::memory_order_acquire);
47     MutexLock lock(&state_tracker_mu_);
48     state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
49                             "accept function set");
50   }
51 
Orphan()52   void Orphan() override { Unref(); }
53 
filter_stack_transport()54   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()55   ClientTransport* client_transport() override { return nullptr; }
server_transport()56   ServerTransport* server_transport() override { return this; }
GetTransportName() const57   absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)58   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)59   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op * op)60   void PerformOp(grpc_transport_op* op) override {
61     gpr_log(GPR_INFO, "inproc server op: %s",
62             grpc_transport_op_string(op).c_str());
63     if (op->start_connectivity_watch != nullptr) {
64       MutexLock lock(&state_tracker_mu_);
65       state_tracker_.AddWatcher(op->start_connectivity_watch_state,
66                                 std::move(op->start_connectivity_watch));
67     }
68     if (op->stop_connectivity_watch != nullptr) {
69       MutexLock lock(&state_tracker_mu_);
70       state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
71     }
72     if (op->set_accept_stream) {
73       Crash("set_accept_stream not supported on inproc transport");
74     }
75   }
GetEndpoint()76   grpc_endpoint* GetEndpoint() override { return nullptr; }
77 
Disconnect(absl::Status error)78   void Disconnect(absl::Status error) {
79     if (disconnecting_.exchange(true, std::memory_order_relaxed)) return;
80     disconnect_error_ = std::move(error);
81     state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
82     MutexLock lock(&state_tracker_mu_);
83     state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
84                             "inproc transport disconnected");
85   }
86 
AcceptCall(ClientMetadata & md)87   absl::StatusOr<CallInitiator> AcceptCall(ClientMetadata& md) {
88     switch (state_.load(std::memory_order_acquire)) {
89       case ConnectionState::kInitial:
90         return absl::InternalError(
91             "inproc transport hasn't started accepting calls");
92       case ConnectionState::kDisconnected:
93         return absl::UnavailableError("inproc transport is disconnected");
94       case ConnectionState::kReady:
95         break;
96     }
97     return acceptor_->CreateCall(md, acceptor_->CreateArena());
98   }
99 
100  private:
101   enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
102 
103   std::atomic<ConnectionState> state_{ConnectionState::kInitial};
104   std::atomic<bool> disconnecting_{false};
105   Acceptor* acceptor_;
106   absl::Status disconnect_error_;
107   Mutex state_tracker_mu_;
ABSL_GUARDED_BY(state_tracker_mu_)108   ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
109       "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
110 };
111 
112 class InprocClientTransport final : public Transport, public ClientTransport {
113  public:
StartCall(CallHandler call_handler)114   void StartCall(CallHandler call_handler) override {
115     call_handler.SpawnGuarded(
116         "pull_initial_metadata",
117         TrySeq(call_handler.PullClientInitialMetadata(),
118                [server_transport = server_transport_,
119                 call_handler](ClientMetadataHandle md) {
120                  auto call_initiator = server_transport->AcceptCall(*md);
121                  if (!call_initiator.ok()) return call_initiator.status();
122                  ForwardCall(call_handler, std::move(*call_initiator),
123                              std::move(md));
124                  return absl::OkStatus();
125                }));
126   }
127 
Orphan()128   void Orphan() override { delete this; }
129 
GetServerTransport()130   OrphanablePtr<Transport> GetServerTransport() {
131     return OrphanablePtr<Transport>(server_transport_->Ref().release());
132   }
133 
filter_stack_transport()134   FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()135   ClientTransport* client_transport() override { return this; }
server_transport()136   ServerTransport* server_transport() override { return nullptr; }
GetTransportName() const137   absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)138   void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)139   void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op *)140   void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
GetEndpoint()141   grpc_endpoint* GetEndpoint() override { return nullptr; }
142 
143  private:
~InprocClientTransport()144   ~InprocClientTransport() override {
145     server_transport_->Disconnect(
146         absl::UnavailableError("Client transport closed"));
147   }
148 
149   RefCountedPtr<InprocServerTransport> server_transport_ =
150       MakeRefCounted<InprocServerTransport>();
151 };
152 
UsePromiseBasedTransport()153 bool UsePromiseBasedTransport() {
154   if (!IsPromiseBasedInprocTransportEnabled()) return false;
155   GPR_ASSERT(IsPromiseBasedClientCallEnabled());
156   GPR_ASSERT(IsPromiseBasedServerCallEnabled());
157   return true;
158 }
159 
MakeLameChannel(absl::string_view why,absl::Status error)160 OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
161                                        absl::Status error) {
162   gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
163           std::string(error.message()).c_str());
164   intptr_t integer;
165   grpc_status_code status = GRPC_STATUS_INTERNAL;
166   if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
167     status = static_cast<grpc_status_code>(integer);
168   }
169   return OrphanablePtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
170       nullptr, status, std::string(why).c_str())));
171 }
172 
MakeInprocChannel(Server * server,ChannelArgs client_channel_args)173 OrphanablePtr<Channel> MakeInprocChannel(Server* server,
174                                          ChannelArgs client_channel_args) {
175   auto transports = MakeInProcessTransportPair();
176   auto client_transport = std::move(transports.first);
177   auto server_transport = std::move(transports.second);
178   auto error =
179       server->SetupTransport(server_transport.get(), nullptr,
180                              server->channel_args()
181                                  .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
182                                  .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS),
183                              nullptr);
184   if (!error.ok()) {
185     return MakeLameChannel("Failed to create server channel", std::move(error));
186   }
187   std::ignore = server_transport.release();  // consumed by SetupTransport
188   auto channel = ChannelCreate(
189       "inproc",
190       client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
191       GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
192   if (!channel.ok()) {
193     return MakeLameChannel("Failed to create client channel", channel.status());
194   }
195   return std::move(*channel);
196 }
197 }  // namespace
198 
199 std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair()200 MakeInProcessTransportPair() {
201   auto client_transport = MakeOrphanable<InprocClientTransport>();
202   auto server_transport = client_transport->GetServerTransport();
203   return std::make_pair(std::move(client_transport),
204                         std::move(server_transport));
205 }
206 
207 }  // namespace grpc_core
208 
grpc_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void * reserved)209 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
210                                          const grpc_channel_args* args,
211                                          void* reserved) {
212   grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
213   grpc_core::ExecCtx exec_ctx;
214   if (!grpc_core::UsePromiseBasedTransport()) {
215     return grpc_legacy_inproc_channel_create(server, args, reserved);
216   }
217   return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
218                                       grpc_core::CoreConfiguration::Get()
219                                           .channel_args_preconditioning()
220                                           .PreconditionChannelArgs(args))
221       .release()
222       ->c_ptr();
223 }
224