xref: /aosp_15_r20/external/grpc-grpc/src/core/client_channel/client_channel_filter.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/client_channel/client_channel_filter.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 
24 #include <algorithm>
25 #include <functional>
26 #include <new>
27 #include <set>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/cleanup/cleanup.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/cord.h"
36 #include "absl/strings/numbers.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42 
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/impl/channel_arg_names.h>
45 #include <grpc/slice.h>
46 #include <grpc/status.h>
47 #include <grpc/support/json.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/string_util.h>
50 #include <grpc/support/time.h>
51 
52 #include "src/core/client_channel/backup_poller.h"
53 #include "src/core/client_channel/client_channel_channelz.h"
54 #include "src/core/client_channel/client_channel_internal.h"
55 #include "src/core/client_channel/client_channel_service_config.h"
56 #include "src/core/client_channel/config_selector.h"
57 #include "src/core/client_channel/dynamic_filters.h"
58 #include "src/core/client_channel/global_subchannel_pool.h"
59 #include "src/core/client_channel/local_subchannel_pool.h"
60 #include "src/core/client_channel/retry_filter.h"
61 #include "src/core/client_channel/subchannel.h"
62 #include "src/core/client_channel/subchannel_interface_internal.h"
63 #include "src/core/ext/filters/deadline/deadline_filter.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/channel_stack.h"
66 #include "src/core/lib/channel/channel_trace.h"
67 #include "src/core/lib/channel/status_util.h"
68 #include "src/core/lib/config/core_configuration.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/experiments/experiments.h"
71 #include "src/core/lib/gpr/useful.h"
72 #include "src/core/lib/gprpp/crash.h"
73 #include "src/core/lib/gprpp/debug_location.h"
74 #include "src/core/lib/gprpp/manual_constructor.h"
75 #include "src/core/lib/gprpp/status_helper.h"
76 #include "src/core/lib/gprpp/sync.h"
77 #include "src/core/lib/gprpp/unique_type_name.h"
78 #include "src/core/lib/gprpp/work_serializer.h"
79 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
80 #include "src/core/lib/iomgr/exec_ctx.h"
81 #include "src/core/lib/iomgr/polling_entity.h"
82 #include "src/core/lib/iomgr/pollset_set.h"
83 #include "src/core/lib/iomgr/resolved_address.h"
84 #include "src/core/lib/json/json.h"
85 #include "src/core/lib/promise/cancel_callback.h"
86 #include "src/core/lib/promise/context.h"
87 #include "src/core/lib/promise/latch.h"
88 #include "src/core/lib/promise/map.h"
89 #include "src/core/lib/promise/pipe.h"
90 #include "src/core/lib/promise/poll.h"
91 #include "src/core/lib/promise/promise.h"
92 #include "src/core/lib/promise/try_seq.h"
93 #include "src/core/lib/security/credentials/credentials.h"
94 #include "src/core/lib/slice/slice.h"
95 #include "src/core/lib/slice/slice_internal.h"
96 #include "src/core/lib/surface/call.h"
97 #include "src/core/lib/transport/connectivity_state.h"
98 #include "src/core/lib/transport/error_utils.h"
99 #include "src/core/lib/transport/metadata_batch.h"
100 #include "src/core/load_balancing/backend_metric_parser.h"
101 #include "src/core/load_balancing/child_policy_handler.h"
102 #include "src/core/load_balancing/lb_policy_registry.h"
103 #include "src/core/load_balancing/subchannel_interface.h"
104 #include "src/core/resolver/endpoint_addresses.h"
105 #include "src/core/resolver/resolver_registry.h"
106 #include "src/core/service_config/service_config_call_data.h"
107 #include "src/core/service_config/service_config_impl.h"
108 
109 //
110 // Client channel filter
111 //
112 
113 namespace grpc_core {
114 
using internal::ClientChannelMethodParsedConfig;

// Trace flags (all off by default) gating debug logging for the channel as a
// whole, for per-call name-resolution plumbing, and for load-balanced calls.
TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
120 
121 //
122 // ClientChannelFilter::CallData definition
123 //
124 
// Abstract base for per-call state in the client channel.  Two concrete
// subclasses exist below: FilterBasedCallData (legacy batch API) and
// PromiseBasedCallData (promise API).  This base owns the logic for
// interacting with the channel's name-resolution queue; subclasses supply
// accessors for call-scoped objects they store differently.
class ClientChannelFilter::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) = 0;

  // The dynamic filter stack selected for this call once a resolver
  // result has been applied.
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available.  The following
  // outcomes are possible:
  // - No resolver result is available yet.  The call will be queued and
  //   absl::nullopt will be returned.  Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure.  If the call is
  //   not wait_for_ready, a non-OK status will be returned.  (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result.  The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannelFilter* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;
  virtual grpc_call_context_element* call_context() const = 0;

  // Helper function for CheckResolution().  Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called when adding the call to the resolver queue.
  // Default is a no-op; subclasses hook this to arrange to be woken up.
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {}

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
193 
// Per-call state for the legacy (batch-based) filter API.  Instances live in
// the call element's call_data slot; the static methods below form the call
// portion of the filter vtable.  Until a resolver result is available,
// incoming batches are parked in pending_batches_ and later resumed on (or
// failed against) the dynamic call.
class ClientChannelFilter::FilterBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  // Convenience accessors backed by the deadline filter state.
  grpc_call_element* elem() const { return deadline_state_.elem; }
  grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
  CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }

  ClientChannelFilter* chand() const override {
    return static_cast<ClientChannelFilter*>(elem()->channel_data);
  }
  Arena* arena() const override { return deadline_state_.arena; }
  grpc_polling_entity* pollent() override { return pollent_; }
  // NOTE(review): reads slot 0 of pending_batches_; presumably the batch
  // carrying send_initial_metadata always lands there -- confirm against
  // GetBatchIndex().
  grpc_metadata_batch* send_initial_metadata() override {
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }
  grpc_call_context_element* call_context() const override {
    return call_context_;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Tightens (never loosens) the deadline: only applies the per-method
  // timeout if it expires before the deadline already in effect.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    if (per_method_deadline < deadline_) {
      deadline_ = per_method_deadline;
      grpc_deadline_state_reset(&deadline_state_, deadline_);
    }
  }

  void CreateDynamicCall();

  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  grpc_call_context_element* call_context_;
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  // State for handling deadlines.
  grpc_deadline_state deadline_state_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannelFilter::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
313 
// Per-call state for the promise-based API.  MakeNameResolutionPromise()
// returns a promise that resolves once a usable resolver result has been
// applied to the call (or fails the call); while unresolved, the call is
// parked in the channel's resolver queue and re-polled via the stored waker.
class ClientChannelFilter::PromiseBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  explicit PromiseBasedCallData(ClientChannelFilter* chand) : chand_(chand) {}

  ~PromiseBasedCallData() override {
    // If we were queued and never completed resolution (initial metadata
    // still held), remove ourselves from the resolver queue.
    if (was_queued_ && client_initial_metadata_ != nullptr) {
      MutexLock lock(&chand_->resolution_mu_);
      RemoveCallFromResolverQueuedCallsLocked();
      chand_->resolver_queued_calls_.erase(this);
    }
  }

  ArenaPromise<absl::StatusOr<CallArgs>> MakeNameResolutionPromise(
      CallArgs call_args) {
    pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value();
    client_initial_metadata_ = std::move(call_args.client_initial_metadata);
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand_->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: %striggering exit idle", chand_,
                this, GetContext<Activity>()->DebugTag().c_str());
      }
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExitIdle");
      chand_->work_serializer_->Run(
          [chand = chand_]()
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
                chand->CheckConnectivityState(/*try_to_connect=*/true);
                GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
              },
          DEBUG_LOCATION);
    }
    // The returned poller re-checks resolution each time it is woken.
    return [this, call_args = std::move(
                      call_args)]() mutable -> Poll<absl::StatusOr<CallArgs>> {
      auto result = CheckResolution(was_queued_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: %sCheckResolution returns %s",
                chand_, this, GetContext<Activity>()->DebugTag().c_str(),
                result.has_value() ? result->ToString().c_str() : "Pending");
      }
      if (!result.has_value()) return Pending{};
      if (!result->ok()) return *result;
      // Resolution succeeded: hand the initial metadata back to the call.
      call_args.client_initial_metadata = std::move(client_initial_metadata_);
      return std::move(call_args);
    };
  }

 private:
  ClientChannelFilter* chand() const override { return chand_; }
  Arena* arena() const override { return GetContext<Arena>(); }
  grpc_polling_entity* pollent() override { return &pollent_; }
  grpc_metadata_batch* send_initial_metadata() override {
    return client_initial_metadata_.get();
  }
  grpc_call_context_element* call_context() const override {
    return GetContext<grpc_call_context_element>();
  }

  // Remember a waker for the current activity so the channel can re-poll
  // us when a new resolver result arrives.
  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {
    waker_ = GetContext<Activity>()->MakeNonOwningWaker();
    was_queued_ = true;
  }

  void RetryCheckResolutionLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
      &ClientChannelFilter::resolution_mu_) override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s",
              chand_, this, waker_.ActivityDebugTag().c_str());
    }
    waker_.WakeupAsync();
  }

  // Applies the per-method timeout via the promise-based call context.
  void ResetDeadline(Duration timeout) override {
    CallContext* call_context = GetContext<CallContext>();
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) +
        timeout;
    call_context->UpdateDeadline(per_method_deadline);
  }

  ClientChannelFilter* chand_;
  grpc_polling_entity pollent_;
  ClientMetadataHandle client_initial_metadata_;
  bool was_queued_ = false;
  Waker waker_ ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_);
};
403 
404 //
405 // Filter vtable
406 //
407 
// Filter vtable used when the promise-based call path is enabled: both the
// batch entry point and MakeCallPromise are wired up.  Entries are
// positional; comments name the corresponding grpc_channel_filter field.
const grpc_channel_filter ClientChannelFilter::kFilterVtableWithPromises = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    ClientChannelFilter::MakeCallPromise,
    /* init_call: */ nullptr,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),  // call_data size
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),  // channel_data size
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    "client-channel",
};
424 
// Filter vtable used when the promise-based call path is disabled: identical
// to kFilterVtableWithPromises except the make_call_promise slot is null.
const grpc_channel_filter ClientChannelFilter::kFilterVtableWithoutPromises = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    nullptr,  // make_call_promise: batch path only
    /* init_call: */ nullptr,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),  // call_data size
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),  // channel_data size
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    "client-channel",
};
441 
442 //
443 // dynamic termination filter
444 //
445 
446 namespace {
447 
GetServiceConfigCallData(grpc_call_context_element * context)448 ClientChannelServiceConfigCallData* GetServiceConfigCallData(
449     grpc_call_context_element* context) {
450   return static_cast<ClientChannelServiceConfigCallData*>(
451       context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
452 }
453 
// Terminal filter of the dynamic filter stack.  It forwards calls into the
// owning ClientChannelFilter as load-balanced calls; it must always be the
// last filter in its stack (asserted in Init).
class DynamicTerminationFilter final {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kFilterVtable);
    // Placement-new into the channel element's preallocated storage.
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

  // Promise path: delegates to the client channel's LB call promise,
  // committing the service config selection when the LB pick completes.
  static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
      grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    return chand->chand_->CreateLoadBalancedCallPromise(
        std::move(call_args),
        []() {
          auto* service_config_call_data =
              GetServiceConfigCallData(GetContext<grpc_call_context_element>());
          service_config_call_data->Commit();
        },
        /*is_transparent_retry=*/false);
  }

 private:
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannelFilter>()) {}

  // The owning client channel; not owned by this filter.
  ClientChannelFilter* chand_;
};
498 
499 class DynamicTerminationFilter::CallData final {
500  public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)501   static grpc_error_handle Init(grpc_call_element* elem,
502                                 const grpc_call_element_args* args) {
503     new (elem->call_data) CallData(*args);
504     return absl::OkStatus();
505   }
506 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)507   static void Destroy(grpc_call_element* elem,
508                       const grpc_call_final_info* /*final_info*/,
509                       grpc_closure* then_schedule_closure) {
510     auto* calld = static_cast<CallData*>(elem->call_data);
511     RefCountedPtr<SubchannelCall> subchannel_call;
512     if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
513       subchannel_call = calld->lb_call_->subchannel_call();
514     }
515     calld->~CallData();
516     if (GPR_LIKELY(subchannel_call != nullptr)) {
517       subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
518     } else {
519       // TODO(yashkt) : This can potentially be a Closure::Run
520       ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
521     }
522   }
523 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)524   static void StartTransportStreamOpBatch(
525       grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
526     auto* calld = static_cast<CallData*>(elem->call_data);
527     calld->lb_call_->StartTransportStreamOpBatch(batch);
528   }
529 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)530   static void SetPollent(grpc_call_element* elem,
531                          grpc_polling_entity* pollent) {
532     auto* calld = static_cast<CallData*>(elem->call_data);
533     auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
534     ClientChannelFilter* client_channel = chand->chand_;
535     grpc_call_element_args args = {calld->owning_call_,  nullptr,
536                                    calld->call_context_, calld->path_,
537                                    /*start_time=*/0,     calld->deadline_,
538                                    calld->arena_,        calld->call_combiner_};
539     auto* service_config_call_data =
540         GetServiceConfigCallData(calld->call_context_);
541     calld->lb_call_ = client_channel->CreateLoadBalancedCall(
542         args, pollent, nullptr,
543         [service_config_call_data]() { service_config_call_data->Commit(); },
544         /*is_transparent_retry=*/false);
545     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
546       gpr_log(GPR_INFO,
547               "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
548               client_channel, calld->lb_call_.get());
549     }
550   }
551 
552  private:
CallData(const grpc_call_element_args & args)553   explicit CallData(const grpc_call_element_args& args)
554       : path_(CSliceRef(args.path)),
555         deadline_(args.deadline),
556         arena_(args.arena),
557         owning_call_(args.call_stack),
558         call_combiner_(args.call_combiner),
559         call_context_(args.context) {}
560 
~CallData()561   ~CallData() { CSliceUnref(path_); }
562 
563   grpc_slice path_;  // Request path.
564   Timestamp deadline_;
565   Arena* arena_;
566   grpc_call_stack* owning_call_;
567   CallCombiner* call_combiner_;
568   grpc_call_context_element* call_context_;
569 
570   OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
571 };
572 
// Vtable for the dynamic termination filter.  Entries are positional;
// comments name the corresponding grpc_channel_filter field.
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    DynamicTerminationFilter::MakeCallPromise,
    /* init_call: */ nullptr,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),  // call_data size
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),  // channel_data size
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    "dynamic_filter_termination",
};
589 
590 }  // namespace
591 
592 //
593 // ClientChannelFilter::ResolverResultHandler
594 //
595 
// Adapter that delivers resolver results to the channel.  Holds a ref on the
// channel stack for its entire lifetime so the channel cannot be destroyed
// while the resolver may still report results.
class ClientChannelFilter::ResolverResultHandler final
    : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannelFilter* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Destruction of this handler signals that resolver shutdown is complete.
  ~ResolverResultHandler() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannelFilter* chand_;
};
618 
619 //
620 // ClientChannelFilter::SubchannelWrapper
621 //
622 
623 // This class is a wrapper for Subchannel that hides details of the
624 // channel's implementation (such as the connected subchannel) from the
625 // LB policy API.
626 //
627 // Note that no synchronization is needed here, because even if the
628 // underlying subchannel is shared between channels, this wrapper will only
629 // be used within one channel, so it will always be synchronized by the
630 // control plane work_serializer.
631 class ClientChannelFilter::SubchannelWrapper final
632     : public SubchannelInterface {
633  public:
SubchannelWrapper(ClientChannelFilter * chand,RefCountedPtr<Subchannel> subchannel)634   SubchannelWrapper(ClientChannelFilter* chand,
635                     RefCountedPtr<Subchannel> subchannel)
636       : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)
637                                 ? "SubchannelWrapper"
638                                 : nullptr),
639         chand_(chand),
640         subchannel_(std::move(subchannel)) {
641     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
642       gpr_log(GPR_INFO,
643               "chand=%p: creating subchannel wrapper %p for subchannel %p",
644               chand, this, subchannel_.get());
645     }
646     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
647     GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
648     if (chand_->channelz_node_ != nullptr) {
649       auto* subchannel_node = subchannel_->channelz_node();
650       if (subchannel_node != nullptr) {
651         auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
652         if (it == chand_->subchannel_refcount_map_.end()) {
653           chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
654           it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
655                    .first;
656         }
657         ++it->second;
658       }
659     }
660     chand_->subchannel_wrappers_.insert(this);
661   }
662 
~SubchannelWrapper()663   ~SubchannelWrapper() override {
664     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
665       gpr_log(GPR_INFO,
666               "chand=%p: destroying subchannel wrapper %p for subchannel %p",
667               chand_, this, subchannel_.get());
668     }
669     if (!IsWorkSerializerDispatchEnabled()) {
670       chand_->subchannel_wrappers_.erase(this);
671       if (chand_->channelz_node_ != nullptr) {
672         auto* subchannel_node = subchannel_->channelz_node();
673         if (subchannel_node != nullptr) {
674           auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
675           GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
676           --it->second;
677           if (it->second == 0) {
678             chand_->channelz_node_->RemoveChildSubchannel(
679                 subchannel_node->uuid());
680             chand_->subchannel_refcount_map_.erase(it);
681           }
682         }
683       }
684     }
685     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
686   }
687 
  // Called when the last strong ref to the wrapper is dropped.  In
  // dispatching WorkSerializer mode, hops into the WorkSerializer to clean
  // up the channel's subchannel maps (the dtor may run on any thread in
  // that mode).  No-op in legacy mode, where the dtor does the cleanup.
  void Orphaned() override {
    if (!IsWorkSerializerDispatchEnabled()) return;
    // Make sure we clean up the channel's subchannel maps inside the
    // WorkSerializer.
    // Ref held by callback.
    WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          chand_->subchannel_wrappers_.erase(this);
          if (chand_->channelz_node_ != nullptr) {
            auto* subchannel_node = subchannel_->channelz_node();
            if (subchannel_node != nullptr) {
              // Remove the channelz child link only when the last wrapper
              // for this subchannel is cleaned up.
              auto it =
                  chand_->subchannel_refcount_map_.find(subchannel_.get());
              GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
              --it->second;
              if (it->second == 0) {
                chand_->channelz_node_->RemoveChildSubchannel(
                    subchannel_node->uuid());
                chand_->subchannel_refcount_map_.erase(it);
              }
            }
          }
          // Release the weak ref taken above.
          WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
        },
        DEBUG_LOCATION);
  }
715 
  // Registers an LB policy connectivity watcher.  The watcher is wrapped
  // in a WatcherWrapper, which is recorded in watcher_map_ (so it can be
  // found again for cancellation) and handed to the underlying subchannel.
  void WatchConnectivityState(
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);  // No duplicate registrations.
    // The wrapper holds a ref to this SubchannelWrapper.
    watcher_wrapper = new WatcherWrapper(
        std::move(watcher),
        RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
    subchannel_->WatchConnectivityState(
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }
728 
  // Cancels a watch started via WatchConnectivityState().  |watcher| is
  // the LB policy's original watcher; it is used to look up the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
  void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(it->second);
    watcher_map_.erase(it);
  }
736 
  // Returns the underlying subchannel's ConnectedSubchannel.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
    return subchannel_->connected_subchannel();
  }
740 
RequestConnection()741   void RequestConnection() override { subchannel_->RequestConnection(); }
742 
ResetBackoff()743   void ResetBackoff() override { subchannel_->ResetBackoff(); }
744 
  // Adds a data watcher: points it at the underlying subchannel via the
  // internal watcher API, then takes ownership of it in data_watchers_.
  void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
        ->SetSubchannel(subchannel_.get());
    // Insertion must succeed -- each watcher may be added only once.
    GPR_ASSERT(data_watchers_.insert(std::move(watcher)).second);
  }
751 
  // Removes (and destroys) a data watcher previously added via
  // AddDataWatcher().  No-op if the watcher is not found.
  void CancelDataWatcher(DataWatcherInterface* watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = data_watchers_.find(watcher);
    if (it != data_watchers_.end()) data_watchers_.erase(it);
  }
757 
  // Propagates a throttled keepalive time to the underlying subchannel.
  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }
761 
762  private:
763   // This wrapper provides a bridge between the internal Subchannel API
764   // and the SubchannelInterface API that we expose to LB policies.
765   // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
766   // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
767   // that was passed in by the LB policy.  We pass an instance of this
768   // class to the underlying Subchannel, and when we get updates from
769   // the subchannel, we pass those on to the wrapped watcher to return
770   // the update to the LB policy.
771   //
772   // This class handles things like hopping into the WorkSerializer
773   // before passing notifications to the LB policy and propagating
774   // keepalive information betwen subchannels.
775   class WatcherWrapper final
776       : public Subchannel::ConnectivityStateWatcherInterface {
777    public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent)778     WatcherWrapper(
779         std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
780             watcher,
781         RefCountedPtr<SubchannelWrapper> parent)
782         : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
783 
~WatcherWrapper()784     ~WatcherWrapper() override {
785       if (!IsWorkSerializerDispatchEnabled()) {
786         auto* parent = parent_.release();  // ref owned by lambda
787         parent->chand_->work_serializer_->Run(
788             [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
789                 *parent_->chand_->work_serializer_) {
790               parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
791             },
792             DEBUG_LOCATION);
793         return;
794       }
795       parent_.reset(DEBUG_LOCATION, "WatcherWrapper");
796     }
797 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)798     void OnConnectivityStateChange(
799         RefCountedPtr<ConnectivityStateWatcherInterface> self,
800         grpc_connectivity_state state, const absl::Status& status) override {
801       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
802         gpr_log(GPR_INFO,
803                 "chand=%p: connectivity change for subchannel wrapper %p "
804                 "subchannel %p; hopping into work_serializer",
805                 parent_->chand_, parent_.get(), parent_->subchannel_.get());
806       }
807       self.release();  // Held by callback.
808       parent_->chand_->work_serializer_->Run(
809           [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
810               *parent_->chand_->work_serializer_) {
811             ApplyUpdateInControlPlaneWorkSerializer(state, status);
812             Unref();
813           },
814           DEBUG_LOCATION);
815     }
816 
interested_parties()817     grpc_pollset_set* interested_parties() override {
818       return watcher_->interested_parties();
819     }
820 
821    private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)822     void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
823                                                  const absl::Status& status)
824         ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
825       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
826         gpr_log(GPR_INFO,
827                 "chand=%p: processing connectivity change in work serializer "
828                 "for subchannel wrapper %p subchannel %p watcher=%p "
829                 "state=%s status=%s",
830                 parent_->chand_, parent_.get(), parent_->subchannel_.get(),
831                 watcher_.get(), ConnectivityStateName(state),
832                 status.ToString().c_str());
833       }
834       absl::optional<absl::Cord> keepalive_throttling =
835           status.GetPayload(kKeepaliveThrottlingKey);
836       if (keepalive_throttling.has_value()) {
837         int new_keepalive_time = -1;
838         if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
839                              &new_keepalive_time)) {
840           if (new_keepalive_time > parent_->chand_->keepalive_time_) {
841             parent_->chand_->keepalive_time_ = new_keepalive_time;
842             if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
843               gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
844                       parent_->chand_, parent_->chand_->keepalive_time_);
845             }
846             // Propagate the new keepalive time to all subchannels. This is so
847             // that new transports created by any subchannel (and not just the
848             // subchannel that received the GOAWAY), use the new keepalive time.
849             for (auto* subchannel_wrapper :
850                  parent_->chand_->subchannel_wrappers_) {
851               subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
852             }
853           }
854         } else {
855           gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
856                   parent_->chand_,
857                   std::string(keepalive_throttling.value()).c_str());
858         }
859       }
860       // Propagate status only in state TF.
861       // We specifically want to avoid propagating the status for
862       // state IDLE that the real subchannel gave us only for the
863       // purpose of keepalive propagation.
864       watcher_->OnConnectivityStateChange(
865           state,
866           state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
867     }
868 
869     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
870         watcher_;
871     RefCountedPtr<SubchannelWrapper> parent_;
872   };
873 
874   // A heterogenous lookup comparator for data watchers that allows
875   // unique_ptr keys to be looked up as raw pointers.
876   struct DataWatcherLessThan {
877     using is_transparent = void;
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan878     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
879                     const std::unique_ptr<DataWatcherInterface>& p2) const {
880       return p1 < p2;
881     }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan882     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
883                     const DataWatcherInterface* p2) const {
884       return p1.get() < p2;
885     }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan886     bool operator()(const DataWatcherInterface* p1,
887                     const std::unique_ptr<DataWatcherInterface>& p2) const {
888       return p1 < p2.get();
889     }
890   };
891 
  // The channel that created this wrapper.  Not owned.
  ClientChannelFilter* chand_;
  // The real subchannel being wrapped.
  RefCountedPtr<Subchannel> subchannel_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WrapperWatcher that we passed to the underlying
  // subchannel.  This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WrapperWatcher to cancel on the underlying subchannel.
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
      ABSL_GUARDED_BY(*chand_->work_serializer_);
  // Data watchers added via AddDataWatcher(), owned by this wrapper.
  std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
      data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
};
904 
905 //
906 // ClientChannelFilter::ExternalConnectivityWatcher
907 //
908 
// Creates an external (C-API) connectivity watch: registers the pollent
// with the channel's interested_parties, records a ref to itself in the
// channel's external_watchers_ map (under the mutex), and then hops into
// the WorkSerializer to add itself to the channel's state tracker.
ClientChannelFilter::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannelFilter* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Keep the channel stack alive for the lifetime of this watcher.
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        RefAsSubclass<ExternalConnectivityWatcher>(
            DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
939 
// Unregisters the pollent from the channel's interested_parties and drops
// the channel stack ref taken in the constructor.
ClientChannelFilter::ExternalConnectivityWatcher::
    ~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
947 
948 void ClientChannelFilter::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannelFilter * chand,grpc_closure * on_complete,bool cancel)949     RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
950                                          grpc_closure* on_complete,
951                                          bool cancel) {
952   RefCountedPtr<ExternalConnectivityWatcher> watcher;
953   {
954     MutexLock lock(&chand->external_watchers_mu_);
955     auto it = chand->external_watchers_.find(on_complete);
956     if (it != chand->external_watchers_.end()) {
957       watcher = std::move(it->second);
958       chand->external_watchers_.erase(it);
959     }
960   }
961   // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
962   // the mutex before calling it.
963   if (watcher != nullptr && cancel) watcher->Cancel();
964 }
965 
// Delivers the watched state change to the user.  Notify() and Cancel()
// race; the done_ CAS ensures exactly one of them completes the watch.
void ClientChannelFilter::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
994 
// Cancels the watch.  Mirrors Notify(): whichever of the two wins the
// done_ CAS completes the watch; the loser returns immediately.
void ClientChannelFilter::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
  // Hop back into the work_serializer to clean up.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        RemoveWatcherLocked();
        Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
      },
      DEBUG_LOCATION);
}
1013 
// Runs in the WorkSerializer: fires the timer-init closure and registers
// this watcher with the channel's connectivity state tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
1020 
// Runs in the WorkSerializer: unregisters this watcher from the channel's
// connectivity state tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
1024 
1025 //
1026 // ClientChannelFilter::ConnectivityWatcherAdder
1027 //
1028 
// Fire-and-forget helper: heap-allocated by the caller, hops into the
// WorkSerializer to add the watcher to the channel's state tracker, then
// deletes itself.
class ClientChannelFilter::ConnectivityWatcherAdder final {
 public:
  ConnectivityWatcherAdder(
      ClientChannelFilter* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Keep the channel stack alive until AddWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer: registers the watcher, drops the channel
  // stack ref, and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannelFilter* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
1057 
1058 //
1059 // ClientChannelFilter::ConnectivityWatcherRemover
1060 //
1061 
// Fire-and-forget helper, mirroring ConnectivityWatcherAdder: hops into
// the WorkSerializer to remove the watcher from the channel's state
// tracker, then deletes itself.
class ClientChannelFilter::ConnectivityWatcherRemover final {
 public:
  ConnectivityWatcherRemover(ClientChannelFilter* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Keep the channel stack alive until RemoveWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer: unregisters the watcher, drops the channel
  // stack ref, and self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannelFilter* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
1087 
1088 //
1089 // ClientChannelFilter::ClientChannelControlHelper
1090 //
1091 
// Bridges the LB policy's ChannelControlHelper API to the channel:
// creates subchannels, applies state/picker updates, requests
// re-resolution, and exposes channel-level attributes to the LB policy.
class ClientChannelFilter::ClientChannelControlHelper final
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  // Holds a ref to the channel stack for the helper's lifetime.
  explicit ClientChannelControlHelper(ClientChannelFilter* chand)
      : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for |address| and wraps it in a SubchannelWrapper.
  // Returns null if the channel is shutting down or the factory fails.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      const grpc_resolved_address& address, const ChannelArgs& per_address_args,
      const ChannelArgs& args) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    ChannelArgs subchannel_args = ClientChannelFilter::MakeSubchannelArgs(
        args, per_address_args, chand_->subchannel_pool_,
        chand_->default_authority_);
    // Create subchannel.
    RefCountedPtr<Subchannel> subchannel =
        chand_->client_channel_factory_->CreateSubchannel(address,
                                                          subchannel_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
  }

  // Applies a connectivity state / picker update from the LB policy.
  // Ignored once the channel has a disconnect error (shutting down).
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      const char* extra = chand_->disconnect_error_.ok()
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (chand_->disconnect_error_.ok()) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the resolver to re-resolve, unless the channel is shutting down.
  void RequestReresolution() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  absl::string_view GetTarget() override { return chand_->target_uri_; }

  absl::string_view GetAuthority() override {
    return chand_->default_authority_;
  }

  // Returns a copy of the channel creds with call creds stripped.
  RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
    return chand_->channel_args_.GetObject<grpc_channel_credentials>()
        ->duplicate_without_call_credentials();
  }

  // Returns the channel creds as-is, including any call creds.
  RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
      override {
    return chand_->channel_args_.GetObject<grpc_channel_credentials>()->Ref();
  }

  grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
    return chand_->owning_stack_->EventEngine();
  }

  GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
    return *chand_->owning_stack_->stats_plugin_group;
  }

  // Records a trace event on the channel's channelz node, if present.
  void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps the LB policy severity enum onto the channelz trace severity enum.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ClientChannelFilter* chand_;
};
1196 
1197 //
1198 // ClientChannelFilter implementation
1199 //
1200 
// Channel filter vtable hook: constructs a ClientChannelFilter in the
// preallocated channel_data storage.  This filter must be the last one
// in the channel stack.
grpc_error_handle ClientChannelFilter::Init(grpc_channel_element* elem,
                                            grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kFilterVtableWithPromises ||
             elem->filter == &kFilterVtableWithoutPromises);
  grpc_error_handle error;
  // Placement-new into stack-owned storage; construction failures are
  // reported via |error|.
  new (elem->channel_data) ClientChannelFilter(args, &error);
  return error;
}
1210 
Destroy(grpc_channel_element * elem)1211 void ClientChannelFilter::Destroy(grpc_channel_element* elem) {
1212   auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1213   chand->~ClientChannelFilter();
1214 }
1215 
1216 namespace {
1217 
GetSubchannelPool(const ChannelArgs & args)1218 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1219     const ChannelArgs& args) {
1220   if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1221     return MakeRefCounted<LocalSubchannelPool>();
1222   }
1223   return GlobalSubchannelPool::instance();
1224 }
1225 
1226 }  // namespace
1227 
// Constructs the filter from the channel args.  On any validation failure
// this sets *error and returns early; on success *error is OkStatus.
ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
                                         grpc_error_handle* error)
    : channel_args_(args->channel_args),
      deadline_checking_enabled_(
          channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
              .value_or(!channel_args_.WantMinimalStack())),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(
          std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config.  If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  *error = absl::OkStatus();
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> target_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!target_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  target_uri_ = std::move(*target_uri);
  // The proxy mapper may rewrite the URI to resolve; fall back to the
  // original target if it does not.
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(target_uri_, &channel_args_)
                        .value_or(target_uri_);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            target_uri_);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1314 
// Tears down the resolver and LB policy, stops backup polling, and
// destroys the pollset set created in the constructor.
ClientChannelFilter::~ClientChannelFilter() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1324 
// Promise-based call path: resolves the call's config (name resolution),
// then delegates to the dynamic filter stack's client call promise.
ArenaPromise<ServerMetadataHandle> ClientChannelFilter::MakeCallPromise(
    grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // TODO(roth): Is this the right lifetime story for calld?
  auto* calld = GetContext<Arena>()->ManagedNew<PromiseBasedCallData>(chand);
  return TrySeq(
      // Name resolution.
      calld->MakeNameResolutionPromise(std::move(call_args)),
      // Dynamic filter stack.
      [calld](CallArgs call_args) mutable {
        return calld->dynamic_filters()->channel_stack()->MakeClientCallPromise(
            std::move(call_args));
      });
}
1339 
// Creates a filter-based LB call, allocated on the call's arena.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
ClientChannelFilter::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  // Make the arena visible via promise context during construction.
  promise_detail::Context<Arena> arena_ctx(args.arena);
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1351 
// Promise-based counterpart of CreateLoadBalancedCall(): allocates a
// PromiseBasedLoadBalancedCall on the arena and returns its call
// promise, moving ownership of the call object into the promise itself.
ArenaPromise<ServerMetadataHandle>
ClientChannelFilter::CreateLoadBalancedCallPromise(
    CallArgs call_args, absl::AnyInvocable<void()> on_commit,
    bool is_transparent_retry) {
  OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call(
      GetContext<Arena>()->New<PromiseBasedLoadBalancedCall>(
          this, std::move(on_commit), is_transparent_retry));
  // Grab a raw pointer before ownership is moved into the promise.
  auto* call_ptr = lb_call.get();
  return call_ptr->MakeCallPromise(std::move(call_args), std::move(lb_call));
}
1362 
// Builds the ChannelArgs used to create a subchannel, combining the
// channel-level args with the resolver's per-address args, attaching
// the subchannel pool, and stripping args that must not affect
// subchannel uniqueness.
ChannelArgs ClientChannelFilter::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // Start from the channel-level args and layer the per-address args
  // underneath them: when a key is present in both, the channel-level
  // value wins.  This matters in particular for
  // GRPC_ARG_DEFAULT_AUTHORITY, which resolvers may set on a per-address
  // basis only if the application did not set it at the channel level.
  ChannelArgs args = channel_args.UnionWith(address_args);
  // Record which subchannel pool the subchannel belongs to.
  args = args.SetObject(subchannel_pool);
  // If no default authority was set by the application or overridden by
  // the resolver, fill in the channel's default.
  args = args.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority);
  // Remove channel args that should not affect subchannel uniqueness.
  args = args.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
             .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
             .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  // Finally, drop every key explicitly marked as not-for-subchannels.
  return args.RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
1387 
// Re-runs the resolution check for every call queued waiting on a
// resolver result, then empties the queue.  Callers in this file invoke
// this while holding resolution_mu_ (after updating the resolver
// transient-failure error or the service config data).
void ClientChannelFilter::ReprocessQueuedResolverCalls() {
  for (CallData* calld : resolver_queued_calls_) {
    calld->RemoveCallFromResolverQueuedCallsLocked();
    calld->RetryCheckResolutionLocked();
  }
  resolver_queued_calls_.clear();
}
1395 
namespace {

// Selects the LB policy config to use for a resolver result, in order
// of preference:
//   1. the parsed loadBalancingConfig from the service config;
//   2. the deprecated loadBalancingPolicy field of the service config;
//   3. the GRPC_ARG_LB_POLICY_NAME channel arg;
//   4. pick_first.
// Never returns null: an unknown or config-requiring policy from the
// channel arg falls back to pick_first with an error log.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  absl::optional<absl::string_view> policy_name;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
    // A policy named via channel args gets an empty config below, so it
    // must both exist and not require a config.
    bool requires_config = false;
    if (policy_name.has_value() &&
        (!CoreConfiguration::Get()
              .lb_policy_registry()
              .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
         requires_config)) {
      if (requires_config) {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args must not "
                "require a config. Using pick_first instead.",
                std::string(*policy_name).c_str());
      } else {
        gpr_log(GPR_ERROR,
                "LB policy: %s passed through channel_args does not exist. "
                "Using pick_first instead.",
                std::string(*policy_name).c_str());
      }
      policy_name = "pick_first";
    }
  }
  // Use pick_first if nothing was specified.
  if (!policy_name.has_value()) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::FromArray({Json::FromObject({
      {std::string(*policy_name), Json::FromObject({})},
  })});
  auto lb_policy_config =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          config_json);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case we check that the specified policy exists
  //   and accepts an empty config. If not, we revert to using pick_first
  //   lb_policy
  GPR_ASSERT(lb_policy_config.ok());
  return std::move(*lb_policy_config);
}

}  // namespace
1456 
// Handles a new result from the resolver: chooses the service config
// (falling back to the previously saved config, the channel default, or
// TRANSIENT_FAILURE when none is usable), picks the LB policy config,
// pushes the update into the control plane and the LB policy, swaps the
// data-plane config if it changed, invokes the resolver's result-health
// callback, and records a channelz trace event describing what changed.
// Runs in the WorkSerializer.
void ClientChannelFilter::OnResolverResultChangedLocked(
    Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // Storage keeps the status string alive while trace_strings holds a
  // raw pointer into it.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, result.service_config.status().ToString().c_str());
    }
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Note: The only case in which service_config is null here is if the resolver
  // returned a service config error and we don't have a previous service
  // config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1592 
// Handles a transient-failure report from the resolver.  If an LB
// policy already exists, it remains the source of truth for channel
// connectivity; otherwise the channel is moved to TRANSIENT_FAILURE and
// calls queued waiting on resolution are re-processed so they can fail
// with the resolver error.
void ClientChannelFilter::OnResolverErrorLocked(absl::Status status) {
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
            status.ToString().c_str());
  }
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    // Update connectivity state.
    UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                      "resolver failure");
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      resolver_transient_failure_error_ =
          MaybeRewriteIllegalStatusCode(status, "resolver");
      ReprocessQueuedResolverCalls();
    }
  }
}
1615 
// Builds a LoadBalancingPolicy::UpdateArgs from the resolver result and
// pushes it into the LB policy, creating the policy first if this is
// the initial resolver result.  Returns the status from the policy's
// UpdateLocked().
absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  if (!result.addresses.ok()) {
    // Pass the resolution failure through to the policy.
    update_args.addresses = result.addresses.status();
  } else {
    update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
        std::move(*result.addresses));
  }
  update_args.config = std::move(lb_policy_config);
  update_args.resolution_note = std::move(result.resolution_note);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  update_args.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Add health check service name to channel args.
  if (health_check_service_name.has_value()) {
    update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
                                            *health_check_service_name);
  }
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  return lb_policy_->UpdateLocked(std::move(update_args));
}
1650 
// Creates a new LB policy (wrapped in a ChildPolicyHandler so that the
// concrete policy can be swapped on later config changes) and hooks its
// interested parties into the channel's pollset set.
OrphanablePtr<LoadBalancingPolicy> ClientChannelFilter::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1678 
// Saves the new service config and config selector as the channel's
// control-plane state, and refreshes the strings reported by
// GetChannelInfo() (under info_mu_, since GetChannelInfo() may be
// called from any thread).
void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
  std::string service_config_json(service_config->json_string());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using service config: \"%s\"", this,
            service_config_json.c_str());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}
1702 
// Pushes the saved service config and config selector into the data
// plane: rebuilds the dynamic filter stack (with retry or termination
// filter at the end) and swaps it in under resolution_mu_, then retries
// calls queued waiting on resolution.  Old values are intentionally
// released only after the lock is dropped.
void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked() {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
            saved_config_selector_.get());
  }
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  ChannelArgs new_args =
      channel_args_.SetObject(this).SetObject(service_config);
  // Retries are on by default unless disabled by channel arg or by
  // requesting a minimal stack.
  bool enable_retries =
      !new_args.WantMinimalStack() &&
      new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  // Construct dynamic filter stack.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  if (enable_retries) {
    filters.push_back(&RetryFilter::kVtable);
  } else {
    filters.push_back(&DynamicTerminationFilter::kFilterVtable);
  }
  RefCountedPtr<DynamicFilters> dynamic_filters =
      DynamicFilters::Create(new_args, std::move(filters));
  GPR_ASSERT(dynamic_filters != nullptr);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    resolver_transient_failure_error_ = absl::OkStatus();
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Re-process queued calls asynchronously.
    ReprocessQueuedResolverCalls();
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
1751 
// Creates and starts the resolver for uri_to_resolve_, and moves the
// channel into CONNECTING while the first resolution is in flight.
void ClientChannelFilter::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution for %s", this,
            uri_to_resolve_.c_str());
  }
  resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
      uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
      std::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
                    "started resolving");
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}
1770 
// Shuts down the resolver (if any), clears the service-config and
// config-selector state it produced (both the control-plane copies and,
// under resolution_mu_, the data-plane copies), and shuts down the LB
// policy.  No-op if no resolver exists.
void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    // Clear resolution state.
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Clear LB policy if set.
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      // Detach the policy's pollers from the channel before dropping it.
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1806 
// Sets the channel's connectivity state in the state tracker and, when
// channelz is enabled, records the state change there as well.
// SHUTDOWN is terminal: transitioning out of it is a fatal bug.
void ClientChannelFilter::UpdateStateLocked(grpc_connectivity_state state,
                                            const absl::Status& status,
                                            const char* reason) {
  if (state != GRPC_CHANNEL_SHUTDOWN &&
      state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
    Crash("Illegal transition SHUTDOWN -> anything");
  }
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
}
1824 
// Updates connectivity state (see UpdateStateLocked()) and installs a
// new picker, then retries every call queued waiting on an LB pick so
// that it is re-evaluated against the new picker.
void ClientChannelFilter::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  // Grab the LB lock to update the picker and trigger reprocessing of the
  // queued picks.
  // Old picker will be unreffed after releasing the lock.
  MutexLock lock(&lb_mu_);
  picker_.swap(picker);
  // Reprocess queued picks.
  for (auto& call : lb_queued_calls_) {
    call->RemoveCallFromLbQueuedCallsLocked();
    call->RetryPickLocked();
  }
  lb_queued_calls_.clear();
}
1842 
1843 namespace {
1844 
1845 // TODO(roth): Remove this in favor of the gprpp Match() function once
1846 // we can do that without breaking lock annotations.
1847 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1848 T HandlePickResult(
1849     LoadBalancingPolicy::PickResult* result,
1850     std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1851     std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1852     std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1853     std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1854   auto* complete_pick =
1855       absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1856   if (complete_pick != nullptr) {
1857     return complete_func(complete_pick);
1858   }
1859   auto* queue_pick =
1860       absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1861   if (queue_pick != nullptr) {
1862     return queue_func(queue_pick);
1863   }
1864   auto* fail_pick =
1865       absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1866   if (fail_pick != nullptr) {
1867     return fail_func(fail_pick);
1868   }
1869   auto* drop_pick =
1870       absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1871   GPR_ASSERT(drop_pick != nullptr);
1872   return drop_func(drop_pick);
1873 }
1874 
1875 }  // namespace
1876 
// Performs an LB pick to find a connected subchannel to send the ping
// on.  Fails if the channel is not READY, if the pick queues or fails,
// or if the picked subchannel has no connected subchannel.
grpc_error_handle ClientChannelFilter::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE("channel not connected");
  }
  LoadBalancingPolicy::PickResult result;
  {
    MutexLock lock(&lb_mu_);
    result = picker_->Pick(LoadBalancingPolicy::PickArgs());
  }
  return HandlePickResult<grpc_error_handle>(
      &result,
      // Complete pick.
      [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(
              *ClientChannelFilter::work_serializer_) {
            SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
                complete_pick->subchannel.get());
            RefCountedPtr<ConnectedSubchannel> connected_subchannel =
                subchannel->connected_subchannel();
            // The subchannel may have disconnected since the pick.
            if (connected_subchannel == nullptr) {
              return GRPC_ERROR_CREATE("LB pick for ping not connected");
            }
            connected_subchannel->Ping(op->send_ping.on_initiate,
                                       op->send_ping.on_ack);
            return absl::OkStatus();
          },
      // Queue pick.
      [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        return GRPC_ERROR_CREATE("LB picker queued call");
      },
      // Fail pick.
      [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        return absl_status_to_grpc_error(fail_pick->status);
      },
      // Drop pick.
      [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        return absl_status_to_grpc_error(drop_pick->status);
      });
}
1916 
// Applies a transport op inside the WorkSerializer: connectivity
// watches, pings, backoff reset, and disconnect/IDLE transitions.
// Releases the stack ref taken by StartTransportOp() and signals
// on_consumed when done.
void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    if (!error.ok()) {
      // On failure, complete both ping closures with the error.
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (!op->disconnect_with_error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              StatusToString(op->disconnect_with_error).c_str());
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // An error carrying ChannelConnectivityState==IDLE means "go idle";
    // anything else is a real shutdown.
    if (grpc_error_get_int(op->disconnect_with_error,
                           StatusIntProperty::ChannelConnectivityState,
                           &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error_.ok()) {  // Ignore if we're shutting down.
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
        // TODO(roth): Do we need to check for any queued picks here, in
        // case there's a race condition in the client_idle filter?
        // And maybe also check for calls in the resolver queue?
      }
    } else {
      // Disconnect.
      GPR_ASSERT(disconnect_error_.ok());
      disconnect_error_ = op->disconnect_with_error;
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
              grpc_error_to_absl_status(op->disconnect_with_error)));
      // TODO(roth): If this happens when we're still waiting for a
      // resolver result, we need to trigger failures for all calls in
      // the resolver queue here.
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
1979 
// Channel filter vtable entry for transport ops.  Handles pollset
// binding inline, then hops into the control-plane WorkSerializer for
// everything else, holding a stack ref across the hop (released in
// StartTransportOpLocked()).
void ClientChannelFilter::StartTransportOp(grpc_channel_element* elem,
                                           grpc_transport_op* op) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // Client channels never accept incoming streams.
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1996 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1997 void ClientChannelFilter::GetChannelInfo(grpc_channel_element* elem,
1998                                          const grpc_channel_info* info) {
1999   auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
2000   MutexLock lock(&chand->info_mu_);
2001   if (info->lb_policy_name != nullptr) {
2002     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
2003   }
2004   if (info->service_config_json != nullptr) {
2005     *info->service_config_json =
2006         gpr_strdup(chand->info_service_config_json_.c_str());
2007   }
2008 }
2009 
// Triggers a connection attempt; runs in the work_serializer.
// Scheduled by CheckConnectivityState(try_to_connect=true).
void ClientChannelFilter::TryToConnectLocked() {
  if (disconnect_error_.ok()) {  // No-op if the channel was disconnected.
    if (lb_policy_ != nullptr) {
      // We already have an LB policy; ask it to exit IDLE.
      lb_policy_->ExitIdleLocked();
    } else if (resolver_ == nullptr) {
      // No LB policy and no resolver yet: start name resolution.
      CreateResolverLocked();
    }
  }
  // Balances the ref taken in CheckConnectivityState().
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}
2020 
// Returns the channel's current connectivity state.  If the channel is
// IDLE and try_to_connect is set, bounces into the control-plane
// work_serializer to trigger a connection attempt.
grpc_connectivity_state ClientChannelFilter::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here.  But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    // Ref held until TryToConnectLocked() runs; released there.
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              *work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
2036 
// Registers a connectivity state watcher on the channel.
// ConnectivityWatcherAdder is heap-allocated and presumably
// self-deleting after it completes its work in the work_serializer --
// its definition is earlier in this file; confirm there.
void ClientChannelFilter::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
2042 
// Unregisters a previously added connectivity state watcher.
// ConnectivityWatcherRemover is heap-allocated and presumably
// self-deleting after it completes its work in the work_serializer --
// its definition is earlier in this file; confirm there.
void ClientChannelFilter::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}
2047 
2048 //
2049 // CallData implementation
2050 //
2051 
// Undoes the pollset bookkeeping done when the call was queued waiting
// for a resolver result.  Must be called under resolution_mu_.
void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: removing from resolver queued picks list",
            chand(), this);
  }
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand()->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in
  // ResolverQueuedCallCanceller::CancelLocked() or
  // ClientChannelFilter::ReprocessQueuedResolverCalls().
}
2066 
// Queues the call to wait for a resolver result.  Must be called under
// resolution_mu_.  OnAddToQueueLocked() lets the subclass register a
// cancellation handler for the queued call.
void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p: adding to resolver queued picks list; pollent=%s",
        chand(), this, grpc_polling_entity_string(pollent()).c_str());
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand()->interested_parties_);
  // Add to queue.
  chand()->resolver_queued_calls_.insert(this);
  OnAddToQueueLocked();
}
2082 
// Applies the channel's service config to this call via the given
// ConfigSelector.  On success, stores the parsed per-method config in
// the call context and applies client-channel method params (timeout,
// wait_for_ready).  Returns a non-OK error if the ConfigSelector was
// an error or rejected the call.
grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand(), this);
  }
  // A non-OK ConfigSelector carries the resolver's error; fail the call.
  if (!config_selector.ok()) return config_selector.status();
  // Create a ClientChannelServiceConfigCallData for the call.  This stores
  // a ref to the ServiceConfig and caches the right set of parsed configs
  // to use for the call.  The ClientChannelServiceConfigCallData will store
  // itself in the call context, so that it can be accessed by filters
  // below us in the stack, and it will be cleaned up when the call ends.
  auto* service_config_call_data =
      arena()->New<ClientChannelServiceConfigCallData>(arena(), call_context());
  // Use the ConfigSelector to determine the config for the call.
  absl::Status call_config_status =
      (*config_selector)
          ->GetCallConfig(
              {send_initial_metadata(), arena(), service_config_call_data});
  if (!call_config_status.ok()) {
    // Status codes that clients may not generate are rewritten before
    // being surfaced to the application.
    return absl_status_to_grpc_error(
        MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
  }
  // Apply our own method params to the call.
  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          chand()->service_config_parser_index_));
  if (method_params != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand()->deadline_checking_enabled_ &&
        method_params->timeout() != Duration::Zero()) {
      ResetDeadline(method_params->timeout());
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    auto* wait_for_ready =
        send_initial_metadata()->GetOrCreatePointer(WaitForReady());
    if (method_params->wait_for_ready().has_value() &&
        !wait_for_ready->explicitly_set) {
      wait_for_ready->value = method_params->wait_for_ready().value();
    }
  }
  return absl::OkStatus();
}
2128 
// Checks whether a resolver result is available for this call.
// Returns:
//   - absl::nullopt if the call was queued to wait for a result;
//   - a non-OK status if applying the service config failed;
//   - OkStatus() if the config was applied and the call may proceed.
absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
    bool was_queued) {
  // Check if we have a resolver result to use.
  absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
  {
    MutexLock lock(&chand()->resolution_mu_);
    bool result_ready = CheckResolutionLocked(&config_selector);
    // If no result is available, queue the call.
    if (!result_ready) {
      AddCallToResolverQueuedCallsLocked();
      return absl::nullopt;
    }
  }
  // We have a result.  Apply service config to call.
  // Note: the lock is deliberately released before this point.
  grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
  // ConfigSelector must be unreffed inside the WorkSerializer.
  if (!IsWorkSerializerDispatchEnabled() && config_selector.ok()) {
    chand()->work_serializer_->Run(
        [config_selector = std::move(*config_selector)]() mutable {
          config_selector.reset();
        },
        DEBUG_LOCATION);
  }
  // Handle errors.
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: error applying config to call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    return error;
  }
  // If the call was queued, add trace annotation.
  if (was_queued) {
    auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
        call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
    if (call_tracer != nullptr) {
      call_tracer->RecordAnnotation("Delayed name resolution complete.");
    }
  }
  return absl::OkStatus();
}
2171 
// Checks for a resolver result while holding resolution_mu_.
// Returns true if a result (or a terminal error) is available, in which
// case *config_selector is set; returns false if the call must be
// queued to wait for a result.
bool ClientChannelFilter::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    absl::Status resolver_error = chand()->resolver_transient_failure_error_;
    if (!resolver_error.ok() &&
        !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
                chand(), this);
      }
      // Propagate the resolver error via the StatusOr's error slot.
      *config_selector = absl_status_to_grpc_error(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready.  In
    // either case, queue the call.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand(),
              this);
    }
    return false;
  }
  // Result found.
  *config_selector = chand()->config_selector_;
  dynamic_filters_ = chand()->dynamic_filters_;
  return true;
}
2203 
2204 //
2205 // FilterBasedCallData implementation
2206 //
2207 
// Constructs call data for a call on the client channel.  Takes a ref
// to the call's path slice.  If deadline checking is enabled on the
// channel, deadline_state_ is initialized with the call's deadline;
// otherwise with InfFuture so the deadline timer never fires.
ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),
      call_context_(args.context),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      deadline_state_(
          elem, args,
          GPR_LIKELY(static_cast<ClientChannelFilter*>(elem->channel_data)
                         ->deadline_checking_enabled_)
              ? args.deadline
              : Timestamp::InfFuture()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this);
  }
}
2224 
// Releases the path slice ref taken in the constructor and verifies
// that no batches are still pending (they must all have been resumed
// or failed before destruction).
ClientChannelFilter::FilterBasedCallData::~FilterBasedCallData() {
  CSliceUnref(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
}
2232 
// Filter vtable init_call_elem hook: constructs the call data in the
// pre-allocated elem->call_data storage via placement new.
grpc_error_handle ClientChannelFilter::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) FilterBasedCallData(elem, *args);
  return absl::OkStatus();
}
2238 
// Filter vtable destroy_call_elem hook.  If a dynamic call was created,
// defers then_schedule_closure until the dynamic call's stack is
// destroyed; otherwise schedules it immediately.
void ClientChannelFilter::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Move the dynamic call out so it survives the calld destructor below.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
2253 
// Filter vtable hook: entry point for stream op batches on a call.
// Routes the batch down an existing dynamic call if one exists; fails
// it if the call was previously cancelled; records cancellations; and
// for send_initial_metadata batches kicks off resolution and dynamic
// call creation.  Runs under the call combiner.
void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // Skip logging if grpc_trace_channel is on, since the channel stack
  // tracing will already log the batch in that case.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
            calld, grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(
        &calld->deadline_state_, batch);
  }
  // Intercept recv_trailing_metadata to commit the call, in case we wind up
  // failing the call before we get down to the retry or LB call layer.
  if (batch->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
                      calld, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_;
  }
  // If we already have a dynamic call, pass the batch down to it.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a dynamic call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Fail all pending batches.
    calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
                calld);
      }
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
      chand->work_serializer_->Run(
          [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
            chand->CheckConnectivityState(/*try_to_connect=*/true);
            GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
          },
          DEBUG_LOCATION);
    }
    calld->TryCheckResolution(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
                            "batch does not include send_initial_metadata");
  }
}
2361 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2362 void ClientChannelFilter::FilterBasedCallData::SetPollent(
2363     grpc_call_element* elem, grpc_polling_entity* pollent) {
2364   auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2365   calld->pollent_ = pollent;
2366 }
2367 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2368 size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex(
2369     grpc_transport_stream_op_batch* batch) {
2370   // Note: It is important the send_initial_metadata be the first entry
2371   // here, since the code in CheckResolution() assumes it will be.
2372   if (batch->send_initial_metadata) return 0;
2373   if (batch->send_message) return 1;
2374   if (batch->send_trailing_metadata) return 2;
2375   if (batch->recv_initial_metadata) return 3;
2376   if (batch->recv_message) return 4;
2377   if (batch->recv_trailing_metadata) return 5;
2378   GPR_UNREACHABLE_CODE(return (size_t)-1);
2379 }
2380 
// This is called via the call combiner, so access to calld is synchronized.
// Stashes a batch until the dynamic call is created (or the call fails).
// Each op combination maps to a unique slot, so at most one batch can
// occupy each index; the assert below enforces that.
void ClientChannelFilter::FilterBasedCallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand(), this, idx);
  }
  grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
  GPR_ASSERT(pending == nullptr);
  pending = batch;
}
2394 
// This is called via the call combiner, so access to calld is synchronized.
// Closure callback used by PendingBatchesFail() to fail one batch;
// the calld pointer rides in the batch's handler_private.extra_arg.
void ClientChannelFilter::FilterBasedCallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* calld =
      static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(batch, error,
                                                     calld->call_combiner());
}
2406 
// This is called via the call combiner, so access to calld is synchronized.
// Fails all pending batches with the given (non-OK) error.  The
// predicate decides whether running the failure closures should also
// yield the call combiner (callers in different contexts need different
// behavior).
void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(!error.ok());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand(), this, num_batches, StatusToString(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      // Clear the slot before the closure runs.
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner());
  } else {
    closures.RunClosuresWithoutYielding(call_combiner());
  }
}
2440 
// This is called via the call combiner, so access to calld is synchronized.
// Closure callback used by PendingBatchesResume() to send one batch
// down the newly created dynamic call.
void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner(
    void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* calld =
      static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}
2451 
// This is called via the call combiner, so access to calld is synchronized.
// Sends all stashed batches down the dynamic call once it exists.
void ClientChannelFilter::FilterBasedCallData::PendingBatchesResume() {
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand(), this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from client channel call");
      // Clear the slot before the closure runs.
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner());
}
2480 
2481 // A class to handle the call combiner cancellation callback for a
2482 // queued pick.
2483 class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller
2484     final {
2485  public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2486   explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2487       : calld_(calld) {
2488     GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2489     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2490                       grpc_schedule_on_exec_ctx);
2491     calld->call_combiner()->SetNotifyOnCancel(&closure_);
2492   }
2493 
2494  private:
CancelLocked(void * arg,grpc_error_handle error)2495   static void CancelLocked(void* arg, grpc_error_handle error) {
2496     auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2497     auto* calld = self->calld_;
2498     auto* chand = calld->chand();
2499     {
2500       MutexLock lock(&chand->resolution_mu_);
2501       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2502         gpr_log(GPR_INFO,
2503                 "chand=%p calld=%p: cancelling resolver queued pick: "
2504                 "error=%s self=%p calld->resolver_pick_canceller=%p",
2505                 chand, calld, StatusToString(error).c_str(), self,
2506                 calld->resolver_call_canceller_);
2507       }
2508       if (calld->resolver_call_canceller_ == self && !error.ok()) {
2509         // Remove pick from list of queued picks.
2510         calld->RemoveCallFromResolverQueuedCallsLocked();
2511         chand->resolver_queued_calls_.erase(calld);
2512         // Fail pending batches on the call.
2513         calld->PendingBatchesFail(error,
2514                                   YieldCallCombinerIfPendingBatchesFound);
2515       }
2516     }
2517     GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2518     delete self;
2519   }
2520 
2521   FilterBasedCallData* calld_;
2522   grpc_closure closure_;
2523 };
2524 
TryCheckResolution(bool was_queued)2525 void ClientChannelFilter::FilterBasedCallData::TryCheckResolution(
2526     bool was_queued) {
2527   auto result = CheckResolution(was_queued);
2528   if (result.has_value()) {
2529     if (!result->ok()) {
2530       PendingBatchesFail(*result, YieldCallCombiner);
2531       return;
2532     }
2533     CreateDynamicCall();
2534   }
2535 }
2536 
// Hook invoked (under resolution_mu_) when this call is added to the
// resolver queue: installs a call combiner cancellation handler so a
// cancelled call is removed from the queue and its batches failed.
void ClientChannelFilter::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2541 
// Called (under resolution_mu_) when this queued call should re-check
// for a resolver result.  Disarms the cancellation canceller and
// retries resolution asynchronously so the retry does not run while
// holding the channel's resolution mutex.
void ClientChannelFilter::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2553 
// Creates the dynamic call stack (the filters selected by the current
// service config) and resumes any pending batches on it.  On failure,
// fails all pending batches instead.
void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() {
  DynamicFilters::Call::Args args = {dynamic_filters(), pollent_,       path_,
                                     call_start_time_,  deadline_,      arena(),
                                     call_context_,     call_combiner()};
  grpc_error_handle error;
  // Grab the raw pointer before args is moved below.
  DynamicFilters* channel_stack = args.channel_stack.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
        chand(), this, channel_stack);
  }
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to create dynamic call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    PendingBatchesFail(error, YieldCallCombiner);
    return;
  }
  PendingBatchesResume();
}
2578 
// Intercepted recv_trailing_metadata_ready callback.  Commits the
// service config call data (if any) now that the call outcome is
// known, then chains to the original callback installed by the layer
// above.
void ClientChannelFilter::FilterBasedCallData::
    RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
        void* arg, grpc_error_handle error) {
  auto* calld = static_cast<FilterBasedCallData*>(arg);
  auto* chand = calld->chand();
  auto* service_config_call_data =
      GetServiceConfigCallData(calld->call_context());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
            "service_config_call_data=%p",
            chand, calld, StatusToString(error).c_str(),
            service_config_call_data);
  }
  if (service_config_call_data != nullptr) {
    service_config_call_data->Commit();
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
               error);
}
2600 
2601 //
2602 // ClientChannelFilter::LoadBalancedCall::LbCallState
2603 //
2604 
// Implementation of ClientChannelLbCallState backed by a
// LoadBalancedCall (non-owning).
class ClientChannelFilter::LoadBalancedCall::LbCallState final
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Allocates from the call's arena; memory is freed with the call.
  void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

  ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned; outlives this object.
};
2622 
2623 //
2624 // ClientChannelFilter::LoadBalancedCall::Metadata
2625 //
2626 
// Adapts a grpc_metadata_batch (which may be null) to the LB policy's
// MetadataInterface.  All operations are no-ops / empty when the
// underlying batch is null.
class ClientChannelFilter::LoadBalancedCall::Metadata final
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}

  void Add(absl::string_view key, absl::string_view value) override {
    if (batch_ == nullptr) return;
    // Gross, egregious hack to support legacy grpclb behavior.
    // TODO(ctiller): Use a promise context for this once that plumbing is done.
    // The "value" here is actually a pointer to a GrpcLbClientStats
    // object smuggled through the string_view's data pointer.
    if (key == GrpcLbClientStatsMetadata::key()) {
      batch_->Set(
          GrpcLbClientStatsMetadata(),
          const_cast<GrpcLbClientStats*>(
              reinterpret_cast<const GrpcLbClientStats*>(value.data())));
      return;
    }
    // NOTE(review): Slice::FromStaticString does not copy the value
    // bytes, so this relies on the caller keeping them alive for the
    // life of the batch -- confirm against the MetadataInterface
    // contract.
    batch_->Append(key, Slice::FromStaticString(value),
                   [key](absl::string_view error, const Slice& value) {
                     gpr_log(GPR_ERROR, "%s",
                             absl::StrCat(error, " key:", key,
                                          " value:", value.as_string_view())
                                 .c_str());
                   });
  }

  // Copies the batch contents into a vector of key/value strings,
  // omitting the entries skipped by Encoder below.  Test-only.
  std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
      override {
    if (batch_ == nullptr) return {};
    Encoder encoder;
    batch_->Encode(&encoder);
    return encoder.Take();
  }

  absl::optional<absl::string_view> Lookup(absl::string_view key,
                                           std::string* buffer) const override {
    if (batch_ == nullptr) return absl::nullopt;
    return batch_->GetStringValue(key, buffer);
  }

 private:
  // Metadata-batch encoder that accumulates entries as string pairs
  // for TestOnlyCopyToVector().
  class Encoder final {
   public:
    void Encode(const Slice& key, const Slice& value) {
      out_.emplace_back(std::string(key.as_string_view()),
                        std::string(value.as_string_view()));
    }

    template <class Which>
    void Encode(Which, const typename Which::ValueType& value) {
      auto value_slice = Which::Encode(value);
      out_.emplace_back(std::string(Which::key()),
                        std::string(value_slice.as_string_view()));
    }

    // Deliberately skipped: these entries are not included in the
    // copied vector.
    void Encode(GrpcTimeoutMetadata,
                const typename GrpcTimeoutMetadata::ValueType&) {}
    void Encode(HttpPathMetadata, const Slice&) {}
    void Encode(HttpMethodMetadata,
                const typename HttpMethodMetadata::ValueType&) {}

    std::vector<std::pair<std::string, std::string>> Take() {
      return std::move(out_);
    }

   private:
    std::vector<std::pair<std::string, std::string>> out_;
  };

  grpc_metadata_batch* batch_;  // Not owned; may be null.
};
2697 
2698 //
2699 // ClientChannelFilter::LoadBalancedCall::LbCallState
2700 //
2701 
2702 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2703 ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttribute(
2704     UniqueTypeName type) const {
2705   auto* service_config_call_data =
2706       GetServiceConfigCallData(lb_call_->call_context_);
2707   return service_config_call_data->GetCallAttribute(type);
2708 }
2709 
2710 ClientCallTracer::CallAttemptTracer*
GetCallAttemptTracer() const2711 ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttemptTracer()
2712     const {
2713   return lb_call_->call_attempt_tracer();
2714 }
2715 
2716 //
2717 // ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor
2718 //
2719 
2720 class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
2721     : public LoadBalancingPolicy::BackendMetricAccessor {
2722  public:
BackendMetricAccessor(LoadBalancedCall * lb_call,grpc_metadata_batch * recv_trailing_metadata)2723   BackendMetricAccessor(LoadBalancedCall* lb_call,
2724                         grpc_metadata_batch* recv_trailing_metadata)
2725       : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}
2726 
GetBackendMetricData()2727   const BackendMetricData* GetBackendMetricData() override {
2728     if (lb_call_->backend_metric_data_ == nullptr &&
2729         recv_trailing_metadata_ != nullptr) {
2730       if (const auto* md = recv_trailing_metadata_->get_pointer(
2731               EndpointLoadMetricsBinMetadata())) {
2732         BackendMetricAllocator allocator(lb_call_->arena());
2733         lb_call_->backend_metric_data_ =
2734             ParseBackendMetricData(md->as_string_view(), &allocator);
2735       }
2736     }
2737     return lb_call_->backend_metric_data_;
2738   }
2739 
2740  private:
2741   class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
2742    public:
BackendMetricAllocator(Arena * arena)2743     explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}
2744 
AllocateBackendMetricData()2745     BackendMetricData* AllocateBackendMetricData() override {
2746       return arena_->New<BackendMetricData>();
2747     }
2748 
AllocateString(size_t size)2749     char* AllocateString(size_t size) override {
2750       return static_cast<char*>(arena_->Alloc(size));
2751     }
2752 
2753    private:
2754     Arena* arena_;
2755   };
2756 
2757   LoadBalancedCall* lb_call_;
2758   grpc_metadata_batch* recv_trailing_metadata_;
2759 };
2760 
2761 //
2762 // ClientChannelFilter::LoadBalancedCall
2763 //
2764 
2765 namespace {
2766 
CreateCallAttemptTracer(grpc_call_context_element * context,bool is_transparent_retry)2767 void CreateCallAttemptTracer(grpc_call_context_element* context,
2768                              bool is_transparent_retry) {
2769   auto* call_tracer = static_cast<ClientCallTracer*>(
2770       context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
2771   if (call_tracer == nullptr) return;
2772   auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2773   context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
2774 }
2775 
2776 }  // namespace
2777 
LoadBalancedCall(ClientChannelFilter * chand,grpc_call_context_element * call_context,absl::AnyInvocable<void ()> on_commit,bool is_transparent_retry)2778 ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
2779     ClientChannelFilter* chand, grpc_call_context_element* call_context,
2780     absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
2781     : InternallyRefCounted(
2782           GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)
2783               ? "LoadBalancedCall"
2784               : nullptr),
2785       chand_(chand),
2786       on_commit_(std::move(on_commit)),
2787       call_context_(call_context) {
2788   CreateCallAttemptTracer(call_context, is_transparent_retry);
2789   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2790     gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
2791   }
2792 }
2793 
~LoadBalancedCall()2794 ClientChannelFilter::LoadBalancedCall::~LoadBalancedCall() {
2795   if (backend_metric_data_ != nullptr) {
2796     backend_metric_data_->BackendMetricData::~BackendMetricData();
2797   }
2798 }
2799 
RecordCallCompletion(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,grpc_transport_stream_stats * transport_stream_stats,absl::string_view peer_address)2800 void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion(
2801     absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
2802     grpc_transport_stream_stats* transport_stream_stats,
2803     absl::string_view peer_address) {
2804   // If we have a tracer, notify it.
2805   if (call_attempt_tracer() != nullptr) {
2806     call_attempt_tracer()->RecordReceivedTrailingMetadata(
2807         status, recv_trailing_metadata, transport_stream_stats);
2808   }
2809   // If the LB policy requested a callback for trailing metadata, invoke
2810   // the callback.
2811   if (lb_subchannel_call_tracker_ != nullptr) {
2812     Metadata trailing_metadata(recv_trailing_metadata);
2813     BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
2814     LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
2815         peer_address, status, &trailing_metadata, &backend_metric_accessor};
2816     lb_subchannel_call_tracker_->Finish(args);
2817     lb_subchannel_call_tracker_.reset();
2818   }
2819 }
2820 
RecordLatency()2821 void ClientChannelFilter::LoadBalancedCall::RecordLatency() {
2822   // Compute latency and report it to the tracer.
2823   if (call_attempt_tracer() != nullptr) {
2824     gpr_timespec latency =
2825         gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
2826     call_attempt_tracer()->RecordEnd(latency);
2827   }
2828 }
2829 
// Undoes the pollset_set linkage established in
// AddCallToLbQueuedCallsLocked().  Must be called while holding the
// channel's LB mutex (per the Locked suffix).
void ClientChannelFilter::LoadBalancedCall::
    RemoveCallFromLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannelFilter::UpdateStateAndPickerLocked().
}
2844 
AddCallToLbQueuedCallsLocked()2845 void ClientChannelFilter::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
2846   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2847     gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
2848             chand_, this);
2849   }
2850   // Add call's pollent to channel's interested_parties, so that I/O
2851   // can be done under the call's CQ.
2852   grpc_polling_entity_add_to_pollset_set(pollent(),
2853                                          chand_->interested_parties_);
2854   // Add to queue.
2855   chand_->lb_queued_calls_.insert(Ref());
2856   OnAddToQueueLocked();
2857 }
2858 
2859 absl::optional<absl::Status>
PickSubchannel(bool was_queued)2860 ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
2861   // We may accumulate multiple pickers here, because if a picker says
2862   // to queue the call, we check again to see if the picker has been
2863   // updated before we queue it.
2864   // We need to unref pickers in the WorkSerializer.
2865   std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
2866   auto cleanup = absl::MakeCleanup([&]() {
2867     if (IsWorkSerializerDispatchEnabled()) return;
2868     chand_->work_serializer_->Run(
2869         [pickers = std::move(pickers)]() mutable {
2870           for (auto& picker : pickers) {
2871             picker.reset(DEBUG_LOCATION, "PickSubchannel");
2872           }
2873         },
2874         DEBUG_LOCATION);
2875   });
2876   absl::AnyInvocable<void(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
2877       set_picker;
2878   if (!IsWorkSerializerDispatchEnabled()) {
2879     set_picker =
2880         [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2881           pickers.emplace_back(std::move(picker));
2882         };
2883   } else {
2884     pickers.emplace_back();
2885     set_picker =
2886         [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2887           pickers[0] = std::move(picker);
2888         };
2889   }
2890   // Grab mutex and take a ref to the picker.
2891   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2892     gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
2893             chand_, this);
2894   }
2895   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
2896   {
2897     MutexLock lock(&chand_->lb_mu_);
2898     set_picker(chand_->picker_);
2899   }
2900   while (true) {
2901     // TODO(roth): Fix race condition in channel_idle filter and any
2902     // other possible causes of this.
2903     if (pickers.back() == nullptr) {
2904       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2905         gpr_log(GPR_ERROR, "chand=%p lb_call=%p: picker is null, failing call",
2906                 chand_, this);
2907       }
2908       return absl::InternalError("picker is null -- shouldn't happen");
2909     }
2910     // Do pick.
2911     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2912       gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
2913               chand_, this, pickers.back().get());
2914     }
2915     grpc_error_handle error;
2916     bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
2917     if (!pick_complete) {
2918       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> old_picker;
2919       MutexLock lock(&chand_->lb_mu_);
2920       // If picker has been swapped out since we grabbed it, try again.
2921       if (pickers.back() != chand_->picker_) {
2922         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2923           gpr_log(GPR_INFO,
2924                   "chand=%p lb_call=%p: pick not complete, but picker changed",
2925                   chand_, this);
2926         }
2927         if (IsWorkSerializerDispatchEnabled()) {
2928           // Don't unref until after we release the mutex.
2929           old_picker = std::move(pickers.back());
2930         }
2931         set_picker(chand_->picker_);
2932         continue;
2933       }
2934       // Otherwise queue the pick to try again later when we get a new picker.
2935       AddCallToLbQueuedCallsLocked();
2936       return absl::nullopt;
2937     }
2938     // Pick is complete.
2939     // If it was queued, add a trace annotation.
2940     if (was_queued && call_attempt_tracer() != nullptr) {
2941       call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
2942     }
2943     // If the pick failed, fail the call.
2944     if (!error.ok()) {
2945       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2946         gpr_log(GPR_INFO,
2947                 "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
2948                 chand_, this, StatusToString(error).c_str());
2949       }
2950       return error;
2951     }
2952     // Pick succeeded.
2953     Commit();
2954     return absl::OkStatus();
2955   }
2956 }
2957 
PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker * picker,grpc_error_handle * error)2958 bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
2959     LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
2960   GPR_ASSERT(connected_subchannel_ == nullptr);
2961   // Perform LB pick.
2962   LoadBalancingPolicy::PickArgs pick_args;
2963   Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
2964   GPR_ASSERT(path != nullptr);
2965   pick_args.path = path->as_string_view();
2966   LbCallState lb_call_state(this);
2967   pick_args.call_state = &lb_call_state;
2968   Metadata initial_metadata(send_initial_metadata());
2969   pick_args.initial_metadata = &initial_metadata;
2970   auto result = picker->Pick(pick_args);
2971   return HandlePickResult<bool>(
2972       &result,
2973       // CompletePick
2974       [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
2975         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2976           gpr_log(GPR_INFO,
2977                   "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
2978                   chand_, this, complete_pick->subchannel.get());
2979         }
2980         GPR_ASSERT(complete_pick->subchannel != nullptr);
2981         // Grab a ref to the connected subchannel while we're still
2982         // holding the data plane mutex.
2983         SubchannelWrapper* subchannel =
2984             static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
2985         connected_subchannel_ = subchannel->connected_subchannel();
2986         // If the subchannel has no connected subchannel (e.g., if the
2987         // subchannel has moved out of state READY but the LB policy hasn't
2988         // yet seen that change and given us a new picker), then just
2989         // queue the pick.  We'll try again as soon as we get a new picker.
2990         if (connected_subchannel_ == nullptr) {
2991           if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2992             gpr_log(GPR_INFO,
2993                     "chand=%p lb_call=%p: subchannel returned by LB picker "
2994                     "has no connected subchannel; queueing pick",
2995                     chand_, this);
2996           }
2997           return false;
2998         }
2999         lb_subchannel_call_tracker_ =
3000             std::move(complete_pick->subchannel_call_tracker);
3001         if (lb_subchannel_call_tracker_ != nullptr) {
3002           lb_subchannel_call_tracker_->Start();
3003         }
3004         return true;
3005       },
3006       // QueuePick
3007       [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
3008         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3009           gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
3010                   this);
3011         }
3012         return false;
3013       },
3014       // FailPick
3015       [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
3016         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3017           gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
3018                   this, fail_pick->status.ToString().c_str());
3019         }
3020         // If wait_for_ready is false, then the error indicates the RPC
3021         // attempt's final status.
3022         if (!send_initial_metadata()
3023                  ->GetOrCreatePointer(WaitForReady())
3024                  ->value) {
3025           *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
3026               std::move(fail_pick->status), "LB pick"));
3027           return true;
3028         }
3029         // If wait_for_ready is true, then queue to retry when we get a new
3030         // picker.
3031         return false;
3032       },
3033       // DropPick
3034       [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
3035         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3036           gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
3037                   this, drop_pick->status.ToString().c_str());
3038         }
3039         *error = grpc_error_set_int(
3040             absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
3041                 std::move(drop_pick->status), "LB drop")),
3042             StatusIntProperty::kLbPolicyDrop, 1);
3043         return true;
3044       });
3045 }
3046 
3047 //
3048 // ClientChannelFilter::FilterBasedLoadBalancedCall
3049 //
3050 
FilterBasedLoadBalancedCall(ClientChannelFilter * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,grpc_closure * on_call_destruction_complete,absl::AnyInvocable<void ()> on_commit,bool is_transparent_retry)3051 ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
3052     ClientChannelFilter* chand, const grpc_call_element_args& args,
3053     grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
3054     absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
3055     : LoadBalancedCall(chand, args.context, std::move(on_commit),
3056                        is_transparent_retry),
3057       deadline_(args.deadline),
3058       arena_(args.arena),
3059       owning_call_(args.call_stack),
3060       call_combiner_(args.call_combiner),
3061       pollent_(pollent),
3062       on_call_destruction_complete_(on_call_destruction_complete) {}
3063 
3064 ClientChannelFilter::FilterBasedLoadBalancedCall::
~FilterBasedLoadBalancedCall()3065     ~FilterBasedLoadBalancedCall() {
3066   // Make sure there are no remaining pending batches.
3067   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3068     GPR_ASSERT(pending_batches_[i] == nullptr);
3069   }
3070   if (on_call_destruction_complete_ != nullptr) {
3071     ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
3072                  absl::OkStatus());
3073   }
3074 }
3075 
Orphan()3076 void ClientChannelFilter::FilterBasedLoadBalancedCall::Orphan() {
3077   // If the recv_trailing_metadata op was never started, then notify
3078   // about call completion here, as best we can.  We assume status
3079   // CANCELLED in this case.
3080   if (recv_trailing_metadata_ == nullptr) {
3081     RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
3082                          nullptr, "");
3083   }
3084   RecordLatency();
3085   // Delegate to parent.
3086   LoadBalancedCall::Orphan();
3087 }
3088 
GetBatchIndex(grpc_transport_stream_op_batch * batch)3089 size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex(
3090     grpc_transport_stream_op_batch* batch) {
3091   // Note: It is important the send_initial_metadata be the first entry
3092   // here, since the code in PickSubchannelImpl() assumes it will be.
3093   if (batch->send_initial_metadata) return 0;
3094   if (batch->send_message) return 1;
3095   if (batch->send_trailing_metadata) return 2;
3096   if (batch->recv_initial_metadata) return 3;
3097   if (batch->recv_message) return 4;
3098   if (batch->recv_trailing_metadata) return 5;
3099   GPR_UNREACHABLE_CODE(return (size_t)-1);
3100 }
3101 
3102 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)3103 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesAdd(
3104     grpc_transport_stream_op_batch* batch) {
3105   const size_t idx = GetBatchIndex(batch);
3106   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3107     gpr_log(GPR_INFO,
3108             "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
3109             chand(), this, idx);
3110   }
3111   GPR_ASSERT(pending_batches_[idx] == nullptr);
3112   pending_batches_[idx] = batch;
3113 }
3114 
3115 // This is called via the call combiner, so access to calld is synchronized.
3116 void ClientChannelFilter::FilterBasedLoadBalancedCall::
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)3117     FailPendingBatchInCallCombiner(void* arg, grpc_error_handle error) {
3118   grpc_transport_stream_op_batch* batch =
3119       static_cast<grpc_transport_stream_op_batch*>(arg);
3120   auto* self = static_cast<FilterBasedLoadBalancedCall*>(
3121       batch->handler_private.extra_arg);
3122   // Note: This will release the call combiner.
3123   grpc_transport_stream_op_batch_finish_with_failure(batch, error,
3124                                                      self->call_combiner_);
3125 }
3126 
3127 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)3128 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail(
3129     grpc_error_handle error,
3130     YieldCallCombinerPredicate yield_call_combiner_predicate) {
3131   GPR_ASSERT(!error.ok());
3132   failure_error_ = error;
3133   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3134     size_t num_batches = 0;
3135     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3136       if (pending_batches_[i] != nullptr) ++num_batches;
3137     }
3138     gpr_log(GPR_INFO,
3139             "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
3140             chand(), this, num_batches, StatusToString(error).c_str());
3141   }
3142   CallCombinerClosureList closures;
3143   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3144     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
3145     if (batch != nullptr) {
3146       batch->handler_private.extra_arg = this;
3147       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3148                         FailPendingBatchInCallCombiner, batch,
3149                         grpc_schedule_on_exec_ctx);
3150       closures.Add(&batch->handler_private.closure, error,
3151                    "PendingBatchesFail");
3152       batch = nullptr;
3153     }
3154   }
3155   if (yield_call_combiner_predicate(closures)) {
3156     closures.RunClosures(call_combiner_);
3157   } else {
3158     closures.RunClosuresWithoutYielding(call_combiner_);
3159   }
3160 }
3161 
3162 // This is called via the call combiner, so access to calld is synchronized.
3163 void ClientChannelFilter::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)3164     ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
3165   grpc_transport_stream_op_batch* batch =
3166       static_cast<grpc_transport_stream_op_batch*>(arg);
3167   SubchannelCall* subchannel_call =
3168       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
3169   // Note: This will release the call combiner.
3170   subchannel_call->StartTransportStreamOpBatch(batch);
3171 }
3172 
3173 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()3174 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() {
3175   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3176     size_t num_batches = 0;
3177     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3178       if (pending_batches_[i] != nullptr) ++num_batches;
3179     }
3180     gpr_log(GPR_INFO,
3181             "chand=%p lb_call=%p: starting %" PRIuPTR
3182             " pending batches on subchannel_call=%p",
3183             chand(), this, num_batches, subchannel_call_.get());
3184   }
3185   CallCombinerClosureList closures;
3186   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3187     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
3188     if (batch != nullptr) {
3189       batch->handler_private.extra_arg = subchannel_call_.get();
3190       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3191                         ResumePendingBatchInCallCombiner, batch,
3192                         grpc_schedule_on_exec_ctx);
3193       closures.Add(&batch->handler_private.closure, absl::OkStatus(),
3194                    "resuming pending batch from LB call");
3195       batch = nullptr;
3196     }
3197   }
3198   // Note: This will release the call combiner.
3199   closures.RunClosures(call_combiner_);
3200 }
3201 
// Entry point for batches arriving from the filter above.  Records
// send ops in the call tracer, intercepts recv callbacks, and then
// either forwards the batch to an existing subchannel call, fails it
// (if previously cancelled), handles cancellation, or queues it while
// an LB pick is pending.
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ||
      GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: batch started from above: %s, "
            "call_attempt_tracer()=%p",
            chand(), this,
            grpc_transport_stream_op_batch_string(batch, false).c_str(),
            call_attempt_tracer());
  }
  // Handle call tracing.
  if (call_attempt_tracer() != nullptr) {
    // Record send ops in tracer.
    if (batch->cancel_stream) {
      call_attempt_tracer()->RecordCancel(
          batch->payload->cancel_stream.cancel_error);
    }
    if (batch->send_initial_metadata) {
      call_attempt_tracer()->RecordSendInitialMetadata(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_trailing_metadata) {
      call_attempt_tracer()->RecordSendTrailingMetadata(
          batch->payload->send_trailing_metadata.send_trailing_metadata);
    }
    // Intercept recv ops.  Note: RecvInitialMetadataReady relies on the
    // fact that this interception happens only when a tracer is present.
    if (batch->recv_initial_metadata) {
      recv_initial_metadata_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata;
      original_recv_initial_metadata_ready_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, nullptr);
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &recv_initial_metadata_ready_;
    }
  }
  // Intercept recv_trailing_metadata even if there is no call tracer,
  // since we may need to notify the LB policy about trailing metadata.
  if (batch->recv_trailing_metadata) {
    recv_trailing_metadata_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    transport_stream_stats_ =
        batch->payload->recv_trailing_metadata.collect_stats;
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
                      this, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
  // If we've already gotten a subchannel call, pass the batch down to it.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
              chand(), this, subchannel_call_.get());
    }
    subchannel_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a subchannel call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Fail all pending batches.
    PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's LB mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    TryPick(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: saved batch, yielding call combiner",
              chand(), this);
    }
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
3316 
RecvInitialMetadataReady(void * arg,grpc_error_handle error)3317 void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
3318     void* arg, grpc_error_handle error) {
3319   auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
3320   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3321     gpr_log(GPR_INFO,
3322             "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
3323             self->chand(), self, StatusToString(error).c_str());
3324   }
3325   if (error.ok()) {
3326     // recv_initial_metadata_flags is not populated for clients
3327     self->call_attempt_tracer()->RecordReceivedInitialMetadata(
3328         self->recv_initial_metadata_);
3329     auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
3330     if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
3331   }
3332   Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
3333                error);
3334 }
3335 
// Intercepts the recv_trailing_metadata_ready callback from the
// subchannel call.  If a call attempt tracer or LB subchannel call
// tracker is present, determines the call's final status (either from
// the transport error or from the trailing metadata) and records call
// completion.  Then chains to the original closure, substituting any
// previously deferred failure error.
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
            "call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
            "failure_error_=%s",
            self->chand(), self, StatusToString(error).c_str(),
            self->call_attempt_tracer(), self->lb_subchannel_call_tracker(),
            StatusToString(self->failure_error_).c_str());
  }
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(error, self->deadline_, &code, &message,
                            /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
      // If code was GRPC_STATUS_OK, status remains default (OK).
    }
    // Use the peer string saved by RecvInitialMetadataReady(), if any.
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback.  If a failure error was previously
  // deferred, report it in place of the error passed in, and clear it.
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3388 
// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller
    final {
 public:
  // Takes a ref to the LB call and also refs the owning call stack, so
  // that both stay alive until CancelLocked() runs.
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    // The call combiner will invoke closure_ upon cancellation (or upon
    // replacement by a newer cancellation closure).
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Invoked by the call combiner.  Only acts if this canceller is still
  // the current one for the call (i.e., it has not been lamed by
  // RetryPickLocked()) and error indicates an actual cancellation.
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, StatusToString(error).c_str(), self,
                lb_call->lb_call_canceller_);
      }
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(self->lb_call_);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3443 
TryPick(bool was_queued)3444 void ClientChannelFilter::FilterBasedLoadBalancedCall::TryPick(
3445     bool was_queued) {
3446   auto result = PickSubchannel(was_queued);
3447   if (result.has_value()) {
3448     if (!result->ok()) {
3449       PendingBatchesFail(*result, YieldCallCombiner);
3450       return;
3451     }
3452     CreateSubchannelCall();
3453   }
3454 }
3455 
OnAddToQueueLocked()3456 void ClientChannelFilter::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
3457   // Register call combiner cancellation callback.
3458   lb_call_canceller_ =
3459       new LbQueuedCallCanceller(RefAsSubclass<FilterBasedLoadBalancedCall>());
3460 }
3461 
RetryPickLocked()3462 void ClientChannelFilter::FilterBasedLoadBalancedCall::RetryPickLocked() {
3463   // Lame the call combiner canceller.
3464   lb_call_canceller_ = nullptr;
3465   // Do an async callback to resume call processing, so that we're not
3466   // doing it while holding the channel's LB mutex.
3467   // TODO(roth): We should really be using EventEngine::Run() here
3468   // instead of ExecCtx::Run().  Unfortunately, doing that seems to cause
3469   // a flaky TSAN failure for reasons that I do not fully understand.
3470   // However, given that we are working toward eliminating this code as
3471   // part of the promise conversion, it doesn't seem worth further
3472   // investigation right now.
3473   ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
3474                  // If there are a lot of queued calls here, resuming them
3475                  // all may cause us to stay inside C-core for a long period
3476                  // of time. All of that work would be done using the same
3477                  // ExecCtx instance and therefore the same cached value of
3478                  // "now". The longer it takes to finish all of this work
3479                  // and exit from C-core, the more stale the cached value of
3480                  // "now" may become. This can cause problems whereby (e.g.)
3481                  // we calculate a timer deadline based on the stale value,
3482                  // which results in the timer firing too early. To avoid
3483                  // this, we invalidate the cached value for each call we
3484                  // process.
3485                  ExecCtx::Get()->InvalidateNow();
3486                  TryPick(/*was_queued=*/true);
3487                }),
3488                absl::OkStatus());
3489 }
3490 
// Creates the subchannel call once an LB pick has succeeded.  On
// success, resumes all pending batches on the new subchannel call; on
// failure, fails the pending batches with the creation error.
void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  SubchannelCall::Args call_args = {
      connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context(), call_combiner_};
  grpc_error_handle error;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand(),
            this, subchannel_call_.get(), StatusToString(error).c_str());
  }
  // Hand off the destruction-complete callback to the subchannel call,
  // so it runs after the call stack is destroyed.
  if (on_call_destruction_complete_ != nullptr) {
    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
    on_call_destruction_complete_ = nullptr;
  }
  if (GPR_UNLIKELY(!error.ok())) {
    // Creation failed -- fail all pending batches with the error.
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    // Creation succeeded -- send all pending batches down.
    PendingBatchesResume();
  }
}
3517 
3518 //
3519 // ClientChannelFilter::PromiseBasedLoadBalancedCall
3520 //
3521 
// Constructs a promise-based LB call.  The call context element is
// obtained from the current promise activity context.
ClientChannelFilter::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall(
    ClientChannelFilter* chand, absl::AnyInvocable<void()> on_commit,
    bool is_transparent_retry)
    : LoadBalancedCall(chand, GetContext<grpc_call_context_element>(),
                       std::move(on_commit), is_transparent_retry) {}
3527 
// Builds the promise for the LB call: registers tracer interceptors on
// the message/metadata pipes, performs the LB pick (possibly queuing),
// delegates to the connected subchannel's call promise, and records
// call completion from the resulting server trailing metadata.  An
// OnCancel wrapper handles cleanup (dequeueing and best-effort
// completion recording) if the call is cancelled.
ArenaPromise<ServerMetadataHandle>
ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise(
    CallArgs call_args, OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call) {
  pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value();
  // Record ops in tracer.
  if (call_attempt_tracer() != nullptr) {
    call_attempt_tracer()->RecordSendInitialMetadata(
        call_args.client_initial_metadata.get());
    // TODO(ctiller): Find a way to do this without registering a no-op mapper.
    call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
        [](MessageHandle message) { return message; },  // No-op.
        [this]() {
          // TODO(roth): Change CallTracer API to not pass metadata
          // batch to this method, since the batch is always empty.
          grpc_metadata_batch metadata;
          call_attempt_tracer()->RecordSendTrailingMetadata(&metadata);
        });
  }
  // Extract peer name from server initial metadata.
  call_args.server_initial_metadata->InterceptAndMap(
      [self = lb_call->RefAsSubclass<PromiseBasedLoadBalancedCall>()](
          ServerMetadataHandle metadata) {
        if (self->call_attempt_tracer() != nullptr) {
          self->call_attempt_tracer()->RecordReceivedInitialMetadata(
              metadata.get());
        }
        Slice* peer_string = metadata->get_pointer(PeerString());
        if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
        return metadata;
      });
  // Stash the client initial metadata until the pick completes; the
  // second TrySeq stage moves it back into call_args.
  client_initial_metadata_ = std::move(call_args.client_initial_metadata);
  return OnCancel(
      Map(TrySeq(
              // LB pick.  Polled repeatedly: returns Pending{} while the
              // pick is queued; RetryPickLocked() wakes us to re-poll.
              [this]() -> Poll<absl::Status> {
                auto result = PickSubchannel(was_queued_);
                if (GRPC_TRACE_FLAG_ENABLED(
                        grpc_client_channel_lb_call_trace)) {
                  gpr_log(GPR_INFO,
                          "chand=%p lb_call=%p: %sPickSubchannel() returns %s",
                          chand(), this,
                          GetContext<Activity>()->DebugTag().c_str(),
                          result.has_value() ? result->ToString().c_str()
                                             : "Pending");
                }
                if (result == absl::nullopt) return Pending{};
                return std::move(*result);
              },
              // Pick succeeded: run the call on the chosen subchannel.
              [this, call_args = std::move(call_args)]() mutable
              -> ArenaPromise<ServerMetadataHandle> {
                call_args.client_initial_metadata =
                    std::move(client_initial_metadata_);
                return connected_subchannel()->MakeCallPromise(
                    std::move(call_args));
              }),
          // Record call completion.
          [this](ServerMetadataHandle metadata) {
            if (call_attempt_tracer() != nullptr ||
                lb_subchannel_call_tracker() != nullptr) {
              // Derive the call's final status from trailing metadata;
              // defaults to OK when the status code is GRPC_STATUS_OK.
              absl::Status status;
              grpc_status_code code = metadata->get(GrpcStatusMetadata())
                                          .value_or(GRPC_STATUS_UNKNOWN);
              if (code != GRPC_STATUS_OK) {
                absl::string_view message;
                if (const auto* grpc_message =
                        metadata->get_pointer(GrpcMessageMetadata())) {
                  message = grpc_message->as_string_view();
                }
                status =
                    absl::Status(static_cast<absl::StatusCode>(code), message);
              }
              RecordCallCompletion(status, metadata.get(),
                                   &GetContext<CallContext>()
                                        ->call_stats()
                                        ->transport_stream_stats,
                                   peer_string_.as_string_view());
            }
            RecordLatency();
            return metadata;
          }),
      // Cancellation handler.
      [lb_call = std::move(lb_call)]() {
        // If the waker is pending, then we need to remove ourself from
        // the list of queued LB calls.
        if (!lb_call->waker_.is_unwakeable()) {
          MutexLock lock(&lb_call->chand()->lb_mu_);
          lb_call->Commit();
          // Remove pick from list of queued picks.
          lb_call->RemoveCallFromLbQueuedCallsLocked();
          // Remove from queued picks list.
          lb_call->chand()->lb_queued_calls_.erase(lb_call.get());
        }
        // TODO(ctiller): We don't have access to the call's actual status
        // here, so we just assume CANCELLED.  We could change this to use
        // CallFinalization instead of OnCancel() so that we can get the
        // actual status.  But we should also have access to the trailing
        // metadata, which we don't have in either case.  Ultimately, we
        // need a better story for code that needs to run at the end of a
        // call in both cancellation and non-cancellation cases that needs
        // access to server trailing metadata and the call's real status.
        if (lb_call->call_attempt_tracer() != nullptr) {
          lb_call->call_attempt_tracer()->RecordCancel(
              absl::CancelledError("call cancelled"));
        }
        if (lb_call->call_attempt_tracer() != nullptr ||
            lb_call->lb_subchannel_call_tracker() != nullptr) {
          // If we were cancelled without recording call completion, then
          // record call completion here, as best we can.  We assume status
          // CANCELLED in this case.
          lb_call->RecordCallCompletion(absl::CancelledError("call cancelled"),
                                        nullptr, nullptr, "");
        }
      });
}
3641 
arena() const3642 Arena* ClientChannelFilter::PromiseBasedLoadBalancedCall::arena() const {
3643   return GetContext<Arena>();
3644 }
3645 
3646 grpc_metadata_batch*
send_initial_metadata() const3647 ClientChannelFilter::PromiseBasedLoadBalancedCall::send_initial_metadata()
3648     const {
3649   return client_initial_metadata_.get();
3650 }
3651 
OnAddToQueueLocked()3652 void ClientChannelFilter::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() {
3653   waker_ = GetContext<Activity>()->MakeNonOwningWaker();
3654   was_queued_ = true;
3655 }
3656 
RetryPickLocked()3657 void ClientChannelFilter::PromiseBasedLoadBalancedCall::RetryPickLocked() {
3658   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
3659     gpr_log(GPR_INFO, "chand=%p lb_call=%p: RetryPickLocked()", chand(), this);
3660   }
3661   waker_.WakeupAsync();
3662 }
3663 
3664 }  // namespace grpc_core
3665