1 /*
<lambda>null2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos.internal
18 
19 import com.android.systemui.kairos.CoalescingMutableTFlow
20 import com.android.systemui.kairos.FrpBuildScope
21 import com.android.systemui.kairos.FrpCoalescingProducerScope
22 import com.android.systemui.kairos.FrpDeferredValue
23 import com.android.systemui.kairos.FrpEffectScope
24 import com.android.systemui.kairos.FrpNetwork
25 import com.android.systemui.kairos.FrpProducerScope
26 import com.android.systemui.kairos.FrpSpec
27 import com.android.systemui.kairos.FrpStateScope
28 import com.android.systemui.kairos.FrpTransactionScope
29 import com.android.systemui.kairos.GroupedTFlow
30 import com.android.systemui.kairos.LocalFrpNetwork
31 import com.android.systemui.kairos.MutableTFlow
32 import com.android.systemui.kairos.TFlow
33 import com.android.systemui.kairos.TFlowInit
34 import com.android.systemui.kairos.groupByKey
35 import com.android.systemui.kairos.init
36 import com.android.systemui.kairos.internal.util.childScope
37 import com.android.systemui.kairos.internal.util.launchOnCancel
38 import com.android.systemui.kairos.internal.util.mapValuesParallel
39 import com.android.systemui.kairos.launchEffect
40 import com.android.systemui.kairos.util.Just
41 import com.android.systemui.kairos.util.Maybe
42 import com.android.systemui.kairos.util.None
43 import com.android.systemui.kairos.util.just
44 import com.android.systemui.kairos.util.map
45 import java.util.concurrent.atomic.AtomicReference
46 import kotlin.coroutines.Continuation
47 import kotlin.coroutines.CoroutineContext
48 import kotlin.coroutines.EmptyCoroutineContext
49 import kotlin.coroutines.startCoroutine
50 import kotlinx.coroutines.CompletableDeferred
51 import kotlinx.coroutines.CompletableJob
52 import kotlinx.coroutines.CoroutineName
53 import kotlinx.coroutines.CoroutineScope
54 import kotlinx.coroutines.Deferred
55 import kotlinx.coroutines.Job
56 import kotlinx.coroutines.cancel
57 import kotlinx.coroutines.completeWith
58 import kotlinx.coroutines.job
59 
60 internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope: CoroutineScope) :
61     BuildScope, StateScope by stateScope {
62 
63     private val job: Job
64         get() = coroutineScope.coroutineContext.job
65 
66     override val frpScope: FrpBuildScope = FrpBuildScopeImpl()
67 
68     override suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R {
69         val complete = CompletableDeferred<R>(parent = coroutineContext.job)
70         block.startCoroutine(
71             frpScope,
72             object : Continuation<R> {
73                 override val context: CoroutineContext
74                     get() = EmptyCoroutineContext
75 
76                 override fun resumeWith(result: Result<R>) {
77                     complete.completeWith(result)
78                 }
79             },
80         )
81         return complete.await()
82     }
83 
84     private fun <A, T : TFlow<A>, S> buildTFlow(
85         constructFlow: (InputNode<A>) -> Pair<T, S>,
86         builder: suspend S.() -> Unit,
87     ): TFlow<A> {
88         var job: Job? = null
89         val stopEmitter = newStopEmitter()
90         val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) }
91         // Create a child scope that will be kept alive beyond the end of this transaction.
92         val childScope = coroutineScope.childScope()
93         lateinit var emitter: Pair<T, S>
94         val inputNode =
95             InputNode<A>(
96                 activate = {
97                     check(job == null) { "already activated" }
98                     job =
99                         reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
100                             launchEffect {
101                                 builder(emitter.second)
102                                 handle.dispose()
103                                 stopEmitter.emit(Unit)
104                             }
105                         }
106                 },
107                 deactivate = {
108                     checkNotNull(job) { "already deactivated" }.cancel()
109                     job = null
110                 },
111             )
112         emitter = constructFlow(inputNode)
113         return with(frpScope) { emitter.first.takeUntil(stopEmitter) }
114     }
115 
116     private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
117         buildTFlow(
118             constructFlow = { inputNode ->
119                 val flow = MutableTFlow(network, inputNode)
120                 flow to
121                     object : FrpProducerScope<T> {
122                         override suspend fun emit(value: T) {
123                             flow.emit(value)
124                         }
125                     }
126             },
127             builder = builder,
128         )
129 
130     private fun <In, Out> coalescingTFlowInternal(
131         getInitialValue: () -> Out,
132         coalesce: (old: Out, new: In) -> Out,
133         builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
134     ): TFlow<Out> =
135         buildTFlow(
136             constructFlow = { inputNode ->
137                 val flow = CoalescingMutableTFlow(coalesce, network, getInitialValue, inputNode)
138                 flow to
139                     object : FrpCoalescingProducerScope<In> {
140                         override fun emit(value: In) {
141                             flow.emit(value)
142                         }
143                     }
144             },
145             builder = builder,
146         )
147 
148     private fun <A> asyncScopeInternal(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> {
149         val childScope = mutableChildBuildScope()
150         return FrpDeferredValue(deferAsync { childScope.runInBuildScope(block) }) to childScope.job
151     }
152 
153     private fun <R> deferredInternal(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R> =
154         FrpDeferredValue(deferAsync { runInBuildScope(block) })
155 
156     private fun deferredActionInternal(block: suspend FrpBuildScope.() -> Unit) {
157         deferAction { runInBuildScope(block) }
158     }
159 
160     private fun <A> TFlow<A>.observeEffectInternal(
161         context: CoroutineContext,
162         block: suspend FrpEffectScope.(A) -> Unit,
163     ): Job {
164         val subRef = AtomicReference<Maybe<Output<A>>>(null)
165         val childScope = coroutineScope.childScope()
166         // When our scope is cancelled, deactivate this observer.
167         childScope.launchOnCancel(CoroutineName("TFlow.observeEffect")) {
168             subRef.getAndSet(None)?.let { output ->
169                 if (output is Just) {
170                     @Suppress("DeferredResultUnused")
171                     network.transaction { scheduleDeactivation(output.value) }
172                 }
173             }
174         }
175         // Defer so that we don't suspend the caller
176         deferAction {
177             val outputNode =
178                 Output<A>(
179                     context = context,
180                     onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } },
181                     onEmit = { output ->
182                         if (subRef.get() is Just) {
183                             // Not cancelled, safe to emit
184                             val coroutine: suspend FrpEffectScope.() -> Unit = { block(output) }
185                             val complete = CompletableDeferred<Unit>(parent = coroutineContext.job)
186                             coroutine.startCoroutine(
187                                 object : FrpEffectScope, FrpTransactionScope by frpScope {
188                                     override val frpCoroutineScope: CoroutineScope = childScope
189                                     override val frpNetwork: FrpNetwork =
190                                         LocalFrpNetwork(network, childScope, endSignal)
191                                 },
192                                 completion =
193                                     object : Continuation<Unit> {
194                                         override val context: CoroutineContext
195                                             get() = EmptyCoroutineContext
196 
197                                         override fun resumeWith(result: Result<Unit>) {
198                                             complete.completeWith(result)
199                                         }
200                                     },
201                             )
202                             complete.await()
203                         }
204                     },
205                 )
206             with(frpScope) { this@observeEffectInternal.takeUntil(endSignal) }
207                 .init
208                 .connect(evalScope = stateScope.evalScope)
209                 .activate(evalScope = stateScope.evalScope, outputNode.schedulable)
210                 ?.let { (conn, needsEval) ->
211                     outputNode.upstream = conn
212                     if (!subRef.compareAndSet(null, just(outputNode))) {
213                         // Job's already been cancelled, schedule deactivation
214                         scheduleDeactivation(outputNode)
215                     } else if (needsEval) {
216                         outputNode.schedule(evalScope = stateScope.evalScope)
217                     }
218                 } ?: childScope.cancel()
219         }
220         return childScope.coroutineContext.job
221     }
222 
223     private fun <A, B> TFlow<A>.mapBuildInternal(
224         transform: suspend FrpBuildScope.(A) -> B
225     ): TFlow<B> {
226         val childScope = coroutineScope.childScope()
227         return TFlowInit(
228             constInit(
229                 "mapBuild",
230                 mapImpl({ init.connect(evalScope = this) }) { spec ->
231                         reenterBuildScope(outerScope = this@BuildScopeImpl, childScope)
232                             .runInBuildScope {
233                                 val (result, _) = asyncScope { transform(spec) }
234                                 result.get()
235                             }
236                     }
237                     .cached(),
238             )
239         )
240     }
241 
242     private fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestForKeyInternal(
243         init: FrpDeferredValue<Map<K, FrpSpec<B>>>,
244         numKeys: Int?,
245     ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
246         val eventsByKey: GroupedTFlow<K, Maybe<FrpSpec<A>>> = groupByKey(numKeys)
247         val initOut: Deferred<Map<K, B>> = deferAsync {
248             init.unwrapped.await().mapValuesParallel { (k, spec) ->
249                 val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
250                 val newScope = childBuildScope(newEnd)
251                 newScope.runInBuildScope(spec)
252             }
253         }
254         val childScope = coroutineScope.childScope()
255         val changesNode: TFlowImpl<Map<K, Maybe<A>>> =
256             mapImpl(upstream = { this@applyLatestForKeyInternal.init.connect(evalScope = this) }) {
257                 upstreamMap ->
258                 reenterBuildScope(this@BuildScopeImpl, childScope).run {
259                     upstreamMap.mapValuesParallel { (k: K, ma: Maybe<FrpSpec<A>>) ->
260                         ma.map { spec ->
261                             val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
262                             val newScope = childBuildScope(newEnd)
263                             newScope.runInBuildScope(spec)
264                         }
265                     }
266                 }
267             }
268         val changes: TFlow<Map<K, Maybe<A>>> =
269             TFlowInit(constInit("applyLatestForKey", changesNode.cached()))
270         // Ensure effects are observed; otherwise init will stay alive longer than expected
271         changes.observeEffectInternal(EmptyCoroutineContext) {}
272         return changes to FrpDeferredValue(initOut)
273     }
274 
275     private fun newStopEmitter(): CoalescingMutableTFlow<Unit, Unit> =
276         CoalescingMutableTFlow(
277             coalesce = { _, _: Unit -> },
278             network = network,
279             getInitialValue = {},
280         )
281 
282     private suspend fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl {
283         val newCoroutineScope: CoroutineScope = coroutineScope.childScope()
284         return BuildScopeImpl(
285                 stateScope = stateScope.childStateScope(newEnd),
286                 coroutineScope = newCoroutineScope,
287             )
288             .apply {
289                 // Ensure that once this transaction is done, the new child scope enters the
290                 // completing state (kept alive so long as there are child jobs).
291                 scheduleOutput(
292                     OneShot {
293                         // TODO: don't like this cast
294                         (newCoroutineScope.coroutineContext.job as CompletableJob).complete()
295                     }
296                 )
297                 runInBuildScope { endSignal.nextOnly().observe { newCoroutineScope.cancel() } }
298             }
299     }
300 
301     private fun mutableChildBuildScope(): BuildScopeImpl {
302         val stopEmitter = newStopEmitter()
303         val childScope = coroutineScope.childScope()
304         childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) }
305         // Ensure that once this transaction is done, the new child scope enters the completing
306         // state (kept alive so long as there are child jobs).
307         scheduleOutput(
308             OneShot {
309                 // TODO: don't like this cast
310                 (childScope.coroutineContext.job as CompletableJob).complete()
311             }
312         )
313         return BuildScopeImpl(
314             stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter),
315             coroutineScope = childScope,
316         )
317     }
318 
319     private inner class FrpBuildScopeImpl : FrpBuildScope, FrpStateScope by stateScope.frpScope {
320 
321         override fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
322             tFlowInternal(builder)
323 
324         override fun <In, Out> coalescingTFlow(
325             getInitialValue: () -> Out,
326             coalesce: (old: Out, new: In) -> Out,
327             builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
328         ): TFlow<Out> = coalescingTFlowInternal(getInitialValue, coalesce, builder)
329 
330         override fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> =
331             asyncScopeInternal(block)
332 
333         override fun <R> deferredBuildScope(
334             block: suspend FrpBuildScope.() -> R
335         ): FrpDeferredValue<R> = deferredInternal(block)
336 
337         override fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit) =
338             deferredActionInternal(block)
339 
340         override fun <A> TFlow<A>.observe(
341             coroutineContext: CoroutineContext,
342             block: suspend FrpEffectScope.(A) -> Unit,
343         ): Job = observeEffectInternal(coroutineContext, block)
344 
345         override fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> =
346             mapBuildInternal(transform)
347 
348         override fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
349             initialSpecs: FrpDeferredValue<Map<K, FrpSpec<B>>>,
350             numKeys: Int?,
351         ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> =
352             applyLatestForKeyInternal(initialSpecs, numKeys)
353     }
354 }
355 
reenterBuildScopenull356 private fun EvalScope.reenterBuildScope(
357     outerScope: BuildScopeImpl,
358     coroutineScope: CoroutineScope,
359 ) =
360     BuildScopeImpl(
361         stateScope = StateScopeImpl(evalScope = this, endSignal = outerScope.endSignal),
362         coroutineScope,
363     )
364