xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_csds_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <memory>
17 #include <string>
18 #include <vector>
19 
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22 
23 #include "absl/memory/memory.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/strip.h"
26 
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/security/credentials.h>
29 
30 #include "src/core/client_channel/backup_poller.h"
31 #include "src/core/lib/config/config_vars.h"
32 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
34 #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
35 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
37 #include "test/core/util/resolve_localhost_ip46.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
40 #include "test/cpp/util/credentials.h"
41 
42 #ifndef DISABLED_XDS_PROTO_IN_CC
43 
44 #include "src/cpp/server/csds/csds.h"
45 #include "src/proto/grpc/testing/xds/v3/csds.grpc.pb.h"
46 
47 namespace grpc {
48 namespace testing {
49 namespace {
50 
51 using ::envoy::admin::v3::ClientResourceStatus;
52 using ::envoy::config::cluster::v3::Cluster;
53 using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
54 using ::envoy::config::listener::v3::Listener;
55 using ::envoy::config::route::v3::RouteConfiguration;
56 using ::envoy::extensions::filters::network::http_connection_manager::v3::
57     HttpConnectionManager;
58 
59 MATCHER_P4(EqNode, id, user_agent_name, user_agent_version, client_features,
60            "equals Node") {
61   bool ok = true;
62   ok &= ::testing::ExplainMatchResult(id, arg.id(), result_listener);
63   ok &= ::testing::ExplainMatchResult(user_agent_name, arg.user_agent_name(),
64                                       result_listener);
65   ok &= ::testing::ExplainMatchResult(
66       user_agent_version, arg.user_agent_version(), result_listener);
67   ok &= ::testing::ExplainMatchResult(client_features, arg.client_features(),
68                                       result_listener);
69   return ok;
70 }
71 
72 MATCHER_P6(EqGenericXdsConfig, type_url, name, version_info, xds_config,
73            client_status, error_state, "equals GenericXdsConfig") {
74   bool ok = true;
75   ok &=
76       ::testing::ExplainMatchResult(type_url, arg.type_url(), result_listener);
77   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
78   ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
79                                       result_listener);
80   ok &= ::testing::ExplainMatchResult(xds_config, arg.xds_config(),
81                                       result_listener);
82   ok &= ::testing::ExplainMatchResult(client_status, arg.client_status(),
83                                       result_listener);
84   ok &= ::testing::ExplainMatchResult(error_state, arg.error_state(),
85                                       result_listener);
86   return ok;
87 }
88 
89 MATCHER_P2(EqListener, name, api_listener, "equals Listener") {
90   bool ok = true;
91   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
92   ok &= ::testing::ExplainMatchResult(
93       api_listener, arg.api_listener().api_listener(), result_listener);
94   return ok;
95 }
96 
97 MATCHER_P(EqHttpConnectionManagerNotRds, route_config,
98           "equals HttpConnectionManager") {
99   bool ok = true;
100   ok &= ::testing::ExplainMatchResult(route_config, arg.route_config(),
101                                       result_listener);
102   return ok;
103 }
104 
105 MATCHER_P(EqRouteConfigurationName, name, "equals RouteConfiguration") {
106   bool ok = true;
107   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
108   return ok;
109 }
110 
111 MATCHER_P2(EqRouteConfiguration, name, cluster_name,
112            "equals RouteConfiguration") {
113   bool ok = true;
114   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
115   ok &= ::testing::ExplainMatchResult(
116       ::testing::ElementsAre(::testing::Property(
117           &envoy::config::route::v3::VirtualHost::routes,
118           ::testing::ElementsAre(::testing::Property(
119               &envoy::config::route::v3::Route::route,
120               ::testing::Property(
121                   &envoy::config::route::v3::RouteAction::cluster,
122                   cluster_name))))),
123       arg.virtual_hosts(), result_listener);
124   return ok;
125 }
126 
127 MATCHER_P(EqCluster, name, "equals Cluster") {
128   bool ok = true;
129   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
130   return ok;
131 }
132 
133 MATCHER_P(EqEndpoint, port, "equals Endpoint") {
134   bool ok = true;
135   ok &= ::testing::ExplainMatchResult(
136       port, arg.address().socket_address().port_value(), result_listener);
137   return ok;
138 }
139 
140 MATCHER_P2(EqLocalityLbEndpoints, port, weight, "equals LocalityLbEndpoints") {
141   bool ok = true;
142   ok &= ::testing::ExplainMatchResult(
143       ::testing::ElementsAre(::testing::Property(
144           &envoy::config::endpoint::v3::LbEndpoint::endpoint,
145           EqEndpoint(port))),
146       arg.lb_endpoints(), result_listener);
147   ok &= ::testing::ExplainMatchResult(
148       weight, arg.load_balancing_weight().value(), result_listener);
149   return ok;
150 }
151 
152 MATCHER_P(EqClusterLoadAssignmentName, cluster_name,
153           "equals ClusterLoadAssignment") {
154   bool ok = true;
155   ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
156                                       result_listener);
157   return ok;
158 }
159 
160 MATCHER_P3(EqClusterLoadAssignment, cluster_name, port, weight,
161            "equals ClusterLoadAssignment") {
162   bool ok = true;
163   ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
164                                       result_listener);
165   ok &= ::testing::ExplainMatchResult(
166       ::testing::ElementsAre(EqLocalityLbEndpoints(port, weight)),
167       arg.endpoints(), result_listener);
168   return ok;
169 }
170 
171 MATCHER_P2(EqUpdateFailureState, details, version_info,
172            "equals UpdateFailureState") {
173   bool ok = true;
174   ok &= ::testing::ExplainMatchResult(details, arg.details(), result_listener);
175   ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
176                                       result_listener);
177   return ok;
178 }
179 
180 MATCHER_P(UnpackListener, matcher, "is a Listener") {
181   Listener config;
182   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
183                                      result_listener)) {
184     return false;
185   }
186   return ::testing::ExplainMatchResult(matcher, config, result_listener);
187 }
188 
189 MATCHER_P(UnpackRouteConfiguration, matcher, "is a RouteConfiguration") {
190   RouteConfiguration config;
191   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
192                                      result_listener)) {
193     return false;
194   }
195   return ::testing::ExplainMatchResult(matcher, config, result_listener);
196 }
197 
198 MATCHER_P(UnpackHttpConnectionManager, matcher, "is a HttpConnectionManager") {
199   HttpConnectionManager config;
200   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
201                                      result_listener)) {
202     return false;
203   }
204   return ::testing::ExplainMatchResult(matcher, config, result_listener);
205 }
206 
207 MATCHER_P(UnpackCluster, matcher, "is a Cluster") {
208   Cluster config;
209   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
210                                      result_listener)) {
211     return false;
212   }
213   return ::testing::ExplainMatchResult(matcher, config, result_listener);
214 }
215 
216 MATCHER_P(UnpackClusterLoadAssignment, matcher, "is a ClusterLoadAssignment") {
217   ClusterLoadAssignment config;
218   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
219                                      result_listener)) {
220     return false;
221   }
222   return ::testing::ExplainMatchResult(matcher, config, result_listener);
223 }
224 
225 MATCHER(IsRdsEnabledHCM, "is a RDS enabled HttpConnectionManager") {
226   return ::testing::ExplainMatchResult(
227       UnpackHttpConnectionManager(
228           ::testing::Property(&HttpConnectionManager::has_rds, true)),
229       arg, result_listener);
230 }
231 
232 MATCHER_P2(EqNoRdsHCM, route_configuration_name, cluster_name,
233            "equals RDS disabled HttpConnectionManager") {
234   return ::testing::ExplainMatchResult(
235       UnpackHttpConnectionManager(EqHttpConnectionManagerNotRds(
236           EqRouteConfiguration(route_configuration_name, cluster_name))),
237       arg, result_listener);
238 }
239 
240 class ClientStatusDiscoveryServiceTest : public XdsEnd2endTest {
241  public:
ClientStatusDiscoveryServiceTest()242   ClientStatusDiscoveryServiceTest() {
243     admin_server_thread_ = std::make_unique<AdminServerThread>(this);
244     admin_server_thread_->Start();
245     std::string admin_server_address =
246         grpc_core::LocalIpAndPort(admin_server_thread_->port());
247     admin_channel_ = grpc::CreateChannel(
248         admin_server_address,
249         std::make_shared<FakeTransportSecurityChannelCredentials>());
250     csds_stub_ =
251         envoy::service::status::v3::ClientStatusDiscoveryService::NewStub(
252             admin_channel_);
253     if (GetParam().use_csds_streaming()) {
254       stream_ = csds_stub_->StreamClientStatus(&stream_context_);
255     }
256   }
257 
~ClientStatusDiscoveryServiceTest()258   ~ClientStatusDiscoveryServiceTest() override {
259     if (stream_ != nullptr) {
260       EXPECT_TRUE(stream_->WritesDone());
261       Status status = stream_->Finish();
262       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
263                                << " message=" << status.error_message();
264     }
265     admin_server_thread_->Shutdown();
266   }
267 
FetchCsdsResponse()268   envoy::service::status::v3::ClientStatusResponse FetchCsdsResponse() {
269     envoy::service::status::v3::ClientStatusResponse response;
270     if (!GetParam().use_csds_streaming()) {
271       // Fetch through unary pulls
272       ClientContext context;
273       Status status = csds_stub_->FetchClientStatus(
274           &context, envoy::service::status::v3::ClientStatusRequest(),
275           &response);
276       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
277                                << " message=" << status.error_message();
278     } else {
279       // Fetch through streaming pulls
280       EXPECT_TRUE(
281           stream_->Write(envoy::service::status::v3::ClientStatusRequest()));
282       EXPECT_TRUE(stream_->Read(&response));
283     }
284     return response;
285   }
286 
287  private:
288   // Server thread for CSDS server.
289   class AdminServerThread : public ServerThread {
290    public:
AdminServerThread(XdsEnd2endTest * test_obj)291     explicit AdminServerThread(XdsEnd2endTest* test_obj)
292         : ServerThread(test_obj) {}
293 
294    private:
Type()295     const char* Type() override { return "Admin"; }
296 
RegisterAllServices(ServerBuilder * builder)297     void RegisterAllServices(ServerBuilder* builder) override {
298       builder->RegisterService(&csds_service_);
299     }
StartAllServices()300     void StartAllServices() override {}
ShutdownAllServices()301     void ShutdownAllServices() override {}
302 
303     grpc::xds::experimental::ClientStatusDiscoveryService csds_service_;
304   };
305 
306   std::unique_ptr<AdminServerThread> admin_server_thread_;
307   std::shared_ptr<Channel> admin_channel_;
308   std::unique_ptr<
309       envoy::service::status::v3::ClientStatusDiscoveryService::Stub>
310       csds_stub_;
311   ClientContext stream_context_;
312   std::unique_ptr<
313       ClientReaderWriter<envoy::service::status::v3::ClientStatusRequest,
314                          envoy::service::status::v3::ClientStatusResponse>>
315       stream_;
316 };
317 
318 // Run CSDS tests with RDS enabled and disabled.
319 // These need to run with the bootstrap from an env var instead of from
320 // a channel arg, since there needs to be a global XdsClient instance.
321 INSTANTIATE_TEST_SUITE_P(
322     XdsTest, ClientStatusDiscoveryServiceTest,
323     ::testing::Values(
324         XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
325         XdsTestType()
326             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
327             .set_enable_rds_testing(),
328         XdsTestType()
329             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
330             .set_use_csds_streaming(),
331         XdsTestType()
332             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
333             .set_enable_rds_testing()
334             .set_use_csds_streaming()),
335     &XdsTestType::Name);
336 
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpVanilla)337 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) {
338   CreateAndStartBackends(1);
339   const size_t kNumRpcs = 5;
340   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
341   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
342   // Send several RPCs to ensure the xDS setup works
343   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
344   // Fetches the client config
345   auto csds_response = FetchCsdsResponse();
346   gpr_log(GPR_INFO, "xDS config dump: %s", csds_response.DebugString().c_str());
347   ASSERT_EQ(1, csds_response.config_size());
348   const auto& client_config = csds_response.config(0);
349   // Validate the Node information
350   EXPECT_THAT(client_config.node(),
351               EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
352                      ::testing::HasSubstr(grpc_version_string()),
353                      ::testing::ElementsAre(
354                          "envoy.lb.does_not_support_overprovisioning")));
355   // Listener matcher depends on whether RDS is enabled.
356   ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
357   if (GetParam().enable_rds_testing()) {
358     api_listener_matcher = IsRdsEnabledHCM();
359   } else {
360     api_listener_matcher =
361         EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
362   }
363   // Construct list of all matchers.
364   std::vector<::testing::Matcher<
365       envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
366       matchers = {
367           // Listener
368           EqGenericXdsConfig(
369               kLdsTypeUrl, kServerName, "1",
370               UnpackListener(EqListener(kServerName, api_listener_matcher)),
371               ClientResourceStatus::ACKED, ::testing::_),
372           // Cluster
373           EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
374                              UnpackCluster(EqCluster(kDefaultClusterName)),
375                              ClientResourceStatus::ACKED, ::testing::_),
376           // ClusterLoadAssignment
377           EqGenericXdsConfig(
378               kEdsTypeUrl, kDefaultEdsServiceName, "1",
379               UnpackClusterLoadAssignment(EqClusterLoadAssignment(
380                   kDefaultEdsServiceName, backends_[0]->port(),
381                   kDefaultLocalityWeight)),
382               ClientResourceStatus::ACKED, ::testing::_),
383       };
384   // If RDS is enabled, add matcher for RDS resource.
385   if (GetParam().enable_rds_testing()) {
386     matchers.push_back(EqGenericXdsConfig(
387         kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
388         UnpackRouteConfiguration(EqRouteConfiguration(
389             kDefaultRouteConfigurationName, kDefaultClusterName)),
390         ClientResourceStatus::ACKED, ::testing::_));
391   }
392   // Validate the dumped xDS configs
393   EXPECT_THAT(client_config.generic_xds_configs(),
394               ::testing::UnorderedElementsAreArray(matchers))
395       << "Actual: " << client_config.DebugString();
396 }
397 
// Fetching the client status must succeed even when the XdsClient has not
// been initialized or no xDS config is in effect. The RPC outcome is checked
// inside FetchCsdsResponse(); the response content is irrelevant here.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEmpty) {
  const auto ignored_response = FetchCsdsResponse();
  static_cast<void>(ignored_response);
}
403 
// A rejected Listener update must show up in the CSDS dump as NACKED: the
// previously accepted resource (version "1") stays in place and the error
// state records the rejected version "2" with the validation message.
// The dump is polled a few times before the test gives up.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad Listener (name only, nothing else set) should be rejected.
  Listener listener;
  listener.set_name(kServerName);
  balancer_->ads_service()->SetLdsResource(listener);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
  if (GetParam().enable_rds_testing()) {
    api_listener_matcher = IsRdsEnabledHCM();
  } else {
    api_listener_matcher =
        EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
  }
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. A response without any client
    // config is treated as "not propagated yet": indexing config(0) on an
    // empty repeated field would be out of bounds.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kLdsTypeUrl, kServerName, "1",
                UnpackListener(EqListener(kServerName, api_listener_matcher)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "Listener has neither address nor ApiListener"),
                    "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
444 
// A rejected route configuration must show up in the CSDS dump as NACKED.
// With RDS the NACK is expected on the RouteConfiguration resource; without
// RDS it is expected on the Listener resource. In both cases the resource
// stays at version "1" and the error state records the rejected version "2".
// The dump is polled a few times before the test gives up.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpRouteError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad route config (a virtual host without domains) will be rejected.
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  route_config.add_virtual_hosts();
  SetRouteConfiguration(balancer_.get(), route_config);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    auto csds_response = FetchCsdsResponse();
    bool ok = false;
    if (csds_response.config_size() == 0) {
      // No client config reported yet; indexing config(0) would be out of
      // bounds, so just retry.
    } else if (GetParam().enable_rds_testing()) {
      ok = ::testing::Value(
          csds_response.config(0).generic_xds_configs(),
          ::testing::Contains(EqGenericXdsConfig(
              kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
              UnpackRouteConfiguration(EqRouteConfiguration(
                  kDefaultRouteConfigurationName, kDefaultClusterName)),
              ClientResourceStatus::NACKED,
              EqUpdateFailureState(
                  ::testing::HasSubstr(
                      "field:virtual_hosts[0].domains error:must be non-empty"),
                  "2"))));
    } else {
      ok = ::testing::Value(
          csds_response.config(0).generic_xds_configs(),
          ::testing::Contains(EqGenericXdsConfig(
              kLdsTypeUrl, kServerName, "1",
              UnpackListener(EqListener(
                  kServerName, EqNoRdsHCM(kDefaultRouteConfigurationName,
                                          kDefaultClusterName))),
              ClientResourceStatus::NACKED,
              EqUpdateFailureState(
                  ::testing::HasSubstr(
                      "field:api_listener.api_listener.value[envoy.extensions"
                      ".filters.network.http_connection_manager.v3"
                      ".HttpConnectionManager].route_config.virtual_hosts[0]"
                      ".domains error:must be non-empty"),
                  "2"))));
    }
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
498 
// A rejected Cluster update must show up in the CSDS dump as NACKED: the
// previously accepted resource (version "1") stays in place and the error
// state records the rejected version "2" with the validation message.
// The dump is polled a few times before the test gives up.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Cluster with only a name (no discovery type set) will be rejected.
  Cluster cluster;
  cluster.set_name(kDefaultClusterName);
  balancer_->ads_service()->SetCdsResource(cluster);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. A response without any client
    // config is treated as "not propagated yet": indexing config(0) on an
    // empty repeated field would be out of bounds.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kCdsTypeUrl, kDefaultClusterName, "1",
                UnpackCluster(EqCluster(kDefaultClusterName)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr("unknown discovery type"), "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
530 
// A rejected ClusterLoadAssignment update must show up in the CSDS dump as
// NACKED: the previously accepted resource (version "1") stays in place and
// the error state records the rejected version "2" with the validation
// message. The dump is polled a few times before the test gives up.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad endpoint config will be rejected: the locality entry below never gets
  // its locality field set.
  ClusterLoadAssignment cluster_load_assignment;
  cluster_load_assignment.set_cluster_name(kDefaultEdsServiceName);
  auto* endpoints = cluster_load_assignment.add_endpoints();
  endpoints->mutable_load_balancing_weight()->set_value(1);
  auto* endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
  endpoint->mutable_address()->mutable_socket_address()->set_port_value(1 << 1);
  balancer_->ads_service()->SetEdsResource(cluster_load_assignment);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. A response without any client
    // config is treated as "not propagated yet": indexing config(0) on an
    // empty repeated field would be out of bounds.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kEdsTypeUrl, kDefaultEdsServiceName, "1",
                UnpackClusterLoadAssignment(EqClusterLoadAssignment(
                    kDefaultEdsServiceName, backends_[0]->port(),
                    kDefaultLocalityWeight)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "errors parsing EDS resource: ["
                        "field:endpoints[0].locality error:field not present]"),
                    "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
571 
// When the Listener resource is not served, the CSDS dump must report it in
// the REQUESTED state.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  // The RPC is expected to time out; it is sent only to make the client ask
  // for the Listener.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Stop here on an empty dump instead of indexing config(0) out of bounds.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::REQUESTED, ::testing::_)));
}
584 
// When the route config points at clusters that are not served, the CSDS dump
// must report each of those Cluster resources in the REQUESTED state.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  const std::string kClusterName1 = "cluster-1";
  const std::string kClusterName2 = "cluster-2";
  // Create a route config requesting two non-existing clusters
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  auto* vh = route_config.add_virtual_hosts();
  // The VirtualHost must match the domain name, otherwise will cause resolver
  // transient failure.
  vh->add_domains("*");
  auto* routes1 = vh->add_routes();
  routes1->mutable_match()->set_prefix("");
  routes1->mutable_route()->set_cluster(kClusterName1);
  auto* routes2 = vh->add_routes();
  routes2->mutable_match()->set_prefix("");
  routes2->mutable_route()->set_cluster(kClusterName2);
  SetRouteConfiguration(balancer_.get(), route_config);
  // Try to get the configs plumbed through; the RPC itself is expected to
  // time out.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Stop here on an empty dump instead of indexing config(0) out of bounds.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::AllOf(
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName1, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_)),
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName2, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_))));
}
617 
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpMultiClient)618 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) {
619   Listener listener = default_listener_;
620   const char* kServer2Name = "server2.example.com";
621   listener.set_name(kServer2Name);
622   balancer_->ads_service()->SetLdsResource(listener);
623   SetListenerAndRouteConfiguration(balancer_.get(), listener,
624                                    default_route_config_);
625   CreateAndStartBackends(1);
626   const size_t kNumRpcs = 5;
627   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
628   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
629   // Send several RPCs to ensure the xDS setup works
630   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
631   // Connect to a second server
632   auto channel2 = CreateChannel(0, kServer2Name);
633   channel2->GetState(/*try_to_connect=*/true);
634   ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
635   // Fetches the client config
636   auto csds_response = FetchCsdsResponse();
637   ASSERT_EQ(2, csds_response.config_size());
638   std::vector<std::string> scopes;
639   for (const auto& client_config : csds_response.config()) {
640     // Validate the Node information
641     EXPECT_THAT(client_config.node(),
642                 EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
643                        ::testing::HasSubstr(grpc_version_string()),
644                        ::testing::ElementsAre(
645                            "envoy.lb.does_not_support_overprovisioning")));
646     scopes.emplace_back(client_config.client_scope());
647     absl::string_view server = client_config.client_scope();
648     // Listener matcher depends on whether RDS is enabled.
649     ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
650     if (GetParam().enable_rds_testing()) {
651       api_listener_matcher = IsRdsEnabledHCM();
652     } else {
653       api_listener_matcher =
654           EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
655     }
656     // Construct list of all matchers.
657     std::vector<::testing::Matcher<
658         envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
659         matchers = {
660             // Listener
661             EqGenericXdsConfig(
662                 kLdsTypeUrl, absl::StripPrefix(server, "xds:"), "3",
663                 UnpackListener(EqListener(absl::StripPrefix(server, "xds:"),
664                                           api_listener_matcher)),
665                 ClientResourceStatus::ACKED, ::testing::_),
666             // Cluster
667             EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
668                                UnpackCluster(EqCluster(kDefaultClusterName)),
669                                ClientResourceStatus::ACKED, ::testing::_),
670             // ClusterLoadAssignment
671             EqGenericXdsConfig(
672                 kEdsTypeUrl, kDefaultEdsServiceName, "1",
673                 UnpackClusterLoadAssignment(EqClusterLoadAssignment(
674                     kDefaultEdsServiceName, backends_[0]->port(),
675                     kDefaultLocalityWeight)),
676                 ClientResourceStatus::ACKED, ::testing::_),
677         };
678     // If RDS is enabled, add matcher for RDS resource.
679     if (GetParam().enable_rds_testing()) {
680       matchers.push_back(EqGenericXdsConfig(
681           kRdsTypeUrl, kDefaultRouteConfigurationName, "2",
682           UnpackRouteConfiguration(EqRouteConfiguration(
683               kDefaultRouteConfigurationName, kDefaultClusterName)),
684           ClientResourceStatus::ACKED, ::testing::_));
685     }
686     // Validate the dumped xDS configs
687     EXPECT_THAT(client_config.generic_xds_configs(),
688                 ::testing::UnorderedElementsAreArray(matchers));
689   }
690   EXPECT_THAT(scopes, ::testing::UnorderedElementsAre(
691                           "xds:server.example.com", "xds:server2.example.com"));
692 }
693 
694 class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest {
695  protected:
SetUp()696   void SetUp() override {
697     // Shorten the ADS subscription timeout to speed up the test run.
698     InitClient(absl::nullopt, /*lb_expected_authority=*/"",
699                /*xds_resource_does_not_exist_timeout_ms=*/2000);
700   }
701 };
702 
703 // Run CSDS tests with RDS enabled and disabled.
704 // These need to run with the bootstrap from an env var instead of from
705 // a channel arg, since there needs to be a global XdsClient instance.
706 INSTANTIATE_TEST_SUITE_P(
707     XdsTest, CsdsShortAdsTimeoutTest,
708     ::testing::Values(
709         XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
710         XdsTestType()
711             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
712             .set_enable_rds_testing(),
713         XdsTestType()
714             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
715             .set_use_csds_streaming(),
716         XdsTestType()
717             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
718             .set_enable_rds_testing()
719             .set_use_csds_streaming()),
720     &XdsTestType::Name);
721 
// Removes the listener resource from the ADS service, expects RPCs to fail
// with UNAVAILABLE, and then checks that the CSDS dump lists the LDS
// resource with status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpListenerDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      absl::StrCat("empty address list: ", kServerName,
                                   ": xDS listener resource does not exist"),
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Fail cleanly rather than index into an empty repeated field below.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
735 
// Removes the route configuration resource from the ADS service, expects
// RPCs to fail with UNAVAILABLE, and then checks that the CSDS dump lists
// the RDS resource with status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpRouteConfigDoesNotExist) {
  // Nothing to check unless this parameterization enables RDS testing.
  if (!GetParam().enable_rds_testing()) return;
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kRdsTypeUrl,
                                          kDefaultRouteConfigurationName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("empty address list: ", kDefaultRouteConfigurationName,
                   ": xDS route configuration resource does not exist"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Fail cleanly rather than index into an empty repeated field below.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kRdsTypeUrl, kDefaultRouteConfigurationName, ::testing::_,
          ::testing::_, ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
753 
// Removes the cluster resource from the ADS service, expects RPCs to fail
// with UNAVAILABLE, and then checks that the CSDS dump lists the CDS
// resource with status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpClusterDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("CDS resource ", kDefaultClusterName, " does not exist"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Fail cleanly rather than index into an empty repeated field below.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kCdsTypeUrl, kDefaultClusterName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
767 
// Removes the endpoint resource from the ADS service, expects RPCs to fail
// with UNAVAILABLE, and then checks that the CSDS dump lists the EDS
// resource with status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpEndpointDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kEdsTypeUrl, kDefaultEdsServiceName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "no children in weighted_target policy: EDS resource eds_service_name "
      "does not exist",
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  // Fail cleanly rather than index into an empty repeated field below.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kEdsTypeUrl, kDefaultEdsServiceName, ::testing::_, ::testing::_,
          ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
783 
784 }  // namespace
785 }  // namespace testing
786 }  // namespace grpc
787 
788 #endif  // DISABLED_XDS_PROTO_IN_CC
789 
main(int argc,char ** argv)790 int main(int argc, char** argv) {
791   grpc::testing::TestEnvironment env(&argc, argv);
792   ::testing::InitGoogleTest(&argc, argv);
793   // Make the backup poller poll very frequently in order to pick up
794   // updates from all the subchannels's FDs.
795   grpc_core::ConfigVars::Overrides overrides;
796   overrides.client_channel_backup_poll_interval_ms = 1;
797   grpc_core::ConfigVars::SetOverrides(overrides);
798 #if TARGET_OS_IPHONE
799   // Workaround Apple CFStream bug
800   grpc_core::SetEnv("grpc_cfstream", "0");
801 #endif
802   grpc_init();
803   const auto result = RUN_ALL_TESTS();
804   grpc_shutdown();
805   return result;
806 }
807