1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <algorithm>
20 #include <map>
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/status/status.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "absl/types/variant.h"
34
35 #include <grpc/grpc_security.h>
36 #include <grpc/impl/connectivity_state.h>
37 #include <grpc/support/json.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/ext/xds/xds_cluster.h"
41 #include "src/core/ext/xds/xds_common_types.h"
42 #include "src/core/ext/xds/xds_health_status.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/env.h"
48 #include "src/core/lib/gprpp/match.h"
49 #include "src/core/lib/gprpp/orphanable.h"
50 #include "src/core/lib/gprpp/ref_counted_ptr.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/gprpp/unique_type_name.h"
53 #include "src/core/lib/gprpp/work_serializer.h"
54 #include "src/core/lib/iomgr/pollset_set.h"
55 #include "src/core/lib/json/json.h"
56 #include "src/core/lib/json/json_args.h"
57 #include "src/core/lib/json/json_object_loader.h"
58 #include "src/core/lib/json/json_writer.h"
59 #include "src/core/load_balancing/address_filtering.h"
60 #include "src/core/load_balancing/delegating_helper.h"
61 #include "src/core/load_balancing/lb_policy.h"
62 #include "src/core/load_balancing/lb_policy_factory.h"
63 #include "src/core/load_balancing/lb_policy_registry.h"
64 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
65 #include "src/core/load_balancing/xds/xds_channel_args.h"
66 #include "src/core/resolver/xds/xds_dependency_manager.h"
67
68 namespace grpc_core {
69
70 TraceFlag grpc_cds_lb_trace(false, "cds_lb");
71
72 namespace {
73
74 // TODO(roth): Remove this after the 1.63 release.
XdsAggregateClusterBackwardCompatibilityEnabled()75 bool XdsAggregateClusterBackwardCompatibilityEnabled() {
76 auto value = GetEnv("GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
77 if (!value.has_value()) return false;
78 bool parsed_value;
79 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
80 return parse_succeeded && parsed_value;
81 }
82
83 using XdsConfig = XdsDependencyManager::XdsConfig;
84
85 constexpr absl::string_view kCds = "cds_experimental";
86
87 // Config for this LB policy.
88 class CdsLbConfig final : public LoadBalancingPolicy::Config {
89 public:
90 CdsLbConfig() = default;
91
92 CdsLbConfig(const CdsLbConfig&) = delete;
93 CdsLbConfig& operator=(const CdsLbConfig&) = delete;
94
95 CdsLbConfig(CdsLbConfig&& other) = delete;
96 CdsLbConfig& operator=(CdsLbConfig&& other) = delete;
97
name() const98 absl::string_view name() const override { return kCds; }
99
cluster() const100 const std::string& cluster() const { return cluster_; }
is_dynamic() const101 bool is_dynamic() const { return is_dynamic_; }
102
JsonLoader(const JsonArgs &)103 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
104 static const auto* loader =
105 JsonObjectLoader<CdsLbConfig>()
106 .Field("cluster", &CdsLbConfig::cluster_)
107 .OptionalField("isDynamic", &CdsLbConfig::is_dynamic_)
108 .Finish();
109 return loader;
110 }
111
112 private:
113 std::string cluster_;
114 bool is_dynamic_ = false;
115 };
116
117 // CDS LB policy.
118 class CdsLb final : public LoadBalancingPolicy {
119 public:
120 explicit CdsLb(Args args);
121
name() const122 absl::string_view name() const override { return kCds; }
123
124 absl::Status UpdateLocked(UpdateArgs args) override;
125 void ResetBackoffLocked() override;
126 void ExitIdleLocked() override;
127
128 private:
129 // Delegating helper to be passed to child policy.
130 using Helper = ParentOwningDelegatingChannelControlHelper<CdsLb>;
131
132 // State used to retain child policy names for the priority policy.
133 struct ChildNameState {
134 std::vector<size_t /*child_number*/> priority_child_numbers;
135 size_t next_available_child_number = 0;
136
Resetgrpc_core::__anona38954c30111::CdsLb::ChildNameState137 void Reset() {
138 priority_child_numbers.clear();
139 next_available_child_number = 0;
140 }
141 };
142
143 ~CdsLb() override;
144
145 void ShutdownLocked() override;
146
147 // Computes child numbers for new_cluster, reusing child numbers
148 // from old_cluster and child_name_state_ in an intelligent
149 // way to avoid unnecessary churn.
150 ChildNameState ComputeChildNames(
151 const XdsConfig::ClusterConfig* old_cluster,
152 const XdsConfig::ClusterConfig& new_cluster,
153 const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const;
154
155 std::string GetChildPolicyName(const std::string& cluster, size_t priority);
156
157 Json CreateChildPolicyConfigForLeafCluster(
158 const XdsConfig::ClusterConfig& new_cluster,
159 const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
160 const XdsClusterResource* aggregate_cluster_resource);
161 Json CreateChildPolicyConfigForAggregateCluster(
162 const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config);
163
164 void ResetState();
165
166 void ReportTransientFailure(absl::Status status);
167
168 std::string cluster_name_;
169 RefCountedPtr<const XdsConfig> xds_config_;
170
171 // Cluster subscription, for dynamic clusters (e.g., RLS).
172 RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription_;
173
174 ChildNameState child_name_state_;
175
176 // Child LB policy.
177 OrphanablePtr<LoadBalancingPolicy> child_policy_;
178
179 // Internal state.
180 bool shutting_down_ = false;
181 };
182
183 //
184 // CdsLb
185 //
186
CdsLb(Args args)187 CdsLb::CdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
188 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
189 gpr_log(GPR_INFO, "[cdslb %p] created", this);
190 }
191 }
192
~CdsLb()193 CdsLb::~CdsLb() {
194 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
195 gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
196 }
197 }
198
ShutdownLocked()199 void CdsLb::ShutdownLocked() {
200 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
201 gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
202 }
203 shutting_down_ = true;
204 ResetState();
205 }
206
ResetBackoffLocked()207 void CdsLb::ResetBackoffLocked() {
208 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
209 }
210
ExitIdleLocked()211 void CdsLb::ExitIdleLocked() {
212 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
213 }
214
215 // We need at least one priority for each discovery mechanism, just so that we
216 // have a child in which to create the xds_cluster_impl policy. This ensures
217 // that we properly handle the case of a discovery mechanism dropping 100% of
218 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
GetUpdatePriorityList(const XdsEndpointResource * update)219 const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
220 const XdsEndpointResource* update) {
221 static const NoDestruct<XdsEndpointResource::PriorityList>
222 kPriorityListWithEmptyPriority(1);
223 if (update == nullptr || update->priorities.empty()) {
224 return *kPriorityListWithEmptyPriority;
225 }
226 return update->priorities;
227 }
228
MakeChildPolicyName(absl::string_view cluster,size_t child_number)229 std::string MakeChildPolicyName(absl::string_view cluster,
230 size_t child_number) {
231 return absl::StrCat("{cluster=", cluster, ", child_number=", child_number,
232 "}");
233 }
234
235 class PriorityEndpointIterator final : public EndpointAddressesIterator {
236 public:
PriorityEndpointIterator(std::string cluster_name,std::shared_ptr<const XdsEndpointResource> endpoints,std::vector<size_t> priority_child_numbers)237 PriorityEndpointIterator(
238 std::string cluster_name,
239 std::shared_ptr<const XdsEndpointResource> endpoints,
240 std::vector<size_t /*child_number*/> priority_child_numbers)
241 : cluster_name_(std::move(cluster_name)),
242 endpoints_(std::move(endpoints)),
243 priority_child_numbers_(std::move(priority_child_numbers)) {}
244
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const245 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
246 const override {
247 const auto& priority_list = GetUpdatePriorityList(endpoints_.get());
248 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
249 const auto& priority_entry = priority_list[priority];
250 std::string priority_child_name =
251 MakeChildPolicyName(cluster_name_, priority_child_numbers_[priority]);
252 for (const auto& p : priority_entry.localities) {
253 const auto& locality_name = p.first;
254 const auto& locality = p.second;
255 std::vector<RefCountedStringValue> hierarchical_path = {
256 RefCountedStringValue(priority_child_name),
257 locality_name->human_readable_string()};
258 auto hierarchical_path_attr =
259 MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
260 for (const auto& endpoint : locality.endpoints) {
261 uint32_t endpoint_weight =
262 locality.lb_weight *
263 endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
264 callback(EndpointAddresses(
265 endpoint.addresses(),
266 endpoint.args()
267 .SetObject(hierarchical_path_attr)
268 .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
269 .SetObject(locality_name->Ref())
270 .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
271 }
272 }
273 }
274 }
275
276 private:
277 std::string cluster_name_;
278 std::shared_ptr<const XdsEndpointResource> endpoints_;
279 std::vector<size_t /*child_number*/> priority_child_numbers_;
280 };
281
UpdateLocked(UpdateArgs args)282 absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
283 // Get new config.
284 auto new_config = args.config.TakeAsSubclass<CdsLbConfig>();
285 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
286 gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s is_dynamic=%d",
287 this, new_config->cluster().c_str(), new_config->is_dynamic());
288 }
289 GPR_ASSERT(new_config != nullptr);
290 // Cluster name should never change, because we should use a different
291 // child name in xds_cluster_manager in that case.
292 if (cluster_name_.empty()) {
293 cluster_name_ = new_config->cluster();
294 } else {
295 GPR_ASSERT(cluster_name_ == new_config->cluster());
296 }
297 // Start dynamic subscription if needed.
298 if (new_config->is_dynamic() && subscription_ == nullptr) {
299 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
300 gpr_log(GPR_INFO,
301 "[cdslb %p] obtaining dynamic subscription for cluster %s", this,
302 cluster_name_.c_str());
303 }
304 auto* dependency_mgr = args.args.GetObject<XdsDependencyManager>();
305 if (dependency_mgr == nullptr) {
306 // Should never happen.
307 absl::Status status =
308 absl::InternalError("xDS dependency mgr not passed to CDS LB policy");
309 ReportTransientFailure(status);
310 return status;
311 }
312 subscription_ = dependency_mgr->GetClusterSubscription(cluster_name_);
313 }
314 // Get xDS config.
315 auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
316 if (new_xds_config == nullptr) {
317 // Should never happen.
318 absl::Status status =
319 absl::InternalError("xDS config not passed to CDS LB policy");
320 ReportTransientFailure(status);
321 return status;
322 }
323 auto it = new_xds_config->clusters.find(cluster_name_);
324 if (it == new_xds_config->clusters.end()) {
325 // Cluster not present.
326 if (new_config->is_dynamic()) {
327 // If we are already subscribed, it's possible that we just
328 // recently subscribed but another update came through before we
329 // got the new cluster, in which case it will still be missing.
330 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
331 gpr_log(GPR_INFO,
332 "[cdslb %p] xDS config has no entry for dynamic cluster %s, "
333 "waiting for subsequent update",
334 this, cluster_name_.c_str());
335 }
336 // Stay in CONNECTING until we get an update that has the cluster.
337 return absl::OkStatus();
338 }
339 // Not a dynamic cluster. This should never happen.
340 absl::Status status = absl::UnavailableError(absl::StrCat(
341 "xDS config has no entry for static cluster ", cluster_name_));
342 ReportTransientFailure(status);
343 return status;
344 }
345 auto& new_cluster_config = it->second;
346 // If new list is not OK, report TRANSIENT_FAILURE.
347 if (!new_cluster_config.ok()) {
348 ReportTransientFailure(new_cluster_config.status());
349 return new_cluster_config.status();
350 }
351 GPR_ASSERT(new_cluster_config->cluster != nullptr);
352 // Find old cluster, if any.
353 const XdsConfig::ClusterConfig* old_cluster_config = nullptr;
354 if (xds_config_ != nullptr) {
355 auto it_old = xds_config_->clusters.find(cluster_name_);
356 if (it_old != xds_config_->clusters.end() && it_old->second.ok()) {
357 old_cluster_config = &*it_old->second;
358 // If nothing changed for a leaf cluster, then ignore the update.
359 // Can't do this for an aggregate cluster, because even if the aggregate
360 // cluster itself didn't change, the leaf clusters may have changed.
361 if (*new_cluster_config == *old_cluster_config &&
362 absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
363 new_cluster_config->children)) {
364 return absl::OkStatus();
365 }
366 }
367 }
368 // TODO(roth): Remove this after the 1.63 release.
369 const XdsClusterResource* aggregate_cluster_resource = nullptr;
370 static constexpr absl::string_view kArgXdsAggregateClusterName =
371 GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_aggregate_cluster_name";
372 if (XdsAggregateClusterBackwardCompatibilityEnabled()) {
373 if (absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
374 new_cluster_config->children)) {
375 auto aggregate_cluster = args.args.GetString(kArgXdsAggregateClusterName);
376 if (aggregate_cluster.has_value()) {
377 auto it = new_xds_config->clusters.find(*aggregate_cluster);
378 if (it == new_xds_config->clusters.end()) {
379 // Cluster not present. This should never happen.
380 absl::Status status = absl::UnavailableError(
381 absl::StrCat("xDS config has no entry for aggregate cluster ",
382 *aggregate_cluster));
383 ReportTransientFailure(status);
384 return status;
385 }
386 auto& aggregate_cluster_config = it->second;
387 if (!aggregate_cluster_config.ok()) {
388 ReportTransientFailure(aggregate_cluster_config.status());
389 return aggregate_cluster_config.status();
390 }
391 GPR_ASSERT(aggregate_cluster_config->cluster != nullptr);
392 aggregate_cluster_resource = aggregate_cluster_config->cluster.get();
393 }
394 } else {
395 args.args = args.args.Set(kArgXdsAggregateClusterName, cluster_name_);
396 }
397 }
398 // Construct child policy config and update state based on the cluster type.
399 Json child_policy_config_json;
400 UpdateArgs update_args;
401 Match(
402 new_cluster_config->children,
403 // Leaf cluster.
404 [&](const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) {
405 // Compute new child numbers.
406 child_name_state_ = ComputeChildNames(
407 old_cluster_config, *new_cluster_config, endpoint_config);
408 // Populate addresses and resolution_note for child policy.
409 update_args.addresses = std::make_shared<PriorityEndpointIterator>(
410 cluster_name_, endpoint_config.endpoints,
411 child_name_state_.priority_child_numbers);
412 update_args.resolution_note = endpoint_config.resolution_note;
413 // Construct child policy config.
414 child_policy_config_json = CreateChildPolicyConfigForLeafCluster(
415 *new_cluster_config, endpoint_config, aggregate_cluster_resource);
416 },
417 // Aggregate cluster.
418 [&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
419 child_name_state_.Reset();
420 // Construct child policy config.
421 child_policy_config_json =
422 CreateChildPolicyConfigForAggregateCluster(aggregate_config);
423 });
424 // Swap in new xDS config, now that we're done with the old one.
425 xds_config_ = std::move(new_xds_config);
426 // Validate child policy config.
427 auto child_config =
428 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
429 child_policy_config_json);
430 if (!child_config.ok()) {
431 // Should never happen.
432 absl::Status status = absl::InternalError(
433 absl::StrCat(cluster_name_, ": error parsing child policy config: ",
434 child_config.status().message()));
435 ReportTransientFailure(status);
436 return status;
437 }
438 // Create child policy if not already present.
439 if (child_policy_ == nullptr) {
440 LoadBalancingPolicy::Args lb_args;
441 lb_args.work_serializer = work_serializer();
442 lb_args.args = args.args;
443 lb_args.channel_control_helper =
444 std::make_unique<Helper>(RefAsSubclass<CdsLb>());
445 child_policy_ =
446 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
447 (*child_config)->name(), std::move(lb_args));
448 if (child_policy_ == nullptr) {
449 // Should never happen.
450 absl::Status status = absl::UnavailableError(
451 absl::StrCat(cluster_name_, ": failed to create child policy"));
452 ReportTransientFailure(status);
453 return status;
454 }
455 grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
456 interested_parties());
457 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
458 gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
459 std::string((*child_config)->name()).c_str(),
460 child_policy_.get());
461 }
462 }
463 // Update child policy.
464 update_args.config = std::move(*child_config);
465 update_args.args = args.args;
466 return child_policy_->UpdateLocked(std::move(update_args));
467 }
468
ComputeChildNames(const XdsConfig::ClusterConfig * old_cluster,const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config) const469 CdsLb::ChildNameState CdsLb::ComputeChildNames(
470 const XdsConfig::ClusterConfig* old_cluster,
471 const XdsConfig::ClusterConfig& new_cluster,
472 const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const {
473 GPR_ASSERT(
474 !absl::holds_alternative<XdsConfig::ClusterConfig::AggregateConfig>(
475 new_cluster.children));
476 // First, build some maps from locality to child number and the reverse
477 // from old_cluster and child_name_state_.
478 std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
479 locality_child_map;
480 std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
481 child_locality_map;
482 if (old_cluster != nullptr) {
483 auto* old_endpoint_config =
484 absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
485 &old_cluster->children);
486 if (old_endpoint_config != nullptr) {
487 const auto& prev_priority_list =
488 GetUpdatePriorityList(old_endpoint_config->endpoints.get());
489 for (size_t priority = 0; priority < prev_priority_list.size();
490 ++priority) {
491 size_t child_number =
492 child_name_state_.priority_child_numbers[priority];
493 const auto& localities = prev_priority_list[priority].localities;
494 for (const auto& p : localities) {
495 XdsLocalityName* locality_name = p.first;
496 locality_child_map[locality_name] = child_number;
497 child_locality_map[child_number].insert(locality_name);
498 }
499 }
500 }
501 }
502 // Now construct new state containing priority child numbers for the new
503 // cluster based on the maps constructed above.
504 ChildNameState new_child_name_state;
505 new_child_name_state.next_available_child_number =
506 child_name_state_.next_available_child_number;
507 const XdsEndpointResource::PriorityList& priority_list =
508 GetUpdatePriorityList(endpoint_config.endpoints.get());
509 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
510 const auto& localities = priority_list[priority].localities;
511 absl::optional<size_t> child_number;
512 // If one of the localities in this priority already existed, reuse its
513 // child number.
514 for (const auto& p : localities) {
515 XdsLocalityName* locality_name = p.first;
516 if (!child_number.has_value()) {
517 auto it = locality_child_map.find(locality_name);
518 if (it != locality_child_map.end()) {
519 child_number = it->second;
520 locality_child_map.erase(it);
521 // Remove localities that *used* to be in this child number, so
522 // that we don't incorrectly reuse this child number for a
523 // subsequent priority.
524 for (XdsLocalityName* old_locality :
525 child_locality_map[*child_number]) {
526 locality_child_map.erase(old_locality);
527 }
528 }
529 } else {
530 // Remove all localities that are now in this child number, so
531 // that we don't accidentally reuse this child number for a
532 // subsequent priority.
533 locality_child_map.erase(locality_name);
534 }
535 }
536 // If we didn't find an existing child number, assign a new one.
537 if (!child_number.has_value()) {
538 for (child_number = new_child_name_state.next_available_child_number;
539 child_locality_map.find(*child_number) != child_locality_map.end();
540 ++(*child_number)) {
541 }
542 new_child_name_state.next_available_child_number = *child_number + 1;
543 // Add entry so we know that the child number is in use.
544 // (Don't need to add the list of localities, since we won't use them.)
545 child_locality_map[*child_number];
546 }
547 new_child_name_state.priority_child_numbers.push_back(*child_number);
548 }
549 return new_child_name_state;
550 }
551
CreateChildPolicyConfigForLeafCluster(const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config,const XdsClusterResource * aggregate_cluster_resource)552 Json CdsLb::CreateChildPolicyConfigForLeafCluster(
553 const XdsConfig::ClusterConfig& new_cluster,
554 const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
555 const XdsClusterResource* aggregate_cluster_resource) {
556 const auto& cluster_resource = *new_cluster.cluster;
557 const bool is_logical_dns =
558 absl::holds_alternative<XdsClusterResource::LogicalDns>(
559 cluster_resource.type);
560 // Determine what xDS LB policy to use.
561 Json xds_lb_policy;
562 if (is_logical_dns) {
563 xds_lb_policy = Json::FromArray({
564 Json::FromObject({
565 {"pick_first", Json::FromObject({})},
566 }),
567 });
568 }
569 // TODO(roth): Remove this "else if" block after the 1.63 release.
570 else if (XdsAggregateClusterBackwardCompatibilityEnabled() &&
571 aggregate_cluster_resource != nullptr) {
572 xds_lb_policy =
573 Json::FromArray(aggregate_cluster_resource->lb_policy_config);
574 } else {
575 xds_lb_policy = Json::FromArray(new_cluster.cluster->lb_policy_config);
576 }
577 // Wrap it in the priority policy.
578 Json::Object priority_children;
579 Json::Array priority_priorities;
580 const auto& priority_list =
581 GetUpdatePriorityList(endpoint_config.endpoints.get());
582 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
583 // Add priority entry, with the appropriate child name.
584 std::string child_name = MakeChildPolicyName(
585 cluster_name_, child_name_state_.priority_child_numbers[priority]);
586 priority_priorities.emplace_back(Json::FromString(child_name));
587 Json::Object child_config = {{"config", xds_lb_policy}};
588 if (!is_logical_dns) {
589 child_config["ignore_reresolution_requests"] = Json::FromBool(true);
590 }
591 priority_children[child_name] = Json::FromObject(std::move(child_config));
592 }
593 Json priority_policy = Json::FromArray({Json::FromObject({
594 {"priority_experimental",
595 Json::FromObject({
596 {"children", Json::FromObject(std::move(priority_children))},
597 {"priorities", Json::FromArray(std::move(priority_priorities))},
598 })},
599 })});
600 // Wrap the priority policy in the xds_override_host policy.
601 Json xds_override_host_policy = Json::FromArray({Json::FromObject({
602 {"xds_override_host_experimental",
603 Json::FromObject({
604 {"clusterName", Json::FromString(cluster_name_)},
605 {"childPolicy", std::move(priority_policy)},
606 })},
607 })});
608 // Wrap the xds_override_host policy in the xds_cluster_impl policy.
609 Json xds_cluster_impl_policy = Json::FromArray({Json::FromObject({
610 {"xds_cluster_impl_experimental",
611 Json::FromObject({
612 {"clusterName", Json::FromString(cluster_name_)},
613 {"childPolicy", std::move(xds_override_host_policy)},
614 })},
615 })});
616 // Wrap the xds_cluster_impl policy in the outlier_detection policy.
617 Json::Object outlier_detection_config = {
618 {"childPolicy", std::move(xds_cluster_impl_policy)},
619 };
620 if (cluster_resource.outlier_detection.has_value()) {
621 auto& outlier_detection_update = *cluster_resource.outlier_detection;
622 outlier_detection_config["interval"] =
623 Json::FromString(outlier_detection_update.interval.ToJsonString());
624 outlier_detection_config["baseEjectionTime"] = Json::FromString(
625 outlier_detection_update.base_ejection_time.ToJsonString());
626 outlier_detection_config["maxEjectionTime"] = Json::FromString(
627 outlier_detection_update.max_ejection_time.ToJsonString());
628 outlier_detection_config["maxEjectionPercent"] =
629 Json::FromNumber(outlier_detection_update.max_ejection_percent);
630 if (outlier_detection_update.success_rate_ejection.has_value()) {
631 outlier_detection_config["successRateEjection"] = Json::FromObject({
632 {"stdevFactor",
633 Json::FromNumber(
634 outlier_detection_update.success_rate_ejection->stdev_factor)},
635 {"enforcementPercentage",
636 Json::FromNumber(outlier_detection_update.success_rate_ejection
637 ->enforcement_percentage)},
638 {"minimumHosts",
639 Json::FromNumber(
640 outlier_detection_update.success_rate_ejection->minimum_hosts)},
641 {"requestVolume",
642 Json::FromNumber(
643 outlier_detection_update.success_rate_ejection->request_volume)},
644 });
645 }
646 if (outlier_detection_update.failure_percentage_ejection.has_value()) {
647 outlier_detection_config["failurePercentageEjection"] = Json::FromObject({
648 {"threshold",
649 Json::FromNumber(outlier_detection_update
650 .failure_percentage_ejection->threshold)},
651 {"enforcementPercentage",
652 Json::FromNumber(
653 outlier_detection_update.failure_percentage_ejection
654 ->enforcement_percentage)},
655 {"minimumHosts",
656 Json::FromNumber(outlier_detection_update
657 .failure_percentage_ejection->minimum_hosts)},
658 {"requestVolume",
659 Json::FromNumber(outlier_detection_update
660 .failure_percentage_ejection->request_volume)},
661 });
662 }
663 }
664 Json outlier_detection_policy = Json::FromArray({Json::FromObject({
665 {"outlier_detection_experimental",
666 Json::FromObject(std::move(outlier_detection_config))},
667 })});
668 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
669 gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
670 JsonDump(outlier_detection_policy, /*indent=*/1).c_str());
671 }
672 return outlier_detection_policy;
673 }
674
CreateChildPolicyConfigForAggregateCluster(const XdsConfig::ClusterConfig::AggregateConfig & aggregate_config)675 Json CdsLb::CreateChildPolicyConfigForAggregateCluster(
676 const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
677 Json::Object priority_children;
678 Json::Array priority_priorities;
679 for (const absl::string_view& leaf_cluster : aggregate_config.leaf_clusters) {
680 priority_children[std::string(leaf_cluster)] = Json::FromObject({
681 {"config",
682 Json::FromArray({
683 Json::FromObject({
684 {"cds_experimental",
685 Json::FromObject({
686 {"cluster", Json::FromString(std::string(leaf_cluster))},
687 })},
688 }),
689 })},
690 });
691 priority_priorities.emplace_back(
692 Json::FromString(std::string(leaf_cluster)));
693 }
694 Json json = Json::FromArray({Json::FromObject({
695 {"priority_experimental",
696 Json::FromObject({
697 {"children", Json::FromObject(std::move(priority_children))},
698 {"priorities", Json::FromArray(std::move(priority_priorities))},
699 })},
700 })});
701 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
702 gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this,
703 JsonDump(json, /*indent=*/1).c_str());
704 }
705 return json;
706 }
707
ResetState()708 void CdsLb::ResetState() {
709 cluster_name_.clear();
710 xds_config_.reset();
711 child_name_state_.Reset();
712 if (child_policy_ != nullptr) {
713 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
714 interested_parties());
715 child_policy_.reset();
716 }
717 }
718
ReportTransientFailure(absl::Status status)719 void CdsLb::ReportTransientFailure(absl::Status status) {
720 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
721 gpr_log(GPR_INFO, "[cdslb %p] reporting TRANSIENT_FAILURE: %s", this,
722 status.ToString().c_str());
723 }
724 ResetState();
725 channel_control_helper()->UpdateState(
726 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
727 MakeRefCounted<TransientFailurePicker>(status));
728 }
729
730 //
731 // factory
732 //
733
734 class CdsLbFactory final : public LoadBalancingPolicyFactory {
735 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const736 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
737 LoadBalancingPolicy::Args args) const override {
738 return MakeOrphanable<CdsLb>(std::move(args));
739 }
740
name() const741 absl::string_view name() const override { return kCds; }
742
743 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const744 ParseLoadBalancingConfig(const Json& json) const override {
745 return LoadFromJson<RefCountedPtr<CdsLbConfig>>(
746 json, JsonArgs(), "errors validating cds LB policy config");
747 }
748 };
749
750 } // namespace
751
RegisterCdsLbPolicy(CoreConfiguration::Builder * builder)752 void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder) {
753 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
754 std::make_unique<CdsLbFactory>());
755 }
756
757 } // namespace grpc_core
758