1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <stddef.h>
20
21 #include <algorithm>
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/impl/connectivity_state.h>
39 #include <grpc/support/log.h>
40
41 #include "src/core/client_channel/client_channel_internal.h"
42 #include "src/core/load_balancing/child_policy_handler.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/time.h"
50 #include "src/core/lib/gprpp/validation_errors.h"
51 #include "src/core/lib/gprpp/work_serializer.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/pollset_set.h"
54 #include "src/core/lib/json/json.h"
55 #include "src/core/lib/json/json_args.h"
56 #include "src/core/lib/json/json_object_loader.h"
57 #include "src/core/lib/transport/connectivity_state.h"
58 #include "src/core/load_balancing/delegating_helper.h"
59 #include "src/core/load_balancing/lb_policy.h"
60 #include "src/core/load_balancing/lb_policy_factory.h"
61 #include "src/core/load_balancing/lb_policy_registry.h"
62 #include "src/core/resolver/endpoint_addresses.h"
63 #include "src/core/resolver/xds/xds_resolver_attributes.h"
64
65 namespace grpc_core {
66
67 TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
68
69 namespace {
70
71 using ::grpc_event_engine::experimental::EventEngine;
72
73 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
74 constexpr absl::string_view kXdsClusterManager =
75 "xds_cluster_manager_experimental";
76
77 // Config for xds_cluster_manager LB policy.
78 class XdsClusterManagerLbConfig final : public LoadBalancingPolicy::Config {
79 public:
80 struct Child {
81 RefCountedPtr<LoadBalancingPolicy::Config> config;
82
83 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
84 void JsonPostLoad(const Json& json, const JsonArgs&,
85 ValidationErrors* errors);
86 };
87
88 XdsClusterManagerLbConfig() = default;
89
90 XdsClusterManagerLbConfig(const XdsClusterManagerLbConfig&) = delete;
91 XdsClusterManagerLbConfig& operator=(const XdsClusterManagerLbConfig&) =
92 delete;
93
94 XdsClusterManagerLbConfig(XdsClusterManagerLbConfig&& other) = delete;
95 XdsClusterManagerLbConfig& operator=(XdsClusterManagerLbConfig&& other) =
96 delete;
97
name() const98 absl::string_view name() const override { return kXdsClusterManager; }
99
cluster_map() const100 const std::map<std::string, Child>& cluster_map() const {
101 return cluster_map_;
102 }
103
104 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
105
106 private:
107 std::map<std::string, Child> cluster_map_;
108 };
109
110 // xds_cluster_manager LB policy.
111 class XdsClusterManagerLb final : public LoadBalancingPolicy {
112 public:
113 explicit XdsClusterManagerLb(Args args);
114
name() const115 absl::string_view name() const override { return kXdsClusterManager; }
116
117 absl::Status UpdateLocked(UpdateArgs args) override;
118 void ExitIdleLocked() override;
119 void ResetBackoffLocked() override;
120
121 private:
122 // Picks a child using prefix or path matching and then delegates to that
123 // child's picker.
124 class ClusterPicker final : public SubchannelPicker {
125 public:
126 // Maintains a map of cluster names to pickers.
127 using ClusterMap = std::map<std::string /*cluster_name*/,
128 RefCountedPtr<SubchannelPicker>, std::less<>>;
129
130 // It is required that the keys of cluster_map have to live at least as long
131 // as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map)132 explicit ClusterPicker(ClusterMap cluster_map)
133 : cluster_map_(std::move(cluster_map)) {}
134
135 PickResult Pick(PickArgs args) override;
136
137 private:
138 ClusterMap cluster_map_;
139 };
140
141 // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
142 class ClusterChild final : public InternallyRefCounted<ClusterChild> {
143 public:
144 ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
145 const std::string& name);
146 ~ClusterChild() override;
147
148 void Orphan() override;
149
150 absl::Status UpdateLocked(
151 RefCountedPtr<LoadBalancingPolicy::Config> config,
152 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
153 addresses,
154 const ChannelArgs& args);
155 void ExitIdleLocked();
156 void ResetBackoffLocked();
157 void DeactivateLocked();
158
connectivity_state() const159 grpc_connectivity_state connectivity_state() const {
160 return connectivity_state_;
161 }
picker() const162 RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
163
164 private:
165 class Helper final : public DelegatingChannelControlHelper {
166 public:
Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)167 explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
168 : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
169
~Helper()170 ~Helper() override {
171 xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
172 }
173
174 void UpdateState(grpc_connectivity_state state,
175 const absl::Status& status,
176 RefCountedPtr<SubchannelPicker> picker) override;
177
178 private:
parent_helper() const179 ChannelControlHelper* parent_helper() const override {
180 return xds_cluster_manager_child_->xds_cluster_manager_policy_
181 ->channel_control_helper();
182 }
183
184 RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
185 };
186
187 // Methods for dealing with the child policy.
188 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
189 const ChannelArgs& args);
190
191 void OnDelayedRemovalTimerLocked();
192
193 // The owning LB policy.
194 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
195
196 // Points to the corresponding key in children map.
197 const std::string name_;
198
199 OrphanablePtr<LoadBalancingPolicy> child_policy_;
200
201 RefCountedPtr<SubchannelPicker> picker_;
202 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
203
204 // States for delayed removal.
205 absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
206 bool shutdown_ = false;
207 };
208
209 ~XdsClusterManagerLb() override;
210
211 void ShutdownLocked() override;
212
213 void UpdateStateLocked();
214
215 // Current config from the resolver.
216 RefCountedPtr<XdsClusterManagerLbConfig> config_;
217
218 // Internal state.
219 bool shutting_down_ = false;
220 bool update_in_progress_ = false;
221
222 // Children.
223 std::map<std::string, OrphanablePtr<ClusterChild>> children_;
224 };
225
226 //
227 // XdsClusterManagerLb::ClusterPicker
228 //
229
Pick(PickArgs args)230 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
231 PickArgs args) {
232 auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
233 auto* cluster_name_attribute = static_cast<XdsClusterAttribute*>(
234 call_state->GetCallAttribute(XdsClusterAttribute::TypeName()));
235 absl::string_view cluster_name;
236 if (cluster_name_attribute != nullptr) {
237 cluster_name = cluster_name_attribute->cluster();
238 }
239 auto it = cluster_map_.find(cluster_name);
240 if (it != cluster_map_.end()) {
241 return it->second->Pick(args);
242 }
243 return PickResult::Fail(absl::InternalError(absl::StrCat(
244 "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
245 }
246
247 //
248 // XdsClusterManagerLb
249 //
250
XdsClusterManagerLb(Args args)251 XdsClusterManagerLb::XdsClusterManagerLb(Args args)
252 : LoadBalancingPolicy(std::move(args)) {}
253
~XdsClusterManagerLb()254 XdsClusterManagerLb::~XdsClusterManagerLb() {
255 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
256 gpr_log(
257 GPR_INFO,
258 "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
259 this);
260 }
261 }
262
ShutdownLocked()263 void XdsClusterManagerLb::ShutdownLocked() {
264 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
265 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
266 }
267 shutting_down_ = true;
268 children_.clear();
269 }
270
ExitIdleLocked()271 void XdsClusterManagerLb::ExitIdleLocked() {
272 for (auto& p : children_) p.second->ExitIdleLocked();
273 }
274
ResetBackoffLocked()275 void XdsClusterManagerLb::ResetBackoffLocked() {
276 for (auto& p : children_) p.second->ResetBackoffLocked();
277 }
278
UpdateLocked(UpdateArgs args)279 absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
280 if (shutting_down_) return absl::OkStatus();
281 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
282 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
283 }
284 update_in_progress_ = true;
285 // Update config.
286 config_ = args.config.TakeAsSubclass<XdsClusterManagerLbConfig>();
287 // Deactivate the children not in the new config.
288 for (const auto& p : children_) {
289 const std::string& name = p.first;
290 ClusterChild* child = p.second.get();
291 if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
292 child->DeactivateLocked();
293 }
294 }
295 // Add or update the children in the new config.
296 std::vector<std::string> errors;
297 for (const auto& p : config_->cluster_map()) {
298 const std::string& name = p.first;
299 const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second.config;
300 auto& child = children_[name];
301 if (child == nullptr) {
302 child = MakeOrphanable<ClusterChild>(
303 RefAsSubclass<XdsClusterManagerLb>(DEBUG_LOCATION, "ClusterChild"),
304 name);
305 }
306 absl::Status status =
307 child->UpdateLocked(config, args.addresses, args.args);
308 if (!status.ok()) {
309 errors.emplace_back(
310 absl::StrCat("child ", name, ": ", status.ToString()));
311 }
312 }
313 update_in_progress_ = false;
314 UpdateStateLocked();
315 // Return status.
316 if (!errors.empty()) {
317 return absl::UnavailableError(absl::StrCat(
318 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
319 }
320 return absl::OkStatus();
321 }
322
UpdateStateLocked()323 void XdsClusterManagerLb::UpdateStateLocked() {
324 // If we're in the process of propagating an update from our parent to
325 // our children, ignore any updates that come from the children. We
326 // will instead return a new picker once the update has been seen by
327 // all children. This avoids unnecessary picker churn while an update
328 // is being propagated to our children.
329 if (update_in_progress_) return;
330 // Also count the number of children in each state, to determine the
331 // overall state.
332 size_t num_ready = 0;
333 size_t num_connecting = 0;
334 size_t num_idle = 0;
335 for (const auto& p : children_) {
336 const auto& child_name = p.first;
337 const ClusterChild* child = p.second.get();
338 // Skip the children that are not in the latest update.
339 if (config_->cluster_map().find(child_name) ==
340 config_->cluster_map().end()) {
341 continue;
342 }
343 switch (child->connectivity_state()) {
344 case GRPC_CHANNEL_READY: {
345 ++num_ready;
346 break;
347 }
348 case GRPC_CHANNEL_CONNECTING: {
349 ++num_connecting;
350 break;
351 }
352 case GRPC_CHANNEL_IDLE: {
353 ++num_idle;
354 break;
355 }
356 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
357 break;
358 }
359 default:
360 GPR_UNREACHABLE_CODE(return);
361 }
362 }
363 // Determine aggregated connectivity state.
364 grpc_connectivity_state connectivity_state;
365 if (num_ready > 0) {
366 connectivity_state = GRPC_CHANNEL_READY;
367 } else if (num_connecting > 0) {
368 connectivity_state = GRPC_CHANNEL_CONNECTING;
369 } else if (num_idle > 0) {
370 connectivity_state = GRPC_CHANNEL_IDLE;
371 } else {
372 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
373 }
374 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
375 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
376 this, ConnectivityStateName(connectivity_state));
377 }
378 ClusterPicker::ClusterMap cluster_map;
379 for (const auto& p : config_->cluster_map()) {
380 const std::string& cluster_name = p.first;
381 RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
382 child_picker = children_[cluster_name]->picker();
383 if (child_picker == nullptr) {
384 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
385 gpr_log(GPR_INFO,
386 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
387 "picker; creating a QueuePicker.",
388 this, cluster_name.c_str());
389 }
390 child_picker =
391 MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
392 }
393 }
394 auto picker = MakeRefCounted<ClusterPicker>(std::move(cluster_map));
395 absl::Status status;
396 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
397 status = absl::Status(absl::StatusCode::kUnavailable,
398 "TRANSIENT_FAILURE from XdsClusterManagerLb");
399 }
400 channel_control_helper()->UpdateState(connectivity_state, status,
401 std::move(picker));
402 }
403
404 //
405 // XdsClusterManagerLb::ClusterChild
406 //
407
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)408 XdsClusterManagerLb::ClusterChild::ClusterChild(
409 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
410 const std::string& name)
411 : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
412 name_(name),
413 picker_(MakeRefCounted<QueuePicker>(nullptr)) {
414 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
415 gpr_log(GPR_INFO,
416 "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
417 xds_cluster_manager_policy_.get(), this, name_.c_str());
418 }
419 }
420
~ClusterChild()421 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
422 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
423 gpr_log(GPR_INFO,
424 "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
425 "child",
426 xds_cluster_manager_policy_.get(), this);
427 }
428 xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
429 }
430
Orphan()431 void XdsClusterManagerLb::ClusterChild::Orphan() {
432 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
433 gpr_log(GPR_INFO,
434 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
435 "shutting down child",
436 xds_cluster_manager_policy_.get(), this, name_.c_str());
437 }
438 // Remove the child policy's interested_parties pollset_set from the
439 // xDS policy.
440 grpc_pollset_set_del_pollset_set(
441 child_policy_->interested_parties(),
442 xds_cluster_manager_policy_->interested_parties());
443 child_policy_.reset();
444 // Drop our ref to the child's picker, in case it's holding a ref to
445 // the child.
446 picker_.reset();
447 if (delayed_removal_timer_handle_.has_value()) {
448 xds_cluster_manager_policy_->channel_control_helper()
449 ->GetEventEngine()
450 ->Cancel(*delayed_removal_timer_handle_);
451 }
452 shutdown_ = true;
453 Unref();
454 }
455
456 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)457 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
458 const ChannelArgs& args) {
459 LoadBalancingPolicy::Args lb_policy_args;
460 lb_policy_args.work_serializer =
461 xds_cluster_manager_policy_->work_serializer();
462 lb_policy_args.args = args;
463 lb_policy_args.channel_control_helper =
464 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
465 OrphanablePtr<LoadBalancingPolicy> lb_policy =
466 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
467 &grpc_xds_cluster_manager_lb_trace);
468 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
469 gpr_log(GPR_INFO,
470 "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
471 "new child "
472 "policy handler %p",
473 xds_cluster_manager_policy_.get(), this, name_.c_str(),
474 lb_policy.get());
475 }
476 // Add the xDS's interested_parties pollset_set to that of the newly created
477 // child policy. This will make the child policy progress upon activity on
478 // xDS LB, which in turn is tied to the application's call.
479 grpc_pollset_set_add_pollset_set(
480 lb_policy->interested_parties(),
481 xds_cluster_manager_policy_->interested_parties());
482 return lb_policy;
483 }
484
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> & addresses,const ChannelArgs & args)485 absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
486 RefCountedPtr<LoadBalancingPolicy::Config> config,
487 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
488 const ChannelArgs& args) {
489 if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
490 // Update child weight.
491 // Reactivate if needed.
492 if (delayed_removal_timer_handle_.has_value() &&
493 xds_cluster_manager_policy_->channel_control_helper()
494 ->GetEventEngine()
495 ->Cancel(*delayed_removal_timer_handle_)) {
496 delayed_removal_timer_handle_.reset();
497 }
498 // Create child policy if needed.
499 if (child_policy_ == nullptr) {
500 child_policy_ = CreateChildPolicyLocked(args);
501 }
502 // Construct update args.
503 UpdateArgs update_args;
504 update_args.config = std::move(config);
505 update_args.addresses = addresses;
506 update_args.args = args;
507 // Update the policy.
508 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
509 gpr_log(GPR_INFO,
510 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
511 "Updating child "
512 "policy handler %p",
513 xds_cluster_manager_policy_.get(), this, name_.c_str(),
514 child_policy_.get());
515 }
516 return child_policy_->UpdateLocked(std::move(update_args));
517 }
518
ExitIdleLocked()519 void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
520 child_policy_->ExitIdleLocked();
521 }
522
ResetBackoffLocked()523 void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
524 child_policy_->ResetBackoffLocked();
525 }
526
DeactivateLocked()527 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
528 // If already deactivated, don't do that again.
529 if (delayed_removal_timer_handle_.has_value()) return;
530 // Start a timer to delete the child.
531 delayed_removal_timer_handle_ =
532 xds_cluster_manager_policy_->channel_control_helper()
533 ->GetEventEngine()
534 ->RunAfter(
535 kChildRetentionInterval,
536 [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
537 ApplicationCallbackExecCtx application_exec_ctx;
538 ExecCtx exec_ctx;
539 auto* self_ptr = self.get(); // Avoid use-after-move problem.
540 self_ptr->xds_cluster_manager_policy_->work_serializer()->Run(
541 [self = std::move(self)]() {
542 self->OnDelayedRemovalTimerLocked();
543 },
544 DEBUG_LOCATION);
545 });
546 }
547
OnDelayedRemovalTimerLocked()548 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
549 delayed_removal_timer_handle_.reset();
550 if (!shutdown_) {
551 xds_cluster_manager_policy_->children_.erase(name_);
552 }
553 }
554
555 //
556 // XdsClusterManagerLb::ClusterChild::Helper
557 //
558
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)559 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
560 grpc_connectivity_state state, const absl::Status& status,
561 RefCountedPtr<SubchannelPicker> picker) {
562 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
563 gpr_log(
564 GPR_INFO,
565 "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
566 "picker=%p",
567 xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
568 xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
569 status.ToString().c_str(), picker.get());
570 }
571 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
572 return;
573 }
574 // Cache the picker in the ClusterChild.
575 xds_cluster_manager_child_->picker_ = std::move(picker);
576 // Decide what state to report for aggregation purposes.
577 // If the last recorded state was TRANSIENT_FAILURE and the new state
578 // is something other than READY, don't change the state.
579 if (xds_cluster_manager_child_->connectivity_state_ !=
580 GRPC_CHANNEL_TRANSIENT_FAILURE ||
581 state == GRPC_CHANNEL_READY) {
582 xds_cluster_manager_child_->connectivity_state_ = state;
583 }
584 // Notify the LB policy.
585 xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
586 }
587
588 //
589 // factory
590 //
591
JsonLoader(const JsonArgs &)592 const JsonLoaderInterface* XdsClusterManagerLbConfig::Child::JsonLoader(
593 const JsonArgs&) {
594 // Note: The "childPolicy" field requires custom processing, so
595 // it's handled in JsonPostLoad() instead.
596 static const auto* loader = JsonObjectLoader<Child>().Finish();
597 return loader;
598 }
599
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)600 void XdsClusterManagerLbConfig::Child::JsonPostLoad(const Json& json,
601 const JsonArgs&,
602 ValidationErrors* errors) {
603 ValidationErrors::ScopedField field(errors, ".childPolicy");
604 auto it = json.object().find("childPolicy");
605 if (it == json.object().end()) {
606 errors->AddError("field not present");
607 return;
608 }
609 auto lb_config =
610 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
611 it->second);
612 if (!lb_config.ok()) {
613 errors->AddError(lb_config.status().message());
614 return;
615 }
616 config = std::move(*lb_config);
617 }
618
JsonLoader(const JsonArgs &)619 const JsonLoaderInterface* XdsClusterManagerLbConfig::JsonLoader(
620 const JsonArgs&) {
621 static const auto* loader =
622 JsonObjectLoader<XdsClusterManagerLbConfig>()
623 .Field("children", &XdsClusterManagerLbConfig::cluster_map_)
624 .Finish();
625 return loader;
626 }
627
628 class XdsClusterManagerLbFactory final : public LoadBalancingPolicyFactory {
629 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const630 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
631 LoadBalancingPolicy::Args args) const override {
632 return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
633 }
634
name() const635 absl::string_view name() const override { return kXdsClusterManager; }
636
637 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const638 ParseLoadBalancingConfig(const Json& json) const override {
639 return LoadFromJson<RefCountedPtr<XdsClusterManagerLbConfig>>(
640 json, JsonArgs(),
641 "errors validating xds_cluster_manager LB policy config");
642 }
643 };
644
645 } // namespace
646
RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder * builder)647 void RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder* builder) {
648 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
649 std::make_unique<XdsClusterManagerLbFactory>());
650 }
651
652 } // namespace grpc_core
653