1 /*
 * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.app.tracing.coroutines.flow
18 
19 import com.android.app.tracing.coroutines.nameCoroutine
20 import com.android.app.tracing.coroutines.traceCoroutine
21 import com.android.systemui.Flags
22 import kotlin.experimental.ExperimentalTypeInference
23 import kotlinx.coroutines.flow.Flow
24 import kotlinx.coroutines.flow.FlowCollector
25 import kotlinx.coroutines.flow.SharedFlow
26 import kotlinx.coroutines.flow.collect
27 import kotlinx.coroutines.flow.collectLatest
28 import kotlinx.coroutines.flow.filter
29 import kotlinx.coroutines.flow.flow as safeFlow
30 import kotlinx.coroutines.flow.flowOn
31 import kotlinx.coroutines.flow.map
32 import kotlinx.coroutines.flow.transform
33 
/**
 * Builds a [Flow] whose `collect` does nothing more than run [block] with the supplied
 * [FlowCollector] as its receiver.
 *
 * Nothing in this implementation wraps the collector or adds checks around [block]; the name
 * mirrors the kotlinx.coroutines helper referenced below.
 *
 * @param block the body executed against the collector on every collection of the returned flow.
 * @return a flow that delegates each collection directly to [block].
 * @see kotlinx.coroutines.flow.internal.unsafeFlow
 */
@PublishedApi
internal inline fun <T> unsafeFlow(
    crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) = collector.block()
    }
45 
/**
 * Returns a flow that collects the receiver and passes every upstream value to [transform].
 * [transform] runs with the downstream [FlowCollector] as its receiver, so it decides what (if
 * anything) is emitted for each value.
 *
 * The returned flow is created with [unsafeFlow], so the downstream collector is used as-is.
 *
 * @param transform invoked once per upstream value with the downstream collector as receiver.
 * @see kotlinx.coroutines.flow.unsafeTransform
 */
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    return unsafeFlow {
        // `this` is the downstream collector here; the upstream flow is the outer receiver.
        this@unsafeTransform.collect { upstreamValue -> transform(upstreamValue) }
    }
}
51 
/**
 * Names the coroutine in which this flow is collected, by applying [flowOn] with the value
 * returned from [nameCoroutine].
 *
 * The name only has an effect when the flow changes contexts (e.g. `flowOn()` is also used to
 * change the dispatcher), because only then is a new coroutine created during collection.
 *
 * In the snippet below, `emit(1)` runs in a trace section named "a", while collection happens in
 * the section named "b":
 * ```
 *   launch(nameCoroutine("b")) {
 *     flow {
 *       emit(1)
 *     }
 *     .flowName("a")
 *     .flowOn(Dispatchers.Default)
 *     .collect {
 *     }
 *   }
 * ```
 *
 * @param name the name passed to [nameCoroutine].
 */
public fun <T> Flow<T>.flowName(name: String): Flow<T> {
    val namingContext = nameCoroutine(name)
    return flowOn(namingContext)
}
73 
/**
 * Guard overload: applying [flowName][Flow.flowName] to a [SharedFlow] has no effect (see the
 * [SharedFlow] documentation on Operator Fusion). It is deprecated at error level so such calls
 * are rejected at compile time; the suggested replacement is the receiver itself.
 *
 * @throws UnsupportedOperationException always, should the function be invoked anyway.
 * @see SharedFlow.flowOn
 */
@Deprecated(
    message =
        "Applying 'flowName' to SharedFlow has no effect. See the SharedFlow documentation on Operator Fusion.",
    replaceWith = ReplaceWith("this"),
    level = DeprecationLevel.ERROR,
)
@Suppress("UnusedReceiverParameter")
public fun <T> SharedFlow<T>.flowName(@Suppress("UNUSED_PARAMETER") name: String): Flow<T> {
    throw UnsupportedOperationException("Not implemented, should not be called")
}
89 
/**
 * Collects this flow into [collector].
 *
 * When [Flags.coroutineTracing] is enabled, the whole collection runs inside a trace section named
 * `collect:$name`, and every emission to [collector] is wrapped in a nested section named
 * `collect:$name:emit`. When it is disabled this is a plain [Flow.collect].
 *
 * NOTE: [Flow.collect] is a member function, so if `collectTraced` is imported as `collect`, a
 * call that passes only a collector resolves to the member instead of the single-argument
 * `collectTraced` overload. (In Kotlin, when an extension function has the same receiver type,
 * name, and applicable arguments as a class member function, the member takes precedence.) Pass a
 * [name] to reach the traced version.
 *
 * For example,
 * ```
 * import com.android.app.tracing.coroutines.flow.collectTraced as collect
 * ...
 * flowOf(1).collect { ... } // this will call `Flow.collect`
 * flowOf(1).collect("name") { ... } // this will call `collectTraced`
 * ```
 *
 * @param name label embedded in the trace section names.
 * @param collector receives every value emitted by this flow.
 */
public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollector<T>) {
    if (!Flags.coroutineTracing()) {
        collect(collector)
        return
    }
    val collectSection = "collect:$name"
    val emitSection = "collect:$name:emit"
    traceCoroutine(collectSection) {
        collect { value -> traceCoroutine(emitSection) { collector.emit(value) } }
    }
}
113 
/**
 * Collects this flow into [collector], deriving the trace name from the collector's runtime class:
 * the part of its JVM class name after the last `.`.
 *
 * The name is only computed when [Flags.coroutineTracing] is enabled; otherwise this is a plain
 * [Flow.collect].
 *
 * @see Flow.collectTraced
 */
public suspend fun <T> Flow<T>.collectTraced(collector: FlowCollector<T>) {
    if (!Flags.coroutineTracing()) {
        collect(collector)
        return
    }
    val derivedName = collector::class.java.name.substringAfterLast(".")
    collectTraced(derivedName, collector)
}
125 
/**
 * Traced counterpart of [collectLatest].
 *
 * When [Flags.coroutineTracing] is enabled, the whole collection runs inside a trace section named
 * `collectLatest:$name`, and every invocation of [action] is wrapped in a nested section named
 * `collectLatest:$name:action`. When it is disabled this is a plain [collectLatest].
 *
 * @param name label embedded in the trace section names.
 * @param action invoked for the values of this flow, as with [collectLatest].
 */
internal suspend fun <T> Flow<T>.collectLatestTraced(
    name: String,
    action: suspend (value: T) -> Unit,
) {
    if (!Flags.coroutineTracing()) {
        collectLatest(action)
        return
    }
    val collectSection = "collectLatest:$name"
    val actionSection = "collectLatest:$name:action"
    traceCoroutine(collectSection) {
        collectLatest { latest -> traceCoroutine(actionSection) { action(latest) } }
    }
}
140 
/**
 * Traced counterpart of [collectLatest] that derives the trace name from the runtime class of
 * [action]: the part of its JVM class name after the last `.`.
 *
 * The name is only computed when [Flags.coroutineTracing] is enabled; otherwise this is a plain
 * [collectLatest].
 */
public suspend fun <T> Flow<T>.collectLatestTraced(action: suspend (value: T) -> Unit) {
    if (!Flags.coroutineTracing()) {
        collectLatest(action)
        return
    }
    val derivedName = action::class.java.name.substringAfterLast(".")
    collectLatestTraced(derivedName, action)
}
148 
/**
 * Traced counterpart of the `transform` operator.
 *
 * [Flags.coroutineTracing] is read once, when this operator is applied. If tracing is enabled, the
 * returned flow collects the receiver and runs [transform] for each value inside a trace section
 * named `$name:emit`. If it is disabled, the plain `transform` operator is returned.
 *
 * @param name label embedded in the trace section name.
 * @param transform invoked for each upstream value with the downstream collector as receiver.
 * @see kotlinx.coroutines.flow.transform
 */
@OptIn(ExperimentalTypeInference::class)
public inline fun <T, R> Flow<T>.transformTraced(
    name: String,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit,
): Flow<R> {
    if (!Flags.coroutineTracing()) {
        return transform(transform)
    }
    val emitSection = "$name:emit"
    return safeFlow {
        // `this` is the downstream collector; the upstream flow is the outer receiver.
        this@transformTraced.collect { upstreamValue ->
            traceCoroutine(emitSection) { transform(upstreamValue) }
        }
    }
}
161 
/**
 * Traced counterpart of [filter].
 *
 * [Flags.coroutineTracing] is read once, when this operator is applied. If tracing is enabled,
 * each evaluation of [predicate] runs inside a trace section named `filter:$name:predicate`, and
 * each accepted value is emitted downstream inside a section named `filter:$name:emit`. If it is
 * disabled, the plain [filter] operator is returned.
 *
 * @param name label embedded in the trace section names.
 * @param predicate decides whether a value is passed downstream.
 */
public inline fun <T> Flow<T>.filterTraced(
    name: String,
    crossinline predicate: suspend (T) -> Boolean,
): Flow<T> {
    if (!Flags.coroutineTracing()) {
        return filter(predicate)
    }
    val predicateSection = "filter:$name:predicate"
    val emitSection = "filter:$name:emit"
    return unsafeTransform { candidate ->
        val accepted = traceCoroutine(predicateSection) { predicate(candidate) }
        if (accepted) {
            traceCoroutine(emitSection) { emit(candidate) }
        }
    }
}
180 
/**
 * Traced counterpart of [map].
 *
 * [Flags.coroutineTracing] is read once, when this operator is applied. If tracing is enabled,
 * each call to [transform] runs inside a trace section named `map:$name:transform`, and the
 * resulting value is emitted downstream inside a section named `map:$name:emit`. If it is
 * disabled, the plain [map] operator is returned.
 *
 * @param name label embedded in the trace section names.
 * @param transform converts each upstream value into the value emitted downstream.
 */
public inline fun <T, R> Flow<T>.mapTraced(
    name: String,
    crossinline transform: suspend (value: T) -> R,
): Flow<R> {
    if (!Flags.coroutineTracing()) {
        return map(transform)
    }
    val transformSection = "map:$name:transform"
    val emitSection = "map:$name:emit"
    return unsafeTransform { input ->
        val mapped = traceCoroutine(transformSection) { transform(input) }
        traceCoroutine(emitSection) { emit(mapped) }
    }
}
198