1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <stddef.h>
18 #include <stdint.h>
19
20 #include <algorithm>
21 #include <array>
22 #include <chrono>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/status/status.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31 #include "absl/types/span.h"
32 #include "gtest/gtest.h"
33
34 #include <grpc/grpc.h>
35 #include <grpc/support/json.h>
36 #include <grpc/support/log.h>
37
38 #include "src/core/lib/gprpp/orphanable.h"
39 #include "src/core/lib/gprpp/ref_counted_ptr.h"
40 #include "src/core/lib/gprpp/time.h"
41 #include "src/core/lib/json/json.h"
42 #include "src/core/load_balancing/backend_metric_data.h"
43 #include "src/core/load_balancing/lb_policy.h"
44 #include "src/core/resolver/endpoint_addresses.h"
45 #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
46 #include "test/core/util/test_config.h"
47
48 namespace grpc_core {
49 namespace testing {
50 namespace {
51
52 class OutlierDetectionTest : public LoadBalancingPolicyTest {
53 protected:
54 class ConfigBuilder {
55 public:
ConfigBuilder()56 ConfigBuilder() {
57 SetChildPolicy(Json::Object{{"round_robin", Json::FromObject({})}});
58 }
59
SetInterval(Duration duration)60 ConfigBuilder& SetInterval(Duration duration) {
61 json_["interval"] = Json::FromString(duration.ToJsonString());
62 return *this;
63 }
SetBaseEjectionTime(Duration duration)64 ConfigBuilder& SetBaseEjectionTime(Duration duration) {
65 json_["baseEjectionTime"] = Json::FromString(duration.ToJsonString());
66 return *this;
67 }
SetMaxEjectionTime(Duration duration)68 ConfigBuilder& SetMaxEjectionTime(Duration duration) {
69 json_["maxEjectionTime"] = Json::FromString(duration.ToJsonString());
70 return *this;
71 }
SetMaxEjectionPercent(uint32_t value)72 ConfigBuilder& SetMaxEjectionPercent(uint32_t value) {
73 json_["maxEjectionPercent"] = Json::FromNumber(value);
74 return *this;
75 }
SetChildPolicy(Json::Object child_policy)76 ConfigBuilder& SetChildPolicy(Json::Object child_policy) {
77 json_["childPolicy"] =
78 Json::FromArray({Json::FromObject(std::move(child_policy))});
79 return *this;
80 }
81
SetSuccessRateStdevFactor(uint32_t value)82 ConfigBuilder& SetSuccessRateStdevFactor(uint32_t value) {
83 GetSuccessRate()["stdevFactor"] = Json::FromNumber(value);
84 return *this;
85 }
SetSuccessRateEnforcementPercentage(uint32_t value)86 ConfigBuilder& SetSuccessRateEnforcementPercentage(uint32_t value) {
87 GetSuccessRate()["enforcementPercentage"] = Json::FromNumber(value);
88 return *this;
89 }
SetSuccessRateMinHosts(uint32_t value)90 ConfigBuilder& SetSuccessRateMinHosts(uint32_t value) {
91 GetSuccessRate()["minimumHosts"] = Json::FromNumber(value);
92 return *this;
93 }
SetSuccessRateRequestVolume(uint32_t value)94 ConfigBuilder& SetSuccessRateRequestVolume(uint32_t value) {
95 GetSuccessRate()["requestVolume"] = Json::FromNumber(value);
96 return *this;
97 }
98
SetFailurePercentageThreshold(uint32_t value)99 ConfigBuilder& SetFailurePercentageThreshold(uint32_t value) {
100 GetFailurePercentage()["threshold"] = Json::FromNumber(value);
101 return *this;
102 }
SetFailurePercentageEnforcementPercentage(uint32_t value)103 ConfigBuilder& SetFailurePercentageEnforcementPercentage(uint32_t value) {
104 GetFailurePercentage()["enforcementPercentage"] = Json::FromNumber(value);
105 return *this;
106 }
SetFailurePercentageMinimumHosts(uint32_t value)107 ConfigBuilder& SetFailurePercentageMinimumHosts(uint32_t value) {
108 GetFailurePercentage()["minimumHosts"] = Json::FromNumber(value);
109 return *this;
110 }
SetFailurePercentageRequestVolume(uint32_t value)111 ConfigBuilder& SetFailurePercentageRequestVolume(uint32_t value) {
112 GetFailurePercentage()["requestVolume"] = Json::FromNumber(value);
113 return *this;
114 }
115
Build()116 RefCountedPtr<LoadBalancingPolicy::Config> Build() {
117 Json::Object fields = json_;
118 if (success_rate_.has_value()) {
119 fields["successRateEjection"] = Json::FromObject(*success_rate_);
120 }
121 if (failure_percentage_.has_value()) {
122 fields["failurePercentageEjection"] =
123 Json::FromObject(*failure_percentage_);
124 }
125 Json config = Json::FromArray(
126 {Json::FromObject({{"outlier_detection_experimental",
127 Json::FromObject(std::move(fields))}})});
128 return MakeConfig(config);
129 }
130
131 private:
GetSuccessRate()132 Json::Object& GetSuccessRate() {
133 if (!success_rate_.has_value()) success_rate_.emplace();
134 return *success_rate_;
135 }
136
GetFailurePercentage()137 Json::Object& GetFailurePercentage() {
138 if (!failure_percentage_.has_value()) failure_percentage_.emplace();
139 return *failure_percentage_;
140 }
141
142 Json::Object json_;
143 absl::optional<Json::Object> success_rate_;
144 absl::optional<Json::Object> failure_percentage_;
145 };
146
OutlierDetectionTest()147 OutlierDetectionTest()
148 : LoadBalancingPolicyTest("outlier_detection_experimental") {}
149
SetUp()150 void SetUp() override {
151 LoadBalancingPolicyTest::SetUp();
152 SetExpectedTimerDuration(std::chrono::seconds(10));
153 }
154
DoPickWithFailedCall(LoadBalancingPolicy::SubchannelPicker * picker)155 absl::optional<std::string> DoPickWithFailedCall(
156 LoadBalancingPolicy::SubchannelPicker* picker) {
157 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
158 subchannel_call_tracker;
159 auto address = ExpectPickComplete(picker, {}, &subchannel_call_tracker);
160 if (address.has_value()) {
161 subchannel_call_tracker->Start();
162 FakeMetadata metadata({});
163 FakeBackendMetricAccessor backend_metric_accessor({});
164 LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
165 *address, absl::UnavailableError("uh oh"), &metadata,
166 &backend_metric_accessor};
167 subchannel_call_tracker->Finish(args);
168 }
169 return address;
170 }
171 };
172
// Single address, default (round_robin) child: once the subchannel becomes
// READY, every pick returns that one address.
TEST_F(OutlierDetectionTest, Basic) {
  constexpr absl::string_view kOnlyAddress = "ipv4:127.0.0.1:443";
  auto config = ConfigBuilder().Build();
  const absl::Status status =
      ApplyUpdate(BuildUpdate({kOnlyAddress}, std::move(config)), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // The update must have produced a subchannel for the address, and the
  // policy requests a connection on it after seeing its initial IDLE state.
  auto* only_subchannel = FindSubchannel(kOnlyAddress);
  ASSERT_NE(only_subchannel, nullptr);
  EXPECT_TRUE(only_subchannel->ConnectionRequested());
  // Drive the subchannel through CONNECTING (reported once by the policy)
  // and then READY (after any number of CONNECTING reports).
  only_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  only_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  // Repeated picks all land on the same address.
  for (int attempt = 0; attempt < 3; ++attempt) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kOnlyAddress);
  }
}
200
// With a failure-percentage threshold of 1%, a single failed call gets its
// address ejected on the next timer pass and restored on the pass after.
TEST_F(OutlierDetectionTest, FailurePercentage) {
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto config = ConfigBuilder()
                    .SetFailurePercentageThreshold(1)
                    .SetFailurePercentageMinimumHosts(1)
                    .SetFailurePercentageRequestVolume(1)
                    .SetMaxEjectionTime(Duration::Seconds(1))
                    .SetBaseEjectionTime(Duration::Seconds(1))
                    .Build();
  const absl::Status status =
      ApplyUpdate(BuildUpdate(kAddresses, std::move(config)), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // Normal round_robin startup across all addresses.
  auto picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(picker, nullptr);
  gpr_log(GPR_INFO, "### RR startup complete");
  // Fail exactly one call; the address it went to becomes the ejection
  // candidate.
  const auto failed_address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(failed_address.has_value());
  gpr_log(GPR_INFO, "### failed RPC on %s", failed_address->c_str());
  // First timer pass: the failed address is ejected.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### ejection complete");
  std::vector<absl::string_view> survivors(kAddresses.begin(),
                                           kAddresses.end());
  survivors.erase(
      std::remove(survivors.begin(), survivors.end(), *failed_address),
      survivors.end());
  WaitForRoundRobinListChange(kAddresses, survivors);
  // Second timer pass: the ejection has expired and the address returns.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### un-ejection complete");
  WaitForRoundRobinListChange(survivors, kAddresses);
}
238
// Ejection with two addresses per endpoint: while an endpoint is ejected, a
// change of the address it is connected to must not make it pickable, and
// after un-ejection it must come back using the new address.
TEST_F(OutlierDetectionTest, MultipleAddressesPerEndpoint) {
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Send initial update.
  absl::Status status = ApplyUpdate(
      BuildUpdate(kEndpoints, ConfigBuilder()
                                  .SetFailurePercentageThreshold(1)
                                  .SetFailurePercentageMinimumHosts(1)
                                  .SetFailurePercentageRequestVolume(1)
                                  .SetMaxEjectionTime(Duration::Seconds(1))
                                  .SetBaseEjectionTime(Duration::Seconds(1))
                                  .Build()),
      lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // Expect normal startup.
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  gpr_log(GPR_INFO, "### RR startup complete");
  // Do a pick and report a failed call.
  auto address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(address.has_value());
  gpr_log(GPR_INFO, "### failed RPC on %s", address->c_str());
  // Based on the address that the failed call went to, we determine
  // which addresses to use in the subsequent steps.
  absl::Span<const absl::string_view> ejected_endpoint_addresses;
  absl::Span<const absl::string_view> sentinel_endpoint_addresses;
  absl::string_view unmodified_endpoint_address;
  std::vector<absl::string_view> final_addresses;
  if (kEndpoint1Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint1Addresses;
    sentinel_endpoint_addresses = kEndpoint2Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else if (kEndpoint2Addresses[0] == *address) {
    ejected_endpoint_addresses = kEndpoint2Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint3Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[1],
                       kEndpoint3Addresses[0]};
  } else {
    ejected_endpoint_addresses = kEndpoint3Addresses;
    sentinel_endpoint_addresses = kEndpoint1Addresses;
    unmodified_endpoint_address = kEndpoint2Addresses[0];
    final_addresses = {kEndpoint1Addresses[1], kEndpoint2Addresses[0],
                       kEndpoint3Addresses[1]};
  }
  // Advance time and run the timer callback to trigger ejection.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### ejection complete");
  // Expect a picker that removes the ejected endpoint.
  WaitForRoundRobinListChange(
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  gpr_log(GPR_INFO, "### ejected endpoint removed");
  // Cause the connection to the ejected endpoint to fail, and then
  // have it reconnect to a different address.  The endpoint is still
  // ejected, so the new address should not be used.
  ExpectEndpointAddressChange(ejected_endpoint_addresses, 0, 1, nullptr);
  // Need to drain the picker updates before calling
  // ExpectEndpointAddressChange() again, since that will expect a
  // re-resolution request in the queue.
  DrainRoundRobinPickerUpdates(
      {sentinel_endpoint_addresses[0], unmodified_endpoint_address});
  gpr_log(GPR_INFO, "### done changing address of ejected endpoint");
  // Do the same thing for the sentinel endpoint, so that we
  // know that the LB policy has seen the address change for the ejected
  // endpoint.
  ExpectEndpointAddressChange(sentinel_endpoint_addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(
        {sentinel_endpoint_addresses[0], unmodified_endpoint_address},
        {unmodified_endpoint_address});
  });
  WaitForRoundRobinListChange(
      {unmodified_endpoint_address},
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address});
  // This step changed the sentinel endpoint, not the ejected one.
  gpr_log(GPR_INFO, "### done changing address of sentinel endpoint");
  // Advance time and run the timer callback to trigger un-ejection.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### un-ejection complete");
  // The ejected endpoint should come back using the new address.
  WaitForRoundRobinListChange(
      {sentinel_endpoint_addresses[1], unmodified_endpoint_address},
      final_addresses);
}
336
// Ejection state belongs to an endpoint's address set: if an update changes
// the ejected endpoint's addresses (drops its second address), the remaining
// address is used again without advancing time.
TEST_F(OutlierDetectionTest, EjectionStateResetsWhenEndpointAddressesChange) {
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Shared by both updates below; const so it cannot be moved from or
  // replaced in between.
  const auto kConfig = ConfigBuilder()
                           .SetFailurePercentageThreshold(1)
                           .SetFailurePercentageMinimumHosts(1)
                           .SetFailurePercentageRequestVolume(1)
                           .SetMaxEjectionTime(Duration::Seconds(1))
                           .SetBaseEjectionTime(Duration::Seconds(1))
                           .Build();
  // Send initial update.
  absl::Status status =
      ApplyUpdate(BuildUpdate(kEndpoints, kConfig), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // Expect normal startup.
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  gpr_log(GPR_INFO, "### RR startup complete");
  // Do a pick and report a failed call.
  auto ejected_address = DoPickWithFailedCall(picker.get());
  ASSERT_TRUE(ejected_address.has_value());
  gpr_log(GPR_INFO, "### failed RPC on %s", ejected_address->c_str());
  // Based on the address that the failed call went to, we determine
  // which addresses to use in the subsequent steps.
  std::vector<absl::string_view> expected_round_robin_while_ejected;
  std::vector<EndpointAddresses> new_endpoints;
  if (kEndpoint1Addresses[0] == *ejected_address) {
    expected_round_robin_while_ejected = {kEndpoint2Addresses[0],
                                          kEndpoint3Addresses[0]};
    new_endpoints = {MakeEndpointAddresses({kEndpoint1Addresses[0]}),
                     MakeEndpointAddresses(kEndpoint2Addresses),
                     MakeEndpointAddresses(kEndpoint3Addresses)};
  } else if (kEndpoint2Addresses[0] == *ejected_address) {
    expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
                                          kEndpoint3Addresses[0]};
    new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
                     MakeEndpointAddresses({kEndpoint2Addresses[0]}),
                     MakeEndpointAddresses(kEndpoint3Addresses)};
  } else {
    expected_round_robin_while_ejected = {kEndpoint1Addresses[0],
                                          kEndpoint2Addresses[0]};
    new_endpoints = {MakeEndpointAddresses(kEndpoint1Addresses),
                     MakeEndpointAddresses(kEndpoint2Addresses),
                     MakeEndpointAddresses({kEndpoint3Addresses[0]})};
  }
  // Advance time and run the timer callback to trigger ejection.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### ejection complete");
  // Expect a picker that removes the ejected address.
  WaitForRoundRobinListChange(
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      expected_round_robin_while_ejected);
  gpr_log(GPR_INFO, "### ejected endpoint removed");
  // Send an update that removes the other address from the ejected endpoint.
  status = ApplyUpdate(BuildUpdate(new_endpoints, kConfig), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // This should cause the address to start getting used again, since
  // it's now associated with a different endpoint.
  WaitForRoundRobinListChange(
      expected_round_robin_while_ejected,
      {kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
}
411
// With pick_first as the child policy, a failed call followed by an
// ejection-timer pass must produce no picker update and no reconnection
// request on the subchannel.
TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) {
  // Can't use timer duration expectation here, because the Happy
  // Eyeballs timer inside pick_first will use a different duration than
  // the timer in outlier_detection.
  SetExpectedTimerDuration(absl::nullopt);
  constexpr std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:440", "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"};
  auto pick_first_config =
      ConfigBuilder()
          .SetFailurePercentageThreshold(1)
          .SetFailurePercentageMinimumHosts(1)
          .SetFailurePercentageRequestVolume(1)
          .SetChildPolicy({{"pick_first", Json::FromObject({})}})
          .Build();
  const absl::Status status = ApplyUpdate(
      BuildUpdate(kAddresses, std::move(pick_first_config)), lb_policy());
  EXPECT_TRUE(status.ok()) << status;
  // pick_first starts with the first address: its subchannel must exist and,
  // after the initial IDLE notification, have a connection requested.
  auto* first_subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(first_subchannel, nullptr);
  EXPECT_TRUE(first_subchannel->ConnectionRequested());
  // CONNECTING is reported once; READY follows any number of CONNECTING
  // reports.
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  ExpectConnectingUpdate();
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  auto ready_picker = WaitForConnected();
  ASSERT_NE(ready_picker, nullptr);
  for (int attempt = 0; attempt < 3; ++attempt) {
    EXPECT_EQ(ExpectPickComplete(ready_picker.get()), kAddresses[0]);
  }
  gpr_log(GPR_INFO, "### PF startup complete");
  // Fail one call on the connected subchannel.
  const auto failed_address = DoPickWithFailedCall(ready_picker.get());
  ASSERT_TRUE(failed_address.has_value());
  gpr_log(GPR_INFO, "### failed RPC on %s", failed_address->c_str());
  // Run the ejection timer once.
  IncrementTimeBy(Duration::Seconds(10));
  gpr_log(GPR_INFO, "### ejection timer pass complete");
  // Nothing was ejected, and no reconnection was requested.
  ExpectQueueEmpty();
  EXPECT_FALSE(first_subchannel->ConnectionRequested());
}
463
464 } // namespace
465 } // namespace testing
466 } // namespace grpc_core
467
main(int argc,char ** argv)468 int main(int argc, char** argv) {
469 ::testing::InitGoogleTest(&argc, argv);
470 grpc::testing::TestEnvironment env(&argc, argv);
471 return RUN_ALL_TESTS();
472 }
473