xref: /aosp_15_r20/external/kotlinx.coroutines/test-utils/common/src/LaunchFlow.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)

<lambda>null1 package kotlinx.coroutines.testing.flow
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlin.jvm.*
6 import kotlin.reflect.*
7 
8 public typealias Handler<T> = suspend CoroutineScope.(T) -> Unit
9 
10 /*
11  * Design of this builder is not yet stable, so leaving it as is.
12  */
13 public class LaunchFlowBuilder<T> {
14     /*
15      * NB: this implementation is a temporary ad-hoc (and slightly incorrect)
16      * solution until coroutine-builders are ready
17      *
18      * NB 2: this internal stuff is required to workaround KT-30795
19      */
20     @PublishedApi
21     internal var onEach: Handler<T>? = null
22     @PublishedApi
23     internal var finally: Handler<Throwable?>? = null
24     @PublishedApi
25     internal var exceptionHandlers = LinkedHashMap<KClass<*>, Handler<Throwable>>()
26 
27     public fun onEach(action: suspend CoroutineScope.(value: T) -> Unit) {
28         check(onEach == null) { "onEach block is already registered" }
29         check(exceptionHandlers.isEmpty()) { "onEach block should be registered before exceptionHandlers block" }
30         check(finally == null) { "onEach block should be registered before finally block" }
31         onEach = action
32     }
33 
34     public inline fun <reified T : Throwable> catch(noinline action: suspend CoroutineScope.(T) -> Unit) {
35         check(onEach != null) { "onEach block should be registered first" }
36         check(finally == null) { "exceptionHandlers block should be registered before finally block" }
37         @Suppress("UNCHECKED_CAST")
38         exceptionHandlers[T::class] = action as Handler<Throwable>
39     }
40 
41     public fun finally(action: suspend CoroutineScope.(cause: Throwable?) -> Unit) {
42         check(finally == null) { "Finally block is already registered" }
43         check(onEach != null) { "onEach block should be registered before finally block" }
44         if (finally == null) finally = action
45     }
46 
47     internal fun build(): Handlers<T> =
48         Handlers(onEach ?: error("onEach is not registered"), exceptionHandlers, finally)
49 }
50 
51 internal class Handlers<T>(
52     @JvmField
53     internal var onEach: Handler<T>,
54     @JvmField
55     internal var exceptionHandlers: Map<KClass<*>, Handler<Throwable>>,
56     @JvmField
57     internal var finally: Handler<Throwable?>?
58 )
59 
launchFlownull60 private fun <T> CoroutineScope.launchFlow(
61     flow: Flow<T>,
62     builder: LaunchFlowBuilder<T>.() -> Unit
63 ): Job {
64     val handlers = LaunchFlowBuilder<T>().apply(builder).build()
65     return launch {
66         var caught: Throwable? = null
67         try {
68             coroutineScope {
69                 flow.collect { value ->
70                     handlers.onEach(this, value)
71                 }
72             }
73         } catch (e: Throwable) {
74             handlers.exceptionHandlers.forEach { (key, value) ->
75                 if (key.isInstance(e)) {
76                     caught = e
77                     value.invoke(this, e)
78                     return@forEach
79                 }
80             }
81             if (caught == null) {
82                 caught = e
83                 throw e
84             }
85         } finally {
86             cancel() // TODO discuss
87             handlers.finally?.invoke(CoroutineScope(coroutineContext + NonCancellable), caught)
88         }
89     }
90 }
91 
launchInnull92 public fun <T> Flow<T>.launchIn(
93     scope: CoroutineScope,
94     builder: LaunchFlowBuilder<T>.() -> Unit
95 ): Job = scope.launchFlow(this, builder)
96