1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.example.tracing.demo.experiments
17 
18 import com.android.app.tracing.coroutines.flow.collectTraced as collect
19 import com.android.app.tracing.coroutines.flow.collectTraced
20 import com.android.app.tracing.coroutines.flow.filterTraced as filter
21 import com.android.app.tracing.coroutines.flow.flowName
22 import com.android.app.tracing.coroutines.flow.mapTraced as map
23 import com.android.app.tracing.coroutines.launchTraced as launch
24 import com.android.app.tracing.coroutines.nameCoroutine
25 import com.android.app.tracing.coroutines.traceCoroutine
26 import com.example.tracing.demo.FixedThreadA
27 import com.example.tracing.demo.FixedThreadB
28 import com.example.tracing.demo.FixedThreadC
29 import com.example.tracing.demo.FixedThreadD
30 import javax.inject.Inject
31 import javax.inject.Singleton
32 import kotlinx.coroutines.CoroutineDispatcher
33 import kotlinx.coroutines.coroutineScope
34 import kotlinx.coroutines.flow.SharingStarted
35 import kotlinx.coroutines.flow.flowOn
36 import kotlinx.coroutines.flow.stateIn
37 
/**
 * Experiment that converts a single cold flow into a `StateFlow` and collects it concurrently
 * from four coroutines, each launched on a different injected dispatcher.
 *
 * The four collectors differ in how the collection is named for tracing:
 * - `AAAA` and `CCCC` pass an explicit name to `collectTraced` (imported under the alias
 *   `collect`).
 * - `BBBB` calls `collectTraced` without a name and relies on its default naming.
 * - `DDDD` calls the untraced `Flow.collect` member function.
 *
 * @property dispatcherA dispatcher used by the `launchAAAA` collector.
 * @property dispatcherB dispatcher used by the `launchBBBB` collector.
 * @property dispatcherC dispatcher the upstream part of [coldFlow] is moved onto via `flowOn`;
 *   also used by the `launchCCCC` collector.
 * @property dispatcherD dispatcher used by the `launchDDDD` collector.
 */
@Singleton
class SharedFlowUsage
@Inject
constructor(
    // The dispatchers are injected once and never reassigned, so they are read-only properties.
    @FixedThreadA private val dispatcherA: CoroutineDispatcher,
    @FixedThreadB private val dispatcherB: CoroutineDispatcher,
    @FixedThreadC private val dispatcherC: CoroutineDispatcher,
    @FixedThreadD private val dispatcherD: CoroutineDispatcher,
) : Experiment {

    override val description: String = "Create a shared flow and collect from it"

    /**
     * Upstream pipeline that is shared in [start]: every value is squared ("pow2") on
     * [dispatcherC], then only multiples of 4 are let through ("mod4") in the context of
     * whoever collects this flow.
     *
     * NOTE(review): `coldCounterFlow` and `forceSuspend` are defined outside this class; the
     * meaning of their numeric arguments is not visible here.
     */
    private val coldFlow =
        coldCounterFlow("shared", 10)
            // this trace name is NOT used because the dispatcher did NOT change
            .flowName("UNUSED_NAME")
            .map("pow2") {
                val rv = it * it
                forceSuspend("map($it) -> $rv", 50)
                rv
            }
            // this trace name is used here because the dispatcher changed
            .flowOn(dispatcherC + nameCoroutine("NEW_COLD_FLOW_NAME"))
            .filter("mod4") {
                val rv = it % 4 == 0
                forceSuspend("filter($it) -> $rv", 50)
                rv
            }
            // this trace name is used, because the scope it is collected in has a
            // CoroutineTracingContext
            .flowName("COLD_FLOW")

    /**
     * Shares [coldFlow] as a `StateFlow` inside a new [coroutineScope] and launches the four
     * collectors described on the class.
     *
     * [coroutineScope] only returns once all of its children have completed. Collecting a
     * `StateFlow` does not complete normally, so this call is expected to keep suspending until
     * the caller cancels it.
     */
    override suspend fun start() {
        coroutineScope {
            // Sharing starts immediately (Eagerly) in this scope; 10 is the StateFlow's initial
            // value, which collectors observe before the upstream emits anything.
            val stateFlow = coldFlow.stateIn(this, SharingStarted.Eagerly, 10)
            launch("launchAAAA", dispatcherA) {
                stateFlow.collect("collectAAAA") {
                    traceCoroutine("AAAA collected: $it") { forceSuspend("AAAA", 15) }
                }
            }
            launch("launchBBBB", dispatcherB) {
                // Don't pass a string. Instead, rely on default behavior to walk the stack for the
                // name. This results in trace sections like:
                // `collect:SharedFlowUsage$start$1$2:emit`
                // NOTE: `Flow.collect` is a member function and takes precedence, so we need
                // to invoke `collectTraced` using its original name instead of its `collect` alias
                stateFlow.collectTraced {
                    traceCoroutine("BBBB collected: $it") { forceSuspend("BBBB", 30) }
                }
            }
            launch("launchCCCC", dispatcherC) {
                stateFlow.collect("collectCCCC") {
                    traceCoroutine("CCCC collected: $it") { forceSuspend("CCCC", 60) }
                }
            }
            launch("launchDDDD", dispatcherD) {
                // Uses Flow.collect member function instead of collectTraced:
                stateFlow.collect {
                    traceCoroutine("DDDD collected: $it") { forceSuspend("DDDD", 90) }
                }
            }
        }
    }
}
102