xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/weighted_target/weighted_target.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/weighted_target/weighted_target.h"
20 
21 #include <string.h>
22 
23 #include <algorithm>
24 #include <cstdint>
25 #include <map>
26 #include <memory>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/meta/type_traits.h"
33 #include "absl/random/random.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 
41 #include <grpc/event_engine/event_engine.h>
42 #include <grpc/impl/connectivity_state.h>
43 #include <grpc/support/log.h>
44 
45 #include "src/core/load_balancing/address_filtering.h"
46 #include "src/core/load_balancing/child_policy_handler.h"
47 #include "src/core/lib/channel/channel_args.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/orphanable.h"
52 #include "src/core/lib/gprpp/ref_counted_ptr.h"
53 #include "src/core/lib/gprpp/sync.h"
54 #include "src/core/lib/gprpp/time.h"
55 #include "src/core/lib/gprpp/validation_errors.h"
56 #include "src/core/lib/gprpp/work_serializer.h"
57 #include "src/core/lib/iomgr/exec_ctx.h"
58 #include "src/core/lib/iomgr/pollset_set.h"
59 #include "src/core/lib/json/json.h"
60 #include "src/core/lib/json/json_args.h"
61 #include "src/core/lib/json/json_object_loader.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/load_balancing/delegating_helper.h"
64 #include "src/core/load_balancing/lb_policy.h"
65 #include "src/core/load_balancing/lb_policy_factory.h"
66 #include "src/core/load_balancing/lb_policy_registry.h"
67 #include "src/core/resolver/endpoint_addresses.h"
68 
69 // IWYU pragma: no_include <type_traits>
70 
71 namespace grpc_core {
72 
// Trace flag gating the verbose gpr_log() output in this file.  It is
// constructed disabled (first argument) under the name "weighted_target_lb".
TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
74 
75 namespace {
76 
// Shorthand for the EventEngine type used to schedule the delayed-removal
// timer below.
using ::grpc_event_engine::experimental::EventEngine;

// Name reported by the policy, its config, and its factory via name().
constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";

// How long we keep a child around for after it has been removed from
// the config.
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
84 
85 // Config for weighted_target LB policy.
86 class WeightedTargetLbConfig final : public LoadBalancingPolicy::Config {
87  public:
88   struct ChildConfig {
89     uint32_t weight;
90     RefCountedPtr<LoadBalancingPolicy::Config> config;
91 
92     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
93     void JsonPostLoad(const Json& json, const JsonArgs&,
94                       ValidationErrors* errors);
95   };
96 
97   using TargetMap = std::map<std::string, ChildConfig>;
98 
99   WeightedTargetLbConfig() = default;
100 
101   WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
102   WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
103 
104   WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
105   WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
106 
name() const107   absl::string_view name() const override { return kWeightedTarget; }
108 
target_map() const109   const TargetMap& target_map() const { return target_map_; }
110 
111   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
112 
113  private:
114   TargetMap target_map_;
115 };
116 
117 // weighted_target LB policy.
118 class WeightedTargetLb final : public LoadBalancingPolicy {
119  public:
120   explicit WeightedTargetLb(Args args);
121 
name() const122   absl::string_view name() const override { return kWeightedTarget; }
123 
124   absl::Status UpdateLocked(UpdateArgs args) override;
125   void ResetBackoffLocked() override;
126 
127  private:
128   // Picks a child using stateless WRR and then delegates to that
129   // child's picker.
130   class WeightedPicker final : public SubchannelPicker {
131    public:
132     // Maintains a weighted list of pickers from each child that is in
133     // ready state. The first element in the pair represents the end of a
134     // range proportional to the child's weight. The start of the range
135     // is the previous value in the vector and is 0 for the first element.
136     using PickerList =
137         std::vector<std::pair<uint64_t, RefCountedPtr<SubchannelPicker>>>;
138 
WeightedPicker(PickerList pickers)139     explicit WeightedPicker(PickerList pickers)
140         : pickers_(std::move(pickers)) {}
141 
142     PickResult Pick(PickArgs args) override;
143 
144    private:
145     PickerList pickers_;
146 
147     // TODO(roth): Consider using a separate thread-local BitGen for each CPU
148     // to avoid the need for this mutex.
149     Mutex mu_;
150     absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
151   };
152 
153   // Each WeightedChild holds a ref to its parent WeightedTargetLb.
154   class WeightedChild final : public InternallyRefCounted<WeightedChild> {
155    public:
156     WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
157                   const std::string& name);
158     ~WeightedChild() override;
159 
160     void Orphan() override;
161 
162     absl::Status UpdateLocked(
163         const WeightedTargetLbConfig::ChildConfig& config,
164         absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
165         const std::string& resolution_note, ChannelArgs args);
166     void ResetBackoffLocked();
167     void DeactivateLocked();
168 
weight() const169     uint32_t weight() const { return weight_; }
connectivity_state() const170     grpc_connectivity_state connectivity_state() const {
171       return connectivity_state_;
172     }
picker() const173     RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
174 
175    private:
176     class Helper final : public DelegatingChannelControlHelper {
177      public:
Helper(RefCountedPtr<WeightedChild> weighted_child)178       explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
179           : weighted_child_(std::move(weighted_child)) {}
180 
~Helper()181       ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
182 
183       void UpdateState(grpc_connectivity_state state,
184                        const absl::Status& status,
185                        RefCountedPtr<SubchannelPicker> picker) override;
186 
187      private:
parent_helper() const188       ChannelControlHelper* parent_helper() const override {
189         return weighted_child_->weighted_target_policy_
190             ->channel_control_helper();
191       }
192 
193       RefCountedPtr<WeightedChild> weighted_child_;
194     };
195 
196     class DelayedRemovalTimer final
197         : public InternallyRefCounted<DelayedRemovalTimer> {
198      public:
199       explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
200 
201       void Orphan() override;
202 
203      private:
204       void OnTimerLocked();
205 
206       RefCountedPtr<WeightedChild> weighted_child_;
207       absl::optional<EventEngine::TaskHandle> timer_handle_;
208     };
209 
210     // Methods for dealing with the child policy.
211     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
212         const ChannelArgs& args);
213 
214     void OnConnectivityStateUpdateLocked(
215         grpc_connectivity_state state, const absl::Status& status,
216         RefCountedPtr<SubchannelPicker> picker);
217 
218     // The owning LB policy.
219     RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
220 
221     const std::string name_;
222 
223     uint32_t weight_ = 0;
224 
225     OrphanablePtr<LoadBalancingPolicy> child_policy_;
226 
227     RefCountedPtr<SubchannelPicker> picker_;
228     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
229 
230     OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
231   };
232 
233   ~WeightedTargetLb() override;
234 
235   void ShutdownLocked() override;
236 
237   void UpdateStateLocked();
238 
239   // Current config from the resolver.
240   RefCountedPtr<WeightedTargetLbConfig> config_;
241 
242   // Internal state.
243   bool shutting_down_ = false;
244   bool update_in_progress_ = false;
245 
246   // Children.
247   std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
248 };
249 
250 //
251 // WeightedTargetLb::WeightedPicker
252 //
253 
Pick(PickArgs args)254 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
255     PickArgs args) {
256   // Generate a random number in [0, total weight).
257   const uint64_t key = [&]() {
258     MutexLock lock(&mu_);
259     return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
260   }();
261   // Find the index in pickers_ corresponding to key.
262   size_t mid = 0;
263   size_t start_index = 0;
264   size_t end_index = pickers_.size() - 1;
265   size_t index = 0;
266   while (end_index > start_index) {
267     mid = (start_index + end_index) / 2;
268     if (pickers_[mid].first > key) {
269       end_index = mid;
270     } else if (pickers_[mid].first < key) {
271       start_index = mid + 1;
272     } else {
273       index = mid + 1;
274       break;
275     }
276   }
277   if (index == 0) index = start_index;
278   GPR_ASSERT(pickers_[index].first > key);
279   // Delegate to the child picker.
280   return pickers_[index].second->Pick(args);
281 }
282 
283 //
284 // WeightedTargetLb
285 //
286 
WeightedTargetLb(Args args)287 WeightedTargetLb::WeightedTargetLb(Args args)
288     : LoadBalancingPolicy(std::move(args)) {
289   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
290     gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this);
291   }
292 }
293 
~WeightedTargetLb()294 WeightedTargetLb::~WeightedTargetLb() {
295   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
296     gpr_log(GPR_INFO,
297             "[weighted_target_lb %p] destroying weighted_target LB policy",
298             this);
299   }
300 }
301 
ShutdownLocked()302 void WeightedTargetLb::ShutdownLocked() {
303   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
304     gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this);
305   }
306   shutting_down_ = true;
307   targets_.clear();
308 }
309 
ResetBackoffLocked()310 void WeightedTargetLb::ResetBackoffLocked() {
311   for (auto& p : targets_) p.second->ResetBackoffLocked();
312 }
313 
UpdateLocked(UpdateArgs args)314 absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
315   if (shutting_down_) return absl::OkStatus();
316   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
317     gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
318   }
319   update_in_progress_ = true;
320   // Update config.
321   config_ = args.config.TakeAsSubclass<WeightedTargetLbConfig>();
322   // Deactivate the targets not in the new config.
323   for (const auto& p : targets_) {
324     const std::string& name = p.first;
325     WeightedChild* child = p.second.get();
326     if (config_->target_map().find(name) == config_->target_map().end()) {
327       child->DeactivateLocked();
328     }
329   }
330   // Update all children.
331   absl::StatusOr<HierarchicalAddressMap> address_map =
332       MakeHierarchicalAddressMap(args.addresses);
333   std::vector<std::string> errors;
334   for (const auto& p : config_->target_map()) {
335     const std::string& name = p.first;
336     const WeightedTargetLbConfig::ChildConfig& config = p.second;
337     auto& target = targets_[name];
338     // Create child if it does not already exist.
339     if (target == nullptr) {
340       target = MakeOrphanable<WeightedChild>(
341           RefAsSubclass<WeightedTargetLb>(DEBUG_LOCATION, "WeightedChild"),
342           name);
343     }
344     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
345     if (address_map.ok()) {
346       auto it = address_map->find(name);
347       if (it == address_map->end()) {
348         addresses = std::make_shared<EndpointAddressesListIterator>(
349             EndpointAddressesList());
350       } else {
351         addresses = std::move(it->second);
352       }
353     } else {
354       addresses = address_map.status();
355     }
356     absl::Status status = target->UpdateLocked(config, std::move(addresses),
357                                                args.resolution_note, args.args);
358     if (!status.ok()) {
359       errors.emplace_back(
360           absl::StrCat("child ", name, ": ", status.ToString()));
361     }
362   }
363   update_in_progress_ = false;
364   if (config_->target_map().empty()) {
365     absl::Status status = absl::UnavailableError(absl::StrCat(
366         "no children in weighted_target policy: ", args.resolution_note));
367     channel_control_helper()->UpdateState(
368         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
369         MakeRefCounted<TransientFailurePicker>(status));
370     return absl::OkStatus();
371   }
372   UpdateStateLocked();
373   // Return status.
374   if (!errors.empty()) {
375     return absl::UnavailableError(absl::StrCat(
376         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
377   }
378   return absl::OkStatus();
379 }
380 
UpdateStateLocked()381 void WeightedTargetLb::UpdateStateLocked() {
382   // If we're in the process of propagating an update from our parent to
383   // our children, ignore any updates that come from the children.  We
384   // will instead return a new picker once the update has been seen by
385   // all children.  This avoids unnecessary picker churn while an update
386   // is being propagated to our children.
387   if (update_in_progress_) return;
388   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
389     gpr_log(GPR_INFO,
390             "[weighted_target_lb %p] scanning children to determine "
391             "connectivity state",
392             this);
393   }
394   // Construct lists of child pickers with associated weights, one for
395   // children that are in state READY and another for children that are
396   // in state TRANSIENT_FAILURE.  Each child is represented by a portion of
397   // the range proportional to its weight, such that the total range is the
398   // sum of the weights of all children.
399   WeightedPicker::PickerList ready_picker_list;
400   uint64_t ready_end = 0;
401   WeightedPicker::PickerList tf_picker_list;
402   uint64_t tf_end = 0;
403   // Also count the number of children in CONNECTING and IDLE, to determine
404   // the aggregated state.
405   size_t num_connecting = 0;
406   size_t num_idle = 0;
407   for (const auto& p : targets_) {
408     const std::string& child_name = p.first;
409     const WeightedChild* child = p.second.get();
410     // Skip the targets that are not in the latest update.
411     if (config_->target_map().find(child_name) == config_->target_map().end()) {
412       continue;
413     }
414     auto child_picker = child->picker();
415     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
416       gpr_log(GPR_INFO,
417               "[weighted_target_lb %p]   child=%s state=%s weight=%u picker=%p",
418               this, child_name.c_str(),
419               ConnectivityStateName(child->connectivity_state()),
420               child->weight(), child_picker.get());
421     }
422     switch (child->connectivity_state()) {
423       case GRPC_CHANNEL_READY: {
424         GPR_ASSERT(child->weight() > 0);
425         ready_end += child->weight();
426         ready_picker_list.emplace_back(ready_end, std::move(child_picker));
427         break;
428       }
429       case GRPC_CHANNEL_CONNECTING: {
430         ++num_connecting;
431         break;
432       }
433       case GRPC_CHANNEL_IDLE: {
434         ++num_idle;
435         break;
436       }
437       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
438         GPR_ASSERT(child->weight() > 0);
439         tf_end += child->weight();
440         tf_picker_list.emplace_back(tf_end, std::move(child_picker));
441         break;
442       }
443       default:
444         GPR_UNREACHABLE_CODE(return);
445     }
446   }
447   // Determine aggregated connectivity state.
448   grpc_connectivity_state connectivity_state;
449   if (!ready_picker_list.empty()) {
450     connectivity_state = GRPC_CHANNEL_READY;
451   } else if (num_connecting > 0) {
452     connectivity_state = GRPC_CHANNEL_CONNECTING;
453   } else if (num_idle > 0) {
454     connectivity_state = GRPC_CHANNEL_IDLE;
455   } else {
456     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
457   }
458   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
459     gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
460             this, ConnectivityStateName(connectivity_state));
461   }
462   RefCountedPtr<SubchannelPicker> picker;
463   absl::Status status;
464   switch (connectivity_state) {
465     case GRPC_CHANNEL_READY:
466       picker = MakeRefCounted<WeightedPicker>(std::move(ready_picker_list));
467       break;
468     case GRPC_CHANNEL_CONNECTING:
469     case GRPC_CHANNEL_IDLE:
470       picker = MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
471       break;
472     default:
473       picker = MakeRefCounted<WeightedPicker>(std::move(tf_picker_list));
474   }
475   channel_control_helper()->UpdateState(connectivity_state, status,
476                                         std::move(picker));
477 }
478 
479 //
480 // WeightedTargetLb::WeightedChild::DelayedRemovalTimer
481 //
482 
DelayedRemovalTimer(RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)483 WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
484     RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
485     : weighted_child_(std::move(weighted_child)) {
486   timer_handle_ =
487       weighted_child_->weighted_target_policy_->channel_control_helper()
488           ->GetEventEngine()
489           ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
490             ApplicationCallbackExecCtx app_exec_ctx;
491             ExecCtx exec_ctx;
492             auto* self_ptr = self.get();  // Avoid use-after-move problem.
493             self_ptr->weighted_child_->weighted_target_policy_
494                 ->work_serializer()
495                 ->Run([self = std::move(self)] { self->OnTimerLocked(); },
496                       DEBUG_LOCATION);
497           });
498 }
499 
Orphan()500 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
501   if (timer_handle_.has_value()) {
502     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
503       gpr_log(GPR_INFO,
504               "[weighted_target_lb %p] WeightedChild %p %s: cancelling "
505               "delayed removal timer",
506               weighted_child_->weighted_target_policy_.get(),
507               weighted_child_.get(), weighted_child_->name_.c_str());
508     }
509     weighted_child_->weighted_target_policy_->channel_control_helper()
510         ->GetEventEngine()
511         ->Cancel(*timer_handle_);
512   }
513   Unref();
514 }
515 
OnTimerLocked()516 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
517   GPR_ASSERT(timer_handle_.has_value());
518   timer_handle_.reset();
519   weighted_child_->weighted_target_policy_->targets_.erase(
520       weighted_child_->name_);
521 }
522 
523 //
524 // WeightedTargetLb::WeightedChild
525 //
526 
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)527 WeightedTargetLb::WeightedChild::WeightedChild(
528     RefCountedPtr<WeightedTargetLb> weighted_target_policy,
529     const std::string& name)
530     : weighted_target_policy_(std::move(weighted_target_policy)),
531       name_(name),
532       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
533   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
534     gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
535             weighted_target_policy_.get(), this, name_.c_str());
536   }
537 }
538 
~WeightedChild()539 WeightedTargetLb::WeightedChild::~WeightedChild() {
540   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
541     gpr_log(GPR_INFO,
542             "[weighted_target_lb %p] WeightedChild %p %s: destroying child",
543             weighted_target_policy_.get(), this, name_.c_str());
544   }
545   weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
546 }
547 
Orphan()548 void WeightedTargetLb::WeightedChild::Orphan() {
549   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
550     gpr_log(GPR_INFO,
551             "[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
552             weighted_target_policy_.get(), this, name_.c_str());
553   }
554   // Remove the child policy's interested_parties pollset_set from the
555   // xDS policy.
556   grpc_pollset_set_del_pollset_set(
557       child_policy_->interested_parties(),
558       weighted_target_policy_->interested_parties());
559   child_policy_.reset();
560   // Drop our ref to the child's picker, in case it's holding a ref to
561   // the child.
562   picker_.reset();
563   delayed_removal_timer_.reset();
564   Unref();
565 }
566 
567 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)568 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
569     const ChannelArgs& args) {
570   LoadBalancingPolicy::Args lb_policy_args;
571   lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
572   lb_policy_args.args = args;
573   lb_policy_args.channel_control_helper =
574       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
575   OrphanablePtr<LoadBalancingPolicy> lb_policy =
576       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
577                                          &grpc_lb_weighted_target_trace);
578   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
579     gpr_log(GPR_INFO,
580             "[weighted_target_lb %p] WeightedChild %p %s: Created new child "
581             "policy handler %p",
582             weighted_target_policy_.get(), this, name_.c_str(),
583             lb_policy.get());
584   }
585   // Add the xDS's interested_parties pollset_set to that of the newly created
586   // child policy. This will make the child policy progress upon activity on
587   // xDS LB, which in turn is tied to the application's call.
588   grpc_pollset_set_add_pollset_set(
589       lb_policy->interested_parties(),
590       weighted_target_policy_->interested_parties());
591   return lb_policy;
592 }
593 
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,const std::string & resolution_note,ChannelArgs args)594 absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
595     const WeightedTargetLbConfig::ChildConfig& config,
596     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
597     const std::string& resolution_note, ChannelArgs args) {
598   if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
599   // Update child weight.
600   if (weight_ != config.weight &&
601       GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
602     gpr_log(GPR_INFO, "[weighted_target_lb %p] WeightedChild %p %s: weight=%u",
603             weighted_target_policy_.get(), this, name_.c_str(), config.weight);
604   }
605   weight_ = config.weight;
606   // Reactivate if needed.
607   if (delayed_removal_timer_ != nullptr) {
608     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
609       gpr_log(GPR_INFO,
610               "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
611               weighted_target_policy_.get(), this, name_.c_str());
612     }
613     delayed_removal_timer_.reset();
614   }
615   // Create child policy if needed.
616   args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_);
617   if (child_policy_ == nullptr) {
618     child_policy_ = CreateChildPolicyLocked(args);
619   }
620   // Construct update args.
621   UpdateArgs update_args;
622   update_args.config = config.config;
623   update_args.addresses = std::move(addresses);
624   update_args.resolution_note = resolution_note;
625   update_args.args = std::move(args);
626   // Update the policy.
627   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
628     gpr_log(GPR_INFO,
629             "[weighted_target_lb %p] WeightedChild %p %s: Updating child "
630             "policy handler %p",
631             weighted_target_policy_.get(), this, name_.c_str(),
632             child_policy_.get());
633   }
634   return child_policy_->UpdateLocked(std::move(update_args));
635 }
636 
ResetBackoffLocked()637 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
638   child_policy_->ResetBackoffLocked();
639 }
640 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)641 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
642     grpc_connectivity_state state, const absl::Status& status,
643     RefCountedPtr<SubchannelPicker> picker) {
644   // Cache the picker in the WeightedChild.
645   picker_ = std::move(picker);
646   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
647     gpr_log(GPR_INFO,
648             "[weighted_target_lb %p] WeightedChild %p %s: connectivity "
649             "state update: state=%s (%s) picker=%p",
650             weighted_target_policy_.get(), this, name_.c_str(),
651             ConnectivityStateName(state), status.ToString().c_str(),
652             picker_.get());
653   }
654   // If the child reports IDLE, immediately tell it to exit idle.
655   if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
656   // Decide what state to report for aggregation purposes.
657   // If the last recorded state was TRANSIENT_FAILURE and the new state
658   // is something other than READY, don't change the state.
659   if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
660       state == GRPC_CHANNEL_READY) {
661     connectivity_state_ = state;
662   }
663   // Update the LB policy's state if this child is not deactivated.
664   if (weight_ != 0) weighted_target_policy_->UpdateStateLocked();
665 }
666 
DeactivateLocked()667 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
668   // If already deactivated, don't do that again.
669   if (weight_ == 0) return;
670   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
671     gpr_log(GPR_INFO,
672             "[weighted_target_lb %p] WeightedChild %p %s: deactivating",
673             weighted_target_policy_.get(), this, name_.c_str());
674   }
675   // Set the child weight to 0 so that future picker won't contain this child.
676   weight_ = 0;
677   // Start a timer to delete the child.
678   delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
679       Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
680 }
681 
682 //
683 // WeightedTargetLb::WeightedChild::Helper
684 //
685 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)686 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
687     grpc_connectivity_state state, const absl::Status& status,
688     RefCountedPtr<SubchannelPicker> picker) {
689   if (weighted_child_->weighted_target_policy_->shutting_down_) return;
690   weighted_child_->OnConnectivityStateUpdateLocked(state, status,
691                                                    std::move(picker));
692 }
693 
694 //
695 // factory
696 //
697 
JsonLoader(const JsonArgs &)698 const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
699     const JsonArgs&) {
700   static const auto* loader =
701       JsonObjectLoader<ChildConfig>()
702           // Note: The config field requires custom parsing, so it's
703           // handled in JsonPostLoad() instead.
704           .Field("weight", &ChildConfig::weight)
705           .Finish();
706   return loader;
707 }
708 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)709 void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(
710     const Json& json, const JsonArgs&, ValidationErrors* errors) {
711   ValidationErrors::ScopedField field(errors, ".childPolicy");
712   auto it = json.object().find("childPolicy");
713   if (it == json.object().end()) {
714     errors->AddError("field not present");
715     return;
716   }
717   auto lb_config =
718       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
719           it->second);
720   if (!lb_config.ok()) {
721     errors->AddError(lb_config.status().message());
722     return;
723   }
724   config = std::move(*lb_config);
725 }
726 
JsonLoader(const JsonArgs &)727 const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
728   static const auto* loader =
729       JsonObjectLoader<WeightedTargetLbConfig>()
730           .Field("targets", &WeightedTargetLbConfig::target_map_)
731           .Finish();
732   return loader;
733 }
734 
735 class WeightedTargetLbFactory final : public LoadBalancingPolicyFactory {
736  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const737   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
738       LoadBalancingPolicy::Args args) const override {
739     return MakeOrphanable<WeightedTargetLb>(std::move(args));
740   }
741 
name() const742   absl::string_view name() const override { return kWeightedTarget; }
743 
744   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const745   ParseLoadBalancingConfig(const Json& json) const override {
746     return LoadFromJson<RefCountedPtr<WeightedTargetLbConfig>>(
747         json, JsonArgs(), "errors validating weighted_target LB policy config");
748   }
749 };
750 
751 }  // namespace
752 
RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder * builder)753 void RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder* builder) {
754   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
755       std::make_unique<WeightedTargetLbFactory>());
756 }
757 
758 }  // namespace grpc_core
759