xref: /aosp_15_r20/frameworks/base/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt (revision d57664e9bc4670b3ecf6748a746a57c557b6bc9e)
1 /*
<lambda>null2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:OptIn(ExperimentalCoroutinesApi::class, ExperimentalFrpApi::class)
18 
19 package com.android.systemui.kairos
20 
21 import com.android.systemui.kairos.util.Either
22 import com.android.systemui.kairos.util.Left
23 import com.android.systemui.kairos.util.Maybe
24 import com.android.systemui.kairos.util.None
25 import com.android.systemui.kairos.util.Right
26 import com.android.systemui.kairos.util.just
27 import com.android.systemui.kairos.util.map
28 import com.android.systemui.kairos.util.maybe
29 import com.android.systemui.kairos.util.none
30 import kotlin.time.Duration
31 import kotlin.time.Duration.Companion.seconds
32 import kotlin.time.DurationUnit
33 import kotlin.time.measureTime
34 import kotlinx.coroutines.CompletableDeferred
35 import kotlinx.coroutines.ExperimentalCoroutinesApi
36 import kotlinx.coroutines.async
37 import kotlinx.coroutines.awaitCancellation
38 import kotlinx.coroutines.flow.Flow
39 import kotlinx.coroutines.flow.MutableSharedFlow
40 import kotlinx.coroutines.flow.MutableStateFlow
41 import kotlinx.coroutines.flow.SharedFlow
42 import kotlinx.coroutines.flow.SharingStarted
43 import kotlinx.coroutines.flow.StateFlow
44 import kotlinx.coroutines.flow.first
45 import kotlinx.coroutines.flow.flow
46 import kotlinx.coroutines.flow.map
47 import kotlinx.coroutines.flow.onEach
48 import kotlinx.coroutines.flow.stateIn
49 import kotlinx.coroutines.flow.toCollection
50 import kotlinx.coroutines.launch
51 import kotlinx.coroutines.test.TestScope
52 import kotlinx.coroutines.test.runCurrent
53 import kotlinx.coroutines.test.runTest
54 import org.junit.Assert.assertNull
55 import org.junit.Assert.assertTrue
56 import org.junit.Test
57 
58 class KairosTests {
59 
60     @Test
61     fun basic() = runFrpTest { network ->
62         val emitter = network.mutableTFlow<Int>()
63         var result: Int? = null
64         activateSpec(network) { emitter.observe { result = it } }
65         runCurrent()
66         emitter.emit(3)
67         runCurrent()
68         assertEquals(3, result)
69         runCurrent()
70     }
71 
72     @Test
73     fun basicTFlow() = runFrpTest { network ->
74         val emitter = network.mutableTFlow<Int>()
75         println("starting network")
76         val result = activateSpecWithResult(network) { emitter.nextDeferred() }
77         runCurrent()
78         println("emitting")
79         emitter.emit(3)
80         runCurrent()
81         println("awaiting")
82         assertEquals(3, result.await())
83         runCurrent()
84     }
85 
86     @Test
87     fun basicTState() = runFrpTest { network ->
88         val emitter = network.mutableTFlow<Int>()
89         val result = activateSpecWithResult(network) { emitter.hold(0).stateChanges.nextDeferred() }
90         runCurrent()
91 
92         emitter.emit(3)
93         runCurrent()
94 
95         assertEquals(3, result.await())
96     }
97 
98     @Test
99     fun basicEvent() = runFrpTest { network ->
100         val emitter = MutableSharedFlow<Int>()
101         val result = activateSpecWithResult(network) { async { emitter.first() } }
102         runCurrent()
103         emitter.emit(1)
104         runCurrent()
105         assertTrue("Result eventual has not completed.", result.isCompleted)
106         assertEquals(1, result.await())
107     }
108 
109     @Test
110     fun basicTransactional() = runFrpTest { network ->
111         var value: Int? = null
112         var bSource = 1
113         val emitter = network.mutableTFlow<Unit>()
114         // Sampling this transactional will increment the source count.
115         val transactional = transactionally { bSource++ }
116         measureTime {
117                 activateSpecWithResult(network) {
118                         // Two different flows that sample the same transactional.
119                         (0 until 2).map {
120                             val sampled = emitter.sample(transactional) { _, v -> v }
121                             sampled.toSharedFlow()
122                         }
123                     }
124                     .forEach { backgroundScope.launch { it.collect { value = it } } }
125                 runCurrent()
126             }
127             .also { println("setup: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
128 
129         measureTime {
130                 emitter.emit(Unit)
131                 runCurrent()
132             }
133             .also { println("emit 1: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
134 
135         // Even though the transactional would be sampled twice, the first result is cached.
136         assertEquals(2, bSource)
137         assertEquals(1, value)
138 
139         measureTime {
140                 bSource = 10
141                 emitter.emit(Unit)
142                 runCurrent()
143             }
144             .also { println("emit 2: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
145 
146         assertEquals(11, bSource)
147         assertEquals(10, value)
148     }
149 
150     @Test
151     fun diamondGraph() = runFrpTest { network ->
152         val flow = network.mutableTFlow<Int>()
153         val outFlow =
154             activateSpecWithResult(network) {
155                 // map TFlow like we map Flow
156                 val left = flow.map { "left" to it }.onEach { println("left: $it") }
157                 val right = flow.map { "right" to it }.onEach { println("right: $it") }
158 
159                 // convert TFlows to TStates so that they can be combined
160                 val combined =
161                     left.hold("left" to 0).combineWith(right.hold("right" to 0)) { l, r -> l to r }
162                 combined.stateChanges // get TState changes
163                     .onEach { println("merged: $it") }
164                     .toSharedFlow() // convert back to Flow
165             }
166         runCurrent()
167 
168         val results = mutableListOf<Pair<Pair<String, Int>, Pair<String, Int>>>()
169         backgroundScope.launch { outFlow.toCollection(results) }
170         runCurrent()
171 
172         flow.emit(1)
173         runCurrent()
174 
175         flow.emit(2)
176         runCurrent()
177 
178         assertEquals(
179             listOf(("left" to 1) to ("right" to 1), ("left" to 2) to ("right" to 2)),
180             results,
181         )
182     }
183 
184     @Test
185     fun staticNetwork() = runFrpTest { network ->
186         var finalSum: Int? = null
187 
188         val intEmitter = network.mutableTFlow<Int>()
189         val sampleEmitter = network.mutableTFlow<Unit>()
190 
191         activateSpecWithResult(network) {
192                 val updates = intEmitter.map { a -> { b: Int -> a + b } }
193 
194                 val sumD =
195                     TStateLoop<Int>().apply {
196                         loopback =
197                             updates
198                                 .sample(this) { f, sum -> f(sum) }
199                                 .onEach { println("sum update: $it") }
200                                 .hold(0)
201                     }
202                 sampleEmitter
203                     .onEach { println("sampleEmitter emitted") }
204                     .sample(sumD) { _, sum -> sum }
205                     .onEach { println("sampled: $it") }
206                     .nextDeferred()
207             }
208             .let { launch { finalSum = it.await() } }
209 
210         runCurrent()
211 
212         (1..5).forEach { i ->
213             println("emitting: $i")
214             intEmitter.emit(i)
215             runCurrent()
216         }
217         runCurrent()
218 
219         sampleEmitter.emit(Unit)
220         runCurrent()
221 
222         assertEquals(15, finalSum)
223     }
224 
225     @Test
226     fun recursiveDefinition() = runFrpTest { network ->
227         var wasSold = false
228         var currentAmt: Int? = null
229 
230         val coin = network.mutableTFlow<Unit>()
231         val price = 50
232         val frpSpec = frpSpec {
233             val eSold = TFlowLoop<Unit>()
234 
235             val eInsert =
236                 coin.map {
237                     { runningTotal: Int ->
238                         println("TEST: $runningTotal - 10 = ${runningTotal - 10}")
239                         runningTotal - 10
240                     }
241                 }
242 
243             val eReset =
244                 eSold.map {
245                     { _: Int ->
246                         println("TEST: Resetting")
247                         price
248                     }
249                 }
250 
251             val eUpdate = eInsert.mergeWith(eReset) { f, g -> { a -> g(f(a)) } }
252 
253             val dTotal = TStateLoop<Int>()
254             dTotal.loopback = eUpdate.sample(dTotal) { f, total -> f(total) }.hold(price)
255 
256             val eAmt = dTotal.stateChanges
257             val bAmt = transactionally { dTotal.sample() }
258             eSold.loopback =
259                 coin
260                     .sample(bAmt) { coin, total -> coin to total }
261                     .mapMaybe { (_, total) -> maybe { guard { total <= 10 } } }
262 
263             val amts = eAmt.filter { amt -> amt >= 0 }
264 
265             amts.observe { currentAmt = it }
266             eSold.observe { wasSold = true }
267 
268             eSold.nextDeferred()
269         }
270 
271         activateSpec(network) { frpSpec.applySpec() }
272 
273         runCurrent()
274 
275         println()
276         println()
277         coin.emit(Unit)
278         runCurrent()
279 
280         assertEquals(40, currentAmt)
281 
282         println()
283         println()
284         coin.emit(Unit)
285         runCurrent()
286 
287         assertEquals(30, currentAmt)
288 
289         println()
290         println()
291         coin.emit(Unit)
292         runCurrent()
293 
294         assertEquals(20, currentAmt)
295 
296         println()
297         println()
298         coin.emit(Unit)
299         runCurrent()
300 
301         assertEquals(10, currentAmt)
302         assertEquals(false, wasSold)
303 
304         println()
305         println()
306         coin.emit(Unit)
307         runCurrent()
308 
309         assertEquals(true, wasSold)
310         assertEquals(50, currentAmt)
311     }
312 
313     @Test
314     fun promptCleanup() = runFrpTest { network ->
315         val emitter = network.mutableTFlow<Int>()
316         val stopper = network.mutableTFlow<Unit>()
317 
318         var result: Int? = null
319 
320         val flow = activateSpecWithResult(network) { emitter.takeUntil(stopper).toSharedFlow() }
321         backgroundScope.launch { flow.collect { result = it } }
322         runCurrent()
323 
324         emitter.emit(2)
325         runCurrent()
326 
327         assertEquals(2, result)
328 
329         stopper.emit(Unit)
330         runCurrent()
331     }
332 
333     @Test
334     fun switchTFlow() = runFrpTest { network ->
335         var currentSum: Int? = null
336 
337         val switchHandler = network.mutableTFlow<Pair<TFlow<Int>, String>>()
338         val aHandler = network.mutableTFlow<Int>()
339         val stopHandler = network.mutableTFlow<Unit>()
340         val bHandler = network.mutableTFlow<Int>()
341 
342         val sumFlow =
343             activateSpecWithResult(network) {
344                 val switchE = TFlowLoop<TFlow<Int>>()
345                 switchE.loopback =
346                     switchHandler.mapStateful { (intFlow, name) ->
347                         println("[onEach] Switching to: $name")
348                         val nextSwitch =
349                             switchE.skipNext().onEach { println("[onEach] switched-out") }
350                         val stopEvent =
351                             stopHandler
352                                 .onEach { println("[onEach] stopped") }
353                                 .mergeWith(nextSwitch) { _, b -> b }
354                         intFlow.takeUntil(stopEvent)
355                     }
356 
357                 val adderE: TFlow<(Int) -> Int> =
358                     switchE.hold(emptyTFlow).switch().map { a ->
359                         println("[onEach] new number $a")
360                         ({ sum: Int ->
361                             println("$a+$sum=${a + sum}")
362                             sum + a
363                         })
364                     }
365 
366                 val sumD = TStateLoop<Int>()
367                 sumD.loopback =
368                     adderE
369                         .sample(sumD) { f, sum -> f(sum) }
370                         .onEach { println("[onEach] writing sum: $it") }
371                         .hold(0)
372                 val sumE = sumD.stateChanges
373 
374                 sumE.toSharedFlow()
375             }
376 
377         runCurrent()
378 
379         backgroundScope.launch { sumFlow.collect { currentSum = it } }
380 
381         runCurrent()
382 
383         switchHandler.emit(aHandler to "A")
384         runCurrent()
385 
386         aHandler.emit(1)
387         runCurrent()
388 
389         assertEquals(1, currentSum)
390 
391         aHandler.emit(2)
392         runCurrent()
393 
394         assertEquals(3, currentSum)
395 
396         aHandler.emit(3)
397         runCurrent()
398 
399         assertEquals(6, currentSum)
400 
401         aHandler.emit(4)
402         runCurrent()
403 
404         assertEquals(10, currentSum)
405 
406         aHandler.emit(5)
407         runCurrent()
408 
409         assertEquals(15, currentSum)
410 
411         switchHandler.emit(bHandler to "B")
412         runCurrent()
413 
414         aHandler.emit(6)
415         runCurrent()
416 
417         assertEquals(15, currentSum)
418 
419         bHandler.emit(6)
420         runCurrent()
421 
422         assertEquals(21, currentSum)
423 
424         bHandler.emit(7)
425         runCurrent()
426 
427         assertEquals(28, currentSum)
428 
429         bHandler.emit(8)
430         runCurrent()
431 
432         assertEquals(36, currentSum)
433 
434         bHandler.emit(9)
435         runCurrent()
436 
437         assertEquals(45, currentSum)
438 
439         bHandler.emit(10)
440         runCurrent()
441 
442         assertEquals(55, currentSum)
443 
444         println()
445         println("Stopping: B")
446         stopHandler.emit(Unit) // bHandler.complete()
447         runCurrent()
448 
449         bHandler.emit(20)
450         runCurrent()
451 
452         assertEquals(55, currentSum)
453 
454         println()
455         println("Switching to: A2")
456         switchHandler.emit(aHandler to "A2")
457         runCurrent()
458 
459         println("aHandler.emit(11)")
460         aHandler.emit(11)
461         runCurrent()
462 
463         assertEquals(66, currentSum)
464 
465         aHandler.emit(12)
466         runCurrent()
467 
468         assertEquals(78, currentSum)
469 
470         aHandler.emit(13)
471         runCurrent()
472 
473         assertEquals(91, currentSum)
474 
475         aHandler.emit(14)
476         runCurrent()
477 
478         assertEquals(105, currentSum)
479 
480         aHandler.emit(15)
481         runCurrent()
482 
483         assertEquals(120, currentSum)
484 
485         stopHandler.emit(Unit)
486         runCurrent()
487 
488         aHandler.emit(100)
489         runCurrent()
490 
491         assertEquals(120, currentSum)
492     }
493 
494     @Test
495     fun switchIndirect() = runFrpTest { network ->
496         val emitter = network.mutableTFlow<Unit>()
497         activateSpec(network) {
498             emptyTFlow.map { emitter.map { 1 } }.flatten().map { "$it" }.observe()
499         }
500         runCurrent()
501     }
502 
503     @Test
504     fun switchInWithResult() = runFrpTest { network ->
505         val emitter = network.mutableTFlow<Unit>()
506         val out =
507             activateSpecWithResult(network) {
508                 emitter.map { emitter.map { 1 } }.flatten().toSharedFlow()
509             }
510         val result = out.stateIn(backgroundScope, SharingStarted.Eagerly, null)
511         runCurrent()
512         emitter.emit(Unit)
513         runCurrent()
514         assertEquals(null, result.value)
515     }
516 
517     @Test
518     fun switchInCompleted() = runFrpTest { network ->
519         val outputs = mutableListOf<Int>()
520 
521         val switchAH = network.mutableTFlow<Unit>()
522         val intAH = network.mutableTFlow<Int>()
523         val stopEmitter = network.mutableTFlow<Unit>()
524 
525         val top = frpSpec {
526             val intS = intAH.takeUntil(stopEmitter)
527             val switched = switchAH.map { intS }.flatten()
528             switched.toSharedFlow()
529         }
530         val flow = activateSpecWithResult(network) { top.applySpec() }
531         backgroundScope.launch { flow.collect { outputs.add(it) } }
532         runCurrent()
533 
534         switchAH.emit(Unit)
535         runCurrent()
536 
537         stopEmitter.emit(Unit)
538         runCurrent()
539 
540         // assertEquals(0, intAH.subscriptionCount.value)
541         intAH.emit(10)
542         runCurrent()
543 
544         assertEquals(true, outputs.isEmpty())
545 
546         switchAH.emit(Unit)
547         runCurrent()
548 
549         // assertEquals(0, intAH.subscriptionCount.value)
550         intAH.emit(10)
551         runCurrent()
552 
553         assertEquals(true, outputs.isEmpty())
554     }
555 
556     @Test
557     fun switchTFlow_outerCompletesFirst() = runFrpTest { network ->
558         var stepResult: Int? = null
559 
560         val switchAH = network.mutableTFlow<Unit>()
561         val switchStopEmitter = network.mutableTFlow<Unit>()
562         val intStopEmitter = network.mutableTFlow<Unit>()
563         val intAH = network.mutableTFlow<Int>()
564         val flow =
565             activateSpecWithResult(network) {
566                 val intS = intAH.takeUntil(intStopEmitter)
567                 val switchS = switchAH.takeUntil(switchStopEmitter)
568 
569                 val switched = switchS.map { intS }.flatten()
570                 switched.toSharedFlow()
571             }
572         backgroundScope.launch { flow.collect { stepResult = it } }
573         runCurrent()
574 
575         // assertEquals(0, intAH.subscriptionCount.value)
576         intAH.emit(100)
577         runCurrent()
578 
579         assertEquals(null, stepResult)
580 
581         switchAH.emit(Unit)
582         runCurrent()
583 
584         //            assertEquals(1, intAH.subscriptionCount.value)
585 
586         intAH.emit(5)
587         runCurrent()
588 
589         assertEquals(5, stepResult)
590 
591         println("stop outer")
592         switchStopEmitter.emit(Unit) // switchAH.complete()
593         runCurrent()
594 
595         // assertEquals(1, intAH.subscriptionCount.value)
596         // assertEquals(0, switchAH.subscriptionCount.value)
597 
598         intAH.emit(10)
599         runCurrent()
600 
601         assertEquals(10, stepResult)
602 
603         println("stop inner")
604         intStopEmitter.emit(Unit) // intAH.complete()
605         runCurrent()
606 
607         // assertEquals(just(10), network.await())
608     }
609 
610     @Test
611     fun mapTFlow() = runFrpTest { network ->
612         val emitter = network.mutableTFlow<Int>()
613         var stepResult: Int? = null
614 
615         val flow =
616             activateSpecWithResult(network) {
617                 val mappedS = emitter.map { it * it }
618                 mappedS.toSharedFlow()
619             }
620 
621         backgroundScope.launch { flow.collect { stepResult = it } }
622         runCurrent()
623 
624         emitter.emit(1)
625         runCurrent()
626 
627         assertEquals(1, stepResult)
628 
629         emitter.emit(2)
630         runCurrent()
631 
632         assertEquals(4, stepResult)
633 
634         emitter.emit(10)
635         runCurrent()
636 
637         assertEquals(100, stepResult)
638     }
639 
640     @Test
641     fun mapTransactional() = runFrpTest { network ->
642         var doubledResult: Int? = null
643         var pullValue = 0
644         val a = transactionally { pullValue }
645         val b = transactionally { a.sample() * 2 }
646         val emitter = network.mutableTFlow<Unit>()
647         val flow =
648             activateSpecWithResult(network) {
649                 val sampleB = emitter.sample(b) { _, b -> b }
650                 sampleB.toSharedFlow()
651             }
652 
653         backgroundScope.launch { flow.collect { doubledResult = it } }
654 
655         runCurrent()
656 
657         emitter.emit(Unit)
658         runCurrent()
659 
660         assertEquals(0, doubledResult)
661 
662         pullValue = 5
663         emitter.emit(Unit)
664         runCurrent()
665 
666         assertEquals(10, doubledResult)
667     }
668 
669     @Test
670     fun mapTState() = runFrpTest { network ->
671         val emitter = network.mutableTFlow<Int>()
672         var stepResult: Int? = null
673         val flow =
674             activateSpecWithResult(network) {
675                 val state = emitter.hold(0).map { it + 2 }
676                 val stateCurrent = transactionally { state.sample() }
677                 val stateChanges = state.stateChanges
678                 val sampleState = emitter.sample(stateCurrent) { _, b -> b }
679                 val merge = stateChanges.mergeWith(sampleState) { a, b -> a + b }
680                 merge.toSharedFlow()
681             }
682         backgroundScope.launch { flow.collect { stepResult = it } }
683         runCurrent()
684 
685         emitter.emit(1)
686         runCurrent()
687 
688         assertEquals(5, stepResult)
689 
690         emitter.emit(10)
691         runCurrent()
692 
693         assertEquals(15, stepResult)
694     }
695 
696     @Test
697     fun partitionEither() = runFrpTest { network ->
698         val emitter = network.mutableTFlow<Either<Int, Int>>()
699         val result =
700             activateSpecWithResult(network) {
701                 val (l, r) = emitter.partitionEither()
702                 val pDiamond =
703                     l.map { it * 2 }
704                         .mergeWith(r.map { it * -1 }) { _, _ -> error("unexpected coincidence") }
705                 pDiamond.hold(null).toStateFlow()
706             }
707         runCurrent()
708 
709         emitter.emit(Left(10))
710         runCurrent()
711 
712         assertEquals(20, result.value)
713 
714         emitter.emit(Right(30))
715         runCurrent()
716 
717         assertEquals(-30, result.value)
718     }
719 
720     @Test
721     fun accumTState() = runFrpTest { network ->
722         val emitter = network.mutableTFlow<Int>()
723         val sampler = network.mutableTFlow<Unit>()
724         var stepResult: Int? = null
725         val flow =
726             activateSpecWithResult(network) {
727                 val sumState = emitter.map { a -> { b: Int -> a + b } }.fold(0) { f, a -> f(a) }
728 
729                 sumState.stateChanges
730                     .mergeWith(sampler.sample(sumState) { _, sum -> sum }) { _, _ ->
731                         error("Unexpected coincidence")
732                     }
733                     .toSharedFlow()
734             }
735 
736         backgroundScope.launch { flow.collect { stepResult = it } }
737         runCurrent()
738 
739         emitter.emit(5)
740         runCurrent()
741         assertEquals(5, stepResult)
742 
743         emitter.emit(10)
744         runCurrent()
745         assertEquals(15, stepResult)
746 
747         sampler.emit(Unit)
748         runCurrent()
749         assertEquals(15, stepResult)
750     }
751 
752     @Test
753     fun mergeTFlows() = runFrpTest { network ->
754         val first = network.mutableTFlow<Int>()
755         val stopFirst = network.mutableTFlow<Unit>()
756         val second = network.mutableTFlow<Int>()
757         val stopSecond = network.mutableTFlow<Unit>()
758         var stepResult: Int? = null
759 
760         val flow: SharedFlow<Int>
761         val setupDuration = measureTime {
762             flow =
763                 activateSpecWithResult(network) {
764                     val firstS = first.takeUntil(stopFirst)
765                     val secondS = second.takeUntil(stopSecond)
766                     val mergedS =
767                         firstS.mergeWith(secondS) { _, _ -> error("Unexpected coincidence") }
768                     mergedS.toSharedFlow()
769                     // mergedS.last("onComplete")
770                 }
771             backgroundScope.launch { flow.collect { stepResult = it } }
772             runCurrent()
773         }
774 
775         //            assertEquals(1, first.subscriptionCount.value)
776         //            assertEquals(1, second.subscriptionCount.value)
777 
778         val firstEmitDuration = measureTime {
779             first.emit(1)
780             runCurrent()
781         }
782 
783         assertEquals(1, stepResult)
784 
785         val secondEmitDuration = measureTime {
786             second.emit(2)
787             runCurrent()
788         }
789 
790         assertEquals(2, stepResult)
791 
792         val stopFirstDuration = measureTime {
793             stopFirst.emit(Unit)
794             runCurrent()
795         }
796 
797         // assertEquals(0, first.subscriptionCount.value)
798         val testDeadEmitFirstDuration = measureTime {
799             first.emit(10)
800             runCurrent()
801         }
802 
803         assertEquals(2, stepResult)
804 
805         //            assertEquals(1, second.subscriptionCount.value)
806 
807         val secondEmitDuration2 = measureTime {
808             second.emit(3)
809             runCurrent()
810         }
811 
812         assertEquals(3, stepResult)
813 
814         val stopSecondDuration = measureTime {
815             stopSecond.emit(Unit)
816             runCurrent()
817         }
818 
819         // assertEquals(0, second.subscriptionCount.value)
820         val testDeadEmitSecondDuration = measureTime {
821             second.emit(10)
822             runCurrent()
823         }
824 
825         assertEquals(3, stepResult)
826 
827         println(
828             """
829                 setupDuration: ${setupDuration.toString(DurationUnit.MILLISECONDS, 2)}
830                 firstEmitDuration: ${firstEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
831                 secondEmitDuration: ${secondEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
832                 stopFirstDuration: ${stopFirstDuration.toString(DurationUnit.MILLISECONDS, 2)}
833                 testDeadEmitFirstDuration: ${
834                     testDeadEmitFirstDuration.toString(
835                         DurationUnit.MILLISECONDS,
836                         2,
837                     )
838                 }
839                 secondEmitDuration2: ${secondEmitDuration2.toString(DurationUnit.MILLISECONDS, 2)}
840                 stopSecondDuration: ${stopSecondDuration.toString(DurationUnit.MILLISECONDS, 2)}
841                 testDeadEmitSecondDuration: ${
842                     testDeadEmitSecondDuration.toString(
843                         DurationUnit.MILLISECONDS,
844                         2,
845                     )
846                 }
847             """
848                 .trimIndent()
849         )
850     }
851 
852     @Test
853     fun sampleCancel() = runFrpTest { network ->
854         val updater = network.mutableTFlow<Int>()
855         val stopUpdater = network.mutableTFlow<Unit>()
856         val sampler = network.mutableTFlow<Unit>()
857         val stopSampler = network.mutableTFlow<Unit>()
858         var stepResult: Int? = null
859         val flow =
860             activateSpecWithResult(network) {
861                 val stopSamplerFirst = stopSampler
862                 val samplerS = sampler.takeUntil(stopSamplerFirst)
863                 val stopUpdaterFirst = stopUpdater
864                 val updaterS = updater.takeUntil(stopUpdaterFirst)
865                 val sampledS = samplerS.sample(updaterS.hold(0)) { _, b -> b }
866                 sampledS.toSharedFlow()
867             }
868 
869         backgroundScope.launch { flow.collect { stepResult = it } }
870         runCurrent()
871 
872         updater.emit(1)
873         runCurrent()
874 
875         sampler.emit(Unit)
876         runCurrent()
877 
878         assertEquals(1, stepResult)
879 
880         stopSampler.emit(Unit)
881         runCurrent()
882 
883         // assertEquals(0, updater.subscriptionCount.value)
884         // assertEquals(0, sampler.subscriptionCount.value)
885         updater.emit(10)
886         runCurrent()
887 
888         sampler.emit(Unit)
889         runCurrent()
890 
891         assertEquals(1, stepResult)
892     }
893 
894     @Test
895     fun combineStates_differentUpstreams() = runFrpTest { network ->
896         val a = network.mutableTFlow<Int>()
897         val b = network.mutableTFlow<Int>()
898         var observed: Pair<Int, Int>? = null
899         val tState =
900             activateSpecWithResult(network) {
901                 val state = combine(a.hold(0), b.hold(0)) { a, b -> Pair(a, b) }
902                 state.stateChanges.observe { observed = it }
903                 state
904             }
905         assertEquals(0 to 0, network.transact { tState.sample() })
906         assertEquals(null, observed)
907         a.emit(5)
908         assertEquals(5 to 0, observed)
909         assertEquals(5 to 0, network.transact { tState.sample() })
910         b.emit(3)
911         assertEquals(5 to 3, observed)
912         assertEquals(5 to 3, network.transact { tState.sample() })
913     }
914 
915     @Test
916     fun sampleCombinedStates() = runFrpTest { network ->
917         val updater = network.mutableTFlow<Int>()
918         val emitter = network.mutableTFlow<Unit>()
919 
920         val result =
921             activateSpecWithResult(network) {
922                 val bA = updater.map { it * 2 }.hold(0)
923                 val bB = updater.hold(0)
924                 val combineD: TState<Pair<Int, Int>> = bA.combineWith(bB) { a, b -> a to b }
925                 val sampleS = emitter.sample(combineD) { _, b -> b }
926                 sampleS.nextDeferred()
927             }
928         println("launching")
929         runCurrent()
930 
931         println("emitting update")
932         updater.emit(10)
933         runCurrent()
934 
935         println("emitting sampler")
936         emitter.emit(Unit)
937         runCurrent()
938 
939         println("asserting")
940         assertEquals(20 to 10, result.await())
941     }
942 
943     @Test
944     fun switchMapPromptly() = runFrpTest { network ->
945         val emitter = network.mutableTFlow<Unit>()
946         val result =
947             activateSpecWithResult(network) {
948                 emitter
949                     .map { emitter.map { 1 }.map { it + 1 }.map { it * 2 } }
950                     .hold(emptyTFlow)
951                     .switchPromptly()
952                     .nextDeferred()
953             }
954         runCurrent()
955 
956         emitter.emit(Unit)
957         runCurrent()
958 
959         assertTrue("Not complete", result.isCompleted)
960         assertEquals(4, result.await())
961     }
962 
963     @Test
964     fun switchDeeper() = runFrpTest { network ->
965         val emitter = network.mutableTFlow<Unit>()
966         val e2 = network.mutableTFlow<Unit>()
967         val result =
968             activateSpecWithResult(network) {
969                 val tres =
970                     merge(e2.map { 1 }, e2.map { 2 }, transformCoincidence = { a, b -> a + b })
971                 tres.observeBuild()
972                 val switch = emitter.map { tres }.flatten()
973                 merge(switch, e2.map { null }, transformCoincidence = { a, _ -> a })
974                     .filterNotNull()
975                     .nextDeferred()
976             }
977         runCurrent()
978 
979         emitter.emit(Unit)
980         runCurrent()
981 
982         e2.emit(Unit)
983         runCurrent()
984 
985         assertTrue("Not complete", result.isCompleted)
986         assertEquals(3, result.await())
987     }
988 
989     @Test
990     fun recursionBasic() = runFrpTest { network ->
991         val add1 = network.mutableTFlow<Unit>()
992         val sub1 = network.mutableTFlow<Unit>()
993         val stepResult: StateFlow<Int> =
994             activateSpecWithResult(network) {
995                 val dSum = TStateLoop<Int>()
996                 val sAdd1 = add1.sample(dSum) { _, sum -> sum + 1 }
997                 val sMinus1 = sub1.sample(dSum) { _, sum -> sum - 1 }
998                 dSum.loopback = sAdd1.mergeWith(sMinus1) { a, _ -> a }.hold(0)
999                 dSum.toStateFlow()
1000             }
1001         runCurrent()
1002 
1003         add1.emit(Unit)
1004         runCurrent()
1005 
1006         assertEquals(1, stepResult.value)
1007 
1008         add1.emit(Unit)
1009         runCurrent()
1010 
1011         assertEquals(2, stepResult.value)
1012 
1013         sub1.emit(Unit)
1014         runCurrent()
1015 
1016         assertEquals(1, stepResult.value)
1017     }
1018 
1019     @Test
1020     fun recursiveTState() = runFrpTest { network ->
1021         val e = network.mutableTFlow<Unit>()
1022         var changes = 0
1023         val state =
1024             activateSpecWithResult(network) {
1025                 val s = TFlowLoop<Unit>()
1026                 val deferred = s.map { tStateOf(null) }
1027                 val e3 = e.map { tStateOf(Unit) }
1028                 val flattened = e3.mergeWith(deferred) { a, _ -> a }.hold(tStateOf(null)).flatten()
1029                 s.loopback = emptyTFlow
1030                 flattened.toStateFlow()
1031             }
1032 
1033         backgroundScope.launch { state.collect { changes++ } }
1034         runCurrent()
1035     }
1036 
1037     @Test
1038     fun fanOut() = runFrpTest { network ->
1039         val e = network.mutableTFlow<Map<String, Int>>()
1040         val (fooFlow, barFlow) =
1041             activateSpecWithResult(network) {
1042                 val selector = e.groupByKey()
1043                 val foos = selector.eventsForKey("foo")
1044                 val bars = selector.eventsForKey("bar")
1045                 foos.toSharedFlow() to bars.toSharedFlow()
1046             }
1047         val stateFlow = fooFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1048         backgroundScope.launch { barFlow.collect { error("unexpected bar") } }
1049         runCurrent()
1050 
1051         assertEquals(null, stateFlow.value)
1052 
1053         e.emit(mapOf("foo" to 1))
1054         runCurrent()
1055 
1056         assertEquals(1, stateFlow.value)
1057     }
1058 
1059     @Test
1060     fun fanOutLateSubscribe() = runFrpTest { network ->
1061         val e = network.mutableTFlow<Map<String, Int>>()
1062         val barFlow =
1063             activateSpecWithResult(network) {
1064                 val selector = e.groupByKey()
1065                 selector
1066                     .eventsForKey("foo")
1067                     .map { selector.eventsForKey("bar") }
1068                     .hold(emptyTFlow)
1069                     .switchPromptly()
1070                     .toSharedFlow()
1071             }
1072         val stateFlow = barFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1073         runCurrent()
1074 
1075         assertEquals(null, stateFlow.value)
1076 
1077         e.emit(mapOf("foo" to 0, "bar" to 1))
1078         runCurrent()
1079 
1080         assertEquals(1, stateFlow.value)
1081     }
1082 
1083     @Test
1084     fun inputFlowCompleted() = runFrpTest { network ->
1085         val results = mutableListOf<Int>()
1086         val e = network.mutableTFlow<Int>()
1087         activateSpec(network) { e.nextOnly().observe { results.add(it) } }
1088         runCurrent()
1089 
1090         e.emit(10)
1091         runCurrent()
1092 
1093         assertEquals(listOf(10), results)
1094 
1095         e.emit(20)
1096         runCurrent()
1097         assertEquals(listOf(10), results)
1098     }
1099 
1100     @Test
1101     fun fanOutThenMergeIncrementally() = runFrpTest { network ->
1102         // A tflow of group updates, where a group is a tflow of child updates, where a child is a
1103         // stateflow
1104         val e = network.mutableTFlow<Map<Int, Maybe<TFlow<Map<Int, Maybe<StateFlow<String>>>>>>>()
1105         println("fanOutMergeInc START")
1106         val state =
1107             activateSpecWithResult(network) {
1108                 // Convert nested Flows to nested TFlow/TState
1109                 val emitter: TFlow<Map<Int, Maybe<TFlow<Map<Int, Maybe<TState<String>>>>>>> =
1110                     e.mapBuild { m ->
1111                         m.mapValues { (_, mFlow) ->
1112                             mFlow.map {
1113                                 it.mapBuild { m2 ->
1114                                     m2.mapValues { (_, mState) ->
1115                                         mState.map { stateFlow -> stateFlow.toTState() }
1116                                     }
1117                                 }
1118                             }
1119                         }
1120                     }
1121                 // Accumulate all of our updates into a single TState
1122                 val accState: TState<Map<Int, Map<Int, String>>> =
1123                     emitter
1124                         .mapStateful {
1125                             changeMap: Map<Int, Maybe<TFlow<Map<Int, Maybe<TState<String>>>>>> ->
1126                             changeMap.mapValues { (groupId, mGroupChanges) ->
1127                                 mGroupChanges.map {
1128                                     groupChanges: TFlow<Map<Int, Maybe<TState<String>>>> ->
1129                                     // New group
1130                                     val childChangeById = groupChanges.groupByKey()
1131                                     val map: TFlow<Map<Int, Maybe<TFlow<Maybe<TState<String>>>>>> =
1132                                         groupChanges.mapStateful {
1133                                             gChangeMap: Map<Int, Maybe<TState<String>>> ->
1134                                             gChangeMap.mapValues { (childId, mChild) ->
1135                                                 mChild.map { child: TState<String> ->
1136                                                     println("new child $childId in the house")
1137                                                     // New child
1138                                                     val eRemoved =
1139                                                         childChangeById
1140                                                             .eventsForKey(childId)
1141                                                             .filter { it === None }
1142                                                             .nextOnly()
1143 
1144                                                     val addChild: TFlow<Maybe<TState<String>>> =
1145                                                         now.map { mChild }
1146                                                             .onEach {
1147                                                                 println(
1148                                                                     "addChild (groupId=$groupId, childId=$childId) ${child.sample()}"
1149                                                                 )
1150                                                             }
1151 
1152                                                     val removeChild: TFlow<Maybe<TState<String>>> =
1153                                                         eRemoved
1154                                                             .onEach {
1155                                                                 println(
1156                                                                     "removeChild (groupId=$groupId, childId=$childId)"
1157                                                                 )
1158                                                             }
1159                                                             .map { none() }
1160 
1161                                                     addChild.mergeWith(removeChild) { _, _ ->
1162                                                         error("unexpected coincidence")
1163                                                     }
1164                                                 }
1165                                             }
1166                                         }
1167                                     val mergeIncrementally: TFlow<Map<Int, Maybe<TState<String>>>> =
1168                                         map.onEach { println("merge patch: $it") }
1169                                             .mergeIncrementallyPromptly()
1170                                     mergeIncrementally
1171                                         .onEach { println("patch: $it") }
1172                                         .foldMapIncrementally()
1173                                         .flatMap { it.combine() }
1174                                 }
1175                             }
1176                         }
1177                         .foldMapIncrementally()
1178                         .flatMap { it.combine() }
1179 
1180                 accState.toStateFlow()
1181             }
1182         runCurrent()
1183 
1184         assertEquals(emptyMap(), state.value)
1185 
1186         val emitter2 = network.mutableTFlow<Map<Int, Maybe<StateFlow<String>>>>()
1187         println()
1188         println("init outer 0")
1189         e.emit(mapOf(0 to just(emitter2.onEach { println("emitter2 emit: $it") })))
1190         runCurrent()
1191 
1192         assertEquals(mapOf(0 to emptyMap()), state.value)
1193 
1194         println()
1195         println("init inner 10")
1196         emitter2.emit(mapOf(10 to just(MutableStateFlow("(0, 10)"))))
1197         runCurrent()
1198 
1199         assertEquals(mapOf(0 to mapOf(10 to "(0, 10)")), state.value)
1200 
1201         // replace
1202         println()
1203         println("replace inner 10")
1204         emitter2.emit(mapOf(10 to just(MutableStateFlow("(1, 10)"))))
1205         runCurrent()
1206 
1207         assertEquals(mapOf(0 to mapOf(10 to "(1, 10)")), state.value)
1208 
1209         // remove
1210         emitter2.emit(mapOf(10 to none()))
1211         runCurrent()
1212 
1213         assertEquals(mapOf(0 to emptyMap()), state.value)
1214 
1215         // add again
1216         emitter2.emit(mapOf(10 to just(MutableStateFlow("(2, 10)"))))
1217         runCurrent()
1218 
1219         assertEquals(mapOf(0 to mapOf(10 to "(2, 10)")), state.value)
1220 
1221         // batch update
1222         emitter2.emit(
1223             mapOf(
1224                 10 to none(),
1225                 11 to just(MutableStateFlow("(0, 11)")),
1226                 12 to just(MutableStateFlow("(0, 12)")),
1227             )
1228         )
1229         runCurrent()
1230 
1231         assertEquals(mapOf(0 to mapOf(11 to "(0, 11)", 12 to "(0, 12)")), state.value)
1232     }
1233 
1234     @Test
1235     fun applyLatestNetworkChanges() = runFrpTest { network ->
1236         val newCount = network.mutableTFlow<FrpSpec<Flow<Int>>>()
1237         val flowOfFlows: Flow<Flow<Int>> =
1238             activateSpecWithResult(network) { newCount.applyLatestSpec().toSharedFlow() }
1239         runCurrent()
1240 
1241         val incCount = network.mutableTFlow<Unit>()
1242         fun newFlow(): FrpSpec<SharedFlow<Int>> = frpSpec {
1243             launchEffect {
1244                 try {
1245                     println("new flow!")
1246                     awaitCancellation()
1247                 } finally {
1248                     println("cancelling old flow")
1249                 }
1250             }
1251             lateinit var count: TState<Int>
1252             count =
1253                 incCount
1254                     .onEach { println("incrementing ${count.sample()}") }
1255                     .fold(0) { _, c -> c + 1 }
1256             count.stateChanges.toSharedFlow()
1257         }
1258 
1259         var outerCount = 0
1260         val lastFlows: StateFlow<Pair<StateFlow<Int?>, StateFlow<Int?>>> =
1261             flowOfFlows
1262                 .map { it.stateIn(backgroundScope, SharingStarted.Eagerly, null) }
1263                 .pairwise(MutableStateFlow(null))
1264                 .onEach { outerCount++ }
1265                 .stateIn(
1266                     backgroundScope,
1267                     SharingStarted.Eagerly,
1268                     MutableStateFlow(null) to MutableStateFlow(null),
1269                 )
1270 
1271         runCurrent()
1272 
1273         newCount.emit(newFlow())
1274         runCurrent()
1275 
1276         assertEquals(1, outerCount)
1277         //        assertEquals(1, incCount.subscriptionCount)
1278         assertNull(lastFlows.value.second.value)
1279 
1280         incCount.emit(Unit)
1281         runCurrent()
1282 
1283         println("checking")
1284         assertEquals(1, lastFlows.value.second.value)
1285 
1286         incCount.emit(Unit)
1287         runCurrent()
1288 
1289         assertEquals(2, lastFlows.value.second.value)
1290 
1291         newCount.emit(newFlow())
1292         runCurrent()
1293         incCount.emit(Unit)
1294         runCurrent()
1295 
1296         // verify old flow is not getting updates
1297         assertEquals(2, lastFlows.value.first.value)
1298         // but the new one is
1299         assertEquals(1, lastFlows.value.second.value)
1300     }
1301 
1302     @Test
1303     fun effect() = runFrpTest { network ->
1304         val input = network.mutableTFlow<Unit>()
1305         var effectRunning = false
1306         var count = 0
1307         activateSpec(network) {
1308             val j = launchEffect {
1309                 effectRunning = true
1310                 try {
1311                     awaitCancellation()
1312                 } finally {
1313                     effectRunning = false
1314                 }
1315             }
1316             merge(emptyTFlow, input.nextOnly()).observe {
1317                 count++
1318                 j.cancel()
1319             }
1320         }
1321         runCurrent()
1322         assertEquals(true, effectRunning)
1323         assertEquals(0, count)
1324 
1325         println("1")
1326         input.emit(Unit)
1327         assertEquals(false, effectRunning)
1328         assertEquals(1, count)
1329 
1330         println("2")
1331         input.emit(Unit)
1332         assertEquals(1, count)
1333         println("3")
1334         input.emit(Unit)
1335         assertEquals(1, count)
1336     }
1337 
1338     private fun runFrpTest(
1339         timeout: Duration = 3.seconds,
1340         block: suspend TestScope.(FrpNetwork) -> Unit,
1341     ) {
1342         runTest(timeout = timeout) {
1343             val network = backgroundScope.newFrpNetwork()
1344             runCurrent()
1345             block(network)
1346         }
1347     }
1348 
1349     private fun TestScope.activateSpec(network: FrpNetwork, spec: FrpSpec<*>) =
1350         backgroundScope.launch { network.activateSpec(spec) }
1351 
1352     private suspend fun <R> TestScope.activateSpecWithResult(
1353         network: FrpNetwork,
1354         spec: FrpSpec<R>,
1355     ): R =
1356         CompletableDeferred<R>()
1357             .apply { activateSpec(network) { complete(spec.applySpec()) } }
1358             .await()
1359 }
1360 
assertEqualsnull1361 private fun <T> assertEquals(expected: T, actual: T) =
1362     org.junit.Assert.assertEquals(expected, actual)
1363 
1364 private fun <A> Flow<A>.pairwise(init: A): Flow<Pair<A, A>> = flow {
1365     var prev = init
1366     collect {
1367         emit(prev to it)
1368         prev = it
1369     }
1370 }
1371