xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:OptIn(ExperimentalCoroutinesApi::class)
18 
19 package com.android.systemui.kairos
20 
21 import com.android.systemui.kairos.util.Maybe
22 import com.android.systemui.kairos.util.just
23 import com.android.systemui.kairos.util.map
24 import kotlin.coroutines.CoroutineContext
25 import kotlin.coroutines.EmptyCoroutineContext
26 import kotlin.coroutines.RestrictsSuspension
27 import kotlinx.coroutines.CompletableDeferred
28 import kotlinx.coroutines.CoroutineScope
29 import kotlinx.coroutines.Deferred
30 import kotlinx.coroutines.ExperimentalCoroutinesApi
31 import kotlinx.coroutines.Job
32 import kotlinx.coroutines.awaitCancellation
33 import kotlinx.coroutines.coroutineScope
34 import kotlinx.coroutines.flow.Flow
35 import kotlinx.coroutines.flow.FlowCollector
36 import kotlinx.coroutines.flow.MutableSharedFlow
37 import kotlinx.coroutines.flow.MutableStateFlow
38 import kotlinx.coroutines.flow.SharedFlow
39 import kotlinx.coroutines.flow.StateFlow
40 import kotlinx.coroutines.flow.dropWhile
41 import kotlinx.coroutines.flow.scan
42 import kotlinx.coroutines.launch
43 
/**
 * A suspending function, invoked with an [FrpBuildScope] receiver, that modifies the FrpNetwork
 * and produces a result of type [A].
 */
typealias FrpSpec<A> = suspend FrpBuildScope.() -> A
46 
/**
 * Wraps [block] as an [FrpSpec].
 *
 * When the spec is applied, [block] receives an [FrpBuildScope], through which it can perform
 * network-building operations (adding new inputs and outputs to the network) in addition to
 * everything offered by [FrpTransactionScope].
 *
 * This is an identity function: [block] itself is returned, typed as [FrpSpec].
 */
@ExperimentalFrpApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> frpSpec(noinline block: suspend FrpBuildScope.() -> A): FrpSpec<A> {
    return block
}
55 
/**
 * Runs [block] with this [FrpBuildScope] as its receiver and returns its result, enabling
 * `scope { ... }` call syntax.
 */
@ExperimentalFrpApi
inline operator fun <A> FrpBuildScope.invoke(block: FrpBuildScope.() -> A): A = this.block()
59 
60 /** Operations that add inputs and outputs to an FRP network. */
61 @ExperimentalFrpApi
62 @RestrictsSuspension
63 interface FrpBuildScope : FrpStateScope {
64 
    /**
     * Schedules [block] to be run with an [FrpBuildScope] receiver and returns an
     * [FrpDeferredValue] through which the result of [block] can later be retrieved.
     *
     * NOTE(review): the exact point at which [block] executes is decided by the implementing
     * class and is not visible here; presumably it is deferred until later in the current
     * transaction — confirm against the implementation.
     *
     * TODO: complete this documentation.
     */
    @ExperimentalFrpApi
    fun <R> deferredBuildScope(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R>
68 
    /**
     * Schedules [block] to be run with an [FrpBuildScope] receiver, without exposing any result.
     *
     * NOTE(review): presumably the result-less counterpart of [deferredBuildScope]; the timing of
     * execution is defined by the implementing class — confirm.
     *
     * TODO: complete this documentation.
     */
    @ExperimentalFrpApi fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit)
71 
    /**
     * Returns a [TFlow] whose values are produced by applying [transform] to each value emitted
     * by the original [TFlow].
     *
     * [transform] is given an [FrpBuildScope] receiver and may use it to modify the FRP network.
     * In contrast to [mapLatestBuild], those modifications are *not* undone when the original
     * [TFlow] emits again.
     *
     * **NOTE:** This API does not [observe] the original [TFlow]. Unless the returned [TFlow] (or
     * something downstream of it) is observed separately, [transform] is never invoked and none
     * of its internal side-effects occur.
     *
     * @param transform the network-building function applied to each emitted value.
     * @return a [TFlow] of the results of [transform].
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B>
86 
    /**
     * Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely
     * performed in reaction to the emission.
     *
     * Specifically, [block] is deferred to the end of the transaction, and is only actually
     * executed if this [FrpBuildScope] is still active by that time. It can be deactivated due to
     * a -Latest combinator, for example.
     *
     * Shorthand for:
     * ```kotlin
     *   tFlow.observe { effect { ... } }
     * ```
     *
     * NOTE(review): the snippet above calls `observe` itself; it presumably is meant to read
     * `tFlow.observeBuild { effect { ... } }` — confirm.
     *
     * @param coroutineContext additional [CoroutineContext] for the observer; defaults to
     *   [EmptyCoroutineContext]. NOTE(review): presumably applied to the coroutine that runs
     *   [block] — confirm against the implementation.
     * @param block the side-effecting handler, run with an [FrpEffectScope] receiver. Defaults to
     *   a no-op, so `observe()` can be used purely to keep this [TFlow] observed.
     * @return a [Job] for this observation. NOTE(review): presumably cancelling it unregisters
     *   the observer — confirm.
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: suspend FrpEffectScope.(A) -> Unit = {},
    ): Job
105 
    /**
     * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the
     * original [TFlow], paired with an [FrpDeferredValue] containing the results of applying
     * [initialSpecs] immediately.
     *
     * Each emission of the original [TFlow] is a map from key to an optional [FrpSpec]. When an
     * [FrpSpec] is applied for a key, the changes made by the previously-active [FrpSpec] for
     * that same key are undone (any registered [observers][observe] are unregistered, and any
     * pending [side-effects][effect] are cancelled).
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpSpec] will be undone with no replacement.
     *
     * @param initialSpecs deferred map of the specs to apply up front, by key.
     * @param numKeys optional key count. NOTE(review): presumably a sizing hint for internal
     *   storage — its effect is not visible in this file; confirm.
     * @return the flow of per-key results (first) and the deferred initial results (second).
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
        initialSpecs: FrpDeferredValue<Map<K, FrpSpec<B>>>,
        numKeys: Int? = null,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>>
123 
    /**
     * Creates an instance of a [TFlow] with elements that are emitted from [builder].
     *
     * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
     * provided [FrpProducerScope].
     *
     * NOTE(review): this comment previously referred to a provided [MutableTFlow]; the signature
     * supplies an [FrpProducerScope] receiver instead — confirm which is intended.
     *
     * By default, [builder] is only running while the returned [TFlow] is being
     * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
     * ```kotlin
     * tFlow { ... }.apply { observe() }
     * ```
     *
     * @param builder the producer coroutine body.
     * @return a [TFlow] of the values emitted by [builder].
     */
    @ExperimentalFrpApi fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T>
137 
    /**
     * Creates an instance of a [TFlow] with elements that are emitted from [builder], batching
     * emissions together when the network cannot keep up.
     *
     * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
     * provided [FrpCoalescingProducerScope].
     *
     * By default, [builder] is only running while the returned [TFlow] is being
     * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
     * ```kotlin
     * tFlow { ... }.apply { observe() }
     * ```
     *
     * In the event of backpressure, emissions are *coalesced* into batches. When a value is
     * [emitted][FrpCoalescingProducerScope.emit] from [builder], it is merged into the batch via
     * [coalesce]. Once the batch is consumed by the frp network in the next transaction, the
     * batch is reset back to [getInitialValue].
     *
     * @param getInitialValue supplies the empty/starting batch.
     * @param coalesce merges a newly emitted value (`new`) into the current batch (`old`).
     * @param builder the producer coroutine body.
     * @return a [TFlow] that emits the coalesced batches.
     */
    @ExperimentalFrpApi
    fun <In, Out> coalescingTFlow(
        getInitialValue: () -> Out,
        coalesce: (old: Out, new: In) -> Out,
        builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
    ): TFlow<Out>
161 
    /**
     * Creates a new [FrpBuildScope] that is a child of this one, and applies [block] within it.
     *
     * This new scope can be manually cancelled via the returned [Job], or will be cancelled
     * automatically when its parent is cancelled. Cancellation will unregister all
     * [observers][observe] and cancel all scheduled [effects][effect].
     *
     * @param block the [FrpSpec] to apply inside the child scope.
     * @return the result of [block] as an [FrpDeferredValue] (first), and the [Job] controlling
     *   the child scope (second).
     */
    @ExperimentalFrpApi fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job>
172 
173     // TODO: once we have context params, these can all become extensions:
174 
175     /**
176      * Returns a [TFlow] containing the results of applying the given [transform] function to each
177      * value of the original [TFlow].
178      *
179      * Unlike [TFlow.map], [transform] can perform arbitrary asynchronous code. This code is run
180      * outside of the current FRP transaction; when [transform] returns, the returned value is
181      * emitted from the result [TFlow] in a new transaction.
182      *
183      * Shorthand for:
184      * ```kotlin
185      * tflow.mapLatestBuild { a -> asyncTFlow { transform(a) } }.flatten()
186      * ```
187      */
188     @ExperimentalFrpApi
189     fun <A, B> TFlow<A>.mapAsyncLatest(transform: suspend (A) -> B): TFlow<B> =
190         mapLatestBuild { a -> asyncTFlow { transform(a) } }.flatten()
191 
192     /**
193      * Invokes [block] whenever this [TFlow] emits a value. [block] receives an [FrpBuildScope] that
194      * can be used to make further modifications to the FRP network, and/or perform side-effects via
195      * [effect].
196      *
197      * @see observe
198      */
199     @ExperimentalFrpApi
200     fun <A> TFlow<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
201         mapBuild(block).observe()
202 
203     /**
204      * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current
205      * [value of this TState][TState.sample], and will emit at the same rate as
206      * [TState.stateChanges].
207      *
208      * Note that the [value][StateFlow.value] is not available until the *end* of the current
209      * transaction. If you need the current value before this time, then use [TState.sample].
210      */
211     @ExperimentalFrpApi
212     fun <A> TState<A>.toStateFlow(): StateFlow<A> {
213         val uninitialized = Any()
214         var initialValue: Any? = uninitialized
215         val innerStateFlow = MutableStateFlow<Any?>(uninitialized)
216         deferredBuildScope {
217             initialValue = sample()
218             stateChanges.observe {
219                 innerStateFlow.value = it
220                 initialValue = null
221             }
222         }
223 
224         @Suppress("UNCHECKED_CAST")
225         fun getValue(innerValue: Any?): A =
226             when {
227                 innerValue !== uninitialized -> innerValue as A
228                 initialValue !== uninitialized -> initialValue as A
229                 else ->
230                     error(
231                         "Attempted to access StateFlow.value before FRP transaction has completed."
232                     )
233             }
234 
235         return object : StateFlow<A> {
236             override val replayCache: List<A>
237                 get() = innerStateFlow.replayCache.map(::getValue)
238 
239             override val value: A
240                 get() = getValue(innerStateFlow.value)
241 
242             override suspend fun collect(collector: FlowCollector<A>): Nothing {
243                 innerStateFlow.collect { collector.emit(getValue(it)) }
244             }
245         }
246     }
247 
248     /**
249      * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits the current
250      * [value][TState.sample] of this [TState] followed by all [stateChanges].
251      */
252     @ExperimentalFrpApi
253     fun <A> TState<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
254         val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
255         deferredBuildScope {
256             result.tryEmit(sample())
257             stateChanges.observe { a -> result.tryEmit(a) }
258         }
259         return result
260     }
261 
262     /**
263      * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits values
264      * whenever this [TFlow] emits.
265      */
266     @ExperimentalFrpApi
267     fun <A> TFlow<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
268         val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
269         observe { a -> result.tryEmit(a) }
270         return result
271     }
272 
273     /**
274      * Returns a [TState] that holds onto the value returned by applying the most recently emitted
275      * [FrpSpec] from the original [TFlow], or the value returned by applying [initialSpec] if
276      * nothing has been emitted since it was constructed.
277      *
278      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
279      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
280      * cancelled).
281      */
282     @ExperimentalFrpApi
283     fun <A> TFlow<FrpSpec<A>>.holdLatestSpec(initialSpec: FrpSpec<A>): TState<A> {
284         val (changes: TFlow<A>, initApplied: FrpDeferredValue<A>) = applyLatestSpec(initialSpec)
285         return changes.holdDeferred(initApplied)
286     }
287 
288     /**
289      * Returns a [TState] containing the value returned by applying the [FrpSpec] held by the
290      * original [TState].
291      *
292      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
293      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
294      * cancelled).
295      */
296     @ExperimentalFrpApi
297     fun <A> TState<FrpSpec<A>>.applyLatestSpec(): TState<A> {
298         val (appliedChanges: TFlow<A>, init: FrpDeferredValue<A>) =
299             stateChanges.applyLatestSpec(frpSpec { sample().applySpec() })
300         return appliedChanges.holdDeferred(init)
301     }
302 
303     /**
304      * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
305      * [TFlow].
306      *
307      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
308      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
309      * cancelled).
310      */
311     @ExperimentalFrpApi
312     fun <A> TFlow<FrpSpec<A>>.applyLatestSpec(): TFlow<A> = applyLatestSpec(frpSpec {}).first
313 
314     /**
315      * Returns a [TFlow] that switches to a new [TFlow] produced by [transform] every time the
316      * original [TFlow] emits a value.
317      *
318      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
319      * When the original [TFlow] emits a new value, those changes are undone (any registered
320      * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
321      */
322     @ExperimentalFrpApi
323     fun <A, B> TFlow<A>.flatMapLatestBuild(
324         transform: suspend FrpBuildScope.(A) -> TFlow<B>
325     ): TFlow<B> = mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten()
326 
327     /**
328      * Returns a [TState] by applying [transform] to the value held by the original [TState].
329      *
330      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
331      * When the value held by the original [TState] changes, those changes are undone (any
332      * registered [observers][observe] are unregistered, and any pending [effects][effect] are
333      * cancelled).
334      */
335     @ExperimentalFrpApi
336     fun <A, B> TState<A>.flatMapLatestBuild(
337         transform: suspend FrpBuildScope.(A) -> TState<B>
338     ): TState<B> = mapLatestBuild { transform(it) }.flatten()
339 
340     /**
341      * Returns a [TState] that transforms the value held inside this [TState] by applying it to the
342      * [transform].
343      *
344      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
345      * When the value held by the original [TState] changes, those changes are undone (any
346      * registered [observers][observe] are unregistered, and any pending [effects][effect] are
347      * cancelled).
348      */
349     @ExperimentalFrpApi
350     fun <A, B> TState<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TState<B> =
351         mapCheapUnsafe { frpSpec { transform(it) } }.applyLatestSpec()
352 
353     /**
354      * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
355      * [TFlow], and a [FrpDeferredValue] containing the result of applying [initialSpec]
356      * immediately.
357      *
358      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
359      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
360      * cancelled).
361      */
362     @ExperimentalFrpApi
363     fun <A : Any?, B> TFlow<FrpSpec<B>>.applyLatestSpec(
364         initialSpec: FrpSpec<A>
365     ): Pair<TFlow<B>, FrpDeferredValue<A>> {
366         val (flow, result) =
367             mapCheap { spec -> mapOf(Unit to just(spec)) }
368                 .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1)
369         val outFlow: TFlow<B> =
370             flow.mapMaybe {
371                 checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
372             }
373         val outInit: FrpDeferredValue<A> = deferredBuildScope {
374             val initResult: Map<Unit, A> = result.get()
375             check(Unit in initResult) {
376                 "applyLatest: expected initial result, but none present in: $initResult"
377             }
378             @Suppress("UNCHECKED_CAST")
379             initResult.getOrDefault(Unit) { null } as A
380         }
381         return Pair(outFlow, outInit)
382     }
383 
384     /**
385      * Returns a [TFlow] containing the results of applying [transform] to each value of the
386      * original [TFlow].
387      *
388      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
389      * With each invocation of [transform], changes from the previous invocation are undone (any
390      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
391      * cancelled).
392      */
393     @ExperimentalFrpApi
394     fun <A, B> TFlow<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> =
395         mapCheap { frpSpec { transform(it) } }.applyLatestSpec()
396 
397     /**
398      * Returns a [TFlow] containing the results of applying [transform] to each value of the
399      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
400      * [initialValue] immediately.
401      *
402      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
403      * With each invocation of [transform], changes from the previous invocation are undone (any
404      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
405      * cancelled).
406      */
407     @ExperimentalFrpApi
408     fun <A, B> TFlow<A>.mapLatestBuild(
409         initialValue: A,
410         transform: suspend FrpBuildScope.(A) -> B,
411     ): Pair<TFlow<B>, FrpDeferredValue<B>> =
412         mapLatestBuildDeferred(deferredOf(initialValue), transform)
413 
414     /**
415      * Returns a [TFlow] containing the results of applying [transform] to each value of the
416      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
417      * [initialValue] immediately.
418      *
419      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
420      * With each invocation of [transform], changes from the previous invocation are undone (any
421      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
422      * cancelled).
423      */
424     @ExperimentalFrpApi
425     fun <A, B> TFlow<A>.mapLatestBuildDeferred(
426         initialValue: FrpDeferredValue<A>,
427         transform: suspend FrpBuildScope.(A) -> B,
428     ): Pair<TFlow<B>, FrpDeferredValue<B>> =
429         mapCheap { frpSpec { transform(it) } }
430             .applyLatestSpec(initialSpec = frpSpec { transform(initialValue.get()) })
431 
432     /**
433      * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
434      * [TFlow], and a [FrpDeferredValue] containing the result of applying [initialSpecs]
435      * immediately.
436      *
437      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
438      * key are undone (any registered [observers][observe] are unregistered, and any pending
439      * [side-effects][effect] are cancelled).
440      *
441      * If the [Maybe] contained within the value for an associated key is [none], then the
442      * previously-active [FrpSpec] will be undone with no replacement.
443      */
444     @ExperimentalFrpApi
445     fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
446         initialSpecs: Map<K, FrpSpec<B>>,
447         numKeys: Int? = null,
448     ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> =
449         applyLatestSpecForKey(deferredOf(initialSpecs), numKeys)
450 
451     /**
452      * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
453      * [TFlow].
454      *
455      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
456      * key are undone (any registered [observers][observe] are unregistered, and any pending
457      * [side-effects][effect] are cancelled).
458      *
459      * If the [Maybe] contained within the value for an associated key is [none], then the
460      * previously-active [FrpSpec] will be undone with no replacement.
461      */
462     @ExperimentalFrpApi
463     fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
464         numKeys: Int? = null
465     ): TFlow<Map<K, Maybe<A>>> =
466         applyLatestSpecForKey<K, A, Nothing>(deferredOf(emptyMap()), numKeys).first
467 
468     /**
469      * Returns a [TState] containing the latest results of applying each [FrpSpec] emitted from the
470      * original [TFlow].
471      *
472      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
473      * key are undone (any registered [observers][observe] are unregistered, and any pending
474      * [side-effects][effect] are cancelled).
475      *
476      * If the [Maybe] contained within the value for an associated key is [none], then the
477      * previously-active [FrpSpec] will be undone with no replacement.
478      */
479     @ExperimentalFrpApi
480     fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.holdLatestSpecForKey(
481         initialSpecs: FrpDeferredValue<Map<K, FrpSpec<A>>>,
482         numKeys: Int? = null,
483     ): TState<Map<K, A>> {
484         val (changes, initialValues) = applyLatestSpecForKey(initialSpecs, numKeys)
485         return changes.foldMapIncrementally(initialValues)
486     }
487 
488     /**
489      * Returns a [TState] containing the latest results of applying each [FrpSpec] emitted from the
490      * original [TFlow].
491      *
492      * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
493      * key are undone (any registered [observers][observe] are unregistered, and any pending
494      * [side-effects][effect] are cancelled).
495      *
496      * If the [Maybe] contained within the value for an associated key is [none], then the
497      * previously-active [FrpSpec] will be undone with no replacement.
498      */
499     @ExperimentalFrpApi
500     fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.holdLatestSpecForKey(
501         initialSpecs: Map<K, FrpSpec<A>> = emptyMap(),
502         numKeys: Int? = null,
503     ): TState<Map<K, A>> = holdLatestSpecForKey(deferredOf(initialSpecs), numKeys)
504 
505     /**
506      * Returns a [TFlow] containing the results of applying [transform] to each value of the
507      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
508      * [initialValues] immediately.
509      *
510      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
511      * With each invocation of [transform], changes from the previous invocation are undone (any
512      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
513      * cancelled).
514      *
515      * If the [Maybe] contained within the value for an associated key is [none], then the
516      * previously-active [FrpBuildScope] will be undone with no replacement.
517      */
518     @ExperimentalFrpApi
519     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
520         initialValues: FrpDeferredValue<Map<K, A>>,
521         numKeys: Int? = null,
522         transform: suspend FrpBuildScope.(A) -> B,
523     ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
524         map { patch -> patch.mapValues { (_, v) -> v.map { frpSpec { transform(it) } } } }
525             .applyLatestSpecForKey(
526                 deferredBuildScope {
527                     initialValues.get().mapValues { (_, v) -> frpSpec { transform(v) } }
528                 },
529                 numKeys = numKeys,
530             )
531 
532     /**
533      * Returns a [TFlow] containing the results of applying [transform] to each value of the
534      * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
535      * [initialValues] immediately.
536      *
537      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
538      * With each invocation of [transform], changes from the previous invocation are undone (any
539      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
540      * cancelled).
541      *
542      * If the [Maybe] contained within the value for an associated key is [none], then the
543      * previously-active [FrpBuildScope] will be undone with no replacement.
544      */
545     @ExperimentalFrpApi
546     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
547         initialValues: Map<K, A>,
548         numKeys: Int? = null,
549         transform: suspend FrpBuildScope.(A) -> B,
550     ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
551         mapLatestBuildForKey(deferredOf(initialValues), numKeys, transform)
552 
553     /**
554      * Returns a [TFlow] containing the results of applying [transform] to each value of the
555      * original [TFlow].
556      *
557      * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
558      * With each invocation of [transform], changes from the previous invocation are undone (any
559      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
560      * cancelled).
561      *
562      * If the [Maybe] contained within the value for an associated key is [none], then the
563      * previously-active [FrpBuildScope] will be undone with no replacement.
564      */
565     @ExperimentalFrpApi
566     fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
567         numKeys: Int? = null,
568         transform: suspend FrpBuildScope.(A) -> B,
569     ): TFlow<Map<K, Maybe<B>>> = mapLatestBuildForKey(emptyMap(), numKeys, transform).first
570 
571     /** Returns a [Deferred] containing the next value to be emitted from this [TFlow]. */
572     @ExperimentalFrpApi
573     fun <R> TFlow<R>.nextDeferred(): Deferred<R> {
574         lateinit var next: CompletableDeferred<R>
575         val job = nextOnly().observe { next.complete(it) }
576         next = CompletableDeferred<R>(parent = job)
577         return next
578     }
579 
580     /** Returns a [TState] that reflects the [StateFlow.value] of this [StateFlow]. */
581     @ExperimentalFrpApi
582     fun <A> StateFlow<A>.toTState(): TState<A> {
583         val initial = value
584         return tFlow { dropWhile { it == initial }.collect { emit(it) } }.hold(initial)
585     }
586 
587     /** Returns a [TFlow] that emits whenever this [Flow] emits. */
588     @ExperimentalFrpApi fun <A> Flow<A>.toTFlow(): TFlow<A> = tFlow { collect { emit(it) } }
589 
590     /**
591      * Shorthand for:
592      * ```kotlin
593      * flow.toTFlow().hold(initialValue)
594      * ```
595      */
596     @ExperimentalFrpApi
597     fun <A> Flow<A>.toTState(initialValue: A): TState<A> = toTFlow().hold(initialValue)
598 
599     /**
600      * Shorthand for:
601      * ```kotlin
602      * flow.scan(initialValue, operation).toTFlow().hold(initialValue)
603      * ```
604      */
605     @ExperimentalFrpApi
606     fun <A, B> Flow<A>.scanToTState(initialValue: B, operation: (B, A) -> B): TState<B> =
607         scan(initialValue, operation).toTFlow().hold(initialValue)
608 
609     /**
610      * Shorthand for:
611      * ```kotlin
612      * flow.scan(initialValue) { a, f -> f(a) }.toTFlow().hold(initialValue)
613      * ```
614      */
615     @ExperimentalFrpApi
616     fun <A> Flow<(A) -> A>.scanToTState(initialValue: A): TState<A> =
617         scanToTState(initialValue) { a, f -> f(a) }
618 
619     /**
620      * Invokes [block] whenever this [TFlow] emits a value. [block] receives an [FrpBuildScope] that
621      * can be used to make further modifications to the FRP network, and/or perform side-effects via
622      * [effect].
623      *
624      * With each invocation of [block], changes from the previous invocation are undone (any
625      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
626      * cancelled).
627      */
628     @ExperimentalFrpApi
629     fun <A> TFlow<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
630         mapLatestBuild { block(it) }.observe()
631 
632     /**
633      * Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely
634      * performed in reaction to the emission.
635      *
636      * With each invocation of [block], running effects from the previous invocation are cancelled.
637      */
638     @ExperimentalFrpApi
639     fun <A> TFlow<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job {
640         var innerJob: Job? = null
641         return observeBuild {
642             innerJob?.cancel()
643             innerJob = effect { block(it) }
644         }
645     }
646 
647     /**
648      * Invokes [block] with the value held by this [TState], allowing side-effects to be safely
649      * performed in reaction to the state changing.
650      *
651      * With each invocation of [block], running effects from the previous invocation are cancelled.
652      */
653     @ExperimentalFrpApi
654     fun <A> TState<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job =
655         launchScope {
656             var innerJob = effect { block(sample()) }
657             stateChanges.observeBuild {
658                 innerJob.cancel()
659                 innerJob = effect { block(it) }
660             }
661         }
662 
/**
 * Runs [block] against the value held by this [TState], and again each time the state changes.
 *
 * [block] receives an [FrpBuildScope], so it may modify the FRP network and/or schedule
 * side-effects via [effect]. Every invocation gets its own child scope; when the state changes,
 * the scope of the previous invocation is cancelled before the new one is launched, which undoes
 * what that invocation set up (registered [observers][observe] are unregistered and pending
 * [side-effects][effect] are cancelled).
 *
 * @param block the build step to run; it receives the state's value.
 * @return the [Job] of the outer scope that owns the per-value scopes and the change observer.
 */
@ExperimentalFrpApi
fun <A> TState<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job {
    return launchScope {
        // Scope for the sampled value first; replaced wholesale whenever the state changes.
        var activeScope: Job = launchScope { block(sample()) }
        stateChanges.observeBuild { updated ->
            activeScope.cancel()
            activeScope = launchScope { block(updated) }
        }
    }
}
681 
/**
 * Runs this [FrpSpec] with the current [FrpBuildScope] as its receiver.
 *
 * @return whatever the spec produces.
 */
@ExperimentalFrpApi
suspend fun <A> FrpSpec<A>.applySpec(): A {
    return this()
}
684 
/**
 * Runs this [FrpSpec] through [deferredBuildScope] instead of invoking it directly.
 *
 * @return the spec's result, wrapped in an [FrpDeferredValue].
 */
@ExperimentalFrpApi
fun <A> FrpSpec<A>.applySpecDeferred(): FrpDeferredValue<A> {
    // The lambda only forwards to applySpec on the spec this function was called on.
    return deferredBuildScope { this@applySpecDeferred.applySpec() }
}
691 
/**
 * Runs [block] on the value held by this [TState], and again for every emission of
 * [stateChanges].
 *
 * [block] receives an [FrpBuildScope], so it may modify the FRP network and/or schedule
 * side-effects via [effect].
 *
 * @param block the build step to run; it receives the state's value.
 * @return the [Job] of the scope that hosts the initial invocation and the change observer.
 */
@ExperimentalFrpApi
fun <A> TState<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job {
    return launchScope {
        // First invocation uses the sampled value; later ones are driven by stateChanges.
        val current = sample()
        block(current)
        stateChanges.observeBuild(block)
    }
}
703 
/**
 * Runs [block] as a side-effect with the current value of this [TState], and re-runs it whenever
 * the value changes.
 *
 * [block] is deferred to the end of the transaction and is only executed if this [FrpBuildScope]
 * is still active by then; the scope can be deactivated by a -Latest combinator, for example.
 *
 * The first value is chosen by merging a [sample] taken at [now] with [stateChanges], preferring
 * the latter: if the state is changing within the *current* transaction, [block] first sees the
 * new value; otherwise it first sees the [current][sample] value.
 *
 * @param block the side-effect to perform; it receives the state's value.
 * @return the [Job] of the registered observer.
 */
@ExperimentalFrpApi
fun <A> TState<A>.observe(block: suspend FrpEffectScope.(A) -> Unit = {}): Job {
    // Value held right now, expressed as an emission in the current transaction.
    val initial = now.map { sample() }
    // When both sides emit together, the changed value wins over the sampled one.
    val values = initial.mergeWith(stateChanges) { _, changed -> changed }
    return values.observe { value -> block(value) }
}
719 }
720 
/**
 * Returns a [TFlow] that emits the result of [block] once it completes. [block] is evaluated
 * outside of the current FRP transaction; when it completes, the returned [TFlow] emits in a new
 * transaction.
 *
 * Equivalent to:
 * ```kotlin
 * tFlow { emit(block()) }.apply { observe() }
 * ```
 *
 * @param block the suspending computation whose result is emitted.
 * @return the [TFlow] carrying the result; a no-op observer is already attached to it.
 */
@ExperimentalFrpApi
fun <A> FrpBuildScope.asyncTFlow(block: suspend () -> A): TFlow<A> {
    val completion: TFlow<A> =
        tFlow {
            // TODO: if block completes synchronously, it would be nice to emit within this
            //  transaction
            val value = block()
            emit(value)
        }
    // Attach a no-op observer right away; presumably this keeps the builder running even when
    // nothing else is observing the returned flow.
    return completion.also { it.observe() }
}
742 
/**
 * Performs a side-effect in a manner that is safe with respect to the current FRP transaction.
 *
 * [block] is deferred to the end of the current transaction and is only executed if this
 * [FrpBuildScope] is still active by then; the scope can be deactivated by a -Latest combinator,
 * for example.
 *
 * Implemented as an observer on [now][FrpBuildScope.now].
 *
 * @param block the side-effect to perform.
 * @return the [Job] of that observer.
 */
@ExperimentalFrpApi
fun FrpBuildScope.effect(block: suspend FrpEffectScope.() -> Unit): Job {
    return now.observe { block() }
}
757 
/**
 * Launches [block] in a new coroutine and returns a [Job] bound to that coroutine.
 *
 * The coroutine is not started until the *end* of the current FRP transaction, because the
 * current [FrpBuildScope] might be deactivated within this transaction (perhaps by a -Latest
 * combinator). If that happens, the coroutine is never started.
 *
 * This is [asyncEffect] with the result discarded: the [Deferred] it produces is returned as a
 * plain [Job].
 *
 * @param block the work to run in the new coroutine.
 */
@ExperimentalFrpApi
fun FrpBuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job {
    val pending: Deferred<Unit> = asyncEffect(block)
    return pending
}
772 
/**
 * Launches [block] in a new coroutine, returning the result as a [Deferred].
 *
 * The coroutine is not started until the *end* of the current FRP transaction, because the
 * current [FrpBuildScope] might be deactivated within this transaction (perhaps by a -Latest
 * combinator). If that happens, the coroutine is never started.
 *
 * Roughly equivalent to:
 * ```kotlin
 *   CompletableDeferred<R>().apply {
 *       effect { frpCoroutineScope.launch { complete(coroutineScope { block() }) } }
 *   }
 * ```
 *
 * Completion is linked in both directions: when the underlying observer's [Job] completes, the
 * returned [Deferred] is cancelled (a no-op if it already holds a result); when the [Deferred]
 * completes or is cancelled, the observer is cancelled.
 *
 * If [block] throws, the returned [Deferred] is completed exceptionally with that throwable, and
 * the throwable is rethrown inside the launched coroutine so that it still propagates through
 * the coroutine's own scope as before.
 *
 * @param block the work to run in the new coroutine.
 * @return a [Deferred] that yields the result of [block].
 */
@ExperimentalFrpApi
fun <R> FrpBuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
    val result = CompletableDeferred<R>()
    val job =
        now.observe {
            frpCoroutineScope.launch {
                try {
                    result.complete(coroutineScope(block))
                } catch (failure: Throwable) {
                    // Without this, a failing block would leave `result` without an outcome:
                    // the exception would only travel up the launching scope. Hand it to the
                    // awaiters as well, then rethrow to keep the original propagation.
                    result.completeExceptionally(failure)
                    throw failure
                }
            }
        }
    // Observer finished (e.g. its scope was torn down) -> nobody will ever complete `result`.
    val handle = job.invokeOnCompletion { result.cancel() }
    // Result settled or cancelled by the caller -> the observer is no longer needed.
    result.invokeOnCompletion {
        handle.dispose()
        job.cancel()
    }
    return result
}
799 
/**
 * Like [FrpBuildScope.asyncScope], but ignores the result of [block].
 *
 * @param block the spec to apply in the new scope.
 * @return the [Job] half of what [FrpBuildScope.asyncScope] returns.
 */
@ExperimentalFrpApi
fun FrpBuildScope.launchScope(block: FrpSpec<*>): Job {
    val scoped = asyncScope(block)
    return scoped.second
}
802 
/**
 * Creates a [TFlow] whose elements are emitted from [builder].
 *
 * [builder] runs in its own coroutine, allowing for ongoing work that emits through the provided
 * [FrpCoalescingProducerScope].
 *
 * By default, [builder] is only running while the returned [TFlow] is being
 * [observed][FrpBuildScope.observe]. To keep it running at all times, add a no-op observer:
 * ```kotlin
 * coalescingTFlow(...) { ... }.apply { observe() }
 * ```
 *
 * In the event of backpressure, emissions are *coalesced* into batches. When a value is
 * [emitted][FrpCoalescingProducerScope.emit] from [builder], it is merged into the batch via
 * [coalesce]. Once the batch is consumed by the FRP network in the next transaction, the batch is
 * reset back to [initialValue].
 *
 * This overload takes the initial batch as a fixed value and forwards to the `getInitialValue`
 * overload with a lambda that always returns it.
 *
 * @param initialValue the batch value before anything has been merged into it.
 * @param coalesce merges a newly emitted value into the current batch.
 * @param builder the producer; it emits through its [FrpCoalescingProducerScope] receiver.
 */
@ExperimentalFrpApi
fun <In, Out> FrpBuildScope.coalescingTFlow(
    initialValue: Out,
    coalesce: (old: Out, new: In) -> Out,
    builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
): TFlow<Out> {
    return coalescingTFlow(getInitialValue = { initialValue }, coalesce, builder)
}
827 
/**
 * Creates a [TFlow] whose elements are emitted from [builder].
 *
 * [builder] runs in its own coroutine, allowing for ongoing work that emits through the provided
 * [FrpCoalescingProducerScope].
 *
 * By default, [builder] is only running while the returned [TFlow] is being
 * [observed][FrpBuildScope.observe]. To keep it running at all times, add a no-op observer:
 * ```kotlin
 * conflatedTFlow { ... }.apply { observe() }
 * ```
 *
 * In the event of backpressure, emissions are *conflated*: older emissions are dropped and only
 * the most recent one is used when the FRP network is ready.
 *
 * @param builder the producer; it emits through its [FrpCoalescingProducerScope] receiver.
 */
@ExperimentalFrpApi
fun <T> FrpBuildScope.conflatedTFlow(
    builder: suspend FrpCoalescingProducerScope<T>.() -> Unit
): TFlow<T> {
    // Conflation is coalescing where the newest value simply replaces the batch. The batch is
    // typed Any? so that a throwaway object can stand in as the "nothing yet" initial value.
    val latestOnly: TFlow<Any?> =
        coalescingTFlow<T, Any?>(
            initialValue = Any(),
            coalesce = { _, incoming -> incoming },
            builder = builder,
        )
    // NOTE(review): the cast assumes the placeholder itself is never emitted downstream, i.e. a
    // batch is only delivered after at least one emit() - confirm against coalescingTFlow.
    return latestOnly.mapCheap { batch ->
        @Suppress("UNCHECKED_CAST")
        batch as T
    }
}
853 
/**
 * Receiver of the `builder` passed to [FrpBuildScope.coalescingTFlow]; the only way for that
 * builder to feed values into the resulting [TFlow].
 */
interface FrpCoalescingProducerScope<in T> {
    /**
     * Merges [value] into the current batch and, unless an emission is already pending, enqueues
     * the batch for emission from the [TFlow].
     *
     * This call does not suspend. Backpressure arises when [emit] is called while the FRP network
     * is in the middle of a transaction; repeated calls during that time are coalesced into a
     * single batch, which is processed once the network is ready.
     */
    fun emit(value: T)
}
866 
/**
 * Receiver of the `builder` passed to [FrpBuildScope.tFlow]; the only way for that builder to
 * feed values into the resulting [TFlow].
 */
interface FrpProducerScope<in T> {
    /**
     * Emits [value] to the [TFlow].
     *
     * The caller is suspended until the FRP transaction containing this emission has completed.
     */
    suspend fun emit(value: T)
}
875 
/**
 * Suspends until the calling coroutine is cancelled, then runs [block] on the way out.
 *
 * This never returns normally. It is useful for unregistering callbacks inside of
 * [FrpBuildScope.tFlow] and [FrpBuildScope.coalescingTFlow] builders.
 *
 * @param block cleanup to run once the coroutine is cancelled.
 */
suspend fun awaitClose(block: () -> Unit): Nothing {
    try {
        awaitCancellation()
    } finally {
        // Reached only while unwinding from cancellation.
        block()
    }
}
886