1 /* <lambda>null2 * Copyright (C) 2024 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.systemui.kairos.internal 18 19 import com.android.systemui.kairos.TState 20 import com.android.systemui.kairos.internal.util.HeteroMap 21 import com.android.systemui.kairos.util.Just 22 import com.android.systemui.kairos.util.Maybe 23 import com.android.systemui.kairos.util.just 24 import com.android.systemui.kairos.util.none 25 import java.util.concurrent.ConcurrentHashMap 26 import java.util.concurrent.ConcurrentLinkedDeque 27 import java.util.concurrent.ConcurrentLinkedQueue 28 import java.util.concurrent.atomic.AtomicLong 29 import kotlin.coroutines.ContinuationInterceptor 30 import kotlinx.coroutines.CompletableDeferred 31 import kotlinx.coroutines.CoroutineScope 32 import kotlinx.coroutines.Deferred 33 import kotlinx.coroutines.Dispatchers 34 import kotlinx.coroutines.channels.Channel 35 import kotlinx.coroutines.coroutineScope 36 import kotlinx.coroutines.job 37 import kotlinx.coroutines.launch 38 import kotlinx.coroutines.sync.Mutex 39 import kotlinx.coroutines.sync.withLock 40 import kotlinx.coroutines.yield 41 42 private val nextNetworkId = AtomicLong() 43 44 internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { 45 46 override val networkId: Any = nextNetworkId.getAndIncrement() 47 48 @Volatile 49 override var epoch: Long = 0L 50 private set 51 52 override val network 53 get() = this 54 55 override val compactor = SchedulerImpl() 56 override val scheduler = SchedulerImpl() 57 override val transactionStore = HeteroMap() 58 59 private val stateWrites = ConcurrentLinkedQueue<TStateSource<*>>() 60 private val outputsByDispatcher = 61 ConcurrentHashMap<ContinuationInterceptor, ConcurrentLinkedQueue<Output<*>>>() 62 private val muxMovers = ConcurrentLinkedQueue<MuxDeferredNode<*, *>>() 63 private val deactivations = ConcurrentLinkedDeque<PushNode<*>>() 64 private val outputDeactivations = ConcurrentLinkedQueue<Output<*>>() 65 private val transactionMutex = Mutex() 66 private val inputScheduleChan = Channel<ScheduledAction<*>>() 67 68 override fun scheduleOutput(output: Output<*>) { 69 val continuationInterceptor = 70 output.context[ContinuationInterceptor] ?: Dispatchers.Unconfined 71 outputsByDispatcher 72 .computeIfAbsent(continuationInterceptor) { ConcurrentLinkedQueue() } 73 .add(output) 74 } 75 76 override fun scheduleMuxMover(muxMover: MuxDeferredNode<*, *>) { 77 muxMovers.add(muxMover) 78 } 79 80 override fun schedule(state: TStateSource<*>) { 81 stateWrites.add(state) 82 } 83 84 // TODO: weird that we have this *and* scheduler exposed 85 override suspend fun schedule(node: MuxNode<*, *, *>) { 86 scheduler.schedule(node.depthTracker.dirty_directDepth, node) 87 } 88 89 override fun scheduleDeactivation(node: PushNode<*>) { 90 deactivations.add(node) 91 } 92 93 override fun scheduleDeactivation(output: Output<*>) { 94 outputDeactivations.add(output) 95 } 96 97 /** Listens for external events and starts FRP transactions. Runs forever. */ 98 suspend fun runInputScheduler() { 99 val actions = mutableListOf<ScheduledAction<*>>() 100 for (first in inputScheduleChan) { 101 // Drain and conflate all transaction requests into a single transaction 102 actions.add(first) 103 while (true) { 104 yield() 105 val func = inputScheduleChan.tryReceive().getOrNull() ?: break 106 actions.add(func) 107 } 108 transactionMutex.withLock { 109 // Run all actions 110 evalScope { 111 for (action in actions) { 112 launch { action.started(evalScope = this@evalScope) } 113 } 114 } 115 // Step through the network 116 doTransaction() 117 // Signal completion 118 while (actions.isNotEmpty()) { 119 actions.removeLast().completed() 120 } 121 } 122 } 123 } 124 125 /** Evaluates [block] inside of a new transaction when the network is ready. */ 126 fun <R> transaction(block: suspend EvalScope.() -> R): Deferred<R> = 127 CompletableDeferred<R>(parent = coroutineScope.coroutineContext.job).also { onResult -> 128 val job = 129 coroutineScope.launch { 130 inputScheduleChan.send( 131 ScheduledAction(onStartTransaction = block, onResult = onResult) 132 ) 133 } 134 onResult.invokeOnCompletion { job.cancel() } 135 } 136 137 suspend fun <R> evalScope(block: suspend EvalScope.() -> R): R = deferScope { 138 block(EvalScopeImpl(this@Network, this)) 139 } 140 141 /** Performs a transactional update of the FRP network. */ 142 private suspend fun doTransaction() { 143 // Traverse network, then run outputs 144 do { 145 scheduler.drainEval(this) 146 } while (evalScope { evalOutputs(this) }) 147 // Update states 148 evalScope { evalStateWriters(this) } 149 transactionStore.clear() 150 // Perform deferred switches 151 evalScope { evalMuxMovers(this) } 152 // Compact depths 153 scheduler.drainCompact() 154 compactor.drainCompact() 155 // Deactivate nodes with no downstream 156 evalDeactivations() 157 epoch++ 158 } 159 160 /** Invokes all [Output]s that have received data within this transaction. */ 161 private suspend fun evalOutputs(evalScope: EvalScope): Boolean { 162 // Outputs can enqueue other outputs, so we need two loops 163 if (outputsByDispatcher.isEmpty()) return false 164 while (outputsByDispatcher.isNotEmpty()) { 165 var launchedAny = false 166 coroutineScope { 167 for ((key, outputs) in outputsByDispatcher) { 168 if (outputs.isNotEmpty()) { 169 launchedAny = true 170 launch(key) { 171 while (outputs.isNotEmpty()) { 172 val output = outputs.remove() 173 launch { output.visit(evalScope) } 174 } 175 } 176 } 177 } 178 } 179 if (!launchedAny) outputsByDispatcher.clear() 180 } 181 return true 182 } 183 184 private suspend fun evalMuxMovers(evalScope: EvalScope) { 185 while (muxMovers.isNotEmpty()) { 186 coroutineScope { 187 val toMove = muxMovers.remove() 188 launch { toMove.performMove(evalScope) } 189 } 190 } 191 } 192 193 /** Updates all [TState]es that have changed within this transaction. */ 194 private suspend fun evalStateWriters(evalScope: EvalScope) { 195 coroutineScope { 196 while (stateWrites.isNotEmpty()) { 197 val latch = stateWrites.remove() 198 launch { latch.updateState(evalScope) } 199 } 200 } 201 } 202 203 private suspend fun evalDeactivations() { 204 coroutineScope { 205 launch { 206 while (deactivations.isNotEmpty()) { 207 // traverse in reverse order 208 // - deactivations are added in depth-order during the node traversal phase 209 // - perform deactivations in reverse order, in case later ones propagate to 210 // earlier ones 211 val toDeactivate = deactivations.removeLast() 212 launch { toDeactivate.deactivateIfNeeded() } 213 } 214 } 215 while (outputDeactivations.isNotEmpty()) { 216 val toDeactivate = outputDeactivations.remove() 217 launch { 218 toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded( 219 downstream = toDeactivate.schedulable 220 ) 221 } 222 } 223 } 224 check(deactivations.isEmpty()) { "unexpected lingering deactivations" } 225 check(outputDeactivations.isEmpty()) { "unexpected lingering output deactivations" } 226 } 227 } 228 229 internal class ScheduledAction<T>( 230 private val onResult: CompletableDeferred<T>? = null, 231 private val onStartTransaction: suspend EvalScope.() -> T, 232 ) { 233 private var result: Maybe<T> = none 234 startednull235 suspend fun started(evalScope: EvalScope) { 236 result = just(onStartTransaction(evalScope)) 237 } 238 completednull239 fun completed() { 240 if (onResult != null) { 241 when (val result = result) { 242 is Just -> onResult.complete(result.value) 243 else -> {} 244 } 245 } 246 result = none 247 } 248 } 249 250 internal typealias TransactionStore = HeteroMap 251