/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * A manipulated clock that exports a {@link Ticker} and a {@link ScheduledExecutorService}.
 *
 * <p>To simulate the locking scenario of using real executors, it never runs tasks within {@code
 * schedule()} or {@code execute()}. Instead, you should call {@link #runDueTasks} in your test
 * method to run all due tasks. {@link #forwardTime} and {@link #forwardNanos} call {@link
 * #runDueTasks} automatically.
 */
public final class FakeClock {

  /** Filter used by the unfiltered overloads: accepts every task. */
  private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() {
      @Override
      public boolean shouldAccept(Runnable command) {
        return true;
      }
    };

  private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl();

  /** Tasks whose due time is still in the future, ordered by due time. */
  private final PriorityBlockingQueue<ScheduledTask> scheduledTasks = new PriorityBlockingQueue<>();
  /** Tasks that are ready to run on the next {@link #runDueTasks} call, in FIFO order. */
  private final LinkedBlockingQueue<ScheduledTask> dueTasks = new LinkedBlockingQueue<>();

  private final Ticker ticker =
      new Ticker() {
        @Override public long read() {
          return currentTimeNanos;
        }
      };

  private final Deadline.Ticker deadlineTicker =
      new Deadline.Ticker() {
        @Override public long nanoTime() {
          return currentTimeNanos;
        }
      };

  private final Supplier<Stopwatch> stopwatchSupplier =
      new Supplier<Stopwatch>() {
        @Override public Stopwatch get() {
          return Stopwatch.createUnstarted(ticker);
        }
      };

  private final TimeProvider timeProvider =
      new TimeProvider() {
        @Override
        public long currentTimeNanos() {
          return currentTimeNanos;
        }
      };

  /** The fake "now" in nanoseconds. Starts at zero and only moves via {@link #forwardTime}. */
  private long currentTimeNanos;

  /**
   * A task queued on this clock. It is also the {@link ScheduledFuture} handed back to the caller
   * that scheduled it.
   */
  public class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
    public final Runnable command;
    public long dueTimeNanos;

    ScheduledTask(Runnable command) {
      this.command = command;
    }

    /** Runs the command once and then completes this future with {@code null}. */
    void run() {
      command.run();
      set(null);
    }

    void setDueTimeNanos(long dueTimeNanos) {
      this.dueTimeNanos = dueTimeNanos;
    }

    /** Removes this task from both queues, then marks the future as cancelled. */
    @Override public boolean cancel(boolean mayInterruptIfRunning) {
      scheduledTasks.remove(this);
      dueTasks.remove(this);
      return super.cancel(mayInterruptIfRunning);
    }

    /**
     * Returns the time left until this task is due, measured against the fake clock. The result is
     * zero or negative once the task is due.
     */
    @Override public long getDelay(TimeUnit unit) {
      return unit.convert(dueTimeNanos - currentTimeNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Orders tasks by due time. A {@link Delayed} that is not a {@code ScheduledTask} is compared
     * by remaining delay instead of failing with a {@link ClassCastException}.
     */
    @Override public int compareTo(Delayed other) {
      if (other instanceof ScheduledTask) {
        return Long.compare(dueTimeNanos, ((ScheduledTask) other).dueTimeNanos);
      }
      return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public String toString() {
      return "[due=" + dueTimeNanos + ", task=" + command + "]";
    }
  }

  /**
   * The partially implemented executor exposed by {@link #getScheduledExecutorService}. Only
   * {@code execute} and the {@link Runnable}-based scheduling methods are supported; every other
   * method throws {@link UnsupportedOperationException}.
   */
  private class ScheduledExecutorImpl implements ScheduledExecutorService {
    @Override public <V> ScheduledFuture<V> schedule(
        Callable<V> callable, long delay, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    /**
     * Stamps the task with its due time and queues it. A task with a non-positive delay goes
     * straight to {@code dueTasks}; it is still not run until {@link #runDueTasks} is called.
     */
    private void schedule(ScheduledTask task, long delay, TimeUnit unit) {
      task.setDueTimeNanos(currentTimeNanos + unit.toNanos(delay));
      if (delay > 0) {
        scheduledTasks.add(task);
      } else {
        dueTasks.add(task);
      }
    }

    @Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
      ScheduledTask task = new ScheduledTask(cmd);
      schedule(task, delay, unit);
      return task;
    }

    /**
     * Queues a task that re-queues itself after each run, one period after that run started.
     *
     * @throws IllegalArgumentException if {@code period} is not positive
     */
    @Override public ScheduledFuture<?> scheduleAtFixedRate(
        Runnable cmd, long initialDelay, long period, TimeUnit unit) {
      // A non-positive period would make the task re-enqueue itself as immediately due after
      // every run, so runDueTasks() would never drain the queue.
      if (period <= 0) {
        throw new IllegalArgumentException("period must be positive: " + period);
      }
      ScheduledTask task = new ScheduleAtFixedRateTask(cmd, period, unit);
      schedule(task, initialDelay, unit);
      return task;
    }

    /**
     * Queues a task that re-queues itself after each run, {@code delay} after that run finished.
     *
     * @throws IllegalArgumentException if {@code delay} is not positive
     */
    @Override public ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable cmd, long initialDelay, long delay, TimeUnit unit) {
      // Same reasoning as scheduleAtFixedRate: a non-positive delay never lets the queue drain.
      if (delay <= 0) {
        throw new IllegalArgumentException("delay must be positive: " + delay);
      }
      ScheduledTask task = new ScheduleWithFixedDelayTask(cmd, delay, unit);
      schedule(task, initialDelay, unit);
      return task;
    }

    @Override public boolean awaitTermination(long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> List<Future<T>> invokeAll(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> T invokeAny(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public boolean isShutdown() {
      throw new UnsupportedOperationException();
    }

    @Override public boolean isTerminated() {
      throw new UnsupportedOperationException();
    }

    @Override public void shutdown() {
      throw new UnsupportedOperationException();
    }

    @Override public List<Runnable> shutdownNow() {
      throw new UnsupportedOperationException();
    }

    @Override public <T> Future<T> submit(Callable<T> task) {
      throw new UnsupportedOperationException();
    }

    @Override public Future<?> submit(Runnable task) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> Future<T> submit(Runnable task, T result) {
      throw new UnsupportedOperationException();
    }

    @Override public void execute(Runnable command) {
      // Since it is being enqueued immediately, no point in tracking the future for cancellation.
      Future<?> unused = schedule(command, 0, TimeUnit.NANOSECONDS);
    }

    /**
     * Periodic task for {@code scheduleAtFixedRate}. Its future never completes normally because
     * {@code set(null)} is never called.
     */
    class ScheduleAtFixedRateTask extends ScheduledTask {
      final long periodNanos;

      public ScheduleAtFixedRateTask(Runnable command, long period, TimeUnit unit) {
        super(command);
        this.periodNanos = unit.toNanos(period);
      }

      @Override void run() {
        long startTimeNanos = currentTimeNanos;
        command.run();
        if (!isCancelled()) {
          // The next run is due one period after this run started. If the command itself advanced
          // the clock past that point, the delay is non-positive and the task is queued as due.
          schedule(this, startTimeNanos + periodNanos - currentTimeNanos, TimeUnit.NANOSECONDS);
        }
      }
    }

    /**
     * Periodic task for {@code scheduleWithFixedDelay}. Its future never completes normally
     * because {@code set(null)} is never called.
     */
    class ScheduleWithFixedDelayTask extends ScheduledTask {

      final long delayNanos;

      ScheduleWithFixedDelayTask(Runnable command, long delay, TimeUnit unit) {
        super(command);
        this.delayNanos = unit.toNanos(delay);
      }

      @Override
      void run() {
        command.run();
        if (!isCancelled()) {
          // The next run is due delayNanos after the command returned.
          schedule(this, delayNanos, TimeUnit.NANOSECONDS);
        }
      }
    }
  }

  /**
   * Provides a partially implemented instance of {@link ScheduledExecutorService} that uses the
   * fake clock ticker for testing.
   */
  public ScheduledExecutorService getScheduledExecutorService() {
    return scheduledExecutorService;
  }

  /**
   * Provides a {@link TimeProvider} that is backed by this FakeClock.
   */
  public TimeProvider getTimeProvider() {
    return timeProvider;
  }

  /**
   * Provides a supplier of unstarted stopwatches that use the fake clock ticker.
   */
  public Supplier<Stopwatch> getStopwatchSupplier() {
    return stopwatchSupplier;
  }

  /**
   * Ticker of the FakeClock.
   */
  public Ticker getTicker() {
    return ticker;
  }

  /**
   * Deadline ticker of the FakeClock.
   */
  public Deadline.Ticker getDeadlineTicker() {
    return deadlineTicker;
  }

  /**
   * Run all due tasks. Immediately due tasks that are queued during the process also get executed.
   *
   * @return the number of tasks run by this call
   */
  public int runDueTasks() {
    int count = 0;
    while (true) {
      // Re-check on every pass: a task that just ran may have queued more work or moved time.
      checkDueTasks();
      if (dueTasks.isEmpty()) {
        break;
      }
      ScheduledTask task;
      while ((task = dueTasks.poll()) != null) {
        task.run();
        count++;
      }
    }
    return count;
  }

  /**
   * Moves every scheduled task whose due time is not after the current fake time from {@code
   * scheduledTasks} to {@code dueTasks}.
   */
  private void checkDueTasks() {
    while (true) {
      ScheduledTask task = scheduledTasks.peek();
      if (task == null || task.dueTimeNanos > currentTimeNanos) {
        break;
      }
      // Only move the task if this call is the one that removed it from the queue.
      if (scheduledTasks.remove(task)) {
        dueTasks.add(task);
      }
    }
  }

  /**
   * Return all due tasks.
   */
  public Collection<ScheduledTask> getDueTasks() {
    checkDueTasks();
    return new ArrayList<>(dueTasks);
  }

  /**
   * Return all unrun tasks.
   */
  public Collection<ScheduledTask> getPendingTasks() {
    return getPendingTasks(ACCEPT_ALL_FILTER);
  }

  /**
   * Return all unrun tasks accepted by the given filter. Due tasks are listed before tasks that
   * are still scheduled.
   */
  public Collection<ScheduledTask> getPendingTasks(TaskFilter filter) {
    ArrayList<ScheduledTask> result = new ArrayList<>();
    for (ScheduledTask task : dueTasks) {
      if (filter.shouldAccept(task.command)) {
        result.add(task);
      }
    }
    for (ScheduledTask task : scheduledTasks) {
      if (filter.shouldAccept(task.command)) {
        result.add(task);
      }
    }
    return result;
  }

  /**
   * Forward the time by the given duration and run all due tasks.
   *
   * @return the number of tasks run by this call
   */
  public int forwardTime(long value, TimeUnit unit) {
    currentTimeNanos += unit.toNanos(value);
    return runDueTasks();
  }

  /**
   * Forward the time by the given nanoseconds and run all due tasks.
   *
   * @return the number of tasks run by this call
   */
  public int forwardNanos(long nanos) {
    return forwardTime(nanos, TimeUnit.NANOSECONDS);
  }

  /**
   * Return the number of queued tasks.
   */
  public int numPendingTasks() {
    return dueTasks.size() + scheduledTasks.size();
  }

  /**
   * Return the number of queued tasks accepted by the given filter.
   */
  public int numPendingTasks(TaskFilter filter) {
    int count = 0;
    for (ScheduledTask task : dueTasks) {
      if (filter.shouldAccept(task.command)) {
        count++;
      }
    }
    for (ScheduledTask task : scheduledTasks) {
      if (filter.shouldAccept(task.command)) {
        count++;
      }
    }
    return count;
  }

  /**
   * Return the fake current time in milliseconds, shifted by a fixed offset from the nanosecond
   * reading.
   */
  public long currentTimeMillis() {
    // Normally millis and nanos are of different epochs. Add an offset to simulate that.
    return TimeUnit.NANOSECONDS.toMillis(currentTimeNanos + 1234567890123456789L);
  }

  /**
   * A filter that allows us to have fine grained control over which tasks are accepted for certain
   * operations.
   */
  public interface TaskFilter {
    /**
     * Inspects the Runnable and returns true if it should be accepted.
     */
    boolean shouldAccept(Runnable runnable);
  }
}