1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
20
21 #include <inttypes.h>
22 #include <stdlib.h>
23
24 #include <algorithm>
25 #include <cmath>
26 #include <memory>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/base/attributes.h"
32 #include "absl/container/inlined_vector.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/numbers.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39
40 #define XXH_INLINE_ALL
41 #include "xxhash.h"
42
43 #include <grpc/grpc.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/support/log.h>
46
47 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
49 #include "src/core/lib/address_utils/sockaddr_utils.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/config/core_configuration.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/orphanable.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/unique_type_name.h"
58 #include "src/core/lib/gprpp/work_serializer.h"
59 #include "src/core/lib/iomgr/closure.h"
60 #include "src/core/lib/iomgr/error.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/json/json.h"
63 #include "src/core/lib/load_balancing/lb_policy.h"
64 #include "src/core/lib/load_balancing/lb_policy_factory.h"
65 #include "src/core/lib/load_balancing/subchannel_interface.h"
66 #include "src/core/lib/resolver/server_address.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68
69 namespace grpc_core {
70
71 TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
72
TypeName()73 UniqueTypeName RequestHashAttribute::TypeName() {
74 static UniqueTypeName::Factory kFactory("request_hash");
75 return kFactory.Create();
76 }
77
// JSON loading and post-load validation for RingHashConfig.
79
JsonLoader(const JsonArgs &)80 const JsonLoaderInterface* RingHashConfig::JsonLoader(const JsonArgs&) {
81 static const auto* loader =
82 JsonObjectLoader<RingHashConfig>()
83 .OptionalField("minRingSize", &RingHashConfig::min_ring_size)
84 .OptionalField("maxRingSize", &RingHashConfig::max_ring_size)
85 .Finish();
86 return loader;
87 }
88
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)89 void RingHashConfig::JsonPostLoad(const Json&, const JsonArgs&,
90 ValidationErrors* errors) {
91 {
92 ValidationErrors::ScopedField field(errors, ".minRingSize");
93 if (!errors->FieldHasErrors() &&
94 (min_ring_size == 0 || min_ring_size > 8388608)) {
95 errors->AddError("must be in the range [1, 8388608]");
96 }
97 }
98 {
99 ValidationErrors::ScopedField field(errors, ".maxRingSize");
100 if (!errors->FieldHasErrors() &&
101 (max_ring_size == 0 || max_ring_size > 8388608)) {
102 errors->AddError("must be in the range [1, 8388608]");
103 }
104 }
105 if (min_ring_size > max_ring_size) {
106 errors->AddError("max_ring_size cannot be smaller than min_ring_size");
107 }
108 }
109
110 namespace {
111
112 constexpr absl::string_view kRingHash = "ring_hash_experimental";
113
114 class RingHashLbConfig : public LoadBalancingPolicy::Config {
115 public:
RingHashLbConfig(size_t min_ring_size,size_t max_ring_size)116 RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
117 : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
name() const118 absl::string_view name() const override { return kRingHash; }
min_ring_size() const119 size_t min_ring_size() const { return min_ring_size_; }
max_ring_size() const120 size_t max_ring_size() const { return max_ring_size_; }
121
122 private:
123 size_t min_ring_size_;
124 size_t max_ring_size_;
125 };
126
127 //
128 // ring_hash LB policy
129 //
130
// Default for GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP: the maximum number of
// ring entries, applied on top of the config's min/max ring sizes.
constexpr size_t kRingSizeCapDefault{4096};
132
// The ring_hash LB policy.
// Every address in the current update is placed on a hash ring
// (RingHashSubchannelList::Ring). Each call is routed by looking up the
// hash carried in its RequestHashAttribute on that ring (Picker::Pick()).
class RingHash : public LoadBalancingPolicy {
 public:
  explicit RingHash(Args args);

  absl::string_view name() const override { return kRingHash; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  // Forward declaration.
  class RingHashSubchannelList;

  // Data for a particular subchannel in a subchannel list.
  // This subclass adds the following functionality:
  // - Keeps a copy of the subchannel's ServerAddress. The ring uses it for
  //   the address string and the weight attribute.
  // - Tracks the subchannel's logical connectivity state and status, so
  //   that we know how many subchannels are in each state.
  class RingHashSubchannelData
      : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
   public:
    RingHashSubchannelData(
        SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
            subchannel_list,
        const ServerAddress& address,
        RefCountedPtr<SubchannelInterface> subchannel)
        : SubchannelData(subchannel_list, address, std::move(subchannel)),
          address_(address) {}

    const ServerAddress& address() const { return address_; }

    // Logical state and status, used both for aggregation and by the
    // picker (see logical_connectivity_state_ below).
    grpc_connectivity_state logical_connectivity_state() const {
      return logical_connectivity_state_;
    }
    const absl::Status& logical_connectivity_status() const {
      return logical_connectivity_status_;
    }

   private:
    // Performs connectivity state updates that need to be done only
    // after we have started watching.
    void ProcessConnectivityChangeLocked(
        absl::optional<grpc_connectivity_state> old_state,
        grpc_connectivity_state new_state) override;

    ServerAddress address_;

    // Last logical connectivity state seen.
    // Note that this may differ from the state actually reported by the
    // subchannel in some cases; for example, once this is set to
    // TRANSIENT_FAILURE, we do not change it again until we get READY,
    // so we skip any interim stops in CONNECTING.
    grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
    // Status recorded together with logical_connectivity_state_.
    absl::Status logical_connectivity_status_;
  };

  // A list of subchannels and the ring containing those subchannels.
  class RingHashSubchannelList
      : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
   public:
    // The hash ring.
    // The constructor sorts entries by hash. Each entry refers to a
    // subchannel by its index in the owning subchannel list.
    class Ring : public RefCounted<Ring> {
     public:
      struct RingEntry {
        uint64_t hash;
        size_t subchannel_index;
      };

      Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
           const ChannelArgs& args);

      const std::vector<RingEntry>& ring() const { return ring_; }

     private:
      std::vector<RingEntry> ring_;
    };

    RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
                           const ChannelArgs& args);

    // Releases the "subchannel_list" ref on the policy that the
    // constructor took.
    ~RingHashSubchannelList() override {
      RingHash* p = static_cast<RingHash*>(policy());
      p->Unref(DEBUG_LOCATION, "subchannel_list");
    }

    // Returns a new ref to this list's ring. Pickers keep their own ref.
    RefCountedPtr<Ring> ring() { return ring_; }

    // Updates the counters of subchannels in each state when a
    // subchannel transitions from old_state to new_state.
    void UpdateStateCountersLocked(grpc_connectivity_state old_state,
                                   grpc_connectivity_state new_state);

    // Updates the RH policy's connectivity state based on the
    // subchannel list's state counters, creating a new picker.
    // The ring itself is built once, in the constructor.
    // The index parameter indicates the index into the list of the subchannel
    // whose status report triggered the call to
    // UpdateRingHashConnectivityStateLocked().
    // connection_attempt_complete is true if the subchannel just
    // finished a connection attempt.
    void UpdateRingHashConnectivityStateLocked(size_t index,
                                               bool connection_attempt_complete,
                                               absl::Status status);

   private:
    std::shared_ptr<WorkSerializer> work_serializer() const override {
      return static_cast<RingHash*>(policy())->work_serializer();
    }

    // Number of subchannels in each logical state.
    // num_idle_ is set to num_subchannels() by the constructor, because
    // every subchannel starts out in logical state IDLE.
    size_t num_idle_;
    size_t num_ready_ = 0;
    size_t num_connecting_ = 0;
    size_t num_transient_failure_ = 0;

    RefCountedPtr<Ring> ring_;

    // The index of the subchannel currently doing an internally
    // triggered connection attempt, if any.
    absl::optional<size_t> internally_triggered_connection_index_;

    // TODO(roth): If we ever change the helper UpdateState() API to not
    // need the status reported for TRANSIENT_FAILURE state (because
    // it's not currently actually used for anything outside of the picker),
    // then we will no longer need this data member.
    absl::Status last_failure_;
  };

  // Picker handed to the channel. At construction it snapshots the ring and
  // each subchannel's logical state and status.
  class Picker : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<RingHash> ring_hash_lb,
           RingHashSubchannelList* subchannel_list)
        : ring_hash_lb_(std::move(ring_hash_lb)),
          ring_(subchannel_list->ring()) {
      subchannels_.reserve(subchannel_list->num_subchannels());
      for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
        RingHashSubchannelData* subchannel_data =
            subchannel_list->subchannel(i);
        subchannels_.emplace_back(
            SubchannelInfo{subchannel_data->subchannel()->Ref(),
                           subchannel_data->logical_connectivity_state(),
                           subchannel_data->logical_connectivity_status()});
      }
    }

    PickResult Pick(PickArgs args) override;

   private:
    // A fire-and-forget class that schedules subchannel connection attempts
    // on the control plane WorkSerializer.
    // Subchannels are queued with AddSubchannel(). Orphan() hops into
    // ExecCtx and then into the policy's WorkSerializer. There it requests
    // a connection on each queued subchannel (skipped if the policy is
    // shutting down) and then deletes itself.
    class SubchannelConnectionAttempter : public Orphanable {
     public:
      explicit SubchannelConnectionAttempter(
          RefCountedPtr<RingHash> ring_hash_lb)
          : ring_hash_lb_(std::move(ring_hash_lb)) {
        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
      }

      void Orphan() override {
        // Hop into ExecCtx, so that we're not holding the data plane mutex
        // while we run control-plane code.
        ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
      }

      void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
        subchannels_.push_back(std::move(subchannel));
      }

     private:
      static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
        auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
        self->ring_hash_lb_->work_serializer()->Run(
            [self]() {
              // shutdown_ is set by ShutdownLocked(); once it is set,
              // skip the attempts but still free this object.
              if (!self->ring_hash_lb_->shutdown_) {
                for (auto& subchannel : self->subchannels_) {
                  subchannel->RequestConnection();
                }
              }
              delete self;
            },
            DEBUG_LOCATION);
      }

      RefCountedPtr<RingHash> ring_hash_lb_;
      grpc_closure closure_;
      std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
    };

    // Per-subchannel snapshot taken when the picker is created.
    struct SubchannelInfo {
      RefCountedPtr<SubchannelInterface> subchannel;
      grpc_connectivity_state state;
      absl::Status status;
    };

    RefCountedPtr<RingHash> ring_hash_lb_;
    RefCountedPtr<RingHashSubchannelList::Ring> ring_;
    // Indexed by RingEntry::subchannel_index.
    std::vector<SubchannelInfo> subchannels_;
  };

  ~RingHash() override;

  void ShutdownLocked() override;

  // Current config from resolver.
  RefCountedPtr<RingHashLbConfig> config_;

  // list of subchannels.
  // subchannel_list_ is the list whose state is reported to the channel.
  // latest_pending_subchannel_list_ is a newer list, swapped into
  // subchannel_list_ once all of its subchannels have reported their
  // initial connectivity state.
  RefCountedPtr<RingHashSubchannelList> subchannel_list_;
  RefCountedPtr<RingHashSubchannelList> latest_pending_subchannel_list_;
  // indicating if we are shutting down.
  bool shutdown_ = false;
};
341
342 //
343 // RingHash::Picker
344 //
345
// Picks a subchannel for a call.
//
// The request hash is taken from the call's RequestHashAttribute. It must
// parse (absl::SimpleAtoi) as a uint64_t. Otherwise the pick fails with an
// INTERNAL error; this includes the case where the attribute is absent,
// which leaves the string empty.
//
// A binary search finds the ring entry with the smallest hash >= the
// request hash, wrapping around to entry 0. That entry's subchannel is then
// handled by state:
// - READY: complete the pick with it.
// - IDLE: request a connection and queue the pick.
// - CONNECTING: queue the pick.
// - otherwise (TRANSIENT_FAILURE): request a reconnection and walk the rest
//   of the ring.
//
// During the walk:
// - The first READY subchannel completes the pick.
// - If the second distinct subchannel is IDLE or CONNECTING, the pick is
//   queued; an IDLE one also gets a connection request.
// - If no subchannel is READY, the pick fails with UNAVAILABLE and cites
//   the first subchannel's status.
//
// NOTE(review): ring[first_index] is read unconditionally, so this relies on
// the ring being non-empty.
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
  auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
  auto* hash_attribute = static_cast<RequestHashAttribute*>(
      call_state->GetCallAttribute(RequestHashAttribute::TypeName()));
  absl::string_view hash;
  if (hash_attribute != nullptr) {
    hash = hash_attribute->request_hash();
  }
  uint64_t h;
  if (!absl::SimpleAtoi(hash, &h)) {
    return PickResult::Fail(
        absl::InternalError("ring hash value is not a number"));
  }
  const auto& ring = ring_->ring();
  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
  // for lowp, highp, and first_index. Do not change them!
  // (highp can drop to -1 below, which ends the search via lowp > highp.)
  int64_t lowp = 0;
  int64_t highp = ring.size();
  int64_t first_index = 0;
  while (true) {
    first_index = (lowp + highp) / 2;
    // Past the last entry: wrap around to the start of the ring.
    if (first_index == static_cast<int64_t>(ring.size())) {
      first_index = 0;
      break;
    }
    uint64_t midval = ring[first_index].hash;
    uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
    // h falls in (previous entry's hash, this entry's hash]: found it.
    if (h <= midval && h > midval1) {
      break;
    }
    if (midval < h) {
      lowp = first_index + 1;
    } else {
      highp = first_index - 1;
    }
    if (lowp > highp) {
      first_index = 0;
      break;
    }
  }
  // All connection attempts requested by this pick are batched into one
  // attempter, created lazily on first use. When it goes out of scope at
  // return, its Orphan() is invoked (via OrphanablePtr). Orphan() moves the
  // work off the data plane and onto the WorkSerializer.
  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
  auto ScheduleSubchannelConnectionAttempt =
      [&](RefCountedPtr<SubchannelInterface> subchannel) {
        if (subchannel_connection_attempter == nullptr) {
          subchannel_connection_attempter =
              MakeOrphanable<SubchannelConnectionAttempter>(ring_hash_lb_->Ref(
                  DEBUG_LOCATION, "SubchannelConnectionAttempter"));
        }
        subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
      };
  SubchannelInfo& first_subchannel =
      subchannels_[ring[first_index].subchannel_index];
  switch (first_subchannel.state) {
    case GRPC_CHANNEL_READY:
      return PickResult::Complete(first_subchannel.subchannel);
    case GRPC_CHANNEL_IDLE:
      ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
      ABSL_FALLTHROUGH_INTENDED;
    case GRPC_CHANNEL_CONNECTING:
      return PickResult::Queue();
    default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
      break;
  }
  // The first subchannel has failed: ask it to reconnect, then fall back to
  // the other subchannels on the ring.
  ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
  // Loop through remaining subchannels to find one in READY.
  // On the way, we make sure the right set of connection attempts
  // will happen.
  bool found_second_subchannel = false;
  bool found_first_non_failed = false;
  for (size_t i = 1; i < ring.size(); ++i) {
    const auto& entry = ring[(first_index + i) % ring.size()];
    // A subchannel can own many ring entries; skip the first one's entries.
    if (entry.subchannel_index == ring[first_index].subchannel_index) {
      continue;
    }
    SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index];
    if (subchannel_info.state == GRPC_CHANNEL_READY) {
      return PickResult::Complete(subchannel_info.subchannel);
    }
    // Only the second distinct subchannel decides whether to queue: if it is
    // IDLE (after kicking it) or CONNECTING, wait for it.
    if (!found_second_subchannel) {
      switch (subchannel_info.state) {
        case GRPC_CHANNEL_IDLE:
          ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
          ABSL_FALLTHROUGH_INTENDED;
        case GRPC_CHANNEL_CONNECTING:
          return PickResult::Queue();
        default:
          break;
      }
      found_second_subchannel = true;
    }
    // Until the first non-TRANSIENT_FAILURE subchannel is reached, request
    // a reconnection on every failed one seen. When it is reached, also kick
    // it if it is IDLE, then stop scheduling attempts.
    if (!found_first_non_failed) {
      if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
      } else {
        if (subchannel_info.state == GRPC_CHANNEL_IDLE) {
          ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
        }
        found_first_non_failed = true;
      }
    }
  }
  return PickResult::Fail(absl::UnavailableError(absl::StrCat(
      "ring hash cannot find a connected subchannel; first failure: ",
      first_subchannel.status.ToString())));
}
452
453 //
454 // RingHash::RingHashSubchannelList::Ring
455 //
456
Ring(RingHashLbConfig * config,RingHashSubchannelList * subchannel_list,const ChannelArgs & args)457 RingHash::RingHashSubchannelList::Ring::Ring(
458 RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
459 const ChannelArgs& args) {
460 // Store the weights while finding the sum.
461 struct AddressWeight {
462 std::string address;
463 // Default weight is 1 for the cases where a weight is not provided,
464 // each occurrence of the address will be counted a weight value of 1.
465 uint32_t weight = 1;
466 double normalized_weight;
467 };
468 std::vector<AddressWeight> address_weights;
469 size_t sum = 0;
470 address_weights.reserve(subchannel_list->num_subchannels());
471 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
472 RingHashSubchannelData* sd = subchannel_list->subchannel(i);
473 const ServerAddressWeightAttribute* weight_attribute = static_cast<
474 const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
475 ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
476 AddressWeight address_weight;
477 address_weight.address =
478 grpc_sockaddr_to_string(&sd->address().address(), false).value();
479 // Weight should never be zero, but ignore it just in case, since
480 // that value would screw up the ring-building algorithm.
481 if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
482 address_weight.weight = weight_attribute->weight();
483 }
484 sum += address_weight.weight;
485 address_weights.push_back(std::move(address_weight));
486 }
487 // Calculating normalized weights and find min and max.
488 double min_normalized_weight = 1.0;
489 double max_normalized_weight = 0.0;
490 for (auto& address : address_weights) {
491 address.normalized_weight = static_cast<double>(address.weight) / sum;
492 min_normalized_weight =
493 std::min(address.normalized_weight, min_normalized_weight);
494 max_normalized_weight =
495 std::max(address.normalized_weight, max_normalized_weight);
496 }
497 // Scale up the number of hashes per host such that the least-weighted host
498 // gets a whole number of hashes on the ring. Other hosts might not end up
499 // with whole numbers, and that's fine (the ring-building algorithm below can
500 // handle this). This preserves the original implementation's behavior: when
501 // weights aren't provided, all hosts should get an equal number of hashes. In
502 // the case where this number exceeds the max_ring_size, it's scaled back down
503 // to fit.
504 const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP)
505 .value_or(kRingSizeCapDefault);
506 const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap);
507 const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap);
508 const double scale = std::min(
509 std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
510 static_cast<double>(max_ring_size));
511 // Reserve memory for the entire ring up front.
512 const uint64_t ring_size = std::ceil(scale);
513 ring_.reserve(ring_size);
514 // Populate the hash ring by walking through the (host, weight) pairs in
515 // normalized_host_weights, and generating (scale * weight) hashes for each
516 // host. Since these aren't necessarily whole numbers, we maintain running
517 // sums -- current_hashes and target_hashes -- which allows us to populate the
518 // ring in a mostly stable way.
519 absl::InlinedVector<char, 196> hash_key_buffer;
520 double current_hashes = 0.0;
521 double target_hashes = 0.0;
522 uint64_t min_hashes_per_host = ring_size;
523 uint64_t max_hashes_per_host = 0;
524 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
525 const std::string& address_string = address_weights[i].address;
526 hash_key_buffer.assign(address_string.begin(), address_string.end());
527 hash_key_buffer.emplace_back('_');
528 auto offset_start = hash_key_buffer.end();
529 target_hashes += scale * address_weights[i].normalized_weight;
530 size_t count = 0;
531 while (current_hashes < target_hashes) {
532 const std::string count_str = absl::StrCat(count);
533 hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
534 absl::string_view hash_key(hash_key_buffer.data(),
535 hash_key_buffer.size());
536 const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
537 ring_.push_back({hash, i});
538 ++count;
539 ++current_hashes;
540 hash_key_buffer.erase(offset_start, hash_key_buffer.end());
541 }
542 min_hashes_per_host =
543 std::min(static_cast<uint64_t>(i), min_hashes_per_host);
544 max_hashes_per_host =
545 std::max(static_cast<uint64_t>(i), max_hashes_per_host);
546 }
547 std::sort(ring_.begin(), ring_.end(),
548 [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
549 return lhs.hash < rhs.hash;
550 });
551 }
552
553 //
554 // RingHash::RingHashSubchannelList
555 //
556
// Builds a subchannel list for `addresses` and the hash ring over it.
// The SubchannelList base receives the addresses and the policy's channel
// control helper (presumably to create one subchannel per address — confirm
// in subchannel_list.h). Every subchannel starts out counted as IDLE.
RingHash::RingHashSubchannelList::RingHashSubchannelList(
    RingHash* policy, ServerAddressList addresses, const ChannelArgs& args)
    : SubchannelList(policy,
                     (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
                          ? "RingHashSubchannelList"
                          : nullptr),
                     std::move(addresses), policy->channel_control_helper(),
                     args),
      num_idle_(num_subchannels()) {
  // Need to maintain a ref to the LB policy as long as we maintain
  // any references to subchannels, since the subchannels'
  // pollset_sets will include the LB policy's pollset_set.
  // This ref is released in ~RingHashSubchannelList().
  policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
  // Construct the ring.
  // This reads the policy's current config_. UpdateLocked() assigns config_
  // before it creates a new list.
  ring_ = MakeRefCounted<Ring>(policy->config_.get(), this, args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
    gpr_log(GPR_INFO,
            "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries",
            policy, this, ring_->ring().size());
  }
}
578
UpdateStateCountersLocked(grpc_connectivity_state old_state,grpc_connectivity_state new_state)579 void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
580 grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
581 if (old_state == GRPC_CHANNEL_IDLE) {
582 GPR_ASSERT(num_idle_ > 0);
583 --num_idle_;
584 } else if (old_state == GRPC_CHANNEL_READY) {
585 GPR_ASSERT(num_ready_ > 0);
586 --num_ready_;
587 } else if (old_state == GRPC_CHANNEL_CONNECTING) {
588 GPR_ASSERT(num_connecting_ > 0);
589 --num_connecting_;
590 } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
591 GPR_ASSERT(num_transient_failure_ > 0);
592 --num_transient_failure_;
593 }
594 GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
595 if (new_state == GRPC_CHANNEL_IDLE) {
596 ++num_idle_;
597 } else if (new_state == GRPC_CHANNEL_READY) {
598 ++num_ready_;
599 } else if (new_state == GRPC_CHANNEL_CONNECTING) {
600 ++num_connecting_;
601 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
602 ++num_transient_failure_;
603 }
604 }
605
// Recomputes the aggregate connectivity state from this list's per-state
// counters. If this list is the policy's current list, it reports that state
// to the channel together with a new Picker.
//
// Parameters:
// - `index`: the subchannel whose report triggered the call. UpdateLocked()
//   passes 0.
// - `connection_attempt_complete`: true when that subchannel has just
//   finished a connection attempt.
// - `status`: that subchannel's logical status. UpdateLocked() passes OK.
//
// Before computing anything, a pending list (latest_pending_subchannel_list_)
// whose subchannels have all seen their initial state is promoted to
// subchannel_list_.
void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
    size_t index, bool connection_attempt_complete, absl::Status status) {
  RingHash* p = static_cast<RingHash*>(policy());
  // If this is latest_pending_subchannel_list_, then swap it into
  // subchannel_list_ as soon as we get the initial connectivity state
  // report for every subchannel in the list.
  if (p->latest_pending_subchannel_list_.get() == this &&
      AllSubchannelsSeenInitialState()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p,
              p->subchannel_list_.get(), this);
    }
    p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
  }
  // Only set connectivity state if this is the current subchannel list.
  if (p->subchannel_list_.get() != this) return;
  // The overall aggregation rules here are:
  // 1. If there is at least one subchannel in READY state, report READY.
  // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
  // TRANSIENT_FAILURE.
  // 3. If there is at least one subchannel in CONNECTING state, report
  // CONNECTING.
  // 4. If there is one subchannel in TRANSIENT_FAILURE state and there is
  // more than one subchannel, report CONNECTING.
  // 5. If there is at least one subchannel in IDLE state, report IDLE.
  // 6. Otherwise, report TRANSIENT_FAILURE.
  //
  // We set start_connection_attempt to true if we match rules 2, 4, or 6.
  grpc_connectivity_state state;
  bool start_connection_attempt = false;
  if (num_ready_ > 0) {
    state = GRPC_CHANNEL_READY;
  } else if (num_transient_failure_ >= 2) {
    state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    start_connection_attempt = true;
  } else if (num_connecting_ > 0) {
    state = GRPC_CHANNEL_CONNECTING;
  } else if (num_transient_failure_ == 1 && num_subchannels() > 1) {
    state = GRPC_CHANNEL_CONNECTING;
    start_connection_attempt = true;
  } else if (num_idle_ > 0) {
    state = GRPC_CHANNEL_IDLE;
  } else {
    state = GRPC_CHANNEL_TRANSIENT_FAILURE;
    start_connection_attempt = true;
  }
  // In TRANSIENT_FAILURE, report the last reported failure.
  // Otherwise, report OK.
  // (last_failure_ is only refreshed when the incoming status is non-OK.)
  if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    if (!status.ok()) {
      last_failure_ = absl::UnavailableError(absl::StrCat(
          "no reachable subchannels; last error: ", status.ToString()));
    }
    status = last_failure_;
  } else {
    status = absl::OkStatus();
  }
  // Generate new picker and return it to the channel.
  // Note that we use our own picker regardless of connectivity state.
  p->channel_control_helper()->UpdateState(
      state, status,
      MakeRefCounted<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this));
  // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
  // not be getting any pick requests from the priority policy.
  // However, because the ring_hash policy does not attempt to
  // reconnect to subchannels unless it is getting pick requests,
  // it will need special handling to ensure that it will eventually
  // recover from TRANSIENT_FAILURE state once the problem is resolved.
  // Specifically, it will make sure that it is attempting to connect to
  // at least one subchannel at any given time. After a given subchannel
  // fails a connection attempt, it will move on to the next subchannel
  // in the ring. It will keep doing this until one of the subchannels
  // successfully connects, at which point it will report READY and stop
  // proactively trying to connect. The policy will remain in
  // TRANSIENT_FAILURE until at least one subchannel becomes connected,
  // even if subchannels are in state CONNECTING during that time.
  //
  // Note that we do the same thing when the policy is in state
  // CONNECTING, just to ensure that we don't remain in CONNECTING state
  // indefinitely if there are no new picks coming in.
  //
  // Clear the in-flight marker once the subchannel we kicked has finished
  // its attempt, so that a new internal attempt may be started below.
  if (internally_triggered_connection_index_.has_value() &&
      *internally_triggered_connection_index_ == index &&
      connection_attempt_complete) {
    internally_triggered_connection_index_.reset();
  }
  // At most one internally triggered attempt is in flight at a time; the
  // next one goes to the subchannel after `index` in list order (wrapping).
  if (start_connection_attempt &&
      !internally_triggered_connection_index_.has_value()) {
    size_t next_index = (index + 1) % num_subchannels();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      gpr_log(GPR_INFO,
              "[RH %p] triggering internal connection attempt for subchannel "
              "%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")",
              p, subchannel(next_index)->subchannel(), this, next_index,
              num_subchannels());
    }
    internally_triggered_connection_index_ = next_index;
    subchannel(next_index)->subchannel()->RequestConnection();
  }
}
705
706 //
707 // RingHash::RingHashSubchannelData
708 //
709
// Handles a connectivity state change reported by this subchannel.
// `old_state` is empty for the initial notification.
// The method:
// - requests re-resolution on non-initial TRANSIENT_FAILURE/IDLE reports;
// - updates the logical state and the list's counters (sticky
//   TRANSIENT_FAILURE; see below);
// - asks the list to recompute and report the aggregate state.
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
    absl::optional<grpc_connectivity_state> old_state,
    grpc_connectivity_state new_state) {
  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
  // Note: "prev_state" in this trace is the logical state, which can differ
  // from old_state.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
    gpr_log(
        GPR_INFO,
        "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
        "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
        p, subchannel(), subchannel_list(), Index(),
        subchannel_list()->num_subchannels(),
        ConnectivityStateName(logical_connectivity_state_),
        ConnectivityStateName(new_state));
  }
  GPR_ASSERT(subchannel() != nullptr);
  // If this is not the initial state notification and the new state is
  // TRANSIENT_FAILURE or IDLE, re-resolve.
  // Note that we don't want to do this on the initial state notification,
  // because that would result in an endless loop of re-resolution.
  if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
                                new_state == GRPC_CHANNEL_IDLE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      gpr_log(GPR_INFO,
              "[RH %p] Subchannel %p reported %s; requesting re-resolution", p,
              subchannel(), ConnectivityStateName(new_state));
    }
    p->channel_control_helper()->RequestReresolution();
  }
  // Any state other than CONNECTING ends the current connection attempt.
  const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
  // Decide what state to report for the purposes of aggregation and
  // picker behavior.
  // If the last recorded state was TRANSIENT_FAILURE, ignore the change
  // unless the new state is READY (or TF again, in which case we need
  // to update the status).
  if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
      new_state == GRPC_CHANNEL_READY ||
      new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    // Update state counters used for aggregation.
    subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
                                                 new_state);
    // Update logical state.
    logical_connectivity_state_ = new_state;
    logical_connectivity_status_ = connectivity_status();
  }
  // Update the RH policy's connectivity state, creating a new picker.
  // (The ring is not rebuilt here.)
  subchannel_list()->UpdateRingHashConnectivityStateLocked(
      Index(), connection_attempt_complete, logical_connectivity_status_);
}
759
760 //
761 // RingHash
762 //
763
RingHash(Args args)764 RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
765 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
766 gpr_log(GPR_INFO, "[RH %p] Created", this);
767 }
768 }
769
~RingHash()770 RingHash::~RingHash() {
771 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
772 gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
773 }
774 GPR_ASSERT(subchannel_list_ == nullptr);
775 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
776 }
777
ShutdownLocked()778 void RingHash::ShutdownLocked() {
779 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
780 gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
781 }
782 shutdown_ = true;
783 subchannel_list_.reset();
784 latest_pending_subchannel_list_.reset();
785 }
786
ResetBackoffLocked()787 void RingHash::ResetBackoffLocked() {
788 subchannel_list_->ResetBackoffLocked();
789 if (latest_pending_subchannel_list_ != nullptr) {
790 latest_pending_subchannel_list_->ResetBackoffLocked();
791 }
792 }
793
// Applies a resolver update.
//
// The new config is always stored, even if the update is rejected below.
// - Address error with an existing list: the existing list is kept and the
//   address error is returned.
// - Otherwise: a new subchannel list (and ring) is built from the
//   addresses, using an empty address list on error, and starts watching
//   its subchannels. It becomes latest_pending_subchannel_list_.
// - The new list is promoted immediately if there is no current list or if
//   the new list is empty.
// - An empty list reports TRANSIENT_FAILURE and returns that status.
// - Otherwise the promoted list reports its aggregate state, and OK is
//   returned.
absl::Status RingHash::UpdateLocked(UpdateArgs args) {
  config_ = std::move(args.config);
  ServerAddressList addresses;
  if (args.addresses.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
              this, args.addresses->size());
    }
    addresses = *std::move(args.addresses);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
              this, args.addresses.status().ToString().c_str());
    }
    // If we already have a subchannel list, then keep using the existing
    // list, but still report back that the update was not accepted.
    if (subchannel_list_ != nullptr) return args.addresses.status();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
      latest_pending_subchannel_list_ != nullptr) {
    gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
            this, latest_pending_subchannel_list_.get());
  }
  // The list constructor builds the ring from config_, assigned above.
  latest_pending_subchannel_list_ = MakeRefCounted<RingHashSubchannelList>(
      this, std::move(addresses), args.args);
  latest_pending_subchannel_list_->StartWatchingLocked();
  // If we have no existing list or the new list is empty, immediately
  // promote the new list.
  // Otherwise, do nothing; the new list will be promoted when the
  // initial subchannel states are reported.
  if (subchannel_list_ == nullptr ||
      latest_pending_subchannel_list_->num_subchannels() == 0) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
        subchannel_list_ != nullptr) {
      gpr_log(GPR_INFO,
              "[RH %p] empty address list, replacing subchannel list %p", this,
              subchannel_list_.get());
    }
    subchannel_list_ = std::move(latest_pending_subchannel_list_);
    // If the new list is empty, report TRANSIENT_FAILURE.
    // (args.addresses is still ok() here even though its value was moved
    // out above, so the ok() check selects the right status.)
    if (subchannel_list_->num_subchannels() == 0) {
      absl::Status status =
          args.addresses.ok()
              ? absl::UnavailableError(
                    absl::StrCat("empty address list: ", args.resolution_note))
              : args.addresses.status();
      channel_control_helper()->UpdateState(
          GRPC_CHANNEL_TRANSIENT_FAILURE, status,
          MakeRefCounted<TransientFailurePicker>(status));
      return status;
    }
    // Otherwise, report IDLE.
    // NOTE(review): the state actually reported is computed from the list's
    // counters, which start with every subchannel counted as IDLE.
    subchannel_list_->UpdateRingHashConnectivityStateLocked(
        /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
  }
  return absl::OkStatus();
}
851
852 //
853 // factory
854 //
855
856 class RingHashFactory : public LoadBalancingPolicyFactory {
857 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const858 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
859 LoadBalancingPolicy::Args args) const override {
860 return MakeOrphanable<RingHash>(std::move(args));
861 }
862
name() const863 absl::string_view name() const override { return kRingHash; }
864
865 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const866 ParseLoadBalancingConfig(const Json& json) const override {
867 auto config = LoadFromJson<RingHashConfig>(
868 json, JsonArgs(), "errors validating ring_hash LB policy config");
869 if (!config.ok()) return config.status();
870 return MakeRefCounted<RingHashLbConfig>(config->min_ring_size,
871 config->max_ring_size);
872 }
873 };
874
875 } // namespace
876
RegisterRingHashLbPolicy(CoreConfiguration::Builder * builder)877 void RegisterRingHashLbPolicy(CoreConfiguration::Builder* builder) {
878 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
879 std::make_unique<RingHashFactory>());
880 }
881
882 } // namespace grpc_core
883