xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/client_callback.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include <list>
20*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
21*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
22*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
23*cc02d7e2SAndroid Build Coastguard Worker #include <string>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker #include <utility>
26*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
27*cc02d7e2SAndroid Build Coastguard Worker 
28*cc02d7e2SAndroid Build Coastguard Worker #include "absl/memory/memory.h"
29*cc02d7e2SAndroid Build Coastguard Worker 
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/cpu.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/alarm.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
36*cc02d7e2SAndroid Build Coastguard Worker 
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
41*cc02d7e2SAndroid Build Coastguard Worker 
42*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
43*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
44*cc02d7e2SAndroid Build Coastguard Worker 
45*cc02d7e2SAndroid Build Coastguard Worker ///
46*cc02d7e2SAndroid Build Coastguard Worker /// Maintains context info per RPC
47*cc02d7e2SAndroid Build Coastguard Worker ///
48*cc02d7e2SAndroid Build Coastguard Worker struct CallbackClientRpcContext {
CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext49*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
50*cc02d7e2SAndroid Build Coastguard Worker       : alarm_(nullptr), stub_(stub) {}
51*cc02d7e2SAndroid Build Coastguard Worker 
~CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext52*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackClientRpcContext() {}
53*cc02d7e2SAndroid Build Coastguard Worker 
54*cc02d7e2SAndroid Build Coastguard Worker   SimpleResponse response_;
55*cc02d7e2SAndroid Build Coastguard Worker   ClientContext context_;
56*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<Alarm> alarm_;
57*cc02d7e2SAndroid Build Coastguard Worker   BenchmarkService::Stub* stub_;
58*cc02d7e2SAndroid Build Coastguard Worker };
59*cc02d7e2SAndroid Build Coastguard Worker 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)60*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
61*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<Channel>& ch) {
62*cc02d7e2SAndroid Build Coastguard Worker   return BenchmarkService::NewStub(ch);
63*cc02d7e2SAndroid Build Coastguard Worker }
64*cc02d7e2SAndroid Build Coastguard Worker 
65*cc02d7e2SAndroid Build Coastguard Worker class CallbackClient
66*cc02d7e2SAndroid Build Coastguard Worker     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
67*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackClient(const ClientConfig & config)68*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackClient(const ClientConfig& config)
69*cc02d7e2SAndroid Build Coastguard Worker       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
70*cc02d7e2SAndroid Build Coastguard Worker             config, BenchmarkStubCreator) {
71*cc02d7e2SAndroid Build Coastguard Worker     num_threads_ = NumThreads(config);
72*cc02d7e2SAndroid Build Coastguard Worker     rpcs_done_ = 0;
73*cc02d7e2SAndroid Build Coastguard Worker 
74*cc02d7e2SAndroid Build Coastguard Worker     //  Don't divide the fixed load among threads as the user threads
75*cc02d7e2SAndroid Build Coastguard Worker     //  only bootstrap the RPCs
76*cc02d7e2SAndroid Build Coastguard Worker     SetupLoadTest(config, 1);
77*cc02d7e2SAndroid Build Coastguard Worker     total_outstanding_rpcs_ =
78*cc02d7e2SAndroid Build Coastguard Worker         config.client_channels() * config.outstanding_rpcs_per_channel();
79*cc02d7e2SAndroid Build Coastguard Worker   }
80*cc02d7e2SAndroid Build Coastguard Worker 
~CallbackClient()81*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackClient() override {}
82*cc02d7e2SAndroid Build Coastguard Worker 
83*cc02d7e2SAndroid Build Coastguard Worker   ///
84*cc02d7e2SAndroid Build Coastguard Worker   /// The main thread of the benchmark will be waiting on DestroyMultithreading.
85*cc02d7e2SAndroid Build Coastguard Worker   /// Increment the rpcs_done_ variable to signify that the Callback RPC
86*cc02d7e2SAndroid Build Coastguard Worker   /// after thread completion is done. When the last outstanding rpc increments
87*cc02d7e2SAndroid Build Coastguard Worker   /// the counter it should also signal the main thread's conditional variable.
88*cc02d7e2SAndroid Build Coastguard Worker   ///
NotifyMainThreadOfThreadCompletion()89*cc02d7e2SAndroid Build Coastguard Worker   void NotifyMainThreadOfThreadCompletion() {
90*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(shutdown_mu_);
91*cc02d7e2SAndroid Build Coastguard Worker     rpcs_done_++;
92*cc02d7e2SAndroid Build Coastguard Worker     if (rpcs_done_ == total_outstanding_rpcs_) {
93*cc02d7e2SAndroid Build Coastguard Worker       shutdown_cv_.notify_one();
94*cc02d7e2SAndroid Build Coastguard Worker     }
95*cc02d7e2SAndroid Build Coastguard Worker   }
96*cc02d7e2SAndroid Build Coastguard Worker 
NextRPCIssueTime()97*cc02d7e2SAndroid Build Coastguard Worker   gpr_timespec NextRPCIssueTime() {
98*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(next_issue_time_mu_);
99*cc02d7e2SAndroid Build Coastguard Worker     return Client::NextIssueTime(0);
100*cc02d7e2SAndroid Build Coastguard Worker   }
101*cc02d7e2SAndroid Build Coastguard Worker 
102*cc02d7e2SAndroid Build Coastguard Worker  protected:
103*cc02d7e2SAndroid Build Coastguard Worker   size_t num_threads_;
104*cc02d7e2SAndroid Build Coastguard Worker   size_t total_outstanding_rpcs_;
105*cc02d7e2SAndroid Build Coastguard Worker   // The below mutex and condition variable is used by main benchmark thread to
106*cc02d7e2SAndroid Build Coastguard Worker   // wait on completion of all RPCs before shutdown
107*cc02d7e2SAndroid Build Coastguard Worker   std::mutex shutdown_mu_;
108*cc02d7e2SAndroid Build Coastguard Worker   std::condition_variable shutdown_cv_;
109*cc02d7e2SAndroid Build Coastguard Worker   // Number of rpcs done after thread completion
110*cc02d7e2SAndroid Build Coastguard Worker   size_t rpcs_done_;
111*cc02d7e2SAndroid Build Coastguard Worker   // Vector of Context data pointers for running a RPC
112*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
113*cc02d7e2SAndroid Build Coastguard Worker 
114*cc02d7e2SAndroid Build Coastguard Worker   virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
115*cc02d7e2SAndroid Build Coastguard Worker   virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
116*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFunc(size_t thread_idx,Thread * t)117*cc02d7e2SAndroid Build Coastguard Worker   void ThreadFunc(size_t thread_idx, Thread* t) override {
118*cc02d7e2SAndroid Build Coastguard Worker     InitThreadFuncImpl(thread_idx);
119*cc02d7e2SAndroid Build Coastguard Worker     ThreadFuncImpl(t, thread_idx);
120*cc02d7e2SAndroid Build Coastguard Worker   }
121*cc02d7e2SAndroid Build Coastguard Worker 
122*cc02d7e2SAndroid Build Coastguard Worker  private:
123*cc02d7e2SAndroid Build Coastguard Worker   std::mutex next_issue_time_mu_;  // Used by next issue time
124*cc02d7e2SAndroid Build Coastguard Worker 
NumThreads(const ClientConfig & config)125*cc02d7e2SAndroid Build Coastguard Worker   int NumThreads(const ClientConfig& config) {
126*cc02d7e2SAndroid Build Coastguard Worker     int num_threads = config.async_client_threads();
127*cc02d7e2SAndroid Build Coastguard Worker     if (num_threads <= 0) {  // Use dynamic sizing
128*cc02d7e2SAndroid Build Coastguard Worker       num_threads = cores_;
129*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
130*cc02d7e2SAndroid Build Coastguard Worker     }
131*cc02d7e2SAndroid Build Coastguard Worker     return num_threads;
132*cc02d7e2SAndroid Build Coastguard Worker   }
133*cc02d7e2SAndroid Build Coastguard Worker 
134*cc02d7e2SAndroid Build Coastguard Worker   ///
135*cc02d7e2SAndroid Build Coastguard Worker   /// Wait until all outstanding Callback RPCs are done
136*cc02d7e2SAndroid Build Coastguard Worker   ///
DestroyMultithreading()137*cc02d7e2SAndroid Build Coastguard Worker   void DestroyMultithreading() final {
138*cc02d7e2SAndroid Build Coastguard Worker     std::unique_lock<std::mutex> l(shutdown_mu_);
139*cc02d7e2SAndroid Build Coastguard Worker     while (rpcs_done_ != total_outstanding_rpcs_) {
140*cc02d7e2SAndroid Build Coastguard Worker       shutdown_cv_.wait(l);
141*cc02d7e2SAndroid Build Coastguard Worker     }
142*cc02d7e2SAndroid Build Coastguard Worker     EndThreads();
143*cc02d7e2SAndroid Build Coastguard Worker   }
144*cc02d7e2SAndroid Build Coastguard Worker };
145*cc02d7e2SAndroid Build Coastguard Worker 
146*cc02d7e2SAndroid Build Coastguard Worker class CallbackUnaryClient final : public CallbackClient {
147*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackUnaryClient(const ClientConfig & config)148*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackUnaryClient(const ClientConfig& config)
149*cc02d7e2SAndroid Build Coastguard Worker       : CallbackClient(config) {
150*cc02d7e2SAndroid Build Coastguard Worker     for (int ch = 0; ch < config.client_channels(); ch++) {
151*cc02d7e2SAndroid Build Coastguard Worker       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
152*cc02d7e2SAndroid Build Coastguard Worker         ctx_.emplace_back(
153*cc02d7e2SAndroid Build Coastguard Worker             new CallbackClientRpcContext(channels_[ch].get_stub()));
154*cc02d7e2SAndroid Build Coastguard Worker       }
155*cc02d7e2SAndroid Build Coastguard Worker     }
156*cc02d7e2SAndroid Build Coastguard Worker     StartThreads(num_threads_);
157*cc02d7e2SAndroid Build Coastguard Worker   }
~CallbackUnaryClient()158*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackUnaryClient() override {}
159*cc02d7e2SAndroid Build Coastguard Worker 
160*cc02d7e2SAndroid Build Coastguard Worker  protected:
ThreadFuncImpl(Thread * t,size_t thread_idx)161*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
162*cc02d7e2SAndroid Build Coastguard Worker     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
163*cc02d7e2SAndroid Build Coastguard Worker          vector_idx += num_threads_) {
164*cc02d7e2SAndroid Build Coastguard Worker       ScheduleRpc(t, vector_idx);
165*cc02d7e2SAndroid Build Coastguard Worker     }
166*cc02d7e2SAndroid Build Coastguard Worker     return true;
167*cc02d7e2SAndroid Build Coastguard Worker   }
168*cc02d7e2SAndroid Build Coastguard Worker 
InitThreadFuncImpl(size_t)169*cc02d7e2SAndroid Build Coastguard Worker   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
170*cc02d7e2SAndroid Build Coastguard Worker 
171*cc02d7e2SAndroid Build Coastguard Worker  private:
ScheduleRpc(Thread * t,size_t vector_idx)172*cc02d7e2SAndroid Build Coastguard Worker   void ScheduleRpc(Thread* t, size_t vector_idx) {
173*cc02d7e2SAndroid Build Coastguard Worker     if (!closed_loop_) {
174*cc02d7e2SAndroid Build Coastguard Worker       gpr_timespec next_issue_time = NextRPCIssueTime();
175*cc02d7e2SAndroid Build Coastguard Worker       // Start an alarm callback to run the internal callback after
176*cc02d7e2SAndroid Build Coastguard Worker       // next_issue_time
177*cc02d7e2SAndroid Build Coastguard Worker       if (ctx_[vector_idx]->alarm_ == nullptr) {
178*cc02d7e2SAndroid Build Coastguard Worker         ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
179*cc02d7e2SAndroid Build Coastguard Worker       }
180*cc02d7e2SAndroid Build Coastguard Worker       ctx_[vector_idx]->alarm_->Set(next_issue_time,
181*cc02d7e2SAndroid Build Coastguard Worker                                     [this, t, vector_idx](bool /*ok*/) {
182*cc02d7e2SAndroid Build Coastguard Worker                                       IssueUnaryCallbackRpc(t, vector_idx);
183*cc02d7e2SAndroid Build Coastguard Worker                                     });
184*cc02d7e2SAndroid Build Coastguard Worker     } else {
185*cc02d7e2SAndroid Build Coastguard Worker       IssueUnaryCallbackRpc(t, vector_idx);
186*cc02d7e2SAndroid Build Coastguard Worker     }
187*cc02d7e2SAndroid Build Coastguard Worker   }
188*cc02d7e2SAndroid Build Coastguard Worker 
IssueUnaryCallbackRpc(Thread * t,size_t vector_idx)189*cc02d7e2SAndroid Build Coastguard Worker   void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
190*cc02d7e2SAndroid Build Coastguard Worker     double start = UsageTimer::Now();
191*cc02d7e2SAndroid Build Coastguard Worker     ctx_[vector_idx]->stub_->async()->UnaryCall(
192*cc02d7e2SAndroid Build Coastguard Worker         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
193*cc02d7e2SAndroid Build Coastguard Worker         [this, t, start, vector_idx](grpc::Status s) {
194*cc02d7e2SAndroid Build Coastguard Worker           // Update Histogram with data from the callback run
195*cc02d7e2SAndroid Build Coastguard Worker           HistogramEntry entry;
196*cc02d7e2SAndroid Build Coastguard Worker           if (s.ok()) {
197*cc02d7e2SAndroid Build Coastguard Worker             entry.set_value((UsageTimer::Now() - start) * 1e9);
198*cc02d7e2SAndroid Build Coastguard Worker           }
199*cc02d7e2SAndroid Build Coastguard Worker           entry.set_status(s.error_code());
200*cc02d7e2SAndroid Build Coastguard Worker           t->UpdateHistogram(&entry);
201*cc02d7e2SAndroid Build Coastguard Worker 
202*cc02d7e2SAndroid Build Coastguard Worker           if (ThreadCompleted() || !s.ok()) {
203*cc02d7e2SAndroid Build Coastguard Worker             // Notify thread of completion
204*cc02d7e2SAndroid Build Coastguard Worker             NotifyMainThreadOfThreadCompletion();
205*cc02d7e2SAndroid Build Coastguard Worker           } else {
206*cc02d7e2SAndroid Build Coastguard Worker             // Reallocate ctx for next RPC
207*cc02d7e2SAndroid Build Coastguard Worker             ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
208*cc02d7e2SAndroid Build Coastguard Worker                 ctx_[vector_idx]->stub_);
209*cc02d7e2SAndroid Build Coastguard Worker             // Schedule a new RPC
210*cc02d7e2SAndroid Build Coastguard Worker             ScheduleRpc(t, vector_idx);
211*cc02d7e2SAndroid Build Coastguard Worker           }
212*cc02d7e2SAndroid Build Coastguard Worker         });
213*cc02d7e2SAndroid Build Coastguard Worker   }
214*cc02d7e2SAndroid Build Coastguard Worker };
215*cc02d7e2SAndroid Build Coastguard Worker 
216*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingClient : public CallbackClient {
217*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackStreamingClient(const ClientConfig & config)218*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackStreamingClient(const ClientConfig& config)
219*cc02d7e2SAndroid Build Coastguard Worker       : CallbackClient(config),
220*cc02d7e2SAndroid Build Coastguard Worker         messages_per_stream_(config.messages_per_stream()) {
221*cc02d7e2SAndroid Build Coastguard Worker     for (int ch = 0; ch < config.client_channels(); ch++) {
222*cc02d7e2SAndroid Build Coastguard Worker       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
223*cc02d7e2SAndroid Build Coastguard Worker         ctx_.emplace_back(
224*cc02d7e2SAndroid Build Coastguard Worker             new CallbackClientRpcContext(channels_[ch].get_stub()));
225*cc02d7e2SAndroid Build Coastguard Worker       }
226*cc02d7e2SAndroid Build Coastguard Worker     }
227*cc02d7e2SAndroid Build Coastguard Worker     StartThreads(num_threads_);
228*cc02d7e2SAndroid Build Coastguard Worker   }
~CallbackStreamingClient()229*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackStreamingClient() override {}
230*cc02d7e2SAndroid Build Coastguard Worker 
AddHistogramEntry(double start,bool ok,Thread * thread_ptr)231*cc02d7e2SAndroid Build Coastguard Worker   void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
232*cc02d7e2SAndroid Build Coastguard Worker     // Update Histogram with data from the callback run
233*cc02d7e2SAndroid Build Coastguard Worker     HistogramEntry entry;
234*cc02d7e2SAndroid Build Coastguard Worker     if (ok) {
235*cc02d7e2SAndroid Build Coastguard Worker       entry.set_value((UsageTimer::Now() - start) * 1e9);
236*cc02d7e2SAndroid Build Coastguard Worker     }
237*cc02d7e2SAndroid Build Coastguard Worker     thread_ptr->UpdateHistogram(&entry);
238*cc02d7e2SAndroid Build Coastguard Worker   }
239*cc02d7e2SAndroid Build Coastguard Worker 
messages_per_stream()240*cc02d7e2SAndroid Build Coastguard Worker   int messages_per_stream() { return messages_per_stream_; }
241*cc02d7e2SAndroid Build Coastguard Worker 
242*cc02d7e2SAndroid Build Coastguard Worker  protected:
243*cc02d7e2SAndroid Build Coastguard Worker   const int messages_per_stream_;
244*cc02d7e2SAndroid Build Coastguard Worker };
245*cc02d7e2SAndroid Build Coastguard Worker 
246*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongClient : public CallbackStreamingClient {
247*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackStreamingPingPongClient(const ClientConfig & config)248*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackStreamingPingPongClient(const ClientConfig& config)
249*cc02d7e2SAndroid Build Coastguard Worker       : CallbackStreamingClient(config) {}
~CallbackStreamingPingPongClient()250*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackStreamingPingPongClient() override {}
251*cc02d7e2SAndroid Build Coastguard Worker };
252*cc02d7e2SAndroid Build Coastguard Worker 
253*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongReactor final
254*cc02d7e2SAndroid Build Coastguard Worker     : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
255*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackStreamingPingPongReactor(CallbackStreamingPingPongClient * client,std::unique_ptr<CallbackClientRpcContext> ctx)256*cc02d7e2SAndroid Build Coastguard Worker   CallbackStreamingPingPongReactor(
257*cc02d7e2SAndroid Build Coastguard Worker       CallbackStreamingPingPongClient* client,
258*cc02d7e2SAndroid Build Coastguard Worker       std::unique_ptr<CallbackClientRpcContext> ctx)
259*cc02d7e2SAndroid Build Coastguard Worker       : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
260*cc02d7e2SAndroid Build Coastguard Worker 
StartNewRpc()261*cc02d7e2SAndroid Build Coastguard Worker   void StartNewRpc() {
262*cc02d7e2SAndroid Build Coastguard Worker     ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
263*cc02d7e2SAndroid Build Coastguard Worker     write_time_ = UsageTimer::Now();
264*cc02d7e2SAndroid Build Coastguard Worker     StartWrite(client_->request());
265*cc02d7e2SAndroid Build Coastguard Worker     writes_done_started_.clear();
266*cc02d7e2SAndroid Build Coastguard Worker     StartCall();
267*cc02d7e2SAndroid Build Coastguard Worker   }
268*cc02d7e2SAndroid Build Coastguard Worker 
OnWriteDone(bool ok)269*cc02d7e2SAndroid Build Coastguard Worker   void OnWriteDone(bool ok) override {
270*cc02d7e2SAndroid Build Coastguard Worker     if (!ok) {
271*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_ERROR, "Error writing RPC");
272*cc02d7e2SAndroid Build Coastguard Worker     }
273*cc02d7e2SAndroid Build Coastguard Worker     if ((!ok || client_->ThreadCompleted()) &&
274*cc02d7e2SAndroid Build Coastguard Worker         !writes_done_started_.test_and_set()) {
275*cc02d7e2SAndroid Build Coastguard Worker       StartWritesDone();
276*cc02d7e2SAndroid Build Coastguard Worker     }
277*cc02d7e2SAndroid Build Coastguard Worker     StartRead(&ctx_->response_);
278*cc02d7e2SAndroid Build Coastguard Worker   }
279*cc02d7e2SAndroid Build Coastguard Worker 
OnReadDone(bool ok)280*cc02d7e2SAndroid Build Coastguard Worker   void OnReadDone(bool ok) override {
281*cc02d7e2SAndroid Build Coastguard Worker     client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
282*cc02d7e2SAndroid Build Coastguard Worker 
283*cc02d7e2SAndroid Build Coastguard Worker     if (client_->ThreadCompleted() || !ok ||
284*cc02d7e2SAndroid Build Coastguard Worker         (client_->messages_per_stream() != 0 &&
285*cc02d7e2SAndroid Build Coastguard Worker          ++messages_issued_ >= client_->messages_per_stream())) {
286*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
287*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_ERROR, "Error reading RPC");
288*cc02d7e2SAndroid Build Coastguard Worker       }
289*cc02d7e2SAndroid Build Coastguard Worker       if (!writes_done_started_.test_and_set()) {
290*cc02d7e2SAndroid Build Coastguard Worker         StartWritesDone();
291*cc02d7e2SAndroid Build Coastguard Worker       }
292*cc02d7e2SAndroid Build Coastguard Worker       return;
293*cc02d7e2SAndroid Build Coastguard Worker     }
294*cc02d7e2SAndroid Build Coastguard Worker     if (!client_->IsClosedLoop()) {
295*cc02d7e2SAndroid Build Coastguard Worker       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
296*cc02d7e2SAndroid Build Coastguard Worker       // Start an alarm callback to run the internal callback after
297*cc02d7e2SAndroid Build Coastguard Worker       // next_issue_time
298*cc02d7e2SAndroid Build Coastguard Worker       ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
299*cc02d7e2SAndroid Build Coastguard Worker         write_time_ = UsageTimer::Now();
300*cc02d7e2SAndroid Build Coastguard Worker         StartWrite(client_->request());
301*cc02d7e2SAndroid Build Coastguard Worker       });
302*cc02d7e2SAndroid Build Coastguard Worker     } else {
303*cc02d7e2SAndroid Build Coastguard Worker       write_time_ = UsageTimer::Now();
304*cc02d7e2SAndroid Build Coastguard Worker       StartWrite(client_->request());
305*cc02d7e2SAndroid Build Coastguard Worker     }
306*cc02d7e2SAndroid Build Coastguard Worker   }
307*cc02d7e2SAndroid Build Coastguard Worker 
OnDone(const Status & s)308*cc02d7e2SAndroid Build Coastguard Worker   void OnDone(const Status& s) override {
309*cc02d7e2SAndroid Build Coastguard Worker     if (client_->ThreadCompleted() || !s.ok()) {
310*cc02d7e2SAndroid Build Coastguard Worker       client_->NotifyMainThreadOfThreadCompletion();
311*cc02d7e2SAndroid Build Coastguard Worker       return;
312*cc02d7e2SAndroid Build Coastguard Worker     }
313*cc02d7e2SAndroid Build Coastguard Worker     ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
314*cc02d7e2SAndroid Build Coastguard Worker     ScheduleRpc();
315*cc02d7e2SAndroid Build Coastguard Worker   }
316*cc02d7e2SAndroid Build Coastguard Worker 
ScheduleRpc()317*cc02d7e2SAndroid Build Coastguard Worker   void ScheduleRpc() {
318*cc02d7e2SAndroid Build Coastguard Worker     if (!client_->IsClosedLoop()) {
319*cc02d7e2SAndroid Build Coastguard Worker       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
320*cc02d7e2SAndroid Build Coastguard Worker       // Start an alarm callback to run the internal callback after
321*cc02d7e2SAndroid Build Coastguard Worker       // next_issue_time
322*cc02d7e2SAndroid Build Coastguard Worker       if (ctx_->alarm_ == nullptr) {
323*cc02d7e2SAndroid Build Coastguard Worker         ctx_->alarm_ = std::make_unique<Alarm>();
324*cc02d7e2SAndroid Build Coastguard Worker       }
325*cc02d7e2SAndroid Build Coastguard Worker       ctx_->alarm_->Set(next_issue_time,
326*cc02d7e2SAndroid Build Coastguard Worker                         [this](bool /*ok*/) { StartNewRpc(); });
327*cc02d7e2SAndroid Build Coastguard Worker     } else {
328*cc02d7e2SAndroid Build Coastguard Worker       StartNewRpc();
329*cc02d7e2SAndroid Build Coastguard Worker     }
330*cc02d7e2SAndroid Build Coastguard Worker   }
331*cc02d7e2SAndroid Build Coastguard Worker 
set_thread_ptr(Client::Thread * ptr)332*cc02d7e2SAndroid Build Coastguard Worker   void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
333*cc02d7e2SAndroid Build Coastguard Worker 
334*cc02d7e2SAndroid Build Coastguard Worker   CallbackStreamingPingPongClient* client_;
335*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<CallbackClientRpcContext> ctx_;
336*cc02d7e2SAndroid Build Coastguard Worker   std::atomic_flag writes_done_started_;
337*cc02d7e2SAndroid Build Coastguard Worker   Client::Thread* thread_ptr_;  // Needed to update histogram entries
338*cc02d7e2SAndroid Build Coastguard Worker   double write_time_;           // Track ping-pong round start time
339*cc02d7e2SAndroid Build Coastguard Worker   int messages_issued_;         // Messages issued by this stream
340*cc02d7e2SAndroid Build Coastguard Worker };
341*cc02d7e2SAndroid Build Coastguard Worker 
342*cc02d7e2SAndroid Build Coastguard Worker class CallbackStreamingPingPongClientImpl final
343*cc02d7e2SAndroid Build Coastguard Worker     : public CallbackStreamingPingPongClient {
344*cc02d7e2SAndroid Build Coastguard Worker  public:
CallbackStreamingPingPongClientImpl(const ClientConfig & config)345*cc02d7e2SAndroid Build Coastguard Worker   explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
346*cc02d7e2SAndroid Build Coastguard Worker       : CallbackStreamingPingPongClient(config) {
347*cc02d7e2SAndroid Build Coastguard Worker     for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
348*cc02d7e2SAndroid Build Coastguard Worker       reactor_.emplace_back(
349*cc02d7e2SAndroid Build Coastguard Worker           new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
350*cc02d7e2SAndroid Build Coastguard Worker     }
351*cc02d7e2SAndroid Build Coastguard Worker   }
~CallbackStreamingPingPongClientImpl()352*cc02d7e2SAndroid Build Coastguard Worker   ~CallbackStreamingPingPongClientImpl() override {}
353*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(Client::Thread * t,size_t thread_idx)354*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
355*cc02d7e2SAndroid Build Coastguard Worker     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
356*cc02d7e2SAndroid Build Coastguard Worker          vector_idx += num_threads_) {
357*cc02d7e2SAndroid Build Coastguard Worker       reactor_[vector_idx]->set_thread_ptr(t);
358*cc02d7e2SAndroid Build Coastguard Worker       reactor_[vector_idx]->ScheduleRpc();
359*cc02d7e2SAndroid Build Coastguard Worker     }
360*cc02d7e2SAndroid Build Coastguard Worker     return true;
361*cc02d7e2SAndroid Build Coastguard Worker   }
362*cc02d7e2SAndroid Build Coastguard Worker 
InitThreadFuncImpl(size_t)363*cc02d7e2SAndroid Build Coastguard Worker   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
364*cc02d7e2SAndroid Build Coastguard Worker 
365*cc02d7e2SAndroid Build Coastguard Worker  private:
366*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
367*cc02d7e2SAndroid Build Coastguard Worker };
368*cc02d7e2SAndroid Build Coastguard Worker 
369*cc02d7e2SAndroid Build Coastguard Worker // TODO(mhaidry) : Implement Streaming from client, server and both ways
370*cc02d7e2SAndroid Build Coastguard Worker 
CreateCallbackClient(const ClientConfig & config)371*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
372*cc02d7e2SAndroid Build Coastguard Worker   switch (config.rpc_type()) {
373*cc02d7e2SAndroid Build Coastguard Worker     case UNARY:
374*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
375*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING:
376*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(
377*cc02d7e2SAndroid Build Coastguard Worker           new CallbackStreamingPingPongClientImpl(config));
378*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_FROM_CLIENT:
379*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_FROM_SERVER:
380*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_BOTH_WAYS:
381*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(
382*cc02d7e2SAndroid Build Coastguard Worker           "STREAMING_FROM_* scenarios are not supported by the callback "
383*cc02d7e2SAndroid Build Coastguard Worker           "API");
384*cc02d7e2SAndroid Build Coastguard Worker     default:
385*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
386*cc02d7e2SAndroid Build Coastguard Worker   }
387*cc02d7e2SAndroid Build Coastguard Worker }
388*cc02d7e2SAndroid Build Coastguard Worker 
389*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
390*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
391