1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/legacy_channel.h"
22
23 #include "absl/base/thread_annotations.h"
24 #include "absl/status/status.h"
25 #include "absl/types/optional.h"
26
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/grpc.h>
29 #include <grpc/impl/connectivity_state.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/client_channel/client_channel_filter.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_fwd.h"
36 #include "src/core/lib/channel/channel_stack.h"
37 #include "src/core/lib/channel/channel_stack_builder_impl.h"
38 #include "src/core/lib/channel/channelz.h"
39 #include "src/core/lib/channel/metrics.h"
40 #include "src/core/lib/config/core_configuration.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/debug/stats_data.h"
43 #include "src/core/lib/gprpp/crash.h"
44 #include "src/core/lib/gprpp/dual_ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/sync.h"
47 #include "src/core/lib/gprpp/time.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/error.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/resource_quota/resource_quota.h"
52 #include "src/core/lib/surface/call.h"
53 #include "src/core/lib/surface/channel.h"
54 #include "src/core/lib/surface/channel_init.h"
55 #include "src/core/lib/surface/channel_stack_type.h"
56 #include "src/core/lib/surface/completion_queue.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/surface/lame_client.h"
59 #include "src/core/lib/transport/transport.h"
60
61 namespace grpc_core {
62
Create(std::string target,ChannelArgs args,grpc_channel_stack_type channel_stack_type)63 absl::StatusOr<OrphanablePtr<Channel>> LegacyChannel::Create(
64 std::string target, ChannelArgs args,
65 grpc_channel_stack_type channel_stack_type) {
66 if (grpc_channel_stack_type_is_client(channel_stack_type)) {
67 auto channel_args_mutator =
68 grpc_channel_args_get_client_channel_creation_mutator();
69 if (channel_args_mutator != nullptr) {
70 args = channel_args_mutator(target.c_str(), args, channel_stack_type);
71 }
72 }
73 ChannelStackBuilderImpl builder(
74 grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
75 args);
76 builder.SetTarget(target.c_str());
77 if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
78 return nullptr;
79 }
80 // Only need to update stats for server channels here. Stats for client
81 // channels are handled in our base class.
82 if (builder.channel_stack_type() == GRPC_SERVER_CHANNEL) {
83 global_stats().IncrementServerChannelsCreated();
84 }
85 absl::StatusOr<RefCountedPtr<grpc_channel_stack>> r = builder.Build();
86 if (!r.ok()) {
87 auto status = r.status();
88 gpr_log(GPR_ERROR, "channel stack builder failed: %s",
89 status.ToString().c_str());
90 return status;
91 }
92 if (channel_stack_type == GRPC_SERVER_CHANNEL) {
93 *(*r)->stats_plugin_group =
94 GlobalStatsPluginRegistry::GetStatsPluginsForServer(args);
95 } else {
96 std::string authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
97 .value_or(CoreConfiguration::Get()
98 .resolver_registry()
99 .GetDefaultAuthority(target));
100 *(*r)->stats_plugin_group =
101 GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
102 experimental::StatsPluginChannelScope(target, authority));
103 }
104 return MakeOrphanable<LegacyChannel>(
105 grpc_channel_stack_type_is_client(builder.channel_stack_type()),
106 builder.IsPromising(), std::move(target), args, std::move(*r));
107 }
108
LegacyChannel(bool is_client,bool is_promising,std::string target,const ChannelArgs & channel_args,RefCountedPtr<grpc_channel_stack> channel_stack)109 LegacyChannel::LegacyChannel(bool is_client, bool is_promising,
110 std::string target,
111 const ChannelArgs& channel_args,
112 RefCountedPtr<grpc_channel_stack> channel_stack)
113 : Channel(std::move(target), channel_args),
114 is_client_(is_client),
115 is_promising_(is_promising),
116 channel_stack_(std::move(channel_stack)),
117 allocator_(channel_args.GetObject<ResourceQuota>()
118 ->memory_quota()
119 ->CreateMemoryOwner()) {
120 // We need to make sure that grpc_shutdown() does not shut things down
121 // until after the channel is destroyed. However, the channel may not
122 // actually be destroyed by the time grpc_channel_destroy() returns,
123 // since there may be other existing refs to the channel. If those
124 // refs are held by things that are visible to the wrapped language
125 // (such as outstanding calls on the channel), then the wrapped
126 // language can be responsible for making sure that grpc_shutdown()
127 // does not run until after those refs are released. However, the
128 // channel may also have refs to itself held internally for various
129 // things that need to be cleaned up at channel destruction (e.g.,
130 // LB policies, subchannels, etc), and because these refs are not
131 // visible to the wrapped language, it cannot be responsible for
132 // deferring grpc_shutdown() until after they are released. To
133 // accommodate that, we call grpc_init() here and then call
134 // grpc_shutdown() when the channel is actually destroyed, thus
135 // ensuring that shutdown is deferred until that point.
136 InitInternally();
137 RefCountedPtr<channelz::ChannelNode> node;
138 if (channelz_node() != nullptr) {
139 node = channelz_node()->RefAsSubclass<channelz::ChannelNode>();
140 }
141 *channel_stack_->on_destroy = [node = std::move(node)]() {
142 if (node != nullptr) {
143 node->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
144 grpc_slice_from_static_string("Channel destroyed"));
145 }
146 ShutdownInternally();
147 };
148 }
149
Orphan()150 void LegacyChannel::Orphan() {
151 grpc_transport_op* op = grpc_make_transport_op(nullptr);
152 op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
153 grpc_channel_element* elem =
154 grpc_channel_stack_element(channel_stack_.get(), 0);
155 elem->filter->start_transport_op(elem, op);
156 Unref();
157 }
158
IsLame() const159 bool LegacyChannel::IsLame() const {
160 grpc_channel_element* elem =
161 grpc_channel_stack_last_element(channel_stack_.get());
162 return elem->filter == &LameClientFilter::kFilter;
163 }
164
CreateCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,grpc_pollset_set * pollset_set_alternative,Slice path,absl::optional<Slice> authority,Timestamp deadline,bool registered_method)165 grpc_call* LegacyChannel::CreateCall(
166 grpc_call* parent_call, uint32_t propagation_mask,
167 grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
168 Slice path, absl::optional<Slice> authority, Timestamp deadline,
169 bool registered_method) {
170 GPR_ASSERT(is_client_);
171 GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr));
172 grpc_call_create_args args;
173 args.channel = Ref();
174 args.server = nullptr;
175 args.parent = parent_call;
176 args.propagation_mask = propagation_mask;
177 args.cq = cq;
178 args.pollset_set_alternative = pollset_set_alternative;
179 args.server_transport_data = nullptr;
180 args.path = std::move(path);
181 args.authority = std::move(authority);
182 args.send_deadline = deadline;
183 args.registered_method = registered_method;
184 grpc_call* call;
185 GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
186 return call;
187 }
188
CheckConnectivityState(bool try_to_connect)189 grpc_connectivity_state LegacyChannel::CheckConnectivityState(
190 bool try_to_connect) {
191 // Forward through to the underlying client channel.
192 ClientChannelFilter* client_channel = GetClientChannelFilter();
193 if (GPR_UNLIKELY(client_channel == nullptr)) {
194 if (IsLame()) return GRPC_CHANNEL_TRANSIENT_FAILURE;
195 gpr_log(GPR_ERROR,
196 "grpc_channel_check_connectivity_state called on something that is "
197 "not a client channel");
198 return GRPC_CHANNEL_SHUTDOWN;
199 }
200 return client_channel->CheckConnectivityState(try_to_connect);
201 }
202
SupportsConnectivityWatcher() const203 bool LegacyChannel::SupportsConnectivityWatcher() const {
204 return GetClientChannelFilter() != nullptr;
205 }
206
207 // A fire-and-forget object to handle external connectivity state watches.
208 class LegacyChannel::StateWatcher final : public DualRefCounted<StateWatcher> {
209 public:
StateWatcher(RefCountedPtr<LegacyChannel> channel,grpc_completion_queue * cq,void * tag,grpc_connectivity_state last_observed_state,Timestamp deadline)210 StateWatcher(RefCountedPtr<LegacyChannel> channel, grpc_completion_queue* cq,
211 void* tag, grpc_connectivity_state last_observed_state,
212 Timestamp deadline)
213 : channel_(std::move(channel)),
214 cq_(cq),
215 tag_(tag),
216 state_(last_observed_state) {
217 GPR_ASSERT(grpc_cq_begin_op(cq, tag));
218 GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
219 ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
220 if (client_channel == nullptr) {
221 // If the target URI used to create the channel was invalid, channel
222 // stack initialization failed, and that caused us to create a lame
223 // channel. In that case, connectivity state will never change (it
224 // will always be TRANSIENT_FAILURE), so we don't actually start a
225 // watch, but we are hiding that fact from the application.
226 if (channel_->IsLame()) {
227 // A ref is held by the timer callback.
228 StartTimer(deadline);
229 // Ref from object creation needs to be freed here since lame channel
230 // does not have a watcher.
231 Unref();
232 return;
233 }
234 Crash(
235 "grpc_channel_watch_connectivity_state called on something that is "
236 "not a client channel");
237 }
238 // Ref from object creation is held by the watcher callback.
239 auto* watcher_timer_init_state = new WatcherTimerInitState(this, deadline);
240 client_channel->AddExternalConnectivityWatcher(
241 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
242 &on_complete_, watcher_timer_init_state->closure());
243 }
244
245 private:
246 // A fire-and-forget object used to delay starting the timer until the
247 // ClientChannelFilter actually starts the watch.
248 class WatcherTimerInitState final {
249 public:
WatcherTimerInitState(StateWatcher * state_watcher,Timestamp deadline)250 WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
251 : state_watcher_(state_watcher), deadline_(deadline) {
252 GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
253 }
254
closure()255 grpc_closure* closure() { return &closure_; }
256
257 private:
WatcherTimerInit(void * arg,grpc_error_handle)258 static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
259 auto* self = static_cast<WatcherTimerInitState*>(arg);
260 self->state_watcher_->StartTimer(self->deadline_);
261 delete self;
262 }
263
264 StateWatcher* state_watcher_;
265 Timestamp deadline_;
266 grpc_closure closure_;
267 };
268
StartTimer(Timestamp deadline)269 void StartTimer(Timestamp deadline) {
270 const Duration timeout = deadline - Timestamp::Now();
271 MutexLock lock(&mu_);
272 timer_handle_ =
273 channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
274 ApplicationCallbackExecCtx callback_exec_ctx;
275 ExecCtx exec_ctx;
276 self->TimeoutComplete();
277 // StateWatcher deletion might require an active ExecCtx.
278 self.reset();
279 });
280 }
281
TimeoutComplete()282 void TimeoutComplete() {
283 timer_fired_ = true;
284 // If this is a client channel (not a lame channel), cancel the watch.
285 ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
286 if (client_channel != nullptr) {
287 client_channel->CancelExternalConnectivityWatcher(&on_complete_);
288 }
289 }
290
WatchComplete(void * arg,grpc_error_handle error)291 static void WatchComplete(void* arg, grpc_error_handle error) {
292 RefCountedPtr<StateWatcher> self(static_cast<StateWatcher*>(arg));
293 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
294 GRPC_LOG_IF_ERROR("watch_completion_error", error);
295 }
296 MutexLock lock(&self->mu_);
297 if (self->timer_handle_.has_value()) {
298 self->channel_->event_engine()->Cancel(*self->timer_handle_);
299 }
300 }
301
302 // Invoked when both strong refs are released.
Orphaned()303 void Orphaned() override {
304 WeakRef().release(); // Take a weak ref until completion is finished.
305 grpc_error_handle error =
306 timer_fired_
307 ? GRPC_ERROR_CREATE("Timed out waiting for connection state change")
308 : absl::OkStatus();
309 grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
310 &completion_storage_);
311 }
312
313 // Called when the completion is returned to the CQ.
FinishedCompletion(void * arg,grpc_cq_completion *)314 static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
315 auto* self = static_cast<StateWatcher*>(arg);
316 self->WeakUnref();
317 }
318
319 RefCountedPtr<LegacyChannel> channel_;
320 grpc_completion_queue* cq_;
321 void* tag_;
322
323 grpc_connectivity_state state_;
324 grpc_cq_completion completion_storage_;
325 grpc_closure on_complete_;
326
327 // timer_handle_ might be accessed in parallel from multiple threads, e.g.
328 // timer callback fired immediately on an EventEngine thread before
329 // RunAfter() returns.
330 Mutex mu_;
331 absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
332 timer_handle_ ABSL_GUARDED_BY(mu_);
333 bool timer_fired_ = false;
334 };
335
WatchConnectivityState(grpc_connectivity_state last_observed_state,Timestamp deadline,grpc_completion_queue * cq,void * tag)336 void LegacyChannel::WatchConnectivityState(
337 grpc_connectivity_state last_observed_state, Timestamp deadline,
338 grpc_completion_queue* cq, void* tag) {
339 new StateWatcher(RefAsSubclass<LegacyChannel>(), cq, tag, last_observed_state,
340 deadline);
341 }
342
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)343 void LegacyChannel::AddConnectivityWatcher(
344 grpc_connectivity_state initial_state,
345 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
346 auto* client_channel = GetClientChannelFilter();
347 GPR_ASSERT(client_channel != nullptr);
348 client_channel->AddConnectivityWatcher(initial_state, std::move(watcher));
349 }
350
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)351 void LegacyChannel::RemoveConnectivityWatcher(
352 AsyncConnectivityStateWatcherInterface* watcher) {
353 auto* client_channel = GetClientChannelFilter();
354 GPR_ASSERT(client_channel != nullptr);
355 client_channel->RemoveConnectivityWatcher(watcher);
356 }
357
GetInfo(const grpc_channel_info * channel_info)358 void LegacyChannel::GetInfo(const grpc_channel_info* channel_info) {
359 grpc_channel_element* elem =
360 grpc_channel_stack_element(channel_stack_.get(), 0);
361 elem->filter->get_channel_info(elem, channel_info);
362 }
363
ResetConnectionBackoff()364 void LegacyChannel::ResetConnectionBackoff() {
365 grpc_transport_op* op = grpc_make_transport_op(nullptr);
366 op->reset_connect_backoff = true;
367 grpc_channel_element* elem =
368 grpc_channel_stack_element(channel_stack_.get(), 0);
369 elem->filter->start_transport_op(elem, op);
370 }
371
372 namespace {
373
374 struct ping_result {
375 grpc_closure closure;
376 void* tag;
377 grpc_completion_queue* cq;
378 grpc_cq_completion completion_storage;
379 };
ping_destroy(void * arg,grpc_cq_completion *)380 void ping_destroy(void* arg, grpc_cq_completion* /*storage*/) { gpr_free(arg); }
381
ping_done(void * arg,grpc_error_handle error)382 void ping_done(void* arg, grpc_error_handle error) {
383 ping_result* pr = static_cast<ping_result*>(arg);
384 grpc_cq_end_op(pr->cq, pr->tag, error, ping_destroy, pr,
385 &pr->completion_storage);
386 }
387
388 } // namespace
389
Ping(grpc_completion_queue * cq,void * tag)390 void LegacyChannel::Ping(grpc_completion_queue* cq, void* tag) {
391 ping_result* pr = static_cast<ping_result*>(gpr_malloc(sizeof(*pr)));
392 pr->tag = tag;
393 pr->cq = cq;
394 GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
395 grpc_transport_op* op = grpc_make_transport_op(nullptr);
396 op->send_ping.on_ack = &pr->closure;
397 op->bind_pollset = grpc_cq_pollset(cq);
398 GPR_ASSERT(grpc_cq_begin_op(cq, tag));
399 grpc_channel_element* top_elem =
400 grpc_channel_stack_element(channel_stack_.get(), 0);
401 top_elem->filter->start_transport_op(top_elem, op);
402 }
403
GetClientChannelFilter() const404 ClientChannelFilter* LegacyChannel::GetClientChannelFilter() const {
405 grpc_channel_element* elem =
406 grpc_channel_stack_last_element(channel_stack_.get());
407 if (elem->filter != &ClientChannelFilter::kFilterVtableWithPromises &&
408 elem->filter != &ClientChannelFilter::kFilterVtableWithoutPromises) {
409 return nullptr;
410 }
411 return static_cast<ClientChannelFilter*>(elem->channel_data);
412 }
413
414 } // namespace grpc_core
415