xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/priority/priority.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 
21 #include <algorithm>
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 
37 #include <grpc/event_engine/event_engine.h>
38 #include <grpc/impl/channel_arg_names.h>
39 #include <grpc/impl/connectivity_state.h>
40 #include <grpc/support/log.h>
41 
42 #include "src/core/load_balancing/address_filtering.h"
43 #include "src/core/load_balancing/child_policy_handler.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/config/core_configuration.h"
46 #include "src/core/lib/debug/trace.h"
47 #include "src/core/lib/gprpp/debug_location.h"
48 #include "src/core/lib/gprpp/orphanable.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/ref_counted_string.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/gprpp/validation_errors.h"
53 #include "src/core/lib/gprpp/work_serializer.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/pollset_set.h"
56 #include "src/core/lib/json/json.h"
57 #include "src/core/lib/json/json_args.h"
58 #include "src/core/lib/json/json_object_loader.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/load_balancing/delegating_helper.h"
61 #include "src/core/load_balancing/lb_policy.h"
62 #include "src/core/load_balancing/lb_policy_factory.h"
63 #include "src/core/load_balancing/lb_policy_registry.h"
64 #include "src/core/resolver/endpoint_addresses.h"
65 
66 namespace grpc_core {
67 
68 TraceFlag grpc_lb_priority_trace(false, "priority_lb");
69 
70 namespace {
71 
72 using ::grpc_event_engine::experimental::EventEngine;
73 
74 constexpr absl::string_view kPriority = "priority_experimental";
75 
76 // How long we keep a child around for after it is no longer being used
77 // (either because it has been removed from the config or because we
78 // have switched to a higher-priority child).
79 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
80 
81 // Default for how long we wait for a newly created child to get connected
82 // before starting to attempt the next priority.  Overridable via channel arg.
83 constexpr Duration kDefaultChildFailoverTimeout = Duration::Seconds(10);
84 
85 // Config for priority LB policy.
86 class PriorityLbConfig final : public LoadBalancingPolicy::Config {
87  public:
88   struct PriorityLbChild {
89     RefCountedPtr<LoadBalancingPolicy::Config> config;
90     bool ignore_reresolution_requests = false;
91 
92     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
93     void JsonPostLoad(const Json& json, const JsonArgs&,
94                       ValidationErrors* errors);
95   };
96 
97   PriorityLbConfig() = default;
98 
99   PriorityLbConfig(const PriorityLbConfig&) = delete;
100   PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;
101 
102   PriorityLbConfig(PriorityLbConfig&& other) = delete;
103   PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;
104 
name() const105   absl::string_view name() const override { return kPriority; }
106 
children() const107   const std::map<std::string, PriorityLbChild>& children() const {
108     return children_;
109   }
priorities() const110   const std::vector<std::string>& priorities() const { return priorities_; }
111 
112   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
113   void JsonPostLoad(const Json& json, const JsonArgs&,
114                     ValidationErrors* errors);
115 
116  private:
117   std::map<std::string, PriorityLbChild> children_;
118   std::vector<std::string> priorities_;
119 };
120 
121 // priority LB policy.
122 class PriorityLb final : public LoadBalancingPolicy {
123  public:
124   explicit PriorityLb(Args args);
125 
name() const126   absl::string_view name() const override { return kPriority; }
127 
128   absl::Status UpdateLocked(UpdateArgs args) override;
129   void ExitIdleLocked() override;
130   void ResetBackoffLocked() override;
131 
132  private:
133   // Each ChildPriority holds a ref to the PriorityLb.
134   class ChildPriority final : public InternallyRefCounted<ChildPriority> {
135    public:
136     ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
137 
~ChildPriority()138     ~ChildPriority() override {
139       priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
140     }
141 
name() const142     const std::string& name() const { return name_; }
143 
144     absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
145                               bool ignore_reresolution_requests);
146     void ExitIdleLocked();
147     void ResetBackoffLocked();
148     void MaybeDeactivateLocked();
149     void MaybeReactivateLocked();
150 
151     void Orphan() override;
152 
153     RefCountedPtr<SubchannelPicker> GetPicker();
154 
connectivity_state() const155     grpc_connectivity_state connectivity_state() const {
156       return connectivity_state_;
157     }
158 
connectivity_status() const159     const absl::Status& connectivity_status() const {
160       return connectivity_status_;
161     }
162 
FailoverTimerPending() const163     bool FailoverTimerPending() const { return failover_timer_ != nullptr; }
164 
165    private:
166     class Helper final : public DelegatingChannelControlHelper {
167      public:
Helper(RefCountedPtr<ChildPriority> priority)168       explicit Helper(RefCountedPtr<ChildPriority> priority)
169           : priority_(std::move(priority)) {}
170 
~Helper()171       ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
172 
173       void UpdateState(grpc_connectivity_state state,
174                        const absl::Status& status,
175                        RefCountedPtr<SubchannelPicker> picker) override;
176       void RequestReresolution() override;
177 
178      private:
parent_helper() const179       ChannelControlHelper* parent_helper() const override {
180         return priority_->priority_policy_->channel_control_helper();
181       }
182 
183       RefCountedPtr<ChildPriority> priority_;
184     };
185 
186     class DeactivationTimer final
187         : public InternallyRefCounted<DeactivationTimer> {
188      public:
189       explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);
190 
191       void Orphan() override;
192 
193      private:
194       void OnTimerLocked();
195 
196       RefCountedPtr<ChildPriority> child_priority_;
197       absl::optional<EventEngine::TaskHandle> timer_handle_;
198     };
199 
200     class FailoverTimer final : public InternallyRefCounted<FailoverTimer> {
201      public:
202       explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);
203 
204       void Orphan() override;
205 
206      private:
207       void OnTimerLocked();
208 
209       RefCountedPtr<ChildPriority> child_priority_;
210       absl::optional<EventEngine::TaskHandle> timer_handle_;
211     };
212 
213     // Methods for dealing with the child policy.
214     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
215         const ChannelArgs& args);
216 
217     void OnConnectivityStateUpdateLocked(
218         grpc_connectivity_state state, const absl::Status& status,
219         RefCountedPtr<SubchannelPicker> picker);
220 
221     RefCountedPtr<PriorityLb> priority_policy_;
222     const std::string name_;
223     bool ignore_reresolution_requests_ = false;
224 
225     OrphanablePtr<LoadBalancingPolicy> child_policy_;
226 
227     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
228     absl::Status connectivity_status_;
229     RefCountedPtr<SubchannelPicker> picker_;
230 
231     bool seen_ready_or_idle_since_transient_failure_ = true;
232 
233     OrphanablePtr<DeactivationTimer> deactivation_timer_;
234     OrphanablePtr<FailoverTimer> failover_timer_;
235   };
236 
237   ~PriorityLb() override;
238 
239   void ShutdownLocked() override;
240 
241   // Returns the priority of the specified child name, or UINT32_MAX if
242   // the child is not in the current priority list.
243   uint32_t GetChildPriorityLocked(const std::string& child_name) const;
244 
245   // Deletes a child.  Called when the child's deactivation timer fires.
246   void DeleteChild(ChildPriority* child);
247 
248   // Iterates through the list of priorities to choose one:
249   // - If the child for a priority doesn't exist, creates it.
250   // - If a child's failover timer is pending, selects that priority
251   //   while we wait for the child to attempt to connect.
252   // - If the child is connected, selects that priority.
253   // - Otherwise, continues on to the next child.
254   // Delegates to the last child if none of the children are connecting.
255   // Reports TRANSIENT_FAILURE if the priority list is empty.
256   //
257   // This method is idempotent; it should yield the same result every
258   // time as a function of the state of the children.
259   void ChoosePriorityLocked();
260 
261   // Sets the specified priority as the current priority.
262   // Optionally deactivates any children at lower priorities.
263   // Returns the child's picker to the channel.
264   void SetCurrentPriorityLocked(int32_t priority,
265                                 bool deactivate_lower_priorities,
266                                 const char* reason);
267 
268   const Duration child_failover_timeout_;
269 
270   // Current channel args and config from the resolver.
271   ChannelArgs args_;
272   RefCountedPtr<PriorityLbConfig> config_;
273   absl::StatusOr<HierarchicalAddressMap> addresses_;
274   std::string resolution_note_;
275 
276   // Internal state.
277   bool shutting_down_ = false;
278 
279   bool update_in_progress_ = false;
280 
281   // All children that currently exist.
282   // Some of these children may be in deactivated state.
283   std::map<std::string, OrphanablePtr<ChildPriority>> children_;
284   // The priority that is being used.
285   uint32_t current_priority_ = UINT32_MAX;
286 };
287 
288 //
289 // PriorityLb
290 //
291 
PriorityLb(Args args)292 PriorityLb::PriorityLb(Args args)
293     : LoadBalancingPolicy(std::move(args)),
294       child_failover_timeout_(std::max(
295           Duration::Zero(),
296           channel_args()
297               .GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
298               .value_or(kDefaultChildFailoverTimeout))) {
299   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
300     gpr_log(GPR_INFO, "[priority_lb %p] created", this);
301   }
302 }
303 
~PriorityLb()304 PriorityLb::~PriorityLb() {
305   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
306     gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
307   }
308 }
309 
ShutdownLocked()310 void PriorityLb::ShutdownLocked() {
311   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
312     gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
313   }
314   shutting_down_ = true;
315   children_.clear();
316 }
317 
ExitIdleLocked()318 void PriorityLb::ExitIdleLocked() {
319   if (current_priority_ != UINT32_MAX) {
320     const std::string& child_name = config_->priorities()[current_priority_];
321     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
322       gpr_log(GPR_INFO,
323               "[priority_lb %p] exiting IDLE for current priority %d child %s",
324               this, current_priority_, child_name.c_str());
325     }
326     children_[child_name]->ExitIdleLocked();
327   }
328 }
329 
ResetBackoffLocked()330 void PriorityLb::ResetBackoffLocked() {
331   for (const auto& p : children_) p.second->ResetBackoffLocked();
332 }
333 
UpdateLocked(UpdateArgs args)334 absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
335   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
336     gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
337   }
338   // Update config.
339   config_ = args.config.TakeAsSubclass<PriorityLbConfig>();
340   // Update args.
341   args_ = std::move(args.args);
342   // Update addresses.
343   addresses_ = MakeHierarchicalAddressMap(args.addresses);
344   resolution_note_ = std::move(args.resolution_note);
345   // Check all existing children against the new config.
346   update_in_progress_ = true;
347   std::vector<std::string> errors;
348   for (const auto& p : children_) {
349     const std::string& child_name = p.first;
350     auto& child = p.second;
351     auto config_it = config_->children().find(child_name);
352     if (config_it == config_->children().end()) {
353       // Existing child not found in new config.  Deactivate it.
354       child->MaybeDeactivateLocked();
355     } else {
356       // Existing child found in new config.  Update it.
357       absl::Status status =
358           child->UpdateLocked(config_it->second.config,
359                               config_it->second.ignore_reresolution_requests);
360       if (!status.ok()) {
361         errors.emplace_back(
362             absl::StrCat("child ", child_name, ": ", status.ToString()));
363       }
364     }
365   }
366   update_in_progress_ = false;
367   // Try to get connected.
368   ChoosePriorityLocked();
369   // Return status.
370   if (!errors.empty()) {
371     return absl::UnavailableError(absl::StrCat(
372         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
373   }
374   return absl::OkStatus();
375 }
376 
GetChildPriorityLocked(const std::string & child_name) const377 uint32_t PriorityLb::GetChildPriorityLocked(
378     const std::string& child_name) const {
379   for (uint32_t priority = 0; priority < config_->priorities().size();
380        ++priority) {
381     if (config_->priorities()[priority] == child_name) return priority;
382   }
383   return UINT32_MAX;
384 }
385 
DeleteChild(ChildPriority * child)386 void PriorityLb::DeleteChild(ChildPriority* child) {
387   children_.erase(child->name());
388 }
389 
ChoosePriorityLocked()390 void PriorityLb::ChoosePriorityLocked() {
391   // If priority list is empty, report TF.
392   if (config_->priorities().empty()) {
393     absl::Status status =
394         absl::UnavailableError("priority policy has empty priority list");
395     channel_control_helper()->UpdateState(
396         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
397         MakeRefCounted<TransientFailurePicker>(status));
398     return;
399   }
400   // Iterate through priorities, searching for one in READY or IDLE,
401   // creating new children as needed.
402   current_priority_ = UINT32_MAX;
403   for (uint32_t priority = 0; priority < config_->priorities().size();
404        ++priority) {
405     // If the child for the priority does not exist yet, create it.
406     const std::string& child_name = config_->priorities()[priority];
407     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
408       gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
409               priority, child_name.c_str());
410     }
411     auto& child = children_[child_name];
412     // Create child if needed.
413     if (child == nullptr) {
414       child = MakeOrphanable<ChildPriority>(
415           RefAsSubclass<PriorityLb>(DEBUG_LOCATION, "ChildPriority"),
416           child_name);
417       auto child_config = config_->children().find(child_name);
418       GPR_DEBUG_ASSERT(child_config != config_->children().end());
419       // TODO(roth): If the child reports a non-OK status with the
420       // update, we need to propagate that back to the resolver somehow.
421       (void)child->UpdateLocked(
422           child_config->second.config,
423           child_config->second.ignore_reresolution_requests);
424     } else {
425       // The child already exists.  Reactivate if needed.
426       child->MaybeReactivateLocked();
427     }
428     // Select this child if it is in states READY or IDLE.
429     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
430         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
431       SetCurrentPriorityLocked(
432           priority, /*deactivate_lower_priorities=*/true,
433           ConnectivityStateName(child->connectivity_state()));
434       return;
435     }
436     // Select this child if its failover timer is pending.
437     if (child->FailoverTimerPending()) {
438       SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
439                                "failover timer pending");
440       return;
441     }
442     // Child has been failing for a while.  Move on to the next priority.
443     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
444       gpr_log(GPR_INFO,
445               "[priority_lb %p] skipping priority %u, child %s: state=%s, "
446               "failover timer not pending",
447               this, priority, child_name.c_str(),
448               ConnectivityStateName(child->connectivity_state()));
449     }
450   }
451   // If we didn't find any priority to try, pick the first one in state
452   // CONNECTING.
453   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
454     gpr_log(GPR_INFO,
455             "[priority_lb %p] no priority reachable, checking for CONNECTING "
456             "priority to delegate to",
457             this);
458   }
459   for (uint32_t priority = 0; priority < config_->priorities().size();
460        ++priority) {
461     // If the child for the priority does not exist yet, create it.
462     const std::string& child_name = config_->priorities()[priority];
463     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
464       gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
465               priority, child_name.c_str());
466     }
467     auto& child = children_[child_name];
468     GPR_ASSERT(child != nullptr);
469     if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
470       SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
471                                "CONNECTING (pass 2)");
472       return;
473     }
474   }
475   // Did not find any child in CONNECTING, delegate to last child.
476   SetCurrentPriorityLocked(config_->priorities().size() - 1,
477                            /*deactivate_lower_priorities=*/false,
478                            "no usable children");
479 }
480 
SetCurrentPriorityLocked(int32_t priority,bool deactivate_lower_priorities,const char * reason)481 void PriorityLb::SetCurrentPriorityLocked(int32_t priority,
482                                           bool deactivate_lower_priorities,
483                                           const char* reason) {
484   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
485     gpr_log(GPR_INFO,
486             "[priority_lb %p] selecting priority %u, child %s (%s, "
487             "deactivate_lower_priorities=%d)",
488             this, priority, config_->priorities()[priority].c_str(), reason,
489             deactivate_lower_priorities);
490   }
491   current_priority_ = priority;
492   if (deactivate_lower_priorities) {
493     for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
494       const std::string& child_name = config_->priorities()[p];
495       auto it = children_.find(child_name);
496       if (it != children_.end()) it->second->MaybeDeactivateLocked();
497     }
498   }
499   auto& child = children_[config_->priorities()[priority]];
500   GPR_ASSERT(child != nullptr);
501   channel_control_helper()->UpdateState(child->connectivity_state(),
502                                         child->connectivity_status(),
503                                         child->GetPicker());
504 }
505 
506 //
507 // PriorityLb::ChildPriority::DeactivationTimer
508 //
509 
DeactivationTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)510 PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
511     RefCountedPtr<PriorityLb::ChildPriority> child_priority)
512     : child_priority_(std::move(child_priority)) {
513   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
514     gpr_log(GPR_INFO,
515             "[priority_lb %p] child %s (%p): deactivating -- will remove in "
516             "%" PRId64 "ms",
517             child_priority_->priority_policy_.get(),
518             child_priority_->name_.c_str(), child_priority_.get(),
519             kChildRetentionInterval.millis());
520   }
521   timer_handle_ =
522       child_priority_->priority_policy_->channel_control_helper()
523           ->GetEventEngine()
524           ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
525                                                           "Timer")]() mutable {
526             ApplicationCallbackExecCtx callback_exec_ctx;
527             ExecCtx exec_ctx;
528             auto self_ptr = self.get();
529             self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
530                 [self = std::move(self)]() { self->OnTimerLocked(); },
531                 DEBUG_LOCATION);
532           });
533 }
534 
Orphan()535 void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
536   if (timer_handle_.has_value()) {
537     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
538       gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
539               child_priority_->priority_policy_.get(),
540               child_priority_->name_.c_str(), child_priority_.get());
541     }
542     child_priority_->priority_policy_->channel_control_helper()
543         ->GetEventEngine()
544         ->Cancel(*timer_handle_);
545     timer_handle_.reset();
546   }
547   Unref();
548 }
549 
OnTimerLocked()550 void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
551   if (timer_handle_.has_value()) {
552     timer_handle_.reset();
553     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
554       gpr_log(GPR_INFO,
555               "[priority_lb %p] child %s (%p): deactivation timer fired, "
556               "deleting child",
557               child_priority_->priority_policy_.get(),
558               child_priority_->name_.c_str(), child_priority_.get());
559     }
560     child_priority_->priority_policy_->DeleteChild(child_priority_.get());
561   }
562 }
563 
564 //
565 // PriorityLb::ChildPriority::FailoverTimer
566 //
567 
FailoverTimer(RefCountedPtr<PriorityLb::ChildPriority> child_priority)568 PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
569     RefCountedPtr<PriorityLb::ChildPriority> child_priority)
570     : child_priority_(std::move(child_priority)) {
571   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
572     gpr_log(
573         GPR_INFO,
574         "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
575         "ms",
576         child_priority_->priority_policy_.get(), child_priority_->name_.c_str(),
577         child_priority_.get(),
578         child_priority_->priority_policy_->child_failover_timeout_.millis());
579   }
580   timer_handle_ =
581       child_priority_->priority_policy_->channel_control_helper()
582           ->GetEventEngine()
583           ->RunAfter(
584               child_priority_->priority_policy_->child_failover_timeout_,
585               [self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
586                 ApplicationCallbackExecCtx callback_exec_ctx;
587                 ExecCtx exec_ctx;
588                 auto self_ptr = self.get();
589                 self_ptr->child_priority_->priority_policy_->work_serializer()
590                     ->Run([self = std::move(self)]() { self->OnTimerLocked(); },
591                           DEBUG_LOCATION);
592               });
593 }
594 
Orphan()595 void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
596   if (timer_handle_.has_value()) {
597     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
598       gpr_log(GPR_INFO,
599               "[priority_lb %p] child %s (%p): cancelling failover timer",
600               child_priority_->priority_policy_.get(),
601               child_priority_->name_.c_str(), child_priority_.get());
602     }
603     child_priority_->priority_policy_->channel_control_helper()
604         ->GetEventEngine()
605         ->Cancel(*timer_handle_);
606     timer_handle_.reset();
607   }
608   Unref();
609 }
610 
OnTimerLocked()611 void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
612   if (timer_handle_.has_value()) {
613     timer_handle_.reset();
614     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
615       gpr_log(GPR_INFO,
616               "[priority_lb %p] child %s (%p): failover timer fired, "
617               "reporting TRANSIENT_FAILURE",
618               child_priority_->priority_policy_.get(),
619               child_priority_->name_.c_str(), child_priority_.get());
620     }
621     child_priority_->OnConnectivityStateUpdateLocked(
622         GRPC_CHANNEL_TRANSIENT_FAILURE,
623         absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
624         nullptr);
625   }
626 }
627 
628 //
629 // PriorityLb::ChildPriority
630 //
631 
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)632 PriorityLb::ChildPriority::ChildPriority(
633     RefCountedPtr<PriorityLb> priority_policy, std::string name)
634     : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
635   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
636     gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
637             priority_policy_.get(), name_.c_str(), this);
638   }
639   // Start the failover timer.
640   failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
641 }
642 
Orphan()643 void PriorityLb::ChildPriority::Orphan() {
644   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
645     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
646             priority_policy_.get(), name_.c_str(), this);
647   }
648   failover_timer_.reset();
649   deactivation_timer_.reset();
650   // Remove the child policy's interested_parties pollset_set from the
651   // xDS policy.
652   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
653                                    priority_policy_->interested_parties());
654   child_policy_.reset();
655   // Drop our ref to the child's picker, in case it's holding a ref to
656   // the child.
657   picker_.reset();
658   Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
659 }
660 
661 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
GetPicker()662 PriorityLb::ChildPriority::GetPicker() {
663   if (picker_ == nullptr) {
664     return MakeRefCounted<QueuePicker>(
665         priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
666   }
667   return picker_;
668 }
669 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)670 absl::Status PriorityLb::ChildPriority::UpdateLocked(
671     RefCountedPtr<LoadBalancingPolicy::Config> config,
672     bool ignore_reresolution_requests) {
673   if (priority_policy_->shutting_down_) return absl::OkStatus();
674   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
675     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
676             priority_policy_.get(), name_.c_str(), this);
677   }
678   ignore_reresolution_requests_ = ignore_reresolution_requests;
679   // Create policy if needed.
680   if (child_policy_ == nullptr) {
681     child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
682   }
683   // Construct update args.
684   UpdateArgs update_args;
685   update_args.config = std::move(config);
686   if (priority_policy_->addresses_.ok()) {
687     auto it = priority_policy_->addresses_->find(name_);
688     if (it == priority_policy_->addresses_->end()) {
689       update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
690           EndpointAddressesList());
691     } else {
692       update_args.addresses = it->second;
693     }
694   } else {
695     update_args.addresses = priority_policy_->addresses_.status();
696   }
697   update_args.resolution_note = priority_policy_->resolution_note_;
698   update_args.args = priority_policy_->args_;
699   // Update the policy.
700   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
701     gpr_log(GPR_INFO,
702             "[priority_lb %p] child %s (%p): updating child policy handler %p",
703             priority_policy_.get(), name_.c_str(), this, child_policy_.get());
704   }
705   return child_policy_->UpdateLocked(std::move(update_args));
706 }
707 
708 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)709 PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
710   LoadBalancingPolicy::Args lb_policy_args;
711   lb_policy_args.work_serializer = priority_policy_->work_serializer();
712   lb_policy_args.args = args;
713   lb_policy_args.channel_control_helper =
714       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
715   OrphanablePtr<LoadBalancingPolicy> lb_policy =
716       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
717                                          &grpc_lb_priority_trace);
718   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
719     gpr_log(GPR_INFO,
720             "[priority_lb %p] child %s (%p): created new child policy "
721             "handler %p",
722             priority_policy_.get(), name_.c_str(), this, lb_policy.get());
723   }
724   // Add the parent's interested_parties pollset_set to that of the newly
725   // created child policy. This will make the child policy progress upon
726   // activity on the parent LB, which in turn is tied to the application's call.
727   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
728                                    priority_policy_->interested_parties());
729   return lb_policy;
730 }
731 
ExitIdleLocked()732 void PriorityLb::ChildPriority::ExitIdleLocked() {
733   child_policy_->ExitIdleLocked();
734 }
735 
ResetBackoffLocked()736 void PriorityLb::ChildPriority::ResetBackoffLocked() {
737   child_policy_->ResetBackoffLocked();
738 }
739 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)740 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
741     grpc_connectivity_state state, const absl::Status& status,
742     RefCountedPtr<SubchannelPicker> picker) {
743   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
744     gpr_log(GPR_INFO,
745             "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
746             priority_policy_.get(), name_.c_str(), this,
747             ConnectivityStateName(state), status.ToString().c_str(),
748             picker.get());
749   }
750   // Store the state and picker.
751   connectivity_state_ = state;
752   connectivity_status_ = status;
753   // When the failover timer fires, this method will be called with picker
754   // set to null, because we want to consider the child to be in
755   // TRANSIENT_FAILURE, but we have no new picker to report.  In that case,
756   // just keep using the old picker, in case we wind up delegating to this
757   // child when all priorities are failing.
758   if (picker != nullptr) picker_ = std::move(picker);
759   // If we transition to state CONNECTING and we've not seen
760   // TRANSIENT_FAILURE more recently than READY or IDLE, start failover
761   // timer if not already pending.
762   // In any other state, update seen_ready_or_idle_since_transient_failure_
763   // and cancel failover timer.
764   if (state == GRPC_CHANNEL_CONNECTING) {
765     if (seen_ready_or_idle_since_transient_failure_ &&
766         failover_timer_ == nullptr) {
767       failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
768     }
769   } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
770     seen_ready_or_idle_since_transient_failure_ = true;
771     failover_timer_.reset();
772   } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
773     seen_ready_or_idle_since_transient_failure_ = false;
774     failover_timer_.reset();
775   }
776   // Call the LB policy's ChoosePriorityLocked() to choose a priority to
777   // use based on the updated state of this child.
778   //
779   // Note that if we're in the process of propagating an update from our
780   // parent to our children, we skip this, because we don't want to
781   // choose a new priority based on inconsistent state.  Instead, the
782   // policy will choose a new priority once the update has been seen by
783   // all children.
784   if (!priority_policy_->update_in_progress_) {
785     priority_policy_->ChoosePriorityLocked();
786   }
787 }
788 
MaybeDeactivateLocked()789 void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
790   if (deactivation_timer_ == nullptr) {
791     deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
792   }
793 }
794 
MaybeReactivateLocked()795 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
796   deactivation_timer_.reset();
797 }
798 
799 //
800 // PriorityLb::ChildPriority::Helper
801 //
802 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)803 void PriorityLb::ChildPriority::Helper::UpdateState(
804     grpc_connectivity_state state, const absl::Status& status,
805     RefCountedPtr<SubchannelPicker> picker) {
806   if (priority_->priority_policy_->shutting_down_) return;
807   // Notify the priority.
808   priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
809 }
810 
RequestReresolution()811 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
812   if (priority_->priority_policy_->shutting_down_) return;
813   if (priority_->ignore_reresolution_requests_) {
814     return;
815   }
816   priority_->priority_policy_->channel_control_helper()->RequestReresolution();
817 }
818 
819 //
820 // factory
821 //
822 
JsonLoader(const JsonArgs &)823 const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
824     const JsonArgs&) {
825   static const auto* loader =
826       JsonObjectLoader<PriorityLbChild>()
827           // Note: The "config" field requires custom parsing, so it's
828           // handled in JsonPostLoad() instead of here.
829           .OptionalField("ignore_reresolution_requests",
830                          &PriorityLbChild::ignore_reresolution_requests)
831           .Finish();
832   return loader;
833 }
834 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)835 void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
836                                                      const JsonArgs&,
837                                                      ValidationErrors* errors) {
838   ValidationErrors::ScopedField field(errors, ".config");
839   auto it = json.object().find("config");
840   if (it == json.object().end()) {
841     errors->AddError("field not present");
842     return;
843   }
844   auto lb_config =
845       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
846           it->second);
847   if (!lb_config.ok()) {
848     errors->AddError(lb_config.status().message());
849     return;
850   }
851   config = std::move(*lb_config);
852 }
853 
JsonLoader(const JsonArgs &)854 const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
855   static const auto* loader =
856       JsonObjectLoader<PriorityLbConfig>()
857           .Field("children", &PriorityLbConfig::children_)
858           .Field("priorities", &PriorityLbConfig::priorities_)
859           .Finish();
860   return loader;
861 }
862 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)863 void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
864                                     ValidationErrors* errors) {
865   std::set<std::string> unknown_priorities;
866   for (const std::string& priority : priorities_) {
867     if (children_.find(priority) == children_.end()) {
868       unknown_priorities.insert(priority);
869     }
870   }
871   if (!unknown_priorities.empty()) {
872     errors->AddError(absl::StrCat("unknown priorit(ies): [",
873                                   absl::StrJoin(unknown_priorities, ", "),
874                                   "]"));
875   }
876 }
877 
878 class PriorityLbFactory final : public LoadBalancingPolicyFactory {
879  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const880   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
881       LoadBalancingPolicy::Args args) const override {
882     return MakeOrphanable<PriorityLb>(std::move(args));
883   }
884 
name() const885   absl::string_view name() const override { return kPriority; }
886 
887   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const888   ParseLoadBalancingConfig(const Json& json) const override {
889     return LoadFromJson<RefCountedPtr<PriorityLbConfig>>(
890         json, JsonArgs(), "errors validating priority LB policy config");
891   }
892 };
893 
894 }  // namespace
895 
RegisterPriorityLbPolicy(CoreConfiguration::Builder * builder)896 void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
897   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
898       std::make_unique<PriorityLbFactory>());
899 }
900 
901 }  // namespace grpc_core
902