1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 //
20 // Basic I/O ping-pong benchmarks.
21
22 // The goal here is to establish lower bounds on how fast the stack could get by
23 // measuring the cost of using various I/O strategies to do a basic
24 // request-response loop.
25 //
26
27 #include <errno.h>
28 #include <netinet/in.h>
29 #include <poll.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <unistd.h>
33 #ifdef __linux__
34 #include <sys/epoll.h>
35 #endif
36 #include <sys/socket.h>
37
38 #include <grpc/support/alloc.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/time.h>
41
42 #include "src/core/lib/gpr/useful.h"
43 #include "src/core/lib/gprpp/strerror.h"
44 #include "src/core/lib/gprpp/thd.h"
45 #include "src/core/lib/iomgr/error.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "test/core/util/cmdline.h"
48 #include "test/core/util/histogram.h"
49
// The two file descriptors one endpoint of the ping-pong uses: one to read
// from and one to write to. Both members may name the same descriptor.
struct fd_pair {
  int read_fd;
  int write_fd;
};
54
55 typedef struct thread_args {
56 fd_pair fds;
57 size_t msg_size;
58 int (*read_bytes)(struct thread_args* args, char* buf);
59 int (*write_bytes)(struct thread_args* args, char* buf);
60 int (*setup)(struct thread_args* args);
61 int epoll_fd;
62 const char* strategy_name;
63 } thread_args;
64
65 //
66 // Read strategies
67
68 // There are a number of read strategies, each of which has a blocking and
69 // non-blocking version.
70 //
71
72 // Basic call to read()
read_bytes(int fd,char * buf,size_t read_size,int spin)73 static int read_bytes(int fd, char* buf, size_t read_size, int spin) {
74 size_t bytes_read = 0;
75 ssize_t err;
76 do {
77 err = read(fd, buf + bytes_read, read_size - bytes_read);
78 if (err < 0) {
79 if (errno == EINTR) {
80 continue;
81 } else {
82 if (errno == EAGAIN && spin == 1) {
83 continue;
84 }
85 gpr_log(GPR_ERROR, "Read failed: %s",
86 grpc_core::StrError(errno).c_str());
87 return -1;
88 }
89 } else {
90 bytes_read += static_cast<size_t>(err);
91 }
92 } while (bytes_read < read_size);
93 return 0;
94 }
95
blocking_read_bytes(thread_args * args,char * buf)96 static int blocking_read_bytes(thread_args* args, char* buf) {
97 return read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
98 }
99
spin_read_bytes(thread_args * args,char * buf)100 static int spin_read_bytes(thread_args* args, char* buf) {
101 return read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
102 }
103
104 // Call poll() to monitor a non-blocking fd
poll_read_bytes(int fd,char * buf,size_t read_size,int spin)105 static int poll_read_bytes(int fd, char* buf, size_t read_size, int spin) {
106 struct pollfd pfd;
107 size_t bytes_read = 0;
108 int err;
109 ssize_t err2;
110
111 pfd.fd = fd;
112 pfd.events = POLLIN;
113 do {
114 err = poll(&pfd, 1, spin ? 0 : -1);
115 if (err < 0) {
116 if (errno == EINTR) {
117 continue;
118 } else {
119 gpr_log(GPR_ERROR, "Poll failed: %s",
120 grpc_core::StrError(errno).c_str());
121 return -1;
122 }
123 }
124 if (err == 0 && spin) continue;
125 GPR_ASSERT(err == 1);
126 GPR_ASSERT(pfd.revents == POLLIN);
127 do {
128 err2 = read(fd, buf + bytes_read, read_size - bytes_read);
129 } while (err2 < 0 && errno == EINTR);
130 if (err2 < 0 && errno != EAGAIN) {
131 gpr_log(GPR_ERROR, "Read failed: %s", grpc_core::StrError(errno).c_str());
132 return -1;
133 }
134 bytes_read += static_cast<size_t>(err2);
135 } while (bytes_read < read_size);
136 return 0;
137 }
138
poll_read_bytes_blocking(struct thread_args * args,char * buf)139 static int poll_read_bytes_blocking(struct thread_args* args, char* buf) {
140 return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
141 }
142
poll_read_bytes_spin(struct thread_args * args,char * buf)143 static int poll_read_bytes_spin(struct thread_args* args, char* buf) {
144 return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
145 }
146
147 #ifdef __linux__
148 // Call epoll_wait() to monitor a non-blocking fd
epoll_read_bytes(struct thread_args * args,char * buf,int spin)149 static int epoll_read_bytes(struct thread_args* args, char* buf, int spin) {
150 struct epoll_event ev;
151 size_t bytes_read = 0;
152 int err;
153 ssize_t err2;
154 size_t read_size = args->msg_size;
155
156 do {
157 err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1);
158 if (err < 0) {
159 if (errno == EINTR) continue;
160 gpr_log(GPR_ERROR, "epoll_wait failed: %s",
161 grpc_core::StrError(errno).c_str());
162 return -1;
163 }
164 if (err == 0 && spin) continue;
165 GPR_ASSERT(err == 1);
166 GPR_ASSERT(ev.events & EPOLLIN);
167 GPR_ASSERT(ev.data.fd == args->fds.read_fd);
168 do {
169 do {
170 err2 =
171 read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read);
172 } while (err2 < 0 && errno == EINTR);
173 if (errno == EAGAIN) break;
174 bytes_read += static_cast<size_t>(err2);
175 // TODO(klempner): This should really be doing an extra call after we are
176 // done to ensure we see an EAGAIN
177 } while (bytes_read < read_size);
178 } while (bytes_read < read_size);
179 GPR_ASSERT(bytes_read == read_size);
180 return 0;
181 }
182
epoll_read_bytes_blocking(struct thread_args * args,char * buf)183 static int epoll_read_bytes_blocking(struct thread_args* args, char* buf) {
184 return epoll_read_bytes(args, buf, 0);
185 }
186
epoll_read_bytes_spin(struct thread_args * args,char * buf)187 static int epoll_read_bytes_spin(struct thread_args* args, char* buf) {
188 return epoll_read_bytes(args, buf, 1);
189 }
190 #endif // __linux__
191
192 // Write out bytes.
193 // At this point we only have one strategy, since in the common case these
194 // writes go directly out to the kernel.
195 //
blocking_write_bytes(struct thread_args * args,char * buf)196 static int blocking_write_bytes(struct thread_args* args, char* buf) {
197 size_t bytes_written = 0;
198 ssize_t err;
199 size_t write_size = args->msg_size;
200 do {
201 err = write(args->fds.write_fd, buf + bytes_written,
202 write_size - bytes_written);
203 if (err < 0) {
204 if (errno == EINTR) {
205 continue;
206 } else {
207 gpr_log(GPR_ERROR, "Read failed: %s",
208 grpc_core::StrError(errno).c_str());
209 return -1;
210 }
211 } else {
212 bytes_written += static_cast<size_t>(err);
213 }
214 } while (bytes_written < write_size);
215 return 0;
216 }
217
218 //
219 // Initialization code
220
221 // These are called at the beginning of the client and server thread, depending
222 // on the scenario we're using.
223 //
set_socket_nonblocking(thread_args * args)224 static int set_socket_nonblocking(thread_args* args) {
225 if (!GRPC_LOG_IF_ERROR("Unable to set read socket nonblocking",
226 grpc_set_socket_nonblocking(args->fds.read_fd, 1))) {
227 return -1;
228 }
229 if (!GRPC_LOG_IF_ERROR("Unable to set write socket nonblocking",
230 grpc_set_socket_nonblocking(args->fds.write_fd, 1))) {
231 return -1;
232 }
233 return 0;
234 }
235
do_nothing(thread_args *)236 static int do_nothing(thread_args* /*args*/) { return 0; }
237
238 #ifdef __linux__
239 // Special case for epoll, where we need to create the fd ahead of time.
epoll_setup(thread_args * args)240 static int epoll_setup(thread_args* args) {
241 int epoll_fd;
242 struct epoll_event ev;
243 set_socket_nonblocking(args);
244 epoll_fd = epoll_create(1);
245 if (epoll_fd < 0) {
246 gpr_log(GPR_ERROR, "epoll_create: %s", grpc_core::StrError(errno).c_str());
247 return -1;
248 }
249
250 args->epoll_fd = epoll_fd;
251
252 ev.events = EPOLLIN | EPOLLET;
253 ev.data.fd = args->fds.read_fd;
254 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) {
255 gpr_log(GPR_ERROR, "epoll_ctl: %s", grpc_core::StrError(errno).c_str());
256 }
257 return 0;
258 }
259 #endif
260
server_thread(thread_args * args)261 static void server_thread(thread_args* args) {
262 char* buf = static_cast<char*>(gpr_malloc(args->msg_size));
263 if (args->setup(args) < 0) {
264 gpr_log(GPR_ERROR, "Setup failed");
265 }
266 for (;;) {
267 if (args->read_bytes(args, buf) < 0) {
268 gpr_log(GPR_ERROR, "Server read failed");
269 gpr_free(buf);
270 return;
271 }
272 if (args->write_bytes(args, buf) < 0) {
273 gpr_log(GPR_ERROR, "Server write failed");
274 gpr_free(buf);
275 return;
276 }
277 }
278 }
279
server_thread_wrap(void * arg)280 static void server_thread_wrap(void* arg) {
281 thread_args* args = static_cast<thread_args*>(arg);
282 server_thread(args);
283 }
284
print_histogram(grpc_histogram * histogram)285 static void print_histogram(grpc_histogram* histogram) {
286 // TODO(klempner): Print more detailed information, such as detailed histogram
287 // buckets
288 gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
289 grpc_histogram_percentile(histogram, 50),
290 grpc_histogram_percentile(histogram, 95),
291 grpc_histogram_percentile(histogram, 99),
292 grpc_histogram_percentile(histogram, 99.9));
293 }
294
now(void)295 static double now(void) {
296 gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
297 return 1e9 * static_cast<double>(tv.tv_sec) + static_cast<double>(tv.tv_nsec);
298 }
299
client_thread(thread_args * args)300 static void client_thread(thread_args* args) {
301 char* buf = static_cast<char*>(gpr_malloc(args->msg_size * sizeof(char)));
302 memset(buf, 0, args->msg_size * sizeof(char));
303 grpc_histogram* histogram = grpc_histogram_create(0.01, 60e9);
304 double start_time;
305 double end_time;
306 double interval;
307 const int kNumIters = 100000;
308 int i;
309
310 if (args->setup(args) < 0) {
311 gpr_log(GPR_ERROR, "Setup failed");
312 }
313 for (i = 0; i < kNumIters; ++i) {
314 start_time = now();
315 if (args->write_bytes(args, buf) < 0) {
316 gpr_log(GPR_ERROR, "Client write failed");
317 goto error;
318 }
319 if (args->read_bytes(args, buf) < 0) {
320 gpr_log(GPR_ERROR, "Client read failed");
321 goto error;
322 }
323 end_time = now();
324 if (i > kNumIters / 2) {
325 interval = end_time - start_time;
326 grpc_histogram_add(histogram, interval);
327 }
328 }
329 print_histogram(histogram);
330 error:
331 gpr_free(buf);
332 grpc_histogram_destroy(histogram);
333 }
334
335 // This roughly matches tcp_server's create_listening_socket
create_listening_socket(struct sockaddr * port,socklen_t len)336 static int create_listening_socket(struct sockaddr* port, socklen_t len) {
337 int fd = socket(port->sa_family, SOCK_STREAM, 0);
338 if (fd < 0) {
339 gpr_log(GPR_ERROR, "Unable to create socket: %s",
340 grpc_core::StrError(errno).c_str());
341 goto error;
342 }
343
344 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket cloexec",
345 grpc_set_socket_cloexec(fd, 1))) {
346 goto error;
347 }
348 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket low latency",
349 grpc_set_socket_low_latency(fd, 1))) {
350 goto error;
351 }
352 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket reuse addr",
353 grpc_set_socket_reuse_addr(fd, 1))) {
354 goto error;
355 }
356
357 if (bind(fd, port, len) < 0) {
358 gpr_log(GPR_ERROR, "bind: %s", grpc_core::StrError(errno).c_str());
359 goto error;
360 }
361
362 if (listen(fd, 1) < 0) {
363 gpr_log(GPR_ERROR, "listen: %s", grpc_core::StrError(errno).c_str());
364 goto error;
365 }
366
367 if (getsockname(fd, port, &len) < 0) {
368 gpr_log(GPR_ERROR, "getsockname: %s", grpc_core::StrError(errno).c_str());
369 goto error;
370 }
371
372 return fd;
373
374 error:
375 if (fd >= 0) {
376 close(fd);
377 }
378 return -1;
379 }
380
connect_client(struct sockaddr * addr,socklen_t len)381 static int connect_client(struct sockaddr* addr, socklen_t len) {
382 int fd = socket(addr->sa_family, SOCK_STREAM, 0);
383 int err;
384 if (fd < 0) {
385 gpr_log(GPR_ERROR, "Unable to create socket: %s",
386 grpc_core::StrError(errno).c_str());
387 goto error;
388 }
389
390 if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket cloexec",
391 grpc_set_socket_cloexec(fd, 1))) {
392 goto error;
393 }
394 if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket low latency",
395 grpc_set_socket_low_latency(fd, 1))) {
396 goto error;
397 }
398
399 do {
400 err = connect(fd, addr, len);
401 } while (err < 0 && errno == EINTR);
402
403 if (err < 0) {
404 gpr_log(GPR_ERROR, "connect error: %s", grpc_core::StrError(errno).c_str());
405 goto error;
406 }
407 return fd;
408
409 error:
410 if (fd >= 0) {
411 close(fd);
412 }
413 return -1;
414 }
415
accept_server(int listen_fd)416 static int accept_server(int listen_fd) {
417 int fd = accept(listen_fd, nullptr, nullptr);
418 if (fd < 0) {
419 gpr_log(GPR_ERROR, "Accept failed: %s", grpc_core::StrError(errno).c_str());
420 return -1;
421 }
422 return fd;
423 }
424
create_sockets_tcp(fd_pair * client_fds,fd_pair * server_fds)425 static int create_sockets_tcp(fd_pair* client_fds, fd_pair* server_fds) {
426 int listen_fd = -1;
427 int client_fd = -1;
428 int server_fd = -1;
429
430 struct sockaddr_in port;
431 struct sockaddr* sa_port = reinterpret_cast<struct sockaddr*>(&port);
432
433 port.sin_family = AF_INET;
434 port.sin_port = 0;
435 port.sin_addr.s_addr = INADDR_ANY;
436
437 listen_fd = create_listening_socket(sa_port, sizeof(port));
438 if (listen_fd == -1) {
439 gpr_log(GPR_ERROR, "Listen failed");
440 goto error;
441 }
442
443 client_fd = connect_client(sa_port, sizeof(port));
444 if (client_fd == -1) {
445 gpr_log(GPR_ERROR, "Connect failed");
446 goto error;
447 }
448
449 server_fd = accept_server(listen_fd);
450 if (server_fd == -1) {
451 gpr_log(GPR_ERROR, "Accept failed");
452 goto error;
453 }
454
455 client_fds->read_fd = client_fd;
456 client_fds->write_fd = client_fd;
457 server_fds->read_fd = server_fd;
458 server_fds->write_fd = server_fd;
459 close(listen_fd);
460 return 0;
461
462 error:
463 if (listen_fd != -1) {
464 close(listen_fd);
465 }
466 if (client_fd != -1) {
467 close(client_fd);
468 }
469 if (server_fd != -1) {
470 close(server_fd);
471 }
472 return -1;
473 }
474
create_sockets_socketpair(fd_pair * client_fds,fd_pair * server_fds)475 static int create_sockets_socketpair(fd_pair* client_fds, fd_pair* server_fds) {
476 int fds[2];
477 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
478 gpr_log(GPR_ERROR, "socketpair: %s", grpc_core::StrError(errno).c_str());
479 return -1;
480 }
481
482 client_fds->read_fd = fds[0];
483 client_fds->write_fd = fds[0];
484 server_fds->read_fd = fds[1];
485 server_fds->write_fd = fds[1];
486 return 0;
487 }
488
create_sockets_pipe(fd_pair * client_fds,fd_pair * server_fds)489 static int create_sockets_pipe(fd_pair* client_fds, fd_pair* server_fds) {
490 int cfds[2];
491 int sfds[2];
492 if (pipe(cfds) < 0) {
493 gpr_log(GPR_ERROR, "pipe: %s", grpc_core::StrError(errno).c_str());
494 return -1;
495 }
496
497 if (pipe(sfds) < 0) {
498 gpr_log(GPR_ERROR, "pipe: %s", grpc_core::StrError(errno).c_str());
499 return -1;
500 }
501
502 client_fds->read_fd = cfds[0];
503 client_fds->write_fd = cfds[1];
504 server_fds->read_fd = sfds[0];
505 server_fds->write_fd = sfds[1];
506 return 0;
507 }
508
// Help text for the read_strategy command-line flag. The epoll entries are
// only compiled in on Linux.
static const char* read_strategy_usage =
    "Strategy for doing reads, which is one of:\n"
    " blocking: blocking read calls\n"
    " same_thread_poll: poll() call on same thread \n"
#ifdef __linux__
    " same_thread_epoll: epoll_wait() on same thread \n"
#endif  // __linux__
    " spin_read: spinning non-blocking read() calls \n"
    " spin_poll: spinning 0 timeout poll() calls \n"
#ifdef __linux__
    " spin_epoll: spinning 0 timeout epoll_wait() calls \n"
#endif  // __linux__
    ;
522
// Help text for the socket_type command-line flag.
static const char* socket_type_usage =
    "Type of socket used, one of:\n"
    " tcp: fds are endpoints of a TCP connection\n"
    " socketpair: fds come from socketpair()\n"
    " pipe: fds come from pipe()\n"
    ;
528
// Prints a usage summary to stderr, with argv0 as the program name, and
// flushes stderr. The epoll strategies are only listed on Linux.
void print_usage(char* argv0) {
  // Fixed part of the message, printed verbatim after the two header lines.
  static const char* const kOptionLines[] = {
      "where read_strategy is one of:\n",
      " blocking: blocking read calls\n",
      " same_thread_poll: poll() call on same thread \n",
#ifdef __linux__
      " same_thread_epoll: epoll_wait() on same thread \n",
#endif
      " spin_read: spinning non-blocking read() calls \n",
      " spin_poll: spinning 0 timeout poll() calls \n",
#ifdef __linux__
      " spin_epoll: spinning 0 timeout epoll_wait() calls \n",
#endif
      "and socket_type is one of:\n",
      " tcp: fds are endpoints of a TCP connection\n",
      " socketpair: fds come from socketpair()\n",
      " pipe: fds come from pipe()\n",
  };

  fprintf(stderr, "%s usage:\n\n", argv0);
  fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0);
  for (const char* line : kOptionLines) {
    fputs(line, stderr);
  }
  fflush(stderr);
}
549
550 typedef struct test_strategy {
551 const char* name;
552 int (*read_strategy)(struct thread_args* args, char* buf);
553 int (*setup)(struct thread_args* args);
554 } test_strategy;
555
556 static test_strategy test_strategies[] = {
557 {"blocking", blocking_read_bytes, do_nothing},
558 {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking},
559 #ifdef __linux__
560 {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup},
561 {"spin_epoll", epoll_read_bytes_spin, epoll_setup},
562 #endif // __linux__
563 {"spin_read", spin_read_bytes, set_socket_nonblocking},
564 {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}};
565
// Socket type names accepted by create_socket(); run_all_benchmarks() runs
// every strategy against each of them in this order.
static const char* socket_types[] = {
    "tcp",
    "socketpair",
    "pipe",
};
567
create_socket(const char * socket_type,fd_pair * client_fds,fd_pair * server_fds)568 int create_socket(const char* socket_type, fd_pair* client_fds,
569 fd_pair* server_fds) {
570 if (strcmp(socket_type, "tcp") == 0) {
571 create_sockets_tcp(client_fds, server_fds);
572 } else if (strcmp(socket_type, "socketpair") == 0) {
573 create_sockets_socketpair(client_fds, server_fds);
574 } else if (strcmp(socket_type, "pipe") == 0) {
575 create_sockets_pipe(client_fds, server_fds);
576 } else {
577 fprintf(stderr, "Invalid socket type %s\n", socket_type);
578 fflush(stderr);
579 return -1;
580 }
581 return 0;
582 }
583
run_benchmark(const char * socket_type,thread_args * client_args,thread_args * server_args)584 static int run_benchmark(const char* socket_type, thread_args* client_args,
585 thread_args* server_args) {
586 int rv = 0;
587
588 rv = create_socket(socket_type, &client_args->fds, &server_args->fds);
589 if (rv < 0) {
590 return rv;
591 }
592
593 gpr_log(GPR_INFO, "Starting test %s %s %zu", client_args->strategy_name,
594 socket_type, client_args->msg_size);
595
596 grpc_core::Thread server("server_thread", server_thread_wrap, server_args);
597 server.Start();
598 client_thread(client_args);
599 server.Join();
600
601 return 0;
602 }
603
run_all_benchmarks(size_t msg_size)604 static int run_all_benchmarks(size_t msg_size) {
605 int error = 0;
606 size_t i;
607 for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
608 test_strategy* strategy = &test_strategies[i];
609 size_t j;
610 for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
611 thread_args* client_args =
612 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
613 thread_args* server_args =
614 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
615 const char* socket_type = socket_types[j];
616
617 client_args->read_bytes = strategy->read_strategy;
618 client_args->write_bytes = blocking_write_bytes;
619 client_args->setup = strategy->setup;
620 client_args->msg_size = msg_size;
621 client_args->strategy_name = strategy->name;
622 server_args->read_bytes = strategy->read_strategy;
623 server_args->write_bytes = blocking_write_bytes;
624 server_args->setup = strategy->setup;
625 server_args->msg_size = msg_size;
626 server_args->strategy_name = strategy->name;
627 error = run_benchmark(socket_type, client_args, server_args);
628 if (error < 0) {
629 return error;
630 }
631 }
632 }
633 return error;
634 }
635
main(int argc,char ** argv)636 int main(int argc, char** argv) {
637 thread_args* client_args =
638 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
639 thread_args* server_args =
640 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
641 int msg_size = -1;
642 const char* read_strategy = nullptr;
643 const char* socket_type = nullptr;
644 size_t i;
645 const test_strategy* strategy = nullptr;
646 int error = 0;
647
648 gpr_cmdline* cmdline =
649 gpr_cmdline_create("low_level_ping_pong network benchmarking tool");
650
651 gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size);
652 gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage,
653 &read_strategy);
654 gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage,
655 &socket_type);
656
657 gpr_cmdline_parse(cmdline, argc, argv);
658
659 if (msg_size == -1) {
660 msg_size = 50;
661 }
662
663 if (read_strategy == nullptr) {
664 gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
665 return run_all_benchmarks(static_cast<size_t>(msg_size));
666 }
667
668 if (socket_type == nullptr) {
669 socket_type = "tcp";
670 }
671 if (msg_size <= 0) {
672 fprintf(stderr, "msg_size must be > 0\n");
673 fflush(stderr);
674 print_usage(argv[0]);
675 return -1;
676 }
677
678 for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
679 if (strcmp(test_strategies[i].name, read_strategy) == 0) {
680 strategy = &test_strategies[i];
681 }
682 }
683 if (strategy == nullptr) {
684 fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
685 fflush(stderr);
686 return -1;
687 }
688
689 client_args->read_bytes = strategy->read_strategy;
690 client_args->write_bytes = blocking_write_bytes;
691 client_args->setup = strategy->setup;
692 client_args->msg_size = static_cast<size_t>(msg_size);
693 client_args->strategy_name = read_strategy;
694 server_args->read_bytes = strategy->read_strategy;
695 server_args->write_bytes = blocking_write_bytes;
696 server_args->setup = strategy->setup;
697 server_args->msg_size = static_cast<size_t>(msg_size);
698 server_args->strategy_name = read_strategy;
699
700 error = run_benchmark(socket_type, client_args, server_args);
701
702 gpr_cmdline_destroy(cmdline);
703 return error;
704 }
705