1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include <memory>
17 #include <string>
18 #include <vector>
19
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22
23 #include "absl/memory/memory.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/strip.h"
26
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/security/credentials.h>
29
30 #include "src/core/client_channel/backup_poller.h"
31 #include "src/core/lib/config/config_vars.h"
32 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
33 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
34 #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
35 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
36 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
37 #include "test/core/util/resolve_localhost_ip46.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
40 #include "test/cpp/util/credentials.h"
41
42 #ifndef DISABLED_XDS_PROTO_IN_CC
43
44 #include "src/cpp/server/csds/csds.h"
45 #include "src/proto/grpc/testing/xds/v3/csds.grpc.pb.h"
46
47 namespace grpc {
48 namespace testing {
49 namespace {
50
51 using ::envoy::admin::v3::ClientResourceStatus;
52 using ::envoy::config::cluster::v3::Cluster;
53 using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
54 using ::envoy::config::listener::v3::Listener;
55 using ::envoy::config::route::v3::RouteConfiguration;
56 using ::envoy::extensions::filters::network::http_connection_manager::v3::
57 HttpConnectionManager;
58
59 MATCHER_P4(EqNode, id, user_agent_name, user_agent_version, client_features,
60 "equals Node") {
61 bool ok = true;
62 ok &= ::testing::ExplainMatchResult(id, arg.id(), result_listener);
63 ok &= ::testing::ExplainMatchResult(user_agent_name, arg.user_agent_name(),
64 result_listener);
65 ok &= ::testing::ExplainMatchResult(
66 user_agent_version, arg.user_agent_version(), result_listener);
67 ok &= ::testing::ExplainMatchResult(client_features, arg.client_features(),
68 result_listener);
69 return ok;
70 }
71
72 MATCHER_P6(EqGenericXdsConfig, type_url, name, version_info, xds_config,
73 client_status, error_state, "equals GenericXdsConfig") {
74 bool ok = true;
75 ok &=
76 ::testing::ExplainMatchResult(type_url, arg.type_url(), result_listener);
77 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
78 ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
79 result_listener);
80 ok &= ::testing::ExplainMatchResult(xds_config, arg.xds_config(),
81 result_listener);
82 ok &= ::testing::ExplainMatchResult(client_status, arg.client_status(),
83 result_listener);
84 ok &= ::testing::ExplainMatchResult(error_state, arg.error_state(),
85 result_listener);
86 return ok;
87 }
88
89 MATCHER_P2(EqListener, name, api_listener, "equals Listener") {
90 bool ok = true;
91 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
92 ok &= ::testing::ExplainMatchResult(
93 api_listener, arg.api_listener().api_listener(), result_listener);
94 return ok;
95 }
96
97 MATCHER_P(EqHttpConnectionManagerNotRds, route_config,
98 "equals HttpConnectionManager") {
99 bool ok = true;
100 ok &= ::testing::ExplainMatchResult(route_config, arg.route_config(),
101 result_listener);
102 return ok;
103 }
104
105 MATCHER_P(EqRouteConfigurationName, name, "equals RouteConfiguration") {
106 bool ok = true;
107 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
108 return ok;
109 }
110
111 MATCHER_P2(EqRouteConfiguration, name, cluster_name,
112 "equals RouteConfiguration") {
113 bool ok = true;
114 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
115 ok &= ::testing::ExplainMatchResult(
116 ::testing::ElementsAre(::testing::Property(
117 &envoy::config::route::v3::VirtualHost::routes,
118 ::testing::ElementsAre(::testing::Property(
119 &envoy::config::route::v3::Route::route,
120 ::testing::Property(
121 &envoy::config::route::v3::RouteAction::cluster,
122 cluster_name))))),
123 arg.virtual_hosts(), result_listener);
124 return ok;
125 }
126
127 MATCHER_P(EqCluster, name, "equals Cluster") {
128 bool ok = true;
129 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
130 return ok;
131 }
132
133 MATCHER_P(EqEndpoint, port, "equals Endpoint") {
134 bool ok = true;
135 ok &= ::testing::ExplainMatchResult(
136 port, arg.address().socket_address().port_value(), result_listener);
137 return ok;
138 }
139
140 MATCHER_P2(EqLocalityLbEndpoints, port, weight, "equals LocalityLbEndpoints") {
141 bool ok = true;
142 ok &= ::testing::ExplainMatchResult(
143 ::testing::ElementsAre(::testing::Property(
144 &envoy::config::endpoint::v3::LbEndpoint::endpoint,
145 EqEndpoint(port))),
146 arg.lb_endpoints(), result_listener);
147 ok &= ::testing::ExplainMatchResult(
148 weight, arg.load_balancing_weight().value(), result_listener);
149 return ok;
150 }
151
152 MATCHER_P(EqClusterLoadAssignmentName, cluster_name,
153 "equals ClusterLoadAssignment") {
154 bool ok = true;
155 ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
156 result_listener);
157 return ok;
158 }
159
160 MATCHER_P3(EqClusterLoadAssignment, cluster_name, port, weight,
161 "equals ClusterLoadAssignment") {
162 bool ok = true;
163 ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
164 result_listener);
165 ok &= ::testing::ExplainMatchResult(
166 ::testing::ElementsAre(EqLocalityLbEndpoints(port, weight)),
167 arg.endpoints(), result_listener);
168 return ok;
169 }
170
171 MATCHER_P2(EqUpdateFailureState, details, version_info,
172 "equals UpdateFailureState") {
173 bool ok = true;
174 ok &= ::testing::ExplainMatchResult(details, arg.details(), result_listener);
175 ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
176 result_listener);
177 return ok;
178 }
179
180 MATCHER_P(UnpackListener, matcher, "is a Listener") {
181 Listener config;
182 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
183 result_listener)) {
184 return false;
185 }
186 return ::testing::ExplainMatchResult(matcher, config, result_listener);
187 }
188
189 MATCHER_P(UnpackRouteConfiguration, matcher, "is a RouteConfiguration") {
190 RouteConfiguration config;
191 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
192 result_listener)) {
193 return false;
194 }
195 return ::testing::ExplainMatchResult(matcher, config, result_listener);
196 }
197
198 MATCHER_P(UnpackHttpConnectionManager, matcher, "is a HttpConnectionManager") {
199 HttpConnectionManager config;
200 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
201 result_listener)) {
202 return false;
203 }
204 return ::testing::ExplainMatchResult(matcher, config, result_listener);
205 }
206
207 MATCHER_P(UnpackCluster, matcher, "is a Cluster") {
208 Cluster config;
209 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
210 result_listener)) {
211 return false;
212 }
213 return ::testing::ExplainMatchResult(matcher, config, result_listener);
214 }
215
216 MATCHER_P(UnpackClusterLoadAssignment, matcher, "is a ClusterLoadAssignment") {
217 ClusterLoadAssignment config;
218 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
219 result_listener)) {
220 return false;
221 }
222 return ::testing::ExplainMatchResult(matcher, config, result_listener);
223 }
224
225 MATCHER(IsRdsEnabledHCM, "is a RDS enabled HttpConnectionManager") {
226 return ::testing::ExplainMatchResult(
227 UnpackHttpConnectionManager(
228 ::testing::Property(&HttpConnectionManager::has_rds, true)),
229 arg, result_listener);
230 }
231
232 MATCHER_P2(EqNoRdsHCM, route_configuration_name, cluster_name,
233 "equals RDS disabled HttpConnectionManager") {
234 return ::testing::ExplainMatchResult(
235 UnpackHttpConnectionManager(EqHttpConnectionManagerNotRds(
236 EqRouteConfiguration(route_configuration_name, cluster_name))),
237 arg, result_listener);
238 }
239
240 class ClientStatusDiscoveryServiceTest : public XdsEnd2endTest {
241 public:
ClientStatusDiscoveryServiceTest()242 ClientStatusDiscoveryServiceTest() {
243 admin_server_thread_ = std::make_unique<AdminServerThread>(this);
244 admin_server_thread_->Start();
245 std::string admin_server_address =
246 grpc_core::LocalIpAndPort(admin_server_thread_->port());
247 admin_channel_ = grpc::CreateChannel(
248 admin_server_address,
249 std::make_shared<FakeTransportSecurityChannelCredentials>());
250 csds_stub_ =
251 envoy::service::status::v3::ClientStatusDiscoveryService::NewStub(
252 admin_channel_);
253 if (GetParam().use_csds_streaming()) {
254 stream_ = csds_stub_->StreamClientStatus(&stream_context_);
255 }
256 }
257
~ClientStatusDiscoveryServiceTest()258 ~ClientStatusDiscoveryServiceTest() override {
259 if (stream_ != nullptr) {
260 EXPECT_TRUE(stream_->WritesDone());
261 Status status = stream_->Finish();
262 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
263 << " message=" << status.error_message();
264 }
265 admin_server_thread_->Shutdown();
266 }
267
FetchCsdsResponse()268 envoy::service::status::v3::ClientStatusResponse FetchCsdsResponse() {
269 envoy::service::status::v3::ClientStatusResponse response;
270 if (!GetParam().use_csds_streaming()) {
271 // Fetch through unary pulls
272 ClientContext context;
273 Status status = csds_stub_->FetchClientStatus(
274 &context, envoy::service::status::v3::ClientStatusRequest(),
275 &response);
276 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
277 << " message=" << status.error_message();
278 } else {
279 // Fetch through streaming pulls
280 EXPECT_TRUE(
281 stream_->Write(envoy::service::status::v3::ClientStatusRequest()));
282 EXPECT_TRUE(stream_->Read(&response));
283 }
284 return response;
285 }
286
287 private:
288 // Server thread for CSDS server.
289 class AdminServerThread : public ServerThread {
290 public:
AdminServerThread(XdsEnd2endTest * test_obj)291 explicit AdminServerThread(XdsEnd2endTest* test_obj)
292 : ServerThread(test_obj) {}
293
294 private:
Type()295 const char* Type() override { return "Admin"; }
296
RegisterAllServices(ServerBuilder * builder)297 void RegisterAllServices(ServerBuilder* builder) override {
298 builder->RegisterService(&csds_service_);
299 }
StartAllServices()300 void StartAllServices() override {}
ShutdownAllServices()301 void ShutdownAllServices() override {}
302
303 grpc::xds::experimental::ClientStatusDiscoveryService csds_service_;
304 };
305
306 std::unique_ptr<AdminServerThread> admin_server_thread_;
307 std::shared_ptr<Channel> admin_channel_;
308 std::unique_ptr<
309 envoy::service::status::v3::ClientStatusDiscoveryService::Stub>
310 csds_stub_;
311 ClientContext stream_context_;
312 std::unique_ptr<
313 ClientReaderWriter<envoy::service::status::v3::ClientStatusRequest,
314 envoy::service::status::v3::ClientStatusResponse>>
315 stream_;
316 };
317
318 // Run CSDS tests with RDS enabled and disabled.
319 // These need to run with the bootstrap from an env var instead of from
320 // a channel arg, since there needs to be a global XdsClient instance.
321 INSTANTIATE_TEST_SUITE_P(
322 XdsTest, ClientStatusDiscoveryServiceTest,
323 ::testing::Values(
324 XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
325 XdsTestType()
326 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
327 .set_enable_rds_testing(),
328 XdsTestType()
329 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
330 .set_use_csds_streaming(),
331 XdsTestType()
332 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
333 .set_enable_rds_testing()
334 .set_use_csds_streaming()),
335 &XdsTestType::Name);
336
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpVanilla)337 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) {
338 CreateAndStartBackends(1);
339 const size_t kNumRpcs = 5;
340 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
341 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
342 // Send several RPCs to ensure the xDS setup works
343 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
344 // Fetches the client config
345 auto csds_response = FetchCsdsResponse();
346 gpr_log(GPR_INFO, "xDS config dump: %s", csds_response.DebugString().c_str());
347 ASSERT_EQ(1, csds_response.config_size());
348 const auto& client_config = csds_response.config(0);
349 // Validate the Node information
350 EXPECT_THAT(client_config.node(),
351 EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
352 ::testing::HasSubstr(grpc_version_string()),
353 ::testing::ElementsAre(
354 "envoy.lb.does_not_support_overprovisioning")));
355 // Listener matcher depends on whether RDS is enabled.
356 ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
357 if (GetParam().enable_rds_testing()) {
358 api_listener_matcher = IsRdsEnabledHCM();
359 } else {
360 api_listener_matcher =
361 EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
362 }
363 // Construct list of all matchers.
364 std::vector<::testing::Matcher<
365 envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
366 matchers = {
367 // Listener
368 EqGenericXdsConfig(
369 kLdsTypeUrl, kServerName, "1",
370 UnpackListener(EqListener(kServerName, api_listener_matcher)),
371 ClientResourceStatus::ACKED, ::testing::_),
372 // Cluster
373 EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
374 UnpackCluster(EqCluster(kDefaultClusterName)),
375 ClientResourceStatus::ACKED, ::testing::_),
376 // ClusterLoadAssignment
377 EqGenericXdsConfig(
378 kEdsTypeUrl, kDefaultEdsServiceName, "1",
379 UnpackClusterLoadAssignment(EqClusterLoadAssignment(
380 kDefaultEdsServiceName, backends_[0]->port(),
381 kDefaultLocalityWeight)),
382 ClientResourceStatus::ACKED, ::testing::_),
383 };
384 // If RDS is enabled, add matcher for RDS resource.
385 if (GetParam().enable_rds_testing()) {
386 matchers.push_back(EqGenericXdsConfig(
387 kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
388 UnpackRouteConfiguration(EqRouteConfiguration(
389 kDefaultRouteConfigurationName, kDefaultClusterName)),
390 ClientResourceStatus::ACKED, ::testing::_));
391 }
392 // Validate the dumped xDS configs
393 EXPECT_THAT(client_config.generic_xds_configs(),
394 ::testing::UnorderedElementsAreArray(matchers))
395 << "Actual: " << client_config.DebugString();
396 }
397
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpEmpty)398 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEmpty) {
399 // The CSDS service should not fail if XdsClient is not initialized or there
400 // is no working xDS configs.
401 FetchCsdsResponse();
402 }
403
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpListenerError)404 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerError) {
405 CreateAndStartBackends(1);
406 int kFetchConfigRetries = 3;
407 int kFetchIntervalMilliseconds = 200;
408 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
409 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
410 // Ensure the xDS resolver has working configs.
411 CheckRpcSendOk(DEBUG_LOCATION);
412 // Bad Listener should be rejected.
413 Listener listener;
414 listener.set_name(kServerName);
415 balancer_->ads_service()->SetLdsResource(listener);
416 // The old xDS configs should still be effective.
417 CheckRpcSendOk(DEBUG_LOCATION);
418 ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
419 if (GetParam().enable_rds_testing()) {
420 api_listener_matcher = IsRdsEnabledHCM();
421 } else {
422 api_listener_matcher =
423 EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
424 }
425 for (int i = 0; i < kFetchConfigRetries; ++i) {
426 auto csds_response = FetchCsdsResponse();
427 // Check if error state is propagated
428 bool ok = ::testing::Value(
429 csds_response.config(0).generic_xds_configs(),
430 ::testing::Contains(EqGenericXdsConfig(
431 kLdsTypeUrl, kServerName, "1",
432 UnpackListener(EqListener(kServerName, api_listener_matcher)),
433 ClientResourceStatus::NACKED,
434 EqUpdateFailureState(
435 ::testing::HasSubstr(
436 "Listener has neither address nor ApiListener"),
437 "2"))));
438 if (ok) return; // TEST PASSED!
439 gpr_sleep_until(
440 grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
441 }
442 FAIL() << "error_state not seen in CSDS responses";
443 }
444
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpRouteError)445 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpRouteError) {
446 CreateAndStartBackends(1);
447 int kFetchConfigRetries = 3;
448 int kFetchIntervalMilliseconds = 200;
449 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
450 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
451 // Ensure the xDS resolver has working configs.
452 CheckRpcSendOk(DEBUG_LOCATION);
453 // Bad route config will be rejected.
454 RouteConfiguration route_config;
455 route_config.set_name(kDefaultRouteConfigurationName);
456 route_config.add_virtual_hosts();
457 SetRouteConfiguration(balancer_.get(), route_config);
458 // The old xDS configs should still be effective.
459 CheckRpcSendOk(DEBUG_LOCATION);
460 for (int i = 0; i < kFetchConfigRetries; ++i) {
461 auto csds_response = FetchCsdsResponse();
462 bool ok = false;
463 if (GetParam().enable_rds_testing()) {
464 ok = ::testing::Value(
465 csds_response.config(0).generic_xds_configs(),
466 ::testing::Contains(EqGenericXdsConfig(
467 kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
468 UnpackRouteConfiguration(EqRouteConfiguration(
469 kDefaultRouteConfigurationName, kDefaultClusterName)),
470 ClientResourceStatus::NACKED,
471 EqUpdateFailureState(
472 ::testing::HasSubstr(
473 "field:virtual_hosts[0].domains error:must be non-empty"),
474 "2"))));
475 } else {
476 ok = ::testing::Value(
477 csds_response.config(0).generic_xds_configs(),
478 ::testing::Contains(EqGenericXdsConfig(
479 kLdsTypeUrl, kServerName, "1",
480 UnpackListener(EqListener(
481 kServerName, EqNoRdsHCM(kDefaultRouteConfigurationName,
482 kDefaultClusterName))),
483 ClientResourceStatus::NACKED,
484 EqUpdateFailureState(
485 ::testing::HasSubstr(
486 "field:api_listener.api_listener.value[envoy.extensions"
487 ".filters.network.http_connection_manager.v3"
488 ".HttpConnectionManager].route_config.virtual_hosts[0]"
489 ".domains error:must be non-empty"),
490 "2"))));
491 }
492 if (ok) return; // TEST PASSED!
493 gpr_sleep_until(
494 grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
495 }
496 FAIL() << "error_state not seen in CSDS responses";
497 }
498
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpClusterError)499 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterError) {
500 CreateAndStartBackends(1);
501 int kFetchConfigRetries = 3;
502 int kFetchIntervalMilliseconds = 200;
503 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
504 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
505 // Ensure the xDS resolver has working configs.
506 CheckRpcSendOk(DEBUG_LOCATION);
507 // Listener without any route, will be rejected.
508 Cluster cluster;
509 cluster.set_name(kDefaultClusterName);
510 balancer_->ads_service()->SetCdsResource(cluster);
511 // The old xDS configs should still be effective.
512 CheckRpcSendOk(DEBUG_LOCATION);
513 for (int i = 0; i < kFetchConfigRetries; ++i) {
514 auto csds_response = FetchCsdsResponse();
515 // Check if error state is propagated
516 bool ok = ::testing::Value(
517 csds_response.config(0).generic_xds_configs(),
518 ::testing::Contains(EqGenericXdsConfig(
519 kCdsTypeUrl, kDefaultClusterName, "1",
520 UnpackCluster(EqCluster(kDefaultClusterName)),
521 ClientResourceStatus::NACKED,
522 EqUpdateFailureState(::testing::HasSubstr("unknown discovery type"),
523 "2"))));
524 if (ok) return; // TEST PASSED!
525 gpr_sleep_until(
526 grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
527 }
528 FAIL() << "error_state not seen in CSDS responses";
529 }
530
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpEndpointError)531 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) {
532 CreateAndStartBackends(1);
533 int kFetchConfigRetries = 3;
534 int kFetchIntervalMilliseconds = 200;
535 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
536 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
537 // Ensure the xDS resolver has working configs.
538 CheckRpcSendOk(DEBUG_LOCATION);
539 // Bad endpoint config will be rejected.
540 ClusterLoadAssignment cluster_load_assignment;
541 cluster_load_assignment.set_cluster_name(kDefaultEdsServiceName);
542 auto* endpoints = cluster_load_assignment.add_endpoints();
543 endpoints->mutable_load_balancing_weight()->set_value(1);
544 auto* endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
545 endpoint->mutable_address()->mutable_socket_address()->set_port_value(1 << 1);
546 balancer_->ads_service()->SetEdsResource(cluster_load_assignment);
547 // The old xDS configs should still be effective.
548 CheckRpcSendOk(DEBUG_LOCATION);
549 for (int i = 0; i < kFetchConfigRetries; ++i) {
550 auto csds_response = FetchCsdsResponse();
551 // Check if error state is propagated
552 bool ok = ::testing::Value(
553 csds_response.config(0).generic_xds_configs(),
554 ::testing::Contains(EqGenericXdsConfig(
555 kEdsTypeUrl, kDefaultEdsServiceName, "1",
556 UnpackClusterLoadAssignment(EqClusterLoadAssignment(
557 kDefaultEdsServiceName, backends_[0]->port(),
558 kDefaultLocalityWeight)),
559 ClientResourceStatus::NACKED,
560 EqUpdateFailureState(
561 ::testing::HasSubstr(
562 "errors parsing EDS resource: ["
563 "field:endpoints[0].locality error:field not present]"),
564 "2"))));
565 if (ok) return; // TEST PASSED!
566 gpr_sleep_until(
567 grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
568 }
569 FAIL() << "error_state not seen in CSDS responses";
570 }
571
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpListenerRequested)572 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerRequested) {
573 int kTimeoutMillisecond = 1000;
574 balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
575 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
576 "Deadline Exceeded",
577 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
578 auto csds_response = FetchCsdsResponse();
579 EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
580 ::testing::Contains(EqGenericXdsConfig(
581 kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
582 ClientResourceStatus::REQUESTED, ::testing::_)));
583 }
584
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpClusterRequested)585 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) {
586 int kTimeoutMillisecond = 1000;
587 std::string kClusterName1 = "cluster-1";
588 std::string kClusterName2 = "cluster-2";
589 // Create a route config requesting two non-existing clusters
590 RouteConfiguration route_config;
591 route_config.set_name(kDefaultRouteConfigurationName);
592 auto* vh = route_config.add_virtual_hosts();
593 // The VirtualHost must match the domain name, otherwise will cause resolver
594 // transient failure.
595 vh->add_domains("*");
596 auto* routes1 = vh->add_routes();
597 routes1->mutable_match()->set_prefix("");
598 routes1->mutable_route()->set_cluster(kClusterName1);
599 auto* routes2 = vh->add_routes();
600 routes2->mutable_match()->set_prefix("");
601 routes2->mutable_route()->set_cluster(kClusterName2);
602 SetRouteConfiguration(balancer_.get(), route_config);
603 // Try to get the configs plumb through
604 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
605 "Deadline Exceeded",
606 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
607 auto csds_response = FetchCsdsResponse();
608 EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
609 ::testing::AllOf(
610 ::testing::Contains(EqGenericXdsConfig(
611 kCdsTypeUrl, kClusterName1, ::testing::_, ::testing::_,
612 ClientResourceStatus::REQUESTED, ::testing::_)),
613 ::testing::Contains(EqGenericXdsConfig(
614 kCdsTypeUrl, kClusterName2, ::testing::_, ::testing::_,
615 ClientResourceStatus::REQUESTED, ::testing::_))));
616 }
617
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpMultiClient)618 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) {
619 Listener listener = default_listener_;
620 const char* kServer2Name = "server2.example.com";
621 listener.set_name(kServer2Name);
622 balancer_->ads_service()->SetLdsResource(listener);
623 SetListenerAndRouteConfiguration(balancer_.get(), listener,
624 default_route_config_);
625 CreateAndStartBackends(1);
626 const size_t kNumRpcs = 5;
627 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
628 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
629 // Send several RPCs to ensure the xDS setup works
630 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
631 // Connect to a second server
632 auto channel2 = CreateChannel(0, kServer2Name);
633 channel2->GetState(/*try_to_connect=*/true);
634 ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
635 // Fetches the client config
636 auto csds_response = FetchCsdsResponse();
637 ASSERT_EQ(2, csds_response.config_size());
638 std::vector<std::string> scopes;
639 for (const auto& client_config : csds_response.config()) {
640 // Validate the Node information
641 EXPECT_THAT(client_config.node(),
642 EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
643 ::testing::HasSubstr(grpc_version_string()),
644 ::testing::ElementsAre(
645 "envoy.lb.does_not_support_overprovisioning")));
646 scopes.emplace_back(client_config.client_scope());
647 absl::string_view server = client_config.client_scope();
648 // Listener matcher depends on whether RDS is enabled.
649 ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
650 if (GetParam().enable_rds_testing()) {
651 api_listener_matcher = IsRdsEnabledHCM();
652 } else {
653 api_listener_matcher =
654 EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
655 }
656 // Construct list of all matchers.
657 std::vector<::testing::Matcher<
658 envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
659 matchers = {
660 // Listener
661 EqGenericXdsConfig(
662 kLdsTypeUrl, absl::StripPrefix(server, "xds:"), "3",
663 UnpackListener(EqListener(absl::StripPrefix(server, "xds:"),
664 api_listener_matcher)),
665 ClientResourceStatus::ACKED, ::testing::_),
666 // Cluster
667 EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
668 UnpackCluster(EqCluster(kDefaultClusterName)),
669 ClientResourceStatus::ACKED, ::testing::_),
670 // ClusterLoadAssignment
671 EqGenericXdsConfig(
672 kEdsTypeUrl, kDefaultEdsServiceName, "1",
673 UnpackClusterLoadAssignment(EqClusterLoadAssignment(
674 kDefaultEdsServiceName, backends_[0]->port(),
675 kDefaultLocalityWeight)),
676 ClientResourceStatus::ACKED, ::testing::_),
677 };
678 // If RDS is enabled, add matcher for RDS resource.
679 if (GetParam().enable_rds_testing()) {
680 matchers.push_back(EqGenericXdsConfig(
681 kRdsTypeUrl, kDefaultRouteConfigurationName, "2",
682 UnpackRouteConfiguration(EqRouteConfiguration(
683 kDefaultRouteConfigurationName, kDefaultClusterName)),
684 ClientResourceStatus::ACKED, ::testing::_));
685 }
686 // Validate the dumped xDS configs
687 EXPECT_THAT(client_config.generic_xds_configs(),
688 ::testing::UnorderedElementsAreArray(matchers));
689 }
690 EXPECT_THAT(scopes, ::testing::UnorderedElementsAre(
691 "xds:server.example.com", "xds:server2.example.com"));
692 }
693
694 class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest {
695 protected:
SetUp()696 void SetUp() override {
697 // Shorten the ADS subscription timeout to speed up the test run.
698 InitClient(absl::nullopt, /*lb_expected_authority=*/"",
699 /*xds_resource_does_not_exist_timeout_ms=*/2000);
700 }
701 };
702
703 // Run CSDS tests with RDS enabled and disabled.
704 // These need to run with the bootstrap from an env var instead of from
705 // a channel arg, since there needs to be a global XdsClient instance.
706 INSTANTIATE_TEST_SUITE_P(
707 XdsTest, CsdsShortAdsTimeoutTest,
708 ::testing::Values(
709 XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
710 XdsTestType()
711 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
712 .set_enable_rds_testing(),
713 XdsTestType()
714 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
715 .set_use_csds_streaming(),
716 XdsTestType()
717 .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
718 .set_enable_rds_testing()
719 .set_use_csds_streaming()),
720 &XdsTestType::Name);
721
TEST_P(CsdsShortAdsTimeoutTest,XdsConfigDumpListenerDoesNotExist)722 TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpListenerDoesNotExist) {
723 int kTimeoutMillisecond = 1000000; // 1000s wait for the transient failure.
724 balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
725 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
726 absl::StrCat("empty address list: ", kServerName,
727 ": xDS listener resource does not exist"),
728 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
729 auto csds_response = FetchCsdsResponse();
730 EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
731 ::testing::Contains(EqGenericXdsConfig(
732 kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
733 ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
734 }
735
TEST_P(CsdsShortAdsTimeoutTest,XdsConfigDumpRouteConfigDoesNotExist)736 TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpRouteConfigDoesNotExist) {
737 if (!GetParam().enable_rds_testing()) return;
738 int kTimeoutMillisecond = 1000000; // 1000s wait for the transient failure.
739 balancer_->ads_service()->UnsetResource(kRdsTypeUrl,
740 kDefaultRouteConfigurationName);
741 CheckRpcSendFailure(
742 DEBUG_LOCATION, StatusCode::UNAVAILABLE,
743 absl::StrCat("empty address list: ", kDefaultRouteConfigurationName,
744 ": xDS route configuration resource does not exist"),
745 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
746 auto csds_response = FetchCsdsResponse();
747 EXPECT_THAT(
748 csds_response.config(0).generic_xds_configs(),
749 ::testing::Contains(EqGenericXdsConfig(
750 kRdsTypeUrl, kDefaultRouteConfigurationName, ::testing::_,
751 ::testing::_, ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
752 }
753
TEST_P(CsdsShortAdsTimeoutTest,XdsConfigDumpClusterDoesNotExist)754 TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpClusterDoesNotExist) {
755 int kTimeoutMillisecond = 1000000; // 1000s wait for the transient failure.
756 balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
757 CheckRpcSendFailure(
758 DEBUG_LOCATION, StatusCode::UNAVAILABLE,
759 absl::StrCat("CDS resource ", kDefaultClusterName, " does not exist"),
760 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
761 auto csds_response = FetchCsdsResponse();
762 EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
763 ::testing::Contains(EqGenericXdsConfig(
764 kCdsTypeUrl, kDefaultClusterName, ::testing::_, ::testing::_,
765 ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
766 }
767
TEST_P(CsdsShortAdsTimeoutTest,XdsConfigDumpEndpointDoesNotExist)768 TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpEndpointDoesNotExist) {
769 int kTimeoutMillisecond = 1000000; // 1000s wait for the transient failure.
770 balancer_->ads_service()->UnsetResource(kEdsTypeUrl, kDefaultEdsServiceName);
771 CheckRpcSendFailure(
772 DEBUG_LOCATION, StatusCode::UNAVAILABLE,
773 "no children in weighted_target policy: EDS resource eds_service_name "
774 "does not exist",
775 RpcOptions().set_timeout_ms(kTimeoutMillisecond));
776 auto csds_response = FetchCsdsResponse();
777 EXPECT_THAT(
778 csds_response.config(0).generic_xds_configs(),
779 ::testing::Contains(EqGenericXdsConfig(
780 kEdsTypeUrl, kDefaultEdsServiceName, ::testing::_, ::testing::_,
781 ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
782 }
783
784 } // namespace
785 } // namespace testing
786 } // namespace grpc
787
788 #endif // DISABLED_XDS_PROTO_IN_CC
789
main(int argc,char ** argv)790 int main(int argc, char** argv) {
791 grpc::testing::TestEnvironment env(&argc, argv);
792 ::testing::InitGoogleTest(&argc, argv);
793 // Make the backup poller poll very frequently in order to pick up
794 // updates from all the subchannels's FDs.
795 grpc_core::ConfigVars::Overrides overrides;
796 overrides.client_channel_backup_poll_interval_ms = 1;
797 grpc_core::ConfigVars::SetOverrides(overrides);
798 #if TARGET_OS_IPHONE
799 // Workaround Apple CFStream bug
800 grpc_core::SetEnv("grpc_cfstream", "0");
801 #endif
802 grpc_init();
803 const auto result = RUN_ALL_TESTS();
804 grpc_shutdown();
805 return result;
806 }
807