xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/xds/xds_fallback_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 #include <iostream>
16 #include <memory>
17 #include <string>
18 #include <string_view>
19 #include <type_traits>
20 
21 #include <gmock/gmock.h>
22 #include <gtest/gtest.h>
23 
24 #include "absl/cleanup/cleanup.h"
25 #include "absl/strings/str_format.h"
26 #include "absl/strings/strip.h"
27 
28 #include <grpcpp/create_channel.h>
29 #include <grpcpp/security/credentials.h>
30 #include <grpcpp/support/status.h>
31 
32 #include "src/core/client_channel/backup_poller.h"
33 #include "src/core/lib/config/config_vars.h"
34 #include "src/core/lib/gprpp/env.h"
35 #include "src/cpp/client/secure_credentials.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "src/proto/grpc/testing/echo_messages.pb.h"
38 #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
39 #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
40 #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
41 #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
42 #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
43 #include "test/core/util/resolve_localhost_ip46.h"
44 #include "test/core/util/scoped_env_var.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
47 #include "test/cpp/end2end/xds/xds_utils.h"
48 
49 namespace grpc {
50 namespace testing {
51 namespace {
52 
// Status message attached to the ADS stream failures that the tests in this
// file force on the balancers.
constexpr const char* kErrorMessage = "test forced ADS stream failure";
54 
55 class XdsFallbackTest : public XdsEnd2endTest {
56  public:
XdsFallbackTest()57   XdsFallbackTest()
58       : fallback_balancer_(CreateAndStartBalancer("Fallback Balancer")) {}
59 
SetUp()60   void SetUp() override {
61     // Overrides SetUp from a base class so we can call InitClient per-test case
62   }
63 
TearDown()64   void TearDown() override {
65     fallback_balancer_->Shutdown();
66     XdsEnd2endTest::TearDown();
67   }
68 
SetXdsResourcesForServer(BalancerServerThread * balancer,size_t backend,absl::string_view server_name="",absl::string_view authority="")69   void SetXdsResourcesForServer(BalancerServerThread* balancer, size_t backend,
70                                 absl::string_view server_name = "",
71                                 absl::string_view authority = "") {
72     Listener listener = default_listener_;
73     RouteConfiguration route_config = default_route_config_;
74     Cluster cluster = default_cluster_;
75     // Default server uses default resources when no authority, to enable using
76     // more test framework functions.
77     if (!server_name.empty() || !authority.empty()) {
78       auto get_resource_name = [&](absl::string_view resource_type) {
79         absl::string_view stripped_resource_type =
80             absl::StripPrefix(resource_type, "type.googleapis.com/");
81         if (authority.empty()) {
82           if (resource_type == kLdsTypeUrl) return std::string(server_name);
83           return absl::StrFormat("%s_%s", stripped_resource_type, server_name);
84         }
85         return absl::StrFormat("xdstp://%s/%s/%s", authority,
86                                stripped_resource_type, server_name);
87       };
88       listener.set_name(get_resource_name(kLdsTypeUrl));
89       cluster.set_name(get_resource_name(kCdsTypeUrl));
90       cluster.mutable_eds_cluster_config()->set_service_name(
91           get_resource_name(kEdsTypeUrl));
92       route_config.set_name(get_resource_name(kRdsTypeUrl));
93       route_config.mutable_virtual_hosts(0)
94           ->mutable_routes(0)
95           ->mutable_route()
96           ->set_cluster(cluster.name());
97     }
98     SetListenerAndRouteConfiguration(balancer, listener, route_config);
99     balancer->ads_service()->SetCdsResource(cluster);
100     balancer->ads_service()->SetEdsResource(BuildEdsResource(
101         EdsResourceArgs(
102             {{"locality0", CreateEndpointsForBackends(backend, backend + 1)}}),
103         cluster.eds_cluster_config().service_name()));
104   }
105 
ExpectBackendCall(EchoTestService::Stub * stub,int backend,grpc_core::DebugLocation location)106   void ExpectBackendCall(EchoTestService::Stub* stub, int backend,
107                          grpc_core::DebugLocation location) {
108     ClientContext context;
109     EchoRequest request;
110     EchoResponse response;
111     RpcOptions().SetupRpc(&context, &request);
112     Status status = stub->Echo(&context, request, &response);
113     EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
114                              << " message=" << status.error_message() << "\n"
115                              << location.file() << ':' << location.line();
116     EXPECT_EQ(1U, backends_[backend]->backend_service()->request_count())
117         << "\n"
118         << location.file() << ':' << location.line();
119   }
120 
121  protected:
122   std::unique_ptr<BalancerServerThread> fallback_balancer_;
123 };
124 
// With fallback enabled and three xDS servers listed (primary, an
// always-failing one, and the fallback), checks that RPCs are served using the
// fallback server's resources while the primary fails, and using the primary's
// resources again once its failure is cleared.
TEST_P(XdsFallbackTest, FallbackAndRecover) {
  grpc_core::testing::ScopedEnvVar fallback_enabled(
      "GRPC_EXPERIMENTAL_XDS_FALLBACK", "1");
  auto broken_balancer = CreateAndStartBalancer("Broken balancer");
  // Shut the extra balancer down on every exit path from the test body.
  auto broken_balancer_cleanup =
      absl::MakeCleanup([&]() { broken_balancer->Shutdown(); });
  broken_balancer->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  InitClient(XdsBootstrapBuilder().SetServers({
      balancer_->target(),
      broken_balancer->target(),
      fallback_balancer_->target(),
  }));
  // Primary xDS server has backends_[0] configured and fallback server has
  // backends_[1]
  CreateAndStartBackends(2);
  SetXdsResourcesForServer(balancer_.get(), 0);
  SetXdsResourcesForServer(fallback_balancer_.get(), 1);
  balancer_->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  // Primary server down, fallback server data is used (backends_[1])
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 0);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 1);
  // Primary server is back. backends_[0] will be used when the data makes it
  // all the way to the client
  balancer_->ads_service()->ClearADSFailure();
  WaitForBackend(DEBUG_LOCATION, 0);
}
153 
// Without GRPC_EXPERIMENTAL_XDS_FALLBACK being set by the test, a failing
// primary xDS server is expected to surface as an RPC failure even though a
// second server is listed in the bootstrap configuration.
TEST_P(XdsFallbackTest, EnvVarNotSet) {
  InitClient(XdsBootstrapBuilder().SetServers({
      balancer_->target(),
      fallback_balancer_->target(),
  }));
  // Primary xDS server has backends_[0] configured and fallback server has
  // backends_[1]
  CreateAndStartBackends(2);
  SetXdsResourcesForServer(balancer_.get(), 0);
  SetXdsResourcesForServer(fallback_balancer_.get(), 1);
  balancer_->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  // Primary server down, failure should be reported. The expected message
  // names the primary balancer's port and embeds the forced failure message
  // (kErrorMessage), so it stays in sync with the status injected above.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrFormat("server.example.com: UNAVAILABLE: xDS channel for server "
                      "localhost:%d: xDS call failed with no responses "
                      "received; status: RESOURCE_EXHAUSTED: %s "
                      "\\(node ID:xds_end2end_test\\)",
                      balancer_->port(), kErrorMessage));
}
175 
// With fallback enabled and both listed xDS servers failing their ADS
// streams, the RPC is expected to fail with an error that names the fallback
// balancer's port.
TEST_P(XdsFallbackTest, PrimarySecondaryNotAvailable) {
  grpc_core::testing::ScopedEnvVar fallback_enabled(
      "GRPC_EXPERIMENTAL_XDS_FALLBACK", "1");
  InitClient(XdsBootstrapBuilder().SetServers(
      {balancer_->target(), fallback_balancer_->target()}));
  balancer_->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  fallback_balancer_->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  // The expected message embeds kErrorMessage so that it stays in sync with
  // the status injected above.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrFormat(
          "server.example.com: UNAVAILABLE: xDS channel for server "
          "localhost:%d: xDS call failed with no responses received; "
          "status: RESOURCE_EXHAUSTED: %s \\(node "
          "ID:xds_end2end_test\\)",
          fallback_balancer_->port(), kErrorMessage));
}
194 
// After the primary xDS server has already served resources for
// server.example.com and then starts failing, checks that a new data plane
// target (server2.example.com) is configured from the fallback server while
// server.example.com keeps being served by the backend the primary configured.
TEST_P(XdsFallbackTest, UsesCachedResourcesAfterFailure) {
  constexpr absl::string_view kServerName2 = "server2.example.com";
  grpc_core::testing::ScopedEnvVar fallback_enabled(
      "GRPC_EXPERIMENTAL_XDS_FALLBACK", "1");
  InitClient(XdsBootstrapBuilder().SetServers(
      {balancer_->target(), fallback_balancer_->target()}));
  // 4 backends - cross product of two data plane targets and two balancers
  CreateAndStartBackends(4);
  SetXdsResourcesForServer(balancer_.get(), 0);
  SetXdsResourcesForServer(fallback_balancer_.get(), 1);
  SetXdsResourcesForServer(balancer_.get(), 2, kServerName2);
  SetXdsResourcesForServer(fallback_balancer_.get(), 3, kServerName2);
  // While the primary is healthy, the default target is served by
  // backends_[0].
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 1);
  balancer_->ads_service()->ForceADSFailure(
      Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
  auto channel = CreateChannel(/*failover_timeout_ms=*/0,
                               std::string(kServerName2).c_str());
  auto stub = grpc::testing::EchoTestService::NewStub(channel);
  // server2.example.com is configured from the fallback server
  ExpectBackendCall(stub.get(), 3, DEBUG_LOCATION);
  // Calling server.example.com still uses cached value
  CheckRpcSendOk(DEBUG_LOCATION);
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 2);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}
220 
TEST_P(XdsFallbackTest,PerAuthorityFallback)221 TEST_P(XdsFallbackTest, PerAuthorityFallback) {
222   auto fallback_balancer2 = CreateAndStartBalancer("Fallback for Authority2");
223   // Use cleanup in case test assertion fails
224   auto balancer2_cleanup =
225       absl::MakeCleanup([&]() { fallback_balancer2->Shutdown(); });
226   grpc_core::testing::ScopedEnvVar fallback_enabled(
227       "GRPC_EXPERIMENTAL_XDS_FALLBACK", "1");
228   grpc_core::testing::ScopedExperimentalEnvVar env_var(
229       "GRPC_EXPERIMENTAL_XDS_FEDERATION");
230   const char* kAuthority1 = "xds1.example.com";
231   const char* kAuthority2 = "xds2.example.com";
232   constexpr absl::string_view kServer1Name = "server1.example.com";
233   constexpr absl::string_view kServer2Name = "server2.example.com";
234   // Authority1 uses balancer_ and fallback_balancer_
235   // Authority2 uses balancer_ and fallback_balancer2
236   XdsBootstrapBuilder builder;
237   builder.SetServers({balancer_->target()});
238   builder.AddAuthority(kAuthority1,
239                        {balancer_->target(), fallback_balancer_->target()});
240   builder.AddAuthority(kAuthority2,
241                        {balancer_->target(), fallback_balancer2->target()});
242   InitClient(builder);
243   CreateAndStartBackends(4);
244   SetXdsResourcesForServer(fallback_balancer_.get(), 0, kServer1Name,
245                            kAuthority1);
246   SetXdsResourcesForServer(fallback_balancer2.get(), 1, kServer2Name,
247                            kAuthority2);
248   SetXdsResourcesForServer(balancer_.get(), 2, kServer1Name, kAuthority1);
249   SetXdsResourcesForServer(balancer_.get(), 3, kServer2Name, kAuthority2);
250   // Primary balancer is down, using the fallback servers
251   balancer_->ads_service()->ForceADSFailure(
252       Status(StatusCode::RESOURCE_EXHAUSTED, kErrorMessage));
253   // Create second channel to new target URI and send 1 RPC.
254   auto authority1_stub = grpc::testing::EchoTestService::NewStub(CreateChannel(
255       /*failover_timeout_ms=*/0, std::string(kServer1Name).c_str(),
256       kAuthority1));
257   auto authority2_stub = grpc::testing::EchoTestService::NewStub(CreateChannel(
258       /*failover_timeout_ms=*/0, std::string(kServer2Name).c_str(),
259       kAuthority2));
260   ExpectBackendCall(authority1_stub.get(), 0, DEBUG_LOCATION);
261   ExpectBackendCall(authority2_stub.get(), 1, DEBUG_LOCATION);
262   // Primary balancer is up, its data will be used now.
263   balancer_->ads_service()->ClearADSFailure();
264   auto deadline =
265       absl::Now() + (absl::Seconds(5) * grpc_test_slowdown_factor());
266   while (absl::Now() < deadline &&
267          (backends_[2]->backend_service()->request_count() == 0 ||
268           backends_[3]->backend_service()->request_count() == 0)) {
269     ClientContext context;
270     EchoRequest request;
271     EchoResponse response;
272     RpcOptions().SetupRpc(&context, &request);
273     Status status = authority1_stub->Echo(&context, request, &response);
274     EXPECT_TRUE(status.ok()) << status.error_message();
275     ClientContext context2;
276     EchoRequest request2;
277     EchoResponse response2;
278     RpcOptions().SetupRpc(&context2, &request2);
279     status = authority2_stub->Echo(&context2, request2, &response2);
280     EXPECT_TRUE(status.ok()) << status.error_message();
281   }
282   ASSERT_LE(1U, backends_[2]->backend_service()->request_count());
283   ASSERT_LE(1U, backends_[3]->backend_service()->request_count());
284 }
285 
286 INSTANTIATE_TEST_SUITE_P(XdsTest, XdsFallbackTest,
287                          ::testing::Values(XdsTestType().set_bootstrap_source(
288                              XdsTestType::kBootstrapFromEnvVar)),
289                          &XdsTestType::Name);
290 
291 }  // namespace
292 }  // namespace testing
293 }  // namespace grpc
294 
main(int argc,char ** argv)295 int main(int argc, char** argv) {
296   grpc::testing::TestEnvironment env(&argc, argv);
297   ::testing::InitGoogleTest(&argc, argv);
298   // Make the backup poller poll very frequently in order to pick up
299   // updates from all the subchannels's FDs.
300   grpc_core::ConfigVars::Overrides overrides;
301   overrides.client_channel_backup_poll_interval_ms = 1;
302   grpc_core::ConfigVars::SetOverrides(overrides);
303 #if TARGET_OS_IPHONE
304   // Workaround Apple CFStream bug
305   grpc_core::SetEnv("grpc_cfstream", "0");
306 #endif
307   grpc_init();
308   const auto result = RUN_ALL_TESTS();
309   grpc_shutdown();
310   return result;
311 }
312