1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.utils; 17 18 import java.util.concurrent.CompletableFuture; 19 import java.util.concurrent.CompletionException; 20 import java.util.concurrent.ExecutionException; 21 import java.util.concurrent.Executor; 22 import java.util.function.Function; 23 import software.amazon.awssdk.annotations.SdkProtectedApi; 24 25 /** 26 * Utility class for working with {@link CompletableFuture}. 27 */ 28 @SdkProtectedApi 29 public final class CompletableFutureUtils { 30 private static final Logger log = Logger.loggerFor(CompletableFutureUtils.class); 31 CompletableFutureUtils()32 private CompletableFutureUtils() { 33 } 34 35 /** 36 * Convenience method for creating a future that is immediately completed 37 * exceptionally with the given {@code Throwable}. 38 * <p> 39 * Similar to {@code CompletableFuture#failedFuture} which was added in 40 * Java 9. 41 * 42 * @param t The failure. 43 * @param <U> The type of the element. 44 * @return The failed future. 45 */ failedFuture(Throwable t)46 public static <U> CompletableFuture<U> failedFuture(Throwable t) { 47 CompletableFuture<U> cf = new CompletableFuture<>(); 48 cf.completeExceptionally(t); 49 return cf; 50 } 51 52 /** 53 * Wraps the given error in a {@link CompletionException} if necessary. 54 * Useful if an exception needs to be rethrown from within {@link 55 * CompletableFuture#handle(java.util.function.BiFunction)} or similar 56 * methods. 57 * 58 * @param t The error. 59 * @return The error as a CompletionException. 60 */ errorAsCompletionException(Throwable t)61 public static CompletionException errorAsCompletionException(Throwable t) { 62 if (t instanceof CompletionException) { 63 return (CompletionException) t; 64 } 65 return new CompletionException(t); 66 } 67 68 /** 69 * Forward the {@code Throwable} from {@code src} to {@code dst}. 70 71 * @param src The source of the {@code Throwable}. 72 * @param dst The destination where the {@code Throwable} will be forwarded to. 73 * 74 * @return {@code src}. 75 */ forwardExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst)76 public static <T> CompletableFuture<T> forwardExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst) { 77 src.whenComplete((r, e) -> { 78 if (e != null) { 79 dst.completeExceptionally(e); 80 } 81 }); 82 return src; 83 } 84 85 86 /** 87 * Forward the {@code Throwable} that can be transformed as per the transformationFunction 88 * from {@code src} to {@code dst}. 89 * @param src The source of the {@code Throwable}. 90 * @param dst The destination where the {@code Throwable} will be forwarded to 91 * @param transformationFunction Transformation function taht will be applied on to the forwarded exception. 92 * @return 93 */ forwardTransformedExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst, Function<Throwable, Throwable> transformationFunction)94 public static <T> CompletableFuture<T> forwardTransformedExceptionTo(CompletableFuture<T> src, 95 CompletableFuture<?> dst, 96 Function<Throwable, Throwable> 97 transformationFunction) { 98 src.whenComplete((r, e) -> { 99 if (e != null) { 100 dst.completeExceptionally(transformationFunction.apply(e)); 101 } 102 }); 103 return src; 104 } 105 106 /** 107 * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on 108 * the provided {@link Executor} and return the {@code src} future. 109 * 110 * @param src The source {@link CompletableFuture} 111 * @param dst The destination where the {@code Throwable} or response will be forwarded to. 112 * @return the {@code src} future. 113 */ forwardResultTo(CompletableFuture<T> src, CompletableFuture<T> dst)114 public static <T> CompletableFuture<T> forwardResultTo(CompletableFuture<T> src, 115 CompletableFuture<T> dst) { 116 src.whenComplete((r, e) -> { 117 if (e != null) { 118 dst.completeExceptionally(e); 119 } else { 120 dst.complete(r); 121 } 122 }); 123 124 return src; 125 } 126 127 /** 128 * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on 129 * the provided {@link Executor} and return the {@code src} future. 130 * 131 * @param src The source {@link CompletableFuture} 132 * @param dst The destination where the {@code Throwable} or response will be forwarded to. 133 * @param executor the executor to complete the des future 134 * @return the {@code src} future. 135 */ forwardResultTo(CompletableFuture<T> src, CompletableFuture<T> dst, Executor executor)136 public static <T> CompletableFuture<T> forwardResultTo(CompletableFuture<T> src, 137 CompletableFuture<T> dst, 138 Executor executor) { 139 src.whenCompleteAsync((r, e) -> { 140 if (e != null) { 141 dst.completeExceptionally(e); 142 } else { 143 dst.complete(r); 144 } 145 }, executor); 146 147 return src; 148 } 149 150 /** 151 * Completes the {@code dst} future based on the result of the {@code src} future, synchronously, 152 * after applying the provided transformation {@link Function} if successful. 153 * 154 * @param src The source {@link CompletableFuture} 155 * @param dst The destination where the {@code Throwable} or transformed result will be forwarded to. 156 * @return the {@code src} future. 157 */ forwardTransformedResultTo(CompletableFuture<SourceT> src, CompletableFuture<DestT> dst, Function<SourceT, DestT> function)158 public static <SourceT, DestT> CompletableFuture<SourceT> forwardTransformedResultTo(CompletableFuture<SourceT> src, 159 CompletableFuture<DestT> dst, 160 Function<SourceT, DestT> function) { 161 src.whenComplete((r, e) -> { 162 if (e != null) { 163 dst.completeExceptionally(e); 164 } else { 165 dst.complete(function.apply(r)); 166 } 167 }); 168 169 return src; 170 } 171 172 /** 173 * Similar to {@link CompletableFuture#allOf(CompletableFuture[])}, but 174 * when any future is completed exceptionally, forwards the 175 * exception to other futures. 176 * 177 * @param futures The futures. 178 * @return The new future that is completed when all the futures in {@code 179 * futures} are. 180 */ allOfExceptionForwarded(CompletableFuture<?>[] futures)181 public static CompletableFuture<Void> allOfExceptionForwarded(CompletableFuture<?>[] futures) { 182 183 CompletableFuture<Void> anyFail = anyFail(futures); 184 185 anyFail.whenComplete((r, t) -> { 186 if (t != null) { 187 for (CompletableFuture<?> cf : futures) { 188 cf.completeExceptionally(t); 189 } 190 } 191 }); 192 193 return CompletableFuture.allOf(futures); 194 } 195 196 /** 197 * Returns a new CompletableFuture that is completed when any of 198 * the given CompletableFutures completes exceptionally. 199 * 200 * @param futures the CompletableFutures 201 * @return a new CompletableFuture that is completed if any provided 202 * future completed exceptionally. 203 */ anyFail(CompletableFuture<?>[] futures)204 static CompletableFuture<Void> anyFail(CompletableFuture<?>[] futures) { 205 CompletableFuture<Void> completableFuture = new CompletableFuture<>(); 206 207 for (CompletableFuture<?> future : futures) { 208 future.whenComplete((r, t) -> { 209 if (t != null) { 210 completableFuture.completeExceptionally(t); 211 } 212 }); 213 } 214 215 return completableFuture; 216 } 217 joinInterruptibly(CompletableFuture<T> future)218 public static <T> T joinInterruptibly(CompletableFuture<T> future) { 219 try { 220 return future.get(); 221 } catch (InterruptedException e) { 222 Thread.currentThread().interrupt(); 223 throw new CompletionException("Interrupted while waiting on a future.", e); 224 } catch (ExecutionException e) { 225 Throwable cause = e.getCause(); 226 if (cause instanceof Error) { 227 throw (Error) cause; 228 } 229 throw new CompletionException(cause); 230 } 231 } 232 joinInterruptiblyIgnoringFailures(CompletableFuture<?> future)233 public static void joinInterruptiblyIgnoringFailures(CompletableFuture<?> future) { 234 try { 235 future.get(); 236 } catch (InterruptedException e) { 237 Thread.currentThread().interrupt(); 238 } catch (ExecutionException e) { 239 // Ignore 240 } 241 } 242 243 /** 244 * Joins (interruptibly) on the future, and re-throws any RuntimeExceptions or Errors just like the async task would have 245 * thrown if it was executed synchronously. 246 */ joinLikeSync(CompletableFuture<T> future)247 public static <T> T joinLikeSync(CompletableFuture<T> future) { 248 try { 249 return joinInterruptibly(future); 250 } catch (CompletionException e) { 251 Throwable cause = e.getCause(); 252 if (cause instanceof RuntimeException) { 253 // Make sure we don't lose the context of where the join is in the stack... 254 cause.addSuppressed(new RuntimeException("Task failed.")); 255 throw (RuntimeException) cause; 256 } 257 throw e; 258 } 259 } 260 } 261