1 package software.amazon.awssdk.crt.test; 2 3 import org.junit.Assert; 4 import org.junit.Assume; 5 import org.junit.Test; 6 import software.amazon.awssdk.crt.CrtRuntimeException; 7 import software.amazon.awssdk.crt.Log; 8 import software.amazon.awssdk.crt.auth.credentials.Credentials; 9 import software.amazon.awssdk.crt.auth.credentials.CredentialsProvider; 10 import software.amazon.awssdk.crt.auth.credentials.DefaultChainCredentialsProvider; 11 import software.amazon.awssdk.crt.auth.credentials.StaticCredentialsProvider; 12 import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; 13 import software.amazon.awssdk.crt.http.*; 14 import software.amazon.awssdk.crt.io.*; 15 import software.amazon.awssdk.crt.s3.*; 16 import software.amazon.awssdk.crt.s3.ChecksumConfig.ChecksumLocation; 17 import software.amazon.awssdk.crt.s3.S3MetaRequestOptions.MetaRequestType; 18 import software.amazon.awssdk.crt.utils.ByteBufferUtils; 19 20 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertNotNull; 22 import static org.junit.Assert.assertTrue; 23 import static org.junit.Assert.assertThrows; 24 25 import java.io.File; 26 import java.io.FileNotFoundException; 27 import java.io.IOException; 28 import java.io.PrintWriter; 29 import java.net.URI; 30 import java.nio.BufferOverflowException; 31 import java.nio.ByteBuffer; 32 import java.nio.charset.StandardCharsets; 33 import java.nio.file.Files; 34 import java.nio.file.Path; 35 import java.nio.file.Paths; 36 import java.time.Clock; 37 import java.time.Duration; 38 import java.time.Instant; 39 import java.time.temporal.ChronoUnit; 40 import java.util.ArrayList; 41 import java.util.Arrays; 42 import java.util.LinkedList; 43 import java.util.List; 44 import java.util.concurrent.*; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import java.util.concurrent.atomic.AtomicLong; 47 import java.util.concurrent.atomic.AtomicReference; 48 import java.util.function.Predicate; 49 import java.util.stream.DoubleStream; 50 51 public class S3ClientTest extends CrtTestFixture { 52 53 static final String ENDPOINT = System.getProperty("ENDPOINT") == null 54 ? "aws-crt-test-stuff-us-west-2.s3.us-west-2.amazonaws.com" 55 : System.getenv("ENDPOINT"); 56 static final String S3EXPRESS_ENDPOINT = System.getenv("S3EXPRESS_ENDPOINT") == null 57 ? "crts-west2--usw2-az1--x-s3.s3express-usw2-az1.us-west-2.amazonaws.com" 58 : System.getenv("S3EXPRESS_ENDPOINT"); 59 static final String S3EXPRESS_ENDPOINT_EAST1 = System.getenv("S3EXPRESS_ENDPOINT_EAST1") == null 60 ? "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com" 61 : System.getenv("S3EXPRESS_ENDPOINT_EAST1"); 62 static final String REGION = System.getenv("REGION") == null ? "us-west-2" : System.getenv("REGION"); 63 64 65 static final String COPY_SOURCE_BUCKET = "aws-crt-test-stuff-us-west-2"; 66 static final String COPY_SOURCE_KEY = "crt-canary-obj.txt"; 67 static final String X_AMZ_COPY_SOURCE_HEADER = "x-amz-copy-source"; 68 S3ClientTest()69 public S3ClientTest() { 70 } 71 createS3Client(S3ClientOptions options, int numThreads)72 private S3Client createS3Client(S3ClientOptions options, int numThreads) { 73 74 return createS3Client(options, numThreads, 0); 75 } 76 createS3Client(S3ClientOptions options, int numThreads, int cpuGroup)77 private S3Client createS3Client(S3ClientOptions options, int numThreads, int cpuGroup) { 78 try (EventLoopGroup elg = new EventLoopGroup(cpuGroup, numThreads)) { 79 return createS3Client(options, elg); 80 } 81 } 82 createS3Client(S3ClientOptions options, EventLoopGroup elg)83 private S3Client createS3Client(S3ClientOptions options, EventLoopGroup elg) { 84 try (HostResolver hostResolver = new HostResolver(elg); 85 ClientBootstrap clientBootstrap = new ClientBootstrap(elg, hostResolver);) { 86 Assert.assertNotNull(clientBootstrap); 87 88 try (DefaultChainCredentialsProvider credentialsProvider = new DefaultChainCredentialsProvider.DefaultChainCredentialsProviderBuilder() 89 .withClientBootstrap(clientBootstrap).build(); 90 AwsSigningConfig signingConfig = AwsSigningConfig.getDefaultS3SigningConfig(REGION, 91 credentialsProvider);) { 92 Assert.assertNotNull(credentialsProvider); 93 options.withClientBootstrap(clientBootstrap).withSigningConfig(signingConfig); 94 return new S3Client(options); 95 } 96 } catch (NullPointerException ex) { 97 ex.printStackTrace(); 98 System.err.println(ex.getMessage()); 99 return null; 100 } 101 } 102 createS3Client(S3ClientOptions options)103 private S3Client createS3Client(S3ClientOptions options) { 104 return createS3Client(options, 1); 105 } 106 makeExceptionFromFinishedResponseContext(S3FinishedResponseContext context)107 private RuntimeException makeExceptionFromFinishedResponseContext(S3FinishedResponseContext context) { 108 return new RuntimeException(String.format("error code:(%d) response status code(%d), error payload(%s)", 109 context.getErrorCode(), 110 context.getResponseStatus(), 111 new String(context.getErrorPayload(), java.nio.charset.StandardCharsets.UTF_8), 112 context.getCause())); 113 } 114 115 @Test testS3ClientCreateDestroy()116 public void testS3ClientCreateDestroy() { 117 skipIfAndroid(); 118 skipIfNetworkUnavailable(); 119 120 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION) 121 .withComputeContentMd5(true) 122 .withMemoryLimitInBytes(5L * 1024 * 1024 * 1024); 123 try (S3Client client = createS3Client(clientOptions)) { 124 125 } 126 } 127 128 @Test testS3ClientCreateDestroyWithCredentialsProvider()129 public void testS3ClientCreateDestroyWithCredentialsProvider() { 130 skipIfAndroid(); 131 skipIfNetworkUnavailable(); 132 133 try (EventLoopGroup elg = new EventLoopGroup(0, 1); 134 HostResolver hostResolver = new HostResolver(elg); 135 ClientBootstrap clientBootstrap = new ClientBootstrap(elg, hostResolver); 136 DefaultChainCredentialsProvider credentialsProvider = new DefaultChainCredentialsProvider.DefaultChainCredentialsProviderBuilder() 137 .withClientBootstrap(clientBootstrap).build();) { 138 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION) 139 .withClientBootstrap(clientBootstrap).withCredentialsProvider(credentialsProvider); 140 try (S3Client client = new S3Client(clientOptions)) { 141 assertNotNull(client); 142 } 143 } 144 } 145 146 @Test testS3ClientCreateDestroyWithoutSigningConfig()147 public void testS3ClientCreateDestroyWithoutSigningConfig() throws Exception { 148 skipIfAndroid(); 149 skipIfNetworkUnavailable(); 150 try (EventLoopGroup elg = new EventLoopGroup(0, 1); 151 HostResolver hostResolver = new HostResolver(elg); 152 ClientBootstrap clientBootstrap = new ClientBootstrap(elg, hostResolver);) { 153 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION) 154 .withClientBootstrap(clientBootstrap); 155 try (S3Client client = new S3Client(clientOptions)) { 156 157 } 158 } 159 } 160 161 /* Test that a client can be created successfully with retry options. */ 162 @Test testS3ClientCreateDestroyRetryOptions()163 public void testS3ClientCreateDestroyRetryOptions() { 164 skipIfAndroid(); 165 skipIfNetworkUnavailable(); 166 167 try (EventLoopGroup elg = new EventLoopGroup(0, 1); EventLoopGroup retry_elg = new EventLoopGroup(0, 1)) { 168 169 final StandardRetryOptions standardRetryOptions = new StandardRetryOptions().withInitialBucketCapacity(123) 170 .withBackoffRetryOptions(new ExponentialBackoffRetryOptions().withEventLoopGroup(retry_elg)); 171 172 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 173 .withStandardRetryOptions(standardRetryOptions), elg)) { 174 175 } 176 } 177 } 178 179 /* 180 * Test that a client can be created successfully with retry options that do not 181 * specify an ELG. 182 */ 183 @Test testS3ClientCreateDestroyRetryOptionsUnspecifiedELG()184 public void testS3ClientCreateDestroyRetryOptionsUnspecifiedELG() { 185 skipIfAndroid(); 186 skipIfNetworkUnavailable(); 187 188 try (EventLoopGroup elg = new EventLoopGroup(0, 1)) { 189 190 final StandardRetryOptions standardRetryOptions = new StandardRetryOptions().withInitialBucketCapacity(123) 191 .withBackoffRetryOptions(new ExponentialBackoffRetryOptions().withMaxRetries(30)); 192 193 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 194 .withStandardRetryOptions(standardRetryOptions), elg)) { 195 196 } 197 } 198 } 199 200 /* 201 * Test that a client can be created successfully with Tcp Keep Alive options. 202 */ 203 @Test testS3ClientCreateDestroyTcpKeepAliveOptions()204 public void testS3ClientCreateDestroyTcpKeepAliveOptions() { 205 skipIfAndroid(); 206 skipIfNetworkUnavailable(); 207 208 try (EventLoopGroup elg = new EventLoopGroup(0, 1); EventLoopGroup retry_elg = new EventLoopGroup(0, 1)) { 209 210 S3TcpKeepAliveOptions tcpKeepAliveOptions = new S3TcpKeepAliveOptions(); 211 tcpKeepAliveOptions.setKeepAliveIntervalSec((short) 10); 212 tcpKeepAliveOptions.setKeepAliveTimeoutSec((short) 20); 213 tcpKeepAliveOptions.setKeepAliveMaxFailedProbes((short) 30); 214 215 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 216 .withS3TcpKeepAliveOptions(tcpKeepAliveOptions), elg)) { 217 218 } 219 } 220 } 221 222 /* 223 * Test that a client can be created successfully with monitoring options. 224 */ 225 @Test testS3ClientCreateDestroyMonitoringOptions()226 public void testS3ClientCreateDestroyMonitoringOptions() { 227 skipIfAndroid(); 228 skipIfNetworkUnavailable(); 229 230 try (EventLoopGroup elg = new EventLoopGroup(0, 1); EventLoopGroup retry_elg = new EventLoopGroup(0, 1)) { 231 232 HttpMonitoringOptions monitoringOptions = new HttpMonitoringOptions(); 233 monitoringOptions.setMinThroughputBytesPerSecond(100); 234 monitoringOptions.setAllowableThroughputFailureIntervalSeconds(10); 235 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 236 .withHttpMonitoringOptions(monitoringOptions).withConnectTimeoutMs(10), elg)) { 237 238 } 239 } 240 } 241 242 /* 243 * Test that a client can be created successfully with proxy options. 244 */ 245 @Test testS3ClientCreateDestroyHttpProxyOptions()246 public void testS3ClientCreateDestroyHttpProxyOptions() { 247 skipIfAndroid(); 248 skipIfNetworkUnavailable(); 249 try (EventLoopGroup elg = new EventLoopGroup(0, 1); 250 EventLoopGroup retry_elg = new EventLoopGroup(0, 1); 251 TlsContextOptions tlsContextOptions = TlsContextOptions.createDefaultClient(); 252 TlsContext tlsContext = new TlsContext(tlsContextOptions);) { 253 HttpProxyOptions proxyOptions = new HttpProxyOptions(); 254 proxyOptions.setHost("localhost"); 255 proxyOptions.setConnectionType(HttpProxyOptions.HttpProxyConnectionType.Tunneling); 256 proxyOptions.setPort(80); 257 proxyOptions.setTlsContext(tlsContext); 258 proxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.Basic); 259 proxyOptions.setAuthorizationUsername("username"); 260 proxyOptions.setAuthorizationPassword("password"); 261 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 262 .withProxyOptions(proxyOptions), elg)) { 263 } 264 } 265 } 266 267 /* 268 * Test that a client can be created successfully with proxy environment 269 * variable setting. 270 */ 271 @Test testS3ClientCreateDestroyHttpProxyEnvironmentVariableSetting()272 public void testS3ClientCreateDestroyHttpProxyEnvironmentVariableSetting() { 273 skipIfAndroid(); 274 skipIfNetworkUnavailable(); 275 try (EventLoopGroup elg = new EventLoopGroup(0, 1); 276 EventLoopGroup retry_elg = new EventLoopGroup(0, 1); 277 TlsContextOptions tlsContextOptions = TlsContextOptions.createDefaultClient(); 278 TlsContext tlsContext = new TlsContext(tlsContextOptions); 279 TlsConnectionOptions tlsConnectionOptions = new TlsConnectionOptions(tlsContext);) { 280 HttpProxyEnvironmentVariableSetting environmentVariableSetting = new HttpProxyEnvironmentVariableSetting(); 281 environmentVariableSetting.setConnectionType(HttpProxyOptions.HttpProxyConnectionType.Tunneling); 282 environmentVariableSetting.setEnvironmentVariableType( 283 HttpProxyEnvironmentVariableSetting.HttpProxyEnvironmentVariableType.DISABLED); 284 environmentVariableSetting.setTlsConnectionOptions(tlsConnectionOptions); 285 try (S3Client client = createS3Client(new S3ClientOptions().withRegion(REGION) 286 .withProxyEnvironmentVariableSetting(environmentVariableSetting), elg)) { 287 } 288 } 289 } 290 291 @Test testS3Get()292 public void testS3Get() { 293 skipIfAndroid(); 294 skipIfNetworkUnavailable(); 295 Assume.assumeTrue(hasAwsCredentials()); 296 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 297 try (S3Client client = createS3Client(clientOptions)) { 298 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 299 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 300 301 @Override 302 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 303 byte[] bytes = new byte[bodyBytesIn.remaining()]; 304 bodyBytesIn.get(bytes); 305 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + Arrays.toString(bytes)); 306 return 0; 307 } 308 309 @Override 310 public void onFinished(S3FinishedResponseContext context) { 311 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 312 "Meta request finished with error code " + context.getErrorCode()); 313 if (context.getErrorCode() != 0) { 314 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 315 return; 316 } 317 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 318 } 319 }; 320 321 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 322 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 323 324 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 325 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 326 .withResponseHandler(responseHandler); 327 328 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 329 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 330 } 331 } catch (InterruptedException | ExecutionException ex) { 332 Assert.fail(ex.getMessage()); 333 } 334 } 335 336 @Test testS3GetErrorHeadersAreReported()337 public void testS3GetErrorHeadersAreReported() { 338 skipIfAndroid(); 339 skipIfNetworkUnavailable(); 340 Assume.assumeTrue(hasAwsCredentials()); 341 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 342 try (S3Client client = createS3Client(clientOptions)) { 343 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 344 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 345 346 @Override 347 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 348 byte[] bytes = new byte[bodyBytesIn.remaining()]; 349 bodyBytesIn.get(bytes); 350 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + Arrays.toString(bytes)); 351 return 0; 352 } 353 354 @Override 355 public void onFinished(S3FinishedResponseContext context) { 356 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 357 "Meta request finished with error code " + context.getErrorCode()); 358 try { 359 assertNotNull(context.getErrorHeaders()); 360 assertTrue(context.getErrorCode() > 0); 361 onFinishedFuture.complete(0); // Assertions passed 362 } catch (AssertionError e) { 363 onFinishedFuture.complete(-1); // Assertions failed 364 } 365 } 366 }; 367 368 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 369 HttpRequest httpRequest = new HttpRequest("GET", "/key_does_not_exist.txt", headers, null); 370 371 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 372 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 373 .withResponseHandler(responseHandler); 374 375 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 376 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 377 } 378 } catch (InterruptedException | ExecutionException ex) { 379 Assert.fail(ex.getMessage()); 380 } 381 } 382 383 @Test testS3GetAfterClientIsClose()384 public void testS3GetAfterClientIsClose() { 385 skipIfAndroid(); 386 skipIfNetworkUnavailable(); 387 Assume.assumeTrue(hasAwsCredentials()); 388 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 389 S3Client client = createS3Client(clientOptions); 390 client.close(); 391 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 392 393 @Override 394 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 395 return 0; 396 } 397 398 @Override 399 public void onFinished(S3FinishedResponseContext context) { 400 } 401 }; 402 403 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 404 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 405 406 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 407 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 408 .withResponseHandler(responseHandler); 409 410 assertThrows(IllegalStateException.class, () -> client.makeMetaRequest(metaRequestOptions)); 411 } 412 413 @Test testS3CallbackExceptionIsProperlyPropagated()414 public void testS3CallbackExceptionIsProperlyPropagated() { 415 skipIfAndroid(); 416 skipIfNetworkUnavailable(); 417 Assume.assumeTrue(hasAwsCredentials()); 418 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 419 RuntimeException expectedException = new RuntimeException("Exception From a Java Function"); 420 421 try (S3Client client = createS3Client(clientOptions)) { 422 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 423 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 424 425 @Override 426 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 427 throw expectedException; 428 } 429 430 @Override 431 public void onFinished(S3FinishedResponseContext context) { 432 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 433 "Meta request finished with error code " + context.getErrorCode()); 434 if (context.getErrorCode() != 0) { 435 onFinishedFuture.completeExceptionally(context.getCause()); 436 return; 437 } 438 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 439 } 440 }; 441 442 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 443 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 444 445 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 446 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 447 .withResponseHandler(responseHandler); 448 449 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 450 ExecutionException ex = assertThrows(ExecutionException.class, () -> onFinishedFuture.get()); 451 Assert.assertSame(expectedException, ex.getCause()); 452 } 453 } 454 } 455 456 @Test testS3GetWithEndpoint()457 public void testS3GetWithEndpoint() { 458 skipIfAndroid(); 459 skipIfNetworkUnavailable(); 460 Assume.assumeTrue(hasAwsCredentials()); 461 462 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 463 try (S3Client client = createS3Client(clientOptions)) { 464 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 465 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 466 467 @Override 468 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 469 byte[] bytes = new byte[bodyBytesIn.remaining()]; 470 bodyBytesIn.get(bytes); 471 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + Arrays.toString(bytes)); 472 return 0; 473 } 474 475 @Override 476 public void onFinished(S3FinishedResponseContext context) { 477 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 478 "Meta request finished with error code " + context.getErrorCode()); 479 if (context.getErrorCode() != 0) { 480 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 481 return; 482 } 483 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 484 } 485 }; 486 487 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT + ":443") }; 488 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 489 490 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 491 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 492 .withResponseHandler(responseHandler) 493 .withEndpoint(URI.create("https://" + ENDPOINT + ":443")); 494 495 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 496 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 497 } 498 } catch (Exception ex /* InterruptedException | ExecutionException ex */) { 499 Assert.fail(ex.getMessage()); 500 } 501 } 502 503 /** 504 * Test read-backpressure by repeatedly: 505 * - letting the download stall 506 * - incrementing the read window 507 * - repeat... 508 */ 509 @Test testS3GetWithBackpressure()510 public void testS3GetWithBackpressure() { 511 skipIfAndroid(); 512 skipIfNetworkUnavailable(); 513 Assume.assumeTrue(hasAwsCredentials()); 514 515 final String filePath = "/get_object_test_1MB.txt"; 516 final long fileSize = 1 * 1024 * 1024; 517 S3ClientOptions clientOptions = new S3ClientOptions() 518 519 .withRegion(REGION) 520 .withReadBackpressureEnabled(true) 521 .withInitialReadWindowSize(1024) 522 .withPartSize(fileSize / 4); 523 final long windowIncrementSize = clientOptions.getPartSize() / 2; 524 525 // how long to wait after download stalls, before incrementing read window again 526 final Duration ifNothingHappensAfterThisLongItStalled = Duration.ofSeconds(1); 527 int stallCount = 0; 528 Clock clock = Clock.systemUTC(); 529 Instant lastTimeSomethingHappened = clock.instant(); 530 AtomicLong downloadSizeDelta = new AtomicLong(0); 531 532 try (S3Client client = createS3Client(clientOptions)) { 533 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 534 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 535 536 @Override 537 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 538 int numBytes = bodyBytesIn.remaining(); 539 downloadSizeDelta.addAndGet(numBytes); 540 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response numBytes:" + numBytes); 541 return 0; 542 } 543 544 @Override 545 public void onFinished(S3FinishedResponseContext context) { 546 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 547 "Meta request finished with error code " + context.getErrorCode()); 548 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 549 } 550 }; 551 552 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 553 HttpRequest httpRequest = new HttpRequest("GET", filePath, headers, null); 554 555 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 556 .withMetaRequestType(MetaRequestType.GET_OBJECT) 557 .withHttpRequest(httpRequest) 558 .withResponseHandler(responseHandler); 559 560 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 561 562 while (!onFinishedFuture.isDone()) { 563 // Check if we've received data since last loop 564 long lastDownloadSizeDelta = downloadSizeDelta.getAndSet(0); 565 566 // Figure out how long it's been since we last received data 567 Instant currentTime = clock.instant(); 568 if (lastDownloadSizeDelta != 0) { 569 lastTimeSomethingHappened = clock.instant(); 570 } 571 572 Duration timeSinceSomethingHappened = Duration.between(lastTimeSomethingHappened, currentTime); 573 574 // If it seems like data has stopped flowing, then we know a stall happened due 575 // to backpressure. 576 if (timeSinceSomethingHappened.compareTo(ifNothingHappensAfterThisLongItStalled) >= 0) { 577 stallCount += 1; 578 lastTimeSomethingHappened = currentTime; 579 580 // Increment window so that download continues... 581 metaRequest.incrementReadWindow(windowIncrementSize); 582 } 583 584 // Sleep a moment and loop again... 585 Thread.sleep(100); 586 } 587 588 // Assert that download stalled due to backpressure at some point 589 Assert.assertTrue(stallCount > 0); 590 591 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 592 } 593 } catch (InterruptedException | ExecutionException ex) { 594 Assert.fail(ex.getMessage()); 595 } 596 } 597 598 // Test that we can increment the flow-control window by returning a number from 599 // the onResponseBody callback 600 @Test testS3GetWithBackpressureIncrementViaOnResponseBody()601 public void testS3GetWithBackpressureIncrementViaOnResponseBody() { 602 skipIfAndroid(); 603 skipIfNetworkUnavailable(); 604 Assume.assumeTrue(hasAwsCredentials()); 605 606 final String filePath = "/get_object_test_1MB.txt"; 607 final long fileSize = 1 * 1024 * 1024; 608 S3ClientOptions clientOptions = new S3ClientOptions() 609 610 .withRegion(REGION) 611 .withReadBackpressureEnabled(true) 612 .withInitialReadWindowSize(1024) 613 .withPartSize(fileSize / 4); 614 615 try (S3Client client = createS3Client(clientOptions)) { 616 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 617 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 618 619 @Override 620 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 621 int numBytes = bodyBytesIn.remaining(); 622 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response numBytes:" + numBytes); 623 return numBytes; 624 } 625 626 @Override 627 public void onFinished(S3FinishedResponseContext context) { 628 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 629 "Meta request finished with error code " + context.getErrorCode()); 630 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 631 } 632 }; 633 634 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 635 HttpRequest httpRequest = new HttpRequest("GET", filePath, headers, null); 636 637 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 638 .withMetaRequestType(MetaRequestType.GET_OBJECT) 639 .withHttpRequest(httpRequest) 640 .withResponseHandler(responseHandler); 641 642 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 643 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get(60, TimeUnit.SECONDS)); 644 } 645 } catch (InterruptedException | ExecutionException | TimeoutException ex) { 646 Assert.fail(ex.getMessage()); 647 } 648 } 649 650 @Test testS3OverrideRequestCredentials()651 public void testS3OverrideRequestCredentials() { 652 skipIfAndroid(); 653 skipIfNetworkUnavailable(); 654 Assume.assumeTrue(hasAwsCredentials()); 655 656 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 657 boolean expectedException = false; 658 byte[] madeUpCredentials = "I am a madeup credentials".getBytes(); 659 StaticCredentialsProvider.StaticCredentialsProviderBuilder builder = new StaticCredentialsProvider.StaticCredentialsProviderBuilder() 660 .withAccessKeyId(madeUpCredentials).withSecretAccessKey(madeUpCredentials); 661 try (S3Client client = createS3Client(clientOptions); 662 CredentialsProvider emptyCredentialsProvider = builder.build(); 663 AwsSigningConfig signingConfig = AwsSigningConfig.getDefaultS3SigningConfig(REGION, 664 emptyCredentialsProvider);) { 665 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 666 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 667 @Override 668 public void onFinished(S3FinishedResponseContext context) { 669 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 670 "Meta request finished with error code " + context.getErrorCode()); 671 if (context.getErrorCode() != 0) { 672 onFinishedFuture.completeExceptionally(new CrtRuntimeException(context.getErrorCode())); 673 return; 674 } 675 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 676 } 677 }; 678 679 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 680 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 681 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 682 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 683 .withResponseHandler(responseHandler).withSigningConfig(signingConfig); 684 685 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 686 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 687 } 688 } catch (InterruptedException | ExecutionException ex) { 689 expectedException = true; 690 /* 691 * Maybe better to have a cause of the max retries exceed to be more informative 692 */ 693 if (!(ex.getCause() instanceof CrtRuntimeException)) { 694 Assert.fail(ex.getMessage()); 695 } 696 } 697 Assert.assertTrue(expectedException); 698 } 699 700 @Test testS3GetWithSignConfigShouldSignHeader()701 public void testS3GetWithSignConfigShouldSignHeader() throws Exception { 702 skipIfAndroid(); 703 skipIfNetworkUnavailable(); 704 Assume.assumeTrue(hasAwsCredentials()); 705 706 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 707 Predicate<String> shouldSignHeader = name -> !name.equalsIgnoreCase("DoNotSignThis"); 708 try (S3Client client = createS3Client(clientOptions); 709 EventLoopGroup elg = new EventLoopGroup(0, 1); 710 HostResolver hostResolver = new HostResolver(elg); 711 ClientBootstrap clientBootstrap = new ClientBootstrap(elg, hostResolver); 712 DefaultChainCredentialsProvider credentialsProvider = new DefaultChainCredentialsProvider.DefaultChainCredentialsProviderBuilder() 713 .withClientBootstrap(clientBootstrap).build(); 714 AwsSigningConfig signingConfig = AwsSigningConfig.getDefaultS3SigningConfig(REGION, 715 credentialsProvider);) { 716 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 717 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 718 @Override 719 public void onFinished(S3FinishedResponseContext context) { 720 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 721 "Meta request finished with error code " + context.getErrorCode()); 722 if (context.getErrorCode() != 0) { 723 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 724 return; 725 } 726 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 727 } 728 }; 729 730 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 731 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 732 733 signingConfig.setShouldSignHeader(shouldSignHeader); 734 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 735 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 736 .withResponseHandler(responseHandler).withSigningConfig(signingConfig); 737 738 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 739 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 740 } 741 } 742 } 743 createTestPayload(int size)744 private byte[] createTestPayload(int size) { 745 String msg = "This is an S3 Java CRT Client Test"; 746 ByteBuffer payload = ByteBuffer.allocate(size); 747 while (true) { 748 try { 749 payload.put(msg.getBytes()); 750 } catch (BufferOverflowException ex1) { 751 while (true) { 752 try { 753 payload.put("#".getBytes()); 754 } catch (BufferOverflowException ex2) { 755 break; 756 } 757 } 758 break; 759 } 760 } 761 return payload.array(); 762 } 763 testS3PutHelper(boolean useFile, boolean unknownContentLength, String objectPath, boolean s3express, int contentLength)764 private void testS3PutHelper(boolean useFile, boolean unknownContentLength, String objectPath, boolean s3express, 765 int contentLength) throws IOException { 766 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION).withEnableS3Express(s3express); 767 Path uploadFilePath = Files.createTempFile("testS3PutFilePath", ".txt"); 768 try (S3Client client = createS3Client(clientOptions)) { 769 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 770 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 771 772 @Override 773 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 774 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 775 return 0; 776 } 777 778 @Override 779 public void onFinished(S3FinishedResponseContext context) { 780 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 781 "Meta request finished with error code " + context.getErrorCode()); 782 if (context.getErrorCode() != 0) { 783 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 784 return; 785 } 786 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 787 } 788 }; 789 790 HttpHeader[] headers = { 791 new HttpHeader("Host", s3express ? S3EXPRESS_ENDPOINT : ENDPOINT), 792 new HttpHeader("x-amz-sdk-checksum-algorithm", "SHA1") 793 }; 794 HttpRequest httpRequest; 795 String path = objectPath == null ? "/put_object_test_10MB.txt" : objectPath; 796 String encodedPath = Uri.encodeUriPath(path); 797 final ByteBuffer payload = ByteBuffer.wrap(createTestPayload(contentLength)); 798 HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { 799 @Override 800 public boolean sendRequestBody(ByteBuffer outBuffer) { 801 ByteBufferUtils.transferData(payload, outBuffer); 802 return payload.remaining() == 0; 803 } 804 805 @Override 806 public boolean resetPosition() { 807 return true; 808 } 809 810 @Override 811 public long getLength() { 812 return payload.capacity(); 813 } 814 }; 815 if (useFile) { 816 Files.write(uploadFilePath, createTestPayload(contentLength)); 817 httpRequest = new HttpRequest("PUT", encodedPath, headers, null); 818 } else { 819 httpRequest = new HttpRequest("PUT", encodedPath, headers, payloadStream); 820 } 821 822 if (!unknownContentLength) { 823 httpRequest.addHeader( 824 new HttpHeader("Content-Length", Integer.valueOf(contentLength).toString())); 825 } 826 AwsSigningConfig config = AwsSigningConfig.getDefaultS3SigningConfig(REGION, null); 827 ChecksumConfig checksumConfig = new ChecksumConfig().withChecksumAlgorithm(ChecksumAlgorithm.SHA1) 828 .withChecksumLocation(ChecksumLocation.TRAILER).withValidateChecksum(true); 829 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 830 .withMetaRequestType(MetaRequestType.PUT_OBJECT).withHttpRequest(httpRequest) 831 .withResponseHandler(responseHandler) 832 .withChecksumConfig(checksumConfig); 833 if (s3express) { 834 config.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4_S3EXPRESS); 835 metaRequestOptions = metaRequestOptions.withSigningConfig(config); 836 } 837 if (useFile) { 838 metaRequestOptions = metaRequestOptions.withRequestFilePath(uploadFilePath); 839 } 840 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 841 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 842 } 843 } catch (InterruptedException | ExecutionException ex) { 844 Assert.fail(ex.getMessage()); 845 } finally { 846 Files.deleteIfExists(uploadFilePath); 847 } 848 } 849 850 @Test testS3Put()851 public void testS3Put() throws IOException { 852 skipIfAndroid(); 853 skipIfNetworkUnavailable(); 854 Assume.assumeTrue(hasAwsCredentials()); 855 testS3PutHelper(false, false, null, false, 16 * 1024 * 1024); 856 } 857 858 // Test that we can upload by passing a filepath instead of an HTTP body stream 859 @Test testS3PutFilePath()860 public void testS3PutFilePath() throws IOException { 861 skipIfAndroid(); 862 skipIfNetworkUnavailable(); 863 Assume.assumeTrue(hasAwsCredentials()); 864 testS3PutHelper(true, false, null, false, 10 * 1024 * 1024); 865 } 866 867 // Test that we can upload without provide the content length 868 @Test testS3PutUnknownContentLength()869 public void testS3PutUnknownContentLength() throws IOException { 870 skipIfAndroid(); 871 skipIfNetworkUnavailable(); 872 Assume.assumeTrue(hasAwsCredentials()); 873 testS3PutHelper(false, true, null, false, 10 * 1024 * 1024); 874 } 875 876 // Test that we can upload to a path with special characters 877 @Test testS3PutSpecialCharPath()878 public void testS3PutSpecialCharPath() throws IOException { 879 skipIfAndroid(); 880 skipIfNetworkUnavailable(); 881 Assume.assumeTrue(hasAwsCredentials()); 882 testS3PutHelper(false, true, "/put_object_test_10MB@$%.txt", false, 10 * 1024 * 1024); 883 } 884 885 @Test testS3PutS3Express()886 public void testS3PutS3Express() throws IOException { 887 skipIfNetworkUnavailable(); 888 Assume.assumeTrue(hasAwsCredentials()); 889 testS3PutHelper(false, false, null, true, 16 * 1024 * 1024); 890 } 891 892 @Test testS3PutS3ExpressSpecialCharPath()893 public void testS3PutS3ExpressSpecialCharPath() throws IOException { 894 skipIfNetworkUnavailable(); 895 Assume.assumeTrue(hasAwsCredentials()); 896 testS3PutHelper(false, true, "/put_object_test_10MB@$%.txt", true, 10 * 1024 * 1024); 897 } 898 899 // Test that passing a nonexistent file path will cause an error 900 @Test testS3PutNonexistentFilePath()901 public void testS3PutNonexistentFilePath() throws IOException { 902 skipIfAndroid(); 903 skipIfNetworkUnavailable(); 904 Assume.assumeTrue(hasAwsCredentials()); 905 906 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 907 try (S3Client client = createS3Client(clientOptions)) { 908 // response handler does nothing, it just needs to exist for this test 909 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 910 }; 911 912 HttpHeader[] headers = { 913 new HttpHeader("Host", ENDPOINT), 914 new HttpHeader("Content-Length", String.valueOf(1024)), 915 }; 916 HttpRequest httpRequest = new HttpRequest("PUT", "/put_nonexistent_file", headers, null); 917 918 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 919 .withMetaRequestType(MetaRequestType.PUT_OBJECT) 920 .withHttpRequest(httpRequest) 921 .withRequestFilePath(Paths.get("obviously_nonexistent_file.derp")) 922 .withResponseHandler(responseHandler); 923 924 // makeMetaRequest() should fail 925 Throwable thrown = assertThrows(Throwable.class, 926 () -> client.makeMetaRequest(metaRequestOptions)); 927 928 // exception should indicate the file doesn't exist 929 String exceptionString = thrown.toString(); 930 Assert.assertTrue(exceptionString.contains("AWS_ERROR_FILE_INVALID_PATH")); 931 } 932 } 933 createTestPutPauseResumeHandler(CompletableFuture<Integer> onFinishedFuture, CompletableFuture<Void> onProgressFuture)934 private S3MetaRequestResponseHandler createTestPutPauseResumeHandler(CompletableFuture<Integer> onFinishedFuture, 935 CompletableFuture<Void> onProgressFuture) { 936 return new S3MetaRequestResponseHandler() { 937 @Override 938 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 939 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 940 return 0; 941 } 942 943 @Override 944 public void onFinished(S3FinishedResponseContext context) { 945 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 946 "Meta request finished with error code " + context.getErrorCode()); 947 if (context.getErrorCode() != 0) { 948 onFinishedFuture.completeExceptionally(new CrtRuntimeException(context.getErrorCode())); 949 return; 950 } 951 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 952 } 953 954 @Override 955 public void onProgress(final S3MetaRequestProgress progress) { 956 onProgressFuture.complete(null); 957 } 958 }; 959 } 960 961 @Test 962 public void testS3PutPauseResume() { 963 skipIfAndroid(); 964 skipIfNetworkUnavailable(); 965 Assume.assumeTrue(hasAwsCredentials()); 966 967 S3ClientOptions clientOptions = new S3ClientOptions() 968 969 .withRegion(REGION); 970 try (S3Client client = createS3Client(clientOptions)) { 971 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 972 CompletableFuture<Void> onProgressFuture = new CompletableFuture<>(); 973 S3MetaRequestResponseHandler responseHandler = createTestPutPauseResumeHandler(onFinishedFuture, 974 onProgressFuture); 975 976 final ByteBuffer payload = ByteBuffer.wrap(createTestPayload(128 * 1024 * 1024)); 977 HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { 978 @Override 979 public boolean sendRequestBody(ByteBuffer outBuffer) { 980 ByteBufferUtils.transferData(payload, outBuffer); 981 return payload.remaining() == 0; 982 } 983 984 @Override 985 public boolean resetPosition() { 986 return true; 987 } 988 989 @Override 990 public long getLength() { 991 return payload.capacity(); 992 } 993 }; 994 995 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT), 996 new HttpHeader("Content-Length", Integer.valueOf(payload.capacity()).toString()), }; 997 998 HttpRequest httpRequest = new HttpRequest("PUT", "/put_object_test_128MB.txt", headers, payloadStream); 999 1000 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1001 .withMetaRequestType(MetaRequestType.PUT_OBJECT) 1002 .withChecksumAlgorithm(ChecksumAlgorithm.CRC32) 1003 .withHttpRequest(httpRequest) 1004 .withResponseHandler(responseHandler); 1005 1006 ResumeToken resumeToken; 1007 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1008 onProgressFuture.get(); 1009 1010 resumeToken = metaRequest.pause(); 1011 Assert.assertNotNull(resumeToken); 1012 1013 Throwable thrown = assertThrows(Throwable.class, 1014 () -> onFinishedFuture.get()); 1015 1016 Assert.assertEquals("AWS_ERROR_S3_PAUSED", ((CrtRuntimeException) thrown.getCause()).errorName); 1017 } 1018 1019 final ByteBuffer payloadResume = ByteBuffer.wrap(createTestPayload(128 * 1024 * 1024)); 1020 HttpRequestBodyStream payloadStreamResume = new HttpRequestBodyStream() { 1021 @Override 1022 public boolean sendRequestBody(ByteBuffer outBuffer) { 1023 ByteBufferUtils.transferData(payloadResume, outBuffer); 1024 return payloadResume.remaining() == 0; 1025 } 1026 1027 @Override 1028 public boolean resetPosition() { 1029 return true; 1030 } 1031 1032 @Override 1033 public long getLength() { 1034 return payloadResume.capacity(); 1035 } 1036 }; 1037 1038 HttpHeader[] headersResume = { new HttpHeader("Host", ENDPOINT), 1039 new HttpHeader("Content-Length", Integer.valueOf(payloadResume.capacity()).toString()), }; 1040 1041 HttpRequest httpRequestResume = new HttpRequest("PUT", 1042 "/put_object_test_128MB.txt", headersResume, payloadStreamResume); 1043 1044 CompletableFuture<Integer> onFinishedFutureResume = new CompletableFuture<>(); 1045 CompletableFuture<Void> onProgressFutureResume = new CompletableFuture<>(); 1046 S3MetaRequestResponseHandler responseHandlerResume = createTestPutPauseResumeHandler(onFinishedFutureResume, 1047 onProgressFutureResume); 1048 S3MetaRequestOptions metaRequestOptionsResume = new S3MetaRequestOptions() 1049 .withMetaRequestType(MetaRequestType.PUT_OBJECT) 1050 .withHttpRequest(httpRequestResume) 1051 .withResponseHandler(responseHandlerResume) 1052 .withChecksumAlgorithm(ChecksumAlgorithm.CRC32) 1053 .withResumeToken(new ResumeToken.PutResumeTokenBuilder() 1054 .withPartSize(resumeToken.getPartSize()) 1055 .withTotalNumParts(resumeToken.getTotalNumParts()) 1056 .withNumPartsCompleted(resumeToken.getNumPartsCompleted()) 1057 .withUploadId(resumeToken.getUploadId()) 1058 .build()); 1059 1060 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptionsResume)) { 1061 Integer finish = onFinishedFutureResume.get(); 1062 Assert.assertEquals(Integer.valueOf(0), finish); 1063 } 1064 } catch (InterruptedException | ExecutionException ex) { 1065 Assert.fail(ex.getMessage()); 1066 } 1067 } 1068 1069 @Test 1070 public void testS3PutTrailerChecksums() { 1071 skipIfAndroid(); 1072 skipIfNetworkUnavailable(); 1073 Assume.assumeTrue(hasAwsCredentials()); 1074 1075 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 1076 try (S3Client client = createS3Client(clientOptions)) { 1077 CompletableFuture<Integer> onPutFinishedFuture = new CompletableFuture<>(); 1078 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1079 1080 @Override 1081 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1082 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 1083 return 0; 1084 } 1085 1086 @Override 1087 public void onFinished(S3FinishedResponseContext context) { 1088 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1089 "Meta request finished with error code " + context.getErrorCode()); 1090 if (context.getErrorCode() != 0) { 1091 onPutFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1092 return; 1093 } 1094 onPutFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1095 } 1096 }; 1097 1098 final ByteBuffer payload = ByteBuffer.wrap(createTestPayload(1024 * 1024)); 1099 1100 HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { 1101 @Override 1102 public boolean sendRequestBody(ByteBuffer outBuffer) { 1103 ByteBufferUtils.transferData(payload, outBuffer); 1104 return payload.remaining() == 0; 1105 } 1106 1107 @Override 1108 public boolean resetPosition() { 1109 return true; 1110 } 1111 1112 @Override 1113 public long getLength() { 1114 return payload.capacity(); 1115 } 1116 }; 1117 1118 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT), 1119 new HttpHeader("Content-Length", Integer.valueOf(payload.capacity()).toString()), }; 1120 1121 HttpRequest httpRequest = new HttpRequest("PUT", "/java_round_trip_test_fc.txt", headers, payloadStream); 1122 ChecksumConfig config = new ChecksumConfig().withChecksumAlgorithm(ChecksumAlgorithm.CRC32) 1123 .withChecksumLocation(ChecksumLocation.TRAILER); 1124 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1125 .withMetaRequestType(MetaRequestType.PUT_OBJECT).withHttpRequest(httpRequest) 1126 .withResponseHandler(responseHandler) 1127 .withChecksumConfig(config); 1128 1129 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1130 Assert.assertEquals(Integer.valueOf(0), onPutFinishedFuture.get()); 1131 } 1132 1133 // Get request! 1134 1135 HttpHeader[] getHeaders = { new HttpHeader("Host", ENDPOINT), }; 1136 1137 HttpRequest httpGetRequest = new HttpRequest("GET", "/java_round_trip_test_fc.txt", getHeaders, null); 1138 1139 CompletableFuture<Integer> onGetFinishedFuture = new CompletableFuture<>(); 1140 S3MetaRequestResponseHandler getResponseHandler = new S3MetaRequestResponseHandler() { 1141 1142 @Override 1143 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1144 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 1145 return 0; 1146 } 1147 1148 @Override 1149 public void onFinished(S3FinishedResponseContext context) { 1150 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1151 "Meta request finished with error code " + context.getErrorCode()); 1152 if (context.getErrorCode() != 0) { 1153 onGetFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1154 return; 1155 } 1156 if (!context.isChecksumValidated()) { 1157 onGetFinishedFuture.completeExceptionally( 1158 new RuntimeException("Checksum was not validated")); 1159 return; 1160 } 1161 if (context.getChecksumAlgorithm() != ChecksumAlgorithm.CRC32) { 1162 onGetFinishedFuture.completeExceptionally( 1163 new RuntimeException("Checksum was not validated via CRC32")); 1164 return; 1165 } 1166 onGetFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1167 } 1168 }; 1169 ArrayList<ChecksumAlgorithm> algorList = new ArrayList<ChecksumAlgorithm>(); 1170 algorList.add(ChecksumAlgorithm.CRC32); 1171 ChecksumConfig validateChecksumConfig = new ChecksumConfig().withValidateChecksum(true) 1172 .withValidateChecksumAlgorithmList(algorList); 1173 S3MetaRequestOptions getRequestOptions = new S3MetaRequestOptions() 1174 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpGetRequest) 1175 .withResponseHandler(getResponseHandler) 1176 .withChecksumConfig(validateChecksumConfig); 1177 1178 try (S3MetaRequest metaRequest = client.makeMetaRequest(getRequestOptions)) { 1179 Assert.assertEquals(Integer.valueOf(0), onGetFinishedFuture.get()); 1180 } 1181 } catch (InterruptedException | ExecutionException ex) { 1182 Assert.fail(ex.getMessage()); 1183 } 1184 } 1185 1186 @Test 1187 public void testS3GetChecksums() { 1188 skipIfAndroid(); 1189 skipIfNetworkUnavailable(); 1190 Assume.assumeTrue(hasAwsCredentials()); 1191 1192 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 1193 try (S3Client client = createS3Client(clientOptions)) { 1194 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 1195 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1196 1197 @Override 1198 public void onFinished(S3FinishedResponseContext context) { 1199 if (context.getErrorCode() != 0) { 1200 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1201 return; 1202 } 1203 if (!context.isChecksumValidated()) { 1204 onFinishedFuture.completeExceptionally( 1205 new RuntimeException("Checksum was not validated")); 1206 return; 1207 } 1208 if (context.getChecksumAlgorithm() != ChecksumAlgorithm.CRC32) { 1209 onFinishedFuture.completeExceptionally( 1210 new RuntimeException("Checksum was not validated via CRC32")); 1211 return; 1212 } 1213 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1214 } 1215 1216 @Override 1217 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1218 byte[] bytes = new byte[bodyBytesIn.remaining()]; 1219 bodyBytesIn.get(bytes); 1220 return 0; 1221 } 1222 }; 1223 1224 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; 1225 HttpRequest httpRequest = new HttpRequest("GET", "/java_get_test_fc.txt", headers, null); 1226 ArrayList<ChecksumAlgorithm> algorList = new ArrayList<ChecksumAlgorithm>(); 1227 algorList.add(ChecksumAlgorithm.CRC32); 1228 algorList.add(ChecksumAlgorithm.CRC32C); 1229 algorList.add(ChecksumAlgorithm.SHA1); 1230 algorList.add(ChecksumAlgorithm.SHA256); 1231 ChecksumConfig validateChecksumConfig = new ChecksumConfig().withValidateChecksum(true) 1232 .withValidateChecksumAlgorithmList(algorList); 1233 1234 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1235 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 1236 .withResponseHandler(responseHandler) 1237 .withChecksumConfig(validateChecksumConfig); 1238 1239 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1240 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 1241 } 1242 } catch (InterruptedException | ExecutionException ex) { 1243 Assert.fail(ex.getMessage()); 1244 } 1245 } 1246 1247 @Test 1248 public void testS3GetS3ExpressOverride() throws Exception { 1249 skipIfNetworkUnavailable(); 1250 Assume.assumeTrue(hasAwsCredentials()); 1251 CompletableFuture<Credentials> orig_creds_future = new CompletableFuture<Credentials>(); 1252 Credentials fake_creds = new Credentials("my_access".getBytes(), 1253 "dont_tell_anyone".getBytes(), "token".getBytes()); 1254 1255 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION).withEnableS3Express(true) 1256 .withS3ExpressCredentialsProviderFactory( 1257 new S3ExpressCredentialsProviderFactory() { 1258 public S3ExpressCredentialsProvider createS3ExpressCredentialsProvider(S3Client client) { 1259 S3ExpressCredentialsProviderHandler handler = new S3ExpressCredentialsProviderHandler() { 1260 public CompletableFuture<Credentials> getS3ExpressCredentials( 1261 S3ExpressCredentialsProperties properties, 1262 Credentials origCredentials) { 1263 CompletableFuture<Credentials> future = new CompletableFuture<Credentials>(); 1264 orig_creds_future.complete(origCredentials); 1265 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1266 "Get creds for : " + properties.getHostValue()); 1267 Credentials creds = new Credentials("access_key".getBytes(), 1268 "secret_access_key".getBytes(), "session_token".getBytes()); 1269 future.complete(creds); 1270 return future; 1271 } 1272 1273 public CompletableFuture<Void> destroyProvider() { 1274 CompletableFuture<Void> future = new CompletableFuture<Void>(); 1275 future.complete(null); 1276 return future; 1277 } 1278 }; 1279 S3ExpressCredentialsProvider provider = new S3ExpressCredentialsProvider(handler); 1280 return provider; 1281 } 1282 }); 1283 try (S3Client client = createS3Client(clientOptions)) { 1284 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 1285 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1286 1287 @Override 1288 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1289 byte[] bytes = new byte[bodyBytesIn.remaining()]; 1290 bodyBytesIn.get(bytes); 1291 return 0; 1292 } 1293 1294 @Override 1295 public void onFinished(S3FinishedResponseContext context) { 1296 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1297 "Meta request finished with error code " + context.getErrorCode()); 1298 if (context.getErrorCode() != 0) { 1299 onFinishedFuture.completeExceptionally(new CrtRuntimeException(context.getErrorCode())); 1300 return; 1301 } 1302 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1303 } 1304 }; 1305 1306 HttpHeader[] headers = { new HttpHeader("Host", S3EXPRESS_ENDPOINT) }; 1307 HttpRequest httpRequest = new HttpRequest("GET", "/get_object_test_1MB.txt", headers, null); 1308 1309 AwsSigningConfig config = new AwsSigningConfig(); 1310 config.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4_S3EXPRESS); 1311 config.setCredentials(fake_creds); 1312 1313 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1314 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 1315 .withResponseHandler(responseHandler).withSigningConfig(config); 1316 1317 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1318 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 1319 } 1320 } catch (InterruptedException | ExecutionException ex) { 1321 if (!(ex.getCause() instanceof CrtRuntimeException)) { 1322 Assert.fail(ex.getMessage()); 1323 } else { 1324 CrtRuntimeException cause = (CrtRuntimeException) ex.getCause(); 1325 Assert.assertTrue(cause.errorName.equals("AWS_ERROR_S3_INVALID_RESPONSE_STATUS")); 1326 } 1327 } finally { 1328 /* 1329 * Check the request level override of the credentials was passed along to the 1330 * s3express provider override 1331 */ 1332 Credentials resolved_creds = orig_creds_future.get(); 1333 assertTrue(Arrays.equals(resolved_creds.getAccessKeyId(), fake_creds.getAccessKeyId())); 1334 assertTrue(Arrays.equals(resolved_creds.getSecretAccessKey(), fake_creds.getSecretAccessKey())); 1335 assertTrue(Arrays.equals(resolved_creds.getSessionToken(), fake_creds.getSessionToken())); 1336 } 1337 } 1338 1339 private void putS3ExpressHelper(String region, S3Client client) throws Exception{ 1340 1341 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 1342 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1343 1344 @Override 1345 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1346 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 1347 return 0; 1348 } 1349 1350 @Override 1351 public void onFinished(S3FinishedResponseContext context) { 1352 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1353 "Meta request finished with error code " + context.getErrorCode()); 1354 if (context.getErrorCode() != 0) { 1355 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1356 return; 1357 } 1358 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1359 } 1360 }; 1361 1362 HttpHeader[] headers = { 1363 new HttpHeader("Host", region.equals("us-east-1")? S3EXPRESS_ENDPOINT_EAST1 : S3EXPRESS_ENDPOINT), 1364 }; 1365 HttpRequest httpRequest; 1366 String path = "/put_object_test_10MB.txt"; 1367 String encodedPath = Uri.encodeUriPath(path); 1368 final ByteBuffer payload = ByteBuffer.wrap(createTestPayload(10 * 1024 * 1024)); 1369 HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { 1370 @Override 1371 public boolean sendRequestBody(ByteBuffer outBuffer) { 1372 ByteBufferUtils.transferData(payload, outBuffer); 1373 return payload.remaining() == 0; 1374 } 1375 1376 @Override 1377 public boolean resetPosition() { 1378 return true; 1379 } 1380 1381 @Override 1382 public long getLength() { 1383 return payload.capacity(); 1384 } 1385 }; 1386 httpRequest = new HttpRequest("PUT", encodedPath, headers, payloadStream); 1387 1388 AwsSigningConfig config = new AwsSigningConfig(); 1389 config.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4_S3EXPRESS); 1390 config.setRegion(region); 1391 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1392 .withMetaRequestType(MetaRequestType.PUT_OBJECT).withHttpRequest(httpRequest) 1393 .withResponseHandler(responseHandler).withSigningConfig(config); 1394 1395 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1396 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 1397 } 1398 } 1399 1400 @Test 1401 public void testS3PutS3ExpressOverrideSamples() throws Exception { 1402 skipIfNetworkUnavailable(); 1403 Assume.assumeTrue(hasAwsCredentials()); 1404 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION).withEnableS3Express(true) 1405 .withS3ExpressCredentialsProviderFactory( 1406 new S3ExpressCredentialsProviderFactory() { 1407 public S3ExpressCredentialsProvider createS3ExpressCredentialsProvider(S3Client client) { 1408 S3ExpressCredentialsProviderHandler handler = new S3ExpressCredentialsProviderHandlerSample( 1409 client); 1410 S3ExpressCredentialsProvider provider = new S3ExpressCredentialsProvider(handler); 1411 return provider; 1412 } 1413 }); 1414 1415 try (S3Client client = createS3Client(clientOptions)) { 1416 putS3ExpressHelper("us-west-2", client); 1417 putS3ExpressHelper("us-east-1", client); 1418 } catch (InterruptedException | ExecutionException ex) { 1419 Assert.fail(ex.getMessage()); 1420 } 1421 } 1422 1423 @Test 1424 public void testS3PutS3ExpressMultiRegionDefault() throws Exception { 1425 skipIfNetworkUnavailable(); 1426 Assume.assumeTrue(hasAwsCredentials()); 1427 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION).withEnableS3Express(true); 1428 1429 try (S3Client client = createS3Client(clientOptions)) { 1430 putS3ExpressHelper("us-west-2", client); 1431 putS3ExpressHelper("us-east-1", client); 1432 } catch (InterruptedException | ExecutionException ex) { 1433 Assert.fail(ex.getMessage()); 1434 } 1435 } 1436 1437 // TODO: copy is disabled currently because it does not work correctly on c 1438 // side. reenable once its fixed in crt. 1439 // @Test 1440 public void testS3Copy() { 1441 skipIfAndroid(); 1442 skipIfNetworkUnavailable(); 1443 Assume.assumeTrue(hasAwsCredentials()); 1444 1445 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(REGION); 1446 try (S3Client client = createS3Client(clientOptions)) { 1447 CompletableFuture<Integer> onFinishedFuture = new CompletableFuture<>(); 1448 final AtomicLong totalBytesTransferred = new AtomicLong(); 1449 final AtomicLong contentLength = new AtomicLong(); 1450 final AtomicInteger progressInvocationCount = new AtomicInteger(); 1451 1452 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1453 1454 @Override 1455 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1456 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, "Body Response: " + bodyBytesIn.toString()); 1457 return 0; 1458 } 1459 1460 @Override 1461 public void onFinished(S3FinishedResponseContext context) { 1462 Log.log(Log.LogLevel.Info, Log.LogSubject.JavaCrtS3, 1463 "Meta request finished with error code " + context.getErrorCode()); 1464 if (context.getErrorCode() != 0) { 1465 System.out.println("Test failed with error payload: " 1466 + new String(context.getErrorPayload(), StandardCharsets.UTF_8)); 1467 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1468 return; 1469 } 1470 onFinishedFuture.complete(Integer.valueOf(context.getErrorCode())); 1471 } 1472 1473 @Override 1474 public void onProgress(S3MetaRequestProgress progress) { 1475 progressInvocationCount.incrementAndGet(); 1476 contentLength.set(progress.getContentLength()); 1477 totalBytesTransferred.addAndGet(progress.getBytesTransferred()); 1478 } 1479 }; 1480 1481 // x-amz-copy-source-header is composed of {source_bucket}/{source_key} 1482 final String copySource = COPY_SOURCE_BUCKET + "/" + COPY_SOURCE_KEY; 1483 1484 HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT), 1485 new HttpHeader(X_AMZ_COPY_SOURCE_HEADER, copySource) }; 1486 1487 HttpRequest httpRequest = new HttpRequest("PUT", "/copy_object_test_5GB.txt", headers, null); 1488 1489 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1490 .withMetaRequestType(MetaRequestType.COPY_OBJECT).withHttpRequest(httpRequest) 1491 .withResponseHandler(responseHandler); 1492 1493 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1494 Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); 1495 Assert.assertTrue(progressInvocationCount.get() > 0); 1496 Assert.assertTrue(contentLength.get() > 0); 1497 Assert.assertEquals(contentLength.get(), totalBytesTransferred.get()); 1498 } 1499 } catch (InterruptedException | ExecutionException ex) { 1500 Assert.fail(ex.getMessage()); 1501 } 1502 } 1503 1504 static class TransferStats { 1505 static final double GBPS = 1000 * 1000 * 1000; 1506 1507 AtomicLong bytesRead = new AtomicLong(0); 1508 AtomicLong bytesSampled = new AtomicLong(0); 1509 AtomicLong bytesPeak = new AtomicLong(0); 1510 ConcurrentLinkedQueue<Long> bytesPerSecond = new ConcurrentLinkedQueue(); 1511 Instant startTime = Instant.now(); 1512 Duration sampleDelay = Duration.ZERO; 1513 AtomicReference<Instant> lastSampleTime = new AtomicReference<Instant>(Instant.now()); 1514 AtomicInteger msToFirstByte = new AtomicInteger(0); 1515 1516 public TransferStats withSampleDelay(Duration delay) { 1517 sampleDelay = delay; 1518 return this; 1519 } 1520 1521 public void recordRead(long size) { 1522 Instant now = Instant.now(); 1523 recordRead(size, now); 1524 if (this != global) { 1525 global.recordRead(size, now); 1526 } 1527 } 1528 1529 private void recordRead(long size, Instant now) { 1530 bytesRead.addAndGet(size); 1531 int latency = (int) ChronoUnit.MILLIS.between(startTime, now); 1532 if (msToFirstByte.compareAndSet(0, latency)) { 1533 if (this != global) { 1534 global.recordLatency(latency); 1535 } 1536 } 1537 if (now.minusSeconds(1).isAfter(lastSampleTime.get())) { 1538 long bytesThisSecond = bytesRead.get() - bytesSampled.get(); 1539 bytesSampled.getAndAdd(bytesThisSecond); 1540 bytesPeak.getAndUpdate((peak) -> { 1541 return (bytesThisSecond > peak) ? bytesThisSecond : peak; 1542 }); 1543 1544 bytesPerSecond.add(bytesThisSecond); 1545 1546 lastSampleTime.set(now); 1547 } 1548 } 1549 1550 private void recordLatency(long latencyMs) { 1551 msToFirstByte.getAndUpdate((ms) -> { 1552 return (int) Math.ceil((ms + latencyMs) * 0.5); 1553 }); 1554 } 1555 1556 public DoubleStream allSamples() { 1557 return Arrays.stream(bytesPerSecond.toArray(new Long[1])).mapToDouble(a -> a.doubleValue() * 8 / GBPS); 1558 } 1559 1560 public DoubleStream samples() { 1561 return samples(bytesPerSecond.size()); 1562 } 1563 1564 public DoubleStream samples(int limit) { 1565 return allSamples().skip(sampleDelay.getSeconds()).limit(limit); 1566 } 1567 1568 public double avgGbps() { 1569 double avg = samples().average().getAsDouble(); 1570 return avg; 1571 } 1572 1573 public double p90Gbps() { 1574 double[] sorted = samples().sorted().toArray(); 1575 int idx = (int) Math.ceil(sorted.length * 0.1); 1576 return ((sorted[idx] + sorted[idx + 1]) / 2); 1577 } 1578 1579 public double stddev() { 1580 double sumOfSquares = samples().map((a) -> a * a).sum(); 1581 double avg = samples().average().getAsDouble(); 1582 long count = samples().count(); 1583 return (count > 0) ? Math.sqrt((sumOfSquares / count) - (avg * avg)) : 0; 1584 } 1585 1586 public double peakGbps() { 1587 return (bytesPeak.get() * 8) / GBPS; 1588 } 1589 1590 public int latency() { 1591 return msToFirstByte.get(); 1592 } 1593 1594 public static TransferStats global = new TransferStats(); 1595 } 1596 1597 @Test 1598 public void benchmarkS3Get() { 1599 skipIfAndroid(); 1600 skipIfNetworkUnavailable(); 1601 Assume.assumeTrue(hasAwsCredentials()); 1602 Assume.assumeNotNull(System.getProperty("aws.crt.s3.benchmark")); 1603 1604 // Log.initLoggingToStdout(LogLevel.Trace); 1605 1606 // Override defaults with values from system properties, via -D on mvn 1607 // commandline 1608 final int threadCount = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.threads", "0")); 1609 final String region = System.getProperty("aws.crt.s3.benchmark.region", "us-west-2"); 1610 final String bucket = System.getProperty("aws.crt.s3.benchmark.bucket", 1611 (region == "us-west-2") ? "aws-crt-canary-bucket" : String.format("aws-crt-canary-bucket-%s", region)); 1612 final String endpoint = System.getProperty("aws.crt.s3.benchmark.endpoint", 1613 String.format("%s.s3.%s.amazonaws.com", bucket, region)); 1614 final String objectName = System.getProperty("aws.crt.s3.benchmark.object", 1615 "crt-canary-obj-single-part-9223372036854775807"); 1616 final boolean useTls = Boolean.parseBoolean(System.getProperty("aws.crt.s3.benchmark.tls", "false")); 1617 final double expectedGbps = Double.parseDouble(System.getProperty("aws.crt.s3.benchmark.gbps", "10")); 1618 final int numTransfers = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.transfers", "16")); 1619 final int concurrentTransfers = Integer.parseInt( 1620 System.getProperty("aws.crt.s3.benchmark.concurrent", "16")); /* should be 1.6 * expectedGbps */ 1621 // avg of .3Gbps per connection, 32 connections per vip, 5 seconds per vip 1622 // resolution 1623 final int vipsNeeded = (int) Math.ceil(expectedGbps / 0.5 / 10); 1624 final int sampleDelay = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.warmup", 1625 Integer.toString((int) Math.ceil(vipsNeeded / 5)))); 1626 System.out.println(String.format("REGION=%s, WARMUP=%s", region, sampleDelay)); 1627 1628 // Ignore stats during warm up time, they skew results 1629 TransferStats.global.withSampleDelay(Duration.ofSeconds(sampleDelay)); 1630 try (TlsContext tlsCtx = createTlsContextOptions(getContext().trustStore)) { 1631 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(region) 1632 .withThroughputTargetGbps(expectedGbps).withTlsContext(useTls ? tlsCtx : null); 1633 1634 try (S3Client client = createS3Client(clientOptions, threadCount)) { 1635 HttpHeader[] headers = { new HttpHeader("Host", endpoint) }; 1636 HttpRequest httpRequest = new HttpRequest("GET", String.format("/%s", objectName), headers, null); 1637 1638 List<CompletableFuture<TransferStats>> requestFutures = new LinkedList<>(); 1639 1640 // Each meta request will acquire a slot, and release it when it's done 1641 Semaphore concurrentSlots = new Semaphore(concurrentTransfers); 1642 1643 for (int transferIdx = 0; transferIdx < numTransfers; ++transferIdx) { 1644 try { 1645 concurrentSlots.acquire(); 1646 } catch (InterruptedException ex) { 1647 Assert.fail(ex.toString()); 1648 } 1649 1650 final int myIdx = transferIdx; 1651 CompletableFuture<TransferStats> onFinishedFuture = new CompletableFuture<>(); 1652 requestFutures.add(onFinishedFuture); 1653 1654 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1655 1656 TransferStats stats = new TransferStats(); 1657 1658 @Override 1659 public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) { 1660 stats.recordRead(bodyBytesIn.remaining()); 1661 return 0; 1662 } 1663 1664 @Override 1665 public void onFinished(S3FinishedResponseContext context) { 1666 // release the slot first 1667 concurrentSlots.release(); 1668 1669 if (context.getErrorCode() != 0) { 1670 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1671 return; 1672 } 1673 1674 synchronized (System.out) { 1675 System.out.println( 1676 String.format("Transfer %d: Avg: %.3f Gbps Peak: %.3f Gbps First Byte: %dms", 1677 myIdx + 1, stats.avgGbps(), stats.peakGbps(), stats.latency())); 1678 } 1679 1680 onFinishedFuture.complete(stats); 1681 } 1682 }; 1683 1684 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1685 .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) 1686 .withResponseHandler(responseHandler); 1687 1688 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1689 1690 } 1691 } 1692 1693 // Finish each future, and deduct failures from completedTransfers 1694 int completedTransfers = numTransfers; 1695 double totalGbps = 0; 1696 for (CompletableFuture<TransferStats> request : requestFutures) { 1697 try { 1698 request.join(); 1699 totalGbps += request.get().avgGbps(); 1700 } catch (CompletionException | InterruptedException | ExecutionException ex) { 1701 System.out.println(ex.toString()); 1702 Throwable cause = ex.getCause(); 1703 if (cause != ex && cause != null) { 1704 System.out.println(cause.toString()); 1705 } 1706 cause = cause.getCause(); 1707 if (cause != null && cause != ex) { 1708 System.out.println(cause.toString()); 1709 } 1710 --completedTransfers; 1711 } 1712 } 1713 1714 // Dump overall stats 1715 TransferStats overall = TransferStats.global; 1716 System.out.println(String.format("%d/%d successful transfers", completedTransfers, numTransfers)); 1717 System.out.println(String.format("Avg: %.3f Gbps", overall.avgGbps())); 1718 System.out.println(String.format("Peak: %.3f Gbps", overall.peakGbps())); 1719 System.out.println(String.format("P90: %.3f Gbps (stddev: %.3f)", overall.p90Gbps(), overall.stddev())); 1720 System.out.println(String.format("Avg Latency: %dms", overall.latency())); 1721 System.out.flush(); 1722 1723 try { 1724 File csvFile = new File("samples.csv"); 1725 try (PrintWriter writer = new PrintWriter(csvFile)) { 1726 writer.println("seconds,gbps"); 1727 AtomicInteger idx = new AtomicInteger(0); 1728 overall.allSamples().mapToObj((gbps) -> { 1729 return String.format("%d,%.3f", idx.getAndIncrement(), gbps); 1730 }).forEach(writer::println); 1731 } 1732 } catch (FileNotFoundException ex) { 1733 System.err.println(ex.getMessage()); 1734 } 1735 } 1736 } 1737 } 1738 1739 @Test 1740 public void benchmarkS3Put() { 1741 skipIfAndroid(); 1742 skipIfNetworkUnavailable(); 1743 Assume.assumeTrue(hasAwsCredentials()); 1744 Assume.assumeNotNull(System.getProperty("aws.crt.s3.benchmark")); 1745 1746 // Override defaults with values from system properties, via -D on mvn 1747 // commandline 1748 final int threadCount = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.threads", "0")); 1749 final String region = System.getProperty("aws.crt.s3.benchmark.region", "us-west-2"); 1750 final String bucket = System.getProperty("aws.crt.s3.benchmark.bucket", 1751 (region == "us-west-2") ? "aws-crt-canary-bucket" : String.format("aws-crt-canary-bucket-%s", region)); 1752 final String endpoint = System.getProperty("aws.crt.s3.benchmark.endpoint", 1753 String.format("%s.s3.%s.amazonaws.com", bucket, region)); 1754 final boolean useTls = Boolean.parseBoolean(System.getProperty("aws.crt.s3.benchmark.tls", "false")); 1755 final double expectedGbps = Double.parseDouble(System.getProperty("aws.crt.s3.benchmark.gbps", "10")); 1756 final int numTransfers = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.transfers", "16")); 1757 final int concurrentTransfers = Integer.parseInt( 1758 System.getProperty("aws.crt.s3.benchmark.concurrent", "16")); /* should be 1.6 * expectedGbps */ 1759 // avg of .3Gbps per connection, 32 connections per vip, 5 seconds per vip 1760 // resolution 1761 final int vipsNeeded = (int) Math.ceil(expectedGbps / 0.5 / 10); 1762 final int sampleDelay = Integer.parseInt(System.getProperty("aws.crt.s3.benchmark.warmup", 1763 Integer.toString((int) Math.ceil(vipsNeeded / 5)))); 1764 System.out.println(String.format("REGION=%s, WARMUP=%s", region, sampleDelay)); 1765 1766 // Ignore stats during warm up time, they skew results 1767 TransferStats.global.withSampleDelay(Duration.ofSeconds(sampleDelay)); 1768 1769 try (TlsContext tlsCtx = createTlsContextOptions(getContext().trustStore)) { 1770 S3ClientOptions clientOptions = new S3ClientOptions().withRegion(region) 1771 .withThroughputTargetGbps(expectedGbps).withTlsContext(useTls ? tlsCtx : null); 1772 1773 try (S3Client client = createS3Client(clientOptions, threadCount)) { 1774 List<CompletableFuture<TransferStats>> requestFutures = new LinkedList<>(); 1775 1776 // Each meta request will acquire a slot, and release it when it's done 1777 Semaphore concurrentSlots = new Semaphore(concurrentTransfers); 1778 1779 for (int transferIdx = 0; transferIdx < numTransfers; ++transferIdx) { 1780 try { 1781 concurrentSlots.acquire(); 1782 } catch (InterruptedException ex) { 1783 Assert.fail(ex.toString()); 1784 } 1785 1786 final int myIdx = transferIdx; 1787 CompletableFuture<TransferStats> onFinishedFuture = new CompletableFuture<>(); 1788 requestFutures.add(onFinishedFuture); 1789 1790 S3MetaRequestResponseHandler responseHandler = new S3MetaRequestResponseHandler() { 1791 TransferStats stats = new TransferStats(); 1792 1793 @Override 1794 public void onFinished(S3FinishedResponseContext context) { 1795 // release the slot first 1796 concurrentSlots.release(); 1797 1798 if (context.getErrorCode() != 0) { 1799 onFinishedFuture.completeExceptionally(makeExceptionFromFinishedResponseContext(context)); 1800 return; 1801 } 1802 1803 synchronized (System.out) { 1804 System.out.println(String.format("Transfer %d finished.", myIdx + 1)); 1805 } 1806 1807 onFinishedFuture.complete(stats); 1808 } 1809 }; 1810 1811 final long payloadSize = 5L * 1024L * 1024L * 1024L; 1812 final String payloadString = "This is an S3 Test. This is an S3 Test. This is an S3 Test. This is an S3 Test."; 1813 1814 HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { 1815 1816 private long remainingBody = payloadSize; 1817 1818 @Override 1819 public boolean sendRequestBody(ByteBuffer outBuffer) { 1820 1821 byte[] payloadBytes = null; 1822 1823 try { 1824 payloadBytes = payloadString.getBytes("ASCII"); 1825 } catch (Exception ex) { 1826 System.out.println("Encountered error trying to get payload bytes."); 1827 return true; 1828 } 1829 1830 while (remainingBody > 0 && outBuffer.remaining() > 0) { 1831 long amtToTransfer = Math.min(remainingBody, (long) outBuffer.remaining()); 1832 amtToTransfer = Math.min(amtToTransfer, (long) payloadBytes.length); 1833 1834 // Transfer the data 1835 outBuffer.put(payloadBytes, 0, (int) amtToTransfer); 1836 1837 remainingBody -= amtToTransfer; 1838 } 1839 1840 return remainingBody == 0; 1841 } 1842 1843 @Override 1844 public boolean resetPosition() { 1845 return true; 1846 } 1847 1848 @Override 1849 public long getLength() { 1850 return payloadSize; 1851 } 1852 }; 1853 1854 HttpHeader[] headers = { new HttpHeader("Host", endpoint), 1855 new HttpHeader("Content-Length", Long.valueOf(payloadSize).toString()), }; 1856 HttpRequest httpRequest = new HttpRequest("PUT", 1857 String.format("/put_object_test_5GB_%d.txt", myIdx + 1), headers, payloadStream); 1858 1859 S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() 1860 .withMetaRequestType(MetaRequestType.PUT_OBJECT).withHttpRequest(httpRequest) 1861 .withResponseHandler(responseHandler); 1862 1863 try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { 1864 } 1865 1866 } 1867 1868 // Finish each future, and deduct failures from completedTransfers 1869 int completedTransfers = numTransfers; 1870 for (CompletableFuture<TransferStats> request : requestFutures) { 1871 try { 1872 request.join(); 1873 } catch (CompletionException ex) { 1874 System.out.println(ex.toString()); 1875 Throwable cause = ex.getCause(); 1876 if (cause != ex && cause != null) { 1877 System.out.println(cause.toString()); 1878 } 1879 cause = cause.getCause(); 1880 if (cause != null && cause != ex) { 1881 System.out.println(cause.toString()); 1882 } 1883 --completedTransfers; 1884 } 1885 } 1886 } 1887 } 1888 } 1889 } 1890