<lambda>null1 package kotlinx.coroutines.flow
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.internal.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9
10 /**
11 * A [SharedFlow] that represents a read-only state with a single updatable data [value] that emits updates
12 * to the value to its collectors. A state flow is a _hot_ flow because its active instance exists independently
13 * of the presence of collectors. Its current value can be retrieved via the [value] property.
14 *
15 * **State flow never completes**. A call to [Flow.collect] on a state flow never completes normally, and
16 * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_.
17 *
18 * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with
19 * the initial value. The value of mutable state flow can be updated by setting its [value] property.
20 * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates,
21 * but always collects the most recently emitted value.
22 *
23 * [StateFlow] is useful as a data-model class to represent any kind of state.
24 * Derived values can be defined using various operators on the flows, with [combine] operator being especially
25 * useful to combine values from multiple state flows using arbitrary functions.
26 *
27 * For example, the following class encapsulates an integer state and increments its value on each call to `inc`:
28 *
29 * ```
30 * class CounterModel {
31 * private val _counter = MutableStateFlow(0) // private mutable state flow
32 * val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
33 *
34 * fun inc() {
35 * _counter.update { count -> count + 1 } // atomic, safe for concurrent use
36 * }
37 * }
38 * ```
39 *
40 * Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
41 *
42 * ```
43 * val aModel = CounterModel()
44 * val bModel = CounterModel()
45 * val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
46 * ```
47 *
48 * As an alternative to the above usage with the `MutableStateFlow(...)` constructor function,
49 * any _cold_ [Flow] can be converted to a state flow using the [stateIn] operator.
50 *
51 * ### Strong equality-based conflation
52 *
53 * Values in state flow are conflated using [Any.equals] comparison in a similar way to
54 * [distinctUntilChanged] operator. It is used to conflate incoming updates
55 * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors
56 * when new value is equal to the previously emitted one. State flow behavior with classes that violate
57 * the contract for [Any.equals] is unspecified.
58 *
59 * ### State flow is a shared flow
60 *
61 * State flow is a special-purpose, high-performance, and efficient implementation of [SharedFlow] for the narrow,
62 * but widely used case of sharing a state. See the [SharedFlow] documentation for the basic rules,
63 * constraints, and operators that are applicable to all shared flows.
64 *
65 * State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any
66 * more values, but keeps the last emitted one, and does not support [resetReplayCache][MutableSharedFlow.resetReplayCache].
67 * A state flow behaves identically to a shared flow when it is created
68 * with the following parameters and the [distinctUntilChanged] operator is applied to it:
69 *
70 * ```
71 * // MutableStateFlow(initialValue) is a shared flow with the following parameters:
72 * val shared = MutableSharedFlow(
73 * replay = 1,
74 * onBufferOverflow = BufferOverflow.DROP_OLDEST
75 * )
76 * shared.tryEmit(initialValue) // emit the initial value
77 * val state = shared.distinctUntilChanged() // get StateFlow-like behavior
78 * ```
79 *
80 * Use [SharedFlow] when you need a [StateFlow] with tweaks in its behavior such as extra buffering, replaying more
81 * values, or omitting the initial value.
82 *
83 * ### StateFlow vs ConflatedBroadcastChannel
84 *
85 * Conceptually, state flow is similar to [ConflatedBroadcastChannel]
86 * and is designed to completely replace it.
87 * It has the following important differences:
88 *
89 * - `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
90 * for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that
91 * allocates objects on each emitted value.
92 * - `StateFlow` always has a value which can be safely read at any time via [value] property.
93 * Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value.
94 * - `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow].
95 * - `StateFlow` conflation is based on equality like [distinctUntilChanged] operator,
96 * unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity.
97 * - `StateFlow` cannot be closed like `ConflatedBroadcastChannel` and can never represent a failure.
98 * All errors and completion signals should be explicitly _materialized_ if needed.
99 *
100 * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking
101 * more pragmatic design choices for the sake of convenience.
102 *
103 * To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
104 * constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
105 * Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls
106 * with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
107 * You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
108 *
109 * ### Concurrency
110 *
111 * All methods of state flow are **thread-safe** and can be safely invoked from concurrent coroutines without
112 * external synchronization.
113 *
114 * ### Operator fusion
115 *
116 * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
117 * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
118 * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
119 *
120 * ### Implementation notes
121 *
122 * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
123 * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when
124 * using unconfined coroutines. Adding new subscribers has `O(1)` amortized cost, but updating a [value] has `O(N)`
125 * cost, where `N` is the number of active subscribers.
126 *
127 * ### Not stable for inheritance
128 *
129 * **`The StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
130 * might be added to this interface in the future, but is stable for use.
131 * Use the `MutableStateFlow(value)` constructor function to create an implementation.
132 */
133 public interface StateFlow<out T> : SharedFlow<T> {
134 /**
135 * The current value of this state flow.
136 */
137 public val value: T
138 }
139
140 /**
141 * A mutable [StateFlow] that provides a setter for [value].
142 * An instance of `MutableStateFlow` with the given initial `value` can be created using
143 * `MutableStateFlow(value)` constructor function.
144
145 * See the [StateFlow] documentation for details on state flows.
146 * Note that all emission-related operators, such as [value]'s setter, [emit], and [tryEmit], are conflated using [Any.equals].
147 *
148 * ### Not stable for inheritance
149 *
150 * **The `MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
151 * might be added to this interface in the future, but is stable for use.
152 * Use the `MutableStateFlow()` constructor function to create an implementation.
153 */
154 public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
155 /**
156 * The current value of this state flow.
157 *
158 * Setting a value that is [equal][Any.equals] to the previous one does nothing.
159 *
160 * This property is **thread-safe** and can be safely updated from concurrent coroutines without
161 * external synchronization.
162 */
163 public override var value: T
164
165 /**
166 * Atomically compares the current [value] with [expect] and sets it to [update] if it is equal to [expect].
167 * The result is `true` if the [value] was set to [update] and `false` otherwise.
168 *
169 * This function use a regular comparison using [Any.equals]. If both [expect] and [update] are equal to the
170 * current [value], this function returns `true`, but it does not actually change the reference that is
171 * stored in the [value].
172 *
173 * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
174 * external synchronization.
175 */
compareAndSetnull176 public fun compareAndSet(expect: T, update: T): Boolean
177 }
178
179 /**
180 * Creates a [MutableStateFlow] with the given initial [value].
181 */
182 @Suppress("FunctionName")
183 public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
184
185 // ------------------------------------ Update methods ------------------------------------
186
187 /**
188 * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns the new
189 * value.
190 *
191 * [function] may be evaluated multiple times, if [value] is being concurrently updated.
192 */
193 public inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T {
194 while (true) {
195 val prevValue = value
196 val nextValue = function(prevValue)
197 if (compareAndSet(prevValue, nextValue)) {
198 return nextValue
199 }
200 }
201 }
202
203 /**
204 * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns its
205 * prior value.
206 *
207 * [function] may be evaluated multiple times, if [value] is being concurrently updated.
208 */
getAndUpdatenull209 public inline fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T {
210 while (true) {
211 val prevValue = value
212 val nextValue = function(prevValue)
213 if (compareAndSet(prevValue, nextValue)) {
214 return prevValue
215 }
216 }
217 }
218
219
220 /**
221 * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value.
222 *
223 * [function] may be evaluated multiple times, if [value] is being concurrently updated.
224 */
updatenull225 public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
226 while (true) {
227 val prevValue = value
228 val nextValue = function(prevValue)
229 if (compareAndSet(prevValue, nextValue)) {
230 return
231 }
232 }
233 }
234
235 // ------------------------------------ Implementation ------------------------------------
236
237 private val NONE = Symbol("NONE")
238
239 private val PENDING = Symbol("PENDING")
240
241 // StateFlow slots are allocated for its collectors
242 private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
243 /**
244 * Each slot can have one of the following states:
245 *
246 * - `null` -- it is not used right now. Can [allocateLocked] to new collector.
247 * - `NONE` -- used by a collector, but neither suspended nor has pending value.
248 * - `PENDING` -- pending to process new value.
249 * - `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
250 *
251 * It is important that default `null` value is used, because there can be a race between allocation
252 * of a new slot and trying to do [makePending] on this slot.
253 *
254 * ===
255 * This should be `atomic<Any?>(null)` instead of the atomic reference, but because of #3820
256 * it is used as a **temporary** solution starting from 1.8.1 version.
257 * Depending on the fix rollout on Android, it will be removed in 1.9.0 or 2.0.0.
258 * See https://issuetracker.google.com/issues/325123736
259 */
260 private val _state = WorkaroundAtomicReference<Any?>(null)
261
allocateLockednull262 override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
263 // No need for atomic check & update here, since allocated happens under StateFlow lock
264 if (_state.value != null) return false // not free
265 _state.value = NONE // allocated
266 return true
267 }
268
freeLockednull269 override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
270 _state.value = null // free now
271 return EMPTY_RESUMES // nothing more to do
272 }
273
274 @Suppress("UNCHECKED_CAST")
makePendingnull275 fun makePending() {
276 _state.loop { state ->
277 when {
278 state == null -> return // this slot is free - skip it
279 state === PENDING -> return // already pending, nothing to do
280 state === NONE -> { // mark as pending
281 if (_state.compareAndSet(state, PENDING)) return
282 }
283 else -> { // must be a suspend continuation state
284 // we must still use CAS here since continuation may get cancelled and free the slot at any time
285 if (_state.compareAndSet(state, NONE)) {
286 (state as CancellableContinuationImpl<Unit>).resume(Unit)
287 return
288 }
289 }
290 }
291 }
292 }
293
statenull294 fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
295 assert { state !is CancellableContinuationImpl<*> }
296 return state === PENDING
297 }
298
awaitPendingnull299 suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
300 assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
301 if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
302 // CAS failed -- the only possible reason is that it is already in pending state now
303 assert { _state.value === PENDING }
304 cont.resume(Unit)
305 }
306 }
307
308 private class StateFlowImpl<T>(
309 initialState: Any // T | NULL
310 ) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
311 private val _state = atomic(initialState) // T | NULL
312 private var sequence = 0 // serializes updates, value update is in process when sequence is odd
313
314 public override var value: T
315 get() = NULL.unbox(_state.value)
316 set(value) { updateState(null, value ?: NULL) }
317
compareAndSetnull318 override fun compareAndSet(expect: T, update: T): Boolean =
319 updateState(expect ?: NULL, update ?: NULL)
320
321 private fun updateState(expectedState: Any?, newState: Any): Boolean {
322 var curSequence: Int
323 var curSlots: Array<StateFlowSlot?>? // benign race, we will not use it
324 synchronized(this) {
325 val oldState = _state.value
326 if (expectedState != null && oldState != expectedState) return false // CAS support
327 if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
328 _state.value = newState
329 curSequence = sequence
330 if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
331 curSequence++ // make it odd
332 sequence = curSequence
333 } else {
334 // update is already in process, notify it, and return
335 sequence = curSequence + 2 // change sequence to notify, keep it odd
336 return true // updated
337 }
338 curSlots = slots // read current reference to collectors under lock
339 }
340 /*
341 Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
342 Loop until we're done firing all the changes. This is a sort of simple flat combining that
343 ensures sequential firing of concurrent updates and avoids the storm of collector resumes
344 when updates happen concurrently from many threads.
345 */
346 while (true) {
347 // Benign race on element read from array
348 curSlots?.forEach {
349 it?.makePending()
350 }
351 // check if the value was updated again while we were updating the old one
352 synchronized(this) {
353 if (sequence == curSequence) { // nothing changed, we are done
354 sequence = curSequence + 1 // make sequence even again
355 return true // done, updated
356 }
357 // reread everything for the next loop under the lock
358 curSequence = sequence
359 curSlots = slots
360 }
361 }
362 }
363
364 override val replayCache: List<T>
365 get() = listOf(value)
366
tryEmitnull367 override fun tryEmit(value: T): Boolean {
368 this.value = value
369 return true
370 }
371
emitnull372 override suspend fun emit(value: T) {
373 this.value = value
374 }
375
376 @Suppress("UNCHECKED_CAST")
resetReplayCachenull377 override fun resetReplayCache() {
378 throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
379 }
380
collectnull381 override suspend fun collect(collector: FlowCollector<T>): Nothing {
382 val slot = allocateSlot()
383 try {
384 if (collector is SubscribedFlowCollector) collector.onSubscription()
385 val collectorJob = currentCoroutineContext()[Job]
386 var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
387 // The loop is arranged so that it starts delivering current value without waiting first
388 while (true) {
389 // Here the coroutine could have waited for a while to be dispatched,
390 // so we use the most recent state here to ensure the best possible conflation of stale values
391 val newState = _state.value
392 // always check for cancellation
393 collectorJob?.ensureActive()
394 // Conflate value emissions using equality
395 if (oldState == null || oldState != newState) {
396 collector.emit(NULL.unbox(newState))
397 oldState = newState
398 }
399 // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
400 if (!slot.takePending()) { // try fast-path without suspending first
401 slot.awaitPending() // only suspend for new values when needed
402 }
403 }
404 } finally {
405 freeSlot(slot)
406 }
407 }
408
createSlotnull409 override fun createSlot() = StateFlowSlot()
410 override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
411
412 override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
413 fuseStateFlow(context, capacity, onBufferOverflow)
414 }
415
416 internal fun <T> StateFlow<T>.fuseStateFlow(
417 context: CoroutineContext,
418 capacity: Int,
419 onBufferOverflow: BufferOverflow
420 ): Flow<T> {
421 // state flow is always conflated so additional conflation does not have any effect
422 assert { capacity != Channel.CONFLATED } // should be desugared by callers
423 if ((capacity in 0..1 || capacity == Channel.BUFFERED) && onBufferOverflow == BufferOverflow.DROP_OLDEST) {
424 return this
425 }
426 return fuseSharedFlow(context, capacity, onBufferOverflow)
427 }
428