1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22
23 #include "absl/memory/memory.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/str_format.h"
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/create_channel.h>
34 #include <grpcpp/ext/health_check_service_server_builder_option.h>
35 #include <grpcpp/server.h>
36 #include <grpcpp/server_builder.h>
37 #include <grpcpp/server_context.h>
38
39 #include "src/core/client_channel/backup_poller.h"
40 #include "src/core/lib/config/config_vars.h"
41 #include "src/core/lib/gprpp/crash.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/iomgr/port.h"
44 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
45 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
46 #include "src/proto/grpc/testing/echo.grpc.pb.h"
47 #include "test/core/util/build.h"
48 #include "test/core/util/port.h"
49 #include "test/core/util/test_config.h"
50 #include "test/cpp/util/string_ref_helper.h"
51 #include "test/cpp/util/test_credentials_provider.h"
52
53 #ifdef GRPC_POSIX_SOCKET_EV
54 #include "src/core/lib/iomgr/ev_posix.h"
55 #endif // GRPC_POSIX_SOCKET_EV
56
57 #include <gtest/gtest.h>
58
59 using grpc::testing::EchoRequest;
60 using grpc::testing::EchoResponse;
61 using std::chrono::system_clock;
62
63 namespace grpc {
64 namespace testing {
65
66 namespace {
67
tag(int t)68 void* tag(int t) { return reinterpret_cast<void*>(t); }
detag(void * p)69 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
70
71 class Verifier {
72 public:
Verifier()73 Verifier() : lambda_run_(false) {}
74 // Expect sets the expected ok value for a specific tag
Expect(int i,bool expect_ok,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())75 Verifier& Expect(
76 int i, bool expect_ok,
77 grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
78 return ExpectUnless(i, expect_ok, false, whence);
79 }
80 // ExpectUnless sets the expected ok value for a specific tag
81 // unless the tag was already marked seen (as a result of ExpectMaybe)
ExpectUnless(int i,bool expect_ok,bool seen,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())82 Verifier& ExpectUnless(
83 int i, bool expect_ok, bool seen,
84 grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
85 if (!seen) {
86 expectations_[tag(i)] = {expect_ok, whence};
87 }
88 return *this;
89 }
90 // ExpectMaybe sets the expected ok value for a specific tag, but does not
91 // require it to appear
92 // If it does, sets *seen to true
ExpectMaybe(int i,bool expect_ok,bool * seen,grpc_core::SourceLocation whence=grpc_core::SourceLocation ())93 Verifier& ExpectMaybe(
94 int i, bool expect_ok, bool* seen,
95 grpc_core::SourceLocation whence = grpc_core::SourceLocation()) {
96 if (!*seen) {
97 maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen, whence};
98 }
99 return *this;
100 }
101
102 // Next waits for 1 async tag to complete, checks its
103 // expectations, and returns the tag
Next(CompletionQueue * cq,bool ignore_ok)104 int Next(CompletionQueue* cq, bool ignore_ok) {
105 bool ok;
106 void* got_tag;
107 EXPECT_TRUE(cq->Next(&got_tag, &ok));
108 GotTag(got_tag, ok, ignore_ok);
109 return detag(got_tag);
110 }
111
112 template <typename T>
DoOnceThenAsyncNext(CompletionQueue * cq,void ** got_tag,bool * ok,T deadline,std::function<void (void)> lambda)113 CompletionQueue::NextStatus DoOnceThenAsyncNext(
114 CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
115 std::function<void(void)> lambda) {
116 if (lambda_run_) {
117 return cq->AsyncNext(got_tag, ok, deadline);
118 } else {
119 lambda_run_ = true;
120 return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
121 }
122 }
123
124 // Verify keeps calling Next until all currently set
125 // expected tags are complete
Verify(CompletionQueue * cq)126 void Verify(CompletionQueue* cq) { Verify(cq, false); }
127
128 // This version of Verify allows optionally ignoring the
129 // outcome of the expectation
Verify(CompletionQueue * cq,bool ignore_ok)130 void Verify(CompletionQueue* cq, bool ignore_ok) {
131 GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
132 while (!expectations_.empty()) {
133 Next(cq, ignore_ok);
134 }
135 maybe_expectations_.clear();
136 }
137
138 // This version of Verify stops after a certain deadline
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline)139 void Verify(CompletionQueue* cq,
140 std::chrono::system_clock::time_point deadline) {
141 if (expectations_.empty()) {
142 bool ok;
143 void* got_tag;
144 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
145 CompletionQueue::TIMEOUT);
146 } else {
147 while (!expectations_.empty()) {
148 bool ok;
149 void* got_tag;
150 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
151 CompletionQueue::GOT_EVENT);
152 GotTag(got_tag, ok, false);
153 }
154 }
155 maybe_expectations_.clear();
156 }
157
158 // This version of Verify stops after a certain deadline, and uses the
159 // DoThenAsyncNext API
160 // to call the lambda
Verify(CompletionQueue * cq,std::chrono::system_clock::time_point deadline,const std::function<void (void)> & lambda)161 void Verify(CompletionQueue* cq,
162 std::chrono::system_clock::time_point deadline,
163 const std::function<void(void)>& lambda) {
164 if (expectations_.empty()) {
165 bool ok;
166 void* got_tag;
167 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
168 CompletionQueue::TIMEOUT);
169 } else {
170 while (!expectations_.empty()) {
171 bool ok;
172 void* got_tag;
173 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
174 CompletionQueue::GOT_EVENT);
175 GotTag(got_tag, ok, false);
176 }
177 }
178 maybe_expectations_.clear();
179 }
180
181 private:
GotTag(void * got_tag,bool ok,bool ignore_ok)182 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
183 auto it = expectations_.find(got_tag);
184 if (it != expectations_.end()) {
185 if (!ignore_ok) {
186 EXPECT_EQ(it->second.ok, ok) << it->second.ToString(it->first);
187 }
188 expectations_.erase(it);
189 } else {
190 auto it2 = maybe_expectations_.find(got_tag);
191 if (it2 != maybe_expectations_.end()) {
192 if (it2->second.seen != nullptr) {
193 EXPECT_FALSE(*it2->second.seen);
194 *it2->second.seen = true;
195 }
196 if (!ignore_ok) {
197 EXPECT_EQ(it2->second.ok, ok) << it->second.ToString(it->first);
198 }
199 maybe_expectations_.erase(it2);
200 } else {
201 grpc_core::Crash(absl::StrFormat("Unexpected tag: %p", got_tag));
202 }
203 }
204 }
205
206 struct MaybeExpect {
207 bool ok;
208 bool* seen;
209 grpc_core::SourceLocation whence;
ToStringgrpc::testing::__anon04e861db0111::Verifier::MaybeExpect210 std::string ToString(void* tag) const {
211 return absl::StrCat(
212 "[MaybeExpect] tag=", reinterpret_cast<uintptr_t>(tag),
213 " expect_ok=", ok, " whence=", whence.file(), ":", whence.line());
214 }
215 };
216
217 struct DefinitelyExpect {
218 bool ok;
219 grpc_core::SourceLocation whence;
ToStringgrpc::testing::__anon04e861db0111::Verifier::DefinitelyExpect220 std::string ToString(void* tag) const {
221 return absl::StrCat("[Expect] tag=", reinterpret_cast<uintptr_t>(tag),
222 " expect_ok=", ok, " whence=", whence.file(), ":",
223 whence.line());
224 }
225 };
226
227 std::map<void*, DefinitelyExpect> expectations_;
228 std::map<void*, MaybeExpect> maybe_expectations_;
229 bool lambda_run_;
230 };
231
plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin> & plugin)232 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
233 return plugin->has_sync_methods();
234 }
235
236 // This class disables the server builder plugins that may add sync services to
237 // the server. If there are sync services, UnimplementedRpc test will triger
238 // the sync unknown rpc routine on the server side, rather than the async one
239 // that needs to be tested here.
240 class ServerBuilderSyncPluginDisabler : public grpc::ServerBuilderOption {
241 public:
UpdateArguments(ChannelArguments *)242 void UpdateArguments(ChannelArguments* /*arg*/) override {}
243
UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>> * plugins)244 void UpdatePlugins(
245 std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
246 plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
247 plugin_has_sync_methods),
248 plugins->end());
249 }
250 };
251
252 class TestScenario {
253 public:
TestScenario(bool inproc_stub,const std::string & creds_type,bool hcs,const std::string & content)254 TestScenario(bool inproc_stub, const std::string& creds_type, bool hcs,
255 const std::string& content)
256 : inproc(inproc_stub),
257 health_check_service(hcs),
258 credentials_type(creds_type),
259 message_content(content) {}
260 void Log() const;
261 bool inproc;
262 bool health_check_service;
263 const std::string credentials_type;
264 const std::string message_content;
265 };
266
operator <<(std::ostream & out,const TestScenario & scenario)267 std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
268 return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
269 << ", credentials='" << scenario.credentials_type
270 << ", health_check_service="
271 << (scenario.health_check_service ? "true" : "false")
272 << "', message_size=" << scenario.message_content.size() << "}";
273 }
274
Log() const275 void TestScenario::Log() const {
276 std::ostringstream out;
277 out << *this;
278 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
279 }
280
281 class HealthCheck : public health::v1::Health::Service {};
282
283 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
284 protected:
AsyncEnd2endTest()285 AsyncEnd2endTest() { GetParam().Log(); }
286
SetUp()287 void SetUp() override {
288 port_ = grpc_pick_unused_port_or_die();
289 server_address_ << "localhost:" << port_;
290
291 // Setup server
292 BuildAndStartServer();
293 }
294
TearDown()295 void TearDown() override {
296 stub_.reset();
297 ServerShutdown();
298 grpc_recycle_unused_port(port_);
299 }
300
ServerShutdown()301 void ServerShutdown() {
302 std::thread t([this]() {
303 void* ignored_tag;
304 bool ignored_ok;
305 while (cq_->Next(&ignored_tag, &ignored_ok)) {
306 }
307 });
308 server_->Shutdown();
309 cq_->Shutdown();
310 t.join();
311 }
312
BuildAndStartServer()313 void BuildAndStartServer() {
314 ServerBuilder builder;
315 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
316 GetParam().credentials_type);
317 builder.AddListeningPort(server_address_.str(), server_creds);
318 service_ = std::make_unique<grpc::testing::EchoTestService::AsyncService>();
319 builder.RegisterService(service_.get());
320 if (GetParam().health_check_service) {
321 builder.RegisterService(&health_check_);
322 }
323 cq_ = builder.AddCompletionQueue();
324
325 // TODO(zyc): make a test option to choose wheather sync plugins should be
326 // deleted
327 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
328 new ServerBuilderSyncPluginDisabler());
329 builder.SetOption(std::move(sync_plugin_disabler));
330 server_ = builder.BuildAndStart();
331 }
332
ResetStub()333 void ResetStub() {
334 ChannelArguments args;
335 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
336 GetParam().credentials_type, &args);
337 std::shared_ptr<Channel> channel =
338 !(GetParam().inproc) ? grpc::CreateCustomChannel(server_address_.str(),
339 channel_creds, args)
340 : server_->InProcessChannel(args);
341 stub_ = grpc::testing::EchoTestService::NewStub(channel);
342 }
343
SendRpc(int num_rpcs)344 void SendRpc(int num_rpcs) {
345 for (int i = 0; i < num_rpcs; i++) {
346 EchoRequest send_request;
347 EchoRequest recv_request;
348 EchoResponse send_response;
349 EchoResponse recv_response;
350 Status recv_status;
351
352 ClientContext cli_ctx;
353 ServerContext srv_ctx;
354 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
355
356 send_request.set_message(GetParam().message_content);
357 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
358 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
359
360 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
361 cq_.get(), cq_.get(), tag(2));
362
363 response_reader->Finish(&recv_response, &recv_status, tag(4));
364
365 Verifier().Expect(2, true).Verify(cq_.get());
366 EXPECT_EQ(send_request.message(), recv_request.message());
367
368 send_response.set_message(recv_request.message());
369 response_writer.Finish(send_response, Status::OK, tag(3));
370 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
371
372 EXPECT_EQ(send_response.message(), recv_response.message());
373 EXPECT_TRUE(recv_status.ok());
374 }
375 }
376
377 std::unique_ptr<ServerCompletionQueue> cq_;
378 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
379 std::unique_ptr<Server> server_;
380 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
381 HealthCheck health_check_;
382 std::ostringstream server_address_;
383 int port_;
384 };
385
TEST_P(AsyncEnd2endTest,SimpleRpc)386 TEST_P(AsyncEnd2endTest, SimpleRpc) {
387 ResetStub();
388 SendRpc(1);
389 }
390
TEST_P(AsyncEnd2endTest,SimpleRpcWithExpectedError)391 TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
392 ResetStub();
393
394 EchoRequest send_request;
395 EchoRequest recv_request;
396 EchoResponse send_response;
397 EchoResponse recv_response;
398 Status recv_status;
399
400 ClientContext cli_ctx;
401 ServerContext srv_ctx;
402 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
403 ErrorStatus error_status;
404
405 send_request.set_message(GetParam().message_content);
406 error_status.set_code(1); // CANCELLED
407 error_status.set_error_message("cancel error message");
408 *send_request.mutable_param()->mutable_expected_error() = error_status;
409
410 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
411 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
412
413 srv_ctx.AsyncNotifyWhenDone(tag(5));
414 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
415 cq_.get(), tag(2));
416
417 response_reader->Finish(&recv_response, &recv_status, tag(4));
418
419 Verifier().Expect(2, true).Verify(cq_.get());
420 EXPECT_EQ(send_request.message(), recv_request.message());
421
422 send_response.set_message(recv_request.message());
423 response_writer.Finish(
424 send_response,
425 Status(
426 static_cast<StatusCode>(recv_request.param().expected_error().code()),
427 recv_request.param().expected_error().error_message()),
428 tag(3));
429 Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
430
431 EXPECT_EQ(recv_response.message(), "");
432 EXPECT_EQ(recv_status.error_code(), error_status.code());
433 EXPECT_EQ(recv_status.error_message(), error_status.error_message());
434 EXPECT_FALSE(srv_ctx.IsCancelled());
435 }
436
TEST_P(AsyncEnd2endTest,SequentialRpcs)437 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
438 ResetStub();
439 SendRpc(10);
440 }
441
TEST_P(AsyncEnd2endTest,ReconnectChannel)442 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
443 // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
444 if (GetParam().inproc) {
445 return;
446 }
447 int poller_slowdown_factor = 1;
448 #ifdef GRPC_POSIX_SOCKET_EV
449 // It needs 2 pollset_works to reconnect the channel with polling engine
450 // "poll"
451 if (grpc_core::ConfigVars::Get().PollStrategy() == "poll") {
452 poller_slowdown_factor = 2;
453 }
454 #endif // GRPC_POSIX_SOCKET_EV
455 ResetStub();
456 SendRpc(1);
457 ServerShutdown();
458 BuildAndStartServer();
459 // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
460 // reconnect the channel.
461 gpr_sleep_until(gpr_time_add(
462 gpr_now(GPR_CLOCK_REALTIME),
463 gpr_time_from_millis(
464 300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
465 GPR_TIMESPAN)));
466 SendRpc(1);
467 }
468
469 // We do not need to protect notify because the use is synchronized.
ServerWait(Server * server,int * notify)470 void ServerWait(Server* server, int* notify) {
471 server->Wait();
472 *notify = 1;
473 }
TEST_P(AsyncEnd2endTest,WaitAndShutdownTest)474 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
475 int notify = 0;
476 std::thread wait_thread(&ServerWait, server_.get(), ¬ify);
477 ResetStub();
478 SendRpc(1);
479 EXPECT_EQ(0, notify);
480 ServerShutdown();
481 wait_thread.join();
482 EXPECT_EQ(1, notify);
483 }
484
TEST_P(AsyncEnd2endTest,ShutdownThenWait)485 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
486 ResetStub();
487 SendRpc(1);
488 std::thread t([this]() { ServerShutdown(); });
489 server_->Wait();
490 t.join();
491 }
492
493 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,AsyncNextRpc)494 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
495 ResetStub();
496
497 EchoRequest send_request;
498 EchoRequest recv_request;
499 EchoResponse send_response;
500 EchoResponse recv_response;
501 Status recv_status;
502
503 ClientContext cli_ctx;
504 ServerContext srv_ctx;
505 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
506
507 send_request.set_message(GetParam().message_content);
508 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
509 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
510
511 std::chrono::system_clock::time_point time_now(
512 std::chrono::system_clock::now());
513 std::chrono::system_clock::time_point time_limit(
514 std::chrono::system_clock::now() + std::chrono::seconds(10));
515 Verifier().Verify(cq_.get(), time_now);
516 Verifier().Verify(cq_.get(), time_now);
517
518 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
519 cq_.get(), tag(2));
520 response_reader->Finish(&recv_response, &recv_status, tag(4));
521
522 Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
523 EXPECT_EQ(send_request.message(), recv_request.message());
524
525 send_response.set_message(recv_request.message());
526 response_writer.Finish(send_response, Status::OK, tag(3));
527 Verifier().Expect(3, true).Expect(4, true).Verify(
528 cq_.get(), std::chrono::system_clock::time_point::max());
529
530 EXPECT_EQ(send_response.message(), recv_response.message());
531 EXPECT_TRUE(recv_status.ok());
532 }
533
534 // Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest,DoThenAsyncNextRpc)535 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
536 ResetStub();
537
538 EchoRequest send_request;
539 EchoRequest recv_request;
540 EchoResponse send_response;
541 EchoResponse recv_response;
542 Status recv_status;
543
544 ClientContext cli_ctx;
545 ServerContext srv_ctx;
546 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
547
548 send_request.set_message(GetParam().message_content);
549 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
550 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
551
552 std::chrono::system_clock::time_point time_now(
553 std::chrono::system_clock::now());
554 std::chrono::system_clock::time_point time_limit(
555 std::chrono::system_clock::now() + std::chrono::seconds(10));
556 Verifier().Verify(cq_.get(), time_now);
557 Verifier().Verify(cq_.get(), time_now);
558
559 auto resp_writer_ptr = &response_writer;
560 auto lambda_2 = [&, this, resp_writer_ptr]() {
561 service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
562 cq_.get(), tag(2));
563 };
564 response_reader->Finish(&recv_response, &recv_status, tag(4));
565
566 Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
567 EXPECT_EQ(send_request.message(), recv_request.message());
568
569 send_response.set_message(recv_request.message());
570 auto lambda_3 = [resp_writer_ptr, send_response]() {
571 resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
572 };
573 Verifier().Expect(3, true).Expect(4, true).Verify(
574 cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
575
576 EXPECT_EQ(send_response.message(), recv_response.message());
577 EXPECT_TRUE(recv_status.ok());
578 }
579
580 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreaming)581 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
582 ResetStub();
583
584 EchoRequest send_request;
585 EchoRequest recv_request;
586 EchoResponse send_response;
587 EchoResponse recv_response;
588 Status recv_status;
589 ClientContext cli_ctx;
590 ServerContext srv_ctx;
591 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
592
593 send_request.set_message(GetParam().message_content);
594 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
595 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
596
597 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
598 tag(2));
599
600 Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
601
602 cli_stream->Write(send_request, tag(3));
603 srv_stream.Read(&recv_request, tag(4));
604 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
605 EXPECT_EQ(send_request.message(), recv_request.message());
606
607 cli_stream->Write(send_request, tag(5));
608 srv_stream.Read(&recv_request, tag(6));
609 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
610
611 EXPECT_EQ(send_request.message(), recv_request.message());
612 cli_stream->WritesDone(tag(7));
613 srv_stream.Read(&recv_request, tag(8));
614 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
615
616 send_response.set_message(recv_request.message());
617 srv_stream.Finish(send_response, Status::OK, tag(9));
618 cli_stream->Finish(&recv_status, tag(10));
619 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
620
621 EXPECT_EQ(send_response.message(), recv_response.message());
622 EXPECT_TRUE(recv_status.ok());
623 }
624
625 // Two pings and a final pong.
TEST_P(AsyncEnd2endTest,SimpleClientStreamingWithCoalescingApi)626 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
627 ResetStub();
628
629 EchoRequest send_request;
630 EchoRequest recv_request;
631 EchoResponse send_response;
632 EchoResponse recv_response;
633 Status recv_status;
634 ClientContext cli_ctx;
635 ServerContext srv_ctx;
636 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
637
638 send_request.set_message(GetParam().message_content);
639 cli_ctx.set_initial_metadata_corked(true);
640 // tag:1 never comes up since no op is performed
641 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
642 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
643
644 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
645 tag(2));
646
647 cli_stream->Write(send_request, tag(3));
648
649 bool seen3 = false;
650
651 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
652
653 srv_stream.Read(&recv_request, tag(4));
654
655 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
656
657 EXPECT_EQ(send_request.message(), recv_request.message());
658
659 cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
660 srv_stream.Read(&recv_request, tag(6));
661 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
662 EXPECT_EQ(send_request.message(), recv_request.message());
663
664 srv_stream.Read(&recv_request, tag(7));
665 Verifier().Expect(7, false).Verify(cq_.get());
666
667 send_response.set_message(recv_request.message());
668 srv_stream.Finish(send_response, Status::OK, tag(8));
669 cli_stream->Finish(&recv_status, tag(9));
670 Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
671
672 EXPECT_EQ(send_response.message(), recv_response.message());
673 EXPECT_TRUE(recv_status.ok());
674 }
675
676 // One ping, two pongs.
TEST_P(AsyncEnd2endTest,SimpleServerStreaming)677 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
678 ResetStub();
679
680 EchoRequest send_request;
681 EchoRequest recv_request;
682 EchoResponse send_response;
683 EchoResponse recv_response;
684 Status recv_status;
685 ClientContext cli_ctx;
686 ServerContext srv_ctx;
687 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
688
689 send_request.set_message(GetParam().message_content);
690 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
691 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
692
693 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
694 cq_.get(), cq_.get(), tag(2));
695
696 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
697 EXPECT_EQ(send_request.message(), recv_request.message());
698
699 send_response.set_message(recv_request.message());
700 srv_stream.Write(send_response, tag(3));
701 cli_stream->Read(&recv_response, tag(4));
702 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
703 EXPECT_EQ(send_response.message(), recv_response.message());
704
705 srv_stream.Write(send_response, tag(5));
706 cli_stream->Read(&recv_response, tag(6));
707 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
708 EXPECT_EQ(send_response.message(), recv_response.message());
709
710 srv_stream.Finish(Status::OK, tag(7));
711 cli_stream->Read(&recv_response, tag(8));
712 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
713
714 cli_stream->Finish(&recv_status, tag(9));
715 Verifier().Expect(9, true).Verify(cq_.get());
716
717 EXPECT_TRUE(recv_status.ok());
718 }
719
720 // One ping, two pongs. Using WriteAndFinish API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWAF)721 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
722 ResetStub();
723
724 EchoRequest send_request;
725 EchoRequest recv_request;
726 EchoResponse send_response;
727 EchoResponse recv_response;
728 Status recv_status;
729 ClientContext cli_ctx;
730 ServerContext srv_ctx;
731 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
732
733 send_request.set_message(GetParam().message_content);
734 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
735 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
736
737 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
738 cq_.get(), cq_.get(), tag(2));
739
740 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
741 EXPECT_EQ(send_request.message(), recv_request.message());
742
743 send_response.set_message(recv_request.message());
744 srv_stream.Write(send_response, tag(3));
745 cli_stream->Read(&recv_response, tag(4));
746 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
747 EXPECT_EQ(send_response.message(), recv_response.message());
748
749 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
750 cli_stream->Read(&recv_response, tag(6));
751 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
752 EXPECT_EQ(send_response.message(), recv_response.message());
753
754 cli_stream->Read(&recv_response, tag(7));
755 Verifier().Expect(7, false).Verify(cq_.get());
756
757 cli_stream->Finish(&recv_status, tag(8));
758 Verifier().Expect(8, true).Verify(cq_.get());
759
760 EXPECT_TRUE(recv_status.ok());
761 }
762
763 // One ping, two pongs. Using WriteLast API
TEST_P(AsyncEnd2endTest,SimpleServerStreamingWithCoalescingApiWL)764 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
765 ResetStub();
766
767 EchoRequest send_request;
768 EchoRequest recv_request;
769 EchoResponse send_response;
770 EchoResponse recv_response;
771 Status recv_status;
772 ClientContext cli_ctx;
773 ServerContext srv_ctx;
774 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
775
776 send_request.set_message(GetParam().message_content);
777 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
778 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
779
780 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
781 cq_.get(), cq_.get(), tag(2));
782
783 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
784 EXPECT_EQ(send_request.message(), recv_request.message());
785
786 send_response.set_message(recv_request.message());
787 srv_stream.Write(send_response, tag(3));
788 cli_stream->Read(&recv_response, tag(4));
789 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
790 EXPECT_EQ(send_response.message(), recv_response.message());
791
792 srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
793 cli_stream->Read(&recv_response, tag(6));
794 srv_stream.Finish(Status::OK, tag(7));
795 Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
796 EXPECT_EQ(send_response.message(), recv_response.message());
797
798 cli_stream->Read(&recv_response, tag(8));
799 Verifier().Expect(8, false).Verify(cq_.get());
800
801 cli_stream->Finish(&recv_status, tag(9));
802 Verifier().Expect(9, true).Verify(cq_.get());
803
804 EXPECT_TRUE(recv_status.ok());
805 }
806
807 // One ping, one pong.
TEST_P(AsyncEnd2endTest,SimpleBidiStreaming)808 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
809 ResetStub();
810
811 EchoRequest send_request;
812 EchoRequest recv_request;
813 EchoResponse send_response;
814 EchoResponse recv_response;
815 Status recv_status;
816 ClientContext cli_ctx;
817 ServerContext srv_ctx;
818 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
819
820 send_request.set_message(GetParam().message_content);
821 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
822 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
823
824 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
825 tag(2));
826
827 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
828
829 cli_stream->Write(send_request, tag(3));
830 srv_stream.Read(&recv_request, tag(4));
831 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
832 EXPECT_EQ(send_request.message(), recv_request.message());
833
834 send_response.set_message(recv_request.message());
835 srv_stream.Write(send_response, tag(5));
836 cli_stream->Read(&recv_response, tag(6));
837 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
838 EXPECT_EQ(send_response.message(), recv_response.message());
839
840 cli_stream->WritesDone(tag(7));
841 srv_stream.Read(&recv_request, tag(8));
842 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
843
844 srv_stream.Finish(Status::OK, tag(9));
845 cli_stream->Finish(&recv_status, tag(10));
846 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
847
848 EXPECT_TRUE(recv_status.ok());
849 }
850
851 // One ping, one pong. Using server:WriteAndFinish api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWAF)852 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
853 ResetStub();
854
855 EchoRequest send_request;
856 EchoRequest recv_request;
857 EchoResponse send_response;
858 EchoResponse recv_response;
859 Status recv_status;
860 ClientContext cli_ctx;
861 ServerContext srv_ctx;
862 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
863
864 send_request.set_message(GetParam().message_content);
865 cli_ctx.set_initial_metadata_corked(true);
866 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
867 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
868
869 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
870 tag(2));
871
872 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
873
874 bool seen3 = false;
875
876 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
877
878 srv_stream.Read(&recv_request, tag(4));
879
880 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
881 EXPECT_EQ(send_request.message(), recv_request.message());
882
883 srv_stream.Read(&recv_request, tag(5));
884 Verifier().Expect(5, false).Verify(cq_.get());
885
886 send_response.set_message(recv_request.message());
887 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
888 cli_stream->Read(&recv_response, tag(7));
889 Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
890 EXPECT_EQ(send_response.message(), recv_response.message());
891
892 cli_stream->Finish(&recv_status, tag(8));
893 Verifier().Expect(8, true).Verify(cq_.get());
894
895 EXPECT_TRUE(recv_status.ok());
896 }
897
898 // One ping, one pong. Using server:WriteLast api
TEST_P(AsyncEnd2endTest,SimpleBidiStreamingWithCoalescingApiWL)899 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
900 ResetStub();
901
902 EchoRequest send_request;
903 EchoRequest recv_request;
904 EchoResponse send_response;
905 EchoResponse recv_response;
906 Status recv_status;
907 ClientContext cli_ctx;
908 ServerContext srv_ctx;
909 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
910
911 send_request.set_message(GetParam().message_content);
912 cli_ctx.set_initial_metadata_corked(true);
913 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
914 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
915
916 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
917 tag(2));
918
919 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
920
921 bool seen3 = false;
922
923 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
924
925 srv_stream.Read(&recv_request, tag(4));
926
927 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
928 EXPECT_EQ(send_request.message(), recv_request.message());
929
930 srv_stream.Read(&recv_request, tag(5));
931 Verifier().Expect(5, false).Verify(cq_.get());
932
933 send_response.set_message(recv_request.message());
934 srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
935 srv_stream.Finish(Status::OK, tag(7));
936 cli_stream->Read(&recv_response, tag(8));
937 Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
938 EXPECT_EQ(send_response.message(), recv_response.message());
939
940 cli_stream->Finish(&recv_status, tag(9));
941 Verifier().Expect(9, true).Verify(cq_.get());
942
943 EXPECT_TRUE(recv_status.ok());
944 }
945
946 // Metadata tests
TEST_P(AsyncEnd2endTest,ClientInitialMetadataRpc)947 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
948 ResetStub();
949
950 EchoRequest send_request;
951 EchoRequest recv_request;
952 EchoResponse send_response;
953 EchoResponse recv_response;
954 Status recv_status;
955
956 ClientContext cli_ctx;
957 ServerContext srv_ctx;
958 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
959
960 send_request.set_message(GetParam().message_content);
961 std::pair<std::string, std::string> meta1("key1", "val1");
962 std::pair<std::string, std::string> meta2("key2", "val2");
963 std::pair<std::string, std::string> meta3("g.r.d-bin", "xyz");
964 cli_ctx.AddMetadata(meta1.first, meta1.second);
965 cli_ctx.AddMetadata(meta2.first, meta2.second);
966 cli_ctx.AddMetadata(meta3.first, meta3.second);
967
968 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
969 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
970 response_reader->Finish(&recv_response, &recv_status, tag(4));
971
972 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
973 cq_.get(), tag(2));
974 Verifier().Expect(2, true).Verify(cq_.get());
975 EXPECT_EQ(send_request.message(), recv_request.message());
976 const auto& client_initial_metadata = srv_ctx.client_metadata();
977 EXPECT_EQ(meta1.second,
978 ToString(client_initial_metadata.find(meta1.first)->second));
979 EXPECT_EQ(meta2.second,
980 ToString(client_initial_metadata.find(meta2.first)->second));
981 EXPECT_EQ(meta3.second,
982 ToString(client_initial_metadata.find(meta3.first)->second));
983 EXPECT_GE(client_initial_metadata.size(), 2);
984
985 send_response.set_message(recv_request.message());
986 response_writer.Finish(send_response, Status::OK, tag(3));
987 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
988
989 EXPECT_EQ(send_response.message(), recv_response.message());
990 EXPECT_TRUE(recv_status.ok());
991 }
992
TEST_P(AsyncEnd2endTest,ServerInitialMetadataRpc)993 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
994 ResetStub();
995
996 EchoRequest send_request;
997 EchoRequest recv_request;
998 EchoResponse send_response;
999 EchoResponse recv_response;
1000 Status recv_status;
1001
1002 ClientContext cli_ctx;
1003 ServerContext srv_ctx;
1004 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1005
1006 send_request.set_message(GetParam().message_content);
1007 std::pair<std::string, std::string> meta1("key1", "val1");
1008 std::pair<std::string, std::string> meta2("key2", "val2");
1009
1010 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1011 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1012 response_reader->ReadInitialMetadata(tag(4));
1013
1014 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1015 cq_.get(), tag(2));
1016 Verifier().Expect(2, true).Verify(cq_.get());
1017 EXPECT_EQ(send_request.message(), recv_request.message());
1018 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1019 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1020 response_writer.SendInitialMetadata(tag(3));
1021 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1022 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1023 EXPECT_EQ(meta1.second,
1024 ToString(server_initial_metadata.find(meta1.first)->second));
1025 EXPECT_EQ(meta2.second,
1026 ToString(server_initial_metadata.find(meta2.first)->second));
1027 EXPECT_EQ(2, server_initial_metadata.size());
1028
1029 send_response.set_message(recv_request.message());
1030 response_writer.Finish(send_response, Status::OK, tag(5));
1031 response_reader->Finish(&recv_response, &recv_status, tag(6));
1032 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1033
1034 EXPECT_EQ(send_response.message(), recv_response.message());
1035 EXPECT_TRUE(recv_status.ok());
1036 }
1037
1038 // 1 ping, 2 pongs.
TEST_P(AsyncEnd2endTest,ServerInitialMetadataServerStreaming)1039 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
1040 ResetStub();
1041 EchoRequest send_request;
1042 EchoRequest recv_request;
1043 EchoResponse send_response;
1044 EchoResponse recv_response;
1045 Status recv_status;
1046 ClientContext cli_ctx;
1047 ServerContext srv_ctx;
1048 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1049
1050 std::pair<::std::string, ::std::string> meta1("key1", "val1");
1051 std::pair<::std::string, ::std::string> meta2("key2", "val2");
1052
1053 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1054 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1055 cli_stream->ReadInitialMetadata(tag(11));
1056 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1057 cq_.get(), cq_.get(), tag(2));
1058
1059 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1060
1061 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1062 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1063 srv_stream.SendInitialMetadata(tag(10));
1064 Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());
1065 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1066 EXPECT_EQ(meta1.second,
1067 ToString(server_initial_metadata.find(meta1.first)->second));
1068 EXPECT_EQ(meta2.second,
1069 ToString(server_initial_metadata.find(meta2.first)->second));
1070 EXPECT_EQ(2, server_initial_metadata.size());
1071
1072 srv_stream.Write(send_response, tag(3));
1073
1074 cli_stream->Read(&recv_response, tag(4));
1075 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1076
1077 srv_stream.Write(send_response, tag(5));
1078 cli_stream->Read(&recv_response, tag(6));
1079 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1080
1081 srv_stream.Finish(Status::OK, tag(7));
1082 cli_stream->Read(&recv_response, tag(8));
1083 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1084
1085 cli_stream->Finish(&recv_status, tag(9));
1086 Verifier().Expect(9, true).Verify(cq_.get());
1087
1088 EXPECT_TRUE(recv_status.ok());
1089 }
1090
1091 // 1 ping, 2 pongs.
1092 // Test for server initial metadata being sent implicitly
TEST_P(AsyncEnd2endTest,ServerInitialMetadataServerStreamingImplicit)1093 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1094 ResetStub();
1095 EchoRequest send_request;
1096 EchoRequest recv_request;
1097 EchoResponse send_response;
1098 EchoResponse recv_response;
1099 Status recv_status;
1100 ClientContext cli_ctx;
1101 ServerContext srv_ctx;
1102 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1103
1104 send_request.set_message(GetParam().message_content);
1105 std::pair<::std::string, ::std::string> meta1("key1", "val1");
1106 std::pair<::std::string, ::std::string> meta2("key2", "val2");
1107
1108 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1109 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1110 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1111 cq_.get(), cq_.get(), tag(2));
1112
1113 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1114 EXPECT_EQ(send_request.message(), recv_request.message());
1115
1116 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1117 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1118 send_response.set_message(recv_request.message());
1119 srv_stream.Write(send_response, tag(3));
1120
1121 cli_stream->Read(&recv_response, tag(4));
1122 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1123 EXPECT_EQ(send_response.message(), recv_response.message());
1124
1125 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1126 EXPECT_EQ(meta1.second,
1127 ToString(server_initial_metadata.find(meta1.first)->second));
1128 EXPECT_EQ(meta2.second,
1129 ToString(server_initial_metadata.find(meta2.first)->second));
1130 EXPECT_EQ(2, server_initial_metadata.size());
1131
1132 srv_stream.Write(send_response, tag(5));
1133 cli_stream->Read(&recv_response, tag(6));
1134 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1135
1136 srv_stream.Finish(Status::OK, tag(7));
1137 cli_stream->Read(&recv_response, tag(8));
1138 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1139
1140 cli_stream->Finish(&recv_status, tag(9));
1141 Verifier().Expect(9, true).Verify(cq_.get());
1142
1143 EXPECT_TRUE(recv_status.ok());
1144 }
1145
TEST_P(AsyncEnd2endTest,ServerTrailingMetadataRpc)1146 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
1147 ResetStub();
1148
1149 EchoRequest send_request;
1150 EchoRequest recv_request;
1151 EchoResponse send_response;
1152 EchoResponse recv_response;
1153 Status recv_status;
1154
1155 ClientContext cli_ctx;
1156 ServerContext srv_ctx;
1157 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1158
1159 send_request.set_message(GetParam().message_content);
1160 std::pair<std::string, std::string> meta1("key1", "val1");
1161 std::pair<std::string, std::string> meta2("key2", "val2");
1162
1163 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1164 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1165 response_reader->Finish(&recv_response, &recv_status, tag(5));
1166
1167 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1168 cq_.get(), tag(2));
1169 Verifier().Expect(2, true).Verify(cq_.get());
1170 EXPECT_EQ(send_request.message(), recv_request.message());
1171 response_writer.SendInitialMetadata(tag(3));
1172 Verifier().Expect(3, true).Verify(cq_.get());
1173
1174 send_response.set_message(recv_request.message());
1175 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
1176 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
1177 response_writer.Finish(send_response, Status::OK, tag(4));
1178
1179 Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
1180
1181 EXPECT_EQ(send_response.message(), recv_response.message());
1182 EXPECT_TRUE(recv_status.ok());
1183 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1184 EXPECT_EQ(meta1.second,
1185 ToString(server_trailing_metadata.find(meta1.first)->second));
1186 EXPECT_EQ(meta2.second,
1187 ToString(server_trailing_metadata.find(meta2.first)->second));
1188 EXPECT_EQ(2, server_trailing_metadata.size());
1189 }
1190
TEST_P(AsyncEnd2endTest,MetadataRpc)1191 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1192 ResetStub();
1193
1194 EchoRequest send_request;
1195 EchoRequest recv_request;
1196 EchoResponse send_response;
1197 EchoResponse recv_response;
1198 Status recv_status;
1199
1200 ClientContext cli_ctx;
1201 ServerContext srv_ctx;
1202 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1203
1204 send_request.set_message(GetParam().message_content);
1205 std::pair<std::string, std::string> meta1("key1", "val1");
1206 std::pair<std::string, std::string> meta2(
1207 "key2-bin",
1208 std::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1209 std::pair<std::string, std::string> meta3("key3", "val3");
1210 std::pair<std::string, std::string> meta6(
1211 "key4-bin",
1212 std::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1213 14));
1214 std::pair<std::string, std::string> meta5("key5", "val5");
1215 std::pair<std::string, std::string> meta4(
1216 "key6-bin",
1217 std::string(
1218 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1219
1220 cli_ctx.AddMetadata(meta1.first, meta1.second);
1221 cli_ctx.AddMetadata(meta2.first, meta2.second);
1222
1223 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1224 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1225 response_reader->ReadInitialMetadata(tag(4));
1226
1227 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1228 cq_.get(), tag(2));
1229 Verifier().Expect(2, true).Verify(cq_.get());
1230 EXPECT_EQ(send_request.message(), recv_request.message());
1231 const auto& client_initial_metadata = srv_ctx.client_metadata();
1232 EXPECT_EQ(meta1.second,
1233 ToString(client_initial_metadata.find(meta1.first)->second));
1234 EXPECT_EQ(meta2.second,
1235 ToString(client_initial_metadata.find(meta2.first)->second));
1236 EXPECT_GE(client_initial_metadata.size(), 2);
1237
1238 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1239 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1240 response_writer.SendInitialMetadata(tag(3));
1241 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1242 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1243 EXPECT_EQ(meta3.second,
1244 ToString(server_initial_metadata.find(meta3.first)->second));
1245 EXPECT_EQ(meta4.second,
1246 ToString(server_initial_metadata.find(meta4.first)->second));
1247 EXPECT_GE(server_initial_metadata.size(), 2);
1248
1249 send_response.set_message(recv_request.message());
1250 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1251 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1252 response_writer.Finish(send_response, Status::OK, tag(5));
1253 response_reader->Finish(&recv_response, &recv_status, tag(6));
1254
1255 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1256
1257 EXPECT_EQ(send_response.message(), recv_response.message());
1258 EXPECT_TRUE(recv_status.ok());
1259 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1260 EXPECT_EQ(meta5.second,
1261 ToString(server_trailing_metadata.find(meta5.first)->second));
1262 EXPECT_EQ(meta6.second,
1263 ToString(server_trailing_metadata.find(meta6.first)->second));
1264 EXPECT_GE(server_trailing_metadata.size(), 2);
1265 }
1266
1267 // Server uses AsyncNotifyWhenDone API to check for cancellation
TEST_P(AsyncEnd2endTest,ServerCheckCancellation)1268 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1269 ResetStub();
1270
1271 EchoRequest send_request;
1272 EchoRequest recv_request;
1273 EchoResponse send_response;
1274 EchoResponse recv_response;
1275 Status recv_status;
1276
1277 ClientContext cli_ctx;
1278 ServerContext srv_ctx;
1279 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1280
1281 send_request.set_message(GetParam().message_content);
1282 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1283 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1284 response_reader->Finish(&recv_response, &recv_status, tag(4));
1285
1286 srv_ctx.AsyncNotifyWhenDone(tag(5));
1287 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1288 cq_.get(), tag(2));
1289
1290 Verifier().Expect(2, true).Verify(cq_.get());
1291 EXPECT_EQ(send_request.message(), recv_request.message());
1292
1293 cli_ctx.TryCancel();
1294 Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1295 EXPECT_TRUE(srv_ctx.IsCancelled());
1296
1297 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1298 }
1299
1300 // Server uses AsyncNotifyWhenDone API to check for normal finish
TEST_P(AsyncEnd2endTest,ServerCheckDone)1301 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1302 ResetStub();
1303
1304 EchoRequest send_request;
1305 EchoRequest recv_request;
1306 EchoResponse send_response;
1307 EchoResponse recv_response;
1308 Status recv_status;
1309
1310 ClientContext cli_ctx;
1311 ServerContext srv_ctx;
1312 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1313
1314 send_request.set_message(GetParam().message_content);
1315 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1316 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1317 response_reader->Finish(&recv_response, &recv_status, tag(4));
1318
1319 srv_ctx.AsyncNotifyWhenDone(tag(5));
1320 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1321 cq_.get(), tag(2));
1322
1323 Verifier().Expect(2, true).Verify(cq_.get());
1324 EXPECT_EQ(send_request.message(), recv_request.message());
1325
1326 send_response.set_message(recv_request.message());
1327 response_writer.Finish(send_response, Status::OK, tag(3));
1328 Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1329 EXPECT_FALSE(srv_ctx.IsCancelled());
1330
1331 EXPECT_EQ(send_response.message(), recv_response.message());
1332 EXPECT_TRUE(recv_status.ok());
1333 }
1334
TEST_P(AsyncEnd2endTest,UnimplementedRpc)1335 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1336 ChannelArguments args;
1337 const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1338 GetParam().credentials_type, &args);
1339 std::shared_ptr<Channel> channel =
1340 !(GetParam().inproc) ? grpc::CreateCustomChannel(server_address_.str(),
1341 channel_creds, args)
1342 : server_->InProcessChannel(args);
1343 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1344 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1345 EchoRequest send_request;
1346 EchoResponse recv_response;
1347 Status recv_status;
1348
1349 ClientContext cli_ctx;
1350 send_request.set_message(GetParam().message_content);
1351 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1352 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
1353
1354 response_reader->Finish(&recv_response, &recv_status, tag(4));
1355 Verifier().Expect(4, true).Verify(cq_.get());
1356
1357 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
1358 EXPECT_EQ("", recv_status.error_message());
1359 }
1360
1361 // This class is for testing scenarios where RPCs are cancelled on the server
1362 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1363 // API to check for cancellation
1364 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1365 protected:
1366 typedef enum {
1367 DO_NOT_CANCEL = 0,
1368 CANCEL_BEFORE_PROCESSING,
1369 CANCEL_DURING_PROCESSING,
1370 CANCEL_AFTER_PROCESSING
1371 } ServerTryCancelRequestPhase;
1372
1373 // Helper for testing client-streaming RPCs which are cancelled on the server.
1374 // Depending on the value of server_try_cancel parameter, this will test one
1375 // of the following three scenarios:
1376 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1377 // any messages from the client
1378 //
1379 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1380 // messages from the client
1381 //
1382 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1383 // messages from the client (but before sending any status back to the
1384 // client)
TestClientStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1385 void TestClientStreamingServerCancel(
1386 ServerTryCancelRequestPhase server_try_cancel) {
1387 ResetStub();
1388
1389 EchoRequest recv_request;
1390 EchoResponse send_response;
1391 EchoResponse recv_response;
1392 Status recv_status;
1393
1394 ClientContext cli_ctx;
1395 ServerContext srv_ctx;
1396 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1397
1398 // Initiate the 'RequestStream' call on client
1399 CompletionQueue cli_cq;
1400
1401 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1402 stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1403
1404 // On the server, request to be notified of 'RequestStream' calls
1405 // and receive the 'RequestStream' call just made by the client
1406 srv_ctx.AsyncNotifyWhenDone(tag(11));
1407 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1408 tag(2));
1409 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1410 Verifier().Expect(2, true).Verify(cq_.get());
1411 t1.join();
1412
1413 bool expected_server_cq_result = true;
1414 bool expected_client_cq_result = true;
1415
1416 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1417 srv_ctx.TryCancel();
1418 Verifier().Expect(11, true).Verify(cq_.get());
1419 EXPECT_TRUE(srv_ctx.IsCancelled());
1420
1421 // Since cancellation is done before server reads any results, we know
1422 // for sure that all server cq results will return false from this
1423 // point forward
1424 expected_server_cq_result = false;
1425 expected_client_cq_result = false;
1426 }
1427
1428 bool ignore_client_cq_result =
1429 (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1430 (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1431
1432 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1433 &ignore_client_cq_result] {
1434 EchoRequest send_request;
1435 // Client sends 3 messages (tags 3, 4 and 5)
1436 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1437 send_request.set_message("Ping " + std::to_string(tag_idx));
1438 cli_stream->Write(send_request, tag(tag_idx));
1439 Verifier()
1440 .Expect(tag_idx, expected_client_cq_result)
1441 .Verify(&cli_cq, ignore_client_cq_result);
1442 }
1443 cli_stream->WritesDone(tag(6));
1444 // Ignore ok on WritesDone since cancel can affect it
1445 Verifier()
1446 .Expect(6, expected_client_cq_result)
1447 .Verify(&cli_cq, ignore_client_cq_result);
1448 });
1449
1450 bool ignore_cq_result = false;
1451 bool want_done_tag = false;
1452 std::thread* server_try_cancel_thd = nullptr;
1453
1454 auto verif = Verifier();
1455
1456 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1457 server_try_cancel_thd =
1458 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1459 // Server will cancel the RPC in a parallel thread while reading the
1460 // requests from the client. Since the cancellation can happen at anytime,
1461 // some of the cq results (i.e those until cancellation) might be true but
1462 // its non deterministic. So better to ignore the cq results
1463 ignore_cq_result = true;
1464 // Expect that we might possibly see the done tag that
1465 // indicates cancellation completion in this case
1466 want_done_tag = true;
1467 verif.Expect(11, true);
1468 }
1469
1470 // Server reads 3 messages (tags 6, 7 and 8)
1471 // But if want_done_tag is true, we might also see tag 11
1472 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1473 srv_stream.Read(&recv_request, tag(tag_idx));
1474 // Note that we'll add something to the verifier and verify that
1475 // something was seen, but it might be tag 11 and not what we
1476 // just added
1477 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1478 .Next(cq_.get(), ignore_cq_result);
1479 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1480 if (got_tag == 11) {
1481 EXPECT_TRUE(srv_ctx.IsCancelled());
1482 want_done_tag = false;
1483 // Now get the other entry that we were waiting on
1484 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1485 }
1486 }
1487
1488 cli_thread.join();
1489
1490 if (server_try_cancel_thd != nullptr) {
1491 server_try_cancel_thd->join();
1492 delete server_try_cancel_thd;
1493 }
1494
1495 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1496 srv_ctx.TryCancel();
1497 want_done_tag = true;
1498 verif.Expect(11, true);
1499 }
1500
1501 if (want_done_tag) {
1502 verif.Verify(cq_.get());
1503 EXPECT_TRUE(srv_ctx.IsCancelled());
1504 want_done_tag = false;
1505 }
1506
1507 // The RPC has been cancelled at this point for sure (i.e irrespective of
1508 // the value of `server_try_cancel` is). So, from this point forward, we
1509 // know that cq results are supposed to return false on server.
1510
1511 // Server sends the final message and cancelled status (but the RPC is
1512 // already cancelled at this point. So we expect the operation to fail)
1513 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1514 Verifier().Expect(9, false).Verify(cq_.get());
1515
1516 // Client will see the cancellation
1517 cli_stream->Finish(&recv_status, tag(10));
1518 Verifier().Expect(10, true).Verify(&cli_cq);
1519 EXPECT_FALSE(recv_status.ok());
1520 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1521
1522 cli_cq.Shutdown();
1523 void* phony_tag;
1524 bool phony_ok;
1525 while (cli_cq.Next(&phony_tag, &phony_ok)) {
1526 }
1527 }
1528
1529 // Helper for testing server-streaming RPCs which are cancelled on the server.
1530 // Depending on the value of server_try_cancel parameter, this will test one
1531 // of the following three scenarios:
1532 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1533 // any messages to the client
1534 //
1535 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1536 // messages to the client
1537 //
1538 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
1539 // messages to the client (but before sending any status back to the
1540 // client)
TestServerStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1541 void TestServerStreamingServerCancel(
1542 ServerTryCancelRequestPhase server_try_cancel) {
1543 ResetStub();
1544
1545 EchoRequest send_request;
1546 EchoRequest recv_request;
1547 EchoResponse send_response;
1548 Status recv_status;
1549 ClientContext cli_ctx;
1550 ServerContext srv_ctx;
1551 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1552
1553 send_request.set_message("Ping");
1554 // Initiate the 'ResponseStream' call on the client
1555 CompletionQueue cli_cq;
1556 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1557 stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1558 // On the server, request to be notified of 'ResponseStream' calls and
1559 // receive the call just made by the client
1560 srv_ctx.AsyncNotifyWhenDone(tag(11));
1561 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1562 cq_.get(), cq_.get(), tag(2));
1563
1564 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1565 Verifier().Expect(2, true).Verify(cq_.get());
1566 t1.join();
1567
1568 EXPECT_EQ(send_request.message(), recv_request.message());
1569
1570 bool expected_cq_result = true;
1571 bool ignore_cq_result = false;
1572 bool want_done_tag = false;
1573 bool expected_client_cq_result = true;
1574 bool ignore_client_cq_result =
1575 (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1576
1577 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1578 srv_ctx.TryCancel();
1579 Verifier().Expect(11, true).Verify(cq_.get());
1580 EXPECT_TRUE(srv_ctx.IsCancelled());
1581
1582 // We know for sure that all cq results will be false from this point
1583 // since the server cancelled the RPC
1584 expected_cq_result = false;
1585 expected_client_cq_result = false;
1586 }
1587
1588 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1589 &ignore_client_cq_result] {
1590 // Client attempts to read the three messages from the server
1591 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1592 EchoResponse recv_response;
1593 cli_stream->Read(&recv_response, tag(tag_idx));
1594 Verifier()
1595 .Expect(tag_idx, expected_client_cq_result)
1596 .Verify(&cli_cq, ignore_client_cq_result);
1597 }
1598 });
1599
1600 std::thread* server_try_cancel_thd = nullptr;
1601
1602 auto verif = Verifier();
1603
1604 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1605 server_try_cancel_thd =
1606 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1607
1608 // Server will cancel the RPC in a parallel thread while writing responses
1609 // to the client. Since the cancellation can happen at anytime, some of
1610 // the cq results (i.e those until cancellation) might be true but it is
1611 // non deterministic. So better to ignore the cq results
1612 ignore_cq_result = true;
1613 // Expect that we might possibly see the done tag that
1614 // indicates cancellation completion in this case
1615 want_done_tag = true;
1616 verif.Expect(11, true);
1617 }
1618
1619 // Server sends three messages (tags 3, 4 and 5)
1620 // But if want_done tag is true, we might also see tag 11
1621 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1622 send_response.set_message("Pong " + std::to_string(tag_idx));
1623 srv_stream.Write(send_response, tag(tag_idx));
1624 // Note that we'll add something to the verifier and verify that
1625 // something was seen, but it might be tag 11 and not what we
1626 // just added
1627 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1628 .Next(cq_.get(), ignore_cq_result);
1629 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1630 if (got_tag == 11) {
1631 EXPECT_TRUE(srv_ctx.IsCancelled());
1632 want_done_tag = false;
1633 // Now get the other entry that we were waiting on
1634 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1635 }
1636 }
1637
1638 if (server_try_cancel_thd != nullptr) {
1639 server_try_cancel_thd->join();
1640 delete server_try_cancel_thd;
1641 }
1642
1643 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1644 srv_ctx.TryCancel();
1645 want_done_tag = true;
1646 verif.Expect(11, true);
1647 }
1648
1649 if (want_done_tag) {
1650 verif.Verify(cq_.get());
1651 EXPECT_TRUE(srv_ctx.IsCancelled());
1652 want_done_tag = false;
1653 }
1654
1655 cli_thread.join();
1656
1657 // The RPC has been cancelled at this point for sure (i.e irrespective of
1658 // the value of `server_try_cancel` is). So, from this point forward, we
1659 // know that cq results are supposed to return false on server.
1660
1661 // Server finishes the stream (but the RPC is already cancelled)
1662 srv_stream.Finish(Status::CANCELLED, tag(9));
1663 Verifier().Expect(9, false).Verify(cq_.get());
1664
1665 // Client will see the cancellation
1666 cli_stream->Finish(&recv_status, tag(10));
1667 Verifier().Expect(10, true).Verify(&cli_cq);
1668 EXPECT_FALSE(recv_status.ok());
1669 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1670
1671 cli_cq.Shutdown();
1672 void* phony_tag;
1673 bool phony_ok;
1674 while (cli_cq.Next(&phony_tag, &phony_ok)) {
1675 }
1676 }
1677
1678 // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1679 // server.
1680 //
1681 // Depending on the value of server_try_cancel parameter, this will
1682 // test one of the following three scenarios:
1683 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1684 // writing any messages from/to the client
1685 //
1686 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1687 // messages from the client
1688 //
1689 // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1690 // messages from the client (but before sending any status back to the
1691 // client)
TestBidiStreamingServerCancel(ServerTryCancelRequestPhase server_try_cancel)1692 void TestBidiStreamingServerCancel(
1693 ServerTryCancelRequestPhase server_try_cancel) {
1694 ResetStub();
1695
1696 EchoRequest send_request;
1697 EchoRequest recv_request;
1698 EchoResponse send_response;
1699 EchoResponse recv_response;
1700 Status recv_status;
1701 ClientContext cli_ctx;
1702 ServerContext srv_ctx;
1703 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1704
1705 // Initiate the call from the client side
1706 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1707 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1708
1709 // On the server, request to be notified of the 'BidiStream' call and
1710 // receive the call just made by the client
1711 srv_ctx.AsyncNotifyWhenDone(tag(11));
1712 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1713 tag(2));
1714 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1715
1716 auto verif = Verifier();
1717
1718 // Client sends the first and the only message
1719 send_request.set_message("Ping");
1720 cli_stream->Write(send_request, tag(3));
1721 verif.Expect(3, true);
1722
1723 bool expected_cq_result = true;
1724 bool ignore_cq_result = false;
1725 bool want_done_tag = false;
1726
1727 int got_tag, got_tag2;
1728 bool tag_3_done = false;
1729
1730 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1731 srv_ctx.TryCancel();
1732 verif.Expect(11, true);
1733 // We know for sure that all server cq results will be false from
1734 // this point since the server cancelled the RPC. However, we can't
1735 // say for sure about the client
1736 expected_cq_result = false;
1737 ignore_cq_result = true;
1738
1739 do {
1740 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1741 GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1742 if (got_tag == 3) {
1743 tag_3_done = true;
1744 }
1745 } while (got_tag != 11);
1746 EXPECT_TRUE(srv_ctx.IsCancelled());
1747 }
1748
1749 std::thread* server_try_cancel_thd = nullptr;
1750
1751 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1752 server_try_cancel_thd =
1753 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1754
1755 // Since server is going to cancel the RPC in a parallel thread, some of
1756 // the cq results (i.e those until the cancellation) might be true. Since
1757 // that number is non-deterministic, it is better to ignore the cq results
1758 ignore_cq_result = true;
1759 // Expect that we might possibly see the done tag that
1760 // indicates cancellation completion in this case
1761 want_done_tag = true;
1762 verif.Expect(11, true);
1763 }
1764
1765 srv_stream.Read(&recv_request, tag(4));
1766 verif.Expect(4, expected_cq_result);
1767 got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1768 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1769 GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1770 (got_tag == 11 && want_done_tag));
1771 GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1772 (got_tag2 == 11 && want_done_tag));
1773 // If we get 3 and 4, we don't need to wait for 11, but if
1774 // we get 11, we should also clear 3 and 4
1775 if (got_tag + got_tag2 != 7) {
1776 EXPECT_TRUE(srv_ctx.IsCancelled());
1777 want_done_tag = false;
1778 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1779 GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1780 }
1781
1782 send_response.set_message("Pong");
1783 srv_stream.Write(send_response, tag(5));
1784 verif.Expect(5, expected_cq_result);
1785
1786 cli_stream->Read(&recv_response, tag(6));
1787 verif.Expect(6, expected_cq_result);
1788 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1789 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1790 GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1791 (got_tag == 11 && want_done_tag));
1792 GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1793 (got_tag2 == 11 && want_done_tag));
1794 // If we get 5 and 6, we don't need to wait for 11, but if
1795 // we get 11, we should also clear 5 and 6
1796 if (got_tag + got_tag2 != 11) {
1797 EXPECT_TRUE(srv_ctx.IsCancelled());
1798 want_done_tag = false;
1799 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1800 GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1801 }
1802
1803 // This is expected to succeed in all cases
1804 cli_stream->WritesDone(tag(7));
1805 verif.Expect(7, true);
1806 // TODO(vjpai): Consider whether the following is too flexible
1807 // or whether it should just be reset to ignore_cq_result
1808 bool ignore_cq_wd_result =
1809 ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1810 got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1811 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1812 if (got_tag == 11) {
1813 EXPECT_TRUE(srv_ctx.IsCancelled());
1814 want_done_tag = false;
1815 // Now get the other entry that we were waiting on
1816 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1817 }
1818
1819 // This is expected to fail in all cases i.e for all values of
1820 // server_try_cancel. This is because at this point, either there are no
1821 // more msgs from the client (because client called WritesDone) or the RPC
1822 // is cancelled on the server
1823 srv_stream.Read(&recv_request, tag(8));
1824 verif.Expect(8, false);
1825 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1826 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1827 if (got_tag == 11) {
1828 EXPECT_TRUE(srv_ctx.IsCancelled());
1829 want_done_tag = false;
1830 // Now get the other entry that we were waiting on
1831 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1832 }
1833
1834 if (server_try_cancel_thd != nullptr) {
1835 server_try_cancel_thd->join();
1836 delete server_try_cancel_thd;
1837 }
1838
1839 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1840 srv_ctx.TryCancel();
1841 want_done_tag = true;
1842 verif.Expect(11, true);
1843 }
1844
1845 if (want_done_tag) {
1846 verif.Verify(cq_.get());
1847 EXPECT_TRUE(srv_ctx.IsCancelled());
1848 want_done_tag = false;
1849 }
1850
1851 // The RPC has been cancelled at this point for sure (i.e irrespective of
1852 // the value of `server_try_cancel` is). So, from this point forward, we
1853 // know that cq results are supposed to return false on server.
1854
1855 srv_stream.Finish(Status::CANCELLED, tag(9));
1856 Verifier().Expect(9, false).Verify(cq_.get());
1857
1858 cli_stream->Finish(&recv_status, tag(10));
1859 Verifier().Expect(10, true).Verify(cq_.get());
1860 EXPECT_FALSE(recv_status.ok());
1861 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1862 }
1863 };
1864
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelBefore)1865 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1866 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1867 }
1868
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelDuring)1869 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1870 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1871 }
1872
TEST_P(AsyncEnd2endServerTryCancelTest,ClientStreamingServerTryCancelAfter)1873 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1874 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1875 }
1876
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelBefore)1877 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1878 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1879 }
1880
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelDuring)1881 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1882 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1883 }
1884
TEST_P(AsyncEnd2endServerTryCancelTest,ServerStreamingServerTryCancelAfter)1885 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1886 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1887 }
1888
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelBefore)1889 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1890 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1891 }
1892
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelDuring)1893 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1894 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1895 }
1896
TEST_P(AsyncEnd2endServerTryCancelTest,ServerBidiStreamingTryCancelAfter)1897 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1898 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1899 }
1900
CreateTestScenarios(bool,bool test_message_size_limit)1901 std::vector<TestScenario> CreateTestScenarios(bool /*test_secure*/,
1902 bool test_message_size_limit) {
1903 std::vector<TestScenario> scenarios;
1904 std::vector<std::string> credentials_types;
1905 std::vector<std::string> messages;
1906
1907 auto insec_ok = [] {
1908 // Only allow insecure credentials type when it is registered with the
1909 // provider. User may create providers that do not have insecure.
1910 return GetCredentialsProvider()->GetChannelCredentials(
1911 kInsecureCredentialsType, nullptr) != nullptr;
1912 };
1913
1914 if (insec_ok()) {
1915 credentials_types.push_back(kInsecureCredentialsType);
1916 }
1917 auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1918 for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1919 credentials_types.push_back(*sec);
1920 }
1921 GPR_ASSERT(!credentials_types.empty());
1922
1923 messages.push_back("Hello");
1924 if (test_message_size_limit) {
1925 for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1926 k *= 32) {
1927 std::string big_msg;
1928 for (size_t i = 0; i < k * 1024; ++i) {
1929 char c = 'a' + (i % 26);
1930 big_msg += c;
1931 }
1932 messages.push_back(big_msg);
1933 }
1934 if (!BuiltUnderMsan()) {
1935 // 4MB message processing with SSL is very slow under msan
1936 // (causes timeouts) and doesn't really increase the signal from tests.
1937 // Reserve 100 bytes for other fields of the message proto.
1938 messages.push_back(
1939 std::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 100, 'a'));
1940 }
1941 }
1942
1943 // TODO (sreek) Renable tests with health check service after the issue
1944 // https://github.com/grpc/grpc/issues/11223 is resolved
1945 for (auto health_check_service : {false}) {
1946 for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1947 for (auto cred = credentials_types.begin();
1948 cred != credentials_types.end(); ++cred) {
1949 scenarios.emplace_back(false, *cred, health_check_service, *msg);
1950 }
1951 if (insec_ok()) {
1952 scenarios.emplace_back(true, kInsecureCredentialsType,
1953 health_check_service, *msg);
1954 }
1955 }
1956 }
1957 return scenarios;
1958 }
1959
1960 INSTANTIATE_TEST_SUITE_P(AsyncEnd2end, AsyncEnd2endTest,
1961 ::testing::ValuesIn(CreateTestScenarios(true, true)));
1962 INSTANTIATE_TEST_SUITE_P(AsyncEnd2endServerTryCancel,
1963 AsyncEnd2endServerTryCancelTest,
1964 ::testing::ValuesIn(CreateTestScenarios(false,
1965 false)));
1966
1967 } // namespace
1968 } // namespace testing
1969 } // namespace grpc
1970
main(int argc,char ** argv)1971 int main(int argc, char** argv) {
1972 // Change the backup poll interval from 5s to 100ms to speed up the
1973 // ReconnectChannel test
1974 grpc_core::ConfigVars::Overrides overrides;
1975 overrides.client_channel_backup_poll_interval_ms = 100;
1976 grpc_core::ConfigVars::SetOverrides(overrides);
1977 grpc::testing::TestEnvironment env(&argc, argv);
1978 ::testing::InitGoogleTest(&argc, argv);
1979 int ret = RUN_ALL_TESTS();
1980 return ret;
1981 }
1982