1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include <numeric>
17 #include <string>
18 #include <vector>
19
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22
23 #include "absl/strings/match.h"
24 #include "absl/strings/str_cat.h"
25
26 #include "src/core/client_channel/backup_poller.h"
27 #include "src/core/lib/address_utils/sockaddr_utils.h"
28 #include "src/core/lib/channel/call_tracer.h"
29 #include "src/core/lib/config/config_vars.h"
30 #include "src/core/lib/surface/call.h"
31 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
32 #include "test/core/util/fake_stats_plugin.h"
33 #include "test/core/util/scoped_env_var.h"
34 #include "test/cpp/end2end/connection_attempt_injector.h"
35 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
36
37 namespace grpc {
38 namespace testing {
39 namespace {
40
41 using ::envoy::config::cluster::v3::CircuitBreakers;
42 using ::envoy::config::cluster::v3::RoutingPriority;
43 using ::envoy::config::core::v3::HealthStatus;
44 using ::envoy::type::v3::FractionalPercent;
45
46 using ClientStats = LrsServiceImpl::ClientStats;
47 using OptionalLabelKey =
48 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
49
// String constants related to EDS-configured drops: two drop category names
// and the prefix of the status message for a dropped RPC.
// NOTE(review): none of these are referenced in the portion of the file
// visible here; presumably the drop tests further down use them -- confirm.
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
53
54 //
55 // CDS tests
56 //
57
// The CDS tests run directly on the shared xDS end-to-end fixture.
using CdsTest = XdsEnd2endTest;

// Instantiates the CDS suite with a single default-constructed XdsTestType;
// XdsTestType::Name is passed as the parameter-name generator.
INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
                         &XdsTestType::Name);
62
63 // Tests that CDS client should send an ACK upon correct CDS response.
TEST_P(CdsTest, Vanilla) {
  // Issue one RPC (its result is deliberately ignored) before inspecting
  // the CDS response state recorded by the balancer.
  (void)SendRpc();
  const auto cds_state = balancer_->ads_service()->cds_response_state();
  ASSERT_TRUE(cds_state.has_value());
  EXPECT_EQ(cds_state->state, AdsServiceImpl::ResponseState::ACKED);
}
70
71 // Testing just one example of an invalid resource here.
72 // Unit tests for XdsClusterResourceType have exhaustive tests for all
73 // of the invalid cases.
TEST_P(CdsTest, InvalidClusterResource) {
  // Take the default Cluster and switch its discovery type to STATIC, which
  // the expected error below reports as an unknown discovery type.
  Cluster invalid_cluster = default_cluster_;
  invalid_cluster.set_type(Cluster::STATIC);
  balancer_->ads_service()->SetCdsResource(invalid_cluster);
  // The client must NACK the resource with the exact validation error.
  const auto nack = WaitForCdsNack(DEBUG_LOCATION);
  ASSERT_TRUE(nack.has_value()) << "timed out waiting for NACK";
  constexpr char kExpectedError[] =
      "xDS response validation errors: ["
      "resource index 0: cluster_name: "
      "INVALID_ARGUMENT: errors validating Cluster resource: ["
      "field:type error:unknown discovery type]]";
  EXPECT_EQ(nack->error_message, kExpectedError);
}
86
87 // Tests that we don't trigger does-not-exist callbacks for a resource
88 // that was previously valid but is updated to be invalid.
TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
  CreateAndStartBackends(1);
  // Start with a valid configuration and confirm that RPCs succeed.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Replace the Cluster with an invalid one (unsupported discovery type).
  Cluster invalid_cluster = default_cluster_;
  invalid_cluster.set_type(Cluster::STATIC);
  balancer_->ads_service()->SetCdsResource(invalid_cluster);
  // NOTE(review): the StatusCode::OK argument is presumably the status
  // expected from RPCs sent while waiting for the NACK -- confirm against
  // WaitForCdsNack().
  const auto nack =
      WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
  ASSERT_TRUE(nack.has_value()) << "timed out waiting for NACK";
  constexpr char kExpectedError[] =
      "xDS response validation errors: ["
      "resource index 0: cluster_name: "
      "INVALID_ARGUMENT: errors validating Cluster resource: ["
      "field:type error:unknown discovery type]]";
  EXPECT_EQ(nack->error_message, kExpectedError);
  // RPCs must keep working after the NACK.
  CheckRpcSendOk(DEBUG_LOCATION);
}
109
// Tests round robin is not impacted by the endpoint weight, and that the
111 // localities in a locality map are picked according to their weights.
TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
  CreateAndStartBackends(2);
  // Locality weights and the traffic share each locality should receive.
  const int kWeightLocality0 = 2;
  const int kWeightLocality1 = 8;
  const int kWeightSum = kWeightLocality0 + kWeightLocality1;
  const double kExpectedShare0 =
      static_cast<double>(kWeightLocality0) / kWeightSum;
  const double kExpectedShare1 =
      static_cast<double>(kWeightLocality1) / kWeightSum;
  const double kTolerance = 0.05;
  const size_t kRpcCount = ComputeIdealNumRpcs(kExpectedShare0, kTolerance);
  // Two localities with one backend each. The endpoint weights (8 and 2)
  // are the reverse of the locality weights.
  EdsResourceArgs eds_args({
      {"locality0",
       {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
       kWeightLocality0},
      {"locality1",
       {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
       kWeightLocality1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Both backends must be reachable before the measured RPCs are sent.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  CheckRpcSendOk(DEBUG_LOCATION, kRpcCount);
  // Fraction of the measured RPCs that reached the given backend.
  const auto observed_share = [&](size_t backend_index) {
    return static_cast<double>(
               backends_[backend_index]->backend_service()->request_count()) /
           kRpcCount;
  };
  const double share_0 = observed_share(0);
  const double share_1 = observed_share(1);
  // The observed shares should roughly match the locality weights.
  EXPECT_THAT(share_0, ::testing::DoubleNear(kExpectedShare0, kTolerance));
  EXPECT_THAT(share_1, ::testing::DoubleNear(kExpectedShare1, kTolerance));
}
150
151 // In most of our tests, we use different names for different resource
152 // types, to make sure that there are no cut-and-paste errors in the code
153 // that cause us to look at data for the wrong resource type. So we add
154 // this test to make sure that the EDS resource name defaults to the
155 // cluster name if not specified in the CDS resource.
TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
  CreateAndStartBackends(1);
  // Publish the EDS resource under the cluster's own name.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(eds_args, kDefaultClusterName));
  // Clear the EDS service name in the Cluster, leaving only the cluster
  // name to identify the EDS resource.
  Cluster cluster_without_service_name = default_cluster_;
  cluster_without_service_name.mutable_eds_cluster_config()
      ->clear_service_name();
  balancer_->ads_service()->SetCdsResource(cluster_without_service_name);
  const RpcOptions rpc_options = RpcOptions().set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, /*times=*/1, rpc_options);
}
167
168 // Tests switching over from one cluster to another.
TEST_P(CdsTest, ChangeClusters) {
  const char* const kNewClusterName = "new_cluster_name";
  const char* const kNewEdsServiceName = "new_eds_service_name";
  CreateAndStartBackends(2);
  // The default cluster initially routes to backend 0 only.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  WaitForAllBackends(DEBUG_LOCATION, 0, 1);
  // Publish a second EDS resource that contains backend 1 only.
  eds_args =
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(eds_args, kNewEdsServiceName));
  // Publish a second Cluster that refers to the new EDS resource.
  Cluster replacement_cluster = default_cluster_;
  replacement_cluster.set_name(kNewClusterName);
  replacement_cluster.mutable_eds_cluster_config()->set_service_name(
      kNewEdsServiceName);
  balancer_->ads_service()->SetCdsResource(replacement_cluster);
  // Point the first route of the first virtual host at the new cluster.
  RouteConfiguration updated_route_config = default_route_config_;
  auto* first_route =
      updated_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  first_route->mutable_route()->set_cluster(kNewClusterName);
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   updated_route_config);
  // Traffic must now reach backend 1.
  WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
198
// Tests that the max_requests circuit breaker threshold set in the Cluster
// limits the number of concurrent RPCs, and that cancelling one in-flight
// RPC lets another RPC through.
TEST_P(CdsTest, CircuitBreaking) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent request.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Send exactly max_concurrent_requests long RPCs.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (auto& rpc : rpcs) {
    rpc.StartRpc(stub_.get());
  }
  // Wait for all RPCs to be in flight, polling once per millisecond.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending a RPC now should fail, the error message should tell us
  // we hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Add a sleep here to ensure the RPC cancellation has completed correctly
  // before trying the next RPC. There may be a slight delay between return of
  // CANCELLED RPC status and update of internal state tracking the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up. rpcs[0] was already cancelled above, so start at index 1.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
241
// Tests that the circuit breaker limit applies to the combined number of
// concurrent RPCs across two channels, not to each channel separately.
TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent request.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Second channel and stub, used alongside the fixture's stub_.
  auto channel2 = CreateChannel();
  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
  // Send exactly max_concurrent_requests long RPCs, alternating between
  // the two channels.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
  }
  // Wait for all RPCs to be in flight, polling once per millisecond.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending a RPC now should fail, the error message should tell us
  // we hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Add a sleep here to ensure the RPC cancellation has completed correctly
  // before trying the next RPC. There may be a slight delay between return of
  // CANCELLED RPC status and update of internal state tracking the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up. rpcs[0] was already cancelled above, so start at index 1.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
287
// Tests that the client picks up a changed Cluster after the balancer has
// been shut down and started again.
TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
  const char* const kNewEdsResourceName = "new_eds_resource_name";
  CreateAndStartBackends(2);
  // Initial configuration routes to backend 0; verify the channel works.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bounce the balancer.
  balancer_->Shutdown();
  balancer_->Start();
  // Publish a new EDS resource that contains backend 1 only.
  eds_args =
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(eds_args, kNewEdsResourceName));
  // Repoint the Cluster at the new EDS resource.
  Cluster updated_cluster = default_cluster_;
  updated_cluster.mutable_eds_cluster_config()->set_service_name(
      kNewEdsResourceName);
  balancer_->ads_service()->SetCdsResource(updated_cluster);
  // The client must start sending traffic to backend 1.
  WaitForBackend(DEBUG_LOCATION, 1);
}
310
// Tests that the telemetry labels from the Cluster metadata and the
// locality of the chosen backend are recorded as optional labels on the
// call attempt tracer.
TEST_P(CdsTest, MetricLabels) {
  // Fake client call tracer factory to inject into the channel. Keep this
  // declaration first in the test.
  grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
  CreateAndStartBackends(2);
  // EDS: one backend in each of two localities.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)},
                            {"locality1", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // CDS: attach the service labels as filter metadata on the Cluster.
  Cluster labeled_cluster = default_cluster_;
  auto& telemetry_fields =
      *(*labeled_cluster.mutable_metadata()->mutable_filter_metadata())
           ["com.google.csm.telemetry_labels"]
               .mutable_fields();
  telemetry_fields["service_name"].set_string_value("myservice");
  telemetry_fields["service_namespace"].set_string_value("mynamespace");
  balancer_->ads_service()->SetCdsResource(labeled_cluster);
  // Recreate the stub with the fake tracer factory injected.
  ChannelArguments channel_args;
  channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
                          &fake_client_call_tracer_factory);
  ResetStub(/*failover_timeout_ms=*/0, &channel_args);
  // Matcher for the labels expected when the given locality was picked.
  const auto labels_for_locality = [&](const char* sub_zone) {
    return ::testing::ElementsAre(
        ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
        ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
                        "mynamespace"),
        ::testing::Pair(OptionalLabelKey::kLocality,
                        LocalityNameString(sub_zone)));
  };
  // Send an RPC to backend 0 and check the labels of the last attempt.
  WaitForBackend(DEBUG_LOCATION, 0);
  EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
                  ->GetLastCallAttemptTracer()
                  ->GetOptionalLabels(),
              labels_for_locality("locality0"));
  // Send an RPC to backend 1 and check the labels of the last attempt.
  WaitForBackend(DEBUG_LOCATION, 1);
  EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
                  ->GetLastCallAttemptTracer()
                  ->GetOptionalLabels(),
              labels_for_locality("locality1"));
  // TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. The
  // only reason it's here is to add a delay before
  // fake_client_call_tracer_factory goes out of scope, since there may be
  // lingering callbacks in the call stack that are using the CallAttemptTracer
  // even after we get here, which would then cause a crash. Find a cleaner way
  // to fix this.
  balancer_->Shutdown();
}
366
367 //
368 // CDS deletion tests
369 //
370
371 class CdsDeletionTest : public XdsEnd2endTest {
372 protected:
SetUp()373 void SetUp() override {} // Individual tests call InitClient().
374 };
375
376 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
377 ::testing::Values(XdsTestType()), &XdsTestType::Name);
378
379 // Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
TEST_P(CdsDeletionTest, ClusterDeleted) {
  InitClient();
  CreateAndStartBackends(1);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // All backends must be serving before the Cluster is removed.
  WaitForAllBackends(DEBUG_LOCATION);
  // Remove the CDS resource.
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  // Keep sending RPCs until the first failure, which must report that the
  // CDS resource does not exist.
  SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) {
    if (!result.status.ok()) {
      EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
      EXPECT_EQ(absl::StrCat("CDS resource ", kDefaultClusterName,
                             " does not exist"),
                result.status.error_message());
      return false;  // Stop sending.
    }
    return true;  // Still succeeding; keep going.
  });
  // The update must have been ACKed.
  const auto cds_state = balancer_->ads_service()->cds_response_state();
  ASSERT_TRUE(cds_state.has_value());
  EXPECT_EQ(cds_state->state, AdsServiceImpl::ResponseState::ACKED);
}
403
404 // Tests that we ignore Cluster deletions if configured to do so.
TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
  // The client is bootstrapped with resource deletions ignored.
  InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion());
  CreateAndStartBackends(2);
  // Bring up client pointing to backend 0 and wait for it to connect.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForAllBackends(DEBUG_LOCATION, 0, 1);
  // Make sure we ACKed the CDS update.
  auto response_state = balancer_->ads_service()->cds_response_state();
  ASSERT_TRUE(response_state.has_value());
  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
  // Unset CDS resource and wait for client to ACK the update.
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  const auto deadline = absl::Now() + absl::Seconds(30);
  while (true) {
    ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
    response_state = balancer_->ads_service()->cds_response_state();
    if (response_state.has_value()) break;
    // Pause briefly between polls rather than spinning, matching the other
    // polling loops in this file.
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
  // Make sure we can still send RPCs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Now recreate the CDS resource pointing to a new EDS resource that
  // specifies backend 1, and make sure the client uses it.
  const char* kNewEdsResourceName = "new_eds_resource_name";
  auto cluster = default_cluster_;
  cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
  balancer_->ads_service()->SetCdsResource(cluster);
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args, kNewEdsResourceName));
  // Wait for client to start using backend 1.
  WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
439
440 //
441 // EDS tests
442 //
443
// The EDS tests run directly on the shared xDS end-to-end fixture.
using EdsTest = XdsEnd2endTest;

// Instantiates the EDS suite twice: once with a default XdsTestType and
// once with load reporting enabled.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, EdsTest,
    ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
    &XdsTestType::Name);
450
451 // Tests that the balancer sends the correct response to the client, and the
452 // client sends RPCs to the backends using the default child policy.
TEST_P(EdsTest, Vanilla) {
  const size_t kRpcsPerBackend = 100;
  CreateAndStartBackends(3);
  EdsResourceArgs eds_args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Asking the channel to connect must work without a call being made.
  channel_->GetState(/*try_to_connect=*/true);
  // Every backend must be serving before the measured RPCs are sent.
  WaitForAllBackends(DEBUG_LOCATION);
  CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerBackend * backends_.size());
  // Each backend should have received exactly its share of the RPCs.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kRpcsPerBackend, backend->backend_service()->request_count());
  }
  // Check the LB policy name reported by the channel.
  EXPECT_EQ("xds_cluster_manager_experimental",
            channel_->GetLoadBalancingPolicyName());
}
475
// Tests an endpoint that carries more than one address: traffic goes to
// whichever of the endpoint's addresses is reachable.
TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
  // Sets the dual-stack endpoints experiment env var for this test's scope.
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
  const size_t kRpcsPerBackend = 10;
  // Three backends exist, but only backends 1 and 2 are started for now.
  CreateBackends(3);
  StartBackend(1);
  StartBackend(2);
  // Endpoint A covers backends 0 and 1; endpoint B is backend 2.
  EdsResourceArgs eds_args({
      {"locality0",
       {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // With backend 0 down, endpoint A should connect to backend 1, so traffic
  // round-robins across backends 1 and 2.
  WaitForAllBackends(DEBUG_LOCATION, 1);  // Wait for backends 1 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerBackend * 2);
  EXPECT_EQ(kRpcsPerBackend, backends_[1]->backend_service()->request_count());
  EXPECT_EQ(kRpcsPerBackend, backends_[2]->backend_service()->request_count());
  // Swap which address of endpoint A is alive: start 0, stop 1.
  StartBackend(0);
  ShutdownBackend(1);
  // Wait until backend 0 receives traffic.
  WaitForBackend(DEBUG_LOCATION, 0);
  // Traffic should now round-robin across backends 0 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerBackend * 2);
  EXPECT_EQ(kRpcsPerBackend, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(kRpcsPerBackend, backends_[2]->backend_service()->request_count());
}
511
// Tests that an endpoint marked DRAINING receives no traffic: all RPCs are
// spread evenly over the two real backends.
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
  const size_t kRpcsPerBackend = 100;
  CreateAndStartBackends(2);
  // The two real backends plus one non-existent endpoint marked DRAINING.
  auto endpoints = CreateEndpointsForBackends();
  auto draining_endpoint = MakeNonExistantEndpoint();
  draining_endpoint.health_status = HealthStatus::DRAINING;
  endpoints.push_back(std::move(draining_endpoint));
  EdsResourceArgs eds_args({
      {"locality0", std::move(endpoints), kDefaultLocalityWeight,
       kDefaultLocalityPriority},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Asking the channel to connect must work without a call being made.
  channel_->GetState(/*try_to_connect=*/true);
  // Every backend must be serving before the measured RPCs are sent.
  WaitForAllBackends(DEBUG_LOCATION);
  CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerBackend * backends_.size());
  // Each real backend should have received exactly its share.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kRpcsPerBackend, backend->backend_service()->request_count());
  }
}
535
536 // This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
  // Regex for the failure reported while the locality has no endpoints.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: "
      "EDS resource eds_service_name contains empty localities: "
      "\\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
  CreateAndStartBackends(1);
  // Initial EDS resource has one locality containing the single backend.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  WaitForAllBackends(DEBUG_LOCATION);
  // EDS update removes all endpoints from the locality.
  EdsResourceArgs::Locality locality_without_endpoints("locality0", {});
  eds_args = EdsResourceArgs({std::move(locality_without_endpoints)});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Send RPCs until the first failure, which must carry the error above.
  SendRpcsUntil(DEBUG_LOCATION, [&](const RpcResult& result) {
    if (!result.status.ok()) {
      EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
      EXPECT_THAT(result.status.error_message(),
                  ::testing::MatchesRegex(kErrorMessage));
      return false;
    }
    return true;
  });
  // Shut down backend. This triggers a connectivity state update from the
  // deactivated child of the weighted_target policy.
  ShutdownAllBackends();
  // Now restart the backend.
  StartAllBackends();
  // Re-add endpoint.
  eds_args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // RPCs should eventually succeed. Failures seen in the meantime must be
  // UNAVAILABLE with one of two messages.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    // The error message we see here depends on whether the client sees the
    // EDS update before or after it sees the backend come back up.
    const std::string acceptable_errors = absl::StrCat(
        MakeConnectionFailureRegex(
            "connections to all backends failing; last error: "),
        "|", kErrorMessage);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(acceptable_errors));
  });
}
583
// Tests that an EDS resource without localities makes RPCs fail, and that
// RPCs recover once a locality with an endpoint is added.
TEST_P(EdsTest, NoLocalities) {
  // Error reported while the EDS resource has no localities.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: EDS resource eds_service_name "
      "contains no localities";
  CreateAndStartBackends(1);
  // Initial EDS resource has no localities, so RPCs should fail.
  EdsResourceArgs eds_args;
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Send EDS resource that has an endpoint.
  eds_args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // RPCs should eventually succeed; any failure until then must still be
  // the no-localities error.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
606
607 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
608 // all the servers are unreachable.
TEST_P(EdsTest, AllServersUnreachableFailFast) {
  // Set Rpc timeout to 5 seconds to ensure there is enough time
  // for communication with the xDS server to take place upon test start up.
  const uint32_t kRpcTimeoutMs = 5000;
  const size_t kNumUnreachableServers = 5;
  // Build a locality made up solely of non-existent endpoints.
  std::vector<EdsResourceArgs::Endpoint> unreachable_endpoints;
  for (size_t remaining = kNumUnreachableServers; remaining > 0; --remaining) {
    unreachable_endpoints.push_back(MakeNonExistantEndpoint());
  }
  EdsResourceArgs eds_args({{"locality0", std::move(unreachable_endpoints)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The error shouldn't be DEADLINE_EXCEEDED because timeout is set to 5
  // seconds, and we should discover in that time that the target backend is
  // down.
  const RpcOptions rpc_options = RpcOptions().set_timeout_ms(kRpcTimeoutMs);
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      MakeConnectionFailureRegex(
                          "connections to all backends failing; last error: "),
                      rpc_options);
}
628
629 // Tests that RPCs fail when the backends are down, and will succeed again
630 // after the backends are restarted.
TEST_P(EdsTest, BackendsRestart) {
  CreateAndStartBackends(3);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  WaitForAllBackends(DEBUG_LOCATION);
  // Take every backend down.
  ShutdownAllBackends();
  // Wait for channel to transition out of READY, so that we know it has
  // noticed that all of the subchannels have failed. Note that it may
  // be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
  const bool left_ready = channel_->WaitForStateChange(
      GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5));
  EXPECT_TRUE(left_ready);
  const grpc_connectivity_state state_after_shutdown =
      channel_->GetState(false);
  EXPECT_THAT(state_after_shutdown,
              ::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
                               ::testing::Eq(GRPC_CHANNEL_CONNECTING)));
  // RPCs should fail.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      MakeConnectionFailureRegex(
                          "connections to all backends failing; last error: "));
  // Restart all backends. RPCs should start succeeding again.
  StartAllBackends();
  const RpcOptions recovery_options =
      RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true);
  CheckRpcSendOk(DEBUG_LOCATION, 1, recovery_options);
}
655
// Tests that re-sending an identical EDS resource does not disturb the
// round_robin policy: every backend still receives exactly its share.
TEST_P(EdsTest, IgnoresDuplicateUpdates) {
  // Several backends are required: with a single backend the per-backend
  // counts checked below could not reveal a reset of the round_robin
  // position.
  constexpr size_t kNumBackends = 4;
  static_assert(kNumBackends % 2 == 0,
                "each iteration splits its RPCs into two equal halves");
  const size_t kNumRpcsPerAddress = 100;
  CreateAndStartBackends(kNumBackends);
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for all backends to come online.
  WaitForAllBackends(DEBUG_LOCATION);
  // Each iteration sends one RPC per backend, with a duplicate EDS update
  // sent halfway through. If the update is not ignored, this will cause the
  // round_robin policy to see an update, which will randomly reset its
  // position in the address list.
  for (size_t i = 0; i < kNumRpcsPerAddress; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, kNumBackends / 2);
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
    CheckRpcSendOk(DEBUG_LOCATION, kNumBackends / 2);
  }
  // Every backend, including backend 0, should have gotten the right number
  // of requests.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[i]->backend_service()->request_count());
  }
}
680
681 // Testing just one example of an invalid resource here.
682 // Unit tests for XdsEndpointResourceType have exhaustive tests for all
683 // of the invalid cases.
TEST_P(EdsTest, NacksInvalidResource) {
  // The only locality is given priority 1, so priority 0 has no localities;
  // the expected error below reports exactly that.
  EdsResourceArgs eds_args({
      {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  const auto nack = WaitForEdsNack(DEBUG_LOCATION);
  ASSERT_TRUE(nack.has_value()) << "timed out waiting for NACK";
  constexpr char kExpectedError[] =
      "xDS response validation errors: ["
      "resource index 0: eds_service_name: "
      "INVALID_ARGUMENT: errors parsing EDS resource: ["
      "field:endpoints error:priority 0 empty]]";
  EXPECT_EQ(nack->error_message, kExpectedError);
}
697
698 // Tests that if the balancer is down, the RPCs will still be sent to the
699 // backends according to the last balancer response, until a new balancer is
700 // reachable.
TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
  CreateAndStartBackends(2);
  // Publish an EDS resource containing only backend 0 and wait until the
  // client is actually sending traffic to it.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Take the balancer down.  For the next 5 seconds every RPC must still
  // succeed (at least one RPC is always sent).
  balancer_->Shutdown();
  const auto stop_time = grpc_timeout_seconds_to_deadline(5);
  for (;;) {
    CheckRpcSendOk(DEBUG_LOCATION);
    if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), stop_time) >= 0) break;
  }
  // Change the EDS resource to point to backend 1, then bring the balancer
  // back up.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  balancer_->Start();
  // The client should pick up the new resource and start using backend 1.
  WaitForBackend(DEBUG_LOCATION, 1);
}
722
723 // Tests that the localities in a locality map are picked according to their
724 // weights.
TEST_P(EdsTest, LocalityWeights) {
  CreateAndStartBackends(2);
  // Locality 0 gets weight 2 and locality 1 gets weight 8.
  const int kLocalityWeight0 = 2;
  const int kLocalityWeight1 = 8;
  const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
  const double kErrorTolerance = 0.05;
  // Fraction of traffic each locality is expected to receive, indexed by
  // backend (backend i is the sole endpoint of locality i).
  const double kExpectedRates[] = {
      static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight,
      static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight,
  };
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kExpectedRates[0], kErrorTolerance);
  // ADS response contains 2 localities, each of which contains 1 backend.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Make sure both backends are in use before taking the measurement.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Each backend's share of the RPCs should be within the tolerance of its
  // locality's share of the total weight.
  for (size_t i = 0; i < 2; ++i) {
    const double picked_rate =
        static_cast<double>(backends_[i]->backend_service()->request_count()) /
        kNumRpcs;
    EXPECT_THAT(picked_rate,
                ::testing::DoubleNear(kExpectedRates[i], kErrorTolerance));
  }
}
759
760 // Tests that we don't suffer from integer overflow in locality weights.
TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
  CreateAndStartBackends(2);
  // Pick two weights whose sum is exactly the largest uint32_t value:
  // locality 1 gets one third of it and locality 0 gets the remainder.
  const uint32_t kLocalityWeight1 = std::numeric_limits<uint32_t>::max() / 3;
  const uint32_t kLocalityWeight0 =
      std::numeric_limits<uint32_t>::max() - kLocalityWeight1;
  // Sum in 64 bits so that the expectation itself cannot overflow.
  const uint64_t kTotalLocalityWeight =
      static_cast<uint64_t>(kLocalityWeight0) +
      static_cast<uint64_t>(kLocalityWeight1);
  const double kLocalityWeightRate0 =
      static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
  const double kLocalityWeightRate1 =
      static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
  const double kErrorTolerance = 0.05;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
  // Fraction of the measured RPCs that reached the given backend.
  const auto picked_rate = [&](size_t backend_index) {
    return static_cast<double>(
               backends_[backend_index]->backend_service()->request_count()) /
           kNumRpcs;
  };
  // ADS response contains 2 localities, each of which contains 1 backend.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Both backends must be in use before the measurement starts.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // The observed split should match the weight split within the tolerance.
  EXPECT_THAT(picked_rate(0),
              ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
  EXPECT_THAT(picked_rate(1),
              ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
798
// Tests that an EDS resource whose only locality is empty fails RPCs with a
// descriptive error, and that RPCs recover once an endpoint is added.
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  // Regex for the status message expected while the locality is empty.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: "
      "EDS resource eds_service_name contains empty localities: "
      "\\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
  // Start with a single locality that has no endpoints at all.
  EdsResourceArgs args;
  args.locality_list.push_back(EdsResourceArgs::Locality("locality0", {}));
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // With nothing to route to, RPCs fail as UNAVAILABLE.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Now give the locality an endpoint.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Until the client processes the new resource, the only failure that is
  // tolerated is the empty-locality error checked above.
  const auto tolerate_empty_locality_error = [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  };
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, tolerate_empty_locality_error);
}
824
825 // Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest, LocalityContainingNoEndpoints) {
  CreateAndStartBackends(2);
  const size_t kNumRpcs = 5000;
  // locality0 holds every backend; locality1 is present but empty.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
      {"locality1", {}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Both backends must be serving before the measurement starts.
  WaitForAllBackends(DEBUG_LOCATION);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // All traffic should go to the reachable locality, so the two backends
  // split the RPCs evenly between them.
  const size_t expected_per_backend = kNumRpcs / backends_.size();
  for (size_t i = 0; i < 2; ++i) {
    EXPECT_EQ(backends_[i]->backend_service()->request_count(),
              expected_per_backend);
  }
}
845
846 // Tests that the locality map can work properly even when it contains a large
847 // number of localities.
TEST_P(EdsTest, ManyLocalitiesStressTest) {
  const size_t kNumLocalities = 50;
  const uint32_t kRpcTimeoutMs = 5000;
  // One backend per locality, plus one extra backend for the second update.
  CreateAndStartBackends(kNumLocalities + 1);
  // First ADS response: locality i contains exactly backend i.
  EdsResourceArgs many_localities;
  for (size_t index = 0; index < kNumLocalities; ++index) {
    many_localities.locality_list.emplace_back(
        absl::StrCat("locality", index),
        CreateEndpointsForBackends(index, index + 1));
  }
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(many_localities));
  // Wait until the first kNumLocalities backends have all been used,
  // without resetting counters and with a longer per-RPC timeout.
  WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
                     /*check_status=*/nullptr,
                     WaitForBackendOptions().set_reset_counters(false),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  // Second ADS response: a single locality containing only the last backend.
  EdsResourceArgs single_locality({{"locality0",
                                    CreateEndpointsForBackends(
                                        kNumLocalities, kNumLocalities + 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(single_locality));
  // The update has been applied once the last backend receives traffic.
  WaitForBackend(DEBUG_LOCATION, kNumLocalities);
}
875
876 // Tests that the localities in a locality map are picked correctly after
877 // update (addition, modification, deletion).
TEST_P(EdsTest, LocalityMapUpdateChurn) {
  CreateAndStartBackends(4);
  const size_t kNumRpcs = 3000;
  // Relative tolerance applied to each expected pick rate.
  const double kErrorTolerance = 0.2;
  // Appends each weight's share of the total weight to *rates.
  const auto append_rates = [](const std::vector<int>& weights,
                               std::vector<double>* rates) {
    const double total_weight =
        std::accumulate(weights.begin(), weights.end(), 0);
    for (const int weight : weights) {
      rates->push_back(weight / total_weight);
    }
  };
  // Expected rates for localities 0-2 under the first update.
  std::vector<double> expected_rates_before;
  append_rates({2, 3, 4}, &expected_rates_before);
  // The second update deletes the first locality, keeps the second, changes
  // the third locality's weight from 4 to 2, and adds a new locality with
  // weight 6.  Index 0 is a placeholder so that the vector stays indexed by
  // backend.
  std::vector<double> expected_rates_after = {
      0 /* placeholder for locality 0 */};
  append_rates({3, 2, 6}, &expected_rates_after);
  // Fraction of the last batch of RPCs that reached the given backend.
  const auto picked_rate = [&](size_t backend_index) {
    return static_cast<double>(
               backends_[backend_index]->backend_service()->request_count()) /
           kNumRpcs;
  };
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), 2},
      {"locality1", CreateEndpointsForBackends(1, 2), 3},
      {"locality2", CreateEndpointsForBackends(2, 3), 4},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for the first 3 backends to be ready.
  WaitForAllBackends(DEBUG_LOCATION, 0, 3);
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // Backends 0-2 should each be picked at roughly their locality's rate.
  for (size_t i = 0; i < 3; ++i) {
    const double rate = picked_rate(i);
    gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i, rate);
    EXPECT_THAT(
        rate,
        ::testing::AllOf(
            ::testing::Ge(expected_rates_before[i] * (1 - kErrorTolerance)),
            ::testing::Le(expected_rates_before[i] * (1 + kErrorTolerance))));
  }
  args = EdsResourceArgs({
      {"locality1", CreateEndpointsForBackends(1, 2), 3},
      {"locality2", CreateEndpointsForBackends(2, 3), 2},
      {"locality3", CreateEndpointsForBackends(3, 4), 6},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Backend 3 hasn't received any request.
  EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
  // The locality update has been processed once backend 3 gets a request.
  WaitForAllBackends(DEBUG_LOCATION, 3, 4);
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // Backend 0 no longer receives any request.
  EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
  // Backends 1-3 should each be picked at roughly their new rate.
  for (size_t i = 1; i < 4; ++i) {
    const double rate = picked_rate(i);
    gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i, rate);
    EXPECT_THAT(
        rate,
        ::testing::AllOf(
            ::testing::Ge(expected_rates_after[i] * (1 - kErrorTolerance)),
            ::testing::Le(expected_rates_after[i] * (1 + kErrorTolerance))));
  }
}
965
966 // Tests that we don't fail RPCs when replacing all of the localities in
967 // a given priority.
TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
  CreateAndStartBackends(2);
  // Start with a single locality ("locality0") that contains backend 0, and
  // wait until that backend is serving RPCs.
  EdsResourceArgs initial_args(
      {{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(initial_args));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Replace it with a differently named locality ("locality1") that
  // contains backend 1 instead.
  EdsResourceArgs replacement_args(
      {{"locality1", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(replacement_args));
  // When the client sees the update, RPCs should start going to backend 1.
  // No RPCs should fail during this change.
  WaitForBackend(DEBUG_LOCATION, 1);
}
982
// Tests removing a locality and later re-adding it, waiting for each update
// to take effect before sending the next one.
TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
  CreateAndStartBackends(4);
  // Publishes the given EDS resource arguments via the ADS service.
  const auto publish = [&](EdsResourceArgs resource_args) {
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(resource_args));
  };
  // Initial update has two localities, holding backends 1 and 2.
  publish(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(1, 2)},
      {"locality1", CreateEndpointsForBackends(2, 3)},
  }));
  WaitForAllBackends(DEBUG_LOCATION, 1, 3);
  // Next update removes locality1.  Backend 0 is added to locality0 at the
  // same time; traffic reaching it shows the update has been seen.
  publish(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  }));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Next update re-adds locality1.  Backend 3 is added to locality1 at the
  // same time; traffic reaching it shows the update has been seen.
  publish(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  }));
  WaitForBackend(DEBUG_LOCATION, 3);
}
1010
1011 // Tests that RPCs are dropped according to the drop config.
TEST_P(EdsTest, Drops) {
  CreateAndStartBackends(1);
  const double kErrorTolerance = 0.05;
  // Per-category drop configuration, expressed in parts per million.
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // Combined rate: the "throttle" rate applies only to the share of RPCs
  // not already counted as dropped by "lb".
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
  // The ADS response contains one locality and two drop categories.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                          {kThrottleDropType, kDropPerMillionForThrottle}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Count how many of kNumRpcs RPCs fail as UNAVAILABLE with the drop
  // message prefix.
  const size_t dropped = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  // The observed drop rate should be close to the combined rate.
  EXPECT_THAT(
      static_cast<double>(dropped) / kNumRpcs,
      ::testing::DoubleNear(kDropRateForLbAndThrottle, kErrorTolerance));
}
1037
1038 // Tests that drop config is converted correctly from per hundred.
TEST_P(EdsTest, DropPerHundred) {
  CreateAndStartBackends(1);
  const double kErrorTolerance = 0.05;
  // 10 out of every HUNDRED RPCs are configured to be dropped.
  const uint32_t kDropPerHundredForLb = 10;
  const double kDropRateForLb = kDropPerHundredForLb / 100.0;
  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  // The ADS response contains one drop category using the HUNDRED
  // denominator.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_denominator = FractionalPercent::HUNDRED;
  args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Count how many of kNumRpcs RPCs fail as UNAVAILABLE with the drop
  // message prefix.
  const size_t dropped = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  // The observed drop rate should be close to the configured rate.
  EXPECT_THAT(static_cast<double>(dropped) / kNumRpcs,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
1059
1060 // Tests that drop config is converted correctly from per ten thousand.
TEST_P(EdsTest, DropPerTenThousand) {
  CreateAndStartBackends(1);
  const double kErrorTolerance = 0.05;
  // 1000 out of every TEN_THOUSAND RPCs are configured to be dropped.
  const uint32_t kDropPerTenThousandForLb = 1000;
  const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  // The ADS response contains one drop category using the TEN_THOUSAND
  // denominator.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_denominator = FractionalPercent::TEN_THOUSAND;
  args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Count how many of kNumRpcs RPCs fail as UNAVAILABLE with the drop
  // message prefix.
  const size_t dropped = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  // The observed drop rate should be close to the configured rate.
  EXPECT_THAT(static_cast<double>(dropped) / kNumRpcs,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
1081
1082 // Tests that drop is working correctly after update.
TEST_P(EdsTest, DropConfigUpdate) {
  CreateAndStartBackends(1);
  // Per-category drop configuration, expressed in parts per million.
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kErrorTolerance = 0.05;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // Combined rate once both categories are configured: the "throttle" rate
  // applies only to the share of RPCs not already counted as dropped by "lb".
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
  const size_t kNumRpcsLbOnly =
      ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  const size_t kNumRpcsBoth =
      ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
  // The first ADS response contains one drop category.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Send kNumRpcsLbOnly RPCs and count the drops.
  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
  size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
  // The drop rate should be roughly equal to the expectation.
  double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
  gpr_log(GPR_INFO, "First batch drop rate %f", seen_drop_rate);
  EXPECT_THAT(seen_drop_rate,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
  // The second ADS response contains two drop categories; send the updated
  // EDS resource.
  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                          {kThrottleDropType, kDropPerMillionForThrottle}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait until the cumulative drop rate rises to the midpoint of the two
  // configs, which implies that the update has taken effect.
  const double kDropRateThreshold =
      (kDropRateForLb + kDropRateForLbAndThrottle) / 2;
  // num_drops still holds the drops from the first batch, so the RPC count
  // must start from the size of that same batch (kNumRpcsLbOnly).  Seeding
  // it with kNumRpcsBoth would pair the numerator with an unrelated
  // denominator and make the tracked value not an actual drop rate.
  size_t num_rpcs = kNumRpcsLbOnly;
  SendRpcsUntil(
      DEBUG_LOCATION,
      [&](const RpcResult& result) {
        ++num_rpcs;
        if (result.status.ok()) {
          EXPECT_EQ(result.response.message(), kRequestMessage);
        } else {
          // The only acceptable failure is a configured drop.
          EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
          EXPECT_THAT(result.status.error_message(),
                      ::testing::StartsWith(kStatusMessageDropPrefix));
          ++num_drops;
        }
        seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
        // Keep sending while the cumulative rate is below the threshold.
        return seen_drop_rate < kDropRateThreshold;
      },
      /*timeout_ms=*/40000);
  // Send kNumRpcsBoth RPCs and count the drops, starting from a fresh count.
  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
  num_drops = SendRpcsAndCountFailuresWithMessage(DEBUG_LOCATION, kNumRpcsBoth,
                                                  StatusCode::UNAVAILABLE,
                                                  kStatusMessageDropPrefix);
  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
  // The new drop rate should be roughly equal to the expectation.
  seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
  gpr_log(GPR_INFO, "Second batch drop rate %f", seen_drop_rate);
  EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
                                                    kErrorTolerance));
}
1149
1150 // Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(EdsTest, DropAll) {
  const size_t kNumRpcs = 1000;
  // "lb" drops 100000 per million; "throttle" drops 1000000 per million,
  // i.e. everything.
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 1000000;
  // The ADS response has no localities, only the two drop categories.
  EdsResourceArgs drop_only_args;
  drop_only_args.drop_categories = {
      {kLbDropType, kDropPerMillionForLb},
      {kThrottleDropType, kDropPerMillionForThrottle}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(drop_only_args));
  // Every one of the kNumRpcs RPCs should fail as UNAVAILABLE with the drop
  // message prefix.
  const size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  EXPECT_EQ(num_drops, kNumRpcs);
}
1166
1167 //
1168 // EDS failover tests
1169 //
1170
// Fixture for the EDS failover tests.  It behaves like XdsEnd2endTest,
// except that after the base-class setup it calls ResetStub() with a
// failover timeout of 500 ms.
class FailoverTest : public XdsEnd2endTest {
 public:
  // Runs the base-class setup first, then resets the stub with the
  // failover timeout used by the tests in this suite.
  void SetUp() override {
    XdsEnd2endTest::SetUp();
    ResetStub(/*failover_timeout_ms=*/500);
  }
};
1178
// Instantiates every FailoverTest case twice: once with a default
// XdsTestType and once with load reporting enabled.  The per-instance test
// name suffix is produced by XdsTestType::Name.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, FailoverTest,
    ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
    &XdsTestType::Name);
1183
// Localities with the highest priority are used when multiple priorities
// exist.
TEST_P(FailoverTest, ChooseHighestPriority) {
  CreateAndStartBackends(4);
  // Backend i is the only endpoint of locality i.  Localities 0-2 are at
  // priorities 1-3, and locality3 is at priority 0.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
       0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for backend 3 (priority 0) to receive traffic, keeping the request
  // counters intact so that the checks below cover the waiting period too.
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // None of the lower-priority backends should have seen a single request.
  EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
  EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
}
1204
1205 // Does not choose priority with no endpoints.
TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
  CreateAndStartBackends(3);
  // Priority 0 consists solely of locality3, which has no endpoints.
  // Backends 0-2 sit at priorities 1-3 respectively.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", {}, kDefaultLocalityWeight, 0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Traffic should land on backend 0 (priority 1).  Counters are kept so
  // that the checks below also cover the waiting period.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // Backends at priorities 2 and 3 should have seen no requests.
  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
  EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
}
1224
1225 // Does not choose locality with no endpoints.
TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  // Two localities share priority 0: locality0 has no endpoints and
  // locality1 holds every backend.
  EdsResourceArgs same_priority_args({
      {"locality0", {}, kDefaultLocalityWeight, 0},
      {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(same_priority_args));
  // The test passes once every backend has been used.
  WaitForAllBackends(DEBUG_LOCATION);
}
1236
// If the higher priority localities are not reachable, fail over to the
// highest priority among the rest.
TEST_P(FailoverTest, Failover) {
  CreateAndStartBackends(2);
  // Priorities 0 and 1 each contain only a non-existent endpoint.  Backend
  // 0 is at priority 2 and backend 1 is at priority 3.
  EdsResourceArgs failover_args({
      {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
      {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       3},
      {"locality3", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(failover_args));
  // Traffic should reach backend 0.  Counters are kept so that the check
  // below also covers the waiting period.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // The backend at the lowest priority should never have been used.
  const auto lowest_priority_requests =
      backends_[1]->backend_service()->request_count();
  EXPECT_EQ(0U, lowest_priority_requests);
}
1254
1255 // Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
  CreateAndStartBackends(1);
  // Priority 0 holds only a non-existent endpoint, so the channel will have
  // to use priority 1, which holds the real backend.
  EdsResourceArgs args({
      {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
      {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Register a hold on connection attempts to the real backend's port.
  ConnectionAttemptInjector injector;
  auto backend_connect_hold = injector.AddHold(backends_[0]->port());
  // Kick off an RPC in the background, which should cause the channel to
  // try to connect.
  LongRunningRpc background_rpc;
  background_rpc.StartRpc(stub_.get(), RpcOptions());
  // Block until the connection attempt to the backend has started.
  backend_connect_hold->Wait();
  // While that attempt is held, the channel should report CONNECTING and
  // the RPC should be queued.
  const auto state_during_failover =
      channel_->GetState(/*try_to_connect=*/false);
  EXPECT_EQ(state_during_failover, GRPC_CHANNEL_CONNECTING);
  // Release the hold so that the connection attempt can complete.
  backend_connect_hold->Resume();
  // With the connection established, the RPC should finish successfully.
  gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH ===");
  Status rpc_status = background_rpc.GetStatus();
  gpr_log(GPR_INFO, "=== RPC FINISHED ===");
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
}
1284
1285 // If a locality with higher priority than the current one becomes ready,
1286 // switch to it.
TEST_P(FailoverTest, SwitchBackToHigherPriority) {
  CreateAndStartBackends(4);
  const size_t kNumRpcs = 100;
  // Backend 3 is at priority 0; backends 0, 1 and 2 are at priorities 1, 2
  // and 3 respectively.
  EdsResourceArgs priority_args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
       0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(priority_args));
  // Traffic starts out on backend 3.
  WaitForBackend(DEBUG_LOCATION, 3);
  // Take backends 3 and 0 (priorities 0 and 1) out of service; traffic
  // should then move to backend 1 (priority 2).
  backends_[3]->StopListeningAndSendGoaways();
  backends_[0]->StopListeningAndSendGoaways();
  WaitForBackend(DEBUG_LOCATION, 1);
  // Restart backend 0; traffic should switch back to it.
  ShutdownBackend(0);
  StartBackend(0);
  WaitForBackend(DEBUG_LOCATION, 0);
  // From here on, every RPC should be served by backend 0.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  const auto backend0_requests =
      backends_[0]->backend_service()->request_count();
  EXPECT_EQ(kNumRpcs, backend0_requests);
}
1311
1312 // The first update only contains unavailable priorities. The second update
1313 // contains available priorities.
TEST_P(FailoverTest, UpdateInitialUnavailable) {
  CreateAndStartBackends(2);
  // Prefix of the status message expected while nothing is reachable.
  constexpr char kAllBackendsFailingPrefix[] =
      "connections to all backends failing; last error: ";
  // First update: both priorities contain only a non-existent endpoint.
  EdsResourceArgs args({
      {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
      {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      MakeConnectionFailureRegex(kAllBackendsFailingPrefix));
  // Second update: backend 0 at priority 0 and backend 1 at priority 1.
  args = EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       0},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Until the client applies the second update, the only failure that is
  // tolerated is the connection-failure error checked above.
  const auto tolerate_connection_failure = [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(
                    MakeConnectionFailureRegex(kAllBackendsFailingPrefix)));
  };
  WaitForBackend(DEBUG_LOCATION, 0, tolerate_connection_failure);
}
1340
1341 // Tests that after the localities' priorities are updated, we still choose
1342 // the highest READY priority with the updated localities.
TEST_P(FailoverTest, UpdatePriority) {
  CreateAndStartBackends(4);
  const size_t kNumRpcs = 100;
  // First update: backend 3 is at priority 0; backends 0, 1 and 2 are at
  // priorities 1, 2 and 3 respectively.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
       0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Traffic should go to backend 3.  Counters are kept so that the checks
  // below also cover the waiting period.
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // No other backend should have seen a request.
  for (size_t i = 0; i < 3; ++i) {
    EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
  }
  // Second update: same localities and backends, with priorities changed
  // so that backend 1 is now at priority 0.
  args = EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       2},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       0},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       1},
      {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
       3},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Once backend 1 is in use, every further RPC should go to it.
  WaitForBackend(DEBUG_LOCATION, 1);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
}
1377
1378 // Moves all localities in the current priority to a higher priority.
TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
  CreateAndStartBackends(3);
  // The same non-existent endpoint is reused in both updates.
  auto unreachable_endpoint = MakeNonExistantEndpoint();
  // First update:
  // - Priority 0 is locality 0, containing an unreachable backend.
  // - Priority 1 is locality 1, containing backends 0 and 1.
  EdsResourceArgs first_update({
      {"locality0", {unreachable_endpoint}, kDefaultLocalityWeight, 0},
      {"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(first_update));
  // All backends in priority 0 are down, so priority 1 is used.  Backends 0
  // and 1 should get traffic, but backend 2 should not.  Counters are kept
  // so that the check on backend 2 also covers the waiting period.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_reset_counters(false));
  EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
  // Second update:
  // - Priority 0 contains both localities 0 and 1.
  // - Priority 1 is not present.
  // - Backend 2 is added to locality 1, purely as a signal that the client
  //   has seen the update.
  EdsResourceArgs second_update({
      {"locality0", {unreachable_endpoint}, kDefaultLocalityWeight, 0},
      {"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
       0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(second_update));
  // When backend 2 gets traffic, we know the second update has been seen.
  WaitForBackend(DEBUG_LOCATION, 2);
  // The ADS service should have recorded an EDS response state by now.
  EXPECT_TRUE(balancer_->ads_service()->eds_response_state().has_value());
}
1413
1414 // This tests a bug triggered by the xds_cluster_resolver policy reusing
1415 // a child name for the priority policy when that child name was still
1416 // present but deactivated.
TEST_P(FailoverTest,PriorityChildNameChurn)1417 TEST_P(FailoverTest, PriorityChildNameChurn) {
1418 CreateAndStartBackends(4);
1419 auto non_existant_endpoint = MakeNonExistantEndpoint();
1420 // Initial update:
1421 // - P0:locality0, child number 0 (unreachable)
1422 // - P1:locality1, child number 1
1423 // - P2:locality2, child number 2
1424 EdsResourceArgs args({
1425 {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1426 {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1427 1},
1428 {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1429 2},
1430 });
1431 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1432 WaitForBackend(DEBUG_LOCATION, 0);
1433 // Next update:
1434 // - P0:locality0, child number 0 (still unreachable)
1435 // - P1:locality2, child number 2 (moved from P2 to P1)
1436 // - P2:locality3, child number 3 (new child)
1437 // Child number 1 will be deactivated.
1438 args = EdsResourceArgs({
1439 {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1440 {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1441 1},
1442 {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1443 2},
1444 });
1445 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1446 WaitForBackend(DEBUG_LOCATION, 1);
1447 // Next update:
1448 // - P0:locality0, child number 0 (still unreachable)
1449 // - P1:locality4, child number 4 (new child number -- should not reuse #1)
1450 // - P2:locality3, child number 3
1451 // Child number 1 will be deactivated.
1452 args = EdsResourceArgs({
1453 {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1454 {"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1455 1},
1456 {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1457 2},
1458 });
1459 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1460 WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1461 WaitForBackendOptions().set_reset_counters(false));
1462 // P2 should not have gotten any traffic in this change.
1463 EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1464 }
1465
1466 //
1467 // EDS client load reporting tests
1468 //
1469
1470 using ClientLoadReportingTest = XdsEnd2endTest;
1471
1472 INSTANTIATE_TEST_SUITE_P(
1473 XdsTest, ClientLoadReportingTest,
1474 ::testing::Values(XdsTestType().set_enable_load_reporting()),
1475 &XdsTestType::Name);
1476
1477 MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value,
1478 "equals LoadMetric") {
1479 bool match = true;
1480 match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric,
1481 arg.num_requests_finished_with_metric,
1482 result_listener);
1483 match &=
1484 ::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value),
1485 arg.total_metric_value, result_listener);
1486 return match;
1487 }
1488
1489 // Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest,Vanilla)1490 TEST_P(ClientLoadReportingTest, Vanilla) {
1491 CreateAndStartBackends(4);
1492 const size_t kNumRpcsPerAddress = 10;
1493 const size_t kNumFailuresPerAddress = 3;
1494 EdsResourceArgs args({
1495 {"locality0", CreateEndpointsForBackends(0, 2)},
1496 {"locality1", CreateEndpointsForBackends(2, 4)},
1497 });
1498 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1499 // Wait until all backends are ready.
1500 size_t num_warmup_rpcs =
1501 WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1502 WaitForBackendOptions().set_reset_counters(false));
1503 // Send kNumRpcsPerAddress RPCs per server with named metrics.
1504 xds::data::orca::v3::OrcaLoadReport backend_metrics;
1505 auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1506 named_metrics["foo"] = 1.0;
1507 named_metrics["bar"] = 2.0;
1508 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1509 RpcOptions().set_backend_metrics(backend_metrics));
1510 named_metrics["foo"] = 0.3;
1511 named_metrics["bar"] = 0.4;
1512 for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1513 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1514 RpcOptions().set_server_fail(true).set_backend_metrics(
1515 backend_metrics));
1516 }
1517 const size_t total_successful_rpcs_sent =
1518 (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1519 const size_t total_failed_rpcs_sent =
1520 kNumFailuresPerAddress * backends_.size();
1521 // Check that the backends got the right number of requests.
1522 size_t total_rpcs_sent = 0;
1523 for (const auto& backend : backends_) {
1524 total_rpcs_sent += backend->backend_service()->request_count();
1525 }
1526 EXPECT_EQ(total_rpcs_sent,
1527 total_successful_rpcs_sent + total_failed_rpcs_sent);
1528 // The load report received at the balancer should be correct.
1529 std::vector<ClientStats> load_report =
1530 balancer_->lrs_service()->WaitForLoadReport();
1531 ASSERT_EQ(load_report.size(), 1UL);
1532 ClientStats& client_stats = load_report.front();
1533 EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1534 EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1535 EXPECT_EQ(total_successful_rpcs_sent,
1536 client_stats.total_successful_requests());
1537 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1538 EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1539 EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1540 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1541 ASSERT_THAT(
1542 client_stats.locality_stats(),
1543 ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1544 ::testing::Pair("locality1", ::testing::_)));
1545 size_t num_successful_rpcs = 0;
1546 size_t num_failed_rpcs = 0;
1547 std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1548 named_metrics_total;
1549 for (const auto& p : client_stats.locality_stats()) {
1550 EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1551 EXPECT_EQ(
1552 p.second.total_issued_requests,
1553 p.second.total_successful_requests + p.second.total_error_requests);
1554 num_successful_rpcs += p.second.total_successful_requests;
1555 num_failed_rpcs += p.second.total_error_requests;
1556 for (const auto& s : p.second.load_metrics) {
1557 named_metrics_total[s.first] += s.second;
1558 }
1559 }
1560 EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1561 EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1562 EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1563 EXPECT_THAT(
1564 named_metrics_total,
1565 ::testing::UnorderedElementsAre(
1566 ::testing::Pair(
1567 "foo",
1568 LoadMetricEq(
1569 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1570 backends_.size(),
1571 (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1572 (kNumFailuresPerAddress * backends_.size()) * 0.3)),
1573 ::testing::Pair(
1574 "bar",
1575 LoadMetricEq(
1576 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1577 backends_.size(),
1578 (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1579 (kNumFailuresPerAddress * backends_.size()) * 0.4))));
1580 // The LRS service got a single request, and sent a single response.
1581 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1582 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1583 }
1584
1585 // Tests send_all_clusters.
TEST_P(ClientLoadReportingTest,SendAllClusters)1586 TEST_P(ClientLoadReportingTest, SendAllClusters) {
1587 CreateAndStartBackends(2);
1588 balancer_->lrs_service()->set_send_all_clusters(true);
1589 const size_t kNumRpcsPerAddress = 10;
1590 const size_t kNumFailuresPerAddress = 3;
1591 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1592 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1593 // Wait until all backends are ready.
1594 size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
1595 // Send kNumRpcsPerAddress RPCs per server.
1596 xds::data::orca::v3::OrcaLoadReport backend_metrics;
1597 auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1598 named_metrics["foo"] = 1.0;
1599 named_metrics["bar"] = 2.0;
1600 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1601 RpcOptions().set_backend_metrics(backend_metrics));
1602 named_metrics["foo"] = 0.3;
1603 named_metrics["bar"] = 0.4;
1604 for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1605 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1606 RpcOptions().set_server_fail(true).set_backend_metrics(
1607 backend_metrics));
1608 }
1609 // Check that each backend got the right number of requests.
1610 for (size_t i = 0; i < backends_.size(); ++i) {
1611 EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
1612 backends_[i]->backend_service()->request_count());
1613 }
1614 // The load report received at the balancer should be correct.
1615 std::vector<ClientStats> load_report =
1616 balancer_->lrs_service()->WaitForLoadReport();
1617 ASSERT_EQ(load_report.size(), 1UL);
1618 ClientStats& client_stats = load_report.front();
1619 EXPECT_EQ(kNumRpcsPerAddress * backends_.size() + num_warmup_rpcs,
1620 client_stats.total_successful_requests());
1621 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1622 EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size() +
1623 num_warmup_rpcs,
1624 client_stats.total_issued_requests());
1625 EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
1626 client_stats.total_error_requests());
1627 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1628 EXPECT_THAT(
1629 client_stats.locality_stats(),
1630 ::testing::ElementsAre(::testing::Pair(
1631 "locality0",
1632 ::testing::Field(
1633 &ClientStats::LocalityStats::load_metrics,
1634 ::testing::UnorderedElementsAre(
1635 ::testing::Pair(
1636 "foo",
1637 LoadMetricEq(
1638 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1639 backends_.size(),
1640 (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1641 (kNumFailuresPerAddress * backends_.size()) *
1642 0.3)),
1643 ::testing::Pair(
1644 "bar",
1645 LoadMetricEq(
1646 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1647 backends_.size(),
1648 (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1649 (kNumFailuresPerAddress * backends_.size()) *
1650 0.4)))))));
1651 // The LRS service got a single request, and sent a single response.
1652 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1653 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1654 }
1655
1656 // Tests that we don't include stats for clusters that are not requested
1657 // by the LRS server.
TEST_P(ClientLoadReportingTest,HonorsClustersRequestedByLrsServer)1658 TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
1659 CreateAndStartBackends(1);
1660 balancer_->lrs_service()->set_cluster_names({"bogus"});
1661 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1662 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1663 // Wait until all backends are ready.
1664 WaitForAllBackends(DEBUG_LOCATION);
1665 // The load report received at the balancer should be correct.
1666 std::vector<ClientStats> load_report =
1667 balancer_->lrs_service()->WaitForLoadReport();
1668 ASSERT_EQ(load_report.size(), 0UL);
1669 // The LRS service got a single request, and sent a single response.
1670 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1671 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1672 }
1673
1674 // Tests that if the balancer restarts, the client load report contains the
1675 // stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest,BalancerRestart)1676 TEST_P(ClientLoadReportingTest, BalancerRestart) {
1677 CreateAndStartBackends(4);
1678 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
1679 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1680 // Wait until all backends returned by the balancer are ready.
1681 size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1682 std::vector<ClientStats> load_report =
1683 balancer_->lrs_service()->WaitForLoadReport();
1684 ASSERT_EQ(load_report.size(), 1UL);
1685 ClientStats client_stats = std::move(load_report.front());
1686 EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1687 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1688 EXPECT_EQ(0U, client_stats.total_error_requests());
1689 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1690 EXPECT_THAT(client_stats.locality_stats(),
1691 ::testing::ElementsAre(::testing::Pair(
1692 "locality0",
1693 ::testing::Field(&ClientStats::LocalityStats::load_metrics,
1694 ::testing::IsEmpty()))));
1695 // Shut down the balancer.
1696 balancer_->Shutdown();
1697 // We should continue using the last EDS response we received from the
1698 // balancer before it was shut down.
1699 // Note: We need to use WaitForAllBackends() here instead of just
1700 // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
1701 // shuts down, the XdsClient will generate an error to the
1702 // ListenerWatcher, which will cause the xds resolver to send a
1703 // no-op update to the LB policy. When this update gets down to the
1704 // round_robin child policy for the locality, it will generate a new
1705 // subchannel list, which resets the start index randomly. So we need
1706 // to be a little more permissive here to avoid spurious failures.
1707 ResetBackendCounters();
1708 num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1709 // Now restart the balancer, this time pointing to the new backends.
1710 balancer_->Start();
1711 args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
1712 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1713 // Wait for queries to start going to one of the new backends.
1714 // This tells us that we're now using the new serverlist.
1715 num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1716 // Send one RPC per backend.
1717 xds::data::orca::v3::OrcaLoadReport backend_metrics;
1718 auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1719 named_metrics["foo"] = 1.0;
1720 named_metrics["bar"] = 2.0;
1721 CheckRpcSendOk(DEBUG_LOCATION, 2,
1722 RpcOptions().set_backend_metrics(backend_metrics));
1723 num_rpcs += 2;
1724 // Check client stats.
1725 load_report = balancer_->lrs_service()->WaitForLoadReport();
1726 ASSERT_EQ(load_report.size(), 1UL);
1727 client_stats = std::move(load_report.front());
1728 EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1729 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1730 EXPECT_EQ(0U, client_stats.total_error_requests());
1731 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1732 EXPECT_THAT(client_stats.locality_stats(),
1733 ::testing::ElementsAre(::testing::Pair(
1734 "locality0",
1735 ::testing::Field(
1736 &ClientStats::LocalityStats::load_metrics,
1737 ::testing::UnorderedElementsAre(
1738 ::testing::Pair("foo", LoadMetricEq(2, 2.0)),
1739 ::testing::Pair("bar", LoadMetricEq(2, 4.0)))))));
1740 }
1741
1742 // Tests load reporting when switching over from one cluster to another.
TEST_P(ClientLoadReportingTest,ChangeClusters)1743 TEST_P(ClientLoadReportingTest, ChangeClusters) {
1744 CreateAndStartBackends(4);
1745 const char* kNewClusterName = "new_cluster_name";
1746 const char* kNewEdsServiceName = "new_eds_service_name";
1747 balancer_->lrs_service()->set_cluster_names(
1748 {kDefaultClusterName, kNewClusterName});
1749 // cluster kDefaultClusterName -> locality0 -> backends 0 and 1
1750 EdsResourceArgs args({
1751 {"locality0", CreateEndpointsForBackends(0, 2)},
1752 });
1753 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1754 // cluster kNewClusterName -> locality1 -> backends 2 and 3
1755 EdsResourceArgs args2({
1756 {"locality1", CreateEndpointsForBackends(2, 4)},
1757 });
1758 balancer_->ads_service()->SetEdsResource(
1759 BuildEdsResource(args2, kNewEdsServiceName));
1760 // CDS resource for kNewClusterName.
1761 Cluster new_cluster = default_cluster_;
1762 new_cluster.set_name(kNewClusterName);
1763 new_cluster.mutable_eds_cluster_config()->set_service_name(
1764 kNewEdsServiceName);
1765 balancer_->ads_service()->SetCdsResource(new_cluster);
1766 // Wait for all backends to come online.
1767 size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1768 // The load report received at the balancer should be correct.
1769 std::vector<ClientStats> load_report =
1770 balancer_->lrs_service()->WaitForLoadReport();
1771 EXPECT_THAT(
1772 load_report,
1773 ::testing::ElementsAre(::testing::AllOf(
1774 ::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
1775 ::testing::Property(&ClientStats::eds_service_name,
1776 kDefaultEdsServiceName),
1777 ::testing::Property(
1778 &ClientStats::locality_stats,
1779 ::testing::ElementsAre(::testing::Pair(
1780 "locality0",
1781 ::testing::AllOf(
1782 ::testing::Field(&ClientStats::LocalityStats::
1783 total_successful_requests,
1784 num_rpcs),
1785 ::testing::Field(&ClientStats::LocalityStats::
1786 total_requests_in_progress,
1787 0UL),
1788 ::testing::Field(
1789 &ClientStats::LocalityStats::total_error_requests,
1790 0UL),
1791 ::testing::Field(
1792 &ClientStats::LocalityStats::total_issued_requests,
1793 num_rpcs),
1794 ::testing::Field(
1795 &ClientStats::LocalityStats::load_metrics,
1796 ::testing::IsEmpty()))))),
1797 ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1798 // Change RDS resource to point to new cluster.
1799 RouteConfiguration new_route_config = default_route_config_;
1800 new_route_config.mutable_virtual_hosts(0)
1801 ->mutable_routes(0)
1802 ->mutable_route()
1803 ->set_cluster(kNewClusterName);
1804 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1805 new_route_config);
1806 // Wait for all new backends to be used.
1807 num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1808 // The load report received at the balancer should be correct.
1809 load_report = balancer_->lrs_service()->WaitForLoadReport();
1810 EXPECT_THAT(
1811 load_report,
1812 ::testing::ElementsAre(
1813 ::testing::AllOf(
1814 ::testing::Property(&ClientStats::cluster_name,
1815 kDefaultClusterName),
1816 ::testing::Property(&ClientStats::eds_service_name,
1817 kDefaultEdsServiceName),
1818 ::testing::Property(
1819 &ClientStats::locality_stats,
1820 ::testing::ElementsAre(::testing::Pair(
1821 "locality0",
1822 ::testing::AllOf(
1823 ::testing::Field(&ClientStats::LocalityStats::
1824 total_successful_requests,
1825 ::testing::Lt(num_rpcs)),
1826 ::testing::Field(&ClientStats::LocalityStats::
1827 total_requests_in_progress,
1828 0UL),
1829 ::testing::Field(
1830 &ClientStats::LocalityStats::total_error_requests,
1831 0UL),
1832 ::testing::Field(&ClientStats::LocalityStats::
1833 total_issued_requests,
1834 ::testing::Le(num_rpcs)),
1835 ::testing::Field(
1836 &ClientStats::LocalityStats::load_metrics,
1837 ::testing::IsEmpty()))))),
1838 ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
1839 ::testing::AllOf(
1840 ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
1841 ::testing::Property(&ClientStats::eds_service_name,
1842 kNewEdsServiceName),
1843 ::testing::Property(
1844 &ClientStats::locality_stats,
1845 ::testing::ElementsAre(::testing::Pair(
1846 "locality1",
1847 ::testing::AllOf(
1848 ::testing::Field(&ClientStats::LocalityStats::
1849 total_successful_requests,
1850 ::testing::Le(num_rpcs)),
1851 ::testing::Field(&ClientStats::LocalityStats::
1852 total_requests_in_progress,
1853 0UL),
1854 ::testing::Field(
1855 &ClientStats::LocalityStats::total_error_requests,
1856 0UL),
1857 ::testing::Field(&ClientStats::LocalityStats::
1858 total_issued_requests,
1859 ::testing::Le(num_rpcs)),
1860 ::testing::Field(
1861 &ClientStats::LocalityStats::load_metrics,
1862 ::testing::IsEmpty()))))),
1863 ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1864 size_t total_ok = 0;
1865 for (const ClientStats& client_stats : load_report) {
1866 total_ok += client_stats.total_successful_requests();
1867 }
1868 EXPECT_EQ(total_ok, num_rpcs);
1869 // The LRS service got a single request, and sent a single response.
1870 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1871 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1872 }
1873
1874 // Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingTest,DropStats)1875 TEST_P(ClientLoadReportingTest, DropStats) {
1876 CreateAndStartBackends(1);
1877 const uint32_t kDropPerMillionForLb = 100000;
1878 const uint32_t kDropPerMillionForThrottle = 200000;
1879 const double kErrorTolerance = 0.05;
1880 const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1881 const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1882 const double kDropRateForLbAndThrottle =
1883 kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1884 const size_t kNumRpcs =
1885 ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1886 // The ADS response contains two drop categories.
1887 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1888 args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1889 {kThrottleDropType, kDropPerMillionForThrottle}};
1890 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1891 // Send kNumRpcs RPCs and count the drops.
1892 size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1893 DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1894 kStatusMessageDropPrefix);
1895 // The drop rate should be roughly equal to the expectation.
1896 const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1897 EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1898 kErrorTolerance));
1899 // Check client stats.
1900 ClientStats client_stats;
1901 do {
1902 std::vector<ClientStats> load_reports =
1903 balancer_->lrs_service()->WaitForLoadReport();
1904 for (const auto& load_report : load_reports) {
1905 client_stats += load_report;
1906 }
1907 } while (client_stats.total_issued_requests() +
1908 client_stats.total_dropped_requests() <
1909 kNumRpcs);
1910 EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
1911 EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
1912 kNumRpcs,
1913 ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1914 EXPECT_THAT(
1915 static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
1916 (kNumRpcs * (1 - kDropRateForLb)),
1917 ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
1918 }
1919
1920 } // namespace
1921 } // namespace testing
1922 } // namespace grpc
1923
main(int argc,char ** argv)1924 int main(int argc, char** argv) {
1925 grpc::testing::TestEnvironment env(&argc, argv);
1926 ::testing::InitGoogleTest(&argc, argv);
1927 // Make the backup poller poll very frequently in order to pick up
1928 // updates from all the subchannels's FDs.
1929 grpc_core::ConfigVars::Overrides overrides;
1930 overrides.client_channel_backup_poll_interval_ms = 1;
1931 grpc_core::ConfigVars::SetOverrides(overrides);
1932 #if TARGET_OS_IPHONE
1933 // Workaround Apple CFStream bug
1934 grpc_core::SetEnv("grpc_cfstream", "0");
1935 #endif
1936 grpc_core::RegisterFakeStatsPlugin();
1937 grpc_init();
1938 grpc::testing::ConnectionAttemptInjector::Init();
1939 const auto result = RUN_ALL_TESTS();
1940 grpc_shutdown();
1941 return result;
1942 }
1943