<lambda>null1package kotlinx.coroutines.flow 2 3 import kotlinx.coroutines.* 4 import kotlinx.coroutines.flow.internal.* 5 import kotlinx.coroutines.flow.internal.AbortFlowException 6 import kotlinx.coroutines.flow.internal.unsafeFlow 7 import org.openjdk.jmh.annotations.* 8 import java.util.concurrent.* 9 10 @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) 11 @Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) 12 @Fork(value = 1) 13 @BenchmarkMode(Mode.AverageTime) 14 @OutputTimeUnit(TimeUnit.MICROSECONDS) 15 @State(Scope.Benchmark) 16 open class TakeWhileBenchmark { 17 @Param("1", "10", "100", "1000") 18 private var size: Int = 0 19 20 private suspend inline fun Flow<Long>.consume() = 21 filter { it % 2L != 0L } 22 .map { it * it }.count() 23 24 @Benchmark 25 fun baseline() = runBlocking<Int> { 26 (0L until size).asFlow().consume() 27 } 28 29 @Benchmark 30 fun takeWhileDirect() = runBlocking<Int> { 31 (0L..Long.MAX_VALUE).asFlow().takeWhileDirect { it < size }.consume() 32 } 33 34 @Benchmark 35 fun takeWhileViaCollectWhile() = runBlocking<Int> { 36 (0L..Long.MAX_VALUE).asFlow().takeWhileViaCollectWhile { it < size }.consume() 37 } 38 39 // Direct implementation by checking predicate and throwing AbortFlowException 40 private fun <T> Flow<T>.takeWhileDirect(predicate: suspend (T) -> Boolean): Flow<T> = unsafeFlow { 41 try { 42 collect { value -> 43 if (predicate(value)) emit(value) 44 else throw AbortFlowException(this) 45 } 46 } catch (e: AbortFlowException) { 47 e.checkOwnership(owner = this) 48 } 49 } 50 51 // Essentially the same code, but reusing the logic via collectWhile function 52 private fun <T> Flow<T>.takeWhileViaCollectWhile(predicate: suspend (T) -> Boolean): Flow<T> = unsafeFlow { 53 // This return is needed to work around a bug in JS BE: KT-39227 54 return@unsafeFlow collectWhile { value -> 55 if (predicate(value)) { 56 emit(value) 57 true 58 } else { 59 false 60 } 61 } 62 } 63 } 64