xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_cluster_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <numeric>
17 #include <string>
18 #include <vector>
19 
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22 
23 #include "absl/strings/match.h"
24 #include "absl/strings/str_cat.h"
25 
26 #include "src/core/client_channel/backup_poller.h"
27 #include "src/core/lib/address_utils/sockaddr_utils.h"
28 #include "src/core/lib/channel/call_tracer.h"
29 #include "src/core/lib/config/config_vars.h"
30 #include "src/core/lib/surface/call.h"
31 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
32 #include "test/core/util/fake_stats_plugin.h"
33 #include "test/core/util/scoped_env_var.h"
34 #include "test/cpp/end2end/connection_attempt_injector.h"
35 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
36 
37 namespace grpc {
38 namespace testing {
39 namespace {
40 
41 using ::envoy::config::cluster::v3::CircuitBreakers;
42 using ::envoy::config::cluster::v3::RoutingPriority;
43 using ::envoy::config::core::v3::HealthStatus;
44 using ::envoy::type::v3::FractionalPercent;
45 
46 using ClientStats = LrsServiceImpl::ClientStats;
47 using OptionalLabelKey =
48     grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
49 
50 constexpr char kLbDropType[] = "lb";
51 constexpr char kThrottleDropType[] = "throttle";
52 constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
53 
54 //
55 // CDS tests
56 //
57 
58 using CdsTest = XdsEnd2endTest;
59 
60 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
61                          &XdsTestType::Name);
62 
63 // Tests that CDS client should send an ACK upon correct CDS response.
TEST_P(CdsTest,Vanilla)64 TEST_P(CdsTest, Vanilla) {
65   (void)SendRpc();
66   auto response_state = balancer_->ads_service()->cds_response_state();
67   ASSERT_TRUE(response_state.has_value());
68   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
69 }
70 
71 // Testing just one example of an invalid resource here.
72 // Unit tests for XdsClusterResourceType have exhaustive tests for all
73 // of the invalid cases.
TEST_P(CdsTest,InvalidClusterResource)74 TEST_P(CdsTest, InvalidClusterResource) {
75   auto cluster = default_cluster_;
76   cluster.set_type(Cluster::STATIC);
77   balancer_->ads_service()->SetCdsResource(cluster);
78   const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
79   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
80   EXPECT_EQ(response_state->error_message,
81             "xDS response validation errors: ["
82             "resource index 0: cluster_name: "
83             "INVALID_ARGUMENT: errors validating Cluster resource: ["
84             "field:type error:unknown discovery type]]");
85 }
86 
87 // Tests that we don't trigger does-not-exist callbacks for a resource
88 // that was previously valid but is updated to be invalid.
TEST_P(CdsTest,InvalidClusterStillExistsIfPreviouslyCached)89 TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
90   CreateAndStartBackends(1);
91   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
92   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
93   // Check that everything works.
94   CheckRpcSendOk(DEBUG_LOCATION);
95   // Now send an update changing the Cluster to be invalid.
96   auto cluster = default_cluster_;
97   cluster.set_type(Cluster::STATIC);
98   balancer_->ads_service()->SetCdsResource(cluster);
99   const auto response_state =
100       WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
101   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
102   EXPECT_EQ(response_state->error_message,
103             "xDS response validation errors: ["
104             "resource index 0: cluster_name: "
105             "INVALID_ARGUMENT: errors validating Cluster resource: ["
106             "field:type error:unknown discovery type]]");
107   CheckRpcSendOk(DEBUG_LOCATION);
108 }
109 
110 // Tests round robin is not implacted by the endpoint weight, and that the
111 // localities in a locality map are picked according to their weights.
TEST_P(CdsTest,EndpointWeightDoesNotImpactWeightedRoundRobin)112 TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
113   CreateAndStartBackends(2);
114   const int kLocalityWeight0 = 2;
115   const int kLocalityWeight1 = 8;
116   const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
117   const double kLocalityWeightRate0 =
118       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
119   const double kLocalityWeightRate1 =
120       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
121   const double kErrorTolerance = 0.05;
122   const size_t kNumRpcs =
123       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
124   // ADS response contains 2 localities, each of which contains 1 backend.
125   EdsResourceArgs args({
126       {"locality0",
127        {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
128        kLocalityWeight0},
129       {"locality1",
130        {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
131        kLocalityWeight1},
132   });
133   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
134   // Wait for both backends to be ready.
135   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
136   // Send kNumRpcs RPCs.
137   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
138   // The locality picking rates should be roughly equal to the expectation.
139   const double locality_picked_rate_0 =
140       static_cast<double>(backends_[0]->backend_service()->request_count()) /
141       kNumRpcs;
142   const double locality_picked_rate_1 =
143       static_cast<double>(backends_[1]->backend_service()->request_count()) /
144       kNumRpcs;
145   EXPECT_THAT(locality_picked_rate_0,
146               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
147   EXPECT_THAT(locality_picked_rate_1,
148               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
149 }
150 
151 // In most of our tests, we use different names for different resource
152 // types, to make sure that there are no cut-and-paste errors in the code
153 // that cause us to look at data for the wrong resource type.  So we add
154 // this test to make sure that the EDS resource name defaults to the
155 // cluster name if not specified in the CDS resource.
TEST_P(CdsTest,EdsServiceNameDefaultsToClusterName)156 TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
157   CreateAndStartBackends(1);
158   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
159   balancer_->ads_service()->SetEdsResource(
160       BuildEdsResource(args, kDefaultClusterName));
161   Cluster cluster = default_cluster_;
162   cluster.mutable_eds_cluster_config()->clear_service_name();
163   balancer_->ads_service()->SetCdsResource(cluster);
164   CheckRpcSendOk(DEBUG_LOCATION, /*times=*/1,
165                  RpcOptions().set_timeout_ms(5000));
166 }
167 
168 // Tests switching over from one cluster to another.
TEST_P(CdsTest,ChangeClusters)169 TEST_P(CdsTest, ChangeClusters) {
170   CreateAndStartBackends(2);
171   const char* kNewClusterName = "new_cluster_name";
172   const char* kNewEdsServiceName = "new_eds_service_name";
173   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
174   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
175   // We need to wait for all backends to come online.
176   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
177   // Populate new EDS resource.
178   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
179   balancer_->ads_service()->SetEdsResource(
180       BuildEdsResource(args, kNewEdsServiceName));
181   // Populate new CDS resource.
182   Cluster new_cluster = default_cluster_;
183   new_cluster.set_name(kNewClusterName);
184   new_cluster.mutable_eds_cluster_config()->set_service_name(
185       kNewEdsServiceName);
186   balancer_->ads_service()->SetCdsResource(new_cluster);
187   // Change RDS resource to point to new cluster.
188   RouteConfiguration new_route_config = default_route_config_;
189   new_route_config.mutable_virtual_hosts(0)
190       ->mutable_routes(0)
191       ->mutable_route()
192       ->set_cluster(kNewClusterName);
193   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
194                                    new_route_config);
195   // Wait for all new backends to be used.
196   WaitForAllBackends(DEBUG_LOCATION, 1, 2);
197 }
198 
// Verifies the Cluster's max_requests circuit-breaker threshold.
// - With kMaxConcurrentRequests RPCs outstanding, the next RPC fails with
//   UNAVAILABLE ("circuit breaker drop").
// - After one outstanding RPC is cancelled, an RPC succeeds again.
TEST_P(CdsTest, CircuitBreaking) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update the CDS resource to set max concurrent requests.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Send exactly kMaxConcurrentRequests long-running RPCs.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (LongRunningRpc& rpc : rpcs) {
    rpc.StartRpc(stub_.get());
  }
  // Wait until the backend sees all of them in flight.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending an RPC now should fail. The error message should tell us that we
  // hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Sleep so the cancellation is fully processed before the next RPC. There
  // may be a slight delay between the CANCELLED status being returned and the
  // update of the internal state that tracks the number of concurrent active
  // requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up the RPCs that are still outstanding.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
241 
// Verifies that the max_requests circuit-breaker threshold is enforced
// across two channels to the same cluster. RPCs alternating between the
// channels count against a single limit.
TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update the CDS resource to set max concurrent requests.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto channel2 = CreateChannel();
  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
  // Send exactly kMaxConcurrentRequests long-running RPCs, alternating
  // between the two channels.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
  }
  // Wait until the backend sees all of them in flight.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending an RPC now should fail. The error message should tell us that we
  // hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Sleep so the cancellation is fully processed before the next RPC. There
  // may be a slight delay between the CANCELLED status being returned and the
  // update of the internal state that tracks the number of concurrent active
  // requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up the RPCs that are still outstanding.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
287 
// Verifies that, after the ADS call is broken by restarting the xDS server,
// the client still picks up a subsequent change to the Cluster.
TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
  CreateAndStartBackends(2);
  const char* kNewEdsResourceName = "new_eds_resource_name";
  // Initially only backend 0 is in the EDS resource.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Sanity check that the channel works.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bounce the xDS server.
  balancer_->Shutdown();
  balancer_->Start();
  // Publish a new EDS resource that contains only backend 1...
  eds_args =
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(eds_args, kNewEdsResourceName));
  // ...and repoint the Cluster at it.
  Cluster cluster = default_cluster_;
  cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
  balancer_->ads_service()->SetCdsResource(cluster);
  // The client should notice the change and start using backend 1.
  WaitForBackend(DEBUG_LOCATION, 1);
}
310 
TEST_P(CdsTest,MetricLabels)311 TEST_P(CdsTest, MetricLabels) {
312   // Injects a fake client call tracer factory. Try keep this at top.
313   grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
314   CreateAndStartBackends(2);
315   // Populates EDS resources.
316   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)},
317                         {"locality1", CreateEndpointsForBackends(1, 2)}});
318   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
319   // Populates service labels to CDS resources.
320   auto cluster = default_cluster_;
321   auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
322   auto& label_map =
323       *filter_map["com.google.csm.telemetry_labels"].mutable_fields();
324   *label_map["service_name"].mutable_string_value() = "myservice";
325   *label_map["service_namespace"].mutable_string_value() = "mynamespace";
326   balancer_->ads_service()->SetCdsResource(cluster);
327   ChannelArguments channel_args;
328   channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
329                           &fake_client_call_tracer_factory);
330   ResetStub(/*failover_timeout_ms=*/0, &channel_args);
331   // Send an RPC to backend 0.
332   WaitForBackend(DEBUG_LOCATION, 0);
333   // Verify that the optional labels are recorded in the call tracer.
334   EXPECT_THAT(
335       fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
336           ->GetLastCallAttemptTracer()
337           ->GetOptionalLabels(),
338       ::testing::ElementsAre(
339           ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
340           ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
341                           "mynamespace"),
342           ::testing::Pair(OptionalLabelKey::kLocality,
343                           LocalityNameString("locality0"))));
344   // Send an RPC to backend 1.
345   WaitForBackend(DEBUG_LOCATION, 1);
346   // Verify that the optional labels are recorded in the call
347   // tracer.
348   EXPECT_THAT(
349       fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
350           ->GetLastCallAttemptTracer()
351           ->GetOptionalLabels(),
352       ::testing::ElementsAre(
353           ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
354           ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
355                           "mynamespace"),
356           ::testing::Pair(OptionalLabelKey::kLocality,
357                           LocalityNameString("locality1"))));
358   // TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. The
359   // only reason it's here is to add a delay before
360   // fake_client_call_tracer_factory goes out of scope, since there may be
361   // lingering callbacks in the call stack that are using the CallAttemptTracer
362   // even after we get here, which would then cause a crash.  Find a cleaner way
363   // to fix this.
364   balancer_->Shutdown();
365 }
366 
367 //
368 // CDS deletion tests
369 //
370 
371 class CdsDeletionTest : public XdsEnd2endTest {
372  protected:
SetUp()373   void SetUp() override {}  // Individual tests call InitClient().
374 };
375 
376 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
377                          ::testing::Values(XdsTestType()), &XdsTestType::Name);
378 
379 // Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
TEST_P(CdsDeletionTest,ClusterDeleted)380 TEST_P(CdsDeletionTest, ClusterDeleted) {
381   InitClient();
382   CreateAndStartBackends(1);
383   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
384   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
385   // We need to wait for all backends to come online.
386   WaitForAllBackends(DEBUG_LOCATION);
387   // Unset CDS resource.
388   balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
389   // Wait for RPCs to start failing.
390   SendRpcsUntil(DEBUG_LOCATION, [](const RpcResult& result) {
391     if (result.status.ok()) return true;  // Keep going.
392     EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
393     EXPECT_EQ(
394         absl::StrCat("CDS resource ", kDefaultClusterName, " does not exist"),
395         result.status.error_message());
396     return false;
397   });
398   // Make sure we ACK'ed the update.
399   auto response_state = balancer_->ads_service()->cds_response_state();
400   ASSERT_TRUE(response_state.has_value());
401   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
402 }
403 
404 // Tests that we ignore Cluster deletions if configured to do so.
TEST_P(CdsDeletionTest,ClusterDeletionIgnored)405 TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
406   InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion());
407   CreateAndStartBackends(2);
408   // Bring up client pointing to backend 0 and wait for it to connect.
409   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
410   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
411   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
412   // Make sure we ACKed the CDS update.
413   auto response_state = balancer_->ads_service()->cds_response_state();
414   ASSERT_TRUE(response_state.has_value());
415   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
416   // Unset CDS resource and wait for client to ACK the update.
417   balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
418   const auto deadline = absl::Now() + absl::Seconds(30);
419   while (true) {
420     ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
421     response_state = balancer_->ads_service()->cds_response_state();
422     if (response_state.has_value()) break;
423   }
424   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
425   // Make sure we can still send RPCs.
426   CheckRpcSendOk(DEBUG_LOCATION);
427   // Now recreate the CDS resource pointing to a new EDS resource that
428   // specified backend 1, and make sure the client uses it.
429   const char* kNewEdsResourceName = "new_eds_resource_name";
430   auto cluster = default_cluster_;
431   cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
432   balancer_->ads_service()->SetCdsResource(cluster);
433   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
434   balancer_->ads_service()->SetEdsResource(
435       BuildEdsResource(args, kNewEdsResourceName));
436   // Wait for client to start using backend 1.
437   WaitForAllBackends(DEBUG_LOCATION, 1, 2);
438 }
439 
440 //
441 // EDS tests
442 //
443 
444 using EdsTest = XdsEnd2endTest;
445 
446 INSTANTIATE_TEST_SUITE_P(
447     XdsTest, EdsTest,
448     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
449     &XdsTestType::Name);
450 
451 // Tests that the balancer sends the correct response to the client, and the
452 // client sends RPCs to the backends using the default child policy.
TEST_P(EdsTest,Vanilla)453 TEST_P(EdsTest, Vanilla) {
454   CreateAndStartBackends(3);
455   const size_t kNumRpcsPerAddress = 100;
456   EdsResourceArgs args({
457       {"locality0", CreateEndpointsForBackends()},
458   });
459   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
460   // Make sure that trying to connect works without a call.
461   channel_->GetState(true /* try_to_connect */);
462   // We need to wait for all backends to come online.
463   WaitForAllBackends(DEBUG_LOCATION);
464   // Send kNumRpcsPerAddress RPCs per server.
465   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
466   // Each backend should have gotten 100 requests.
467   for (size_t i = 0; i < backends_.size(); ++i) {
468     EXPECT_EQ(kNumRpcsPerAddress,
469               backends_[i]->backend_service()->request_count());
470   }
471   // Check LB policy name for the channel.
472   EXPECT_EQ("xds_cluster_manager_experimental",
473             channel_->GetLoadBalancingPolicyName());
474 }
475 
// Verifies endpoints that carry more than one address:
// - an endpoint uses whichever of its addresses is reachable;
// - traffic is balanced per endpoint, not per address.
TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
  // Enables the experimental feature for the duration of this test.
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
  const size_t kNumRpcsPerAddress = 10;
  // Three backends are created, but backend 0 is not started yet.
  CreateBackends(3);
  StartBackend(1);
  StartBackend(2);
  // Endpoint A holds backends 0 and 1; endpoint B holds backend 2.
  EdsResourceArgs args({
      {"locality0",
       {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Checks that each of the two given backends got kNumRpcsPerAddress RPCs.
  auto expect_even_split = [&](size_t first, size_t second) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[first]->backend_service()->request_count());
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[second]->backend_service()->request_count());
  };
  // Backend 0 is down, so endpoint A connects to backend 1. Traffic should
  // round-robin across backends 1 and 2.
  WaitForAllBackends(DEBUG_LOCATION, 1);  // Waits for backends 1 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
  expect_even_split(1, 2);
  // Swap which of endpoint A's addresses is reachable.
  StartBackend(0);
  ShutdownBackend(1);
  WaitForBackend(DEBUG_LOCATION, 0);
  // Traffic should now round-robin across backends 0 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
  expect_even_split(0, 2);
}
511 
// Verifies that an endpoint marked DRAINING receives no traffic, while the
// healthy backends split RPCs evenly.
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
  CreateAndStartBackends(2);
  const size_t kNumRpcsPerAddress = 100;
  // All real backends, plus one non-existent endpoint marked DRAINING.
  auto endpoints = CreateEndpointsForBackends();
  auto draining_endpoint = MakeNonExistantEndpoint();
  draining_endpoint.health_status = HealthStatus::DRAINING;
  endpoints.push_back(std::move(draining_endpoint));
  EdsResourceArgs args({
      {"locality0", std::move(endpoints), kDefaultLocalityWeight,
       kDefaultLocalityPriority},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Connecting should work even before any call is made.
  channel_->GetState(/*try_to_connect=*/true);
  // Wait for every backend to come online.
  WaitForAllBackends(DEBUG_LOCATION);
  const size_t num_backends = backends_.size();
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * num_backends);
  // Every real backend should have received exactly kNumRpcsPerAddress RPCs.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend->backend_service()->request_count());
  }
}
535 
536 // This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest,LocalityBecomesEmptyWithDeactivatedChildStateUpdate)537 TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
538   CreateAndStartBackends(1);
539   // Initial EDS resource has one locality with no endpoints.
540   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
541   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
542   WaitForAllBackends(DEBUG_LOCATION);
543   // EDS update removes all endpoints from the locality.
544   EdsResourceArgs::Locality empty_locality("locality0", {});
545   args = EdsResourceArgs({std::move(empty_locality)});
546   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
547   // Wait for RPCs to start failing.
548   constexpr char kErrorMessage[] =
549       "no children in weighted_target policy: "
550       "EDS resource eds_service_name contains empty localities: "
551       "\\[\\{region=\"xds_default_locality_region\", "
552       "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
553   SendRpcsUntil(DEBUG_LOCATION, [&](const RpcResult& result) {
554     if (result.status.ok()) return true;
555     EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
556     EXPECT_THAT(result.status.error_message(),
557                 ::testing::MatchesRegex(kErrorMessage));
558     return false;
559   });
560   // Shut down backend.  This triggers a connectivity state update from the
561   // deactivated child of the weighted_target policy.
562   ShutdownAllBackends();
563   // Now restart the backend.
564   StartAllBackends();
565   // Re-add endpoint.
566   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
567   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
568   // RPCs should eventually succeed.
569   WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
570     if (!result.status.ok()) {
571       EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
572       EXPECT_THAT(result.status.error_message(),
573                   ::testing::MatchesRegex(absl::StrCat(
574                       // The error message we see here depends on whether
575                       // the client sees the EDS update before or after it
576                       // sees the backend come back up.
577                       MakeConnectionFailureRegex(
578                           "connections to all backends failing; last error: "),
579                       "|", kErrorMessage)));
580     }
581   });
582 }
583 
// Verifies two things:
// - an EDS resource with no localities makes RPCs fail with UNAVAILABLE;
// - RPCs recover once a locality with an endpoint is added.
TEST_P(EdsTest, NoLocalities) {
  CreateAndStartBackends(1);
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: EDS resource eds_service_name "
      "contains no localities";
  // Start with an EDS resource that has no localities at all.
  EdsResourceArgs eds_args;
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Now publish a locality containing the backend.
  eds_args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Any failures seen before the backend is reached must carry the
  // no-localities error.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
606 
607 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
608 // all the servers are unreachable.
TEST_P(EdsTest,AllServersUnreachableFailFast)609 TEST_P(EdsTest, AllServersUnreachableFailFast) {
610   // Set Rpc timeout to 5 seconds to ensure there is enough time
611   // for communication with the xDS server to take place upon test start up.
612   const uint32_t kRpcTimeoutMs = 5000;
613   const size_t kNumUnreachableServers = 5;
614   std::vector<EdsResourceArgs::Endpoint> endpoints;
615   for (size_t i = 0; i < kNumUnreachableServers; ++i) {
616     endpoints.emplace_back(MakeNonExistantEndpoint());
617   }
618   EdsResourceArgs args({{"locality0", std::move(endpoints)}});
619   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
620   // The error shouldn't be DEADLINE_EXCEEDED because timeout is set to 5
621   // seconds, and we should disocver in that time that the target backend is
622   // down.
623   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
624                       MakeConnectionFailureRegex(
625                           "connections to all backends failing; last error: "),
626                       RpcOptions().set_timeout_ms(kRpcTimeoutMs));
627 }
628 
629 // Tests that RPCs fail when the backends are down, and will succeed again
630 // after the backends are restarted.
TEST_P(EdsTest,BackendsRestart)631 TEST_P(EdsTest, BackendsRestart) {
632   CreateAndStartBackends(3);
633   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
634   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
635   WaitForAllBackends(DEBUG_LOCATION);
636   // Stop backends.  RPCs should fail.
637   ShutdownAllBackends();
638   // Wait for channel to transition out of READY, so that we know it has
639   // noticed that all of the subchannels have failed.  Note that it may
640   // be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
641   EXPECT_TRUE(channel_->WaitForStateChange(
642       GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5)));
643   EXPECT_THAT(channel_->GetState(false),
644               ::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
645                                ::testing::Eq(GRPC_CHANNEL_CONNECTING)));
646   // RPCs should fail.
647   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
648                       MakeConnectionFailureRegex(
649                           "connections to all backends failing; last error: "));
650   // Restart all backends.  RPCs should start succeeding again.
651   StartAllBackends();
652   CheckRpcSendOk(DEBUG_LOCATION, 1,
653                  RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
654 }
655 
// Tests that an EDS update identical to the one the client already has is
// ignored.  The test sends a duplicate update in the middle of each group of
// RPCs.  If the update were not ignored, the round_robin policy would see an
// update and could randomly reset its position in the address list.  That
// would skew the per-backend request counts checked at the end.
TEST_P(EdsTest, IgnoresDuplicateUpdates) {
  // More than one backend is required.  With a single backend, every RPC
  // lands on the same server no matter where round_robin's position is, so
  // a reset could not be detected.
  const size_t kNumBackends = 4;
  CreateAndStartBackends(kNumBackends);
  const size_t kNumRpcsPerAddress = 100;
  // Each iteration sends exactly one RPC per backend, split around the
  // duplicate EDS update.
  const size_t kRpcsBeforeUpdate = kNumBackends / 2;
  const size_t kRpcsAfterUpdate = kNumBackends - kRpcsBeforeUpdate;
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for all backends to come online.
  WaitForAllBackends(DEBUG_LOCATION);
  for (size_t i = 0; i < kNumRpcsPerAddress; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, kRpcsBeforeUpdate);
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
    CheckRpcSendOk(DEBUG_LOCATION, kRpcsAfterUpdate);
  }
  // Every backend (including backend 0) should have gotten exactly one RPC
  // per iteration.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[i]->backend_service()->request_count())
        << "backend " << i;
  }
}
680 
681 // Testing just one example of an invalid resource here.
682 // Unit tests for XdsEndpointResourceType have exhaustive tests for all
683 // of the invalid cases.
TEST_P(EdsTest,NacksInvalidResource)684 TEST_P(EdsTest, NacksInvalidResource) {
685   EdsResourceArgs args({
686       {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
687   });
688   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
689   const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
690   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
691   EXPECT_EQ(response_state->error_message,
692             "xDS response validation errors: ["
693             "resource index 0: eds_service_name: "
694             "INVALID_ARGUMENT: errors parsing EDS resource: ["
695             "field:endpoints error:priority 0 empty]]");
696 }
697 
698 // Tests that if the balancer is down, the RPCs will still be sent to the
699 // backends according to the last balancer response, until a new balancer is
700 // reachable.
TEST_P(EdsTest,KeepUsingLastDataIfBalancerGoesDown)701 TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
702   CreateAndStartBackends(2);
703   // Set up EDS resource pointing to backend 0.
704   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
705   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
706   // Start the client and make sure it sees the backend.
707   WaitForBackend(DEBUG_LOCATION, 0);
708   // Stop the balancer, and verify that RPCs continue to flow to backend 0.
709   balancer_->Shutdown();
710   auto deadline = grpc_timeout_seconds_to_deadline(5);
711   do {
712     CheckRpcSendOk(DEBUG_LOCATION);
713   } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
714   // Check the EDS resource to point to backend 1 and bring the balancer
715   // back up.
716   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
717   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
718   balancer_->Start();
719   // Wait for client to see backend 1.
720   WaitForBackend(DEBUG_LOCATION, 1);
721 }
722 
723 // Tests that the localities in a locality map are picked according to their
724 // weights.
TEST_P(EdsTest,LocalityWeights)725 TEST_P(EdsTest, LocalityWeights) {
726   CreateAndStartBackends(2);
727   const int kLocalityWeight0 = 2;
728   const int kLocalityWeight1 = 8;
729   const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
730   const double kLocalityWeightRate0 =
731       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
732   const double kLocalityWeightRate1 =
733       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
734   const double kErrorTolerance = 0.05;
735   const size_t kNumRpcs =
736       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
737   // ADS response contains 2 localities, each of which contains 1 backend.
738   EdsResourceArgs args({
739       {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
740       {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
741   });
742   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
743   // Wait for both backends to be ready.
744   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
745   // Send kNumRpcs RPCs.
746   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
747   // The locality picking rates should be roughly equal to the expectation.
748   const double locality_picked_rate_0 =
749       static_cast<double>(backends_[0]->backend_service()->request_count()) /
750       kNumRpcs;
751   const double locality_picked_rate_1 =
752       static_cast<double>(backends_[1]->backend_service()->request_count()) /
753       kNumRpcs;
754   EXPECT_THAT(locality_picked_rate_0,
755               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
756   EXPECT_THAT(locality_picked_rate_1,
757               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
758 }
759 
760 // Tests that we don't suffer from integer overflow in locality weights.
TEST_P(EdsTest,NoIntegerOverflowInLocalityWeights)761 TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
762   CreateAndStartBackends(2);
763   const uint32_t kLocalityWeight1 = std::numeric_limits<uint32_t>::max() / 3;
764   const uint32_t kLocalityWeight0 =
765       std::numeric_limits<uint32_t>::max() - kLocalityWeight1;
766   const uint64_t kTotalLocalityWeight =
767       static_cast<uint64_t>(kLocalityWeight0) +
768       static_cast<uint64_t>(kLocalityWeight1);
769   const double kLocalityWeightRate0 =
770       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
771   const double kLocalityWeightRate1 =
772       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
773   const double kErrorTolerance = 0.05;
774   const size_t kNumRpcs =
775       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
776   // ADS response contains 2 localities, each of which contains 1 backend.
777   EdsResourceArgs args({
778       {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
779       {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
780   });
781   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
782   // Wait for both backends to be ready.
783   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
784   // Send kNumRpcs RPCs.
785   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
786   // The locality picking rates should be roughly equal to the expectation.
787   const double locality_picked_rate_0 =
788       static_cast<double>(backends_[0]->backend_service()->request_count()) /
789       kNumRpcs;
790   const double locality_picked_rate_1 =
791       static_cast<double>(backends_[1]->backend_service()->request_count()) /
792       kNumRpcs;
793   EXPECT_THAT(locality_picked_rate_0,
794               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
795   EXPECT_THAT(locality_picked_rate_1,
796               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
797 }
798 
// Starts with an EDS resource whose only locality is empty and checks that
// RPCs fail with an explanatory error.  Then adds an endpoint to that
// locality and checks that RPCs recover.
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  EdsResourceArgs::Locality locality_without_endpoints("locality0", {});
  EdsResourceArgs args({std::move(locality_without_endpoints)});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Regex for the status message RPCs fail with while locality0 is empty.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: "
      "EDS resource eds_service_name contains empty localities: "
      "\\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Publish a resource in which locality0 has the backend's endpoint.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // RPCs should eventually reach the backend.  Until the update is applied,
  // any failure must still be the empty-locality error.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
824 
825 // Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest,LocalityContainingNoEndpoints)826 TEST_P(EdsTest, LocalityContainingNoEndpoints) {
827   CreateAndStartBackends(2);
828   const size_t kNumRpcs = 5000;
829   // EDS response contains 2 localities, one with no endpoints.
830   EdsResourceArgs args({
831       {"locality0", CreateEndpointsForBackends()},
832       {"locality1", {}},
833   });
834   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
835   // Wait for both backends to be ready.
836   WaitForAllBackends(DEBUG_LOCATION);
837   // Send kNumRpcs RPCs.
838   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
839   // All traffic should go to the reachable locality.
840   EXPECT_EQ(backends_[0]->backend_service()->request_count(),
841             kNumRpcs / backends_.size());
842   EXPECT_EQ(backends_[1]->backend_service()->request_count(),
843             kNumRpcs / backends_.size());
844 }
845 
846 // Tests that the locality map can work properly even when it contains a large
847 // number of localities.
TEST_P(EdsTest,ManyLocalitiesStressTest)848 TEST_P(EdsTest, ManyLocalitiesStressTest) {
849   const size_t kNumLocalities = 50;
850   CreateAndStartBackends(kNumLocalities + 1);
851   const uint32_t kRpcTimeoutMs = 5000;
852   // The first ADS response contains kNumLocalities localities, each of which
853   // contains its own backend.
854   EdsResourceArgs args;
855   for (size_t i = 0; i < kNumLocalities; ++i) {
856     std::string name = absl::StrCat("locality", i);
857     EdsResourceArgs::Locality locality(name,
858                                        CreateEndpointsForBackends(i, i + 1));
859     args.locality_list.emplace_back(std::move(locality));
860   }
861   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
862   // Wait until all backends are ready.
863   WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
864                      /*check_status=*/nullptr,
865                      WaitForBackendOptions().set_reset_counters(false),
866                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
867   // The second ADS response contains 1 locality, which contains backend 50.
868   args =
869       EdsResourceArgs({{"locality0", CreateEndpointsForBackends(
870                                          kNumLocalities, kNumLocalities + 1)}});
871   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
872   // Wait until backend 50 is ready.
873   WaitForBackend(DEBUG_LOCATION, kNumLocalities);
874 }
875 
876 // Tests that the localities in a locality map are picked correctly after
877 // update (addition, modification, deletion).
TEST_P(EdsTest,LocalityMapUpdateChurn)878 TEST_P(EdsTest, LocalityMapUpdateChurn) {
879   CreateAndStartBackends(4);
880   const size_t kNumRpcs = 3000;
881   // The locality weight for the first 3 localities.
882   const std::vector<int> kLocalityWeights0 = {2, 3, 4};
883   const double kTotalLocalityWeight0 =
884       std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0);
885   std::vector<double> locality_weight_rate_0;
886   locality_weight_rate_0.reserve(kLocalityWeights0.size());
887   for (int weight : kLocalityWeights0) {
888     locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
889   }
890   // Delete the first locality, keep the second locality, change the third
891   // locality's weight from 4 to 2, and add a new locality with weight 6.
892   const std::vector<int> kLocalityWeights1 = {3, 2, 6};
893   const double kTotalLocalityWeight1 =
894       std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0);
895   std::vector<double> locality_weight_rate_1 = {
896       0 /* placeholder for locality 0 */};
897   for (int weight : kLocalityWeights1) {
898     locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
899   }
900   EdsResourceArgs args({
901       {"locality0", CreateEndpointsForBackends(0, 1), 2},
902       {"locality1", CreateEndpointsForBackends(1, 2), 3},
903       {"locality2", CreateEndpointsForBackends(2, 3), 4},
904   });
905   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
906   // Wait for the first 3 backends to be ready.
907   WaitForAllBackends(DEBUG_LOCATION, 0, 3);
908   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
909   // Send kNumRpcs RPCs.
910   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
911   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
912   // The picking rates of the first 3 backends should be roughly equal to the
913   // expectation.
914   std::vector<double> locality_picked_rates;
915   for (size_t i = 0; i < 3; ++i) {
916     locality_picked_rates.push_back(
917         static_cast<double>(backends_[i]->backend_service()->request_count()) /
918         kNumRpcs);
919   }
920   const double kErrorTolerance = 0.2;
921   for (size_t i = 0; i < 3; ++i) {
922     gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
923             locality_picked_rates[i]);
924     EXPECT_THAT(
925         locality_picked_rates[i],
926         ::testing::AllOf(
927             ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
928             ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
929   }
930   args = EdsResourceArgs({
931       {"locality1", CreateEndpointsForBackends(1, 2), 3},
932       {"locality2", CreateEndpointsForBackends(2, 3), 2},
933       {"locality3", CreateEndpointsForBackends(3, 4), 6},
934   });
935   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
936   // Backend 3 hasn't received any request.
937   EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
938   // Wait until the locality update has been processed, as signaled by backend
939   // 3 receiving a request.
940   WaitForAllBackends(DEBUG_LOCATION, 3, 4);
941   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
942   // Send kNumRpcs RPCs.
943   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
944   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
945   // Backend 0 no longer receives any request.
946   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
947   // The picking rates of the last 3 backends should be roughly equal to the
948   // expectation.
949   locality_picked_rates = {0 /* placeholder for backend 0 */};
950   for (size_t i = 1; i < 4; ++i) {
951     locality_picked_rates.push_back(
952         static_cast<double>(backends_[i]->backend_service()->request_count()) /
953         kNumRpcs);
954   }
955   for (size_t i = 1; i < 4; ++i) {
956     gpr_log(GPR_INFO, "Locality %" PRIuPTR " rate %f", i,
957             locality_picked_rates[i]);
958     EXPECT_THAT(
959         locality_picked_rates[i],
960         ::testing::AllOf(
961             ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
962             ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
963   }
964 }
965 
966 // Tests that we don't fail RPCs when replacing all of the localities in
967 // a given priority.
TEST_P(EdsTest,ReplaceAllLocalitiesInPriority)968 TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
969   CreateAndStartBackends(2);
970   // Initial EDS update has backend 0.
971   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
972   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
973   // Wait for the first backend to be ready.
974   WaitForBackend(DEBUG_LOCATION, 0);
975   // Send EDS update that replaces the locality and switches to backend 1.
976   args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}});
977   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
978   // When the client sees the update, RPCs should start going to backend 1.
979   // No RPCs should fail during this change.
980   WaitForBackend(DEBUG_LOCATION, 1);
981 }
982 
// Removes a locality and then re-adds it.  Each update also adds a new
// backend, and traffic reaching that backend proves the update was applied.
TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
  CreateAndStartBackends(4);
  // Step 1: backend 1 in locality0, backend 2 in locality1.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(1, 2)},
      {"locality1", CreateEndpointsForBackends(2, 3)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForAllBackends(DEBUG_LOCATION, 1, 3);
  // Step 2: drop locality1.  Backend 0 joins locality0, so traffic reaching
  // it shows the client has seen this update.
  args = EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Step 3: bring locality1 back.  Backend 3 joins it, so traffic reaching
  // it shows the client has seen this update.
  args = EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 3);
}
1010 
1011 // Tests that RPCs are dropped according to the drop config.
TEST_P(EdsTest,Drops)1012 TEST_P(EdsTest, Drops) {
1013   CreateAndStartBackends(1);
1014   const uint32_t kDropPerMillionForLb = 100000;
1015   const uint32_t kDropPerMillionForThrottle = 200000;
1016   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1017   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1018   const double kDropRateForLbAndThrottle =
1019       kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1020   const double kErrorTolerance = 0.05;
1021   const size_t kNumRpcs =
1022       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1023   // The ADS response contains two drop categories.
1024   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1025   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1026                           {kThrottleDropType, kDropPerMillionForThrottle}};
1027   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1028   // Send kNumRpcs RPCs and count the drops.
1029   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1030       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1031       kStatusMessageDropPrefix);
1032   // The drop rate should be roughly equal to the expectation.
1033   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1034   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1035                                                     kErrorTolerance));
1036 }
1037 
1038 // Tests that drop config is converted correctly from per hundred.
TEST_P(EdsTest,DropPerHundred)1039 TEST_P(EdsTest, DropPerHundred) {
1040   CreateAndStartBackends(1);
1041   const uint32_t kDropPerHundredForLb = 10;
1042   const double kDropRateForLb = kDropPerHundredForLb / 100.0;
1043   const double kErrorTolerance = 0.05;
1044   const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1045   // The ADS response contains one drop category.
1046   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1047   args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
1048   args.drop_denominator = FractionalPercent::HUNDRED;
1049   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1050   // Send kNumRpcs RPCs and count the drops.
1051   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1052       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1053       kStatusMessageDropPrefix);
1054   // The drop rate should be roughly equal to the expectation.
1055   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1056   EXPECT_THAT(seen_drop_rate,
1057               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1058 }
1059 
1060 // Tests that drop config is converted correctly from per ten thousand.
TEST_P(EdsTest,DropPerTenThousand)1061 TEST_P(EdsTest, DropPerTenThousand) {
1062   CreateAndStartBackends(1);
1063   const uint32_t kDropPerTenThousandForLb = 1000;
1064   const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
1065   const double kErrorTolerance = 0.05;
1066   const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1067   // The ADS response contains one drop category.
1068   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1069   args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
1070   args.drop_denominator = FractionalPercent::TEN_THOUSAND;
1071   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1072   // Send kNumRpcs RPCs and count the drops.
1073   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1074       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1075       kStatusMessageDropPrefix);
1076   // The drop rate should be roughly equal to the expectation.
1077   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1078   EXPECT_THAT(seen_drop_rate,
1079               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1080 }
1081 
1082 // Tests that drop is working correctly after update.
TEST_P(EdsTest,DropConfigUpdate)1083 TEST_P(EdsTest, DropConfigUpdate) {
1084   CreateAndStartBackends(1);
1085   const uint32_t kDropPerMillionForLb = 100000;
1086   const uint32_t kDropPerMillionForThrottle = 200000;
1087   const double kErrorTolerance = 0.05;
1088   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1089   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1090   const double kDropRateForLbAndThrottle =
1091       kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1092   const size_t kNumRpcsLbOnly =
1093       ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1094   const size_t kNumRpcsBoth =
1095       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1096   // The first ADS response contains one drop category.
1097   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1098   args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
1099   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1100   // Send kNumRpcsLbOnly RPCs and count the drops.
1101   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
1102   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1103       DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
1104       kStatusMessageDropPrefix);
1105   gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
1106   // The drop rate should be roughly equal to the expectation.
1107   double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
1108   gpr_log(GPR_INFO, "First batch drop rate %f", seen_drop_rate);
1109   EXPECT_THAT(seen_drop_rate,
1110               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1111   // The second ADS response contains two drop categories, send an update EDS
1112   // response.
1113   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1114                           {kThrottleDropType, kDropPerMillionForThrottle}};
1115   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1116   // Wait until the drop rate increases to the middle of the two configs,
1117   // which implies that the update has been in effect.
1118   const double kDropRateThreshold =
1119       (kDropRateForLb + kDropRateForLbAndThrottle) / 2;
1120   size_t num_rpcs = kNumRpcsBoth;
1121   SendRpcsUntil(
1122       DEBUG_LOCATION,
1123       [&](const RpcResult& result) {
1124         ++num_rpcs;
1125         if (result.status.ok()) {
1126           EXPECT_EQ(result.response.message(), kRequestMessage);
1127         } else {
1128           EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1129           EXPECT_THAT(result.status.error_message(),
1130                       ::testing::StartsWith(kStatusMessageDropPrefix));
1131           ++num_drops;
1132         }
1133         seen_drop_rate = static_cast<double>(num_drops) / num_rpcs;
1134         return seen_drop_rate < kDropRateThreshold;
1135       },
1136       /*timeout_ms=*/40000);
1137   // Send kNumRpcsBoth RPCs and count the drops.
1138   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
1139   num_drops = SendRpcsAndCountFailuresWithMessage(DEBUG_LOCATION, kNumRpcsBoth,
1140                                                   StatusCode::UNAVAILABLE,
1141                                                   kStatusMessageDropPrefix);
1142   gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
1143   // The new drop rate should be roughly equal to the expectation.
1144   seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
1145   gpr_log(GPR_INFO, "Second batch drop rate %f", seen_drop_rate);
1146   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1147                                                     kErrorTolerance));
1148 }
1149 
1150 // Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(EdsTest,DropAll)1151 TEST_P(EdsTest, DropAll) {
1152   const size_t kNumRpcs = 1000;
1153   const uint32_t kDropPerMillionForLb = 100000;
1154   const uint32_t kDropPerMillionForThrottle = 1000000;
1155   // The ADS response contains two drop categories.
1156   EdsResourceArgs args;
1157   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1158                           {kThrottleDropType, kDropPerMillionForThrottle}};
1159   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1160   // Send kNumRpcs RPCs and all of them are dropped.
1161   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1162       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1163       kStatusMessageDropPrefix);
1164   EXPECT_EQ(num_drops, kNumRpcs);
1165 }
1166 
1167 //
1168 // EDS failover tests
1169 //
1170 
1171 class FailoverTest : public XdsEnd2endTest {
1172  public:
SetUp()1173   void SetUp() override {
1174     XdsEnd2endTest::SetUp();
1175     ResetStub(/*failover_timeout_ms=*/500);
1176   }
1177 };
1178 
1179 INSTANTIATE_TEST_SUITE_P(
1180     XdsTest, FailoverTest,
1181     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
1182     &XdsTestType::Name);
1183 
1184 // Localities with the highest priority are used when multiple priority exist.
TEST_P(FailoverTest,ChooseHighestPriority)1185 TEST_P(FailoverTest, ChooseHighestPriority) {
1186   CreateAndStartBackends(4);
1187   EdsResourceArgs args({
1188       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1189        1},
1190       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1191        2},
1192       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1193        3},
1194       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1195        0},
1196   });
1197   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1198   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1199                  WaitForBackendOptions().set_reset_counters(false));
1200   for (size_t i = 0; i < 3; ++i) {
1201     EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1202   }
1203 }
1204 
1205 // Does not choose priority with no endpoints.
TEST_P(FailoverTest,DoesNotUsePriorityWithNoEndpoints)1206 TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
1207   CreateAndStartBackends(3);
1208   EdsResourceArgs args({
1209       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1210        1},
1211       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1212        2},
1213       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1214        3},
1215       {"locality3", {}, kDefaultLocalityWeight, 0},
1216   });
1217   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1218   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1219                  WaitForBackendOptions().set_reset_counters(false));
1220   for (size_t i = 1; i < 3; ++i) {
1221     EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1222   }
1223 }
1224 
1225 // Does not choose locality with no endpoints.
TEST_P(FailoverTest,DoesNotUseLocalityWithNoEndpoints)1226 TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
1227   CreateAndStartBackends(1);
1228   EdsResourceArgs args({
1229       {"locality0", {}, kDefaultLocalityWeight, 0},
1230       {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
1231   });
1232   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1233   // Wait for all backends to be used.
1234   WaitForAllBackends(DEBUG_LOCATION);
1235 }
1236 
1237 // If the higher priority localities are not reachable, failover to the
1238 // highest priority among the rest.
TEST_P(FailoverTest,Failover)1239 TEST_P(FailoverTest, Failover) {
1240   CreateAndStartBackends(2);
1241   EdsResourceArgs args({
1242       {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
1243       {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1244        2},
1245       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1246        3},
1247       {"locality3", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
1248   });
1249   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1250   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1251                  WaitForBackendOptions().set_reset_counters(false));
1252   EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1253 }
1254 
1255 // Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest,ReportsConnectingDuringFailover)1256 TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
1257   CreateAndStartBackends(1);
1258   // Priority 0 will be unreachable, so we'll use priority 1.
1259   EdsResourceArgs args({
1260       {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
1261       {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
1262   });
1263   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1264   ConnectionAttemptInjector injector;
1265   auto hold = injector.AddHold(backends_[0]->port());
1266   // Start an RPC in the background, which should cause the channel to
1267   // try to connect.
1268   LongRunningRpc rpc;
1269   rpc.StartRpc(stub_.get(), RpcOptions());
1270   // Wait for connection attempt to start to the backend.
1271   hold->Wait();
1272   // Channel state should be CONNECTING here, and any RPC should be
1273   // queued.
1274   EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
1275   // Allow the connection attempt to complete.
1276   hold->Resume();
1277   // Now the RPC should complete successfully.
1278   gpr_log(GPR_INFO, "=== WAITING FOR RPC TO FINISH ===");
1279   Status status = rpc.GetStatus();
1280   gpr_log(GPR_INFO, "=== RPC FINISHED ===");
1281   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1282                            << " message=" << status.error_message();
1283 }
1284 
1285 // If a locality with higher priority than the current one becomes ready,
1286 // switch to it.
TEST_P(FailoverTest,SwitchBackToHigherPriority)1287 TEST_P(FailoverTest, SwitchBackToHigherPriority) {
1288   CreateAndStartBackends(4);
1289   const size_t kNumRpcs = 100;
1290   EdsResourceArgs args({
1291       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1292        1},
1293       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1294        2},
1295       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1296        3},
1297       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1298        0},
1299   });
1300   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1301   WaitForBackend(DEBUG_LOCATION, 3);
1302   backends_[3]->StopListeningAndSendGoaways();
1303   backends_[0]->StopListeningAndSendGoaways();
1304   WaitForBackend(DEBUG_LOCATION, 1);
1305   ShutdownBackend(0);
1306   StartBackend(0);
1307   WaitForBackend(DEBUG_LOCATION, 0);
1308   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1309   EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
1310 }
1311 
1312 // The first update only contains unavailable priorities. The second update
1313 // contains available priorities.
TEST_P(FailoverTest,UpdateInitialUnavailable)1314 TEST_P(FailoverTest, UpdateInitialUnavailable) {
1315   CreateAndStartBackends(2);
1316   EdsResourceArgs args({
1317       {"locality0", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 0},
1318       {"locality1", {MakeNonExistantEndpoint()}, kDefaultLocalityWeight, 1},
1319   });
1320   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1321   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1322                       MakeConnectionFailureRegex(
1323                           "connections to all backends failing; last error: "));
1324   args = EdsResourceArgs({
1325       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1326        0},
1327       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1328        1},
1329   });
1330   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1331   WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
1332     if (!result.status.ok()) {
1333       EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1334       EXPECT_THAT(result.status.error_message(),
1335                   ::testing::MatchesRegex(MakeConnectionFailureRegex(
1336                       "connections to all backends failing; last error: ")));
1337     }
1338   });
1339 }
1340 
1341 // Tests that after the localities' priorities are updated, we still choose
1342 // the highest READY priority with the updated localities.
TEST_P(FailoverTest,UpdatePriority)1343 TEST_P(FailoverTest, UpdatePriority) {
1344   CreateAndStartBackends(4);
1345   const size_t kNumRpcs = 100;
1346   EdsResourceArgs args({
1347       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1348        1},
1349       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1350        2},
1351       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1352        3},
1353       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1354        0},
1355   });
1356   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1357   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1358                  WaitForBackendOptions().set_reset_counters(false));
1359   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
1360   EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1361   EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
1362   args = EdsResourceArgs({
1363       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1364        2},
1365       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1366        0},
1367       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1368        1},
1369       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1370        3},
1371   });
1372   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1373   WaitForBackend(DEBUG_LOCATION, 1);
1374   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1375   EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
1376 }
1377 
1378 // Moves all localities in the current priority to a higher priority.
TEST_P(FailoverTest,MoveAllLocalitiesInCurrentPriorityToHigherPriority)1379 TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
1380   CreateAndStartBackends(3);
1381   auto non_existant_endpoint = MakeNonExistantEndpoint();
1382   // First update:
1383   // - Priority 0 is locality 0, containing an unreachable backend.
1384   // - Priority 1 is locality 1, containing backends 0 and 1.
1385   EdsResourceArgs args({
1386       {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1387       {"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
1388        1},
1389   });
1390   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1391   // When we get the first update, all backends in priority 0 are down,
1392   // so we will create priority 1.  Backends 0 and 1 should have traffic,
1393   // but backend 2 should not.
1394   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
1395                      WaitForBackendOptions().set_reset_counters(false));
1396   EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1397   // Second update:
1398   // - Priority 0 contains both localities 0 and 1.
1399   // - Priority 1 is not present.
1400   // - We add backend 2 to locality 1, just so we have a way to know
1401   //   when the update has been seen by the client.
1402   args = EdsResourceArgs({
1403       {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1404       {"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
1405        0},
1406   });
1407   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1408   // When backend 2 gets traffic, we know the second update has been seen.
1409   WaitForBackend(DEBUG_LOCATION, 2);
1410   // The xDS server got at least 1 response.
1411   EXPECT_TRUE(balancer_->ads_service()->eds_response_state().has_value());
1412 }
1413 
1414 // This tests a bug triggered by the xds_cluster_resolver policy reusing
1415 // a child name for the priority policy when that child name was still
1416 // present but deactivated.
TEST_P(FailoverTest,PriorityChildNameChurn)1417 TEST_P(FailoverTest, PriorityChildNameChurn) {
1418   CreateAndStartBackends(4);
1419   auto non_existant_endpoint = MakeNonExistantEndpoint();
1420   // Initial update:
1421   // - P0:locality0, child number 0 (unreachable)
1422   // - P1:locality1, child number 1
1423   // - P2:locality2, child number 2
1424   EdsResourceArgs args({
1425       {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1426       {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1427        1},
1428       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1429        2},
1430   });
1431   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1432   WaitForBackend(DEBUG_LOCATION, 0);
1433   // Next update:
1434   // - P0:locality0, child number 0 (still unreachable)
1435   // - P1:locality2, child number 2 (moved from P2 to P1)
1436   // - P2:locality3, child number 3 (new child)
1437   // Child number 1 will be deactivated.
1438   args = EdsResourceArgs({
1439       {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1440       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1441        1},
1442       {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1443        2},
1444   });
1445   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1446   WaitForBackend(DEBUG_LOCATION, 1);
1447   // Next update:
1448   // - P0:locality0, child number 0 (still unreachable)
1449   // - P1:locality4, child number 4 (new child number -- should not reuse #1)
1450   // - P2:locality3, child number 3
1451   // Child number 1 will be deactivated.
1452   args = EdsResourceArgs({
1453       {"locality0", {non_existant_endpoint}, kDefaultLocalityWeight, 0},
1454       {"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1455        1},
1456       {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1457        2},
1458   });
1459   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1460   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1461                  WaitForBackendOptions().set_reset_counters(false));
1462   // P2 should not have gotten any traffic in this change.
1463   EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1464 }
1465 
1466 //
1467 // EDS client load reporting tests
1468 //
1469 
1470 using ClientLoadReportingTest = XdsEnd2endTest;
1471 
1472 INSTANTIATE_TEST_SUITE_P(
1473     XdsTest, ClientLoadReportingTest,
1474     ::testing::Values(XdsTestType().set_enable_load_reporting()),
1475     &XdsTestType::Name);
1476 
1477 MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value,
1478            "equals LoadMetric") {
1479   bool match = true;
1480   match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric,
1481                                          arg.num_requests_finished_with_metric,
1482                                          result_listener);
1483   match &=
1484       ::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value),
1485                                     arg.total_metric_value, result_listener);
1486   return match;
1487 }
1488 
1489 // Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest,Vanilla)1490 TEST_P(ClientLoadReportingTest, Vanilla) {
1491   CreateAndStartBackends(4);
1492   const size_t kNumRpcsPerAddress = 10;
1493   const size_t kNumFailuresPerAddress = 3;
1494   EdsResourceArgs args({
1495       {"locality0", CreateEndpointsForBackends(0, 2)},
1496       {"locality1", CreateEndpointsForBackends(2, 4)},
1497   });
1498   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1499   // Wait until all backends are ready.
1500   size_t num_warmup_rpcs =
1501       WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1502                          WaitForBackendOptions().set_reset_counters(false));
1503   // Send kNumRpcsPerAddress RPCs per server with named metrics.
1504   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1505   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1506   named_metrics["foo"] = 1.0;
1507   named_metrics["bar"] = 2.0;
1508   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1509                  RpcOptions().set_backend_metrics(backend_metrics));
1510   named_metrics["foo"] = 0.3;
1511   named_metrics["bar"] = 0.4;
1512   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1513     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1514                         RpcOptions().set_server_fail(true).set_backend_metrics(
1515                             backend_metrics));
1516   }
1517   const size_t total_successful_rpcs_sent =
1518       (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1519   const size_t total_failed_rpcs_sent =
1520       kNumFailuresPerAddress * backends_.size();
1521   // Check that the backends got the right number of requests.
1522   size_t total_rpcs_sent = 0;
1523   for (const auto& backend : backends_) {
1524     total_rpcs_sent += backend->backend_service()->request_count();
1525   }
1526   EXPECT_EQ(total_rpcs_sent,
1527             total_successful_rpcs_sent + total_failed_rpcs_sent);
1528   // The load report received at the balancer should be correct.
1529   std::vector<ClientStats> load_report =
1530       balancer_->lrs_service()->WaitForLoadReport();
1531   ASSERT_EQ(load_report.size(), 1UL);
1532   ClientStats& client_stats = load_report.front();
1533   EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1534   EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1535   EXPECT_EQ(total_successful_rpcs_sent,
1536             client_stats.total_successful_requests());
1537   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1538   EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1539   EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1540   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1541   ASSERT_THAT(
1542       client_stats.locality_stats(),
1543       ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1544                              ::testing::Pair("locality1", ::testing::_)));
1545   size_t num_successful_rpcs = 0;
1546   size_t num_failed_rpcs = 0;
1547   std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1548       named_metrics_total;
1549   for (const auto& p : client_stats.locality_stats()) {
1550     EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1551     EXPECT_EQ(
1552         p.second.total_issued_requests,
1553         p.second.total_successful_requests + p.second.total_error_requests);
1554     num_successful_rpcs += p.second.total_successful_requests;
1555     num_failed_rpcs += p.second.total_error_requests;
1556     for (const auto& s : p.second.load_metrics) {
1557       named_metrics_total[s.first] += s.second;
1558     }
1559   }
1560   EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1561   EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1562   EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1563   EXPECT_THAT(
1564       named_metrics_total,
1565       ::testing::UnorderedElementsAre(
1566           ::testing::Pair(
1567               "foo",
1568               LoadMetricEq(
1569                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1570                       backends_.size(),
1571                   (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1572                       (kNumFailuresPerAddress * backends_.size()) * 0.3)),
1573           ::testing::Pair(
1574               "bar",
1575               LoadMetricEq(
1576                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1577                       backends_.size(),
1578                   (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1579                       (kNumFailuresPerAddress * backends_.size()) * 0.4))));
1580   // The LRS service got a single request, and sent a single response.
1581   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1582   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1583 }
1584 
1585 // Tests send_all_clusters.
TEST_P(ClientLoadReportingTest,SendAllClusters)1586 TEST_P(ClientLoadReportingTest, SendAllClusters) {
1587   CreateAndStartBackends(2);
1588   balancer_->lrs_service()->set_send_all_clusters(true);
1589   const size_t kNumRpcsPerAddress = 10;
1590   const size_t kNumFailuresPerAddress = 3;
1591   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1592   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1593   // Wait until all backends are ready.
1594   size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
1595   // Send kNumRpcsPerAddress RPCs per server.
1596   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1597   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1598   named_metrics["foo"] = 1.0;
1599   named_metrics["bar"] = 2.0;
1600   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1601                  RpcOptions().set_backend_metrics(backend_metrics));
1602   named_metrics["foo"] = 0.3;
1603   named_metrics["bar"] = 0.4;
1604   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1605     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1606                         RpcOptions().set_server_fail(true).set_backend_metrics(
1607                             backend_metrics));
1608   }
1609   // Check that each backend got the right number of requests.
1610   for (size_t i = 0; i < backends_.size(); ++i) {
1611     EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
1612               backends_[i]->backend_service()->request_count());
1613   }
1614   // The load report received at the balancer should be correct.
1615   std::vector<ClientStats> load_report =
1616       balancer_->lrs_service()->WaitForLoadReport();
1617   ASSERT_EQ(load_report.size(), 1UL);
1618   ClientStats& client_stats = load_report.front();
1619   EXPECT_EQ(kNumRpcsPerAddress * backends_.size() + num_warmup_rpcs,
1620             client_stats.total_successful_requests());
1621   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1622   EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size() +
1623                 num_warmup_rpcs,
1624             client_stats.total_issued_requests());
1625   EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
1626             client_stats.total_error_requests());
1627   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1628   EXPECT_THAT(
1629       client_stats.locality_stats(),
1630       ::testing::ElementsAre(::testing::Pair(
1631           "locality0",
1632           ::testing::Field(
1633               &ClientStats::LocalityStats::load_metrics,
1634               ::testing::UnorderedElementsAre(
1635                   ::testing::Pair(
1636                       "foo",
1637                       LoadMetricEq(
1638                           (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1639                               backends_.size(),
1640                           (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1641                               (kNumFailuresPerAddress * backends_.size()) *
1642                                   0.3)),
1643                   ::testing::Pair(
1644                       "bar",
1645                       LoadMetricEq(
1646                           (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1647                               backends_.size(),
1648                           (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1649                               (kNumFailuresPerAddress * backends_.size()) *
1650                                   0.4)))))));
1651   // The LRS service got a single request, and sent a single response.
1652   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1653   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1654 }
1655 
1656 // Tests that we don't include stats for clusters that are not requested
1657 // by the LRS server.
TEST_P(ClientLoadReportingTest,HonorsClustersRequestedByLrsServer)1658 TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
1659   CreateAndStartBackends(1);
1660   balancer_->lrs_service()->set_cluster_names({"bogus"});
1661   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1662   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1663   // Wait until all backends are ready.
1664   WaitForAllBackends(DEBUG_LOCATION);
1665   // The load report received at the balancer should be correct.
1666   std::vector<ClientStats> load_report =
1667       balancer_->lrs_service()->WaitForLoadReport();
1668   ASSERT_EQ(load_report.size(), 0UL);
1669   // The LRS service got a single request, and sent a single response.
1670   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1671   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1672 }
1673 
1674 // Tests that if the balancer restarts, the client load report contains the
1675 // stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest,BalancerRestart)1676 TEST_P(ClientLoadReportingTest, BalancerRestart) {
1677   CreateAndStartBackends(4);
1678   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
1679   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1680   // Wait until all backends returned by the balancer are ready.
1681   size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1682   std::vector<ClientStats> load_report =
1683       balancer_->lrs_service()->WaitForLoadReport();
1684   ASSERT_EQ(load_report.size(), 1UL);
1685   ClientStats client_stats = std::move(load_report.front());
1686   EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1687   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1688   EXPECT_EQ(0U, client_stats.total_error_requests());
1689   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1690   EXPECT_THAT(client_stats.locality_stats(),
1691               ::testing::ElementsAre(::testing::Pair(
1692                   "locality0",
1693                   ::testing::Field(&ClientStats::LocalityStats::load_metrics,
1694                                    ::testing::IsEmpty()))));
1695   // Shut down the balancer.
1696   balancer_->Shutdown();
1697   // We should continue using the last EDS response we received from the
1698   // balancer before it was shut down.
1699   // Note: We need to use WaitForAllBackends() here instead of just
1700   // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
1701   // shuts down, the XdsClient will generate an error to the
1702   // ListenerWatcher, which will cause the xds resolver to send a
1703   // no-op update to the LB policy.  When this update gets down to the
1704   // round_robin child policy for the locality, it will generate a new
1705   // subchannel list, which resets the start index randomly.  So we need
1706   // to be a little more permissive here to avoid spurious failures.
1707   ResetBackendCounters();
1708   num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1709   // Now restart the balancer, this time pointing to the new backends.
1710   balancer_->Start();
1711   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
1712   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1713   // Wait for queries to start going to one of the new backends.
1714   // This tells us that we're now using the new serverlist.
1715   num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1716   // Send one RPC per backend.
1717   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1718   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1719   named_metrics["foo"] = 1.0;
1720   named_metrics["bar"] = 2.0;
1721   CheckRpcSendOk(DEBUG_LOCATION, 2,
1722                  RpcOptions().set_backend_metrics(backend_metrics));
1723   num_rpcs += 2;
1724   // Check client stats.
1725   load_report = balancer_->lrs_service()->WaitForLoadReport();
1726   ASSERT_EQ(load_report.size(), 1UL);
1727   client_stats = std::move(load_report.front());
1728   EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
1729   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1730   EXPECT_EQ(0U, client_stats.total_error_requests());
1731   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1732   EXPECT_THAT(client_stats.locality_stats(),
1733               ::testing::ElementsAre(::testing::Pair(
1734                   "locality0",
1735                   ::testing::Field(
1736                       &ClientStats::LocalityStats::load_metrics,
1737                       ::testing::UnorderedElementsAre(
1738                           ::testing::Pair("foo", LoadMetricEq(2, 2.0)),
1739                           ::testing::Pair("bar", LoadMetricEq(2, 4.0)))))));
1740 }
1741 
1742 // Tests load reporting when switching over from one cluster to another.
TEST_P(ClientLoadReportingTest,ChangeClusters)1743 TEST_P(ClientLoadReportingTest, ChangeClusters) {
1744   CreateAndStartBackends(4);
1745   const char* kNewClusterName = "new_cluster_name";
1746   const char* kNewEdsServiceName = "new_eds_service_name";
1747   balancer_->lrs_service()->set_cluster_names(
1748       {kDefaultClusterName, kNewClusterName});
1749   // cluster kDefaultClusterName -> locality0 -> backends 0 and 1
1750   EdsResourceArgs args({
1751       {"locality0", CreateEndpointsForBackends(0, 2)},
1752   });
1753   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1754   // cluster kNewClusterName -> locality1 -> backends 2 and 3
1755   EdsResourceArgs args2({
1756       {"locality1", CreateEndpointsForBackends(2, 4)},
1757   });
1758   balancer_->ads_service()->SetEdsResource(
1759       BuildEdsResource(args2, kNewEdsServiceName));
1760   // CDS resource for kNewClusterName.
1761   Cluster new_cluster = default_cluster_;
1762   new_cluster.set_name(kNewClusterName);
1763   new_cluster.mutable_eds_cluster_config()->set_service_name(
1764       kNewEdsServiceName);
1765   balancer_->ads_service()->SetCdsResource(new_cluster);
1766   // Wait for all backends to come online.
1767   size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
1768   // The load report received at the balancer should be correct.
1769   std::vector<ClientStats> load_report =
1770       balancer_->lrs_service()->WaitForLoadReport();
1771   EXPECT_THAT(
1772       load_report,
1773       ::testing::ElementsAre(::testing::AllOf(
1774           ::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
1775           ::testing::Property(&ClientStats::eds_service_name,
1776                               kDefaultEdsServiceName),
1777           ::testing::Property(
1778               &ClientStats::locality_stats,
1779               ::testing::ElementsAre(::testing::Pair(
1780                   "locality0",
1781                   ::testing::AllOf(
1782                       ::testing::Field(&ClientStats::LocalityStats::
1783                                            total_successful_requests,
1784                                        num_rpcs),
1785                       ::testing::Field(&ClientStats::LocalityStats::
1786                                            total_requests_in_progress,
1787                                        0UL),
1788                       ::testing::Field(
1789                           &ClientStats::LocalityStats::total_error_requests,
1790                           0UL),
1791                       ::testing::Field(
1792                           &ClientStats::LocalityStats::total_issued_requests,
1793                           num_rpcs),
1794                       ::testing::Field(
1795                           &ClientStats::LocalityStats::load_metrics,
1796                           ::testing::IsEmpty()))))),
1797           ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1798   // Change RDS resource to point to new cluster.
1799   RouteConfiguration new_route_config = default_route_config_;
1800   new_route_config.mutable_virtual_hosts(0)
1801       ->mutable_routes(0)
1802       ->mutable_route()
1803       ->set_cluster(kNewClusterName);
1804   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1805                                    new_route_config);
1806   // Wait for all new backends to be used.
1807   num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
1808   // The load report received at the balancer should be correct.
1809   load_report = balancer_->lrs_service()->WaitForLoadReport();
1810   EXPECT_THAT(
1811       load_report,
1812       ::testing::ElementsAre(
1813           ::testing::AllOf(
1814               ::testing::Property(&ClientStats::cluster_name,
1815                                   kDefaultClusterName),
1816               ::testing::Property(&ClientStats::eds_service_name,
1817                                   kDefaultEdsServiceName),
1818               ::testing::Property(
1819                   &ClientStats::locality_stats,
1820                   ::testing::ElementsAre(::testing::Pair(
1821                       "locality0",
1822                       ::testing::AllOf(
1823                           ::testing::Field(&ClientStats::LocalityStats::
1824                                                total_successful_requests,
1825                                            ::testing::Lt(num_rpcs)),
1826                           ::testing::Field(&ClientStats::LocalityStats::
1827                                                total_requests_in_progress,
1828                                            0UL),
1829                           ::testing::Field(
1830                               &ClientStats::LocalityStats::total_error_requests,
1831                               0UL),
1832                           ::testing::Field(&ClientStats::LocalityStats::
1833                                                total_issued_requests,
1834                                            ::testing::Le(num_rpcs)),
1835                           ::testing::Field(
1836                               &ClientStats::LocalityStats::load_metrics,
1837                               ::testing::IsEmpty()))))),
1838               ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
1839           ::testing::AllOf(
1840               ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
1841               ::testing::Property(&ClientStats::eds_service_name,
1842                                   kNewEdsServiceName),
1843               ::testing::Property(
1844                   &ClientStats::locality_stats,
1845                   ::testing::ElementsAre(::testing::Pair(
1846                       "locality1",
1847                       ::testing::AllOf(
1848                           ::testing::Field(&ClientStats::LocalityStats::
1849                                                total_successful_requests,
1850                                            ::testing::Le(num_rpcs)),
1851                           ::testing::Field(&ClientStats::LocalityStats::
1852                                                total_requests_in_progress,
1853                                            0UL),
1854                           ::testing::Field(
1855                               &ClientStats::LocalityStats::total_error_requests,
1856                               0UL),
1857                           ::testing::Field(&ClientStats::LocalityStats::
1858                                                total_issued_requests,
1859                                            ::testing::Le(num_rpcs)),
1860                           ::testing::Field(
1861                               &ClientStats::LocalityStats::load_metrics,
1862                               ::testing::IsEmpty()))))),
1863               ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
1864   size_t total_ok = 0;
1865   for (const ClientStats& client_stats : load_report) {
1866     total_ok += client_stats.total_successful_requests();
1867   }
1868   EXPECT_EQ(total_ok, num_rpcs);
1869   // The LRS service got a single request, and sent a single response.
1870   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1871   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1872 }
1873 
1874 // Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingTest,DropStats)1875 TEST_P(ClientLoadReportingTest, DropStats) {
1876   CreateAndStartBackends(1);
1877   const uint32_t kDropPerMillionForLb = 100000;
1878   const uint32_t kDropPerMillionForThrottle = 200000;
1879   const double kErrorTolerance = 0.05;
1880   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1881   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1882   const double kDropRateForLbAndThrottle =
1883       kDropRateForLb + (1 - kDropRateForLb) * kDropRateForThrottle;
1884   const size_t kNumRpcs =
1885       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1886   // The ADS response contains two drop categories.
1887   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1888   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1889                           {kThrottleDropType, kDropPerMillionForThrottle}};
1890   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1891   // Send kNumRpcs RPCs and count the drops.
1892   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1893       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1894       kStatusMessageDropPrefix);
1895   // The drop rate should be roughly equal to the expectation.
1896   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1897   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1898                                                     kErrorTolerance));
1899   // Check client stats.
1900   ClientStats client_stats;
1901   do {
1902     std::vector<ClientStats> load_reports =
1903         balancer_->lrs_service()->WaitForLoadReport();
1904     for (const auto& load_report : load_reports) {
1905       client_stats += load_report;
1906     }
1907   } while (client_stats.total_issued_requests() +
1908                client_stats.total_dropped_requests() <
1909            kNumRpcs);
1910   EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
1911   EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
1912                   kNumRpcs,
1913               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1914   EXPECT_THAT(
1915       static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
1916           (kNumRpcs * (1 - kDropRateForLb)),
1917       ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
1918 }
1919 
1920 }  // namespace
1921 }  // namespace testing
1922 }  // namespace grpc
1923 
main(int argc,char ** argv)1924 int main(int argc, char** argv) {
1925   grpc::testing::TestEnvironment env(&argc, argv);
1926   ::testing::InitGoogleTest(&argc, argv);
1927   // Make the backup poller poll very frequently in order to pick up
1928   // updates from all the subchannels's FDs.
1929   grpc_core::ConfigVars::Overrides overrides;
1930   overrides.client_channel_backup_poll_interval_ms = 1;
1931   grpc_core::ConfigVars::SetOverrides(overrides);
1932 #if TARGET_OS_IPHONE
1933   // Workaround Apple CFStream bug
1934   grpc_core::SetEnv("grpc_cfstream", "0");
1935 #endif
1936   grpc_core::RegisterFakeStatsPlugin();
1937   grpc_init();
1938   grpc::testing::ConnectionAttemptInjector::Init();
1939   const auto result = RUN_ALL_TESTS();
1940   grpc_shutdown();
1941   return result;
1942 }
1943