1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.utils;
17 
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.CompletionException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executor;
22 import java.util.function.Function;
23 import software.amazon.awssdk.annotations.SdkProtectedApi;
24 
25 /**
26  * Utility class for working with {@link CompletableFuture}.
27  */
28 @SdkProtectedApi
29 public final class CompletableFutureUtils {
30     private static final Logger log = Logger.loggerFor(CompletableFutureUtils.class);
31 
CompletableFutureUtils()32     private CompletableFutureUtils() {
33     }
34 
35     /**
36      * Convenience method for creating a future that is immediately completed
37      * exceptionally with the given {@code Throwable}.
38      * <p>
39      * Similar to {@code CompletableFuture#failedFuture} which was added in
40      * Java 9.
41      *
42      * @param t The failure.
43      * @param <U> The type of the element.
44      * @return The failed future.
45      */
failedFuture(Throwable t)46     public static <U> CompletableFuture<U> failedFuture(Throwable t) {
47         CompletableFuture<U> cf = new CompletableFuture<>();
48         cf.completeExceptionally(t);
49         return cf;
50     }
51 
52     /**
53      * Wraps the given error in a {@link CompletionException} if necessary.
54      * Useful if an exception needs to be rethrown from within {@link
55      * CompletableFuture#handle(java.util.function.BiFunction)} or similar
56      * methods.
57      *
58      * @param t The error.
59      * @return The error as a CompletionException.
60      */
errorAsCompletionException(Throwable t)61     public static CompletionException errorAsCompletionException(Throwable t) {
62         if (t instanceof CompletionException) {
63             return (CompletionException) t;
64         }
65         return new CompletionException(t);
66     }
67 
68     /**
69      * Forward the {@code Throwable} from {@code src} to {@code dst}.
70 
71      * @param src The source of the {@code Throwable}.
72      * @param dst The destination where the {@code Throwable} will be forwarded to.
73      *
74      * @return {@code src}.
75      */
forwardExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst)76     public static <T> CompletableFuture<T> forwardExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst) {
77         src.whenComplete((r, e) -> {
78             if (e != null) {
79                 dst.completeExceptionally(e);
80             }
81         });
82         return src;
83     }
84 
85 
86     /**
87      * Forward the {@code Throwable} that can be transformed as per the transformationFunction
88      * from {@code src} to {@code dst}.
89      * @param src The source of the {@code Throwable}.
90      * @param dst The destination where the {@code Throwable} will be forwarded to
91      * @param transformationFunction Transformation function taht will be applied on to the forwarded exception.
92      * @return
93      */
forwardTransformedExceptionTo(CompletableFuture<T> src, CompletableFuture<?> dst, Function<Throwable, Throwable> transformationFunction)94     public static <T> CompletableFuture<T> forwardTransformedExceptionTo(CompletableFuture<T> src,
95                                                                          CompletableFuture<?> dst,
96                                                                          Function<Throwable, Throwable>
97                                                                                  transformationFunction) {
98         src.whenComplete((r, e) -> {
99             if (e != null) {
100                 dst.completeExceptionally(transformationFunction.apply(e));
101             }
102         });
103         return src;
104     }
105 
106     /**
107      * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on
108      * the provided {@link Executor} and return the {@code src} future.
109      *
110      * @param src The source {@link CompletableFuture}
111      * @param dst The destination where the {@code Throwable} or response will be forwarded to.
112      * @return the {@code src} future.
113      */
forwardResultTo(CompletableFuture<T> src, CompletableFuture<T> dst)114     public static <T> CompletableFuture<T> forwardResultTo(CompletableFuture<T> src,
115                                                            CompletableFuture<T> dst) {
116         src.whenComplete((r, e) -> {
117             if (e != null) {
118                 dst.completeExceptionally(e);
119             } else {
120                 dst.complete(r);
121             }
122         });
123 
124         return src;
125     }
126 
127     /**
128      * Completes the {@code dst} future based on the result of the {@code src} future asynchronously on
129      * the provided {@link Executor} and return the {@code src} future.
130      *
131      * @param src The source {@link CompletableFuture}
132      * @param dst The destination where the {@code Throwable} or response will be forwarded to.
133      * @param executor the executor to complete the des future
134      * @return the {@code src} future.
135      */
forwardResultTo(CompletableFuture<T> src, CompletableFuture<T> dst, Executor executor)136     public static <T> CompletableFuture<T> forwardResultTo(CompletableFuture<T> src,
137                                                            CompletableFuture<T> dst,
138                                                            Executor executor) {
139         src.whenCompleteAsync((r, e) -> {
140             if (e != null) {
141                 dst.completeExceptionally(e);
142             } else {
143                 dst.complete(r);
144             }
145         }, executor);
146 
147         return src;
148     }
149 
150     /**
151      * Completes the {@code dst} future based on the result of the {@code src} future, synchronously,
152      * after applying the provided transformation {@link Function} if successful.
153      *
154      * @param src The source {@link CompletableFuture}
155      * @param dst The destination where the {@code Throwable} or transformed result will be forwarded to.
156      * @return the {@code src} future.
157      */
forwardTransformedResultTo(CompletableFuture<SourceT> src, CompletableFuture<DestT> dst, Function<SourceT, DestT> function)158     public static <SourceT, DestT> CompletableFuture<SourceT> forwardTransformedResultTo(CompletableFuture<SourceT> src,
159                                                                                          CompletableFuture<DestT> dst,
160                                                                                          Function<SourceT, DestT> function) {
161         src.whenComplete((r, e) -> {
162             if (e != null) {
163                 dst.completeExceptionally(e);
164             } else {
165                 dst.complete(function.apply(r));
166             }
167         });
168 
169         return src;
170     }
171 
172     /**
173      * Similar to {@link CompletableFuture#allOf(CompletableFuture[])}, but
174      * when any future is completed exceptionally, forwards the
175      * exception to other futures.
176      *
177      * @param futures The futures.
178      * @return The new future that is completed when all the futures in {@code
179      * futures} are.
180      */
allOfExceptionForwarded(CompletableFuture<?>[] futures)181     public static CompletableFuture<Void> allOfExceptionForwarded(CompletableFuture<?>[] futures) {
182 
183         CompletableFuture<Void> anyFail = anyFail(futures);
184 
185         anyFail.whenComplete((r, t) -> {
186             if (t != null) {
187                 for (CompletableFuture<?> cf : futures) {
188                     cf.completeExceptionally(t);
189                 }
190             }
191         });
192 
193         return CompletableFuture.allOf(futures);
194     }
195 
196     /**
197      * Returns a new CompletableFuture that is completed when any of
198      * the given CompletableFutures completes exceptionally.
199      *
200      * @param futures the CompletableFutures
201      * @return a new CompletableFuture that is completed if any provided
202      * future completed exceptionally.
203      */
anyFail(CompletableFuture<?>[] futures)204     static CompletableFuture<Void> anyFail(CompletableFuture<?>[] futures) {
205         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
206 
207         for (CompletableFuture<?> future : futures) {
208             future.whenComplete((r, t) -> {
209                 if (t != null) {
210                     completableFuture.completeExceptionally(t);
211                 }
212             });
213         }
214 
215         return completableFuture;
216     }
217 
joinInterruptibly(CompletableFuture<T> future)218     public static <T> T joinInterruptibly(CompletableFuture<T> future) {
219         try {
220             return future.get();
221         } catch (InterruptedException e) {
222             Thread.currentThread().interrupt();
223             throw new CompletionException("Interrupted while waiting on a future.", e);
224         } catch (ExecutionException e) {
225             Throwable cause = e.getCause();
226             if (cause instanceof Error) {
227                 throw (Error) cause;
228             }
229             throw new CompletionException(cause);
230         }
231     }
232 
joinInterruptiblyIgnoringFailures(CompletableFuture<?> future)233     public static void joinInterruptiblyIgnoringFailures(CompletableFuture<?> future) {
234         try {
235             future.get();
236         } catch (InterruptedException e) {
237             Thread.currentThread().interrupt();
238         } catch (ExecutionException e) {
239             // Ignore
240         }
241     }
242 
243     /**
244      * Joins (interruptibly) on the future, and re-throws any RuntimeExceptions or Errors just like the async task would have
245      * thrown if it was executed synchronously.
246      */
joinLikeSync(CompletableFuture<T> future)247     public static <T> T joinLikeSync(CompletableFuture<T> future) {
248         try {
249             return joinInterruptibly(future);
250         } catch (CompletionException e) {
251             Throwable cause = e.getCause();
252             if (cause instanceof RuntimeException) {
253                 // Make sure we don't lose the context of where the join is in the stack...
254                 cause.addSuppressed(new RuntimeException("Task failed."));
255                 throw (RuntimeException) cause;
256             }
257             throw e;
258         }
259     }
260 }
261