1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/load_balancing/weighted_target/weighted_target.h"
20
21 #include <string.h>
22
23 #include <algorithm>
24 #include <cstdint>
25 #include <map>
26 #include <memory>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/base/thread_annotations.h"
32 #include "absl/meta/type_traits.h"
33 #include "absl/random/random.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/str_join.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40
41 #include <grpc/event_engine/event_engine.h>
42 #include <grpc/impl/connectivity_state.h>
43 #include <grpc/support/log.h>
44
45 #include "src/core/load_balancing/address_filtering.h"
46 #include "src/core/load_balancing/child_policy_handler.h"
47 #include "src/core/lib/channel/channel_args.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/orphanable.h"
52 #include "src/core/lib/gprpp/ref_counted_ptr.h"
53 #include "src/core/lib/gprpp/sync.h"
54 #include "src/core/lib/gprpp/time.h"
55 #include "src/core/lib/gprpp/validation_errors.h"
56 #include "src/core/lib/gprpp/work_serializer.h"
57 #include "src/core/lib/iomgr/exec_ctx.h"
58 #include "src/core/lib/iomgr/pollset_set.h"
59 #include "src/core/lib/json/json.h"
60 #include "src/core/lib/json/json_args.h"
61 #include "src/core/lib/json/json_object_loader.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/load_balancing/delegating_helper.h"
64 #include "src/core/load_balancing/lb_policy.h"
65 #include "src/core/load_balancing/lb_policy_factory.h"
66 #include "src/core/load_balancing/lb_policy_registry.h"
67 #include "src/core/resolver/endpoint_addresses.h"
68
69 // IWYU pragma: no_include <type_traits>
70
71 namespace grpc_core {
72
73 TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
74
75 namespace {
76
77 using ::grpc_event_engine::experimental::EventEngine;
78
79 constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";
80
81 // How long we keep a child around for after it has been removed from
82 // the config.
83 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
84
85 // Config for weighted_target LB policy.
86 class WeightedTargetLbConfig final : public LoadBalancingPolicy::Config {
87 public:
88 struct ChildConfig {
89 uint32_t weight;
90 RefCountedPtr<LoadBalancingPolicy::Config> config;
91
92 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
93 void JsonPostLoad(const Json& json, const JsonArgs&,
94 ValidationErrors* errors);
95 };
96
97 using TargetMap = std::map<std::string, ChildConfig>;
98
99 WeightedTargetLbConfig() = default;
100
101 WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
102 WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
103
104 WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
105 WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
106
name() const107 absl::string_view name() const override { return kWeightedTarget; }
108
target_map() const109 const TargetMap& target_map() const { return target_map_; }
110
111 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
112
113 private:
114 TargetMap target_map_;
115 };
116
117 // weighted_target LB policy.
118 class WeightedTargetLb final : public LoadBalancingPolicy {
119 public:
120 explicit WeightedTargetLb(Args args);
121
name() const122 absl::string_view name() const override { return kWeightedTarget; }
123
124 absl::Status UpdateLocked(UpdateArgs args) override;
125 void ResetBackoffLocked() override;
126
127 private:
128 // Picks a child using stateless WRR and then delegates to that
129 // child's picker.
130 class WeightedPicker final : public SubchannelPicker {
131 public:
132 // Maintains a weighted list of pickers from each child that is in
133 // ready state. The first element in the pair represents the end of a
134 // range proportional to the child's weight. The start of the range
135 // is the previous value in the vector and is 0 for the first element.
136 using PickerList =
137 std::vector<std::pair<uint64_t, RefCountedPtr<SubchannelPicker>>>;
138
WeightedPicker(PickerList pickers)139 explicit WeightedPicker(PickerList pickers)
140 : pickers_(std::move(pickers)) {}
141
142 PickResult Pick(PickArgs args) override;
143
144 private:
145 PickerList pickers_;
146
147 // TODO(roth): Consider using a separate thread-local BitGen for each CPU
148 // to avoid the need for this mutex.
149 Mutex mu_;
150 absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
151 };
152
153 // Each WeightedChild holds a ref to its parent WeightedTargetLb.
154 class WeightedChild final : public InternallyRefCounted<WeightedChild> {
155 public:
156 WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
157 const std::string& name);
158 ~WeightedChild() override;
159
160 void Orphan() override;
161
162 absl::Status UpdateLocked(
163 const WeightedTargetLbConfig::ChildConfig& config,
164 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
165 const std::string& resolution_note, ChannelArgs args);
166 void ResetBackoffLocked();
167 void DeactivateLocked();
168
weight() const169 uint32_t weight() const { return weight_; }
connectivity_state() const170 grpc_connectivity_state connectivity_state() const {
171 return connectivity_state_;
172 }
picker() const173 RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
174
175 private:
176 class Helper final : public DelegatingChannelControlHelper {
177 public:
Helper(RefCountedPtr<WeightedChild> weighted_child)178 explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
179 : weighted_child_(std::move(weighted_child)) {}
180
~Helper()181 ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
182
183 void UpdateState(grpc_connectivity_state state,
184 const absl::Status& status,
185 RefCountedPtr<SubchannelPicker> picker) override;
186
187 private:
parent_helper() const188 ChannelControlHelper* parent_helper() const override {
189 return weighted_child_->weighted_target_policy_
190 ->channel_control_helper();
191 }
192
193 RefCountedPtr<WeightedChild> weighted_child_;
194 };
195
196 class DelayedRemovalTimer final
197 : public InternallyRefCounted<DelayedRemovalTimer> {
198 public:
199 explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
200
201 void Orphan() override;
202
203 private:
204 void OnTimerLocked();
205
206 RefCountedPtr<WeightedChild> weighted_child_;
207 absl::optional<EventEngine::TaskHandle> timer_handle_;
208 };
209
210 // Methods for dealing with the child policy.
211 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
212 const ChannelArgs& args);
213
214 void OnConnectivityStateUpdateLocked(
215 grpc_connectivity_state state, const absl::Status& status,
216 RefCountedPtr<SubchannelPicker> picker);
217
218 // The owning LB policy.
219 RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
220
221 const std::string name_;
222
223 uint32_t weight_ = 0;
224
225 OrphanablePtr<LoadBalancingPolicy> child_policy_;
226
227 RefCountedPtr<SubchannelPicker> picker_;
228 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
229
230 OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
231 };
232
233 ~WeightedTargetLb() override;
234
235 void ShutdownLocked() override;
236
237 void UpdateStateLocked();
238
239 // Current config from the resolver.
240 RefCountedPtr<WeightedTargetLbConfig> config_;
241
242 // Internal state.
243 bool shutting_down_ = false;
244 bool update_in_progress_ = false;
245
246 // Children.
247 std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
248 };
249
250 //
251 // WeightedTargetLb::WeightedPicker
252 //
253
Pick(PickArgs args)254 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
255 PickArgs args) {
256 // Generate a random number in [0, total weight).
257 const uint64_t key = [&]() {
258 MutexLock lock(&mu_);
259 return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
260 }();
261 // Find the index in pickers_ corresponding to key.
262 size_t mid = 0;
263 size_t start_index = 0;
264 size_t end_index = pickers_.size() - 1;
265 size_t index = 0;
266 while (end_index > start_index) {
267 mid = (start_index + end_index) / 2;
268 if (pickers_[mid].first > key) {
269 end_index = mid;
270 } else if (pickers_[mid].first < key) {
271 start_index = mid + 1;
272 } else {
273 index = mid + 1;
274 break;
275 }
276 }
277 if (index == 0) index = start_index;
278 GPR_ASSERT(pickers_[index].first > key);
279 // Delegate to the child picker.
280 return pickers_[index].second->Pick(args);
281 }
282
283 //
284 // WeightedTargetLb
285 //
286
WeightedTargetLb(Args args)287 WeightedTargetLb::WeightedTargetLb(Args args)
288 : LoadBalancingPolicy(std::move(args)) {
289 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
290 gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this);
291 }
292 }
293
~WeightedTargetLb()294 WeightedTargetLb::~WeightedTargetLb() {
295 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
296 gpr_log(GPR_INFO,
297 "[weighted_target_lb %p] destroying weighted_target LB policy",
298 this);
299 }
300 }
301
ShutdownLocked()302 void WeightedTargetLb::ShutdownLocked() {
303 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
304 gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this);
305 }
306 shutting_down_ = true;
307 targets_.clear();
308 }
309
ResetBackoffLocked()310 void WeightedTargetLb::ResetBackoffLocked() {
311 for (auto& p : targets_) p.second->ResetBackoffLocked();
312 }
313
UpdateLocked(UpdateArgs args)314 absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
315 if (shutting_down_) return absl::OkStatus();
316 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
317 gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
318 }
319 update_in_progress_ = true;
320 // Update config.
321 config_ = args.config.TakeAsSubclass<WeightedTargetLbConfig>();
322 // Deactivate the targets not in the new config.
323 for (const auto& p : targets_) {
324 const std::string& name = p.first;
325 WeightedChild* child = p.second.get();
326 if (config_->target_map().find(name) == config_->target_map().end()) {
327 child->DeactivateLocked();
328 }
329 }
330 // Update all children.
331 absl::StatusOr<HierarchicalAddressMap> address_map =
332 MakeHierarchicalAddressMap(args.addresses);
333 std::vector<std::string> errors;
334 for (const auto& p : config_->target_map()) {
335 const std::string& name = p.first;
336 const WeightedTargetLbConfig::ChildConfig& config = p.second;
337 auto& target = targets_[name];
338 // Create child if it does not already exist.
339 if (target == nullptr) {
340 target = MakeOrphanable<WeightedChild>(
341 RefAsSubclass<WeightedTargetLb>(DEBUG_LOCATION, "WeightedChild"),
342 name);
343 }
344 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
345 if (address_map.ok()) {
346 auto it = address_map->find(name);
347 if (it == address_map->end()) {
348 addresses = std::make_shared<EndpointAddressesListIterator>(
349 EndpointAddressesList());
350 } else {
351 addresses = std::move(it->second);
352 }
353 } else {
354 addresses = address_map.status();
355 }
356 absl::Status status = target->UpdateLocked(config, std::move(addresses),
357 args.resolution_note, args.args);
358 if (!status.ok()) {
359 errors.emplace_back(
360 absl::StrCat("child ", name, ": ", status.ToString()));
361 }
362 }
363 update_in_progress_ = false;
364 if (config_->target_map().empty()) {
365 absl::Status status = absl::UnavailableError(absl::StrCat(
366 "no children in weighted_target policy: ", args.resolution_note));
367 channel_control_helper()->UpdateState(
368 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
369 MakeRefCounted<TransientFailurePicker>(status));
370 return absl::OkStatus();
371 }
372 UpdateStateLocked();
373 // Return status.
374 if (!errors.empty()) {
375 return absl::UnavailableError(absl::StrCat(
376 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
377 }
378 return absl::OkStatus();
379 }
380
UpdateStateLocked()381 void WeightedTargetLb::UpdateStateLocked() {
382 // If we're in the process of propagating an update from our parent to
383 // our children, ignore any updates that come from the children. We
384 // will instead return a new picker once the update has been seen by
385 // all children. This avoids unnecessary picker churn while an update
386 // is being propagated to our children.
387 if (update_in_progress_) return;
388 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
389 gpr_log(GPR_INFO,
390 "[weighted_target_lb %p] scanning children to determine "
391 "connectivity state",
392 this);
393 }
394 // Construct lists of child pickers with associated weights, one for
395 // children that are in state READY and another for children that are
396 // in state TRANSIENT_FAILURE. Each child is represented by a portion of
397 // the range proportional to its weight, such that the total range is the
398 // sum of the weights of all children.
399 WeightedPicker::PickerList ready_picker_list;
400 uint64_t ready_end = 0;
401 WeightedPicker::PickerList tf_picker_list;
402 uint64_t tf_end = 0;
403 // Also count the number of children in CONNECTING and IDLE, to determine
404 // the aggregated state.
405 size_t num_connecting = 0;
406 size_t num_idle = 0;
407 for (const auto& p : targets_) {
408 const std::string& child_name = p.first;
409 const WeightedChild* child = p.second.get();
410 // Skip the targets that are not in the latest update.
411 if (config_->target_map().find(child_name) == config_->target_map().end()) {
412 continue;
413 }
414 auto child_picker = child->picker();
415 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
416 gpr_log(GPR_INFO,
417 "[weighted_target_lb %p] child=%s state=%s weight=%u picker=%p",
418 this, child_name.c_str(),
419 ConnectivityStateName(child->connectivity_state()),
420 child->weight(), child_picker.get());
421 }
422 switch (child->connectivity_state()) {
423 case GRPC_CHANNEL_READY: {
424 GPR_ASSERT(child->weight() > 0);
425 ready_end += child->weight();
426 ready_picker_list.emplace_back(ready_end, std::move(child_picker));
427 break;
428 }
429 case GRPC_CHANNEL_CONNECTING: {
430 ++num_connecting;
431 break;
432 }
433 case GRPC_CHANNEL_IDLE: {
434 ++num_idle;
435 break;
436 }
437 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
438 GPR_ASSERT(child->weight() > 0);
439 tf_end += child->weight();
440 tf_picker_list.emplace_back(tf_end, std::move(child_picker));
441 break;
442 }
443 default:
444 GPR_UNREACHABLE_CODE(return);
445 }
446 }
447 // Determine aggregated connectivity state.
448 grpc_connectivity_state connectivity_state;
449 if (!ready_picker_list.empty()) {
450 connectivity_state = GRPC_CHANNEL_READY;
451 } else if (num_connecting > 0) {
452 connectivity_state = GRPC_CHANNEL_CONNECTING;
453 } else if (num_idle > 0) {
454 connectivity_state = GRPC_CHANNEL_IDLE;
455 } else {
456 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
457 }
458 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
459 gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
460 this, ConnectivityStateName(connectivity_state));
461 }
462 RefCountedPtr<SubchannelPicker> picker;
463 absl::Status status;
464 switch (connectivity_state) {
465 case GRPC_CHANNEL_READY:
466 picker = MakeRefCounted<WeightedPicker>(std::move(ready_picker_list));
467 break;
468 case GRPC_CHANNEL_CONNECTING:
469 case GRPC_CHANNEL_IDLE:
470 picker = MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
471 break;
472 default:
473 picker = MakeRefCounted<WeightedPicker>(std::move(tf_picker_list));
474 }
475 channel_control_helper()->UpdateState(connectivity_state, status,
476 std::move(picker));
477 }
478
479 //
480 // WeightedTargetLb::WeightedChild::DelayedRemovalTimer
481 //
482
DelayedRemovalTimer(RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)483 WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
484 RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
485 : weighted_child_(std::move(weighted_child)) {
486 timer_handle_ =
487 weighted_child_->weighted_target_policy_->channel_control_helper()
488 ->GetEventEngine()
489 ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
490 ApplicationCallbackExecCtx app_exec_ctx;
491 ExecCtx exec_ctx;
492 auto* self_ptr = self.get(); // Avoid use-after-move problem.
493 self_ptr->weighted_child_->weighted_target_policy_
494 ->work_serializer()
495 ->Run([self = std::move(self)] { self->OnTimerLocked(); },
496 DEBUG_LOCATION);
497 });
498 }
499
Orphan()500 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
501 if (timer_handle_.has_value()) {
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
503 gpr_log(GPR_INFO,
504 "[weighted_target_lb %p] WeightedChild %p %s: cancelling "
505 "delayed removal timer",
506 weighted_child_->weighted_target_policy_.get(),
507 weighted_child_.get(), weighted_child_->name_.c_str());
508 }
509 weighted_child_->weighted_target_policy_->channel_control_helper()
510 ->GetEventEngine()
511 ->Cancel(*timer_handle_);
512 }
513 Unref();
514 }
515
OnTimerLocked()516 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
517 GPR_ASSERT(timer_handle_.has_value());
518 timer_handle_.reset();
519 weighted_child_->weighted_target_policy_->targets_.erase(
520 weighted_child_->name_);
521 }
522
523 //
524 // WeightedTargetLb::WeightedChild
525 //
526
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)527 WeightedTargetLb::WeightedChild::WeightedChild(
528 RefCountedPtr<WeightedTargetLb> weighted_target_policy,
529 const std::string& name)
530 : weighted_target_policy_(std::move(weighted_target_policy)),
531 name_(name),
532 picker_(MakeRefCounted<QueuePicker>(nullptr)) {
533 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
534 gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
535 weighted_target_policy_.get(), this, name_.c_str());
536 }
537 }
538
~WeightedChild()539 WeightedTargetLb::WeightedChild::~WeightedChild() {
540 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
541 gpr_log(GPR_INFO,
542 "[weighted_target_lb %p] WeightedChild %p %s: destroying child",
543 weighted_target_policy_.get(), this, name_.c_str());
544 }
545 weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
546 }
547
Orphan()548 void WeightedTargetLb::WeightedChild::Orphan() {
549 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
550 gpr_log(GPR_INFO,
551 "[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
552 weighted_target_policy_.get(), this, name_.c_str());
553 }
554 // Remove the child policy's interested_parties pollset_set from the
555 // xDS policy.
556 grpc_pollset_set_del_pollset_set(
557 child_policy_->interested_parties(),
558 weighted_target_policy_->interested_parties());
559 child_policy_.reset();
560 // Drop our ref to the child's picker, in case it's holding a ref to
561 // the child.
562 picker_.reset();
563 delayed_removal_timer_.reset();
564 Unref();
565 }
566
567 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)568 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
569 const ChannelArgs& args) {
570 LoadBalancingPolicy::Args lb_policy_args;
571 lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
572 lb_policy_args.args = args;
573 lb_policy_args.channel_control_helper =
574 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
575 OrphanablePtr<LoadBalancingPolicy> lb_policy =
576 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
577 &grpc_lb_weighted_target_trace);
578 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
579 gpr_log(GPR_INFO,
580 "[weighted_target_lb %p] WeightedChild %p %s: Created new child "
581 "policy handler %p",
582 weighted_target_policy_.get(), this, name_.c_str(),
583 lb_policy.get());
584 }
585 // Add the xDS's interested_parties pollset_set to that of the newly created
586 // child policy. This will make the child policy progress upon activity on
587 // xDS LB, which in turn is tied to the application's call.
588 grpc_pollset_set_add_pollset_set(
589 lb_policy->interested_parties(),
590 weighted_target_policy_->interested_parties());
591 return lb_policy;
592 }
593
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,const std::string & resolution_note,ChannelArgs args)594 absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
595 const WeightedTargetLbConfig::ChildConfig& config,
596 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
597 const std::string& resolution_note, ChannelArgs args) {
598 if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
599 // Update child weight.
600 if (weight_ != config.weight &&
601 GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
602 gpr_log(GPR_INFO, "[weighted_target_lb %p] WeightedChild %p %s: weight=%u",
603 weighted_target_policy_.get(), this, name_.c_str(), config.weight);
604 }
605 weight_ = config.weight;
606 // Reactivate if needed.
607 if (delayed_removal_timer_ != nullptr) {
608 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
609 gpr_log(GPR_INFO,
610 "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
611 weighted_target_policy_.get(), this, name_.c_str());
612 }
613 delayed_removal_timer_.reset();
614 }
615 // Create child policy if needed.
616 args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_);
617 if (child_policy_ == nullptr) {
618 child_policy_ = CreateChildPolicyLocked(args);
619 }
620 // Construct update args.
621 UpdateArgs update_args;
622 update_args.config = config.config;
623 update_args.addresses = std::move(addresses);
624 update_args.resolution_note = resolution_note;
625 update_args.args = std::move(args);
626 // Update the policy.
627 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
628 gpr_log(GPR_INFO,
629 "[weighted_target_lb %p] WeightedChild %p %s: Updating child "
630 "policy handler %p",
631 weighted_target_policy_.get(), this, name_.c_str(),
632 child_policy_.get());
633 }
634 return child_policy_->UpdateLocked(std::move(update_args));
635 }
636
ResetBackoffLocked()637 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
638 child_policy_->ResetBackoffLocked();
639 }
640
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)641 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
642 grpc_connectivity_state state, const absl::Status& status,
643 RefCountedPtr<SubchannelPicker> picker) {
644 // Cache the picker in the WeightedChild.
645 picker_ = std::move(picker);
646 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
647 gpr_log(GPR_INFO,
648 "[weighted_target_lb %p] WeightedChild %p %s: connectivity "
649 "state update: state=%s (%s) picker=%p",
650 weighted_target_policy_.get(), this, name_.c_str(),
651 ConnectivityStateName(state), status.ToString().c_str(),
652 picker_.get());
653 }
654 // If the child reports IDLE, immediately tell it to exit idle.
655 if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
656 // Decide what state to report for aggregation purposes.
657 // If the last recorded state was TRANSIENT_FAILURE and the new state
658 // is something other than READY, don't change the state.
659 if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
660 state == GRPC_CHANNEL_READY) {
661 connectivity_state_ = state;
662 }
663 // Update the LB policy's state if this child is not deactivated.
664 if (weight_ != 0) weighted_target_policy_->UpdateStateLocked();
665 }
666
DeactivateLocked()667 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
668 // If already deactivated, don't do that again.
669 if (weight_ == 0) return;
670 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
671 gpr_log(GPR_INFO,
672 "[weighted_target_lb %p] WeightedChild %p %s: deactivating",
673 weighted_target_policy_.get(), this, name_.c_str());
674 }
675 // Set the child weight to 0 so that future picker won't contain this child.
676 weight_ = 0;
677 // Start a timer to delete the child.
678 delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
679 Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
680 }
681
682 //
683 // WeightedTargetLb::WeightedChild::Helper
684 //
685
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)686 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
687 grpc_connectivity_state state, const absl::Status& status,
688 RefCountedPtr<SubchannelPicker> picker) {
689 if (weighted_child_->weighted_target_policy_->shutting_down_) return;
690 weighted_child_->OnConnectivityStateUpdateLocked(state, status,
691 std::move(picker));
692 }
693
694 //
695 // factory
696 //
697
JsonLoader(const JsonArgs &)698 const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
699 const JsonArgs&) {
700 static const auto* loader =
701 JsonObjectLoader<ChildConfig>()
702 // Note: The config field requires custom parsing, so it's
703 // handled in JsonPostLoad() instead.
704 .Field("weight", &ChildConfig::weight)
705 .Finish();
706 return loader;
707 }
708
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)709 void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(
710 const Json& json, const JsonArgs&, ValidationErrors* errors) {
711 ValidationErrors::ScopedField field(errors, ".childPolicy");
712 auto it = json.object().find("childPolicy");
713 if (it == json.object().end()) {
714 errors->AddError("field not present");
715 return;
716 }
717 auto lb_config =
718 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
719 it->second);
720 if (!lb_config.ok()) {
721 errors->AddError(lb_config.status().message());
722 return;
723 }
724 config = std::move(*lb_config);
725 }
726
JsonLoader(const JsonArgs &)727 const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
728 static const auto* loader =
729 JsonObjectLoader<WeightedTargetLbConfig>()
730 .Field("targets", &WeightedTargetLbConfig::target_map_)
731 .Finish();
732 return loader;
733 }
734
735 class WeightedTargetLbFactory final : public LoadBalancingPolicyFactory {
736 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const737 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
738 LoadBalancingPolicy::Args args) const override {
739 return MakeOrphanable<WeightedTargetLb>(std::move(args));
740 }
741
name() const742 absl::string_view name() const override { return kWeightedTarget; }
743
744 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const745 ParseLoadBalancingConfig(const Json& json) const override {
746 return LoadFromJson<RefCountedPtr<WeightedTargetLbConfig>>(
747 json, JsonArgs(), "errors validating weighted_target LB policy config");
748 }
749 };
750
751 } // namespace
752
RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder * builder)753 void RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder* builder) {
754 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
755 std::make_unique<WeightedTargetLbFactory>());
756 }
757
758 } // namespace grpc_core
759