xref: /aosp_15_r20/external/grpc-grpc/test/cpp/qps/client.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker //     http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker 
19*cc02d7e2SAndroid Build Coastguard Worker #ifndef GRPC_TEST_CPP_QPS_CLIENT_H
20*cc02d7e2SAndroid Build Coastguard Worker #define GRPC_TEST_CPP_QPS_CLIENT_H
21*cc02d7e2SAndroid Build Coastguard Worker 
22*cc02d7e2SAndroid Build Coastguard Worker #include <inttypes.h>
23*cc02d7e2SAndroid Build Coastguard Worker #include <stdint.h>
24*cc02d7e2SAndroid Build Coastguard Worker #include <stdlib.h>
25*cc02d7e2SAndroid Build Coastguard Worker 
26*cc02d7e2SAndroid Build Coastguard Worker #include <condition_variable>
27*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
28*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
29*cc02d7e2SAndroid Build Coastguard Worker #include <unordered_map>
30*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
31*cc02d7e2SAndroid Build Coastguard Worker 
32*cc02d7e2SAndroid Build Coastguard Worker #include "absl/memory/memory.h"
33*cc02d7e2SAndroid Build Coastguard Worker #include "absl/strings/match.h"
34*cc02d7e2SAndroid Build Coastguard Worker #include "absl/strings/str_format.h"
35*cc02d7e2SAndroid Build Coastguard Worker 
36*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
37*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
38*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
39*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/byte_buffer.h>
40*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/channel_arguments.h>
41*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/slice.h>
42*cc02d7e2SAndroid Build Coastguard Worker 
43*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/env.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
46*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/payloads.pb.h"
47*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/histogram.h"
48*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/interarrival.h"
49*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_worker.h"
50*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/server.h"
51*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
52*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/create_test_channel.h"
53*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_credentials_provider.h"
54*cc02d7e2SAndroid Build Coastguard Worker 
55*cc02d7e2SAndroid Build Coastguard Worker #define INPROC_NAME_PREFIX "qpsinproc:"
56*cc02d7e2SAndroid Build Coastguard Worker 
57*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
58*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
59*cc02d7e2SAndroid Build Coastguard Worker 
60*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType>
61*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator {
62*cc02d7e2SAndroid Build Coastguard Worker  public:
ClientRequestCreator(RequestType *,const PayloadConfig &)63*cc02d7e2SAndroid Build Coastguard Worker   ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
64*cc02d7e2SAndroid Build Coastguard Worker     // this template must be specialized
65*cc02d7e2SAndroid Build Coastguard Worker     // fail with an assertion rather than a compile-time
66*cc02d7e2SAndroid Build Coastguard Worker     // check since these only happen at the beginning anyway
67*cc02d7e2SAndroid Build Coastguard Worker     grpc_core::Crash("unreachable");
68*cc02d7e2SAndroid Build Coastguard Worker   }
69*cc02d7e2SAndroid Build Coastguard Worker };
70*cc02d7e2SAndroid Build Coastguard Worker 
71*cc02d7e2SAndroid Build Coastguard Worker template <>
72*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator<SimpleRequest> {
73*cc02d7e2SAndroid Build Coastguard Worker  public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)74*cc02d7e2SAndroid Build Coastguard Worker   ClientRequestCreator(SimpleRequest* req,
75*cc02d7e2SAndroid Build Coastguard Worker                        const PayloadConfig& payload_config) {
76*cc02d7e2SAndroid Build Coastguard Worker     if (payload_config.has_bytebuf_params()) {
77*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat(
78*cc02d7e2SAndroid Build Coastguard Worker           "Invalid PayloadConfig, config cannot have bytebuf_params: %s",
79*cc02d7e2SAndroid Build Coastguard Worker           payload_config.DebugString()
80*cc02d7e2SAndroid Build Coastguard Worker               .c_str()));  // not appropriate for this specialization
81*cc02d7e2SAndroid Build Coastguard Worker     } else if (payload_config.has_simple_params()) {
82*cc02d7e2SAndroid Build Coastguard Worker       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
83*cc02d7e2SAndroid Build Coastguard Worker       req->set_response_size(payload_config.simple_params().resp_size());
84*cc02d7e2SAndroid Build Coastguard Worker       req->mutable_payload()->set_type(
85*cc02d7e2SAndroid Build Coastguard Worker           grpc::testing::PayloadType::COMPRESSABLE);
86*cc02d7e2SAndroid Build Coastguard Worker       int size = payload_config.simple_params().req_size();
87*cc02d7e2SAndroid Build Coastguard Worker       std::unique_ptr<char[]> body(new char[size]);
88*cc02d7e2SAndroid Build Coastguard Worker       req->mutable_payload()->set_body(body.get(), size);
89*cc02d7e2SAndroid Build Coastguard Worker     } else if (payload_config.has_complex_params()) {
90*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat(
91*cc02d7e2SAndroid Build Coastguard Worker           "Invalid PayloadConfig, cannot have complex_params: %s",
92*cc02d7e2SAndroid Build Coastguard Worker           payload_config.DebugString()
93*cc02d7e2SAndroid Build Coastguard Worker               .c_str()));  // not appropriate for this specialization
94*cc02d7e2SAndroid Build Coastguard Worker     } else {
95*cc02d7e2SAndroid Build Coastguard Worker       // default should be simple proto without payloads
96*cc02d7e2SAndroid Build Coastguard Worker       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
97*cc02d7e2SAndroid Build Coastguard Worker       req->set_response_size(0);
98*cc02d7e2SAndroid Build Coastguard Worker       req->mutable_payload()->set_type(
99*cc02d7e2SAndroid Build Coastguard Worker           grpc::testing::PayloadType::COMPRESSABLE);
100*cc02d7e2SAndroid Build Coastguard Worker     }
101*cc02d7e2SAndroid Build Coastguard Worker   }
102*cc02d7e2SAndroid Build Coastguard Worker };
103*cc02d7e2SAndroid Build Coastguard Worker 
104*cc02d7e2SAndroid Build Coastguard Worker template <>
105*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator<ByteBuffer> {
106*cc02d7e2SAndroid Build Coastguard Worker  public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)107*cc02d7e2SAndroid Build Coastguard Worker   ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
108*cc02d7e2SAndroid Build Coastguard Worker     if (payload_config.has_bytebuf_params()) {
109*cc02d7e2SAndroid Build Coastguard Worker       size_t req_sz =
110*cc02d7e2SAndroid Build Coastguard Worker           static_cast<size_t>(payload_config.bytebuf_params().req_size());
111*cc02d7e2SAndroid Build Coastguard Worker       std::unique_ptr<char[]> buf(new char[req_sz]);
112*cc02d7e2SAndroid Build Coastguard Worker       memset(buf.get(), 0, req_sz);
113*cc02d7e2SAndroid Build Coastguard Worker       Slice slice(buf.get(), req_sz);
114*cc02d7e2SAndroid Build Coastguard Worker       *req = ByteBuffer(&slice, 1);
115*cc02d7e2SAndroid Build Coastguard Worker     } else {
116*cc02d7e2SAndroid Build Coastguard Worker       grpc_core::Crash(absl::StrFormat(
117*cc02d7e2SAndroid Build Coastguard Worker           "Invalid PayloadConfig, missing bytebug_params: %s",
118*cc02d7e2SAndroid Build Coastguard Worker           payload_config.DebugString()
119*cc02d7e2SAndroid Build Coastguard Worker               .c_str()));  // not appropriate for this specialization
120*cc02d7e2SAndroid Build Coastguard Worker     }
121*cc02d7e2SAndroid Build Coastguard Worker   }
122*cc02d7e2SAndroid Build Coastguard Worker };
123*cc02d7e2SAndroid Build Coastguard Worker 
// One sample to be folded into a thread's statistics: an optional value and
// an optional status code. Each part is tracked by its own "used" flag, which
// only becomes true once the corresponding setter has been called.
class HistogramEntry final {
 public:
  HistogramEntry() = default;

  // True once set_value() has been called.
  bool value_used() const { return value_used_; }
  // Last value passed to set_value(); 0.0 if it was never called.
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  // True once set_status() has been called.
  bool status_used() const { return status_used_; }
  // Last status passed to set_status(); 0 if it was never called.
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  // Every member has a default so that reading an entry that was never
  // filled in yields defined values instead of indeterminate ones.
  bool value_used_ = false;
  double value_ = 0.0;
  bool status_used_ = false;
  int status_ = 0;
};
146*cc02d7e2SAndroid Build Coastguard Worker 
// Maps a status code to the number of times it was observed.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Adds every count in `from` to the matching status code in `*to`. Codes that
// `*to` does not contain yet are created (starting from zero) by operator[].
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  for (const auto& code_and_count : from) {
    (*to)[code_and_count.first] += code_and_count.second;
  }
}
156*cc02d7e2SAndroid Build Coastguard Worker 
157*cc02d7e2SAndroid Build Coastguard Worker class Client {
158*cc02d7e2SAndroid Build Coastguard Worker  public:
Client()159*cc02d7e2SAndroid Build Coastguard Worker   Client()
160*cc02d7e2SAndroid Build Coastguard Worker       : timer_(new UsageTimer),
161*cc02d7e2SAndroid Build Coastguard Worker         interarrival_timer_(),
162*cc02d7e2SAndroid Build Coastguard Worker         started_requests_(false),
163*cc02d7e2SAndroid Build Coastguard Worker         last_reset_poll_count_(0) {
164*cc02d7e2SAndroid Build Coastguard Worker     gpr_event_init(&start_requests_);
165*cc02d7e2SAndroid Build Coastguard Worker   }
~Client()166*cc02d7e2SAndroid Build Coastguard Worker   virtual ~Client() {}
167*cc02d7e2SAndroid Build Coastguard Worker 
Mark(bool reset)168*cc02d7e2SAndroid Build Coastguard Worker   ClientStats Mark(bool reset) {
169*cc02d7e2SAndroid Build Coastguard Worker     Histogram latencies;
170*cc02d7e2SAndroid Build Coastguard Worker     StatusHistogram statuses;
171*cc02d7e2SAndroid Build Coastguard Worker     UsageTimer::Result timer_result;
172*cc02d7e2SAndroid Build Coastguard Worker 
173*cc02d7e2SAndroid Build Coastguard Worker     MaybeStartRequests();
174*cc02d7e2SAndroid Build Coastguard Worker 
175*cc02d7e2SAndroid Build Coastguard Worker     int cur_poll_count = GetPollCount();
176*cc02d7e2SAndroid Build Coastguard Worker     int poll_count = cur_poll_count - last_reset_poll_count_;
177*cc02d7e2SAndroid Build Coastguard Worker     if (reset) {
178*cc02d7e2SAndroid Build Coastguard Worker       std::vector<Histogram> to_merge(threads_.size());
179*cc02d7e2SAndroid Build Coastguard Worker       std::vector<StatusHistogram> to_merge_status(threads_.size());
180*cc02d7e2SAndroid Build Coastguard Worker 
181*cc02d7e2SAndroid Build Coastguard Worker       for (size_t i = 0; i < threads_.size(); i++) {
182*cc02d7e2SAndroid Build Coastguard Worker         threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
183*cc02d7e2SAndroid Build Coastguard Worker       }
184*cc02d7e2SAndroid Build Coastguard Worker       std::unique_ptr<UsageTimer> timer(new UsageTimer);
185*cc02d7e2SAndroid Build Coastguard Worker       timer_.swap(timer);
186*cc02d7e2SAndroid Build Coastguard Worker       for (size_t i = 0; i < threads_.size(); i++) {
187*cc02d7e2SAndroid Build Coastguard Worker         latencies.Merge(to_merge[i]);
188*cc02d7e2SAndroid Build Coastguard Worker         MergeStatusHistogram(to_merge_status[i], &statuses);
189*cc02d7e2SAndroid Build Coastguard Worker       }
190*cc02d7e2SAndroid Build Coastguard Worker       timer_result = timer->Mark();
191*cc02d7e2SAndroid Build Coastguard Worker       last_reset_poll_count_ = cur_poll_count;
192*cc02d7e2SAndroid Build Coastguard Worker     } else {
193*cc02d7e2SAndroid Build Coastguard Worker       // merge snapshots of each thread histogram
194*cc02d7e2SAndroid Build Coastguard Worker       for (size_t i = 0; i < threads_.size(); i++) {
195*cc02d7e2SAndroid Build Coastguard Worker         threads_[i]->MergeStatsInto(&latencies, &statuses);
196*cc02d7e2SAndroid Build Coastguard Worker       }
197*cc02d7e2SAndroid Build Coastguard Worker       timer_result = timer_->Mark();
198*cc02d7e2SAndroid Build Coastguard Worker     }
199*cc02d7e2SAndroid Build Coastguard Worker 
200*cc02d7e2SAndroid Build Coastguard Worker     // Print the median latency per interval for one thread.
201*cc02d7e2SAndroid Build Coastguard Worker     // If the number of warmup seconds is x, then the first x + 1 numbers in the
202*cc02d7e2SAndroid Build Coastguard Worker     // vector are from the warmup period and should be discarded.
203*cc02d7e2SAndroid Build Coastguard Worker     if (median_latency_collection_interval_seconds_ > 0) {
204*cc02d7e2SAndroid Build Coastguard Worker       std::vector<double> medians_per_interval =
205*cc02d7e2SAndroid Build Coastguard Worker           threads_[0]->GetMedianPerIntervalList();
206*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Num threads: %zu", threads_.size());
207*cc02d7e2SAndroid Build Coastguard Worker       gpr_log(GPR_INFO, "Number of medians: %zu", medians_per_interval.size());
208*cc02d7e2SAndroid Build Coastguard Worker       for (size_t j = 0; j < medians_per_interval.size(); j++) {
209*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
210*cc02d7e2SAndroid Build Coastguard Worker       }
211*cc02d7e2SAndroid Build Coastguard Worker     }
212*cc02d7e2SAndroid Build Coastguard Worker 
213*cc02d7e2SAndroid Build Coastguard Worker     ClientStats stats;
214*cc02d7e2SAndroid Build Coastguard Worker     latencies.FillProto(stats.mutable_latencies());
215*cc02d7e2SAndroid Build Coastguard Worker     for (StatusHistogram::const_iterator it = statuses.begin();
216*cc02d7e2SAndroid Build Coastguard Worker          it != statuses.end(); ++it) {
217*cc02d7e2SAndroid Build Coastguard Worker       RequestResultCount* rrc = stats.add_request_results();
218*cc02d7e2SAndroid Build Coastguard Worker       rrc->set_status_code(it->first);
219*cc02d7e2SAndroid Build Coastguard Worker       rrc->set_count(it->second);
220*cc02d7e2SAndroid Build Coastguard Worker     }
221*cc02d7e2SAndroid Build Coastguard Worker     stats.set_time_elapsed(timer_result.wall);
222*cc02d7e2SAndroid Build Coastguard Worker     stats.set_time_system(timer_result.system);
223*cc02d7e2SAndroid Build Coastguard Worker     stats.set_time_user(timer_result.user);
224*cc02d7e2SAndroid Build Coastguard Worker     stats.set_cq_poll_count(poll_count);
225*cc02d7e2SAndroid Build Coastguard Worker     return stats;
226*cc02d7e2SAndroid Build Coastguard Worker   }
227*cc02d7e2SAndroid Build Coastguard Worker 
228*cc02d7e2SAndroid Build Coastguard Worker   // Must call AwaitThreadsCompletion before destructor to avoid a race
229*cc02d7e2SAndroid Build Coastguard Worker   // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()230*cc02d7e2SAndroid Build Coastguard Worker   void AwaitThreadsCompletion() {
231*cc02d7e2SAndroid Build Coastguard Worker     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
232*cc02d7e2SAndroid Build Coastguard Worker     DestroyMultithreading();
233*cc02d7e2SAndroid Build Coastguard Worker     std::unique_lock<std::mutex> g(thread_completion_mu_);
234*cc02d7e2SAndroid Build Coastguard Worker     while (threads_remaining_ != 0) {
235*cc02d7e2SAndroid Build Coastguard Worker       threads_complete_.wait(g);
236*cc02d7e2SAndroid Build Coastguard Worker     }
237*cc02d7e2SAndroid Build Coastguard Worker   }
238*cc02d7e2SAndroid Build Coastguard Worker 
239*cc02d7e2SAndroid Build Coastguard Worker   // Returns the interval (in seconds) between collecting latency medians. If 0,
240*cc02d7e2SAndroid Build Coastguard Worker   // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()241*cc02d7e2SAndroid Build Coastguard Worker   double GetLatencyCollectionIntervalInSeconds() {
242*cc02d7e2SAndroid Build Coastguard Worker     return median_latency_collection_interval_seconds_;
243*cc02d7e2SAndroid Build Coastguard Worker   }
244*cc02d7e2SAndroid Build Coastguard Worker 
  // Poll counter used by Mark(), which reports the difference between this
  // value and the value recorded at the last reset as cq_poll_count.
  // Virtual so that subclasses can supply a real count; this base
  // implementation always returns 0.
GetPollCount()245*cc02d7e2SAndroid Build Coastguard Worker   virtual int GetPollCount() {
246*cc02d7e2SAndroid Build Coastguard Worker     // For sync client.
247*cc02d7e2SAndroid Build Coastguard Worker     return 0;
248*cc02d7e2SAndroid Build Coastguard Worker   }
249*cc02d7e2SAndroid Build Coastguard Worker 
IsClosedLoop()250*cc02d7e2SAndroid Build Coastguard Worker   bool IsClosedLoop() { return closed_loop_; }
251*cc02d7e2SAndroid Build Coastguard Worker 
NextIssueTime(int thread_idx)252*cc02d7e2SAndroid Build Coastguard Worker   gpr_timespec NextIssueTime(int thread_idx) {
253*cc02d7e2SAndroid Build Coastguard Worker     const gpr_timespec result = next_time_[thread_idx];
254*cc02d7e2SAndroid Build Coastguard Worker     next_time_[thread_idx] =
255*cc02d7e2SAndroid Build Coastguard Worker         gpr_time_add(next_time_[thread_idx],
256*cc02d7e2SAndroid Build Coastguard Worker                      gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
257*cc02d7e2SAndroid Build Coastguard Worker                                          GPR_TIMESPAN));
258*cc02d7e2SAndroid Build Coastguard Worker     return result;
259*cc02d7e2SAndroid Build Coastguard Worker   }
260*cc02d7e2SAndroid Build Coastguard Worker 
ThreadCompleted()261*cc02d7e2SAndroid Build Coastguard Worker   bool ThreadCompleted() {
262*cc02d7e2SAndroid Build Coastguard Worker     return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
263*cc02d7e2SAndroid Build Coastguard Worker   }
264*cc02d7e2SAndroid Build Coastguard Worker 
265*cc02d7e2SAndroid Build Coastguard Worker   class Thread {
266*cc02d7e2SAndroid Build Coastguard Worker    public:
Thread(Client * client,size_t idx)267*cc02d7e2SAndroid Build Coastguard Worker     Thread(Client* client, size_t idx)
268*cc02d7e2SAndroid Build Coastguard Worker         : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
269*cc02d7e2SAndroid Build Coastguard Worker 
~Thread()270*cc02d7e2SAndroid Build Coastguard Worker     ~Thread() { impl_.join(); }
271*cc02d7e2SAndroid Build Coastguard Worker 
BeginSwap(Histogram * n,StatusHistogram * s)272*cc02d7e2SAndroid Build Coastguard Worker     void BeginSwap(Histogram* n, StatusHistogram* s) {
273*cc02d7e2SAndroid Build Coastguard Worker       std::lock_guard<std::mutex> g(mu_);
274*cc02d7e2SAndroid Build Coastguard Worker       n->Swap(&histogram_);
275*cc02d7e2SAndroid Build Coastguard Worker       s->swap(statuses_);
276*cc02d7e2SAndroid Build Coastguard Worker     }
277*cc02d7e2SAndroid Build Coastguard Worker 
MergeStatsInto(Histogram * hist,StatusHistogram * s)278*cc02d7e2SAndroid Build Coastguard Worker     void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
279*cc02d7e2SAndroid Build Coastguard Worker       std::unique_lock<std::mutex> g(mu_);
280*cc02d7e2SAndroid Build Coastguard Worker       hist->Merge(histogram_);
281*cc02d7e2SAndroid Build Coastguard Worker       MergeStatusHistogram(statuses_, s);
282*cc02d7e2SAndroid Build Coastguard Worker     }
283*cc02d7e2SAndroid Build Coastguard Worker 
GetMedianPerIntervalList()284*cc02d7e2SAndroid Build Coastguard Worker     std::vector<double> GetMedianPerIntervalList() {
285*cc02d7e2SAndroid Build Coastguard Worker       return medians_each_interval_list_;
286*cc02d7e2SAndroid Build Coastguard Worker     }
287*cc02d7e2SAndroid Build Coastguard Worker 
UpdateHistogram(HistogramEntry * entry)288*cc02d7e2SAndroid Build Coastguard Worker     void UpdateHistogram(HistogramEntry* entry) {
289*cc02d7e2SAndroid Build Coastguard Worker       std::lock_guard<std::mutex> g(mu_);
290*cc02d7e2SAndroid Build Coastguard Worker       if (entry->value_used()) {
291*cc02d7e2SAndroid Build Coastguard Worker         histogram_.Add(entry->value());
292*cc02d7e2SAndroid Build Coastguard Worker         if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
293*cc02d7e2SAndroid Build Coastguard Worker           histogram_per_interval_.Add(entry->value());
294*cc02d7e2SAndroid Build Coastguard Worker           double now = UsageTimer::Now();
295*cc02d7e2SAndroid Build Coastguard Worker           if ((now - interval_start_time_) >=
296*cc02d7e2SAndroid Build Coastguard Worker               client_->GetLatencyCollectionIntervalInSeconds()) {
297*cc02d7e2SAndroid Build Coastguard Worker             // Record the median latency of requests from the last interval.
298*cc02d7e2SAndroid Build Coastguard Worker             // Divide by 1e3 to get microseconds.
299*cc02d7e2SAndroid Build Coastguard Worker             medians_each_interval_list_.push_back(
300*cc02d7e2SAndroid Build Coastguard Worker                 histogram_per_interval_.Percentile(50) / 1e3);
301*cc02d7e2SAndroid Build Coastguard Worker             histogram_per_interval_.Reset();
302*cc02d7e2SAndroid Build Coastguard Worker             interval_start_time_ = now;
303*cc02d7e2SAndroid Build Coastguard Worker           }
304*cc02d7e2SAndroid Build Coastguard Worker         }
305*cc02d7e2SAndroid Build Coastguard Worker       }
306*cc02d7e2SAndroid Build Coastguard Worker       if (entry->status_used()) {
307*cc02d7e2SAndroid Build Coastguard Worker         statuses_[entry->status()]++;
308*cc02d7e2SAndroid Build Coastguard Worker       }
309*cc02d7e2SAndroid Build Coastguard Worker     }
310*cc02d7e2SAndroid Build Coastguard Worker 
311*cc02d7e2SAndroid Build Coastguard Worker    private:
312*cc02d7e2SAndroid Build Coastguard Worker     Thread(const Thread&);
313*cc02d7e2SAndroid Build Coastguard Worker     Thread& operator=(const Thread&);
314*cc02d7e2SAndroid Build Coastguard Worker 
ThreadFunc()315*cc02d7e2SAndroid Build Coastguard Worker     void ThreadFunc() {
316*cc02d7e2SAndroid Build Coastguard Worker       int wait_loop = 0;
317*cc02d7e2SAndroid Build Coastguard Worker       while (!gpr_event_wait(
318*cc02d7e2SAndroid Build Coastguard Worker           &client_->start_requests_,
319*cc02d7e2SAndroid Build Coastguard Worker           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
320*cc02d7e2SAndroid Build Coastguard Worker                        gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
321*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
322*cc02d7e2SAndroid Build Coastguard Worker                 idx_, wait_loop);
323*cc02d7e2SAndroid Build Coastguard Worker         wait_loop++;
324*cc02d7e2SAndroid Build Coastguard Worker       }
325*cc02d7e2SAndroid Build Coastguard Worker 
326*cc02d7e2SAndroid Build Coastguard Worker       client_->ThreadFunc(idx_, this);
327*cc02d7e2SAndroid Build Coastguard Worker       client_->CompleteThread();
328*cc02d7e2SAndroid Build Coastguard Worker     }
329*cc02d7e2SAndroid Build Coastguard Worker 
330*cc02d7e2SAndroid Build Coastguard Worker     std::mutex mu_;
331*cc02d7e2SAndroid Build Coastguard Worker     Histogram histogram_;
332*cc02d7e2SAndroid Build Coastguard Worker     StatusHistogram statuses_;
333*cc02d7e2SAndroid Build Coastguard Worker     Client* client_;
334*cc02d7e2SAndroid Build Coastguard Worker     const size_t idx_;
335*cc02d7e2SAndroid Build Coastguard Worker     std::thread impl_;
336*cc02d7e2SAndroid Build Coastguard Worker     // The following are used only if
337*cc02d7e2SAndroid Build Coastguard Worker     // median_latency_collection_interval_seconds_ is greater than 0
338*cc02d7e2SAndroid Build Coastguard Worker     Histogram histogram_per_interval_;
339*cc02d7e2SAndroid Build Coastguard Worker     std::vector<double> medians_each_interval_list_;
340*cc02d7e2SAndroid Build Coastguard Worker     double interval_start_time_;
341*cc02d7e2SAndroid Build Coastguard Worker   };
342*cc02d7e2SAndroid Build Coastguard Worker 
343*cc02d7e2SAndroid Build Coastguard Worker  protected:
344*cc02d7e2SAndroid Build Coastguard Worker   bool closed_loop_;
345*cc02d7e2SAndroid Build Coastguard Worker   gpr_atm thread_pool_done_;
346*cc02d7e2SAndroid Build Coastguard Worker   double median_latency_collection_interval_seconds_;  // In seconds
347*cc02d7e2SAndroid Build Coastguard Worker 
StartThreads(size_t num_threads)348*cc02d7e2SAndroid Build Coastguard Worker   void StartThreads(size_t num_threads) {
349*cc02d7e2SAndroid Build Coastguard Worker     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
350*cc02d7e2SAndroid Build Coastguard Worker     threads_remaining_ = num_threads;
351*cc02d7e2SAndroid Build Coastguard Worker     for (size_t i = 0; i < num_threads; i++) {
352*cc02d7e2SAndroid Build Coastguard Worker       threads_.emplace_back(new Thread(this, i));
353*cc02d7e2SAndroid Build Coastguard Worker     }
354*cc02d7e2SAndroid Build Coastguard Worker   }
355*cc02d7e2SAndroid Build Coastguard Worker 
  // Destroys all worker Thread objects. Each Thread's destructor joins its
  // std::thread, and a worker that has not been released yet keeps looping on
  // the start event, so the start event is signalled first to make sure the
  // joins triggered by clear() can finish.
EndThreads()356*cc02d7e2SAndroid Build Coastguard Worker   void EndThreads() {
357*cc02d7e2SAndroid Build Coastguard Worker     MaybeStartRequests();
358*cc02d7e2SAndroid Build Coastguard Worker     threads_.clear();
359*cc02d7e2SAndroid Build Coastguard Worker   }
360*cc02d7e2SAndroid Build Coastguard Worker 
  // Subclass hook called by AwaitThreadsCompletion() after the thread pool
  // has been flagged as done and before it waits for the workers to finish.
361*cc02d7e2SAndroid Build Coastguard Worker   virtual void DestroyMultithreading() = 0;
362*cc02d7e2SAndroid Build Coastguard Worker 
SetupLoadTest(const ClientConfig & config,size_t num_threads)363*cc02d7e2SAndroid Build Coastguard Worker   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
364*cc02d7e2SAndroid Build Coastguard Worker     // Set up the load distribution based on the number of threads
365*cc02d7e2SAndroid Build Coastguard Worker     const auto& load = config.load_params();
366*cc02d7e2SAndroid Build Coastguard Worker 
367*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<RandomDistInterface> random_dist;
368*cc02d7e2SAndroid Build Coastguard Worker     switch (load.load_case()) {
369*cc02d7e2SAndroid Build Coastguard Worker       case LoadParams::kClosedLoop:
370*cc02d7e2SAndroid Build Coastguard Worker         // Closed-loop doesn't use random dist at all
371*cc02d7e2SAndroid Build Coastguard Worker         break;
372*cc02d7e2SAndroid Build Coastguard Worker       case LoadParams::kPoisson:
373*cc02d7e2SAndroid Build Coastguard Worker         random_dist = std::make_unique<ExpDist>(load.poisson().offered_load() /
374*cc02d7e2SAndroid Build Coastguard Worker                                                 num_threads);
375*cc02d7e2SAndroid Build Coastguard Worker         break;
376*cc02d7e2SAndroid Build Coastguard Worker       default:
377*cc02d7e2SAndroid Build Coastguard Worker         grpc_core::Crash("unreachable");
378*cc02d7e2SAndroid Build Coastguard Worker     }
379*cc02d7e2SAndroid Build Coastguard Worker 
380*cc02d7e2SAndroid Build Coastguard Worker     // Set closed_loop_ based on whether or not random_dist is set
381*cc02d7e2SAndroid Build Coastguard Worker     if (!random_dist) {
382*cc02d7e2SAndroid Build Coastguard Worker       closed_loop_ = true;
383*cc02d7e2SAndroid Build Coastguard Worker     } else {
384*cc02d7e2SAndroid Build Coastguard Worker       closed_loop_ = false;
385*cc02d7e2SAndroid Build Coastguard Worker       // set up interarrival timer according to random dist
386*cc02d7e2SAndroid Build Coastguard Worker       interarrival_timer_.init(*random_dist, num_threads);
387*cc02d7e2SAndroid Build Coastguard Worker       const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
388*cc02d7e2SAndroid Build Coastguard Worker       for (size_t i = 0; i < num_threads; i++) {
389*cc02d7e2SAndroid Build Coastguard Worker         next_time_.push_back(gpr_time_add(
390*cc02d7e2SAndroid Build Coastguard Worker             now,
391*cc02d7e2SAndroid Build Coastguard Worker             gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
392*cc02d7e2SAndroid Build Coastguard Worker       }
393*cc02d7e2SAndroid Build Coastguard Worker     }
394*cc02d7e2SAndroid Build Coastguard Worker   }
395*cc02d7e2SAndroid Build Coastguard Worker 
NextIssuer(int thread_idx)396*cc02d7e2SAndroid Build Coastguard Worker   std::function<gpr_timespec()> NextIssuer(int thread_idx) {
397*cc02d7e2SAndroid Build Coastguard Worker     return closed_loop_ ? std::function<gpr_timespec()>()
398*cc02d7e2SAndroid Build Coastguard Worker                         : std::bind(&Client::NextIssueTime, this, thread_idx);
399*cc02d7e2SAndroid Build Coastguard Worker   }
400*cc02d7e2SAndroid Build Coastguard Worker 
  // Per-worker body supplied by the subclass. Client::Thread calls it once
  // the start event has been signalled, passing the worker's index and the
  // Thread object itself; when it returns, the worker reports completion.
401*cc02d7e2SAndroid Build Coastguard Worker   virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
402*cc02d7e2SAndroid Build Coastguard Worker 
403*cc02d7e2SAndroid Build Coastguard Worker   std::vector<std::unique_ptr<Thread>> threads_;
404*cc02d7e2SAndroid Build Coastguard Worker   std::unique_ptr<UsageTimer> timer_;
405*cc02d7e2SAndroid Build Coastguard Worker 
406*cc02d7e2SAndroid Build Coastguard Worker   InterarrivalTimer interarrival_timer_;
407*cc02d7e2SAndroid Build Coastguard Worker   std::vector<gpr_timespec> next_time_;
408*cc02d7e2SAndroid Build Coastguard Worker 
409*cc02d7e2SAndroid Build Coastguard Worker   std::mutex thread_completion_mu_;
410*cc02d7e2SAndroid Build Coastguard Worker   size_t threads_remaining_;
411*cc02d7e2SAndroid Build Coastguard Worker   std::condition_variable threads_complete_;
412*cc02d7e2SAndroid Build Coastguard Worker 
413*cc02d7e2SAndroid Build Coastguard Worker   gpr_event start_requests_;
414*cc02d7e2SAndroid Build Coastguard Worker   bool started_requests_;
415*cc02d7e2SAndroid Build Coastguard Worker 
416*cc02d7e2SAndroid Build Coastguard Worker   int last_reset_poll_count_;
417*cc02d7e2SAndroid Build Coastguard Worker 
MaybeStartRequests()418*cc02d7e2SAndroid Build Coastguard Worker   void MaybeStartRequests() {
419*cc02d7e2SAndroid Build Coastguard Worker     if (!started_requests_) {
420*cc02d7e2SAndroid Build Coastguard Worker       started_requests_ = true;
421*cc02d7e2SAndroid Build Coastguard Worker       gpr_event_set(&start_requests_, reinterpret_cast<void*>(1));
422*cc02d7e2SAndroid Build Coastguard Worker     }
423*cc02d7e2SAndroid Build Coastguard Worker   }
424*cc02d7e2SAndroid Build Coastguard Worker 
CompleteThread()425*cc02d7e2SAndroid Build Coastguard Worker   void CompleteThread() {
426*cc02d7e2SAndroid Build Coastguard Worker     std::lock_guard<std::mutex> g(thread_completion_mu_);
427*cc02d7e2SAndroid Build Coastguard Worker     threads_remaining_--;
428*cc02d7e2SAndroid Build Coastguard Worker     if (threads_remaining_ == 0) {
429*cc02d7e2SAndroid Build Coastguard Worker       threads_complete_.notify_all();
430*cc02d7e2SAndroid Build Coastguard Worker     }
431*cc02d7e2SAndroid Build Coastguard Worker   }
432*cc02d7e2SAndroid Build Coastguard Worker };
433*cc02d7e2SAndroid Build Coastguard Worker 
434*cc02d7e2SAndroid Build Coastguard Worker template <class StubType, class RequestType>
435*cc02d7e2SAndroid Build Coastguard Worker class ClientImpl : public Client {
436*cc02d7e2SAndroid Build Coastguard Worker  public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)437*cc02d7e2SAndroid Build Coastguard Worker   ClientImpl(const ClientConfig& config,
438*cc02d7e2SAndroid Build Coastguard Worker              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
439*cc02d7e2SAndroid Build Coastguard Worker                  create_stub)
440*cc02d7e2SAndroid Build Coastguard Worker       : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
441*cc02d7e2SAndroid Build Coastguard Worker     for (int i = 0; i < config.client_channels(); i++) {
442*cc02d7e2SAndroid Build Coastguard Worker       channels_.emplace_back(
443*cc02d7e2SAndroid Build Coastguard Worker           config.server_targets(i % config.server_targets_size()), config,
444*cc02d7e2SAndroid Build Coastguard Worker           create_stub_, i);
445*cc02d7e2SAndroid Build Coastguard Worker     }
446*cc02d7e2SAndroid Build Coastguard Worker     WaitForChannelsToConnect();
447*cc02d7e2SAndroid Build Coastguard Worker     median_latency_collection_interval_seconds_ =
448*cc02d7e2SAndroid Build Coastguard Worker         config.median_latency_collection_interval_millis() / 1e3;
449*cc02d7e2SAndroid Build Coastguard Worker     ClientRequestCreator<RequestType> create_req(&request_,
450*cc02d7e2SAndroid Build Coastguard Worker                                                  config.payload_config());
451*cc02d7e2SAndroid Build Coastguard Worker   }
~ClientImpl()452*cc02d7e2SAndroid Build Coastguard Worker   ~ClientImpl() override {}
request()453*cc02d7e2SAndroid Build Coastguard Worker   const RequestType* request() { return &request_; }
454*cc02d7e2SAndroid Build Coastguard Worker 
WaitForChannelsToConnect()455*cc02d7e2SAndroid Build Coastguard Worker   void WaitForChannelsToConnect() {
456*cc02d7e2SAndroid Build Coastguard Worker     int connect_deadline_seconds = 10;
457*cc02d7e2SAndroid Build Coastguard Worker     // Allow optionally overriding connect_deadline in order
458*cc02d7e2SAndroid Build Coastguard Worker     // to deal with benchmark environments in which the server
459*cc02d7e2SAndroid Build Coastguard Worker     // can take a long time to become ready.
460*cc02d7e2SAndroid Build Coastguard Worker     auto channel_connect_timeout_str =
461*cc02d7e2SAndroid Build Coastguard Worker         grpc_core::GetEnv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
462*cc02d7e2SAndroid Build Coastguard Worker     if (channel_connect_timeout_str.has_value() &&
463*cc02d7e2SAndroid Build Coastguard Worker         !channel_connect_timeout_str->empty()) {
464*cc02d7e2SAndroid Build Coastguard Worker       connect_deadline_seconds = atoi(channel_connect_timeout_str->c_str());
465*cc02d7e2SAndroid Build Coastguard Worker     }
466*cc02d7e2SAndroid Build Coastguard Worker     gpr_log(GPR_INFO,
467*cc02d7e2SAndroid Build Coastguard Worker             "Waiting for up to %d seconds for all channels to connect",
468*cc02d7e2SAndroid Build Coastguard Worker             connect_deadline_seconds);
469*cc02d7e2SAndroid Build Coastguard Worker     gpr_timespec connect_deadline = gpr_time_add(
470*cc02d7e2SAndroid Build Coastguard Worker         gpr_now(GPR_CLOCK_REALTIME),
471*cc02d7e2SAndroid Build Coastguard Worker         gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
472*cc02d7e2SAndroid Build Coastguard Worker     CompletionQueue cq;
473*cc02d7e2SAndroid Build Coastguard Worker     size_t num_remaining = 0;
474*cc02d7e2SAndroid Build Coastguard Worker     for (auto& c : channels_) {
475*cc02d7e2SAndroid Build Coastguard Worker       if (!c.is_inproc()) {
476*cc02d7e2SAndroid Build Coastguard Worker         Channel* channel = c.get_channel();
477*cc02d7e2SAndroid Build Coastguard Worker         grpc_connectivity_state last_observed = channel->GetState(true);
478*cc02d7e2SAndroid Build Coastguard Worker         if (last_observed == GRPC_CHANNEL_READY) {
479*cc02d7e2SAndroid Build Coastguard Worker           gpr_log(GPR_INFO, "Channel %p connected!", channel);
480*cc02d7e2SAndroid Build Coastguard Worker         } else {
481*cc02d7e2SAndroid Build Coastguard Worker           num_remaining++;
482*cc02d7e2SAndroid Build Coastguard Worker           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
483*cc02d7e2SAndroid Build Coastguard Worker                                        channel);
484*cc02d7e2SAndroid Build Coastguard Worker         }
485*cc02d7e2SAndroid Build Coastguard Worker       }
486*cc02d7e2SAndroid Build Coastguard Worker     }
487*cc02d7e2SAndroid Build Coastguard Worker     while (num_remaining > 0) {
488*cc02d7e2SAndroid Build Coastguard Worker       bool ok = false;
489*cc02d7e2SAndroid Build Coastguard Worker       void* tag = nullptr;
490*cc02d7e2SAndroid Build Coastguard Worker       cq.Next(&tag, &ok);
491*cc02d7e2SAndroid Build Coastguard Worker       Channel* channel = static_cast<Channel*>(tag);
492*cc02d7e2SAndroid Build Coastguard Worker       if (!ok) {
493*cc02d7e2SAndroid Build Coastguard Worker         grpc_core::Crash(absl::StrFormat(
494*cc02d7e2SAndroid Build Coastguard Worker             "Channel %p failed to connect within the deadline", channel));
495*cc02d7e2SAndroid Build Coastguard Worker       } else {
496*cc02d7e2SAndroid Build Coastguard Worker         grpc_connectivity_state last_observed = channel->GetState(true);
497*cc02d7e2SAndroid Build Coastguard Worker         if (last_observed == GRPC_CHANNEL_READY) {
498*cc02d7e2SAndroid Build Coastguard Worker           gpr_log(GPR_INFO, "Channel %p connected!", channel);
499*cc02d7e2SAndroid Build Coastguard Worker           num_remaining--;
500*cc02d7e2SAndroid Build Coastguard Worker         } else {
501*cc02d7e2SAndroid Build Coastguard Worker           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
502*cc02d7e2SAndroid Build Coastguard Worker                                        channel);
503*cc02d7e2SAndroid Build Coastguard Worker         }
504*cc02d7e2SAndroid Build Coastguard Worker       }
505*cc02d7e2SAndroid Build Coastguard Worker     }
506*cc02d7e2SAndroid Build Coastguard Worker   }
507*cc02d7e2SAndroid Build Coastguard Worker 
508*cc02d7e2SAndroid Build Coastguard Worker  protected:
509*cc02d7e2SAndroid Build Coastguard Worker   const int cores_;
510*cc02d7e2SAndroid Build Coastguard Worker   RequestType request_;
511*cc02d7e2SAndroid Build Coastguard Worker 
512*cc02d7e2SAndroid Build Coastguard Worker   class ClientChannelInfo {
513*cc02d7e2SAndroid Build Coastguard Worker    public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)514*cc02d7e2SAndroid Build Coastguard Worker     ClientChannelInfo(
515*cc02d7e2SAndroid Build Coastguard Worker         const std::string& target, const ClientConfig& config,
516*cc02d7e2SAndroid Build Coastguard Worker         std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
517*cc02d7e2SAndroid Build Coastguard Worker             create_stub,
518*cc02d7e2SAndroid Build Coastguard Worker         int shard) {
519*cc02d7e2SAndroid Build Coastguard Worker       ChannelArguments args;
520*cc02d7e2SAndroid Build Coastguard Worker       args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
521*cc02d7e2SAndroid Build Coastguard Worker       set_channel_args(config, &args);
522*cc02d7e2SAndroid Build Coastguard Worker 
523*cc02d7e2SAndroid Build Coastguard Worker       std::string type;
524*cc02d7e2SAndroid Build Coastguard Worker       if (config.has_security_params() &&
525*cc02d7e2SAndroid Build Coastguard Worker           config.security_params().cred_type().empty()) {
526*cc02d7e2SAndroid Build Coastguard Worker         type = kTlsCredentialsType;
527*cc02d7e2SAndroid Build Coastguard Worker       } else {
528*cc02d7e2SAndroid Build Coastguard Worker         type = config.security_params().cred_type();
529*cc02d7e2SAndroid Build Coastguard Worker       }
530*cc02d7e2SAndroid Build Coastguard Worker 
531*cc02d7e2SAndroid Build Coastguard Worker       std::string inproc_pfx(INPROC_NAME_PREFIX);
532*cc02d7e2SAndroid Build Coastguard Worker       if (!absl::StartsWith(target, inproc_pfx)) {
533*cc02d7e2SAndroid Build Coastguard Worker         channel_ = CreateTestChannel(
534*cc02d7e2SAndroid Build Coastguard Worker             target, type, config.security_params().server_host_override(),
535*cc02d7e2SAndroid Build Coastguard Worker             !config.security_params().use_test_ca(),
536*cc02d7e2SAndroid Build Coastguard Worker             std::shared_ptr<CallCredentials>(), args);
537*cc02d7e2SAndroid Build Coastguard Worker         gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
538*cc02d7e2SAndroid Build Coastguard Worker         is_inproc_ = false;
539*cc02d7e2SAndroid Build Coastguard Worker       } else {
540*cc02d7e2SAndroid Build Coastguard Worker         std::string tgt = target;
541*cc02d7e2SAndroid Build Coastguard Worker         tgt.erase(0, inproc_pfx.length());
542*cc02d7e2SAndroid Build Coastguard Worker         int srv_num = std::stoi(tgt);
543*cc02d7e2SAndroid Build Coastguard Worker         channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
544*cc02d7e2SAndroid Build Coastguard Worker         is_inproc_ = true;
545*cc02d7e2SAndroid Build Coastguard Worker       }
546*cc02d7e2SAndroid Build Coastguard Worker       stub_ = create_stub(channel_);
547*cc02d7e2SAndroid Build Coastguard Worker     }
get_channel()548*cc02d7e2SAndroid Build Coastguard Worker     Channel* get_channel() { return channel_.get(); }
get_stub()549*cc02d7e2SAndroid Build Coastguard Worker     StubType* get_stub() { return stub_.get(); }
is_inproc()550*cc02d7e2SAndroid Build Coastguard Worker     bool is_inproc() { return is_inproc_; }
551*cc02d7e2SAndroid Build Coastguard Worker 
552*cc02d7e2SAndroid Build Coastguard Worker    private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)553*cc02d7e2SAndroid Build Coastguard Worker     void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
554*cc02d7e2SAndroid Build Coastguard Worker       for (const auto& channel_arg : config.channel_args()) {
555*cc02d7e2SAndroid Build Coastguard Worker         if (channel_arg.value_case() == ChannelArg::kStrValue) {
556*cc02d7e2SAndroid Build Coastguard Worker           args->SetString(channel_arg.name(), channel_arg.str_value());
557*cc02d7e2SAndroid Build Coastguard Worker         } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
558*cc02d7e2SAndroid Build Coastguard Worker           args->SetInt(channel_arg.name(), channel_arg.int_value());
559*cc02d7e2SAndroid Build Coastguard Worker         } else {
560*cc02d7e2SAndroid Build Coastguard Worker           gpr_log(GPR_ERROR, "Empty channel arg value.");
561*cc02d7e2SAndroid Build Coastguard Worker         }
562*cc02d7e2SAndroid Build Coastguard Worker       }
563*cc02d7e2SAndroid Build Coastguard Worker     }
564*cc02d7e2SAndroid Build Coastguard Worker 
565*cc02d7e2SAndroid Build Coastguard Worker     std::shared_ptr<Channel> channel_;
566*cc02d7e2SAndroid Build Coastguard Worker     std::unique_ptr<StubType> stub_;
567*cc02d7e2SAndroid Build Coastguard Worker     bool is_inproc_;
568*cc02d7e2SAndroid Build Coastguard Worker   };
569*cc02d7e2SAndroid Build Coastguard Worker   std::vector<ClientChannelInfo> channels_;
570*cc02d7e2SAndroid Build Coastguard Worker   std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
571*cc02d7e2SAndroid Build Coastguard Worker       create_stub_;
572*cc02d7e2SAndroid Build Coastguard Worker };
573*cc02d7e2SAndroid Build Coastguard Worker 
574*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config);
575*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config);
576*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config);
577*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
578*cc02d7e2SAndroid Build Coastguard Worker     const ClientConfig& config);
579*cc02d7e2SAndroid Build Coastguard Worker 
580*cc02d7e2SAndroid Build Coastguard Worker }  // namespace testing
581*cc02d7e2SAndroid Build Coastguard Worker }  // namespace grpc
582*cc02d7e2SAndroid Build Coastguard Worker 
583*cc02d7e2SAndroid Build Coastguard Worker #endif  // GRPC_TEST_CPP_QPS_CLIENT_H
584