1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <stdlib.h>
21
22 #include <algorithm>
23 #include <atomic>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/meta/type_traits.h"
30 #include "absl/random/random.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36
37 #include <grpc/impl/connectivity_state.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/config/core_configuration.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted_ptr.h"
46 #include "src/core/lib/gprpp/work_serializer.h"
47 #include "src/core/lib/json/json.h"
48 #include "src/core/lib/transport/connectivity_state.h"
49 #include "src/core/load_balancing/endpoint_list.h"
50 #include "src/core/load_balancing/lb_policy.h"
51 #include "src/core/load_balancing/lb_policy_factory.h"
52 #include "src/core/resolver/endpoint_addresses.h"
53
54 namespace grpc_core {
55
56 TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
57
58 namespace {
59
60 constexpr absl::string_view kRoundRobin = "round_robin";
61
// Load-balancing policy that rotates picks across the READY endpoints of its
// current endpoint list.
//
// Every resolver update produces a RoundRobinEndpointList. A new list waits in
// latest_pending_endpoint_list_ until
// RoundRobinEndpointList::MaybeUpdateRoundRobinConnectivityStateLocked()
// promotes it to endpoint_list_. UpdateLocked() promotes it directly when the
// list is empty or when there is no current list yet.
class RoundRobin final : public LoadBalancingPolicy {
 public:
  explicit RoundRobin(Args args);

  absl::string_view name() const override { return kRoundRobin; }

  // Applies a resolver update, creating a new pending endpoint list.
  absl::Status UpdateLocked(UpdateArgs args) override;
  // Resets connection backoff on the endpoints of the current and pending
  // endpoint lists.
  void ResetBackoffLocked() override;

 private:
  // The endpoints produced by one resolver update, plus counters of how many
  // of them are in each connectivity state.
  class RoundRobinEndpointList final : public EndpointList {
   public:
    // Creates one RoundRobinEndpoint per address in `endpoints`. A tracer
    // name is passed to EndpointList only when round_robin tracing is on.
    RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
                           EndpointAddressesIterator* endpoints,
                           const ChannelArgs& args)
        : EndpointList(std::move(round_robin),
                       GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
                           ? "RoundRobinEndpointList"
                           : nullptr) {
      // NOTE(review): the factory lambda captures by reference, which assumes
      // Init() invokes it synchronously -- confirm against EndpointList::Init.
      // The lambda's `args` parameter shadows the constructor's `args`.
      Init(endpoints, args,
           [&](RefCountedPtr<EndpointList> endpoint_list,
               const EndpointAddresses& addresses, const ChannelArgs& args) {
             return MakeOrphanable<RoundRobinEndpoint>(
                 std::move(endpoint_list), addresses, args,
                 policy<RoundRobin>()->work_serializer());
           });
    }

   private:
    // One endpoint of a RoundRobinEndpointList. It is initialized with the
    // owning policy's work serializer.
    class RoundRobinEndpoint final : public Endpoint {
     public:
      RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
                         const EndpointAddresses& addresses,
                         const ChannelArgs& args,
                         std::shared_ptr<WorkSerializer> work_serializer)
          : Endpoint(std::move(endpoint_list)) {
        Init(addresses, args, std::move(work_serializer));
      }

     private:
      // Called when the child policy reports a connectivity state update.
      void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
                         grpc_connectivity_state new_state,
                         const absl::Status& status) override;
    };

    // Delegates to the owning RoundRobin policy's helper.
    LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
        const override {
      return policy<RoundRobin>()->channel_control_helper();
    }

    // Updates the counters of children in each state when a
    // child transitions from old_state to new_state.
    void UpdateStateCountersLocked(
        absl::optional<grpc_connectivity_state> old_state,
        grpc_connectivity_state new_state);

    // Ensures that the right child list is used and then updates
    // the RR policy's connectivity state based on the child list's
    // state counters.
    void MaybeUpdateRoundRobinConnectivityStateLocked(
        absl::Status status_for_tf);

    // Formats the size and state counters for trace logging.
    std::string CountersString() const {
      return absl::StrCat("num_children=", size(), " num_ready=", num_ready_,
                          " num_connecting=", num_connecting_,
                          " num_transient_failure=", num_transient_failure_);
    }

    // Number of children currently in each state. IDLE children are counted
    // in num_connecting_.
    size_t num_ready_ = 0;
    size_t num_connecting_ = 0;
    size_t num_transient_failure_ = 0;

    // Status reported with TRANSIENT_FAILURE. It is refreshed whenever the list
    // reports TRANSIENT_FAILURE with a non-OK triggering status.
    absl::Status last_failure_;
  };

  // Picker that cycles through the pickers of the READY children, starting
  // at a random index.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RoundRobin* parent,
           std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
               pickers);

    PickResult Pick(PickArgs args) override;

   private:
    // Using pointer value only, no ref held -- do not dereference!
    // (Pick() only logs it. The constructor does read parent->bit_gen_ and
    // parent_->endpoint_list_ while it builds the picker.)
    RoundRobin* parent_;

    // Value that the next Pick() will use, modulo pickers_.size(). Despite
    // the name, Pick() uses the pre-increment value. Atomic, presumably
    // because Pick() may run concurrently on several threads.
    std::atomic<size_t> last_picked_index_;
    std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
  };

  ~RoundRobin() override;

  void ShutdownLocked() override;

  // Current child list.
  OrphanablePtr<RoundRobinEndpointList> endpoint_list_;
  // Latest pending child list.
  // When we get an updated address list, we create a new child list
  // for it here, and we wait to swap it into endpoint_list_ until the new
  // list becomes READY.
  OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;

  // Set to true by ShutdownLocked(). NOTE(review): it is never read anywhere
  // in this file.
  bool shutdown_ = false;

  // Random source for each new Picker's starting index.
  absl::BitGen bit_gen_;
};
170
171 //
172 // RoundRobin::Picker
173 //
174
Picker(RoundRobin * parent,std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)175 RoundRobin::Picker::Picker(
176 RoundRobin* parent,
177 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
178 : parent_(parent), pickers_(std::move(pickers)) {
179 // For discussion on why we generate a random starting index for
180 // the picker, see https://github.com/grpc/grpc-go/issues/2580.
181 size_t index = absl::Uniform<size_t>(parent->bit_gen_, 0, pickers_.size());
182 last_picked_index_.store(index, std::memory_order_relaxed);
183 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
184 gpr_log(GPR_INFO,
185 "[RR %p picker %p] created picker from endpoint_list=%p "
186 "with %" PRIuPTR " READY children; last_picked_index_=%" PRIuPTR,
187 parent_, this, parent_->endpoint_list_.get(), pickers_.size(),
188 index);
189 }
190 }
191
Pick(PickArgs args)192 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
193 size_t index = last_picked_index_.fetch_add(1, std::memory_order_relaxed) %
194 pickers_.size();
195 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
196 gpr_log(GPR_INFO,
197 "[RR %p picker %p] using picker index %" PRIuPTR ", picker=%p",
198 parent_, this, index, pickers_[index].get());
199 }
200 return pickers_[index]->Pick(args);
201 }
202
203 //
204 // RoundRobin
205 //
206
RoundRobin(Args args)207 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
208 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
209 gpr_log(GPR_INFO, "[RR %p] Created", this);
210 }
211 }
212
~RoundRobin()213 RoundRobin::~RoundRobin() {
214 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
215 gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
216 }
217 GPR_ASSERT(endpoint_list_ == nullptr);
218 GPR_ASSERT(latest_pending_endpoint_list_ == nullptr);
219 }
220
ShutdownLocked()221 void RoundRobin::ShutdownLocked() {
222 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
223 gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
224 }
225 shutdown_ = true;
226 endpoint_list_.reset();
227 latest_pending_endpoint_list_.reset();
228 }
229
ResetBackoffLocked()230 void RoundRobin::ResetBackoffLocked() {
231 endpoint_list_->ResetBackoffLocked();
232 if (latest_pending_endpoint_list_ != nullptr) {
233 latest_pending_endpoint_list_->ResetBackoffLocked();
234 }
235 }
236
UpdateLocked(UpdateArgs args)237 absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
238 EndpointAddressesIterator* addresses = nullptr;
239 if (args.addresses.ok()) {
240 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
241 gpr_log(GPR_INFO, "[RR %p] received update", this);
242 }
243 addresses = args.addresses->get();
244 } else {
245 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
246 gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
247 args.addresses.status().ToString().c_str());
248 }
249 // If we already have a child list, then keep using the existing
250 // list, but still report back that the update was not accepted.
251 if (endpoint_list_ != nullptr) return args.addresses.status();
252 }
253 // Create new child list, replacing the previous pending list, if any.
254 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
255 latest_pending_endpoint_list_ != nullptr) {
256 gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this,
257 latest_pending_endpoint_list_.get());
258 }
259 latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
260 RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
261 addresses, args.args);
262 // If the new list is empty, immediately promote it to
263 // endpoint_list_ and report TRANSIENT_FAILURE.
264 if (latest_pending_endpoint_list_->size() == 0) {
265 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
266 endpoint_list_ != nullptr) {
267 gpr_log(GPR_INFO, "[RR %p] replacing previous child list %p", this,
268 endpoint_list_.get());
269 }
270 endpoint_list_ = std::move(latest_pending_endpoint_list_);
271 absl::Status status =
272 args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
273 "empty address list: ", args.resolution_note))
274 : args.addresses.status();
275 channel_control_helper()->UpdateState(
276 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
277 MakeRefCounted<TransientFailurePicker>(status));
278 return status;
279 }
280 // Otherwise, if this is the initial update, immediately promote it to
281 // endpoint_list_.
282 if (endpoint_list_ == nullptr) {
283 endpoint_list_ = std::move(latest_pending_endpoint_list_);
284 }
285 return absl::OkStatus();
286 }
287
288 //
289 // RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
290 //
291
OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state,const absl::Status & status)292 void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
293 absl::optional<grpc_connectivity_state> old_state,
294 grpc_connectivity_state new_state, const absl::Status& status) {
295 auto* rr_endpoint_list = endpoint_list<RoundRobinEndpointList>();
296 auto* round_robin = policy<RoundRobin>();
297 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
298 gpr_log(
299 GPR_INFO,
300 "[RR %p] connectivity changed for child %p, endpoint_list %p "
301 "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s (%s)",
302 round_robin, this, rr_endpoint_list, Index(), rr_endpoint_list->size(),
303 (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
304 ConnectivityStateName(new_state), status.ToString().c_str());
305 }
306 if (new_state == GRPC_CHANNEL_IDLE) {
307 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
308 gpr_log(GPR_INFO, "[RR %p] child %p reported IDLE; requesting connection",
309 round_robin, this);
310 }
311 ExitIdleLocked();
312 }
313 // If state changed, update state counters.
314 if (!old_state.has_value() || *old_state != new_state) {
315 rr_endpoint_list->UpdateStateCountersLocked(old_state, new_state);
316 }
317 // Update the policy state.
318 rr_endpoint_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
319 }
320
321 //
322 // RoundRobin::RoundRobinEndpointList
323 //
324
UpdateStateCountersLocked(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state)325 void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
326 absl::optional<grpc_connectivity_state> old_state,
327 grpc_connectivity_state new_state) {
328 // We treat IDLE the same as CONNECTING, since it will immediately
329 // transition into that state anyway.
330 if (old_state.has_value()) {
331 GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
332 if (*old_state == GRPC_CHANNEL_READY) {
333 GPR_ASSERT(num_ready_ > 0);
334 --num_ready_;
335 } else if (*old_state == GRPC_CHANNEL_CONNECTING ||
336 *old_state == GRPC_CHANNEL_IDLE) {
337 GPR_ASSERT(num_connecting_ > 0);
338 --num_connecting_;
339 } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
340 GPR_ASSERT(num_transient_failure_ > 0);
341 --num_transient_failure_;
342 }
343 }
344 GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
345 if (new_state == GRPC_CHANNEL_READY) {
346 ++num_ready_;
347 } else if (new_state == GRPC_CHANNEL_CONNECTING ||
348 new_state == GRPC_CHANNEL_IDLE) {
349 ++num_connecting_;
350 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
351 ++num_transient_failure_;
352 }
353 }
354
355 void RoundRobin::RoundRobinEndpointList::
MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf)356 MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
357 auto* round_robin = policy<RoundRobin>();
358 // If this is latest_pending_endpoint_list_, then swap it into
359 // endpoint_list_ in the following cases:
360 // - endpoint_list_ has no READY children.
361 // - This list has at least one READY child and we have seen the
362 // initial connectivity state notification for all children.
363 // - All of the children in this list are in TRANSIENT_FAILURE.
364 // (This may cause the channel to go from READY to TRANSIENT_FAILURE,
365 // but we're doing what the control plane told us to do.)
366 if (round_robin->latest_pending_endpoint_list_.get() == this &&
367 (round_robin->endpoint_list_->num_ready_ == 0 ||
368 (num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
369 num_transient_failure_ == size())) {
370 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
371 const std::string old_counters_string =
372 round_robin->endpoint_list_ != nullptr
373 ? round_robin->endpoint_list_->CountersString()
374 : "";
375 gpr_log(GPR_INFO,
376 "[RR %p] swapping out child list %p (%s) in favor of %p (%s)",
377 round_robin, round_robin->endpoint_list_.get(),
378 old_counters_string.c_str(), this, CountersString().c_str());
379 }
380 round_robin->endpoint_list_ =
381 std::move(round_robin->latest_pending_endpoint_list_);
382 }
383 // Only set connectivity state if this is the current child list.
384 if (round_robin->endpoint_list_.get() != this) return;
385 // First matching rule wins:
386 // 1) ANY child is READY => policy is READY.
387 // 2) ANY child is CONNECTING => policy is CONNECTING.
388 // 3) ALL children are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE.
389 if (num_ready_ > 0) {
390 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
391 gpr_log(GPR_INFO, "[RR %p] reporting READY with child list %p",
392 round_robin, this);
393 }
394 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
395 for (const auto& endpoint : endpoints()) {
396 auto state = endpoint->connectivity_state();
397 if (state.has_value() && *state == GRPC_CHANNEL_READY) {
398 pickers.push_back(endpoint->picker());
399 }
400 }
401 GPR_ASSERT(!pickers.empty());
402 round_robin->channel_control_helper()->UpdateState(
403 GRPC_CHANNEL_READY, absl::OkStatus(),
404 MakeRefCounted<Picker>(round_robin, std::move(pickers)));
405 } else if (num_connecting_ > 0) {
406 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
407 gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with child list %p",
408 round_robin, this);
409 }
410 round_robin->channel_control_helper()->UpdateState(
411 GRPC_CHANNEL_CONNECTING, absl::Status(),
412 MakeRefCounted<QueuePicker>(nullptr));
413 } else if (num_transient_failure_ == size()) {
414 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
415 gpr_log(GPR_INFO,
416 "[RR %p] reporting TRANSIENT_FAILURE with child list %p: %s",
417 round_robin, this, status_for_tf.ToString().c_str());
418 }
419 if (!status_for_tf.ok()) {
420 last_failure_ = absl::UnavailableError(
421 absl::StrCat("connections to all backends failing; last error: ",
422 status_for_tf.message()));
423 }
424 round_robin->channel_control_helper()->UpdateState(
425 GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
426 MakeRefCounted<TransientFailurePicker>(last_failure_));
427 }
428 }
429
430 //
431 // factory
432 //
433
434 class RoundRobinConfig final : public LoadBalancingPolicy::Config {
435 public:
name() const436 absl::string_view name() const override { return kRoundRobin; }
437 };
438
439 class RoundRobinFactory final : public LoadBalancingPolicyFactory {
440 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const441 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
442 LoadBalancingPolicy::Args args) const override {
443 return MakeOrphanable<RoundRobin>(std::move(args));
444 }
445
name() const446 absl::string_view name() const override { return kRoundRobin; }
447
448 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json &) const449 ParseLoadBalancingConfig(const Json& /*json*/) const override {
450 return MakeRefCounted<RoundRobinConfig>();
451 }
452 };
453
454 } // namespace
455
RegisterRoundRobinLbPolicy(CoreConfiguration::Builder * builder)456 void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder) {
457 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
458 std::make_unique<RoundRobinFactory>());
459 }
460
461 } // namespace grpc_core
462