xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/outlier_detection/outlier_detection.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
20 
21 #include <inttypes.h>
22 #include <stddef.h>
23 
24 #include <algorithm>
25 #include <atomic>
26 #include <cmath>
27 #include <map>
28 #include <memory>
29 #include <set>
30 #include <string>
31 #include <type_traits>
32 #include <utility>
33 #include <vector>
34 
35 #include "absl/base/thread_annotations.h"
36 #include "absl/meta/type_traits.h"
37 #include "absl/random/random.h"
38 #include "absl/status/status.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/variant.h"
42 
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/support/log.h>
46 
47 #include "src/core/client_channel/subchannel_interface_internal.h"
48 #include "src/core/lib/address_utils/sockaddr_utils.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/config/core_configuration.h"
51 #include "src/core/lib/debug/trace.h"
52 #include "src/core/lib/experiments/experiments.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/orphanable.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/gprpp/unique_type_name.h"
59 #include "src/core/lib/gprpp/validation_errors.h"
60 #include "src/core/lib/gprpp/work_serializer.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/iomgr/iomgr_fwd.h"
63 #include "src/core/lib/iomgr/pollset_set.h"
64 #include "src/core/lib/iomgr/resolved_address.h"
65 #include "src/core/lib/json/json.h"
66 #include "src/core/lib/transport/connectivity_state.h"
67 #include "src/core/load_balancing/child_policy_handler.h"
68 #include "src/core/load_balancing/delegating_helper.h"
69 #include "src/core/load_balancing/health_check_client_internal.h"
70 #include "src/core/load_balancing/lb_policy.h"
71 #include "src/core/load_balancing/lb_policy_factory.h"
72 #include "src/core/load_balancing/lb_policy_registry.h"
73 #include "src/core/load_balancing/subchannel_interface.h"
74 #include "src/core/resolver/endpoint_addresses.h"
75 
76 namespace grpc_core {
77 
// Trace flag registered under the name "outlier_detection_lb".  All of the
// gpr_log() calls in this file are gated on it via GRPC_TRACE_FLAG_ENABLED.
// NOTE(review): the `false` argument is presumably the default-enabled
// value -- confirm against the TraceFlag constructor.
78 TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb");
79 
80 namespace {
81 
// Shorthand used for the EventEngine::TaskHandle held by EjectionTimer.
82 using ::grpc_event_engine::experimental::EventEngine;
83 
// Policy name returned by name() on both the LB policy and its config.
84 constexpr absl::string_view kOutlierDetection =
85     "outlier_detection_experimental";
86 
87 // Config for xDS Cluster Impl LB policy.
88 class OutlierDetectionLbConfig final : public LoadBalancingPolicy::Config {
89  public:
OutlierDetectionLbConfig(OutlierDetectionConfig outlier_detection_config,RefCountedPtr<LoadBalancingPolicy::Config> child_policy)90   OutlierDetectionLbConfig(
91       OutlierDetectionConfig outlier_detection_config,
92       RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
93       : outlier_detection_config_(outlier_detection_config),
94         child_policy_(std::move(child_policy)) {}
95 
name() const96   absl::string_view name() const override { return kOutlierDetection; }
97 
CountingEnabled() const98   bool CountingEnabled() const {
99     return outlier_detection_config_.success_rate_ejection.has_value() ||
100            outlier_detection_config_.failure_percentage_ejection.has_value();
101   }
102 
outlier_detection_config() const103   const OutlierDetectionConfig& outlier_detection_config() const {
104     return outlier_detection_config_;
105   }
106 
child_policy() const107   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
108     return child_policy_;
109   }
110 
111  private:
112   OutlierDetectionConfig outlier_detection_config_;
113   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
114 };
115 
116 // Outlier detection LB policy.
117 class OutlierDetectionLb final : public LoadBalancingPolicy {
118  public:
119   explicit OutlierDetectionLb(Args args);
120 
name() const121   absl::string_view name() const override { return kOutlierDetection; }
122 
123   absl::Status UpdateLocked(UpdateArgs args) override;
124   void ExitIdleLocked() override;
125   void ResetBackoffLocked() override;
126 
127  private:
128   class SubchannelState;
129   class EndpointState;
130 
131   class SubchannelWrapper final : public DelegatingSubchannel {
132    public:
SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,RefCountedPtr<SubchannelState> subchannel_state,RefCountedPtr<SubchannelInterface> subchannel)133     SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
134                       RefCountedPtr<SubchannelState> subchannel_state,
135                       RefCountedPtr<SubchannelInterface> subchannel)
136         : DelegatingSubchannel(std::move(subchannel)),
137           work_serializer_(std::move(work_serializer)),
138           subchannel_state_(std::move(subchannel_state)) {
139       if (subchannel_state_ != nullptr) {
140         subchannel_state_->AddSubchannel(this);
141         if (subchannel_state_->endpoint_state()->ejection_time().has_value()) {
142           ejected_ = true;
143         }
144       }
145     }
146 
147     void Eject();
148 
149     void Uneject();
150 
151     void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
152 
153     void CancelDataWatcher(DataWatcherInterface* watcher) override;
154 
endpoint_state() const155     RefCountedPtr<EndpointState> endpoint_state() const {
156       if (subchannel_state_ == nullptr) return nullptr;
157       return subchannel_state_->endpoint_state();
158     }
159 
160    private:
161     class WatcherWrapper final
162         : public SubchannelInterface::ConnectivityStateWatcherInterface {
163      public:
WatcherWrapper(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> health_watcher,bool ejected)164       WatcherWrapper(std::shared_ptr<
165                          SubchannelInterface::ConnectivityStateWatcherInterface>
166                          health_watcher,
167                      bool ejected)
168           : watcher_(std::move(health_watcher)), ejected_(ejected) {}
169 
Eject()170       void Eject() {
171         ejected_ = true;
172         if (last_seen_state_.has_value()) {
173           watcher_->OnConnectivityStateChange(
174               GRPC_CHANNEL_TRANSIENT_FAILURE,
175               absl::UnavailableError(
176                   "subchannel ejected by outlier detection"));
177         }
178       }
179 
Uneject()180       void Uneject() {
181         ejected_ = false;
182         if (last_seen_state_.has_value()) {
183           watcher_->OnConnectivityStateChange(*last_seen_state_,
184                                               last_seen_status_);
185         }
186       }
187 
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)188       void OnConnectivityStateChange(grpc_connectivity_state new_state,
189                                      absl::Status status) override {
190         const bool send_update = !last_seen_state_.has_value() || !ejected_;
191         last_seen_state_ = new_state;
192         last_seen_status_ = status;
193         if (send_update) {
194           if (ejected_) {
195             new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
196             status = absl::UnavailableError(
197                 "subchannel ejected by outlier detection");
198           }
199           watcher_->OnConnectivityStateChange(new_state, status);
200         }
201       }
202 
interested_parties()203       grpc_pollset_set* interested_parties() override {
204         return watcher_->interested_parties();
205       }
206 
207      private:
208       std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
209           watcher_;
210       absl::optional<grpc_connectivity_state> last_seen_state_;
211       absl::Status last_seen_status_;
212       bool ejected_;
213     };
214 
Orphaned()215     void Orphaned() override {
216       if (!IsWorkSerializerDispatchEnabled()) {
217         if (subchannel_state_ != nullptr) {
218           subchannel_state_->RemoveSubchannel(this);
219         }
220         return;
221       }
222       work_serializer_->Run(
223           [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
224             if (self->subchannel_state_ != nullptr) {
225               self->subchannel_state_->RemoveSubchannel(self.get());
226             }
227           },
228           DEBUG_LOCATION);
229     }
230 
231     std::shared_ptr<WorkSerializer> work_serializer_;
232     RefCountedPtr<SubchannelState> subchannel_state_;
233     bool ejected_ = false;
234     WatcherWrapper* watcher_wrapper_ = nullptr;
235   };
236 
237   class SubchannelState final : public RefCounted<SubchannelState> {
238    public:
AddSubchannel(SubchannelWrapper * wrapper)239     void AddSubchannel(SubchannelWrapper* wrapper) {
240       subchannels_.insert(wrapper);
241     }
242 
RemoveSubchannel(SubchannelWrapper * wrapper)243     void RemoveSubchannel(SubchannelWrapper* wrapper) {
244       subchannels_.erase(wrapper);
245     }
246 
endpoint_state()247     RefCountedPtr<EndpointState> endpoint_state() {
248       MutexLock lock(&mu_);
249       return endpoint_state_;
250     }
251 
set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state)252     void set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state) {
253       MutexLock lock(&mu_);
254       endpoint_state_ = std::move(endpoint_state);
255     }
256 
Eject()257     void Eject() {
258       // Ejecting the subchannel may cause the child policy to unref the
259       // subchannel, so we need to be prepared for the set to be modified
260       // while we are iterating.
261       for (auto it = subchannels_.begin(); it != subchannels_.end();) {
262         SubchannelWrapper* subchannel = *it;
263         ++it;
264         subchannel->Eject();
265       }
266     }
267 
Uneject()268     void Uneject() {
269       for (auto& subchannel : subchannels_) {
270         subchannel->Uneject();
271       }
272     }
273 
274    private:
275     std::set<SubchannelWrapper*> subchannels_;
276     Mutex mu_;
277     RefCountedPtr<EndpointState> endpoint_state_ ABSL_GUARDED_BY(mu_);
278   };
279 
280   class EndpointState final : public RefCounted<EndpointState> {
281    public:
EndpointState(std::set<SubchannelState * > subchannels)282     explicit EndpointState(std::set<SubchannelState*> subchannels)
283         : subchannels_(std::move(subchannels)) {
284       for (SubchannelState* subchannel : subchannels_) {
285         subchannel->set_endpoint_state(Ref());
286       }
287     }
288 
RotateBucket()289     void RotateBucket() {
290       backup_bucket_->successes = 0;
291       backup_bucket_->failures = 0;
292       current_bucket_.swap(backup_bucket_);
293       active_bucket_.store(current_bucket_.get());
294     }
295 
GetSuccessRateAndVolume()296     absl::optional<std::pair<double, uint64_t>> GetSuccessRateAndVolume() {
297       uint64_t total_request =
298           backup_bucket_->successes + backup_bucket_->failures;
299       if (total_request == 0) {
300         return absl::nullopt;
301       }
302       double success_rate =
303           backup_bucket_->successes * 100.0 /
304           (backup_bucket_->successes + backup_bucket_->failures);
305       return {
306           {success_rate, backup_bucket_->successes + backup_bucket_->failures}};
307     }
308 
AddSuccessCount()309     void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
310 
AddFailureCount()311     void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
312 
ejection_time() const313     absl::optional<Timestamp> ejection_time() const { return ejection_time_; }
314 
Eject(const Timestamp & time)315     void Eject(const Timestamp& time) {
316       ejection_time_ = time;
317       ++multiplier_;
318       for (SubchannelState* subchannel_state : subchannels_) {
319         subchannel_state->Eject();
320       }
321     }
322 
Uneject()323     void Uneject() {
324       ejection_time_.reset();
325       for (SubchannelState* subchannel_state : subchannels_) {
326         subchannel_state->Uneject();
327       }
328     }
329 
MaybeUneject(uint64_t base_ejection_time_in_millis,uint64_t max_ejection_time_in_millis)330     bool MaybeUneject(uint64_t base_ejection_time_in_millis,
331                       uint64_t max_ejection_time_in_millis) {
332       if (!ejection_time_.has_value()) {
333         if (multiplier_ > 0) {
334           --multiplier_;
335         }
336       } else {
337         GPR_ASSERT(ejection_time_.has_value());
338         auto change_time = ejection_time_.value() +
339                            Duration::Milliseconds(std::min(
340                                base_ejection_time_in_millis * multiplier_,
341                                std::max(base_ejection_time_in_millis,
342                                         max_ejection_time_in_millis)));
343         if (change_time < Timestamp::Now()) {
344           Uneject();
345           return true;
346         }
347       }
348       return false;
349     }
350 
DisableEjection()351     void DisableEjection() {
352       if (ejection_time_.has_value()) Uneject();
353       multiplier_ = 0;
354     }
355 
356    private:
357     struct Bucket {
358       std::atomic<uint64_t> successes;
359       std::atomic<uint64_t> failures;
360     };
361 
362     const std::set<SubchannelState*> subchannels_;
363 
364     std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
365     std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
366     // The bucket used to update call counts.
367     // Points to either current_bucket or active_bucket.
368     std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
369     uint32_t multiplier_ = 0;
370     absl::optional<Timestamp> ejection_time_;
371   };
372 
373   // A picker that wraps the picker from the child to perform outlier detection.
374   class Picker final : public SubchannelPicker {
375    public:
// `outlier_detection_lb` is only used for trace logging in the constructor;
// `picker` is the child policy's picker that Pick() delegates to;
// `counting_enabled` controls whether Pick() attaches a call tracker.
376     Picker(OutlierDetectionLb* outlier_detection_lb,
377            RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);
378 
// Delegates to the child picker.  For a complete pick, optionally wraps the
// call tracker (when counting is enabled and the subchannel has endpoint
// state) and replaces the SubchannelWrapper with the wrapped subchannel.
379     PickResult Pick(PickArgs args) override;
380 
381    private:
// Call tracker that records call success/failure on the EndpointState.
382     class SubchannelCallTracker;
383     RefCountedPtr<SubchannelPicker> picker_;
384     bool counting_enabled_;
385   };
386 
// Channel control helper handed to the child policy (see
// CreateChildPolicyLocked()).  It holds a ref to the OutlierDetectionLb
// policy and overrides subchannel creation and state updates; everything
// else is inherited from the delegating base class.
387   class Helper final
388       : public ParentOwningDelegatingChannelControlHelper<OutlierDetectionLb> {
389    public:
Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)390     explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
391         : ParentOwningDelegatingChannelControlHelper(
392               std::move(outlier_detection_policy)) {}
393 
394     RefCountedPtr<SubchannelInterface> CreateSubchannel(
395         const grpc_resolved_address& address,
396         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
397     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
398                      RefCountedPtr<SubchannelPicker> picker) override;
399   };
400 
// Timer object owned by the policy via ejection_timer_.  UpdateLocked()
// creates it when counting is enabled, and replaces it (reusing StartTime())
// when the configured interval changes.
401   class EjectionTimer final : public InternallyRefCounted<EjectionTimer> {
402    public:
403     EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
404                   Timestamp start_time);
405 
406     void Orphan() override;
407 
// The start time this timer was constructed with.
StartTime() const408     Timestamp StartTime() const { return start_time_; }
409 
410    private:
411     void OnTimerLocked();
412 
413     RefCountedPtr<OutlierDetectionLb> parent_;
// NOTE(review): presumably the handle of the pending EventEngine task, empty
// when none is pending -- confirm against the member definitions.
414     absl::optional<EventEngine::TaskHandle> timer_handle_;
415     Timestamp start_time_;
416     absl::BitGen bit_gen_;
417   };
418 
419   ~OutlierDetectionLb() override;
420 
421   void ShutdownLocked() override;
422 
423   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
424       const ChannelArgs& args);
425 
426   void MaybeUpdatePickerLocked();
427 
428   // Current config from the resolver.
429   RefCountedPtr<OutlierDetectionLbConfig> config_;
430 
431   // Internal state.
432   bool shutting_down_ = false;
433 
434   OrphanablePtr<LoadBalancingPolicy> child_policy_;
435 
436   // Latest state and picker reported by the child policy.
437   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
438   absl::Status status_;
439   RefCountedPtr<SubchannelPicker> picker_;
440   std::map<EndpointAddressSet, RefCountedPtr<EndpointState>>
441       endpoint_state_map_;
442   std::map<grpc_resolved_address, RefCountedPtr<SubchannelState>,
443            ResolvedAddressLessThan>
444       subchannel_state_map_;
445   OrphanablePtr<EjectionTimer> ejection_timer_;
446 };
447 
448 //
449 // OutlierDetectionLb::SubchannelWrapper
450 //
451 
Eject()452 void OutlierDetectionLb::SubchannelWrapper::Eject() {
453   ejected_ = true;
454   if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject();
455 }
456 
Uneject()457 void OutlierDetectionLb::SubchannelWrapper::Uneject() {
458   ejected_ = false;
459   if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject();
460 }
461 
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)462 void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
463     std::unique_ptr<DataWatcherInterface> watcher) {
464   auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
465   if (w->type() == HealthProducer::Type()) {
466     auto* health_watcher = static_cast<HealthWatcher*>(watcher.get());
467     auto watcher_wrapper = std::make_shared<WatcherWrapper>(
468         health_watcher->TakeWatcher(), ejected_);
469     watcher_wrapper_ = watcher_wrapper.get();
470     health_watcher->SetWatcher(std::move(watcher_wrapper));
471   }
472   DelegatingSubchannel::AddDataWatcher(std::move(watcher));
473 }
474 
CancelDataWatcher(DataWatcherInterface * watcher)475 void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
476     DataWatcherInterface* watcher) {
477   auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
478   if (w->type() == HealthProducer::Type()) watcher_wrapper_ = nullptr;
479   DelegatingSubchannel::CancelDataWatcher(watcher);
480 }
481 
482 //
483 // OutlierDetectionLb::Picker::SubchannelCallTracker
484 //
485 
486 class OutlierDetectionLb::Picker::SubchannelCallTracker final
487     : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
488  public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<EndpointState> endpoint_state)489   SubchannelCallTracker(
490       std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
491           original_subchannel_call_tracker,
492       RefCountedPtr<EndpointState> endpoint_state)
493       : original_subchannel_call_tracker_(
494             std::move(original_subchannel_call_tracker)),
495         endpoint_state_(std::move(endpoint_state)) {}
496 
~SubchannelCallTracker()497   ~SubchannelCallTracker() override {
498     endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
499   }
500 
Start()501   void Start() override {
502     // This tracker does not care about started calls only finished calls.
503     // Delegate if needed.
504     if (original_subchannel_call_tracker_ != nullptr) {
505       original_subchannel_call_tracker_->Start();
506     }
507   }
508 
Finish(FinishArgs args)509   void Finish(FinishArgs args) override {
510     // Delegate if needed.
511     if (original_subchannel_call_tracker_ != nullptr) {
512       original_subchannel_call_tracker_->Finish(args);
513     }
514     // Record call completion based on status for outlier detection
515     // calculations.
516     if (args.status.ok()) {
517       endpoint_state_->AddSuccessCount();
518     } else {
519       endpoint_state_->AddFailureCount();
520     }
521   }
522 
523  private:
524   std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
525       original_subchannel_call_tracker_;
526   RefCountedPtr<EndpointState> endpoint_state_;
527 };
528 
529 //
530 // OutlierDetectionLb::Picker
531 //
532 
Picker(OutlierDetectionLb * outlier_detection_lb,RefCountedPtr<SubchannelPicker> picker,bool counting_enabled)533 OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
534                                    RefCountedPtr<SubchannelPicker> picker,
535                                    bool counting_enabled)
536     : picker_(std::move(picker)), counting_enabled_(counting_enabled) {
537   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
538     gpr_log(GPR_INFO,
539             "[outlier_detection_lb %p] constructed new picker %p and counting "
540             "is %s",
541             outlier_detection_lb, this,
542             (counting_enabled ? "enabled" : "disabled"));
543   }
544 }
545 
Pick(LoadBalancingPolicy::PickArgs args)546 LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
547     LoadBalancingPolicy::PickArgs args) {
548   if (picker_ == nullptr) {  // Should never happen.
549     return PickResult::Fail(absl::InternalError(
550         "outlier_detection picker not given any child picker"));
551   }
552   // Delegate to child picker
553   PickResult result = picker_->Pick(args);
554   auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
555   if (complete_pick != nullptr) {
556     auto* subchannel_wrapper =
557         static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
558     // Inject subchannel call tracker to record call completion as long as
559     // either success_rate_ejection or failure_percentage_ejection is enabled.
560     if (counting_enabled_) {
561       auto endpoint_state = subchannel_wrapper->endpoint_state();
562       if (endpoint_state != nullptr) {
563         complete_pick->subchannel_call_tracker =
564             std::make_unique<SubchannelCallTracker>(
565                 std::move(complete_pick->subchannel_call_tracker),
566                 std::move(endpoint_state));
567       }
568     }
569     // Unwrap subchannel to pass back up the stack.
570     complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
571   }
572   return result;
573 }
574 
575 //
576 // OutlierDetectionLb
577 //
578 
OutlierDetectionLb(Args args)579 OutlierDetectionLb::OutlierDetectionLb(Args args)
580     : LoadBalancingPolicy(std::move(args)) {
581   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
582     gpr_log(GPR_INFO, "[outlier_detection_lb %p] created", this);
583   }
584 }
585 
~OutlierDetectionLb()586 OutlierDetectionLb::~OutlierDetectionLb() {
587   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
588     gpr_log(GPR_INFO,
589             "[outlier_detection_lb %p] destroying outlier_detection LB policy",
590             this);
591   }
592 }
593 
ShutdownLocked()594 void OutlierDetectionLb::ShutdownLocked() {
595   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
596     gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this);
597   }
598   ejection_timer_.reset();
599   shutting_down_ = true;
600   // Remove the child policy's interested_parties pollset_set from the
601   // xDS policy.
602   if (child_policy_ != nullptr) {
603     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
604                                      interested_parties());
605     child_policy_.reset();
606   }
607   // Drop our ref to the child's picker, in case it's holding a ref to
608   // the child.
609   picker_.reset();
610 }
611 
ExitIdleLocked()612 void OutlierDetectionLb::ExitIdleLocked() {
613   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
614 }
615 
ResetBackoffLocked()616 void OutlierDetectionLb::ResetBackoffLocked() {
617   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
618 }
619 
UpdateLocked(UpdateArgs args)620 absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
621   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
622     gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
623   }
624   auto old_config = std::move(config_);
625   // Update config.
626   config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
627   // Update outlier detection timer.
628   if (!config_->CountingEnabled()) {
629     // No need for timer.  Cancel the current timer, if any.
630     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
631       gpr_log(GPR_INFO,
632               "[outlier_detection_lb %p] counting disabled, cancelling timer",
633               this);
634     }
635     ejection_timer_.reset();
636   } else if (ejection_timer_ == nullptr) {
637     // No timer running.  Start it now.
638     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
639       gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this);
640     }
641     ejection_timer_ = MakeOrphanable<EjectionTimer>(
642         RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
643     for (const auto& p : endpoint_state_map_) {
644       p.second->RotateBucket();  // Reset call counters.
645     }
646   } else if (old_config->outlier_detection_config().interval !=
647              config_->outlier_detection_config().interval) {
648     // Timer interval changed.  Cancel the current timer and start a new one
649     // with the same start time.
650     // Note that if the new deadline is in the past, the timer will fire
651     // immediately.
652     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
653       gpr_log(GPR_INFO,
654               "[outlier_detection_lb %p] interval changed, replacing timer",
655               this);
656     }
657     ejection_timer_ = MakeOrphanable<EjectionTimer>(
658         RefAsSubclass<OutlierDetectionLb>(), ejection_timer_->StartTime());
659   }
660   // Update subchannel and endpoint maps.
661   if (args.addresses.ok()) {
662     std::set<EndpointAddressSet> current_endpoints;
663     std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
664     (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
665       EndpointAddressSet key(endpoint.addresses());
666       current_endpoints.emplace(key);
667       for (const grpc_resolved_address& address : endpoint.addresses()) {
668         current_addresses.emplace(address);
669       }
670       // Find the entry in the endpoint map.
671       auto it = endpoint_state_map_.find(key);
672       if (it == endpoint_state_map_.end()) {
673         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
674           gpr_log(GPR_INFO,
675                   "[outlier_detection_lb %p] adding endpoint entry for %s",
676                   this, key.ToString().c_str());
677         }
678         // The endpoint is not present in the map, so we'll need to add it.
679         // Start by getting a pointer to the entry for each address in the
680         // subchannel map, creating the entry if needed.
681         std::set<SubchannelState*> subchannels;
682         for (const grpc_resolved_address& address : endpoint.addresses()) {
683           auto it2 = subchannel_state_map_.find(address);
684           if (it2 == subchannel_state_map_.end()) {
685             if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
686               std::string address_str = grpc_sockaddr_to_string(&address, false)
687                                             .value_or("<unknown>");
688               gpr_log(GPR_INFO,
689                       "[outlier_detection_lb %p] adding address entry for %s",
690                       this, address_str.c_str());
691             }
692             it2 = subchannel_state_map_
693                       .emplace(address, MakeRefCounted<SubchannelState>())
694                       .first;
695           }
696           subchannels.insert(it2->second.get());
697         }
698         // Now create the endpoint.
699         endpoint_state_map_.emplace(
700             key, MakeRefCounted<EndpointState>(std::move(subchannels)));
701       } else if (!config_->CountingEnabled()) {
702         // If counting is not enabled, reset state.
703         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
704           gpr_log(GPR_INFO,
705                   "[outlier_detection_lb %p] counting disabled; disabling "
706                   "ejection for %s",
707                   this, key.ToString().c_str());
708         }
709         it->second->DisableEjection();
710       }
711     });
712     // Remove any entries we no longer need in the subchannel map.
713     for (auto it = subchannel_state_map_.begin();
714          it != subchannel_state_map_.end();) {
715       if (current_addresses.find(it->first) == current_addresses.end()) {
716         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
717           std::string address_str =
718               grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
719           gpr_log(GPR_INFO,
720                   "[outlier_detection_lb %p] removing subchannel map entry %s",
721                   this, address_str.c_str());
722         }
723         // Don't hold a ref to the corresponding EndpointState object,
724         // because there could be subchannel wrappers keeping this alive
725         // for a while, and we don't need them to do any call tracking.
726         it->second->set_endpoint_state(nullptr);
727         it = subchannel_state_map_.erase(it);
728       } else {
729         ++it;
730       }
731     }
732     // Remove any entries we no longer need in the endpoint map.
733     for (auto it = endpoint_state_map_.begin();
734          it != endpoint_state_map_.end();) {
735       if (current_endpoints.find(it->first) == current_endpoints.end()) {
736         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
737           gpr_log(GPR_INFO,
738                   "[outlier_detection_lb %p] removing endpoint map entry %s",
739                   this, it->first.ToString().c_str());
740         }
741         it = endpoint_state_map_.erase(it);
742       } else {
743         ++it;
744       }
745     }
746   }
747   // Create child policy if needed.
748   if (child_policy_ == nullptr) {
749     child_policy_ = CreateChildPolicyLocked(args.args);
750   }
751   // Update child policy.
752   UpdateArgs update_args;
753   update_args.addresses = std::move(args.addresses);
754   update_args.resolution_note = std::move(args.resolution_note);
755   update_args.config = config_->child_policy();
756   update_args.args = std::move(args.args);
757   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
758     gpr_log(GPR_INFO,
759             "[outlier_detection_lb %p] Updating child policy handler %p", this,
760             child_policy_.get());
761   }
762   return child_policy_->UpdateLocked(std::move(update_args));
763 }
764 
MaybeUpdatePickerLocked()765 void OutlierDetectionLb::MaybeUpdatePickerLocked() {
766   if (picker_ != nullptr) {
767     auto outlier_detection_picker =
768         MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
769     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
770       gpr_log(GPR_INFO,
771               "[outlier_detection_lb %p] updating connectivity: state=%s "
772               "status=(%s) picker=%p",
773               this, ConnectivityStateName(state_), status_.ToString().c_str(),
774               outlier_detection_picker.get());
775     }
776     channel_control_helper()->UpdateState(state_, status_,
777                                           std::move(outlier_detection_picker));
778   }
779 }
780 
CreateChildPolicyLocked(const ChannelArgs & args)781 OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
782     const ChannelArgs& args) {
783   LoadBalancingPolicy::Args lb_policy_args;
784   lb_policy_args.work_serializer = work_serializer();
785   lb_policy_args.args = args;
786   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
787       RefAsSubclass<OutlierDetectionLb>(DEBUG_LOCATION, "Helper"));
788   OrphanablePtr<LoadBalancingPolicy> lb_policy =
789       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
790                                          &grpc_outlier_detection_lb_trace);
791   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
792     gpr_log(GPR_INFO,
793             "[outlier_detection_lb %p] Created new child policy handler %p",
794             this, lb_policy.get());
795   }
796   // Add our interested_parties pollset_set to that of the newly created
797   // child policy. This will make the child policy progress upon activity on
798   // this policy, which in turn is tied to the application's call.
799   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
800                                    interested_parties());
801   return lb_policy;
802 }
803 
804 //
805 // OutlierDetectionLb::Helper
806 //
807 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)808 RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
809     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
810     const ChannelArgs& args) {
811   if (parent()->shutting_down_) return nullptr;
812   RefCountedPtr<SubchannelState> subchannel_state;
813   auto it = parent()->subchannel_state_map_.find(address);
814   if (it != parent()->subchannel_state_map_.end()) {
815     subchannel_state = it->second->Ref();
816   }
817   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
818     std::string address_str =
819         grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
820     gpr_log(GPR_INFO,
821             "[outlier_detection_lb %p] creating subchannel for %s, "
822             "subchannel state %p",
823             parent(), address_str.c_str(), subchannel_state.get());
824   }
825   auto subchannel = MakeRefCounted<SubchannelWrapper>(
826       parent()->work_serializer(), subchannel_state,
827       parent()->channel_control_helper()->CreateSubchannel(
828           address, per_address_args, args));
829   if (subchannel_state != nullptr) {
830     subchannel_state->AddSubchannel(subchannel.get());
831   }
832   return subchannel;
833 }
834 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)835 void OutlierDetectionLb::Helper::UpdateState(
836     grpc_connectivity_state state, const absl::Status& status,
837     RefCountedPtr<SubchannelPicker> picker) {
838   if (parent()->shutting_down_) return;
839   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
840     gpr_log(GPR_INFO,
841             "[outlier_detection_lb %p] child connectivity state update: "
842             "state=%s (%s) picker=%p",
843             parent(), ConnectivityStateName(state), status.ToString().c_str(),
844             picker.get());
845   }
846   // Save the state and picker.
847   parent()->state_ = state;
848   parent()->status_ = status;
849   parent()->picker_ = std::move(picker);
850   // Wrap the picker and return it to the channel.
851   parent()->MaybeUpdatePickerLocked();
852 }
853 
854 //
855 // OutlierDetectionLb::EjectionTimer
856 //
857 
EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,Timestamp start_time)858 OutlierDetectionLb::EjectionTimer::EjectionTimer(
859     RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time)
860     : parent_(std::move(parent)), start_time_(start_time) {
861   auto interval = parent_->config_->outlier_detection_config().interval;
862   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
863     gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s",
864             parent_.get(), interval.ToString().c_str());
865   }
866   timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
867       interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
868         ApplicationCallbackExecCtx callback_exec_ctx;
869         ExecCtx exec_ctx;
870         auto self_ptr = self.get();
871         self_ptr->parent_->work_serializer()->Run(
872             [self = std::move(self)]() { self->OnTimerLocked(); },
873             DEBUG_LOCATION);
874       });
875 }
876 
Orphan()877 void OutlierDetectionLb::EjectionTimer::Orphan() {
878   if (timer_handle_.has_value()) {
879     parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
880     timer_handle_.reset();
881   }
882   Unref();
883 }
884 
OnTimerLocked()885 void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
886   if (!timer_handle_.has_value()) return;
887   timer_handle_.reset();
888   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
889     gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
890             parent_.get());
891   }
892   std::map<EndpointState*, double> success_rate_ejection_candidates;
893   std::map<EndpointState*, double> failure_percentage_ejection_candidates;
894   size_t ejected_host_count = 0;
895   double success_rate_sum = 0;
896   auto time_now = Timestamp::Now();
897   auto& config = parent_->config_->outlier_detection_config();
898   for (auto& state : parent_->endpoint_state_map_) {
899     auto* endpoint_state = state.second.get();
900     // For each address, swap the call counter's buckets in that address's
901     // map entry.
902     endpoint_state->RotateBucket();
903     // Gather data to run success rate algorithm or failure percentage
904     // algorithm.
905     if (endpoint_state->ejection_time().has_value()) {
906       ++ejected_host_count;
907     }
908     absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
909         endpoint_state->GetSuccessRateAndVolume();
910     if (!host_success_rate_and_volume.has_value()) {
911       continue;
912     }
913     double success_rate = host_success_rate_and_volume->first;
914     uint64_t request_volume = host_success_rate_and_volume->second;
915     if (config.success_rate_ejection.has_value()) {
916       if (request_volume >= config.success_rate_ejection->request_volume) {
917         success_rate_ejection_candidates[endpoint_state] = success_rate;
918         success_rate_sum += success_rate;
919       }
920     }
921     if (config.failure_percentage_ejection.has_value()) {
922       if (request_volume >=
923           config.failure_percentage_ejection->request_volume) {
924         failure_percentage_ejection_candidates[endpoint_state] = success_rate;
925       }
926     }
927   }
928   if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
929     gpr_log(GPR_INFO,
930             "[outlier_detection_lb %p] found %" PRIuPTR
931             " success rate candidates and %" PRIuPTR
932             " failure percentage candidates; ejected_host_count=%" PRIuPTR
933             "; success_rate_sum=%.3f",
934             parent_.get(), success_rate_ejection_candidates.size(),
935             failure_percentage_ejection_candidates.size(), ejected_host_count,
936             success_rate_sum);
937   }
938   // success rate algorithm
939   if (!success_rate_ejection_candidates.empty() &&
940       success_rate_ejection_candidates.size() >=
941           config.success_rate_ejection->minimum_hosts) {
942     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
943       gpr_log(GPR_INFO,
944               "[outlier_detection_lb %p] running success rate algorithm: "
945               "stdev_factor=%d, enforcement_percentage=%d",
946               parent_.get(), config.success_rate_ejection->stdev_factor,
947               config.success_rate_ejection->enforcement_percentage);
948     }
949     // calculate ejection threshold: (mean - stdev *
950     // (success_rate_ejection.stdev_factor / 1000))
951     double mean = success_rate_sum / success_rate_ejection_candidates.size();
952     double variance = 0;
953     for (const auto& p : success_rate_ejection_candidates) {
954       variance += std::pow(p.second - mean, 2);
955     }
956     variance /= success_rate_ejection_candidates.size();
957     double stdev = std::sqrt(variance);
958     const double success_rate_stdev_factor =
959         static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
960     double ejection_threshold = mean - stdev * success_rate_stdev_factor;
961     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
962       gpr_log(GPR_INFO,
963               "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
964               parent_.get(), stdev, ejection_threshold);
965     }
966     for (auto& candidate : success_rate_ejection_candidates) {
967       if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
968         gpr_log(GPR_INFO,
969                 "[outlier_detection_lb %p] checking candidate %p: "
970                 "success_rate=%.3f",
971                 parent_.get(), candidate.first, candidate.second);
972       }
973       if (candidate.second < ejection_threshold) {
974         uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
975         double current_percent =
976             100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
977         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
978           gpr_log(GPR_INFO,
979                   "[outlier_detection_lb %p] random_key=%d "
980                   "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
981                   parent_.get(), random_key, ejected_host_count,
982                   current_percent);
983         }
984         if (random_key < config.success_rate_ejection->enforcement_percentage &&
985             (ejected_host_count == 0 ||
986              (current_percent < config.max_ejection_percent))) {
987           // Eject and record the timestamp for use when ejecting addresses in
988           // this iteration.
989           if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
990             gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
991                     parent_.get());
992           }
993           candidate.first->Eject(time_now);
994           ++ejected_host_count;
995         }
996       }
997     }
998   }
999   // failure percentage algorithm
1000   if (!failure_percentage_ejection_candidates.empty() &&
1001       failure_percentage_ejection_candidates.size() >=
1002           config.failure_percentage_ejection->minimum_hosts) {
1003     if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1004       gpr_log(GPR_INFO,
1005               "[outlier_detection_lb %p] running failure percentage algorithm: "
1006               "threshold=%d, enforcement_percentage=%d",
1007               parent_.get(), config.failure_percentage_ejection->threshold,
1008               config.failure_percentage_ejection->enforcement_percentage);
1009     }
1010     for (auto& candidate : failure_percentage_ejection_candidates) {
1011       if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1012         gpr_log(GPR_INFO,
1013                 "[outlier_detection_lb %p] checking candidate %p: "
1014                 "success_rate=%.3f",
1015                 parent_.get(), candidate.first, candidate.second);
1016       }
1017       // Extra check to make sure success rate algorithm didn't already
1018       // eject this backend.
1019       if (candidate.first->ejection_time().has_value()) continue;
1020       if ((100.0 - candidate.second) >
1021           config.failure_percentage_ejection->threshold) {
1022         uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
1023         double current_percent =
1024             100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
1025         if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1026           gpr_log(GPR_INFO,
1027                   "[outlier_detection_lb %p] random_key=%d "
1028                   "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
1029                   parent_.get(), random_key, ejected_host_count,
1030                   current_percent);
1031         }
1032         if (random_key <
1033                 config.failure_percentage_ejection->enforcement_percentage &&
1034             (ejected_host_count == 0 ||
1035              (current_percent < config.max_ejection_percent))) {
1036           // Eject and record the timestamp for use when ejecting addresses in
1037           // this iteration.
1038           if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1039             gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
1040                     parent_.get());
1041           }
1042           candidate.first->Eject(time_now);
1043           ++ejected_host_count;
1044         }
1045       }
1046     }
1047   }
1048   // For each address in the map:
1049   //   If the address is not ejected and the multiplier is greater than 0,
1050   //   decrease the multiplier by 1. If the address is ejected, and the
1051   //   current time is after ejection_timestamp + min(base_ejection_time *
1052   //   multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
1053   //   address.
1054   for (auto& state : parent_->endpoint_state_map_) {
1055     auto* endpoint_state = state.second.get();
1056     const bool unejected = endpoint_state->MaybeUneject(
1057         config.base_ejection_time.millis(), config.max_ejection_time.millis());
1058     if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1059       gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected endpoint %s (%p)",
1060               parent_.get(), state.first.ToString().c_str(), endpoint_state);
1061     }
1062   }
1063   parent_->ejection_timer_ =
1064       MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
1065 }
1066 
1067 //
1068 // factory
1069 //
1070 
1071 class OutlierDetectionLbFactory final : public LoadBalancingPolicyFactory {
1072  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1073   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1074       LoadBalancingPolicy::Args args) const override {
1075     return MakeOrphanable<OutlierDetectionLb>(std::move(args));
1076   }
1077 
name() const1078   absl::string_view name() const override { return kOutlierDetection; }
1079 
1080   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1081   ParseLoadBalancingConfig(const Json& json) const override {
1082     ValidationErrors errors;
1083     OutlierDetectionConfig outlier_detection_config;
1084     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1085     {
1086       outlier_detection_config =
1087           LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
1088       // Parse childPolicy manually.
1089       {
1090         ValidationErrors::ScopedField field(&errors, ".childPolicy");
1091         auto it = json.object().find("childPolicy");
1092         if (it == json.object().end()) {
1093           errors.AddError("field not present");
1094         } else {
1095           auto child_policy_config = CoreConfiguration::Get()
1096                                          .lb_policy_registry()
1097                                          .ParseLoadBalancingConfig(it->second);
1098           if (!child_policy_config.ok()) {
1099             errors.AddError(child_policy_config.status().message());
1100           } else {
1101             child_policy = std::move(*child_policy_config);
1102           }
1103         }
1104       }
1105     }
1106     if (!errors.ok()) {
1107       return errors.status(
1108           absl::StatusCode::kInvalidArgument,
1109           "errors validating outlier_detection LB policy config");
1110     }
1111     return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
1112                                                     std::move(child_policy));
1113   }
1114 };
1115 
1116 }  // namespace
1117 
1118 //
1119 // OutlierDetectionConfig
1120 //
1121 
1122 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1123 OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
1124   static const auto* loader =
1125       JsonObjectLoader<SuccessRateEjection>()
1126           .OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
1127           .OptionalField("enforcementPercentage",
1128                          &SuccessRateEjection::enforcement_percentage)
1129           .OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
1130           .OptionalField("requestVolume", &SuccessRateEjection::request_volume)
1131           .Finish();
1132   return loader;
1133 }
1134 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1135 void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad(
1136     const Json&, const JsonArgs&, ValidationErrors* errors) {
1137   if (enforcement_percentage > 100) {
1138     ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1139     errors->AddError("value must be <= 100");
1140   }
1141 }
1142 
1143 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1144 OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
1145   static const auto* loader =
1146       JsonObjectLoader<FailurePercentageEjection>()
1147           .OptionalField("threshold", &FailurePercentageEjection::threshold)
1148           .OptionalField("enforcementPercentage",
1149                          &FailurePercentageEjection::enforcement_percentage)
1150           .OptionalField("minimumHosts",
1151                          &FailurePercentageEjection::minimum_hosts)
1152           .OptionalField("requestVolume",
1153                          &FailurePercentageEjection::request_volume)
1154           .Finish();
1155   return loader;
1156 }
1157 
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1158 void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad(
1159     const Json&, const JsonArgs&, ValidationErrors* errors) {
1160   if (enforcement_percentage > 100) {
1161     ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1162     errors->AddError("value must be <= 100");
1163   }
1164   if (threshold > 100) {
1165     ValidationErrors::ScopedField field(errors, ".threshold");
1166     errors->AddError("value must be <= 100");
1167   }
1168 }
1169 
JsonLoader(const JsonArgs &)1170 const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
1171   static const auto* loader =
1172       JsonObjectLoader<OutlierDetectionConfig>()
1173           .OptionalField("interval", &OutlierDetectionConfig::interval)
1174           .OptionalField("baseEjectionTime",
1175                          &OutlierDetectionConfig::base_ejection_time)
1176           .OptionalField("maxEjectionTime",
1177                          &OutlierDetectionConfig::max_ejection_time)
1178           .OptionalField("maxEjectionPercent",
1179                          &OutlierDetectionConfig::max_ejection_percent)
1180           .OptionalField("successRateEjection",
1181                          &OutlierDetectionConfig::success_rate_ejection)
1182           .OptionalField("failurePercentageEjection",
1183                          &OutlierDetectionConfig::failure_percentage_ejection)
1184           .Finish();
1185   return loader;
1186 }
1187 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1188 void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1189                                           ValidationErrors* errors) {
1190   if (json.object().find("maxEjectionTime") == json.object().end()) {
1191     max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
1192   }
1193   if (max_ejection_percent > 100) {
1194     ValidationErrors::ScopedField field(errors, ".max_ejection_percent");
1195     errors->AddError("value must be <= 100");
1196   }
1197 }
1198 
1199 //
1200 // Plugin registration
1201 //
1202 
RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder * builder)1203 void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
1204   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1205       std::make_unique<OutlierDetectionLbFactory>());
1206 }
1207 
1208 }  // namespace grpc_core
1209