1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
20
21 #include <inttypes.h>
22 #include <stddef.h>
23
24 #include <algorithm>
25 #include <atomic>
26 #include <cmath>
27 #include <map>
28 #include <memory>
29 #include <set>
30 #include <string>
31 #include <type_traits>
32 #include <utility>
33 #include <vector>
34
35 #include "absl/base/thread_annotations.h"
36 #include "absl/meta/type_traits.h"
37 #include "absl/random/random.h"
38 #include "absl/status/status.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/variant.h"
42
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/impl/connectivity_state.h>
45 #include <grpc/support/log.h>
46
47 #include "src/core/client_channel/subchannel_interface_internal.h"
48 #include "src/core/lib/address_utils/sockaddr_utils.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/config/core_configuration.h"
51 #include "src/core/lib/debug/trace.h"
52 #include "src/core/lib/experiments/experiments.h"
53 #include "src/core/lib/gprpp/debug_location.h"
54 #include "src/core/lib/gprpp/orphanable.h"
55 #include "src/core/lib/gprpp/ref_counted.h"
56 #include "src/core/lib/gprpp/ref_counted_ptr.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/gprpp/unique_type_name.h"
59 #include "src/core/lib/gprpp/validation_errors.h"
60 #include "src/core/lib/gprpp/work_serializer.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/iomgr/iomgr_fwd.h"
63 #include "src/core/lib/iomgr/pollset_set.h"
64 #include "src/core/lib/iomgr/resolved_address.h"
65 #include "src/core/lib/json/json.h"
66 #include "src/core/lib/transport/connectivity_state.h"
67 #include "src/core/load_balancing/child_policy_handler.h"
68 #include "src/core/load_balancing/delegating_helper.h"
69 #include "src/core/load_balancing/health_check_client_internal.h"
70 #include "src/core/load_balancing/lb_policy.h"
71 #include "src/core/load_balancing/lb_policy_factory.h"
72 #include "src/core/load_balancing/lb_policy_registry.h"
73 #include "src/core/load_balancing/subchannel_interface.h"
74 #include "src/core/resolver/endpoint_addresses.h"
75
76 namespace grpc_core {
77
78 TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb");
79
80 namespace {
81
82 using ::grpc_event_engine::experimental::EventEngine;
83
84 constexpr absl::string_view kOutlierDetection =
85 "outlier_detection_experimental";
86
87 // Config for xDS Cluster Impl LB policy.
88 class OutlierDetectionLbConfig final : public LoadBalancingPolicy::Config {
89 public:
OutlierDetectionLbConfig(OutlierDetectionConfig outlier_detection_config,RefCountedPtr<LoadBalancingPolicy::Config> child_policy)90 OutlierDetectionLbConfig(
91 OutlierDetectionConfig outlier_detection_config,
92 RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
93 : outlier_detection_config_(outlier_detection_config),
94 child_policy_(std::move(child_policy)) {}
95
name() const96 absl::string_view name() const override { return kOutlierDetection; }
97
CountingEnabled() const98 bool CountingEnabled() const {
99 return outlier_detection_config_.success_rate_ejection.has_value() ||
100 outlier_detection_config_.failure_percentage_ejection.has_value();
101 }
102
outlier_detection_config() const103 const OutlierDetectionConfig& outlier_detection_config() const {
104 return outlier_detection_config_;
105 }
106
child_policy() const107 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
108 return child_policy_;
109 }
110
111 private:
112 OutlierDetectionConfig outlier_detection_config_;
113 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
114 };
115
116 // xDS Cluster Impl LB policy.
117 class OutlierDetectionLb final : public LoadBalancingPolicy {
118 public:
119 explicit OutlierDetectionLb(Args args);
120
name() const121 absl::string_view name() const override { return kOutlierDetection; }
122
123 absl::Status UpdateLocked(UpdateArgs args) override;
124 void ExitIdleLocked() override;
125 void ResetBackoffLocked() override;
126
127 private:
128 class SubchannelState;
129 class EndpointState;
130
131 class SubchannelWrapper final : public DelegatingSubchannel {
132 public:
SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,RefCountedPtr<SubchannelState> subchannel_state,RefCountedPtr<SubchannelInterface> subchannel)133 SubchannelWrapper(std::shared_ptr<WorkSerializer> work_serializer,
134 RefCountedPtr<SubchannelState> subchannel_state,
135 RefCountedPtr<SubchannelInterface> subchannel)
136 : DelegatingSubchannel(std::move(subchannel)),
137 work_serializer_(std::move(work_serializer)),
138 subchannel_state_(std::move(subchannel_state)) {
139 if (subchannel_state_ != nullptr) {
140 subchannel_state_->AddSubchannel(this);
141 if (subchannel_state_->endpoint_state()->ejection_time().has_value()) {
142 ejected_ = true;
143 }
144 }
145 }
146
147 void Eject();
148
149 void Uneject();
150
151 void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override;
152
153 void CancelDataWatcher(DataWatcherInterface* watcher) override;
154
endpoint_state() const155 RefCountedPtr<EndpointState> endpoint_state() const {
156 if (subchannel_state_ == nullptr) return nullptr;
157 return subchannel_state_->endpoint_state();
158 }
159
160 private:
161 class WatcherWrapper final
162 : public SubchannelInterface::ConnectivityStateWatcherInterface {
163 public:
WatcherWrapper(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> health_watcher,bool ejected)164 WatcherWrapper(std::shared_ptr<
165 SubchannelInterface::ConnectivityStateWatcherInterface>
166 health_watcher,
167 bool ejected)
168 : watcher_(std::move(health_watcher)), ejected_(ejected) {}
169
Eject()170 void Eject() {
171 ejected_ = true;
172 if (last_seen_state_.has_value()) {
173 watcher_->OnConnectivityStateChange(
174 GRPC_CHANNEL_TRANSIENT_FAILURE,
175 absl::UnavailableError(
176 "subchannel ejected by outlier detection"));
177 }
178 }
179
Uneject()180 void Uneject() {
181 ejected_ = false;
182 if (last_seen_state_.has_value()) {
183 watcher_->OnConnectivityStateChange(*last_seen_state_,
184 last_seen_status_);
185 }
186 }
187
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)188 void OnConnectivityStateChange(grpc_connectivity_state new_state,
189 absl::Status status) override {
190 const bool send_update = !last_seen_state_.has_value() || !ejected_;
191 last_seen_state_ = new_state;
192 last_seen_status_ = status;
193 if (send_update) {
194 if (ejected_) {
195 new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
196 status = absl::UnavailableError(
197 "subchannel ejected by outlier detection");
198 }
199 watcher_->OnConnectivityStateChange(new_state, status);
200 }
201 }
202
interested_parties()203 grpc_pollset_set* interested_parties() override {
204 return watcher_->interested_parties();
205 }
206
207 private:
208 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
209 watcher_;
210 absl::optional<grpc_connectivity_state> last_seen_state_;
211 absl::Status last_seen_status_;
212 bool ejected_;
213 };
214
Orphaned()215 void Orphaned() override {
216 if (!IsWorkSerializerDispatchEnabled()) {
217 if (subchannel_state_ != nullptr) {
218 subchannel_state_->RemoveSubchannel(this);
219 }
220 return;
221 }
222 work_serializer_->Run(
223 [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
224 if (self->subchannel_state_ != nullptr) {
225 self->subchannel_state_->RemoveSubchannel(self.get());
226 }
227 },
228 DEBUG_LOCATION);
229 }
230
231 std::shared_ptr<WorkSerializer> work_serializer_;
232 RefCountedPtr<SubchannelState> subchannel_state_;
233 bool ejected_ = false;
234 WatcherWrapper* watcher_wrapper_ = nullptr;
235 };
236
237 class SubchannelState final : public RefCounted<SubchannelState> {
238 public:
AddSubchannel(SubchannelWrapper * wrapper)239 void AddSubchannel(SubchannelWrapper* wrapper) {
240 subchannels_.insert(wrapper);
241 }
242
RemoveSubchannel(SubchannelWrapper * wrapper)243 void RemoveSubchannel(SubchannelWrapper* wrapper) {
244 subchannels_.erase(wrapper);
245 }
246
endpoint_state()247 RefCountedPtr<EndpointState> endpoint_state() {
248 MutexLock lock(&mu_);
249 return endpoint_state_;
250 }
251
set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state)252 void set_endpoint_state(RefCountedPtr<EndpointState> endpoint_state) {
253 MutexLock lock(&mu_);
254 endpoint_state_ = std::move(endpoint_state);
255 }
256
Eject()257 void Eject() {
258 // Ejecting the subchannel may cause the child policy to unref the
259 // subchannel, so we need to be prepared for the set to be modified
260 // while we are iterating.
261 for (auto it = subchannels_.begin(); it != subchannels_.end();) {
262 SubchannelWrapper* subchannel = *it;
263 ++it;
264 subchannel->Eject();
265 }
266 }
267
Uneject()268 void Uneject() {
269 for (auto& subchannel : subchannels_) {
270 subchannel->Uneject();
271 }
272 }
273
274 private:
275 std::set<SubchannelWrapper*> subchannels_;
276 Mutex mu_;
277 RefCountedPtr<EndpointState> endpoint_state_ ABSL_GUARDED_BY(mu_);
278 };
279
280 class EndpointState final : public RefCounted<EndpointState> {
281 public:
EndpointState(std::set<SubchannelState * > subchannels)282 explicit EndpointState(std::set<SubchannelState*> subchannels)
283 : subchannels_(std::move(subchannels)) {
284 for (SubchannelState* subchannel : subchannels_) {
285 subchannel->set_endpoint_state(Ref());
286 }
287 }
288
RotateBucket()289 void RotateBucket() {
290 backup_bucket_->successes = 0;
291 backup_bucket_->failures = 0;
292 current_bucket_.swap(backup_bucket_);
293 active_bucket_.store(current_bucket_.get());
294 }
295
GetSuccessRateAndVolume()296 absl::optional<std::pair<double, uint64_t>> GetSuccessRateAndVolume() {
297 uint64_t total_request =
298 backup_bucket_->successes + backup_bucket_->failures;
299 if (total_request == 0) {
300 return absl::nullopt;
301 }
302 double success_rate =
303 backup_bucket_->successes * 100.0 /
304 (backup_bucket_->successes + backup_bucket_->failures);
305 return {
306 {success_rate, backup_bucket_->successes + backup_bucket_->failures}};
307 }
308
AddSuccessCount()309 void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); }
310
AddFailureCount()311 void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); }
312
ejection_time() const313 absl::optional<Timestamp> ejection_time() const { return ejection_time_; }
314
Eject(const Timestamp & time)315 void Eject(const Timestamp& time) {
316 ejection_time_ = time;
317 ++multiplier_;
318 for (SubchannelState* subchannel_state : subchannels_) {
319 subchannel_state->Eject();
320 }
321 }
322
Uneject()323 void Uneject() {
324 ejection_time_.reset();
325 for (SubchannelState* subchannel_state : subchannels_) {
326 subchannel_state->Uneject();
327 }
328 }
329
MaybeUneject(uint64_t base_ejection_time_in_millis,uint64_t max_ejection_time_in_millis)330 bool MaybeUneject(uint64_t base_ejection_time_in_millis,
331 uint64_t max_ejection_time_in_millis) {
332 if (!ejection_time_.has_value()) {
333 if (multiplier_ > 0) {
334 --multiplier_;
335 }
336 } else {
337 GPR_ASSERT(ejection_time_.has_value());
338 auto change_time = ejection_time_.value() +
339 Duration::Milliseconds(std::min(
340 base_ejection_time_in_millis * multiplier_,
341 std::max(base_ejection_time_in_millis,
342 max_ejection_time_in_millis)));
343 if (change_time < Timestamp::Now()) {
344 Uneject();
345 return true;
346 }
347 }
348 return false;
349 }
350
DisableEjection()351 void DisableEjection() {
352 if (ejection_time_.has_value()) Uneject();
353 multiplier_ = 0;
354 }
355
356 private:
357 struct Bucket {
358 std::atomic<uint64_t> successes;
359 std::atomic<uint64_t> failures;
360 };
361
362 const std::set<SubchannelState*> subchannels_;
363
364 std::unique_ptr<Bucket> current_bucket_ = std::make_unique<Bucket>();
365 std::unique_ptr<Bucket> backup_bucket_ = std::make_unique<Bucket>();
366 // The bucket used to update call counts.
367 // Points to either current_bucket or active_bucket.
368 std::atomic<Bucket*> active_bucket_{current_bucket_.get()};
369 uint32_t multiplier_ = 0;
370 absl::optional<Timestamp> ejection_time_;
371 };
372
373 // A picker that wraps the picker from the child to perform outlier detection.
374 class Picker final : public SubchannelPicker {
375 public:
376 Picker(OutlierDetectionLb* outlier_detection_lb,
377 RefCountedPtr<SubchannelPicker> picker, bool counting_enabled);
378
379 PickResult Pick(PickArgs args) override;
380
381 private:
382 class SubchannelCallTracker;
383 RefCountedPtr<SubchannelPicker> picker_;
384 bool counting_enabled_;
385 };
386
387 class Helper final
388 : public ParentOwningDelegatingChannelControlHelper<OutlierDetectionLb> {
389 public:
Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)390 explicit Helper(RefCountedPtr<OutlierDetectionLb> outlier_detection_policy)
391 : ParentOwningDelegatingChannelControlHelper(
392 std::move(outlier_detection_policy)) {}
393
394 RefCountedPtr<SubchannelInterface> CreateSubchannel(
395 const grpc_resolved_address& address,
396 const ChannelArgs& per_address_args, const ChannelArgs& args) override;
397 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
398 RefCountedPtr<SubchannelPicker> picker) override;
399 };
400
401 class EjectionTimer final : public InternallyRefCounted<EjectionTimer> {
402 public:
403 EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,
404 Timestamp start_time);
405
406 void Orphan() override;
407
StartTime() const408 Timestamp StartTime() const { return start_time_; }
409
410 private:
411 void OnTimerLocked();
412
413 RefCountedPtr<OutlierDetectionLb> parent_;
414 absl::optional<EventEngine::TaskHandle> timer_handle_;
415 Timestamp start_time_;
416 absl::BitGen bit_gen_;
417 };
418
419 ~OutlierDetectionLb() override;
420
421 void ShutdownLocked() override;
422
423 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
424 const ChannelArgs& args);
425
426 void MaybeUpdatePickerLocked();
427
428 // Current config from the resolver.
429 RefCountedPtr<OutlierDetectionLbConfig> config_;
430
431 // Internal state.
432 bool shutting_down_ = false;
433
434 OrphanablePtr<LoadBalancingPolicy> child_policy_;
435
436 // Latest state and picker reported by the child policy.
437 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
438 absl::Status status_;
439 RefCountedPtr<SubchannelPicker> picker_;
440 std::map<EndpointAddressSet, RefCountedPtr<EndpointState>>
441 endpoint_state_map_;
442 std::map<grpc_resolved_address, RefCountedPtr<SubchannelState>,
443 ResolvedAddressLessThan>
444 subchannel_state_map_;
445 OrphanablePtr<EjectionTimer> ejection_timer_;
446 };
447
448 //
449 // OutlierDetectionLb::SubchannelWrapper
450 //
451
Eject()452 void OutlierDetectionLb::SubchannelWrapper::Eject() {
453 ejected_ = true;
454 if (watcher_wrapper_ != nullptr) watcher_wrapper_->Eject();
455 }
456
Uneject()457 void OutlierDetectionLb::SubchannelWrapper::Uneject() {
458 ejected_ = false;
459 if (watcher_wrapper_ != nullptr) watcher_wrapper_->Uneject();
460 }
461
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)462 void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
463 std::unique_ptr<DataWatcherInterface> watcher) {
464 auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get());
465 if (w->type() == HealthProducer::Type()) {
466 auto* health_watcher = static_cast<HealthWatcher*>(watcher.get());
467 auto watcher_wrapper = std::make_shared<WatcherWrapper>(
468 health_watcher->TakeWatcher(), ejected_);
469 watcher_wrapper_ = watcher_wrapper.get();
470 health_watcher->SetWatcher(std::move(watcher_wrapper));
471 }
472 DelegatingSubchannel::AddDataWatcher(std::move(watcher));
473 }
474
CancelDataWatcher(DataWatcherInterface * watcher)475 void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
476 DataWatcherInterface* watcher) {
477 auto* w = static_cast<InternalSubchannelDataWatcherInterface*>(watcher);
478 if (w->type() == HealthProducer::Type()) watcher_wrapper_ = nullptr;
479 DelegatingSubchannel::CancelDataWatcher(watcher);
480 }
481
482 //
483 // OutlierDetectionLb::Picker::SubchannelCallTracker
484 //
485
486 class OutlierDetectionLb::Picker::SubchannelCallTracker final
487 : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
488 public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<EndpointState> endpoint_state)489 SubchannelCallTracker(
490 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
491 original_subchannel_call_tracker,
492 RefCountedPtr<EndpointState> endpoint_state)
493 : original_subchannel_call_tracker_(
494 std::move(original_subchannel_call_tracker)),
495 endpoint_state_(std::move(endpoint_state)) {}
496
~SubchannelCallTracker()497 ~SubchannelCallTracker() override {
498 endpoint_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
499 }
500
Start()501 void Start() override {
502 // This tracker does not care about started calls only finished calls.
503 // Delegate if needed.
504 if (original_subchannel_call_tracker_ != nullptr) {
505 original_subchannel_call_tracker_->Start();
506 }
507 }
508
Finish(FinishArgs args)509 void Finish(FinishArgs args) override {
510 // Delegate if needed.
511 if (original_subchannel_call_tracker_ != nullptr) {
512 original_subchannel_call_tracker_->Finish(args);
513 }
514 // Record call completion based on status for outlier detection
515 // calculations.
516 if (args.status.ok()) {
517 endpoint_state_->AddSuccessCount();
518 } else {
519 endpoint_state_->AddFailureCount();
520 }
521 }
522
523 private:
524 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
525 original_subchannel_call_tracker_;
526 RefCountedPtr<EndpointState> endpoint_state_;
527 };
528
529 //
530 // OutlierDetectionLb::Picker
531 //
532
Picker(OutlierDetectionLb * outlier_detection_lb,RefCountedPtr<SubchannelPicker> picker,bool counting_enabled)533 OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
534 RefCountedPtr<SubchannelPicker> picker,
535 bool counting_enabled)
536 : picker_(std::move(picker)), counting_enabled_(counting_enabled) {
537 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
538 gpr_log(GPR_INFO,
539 "[outlier_detection_lb %p] constructed new picker %p and counting "
540 "is %s",
541 outlier_detection_lb, this,
542 (counting_enabled ? "enabled" : "disabled"));
543 }
544 }
545
Pick(LoadBalancingPolicy::PickArgs args)546 LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
547 LoadBalancingPolicy::PickArgs args) {
548 if (picker_ == nullptr) { // Should never happen.
549 return PickResult::Fail(absl::InternalError(
550 "outlier_detection picker not given any child picker"));
551 }
552 // Delegate to child picker
553 PickResult result = picker_->Pick(args);
554 auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
555 if (complete_pick != nullptr) {
556 auto* subchannel_wrapper =
557 static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
558 // Inject subchannel call tracker to record call completion as long as
559 // either success_rate_ejection or failure_percentage_ejection is enabled.
560 if (counting_enabled_) {
561 auto endpoint_state = subchannel_wrapper->endpoint_state();
562 if (endpoint_state != nullptr) {
563 complete_pick->subchannel_call_tracker =
564 std::make_unique<SubchannelCallTracker>(
565 std::move(complete_pick->subchannel_call_tracker),
566 std::move(endpoint_state));
567 }
568 }
569 // Unwrap subchannel to pass back up the stack.
570 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
571 }
572 return result;
573 }
574
575 //
576 // OutlierDetectionLb
577 //
578
OutlierDetectionLb(Args args)579 OutlierDetectionLb::OutlierDetectionLb(Args args)
580 : LoadBalancingPolicy(std::move(args)) {
581 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
582 gpr_log(GPR_INFO, "[outlier_detection_lb %p] created", this);
583 }
584 }
585
~OutlierDetectionLb()586 OutlierDetectionLb::~OutlierDetectionLb() {
587 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
588 gpr_log(GPR_INFO,
589 "[outlier_detection_lb %p] destroying outlier_detection LB policy",
590 this);
591 }
592 }
593
ShutdownLocked()594 void OutlierDetectionLb::ShutdownLocked() {
595 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
596 gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this);
597 }
598 ejection_timer_.reset();
599 shutting_down_ = true;
600 // Remove the child policy's interested_parties pollset_set from the
601 // xDS policy.
602 if (child_policy_ != nullptr) {
603 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
604 interested_parties());
605 child_policy_.reset();
606 }
607 // Drop our ref to the child's picker, in case it's holding a ref to
608 // the child.
609 picker_.reset();
610 }
611
ExitIdleLocked()612 void OutlierDetectionLb::ExitIdleLocked() {
613 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
614 }
615
ResetBackoffLocked()616 void OutlierDetectionLb::ResetBackoffLocked() {
617 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
618 }
619
// Applies a new config and address list.
//
// In order:
//  1. Replaces config_, then starts, cancels, or replaces the ejection timer
//     depending on whether counting is enabled and whether the interval
//     changed.
//  2. If the address list is OK, adds SubchannelState/EndpointState entries
//     for newly seen addresses/endpoints (or disables ejection on existing
//     endpoints when counting is off) and removes entries no longer present.
//  3. Creates the child policy if needed and forwards the update to it.
//
// Returns the status returned by the child policy's UpdateLocked().
absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
    gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this);
  }
  // Keep the previous config so the timer interval can be compared below.
  auto old_config = std::move(config_);
  // Update config.
  config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
  // Update outlier detection timer.
  if (!config_->CountingEnabled()) {
    // No need for timer. Cancel the current timer, if any.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
      gpr_log(GPR_INFO,
              "[outlier_detection_lb %p] counting disabled, cancelling timer",
              this);
    }
    ejection_timer_.reset();
  } else if (ejection_timer_ == nullptr) {
    // No timer running. Start it now.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
      gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this);
    }
    ejection_timer_ = MakeOrphanable<EjectionTimer>(
        RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
    for (const auto& p : endpoint_state_map_) {
      p.second->RotateBucket();  // Reset call counters.
    }
  } else if (old_config->outlier_detection_config().interval !=
             config_->outlier_detection_config().interval) {
    // NOTE(review): old_config is dereferenced without a null check. This
    // relies on a running timer implying that a previous config was applied.
    //
    // Timer interval changed. Cancel the current timer and start a new one
    // with the same start time. The intent is that the new deadline is the
    // old start time plus the new interval, firing immediately if that is
    // already in the past. NOTE(review): this only holds if EjectionTimer
    // schedules relative to its start_time argument.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
      gpr_log(GPR_INFO,
              "[outlier_detection_lb %p] interval changed, replacing timer",
              this);
    }
    // The new timer is built (reading the old start time) before the
    // assignment orphans the old one.
    ejection_timer_ = MakeOrphanable<EjectionTimer>(
        RefAsSubclass<OutlierDetectionLb>(), ejection_timer_->StartTime());
  }
  // Update subchannel and endpoint maps.
  if (args.addresses.ok()) {
    // Endpoints and addresses present in this update; anything not in these
    // sets is removed from the maps afterwards.
    std::set<EndpointAddressSet> current_endpoints;
    std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
    (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
      EndpointAddressSet key(endpoint.addresses());
      current_endpoints.emplace(key);
      for (const grpc_resolved_address& address : endpoint.addresses()) {
        current_addresses.emplace(address);
      }
      // Find the entry in the endpoint map.
      auto it = endpoint_state_map_.find(key);
      if (it == endpoint_state_map_.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
          gpr_log(GPR_INFO,
                  "[outlier_detection_lb %p] adding endpoint entry for %s",
                  this, key.ToString().c_str());
        }
        // The endpoint is not present in the map, so we'll need to add it.
        // Start by getting a pointer to the entry for each address in the
        // subchannel map, creating the entry if needed.
        std::set<SubchannelState*> subchannels;
        for (const grpc_resolved_address& address : endpoint.addresses()) {
          auto it2 = subchannel_state_map_.find(address);
          if (it2 == subchannel_state_map_.end()) {
            if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
              std::string address_str = grpc_sockaddr_to_string(&address, false)
                                            .value_or("<unknown>");
              gpr_log(GPR_INFO,
                      "[outlier_detection_lb %p] adding address entry for %s",
                      this, address_str.c_str());
            }
            it2 = subchannel_state_map_
                      .emplace(address, MakeRefCounted<SubchannelState>())
                      .first;
          }
          subchannels.insert(it2->second.get());
        }
        // Now create the endpoint. Its constructor points each of these
        // SubchannelStates at the new EndpointState.
        endpoint_state_map_.emplace(
            key, MakeRefCounted<EndpointState>(std::move(subchannels)));
      } else if (!config_->CountingEnabled()) {
        // If counting is not enabled, reset state.
        if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
          gpr_log(GPR_INFO,
                  "[outlier_detection_lb %p] counting disabled; disabling "
                  "ejection for %s",
                  this, key.ToString().c_str());
        }
        it->second->DisableEjection();
      }
    });
    // Remove any entries we no longer need in the subchannel map.
    for (auto it = subchannel_state_map_.begin();
         it != subchannel_state_map_.end();) {
      if (current_addresses.find(it->first) == current_addresses.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
          std::string address_str =
              grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
          gpr_log(GPR_INFO,
                  "[outlier_detection_lb %p] removing subchannel map entry %s",
                  this, address_str.c_str());
        }
        // Don't hold a ref to the corresponding EndpointState object,
        // because there could be subchannel wrappers keeping this alive
        // for a while, and we don't need them to do any call tracking.
        it->second->set_endpoint_state(nullptr);
        it = subchannel_state_map_.erase(it);
      } else {
        ++it;
      }
    }
    // Remove any entries we no longer need in the endpoint map.
    for (auto it = endpoint_state_map_.begin();
         it != endpoint_state_map_.end();) {
      if (current_endpoints.find(it->first) == current_endpoints.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
          gpr_log(GPR_INFO,
                  "[outlier_detection_lb %p] removing endpoint map entry %s",
                  this, it->first.ToString().c_str());
        }
        it = endpoint_state_map_.erase(it);
      } else {
        ++it;
      }
    }
  }
  // Create child policy if needed.
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(args.args);
  }
  // Update child policy. The child gets the same addresses and args, but the
  // child_policy part of our config.
  UpdateArgs update_args;
  update_args.addresses = std::move(args.addresses);
  update_args.resolution_note = std::move(args.resolution_note);
  update_args.config = config_->child_policy();
  update_args.args = std::move(args.args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
    gpr_log(GPR_INFO,
            "[outlier_detection_lb %p] Updating child policy handler %p", this,
            child_policy_.get());
  }
  return child_policy_->UpdateLocked(std::move(update_args));
}
764
MaybeUpdatePickerLocked()765 void OutlierDetectionLb::MaybeUpdatePickerLocked() {
766 if (picker_ != nullptr) {
767 auto outlier_detection_picker =
768 MakeRefCounted<Picker>(this, picker_, config_->CountingEnabled());
769 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
770 gpr_log(GPR_INFO,
771 "[outlier_detection_lb %p] updating connectivity: state=%s "
772 "status=(%s) picker=%p",
773 this, ConnectivityStateName(state_), status_.ToString().c_str(),
774 outlier_detection_picker.get());
775 }
776 channel_control_helper()->UpdateState(state_, status_,
777 std::move(outlier_detection_picker));
778 }
779 }
780
CreateChildPolicyLocked(const ChannelArgs & args)781 OrphanablePtr<LoadBalancingPolicy> OutlierDetectionLb::CreateChildPolicyLocked(
782 const ChannelArgs& args) {
783 LoadBalancingPolicy::Args lb_policy_args;
784 lb_policy_args.work_serializer = work_serializer();
785 lb_policy_args.args = args;
786 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
787 RefAsSubclass<OutlierDetectionLb>(DEBUG_LOCATION, "Helper"));
788 OrphanablePtr<LoadBalancingPolicy> lb_policy =
789 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
790 &grpc_outlier_detection_lb_trace);
791 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
792 gpr_log(GPR_INFO,
793 "[outlier_detection_lb %p] Created new child policy handler %p",
794 this, lb_policy.get());
795 }
796 // Add our interested_parties pollset_set to that of the newly created
797 // child policy. This will make the child policy progress upon activity on
798 // this policy, which in turn is tied to the application's call.
799 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
800 interested_parties());
801 return lb_policy;
802 }
803
804 //
805 // OutlierDetectionLb::Helper
806 //
807
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)808 RefCountedPtr<SubchannelInterface> OutlierDetectionLb::Helper::CreateSubchannel(
809 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
810 const ChannelArgs& args) {
811 if (parent()->shutting_down_) return nullptr;
812 RefCountedPtr<SubchannelState> subchannel_state;
813 auto it = parent()->subchannel_state_map_.find(address);
814 if (it != parent()->subchannel_state_map_.end()) {
815 subchannel_state = it->second->Ref();
816 }
817 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
818 std::string address_str =
819 grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
820 gpr_log(GPR_INFO,
821 "[outlier_detection_lb %p] creating subchannel for %s, "
822 "subchannel state %p",
823 parent(), address_str.c_str(), subchannel_state.get());
824 }
825 auto subchannel = MakeRefCounted<SubchannelWrapper>(
826 parent()->work_serializer(), subchannel_state,
827 parent()->channel_control_helper()->CreateSubchannel(
828 address, per_address_args, args));
829 if (subchannel_state != nullptr) {
830 subchannel_state->AddSubchannel(subchannel.get());
831 }
832 return subchannel;
833 }
834
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)835 void OutlierDetectionLb::Helper::UpdateState(
836 grpc_connectivity_state state, const absl::Status& status,
837 RefCountedPtr<SubchannelPicker> picker) {
838 if (parent()->shutting_down_) return;
839 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
840 gpr_log(GPR_INFO,
841 "[outlier_detection_lb %p] child connectivity state update: "
842 "state=%s (%s) picker=%p",
843 parent(), ConnectivityStateName(state), status.ToString().c_str(),
844 picker.get());
845 }
846 // Save the state and picker.
847 parent()->state_ = state;
848 parent()->status_ = status;
849 parent()->picker_ = std::move(picker);
850 // Wrap the picker and return it to the channel.
851 parent()->MaybeUpdatePickerLocked();
852 }
853
854 //
855 // OutlierDetectionLb::EjectionTimer
856 //
857
EjectionTimer(RefCountedPtr<OutlierDetectionLb> parent,Timestamp start_time)858 OutlierDetectionLb::EjectionTimer::EjectionTimer(
859 RefCountedPtr<OutlierDetectionLb> parent, Timestamp start_time)
860 : parent_(std::move(parent)), start_time_(start_time) {
861 auto interval = parent_->config_->outlier_detection_config().interval;
862 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
863 gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s",
864 parent_.get(), interval.ToString().c_str());
865 }
866 timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter(
867 interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable {
868 ApplicationCallbackExecCtx callback_exec_ctx;
869 ExecCtx exec_ctx;
870 auto self_ptr = self.get();
871 self_ptr->parent_->work_serializer()->Run(
872 [self = std::move(self)]() { self->OnTimerLocked(); },
873 DEBUG_LOCATION);
874 });
875 }
876
Orphan()877 void OutlierDetectionLb::EjectionTimer::Orphan() {
878 if (timer_handle_.has_value()) {
879 parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
880 timer_handle_.reset();
881 }
882 Unref();
883 }
884
OnTimerLocked()885 void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
886 if (!timer_handle_.has_value()) return;
887 timer_handle_.reset();
888 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
889 gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running",
890 parent_.get());
891 }
892 std::map<EndpointState*, double> success_rate_ejection_candidates;
893 std::map<EndpointState*, double> failure_percentage_ejection_candidates;
894 size_t ejected_host_count = 0;
895 double success_rate_sum = 0;
896 auto time_now = Timestamp::Now();
897 auto& config = parent_->config_->outlier_detection_config();
898 for (auto& state : parent_->endpoint_state_map_) {
899 auto* endpoint_state = state.second.get();
900 // For each address, swap the call counter's buckets in that address's
901 // map entry.
902 endpoint_state->RotateBucket();
903 // Gather data to run success rate algorithm or failure percentage
904 // algorithm.
905 if (endpoint_state->ejection_time().has_value()) {
906 ++ejected_host_count;
907 }
908 absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
909 endpoint_state->GetSuccessRateAndVolume();
910 if (!host_success_rate_and_volume.has_value()) {
911 continue;
912 }
913 double success_rate = host_success_rate_and_volume->first;
914 uint64_t request_volume = host_success_rate_and_volume->second;
915 if (config.success_rate_ejection.has_value()) {
916 if (request_volume >= config.success_rate_ejection->request_volume) {
917 success_rate_ejection_candidates[endpoint_state] = success_rate;
918 success_rate_sum += success_rate;
919 }
920 }
921 if (config.failure_percentage_ejection.has_value()) {
922 if (request_volume >=
923 config.failure_percentage_ejection->request_volume) {
924 failure_percentage_ejection_candidates[endpoint_state] = success_rate;
925 }
926 }
927 }
928 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
929 gpr_log(GPR_INFO,
930 "[outlier_detection_lb %p] found %" PRIuPTR
931 " success rate candidates and %" PRIuPTR
932 " failure percentage candidates; ejected_host_count=%" PRIuPTR
933 "; success_rate_sum=%.3f",
934 parent_.get(), success_rate_ejection_candidates.size(),
935 failure_percentage_ejection_candidates.size(), ejected_host_count,
936 success_rate_sum);
937 }
938 // success rate algorithm
939 if (!success_rate_ejection_candidates.empty() &&
940 success_rate_ejection_candidates.size() >=
941 config.success_rate_ejection->minimum_hosts) {
942 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
943 gpr_log(GPR_INFO,
944 "[outlier_detection_lb %p] running success rate algorithm: "
945 "stdev_factor=%d, enforcement_percentage=%d",
946 parent_.get(), config.success_rate_ejection->stdev_factor,
947 config.success_rate_ejection->enforcement_percentage);
948 }
949 // calculate ejection threshold: (mean - stdev *
950 // (success_rate_ejection.stdev_factor / 1000))
951 double mean = success_rate_sum / success_rate_ejection_candidates.size();
952 double variance = 0;
953 for (const auto& p : success_rate_ejection_candidates) {
954 variance += std::pow(p.second - mean, 2);
955 }
956 variance /= success_rate_ejection_candidates.size();
957 double stdev = std::sqrt(variance);
958 const double success_rate_stdev_factor =
959 static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
960 double ejection_threshold = mean - stdev * success_rate_stdev_factor;
961 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
962 gpr_log(GPR_INFO,
963 "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f",
964 parent_.get(), stdev, ejection_threshold);
965 }
966 for (auto& candidate : success_rate_ejection_candidates) {
967 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
968 gpr_log(GPR_INFO,
969 "[outlier_detection_lb %p] checking candidate %p: "
970 "success_rate=%.3f",
971 parent_.get(), candidate.first, candidate.second);
972 }
973 if (candidate.second < ejection_threshold) {
974 uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
975 double current_percent =
976 100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
977 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
978 gpr_log(GPR_INFO,
979 "[outlier_detection_lb %p] random_key=%d "
980 "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
981 parent_.get(), random_key, ejected_host_count,
982 current_percent);
983 }
984 if (random_key < config.success_rate_ejection->enforcement_percentage &&
985 (ejected_host_count == 0 ||
986 (current_percent < config.max_ejection_percent))) {
987 // Eject and record the timestamp for use when ejecting addresses in
988 // this iteration.
989 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
990 gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
991 parent_.get());
992 }
993 candidate.first->Eject(time_now);
994 ++ejected_host_count;
995 }
996 }
997 }
998 }
999 // failure percentage algorithm
1000 if (!failure_percentage_ejection_candidates.empty() &&
1001 failure_percentage_ejection_candidates.size() >=
1002 config.failure_percentage_ejection->minimum_hosts) {
1003 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1004 gpr_log(GPR_INFO,
1005 "[outlier_detection_lb %p] running failure percentage algorithm: "
1006 "threshold=%d, enforcement_percentage=%d",
1007 parent_.get(), config.failure_percentage_ejection->threshold,
1008 config.failure_percentage_ejection->enforcement_percentage);
1009 }
1010 for (auto& candidate : failure_percentage_ejection_candidates) {
1011 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1012 gpr_log(GPR_INFO,
1013 "[outlier_detection_lb %p] checking candidate %p: "
1014 "success_rate=%.3f",
1015 parent_.get(), candidate.first, candidate.second);
1016 }
1017 // Extra check to make sure success rate algorithm didn't already
1018 // eject this backend.
1019 if (candidate.first->ejection_time().has_value()) continue;
1020 if ((100.0 - candidate.second) >
1021 config.failure_percentage_ejection->threshold) {
1022 uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
1023 double current_percent =
1024 100.0 * ejected_host_count / parent_->endpoint_state_map_.size();
1025 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1026 gpr_log(GPR_INFO,
1027 "[outlier_detection_lb %p] random_key=%d "
1028 "ejected_host_count=%" PRIuPTR " current_percent=%.3f",
1029 parent_.get(), random_key, ejected_host_count,
1030 current_percent);
1031 }
1032 if (random_key <
1033 config.failure_percentage_ejection->enforcement_percentage &&
1034 (ejected_host_count == 0 ||
1035 (current_percent < config.max_ejection_percent))) {
1036 // Eject and record the timestamp for use when ejecting addresses in
1037 // this iteration.
1038 if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1039 gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate",
1040 parent_.get());
1041 }
1042 candidate.first->Eject(time_now);
1043 ++ejected_host_count;
1044 }
1045 }
1046 }
1047 }
1048 // For each address in the map:
1049 // If the address is not ejected and the multiplier is greater than 0,
1050 // decrease the multiplier by 1. If the address is ejected, and the
1051 // current time is after ejection_timestamp + min(base_ejection_time *
1052 // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the
1053 // address.
1054 for (auto& state : parent_->endpoint_state_map_) {
1055 auto* endpoint_state = state.second.get();
1056 const bool unejected = endpoint_state->MaybeUneject(
1057 config.base_ejection_time.millis(), config.max_ejection_time.millis());
1058 if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
1059 gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected endpoint %s (%p)",
1060 parent_.get(), state.first.ToString().c_str(), endpoint_state);
1061 }
1062 }
1063 parent_->ejection_timer_ =
1064 MakeOrphanable<EjectionTimer>(parent_, Timestamp::Now());
1065 }
1066
1067 //
1068 // factory
1069 //
1070
1071 class OutlierDetectionLbFactory final : public LoadBalancingPolicyFactory {
1072 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1073 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1074 LoadBalancingPolicy::Args args) const override {
1075 return MakeOrphanable<OutlierDetectionLb>(std::move(args));
1076 }
1077
name() const1078 absl::string_view name() const override { return kOutlierDetection; }
1079
1080 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1081 ParseLoadBalancingConfig(const Json& json) const override {
1082 ValidationErrors errors;
1083 OutlierDetectionConfig outlier_detection_config;
1084 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1085 {
1086 outlier_detection_config =
1087 LoadFromJson<OutlierDetectionConfig>(json, JsonArgs(), &errors);
1088 // Parse childPolicy manually.
1089 {
1090 ValidationErrors::ScopedField field(&errors, ".childPolicy");
1091 auto it = json.object().find("childPolicy");
1092 if (it == json.object().end()) {
1093 errors.AddError("field not present");
1094 } else {
1095 auto child_policy_config = CoreConfiguration::Get()
1096 .lb_policy_registry()
1097 .ParseLoadBalancingConfig(it->second);
1098 if (!child_policy_config.ok()) {
1099 errors.AddError(child_policy_config.status().message());
1100 } else {
1101 child_policy = std::move(*child_policy_config);
1102 }
1103 }
1104 }
1105 }
1106 if (!errors.ok()) {
1107 return errors.status(
1108 absl::StatusCode::kInvalidArgument,
1109 "errors validating outlier_detection LB policy config");
1110 }
1111 return MakeRefCounted<OutlierDetectionLbConfig>(outlier_detection_config,
1112 std::move(child_policy));
1113 }
1114 };
1115
1116 } // namespace
1117
1118 //
1119 // OutlierDetectionConfig
1120 //
1121
1122 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1123 OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) {
1124 static const auto* loader =
1125 JsonObjectLoader<SuccessRateEjection>()
1126 .OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor)
1127 .OptionalField("enforcementPercentage",
1128 &SuccessRateEjection::enforcement_percentage)
1129 .OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts)
1130 .OptionalField("requestVolume", &SuccessRateEjection::request_volume)
1131 .Finish();
1132 return loader;
1133 }
1134
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1135 void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad(
1136 const Json&, const JsonArgs&, ValidationErrors* errors) {
1137 if (enforcement_percentage > 100) {
1138 ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1139 errors->AddError("value must be <= 100");
1140 }
1141 }
1142
1143 const JsonLoaderInterface*
JsonLoader(const JsonArgs &)1144 OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) {
1145 static const auto* loader =
1146 JsonObjectLoader<FailurePercentageEjection>()
1147 .OptionalField("threshold", &FailurePercentageEjection::threshold)
1148 .OptionalField("enforcementPercentage",
1149 &FailurePercentageEjection::enforcement_percentage)
1150 .OptionalField("minimumHosts",
1151 &FailurePercentageEjection::minimum_hosts)
1152 .OptionalField("requestVolume",
1153 &FailurePercentageEjection::request_volume)
1154 .Finish();
1155 return loader;
1156 }
1157
JsonPostLoad(const Json &,const JsonArgs &,ValidationErrors * errors)1158 void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad(
1159 const Json&, const JsonArgs&, ValidationErrors* errors) {
1160 if (enforcement_percentage > 100) {
1161 ValidationErrors::ScopedField field(errors, ".enforcement_percentage");
1162 errors->AddError("value must be <= 100");
1163 }
1164 if (threshold > 100) {
1165 ValidationErrors::ScopedField field(errors, ".threshold");
1166 errors->AddError("value must be <= 100");
1167 }
1168 }
1169
JsonLoader(const JsonArgs &)1170 const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) {
1171 static const auto* loader =
1172 JsonObjectLoader<OutlierDetectionConfig>()
1173 .OptionalField("interval", &OutlierDetectionConfig::interval)
1174 .OptionalField("baseEjectionTime",
1175 &OutlierDetectionConfig::base_ejection_time)
1176 .OptionalField("maxEjectionTime",
1177 &OutlierDetectionConfig::max_ejection_time)
1178 .OptionalField("maxEjectionPercent",
1179 &OutlierDetectionConfig::max_ejection_percent)
1180 .OptionalField("successRateEjection",
1181 &OutlierDetectionConfig::success_rate_ejection)
1182 .OptionalField("failurePercentageEjection",
1183 &OutlierDetectionConfig::failure_percentage_ejection)
1184 .Finish();
1185 return loader;
1186 }
1187
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1188 void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1189 ValidationErrors* errors) {
1190 if (json.object().find("maxEjectionTime") == json.object().end()) {
1191 max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300));
1192 }
1193 if (max_ejection_percent > 100) {
1194 ValidationErrors::ScopedField field(errors, ".max_ejection_percent");
1195 errors->AddError("value must be <= 100");
1196 }
1197 }
1198
1199 //
1200 // Plugin registration
1201 //
1202
RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder * builder)1203 void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) {
1204 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1205 std::make_unique<OutlierDetectionLbFactory>());
1206 }
1207
1208 } // namespace grpc_core
1209