1 /*
2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 @file:OptIn(ExperimentalCoroutinesApi::class, ExperimentalFrpApi::class)
18
19 package com.android.systemui.kairos
20
21 import com.android.systemui.kairos.util.Either
22 import com.android.systemui.kairos.util.Left
23 import com.android.systemui.kairos.util.Maybe
24 import com.android.systemui.kairos.util.None
25 import com.android.systemui.kairos.util.Right
26 import com.android.systemui.kairos.util.just
27 import com.android.systemui.kairos.util.map
28 import com.android.systemui.kairos.util.maybe
29 import com.android.systemui.kairos.util.none
30 import kotlin.time.Duration
31 import kotlin.time.Duration.Companion.seconds
32 import kotlin.time.DurationUnit
33 import kotlin.time.measureTime
34 import kotlinx.coroutines.CompletableDeferred
35 import kotlinx.coroutines.ExperimentalCoroutinesApi
36 import kotlinx.coroutines.async
37 import kotlinx.coroutines.awaitCancellation
38 import kotlinx.coroutines.flow.Flow
39 import kotlinx.coroutines.flow.MutableSharedFlow
40 import kotlinx.coroutines.flow.MutableStateFlow
41 import kotlinx.coroutines.flow.SharedFlow
42 import kotlinx.coroutines.flow.SharingStarted
43 import kotlinx.coroutines.flow.StateFlow
44 import kotlinx.coroutines.flow.first
45 import kotlinx.coroutines.flow.flow
46 import kotlinx.coroutines.flow.map
47 import kotlinx.coroutines.flow.onEach
48 import kotlinx.coroutines.flow.stateIn
49 import kotlinx.coroutines.flow.toCollection
50 import kotlinx.coroutines.launch
51 import kotlinx.coroutines.test.TestScope
52 import kotlinx.coroutines.test.runCurrent
53 import kotlinx.coroutines.test.runTest
54 import org.junit.Assert.assertNull
55 import org.junit.Assert.assertTrue
56 import org.junit.Test
57
58 class KairosTests {
59
/** Observes a mutable TFlow and checks that an emitted value reaches the observer. */
@Test
fun basic() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    var observed: Int? = null

    // The activated spec only copies each emission into [observed].
    activateSpec(network) { source.observe { value -> observed = value } }
    runCurrent()

    source.emit(3)
    runCurrent()

    assertEquals(3, observed)
    runCurrent()
}
71
/**
 * Requests the next emission of a mutable TFlow as a deferred value and checks that it resolves to
 * the value emitted afterwards.
 */
@Test
fun basicTFlow() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()

    println("starting network")
    val nextValue = activateSpecWithResult(network) { source.nextDeferred() }
    runCurrent()

    println("emitting")
    source.emit(3)
    runCurrent()

    println("awaiting")
    val received = nextValue.await()
    assertEquals(3, received)
    runCurrent()
}
85
/**
 * Holds a mutable TFlow as a state (initial value 0) and checks that the first state change
 * carries the emitted value.
 */
@Test
fun basicTState() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    val firstChange =
        activateSpecWithResult(network) {
            val held = source.hold(0)
            held.stateChanges.nextDeferred()
        }
    runCurrent()

    source.emit(3)
    runCurrent()

    val changedTo = firstChange.await()
    assertEquals(3, changedTo)
}
97
/**
 * Starts an `async` block inside the spec that waits for the first element of a plain
 * [MutableSharedFlow], then checks that it completes with the emitted element.
 */
@Test
fun basicEvent() = runFrpTest { network ->
    val sharedFlow = MutableSharedFlow<Int>()
    val firstElement = activateSpecWithResult(network) { async { sharedFlow.first() } }
    runCurrent()

    sharedFlow.emit(1)
    runCurrent()

    val finished = firstElement.isCompleted
    assertTrue("Result eventual has not completed.", finished)
    assertEquals(1, firstElement.await())
}
108
/**
 * Samples one transactional from two separate flows driven by the same emitter and checks, via the
 * counter it increments, that the transactional body ran only once per emission.
 *
 * Timings of each phase are printed for information only; nothing is asserted on them.
 */
@Test
fun basicTransactional() = runFrpTest { network ->
    var latest: Int? = null
    var counter = 1
    val tick = network.mutableTFlow<Unit>()
    // Every evaluation of this transactional post-increments the counter and yields the old value.
    val counterRead = transactionally { counter++ }

    val setupTime = measureTime {
        val sharedFlows =
            activateSpecWithResult(network) {
                // Two distinct flows, both sampling the same transactional.
                List(2) {
                    tick.sample(counterRead) { _, sampledValue -> sampledValue }.toSharedFlow()
                }
            }
        for (shared in sharedFlows) {
            backgroundScope.launch { shared.collect { latest = it } }
        }
        runCurrent()
    }
    println("setup: ${setupTime.toString(DurationUnit.MILLISECONDS, 2)}")

    val firstEmitTime = measureTime {
        tick.emit(Unit)
        runCurrent()
    }
    println("emit 1: ${firstEmitTime.toString(DurationUnit.MILLISECONDS, 2)}")

    // Two samplers, but only one increment: the first evaluation is reused within the emission.
    assertEquals(2, counter)
    assertEquals(1, latest)

    val secondEmitTime = measureTime {
        counter = 10
        tick.emit(Unit)
        runCurrent()
    }
    println("emit 2: ${secondEmitTime.toString(DurationUnit.MILLISECONDS, 2)}")

    assertEquals(11, counter)
    assertEquals(10, latest)
}
149
/**
 * Builds a diamond: one source is mapped into a "left" and a "right" branch, both are held as
 * states and combined again. The test expects exactly one combined change per source emission,
 * with both sides carrying the same value.
 */
@Test
fun diamondGraph() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    val combinedChanges =
        activateSpecWithResult(network) {
            // Two branches derived from the same source, each tagging the value with its side.
            val leftBranch = source.map { "left" to it }.onEach { println("left: $it") }
            val rightBranch = source.map { "right" to it }.onEach { println("right: $it") }

            // Hold each branch as a TState so that the two can be combined.
            val leftState = leftBranch.hold("left" to 0)
            val rightState = rightBranch.hold("right" to 0)
            val joined = leftState.combineWith(rightState) { l, r -> l to r }

            // Expose the combined state's changes as a regular Flow.
            joined.stateChanges.onEach { println("merged: $it") }.toSharedFlow()
        }
    runCurrent()

    val collected = mutableListOf<Pair<Pair<String, Int>, Pair<String, Int>>>()
    backgroundScope.launch { combinedChanges.toCollection(collected) }
    runCurrent()

    for (value in 1..2) {
        source.emit(value)
        runCurrent()
    }

    val expected = listOf(("left" to 1) to ("right" to 1), ("left" to 2) to ("right" to 2))
    assertEquals(expected, collected)
}
183
/**
 * Accumulates emitted integers into a looped-back running sum and samples that sum once, after the
 * values 1 through 5 have been emitted. The sampled value is expected to be 15.
 */
@Test
fun staticNetwork() = runFrpTest { network ->
    var finalSum: Int? = null

    val intEmitter = network.mutableTFlow<Int>()
    val sampleEmitter = network.mutableTFlow<Unit>()

    val sampledSum =
        activateSpecWithResult(network) {
            // Each emitted integer becomes a function that adds it to the current sum.
            val adders = intEmitter.map { a -> { b: Int -> a + b } }

            // The sum feeds back into itself: every adder is applied to the sampled current sum.
            val runningSum = TStateLoop<Int>()
            runningSum.loopback =
                adders
                    .sample(runningSum) { f, sum -> f(sum) }
                    .onEach { println("sum update: $it") }
                    .hold(0)

            sampleEmitter
                .onEach { println("sampleEmitter emitted") }
                .sample(runningSum) { _, sum -> sum }
                .onEach { println("sampled: $it") }
                .nextDeferred()
        }
    launch { finalSum = sampledSum.await() }

    runCurrent()

    for (i in 1..5) {
        println("emitting: $i")
        intEmitter.emit(i)
        runCurrent()
    }
    runCurrent()

    sampleEmitter.emit(Unit)
    runCurrent()

    assertEquals(15, finalSum)
}
224
/**
 * Models a coin-operated machine with mutually recursive definitions: the running total depends on
 * the "sold" event (which resets it to the price) and the "sold" event depends on the running
 * total sampled at the moment a coin arrives.
 *
 * Five coins are inserted; the test expects the total to count down 40, 30, 20, 10 without a sale,
 * and the fifth coin to trigger the sale and reset the total to 50.
 */
@Test
fun recursiveDefinition() = runFrpTest { network ->
    var wasSold = false
    var currentAmt: Int? = null

    val coin = network.mutableTFlow<Unit>()
    val price = 50
    val machineSpec = frpSpec {
        val eSold = TFlowLoop<Unit>()

        // Every coin lowers the running total by 10.
        val eInsert =
            coin.map {
                { runningTotal: Int ->
                    println("TEST: $runningTotal - 10 = ${runningTotal - 10}")
                    runningTotal - 10
                }
            }

        // A sale discards the running total and starts over from the price.
        val eReset =
            eSold.map {
                { _: Int ->
                    println("TEST: Resetting")
                    price
                }
            }

        // When both happen together, the insert is applied first and the reset second.
        val eUpdate = eInsert.mergeWith(eReset) { f, g -> { a -> g(f(a)) } }

        val dTotal = TStateLoop<Int>()
        dTotal.loopback = eUpdate.sample(dTotal) { f, total -> f(total) }.hold(price)

        val eAmt = dTotal.stateChanges
        val bAmt = transactionally { dTotal.sample() }
        eSold.loopback =
            coin
                .sample(bAmt) { unit, total -> unit to total }
                .mapMaybe { (_, total) -> maybe { guard { total <= 10 } } }

        val amts = eAmt.filter { amt -> amt >= 0 }

        amts.observe { currentAmt = it }
        eSold.observe { wasSold = true }

        eSold.nextDeferred()
    }

    activateSpec(network) { machineSpec.applySpec() }

    runCurrent()

    // The first four coins only lower the total.
    for (expectedAmt in listOf(40, 30, 20, 10)) {
        println()
        println()
        coin.emit(Unit)
        runCurrent()

        assertEquals(expectedAmt, currentAmt)
    }
    assertEquals(false, wasSold)

    // The fifth coin arrives while the total is 10, which satisfies the sale condition.
    println()
    println()
    coin.emit(Unit)
    runCurrent()

    assertEquals(true, wasSold)
    assertEquals(50, currentAmt)
}
312
/**
 * Bounds an emitter with `takeUntil(stopper)`, checks that a value emitted before the stop is
 * delivered, then fires the stopper. Nothing is asserted after the stop; the test only requires
 * that stopping runs without error.
 */
@Test
fun promptCleanup() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    val stop = network.mutableTFlow<Unit>()

    var lastSeen: Int? = null

    val bounded =
        activateSpecWithResult(network) {
            val untilStopped = source.takeUntil(stop)
            untilStopped.toSharedFlow()
        }
    backgroundScope.launch { bounded.collect { lastSeen = it } }
    runCurrent()

    source.emit(2)
    runCurrent()

    assertEquals(2, lastSeen)

    stop.emit(Unit)
    runCurrent()
}
332
/**
 * Switches a running sum between two integer sources.
 *
 * Each switch request selects a source that stays active until either the stop handler fires or a
 * later switch request arrives. The test checks that only the currently selected source
 * contributes to the sum, that a stopped source no longer contributes, and that re-selecting a
 * source resumes accumulation.
 */
@Test
fun switchTFlow() = runFrpTest { network ->
    var currentSum: Int? = null

    val switchHandler = network.mutableTFlow<Pair<TFlow<Int>, String>>()
    val aHandler = network.mutableTFlow<Int>()
    val stopHandler = network.mutableTFlow<Unit>()
    val bHandler = network.mutableTFlow<Int>()

    val sumFlow =
        activateSpecWithResult(network) {
            val switchE = TFlowLoop<TFlow<Int>>()
            switchE.loopback =
                switchHandler.mapStateful { (selected, label) ->
                    println("[onEach] Switching to: $label")
                    // The selected flow ends on the next switch or on an explicit stop.
                    val switchedOut =
                        switchE.skipNext().onEach { println("[onEach] switched-out") }
                    val endOfSelection =
                        stopHandler
                            .onEach { println("[onEach] stopped") }
                            .mergeWith(switchedOut) { _, b -> b }
                    selected.takeUntil(endOfSelection)
                }

            // Every number from the active source becomes "add this number to the sum".
            val adderE: TFlow<(Int) -> Int> =
                switchE.hold(emptyTFlow).switch().map { a ->
                    println("[onEach] new number $a")
                    ({ sum: Int ->
                        println("$a+$sum=${a + sum}")
                        sum + a
                    })
                }

            val sumD = TStateLoop<Int>()
            sumD.loopback =
                adderE
                    .sample(sumD) { f, sum -> f(sum) }
                    .onEach { println("[onEach] writing sum: $it") }
                    .hold(0)

            sumD.stateChanges.toSharedFlow()
        }

    runCurrent()

    backgroundScope.launch { sumFlow.collect { currentSum = it } }

    runCurrent()

    switchHandler.emit(aHandler to "A")
    runCurrent()

    // While A is selected, each value from A is added to the sum.
    for ((value, expectedSum) in listOf(1 to 1, 2 to 3, 3 to 6, 4 to 10, 5 to 15)) {
        aHandler.emit(value)
        runCurrent()

        assertEquals(expectedSum, currentSum)
    }

    switchHandler.emit(bHandler to "B")
    runCurrent()

    // A is no longer selected, so its values are ignored.
    aHandler.emit(6)
    runCurrent()

    assertEquals(15, currentSum)

    for ((value, expectedSum) in listOf(6 to 21, 7 to 28, 8 to 36, 9 to 45, 10 to 55)) {
        bHandler.emit(value)
        runCurrent()

        assertEquals(expectedSum, currentSum)
    }

    println()
    println("Stopping: B")
    stopHandler.emit(Unit) // bHandler.complete()
    runCurrent()

    // B has been stopped, so its values are ignored.
    bHandler.emit(20)
    runCurrent()

    assertEquals(55, currentSum)

    println()
    println("Switching to: A2")
    switchHandler.emit(aHandler to "A2")
    runCurrent()

    println("aHandler.emit(11)")
    aHandler.emit(11)
    runCurrent()

    assertEquals(66, currentSum)

    for ((value, expectedSum) in listOf(12 to 78, 13 to 91, 14 to 105, 15 to 120)) {
        aHandler.emit(value)
        runCurrent()

        assertEquals(expectedSum, currentSum)
    }

    stopHandler.emit(Unit)
    runCurrent()

    aHandler.emit(100)
    runCurrent()

    assertEquals(120, currentSum)
}
493
/**
 * Smoke test: flattens a flow of flows whose outer flow is the empty TFlow, maps the result and
 * observes it. There are no assertions; activation only has to run without error.
 */
@Test
fun switchIndirect() = runFrpTest { network ->
    val emitter = network.mutableTFlow<Unit>()
    activateSpec(network) {
        val outer = emptyTFlow.map { emitter.map { 1 } }
        val rendered = outer.flatten().map { "$it" }
        rendered.observe()
    }
    runCurrent()
}
502
/**
 * Flattens a flow whose inner flow is derived from the same emitter that triggers the switch. The
 * test expects that the emission which causes the switch is not itself delivered through the newly
 * selected inner flow, so the collected state stays `null`.
 */
@Test
fun switchInWithResult() = runFrpTest { network ->
    val emitter = network.mutableTFlow<Unit>()
    val flattened =
        activateSpecWithResult(network) {
            val nested = emitter.map { emitter.map { 1 } }
            nested.flatten().toSharedFlow()
        }
    val latest = flattened.stateIn(backgroundScope, SharingStarted.Eagerly, null)
    runCurrent()

    emitter.emit(Unit)
    runCurrent()

    assertEquals(null, latest.value)
}
516
/**
 * Switches into an inner flow that is bounded by `takeUntil(stopEmitter)` and checks that, once the
 * stop has fired, values from the underlying emitter are never delivered, including after a
 * second switch into the same (already stopped) inner flow.
 *
 * The emptiness checks compare against an empty list rather than asserting on `isEmpty()`, so that
 * a failure reports the values that were unexpectedly delivered.
 */
@Test
fun switchInCompleted() = runFrpTest { network ->
    val outputs = mutableListOf<Int>()

    val switchAH = network.mutableTFlow<Unit>()
    val intAH = network.mutableTFlow<Int>()
    val stopEmitter = network.mutableTFlow<Unit>()

    val top = frpSpec {
        val intS = intAH.takeUntil(stopEmitter)
        val switched = switchAH.map { intS }.flatten()
        switched.toSharedFlow()
    }
    val flow = activateSpecWithResult(network) { top.applySpec() }
    backgroundScope.launch { flow.collect { outputs.add(it) } }
    runCurrent()

    switchAH.emit(Unit)
    runCurrent()

    stopEmitter.emit(Unit)
    runCurrent()

    // assertEquals(0, intAH.subscriptionCount.value)
    intAH.emit(10)
    runCurrent()

    assertEquals(emptyList<Int>(), outputs)

    // Switching in again selects the same inner flow, which has already been stopped.
    switchAH.emit(Unit)
    runCurrent()

    // assertEquals(0, intAH.subscriptionCount.value)
    intAH.emit(10)
    runCurrent()

    assertEquals(emptyList<Int>(), outputs)
}
555
/**
 * Flattens a flow of flows in which both the outer switch flow and the inner integer flow are
 * bounded with `takeUntil`. The outer flow is stopped first; the test expects the inner flow that
 * was selected earlier to keep delivering values afterwards.
 */
@Test
fun switchTFlow_outerCompletesFirst() = runFrpTest { network ->
    var latest: Int? = null

    val switchAH = network.mutableTFlow<Unit>()
    val switchStopEmitter = network.mutableTFlow<Unit>()
    val intStopEmitter = network.mutableTFlow<Unit>()
    val intAH = network.mutableTFlow<Int>()
    val flattened =
        activateSpecWithResult(network) {
            val boundedInts = intAH.takeUntil(intStopEmitter)
            val boundedSwitch = switchAH.takeUntil(switchStopEmitter)

            boundedSwitch.map { boundedInts }.flatten().toSharedFlow()
        }
    backgroundScope.launch { flattened.collect { latest = it } }
    runCurrent()

    // Nothing has been switched in yet, so this value must not be delivered.
    // assertEquals(0, intAH.subscriptionCount.value)
    intAH.emit(100)
    runCurrent()

    assertEquals(null, latest)

    switchAH.emit(Unit)
    runCurrent()

    // assertEquals(1, intAH.subscriptionCount.value)

    intAH.emit(5)
    runCurrent()

    assertEquals(5, latest)

    println("stop outer")
    switchStopEmitter.emit(Unit) // switchAH.complete()
    runCurrent()

    // assertEquals(1, intAH.subscriptionCount.value)
    // assertEquals(0, switchAH.subscriptionCount.value)

    // The inner flow is still selected even though the outer flow has stopped.
    intAH.emit(10)
    runCurrent()

    assertEquals(10, latest)

    println("stop inner")
    intStopEmitter.emit(Unit) // intAH.complete()
    runCurrent()

    // assertEquals(just(10), network.await())
}
609
/** Maps a TFlow through squaring and checks the mapped value for several inputs. */
@Test
fun mapTFlow() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    var latest: Int? = null

    val squares = activateSpecWithResult(network) { source.map { it * it }.toSharedFlow() }

    backgroundScope.launch { squares.collect { latest = it } }
    runCurrent()

    for ((input, expectedSquare) in listOf(1 to 1, 2 to 4, 10 to 100)) {
        source.emit(input)
        runCurrent()

        assertEquals(expectedSquare, latest)
    }
}
639
/**
 * Chains two transactionals (one reading a local variable, one doubling the first) and checks
 * that sampling the second reflects the variable's value at the time of each emission.
 */
@Test
fun mapTransactional() = runFrpTest { network ->
    var doubled: Int? = null
    var backingValue = 0
    val readValue = transactionally { backingValue }
    val readDoubled = transactionally { readValue.sample() * 2 }
    val trigger = network.mutableTFlow<Unit>()
    val sampled =
        activateSpecWithResult(network) {
            trigger.sample(readDoubled) { _, value -> value }.toSharedFlow()
        }

    backgroundScope.launch { sampled.collect { doubled = it } }

    runCurrent()

    trigger.emit(Unit)
    runCurrent()

    assertEquals(0, doubled)

    backingValue = 5
    trigger.emit(Unit)
    runCurrent()

    assertEquals(10, doubled)
}
668
/**
 * Maps a held state through `+ 2`, then merges its change events with a sample of the same state
 * taken on every emission, adding the two when they coincide. The expected sums (5, then 15) are
 * what the original test pins for emissions of 1 and 10.
 */
@Test
fun mapTState() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    var latest: Int? = null
    val merged =
        activateSpecWithResult(network) {
            val plusTwo = source.hold(0).map { it + 2 }
            val currentPlusTwo = transactionally { plusTwo.sample() }
            val changes = plusTwo.stateChanges
            val sampledOnEmit = source.sample(currentPlusTwo) { _, current -> current }
            changes.mergeWith(sampledOnEmit) { changed, sampled -> changed + sampled }.toSharedFlow()
        }
    backgroundScope.launch { merged.collect { latest = it } }
    runCurrent()

    source.emit(1)
    runCurrent()

    assertEquals(5, latest)

    source.emit(10)
    runCurrent()

    assertEquals(15, latest)
}
695
/**
 * Splits a flow of [Either] values into its left and right parts, transforms each side
 * differently, and merges them back. The merge handler fails the test if both sides ever fire
 * together.
 */
@Test
fun partitionEither() = runFrpTest { network ->
    val source = network.mutableTFlow<Either<Int, Int>>()
    val latest =
        activateSpecWithResult(network) {
            val (lefts, rights) = source.partitionEither()
            val doubledLefts = lefts.map { it * 2 }
            val rejoined =
                doubledLefts.mergeWith(rights.map { it * -1 }) { _, _ ->
                    error("unexpected coincidence")
                }
            rejoined.hold(null).toStateFlow()
        }
    runCurrent()

    source.emit(Left(10))
    runCurrent()

    assertEquals(20, latest.value)

    source.emit(Right(30))
    runCurrent()

    assertEquals(-30, latest.value)
}
719
/**
 * Folds emitted integers into a running sum and checks both the sum's change events and an
 * explicit sample of the sum. The merge handler fails the test if a change and a sample coincide.
 */
@Test
fun accumTState() = runFrpTest { network ->
    val source = network.mutableTFlow<Int>()
    val sampler = network.mutableTFlow<Unit>()
    var latest: Int? = null
    val sums =
        activateSpecWithResult(network) {
            val adders = source.map { a -> { b: Int -> a + b } }
            val sumState = adders.fold(0) { f, a -> f(a) }

            val sampledSum = sampler.sample(sumState) { _, sum -> sum }
            sumState.stateChanges
                .mergeWith(sampledSum) { _, _ -> error("Unexpected coincidence") }
                .toSharedFlow()
        }

    backgroundScope.launch { sums.collect { latest = it } }
    runCurrent()

    source.emit(5)
    runCurrent()
    assertEquals(5, latest)

    source.emit(10)
    runCurrent()
    assertEquals(15, latest)

    sampler.emit(Unit)
    runCurrent()
    assertEquals(15, latest)
}
751
/**
 * Merges two independently stoppable flows and checks that each side delivers values until its own
 * stopper fires, after which its emissions are ignored while the other side keeps working.
 *
 * The duration of every step is measured and printed as a report at the end; nothing is asserted
 * on the timings.
 */
@Test
fun mergeTFlows() = runFrpTest { network ->
    val first = network.mutableTFlow<Int>()
    val stopFirst = network.mutableTFlow<Unit>()
    val second = network.mutableTFlow<Int>()
    val stopSecond = network.mutableTFlow<Unit>()
    var latest: Int? = null

    // Renders a duration the way every line of the timing report expects.
    fun Duration.inMillis(): String = toString(DurationUnit.MILLISECONDS, 2)

    val merged: SharedFlow<Int>
    val setupDuration = measureTime {
        merged =
            activateSpecWithResult(network) {
                val firstUntilStopped = first.takeUntil(stopFirst)
                val secondUntilStopped = second.takeUntil(stopSecond)
                firstUntilStopped
                    .mergeWith(secondUntilStopped) { _, _ -> error("Unexpected coincidence") }
                    .toSharedFlow()
                // mergedS.last("onComplete")
            }
        backgroundScope.launch { merged.collect { latest = it } }
        runCurrent()
    }

    // assertEquals(1, first.subscriptionCount.value)
    // assertEquals(1, second.subscriptionCount.value)

    val firstEmitDuration = measureTime {
        first.emit(1)
        runCurrent()
    }

    assertEquals(1, latest)

    val secondEmitDuration = measureTime {
        second.emit(2)
        runCurrent()
    }

    assertEquals(2, latest)

    val stopFirstDuration = measureTime {
        stopFirst.emit(Unit)
        runCurrent()
    }

    // The first side has been stopped, so this emission must not change the result.
    // assertEquals(0, first.subscriptionCount.value)
    val testDeadEmitFirstDuration = measureTime {
        first.emit(10)
        runCurrent()
    }

    assertEquals(2, latest)

    // assertEquals(1, second.subscriptionCount.value)

    val secondEmitDuration2 = measureTime {
        second.emit(3)
        runCurrent()
    }

    assertEquals(3, latest)

    val stopSecondDuration = measureTime {
        stopSecond.emit(Unit)
        runCurrent()
    }

    // The second side has been stopped as well.
    // assertEquals(0, second.subscriptionCount.value)
    val testDeadEmitSecondDuration = measureTime {
        second.emit(10)
        runCurrent()
    }

    assertEquals(3, latest)

    println(
        """
        setupDuration: ${setupDuration.inMillis()}
        firstEmitDuration: ${firstEmitDuration.inMillis()}
        secondEmitDuration: ${secondEmitDuration.inMillis()}
        stopFirstDuration: ${stopFirstDuration.inMillis()}
        testDeadEmitFirstDuration: ${testDeadEmitFirstDuration.inMillis()}
        secondEmitDuration2: ${secondEmitDuration2.inMillis()}
        stopSecondDuration: ${stopSecondDuration.inMillis()}
        testDeadEmitSecondDuration: ${testDeadEmitSecondDuration.inMillis()}
        """
            .trimIndent()
    )
}
851
/**
 * Samples a held state from a stoppable sampler flow and checks that, after the sampler has been
 * stopped, neither further updates nor further sampler emissions change the observed result.
 */
@Test
fun sampleCancel() = runFrpTest { network ->
    val updater = network.mutableTFlow<Int>()
    val stopUpdater = network.mutableTFlow<Unit>()
    val sampler = network.mutableTFlow<Unit>()
    val stopSampler = network.mutableTFlow<Unit>()
    var latest: Int? = null
    val sampled =
        activateSpecWithResult(network) {
            val boundedSampler = sampler.takeUntil(stopSampler)
            val boundedUpdater = updater.takeUntil(stopUpdater)
            boundedSampler.sample(boundedUpdater.hold(0)) { _, held -> held }.toSharedFlow()
        }

    backgroundScope.launch { sampled.collect { latest = it } }
    runCurrent()

    updater.emit(1)
    runCurrent()

    sampler.emit(Unit)
    runCurrent()

    assertEquals(1, latest)

    stopSampler.emit(Unit)
    runCurrent()

    // assertEquals(0, updater.subscriptionCount.value)
    // assertEquals(0, sampler.subscriptionCount.value)
    updater.emit(10)
    runCurrent()

    sampler.emit(Unit)
    runCurrent()

    // The sampler was stopped, so the new update is never sampled.
    assertEquals(1, latest)
}
893
/**
 * Combines two states held from independent emitters and checks, after each emission, both the
 * value delivered to a change observer and the value obtained by sampling the combined state in a
 * fresh transaction.
 */
@Test
fun combineStates_differentUpstreams() = runFrpTest { network ->
    val a = network.mutableTFlow<Int>()
    val b = network.mutableTFlow<Int>()
    var observed: Pair<Int, Int>? = null
    val combined =
        activateSpecWithResult(network) {
            val pairState = combine(a.hold(0), b.hold(0)) { left, right -> left to right }
            pairState.stateChanges.observe { observed = it }
            pairState
        }

    // Reads the combined state's current value in its own transaction.
    suspend fun sampleNow() = network.transact { combined.sample() }

    assertEquals(0 to 0, sampleNow())
    assertEquals(null, observed)

    a.emit(5)
    assertEquals(5 to 0, observed)
    assertEquals(5 to 0, sampleNow())

    b.emit(3)
    assertEquals(5 to 3, observed)
    assertEquals(5 to 3, sampleNow())
}
914
/**
 * Derives two states from the same updater (one doubled, one as-is), combines them, and checks
 * that sampling the combination after an update yields a consistent pair.
 */
@Test
fun sampleCombinedStates() = runFrpTest { network ->
    val updater = network.mutableTFlow<Int>()
    val trigger = network.mutableTFlow<Unit>()

    val sampledPair =
        activateSpecWithResult(network) {
            val doubled = updater.map { it * 2 }.hold(0)
            val plain = updater.hold(0)
            val both: TState<Pair<Int, Int>> = doubled.combineWith(plain) { d, p -> d to p }
            trigger.sample(both) { _, pair -> pair }.nextDeferred()
        }
    println("launching")
    runCurrent()

    println("emitting update")
    updater.emit(10)
    runCurrent()

    println("emitting sampler")
    trigger.emit(Unit)
    runCurrent()

    println("asserting")
    assertEquals(20 to 10, sampledPair.await())
}
942
/**
 * Uses `switchPromptly` on a state of flows whose inner flow derives from the same emitter that
 * triggers the switch. The test expects the triggering emission itself to be delivered through
 * the newly selected inner flow, producing `(1 + 1) * 2 = 4`.
 */
@Test
fun switchMapPromptly() = runFrpTest { network ->
    val emitter = network.mutableTFlow<Unit>()
    val firstValue =
        activateSpecWithResult(network) {
            val nested = emitter.map { emitter.map { 1 }.map { it + 1 }.map { it * 2 } }
            val switched = nested.hold(emptyTFlow).switchPromptly()
            switched.nextDeferred()
        }
    runCurrent()

    emitter.emit(Unit)
    runCurrent()

    assertTrue("Not complete", firstValue.isCompleted)
    assertEquals(4, firstValue.await())
}
962
/**
 * Switches into a flow that is itself a merge of two mappings of a second emitter, then merges the
 * switched flow with another mapping of that emitter. The test expects the first non-null result
 * to be the coincidence sum `1 + 2 = 3`.
 */
@Test
fun switchDeeper() = runFrpTest { network ->
    val emitter = network.mutableTFlow<Unit>()
    val e2 = network.mutableTFlow<Unit>()
    val firstNonNull =
        activateSpecWithResult(network) {
            // Both inputs fire together, so the coincidence handler adds them.
            val summed =
                merge(e2.map { 1 }, e2.map { 2 }, transformCoincidence = { a, b -> a + b })
            summed.observeBuild()
            val switched = emitter.map { summed }.flatten()
            // On coincidence the switched value wins over the null placeholder.
            merge(switched, e2.map { null }, transformCoincidence = { a, _ -> a })
                .filterNotNull()
                .nextDeferred()
        }
    runCurrent()

    emitter.emit(Unit)
    runCurrent()

    e2.emit(Unit)
    runCurrent()

    assertTrue("Not complete", firstNonNull.isCompleted)
    assertEquals(3, firstNonNull.await())
}
988
/**
 * Defines a counter state recursively: increment and decrement events each sample the counter's
 * current value, and their merge is held as the counter itself.
 */
@Test
fun recursionBasic() = runFrpTest { network ->
    val add1 = network.mutableTFlow<Unit>()
    val sub1 = network.mutableTFlow<Unit>()
    val counter: StateFlow<Int> =
        activateSpecWithResult(network) {
            val counterLoop = TStateLoop<Int>()
            val incremented = add1.sample(counterLoop) { _, sum -> sum + 1 }
            val decremented = sub1.sample(counterLoop) { _, sum -> sum - 1 }
            // If both fire together, the increment wins.
            counterLoop.loopback = incremented.mergeWith(decremented) { a, _ -> a }.hold(0)
            counterLoop.toStateFlow()
        }
    runCurrent()

    add1.emit(Unit)
    runCurrent()

    assertEquals(1, counter.value)

    add1.emit(Unit)
    runCurrent()

    assertEquals(2, counter.value)

    sub1.emit(Unit)
    runCurrent()

    assertEquals(1, counter.value)
}
1018
/**
 * Smoke test: builds a flattened state of states whose inputs include a flow loop that is only
 * closed (with the empty TFlow) after it has been used. There are no assertions; the network only
 * has to build and be collectable without error.
 */
@Test
fun recursiveTState() = runFrpTest { network ->
    val e = network.mutableTFlow<Unit>()
    // Counted but never asserted on.
    var changes = 0
    val state =
        activateSpecWithResult(network) {
            val loop = TFlowLoop<Unit>()
            val fromLoop = loop.map { tStateOf(null) }
            val fromEmitter = e.map { tStateOf(Unit) }
            val merged = fromEmitter.mergeWith(fromLoop) { a, _ -> a }
            val flattened = merged.hold(tStateOf(null)).flatten()
            loop.loopback = emptyTFlow
            flattened.toStateFlow()
        }

    backgroundScope.launch { state.collect { changes++ } }
    runCurrent()
}
1036
/**
 * Groups a flow of maps by key and checks that an entry for "foo" is delivered only to the "foo"
 * flow; the "bar" collector fails the test if it ever receives anything.
 */
@Test
fun fanOut() = runFrpTest { network ->
    val e = network.mutableTFlow<Map<String, Int>>()
    val flows =
        activateSpecWithResult(network) {
            val byKey = e.groupByKey()
            val foos = byKey.eventsForKey("foo")
            val bars = byKey.eventsForKey("bar")
            Pair(foos.toSharedFlow(), bars.toSharedFlow())
        }
    val fooFlow = flows.first
    val barFlow = flows.second
    val latestFoo = fooFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
    backgroundScope.launch { barFlow.collect { error("unexpected bar") } }
    runCurrent()

    assertEquals(null, latestFoo.value)

    e.emit(mapOf("foo" to 1))
    runCurrent()

    assertEquals(1, latestFoo.value)
}
1058
/**
 * Subscribes to the "bar" key of a grouped flow only in response to a "foo" event, using
 * `switchPromptly`. The test expects a "bar" value carried by the same map as the triggering "foo"
 * to still be delivered.
 */
@Test
fun fanOutLateSubscribe() = runFrpTest { network ->
    val e = network.mutableTFlow<Map<String, Int>>()
    val barFlow =
        activateSpecWithResult(network) {
            val byKey = e.groupByKey()
            val barsOnFoo = byKey.eventsForKey("foo").map { byKey.eventsForKey("bar") }
            barsOnFoo.hold(emptyTFlow).switchPromptly().toSharedFlow()
        }
    val latestBar = barFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
    runCurrent()

    assertEquals(null, latestBar.value)

    e.emit(mapOf("foo" to 0, "bar" to 1))
    runCurrent()

    assertEquals(1, latestBar.value)
}
1082
/** Checks that `nextOnly()` delivers the first emission and ignores every later one. */
@Test
fun inputFlowCompleted() = runFrpTest { network ->
    val received = mutableListOf<Int>()
    val source = network.mutableTFlow<Int>()
    activateSpec(network) {
        val onlyFirst = source.nextOnly()
        onlyFirst.observe { value -> received.add(value) }
    }
    runCurrent()

    source.emit(10)
    runCurrent()

    assertEquals(listOf(10), received)

    // A second emission must not be observed.
    source.emit(20)
    runCurrent()
    assertEquals(listOf(10), received)
}
1099
/**
 * Feeds nested map patches through the network and checks the accumulated result after each one.
 *
 * The outer emitter carries patches keyed by group id; each present entry is a flow of patches
 * keyed by child id; each present child entry is a [StateFlow] of strings. In both layers a
 * `none()` entry stands for removal (the test emits `none()` to remove child 10). The spec converts
 * the nested flows to TFlow/TState, tracks additions and removals per child, and folds everything
 * into a single state of `Map<groupId, Map<childId, String>>`, which is exposed as a [StateFlow].
 *
 * The test then adds a group, adds a child, replaces it, removes it, adds it again, and finally
 * applies a batch patch, asserting the full accumulated map after every step.
 */
@Test
fun fanOutThenMergeIncrementally() = runFrpTest { network ->
    // A tflow of group updates, where a group is a tflow of child updates, where a child is a
    // stateflow
    val e = network.mutableTFlow<Map<Int, Maybe<TFlow<Map<Int, Maybe<StateFlow<String>>>>>>>()
    println("fanOutMergeInc START")
    val state =
        activateSpecWithResult(network) {
            // Convert nested Flows to nested TFlow/TState
            val emitter: TFlow<Map<Int, Maybe<TFlow<Map<Int, Maybe<TState<String>>>>>>> =
                e.mapBuild { m ->
                    m.mapValues { (_, mFlow) ->
                        mFlow.map {
                            it.mapBuild { m2 ->
                                m2.mapValues { (_, mState) ->
                                    mState.map { stateFlow -> stateFlow.toTState() }
                                }
                            }
                        }
                    }
                }
            // Accumulate all of our updates into a single TState
            val accState: TState<Map<Int, Map<Int, String>>> =
                emitter
                    .mapStateful {
                        changeMap: Map<Int, Maybe<TFlow<Map<Int, Maybe<TState<String>>>>>> ->
                        changeMap.mapValues { (groupId, mGroupChanges) ->
                            mGroupChanges.map {
                                groupChanges: TFlow<Map<Int, Maybe<TState<String>>>> ->
                                // New group
                                // Per-child view of this group's patches, used below to detect
                                // the removal of an individual child.
                                val childChangeById = groupChanges.groupByKey()
                                val map: TFlow<Map<Int, Maybe<TFlow<Maybe<TState<String>>>>>> =
                                    groupChanges.mapStateful {
                                        gChangeMap: Map<Int, Maybe<TState<String>>> ->
                                        gChangeMap.mapValues { (childId, mChild) ->
                                            mChild.map { child: TState<String> ->
                                                println("new child $childId in the house")
                                                // New child
                                                // Only the first removal patch for this child is
                                                // of interest (identity check against None).
                                                val eRemoved =
                                                    childChangeById
                                                        .eventsForKey(childId)
                                                        .filter { it === None }
                                                        .nextOnly()

                                                val addChild: TFlow<Maybe<TState<String>>> =
                                                    now.map { mChild }
                                                        .onEach {
                                                            println(
                                                                "addChild (groupId=$groupId, childId=$childId) ${child.sample()}"
                                                            )
                                                        }

                                                val removeChild: TFlow<Maybe<TState<String>>> =
                                                    eRemoved
                                                        .onEach {
                                                            println(
                                                                "removeChild (groupId=$groupId, childId=$childId)"
                                                            )
                                                        }
                                                        .map { none() }

                                                // Adding and removing the same child at the same
                                                // instant is treated as a test failure.
                                                addChild.mergeWith(removeChild) { _, _ ->
                                                    error("unexpected coincidence")
                                                }
                                            }
                                        }
                                    }
                                val mergeIncrementally: TFlow<Map<Int, Maybe<TState<String>>>> =
                                    map.onEach { println("merge patch: $it") }
                                        .mergeIncrementallyPromptly()
                                mergeIncrementally
                                    .onEach { println("patch: $it") }
                                    .foldMapIncrementally()
                                    .flatMap { it.combine() }
                            }
                        }
                    }
                    .foldMapIncrementally()
                    .flatMap { it.combine() }

            accState.toStateFlow()
        }
    runCurrent()

    assertEquals(emptyMap(), state.value)

    // Patches for group 0's children are emitted through this second emitter.
    val emitter2 = network.mutableTFlow<Map<Int, Maybe<StateFlow<String>>>>()
    println()
    println("init outer 0")
    e.emit(mapOf(0 to just(emitter2.onEach { println("emitter2 emit: $it") })))
    runCurrent()

    assertEquals(mapOf(0 to emptyMap()), state.value)

    println()
    println("init inner 10")
    emitter2.emit(mapOf(10 to just(MutableStateFlow("(0, 10)"))))
    runCurrent()

    assertEquals(mapOf(0 to mapOf(10 to "(0, 10)")), state.value)

    // replace
    println()
    println("replace inner 10")
    emitter2.emit(mapOf(10 to just(MutableStateFlow("(1, 10)"))))
    runCurrent()

    assertEquals(mapOf(0 to mapOf(10 to "(1, 10)")), state.value)

    // remove
    emitter2.emit(mapOf(10 to none()))
    runCurrent()

    assertEquals(mapOf(0 to emptyMap()), state.value)

    // add again
    emitter2.emit(mapOf(10 to just(MutableStateFlow("(2, 10)"))))
    runCurrent()

    assertEquals(mapOf(0 to mapOf(10 to "(2, 10)")), state.value)

    // batch update
    emitter2.emit(
        mapOf(
            10 to none(),
            11 to just(MutableStateFlow("(0, 11)")),
            12 to just(MutableStateFlow("(0, 12)")),
        )
    )
    runCurrent()

    assertEquals(mapOf(0 to mapOf(11 to "(0, 11)", 12 to "(0, 12)")), state.value)
}
1233
/**
 * Emits specs into the network, applies the most recent one via `applyLatestSpec()`, and checks
 * that replacing a spec stops the previous one from receiving updates.
 *
 * Each spec produced by `newFlow()` counts emissions of a shared `incCount` emitter and exposes the
 * count's changes as a flow. The test tracks the previous and current of those flows as a pair,
 * and expects that after a second spec is emitted the previous flow keeps its last value (2)
 * while the new one starts counting from 1.
 */
@Test
fun applyLatestNetworkChanges() = runFrpTest { network ->
    val newCount = network.mutableTFlow<FrpSpec<Flow<Int>>>()
    val flowOfFlows: Flow<Flow<Int>> =
        activateSpecWithResult(network) { newCount.applyLatestSpec().toSharedFlow() }
    runCurrent()

    val incCount = network.mutableTFlow<Unit>()
    // Builds a fresh spec; every call yields an independent counter over the shared incCount.
    fun newFlow(): FrpSpec<SharedFlow<Int>> = frpSpec {
        // Logging-only effect: prints when the spec starts and when its effect is cancelled.
        launchEffect {
            try {
                println("new flow!")
                awaitCancellation()
            } finally {
                println("cancelling old flow")
            }
        }
        // Declared lateinit because the logging lambda below refers to the state being defined.
        lateinit var count: TState<Int>
        count =
            incCount
                .onEach { println("incrementing ${count.sample()}") }
                .fold(0) { _, c -> c + 1 }
        count.stateChanges.toSharedFlow()
    }

    // Incremented once for every (previous, current) pair produced below.
    var outerCount = 0
    val lastFlows: StateFlow<Pair<StateFlow<Int?>, StateFlow<Int?>>> =
        flowOfFlows
            .map { it.stateIn(backgroundScope, SharingStarted.Eagerly, null) }
            .pairwise(MutableStateFlow(null))
            .onEach { outerCount++ }
            .stateIn(
                backgroundScope,
                SharingStarted.Eagerly,
                MutableStateFlow(null) to MutableStateFlow(null),
            )

    runCurrent()

    newCount.emit(newFlow())
    runCurrent()

    assertEquals(1, outerCount)
    // assertEquals(1, incCount.subscriptionCount)
    assertNull(lastFlows.value.second.value)

    incCount.emit(Unit)
    runCurrent()

    println("checking")
    assertEquals(1, lastFlows.value.second.value)

    incCount.emit(Unit)
    runCurrent()

    assertEquals(2, lastFlows.value.second.value)

    // Replace the active spec, then increment once more.
    newCount.emit(newFlow())
    runCurrent()
    incCount.emit(Unit)
    runCurrent()

    // verify old flow is not getting updates
    assertEquals(2, lastFlows.value.first.value)
    // but the new one is
    assertEquals(1, lastFlows.value.second.value)
}
1301
/**
 * Launches an effect that runs until cancelled and an observer that cancels it on the first input.
 * The test checks that the effect is running after activation, that the first input stops it, and
 * that later inputs are not observed (the observer is attached to `nextOnly()`).
 */
@Test
fun effect() = runFrpTest { network ->
    val trigger = network.mutableTFlow<Unit>()
    var effectRunning = false
    var observedCount = 0
    activateSpec(network) {
        // The flag is true exactly while the effect body is suspended in awaitCancellation().
        val effectJob = launchEffect {
            effectRunning = true
            try {
                awaitCancellation()
            } finally {
                effectRunning = false
            }
        }
        merge(emptyTFlow, trigger.nextOnly()).observe {
            observedCount++
            effectJob.cancel()
        }
    }
    runCurrent()
    assertEquals(true, effectRunning)
    assertEquals(0, observedCount)

    println("1")
    trigger.emit(Unit)
    assertEquals(false, effectRunning)
    assertEquals(1, observedCount)

    println("2")
    trigger.emit(Unit)
    assertEquals(1, observedCount)

    println("3")
    trigger.emit(Unit)
    assertEquals(1, observedCount)
}
1337
/**
 * Runs [block] inside `runTest` with a fresh FRP network created on the test's background scope.
 *
 * @param timeout limit passed to `runTest`; defaults to three seconds.
 * @param block the test body, invoked with the [TestScope] as receiver and the new network as
 *   argument.
 */
private fun runFrpTest(
    timeout: Duration = 3.seconds,
    block: suspend TestScope.(FrpNetwork) -> Unit,
) {
    runTest(timeout = timeout) {
        val frpNetwork = backgroundScope.newFrpNetwork()
        // Run whatever the network scheduled at creation before handing it to the test body.
        runCurrent()
        this.block(frpNetwork)
    }
}
1348
/**
 * Activates [spec] on [network] in a coroutine launched on the test's background scope and returns
 * the launched job.
 */
private fun TestScope.activateSpec(network: FrpNetwork, spec: FrpSpec<*>) =
    backgroundScope.run { launch { network.activateSpec(spec) } }
1351
/**
 * Activates [spec] on [network] in the background and suspends until the spec has produced its
 * result, which is then returned.
 */
private suspend fun <R> TestScope.activateSpecWithResult(
    network: FrpNetwork,
    spec: FrpSpec<R>,
): R {
    val built = CompletableDeferred<R>()
    // The wrapping spec applies the real one and hands its result back to the caller.
    activateSpec(network) { built.complete(spec.applySpec()) }
    return built.await()
}
1359 }
1360
/**
 * Typed wrapper around JUnit's `assertEquals`: both arguments must share the type [T], so the
 * compiler infers a common type for the expected and actual values.
 */
private fun <T> assertEquals(expected: T, actual: T) {
    org.junit.Assert.assertEquals(expected, actual)
}
1363
/**
 * Pairs every element of this flow with the element that preceded it.
 *
 * @param init the value used as the "previous" element for the first emission.
 * @return a flow emitting `(previous, current)` for each upstream element.
 */
private fun <A> Flow<A>.pairwise(init: A): Flow<Pair<A, A>> {
    val upstream = this
    return flow {
        var previous = init
        upstream.collect { current ->
            emit(Pair(previous, current))
            previous = current
        }
    }
}
1371