1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.systemui.kairos
18
19 import com.android.systemui.kairos.internal.DemuxImpl
20 import com.android.systemui.kairos.internal.Init
21 import com.android.systemui.kairos.internal.InitScope
22 import com.android.systemui.kairos.internal.InputNode
23 import com.android.systemui.kairos.internal.Network
24 import com.android.systemui.kairos.internal.NoScope
25 import com.android.systemui.kairos.internal.TFlowImpl
26 import com.android.systemui.kairos.internal.activated
27 import com.android.systemui.kairos.internal.cached
28 import com.android.systemui.kairos.internal.constInit
29 import com.android.systemui.kairos.internal.filterNode
30 import com.android.systemui.kairos.internal.init
31 import com.android.systemui.kairos.internal.map
32 import com.android.systemui.kairos.internal.mapImpl
33 import com.android.systemui.kairos.internal.mapMaybeNode
34 import com.android.systemui.kairos.internal.mergeNodes
35 import com.android.systemui.kairos.internal.mergeNodesLeft
36 import com.android.systemui.kairos.internal.neverImpl
37 import com.android.systemui.kairos.internal.switchDeferredImplSingle
38 import com.android.systemui.kairos.internal.switchPromptImpl
39 import com.android.systemui.kairos.internal.util.hashString
40 import com.android.systemui.kairos.util.Either
41 import com.android.systemui.kairos.util.Left
42 import com.android.systemui.kairos.util.Maybe
43 import com.android.systemui.kairos.util.Right
44 import com.android.systemui.kairos.util.just
45 import com.android.systemui.kairos.util.map
46 import com.android.systemui.kairos.util.toMaybe
47 import java.util.concurrent.atomic.AtomicReference
48 import kotlin.reflect.KProperty
49 import kotlinx.coroutines.CompletableDeferred
50 import kotlinx.coroutines.CoroutineStart
51 import kotlinx.coroutines.Job
52 import kotlinx.coroutines.async
53 import kotlinx.coroutines.coroutineScope
54
/**
 * A stream of values of type [A], each of which occurs at a discrete point in time.
 *
 * Use [TFlow.empty] (or [emptyTFlow]) for a flow that never emits.
 */
@ExperimentalFrpApi
sealed class TFlow<out A> {
    companion object {
        /** The [TFlow] that never emits a value. */
        val empty: TFlow<Nothing>
            get() = EmptyFlow
    }
}
63
/** Top-level shorthand for [TFlow.empty]: a [TFlow] that never emits a value. */
@ExperimentalFrpApi
val emptyTFlow: TFlow<Nothing> = TFlow.empty
66
/**
 * A placeholder [TFlow] whose actual definition is supplied later through [loopback]. Useful for
 * recursive definitions, where a flow must be referenced before it can be constructed.
 *
 * The placeholder can be passed around like any other [TFlow]; its [init] block waits for
 * [loopback] to be assigned before connecting, so evaluation that depends on this flow is held up
 * until then.
 *
 * The [getValue] / [setValue] operators let an instance serve as a property delegate: reading the
 * property yields this placeholder, and writing it assigns [loopback].
 */
@ExperimentalFrpApi
class TFlowLoop<A> : TFlow<A>() {
    /** Completed with the target flow the first time [loopback] receives a non-null value. */
    private val deferred = CompletableDeferred<TFlow<A>>()

    /** Awaits the target flow, then connects to the target's own implementation. */
    internal val init: Init<TFlowImpl<A>> =
        init(name = null) {
            val target = deferred.await()
            target.init.connect(evalScope = this)
        }

    /**
     * The [TFlow] this reference stands for, or `null` while it has not been assigned.
     *
     * Assigning `null` is ignored. Assigning a non-null value a second time fails with an
     * [IllegalStateException] and leaves the first value in place.
     */
    @ExperimentalFrpApi
    var loopback: TFlow<A>? = null
        set(value) {
            if (value != null) {
                check(deferred.complete(value)) { "TFlowLoop.loopback has already been set." }
                field = value
            }
        }

    /** Property-delegate read: yields this placeholder itself. */
    operator fun getValue(thisRef: Any?, property: KProperty<*>): TFlow<A> = this

    /** Property-delegate write: assigns [value] to [loopback]. */
    operator fun setValue(thisRef: Any?, property: KProperty<*>, value: TFlow<A>) {
        loopback = value
    }

    override fun toString(): String {
        val typeName = this::class.simpleName
        return "$typeName@$hashString"
    }
}
98
/**
 * Returns a [TFlow] that stands in for the [TFlow] held by this [Lazy].
 *
 * The lazy [value][Lazy.value] is not read by this call; it is read from inside the init block of
 * the returned flow.
 */
@ExperimentalFrpApi
fun <A> Lazy<TFlow<A>>.defer(): TFlow<A> {
    return deferInline { this@defer.value }
}
101
/**
 * Returns a [TFlow] that stands in for the [TFlow] carried by this [FrpDeferredValue].
 *
 * The deferred value is awaited from inside the init block of the returned flow, not by this
 * call.
 */
@ExperimentalFrpApi
fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> {
    return deferInline { this@defer.unwrapped.await() }
}
105
/**
 * Returns a [TFlow] that stands in for the [TFlow] produced by [block].
 *
 * [block] is not invoked by this call; it is run in an [FrpScope] (via [NoScope]) from inside the
 * init block of the returned flow.
 */
@ExperimentalFrpApi
fun <A> deferTFlow(block: suspend FrpScope.() -> TFlow<A>): TFlow<A> {
    return deferInline { NoScope.runInFrpScope(block) }
}
111
/** A [TFlow] that emits the new value of this [TState] each time the state changes. */
@ExperimentalFrpApi
val <A> TState<A>.stateChanges: TFlow<A>
    get() {
        // Connect to the state's implementation and expose its stream of changes.
        val changesInit: Init<TFlowImpl<A>> =
            init(name = null) { init.connect(evalScope = this).changes }
        return TFlowInit(changesInit)
    }
116
/**
 * Applies [transform] to every value of this [TFlow] and emits only the results that are [just];
 * all other results are dropped.
 *
 * @see mapNotNull
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> =
    TFlowInit(
        constInit(
            name = null,
            mapMaybeNode({ init.connect(evalScope = this) }) { a ->
                runInTransactionScope { transform(a) }
            },
        )
    )
129
/**
 * Applies [transform] to every value of this [TFlow] and emits only the non-null results; `null`
 * results are dropped.
 *
 * @see mapMaybe
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?): TFlow<B> {
    return mapMaybe { a -> transform(a).toMaybe() }
}
141
/** Returns a [TFlow] that emits the values of this [TFlow], skipping every `null`. */
@ExperimentalFrpApi
fun <A> TFlow<A?>.filterNotNull(): TFlow<A> {
    return mapNotNull { value -> value }
}
144
/**
 * Returns a [TFlow] that emits only the values of this [TFlow] that are instances of [A].
 *
 * Shorthand for `mapNotNull { it as? A }`.
 */
@ExperimentalFrpApi
inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> {
    return mapNotNull { candidate -> candidate as? A }
}
148
/**
 * Returns a [TFlow] that emits the contents of every [just] value of this [TFlow] and drops the
 * rest.
 *
 * Shorthand for `mapMaybe { it }`.
 */
@ExperimentalFrpApi
fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> {
    return mapMaybe { maybe -> maybe }
}
151
/**
 * Returns a [TFlow] that emits the result of applying [transform] to each value of this [TFlow].
 *
 * The mapped implementation is wrapped with `cached()`; use [mapCheap] to skip that step.
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> {
    val uncached: TFlowImpl<B> =
        mapImpl({ init.connect(evalScope = this) }) { value ->
            runInTransactionScope { transform(value) }
        }
    val cachedImpl = uncached.cached()
    return TFlowInit(constInit(name = null, cachedImpl))
}
162
/**
 * Variant of [map] whose emission is not cached during the transaction. Only use it when
 * [transform] is fast and pure.
 *
 * @see map
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> {
    val uncached: TFlowImpl<B> =
        mapImpl({ init.connect(evalScope = this) }) { value ->
            runInTransactionScope { transform(value) }
        }
    return TFlowInit(constInit(name = null, uncached))
}
179
/**
 * Returns a [TFlow] that runs [action] on each value of this [TFlow] and then emits that value
 * unchanged. Handy for logging and debugging.
 *
 * ```
 * pulse.onEach { foo(it) } == pulse.map { foo(it); it }
 * ```
 *
 * The side effects in [action] are only performed while the resulting [TFlow] is connected to an
 * output of the FRP network. To perform side effects reliably in response to a [TFlow], use the
 * output combinators available in [FrpBuildScope], such as [FrpBuildScope.toSharedFlow] or
 * [FrpBuildScope.observe].
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<A> =
    map { value ->
        action(value)
        value
    }
198
/**
 * Returns a [TFlow] that emits only the values of this [TFlow] for which [predicate] returns
 * `true`.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            filterNode({ init.connect(evalScope = this) }) { a ->
                    runInTransactionScope { predicate(a) }
                }
                .cached(),
        )
    )
209
/**
 * Splits a [TFlow] of pairs into a pair of [TFlows][TFlow]: the first emits each [Pair.first], the
 * second emits each [Pair.second].
 *
 * Shorthand for:
 * ```kotlin
 * val lefts = map { it.first }
 * val rights = map { it.second }
 * return Pair(lefts, rights)
 * ```
 */
@ExperimentalFrpApi
fun <A, B> TFlow<Pair<A, B>>.unzip(): Pair<TFlow<A>, TFlow<B>> =
    Pair(map { pair -> pair.first }, map { pair -> pair.second })
227
/**
 * Merges this [TFlow] with [other] into a single [TFlow] that emits events from both.
 *
 * A [TFlow] can emit only one value per transaction, so when both inputs emit in the same
 * transaction, [transformCoincidence] combines the two values into the one that is emitted. By
 * default the value from this (the receiver) flow wins.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.mergeWith(
    other: TFlow<A>,
    transformCoincidence: suspend FrpTransactionScope.(A, A) -> A = { a, _ -> a },
): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            mergeNodes(
                getPulse = { init.connect(evalScope = this) },
                getOther = { other.init.connect(evalScope = this) },
            ) { mine, theirs ->
                runInTransactionScope { transformCoincidence(mine, theirs) }
            },
        )
    )
249
/**
 * Merges [flows] into a single [TFlow] that emits events from all of them. Coincident emissions
 * are gathered into the emitted [List], in the order the inputs were given.
 *
 * @see mergeWith
 * @see mergeLeft
 */
@ExperimentalFrpApi
fun <A> merge(vararg flows: TFlow<A>): TFlow<List<A>> {
    return flows.asIterable().merge()
}
259
/**
 * Merges [flows] into a single [TFlow] that emits events from all of them. When several inputs
 * emit coincidentally, the emission of the left-most [TFlow] is the one emitted.
 *
 * @see merge
 */
@ExperimentalFrpApi
fun <A> mergeLeft(vararg flows: TFlow<A>): TFlow<A> {
    return flows.asIterable().mergeLeft()
}
268
/**
 * Merges [flows] into a single [TFlow] that emits events from all of them.
 *
 * A [TFlow] can emit only one value per transaction, so coincident emissions are folded
 * left-to-right (in input order) with [transformCoincidence] to produce the single emitted value.
 *
 * Implemented on top of the list-producing [merge], reducing each emitted list. Note that
 * [List.reduce] throws on an empty list; this relies on the merged flow never emitting one.
 *
 * @see mergeWith
 */
// TODO: can be optimized to avoid creating the intermediate list
@ExperimentalFrpApi
fun <A> merge(vararg flows: TFlow<A>, transformCoincidence: (A, A) -> A): TFlow<A> =
    merge(*flows).map { coincident -> coincident.reduce(transformCoincidence) }
279
/**
 * Merges these [TFlows][TFlow] into a single [TFlow] that emits events from all of them.
 * Coincident emissions are gathered into the emitted [List], in the order of the inputs.
 *
 * @see mergeWith
 * @see mergeLeft
 */
@ExperimentalFrpApi
fun <A> Iterable<TFlow<A>>.merge(): TFlow<List<A>> {
    return TFlowInit(
        constInit(
            name = null,
            mergeNodes { map { flow -> flow.init.connect(evalScope = this) } },
        )
    )
}
290
/**
 * Merges these [TFlows][TFlow] into a single [TFlow] that emits events from all of them. When
 * several inputs emit coincidentally, the emission of the left-most [TFlow] is the one emitted.
 *
 * @see merge
 */
@ExperimentalFrpApi
fun <A> Iterable<TFlow<A>>.mergeLeft(): TFlow<A> {
    return TFlowInit(
        constInit(
            name = null,
            mergeNodesLeft { map { flow -> flow.init.connect(evalScope = this) } },
        )
    )
}
300
/**
 * Creates a [TFlow] that emits events from every [TFlow] in this [Sequence]. Simultaneous
 * emissions are gathered into the emitted [List], in the order of the inputs.
 *
 * @see mergeWith
 */
@ExperimentalFrpApi
fun <A> Sequence<TFlow<A>>.merge(): TFlow<List<A>> {
    val flows: Iterable<TFlow<A>> = asIterable()
    return flows.merge()
}
308
/**
 * Creates a [TFlow] that emits events from every [TFlow] in this [Map]. Simultaneous emissions
 * are gathered into the emitted [Map], each stored under the key of the [TFlow] that produced it.
 *
 * @see mergeWith
 */
@ExperimentalFrpApi
fun <K, A> Map<K, TFlow<A>>.merge(): TFlow<Map<K, A>> {
    // Tag every emission with the key of the flow it came from.
    val keyedFlows: List<TFlow<Pair<K, A>>> =
        asSequence().map { (k, flowA) -> flowA.map { a -> k to a } }.toList()
    return keyedFlows.merge().map { pairs -> pairs.toMap() }
}
319
/**
 * Returns a [GroupedTFlow] for efficiently splitting this single [TFlow] into multiple downstream
 * [TFlow]s.
 *
 * Each [Map] emitted by this [TFlow] says which downstream [TFlow]s emit, and what value each one
 * emits. The downstream [TFlow]s are obtained via [GroupedTFlow.eventsForKey].
 *
 * An example:
 * ```
 * val sFoo: TFlow<Map<String, Foo>> = ...
 * val fooById: GroupedTFlow<String, Foo> = sFoo.groupByKey()
 * val fooBar: TFlow<Foo> = fooById["bar"]
 * ```
 *
 * This is semantically equivalent to `val fooBar = sFoo.mapNotNull { map -> map["bar"] }` but
 * significantly more efficient: filtering that way costs `O(n)`, where `n` is the number of
 * [mapNotNull] operations each testing for one specific key in the emitted [Map]. [groupByKey]
 * instead uses a [HashMap] internally to find the right downstream [TFlow], and so operates in
 * `O(1)`.
 *
 * The resulting [GroupedTFlow] should be cached and re-used to obtain the performance benefit.
 *
 * @param numKeys optional value handed to the underlying implementation; presumably a sizing hint
 *   for the expected number of distinct keys (confirm against `DemuxImpl`).
 * @see selector
 */
@ExperimentalFrpApi
fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A> {
    val demux: DemuxImpl<K, A> = DemuxImpl({ init.connect(this) }, numKeys)
    return GroupedTFlow(demux)
}
348
/**
 * Groups the values of this [TFlow] by the key that [extractKey] computes for each of them.
 *
 * Shorthand for `map { mapOf(extractKey(it) to it) }.groupByKey()`
 *
 * @see groupByKey
 */
@ExperimentalFrpApi
fun <K, A> TFlow<A>.groupBy(
    numKeys: Int? = null,
    extractKey: suspend FrpTransactionScope.(A) -> K,
): GroupedTFlow<K, A> {
    // Wrap each value in a single-entry map so that groupByKey can route it.
    val singletonMaps: TFlow<Map<K, A>> = map { value -> mapOf(extractKey(value) to value) }
    return singletonMaps.groupByKey(numKeys)
}
359
/**
 * Splits this [TFlow] in two: [Pair.first] emits the values that satisfy [predicate], and
 * [Pair.second] emits the values that do not.
 *
 * Equivalent to `upstream.filter(predicate) to upstream.filter { !predicate(it) }`, but more
 * efficient: [partition] invokes [predicate] only once per element.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.partition(
    predicate: suspend FrpTransactionScope.(A) -> Boolean
): Pair<TFlow<A>, TFlow<A>> {
    // The predicate result is the grouping key, so there are exactly two possible keys.
    val byResult: GroupedTFlow<Boolean, A> = groupBy(numKeys = 2, extractKey = predicate)
    val matching = byResult.eventsForKey(true)
    val rest = byResult.eventsForKey(false)
    return matching to rest
}
374
/**
 * Splits this [TFlow] of [Either] in two: [Pair.first] emits the contents of the [Left] values,
 * and [Pair.second] emits the contents of the [Right] values.
 *
 * Equivalent to using [filterIsInstance] together with [map] twice, once for [Left]s and once for
 * [Right]s, but slightly more efficient: the instance check is performed only once per element.
 */
@ExperimentalFrpApi
fun <A, B> TFlow<Either<A, B>>.partitionEither(): Pair<TFlow<A>, TFlow<B>> {
    val split = partition { either -> either is Left }
    // The casts below rely on the partition above having already sorted the values.
    val lefts: TFlow<A> = split.first.mapCheap { either -> (either as Left).value }
    val rights: TFlow<B> = split.second.mapCheap { either -> (either as Right).value }
    return lefts to rights
}
388
/**
 * Maps keys of type [K] to [TFlow]s that emit values of type [A].
 *
 * @see groupByKey
 */
@ExperimentalFrpApi
class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImpl<K, A>) {
    /**
     * Returns the [TFlow] that emits the values of type [A] associated with [key].
     *
     * @see groupByKey
     */
    @ExperimentalFrpApi
    fun eventsForKey(key: K): TFlow<A> {
        val keyedImpl = impl.eventsForKey(key)
        return TFlowInit(constInit(name = null, keyedImpl))
    }

    /**
     * Operator form of [eventsForKey], enabling `grouped[key]`.
     *
     * @see groupByKey
     */
    @ExperimentalFrpApi operator fun get(key: K): TFlow<A> = eventsForKey(key)
}
411
/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
 * changes.
 *
 * This switch does not take effect until the *next* transaction after [TState] changes. For a
 * switch that takes effect immediately, see [switchPromptly].
 */
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchDeferredImplSingle(
                // The flow currently held by the state is the initial source.
                getStorage = {
                    init.connect(this).getCurrentWithEpoch(this).first.init.connect(this)
                },
                // Each later change of the state supplies the flow to switch over to.
                getPatches = {
                    mapImpl({ init.connect(this).changes }) { newFlow ->
                        newFlow.init.connect(this)
                    }
                },
            ),
        )
    )
437
/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
 * changes.
 *
 * The switch takes effect immediately, within the same transaction in which the [TState]
 * changes. Prefer [switch] in general: it is both safer and more performant.
 */
// TODO: parameter to handle coincidental emission from both old and new
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
    // The prompt switch works on keyed maps; a single Unit key holds the one flow.
    val keyedSwitch =
        switchPromptImpl(
            getStorage = {
                mapOf(Unit to init.connect(this).getCurrentWithEpoch(this).first.init.connect(this))
            },
            getPatches = {
                val stateChanges = init.connect(this).changes
                mapImpl({ stateChanges }) { newFlow ->
                    mapOf(Unit to just(newFlow.init.connect(this)))
                }
            },
        )
    // Unwrap the single-entry map again to recover the plain value.
    val unwrapped: TFlowImpl<A> =
        mapImpl({ keyedSwitch }) { emissions -> emissions.getValue(Unit) }
    return TFlowInit(constInit(name = null, unwrapped))
}
460
/**
 * A mutable [TFlow] that values can be pushed into with [emit]. Backpressure is handled by
 * coalescing all pending emissions into a single batch.
 *
 * @see FrpNetwork.coalescingMutableTFlow
 */
@ExperimentalFrpApi
class CoalescingMutableTFlow<In, Out>
internal constructor(
    internal val coalesce: (old: Out, new: In) -> Out,
    internal val network: Network,
    private val getInitialValue: () -> Out,
    internal val impl: InputNode<Out> = InputNode(),
) : TFlow<Out>() {
    internal val name: String? = null

    /**
     * Pair of (is a transaction already scheduled for the current batch, the current batch).
     * Starts out unscheduled with a fresh initial batch.
     */
    internal val storage = AtomicReference(false to getInitialValue())

    override fun toString(): String {
        val typeName = this::class.simpleName
        return "$typeName@$hashString"
    }

    /**
     * Inserts [value] into the current batch, enqueueing the batch for emission from this [TFlow]
     * if it is not already pending.
     *
     * Backpressure occurs when [emit] is called while the FRP network is currently in a
     * transaction; if called multiple times, the emissions are coalesced into a single batch that
     * is processed once the network is ready.
     *
     * The batch is updated with [AtomicReference.getAndUpdate], which may re-apply its update
     * function under contention, so [coalesce] can be invoked more than once for one [emit].
     */
    @ExperimentalFrpApi
    fun emit(value: In) {
        val previous = storage.getAndUpdate { (_, batch) -> true to coalesce(batch, value) }
        val alreadyScheduled = previous.first
        if (alreadyScheduled) return
        // This call moved the batch from idle to scheduled, so it owns starting the transaction.
        @Suppress("DeferredResultUnused")
        network.transaction {
            // Take the accumulated batch and reset storage to an unscheduled, fresh batch.
            val (_, batch) = storage.getAndSet(false to getInitialValue())
            impl.visit(this, batch)
        }
    }
}
499
/**
 * A mutable [TFlow] that values can be pushed into with [emit]. Backpressure is handled by
 * suspending the emitter.
 *
 * @see FrpNetwork.coalescingMutableTFlow
 */
@ExperimentalFrpApi
class MutableTFlow<T>
internal constructor(internal val network: Network, internal val impl: InputNode<T> = InputNode()) :
    TFlow<T>() {
    internal val name: String? = null

    /** The most recently enqueued emission job, or `null` before the first call to [emit]. */
    private val storage = AtomicReference<Job?>(null)

    override fun toString(): String {
        val typeName = this::class.simpleName
        return "$typeName@$hashString"
    }

    /**
     * Emits [value] to this [TFlow], suspending the caller until the FRP transaction containing
     * the emission has completed.
     *
     * Each emission first joins the emission that was enqueued before it, then runs its own
     * transaction.
     */
    @ExperimentalFrpApi
    suspend fun emit(value: T) {
        coroutineScope {
            // Assigned below, once this emission has been swapped into [storage].
            // NOTE(review): a later emitter joining this lazy job could start it before the
            // assignment below is visible, skipping the join — confirm whether that matters.
            var previousEmit: Job? = null
            val thisEmit =
                async(start = CoroutineStart.LAZY) {
                    previousEmit?.join()
                    network.transaction { impl.visit(this, value) }.await()
                }
            // Publish this emission as the newest one and remember the one it replaced.
            previousEmit = storage.getAndSet(thisEmit)
            thisEmit.await()
        }
    }

    //    internal suspend fun emitInCurrentTransaction(value: T, evalScope: EvalScope) {
    //        if (storage.getAndSet(just(value)) is None) {
    //            impl.visit(evalScope)
    //        }
    //    }
}
540
/** Singleton backing [TFlow.empty]: a [TFlow] that never emits. */
private data object EmptyFlow : TFlow<Nothing>()
542
/** A [TFlow] defined directly by the [init] that produces its implementation. */
internal class TFlowInit<out A>(val init: Init<TFlowImpl<A>>) : TFlow<A>() {
    override fun toString(): String {
        val typeName = this::class.simpleName
        return "$typeName@$hashString"
    }
}
546
/**
 * The [Init] that yields the implementation behind this [TFlow], chosen by concrete subtype.
 *
 * The `when` is exhaustive over the sealed [TFlow] hierarchy and deliberately has no `else`, so
 * adding a new subtype fails compilation here until it is handled.
 */
internal val <A> TFlow<A>.init: Init<TFlowImpl<A>>
    get() {
        return when (this) {
            // The empty flow is backed by the implementation that never emits.
            is EmptyFlow -> constInit("EmptyFlow", neverImpl)
            // These two subtypes carry their own init.
            is TFlowInit -> init
            is TFlowLoop -> init
            // Mutable flows are backed by their activated input node.
            is CoalescingMutableTFlow<*, A> -> constInit(name, impl.activated())
            is MutableTFlow -> constInit(name, impl.activated())
        }
    }
556
/**
 * Builds a [TFlow] whose target is produced by [block]. [block] is invoked from inside the init
 * block of the returned flow, which then connects to the target's implementation.
 */
private inline fun <A> deferInline(crossinline block: suspend InitScope.() -> TFlow<A>): TFlow<A> {
    val deferredInit: Init<TFlowImpl<A>> =
        init(name = null) {
            val target = block()
            target.init.connect(evalScope = this)
        }
    return TFlowInit(deferredInit)
}
559