**Design:** New Feature, **Status:** [Proposed](../../../README.md)

# Event Stream Alternate Syntax

Event streaming allows long-running bi-directional communication between
customers and AWS services over HTTP/2 connections.

The current syntax for event streaming APIs is adequate for power users,
but has a few disadvantages:

1. Customers must use reactive streams APIs, even for relatively simple
   use-cases. Reactive streams APIs are powerful, but difficult to use
   without external documentation and libraries.
2. All response processing must be performed in a callback (the
   `ResponseHandler` abstraction), which makes it challenging to
   propagate information to the rest of the application.

This mini-proposal suggests an alternate syntax that customers would be
able to use for all event streaming operations.

## Proposal

A new method will be added to each event streaming operation:
`Running{OPERATION} {OPERATION}({OPERATION}Request)` (and its
consumer-builder variant).

A new type will be created for each event streaming operation:
`Running{OPERATION}`:

```Java
interface Running{OPERATION} extends AutoCloseable {
    // A future that is completed when the entire operation completes.
    CompletableFuture<Void> completionFuture();

    /**
     * Methods enabling reading individual events asynchronously, as they are received.
     */

    CompletableFuture<Void> readAll(Consumer<{RESPONSE_EVENT_TYPE}> reader);
    CompletableFuture<Void> readAll({RESPONSE_EVENT_TYPE}Visitor responseVisitor);
    <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Void> readAll(Class<T> type, Consumer<T> reader);

    CompletableFuture<Optional<{RESPONSE_EVENT_TYPE}>> readNext();
    <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Optional<T>> readNext(Class<T> type);

    /**
     * Methods enabling writing individual events asynchronously.
     */

    CompletableFuture<Void> writeAll(Publisher<? extends {REQUEST_EVENT_TYPE}> events);
    CompletableFuture<Void> writeAll(Iterable<? extends {REQUEST_EVENT_TYPE}> events);
    CompletableFuture<Void> write({REQUEST_EVENT_TYPE} event);

    /**
     * Reactive-streams methods for reading events and response messages, as they are received.
     */
    Publisher<{RESPONSE_EVENT_TYPE}> responseEventPublisher();
    Publisher<{OPERATION}Response> responsePublisher();

    /**
     * Java-8-streams methods for reading events and response messages, as they are received.
     */

    Stream<{RESPONSE_EVENT_TYPE}> blockingResponseEventStream();
    Stream<{OPERATION}Response> blockingResponseStream();

    @Override
    default void close() {
        completionFuture().cancel(false);
    }
}
```

This type enables customers to use the operation in either a
reactive-streams or a Java-8 usage pattern, depending on how they care
to manage their threads and back-pressure.

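The typed `readAll(Class<T>, Consumer<T>)` overload can be read as a filter placed in front of the untyped `readAll(Consumer<...>)`. The sketch below shows one way such filtering could work using only JDK types. `TypedReaders` and `onlyOfType` are hypothetical names used for illustration; they are not part of this proposal or of the SDK.

```java
import java.util.function.Consumer;

/** Hypothetical helper, for illustration only. */
final class TypedReaders {
    private TypedReaders() {
    }

    /**
     * Adapts a reader of T into a reader of the broader event type E.
     * Events that are not instances of T are silently skipped.
     */
    static <E, T extends E> Consumer<E> onlyOfType(Class<T> type, Consumer<? super T> reader) {
        return event -> {
            if (type.isInstance(event)) {
                reader.accept(type.cast(event));
            }
        };
    }
}
```

Under this reading, `readAll(TranscriptEvent.class, reader)` would behave like `readAll(onlyOfType(TranscriptEvent.class, reader))`, with other event types dropped rather than reported as errors. Whether the real implementation drops or rejects unexpected events is not specified here.
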
It's worth noting that every method on `Running{OPERATION}` is still
non-blocking and will never throw exceptions directly. Any method that
returns a type that itself contains blocking methods is prefixed with
`blocking`, e.g. `Stream<{RESPONSE_EVENT_TYPE}> blockingResponseEventStream()`.

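To make the `blocking` prefix concrete, here is a minimal sketch of how a method can return a `Stream` immediately while the stream's terminal operations block waiting for events. It assumes events arrive on a `BlockingQueue` and that a sentinel object marks the end of the stream. `BlockingEventStreams`, `fromQueue`, and `END` are hypothetical names, not part of the proposal or the SDK.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical helper, for illustration only. */
final class BlockingEventStreams {
    /** Sentinel placed on the queue when no more events will arrive. */
    static final Object END = new Object();

    private BlockingEventStreams() {
    }

    /** Returns immediately; only the stream's terminal operations block. */
    static <T> Stream<T> fromQueue(BlockingQueue<Object> queue, Class<T> type) {
        Spliterator<T> spliterator =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    private boolean done;

                    @Override
                    public boolean tryAdvance(Consumer<? super T> action) {
                        if (done) {
                            return false;
                        }
                        try {
                            Object next = queue.take(); // blocks until an event arrives
                            if (next == END) {
                                done = true;
                                return false;
                            }
                            action.accept(type.cast(next));
                            return true;
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and end the stream.
                            Thread.currentThread().interrupt();
                            done = true;
                            return false;
                        }
                    }
                };
        return StreamSupport.stream(spliterator, false);
    }
}
```

Because the stream is lazy, calling `fromQueue` does no waiting; the calling thread blocks only inside operations such as `forEach` or `collect`. A short-circuiting operation such as `limit(n)` stops pulling from the queue once `n` events have been delivered.
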
**Example 1: Transcribe's `startStreamTranscription` with Reactive
Streams**

```Java
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create();
     // Create the connection to transcribe and send the initial request message
     RunningStartStreamTranscription transcription =
            client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US)
                                                  .mediaEncoding(MediaEncoding.PCM)
                                                  .mediaSampleRateHertz(16_000))) {

    // Use RxJava to create the audio stream to be transcribed
    Publisher<AudioStream> audioPublisher =
            Bytes.from(audioFile)
                 .map(SdkBytes::fromByteArray)
                 .map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
                 .cast(AudioStream.class);

    // Begin sending the audio data to transcribe, asynchronously
    transcription.writeAll(audioPublisher);

    // Get a publisher for the transcription
    Publisher<TranscriptResultStream> transcriptionPublisher = transcription.responseEventPublisher();

    // Use RxJava to log the transcription
    Flowable.fromPublisher(transcriptionPublisher)
            .filter(e -> e instanceof TranscriptEvent)
            .cast(TranscriptEvent.class)
            .forEach(e -> System.out.println(e.transcript().results()));

    // Wait for the operation to complete
    transcription.completionFuture().join();
}
```

**Example 2: Transcribe's `startStreamTranscription` without Reactive
Streams**

```Java
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create();
     // Create the connection to transcribe and send the initial request message
     RunningStartStreamTranscription transcription =
            client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US)
                                                  .mediaEncoding(MediaEncoding.PCM)
                                                  .mediaSampleRateHertz(16_000))) {

    // Asynchronously log response transcription events, as we receive them
    transcription.readAll(TranscriptEvent.class, e -> System.out.println(e.transcript().results()));

    // Read from our audio file, 4 KB at a time
    try (InputStream reader = Files.newInputStream(audioFile)) {
        byte[] buffer = new byte[4096];
        int bytesRead;

        while ((bytesRead = reader.read(buffer)) != -1) {
            if (bytesRead > 0) {
                // Write the 4 KB we read to transcribe, and wait for the write to complete
                SdkBytes audioChunk = SdkBytes.fromByteBuffer(ByteBuffer.wrap(buffer, 0, bytesRead));
                CompletableFuture<Void> writeCompleteFuture =
                        transcription.write(AudioEvent.builder().audioChunk(audioChunk).build());
                writeCompleteFuture.join();
            }
        }
    }

    // Wait for the operation to complete
    transcription.completionFuture().join();
}
```

**Example 3: Kinesis's `subscribeToShard` with Java 8 Streams**

```Java
try (KinesisAsyncClient client = KinesisAsyncClient.create();
     // Create the connection to Kinesis and send the initial request message
     RunningSubscribeToShard subscription = client.subscribeToShard(r -> r.shardId("myShardId"))) {

    // Block this thread to log 5 Kinesis SubscribeToShardEvent messages
    subscription.blockingResponseEventStream()
                .filter(SubscribeToShardEvent.class::isInstance)
                .map(SubscribeToShardEvent.class::cast)
                .limit(5)
                .forEach(event -> System.out.println(event.records()));
}
```
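
Example 3 never waits on `completionFuture()`; it presumably relies on the default `close()` to cancel the still-running subscription once five events have been logged. The sketch below illustrates that behavior with plain JDK types. `RunningOperationSketch` and `CloseDemo` are hypothetical stand-ins for a generated `Running{OPERATION}` type, not SDK classes.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a generated Running{OPERATION} type. */
interface RunningOperationSketch extends AutoCloseable {
    CompletableFuture<Void> completionFuture();

    @Override
    default void close() {
        // Same default as in the proposal: cancel the operation if it is still running.
        completionFuture().cancel(false);
    }
}

final class CloseDemo {
    public static void main(String[] args) {
        // Case 1: the block exits while the operation is still running.
        CompletableFuture<Void> stillRunning = new CompletableFuture<>();
        try (RunningOperationSketch operation = () -> stillRunning) {
            // Stop early, e.g. after reading a fixed number of events.
        }
        System.out.println(stillRunning.isCancelled()); // prints true

        // Case 2: the operation finished before the block exits.
        CompletableFuture<Void> finished = CompletableFuture.completedFuture(null);
        try (RunningOperationSketch operation = () -> finished) {
            // Nothing left to cancel by the time close() runs.
        }
        System.out.println(finished.isCancelled()); // prints false
    }
}
```

The second case corresponds to Examples 1 and 2: because they call `completionFuture().join()` before leaving the `try` block, the implicit `close()` has no effect on an operation that completed normally.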