1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.async;
17 
18 import java.io.File;
19 import java.io.InputStream;
20 import java.io.OutputStream;
21 import java.nio.ByteBuffer;
22 import java.nio.charset.Charset;
23 import java.nio.charset.StandardCharsets;
24 import java.nio.file.Path;
25 import java.util.Arrays;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutorService;
28 import java.util.function.Consumer;
29 import org.reactivestreams.Publisher;
30 import org.reactivestreams.Subscriber;
31 import software.amazon.awssdk.annotations.SdkPublicApi;
32 import software.amazon.awssdk.core.FileRequestBodyConfiguration;
33 import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
34 import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
35 import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
36 import software.amazon.awssdk.core.internal.async.SplittingPublisher;
37 import software.amazon.awssdk.core.internal.util.Mimetype;
38 import software.amazon.awssdk.utils.BinaryUtils;
39 import software.amazon.awssdk.utils.Validate;
40 
41 /**
42  * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
43  * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
44  * to write that data on the wire).
45  *
46  * <p>
 * {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
 * should reproduce the content (i.e. if you are reading from a file, each subscribe call should produce a
 * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
 * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
51  * </p>
52  *
53  * <p>
 * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
 * subscriber does its own buffering, so it's usually not necessary to buffer in the publisher. Additional permits for chunks
 * will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
57  * </p>
58  *
59  * @see FileAsyncRequestBody
60  * @see ByteBuffersAsyncRequestBody
61  */
62 @SdkPublicApi
63 public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
64 
65     /**
66      * @return The content length of the data being produced.
67      */
contentLength()68     Optional<Long> contentLength();
69 
70     /**
71      * @return The content type of the data being produced.
72      */
contentType()73     default String contentType() {
74         return Mimetype.MIMETYPE_OCTET_STREAM;
75     }
76 
77     /**
78      * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
79      * publisher publishes the data.
80      *
81      * @param publisher Publisher of source data
82      * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
83      */
fromPublisher(Publisher<ByteBuffer> publisher)84     static AsyncRequestBody fromPublisher(Publisher<ByteBuffer> publisher) {
85         return new AsyncRequestBody() {
86 
87             /**
88              * Returns empty optional as size of the each bytebuffer sent is unknown
89              */
90             @Override
91             public Optional<Long> contentLength() {
92                 return Optional.empty();
93             }
94 
95             @Override
96             public void subscribe(Subscriber<? super ByteBuffer> s) {
97                 publisher.subscribe(s);
98             }
99         };
100     }
101 
102     /**
103      * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See
104      * {@link FileAsyncRequestBody#builder} to create a customized body implementation.
105      *
106      * @param path Path to file to read from.
107      * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file.
108      * @see FileAsyncRequestBody
109      */
110     static AsyncRequestBody fromFile(Path path) {
111         return FileAsyncRequestBody.builder().path(path).build();
112     }
113 
114     /**
115      * Creates an {@link AsyncRequestBody} that produces data from the contents of a file. See
116      * {@link #fromFile(FileRequestBodyConfiguration)} to create a customized body implementation.
117      *
118      * @param file The file to read from.
119      * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file.
120      */
121     static AsyncRequestBody fromFile(File file) {
122         return FileAsyncRequestBody.builder().path(file.toPath()).build();
123     }
124 
125     /**
126      * Creates an {@link AsyncRequestBody} that produces data from the contents of a file.
127      *
128      * @param configuration configuration for how the SDK should read the file
129      * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file.
130      */
131     static AsyncRequestBody fromFile(FileRequestBodyConfiguration configuration) {
132         Validate.notNull(configuration, "configuration");
133         return FileAsyncRequestBody.builder()
134                                    .path(configuration.path())
135                                    .position(configuration.position())
136                                    .chunkSizeInBytes(configuration.chunkSizeInBytes())
137                                    .numBytesToRead(configuration.numBytesToRead())
138                                    .build();
139     }
140 
141     /**
142      * Creates an {@link AsyncRequestBody} that produces data from the contents of a file.
143      *
144      * <p>
145      * This is a convenience method that creates an instance of the {@link FileRequestBodyConfiguration} builder,
146      * avoiding the need to create one manually via {@link FileRequestBodyConfiguration#builder()}.
147      *
148      * @param configuration configuration for how the SDK should read the file
149      * @return Implementation of {@link AsyncRequestBody} that reads data from the specified file.
150      */
151     static AsyncRequestBody fromFile(Consumer<FileRequestBodyConfiguration.Builder> configuration) {
152         Validate.notNull(configuration, "configuration");
153         return fromFile(FileRequestBodyConfiguration.builder().applyMutation(configuration).build());
154     }
155 
156     /**
157      * Creates an {@link AsyncRequestBody} that uses a single string as data.
158      *
159      * @param string The string to provide.
160      * @param cs The {@link Charset} to use.
161      * @return Implementation of {@link AsyncRequestBody} that uses the specified string.
162      * @see ByteBuffersAsyncRequestBody
163      */
164     static AsyncRequestBody fromString(String string, Charset cs) {
165         return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
166                                                 string.getBytes(cs));
167     }
168 
169     /**
170      * Creates an {@link AsyncRequestBody} that uses a single string as data with UTF_8 encoding.
171      *
172      * @param string The string to send.
173      * @return Implementation of {@link AsyncRequestBody} that uses the specified string.
174      * @see #fromString(String, Charset)
175      */
176     static AsyncRequestBody fromString(String string) {
177         return fromString(string, StandardCharsets.UTF_8);
178     }
179 
180     /**
181      * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
182      * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
183      *
184      * @param bytes The bytes to send to the service.
185      * @return AsyncRequestBody instance.
186      */
187     static AsyncRequestBody fromBytes(byte[] bytes) {
188         byte[] clonedBytes = bytes.clone();
189         return ByteBuffersAsyncRequestBody.from(clonedBytes);
190     }
191 
192     /**
193      * Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
194      * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
195      * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
196      * {@code AsyncRequestBody} implementation.
197      *
198      * <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
199      *
200      * @param bytes The bytes to send to the service.
201      * @return AsyncRequestBody instance.
202      */
203     static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
204         return ByteBuffersAsyncRequestBody.from(bytes);
205     }
206 
207     /**
208      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
209      * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
210      * <p>
211      * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
212      * it to copy only the remaining readable bytes.
213      *
214      * @param byteBuffer ByteBuffer to send to the service.
215      * @return AsyncRequestBody instance.
216      */
217     static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
218         ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
219         immutableCopy.rewind();
220         return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
221     }
222 
223     /**
224      * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
225      * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
226      * reflected in the {@link AsyncRequestBody}.
227      * <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
228      * only the remaining bytes.
229      *
230      * @param byteBuffer ByteBuffer to send to the service.
231      * @return AsyncRequestBody instance.
232      */
233     static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
234         ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
235         return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
236     }
237 
238     /**
239      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
240      * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
241      * {@code AsyncRequestBody} implementation.
242      * <p>
243      * <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
244      * need it to copy only the remaining readable bytes.
245      *
246      * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
247      * risks.
248      *
249      * @param byteBuffer ByteBuffer to send to the service.
250      * @return AsyncRequestBody instance.
251      */
252     static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
253         ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
254         readOnlyBuffer.rewind();
255         return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
256     }
257 
258     /**
259      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
260      * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
261      * {@code AsyncRequestBody} implementation.
262      * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
263      * the buffer and reads only the remaining bytes.
264      *
265      * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
266      * risks.
267      *
268      * @param byteBuffer ByteBuffer to send to the service.
269      * @return AsyncRequestBody instance.
270      */
271     static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
272         ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
273         return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
274     }
275 
276     /**
277      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
278      * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
279      * <p>
280      * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
281      * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
282      *
283      * @param byteBuffers ByteBuffer array to send to the service.
284      * @return AsyncRequestBody instance.
285      */
286     static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
287         ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
288                                            .map(BinaryUtils::immutableCopyOf)
289                                            .peek(ByteBuffer::rewind)
290                                            .toArray(ByteBuffer[]::new);
291         return ByteBuffersAsyncRequestBody.of(immutableCopy);
292     }
293 
294     /**
295      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
296      * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
297      * {@link AsyncRequestBody}.
298      * <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
299      * this method respects the current read position of each buffer and reads only the remaining bytes.
300      *
301      * @param byteBuffers ByteBuffer array to send to the service.
302      * @return AsyncRequestBody instance.
303      */
304     static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
305         ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
306                                            .map(BinaryUtils::immutableCopyOfRemaining)
307                                            .peek(ByteBuffer::rewind)
308                                            .toArray(ByteBuffer[]::new);
309         return ByteBuffersAsyncRequestBody.of(immutableCopy);
310     }
311 
312     /**
313      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
314      * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
315      * {@code AsyncRequestBody} implementation.
316      * <p>
317      * <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
318      * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
319      *
320      * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
321      * risks.
322      *
323      * @param byteBuffers ByteBuffer array to send to the service.
324      * @return AsyncRequestBody instance.
325      */
326     static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
327         ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
328                                              .map(ByteBuffer::asReadOnlyBuffer)
329                                              .peek(ByteBuffer::rewind)
330                                              .toArray(ByteBuffer[]::new);
331         return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
332     }
333 
334     /**
335      * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
336      * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
337      * {@code AsyncRequestBody} implementation.
338      * <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
339      * this method respects the current read position of each buffer and reads only the remaining bytes.
340      *
341      * <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
342      * risks.
343      *
344      * @param byteBuffers ByteBuffer array to send to the service.
345      * @return AsyncRequestBody instance.
346      */
347     static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
348         ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
349                                              .map(ByteBuffer::asReadOnlyBuffer)
350                                              .toArray(ByteBuffer[]::new);
351         return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
352     }
353 
354     /**
355      * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
356      *
357      * <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
358      * non-blocking event loop threads owned by the SDK.
359      */
360     static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
361         return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor));
362     }
363 
364     /**
365      * Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided
366      * {@link AsyncRequestBodySplitConfiguration}.
367      */
368     static AsyncRequestBody fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration) {
369         Validate.notNull(configuration, "configuration");
370         return new InputStreamWithExecutorAsyncRequestBody(configuration);
371     }
372 
373     /**
374      * This is a convenience method that passes an instance of the {@link AsyncRequestBodyFromInputStreamConfiguration} builder,
375      * avoiding the need to create one manually via {@link AsyncRequestBodyFromInputStreamConfiguration#builder()}.
376      *
377      * @see #fromInputStream(AsyncRequestBodyFromInputStreamConfiguration)
378      */
379     static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration) {
380         Validate.notNull(configuration, "configuration");
381         return fromInputStream(AsyncRequestBodyFromInputStreamConfiguration.builder().applyMutation(configuration).build());
382     }
383 
384     /**
385      * Creates a {@link BlockingInputStreamAsyncRequestBody} to use for writing an input stream to the downstream service.
386      *
387      * <p><b>Example Usage</b>
388      *
389      * <p>
390      * {@snippet :
391      *     S3AsyncClient s3 = S3AsyncClient.create(); // Use one client for your whole application!
392      *
393      *     byte[] dataToSend = "Hello".getBytes(StandardCharsets.UTF_8);
394      *     InputStream streamToSend = new ByteArrayInputStream();
395      *     long streamToSendLength = dataToSend.length();
396      *
397      *     // Start the operation
398      *     BlockingInputStreamAsyncRequestBody body =
399      *         AsyncRequestBody.forBlockingInputStream(streamToSendLength);
400      *     CompletableFuture<PutObjectResponse> responseFuture =
401      *         s3.putObject(r -> r.bucket("bucketName").key("key"), body);
402      *
403      *     // Write the input stream to the running operation
404      *     body.writeInputStream(streamToSend);
405      *
406      *     // Wait for the service to respond.
407      *     PutObjectResponse response = responseFuture.join();
408      * }
409      */
410     static BlockingInputStreamAsyncRequestBody forBlockingInputStream(Long contentLength) {
411         return new BlockingInputStreamAsyncRequestBody(contentLength);
412     }
413 
414     /**
415      * Creates a {@link BlockingOutputStreamAsyncRequestBody} to use for writing to the downstream service as if it's an output
416      * stream. Retries are not supported for this request body.
417      *
418      * <p>The caller is responsible for calling {@link OutputStream#close()} on the
419      * {@link BlockingOutputStreamAsyncRequestBody#outputStream()} when writing is complete.
420      *
421      * <p><b>Example Usage</b>
422      * <p>
423      * {@snippet :
424      *     S3AsyncClient s3 = S3AsyncClient.create(); // Use one client for your whole application!
425      *
426      *     byte[] dataToSend = "Hello".getBytes(StandardCharsets.UTF_8);
427      *     long lengthOfDataToSend = dataToSend.length();
428      *
429      *     // Start the operation
430      *     BlockingInputStreamAsyncRequestBody body =
431      *         AsyncRequestBody.forBlockingOutputStream(lengthOfDataToSend);
432      *     CompletableFuture<PutObjectResponse> responseFuture =
433      *         s3.putObject(r -> r.bucket("bucketName").key("key"), body);
434      *
435      *     // Write the input stream to the running operation
436      *     try (CancellableOutputStream outputStream = body.outputStream()) {
437      *         outputStream.write(dataToSend);
438      *     }
439      *
440      *     // Wait for the service to respond.
441      *     PutObjectResponse response = responseFuture.join();
442      * }
443      */
444     static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long contentLength) {
445         return new BlockingOutputStreamAsyncRequestBody(contentLength);
446     }
447 
448     /**
449      * Creates an {@link AsyncRequestBody} with no content.
450      *
451      * @return AsyncRequestBody instance.
452      */
453     static AsyncRequestBody empty() {
454         return fromBytes(new byte[0]);
455     }
456 
457 
458     /**
459      * Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
460      * portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
461      * is 2MB and the default buffer size is 8MB.
462      *
463      * <p>
464      * By default, if content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is
465      * delivered to the subscriber right after it's initialized. On the other hand, if content length is null, it is sent after
466      * the entire content for that chunk is buffered. In this case, the configured {@code maxMemoryUsageInBytes} must be larger
467      * than or equal to {@code chunkSizeInBytes}. Note that this behavior may be different if a specific implementation of this
468      * interface overrides this method.
469      *
470      * @see AsyncRequestBodySplitConfiguration
471      */
472     default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
473         Validate.notNull(splitConfiguration, "splitConfiguration");
474 
475         return new SplittingPublisher(this, splitConfiguration);
476     }
477 
478     /**
479      * This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
480      * avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
481      *
482      * @see #split(AsyncRequestBodySplitConfiguration)
483      */
484     default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
485         Validate.notNull(splitConfiguration, "splitConfiguration");
486         return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
487     }
488 }
489