xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/round_robin/round_robin.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <stdlib.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/meta/type_traits.h"
30 #include "absl/random/random.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 
37 #include <grpc/impl/connectivity_state.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/config/core_configuration.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/work_serializer.h"
47 #include "src/core/lib/json/json.h"
48 #include "src/core/lib/transport/connectivity_state.h"
49 #include "src/core/load_balancing/endpoint_list.h"
50 #include "src/core/load_balancing/lb_policy.h"
51 #include "src/core/load_balancing/lb_policy_factory.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53 
54 namespace grpc_core {
55 
// Trace flag named "round_robin". Every GRPC_TRACE_FLAG_ENABLED() check in
// this file consults it to decide whether to emit GPR_INFO debug logging.
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
57 
58 namespace {
59 
// Name of this LB policy. It is returned by name() on the policy itself, on
// its config object, and on its factory.
constexpr absl::string_view kRoundRobin = "round_robin";
61 
62 class RoundRobin final : public LoadBalancingPolicy {
63  public:
64   explicit RoundRobin(Args args);
65 
name() const66   absl::string_view name() const override { return kRoundRobin; }
67 
68   absl::Status UpdateLocked(UpdateArgs args) override;
69   void ResetBackoffLocked() override;
70 
71  private:
72   class RoundRobinEndpointList final : public EndpointList {
73    public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,EndpointAddressesIterator * endpoints,const ChannelArgs & args)74     RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
75                            EndpointAddressesIterator* endpoints,
76                            const ChannelArgs& args)
77         : EndpointList(std::move(round_robin),
78                        GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
79                            ? "RoundRobinEndpointList"
80                            : nullptr) {
81       Init(endpoints, args,
82            [&](RefCountedPtr<EndpointList> endpoint_list,
83                const EndpointAddresses& addresses, const ChannelArgs& args) {
84              return MakeOrphanable<RoundRobinEndpoint>(
85                  std::move(endpoint_list), addresses, args,
86                  policy<RoundRobin>()->work_serializer());
87            });
88     }
89 
90    private:
91     class RoundRobinEndpoint final : public Endpoint {
92      public:
RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer)93       RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
94                          const EndpointAddresses& addresses,
95                          const ChannelArgs& args,
96                          std::shared_ptr<WorkSerializer> work_serializer)
97           : Endpoint(std::move(endpoint_list)) {
98         Init(addresses, args, std::move(work_serializer));
99       }
100 
101      private:
102       // Called when the child policy reports a connectivity state update.
103       void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
104                          grpc_connectivity_state new_state,
105                          const absl::Status& status) override;
106     };
107 
channel_control_helper() const108     LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
109         const override {
110       return policy<RoundRobin>()->channel_control_helper();
111     }
112 
113     // Updates the counters of children in each state when a
114     // child transitions from old_state to new_state.
115     void UpdateStateCountersLocked(
116         absl::optional<grpc_connectivity_state> old_state,
117         grpc_connectivity_state new_state);
118 
119     // Ensures that the right child list is used and then updates
120     // the RR policy's connectivity state based on the child list's
121     // state counters.
122     void MaybeUpdateRoundRobinConnectivityStateLocked(
123         absl::Status status_for_tf);
124 
CountersString() const125     std::string CountersString() const {
126       return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
127                           " num_connecting=", num_connecting_,
128                           " num_transient_failure=", num_transient_failure_);
129     }
130 
131     size_t num_ready_ = 0;
132     size_t num_connecting_ = 0;
133     size_t num_transient_failure_ = 0;
134 
135     absl::Status last_failure_;
136   };
137 
138   class Picker final : public SubchannelPicker {
139    public:
140     Picker(RoundRobin* parent,
141            std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
142                pickers);
143 
144     PickResult Pick(PickArgs args) override;
145 
146    private:
147     // Using pointer value only, no ref held -- do not dereference!
148     RoundRobin* parent_;
149 
150     std::atomic<size_t> last_picked_index_;
151     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
152   };
153 
154   ~RoundRobin() override;
155 
156   void ShutdownLocked() override;
157 
158   // Current child list.
159   OrphanablePtr<RoundRobinEndpointList> endpoint_list_;
160   // Latest pending child list.
161   // When we get an updated address list, we create a new child list
162   // for it here, and we wait to swap it into endpoint_list_ until the new
163   // list becomes READY.
164   OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;
165 
166   bool shutdown_ = false;
167 
168   absl::BitGen bit_gen_;
169 };
170 
171 //
172 // RoundRobin::Picker
173 //
174 
Picker(RoundRobin * parent,std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)175 RoundRobin::Picker::Picker(
176     RoundRobin* parent,
177     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
178     : parent_(parent), pickers_(std::move(pickers)) {
179   // For discussion on why we generate a random starting index for
180   // the picker, see https://github.com/grpc/grpc-go/issues/2580.
181   size_t index = absl::Uniform<size_t>(parent->bit_gen_, 0, pickers_.size());
182   last_picked_index_.store(index, std::memory_order_relaxed);
183   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
184     gpr_log(GPR_INFO,
185             "[RR %p picker %p] created picker from endpoint_list=%p "
186             "with %" PRIuPTR " READY children; last_picked_index_=%" PRIuPTR,
187             parent_, this, parent_->endpoint_list_.get(), pickers_.size(),
188             index);
189   }
190 }
191 
Pick(PickArgs args)192 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
193   size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
194                  pickers_.size();
195   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
196     gpr_log(GPR_INFO,
197             "[RR %p picker %p] using picker index %" PRIuPTR ", picker=%p",
198             parent_, this, index, pickers_[index].get());
199   }
200   return pickers_[index]->Pick(args);
201 }
202 
203 //
204 // RoundRobin
205 //
206 
RoundRobin(Args args)207 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
208   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
209     gpr_log(GPR_INFO, "[RR %p] Created", this);
210   }
211 }
212 
~RoundRobin()213 RoundRobin::~RoundRobin() {
214   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
215     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
216   }
217   GPR_ASSERT(endpoint_list_ == nullptr);
218   GPR_ASSERT(latest_pending_endpoint_list_ == nullptr);
219 }
220 
ShutdownLocked()221 void RoundRobin::ShutdownLocked() {
222   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
223     gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
224   }
225   shutdown_ = true;
226   endpoint_list_.reset();
227   latest_pending_endpoint_list_.reset();
228 }
229 
ResetBackoffLocked()230 void RoundRobin::ResetBackoffLocked() {
231   endpoint_list_->ResetBackoffLocked();
232   if (latest_pending_endpoint_list_ != nullptr) {
233     latest_pending_endpoint_list_->ResetBackoffLocked();
234   }
235 }
236 
UpdateLocked(UpdateArgs args)237 absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
238   EndpointAddressesIterator* addresses = nullptr;
239   if (args.addresses.ok()) {
240     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
241       gpr_log(GPR_INFO, "[RR %p] received update", this);
242     }
243     addresses = args.addresses->get();
244   } else {
245     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
246       gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
247               args.addresses.status().ToString().c_str());
248     }
249     // If we already have a child list, then keep using the existing
250     // list, but still report back that the update was not accepted.
251     if (endpoint_list_ != nullptr) return args.addresses.status();
252   }
253   // Create new child list, replacing the previous pending list, if any.
254   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
255       latest_pending_endpoint_list_ != nullptr) {
256     gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this,
257             latest_pending_endpoint_list_.get());
258   }
259   latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
260       RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
261       addresses, args.args);
262   // If the new list is empty, immediately promote it to
263   // endpoint_list_ and report TRANSIENT_FAILURE.
264   if (latest_pending_endpoint_list_->size() == 0) {
265     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
266         endpoint_list_ != nullptr) {
267       gpr_log(GPR_INFO, "[RR %p] replacing previous child list %p", this,
268               endpoint_list_.get());
269     }
270     endpoint_list_ = std::move(latest_pending_endpoint_list_);
271     absl::Status status =
272         args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
273                                   "empty address list: ", args.resolution_note))
274                             : args.addresses.status();
275     channel_control_helper()->UpdateState(
276         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
277         MakeRefCounted<TransientFailurePicker>(status));
278     return status;
279   }
280   // Otherwise, if this is the initial update, immediately promote it to
281   // endpoint_list_.
282   if (endpoint_list_ == nullptr) {
283     endpoint_list_ = std::move(latest_pending_endpoint_list_);
284   }
285   return absl::OkStatus();
286 }
287 
288 //
289 // RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
290 //
291 
OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state,const absl::Status & status)292 void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
293     absl::optional<grpc_connectivity_state> old_state,
294     grpc_connectivity_state new_state, const absl::Status& status) {
295   auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
296   auto* round_robin = policy<RoundRobin>();
297   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
298     gpr_log(
299         GPR_INFO,
300         "[RR %p] connectivity changed for child %p, endpoint_list %p "
301         "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s (%s)",
302         round_robin, this, rr_endpoint_list, Index(), rr_endpoint_list->size(),
303         (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
304         ConnectivityStateName(new_state), status.ToString().c_str());
305   }
306   if (new_state == GRPC_CHANNEL_IDLE) {
307     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
308       gpr_log(GPR_INFO, "[RR %p] child %p reported IDLE; requesting connection",
309               round_robin, this);
310     }
311     ExitIdleLocked();
312   }
313   // If state changed, update state counters.
314   if (!old_state.has_value() || *old_state != new_state) {
315     rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
316   }
317   // Update the policy state.
318   rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
319 }
320 
321 //
322 // RoundRobin::RoundRobinEndpointList
323 //
324 
UpdateStateCountersLocked(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state)325 void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
326     absl::optional<grpc_connectivity_state> old_state,
327     grpc_connectivity_state new_state) {
328   // We treat IDLE the same as CONNECTING, since it will immediately
329   // transition into that state anyway.
330   if (old_state.has_value()) {
331     GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
332     if (*old_state == GRPC_CHANNEL_READY) {
333       GPR_ASSERT(num_ready_ > 0);
334       --num_ready_;
335     } else if (*old_state == GRPC_CHANNEL_CONNECTING ||
336                *old_state == GRPC_CHANNEL_IDLE) {
337       GPR_ASSERT(num_connecting_ > 0);
338       --num_connecting_;
339     } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
340       GPR_ASSERT(num_transient_failure_ > 0);
341       --num_transient_failure_;
342     }
343   }
344   GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
345   if (new_state == GRPC_CHANNEL_READY) {
346     ++num_ready_;
347   } else if (new_state == GRPC_CHANNEL_CONNECTING ||
348              new_state == GRPC_CHANNEL_IDLE) {
349     ++num_connecting_;
350   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
351     ++num_transient_failure_;
352   }
353 }
354 
355 void RoundRobin::RoundRobinEndpointList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf)356     MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
357   auto* round_robin = policy<RoundRobin>();
358   // If this is latest_pending_endpoint_list_, then swap it into
359   // endpoint_list_ in the following cases:
360   // - endpoint_list_ has no READY children.
361   // - This list has at least one READY child and we have seen the
362   //   initial connectivity state notification for all children.
363   // - All of the children in this list are in TRANSIENT_FAILURE.
364   //   (This may cause the channel to go from READY to TRANSIENT_FAILURE,
365   //   but we're doing what the control plane told us to do.)
366   if (round_robin->latest_pending_endpoint_list_.get() == this &&
367       (round_robin->endpoint_list_->num_ready_ == 0 ||
368        (num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
369        num_transient_failure_ == size())) {
370     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
371       const std::string old_counters_string =
372           round_robin->endpoint_list_ != nullptr
373               ? round_robin->endpoint_list_->CountersString()
374               : "";
375       gpr_log(GPR_INFO,
376               "[RR %p] swapping out child list %p (%s) in favor of %p (%s)",
377               round_robin, round_robin->endpoint_list_.get(),
378               old_counters_string.c_str(), this, CountersString().c_str());
379     }
380     round_robin->endpoint_list_ =
381         std::move(round_robin->latest_pending_endpoint_list_);
382   }
383   // Only set connectivity state if this is the current child list.
384   if (round_robin->endpoint_list_.get() != this) return;
385   // First matching rule wins:
386   // 1) ANY child is READY => policy is READY.
387   // 2) ANY child is CONNECTING => policy is CONNECTING.
388   // 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
389   if (num_ready_ > 0) {
390     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
391       gpr_log(GPR_INFO, "[RR %p] reporting READY with child list %p",
392               round_robin, this);
393     }
394     std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
395     for (const auto& endpoint : endpoints()) {
396       auto state = endpoint->connectivity_state();
397       if (state.has_value() && *state == GRPC_CHANNEL_READY) {
398         pickers.push_back(endpoint->picker());
399       }
400     }
401     GPR_ASSERT(!pickers.empty());
402     round_robin->channel_control_helper()->UpdateState(
403         GRPC_CHANNEL_READY, absl::OkStatus(),
404         MakeRefCounted<Picker>(round_robin, std::move(pickers)));
405   } else if (num_connecting_ > 0) {
406     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
407       gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with child list %p",
408               round_robin, this);
409     }
410     round_robin->channel_control_helper()->UpdateState(
411         GRPC_CHANNEL_CONNECTING, absl::Status(),
412         MakeRefCounted<QueuePicker>(nullptr));
413   } else if (num_transient_failure_ == size()) {
414     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
415       gpr_log(GPR_INFO,
416               "[RR %p] reporting TRANSIENT_FAILURE with child list %p: %s",
417               round_robin, this, status_for_tf.ToString().c_str());
418     }
419     if (!status_for_tf.ok()) {
420       last_failure_ = absl::UnavailableError(
421           absl::StrCat("connections to all backends failing; last error: ",
422                        status_for_tf.message()));
423     }
424     round_robin->channel_control_helper()->UpdateState(
425         GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
426         MakeRefCounted<TransientFailurePicker>(last_failure_));
427   }
428 }
429 
430 //
431 // factory
432 //
433 
// Config object returned by RoundRobinFactory::ParseLoadBalancingConfig().
// It holds no fields; it only reports the policy name.
class RoundRobinConfig final : public LoadBalancingPolicy::Config {
 public:
  absl::string_view name() const override { return kRoundRobin; }
};
438 
439 class RoundRobinFactory final : public LoadBalancingPolicyFactory {
440  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const441   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
442       LoadBalancingPolicy::Args args) const override {
443     return MakeOrphanable<RoundRobin>(std::move(args));
444   }
445 
name() const446   absl::string_view name() const override { return kRoundRobin; }
447 
448   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json &) const449   ParseLoadBalancingConfig(const Json& /*json*/) const override {
450     return MakeRefCounted<RoundRobinConfig>();
451   }
452 };
453 
454 }  // namespace
455 
RegisterRoundRobinLbPolicy(CoreConfiguration::Builder * builder)456 void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder) {
457   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
458       std::make_unique<RoundRobinFactory>());
459 }
460 
461 }  // namespace grpc_core
462