/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.systemui.kairos

import com.android.systemui.kairos.combine as combinePure
import com.android.systemui.kairos.map as mapPure
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Left
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.Right
import com.android.systemui.kairos.util.WithPrev
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.map
import com.android.systemui.kairos.util.none
import com.android.systemui.kairos.util.partitionEithers
import com.android.systemui.kairos.util.zipWith
import kotlin.coroutines.RestrictsSuspension

/** A suspending computation that runs with an [FrpStateScope] receiver and produces an [R]. */
typealias FrpStateful<R> = suspend FrpStateScope.() -> R

/**
 * Returns a [FrpStateful] that, when [applied][FrpStateScope.applyStateful], invokes [block] with
 * the applier's [FrpStateScope].
 */
// TODO: caching story? should each Scope have a cache of applied FrpStateful instances?
@ExperimentalFrpApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> statefully(noinline block: suspend FrpStateScope.() -> A): FrpStateful<A> = block

/**
 * Operations that accumulate state within the FRP network.
 *
 * State accumulation is an ongoing process that has a lifetime. Use `-Latest` combinators, such as
 * [mapLatestStateful], to create smaller, nested lifecycles so that accumulation isn't running
 * longer than needed.
 */
@ExperimentalFrpApi
@RestrictsSuspension
interface FrpStateScope : FrpTransactionScope {

    /**
     * Returns a [FrpDeferredValue] for the result of invoking [block] with a [FrpStateScope]
     * receiver.
     *
     * TODO: document exactly when [block] is evaluated.
     */
    @ExperimentalFrpApi
    // TODO: wish this could just be `deferred` but alas
    fun <A> deferredStateScope(block: suspend FrpStateScope.() -> A): FrpDeferredValue<A>

    /**
     * Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or
     * [initialValue] if nothing has been emitted since it was constructed.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     */
    @ExperimentalFrpApi fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A>

    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     * fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
     *     initialTFlows: Map<K, TFlow<V>>,
     * ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switch()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     * fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
     *     initialTFlows: Map<K, TFlow<V>>,
     * ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switchPromptly()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

    // TODO: everything below this comment can be made into extensions once we have context params

    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     * fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
     *     initialTFlows: Map<K, TFlow<V>>,
     * ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switch()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows))

    /**
     * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
     * emitted from this, following the same "patch" rules as outlined in [foldMapIncrementally].
     *
     * Conceptually this is equivalent to:
     * ```kotlin
     * fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
     *     initialTFlows: Map<K, TFlow<V>>,
     * ): TFlow<Map<K, V>> =
     *     foldMapIncrementally(initialTFlows).map { it.merge() }.switchPromptly()
     * ```
     *
     * While the behavior is equivalent to the conceptual definition above, the implementation is
     * significantly more efficient.
     *
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows))

    /** Applies the [FrpStateful] within this [FrpStateScope]. */
    @ExperimentalFrpApi suspend fun <A> FrpStateful<A>.applyStateful(): A = this()

    /**
     * Applies the [FrpStateful] within this [FrpStateScope], returning the result as an
     * [FrpDeferredValue].
     */
    @ExperimentalFrpApi
    fun <A> FrpStateful<A>.applyStatefulDeferred(): FrpDeferredValue<A> = deferredStateScope {
        applyStateful()
    }

    /**
     * Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or
     * [initialValue] if nothing has been emitted since it was constructed.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.hold(initialValue: A): TState<A> = holdDeferred(deferredOf(initialValue))

    /**
     * Returns a [TFlow] that emits the result of applying [FrpStatefuls][FrpStateful] emitted from
     * the original [TFlow].
     *
     * Unlike [applyLatestStateful], state accumulation is not stopped with each subsequent emission
     * of the original [TFlow].
     */
    @ExperimentalFrpApi fun <A> TFlow<FrpStateful<A>>.applyStatefuls(): TFlow<A>

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow].
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. Unlike
     * [mapLatestStateful], accumulation is not stopped with each subsequent emission of the
     * original [TFlow].
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.mapStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
        mapPure { statefully { transform(it) } }.applyStatefuls()

    /**
     * Returns a [TState] that holds the result of applying the [FrpStateful] held by the original
     * [TState].
     *
     * Unlike [applyLatestStateful], state accumulation is not stopped with each state change.
     */
    @ExperimentalFrpApi
    fun <A> TState<FrpStateful<A>>.applyStatefuls(): TState<A> =
        stateChanges
            .applyStatefuls()
            .holdDeferred(initialValue = deferredStateScope { sampleDeferred().get()() })

    /** Returns a [TFlow] that switches to the [TFlow] emitted by the original [TFlow]. */
    @ExperimentalFrpApi fun <A> TFlow<TFlow<A>>.flatten() = hold(emptyTFlow).switch()

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow].
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
     * invocation of [transform], state accumulation from previous invocation is stopped.
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.mapLatestStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
        mapPure { statefully { transform(it) } }.applyLatestStateful()

    /**
     * Returns a [TFlow] that switches to a new [TFlow] produced by [transform] every time the
     * original [TFlow] emits a value.
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
     * invocation of [transform], state accumulation from previous invocation is stopped.
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.flatMapLatestStateful(
        transform: suspend FrpStateScope.(A) -> TFlow<B>
    ): TFlow<B> = mapLatestStateful(transform).flatten()

    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow].
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] is stopped.
     */
    @ExperimentalFrpApi
    fun <A> TFlow<FrpStateful<A>>.applyLatestStateful(): TFlow<A> = applyLatestStateful {}.first

    /**
     * Returns a [TState] containing the value returned by applying the [FrpStateful] held by the
     * original [TState].
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] is stopped.
     */
    @ExperimentalFrpApi
    fun <A> TState<FrpStateful<A>>.applyLatestStateful(): TState<A> {
        val (changes, init) = stateChanges.applyLatestStateful { sample()() }
        return changes.holdDeferred(init)
    }

    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
     * immediately.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] is stopped.
     *
     * Implemented on top of the keyed variant by using [Unit] as the single key.
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<FrpStateful<B>>.applyLatestStateful(
        init: FrpStateful<A>
    ): Pair<TFlow<B>, FrpDeferredValue<A>> {
        val (flow, result) =
            mapCheap { spec -> mapOf(Unit to just(spec)) }
                .applyLatestStatefulForKey(init = mapOf(Unit to init), numKeys = 1)
        val outFlow: TFlow<B> =
            flow.mapMaybe {
                checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
            }
        val outInit: FrpDeferredValue<A> = deferredTransactionScope {
            val initResult: Map<Unit, A> = result.get()
            check(Unit in initResult) {
                "applyLatest: expected initial result, but none present in: $initResult"
            }
            // Presence was verified above, so a typed lookup suffices; no placeholder default or
            // unchecked cast is needed.
            initResult.getValue(Unit)
        }
        return Pair(outFlow, outInit)
    }

    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
     * immediately.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
        init: FrpDeferredValue<Map<K, FrpStateful<B>>>,
        numKeys: Int? = null,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>>

    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [init]
     * immediately.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
        init: Map<K, FrpStateful<B>>,
        numKeys: Int? = null,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> =
        applyLatestStatefulForKey(deferredOf(init), numKeys)

    /**
     * Returns a [TState] containing the latest results of applying each [FrpStateful] emitted from
     * the original [TFlow].
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.holdLatestStatefulForKey(
        init: FrpDeferredValue<Map<K, FrpStateful<A>>>,
        numKeys: Int? = null,
    ): TState<Map<K, A>> {
        val (changes, initialValues) = applyLatestStatefulForKey(init, numKeys)
        return changes.foldMapIncrementally(initialValues)
    }

    /**
     * Returns a [TState] containing the latest results of applying each [FrpStateful] emitted from
     * the original [TFlow].
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.holdLatestStatefulForKey(
        init: Map<K, FrpStateful<A>> = emptyMap(),
        numKeys: Int? = null,
    ): TState<Map<K, A>> = holdLatestStatefulForKey(deferredOf(init), numKeys)

    /**
     * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
     * original [TFlow]. No [FrpStateful]s are active initially.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] with the same key is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateful] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
        numKeys: Int? = null
    ): TFlow<Map<K, Maybe<A>>> =
        applyLatestStatefulForKey(init = emptyMap<K, FrpStateful<*>>(), numKeys = numKeys).first

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
     * [initialValues] immediately.
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
     * invocation of [transform], state accumulation from previous invocation is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateScope] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
        initialValues: FrpDeferredValue<Map<K, A>>,
        numKeys: Int? = null,
        transform: suspend FrpStateScope.(A) -> B,
    ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
        mapPure { patch -> patch.mapValues { (_, v) -> v.map { statefully { transform(it) } } } }
            .applyLatestStatefulForKey(
                deferredStateScope {
                    initialValues.get().mapValues { (_, v) -> statefully { transform(v) } }
                },
                numKeys = numKeys,
            )

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow], and a [FrpDeferredValue] containing the result of applying [transform] to
     * [initialValues] immediately.
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
     * invocation of [transform], state accumulation from previous invocation is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateScope] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
        initialValues: Map<K, A>,
        numKeys: Int? = null,
        transform: suspend FrpStateScope.(A) -> B,
    ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
        mapLatestStatefulForKey(deferredOf(initialValues), numKeys, transform)

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow].
     *
     * [transform] can perform state accumulation via its [FrpStateScope] receiver. With each
     * invocation of [transform], state accumulation from previous invocation is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [none], then the
     * previously-active [FrpStateScope] will be stopped with no replacement.
     */
    @ExperimentalFrpApi
    fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
        numKeys: Int? = null,
        transform: suspend FrpStateScope.(A) -> B,
    ): TFlow<Map<K, Maybe<B>>> = mapLatestStatefulForKey(emptyMap(), numKeys, transform).first

    /**
     * Returns a [TFlow] that will only emit the next event of the original [TFlow], and then will
     * act as [emptyTFlow].
     *
     * If the original [TFlow] is emitting an event at this exact time, then it will be the only
     * event emitted from the result [TFlow].
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.nextOnly(): TFlow<A> =
        if (this === emptyTFlow) {
            this
        } else {
            TFlowLoop<A>().also {
                // The loop starts out switched to the original flow; its own first emission
                // replaces the held flow with emptyTFlow.
                it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch()
            }
        }

    /** Returns a [TFlow] that skips the next emission of the original [TFlow]. */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.skipNext(): TFlow<A> =
        if (this === emptyTFlow) {
            this
        } else {
            nextOnly().mapCheap { this@skipNext }.hold(emptyTFlow).switch()
        }

    /**
     * Returns a [TFlow] that emits values from the original [TFlow] up until [stop] emits a value.
     *
     * If the original [TFlow] emits at the same time as [stop], then the returned [TFlow] will emit
     * that value.
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.takeUntil(stop: TFlow<*>): TFlow<A> =
        if (stop === emptyTFlow) {
            this
        } else {
            stop.mapCheap { emptyTFlow }.nextOnly().hold(this).switch()
        }

    /**
     * Invokes [stateful] in a new [FrpStateScope] that is a child of this one.
     *
     * This new scope is stopped when [stop] first emits a value, or when the parent scope is
     * stopped. Stopping will end all state accumulation; any [TStates][TState] returned from this
     * scope will no longer update.
     */
    @ExperimentalFrpApi
    fun <A> childStateScope(stop: TFlow<*>, stateful: FrpStateful<A>): FrpDeferredValue<A> {
        val (_, init: FrpDeferredValue<Map<Unit, A>>) =
            stop
                .nextOnly()
                .mapPure { mapOf(Unit to none<FrpStateful<A>>()) }
                .applyLatestStatefulForKey(init = mapOf(Unit to stateful), numKeys = 1)
        return deferredStateScope { init.get().getValue(Unit) }
    }

    /**
     * Returns a [TFlow] that emits values from the original [TFlow] up to and including the first
     * value that satisfies [predicate].
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.takeUntil(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> =
        takeUntil(filter(predicate))

    /**
     * Returns a [TState] that is incrementally updated when this [TFlow] emits a value, by applying
     * [transform] to both the emitted value and the currently tracked state.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.fold(
        initialValue: B,
        transform: suspend FrpTransactionScope.(A, B) -> B,
    ): TState<B> {
        // The mapping lambda refers to the very state being built, so the reference is bound
        // after hold() returns.
        lateinit var state: TState<B>
        return mapPure { a -> transform(a, state.sample()) }.hold(initialValue).also { state = it }
    }

    /**
     * Returns a [TState] that is incrementally updated when this [TFlow] emits a value, by applying
     * [transform] to both the emitted value and the currently tracked state.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.foldDeferred(
        initialValue: FrpDeferredValue<B>,
        transform: suspend FrpTransactionScope.(A, B) -> B,
    ): TState<B> {
        // Same self-reference as in fold: bound once holdDeferred() has produced the state.
        lateinit var state: TState<B>
        return mapPure { a -> transform(a, state.sample()) }
            .holdDeferred(initialValue)
            .also { state = it }
    }

    /**
     * Returns a [TState] that holds onto the result of applying the most recently emitted
     * [FrpStateful] from this [TFlow], or the result of applying [init] if nothing has been
     * emitted since it was constructed.
     *
     * When each [FrpStateful] is applied, state accumulation from the previously-active
     * [FrpStateful] is stopped.
     *
     * Note that the value contained within the [TState] is not updated until *after* all [TFlow]s
     * have been processed; this keeps the value of the [TState] consistent during the entire FRP
     * transaction.
     *
     * Shorthand for:
     * ```kotlin
     * val (changes, initApplied) = applyLatestStateful(init)
     * return changes.holdDeferred(initApplied)
     * ```
     */
    @ExperimentalFrpApi
    fun <A> TFlow<FrpStateful<A>>.holdLatestStateful(init: FrpStateful<A>): TState<A> {
        val (changes, initApplied) = applyLatestStateful(init)
        return changes.holdDeferred(initApplied)
    }

    /**
     * Returns a [TFlow] that emits the two most recent emissions from the original [TFlow].
     * [initialValue] is used as the previous value for the first emission.
     *
     * Shorthand for `sample(hold(initialValue)) { new, old -> WithPrev(old, new) }`
     */
    @ExperimentalFrpApi
    fun <S, T : S> TFlow<T>.pairwise(initialValue: S): TFlow<WithPrev<S, T>> {
        val previous = hold(initialValue)
        return mapCheap { new -> WithPrev(previousValue = previous.sample(), newValue = new) }
    }

    /**
     * Returns a [TFlow] that emits the two most recent emissions from the original [TFlow]. Note
     * that the returned [TFlow] will not emit until the original [TFlow] has emitted twice.
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.pairwise(): TFlow<WithPrev<A, A>> =
        mapCheap { just(it) }
            .pairwise(none)
            .mapMaybe { (prev, next) -> prev.zipWith(next, ::WithPrev) }

    /**
     * Returns a [TState] that holds both the current and previous values of the original [TState].
     * [initialPreviousValue] is used as the first previous value.
     */
    @ExperimentalFrpApi
    fun <S, T : S> TState<T>.pairwise(initialPreviousValue: S): TState<WithPrev<S, T>> =
        stateChanges
            .pairwise(initialPreviousValue)
            .holdDeferred(deferredTransactionScope { WithPrev(initialPreviousValue, sample()) })

    /**
     * Returns a [TState] holding a [Map] that is updated incrementally whenever this emits a value.
     *
     * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
     * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and
     * an associated value of [none] will remove the key from the tracked [Map].
     */
    @ExperimentalFrpApi
    fun <K, V> TFlow<Map<K, Maybe<V>>>.foldMapIncrementally(
        initialValues: FrpDeferredValue<Map<K, V>>
    ): TState<Map<K, V>> =
        foldDeferred(initialValues) { patch, map ->
            // Split the patch into insertions/replacements (Just) and removals (anything else).
            val (adds: List<Pair<K, V>>, removes: List<K>) =
                patch
                    .asSequence()
                    .map { (k, v) -> if (v is Just) Left(k to v.value) else Right(k) }
                    .partitionEithers()
            val removed: Map<K, V> = map - removes.toSet()
            val updated: Map<K, V> = removed + adds
            updated
        }

    /**
     * Returns a [TState] holding a [Map] that is updated incrementally whenever this emits a value.
     *
     * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
     * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and
     * an associated value of [none] will remove the key from the tracked [Map].
     */
    @ExperimentalFrpApi
    fun <K, V> TFlow<Map<K, Maybe<V>>>.foldMapIncrementally(
        initialValues: Map<K, V> = emptyMap()
    ): TState<Map<K, V>> = foldMapIncrementally(deferredOf(initialValues))

    /**
     * Returns a [TFlow] that wraps each emission of the original [TFlow] into an [IndexedValue],
     * containing the emitted value and its index (starting from zero).
     *
     * Shorthand for:
     * ```
     * val index = fold(0) { _, oldIdx -> oldIdx + 1 }
     * sample(index) { a, idx -> IndexedValue(idx, a) }
     * ```
     */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.withIndex(): TFlow<IndexedValue<A>> {
        val index = fold(0) { _, old -> old + 1 }
        return sample(index) { a, idx -> IndexedValue(idx, a) }
    }

    /**
     * Returns a [TFlow] containing the results of applying [transform] to each value of the
     * original [TFlow] and its index (starting from zero).
     *
     * Shorthand for:
     * ```
     * withIndex().map { (idx, a) -> transform(idx, a) }
     * ```
     */
    @ExperimentalFrpApi
    fun <A, B> TFlow<A>.mapIndexed(transform: suspend FrpTransactionScope.(Int, A) -> B): TFlow<B> {
        val index = fold(0) { _, i -> i + 1 }
        return sample(index) { a, idx -> transform(idx, a) }
    }

    /** Returns a [TFlow] where all subsequent repetitions of the same value are filtered out. */
    @ExperimentalFrpApi
    fun <A> TFlow<A>.distinctUntilChanged(): TFlow<A> {
        // Seed with a fresh sentinel object so that nothing has been "seen" before the first
        // emission.
        val state: TState<Any?> = hold(Any())
        return filter { it != state.sample() }
    }

    /**
     * Returns a new [TFlow] that emits at the same rate as the original [TFlow], but combines the
     * emitted value with the most recent emission from [other] using [transform].
     *
     * Note that the returned [TFlow] will not emit anything until [other] has emitted at least one
     * value.
     */
    @ExperimentalFrpApi
    fun <A, B, C> TFlow<A>.sample(
        other: TFlow<B>,
        transform: suspend FrpTransactionScope.(A, B) -> C,
    ): TFlow<C> {
        val state = other.mapCheap { just(it) }.hold(none)
        return sample(state) { a, b -> b.map { transform(a, it) } }.filterJust()
    }

    /**
     * Returns a [TState] that samples the [Transactional] held by the given [TState] within the
     * same transaction that the state changes.
     */
    @ExperimentalFrpApi
    fun <A> TState<Transactional<A>>.sampleTransactionals(): TState<A> =
        stateChanges
            .sampleTransactionals()
            .holdDeferred(deferredTransactionScope { sample().sample() })

    /**
     * Returns a [TState] that transforms the value held inside this [TState] by applying it to the
     * given function [transform].
     */
    @ExperimentalFrpApi
    fun <A, B> TState<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TState<B> =
        mapPure { transactionally { transform(it) } }.sampleTransactionals()

    /**
     * Returns a [TState] whose value is generated with [transform] by combining the current values
     * of each given [TState].
     *
     * @see TState.combineWith
     */
    @ExperimentalFrpApi
    fun <A, B, Z> combine(
        stateA: TState<A>,
        stateB: TState<B>,
        transform: suspend FrpTransactionScope.(A, B) -> Z,
    ): TState<Z> =
        com.android.systemui.kairos
            .combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } }
            .sampleTransactionals()

    /**
     * Returns a [TState] whose value is generated with [transform] by combining the current values
     * of each given [TState].
     *
     * @see TState.combineWith
     */
    @ExperimentalFrpApi
    fun <A, B, C, D, Z> combine(
        stateA: TState<A>,
        stateB: TState<B>,
        stateC: TState<C>,
        stateD: TState<D>,
        transform: suspend FrpTransactionScope.(A, B, C, D) -> Z,
    ): TState<Z> =
        com.android.systemui.kairos
            .combine(stateA, stateB, stateC, stateD) { a, b, c, d ->
                transactionally { transform(a, b, c, d) }
            }
            .sampleTransactionals()

    /** Returns a [TState] by applying [transform] to the value held by the original [TState]. */
    @ExperimentalFrpApi
    fun <A, B> TState<A>.flatMap(
        transform: suspend FrpTransactionScope.(A) -> TState<B>
    ): TState<B> = mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten()

    /**
     * Returns a [TState] whose value is generated with [transform] by combining the current values
     * of each given [TState].
     *
     * @see TState.combineWith
     */
    @ExperimentalFrpApi
    fun <A, Z> combine(
        vararg states: TState<A>,
        transform: suspend FrpTransactionScope.(List<A>) -> Z,
    ): TState<Z> = combinePure(*states).map(transform)

    /**
     * Returns a [TState] whose value is generated with [transform] by combining the current values
     * of each given [TState].
     *
     * @see TState.combineWith
     */
    @ExperimentalFrpApi
    fun <A, Z> Iterable<TState<A>>.combine(
        transform: suspend FrpTransactionScope.(List<A>) -> Z
    ): TState<Z> = combinePure().map(transform)

    /**
     * Returns a [TState] by combining the values held inside the given [TState]s by applying them
     * to the given function [transform].
     */
    @ExperimentalFrpApi
    fun <A, B, C> TState<A>.combineWith(
        other: TState<B>,
        transform: suspend FrpTransactionScope.(A, B) -> C,
    ): TState<C> = combine(this, other, transform)
}