xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/client_sync.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #include <chrono>
20*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
21*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
22*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
23*cc02d7e2SAndroid Build Coastguard Worker #include <string>
24*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
25*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
26*cc02d7e2SAndroid Build Coastguard Worker 
27*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
28*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/alloc.h>
29*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
30*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
31*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
32*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/client_context.h>
33*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_builder.h>
35*cc02d7e2SAndroid Build Coastguard Worker 
36*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
37*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/client.h"
39*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/interarrival.h"
40*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
41*cc02d7e2SAndroid Build Coastguard Worker 
42*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
43*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
44*cc02d7e2SAndroid Build Coastguard Worker 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)45*cc02d7e2SAndroid Build Coastguard Worker static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
46*cc02d7e2SAndroid Build Coastguard Worker     const std::shared_ptr<Channel>& ch) {
47*cc02d7e2SAndroid Build Coastguard Worker   return BenchmarkService::NewStub(ch);
48*cc02d7e2SAndroid Build Coastguard Worker }
49*cc02d7e2SAndroid Build Coastguard Worker 
50*cc02d7e2SAndroid Build Coastguard Worker class SynchronousClient
51*cc02d7e2SAndroid Build Coastguard Worker     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
52*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousClient(const ClientConfig & config)53*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousClient(const ClientConfig& config)
54*cc02d7e2SAndroid Build Coastguard Worker       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
55*cc02d7e2SAndroid Build Coastguard Worker             config, BenchmarkStubCreator) {
56*cc02d7e2SAndroid Build Coastguard Worker     num_threads_ =
57*cc02d7e2SAndroid Build Coastguard Worker         config.outstanding_rpcs_per_channel() * config.client_channels();
58*cc02d7e2SAndroid Build Coastguard Worker     responses_.resize(num_threads_);
59*cc02d7e2SAndroid Build Coastguard Worker     SetupLoadTest(config, num_threads_);
60*cc02d7e2SAndroid Build Coastguard Worker   }
61*cc02d7e2SAndroid Build Coastguard Worker 
~SynchronousClient()62*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousClient() override {}
63*cc02d7e2SAndroid Build Coastguard Worker 
64*cc02d7e2SAndroid Build Coastguard Worker   virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
65*cc02d7e2SAndroid Build Coastguard Worker   virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
66*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFunc(size_t thread_idx,Thread * t)67*cc02d7e2SAndroid Build Coastguard Worker   void ThreadFunc(size_t thread_idx, Thread* t) override {
68*cc02d7e2SAndroid Build Coastguard Worker     if (!InitThreadFuncImpl(thread_idx)) {
69*cc02d7e2SAndroid Build Coastguard Worker       return;
70*cc02d7e2SAndroid Build Coastguard Worker     }
71*cc02d7e2SAndroid Build Coastguard Worker     for (;;) {
72*cc02d7e2SAndroid Build Coastguard Worker       // run the loop body
73*cc02d7e2SAndroid Build Coastguard Worker       HistogramEntry entry;
74*cc02d7e2SAndroid Build Coastguard Worker       const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
75*cc02d7e2SAndroid Build Coastguard Worker       t->UpdateHistogram(&entry);
76*cc02d7e2SAndroid Build Coastguard Worker       if (!thread_still_ok || ThreadCompleted()) {
77*cc02d7e2SAndroid Build Coastguard Worker         return;
78*cc02d7e2SAndroid Build Coastguard Worker       }
79*cc02d7e2SAndroid Build Coastguard Worker     }
80*cc02d7e2SAndroid Build Coastguard Worker   }
81*cc02d7e2SAndroid Build Coastguard Worker 
82*cc02d7e2SAndroid Build Coastguard Worker  protected:
83*cc02d7e2SAndroid Build Coastguard Worker   // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)84*cc02d7e2SAndroid Build Coastguard Worker   bool WaitToIssue(int thread_idx) {
85*cc02d7e2SAndroid Build Coastguard Worker     if (!closed_loop_) {
86*cc02d7e2SAndroid Build Coastguard Worker       const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
87*cc02d7e2SAndroid Build Coastguard Worker       // Avoid sleeping for too long continuously because we might
88*cc02d7e2SAndroid Build Coastguard Worker       // need to terminate before then. This is an issue since
89*cc02d7e2SAndroid Build Coastguard Worker       // exponential distribution can occasionally produce bad outliers
90*cc02d7e2SAndroid Build Coastguard Worker       while (true) {
91*cc02d7e2SAndroid Build Coastguard Worker         const gpr_timespec one_sec_delay =
92*cc02d7e2SAndroid Build Coastguard Worker             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93*cc02d7e2SAndroid Build Coastguard Worker                          gpr_time_from_seconds(1, GPR_TIMESPAN));
94*cc02d7e2SAndroid Build Coastguard Worker         if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
95*cc02d7e2SAndroid Build Coastguard Worker           gpr_sleep_until(next_issue_time);
96*cc02d7e2SAndroid Build Coastguard Worker           return true;
97*cc02d7e2SAndroid Build Coastguard Worker         } else {
98*cc02d7e2SAndroid Build Coastguard Worker           gpr_sleep_until(one_sec_delay);
99*cc02d7e2SAndroid Build Coastguard Worker           if (gpr_atm_acq_load(&thread_pool_done_) != gpr_atm{0}) {
100*cc02d7e2SAndroid Build Coastguard Worker             return false;
101*cc02d7e2SAndroid Build Coastguard Worker           }
102*cc02d7e2SAndroid Build Coastguard Worker         }
103*cc02d7e2SAndroid Build Coastguard Worker       }
104*cc02d7e2SAndroid Build Coastguard Worker     }
105*cc02d7e2SAndroid Build Coastguard Worker     return true;
106*cc02d7e2SAndroid Build Coastguard Worker   }
107*cc02d7e2SAndroid Build Coastguard Worker 
108*cc02d7e2SAndroid Build Coastguard Worker   size_t num_threads_;
109*cc02d7e2SAndroid Build Coastguard Worker   std::vector<SimpleResponse> responses_;
110*cc02d7e2SAndroid Build Coastguard Worker };
111*cc02d7e2SAndroid Build Coastguard Worker 
112*cc02d7e2SAndroid Build Coastguard Worker class SynchronousUnaryClient final : public SynchronousClient {
113*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousUnaryClient(const ClientConfig & config)114*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousUnaryClient(const ClientConfig& config)
115*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousClient(config) {
116*cc02d7e2SAndroid Build Coastguard Worker     StartThreads(num_threads_);
117*cc02d7e2SAndroid Build Coastguard Worker   }
~SynchronousUnaryClient()118*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousUnaryClient() override {}
119*cc02d7e2SAndroid Build Coastguard Worker 
InitThreadFuncImpl(size_t)120*cc02d7e2SAndroid Build Coastguard Worker   bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
121*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)122*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123*cc02d7e2SAndroid Build Coastguard Worker     if (!WaitToIssue(thread_idx)) {
124*cc02d7e2SAndroid Build Coastguard Worker       return true;
125*cc02d7e2SAndroid Build Coastguard Worker     }
126*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
127*cc02d7e2SAndroid Build Coastguard Worker     double start = UsageTimer::Now();
128*cc02d7e2SAndroid Build Coastguard Worker     grpc::ClientContext context;
129*cc02d7e2SAndroid Build Coastguard Worker     grpc::Status s =
130*cc02d7e2SAndroid Build Coastguard Worker         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
131*cc02d7e2SAndroid Build Coastguard Worker     if (s.ok()) {
132*cc02d7e2SAndroid Build Coastguard Worker       entry->set_value((UsageTimer::Now() - start) * 1e9);
133*cc02d7e2SAndroid Build Coastguard Worker     }
134*cc02d7e2SAndroid Build Coastguard Worker     entry->set_status(s.error_code());
135*cc02d7e2SAndroid Build Coastguard Worker     return true;
136*cc02d7e2SAndroid Build Coastguard Worker   }
137*cc02d7e2SAndroid Build Coastguard Worker 
138*cc02d7e2SAndroid Build Coastguard Worker  private:
DestroyMultithreading()139*cc02d7e2SAndroid Build Coastguard Worker   void DestroyMultithreading() final { EndThreads(); }
140*cc02d7e2SAndroid Build Coastguard Worker };
141*cc02d7e2SAndroid Build Coastguard Worker 
142*cc02d7e2SAndroid Build Coastguard Worker template <class StreamType>
143*cc02d7e2SAndroid Build Coastguard Worker class SynchronousStreamingClient : public SynchronousClient {
144*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousStreamingClient(const ClientConfig & config)145*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousStreamingClient(const ClientConfig& config)
146*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousClient(config),
147*cc02d7e2SAndroid Build Coastguard Worker         context_(num_threads_),
148*cc02d7e2SAndroid Build Coastguard Worker         stream_(num_threads_),
149*cc02d7e2SAndroid Build Coastguard Worker         stream_mu_(num_threads_),
150*cc02d7e2SAndroid Build Coastguard Worker         shutdown_(num_threads_),
151*cc02d7e2SAndroid Build Coastguard Worker         messages_per_stream_(config.messages_per_stream()),
152*cc02d7e2SAndroid Build Coastguard Worker         messages_issued_(num_threads_) {
153*cc02d7e2SAndroid Build Coastguard Worker     StartThreads(num_threads_);
154*cc02d7e2SAndroid Build Coastguard Worker   }
~SynchronousStreamingClient()155*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousStreamingClient() override {
156*cc02d7e2SAndroid Build Coastguard Worker     CleanupAllStreams([this](size_t thread_idx) {
157*cc02d7e2SAndroid Build Coastguard Worker       // Don't log any kind of error since we may have canceled this
158*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx]->Finish().IgnoreError();
159*cc02d7e2SAndroid Build Coastguard Worker     });
160*cc02d7e2SAndroid Build Coastguard Worker   }
161*cc02d7e2SAndroid Build Coastguard Worker 
162*cc02d7e2SAndroid Build Coastguard Worker  protected:
163*cc02d7e2SAndroid Build Coastguard Worker   std::vector<grpc::ClientContext> context_;
164*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<StreamType>> stream_;
165*cc02d7e2SAndroid Build Coastguard Worker   // stream_mu_ is only needed when changing an element of stream_ or context_
166*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::mutex> stream_mu_;
167*cc02d7e2SAndroid Build Coastguard Worker   // use struct Bool rather than bool because vector<bool> is not concurrent
168*cc02d7e2SAndroid Build Coastguard Worker   struct Bool {
169*cc02d7e2SAndroid Build Coastguard Worker     bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool170*cc02d7e2SAndroid Build Coastguard Worker     Bool() : val(false) {}
171*cc02d7e2SAndroid Build Coastguard Worker   };
172*cc02d7e2SAndroid Build Coastguard Worker   std::vector<Bool> shutdown_;
173*cc02d7e2SAndroid Build Coastguard Worker   const int messages_per_stream_;
174*cc02d7e2SAndroid Build Coastguard Worker   std::vector<int> messages_issued_;
175*cc02d7e2SAndroid Build Coastguard Worker 
FinishStream(HistogramEntry * entry,size_t thread_idx)176*cc02d7e2SAndroid Build Coastguard Worker   void FinishStream(HistogramEntry* entry, size_t thread_idx) {
177*cc02d7e2SAndroid Build Coastguard Worker     Status s = stream_[thread_idx]->Finish();
178*cc02d7e2SAndroid Build Coastguard Worker     // don't set the value since the stream is failed and shouldn't be timed
179*cc02d7e2SAndroid Build Coastguard Worker     entry->set_status(s.error_code());
180*cc02d7e2SAndroid Build Coastguard Worker     if (!s.ok()) {
181*cc02d7e2SAndroid Build Coastguard Worker       std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
182*cc02d7e2SAndroid Build Coastguard Worker       if (!shutdown_[thread_idx].val) {
183*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
184*cc02d7e2SAndroid Build Coastguard Worker                 thread_idx, s.error_message().c_str());
185*cc02d7e2SAndroid Build Coastguard Worker       }
186*cc02d7e2SAndroid Build Coastguard Worker     }
187*cc02d7e2SAndroid Build Coastguard Worker     // Lock the stream_mu_ now because the client context could change
188*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
189*cc02d7e2SAndroid Build Coastguard Worker     context_[thread_idx].~ClientContext();
190*cc02d7e2SAndroid Build Coastguard Worker     new (&context_[thread_idx]) ClientContext();
191*cc02d7e2SAndroid Build Coastguard Worker   }
192*cc02d7e2SAndroid Build Coastguard Worker 
CleanupAllStreams(const std::function<void (size_t)> & cleaner)193*cc02d7e2SAndroid Build Coastguard Worker   void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
194*cc02d7e2SAndroid Build Coastguard Worker     std::vector<std::thread> cleanup_threads;
195*cc02d7e2SAndroid Build Coastguard Worker     for (size_t i = 0; i < num_threads_; i++) {
196*cc02d7e2SAndroid Build Coastguard Worker       cleanup_threads.emplace_back([this, i, cleaner] {
197*cc02d7e2SAndroid Build Coastguard Worker         std::lock_guard<std::mutex> l(stream_mu_[i]);
198*cc02d7e2SAndroid Build Coastguard Worker         shutdown_[i].val = true;
199*cc02d7e2SAndroid Build Coastguard Worker         if (stream_[i]) {
200*cc02d7e2SAndroid Build Coastguard Worker           cleaner(i);
201*cc02d7e2SAndroid Build Coastguard Worker         }
202*cc02d7e2SAndroid Build Coastguard Worker       });
203*cc02d7e2SAndroid Build Coastguard Worker     }
204*cc02d7e2SAndroid Build Coastguard Worker     for (auto& th : cleanup_threads) {
205*cc02d7e2SAndroid Build Coastguard Worker       th.join();
206*cc02d7e2SAndroid Build Coastguard Worker     }
207*cc02d7e2SAndroid Build Coastguard Worker   }
208*cc02d7e2SAndroid Build Coastguard Worker 
209*cc02d7e2SAndroid Build Coastguard Worker  private:
DestroyMultithreading()210*cc02d7e2SAndroid Build Coastguard Worker   void DestroyMultithreading() final {
211*cc02d7e2SAndroid Build Coastguard Worker     CleanupAllStreams(
212*cc02d7e2SAndroid Build Coastguard Worker         [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
213*cc02d7e2SAndroid Build Coastguard Worker     EndThreads();
214*cc02d7e2SAndroid Build Coastguard Worker   }
215*cc02d7e2SAndroid Build Coastguard Worker };
216*cc02d7e2SAndroid Build Coastguard Worker 
217*cc02d7e2SAndroid Build Coastguard Worker class SynchronousStreamingPingPongClient final
218*cc02d7e2SAndroid Build Coastguard Worker     : public SynchronousStreamingClient<
219*cc02d7e2SAndroid Build Coastguard Worker           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
220*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousStreamingPingPongClient(const ClientConfig & config)221*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousStreamingPingPongClient(const ClientConfig& config)
222*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()223*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousStreamingPingPongClient() override {
224*cc02d7e2SAndroid Build Coastguard Worker     CleanupAllStreams(
225*cc02d7e2SAndroid Build Coastguard Worker         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
226*cc02d7e2SAndroid Build Coastguard Worker   }
227*cc02d7e2SAndroid Build Coastguard Worker 
228*cc02d7e2SAndroid Build Coastguard Worker  private:
InitThreadFuncImpl(size_t thread_idx)229*cc02d7e2SAndroid Build Coastguard Worker   bool InitThreadFuncImpl(size_t thread_idx) override {
230*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
231*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
232*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
233*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
234*cc02d7e2SAndroid Build Coastguard Worker     } else {
235*cc02d7e2SAndroid Build Coastguard Worker       return false;
236*cc02d7e2SAndroid Build Coastguard Worker     }
237*cc02d7e2SAndroid Build Coastguard Worker     messages_issued_[thread_idx] = 0;
238*cc02d7e2SAndroid Build Coastguard Worker     return true;
239*cc02d7e2SAndroid Build Coastguard Worker   }
240*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)241*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
242*cc02d7e2SAndroid Build Coastguard Worker     if (!WaitToIssue(thread_idx)) {
243*cc02d7e2SAndroid Build Coastguard Worker       return true;
244*cc02d7e2SAndroid Build Coastguard Worker     }
245*cc02d7e2SAndroid Build Coastguard Worker     double start = UsageTimer::Now();
246*cc02d7e2SAndroid Build Coastguard Worker     if (stream_[thread_idx]->Write(request_) &&
247*cc02d7e2SAndroid Build Coastguard Worker         stream_[thread_idx]->Read(&responses_[thread_idx])) {
248*cc02d7e2SAndroid Build Coastguard Worker       entry->set_value((UsageTimer::Now() - start) * 1e9);
249*cc02d7e2SAndroid Build Coastguard Worker       // don't set the status since there isn't one yet
250*cc02d7e2SAndroid Build Coastguard Worker       if ((messages_per_stream_ != 0) &&
251*cc02d7e2SAndroid Build Coastguard Worker           (++messages_issued_[thread_idx] < messages_per_stream_)) {
252*cc02d7e2SAndroid Build Coastguard Worker         return true;
253*cc02d7e2SAndroid Build Coastguard Worker       } else if (messages_per_stream_ == 0) {
254*cc02d7e2SAndroid Build Coastguard Worker         return true;
255*cc02d7e2SAndroid Build Coastguard Worker       } else {
256*cc02d7e2SAndroid Build Coastguard Worker         // Fall through to the below resetting code after finish
257*cc02d7e2SAndroid Build Coastguard Worker       }
258*cc02d7e2SAndroid Build Coastguard Worker     }
259*cc02d7e2SAndroid Build Coastguard Worker     stream_[thread_idx]->WritesDone();
260*cc02d7e2SAndroid Build Coastguard Worker     FinishStream(entry, thread_idx);
261*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
262*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
263*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
264*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
265*cc02d7e2SAndroid Build Coastguard Worker     } else {
266*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx].reset();
267*cc02d7e2SAndroid Build Coastguard Worker       return false;
268*cc02d7e2SAndroid Build Coastguard Worker     }
269*cc02d7e2SAndroid Build Coastguard Worker     messages_issued_[thread_idx] = 0;
270*cc02d7e2SAndroid Build Coastguard Worker     return true;
271*cc02d7e2SAndroid Build Coastguard Worker   }
272*cc02d7e2SAndroid Build Coastguard Worker };
273*cc02d7e2SAndroid Build Coastguard Worker 
274*cc02d7e2SAndroid Build Coastguard Worker class SynchronousStreamingFromClientClient final
275*cc02d7e2SAndroid Build Coastguard Worker     : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
276*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousStreamingFromClientClient(const ClientConfig & config)277*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousStreamingFromClientClient(const ClientConfig& config)
278*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()279*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousStreamingFromClientClient() override {
280*cc02d7e2SAndroid Build Coastguard Worker     CleanupAllStreams(
281*cc02d7e2SAndroid Build Coastguard Worker         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
282*cc02d7e2SAndroid Build Coastguard Worker   }
283*cc02d7e2SAndroid Build Coastguard Worker 
284*cc02d7e2SAndroid Build Coastguard Worker  private:
285*cc02d7e2SAndroid Build Coastguard Worker   std::vector<double> last_issue_;
286*cc02d7e2SAndroid Build Coastguard Worker 
InitThreadFuncImpl(size_t thread_idx)287*cc02d7e2SAndroid Build Coastguard Worker   bool InitThreadFuncImpl(size_t thread_idx) override {
288*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
289*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
290*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
291*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
292*cc02d7e2SAndroid Build Coastguard Worker                                                       &responses_[thread_idx]);
293*cc02d7e2SAndroid Build Coastguard Worker     } else {
294*cc02d7e2SAndroid Build Coastguard Worker       return false;
295*cc02d7e2SAndroid Build Coastguard Worker     }
296*cc02d7e2SAndroid Build Coastguard Worker     last_issue_[thread_idx] = UsageTimer::Now();
297*cc02d7e2SAndroid Build Coastguard Worker     return true;
298*cc02d7e2SAndroid Build Coastguard Worker   }
299*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)300*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
301*cc02d7e2SAndroid Build Coastguard Worker     // Figure out how to make histogram sensible if this is rate-paced
302*cc02d7e2SAndroid Build Coastguard Worker     if (!WaitToIssue(thread_idx)) {
303*cc02d7e2SAndroid Build Coastguard Worker       return true;
304*cc02d7e2SAndroid Build Coastguard Worker     }
305*cc02d7e2SAndroid Build Coastguard Worker     if (stream_[thread_idx]->Write(request_)) {
306*cc02d7e2SAndroid Build Coastguard Worker       double now = UsageTimer::Now();
307*cc02d7e2SAndroid Build Coastguard Worker       entry->set_value((now - last_issue_[thread_idx]) * 1e9);
308*cc02d7e2SAndroid Build Coastguard Worker       last_issue_[thread_idx] = now;
309*cc02d7e2SAndroid Build Coastguard Worker       return true;
310*cc02d7e2SAndroid Build Coastguard Worker     }
311*cc02d7e2SAndroid Build Coastguard Worker     stream_[thread_idx]->WritesDone();
312*cc02d7e2SAndroid Build Coastguard Worker     FinishStream(entry, thread_idx);
313*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
314*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
315*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
316*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
317*cc02d7e2SAndroid Build Coastguard Worker                                                       &responses_[thread_idx]);
318*cc02d7e2SAndroid Build Coastguard Worker     } else {
319*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx].reset();
320*cc02d7e2SAndroid Build Coastguard Worker       return false;
321*cc02d7e2SAndroid Build Coastguard Worker     }
322*cc02d7e2SAndroid Build Coastguard Worker     return true;
323*cc02d7e2SAndroid Build Coastguard Worker   }
324*cc02d7e2SAndroid Build Coastguard Worker };
325*cc02d7e2SAndroid Build Coastguard Worker 
326*cc02d7e2SAndroid Build Coastguard Worker class SynchronousStreamingFromServerClient final
327*cc02d7e2SAndroid Build Coastguard Worker     : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
328*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousStreamingFromServerClient(const ClientConfig & config)329*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousStreamingFromServerClient(const ClientConfig& config)
330*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()331*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousStreamingFromServerClient() override {}
332*cc02d7e2SAndroid Build Coastguard Worker 
333*cc02d7e2SAndroid Build Coastguard Worker  private:
334*cc02d7e2SAndroid Build Coastguard Worker   std::vector<double> last_recv_;
335*cc02d7e2SAndroid Build Coastguard Worker 
InitThreadFuncImpl(size_t thread_idx)336*cc02d7e2SAndroid Build Coastguard Worker   bool InitThreadFuncImpl(size_t thread_idx) override {
337*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
338*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
339*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
340*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] =
341*cc02d7e2SAndroid Build Coastguard Worker           stub->StreamingFromServer(&context_[thread_idx], request_);
342*cc02d7e2SAndroid Build Coastguard Worker     } else {
343*cc02d7e2SAndroid Build Coastguard Worker       return false;
344*cc02d7e2SAndroid Build Coastguard Worker     }
345*cc02d7e2SAndroid Build Coastguard Worker     last_recv_[thread_idx] = UsageTimer::Now();
346*cc02d7e2SAndroid Build Coastguard Worker     return true;
347*cc02d7e2SAndroid Build Coastguard Worker   }
348*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)349*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
350*cc02d7e2SAndroid Build Coastguard Worker     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
351*cc02d7e2SAndroid Build Coastguard Worker       double now = UsageTimer::Now();
352*cc02d7e2SAndroid Build Coastguard Worker       entry->set_value((now - last_recv_[thread_idx]) * 1e9);
353*cc02d7e2SAndroid Build Coastguard Worker       last_recv_[thread_idx] = now;
354*cc02d7e2SAndroid Build Coastguard Worker       return true;
355*cc02d7e2SAndroid Build Coastguard Worker     }
356*cc02d7e2SAndroid Build Coastguard Worker     FinishStream(entry, thread_idx);
357*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
358*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
359*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
360*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] =
361*cc02d7e2SAndroid Build Coastguard Worker           stub->StreamingFromServer(&context_[thread_idx], request_);
362*cc02d7e2SAndroid Build Coastguard Worker     } else {
363*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx].reset();
364*cc02d7e2SAndroid Build Coastguard Worker       return false;
365*cc02d7e2SAndroid Build Coastguard Worker     }
366*cc02d7e2SAndroid Build Coastguard Worker     return true;
367*cc02d7e2SAndroid Build Coastguard Worker   }
368*cc02d7e2SAndroid Build Coastguard Worker };
369*cc02d7e2SAndroid Build Coastguard Worker 
370*cc02d7e2SAndroid Build Coastguard Worker class SynchronousStreamingBothWaysClient final
371*cc02d7e2SAndroid Build Coastguard Worker     : public SynchronousStreamingClient<
372*cc02d7e2SAndroid Build Coastguard Worker           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
373*cc02d7e2SAndroid Build Coastguard Worker  public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)374*cc02d7e2SAndroid Build Coastguard Worker   explicit SynchronousStreamingBothWaysClient(const ClientConfig& config)
375*cc02d7e2SAndroid Build Coastguard Worker       : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()376*cc02d7e2SAndroid Build Coastguard Worker   ~SynchronousStreamingBothWaysClient() override {
377*cc02d7e2SAndroid Build Coastguard Worker     CleanupAllStreams(
378*cc02d7e2SAndroid Build Coastguard Worker         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
379*cc02d7e2SAndroid Build Coastguard Worker   }
380*cc02d7e2SAndroid Build Coastguard Worker 
381*cc02d7e2SAndroid Build Coastguard Worker  private:
InitThreadFuncImpl(size_t thread_idx)382*cc02d7e2SAndroid Build Coastguard Worker   bool InitThreadFuncImpl(size_t thread_idx) override {
383*cc02d7e2SAndroid Build Coastguard Worker     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
384*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
385*cc02d7e2SAndroid Build Coastguard Worker     if (!shutdown_[thread_idx].val) {
386*cc02d7e2SAndroid Build Coastguard Worker       stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
387*cc02d7e2SAndroid Build Coastguard Worker     } else {
388*cc02d7e2SAndroid Build Coastguard Worker       return false;
389*cc02d7e2SAndroid Build Coastguard Worker     }
390*cc02d7e2SAndroid Build Coastguard Worker     return true;
391*cc02d7e2SAndroid Build Coastguard Worker   }
392*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFuncImpl(HistogramEntry *,size_t)393*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadFuncImpl(HistogramEntry* /*entry*/,
394*cc02d7e2SAndroid Build Coastguard Worker                       size_t /*thread_idx*/) override {
395*cc02d7e2SAndroid Build Coastguard Worker     // TODO (vjpai): Do this
396*cc02d7e2SAndroid Build Coastguard Worker     return true;
397*cc02d7e2SAndroid Build Coastguard Worker   }
398*cc02d7e2SAndroid Build Coastguard Worker };
399*cc02d7e2SAndroid Build Coastguard Worker 
CreateSynchronousClient(const ClientConfig & config)400*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
401*cc02d7e2SAndroid Build Coastguard Worker   GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
402*cc02d7e2SAndroid Build Coastguard Worker   switch (config.rpc_type()) {
403*cc02d7e2SAndroid Build Coastguard Worker     case UNARY:
404*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
405*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING:
406*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(
407*cc02d7e2SAndroid Build Coastguard Worker           new SynchronousStreamingPingPongClient(config));
408*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_FROM_CLIENT:
409*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(
410*cc02d7e2SAndroid Build Coastguard Worker           new SynchronousStreamingFromClientClient(config));
411*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_FROM_SERVER:
412*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(
413*cc02d7e2SAndroid Build Coastguard Worker           new SynchronousStreamingFromServerClient(config));
414*cc02d7e2SAndroid Build Coastguard Worker     case STREAMING_BOTH_WAYS:
415*cc02d7e2SAndroid Build Coastguard Worker       return std::unique_ptr<Client>(
416*cc02d7e2SAndroid Build Coastguard Worker           new SynchronousStreamingBothWaysClient(config));
417*cc02d7e2SAndroid Build Coastguard Worker     default:
418*cc02d7e2SAndroid Build Coastguard Worker       assert(false);
419*cc02d7e2SAndroid Build Coastguard Worker       return nullptr;
420*cc02d7e2SAndroid Build Coastguard Worker   }
421*cc02d7e2SAndroid Build Coastguard Worker }
422*cc02d7e2SAndroid Build Coastguard Worker 
423*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
424*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
425