/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 package com.android.systemui.kairos.internal
18
19 import com.android.systemui.kairos.CoalescingMutableTFlow
20 import com.android.systemui.kairos.FrpBuildScope
21 import com.android.systemui.kairos.FrpCoalescingProducerScope
22 import com.android.systemui.kairos.FrpDeferredValue
23 import com.android.systemui.kairos.FrpEffectScope
24 import com.android.systemui.kairos.FrpNetwork
25 import com.android.systemui.kairos.FrpProducerScope
26 import com.android.systemui.kairos.FrpSpec
27 import com.android.systemui.kairos.FrpStateScope
28 import com.android.systemui.kairos.FrpTransactionScope
29 import com.android.systemui.kairos.GroupedTFlow
30 import com.android.systemui.kairos.LocalFrpNetwork
31 import com.android.systemui.kairos.MutableTFlow
32 import com.android.systemui.kairos.TFlow
33 import com.android.systemui.kairos.TFlowInit
34 import com.android.systemui.kairos.groupByKey
35 import com.android.systemui.kairos.init
36 import com.android.systemui.kairos.internal.util.childScope
37 import com.android.systemui.kairos.internal.util.launchOnCancel
38 import com.android.systemui.kairos.internal.util.mapValuesParallel
39 import com.android.systemui.kairos.launchEffect
40 import com.android.systemui.kairos.util.Just
41 import com.android.systemui.kairos.util.Maybe
42 import com.android.systemui.kairos.util.None
43 import com.android.systemui.kairos.util.just
44 import com.android.systemui.kairos.util.map
45 import java.util.concurrent.atomic.AtomicReference
46 import kotlin.coroutines.Continuation
47 import kotlin.coroutines.CoroutineContext
48 import kotlin.coroutines.EmptyCoroutineContext
49 import kotlin.coroutines.startCoroutine
50 import kotlinx.coroutines.CompletableDeferred
51 import kotlinx.coroutines.CompletableJob
52 import kotlinx.coroutines.CoroutineName
53 import kotlinx.coroutines.CoroutineScope
54 import kotlinx.coroutines.Deferred
55 import kotlinx.coroutines.Job
56 import kotlinx.coroutines.cancel
57 import kotlinx.coroutines.completeWith
58 import kotlinx.coroutines.job
59
/**
 * [BuildScope] implementation that pairs a [StateScopeImpl] with a [CoroutineScope].
 *
 * All [StateScope] members are delegated to [stateScope]. The user-facing [FrpBuildScope] is
 * exposed through [frpScope], whose operations forward to the private helpers of this class.
 *
 * @property stateScope delegate backing every [StateScope] member of this class.
 * @property coroutineScope scope from which child scopes are derived; completion of its [Job]
 *   stops the flows created by [buildTFlow].
 */
internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope: CoroutineScope) :
    BuildScope, StateScope by stateScope {

    /** The [Job] stored in the context of [coroutineScope]. */
    private val job: Job
        get() = coroutineScope.coroutineContext.job

    /** Scope object passed as the receiver to user-supplied build blocks. */
    override val frpScope: FrpBuildScope = FrpBuildScopeImpl()

    /**
     * Starts [block] as a coroutine with [frpScope] as its receiver and suspends until it has
     * finished.
     *
     * The coroutine is started with an empty coroutine context; its outcome is relayed through a
     * [CompletableDeferred] that is parented to the job of the current `coroutineContext`.
     *
     * @return the value produced by [block]; a failure of [block] surfaces from the `await()`.
     */
    override suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R {
        val outcome = CompletableDeferred<R>(parent = coroutineContext.job)
        block.startCoroutine(frpScope, completionOf(outcome))
        return outcome.await()
    }

    /**
     * Creates a [Continuation] with an empty context that forwards whatever result it is resumed
     * with into [target].
     */
    private fun <T> completionOf(target: CompletableDeferred<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = EmptyCoroutineContext

            override fun resumeWith(result: Result<T>) {
                target.completeWith(result)
            }
        }

    /**
     * Shared machinery behind [tFlowInternal] and [coalescingTFlowInternal].
     *
     * An [InputNode] is created whose activation launches [builder] as an effect inside a
     * re-entered build scope, and whose deactivation cancels that effect. The resulting flow is
     * cut off by a stop signal that is emitted either when this scope's [job] completes or when
     * [builder] returns.
     *
     * @param constructFlow creates the flow backed by the given [InputNode], paired with the scope
     *   object that [builder] will receive.
     * @param builder producer body; run each time the input node is activated.
     * @throws IllegalStateException from the node callbacks if activation happens while already
     *   active, or deactivation happens while inactive.
     */
    private fun <A, T : TFlow<A>, S> buildTFlow(
        constructFlow: (InputNode<A>) -> Pair<T, S>,
        builder: suspend S.() -> Unit,
    ): TFlow<A> {
        var producerJob: Job? = null
        val stopEmitter = newStopEmitter()
        val completionHandle = job.invokeOnCompletion { stopEmitter.emit(Unit) }
        // This child scope is meant to stay alive after the current transaction has ended.
        val producerScope = coroutineScope.childScope()
        lateinit var flowAndScope: Pair<T, S>
        val inputNode =
            InputNode<A>(
                activate = {
                    check(producerJob == null) { "already activated" }
                    val reentered = reenterBuildScope(this@BuildScopeImpl, producerScope)
                    producerJob =
                        reentered.runInBuildScope {
                            launchEffect {
                                builder(flowAndScope.second)
                                // The builder returned normally: the completion hook is no longer
                                // needed, and the flow is stopped right away.
                                completionHandle.dispose()
                                stopEmitter.emit(Unit)
                            }
                        }
                },
                deactivate = {
                    val running = checkNotNull(producerJob) { "already deactivated" }
                    running.cancel()
                    producerJob = null
                },
            )
        flowAndScope = constructFlow(inputNode)
        return with(frpScope) { flowAndScope.first.takeUntil(stopEmitter) }
    }

    /**
     * Builds a [TFlow] fed by [builder], which emits through a suspending
     * [FrpProducerScope.emit] that forwards to a [MutableTFlow].
     */
    private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> {
        return buildTFlow(
            constructFlow = { inputNode ->
                val mutableFlow = MutableTFlow(network, inputNode)
                val producer: FrpProducerScope<T> =
                    object : FrpProducerScope<T> {
                        override suspend fun emit(value: T) {
                            mutableFlow.emit(value)
                        }
                    }
                mutableFlow to producer
            },
            builder = builder,
        )
    }

    /**
     * Builds a [TFlow] fed by [builder], which emits through a non-suspending
     * [FrpCoalescingProducerScope.emit] that forwards to a [CoalescingMutableTFlow] configured
     * with [getInitialValue] and [coalesce].
     */
    private fun <In, Out> coalescingTFlowInternal(
        getInitialValue: () -> Out,
        coalesce: (old: Out, new: In) -> Out,
        builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
    ): TFlow<Out> {
        return buildTFlow(
            constructFlow = { inputNode ->
                val coalescingFlow =
                    CoalescingMutableTFlow(coalesce, network, getInitialValue, inputNode)
                val producer: FrpCoalescingProducerScope<In> =
                    object : FrpCoalescingProducerScope<In> {
                        override fun emit(value: In) {
                            coalescingFlow.emit(value)
                        }
                    }
                coalescingFlow to producer
            },
            builder = builder,
        )
    }

    /**
     * Runs [block] asynchronously in a fresh scope obtained from [mutableChildBuildScope].
     *
     * @return the deferred result of [block] together with the [Job] of the new scope.
     */
    private fun <A> asyncScopeInternal(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> {
        val asyncChild = mutableChildBuildScope()
        val deferredValue = FrpDeferredValue(deferAsync { asyncChild.runInBuildScope(block) })
        return deferredValue to asyncChild.job
    }

    /** Defers [block] via `deferAsync`, running it in this build scope, and wraps the result. */
    private fun <R> deferredInternal(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R> {
        return FrpDeferredValue(deferAsync { runInBuildScope(block) })
    }

    /** Defers [block] via `deferAction`, running it in this build scope. */
    private fun deferredActionInternal(block: suspend FrpBuildScope.() -> Unit) {
        deferAction { runInBuildScope(block) }
    }

    /**
     * Observes the receiving flow, running [block] for each emission while the observer is still
     * registered.
     *
     * Registration state lives in an [AtomicReference] that is `null` before activation, holds
     * the output node while active, and is set to [None] once the observer has been cancelled or
     * its node has died.
     *
     * @param context forwarded to the [Output] node as its `context` argument.
     * @param block effect body, started as a coroutine with a fresh [FrpEffectScope] receiver.
     * @return the [Job] of the child scope backing this observer; per the comment in the original
     *   code, cancelling that scope deactivates the observer.
     */
    private fun <A> TFlow<A>.observeEffectInternal(
        context: CoroutineContext,
        block: suspend FrpEffectScope.(A) -> Unit,
    ): Job {
        val outputRef = AtomicReference<Maybe<Output<A>>>(null)
        val observerScope = coroutineScope.childScope()
        // Cancellation of the observer scope deactivates the output node, if one was registered.
        observerScope.launchOnCancel(CoroutineName("TFlow.observeEffect")) {
            val previous: Maybe<Output<A>>? = outputRef.getAndSet(None)
            if (previous is Just) {
                @Suppress("DeferredResultUnused")
                network.transaction { scheduleDeactivation(previous.value) }
            }
        }
        // Deferred so that the caller does not have to suspend.
        deferAction {
            val outputNode =
                Output<A>(
                    context = context,
                    onDeath = {
                        if (outputRef.getAndSet(None) != null) {
                            observerScope.cancel()
                        }
                    },
                    onEmit = { emitted ->
                        if (outputRef.get() is Just) {
                            // Still registered, so the effect may run.
                            val effect: suspend FrpEffectScope.() -> Unit = { block(emitted) }
                            val finished = CompletableDeferred<Unit>(parent = coroutineContext.job)
                            val effectScope =
                                object : FrpEffectScope, FrpTransactionScope by frpScope {
                                    override val frpCoroutineScope: CoroutineScope = observerScope
                                    override val frpNetwork: FrpNetwork =
                                        LocalFrpNetwork(network, observerScope, endSignal)
                                }
                            effect.startCoroutine(effectScope, completion = completionOf(finished))
                            finished.await()
                        }
                    },
                )
            val bounded = with(frpScope) { this@observeEffectInternal.takeUntil(endSignal) }
            val activation =
                bounded.init
                    .connect(evalScope = stateScope.evalScope)
                    .activate(evalScope = stateScope.evalScope, outputNode.schedulable)
            if (activation == null) {
                // Activation yielded nothing to observe, so the observer scope is cancelled.
                observerScope.cancel()
            } else {
                val (connection, needsEval) = activation
                outputNode.upstream = connection
                if (!outputRef.compareAndSet(null, just(outputNode))) {
                    // The reference was already changed (observer cancelled): deactivate.
                    scheduleDeactivation(outputNode)
                } else if (needsEval) {
                    outputNode.schedule(evalScope = stateScope.evalScope)
                }
            }
        }
        return observerScope.coroutineContext.job
    }

    /**
     * Maps every emission of the receiving flow through [transform], which is executed inside a
     * re-entered build scope, wrapped in its own `asyncScope`.
     */
    private fun <A, B> TFlow<A>.mapBuildInternal(
        transform: suspend FrpBuildScope.(A) -> B
    ): TFlow<B> {
        val mapScope = coroutineScope.childScope()
        val mapped: TFlowImpl<B> =
            mapImpl({ init.connect(evalScope = this) }) { upstreamValue ->
                reenterBuildScope(outerScope = this@BuildScopeImpl, mapScope).runInBuildScope {
                    asyncScope { transform(upstreamValue) }.first.get()
                }
            }
        return TFlowInit(constInit("mapBuild", mapped.cached()))
    }

    /**
     * Applies per-key specs: [init] supplies the initial spec for each key, and the receiving flow
     * supplies later replacements or removals.
     *
     * Each spec is run in a child build scope whose end signal is derived, via `skipNext()`, from
     * that key's slice of the receiving flow (grouped with `groupByKey`).
     *
     * @param init deferred map of the initial specs.
     * @param numKeys forwarded to `groupByKey`.
     * @return the flow of per-key results for subsequent changes, paired with the deferred results
     *   of the initial specs.
     */
    private fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestForKeyInternal(
        init: FrpDeferredValue<Map<K, FrpSpec<B>>>,
        numKeys: Int?,
    ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
        val eventsByKey: GroupedTFlow<K, Maybe<FrpSpec<A>>> = groupByKey(numKeys)
        val initialResults: Deferred<Map<K, B>> = deferAsync {
            init.unwrapped.await().mapValuesParallel { (key, spec) ->
                val endOfSpec = with(frpScope) { eventsByKey[key].skipNext() }
                childBuildScope(endOfSpec).runInBuildScope(spec)
            }
        }
        val reentryScope = coroutineScope.childScope()
        val changesNode: TFlowImpl<Map<K, Maybe<A>>> =
            mapImpl(upstream = { this@applyLatestForKeyInternal.init.connect(evalScope = this) }) {
                upstreamMap ->
                reenterBuildScope(this@BuildScopeImpl, reentryScope).run {
                    upstreamMap.mapValuesParallel { (key: K, maybeSpec: Maybe<FrpSpec<A>>) ->
                        maybeSpec.map { spec ->
                            val endOfSpec = with(frpScope) { eventsByKey[key].skipNext() }
                            childBuildScope(endOfSpec).runInBuildScope(spec)
                        }
                    }
                }
            }
        val changes: TFlow<Map<K, Maybe<A>>> =
            TFlowInit(constInit("applyLatestForKey", changesNode.cached()))
        // Attach a no-op observer so that effects are observed; otherwise init would stay alive
        // longer than expected.
        changes.observeEffectInternal(EmptyCoroutineContext) {}
        return Pair(changes, FrpDeferredValue(initialResults))
    }

    /** Creates a `Unit`-valued [CoalescingMutableTFlow] on [network], used as a stop signal. */
    private fun newStopEmitter(): CoalescingMutableTFlow<Unit, Unit> {
        return CoalescingMutableTFlow(
            coalesce = { _, _: Unit -> },
            network = network,
            getInitialValue = {},
        )
    }

    /**
     * Creates a child build scope whose state scope ends at [newEnd] and whose coroutine scope is
     * cancelled on the next emission of the child's end signal.
     */
    private suspend fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl {
        val childCoroutineScope: CoroutineScope = coroutineScope.childScope()
        val child =
            BuildScopeImpl(
                stateScope = stateScope.childStateScope(newEnd),
                coroutineScope = childCoroutineScope,
            )
        with(child) {
            // Once this transaction is done, move the child's job into the completing state; it
            // is kept alive for as long as it still has child jobs.
            scheduleOutput(
                OneShot {
                    // TODO: don't like this cast
                    (childCoroutineScope.coroutineContext.job as CompletableJob).complete()
                }
            )
            runInBuildScope { endSignal.nextOnly().observe { childCoroutineScope.cancel() } }
        }
        return child
    }

    /**
     * Creates a child build scope that shares this scope's eval scope but has its own end signal,
     * emitted when the child's coroutine job completes.
     */
    private fun mutableChildBuildScope(): BuildScopeImpl {
        val childEndSignal = newStopEmitter()
        val childCoroutineScope = coroutineScope.childScope()
        childCoroutineScope.coroutineContext.job.invokeOnCompletion { childEndSignal.emit(Unit) }
        // Once this transaction is done, move the child's job into the completing state; it is
        // kept alive for as long as it still has child jobs.
        scheduleOutput(
            OneShot {
                // TODO: don't like this cast
                (childCoroutineScope.coroutineContext.job as CompletableJob).complete()
            }
        )
        val childStateScope =
            StateScopeImpl(evalScope = stateScope.evalScope, endSignal = childEndSignal)
        return BuildScopeImpl(stateScope = childStateScope, coroutineScope = childCoroutineScope)
    }

    /**
     * [FrpBuildScope] facade: state-scope operations are delegated to `stateScope.frpScope`, and
     * each build operation forwards to the matching private helper of the enclosing class.
     */
    private inner class FrpBuildScopeImpl : FrpBuildScope, FrpStateScope by stateScope.frpScope {

        override fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> {
            return tFlowInternal(builder)
        }

        override fun <In, Out> coalescingTFlow(
            getInitialValue: () -> Out,
            coalesce: (old: Out, new: In) -> Out,
            builder: suspend FrpCoalescingProducerScope<In>.() -> Unit,
        ): TFlow<Out> {
            return coalescingTFlowInternal(getInitialValue, coalesce, builder)
        }

        override fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> {
            return asyncScopeInternal(block)
        }

        override fun <R> deferredBuildScope(
            block: suspend FrpBuildScope.() -> R
        ): FrpDeferredValue<R> {
            return deferredInternal(block)
        }

        override fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit) {
            deferredActionInternal(block)
        }

        override fun <A> TFlow<A>.observe(
            coroutineContext: CoroutineContext,
            block: suspend FrpEffectScope.(A) -> Unit,
        ): Job {
            return observeEffectInternal(coroutineContext, block)
        }

        override fun <A, B> TFlow<A>.mapBuild(
            transform: suspend FrpBuildScope.(A) -> B
        ): TFlow<B> {
            return mapBuildInternal(transform)
        }

        override fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
            initialSpecs: FrpDeferredValue<Map<K, FrpSpec<B>>>,
            numKeys: Int?,
        ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
            return applyLatestForKeyInternal(initialSpecs, numKeys)
        }
    }
}
355
/**
 * Creates a [BuildScopeImpl] that evaluates in the receiving [EvalScope] while reusing the end
 * signal of [outerScope].
 *
 * @param outerScope build scope whose `endSignal` the new scope inherits.
 * @param coroutineScope coroutine scope handed to the new [BuildScopeImpl].
 */
private fun EvalScope.reenterBuildScope(
    outerScope: BuildScopeImpl,
    coroutineScope: CoroutineScope,
): BuildScopeImpl {
    val reenteredState = StateScopeImpl(evalScope = this, endSignal = outerScope.endSignal)
    return BuildScopeImpl(stateScope = reenteredState, coroutineScope = coroutineScope)
}
364