xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/xds/xds_cluster_manager.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <stddef.h>
20 
21 #include <algorithm>
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/impl/connectivity_state.h>
39 #include <grpc/support/log.h>
40 
41 #include "src/core/client_channel/client_channel_internal.h"
42 #include "src/core/load_balancing/child_policy_handler.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/config/core_configuration.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/time.h"
50 #include "src/core/lib/gprpp/validation_errors.h"
51 #include "src/core/lib/gprpp/work_serializer.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/pollset_set.h"
54 #include "src/core/lib/json/json.h"
55 #include "src/core/lib/json/json_args.h"
56 #include "src/core/lib/json/json_object_loader.h"
57 #include "src/core/lib/transport/connectivity_state.h"
58 #include "src/core/load_balancing/delegating_helper.h"
59 #include "src/core/load_balancing/lb_policy.h"
60 #include "src/core/load_balancing/lb_policy_factory.h"
61 #include "src/core/load_balancing/lb_policy_registry.h"
62 #include "src/core/resolver/endpoint_addresses.h"
63 #include "src/core/resolver/xds/xds_resolver_attributes.h"
64 
65 namespace grpc_core {
66 
// Trace flag consulted by every GRPC_TRACE_FLAG_ENABLED() check in this file
// before emitting a log line.  It is registered under the name
// "xds_cluster_manager_lb"; the `false` argument is presumably the
// default-enabled state (confirm against the TraceFlag constructor).
TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
68 
69 namespace {
70 
71 using ::grpc_event_engine::experimental::EventEngine;
72 
// Delay passed to the event engine when a child is deactivated: a child that
// has dropped out of the config is kept around for this long before its
// delayed-removal timer erases it.
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
// Name reported by name() for the policy, its config, and its factory.
constexpr absl::string_view kXdsClusterManager =
    "xds_cluster_manager_experimental";
76 
// Config for xds_cluster_manager LB policy.
//
// Holds the per-child LB policy configs loaded from the "children" field of
// the JSON config, keyed by child (cluster) name.
class XdsClusterManagerLbConfig final : public LoadBalancingPolicy::Config {
 public:
  // One entry of the "children" map.
  struct Child {
    // Parsed config for this child's LB policy.  Filled in by JsonPostLoad()
    // from the "childPolicy" field.
    RefCountedPtr<LoadBalancingPolicy::Config> config;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
    // Parses the "childPolicy" field of `json` into `config`, recording any
    // problem in `errors`.
    void JsonPostLoad(const Json& json, const JsonArgs&,
                      ValidationErrors* errors);
  };

  XdsClusterManagerLbConfig() = default;

  // Not copyable.
  XdsClusterManagerLbConfig(const XdsClusterManagerLbConfig&) = delete;
  XdsClusterManagerLbConfig& operator=(const XdsClusterManagerLbConfig&) =
      delete;

  // Not movable.
  XdsClusterManagerLbConfig(XdsClusterManagerLbConfig&& other) = delete;
  XdsClusterManagerLbConfig& operator=(XdsClusterManagerLbConfig&& other) =
      delete;

  absl::string_view name() const override { return kXdsClusterManager; }

  // Child configs keyed by child name.
  const std::map<std::string, Child>& cluster_map() const {
    return cluster_map_;
  }

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&);

 private:
  // Populated by the JSON loader from the "children" field.
  std::map<std::string, Child> cluster_map_;
};
109 
// xds_cluster_manager LB policy.
//
// Maintains one child policy per entry in the config's cluster map and hands
// the channel a picker that routes each call to the child named by the call's
// XdsClusterAttribute.
class XdsClusterManagerLb final : public LoadBalancingPolicy {
 public:
  explicit XdsClusterManagerLb(Args args);

  absl::string_view name() const override { return kXdsClusterManager; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Looks up the cluster name carried in the call's XdsClusterAttribute and
  // delegates the pick to that cluster's picker.  Picks for a name that is
  // not in the map fail with an internal error.
  class ClusterPicker final : public SubchannelPicker {
   public:
    // Maintains a map of cluster names to pickers.  The transparent
    // comparator (std::less<>) lets Pick() look up an absl::string_view
    // directly.
    using ClusterMap = std::map<std::string /*cluster_name*/,
                                RefCountedPtr<SubchannelPicker>, std::less<>>;

    // Takes ownership of cluster_map; the keys are std::string copies stored
    // in the picker itself.
    explicit ClusterPicker(ClusterMap cluster_map)
        : cluster_map_(std::move(cluster_map)) {}

    PickResult Pick(PickArgs args) override;

   private:
    ClusterMap cluster_map_;
  };

  // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
  class ClusterChild final : public InternallyRefCounted<ClusterChild> {
   public:
    ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
                 const std::string& name);
    ~ClusterChild() override;

    void Orphan() override;

    // Cancels any pending delayed removal, creates the child policy if it
    // does not exist yet, and forwards the update to it.
    absl::Status UpdateLocked(
        RefCountedPtr<LoadBalancingPolicy::Config> config,
        const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
            addresses,
        const ChannelArgs& args);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the delayed-removal timer unless one is already pending.
    void DeactivateLocked();

    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }
    RefCountedPtr<SubchannelPicker> picker() const { return picker_; }

   private:
    // Helper handed to the child policy.  It records the child's state and
    // picker on the ClusterChild and forwards everything else to the parent
    // policy's helper.
    class Helper final : public DelegatingChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
          : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}

      ~Helper() override {
        xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
      }

      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;

     private:
      ChannelControlHelper* parent_helper() const override {
        return xds_cluster_manager_child_->xds_cluster_manager_policy_
            ->channel_control_helper();
      }

      RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const ChannelArgs& args);

    // Runs in the work serializer when the delayed-removal timer fires.
    void OnDelayedRemovalTimerLocked();

    // The owning LB policy.
    RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;

    // Name of this child; a copy of the key used for it in the parent's
    // children_ map.
    const std::string name_;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    // Most recent picker and state recorded for this child.
    RefCountedPtr<SubchannelPicker> picker_;
    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;

    // States for delayed removal.
    // Set while a removal timer is pending.
    absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
    // Set by Orphan(); makes a late timer callback a no-op.
    bool shutdown_ = false;
  };

  ~XdsClusterManagerLb() override;

  void ShutdownLocked() override;

  // Recomputes the aggregated connectivity state and picker and reports
  // them to the channel.  No-op while an update is being propagated.
  void UpdateStateLocked();

  // Current config from the resolver.
  RefCountedPtr<XdsClusterManagerLbConfig> config_;

  // Internal state.
  bool shutting_down_ = false;
  bool update_in_progress_ = false;

  // Children.
  std::map<std::string, OrphanablePtr<ClusterChild>> children_;
};
225 
226 //
227 // XdsClusterManagerLb::ClusterPicker
228 //
229 
Pick(PickArgs args)230 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
231     PickArgs args) {
232   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
233   auto* cluster_name_attribute = static_cast<XdsClusterAttribute*>(
234       call_state->GetCallAttribute(XdsClusterAttribute::TypeName()));
235   absl::string_view cluster_name;
236   if (cluster_name_attribute != nullptr) {
237     cluster_name = cluster_name_attribute->cluster();
238   }
239   auto it = cluster_map_.find(cluster_name);
240   if (it != cluster_map_.end()) {
241     return it->second->Pick(args);
242   }
243   return PickResult::Fail(absl::InternalError(absl::StrCat(
244       "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
245 }
246 
247 //
248 // XdsClusterManagerLb
249 //
250 
// Forwards the construction arguments to the LoadBalancingPolicy base class;
// all other members rely on their in-class initializers.
XdsClusterManagerLb::XdsClusterManagerLb(Args args)
    : LoadBalancingPolicy(std::move(args)) {}
253 
~XdsClusterManagerLb()254 XdsClusterManagerLb::~XdsClusterManagerLb() {
255   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
256     gpr_log(
257         GPR_INFO,
258         "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
259         this);
260   }
261 }
262 
// Marks the policy as shutting down and drops all children.
void XdsClusterManagerLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
  }
  // Set the flag before clearing the map: UpdateLocked() and the children's
  // helpers check it and bail out once it is set.
  shutting_down_ = true;
  // Destroying the OrphanablePtr entries orphans each child.
  children_.clear();
}
270 
ExitIdleLocked()271 void XdsClusterManagerLb::ExitIdleLocked() {
272   for (auto& p : children_) p.second->ExitIdleLocked();
273 }
274 
ResetBackoffLocked()275 void XdsClusterManagerLb::ResetBackoffLocked() {
276   for (auto& p : children_) p.second->ResetBackoffLocked();
277 }
278 
UpdateLocked(UpdateArgs args)279 absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
280   if (shutting_down_) return absl::OkStatus();
281   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
282     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
283   }
284   update_in_progress_ = true;
285   // Update config.
286   config_ = args.config.TakeAsSubclass<XdsClusterManagerLbConfig>();
287   // Deactivate the children not in the new config.
288   for (const auto& p : children_) {
289     const std::string& name = p.first;
290     ClusterChild* child = p.second.get();
291     if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
292       child->DeactivateLocked();
293     }
294   }
295   // Add or update the children in the new config.
296   std::vector<std::string> errors;
297   for (const auto& p : config_->cluster_map()) {
298     const std::string& name = p.first;
299     const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second.config;
300     auto& child = children_[name];
301     if (child == nullptr) {
302       child = MakeOrphanable<ClusterChild>(
303           RefAsSubclass<XdsClusterManagerLb>(DEBUG_LOCATION, "ClusterChild"),
304           name);
305     }
306     absl::Status status =
307         child->UpdateLocked(config, args.addresses, args.args);
308     if (!status.ok()) {
309       errors.emplace_back(
310           absl::StrCat("child ", name, ": ", status.ToString()));
311     }
312   }
313   update_in_progress_ = false;
314   UpdateStateLocked();
315   // Return status.
316   if (!errors.empty()) {
317     return absl::UnavailableError(absl::StrCat(
318         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
319   }
320   return absl::OkStatus();
321 }
322 
UpdateStateLocked()323 void XdsClusterManagerLb::UpdateStateLocked() {
324   // If we're in the process of propagating an update from our parent to
325   // our children, ignore any updates that come from the children.  We
326   // will instead return a new picker once the update has been seen by
327   // all children.  This avoids unnecessary picker churn while an update
328   // is being propagated to our children.
329   if (update_in_progress_) return;
330   // Also count the number of children in each state, to determine the
331   // overall state.
332   size_t num_ready = 0;
333   size_t num_connecting = 0;
334   size_t num_idle = 0;
335   for (const auto& p : children_) {
336     const auto& child_name = p.first;
337     const ClusterChild* child = p.second.get();
338     // Skip the children that are not in the latest update.
339     if (config_->cluster_map().find(child_name) ==
340         config_->cluster_map().end()) {
341       continue;
342     }
343     switch (child->connectivity_state()) {
344       case GRPC_CHANNEL_READY: {
345         ++num_ready;
346         break;
347       }
348       case GRPC_CHANNEL_CONNECTING: {
349         ++num_connecting;
350         break;
351       }
352       case GRPC_CHANNEL_IDLE: {
353         ++num_idle;
354         break;
355       }
356       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
357         break;
358       }
359       default:
360         GPR_UNREACHABLE_CODE(return);
361     }
362   }
363   // Determine aggregated connectivity state.
364   grpc_connectivity_state connectivity_state;
365   if (num_ready > 0) {
366     connectivity_state = GRPC_CHANNEL_READY;
367   } else if (num_connecting > 0) {
368     connectivity_state = GRPC_CHANNEL_CONNECTING;
369   } else if (num_idle > 0) {
370     connectivity_state = GRPC_CHANNEL_IDLE;
371   } else {
372     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
373   }
374   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
375     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
376             this, ConnectivityStateName(connectivity_state));
377   }
378   ClusterPicker::ClusterMap cluster_map;
379   for (const auto& p : config_->cluster_map()) {
380     const std::string& cluster_name = p.first;
381     RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
382     child_picker = children_[cluster_name]->picker();
383     if (child_picker == nullptr) {
384       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
385         gpr_log(GPR_INFO,
386                 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
387                 "picker; creating a QueuePicker.",
388                 this, cluster_name.c_str());
389       }
390       child_picker =
391           MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
392     }
393   }
394   auto picker = MakeRefCounted<ClusterPicker>(std::move(cluster_map));
395   absl::Status status;
396   if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
397     status = absl::Status(absl::StatusCode::kUnavailable,
398                           "TRANSIENT_FAILURE from XdsClusterManagerLb");
399   }
400   channel_control_helper()->UpdateState(connectivity_state, status,
401                                         std::move(picker));
402 }
403 
404 //
405 // XdsClusterManagerLb::ClusterChild
406 //
407 
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)408 XdsClusterManagerLb::ClusterChild::ClusterChild(
409     RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
410     const std::string& name)
411     : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
412       name_(name),
413       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
414   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
415     gpr_log(GPR_INFO,
416             "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
417             xds_cluster_manager_policy_.get(), this, name_.c_str());
418   }
419 }
420 
// Logs the destruction and releases the ref on the owning policy.
XdsClusterManagerLb::ClusterChild::~ClusterChild() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
            "child",
            xds_cluster_manager_policy_.get(), this);
  }
  // Release the parent ref explicitly so the unref is tagged with the same
  // reason string ("ClusterChild") that was used when the ref was taken.
  xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
}
430 
// Shuts the child down: detaches and destroys the child policy, drops the
// cached picker, cancels any pending removal timer, and releases the ref
// that the owner held on this object.
XdsClusterManagerLb::ClusterChild::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
            "shutting down child",
            xds_cluster_manager_policy_.get(), this, name_.c_str());
  }
  // Remove the child policy's interested_parties pollset_set from the
  // xDS policy.
  grpc_pollset_set_del_pollset_set(
      child_policy_->interested_parties(),
      xds_cluster_manager_policy_->interested_parties());
  child_policy_.reset();
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
  // Best-effort cancel.  If the callback has already been scheduled, the
  // shutdown_ flag set below turns it into a no-op.
  if (delayed_removal_timer_handle_.has_value()) {
    xds_cluster_manager_policy_->channel_control_helper()
        ->GetEventEngine()
        ->Cancel(*delayed_removal_timer_handle_);
  }
  shutdown_ = true;
  // Must be last: this may destroy the object.
  Unref();
}
455 
456 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)457 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
458     const ChannelArgs& args) {
459   LoadBalancingPolicy::Args lb_policy_args;
460   lb_policy_args.work_serializer =
461       xds_cluster_manager_policy_->work_serializer();
462   lb_policy_args.args = args;
463   lb_policy_args.channel_control_helper =
464       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
465   OrphanablePtr<LoadBalancingPolicy> lb_policy =
466       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
467                                          &grpc_xds_cluster_manager_lb_trace);
468   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
469     gpr_log(GPR_INFO,
470             "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
471             "new child "
472             "policy handler %p",
473             xds_cluster_manager_policy_.get(), this, name_.c_str(),
474             lb_policy.get());
475   }
476   // Add the xDS's interested_parties pollset_set to that of the newly created
477   // child policy. This will make the child policy progress upon activity on
478   // xDS LB, which in turn is tied to the application's call.
479   grpc_pollset_set_add_pollset_set(
480       lb_policy->interested_parties(),
481       xds_cluster_manager_policy_->interested_parties());
482   return lb_policy;
483 }
484 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> & addresses,const ChannelArgs & args)485 absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
486     RefCountedPtr<LoadBalancingPolicy::Config> config,
487     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
488     const ChannelArgs& args) {
489   if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
490   // Update child weight.
491   // Reactivate if needed.
492   if (delayed_removal_timer_handle_.has_value() &&
493       xds_cluster_manager_policy_->channel_control_helper()
494           ->GetEventEngine()
495           ->Cancel(*delayed_removal_timer_handle_)) {
496     delayed_removal_timer_handle_.reset();
497   }
498   // Create child policy if needed.
499   if (child_policy_ == nullptr) {
500     child_policy_ = CreateChildPolicyLocked(args);
501   }
502   // Construct update args.
503   UpdateArgs update_args;
504   update_args.config = std::move(config);
505   update_args.addresses = addresses;
506   update_args.args = args;
507   // Update the policy.
508   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
509     gpr_log(GPR_INFO,
510             "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
511             "Updating child "
512             "policy handler %p",
513             xds_cluster_manager_policy_.get(), this, name_.c_str(),
514             child_policy_.get());
515   }
516   return child_policy_->UpdateLocked(std::move(update_args));
517 }
518 
// Forwards the exit-idle request to the child policy.  child_policy_ is
// dereferenced unconditionally, so it must already have been created.
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
  child_policy_->ExitIdleLocked();
}
522 
// Forwards the reset-backoff request to the child policy.  child_policy_ is
// dereferenced unconditionally, so it must already have been created.
void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
  child_policy_->ResetBackoffLocked();
}
526 
// Schedules this child for removal after kChildRetentionInterval.  The
// child policy itself is left untouched; only the timer is started.
void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
  // If already deactivated, don't do that again.
  if (delayed_removal_timer_handle_.has_value()) return;
  // Start a timer to delete the child.
  // The timer callback holds a ref to this child and hops into the parent's
  // work serializer before touching any state.
  delayed_removal_timer_handle_ =
      xds_cluster_manager_policy_->channel_control_helper()
          ->GetEventEngine()
          ->RunAfter(
              kChildRetentionInterval,
              [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
                ApplicationCallbackExecCtx application_exec_ctx;
                ExecCtx exec_ctx;
                // `self` is moved into the inner lambda below, so grab the
                // raw pointer first to reach the work serializer.
                auto* self_ptr = self.get();  // Avoid use-after-move problem.
                self_ptr->xds_cluster_manager_policy_->work_serializer()->Run(
                    [self = std::move(self)]() {
                      self->OnDelayedRemovalTimerLocked();
                    },
                    DEBUG_LOCATION);
              });
}
547 
OnDelayedRemovalTimerLocked()548 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
549   delayed_removal_timer_handle_.reset();
550   if (!shutdown_) {
551     xds_cluster_manager_policy_->children_.erase(name_);
552   }
553 }
554 
555 //
556 // XdsClusterManagerLb::ClusterChild::Helper
557 //
558 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)559 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
560     grpc_connectivity_state state, const absl::Status& status,
561     RefCountedPtr<SubchannelPicker> picker) {
562   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
563     gpr_log(
564         GPR_INFO,
565         "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
566         "picker=%p",
567         xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
568         xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
569         status.ToString().c_str(), picker.get());
570   }
571   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
572     return;
573   }
574   // Cache the picker in the ClusterChild.
575   xds_cluster_manager_child_->picker_ = std::move(picker);
576   // Decide what state to report for aggregation purposes.
577   // If the last recorded state was TRANSIENT_FAILURE and the new state
578   // is something other than READY, don't change the state.
579   if (xds_cluster_manager_child_->connectivity_state_ !=
580           GRPC_CHANNEL_TRANSIENT_FAILURE ||
581       state == GRPC_CHANNEL_READY) {
582     xds_cluster_manager_child_->connectivity_state_ = state;
583   }
584   // Notify the LB policy.
585   xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
586 }
587 
588 //
589 // factory
590 //
591 
JsonLoader(const JsonArgs &)592 const JsonLoaderInterface* XdsClusterManagerLbConfig::Child::JsonLoader(
593     const JsonArgs&) {
594   // Note: The "childPolicy" field requires custom processing, so
595   // it's handled in JsonPostLoad() instead.
596   static const auto* loader = JsonObjectLoader<Child>().Finish();
597   return loader;
598 }
599 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)600 void XdsClusterManagerLbConfig::Child::JsonPostLoad(const Json& json,
601                                                     const JsonArgs&,
602                                                     ValidationErrors* errors) {
603   ValidationErrors::ScopedField field(errors, ".childPolicy");
604   auto it = json.object().find("childPolicy");
605   if (it == json.object().end()) {
606     errors->AddError("field not present");
607     return;
608   }
609   auto lb_config =
610       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
611           it->second);
612   if (!lb_config.ok()) {
613     errors->AddError(lb_config.status().message());
614     return;
615   }
616   config = std::move(*lb_config);
617 }
618 
JsonLoader(const JsonArgs &)619 const JsonLoaderInterface* XdsClusterManagerLbConfig::JsonLoader(
620     const JsonArgs&) {
621   static const auto* loader =
622       JsonObjectLoader<XdsClusterManagerLbConfig>()
623           .Field("children", &XdsClusterManagerLbConfig::cluster_map_)
624           .Finish();
625   return loader;
626 }
627 
628 class XdsClusterManagerLbFactory final : public LoadBalancingPolicyFactory {
629  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const630   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
631       LoadBalancingPolicy::Args args) const override {
632     return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
633   }
634 
name() const635   absl::string_view name() const override { return kXdsClusterManager; }
636 
637   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const638   ParseLoadBalancingConfig(const Json& json) const override {
639     return LoadFromJson<RefCountedPtr<XdsClusterManagerLbConfig>>(
640         json, JsonArgs(),
641         "errors validating xds_cluster_manager LB policy config");
642   }
643 };
644 
645 }  // namespace
646 
RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder * builder)647 void RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder* builder) {
648   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
649       std::make_unique<XdsClusterManagerLbFactory>());
650 }
651 
652 }  // namespace grpc_core
653