1 /* 2 * Copyright 2017 Google LLC 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google LLC nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31 package com.google.api.gax.retrying; 32 33 import static com.google.common.base.Preconditions.checkNotNull; 34 35 import com.google.api.core.ApiFuture; 36 import com.google.api.core.ApiFutures; 37 import com.google.api.gax.tracing.ApiTracer; 38 import com.google.common.util.concurrent.AbstractFuture; 39 import com.google.common.util.concurrent.MoreExecutors; 40 import java.util.concurrent.Callable; 41 import java.util.concurrent.CancellationException; 42 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.RejectedExecutionException; 44 import java.util.logging.Level; 45 import java.util.logging.Logger; 46 47 /** 48 * For internal use only. 49 * 50 * <p>Basic implementation of {@link RetryingFuture} interface. On its own, suitable for usage in 51 * 'busy loop' retry implementations. Can be used as the basis for more advanced implementations. 52 * 53 * <p>This class is thread-safe. 54 */ 55 class BasicRetryingFuture<ResponseT> extends AbstractFuture<ResponseT> 56 implements RetryingFuture<ResponseT> { 57 58 final Object lock = new Object(); 59 60 private final Callable<ResponseT> callable; 61 62 private final RetryAlgorithm<ResponseT> retryAlgorithm; 63 private final RetryingContext retryingContext; 64 65 private volatile TimedAttemptSettings attemptSettings; 66 67 private volatile ApiFuture<ResponseT> latestCompletedAttemptResult; 68 private volatile ApiFuture<ResponseT> attemptResult; 69 70 private static final Logger LOG = Logger.getLogger(BasicRetryingFuture.class.getName()); 71 BasicRetryingFuture( Callable<ResponseT> callable, RetryAlgorithm<ResponseT> retryAlgorithm, RetryingContext context)72 BasicRetryingFuture( 73 Callable<ResponseT> callable, 74 RetryAlgorithm<ResponseT> retryAlgorithm, 75 RetryingContext context) { 76 this.callable = checkNotNull(callable); 77 this.retryAlgorithm = checkNotNull(retryAlgorithm); 78 this.retryingContext = checkNotNull(context); 79 80 this.attemptSettings = retryAlgorithm.createFirstAttempt(context); 81 82 // A micro crime, letting "this" reference to escape from constructor before initialization is 83 // completed (via internal non-static class CompletionListener). But it is guaranteed to be ok, 84 // since listener is guaranteed to be called only after this future is 85 // completed and this future is guaranteed to be completed only after it is initialized. Also 86 // since "super" is called explicitly here there are no unexpected overrides of addListener 87 // here. 88 super.addListener(new CompletionListener(), MoreExecutors.directExecutor()); 89 } 90 91 @Override setAttemptFuture(ApiFuture<ResponseT> attemptFuture)92 public void setAttemptFuture(ApiFuture<ResponseT> attemptFuture) { 93 try { 94 if (isDone()) { 95 return; 96 } 97 ResponseT response = attemptFuture.get(); 98 handleAttempt(null, response); 99 } catch (ExecutionException e) { 100 handleAttempt(e.getCause(), null); 101 } catch (Throwable e) { 102 handleAttempt(e, null); 103 } 104 } 105 106 @Override getCallable()107 public Callable<ResponseT> getCallable() { 108 return callable; 109 } 110 111 @Override getAttemptSettings()112 public TimedAttemptSettings getAttemptSettings() { 113 synchronized (lock) { 114 return attemptSettings; 115 } 116 } 117 118 @Override peekAttemptResult()119 public ApiFuture<ResponseT> peekAttemptResult() { 120 synchronized (lock) { 121 return latestCompletedAttemptResult; 122 } 123 } 124 125 // Lazily initializes attempt result. This allows to prevent overhead of relatively 126 // heavy (and in most cases redundant) settable future instantiation on each attempt, plus reduces 127 // possibility of callback chaining going into an infinite loop in case of buggy external 128 // callbacks implementation. 129 @Override getAttemptResult()130 public ApiFuture<ResponseT> getAttemptResult() { 131 synchronized (lock) { 132 if (attemptResult == null) { 133 attemptResult = new NonCancellableFuture<>(); 134 } 135 return attemptResult; 136 } 137 } 138 139 // Called in the beginning of handleAttempt() and completion listeners to cleanup all attempt 140 // service data (callbacks and stuff) clearAttemptServiceData()141 void clearAttemptServiceData() { 142 // no-op for the basic implementation 143 } 144 145 // "super." is used here to avoid infinite loops of callback chains handleAttempt(Throwable throwable, ResponseT response)146 void handleAttempt(Throwable throwable, ResponseT response) { 147 ApiTracer tracer = retryingContext.getTracer(); 148 149 synchronized (lock) { 150 try { 151 clearAttemptServiceData(); 152 if (throwable instanceof CancellationException) { 153 // An attempt triggered cancellation. 154 // In almost all cases, the operation caller caused the attempt to trigger the 155 // cancellation by invoking cancel() on the CallbackChainRetryingFuture, which cancelled 156 // the current attempt. 157 // In a theoretical scenario, the attempt callable might've thrown the exception on its 158 // own volition. However it's currently impossible to disambiguate the 2 scenarios. 159 tracer.attemptCancelled(); 160 super.cancel(false); 161 } else if (throwable instanceof RejectedExecutionException) { 162 // external executor cannot continue retrying 163 tracer.attemptPermanentFailure(throwable); 164 super.setException(throwable); 165 } 166 if (isDone()) { 167 return; 168 } 169 170 TimedAttemptSettings nextAttemptSettings = 171 retryAlgorithm.createNextAttempt(retryingContext, throwable, response, attemptSettings); 172 boolean shouldRetry = 173 retryAlgorithm.shouldRetry(retryingContext, throwable, response, nextAttemptSettings); 174 if (shouldRetry) { 175 // Log retry info 176 if (LOG.isLoggable(Level.FINEST)) { 177 LOG.log( 178 Level.FINEST, 179 "Retrying with:\n{0}\n{1}\n{2}\n{3}", 180 new Object[] { 181 "enclosingMethod: " 182 + (callable.getClass().getEnclosingMethod() != null 183 ? callable.getClass().getEnclosingMethod().getName() 184 : ""), 185 "attemptCount: " + attemptSettings.getAttemptCount(), 186 "delay: " + attemptSettings.getRetryDelay(), 187 "retriableException: " + throwable 188 }); 189 } 190 tracer.attemptFailed(throwable, nextAttemptSettings.getRandomizedRetryDelay()); 191 attemptSettings = nextAttemptSettings; 192 setAttemptResult(throwable, response, true); 193 // a new attempt will be (must be) scheduled by an external executor 194 } else if (throwable != null) { 195 if (retryAlgorithm.shouldRetryBasedOnResult(retryingContext, throwable, response)) { 196 tracer.attemptFailedRetriesExhausted(throwable); 197 } else { 198 tracer.attemptPermanentFailure(throwable); 199 } 200 super.setException(throwable); 201 } else { 202 tracer.attemptSucceeded(); 203 super.set(response); 204 } 205 } catch (CancellationException e) { 206 // A retry algorithm triggered cancellation. 207 tracer.attemptFailedRetriesExhausted(e); 208 super.cancel(false); 209 } catch (Exception e) { 210 // Should never happen, but still possible in case of buggy retry algorithm implementation. 211 // Any bugs/exceptions (except CancellationException) in retry algorithms immediately 212 // terminate retrying future and set the result to the thrown exception. 213 tracer.attemptPermanentFailure(e); 214 super.setException(e); 215 } 216 } 217 } 218 219 // Sets attempt result futures. Note the "attempt result future" and "attempt future" are not same 220 // things because there are more attempt futures than attempt result futures. 221 // See AttemptCallable.call() for an example of such condition. 222 // 223 // The assignments order in this method is crucial. Wrong ordering may lead to infinite 224 // loops in callback chains. 225 // 226 // If this is not the last attempt this method sets attemptResult to null, so the next 227 // getAttemptResult() call will return a new future, tracking the new attempt. Otherwise 228 // attemptResult is set to the same result as the one returned by peekAttemptResult(), indicating 229 // that the ultimate unmodifiable result of the whole future was reached. setAttemptResult(Throwable throwable, ResponseT response, boolean shouldRetry)230 private void setAttemptResult(Throwable throwable, ResponseT response, boolean shouldRetry) { 231 ApiFuture<ResponseT> prevAttemptResult = attemptResult; 232 try { 233 if (throwable instanceof CancellationException) { 234 NonCancellableFuture<ResponseT> future = new NonCancellableFuture<>(); 235 future.cancelPrivately(); 236 latestCompletedAttemptResult = future; 237 attemptResult = shouldRetry ? null : latestCompletedAttemptResult; 238 if (prevAttemptResult instanceof NonCancellableFuture) { 239 ((NonCancellableFuture<ResponseT>) prevAttemptResult).cancelPrivately(); 240 } 241 } else if (throwable != null) { 242 latestCompletedAttemptResult = ApiFutures.immediateFailedFuture(throwable); 243 attemptResult = shouldRetry ? null : latestCompletedAttemptResult; 244 if (prevAttemptResult instanceof NonCancellableFuture) { 245 ((NonCancellableFuture<ResponseT>) prevAttemptResult).setExceptionPrivately(throwable); 246 } 247 } else { 248 latestCompletedAttemptResult = ApiFutures.immediateFuture(response); 249 attemptResult = shouldRetry ? null : latestCompletedAttemptResult; 250 if (prevAttemptResult instanceof NonCancellableFuture) { 251 ((NonCancellableFuture<ResponseT>) prevAttemptResult).setPrivately(response); 252 } 253 } 254 } catch (Exception e) { 255 // Usually should not happen but is still possible, for example if one of the attempt result 256 // callbacks throws an exception. An example of such condition is the OperationFuture 257 // which uses ApiFutures.transform(), which actually assign callbacks to the attempt result 258 // futures, and those can fail, for example if metadata class is a wrong one. 259 // 260 // The exception is swallowed to avoid buggy callback implementations breaking retrying future 261 // execution. In case if a callback is executed in a separate thread executor (the recommended 262 // way) the exception will be thrown in a separate thread and will not be swallowed by this 263 // catch block anyways. 264 } 265 } 266 267 private class CompletionListener implements Runnable { 268 @Override run()269 public void run() { 270 synchronized (lock) { 271 try { 272 clearAttemptServiceData(); 273 ResponseT response = get(); 274 setAttemptResult(null, response, false); 275 } catch (ExecutionException e) { 276 setAttemptResult(e.getCause(), null, false); 277 } catch (Throwable e) { 278 setAttemptResult(e, null, false); 279 } 280 } 281 } 282 } 283 } 284