xref: /aosp_15_r20/external/grpc-grpc-java/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadClient.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.driver;
18 
19 import com.sun.management.OperatingSystemMXBean;
20 import io.grpc.CallOptions;
21 import io.grpc.Channel;
22 import io.grpc.ClientCall;
23 import io.grpc.ManagedChannel;
24 import io.grpc.Metadata;
25 import io.grpc.Status;
26 import io.grpc.benchmarks.Transport;
27 import io.grpc.benchmarks.Utils;
28 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
29 import io.grpc.benchmarks.proto.Control;
30 import io.grpc.benchmarks.proto.Messages;
31 import io.grpc.benchmarks.proto.Payloads;
32 import io.grpc.benchmarks.proto.Stats;
33 import io.grpc.stub.ClientCalls;
34 import io.grpc.stub.StreamObserver;
35 import io.netty.buffer.ByteBuf;
36 import io.netty.buffer.PooledByteBufAllocator;
37 import io.netty.channel.epoll.Epoll;
38 import io.netty.util.concurrent.DefaultThreadFactory;
39 import java.lang.management.ManagementFactory;
40 import java.util.List;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Semaphore;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicReference;
46 import java.util.concurrent.locks.LockSupport;
47 import java.util.logging.Level;
48 import java.util.logging.Logger;
49 import org.HdrHistogram.Histogram;
50 import org.HdrHistogram.LogarithmicIterator;
51 import org.HdrHistogram.Recorder;
52 import org.apache.commons.math3.distribution.ExponentialDistribution;
53 
54 /**
55  * Implements the client-side contract for the load testing scenarios.
56  */
57 class LoadClient {
58 
59   private static final Logger log = Logger.getLogger(LoadClient.class.getName());
60   private ByteBuf genericRequest;
61 
62   private final Control.ClientConfig config;
63   private final ExponentialDistribution distribution;
64   private volatile boolean shutdown;
65   private final int threadCount;
66 
67   ManagedChannel[] channels;
68   BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[] blockingStubs;
69   BenchmarkServiceGrpc.BenchmarkServiceStub[] asyncStubs;
70   Recorder recorder;
71   private ExecutorService fixedThreadPool;
72   private Messages.SimpleRequest simpleRequest;
73   private final OperatingSystemMXBean osBean;
74   private long lastMarkCpuTime;
75 
LoadClient(Control.ClientConfig config)76   LoadClient(Control.ClientConfig config) throws Exception {
77     log.log(Level.INFO, "Client Config \n" + config.toString());
78     this.config = config;
79     // Create the channels
80     channels = new ManagedChannel[config.getClientChannels()];
81     for (int i = 0; i < config.getClientChannels(); i++) {
82       channels[i] =
83           Utils.newClientChannel(
84               Epoll.isAvailable() ?  Transport.NETTY_EPOLL : Transport.NETTY_NIO,
85               config.getServerTargets(i % config.getServerTargetsCount()),
86               config.hasSecurityParams(),
87               config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(),
88               config.hasSecurityParams()
89                   ? config.getSecurityParams().getServerHostOverride()
90                   : null,
91               Utils.DEFAULT_FLOW_CONTROL_WINDOW,
92               false);
93     }
94 
95     // Create a stub per channel
96     if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
97       asyncStubs = new BenchmarkServiceGrpc.BenchmarkServiceStub[channels.length];
98       for (int i = 0; i < channels.length; i++) {
99         asyncStubs[i] = BenchmarkServiceGrpc.newStub(channels[i]);
100       }
101     } else {
102       blockingStubs = new BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[channels.length];
103       for (int i = 0; i < channels.length; i++) {
104         blockingStubs[i] = BenchmarkServiceGrpc.newBlockingStub(channels[i]);
105       }
106     }
107 
108     // Determine no of threads
109     if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
110       threadCount = config.getOutstandingRpcsPerChannel() * config.getClientChannels();
111     } else {
112       threadCount = config.getAsyncClientThreads() == 0
113           ? Runtime.getRuntime().availableProcessors()
114           : config.getAsyncClientThreads();
115     }
116     // Use a fixed sized pool of daemon threads.
117     fixedThreadPool = Executors.newFixedThreadPool(threadCount,
118         new DefaultThreadFactory("client-worker", true));
119 
120     // Create the load distribution
121     switch (config.getLoadParams().getLoadCase()) {
122       case CLOSED_LOOP:
123         distribution = null;
124         break;
125       case LOAD_NOT_SET:
126         distribution = null;
127         break;
128       case POISSON:
129         // Mean of exp distribution per thread is <no threads> / <offered load per second>
130         distribution = new ExponentialDistribution(
131             threadCount / config.getLoadParams().getPoisson().getOfferedLoad());
132         break;
133       default:
134         throw new IllegalArgumentException("Scenario not implemented");
135     }
136 
137     // Create payloads
138     switch (config.getPayloadConfig().getPayloadCase()) {
139       case SIMPLE_PARAMS: {
140         Payloads.SimpleProtoParams simpleParams = config.getPayloadConfig().getSimpleParams();
141         simpleRequest = Utils.makeRequest(Messages.PayloadType.COMPRESSABLE,
142             simpleParams.getReqSize(), simpleParams.getRespSize());
143         break;
144       }
145       case BYTEBUF_PARAMS: {
146         PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
147         genericRequest = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize());
148         if (genericRequest.capacity() > 0) {
149           genericRequest.writerIndex(genericRequest.capacity() - 1);
150         }
151         break;
152       }
153       default: {
154         // Not implemented yet
155         throw new IllegalArgumentException("Scenario not implemented");
156       }
157     }
158 
159     List<OperatingSystemMXBean> beans =
160         ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class);
161     if (!beans.isEmpty()) {
162       osBean = beans.get(0);
163     } else {
164       osBean = null;
165     }
166 
167     // Create the histogram recorder
168     recorder = new Recorder((long) config.getHistogramParams().getMaxPossible(), 3);
169   }
170 
171   /**
172    * Start the load scenario.
173    */
start()174   void start() {
175     Runnable r;
176     for (int i = 0; i < threadCount; i++) {
177       r = null;
178       switch (config.getPayloadConfig().getPayloadCase()) {
179         case SIMPLE_PARAMS: {
180           if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
181             if (config.getRpcType() == Control.RpcType.UNARY) {
182               r = new BlockingUnaryWorker(blockingStubs[i % blockingStubs.length]);
183             }
184           } else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
185             if (config.getRpcType() == Control.RpcType.UNARY) {
186               r = new AsyncUnaryWorker(asyncStubs[i % asyncStubs.length]);
187             } else if (config.getRpcType() == Control.RpcType.STREAMING) {
188               r = new AsyncPingPongWorker(asyncStubs[i % asyncStubs.length]);
189             }
190           }
191           break;
192         }
193         case BYTEBUF_PARAMS: {
194           if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
195             if (config.getRpcType() == Control.RpcType.UNARY) {
196               r = new GenericBlockingUnaryWorker(channels[i % channels.length]);
197             }
198           } else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
199             if (config.getRpcType() == Control.RpcType.UNARY) {
200               r = new GenericAsyncUnaryWorker(channels[i % channels.length]);
201             } else if (config.getRpcType() == Control.RpcType.STREAMING) {
202               r = new GenericAsyncPingPongWorker(channels[i % channels.length]);
203             }
204           }
205 
206           break;
207         }
208         default: {
209           throw Status.UNIMPLEMENTED.withDescription(
210               "Unknown payload case " + config.getPayloadConfig().getPayloadCase().name())
211               .asRuntimeException();
212         }
213       }
214       if (r == null) {
215         throw new IllegalStateException(config.getRpcType().name()
216             + " not supported for client type "
217             + config.getClientType());
218       }
219       fixedThreadPool.execute(r);
220     }
221     if (osBean != null) {
222       lastMarkCpuTime = osBean.getProcessCpuTime();
223     }
224   }
225 
226   /**
227    * Take a snapshot of the statistics which can be returned to the driver.
228    */
getStats()229   Stats.ClientStats getStats() {
230     Histogram intervalHistogram = recorder.getIntervalHistogram();
231 
232     Stats.ClientStats.Builder statsBuilder = Stats.ClientStats.newBuilder();
233     Stats.HistogramData.Builder latenciesBuilder = statsBuilder.getLatenciesBuilder();
234     double resolution = 1.0 + Math.max(config.getHistogramParams().getResolution(), 0.01);
235     LogarithmicIterator logIterator = new LogarithmicIterator(intervalHistogram, 1,
236         resolution);
237     double base = 1;
238     while (logIterator.hasNext()) {
239       latenciesBuilder.addBucket((int) logIterator.next().getCountAddedInThisIterationStep());
240       base = base * resolution;
241     }
242     // Driver expects values for all buckets in the range, not just the range of buckets that
243     // have values.
244     while (base < config.getHistogramParams().getMaxPossible()) {
245       latenciesBuilder.addBucket(0);
246       base = base * resolution;
247     }
248     latenciesBuilder.setMaxSeen((double) intervalHistogram.getMaxValue());
249     latenciesBuilder.setMinSeen((double) intervalHistogram.getMinNonZeroValue());
250     latenciesBuilder.setCount((double) intervalHistogram.getTotalCount());
251     latenciesBuilder.setSum(intervalHistogram.getMean()
252         * intervalHistogram.getTotalCount());
253     // TODO: No support for sum of squares
254 
255     statsBuilder.setTimeElapsed((intervalHistogram.getEndTimeStamp()
256         - intervalHistogram.getStartTimeStamp()) / 1000.0);
257     if (osBean != null) {
258       // Report all the CPU time as user-time  (which is intentionally incorrect)
259       long nowCpu = osBean.getProcessCpuTime();
260       statsBuilder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0);
261       lastMarkCpuTime = nowCpu;
262     }
263     return statsBuilder.build();
264   }
265 
266   /**
267    * Shutdown the scenario as cleanly as possible.
268    */
shutdownNow()269   void shutdownNow() {
270     shutdown = true;
271     for (int i = 0; i < channels.length; i++) {
272       // Initiate channel shutdown
273       channels[i].shutdown();
274     }
275     for (int i = 0; i < channels.length; i++) {
276       try {
277         // Wait for channel termination
278         channels[i].awaitTermination(1, TimeUnit.SECONDS);
279       } catch (InterruptedException ie) {
280         channels[i].shutdownNow();
281       }
282     }
283     fixedThreadPool.shutdownNow();
284   }
285 
286   /**
287    * Record the event elapsed time to the histogram and delay initiation of the next event based
288    * on the load distribution.
289    */
delay(long alreadyElapsed)290   void delay(long alreadyElapsed) {
291     recorder.recordValue(alreadyElapsed);
292     if (distribution != null) {
293       long nextPermitted = Math.round(distribution.sample() * 1000000000.0);
294       if (nextPermitted > alreadyElapsed) {
295         LockSupport.parkNanos(nextPermitted - alreadyElapsed);
296       }
297     }
298   }
299 
300   /**
301    * Worker which executes blocking unary calls. Event timing is the duration between sending the
302    * request and receiving the response.
303    */
304   class BlockingUnaryWorker implements Runnable {
305     final BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;
306 
BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub)307     private BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub) {
308       this.stub = stub;
309     }
310 
311     @Override
run()312     public void run() {
313       while (!shutdown) {
314         long now = System.nanoTime();
315         stub.unaryCall(simpleRequest);
316         delay(System.nanoTime() - now);
317       }
318     }
319   }
320 
321   /**
322    * Worker which executes async unary calls. Event timing is the duration between sending the
323    * request and receiving the response.
324    */
325   private class AsyncUnaryWorker implements Runnable {
326     final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
327     final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
328 
AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub)329     AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
330       this.stub = stub;
331     }
332 
333     @Override
run()334     public void run() {
335       while (true) {
336         maxOutstanding.acquireUninterruptibly();
337         if (shutdown) {
338           maxOutstanding.release();
339           return;
340         }
341         stub.unaryCall(simpleRequest, new StreamObserver<Messages.SimpleResponse>() {
342           long now = System.nanoTime();
343           @Override
344           public void onNext(Messages.SimpleResponse value) {
345 
346           }
347 
348           @Override
349           public void onError(Throwable t) {
350             maxOutstanding.release();
351             Level level = shutdown ? Level.FINE : Level.INFO;
352             log.log(level, "Error in AsyncUnary call", t);
353           }
354 
355           @Override
356           public void onCompleted() {
357             delay(System.nanoTime() - now);
358             maxOutstanding.release();
359           }
360         });
361       }
362     }
363   }
364 
365   /**
366    * Worker which executes a streaming ping-pong call. Event timing is the duration between
367    * sending the ping and receiving the pong.
368    */
369   private class AsyncPingPongWorker implements Runnable {
370     final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
371     final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
372 
AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub)373     AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
374       this.stub = stub;
375     }
376 
377     @Override
run()378     public void run() {
379       while (!shutdown) {
380         maxOutstanding.acquireUninterruptibly();
381         final AtomicReference<StreamObserver<Messages.SimpleRequest>> requestObserver =
382             new AtomicReference<>();
383         requestObserver.set(stub.streamingCall(
384             new StreamObserver<Messages.SimpleResponse>() {
385               long now = System.nanoTime();
386 
387               @Override
388               public void onNext(Messages.SimpleResponse value) {
389                 delay(System.nanoTime() - now);
390                 if (shutdown) {
391                   requestObserver.get().onCompleted();
392                   // Must not send another request.
393                   return;
394                 }
395                 requestObserver.get().onNext(simpleRequest);
396                 now = System.nanoTime();
397               }
398 
399               @Override
400               public void onError(Throwable t) {
401                 maxOutstanding.release();
402                 Level level = shutdown ? Level.FINE : Level.INFO;
403                 log.log(level, "Error in Async Ping-Pong call", t);
404 
405               }
406 
407               @Override
408               public void onCompleted() {
409                 maxOutstanding.release();
410               }
411             }));
412         requestObserver.get().onNext(simpleRequest);
413       }
414     }
415   }
416 
417   /**
418    * Worker which executes generic blocking unary calls. Event timing is the duration between
419    * sending the request and receiving the response.
420    */
421   private class GenericBlockingUnaryWorker implements Runnable {
422     final Channel channel;
423 
GenericBlockingUnaryWorker(Channel channel)424     GenericBlockingUnaryWorker(Channel channel) {
425       this.channel = channel;
426     }
427 
428     @Override
run()429     public void run() {
430       long now;
431       while (!shutdown) {
432         now = System.nanoTime();
433         ClientCalls.blockingUnaryCall(channel, LoadServer.GENERIC_UNARY_METHOD,
434             CallOptions.DEFAULT,
435             genericRequest.slice());
436         delay(System.nanoTime() - now);
437       }
438     }
439   }
440 
441   /**
442    * Worker which executes generic async unary calls. Event timing is the duration between
443    * sending the request and receiving the response.
444    */
445   private class GenericAsyncUnaryWorker implements Runnable {
446     final Channel channel;
447     final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
448 
GenericAsyncUnaryWorker(Channel channel)449     GenericAsyncUnaryWorker(Channel channel) {
450       this.channel = channel;
451     }
452 
453     @Override
run()454     public void run() {
455       while (true) {
456         maxOutstanding.acquireUninterruptibly();
457         if (shutdown) {
458           maxOutstanding.release();
459           return;
460         }
461         ClientCalls.asyncUnaryCall(
462             channel.newCall(LoadServer.GENERIC_UNARY_METHOD, CallOptions.DEFAULT),
463             genericRequest.slice(),
464             new StreamObserver<ByteBuf>() {
465               long now = System.nanoTime();
466 
467               @Override
468               public void onNext(ByteBuf value) {
469 
470               }
471 
472               @Override
473               public void onError(Throwable t) {
474                 maxOutstanding.release();
475                 Level level = shutdown ? Level.FINE : Level.INFO;
476                 log.log(level, "Error in Generic Async Unary call", t);
477               }
478 
479               @Override
480               public void onCompleted() {
481                 delay(System.nanoTime() - now);
482                 maxOutstanding.release();
483               }
484             });
485       }
486     }
487   }
488 
489   /**
490    * Worker which executes a streaming ping-pong call. Event timing is the duration between
491    * sending the ping and receiving the pong.
492    */
493   private class GenericAsyncPingPongWorker implements Runnable {
494     final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
495     final Channel channel;
496 
GenericAsyncPingPongWorker(Channel channel)497     GenericAsyncPingPongWorker(Channel channel) {
498       this.channel = channel;
499     }
500 
501     @Override
run()502     public void run() {
503       while (true) {
504         maxOutstanding.acquireUninterruptibly();
505         if (shutdown) {
506           maxOutstanding.release();
507           return;
508         }
509         final ClientCall<ByteBuf, ByteBuf> call =
510             channel.newCall(LoadServer.GENERIC_STREAMING_PING_PONG_METHOD, CallOptions.DEFAULT);
511         call.start(new ClientCall.Listener<ByteBuf>() {
512           long now = System.nanoTime();
513 
514           @Override
515           public void onMessage(ByteBuf message) {
516             delay(System.nanoTime() - now);
517             if (shutdown) {
518               call.cancel("Shutting down", null);
519               return;
520             }
521             call.request(1);
522             call.sendMessage(genericRequest.slice());
523             now = System.nanoTime();
524           }
525 
526           @Override
527           public void onClose(Status status, Metadata trailers) {
528             maxOutstanding.release();
529             Level level = shutdown ? Level.FINE : Level.INFO;
530             if (!status.isOk() && status.getCode() != Status.Code.CANCELLED) {
531               log.log(level, "Error in Generic Async Ping-Pong call", status.getCause());
532             }
533           }
534         }, new Metadata());
535         call.request(1);
536         call.sendMessage(genericRequest.slice());
537       }
538     }
539   }
540 }
541