1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.http.nio.netty.internal.utils;
17 
18 import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS;
19 
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.channel.Channel;
22 import io.netty.channel.EventLoop;
23 import io.netty.handler.ssl.SslContext;
24 import io.netty.handler.ssl.SslHandler;
25 import io.netty.handler.timeout.ReadTimeoutException;
26 import io.netty.handler.timeout.WriteTimeoutException;
27 import io.netty.util.AttributeKey;
28 import io.netty.util.concurrent.EventExecutor;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.GenericFutureListener;
31 import io.netty.util.concurrent.Promise;
32 import io.netty.util.concurrent.SucceededFuture;
33 import java.io.IOException;
34 import java.nio.channels.ClosedChannelException;
35 import java.time.Duration;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Consumer;
41 import java.util.function.Function;
42 import javax.net.ssl.SSLEngine;
43 import javax.net.ssl.SSLParameters;
44 import software.amazon.awssdk.annotations.SdkInternalApi;
45 import software.amazon.awssdk.http.nio.netty.internal.ChannelDiagnostics;
46 import software.amazon.awssdk.utils.FunctionalUtils;
47 import software.amazon.awssdk.utils.Logger;
48 
49 @SdkInternalApi
50 public final class NettyUtils {
51     /**
52      * Completed succeed future.
53      */
54     public static final SucceededFuture<?> SUCCEEDED_FUTURE = new SucceededFuture<>(null, null);
55 
56     public static final String CLOSED_CHANNEL_ERROR_MESSAGE = "The connection was closed during the request. The request will "
57                                                               + "usually succeed on a retry, but if it does not: consider "
58                                                               + "disabling any proxies you have configured, enabling debug "
59                                                               + "logging, or performing a TCP dump to identify the root cause. "
60                                                               + "If this is a streaming operation, validate that data is being "
61                                                               + "read or written in a timely manner.";
62     private static final Logger log = Logger.loggerFor(NettyUtils.class);
63 
NettyUtils()64     private NettyUtils() {
65     }
66 
decorateException(Channel channel, Throwable originalCause)67     public static Throwable decorateException(Channel channel, Throwable originalCause) {
68         if (isAcquireTimeoutException(originalCause)) {
69             return new Throwable(getMessageForAcquireTimeoutException(), originalCause);
70         } else if (isTooManyPendingAcquiresException(originalCause)) {
71             return new Throwable(getMessageForTooManyAcquireOperationsError(), originalCause);
72         } else if (originalCause instanceof ReadTimeoutException) {
73             return new IOException("Read timed out", originalCause);
74         } else if (originalCause instanceof WriteTimeoutException) {
75             return new IOException("Write timed out", originalCause);
76         } else if (originalCause instanceof ClosedChannelException || isConnectionResetException(originalCause)) {
77             return new IOException(NettyUtils.closedChannelMessage(channel), originalCause);
78         }
79 
80         return originalCause;
81     }
82 
isConnectionResetException(Throwable originalCause)83     private static boolean isConnectionResetException(Throwable originalCause) {
84         String message = originalCause.getMessage();
85         return originalCause instanceof IOException &&
86                message != null &&
87                message.contains("Connection reset by peer");
88     }
89 
isAcquireTimeoutException(Throwable originalCause)90     private static boolean isAcquireTimeoutException(Throwable originalCause) {
91         String message = originalCause.getMessage();
92         return originalCause instanceof TimeoutException &&
93                message != null &&
94                message.contains("Acquire operation took longer");
95     }
96 
isTooManyPendingAcquiresException(Throwable originalCause)97     private static boolean isTooManyPendingAcquiresException(Throwable originalCause) {
98         String message = originalCause.getMessage();
99         return originalCause instanceof IllegalStateException &&
100                message != null &&
101                message.contains("Too many outstanding acquire operations");
102     }
103 
getMessageForAcquireTimeoutException()104     private static String getMessageForAcquireTimeoutException() {
105         return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a "
106                + "connection from the pool within the specified maximum time. This can be due to high request rate.\n"
107 
108                + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
109                + "increase acquire timeout, or slowing the request rate.\n"
110 
111                + "Increasing the max connections can increase client throughput (unless the network interface is already "
112                + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
113                + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
114                + "further increase your connection count, increasing the acquire timeout gives extra time for requests to "
115                + "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests "
116                + "will still timeout.\n"
117 
118                + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
119                + "traffic bursts cannot overload the client, being more efficient with the number of times you need to "
120                + "call AWS, or by increasing the number of hosts sending requests.";
121     }
122 
getMessageForTooManyAcquireOperationsError()123     private static String getMessageForTooManyAcquireOperationsError() {
124         return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n"
125 
126                + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
127                + "increase max pending acquire count, decrease connection acquisition timeout, or "
128                + "slow the request rate.\n"
129 
130                + "Increasing the max connections can increase client throughput (unless the network interface is already "
131                + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
132                + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
133                + "further increase your connection count, increasing the pending acquire count allows extra requests to be "
134                + "buffered by the client, but can cause additional request latency and higher memory usage. If your request"
135                + " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail "
136                + "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total "
137                + "number of failed requests.\n"
138 
139                + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
140                + "traffic bursts cannot overload the client, being more efficient with the number of times you need to call "
141                + "AWS, or by increasing the number of hosts sending requests.";
142     }
143 
closedChannelMessage(Channel channel)144     public static String closedChannelMessage(Channel channel) {
145         ChannelDiagnostics channelDiagnostics = channel != null && channel.attr(CHANNEL_DIAGNOSTICS) != null ?
146                                                 channel.attr(CHANNEL_DIAGNOSTICS).get() : null;
147         ChannelDiagnostics parentChannelDiagnostics = channel != null && channel.parent() != null &&
148                                                       channel.parent().attr(CHANNEL_DIAGNOSTICS) != null ?
149                                                       channel.parent().attr(CHANNEL_DIAGNOSTICS).get() : null;
150 
151         StringBuilder error = new StringBuilder();
152         error.append(CLOSED_CHANNEL_ERROR_MESSAGE);
153 
154         if (channelDiagnostics != null) {
155             error.append(" Channel Information: ").append(channelDiagnostics);
156 
157             if (parentChannelDiagnostics != null) {
158                 error.append(" Parent Channel Information: ").append(parentChannelDiagnostics);
159             }
160         }
161 
162         return error.toString();
163     }
164 
165     /**
166      * Creates a {@link BiConsumer} that notifies the promise of any failures either via the {@link Throwable} passed into the
167      * BiConsumer of as a result of running the successFunction.
168      *
169      * @param successFunction Function called to process the successful result and map it into the result to notify the promise
170      * with.
171      * @param promise Promise to notify of success or failure.
172      * @param <SuccessT> Success type.
173      * @param <PromiseT> Type being fulfilled by the promise.
174      * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method.
175      */
promiseNotifyingBiConsumer( Function<SuccessT, PromiseT> successFunction, Promise<PromiseT> promise)176     public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> promiseNotifyingBiConsumer(
177         Function<SuccessT, PromiseT> successFunction, Promise<PromiseT> promise) {
178         return (success, fail) -> {
179             if (fail != null) {
180                 promise.setFailure(fail);
181             } else {
182                 try {
183                     promise.setSuccess(successFunction.apply(success));
184                 } catch (Throwable e) {
185                     promise.setFailure(e);
186                 }
187             }
188         };
189     }
190 
191     /**
192      * Creates a {@link BiConsumer} that notifies the promise of any failures either via the throwable passed into the BiConsumer
193      * or as a result of running the successConsumer. This assumes that the successConsumer will notify the promise when it
194      * completes successfully.
195      *
196      * @param successConsumer BiConsumer to call if the result is successful. Promise is also passed and must be notified on
197      * success.
198      * @param promise Promise to notify.
199      * @param <SuccessT> Success type.
200      * @param <PromiseT> Type being fulfilled by the Promise.
201      * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method.
202      */
203     public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> asyncPromiseNotifyingBiConsumer(
204         BiConsumer<SuccessT, Promise<PromiseT>> successConsumer, Promise<PromiseT> promise) {
205         return (success, fail) -> {
206             if (fail != null) {
207                 promise.setFailure(fail);
208             } else {
209                 try {
210                     successConsumer.accept(success, promise);
211                 } catch (Throwable e) {
212                     // If the successConsumer fails synchronously then we can notify the promise. If it fails asynchronously
213                     // it's up to the successConsumer to notify.
214                     promise.setFailure(e);
215                 }
216             }
217         };
218     }
219 
220     /**
221      * Create a {@link GenericFutureListener} that will notify the provided {@link Promise} on success and failure.
222      *
223      * @param channelPromise Promise to notify.
224      * @return GenericFutureListener
225      */
226     public static <T> GenericFutureListener<Future<T>> promiseNotifyingListener(Promise<T> channelPromise) {
227         return future -> {
228             if (future.isSuccess()) {
229                 channelPromise.setSuccess(future.getNow());
230             } else {
231                 channelPromise.setFailure(future.cause());
232             }
233         };
234     }
235 
236     /**
237      * Runs a task in the given {@link EventExecutor}. Runs immediately if the current thread is in the
238      * eventExecutor.
239      *
240      * @param eventExecutor Executor to run task in.
241      * @param runnable Task to run.
242      *
243      * @return The {@code Future} from from the executor.
244      */
245     public static Future<?> doInEventLoop(EventExecutor eventExecutor, Runnable runnable) {
246         if (eventExecutor.inEventLoop()) {
247             try {
248                 runnable.run();
249                 return eventExecutor.newSucceededFuture(null);
250             } catch (Throwable t) {
251                 return eventExecutor.newFailedFuture(t);
252             }
253         }
254         return eventExecutor.submit(runnable);
255     }
256 
257     /**
258      * Runs a task in the given {@link EventExecutor}. Runs immediately if the current thread is in the
259      * eventExecutor. Notifies the given {@link Promise} if a failure occurs.
260      *
261      * @param eventExecutor Executor to run task in.
262      * @param runnable Task to run.
263      * @param promise Promise to notify if failure occurs.
264      */
265     public static void doInEventLoop(EventExecutor eventExecutor, Runnable runnable, Promise<?> promise) {
266         try {
267             if (eventExecutor.inEventLoop()) {
268                 runnable.run();
269             } else {
270                 eventExecutor.submit(() -> {
271                     try {
272                         runnable.run();
273                     } catch (Throwable e) {
274                         promise.setFailure(e);
275                     }
276                 });
277             }
278         } catch (Throwable e) {
279             promise.setFailure(e);
280         }
281     }
282 
283     public static void warnIfNotInEventLoop(EventLoop loop) {
284         assert loop.inEventLoop();
285         if (!loop.inEventLoop()) {
286             Exception exception =
287                 new IllegalStateException("Execution is not in the expected event loop. Please report this issue to the "
288                                           + "AWS SDK for Java team on GitHub, because it could result in race conditions.");
289             log.warn(() -> "Execution is happening outside of the expected event loop.", exception);
290         }
291     }
292 
293     /**
294      * @return an {@code AttributeKey} for {@code attr}. This returns an existing instance if it was previously created.
295      */
296     public static <T> AttributeKey<T> getOrCreateAttributeKey(String attr) {
297         if (AttributeKey.exists(attr)) {
298             return AttributeKey.valueOf(attr);
299         }
300         //CHECKSTYLE:OFF - This is the only place allowed to call AttributeKey.newInstance()
301         return AttributeKey.newInstance(attr);
302         //CHECKSTYLE:ON
303     }
304 
305     /**
306      * @return a new {@link SslHandler} with ssl engine configured
307      */
308     public static SslHandler newSslHandler(SslContext sslContext, ByteBufAllocator alloc, String peerHost, int peerPort,
309                                            Duration handshakeTimeout) {
310         // Need to provide host and port to enable SNI
311         // https://github.com/netty/netty/issues/3801#issuecomment-104274440
312         SslHandler sslHandler = sslContext.newHandler(alloc, peerHost, peerPort);
313         sslHandler.setHandshakeTimeout(handshakeTimeout.toMillis(), TimeUnit.MILLISECONDS);
314         configureSslEngine(sslHandler.engine());
315         return sslHandler;
316     }
317 
318     /**
319      * Enable Hostname verification.
320      *
321      * See https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html#newHandler-io.netty.buffer.ByteBufAllocator-java.lang
322      * .String-int-
323      *
324      * @param sslEngine the sslEngine to configure
325      */
326     private static void configureSslEngine(SSLEngine sslEngine) {
327         SSLParameters sslParameters = sslEngine.getSSLParameters();
328         sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
329         sslEngine.setSSLParameters(sslParameters);
330     }
331 
332     /**
333      * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
334      * or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
335      * together multiple futures that may depend upon each other but that may not have the same return type.
336      * <p>
337      * Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
338      * {@link #runOrPropagate(Promise, Runnable)} instead.
339      *
340      * @param destination the Promise to notify upon failure or cancellation
341      * @param onSuccess   the Consumer to invoke upon success
342      */
343     public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) {
344         return f -> {
345             if (f.isSuccess()) {
346                 try {
347                     T result = f.getNow();
348                     onSuccess.accept(result);
349                 } catch (Throwable t) {
350                     destination.tryFailure(t);
351                 }
352             } else if (f.isCancelled()) {
353                 destination.cancel(false);
354             } else {
355                 destination.tryFailure(f.cause());
356             }
357         };
358     }
359 
360     /**
361      * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
362      * or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
363      * futures that may depend upon each other but that may not have the same return type.
364      *
365      * @param destination the Promise to notify upon failure or cancellation
366      * @param onSuccess   the Runnable to invoke upon success
367      */
368     public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) {
369         return f -> {
370             if (f.isSuccess()) {
371                 try {
372                     onSuccess.run();
373                 } catch (Throwable t) {
374                     destination.tryFailure(t);
375                 }
376             } else if (f.isCancelled()) {
377                 destination.cancel(false);
378             } else {
379                 destination.tryFailure(f.cause());
380             }
381         };
382     }
383 
384     public static void runAndLogError(NettyClientLogger log, String errorMsg, FunctionalUtils.UnsafeRunnable runnable) {
385         try {
386             runnable.run();
387         } catch (Exception e) {
388             log.error(null, () -> errorMsg, e);
389         }
390     }
391 }
392