xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/server_async.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include <algorithm>
20*cc02d7e2SAndroid Build Coastguard Worker #include <forward_list>
21*cc02d7e2SAndroid Build Coastguard Worker #include <functional>
22*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
23*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker 
26*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
27*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
28*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
29*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/generic/async_generic_service.h>
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/resource_quota.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/security/server_credentials.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_builder.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_context.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/config.h>
36*cc02d7e2SAndroid Build Coastguard Worker 
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/host_port.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/surface/completion_queue.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
41*cc02d7e2SAndroid Build Coastguard Worker #include "test/core/util/test_config.h"
42*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_server_builder.h"
43*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/server.h"
44*cc02d7e2SAndroid Build Coastguard Worker 
45*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
46*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
47*cc02d7e2SAndroid Build Coastguard Worker 
48*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType, class ResponseType, class ServiceType,
49*cc02d7e2SAndroid Build Coastguard Worker           class ServerContextType>
50*cc02d7e2SAndroid Build Coastguard Worker class AsyncQpsServerTest final : public grpc::testing::Server {
51*cc02d7e2SAndroid Build Coastguard Worker  public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)52*cc02d7e2SAndroid Build Coastguard Worker   AsyncQpsServerTest(
53*cc02d7e2SAndroid Build Coastguard Worker       const ServerConfig& config,
54*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServerBuilder*, ServiceType*)> register_service,
55*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServiceType*, ServerContextType*, RequestType*,
56*cc02d7e2SAndroid Build Coastguard Worker                          ServerAsyncResponseWriter<ResponseType>*,
57*cc02d7e2SAndroid Build Coastguard Worker                          CompletionQueue*, ServerCompletionQueue*, void*)>
58*cc02d7e2SAndroid Build Coastguard Worker           request_unary_function,
59*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServiceType*, ServerContextType*,
60*cc02d7e2SAndroid Build Coastguard Worker                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
61*cc02d7e2SAndroid Build Coastguard Worker                          CompletionQueue*, ServerCompletionQueue*, void*)>
62*cc02d7e2SAndroid Build Coastguard Worker           request_streaming_function,
63*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServiceType*, ServerContextType*,
64*cc02d7e2SAndroid Build Coastguard Worker                          ServerAsyncReader<ResponseType, RequestType>*,
65*cc02d7e2SAndroid Build Coastguard Worker                          CompletionQueue*, ServerCompletionQueue*, void*)>
66*cc02d7e2SAndroid Build Coastguard Worker           request_streaming_from_client_function,
67*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServiceType*, ServerContextType*, RequestType*,
68*cc02d7e2SAndroid Build Coastguard Worker                          ServerAsyncWriter<ResponseType>*, CompletionQueue*,
69*cc02d7e2SAndroid Build Coastguard Worker                          ServerCompletionQueue*, void*)>
70*cc02d7e2SAndroid Build Coastguard Worker           request_streaming_from_server_function,
71*cc02d7e2SAndroid Build Coastguard Worker       std::function<void(ServiceType*, ServerContextType*,
72*cc02d7e2SAndroid Build Coastguard Worker                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
73*cc02d7e2SAndroid Build Coastguard Worker                          CompletionQueue*, ServerCompletionQueue*, void*)>
74*cc02d7e2SAndroid Build Coastguard Worker           request_streaming_both_ways_function,
75*cc02d7e2SAndroid Build Coastguard Worker       std::function<grpc::Status(const PayloadConfig&, RequestType*,
76*cc02d7e2SAndroid Build Coastguard Worker                                  ResponseType*)>
77*cc02d7e2SAndroid Build Coastguard Worker           process_rpc)
78*cc02d7e2SAndroid Build Coastguard Worker       : Server(config) {
79*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
80*cc02d7e2SAndroid Build Coastguard Worker 
81*cc02d7e2SAndroid Build Coastguard Worker     auto port_num = port();
82*cc02d7e2SAndroid Build Coastguard Worker     // Negative port number means inproc server, so no listen port needed
83*cc02d7e2SAndroid Build Coastguard Worker     if (port_num >= 0) {
84*cc02d7e2SAndroid Build Coastguard Worker       std::string server_address = grpc_core::JoinHostPort("::", port_num);
85*cc02d7e2SAndroid Build Coastguard Worker       builder->AddListeningPort(
86*cc02d7e2SAndroid Build Coastguard Worker           server_address, Server::CreateServerCredentials(config), &port_num);
87*cc02d7e2SAndroid Build Coastguard Worker     }
88*cc02d7e2SAndroid Build Coastguard Worker 
89*cc02d7e2SAndroid Build Coastguard Worker     register_service(builder.get(), &async_service_);
90*cc02d7e2SAndroid Build Coastguard Worker 
91*cc02d7e2SAndroid Build Coastguard Worker     int num_threads = config.async_server_threads();
92*cc02d7e2SAndroid Build Coastguard Worker     if (num_threads <= 0) {  // dynamic sizing
93*cc02d7e2SAndroid Build Coastguard Worker       num_threads = std::min(64, cores());
94*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO,
95*cc02d7e2SAndroid Build Coastguard Worker               "Sizing async server to %d threads. Defaults to number of cores "
96*cc02d7e2SAndroid Build Coastguard Worker               "in machine or 64 threads if machine has more than 64 cores to "
97*cc02d7e2SAndroid Build Coastguard Worker               "avoid OOMs.",
98*cc02d7e2SAndroid Build Coastguard Worker               num_threads);
99*cc02d7e2SAndroid Build Coastguard Worker     }
100*cc02d7e2SAndroid Build Coastguard Worker 
101*cc02d7e2SAndroid Build Coastguard Worker     int tpc = std::max(1, config.threads_per_cq());  // 1 if unspecified
102*cc02d7e2SAndroid Build Coastguard Worker     int num_cqs = (num_threads + tpc - 1) / tpc;     // ceiling operator
103*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < num_cqs; i++) {
104*cc02d7e2SAndroid Build Coastguard Worker       srv_cqs_.emplace_back(builder->AddCompletionQueue());
105*cc02d7e2SAndroid Build Coastguard Worker     }
106*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < num_threads; i++) {
107*cc02d7e2SAndroid Build Coastguard Worker       cq_.emplace_back(i % srv_cqs_.size());
108*cc02d7e2SAndroid Build Coastguard Worker     }
109*cc02d7e2SAndroid Build Coastguard Worker 
110*cc02d7e2SAndroid Build Coastguard Worker     ApplyConfigToBuilder(config, builder.get());
111*cc02d7e2SAndroid Build Coastguard Worker 
112*cc02d7e2SAndroid Build Coastguard Worker     server_ = builder->BuildAndStart();
113*cc02d7e2SAndroid Build Coastguard Worker     if (server_ == nullptr) {
114*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
115*cc02d7e2SAndroid Build Coastguard Worker     } else {
116*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
117*cc02d7e2SAndroid Build Coastguard Worker     }
118*cc02d7e2SAndroid Build Coastguard Worker 
119*cc02d7e2SAndroid Build Coastguard Worker     auto process_rpc_bound =
120*cc02d7e2SAndroid Build Coastguard Worker         std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
121*cc02d7e2SAndroid Build Coastguard Worker                   std::placeholders::_2);
122*cc02d7e2SAndroid Build Coastguard Worker 
123*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < 5000; i++) {
124*cc02d7e2SAndroid Build Coastguard Worker       for (int j = 0; j < num_cqs; j++) {
125*cc02d7e2SAndroid Build Coastguard Worker         if (request_unary_function) {
126*cc02d7e2SAndroid Build Coastguard Worker           auto request_unary = std::bind(
127*cc02d7e2SAndroid Build Coastguard Worker               request_unary_function, &async_service_, std::placeholders::_1,
128*cc02d7e2SAndroid Build Coastguard Worker               std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
129*cc02d7e2SAndroid Build Coastguard Worker               srv_cqs_[j].get(), std::placeholders::_4);
130*cc02d7e2SAndroid Build Coastguard Worker           contexts_.emplace_back(
131*cc02d7e2SAndroid Build Coastguard Worker               new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
132*cc02d7e2SAndroid Build Coastguard Worker         }
133*cc02d7e2SAndroid Build Coastguard Worker         if (request_streaming_function) {
134*cc02d7e2SAndroid Build Coastguard Worker           auto request_streaming = std::bind(
135*cc02d7e2SAndroid Build Coastguard Worker               request_streaming_function, &async_service_,
136*cc02d7e2SAndroid Build Coastguard Worker               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
137*cc02d7e2SAndroid Build Coastguard Worker               srv_cqs_[j].get(), std::placeholders::_3);
138*cc02d7e2SAndroid Build Coastguard Worker           contexts_.emplace_back(new ServerRpcContextStreamingImpl(
139*cc02d7e2SAndroid Build Coastguard Worker               request_streaming, process_rpc_bound));
140*cc02d7e2SAndroid Build Coastguard Worker         }
141*cc02d7e2SAndroid Build Coastguard Worker         if (request_streaming_from_client_function) {
142*cc02d7e2SAndroid Build Coastguard Worker           auto request_streaming_from_client = std::bind(
143*cc02d7e2SAndroid Build Coastguard Worker               request_streaming_from_client_function, &async_service_,
144*cc02d7e2SAndroid Build Coastguard Worker               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
145*cc02d7e2SAndroid Build Coastguard Worker               srv_cqs_[j].get(), std::placeholders::_3);
146*cc02d7e2SAndroid Build Coastguard Worker           contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
147*cc02d7e2SAndroid Build Coastguard Worker               request_streaming_from_client, process_rpc_bound));
148*cc02d7e2SAndroid Build Coastguard Worker         }
149*cc02d7e2SAndroid Build Coastguard Worker         if (request_streaming_from_server_function) {
150*cc02d7e2SAndroid Build Coastguard Worker           auto request_streaming_from_server =
151*cc02d7e2SAndroid Build Coastguard Worker               std::bind(request_streaming_from_server_function, &async_service_,
152*cc02d7e2SAndroid Build Coastguard Worker                         std::placeholders::_1, std::placeholders::_2,
153*cc02d7e2SAndroid Build Coastguard Worker                         std::placeholders::_3, srv_cqs_[j].get(),
154*cc02d7e2SAndroid Build Coastguard Worker                         srv_cqs_[j].get(), std::placeholders::_4);
155*cc02d7e2SAndroid Build Coastguard Worker           contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
156*cc02d7e2SAndroid Build Coastguard Worker               request_streaming_from_server, process_rpc_bound));
157*cc02d7e2SAndroid Build Coastguard Worker         }
158*cc02d7e2SAndroid Build Coastguard Worker         if (request_streaming_both_ways_function) {
159*cc02d7e2SAndroid Build Coastguard Worker           // TODO(vjpai): Add this code
160*cc02d7e2SAndroid Build Coastguard Worker         }
161*cc02d7e2SAndroid Build Coastguard Worker       }
162*cc02d7e2SAndroid Build Coastguard Worker     }
163*cc02d7e2SAndroid Build Coastguard Worker 
164*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < num_threads; i++) {
165*cc02d7e2SAndroid Build Coastguard Worker       shutdown_state_.emplace_back(new PerThreadShutdownState());
166*cc02d7e2SAndroid Build Coastguard Worker       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
167*cc02d7e2SAndroid Build Coastguard Worker     }
168*cc02d7e2SAndroid Build Coastguard Worker   }
// Stops the polling threads and tears the server down.
//
// Order: flag every thread as shutting down, shut the server down, shut the
// completion queues down, join the threads, then drain whatever events are
// still queued.
~AsyncQpsServerTest() override {
  // Each flag is set under its own mutex; ThreadFunc reads it under the same
  // mutex before running a context.
  for (auto& state : shutdown_state_) {
    std::lock_guard<std::mutex> lock(state->mutex);
    state->shutdown = true;
  }
  // server_ is null when BuildAndStart() failed in the constructor (which
  // only logs that case), so it must not be dereferenced unconditionally.
  if (server_ != nullptr) {
    // TODO(vjpai): Remove the following deadline and allow full proper
    // shutdown.
    server_->Shutdown(std::chrono::system_clock::now() +
                      std::chrono::seconds(3));
  }
  for (auto& cq : srv_cqs_) {
    cq->Shutdown();
  }
  for (auto& thr : threads_) {
    thr.join();
  }
  // Discard remaining events until Next() reports each queue as finished.
  for (auto& cq : srv_cqs_) {
    bool ok;
    void* got_tag;
    while (cq->Next(&got_tag, &ok)) {
    }
  }
}
191*cc02d7e2SAndroid Build Coastguard Worker 
GetPollCount()192*cc02d7e2SAndroid Build Coastguard Worker   int GetPollCount() override {
193*cc02d7e2SAndroid Build Coastguard Worker     int count = 0;
194*cc02d7e2SAndroid Build Coastguard Worker     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
195*cc02d7e2SAndroid Build Coastguard Worker       count += grpc_get_cq_poll_num((*cq)->cq());
196*cc02d7e2SAndroid Build Coastguard Worker     }
197*cc02d7e2SAndroid Build Coastguard Worker     return count;
198*cc02d7e2SAndroid Build Coastguard Worker   }
199*cc02d7e2SAndroid Build Coastguard Worker 
InProcessChannel(const ChannelArguments & args)200*cc02d7e2SAndroid Build Coastguard Worker   std::shared_ptr<Channel> InProcessChannel(
201*cc02d7e2SAndroid Build Coastguard Worker       const ChannelArguments& args) override {
202*cc02d7e2SAndroid Build Coastguard Worker     return server_->InProcessChannel(args);
203*cc02d7e2SAndroid Build Coastguard Worker   }
204*cc02d7e2SAndroid Build Coastguard Worker 
205*cc02d7e2SAndroid Build Coastguard Worker  private:
ThreadFunc(int thread_idx)206*cc02d7e2SAndroid Build Coastguard Worker   void ThreadFunc(int thread_idx) {
207*cc02d7e2SAndroid Build Coastguard Worker     // Wait until work is available or we are shutting down
208*cc02d7e2SAndroid Build Coastguard Worker     bool ok;
209*cc02d7e2SAndroid Build Coastguard Worker     void* got_tag;
210*cc02d7e2SAndroid Build Coastguard Worker     if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
211*cc02d7e2SAndroid Build Coastguard Worker       return;
212*cc02d7e2SAndroid Build Coastguard Worker     }
213*cc02d7e2SAndroid Build Coastguard Worker     ServerRpcContext* ctx;
214*cc02d7e2SAndroid Build Coastguard Worker     std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
215*cc02d7e2SAndroid Build Coastguard Worker     do {
216*cc02d7e2SAndroid Build Coastguard Worker       ctx = detag(got_tag);
217*cc02d7e2SAndroid Build Coastguard Worker       // The tag is a pointer to an RPC context to invoke
218*cc02d7e2SAndroid Build Coastguard Worker       // Proceed while holding a lock to make sure that
219*cc02d7e2SAndroid Build Coastguard Worker       // this thread isn't supposed to shut down
220*cc02d7e2SAndroid Build Coastguard Worker       mu_ptr->lock();
221*cc02d7e2SAndroid Build Coastguard Worker       if (shutdown_state_[thread_idx]->shutdown) {
222*cc02d7e2SAndroid Build Coastguard Worker         mu_ptr->unlock();
223*cc02d7e2SAndroid Build Coastguard Worker         return;
224*cc02d7e2SAndroid Build Coastguard Worker       }
225*cc02d7e2SAndroid Build Coastguard Worker     } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
226*cc02d7e2SAndroid Build Coastguard Worker         [&, ctx, ok, mu_ptr]() {
227*cc02d7e2SAndroid Build Coastguard Worker           ctx->lock();
228*cc02d7e2SAndroid Build Coastguard Worker           if (!ctx->RunNextState(ok)) {
229*cc02d7e2SAndroid Build Coastguard Worker             ctx->Reset();
230*cc02d7e2SAndroid Build Coastguard Worker           }
231*cc02d7e2SAndroid Build Coastguard Worker           ctx->unlock();
232*cc02d7e2SAndroid Build Coastguard Worker           mu_ptr->unlock();
233*cc02d7e2SAndroid Build Coastguard Worker         },
234*cc02d7e2SAndroid Build Coastguard Worker         &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
235*cc02d7e2SAndroid Build Coastguard Worker   }
236*cc02d7e2SAndroid Build Coastguard Worker 
// Abstract base for one server-side RPC state machine. A pointer to the
// context is used as the completion-queue tag (see tag()/detag()).
//
// lock()/unlock() expose the context's own mutex; ThreadFunc holds it while
// calling RunNextState() and Reset().
class ServerRpcContext {
 public:
  ServerRpcContext() = default;
  virtual ~ServerRpcContext() = default;

  void lock() { mu_.lock(); }
  void unlock() { mu_.unlock(); }

  virtual bool RunNextState(bool) = 0;  // next state, return false if done
  virtual void Reset() = 0;             // start this back at a clean state

 private:
  std::mutex mu_;
};
tag(ServerRpcContext * func)248*cc02d7e2SAndroid Build Coastguard Worker   static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)249*cc02d7e2SAndroid Build Coastguard Worker   static ServerRpcContext* detag(void* tag) {
250*cc02d7e2SAndroid Build Coastguard Worker     return static_cast<ServerRpcContext*>(tag);
251*cc02d7e2SAndroid Build Coastguard Worker   }
252*cc02d7e2SAndroid Build Coastguard Worker 
253*cc02d7e2SAndroid Build Coastguard Worker   class ServerRpcContextUnaryImpl final : public ServerRpcContext {
254*cc02d7e2SAndroid Build Coastguard Worker    public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)255*cc02d7e2SAndroid Build Coastguard Worker     ServerRpcContextUnaryImpl(
256*cc02d7e2SAndroid Build Coastguard Worker         std::function<void(ServerContextType*, RequestType*,
257*cc02d7e2SAndroid Build Coastguard Worker                            grpc::ServerAsyncResponseWriter<ResponseType>*,
258*cc02d7e2SAndroid Build Coastguard Worker                            void*)>
259*cc02d7e2SAndroid Build Coastguard Worker             request_method,
260*cc02d7e2SAndroid Build Coastguard Worker         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
261*cc02d7e2SAndroid Build Coastguard Worker         : srv_ctx_(new ServerContextType),
262*cc02d7e2SAndroid Build Coastguard Worker           next_state_(&ServerRpcContextUnaryImpl::invoker),
263*cc02d7e2SAndroid Build Coastguard Worker           request_method_(request_method),
264*cc02d7e2SAndroid Build Coastguard Worker           invoke_method_(invoke_method),
265*cc02d7e2SAndroid Build Coastguard Worker           response_writer_(srv_ctx_.get()) {
266*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &req_, &response_writer_,
267*cc02d7e2SAndroid Build Coastguard Worker                       AsyncQpsServerTest::tag(this));
268*cc02d7e2SAndroid Build Coastguard Worker     }
~ServerRpcContextUnaryImpl()269*cc02d7e2SAndroid Build Coastguard Worker     ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)270*cc02d7e2SAndroid Build Coastguard Worker     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()271*cc02d7e2SAndroid Build Coastguard Worker     void Reset() override {
272*cc02d7e2SAndroid Build Coastguard Worker       srv_ctx_.reset(new ServerContextType);
273*cc02d7e2SAndroid Build Coastguard Worker       req_ = RequestType();
274*cc02d7e2SAndroid Build Coastguard Worker       response_writer_ =
275*cc02d7e2SAndroid Build Coastguard Worker           grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
276*cc02d7e2SAndroid Build Coastguard Worker 
277*cc02d7e2SAndroid Build Coastguard Worker       // Then request the method
278*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextUnaryImpl::invoker;
279*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &req_, &response_writer_,
280*cc02d7e2SAndroid Build Coastguard Worker                       AsyncQpsServerTest::tag(this));
281*cc02d7e2SAndroid Build Coastguard Worker     }
282*cc02d7e2SAndroid Build Coastguard Worker 
283*cc02d7e2SAndroid Build Coastguard Worker    private:
finisher(bool)284*cc02d7e2SAndroid Build Coastguard Worker     bool finisher(bool) { return false; }
invoker(bool ok)285*cc02d7e2SAndroid Build Coastguard Worker     bool invoker(bool ok) {
286*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
287*cc02d7e2SAndroid Build Coastguard Worker         return false;
288*cc02d7e2SAndroid Build Coastguard Worker       }
289*cc02d7e2SAndroid Build Coastguard Worker 
290*cc02d7e2SAndroid Build Coastguard Worker       // Call the RPC processing function
291*cc02d7e2SAndroid Build Coastguard Worker       grpc::Status status = invoke_method_(&req_, &response_);
292*cc02d7e2SAndroid Build Coastguard Worker 
293*cc02d7e2SAndroid Build Coastguard Worker       // Have the response writer work and invoke on_finish when done
294*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextUnaryImpl::finisher;
295*cc02d7e2SAndroid Build Coastguard Worker       response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
296*cc02d7e2SAndroid Build Coastguard Worker       return true;
297*cc02d7e2SAndroid Build Coastguard Worker     }
298*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ServerContextType> srv_ctx_;
299*cc02d7e2SAndroid Build Coastguard Worker     RequestType req_;
300*cc02d7e2SAndroid Build Coastguard Worker     ResponseType response_;
301*cc02d7e2SAndroid Build Coastguard Worker     bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
302*cc02d7e2SAndroid Build Coastguard Worker     std::function<void(ServerContextType*, RequestType*,
303*cc02d7e2SAndroid Build Coastguard Worker                        grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
304*cc02d7e2SAndroid Build Coastguard Worker         request_method_;
305*cc02d7e2SAndroid Build Coastguard Worker     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
306*cc02d7e2SAndroid Build Coastguard Worker     grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
307*cc02d7e2SAndroid Build Coastguard Worker   };
308*cc02d7e2SAndroid Build Coastguard Worker 
309*cc02d7e2SAndroid Build Coastguard Worker   class ServerRpcContextStreamingImpl final : public ServerRpcContext {
310*cc02d7e2SAndroid Build Coastguard Worker    public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)311*cc02d7e2SAndroid Build Coastguard Worker     ServerRpcContextStreamingImpl(
312*cc02d7e2SAndroid Build Coastguard Worker         std::function<void(
313*cc02d7e2SAndroid Build Coastguard Worker             ServerContextType*,
314*cc02d7e2SAndroid Build Coastguard Worker             grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
315*cc02d7e2SAndroid Build Coastguard Worker             request_method,
316*cc02d7e2SAndroid Build Coastguard Worker         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
317*cc02d7e2SAndroid Build Coastguard Worker         : srv_ctx_(new ServerContextType),
318*cc02d7e2SAndroid Build Coastguard Worker           next_state_(&ServerRpcContextStreamingImpl::request_done),
319*cc02d7e2SAndroid Build Coastguard Worker           request_method_(request_method),
320*cc02d7e2SAndroid Build Coastguard Worker           invoke_method_(invoke_method),
321*cc02d7e2SAndroid Build Coastguard Worker           stream_(srv_ctx_.get()) {
322*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
323*cc02d7e2SAndroid Build Coastguard Worker     }
~ServerRpcContextStreamingImpl()324*cc02d7e2SAndroid Build Coastguard Worker     ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)325*cc02d7e2SAndroid Build Coastguard Worker     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()326*cc02d7e2SAndroid Build Coastguard Worker     void Reset() override {
327*cc02d7e2SAndroid Build Coastguard Worker       srv_ctx_.reset(new ServerContextType);
328*cc02d7e2SAndroid Build Coastguard Worker       req_ = RequestType();
329*cc02d7e2SAndroid Build Coastguard Worker       stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
330*cc02d7e2SAndroid Build Coastguard Worker           srv_ctx_.get());
331*cc02d7e2SAndroid Build Coastguard Worker 
332*cc02d7e2SAndroid Build Coastguard Worker       // Then request the method
333*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingImpl::request_done;
334*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
335*cc02d7e2SAndroid Build Coastguard Worker     }
336*cc02d7e2SAndroid Build Coastguard Worker 
337*cc02d7e2SAndroid Build Coastguard Worker    private:
request_done(bool ok)338*cc02d7e2SAndroid Build Coastguard Worker     bool request_done(bool ok) {
339*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
340*cc02d7e2SAndroid Build Coastguard Worker         return false;
341*cc02d7e2SAndroid Build Coastguard Worker       }
342*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingImpl::read_done;
343*cc02d7e2SAndroid Build Coastguard Worker       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
344*cc02d7e2SAndroid Build Coastguard Worker       return true;
345*cc02d7e2SAndroid Build Coastguard Worker     }
346*cc02d7e2SAndroid Build Coastguard Worker 
read_done(bool ok)347*cc02d7e2SAndroid Build Coastguard Worker     bool read_done(bool ok) {
348*cc02d7e2SAndroid Build Coastguard Worker       if (ok) {
349*cc02d7e2SAndroid Build Coastguard Worker         // invoke the method
350*cc02d7e2SAndroid Build Coastguard Worker         // Call the RPC processing function
351*cc02d7e2SAndroid Build Coastguard Worker         grpc::Status status = invoke_method_(&req_, &response_);
352*cc02d7e2SAndroid Build Coastguard Worker         // initiate the write
353*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingImpl::write_done;
354*cc02d7e2SAndroid Build Coastguard Worker         stream_.Write(response_, AsyncQpsServerTest::tag(this));
355*cc02d7e2SAndroid Build Coastguard Worker       } else {  // client has sent writes done
356*cc02d7e2SAndroid Build Coastguard Worker         // finish the stream
357*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
358*cc02d7e2SAndroid Build Coastguard Worker         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
359*cc02d7e2SAndroid Build Coastguard Worker       }
360*cc02d7e2SAndroid Build Coastguard Worker       return true;
361*cc02d7e2SAndroid Build Coastguard Worker     }
write_done(bool ok)362*cc02d7e2SAndroid Build Coastguard Worker     bool write_done(bool ok) {
363*cc02d7e2SAndroid Build Coastguard Worker       // now go back and get another streaming read!
364*cc02d7e2SAndroid Build Coastguard Worker       if (ok) {
365*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingImpl::read_done;
366*cc02d7e2SAndroid Build Coastguard Worker         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
367*cc02d7e2SAndroid Build Coastguard Worker       } else {
368*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
369*cc02d7e2SAndroid Build Coastguard Worker         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
370*cc02d7e2SAndroid Build Coastguard Worker       }
371*cc02d7e2SAndroid Build Coastguard Worker       return true;
372*cc02d7e2SAndroid Build Coastguard Worker     }
// Terminal state handler, reached once Finish has completed. The completion
// flag is ignored; false is always returned so that the context gets reset.
bool finish_done(bool /*ok*/) {
  return false;  // reset the context
}
374*cc02d7e2SAndroid Build Coastguard Worker 
375*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ServerContextType> srv_ctx_;
376*cc02d7e2SAndroid Build Coastguard Worker     RequestType req_;
377*cc02d7e2SAndroid Build Coastguard Worker     ResponseType response_;
378*cc02d7e2SAndroid Build Coastguard Worker     bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
379*cc02d7e2SAndroid Build Coastguard Worker     std::function<void(
380*cc02d7e2SAndroid Build Coastguard Worker         ServerContextType*,
381*cc02d7e2SAndroid Build Coastguard Worker         grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
382*cc02d7e2SAndroid Build Coastguard Worker         request_method_;
383*cc02d7e2SAndroid Build Coastguard Worker     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
384*cc02d7e2SAndroid Build Coastguard Worker     grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
385*cc02d7e2SAndroid Build Coastguard Worker   };
386*cc02d7e2SAndroid Build Coastguard Worker 
387*cc02d7e2SAndroid Build Coastguard Worker   class ServerRpcContextStreamingFromClientImpl final
388*cc02d7e2SAndroid Build Coastguard Worker       : public ServerRpcContext {
389*cc02d7e2SAndroid Build Coastguard Worker    public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)390*cc02d7e2SAndroid Build Coastguard Worker     ServerRpcContextStreamingFromClientImpl(
391*cc02d7e2SAndroid Build Coastguard Worker         std::function<void(ServerContextType*,
392*cc02d7e2SAndroid Build Coastguard Worker                            grpc::ServerAsyncReader<ResponseType, RequestType>*,
393*cc02d7e2SAndroid Build Coastguard Worker                            void*)>
394*cc02d7e2SAndroid Build Coastguard Worker             request_method,
395*cc02d7e2SAndroid Build Coastguard Worker         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
396*cc02d7e2SAndroid Build Coastguard Worker         : srv_ctx_(new ServerContextType),
397*cc02d7e2SAndroid Build Coastguard Worker           next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
398*cc02d7e2SAndroid Build Coastguard Worker           request_method_(request_method),
399*cc02d7e2SAndroid Build Coastguard Worker           invoke_method_(invoke_method),
400*cc02d7e2SAndroid Build Coastguard Worker           stream_(srv_ctx_.get()) {
401*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
402*cc02d7e2SAndroid Build Coastguard Worker     }
~ServerRpcContextStreamingFromClientImpl()403*cc02d7e2SAndroid Build Coastguard Worker     ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)404*cc02d7e2SAndroid Build Coastguard Worker     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()405*cc02d7e2SAndroid Build Coastguard Worker     void Reset() override {
406*cc02d7e2SAndroid Build Coastguard Worker       srv_ctx_.reset(new ServerContextType);
407*cc02d7e2SAndroid Build Coastguard Worker       req_ = RequestType();
408*cc02d7e2SAndroid Build Coastguard Worker       stream_ =
409*cc02d7e2SAndroid Build Coastguard Worker           grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
410*cc02d7e2SAndroid Build Coastguard Worker 
411*cc02d7e2SAndroid Build Coastguard Worker       // Then request the method
412*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
413*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
414*cc02d7e2SAndroid Build Coastguard Worker     }
415*cc02d7e2SAndroid Build Coastguard Worker 
416*cc02d7e2SAndroid Build Coastguard Worker    private:
request_done(bool ok)417*cc02d7e2SAndroid Build Coastguard Worker     bool request_done(bool ok) {
418*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
419*cc02d7e2SAndroid Build Coastguard Worker         return false;
420*cc02d7e2SAndroid Build Coastguard Worker       }
421*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
422*cc02d7e2SAndroid Build Coastguard Worker       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
423*cc02d7e2SAndroid Build Coastguard Worker       return true;
424*cc02d7e2SAndroid Build Coastguard Worker     }
425*cc02d7e2SAndroid Build Coastguard Worker 
read_done(bool ok)426*cc02d7e2SAndroid Build Coastguard Worker     bool read_done(bool ok) {
427*cc02d7e2SAndroid Build Coastguard Worker       if (ok) {
428*cc02d7e2SAndroid Build Coastguard Worker         // In this case, just do another read
429*cc02d7e2SAndroid Build Coastguard Worker         // next_state_ is unchanged
430*cc02d7e2SAndroid Build Coastguard Worker         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
431*cc02d7e2SAndroid Build Coastguard Worker         return true;
432*cc02d7e2SAndroid Build Coastguard Worker       } else {  // client has sent writes done
433*cc02d7e2SAndroid Build Coastguard Worker         // invoke the method
434*cc02d7e2SAndroid Build Coastguard Worker         // Call the RPC processing function
435*cc02d7e2SAndroid Build Coastguard Worker         grpc::Status status = invoke_method_(&req_, &response_);
436*cc02d7e2SAndroid Build Coastguard Worker         // finish the stream
437*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
438*cc02d7e2SAndroid Build Coastguard Worker         stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
439*cc02d7e2SAndroid Build Coastguard Worker       }
440*cc02d7e2SAndroid Build Coastguard Worker       return true;
441*cc02d7e2SAndroid Build Coastguard Worker     }
finish_done(bool)442*cc02d7e2SAndroid Build Coastguard Worker     bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
443*cc02d7e2SAndroid Build Coastguard Worker 
444*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ServerContextType> srv_ctx_;
445*cc02d7e2SAndroid Build Coastguard Worker     RequestType req_;
446*cc02d7e2SAndroid Build Coastguard Worker     ResponseType response_;
447*cc02d7e2SAndroid Build Coastguard Worker     bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
448*cc02d7e2SAndroid Build Coastguard Worker     std::function<void(ServerContextType*,
449*cc02d7e2SAndroid Build Coastguard Worker                        grpc::ServerAsyncReader<ResponseType, RequestType>*,
450*cc02d7e2SAndroid Build Coastguard Worker                        void*)>
451*cc02d7e2SAndroid Build Coastguard Worker         request_method_;
452*cc02d7e2SAndroid Build Coastguard Worker     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
453*cc02d7e2SAndroid Build Coastguard Worker     grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
454*cc02d7e2SAndroid Build Coastguard Worker   };
455*cc02d7e2SAndroid Build Coastguard Worker 
456*cc02d7e2SAndroid Build Coastguard Worker   class ServerRpcContextStreamingFromServerImpl final
457*cc02d7e2SAndroid Build Coastguard Worker       : public ServerRpcContext {
458*cc02d7e2SAndroid Build Coastguard Worker    public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)459*cc02d7e2SAndroid Build Coastguard Worker     ServerRpcContextStreamingFromServerImpl(
460*cc02d7e2SAndroid Build Coastguard Worker         std::function<void(ServerContextType*, RequestType*,
461*cc02d7e2SAndroid Build Coastguard Worker                            grpc::ServerAsyncWriter<ResponseType>*, void*)>
462*cc02d7e2SAndroid Build Coastguard Worker             request_method,
463*cc02d7e2SAndroid Build Coastguard Worker         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
464*cc02d7e2SAndroid Build Coastguard Worker         : srv_ctx_(new ServerContextType),
465*cc02d7e2SAndroid Build Coastguard Worker           next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
466*cc02d7e2SAndroid Build Coastguard Worker           request_method_(request_method),
467*cc02d7e2SAndroid Build Coastguard Worker           invoke_method_(invoke_method),
468*cc02d7e2SAndroid Build Coastguard Worker           stream_(srv_ctx_.get()) {
469*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &req_, &stream_,
470*cc02d7e2SAndroid Build Coastguard Worker                       AsyncQpsServerTest::tag(this));
471*cc02d7e2SAndroid Build Coastguard Worker     }
~ServerRpcContextStreamingFromServerImpl()472*cc02d7e2SAndroid Build Coastguard Worker     ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)473*cc02d7e2SAndroid Build Coastguard Worker     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()474*cc02d7e2SAndroid Build Coastguard Worker     void Reset() override {
475*cc02d7e2SAndroid Build Coastguard Worker       srv_ctx_.reset(new ServerContextType);
476*cc02d7e2SAndroid Build Coastguard Worker       req_ = RequestType();
477*cc02d7e2SAndroid Build Coastguard Worker       stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
478*cc02d7e2SAndroid Build Coastguard Worker 
479*cc02d7e2SAndroid Build Coastguard Worker       // Then request the method
480*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
481*cc02d7e2SAndroid Build Coastguard Worker       request_method_(srv_ctx_.get(), &req_, &stream_,
482*cc02d7e2SAndroid Build Coastguard Worker                       AsyncQpsServerTest::tag(this));
483*cc02d7e2SAndroid Build Coastguard Worker     }
484*cc02d7e2SAndroid Build Coastguard Worker 
485*cc02d7e2SAndroid Build Coastguard Worker    private:
request_done(bool ok)486*cc02d7e2SAndroid Build Coastguard Worker     bool request_done(bool ok) {
487*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
488*cc02d7e2SAndroid Build Coastguard Worker         return false;
489*cc02d7e2SAndroid Build Coastguard Worker       }
490*cc02d7e2SAndroid Build Coastguard Worker       // invoke the method
491*cc02d7e2SAndroid Build Coastguard Worker       // Call the RPC processing function
492*cc02d7e2SAndroid Build Coastguard Worker       grpc::Status status = invoke_method_(&req_, &response_);
493*cc02d7e2SAndroid Build Coastguard Worker 
494*cc02d7e2SAndroid Build Coastguard Worker       next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
495*cc02d7e2SAndroid Build Coastguard Worker       stream_.Write(response_, AsyncQpsServerTest::tag(this));
496*cc02d7e2SAndroid Build Coastguard Worker       return true;
497*cc02d7e2SAndroid Build Coastguard Worker     }
498*cc02d7e2SAndroid Build Coastguard Worker 
write_done(bool ok)499*cc02d7e2SAndroid Build Coastguard Worker     bool write_done(bool ok) {
500*cc02d7e2SAndroid Build Coastguard Worker       if (ok) {
501*cc02d7e2SAndroid Build Coastguard Worker         // Do another write!
502*cc02d7e2SAndroid Build Coastguard Worker         // next_state_ is unchanged
503*cc02d7e2SAndroid Build Coastguard Worker         stream_.Write(response_, AsyncQpsServerTest::tag(this));
504*cc02d7e2SAndroid Build Coastguard Worker       } else {  // must be done so let's finish
505*cc02d7e2SAndroid Build Coastguard Worker         next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
506*cc02d7e2SAndroid Build Coastguard Worker         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
507*cc02d7e2SAndroid Build Coastguard Worker       }
508*cc02d7e2SAndroid Build Coastguard Worker       return true;
509*cc02d7e2SAndroid Build Coastguard Worker     }
finish_done(bool)510*cc02d7e2SAndroid Build Coastguard Worker     bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
511*cc02d7e2SAndroid Build Coastguard Worker 
512*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<ServerContextType> srv_ctx_;
513*cc02d7e2SAndroid Build Coastguard Worker     RequestType req_;
514*cc02d7e2SAndroid Build Coastguard Worker     ResponseType response_;
515*cc02d7e2SAndroid Build Coastguard Worker     bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
516*cc02d7e2SAndroid Build Coastguard Worker     std::function<void(ServerContextType*, RequestType*,
517*cc02d7e2SAndroid Build Coastguard Worker                        grpc::ServerAsyncWriter<ResponseType>*, void*)>
518*cc02d7e2SAndroid Build Coastguard Worker         request_method_;
519*cc02d7e2SAndroid Build Coastguard Worker     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
520*cc02d7e2SAndroid Build Coastguard Worker     grpc::ServerAsyncWriter<ResponseType> stream_;
521*cc02d7e2SAndroid Build Coastguard Worker   };
522*cc02d7e2SAndroid Build Coastguard Worker 
523*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::thread> threads_;
524*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<grpc::Server> server_;
525*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
526*cc02d7e2SAndroid Build Coastguard Worker   std::vector<int> cq_;
527*cc02d7e2SAndroid Build Coastguard Worker   ServiceType async_service_;
528*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
529*cc02d7e2SAndroid Build Coastguard Worker 
// Shutdown flag bundled with a mutex. The mutex is `mutable`, so it can be
// locked through a const reference as well. Going by the name, one instance
// is kept per worker thread -- see the shutdown_state_ member that stores
// them.
struct PerThreadShutdownState {
  mutable std::mutex mutex;
  bool shutdown = false;  // a freshly created state is "not shut down"

  PerThreadShutdownState() {}
};
535*cc02d7e2SAndroid Build Coastguard Worker 
536*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
537*cc02d7e2SAndroid Build Coastguard Worker };
538*cc02d7e2SAndroid Build Coastguard Worker 
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)539*cc02d7e2SAndroid Build Coastguard Worker static void RegisterBenchmarkService(ServerBuilder* builder,
540*cc02d7e2SAndroid Build Coastguard Worker                                      BenchmarkService::AsyncService* service) {
541*cc02d7e2SAndroid Build Coastguard Worker   builder->RegisterService(service);
542*cc02d7e2SAndroid Build Coastguard Worker }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)543*cc02d7e2SAndroid Build Coastguard Worker static void RegisterGenericService(ServerBuilder* builder,
544*cc02d7e2SAndroid Build Coastguard Worker                                    grpc::AsyncGenericService* service) {
545*cc02d7e2SAndroid Build Coastguard Worker   builder->RegisterAsyncGenericService(service);
546*cc02d7e2SAndroid Build Coastguard Worker }
547*cc02d7e2SAndroid Build Coastguard Worker 
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)548*cc02d7e2SAndroid Build Coastguard Worker static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
549*cc02d7e2SAndroid Build Coastguard Worker                                SimpleResponse* response) {
550*cc02d7e2SAndroid Build Coastguard Worker   if (request->response_size() > 0) {
551*cc02d7e2SAndroid Build Coastguard Worker     if (!Server::SetPayload(request->response_type(), request->response_size(),
552*cc02d7e2SAndroid Build Coastguard Worker                             response->mutable_payload())) {
553*cc02d7e2SAndroid Build Coastguard Worker       return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
554*cc02d7e2SAndroid Build Coastguard Worker     }
555*cc02d7e2SAndroid Build Coastguard Worker   }
556*cc02d7e2SAndroid Build Coastguard Worker   // We are done using the request. Clear it to reduce working memory.
557*cc02d7e2SAndroid Build Coastguard Worker   // This proves to reduce cache misses in large message size cases.
558*cc02d7e2SAndroid Build Coastguard Worker   request->Clear();
559*cc02d7e2SAndroid Build Coastguard Worker   return Status::OK;
560*cc02d7e2SAndroid Build Coastguard Worker }
561*cc02d7e2SAndroid Build Coastguard Worker 
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)562*cc02d7e2SAndroid Build Coastguard Worker static Status ProcessGenericRPC(const PayloadConfig& payload_config,
563*cc02d7e2SAndroid Build Coastguard Worker                                 ByteBuffer* request, ByteBuffer* response) {
564*cc02d7e2SAndroid Build Coastguard Worker   // We are done using the request. Clear it to reduce working memory.
565*cc02d7e2SAndroid Build Coastguard Worker   // This proves to reduce cache misses in large message size cases.
566*cc02d7e2SAndroid Build Coastguard Worker   request->Clear();
567*cc02d7e2SAndroid Build Coastguard Worker   int resp_size = payload_config.bytebuf_params().resp_size();
568*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<char[]> buf(new char[resp_size]);
569*cc02d7e2SAndroid Build Coastguard Worker   memset(buf.get(), 0, static_cast<size_t>(resp_size));
570*cc02d7e2SAndroid Build Coastguard Worker   Slice slice(buf.get(), resp_size);
571*cc02d7e2SAndroid Build Coastguard Worker   *response = ByteBuffer(&slice, 1);
572*cc02d7e2SAndroid Build Coastguard Worker   return Status::OK;
573*cc02d7e2SAndroid Build Coastguard Worker }
574*cc02d7e2SAndroid Build Coastguard Worker 
CreateAsyncServer(const ServerConfig & config)575*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
576*cc02d7e2SAndroid Build Coastguard Worker   return std::unique_ptr<Server>(
577*cc02d7e2SAndroid Build Coastguard Worker       new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
578*cc02d7e2SAndroid Build Coastguard Worker                              BenchmarkService::AsyncService,
579*cc02d7e2SAndroid Build Coastguard Worker                              grpc::ServerContext>(
580*cc02d7e2SAndroid Build Coastguard Worker           config, RegisterBenchmarkService,
581*cc02d7e2SAndroid Build Coastguard Worker           &BenchmarkService::AsyncService::RequestUnaryCall,
582*cc02d7e2SAndroid Build Coastguard Worker           &BenchmarkService::AsyncService::RequestStreamingCall,
583*cc02d7e2SAndroid Build Coastguard Worker           &BenchmarkService::AsyncService::RequestStreamingFromClient,
584*cc02d7e2SAndroid Build Coastguard Worker           &BenchmarkService::AsyncService::RequestStreamingFromServer,
585*cc02d7e2SAndroid Build Coastguard Worker           &BenchmarkService::AsyncService::RequestStreamingBothWays,
586*cc02d7e2SAndroid Build Coastguard Worker           ProcessSimpleRPC));
587*cc02d7e2SAndroid Build Coastguard Worker }
CreateAsyncGenericServer(const ServerConfig & config)588*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
589*cc02d7e2SAndroid Build Coastguard Worker   return std::unique_ptr<Server>(
590*cc02d7e2SAndroid Build Coastguard Worker       new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
591*cc02d7e2SAndroid Build Coastguard Worker                              grpc::GenericServerContext>(
592*cc02d7e2SAndroid Build Coastguard Worker           config, RegisterGenericService, nullptr,
593*cc02d7e2SAndroid Build Coastguard Worker           &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
594*cc02d7e2SAndroid Build Coastguard Worker           ProcessGenericRPC));
595*cc02d7e2SAndroid Build Coastguard Worker }
596*cc02d7e2SAndroid Build Coastguard Worker 
597*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
598*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
599