xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/test_service_impl.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
21 
22 #include <condition_variable>
23 #include <memory>
24 #include <mutex>
25 #include <string>
26 #include <thread>
27 
28 #include <gtest/gtest.h>
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/log.h>
32 #include <grpcpp/alarm.h>
33 #include <grpcpp/security/credentials.h>
34 #include <grpcpp/server_context.h>
35 
36 #include "src/core/lib/gprpp/crash.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/util/string_ref_helper.h"
40 
41 namespace grpc {
42 namespace testing {
43 
44 const int kServerDefaultResponseStreamsToSend = 3;
45 const char* const kServerResponseStreamsToSend = "server_responses_to_send";
46 const char* const kServerTryCancelRequest = "server_try_cancel";
47 const char* const kClientTryCancelRequest = "client_try_cancel";
48 const char* const kDebugInfoTrailerKey = "debug-info-bin";
49 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
50 const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
51 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata";
52 const char* const kCheckClientInitialMetadataVal = "Value for client metadata";
53 
54 typedef enum {
55   DO_NOT_CANCEL = 0,
56   CANCEL_BEFORE_PROCESSING,
57   CANCEL_DURING_PROCESSING,
58   CANCEL_AFTER_PROCESSING
59 } ServerTryCancelRequestPhase;
60 
61 namespace internal {
62 // When echo_deadline is requested, deadline seen in the ServerContext is set in
63 // the response in seconds.
64 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request,
65                        EchoResponse* response);
66 
67 void CheckServerAuthContext(const ServerContextBase* context,
68                             const std::string& expected_transport_security_type,
69                             const std::string& expected_client_identity);
70 
71 // Returns the number of pairs in metadata that exactly match the given
72 // key-value pair. Returns -1 if the pair wasn't found.
73 int MetadataMatchCount(
74     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
75     const std::string& key, const std::string& value);
76 
77 int GetIntValueFromMetadataHelper(
78     const char* key,
79     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
80     int default_value);
81 
82 int GetIntValueFromMetadata(
83     const char* key,
84     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
85     int default_value);
86 
87 void ServerTryCancel(ServerContext* context);
88 }  // namespace internal
89 
90 class TestServiceSignaller {
91  public:
92   // Waits for at least *desired_rpcs* to to be waiting for a server
93   // continue notification.
94   // Returns when *desired_rpcs* reaches that amount, or when we've
95   // surpassed the timeout, whichever happens first. The return value
96   // is whatever the number of RPCs waiting for server notification is
97   // at that time.
ClientWaitUntilNRpcsStarted(int desired_rpcs,absl::Duration timeout)98   int ClientWaitUntilNRpcsStarted(int desired_rpcs, absl::Duration timeout) {
99     gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilNRpcsStarted ***");
100     absl::Time deadline = absl::Now() + timeout;
101     std::chrono::system_clock::time_point chrono_deadline =
102         absl::ToChronoTime(deadline);
103     std::unique_lock<std::mutex> lock(mu_);
104     cv_rpc_started_.wait_until(lock, chrono_deadline, [this, desired_rpcs] {
105       gpr_log(
106           GPR_DEBUG,
107           "*** desired_rpcs: %d rpcs_waiting_for_server_to_continue_: %d ***",
108           desired_rpcs, rpcs_waiting_for_server_to_continue_);
109       return rpcs_waiting_for_server_to_continue_ >= desired_rpcs;
110     });
111     gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilNRpcsStarted ***");
112     return rpcs_waiting_for_server_to_continue_;
113   }
ServerWaitToContinue()114   void ServerWaitToContinue() {
115     gpr_log(GPR_DEBUG, "*** enter ServerWaitToContinue ***");
116     std::unique_lock<std::mutex> lock(mu_);
117     cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
118     gpr_log(GPR_DEBUG, "*** leave ServerWaitToContinue ***");
119   }
SignalClientThatRpcStarted()120   void SignalClientThatRpcStarted() {
121     gpr_log(GPR_DEBUG, "*** SignalClientThatRpcStarted ***");
122     std::unique_lock<std::mutex> lock(mu_);
123     ++rpcs_waiting_for_server_to_continue_;
124     cv_rpc_started_.notify_all();
125   }
SignalServerToContinue()126   void SignalServerToContinue() {
127     gpr_log(GPR_DEBUG, "*** SignalServerToContinue ***");
128     std::unique_lock<std::mutex> lock(mu_);
129     server_should_continue_ = true;
130     cv_server_continue_.notify_all();
131   }
Reset()132   void Reset() {
133     std::unique_lock<std::mutex> lock(mu_);
134     rpcs_waiting_for_server_to_continue_ = 0;
135     server_should_continue_ = false;
136   }
137 
138  private:
139   std::mutex mu_;
140   std::condition_variable cv_rpc_started_;
141   int rpcs_waiting_for_server_to_continue_ /* GUARDED_BY(mu_) */ = 0;
142   std::condition_variable cv_server_continue_;
143   bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
144 };
145 
146 template <typename RpcService>
147 class TestMultipleServiceImpl : public RpcService {
148  public:
TestMultipleServiceImpl()149   TestMultipleServiceImpl() : signal_client_(false), host_() {}
TestMultipleServiceImpl(const std::string & host)150   explicit TestMultipleServiceImpl(const std::string& host)
151       : signal_client_(false), host_(new std::string(host)) {}
152 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)153   Status Echo(ServerContext* context, const EchoRequest* request,
154               EchoResponse* response) {
155     if (request->has_param() &&
156         request->param().server_notify_client_when_started()) {
157       signaller_.SignalClientThatRpcStarted();
158       signaller_.ServerWaitToContinue();
159     }
160 
161     // A bit of sleep to make sure that short deadline tests fail
162     if (request->has_param() && request->param().server_sleep_us() > 0) {
163       gpr_sleep_until(gpr_time_add(
164           gpr_now(GPR_CLOCK_MONOTONIC),
165           gpr_time_from_micros(
166               request->param().server_sleep_us() * grpc_test_slowdown_factor(),
167               GPR_TIMESPAN)));
168     }
169 
170     if (request->has_param() && request->param().server_die()) {
171       gpr_log(GPR_ERROR, "The request should not reach application handler.");
172       GPR_ASSERT(0);
173     }
174     if (request->has_param() && request->param().has_expected_error()) {
175       const auto& error = request->param().expected_error();
176       return Status(static_cast<StatusCode>(error.code()),
177                     error.error_message(), error.binary_error_details());
178     }
179     int server_try_cancel = internal::GetIntValueFromMetadata(
180         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
181     if (server_try_cancel > DO_NOT_CANCEL) {
182       // Since this is a unary RPC, by the time this server handler is called,
183       // the 'request' message is already read from the client. So the scenarios
184       // in server_try_cancel don't make much sense. Just cancel the RPC as long
185       // as server_try_cancel is not DO_NOT_CANCEL
186       internal::ServerTryCancel(context);
187       return Status::CANCELLED;
188     }
189 
190     response->set_message(request->message());
191     internal::MaybeEchoDeadline(context, request, response);
192     if (host_) {
193       response->mutable_param()->set_host(*host_);
194     } else if (request->has_param() &&
195                request->param().echo_host_from_authority_header()) {
196       auto authority = context->ExperimentalGetAuthority();
197       std::string authority_str(authority.data(), authority.size());
198       response->mutable_param()->set_host(std::move(authority_str));
199     }
200     if (request->has_param() && request->param().client_cancel_after_us()) {
201       {
202         std::unique_lock<std::mutex> lock(mu_);
203         signal_client_ = true;
204         ++rpcs_waiting_for_client_cancel_;
205       }
206       while (!context->IsCancelled()) {
207         gpr_sleep_until(gpr_time_add(
208             gpr_now(GPR_CLOCK_REALTIME),
209             gpr_time_from_micros(request->param().client_cancel_after_us() *
210                                      grpc_test_slowdown_factor(),
211                                  GPR_TIMESPAN)));
212       }
213       {
214         std::unique_lock<std::mutex> lock(mu_);
215         --rpcs_waiting_for_client_cancel_;
216       }
217       return Status::CANCELLED;
218     } else if (request->has_param() &&
219                request->param().server_cancel_after_us()) {
220       gpr_sleep_until(gpr_time_add(
221           gpr_now(GPR_CLOCK_REALTIME),
222           gpr_time_from_micros(request->param().server_cancel_after_us() *
223                                    grpc_test_slowdown_factor(),
224                                GPR_TIMESPAN)));
225       return Status::CANCELLED;
226     } else if (!request->has_param() ||
227                !request->param().skip_cancelled_check()) {
228       EXPECT_FALSE(context->IsCancelled());
229     }
230 
231     if (request->has_param() && request->param().echo_metadata_initially()) {
232       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
233           context->client_metadata();
234       for (const auto& metadatum : client_metadata) {
235         context->AddInitialMetadata(ToString(metadatum.first),
236                                     ToString(metadatum.second));
237       }
238     }
239 
240     if (request->has_param() && request->param().echo_metadata()) {
241       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
242           context->client_metadata();
243       for (const auto& metadatum : client_metadata) {
244         context->AddTrailingMetadata(ToString(metadatum.first),
245                                      ToString(metadatum.second));
246       }
247       // Terminate rpc with error and debug info in trailer.
248       if (request->param().debug_info().stack_entries_size() ||
249           !request->param().debug_info().detail().empty()) {
250         std::string serialized_debug_info =
251             request->param().debug_info().SerializeAsString();
252         context->AddTrailingMetadata(kDebugInfoTrailerKey,
253                                      serialized_debug_info);
254         return Status::CANCELLED;
255       }
256     }
257     if (request->has_param() &&
258         (request->param().expected_client_identity().length() > 0 ||
259          request->param().check_auth_context())) {
260       internal::CheckServerAuthContext(
261           context, request->param().expected_transport_security_type(),
262           request->param().expected_client_identity());
263     }
264     if (request->has_param() &&
265         request->param().response_message_length() > 0) {
266       response->set_message(
267           std::string(request->param().response_message_length(), '\0'));
268     }
269     if (request->has_param() && request->param().echo_peer()) {
270       response->mutable_param()->set_peer(context->peer());
271     }
272     return Status::OK;
273   }
274 
Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)275   Status Echo1(ServerContext* context, const EchoRequest* request,
276                EchoResponse* response) {
277     return Echo(context, request, response);
278   }
279 
Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)280   Status Echo2(ServerContext* context, const EchoRequest* request,
281                EchoResponse* response) {
282     return Echo(context, request, response);
283   }
284 
CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)285   Status CheckClientInitialMetadata(ServerContext* context,
286                                     const SimpleRequest* /*request*/,
287                                     SimpleResponse* /*response*/) {
288     EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
289                                            kCheckClientInitialMetadataKey,
290                                            kCheckClientInitialMetadataVal),
291               1);
292     EXPECT_EQ(1u,
293               context->client_metadata().count(kCheckClientInitialMetadataKey));
294     return Status::OK;
295   }
296 
297   // Unimplemented is left unimplemented to test the returned error.
298 
RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)299   Status RequestStream(ServerContext* context,
300                        ServerReader<EchoRequest>* reader,
301                        EchoResponse* response) {
302     // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
303     // the server by calling ServerContext::TryCancel() depending on the value:
304     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
305     //   any message from the client
306     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
307     //   reading messages from the client
308     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
309     //   all the messages from the client
310     int server_try_cancel = internal::GetIntValueFromMetadata(
311         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
312 
313     EchoRequest request;
314     response->set_message("");
315 
316     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
317       internal::ServerTryCancel(context);
318       return Status::CANCELLED;
319     }
320 
321     std::thread* server_try_cancel_thd = nullptr;
322     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
323       server_try_cancel_thd =
324           new std::thread([context] { internal::ServerTryCancel(context); });
325     }
326 
327     int num_msgs_read = 0;
328     while (reader->Read(&request)) {
329       response->mutable_message()->append(request.message());
330     }
331     gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
332 
333     if (server_try_cancel_thd != nullptr) {
334       server_try_cancel_thd->join();
335       delete server_try_cancel_thd;
336       return Status::CANCELLED;
337     }
338 
339     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
340       internal::ServerTryCancel(context);
341       return Status::CANCELLED;
342     }
343 
344     return Status::OK;
345   }
346 
347   // Return 'kNumResponseStreamMsgs' messages.
348   // TODO(yangg) make it generic by adding a parameter into EchoRequest
ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)349   Status ResponseStream(ServerContext* context, const EchoRequest* request,
350                         ServerWriter<EchoResponse>* writer) {
351     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
352     // server by calling ServerContext::TryCancel() depending on the value:
353     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
354     //   any messages to the client
355     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
356     //   writing messages to the client
357     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
358     //   all the messages to the client
359     int server_try_cancel = internal::GetIntValueFromMetadata(
360         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
361 
362     int server_coalescing_api = internal::GetIntValueFromMetadata(
363         kServerUseCoalescingApi, context->client_metadata(), 0);
364 
365     int server_responses_to_send = internal::GetIntValueFromMetadata(
366         kServerResponseStreamsToSend, context->client_metadata(),
367         kServerDefaultResponseStreamsToSend);
368 
369     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
370       internal::ServerTryCancel(context);
371       return Status::CANCELLED;
372     }
373 
374     EchoResponse response;
375     std::thread* server_try_cancel_thd = nullptr;
376     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
377       server_try_cancel_thd =
378           new std::thread([context] { internal::ServerTryCancel(context); });
379     }
380 
381     for (int i = 0; i < server_responses_to_send; i++) {
382       response.set_message(request->message() + std::to_string(i));
383       if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
384         writer->WriteLast(response, WriteOptions());
385       } else {
386         writer->Write(response);
387       }
388     }
389 
390     if (server_try_cancel_thd != nullptr) {
391       server_try_cancel_thd->join();
392       delete server_try_cancel_thd;
393       return Status::CANCELLED;
394     }
395 
396     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
397       internal::ServerTryCancel(context);
398       return Status::CANCELLED;
399     }
400 
401     return Status::OK;
402   }
403 
BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)404   Status BidiStream(ServerContext* context,
405                     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
406     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
407     // server by calling ServerContext::TryCancel() depending on the value:
408     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
409     //   writes any messages from/to the client
410     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
411     //   reading/writing messages from/to the client
412     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
413     //   reads/writes all messages from/to the client
414     int server_try_cancel = internal::GetIntValueFromMetadata(
415         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
416 
417     int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata(
418         kClientTryCancelRequest, context->client_metadata(), 0));
419 
420     EchoRequest request;
421     EchoResponse response;
422 
423     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
424       internal::ServerTryCancel(context);
425       return Status::CANCELLED;
426     }
427 
428     std::thread* server_try_cancel_thd = nullptr;
429     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
430       server_try_cancel_thd =
431           new std::thread([context] { internal::ServerTryCancel(context); });
432     }
433 
434     // kServerFinishAfterNReads suggests after how many reads, the server should
435     // write the last message and send status (coalesced using WriteLast)
436     int server_write_last = internal::GetIntValueFromMetadata(
437         kServerFinishAfterNReads, context->client_metadata(), 0);
438 
439     int read_counts = 0;
440     while (stream->Read(&request)) {
441       read_counts++;
442       gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
443       response.set_message(request.message());
444       if (read_counts == server_write_last) {
445         stream->WriteLast(response, WriteOptions());
446         break;
447       } else {
448         stream->Write(response);
449       }
450     }
451 
452     if (client_try_cancel) {
453       EXPECT_TRUE(context->IsCancelled());
454     }
455 
456     if (server_try_cancel_thd != nullptr) {
457       server_try_cancel_thd->join();
458       delete server_try_cancel_thd;
459       return Status::CANCELLED;
460     }
461 
462     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
463       internal::ServerTryCancel(context);
464       return Status::CANCELLED;
465     }
466 
467     return Status::OK;
468   }
469 
470   // Unimplemented is left unimplemented to test the returned error.
signal_client()471   bool signal_client() {
472     std::unique_lock<std::mutex> lock(mu_);
473     return signal_client_;
474   }
475   int ClientWaitUntilNRpcsStarted(int desired_rpcs,
476                                   absl::Duration timeout = absl::Minutes(1)) {
477     return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
478   }
SignalServerToContinue()479   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
ResetSignaller()480   void ResetSignaller() { signaller_.Reset(); }
RpcsWaitingForClientCancel()481   uint64_t RpcsWaitingForClientCancel() {
482     std::unique_lock<std::mutex> lock(mu_);
483     return rpcs_waiting_for_client_cancel_;
484   }
485 
486  private:
487   bool signal_client_;
488   std::mutex mu_;
489   TestServiceSignaller signaller_;
490   std::unique_ptr<std::string> host_;
491   uint64_t rpcs_waiting_for_client_cancel_ = 0;
492 };
493 
494 class CallbackTestServiceImpl
495     : public grpc::testing::EchoTestService::CallbackService {
496  public:
CallbackTestServiceImpl()497   CallbackTestServiceImpl() : signal_client_(false), host_() {}
CallbackTestServiceImpl(const std::string & host)498   explicit CallbackTestServiceImpl(const std::string& host)
499       : signal_client_(false), host_(new std::string(host)) {}
500 
501   ServerUnaryReactor* Echo(CallbackServerContext* context,
502                            const EchoRequest* request,
503                            EchoResponse* response) override;
504 
505   ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context,
506                                                  const SimpleRequest*,
507                                                  SimpleResponse*) override;
508 
509   ServerReadReactor<EchoRequest>* RequestStream(
510       CallbackServerContext* context, EchoResponse* response) override;
511 
512   ServerWriteReactor<EchoResponse>* ResponseStream(
513       CallbackServerContext* context, const EchoRequest* request) override;
514 
515   ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream(
516       CallbackServerContext* context) override;
517 
518   // Unimplemented is left unimplemented to test the returned error.
signal_client()519   bool signal_client() {
520     std::unique_lock<std::mutex> lock(mu_);
521     return signal_client_;
522   }
523   int ClientWaitUntilNRpcsStarted(int desired_rpcs,
524                                   absl::Duration timeout = absl::Minutes(1)) {
525     return signaller_.ClientWaitUntilNRpcsStarted(desired_rpcs, timeout);
526   }
SignalServerToContinue()527   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
ResetSignaller()528   void ResetSignaller() { signaller_.Reset(); }
529 
530  private:
531   bool signal_client_;
532   std::mutex mu_;
533   TestServiceSignaller signaller_;
534   std::unique_ptr<std::string> host_;
535 };
536 
537 using TestServiceImpl =
538     TestMultipleServiceImpl<grpc::testing::EchoTestService::Service>;
539 
540 }  // namespace testing
541 }  // namespace grpc
542 
543 #endif  // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
544