1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks.driver; 18 19 import com.sun.management.OperatingSystemMXBean; 20 import io.grpc.CallOptions; 21 import io.grpc.Channel; 22 import io.grpc.ClientCall; 23 import io.grpc.ManagedChannel; 24 import io.grpc.Metadata; 25 import io.grpc.Status; 26 import io.grpc.benchmarks.Transport; 27 import io.grpc.benchmarks.Utils; 28 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; 29 import io.grpc.benchmarks.proto.Control; 30 import io.grpc.benchmarks.proto.Messages; 31 import io.grpc.benchmarks.proto.Payloads; 32 import io.grpc.benchmarks.proto.Stats; 33 import io.grpc.stub.ClientCalls; 34 import io.grpc.stub.StreamObserver; 35 import io.netty.buffer.ByteBuf; 36 import io.netty.buffer.PooledByteBufAllocator; 37 import io.netty.channel.epoll.Epoll; 38 import io.netty.util.concurrent.DefaultThreadFactory; 39 import java.lang.management.ManagementFactory; 40 import java.util.List; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.Semaphore; 44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.atomic.AtomicReference; 46 import java.util.concurrent.locks.LockSupport; 47 import java.util.logging.Level; 48 import java.util.logging.Logger; 49 import org.HdrHistogram.Histogram; 50 import org.HdrHistogram.LogarithmicIterator; 51 import org.HdrHistogram.Recorder; 52 import org.apache.commons.math3.distribution.ExponentialDistribution; 53 54 /** 55 * Implements the client-side contract for the load testing scenarios. 56 */ 57 class LoadClient { 58 59 private static final Logger log = Logger.getLogger(LoadClient.class.getName()); 60 private ByteBuf genericRequest; 61 62 private final Control.ClientConfig config; 63 private final ExponentialDistribution distribution; 64 private volatile boolean shutdown; 65 private final int threadCount; 66 67 ManagedChannel[] channels; 68 BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[] blockingStubs; 69 BenchmarkServiceGrpc.BenchmarkServiceStub[] asyncStubs; 70 Recorder recorder; 71 private ExecutorService fixedThreadPool; 72 private Messages.SimpleRequest simpleRequest; 73 private final OperatingSystemMXBean osBean; 74 private long lastMarkCpuTime; 75 LoadClient(Control.ClientConfig config)76 LoadClient(Control.ClientConfig config) throws Exception { 77 log.log(Level.INFO, "Client Config \n" + config.toString()); 78 this.config = config; 79 // Create the channels 80 channels = new ManagedChannel[config.getClientChannels()]; 81 for (int i = 0; i < config.getClientChannels(); i++) { 82 channels[i] = 83 Utils.newClientChannel( 84 Epoll.isAvailable() ? Transport.NETTY_EPOLL : Transport.NETTY_NIO, 85 config.getServerTargets(i % config.getServerTargetsCount()), 86 config.hasSecurityParams(), 87 config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(), 88 config.hasSecurityParams() 89 ? config.getSecurityParams().getServerHostOverride() 90 : null, 91 Utils.DEFAULT_FLOW_CONTROL_WINDOW, 92 false); 93 } 94 95 // Create a stub per channel 96 if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) { 97 asyncStubs = new BenchmarkServiceGrpc.BenchmarkServiceStub[channels.length]; 98 for (int i = 0; i < channels.length; i++) { 99 asyncStubs[i] = BenchmarkServiceGrpc.newStub(channels[i]); 100 } 101 } else { 102 blockingStubs = new BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[channels.length]; 103 for (int i = 0; i < channels.length; i++) { 104 blockingStubs[i] = BenchmarkServiceGrpc.newBlockingStub(channels[i]); 105 } 106 } 107 108 // Determine no of threads 109 if (config.getClientType() == Control.ClientType.SYNC_CLIENT) { 110 threadCount = config.getOutstandingRpcsPerChannel() * config.getClientChannels(); 111 } else { 112 threadCount = config.getAsyncClientThreads() == 0 113 ? Runtime.getRuntime().availableProcessors() 114 : config.getAsyncClientThreads(); 115 } 116 // Use a fixed sized pool of daemon threads. 117 fixedThreadPool = Executors.newFixedThreadPool(threadCount, 118 new DefaultThreadFactory("client-worker", true)); 119 120 // Create the load distribution 121 switch (config.getLoadParams().getLoadCase()) { 122 case CLOSED_LOOP: 123 distribution = null; 124 break; 125 case LOAD_NOT_SET: 126 distribution = null; 127 break; 128 case POISSON: 129 // Mean of exp distribution per thread is <no threads> / <offered load per second> 130 distribution = new ExponentialDistribution( 131 threadCount / config.getLoadParams().getPoisson().getOfferedLoad()); 132 break; 133 default: 134 throw new IllegalArgumentException("Scenario not implemented"); 135 } 136 137 // Create payloads 138 switch (config.getPayloadConfig().getPayloadCase()) { 139 case SIMPLE_PARAMS: { 140 Payloads.SimpleProtoParams simpleParams = config.getPayloadConfig().getSimpleParams(); 141 simpleRequest = Utils.makeRequest(Messages.PayloadType.COMPRESSABLE, 142 simpleParams.getReqSize(), simpleParams.getRespSize()); 143 break; 144 } 145 case BYTEBUF_PARAMS: { 146 PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; 147 genericRequest = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize()); 148 if (genericRequest.capacity() > 0) { 149 genericRequest.writerIndex(genericRequest.capacity() - 1); 150 } 151 break; 152 } 153 default: { 154 // Not implemented yet 155 throw new IllegalArgumentException("Scenario not implemented"); 156 } 157 } 158 159 List<OperatingSystemMXBean> beans = 160 ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class); 161 if (!beans.isEmpty()) { 162 osBean = beans.get(0); 163 } else { 164 osBean = null; 165 } 166 167 // Create the histogram recorder 168 recorder = new Recorder((long) config.getHistogramParams().getMaxPossible(), 3); 169 } 170 171 /** 172 * Start the load scenario. 173 */ start()174 void start() { 175 Runnable r; 176 for (int i = 0; i < threadCount; i++) { 177 r = null; 178 switch (config.getPayloadConfig().getPayloadCase()) { 179 case SIMPLE_PARAMS: { 180 if (config.getClientType() == Control.ClientType.SYNC_CLIENT) { 181 if (config.getRpcType() == Control.RpcType.UNARY) { 182 r = new BlockingUnaryWorker(blockingStubs[i % blockingStubs.length]); 183 } 184 } else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) { 185 if (config.getRpcType() == Control.RpcType.UNARY) { 186 r = new AsyncUnaryWorker(asyncStubs[i % asyncStubs.length]); 187 } else if (config.getRpcType() == Control.RpcType.STREAMING) { 188 r = new AsyncPingPongWorker(asyncStubs[i % asyncStubs.length]); 189 } 190 } 191 break; 192 } 193 case BYTEBUF_PARAMS: { 194 if (config.getClientType() == Control.ClientType.SYNC_CLIENT) { 195 if (config.getRpcType() == Control.RpcType.UNARY) { 196 r = new GenericBlockingUnaryWorker(channels[i % channels.length]); 197 } 198 } else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) { 199 if (config.getRpcType() == Control.RpcType.UNARY) { 200 r = new GenericAsyncUnaryWorker(channels[i % channels.length]); 201 } else if (config.getRpcType() == Control.RpcType.STREAMING) { 202 r = new GenericAsyncPingPongWorker(channels[i % channels.length]); 203 } 204 } 205 206 break; 207 } 208 default: { 209 throw Status.UNIMPLEMENTED.withDescription( 210 "Unknown payload case " + config.getPayloadConfig().getPayloadCase().name()) 211 .asRuntimeException(); 212 } 213 } 214 if (r == null) { 215 throw new IllegalStateException(config.getRpcType().name() 216 + " not supported for client type " 217 + config.getClientType()); 218 } 219 fixedThreadPool.execute(r); 220 } 221 if (osBean != null) { 222 lastMarkCpuTime = osBean.getProcessCpuTime(); 223 } 224 } 225 226 /** 227 * Take a snapshot of the statistics which can be returned to the driver. 228 */ getStats()229 Stats.ClientStats getStats() { 230 Histogram intervalHistogram = recorder.getIntervalHistogram(); 231 232 Stats.ClientStats.Builder statsBuilder = Stats.ClientStats.newBuilder(); 233 Stats.HistogramData.Builder latenciesBuilder = statsBuilder.getLatenciesBuilder(); 234 double resolution = 1.0 + Math.max(config.getHistogramParams().getResolution(), 0.01); 235 LogarithmicIterator logIterator = new LogarithmicIterator(intervalHistogram, 1, 236 resolution); 237 double base = 1; 238 while (logIterator.hasNext()) { 239 latenciesBuilder.addBucket((int) logIterator.next().getCountAddedInThisIterationStep()); 240 base = base * resolution; 241 } 242 // Driver expects values for all buckets in the range, not just the range of buckets that 243 // have values. 244 while (base < config.getHistogramParams().getMaxPossible()) { 245 latenciesBuilder.addBucket(0); 246 base = base * resolution; 247 } 248 latenciesBuilder.setMaxSeen((double) intervalHistogram.getMaxValue()); 249 latenciesBuilder.setMinSeen((double) intervalHistogram.getMinNonZeroValue()); 250 latenciesBuilder.setCount((double) intervalHistogram.getTotalCount()); 251 latenciesBuilder.setSum(intervalHistogram.getMean() 252 * intervalHistogram.getTotalCount()); 253 // TODO: No support for sum of squares 254 255 statsBuilder.setTimeElapsed((intervalHistogram.getEndTimeStamp() 256 - intervalHistogram.getStartTimeStamp()) / 1000.0); 257 if (osBean != null) { 258 // Report all the CPU time as user-time (which is intentionally incorrect) 259 long nowCpu = osBean.getProcessCpuTime(); 260 statsBuilder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0); 261 lastMarkCpuTime = nowCpu; 262 } 263 return statsBuilder.build(); 264 } 265 266 /** 267 * Shutdown the scenario as cleanly as possible. 268 */ shutdownNow()269 void shutdownNow() { 270 shutdown = true; 271 for (int i = 0; i < channels.length; i++) { 272 // Initiate channel shutdown 273 channels[i].shutdown(); 274 } 275 for (int i = 0; i < channels.length; i++) { 276 try { 277 // Wait for channel termination 278 channels[i].awaitTermination(1, TimeUnit.SECONDS); 279 } catch (InterruptedException ie) { 280 channels[i].shutdownNow(); 281 } 282 } 283 fixedThreadPool.shutdownNow(); 284 } 285 286 /** 287 * Record the event elapsed time to the histogram and delay initiation of the next event based 288 * on the load distribution. 289 */ delay(long alreadyElapsed)290 void delay(long alreadyElapsed) { 291 recorder.recordValue(alreadyElapsed); 292 if (distribution != null) { 293 long nextPermitted = Math.round(distribution.sample() * 1000000000.0); 294 if (nextPermitted > alreadyElapsed) { 295 LockSupport.parkNanos(nextPermitted - alreadyElapsed); 296 } 297 } 298 } 299 300 /** 301 * Worker which executes blocking unary calls. Event timing is the duration between sending the 302 * request and receiving the response. 303 */ 304 class BlockingUnaryWorker implements Runnable { 305 final BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub; 306 BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub)307 private BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub) { 308 this.stub = stub; 309 } 310 311 @Override run()312 public void run() { 313 while (!shutdown) { 314 long now = System.nanoTime(); 315 stub.unaryCall(simpleRequest); 316 delay(System.nanoTime() - now); 317 } 318 } 319 } 320 321 /** 322 * Worker which executes async unary calls. Event timing is the duration between sending the 323 * request and receiving the response. 324 */ 325 private class AsyncUnaryWorker implements Runnable { 326 final BenchmarkServiceGrpc.BenchmarkServiceStub stub; 327 final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel()); 328 AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub)329 AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) { 330 this.stub = stub; 331 } 332 333 @Override run()334 public void run() { 335 while (true) { 336 maxOutstanding.acquireUninterruptibly(); 337 if (shutdown) { 338 maxOutstanding.release(); 339 return; 340 } 341 stub.unaryCall(simpleRequest, new StreamObserver<Messages.SimpleResponse>() { 342 long now = System.nanoTime(); 343 @Override 344 public void onNext(Messages.SimpleResponse value) { 345 346 } 347 348 @Override 349 public void onError(Throwable t) { 350 maxOutstanding.release(); 351 Level level = shutdown ? Level.FINE : Level.INFO; 352 log.log(level, "Error in AsyncUnary call", t); 353 } 354 355 @Override 356 public void onCompleted() { 357 delay(System.nanoTime() - now); 358 maxOutstanding.release(); 359 } 360 }); 361 } 362 } 363 } 364 365 /** 366 * Worker which executes a streaming ping-pong call. Event timing is the duration between 367 * sending the ping and receiving the pong. 368 */ 369 private class AsyncPingPongWorker implements Runnable { 370 final BenchmarkServiceGrpc.BenchmarkServiceStub stub; 371 final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel()); 372 AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub)373 AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) { 374 this.stub = stub; 375 } 376 377 @Override run()378 public void run() { 379 while (!shutdown) { 380 maxOutstanding.acquireUninterruptibly(); 381 final AtomicReference<StreamObserver<Messages.SimpleRequest>> requestObserver = 382 new AtomicReference<>(); 383 requestObserver.set(stub.streamingCall( 384 new StreamObserver<Messages.SimpleResponse>() { 385 long now = System.nanoTime(); 386 387 @Override 388 public void onNext(Messages.SimpleResponse value) { 389 delay(System.nanoTime() - now); 390 if (shutdown) { 391 requestObserver.get().onCompleted(); 392 // Must not send another request. 393 return; 394 } 395 requestObserver.get().onNext(simpleRequest); 396 now = System.nanoTime(); 397 } 398 399 @Override 400 public void onError(Throwable t) { 401 maxOutstanding.release(); 402 Level level = shutdown ? Level.FINE : Level.INFO; 403 log.log(level, "Error in Async Ping-Pong call", t); 404 405 } 406 407 @Override 408 public void onCompleted() { 409 maxOutstanding.release(); 410 } 411 })); 412 requestObserver.get().onNext(simpleRequest); 413 } 414 } 415 } 416 417 /** 418 * Worker which executes generic blocking unary calls. Event timing is the duration between 419 * sending the request and receiving the response. 420 */ 421 private class GenericBlockingUnaryWorker implements Runnable { 422 final Channel channel; 423 GenericBlockingUnaryWorker(Channel channel)424 GenericBlockingUnaryWorker(Channel channel) { 425 this.channel = channel; 426 } 427 428 @Override run()429 public void run() { 430 long now; 431 while (!shutdown) { 432 now = System.nanoTime(); 433 ClientCalls.blockingUnaryCall(channel, LoadServer.GENERIC_UNARY_METHOD, 434 CallOptions.DEFAULT, 435 genericRequest.slice()); 436 delay(System.nanoTime() - now); 437 } 438 } 439 } 440 441 /** 442 * Worker which executes generic async unary calls. Event timing is the duration between 443 * sending the request and receiving the response. 444 */ 445 private class GenericAsyncUnaryWorker implements Runnable { 446 final Channel channel; 447 final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel()); 448 GenericAsyncUnaryWorker(Channel channel)449 GenericAsyncUnaryWorker(Channel channel) { 450 this.channel = channel; 451 } 452 453 @Override run()454 public void run() { 455 while (true) { 456 maxOutstanding.acquireUninterruptibly(); 457 if (shutdown) { 458 maxOutstanding.release(); 459 return; 460 } 461 ClientCalls.asyncUnaryCall( 462 channel.newCall(LoadServer.GENERIC_UNARY_METHOD, CallOptions.DEFAULT), 463 genericRequest.slice(), 464 new StreamObserver<ByteBuf>() { 465 long now = System.nanoTime(); 466 467 @Override 468 public void onNext(ByteBuf value) { 469 470 } 471 472 @Override 473 public void onError(Throwable t) { 474 maxOutstanding.release(); 475 Level level = shutdown ? Level.FINE : Level.INFO; 476 log.log(level, "Error in Generic Async Unary call", t); 477 } 478 479 @Override 480 public void onCompleted() { 481 delay(System.nanoTime() - now); 482 maxOutstanding.release(); 483 } 484 }); 485 } 486 } 487 } 488 489 /** 490 * Worker which executes a streaming ping-pong call. Event timing is the duration between 491 * sending the ping and receiving the pong. 492 */ 493 private class GenericAsyncPingPongWorker implements Runnable { 494 final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel()); 495 final Channel channel; 496 GenericAsyncPingPongWorker(Channel channel)497 GenericAsyncPingPongWorker(Channel channel) { 498 this.channel = channel; 499 } 500 501 @Override run()502 public void run() { 503 while (true) { 504 maxOutstanding.acquireUninterruptibly(); 505 if (shutdown) { 506 maxOutstanding.release(); 507 return; 508 } 509 final ClientCall<ByteBuf, ByteBuf> call = 510 channel.newCall(LoadServer.GENERIC_STREAMING_PING_PONG_METHOD, CallOptions.DEFAULT); 511 call.start(new ClientCall.Listener<ByteBuf>() { 512 long now = System.nanoTime(); 513 514 @Override 515 public void onMessage(ByteBuf message) { 516 delay(System.nanoTime() - now); 517 if (shutdown) { 518 call.cancel("Shutting down", null); 519 return; 520 } 521 call.request(1); 522 call.sendMessage(genericRequest.slice()); 523 now = System.nanoTime(); 524 } 525 526 @Override 527 public void onClose(Status status, Metadata trailers) { 528 maxOutstanding.release(); 529 Level level = shutdown ? Level.FINE : Level.INFO; 530 if (!status.isOk() && status.getCode() != Status.Code.CANCELLED) { 531 log.log(level, "Error in Generic Async Ping-Pong call", status.getCause()); 532 } 533 } 534 }, new Metadata()); 535 call.request(1); 536 call.sendMessage(genericRequest.slice()); 537 } 538 } 539 } 540 } 541