xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
1 /*
<lambda>null2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos
18 
19 import com.android.systemui.kairos.internal.DemuxImpl
20 import com.android.systemui.kairos.internal.Init
21 import com.android.systemui.kairos.internal.InitScope
22 import com.android.systemui.kairos.internal.InputNode
23 import com.android.systemui.kairos.internal.Network
24 import com.android.systemui.kairos.internal.NoScope
25 import com.android.systemui.kairos.internal.TFlowImpl
26 import com.android.systemui.kairos.internal.activated
27 import com.android.systemui.kairos.internal.cached
28 import com.android.systemui.kairos.internal.constInit
29 import com.android.systemui.kairos.internal.filterNode
30 import com.android.systemui.kairos.internal.init
31 import com.android.systemui.kairos.internal.map
32 import com.android.systemui.kairos.internal.mapImpl
33 import com.android.systemui.kairos.internal.mapMaybeNode
34 import com.android.systemui.kairos.internal.mergeNodes
35 import com.android.systemui.kairos.internal.mergeNodesLeft
36 import com.android.systemui.kairos.internal.neverImpl
37 import com.android.systemui.kairos.internal.switchDeferredImplSingle
38 import com.android.systemui.kairos.internal.switchPromptImpl
39 import com.android.systemui.kairos.internal.util.hashString
40 import com.android.systemui.kairos.util.Either
41 import com.android.systemui.kairos.util.Left
42 import com.android.systemui.kairos.util.Maybe
43 import com.android.systemui.kairos.util.Right
44 import com.android.systemui.kairos.util.just
45 import com.android.systemui.kairos.util.map
46 import com.android.systemui.kairos.util.toMaybe
47 import java.util.concurrent.atomic.AtomicReference
48 import kotlin.reflect.KProperty
49 import kotlinx.coroutines.CompletableDeferred
50 import kotlinx.coroutines.CoroutineStart
51 import kotlinx.coroutines.Job
52 import kotlinx.coroutines.async
53 import kotlinx.coroutines.coroutineScope
54 
/**
 * A series of values of type [A], each available at a discrete point in time.
 *
 * This is a sealed hierarchy; instances are obtained from the combinators and factories built
 * around it rather than by subclassing.
 */
@ExperimentalFrpApi
sealed class TFlow<out A> {
    companion object {
        /** The [TFlow] that has no values. */
        val empty: TFlow<Nothing> = EmptyFlow
    }
}
63 
/** A [TFlow] with no values. Top-level alias for [TFlow.empty]. */
@ExperimentalFrpApi
val emptyTFlow: TFlow<Nothing> = TFlow.empty
66 
/**
 * A forward-reference to a [TFlow], useful when a definition needs to refer to itself.
 *
 * The reference can be used like any other [TFlow], but it holds up evaluation of the FRP network
 * until [loopback] has been assigned.
 *
 * It can also be used as a property delegate: reading the delegated property yields this
 * reference, and writing to it assigns [loopback].
 */
@ExperimentalFrpApi
class TFlowLoop<A> : TFlow<A>() {
    // Completed by the first non-null assignment to [loopback].
    private val deferred = CompletableDeferred<TFlow<A>>()

    // Waits for [deferred] and then connects to whichever flow was supplied.
    internal val init: Init<TFlowImpl<A>> =
        init(name = null) {
            val target = deferred.await()
            target.init.connect(evalScope = this)
        }

    /**
     * The [TFlow] this reference is referring to.
     *
     * Assigning `null` is ignored. Assigning a non-null value more than once throws an
     * [IllegalStateException].
     */
    @ExperimentalFrpApi
    var loopback: TFlow<A>? = null
        set(value) {
            if (value != null) {
                check(deferred.complete(value)) { "TFlowLoop.loopback has already been set." }
                field = value
            }
        }

    /** Delegate read: always yields this forward-reference itself. */
    operator fun getValue(thisRef: Any?, property: KProperty<*>): TFlow<A> = this

    /** Delegate write: assigns [value] to [loopback]. */
    operator fun setValue(thisRef: Any?, property: KProperty<*>, value: TFlow<A>) {
        loopback = value
    }

    override fun toString(): String = "${this::class.simpleName}@$hashString"
}
98 
/**
 * Returns a [TFlow] that stands in for the [TFlow] held by this [Lazy].
 *
 * [Lazy.value] is not read by this call; it is read from inside the returned flow's deferred
 * initializer, so the wrapped flow is only resolved once that initializer runs.
 */
@ExperimentalFrpApi
fun <A> Lazy<TFlow<A>>.defer(): TFlow<A> {
    return deferInline { value }
}
101 
/**
 * Returns a [TFlow] that stands in for the [TFlow] held by this [FrpDeferredValue].
 *
 * The deferred value is awaited from inside the returned flow's deferred initializer rather than
 * by this call.
 */
@ExperimentalFrpApi
fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> {
    return deferInline { unwrapped.await() }
}
105 
/**
 * Returns a [TFlow] that stands in for the [TFlow] produced by [block].
 *
 * [block] is not invoked by this call; it is run in an [FrpScope] from inside the returned flow's
 * deferred initializer.
 */
@ExperimentalFrpApi
fun <A> deferTFlow(block: suspend FrpScope.() -> TFlow<A>): TFlow<A> {
    return deferInline { NoScope.runInFrpScope(block) }
}
111 
/** Returns a [TFlow] that emits the new value of this [TState] whenever it changes. */
@ExperimentalFrpApi
val <A> TState<A>.stateChanges: TFlow<A>
    get() {
        // Connecting the state is deferred until the resulting flow is initialized.
        val changesInit: Init<TFlowImpl<A>> =
            init(name = null) { init.connect(evalScope = this).changes }
        return TFlowInit(changesInit)
    }
116 
/**
 * Applies [transform] to each value of this [TFlow] and returns a [TFlow] that emits only the
 * [just] results.
 *
 * @see mapNotNull
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> {
    val mappedInit: Init<TFlowImpl<B>> =
        constInit(
            name = null,
            mapMaybeNode({ init.connect(evalScope = this) }) { value ->
                runInTransactionScope { transform(value) }
            },
        )
    return TFlowInit(mappedInit)
}
129 
/**
 * Applies [transform] to each value of this [TFlow] and returns a [TFlow] that emits only the
 * non-null results.
 *
 * @see mapMaybe
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?): TFlow<B> {
    return mapMaybe { value -> transform(value).toMaybe() }
}
141 
/** Returns a [TFlow] that emits only the non-null values of the original [TFlow]. */
@ExperimentalFrpApi
fun <A> TFlow<A?>.filterNotNull(): TFlow<A> {
    return mapNotNull { value -> value }
}
144 
/**
 * Returns a [TFlow] that emits only the values of the original [TFlow] that are instances of [A].
 *
 * Shorthand for `mapNotNull { it as? A }`.
 */
@ExperimentalFrpApi
inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> {
    return mapNotNull { value -> value as? A }
}
148 
/**
 * Returns a [TFlow] that emits the contents of every [just] value of the original [TFlow].
 *
 * Shorthand for `mapMaybe { it }`.
 */
@ExperimentalFrpApi
fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> {
    return mapMaybe { maybe -> maybe }
}
151 
/**
 * Returns a [TFlow] that emits the result of applying [transform] to each value of the original
 * [TFlow].
 *
 * The mapped node is wrapped with `cached()`; see [mapCheap] for the uncached variant.
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> {
    val uncached: TFlowImpl<B> =
        mapImpl({ init.connect(evalScope = this) }) { value ->
            runInTransactionScope { transform(value) }
        }
    val cachedInit: Init<TFlowImpl<B>> = constInit(name = null, uncached.cached())
    return TFlowInit(cachedInit)
}
162 
/**
 * Variant of [map] whose emission is not cached during the transaction. Only use this when
 * [transform] is fast and pure.
 *
 * @see map
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> {
    val uncached: TFlowImpl<B> =
        mapImpl({ init.connect(evalScope = this) }) { value ->
            runInTransactionScope { transform(value) }
        }
    return TFlowInit(constInit(name = null, uncached))
}
179 
/**
 * Returns a [TFlow] that runs [action] on each value of the original [TFlow] before emitting that
 * value unchanged. Handy for logging and debugging.
 *
 * ```
 *   pulse.onEach { foo(it) } == pulse.map { foo(it); it }
 * ```
 *
 * The side effects in [action] only run while the resulting [TFlow] is connected to an output of
 * the FRP network. To reliably perform side effects in response to a [TFlow], use the output
 * combinators available in [FrpBuildScope] instead, such as [FrpBuildScope.toSharedFlow] or
 * [FrpBuildScope.observe].
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<A> {
    return map { value ->
        action(value)
        value
    }
}
198 
/**
 * Returns a [TFlow] that emits only those values of the original [TFlow] for which [predicate]
 * returns `true`.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> {
    val uncached =
        filterNode({ init.connect(evalScope = this) }) { value ->
            runInTransactionScope { predicate(value) }
        }
    val filteredInit: Init<TFlowImpl<A>> = constInit(name = null, uncached.cached())
    return TFlowInit(filteredInit)
}
209 
/**
 * Splits a [TFlow] of pairs into a pair of [TFlows][TFlow]: the first emits each pair's
 * [Pair.first], the second emits each pair's [Pair.second].
 *
 * Shorthand for:
 * ```kotlin
 * val lefts = map { it.first }
 * val rights = map { it.second }
 * return Pair(lefts, rights)
 * ```
 */
@ExperimentalFrpApi
fun <A, B> TFlow<Pair<A, B>>.unzip(): Pair<TFlow<A>, TFlow<B>> =
    Pair(map { pair -> pair.first }, map { pair -> pair.second })
227 
/**
 * Merges this [TFlow] with [other] into a single [TFlow] that emits events from both.
 *
 * A [TFlow] can only emit one value per transaction, so when both inputs emit in the same
 * transaction [transformCoincidence] is called with this flow's value first and [other]'s value
 * second, and its result is what the merged [TFlow] emits. By default this flow's value wins.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.mergeWith(
    other: TFlow<A>,
    transformCoincidence: suspend FrpTransactionScope.(A, A) -> A = { a, _ -> a },
): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            mergeNodes(
                getPulse = { init.connect(evalScope = this) },
                getOther = { other.init.connect(evalScope = this) },
            ) { mine, theirs ->
                runInTransactionScope { transformCoincidence(mine, theirs) }
            },
        )
    )
249 
/**
 * Merges the given [TFlows][TFlow] into a single [TFlow] that emits events from all of them.
 * Coincident emissions are gathered into the emitted [List], in the order the inputs were given.
 *
 * @see mergeWith
 * @see mergeLeft
 */
@ExperimentalFrpApi
fun <A> merge(vararg flows: TFlow<A>): TFlow<List<A>> {
    return flows.asIterable().merge()
}
259 
/**
 * Merges the given [TFlows][TFlow] into a single [TFlow] that emits events from all of them. When
 * several inputs emit coincidentally, the emission of the left-most [TFlow] is the one emitted.
 *
 * @see merge
 */
@ExperimentalFrpApi
fun <A> mergeLeft(vararg flows: TFlow<A>): TFlow<A> {
    return flows.asIterable().mergeLeft()
}
268 
/**
 * Merges the given [TFlows][TFlow] into a single [TFlow] that emits events from all.
 *
 * Because [TFlow]s can only emit one value per transaction, the provided [transformCoincidence]
 * function is used to combine coincident emissions to produce the result value to be emitted by the
 * merged [TFlow].
 *
 * Coincident emissions are first collected into a list in input order and then reduced
 * left-to-right with [transformCoincidence]; a lone emission is passed through untouched.
 *
 * @param flows the flows to merge, in priority/input order.
 * @param transformCoincidence combines the accumulated value (first argument) with the next
 *   coincident emission (second argument).
 * @see mergeWith
 */
// TODO: can be optimized to avoid creating the intermediate list
@ExperimentalFrpApi
fun <A> merge(vararg flows: TFlow<A>, transformCoincidence: (A, A) -> A): TFlow<A> =
    merge(*flows).map { l -> l.reduce(transformCoincidence) }
279 
/**
 * Merges every [TFlow] in this [Iterable] into a single [TFlow] that emits events from all of
 * them. Coincident emissions are gathered into the emitted [List], in the input order.
 *
 * @see mergeWith
 * @see mergeLeft
 */
@ExperimentalFrpApi
fun <A> Iterable<TFlow<A>>.merge(): TFlow<List<A>> {
    val merged: TFlowImpl<List<A>> = mergeNodes {
        map { upstream -> upstream.init.connect(evalScope = this) }
    }
    return TFlowInit(constInit(name = null, merged))
}
290 
/**
 * Merges every [TFlow] in this [Iterable] into a single [TFlow] that emits events from all of
 * them. When several inputs emit coincidentally, the emission of the left-most [TFlow] is the one
 * emitted.
 *
 * @see merge
 */
@ExperimentalFrpApi
fun <A> Iterable<TFlow<A>>.mergeLeft(): TFlow<A> {
    val merged: TFlowImpl<A> = mergeNodesLeft {
        map { upstream -> upstream.init.connect(evalScope = this) }
    }
    return TFlowInit(constInit(name = null, merged))
}
300 
/**
 * Creates a new [TFlow] that emits events from every [TFlow] in this [Sequence]. Simultaneous
 * emissions are gathered into the emitted [List], in the input order.
 *
 * @see mergeWith
 */
@ExperimentalFrpApi
fun <A> Sequence<TFlow<A>>.merge(): TFlow<List<A>> {
    return asIterable().merge()
}
308 
/**
 * Creates a new [TFlow] that emits events from every [TFlow] in this [Map]. Simultaneous
 * emissions are gathered into the emitted [Map], each stored under the key that its source
 * [TFlow] has in the input [Map].
 *
 * @see mergeWith
 */
@ExperimentalFrpApi
fun <K, A> Map<K, TFlow<A>>.merge(): TFlow<Map<K, A>> {
    // Tag every emission with the key of the flow it came from, then merge the tagged flows.
    val taggedFlows: List<TFlow<Pair<K, A>>> =
        asSequence()
            .map { (key, keyedFlow) -> keyedFlow.map { value -> key to value } }
            .toList()
    return taggedFlows.merge().map { coincident -> coincident.toMap() }
}
319 
/**
 * Returns a [GroupedTFlow] that can be used to efficiently split one [TFlow] into many downstream
 * [TFlow]s.
 *
 * Each [Map] emitted by the input [TFlow] specifies, per key, the value to be emitted from that
 * key's downstream [TFlow]. Downstream [TFlow]s are obtained via [GroupedTFlow.eventsForKey].
 *
 * For example:
 * ```
 *   val sFoo: TFlow<Map<String, Foo>> = ...
 *   val fooById: GroupedTFlow<String, Foo> = sFoo.groupByKey()
 *   val fooBar: TFlow<Foo> = fooById["bar"]
 * ```
 *
 * Semantically this matches `val fooBar = sFoo.mapNotNull { map -> map["bar"] }`, but it is
 * significantly more efficient: filtering with [mapNotNull] costs `O(n)`, where `n` is the number
 * of separate [mapNotNull] operations filtering on a key's presence in the emitted [Map], while
 * [groupByKey] looks up the downstream [TFlow] in a [HashMap] and so operates in `O(1)`.
 *
 * The returned [GroupedTFlow] should be cached and re-used to get the performance benefit.
 *
 * @see selector
 */
@ExperimentalFrpApi
fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A> {
    return GroupedTFlow(DemuxImpl({ init.connect(evalScope = this) }, numKeys))
}
348 
/**
 * Groups the values of this [TFlow] by the key that [extractKey] computes for each of them.
 *
 * Shorthand for `map { mapOf(extractKey(it) to it) }.groupByKey()`
 *
 * @see groupByKey
 */
@ExperimentalFrpApi
fun <K, A> TFlow<A>.groupBy(
    numKeys: Int? = null,
    extractKey: suspend FrpTransactionScope.(A) -> K,
): GroupedTFlow<K, A> {
    val keyed = map { value -> mapOf(extractKey(value) to value) }
    return keyed.groupByKey(numKeys)
}
359 
/**
 * Splits this [TFlow] in two: [Pair.first] emits the elements that satisfy [predicate], and
 * [Pair.second] emits the elements that do not.
 *
 * Equivalent to `upstream.filter(predicate) to upstream.filter { !predicate(it) }`, but more
 * efficient: [partition] only invokes [predicate] once per element.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.partition(
    predicate: suspend FrpTransactionScope.(A) -> Boolean
): Pair<TFlow<A>, TFlow<A>> {
    // Group by the predicate's outcome; there are only ever two keys, `true` and `false`.
    val byOutcome: GroupedTFlow<Boolean, A> = groupBy(numKeys = 2, extractKey = predicate)
    return byOutcome.eventsForKey(true) to byOutcome.eventsForKey(false)
}
374 
/**
 * Splits this [TFlow] of [Either] in two: [Pair.first] emits the contents of the [Left] values,
 * and [Pair.second] emits the contents of the [Right] values.
 *
 * Equivalent to using [filterIsInstance] together with [map] twice, once for [Left]s and once for
 * [Right]s, but slightly more efficient: the instance check is only performed once per element.
 */
@ExperimentalFrpApi
fun <A, B> TFlow<Either<A, B>>.partitionEither(): Pair<TFlow<A>, TFlow<B>> {
    val split = partition { either -> either is Left }
    return Pair(
        split.first.mapCheap { either -> (either as Left).value },
        split.second.mapCheap { either -> (either as Right).value },
    )
}
388 
/**
 * A mapping from keys of type [K] to [TFlow]s that emit values of type [A].
 *
 * @see groupByKey
 */
@ExperimentalFrpApi
class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImpl<K, A>) {
    /**
     * Returns a [TFlow] emitting the values of type [A] that correspond to the given [key].
     *
     * @see groupByKey
     */
    @ExperimentalFrpApi
    fun eventsForKey(key: K): TFlow<A> {
        return TFlowInit(constInit(name = null, impl.eventsForKey(key)))
    }

    /**
     * Indexing-operator form of [eventsForKey]: returns a [TFlow] emitting the values of type [A]
     * that correspond to the given [key].
     *
     * @see groupByKey
     */
    @ExperimentalFrpApi
    operator fun get(key: K): TFlow<A> {
        return eventsForKey(key)
    }
}
411 
/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
 * changes.
 *
 * This switch does not take effect until the *next* transaction after the [TState] changes. For a
 * switch that takes effect immediately, see [switchPromptly].
 */
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchDeferredImplSingle(
                // Start out connected to the flow currently held by the state.
                getStorage = {
                    val currentFlow = init.connect(evalScope = this).getCurrentWithEpoch(this).first
                    currentFlow.init.connect(evalScope = this)
                },
                // Every change of the state yields the next flow to connect to.
                getPatches = {
                    mapImpl({ init.connect(evalScope = this).changes }) { nextFlow ->
                        nextFlow.init.connect(evalScope = this)
                    }
                },
            ),
        )
    )
437 
/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
 * changes.
 *
 * The switch takes effect immediately, within the same transaction in which the [TState] changes.
 * In general [switch] should be preferred over this method: it is both safer and more performant.
 */
// TODO: parameter to handle coincidental emission from both old and new
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
    // The prompt switch works on keyed maps; a single `Unit` key is used for the one flow.
    val promptSwitch =
        switchPromptImpl(
            getStorage = {
                val currentFlow = init.connect(evalScope = this).getCurrentWithEpoch(this).first
                mapOf(Unit to currentFlow.init.connect(evalScope = this))
            },
            getPatches = {
                val patches = init.connect(evalScope = this).changes
                mapImpl({ patches }) { nextFlow ->
                    mapOf(Unit to just(nextFlow.init.connect(evalScope = this)))
                }
            },
        )
    // Unwrap the single-entry map emitted by the switch node.
    val unwrapped: TFlowImpl<A> = mapImpl({ promptSwitch }) { byKey -> byKey.getValue(Unit) }
    return TFlowInit(constInit(name = null, unwrapped))
}
460 
/**
 * A mutable [TFlow] to which values can be [emitted][emit]; backpressure is handled by coalescing
 * all pending emissions into a single batch.
 *
 * @see FrpNetwork.coalescingMutableTFlow
 */
@ExperimentalFrpApi
class CoalescingMutableTFlow<In, Out>
internal constructor(
    internal val coalesce: (old: Out, new: In) -> Out,
    internal val network: Network,
    private val getInitialValue: () -> Out,
    internal val impl: InputNode<Out> = InputNode(),
) : TFlow<Out>() {
    internal val name: String? = null

    // Pair of (has a transaction already been scheduled?, the batch accumulated so far).
    internal val storage = AtomicReference(false to getInitialValue())

    override fun toString(): String = "${this::class.simpleName}@$hashString"

    /**
     * Adds [value] to the current batch and, unless an emission is already pending, enqueues the
     * batch for emission from this [TFlow].
     *
     * Backpressure occurs when [emit] is called while the FRP network is in a transaction;
     * repeated calls are then coalesced into one batch that is processed when the network is
     * ready.
     */
    @ExperimentalFrpApi
    fun emit(value: In) {
        // NOTE: AtomicReference.getAndUpdate may re-apply its function under contention, so
        // `coalesce` can be invoked more than once for the same `value`.
        val alreadyScheduled =
            storage.getAndUpdate { previous -> true to coalesce(previous.second, value) }.first
        if (alreadyScheduled) return
        @Suppress("DeferredResultUnused")
        network.transaction {
            // Take the accumulated batch and reset the storage for the next round of emissions.
            val batch = storage.getAndSet(false to getInitialValue()).second
            impl.visit(this, batch)
        }
    }
}
499 
/**
 * A mutable [TFlow] to which values can be [emitted][emit]; backpressure is handled by suspending
 * the emitter.
 *
 * @see FrpNetwork.coalescingMutableTFlow
 * @see CoalescingMutableTFlow
 */
@ExperimentalFrpApi
class MutableTFlow<T>
internal constructor(internal val network: Network, internal val impl: InputNode<T> = InputNode()) :
    TFlow<T>() {
    internal val name: String? = null

    // The most recently enqueued emission job, or null if nothing has been emitted yet.
    private val storage = AtomicReference<Job?>(null)

    override fun toString(): String = "${this::class.simpleName}@$hashString"

    /**
     * Emits [value] to this [TFlow], suspending the caller until the FRP transaction that contains
     * the emission has completed.
     *
     * Before starting its own transaction, an emission first waits for the previously enqueued
     * emission job (if any) to finish.
     */
    @ExperimentalFrpApi
    suspend fun emit(value: T) {
        coroutineScope {
            // Assigned below, before the lazily-started job gets a chance to run.
            var predecessor: Job? = null
            val thisEmit =
                async(start = CoroutineStart.LAZY) {
                    predecessor?.join()
                    val transaction = network.transaction { impl.visit(this, value) }
                    transaction.await()
                }
            // Publish this job as the newest emission and pick up the one it replaces.
            predecessor = storage.getAndSet(thisEmit)
            // Awaiting starts the lazy job.
            thisEmit.await()
        }
    }

    //    internal suspend fun emitInCurrentTransaction(value: T, evalScope: EvalScope) {
    //        if (storage.getAndSet(just(value)) is None) {
    //            impl.visit(evalScope)
    //        }
    //    }
}
540 
/** Singleton backing [TFlow.empty]: the [TFlow] with no values. */
private data object EmptyFlow : TFlow<Nothing>()
542 
/** A [TFlow] defined directly by the [Init] that yields its underlying [TFlowImpl]. */
internal class TFlowInit<out A>(val init: Init<TFlowImpl<A>>) : TFlow<A>() {
    override fun toString(): String = "${this::class.simpleName}@$hashString"
}
546 
/**
 * The [Init] that yields the [TFlowImpl] behind this [TFlow], whichever concrete variant it is.
 *
 * The `when` is exhaustive over the sealed [TFlow] hierarchy, so there is deliberately no `else`
 * branch. Variants that carry their own `init` return it as-is; the others build a new constant
 * [Init] on every access.
 */
internal val <A> TFlow<A>.init: Init<TFlowImpl<A>>
    get() =
        when (this) {
            is TFlowInit -> init
            is TFlowLoop -> init
            is MutableTFlow -> constInit(name, impl.activated())
            is CoalescingMutableTFlow<*, A> -> constInit(name, impl.activated())
            is EmptyFlow -> constInit("EmptyFlow", neverImpl)
        }
556 
/**
 * Wraps [block] in a [TFlow] whose initializer runs [block] and then connects to the [TFlow] it
 * returns. [block] is not invoked by this call itself.
 */
private inline fun <A> deferInline(crossinline block: suspend InitScope.() -> TFlow<A>): TFlow<A> {
    return TFlowInit(init(name = null) { block().init.connect(evalScope = this) })
}
559