1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.async; 17 18 import java.io.File; 19 import java.io.InputStream; 20 import java.io.OutputStream; 21 import java.nio.ByteBuffer; 22 import java.nio.charset.Charset; 23 import java.nio.charset.StandardCharsets; 24 import java.nio.file.Path; 25 import java.util.Arrays; 26 import java.util.Optional; 27 import java.util.concurrent.ExecutorService; 28 import java.util.function.Consumer; 29 import org.reactivestreams.Publisher; 30 import org.reactivestreams.Subscriber; 31 import software.amazon.awssdk.annotations.SdkPublicApi; 32 import software.amazon.awssdk.core.FileRequestBodyConfiguration; 33 import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; 34 import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; 35 import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; 36 import software.amazon.awssdk.core.internal.async.SplittingPublisher; 37 import software.amazon.awssdk.core.internal.util.Mimetype; 38 import software.amazon.awssdk.utils.BinaryUtils; 39 import software.amazon.awssdk.utils.Validate; 40 41 /** 42 * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is 43 * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e. 44 * to write that data on the wire). 45 * 46 * <p> 47 * {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe 48 * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a 49 * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the 50 * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls. 51 * </p> 52 * 53 * <p> 54 * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The 55 * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be 56 * notified via the {@link org.reactivestreams.Subscription#request(long)} method. 57 * </p> 58 * 59 * @see FileAsyncRequestBody 60 * @see ByteBuffersAsyncRequestBody 61 */ 62 @SdkPublicApi 63 public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> { 64 65 /** 66 * @return The content length of the data being produced. 67 */ contentLength()68 Optional<Long> contentLength(); 69 70 /** 71 * @return The content type of the data being produced. 72 */ contentType()73 default String contentType() { 74 return Mimetype.MIMETYPE_OCTET_STREAM; 75 } 76 77 /** 78 * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the 79 * publisher publishes the data. 80 * 81 * @param publisher Publisher of source data 82 * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher 83 */ fromPublisher(Publisher<ByteBuffer> publisher)84 static AsyncRequestBody fromPublisher(Publisher<ByteBuffer> publisher) { 85 return new AsyncRequestBody() { 86 87 /** 88 * Returns empty optional as size of the each bytebuffer sent is unknown 89 */ 90 @Override 91 public Optional<Long> contentLength() { 92 return Optional.empty(); 93 } 94 95 @Override 96 public void subscribe(Subscriber<? super ByteBuffer> s) { 97 publisher.subscribe(s); 98 } 99 }; 100 } 101 102 /** 103 * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See 104 * {@link FileAsyncRequestBody#builder} to create a customized body implementation. 105 * 106 * @param path Path to file to read from. 107 * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. 108 * @see FileAsyncRequestBody 109 */ 110 static AsyncRequestBody fromFile(Path path) { 111 return FileAsyncRequestBody.builder().path(path).build(); 112 } 113 114 /** 115 * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See 116 * {@link #fromFile(FileRequestBodyConfiguration)} to create a customized body implementation. 117 * 118 * @param file The file to read from. 119 * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. 120 */ 121 static AsyncRequestBody fromFile(File file) { 122 return FileAsyncRequestBody.builder().path(file.toPath()).build(); 123 } 124 125 /** 126 * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. 127 * 128 * @param configuration configuration for how the SDK should read the file 129 * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. 130 */ 131 static AsyncRequestBody fromFile(FileRequestBodyConfiguration configuration) { 132 Validate.notNull(configuration, "configuration"); 133 return FileAsyncRequestBody.builder() 134 .path(configuration.path()) 135 .position(configuration.position()) 136 .chunkSizeInBytes(configuration.chunkSizeInBytes()) 137 .numBytesToRead(configuration.numBytesToRead()) 138 .build(); 139 } 140 141 /** 142 * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. 143 * 144 * <p> 145 * This is a convenience method that creates an instance of the {@link FileRequestBodyConfiguration} builder, 146 * avoiding the need to create one manually via {@link FileRequestBodyConfiguration#builder()}. 147 * 148 * @param configuration configuration for how the SDK should read the file 149 * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file. 150 */ 151 static AsyncRequestBody fromFile(Consumer<FileRequestBodyConfiguration.Builder> configuration) { 152 Validate.notNull(configuration, "configuration"); 153 return fromFile(FileRequestBodyConfiguration.builder().applyMutation(configuration).build()); 154 } 155 156 /** 157 * Creates an {@link AsyncRequestBody} that uses a single string as data. 158 * 159 * @param string The string to provide. 160 * @param cs The {@link Charset} to use. 161 * @return Implementation of {@link AsyncRequestBody} that uses the specified string. 162 * @see ByteBuffersAsyncRequestBody 163 */ 164 static AsyncRequestBody fromString(String string, Charset cs) { 165 return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(), 166 string.getBytes(cs)); 167 } 168 169 /** 170 * Creates an {@link AsyncRequestBody} that uses a single string as data with UTF_8 encoding. 171 * 172 * @param string The string to send. 173 * @return Implementation of {@link AsyncRequestBody} that uses the specified string. 174 * @see #fromString(String, Charset) 175 */ 176 static AsyncRequestBody fromString(String string) { 177 return fromString(string, StandardCharsets.UTF_8); 178 } 179 180 /** 181 * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent 182 * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}. 183 * 184 * @param bytes The bytes to send to the service. 185 * @return AsyncRequestBody instance. 186 */ 187 static AsyncRequestBody fromBytes(byte[] bytes) { 188 byte[] clonedBytes = bytes.clone(); 189 return ByteBuffersAsyncRequestBody.from(clonedBytes); 190 } 191 192 /** 193 * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This 194 * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody} 195 * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this 196 * {@code AsyncRequestBody} implementation. 197 * 198 * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks. 199 * 200 * @param bytes The bytes to send to the service. 201 * @return AsyncRequestBody instance. 202 */ 203 static AsyncRequestBody fromBytesUnsafe(byte[] bytes) { 204 return ByteBuffersAsyncRequestBody.from(bytes); 205 } 206 207 /** 208 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to 209 * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. 210 * <p> 211 * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need 212 * it to copy only the remaining readable bytes. 213 * 214 * @param byteBuffer ByteBuffer to send to the service. 215 * @return AsyncRequestBody instance. 216 */ 217 static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) { 218 ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer); 219 immutableCopy.rewind(); 220 return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); 221 } 222 223 /** 224 * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the 225 * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being 226 * reflected in the {@link AsyncRequestBody}. 227 * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads 228 * only the remaining bytes. 229 * 230 * @param byteBuffer ByteBuffer to send to the service. 231 * @return AsyncRequestBody instance. 232 */ 233 static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) { 234 ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer); 235 return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); 236 } 237 238 /** 239 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the 240 * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this 241 * {@code AsyncRequestBody} implementation. 242 * <p> 243 * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you 244 * need it to copy only the remaining readable bytes. 245 * 246 * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the 247 * risks. 248 * 249 * @param byteBuffer ByteBuffer to send to the service. 250 * @return AsyncRequestBody instance. 251 */ 252 static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) { 253 ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); 254 readOnlyBuffer.rewind(); 255 return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); 256 } 257 258 /** 259 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the 260 * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this 261 * {@code AsyncRequestBody} implementation. 262 * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of 263 * the buffer and reads only the remaining bytes. 264 * 265 * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the 266 * risks. 267 * 268 * @param byteBuffer ByteBuffer to send to the service. 269 * @return AsyncRequestBody instance. 270 */ 271 static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) { 272 ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); 273 return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); 274 } 275 276 /** 277 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer} 278 * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. 279 * <p> 280 * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use 281 * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. 282 * 283 * @param byteBuffers ByteBuffer array to send to the service. 284 * @return AsyncRequestBody instance. 285 */ 286 static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) { 287 ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) 288 .map(BinaryUtils::immutableCopyOf) 289 .peek(ByteBuffer::rewind) 290 .toArray(ByteBuffer[]::new); 291 return ByteBuffersAsyncRequestBody.of(immutableCopy); 292 } 293 294 /** 295 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each 296 * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the 297 * {@link AsyncRequestBody}. 298 * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, 299 * this method respects the current read position of each buffer and reads only the remaining bytes. 300 * 301 * @param byteBuffers ByteBuffer array to send to the service. 302 * @return AsyncRequestBody instance. 303 */ 304 static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) { 305 ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) 306 .map(BinaryUtils::immutableCopyOfRemaining) 307 .peek(ByteBuffer::rewind) 308 .toArray(ByteBuffer[]::new); 309 return ByteBuffersAsyncRequestBody.of(immutableCopy); 310 } 311 312 /** 313 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each 314 * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this 315 * {@code AsyncRequestBody} implementation. 316 * <p> 317 * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use 318 * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. 319 * 320 * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the 321 * risks. 322 * 323 * @param byteBuffers ByteBuffer array to send to the service. 324 * @return AsyncRequestBody instance. 325 */ 326 static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) { 327 ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) 328 .map(ByteBuffer::asReadOnlyBuffer) 329 .peek(ByteBuffer::rewind) 330 .toArray(ByteBuffer[]::new); 331 return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); 332 } 333 334 /** 335 * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each 336 * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this 337 * {@code AsyncRequestBody} implementation. 338 * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)}, 339 * this method respects the current read position of each buffer and reads only the remaining bytes. 340 * 341 * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the 342 * risks. 343 * 344 * @param byteBuffers ByteBuffer array to send to the service. 345 * @return AsyncRequestBody instance. 346 */ 347 static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) { 348 ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) 349 .map(ByteBuffer::asReadOnlyBuffer) 350 .toArray(ByteBuffer[]::new); 351 return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); 352 } 353 354 /** 355 * Creates an {@link AsyncRequestBody} from an {@link InputStream}. 356 * 357 * <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the 358 * non-blocking event loop threads owned by the SDK. 359 */ 360 static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) { 361 return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor)); 362 } 363 364 /** 365 * Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided 366 * {@link AsyncRequestBodySplitConfiguration}. 367 */ 368 static AsyncRequestBody fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration) { 369 Validate.notNull(configuration, "configuration"); 370 return new InputStreamWithExecutorAsyncRequestBody(configuration); 371 } 372 373 /** 374 * This is a convenience method that passes an instance of the {@link AsyncRequestBodyFromInputStreamConfiguration} builder, 375 * avoiding the need to create one manually via {@link AsyncRequestBodyFromInputStreamConfiguration#builder()}. 376 * 377 * @see #fromInputStream(AsyncRequestBodyFromInputStreamConfiguration) 378 */ 379 static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration) { 380 Validate.notNull(configuration, "configuration"); 381 return fromInputStream(AsyncRequestBodyFromInputStreamConfiguration.builder().applyMutation(configuration).build()); 382 } 383 384 /** 385 * Creates a {@link BlockingInputStreamAsyncRequestBody} to use for writing an input stream to the downstream service. 386 * 387 * <p><b>Example Usage</b> 388 * 389 * <p> 390 * {@snippet : 391 * S3AsyncClient s3 = S3AsyncClient.create(); // Use one client for your whole application! 392 * 393 * byte[] dataToSend = "Hello".getBytes(StandardCharsets.UTF_8); 394 * InputStream streamToSend = new ByteArrayInputStream(); 395 * long streamToSendLength = dataToSend.length(); 396 * 397 * // Start the operation 398 * BlockingInputStreamAsyncRequestBody body = 399 * AsyncRequestBody.forBlockingInputStream(streamToSendLength); 400 * CompletableFuture<PutObjectResponse> responseFuture = 401 * s3.putObject(r -> r.bucket("bucketName").key("key"), body); 402 * 403 * // Write the input stream to the running operation 404 * body.writeInputStream(streamToSend); 405 * 406 * // Wait for the service to respond. 407 * PutObjectResponse response = responseFuture.join(); 408 * } 409 */ 410 static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLength) { 411 return new BlockingInputStreamAsyncRequestBody(contentLength); 412 } 413 414 /** 415 * Creates a {@link BlockingOutputStreamAsyncRequestBody} to use for writing to the downstream service as if it's an output 416 * stream. Retries are not supported for this request body. 417 * 418 * <p>The caller is responsible for calling {@link OutputStream#close()} on the 419 * {@link BlockingOutputStreamAsyncRequestBody#outputStream()} when writing is complete. 420 * 421 * <p><b>Example Usage</b> 422 * <p> 423 * {@snippet : 424 * S3AsyncClient s3 = S3AsyncClient.create(); // Use one client for your whole application! 425 * 426 * byte[] dataToSend = "Hello".getBytes(StandardCharsets.UTF_8); 427 * long lengthOfDataToSend = dataToSend.length(); 428 * 429 * // Start the operation 430 * BlockingInputStreamAsyncRequestBody body = 431 * AsyncRequestBody.forBlockingOutputStream(lengthOfDataToSend); 432 * CompletableFuture<PutObjectResponse> responseFuture = 433 * s3.putObject(r -> r.bucket("bucketName").key("key"), body); 434 * 435 * // Write the input stream to the running operation 436 * try (CancellableOutputStream outputStream = body.outputStream()) { 437 * outputStream.write(dataToSend); 438 * } 439 * 440 * // Wait for the service to respond. 441 * PutObjectResponse response = responseFuture.join(); 442 * } 443 */ 444 static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) { 445 return new BlockingOutputStreamAsyncRequestBody(contentLength); 446 } 447 448 /** 449 * Creates an {@link AsyncRequestBody} with no content. 450 * 451 * @return AsyncRequestBody instance. 452 */ 453 static AsyncRequestBody empty() { 454 return fromBytes(new byte[0]); 455 } 456 457 458 /** 459 * Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific 460 * portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size 461 * is 2MB and the default buffer size is 8MB. 462 * 463 * <p> 464 * By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is 465 * delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after 466 * the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger 467 * than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this 468 * interface overrides this method. 469 * 470 * @see AsyncRequestBodySplitConfiguration 471 */ 472 default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) { 473 Validate.notNull(splitConfiguration, "splitConfiguration"); 474 475 return new SplittingPublisher(this, splitConfiguration); 476 } 477 478 /** 479 * This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder, 480 * avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}. 481 * 482 * @see #split(AsyncRequestBodySplitConfiguration) 483 */ 484 default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) { 485 Validate.notNull(splitConfiguration, "splitConfiguration"); 486 return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build()); 487 } 488 } 489