1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker #include <list>
20*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
21*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
22*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
23*cc02d7e2SAndroid Build Coastguard Worker #include <string>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker #include <utility>
26*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
27*cc02d7e2SAndroid Build Coastguard Worker
28*cc02d7e2SAndroid Build Coastguard Worker #include "absl/memory/memory.h"
29*cc02d7e2SAndroid Build Coastguard Worker
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/cpu.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/alarm.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
36*cc02d7e2SAndroid Build Coastguard Worker
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
41*cc02d7e2SAndroid Build Coastguard Worker
42*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
43*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
44*cc02d7e2SAndroid Build Coastguard Worker
45*cc02d7e2SAndroid Build Coastguard Worker ///
46*cc02d7e2SAndroid Build Coastguard Worker /// Maintains context info per RPC
47*cc02d7e2SAndroid Build Coastguard Worker ///
48*cc02d7e2SAndroid Build Coastguard Worker struct CallbackClientRpcContext {
CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext49*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
50*cc02d7e2SAndroid Build Coastguard Worker : alarm_(nullptr), stub_(stub) {}
51*cc02d7e2SAndroid Build Coastguard Worker
~CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext52*cc02d7e2SAndroid Build Coastguard Worker ~CallbackClientRpcContext() {}
53*cc02d7e2SAndroid Build Coastguard Worker
54*cc02d7e2SAndroid Build Coastguard Worker SimpleResponse response_;
55*cc02d7e2SAndroid Build Coastguard Worker ClientContext context_;
56*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Alarm> alarm_;
57*cc02d7e2SAndroid Build Coastguard Worker BenchmarkService::Stub* stub_;
58*cc02d7e2SAndroid Build Coastguard Worker };
59*cc02d7e2SAndroid Build Coastguard Worker
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)60*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
61*cc02d7e2SAndroid Build Coastguard Worker const std::shared_ptr<Channel>& ch) {
62*cc02d7e2SAndroid Build Coastguard Worker return BenchmarkService::NewStub(ch);
63*cc02d7e2SAndroid Build Coastguard Worker }
64*cc02d7e2SAndroid Build Coastguard Worker
65*cc02d7e2SAndroid Build Coastguard Worker class CallbackClient
66*cc02d7e2SAndroid Build Coastguard Worker : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
67*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackClient(const ClientConfig & config)68*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackClient(const ClientConfig& config)
69*cc02d7e2SAndroid Build Coastguard Worker : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
70*cc02d7e2SAndroid Build Coastguard Worker config, BenchmarkStubCreator) {
71*cc02d7e2SAndroid Build Coastguard Worker num_threads_ = NumThreads(config);
72*cc02d7e2SAndroid Build Coastguard Worker rpcs_done_ = 0;
73*cc02d7e2SAndroid Build Coastguard Worker
74*cc02d7e2SAndroid Build Coastguard Worker // Don't divide the fixed load among threads as the user threads
75*cc02d7e2SAndroid Build Coastguard Worker // only bootstrap the RPCs
76*cc02d7e2SAndroid Build Coastguard Worker SetupLoadTest(config, 1);
77*cc02d7e2SAndroid Build Coastguard Worker total_outstanding_rpcs_ =
78*cc02d7e2SAndroid Build Coastguard Worker config.client_channels() * config.outstanding_rpcs_per_channel();
79*cc02d7e2SAndroid Build Coastguard Worker }
80*cc02d7e2SAndroid Build Coastguard Worker
~CallbackClient()81*cc02d7e2SAndroid Build Coastguard Worker ~CallbackClient() override {}
82*cc02d7e2SAndroid Build Coastguard Worker
83*cc02d7e2SAndroid Build Coastguard Worker ///
84*cc02d7e2SAndroid Build Coastguard Worker /// The main thread of the benchmark will be waiting on DestroyMultithreading.
85*cc02d7e2SAndroid Build Coastguard Worker /// Increment the rpcs_done_ variable to signify that the Callback RPC
86*cc02d7e2SAndroid Build Coastguard Worker /// after thread completion is done. When the last outstanding rpc increments
87*cc02d7e2SAndroid Build Coastguard Worker /// the counter it should also signal the main thread's conditional variable.
88*cc02d7e2SAndroid Build Coastguard Worker ///
NotifyMainThreadOfThreadCompletion()89*cc02d7e2SAndroid Build Coastguard Worker void NotifyMainThreadOfThreadCompletion() {
90*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> l(shutdown_mu_);
91*cc02d7e2SAndroid Build Coastguard Worker rpcs_done_++;
92*cc02d7e2SAndroid Build Coastguard Worker if (rpcs_done_ == total_outstanding_rpcs_) {
93*cc02d7e2SAndroid Build Coastguard Worker shutdown_cv_.notify_one();
94*cc02d7e2SAndroid Build Coastguard Worker }
95*cc02d7e2SAndroid Build Coastguard Worker }
96*cc02d7e2SAndroid Build Coastguard Worker
NextRPCIssueTime()97*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec NextRPCIssueTime() {
98*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> l(next_issue_time_mu_);
99*cc02d7e2SAndroid Build Coastguard Worker return Client::NextIssueTime(0);
100*cc02d7e2SAndroid Build Coastguard Worker }
101*cc02d7e2SAndroid Build Coastguard Worker
102*cc02d7e2SAndroid Build Coastguard Worker protected:
103*cc02d7e2SAndroid Build Coastguard Worker size_t num_threads_;
104*cc02d7e2SAndroid Build Coastguard Worker size_t total_outstanding_rpcs_;
105*cc02d7e2SAndroid Build Coastguard Worker // The below mutex and condition variable is used by main benchmark thread to
106*cc02d7e2SAndroid Build Coastguard Worker // wait on completion of all RPCs before shutdown
107*cc02d7e2SAndroid Build Coastguard Worker std::mutex shutdown_mu_;
108*cc02d7e2SAndroid Build Coastguard Worker std::condition_variable shutdown_cv_;
109*cc02d7e2SAndroid Build Coastguard Worker // Number of rpcs done after thread completion
110*cc02d7e2SAndroid Build Coastguard Worker size_t rpcs_done_;
111*cc02d7e2SAndroid Build Coastguard Worker // Vector of Context data pointers for running a RPC
112*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
113*cc02d7e2SAndroid Build Coastguard Worker
114*cc02d7e2SAndroid Build Coastguard Worker virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
115*cc02d7e2SAndroid Build Coastguard Worker virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
116*cc02d7e2SAndroid Build Coastguard Worker
ThreadFunc(size_t thread_idx,Thread * t)117*cc02d7e2SAndroid Build Coastguard Worker void ThreadFunc(size_t thread_idx, Thread* t) override {
118*cc02d7e2SAndroid Build Coastguard Worker InitThreadFuncImpl(thread_idx);
119*cc02d7e2SAndroid Build Coastguard Worker ThreadFuncImpl(t, thread_idx);
120*cc02d7e2SAndroid Build Coastguard Worker }
121*cc02d7e2SAndroid Build Coastguard Worker
122*cc02d7e2SAndroid Build Coastguard Worker private:
123*cc02d7e2SAndroid Build Coastguard Worker std::mutex next_issue_time_mu_; // Used by next issue time
124*cc02d7e2SAndroid Build Coastguard Worker
NumThreads(const ClientConfig & config)125*cc02d7e2SAndroid Build Coastguard Worker int NumThreads(const ClientConfig& config) {
126*cc02d7e2SAndroid Build Coastguard Worker int num_threads = config.async_client_threads();
127*cc02d7e2SAndroid Build Coastguard Worker if (num_threads <= 0) { // Use dynamic sizing
128*cc02d7e2SAndroid Build Coastguard Worker num_threads = cores_;
129*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
130*cc02d7e2SAndroid Build Coastguard Worker }
131*cc02d7e2SAndroid Build Coastguard Worker return num_threads;
132*cc02d7e2SAndroid Build Coastguard Worker }
133*cc02d7e2SAndroid Build Coastguard Worker
134*cc02d7e2SAndroid Build Coastguard Worker ///
135*cc02d7e2SAndroid Build Coastguard Worker /// Wait until all outstanding Callback RPCs are done
136*cc02d7e2SAndroid Build Coastguard Worker ///
DestroyMultithreading()137*cc02d7e2SAndroid Build Coastguard Worker void DestroyMultithreading() final {
138*cc02d7e2SAndroid Build Coastguard Worker std::unique_lock<std::mutex> l(shutdown_mu_);
139*cc02d7e2SAndroid Build Coastguard Worker while (rpcs_done_ != total_outstanding_rpcs_) {
140*cc02d7e2SAndroid Build Coastguard Worker shutdown_cv_.wait(l);
141*cc02d7e2SAndroid Build Coastguard Worker }
142*cc02d7e2SAndroid Build Coastguard Worker EndThreads();
143*cc02d7e2SAndroid Build Coastguard Worker }
144*cc02d7e2SAndroid Build Coastguard Worker };
145*cc02d7e2SAndroid Build Coastguard Worker
146*cc02d7e2SAndroid Build Coastguard Worker class CallbackUnaryClient final : public CallbackClient {
147*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackUnaryClient(const ClientConfig & config)148*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackUnaryClient(const ClientConfig& config)
149*cc02d7e2SAndroid Build Coastguard Worker : CallbackClient(config) {
150*cc02d7e2SAndroid Build Coastguard Worker for (int ch = 0; ch < config.client_channels(); ch++) {
151*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
152*cc02d7e2SAndroid Build Coastguard Worker ctx_.emplace_back(
153*cc02d7e2SAndroid Build Coastguard Worker new CallbackClientRpcContext(channels_[ch].get_stub()));
154*cc02d7e2SAndroid Build Coastguard Worker }
155*cc02d7e2SAndroid Build Coastguard Worker }
156*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_threads_);
157*cc02d7e2SAndroid Build Coastguard Worker }
~CallbackUnaryClient()158*cc02d7e2SAndroid Build Coastguard Worker ~CallbackUnaryClient() override {}
159*cc02d7e2SAndroid Build Coastguard Worker
160*cc02d7e2SAndroid Build Coastguard Worker protected:
ThreadFuncImpl(Thread * t,size_t thread_idx)161*cc02d7e2SAndroid Build Coastguard Worker bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
162*cc02d7e2SAndroid Build Coastguard Worker for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
163*cc02d7e2SAndroid Build Coastguard Worker vector_idx += num_threads_) {
164*cc02d7e2SAndroid Build Coastguard Worker ScheduleRpc(t, vector_idx);
165*cc02d7e2SAndroid Build Coastguard Worker }
166*cc02d7e2SAndroid Build Coastguard Worker return true;
167*cc02d7e2SAndroid Build Coastguard Worker }
168*cc02d7e2SAndroid Build Coastguard Worker
InitThreadFuncImpl(size_t)169*cc02d7e2SAndroid Build Coastguard Worker void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
170*cc02d7e2SAndroid Build Coastguard Worker
171*cc02d7e2SAndroid Build Coastguard Worker private:
ScheduleRpc(Thread * t,size_t vector_idx)172*cc02d7e2SAndroid Build Coastguard Worker void ScheduleRpc(Thread* t, size_t vector_idx) {
173*cc02d7e2SAndroid Build Coastguard Worker if (!closed_loop_) {
174*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec next_issue_time = NextRPCIssueTime();
175*cc02d7e2SAndroid Build Coastguard Worker // Start an alarm callback to run the internal callback after
176*cc02d7e2SAndroid Build Coastguard Worker // next_issue_time
177*cc02d7e2SAndroid Build Coastguard Worker if (ctx_[vector_idx]->alarm_ == nullptr) {
178*cc02d7e2SAndroid Build Coastguard Worker ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
179*cc02d7e2SAndroid Build Coastguard Worker }
180*cc02d7e2SAndroid Build Coastguard Worker ctx_[vector_idx]->alarm_->Set(next_issue_time,
181*cc02d7e2SAndroid Build Coastguard Worker [this, t, vector_idx](bool /*ok*/) {
182*cc02d7e2SAndroid Build Coastguard Worker IssueUnaryCallbackRpc(t, vector_idx);
183*cc02d7e2SAndroid Build Coastguard Worker });
184*cc02d7e2SAndroid Build Coastguard Worker } else {
185*cc02d7e2SAndroid Build Coastguard Worker IssueUnaryCallbackRpc(t, vector_idx);
186*cc02d7e2SAndroid Build Coastguard Worker }
187*cc02d7e2SAndroid Build Coastguard Worker }
188*cc02d7e2SAndroid Build Coastguard Worker
IssueUnaryCallbackRpc(Thread * t,size_t vector_idx)189*cc02d7e2SAndroid Build Coastguard Worker void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
190*cc02d7e2SAndroid Build Coastguard Worker double start = UsageTimer::Now();
191*cc02d7e2SAndroid Build Coastguard Worker ctx_[vector_idx]->stub_->async()->UnaryCall(
192*cc02d7e2SAndroid Build Coastguard Worker (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
193*cc02d7e2SAndroid Build Coastguard Worker [this, t, start, vector_idx](grpc::Status s) {
194*cc02d7e2SAndroid Build Coastguard Worker // Update Histogram with data from the callback run
195*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry entry;
196*cc02d7e2SAndroid Build Coastguard Worker if (s.ok()) {
197*cc02d7e2SAndroid Build Coastguard Worker entry.set_value((UsageTimer::Now() - start) * 1e9);
198*cc02d7e2SAndroid Build Coastguard Worker }
199*cc02d7e2SAndroid Build Coastguard Worker entry.set_status(s.error_code());
200*cc02d7e2SAndroid Build Coastguard Worker t->UpdateHistogram(&entry);
201*cc02d7e2SAndroid Build Coastguard Worker
202*cc02d7e2SAndroid Build Coastguard Worker if (ThreadCompleted() || !s.ok()) {
203*cc02d7e2SAndroid Build Coastguard Worker // Notify thread of completion
204*cc02d7e2SAndroid Build Coastguard Worker NotifyMainThreadOfThreadCompletion();
205*cc02d7e2SAndroid Build Coastguard Worker } else {
206*cc02d7e2SAndroid Build Coastguard Worker // Reallocate ctx for next RPC
207*cc02d7e2SAndroid Build Coastguard Worker ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
208*cc02d7e2SAndroid Build Coastguard Worker ctx_[vector_idx]->stub_);
209*cc02d7e2SAndroid Build Coastguard Worker // Schedule a new RPC
210*cc02d7e2SAndroid Build Coastguard Worker ScheduleRpc(t, vector_idx);
211*cc02d7e2SAndroid Build Coastguard Worker }
212*cc02d7e2SAndroid Build Coastguard Worker });
213*cc02d7e2SAndroid Build Coastguard Worker }
214*cc02d7e2SAndroid Build Coastguard Worker };
215*cc02d7e2SAndroid Build Coastguard Worker
216*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingClient : public CallbackClient {
217*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackStreamingClient(const ClientConfig & config)218*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackStreamingClient(const ClientConfig& config)
219*cc02d7e2SAndroid Build Coastguard Worker : CallbackClient(config),
220*cc02d7e2SAndroid Build Coastguard Worker messages_per_stream_(config.messages_per_stream()) {
221*cc02d7e2SAndroid Build Coastguard Worker for (int ch = 0; ch < config.client_channels(); ch++) {
222*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
223*cc02d7e2SAndroid Build Coastguard Worker ctx_.emplace_back(
224*cc02d7e2SAndroid Build Coastguard Worker new CallbackClientRpcContext(channels_[ch].get_stub()));
225*cc02d7e2SAndroid Build Coastguard Worker }
226*cc02d7e2SAndroid Build Coastguard Worker }
227*cc02d7e2SAndroid Build Coastguard Worker StartThreads(num_threads_);
228*cc02d7e2SAndroid Build Coastguard Worker }
~CallbackStreamingClient()229*cc02d7e2SAndroid Build Coastguard Worker ~CallbackStreamingClient() override {}
230*cc02d7e2SAndroid Build Coastguard Worker
AddHistogramEntry(double start,bool ok,Thread * thread_ptr)231*cc02d7e2SAndroid Build Coastguard Worker void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
232*cc02d7e2SAndroid Build Coastguard Worker // Update Histogram with data from the callback run
233*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry entry;
234*cc02d7e2SAndroid Build Coastguard Worker if (ok) {
235*cc02d7e2SAndroid Build Coastguard Worker entry.set_value((UsageTimer::Now() - start) * 1e9);
236*cc02d7e2SAndroid Build Coastguard Worker }
237*cc02d7e2SAndroid Build Coastguard Worker thread_ptr->UpdateHistogram(&entry);
238*cc02d7e2SAndroid Build Coastguard Worker }
239*cc02d7e2SAndroid Build Coastguard Worker
messages_per_stream()240*cc02d7e2SAndroid Build Coastguard Worker int messages_per_stream() { return messages_per_stream_; }
241*cc02d7e2SAndroid Build Coastguard Worker
242*cc02d7e2SAndroid Build Coastguard Worker protected:
243*cc02d7e2SAndroid Build Coastguard Worker const int messages_per_stream_;
244*cc02d7e2SAndroid Build Coastguard Worker };
245*cc02d7e2SAndroid Build Coastguard Worker
246*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongClient : public CallbackStreamingClient {
247*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackStreamingPingPongClient(const ClientConfig & config)248*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackStreamingPingPongClient(const ClientConfig& config)
249*cc02d7e2SAndroid Build Coastguard Worker : CallbackStreamingClient(config) {}
~CallbackStreamingPingPongClient()250*cc02d7e2SAndroid Build Coastguard Worker ~CallbackStreamingPingPongClient() override {}
251*cc02d7e2SAndroid Build Coastguard Worker };
252*cc02d7e2SAndroid Build Coastguard Worker
253*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongReactor final
254*cc02d7e2SAndroid Build Coastguard Worker : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
255*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackStreamingPingPongReactor(CallbackStreamingPingPongClient * client,std::unique_ptr<CallbackClientRpcContext> ctx)256*cc02d7e2SAndroid Build Coastguard Worker CallbackStreamingPingPongReactor(
257*cc02d7e2SAndroid Build Coastguard Worker CallbackStreamingPingPongClient* client,
258*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<CallbackClientRpcContext> ctx)
259*cc02d7e2SAndroid Build Coastguard Worker : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
260*cc02d7e2SAndroid Build Coastguard Worker
StartNewRpc()261*cc02d7e2SAndroid Build Coastguard Worker void StartNewRpc() {
262*cc02d7e2SAndroid Build Coastguard Worker ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
263*cc02d7e2SAndroid Build Coastguard Worker write_time_ = UsageTimer::Now();
264*cc02d7e2SAndroid Build Coastguard Worker StartWrite(client_->request());
265*cc02d7e2SAndroid Build Coastguard Worker writes_done_started_.clear();
266*cc02d7e2SAndroid Build Coastguard Worker StartCall();
267*cc02d7e2SAndroid Build Coastguard Worker }
268*cc02d7e2SAndroid Build Coastguard Worker
OnWriteDone(bool ok)269*cc02d7e2SAndroid Build Coastguard Worker void OnWriteDone(bool ok) override {
270*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
271*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR, "Error writing RPC");
272*cc02d7e2SAndroid Build Coastguard Worker }
273*cc02d7e2SAndroid Build Coastguard Worker if ((!ok || client_->ThreadCompleted()) &&
274*cc02d7e2SAndroid Build Coastguard Worker !writes_done_started_.test_and_set()) {
275*cc02d7e2SAndroid Build Coastguard Worker StartWritesDone();
276*cc02d7e2SAndroid Build Coastguard Worker }
277*cc02d7e2SAndroid Build Coastguard Worker StartRead(&ctx_->response_);
278*cc02d7e2SAndroid Build Coastguard Worker }
279*cc02d7e2SAndroid Build Coastguard Worker
OnReadDone(bool ok)280*cc02d7e2SAndroid Build Coastguard Worker void OnReadDone(bool ok) override {
281*cc02d7e2SAndroid Build Coastguard Worker client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
282*cc02d7e2SAndroid Build Coastguard Worker
283*cc02d7e2SAndroid Build Coastguard Worker if (client_->ThreadCompleted() || !ok ||
284*cc02d7e2SAndroid Build Coastguard Worker (client_->messages_per_stream() != 0 &&
285*cc02d7e2SAndroid Build Coastguard Worker ++messages_issued_ >= client_->messages_per_stream())) {
286*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
287*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR, "Error reading RPC");
288*cc02d7e2SAndroid Build Coastguard Worker }
289*cc02d7e2SAndroid Build Coastguard Worker if (!writes_done_started_.test_and_set()) {
290*cc02d7e2SAndroid Build Coastguard Worker StartWritesDone();
291*cc02d7e2SAndroid Build Coastguard Worker }
292*cc02d7e2SAndroid Build Coastguard Worker return;
293*cc02d7e2SAndroid Build Coastguard Worker }
294*cc02d7e2SAndroid Build Coastguard Worker if (!client_->IsClosedLoop()) {
295*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec next_issue_time = client_->NextRPCIssueTime();
296*cc02d7e2SAndroid Build Coastguard Worker // Start an alarm callback to run the internal callback after
297*cc02d7e2SAndroid Build Coastguard Worker // next_issue_time
298*cc02d7e2SAndroid Build Coastguard Worker ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
299*cc02d7e2SAndroid Build Coastguard Worker write_time_ = UsageTimer::Now();
300*cc02d7e2SAndroid Build Coastguard Worker StartWrite(client_->request());
301*cc02d7e2SAndroid Build Coastguard Worker });
302*cc02d7e2SAndroid Build Coastguard Worker } else {
303*cc02d7e2SAndroid Build Coastguard Worker write_time_ = UsageTimer::Now();
304*cc02d7e2SAndroid Build Coastguard Worker StartWrite(client_->request());
305*cc02d7e2SAndroid Build Coastguard Worker }
306*cc02d7e2SAndroid Build Coastguard Worker }
307*cc02d7e2SAndroid Build Coastguard Worker
OnDone(const Status & s)308*cc02d7e2SAndroid Build Coastguard Worker void OnDone(const Status& s) override {
309*cc02d7e2SAndroid Build Coastguard Worker if (client_->ThreadCompleted() || !s.ok()) {
310*cc02d7e2SAndroid Build Coastguard Worker client_->NotifyMainThreadOfThreadCompletion();
311*cc02d7e2SAndroid Build Coastguard Worker return;
312*cc02d7e2SAndroid Build Coastguard Worker }
313*cc02d7e2SAndroid Build Coastguard Worker ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
314*cc02d7e2SAndroid Build Coastguard Worker ScheduleRpc();
315*cc02d7e2SAndroid Build Coastguard Worker }
316*cc02d7e2SAndroid Build Coastguard Worker
ScheduleRpc()317*cc02d7e2SAndroid Build Coastguard Worker void ScheduleRpc() {
318*cc02d7e2SAndroid Build Coastguard Worker if (!client_->IsClosedLoop()) {
319*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec next_issue_time = client_->NextRPCIssueTime();
320*cc02d7e2SAndroid Build Coastguard Worker // Start an alarm callback to run the internal callback after
321*cc02d7e2SAndroid Build Coastguard Worker // next_issue_time
322*cc02d7e2SAndroid Build Coastguard Worker if (ctx_->alarm_ == nullptr) {
323*cc02d7e2SAndroid Build Coastguard Worker ctx_->alarm_ = std::make_unique<Alarm>();
324*cc02d7e2SAndroid Build Coastguard Worker }
325*cc02d7e2SAndroid Build Coastguard Worker ctx_->alarm_->Set(next_issue_time,
326*cc02d7e2SAndroid Build Coastguard Worker [this](bool /*ok*/) { StartNewRpc(); });
327*cc02d7e2SAndroid Build Coastguard Worker } else {
328*cc02d7e2SAndroid Build Coastguard Worker StartNewRpc();
329*cc02d7e2SAndroid Build Coastguard Worker }
330*cc02d7e2SAndroid Build Coastguard Worker }
331*cc02d7e2SAndroid Build Coastguard Worker
set_thread_ptr(Client::Thread * ptr)332*cc02d7e2SAndroid Build Coastguard Worker void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
333*cc02d7e2SAndroid Build Coastguard Worker
334*cc02d7e2SAndroid Build Coastguard Worker CallbackStreamingPingPongClient* client_;
335*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<CallbackClientRpcContext> ctx_;
336*cc02d7e2SAndroid Build Coastguard Worker std::atomic_flag writes_done_started_;
337*cc02d7e2SAndroid Build Coastguard Worker Client::Thread* thread_ptr_; // Needed to update histogram entries
338*cc02d7e2SAndroid Build Coastguard Worker double write_time_; // Track ping-pong round start time
339*cc02d7e2SAndroid Build Coastguard Worker int messages_issued_; // Messages issued by this stream
340*cc02d7e2SAndroid Build Coastguard Worker };
341*cc02d7e2SAndroid Build Coastguard Worker
342*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongClientImpl final
343*cc02d7e2SAndroid Build Coastguard Worker : public CallbackStreamingPingPongClient {
344*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackStreamingPingPongClientImpl(const ClientConfig & config)345*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
346*cc02d7e2SAndroid Build Coastguard Worker : CallbackStreamingPingPongClient(config) {
347*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
348*cc02d7e2SAndroid Build Coastguard Worker reactor_.emplace_back(
349*cc02d7e2SAndroid Build Coastguard Worker new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
350*cc02d7e2SAndroid Build Coastguard Worker }
351*cc02d7e2SAndroid Build Coastguard Worker }
~CallbackStreamingPingPongClientImpl()352*cc02d7e2SAndroid Build Coastguard Worker ~CallbackStreamingPingPongClientImpl() override {}
353*cc02d7e2SAndroid Build Coastguard Worker
ThreadFuncImpl(Client::Thread * t,size_t thread_idx)354*cc02d7e2SAndroid Build Coastguard Worker bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
355*cc02d7e2SAndroid Build Coastguard Worker for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
356*cc02d7e2SAndroid Build Coastguard Worker vector_idx += num_threads_) {
357*cc02d7e2SAndroid Build Coastguard Worker reactor_[vector_idx]->set_thread_ptr(t);
358*cc02d7e2SAndroid Build Coastguard Worker reactor_[vector_idx]->ScheduleRpc();
359*cc02d7e2SAndroid Build Coastguard Worker }
360*cc02d7e2SAndroid Build Coastguard Worker return true;
361*cc02d7e2SAndroid Build Coastguard Worker }
362*cc02d7e2SAndroid Build Coastguard Worker
InitThreadFuncImpl(size_t)363*cc02d7e2SAndroid Build Coastguard Worker void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
364*cc02d7e2SAndroid Build Coastguard Worker
365*cc02d7e2SAndroid Build Coastguard Worker private:
366*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
367*cc02d7e2SAndroid Build Coastguard Worker };
368*cc02d7e2SAndroid Build Coastguard Worker
369*cc02d7e2SAndroid Build Coastguard Worker // TODO(mhaidry) : Implement Streaming from client, server and both ways
370*cc02d7e2SAndroid Build Coastguard Worker
CreateCallbackClient(const ClientConfig & config)371*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
372*cc02d7e2SAndroid Build Coastguard Worker switch (config.rpc_type()) {
373*cc02d7e2SAndroid Build Coastguard Worker case UNARY:
374*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(new CallbackUnaryClient(config));
375*cc02d7e2SAndroid Build Coastguard Worker case STREAMING:
376*cc02d7e2SAndroid Build Coastguard Worker return std::unique_ptr<Client>(
377*cc02d7e2SAndroid Build Coastguard Worker new CallbackStreamingPingPongClientImpl(config));
378*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_FROM_CLIENT:
379*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_FROM_SERVER:
380*cc02d7e2SAndroid Build Coastguard Worker case STREAMING_BOTH_WAYS:
381*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(
382*cc02d7e2SAndroid Build Coastguard Worker "STREAMING_FROM_* scenarios are not supported by the callback "
383*cc02d7e2SAndroid Build Coastguard Worker "API");
384*cc02d7e2SAndroid Build Coastguard Worker default:
385*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
386*cc02d7e2SAndroid Build Coastguard Worker }
387*cc02d7e2SAndroid Build Coastguard Worker }
388*cc02d7e2SAndroid Build Coastguard Worker
389*cc02d7e2SAndroid Build Coastguard Worker } // namespace testing
390*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
391