1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/inproc/inproc_transport.h"
18
19 #include <atomic>
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/log.h>
23
24 #include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
25 #include "src/core/lib/config/core_configuration.h"
26 #include "src/core/lib/experiments/experiments.h"
27 #include "src/core/lib/gprpp/crash.h"
28 #include "src/core/lib/promise/promise.h"
29 #include "src/core/lib/promise/try_seq.h"
30 #include "src/core/lib/surface/channel_create.h"
31 #include "src/core/lib/surface/server.h"
32 #include "src/core/lib/transport/transport.h"
33
34 namespace grpc_core {
35
36 namespace {
37 class InprocServerTransport final : public RefCounted<InprocServerTransport>,
38 public Transport,
39 public ServerTransport {
40 public:
SetAcceptor(Acceptor * acceptor)41 void SetAcceptor(Acceptor* acceptor) override {
42 acceptor_ = acceptor;
43 ConnectionState expect = ConnectionState::kInitial;
44 state_.compare_exchange_strong(expect, ConnectionState::kReady,
45 std::memory_order_acq_rel,
46 std::memory_order_acquire);
47 MutexLock lock(&state_tracker_mu_);
48 state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
49 "accept function set");
50 }
51
Orphan()52 void Orphan() override { Unref(); }
53
filter_stack_transport()54 FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()55 ClientTransport* client_transport() override { return nullptr; }
server_transport()56 ServerTransport* server_transport() override { return this; }
GetTransportName() const57 absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)58 void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)59 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op * op)60 void PerformOp(grpc_transport_op* op) override {
61 gpr_log(GPR_INFO, "inproc server op: %s",
62 grpc_transport_op_string(op).c_str());
63 if (op->start_connectivity_watch != nullptr) {
64 MutexLock lock(&state_tracker_mu_);
65 state_tracker_.AddWatcher(op->start_connectivity_watch_state,
66 std::move(op->start_connectivity_watch));
67 }
68 if (op->stop_connectivity_watch != nullptr) {
69 MutexLock lock(&state_tracker_mu_);
70 state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
71 }
72 if (op->set_accept_stream) {
73 Crash("set_accept_stream not supported on inproc transport");
74 }
75 }
GetEndpoint()76 grpc_endpoint* GetEndpoint() override { return nullptr; }
77
Disconnect(absl::Status error)78 void Disconnect(absl::Status error) {
79 if (disconnecting_.exchange(true, std::memory_order_relaxed)) return;
80 disconnect_error_ = std::move(error);
81 state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
82 MutexLock lock(&state_tracker_mu_);
83 state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
84 "inproc transport disconnected");
85 }
86
AcceptCall(ClientMetadata & md)87 absl::StatusOr<CallInitiator> AcceptCall(ClientMetadata& md) {
88 switch (state_.load(std::memory_order_acquire)) {
89 case ConnectionState::kInitial:
90 return absl::InternalError(
91 "inproc transport hasn't started accepting calls");
92 case ConnectionState::kDisconnected:
93 return absl::UnavailableError("inproc transport is disconnected");
94 case ConnectionState::kReady:
95 break;
96 }
97 return acceptor_->CreateCall(md, acceptor_->CreateArena());
98 }
99
100 private:
101 enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };
102
103 std::atomic<ConnectionState> state_{ConnectionState::kInitial};
104 std::atomic<bool> disconnecting_{false};
105 Acceptor* acceptor_;
106 absl::Status disconnect_error_;
107 Mutex state_tracker_mu_;
ABSL_GUARDED_BY(state_tracker_mu_)108 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
109 "inproc_server_transport", GRPC_CHANNEL_CONNECTING};
110 };
111
112 class InprocClientTransport final : public Transport, public ClientTransport {
113 public:
StartCall(CallHandler call_handler)114 void StartCall(CallHandler call_handler) override {
115 call_handler.SpawnGuarded(
116 "pull_initial_metadata",
117 TrySeq(call_handler.PullClientInitialMetadata(),
118 [server_transport = server_transport_,
119 call_handler](ClientMetadataHandle md) {
120 auto call_initiator = server_transport->AcceptCall(*md);
121 if (!call_initiator.ok()) return call_initiator.status();
122 ForwardCall(call_handler, std::move(*call_initiator),
123 std::move(md));
124 return absl::OkStatus();
125 }));
126 }
127
Orphan()128 void Orphan() override { delete this; }
129
GetServerTransport()130 OrphanablePtr<Transport> GetServerTransport() {
131 return OrphanablePtr<Transport>(server_transport_->Ref().release());
132 }
133
filter_stack_transport()134 FilterStackTransport* filter_stack_transport() override { return nullptr; }
client_transport()135 ClientTransport* client_transport() override { return this; }
server_transport()136 ServerTransport* server_transport() override { return nullptr; }
GetTransportName() const137 absl::string_view GetTransportName() const override { return "inproc"; }
SetPollset(grpc_stream *,grpc_pollset *)138 void SetPollset(grpc_stream*, grpc_pollset*) override {}
SetPollsetSet(grpc_stream *,grpc_pollset_set *)139 void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
PerformOp(grpc_transport_op *)140 void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
GetEndpoint()141 grpc_endpoint* GetEndpoint() override { return nullptr; }
142
143 private:
~InprocClientTransport()144 ~InprocClientTransport() override {
145 server_transport_->Disconnect(
146 absl::UnavailableError("Client transport closed"));
147 }
148
149 RefCountedPtr<InprocServerTransport> server_transport_ =
150 MakeRefCounted<InprocServerTransport>();
151 };
152
UsePromiseBasedTransport()153 bool UsePromiseBasedTransport() {
154 if (!IsPromiseBasedInprocTransportEnabled()) return false;
155 GPR_ASSERT(IsPromiseBasedClientCallEnabled());
156 GPR_ASSERT(IsPromiseBasedServerCallEnabled());
157 return true;
158 }
159
MakeLameChannel(absl::string_view why,absl::Status error)160 OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
161 absl::Status error) {
162 gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
163 std::string(error.message()).c_str());
164 intptr_t integer;
165 grpc_status_code status = GRPC_STATUS_INTERNAL;
166 if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
167 status = static_cast<grpc_status_code>(integer);
168 }
169 return OrphanablePtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
170 nullptr, status, std::string(why).c_str())));
171 }
172
MakeInprocChannel(Server * server,ChannelArgs client_channel_args)173 OrphanablePtr<Channel> MakeInprocChannel(Server* server,
174 ChannelArgs client_channel_args) {
175 auto transports = MakeInProcessTransportPair();
176 auto client_transport = std::move(transports.first);
177 auto server_transport = std::move(transports.second);
178 auto error =
179 server->SetupTransport(server_transport.get(), nullptr,
180 server->channel_args()
181 .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
182 .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS),
183 nullptr);
184 if (!error.ok()) {
185 return MakeLameChannel("Failed to create server channel", std::move(error));
186 }
187 std::ignore = server_transport.release(); // consumed by SetupTransport
188 auto channel = ChannelCreate(
189 "inproc",
190 client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
191 GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
192 if (!channel.ok()) {
193 return MakeLameChannel("Failed to create client channel", channel.status());
194 }
195 return std::move(*channel);
196 }
197 } // namespace
198
199 std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair()200 MakeInProcessTransportPair() {
201 auto client_transport = MakeOrphanable<InprocClientTransport>();
202 auto server_transport = client_transport->GetServerTransport();
203 return std::make_pair(std::move(client_transport),
204 std::move(server_transport));
205 }
206
207 } // namespace grpc_core
208
grpc_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void * reserved)209 grpc_channel* grpc_inproc_channel_create(grpc_server* server,
210 const grpc_channel_args* args,
211 void* reserved) {
212 grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
213 grpc_core::ExecCtx exec_ctx;
214 if (!grpc_core::UsePromiseBasedTransport()) {
215 return grpc_legacy_inproc_channel_create(server, args, reserved);
216 }
217 return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
218 grpc_core::CoreConfiguration::Get()
219 .channel_args_preconditioning()
220 .PreconditionChannelArgs(args))
221 .release()
222 ->c_ptr();
223 }
224