xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/surface/legacy_channel.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/legacy_channel.h"
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/status/status.h"
25 #include "absl/types/optional.h"
26 
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/grpc.h>
29 #include <grpc/impl/connectivity_state.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/client_channel/client_channel_filter.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_fwd.h"
36 #include "src/core/lib/channel/channel_stack.h"
37 #include "src/core/lib/channel/channel_stack_builder_impl.h"
38 #include "src/core/lib/channel/channelz.h"
39 #include "src/core/lib/channel/metrics.h"
40 #include "src/core/lib/config/core_configuration.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/debug/stats_data.h"
43 #include "src/core/lib/gprpp/crash.h"
44 #include "src/core/lib/gprpp/dual_ref_counted.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/sync.h"
47 #include "src/core/lib/gprpp/time.h"
48 #include "src/core/lib/iomgr/closure.h"
49 #include "src/core/lib/iomgr/error.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/resource_quota/resource_quota.h"
52 #include "src/core/lib/surface/call.h"
53 #include "src/core/lib/surface/channel.h"
54 #include "src/core/lib/surface/channel_init.h"
55 #include "src/core/lib/surface/channel_stack_type.h"
56 #include "src/core/lib/surface/completion_queue.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/surface/lame_client.h"
59 #include "src/core/lib/transport/transport.h"
60 
61 namespace grpc_core {
62 
Create(std::string target,ChannelArgs args,grpc_channel_stack_type channel_stack_type)63 absl::StatusOr<OrphanablePtr<Channel>> LegacyChannel::Create(
64     std::string target, ChannelArgs args,
65     grpc_channel_stack_type channel_stack_type) {
66   if (grpc_channel_stack_type_is_client(channel_stack_type)) {
67     auto channel_args_mutator =
68         grpc_channel_args_get_client_channel_creation_mutator();
69     if (channel_args_mutator != nullptr) {
70       args = channel_args_mutator(target.c_str(), args, channel_stack_type);
71     }
72   }
73   ChannelStackBuilderImpl builder(
74       grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
75       args);
76   builder.SetTarget(target.c_str());
77   if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
78     return nullptr;
79   }
80   // Only need to update stats for server channels here.  Stats for client
81   // channels are handled in our base class.
82   if (builder.channel_stack_type() == GRPC_SERVER_CHANNEL) {
83     global_stats().IncrementServerChannelsCreated();
84   }
85   absl::StatusOr<RefCountedPtr<grpc_channel_stack>> r = builder.Build();
86   if (!r.ok()) {
87     auto status = r.status();
88     gpr_log(GPR_ERROR, "channel stack builder failed: %s",
89             status.ToString().c_str());
90     return status;
91   }
92   if (channel_stack_type == GRPC_SERVER_CHANNEL) {
93     *(*r)->stats_plugin_group =
94         GlobalStatsPluginRegistry::GetStatsPluginsForServer(args);
95   } else {
96     std::string authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
97                                 .value_or(CoreConfiguration::Get()
98                                               .resolver_registry()
99                                               .GetDefaultAuthority(target));
100     *(*r)->stats_plugin_group =
101         GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
102             experimental::StatsPluginChannelScope(target, authority));
103   }
104   return MakeOrphanable<LegacyChannel>(
105       grpc_channel_stack_type_is_client(builder.channel_stack_type()),
106       builder.IsPromising(), std::move(target), args, std::move(*r));
107 }
108 
LegacyChannel(bool is_client,bool is_promising,std::string target,const ChannelArgs & channel_args,RefCountedPtr<grpc_channel_stack> channel_stack)109 LegacyChannel::LegacyChannel(bool is_client, bool is_promising,
110                              std::string target,
111                              const ChannelArgs& channel_args,
112                              RefCountedPtr<grpc_channel_stack> channel_stack)
113     : Channel(std::move(target), channel_args),
114       is_client_(is_client),
115       is_promising_(is_promising),
116       channel_stack_(std::move(channel_stack)),
117       allocator_(channel_args.GetObject<ResourceQuota>()
118                      ->memory_quota()
119                      ->CreateMemoryOwner()) {
120   // We need to make sure that grpc_shutdown() does not shut things down
121   // until after the channel is destroyed.  However, the channel may not
122   // actually be destroyed by the time grpc_channel_destroy() returns,
123   // since there may be other existing refs to the channel.  If those
124   // refs are held by things that are visible to the wrapped language
125   // (such as outstanding calls on the channel), then the wrapped
126   // language can be responsible for making sure that grpc_shutdown()
127   // does not run until after those refs are released.  However, the
128   // channel may also have refs to itself held internally for various
129   // things that need to be cleaned up at channel destruction (e.g.,
130   // LB policies, subchannels, etc), and because these refs are not
131   // visible to the wrapped language, it cannot be responsible for
132   // deferring grpc_shutdown() until after they are released.  To
133   // accommodate that, we call grpc_init() here and then call
134   // grpc_shutdown() when the channel is actually destroyed, thus
135   // ensuring that shutdown is deferred until that point.
136   InitInternally();
137   RefCountedPtr<channelz::ChannelNode> node;
138   if (channelz_node() != nullptr) {
139     node = channelz_node()->RefAsSubclass<channelz::ChannelNode>();
140   }
141   *channel_stack_->on_destroy = [node = std::move(node)]() {
142     if (node != nullptr) {
143       node->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
144                           grpc_slice_from_static_string("Channel destroyed"));
145     }
146     ShutdownInternally();
147   };
148 }
149 
Orphan()150 void LegacyChannel::Orphan() {
151   grpc_transport_op* op = grpc_make_transport_op(nullptr);
152   op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
153   grpc_channel_element* elem =
154       grpc_channel_stack_element(channel_stack_.get(), 0);
155   elem->filter->start_transport_op(elem, op);
156   Unref();
157 }
158 
IsLame() const159 bool LegacyChannel::IsLame() const {
160   grpc_channel_element* elem =
161       grpc_channel_stack_last_element(channel_stack_.get());
162   return elem->filter == &LameClientFilter::kFilter;
163 }
164 
CreateCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,grpc_pollset_set * pollset_set_alternative,Slice path,absl::optional<Slice> authority,Timestamp deadline,bool registered_method)165 grpc_call* LegacyChannel::CreateCall(
166     grpc_call* parent_call, uint32_t propagation_mask,
167     grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
168     Slice path, absl::optional<Slice> authority, Timestamp deadline,
169     bool registered_method) {
170   GPR_ASSERT(is_client_);
171   GPR_ASSERT(!(cq != nullptr && pollset_set_alternative != nullptr));
172   grpc_call_create_args args;
173   args.channel = Ref();
174   args.server = nullptr;
175   args.parent = parent_call;
176   args.propagation_mask = propagation_mask;
177   args.cq = cq;
178   args.pollset_set_alternative = pollset_set_alternative;
179   args.server_transport_data = nullptr;
180   args.path = std::move(path);
181   args.authority = std::move(authority);
182   args.send_deadline = deadline;
183   args.registered_method = registered_method;
184   grpc_call* call;
185   GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
186   return call;
187 }
188 
CheckConnectivityState(bool try_to_connect)189 grpc_connectivity_state LegacyChannel::CheckConnectivityState(
190     bool try_to_connect) {
191   // Forward through to the underlying client channel.
192   ClientChannelFilter* client_channel = GetClientChannelFilter();
193   if (GPR_UNLIKELY(client_channel == nullptr)) {
194     if (IsLame()) return GRPC_CHANNEL_TRANSIENT_FAILURE;
195     gpr_log(GPR_ERROR,
196             "grpc_channel_check_connectivity_state called on something that is "
197             "not a client channel");
198     return GRPC_CHANNEL_SHUTDOWN;
199   }
200   return client_channel->CheckConnectivityState(try_to_connect);
201 }
202 
SupportsConnectivityWatcher() const203 bool LegacyChannel::SupportsConnectivityWatcher() const {
204   return GetClientChannelFilter() != nullptr;
205 }
206 
207 // A fire-and-forget object to handle external connectivity state watches.
208 class LegacyChannel::StateWatcher final : public DualRefCounted<StateWatcher> {
209  public:
StateWatcher(RefCountedPtr<LegacyChannel> channel,grpc_completion_queue * cq,void * tag,grpc_connectivity_state last_observed_state,Timestamp deadline)210   StateWatcher(RefCountedPtr<LegacyChannel> channel, grpc_completion_queue* cq,
211                void* tag, grpc_connectivity_state last_observed_state,
212                Timestamp deadline)
213       : channel_(std::move(channel)),
214         cq_(cq),
215         tag_(tag),
216         state_(last_observed_state) {
217     GPR_ASSERT(grpc_cq_begin_op(cq, tag));
218     GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
219     ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
220     if (client_channel == nullptr) {
221       // If the target URI used to create the channel was invalid, channel
222       // stack initialization failed, and that caused us to create a lame
223       // channel.  In that case, connectivity state will never change (it
224       // will always be TRANSIENT_FAILURE), so we don't actually start a
225       // watch, but we are hiding that fact from the application.
226       if (channel_->IsLame()) {
227         // A ref is held by the timer callback.
228         StartTimer(deadline);
229         // Ref from object creation needs to be freed here since lame channel
230         // does not have a watcher.
231         Unref();
232         return;
233       }
234       Crash(
235           "grpc_channel_watch_connectivity_state called on something that is "
236           "not a client channel");
237     }
238     // Ref from object creation is held by the watcher callback.
239     auto* watcher_timer_init_state = new WatcherTimerInitState(this, deadline);
240     client_channel->AddExternalConnectivityWatcher(
241         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
242         &on_complete_, watcher_timer_init_state->closure());
243   }
244 
245  private:
246   // A fire-and-forget object used to delay starting the timer until the
247   // ClientChannelFilter actually starts the watch.
248   class WatcherTimerInitState final {
249    public:
WatcherTimerInitState(StateWatcher * state_watcher,Timestamp deadline)250     WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
251         : state_watcher_(state_watcher), deadline_(deadline) {
252       GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
253     }
254 
closure()255     grpc_closure* closure() { return &closure_; }
256 
257    private:
WatcherTimerInit(void * arg,grpc_error_handle)258     static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
259       auto* self = static_cast<WatcherTimerInitState*>(arg);
260       self->state_watcher_->StartTimer(self->deadline_);
261       delete self;
262     }
263 
264     StateWatcher* state_watcher_;
265     Timestamp deadline_;
266     grpc_closure closure_;
267   };
268 
StartTimer(Timestamp deadline)269   void StartTimer(Timestamp deadline) {
270     const Duration timeout = deadline - Timestamp::Now();
271     MutexLock lock(&mu_);
272     timer_handle_ =
273         channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
274           ApplicationCallbackExecCtx callback_exec_ctx;
275           ExecCtx exec_ctx;
276           self->TimeoutComplete();
277           // StateWatcher deletion might require an active ExecCtx.
278           self.reset();
279         });
280   }
281 
TimeoutComplete()282   void TimeoutComplete() {
283     timer_fired_ = true;
284     // If this is a client channel (not a lame channel), cancel the watch.
285     ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
286     if (client_channel != nullptr) {
287       client_channel->CancelExternalConnectivityWatcher(&on_complete_);
288     }
289   }
290 
WatchComplete(void * arg,grpc_error_handle error)291   static void WatchComplete(void* arg, grpc_error_handle error) {
292     RefCountedPtr<StateWatcher> self(static_cast<StateWatcher*>(arg));
293     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
294       GRPC_LOG_IF_ERROR("watch_completion_error", error);
295     }
296     MutexLock lock(&self->mu_);
297     if (self->timer_handle_.has_value()) {
298       self->channel_->event_engine()->Cancel(*self->timer_handle_);
299     }
300   }
301 
302   // Invoked when both strong refs are released.
Orphaned()303   void Orphaned() override {
304     WeakRef().release();  // Take a weak ref until completion is finished.
305     grpc_error_handle error =
306         timer_fired_
307             ? GRPC_ERROR_CREATE("Timed out waiting for connection state change")
308             : absl::OkStatus();
309     grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
310                    &completion_storage_);
311   }
312 
313   // Called when the completion is returned to the CQ.
FinishedCompletion(void * arg,grpc_cq_completion *)314   static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
315     auto* self = static_cast<StateWatcher*>(arg);
316     self->WeakUnref();
317   }
318 
319   RefCountedPtr<LegacyChannel> channel_;
320   grpc_completion_queue* cq_;
321   void* tag_;
322 
323   grpc_connectivity_state state_;
324   grpc_cq_completion completion_storage_;
325   grpc_closure on_complete_;
326 
327   // timer_handle_ might be accessed in parallel from multiple threads, e.g.
328   // timer callback fired immediately on an EventEngine thread before
329   // RunAfter() returns.
330   Mutex mu_;
331   absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
332       timer_handle_ ABSL_GUARDED_BY(mu_);
333   bool timer_fired_ = false;
334 };
335 
WatchConnectivityState(grpc_connectivity_state last_observed_state,Timestamp deadline,grpc_completion_queue * cq,void * tag)336 void LegacyChannel::WatchConnectivityState(
337     grpc_connectivity_state last_observed_state, Timestamp deadline,
338     grpc_completion_queue* cq, void* tag) {
339   new StateWatcher(RefAsSubclass<LegacyChannel>(), cq, tag, last_observed_state,
340                    deadline);
341 }
342 
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)343 void LegacyChannel::AddConnectivityWatcher(
344     grpc_connectivity_state initial_state,
345     OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
346   auto* client_channel = GetClientChannelFilter();
347   GPR_ASSERT(client_channel != nullptr);
348   client_channel->AddConnectivityWatcher(initial_state, std::move(watcher));
349 }
350 
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)351 void LegacyChannel::RemoveConnectivityWatcher(
352     AsyncConnectivityStateWatcherInterface* watcher) {
353   auto* client_channel = GetClientChannelFilter();
354   GPR_ASSERT(client_channel != nullptr);
355   client_channel->RemoveConnectivityWatcher(watcher);
356 }
357 
GetInfo(const grpc_channel_info * channel_info)358 void LegacyChannel::GetInfo(const grpc_channel_info* channel_info) {
359   grpc_channel_element* elem =
360       grpc_channel_stack_element(channel_stack_.get(), 0);
361   elem->filter->get_channel_info(elem, channel_info);
362 }
363 
ResetConnectionBackoff()364 void LegacyChannel::ResetConnectionBackoff() {
365   grpc_transport_op* op = grpc_make_transport_op(nullptr);
366   op->reset_connect_backoff = true;
367   grpc_channel_element* elem =
368       grpc_channel_stack_element(channel_stack_.get(), 0);
369   elem->filter->start_transport_op(elem, op);
370 }
371 
372 namespace {
373 
374 struct ping_result {
375   grpc_closure closure;
376   void* tag;
377   grpc_completion_queue* cq;
378   grpc_cq_completion completion_storage;
379 };
ping_destroy(void * arg,grpc_cq_completion *)380 void ping_destroy(void* arg, grpc_cq_completion* /*storage*/) { gpr_free(arg); }
381 
ping_done(void * arg,grpc_error_handle error)382 void ping_done(void* arg, grpc_error_handle error) {
383   ping_result* pr = static_cast<ping_result*>(arg);
384   grpc_cq_end_op(pr->cq, pr->tag, error, ping_destroy, pr,
385                  &pr->completion_storage);
386 }
387 
388 }  // namespace
389 
Ping(grpc_completion_queue * cq,void * tag)390 void LegacyChannel::Ping(grpc_completion_queue* cq, void* tag) {
391   ping_result* pr = static_cast<ping_result*>(gpr_malloc(sizeof(*pr)));
392   pr->tag = tag;
393   pr->cq = cq;
394   GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
395   grpc_transport_op* op = grpc_make_transport_op(nullptr);
396   op->send_ping.on_ack = &pr->closure;
397   op->bind_pollset = grpc_cq_pollset(cq);
398   GPR_ASSERT(grpc_cq_begin_op(cq, tag));
399   grpc_channel_element* top_elem =
400       grpc_channel_stack_element(channel_stack_.get(), 0);
401   top_elem->filter->start_transport_op(top_elem, op);
402 }
403 
GetClientChannelFilter() const404 ClientChannelFilter* LegacyChannel::GetClientChannelFilter() const {
405   grpc_channel_element* elem =
406       grpc_channel_stack_last_element(channel_stack_.get());
407   if (elem->filter != &ClientChannelFilter::kFilterVtableWithPromises &&
408       elem->filter != &ClientChannelFilter::kFilterVtableWithoutPromises) {
409     return nullptr;
410   }
411   return static_cast<ClientChannelFilter*>(elem->channel_data);
412 }
413 
414 }  // namespace grpc_core
415