1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.app.tracing.coroutines.flow
18
19 import com.android.app.tracing.coroutines.nameCoroutine
20 import com.android.app.tracing.coroutines.traceCoroutine
21 import com.android.systemui.Flags
22 import kotlin.experimental.ExperimentalTypeInference
23 import kotlinx.coroutines.flow.Flow
24 import kotlinx.coroutines.flow.FlowCollector
25 import kotlinx.coroutines.flow.SharedFlow
26 import kotlinx.coroutines.flow.collect
27 import kotlinx.coroutines.flow.collectLatest
28 import kotlinx.coroutines.flow.filter
29 import kotlinx.coroutines.flow.flow as safeFlow
30 import kotlinx.coroutines.flow.flowOn
31 import kotlinx.coroutines.flow.map
32 import kotlinx.coroutines.flow.transform
33
/**
 * Creates a [Flow] whose `collect` does nothing except run [block] with the supplied
 * [FlowCollector] as its receiver. The wrapper itself adds no extra behavior around [block].
 *
 * @param block body executed every time the returned flow is collected
 * @see kotlinx.coroutines.flow.internal.unsafeFlow
 */
@PublishedApi
internal inline fun <T> unsafeFlow(
    crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) = collector.block()
    }
45
/**
 * Returns a flow that collects the receiver and passes each upstream value to [transform], which
 * runs with the downstream [FlowCollector] as its receiver and decides what (if anything) to emit.
 *
 * Implemented on top of [unsafeFlow].
 *
 * @param transform invoked once per upstream value
 * @see kotlinx.coroutines.flow.unsafeTransform
 */
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    return unsafeFlow {
        // `this` is the downstream collector here; the upstream flow is the function receiver.
        this@unsafeTransform.collect { item -> transform(item) }
    }
}
51
/**
 * Names the coroutine in which the upstream part of this flow is collected by applying
 * `flowOn(nameCoroutine(name))`.
 *
 * This only has an effect if the flow changes contexts (e.g. `flowOn()` is used to change the
 * dispatcher), meaning a new coroutine is created during collection.
 *
 * For example, the following would `emit(1)` from a trace section named "a" and collect in a
 * section named "b":
 * ```
 * launch(nameCoroutine("b")) {
 *     flow {
 *         emit(1)
 *     }
 *     .flowName("a")
 *     .flowOn(Dispatchers.Default)
 *     .collect {
 *     }
 * }
 * ```
 *
 * @param name the name passed to `nameCoroutine`
 */
public fun <T> Flow<T>.flowName(name: String): Flow<T> {
    val namedContext = nameCoroutine(name)
    return flowOn(namedContext)
}
73
/**
 * Guard overload: applying [flowName][Flow.flowName] to a [SharedFlow] has no effect (see the
 * [SharedFlow] documentation on Operator Fusion), so this overload is deprecated at
 * [DeprecationLevel.ERROR] to reject such calls at compile time.
 *
 * The body always throws [UnsupportedOperationException]; it is not meant to be called.
 *
 * @see SharedFlow.flowOn
 */
@Deprecated(
    message =
        "Applying 'flowName' to SharedFlow has no effect. See the SharedFlow documentation on Operator Fusion.",
    replaceWith = ReplaceWith("this"),
    level = DeprecationLevel.ERROR,
)
@Suppress("UnusedReceiverParameter", "UNUSED_PARAMETER")
public fun <T> SharedFlow<T>.flowName(name: String): Flow<T> {
    throw UnsupportedOperationException("Not implemented, should not be called")
}
89
/**
 * Collects this flow into [collector]. When `Flags.coroutineTracing()` is enabled, the whole
 * collection is wrapped in `traceCoroutine("collect:$name")` and each value handed to [collector]
 * is wrapped in `traceCoroutine("collect:$name:emit")`. When the flag is disabled this is a plain
 * [Flow.collect].
 *
 * NOTE: [Flow.collect] is a member function. In Kotlin, when an extension function has the same
 * receiver type, name, and applicable arguments as a class member function, the member takes
 * precedence. So if `collectTraced` is imported as `collect`, a call that passes only a collector
 * resolves to the member, while a call that also passes a name can only match this function.
 *
 * For example,
 * ```
 * import com.android.app.tracing.coroutines.flow.collectTraced as collect
 * ...
 * flowOf(1).collect { ... } // this will call `Flow.collect`
 * flowOf(1).collect("name") { ... } // this will call `collectTraced`
 * ```
 *
 * @param name label embedded in the trace names
 * @param collector receives every value of this flow
 */
public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollector<T>) {
    if (!Flags.coroutineTracing()) {
        collect(collector)
        return
    }
    val outerSection = "collect:$name"
    val innerSection = "$outerSection:emit"
    traceCoroutine(outerSection) {
        collect { value -> traceCoroutine(innerSection) { collector.emit(value) } }
    }
}
113
/**
 * Overload of `collectTraced` that derives the trace name from [collector] itself: the JVM class
 * name of the collector with everything up to and including the last `.` removed.
 *
 * The name is only computed when `Flags.coroutineTracing()` is enabled; otherwise the call goes
 * straight to [Flow.collect].
 *
 * @param collector receives every value of this flow
 */
public suspend fun <T> Flow<T>.collectTraced(collector: FlowCollector<T>) {
    if (!Flags.coroutineTracing()) {
        collect(collector)
        return
    }
    val derivedName = collector::class.java.name.substringAfterLast(".")
    collectTraced(name = derivedName, collector = collector)
}
125
/**
 * Traced counterpart of `collectLatest`. When `Flags.coroutineTracing()` is enabled, the whole
 * collection is wrapped in `traceCoroutine("collectLatest:$name")` and each invocation of
 * [action] is wrapped in `traceCoroutine("collectLatest:$name:action")`. When the flag is
 * disabled this is a plain `collectLatest(action)`.
 *
 * @param name label embedded in the trace names
 * @param action invoked for the values of this flow
 */
internal suspend fun <T> Flow<T>.collectLatestTraced(
    name: String,
    action: suspend (value: T) -> Unit,
) {
    if (!Flags.coroutineTracing()) {
        collectLatest(action)
        return
    }
    val outerSection = "collectLatest:$name"
    val innerSection = "$outerSection:action"
    traceCoroutine(outerSection) {
        collectLatest { value -> traceCoroutine(innerSection) { action(value) } }
    }
}
140
/**
 * Overload of `collectLatestTraced` that derives the trace name from [action] itself: the JVM
 * class name of the lambda object with everything up to and including the last `.` removed.
 *
 * The name is only computed when `Flags.coroutineTracing()` is enabled; otherwise the call goes
 * straight to `collectLatest(action)`.
 *
 * @param action invoked for the values of this flow
 */
public suspend fun <T> Flow<T>.collectLatestTraced(action: suspend (value: T) -> Unit) {
    if (!Flags.coroutineTracing()) {
        collectLatest(action)
        return
    }
    val derivedName = action::class.java.name.substringAfterLast(".")
    collectLatestTraced(derivedName, action)
}
148
/**
 * Traced counterpart of `transform`. When `Flags.coroutineTracing()` is enabled, the returned
 * flow collects the receiver and runs each call to [transform] inside
 * `traceCoroutine("$name:emit")`. When the flag is disabled the result is a plain
 * `transform(transform)`.
 *
 * @param name label embedded in the trace name
 * @param transform invoked once per upstream value with the downstream collector as receiver
 * @see kotlinx.coroutines.flow.transform
 */
@OptIn(ExperimentalTypeInference::class)
public inline fun <T, R> Flow<T>.transformTraced(
    name: String,
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit,
): Flow<R> {
    if (!Flags.coroutineTracing()) {
        return transform(transform)
    }
    val sectionName = "$name:emit"
    return safeFlow { collect { item -> traceCoroutine(sectionName) { transform(item) } } }
}
161
/**
 * Traced counterpart of `filter`. When `Flags.coroutineTracing()` is enabled, each evaluation of
 * [predicate] runs inside `traceCoroutine("filter:$name:predicate")`, and each value that passes
 * is emitted inside `traceCoroutine("filter:$name:emit")`. When the flag is disabled the result
 * is a plain `filter(predicate)`.
 *
 * @param name label embedded in the trace names
 * @param predicate decides whether an upstream value is emitted downstream
 */
public inline fun <T> Flow<T>.filterTraced(
    name: String,
    crossinline predicate: suspend (T) -> Boolean,
): Flow<T> {
    if (!Flags.coroutineTracing()) {
        return filter(predicate)
    }
    val predicateSection = "filter:$name:predicate"
    val emitSection = "filter:$name:emit"
    return unsafeTransform { item ->
        val accepted = traceCoroutine(predicateSection) { predicate(item) }
        if (accepted) {
            traceCoroutine(emitSection) {
                return@unsafeTransform emit(item)
            }
        }
    }
}
180
/**
 * Traced counterpart of `map`. When `Flags.coroutineTracing()` is enabled, each call to
 * [transform] runs inside `traceCoroutine("map:$name:transform")`, and the resulting value is
 * emitted inside `traceCoroutine("map:$name:emit")`. When the flag is disabled the result is a
 * plain `map(transform)`.
 *
 * @param name label embedded in the trace names
 * @param transform converts each upstream value into the value emitted downstream
 */
public inline fun <T, R> Flow<T>.mapTraced(
    name: String,
    crossinline transform: suspend (value: T) -> R,
): Flow<R> {
    if (!Flags.coroutineTracing()) {
        return map(transform)
    }
    val transformSection = "map:$name:transform"
    val emitSection = "map:$name:emit"
    return unsafeTransform { item ->
        val mapped = traceCoroutine(transformSection) { transform(item) }
        traceCoroutine(emitSection) {
            return@unsafeTransform emit(mapped)
        }
    }
}
198