<lambda>null1 package kotlinx.coroutines.testing.flow
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlin.jvm.*
6 import kotlin.reflect.*
7
/**
 * A suspending callback that runs with a [CoroutineScope] receiver and takes a single argument of type [T].
 *
 * Used in this file as the common shape of the `onEach`, `catch` and `finally` callbacks.
 */
public typealias Handler<T> = suspend CoroutineScope.(T) -> Unit
9
10 /*
11 * Design of this builder is not yet stable, so leaving it as is.
12 */
/**
 * Collects the callbacks used when a flow is launched: one `onEach` block, any number of
 * typed `catch` blocks and an optional `finally` block.
 *
 * Registration order is enforced with [IllegalStateException]s: `onEach` first, then `catch`
 * blocks, then `finally`. `onEach` and `finally` may each be registered at most once.
 */
public class LaunchFlowBuilder<T> {
    /*
     * NB: this implementation is a temporary ad-hoc (and slightly incorrect)
     * solution until coroutine-builders are ready
     *
     * NB 2: this internal stuff is required to workaround KT-30795
     */
    @PublishedApi
    internal var onEach: Handler<T>? = null

    @PublishedApi
    internal var finally: Handler<Throwable?>? = null

    @PublishedApi
    internal var exceptionHandlers = LinkedHashMap<KClass<*>, Handler<Throwable>>()

    /**
     * Registers the block invoked for every value. Must be the first block registered.
     *
     * @throws IllegalStateException if an `onEach`, `catch` or `finally` block is already registered.
     */
    public fun onEach(action: suspend CoroutineScope.(value: T) -> Unit) {
        if (onEach != null) error("onEach block is already registered")
        if (exceptionHandlers.isNotEmpty()) error("onEach block should be registered before exceptionHandlers block")
        if (finally != null) error("onEach block should be registered before finally block")
        onEach = action
    }

    /**
     * Registers [action] as the handler for exceptions of type [T], keyed by `T::class`.
     * Registering the same type again replaces the previously stored handler.
     *
     * @throws IllegalStateException if `onEach` is not registered yet or `finally` is already registered.
     */
    public inline fun <reified T : Throwable> catch(noinline action: suspend CoroutineScope.(T) -> Unit) {
        if (onEach == null) error("onEach block should be registered first")
        if (finally != null) error("exceptionHandlers block should be registered before finally block")
        // The map stores handlers under the erased Throwable signature, hence the unchecked cast.
        @Suppress("UNCHECKED_CAST")
        val erased = action as Handler<Throwable>
        exceptionHandlers[T::class] = erased
    }

    /**
     * Registers the block that receives the failure cause (or `null`). Must be registered last.
     *
     * @throws IllegalStateException if a `finally` block is already registered or `onEach` is missing.
     */
    public fun finally(action: suspend CoroutineScope.(cause: Throwable?) -> Unit) {
        if (finally != null) error("Finally block is already registered")
        if (onEach == null) error("onEach block should be registered before finally block")
        finally = action
    }

    /**
     * Packs the registered callbacks into a [Handlers] instance.
     *
     * @throws IllegalStateException if no `onEach` block was registered.
     */
    internal fun build(): Handlers<T> {
        val each = onEach ?: error("onEach is not registered")
        return Handlers(each, exceptionHandlers, finally)
    }
}
50
/**
 * Holder for a set of flow callbacks.
 *
 * @property onEach callback for each value.
 * @property exceptionHandlers exception callbacks keyed by the exception class they were registered for.
 * @property finally optional callback receiving a nullable [Throwable]; `null` when none was registered.
 */
internal class Handlers<T>(
    @JvmField internal var onEach: Handler<T>,
    @JvmField internal var exceptionHandlers: Map<KClass<*>, Handler<Throwable>>,
    @JvmField internal var finally: Handler<Throwable?>?
)
59
/**
 * Launches a coroutine in this scope that collects [flow] and dispatches to the callbacks
 * configured by [builder].
 *
 * Each collected value is passed to the `onEach` handler. If collection fails, the exception is
 * offered to the registered exception handlers in registration order and only the FIRST handler
 * whose class matches the exception is invoked, mirroring `try/catch` semantics; when no handler
 * matches, the exception is rethrown. The `finally` handler, if any, always runs last and receives
 * the exception that was caught, or `null` if collection completed without one.
 *
 * @param flow the flow to collect.
 * @param builder configuration block that registers the handlers.
 * @return the [Job] of the launched coroutine.
 * @throws IllegalStateException if [builder] does not register an `onEach` block.
 */
private fun <T> CoroutineScope.launchFlow(
    flow: Flow<T>,
    builder: LaunchFlowBuilder<T>.() -> Unit
): Job {
    val handlers = LaunchFlowBuilder<T>().apply(builder).build()
    return launch {
        var caught: Throwable? = null
        try {
            coroutineScope {
                flow.collect { value ->
                    handlers.onEach(this, value)
                }
            }
        } catch (e: Throwable) {
            // Record the failure before dispatching so the finally handler sees it even if
            // the exception handler itself throws.
            caught = e
            // Stop at the first match: the previous `forEach { ... return@forEach }` only skipped
            // to the next entry, so every matching handler was invoked.
            val handler = handlers.exceptionHandlers.entries
                .firstOrNull { (type, _) -> type.isInstance(e) }
                ?.value
                ?: throw e
            handler(this, e)
        } finally {
            cancel() // TODO discuss
            // The scope was just cancelled, so the finally handler gets a scope with NonCancellable added.
            handlers.finally?.invoke(CoroutineScope(coroutineContext + NonCancellable), caught)
        }
    }
}
91
/**
 * Launches collection of this flow in [scope], using the handlers registered by [builder].
 *
 * @param scope the scope in which the collecting coroutine is launched.
 * @param builder configuration block that registers the `onEach`, `catch` and `finally` handlers.
 * @return the [Job] of the launched coroutine.
 */
public fun <T> Flow<T>.launchIn(
    scope: CoroutineScope,
    builder: LaunchFlowBuilder<T>.() -> Unit
): Job {
    return scope.launchFlow(flow = this, builder = builder)
}
96