1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker //
3*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
4*cc02d7e2SAndroid Build Coastguard Worker //
5*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
6*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
7*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
8*cc02d7e2SAndroid Build Coastguard Worker //
9*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
10*cc02d7e2SAndroid Build Coastguard Worker //
11*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
12*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
13*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
15*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker //
18*cc02d7e2SAndroid Build Coastguard Worker
19*cc02d7e2SAndroid Build Coastguard Worker #ifndef GRPC_TEST_CPP_QPS_CLIENT_H
20*cc02d7e2SAndroid Build Coastguard Worker #define GRPC_TEST_CPP_QPS_CLIENT_H
21*cc02d7e2SAndroid Build Coastguard Worker
22*cc02d7e2SAndroid Build Coastguard Worker #include <inttypes.h>
23*cc02d7e2SAndroid Build Coastguard Worker #include <stdint.h>
24*cc02d7e2SAndroid Build Coastguard Worker #include <stdlib.h>
25*cc02d7e2SAndroid Build Coastguard Worker
26*cc02d7e2SAndroid Build Coastguard Worker #include <condition_variable>
27*cc02d7e2SAndroid Build Coastguard Worker #include <mutex>
28*cc02d7e2SAndroid Build Coastguard Worker #include <thread>
29*cc02d7e2SAndroid Build Coastguard Worker #include <unordered_map>
30*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
31*cc02d7e2SAndroid Build Coastguard Worker
32*cc02d7e2SAndroid Build Coastguard Worker #include "absl/memory/memory.h"
33*cc02d7e2SAndroid Build Coastguard Worker #include "absl/strings/match.h"
34*cc02d7e2SAndroid Build Coastguard Worker #include "absl/strings/str_format.h"
35*cc02d7e2SAndroid Build Coastguard Worker
36*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
37*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
38*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
39*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/byte_buffer.h>
40*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/channel_arguments.h>
41*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/slice.h>
42*cc02d7e2SAndroid Build Coastguard Worker
43*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/crash.h"
44*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/env.h"
45*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
46*cc02d7e2SAndroid Build Coastguard Worker #include "src/proto/grpc/testing/payloads.pb.h"
47*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/histogram.h"
48*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/interarrival.h"
49*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/qps_worker.h"
50*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/server.h"
51*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/qps/usage_timer.h"
52*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/create_test_channel.h"
53*cc02d7e2SAndroid Build Coastguard Worker #include "test/cpp/util/test_credentials_provider.h"
54*cc02d7e2SAndroid Build Coastguard Worker
55*cc02d7e2SAndroid Build Coastguard Worker #define INPROC_NAME_PREFIX "qpsinproc:"
56*cc02d7e2SAndroid Build Coastguard Worker
57*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
58*cc02d7e2SAndroid Build Coastguard Worker namespace testing {
59*cc02d7e2SAndroid Build Coastguard Worker
60*cc02d7e2SAndroid Build Coastguard Worker template <class RequestType>
61*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator {
62*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRequestCreator(RequestType *,const PayloadConfig &)63*cc02d7e2SAndroid Build Coastguard Worker ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
64*cc02d7e2SAndroid Build Coastguard Worker // this template must be specialized
65*cc02d7e2SAndroid Build Coastguard Worker // fail with an assertion rather than a compile-time
66*cc02d7e2SAndroid Build Coastguard Worker // check since these only happen at the beginning anyway
67*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
68*cc02d7e2SAndroid Build Coastguard Worker }
69*cc02d7e2SAndroid Build Coastguard Worker };
70*cc02d7e2SAndroid Build Coastguard Worker
71*cc02d7e2SAndroid Build Coastguard Worker template <>
72*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator<SimpleRequest> {
73*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)74*cc02d7e2SAndroid Build Coastguard Worker ClientRequestCreator(SimpleRequest* req,
75*cc02d7e2SAndroid Build Coastguard Worker const PayloadConfig& payload_config) {
76*cc02d7e2SAndroid Build Coastguard Worker if (payload_config.has_bytebuf_params()) {
77*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat(
78*cc02d7e2SAndroid Build Coastguard Worker "Invalid PayloadConfig, config cannot have bytebuf_params: %s",
79*cc02d7e2SAndroid Build Coastguard Worker payload_config.DebugString()
80*cc02d7e2SAndroid Build Coastguard Worker .c_str())); // not appropriate for this specialization
81*cc02d7e2SAndroid Build Coastguard Worker } else if (payload_config.has_simple_params()) {
82*cc02d7e2SAndroid Build Coastguard Worker req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
83*cc02d7e2SAndroid Build Coastguard Worker req->set_response_size(payload_config.simple_params().resp_size());
84*cc02d7e2SAndroid Build Coastguard Worker req->mutable_payload()->set_type(
85*cc02d7e2SAndroid Build Coastguard Worker grpc::testing::PayloadType::COMPRESSABLE);
86*cc02d7e2SAndroid Build Coastguard Worker int size = payload_config.simple_params().req_size();
87*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<char[]> body(new char[size]);
88*cc02d7e2SAndroid Build Coastguard Worker req->mutable_payload()->set_body(body.get(), size);
89*cc02d7e2SAndroid Build Coastguard Worker } else if (payload_config.has_complex_params()) {
90*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat(
91*cc02d7e2SAndroid Build Coastguard Worker "Invalid PayloadConfig, cannot have complex_params: %s",
92*cc02d7e2SAndroid Build Coastguard Worker payload_config.DebugString()
93*cc02d7e2SAndroid Build Coastguard Worker .c_str())); // not appropriate for this specialization
94*cc02d7e2SAndroid Build Coastguard Worker } else {
95*cc02d7e2SAndroid Build Coastguard Worker // default should be simple proto without payloads
96*cc02d7e2SAndroid Build Coastguard Worker req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
97*cc02d7e2SAndroid Build Coastguard Worker req->set_response_size(0);
98*cc02d7e2SAndroid Build Coastguard Worker req->mutable_payload()->set_type(
99*cc02d7e2SAndroid Build Coastguard Worker grpc::testing::PayloadType::COMPRESSABLE);
100*cc02d7e2SAndroid Build Coastguard Worker }
101*cc02d7e2SAndroid Build Coastguard Worker }
102*cc02d7e2SAndroid Build Coastguard Worker };
103*cc02d7e2SAndroid Build Coastguard Worker
104*cc02d7e2SAndroid Build Coastguard Worker template <>
105*cc02d7e2SAndroid Build Coastguard Worker class ClientRequestCreator<ByteBuffer> {
106*cc02d7e2SAndroid Build Coastguard Worker public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)107*cc02d7e2SAndroid Build Coastguard Worker ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
108*cc02d7e2SAndroid Build Coastguard Worker if (payload_config.has_bytebuf_params()) {
109*cc02d7e2SAndroid Build Coastguard Worker size_t req_sz =
110*cc02d7e2SAndroid Build Coastguard Worker static_cast<size_t>(payload_config.bytebuf_params().req_size());
111*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<char[]> buf(new char[req_sz]);
112*cc02d7e2SAndroid Build Coastguard Worker memset(buf.get(), 0, req_sz);
113*cc02d7e2SAndroid Build Coastguard Worker Slice slice(buf.get(), req_sz);
114*cc02d7e2SAndroid Build Coastguard Worker *req = ByteBuffer(&slice, 1);
115*cc02d7e2SAndroid Build Coastguard Worker } else {
116*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat(
117*cc02d7e2SAndroid Build Coastguard Worker "Invalid PayloadConfig, missing bytebug_params: %s",
118*cc02d7e2SAndroid Build Coastguard Worker payload_config.DebugString()
119*cc02d7e2SAndroid Build Coastguard Worker .c_str())); // not appropriate for this specialization
120*cc02d7e2SAndroid Build Coastguard Worker }
121*cc02d7e2SAndroid Build Coastguard Worker }
122*cc02d7e2SAndroid Build Coastguard Worker };
123*cc02d7e2SAndroid Build Coastguard Worker
124*cc02d7e2SAndroid Build Coastguard Worker class HistogramEntry final {
125*cc02d7e2SAndroid Build Coastguard Worker public:
HistogramEntry()126*cc02d7e2SAndroid Build Coastguard Worker HistogramEntry() : value_used_(false), status_used_(false) {}
value_used()127*cc02d7e2SAndroid Build Coastguard Worker bool value_used() const { return value_used_; }
value()128*cc02d7e2SAndroid Build Coastguard Worker double value() const { return value_; }
set_value(double v)129*cc02d7e2SAndroid Build Coastguard Worker void set_value(double v) {
130*cc02d7e2SAndroid Build Coastguard Worker value_used_ = true;
131*cc02d7e2SAndroid Build Coastguard Worker value_ = v;
132*cc02d7e2SAndroid Build Coastguard Worker }
status_used()133*cc02d7e2SAndroid Build Coastguard Worker bool status_used() const { return status_used_; }
status()134*cc02d7e2SAndroid Build Coastguard Worker int status() const { return status_; }
set_status(int status)135*cc02d7e2SAndroid Build Coastguard Worker void set_status(int status) {
136*cc02d7e2SAndroid Build Coastguard Worker status_used_ = true;
137*cc02d7e2SAndroid Build Coastguard Worker status_ = status;
138*cc02d7e2SAndroid Build Coastguard Worker }
139*cc02d7e2SAndroid Build Coastguard Worker
140*cc02d7e2SAndroid Build Coastguard Worker private:
141*cc02d7e2SAndroid Build Coastguard Worker bool value_used_;
142*cc02d7e2SAndroid Build Coastguard Worker double value_;
143*cc02d7e2SAndroid Build Coastguard Worker bool status_used_;
144*cc02d7e2SAndroid Build Coastguard Worker int status_;
145*cc02d7e2SAndroid Build Coastguard Worker };
146*cc02d7e2SAndroid Build Coastguard Worker
147*cc02d7e2SAndroid Build Coastguard Worker typedef std::unordered_map<int, int64_t> StatusHistogram;
148*cc02d7e2SAndroid Build Coastguard Worker
MergeStatusHistogram(const StatusHistogram & from,StatusHistogram * to)149*cc02d7e2SAndroid Build Coastguard Worker inline void MergeStatusHistogram(const StatusHistogram& from,
150*cc02d7e2SAndroid Build Coastguard Worker StatusHistogram* to) {
151*cc02d7e2SAndroid Build Coastguard Worker for (StatusHistogram::const_iterator it = from.begin(); it != from.end();
152*cc02d7e2SAndroid Build Coastguard Worker ++it) {
153*cc02d7e2SAndroid Build Coastguard Worker (*to)[it->first] += it->second;
154*cc02d7e2SAndroid Build Coastguard Worker }
155*cc02d7e2SAndroid Build Coastguard Worker }
156*cc02d7e2SAndroid Build Coastguard Worker
157*cc02d7e2SAndroid Build Coastguard Worker class Client {
158*cc02d7e2SAndroid Build Coastguard Worker public:
Client()159*cc02d7e2SAndroid Build Coastguard Worker Client()
160*cc02d7e2SAndroid Build Coastguard Worker : timer_(new UsageTimer),
161*cc02d7e2SAndroid Build Coastguard Worker interarrival_timer_(),
162*cc02d7e2SAndroid Build Coastguard Worker started_requests_(false),
163*cc02d7e2SAndroid Build Coastguard Worker last_reset_poll_count_(0) {
164*cc02d7e2SAndroid Build Coastguard Worker gpr_event_init(&start_requests_);
165*cc02d7e2SAndroid Build Coastguard Worker }
~Client()166*cc02d7e2SAndroid Build Coastguard Worker virtual ~Client() {}
167*cc02d7e2SAndroid Build Coastguard Worker
Mark(bool reset)168*cc02d7e2SAndroid Build Coastguard Worker ClientStats Mark(bool reset) {
169*cc02d7e2SAndroid Build Coastguard Worker Histogram latencies;
170*cc02d7e2SAndroid Build Coastguard Worker StatusHistogram statuses;
171*cc02d7e2SAndroid Build Coastguard Worker UsageTimer::Result timer_result;
172*cc02d7e2SAndroid Build Coastguard Worker
173*cc02d7e2SAndroid Build Coastguard Worker MaybeStartRequests();
174*cc02d7e2SAndroid Build Coastguard Worker
175*cc02d7e2SAndroid Build Coastguard Worker int cur_poll_count = GetPollCount();
176*cc02d7e2SAndroid Build Coastguard Worker int poll_count = cur_poll_count - last_reset_poll_count_;
177*cc02d7e2SAndroid Build Coastguard Worker if (reset) {
178*cc02d7e2SAndroid Build Coastguard Worker std::vector<Histogram> to_merge(threads_.size());
179*cc02d7e2SAndroid Build Coastguard Worker std::vector<StatusHistogram> to_merge_status(threads_.size());
180*cc02d7e2SAndroid Build Coastguard Worker
181*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < threads_.size(); i++) {
182*cc02d7e2SAndroid Build Coastguard Worker threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
183*cc02d7e2SAndroid Build Coastguard Worker }
184*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<UsageTimer> timer(new UsageTimer);
185*cc02d7e2SAndroid Build Coastguard Worker timer_.swap(timer);
186*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < threads_.size(); i++) {
187*cc02d7e2SAndroid Build Coastguard Worker latencies.Merge(to_merge[i]);
188*cc02d7e2SAndroid Build Coastguard Worker MergeStatusHistogram(to_merge_status[i], &statuses);
189*cc02d7e2SAndroid Build Coastguard Worker }
190*cc02d7e2SAndroid Build Coastguard Worker timer_result = timer->Mark();
191*cc02d7e2SAndroid Build Coastguard Worker last_reset_poll_count_ = cur_poll_count;
192*cc02d7e2SAndroid Build Coastguard Worker } else {
193*cc02d7e2SAndroid Build Coastguard Worker // merge snapshots of each thread histogram
194*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < threads_.size(); i++) {
195*cc02d7e2SAndroid Build Coastguard Worker threads_[i]->MergeStatsInto(&latencies, &statuses);
196*cc02d7e2SAndroid Build Coastguard Worker }
197*cc02d7e2SAndroid Build Coastguard Worker timer_result = timer_->Mark();
198*cc02d7e2SAndroid Build Coastguard Worker }
199*cc02d7e2SAndroid Build Coastguard Worker
200*cc02d7e2SAndroid Build Coastguard Worker // Print the median latency per interval for one thread.
201*cc02d7e2SAndroid Build Coastguard Worker // If the number of warmup seconds is x, then the first x + 1 numbers in the
202*cc02d7e2SAndroid Build Coastguard Worker // vector are from the warmup period and should be discarded.
203*cc02d7e2SAndroid Build Coastguard Worker if (median_latency_collection_interval_seconds_ > 0) {
204*cc02d7e2SAndroid Build Coastguard Worker std::vector<double> medians_per_interval =
205*cc02d7e2SAndroid Build Coastguard Worker threads_[0]->GetMedianPerIntervalList();
206*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Num threads: %zu", threads_.size());
207*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Number of medians: %zu", medians_per_interval.size());
208*cc02d7e2SAndroid Build Coastguard Worker for (size_t j = 0; j < medians_per_interval.size(); j++) {
209*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
210*cc02d7e2SAndroid Build Coastguard Worker }
211*cc02d7e2SAndroid Build Coastguard Worker }
212*cc02d7e2SAndroid Build Coastguard Worker
213*cc02d7e2SAndroid Build Coastguard Worker ClientStats stats;
214*cc02d7e2SAndroid Build Coastguard Worker latencies.FillProto(stats.mutable_latencies());
215*cc02d7e2SAndroid Build Coastguard Worker for (StatusHistogram::const_iterator it = statuses.begin();
216*cc02d7e2SAndroid Build Coastguard Worker it != statuses.end(); ++it) {
217*cc02d7e2SAndroid Build Coastguard Worker RequestResultCount* rrc = stats.add_request_results();
218*cc02d7e2SAndroid Build Coastguard Worker rrc->set_status_code(it->first);
219*cc02d7e2SAndroid Build Coastguard Worker rrc->set_count(it->second);
220*cc02d7e2SAndroid Build Coastguard Worker }
221*cc02d7e2SAndroid Build Coastguard Worker stats.set_time_elapsed(timer_result.wall);
222*cc02d7e2SAndroid Build Coastguard Worker stats.set_time_system(timer_result.system);
223*cc02d7e2SAndroid Build Coastguard Worker stats.set_time_user(timer_result.user);
224*cc02d7e2SAndroid Build Coastguard Worker stats.set_cq_poll_count(poll_count);
225*cc02d7e2SAndroid Build Coastguard Worker return stats;
226*cc02d7e2SAndroid Build Coastguard Worker }
227*cc02d7e2SAndroid Build Coastguard Worker
228*cc02d7e2SAndroid Build Coastguard Worker // Must call AwaitThreadsCompletion before destructor to avoid a race
229*cc02d7e2SAndroid Build Coastguard Worker // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()230*cc02d7e2SAndroid Build Coastguard Worker void AwaitThreadsCompletion() {
231*cc02d7e2SAndroid Build Coastguard Worker gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
232*cc02d7e2SAndroid Build Coastguard Worker DestroyMultithreading();
233*cc02d7e2SAndroid Build Coastguard Worker std::unique_lock<std::mutex> g(thread_completion_mu_);
234*cc02d7e2SAndroid Build Coastguard Worker while (threads_remaining_ != 0) {
235*cc02d7e2SAndroid Build Coastguard Worker threads_complete_.wait(g);
236*cc02d7e2SAndroid Build Coastguard Worker }
237*cc02d7e2SAndroid Build Coastguard Worker }
238*cc02d7e2SAndroid Build Coastguard Worker
239*cc02d7e2SAndroid Build Coastguard Worker // Returns the interval (in seconds) between collecting latency medians. If 0,
240*cc02d7e2SAndroid Build Coastguard Worker // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()241*cc02d7e2SAndroid Build Coastguard Worker double GetLatencyCollectionIntervalInSeconds() {
242*cc02d7e2SAndroid Build Coastguard Worker return median_latency_collection_interval_seconds_;
243*cc02d7e2SAndroid Build Coastguard Worker }
244*cc02d7e2SAndroid Build Coastguard Worker
GetPollCount()245*cc02d7e2SAndroid Build Coastguard Worker virtual int GetPollCount() {
246*cc02d7e2SAndroid Build Coastguard Worker // For sync client.
247*cc02d7e2SAndroid Build Coastguard Worker return 0;
248*cc02d7e2SAndroid Build Coastguard Worker }
249*cc02d7e2SAndroid Build Coastguard Worker
IsClosedLoop()250*cc02d7e2SAndroid Build Coastguard Worker bool IsClosedLoop() { return closed_loop_; }
251*cc02d7e2SAndroid Build Coastguard Worker
NextIssueTime(int thread_idx)252*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec NextIssueTime(int thread_idx) {
253*cc02d7e2SAndroid Build Coastguard Worker const gpr_timespec result = next_time_[thread_idx];
254*cc02d7e2SAndroid Build Coastguard Worker next_time_[thread_idx] =
255*cc02d7e2SAndroid Build Coastguard Worker gpr_time_add(next_time_[thread_idx],
256*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
257*cc02d7e2SAndroid Build Coastguard Worker GPR_TIMESPAN));
258*cc02d7e2SAndroid Build Coastguard Worker return result;
259*cc02d7e2SAndroid Build Coastguard Worker }
260*cc02d7e2SAndroid Build Coastguard Worker
ThreadCompleted()261*cc02d7e2SAndroid Build Coastguard Worker bool ThreadCompleted() {
262*cc02d7e2SAndroid Build Coastguard Worker return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
263*cc02d7e2SAndroid Build Coastguard Worker }
264*cc02d7e2SAndroid Build Coastguard Worker
265*cc02d7e2SAndroid Build Coastguard Worker class Thread {
266*cc02d7e2SAndroid Build Coastguard Worker public:
Thread(Client * client,size_t idx)267*cc02d7e2SAndroid Build Coastguard Worker Thread(Client* client, size_t idx)
268*cc02d7e2SAndroid Build Coastguard Worker : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
269*cc02d7e2SAndroid Build Coastguard Worker
~Thread()270*cc02d7e2SAndroid Build Coastguard Worker ~Thread() { impl_.join(); }
271*cc02d7e2SAndroid Build Coastguard Worker
BeginSwap(Histogram * n,StatusHistogram * s)272*cc02d7e2SAndroid Build Coastguard Worker void BeginSwap(Histogram* n, StatusHistogram* s) {
273*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> g(mu_);
274*cc02d7e2SAndroid Build Coastguard Worker n->Swap(&histogram_);
275*cc02d7e2SAndroid Build Coastguard Worker s->swap(statuses_);
276*cc02d7e2SAndroid Build Coastguard Worker }
277*cc02d7e2SAndroid Build Coastguard Worker
MergeStatsInto(Histogram * hist,StatusHistogram * s)278*cc02d7e2SAndroid Build Coastguard Worker void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
279*cc02d7e2SAndroid Build Coastguard Worker std::unique_lock<std::mutex> g(mu_);
280*cc02d7e2SAndroid Build Coastguard Worker hist->Merge(histogram_);
281*cc02d7e2SAndroid Build Coastguard Worker MergeStatusHistogram(statuses_, s);
282*cc02d7e2SAndroid Build Coastguard Worker }
283*cc02d7e2SAndroid Build Coastguard Worker
GetMedianPerIntervalList()284*cc02d7e2SAndroid Build Coastguard Worker std::vector<double> GetMedianPerIntervalList() {
285*cc02d7e2SAndroid Build Coastguard Worker return medians_each_interval_list_;
286*cc02d7e2SAndroid Build Coastguard Worker }
287*cc02d7e2SAndroid Build Coastguard Worker
UpdateHistogram(HistogramEntry * entry)288*cc02d7e2SAndroid Build Coastguard Worker void UpdateHistogram(HistogramEntry* entry) {
289*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> g(mu_);
290*cc02d7e2SAndroid Build Coastguard Worker if (entry->value_used()) {
291*cc02d7e2SAndroid Build Coastguard Worker histogram_.Add(entry->value());
292*cc02d7e2SAndroid Build Coastguard Worker if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
293*cc02d7e2SAndroid Build Coastguard Worker histogram_per_interval_.Add(entry->value());
294*cc02d7e2SAndroid Build Coastguard Worker double now = UsageTimer::Now();
295*cc02d7e2SAndroid Build Coastguard Worker if ((now - interval_start_time_) >=
296*cc02d7e2SAndroid Build Coastguard Worker client_->GetLatencyCollectionIntervalInSeconds()) {
297*cc02d7e2SAndroid Build Coastguard Worker // Record the median latency of requests from the last interval.
298*cc02d7e2SAndroid Build Coastguard Worker // Divide by 1e3 to get microseconds.
299*cc02d7e2SAndroid Build Coastguard Worker medians_each_interval_list_.push_back(
300*cc02d7e2SAndroid Build Coastguard Worker histogram_per_interval_.Percentile(50) / 1e3);
301*cc02d7e2SAndroid Build Coastguard Worker histogram_per_interval_.Reset();
302*cc02d7e2SAndroid Build Coastguard Worker interval_start_time_ = now;
303*cc02d7e2SAndroid Build Coastguard Worker }
304*cc02d7e2SAndroid Build Coastguard Worker }
305*cc02d7e2SAndroid Build Coastguard Worker }
306*cc02d7e2SAndroid Build Coastguard Worker if (entry->status_used()) {
307*cc02d7e2SAndroid Build Coastguard Worker statuses_[entry->status()]++;
308*cc02d7e2SAndroid Build Coastguard Worker }
309*cc02d7e2SAndroid Build Coastguard Worker }
310*cc02d7e2SAndroid Build Coastguard Worker
311*cc02d7e2SAndroid Build Coastguard Worker private:
312*cc02d7e2SAndroid Build Coastguard Worker Thread(const Thread&);
313*cc02d7e2SAndroid Build Coastguard Worker Thread& operator=(const Thread&);
314*cc02d7e2SAndroid Build Coastguard Worker
ThreadFunc()315*cc02d7e2SAndroid Build Coastguard Worker void ThreadFunc() {
316*cc02d7e2SAndroid Build Coastguard Worker int wait_loop = 0;
317*cc02d7e2SAndroid Build Coastguard Worker while (!gpr_event_wait(
318*cc02d7e2SAndroid Build Coastguard Worker &client_->start_requests_,
319*cc02d7e2SAndroid Build Coastguard Worker gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
320*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
321*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
322*cc02d7e2SAndroid Build Coastguard Worker idx_, wait_loop);
323*cc02d7e2SAndroid Build Coastguard Worker wait_loop++;
324*cc02d7e2SAndroid Build Coastguard Worker }
325*cc02d7e2SAndroid Build Coastguard Worker
326*cc02d7e2SAndroid Build Coastguard Worker client_->ThreadFunc(idx_, this);
327*cc02d7e2SAndroid Build Coastguard Worker client_->CompleteThread();
328*cc02d7e2SAndroid Build Coastguard Worker }
329*cc02d7e2SAndroid Build Coastguard Worker
330*cc02d7e2SAndroid Build Coastguard Worker std::mutex mu_;
331*cc02d7e2SAndroid Build Coastguard Worker Histogram histogram_;
332*cc02d7e2SAndroid Build Coastguard Worker StatusHistogram statuses_;
333*cc02d7e2SAndroid Build Coastguard Worker Client* client_;
334*cc02d7e2SAndroid Build Coastguard Worker const size_t idx_;
335*cc02d7e2SAndroid Build Coastguard Worker std::thread impl_;
336*cc02d7e2SAndroid Build Coastguard Worker // The following are used only if
337*cc02d7e2SAndroid Build Coastguard Worker // median_latency_collection_interval_seconds_ is greater than 0
338*cc02d7e2SAndroid Build Coastguard Worker Histogram histogram_per_interval_;
339*cc02d7e2SAndroid Build Coastguard Worker std::vector<double> medians_each_interval_list_;
340*cc02d7e2SAndroid Build Coastguard Worker double interval_start_time_;
341*cc02d7e2SAndroid Build Coastguard Worker };
342*cc02d7e2SAndroid Build Coastguard Worker
343*cc02d7e2SAndroid Build Coastguard Worker protected:
344*cc02d7e2SAndroid Build Coastguard Worker bool closed_loop_;
345*cc02d7e2SAndroid Build Coastguard Worker gpr_atm thread_pool_done_;
346*cc02d7e2SAndroid Build Coastguard Worker double median_latency_collection_interval_seconds_; // In seconds
347*cc02d7e2SAndroid Build Coastguard Worker
StartThreads(size_t num_threads)348*cc02d7e2SAndroid Build Coastguard Worker void StartThreads(size_t num_threads) {
349*cc02d7e2SAndroid Build Coastguard Worker gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
350*cc02d7e2SAndroid Build Coastguard Worker threads_remaining_ = num_threads;
351*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_threads; i++) {
352*cc02d7e2SAndroid Build Coastguard Worker threads_.emplace_back(new Thread(this, i));
353*cc02d7e2SAndroid Build Coastguard Worker }
354*cc02d7e2SAndroid Build Coastguard Worker }
355*cc02d7e2SAndroid Build Coastguard Worker
EndThreads()356*cc02d7e2SAndroid Build Coastguard Worker void EndThreads() {
357*cc02d7e2SAndroid Build Coastguard Worker MaybeStartRequests();
358*cc02d7e2SAndroid Build Coastguard Worker threads_.clear();
359*cc02d7e2SAndroid Build Coastguard Worker }
360*cc02d7e2SAndroid Build Coastguard Worker
361*cc02d7e2SAndroid Build Coastguard Worker virtual void DestroyMultithreading() = 0;
362*cc02d7e2SAndroid Build Coastguard Worker
SetupLoadTest(const ClientConfig & config,size_t num_threads)363*cc02d7e2SAndroid Build Coastguard Worker void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
364*cc02d7e2SAndroid Build Coastguard Worker // Set up the load distribution based on the number of threads
365*cc02d7e2SAndroid Build Coastguard Worker const auto& load = config.load_params();
366*cc02d7e2SAndroid Build Coastguard Worker
367*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<RandomDistInterface> random_dist;
368*cc02d7e2SAndroid Build Coastguard Worker switch (load.load_case()) {
369*cc02d7e2SAndroid Build Coastguard Worker case LoadParams::kClosedLoop:
370*cc02d7e2SAndroid Build Coastguard Worker // Closed-loop doesn't use random dist at all
371*cc02d7e2SAndroid Build Coastguard Worker break;
372*cc02d7e2SAndroid Build Coastguard Worker case LoadParams::kPoisson:
373*cc02d7e2SAndroid Build Coastguard Worker random_dist = std::make_unique<ExpDist>(load.poisson().offered_load() /
374*cc02d7e2SAndroid Build Coastguard Worker num_threads);
375*cc02d7e2SAndroid Build Coastguard Worker break;
376*cc02d7e2SAndroid Build Coastguard Worker default:
377*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash("unreachable");
378*cc02d7e2SAndroid Build Coastguard Worker }
379*cc02d7e2SAndroid Build Coastguard Worker
380*cc02d7e2SAndroid Build Coastguard Worker // Set closed_loop_ based on whether or not random_dist is set
381*cc02d7e2SAndroid Build Coastguard Worker if (!random_dist) {
382*cc02d7e2SAndroid Build Coastguard Worker closed_loop_ = true;
383*cc02d7e2SAndroid Build Coastguard Worker } else {
384*cc02d7e2SAndroid Build Coastguard Worker closed_loop_ = false;
385*cc02d7e2SAndroid Build Coastguard Worker // set up interarrival timer according to random dist
386*cc02d7e2SAndroid Build Coastguard Worker interarrival_timer_.init(*random_dist, num_threads);
387*cc02d7e2SAndroid Build Coastguard Worker const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
388*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_threads; i++) {
389*cc02d7e2SAndroid Build Coastguard Worker next_time_.push_back(gpr_time_add(
390*cc02d7e2SAndroid Build Coastguard Worker now,
391*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
392*cc02d7e2SAndroid Build Coastguard Worker }
393*cc02d7e2SAndroid Build Coastguard Worker }
394*cc02d7e2SAndroid Build Coastguard Worker }
395*cc02d7e2SAndroid Build Coastguard Worker
NextIssuer(int thread_idx)396*cc02d7e2SAndroid Build Coastguard Worker std::function<gpr_timespec()> NextIssuer(int thread_idx) {
397*cc02d7e2SAndroid Build Coastguard Worker return closed_loop_ ? std::function<gpr_timespec()>()
398*cc02d7e2SAndroid Build Coastguard Worker : std::bind(&Client::NextIssueTime, this, thread_idx);
399*cc02d7e2SAndroid Build Coastguard Worker }
400*cc02d7e2SAndroid Build Coastguard Worker
401*cc02d7e2SAndroid Build Coastguard Worker virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
402*cc02d7e2SAndroid Build Coastguard Worker
403*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<Thread>> threads_;
404*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<UsageTimer> timer_;
405*cc02d7e2SAndroid Build Coastguard Worker
406*cc02d7e2SAndroid Build Coastguard Worker InterarrivalTimer interarrival_timer_;
407*cc02d7e2SAndroid Build Coastguard Worker std::vector<gpr_timespec> next_time_;
408*cc02d7e2SAndroid Build Coastguard Worker
409*cc02d7e2SAndroid Build Coastguard Worker std::mutex thread_completion_mu_;
410*cc02d7e2SAndroid Build Coastguard Worker size_t threads_remaining_;
411*cc02d7e2SAndroid Build Coastguard Worker std::condition_variable threads_complete_;
412*cc02d7e2SAndroid Build Coastguard Worker
413*cc02d7e2SAndroid Build Coastguard Worker gpr_event start_requests_;
414*cc02d7e2SAndroid Build Coastguard Worker bool started_requests_;
415*cc02d7e2SAndroid Build Coastguard Worker
416*cc02d7e2SAndroid Build Coastguard Worker int last_reset_poll_count_;
417*cc02d7e2SAndroid Build Coastguard Worker
MaybeStartRequests()418*cc02d7e2SAndroid Build Coastguard Worker void MaybeStartRequests() {
419*cc02d7e2SAndroid Build Coastguard Worker if (!started_requests_) {
420*cc02d7e2SAndroid Build Coastguard Worker started_requests_ = true;
421*cc02d7e2SAndroid Build Coastguard Worker gpr_event_set(&start_requests_, reinterpret_cast<void*>(1));
422*cc02d7e2SAndroid Build Coastguard Worker }
423*cc02d7e2SAndroid Build Coastguard Worker }
424*cc02d7e2SAndroid Build Coastguard Worker
CompleteThread()425*cc02d7e2SAndroid Build Coastguard Worker void CompleteThread() {
426*cc02d7e2SAndroid Build Coastguard Worker std::lock_guard<std::mutex> g(thread_completion_mu_);
427*cc02d7e2SAndroid Build Coastguard Worker threads_remaining_--;
428*cc02d7e2SAndroid Build Coastguard Worker if (threads_remaining_ == 0) {
429*cc02d7e2SAndroid Build Coastguard Worker threads_complete_.notify_all();
430*cc02d7e2SAndroid Build Coastguard Worker }
431*cc02d7e2SAndroid Build Coastguard Worker }
432*cc02d7e2SAndroid Build Coastguard Worker };
433*cc02d7e2SAndroid Build Coastguard Worker
434*cc02d7e2SAndroid Build Coastguard Worker template <class StubType, class RequestType>
435*cc02d7e2SAndroid Build Coastguard Worker class ClientImpl : public Client {
436*cc02d7e2SAndroid Build Coastguard Worker public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)437*cc02d7e2SAndroid Build Coastguard Worker ClientImpl(const ClientConfig& config,
438*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
439*cc02d7e2SAndroid Build Coastguard Worker create_stub)
440*cc02d7e2SAndroid Build Coastguard Worker : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
441*cc02d7e2SAndroid Build Coastguard Worker for (int i = 0; i < config.client_channels(); i++) {
442*cc02d7e2SAndroid Build Coastguard Worker channels_.emplace_back(
443*cc02d7e2SAndroid Build Coastguard Worker config.server_targets(i % config.server_targets_size()), config,
444*cc02d7e2SAndroid Build Coastguard Worker create_stub_, i);
445*cc02d7e2SAndroid Build Coastguard Worker }
446*cc02d7e2SAndroid Build Coastguard Worker WaitForChannelsToConnect();
447*cc02d7e2SAndroid Build Coastguard Worker median_latency_collection_interval_seconds_ =
448*cc02d7e2SAndroid Build Coastguard Worker config.median_latency_collection_interval_millis() / 1e3;
449*cc02d7e2SAndroid Build Coastguard Worker ClientRequestCreator<RequestType> create_req(&request_,
450*cc02d7e2SAndroid Build Coastguard Worker config.payload_config());
451*cc02d7e2SAndroid Build Coastguard Worker }
~ClientImpl()452*cc02d7e2SAndroid Build Coastguard Worker ~ClientImpl() override {}
request()453*cc02d7e2SAndroid Build Coastguard Worker const RequestType* request() { return &request_; }
454*cc02d7e2SAndroid Build Coastguard Worker
WaitForChannelsToConnect()455*cc02d7e2SAndroid Build Coastguard Worker void WaitForChannelsToConnect() {
456*cc02d7e2SAndroid Build Coastguard Worker int connect_deadline_seconds = 10;
457*cc02d7e2SAndroid Build Coastguard Worker // Allow optionally overriding connect_deadline in order
458*cc02d7e2SAndroid Build Coastguard Worker // to deal with benchmark environments in which the server
459*cc02d7e2SAndroid Build Coastguard Worker // can take a long time to become ready.
460*cc02d7e2SAndroid Build Coastguard Worker auto channel_connect_timeout_str =
461*cc02d7e2SAndroid Build Coastguard Worker grpc_core::GetEnv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
462*cc02d7e2SAndroid Build Coastguard Worker if (channel_connect_timeout_str.has_value() &&
463*cc02d7e2SAndroid Build Coastguard Worker !channel_connect_timeout_str->empty()) {
464*cc02d7e2SAndroid Build Coastguard Worker connect_deadline_seconds = atoi(channel_connect_timeout_str->c_str());
465*cc02d7e2SAndroid Build Coastguard Worker }
466*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO,
467*cc02d7e2SAndroid Build Coastguard Worker "Waiting for up to %d seconds for all channels to connect",
468*cc02d7e2SAndroid Build Coastguard Worker connect_deadline_seconds);
469*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec connect_deadline = gpr_time_add(
470*cc02d7e2SAndroid Build Coastguard Worker gpr_now(GPR_CLOCK_REALTIME),
471*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
472*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue cq;
473*cc02d7e2SAndroid Build Coastguard Worker size_t num_remaining = 0;
474*cc02d7e2SAndroid Build Coastguard Worker for (auto& c : channels_) {
475*cc02d7e2SAndroid Build Coastguard Worker if (!c.is_inproc()) {
476*cc02d7e2SAndroid Build Coastguard Worker Channel* channel = c.get_channel();
477*cc02d7e2SAndroid Build Coastguard Worker grpc_connectivity_state last_observed = channel->GetState(true);
478*cc02d7e2SAndroid Build Coastguard Worker if (last_observed == GRPC_CHANNEL_READY) {
479*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Channel %p connected!", channel);
480*cc02d7e2SAndroid Build Coastguard Worker } else {
481*cc02d7e2SAndroid Build Coastguard Worker num_remaining++;
482*cc02d7e2SAndroid Build Coastguard Worker channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
483*cc02d7e2SAndroid Build Coastguard Worker channel);
484*cc02d7e2SAndroid Build Coastguard Worker }
485*cc02d7e2SAndroid Build Coastguard Worker }
486*cc02d7e2SAndroid Build Coastguard Worker }
487*cc02d7e2SAndroid Build Coastguard Worker while (num_remaining > 0) {
488*cc02d7e2SAndroid Build Coastguard Worker bool ok = false;
489*cc02d7e2SAndroid Build Coastguard Worker void* tag = nullptr;
490*cc02d7e2SAndroid Build Coastguard Worker cq.Next(&tag, &ok);
491*cc02d7e2SAndroid Build Coastguard Worker Channel* channel = static_cast<Channel*>(tag);
492*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
493*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Crash(absl::StrFormat(
494*cc02d7e2SAndroid Build Coastguard Worker "Channel %p failed to connect within the deadline", channel));
495*cc02d7e2SAndroid Build Coastguard Worker } else {
496*cc02d7e2SAndroid Build Coastguard Worker grpc_connectivity_state last_observed = channel->GetState(true);
497*cc02d7e2SAndroid Build Coastguard Worker if (last_observed == GRPC_CHANNEL_READY) {
498*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Channel %p connected!", channel);
499*cc02d7e2SAndroid Build Coastguard Worker num_remaining--;
500*cc02d7e2SAndroid Build Coastguard Worker } else {
501*cc02d7e2SAndroid Build Coastguard Worker channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
502*cc02d7e2SAndroid Build Coastguard Worker channel);
503*cc02d7e2SAndroid Build Coastguard Worker }
504*cc02d7e2SAndroid Build Coastguard Worker }
505*cc02d7e2SAndroid Build Coastguard Worker }
506*cc02d7e2SAndroid Build Coastguard Worker }
507*cc02d7e2SAndroid Build Coastguard Worker
508*cc02d7e2SAndroid Build Coastguard Worker protected:
509*cc02d7e2SAndroid Build Coastguard Worker const int cores_;
510*cc02d7e2SAndroid Build Coastguard Worker RequestType request_;
511*cc02d7e2SAndroid Build Coastguard Worker
512*cc02d7e2SAndroid Build Coastguard Worker class ClientChannelInfo {
513*cc02d7e2SAndroid Build Coastguard Worker public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)514*cc02d7e2SAndroid Build Coastguard Worker ClientChannelInfo(
515*cc02d7e2SAndroid Build Coastguard Worker const std::string& target, const ClientConfig& config,
516*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
517*cc02d7e2SAndroid Build Coastguard Worker create_stub,
518*cc02d7e2SAndroid Build Coastguard Worker int shard) {
519*cc02d7e2SAndroid Build Coastguard Worker ChannelArguments args;
520*cc02d7e2SAndroid Build Coastguard Worker args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
521*cc02d7e2SAndroid Build Coastguard Worker set_channel_args(config, &args);
522*cc02d7e2SAndroid Build Coastguard Worker
523*cc02d7e2SAndroid Build Coastguard Worker std::string type;
524*cc02d7e2SAndroid Build Coastguard Worker if (config.has_security_params() &&
525*cc02d7e2SAndroid Build Coastguard Worker config.security_params().cred_type().empty()) {
526*cc02d7e2SAndroid Build Coastguard Worker type = kTlsCredentialsType;
527*cc02d7e2SAndroid Build Coastguard Worker } else {
528*cc02d7e2SAndroid Build Coastguard Worker type = config.security_params().cred_type();
529*cc02d7e2SAndroid Build Coastguard Worker }
530*cc02d7e2SAndroid Build Coastguard Worker
531*cc02d7e2SAndroid Build Coastguard Worker std::string inproc_pfx(INPROC_NAME_PREFIX);
532*cc02d7e2SAndroid Build Coastguard Worker if (!absl::StartsWith(target, inproc_pfx)) {
533*cc02d7e2SAndroid Build Coastguard Worker channel_ = CreateTestChannel(
534*cc02d7e2SAndroid Build Coastguard Worker target, type, config.security_params().server_host_override(),
535*cc02d7e2SAndroid Build Coastguard Worker !config.security_params().use_test_ca(),
536*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<CallCredentials>(), args);
537*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
538*cc02d7e2SAndroid Build Coastguard Worker is_inproc_ = false;
539*cc02d7e2SAndroid Build Coastguard Worker } else {
540*cc02d7e2SAndroid Build Coastguard Worker std::string tgt = target;
541*cc02d7e2SAndroid Build Coastguard Worker tgt.erase(0, inproc_pfx.length());
542*cc02d7e2SAndroid Build Coastguard Worker int srv_num = std::stoi(tgt);
543*cc02d7e2SAndroid Build Coastguard Worker channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
544*cc02d7e2SAndroid Build Coastguard Worker is_inproc_ = true;
545*cc02d7e2SAndroid Build Coastguard Worker }
546*cc02d7e2SAndroid Build Coastguard Worker stub_ = create_stub(channel_);
547*cc02d7e2SAndroid Build Coastguard Worker }
get_channel()548*cc02d7e2SAndroid Build Coastguard Worker Channel* get_channel() { return channel_.get(); }
get_stub()549*cc02d7e2SAndroid Build Coastguard Worker StubType* get_stub() { return stub_.get(); }
is_inproc()550*cc02d7e2SAndroid Build Coastguard Worker bool is_inproc() { return is_inproc_; }
551*cc02d7e2SAndroid Build Coastguard Worker
552*cc02d7e2SAndroid Build Coastguard Worker private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)553*cc02d7e2SAndroid Build Coastguard Worker void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
554*cc02d7e2SAndroid Build Coastguard Worker for (const auto& channel_arg : config.channel_args()) {
555*cc02d7e2SAndroid Build Coastguard Worker if (channel_arg.value_case() == ChannelArg::kStrValue) {
556*cc02d7e2SAndroid Build Coastguard Worker args->SetString(channel_arg.name(), channel_arg.str_value());
557*cc02d7e2SAndroid Build Coastguard Worker } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
558*cc02d7e2SAndroid Build Coastguard Worker args->SetInt(channel_arg.name(), channel_arg.int_value());
559*cc02d7e2SAndroid Build Coastguard Worker } else {
560*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_ERROR, "Empty channel arg value.");
561*cc02d7e2SAndroid Build Coastguard Worker }
562*cc02d7e2SAndroid Build Coastguard Worker }
563*cc02d7e2SAndroid Build Coastguard Worker }
564*cc02d7e2SAndroid Build Coastguard Worker
565*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<Channel> channel_;
566*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<StubType> stub_;
567*cc02d7e2SAndroid Build Coastguard Worker bool is_inproc_;
568*cc02d7e2SAndroid Build Coastguard Worker };
569*cc02d7e2SAndroid Build Coastguard Worker std::vector<ClientChannelInfo> channels_;
570*cc02d7e2SAndroid Build Coastguard Worker std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
571*cc02d7e2SAndroid Build Coastguard Worker create_stub_;
572*cc02d7e2SAndroid Build Coastguard Worker };
573*cc02d7e2SAndroid Build Coastguard Worker
574*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config);
575*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config);
576*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config);
577*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
578*cc02d7e2SAndroid Build Coastguard Worker const ClientConfig& config);
579*cc02d7e2SAndroid Build Coastguard Worker
580*cc02d7e2SAndroid Build Coastguard Worker } // namespace testing
581*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
582*cc02d7e2SAndroid Build Coastguard Worker
583*cc02d7e2SAndroid Build Coastguard Worker #endif // GRPC_TEST_CPP_QPS_CLIENT_H
584