xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jdk8/src/stream/Stream.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines.stream
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.flow.*
6 import java.util.stream.*
7 
8 /**
9  * Represents the given stream as a flow and [closes][Stream.close] the stream afterwards.
10  * The resulting flow can be [collected][Flow.collect] only once
11  * and throws [IllegalStateException] when trying to collect it more than once.
12  */
consumeAsFlownull13 public fun <T> Stream<T>.consumeAsFlow(): Flow<T> = StreamFlow(this)
14 
15 private class StreamFlow<T>(private val stream: Stream<T>) : Flow<T> {
16     private val consumed = atomic(false)
17 
18     override suspend fun collect(collector: FlowCollector<T>) {
19         if (!consumed.compareAndSet(false, true)) error("Stream.consumeAsFlow can be collected only once")
20         try {
21             for (value in stream.iterator()) {
22                 collector.emit(value)
23             }
24         } finally {
25             stream.close()
26         }
27     }
28 }
29