1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.http.nio.netty.internal.utils; 17 18 import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.CHANNEL_DIAGNOSTICS; 19 20 import io.netty.buffer.ByteBufAllocator; 21 import io.netty.channel.Channel; 22 import io.netty.channel.EventLoop; 23 import io.netty.handler.ssl.SslContext; 24 import io.netty.handler.ssl.SslHandler; 25 import io.netty.handler.timeout.ReadTimeoutException; 26 import io.netty.handler.timeout.WriteTimeoutException; 27 import io.netty.util.AttributeKey; 28 import io.netty.util.concurrent.EventExecutor; 29 import io.netty.util.concurrent.Future; 30 import io.netty.util.concurrent.GenericFutureListener; 31 import io.netty.util.concurrent.Promise; 32 import io.netty.util.concurrent.SucceededFuture; 33 import java.io.IOException; 34 import java.nio.channels.ClosedChannelException; 35 import java.time.Duration; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.concurrent.TimeUnit; 38 import java.util.concurrent.TimeoutException; 39 import java.util.function.BiConsumer; 40 import java.util.function.Consumer; 41 import java.util.function.Function; 42 import javax.net.ssl.SSLEngine; 43 import javax.net.ssl.SSLParameters; 44 import software.amazon.awssdk.annotations.SdkInternalApi; 45 import software.amazon.awssdk.http.nio.netty.internal.ChannelDiagnostics; 46 import software.amazon.awssdk.utils.FunctionalUtils; 47 import software.amazon.awssdk.utils.Logger; 48 49 @SdkInternalApi 50 public final class NettyUtils { 51 /** 52 * Completed succeed future. 53 */ 54 public static final SucceededFuture<?> SUCCEEDED_FUTURE = new SucceededFuture<>(null, null); 55 56 public static final String CLOSED_CHANNEL_ERROR_MESSAGE = "The connection was closed during the request. The request will " 57 + "usually succeed on a retry, but if it does not: consider " 58 + "disabling any proxies you have configured, enabling debug " 59 + "logging, or performing a TCP dump to identify the root cause. " 60 + "If this is a streaming operation, validate that data is being " 61 + "read or written in a timely manner."; 62 private static final Logger log = Logger.loggerFor(NettyUtils.class); 63 NettyUtils()64 private NettyUtils() { 65 } 66 decorateException(Channel channel, Throwable originalCause)67 public static Throwable decorateException(Channel channel, Throwable originalCause) { 68 if (isAcquireTimeoutException(originalCause)) { 69 return new Throwable(getMessageForAcquireTimeoutException(), originalCause); 70 } else if (isTooManyPendingAcquiresException(originalCause)) { 71 return new Throwable(getMessageForTooManyAcquireOperationsError(), originalCause); 72 } else if (originalCause instanceof ReadTimeoutException) { 73 return new IOException("Read timed out", originalCause); 74 } else if (originalCause instanceof WriteTimeoutException) { 75 return new IOException("Write timed out", originalCause); 76 } else if (originalCause instanceof ClosedChannelException || isConnectionResetException(originalCause)) { 77 return new IOException(NettyUtils.closedChannelMessage(channel), originalCause); 78 } 79 80 return originalCause; 81 } 82 isConnectionResetException(Throwable originalCause)83 private static boolean isConnectionResetException(Throwable originalCause) { 84 String message = originalCause.getMessage(); 85 return originalCause instanceof IOException && 86 message != null && 87 message.contains("Connection reset by peer"); 88 } 89 isAcquireTimeoutException(Throwable originalCause)90 private static boolean isAcquireTimeoutException(Throwable originalCause) { 91 String message = originalCause.getMessage(); 92 return originalCause instanceof TimeoutException && 93 message != null && 94 message.contains("Acquire operation took longer"); 95 } 96 isTooManyPendingAcquiresException(Throwable originalCause)97 private static boolean isTooManyPendingAcquiresException(Throwable originalCause) { 98 String message = originalCause.getMessage(); 99 return originalCause instanceof IllegalStateException && 100 message != null && 101 message.contains("Too many outstanding acquire operations"); 102 } 103 getMessageForAcquireTimeoutException()104 private static String getMessageForAcquireTimeoutException() { 105 return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a " 106 + "connection from the pool within the specified maximum time. This can be due to high request rate.\n" 107 108 + "Consider taking any of the following actions to mitigate the issue: increase max connections, " 109 + "increase acquire timeout, or slowing the request rate.\n" 110 111 + "Increasing the max connections can increase client throughput (unless the network interface is already " 112 + "fully utilized), but can eventually start to hit operation system limitations on the number of file " 113 + "descriptors used by the process. If you already are fully utilizing your network interface or cannot " 114 + "further increase your connection count, increasing the acquire timeout gives extra time for requests to " 115 + "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests " 116 + "will still timeout.\n" 117 118 + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large " 119 + "traffic bursts cannot overload the client, being more efficient with the number of times you need to " 120 + "call AWS, or by increasing the number of hosts sending requests."; 121 } 122 getMessageForTooManyAcquireOperationsError()123 private static String getMessageForTooManyAcquireOperationsError() { 124 return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n" 125 126 + "Consider taking any of the following actions to mitigate the issue: increase max connections, " 127 + "increase max pending acquire count, decrease connection acquisition timeout, or " 128 + "slow the request rate.\n" 129 130 + "Increasing the max connections can increase client throughput (unless the network interface is already " 131 + "fully utilized), but can eventually start to hit operation system limitations on the number of file " 132 + "descriptors used by the process. If you already are fully utilizing your network interface or cannot " 133 + "further increase your connection count, increasing the pending acquire count allows extra requests to be " 134 + "buffered by the client, but can cause additional request latency and higher memory usage. If your request" 135 + " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail " 136 + "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total " 137 + "number of failed requests.\n" 138 139 + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large " 140 + "traffic bursts cannot overload the client, being more efficient with the number of times you need to call " 141 + "AWS, or by increasing the number of hosts sending requests."; 142 } 143 closedChannelMessage(Channel channel)144 public static String closedChannelMessage(Channel channel) { 145 ChannelDiagnostics channelDiagnostics = channel != null && channel.attr(CHANNEL_DIAGNOSTICS) != null ? 146 channel.attr(CHANNEL_DIAGNOSTICS).get() : null; 147 ChannelDiagnostics parentChannelDiagnostics = channel != null && channel.parent() != null && 148 channel.parent().attr(CHANNEL_DIAGNOSTICS) != null ? 149 channel.parent().attr(CHANNEL_DIAGNOSTICS).get() : null; 150 151 StringBuilder error = new StringBuilder(); 152 error.append(CLOSED_CHANNEL_ERROR_MESSAGE); 153 154 if (channelDiagnostics != null) { 155 error.append(" Channel Information: ").append(channelDiagnostics); 156 157 if (parentChannelDiagnostics != null) { 158 error.append(" Parent Channel Information: ").append(parentChannelDiagnostics); 159 } 160 } 161 162 return error.toString(); 163 } 164 165 /** 166 * Creates a {@link BiConsumer} that notifies the promise of any failures either via the {@link Throwable} passed into the 167 * BiConsumer of as a result of running the successFunction. 168 * 169 * @param successFunction Function called to process the successful result and map it into the result to notify the promise 170 * with. 171 * @param promise Promise to notify of success or failure. 172 * @param <SuccessT> Success type. 173 * @param <PromiseT> Type being fulfilled by the promise. 174 * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method. 175 */ promiseNotifyingBiConsumer( Function<SuccessT, PromiseT> successFunction, Promise<PromiseT> promise)176 public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> promiseNotifyingBiConsumer( 177 Function<SuccessT, PromiseT> successFunction, Promise<PromiseT> promise) { 178 return (success, fail) -> { 179 if (fail != null) { 180 promise.setFailure(fail); 181 } else { 182 try { 183 promise.setSuccess(successFunction.apply(success)); 184 } catch (Throwable e) { 185 promise.setFailure(e); 186 } 187 } 188 }; 189 } 190 191 /** 192 * Creates a {@link BiConsumer} that notifies the promise of any failures either via the throwable passed into the BiConsumer 193 * or as a result of running the successConsumer. This assumes that the successConsumer will notify the promise when it 194 * completes successfully. 195 * 196 * @param successConsumer BiConsumer to call if the result is successful. Promise is also passed and must be notified on 197 * success. 198 * @param promise Promise to notify. 199 * @param <SuccessT> Success type. 200 * @param <PromiseT> Type being fulfilled by the Promise. 201 * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method. 202 */ 203 public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> asyncPromiseNotifyingBiConsumer( 204 BiConsumer<SuccessT, Promise<PromiseT>> successConsumer, Promise<PromiseT> promise) { 205 return (success, fail) -> { 206 if (fail != null) { 207 promise.setFailure(fail); 208 } else { 209 try { 210 successConsumer.accept(success, promise); 211 } catch (Throwable e) { 212 // If the successConsumer fails synchronously then we can notify the promise. If it fails asynchronously 213 // it's up to the successConsumer to notify. 214 promise.setFailure(e); 215 } 216 } 217 }; 218 } 219 220 /** 221 * Create a {@link GenericFutureListener} that will notify the provided {@link Promise} on success and failure. 222 * 223 * @param channelPromise Promise to notify. 224 * @return GenericFutureListener 225 */ 226 public static <T> GenericFutureListener<Future<T>> promiseNotifyingListener(Promise<T> channelPromise) { 227 return future -> { 228 if (future.isSuccess()) { 229 channelPromise.setSuccess(future.getNow()); 230 } else { 231 channelPromise.setFailure(future.cause()); 232 } 233 }; 234 } 235 236 /** 237 * Runs a task in the given {@link EventExecutor}. Runs immediately if the current thread is in the 238 * eventExecutor. 239 * 240 * @param eventExecutor Executor to run task in. 241 * @param runnable Task to run. 242 * 243 * @return The {@code Future} from from the executor. 244 */ 245 public static Future<?> doInEventLoop(EventExecutor eventExecutor, Runnable runnable) { 246 if (eventExecutor.inEventLoop()) { 247 try { 248 runnable.run(); 249 return eventExecutor.newSucceededFuture(null); 250 } catch (Throwable t) { 251 return eventExecutor.newFailedFuture(t); 252 } 253 } 254 return eventExecutor.submit(runnable); 255 } 256 257 /** 258 * Runs a task in the given {@link EventExecutor}. Runs immediately if the current thread is in the 259 * eventExecutor. Notifies the given {@link Promise} if a failure occurs. 260 * 261 * @param eventExecutor Executor to run task in. 262 * @param runnable Task to run. 263 * @param promise Promise to notify if failure occurs. 264 */ 265 public static void doInEventLoop(EventExecutor eventExecutor, Runnable runnable, Promise<?> promise) { 266 try { 267 if (eventExecutor.inEventLoop()) { 268 runnable.run(); 269 } else { 270 eventExecutor.submit(() -> { 271 try { 272 runnable.run(); 273 } catch (Throwable e) { 274 promise.setFailure(e); 275 } 276 }); 277 } 278 } catch (Throwable e) { 279 promise.setFailure(e); 280 } 281 } 282 283 public static void warnIfNotInEventLoop(EventLoop loop) { 284 assert loop.inEventLoop(); 285 if (!loop.inEventLoop()) { 286 Exception exception = 287 new IllegalStateException("Execution is not in the expected event loop. Please report this issue to the " 288 + "AWS SDK for Java team on GitHub, because it could result in race conditions."); 289 log.warn(() -> "Execution is happening outside of the expected event loop.", exception); 290 } 291 } 292 293 /** 294 * @return an {@code AttributeKey} for {@code attr}. This returns an existing instance if it was previously created. 295 */ 296 public static <T> AttributeKey<T> getOrCreateAttributeKey(String attr) { 297 if (AttributeKey.exists(attr)) { 298 return AttributeKey.valueOf(attr); 299 } 300 //CHECKSTYLE:OFF - This is the only place allowed to call AttributeKey.newInstance() 301 return AttributeKey.newInstance(attr); 302 //CHECKSTYLE:ON 303 } 304 305 /** 306 * @return a new {@link SslHandler} with ssl engine configured 307 */ 308 public static SslHandler newSslHandler(SslContext sslContext, ByteBufAllocator alloc, String peerHost, int peerPort, 309 Duration handshakeTimeout) { 310 // Need to provide host and port to enable SNI 311 // https://github.com/netty/netty/issues/3801#issuecomment-104274440 312 SslHandler sslHandler = sslContext.newHandler(alloc, peerHost, peerPort); 313 sslHandler.setHandshakeTimeout(handshakeTimeout.toMillis(), TimeUnit.MILLISECONDS); 314 configureSslEngine(sslHandler.engine()); 315 return sslHandler; 316 } 317 318 /** 319 * Enable Hostname verification. 320 * 321 * See https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html#newHandler-io.netty.buffer.ByteBufAllocator-java.lang 322 * .String-int- 323 * 324 * @param sslEngine the sslEngine to configure 325 */ 326 private static void configureSslEngine(SSLEngine sslEngine) { 327 SSLParameters sslParameters = sslEngine.getSSLParameters(); 328 sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); 329 sslEngine.setSSLParameters(sslParameters); 330 } 331 332 /** 333 * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise}, 334 * or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining 335 * together multiple futures that may depend upon each other but that may not have the same return type. 336 * <p> 337 * Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use 338 * {@link #runOrPropagate(Promise, Runnable)} instead. 339 * 340 * @param destination the Promise to notify upon failure or cancellation 341 * @param onSuccess the Consumer to invoke upon success 342 */ 343 public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) { 344 return f -> { 345 if (f.isSuccess()) { 346 try { 347 T result = f.getNow(); 348 onSuccess.accept(result); 349 } catch (Throwable t) { 350 destination.tryFailure(t); 351 } 352 } else if (f.isCancelled()) { 353 destination.cancel(false); 354 } else { 355 destination.tryFailure(f.cause()); 356 } 357 }; 358 } 359 360 /** 361 * Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise}, 362 * or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple 363 * futures that may depend upon each other but that may not have the same return type. 364 * 365 * @param destination the Promise to notify upon failure or cancellation 366 * @param onSuccess the Runnable to invoke upon success 367 */ 368 public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) { 369 return f -> { 370 if (f.isSuccess()) { 371 try { 372 onSuccess.run(); 373 } catch (Throwable t) { 374 destination.tryFailure(t); 375 } 376 } else if (f.isCancelled()) { 377 destination.cancel(false); 378 } else { 379 destination.tryFailure(f.cause()); 380 } 381 }; 382 } 383 384 public static void runAndLogError(NettyClientLogger log, String errorMsg, FunctionalUtils.UnsafeRunnable runnable) { 385 try { 386 runnable.run(); 387 } catch (Exception e) { 388 log.error(null, () -> errorMsg, e); 389 } 390 } 391 } 392