1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20
21 #include <algorithm>
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/impl/channel_arg_names.h>
39 #include <grpc/impl/connectivity_state.h>
40 #include <grpc/support/log.h>
41
42 #include "src/core/load_balancing/address_filtering.h"
43 #include "src/core/load_balancing/child_policy_handler.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/config/core_configuration.h"
46 #include "src/core/lib/debug/trace.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/orphanable.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/ref_counted_string.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/gprpp/validation_errors.h"
53 #include "src/core/lib/gprpp/work_serializer.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/pollset_set.h"
56 #include "src/core/lib/json/json.h"
57 #include "src/core/lib/json/json_args.h"
58 #include "src/core/lib/json/json_object_loader.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/load_balancing/delegating_helper.h"
61 #include "src/core/load_balancing/lb_policy.h"
62 #include "src/core/load_balancing/lb_policy_factory.h"
63 #include "src/core/load_balancing/lb_policy_registry.h"
64 #include "src/core/resolver/endpoint_addresses.h"
65
66 namespace grpc_core {
67
68 TraceFlag grpc_lb_priority_trace(false, "priority_lb");
69
70 namespace {
71
72 using ::grpc_event_engine::experimental::EventEngine;
73
74 constexpr absl::string_view kPriority = "priority_experimental";
75
76 // How long we keep a child around for after it is no longer being used
77 // (either because it has been removed from the config or because we
78 // have switched to a higher-priority child).
79 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
80
81 // Default for how long we wait for a newly created child to get connected
82 // before starting to attempt the next priority. Overridable via channel arg.
83 constexpr Duration kDefaultChildFailoverTimeout = Duration::Seconds(10);
84
85 // Config for priority LB policy.
86 class PriorityLbConfig final : public LoadBalancingPolicy::Config {
87 public:
88 struct PriorityLbChild {
89 RefCountedPtr<LoadBalancingPolicy::Config> config;
90 bool ignore_reresolution_requests = false;
91
92 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
93 void JsonPostLoad(const Json& json, const JsonArgs&,
94 ValidationErrors* errors);
95 };
96
97 PriorityLbConfig() = default;
98
99 PriorityLbConfig(const PriorityLbConfig&) = delete;
100 PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;
101
102 PriorityLbConfig(PriorityLbConfig&& other) = delete;
103 PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;
104
name() const105 absl::string_view name() const override { return kPriority; }
106
children() const107 const std::map<std::string, PriorityLbChild>& children() const {
108 return children_;
109 }
priorities() const110 const std::vector<std::string>& priorities() const { return priorities_; }
111
112 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
113 void JsonPostLoad(const Json& json, const JsonArgs&,
114 ValidationErrors* errors);
115
116 private:
117 std::map<std::string, PriorityLbChild> children_;
118 std::vector<std::string> priorities_;
119 };
120
// priority LB policy.
//
// Delegates to one of several named child policies, chosen from an ordered
// priority list (see ChoosePriorityLocked()). Children that stop being used
// are kept around for kChildRetentionInterval before being deleted.
class PriorityLb final : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  absl::string_view name() const override { return kPriority; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // One entry in the priority list: wraps a child policy and tracks its
  // connectivity state, picker, and failover/deactivation timers.
  // Each ChildPriority holds a ref to the PriorityLb.
  class ChildPriority final : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    // Pushes a new config (plus the parent's current args, addresses, and
    // resolution note) to the child policy, creating the policy on first use.
    absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                              bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the deactivation timer if it is not already running.
    void MaybeDeactivateLocked();
    // Cancels the deactivation timer, if any.
    void MaybeReactivateLocked();

    void Orphan() override;

    // Returns the child's latest picker, or a QueuePicker if it has none yet.
    RefCountedPtr<SubchannelPicker> GetPicker();

    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    bool FailoverTimerPending() const { return failover_timer_ != nullptr; }

   private:
    // Channel control helper handed to the child policy; forwards state
    // updates to the owning ChildPriority and re-resolution requests to the
    // parent's helper.
    class Helper final : public DelegatingChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;
      void RequestReresolution() override;

     private:
      ChannelControlHelper* parent_helper() const override {
        return priority_->priority_policy_->channel_control_helper();
      }

      RefCountedPtr<ChildPriority> priority_;
    };

    // Deletes the child from the parent after kChildRetentionInterval unless
    // orphaned (cancelled) first.
    class DeactivationTimer final
        : public InternallyRefCounted<DeactivationTimer> {
     public:
      explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);

      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is scheduled; cleared on cancel or on firing.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Reports TRANSIENT_FAILURE for the child after child_failover_timeout_
    // unless orphaned (cancelled) first.
    class FailoverTimer final : public InternallyRefCounted<FailoverTimer> {
     public:
      explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);

      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is scheduled; cleared on cancel or on firing.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const ChannelArgs& args);

    // Records a new state/picker from the child (or from the failover
    // timer, with a null picker) and, unless the parent is mid-update,
    // re-runs priority selection.
    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        RefCountedPtr<SubchannelPicker> picker);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    // When true, Helper::RequestReresolution() drops the request.
    bool ignore_reresolution_requests_ = false;

    // Created lazily by UpdateLocked(); may be null if that bailed out
    // during shutdown.
    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    // Most recent non-null picker reported by the child.
    RefCountedPtr<SubchannelPicker> picker_;

    // Whether READY or IDLE has been reported more recently than
    // TRANSIENT_FAILURE; gates starting the failover timer on CONNECTING.
    bool seen_ready_or_idle_since_transient_failure_ = true;

    // Non-null while the child is deactivated.
    OrphanablePtr<DeactivationTimer> deactivation_timer_;
    // Non-null while the failover timer is pending.
    OrphanablePtr<FailoverTimer> failover_timer_;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns the priority of the specified child name, or UINT32_MAX if
  // the child is not in the current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  // Deletes a child. Called when the child's deactivation timer fires.
  void DeleteChild(ChildPriority* child);

  // Iterates through the list of priorities to choose one:
  // - If the child for a priority doesn't exist, creates it.
  // - If a child's failover timer is pending, selects that priority
  //   while we wait for the child to attempt to connect.
  // - If the child is READY or IDLE, selects that priority.
  // - Otherwise, continues on to the next child.
  // If no priority was selected above, delegates to the first child in
  // CONNECTING, or to the last child if none is CONNECTING.
  // Reports TRANSIENT_FAILURE if the priority list is empty.
  //
  // This method is idempotent; it should yield the same result every
  // time as a function of the state of the children.
  void ChoosePriorityLocked();

  // Sets the specified priority as the current priority.
  // Optionally deactivates any children at lower priorities.
  // Returns the child's picker to the channel.
  void SetCurrentPriorityLocked(int32_t priority,
                                bool deactivate_lower_priorities,
                                const char* reason);

  // From GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS (clamped to be non-negative),
  // defaulting to kDefaultChildFailoverTimeout.
  const Duration child_failover_timeout_;

  // Current channel args and config from the resolver.
  ChannelArgs args_;
  RefCountedPtr<PriorityLbConfig> config_;
  absl::StatusOr<HierarchicalAddressMap> addresses_;
  std::string resolution_note_;

  // Internal state.
  bool shutting_down_ = false;

  // True while UpdateLocked() is pushing a new config to existing children;
  // child state updates do not trigger ChoosePriorityLocked() meanwhile.
  bool update_in_progress_ = false;

  // All children that currently exist.
  // Some of these children may be in deactivated state.
  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used (UINT32_MAX if none is selected).
  uint32_t current_priority_ = UINT32_MAX;
};
287
288 //
289 // PriorityLb
290 //
291
PriorityLb(Args args)292 PriorityLb::PriorityLb(Args args)
293 : LoadBalancingPolicy(std::move(args)),
294 child_failover_timeout_(std::max(
295 Duration::Zero(),
296 channel_args()
297 .GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
298 .value_or(kDefaultChildFailoverTimeout))) {
299 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
300 gpr_log(GPR_INFO, "[priority_lb %p] created", this);
301 }
302 }
303
~PriorityLb()304 PriorityLb::~PriorityLb() {
305 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
306 gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
307 }
308 }
309
ShutdownLocked()310 void PriorityLb::ShutdownLocked() {
311 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
312 gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
313 }
314 shutting_down_ = true;
315 children_.clear();
316 }
317
ExitIdleLocked()318 void PriorityLb::ExitIdleLocked() {
319 if (current_priority_ != UINT32_MAX) {
320 const std::string& child_name = config_->priorities()[current_priority_];
321 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
322 gpr_log(GPR_INFO,
323 "[priority_lb %p] exiting IDLE for current priority %d child %s",
324 this, current_priority_, child_name.c_str());
325 }
326 children_[child_name]->ExitIdleLocked();
327 }
328 }
329
ResetBackoffLocked()330 void PriorityLb::ResetBackoffLocked() {
331 for (const auto& p : children_) p.second->ResetBackoffLocked();
332 }
333
UpdateLocked(UpdateArgs args)334 absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
335 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
336 gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
337 }
338 // Update config.
339 config_ = args.config.TakeAsSubclass<PriorityLbConfig>();
340 // Update args.
341 args_ = std::move(args.args);
342 // Update addresses.
343 addresses_ = MakeHierarchicalAddressMap(args.addresses);
344 resolution_note_ = std::move(args.resolution_note);
345 // Check all existing children against the new config.
346 update_in_progress_ = true;
347 std::vector<std::string> errors;
348 for (const auto& p : children_) {
349 const std::string& child_name = p.first;
350 auto& child = p.second;
351 auto config_it = config_->children().find(child_name);
352 if (config_it == config_->children().end()) {
353 // Existing child not found in new config. Deactivate it.
354 child->MaybeDeactivateLocked();
355 } else {
356 // Existing child found in new config. Update it.
357 absl::Status status =
358 child->UpdateLocked(config_it->second.config,
359 config_it->second.ignore_reresolution_requests);
360 if (!status.ok()) {
361 errors.emplace_back(
362 absl::StrCat("child ", child_name, ": ", status.ToString()));
363 }
364 }
365 }
366 update_in_progress_ = false;
367 // Try to get connected.
368 ChoosePriorityLocked();
369 // Return status.
370 if (!errors.empty()) {
371 return absl::UnavailableError(absl::StrCat(
372 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
373 }
374 return absl::OkStatus();
375 }
376
GetChildPriorityLocked(const std::string & child_name) const377 uint32_t PriorityLb::GetChildPriorityLocked(
378 const std::string& child_name) const {
379 for (uint32_t priority = 0; priority < config_->priorities().size();
380 ++priority) {
381 if (config_->priorities()[priority] == child_name) return priority;
382 }
383 return UINT32_MAX;
384 }
385
DeleteChild(ChildPriority * child)386 void PriorityLb::DeleteChild(ChildPriority* child) {
387 children_.erase(child->name());
388 }
389
// Selects the priority to use and reports it via SetCurrentPriorityLocked().
// Pass 1 looks for a READY/IDLE child or one whose failover timer is still
// pending (creating or reactivating children along the way). Pass 2 falls
// back to the first CONNECTING child. Failing both, it uses the last child.
void PriorityLb::ChoosePriorityLocked() {
  // If priority list is empty, report TF.
  if (config_->priorities().empty()) {
    absl::Status status =
        absl::UnavailableError("priority policy has empty priority list");
    channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE, status,
        MakeRefCounted<TransientFailurePicker>(status));
    return;
  }
  // Pass 1: iterate through priorities in order, searching for one in READY
  // or IDLE (or with a pending failover timer), creating new children as
  // needed.
  current_priority_ = UINT32_MAX;
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // If the child for the priority does not exist yet, create it.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    // operator[] inserts a null entry when no child exists for this name;
    // it is populated immediately below.
    auto& child = children_[child_name];
    // Create child if needed.
    if (child == nullptr) {
      child = MakeOrphanable<ChildPriority>(
          RefAsSubclass<PriorityLb>(DEBUG_LOCATION, "ChildPriority"),
          child_name);
      auto child_config = config_->children().find(child_name);
      GPR_DEBUG_ASSERT(child_config != config_->children().end());
      // NOTE(review): this UpdateLocked() may synchronously report state via
      // Helper::UpdateState(), which can re-enter ChoosePriorityLocked()
      // because update_in_progress_ is false here.
      // TODO(roth): If the child reports a non-OK status with the
      // update, we need to propagate that back to the resolver somehow.
      (void)child->UpdateLocked(
          child_config->second.config,
          child_config->second.ignore_reresolution_requests);
    } else {
      // The child already exists. Reactivate if needed.
      child->MaybeReactivateLocked();
    }
    // Select this child if it is in states READY or IDLE.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SetCurrentPriorityLocked(
          priority, /*deactivate_lower_priorities=*/true,
          ConnectivityStateName(child->connectivity_state()));
      return;
    }
    // Select this child if its failover timer is pending.
    if (child->FailoverTimerPending()) {
      SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
                               "failover timer pending");
      return;
    }
    // Child has been failing for a while. Move on to the next priority.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] skipping priority %u, child %s: state=%s, "
              "failover timer not pending",
              this, priority, child_name.c_str(),
              ConnectivityStateName(child->connectivity_state()));
    }
  }
  // Pass 2: if we didn't find any priority to try, pick the first one in
  // state CONNECTING.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority reachable, checking for CONNECTING "
            "priority to delegate to",
            this);
  }
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // Pass 1 visited every priority without returning, so a child exists
    // for each of them by now.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    GPR_ASSERT(child != nullptr);
    if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
      SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
                               "CONNECTING (pass 2)");
      return;
    }
  }
  // Did not find any child in CONNECTING, delegate to last child.
  SetCurrentPriorityLocked(config_->priorities().size() - 1,
                           /*deactivate_lower_priorities=*/false,
                           "no usable children");
}
480
SetCurrentPriorityLocked(int32_t priority,bool deactivate_lower_priorities,const char * reason)481 void PriorityLb::SetCurrentPriorityLocked(int32_t priority,
482 bool deactivate_lower_priorities,
483 const char* reason) {
484 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
485 gpr_log(GPR_INFO,
486 "[priority_lb %p] selecting priority %u, child %s (%s, "
487 "deactivate_lower_priorities=%d)",
488 this, priority, config_->priorities()[priority].c_str(), reason,
489 deactivate_lower_priorities);
490 }
491 current_priority_ = priority;
492 if (deactivate_lower_priorities) {
493 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
494 const std::string& child_name = config_->priorities()[p];
495 auto it = children_.find(child_name);
496 if (it != children_.end()) it->second->MaybeDeactivateLocked();
497 }
498 }
499 auto& child = children_[config_->priorities()[priority]];
500 GPR_ASSERT(child != nullptr);
501 channel_control_helper()->UpdateState(child->connectivity_state(),
502 child->connectivity_status(),
503 child->GetPicker());
504 }
505
506 //
507 // PriorityLb::ChildPriority::DeactivationTimer
508 //
509
DeactivationTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)510 PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
511 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
512 : child_priority_(std::move(child_priority)) {
513 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
514 gpr_log(GPR_INFO,
515 "[priority_lb %p] child %s (%p): deactivating -- will remove in "
516 "%" PRId64 "ms",
517 child_priority_->priority_policy_.get(),
518 child_priority_->name_.c_str(), child_priority_.get(),
519 kChildRetentionInterval.millis());
520 }
521 timer_handle_ =
522 child_priority_->priority_policy_->channel_control_helper()
523 ->GetEventEngine()
524 ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
525 "Timer")]() mutable {
526 ApplicationCallbackExecCtx callback_exec_ctx;
527 ExecCtx exec_ctx;
528 auto self_ptr = self.get();
529 self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
530 [self = std::move(self)]() { self->OnTimerLocked(); },
531 DEBUG_LOCATION);
532 });
533 }
534
Orphan()535 void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
536 if (timer_handle_.has_value()) {
537 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
538 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
539 child_priority_->priority_policy_.get(),
540 child_priority_->name_.c_str(), child_priority_.get());
541 }
542 child_priority_->priority_policy_->channel_control_helper()
543 ->GetEventEngine()
544 ->Cancel(*timer_handle_);
545 timer_handle_.reset();
546 }
547 Unref();
548 }
549
OnTimerLocked()550 void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
551 if (timer_handle_.has_value()) {
552 timer_handle_.reset();
553 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
554 gpr_log(GPR_INFO,
555 "[priority_lb %p] child %s (%p): deactivation timer fired, "
556 "deleting child",
557 child_priority_->priority_policy_.get(),
558 child_priority_->name_.c_str(), child_priority_.get());
559 }
560 child_priority_->priority_policy_->DeleteChild(child_priority_.get());
561 }
562 }
563
564 //
565 // PriorityLb::ChildPriority::FailoverTimer
566 //
567
FailoverTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)568 PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
569 RefCountedPtr<PriorityLb::ChildPriority> child_priority)
570 : child_priority_(std::move(child_priority)) {
571 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
572 gpr_log(
573 GPR_INFO,
574 "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
575 "ms",
576 child_priority_->priority_policy_.get(), child_priority_->name_.c_str(),
577 child_priority_.get(),
578 child_priority_->priority_policy_->child_failover_timeout_.millis());
579 }
580 timer_handle_ =
581 child_priority_->priority_policy_->channel_control_helper()
582 ->GetEventEngine()
583 ->RunAfter(
584 child_priority_->priority_policy_->child_failover_timeout_,
585 [self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
586 ApplicationCallbackExecCtx callback_exec_ctx;
587 ExecCtx exec_ctx;
588 auto self_ptr = self.get();
589 self_ptr->child_priority_->priority_policy_->work_serializer()
590 ->Run([self = std::move(self)]() { self->OnTimerLocked(); },
591 DEBUG_LOCATION);
592 });
593 }
594
Orphan()595 void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
596 if (timer_handle_.has_value()) {
597 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
598 gpr_log(GPR_INFO,
599 "[priority_lb %p] child %s (%p): cancelling failover timer",
600 child_priority_->priority_policy_.get(),
601 child_priority_->name_.c_str(), child_priority_.get());
602 }
603 child_priority_->priority_policy_->channel_control_helper()
604 ->GetEventEngine()
605 ->Cancel(*timer_handle_);
606 timer_handle_.reset();
607 }
608 Unref();
609 }
610
OnTimerLocked()611 void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
612 if (timer_handle_.has_value()) {
613 timer_handle_.reset();
614 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
615 gpr_log(GPR_INFO,
616 "[priority_lb %p] child %s (%p): failover timer fired, "
617 "reporting TRANSIENT_FAILURE",
618 child_priority_->priority_policy_.get(),
619 child_priority_->name_.c_str(), child_priority_.get());
620 }
621 child_priority_->OnConnectivityStateUpdateLocked(
622 GRPC_CHANNEL_TRANSIENT_FAILURE,
623 absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
624 nullptr);
625 }
626 }
627
628 //
629 // PriorityLb::ChildPriority
630 //
631
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)632 PriorityLb::ChildPriority::ChildPriority(
633 RefCountedPtr<PriorityLb> priority_policy, std::string name)
634 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
635 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
636 gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
637 priority_policy_.get(), name_.c_str(), this);
638 }
639 // Start the failover timer.
640 failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
641 }
642
Orphan()643 void PriorityLb::ChildPriority::Orphan() {
644 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
645 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
646 priority_policy_.get(), name_.c_str(), this);
647 }
648 failover_timer_.reset();
649 deactivation_timer_.reset();
650 // Remove the child policy's interested_parties pollset_set from the
651 // xDS policy.
652 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
653 priority_policy_->interested_parties());
654 child_policy_.reset();
655 // Drop our ref to the child's picker, in case it's holding a ref to
656 // the child.
657 picker_.reset();
658 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
659 }
660
661 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
GetPicker()662 PriorityLb::ChildPriority::GetPicker() {
663 if (picker_ == nullptr) {
664 return MakeRefCounted<QueuePicker>(
665 priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
666 }
667 return picker_;
668 }
669
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)670 absl::Status PriorityLb::ChildPriority::UpdateLocked(
671 RefCountedPtr<LoadBalancingPolicy::Config> config,
672 bool ignore_reresolution_requests) {
673 if (priority_policy_->shutting_down_) return absl::OkStatus();
674 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
675 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
676 priority_policy_.get(), name_.c_str(), this);
677 }
678 ignore_reresolution_requests_ = ignore_reresolution_requests;
679 // Create policy if needed.
680 if (child_policy_ == nullptr) {
681 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
682 }
683 // Construct update args.
684 UpdateArgs update_args;
685 update_args.config = std::move(config);
686 if (priority_policy_->addresses_.ok()) {
687 auto it = priority_policy_->addresses_->find(name_);
688 if (it == priority_policy_->addresses_->end()) {
689 update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
690 EndpointAddressesList());
691 } else {
692 update_args.addresses = it->second;
693 }
694 } else {
695 update_args.addresses = priority_policy_->addresses_.status();
696 }
697 update_args.resolution_note = priority_policy_->resolution_note_;
698 update_args.args = priority_policy_->args_;
699 // Update the policy.
700 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
701 gpr_log(GPR_INFO,
702 "[priority_lb %p] child %s (%p): updating child policy handler %p",
703 priority_policy_.get(), name_.c_str(), this, child_policy_.get());
704 }
705 return child_policy_->UpdateLocked(std::move(update_args));
706 }
707
708 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)709 PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
710 LoadBalancingPolicy::Args lb_policy_args;
711 lb_policy_args.work_serializer = priority_policy_->work_serializer();
712 lb_policy_args.args = args;
713 lb_policy_args.channel_control_helper =
714 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
715 OrphanablePtr<LoadBalancingPolicy> lb_policy =
716 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
717 &grpc_lb_priority_trace);
718 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
719 gpr_log(GPR_INFO,
720 "[priority_lb %p] child %s (%p): created new child policy "
721 "handler %p",
722 priority_policy_.get(), name_.c_str(), this, lb_policy.get());
723 }
724 // Add the parent's interested_parties pollset_set to that of the newly
725 // created child policy. This will make the child policy progress upon
726 // activity on the parent LB, which in turn is tied to the application's call.
727 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
728 priority_policy_->interested_parties());
729 return lb_policy;
730 }
731
ExitIdleLocked()732 void PriorityLb::ChildPriority::ExitIdleLocked() {
733 child_policy_->ExitIdleLocked();
734 }
735
ResetBackoffLocked()736 void PriorityLb::ChildPriority::ResetBackoffLocked() {
737 child_policy_->ResetBackoffLocked();
738 }
739
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)740 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
741 grpc_connectivity_state state, const absl::Status& status,
742 RefCountedPtr<SubchannelPicker> picker) {
743 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
744 gpr_log(GPR_INFO,
745 "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
746 priority_policy_.get(), name_.c_str(), this,
747 ConnectivityStateName(state), status.ToString().c_str(),
748 picker.get());
749 }
750 // Store the state and picker.
751 connectivity_state_ = state;
752 connectivity_status_ = status;
753 // When the failover timer fires, this method will be called with picker
754 // set to null, because we want to consider the child to be in
755 // TRANSIENT_FAILURE, but we have no new picker to report. In that case,
756 // just keep using the old picker, in case we wind up delegating to this
757 // child when all priorities are failing.
758 if (picker != nullptr) picker_ = std::move(picker);
759 // If we transition to state CONNECTING and we've not seen
760 // TRANSIENT_FAILURE more recently than READY or IDLE, start failover
761 // timer if not already pending.
762 // In any other state, update seen_ready_or_idle_since_transient_failure_
763 // and cancel failover timer.
764 if (state == GRPC_CHANNEL_CONNECTING) {
765 if (seen_ready_or_idle_since_transient_failure_ &&
766 failover_timer_ == nullptr) {
767 failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
768 }
769 } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
770 seen_ready_or_idle_since_transient_failure_ = true;
771 failover_timer_.reset();
772 } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
773 seen_ready_or_idle_since_transient_failure_ = false;
774 failover_timer_.reset();
775 }
776 // Call the LB policy's ChoosePriorityLocked() to choose a priority to
777 // use based on the updated state of this child.
778 //
779 // Note that if we're in the process of propagating an update from our
780 // parent to our children, we skip this, because we don't want to
781 // choose a new priority based on inconsistent state. Instead, the
782 // policy will choose a new priority once the update has been seen by
783 // all children.
784 if (!priority_policy_->update_in_progress_) {
785 priority_policy_->ChoosePriorityLocked();
786 }
787 }
788
MaybeDeactivateLocked()789 void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
790 if (deactivation_timer_ == nullptr) {
791 deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
792 }
793 }
794
MaybeReactivateLocked()795 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
796 deactivation_timer_.reset();
797 }
798
799 //
800 // PriorityLb::ChildPriority::Helper
801 //
802
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)803 void PriorityLb::ChildPriority::Helper::UpdateState(
804 grpc_connectivity_state state, const absl::Status& status,
805 RefCountedPtr<SubchannelPicker> picker) {
806 if (priority_->priority_policy_->shutting_down_) return;
807 // Notify the priority.
808 priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
809 }
810
RequestReresolution()811 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
812 if (priority_->priority_policy_->shutting_down_) return;
813 if (priority_->ignore_reresolution_requests_) {
814 return;
815 }
816 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
817 }
818
819 //
820 // factory
821 //
822
JsonLoader(const JsonArgs &)823 const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
824 const JsonArgs&) {
825 static const auto* loader =
826 JsonObjectLoader<PriorityLbChild>()
827 // Note: The "config" field requires custom parsing, so it's
828 // handled in JsonPostLoad() instead of here.
829 .OptionalField("ignore_reresolution_requests",
830 &PriorityLbChild::ignore_reresolution_requests)
831 .Finish();
832 return loader;
833 }
834
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)835 void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
836 const JsonArgs&,
837 ValidationErrors* errors) {
838 ValidationErrors::ScopedField field(errors, ".config");
839 auto it = json.object().find("config");
840 if (it == json.object().end()) {
841 errors->AddError("field not present");
842 return;
843 }
844 auto lb_config =
845 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
846 it->second);
847 if (!lb_config.ok()) {
848 errors->AddError(lb_config.status().message());
849 return;
850 }
851 config = std::move(*lb_config);
852 }
853
JsonLoader(const JsonArgs &)854 const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
855 static const auto* loader =
856 JsonObjectLoader<PriorityLbConfig>()
857 .Field("children", &PriorityLbConfig::children_)
858 .Field("priorities", &PriorityLbConfig::priorities_)
859 .Finish();
860 return loader;
861 }
862
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)863 void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
864 ValidationErrors* errors) {
865 std::set<std::string> unknown_priorities;
866 for (const std::string& priority : priorities_) {
867 if (children_.find(priority) == children_.end()) {
868 unknown_priorities.insert(priority);
869 }
870 }
871 if (!unknown_priorities.empty()) {
872 errors->AddError(absl::StrCat("unknown priorit(ies): [",
873 absl::StrJoin(unknown_priorities, ", "),
874 "]"));
875 }
876 }
877
878 class PriorityLbFactory final : public LoadBalancingPolicyFactory {
879 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const880 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
881 LoadBalancingPolicy::Args args) const override {
882 return MakeOrphanable<PriorityLb>(std::move(args));
883 }
884
name() const885 absl::string_view name() const override { return kPriority; }
886
887 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const888 ParseLoadBalancingConfig(const Json& json) const override {
889 return LoadFromJson<RefCountedPtr<PriorityLbConfig>>(
890 json, JsonArgs(), "errors validating priority LB policy config");
891 }
892 };
893
894 } // namespace
895
RegisterPriorityLbPolicy(CoreConfiguration::Builder * builder)896 void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
897 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
898 std::make_unique<PriorityLbFactory>());
899 }
900
901 } // namespace grpc_core
902