xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_core_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <algorithm>
17 #include <memory>
18 #include <string>
19 #include <type_traits>
20 #include <vector>
21 
22 #include <gmock/gmock.h>
23 #include <gtest/gtest.h>
24 
25 #include "absl/strings/str_cat.h"
26 
27 #include "src/core/client_channel/backup_poller.h"
28 #include "src/core/lib/config/config_vars.h"
29 #include "src/proto/grpc/testing/xds/v3/listener.pb.h"
30 #include "test/core/util/fake_stats_plugin.h"
31 #include "test/core/util/resolve_localhost_ip46.h"
32 #include "test/core/util/scoped_env_var.h"
33 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
34 #include "test/cpp/end2end/xds/xds_server.h"
35 
36 namespace grpc {
37 namespace testing {
38 namespace {
39 
40 using ClientStats = LrsServiceImpl::ClientStats;
41 
42 //
43 // XdsClientTest - basic tests of XdsClient functionality
44 //
45 
46 using XdsClientTest = XdsEnd2endTest;
47 
48 INSTANTIATE_TEST_SUITE_P(XdsTest, XdsClientTest,
49                          ::testing::Values(XdsTestType()), &XdsTestType::Name);
50 
51 // Tests that the client can handle resource wrapped in a Resource message.
TEST_P(XdsClientTest,ResourceWrappedInResourceMessage)52 TEST_P(XdsClientTest, ResourceWrappedInResourceMessage) {
53   CreateAndStartBackends(1);
54   balancer_->ads_service()->set_wrap_resources(true);
55   const size_t kNumRpcsPerAddress = 100;
56   EdsResourceArgs args({
57       {"locality0", CreateEndpointsForBackends()},
58   });
59   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
60   // Make sure that trying to connect works without a call.
61   channel_->GetState(true /* try_to_connect */);
62   // We need to wait for all backends to come online.
63   WaitForAllBackends(DEBUG_LOCATION);
64   // Send kNumRpcsPerAddress RPCs per server.
65   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
66   // Each backend should have gotten 100 requests.
67   for (size_t i = 0; i < backends_.size(); ++i) {
68     EXPECT_EQ(kNumRpcsPerAddress,
69               backends_[i]->backend_service()->request_count());
70   }
71   // Check LB policy name for the channel.
72   EXPECT_EQ("xds_cluster_manager_experimental",
73             channel_->GetLoadBalancingPolicyName());
74 }
75 
// Restarts the balancer and uses the ADS service's version-check callback
// to assert that every resource type is reported with a version of at
// least 1 once the client reconnects.
TEST_P(XdsClientTest, ResourceTypeVersionPersistsAcrossStreamRestarts) {
  CreateAndStartBackends(2);
  const EdsResourceArgs first_backend_args(
      {{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(first_backend_args));
  // Block until backend 0 is serving.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1);
  // Take the balancer down.
  balancer_->Shutdown();
  // From now on, any version passed to the callback must be >= 1,
  // whatever the resource type.
  balancer_->ads_service()->SetCheckVersionCallback(
      [](absl::string_view resource_type, int version) {
        EXPECT_GE(version, 1) << "resource_type: " << resource_type;
      });
  // Switch EDS to backend 1; seeing traffic there later proves that the
  // client has reconnected to the balancer.
  const EdsResourceArgs second_backend_args(
      {{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(second_backend_args));
  // Bring the balancer back up.
  balancer_->Start();
  // Backend 1 receiving traffic confirms the reconnection.
  WaitForAllBackends(DEBUG_LOCATION, 1, 2);
}
98 
99 // Tests that we restart all xDS requests when we reestablish the ADS call.
TEST_P(XdsClientTest,RestartsRequestsUponReconnection)100 TEST_P(XdsClientTest, RestartsRequestsUponReconnection) {
101   CreateAndStartBackends(2);
102   // Manually configure use of RDS.
103   auto listener = default_listener_;
104   HttpConnectionManager http_connection_manager;
105   listener.mutable_api_listener()->mutable_api_listener()->UnpackTo(
106       &http_connection_manager);
107   auto* rds = http_connection_manager.mutable_rds();
108   rds->set_route_config_name(kDefaultRouteConfigurationName);
109   rds->mutable_config_source()->mutable_self();
110   listener.mutable_api_listener()->mutable_api_listener()->PackFrom(
111       http_connection_manager);
112   balancer_->ads_service()->SetLdsResource(listener);
113   balancer_->ads_service()->SetRdsResource(default_route_config_);
114   const char* kNewClusterName = "new_cluster_name";
115   const char* kNewEdsServiceName = "new_eds_service_name";
116   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
117   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
118   // We need to wait for all backends to come online.
119   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
120   // Now shut down and restart the balancer.  When the client
121   // reconnects, it should automatically restart the requests for all
122   // resource types.
123   balancer_->Shutdown();
124   balancer_->Start();
125   // Make sure things are still working.
126   CheckRpcSendOk(DEBUG_LOCATION, 100);
127   // Populate new EDS resource.
128   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
129   balancer_->ads_service()->SetEdsResource(
130       BuildEdsResource(args, kNewEdsServiceName));
131   // Populate new CDS resource.
132   Cluster new_cluster = default_cluster_;
133   new_cluster.set_name(kNewClusterName);
134   new_cluster.mutable_eds_cluster_config()->set_service_name(
135       kNewEdsServiceName);
136   balancer_->ads_service()->SetCdsResource(new_cluster);
137   // Change RDS resource to point to new cluster.
138   RouteConfiguration new_route_config = default_route_config_;
139   new_route_config.mutable_virtual_hosts(0)
140       ->mutable_routes(0)
141       ->mutable_route()
142       ->set_cluster(kNewClusterName);
143   balancer_->ads_service()->SetRdsResource(new_route_config);
144   // Wait for all new backends to be used.
145   WaitForAllBackends(DEBUG_LOCATION, 1, 2);
146 }
147 
// Forces the ADS stream to fail with RESOURCE_EXHAUSTED and checks that the
// RPC fails with UNAVAILABLE, carrying both the forced error text and the
// node ID in its message.
TEST_P(XdsClientTest, XdsStreamErrorPropagation) {
  const std::string kErrorMessage = "test forced ADS stream failure";
  const Status forced_failure(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage);
  balancer_->ads_service()->ForceADSFailure(forced_failure);
  const auto rpc_status = SendRpc();
  gpr_log(GPR_INFO,
          "XdsStreamErrorPropagation test: RPC got error: code=%d message=%s",
          rpc_status.error_code(), rpc_status.error_message().c_str());
  EXPECT_THAT(rpc_status.error_code(),
              ::testing::Eq(StatusCode::UNAVAILABLE));
  // The original stream error text must survive into the RPC status ...
  EXPECT_THAT(rpc_status.error_message(),
              ::testing::HasSubstr(kErrorMessage));
  // ... and so must the node ID.
  EXPECT_THAT(rpc_status.error_message(),
              ::testing::HasSubstr("(node ID:xds_end2end_test)"));
}
161 
162 //
163 // GlobalXdsClientTest - tests that need to run with a global XdsClient
164 // (this is the default in production)
165 //
166 
167 using GlobalXdsClientTest = XdsEnd2endTest;
168 
169 // Get bootstrap from env var, so that there's a global XdsClient.
170 INSTANTIATE_TEST_SUITE_P(XdsTest, GlobalXdsClientTest,
171                          ::testing::Values(XdsTestType().set_bootstrap_source(
172                              XdsTestType::kBootstrapFromEnvVar)),
173                          &XdsTestType::Name);
174 
// Opens two channels to the same target and checks that the ADS service
// sees only one connected client.
TEST_P(GlobalXdsClientTest, MultipleChannelsSameTargetShareXdsClient) {
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  WaitForAllBackends(DEBUG_LOCATION);
  // Open a second channel to the same server name and have it connect.
  auto second_channel = CreateChannel(/*failover_timeout_ms=*/0, kServerName);
  second_channel->GetState(/*try_to_connect=*/true);
  ASSERT_TRUE(
      second_channel->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
  // Only a single client may be connected to the ADS service.
  EXPECT_EQ(1UL, balancer_->ads_service()->clients().size());
}
187 
// Opens channels to two different targets and checks that the ADS service
// sees two connected clients.
TEST_P(GlobalXdsClientTest,
       MultipleChannelsDifferentTargetDoNotShareXdsClient) {
  CreateAndStartBackends(1);
  // Publish a copy of the default listener under a second server name.
  const char* kNewServerName = "new-server.example.com";
  Listener second_listener = default_listener_;
  second_listener.set_name(kNewServerName);
  SetListenerAndRouteConfiguration(balancer_.get(), second_listener,
                                   default_route_config_);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  WaitForAllBackends(DEBUG_LOCATION);
  // Open a second channel, this time targeting kNewServerName.
  auto second_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName);
  second_channel->GetState(/*try_to_connect=*/true);
  ASSERT_TRUE(
      second_channel->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
  // Two clients must now be connected to the ADS service.
  EXPECT_EQ(2UL, balancer_->ads_service()->clients().size());
}
206 
// Regression test for https://github.com/grpc/grpc/issues/28468: the
// XdsClient must keep delivering updates to the remaining watcher when one
// of several watchers on the same resource unsubscribes.
TEST_P(
    GlobalXdsClientTest,
    MultipleChannelsShareXdsClientWithResourceUpdateAfterOneChannelGoesAway) {
  CreateAndStartBackends(2);
  // Publish a copy of the default listener under a second server name.
  const char* kNewServerName = "new-server.example.com";
  Listener second_listener = default_listener_;
  second_listener.set_name(kNewServerName);
  SetListenerAndRouteConfiguration(balancer_.get(), second_listener,
                                   default_route_config_);
  const EdsResourceArgs first_backend_args(
      {{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(first_backend_args));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Open a second channel that targets kNewServerName and connect it.
  auto second_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName);
  second_channel->GetState(/*try_to_connect=*/true);
  ASSERT_TRUE(
      second_channel->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
  // Drop the second channel.  The EDS update sent afterwards must still
  // move the first channel over to the other backend.
  second_channel.reset();
  // The pause gives the buggy unsubscription time to happen; without it
  // the bug does not reproduce.
  // TODO(yashykt): Figure out a way to do this without the sleep.
  gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(10));
  const EdsResourceArgs second_backend_args(
      {{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(second_backend_args));
  WaitForBackend(DEBUG_LOCATION, 1);
}
239 
240 // Tests that the NACK for multiple bad LDS resources includes both errors.
241 // This needs to use xDS server as this is the only scenario when XdsClient
242 // is shared.
TEST_P(GlobalXdsClientTest,MultipleBadLdsResources)243 TEST_P(GlobalXdsClientTest, MultipleBadLdsResources) {
244   CreateBackends(2, true);
245   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
246   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
247   auto listener = default_server_listener_;
248   listener.clear_address();
249   listener.set_name(GetServerListenerName(backends_[0]->port()));
250   balancer_->ads_service()->SetLdsResource(listener);
251   backends_[0]->Start();
252   auto response_state = WaitForLdsNack(DEBUG_LOCATION);
253   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
254   EXPECT_EQ(
255       response_state->error_message,
256       absl::StrFormat(
257           "xDS response validation errors: ["
258           "resource index 0: "
259           "grpc/server?xds.resource.listening_address=127.0.0.1:%lu: "
260           "INVALID_ARGUMENT: Listener has neither address nor ApiListener]",
261           backends_[0]->port()));
262   listener = default_server_listener_;
263   listener.clear_address();
264   listener.set_name(GetServerListenerName(backends_[1]->port()));
265   balancer_->ads_service()->SetLdsResource(listener);
266   backends_[1]->Start();
267   constexpr absl::string_view kMessageFormat =
268       "xDS response validation errors: ["
269       "resource index 0: "
270       "grpc/server?xds.resource.listening_address=127.0.0.1:%d: "
271       "INVALID_ARGUMENT: Listener has neither address nor "
272       "ApiListener; "
273       "resource index 1: "
274       "grpc/server?xds.resource.listening_address=127.0.0.1:%d: "
275       "INVALID_ARGUMENT: Listener has neither address nor "
276       "ApiListener"
277       "]";
278   const std::string expected_message1 = absl::StrFormat(
279       kMessageFormat, backends_[0]->port(), backends_[1]->port());
280   const std::string expected_message2 = absl::StrFormat(
281       kMessageFormat, backends_[1]->port(), backends_[0]->port());
282   response_state = WaitForNack(
283       DEBUG_LOCATION, [&]() -> absl::optional<AdsServiceImpl::ResponseState> {
284         auto response = balancer_->ads_service()->lds_response_state();
285         if (response.has_value() &&
286             response->state == AdsServiceImpl::ResponseState::NACKED) {
287           if (response->error_message == expected_message1 ||
288               response->error_message == expected_message2) {
289             return response;
290           }
291           gpr_log(GPR_INFO, "non-matching NACK message: %s",
292                   response->error_message.c_str());
293         }
294         return absl::nullopt;
295       });
296   EXPECT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
297 }
298 
299 // Tests that we don't trigger does-not-exist callbacks for a resource
300 // that was previously valid but is updated to be invalid.
TEST_P(GlobalXdsClientTest,InvalidListenerStillExistsIfPreviouslyCached)301 TEST_P(GlobalXdsClientTest, InvalidListenerStillExistsIfPreviouslyCached) {
302   CreateAndStartBackends(1);
303   // Set up valid resources and check that the channel works.
304   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
305   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
306   CheckRpcSendOk(DEBUG_LOCATION);
307   // Now send an update changing the Listener to be invalid.
308   auto listener = default_listener_;
309   listener.clear_api_listener();
310   balancer_->ads_service()->SetLdsResource(listener);
311   const auto response_state =
312       WaitForLdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
313   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
314   EXPECT_EQ(response_state->error_message,
315             "xDS response validation errors: ["
316             "resource index 0: server.example.com: "
317             "INVALID_ARGUMENT: Listener has neither address nor ApiListener]");
318   CheckRpcSendOk(DEBUG_LOCATION);
319 }
320 
321 //
322 // TimeoutTest - tests xDS initial timeout handling
323 //
324 
325 class TimeoutTest : public XdsEnd2endTest {
326  protected:
SetUp()327   void SetUp() override {
328     InitClient(MakeBootstrapBuilder(), /*lb_expected_authority=*/"",
329                /*xds_resource_does_not_exist_timeout_ms=*/2000);
330   }
331 };
332 
333 // Enable RDS, so that we can test all resource types.
334 // Run with bootstrap from env var so that multiple channels share the same
335 // XdsClient (needed for testing the timeout for the 2nd LDS and RDS resource).
336 INSTANTIATE_TEST_SUITE_P(
337     XdsTest, TimeoutTest,
338     ::testing::Values(
339         XdsTestType().set_enable_rds_testing().set_bootstrap_source(
340             XdsTestType::kBootstrapFromEnvVar)),
341     &XdsTestType::Name);
342 
// With the ADS service ignoring LDS requests, the RPC must fail with
// UNAVAILABLE and report that the listener resource does not exist.
TEST_P(TimeoutTest, LdsServerIgnoresRequest) {
  balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl);
  const std::string expected_error =
      absl::StrCat("empty address list: ", kServerName,
                   ": xDS listener resource does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
350 
// With the LDS resource for kServerName unset on the ADS service, the RPC
// must fail with UNAVAILABLE and report that the listener resource does
// not exist.
TEST_P(TimeoutTest, LdsResourceNotPresentInRequest) {
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  const std::string expected_error =
      absl::StrCat("empty address list: ", kServerName,
                   ": xDS listener resource does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
358 
// After a first channel is working, a second channel for a server name
// that has no LDS resource must fail its RPC with UNAVAILABLE.
TEST_P(TimeoutTest, LdsSecondResourceNotPresentInRequest) {
  ASSERT_NE(GetParam().bootstrap_source(),
            XdsTestType::kBootstrapFromChannelArg)
      << "This test cannot use bootstrap from channel args, because it "
         "needs two channels to use the same XdsClient instance.";
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
  // Open a second channel for a server name that the ADS service has no
  // LDS resource for; RPCs on it are expected to fail.
  const char* kNewServerName = "new-server.example.com";
  auto second_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName);
  auto second_stub = grpc::testing::EchoTestService::NewStub(second_channel);
  ClientContext context;
  EchoRequest request;
  EchoResponse response;
  RpcOptions rpc_options;
  rpc_options.set_timeout_ms(4000).SetupRpc(&context, &request);
  const auto rpc_status = SendRpcMethod(second_stub.get(), rpc_options,
                                        &context, request, &response);
  EXPECT_EQ(StatusCode::UNAVAILABLE, rpc_status.error_code());
  EXPECT_THAT(
      rpc_status.error_message(),
      ::testing::Eq(absl::StrCat("empty address list: ", kNewServerName,
                                 ": xDS listener resource does not exist")));
}
385 
// With the ADS service ignoring RDS requests, the RPC must fail with
// UNAVAILABLE and report that the route configuration does not exist.
TEST_P(TimeoutTest, RdsServerIgnoresRequest) {
  balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl);
  const std::string expected_error =
      absl::StrCat("empty address list: ", kDefaultRouteConfigurationName,
                   ": xDS route configuration resource does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
394 
// With the default RDS resource unset on the ADS service, the RPC must
// fail with UNAVAILABLE and report that the route configuration does not
// exist.
TEST_P(TimeoutTest, RdsResourceNotPresentInRequest) {
  balancer_->ads_service()->UnsetResource(kRdsTypeUrl,
                                          kDefaultRouteConfigurationName);
  const std::string expected_error =
      absl::StrCat("empty address list: ", kDefaultRouteConfigurationName,
                   ": xDS route configuration resource does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
404 
// After a first channel is working, a second channel whose listener names
// a route configuration that is never published must fail its RPC with
// UNAVAILABLE.
TEST_P(TimeoutTest, RdsSecondResourceNotPresentInRequest) {
  ASSERT_NE(GetParam().bootstrap_source(),
            XdsTestType::kBootstrapFromChannelArg)
      << "This test cannot use bootstrap from channel args, because it "
         "needs two channels to use the same XdsClient instance.";
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
  // Publish a listener for the second channel whose RDS config names a
  // route configuration that is not published here.
  const char* kNewServerName = "new-server.example.com";
  const char* kNewRouteConfigName = "rds_resource_does_not_exist";
  Listener second_listener = default_listener_;
  second_listener.set_name(kNewServerName);
  HttpConnectionManager hcm = ClientHcmAccessor().Unpack(second_listener);
  auto* rds_config = hcm.mutable_rds();
  rds_config->set_route_config_name(kNewRouteConfigName);
  rds_config->mutable_config_source()->mutable_self();
  ClientHcmAccessor().Pack(hcm, &second_listener);
  balancer_->ads_service()->SetLdsResource(second_listener);
  // Open the second channel.  Its RPC is expected to fail, since the LDS
  // resource refers to a non-existent RDS resource.
  auto second_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName);
  auto second_stub = grpc::testing::EchoTestService::NewStub(second_channel);
  ClientContext context;
  EchoRequest request;
  EchoResponse response;
  RpcOptions rpc_options;
  rpc_options.set_timeout_ms(4000).SetupRpc(&context, &request);
  const auto rpc_status = SendRpcMethod(second_stub.get(), rpc_options,
                                        &context, request, &response);
  EXPECT_EQ(StatusCode::UNAVAILABLE, rpc_status.error_code());
  EXPECT_THAT(rpc_status.error_message(),
              ::testing::Eq(absl::StrCat(
                  "empty address list: ", kNewRouteConfigName,
                  ": xDS route configuration resource does not exist")));
}
444 
// With the ADS service ignoring CDS requests, the RPC must fail with
// UNAVAILABLE and report that the default cluster does not exist.
TEST_P(TimeoutTest, CdsServerIgnoresRequest) {
  balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl);
  const std::string expected_error =
      absl::StrCat("CDS resource ", kDefaultClusterName, " does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
452 
// With the default CDS resource unset on the ADS service, the RPC must
// fail with UNAVAILABLE and report that the cluster does not exist.
TEST_P(TimeoutTest, CdsResourceNotPresentInRequest) {
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  const std::string expected_error =
      absl::StrCat("CDS resource ", kDefaultClusterName, " does not exist");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, expected_error,
                      RpcOptions().set_timeout_ms(4000));
}
460 
// After a working configuration is in place, re-pointing the route at a
// cluster that is never published must eventually make RPCs fail with
// UNAVAILABLE and a CDS does-not-exist message.
TEST_P(TimeoutTest, CdsSecondResourceNotPresentInRequest) {
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
  // Re-point the first route at a cluster that does not exist.
  const char* kNewClusterName = "new_cluster_name";
  RouteConfiguration updated_route_config = default_route_config_;
  auto* first_route =
      updated_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  first_route->mutable_route()->set_cluster(kNewClusterName);
  balancer_->ads_service()->SetRdsResource(updated_route_config);
  // The request for the new cluster times out.  RPCs may keep succeeding
  // for a while, until the route change has reached the client.
  const std::string expected_error =
      absl::StrCat("CDS resource ", kNewClusterName, " does not exist");
  SendRpcsUntil(
      DEBUG_LOCATION,
      [&expected_error](const RpcResult& result) {
        if (result.status.ok()) return true;  // Still succeeding; retry.
        EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
        EXPECT_EQ(expected_error, result.status.error_message());
        return false;
      },
      /*timeout_ms=*/30000, RpcOptions().set_timeout_ms(4000));
}
488 
// With the ADS service ignoring EDS requests, the RPC must fail with
// UNAVAILABLE and report that the EDS resource does not exist.
TEST_P(TimeoutTest, EdsServerIgnoresRequest) {
  balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "no children in weighted_target policy: "
                      "EDS resource eds_service_name does not exist",
                      RpcOptions().set_timeout_ms(4000));
}
497 
// With no EDS resource published, the RPC must fail with UNAVAILABLE and
// report that the EDS resource does not exist.
TEST_P(TimeoutTest, EdsResourceNotPresentInRequest) {
  // Nothing to remove here: the test suite does not add an EDS resource
  // by default.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "no children in weighted_target policy: "
                      "EDS resource eds_service_name does not exist",
                      RpcOptions().set_timeout_ms(4000));
}
507 
// After a working configuration is in place, routing Echo1 to a new cluster
// whose EDS resource is never published must eventually make Echo1 RPCs
// fail with UNAVAILABLE and an EDS does-not-exist message.
TEST_P(TimeoutTest, EdsSecondResourceNotPresentInRequest) {
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
  // Publish a new cluster that refers to a non-existent EDS resource.
  const char* kNewClusterName = "new_cluster_name";
  Cluster new_cluster = default_cluster_;
  new_cluster.set_name(kNewClusterName);
  new_cluster.mutable_eds_cluster_config()->set_service_name(
      "eds_service_name_does_not_exist");
  balancer_->ads_service()->SetCdsResource(new_cluster);
  // Copy the existing first route to the end of the route list, then turn
  // the first route into an Echo1-only route to the new cluster.
  RouteConfiguration updated_route_config = default_route_config_;
  auto* virtual_host = updated_route_config.mutable_virtual_hosts(0);
  auto* echo1_route = virtual_host->mutable_routes(0);
  *virtual_host->add_routes() = *echo1_route;
  echo1_route->mutable_match()->set_path(
      "/grpc.testing.EchoTestService/Echo1");
  echo1_route->mutable_route()->set_cluster(kNewClusterName);
  balancer_->ads_service()->SetRdsResource(updated_route_config);
  // The request for the new EDS resource times out.  RPCs may keep
  // succeeding for a while, until the RDS change has reached the client.
  const RpcOptions echo1_options =
      RpcOptions().set_rpc_method(METHOD_ECHO1).set_timeout_ms(4000);
  SendRpcsUntil(
      DEBUG_LOCATION,
      [](const RpcResult& result) {
        if (result.status.ok()) return true;  // Still succeeding; retry.
        EXPECT_EQ(StatusCode::UNAVAILABLE, result.status.error_code());
        EXPECT_EQ(result.status.error_message(),
                  "no children in weighted_target policy: EDS resource "
                  "eds_service_name_does_not_exist does not exist");
        return false;
      },
      /*timeout_ms=*/30000, echo1_options);
}
542 
// Restarts the balancer with all resource types ignored and checks that
// RPCs keep succeeding for the whole observation window, i.e. the channel
// keeps using the resources it had before the restart.
TEST_P(TimeoutTest, ServerDoesNotResendAfterAdsStreamRestart) {
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 1)}})));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(4000));
  // Take the balancer down.
  balancer_->Shutdown();
  // Any version passed to the callback must be >= 1, whatever the
  // resource type.
  balancer_->ads_service()->SetCheckVersionCallback(
      [](absl::string_view resource_type, int version) {
        EXPECT_GE(version, 1) << "resource_type: " << resource_type;
      });
  // Make the balancer stay silent for every resource type.
  balancer_->ads_service()->IgnoreResourceType(kLdsTypeUrl);
  balancer_->ads_service()->IgnoreResourceType(kRdsTypeUrl);
  balancer_->ads_service()->IgnoreResourceType(kCdsTypeUrl);
  balancer_->ads_service()->IgnoreResourceType(kEdsTypeUrl);
  // Bring the balancer back up.
  balancer_->Start();
  // Keep sending RPCs long enough to span the ADS stream restart delay,
  // the restart itself, and the timeout period that follows; each one
  // must succeed using the resources from before the restart.
  const absl::Time stop_time =
      absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor());
  do {
    CheckRpcSendOk(DEBUG_LOCATION);
  } while (absl::Now() < stop_time);
}
572 
573 //
574 // BootstrapSourceTest - tests different bootstrap sources
575 //
576 
577 using BootstrapSourceTest = XdsEnd2endTest;
578 
579 INSTANTIATE_TEST_SUITE_P(
580     XdsTest, BootstrapSourceTest,
581     ::testing::Values(
582         XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
583         XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromFile)),
584     &XdsTestType::Name);
585 
// Smoke test: with the parameterized bootstrap source, one RPC succeeds.
TEST_P(BootstrapSourceTest, Vanilla) {
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  // The timeout is raised because kBootstrapFromFile is slower on busy
  // test machines; at least one run has been seen taking over 10 seconds.
  const RpcOptions generous_timeout = RpcOptions().set_timeout_ms(15000);
  CheckRpcSendOk(DEBUG_LOCATION, 1, generous_timeout);
}
595 
596 //
597 // XdsFederationTest - tests xDS federation
598 //
599 
600 class XdsFederationTest : public XdsEnd2endTest {
601  protected:
XdsFederationTest()602   XdsFederationTest() : authority_balancer_(CreateAndStartBalancer()) {}
603 
SetUp()604   void SetUp() override {
605     // Each test will use a slightly different bootstrap config,
606     // so SetUp() is intentionally empty here, and the real
607     // setup (calling of InitClient()) is moved into each test.
608   }
609 
TearDown()610   void TearDown() override {
611     authority_balancer_->Shutdown();
612     XdsEnd2endTest::TearDown();
613   }
614 
615   std::unique_ptr<BalancerServerThread> authority_balancer_;
616 };
617 
618 // Get bootstrap from env var, so that there's a global XdsClient.
619 // Runs with RDS so that we know all resource types work properly.
620 INSTANTIATE_TEST_SUITE_P(
621     XdsTest, XdsFederationTest,
622     ::testing::Values(
623         XdsTestType()
624             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
625             .set_enable_rds_testing()),
626     &XdsTestType::Name);
627 
628 // Channel is created with URI "xds:server.example.com".
629 // Bootstrap config default client listener template uses new-style name with
630 // authority "xds.example.com".
TEST_P(XdsFederationTest,FederationTargetNoAuthorityWithResourceTemplate)631 TEST_P(XdsFederationTest, FederationTargetNoAuthorityWithResourceTemplate) {
632   const char* kAuthority = "xds.example.com";
633   const char* kNewListenerTemplate =
634       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
635       "client/%s?psm_project_id=1234";
636   const char* kNewListenerName =
637       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
638       "client/server.example.com?psm_project_id=1234";
639   const char* kNewRouteConfigName =
640       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
641       "new_route_config_name";
642   const char* kNewEdsServiceName =
643       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
644       "new_edsservice_name";
645   const char* kNewClusterName =
646       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
647       "new_cluster_name";
648   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
649   builder.SetClientDefaultListenerResourceNameTemplate(kNewListenerTemplate);
650   builder.AddAuthority(
651       kAuthority, absl::StrCat("localhost:", authority_balancer_->port()),
652       // Note we will not use the client_listener_resource_name_template field
653       // in the authority.
654       "xdstp://xds.example.com/envoy.config.listener.v3.Listener"
655       "client/%s?client_listener_resource_name_template_not_in_use");
656   InitClient(builder);
657   CreateAndStartBackends(2);
658   // Eds for the new authority balancer.
659   EdsResourceArgs args =
660       EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
661   authority_balancer_->ads_service()->SetEdsResource(
662       BuildEdsResource(args, kNewEdsServiceName));
663   // New cluster
664   Cluster new_cluster = default_cluster_;
665   new_cluster.set_name(kNewClusterName);
666   new_cluster.mutable_eds_cluster_config()->set_service_name(
667       kNewEdsServiceName);
668   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
669   // New Route
670   RouteConfiguration new_route_config = default_route_config_;
671   new_route_config.set_name(kNewRouteConfigName);
672   new_route_config.mutable_virtual_hosts(0)
673       ->mutable_routes(0)
674       ->mutable_route()
675       ->set_cluster(kNewClusterName);
676   // New Listener
677   Listener listener = default_listener_;
678   listener.set_name(kNewListenerName);
679   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
680                                    new_route_config);
681   WaitForAllBackends(DEBUG_LOCATION);
682 }
683 
684 // Channel is created with URI "xds://xds.example.com/server.example.com".
685 // In bootstrap config, authority has no client listener template, so we use the
686 // default.
TEST_P(XdsFederationTest,FederationTargetAuthorityDefaultResourceTemplate)687 TEST_P(XdsFederationTest, FederationTargetAuthorityDefaultResourceTemplate) {
688   const char* kAuthority = "xds.example.com";
689   const char* kNewServerName = "whee%/server.example.com";
690   const char* kNewListenerName =
691       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
692       "whee%25/server.example.com";
693   const char* kNewRouteConfigName =
694       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
695       "new_route_config_name";
696   const char* kNewEdsServiceName =
697       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
698       "edsservice_name";
699   const char* kNewClusterName =
700       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
701       "cluster_name";
702   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
703   builder.AddAuthority(kAuthority,
704                        absl::StrCat("localhost:", authority_balancer_->port()));
705   InitClient(builder);
706   CreateAndStartBackends(2);
707   // Eds for 2 balancers to ensure RPCs sent using current stub go to backend 0
708   // and RPCs sent using the new stub go to backend 1.
709   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
710   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
711   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
712   authority_balancer_->ads_service()->SetEdsResource(
713       BuildEdsResource(args, kNewEdsServiceName));
714   // New cluster
715   Cluster new_cluster = default_cluster_;
716   new_cluster.set_name(kNewClusterName);
717   new_cluster.mutable_eds_cluster_config()->set_service_name(
718       kNewEdsServiceName);
719   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
720   // New Route
721   RouteConfiguration new_route_config = default_route_config_;
722   new_route_config.set_name(kNewRouteConfigName);
723   new_route_config.mutable_virtual_hosts(0)
724       ->mutable_routes(0)
725       ->mutable_route()
726       ->set_cluster(kNewClusterName);
727   // New Listener
728   Listener listener = default_listener_;
729   listener.set_name(kNewListenerName);
730   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
731                                    new_route_config);
732   // Ensure update has reached and send 10 RPCs to the current stub.
733   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
734   // Create second channel to new target uri and send 1 RPC.
735   auto channel2 =
736       CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
737   auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
738   ClientContext context;
739   EchoRequest request;
740   RpcOptions().SetupRpc(&context, &request);
741   EchoResponse response;
742   grpc::Status status = stub2->Echo(&context, request, &response);
743   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
744                            << " message=" << status.error_message();
745   // We should be reaching backend 1, not 0, as balanced by the authority xds
746   // server.
747   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
748   EXPECT_EQ(1U, backends_[1]->backend_service()->request_count());
749 }
750 
751 // Channel is created with URI "xds://xds.example.com/server.example.com".
752 // Bootstrap entry for that authority specifies a client listener name template.
TEST_P(XdsFederationTest,FederationTargetAuthorityWithResourceTemplate)753 TEST_P(XdsFederationTest, FederationTargetAuthorityWithResourceTemplate) {
754   const char* kAuthority = "xds.example.com";
755   const char* kNewServerName = "whee%/server.example.com";
756   const char* kNewListenerTemplate =
757       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
758       "client/%s?psm_project_id=1234";
759   const char* kNewListenerName =
760       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
761       "client/whee%25/server.example.com?psm_project_id=1234";
762   const char* kNewRouteConfigName =
763       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
764       "new_route_config_name";
765   const char* kNewEdsServiceName =
766       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
767       "edsservice_name";
768   const char* kNewClusterName =
769       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
770       "cluster_name";
771   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
772   builder.AddAuthority(kAuthority,
773                        absl::StrCat("localhost:", authority_balancer_->port()),
774                        kNewListenerTemplate);
775   InitClient(builder);
776   CreateAndStartBackends(2);
777   // Eds for 2 balancers to ensure RPCs sent using current stub go to backend 0
778   // and RPCs sent using the new stub go to backend 1.
779   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
780   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
781   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
782   authority_balancer_->ads_service()->SetEdsResource(
783       BuildEdsResource(args, kNewEdsServiceName));
784   // New cluster
785   Cluster new_cluster = default_cluster_;
786   new_cluster.set_name(kNewClusterName);
787   new_cluster.mutable_eds_cluster_config()->set_service_name(
788       kNewEdsServiceName);
789   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
790   // New Route
791   RouteConfiguration new_route_config = default_route_config_;
792   new_route_config.set_name(kNewRouteConfigName);
793   new_route_config.mutable_virtual_hosts(0)
794       ->mutable_routes(0)
795       ->mutable_route()
796       ->set_cluster(kNewClusterName);
797   // New Listener
798   Listener listener = default_listener_;
799   listener.set_name(kNewListenerName);
800   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
801                                    new_route_config);
802   // Ensure update has reached and send 10 RPCs to the current stub.
803   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
804   // Create second channel to new target uri and send 1 RPC.
805   auto channel2 =
806       CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
807   auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
808   ClientContext context;
809   EchoRequest request;
810   RpcOptions().SetupRpc(&context, &request);
811   EchoResponse response;
812   grpc::Status status = stub2->Echo(&context, request, &response);
813   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
814                            << " message=" << status.error_message();
815   // We should be reaching backend 1, not 0, as balanced by the authority xds
816   // server.
817   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
818   EXPECT_EQ(1U, backends_[1]->backend_service()->request_count());
819 }
820 
// The channel's target URI names an authority ("xds.unknown.com") that has
// no entry in the bootstrap config.  RPCs on that channel are expected to
// fail with UNAVAILABLE and the channel to be in TRANSIENT_FAILURE.
TEST_P(XdsFederationTest, TargetUriAuthorityUnknown) {
  const char* kAuthority = "xds.example.com";
  const char* kNewServerName = "whee%/server.example.com";
  const char* kNewListenerTemplate =
      "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
      "client/%s?psm_project_id=1234";
  // Only "xds.example.com" is registered as an authority in the bootstrap.
  XdsBootstrapBuilder bootstrap = MakeBootstrapBuilder();
  bootstrap.AddAuthority(
      kAuthority, absl::StrCat("localhost:", grpc_pick_unused_port_or_die()),
      kNewListenerTemplate);
  InitClient(bootstrap);
  // Channel whose target uses the unregistered authority.
  auto unknown_channel = CreateChannel(
      /*failover_timeout_ms=*/0, kNewServerName, "xds.unknown.com");
  auto unknown_stub = grpc::testing::EchoTestService::NewStub(unknown_channel);
  ClientContext context;
  EchoRequest request;
  EchoResponse response;
  RpcOptions().SetupRpc(&context, &request);
  grpc::Status status = unknown_stub->Echo(&context, request, &response);
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(status.error_message(),
            "Invalid target URI -- authority not found for xds.unknown.com");
  ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, unknown_channel->GetState(false));
}
845 
// The listener lives under a known authority, but the route configuration
// it is paired with is named under "xds.unknown.com", which is absent from
// the bootstrap config.  RPCs are expected to fail with UNAVAILABLE and an
// error message naming the offending RouteConfiguration resource.
TEST_P(XdsFederationTest, RdsResourceNameAuthorityUnknown) {
  const char* kAuthority = "xds.example.com";
  const char* kNewServerName = "whee%/server.example.com";
  const char* kNewListenerTemplate =
      "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
      "client/%s?psm_project_id=1234";
  const char* kNewListenerName =
      "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
      "client/whee%25/server.example.com?psm_project_id=1234";
  const char* kNewRouteConfigName =
      "xdstp://xds.unknown.com/envoy.config.route.v3.RouteConfiguration/"
      "new_route_config_name";
  XdsBootstrapBuilder bootstrap = MakeBootstrapBuilder();
  bootstrap.AddAuthority(
      kAuthority, absl::StrCat("localhost:", authority_balancer_->port()),
      kNewListenerTemplate);
  InitClient(bootstrap);
  // Route config whose name uses the unknown authority.
  RouteConfiguration unknown_route_config = default_route_config_;
  unknown_route_config.set_name(kNewRouteConfigName);
  // Listener under the known authority.
  Listener authority_listener = default_listener_;
  authority_listener.set_name(kNewListenerName);
  SetListenerAndRouteConfiguration(authority_balancer_.get(),
                                   authority_listener, unknown_route_config);
  // The channel is expected to end up in TRANSIENT_FAILURE.
  auto authority_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
  auto authority_stub =
      grpc::testing::EchoTestService::NewStub(authority_channel);
  ClientContext context;
  EchoRequest request;
  EchoResponse response;
  RpcOptions().SetupRpc(&context, &request);
  grpc::Status status = authority_stub->Echo(&context, request, &response);
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(status.error_message(),
            absl::StrCat(
                kNewRouteConfigName,
                ": UNAVAILABLE: authority \"xds.unknown.com\" not present in "
                "bootstrap config"));
  ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            authority_channel->GetState(false));
}
888 
// The listener and route configuration live under a known authority, but
// the route points at a cluster named under "xds.unknown.com", which is
// absent from the bootstrap config.  RPCs are expected to fail with
// UNAVAILABLE and an error message naming the offending Cluster resource.
TEST_P(XdsFederationTest, CdsResourceNameAuthorityUnknown) {
  const char* kAuthority = "xds.example.com";
  const char* kNewServerName = "whee%/server.example.com";
  const char* kNewListenerTemplate =
      "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
      "client/%s?psm_project_id=1234";
  const char* kNewListenerName =
      "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
      "client/whee%25/server.example.com?psm_project_id=1234";
  const char* kNewRouteConfigName =
      "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
      "new_route_config_name";
  const char* kNewClusterName =
      "xdstp://xds.unknown.com/envoy.config.cluster.v3.Cluster/"
      "cluster_name";
  XdsBootstrapBuilder bootstrap = MakeBootstrapBuilder();
  bootstrap.AddAuthority(
      kAuthority, absl::StrCat("localhost:", authority_balancer_->port()),
      kNewListenerTemplate);
  InitClient(bootstrap);
  // Route config under the known authority, routing to the cluster whose
  // name uses the unknown authority.
  RouteConfiguration authority_route_config = default_route_config_;
  authority_route_config.set_name(kNewRouteConfigName);
  auto* route_action = authority_route_config.mutable_virtual_hosts(0)
                           ->mutable_routes(0)
                           ->mutable_route();
  route_action->set_cluster(kNewClusterName);
  // Listener under the known authority.
  Listener authority_listener = default_listener_;
  authority_listener.set_name(kNewListenerName);
  SetListenerAndRouteConfiguration(authority_balancer_.get(),
                                   authority_listener, authority_route_config);
  // The channel is expected to end up in TRANSIENT_FAILURE.
  auto authority_channel =
      CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
  auto authority_stub =
      grpc::testing::EchoTestService::NewStub(authority_channel);
  ClientContext context;
  EchoRequest request;
  EchoResponse response;
  RpcOptions().SetupRpc(&context, &request);
  grpc::Status status = authority_stub->Echo(&context, request, &response);
  EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
  EXPECT_EQ(status.error_message(),
            absl::StrCat(kNewClusterName,
                         ": authority \"xds.unknown.com\" not present in "
                         "bootstrap config"));
  ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE,
            authority_channel->GetState(false));
}
937 
TEST_P(XdsFederationTest,EdsResourceNameAuthorityUnknown)938 TEST_P(XdsFederationTest, EdsResourceNameAuthorityUnknown) {
939   const char* kAuthority = "xds.example.com";
940   const char* kNewServerName = "whee%/server.example.com";
941   const char* kNewListenerTemplate =
942       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
943       "client/%s?psm_project_id=1234";
944   const char* kNewListenerName =
945       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
946       "client/whee%25/server.example.com?psm_project_id=1234";
947   const char* kNewRouteConfigName =
948       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
949       "new_route_config_name";
950   const char* kNewEdsServiceName =
951       "xdstp://xds.unknown.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
952       "edsservice_name";
953   const char* kNewClusterName =
954       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
955       "cluster_name";
956   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
957   builder.AddAuthority(kAuthority,
958                        absl::StrCat("localhost:", authority_balancer_->port()),
959                        kNewListenerTemplate);
960   InitClient(builder);
961   // New cluster
962   Cluster new_cluster = default_cluster_;
963   new_cluster.set_name(kNewClusterName);
964   new_cluster.mutable_eds_cluster_config()->set_service_name(
965       kNewEdsServiceName);
966   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
967   // New Route
968   RouteConfiguration new_route_config = default_route_config_;
969   new_route_config.set_name(kNewRouteConfigName);
970   new_route_config.mutable_virtual_hosts(0)
971       ->mutable_routes(0)
972       ->mutable_route()
973       ->set_cluster(kNewClusterName);
974   // New Listener
975   Listener listener = default_listener_;
976   listener.set_name(kNewListenerName);
977   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
978                                    new_route_config);
979   // Channel should report TRANSIENT_FAILURE.
980   auto channel2 =
981       CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
982   auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
983   ClientContext context;
984   EchoRequest request;
985   RpcOptions().SetupRpc(&context, &request);
986   EchoResponse response;
987   grpc::Status status = stub2->Echo(&context, request, &response);
988   EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
989   EXPECT_EQ(
990       status.error_message(),
991       "no children in weighted_target policy: EDS resource "
992       "xdstp://xds.unknown.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
993       "edsservice_name: UNAVAILABLE: authority \"xds.unknown.com\" not "
994       "present in bootstrap config");
995   ASSERT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel2->GetState(false));
996 }
997 
998 // Setting server_listener_resource_name_template to start with "xdstp:" and
999 // look up xds server under an authority map.
TEST_P(XdsFederationTest,FederationServer)1000 TEST_P(XdsFederationTest, FederationServer) {
1001   const char* kAuthority = "xds.example.com";
1002   const char* kNewListenerTemplate =
1003       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1004       "client/%s?psm_project_id=1234";
1005   const char* kNewServerListenerTemplate =
1006       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1007       "server/%s?psm_project_id=1234";
1008   const char* kNewListenerName =
1009       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1010       "client/server.example.com?psm_project_id=1234";
1011   const char* kNewRouteConfigName =
1012       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
1013       "new_route_config_name";
1014   const char* kNewServerRouteConfigName =
1015       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
1016       "new_server_route_config_name";
1017   const char* kNewEdsServiceName =
1018       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
1019       "new_edsservice_name";
1020   const char* kNewClusterName =
1021       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
1022       "new_cluster_name";
1023   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
1024   builder.SetClientDefaultListenerResourceNameTemplate(kNewListenerTemplate);
1025   builder.SetServerListenerResourceNameTemplate(kNewServerListenerTemplate);
1026   builder.AddAuthority(
1027       kAuthority, absl::StrCat("localhost:", authority_balancer_->port()),
1028       // Note we will not use the client_listener_resource_name_template field
1029       // in the authority.
1030       "xdstp://xds.example.com/envoy.config.listener.v3.Listener"
1031       "client/%s?client_listener_resource_name_template_not_in_use");
1032   InitClient(builder);
1033   CreateAndStartBackends(2, /*xds_enabled=*/true);
1034   // Eds for new authority balancer.
1035   EdsResourceArgs args =
1036       EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
1037   authority_balancer_->ads_service()->SetEdsResource(
1038       BuildEdsResource(args, kNewEdsServiceName));
1039   // New cluster
1040   Cluster new_cluster = default_cluster_;
1041   new_cluster.set_name(kNewClusterName);
1042   new_cluster.mutable_eds_cluster_config()->set_service_name(
1043       kNewEdsServiceName);
1044   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
1045   // New RouteConfig
1046   RouteConfiguration new_route_config = default_route_config_;
1047   new_route_config.set_name(kNewRouteConfigName);
1048   new_route_config.mutable_virtual_hosts(0)
1049       ->mutable_routes(0)
1050       ->mutable_route()
1051       ->set_cluster(kNewClusterName);
1052   // New Listener
1053   Listener listener = default_listener_;
1054   listener.set_name(kNewListenerName);
1055   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
1056                                    new_route_config);
1057   // New Server RouteConfig
1058   RouteConfiguration new_server_route_config = default_server_route_config_;
1059   new_server_route_config.set_name(kNewServerRouteConfigName);
1060   // New Server Listeners
1061   for (int port : GetBackendPorts()) {
1062     Listener server_listener = default_server_listener_;
1063     server_listener.set_name(absl::StrCat(
1064         "xdstp://xds.example.com/envoy.config.listener.v3.Listener/server/",
1065         grpc_core::LocalIp(), ":", port, "?psm_project_id=1234"));
1066     server_listener.mutable_address()->mutable_socket_address()->set_port_value(
1067         port);
1068     SetListenerAndRouteConfiguration(authority_balancer_.get(), server_listener,
1069                                      new_server_route_config,
1070                                      ServerHcmAccessor());
1071   }
1072   WaitForAllBackends(DEBUG_LOCATION);
1073 }
1074 
1075 //
1076 // XdsMetricsTest - tests xDS metrics
1077 //
1078 
1079 class XdsMetricsTest : public XdsEnd2endTest {
1080  protected:
SetUp()1081   void SetUp() override {
1082     stats_plugin_ = grpc_core::FakeStatsPluginBuilder()
1083                         .UseDisabledByDefaultMetrics(true)
1084                         .BuildAndRegister();
1085     InitClient();
1086   }
1087 
1088   std::shared_ptr<grpc_core::FakeStatsPlugin> stats_plugin_;
1089 };
1090 
1091 // Runs with RDS so that we know all resource types work properly.
1092 INSTANTIATE_TEST_SUITE_P(
1093     XdsTest, XdsMetricsTest,
1094     ::testing::Values(XdsTestType().set_enable_rds_testing()),
1095     &XdsTestType::Name);
1096 
// Checks the registered definition of the
// "grpc.xds_client.resource_updates_valid" metric: a uint64 counter that is
// disabled by default, measured in "{resource}", with three required labels
// and no optional ones.
TEST_P(XdsMetricsTest, MetricDefinitionResourceUpdatesValid) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using ::testing::ElementsAre;
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          "grpc.xds_client.resource_updates_valid");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_EQ(metric->enable_by_default, false);
  EXPECT_EQ(metric->name, "grpc.xds_client.resource_updates_valid");
  EXPECT_EQ(metric->unit, "{resource}");
  EXPECT_THAT(metric->label_keys,
              ElementsAre("grpc.target", "grpc.xds.server",
                          "grpc.xds.resource_type"));
  EXPECT_THAT(metric->optional_label_keys, ElementsAre());
}
1114 
// Checks the registered definition of the
// "grpc.xds_client.resource_updates_invalid" metric: a uint64 counter that
// is disabled by default, measured in "{resource}", with three required
// labels and no optional ones.
TEST_P(XdsMetricsTest, MetricDefinitionResourceUpdatesInvalid) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using ::testing::ElementsAre;
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          "grpc.xds_client.resource_updates_invalid");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kUInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCounter);
  EXPECT_EQ(metric->enable_by_default, false);
  EXPECT_EQ(metric->name, "grpc.xds_client.resource_updates_invalid");
  EXPECT_EQ(metric->unit, "{resource}");
  EXPECT_THAT(metric->label_keys,
              ElementsAre("grpc.target", "grpc.xds.server",
                          "grpc.xds.resource_type"));
  EXPECT_THAT(metric->optional_label_keys, ElementsAre());
}
1132 
// Checks the registered definition of the "grpc.xds_client.connected"
// metric: an int64 callback gauge that is disabled by default, measured in
// "{bool}", with two required labels and no optional ones.
TEST_P(XdsMetricsTest, MetricDefinitionConnected) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using ::testing::ElementsAre;
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          "grpc.xds_client.connected");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCallbackGauge);
  EXPECT_EQ(metric->enable_by_default, false);
  EXPECT_EQ(metric->name, "grpc.xds_client.connected");
  EXPECT_EQ(metric->unit, "{bool}");
  EXPECT_THAT(metric->label_keys,
              ElementsAre("grpc.target", "grpc.xds.server"));
  EXPECT_THAT(metric->optional_label_keys, ElementsAre());
}
1150 
// Checks the registered definition of the "grpc.xds_client.resources"
// metric: an int64 callback gauge that is disabled by default, measured in
// "{resource}", with four required labels and no optional ones.
TEST_P(XdsMetricsTest, MetricDefinitionResources) {
  using Registry = grpc_core::GlobalInstrumentsRegistry;
  using ::testing::ElementsAre;
  const auto* metric =
      grpc_core::GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
          "grpc.xds_client.resources");
  // Everything below dereferences the descriptor, so bail out if missing.
  ASSERT_NE(metric, nullptr);
  EXPECT_EQ(metric->value_type, Registry::ValueType::kInt64);
  EXPECT_EQ(metric->instrument_type, Registry::InstrumentType::kCallbackGauge);
  EXPECT_EQ(metric->enable_by_default, false);
  EXPECT_EQ(metric->name, "grpc.xds_client.resources");
  EXPECT_EQ(metric->unit, "{resource}");
  EXPECT_THAT(metric->label_keys,
              ElementsAre("grpc.target", "grpc.xds.authority",
                          "grpc.xds.resource_type", "grpc.xds.cache_state"));
  EXPECT_THAT(metric->optional_label_keys, ElementsAre());
}
1170 
TEST_P(XdsMetricsTest,MetricValues)1171 TEST_P(XdsMetricsTest, MetricValues) {
1172   const auto kMetricResourceUpdatesValid =
1173       grpc_core::GlobalInstrumentsRegistryTestPeer::
1174           FindUInt64CounterHandleByName(
1175               "grpc.xds_client.resource_updates_valid")
1176               .value();
1177   const auto kMetricResourceUpdatesInvalid =
1178       grpc_core::GlobalInstrumentsRegistryTestPeer::
1179           FindUInt64CounterHandleByName(
1180               "grpc.xds_client.resource_updates_invalid")
1181               .value();
1182   const auto kMetricConnected =
1183       grpc_core::GlobalInstrumentsRegistryTestPeer::
1184           FindCallbackInt64GaugeHandleByName("grpc.xds_client.connected")
1185               .value();
1186   const auto kMetricResources =
1187       grpc_core::GlobalInstrumentsRegistryTestPeer::
1188           FindCallbackInt64GaugeHandleByName("grpc.xds_client.resources")
1189               .value();
1190   const std::string kTarget = absl::StrCat("xds:", kServerName);
1191   const std::string kXdsServer = absl::StrCat("localhost:", balancer_->port());
1192   CreateAndStartBackends(1, /*xds_enabled=*/true);
1193   EdsResourceArgs args =
1194       EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
1195   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1196   CheckRpcSendOk(DEBUG_LOCATION);
1197   stats_plugin_->TriggerCallbacks();
1198   // Check client metrics.
1199   EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(kMetricConnected,
1200                                                    {kTarget, kXdsServer}, {}),
1201               ::testing::Optional(1));
1202   for (absl::string_view type_url :
1203        {"envoy.config.listener.v3.Listener",
1204         "envoy.config.route.v3.RouteConfiguration",
1205         "envoy.config.cluster.v3.Cluster",
1206         "envoy.config.endpoint.v3.ClusterLoadAssignment"}) {
1207     EXPECT_THAT(
1208         stats_plugin_->GetCounterValue(kMetricResourceUpdatesValid,
1209                                        {kTarget, kXdsServer, type_url}, {}),
1210         ::testing::Optional(1));
1211     EXPECT_THAT(
1212         stats_plugin_->GetCounterValue(kMetricResourceUpdatesInvalid,
1213                                        {kTarget, kXdsServer, type_url}, {}),
1214         ::testing::Optional(0));
1215     EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(
1216                     kMetricResources, {kTarget, "#old", type_url, "acked"}, {}),
1217                 ::testing::Optional(1));
1218   }
1219   // Check server metrics.
1220   EXPECT_THAT(stats_plugin_->GetCallbackGaugeValue(kMetricConnected,
1221                                                    {"#server", kXdsServer}, {}),
1222               ::testing::Optional(1));
1223   for (absl::string_view type_url :
1224        {"envoy.config.listener.v3.Listener",
1225         "envoy.config.route.v3.RouteConfiguration"}) {
1226     EXPECT_THAT(
1227         stats_plugin_->GetCounterValue(kMetricResourceUpdatesValid,
1228                                        {"#server", kXdsServer, type_url}, {}),
1229         ::testing::Optional(1));
1230     EXPECT_THAT(
1231         stats_plugin_->GetCounterValue(kMetricResourceUpdatesInvalid,
1232                                        {"#server", kXdsServer, type_url}, {}),
1233         ::testing::Optional(0));
1234     EXPECT_THAT(
1235         stats_plugin_->GetCallbackGaugeValue(
1236             kMetricResources, {"#server", "#old", type_url, "acked"}, {}),
1237         ::testing::Optional(1));
1238   }
1239 }
1240 
1241 //
1242 // XdsFederationDisabledTest
1243 //
1244 
1245 using XdsFederationDisabledTest = XdsEnd2endTest;
1246 
1247 // Runs with RDS so that we know all resource types work properly.
1248 INSTANTIATE_TEST_SUITE_P(
1249     XdsTest, XdsFederationDisabledTest,
1250     ::testing::Values(XdsTestType().set_enable_rds_testing()),
1251     &XdsTestType::Name);
1252 
1253 // TODO(roth,apolcyn): remove this test when the
1254 // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed.
TEST_P(XdsFederationDisabledTest,FederationDisabledWithNewStyleNames)1255 TEST_P(XdsFederationDisabledTest, FederationDisabledWithNewStyleNames) {
1256   grpc_core::testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION",
1257                                            "false");
1258   const char* kNewRouteConfigName =
1259       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
1260       "new_route_config_name";
1261   const char* kNewClusterName =
1262       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
1263       "cluster_name";
1264   const char* kNewEdsResourceName =
1265       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
1266       "edsservice_name";
1267   InitClient();
1268   CreateAndStartBackends(1);
1269   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1270   balancer_->ads_service()->SetEdsResource(
1271       BuildEdsResource(args, kNewEdsResourceName));
1272   // New cluster
1273   Cluster new_cluster = default_cluster_;
1274   new_cluster.set_name(kNewClusterName);
1275   new_cluster.mutable_eds_cluster_config()->set_service_name(
1276       kNewEdsResourceName);
1277   balancer_->ads_service()->SetCdsResource(new_cluster);
1278   // New RouteConfig
1279   RouteConfiguration new_route_config = default_route_config_;
1280   new_route_config.set_name(kNewRouteConfigName);
1281   new_route_config.mutable_virtual_hosts(0)
1282       ->mutable_routes(0)
1283       ->mutable_route()
1284       ->set_cluster(kNewClusterName);
1285   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1286                                    new_route_config);
1287   // Channel should work.
1288   CheckRpcSendOk(DEBUG_LOCATION);
1289 }
1290 
1291 //
1292 // XdsFederationLoadReportingTest - xDS federation and load reporting
1293 //
1294 
1295 using XdsFederationLoadReportingTest = XdsFederationTest;
1296 
1297 // Get bootstrap from env var, so that there's a global XdsClient.
1298 // Runs with and without RDS.
1299 INSTANTIATE_TEST_SUITE_P(
1300     XdsTest, XdsFederationLoadReportingTest,
1301     ::testing::Values(
1302         XdsTestType()
1303             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
1304             .set_enable_load_reporting(),
1305         XdsTestType()
1306             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
1307             .set_enable_load_reporting()
1308             .set_enable_rds_testing()),
1309     &XdsTestType::Name);
1310 
1311 // Channel is created with URI "xds://xds.example.com/server.example.com".
1312 // Bootstrap entry for that authority specifies a client listener name template.
1313 // Sending traffic to both default balancer and authority balancer and checking
1314 // load reporting with each one.
TEST_P(XdsFederationLoadReportingTest,FederationMultipleLoadReportingTest)1315 TEST_P(XdsFederationLoadReportingTest, FederationMultipleLoadReportingTest) {
1316   const char* kAuthority = "xds.example.com";
1317   const char* kNewServerName = "whee%/server.example.com";
1318   const char* kNewListenerTemplate =
1319       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1320       "client/%s?psm_project_id=1234";
1321   const char* kNewListenerName =
1322       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1323       "client/whee%25/server.example.com?psm_project_id=1234";
1324   const char* kNewRouteConfigName =
1325       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
1326       "new_route_config_name";
1327   const char* kNewEdsServiceName =
1328       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
1329       "edsservice_name";
1330   const char* kNewClusterName =
1331       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
1332       "cluster_name";
1333   const size_t kNumRpcsToDefaultBalancer = 5;
1334   const size_t kNumRpcsToAuthorityBalancer = 10;
1335   XdsBootstrapBuilder builder = MakeBootstrapBuilder();
1336   builder.AddAuthority(kAuthority,
1337                        absl::StrCat("localhost:", authority_balancer_->port()),
1338                        kNewListenerTemplate);
1339   InitClient(builder);
1340   CreateAndStartBackends(2, /*xds_enabled=*/true);
1341   // Eds for 2 balancers to ensure RPCs sent using current stub go to backend 0
1342   // and RPCs sent using the new stub go to backend 1.
1343   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
1344   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1345   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
1346   authority_balancer_->ads_service()->SetEdsResource(
1347       BuildEdsResource(args, kNewEdsServiceName));
1348   authority_balancer_->lrs_service()->set_cluster_names({kNewClusterName});
1349   // New cluster
1350   Cluster new_cluster = default_cluster_;
1351   new_cluster.set_name(kNewClusterName);
1352   new_cluster.mutable_lrs_server()->mutable_self();
1353   new_cluster.mutable_eds_cluster_config()->set_service_name(
1354       kNewEdsServiceName);
1355   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
1356   // New Route
1357   RouteConfiguration new_route_config = default_route_config_;
1358   new_route_config.set_name(kNewRouteConfigName);
1359   new_route_config.mutable_virtual_hosts(0)
1360       ->mutable_routes(0)
1361       ->mutable_route()
1362       ->set_cluster(kNewClusterName);
1363   // New Listener
1364   Listener listener = default_listener_;
1365   listener.set_name(kNewListenerName);
1366   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
1367                                    new_route_config);
1368   // Send kNumRpcsToDefaultBalancer RPCs to the current stub.
1369   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsToDefaultBalancer);
1370   // Create second channel to new target uri.
1371   auto channel2 =
1372       CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
1373   auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
1374   // Send kNumRpcsToAuthorityBalancer on the second channel.
1375   for (size_t i = 0; i < kNumRpcsToAuthorityBalancer; ++i) {
1376     ClientContext context;
1377     EchoRequest request;
1378     RpcOptions().SetupRpc(&context, &request);
1379     EchoResponse response;
1380     grpc::Status status = stub2->Echo(&context, request, &response);
1381     EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1382                              << " message=" << status.error_message();
1383   }
1384   // Each backend should have received the expected number of RPCs,
1385   // and the load report also reflect the correct numbers.
1386   EXPECT_EQ(kNumRpcsToAuthorityBalancer,
1387             backends_[1]->backend_service()->request_count());
1388   EXPECT_EQ(kNumRpcsToDefaultBalancer,
1389             backends_[0]->backend_service()->request_count());
1390   // Load report for authority LRS.
1391   std::vector<ClientStats> authority_load_report =
1392       authority_balancer_->lrs_service()->WaitForLoadReport();
1393   ASSERT_EQ(authority_load_report.size(), 1UL);
1394   ClientStats& authority_client_stats = authority_load_report.front();
1395   EXPECT_EQ(authority_client_stats.cluster_name(), kNewClusterName);
1396   EXPECT_EQ(authority_client_stats.eds_service_name(), kNewEdsServiceName);
1397   EXPECT_EQ(kNumRpcsToAuthorityBalancer,
1398             authority_client_stats.total_successful_requests());
1399   EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress());
1400   EXPECT_EQ(kNumRpcsToAuthorityBalancer,
1401             authority_client_stats.total_issued_requests());
1402   EXPECT_EQ(0U, authority_client_stats.total_error_requests());
1403   EXPECT_EQ(0U, authority_client_stats.total_dropped_requests());
1404   EXPECT_EQ(1U, authority_balancer_->lrs_service()->request_count());
1405   EXPECT_EQ(1U, authority_balancer_->lrs_service()->response_count());
1406   // Load report for default LRS.
1407   std::vector<ClientStats> default_load_report =
1408       balancer_->lrs_service()->WaitForLoadReport();
1409   ASSERT_EQ(default_load_report.size(), 1UL);
1410   ClientStats& default_client_stats = default_load_report.front();
1411   EXPECT_EQ(default_client_stats.cluster_name(), kDefaultClusterName);
1412   EXPECT_EQ(default_client_stats.eds_service_name(), kDefaultEdsServiceName);
1413   EXPECT_EQ(kNumRpcsToDefaultBalancer,
1414             default_client_stats.total_successful_requests());
1415   EXPECT_EQ(0U, default_client_stats.total_requests_in_progress());
1416   EXPECT_EQ(kNumRpcsToDefaultBalancer,
1417             default_client_stats.total_issued_requests());
1418   EXPECT_EQ(0U, default_client_stats.total_error_requests());
1419   EXPECT_EQ(0U, default_client_stats.total_dropped_requests());
1420   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1421   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1422 }
1423 
1424 // This test covers a bug found in the wild whereby we incorrectly failed
1425 // to de-dup xDS servers when the same server is used both in an authority
1426 // and as the top-level server in the bootstrap config.  This resulted in
1427 // the ADS call and LRS call being in two different ChannelState objects,
1428 // which resulted in the LRS load reports not being sent.
TEST_P(XdsFederationLoadReportingTest,SameServerInAuthorityAndTopLevel)1429 TEST_P(XdsFederationLoadReportingTest, SameServerInAuthorityAndTopLevel) {
1430   grpc_core::testing::ScopedExperimentalEnvVar env_var(
1431       "GRPC_EXPERIMENTAL_XDS_FEDERATION");
1432   const char* kAuthority = "xds.example.com";
1433   const char* kNewServerName = "whee%/server.example.com";
1434   const char* kNewListenerName =
1435       "xdstp://xds.example.com/envoy.config.listener.v3.Listener/"
1436       "whee%25/server.example.com";
1437   const char* kNewRouteConfigName =
1438       "xdstp://xds.example.com/envoy.config.route.v3.RouteConfiguration/"
1439       "new_route_config_name";
1440   const char* kNewClusterName =
1441       "xdstp://xds.example.com/envoy.config.cluster.v3.Cluster/"
1442       "cluster_name";
1443   const char* kNewEdsServiceName =
1444       "xdstp://xds.example.com/envoy.config.endpoint.v3.ClusterLoadAssignment/"
1445       "edsservice_name";
1446   std::string xds_server =
1447       absl::StrCat("localhost:", authority_balancer_->port());
1448   XdsBootstrapBuilder builder;
1449   builder.SetServers({xds_server});
1450   builder.AddAuthority(kAuthority, xds_server);
1451   InitClient(builder);
1452   CreateAndStartBackends(1);
1453   authority_balancer_->lrs_service()->set_send_all_clusters(true);
1454   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1455   authority_balancer_->ads_service()->SetEdsResource(
1456       BuildEdsResource(args, kNewEdsServiceName));
1457   // New cluster
1458   Cluster new_cluster = default_cluster_;
1459   new_cluster.set_name(kNewClusterName);
1460   new_cluster.mutable_eds_cluster_config()->set_service_name(
1461       kNewEdsServiceName);
1462   authority_balancer_->ads_service()->SetCdsResource(new_cluster);
1463   // New Route
1464   RouteConfiguration new_route_config = default_route_config_;
1465   new_route_config.set_name(kNewRouteConfigName);
1466   new_route_config.mutable_virtual_hosts(0)
1467       ->mutable_routes(0)
1468       ->mutable_route()
1469       ->set_cluster(kNewClusterName);
1470   // New Listener
1471   Listener listener = default_listener_;
1472   listener.set_name(kNewListenerName);
1473   SetListenerAndRouteConfiguration(authority_balancer_.get(), listener,
1474                                    new_route_config);
1475   // Create second channel to new target URI and send 1 RPC.
1476   auto channel2 =
1477       CreateChannel(/*failover_timeout_ms=*/0, kNewServerName, kAuthority);
1478   auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
1479   ClientContext context;
1480   EchoRequest request;
1481   RpcOptions().SetupRpc(&context, &request);
1482   EchoResponse response;
1483   grpc::Status status = stub2->Echo(&context, request, &response);
1484   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1485                            << " message=" << status.error_message();
1486   EXPECT_EQ(1U, backends_[0]->backend_service()->request_count());
1487   // Wait for load report.
1488   std::vector<ClientStats> authority_load_report =
1489       authority_balancer_->lrs_service()->WaitForLoadReport();
1490   ASSERT_EQ(authority_load_report.size(), 1UL);
1491   ClientStats& authority_client_stats = authority_load_report.front();
1492   EXPECT_EQ(authority_client_stats.cluster_name(), kNewClusterName);
1493   EXPECT_EQ(authority_client_stats.eds_service_name(), kNewEdsServiceName);
1494   EXPECT_EQ(1U, authority_client_stats.total_successful_requests());
1495   EXPECT_EQ(0U, authority_client_stats.total_requests_in_progress());
1496   EXPECT_EQ(1U, authority_client_stats.total_issued_requests());
1497   EXPECT_EQ(0U, authority_client_stats.total_error_requests());
1498   EXPECT_EQ(0U, authority_client_stats.total_dropped_requests());
1499   EXPECT_EQ(1U, authority_balancer_->lrs_service()->request_count());
1500   EXPECT_EQ(1U, authority_balancer_->lrs_service()->response_count());
1501 }
1502 
1503 //
1504 // SecureNamingTest - test that the right authority is used for the xDS server
1505 //
1506 
1507 class SecureNamingTest : public XdsEnd2endTest {
1508  public:
SetUp()1509   void SetUp() override {
1510     // Each test calls InitClient() on its own.
1511   }
1512 };
1513 
1514 INSTANTIATE_TEST_SUITE_P(XdsTest, SecureNamingTest,
1515                          ::testing::Values(XdsTestType()), &XdsTestType::Name);
1516 
1517 // Tests that secure naming check passes if target name is expected.
TEST_P(SecureNamingTest,TargetNameIsExpected)1518 TEST_P(SecureNamingTest, TargetNameIsExpected) {
1519   InitClient(MakeBootstrapBuilder(), /*lb_expected_authority=*/"localhost:%d");
1520   CreateAndStartBackends(4);
1521   EdsResourceArgs args({
1522       {"locality0", CreateEndpointsForBackends()},
1523   });
1524   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1525   CheckRpcSendOk(DEBUG_LOCATION);
1526 }
1527 
1528 // Tests that secure naming check fails if target name is unexpected.
TEST_P(SecureNamingTest,TargetNameIsUnexpected)1529 TEST_P(SecureNamingTest, TargetNameIsUnexpected) {
1530   GTEST_FLAG_SET(death_test_style, "threadsafe");
1531   InitClient(MakeBootstrapBuilder(),
1532              /*lb_expected_authority=*/"incorrect_server_name");
1533   CreateAndStartBackends(4);
1534   EdsResourceArgs args({
1535       {"locality0", CreateEndpointsForBackends()},
1536   });
1537   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1538   // Make sure that we blow up (via abort() from the security connector) when
1539   // the name from the balancer doesn't match expectations.
1540   ASSERT_DEATH_IF_SUPPORTED({ CheckRpcSendOk(DEBUG_LOCATION); }, "");
1541 }
1542 
1543 }  // namespace
1544 }  // namespace testing
1545 }  // namespace grpc
1546 
main(int argc,char ** argv)1547 int main(int argc, char** argv) {
1548   grpc::testing::TestEnvironment env(&argc, argv);
1549   ::testing::InitGoogleTest(&argc, argv);
1550   // Make the backup poller poll very frequently in order to pick up
1551   // updates from all the subchannels's FDs.
1552   grpc_core::ConfigVars::Overrides overrides;
1553   overrides.client_channel_backup_poll_interval_ms = 1;
1554   grpc_core::ConfigVars::SetOverrides(overrides);
1555 #if TARGET_OS_IPHONE
1556   // Workaround Apple CFStream bug
1557   grpc_core::SetEnv("grpc_cfstream", "0");
1558 #endif
1559   grpc_init();
1560   const auto result = RUN_ALL_TESTS();
1561   grpc_shutdown();
1562   return result;
1563 }
1564