xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/test_service_impl.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/end2end/test_service_impl.h"
20 
21 #include <string>
22 #include <thread>
23 
24 #include <gtest/gtest.h>
25 
26 #include <grpc/support/log.h>
27 #include <grpcpp/alarm.h>
28 #include <grpcpp/security/credentials.h>
29 #include <grpcpp/server_context.h>
30 
31 #include "src/core/lib/gprpp/crash.h"
32 #include "src/core/lib/gprpp/notification.h"
33 #include "src/proto/grpc/testing/echo.grpc.pb.h"
34 #include "test/cpp/util/string_ref_helper.h"
35 
36 using std::chrono::system_clock;
37 
38 namespace grpc {
39 namespace testing {
40 namespace internal {
41 
42 // When echo_deadline is requested, deadline seen in the ServerContext is set in
43 // the response in seconds.
MaybeEchoDeadline(ServerContextBase * context,const EchoRequest * request,EchoResponse * response)44 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request,
45                        EchoResponse* response) {
46   if (request->has_param() && request->param().echo_deadline()) {
47     gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
48     if (context->deadline() != system_clock::time_point::max()) {
49       Timepoint2Timespec(context->deadline(), &deadline);
50     }
51     response->mutable_param()->set_request_deadline(deadline.tv_sec);
52   }
53 }
54 
CheckServerAuthContext(const ServerContextBase * context,const std::string & expected_transport_security_type,const std::string & expected_client_identity)55 void CheckServerAuthContext(const ServerContextBase* context,
56                             const std::string& expected_transport_security_type,
57                             const std::string& expected_client_identity) {
58   std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
59   std::vector<grpc::string_ref> tst =
60       auth_ctx->FindPropertyValues("transport_security_type");
61   EXPECT_EQ(1u, tst.size());
62   EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
63   if (expected_client_identity.empty()) {
64     EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
65     EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
66     EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
67   } else {
68     auto identity = auth_ctx->GetPeerIdentity();
69     EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
70     EXPECT_EQ(1u, identity.size());
71     EXPECT_EQ(expected_client_identity, identity[0]);
72   }
73 }
74 
75 // Returns the number of pairs in metadata that exactly match the given
76 // key-value pair. Returns -1 if the pair wasn't found.
MetadataMatchCount(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)77 int MetadataMatchCount(
78     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
79     const std::string& key, const std::string& value) {
80   int count = 0;
81   for (const auto& metadatum : metadata) {
82     if (ToString(metadatum.first) == key &&
83         ToString(metadatum.second) == value) {
84       count++;
85     }
86   }
87   return count;
88 }
89 
// Looks up `key` in `metadata` and parses the first matching value as an int.
//
// Returns `default_value` when the key is absent, or when the value cannot be
// parsed as an int (a failed stream extraction would otherwise overwrite the
// target, so the parse goes into a temporary). Whenever the key is present,
// the resulting value is logged at INFO level.
int GetIntValueFromMetadataHelper(
    const char* key,
    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    int default_value) {
  // Single lookup; the iterator is reused for the value.
  const auto it = metadata.find(key);
  if (it == metadata.end()) {
    return default_value;
  }

  std::istringstream iss(ToString(it->second));
  int parsed = 0;
  if (iss >> parsed) {
    default_value = parsed;
  }
  gpr_log(GPR_INFO, "%s : %d", key, default_value);

  return default_value;
}
102 
// Returns the integer stored under `key` in `metadata`, or `default_value`.
// This is a plain forwarder: all of the work happens in
// GetIntValueFromMetadataHelper.
int GetIntValueFromMetadata(
    const char* key,
    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    int default_value) {
  const int value =
      GetIntValueFromMetadataHelper(key, metadata, default_value);
  return value;
}
109 
ServerTryCancel(ServerContext * context)110 void ServerTryCancel(ServerContext* context) {
111   EXPECT_FALSE(context->IsCancelled());
112   context->TryCancel();
113   gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
114   // Now wait until it's really canceled
115   while (!context->IsCancelled()) {
116     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
117                                  gpr_time_from_micros(1000, GPR_TIMESPAN)));
118   }
119 }
120 
ServerTryCancelNonblocking(CallbackServerContext * context)121 void ServerTryCancelNonblocking(CallbackServerContext* context) {
122   EXPECT_FALSE(context->IsCancelled());
123   context->TryCancel();
124   gpr_log(GPR_INFO,
125           "Server called TryCancelNonblocking() to cancel the request");
126 }
127 
128 }  // namespace internal
129 
Echo(CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)130 ServerUnaryReactor* CallbackTestServiceImpl::Echo(
131     CallbackServerContext* context, const EchoRequest* request,
132     EchoResponse* response) {
133   class Reactor : public grpc::ServerUnaryReactor {
134    public:
135     Reactor(CallbackTestServiceImpl* service, CallbackServerContext* ctx,
136             const EchoRequest* request, EchoResponse* response)
137         : service_(service), ctx_(ctx), req_(request), resp_(response) {
138       // It should be safe to call IsCancelled here, even though we don't know
139       // the result. Call it asynchronously to see if we trigger any data races.
140       // Join it in OnDone (technically that could be blocking but shouldn't be
141       // for very long).
142       async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); });
143 
144       started_ = true;
145 
146       if (request->has_param() &&
147           request->param().server_notify_client_when_started()) {
148         service->signaller_.SignalClientThatRpcStarted();
149         // Block on the "wait to continue" decision in a different thread since
150         // we can't tie up an EM thread with blocking events. We can join it in
151         // OnDone since it would definitely be done by then.
152         rpc_wait_thread_ = std::thread([this] {
153           service_->signaller_.ServerWaitToContinue();
154           StartRpc();
155         });
156       } else {
157         StartRpc();
158       }
159     }
160 
161     void StartRpc() {
162       if (req_->has_param() && req_->param().server_sleep_us() > 0) {
163         // Set an alarm for that much time
164         alarm_.Set(
165             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
166                          gpr_time_from_micros(req_->param().server_sleep_us() *
167                                                   grpc_test_slowdown_factor(),
168                                               GPR_TIMESPAN)),
169             [this](bool ok) { NonDelayed(ok); });
170         return;
171       }
172       NonDelayed(true);
173     }
174     void OnSendInitialMetadataDone(bool ok) override {
175       EXPECT_TRUE(ok);
176       initial_metadata_sent_ = true;
177     }
178     void OnCancel() override {
179       EXPECT_TRUE(started_);
180       EXPECT_TRUE(ctx_->IsCancelled());
181       on_cancel_invoked_ = true;
182       std::lock_guard<std::mutex> l(cancel_mu_);
183       cancel_cv_.notify_one();
184     }
185     void OnDone() override {
186       if (req_->has_param() && req_->param().echo_metadata_initially()) {
187         EXPECT_TRUE(initial_metadata_sent_);
188       }
189       EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_);
190       // Validate that finishing with a non-OK status doesn't cause cancellation
191       if (req_->has_param() && req_->param().has_expected_error()) {
192         EXPECT_FALSE(on_cancel_invoked_);
193       }
194       async_cancel_check_.join();
195       if (rpc_wait_thread_.joinable()) {
196         rpc_wait_thread_.join();
197       }
198       if (finish_when_cancelled_.joinable()) {
199         finish_when_cancelled_.join();
200       }
201       delete this;
202     }
203 
204    private:
205     void NonDelayed(bool ok) {
206       if (!ok) {
207         EXPECT_TRUE(ctx_->IsCancelled());
208         Finish(Status::CANCELLED);
209         return;
210       }
211       if (req_->has_param() && req_->param().server_die()) {
212         gpr_log(GPR_ERROR, "The request should not reach application handler.");
213         GPR_ASSERT(0);
214       }
215       if (req_->has_param() && req_->param().has_expected_error()) {
216         const auto& error = req_->param().expected_error();
217         Finish(Status(static_cast<StatusCode>(error.code()),
218                       error.error_message(), error.binary_error_details()));
219         return;
220       }
221       int server_try_cancel = internal::GetIntValueFromMetadata(
222           kServerTryCancelRequest, ctx_->client_metadata(), DO_NOT_CANCEL);
223       if (server_try_cancel != DO_NOT_CANCEL) {
224         // Since this is a unary RPC, by the time this server handler is called,
225         // the 'request' message is already read from the client. So the
226         // scenarios in server_try_cancel don't make much sense. Just cancel the
227         // RPC as long as server_try_cancel is not DO_NOT_CANCEL
228         EXPECT_FALSE(ctx_->IsCancelled());
229         ctx_->TryCancel();
230         gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
231         FinishWhenCancelledAsync();
232         return;
233       }
234       resp_->set_message(req_->message());
235       internal::MaybeEchoDeadline(ctx_, req_, resp_);
236       if (service_->host_) {
237         resp_->mutable_param()->set_host(*service_->host_);
238       } else if (req_->has_param() &&
239                  req_->param().echo_host_from_authority_header()) {
240         auto authority = ctx_->ExperimentalGetAuthority();
241         std::string authority_str(authority.data(), authority.size());
242         resp_->mutable_param()->set_host(std::move(authority_str));
243       }
244       if (req_->has_param() && req_->param().client_cancel_after_us()) {
245         {
246           std::unique_lock<std::mutex> lock(service_->mu_);
247           service_->signal_client_ = true;
248         }
249         FinishWhenCancelledAsync();
250         return;
251       } else if (req_->has_param() && req_->param().server_cancel_after_us()) {
252         alarm_.Set(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
253                                 gpr_time_from_micros(
254                                     req_->param().server_cancel_after_us() *
255                                         grpc_test_slowdown_factor(),
256                                     GPR_TIMESPAN)),
257                    [this](bool) { Finish(Status::CANCELLED); });
258         return;
259       } else if (!req_->has_param() || !req_->param().skip_cancelled_check()) {
260         EXPECT_FALSE(ctx_->IsCancelled());
261       }
262 
263       if (req_->has_param() && req_->param().echo_metadata_initially()) {
264         const std::multimap<grpc::string_ref, grpc::string_ref>&
265             client_metadata = ctx_->client_metadata();
266         for (const auto& metadatum : client_metadata) {
267           ctx_->AddInitialMetadata(ToString(metadatum.first),
268                                    ToString(metadatum.second));
269         }
270         StartSendInitialMetadata();
271       }
272 
273       if (req_->has_param() && req_->param().echo_metadata()) {
274         const std::multimap<grpc::string_ref, grpc::string_ref>&
275             client_metadata = ctx_->client_metadata();
276         for (const auto& metadatum : client_metadata) {
277           ctx_->AddTrailingMetadata(ToString(metadatum.first),
278                                     ToString(metadatum.second));
279         }
280         // Terminate rpc with error and debug info in trailer.
281         if (req_->param().debug_info().stack_entries_size() ||
282             !req_->param().debug_info().detail().empty()) {
283           std::string serialized_debug_info =
284               req_->param().debug_info().SerializeAsString();
285           ctx_->AddTrailingMetadata(kDebugInfoTrailerKey,
286                                     serialized_debug_info);
287           Finish(Status::CANCELLED);
288           return;
289         }
290       }
291       if (req_->has_param() &&
292           (req_->param().expected_client_identity().length() > 0 ||
293            req_->param().check_auth_context())) {
294         internal::CheckServerAuthContext(
295             ctx_, req_->param().expected_transport_security_type(),
296             req_->param().expected_client_identity());
297       }
298       if (req_->has_param() && req_->param().response_message_length() > 0) {
299         resp_->set_message(
300             std::string(req_->param().response_message_length(), '\0'));
301       }
302       if (req_->has_param() && req_->param().echo_peer()) {
303         resp_->mutable_param()->set_peer(ctx_->peer());
304       }
305       Finish(Status::OK);
306     }
307     void FinishWhenCancelledAsync() {
308       finish_when_cancelled_ = std::thread([this] {
309         std::unique_lock<std::mutex> l(cancel_mu_);
310         cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); });
311         Finish(Status::CANCELLED);
312       });
313     }
314 
315     CallbackTestServiceImpl* const service_;
316     CallbackServerContext* const ctx_;
317     const EchoRequest* const req_;
318     EchoResponse* const resp_;
319     Alarm alarm_;
320     std::mutex cancel_mu_;
321     std::condition_variable cancel_cv_;
322     bool initial_metadata_sent_ = false;
323     bool started_ = false;
324     bool on_cancel_invoked_ = false;
325     std::thread async_cancel_check_;
326     std::thread rpc_wait_thread_;
327     std::thread finish_when_cancelled_;
328   };
329 
330   return new Reactor(this, context, request, response);
331 }
332 
CheckClientInitialMetadata(CallbackServerContext * context,const SimpleRequest *,SimpleResponse *)333 ServerUnaryReactor* CallbackTestServiceImpl::CheckClientInitialMetadata(
334     CallbackServerContext* context, const SimpleRequest*, SimpleResponse*) {
335   class Reactor : public grpc::ServerUnaryReactor {
336    public:
337     explicit Reactor(CallbackServerContext* ctx) {
338       EXPECT_EQ(internal::MetadataMatchCount(ctx->client_metadata(),
339                                              kCheckClientInitialMetadataKey,
340                                              kCheckClientInitialMetadataVal),
341                 1);
342       EXPECT_EQ(ctx->client_metadata().count(kCheckClientInitialMetadataKey),
343                 1u);
344       Finish(Status::OK);
345     }
346     void OnDone() override { delete this; }
347   };
348 
349   return new Reactor(context);
350 }
351 
RequestStream(CallbackServerContext * context,EchoResponse * response)352 ServerReadReactor<EchoRequest>* CallbackTestServiceImpl::RequestStream(
353     CallbackServerContext* context, EchoResponse* response) {
354   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
355   // the server by calling ServerContext::TryCancel() depending on the
356   // value:
357   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
358   //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
359   //   is cancelled while the server is reading messages from the client
360   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
361   //   all the messages from the client
362   int server_try_cancel = internal::GetIntValueFromMetadata(
363       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
364   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
365     internal::ServerTryCancelNonblocking(context);
366     // Don't need to provide a reactor since the RPC is canceled
367     return nullptr;
368   }
369 
370   class Reactor : public grpc::ServerReadReactor<EchoRequest> {
371    public:
372     Reactor(CallbackServerContext* ctx, EchoResponse* response,
373             int server_try_cancel)
374         : ctx_(ctx),
375           response_(response),
376           server_try_cancel_(server_try_cancel) {
377       EXPECT_NE(server_try_cancel, CANCEL_BEFORE_PROCESSING);
378       response->set_message("");
379 
380       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
381         ctx->TryCancel();
382         // Don't wait for it here
383       }
384       StartRead(&request_);
385       setup_done_ = true;
386     }
387     void OnDone() override { delete this; }
388     void OnCancel() override {
389       EXPECT_TRUE(setup_done_);
390       EXPECT_TRUE(ctx_->IsCancelled());
391       FinishOnce(Status::CANCELLED);
392     }
393     void OnReadDone(bool ok) override {
394       if (ok) {
395         response_->mutable_message()->append(request_.message());
396         num_msgs_read_++;
397         StartRead(&request_);
398       } else {
399         gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_);
400 
401         if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
402           // Let OnCancel recover this
403           return;
404         }
405         if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
406           internal::ServerTryCancelNonblocking(ctx_);
407           return;
408         }
409         FinishOnce(Status::OK);
410       }
411     }
412 
413    private:
414     void FinishOnce(const Status& s) {
415       std::lock_guard<std::mutex> l(finish_mu_);
416       if (!finished_) {
417         Finish(s);
418         finished_ = true;
419       }
420     }
421 
422     CallbackServerContext* const ctx_;
423     EchoResponse* const response_;
424     EchoRequest request_;
425     int num_msgs_read_{0};
426     int server_try_cancel_;
427     std::mutex finish_mu_;
428     bool finished_{false};
429     bool setup_done_{false};
430   };
431 
432   return new Reactor(context, response, server_try_cancel);
433 }
434 
435 // Return 'kNumResponseStreamMsgs' messages.
436 // TODO(yangg) make it generic by adding a parameter into EchoRequest
ResponseStream(CallbackServerContext * context,const EchoRequest * request)437 ServerWriteReactor<EchoResponse>* CallbackTestServiceImpl::ResponseStream(
438     CallbackServerContext* context, const EchoRequest* request) {
439   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
440   // the server by calling ServerContext::TryCancel() depending on the
441   // value:
442   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
443   //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
444   //   is cancelled while the server is reading messages from the client
445   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
446   //   all the messages from the client
447   int server_try_cancel = internal::GetIntValueFromMetadata(
448       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
449   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
450     internal::ServerTryCancelNonblocking(context);
451   }
452 
453   class Reactor : public grpc::ServerWriteReactor<EchoResponse> {
454    public:
455     Reactor(CallbackServerContext* ctx, const EchoRequest* request,
456             int server_try_cancel)
457         : ctx_(ctx), request_(request), server_try_cancel_(server_try_cancel) {
458       server_coalescing_api_ = internal::GetIntValueFromMetadata(
459           kServerUseCoalescingApi, ctx->client_metadata(), 0);
460       server_responses_to_send_ = internal::GetIntValueFromMetadata(
461           kServerResponseStreamsToSend, ctx->client_metadata(),
462           kServerDefaultResponseStreamsToSend);
463       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
464         ctx->TryCancel();
465       }
466       if (server_try_cancel_ != CANCEL_BEFORE_PROCESSING) {
467         if (num_msgs_sent_ < server_responses_to_send_) {
468           NextWrite();
469         }
470       }
471       setup_done_ = true;
472     }
473     void OnDone() override { delete this; }
474     void OnCancel() override {
475       EXPECT_TRUE(setup_done_);
476       EXPECT_TRUE(ctx_->IsCancelled());
477       FinishOnce(Status::CANCELLED);
478     }
479     void OnWriteDone(bool /*ok*/) override {
480       if (num_msgs_sent_ < server_responses_to_send_) {
481         NextWrite();
482       } else if (server_coalescing_api_ != 0) {
483         // We would have already done Finish just after the WriteLast
484       } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
485         // Let OnCancel recover this
486       } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
487         internal::ServerTryCancelNonblocking(ctx_);
488       } else {
489         FinishOnce(Status::OK);
490       }
491     }
492 
493    private:
494     void FinishOnce(const Status& s) {
495       std::lock_guard<std::mutex> l(finish_mu_);
496       if (!finished_) {
497         Finish(s);
498         finished_ = true;
499       }
500     }
501 
502     void NextWrite() {
503       response_.set_message(request_->message() +
504                             std::to_string(num_msgs_sent_));
505       if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
506           server_coalescing_api_ != 0) {
507         {
508           std::lock_guard<std::mutex> l(finish_mu_);
509           if (!finished_) {
510             num_msgs_sent_++;
511             StartWriteLast(&response_, WriteOptions());
512           }
513         }
514         // If we use WriteLast, we shouldn't wait before attempting Finish
515         FinishOnce(Status::OK);
516       } else {
517         std::lock_guard<std::mutex> l(finish_mu_);
518         if (!finished_) {
519           num_msgs_sent_++;
520           StartWrite(&response_);
521         }
522       }
523     }
524     CallbackServerContext* const ctx_;
525     const EchoRequest* const request_;
526     EchoResponse response_;
527     int num_msgs_sent_{0};
528     int server_try_cancel_;
529     int server_coalescing_api_;
530     int server_responses_to_send_;
531     std::mutex finish_mu_;
532     bool finished_{false};
533     bool setup_done_{false};
534   };
535   return new Reactor(context, request, server_try_cancel);
536 }
537 
538 ServerBidiReactor<EchoRequest, EchoResponse>*
BidiStream(CallbackServerContext * context)539 CallbackTestServiceImpl::BidiStream(CallbackServerContext* context) {
540   class Reactor : public grpc::ServerBidiReactor<EchoRequest, EchoResponse> {
541    public:
542     explicit Reactor(CallbackServerContext* ctx) : ctx_(ctx) {
543       // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
544       // the server by calling ServerContext::TryCancel() depending on the
545       // value:
546       //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
547       //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
548       //   is cancelled while the server is reading messages from the client
549       //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
550       //   all the messages from the client
551       server_try_cancel_ = internal::GetIntValueFromMetadata(
552           kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
553       server_write_last_ = internal::GetIntValueFromMetadata(
554           kServerFinishAfterNReads, ctx->client_metadata(), 0);
555       client_try_cancel_ = static_cast<bool>(internal::GetIntValueFromMetadata(
556           kClientTryCancelRequest, ctx->client_metadata(), 0));
557       if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
558         internal::ServerTryCancelNonblocking(ctx);
559       } else {
560         if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
561           ctx->TryCancel();
562         }
563         StartRead(&request_);
564       }
565       setup_done_ = true;
566     }
567     void OnDone() override {
568       {
569         // Use the same lock as finish to make sure that OnDone isn't inlined.
570         std::lock_guard<std::mutex> l(finish_mu_);
571         EXPECT_TRUE(finished_);
572         finish_thread_.join();
573       }
574       delete this;
575     }
576     void OnCancel() override {
577       cancel_notification_.Notify();
578       EXPECT_TRUE(setup_done_);
579       EXPECT_TRUE(ctx_->IsCancelled());
580       FinishOnce(Status::CANCELLED);
581     }
582     void OnReadDone(bool ok) override {
583       if (ok) {
584         num_msgs_read_++;
585         response_.set_message(request_.message());
586         std::lock_guard<std::mutex> l(finish_mu_);
587         if (!finished_) {
588           if (num_msgs_read_ == server_write_last_) {
589             StartWriteLast(&response_, WriteOptions());
590             // If we use WriteLast, we shouldn't wait before attempting Finish
591           } else {
592             StartWrite(&response_);
593             return;
594           }
595         }
596       } else if (client_try_cancel_) {
597         cancel_notification_.WaitForNotificationWithTimeout(absl::Seconds(10));
598         EXPECT_TRUE(ctx_->IsCancelled());
599       }
600 
601       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
602         // Let OnCancel handle this
603       } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
604         internal::ServerTryCancelNonblocking(ctx_);
605       } else {
606         FinishOnce(Status::OK);
607       }
608     }
609     void OnWriteDone(bool /*ok*/) override {
610       std::lock_guard<std::mutex> l(finish_mu_);
611       if (!finished_) {
612         StartRead(&request_);
613       }
614     }
615 
616    private:
617     void FinishOnce(const Status& s) {
618       std::lock_guard<std::mutex> l(finish_mu_);
619       if (!finished_) {
620         finished_ = true;
621         // Finish asynchronously to make sure that there are no deadlocks.
622         finish_thread_ = std::thread([this, s] {
623           std::lock_guard<std::mutex> l(finish_mu_);
624           Finish(s);
625         });
626       }
627     }
628 
629     CallbackServerContext* const ctx_;
630     EchoRequest request_;
631     EchoResponse response_;
632     int num_msgs_read_{0};
633     int server_try_cancel_;
634     int server_write_last_;
635     std::mutex finish_mu_;
636     bool finished_{false};
637     bool setup_done_{false};
638     std::thread finish_thread_;
639     bool client_try_cancel_ = false;
640     grpc_core::Notification cancel_notification_;
641   };
642 
643   return new Reactor(context);
644 }
645 
646 }  // namespace testing
647 }  // namespace grpc
648