xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos
18 
19 import com.android.systemui.kairos.combine as combinePure
20 import com.android.systemui.kairos.map as mapPure
21 import com.android.systemui.kairos.util.Just
22 import com.android.systemui.kairos.util.Left
23 import com.android.systemui.kairos.util.Maybe
24 import com.android.systemui.kairos.util.Right
25 import com.android.systemui.kairos.util.WithPrev
26 import com.android.systemui.kairos.util.just
27 import com.android.systemui.kairos.util.map
28 import com.android.systemui.kairos.util.none
29 import com.android.systemui.kairos.util.partitionEithers
30 import com.android.systemui.kairos.util.zipWith
31 import kotlin.coroutines.RestrictsSuspension
32 
/**
 * A suspending computation that runs with an [FrpStateScope] receiver and produces an [R].
 *
 * Instances can be created with [statefully] and run with [FrpStateScope.applyStateful].
 */
typealias FrpStateful<R> = suspend FrpStateScope.() -> R
34 
/**
 * Wraps [block] as a [FrpStateful].
 *
 * The returned value is [block] itself; it is only run once it is
 * [applied][FrpStateScope.applyStateful], at which point it receives the applier's
 * [FrpStateScope].
 */
// TODO: caching story? should each Scope have a cache of applied FrpStateful instances?
@ExperimentalFrpApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> statefully(noinline block: suspend FrpStateScope.() -> A): FrpStateful<A> {
    return block
}
43 
44 /**
45  * Operations that accumulate state within the FRP network.
46  *
47  * State accumulation is an ongoing process that has a lifetime. Use `-Latest` combinators, such as
48  * [mapLatestStateful], to create smaller, nested lifecycles so that accumulation isn't running
49  * longer than needed.
50  */
51 @ExperimentalFrpApi
52 @RestrictsSuspension
53 interface FrpStateScope : FrpTransactionScope {
54 
    /**
     * Returns a [FrpDeferredValue] for the result of invoking [block] with a [FrpStateScope]
     * receiver.
     *
     * NOTE(review): the point at which [block] actually runs is decided by the implementation and
     * is not visible from this declaration -- presumably it is deferred until the value is first
     * needed; confirm against the implementing class.
     */
    @ExperimentalFrpApi
    // TODO: wish this could just be `deferred` but alas
    fun <A> deferredStateScope(block: suspend FrpStateScope.() -> A): FrpDeferredValue<A>
59 
    /**
     * Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or
     * [initialValue] if nothing has been emitted since it was constructed.
     *
     * Unlike [hold], the initial value is supplied as a [FrpDeferredValue] rather than as a plain
     * value.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     */
    @ExperimentalFrpApi fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A>
69 
    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * The initial collection is supplied as a [FrpDeferredValue].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     *   fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
     *     initialTFlows: Map<K, TFlow<V>>,
     *   ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switch()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>
91 
    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * The initial collection is supplied as a [FrpDeferredValue].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     *   fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
     *     initialTFlows: Map<K, TFlow<V>>,
     *   ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switchPromptly()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>
113 
114     // TODO: everything below this comment can be made into extensions once we have context params
115 
116     /**
117      * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
118      * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
119      *
120      * Conceptually this is equivalent to:
121      * ```kotlin
122      *   fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
123      *     initialTFlows: Map<K, TFlow<V>>,
124      *   ): TFlow<Map<K, V>> =
125      *     foldMapIncrementally(initialTFlows).map { it.merge() }.switch()
126      * ```
127      *
128      * While the behavior is equivalent to the conceptual definition above, the implementation is
129      * significantly more efficient.
130      *
131      * @see merge
132      */
133     @ExperimentalFrpApi
134     fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
135         initialTFlows: Map<K, TFlow<V>> = emptyMap()
136     ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows))
137 
138     /**
139      * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
140      * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
141      *
142      * Conceptually this is equivalent to:
143      * ```kotlin
144      *   fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPrompt(
145      *     initialTFlows: Map<K, TFlow<V>>,
146      *   ): TFlow<Map<K, V>> =
147      *     foldMapIncrementally(initialTFlows).map { it.merge() }.switchPromptly()
148      * ```
149      *
150      * While the behavior is equivalent to the conceptual definition above, the implementation is
151      * significantly more efficient.
152      *
153      * @see merge
154      */
155     @ExperimentalFrpApi
156     fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
157         initialTFlows: Map<K, TFlow<V>> = emptyMap()
158     ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows))
159 
    /**
     * Applies the [FrpStateful] within this [FrpStateScope]: the receiver is invoked with this
     * scope, and its result is returned.
     */
    @ExperimentalFrpApi suspend fun <A> FrpStateful<A>.applyStateful(): A = this()
162 
163     /**
164      * Applies the [FrpStateful] within this [FrpStateScope], returning the result as an
165      * [FrpDeferredValue].
166      */
167     @ExperimentalFrpApi
168     fun <A> FrpStateful<A>.applyStatefulDeferred(): FrpDeferredValue<A> = deferredStateScope {
169         applyStateful()
170     }
171 
172     /**
173      * Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or
174      * [initialValue] if nothing has been emitted since it was constructed.
175      *
176      * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
177      * have been processed; this keeps the value of the [TState] consistent during the entire FRP
178      * transaction.
179      */
180     @ExperimentalFrpApi
181     fun <A> TFlow<A>.hold(initialValue: A): TState<A> = holdDeferred(deferredOf(initialValue))
182 
    /**
     * Returns a [TFlow] that emits the result of applying [FrpStatefuls][FrpStateful] emitted from
     * the original [TFlow].
     *
     * Unlike [applyLatestStateful], state accumulation is not stopped with each subsequent emission
     * of the original [TFlow].
     */
    @ExperimentalFrpApi fun <A> TFlow<FrpStateful<A>>.applyStatefuls(): TFlow<A>
191 
192     /**
193      * Returns a [TFlow] containing the results of applying [transform] to each value of the
194      * original [TFlow].
195      *
196      * [transform] can perform state accumulation via its [FrpStateScope] receiver. Unlike
197      * [mapLatestStateful], accumulation is not stopped with each subsequent emission of the
198      * original [TFlow].
199      */
200     @ExperimentalFrpApi
201     fun <A, B> TFlow<A>.mapStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
202         mapPure { statefully { transform(it) } }.applyStatefuls()
203 
204     /**
205      * Returns a [TState] the holds the result of applying the [FrpStateful] held by the original
206      * [TState].
207      *
208      * Unlike [applyLatestStateful], state accumulation is not stopped with each state change.
209      */
210     @ExperimentalFrpApi
211     fun <A> TState<FrpStateful<A>>.applyStatefuls(): TState<A> =
212         stateChanges
213             .applyStatefuls()
214             .holdDeferred(initialValue = deferredStateScope { sampleDeferred().get()() })
215 
216     /** Returns a [TFlow] that switches to the [TFlow] emitted by the original [TFlow]. */
217     @ExperimentalFrpApi fun <A> TFlow<TFlow<A>>.flatten() = hold(emptyTFlow).switch()
218 
219     /**
220      * Returns a [TFlow] containing the results of applying [transform] to each value of the
221      * original [TFlow].
222      *
223      * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
224      * invocation of [transform], state accumulation from previous invocation is stopped.
225      */
226     @ExperimentalFrpApi
227     fun <A, B> TFlow<A>.mapLatestStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
228         mapPure { statefully { transform(it) } }.applyLatestStateful()
229 
230     /**
231      * Returns a [TFlow] that switches to a new [TFlow] produced by [transform] every time the
232      * original [TFlow] emits a value.
233      *
234      * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
235      * invocation of [transform], state accumulation from previous invocation is stopped.
236      */
237     @ExperimentalFrpApi
238     fun <A, B> TFlow<A>.flatMapLatestStateful(
239         transform: suspend FrpStateScope.(A) -> TFlow<B>
240     ): TFlow<B> = mapLatestStateful(transform).flatten()
241 
242     /**
243      * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
244      * original [TFlow].
245      *
246      * When each [FrpStateful] is applied, state accumulation from the previously-active
247      * [FrpStateful] is stopped.
248      */
249     @ExperimentalFrpApi
250     fun <A> TFlow<FrpStateful<A>>.applyLatestStateful(): TFlow<A> = applyLatestStateful {}.first
251 
252     /**
253      * Returns a [TState] containing the value returned by applying the [FrpStateful] held by the
254      * original [TState].
255      *
256      * When each [FrpStateful] is applied, state accumulation from the previously-active
257      * [FrpStateful] is stopped.
258      */
259     @ExperimentalFrpApi
260     fun <A> TState<FrpStateful<A>>.applyLatestStateful(): TState<A> {
261         val (changes, init) = stateChanges.applyLatestStateful { sample()() }
262         return changes.holdDeferred(init)
263     }
264 
265     /**
266      * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
267      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
268      * immediately.
269      *
270      * When each [FrpStateful] is applied, state accumulation from the previously-active
271      * [FrpStateful] is stopped.
272      */
273     @ExperimentalFrpApi
274     fun <A, B> TFlow<FrpStateful<B>>.applyLatestStateful(
275         init: FrpStateful<A>
276     ): Pair<TFlow<B>, FrpDeferredValue<A>> {
277         val (flow, result) =
278             mapCheap { spec -> mapOf(Unit to just(spec)) }
279                 .applyLatestStatefulForKey(init = mapOf(Unit to init), numKeys = 1)
280         val outFlow: TFlow<B> =
281             flow.mapMaybe {
282                 checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
283             }
284         val outInit: FrpDeferredValue<A> = deferredTransactionScope {
285             val initResult: Map<Unit, A> = result.get()
286             check(Unit in initResult) {
287                 "applyLatest: expected initial result, but none present in: $initResult"
288             }
289             @Suppress("UNCHECKED_CAST")
290             initResult.getOrDefault(Unit) { null } as A
291         }
292         return Pair(outFlow, outInit)
293     }
294 
    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
     * immediately.
     *
     * Both the emitted patches and [init] are keyed by [K]; [init] is supplied as a
     * [FrpDeferredValue].
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     *
     * NOTE(review): the meaning of [numKeys] is not visible from this declaration -- presumably a
     * hint for the expected number of distinct keys; confirm against the implementation.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
        init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
        numKeys: Int? = null,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>>
311 
312     /**
313      * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
314      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
315      * immediately.
316      *
317      * When each [FrpStateful] is applied, state accumulation from the previously-active
318      * [FrpStateful] with the same key is stopped.
319      *
320      * If the [Maybe] contained within the value for an associated key is [none], then the
321      * previously-active [FrpStateful] will be stopped with no replacement.
322      */
323     @ExperimentalFrpApi
324     fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
325         init: Map<K, FrpStateful<B>>,
326         numKeys: Int? = null,
327     ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> =
328         applyLatestStatefulForKey(deferredOf(init), numKeys)
329 
330     /**
331      * Returns a [TState] containing the latest results of applying each [FrpStateful] emitted from
332      * the original [TFlow].
333      *
334      * When each [FrpStateful] is applied, state accumulation from the previously-active
335      * [FrpStateful] with the same key is stopped.
336      *
337      * If the [Maybe] contained within the value for an associated key is [none], then the
338      * previously-active [FrpStateful] will be stopped with no replacement.
339      */
340     @ExperimentalFrpApi
341     fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.holdLatestStatefulForKey(
342         init: FrpDeferredValue<Map<K, FrpStateful<A>>>,
343         numKeys: Int? = null,
344     ): TState<Map<K, A>> {
345         val (changes, initialValues) = applyLatestStatefulForKey(init, numKeys)
346         return changes.foldMapIncrementally(initialValues)
347     }
348 
349     /**
350      * Returns a [TState] containing the latest results of applying each [FrpStateful] emitted from
351      * the original [TFlow].
352      *
353      * When each [FrpStateful] is applied, state accumulation from the previously-active
354      * [FrpStateful] with the same key is stopped.
355      *
356      * If the [Maybe] contained within the value for an associated key is [none], then the
357      * previously-active [FrpStateful] will be stopped with no replacement.
358      */
359     @ExperimentalFrpApi
360     fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.holdLatestStatefulForKey(
361         init: Map<K, FrpStateful<A>> = emptyMap(),
362         numKeys: Int? = null,
363     ): TState<Map<K, A>> = holdLatestStatefulForKey(deferredOf(init), numKeys)
364 
365     /**
366      * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
367      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
368      * immediately.
369      *
370      * When each [FrpStateful] is applied, state accumulation from the previously-active
371      * [FrpStateful] with the same key is stopped.
372      *
373      * If the [Maybe] contained within the value for an associated key is [none], then the
374      * previously-active [FrpStateful] will be stopped with no replacement.
375      */
376     @ExperimentalFrpApi
377     fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
378         numKeys: Int? = null
379     ): TFlow<Map<K, Maybe<A>>> =
380         applyLatestStatefulForKey(init = emptyMap<K, FrpStateful<*>>(), numKeys = numKeys).first
381 
382     /**
383      * Returns a [TFlow] containing the results of applying [transform] to each value of the
384      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
385      * [initialValues] immediately.
386      *
387      * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
388      * invocation of [transform], state accumulation from previous invocation is stopped.
389      *
390      * If the [Maybe] contained within the value for an associated key is [none], then the
391      * previously-active [FrpStateScope] will be stopped with no replacement.
392      */
393     @ExperimentalFrpApi
394     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
395         initialValues: FrpDeferredValue<Map<K, A>>,
396         numKeys: Int? = null,
397         transform: suspend FrpStateScope.(A) -> B,
398     ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
399         mapPure { patch -> patch.mapValues { (_, v) -> v.map { statefully { transform(it) } } } }
400             .applyLatestStatefulForKey(
401                 deferredStateScope {
402                     initialValues.get().mapValues { (_, v) -> statefully { transform(v) } }
403                 },
404                 numKeys = numKeys,
405             )
406 
407     /**
408      * Returns a [TFlow] containing the results of applying [transform] to each value of the
409      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
410      * [initialValues] immediately.
411      *
412      * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
413      * invocation of [transform], state accumulation from previous invocation is stopped.
414      *
415      * If the [Maybe] contained within the value for an associated key is [none], then the
416      * previously-active [FrpStateScope] will be stopped with no replacement.
417      */
418     @ExperimentalFrpApi
419     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
420         initialValues: Map<K, A>,
421         numKeys: Int? = null,
422         transform: suspend FrpStateScope.(A) -> B,
423     ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
424         mapLatestStatefulForKey(deferredOf(initialValues), numKeys, transform)
425 
426     /**
427      * Returns a [TFlow] containing the results of applying [transform] to each value of the
428      * original [TFlow].
429      *
430      * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
431      * invocation of [transform], state accumulation from previous invocation is stopped.
432      *
433      * If the [Maybe] contained within the value for an associated key is [none], then the
434      * previously-active [FrpStateScope] will be stopped with no replacement.
435      */
436     @ExperimentalFrpApi
437     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
438         numKeys: Int? = null,
439         transform: suspend FrpStateScope.(A) -> B,
440     ): TFlow<Map<K, Maybe<B>>> = mapLatestStatefulForKey(emptyMap(), numKeys, transform).first
441 
442     /**
443      * Returns a [TFlow] that will only emit the next event of the original [TFlow], and then will
444      * act as [emptyTFlow].
445      *
446      * If the original [TFlow] is emitting an event at this exact time, then it will be the only
447      * even emitted from the result [TFlow].
448      */
449     @ExperimentalFrpApi
450     fun <A> TFlow<A>.nextOnly(): TFlow<A> =
451         if (this === emptyTFlow) {
452             this
453         } else {
454             TFlowLoop<A>().also {
455                 it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch()
456             }
457         }
458 
459     /** Returns a [TFlow] that skips the next emission of the original [TFlow]. */
460     @ExperimentalFrpApi
461     fun <A> TFlow<A>.skipNext(): TFlow<A> =
462         if (this === emptyTFlow) {
463             this
464         } else {
465             nextOnly().mapCheap { this@skipNext }.hold(emptyTFlow).switch()
466         }
467 
468     /**
469      * Returns a [TFlow] that emits values from the original [TFlow] up until [stop] emits a value.
470      *
471      * If the original [TFlow] emits at the same time as [stop], then the returned [TFlow] will emit
472      * that value.
473      */
474     @ExperimentalFrpApi
475     fun <A> TFlow<A>.takeUntil(stop: TFlow<*>): TFlow<A> =
476         if (stop === emptyTFlow) {
477             this
478         } else {
479             stop.mapCheap { emptyTFlow }.nextOnly().hold(this).switch()
480         }
481 
482     /**
483      * Invokes [stateful] in a new [FrpStateScope] that is a child of this one.
484      *
485      * This new scope is stopped when [stop] first emits a value, or when the parent scope is
486      * stopped. Stopping will end all state accumulation; any [TStates][TState] returned from this
487      * scope will no longer update.
488      */
489     @ExperimentalFrpApi
490     fun <A> childStateScope(stop: TFlow<*>, stateful: FrpStateful<A>): FrpDeferredValue<A> {
491         val (_, init: FrpDeferredValue<Map<Unit, A>>) =
492             stop
493                 .nextOnly()
494                 .mapPure { mapOf(Unit to none<FrpStateful<A>>()) }
495                 .applyLatestStatefulForKey(init = mapOf(Unit to stateful), numKeys = 1)
496         return deferredStateScope { init.get().getValue(Unit) }
497     }
498 
499     /**
500      * Returns a [TFlow] that emits values from the original [TFlow] up to and including a value is
501      * emitted that satisfies [predicate].
502      */
503     @ExperimentalFrpApi
504     fun <A> TFlow<A>.takeUntil(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> =
505         takeUntil(filter(predicate))
506 
507     /**
508      * Returns a [TState] that is incrementally updated when this [TFlow] emits a value, by applying
509      * [transform] to both the emitted value and the currently tracked state.
510      *
511      * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
512      * have been processed; this keeps the value of the [TState] consistent during the entire FRP
513      * transaction.
514      */
515     @ExperimentalFrpApi
516     fun <A, B> TFlow<A>.fold(
517         initialValue: B,
518         transform: suspend FrpTransactionScope.(A, B) -> B,
519     ): TState<B> {
520         lateinit var state: TState<B>
521         return mapPure { a -> transform(a, state.sample()) }.hold(initialValue).also { state = it }
522     }
523 
524     /**
525      * Returns a [TState] that is incrementally updated when this [TFlow] emits a value, by applying
526      * [transform] to both the emitted value and the currently tracked state.
527      *
528      * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
529      * have been processed; this keeps the value of the [TState] consistent during the entire FRP
530      * transaction.
531      */
532     @ExperimentalFrpApi
533     fun <A, B> TFlow<A>.foldDeferred(
534         initialValue: FrpDeferredValue<B>,
535         transform: suspend FrpTransactionScope.(A, B) -> B,
536     ): TState<B> {
537         lateinit var state: TState<B>
538         return mapPure { a -> transform(a, state.sample()) }
539             .holdDeferred(initialValue)
540             .also { state = it }
541     }
542 
543     /**
544      * Returns a [TState] that holds onto the result of applying the most recently emitted
545      * [FrpStateful] this [TFlow], or [init] if nothing has been emitted since it was constructed.
546      *
547      * When each [FrpStateful] is applied, state accumulation from the previously-active
548      * [FrpStateful] is stopped.
549      *
550      * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
551      * have been processed; this keeps the value of the [TState] consistent during the entire FRP
552      * transaction.
553      *
554      * Shorthand for:
555      * ```kotlin
556      * val (changes, initApplied) = applyLatestStateful(init)
557      * return changes.toTStateDeferred(initApplied)
558      * ```
559      */
560     @ExperimentalFrpApi
561     fun <A> TFlow<FrpStateful<A>>.holdLatestStateful(init: FrpStateful<A>): TState<A> {
562         val (changes, initApplied) = applyLatestStateful(init)
563         return changes.holdDeferred(initApplied)
564     }
565 
566     /**
567      * Returns a [TFlow] that emits the two most recent emissions from the original [TFlow].
568      * [initialValue] is used as the previous value for the first emission.
569      *
570      * Shorthand for `sample(hold(init)) { new, old -> Pair(old, new) }`
571      */
572     @ExperimentalFrpApi
573     fun <S, T : S> TFlow<T>.pairwise(initialValue: S): TFlow<WithPrev<S, T>> {
574         val previous = hold(initialValue)
575         return mapCheap { new -> WithPrev(previousValue = previous.sample(), newValue = new) }
576     }
577 
578     /**
579      * Returns a [TFlow] that emits the two most recent emissions from the original [TFlow]. Note
580      * that the returned [TFlow] will not emit until the original [TFlow] has emitted twice.
581      */
582     @ExperimentalFrpApi
583     fun <A> TFlow<A>.pairwise(): TFlow<WithPrev<A, A>> =
584         mapCheap { just(it) }
585             .pairwise(none)
586             .mapMaybe { (prev, next) -> prev.zipWith(next, ::WithPrev) }
587 
588     /**
589      * Returns a [TState] that holds both the current and previous values of the original [TState].
590      * [initialPreviousValue] is used as the first previous value.
591      *
592      * Shorthand for `sample(hold(init)) { new, old -> Pair(old, new) }`
593      */
594     @ExperimentalFrpApi
595     fun <S, T : S> TState<T>.pairwise(initialPreviousValue: S): TState<WithPrev<S, T>> =
596         stateChanges
597             .pairwise(initialPreviousValue)
598             .holdDeferred(deferredTransactionScope { WithPrev(initialPreviousValue, sample()) })
599 
600     /**
601      * Returns a [TState] holding a [Map] that is updated incrementally whenever this emits a value.
602      *
603      * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
604      * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and
605      * an associated value of [none] will remove the key from the tracked [Map].
606      */
607     @ExperimentalFrpApi
608     fun <K, V> TFlow<Map<K, Maybe<V>>>.foldMapIncrementally(
609         initialValues: FrpDeferredValue<Map<K, V>>
610     ): TState<Map<K, V>> =
611         foldDeferred(initialValues) { patch, map ->
612             val (adds: List<Pair<K, V>>, removes: List<K>) =
613                 patch
614                     .asSequence()
615                     .map { (k, v) -> if (v is Just) Left(k to v.value) else Right(k) }
616                     .partitionEithers()
617             val removed: Map<K, V> = map - removes.toSet()
618             val updated: Map<K, V> = removed + adds
619             updated
620         }
621 
622     /**
623      * Returns a [TState] holding a [Map] that is updated incrementally whenever this emits a value.
624      *
625      * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
626      * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and
627      * an associated value of [none] will remove the key from the tracked [Map].
628      */
629     @ExperimentalFrpApi
630     fun <K, V> TFlow<Map<K, Maybe<V>>>.foldMapIncrementally(
631         initialValues: Map<K, V> = emptyMap()
632     ): TState<Map<K, V>> = foldMapIncrementally(deferredOf(initialValues))
633 
634     /**
635      * Returns a [TFlow] that wraps each emission of the original [TFlow] into an [IndexedValue],
636      * containing the emitted value and its index (starting from zero).
637      *
638      * Shorthand for:
639      * ```
640      *   val index = fold(0) { _, oldIdx -> oldIdx + 1 }
641      *   sample(index) { a, idx -> IndexedValue(idx, a) }
642      * ```
643      */
644     @ExperimentalFrpApi
645     fun <A> TFlow<A>.withIndex(): TFlow<IndexedValue<A>> {
646         val index = fold(0) { _, old -> old + 1 }
647         return sample(index) { a, idx -> IndexedValue(idx, a) }
648     }
649 
650     /**
651      * Returns a [TFlow] containing the results of applying [transform] to each value of the
652      * original [TFlow] and its index (starting from zero).
653      *
654      * Shorthand for:
655      * ```
656      *   withIndex().map { (idx, a) -> transform(idx, a) }
657      * ```
658      */
659     @ExperimentalFrpApi
660     fun <A, B> TFlow<A>.mapIndexed(transform: suspend FrpTransactionScope.(Int, A) -> B): TFlow<B> {
661         val index = fold(0) { _, i -> i + 1 }
662         return sample(index) { a, idx -> transform(idx, a) }
663     }
664 
665     /** Returns a [TFlow] where all subsequent repetitions of the same value are filtered out. */
666     @ExperimentalFrpApi
667     fun <A> TFlow<A>.distinctUntilChanged(): TFlow<A> {
668         val state: TState<Any?> = hold(Any())
669         return filter { it != state.sample() }
670     }
671 
672     /**
673      * Returns a new [TFlow] that emits at the same rate as the original [TFlow], but combines the
674      * emitted value with the most recent emission from [other] using [transform].
675      *
676      * Note that the returned [TFlow] will not emit anything until [other] has emitted at least one
677      * value.
678      */
679     @ExperimentalFrpApi
680     fun <A, B, C> TFlow<A>.sample(
681         other: TFlow<B>,
682         transform: suspend FrpTransactionScope.(A, B) -> C,
683     ): TFlow<C> {
684         val state = other.mapCheap { just(it) }.hold(none)
685         return sample(state) { a, b -> b.map { transform(a, it) } }.filterJust()
686     }
687 
688     /**
689      * Returns a [TState] that samples the [Transactional] held by the given [TState] within the
690      * same transaction that the state changes.
691      */
692     @ExperimentalFrpApi
693     fun <A> TState<Transactional<A>>.sampleTransactionals(): TState<A> =
694         stateChanges
695             .sampleTransactionals()
696             .holdDeferred(deferredTransactionScope { sample().sample() })
697 
698     /**
699      * Returns a [TState] that transforms the value held inside this [TState] by applying it to the
700      * given function [transform].
701      */
702     @ExperimentalFrpApi
703     fun <A, B> TState<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TState<B> =
704         mapPure { transactionally { transform(it) } }.sampleTransactionals()
705 
706     /**
707      * Returns a [TState] whose value is generated with [transform] by combining the current values
708      * of each given [TState].
709      *
710      * @see TState.combineWith
711      */
712     @ExperimentalFrpApi
713     fun <A, B, Z> combine(
714         stateA: TState<A>,
715         stateB: TState<B>,
716         transform: suspend FrpTransactionScope.(A, B) -> Z,
717     ): TState<Z> =
718         com.android.systemui.kairos
719             .combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } }
720             .sampleTransactionals()
721 
722     /**
723      * Returns a [TState] whose value is generated with [transform] by combining the current values
724      * of each given [TState].
725      *
726      * @see TState.combineWith
727      */
728     @ExperimentalFrpApi
729     fun <A, B, C, D, Z> combine(
730         stateA: TState<A>,
731         stateB: TState<B>,
732         stateC: TState<C>,
733         stateD: TState<D>,
734         transform: suspend FrpTransactionScope.(A, B, C, D) -> Z,
735     ): TState<Z> =
736         com.android.systemui.kairos
737             .combine(stateA, stateB, stateC, stateD) { a, b, c, d ->
738                 transactionally { transform(a, b, c, d) }
739             }
740             .sampleTransactionals()
741 
742     /** Returns a [TState] by applying [transform] to the value held by the original [TState]. */
743     @ExperimentalFrpApi
744     fun <A, B> TState<A>.flatMap(
745         transform: suspend FrpTransactionScope.(A) -> TState<B>
746     ): TState<B> = mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten()
747 
748     /**
749      * Returns a [TState] whose value is generated with [transform] by combining the current values
750      * of each given [TState].
751      *
752      * @see TState.combineWith
753      */
754     @ExperimentalFrpApi
755     fun <A, Z> combine(
756         vararg states: TState<A>,
757         transform: suspend FrpTransactionScope.(List<A>) -> Z,
758     ): TState<Z> = combinePure(*states).map(transform)
759 
760     /**
761      * Returns a [TState] whose value is generated with [transform] by combining the current values
762      * of each given [TState].
763      *
764      * @see TState.combineWith
765      */
766     @ExperimentalFrpApi
767     fun <A, Z> Iterable<TState<A>>.combine(
768         transform: suspend FrpTransactionScope.(List<A>) -> Z
769     ): TState<Z> = combinePure().map(transform)
770 
771     /**
772      * Returns a [TState] by combining the values held inside the given [TState]s by applying them
773      * to the given function [transform].
774      */
775     @ExperimentalFrpApi
776     fun <A, B, C> TState<A>.combineWith(
777         other: TState<B>,
778         transform: suspend FrpTransactionScope.(A, B) -> C,
779     ): TState<C> = combine(this, other, transform)
780 }
781