1**Design:** New Feature, **Status:** [Proposed](../../../README.md) 2 3# Event Stream Alternate Syntax 4 5Event streaming allows long-running bi-directional communication between 6customers and AWS services over HTTP/2 connections. 7 8The current syntax for event streaming APIs is adequate for power users, 9but has a few disadvantages: 10 111. Customers must use reactive streams APIs, even for relatively simple 12 use-cases. Reactive streams APIs are powerful, but difficult to use 13 without external documentation and libraries. 142. All response processing must be performed in a callback (the 15 `ResponseHandler` abstraction), which makes it challenging to 16 propagate information to the rest of the application. 17 18This mini-proposal suggests an alternate syntax that customers would be 19able to use for all event streaming operations. 20 21## Proposal 22 23A new method will be added to each event streaming operation: 24`Running{OPERATION} {OPERATION}({OPERATION}Request)` (and its 25consumer-builder variant). 26 27A new type will be created for each event streaming operation: 28`Running{OPERATION}`: 29 30```Java 31interface Running{OPERATION} extends AutoCloseable { 32 // A future that is completed when the entire operation completes. 33 CompletableFuture<Void> completionFuture(); 34 35 /** 36 * Methods enabling reading individual events asynchronously, as they are received. 37 */ 38 39 CompletableFuture<Void> readAll(Consumer<{RESPONSE_EVENT_TYPE}> reader); 40 CompletableFuture<Void> readAll({RESPONSE_EVENT_TYPE}Visitor responseVisitor); 41 <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Void> readAll(Class<T> type, Consumer<T> reader); 42 43 CompletableFuture<Optional<{REQUEST_EVENT_TYPE}>> readNext(); 44 <T extends {RESPONSE_EVENT_TYPE}> CompletableFuture<Optional<T>> readNext(Class<T> type); 45 46 /** 47 * Methods enabling writing individual events asynchronously. 48 */ 49 50 CompletableFuture<Void> writeAll(Publisher<? extends {REQUEST_EVENT_TYPE}> events); 51 CompletableFuture<Void> writeAll(Iterable<? extends {REQUEST_EVENT_TYPE}> events); 52 CompletableFuture<Void> write({REQUEST_EVENT_TYPE} event); 53 54 /** 55 * Reactive-streams methods for reading events and response messages, as they are received. 56 */ 57 Publisher<{RESPONSE_EVENT_TYPE}> responseEventPublisher(); 58 Publisher<{OPERATION}Response> responsePublisher(); 59 60 /** 61 * Java-8-streams methods for reading events and response messages, as they are received. 62 */ 63 64 Stream<{RESPONSE_EVENT_TYPE}> blockingResponseEventStream(); 65 Stream<{OPERATION}Response> blockingResponseStream(); 66 67 @Override 68 default void close() { 69 completionFuture().cancel(false); 70 } 71} 72``` 73 74This type enables customers to use the operation in either a 75reactive-streams or a Java-8 usage pattern, depending on how they care 76to manage their threads and back-pressure. 77 78It's worth noting that every method on `Running{OPERATION}` is still 79non-blocking and will never throw exceptions directly. Any method that 80returns a type that itself contains blocking methods is prefixed with 81`blocking`, e.g. `Stream<{RESPONSE_EVENT_TYPE}> 82blockingResponseEventStream()`. 83 84**Example 1: Transcribe's `startStreamTranscription` with Reactive 85Streams** 86 87```Java 88try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create(); 89 // Create the connection to transcribe and send the initial request message 90 RunningStartStreamTranscription transcription = 91 client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US) 92 .mediaEncoding(MediaEncoding.PCM) 93 .mediaSampleRateHertz(16_000))) { 94 95 // Use RxJava to create the audio stream to be transcribed 96 Publisher<AudioStream> audioPublisher = 97 Bytes.from(audioFile) 98 .map(SdkBytes::fromByteArray) 99 .map(bytes -> AudioEvent.builder().audioChunk(bytes).build()) 100 .cast(AudioStream.class); 101 102 // Begin sending the audio data to transcribe, asynchronously 103 transcription.writeAll(audioPublisher); 104 105 // Get a publisher for the transcription 106 Publisher<TranscriptResultStream> transcriptionPublisher = transcription.responseEventPublisher(); 107 108 // Use RxJava to log the transcription 109 Flowable.fromPublisher(transcriptionPublisher) 110 .filter(e -> e instanceof TranscriptEvent) 111 .cast(TranscriptEvent.class) 112 .forEach(e -> System.out.println(e.transcript().results())); 113 114 // Wait for the operation to complete 115 transcription.completionFuture().join(); 116} 117``` 118 119**Example 2: Transcribe's `startStreamTranscription` without Reactive 120Streams** 121 122```Java 123try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create(); 124 // Create the connection to transcribe and send the initial request message 125 RunningStartStreamTranscription transcription = 126 client.startStreamTranscription(r -> r.languageCode(LanguageCode.EN_US) 127 .mediaEncoding(MediaEncoding.PCM) 128 .mediaSampleRateHertz(16_000))) { 129 130 // Asynchronously log response transcription events, as we receive them 131 transcription.readAll(TranscriptEvent.class, e -> System.out.println(e.transcript().results())); 132 133 // Read from our audio file, 4 KB at a time 134 try (InputStream reader = Files.newInputStream(audioFile)) { 135 byte[] buffer = new byte[4096]; 136 int bytesRead; 137 138 while ((bytesRead = reader.read(buffer)) != -1) { 139 if (bytesRead > 0) { 140 // Write the 4 KB we read to transcribe, and wait for the write to complete 141 SdkBytes audioChunk = SdkBytes.fromByteBuffer(ByteBuffer.wrap(buffer, 0, bytesRead)); 142 CompletableFuture<Void> writeCompleteFuture = 143 transcription.write(AudioEvent.builder().audioChunk(audioChunk).build()); 144 writeCompleteFuture.join(); 145 } 146 } 147 } 148 149 // Wait for the operation to complete 150 transcription.completionFuture().join(); 151} 152``` 153 154**Example 3: Kinesis's `subscribeToShard` with Java 8 Streams** 155 156```Java 157try (KinesisAsyncClient client = KinesisAsyncClient.create(); 158 // Create the connection to Kinesis and send the initial request message 159 RunningSubscribeToShard transcription = client.subscribeToShard(r -> r.shardId("myShardId"))) { 160 161 // Block this thread to log 5 Kinesis SubscribeToShardEvent messages 162 transcription.blockingResponseEventStream() 163 .filter(SubscribeToShardEvent.class::isInstance) 164 .map(SubscribeToShardEvent.class::cast) 165 .limit(5) 166 .forEach(event -> System.out.println(event.records())); 167} 168```