1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
20 
21 #include <inttypes.h>
22 #include <stdlib.h>
23 
24 #include <algorithm>
25 #include <cmath>
26 #include <memory>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/base/attributes.h"
32 #include "absl/container/inlined_vector.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/numbers.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 
40 #define XXH_INLINE_ALL
41 #include "xxhash.h"
42 
43 #include <grpc/grpc.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/support/log.h>
46 
47 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
49 #include "src/core/lib/address_utils/sockaddr_utils.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/config/core_configuration.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/orphanable.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/unique_type_name.h"
58 #include "src/core/lib/gprpp/work_serializer.h"
59 #include "src/core/lib/iomgr/closure.h"
60 #include "src/core/lib/iomgr/error.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/json/json.h"
63 #include "src/core/lib/load_balancing/lb_policy.h"
64 #include "src/core/lib/load_balancing/lb_policy_factory.h"
65 #include "src/core/lib/load_balancing/subchannel_interface.h"
66 #include "src/core/lib/resolver/server_address.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 
69 namespace grpc_core {
70 
// Trace flag named "ring_hash_lb".  The gpr_log() calls in this file are
// guarded by GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace).
// NOTE(review): the first constructor argument (false) is presumably the
// default-enabled setting -- confirm against TraceFlag.
TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
72 
TypeName()73 UniqueTypeName RequestHashAttribute::TypeName() {
74   static UniqueTypeName::Factory kFactory("request_hash");
75   return kFactory.Create();
76 }
77 
78 // Helper Parser method
79 
JsonLoader(const JsonArgs &)80 const JsonLoaderInterface* RingHashConfig::JsonLoader(const JsonArgs&) {
81   static const auto* loader =
82       JsonObjectLoader<RingHashConfig>()
83           .OptionalField("minRingSize", &RingHashConfig::min_ring_size)
84           .OptionalField("maxRingSize", &RingHashConfig::max_ring_size)
85           .Finish();
86   return loader;
87 }
88 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)89 void RingHashConfig::JsonPostLoad(const Json&, const JsonArgs&,
90                                   ValidationErrors* errors) {
91   {
92     ValidationErrors::ScopedField field(errors, ".minRingSize");
93     if (!errors->FieldHasErrors() &&
94         (min_ring_size == 0 || min_ring_size > 8388608)) {
95       errors->AddError("must be in the range [1, 8388608]");
96     }
97   }
98   {
99     ValidationErrors::ScopedField field(errors, ".maxRingSize");
100     if (!errors->FieldHasErrors() &&
101         (max_ring_size == 0 || max_ring_size > 8388608)) {
102       errors->AddError("must be in the range [1, 8388608]");
103     }
104   }
105   if (min_ring_size > max_ring_size) {
106     errors->AddError("max_ring_size cannot be smaller than min_ring_size");
107   }
108 }
109 
110 namespace {
111 
// Name of this LB policy.  Returned by name() on both RingHashLbConfig and
// RingHash.
constexpr absl::string_view kRingHash = "ring_hash_experimental";
113 
114 class RingHashLbConfig : public LoadBalancingPolicy::Config {
115  public:
RingHashLbConfig(size_t min_ring_size,size_t max_ring_size)116   RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
117       : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
name() const118   absl::string_view name() const override { return kRingHash; }
min_ring_size() const119   size_t min_ring_size() const { return min_ring_size_; }
max_ring_size() const120   size_t max_ring_size() const { return max_ring_size_; }
121 
122  private:
123   size_t min_ring_size_;
124   size_t max_ring_size_;
125 };
126 
127 //
128 // ring_hash LB policy
129 //
130 
// Ring size cap used by the Ring constructor when the
// GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP channel arg is not set.
constexpr size_t kRingSizeCapDefault = 4096;
132 
133 class RingHash : public LoadBalancingPolicy {
134  public:
135   explicit RingHash(Args args);
136 
name() const137   absl::string_view name() const override { return kRingHash; }
138 
139   absl::Status UpdateLocked(UpdateArgs args) override;
140   void ResetBackoffLocked() override;
141 
142  private:
143   // Forward declaration.
144   class RingHashSubchannelList;
145 
146   // Data for a particular subchannel in a subchannel list.
147   // This subclass adds the following functionality:
148   // - Tracks the previous connectivity state of the subchannel, so that
149   //   we know how many subchannels are in each state.
150   class RingHashSubchannelData
151       : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
152    public:
RingHashSubchannelData(SubchannelList<RingHashSubchannelList,RingHashSubchannelData> * subchannel_list,const ServerAddress & address,RefCountedPtr<SubchannelInterface> subchannel)153     RingHashSubchannelData(
154         SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
155             subchannel_list,
156         const ServerAddress& address,
157         RefCountedPtr<SubchannelInterface> subchannel)
158         : SubchannelData(subchannel_list, address, std::move(subchannel)),
159           address_(address) {}
160 
address() const161     const ServerAddress& address() const { return address_; }
162 
logical_connectivity_state() const163     grpc_connectivity_state logical_connectivity_state() const {
164       return logical_connectivity_state_;
165     }
logical_connectivity_status() const166     const absl::Status& logical_connectivity_status() const {
167       return logical_connectivity_status_;
168     }
169 
170    private:
171     // Performs connectivity state updates that need to be done only
172     // after we have started watching.
173     void ProcessConnectivityChangeLocked(
174         absl::optional<grpc_connectivity_state> old_state,
175         grpc_connectivity_state new_state) override;
176 
177     ServerAddress address_;
178 
179     // Last logical connectivity state seen.
180     // Note that this may differ from the state actually reported by the
181     // subchannel in some cases; for example, once this is set to
182     // TRANSIENT_FAILURE, we do not change it again until we get READY,
183     // so we skip any interim stops in CONNECTING.
184     grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE;
185     absl::Status logical_connectivity_status_;
186   };
187 
188   // A list of subchannels and the ring containing those subchannels.
189   class RingHashSubchannelList
190       : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
191    public:
192     class Ring : public RefCounted<Ring> {
193      public:
194       struct RingEntry {
195         uint64_t hash;
196         size_t subchannel_index;
197       };
198 
199       Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
200            const ChannelArgs& args);
201 
ring() const202       const std::vector<RingEntry>& ring() const { return ring_; }
203 
204      private:
205       std::vector<RingEntry> ring_;
206     };
207 
208     RingHashSubchannelList(RingHash* policy, ServerAddressList addresses,
209                            const ChannelArgs& args);
210 
~RingHashSubchannelList()211     ~RingHashSubchannelList() override {
212       RingHash* p = static_cast<RingHash*>(policy());
213       p->Unref(DEBUG_LOCATION, "subchannel_list");
214     }
215 
ring()216     RefCountedPtr<Ring> ring() { return ring_; }
217 
218     // Updates the counters of subchannels in each state when a
219     // subchannel transitions from old_state to new_state.
220     void UpdateStateCountersLocked(grpc_connectivity_state old_state,
221                                    grpc_connectivity_state new_state);
222 
223     // Updates the RH policy's connectivity state based on the
224     // subchannel list's state counters, creating new picker and new ring.
225     // The index parameter indicates the index into the list of the subchannel
226     // whose status report triggered the call to
227     // UpdateRingHashConnectivityStateLocked().
228     // connection_attempt_complete is true if the subchannel just
229     // finished a connection attempt.
230     void UpdateRingHashConnectivityStateLocked(size_t index,
231                                                bool connection_attempt_complete,
232                                                absl::Status status);
233 
234    private:
work_serializer() const235     std::shared_ptr<WorkSerializer> work_serializer() const override {
236       return static_cast<RingHash*>(policy())->work_serializer();
237     }
238 
239     size_t num_idle_;
240     size_t num_ready_ = 0;
241     size_t num_connecting_ = 0;
242     size_t num_transient_failure_ = 0;
243 
244     RefCountedPtr<Ring> ring_;
245 
246     // The index of the subchannel currently doing an internally
247     // triggered connection attempt, if any.
248     absl::optional<size_t> internally_triggered_connection_index_;
249 
250     // TODO(roth): If we ever change the helper UpdateState() API to not
251     // need the status reported for TRANSIENT_FAILURE state (because
252     // it's not currently actually used for anything outside of the picker),
253     // then we will no longer need this data member.
254     absl::Status last_failure_;
255   };
256 
257   class Picker : public SubchannelPicker {
258    public:
Picker(RefCountedPtr<RingHash> ring_hash_lb,RingHashSubchannelList * subchannel_list)259     Picker(RefCountedPtr<RingHash> ring_hash_lb,
260            RingHashSubchannelList* subchannel_list)
261         : ring_hash_lb_(std::move(ring_hash_lb)),
262           ring_(subchannel_list->ring()) {
263       subchannels_.reserve(subchannel_list->num_subchannels());
264       for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
265         RingHashSubchannelData* subchannel_data =
266             subchannel_list->subchannel(i);
267         subchannels_.emplace_back(
268             SubchannelInfo{subchannel_data->subchannel()->Ref(),
269                            subchannel_data->logical_connectivity_state(),
270                            subchannel_data->logical_connectivity_status()});
271       }
272     }
273 
274     PickResult Pick(PickArgs args) override;
275 
276    private:
277     // A fire-and-forget class that schedules subchannel connection attempts
278     // on the control plane WorkSerializer.
279     class SubchannelConnectionAttempter : public Orphanable {
280      public:
SubchannelConnectionAttempter(RefCountedPtr<RingHash> ring_hash_lb)281       explicit SubchannelConnectionAttempter(
282           RefCountedPtr<RingHash> ring_hash_lb)
283           : ring_hash_lb_(std::move(ring_hash_lb)) {
284         GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
285       }
286 
Orphan()287       void Orphan() override {
288         // Hop into ExecCtx, so that we're not holding the data plane mutex
289         // while we run control-plane code.
290         ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
291       }
292 
AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel)293       void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
294         subchannels_.push_back(std::move(subchannel));
295       }
296 
297      private:
RunInExecCtx(void * arg,grpc_error_handle)298       static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
299         auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
300         self->ring_hash_lb_->work_serializer()->Run(
301             [self]() {
302               if (!self->ring_hash_lb_->shutdown_) {
303                 for (auto& subchannel : self->subchannels_) {
304                   subchannel->RequestConnection();
305                 }
306               }
307               delete self;
308             },
309             DEBUG_LOCATION);
310       }
311 
312       RefCountedPtr<RingHash> ring_hash_lb_;
313       grpc_closure closure_;
314       std::vector<RefCountedPtr<SubchannelInterface>> subchannels_;
315     };
316 
317     struct SubchannelInfo {
318       RefCountedPtr<SubchannelInterface> subchannel;
319       grpc_connectivity_state state;
320       absl::Status status;
321     };
322 
323     RefCountedPtr<RingHash> ring_hash_lb_;
324     RefCountedPtr<RingHashSubchannelList::Ring> ring_;
325     std::vector<SubchannelInfo> subchannels_;
326   };
327 
328   ~RingHash() override;
329 
330   void ShutdownLocked() override;
331 
332   // Current config from resolver.
333   RefCountedPtr<RingHashLbConfig> config_;
334 
335   // list of subchannels.
336   RefCountedPtr<RingHashSubchannelList> subchannel_list_;
337   RefCountedPtr<RingHashSubchannelList> latest_pending_subchannel_list_;
338   // indicating if we are shutting down.
339   bool shutdown_ = false;
340 };
341 
342 //
343 // RingHash::Picker
344 //
345 
Pick(PickArgs args)346 RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
347   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
348   auto* hash_attribute = static_cast<RequestHashAttribute*>(
349       call_state->GetCallAttribute(RequestHashAttribute::TypeName()));
350   absl::string_view hash;
351   if (hash_attribute != nullptr) {
352     hash = hash_attribute->request_hash();
353   }
354   uint64_t h;
355   if (!absl::SimpleAtoi(hash, &h)) {
356     return PickResult::Fail(
357         absl::InternalError("ring hash value is not a number"));
358   }
359   const auto& ring = ring_->ring();
360   // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
361   // (ketama_get_server) NOTE: The algorithm depends on using signed integers
362   // for lowp, highp, and first_index. Do not change them!
363   int64_t lowp = 0;
364   int64_t highp = ring.size();
365   int64_t first_index = 0;
366   while (true) {
367     first_index = (lowp + highp) / 2;
368     if (first_index == static_cast<int64_t>(ring.size())) {
369       first_index = 0;
370       break;
371     }
372     uint64_t midval = ring[first_index].hash;
373     uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
374     if (h <= midval && h > midval1) {
375       break;
376     }
377     if (midval < h) {
378       lowp = first_index + 1;
379     } else {
380       highp = first_index - 1;
381     }
382     if (lowp > highp) {
383       first_index = 0;
384       break;
385     }
386   }
387   OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
388   auto ScheduleSubchannelConnectionAttempt =
389       [&](RefCountedPtr<SubchannelInterface> subchannel) {
390         if (subchannel_connection_attempter == nullptr) {
391           subchannel_connection_attempter =
392               MakeOrphanable<SubchannelConnectionAttempter>(ring_hash_lb_->Ref(
393                   DEBUG_LOCATION, "SubchannelConnectionAttempter"));
394         }
395         subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
396       };
397   SubchannelInfo& first_subchannel =
398       subchannels_[ring[first_index].subchannel_index];
399   switch (first_subchannel.state) {
400     case GRPC_CHANNEL_READY:
401       return PickResult::Complete(first_subchannel.subchannel);
402     case GRPC_CHANNEL_IDLE:
403       ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
404       ABSL_FALLTHROUGH_INTENDED;
405     case GRPC_CHANNEL_CONNECTING:
406       return PickResult::Queue();
407     default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
408       break;
409   }
410   ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel);
411   // Loop through remaining subchannels to find one in READY.
412   // On the way, we make sure the right set of connection attempts
413   // will happen.
414   bool found_second_subchannel = false;
415   bool found_first_non_failed = false;
416   for (size_t i = 1; i < ring.size(); ++i) {
417     const auto& entry = ring[(first_index + i) % ring.size()];
418     if (entry.subchannel_index == ring[first_index].subchannel_index) {
419       continue;
420     }
421     SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index];
422     if (subchannel_info.state == GRPC_CHANNEL_READY) {
423       return PickResult::Complete(subchannel_info.subchannel);
424     }
425     if (!found_second_subchannel) {
426       switch (subchannel_info.state) {
427         case GRPC_CHANNEL_IDLE:
428           ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
429           ABSL_FALLTHROUGH_INTENDED;
430         case GRPC_CHANNEL_CONNECTING:
431           return PickResult::Queue();
432         default:
433           break;
434       }
435       found_second_subchannel = true;
436     }
437     if (!found_first_non_failed) {
438       if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
439         ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
440       } else {
441         if (subchannel_info.state == GRPC_CHANNEL_IDLE) {
442           ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel);
443         }
444         found_first_non_failed = true;
445       }
446     }
447   }
448   return PickResult::Fail(absl::UnavailableError(absl::StrCat(
449       "ring hash cannot find a connected subchannel; first failure: ",
450       first_subchannel.status.ToString())));
451 }
452 
453 //
454 // RingHash::RingHashSubchannelList::Ring
455 //
456 
Ring(RingHashLbConfig * config,RingHashSubchannelList * subchannel_list,const ChannelArgs & args)457 RingHash::RingHashSubchannelList::Ring::Ring(
458     RingHashLbConfig* config, RingHashSubchannelList* subchannel_list,
459     const ChannelArgs& args) {
460   // Store the weights while finding the sum.
461   struct AddressWeight {
462     std::string address;
463     // Default weight is 1 for the cases where a weight is not provided,
464     // each occurrence of the address will be counted a weight value of 1.
465     uint32_t weight = 1;
466     double normalized_weight;
467   };
468   std::vector<AddressWeight> address_weights;
469   size_t sum = 0;
470   address_weights.reserve(subchannel_list->num_subchannels());
471   for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
472     RingHashSubchannelData* sd = subchannel_list->subchannel(i);
473     const ServerAddressWeightAttribute* weight_attribute = static_cast<
474         const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
475         ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
476     AddressWeight address_weight;
477     address_weight.address =
478         grpc_sockaddr_to_string(&sd->address().address(), false).value();
479     // Weight should never be zero, but ignore it just in case, since
480     // that value would screw up the ring-building algorithm.
481     if (weight_attribute != nullptr && weight_attribute->weight() > 0) {
482       address_weight.weight = weight_attribute->weight();
483     }
484     sum += address_weight.weight;
485     address_weights.push_back(std::move(address_weight));
486   }
487   // Calculating normalized weights and find min and max.
488   double min_normalized_weight = 1.0;
489   double max_normalized_weight = 0.0;
490   for (auto& address : address_weights) {
491     address.normalized_weight = static_cast<double>(address.weight) / sum;
492     min_normalized_weight =
493         std::min(address.normalized_weight, min_normalized_weight);
494     max_normalized_weight =
495         std::max(address.normalized_weight, max_normalized_weight);
496   }
497   // Scale up the number of hashes per host such that the least-weighted host
498   // gets a whole number of hashes on the ring. Other hosts might not end up
499   // with whole numbers, and that's fine (the ring-building algorithm below can
500   // handle this). This preserves the original implementation's behavior: when
501   // weights aren't provided, all hosts should get an equal number of hashes. In
502   // the case where this number exceeds the max_ring_size, it's scaled back down
503   // to fit.
504   const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP)
505                                    .value_or(kRingSizeCapDefault);
506   const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap);
507   const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap);
508   const double scale = std::min(
509       std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
510       static_cast<double>(max_ring_size));
511   // Reserve memory for the entire ring up front.
512   const uint64_t ring_size = std::ceil(scale);
513   ring_.reserve(ring_size);
514   // Populate the hash ring by walking through the (host, weight) pairs in
515   // normalized_host_weights, and generating (scale * weight) hashes for each
516   // host. Since these aren't necessarily whole numbers, we maintain running
517   // sums -- current_hashes and target_hashes -- which allows us to populate the
518   // ring in a mostly stable way.
519   absl::InlinedVector<char, 196> hash_key_buffer;
520   double current_hashes = 0.0;
521   double target_hashes = 0.0;
522   uint64_t min_hashes_per_host = ring_size;
523   uint64_t max_hashes_per_host = 0;
524   for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
525     const std::string& address_string = address_weights[i].address;
526     hash_key_buffer.assign(address_string.begin(), address_string.end());
527     hash_key_buffer.emplace_back('_');
528     auto offset_start = hash_key_buffer.end();
529     target_hashes += scale * address_weights[i].normalized_weight;
530     size_t count = 0;
531     while (current_hashes < target_hashes) {
532       const std::string count_str = absl::StrCat(count);
533       hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
534       absl::string_view hash_key(hash_key_buffer.data(),
535                                  hash_key_buffer.size());
536       const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
537       ring_.push_back({hash, i});
538       ++count;
539       ++current_hashes;
540       hash_key_buffer.erase(offset_start, hash_key_buffer.end());
541     }
542     min_hashes_per_host =
543         std::min(static_cast<uint64_t>(i), min_hashes_per_host);
544     max_hashes_per_host =
545         std::max(static_cast<uint64_t>(i), max_hashes_per_host);
546   }
547   std::sort(ring_.begin(), ring_.end(),
548             [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
549               return lhs.hash < rhs.hash;
550             });
551 }
552 
553 //
554 // RingHash::RingHashSubchannelList
555 //
556 
RingHashSubchannelList(RingHash * policy,ServerAddressList addresses,const ChannelArgs & args)557 RingHash::RingHashSubchannelList::RingHashSubchannelList(
558     RingHash* policy, ServerAddressList addresses, const ChannelArgs& args)
559     : SubchannelList(policy,
560                      (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)
561                           ? "RingHashSubchannelList"
562                           : nullptr),
563                      std::move(addresses), policy->channel_control_helper(),
564                      args),
565       num_idle_(num_subchannels()) {
566   // Need to maintain a ref to the LB policy as long as we maintain
567   // any references to subchannels, since the subchannels'
568   // pollset_sets will include the LB policy's pollset_set.
569   policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
570   // Construct the ring.
571   ring_ = MakeRefCounted<Ring>(policy->config_.get(), this, args);
572   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
573     gpr_log(GPR_INFO,
574             "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries",
575             policy, this, ring_->ring().size());
576   }
577 }
578 
UpdateStateCountersLocked(grpc_connectivity_state old_state,grpc_connectivity_state new_state)579 void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
580     grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
581   if (old_state == GRPC_CHANNEL_IDLE) {
582     GPR_ASSERT(num_idle_ > 0);
583     --num_idle_;
584   } else if (old_state == GRPC_CHANNEL_READY) {
585     GPR_ASSERT(num_ready_ > 0);
586     --num_ready_;
587   } else if (old_state == GRPC_CHANNEL_CONNECTING) {
588     GPR_ASSERT(num_connecting_ > 0);
589     --num_connecting_;
590   } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
591     GPR_ASSERT(num_transient_failure_ > 0);
592     --num_transient_failure_;
593   }
594   GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
595   if (new_state == GRPC_CHANNEL_IDLE) {
596     ++num_idle_;
597   } else if (new_state == GRPC_CHANNEL_READY) {
598     ++num_ready_;
599   } else if (new_state == GRPC_CHANNEL_CONNECTING) {
600     ++num_connecting_;
601   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
602     ++num_transient_failure_;
603   }
604 }
605 
UpdateRingHashConnectivityStateLocked(size_t index,bool connection_attempt_complete,absl::Status status)606 void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked(
607     size_t index, bool connection_attempt_complete, absl::Status status) {
608   RingHash* p = static_cast<RingHash*>(policy());
609   // If this is latest_pending_subchannel_list_, then swap it into
610   // subchannel_list_ as soon as we get the initial connectivity state
611   // report for every subchannel in the list.
612   if (p->latest_pending_subchannel_list_.get() == this &&
613       AllSubchannelsSeenInitialState()) {
614     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
615       gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p,
616               p->subchannel_list_.get(), this);
617     }
618     p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
619   }
620   // Only set connectivity state if this is the current subchannel list.
621   if (p->subchannel_list_.get() != this) return;
622   // The overall aggregation rules here are:
623   // 1. If there is at least one subchannel in READY state, report READY.
624   // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
625   //    TRANSIENT_FAILURE.
626   // 3. If there is at least one subchannel in CONNECTING state, report
627   //    CONNECTING.
628   // 4. If there is one subchannel in TRANSIENT_FAILURE state and there is
629   //    more than one subchannel, report CONNECTING.
630   // 5. If there is at least one subchannel in IDLE state, report IDLE.
631   // 6. Otherwise, report TRANSIENT_FAILURE.
632   //
633   // We set start_connection_attempt to true if we match rules 2, 3, or 6.
634   grpc_connectivity_state state;
635   bool start_connection_attempt = false;
636   if (num_ready_ > 0) {
637     state = GRPC_CHANNEL_READY;
638   } else if (num_transient_failure_ >= 2) {
639     state = GRPC_CHANNEL_TRANSIENT_FAILURE;
640     start_connection_attempt = true;
641   } else if (num_connecting_ > 0) {
642     state = GRPC_CHANNEL_CONNECTING;
643   } else if (num_transient_failure_ == 1 && num_subchannels() > 1) {
644     state = GRPC_CHANNEL_CONNECTING;
645     start_connection_attempt = true;
646   } else if (num_idle_ > 0) {
647     state = GRPC_CHANNEL_IDLE;
648   } else {
649     state = GRPC_CHANNEL_TRANSIENT_FAILURE;
650     start_connection_attempt = true;
651   }
652   // In TRANSIENT_FAILURE, report the last reported failure.
653   // Otherwise, report OK.
654   if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
655     if (!status.ok()) {
656       last_failure_ = absl::UnavailableError(absl::StrCat(
657           "no reachable subchannels; last error: ", status.ToString()));
658     }
659     status = last_failure_;
660   } else {
661     status = absl::OkStatus();
662   }
663   // Generate new picker and return it to the channel.
664   // Note that we use our own picker regardless of connectivity state.
665   p->channel_control_helper()->UpdateState(
666       state, status,
667       MakeRefCounted<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this));
668   // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
669   // not be getting any pick requests from the priority policy.
670   // However, because the ring_hash policy does not attempt to
671   // reconnect to subchannels unless it is getting pick requests,
672   // it will need special handling to ensure that it will eventually
673   // recover from TRANSIENT_FAILURE state once the problem is resolved.
674   // Specifically, it will make sure that it is attempting to connect to
675   // at least one subchannel at any given time.  After a given subchannel
676   // fails a connection attempt, it will move on to the next subchannel
677   // in the ring.  It will keep doing this until one of the subchannels
678   // successfully connects, at which point it will report READY and stop
679   // proactively trying to connect.  The policy will remain in
680   // TRANSIENT_FAILURE until at least one subchannel becomes connected,
681   // even if subchannels are in state CONNECTING during that time.
682   //
683   // Note that we do the same thing when the policy is in state
684   // CONNECTING, just to ensure that we don't remain in CONNECTING state
685   // indefinitely if there are no new picks coming in.
686   if (internally_triggered_connection_index_.has_value() &&
687       *internally_triggered_connection_index_ == index &&
688       connection_attempt_complete) {
689     internally_triggered_connection_index_.reset();
690   }
691   if (start_connection_attempt &&
692       !internally_triggered_connection_index_.has_value()) {
693     size_t next_index = (index + 1) % num_subchannels();
694     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
695       gpr_log(GPR_INFO,
696               "[RH %p] triggering internal connection attempt for subchannel "
697               "%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")",
698               p, subchannel(next_index)->subchannel(), this, next_index,
699               num_subchannels());
700     }
701     internally_triggered_connection_index_ = next_index;
702     subchannel(next_index)->subchannel()->RequestConnection();
703   }
704 }
705 
706 //
707 // RingHash::RingHashSubchannelData
708 //
709 
ProcessConnectivityChangeLocked(absl::optional<grpc_connectivity_state> old_state,grpc_connectivity_state new_state)710 void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
711     absl::optional<grpc_connectivity_state> old_state,
712     grpc_connectivity_state new_state) {
713   RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
714   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
715     gpr_log(
716         GPR_INFO,
717         "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
718         "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
719         p, subchannel(), subchannel_list(), Index(),
720         subchannel_list()->num_subchannels(),
721         ConnectivityStateName(logical_connectivity_state_),
722         ConnectivityStateName(new_state));
723   }
724   GPR_ASSERT(subchannel() != nullptr);
725   // If this is not the initial state notification and the new state is
726   // TRANSIENT_FAILURE or IDLE, re-resolve.
727   // Note that we don't want to do this on the initial state notification,
728   // because that would result in an endless loop of re-resolution.
729   if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
730                                 new_state == GRPC_CHANNEL_IDLE)) {
731     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
732       gpr_log(GPR_INFO,
733               "[RH %p] Subchannel %p reported %s; requesting re-resolution", p,
734               subchannel(), ConnectivityStateName(new_state));
735     }
736     p->channel_control_helper()->RequestReresolution();
737   }
738   const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING;
739   // Decide what state to report for the purposes of aggregation and
740   // picker behavior.
741   // If the last recorded state was TRANSIENT_FAILURE, ignore the change
742   // unless the new state is READY (or TF again, in which case we need
743   // to update the status).
744   if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
745       new_state == GRPC_CHANNEL_READY ||
746       new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
747     // Update state counters used for aggregation.
748     subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_,
749                                                  new_state);
750     // Update logical state.
751     logical_connectivity_state_ = new_state;
752     logical_connectivity_status_ = connectivity_status();
753   }
754   // Update the RH policy's connectivity state, creating new picker and new
755   // ring.
756   subchannel_list()->UpdateRingHashConnectivityStateLocked(
757       Index(), connection_attempt_complete, logical_connectivity_status_);
758 }
759 
760 //
761 // RingHash
762 //
763 
RingHash(Args args)764 RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
765   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
766     gpr_log(GPR_INFO, "[RH %p] Created", this);
767   }
768 }
769 
~RingHash()770 RingHash::~RingHash() {
771   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
772     gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
773   }
774   GPR_ASSERT(subchannel_list_ == nullptr);
775   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
776 }
777 
ShutdownLocked()778 void RingHash::ShutdownLocked() {
779   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
780     gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
781   }
782   shutdown_ = true;
783   subchannel_list_.reset();
784   latest_pending_subchannel_list_.reset();
785 }
786 
ResetBackoffLocked()787 void RingHash::ResetBackoffLocked() {
788   subchannel_list_->ResetBackoffLocked();
789   if (latest_pending_subchannel_list_ != nullptr) {
790     latest_pending_subchannel_list_->ResetBackoffLocked();
791   }
792 }
793 
UpdateLocked(UpdateArgs args)794 absl::Status RingHash::UpdateLocked(UpdateArgs args) {
795   config_ = std::move(args.config);
796   ServerAddressList addresses;
797   if (args.addresses.ok()) {
798     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
799       gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
800               this, args.addresses->size());
801     }
802     addresses = *std::move(args.addresses);
803   } else {
804     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
805       gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
806               this, args.addresses.status().ToString().c_str());
807     }
808     // If we already have a subchannel list, then keep using the existing
809     // list, but still report back that the update was not accepted.
810     if (subchannel_list_ != nullptr) return args.addresses.status();
811   }
812   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
813       latest_pending_subchannel_list_ != nullptr) {
814     gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p",
815             this, latest_pending_subchannel_list_.get());
816   }
817   latest_pending_subchannel_list_ = MakeRefCounted<RingHashSubchannelList>(
818       this, std::move(addresses), args.args);
819   latest_pending_subchannel_list_->StartWatchingLocked();
820   // If we have no existing list or the new list is empty, immediately
821   // promote the new list.
822   // Otherwise, do nothing; the new list will be promoted when the
823   // initial subchannel states are reported.
824   if (subchannel_list_ == nullptr ||
825       latest_pending_subchannel_list_->num_subchannels() == 0) {
826     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) &&
827         subchannel_list_ != nullptr) {
828       gpr_log(GPR_INFO,
829               "[RH %p] empty address list, replacing subchannel list %p", this,
830               subchannel_list_.get());
831     }
832     subchannel_list_ = std::move(latest_pending_subchannel_list_);
833     // If the new list is empty, report TRANSIENT_FAILURE.
834     if (subchannel_list_->num_subchannels() == 0) {
835       absl::Status status =
836           args.addresses.ok()
837               ? absl::UnavailableError(
838                     absl::StrCat("empty address list: ", args.resolution_note))
839               : args.addresses.status();
840       channel_control_helper()->UpdateState(
841           GRPC_CHANNEL_TRANSIENT_FAILURE, status,
842           MakeRefCounted<TransientFailurePicker>(status));
843       return status;
844     }
845     // Otherwise, report IDLE.
846     subchannel_list_->UpdateRingHashConnectivityStateLocked(
847         /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus());
848   }
849   return absl::OkStatus();
850 }
851 
852 //
853 // factory
854 //
855 
856 class RingHashFactory : public LoadBalancingPolicyFactory {
857  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const858   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
859       LoadBalancingPolicy::Args args) const override {
860     return MakeOrphanable<RingHash>(std::move(args));
861   }
862 
name() const863   absl::string_view name() const override { return kRingHash; }
864 
865   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const866   ParseLoadBalancingConfig(const Json& json) const override {
867     auto config = LoadFromJson<RingHashConfig>(
868         json, JsonArgs(), "errors validating ring_hash LB policy config");
869     if (!config.ok()) return config.status();
870     return MakeRefCounted<RingHashLbConfig>(config->min_ring_size,
871                                             config->max_ring_size);
872   }
873 };
874 
875 }  // namespace
876 
RegisterRingHashLbPolicy(CoreConfiguration::Builder * builder)877 void RegisterRingHashLbPolicy(CoreConfiguration::Builder* builder) {
878   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
879       std::make_unique<RingHashFactory>());
880 }
881 
882 }  // namespace grpc_core
883