<lambda>null1 package kotlinx.coroutines.flow
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.internal.*
5 import kotlinx.coroutines.flow.internal.AbortFlowException
6 import kotlinx.coroutines.flow.internal.unsafeFlow
7 import org.openjdk.jmh.annotations.*
8 import java.util.concurrent.*
9 
/**
 * JMH benchmark comparing two hand-written `takeWhile`-style operators against a baseline.
 *
 * Every benchmark pushes its source flow through the same [consume] pipeline, so the only
 * difference between them is how the upstream is bounded:
 *  - [baseline] collects a range that already ends at [size];
 *  - the other two collect a range up to [Long.MAX_VALUE] and cut it off once a value
 *    reaches [size].
 *
 * Results are reported as average time per operation, in microseconds.
 */
@State(Scope.Benchmark)
@Fork(1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
open class TakeWhileBenchmark {
    /** Number of leading elements each benchmark lets through; supplied by JMH via [Param]. */
    @Param("1", "10", "100", "1000")
    private var size: Int = 0

    /** Shared downstream workload: keeps odd values, squares them and counts what is left. */
    private suspend inline fun Flow<Long>.consume(): Int =
        filter { candidate -> candidate % 2L != 0L }
            .map { odd -> odd * odd }
            .count()

    /** Reference point: the source range is already limited to [size], no cut-off operator involved. */
    @Benchmark
    fun baseline(): Int = runBlocking {
        val bounded = (0L until size).asFlow()
        bounded.consume()
    }

    /** Measures the cut-off implemented by throwing [AbortFlowException] directly. */
    @Benchmark
    fun takeWhileDirect(): Int = runBlocking {
        val unbounded = (0L..Long.MAX_VALUE).asFlow()
        unbounded.takeWhileDirect { element -> element < size }.consume()
    }

    /** Measures the cut-off implemented on top of `collectWhile`. */
    @Benchmark
    fun takeWhileViaCollectWhile(): Int = runBlocking {
        val unbounded = (0L..Long.MAX_VALUE).asFlow()
        unbounded.takeWhileViaCollectWhile { element -> element < size }.consume()
    }

    /**
     * Hand-rolled variant: evaluates [predicate] for each upstream value and, on the first
     * rejection, aborts collection by throwing an [AbortFlowException] tagged with this collector.
     */
    private fun <T> Flow<T>.takeWhileDirect(predicate: suspend (T) -> Boolean): Flow<T> = unsafeFlow {
        try {
            collect { item ->
                // The first rejected value ends the flow; nothing after it is emitted.
                if (!predicate(item)) throw AbortFlowException(this@unsafeFlow)
                emit(item)
            }
        } catch (abort: AbortFlowException) {
            // The abort is only swallowed after the ownership check against this collector passes.
            abort.checkOwnership(owner = this@unsafeFlow)
        }
    }

    /**
     * Same operator as [takeWhileDirect], but the stop-on-first-rejection logic is delegated
     * to `collectWhile`: the lambda answers `true` to keep collecting and `false` to stop.
     */
    private fun <T> Flow<T>.takeWhileViaCollectWhile(predicate: suspend (T) -> Boolean): Flow<T> = unsafeFlow {
        // The explicit labeled return works around a bug in the JS backend: KT-39227
        return@unsafeFlow collectWhile { item ->
            if (!predicate(item)) return@collectWhile false
            emit(item)
            true
        }
    }
}
64