xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/async_end2end_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22 
23 #include "absl/memory/memory.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/str_format.h"
26 
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/ext/health_check_service_server_builder_option.h>
35 #include <grpcpp/server.h>
36 #include <grpcpp/server_builder.h>
37 #include <grpcpp/server_context.h>
38 
39 #include "src/core/client_channel/backup_poller.h"
40 #include "src/core/lib/config/config_vars.h"
41 #include "src/core/lib/gprpp/crash.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/iomgr/port.h"
44 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
45 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
46 #include "src/proto/grpc/testing/echo.grpc.pb.h"
47 #include "test/core/util/build.h"
48 #include "test/core/util/port.h"
49 #include "test/core/util/test_config.h"
50 #include "test/cpp/util/string_ref_helper.h"
51 #include "test/cpp/util/test_credentials_provider.h"
52 
53 #ifdef GRPC_POSIX_SOCKET_EV
54 #include "src/core/lib/iomgr/ev_posix.h"
55 #endif  // GRPC_POSIX_SOCKET_EV
56 
57 #include <gtest/gtest.h>
58 
59 using grpc::testing::EchoRequest;
60 using grpc::testing::EchoResponse;
61 using std::chrono::system_clock;
62 
63 namespace grpc {
64 namespace testing {
65 
66 namespace {
67 
// Encodes a small integer as an opaque completion-queue tag pointer.
// The value is first widened to intptr_t so the integer and the pointer have
// the same width before the reinterpret_cast; this is the exact inverse of
// detag() and avoids an int-to-pointer cast between differently sized types.
void* tag(int t) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(t));
}
// Recovers the integer that was packed into a tag pointer: the pointer is
// reinterpreted as an intptr_t and then narrowed to int.
int detag(void* p) {
  const intptr_t raw_value = reinterpret_cast<intptr_t>(p);
  return static_cast<int>(raw_value);
}
70 
71 class Verifier {
72  public:
Verifier()73   Verifier() : lambda_run_(false) {}
74   // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())75   Verifier& Expect(
76       int i, bool expect_ok,
77       grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
78     return ExpectUnless(i, expect_ok, false, whence);
79   }
80   // ExpectUnless sets the expected ok value for a specific tag
81   // unless the tag was already marked seen (as a result of ExpectMaybe)
ExpectUnless(int i,bool expect_ok,bool seen,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())82   Verifier& ExpectUnless(
83       int i, bool expect_ok, bool seen,
84       grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
85     if (!seen) {
86       expectations_[tag(i)] = {expect_ok, whence};
87     }
88     return *this;
89   }
90   // ExpectMaybe sets the expected ok value for a specific tag, but does not
91   // require it to appear
92   // If it does, sets *seen to true
ExpectMaybe(int i,bool expect_ok,bool * seen,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())93   Verifier& ExpectMaybe(
94       int i, bool expect_ok, bool* seen,
95       grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
96     if (!*seen) {
97       maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen, whence};
98     }
99     return *this;
100   }
101 
102   // Next waits for 1 async tag to complete, checks its
103   // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)104   int Next(CompletionQueue* cq, bool ignore_ok) {
105     bool ok;
106     void* got_tag;
107     EXPECT_TRUE(cq->Next(&got_tag, &ok));
108     GotTag(got_tag, ok, ignore_ok);
109     return detag(got_tag);
110   }
111 
112   template <typename T>
DoOnceThenAsyncNext(CompletionQueue * cq,void ** got_tag,bool * ok,T deadline,std::function<void (void)> lambda)113   CompletionQueue::NextStatus DoOnceThenAsyncNext(
114       CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
115       std::function<void(void)> lambda) {
116     if (lambda_run_) {
117       return cq->AsyncNext(got_tag, ok, deadline);
118     } else {
119       lambda_run_ = true;
120       return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
121     }
122   }
123 
124   // Verify keeps calling Next until all currently set
125   // expected tags are complete
Verify(CompletionQueue * cq)126   void Verify(CompletionQueue* cq) { Verify(cq, false); }
127 
128   // This version of Verify allows optionally ignoring the
129   // outcome of the expectation
Verify(CompletionQueue * cq,bool ignore_ok)130   void Verify(CompletionQueue* cq, bool ignore_ok) {
131     GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
132     while (!expectations_.empty()) {
133       Next(cq, ignore_ok);
134     }
135     maybe_expectations_.clear();
136   }
137 
138   // This version of Verify stops after a certain deadline
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline)139   void Verify(CompletionQueue* cq,
140               std::chrono::system_clock::time_point deadline) {
141     if (expectations_.empty()) {
142       bool ok;
143       void* got_tag;
144       EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
145                 CompletionQueue::TIMEOUT);
146     } else {
147       while (!expectations_.empty()) {
148         bool ok;
149         void* got_tag;
150         EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
151                   CompletionQueue::GOT_EVENT);
152         GotTag(got_tag, ok, false);
153       }
154     }
155     maybe_expectations_.clear();
156   }
157 
158   // This version of Verify stops after a certain deadline, and uses the
159   // DoThenAsyncNext API
160   // to call the lambda
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline,const std::function<void (void)> & lambda)161   void Verify(CompletionQueue* cq,
162               std::chrono::system_clock::time_point deadline,
163               const std::function<void(void)>& lambda) {
164     if (expectations_.empty()) {
165       bool ok;
166       void* got_tag;
167       EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
168                 CompletionQueue::TIMEOUT);
169     } else {
170       while (!expectations_.empty()) {
171         bool ok;
172         void* got_tag;
173         EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
174                   CompletionQueue::GOT_EVENT);
175         GotTag(got_tag, ok, false);
176       }
177     }
178     maybe_expectations_.clear();
179   }
180 
181  private:
GotTag(void * got_tag,bool ok,bool ignore_ok)182   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
183     auto it = expectations_.find(got_tag);
184     if (it != expectations_.end()) {
185       if (!ignore_ok) {
186         EXPECT_EQ(it->second.ok, ok) << it->second.ToString(it->first);
187       }
188       expectations_.erase(it);
189     } else {
190       auto it2 = maybe_expectations_.find(got_tag);
191       if (it2 != maybe_expectations_.end()) {
192         if (it2->second.seen != nullptr) {
193           EXPECT_FALSE(*it2->second.seen);
194           *it2->second.seen = true;
195         }
196         if (!ignore_ok) {
197           EXPECT_EQ(it2->second.ok, ok) << it->second.ToString(it->first);
198         }
199         maybe_expectations_.erase(it2);
200       } else {
201         grpc_core::Crash(absl::StrFormat("Unexpected tag: %p", got_tag));
202       }
203     }
204   }
205 
206   struct MaybeExpect {
207     bool ok;
208     bool* seen;
209     grpc_core::SourceLocation whence;
ToStringgrpc::testing::__anon04e861db0111::Verifier::MaybeExpect210     std::string ToString(void* tag) const {
211       return absl::StrCat(
212           "[MaybeExpect] tag=", reinterpret_cast<uintptr_t>(tag),
213           " expect_ok=", ok, " whence=", whence.file(), ":", whence.line());
214     }
215   };
216 
217   struct DefinitelyExpect {
218     bool ok;
219     grpc_core::SourceLocation whence;
ToStringgrpc::testing::__anon04e861db0111::Verifier::DefinitelyExpect220     std::string ToString(void* tag) const {
221       return absl::StrCat("[Expect] tag=", reinterpret_cast<uintptr_t>(tag),
222                           " expect_ok=", ok, " whence=", whence.file(), ":",
223                           whence.line());
224     }
225   };
226 
227   std::map<void*, DefinitelyExpect> expectations_;
228   std::map<void*, MaybeExpect> maybe_expectations_;
229   bool lambda_run_;
230 };
231 
plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin> & plugin)232 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
233   return plugin->has_sync_methods();
234 }
235 
// This class disables the server builder plugins that may add sync services to
// the server. If there are sync services, the UnimplementedRpc test will
// trigger the sync unknown rpc routine on the server side, rather than the
// async one that needs to be tested here.
240 class ServerBuilderSyncPluginDisabler : public grpc::ServerBuilderOption {
241  public:
UpdateArguments(ChannelArguments *)242   void UpdateArguments(ChannelArguments* /*arg*/) override {}
243 
UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>> * plugins)244   void UpdatePlugins(
245       std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
246     plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
247                                   plugin_has_sync_methods),
248                    plugins->end());
249   }
250 };
251 
// Parameter bundle for one instantiation of the parameterized tests.
class TestScenario {
 public:
  // Stores the four settings unchanged; the strings are copied.
  TestScenario(bool inproc_stub, const std::string& creds_type, bool hcs,
               const std::string& content)
      : inproc{inproc_stub},
        health_check_service{hcs},
        credentials_type{creds_type},
        message_content{content} {}

  // Writes a description of this scenario to the log (defined out of line).
  void Log() const;

  // Whether the client uses an in-process channel.
  bool inproc;
  // Whether a health check service is registered with the server.
  bool health_check_service;
  // Credentials type name handed to the credentials provider.
  const std::string credentials_type;
  // Payload placed in every request message.
  const std::string message_content;
};
266 
operator <<(std::ostream & out,const TestScenario & scenario)267 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
268   return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
269              << ", credentials='" << scenario.credentials_type
270              << ", health_check_service="
271              << (scenario.health_check_service ? "true" : "false")
272              << "', message_size=" << scenario.message_content.size() << "}";
273 }
274 
Log() const275 void TestScenario::Log() const {
276   std::ostringstream out;
277   out << *this;
278   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
279 }
280 
281 class HealthCheck : public health::v1::Health::Service {};
282 
283 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
284  protected:
AsyncEnd2endTest()285   AsyncEnd2endTest() { GetParam().Log(); }
286 
SetUp()287   void SetUp() override {
288     port_ = grpc_pick_unused_port_or_die();
289     server_address_ << "localhost:" << port_;
290 
291     // Setup server
292     BuildAndStartServer();
293   }
294 
TearDown()295   void TearDown() override {
296     stub_.reset();
297     ServerShutdown();
298     grpc_recycle_unused_port(port_);
299   }
300 
ServerShutdown()301   void ServerShutdown() {
302     std::thread t([this]() {
303       void* ignored_tag;
304       bool ignored_ok;
305       while (cq_->Next(&ignored_tag, &ignored_ok)) {
306       }
307     });
308     server_->Shutdown();
309     cq_->Shutdown();
310     t.join();
311   }
312 
BuildAndStartServer()313   void BuildAndStartServer() {
314     ServerBuilder builder;
315     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
316         GetParam().credentials_type);
317     builder.AddListeningPort(server_address_.str(), server_creds);
318     service_ = std::make_unique<grpc::testing::EchoTestService::AsyncService>();
319     builder.RegisterService(service_.get());
320     if (GetParam().health_check_service) {
321       builder.RegisterService(&health_check_);
322     }
323     cq_ = builder.AddCompletionQueue();
324 
325     // TODO(zyc): make a test option to choose wheather sync plugins should be
326     // deleted
327     std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
328         new ServerBuilderSyncPluginDisabler());
329     builder.SetOption(std::move(sync_plugin_disabler));
330     server_ = builder.BuildAndStart();
331   }
332 
ResetStub()333   void ResetStub() {
334     ChannelArguments args;
335     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
336         GetParam().credentials_type, &args);
337     std::shared_ptr<Channel> channel =
338         !(GetParam().inproc) ? grpc::CreateCustomChannel(server_address_.str(),
339                                                          channel_creds, args)
340                              : server_->InProcessChannel(args);
341     stub_ = grpc::testing::EchoTestService::NewStub(channel);
342   }
343 
SendRpc(int num_rpcs)344   void SendRpc(int num_rpcs) {
345     for (int i = 0; i < num_rpcs; i++) {
346       EchoRequest send_request;
347       EchoRequest recv_request;
348       EchoResponse send_response;
349       EchoResponse recv_response;
350       Status recv_status;
351 
352       ClientContext cli_ctx;
353       ServerContext srv_ctx;
354       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
355 
356       send_request.set_message(GetParam().message_content);
357       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
358           stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
359 
360       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
361                             cq_.get(), cq_.get(), tag(2));
362 
363       response_reader->Finish(&recv_response, &recv_status, tag(4));
364 
365       Verifier().Expect(2, true).Verify(cq_.get());
366       EXPECT_EQ(send_request.message(), recv_request.message());
367 
368       send_response.set_message(recv_request.message());
369       response_writer.Finish(send_response, Status::OK, tag(3));
370       Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
371 
372       EXPECT_EQ(send_response.message(), recv_response.message());
373       EXPECT_TRUE(recv_status.ok());
374     }
375   }
376 
377   std::unique_ptr<ServerCompletionQueue> cq_;
378   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
379   std::unique_ptr<Server> server_;
380   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
381   HealthCheck health_check_;
382   std::ostringstream server_address_;
383   int port_;
384 };
385 
// A single unary echo round trip.
TEST_P(AsyncEnd2endTest, SimpleRpc) {
  constexpr int kRpcCount = 1;
  ResetStub();
  SendRpc(kRpcCount);
}
390 
// Unary echo where the request carries an expected_error (code 1, CANCELLED)
// and the server finishes the call with exactly that status. The client must
// observe the error code and message, an empty response, and the server
// context must not report cancellation.
TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;

  ClientContext client_ctx;
  ServerContext server_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> responder(&server_ctx);
  ErrorStatus requested_error;

  client_request.set_message(GetParam().message_content);
  requested_error.set_code(1);  // CANCELLED
  requested_error.set_error_message("cancel error message");
  *client_request.mutable_param()->mutable_expected_error() = requested_error;

  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> call{
      stub_->AsyncEcho(&client_ctx, client_request, cq)};

  // Tag 5 fires when the server side of the call is done.
  server_ctx.AsyncNotifyWhenDone(tag(5));
  service_->RequestEcho(&server_ctx, &server_request, &responder, cq, cq,
                        tag(2));

  call->Finish(&client_response, &client_status, tag(4));

  Verifier().Expect(2, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // Finish with the status the client asked for.
  server_response.set_message(server_request.message());
  const auto& wanted_error = server_request.param().expected_error();
  responder.Finish(server_response,
                   Status(static_cast<StatusCode>(wanted_error.code()),
                          wanted_error.error_message()),
                   tag(3));
  Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq);

  EXPECT_EQ(client_response.message(), "");
  EXPECT_EQ(client_status.error_code(), requested_error.code());
  EXPECT_EQ(client_status.error_message(), requested_error.error_message());
  EXPECT_FALSE(server_ctx.IsCancelled());
}
436 
// Ten unary echo round trips issued back to back on the same stub.
TEST_P(AsyncEnd2endTest, SequentialRpcs) {
  constexpr int kRpcCount = 10;
  ResetStub();
  SendRpc(kRpcCount);
}
441 
// Restarts the server under an existing channel and checks that an RPC still
// succeeds afterwards. Does nothing for in-process scenarios.
TEST_P(AsyncEnd2endTest, ReconnectChannel) {
  // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
  if (GetParam().inproc) {
    return;
  }
  int poller_slowdown_factor = 1;
#ifdef GRPC_POSIX_SOCKET_EV
  // It needs 2 pollset_works to reconnect the channel with polling engine
  // "poll"
  if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
    poller_slowdown_factor = 2;
  }
#endif  // GRPC_POSIX_SOCKET_EV
  ResetStub();
  SendRpc(1);

  // Bounce the server.
  ServerShutdown();
  BuildAndStartServer();

  // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
  // reconnect the channel.
  const gpr_timespec reconnect_wait = gpr_time_from_millis(
      300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
      GPR_TIMESPAN);
  const gpr_timespec wake_up_at =
      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), reconnect_wait);
  gpr_sleep_until(wake_up_at);

  SendRpc(1);
}
468 
469 // We do not need to protect notify because the use is synchronized.
ServerWait(Server * server,int * notify)470 void ServerWait(Server* server, int* notify) {
471   server->Wait();
472   *notify = 1;
473 }
// A thread parked in Server::Wait() must stay blocked while RPCs are served
// and must be released by ServerShutdown().
TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
  int wait_returned = 0;
  std::thread waiter{&ServerWait, server_.get(), &wait_returned};

  ResetStub();
  SendRpc(1);
  // Serving an RPC does not release the waiter.
  EXPECT_EQ(0, wait_returned);

  ServerShutdown();
  waiter.join();
  EXPECT_EQ(1, wait_returned);
}
484 
// Server::Wait() on the test thread returns when another thread shuts the
// server down.
TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
  ResetStub();
  SendRpc(1);

  std::thread shutdown_thread{[this]() { ServerShutdown(); }};
  server_->Wait();
  shutdown_thread.join();
}
492 
493 // Test a simple RPC using the async version of Next
// Unary echo driven through the deadline-taking Verify() overloads, i.e.
// through CompletionQueue::AsyncNext.
TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;

  ClientContext client_ctx;
  ServerContext server_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> responder(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> call{
      stub_->AsyncEcho(&client_ctx, client_request, cq)};

  const system_clock::time_point time_now = system_clock::now();
  const system_clock::time_point time_limit =
      system_clock::now() + std::chrono::seconds(10);
  // With nothing expected, polling with an already-reached deadline must time
  // out (checked twice).
  Verifier().Verify(cq, time_now);
  Verifier().Verify(cq, time_now);

  service_->RequestEcho(&server_ctx, &server_request, &responder, cq, cq,
                        tag(2));
  call->Finish(&client_response, &client_status, tag(4));

  Verifier().Expect(2, true).Verify(cq, time_limit);
  EXPECT_EQ(client_request.message(), server_request.message());

  server_response.set_message(server_request.message());
  responder.Finish(server_response, Status::OK, tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(
      cq, system_clock::time_point::max());

  EXPECT_EQ(server_response.message(), client_response.message());
  EXPECT_TRUE(client_status.ok());
}
533 
// Test a simple RPC using the DoThenAsyncNext version of Next
// Unary echo where the server-side steps (requesting the call, finishing it)
// are passed as lambdas to the Verify() overload that polls through
// DoThenAsyncNext.
TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;

  ClientContext client_ctx;
  ServerContext server_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> responder(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> call{
      stub_->AsyncEcho(&client_ctx, client_request, cq)};

  const system_clock::time_point time_now = system_clock::now();
  const system_clock::time_point time_limit =
      system_clock::now() + std::chrono::seconds(10);
  // With nothing expected, polling with an already-reached deadline must time
  // out (checked twice).
  Verifier().Verify(cq, time_now);
  Verifier().Verify(cq, time_now);

  auto responder_ptr = &responder;
  // Runs inside the first poll of the Verify() call below.
  auto request_call = [&, this, responder_ptr]() {
    service_->RequestEcho(&server_ctx, &server_request, responder_ptr,
                          cq_.get(), cq_.get(), tag(2));
  };
  call->Finish(&client_response, &client_status, tag(4));

  Verifier().Expect(2, true).Verify(cq, time_limit, request_call);
  EXPECT_EQ(client_request.message(), server_request.message());

  server_response.set_message(server_request.message());
  // Captures its own copy of the response that the server sends back.
  auto finish_call = [responder_ptr, server_response]() {
    responder_ptr->Finish(server_response, Status::OK, tag(3));
  };
  Verifier().Expect(3, true).Expect(4, true).Verify(
      cq, system_clock::time_point::max(), finish_call);

  EXPECT_EQ(server_response.message(), client_response.message());
  EXPECT_TRUE(client_status.ok());
}
579 
580 // Two pings and a final pong.
// Client-streaming call: the client writes the same request twice, half
// closes, and the server answers with a single response.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncReader<EchoResponse, EchoRequest> server_stream(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> client_stream{
      stub_->AsyncRequestStream(&client_ctx, &client_response, cq, tag(1))};

  service_->RequestRequestStream(&server_ctx, &server_stream, cq, cq, tag(2));

  // Call established on both sides.
  Verifier().Expect(2, true).Expect(1, true).Verify(cq);

  // First message.
  client_stream->Write(client_request, tag(3));
  server_stream.Read(&server_request, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // Second message.
  client_stream->Write(client_request, tag(5));
  server_stream.Read(&server_request, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq);

  EXPECT_EQ(client_request.message(), server_request.message());
  // Half close: the pending server read completes with ok == false.
  client_stream->WritesDone(tag(7));
  server_stream.Read(&server_request, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq);

  // Server sends the single response and both sides finish.
  server_response.set_message(server_request.message());
  server_stream.Finish(server_response, Status::OK, tag(9));
  client_stream->Finish(&client_status, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq);

  EXPECT_EQ(server_response.message(), client_response.message());
  EXPECT_TRUE(client_status.ok());
}
624 
625 // Two pings and a final pong.
// Client-streaming call with corked initial metadata and WriteLast for the
// final message. Completion of the first write (tag 3) is accepted either
// together with the server-side call start or together with the first read.
TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncReader<EchoResponse, EchoRequest> server_stream(&server_ctx);

  client_request.set_message(GetParam().message_content);
  client_ctx.set_initial_metadata_corked(true);
  // tag:1 never comes up since no op is performed
  std::unique_ptr<ClientAsyncWriter<EchoRequest>> client_stream{
      stub_->AsyncRequestStream(&client_ctx, &client_response, cq, tag(1))};

  service_->RequestRequestStream(&server_ctx, &server_stream, cq, cq, tag(2));

  client_stream->Write(client_request, tag(3));

  // Set to true by the Verifier if tag 3 arrives during the first Verify().
  bool first_write_seen = false;

  Verifier()
      .Expect(2, true)
      .ExpectMaybe(3, true, &first_write_seen)
      .Verify(cq);

  server_stream.Read(&server_request, tag(4));

  // Tag 3 is only required here if it has not been consumed already.
  Verifier()
      .ExpectUnless(3, true, first_write_seen)
      .Expect(4, true)
      .Verify(cq);

  EXPECT_EQ(client_request.message(), server_request.message());

  // Final message, written with WriteLast.
  client_stream->WriteLast(client_request, WriteOptions(), tag(5));
  server_stream.Read(&server_request, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // No more messages: the next server read completes with ok == false.
  server_stream.Read(&server_request, tag(7));
  Verifier().Expect(7, false).Verify(cq);

  server_response.set_message(server_request.message());
  server_stream.Finish(server_response, Status::OK, tag(8));
  client_stream->Finish(&client_status, tag(9));
  Verifier().Expect(8, true).Expect(9, true).Verify(cq);

  EXPECT_EQ(server_response.message(), client_response.message());
  EXPECT_TRUE(client_status.ok());
}
675 
676 // One ping, two pongs.
// Server-streaming call: one request, two identical responses written with
// Write(), then a separate Finish().
TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncWriter<EchoResponse> server_stream(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncReader<EchoResponse>> client_stream{
      stub_->AsyncResponseStream(&client_ctx, client_request, cq, tag(1))};

  service_->RequestResponseStream(&server_ctx, &server_request, &server_stream,
                                  cq, cq, tag(2));

  // Call established on both sides; the request has reached the server.
  Verifier().Expect(1, true).Expect(2, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // First response.
  server_response.set_message(server_request.message());
  server_stream.Write(server_response, tag(3));
  client_stream->Read(&client_response, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Second response.
  server_stream.Write(server_response, tag(5));
  client_stream->Read(&client_response, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Server finishes: the pending client read completes with ok == false.
  server_stream.Finish(Status::OK, tag(7));
  client_stream->Read(&client_response, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq);

  client_stream->Finish(&client_status, tag(9));
  Verifier().Expect(9, true).Verify(cq);

  EXPECT_TRUE(client_status.ok());
}
719 
720 // One ping, two pongs. Using WriteAndFinish API
// Server-streaming call where the second response and the final status are
// sent together through WriteAndFinish().
TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncWriter<EchoResponse> server_stream(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncReader<EchoResponse>> client_stream{
      stub_->AsyncResponseStream(&client_ctx, client_request, cq, tag(1))};

  service_->RequestResponseStream(&server_ctx, &server_request, &server_stream,
                                  cq, cq, tag(2));

  // Call established on both sides; the request has reached the server.
  Verifier().Expect(1, true).Expect(2, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // First response, sent with a plain Write().
  server_response.set_message(server_request.message());
  server_stream.Write(server_response, tag(3));
  client_stream->Read(&client_response, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Second response together with the OK status.
  server_stream.WriteAndFinish(server_response, WriteOptions(), Status::OK,
                               tag(5));
  client_stream->Read(&client_response, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Nothing left to read: completes with ok == false.
  client_stream->Read(&client_response, tag(7));
  Verifier().Expect(7, false).Verify(cq);

  client_stream->Finish(&client_status, tag(8));
  Verifier().Expect(8, true).Verify(cq);

  EXPECT_TRUE(client_status.ok());
}
762 
763 // One ping, two pongs. Using WriteLast API
// Server-streaming call where the second response is sent with WriteLast()
// and the status follows through a separate Finish(); the write, the client
// read and the finish are verified together.
TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
  ResetStub();
  ServerCompletionQueue* const cq = cq_.get();

  EchoRequest client_request;
  EchoRequest server_request;
  EchoResponse server_response;
  EchoResponse client_response;
  Status client_status;
  ClientContext client_ctx;
  ServerContext server_ctx;
  ServerAsyncWriter<EchoResponse> server_stream(&server_ctx);

  client_request.set_message(GetParam().message_content);
  std::unique_ptr<ClientAsyncReader<EchoResponse>> client_stream{
      stub_->AsyncResponseStream(&client_ctx, client_request, cq, tag(1))};

  service_->RequestResponseStream(&server_ctx, &server_request, &server_stream,
                                  cq, cq, tag(2));

  // Call established on both sides; the request has reached the server.
  Verifier().Expect(1, true).Expect(2, true).Verify(cq);
  EXPECT_EQ(client_request.message(), server_request.message());

  // First response, sent with a plain Write().
  server_response.set_message(server_request.message());
  server_stream.Write(server_response, tag(3));
  client_stream->Read(&client_response, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Last response (tag 5), client read (tag 6) and server finish (tag 7).
  server_stream.WriteLast(server_response, WriteOptions(), tag(5));
  client_stream->Read(&client_response, tag(6));
  server_stream.Finish(Status::OK, tag(7));
  Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq);
  EXPECT_EQ(server_response.message(), client_response.message());

  // Nothing left to read: completes with ok == false.
  client_stream->Read(&client_response, tag(8));
  Verifier().Expect(8, false).Verify(cq);

  client_stream->Finish(&client_status, tag(9));
  Verifier().Expect(9, true).Verify(cq);

  EXPECT_TRUE(client_status.ok());
}
806 
807 // One ping, one pong.
TEST_P(AsyncEnd2endTest,SimpleBidiStreaming)808 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
809   ResetStub();
810 
811   EchoRequest send_request;
812   EchoRequest recv_request;
813   EchoResponse send_response;
814   EchoResponse recv_response;
815   Status recv_status;
816   ClientContext cli_ctx;
817   ServerContext srv_ctx;
818   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
819 
820   send_request.set_message(GetParam().message_content);
821   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
822       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
823 
824   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
825                               tag(2));
826 
827   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
828 
829   cli_stream->Write(send_request, tag(3));
830   srv_stream.Read(&recv_request, tag(4));
831   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
832   EXPECT_EQ(send_request.message(), recv_request.message());
833 
834   send_response.set_message(recv_request.message());
835   srv_stream.Write(send_response, tag(5));
836   cli_stream->Read(&recv_response, tag(6));
837   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
838   EXPECT_EQ(send_response.message(), recv_response.message());
839 
840   cli_stream->WritesDone(tag(7));
841   srv_stream.Read(&recv_request, tag(8));
842   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
843 
844   srv_stream.Finish(Status::OK, tag(9));
845   cli_stream->Finish(&recv_status, tag(10));
846   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
847 
848   EXPECT_TRUE(recv_status.ok());
849 }
850 
851 // One ping, one pong. Using server:WriteAndFinish api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWAF)852 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
853   ResetStub();
854 
855   EchoRequest send_request;
856   EchoRequest recv_request;
857   EchoResponse send_response;
858   EchoResponse recv_response;
859   Status recv_status;
860   ClientContext cli_ctx;
861   ServerContext srv_ctx;
862   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
863 
864   send_request.set_message(GetParam().message_content);
865   cli_ctx.set_initial_metadata_corked(true);
866   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
867       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
868 
869   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
870                               tag(2));
871 
872   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
873 
874   bool seen3 = false;
875 
876   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
877 
878   srv_stream.Read(&recv_request, tag(4));
879 
880   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
881   EXPECT_EQ(send_request.message(), recv_request.message());
882 
883   srv_stream.Read(&recv_request, tag(5));
884   Verifier().Expect(5, false).Verify(cq_.get());
885 
886   send_response.set_message(recv_request.message());
887   srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
888   cli_stream->Read(&recv_response, tag(7));
889   Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
890   EXPECT_EQ(send_response.message(), recv_response.message());
891 
892   cli_stream->Finish(&recv_status, tag(8));
893   Verifier().Expect(8, true).Verify(cq_.get());
894 
895   EXPECT_TRUE(recv_status.ok());
896 }
897 
898 // One ping, one pong. Using server:WriteLast api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWL)899 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
900   ResetStub();
901 
902   EchoRequest send_request;
903   EchoRequest recv_request;
904   EchoResponse send_response;
905   EchoResponse recv_response;
906   Status recv_status;
907   ClientContext cli_ctx;
908   ServerContext srv_ctx;
909   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
910 
911   send_request.set_message(GetParam().message_content);
912   cli_ctx.set_initial_metadata_corked(true);
913   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
914       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
915 
916   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
917                               tag(2));
918 
919   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
920 
921   bool seen3 = false;
922 
923   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
924 
925   srv_stream.Read(&recv_request, tag(4));
926 
927   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
928   EXPECT_EQ(send_request.message(), recv_request.message());
929 
930   srv_stream.Read(&recv_request, tag(5));
931   Verifier().Expect(5, false).Verify(cq_.get());
932 
933   send_response.set_message(recv_request.message());
934   srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
935   srv_stream.Finish(Status::OK, tag(7));
936   cli_stream->Read(&recv_response, tag(8));
937   Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
938   EXPECT_EQ(send_response.message(), recv_response.message());
939 
940   cli_stream->Finish(&recv_status, tag(9));
941   Verifier().Expect(9, true).Verify(cq_.get());
942 
943   EXPECT_TRUE(recv_status.ok());
944 }
945 
946 // Metadata tests
TEST_P(AsyncEnd2endTest,ClientInitialMetadataRpc)947 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
948   ResetStub();
949 
950   EchoRequest send_request;
951   EchoRequest recv_request;
952   EchoResponse send_response;
953   EchoResponse recv_response;
954   Status recv_status;
955 
956   ClientContext cli_ctx;
957   ServerContext srv_ctx;
958   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
959 
960   send_request.set_message(GetParam().message_content);
961   std::pair<std::string, std::string> meta1("key1", "val1");
962   std::pair<std::string, std::string> meta2("key2", "val2");
963   std::pair<std::string, std::string> meta3("g.r.d-bin", "xyz");
964   cli_ctx.AddMetadata(meta1.first, meta1.second);
965   cli_ctx.AddMetadata(meta2.first, meta2.second);
966   cli_ctx.AddMetadata(meta3.first, meta3.second);
967 
968   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
969       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
970   response_reader->Finish(&recv_response, &recv_status, tag(4));
971 
972   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
973                         cq_.get(), tag(2));
974   Verifier().Expect(2, true).Verify(cq_.get());
975   EXPECT_EQ(send_request.message(), recv_request.message());
976   const auto& client_initial_metadata = srv_ctx.client_metadata();
977   EXPECT_EQ(meta1.second,
978             ToString(client_initial_metadata.find(meta1.first)->second));
979   EXPECT_EQ(meta2.second,
980             ToString(client_initial_metadata.find(meta2.first)->second));
981   EXPECT_EQ(meta3.second,
982             ToString(client_initial_metadata.find(meta3.first)->second));
983   EXPECT_GE(client_initial_metadata.size(), 2);
984 
985   send_response.set_message(recv_request.message());
986   response_writer.Finish(send_response, Status::OK, tag(3));
987   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
988 
989   EXPECT_EQ(send_response.message(), recv_response.message());
990   EXPECT_TRUE(recv_status.ok());
991 }
992 
// Unary Echo in which the server explicitly sends two initial-metadata
// entries and the client checks them before the RPC is finished.
TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  // Checks one key/value pair without dereferencing end(): an absent key is
  // reported as a test failure instead of being undefined behaviour. EXPECT_*
  // (not ASSERT_*) is used so the RPC below is still driven to completion.
  const auto expect_entry = [](const auto& metadata,
                               const std::pair<std::string, std::string>& kv) {
    const auto it = metadata.find(kv.first);
    const bool present = it != metadata.end();
    EXPECT_TRUE(present) << "missing metadata key: " << kv.first;
    if (present) {
      EXPECT_EQ(kv.second, ToString(it->second));
    }
  };

  send_request.set_message(GetParam().message_content);
  const std::pair<std::string, std::string> meta1("key1", "val1");
  const std::pair<std::string, std::string> meta2("key2", "val2");

  // The client asks for the server's initial metadata (tag 4) up front.
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->ReadInitialMetadata(tag(4));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
  srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());

  const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
  expect_entry(server_initial_metadata, meta1);
  expect_entry(server_initial_metadata, meta2);
  EXPECT_EQ(2u, server_initial_metadata.size());

  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(5));
  response_reader->Finish(&recv_response, &recv_status, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
}
1037 
1038 // 1 ping, 2 pongs.
TEST_P(AsyncEnd2endTest,ServerInitialMetadataServerStreaming)1039 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
1040   ResetStub();
1041   EchoRequest send_request;
1042   EchoRequest recv_request;
1043   EchoResponse send_response;
1044   EchoResponse recv_response;
1045   Status recv_status;
1046   ClientContext cli_ctx;
1047   ServerContext srv_ctx;
1048   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1049 
1050   std::pair<::std::string, ::std::string> meta1("key1", "val1");
1051   std::pair<::std::string, ::std::string> meta2("key2", "val2");
1052 
1053   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1054       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1055   cli_stream->ReadInitialMetadata(tag(11));
1056   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1057                                   cq_.get(), cq_.get(), tag(2));
1058 
1059   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1060 
1061   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1062   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1063   srv_stream.SendInitialMetadata(tag(10));
1064   Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());
1065   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1066   EXPECT_EQ(meta1.second,
1067             ToString(server_initial_metadata.find(meta1.first)->second));
1068   EXPECT_EQ(meta2.second,
1069             ToString(server_initial_metadata.find(meta2.first)->second));
1070   EXPECT_EQ(2, server_initial_metadata.size());
1071 
1072   srv_stream.Write(send_response, tag(3));
1073 
1074   cli_stream->Read(&recv_response, tag(4));
1075   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1076 
1077   srv_stream.Write(send_response, tag(5));
1078   cli_stream->Read(&recv_response, tag(6));
1079   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1080 
1081   srv_stream.Finish(Status::OK, tag(7));
1082   cli_stream->Read(&recv_response, tag(8));
1083   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1084 
1085   cli_stream->Finish(&recv_status, tag(9));
1086   Verifier().Expect(9, true).Verify(cq_.get());
1087 
1088   EXPECT_TRUE(recv_status.ok());
1089 }
1090 
1091 // 1 ping, 2 pongs.
1092 // Test for server initial metadata being sent implicitly
TEST_P(AsyncEnd2endTest,ServerInitialMetadataServerStreamingImplicit)1093 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1094   ResetStub();
1095   EchoRequest send_request;
1096   EchoRequest recv_request;
1097   EchoResponse send_response;
1098   EchoResponse recv_response;
1099   Status recv_status;
1100   ClientContext cli_ctx;
1101   ServerContext srv_ctx;
1102   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1103 
1104   send_request.set_message(GetParam().message_content);
1105   std::pair<::std::string, ::std::string> meta1("key1", "val1");
1106   std::pair<::std::string, ::std::string> meta2("key2", "val2");
1107 
1108   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1109       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1110   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1111                                   cq_.get(), cq_.get(), tag(2));
1112 
1113   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1114   EXPECT_EQ(send_request.message(), recv_request.message());
1115 
1116   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1117   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1118   send_response.set_message(recv_request.message());
1119   srv_stream.Write(send_response, tag(3));
1120 
1121   cli_stream->Read(&recv_response, tag(4));
1122   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1123   EXPECT_EQ(send_response.message(), recv_response.message());
1124 
1125   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1126   EXPECT_EQ(meta1.second,
1127             ToString(server_initial_metadata.find(meta1.first)->second));
1128   EXPECT_EQ(meta2.second,
1129             ToString(server_initial_metadata.find(meta2.first)->second));
1130   EXPECT_EQ(2, server_initial_metadata.size());
1131 
1132   srv_stream.Write(send_response, tag(5));
1133   cli_stream->Read(&recv_response, tag(6));
1134   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1135 
1136   srv_stream.Finish(Status::OK, tag(7));
1137   cli_stream->Read(&recv_response, tag(8));
1138   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1139 
1140   cli_stream->Finish(&recv_status, tag(9));
1141   Verifier().Expect(9, true).Verify(cq_.get());
1142 
1143   EXPECT_TRUE(recv_status.ok());
1144 }
1145 
// Unary Echo in which the server adds two trailing-metadata entries before
// finishing, and the client checks them once the RPC has completed.
TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
  ResetStub();

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  // Checks one key/value pair without dereferencing end(): an absent key is
  // reported as a test failure instead of being undefined behaviour.
  const auto expect_entry = [](const auto& metadata,
                               const std::pair<std::string, std::string>& kv) {
    const auto it = metadata.find(kv.first);
    const bool present = it != metadata.end();
    EXPECT_TRUE(present) << "missing metadata key: " << kv.first;
    if (present) {
      EXPECT_EQ(kv.second, ToString(it->second));
    }
  };

  send_request.set_message(GetParam().message_content);
  const std::pair<std::string, std::string> meta1("key1", "val1");
  const std::pair<std::string, std::string> meta2("key2", "val2");

  // The client queues Finish (tag 5) right away; it is only waited on once
  // the server has replied.
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
  response_reader->Finish(&recv_response, &recv_status, tag(5));

  service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
  Verifier().Expect(2, true).Verify(cq_.get());
  EXPECT_EQ(send_request.message(), recv_request.message());
  response_writer.SendInitialMetadata(tag(3));
  Verifier().Expect(3, true).Verify(cq_.get());

  send_response.set_message(recv_request.message());
  srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
  srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
  response_writer.Finish(send_response, Status::OK, tag(4));

  Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
  const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
  expect_entry(server_trailing_metadata, meta1);
  expect_entry(server_trailing_metadata, meta2);
  EXPECT_EQ(2u, server_trailing_metadata.size());
}
1190 
TEST_P(AsyncEnd2endTest,MetadataRpc)1191 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1192   ResetStub();
1193 
1194   EchoRequest send_request;
1195   EchoRequest recv_request;
1196   EchoResponse send_response;
1197   EchoResponse recv_response;
1198   Status recv_status;
1199 
1200   ClientContext cli_ctx;
1201   ServerContext srv_ctx;
1202   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1203 
1204   send_request.set_message(GetParam().message_content);
1205   std::pair<std::string, std::string> meta1("key1", "val1");
1206   std::pair<std::string, std::string> meta2(
1207       "key2-bin",
1208       std::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1209   std::pair<std::string, std::string> meta3("key3", "val3");
1210   std::pair<std::string, std::string> meta6(
1211       "key4-bin",
1212       std::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1213                   14));
1214   std::pair<std::string, std::string> meta5("key5", "val5");
1215   std::pair<std::string, std::string> meta4(
1216       "key6-bin",
1217       std::string(
1218           "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1219 
1220   cli_ctx.AddMetadata(meta1.first, meta1.second);
1221   cli_ctx.AddMetadata(meta2.first, meta2.second);
1222 
1223   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1224       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1225   response_reader->ReadInitialMetadata(tag(4));
1226 
1227   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1228                         cq_.get(), tag(2));
1229   Verifier().Expect(2, true).Verify(cq_.get());
1230   EXPECT_EQ(send_request.message(), recv_request.message());
1231   const auto& client_initial_metadata = srv_ctx.client_metadata();
1232   EXPECT_EQ(meta1.second,
1233             ToString(client_initial_metadata.find(meta1.first)->second));
1234   EXPECT_EQ(meta2.second,
1235             ToString(client_initial_metadata.find(meta2.first)->second));
1236   EXPECT_GE(client_initial_metadata.size(), 2);
1237 
1238   srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1239   srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1240   response_writer.SendInitialMetadata(tag(3));
1241   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1242   const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1243   EXPECT_EQ(meta3.second,
1244             ToString(server_initial_metadata.find(meta3.first)->second));
1245   EXPECT_EQ(meta4.second,
1246             ToString(server_initial_metadata.find(meta4.first)->second));
1247   EXPECT_GE(server_initial_metadata.size(), 2);
1248 
1249   send_response.set_message(recv_request.message());
1250   srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1251   srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1252   response_writer.Finish(send_response, Status::OK, tag(5));
1253   response_reader->Finish(&recv_response, &recv_status, tag(6));
1254 
1255   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1256 
1257   EXPECT_EQ(send_response.message(), recv_response.message());
1258   EXPECT_TRUE(recv_status.ok());
1259   const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1260   EXPECT_EQ(meta5.second,
1261             ToString(server_trailing_metadata.find(meta5.first)->second));
1262   EXPECT_EQ(meta6.second,
1263             ToString(server_trailing_metadata.find(meta6.first)->second));
1264   EXPECT_GE(server_trailing_metadata.size(), 2);
1265 }
1266 
1267 // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_P(AsyncEnd2endTest,ServerCheckCancellation)1268 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1269   ResetStub();
1270 
1271   EchoRequest send_request;
1272   EchoRequest recv_request;
1273   EchoResponse send_response;
1274   EchoResponse recv_response;
1275   Status recv_status;
1276 
1277   ClientContext cli_ctx;
1278   ServerContext srv_ctx;
1279   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1280 
1281   send_request.set_message(GetParam().message_content);
1282   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1283       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1284   response_reader->Finish(&recv_response, &recv_status, tag(4));
1285 
1286   srv_ctx.AsyncNotifyWhenDone(tag(5));
1287   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1288                         cq_.get(), tag(2));
1289 
1290   Verifier().Expect(2, true).Verify(cq_.get());
1291   EXPECT_EQ(send_request.message(), recv_request.message());
1292 
1293   cli_ctx.TryCancel();
1294   Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1295   EXPECT_TRUE(srv_ctx.IsCancelled());
1296 
1297   EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1298 }
1299 
1300 // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_P(AsyncEnd2endTest,ServerCheckDone)1301 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1302   ResetStub();
1303 
1304   EchoRequest send_request;
1305   EchoRequest recv_request;
1306   EchoResponse send_response;
1307   EchoResponse recv_response;
1308   Status recv_status;
1309 
1310   ClientContext cli_ctx;
1311   ServerContext srv_ctx;
1312   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1313 
1314   send_request.set_message(GetParam().message_content);
1315   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1316       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1317   response_reader->Finish(&recv_response, &recv_status, tag(4));
1318 
1319   srv_ctx.AsyncNotifyWhenDone(tag(5));
1320   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1321                         cq_.get(), tag(2));
1322 
1323   Verifier().Expect(2, true).Verify(cq_.get());
1324   EXPECT_EQ(send_request.message(), recv_request.message());
1325 
1326   send_response.set_message(recv_request.message());
1327   response_writer.Finish(send_response, Status::OK, tag(3));
1328   Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1329   EXPECT_FALSE(srv_ctx.IsCancelled());
1330 
1331   EXPECT_EQ(send_response.message(), recv_response.message());
1332   EXPECT_TRUE(recv_status.ok());
1333 }
1334 
// Calls a method through the UnimplementedEchoService stub and expects the
// RPC to finish with UNIMPLEMENTED and an empty error message.
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
  ChannelArguments channel_args;
  const auto creds = GetCredentialsProvider()->GetChannelCredentials(
      GetParam().credentials_type, &channel_args);

  // In-process channel when the test parameter asks for it; otherwise a
  // regular channel to the server address.
  std::shared_ptr<Channel> channel;
  if (GetParam().inproc) {
    channel = server_->InProcessChannel(channel_args);
  } else {
    channel = grpc::CreateCustomChannel(server_address_.str(), creds,
                                        channel_args);
  }
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> echo_stub =
      grpc::testing::UnimplementedEchoService::NewStub(channel);

  EchoRequest request_out;
  EchoResponse reply_in;
  Status final_status;

  ClientContext client_ctx;
  request_out.set_message(GetParam().message_content);
  auto reader = std::unique_ptr<ClientAsyncResponseReader<EchoResponse>>(
      echo_stub->AsyncUnimplemented(&client_ctx, request_out, cq_.get()));

  reader->Finish(&reply_in, &final_status, tag(4));
  Verifier().Expect(4, true).Verify(cq_.get());

  EXPECT_EQ(StatusCode::UNIMPLEMENTED, final_status.error_code());
  EXPECT_EQ("", final_status.error_message());
}
1360 
1361 // This class is for testing scenarios where RPCs are cancelled on the server
1362 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1363 // API to check for cancellation
1364 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1365  protected:
// Selects at which point of server-side processing the cancellation helpers
// call ServerContext::TryCancel(). Declared as a named enum rather than a
// C-style typedef of an anonymous enum; enumerator names and values are
// unchanged.
enum ServerTryCancelRequestPhase {
  DO_NOT_CANCEL = 0,
  CANCEL_BEFORE_PROCESSING,
  CANCEL_DURING_PROCESSING,
  CANCEL_AFTER_PROCESSING
};
1372 
1373   // Helper for testing client-streaming RPCs which are cancelled on the server.
1374   // Depending on the value of server_try_cancel parameter, this will test one
1375   // of the following three scenarios:
1376   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1377   //   any messages from the client
1378   //
1379   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1380   //   messages from the client
1381   //
1382   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1383   //   messages from the client (but before sending any status back to the
1384   //   client)
TestClientStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1385   void TestClientStreamingServerCancel(
1386       ServerTryCancelRequestPhase server_try_cancel) {
1387     ResetStub();
1388 
1389     EchoRequest recv_request;
1390     EchoResponse send_response;
1391     EchoResponse recv_response;
1392     Status recv_status;
1393 
1394     ClientContext cli_ctx;
1395     ServerContext srv_ctx;
1396     ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1397 
1398     // Initiate the 'RequestStream' call on client
1399     CompletionQueue cli_cq;
1400 
1401     std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1402         stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1403 
1404     // On the server, request to be notified of 'RequestStream' calls
1405     // and receive the 'RequestStream' call just made by the client
1406     srv_ctx.AsyncNotifyWhenDone(tag(11));
1407     service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1408                                    tag(2));
1409     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1410     Verifier().Expect(2, true).Verify(cq_.get());
1411     t1.join();
1412 
1413     bool expected_server_cq_result = true;
1414     bool expected_client_cq_result = true;
1415 
1416     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1417       srv_ctx.TryCancel();
1418       Verifier().Expect(11, true).Verify(cq_.get());
1419       EXPECT_TRUE(srv_ctx.IsCancelled());
1420 
1421       // Since cancellation is done before server reads any results, we know
1422       // for sure that all server cq results will return false from this
1423       // point forward
1424       expected_server_cq_result = false;
1425       expected_client_cq_result = false;
1426     }
1427 
1428     bool ignore_client_cq_result =
1429         (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1430         (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1431 
1432     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1433                             &ignore_client_cq_result] {
1434       EchoRequest send_request;
1435       // Client sends 3 messages (tags 3, 4 and 5)
1436       for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1437         send_request.set_message("Ping " + std::to_string(tag_idx));
1438         cli_stream->Write(send_request, tag(tag_idx));
1439         Verifier()
1440             .Expect(tag_idx, expected_client_cq_result)
1441             .Verify(&cli_cq, ignore_client_cq_result);
1442       }
1443       cli_stream->WritesDone(tag(6));
1444       // Ignore ok on WritesDone since cancel can affect it
1445       Verifier()
1446           .Expect(6, expected_client_cq_result)
1447           .Verify(&cli_cq, ignore_client_cq_result);
1448     });
1449 
1450     bool ignore_cq_result = false;
1451     bool want_done_tag = false;
1452     std::thread* server_try_cancel_thd = nullptr;
1453 
1454     auto verif = Verifier();
1455 
1456     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1457       server_try_cancel_thd =
1458           new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1459       // Server will cancel the RPC in a parallel thread while reading the
1460       // requests from the client. Since the cancellation can happen at anytime,
1461       // some of the cq results (i.e those until cancellation) might be true but
1462       // its non deterministic. So better to ignore the cq results
1463       ignore_cq_result = true;
1464       // Expect that we might possibly see the done tag that
1465       // indicates cancellation completion in this case
1466       want_done_tag = true;
1467       verif.Expect(11, true);
1468     }
1469 
1470     // Server reads 3 messages (tags 6, 7 and 8)
1471     // But if want_done_tag is true, we might also see tag 11
1472     for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1473       srv_stream.Read(&recv_request, tag(tag_idx));
1474       // Note that we'll add something to the verifier and verify that
1475       // something was seen, but it might be tag 11 and not what we
1476       // just added
1477       int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1478                         .Next(cq_.get(), ignore_cq_result);
1479       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1480       if (got_tag == 11) {
1481         EXPECT_TRUE(srv_ctx.IsCancelled());
1482         want_done_tag = false;
1483         // Now get the other entry that we were waiting on
1484         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1485       }
1486     }
1487 
1488     cli_thread.join();
1489 
1490     if (server_try_cancel_thd != nullptr) {
1491       server_try_cancel_thd->join();
1492       delete server_try_cancel_thd;
1493     }
1494 
1495     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1496       srv_ctx.TryCancel();
1497       want_done_tag = true;
1498       verif.Expect(11, true);
1499     }
1500 
1501     if (want_done_tag) {
1502       verif.Verify(cq_.get());
1503       EXPECT_TRUE(srv_ctx.IsCancelled());
1504       want_done_tag = false;
1505     }
1506 
1507     // The RPC has been cancelled at this point for sure (i.e irrespective of
1508     // the value of `server_try_cancel` is). So, from this point forward, we
1509     // know that cq results are supposed to return false on server.
1510 
1511     // Server sends the final message and cancelled status (but the RPC is
1512     // already cancelled at this point. So we expect the operation to fail)
1513     srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1514     Verifier().Expect(9, false).Verify(cq_.get());
1515 
1516     // Client will see the cancellation
1517     cli_stream->Finish(&recv_status, tag(10));
1518     Verifier().Expect(10, true).Verify(&cli_cq);
1519     EXPECT_FALSE(recv_status.ok());
1520     EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1521 
1522     cli_cq.Shutdown();
1523     void* phony_tag;
1524     bool phony_ok;
1525     while (cli_cq.Next(&phony_tag, &phony_ok)) {
1526     }
1527   }
1528 
1529   // Helper for testing server-streaming RPCs which are cancelled on the server.
1530   // Depending on the value of server_try_cancel parameter, this will test one
1531   // of the following three scenarios:
1532   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1533   //   any messages to the client
1534   //
1535   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1536   //   messages to the client
1537   //
1538   //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after sending all
1539   //   messages to the client (but before sending any status back to the
1540   //   client)
TestServerStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1541   void TestServerStreamingServerCancel(
1542       ServerTryCancelRequestPhase server_try_cancel) {
1543     ResetStub();
1544 
1545     EchoRequest send_request;
1546     EchoRequest recv_request;
1547     EchoResponse send_response;
1548     Status recv_status;
1549     ClientContext cli_ctx;
1550     ServerContext srv_ctx;
1551     ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1552 
1553     send_request.set_message("Ping");
1554     // Initiate the 'ResponseStream' call on the client
1555     CompletionQueue cli_cq;
1556     std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1557         stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1558     // On the server, request to be notified of 'ResponseStream' calls and
1559     // receive the call just made by the client
1560     srv_ctx.AsyncNotifyWhenDone(tag(11));
1561     service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1562                                     cq_.get(), cq_.get(), tag(2));
1563 
1564     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1565     Verifier().Expect(2, true).Verify(cq_.get());
1566     t1.join();
1567 
1568     EXPECT_EQ(send_request.message(), recv_request.message());
1569 
1570     bool expected_cq_result = true;
1571     bool ignore_cq_result = false;
1572     bool want_done_tag = false;
1573     bool expected_client_cq_result = true;
1574     bool ignore_client_cq_result =
1575         (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1576 
1577     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1578       srv_ctx.TryCancel();
1579       Verifier().Expect(11, true).Verify(cq_.get());
1580       EXPECT_TRUE(srv_ctx.IsCancelled());
1581 
1582       // We know for sure that all cq results will be false from this point
1583       // since the server cancelled the RPC
1584       expected_cq_result = false;
1585       expected_client_cq_result = false;
1586     }
1587 
1588     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1589                             &ignore_client_cq_result] {
1590       // Client attempts to read the three messages from the server
1591       for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1592         EchoResponse recv_response;
1593         cli_stream->Read(&recv_response, tag(tag_idx));
1594         Verifier()
1595             .Expect(tag_idx, expected_client_cq_result)
1596             .Verify(&cli_cq, ignore_client_cq_result);
1597       }
1598     });
1599 
1600     std::thread* server_try_cancel_thd = nullptr;
1601 
1602     auto verif = Verifier();
1603 
1604     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1605       server_try_cancel_thd =
1606           new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1607 
1608       // Server will cancel the RPC in a parallel thread while writing responses
1609       // to the client. Since the cancellation can happen at anytime, some of
1610       // the cq results (i.e those until cancellation) might be true but it is
1611       // non deterministic. So better to ignore the cq results
1612       ignore_cq_result = true;
1613       // Expect that we might possibly see the done tag that
1614       // indicates cancellation completion in this case
1615       want_done_tag = true;
1616       verif.Expect(11, true);
1617     }
1618 
1619     // Server sends three messages (tags 3, 4 and 5)
1620     // But if want_done tag is true, we might also see tag 11
1621     for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1622       send_response.set_message("Pong " + std::to_string(tag_idx));
1623       srv_stream.Write(send_response, tag(tag_idx));
1624       // Note that we'll add something to the verifier and verify that
1625       // something was seen, but it might be tag 11 and not what we
1626       // just added
1627       int got_tag = verif.Expect(tag_idx, expected_cq_result)
1628                         .Next(cq_.get(), ignore_cq_result);
1629       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1630       if (got_tag == 11) {
1631         EXPECT_TRUE(srv_ctx.IsCancelled());
1632         want_done_tag = false;
1633         // Now get the other entry that we were waiting on
1634         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1635       }
1636     }
1637 
1638     if (server_try_cancel_thd != nullptr) {
1639       server_try_cancel_thd->join();
1640       delete server_try_cancel_thd;
1641     }
1642 
1643     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1644       srv_ctx.TryCancel();
1645       want_done_tag = true;
1646       verif.Expect(11, true);
1647     }
1648 
1649     if (want_done_tag) {
1650       verif.Verify(cq_.get());
1651       EXPECT_TRUE(srv_ctx.IsCancelled());
1652       want_done_tag = false;
1653     }
1654 
1655     cli_thread.join();
1656 
1657     // The RPC has been cancelled at this point for sure (i.e irrespective of
1658     // the value of `server_try_cancel` is). So, from this point forward, we
1659     // know that cq results are supposed to return false on server.
1660 
1661     // Server finishes the stream (but the RPC is already cancelled)
1662     srv_stream.Finish(Status::CANCELLED, tag(9));
1663     Verifier().Expect(9, false).Verify(cq_.get());
1664 
1665     // Client will see the cancellation
1666     cli_stream->Finish(&recv_status, tag(10));
1667     Verifier().Expect(10, true).Verify(&cli_cq);
1668     EXPECT_FALSE(recv_status.ok());
1669     EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1670 
1671     cli_cq.Shutdown();
1672     void* phony_tag;
1673     bool phony_ok;
1674     while (cli_cq.Next(&phony_tag, &phony_ok)) {
1675     }
1676   }
1677 
1678   // Helper for testing bidirectional-streaming RPCs which are cancelled on
1679   // the server.
1680   //
1681   // Depending on the value of server_try_cancel parameter, this will
1682   // test one of the following three scenarios:
1683   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1684   //   writing any messages from/to the client
1685   //
1686   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1687   //   messages from the client
1688   //
1689   //   CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1690   //   messages from the client (but before sending any status back to the
1691   //   client)
TestBidiStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1692   void TestBidiStreamingServerCancel(
1693       ServerTryCancelRequestPhase server_try_cancel) {
1694     ResetStub();
1695 
1696     EchoRequest send_request;
1697     EchoRequest recv_request;
1698     EchoResponse send_response;
1699     EchoResponse recv_response;
1700     Status recv_status;
1701     ClientContext cli_ctx;
1702     ServerContext srv_ctx;
1703     ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1704 
1705     // Initiate the call from the client side
1706     std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1707         cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1708 
1709     // On the server, request to be notified of the 'BidiStream' call and
1710     // receive the call just made by the client
1711     srv_ctx.AsyncNotifyWhenDone(tag(11));
1712     service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1713                                 tag(2));
1714     Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1715 
1716     auto verif = Verifier();
1717 
1718     // Client sends the first and the only message
1719     send_request.set_message("Ping");
1720     cli_stream->Write(send_request, tag(3));
1721     verif.Expect(3, true);
1722 
1723     bool expected_cq_result = true;
1724     bool ignore_cq_result = false;
1725     bool want_done_tag = false;
1726 
1727     int got_tag, got_tag2;
1728     bool tag_3_done = false;
1729 
1730     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1731       srv_ctx.TryCancel();
1732       verif.Expect(11, true);
1733       // We know for sure that all server cq results will be false from
1734       // this point since the server cancelled the RPC. However, we can't
1735       // say for sure about the client
1736       expected_cq_result = false;
1737       ignore_cq_result = true;
1738 
1739       do {
1740         got_tag = verif.Next(cq_.get(), ignore_cq_result);
1741         GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1742         if (got_tag == 3) {
1743           tag_3_done = true;
1744         }
1745       } while (got_tag != 11);
1746       EXPECT_TRUE(srv_ctx.IsCancelled());
1747     }
1748 
1749     std::thread* server_try_cancel_thd = nullptr;
1750 
1751     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1752       server_try_cancel_thd =
1753           new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1754 
1755       // Since server is going to cancel the RPC in a parallel thread, some of
1756       // the cq results (i.e those until the cancellation) might be true. Since
1757       // that number is non-deterministic, it is better to ignore the cq results
1758       ignore_cq_result = true;
1759       // Expect that we might possibly see the done tag that
1760       // indicates cancellation completion in this case
1761       want_done_tag = true;
1762       verif.Expect(11, true);
1763     }
1764 
1765     srv_stream.Read(&recv_request, tag(4));
1766     verif.Expect(4, expected_cq_result);
1767     got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1768     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1769     GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1770                (got_tag == 11 && want_done_tag));
1771     GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1772                (got_tag2 == 11 && want_done_tag));
1773     // If we get 3 and 4, we don't need to wait for 11, but if
1774     // we get 11, we should also clear 3 and 4
1775     if (got_tag + got_tag2 != 7) {
1776       EXPECT_TRUE(srv_ctx.IsCancelled());
1777       want_done_tag = false;
1778       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1779       GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1780     }
1781 
1782     send_response.set_message("Pong");
1783     srv_stream.Write(send_response, tag(5));
1784     verif.Expect(5, expected_cq_result);
1785 
1786     cli_stream->Read(&recv_response, tag(6));
1787     verif.Expect(6, expected_cq_result);
1788     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1789     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1790     GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1791                (got_tag == 11 && want_done_tag));
1792     GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1793                (got_tag2 == 11 && want_done_tag));
1794     // If we get 5 and 6, we don't need to wait for 11, but if
1795     // we get 11, we should also clear 5 and 6
1796     if (got_tag + got_tag2 != 11) {
1797       EXPECT_TRUE(srv_ctx.IsCancelled());
1798       want_done_tag = false;
1799       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1800       GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1801     }
1802 
1803     // This is expected to succeed in all cases
1804     cli_stream->WritesDone(tag(7));
1805     verif.Expect(7, true);
1806     // TODO(vjpai): Consider whether the following is too flexible
1807     // or whether it should just be reset to ignore_cq_result
1808     bool ignore_cq_wd_result =
1809         ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1810     got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1811     GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1812     if (got_tag == 11) {
1813       EXPECT_TRUE(srv_ctx.IsCancelled());
1814       want_done_tag = false;
1815       // Now get the other entry that we were waiting on
1816       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1817     }
1818 
1819     // This is expected to fail in all cases i.e for all values of
1820     // server_try_cancel. This is because at this point, either there are no
1821     // more msgs from the client (because client called WritesDone) or the RPC
1822     // is cancelled on the server
1823     srv_stream.Read(&recv_request, tag(8));
1824     verif.Expect(8, false);
1825     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1826     GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1827     if (got_tag == 11) {
1828       EXPECT_TRUE(srv_ctx.IsCancelled());
1829       want_done_tag = false;
1830       // Now get the other entry that we were waiting on
1831       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1832     }
1833 
1834     if (server_try_cancel_thd != nullptr) {
1835       server_try_cancel_thd->join();
1836       delete server_try_cancel_thd;
1837     }
1838 
1839     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1840       srv_ctx.TryCancel();
1841       want_done_tag = true;
1842       verif.Expect(11, true);
1843     }
1844 
1845     if (want_done_tag) {
1846       verif.Verify(cq_.get());
1847       EXPECT_TRUE(srv_ctx.IsCancelled());
1848       want_done_tag = false;
1849     }
1850 
1851     // The RPC has been cancelled at this point for sure (i.e irrespective of
1852     // the value of `server_try_cancel` is). So, from this point forward, we
1853     // know that cq results are supposed to return false on server.
1854 
1855     srv_stream.Finish(Status::CANCELLED, tag(9));
1856     Verifier().Expect(9, false).Verify(cq_.get());
1857 
1858     cli_stream->Finish(&recv_status, tag(10));
1859     Verifier().Expect(10, true).Verify(cq_.get());
1860     EXPECT_FALSE(recv_status.ok());
1861     EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1862   }
1863 };
1864 
// Runs the client-streaming server-cancel helper with the
// CANCEL_BEFORE_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1868 
// Runs the client-streaming server-cancel helper with the
// CANCEL_DURING_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1872 
// Runs the client-streaming server-cancel helper with the
// CANCEL_AFTER_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestClientStreamingServerCancel(cancel_phase);
}
1876 
// Runs the server-streaming server-cancel helper with the
// CANCEL_BEFORE_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1880 
// Runs the server-streaming server-cancel helper with the
// CANCEL_DURING_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1884 
// Runs the server-streaming server-cancel helper with the
// CANCEL_AFTER_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestServerStreamingServerCancel(cancel_phase);
}
1888 
// Runs the bidi-streaming server-cancel helper with the
// CANCEL_BEFORE_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
  const auto cancel_phase = CANCEL_BEFORE_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1892 
// Runs the bidi-streaming server-cancel helper with the
// CANCEL_DURING_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
  const auto cancel_phase = CANCEL_DURING_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1896 
// Runs the bidi-streaming server-cancel helper with the
// CANCEL_AFTER_PROCESSING phase.
TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
  const auto cancel_phase = CANCEL_AFTER_PROCESSING;
  TestBidiStreamingServerCancel(cancel_phase);
}
1900 
CreateTestScenarios(bool,bool test_message_size_limit)1901 std::vector<TestScenario> CreateTestScenarios(bool /*test_secure*/,
1902                                               bool test_message_size_limit) {
1903   std::vector<TestScenario> scenarios;
1904   std::vector<std::string> credentials_types;
1905   std::vector<std::string> messages;
1906 
1907   auto insec_ok = [] {
1908     // Only allow insecure credentials type when it is registered with the
1909     // provider. User may create providers that do not have insecure.
1910     return GetCredentialsProvider()->GetChannelCredentials(
1911                kInsecureCredentialsType, nullptr) != nullptr;
1912   };
1913 
1914   if (insec_ok()) {
1915     credentials_types.push_back(kInsecureCredentialsType);
1916   }
1917   auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1918   for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1919     credentials_types.push_back(*sec);
1920   }
1921   GPR_ASSERT(!credentials_types.empty());
1922 
1923   messages.push_back("Hello");
1924   if (test_message_size_limit) {
1925     for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1926          k *= 32) {
1927       std::string big_msg;
1928       for (size_t i = 0; i < k * 1024; ++i) {
1929         char c = 'a' + (i % 26);
1930         big_msg += c;
1931       }
1932       messages.push_back(big_msg);
1933     }
1934     if (!BuiltUnderMsan()) {
1935       // 4MB message processing with SSL is very slow under msan
1936       // (causes timeouts) and doesn't really increase the signal from tests.
1937       // Reserve 100 bytes for other fields of the message proto.
1938       messages.push_back(
1939           std::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 100, 'a'));
1940     }
1941   }
1942 
1943   // TODO (sreek) Renable tests with health check service after the issue
1944   // https://github.com/grpc/grpc/issues/11223 is resolved
1945   for (auto health_check_service : {false}) {
1946     for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1947       for (auto cred = credentials_types.begin();
1948            cred != credentials_types.end(); ++cred) {
1949         scenarios.emplace_back(false, *cred, health_check_service, *msg);
1950       }
1951       if (insec_ok()) {
1952         scenarios.emplace_back(true, kInsecureCredentialsType,
1953                                health_check_service, *msg);
1954       }
1955     }
1956   }
1957   return scenarios;
1958 }
1959 
1960 INSTANTIATE_TEST_SUITE_P(AsyncEnd2end, AsyncEnd2endTest,
1961                          ::testing::ValuesIn(CreateTestScenarios(true, true)));
1962 INSTANTIATE_TEST_SUITE_P(AsyncEnd2endServerTryCancel,
1963                          AsyncEnd2endServerTryCancelTest,
1964                          ::testing::ValuesIn(CreateTestScenarios(false,
1965                                                                  false)));
1966 
1967 }  // namespace
1968 }  // namespace testing
1969 }  // namespace grpc
1970 
main(int argc,char ** argv)1971 int main(int argc, char** argv) {
1972   // Change the backup poll interval from 5s to 100ms to speed up the
1973   // ReconnectChannel test
1974   grpc_core::ConfigVars::Overrides overrides;
1975   overrides.client_channel_backup_poll_interval_ms = 100;
1976   grpc_core::ConfigVars::SetOverrides(overrides);
1977   grpc::testing::TestEnvironment env(&argc, argv);
1978   ::testing::InitGoogleTest(&argc, argv);
1979   int ret = RUN_ALL_TESTS();
1980   return ret;
1981 }
1982