1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/xds_interop_server_lib.h"
20
21 #include <memory>
22
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/str_split.h"
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/ext/admin_services.h>
30 #include <grpcpp/ext/proto_server_reflection_plugin.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34 #include <grpcpp/xds_server_builder.h>
35
36 #include "src/proto/grpc/testing/empty.pb.h"
37 #include "src/proto/grpc/testing/messages.pb.h"
38 #include "src/proto/grpc/testing/test.grpc.pb.h"
39 #include "test/cpp/end2end/test_health_check_service_impl.h"
40 #include "test/cpp/interop/pre_stop_hook_server.h"
41
42 namespace grpc {
43 namespace testing {
44 namespace {
45
46 using grpc::Server;
47 using grpc::ServerBuilder;
48 using grpc::ServerContext;
49 using grpc::Status;
50 using grpc::XdsServerBuilder;
51 using grpc::testing::Empty;
52 using grpc::testing::HealthCheckServiceImpl;
53 using grpc::testing::SimpleRequest;
54 using grpc::testing::SimpleResponse;
55 using grpc::testing::TestService;
56 using grpc::testing::XdsUpdateHealthService;
57
58 constexpr absl::string_view kRpcBehaviorMetadataKey = "rpc-behavior";
59 constexpr absl::string_view kErrorCodeRpcBehavior = "error-code-";
60 constexpr absl::string_view kHostnameRpcBehaviorFilter = "hostname=";
61
GetRpcBehaviorMetadata(ServerContext * context)62 std::vector<std::string> GetRpcBehaviorMetadata(ServerContext* context) {
63 std::vector<std::string> rpc_behaviors;
64 auto rpc_behavior_metadata =
65 context->client_metadata().equal_range(grpc::string_ref(
66 kRpcBehaviorMetadataKey.data(), kRpcBehaviorMetadataKey.length()));
67 for (auto metadata = rpc_behavior_metadata.first;
68 metadata != rpc_behavior_metadata.second; ++metadata) {
69 auto value = metadata->second;
70 for (auto behavior :
71 absl::StrSplit(absl::string_view(value.data(), value.length()), ',')) {
72 rpc_behaviors.emplace_back(behavior);
73 }
74 }
75 return rpc_behaviors;
76 }
77
78 class TestServiceImpl : public TestService::Service {
79 public:
TestServiceImpl(absl::string_view hostname,absl::string_view server_id)80 explicit TestServiceImpl(absl::string_view hostname,
81 absl::string_view server_id)
82 : hostname_(hostname), server_id_(server_id) {}
83
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)84 Status UnaryCall(ServerContext* context, const SimpleRequest* request,
85 SimpleResponse* response) override {
86 response->set_server_id(server_id_);
87 for (const auto& rpc_behavior : GetRpcBehaviorMetadata(context)) {
88 auto maybe_status =
89 GetStatusForRpcBehaviorMetadata(rpc_behavior, hostname_);
90 if (maybe_status.has_value()) {
91 return *maybe_status;
92 }
93 }
94 if (request->response_size() > 0) {
95 std::string payload(request->response_size(), '0');
96 response->mutable_payload()->set_body(payload.c_str(),
97 request->response_size());
98 }
99 response->set_hostname(hostname_);
100 context->AddInitialMetadata("hostname", hostname_);
101 return Status::OK;
102 }
103
EmptyCall(ServerContext * context,const Empty *,Empty *)104 Status EmptyCall(ServerContext* context, const Empty* /*request*/,
105 Empty* /*response*/) override {
106 context->AddInitialMetadata("hostname", hostname_);
107 return Status::OK;
108 }
109
110 private:
111 std::string hostname_;
112 std::string server_id_;
113 };
114
115 class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service {
116 public:
XdsUpdateHealthServiceImpl(HealthCheckServiceImpl * health_check_service,std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)117 explicit XdsUpdateHealthServiceImpl(
118 HealthCheckServiceImpl* health_check_service,
119 std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server)
120 : health_check_service_(health_check_service),
121 pre_stop_hook_server_(std::move(pre_stop_hook_server)) {}
122
SetServing(ServerContext *,const Empty *,Empty *)123 Status SetServing(ServerContext* /* context */, const Empty* /* request */,
124 Empty* /* response */) override {
125 health_check_service_->SetAll(
126 grpc::health::v1::HealthCheckResponse::SERVING);
127 return Status::OK;
128 }
129
SetNotServing(ServerContext *,const Empty *,Empty *)130 Status SetNotServing(ServerContext* /* context */, const Empty* /* request */,
131 Empty* /* response */) override {
132 health_check_service_->SetAll(
133 grpc::health::v1::HealthCheckResponse::NOT_SERVING);
134 return Status::OK;
135 }
136
SendHookRequest(ServerContext *,const HookRequest * request,HookResponse *)137 Status SendHookRequest(ServerContext* /* context */,
138 const HookRequest* request,
139 HookResponse* /* response */) override {
140 switch (request->command()) {
141 case HookRequest::START:
142 return pre_stop_hook_server_->Start(request->server_port(), 30 /* s */);
143 case HookRequest::STOP:
144 return pre_stop_hook_server_->Stop();
145 case HookRequest::RETURN:
146 pre_stop_hook_server_->Return(
147 static_cast<StatusCode>(request->grpc_code_to_return()),
148 request->grpc_status_description());
149 return Status::OK;
150 default:
151 return Status(
152 StatusCode::INVALID_ARGUMENT,
153 absl::StrFormat("Invalid command %d", request->command()));
154 }
155 }
156
157 private:
158 HealthCheckServiceImpl* const health_check_service_;
159 std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server_;
160 };
161
162 class MaintenanceServices {
163 public:
MaintenanceServices()164 MaintenanceServices()
165 : update_health_service_(&health_check_service_,
166 std::make_unique<PreStopHookServerManager>()) {
167 health_check_service_.SetStatus(
168 "", grpc::health::v1::HealthCheckResponse::SERVING);
169 health_check_service_.SetStatus(
170 "grpc.testing.TestService",
171 grpc::health::v1::HealthCheckResponse::SERVING);
172 health_check_service_.SetStatus(
173 "grpc.testing.XdsUpdateHealthService",
174 grpc::health::v1::HealthCheckResponse::SERVING);
175 }
176
AddToServerBuilder(ServerBuilder * builder)177 void AddToServerBuilder(ServerBuilder* builder) {
178 builder->RegisterService(&health_check_service_);
179 builder->RegisterService(&update_health_service_);
180 builder->RegisterService(&hook_service_);
181 grpc::AddAdminServices(builder);
182 }
183
184 private:
185 HealthCheckServiceImpl health_check_service_;
186 XdsUpdateHealthServiceImpl update_health_service_;
187 HookServiceImpl hook_service_;
188 };
189 } // namespace
190
GetStatusForRpcBehaviorMetadata(absl::string_view header_value,absl::string_view hostname)191 absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
192 absl::string_view header_value, absl::string_view hostname) {
193 for (auto part : absl::StrSplit(header_value, ' ')) {
194 if (absl::ConsumePrefix(&part, kHostnameRpcBehaviorFilter)) {
195 gpr_log(GPR_INFO, "%s", std::string(part).c_str());
196 if (part.empty()) {
197 return Status(
198 grpc::StatusCode::INVALID_ARGUMENT,
199 absl::StrCat("Empty host name in the RPC behavior header: ",
200 header_value));
201 }
202 if (part != hostname) {
203 gpr_log(
204 GPR_DEBUG,
205 "RPC behavior for a different host: \"%s\", this one is: \"%s\"",
206 std::string(part).c_str(), std::string(hostname).c_str());
207 return absl::nullopt;
208 }
209 } else if (absl::ConsumePrefix(&part, kErrorCodeRpcBehavior)) {
210 grpc::StatusCode code;
211 if (absl::SimpleAtoi(part, &code)) {
212 return Status(
213 code,
214 absl::StrCat("Rpc failed as per the rpc-behavior header value: ",
215 header_value));
216 } else {
217 return Status(grpc::StatusCode::INVALID_ARGUMENT,
218 absl::StrCat("Invalid format for rpc-behavior header: ",
219 header_value));
220 }
221 } else {
222 // TODO (eugeneo): Add support for other behaviors as needed
223 return Status(
224 grpc::StatusCode::INVALID_ARGUMENT,
225 absl::StrCat("Unsupported rpc behavior header: ", header_value));
226 }
227 }
228 return absl::nullopt;
229 }
230
RunServer(bool secure_mode,bool enable_csm_observability,int port,const int maintenance_port,absl::string_view hostname,absl::string_view server_id,const std::function<void (Server *)> & server_callback)231 void RunServer(bool secure_mode, bool enable_csm_observability, int port,
232 const int maintenance_port, absl::string_view hostname,
233 absl::string_view server_id,
234 const std::function<void(Server*)>& server_callback) {
235 std::unique_ptr<Server> xds_enabled_server;
236 std::unique_ptr<Server> server;
237 TestServiceImpl service(hostname, server_id);
238
239 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
240 MaintenanceServices maintenance_services;
241 if (secure_mode) {
242 XdsServerBuilder xds_builder;
243 xds_builder.RegisterService(&service);
244 xds_builder.AddListeningPort(
245 absl::StrCat("0.0.0.0:", port),
246 grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
247 xds_enabled_server = xds_builder.BuildAndStart();
248 gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port);
249 ServerBuilder builder;
250 maintenance_services.AddToServerBuilder(&builder);
251 server = builder
252 .AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
253 grpc::InsecureServerCredentials())
254 .BuildAndStart();
255 gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d",
256 maintenance_port);
257 } else {
258 // CSM Observability requires an xDS enabled server.
259 auto builder = enable_csm_observability
260 ? std::make_unique<XdsServerBuilder>()
261 : std::make_unique<ServerBuilder>();
262 maintenance_services.AddToServerBuilder(builder.get());
263 server = builder
264 ->AddListeningPort(absl::StrCat("0.0.0.0:", port),
265 grpc::InsecureServerCredentials())
266 .RegisterService(&service)
267 .BuildAndStart();
268 gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port);
269 }
270 server_callback(server.get());
271 server->Wait();
272 }
273
274 } // namespace testing
275 } // namespace grpc
276