package kotlinx.coroutines.stream

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.stream.*

/**
 * Wraps this stream in a [Flow] that emits the elements produced by the stream's iterator
 * and [closes][Stream.close] the stream when collection ends, whether normally or with an exception.
 *
 * The returned flow supports a single [collection][Flow.collect] only; any further attempt
 * to collect it fails with [IllegalStateException].
 */
public fun <T> Stream<T>.consumeAsFlow(): Flow<T> = StreamFlow(this)

/**
 * One-shot [Flow] backed by [stream]. The first call to [collect] drains the stream's iterator
 * into the collector and then closes the stream; every later call is rejected before touching the stream.
 */
private class StreamFlow<T>(private val stream: Stream<T>) : Flow<T> {
    // Flipped to `true` by the first collector; used to reject repeated collection.
    private val consumed = atomic(false)

    override suspend fun collect(collector: FlowCollector<T>) {
        // Only the caller that wins the false -> true transition may proceed.
        val isFirstCollection = consumed.compareAndSet(false, true)
        check(isFirstCollection) { "Stream.consumeAsFlow can be collected only once" }
        try {
            val elements = stream.iterator()
            while (elements.hasNext()) {
                collector.emit(elements.next())
            }
        } finally {
            // Runs on normal completion as well as on failure or cancellation of the collector.
            stream.close()
        }
    }
}