xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/xds/cds.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <algorithm>
20 #include <map>
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "absl/types/variant.h"
34 
35 #include <grpc/grpc_security.h>
36 #include <grpc/impl/connectivity_state.h>
37 #include <grpc/support/json.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/ext/xds/xds_cluster.h"
41 #include "src/core/ext/xds/xds_common_types.h"
42 #include "src/core/ext/xds/xds_health_status.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/env.h"
48 #include "src/core/lib/gprpp/match.h"
49 #include "src/core/lib/gprpp/orphanable.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/gprpp/unique_type_name.h"
53 #include "src/core/lib/gprpp/work_serializer.h"
54 #include "src/core/lib/iomgr/pollset_set.h"
55 #include "src/core/lib/json/json.h"
56 #include "src/core/lib/json/json_args.h"
57 #include "src/core/lib/json/json_object_loader.h"
58 #include "src/core/lib/json/json_writer.h"
59 #include "src/core/load_balancing/address_filtering.h"
60 #include "src/core/load_balancing/delegating_helper.h"
61 #include "src/core/load_balancing/lb_policy.h"
62 #include "src/core/load_balancing/lb_policy_factory.h"
63 #include "src/core/load_balancing/lb_policy_registry.h"
64 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
65 #include "src/core/load_balancing/xds/xds_channel_args.h"
66 #include "src/core/resolver/xds/xds_dependency_manager.h"
67 
68 namespace grpc_core {
69 
70 TraceFlag grpc_cds_lb_trace(false, "cds_lb");
71 
72 namespace {
73 
74 // TODO(roth): Remove this after the 1.63 release.
XdsAggregateClusterBackwardCompatibilityEnabled()75 bool XdsAggregateClusterBackwardCompatibilityEnabled() {
76   auto value = GetEnv("GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
77   if (!value.has_value()) return false;
78   bool parsed_value;
79   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
80   return parse_succeeded && parsed_value;
81 }
82 
83 using XdsConfig = XdsDependencyManager::XdsConfig;
84 
85 constexpr absl::string_view kCds = "cds_experimental";
86 
87 // Config for this LB policy.
88 class CdsLbConfig final : public LoadBalancingPolicy::Config {
89  public:
90   CdsLbConfig() = default;
91 
92   CdsLbConfig(const CdsLbConfig&) = delete;
93   CdsLbConfig& operator=(const CdsLbConfig&) = delete;
94 
95   CdsLbConfig(CdsLbConfig&& other) = delete;
96   CdsLbConfig& operator=(CdsLbConfig&& other) = delete;
97 
name() const98   absl::string_view name() const override { return kCds; }
99 
cluster() const100   const std::string& cluster() const { return cluster_; }
is_dynamic() const101   bool is_dynamic() const { return is_dynamic_; }
102 
JsonLoader(const JsonArgs &)103   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
104     static const auto* loader =
105         JsonObjectLoader<CdsLbConfig>()
106             .Field("cluster", &CdsLbConfig::cluster_)
107             .OptionalField("isDynamic", &CdsLbConfig::is_dynamic_)
108             .Finish();
109     return loader;
110   }
111 
112  private:
113   std::string cluster_;
114   bool is_dynamic_ = false;
115 };
116 
117 // CDS LB policy.
118 class CdsLb final : public LoadBalancingPolicy {
119  public:
120   explicit CdsLb(Args args);
121 
name() const122   absl::string_view name() const override { return kCds; }
123 
124   absl::Status UpdateLocked(UpdateArgs args) override;
125   void ResetBackoffLocked() override;
126   void ExitIdleLocked() override;
127 
128  private:
129   // Delegating helper to be passed to child policy.
130   using Helper = ParentOwningDelegatingChannelControlHelper<CdsLb>;
131 
132   // State used to retain child policy names for the priority policy.
133   struct ChildNameState {
134     std::vector<size_t /*child_number*/> priority_child_numbers;
135     size_t next_available_child_number = 0;
136 
Resetgrpc_core::__anona38954c30111::CdsLb::ChildNameState137     void Reset() {
138       priority_child_numbers.clear();
139       next_available_child_number = 0;
140     }
141   };
142 
143   ~CdsLb() override;
144 
145   void ShutdownLocked() override;
146 
147   // Computes child numbers for new_cluster, reusing child numbers
148   // from old_cluster and child_name_state_ in an intelligent
149   // way to avoid unnecessary churn.
150   ChildNameState ComputeChildNames(
151       const XdsConfig::ClusterConfig* old_cluster,
152       const XdsConfig::ClusterConfig& new_cluster,
153       const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const;
154 
155   std::string GetChildPolicyName(const std::string& cluster, size_t priority);
156 
157   Json CreateChildPolicyConfigForLeafCluster(
158       const XdsConfig::ClusterConfig& new_cluster,
159       const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
160       const XdsClusterResource* aggregate_cluster_resource);
161   Json CreateChildPolicyConfigForAggregateCluster(
162       const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config);
163 
164   void ResetState();
165 
166   void ReportTransientFailure(absl::Status status);
167 
168   std::string cluster_name_;
169   RefCountedPtr<const XdsConfig> xds_config_;
170 
171   // Cluster subscription, for dynamic clusters (e.g., RLS).
172   RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription_;
173 
174   ChildNameState child_name_state_;
175 
176   // Child LB policy.
177   OrphanablePtr<LoadBalancingPolicy> child_policy_;
178 
179   // Internal state.
180   bool shutting_down_ = false;
181 };
182 
183 //
184 // CdsLb
185 //
186 
CdsLb(Args args)187 CdsLb::CdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
188   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
189     gpr_log(GPR_INFO, "[cdslb %p] created", this);
190   }
191 }
192 
~CdsLb()193 CdsLb::~CdsLb() {
194   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
195     gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
196   }
197 }
198 
ShutdownLocked()199 void CdsLb::ShutdownLocked() {
200   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
201     gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
202   }
203   shutting_down_ = true;
204   ResetState();
205 }
206 
ResetBackoffLocked()207 void CdsLb::ResetBackoffLocked() {
208   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
209 }
210 
ExitIdleLocked()211 void CdsLb::ExitIdleLocked() {
212   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
213 }
214 
215 // We need at least one priority for each discovery mechanism, just so that we
216 // have a child in which to create the xds_cluster_impl policy.  This ensures
217 // that we properly handle the case of a discovery mechanism dropping 100% of
218 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
GetUpdatePriorityList(const XdsEndpointResource * update)219 const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
220     const XdsEndpointResource* update) {
221   static const NoDestruct<XdsEndpointResource::PriorityList>
222       kPriorityListWithEmptyPriority(1);
223   if (update == nullptr || update->priorities.empty()) {
224     return *kPriorityListWithEmptyPriority;
225   }
226   return update->priorities;
227 }
228 
MakeChildPolicyName(absl::string_view cluster,size_t child_number)229 std::string MakeChildPolicyName(absl::string_view cluster,
230                                 size_t child_number) {
231   return absl::StrCat("{cluster=", cluster, ", child_number=", child_number,
232                       "}");
233 }
234 
235 class PriorityEndpointIterator final : public EndpointAddressesIterator {
236  public:
PriorityEndpointIterator(std::string cluster_name,std::shared_ptr<const XdsEndpointResource> endpoints,std::vector<size_t> priority_child_numbers)237   PriorityEndpointIterator(
238       std::string cluster_name,
239       std::shared_ptr<const XdsEndpointResource> endpoints,
240       std::vector<size_t /*child_number*/> priority_child_numbers)
241       : cluster_name_(std::move(cluster_name)),
242         endpoints_(std::move(endpoints)),
243         priority_child_numbers_(std::move(priority_child_numbers)) {}
244 
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const245   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
246       const override {
247     const auto& priority_list = GetUpdatePriorityList(endpoints_.get());
248     for (size_t priority = 0; priority < priority_list.size(); ++priority) {
249       const auto& priority_entry = priority_list[priority];
250       std::string priority_child_name =
251           MakeChildPolicyName(cluster_name_, priority_child_numbers_[priority]);
252       for (const auto& p : priority_entry.localities) {
253         const auto& locality_name = p.first;
254         const auto& locality = p.second;
255         std::vector<RefCountedStringValue> hierarchical_path = {
256             RefCountedStringValue(priority_child_name),
257             locality_name->human_readable_string()};
258         auto hierarchical_path_attr =
259             MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
260         for (const auto& endpoint : locality.endpoints) {
261           uint32_t endpoint_weight =
262               locality.lb_weight *
263               endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
264           callback(EndpointAddresses(
265               endpoint.addresses(),
266               endpoint.args()
267                   .SetObject(hierarchical_path_attr)
268                   .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
269                   .SetObject(locality_name->Ref())
270                   .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
271         }
272       }
273     }
274   }
275 
276  private:
277   std::string cluster_name_;
278   std::shared_ptr<const XdsEndpointResource> endpoints_;
279   std::vector<size_t /*child_number*/> priority_child_numbers_;
280 };
281 
UpdateLocked(UpdateArgs args)282 absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
283   // Get new config.
284   auto new_config = args.config.TakeAsSubclass<CdsLbConfig>();
285   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
286     gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s is_dynamic=%d",
287             this, new_config->cluster().c_str(), new_config->is_dynamic());
288   }
289   GPR_ASSERT(new_config != nullptr);
290   // Cluster name should never change, because we should use a different
291   // child name in xds_cluster_manager in that case.
292   if (cluster_name_.empty()) {
293     cluster_name_ = new_config->cluster();
294   } else {
295     GPR_ASSERT(cluster_name_ == new_config->cluster());
296   }
297   // Start dynamic subscription if needed.
298   if (new_config->is_dynamic() && subscription_ == nullptr) {
299     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
300       gpr_log(GPR_INFO,
301               "[cdslb %p] obtaining dynamic subscription for cluster %s", this,
302               cluster_name_.c_str());
303     }
304     auto* dependency_mgr = args.args.GetObject<XdsDependencyManager>();
305     if (dependency_mgr == nullptr) {
306       // Should never happen.
307       absl::Status status =
308           absl::InternalError("xDS dependency mgr not passed to CDS LB policy");
309       ReportTransientFailure(status);
310       return status;
311     }
312     subscription_ = dependency_mgr->GetClusterSubscription(cluster_name_);
313   }
314   // Get xDS config.
315   auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
316   if (new_xds_config == nullptr) {
317     // Should never happen.
318     absl::Status status =
319         absl::InternalError("xDS config not passed to CDS LB policy");
320     ReportTransientFailure(status);
321     return status;
322   }
323   auto it = new_xds_config->clusters.find(cluster_name_);
324   if (it == new_xds_config->clusters.end()) {
325     // Cluster not present.
326     if (new_config->is_dynamic()) {
327       // If we are already subscribed, it's possible that we just
328       // recently subscribed but another update came through before we
329       // got the new cluster, in which case it will still be missing.
330       if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
331         gpr_log(GPR_INFO,
332                 "[cdslb %p] xDS config has no entry for dynamic cluster %s, "
333                 "waiting for subsequent update",
334                 this, cluster_name_.c_str());
335       }
336       // Stay in CONNECTING until we get an update that has the cluster.
337       return absl::OkStatus();
338     }
339     // Not a dynamic cluster.  This should never happen.
340     absl::Status status = absl::UnavailableError(absl::StrCat(
341         "xDS config has no entry for static cluster ", cluster_name_));
342     ReportTransientFailure(status);
343     return status;
344   }
345   auto& new_cluster_config = it->second;
346   // If new list is not OK, report TRANSIENT_FAILURE.
347   if (!new_cluster_config.ok()) {
348     ReportTransientFailure(new_cluster_config.status());
349     return new_cluster_config.status();
350   }
351   GPR_ASSERT(new_cluster_config->cluster != nullptr);
352   // Find old cluster, if any.
353   const XdsConfig::ClusterConfig* old_cluster_config = nullptr;
354   if (xds_config_ != nullptr) {
355     auto it_old = xds_config_->clusters.find(cluster_name_);
356     if (it_old != xds_config_->clusters.end() && it_old->second.ok()) {
357       old_cluster_config = &*it_old->second;
358       // If nothing changed for a leaf cluster, then ignore the update.
359       // Can't do this for an aggregate cluster, because even if the aggregate
360       // cluster itself didn't change, the leaf clusters may have changed.
361       if (*new_cluster_config == *old_cluster_config &&
362           absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
363               new_cluster_config->children)) {
364         return absl::OkStatus();
365       }
366     }
367   }
368   // TODO(roth): Remove this after the 1.63 release.
369   const XdsClusterResource* aggregate_cluster_resource = nullptr;
370   static constexpr absl::string_view kArgXdsAggregateClusterName =
371       GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_aggregate_cluster_name";
372   if (XdsAggregateClusterBackwardCompatibilityEnabled()) {
373     if (absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
374             new_cluster_config->children)) {
375       auto aggregate_cluster = args.args.GetString(kArgXdsAggregateClusterName);
376       if (aggregate_cluster.has_value()) {
377         auto it = new_xds_config->clusters.find(*aggregate_cluster);
378         if (it == new_xds_config->clusters.end()) {
379           // Cluster not present.  This should never happen.
380           absl::Status status = absl::UnavailableError(
381               absl::StrCat("xDS config has no entry for aggregate cluster ",
382                            *aggregate_cluster));
383           ReportTransientFailure(status);
384           return status;
385         }
386         auto& aggregate_cluster_config = it->second;
387         if (!aggregate_cluster_config.ok()) {
388           ReportTransientFailure(aggregate_cluster_config.status());
389           return aggregate_cluster_config.status();
390         }
391         GPR_ASSERT(aggregate_cluster_config->cluster != nullptr);
392         aggregate_cluster_resource = aggregate_cluster_config->cluster.get();
393       }
394     } else {
395       args.args = args.args.Set(kArgXdsAggregateClusterName, cluster_name_);
396     }
397   }
398   // Construct child policy config and update state based on the cluster type.
399   Json child_policy_config_json;
400   UpdateArgs update_args;
401   Match(
402       new_cluster_config->children,
403       // Leaf cluster.
404       [&](const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) {
405         // Compute new child numbers.
406         child_name_state_ = ComputeChildNames(
407             old_cluster_config, *new_cluster_config, endpoint_config);
408         // Populate addresses and resolution_note for child policy.
409         update_args.addresses = std::make_shared<PriorityEndpointIterator>(
410             cluster_name_, endpoint_config.endpoints,
411             child_name_state_.priority_child_numbers);
412         update_args.resolution_note = endpoint_config.resolution_note;
413         // Construct child policy config.
414         child_policy_config_json = CreateChildPolicyConfigForLeafCluster(
415             *new_cluster_config, endpoint_config, aggregate_cluster_resource);
416       },
417       // Aggregate cluster.
418       [&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
419         child_name_state_.Reset();
420         // Construct child policy config.
421         child_policy_config_json =
422             CreateChildPolicyConfigForAggregateCluster(aggregate_config);
423       });
424   // Swap in new xDS config, now that we're done with the old one.
425   xds_config_ = std::move(new_xds_config);
426   // Validate child policy config.
427   auto child_config =
428       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
429           child_policy_config_json);
430   if (!child_config.ok()) {
431     // Should never happen.
432     absl::Status status = absl::InternalError(
433         absl::StrCat(cluster_name_, ": error parsing child policy config: ",
434                      child_config.status().message()));
435     ReportTransientFailure(status);
436     return status;
437   }
438   // Create child policy if not already present.
439   if (child_policy_ == nullptr) {
440     LoadBalancingPolicy::Args lb_args;
441     lb_args.work_serializer = work_serializer();
442     lb_args.args = args.args;
443     lb_args.channel_control_helper =
444         std::make_unique<Helper>(RefAsSubclass<CdsLb>());
445     child_policy_ =
446         CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
447             (*child_config)->name(), std::move(lb_args));
448     if (child_policy_ == nullptr) {
449       // Should never happen.
450       absl::Status status = absl::UnavailableError(
451           absl::StrCat(cluster_name_, ": failed to create child policy"));
452       ReportTransientFailure(status);
453       return status;
454     }
455     grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
456                                      interested_parties());
457     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
458       gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
459               std::string((*child_config)->name()).c_str(),
460               child_policy_.get());
461     }
462   }
463   // Update child policy.
464   update_args.config = std::move(*child_config);
465   update_args.args = args.args;
466   return child_policy_->UpdateLocked(std::move(update_args));
467 }
468 
ComputeChildNames(const XdsConfig::ClusterConfig * old_cluster,const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config) const469 CdsLb::ChildNameState CdsLb::ComputeChildNames(
470     const XdsConfig::ClusterConfig* old_cluster,
471     const XdsConfig::ClusterConfig& new_cluster,
472     const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const {
473   GPR_ASSERT(
474       !absl::holds_alternative<XdsConfig::ClusterConfig::AggregateConfig>(
475           new_cluster.children));
476   // First, build some maps from locality to child number and the reverse
477   // from old_cluster and child_name_state_.
478   std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
479       locality_child_map;
480   std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
481       child_locality_map;
482   if (old_cluster != nullptr) {
483     auto* old_endpoint_config =
484         absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
485             &old_cluster->children);
486     if (old_endpoint_config != nullptr) {
487       const auto& prev_priority_list =
488           GetUpdatePriorityList(old_endpoint_config->endpoints.get());
489       for (size_t priority = 0; priority < prev_priority_list.size();
490            ++priority) {
491         size_t child_number =
492             child_name_state_.priority_child_numbers[priority];
493         const auto& localities = prev_priority_list[priority].localities;
494         for (const auto& p : localities) {
495           XdsLocalityName* locality_name = p.first;
496           locality_child_map[locality_name] = child_number;
497           child_locality_map[child_number].insert(locality_name);
498         }
499       }
500     }
501   }
502   // Now construct new state containing priority child numbers for the new
503   // cluster based on the maps constructed above.
504   ChildNameState new_child_name_state;
505   new_child_name_state.next_available_child_number =
506       child_name_state_.next_available_child_number;
507   const XdsEndpointResource::PriorityList& priority_list =
508       GetUpdatePriorityList(endpoint_config.endpoints.get());
509   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
510     const auto& localities = priority_list[priority].localities;
511     absl::optional<size_t> child_number;
512     // If one of the localities in this priority already existed, reuse its
513     // child number.
514     for (const auto& p : localities) {
515       XdsLocalityName* locality_name = p.first;
516       if (!child_number.has_value()) {
517         auto it = locality_child_map.find(locality_name);
518         if (it != locality_child_map.end()) {
519           child_number = it->second;
520           locality_child_map.erase(it);
521           // Remove localities that *used* to be in this child number, so
522           // that we don't incorrectly reuse this child number for a
523           // subsequent priority.
524           for (XdsLocalityName* old_locality :
525                child_locality_map[*child_number]) {
526             locality_child_map.erase(old_locality);
527           }
528         }
529       } else {
530         // Remove all localities that are now in this child number, so
531         // that we don't accidentally reuse this child number for a
532         // subsequent priority.
533         locality_child_map.erase(locality_name);
534       }
535     }
536     // If we didn't find an existing child number, assign a new one.
537     if (!child_number.has_value()) {
538       for (child_number = new_child_name_state.next_available_child_number;
539            child_locality_map.find(*child_number) != child_locality_map.end();
540            ++(*child_number)) {
541       }
542       new_child_name_state.next_available_child_number = *child_number + 1;
543       // Add entry so we know that the child number is in use.
544       // (Don't need to add the list of localities, since we won't use them.)
545       child_locality_map[*child_number];
546     }
547     new_child_name_state.priority_child_numbers.push_back(*child_number);
548   }
549   return new_child_name_state;
550 }
551 
CreateChildPolicyConfigForLeafCluster(const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config,const XdsClusterResource * aggregate_cluster_resource)552 Json CdsLb::CreateChildPolicyConfigForLeafCluster(
553     const XdsConfig::ClusterConfig& new_cluster,
554     const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
555     const XdsClusterResource* aggregate_cluster_resource) {
556   const auto& cluster_resource = *new_cluster.cluster;
557   const bool is_logical_dns =
558       absl::holds_alternative<XdsClusterResource::LogicalDns>(
559           cluster_resource.type);
560   // Determine what xDS LB policy to use.
561   Json xds_lb_policy;
562   if (is_logical_dns) {
563     xds_lb_policy = Json::FromArray({
564         Json::FromObject({
565             {"pick_first", Json::FromObject({})},
566         }),
567     });
568   }
569   // TODO(roth): Remove this "else if" block after the 1.63 release.
570   else if (XdsAggregateClusterBackwardCompatibilityEnabled() &&
571            aggregate_cluster_resource != nullptr) {
572     xds_lb_policy =
573         Json::FromArray(aggregate_cluster_resource->lb_policy_config);
574   } else {
575     xds_lb_policy = Json::FromArray(new_cluster.cluster->lb_policy_config);
576   }
577   // Wrap it in the priority policy.
578   Json::Object priority_children;
579   Json::Array priority_priorities;
580   const auto& priority_list =
581       GetUpdatePriorityList(endpoint_config.endpoints.get());
582   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
583     // Add priority entry, with the appropriate child name.
584     std::string child_name = MakeChildPolicyName(
585         cluster_name_, child_name_state_.priority_child_numbers[priority]);
586     priority_priorities.emplace_back(Json::FromString(child_name));
587     Json::Object child_config = {{"config", xds_lb_policy}};
588     if (!is_logical_dns) {
589       child_config["ignore_reresolution_requests"] = Json::FromBool(true);
590     }
591     priority_children[child_name] = Json::FromObject(std::move(child_config));
592   }
593   Json priority_policy = Json::FromArray({Json::FromObject({
594       {"priority_experimental",
595        Json::FromObject({
596            {"children", Json::FromObject(std::move(priority_children))},
597            {"priorities", Json::FromArray(std::move(priority_priorities))},
598        })},
599   })});
600   // Wrap the priority policy in the xds_override_host policy.
601   Json xds_override_host_policy = Json::FromArray({Json::FromObject({
602       {"xds_override_host_experimental",
603        Json::FromObject({
604            {"clusterName", Json::FromString(cluster_name_)},
605            {"childPolicy", std::move(priority_policy)},
606        })},
607   })});
608   // Wrap the xds_override_host policy in the xds_cluster_impl policy.
609   Json xds_cluster_impl_policy = Json::FromArray({Json::FromObject({
610       {"xds_cluster_impl_experimental",
611        Json::FromObject({
612            {"clusterName", Json::FromString(cluster_name_)},
613            {"childPolicy", std::move(xds_override_host_policy)},
614        })},
615   })});
616   // Wrap the xds_cluster_impl policy in the outlier_detection policy.
617   Json::Object outlier_detection_config = {
618       {"childPolicy", std::move(xds_cluster_impl_policy)},
619   };
620   if (cluster_resource.outlier_detection.has_value()) {
621     auto& outlier_detection_update = *cluster_resource.outlier_detection;
622     outlier_detection_config["interval"] =
623         Json::FromString(outlier_detection_update.interval.ToJsonString());
624     outlier_detection_config["baseEjectionTime"] = Json::FromString(
625         outlier_detection_update.base_ejection_time.ToJsonString());
626     outlier_detection_config["maxEjectionTime"] = Json::FromString(
627         outlier_detection_update.max_ejection_time.ToJsonString());
628     outlier_detection_config["maxEjectionPercent"] =
629         Json::FromNumber(outlier_detection_update.max_ejection_percent);
630     if (outlier_detection_update.success_rate_ejection.has_value()) {
631       outlier_detection_config["successRateEjection"] = Json::FromObject({
632           {"stdevFactor",
633            Json::FromNumber(
634                outlier_detection_update.success_rate_ejection->stdev_factor)},
635           {"enforcementPercentage",
636            Json::FromNumber(outlier_detection_update.success_rate_ejection
637                                 ->enforcement_percentage)},
638           {"minimumHosts",
639            Json::FromNumber(
640                outlier_detection_update.success_rate_ejection->minimum_hosts)},
641           {"requestVolume",
642            Json::FromNumber(
643                outlier_detection_update.success_rate_ejection->request_volume)},
644       });
645     }
646     if (outlier_detection_update.failure_percentage_ejection.has_value()) {
647       outlier_detection_config["failurePercentageEjection"] = Json::FromObject({
648           {"threshold",
649            Json::FromNumber(outlier_detection_update
650                                 .failure_percentage_ejection->threshold)},
651           {"enforcementPercentage",
652            Json::FromNumber(
653                outlier_detection_update.failure_percentage_ejection
654                    ->enforcement_percentage)},
655           {"minimumHosts",
656            Json::FromNumber(outlier_detection_update
657                                 .failure_percentage_ejection->minimum_hosts)},
658           {"requestVolume",
659            Json::FromNumber(outlier_detection_update
660                                 .failure_percentage_ejection->request_volume)},
661       });
662     }
663   }
664   Json outlier_detection_policy = Json::FromArray({Json::FromObject({
665       {"outlier_detection_experimental",
666        Json::FromObject(std::move(outlier_detection_config))},
667   })});
668   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
669     gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
670             JsonDump(outlier_detection_policy, /*indent=*/1).c_str());
671   }
672   return outlier_detection_policy;
673 }
674 
CreateChildPolicyConfigForAggregateCluster(const XdsConfig::ClusterConfig::AggregateConfig & aggregate_config)675 Json CdsLb::CreateChildPolicyConfigForAggregateCluster(
676     const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
677   Json::Object priority_children;
678   Json::Array priority_priorities;
679   for (const absl::string_view& leaf_cluster : aggregate_config.leaf_clusters) {
680     priority_children[std::string(leaf_cluster)] = Json::FromObject({
681         {"config",
682          Json::FromArray({
683              Json::FromObject({
684                  {"cds_experimental",
685                   Json::FromObject({
686                       {"cluster", Json::FromString(std::string(leaf_cluster))},
687                   })},
688              }),
689          })},
690     });
691     priority_priorities.emplace_back(
692         Json::FromString(std::string(leaf_cluster)));
693   }
694   Json json = Json::FromArray({Json::FromObject({
695       {"priority_experimental",
696        Json::FromObject({
697            {"children", Json::FromObject(std::move(priority_children))},
698            {"priorities", Json::FromArray(std::move(priority_priorities))},
699        })},
700   })});
701   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
702     gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
703             JsonDump(json, /*indent=*/1).c_str());
704   }
705   return json;
706 }
707 
ResetState()708 void CdsLb::ResetState() {
709   cluster_name_.clear();
710   xds_config_.reset();
711   child_name_state_.Reset();
712   if (child_policy_ != nullptr) {
713     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
714                                      interested_parties());
715     child_policy_.reset();
716   }
717 }
718 
ReportTransientFailure(absl::Status status)719 void CdsLb::ReportTransientFailure(absl::Status status) {
720   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
721     gpr_log(GPR_INFO, "[cdslb %p] reporting TRANSIENT_FAILURE: %s", this,
722             status.ToString().c_str());
723   }
724   ResetState();
725   channel_control_helper()->UpdateState(
726       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
727       MakeRefCounted<TransientFailurePicker>(status));
728 }
729 
730 //
731 // factory
732 //
733 
734 class CdsLbFactory final : public LoadBalancingPolicyFactory {
735  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const736   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
737       LoadBalancingPolicy::Args args) const override {
738     return MakeOrphanable<CdsLb>(std::move(args));
739   }
740 
name() const741   absl::string_view name() const override { return kCds; }
742 
743   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const744   ParseLoadBalancingConfig(const Json& json) const override {
745     return LoadFromJson<RefCountedPtr<CdsLbConfig>>(
746         json, JsonArgs(), "errors validating cds LB policy config");
747   }
748 };
749 
750 }  // namespace
751 
RegisterCdsLbPolicy(CoreConfiguration::Builder * builder)752 void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder) {
753   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
754       std::make_unique<CdsLbFactory>());
755 }
756 
757 }  // namespace grpc_core
758