1 // 2 // 3 // Copyright 2016 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 21 22 #include <condition_variable> 23 #include <memory> 24 #include <mutex> 25 #include <string> 26 #include <thread> 27 28 #include <gtest/gtest.h> 29 30 #include <grpc/grpc.h> 31 #include <grpc/support/log.h> 32 #include <grpcpp/alarm.h> 33 #include <grpcpp/security/credentials.h> 34 #include <grpcpp/server_context.h> 35 36 #include "src/core/lib/gprpp/crash.h" 37 #include "src/proto/grpc/testing/echo.grpc.pb.h" 38 #include "test/core/util/test_config.h" 39 #include "test/cpp/util/string_ref_helper.h" 40 41 namespace grpc { 42 namespace testing { 43 44 const int kServerDefaultResponseStreamsToSend = 3; 45 const char* const kServerResponseStreamsToSend = "server_responses_to_send"; 46 const char* const kServerTryCancelRequest = "server_try_cancel"; 47 const char* const kClientTryCancelRequest = "client_try_cancel"; 48 const char* const kDebugInfoTrailerKey = "debug-info-bin"; 49 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; 50 const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; 51 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; 52 const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; 53 54 typedef enum { 55 DO_NOT_CANCEL = 0, 56 CANCEL_BEFORE_PROCESSING, 57 CANCEL_DURING_PROCESSING, 58 CANCEL_AFTER_PROCESSING 59 } ServerTryCancelRequestPhase; 60 61 namespace internal { 62 // When echo_deadline is requested, deadline seen in the ServerContext is set in 63 // the response in seconds. 64 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request, 65 EchoResponse* response); 66 67 void CheckServerAuthContext(const ServerContextBase* context, 68 const std::string& expected_transport_security_type, 69 const std::string& expected_client_identity); 70 71 // Returns the number of pairs in metadata that exactly match the given 72 // key-value pair. Returns -1 if the pair wasn't found. 73 int MetadataMatchCount( 74 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 75 const std::string& key, const std::string& value); 76 77 int GetIntValueFromMetadataHelper( 78 const char* key, 79 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 80 int default_value); 81 82 int GetIntValueFromMetadata( 83 const char* key, 84 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 85 int default_value); 86 87 void ServerTryCancel(ServerContext* context); 88 } // namespace internal 89 90 class TestServiceSignaller { 91 public: 92 // Waits for at least *desired_rpcs* to to be waiting for a server 93 // continue notification. 94 // Returns when *desired_rpcs* reaches that amount, or when we've 95 // surpassed the timeout, whichever happens first. The return value 96 // is whatever the number of RPCs waiting for server notification is 97 // at that time. ClientWaitUntilNRpcsStarted(int desired_rpcs,absl::Duration timeout)98 int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) { 99 gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilNRpcsStarted ***"); 100 absl::Time deadline = absl::Now() + timeout; 101 std::chrono::system_clock::time_point chrono_deadline = 102 absl::ToChronoTime(deadline); 103 std::unique_lock<std::mutex> lock(mu_); 104 cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] { 105 gpr_log( 106 GPR_DEBUG, 107 "*** desired_rpcs: %d rpcs_waiting_for_server_to_continue_: %d ***", 108 desired_rpcs, rpcs_waiting_for_server_to_continue_); 109 return rpcs_waiting_for_server_to_continue_ >= desired_rpcs; 110 }); 111 gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilNRpcsStarted ***"); 112 return rpcs_waiting_for_server_to_continue_; 113 } ServerWaitToContinue()114 void ServerWaitToContinue() { 115 gpr_log(GPR_DEBUG, "*** enter ServerWaitToContinue ***"); 116 std::unique_lock<std::mutex> lock(mu_); 117 cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); 118 gpr_log(GPR_DEBUG, "*** leave ServerWaitToContinue ***"); 119 } SignalClientThatRpcStarted()120 void SignalClientThatRpcStarted() { 121 gpr_log(GPR_DEBUG, "*** SignalClientThatRpcStarted ***"); 122 std::unique_lock<std::mutex> lock(mu_); 123 ++rpcs_waiting_for_server_to_continue_; 124 cv_rpc_started_.notify_all(); 125 } SignalServerToContinue()126 void SignalServerToContinue() { 127 gpr_log(GPR_DEBUG, "*** SignalServerToContinue ***"); 128 std::unique_lock<std::mutex> lock(mu_); 129 server_should_continue_ = true; 130 cv_server_continue_.notify_all(); 131 } Reset()132 void Reset() { 133 std::unique_lock<std::mutex> lock(mu_); 134 rpcs_waiting_for_server_to_continue_ = 0; 135 server_should_continue_ = false; 136 } 137 138 private: 139 std::mutex mu_; 140 std::condition_variable cv_rpc_started_; 141 int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0; 142 std::condition_variable cv_server_continue_; 143 bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; 144 }; 145 146 template <typename RpcService> 147 class TestMultipleServiceImpl : public RpcService { 148 public: TestMultipleServiceImpl()149 TestMultipleServiceImpl() : signal_client_(false), host_() {} TestMultipleServiceImpl(const std::string & host)150 explicit TestMultipleServiceImpl(const std::string& host) 151 : signal_client_(false), host_(new std::string(host)) {} 152 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)153 Status Echo(ServerContext* context, const EchoRequest* request, 154 EchoResponse* response) { 155 if (request->has_param() && 156 request->param().server_notify_client_when_started()) { 157 signaller_.SignalClientThatRpcStarted(); 158 signaller_.ServerWaitToContinue(); 159 } 160 161 // A bit of sleep to make sure that short deadline tests fail 162 if (request->has_param() && request->param().server_sleep_us() > 0) { 163 gpr_sleep_until(gpr_time_add( 164 gpr_now(GPR_CLOCK_MONOTONIC), 165 gpr_time_from_micros( 166 request->param().server_sleep_us() * grpc_test_slowdown_factor(), 167 GPR_TIMESPAN))); 168 } 169 170 if (request->has_param() && request->param().server_die()) { 171 gpr_log(GPR_ERROR, "The request should not reach application handler."); 172 GPR_ASSERT(0); 173 } 174 if (request->has_param() && request->param().has_expected_error()) { 175 const auto& error = request->param().expected_error(); 176 return Status(static_cast<StatusCode>(error.code()), 177 error.error_message(), error.binary_error_details()); 178 } 179 int server_try_cancel = internal::GetIntValueFromMetadata( 180 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 181 if (server_try_cancel > DO_NOT_CANCEL) { 182 // Since this is a unary RPC, by the time this server handler is called, 183 // the 'request' message is already read from the client. So the scenarios 184 // in server_try_cancel don't make much sense. Just cancel the RPC as long 185 // as server_try_cancel is not DO_NOT_CANCEL 186 internal::ServerTryCancel(context); 187 return Status::CANCELLED; 188 } 189 190 response->set_message(request->message()); 191 internal::MaybeEchoDeadline(context, request, response); 192 if (host_) { 193 response->mutable_param()->set_host(*host_); 194 } else if (request->has_param() && 195 request->param().echo_host_from_authority_header()) { 196 auto authority = context->ExperimentalGetAuthority(); 197 std::string authority_str(authority.data(), authority.size()); 198 response->mutable_param()->set_host(std::move(authority_str)); 199 } 200 if (request->has_param() && request->param().client_cancel_after_us()) { 201 { 202 std::unique_lock<std::mutex> lock(mu_); 203 signal_client_ = true; 204 ++rpcs_waiting_for_client_cancel_; 205 } 206 while (!context->IsCancelled()) { 207 gpr_sleep_until(gpr_time_add( 208 gpr_now(GPR_CLOCK_REALTIME), 209 gpr_time_from_micros(request->param().client_cancel_after_us() * 210 grpc_test_slowdown_factor(), 211 GPR_TIMESPAN))); 212 } 213 { 214 std::unique_lock<std::mutex> lock(mu_); 215 --rpcs_waiting_for_client_cancel_; 216 } 217 return Status::CANCELLED; 218 } else if (request->has_param() && 219 request->param().server_cancel_after_us()) { 220 gpr_sleep_until(gpr_time_add( 221 gpr_now(GPR_CLOCK_REALTIME), 222 gpr_time_from_micros(request->param().server_cancel_after_us() * 223 grpc_test_slowdown_factor(), 224 GPR_TIMESPAN))); 225 return Status::CANCELLED; 226 } else if (!request->has_param() || 227 !request->param().skip_cancelled_check()) { 228 EXPECT_FALSE(context->IsCancelled()); 229 } 230 231 if (request->has_param() && request->param().echo_metadata_initially()) { 232 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 233 context->client_metadata(); 234 for (const auto& metadatum : client_metadata) { 235 context->AddInitialMetadata(ToString(metadatum.first), 236 ToString(metadatum.second)); 237 } 238 } 239 240 if (request->has_param() && request->param().echo_metadata()) { 241 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 242 context->client_metadata(); 243 for (const auto& metadatum : client_metadata) { 244 context->AddTrailingMetadata(ToString(metadatum.first), 245 ToString(metadatum.second)); 246 } 247 // Terminate rpc with error and debug info in trailer. 248 if (request->param().debug_info().stack_entries_size() || 249 !request->param().debug_info().detail().empty()) { 250 std::string serialized_debug_info = 251 request->param().debug_info().SerializeAsString(); 252 context->AddTrailingMetadata(kDebugInfoTrailerKey, 253 serialized_debug_info); 254 return Status::CANCELLED; 255 } 256 } 257 if (request->has_param() && 258 (request->param().expected_client_identity().length() > 0 || 259 request->param().check_auth_context())) { 260 internal::CheckServerAuthContext( 261 context, request->param().expected_transport_security_type(), 262 request->param().expected_client_identity()); 263 } 264 if (request->has_param() && 265 request->param().response_message_length() > 0) { 266 response->set_message( 267 std::string(request->param().response_message_length(), '\0')); 268 } 269 if (request->has_param() && request->param().echo_peer()) { 270 response->mutable_param()->set_peer(context->peer()); 271 } 272 return Status::OK; 273 } 274 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)275 Status Echo1(ServerContext* context, const EchoRequest* request, 276 EchoResponse* response) { 277 return Echo(context, request, response); 278 } 279 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)280 Status Echo2(ServerContext* context, const EchoRequest* request, 281 EchoResponse* response) { 282 return Echo(context, request, response); 283 } 284 CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)285 Status CheckClientInitialMetadata(ServerContext* context, 286 const SimpleRequest* /*request*/, 287 SimpleResponse* /*response*/) { 288 EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(), 289 kCheckClientInitialMetadataKey, 290 kCheckClientInitialMetadataVal), 291 1); 292 EXPECT_EQ(1u, 293 context->client_metadata().count(kCheckClientInitialMetadataKey)); 294 return Status::OK; 295 } 296 297 // Unimplemented is left unimplemented to test the returned error. 298 RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)299 Status RequestStream(ServerContext* context, 300 ServerReader<EchoRequest>* reader, 301 EchoResponse* response) { 302 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by 303 // the server by calling ServerContext::TryCancel() depending on the value: 304 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads 305 // any message from the client 306 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 307 // reading messages from the client 308 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads 309 // all the messages from the client 310 int server_try_cancel = internal::GetIntValueFromMetadata( 311 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 312 313 EchoRequest request; 314 response->set_message(""); 315 316 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 317 internal::ServerTryCancel(context); 318 return Status::CANCELLED; 319 } 320 321 std::thread* server_try_cancel_thd = nullptr; 322 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 323 server_try_cancel_thd = 324 new std::thread([context] { internal::ServerTryCancel(context); }); 325 } 326 327 int num_msgs_read = 0; 328 while (reader->Read(&request)) { 329 response->mutable_message()->append(request.message()); 330 } 331 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); 332 333 if (server_try_cancel_thd != nullptr) { 334 server_try_cancel_thd->join(); 335 delete server_try_cancel_thd; 336 return Status::CANCELLED; 337 } 338 339 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 340 internal::ServerTryCancel(context); 341 return Status::CANCELLED; 342 } 343 344 return Status::OK; 345 } 346 347 // Return 'kNumResponseStreamMsgs' messages. 348 // TODO(yangg) make it generic by adding a parameter into EchoRequest ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)349 Status ResponseStream(ServerContext* context, const EchoRequest* request, 350 ServerWriter<EchoResponse>* writer) { 351 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 352 // server by calling ServerContext::TryCancel() depending on the value: 353 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes 354 // any messages to the client 355 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 356 // writing messages to the client 357 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes 358 // all the messages to the client 359 int server_try_cancel = internal::GetIntValueFromMetadata( 360 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 361 362 int server_coalescing_api = internal::GetIntValueFromMetadata( 363 kServerUseCoalescingApi, context->client_metadata(), 0); 364 365 int server_responses_to_send = internal::GetIntValueFromMetadata( 366 kServerResponseStreamsToSend, context->client_metadata(), 367 kServerDefaultResponseStreamsToSend); 368 369 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 370 internal::ServerTryCancel(context); 371 return Status::CANCELLED; 372 } 373 374 EchoResponse response; 375 std::thread* server_try_cancel_thd = nullptr; 376 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 377 server_try_cancel_thd = 378 new std::thread([context] { internal::ServerTryCancel(context); }); 379 } 380 381 for (int i = 0; i < server_responses_to_send; i++) { 382 response.set_message(request->message() + std::to_string(i)); 383 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { 384 writer->WriteLast(response, WriteOptions()); 385 } else { 386 writer->Write(response); 387 } 388 } 389 390 if (server_try_cancel_thd != nullptr) { 391 server_try_cancel_thd->join(); 392 delete server_try_cancel_thd; 393 return Status::CANCELLED; 394 } 395 396 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 397 internal::ServerTryCancel(context); 398 return Status::CANCELLED; 399 } 400 401 return Status::OK; 402 } 403 BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)404 Status BidiStream(ServerContext* context, 405 ServerReaderWriter<EchoResponse, EchoRequest>* stream) { 406 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 407 // server by calling ServerContext::TryCancel() depending on the value: 408 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ 409 // writes any messages from/to the client 410 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 411 // reading/writing messages from/to the client 412 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server 413 // reads/writes all messages from/to the client 414 int server_try_cancel = internal::GetIntValueFromMetadata( 415 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 416 417 int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata( 418 kClientTryCancelRequest, context->client_metadata(), 0)); 419 420 EchoRequest request; 421 EchoResponse response; 422 423 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 424 internal::ServerTryCancel(context); 425 return Status::CANCELLED; 426 } 427 428 std::thread* server_try_cancel_thd = nullptr; 429 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 430 server_try_cancel_thd = 431 new std::thread([context] { internal::ServerTryCancel(context); }); 432 } 433 434 // kServerFinishAfterNReads suggests after how many reads, the server should 435 // write the last message and send status (coalesced using WriteLast) 436 int server_write_last = internal::GetIntValueFromMetadata( 437 kServerFinishAfterNReads, context->client_metadata(), 0); 438 439 int read_counts = 0; 440 while (stream->Read(&request)) { 441 read_counts++; 442 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); 443 response.set_message(request.message()); 444 if (read_counts == server_write_last) { 445 stream->WriteLast(response, WriteOptions()); 446 break; 447 } else { 448 stream->Write(response); 449 } 450 } 451 452 if (client_try_cancel) { 453 EXPECT_TRUE(context->IsCancelled()); 454 } 455 456 if (server_try_cancel_thd != nullptr) { 457 server_try_cancel_thd->join(); 458 delete server_try_cancel_thd; 459 return Status::CANCELLED; 460 } 461 462 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 463 internal::ServerTryCancel(context); 464 return Status::CANCELLED; 465 } 466 467 return Status::OK; 468 } 469 470 // Unimplemented is left unimplemented to test the returned error. signal_client()471 bool signal_client() { 472 std::unique_lock<std::mutex> lock(mu_); 473 return signal_client_; 474 } 475 int ClientWaitUntilNRpcsStarted(int desired_rpcs, 476 absl::Duration timeout = absl::Minutes(1)) { 477 return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); 478 } SignalServerToContinue()479 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } ResetSignaller()480 void ResetSignaller() { signaller_.Reset(); } RpcsWaitingForClientCancel()481 uint64_t RpcsWaitingForClientCancel() { 482 std::unique_lock<std::mutex> lock(mu_); 483 return rpcs_waiting_for_client_cancel_; 484 } 485 486 private: 487 bool signal_client_; 488 std::mutex mu_; 489 TestServiceSignaller signaller_; 490 std::unique_ptr<std::string> host_; 491 uint64_t rpcs_waiting_for_client_cancel_ = 0; 492 }; 493 494 class CallbackTestServiceImpl 495 : public grpc::testing::EchoTestService::CallbackService { 496 public: CallbackTestServiceImpl()497 CallbackTestServiceImpl() : signal_client_(false), host_() {} CallbackTestServiceImpl(const std::string & host)498 explicit CallbackTestServiceImpl(const std::string& host) 499 : signal_client_(false), host_(new std::string(host)) {} 500 501 ServerUnaryReactor* Echo(CallbackServerContext* context, 502 const EchoRequest* request, 503 EchoResponse* response) override; 504 505 ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context, 506 const SimpleRequest*, 507 SimpleResponse*) override; 508 509 ServerReadReactor<EchoRequest>* RequestStream( 510 CallbackServerContext* context, EchoResponse* response) override; 511 512 ServerWriteReactor<EchoResponse>* ResponseStream( 513 CallbackServerContext* context, const EchoRequest* request) override; 514 515 ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream( 516 CallbackServerContext* context) override; 517 518 // Unimplemented is left unimplemented to test the returned error. signal_client()519 bool signal_client() { 520 std::unique_lock<std::mutex> lock(mu_); 521 return signal_client_; 522 } 523 int ClientWaitUntilNRpcsStarted(int desired_rpcs, 524 absl::Duration timeout = absl::Minutes(1)) { 525 return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout); 526 } SignalServerToContinue()527 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } ResetSignaller()528 void ResetSignaller() { signaller_.Reset(); } 529 530 private: 531 bool signal_client_; 532 std::mutex mu_; 533 TestServiceSignaller signaller_; 534 std::unique_ptr<std::string> host_; 535 }; 536 537 using TestServiceImpl = 538 TestMultipleServiceImpl<grpc::testing::EchoTestService::Service>; 539 540 } // namespace testing 541 } // namespace grpc 542 543 #endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 544