1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.systemui.kairos.internal
18
19 import com.android.systemui.kairos.internal.util.Bag
20 import java.util.TreeMap
21 import kotlinx.coroutines.CoroutineScope
22 import kotlinx.coroutines.launch
23
24 /**
25 * Tracks all upstream connections for Mux nodes.
26 *
27 * Connections come in two flavors:
28 * 1. **DIRECT** :: The upstream node may emit events that would cause the owner of this depth
29 * tracker to also emit.
30 * 2. **INDIRECT** :: The upstream node will not emit events, but may start doing so in a future
31 * transaction (at which point its depth will change to DIRECT).
32 *
33 * DIRECT connections are the standard, active connections that propagate events through the graph.
34 * They are used to calculate the evaluation depth of a node, so that it is only visited once it is
35 * certain that all DIRECT upstream connections have already been visited (or are not emitting in
36 * the current transaction).
37 *
38 * It is *invalid* for a node to be directly upstream of itself. Doing so is an error.
39 *
40 * INDIRECT connections identify nodes that are still "alive" (should not be garbage-collected) but
41 * are presently "dormant". This only occurs when a MuxDeferredNode has nothing switched-in, but is
42 * still connected to its "patches" upstream node, implying that something *may* be switched-in at a
43 * later time.
44 *
45 * It is *invalid* for a node to be indirectly upstream of itself. These connections are
46 * automatically filtered out.
47 *
48 * When there are no connections, either DIRECT or INDIRECT, a node *dies* and all incoming/outgoing
49 * connections are freed so that it can be garbage-collected.
50 *
51 * Note that there is an edge case where a MuxDeferredNode is connected to itself via its "patches"
52 * upstream node. In this case:
53 * 1. If the node has switched-in upstream nodes, then this is perfectly valid. Downstream nodes
54 * will see a direct connection to this MuxDeferredNode.
55 * 2. Otherwise, the node would normally be considered "dormant" and downstream nodes would see an
56 * indirect connection. However, because a node cannot be indirectly upstream of itself, then the
57 * MuxDeferredNode sees no connection via its patches upstream node, and so is considered "dead".
58 * Conceptually, this makes some sense: The only way for this recursive MuxDeferredNode to become
59 * non-dormant is to switch some upstream nodes back in, but since the patches node is itself,
60 * this will never happen.
61 *
62 * This behavior underpins the recursive definition of `nextOnly`.
63 */
internal class DepthTracker {

    /** Connection kind captured by the last [reset]: `true` = DIRECT, `false` = INDIRECT. */
    @Volatile var snapshotIsDirect = true

    /** Whether the owner was flagged as an indirect root when [reset] last ran. */
    @Volatile private var snapshotIsIndirectRoot = false

    private inline val snapshotIsIndirect: Boolean
        get() = !snapshotIsDirect

    /** Indirect depth captured by the last [reset]. */
    @Volatile var snapshotIndirectDepth: Int = 0

    /** Direct depth captured by the last [reset]. */
    @Volatile var snapshotDirectDepth: Int = 0

    private val _snapshotIndirectRoots = HashSet<MuxDeferredNode<*, *>>()

    /** A fresh copy of the indirect roots captured by [reset]; safe for callers to retain. */
    val snapshotIndirectRoots
        get() = _snapshotIndirectRoots.toSet()

    // Net indirect-root changes accumulated since the last reset(). An element is expected to be
    // in at most one of these two sets at a time (see applyRemovalDiff).
    private val indirectAdditions = HashSet<MuxDeferredNode<*, *>>()
    private val indirectRemovals = HashSet<MuxDeferredNode<*, *>>()

    // depth -> number of upstream connections currently at that depth. Entries whose count drops
    // to zero are removed, so lastEntry() is always the deepest live upstream.
    private val dirty_directUpstreamDepths = TreeMap<Int, Int>()
    private val dirty_indirectUpstreamDepths = TreeMap<Int, Int>()

    private val dirty_indirectUpstreamRoots = Bag<MuxDeferredNode<*, *>>()

    /** Working ("dirty") direct depth: one more than the deepest direct upstream, or 0. */
    @Volatile var dirty_directDepth = 0
    @Volatile private var dirty_indirectDepth = 0
    @Volatile private var dirty_depthIsDirect = true
    @Volatile private var dirty_isIndirectRoot = false

    /**
     * Schedules [node] on [scheduler] using the working depth: the direct depth when the tracker
     * currently has direct upstreams, otherwise the indirect depth.
     */
    fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) {
        if (dirty_depthIsDirect) {
            scheduler.schedule(dirty_directDepth, node)
        } else {
            scheduler.scheduleIndirect(dirty_indirectDepth, node)
        }
    }

    // only used by MuxDeferred
    // and only when there is a direct connection to the patch node
    /**
     * Updates the working indirect-root flag.
     *
     * @return `true` only if the flag actually changed *and* the working connection kind is
     *   currently INDIRECT; `false` otherwise.
     */
    fun setIsIndirectRoot(isRoot: Boolean): Boolean {
        if (isRoot != dirty_isIndirectRoot) {
            dirty_isIndirectRoot = isRoot
            return !dirty_depthIsDirect
        }
        return false
    }

    /**
     * Adds a direct upstream connection at [newDepth] and recalculates the working direct depth.
     * When [oldDepth] is non-null, one connection previously counted at that depth is released
     * first (i.e. the upstream moved from [oldDepth] to [newDepth]).
     *
     * @return `true` if the working direct depth or the direct/indirect kind changed.
     */
    fun addDirectUpstream(oldDepth: Int?, newDepth: Int): Boolean {
        if (oldDepth != null) {
            // Returning null from compute() removes the entry once its count reaches zero.
            dirty_directUpstreamDepths.compute(oldDepth) { _, count ->
                count?.minus(1)?.takeIf { it > 0 }
            }
        }
        dirty_directUpstreamDepths.compute(newDepth) { _, current -> current?.plus(1) ?: 1 }
        return recalcDepth()
    }

    /**
     * Recomputes the working direct depth and the direct/indirect kind from the direct-upstream
     * multiset.
     *
     * @return `true` if either the depth or the kind changed.
     */
    private fun recalcDepth(): Boolean {
        val newDepth =
            dirty_directUpstreamDepths.lastEntry()?.let { (maxDepth, _) -> maxDepth + 1 } ?: 0

        val isDirect = dirty_directUpstreamDepths.isNotEmpty()
        val isDirectChanged = dirty_depthIsDirect != isDirect
        dirty_depthIsDirect = isDirect

        // Non-short-circuiting `or`: the depth assignment in also { } must always run.
        return (newDepth != dirty_directDepth).also { dirty_directDepth = newDepth } or
            isDirectChanged
    }

    /**
     * Recomputes the working indirect depth from the indirect-upstream multiset. The new depth is
     * always stored.
     *
     * @return `true` only if the depth changed while the tracker is INDIRECT and is not flagged
     *   as an indirect root.
     */
    private fun recalcIndirDepth(): Boolean {
        val newDepth =
            dirty_indirectUpstreamDepths.lastEntry()?.let { (maxDepth, _) -> maxDepth + 1 } ?: 0
        return (!dirty_depthIsDirect && !dirty_isIndirectRoot && newDepth != dirty_indirectDepth)
            .also { dirty_indirectDepth = newDepth }
    }

    /**
     * Releases one direct upstream connection counted at [depth].
     *
     * @return `true` if the working direct depth or the direct/indirect kind changed.
     */
    fun removeDirectUpstream(depth: Int): Boolean {
        dirty_directUpstreamDepths.compute(depth) { _, count -> count?.minus(1)?.takeIf { it > 0 } }
        return recalcDepth()
    }

    /**
     * Adds an indirect upstream connection at [newDepth], first releasing one at [oldDepth] when
     * it is non-null. A no-op returning `false` when [oldDepth] equals [newDepth].
     *
     * @return the result of recalculating the indirect depth (see [recalcIndirDepth]).
     */
    fun addIndirectUpstream(oldDepth: Int?, newDepth: Int): Boolean =
        if (oldDepth == newDepth) {
            false
        } else {
            if (oldDepth != null) {
                dirty_indirectUpstreamDepths.compute(oldDepth) { _, current ->
                    current?.minus(1)?.takeIf { it > 0 }
                }
            }
            dirty_indirectUpstreamDepths.compute(newDepth) { _, current -> current?.plus(1) ?: 1 }
            recalcIndirDepth()
        }

    /**
     * Releases one indirect upstream connection counted at [depth].
     *
     * @return the result of recalculating the indirect depth (see [recalcIndirDepth]).
     */
    fun removeIndirectUpstream(depth: Int): Boolean {
        dirty_indirectUpstreamDepths.compute(depth) { _, current ->
            current?.minus(1)?.takeIf { it > 0 }
        }
        return recalcIndirDepth()
    }

    /**
     * Applies [additions] and [removals] to the working set of indirect upstream roots and
     * records the net effect in the pending addition/removal diffs consumed by [applyChanges].
     *
     * NOTE(review): this assumes `Bag.addAll` / `Bag.removeAll` return the subset of elements
     * whose membership actually changed, or `null` when nothing changed, and that [butNot] is
     * skipped by `Bag.addAll` -- confirm against `Bag`.
     *
     * @return `true` if the root set changed while the working connection kind is INDIRECT.
     */
    fun updateIndirectRoots(
        additions: Set<MuxDeferredNode<*, *>>? = null,
        removals: Set<MuxDeferredNode<*, *>>? = null,
        butNot: MuxDeferredNode<*, *>? = null,
    ): Boolean {
        val addsChanged =
            additions
                ?.let { dirty_indirectUpstreamRoots.addAll(additions, butNot) }
                ?.let {
                    // Cancel against pending removals; only what is left is a real addition.
                    indirectAdditions.addAll(indirectRemovals.applyRemovalDiff(it))
                    true
                } ?: false
        val removalsChanged =
            removals
                ?.let { dirty_indirectUpstreamRoots.removeAll(removals) }
                ?.let {
                    // Cancel against pending additions; only what is left is a real removal.
                    indirectRemovals.addAll(indirectAdditions.applyRemovalDiff(it))
                    true
                } ?: false
        return (!dirty_depthIsDirect && (addsChanged || removalsChanged))
    }

    /**
     * Removes every element of [changeSet] that is present in the receiver (cancelling the
     * pending opposite change) and returns the elements that were *not* present, which the caller
     * must record as new changes.
     *
     * Previously this called `add` instead of `remove`, which inserted each new addition into the
     * pending-removals set (and vice versa) rather than cancelling, so the diffs handed to
     * downstream nodes were inverted.
     */
    private fun <T> HashSet<T>.applyRemovalDiff(changeSet: Set<T>): Set<T> {
        val remainder = HashSet<T>()
        for (element in changeSet) {
            // remove() returns false when the element was not pending in the receiver.
            if (!remove(element)) {
                remainder.add(element)
            }
        }
        return remainder
    }

    /** Schedules [muxNode] (see [schedule]) if this tracker [isDirty]; otherwise does nothing. */
    suspend fun propagateChanges(scheduler: Scheduler, muxNode: MuxNode<*, *, *>) {
        if (isDirty()) {
            schedule(scheduler, muxNode)
        }
    }

    /**
     * Notifies [downstreamSet] of the transition from the snapshot state to the working state,
     * then calls [reset] so the snapshot matches the working state.
     *
     * Three outcomes are possible:
     * - the working kind is DIRECT: downstream is told of a depth adjustment or an
     *   INDIRECT -> DIRECT move;
     * - the working state has indirect roots or the owner is an indirect root: downstream is told
     *   of a DIRECT -> INDIRECT move or an indirect adjustment;
     * - otherwise: [muxNode] is marked dead, downstream is told the upstream was removed, and
     *   [downstreamSet] is cleared.
     */
    fun applyChanges(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        downstreamSet: DownstreamSet,
        muxNode: MuxNode<*, *, *>,
    ) {
        when {
            dirty_depthIsDirect -> {
                if (snapshotIsDirect) {
                    downstreamSet.adjustDirectUpstream(
                        coroutineScope,
                        scheduler,
                        oldDepth = snapshotDirectDepth,
                        newDepth = dirty_directDepth,
                    )
                } else {
                    downstreamSet.moveIndirectUpstreamToDirect(
                        coroutineScope,
                        scheduler,
                        oldIndirectDepth = snapshotIndirectDepth,
                        oldIndirectSet = snapshotIndirectSet(muxNode),
                        newDirectDepth = dirty_directDepth,
                    )
                }
            }

            dirty_hasIndirectUpstream() || dirty_isIndirectRoot -> {
                if (snapshotIsDirect) {
                    downstreamSet.moveDirectUpstreamToIndirect(
                        coroutineScope,
                        scheduler,
                        oldDirectDepth = snapshotDirectDepth,
                        newIndirectDepth = dirty_indirectDepth,
                        newIndirectSet =
                            buildSet {
                                addAll(dirty_indirectUpstreamRoots)
                                if (dirty_isIndirectRoot) {
                                    add(muxNode as MuxDeferredNode<*, *>)
                                }
                            },
                    )
                } else {
                    downstreamSet.adjustIndirectUpstream(
                        coroutineScope,
                        scheduler,
                        oldDepth = snapshotIndirectDepth,
                        newDepth = dirty_indirectDepth,
                        removals =
                            buildSet {
                                addAll(indirectRemovals)
                                // The owner stopped being an indirect root.
                                if (snapshotIsIndirectRoot && !dirty_isIndirectRoot) {
                                    add(muxNode as MuxDeferredNode<*, *>)
                                }
                            },
                        additions =
                            buildSet {
                                addAll(indirectAdditions)
                                // The owner became an indirect root.
                                if (!snapshotIsIndirectRoot && dirty_isIndirectRoot) {
                                    add(muxNode as MuxDeferredNode<*, *>)
                                }
                            },
                    )
                }
            }

            else -> {
                // die
                muxNode.lifecycle.lifecycleState = MuxLifecycleState.Dead

                if (snapshotIsDirect) {
                    downstreamSet.removeDirectUpstream(
                        coroutineScope,
                        scheduler,
                        depth = snapshotDirectDepth,
                    )
                } else {
                    downstreamSet.removeIndirectUpstream(
                        coroutineScope,
                        scheduler,
                        depth = snapshotIndirectDepth,
                        indirectSet = snapshotIndirectSet(muxNode),
                    )
                }
                downstreamSet.clear()
            }
        }
        reset()
    }

    /**
     * The snapshot indirect roots, plus [muxNode] itself when the snapshot flags the owner as an
     * indirect root.
     */
    private fun snapshotIndirectSet(muxNode: MuxNode<*, *, *>): Set<MuxDeferredNode<*, *>> =
        buildSet {
            addAll(snapshotIndirectRoots)
            if (snapshotIsIndirectRoot) {
                add(muxNode as MuxDeferredNode<*, *>)
            }
        }

    /** Whether the working state currently has at least one direct upstream connection. */
    fun dirty_hasDirectUpstream(): Boolean = dirty_directUpstreamDepths.isNotEmpty()

    private fun dirty_hasIndirectUpstream(): Boolean = dirty_indirectUpstreamRoots.isNotEmpty()

    override fun toString(): String =
        "DepthTracker(" +
            "sIsDirect=$snapshotIsDirect, " +
            "sDirectDepth=$snapshotDirectDepth, " +
            "sIndirectDepth=$snapshotIndirectDepth, " +
            "sIndirectRoots=$snapshotIndirectRoots, " +
            "dIsIndirectRoot=$dirty_isIndirectRoot, " +
            "dDirectDepths=$dirty_directUpstreamDepths, " +
            "dIndirectDepths=$dirty_indirectUpstreamDepths, " +
            "dIndirectRoots=$dirty_indirectUpstreamRoots" +
            ")"

    /**
     * Copies the working state into the snapshot and clears the pending indirect-root diffs.
     * The snapshot root set is only rebuilt when there were pending additions or removals.
     */
    fun reset() {
        snapshotIsDirect = dirty_hasDirectUpstream()
        snapshotDirectDepth = dirty_directDepth
        snapshotIndirectDepth = dirty_indirectDepth
        snapshotIsIndirectRoot = dirty_isIndirectRoot
        if (indirectAdditions.isNotEmpty() || indirectRemovals.isNotEmpty()) {
            _snapshotIndirectRoots.clear()
            _snapshotIndirectRoots.addAll(dirty_indirectUpstreamRoots)
        }
        indirectAdditions.clear()
        indirectRemovals.clear()
        // check(!isDirty()) { "should not be dirty after a reset" }
    }

    /**
     * Whether the working state differs from the snapshot in a way that is checked for the
     * snapshot's current kind (direct, indirect root, or plain indirect).
     */
    fun isDirty(): Boolean =
        when {
            snapshotIsDirect -> !dirty_depthIsDirect || snapshotDirectDepth != dirty_directDepth
            snapshotIsIndirectRoot -> dirty_depthIsDirect || !dirty_isIndirectRoot
            else ->
                dirty_depthIsDirect ||
                    dirty_isIndirectRoot ||
                    snapshotIndirectDepth != dirty_indirectDepth ||
                    indirectAdditions.isNotEmpty() ||
                    indirectRemovals.isNotEmpty()
        }

    /**
     * Whether the working direct depth exceeds the snapshot direct depth, or the snapshot was
     * INDIRECT and the working state now has a direct upstream.
     */
    fun dirty_depthIncreased(): Boolean =
        snapshotDirectDepth < dirty_directDepth || snapshotIsIndirect && dirty_hasDirectUpstream()
}
347
348 /**
349 * Tracks downstream nodes to be scheduled when the owner of this DownstreamSet produces a value in
350 * a transaction.
351 */
internal class DownstreamSet {

    /** Downstream outputs; each is killed when the upstream connection is removed. */
    val outputs = HashSet<Output<*>>()

    /** Downstream state writers; never removed individually (see [remove]). */
    val stateWriters = mutableListOf<TStateSource<*>>()

    /** Downstream mux nodes connected through their patch node. */
    val muxMovers = HashSet<MuxDeferredNode<*, *>>()

    /** Downstream schedulable nodes. */
    val nodes = HashSet<SchedulableNode>()

    /** Registers [schedulable] in the collection matching its variant. */
    fun add(schedulable: Schedulable) {
        when (schedulable) {
            is Schedulable.N -> nodes.add(schedulable.node)
            is Schedulable.O -> outputs.add(schedulable.output)
            is Schedulable.M -> muxMovers.add(schedulable.muxMover)
            is Schedulable.S -> stateWriters.add(schedulable.state)
        }
    }

    /**
     * Unregisters [schedulable] from the collection matching its variant. Attempting to remove a
     * [Schedulable.S] throws, as state writers are never removed.
     */
    fun remove(schedulable: Schedulable) {
        when (schedulable) {
            is Schedulable.N -> nodes.remove(schedulable.node)
            is Schedulable.O -> outputs.remove(schedulable.output)
            is Schedulable.M -> muxMovers.remove(schedulable.muxMover)
            is Schedulable.S -> error("WTF: latches are never removed")
        }
    }

    /** Launches, per downstream node, a notification that a direct upstream's depth changed. */
    fun adjustDirectUpstream(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        oldDepth: Int,
        newDepth: Int,
    ) {
        nodes.forEach { downstream ->
            coroutineScope.launch { downstream.adjustDirectUpstream(scheduler, oldDepth, newDepth) }
        }
    }

    /**
     * Launches, per downstream node and mux mover, a notification that an upstream went from
     * INDIRECT to DIRECT.
     */
    fun moveIndirectUpstreamToDirect(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        oldIndirectDepth: Int,
        oldIndirectSet: Set<MuxDeferredNode<*, *>>,
        newDirectDepth: Int,
    ) {
        nodes.forEach { downstream ->
            coroutineScope.launch {
                downstream.moveIndirectUpstreamToDirect(
                    scheduler,
                    oldIndirectDepth,
                    oldIndirectSet,
                    newDirectDepth,
                )
            }
        }
        muxMovers.forEach { mux ->
            coroutineScope.launch {
                mux.moveIndirectPatchNodeToDirect(scheduler, oldIndirectDepth, oldIndirectSet)
            }
        }
    }

    /**
     * Launches, per downstream node and mux mover, a notification that an indirect upstream's
     * depth and/or root set changed.
     */
    fun adjustIndirectUpstream(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        oldDepth: Int,
        newDepth: Int,
        removals: Set<MuxDeferredNode<*, *>>,
        additions: Set<MuxDeferredNode<*, *>>,
    ) {
        nodes.forEach { downstream ->
            coroutineScope.launch {
                downstream.adjustIndirectUpstream(
                    scheduler,
                    oldDepth,
                    newDepth,
                    removals,
                    additions,
                )
            }
        }
        muxMovers.forEach { mux ->
            coroutineScope.launch {
                mux.adjustIndirectPatchNode(scheduler, oldDepth, newDepth, removals, additions)
            }
        }
    }

    /**
     * Launches, per downstream node and mux mover, a notification that an upstream went from
     * DIRECT to INDIRECT.
     */
    fun moveDirectUpstreamToIndirect(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        oldDirectDepth: Int,
        newIndirectDepth: Int,
        newIndirectSet: Set<MuxDeferredNode<*, *>>,
    ) {
        nodes.forEach { downstream ->
            coroutineScope.launch {
                downstream.moveDirectUpstreamToIndirect(
                    scheduler,
                    oldDirectDepth,
                    newIndirectDepth,
                    newIndirectSet,
                )
            }
        }
        muxMovers.forEach { mux ->
            coroutineScope.launch {
                mux.moveDirectPatchNodeToIndirect(scheduler, newIndirectDepth, newIndirectSet)
            }
        }
    }

    /**
     * Launches, per downstream node and mux mover, a notification that an indirect upstream was
     * removed, and launches a kill for every output.
     */
    fun removeIndirectUpstream(
        coroutineScope: CoroutineScope,
        scheduler: Scheduler,
        depth: Int,
        indirectSet: Set<MuxDeferredNode<*, *>>,
    ) {
        nodes.forEach { downstream ->
            coroutineScope.launch { downstream.removeIndirectUpstream(scheduler, depth, indirectSet) }
        }
        muxMovers.forEach { mux ->
            coroutineScope.launch { mux.removeIndirectPatchNode(scheduler, depth, indirectSet) }
        }
        outputs.forEach { out -> coroutineScope.launch { out.kill() } }
    }

    /**
     * Launches, per downstream node and mux mover, a notification that a direct upstream was
     * removed, and launches a kill for every output.
     */
    fun removeDirectUpstream(coroutineScope: CoroutineScope, scheduler: Scheduler, depth: Int) {
        nodes.forEach { downstream ->
            coroutineScope.launch { downstream.removeDirectUpstream(scheduler, depth) }
        }
        muxMovers.forEach { mux -> coroutineScope.launch { mux.removeDirectPatchNode(scheduler) } }
        outputs.forEach { out -> coroutineScope.launch { out.kill() } }
    }

    /** Empties all four downstream collections. */
    fun clear() {
        nodes.clear()
        muxMovers.clear()
        stateWriters.clear()
        outputs.clear()
    }
}
504
505 // TODO: remove this indirection
/** Tagged wrapper around each kind of entry that can be stored in a [DownstreamSet]. */
internal sealed interface Schedulable {
    /** Wraps a [TStateSource]. */
    data class S(val state: TStateSource<*>) : Schedulable

    /** Wraps a [MuxDeferredNode] acting as a mux mover. */
    data class M(val muxMover: MuxDeferredNode<*, *>) : Schedulable

    /** Wraps a [SchedulableNode]. */
    data class N(val node: SchedulableNode) : Schedulable

    /** Wraps an [Output]. */
    data class O(val output: Output<*>) : Schedulable
}
515
/** Returns `true` when this set holds no nodes, outputs, mux movers, or state writers. */
internal fun DownstreamSet.isEmpty(): Boolean =
    !(nodes.isNotEmpty() ||
        outputs.isNotEmpty() ||
        muxMovers.isNotEmpty() ||
        stateWriters.isNotEmpty())
518
/** Returns `true` when this set holds at least one downstream entry of any kind. */
@Suppress("NOTHING_TO_INLINE")
internal inline fun DownstreamSet.isNotEmpty(): Boolean = isEmpty().not()
520
/**
 * Schedules everything in [downstreamSet] against [evalScope]: nodes, mux movers and outputs are
 * each scheduled in their own launched coroutine, while state writers are scheduled inline.
 *
 * @return `true` if [downstreamSet] contained at least one entry.
 */
internal fun CoroutineScope.scheduleAll(
    downstreamSet: DownstreamSet,
    evalScope: EvalScope,
): Boolean {
    for (node in downstreamSet.nodes) {
        launch { node.schedule(evalScope) }
    }
    for (mover in downstreamSet.muxMovers) {
        launch { mover.scheduleMover(evalScope) }
    }
    for (output in downstreamSet.outputs) {
        launch { output.schedule(evalScope) }
    }
    for (writer in downstreamSet.stateWriters) {
        evalScope.schedule(writer)
    }
    return downstreamSet.isNotEmpty()
}
531