xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/xds/xds_cluster_impl.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <stddef.h>
20 #include <stdint.h>
21 
22 #include <atomic>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 #include "absl/types/variant.h"
36 
37 #include <grpc/impl/connectivity_state.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/ext/xds/xds_bootstrap.h"
42 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
43 #include "src/core/ext/xds/xds_client.h"
44 #include "src/core/ext/xds/xds_client_grpc.h"
45 #include "src/core/ext/xds/xds_client_stats.h"
46 #include "src/core/ext/xds/xds_endpoint.h"
47 #include "src/core/lib/channel/call_tracer.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/config/core_configuration.h"
50 #include "src/core/lib/debug/trace.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/match.h"
53 #include "src/core/lib/gprpp/orphanable.h"
54 #include "src/core/lib/gprpp/ref_counted.h"
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/gprpp/ref_counted_string.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/gprpp/validation_errors.h"
59 #include "src/core/lib/iomgr/pollset_set.h"
60 #include "src/core/lib/iomgr/resolved_address.h"
61 #include "src/core/lib/json/json.h"
62 #include "src/core/lib/json/json_args.h"
63 #include "src/core/lib/json/json_object_loader.h"
64 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
65 #include "src/core/lib/transport/connectivity_state.h"
66 #include "src/core/load_balancing/backend_metric_data.h"
67 #include "src/core/load_balancing/child_policy_handler.h"
68 #include "src/core/load_balancing/delegating_helper.h"
69 #include "src/core/load_balancing/lb_policy.h"
70 #include "src/core/load_balancing/lb_policy_factory.h"
71 #include "src/core/load_balancing/lb_policy_registry.h"
72 #include "src/core/load_balancing/subchannel_interface.h"
73 #include "src/core/load_balancing/xds/xds_channel_args.h"
74 #include "src/core/resolver/endpoint_addresses.h"
75 #include "src/core/resolver/xds/xds_dependency_manager.h"
76 
77 namespace grpc_core {
78 
// Trace flag registered under the name "xds_cluster_impl_lb" and constructed
// with `false` (presumably meaning disabled by default -- confirm against
// TraceFlag). It gates every GRPC_TRACE_FLAG_ENABLED(...) log statement in
// this file and is also handed to the ChildPolicyHandler this policy creates.
TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
80 
81 namespace {
82 
// Shorthand for the aggregated xDS config that UpdateLocked() pulls out of the
// channel args via GetObjectRef<XdsConfig>().
using XdsConfig = XdsDependencyManager::XdsConfig;
84 
85 //
86 // global circuit breaker atomic map
87 //
88 
89 class CircuitBreakerCallCounterMap final {
90  public:
91   using Key =
92       std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
93 
94   class CallCounter final : public RefCounted<CallCounter> {
95    public:
CallCounter(Key key)96     explicit CallCounter(Key key) : key_(std::move(key)) {}
97     ~CallCounter() override;
98 
Load()99     uint32_t Load() {
100       return concurrent_requests_.load(std::memory_order_seq_cst);
101     }
Increment()102     uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
Decrement()103     void Decrement() { concurrent_requests_.fetch_sub(1); }
104 
105    private:
106     Key key_;
107     std::atomic<uint32_t> concurrent_requests_{0};
108   };
109 
110   RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
111                                          const std::string& eds_service_name);
112 
113  private:
114   Mutex mu_;
115   std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
116 };
117 
118 CircuitBreakerCallCounterMap* const g_call_counter_map =
119     new CircuitBreakerCallCounterMap;
120 
121 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)122 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
123                                           const std::string& eds_service_name) {
124   Key key(cluster, eds_service_name);
125   RefCountedPtr<CallCounter> result;
126   MutexLock lock(&mu_);
127   auto it = map_.find(key);
128   if (it == map_.end()) {
129     it = map_.insert({key, nullptr}).first;
130   } else {
131     result = it->second->RefIfNonZero();
132   }
133   if (result == nullptr) {
134     result = MakeRefCounted<CallCounter>(std::move(key));
135     it->second = result.get();
136   }
137   return result;
138 }
139 
~CallCounter()140 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
141   MutexLock lock(&g_call_counter_map->mu_);
142   auto it = g_call_counter_map->map_.find(key_);
143   if (it != g_call_counter_map->map_.end() && it->second == this) {
144     g_call_counter_map->map_.erase(it);
145   }
146 }
147 
148 //
149 // LB policy
150 //
151 
// Registered name of this LB policy; returned by name() on both the config
// class and the policy class.
constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
153 
154 // Config for xDS Cluster Impl LB policy.
155 class XdsClusterImplLbConfig final : public LoadBalancingPolicy::Config {
156  public:
157   XdsClusterImplLbConfig() = default;
158 
159   XdsClusterImplLbConfig(const XdsClusterImplLbConfig&) = delete;
160   XdsClusterImplLbConfig& operator=(const XdsClusterImplLbConfig&) = delete;
161 
162   XdsClusterImplLbConfig(XdsClusterImplLbConfig&& other) = delete;
163   XdsClusterImplLbConfig& operator=(XdsClusterImplLbConfig&& other) = delete;
164 
name() const165   absl::string_view name() const override { return kXdsClusterImpl; }
166 
cluster_name() const167   const std::string& cluster_name() const { return cluster_name_; }
child_policy() const168   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
169     return child_policy_;
170   }
171 
172   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
173   void JsonPostLoad(const Json& json, const JsonArgs& args,
174                     ValidationErrors* errors);
175 
176  private:
177   std::string cluster_name_;
178   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
179 };
180 
181 // xDS Cluster Impl LB policy.
182 class XdsClusterImplLb final : public LoadBalancingPolicy {
183  public:
184   XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);
185 
name() const186   absl::string_view name() const override { return kXdsClusterImpl; }
187 
188   absl::Status UpdateLocked(UpdateArgs args) override;
189   void ExitIdleLocked() override;
190   void ResetBackoffLocked() override;
191 
192  private:
193   class StatsSubchannelWrapper final : public DelegatingSubchannel {
194    public:
195     // If load reporting is enabled and we have an XdsClusterLocalityStats
196     // object, that object already contains the locality label.  We
197     // need to store the locality label directly only in the case where
198     // load reporting is disabled.
199     using LocalityData = absl::variant<
200         RefCountedStringValue /*locality*/,
201         RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>;
202 
StatsSubchannelWrapper(RefCountedPtr<SubchannelInterface> wrapped_subchannel,LocalityData locality_data)203     StatsSubchannelWrapper(
204         RefCountedPtr<SubchannelInterface> wrapped_subchannel,
205         LocalityData locality_data)
206         : DelegatingSubchannel(std::move(wrapped_subchannel)),
207           locality_data_(std::move(locality_data)) {}
208 
locality() const209     RefCountedStringValue locality() const {
210       return Match(
211           locality_data_,
212           [](RefCountedStringValue locality) { return locality; },
213           [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
214             return locality_stats->locality_name()->human_readable_string();
215           });
216     }
217 
locality_stats() const218     XdsClusterLocalityStats* locality_stats() const {
219       return Match(
220           locality_data_,
221           [](const RefCountedStringValue&) {
222             return static_cast<XdsClusterLocalityStats*>(nullptr);
223           },
224           [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
225             return locality_stats.get();
226           });
227     }
228 
229    private:
230     LocalityData locality_data_;
231   };
232 
233   // A picker that wraps the picker from the child to perform drops.
234   class Picker final : public SubchannelPicker {
235    public:
236     Picker(XdsClusterImplLb* xds_cluster_impl_lb,
237            RefCountedPtr<SubchannelPicker> picker);
238 
239     PickResult Pick(PickArgs args) override;
240 
241    private:
242     class SubchannelCallTracker;
243 
244     RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
245     uint32_t max_concurrent_requests_;
246     RefCountedStringValue service_telemetry_label_;
247     RefCountedStringValue namespace_telemetry_label_;
248     RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
249     RefCountedPtr<XdsClusterDropStats> drop_stats_;
250     RefCountedPtr<SubchannelPicker> picker_;
251   };
252 
253   class Helper final
254       : public ParentOwningDelegatingChannelControlHelper<XdsClusterImplLb> {
255    public:
Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)256     explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
257         : ParentOwningDelegatingChannelControlHelper(
258               std::move(xds_cluster_impl_policy)) {}
259 
260     RefCountedPtr<SubchannelInterface> CreateSubchannel(
261         const grpc_resolved_address& address,
262         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
263     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
264                      RefCountedPtr<SubchannelPicker> picker) override;
265   };
266 
267   ~XdsClusterImplLb() override;
268 
269   void ShutdownLocked() override;
270 
271   void ResetState();
272   void ReportTransientFailure(absl::Status status);
273 
274   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
275       const ChannelArgs& args);
276   absl::Status UpdateChildPolicyLocked(
277       absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
278       std::string resolution_note, const ChannelArgs& args);
279 
280   absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
281   MaybeCreateCertificateProviderLocked(
282       const XdsClusterResource& cluster_resource) const;
283 
284   void MaybeUpdatePickerLocked();
285 
286   // Current config from the resolver.
287   RefCountedPtr<XdsClusterImplLbConfig> config_;
288   std::shared_ptr<const XdsClusterResource> cluster_resource_;
289   RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
290 
291   // Current concurrent number of requests.
292   RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
293 
294   // Internal state.
295   bool shutting_down_ = false;
296 
297   // The xds client.
298   RefCountedPtr<GrpcXdsClient> xds_client_;
299 
300   // The stats for client-side load reporting.
301   RefCountedPtr<XdsClusterDropStats> drop_stats_;
302 
303   OrphanablePtr<LoadBalancingPolicy> child_policy_;
304 
305   // Latest state and picker reported by the child policy.
306   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
307   absl::Status status_;
308   RefCountedPtr<SubchannelPicker> picker_;
309 };
310 
311 //
312 // XdsClusterImplLb::Picker::SubchannelCallTracker
313 //
314 
315 class XdsClusterImplLb::Picker::SubchannelCallTracker final
316     : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
317  public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<XdsClusterLocalityStats> locality_stats,RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)318   SubchannelCallTracker(
319       std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
320           original_subchannel_call_tracker,
321       RefCountedPtr<XdsClusterLocalityStats> locality_stats,
322       RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
323       : original_subchannel_call_tracker_(
324             std::move(original_subchannel_call_tracker)),
325         locality_stats_(std::move(locality_stats)),
326         call_counter_(std::move(call_counter)) {}
327 
~SubchannelCallTracker()328   ~SubchannelCallTracker() override {
329     locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
330     call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
331     GPR_DEBUG_ASSERT(!started_);
332   }
333 
Start()334   void Start() override {
335     // Increment number of calls in flight.
336     call_counter_->Increment();
337     // Record a call started.
338     if (locality_stats_ != nullptr) {
339       locality_stats_->AddCallStarted();
340     }
341     // Delegate if needed.
342     if (original_subchannel_call_tracker_ != nullptr) {
343       original_subchannel_call_tracker_->Start();
344     }
345 #ifndef NDEBUG
346     started_ = true;
347 #endif
348   }
349 
Finish(FinishArgs args)350   void Finish(FinishArgs args) override {
351     // Delegate if needed.
352     if (original_subchannel_call_tracker_ != nullptr) {
353       original_subchannel_call_tracker_->Finish(args);
354     }
355     // Record call completion for load reporting.
356     if (locality_stats_ != nullptr) {
357       auto* backend_metric_data =
358           args.backend_metric_accessor->GetBackendMetricData();
359       const std::map<absl::string_view, double>* named_metrics = nullptr;
360       if (backend_metric_data != nullptr) {
361         named_metrics = &backend_metric_data->named_metrics;
362       }
363       locality_stats_->AddCallFinished(named_metrics, !args.status.ok());
364     }
365     // Decrement number of calls in flight.
366     call_counter_->Decrement();
367 #ifndef NDEBUG
368     started_ = false;
369 #endif
370   }
371 
372  private:
373   std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
374       original_subchannel_call_tracker_;
375   RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
376   RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
377 #ifndef NDEBUG
378   bool started_ = false;
379 #endif
380 };
381 
382 //
383 // XdsClusterImplLb::Picker
384 //
385 
// Snapshots everything Pick() needs from the owning policy:
// - the shared circuit breaker call counter;
// - the max_concurrent_requests limit and the service/namespace telemetry
//   labels from the current cluster resource;
// - the EDS drop config and the drop stats object.
// Takes ownership of the child's picker |picker|, which may be null (e.g.
// when MaybeUpdatePickerLocked() is dropping all calls before the child has
// produced a picker).
// Note: dereferences cluster_resource_, so it must already be set.
XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
                                 RefCountedPtr<SubchannelPicker> picker)
    : call_counter_(xds_cluster_impl_lb->call_counter_),
      max_concurrent_requests_(
          xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
      service_telemetry_label_(
          xds_cluster_impl_lb->cluster_resource_->service_telemetry_label),
      namespace_telemetry_label_(
          xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label),
      drop_config_(xds_cluster_impl_lb->drop_config_),
      drop_stats_(xds_cluster_impl_lb->drop_stats_),
      picker_(std::move(picker)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
    gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
            xds_cluster_impl_lb, this);
  }
}
403 
Pick(LoadBalancingPolicy::PickArgs args)404 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
405     LoadBalancingPolicy::PickArgs args) {
406   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
407   auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
408   if (call_attempt_tracer != nullptr) {
409     call_attempt_tracer->SetOptionalLabel(
410         ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName,
411         service_telemetry_label_);
412     call_attempt_tracer->SetOptionalLabel(
413         ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
414             kXdsServiceNamespace,
415         namespace_telemetry_label_);
416   }
417   // Handle EDS drops.
418   const std::string* drop_category;
419   if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {
420     if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
421     return PickResult::Drop(absl::UnavailableError(
422         absl::StrCat("EDS-configured drop: ", *drop_category)));
423   }
424   // Check if we exceeded the max concurrent requests circuit breaking limit.
425   // Note: We check the value here, but we don't actually increment the
426   // counter for the current request until the channel calls the subchannel
427   // call tracker's Start() method.  This means that we may wind up
428   // allowing more concurrent requests than the configured limit.
429   if (call_counter_->Load() >= max_concurrent_requests_) {
430     if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
431     return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
432   }
433   // If we're not dropping the call, we should always have a child picker.
434   if (picker_ == nullptr) {  // Should never happen.
435     return PickResult::Fail(absl::InternalError(
436         "xds_cluster_impl picker not given any child picker"));
437   }
438   // Not dropping, so delegate to child picker.
439   PickResult result = picker_->Pick(args);
440   auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
441   if (complete_pick != nullptr) {
442     auto* subchannel_wrapper =
443         static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
444     // Add locality label to per-call metrics if needed.
445     if (call_attempt_tracer != nullptr) {
446       call_attempt_tracer->SetOptionalLabel(
447           ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality,
448           subchannel_wrapper->locality());
449     }
450     // Handle load reporting.
451     RefCountedPtr<XdsClusterLocalityStats> locality_stats;
452     if (subchannel_wrapper->locality_stats() != nullptr) {
453       locality_stats = subchannel_wrapper->locality_stats()->Ref(
454           DEBUG_LOCATION, "SubchannelCallTracker");
455     }
456     // Unwrap subchannel to pass back up the stack.
457     complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
458     // Inject subchannel call tracker to record call completion.
459     complete_pick->subchannel_call_tracker =
460         std::make_unique<SubchannelCallTracker>(
461             std::move(complete_pick->subchannel_call_tracker),
462             std::move(locality_stats),
463             call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
464   } else {
465     // TODO(roth): We should ideally also record call failures here in the case
466     // where a pick fails.  This is challenging, because we don't know which
467     // picks are for wait_for_ready RPCs or how many times we'll return a
468     // failure for the same wait_for_ready RPC.
469   }
470   return result;
471 }
472 
473 //
474 // XdsClusterImplLb
475 //
476 
XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,Args args)477 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,
478                                    Args args)
479     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
480   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
481     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
482             this, xds_client_.get());
483   }
484 }
485 
~XdsClusterImplLb()486 XdsClusterImplLb::~XdsClusterImplLb() {
487   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
488     gpr_log(GPR_INFO,
489             "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
490             this);
491   }
492 }
493 
ShutdownLocked()494 void XdsClusterImplLb::ShutdownLocked() {
495   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
496     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
497   }
498   shutting_down_ = true;
499   ResetState();
500   xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
501 }
502 
ResetState()503 void XdsClusterImplLb::ResetState() {
504   // Remove the child policy's interested_parties pollset_set from the
505   // xDS policy.
506   if (child_policy_ != nullptr) {
507     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
508                                      interested_parties());
509     child_policy_.reset();
510   }
511   // Drop our ref to the child's picker, in case it's holding a ref to
512   // the child.
513   picker_.reset();
514   drop_stats_.reset();
515 }
516 
ReportTransientFailure(absl::Status status)517 void XdsClusterImplLb::ReportTransientFailure(absl::Status status) {
518   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
519     gpr_log(GPR_INFO,
520             "[xds_cluster_impl_lb %p] reporting TRANSIENT_FAILURE: %s", this,
521             status.ToString().c_str());
522   }
523   ResetState();
524   channel_control_helper()->UpdateState(
525       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
526       MakeRefCounted<TransientFailurePicker>(status));
527 }
528 
ExitIdleLocked()529 void XdsClusterImplLb::ExitIdleLocked() {
530   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
531 }
532 
ResetBackoffLocked()533 void XdsClusterImplLb::ResetBackoffLocked() {
534   // The XdsClient will have its backoff reset by the xds resolver, so we
535   // don't need to do it here.
536   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
537 }
538 
GetEdsResourceName(const XdsClusterResource & cluster_resource)539 std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
540   auto* eds = absl::get_if<XdsClusterResource::Eds>(&cluster_resource.type);
541   if (eds == nullptr) return "";
542   return eds->eds_service_name;
543 }
544 
UpdateLocked(UpdateArgs args)545 absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
546   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
547     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
548   }
549   // Grab new LB policy config.
550   auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
551   // Cluster name should never change, because the cds policy will assign a
552   // different priority child name if that happens, which means that this
553   // policy instance will get replaced instead of being updated.
554   if (config_ != nullptr) {
555     GPR_ASSERT(config_->cluster_name() == new_config->cluster_name());
556   }
557   // Get xDS config.
558   auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
559   if (new_xds_config == nullptr) {
560     // Should never happen.
561     absl::Status status = absl::InternalError(
562         "xDS config not passed to xds_cluster_impl LB policy");
563     ReportTransientFailure(status);
564     return status;
565   }
566   auto it = new_xds_config->clusters.find(new_config->cluster_name());
567   if (it == new_xds_config->clusters.end() || !it->second.ok() ||
568       it->second->cluster == nullptr) {
569     // Should never happen.
570     absl::Status status = absl::InternalError(absl::StrCat(
571         "xDS config has no entry for cluster ", new_config->cluster_name()));
572     ReportTransientFailure(status);
573     return status;
574   }
575   auto& new_cluster_config = *it->second;
576   auto* endpoint_config =
577       absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
578           &new_cluster_config.children);
579   if (endpoint_config == nullptr) {
580     // Should never happen.
581     absl::Status status = absl::InternalError(
582         absl::StrCat("cluster config for ", new_config->cluster_name(),
583                      " has no endpoint config"));
584     ReportTransientFailure(status);
585     return status;
586   }
587   auto xds_cert_provider =
588       MaybeCreateCertificateProviderLocked(*new_cluster_config.cluster);
589   if (!xds_cert_provider.ok()) {
590     // Should never happen.
591     ReportTransientFailure(xds_cert_provider.status());
592     return xds_cert_provider.status();
593   }
594   if (*xds_cert_provider != nullptr) {
595     args.args = args.args.SetObject(std::move(*xds_cert_provider));
596   }
597   // Now we've verified the new config is good.
598   // Get new and old (if any) EDS service name.
599   std::string new_eds_service_name =
600       GetEdsResourceName(*new_cluster_config.cluster);
601   std::string old_eds_service_name =
602       cluster_resource_ == nullptr ? ""
603                                    : GetEdsResourceName(*cluster_resource_);
604   // Update drop stats if needed.
605   // Note: We need a drop stats object whenever load reporting is enabled,
606   // even if we have no EDS drop config, because we also use it when
607   // reporting circuit breaker drops.
608   if (!new_cluster_config.cluster->lrs_load_reporting_server.has_value()) {
609     drop_stats_.reset();
610   } else if (cluster_resource_ == nullptr ||
611              old_eds_service_name != new_eds_service_name ||
612              cluster_resource_->lrs_load_reporting_server !=
613                  new_cluster_config.cluster->lrs_load_reporting_server) {
614     drop_stats_ = xds_client_->AddClusterDropStats(
615         *new_cluster_config.cluster->lrs_load_reporting_server,
616         new_config->cluster_name(), new_eds_service_name);
617     if (drop_stats_ == nullptr) {
618       gpr_log(
619           GPR_ERROR,
620           "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
621           "LRS server %s, cluster %s, EDS service name %s, load "
622           "reporting for drops will not be done.",
623           this,
624           new_cluster_config.cluster->lrs_load_reporting_server->server_uri()
625               .c_str(),
626           new_config->cluster_name().c_str(), new_eds_service_name.c_str());
627     }
628   }
629   // Update call counter if needed.
630   if (cluster_resource_ == nullptr ||
631       old_eds_service_name != new_eds_service_name) {
632     call_counter_ = g_call_counter_map->GetOrCreate(new_config->cluster_name(),
633                                                     new_eds_service_name);
634   }
635   // Update config state, now that we're done comparing old and new fields.
636   config_ = std::move(new_config);
637   cluster_resource_ = new_cluster_config.cluster;
638   drop_config_ = endpoint_config->endpoints != nullptr
639                      ? endpoint_config->endpoints->drop_config
640                      : nullptr;
641   // Update picker in case some dependent config field changed.
642   MaybeUpdatePickerLocked();
643   // Update child policy.
644   return UpdateChildPolicyLocked(std::move(args.addresses),
645                                  std::move(args.resolution_note), args.args);
646 }
647 
648 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
MaybeCreateCertificateProviderLocked(const XdsClusterResource & cluster_resource) const649 XdsClusterImplLb::MaybeCreateCertificateProviderLocked(
650     const XdsClusterResource& cluster_resource) const {
651   // If the channel is not using XdsCreds, do nothing.
652   auto channel_credentials = channel_control_helper()->GetChannelCredentials();
653   if (channel_credentials == nullptr ||
654       channel_credentials->type() != XdsCredentials::Type()) {
655     return nullptr;
656   }
657   // Configure root cert.
658   absl::string_view root_provider_instance_name =
659       cluster_resource.common_tls_context.certificate_validation_context
660           .ca_certificate_provider_instance.instance_name;
661   absl::string_view root_cert_name =
662       cluster_resource.common_tls_context.certificate_validation_context
663           .ca_certificate_provider_instance.certificate_name;
664   RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
665   if (!root_provider_instance_name.empty()) {
666     root_cert_provider =
667         xds_client_->certificate_provider_store()
668             .CreateOrGetCertificateProvider(root_provider_instance_name);
669     if (root_cert_provider == nullptr) {
670       return absl::InternalError(
671           absl::StrCat("Certificate provider instance name: \"",
672                        root_provider_instance_name, "\" not recognized."));
673     }
674   }
675   // Configure identity cert.
676   absl::string_view identity_provider_instance_name =
677       cluster_resource.common_tls_context.tls_certificate_provider_instance
678           .instance_name;
679   absl::string_view identity_cert_name =
680       cluster_resource.common_tls_context.tls_certificate_provider_instance
681           .certificate_name;
682   RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
683   if (!identity_provider_instance_name.empty()) {
684     identity_cert_provider =
685         xds_client_->certificate_provider_store()
686             .CreateOrGetCertificateProvider(identity_provider_instance_name);
687     if (identity_cert_provider == nullptr) {
688       return absl::InternalError(
689           absl::StrCat("Certificate provider instance name: \"",
690                        identity_provider_instance_name, "\" not recognized."));
691     }
692   }
693   // Configure SAN matchers.
694   const std::vector<StringMatcher>& san_matchers =
695       cluster_resource.common_tls_context.certificate_validation_context
696           .match_subject_alt_names;
697   // Create xds cert provider.
698   return MakeRefCounted<XdsCertificateProvider>(
699       root_cert_provider, root_cert_name, identity_cert_provider,
700       identity_cert_name, san_matchers);
701 }
702 
MaybeUpdatePickerLocked()703 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
704   // If we're dropping all calls, report READY, regardless of what (or
705   // whether) the child has reported.
706   if (drop_config_ != nullptr && drop_config_->drop_all()) {
707     auto drop_picker = MakeRefCounted<Picker>(this, picker_);
708     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
709       gpr_log(GPR_INFO,
710               "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
711               "state=READY picker=%p",
712               this, drop_picker.get());
713     }
714     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
715                                           std::move(drop_picker));
716     return;
717   }
718   // Otherwise, update only if we have a child picker.
719   if (picker_ != nullptr) {
720     auto drop_picker = MakeRefCounted<Picker>(this, picker_);
721     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
722       gpr_log(GPR_INFO,
723               "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
724               "status=(%s) picker=%p",
725               this, ConnectivityStateName(state_), status_.ToString().c_str(),
726               drop_picker.get());
727     }
728     channel_control_helper()->UpdateState(state_, status_,
729                                           std::move(drop_picker));
730   }
731 }
732 
CreateChildPolicyLocked(const ChannelArgs & args)733 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
734     const ChannelArgs& args) {
735   LoadBalancingPolicy::Args lb_policy_args;
736   lb_policy_args.work_serializer = work_serializer();
737   lb_policy_args.args = args;
738   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
739       RefAsSubclass<XdsClusterImplLb>(DEBUG_LOCATION, "Helper"));
740   OrphanablePtr<LoadBalancingPolicy> lb_policy =
741       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
742                                          &grpc_xds_cluster_impl_lb_trace);
743   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
744     gpr_log(GPR_INFO,
745             "[xds_cluster_impl_lb %p] Created new child policy handler %p",
746             this, lb_policy.get());
747   }
748   // Add our interested_parties pollset_set to that of the newly created
749   // child policy. This will make the child policy progress upon activity on
750   // this policy, which in turn is tied to the application's call.
751   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
752                                    interested_parties());
753   return lb_policy;
754 }
755 
UpdateChildPolicyLocked(absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,std::string resolution_note,const ChannelArgs & args)756 absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
757     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
758     std::string resolution_note, const ChannelArgs& args) {
759   // Create policy if needed.
760   if (child_policy_ == nullptr) {
761     child_policy_ = CreateChildPolicyLocked(args);
762   }
763   // Construct update args.
764   UpdateArgs update_args;
765   update_args.addresses = std::move(addresses);
766   update_args.resolution_note = std::move(resolution_note);
767   update_args.config = config_->child_policy();
768   update_args.args =
769       args.Set(GRPC_ARG_XDS_CLUSTER_NAME, config_->cluster_name());
770   // Update the policy.
771   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
772     gpr_log(GPR_INFO,
773             "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
774             child_policy_.get());
775   }
776   return child_policy_->UpdateLocked(std::move(update_args));
777 }
778 
779 //
780 // XdsClusterImplLb::Helper
781 //
782 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)783 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
784     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
785     const ChannelArgs& args) {
786   if (parent()->shutting_down_) return nullptr;
787   // Wrap the subchannel so that we pass along the locality label and
788   // (if load reporting is enabled) the locality stats object, which
789   // will be used by the picker.
790   auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
791   RefCountedPtr<XdsClusterLocalityStats> locality_stats;
792   if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
793     locality_stats = parent()->xds_client_->AddClusterLocalityStats(
794         parent()->cluster_resource_->lrs_load_reporting_server.value(),
795         parent()->config_->cluster_name(),
796         GetEdsResourceName(*parent()->cluster_resource_), locality_name);
797     if (locality_stats == nullptr) {
798       gpr_log(GPR_ERROR,
799               "[xds_cluster_impl_lb %p] Failed to get locality stats object "
800               "for LRS server %s, cluster %s, EDS service name %s; load "
801               "reports will not be generated",
802               parent(),
803               parent()
804                   ->cluster_resource_->lrs_load_reporting_server->server_uri()
805                   .c_str(),
806               parent()->config_->cluster_name().c_str(),
807               GetEdsResourceName(*parent()->cluster_resource_).c_str());
808     }
809   }
810   StatsSubchannelWrapper::LocalityData locality_data;
811   if (locality_stats != nullptr) {
812     locality_data = std::move(locality_stats);
813   } else {
814     locality_data = locality_name->human_readable_string();
815   }
816   return MakeRefCounted<StatsSubchannelWrapper>(
817       parent()->channel_control_helper()->CreateSubchannel(
818           address, per_address_args, args),
819       std::move(locality_data));
820 }
821 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)822 void XdsClusterImplLb::Helper::UpdateState(
823     grpc_connectivity_state state, const absl::Status& status,
824     RefCountedPtr<SubchannelPicker> picker) {
825   if (parent()->shutting_down_) return;
826   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
827     gpr_log(GPR_INFO,
828             "[xds_cluster_impl_lb %p] child connectivity state update: "
829             "state=%s (%s) picker=%p",
830             parent(), ConnectivityStateName(state), status.ToString().c_str(),
831             picker.get());
832   }
833   // Save the state and picker.
834   parent()->state_ = state;
835   parent()->status_ = status;
836   parent()->picker_ = std::move(picker);
837   // Wrap the picker and return it to the channel.
838   parent()->MaybeUpdatePickerLocked();
839 }
840 
841 //
842 // factory
843 //
844 
JsonLoader(const JsonArgs &)845 const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
846   static const auto* loader =
847       JsonObjectLoader<XdsClusterImplLbConfig>()
848           // Note: Some fields require custom processing, so they are
849           // handled in JsonPostLoad() instead.
850           .Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
851           .Finish();
852   return loader;
853 }
854 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)855 void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
856                                           ValidationErrors* errors) {
857   // Parse "childPolicy" field.
858   ValidationErrors::ScopedField field(errors, ".childPolicy");
859   auto it = json.object().find("childPolicy");
860   if (it == json.object().end()) {
861     errors->AddError("field not present");
862   } else {
863     auto lb_config =
864         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
865             it->second);
866     if (!lb_config.ok()) {
867       errors->AddError(lb_config.status().message());
868     } else {
869       child_policy_ = std::move(*lb_config);
870     }
871   }
872 }
873 
874 class XdsClusterImplLbFactory final : public LoadBalancingPolicyFactory {
875  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const876   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
877       LoadBalancingPolicy::Args args) const override {
878     auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
879                                                             "XdsClusterImplLb");
880     if (xds_client == nullptr) {
881       gpr_log(GPR_ERROR,
882               "XdsClient not present in channel args -- cannot instantiate "
883               "xds_cluster_impl LB policy");
884       return nullptr;
885     }
886     return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
887                                             std::move(args));
888   }
889 
name() const890   absl::string_view name() const override { return kXdsClusterImpl; }
891 
892   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const893   ParseLoadBalancingConfig(const Json& json) const override {
894     return LoadFromJson<RefCountedPtr<XdsClusterImplLbConfig>>(
895         json, JsonArgs(),
896         "errors validating xds_cluster_impl LB policy config");
897   }
898 };
899 
900 }  // namespace
901 
RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder * builder)902 void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder) {
903   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
904       std::make_unique<XdsClusterImplLbFactory>());
905 }
906 
907 }  // namespace grpc_core
908