1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 @file:OptIn(ExperimentalCoroutinesApi::class)
18
19 package com.android.systemui.kairos
20
21 import com.android.systemui.kairos.util.Maybe
22 import com.android.systemui.kairos.util.just
23 import com.android.systemui.kairos.util.map
24 import kotlin.coroutines.CoroutineContext
25 import kotlin.coroutines.EmptyCoroutineContext
26 import kotlin.coroutines.RestrictsSuspension
27 import kotlinx.coroutines.CompletableDeferred
28 import kotlinx.coroutines.CoroutineScope
29 import kotlinx.coroutines.Deferred
30 import kotlinx.coroutines.ExperimentalCoroutinesApi
31 import kotlinx.coroutines.Job
32 import kotlinx.coroutines.awaitCancellation
33 import kotlinx.coroutines.coroutineScope
34 import kotlinx.coroutines.flow.Flow
35 import kotlinx.coroutines.flow.FlowCollector
36 import kotlinx.coroutines.flow.MutableSharedFlow
37 import kotlinx.coroutines.flow.MutableStateFlow
38 import kotlinx.coroutines.flow.SharedFlow
39 import kotlinx.coroutines.flow.StateFlow
40 import kotlinx.coroutines.flow.dropWhile
41 import kotlinx.coroutines.flow.scan
42 import kotlinx.coroutines.launch
43
44 /** A function that modifies the FrpNetwork. */
45 typealias FrpSpec<A> = suspend FrpBuildScope.() -> A
46
47 /**
48 * Constructs an [FrpSpec]. The passed [block] will be invoked with an [FrpBuildScope] that can be
49 * used to perform network-building operations, including adding new inputs and outputs to the
50 * network, as well as all operations available in [FrpTransactionScope].
51 */
52 @ExperimentalFrpApi
53 @Suppress("NOTHING_TO_INLINE")
54 inline fun <A> frpSpec(noinline block: suspend FrpBuildScope.() -> A): FrpSpec<A> = block
55
56 /** Applies the [FrpSpec] within this [FrpBuildScope]. */
57 @ExperimentalFrpApi
58 inline operator fun <A> FrpBuildScope.invoke(block: FrpBuildScope.() -> A) = run(block)
59
60 /** Operations that add inputs and outputs to an FRP network. */
61 @ExperimentalFrpApi
62 @RestrictsSuspension
63 interface FrpBuildScope : FrpStateScope {
64
65 /** TODO: Javadoc */
66 @ExperimentalFrpApi
67 fun <R> deferredBuildScope(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R>
68
69 /** TODO: Javadoc */
70 @ExperimentalFrpApi fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit)
71
72 /**
73 * Returns a [TFlow] containing the results of applying [transform] to each value of the
74 * original [TFlow].
75 *
76 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
77 * Unlike [mapLatestBuild], these modifications are not undone with each subsequent emission of
78 * the original [TFlow].
79 *
80 * **NOTE:** This API does not [observe] the original [TFlow], meaning that unless the returned
81 * (or a downstream) [TFlow] is observed separately, [transform] will not be invoked, and no
82 * internal side-effects will occur.
83 */
84 @ExperimentalFrpApi
85 fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B>
86
87 /**
88 * Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely
89 * performed in reaction to the emission.
90 *
91 * Specifically, [block] is deferred to the end of the transaction, and is only actually
92 * executed if this [FrpBuildScope] is still active by that time. It can be deactivated due to a
93 * -Latest combinator, for example.
94 *
95 * Shorthand for:
96 * ```kotlin
97 * tFlow.observe { effect { ... } }
98 * ```
99 */
100 @ExperimentalFrpApi
101 fun <A> TFlow<A>.observe(
102 coroutineContext: CoroutineContext = EmptyCoroutineContext,
103 block: suspend FrpEffectScope.(A) -> Unit = {},
104 ): Job
105
106 /**
107 * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
108 * [TFlow], and a [FrpDeferredValue] containing the result of applying [initialSpecs]
109 * immediately.
110 *
111 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
112 * key are undone (any registered [observers][observe] are unregistered, and any pending
113 * [side-effects][effect] are cancelled).
114 *
115 * If the [Maybe] contained within the value for an associated key is [none], then the
116 * previously-active [FrpSpec] will be undone with no replacement.
117 */
118 @ExperimentalFrpApi
119 fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
120 initialSpecs: FrpDeferredValue<Map<K, FrpSpec<B>>>,
121 numKeys: Int? = null,
122 ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>>
123
124 /**
125 * Creates an instance of a [TFlow] with elements that are from [builder].
126 *
127 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
128 * provided [MutableTFlow].
129 *
130 * By default, [builder] is only running while the returned [TFlow] is being
131 * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
132 * ```kotlin
133 * tFlow { ... }.apply { observe() }
134 * ```
135 */
136 @ExperimentalFrpApi fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T>
137
138 /**
139 * Creates an instance of a [TFlow] with elements that are emitted from [builder].
140 *
141 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
142 * provided [MutableTFlow].
143 *
144 * By default, [builder] is only running while the returned [TFlow] is being
145 * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
146 * ```kotlin
147 * tFlow { ... }.apply { observe() }
148 * ```
149 *
150 * In the event of backpressure, emissions are *coalesced* into batches. When a value is
151 * [emitted][FrpCoalescingProducerScope.emit] from [builder], it is merged into the batch via
152 * [coalesce]. Once the batch is consumed by the frp network in the next transaction, the batch
153 * is reset back to [getInitialValue].
154 */
155 @ExperimentalFrpApi
156 fun <In, Out> coalescingTFlow(
157 getInitialValue: () -> Out,
158 coalesce: (old: Out, new: In) -> Out,
159 builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
160 ): TFlow<Out>
161
162 /**
163 * Creates a new [FrpBuildScope] that is a child of this one.
164 *
165 * This new scope can be manually cancelled via the returned [Job], or will be cancelled
166 * automatically when its parent is cancelled. Cancellation will unregister all
167 * [observers][observe] and cancel all scheduled [effects][effect].
168 *
169 * The return value from [block] can be accessed via the returned [FrpDeferredValue].
170 */
171 @ExperimentalFrpApi fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job>
172
173 // TODO: once we have context params, these can all become extensions:
174
175 /**
176 * Returns a [TFlow] containing the results of applying the given [transform] function to each
177 * value of the original [TFlow].
178 *
179 * Unlike [TFlow.map], [transform] can perform arbitrary asynchronous code. This code is run
180 * outside of the current FRP transaction; when [transform] returns, the returned value is
181 * emitted from the result [TFlow] in a new transaction.
182 *
183 * Shorthand for:
184 * ```kotlin
185 * tflow.mapLatestBuild { a -> asyncTFlow { transform(a) } }.flatten()
186 * ```
187 */
188 @ExperimentalFrpApi
189 fun <A, B> TFlow<A>.mapAsyncLatest(transform: suspend (A) -> B): TFlow<B> =
190 mapLatestBuild { a -> asyncTFlow { transform(a) } }.flatten()
191
192 /**
193 * Invokes [block] whenever this [TFlow] emits a value. [block] receives an [FrpBuildScope] that
194 * can be used to make further modifications to the FRP network, and/or perform side-effects via
195 * [effect].
196 *
197 * @see observe
198 */
199 @ExperimentalFrpApi
200 fun <A> TFlow<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
201 mapBuild(block).observe()
202
203 /**
204 * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current
205 * [value of this TState][TState.sample], and will emit at the same rate as
206 * [TState.stateChanges].
207 *
208 * Note that the [value][StateFlow.value] is not available until the *end* of the current
209 * transaction. If you need the current value before this time, then use [TState.sample].
210 */
211 @ExperimentalFrpApi
212 fun <A> TState<A>.toStateFlow(): StateFlow<A> {
213 val uninitialized = Any()
214 var initialValue: Any? = uninitialized
215 val innerStateFlow = MutableStateFlow<Any?>(uninitialized)
216 deferredBuildScope {
217 initialValue = sample()
218 stateChanges.observe {
219 innerStateFlow.value = it
220 initialValue = null
221 }
222 }
223
224 @Suppress("UNCHECKED_CAST")
225 fun getValue(innerValue: Any?): A =
226 when {
227 innerValue !== uninitialized -> innerValue as A
228 initialValue !== uninitialized -> initialValue as A
229 else ->
230 error(
231 "Attempted to access StateFlow.value before FRP transaction has completed."
232 )
233 }
234
235 return object : StateFlow<A> {
236 override val replayCache: List<A>
237 get() = innerStateFlow.replayCache.map(::getValue)
238
239 override val value: A
240 get() = getValue(innerStateFlow.value)
241
242 override suspend fun collect(collector: FlowCollector<A>): Nothing {
243 innerStateFlow.collect { collector.emit(getValue(it)) }
244 }
245 }
246 }
247
248 /**
249 * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits the current
250 * [value][TState.sample] of this [TState] followed by all [stateChanges].
251 */
252 @ExperimentalFrpApi
253 fun <A> TState<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
254 val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
255 deferredBuildScope {
256 result.tryEmit(sample())
257 stateChanges.observe { a -> result.tryEmit(a) }
258 }
259 return result
260 }
261
262 /**
263 * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits values
264 * whenever this [TFlow] emits.
265 */
266 @ExperimentalFrpApi
267 fun <A> TFlow<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
268 val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
269 observe { a -> result.tryEmit(a) }
270 return result
271 }
272
273 /**
274 * Returns a [TState] that holds onto the value returned by applying the most recently emitted
275 * [FrpSpec] from the original [TFlow], or the value returned by applying [initialSpec] if
276 * nothing has been emitted since it was constructed.
277 *
278 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
279 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
280 * cancelled).
281 */
282 @ExperimentalFrpApi
283 fun <A> TFlow<FrpSpec<A>>.holdLatestSpec(initialSpec: FrpSpec<A>): TState<A> {
284 val (changes: TFlow<A>, initApplied: FrpDeferredValue<A>) = applyLatestSpec(initialSpec)
285 return changes.holdDeferred(initApplied)
286 }
287
288 /**
289 * Returns a [TState] containing the value returned by applying the [FrpSpec] held by the
290 * original [TState].
291 *
292 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
293 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
294 * cancelled).
295 */
296 @ExperimentalFrpApi
297 fun <A> TState<FrpSpec<A>>.applyLatestSpec(): TState<A> {
298 val (appliedChanges: TFlow<A>, init: FrpDeferredValue<A>) =
299 stateChanges.applyLatestSpec(frpSpec { sample().applySpec() })
300 return appliedChanges.holdDeferred(init)
301 }
302
303 /**
304 * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
305 * [TFlow].
306 *
307 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
308 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
309 * cancelled).
310 */
311 @ExperimentalFrpApi
312 fun <A> TFlow<FrpSpec<A>>.applyLatestSpec(): TFlow<A> = applyLatestSpec(frpSpec {}).first
313
314 /**
315 * Returns a [TFlow] that switches to a new [TFlow] produced by [transform] every time the
316 * original [TFlow] emits a value.
317 *
318 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
319 * When the original [TFlow] emits a new value, those changes are undone (any registered
320 * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
321 */
322 @ExperimentalFrpApi
323 fun <A, B> TFlow<A>.flatMapLatestBuild(
324 transform: suspend FrpBuildScope.(A) -> TFlow<B>
325 ): TFlow<B> = mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten()
326
327 /**
328 * Returns a [TState] by applying [transform] to the value held by the original [TState].
329 *
330 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
331 * When the value held by the original [TState] changes, those changes are undone (any
332 * registered [observers][observe] are unregistered, and any pending [effects][effect] are
333 * cancelled).
334 */
335 @ExperimentalFrpApi
336 fun <A, B> TState<A>.flatMapLatestBuild(
337 transform: suspend FrpBuildScope.(A) -> TState<B>
338 ): TState<B> = mapLatestBuild { transform(it) }.flatten()
339
340 /**
341 * Returns a [TState] that transforms the value held inside this [TState] by applying it to the
342 * [transform].
343 *
344 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
345 * When the value held by the original [TState] changes, those changes are undone (any
346 * registered [observers][observe] are unregistered, and any pending [effects][effect] are
347 * cancelled).
348 */
349 @ExperimentalFrpApi
350 fun <A, B> TState<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TState<B> =
351 mapCheapUnsafe { frpSpec { transform(it) } }.applyLatestSpec()
352
353 /**
354 * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
355 * [TFlow], and a [FrpDeferredValue] containing the result of applying [initialSpec]
356 * immediately.
357 *
358 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] are undone (any
359 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
360 * cancelled).
361 */
362 @ExperimentalFrpApi
363 fun <A : Any?, B> TFlow<FrpSpec<B>>.applyLatestSpec(
364 initialSpec: FrpSpec<A>
365 ): Pair<TFlow<B>, FrpDeferredValue<A>> {
366 val (flow, result) =
367 mapCheap { spec -> mapOf(Unit to just(spec)) }
368 .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1)
369 val outFlow: TFlow<B> =
370 flow.mapMaybe {
371 checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
372 }
373 val outInit: FrpDeferredValue<A> = deferredBuildScope {
374 val initResult: Map<Unit, A> = result.get()
375 check(Unit in initResult) {
376 "applyLatest: expected initial result, but none present in: $initResult"
377 }
378 @Suppress("UNCHECKED_CAST")
379 initResult.getOrDefault(Unit) { null } as A
380 }
381 return Pair(outFlow, outInit)
382 }
383
384 /**
385 * Returns a [TFlow] containing the results of applying [transform] to each value of the
386 * original [TFlow].
387 *
388 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
389 * With each invocation of [transform], changes from the previous invocation are undone (any
390 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
391 * cancelled).
392 */
393 @ExperimentalFrpApi
394 fun <A, B> TFlow<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> =
395 mapCheap { frpSpec { transform(it) } }.applyLatestSpec()
396
397 /**
398 * Returns a [TFlow] containing the results of applying [transform] to each value of the
399 * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
400 * [initialValue] immediately.
401 *
402 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
403 * With each invocation of [transform], changes from the previous invocation are undone (any
404 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
405 * cancelled).
406 */
407 @ExperimentalFrpApi
408 fun <A, B> TFlow<A>.mapLatestBuild(
409 initialValue: A,
410 transform: suspend FrpBuildScope.(A) -> B,
411 ): Pair<TFlow<B>, FrpDeferredValue<B>> =
412 mapLatestBuildDeferred(deferredOf(initialValue), transform)
413
414 /**
415 * Returns a [TFlow] containing the results of applying [transform] to each value of the
416 * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
417 * [initialValue] immediately.
418 *
419 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
420 * With each invocation of [transform], changes from the previous invocation are undone (any
421 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
422 * cancelled).
423 */
424 @ExperimentalFrpApi
425 fun <A, B> TFlow<A>.mapLatestBuildDeferred(
426 initialValue: FrpDeferredValue<A>,
427 transform: suspend FrpBuildScope.(A) -> B,
428 ): Pair<TFlow<B>, FrpDeferredValue<B>> =
429 mapCheap { frpSpec { transform(it) } }
430 .applyLatestSpec(initialSpec = frpSpec { transform(initialValue.get()) })
431
432 /**
433 * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
434 * [TFlow], and a [FrpDeferredValue] containing the result of applying [initialSpecs]
435 * immediately.
436 *
437 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
438 * key are undone (any registered [observers][observe] are unregistered, and any pending
439 * [side-effects][effect] are cancelled).
440 *
441 * If the [Maybe] contained within the value for an associated key is [none], then the
442 * previously-active [FrpSpec] will be undone with no replacement.
443 */
444 @ExperimentalFrpApi
445 fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
446 initialSpecs: Map<K, FrpSpec<B>>,
447 numKeys: Int? = null,
448 ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> =
449 applyLatestSpecForKey(deferredOf(initialSpecs), numKeys)
450
451 /**
452 * Returns a [TFlow] containing the results of applying each [FrpSpec] emitted from the original
453 * [TFlow].
454 *
455 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
456 * key are undone (any registered [observers][observe] are unregistered, and any pending
457 * [side-effects][effect] are cancelled).
458 *
459 * If the [Maybe] contained within the value for an associated key is [none], then the
460 * previously-active [FrpSpec] will be undone with no replacement.
461 */
462 @ExperimentalFrpApi
463 fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
464 numKeys: Int? = null
465 ): TFlow<Map<K, Maybe<A>>> =
466 applyLatestSpecForKey<K, A, Nothing>(deferredOf(emptyMap()), numKeys).first
467
468 /**
469 * Returns a [TState] containing the latest results of applying each [FrpSpec] emitted from the
470 * original [TFlow].
471 *
472 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
473 * key are undone (any registered [observers][observe] are unregistered, and any pending
474 * [side-effects][effect] are cancelled).
475 *
476 * If the [Maybe] contained within the value for an associated key is [none], then the
477 * previously-active [FrpSpec] will be undone with no replacement.
478 */
479 @ExperimentalFrpApi
480 fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.holdLatestSpecForKey(
481 initialSpecs: FrpDeferredValue<Map<K, FrpSpec<A>>>,
482 numKeys: Int? = null,
483 ): TState<Map<K, A>> {
484 val (changes, initialValues) = applyLatestSpecForKey(initialSpecs, numKeys)
485 return changes.foldMapIncrementally(initialValues)
486 }
487
488 /**
489 * Returns a [TState] containing the latest results of applying each [FrpSpec] emitted from the
490 * original [TFlow].
491 *
492 * When each [FrpSpec] is applied, changes from the previously-active [FrpSpec] with the same
493 * key are undone (any registered [observers][observe] are unregistered, and any pending
494 * [side-effects][effect] are cancelled).
495 *
496 * If the [Maybe] contained within the value for an associated key is [none], then the
497 * previously-active [FrpSpec] will be undone with no replacement.
498 */
499 @ExperimentalFrpApi
500 fun <K, A> TFlow<Map<K, Maybe<FrpSpec<A>>>>.holdLatestSpecForKey(
501 initialSpecs: Map<K, FrpSpec<A>> = emptyMap(),
502 numKeys: Int? = null,
503 ): TState<Map<K, A>> = holdLatestSpecForKey(deferredOf(initialSpecs), numKeys)
504
505 /**
506 * Returns a [TFlow] containing the results of applying [transform] to each value of the
507 * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
508 * [initialValues] immediately.
509 *
510 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
511 * With each invocation of [transform], changes from the previous invocation are undone (any
512 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
513 * cancelled).
514 *
515 * If the [Maybe] contained within the value for an associated key is [none], then the
516 * previously-active [FrpBuildScope] will be undone with no replacement.
517 */
518 @ExperimentalFrpApi
519 fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
520 initialValues: FrpDeferredValue<Map<K, A>>,
521 numKeys: Int? = null,
522 transform: suspend FrpBuildScope.(A) -> B,
523 ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
524 map { patch -> patch.mapValues { (_, v) -> v.map { frpSpec { transform(it) } } } }
525 .applyLatestSpecForKey(
526 deferredBuildScope {
527 initialValues.get().mapValues { (_, v) -> frpSpec { transform(v) } }
528 },
529 numKeys = numKeys,
530 )
531
532 /**
533 * Returns a [TFlow] containing the results of applying [transform] to each value of the
534 * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
535 * [initialValues] immediately.
536 *
537 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
538 * With each invocation of [transform], changes from the previous invocation are undone (any
539 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
540 * cancelled).
541 *
542 * If the [Maybe] contained within the value for an associated key is [none], then the
543 * previously-active [FrpBuildScope] will be undone with no replacement.
544 */
545 @ExperimentalFrpApi
546 fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
547 initialValues: Map<K, A>,
548 numKeys: Int? = null,
549 transform: suspend FrpBuildScope.(A) -> B,
550 ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
551 mapLatestBuildForKey(deferredOf(initialValues), numKeys, transform)
552
553 /**
554 * Returns a [TFlow] containing the results of applying [transform] to each value of the
555 * original [TFlow].
556 *
557 * [transform] can perform modifications to the FRP network via its [FrpBuildScope] receiver.
558 * With each invocation of [transform], changes from the previous invocation are undone (any
559 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
560 * cancelled).
561 *
562 * If the [Maybe] contained within the value for an associated key is [none], then the
563 * previously-active [FrpBuildScope] will be undone with no replacement.
564 */
565 @ExperimentalFrpApi
566 fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
567 numKeys: Int? = null,
568 transform: suspend FrpBuildScope.(A) -> B,
569 ): TFlow<Map<K, Maybe<B>>> = mapLatestBuildForKey(emptyMap(), numKeys, transform).first
570
571 /** Returns a [Deferred] containing the next value to be emitted from this [TFlow]. */
572 @ExperimentalFrpApi
573 fun <R> TFlow<R>.nextDeferred(): Deferred<R> {
574 lateinit var next: CompletableDeferred<R>
575 val job = nextOnly().observe { next.complete(it) }
576 next = CompletableDeferred<R>(parent = job)
577 return next
578 }
579
580 /** Returns a [TState] that reflects the [StateFlow.value] of this [StateFlow]. */
581 @ExperimentalFrpApi
582 fun <A> StateFlow<A>.toTState(): TState<A> {
583 val initial = value
584 return tFlow { dropWhile { it == initial }.collect { emit(it) } }.hold(initial)
585 }
586
587 /** Returns a [TFlow] that emits whenever this [Flow] emits. */
588 @ExperimentalFrpApi fun <A> Flow<A>.toTFlow(): TFlow<A> = tFlow { collect { emit(it) } }
589
590 /**
591 * Shorthand for:
592 * ```kotlin
593 * flow.toTFlow().hold(initialValue)
594 * ```
595 */
596 @ExperimentalFrpApi
597 fun <A> Flow<A>.toTState(initialValue: A): TState<A> = toTFlow().hold(initialValue)
598
599 /**
600 * Shorthand for:
601 * ```kotlin
602 * flow.scan(initialValue, operation).toTFlow().hold(initialValue)
603 * ```
604 */
605 @ExperimentalFrpApi
606 fun <A, B> Flow<A>.scanToTState(initialValue: B, operation: (B, A) -> B): TState<B> =
607 scan(initialValue, operation).toTFlow().hold(initialValue)
608
609 /**
610 * Shorthand for:
611 * ```kotlin
612 * flow.scan(initialValue) { a, f -> f(a) }.toTFlow().hold(initialValue)
613 * ```
614 */
615 @ExperimentalFrpApi
616 fun <A> Flow<(A) -> A>.scanToTState(initialValue: A): TState<A> =
617 scanToTState(initialValue) { a, f -> f(a) }
618
619 /**
620 * Invokes [block] whenever this [TFlow] emits a value. [block] receives an [FrpBuildScope] that
621 * can be used to make further modifications to the FRP network, and/or perform side-effects via
622 * [effect].
623 *
624 * With each invocation of [block], changes from the previous invocation are undone (any
625 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
626 * cancelled).
627 */
628 @ExperimentalFrpApi
629 fun <A> TFlow<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
630 mapLatestBuild { block(it) }.observe()
631
632 /**
633 * Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely
634 * performed in reaction to the emission.
635 *
636 * With each invocation of [block], running effects from the previous invocation are cancelled.
637 */
638 @ExperimentalFrpApi
639 fun <A> TFlow<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job {
640 var innerJob: Job? = null
641 return observeBuild {
642 innerJob?.cancel()
643 innerJob = effect { block(it) }
644 }
645 }
646
647 /**
648 * Invokes [block] with the value held by this [TState], allowing side-effects to be safely
649 * performed in reaction to the state changing.
650 *
651 * With each invocation of [block], running effects from the previous invocation are cancelled.
652 */
653 @ExperimentalFrpApi
654 fun <A> TState<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job =
655 launchScope {
656 var innerJob = effect { block(sample()) }
657 stateChanges.observeBuild {
658 innerJob.cancel()
659 innerJob = effect { block(it) }
660 }
661 }
662
663 /**
664 * Applies [block] to the value held by this [TState]. [block] receives an [FrpBuildScope] that
665 * can be used to make further modifications to the FRP network, and/or perform side-effects via
666 * [effect].
667 *
668 * [block] can perform modifications to the FRP network via its [FrpBuildScope] receiver. With
669 * each invocation of [block], changes from the previous invocation are undone (any registered
670 * [observers][observe] are unregistered, and any pending [side-effects][effect] are cancelled).
671 */
672 @ExperimentalFrpApi
673 fun <A> TState<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
674 launchScope {
675 var innerJob: Job = launchScope { block(sample()) }
676 stateChanges.observeBuild {
677 innerJob.cancel()
678 innerJob = launchScope { block(it) }
679 }
680 }
681
682 /** Applies the [FrpSpec] within this [FrpBuildScope]. */
683 @ExperimentalFrpApi suspend fun <A> FrpSpec<A>.applySpec(): A = this()
684
685 /**
686 * Applies the [FrpSpec] within this [FrpBuildScope], returning the result as an
687 * [FrpDeferredValue].
688 */
689 @ExperimentalFrpApi
690 fun <A> FrpSpec<A>.applySpecDeferred(): FrpDeferredValue<A> = deferredBuildScope { applySpec() }
691
692 /**
693 * Invokes [block] on the value held in this [TState]. [block] receives an [FrpBuildScope] that
694 * can be used to make further modifications to the FRP network, and/or perform side-effects via
695 * [effect].
696 */
697 @ExperimentalFrpApi
698 fun <A> TState<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
699 launchScope {
700 block(sample())
701 stateChanges.observeBuild(block)
702 }
703
704 /**
705 * Invokes [block] with the current value of this [TState], re-invoking whenever it changes,
706 * allowing side-effects to be safely performed in reaction value changing.
707 *
708 * Specifically, [block] is deferred to the end of the transaction, and is only actually
709 * executed if this [FrpBuildScope] is still active by that time. It can be deactivated due to a
710 * -Latest combinator, for example.
711 *
712 * If the [TState] is changing within the *current* transaction (i.e. [stateChanges] is
713 * presently emitting) then [block] will be invoked for the first time with the new value;
714 * otherwise, it will be invoked with the [current][sample] value.
715 */
716 @ExperimentalFrpApi
717 fun <A> TState<A>.observe(block: suspend FrpEffectScope.(A) -> Unit = {}): Job =
718 now.map { sample() }.mergeWith(stateChanges) { _, new -> new }.observe { block(it) }
719 }
720
721 /**
722 * Returns a [TFlow] that emits the result of [block] once it completes. [block] is evaluated
723 * outside of the current FRP transaction; when it completes, the returned [TFlow] emits in a new
724 * transaction.
725 *
726 * Shorthand for:
727 * ```
728 * tFlow { emitter: MutableTFlow<A> ->
729 * val a = block()
730 * emitter.emit(a)
731 * }
732 * ```
733 */
734 @ExperimentalFrpApi
asyncTFlownull735 fun <A> FrpBuildScope.asyncTFlow(block: suspend () -> A): TFlow<A> =
736 tFlow {
737 // TODO: if block completes synchronously, it would be nice to emit within this
738 // transaction
739 emit(block())
740 }
<lambda>null741 .apply { observe() }
742
743 /**
744 * Performs a side-effect in a safe manner w/r/t the current FRP transaction.
745 *
746 * Specifically, [block] is deferred to the end of the current transaction, and is only actually
747 * executed if this [FrpBuildScope] is still active by that time. It can be deactivated due to a
748 * -Latest combinator, for example.
749 *
750 * Shorthand for:
751 * ```kotlin
752 * now.observe { block() }
753 * ```
754 */
755 @ExperimentalFrpApi
<lambda>null756 fun FrpBuildScope.effect(block: suspend FrpEffectScope.() -> Unit): Job = now.observe { block() }
757
758 /**
759 * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
760 *
761 * This coroutine is not actually started until the *end* of the current FRP transaction. This is
762 * done because the current [FrpBuildScope] might be deactivated within this transaction, perhaps
763 * due to a -Latest combinator. If this happens, then the coroutine will never actually be started.
764 *
765 * Shorthand for:
766 * ```kotlin
767 * effect { frpCoroutineScope.launch { block() } }
768 * ```
769 */
770 @ExperimentalFrpApi
launchEffectnull771 fun FrpBuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asyncEffect(block)
772
773 /**
774 * Launches [block] in a new coroutine, returning the result as a [Deferred].
775 *
776 * This coroutine is not actually started until the *end* of the current FRP transaction. This is
777 * done because the current [FrpBuildScope] might be deactivated within this transaction, perhaps
778 * due to a -Latest combinator. If this happens, then the coroutine will never actually be started.
779 *
780 * Shorthand for:
781 * ```kotlin
782 * CompletableDeferred<R>.apply {
783 * effect { frpCoroutineScope.launch { complete(coroutineScope { block() }) } }
784 * }
785 * .await()
786 * ```
787 */
788 @ExperimentalFrpApi
789 fun <R> FrpBuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
790 val result = CompletableDeferred<R>()
791 val job = now.observe { frpCoroutineScope.launch { result.complete(coroutineScope(block)) } }
792 val handle = job.invokeOnCompletion { result.cancel() }
793 result.invokeOnCompletion {
794 handle.dispose()
795 job.cancel()
796 }
797 return result
798 }
799
800 /** Like [FrpBuildScope.asyncScope], but ignores the result of [block]. */
launchScopenull801 @ExperimentalFrpApi fun FrpBuildScope.launchScope(block: FrpSpec<*>): Job = asyncScope(block).second
802
803 /**
804 * Creates an instance of a [TFlow] with elements that are emitted from [builder].
805 *
806 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the provided
807 * [MutableTFlow].
808 *
809 * By default, [builder] is only running while the returned [TFlow] is being
810 * [observed][FrpBuildScope.observe]. If you want it to run at all times, simply add a no-op
811 * observer:
812 * ```kotlin
813 * tFlow { ... }.apply { observe() }
814 * ```
815 *
816 * In the event of backpressure, emissions are *coalesced* into batches. When a value is
817 * [emitted][FrpCoalescingProducerScope.emit] from [builder], it is merged into the batch via
818 * [coalesce]. Once the batch is consumed by the FRP network in the next transaction, the batch is
819 * reset back to [initialValue].
820 */
821 @ExperimentalFrpApi
822 fun <In, Out> FrpBuildScope.coalescingTFlow(
823 initialValue: Out,
824 coalesce: (old: Out, new: In) -> Out,
825 builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
826 ): TFlow<Out> = coalescingTFlow(getInitialValue = { initialValue }, coalesce, builder)
827
828 /**
829 * Creates an instance of a [TFlow] with elements that are emitted from [builder].
830 *
831 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the provided
832 * [MutableTFlow].
833 *
834 * By default, [builder] is only running while the returned [TFlow] is being
835 * [observed][FrpBuildScope.observe]. If you want it to run at all times, simply add a no-op
836 * observer:
837 * ```kotlin
838 * tFlow { ... }.apply { observe() }
839 * ```
840 *
841 * In the event of backpressure, emissions are *conflated*; any older emissions are dropped and only
842 * the most recent emission will be used when the FRP network is ready.
843 */
844 @ExperimentalFrpApi
conflatedTFlownull845 fun <T> FrpBuildScope.conflatedTFlow(
846 builder: suspend FrpCoalescingProducerScope<T>.() -> Unit
847 ): TFlow<T> =
848 coalescingTFlow<T, Any?>(initialValue = Any(), coalesce = { _, new -> new }, builder = builder)
<lambda>null849 .mapCheap {
850 @Suppress("UNCHECKED_CAST")
851 it as T
852 }
853
854 /** Scope for emitting to a [FrpBuildScope.coalescingTFlow]. */
855 interface FrpCoalescingProducerScope<in T> {
856 /**
857 * Inserts [value] into the current batch, enqueueing it for emission from this [TFlow] if not
858 * already pending.
859 *
860 * Backpressure occurs when [emit] is called while the FRP network is currently in a
861 * transaction; if called multiple times, then emissions will be coalesced into a single batch
862 * that is then processed when the network is ready.
863 */
emitnull864 fun emit(value: T)
865 }
866
867 /** Scope for emitting to a [FrpBuildScope.tFlow]. */
868 interface FrpProducerScope<in T> {
869 /**
870 * Emits a [value] to this [TFlow], suspending the caller until the FRP transaction containing
871 * the emission has completed.
872 */
873 suspend fun emit(value: T)
874 }
875
876 /**
877 * Suspends forever. Upon cancellation, runs [block]. Useful for unregistering callbacks inside of
878 * [FrpBuildScope.tFlow] and [FrpBuildScope.coalescingTFlow].
879 */
awaitClosenull880 suspend fun awaitClose(block: () -> Unit): Nothing =
881 try {
882 awaitCancellation()
883 } finally {
884 block()
885 }
886