xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
1 /*
<lambda>null2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos.internal
18 
19 import com.android.systemui.kairos.TState
20 import com.android.systemui.kairos.internal.util.HeteroMap
21 import com.android.systemui.kairos.util.Just
22 import com.android.systemui.kairos.util.Maybe
23 import com.android.systemui.kairos.util.just
24 import com.android.systemui.kairos.util.none
25 import java.util.concurrent.ConcurrentHashMap
26 import java.util.concurrent.ConcurrentLinkedDeque
27 import java.util.concurrent.ConcurrentLinkedQueue
28 import java.util.concurrent.atomic.AtomicLong
29 import kotlin.coroutines.ContinuationInterceptor
30 import kotlinx.coroutines.CompletableDeferred
31 import kotlinx.coroutines.CoroutineScope
32 import kotlinx.coroutines.Deferred
33 import kotlinx.coroutines.Dispatchers
34 import kotlinx.coroutines.channels.Channel
35 import kotlinx.coroutines.coroutineScope
36 import kotlinx.coroutines.job
37 import kotlinx.coroutines.launch
38 import kotlinx.coroutines.sync.Mutex
39 import kotlinx.coroutines.sync.withLock
40 import kotlinx.coroutines.yield
41 
42 private val nextNetworkId = AtomicLong()
43 
44 internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
45 
46     override val networkId: Any = nextNetworkId.getAndIncrement()
47 
48     @Volatile
49     override var epoch: Long = 0L
50         private set
51 
52     override val network
53         get() = this
54 
55     override val compactor = SchedulerImpl()
56     override val scheduler = SchedulerImpl()
57     override val transactionStore = HeteroMap()
58 
59     private val stateWrites = ConcurrentLinkedQueue<TStateSource<*>>()
60     private val outputsByDispatcher =
61         ConcurrentHashMap<ContinuationInterceptor, ConcurrentLinkedQueue<Output<*>>>()
62     private val muxMovers = ConcurrentLinkedQueue<MuxDeferredNode<*, *>>()
63     private val deactivations = ConcurrentLinkedDeque<PushNode<*>>()
64     private val outputDeactivations = ConcurrentLinkedQueue<Output<*>>()
65     private val transactionMutex = Mutex()
66     private val inputScheduleChan = Channel<ScheduledAction<*>>()
67 
68     override fun scheduleOutput(output: Output<*>) {
69         val continuationInterceptor =
70             output.context[ContinuationInterceptor] ?: Dispatchers.Unconfined
71         outputsByDispatcher
72             .computeIfAbsent(continuationInterceptor) { ConcurrentLinkedQueue() }
73             .add(output)
74     }
75 
76     override fun scheduleMuxMover(muxMover: MuxDeferredNode<*, *>) {
77         muxMovers.add(muxMover)
78     }
79 
80     override fun schedule(state: TStateSource<*>) {
81         stateWrites.add(state)
82     }
83 
84     // TODO: weird that we have this *and* scheduler exposed
85     override suspend fun schedule(node: MuxNode<*, *, *>) {
86         scheduler.schedule(node.depthTracker.dirty_directDepth, node)
87     }
88 
89     override fun scheduleDeactivation(node: PushNode<*>) {
90         deactivations.add(node)
91     }
92 
93     override fun scheduleDeactivation(output: Output<*>) {
94         outputDeactivations.add(output)
95     }
96 
97     /** Listens for external events and starts FRP transactions. Runs forever. */
98     suspend fun runInputScheduler() {
99         val actions = mutableListOf<ScheduledAction<*>>()
100         for (first in inputScheduleChan) {
101             // Drain and conflate all transaction requests into a single transaction
102             actions.add(first)
103             while (true) {
104                 yield()
105                 val func = inputScheduleChan.tryReceive().getOrNull() ?: break
106                 actions.add(func)
107             }
108             transactionMutex.withLock {
109                 // Run all actions
110                 evalScope {
111                     for (action in actions) {
112                         launch { action.started(evalScope = this@evalScope) }
113                     }
114                 }
115                 // Step through the network
116                 doTransaction()
117                 // Signal completion
118                 while (actions.isNotEmpty()) {
119                     actions.removeLast().completed()
120                 }
121             }
122         }
123     }
124 
125     /** Evaluates [block] inside of a new transaction when the network is ready. */
126     fun <R> transaction(block: suspend EvalScope.() -> R): Deferred<R> =
127         CompletableDeferred<R>(parent = coroutineScope.coroutineContext.job).also { onResult ->
128             val job =
129                 coroutineScope.launch {
130                     inputScheduleChan.send(
131                         ScheduledAction(onStartTransaction = block, onResult = onResult)
132                     )
133                 }
134             onResult.invokeOnCompletion { job.cancel() }
135         }
136 
137     suspend fun <R> evalScope(block: suspend EvalScope.() -> R): R = deferScope {
138         block(EvalScopeImpl(this@Network, this))
139     }
140 
141     /** Performs a transactional update of the FRP network. */
142     private suspend fun doTransaction() {
143         // Traverse network, then run outputs
144         do {
145             scheduler.drainEval(this)
146         } while (evalScope { evalOutputs(this) })
147         // Update states
148         evalScope { evalStateWriters(this) }
149         transactionStore.clear()
150         // Perform deferred switches
151         evalScope { evalMuxMovers(this) }
152         // Compact depths
153         scheduler.drainCompact()
154         compactor.drainCompact()
155         // Deactivate nodes with no downstream
156         evalDeactivations()
157         epoch++
158     }
159 
160     /** Invokes all [Output]s that have received data within this transaction. */
161     private suspend fun evalOutputs(evalScope: EvalScope): Boolean {
162         // Outputs can enqueue other outputs, so we need two loops
163         if (outputsByDispatcher.isEmpty()) return false
164         while (outputsByDispatcher.isNotEmpty()) {
165             var launchedAny = false
166             coroutineScope {
167                 for ((key, outputs) in outputsByDispatcher) {
168                     if (outputs.isNotEmpty()) {
169                         launchedAny = true
170                         launch(key) {
171                             while (outputs.isNotEmpty()) {
172                                 val output = outputs.remove()
173                                 launch { output.visit(evalScope) }
174                             }
175                         }
176                     }
177                 }
178             }
179             if (!launchedAny) outputsByDispatcher.clear()
180         }
181         return true
182     }
183 
184     private suspend fun evalMuxMovers(evalScope: EvalScope) {
185         while (muxMovers.isNotEmpty()) {
186             coroutineScope {
187                 val toMove = muxMovers.remove()
188                 launch { toMove.performMove(evalScope) }
189             }
190         }
191     }
192 
193     /** Updates all [TState]es that have changed within this transaction. */
194     private suspend fun evalStateWriters(evalScope: EvalScope) {
195         coroutineScope {
196             while (stateWrites.isNotEmpty()) {
197                 val latch = stateWrites.remove()
198                 launch { latch.updateState(evalScope) }
199             }
200         }
201     }
202 
203     private suspend fun evalDeactivations() {
204         coroutineScope {
205             launch {
206                 while (deactivations.isNotEmpty()) {
207                     // traverse in reverse order
208                     //   - deactivations are added in depth-order during the node traversal phase
209                     //   - perform deactivations in reverse order, in case later ones propagate to
210                     //     earlier ones
211                     val toDeactivate = deactivations.removeLast()
212                     launch { toDeactivate.deactivateIfNeeded() }
213                 }
214             }
215             while (outputDeactivations.isNotEmpty()) {
216                 val toDeactivate = outputDeactivations.remove()
217                 launch {
218                     toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded(
219                         downstream = toDeactivate.schedulable
220                     )
221                 }
222             }
223         }
224         check(deactivations.isEmpty()) { "unexpected lingering deactivations" }
225         check(outputDeactivations.isEmpty()) { "unexpected lingering output deactivations" }
226     }
227 }
228 
229 internal class ScheduledAction<T>(
230     private val onResult: CompletableDeferred<T>? = null,
231     private val onStartTransaction: suspend EvalScope.() -> T,
232 ) {
233     private var result: Maybe<T> = none
234 
startednull235     suspend fun started(evalScope: EvalScope) {
236         result = just(onStartTransaction(evalScope))
237     }
238 
completednull239     fun completed() {
240         if (onResult != null) {
241             when (val result = result) {
242                 is Just -> onResult.complete(result.value)
243                 else -> {}
244             }
245         }
246         result = none
247     }
248 }
249 
250 internal typealias TransactionStore = HeteroMap
251