<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 import kotlin.jvm.*
7 
// Marker stored in `DispatchedContinuation._state` while no resumption state is pending:
// it is the field's initial value and the value `takeState` resets it to.
private val UNDEFINED = Symbol("UNDEFINED")
// Marker stored in the reusable-continuation slot of `DispatchedContinuation` while the cached
// cancellable continuation is claimed by a `suspendCancellableCoroutineReusable` block.
@JvmField
internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
11 
/**
 * A [Continuation] wrapper that resumes the wrapped [continuation] through [dispatcher].
 *
 * All [Continuation] members that are not overridden here (notably `context`) are delegated to
 * [continuation]. The instance itself is the task that is handed to the dispatcher, so the state
 * to resume with is parked in [_state] until it is taken via [takeState].
 *
 * In addition, the class maintains a lock-free slot ([_reusableCancellableContinuation]) that lets
 * `suspendCancellableCoroutineReusable` cache and reuse a [CancellableContinuationImpl].
 */
@PublishedApi
internal class DispatchedContinuation<in T>(
    @JvmField internal val dispatcher: CoroutineDispatcher,
    // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    // State to resume with, stored right before this task is dispatched or queued;
    // UNDEFINED when nothing is pending (initially and after `takeState`).
    @JvmField
    @Suppress("PropertyName")
    internal var _state: Any? = UNDEFINED
    // The wrapped continuation is reported as the caller frame when it is itself a CoroutineStackFrame.
    override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
    // This wrapper contributes no stack trace element of its own.
    override fun getStackTraceElement(): StackTraceElement? = null
    @JvmField // pre-cached value to avoid ctx.fold on every resumption
    internal val countOrElement = threadContextElements(context)

    /**
     * Possible states of reusability:
     *
     * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
     *     was used and then invalidated (e.g. because of the cancellation).
     * 2) [CancellableContinuation]. Continuation to be/that is being reused.
     * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
     *    ```
     *    // state == null | CC
     *    suspendCancellableCoroutineReusable { cont ->
     *        // state == REUSABLE_CLAIMED
     *        block(cont)
     *    }
     *    // state == CC
     *    ```
     * 4) [Throwable]. Continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
     *    [CancellableContinuationImpl.getResult] will check for cancellation later.
     *
     * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
     * In the `getResult`, we have the following code:
     * ```
     * if (trySuspend()) {
     *     // <- at this moment current continuation can be redispatched and claimed again.
     *     attachChildToParent()
     *     releaseClaimedContinuation()
     * }
     * ```
     */
    private val _reusableCancellableContinuation = atomic<Any?>(null)

    // Typed view of the slot: the cached CC instance, or `null` when the slot currently holds
    // anything else (`null`, REUSABLE_CLAIMED or a Throwable).
    private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
        get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>

    /**
     * Returns `true` when the reusability slot holds anything other than `null`.
     */
    internal fun isReusable(): Boolean {
        /*
         * Invariant: caller.resumeMode.isReusableMode
         * Reusability control:
         * `null` -> no reusability at all, `false`
         * anything else -> reusable.
         */
        return _reusableCancellableContinuation.value != null
    }

    /**
     * Waits until the previous call to `suspendCancellableCoroutineReusable`
     * stops mutating the cached instance, i.e. until the slot is no longer [REUSABLE_CLAIMED].
     */
    internal fun awaitReusability() {
        // Re-reads the slot on every iteration and leaves as soon as it is not claimed
        _reusableCancellableContinuation.loop {
            if (it !== REUSABLE_CLAIMED) return
        }
    }

    /**
     * Waits for any in-progress claim to finish and then detaches the child of the cached
     * cancellable continuation, if one is currently stored in the slot.
     */
    internal fun release() {
        /*
         * Called from `releaseInterceptedContinuation`, can be concurrent with
         * the code in `getResult` right after `trySuspend` returned `true`, so we have
         * to wait for a release here.
         */
        awaitReusability()
        reusableCancellableContinuation?.detachChild()
    }

    /**
     * Claims the continuation for [suspendCancellableCoroutineReusable] block,
     * so all cancellations will be postponed.
     *
     * Returns `null` when the slot was empty (the caller is expected to create a new CC instance),
     * or the previously cached CC instance when it was successfully claimed. While the slot is
     * [REUSABLE_CLAIMED] or holds a [Throwable], the loop keeps retrying.
     */
    @Suppress("UNCHECKED_CAST")
    internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
        /*
         * Transitions:
         * 1) `null` -> claimed, caller will instantiate CC instance
         * 2) `CC` -> claimed, caller will reuse CC instance
         */
        _reusableCancellableContinuation.loop { state ->
            when {
                state === null -> {
                    /*
                     * null -> CC was not yet published -> we do not compete with cancel
                     * -> can use plain store instead of CAS
                     */
                    _reusableCancellableContinuation.value = REUSABLE_CLAIMED
                    return null
                }
                // potentially competing with cancel
                state is CancellableContinuationImpl<*> -> {
                    // On CAS failure fall through and re-read the slot on the next iteration
                    if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
                        return state as CancellableContinuationImpl<T>
                    }
                }
                state === REUSABLE_CLAIMED -> {
                    // Do nothing, wait until reusable instance will be returned from
                    // getResult() of a previous `suspendCancellableCoroutineReusable`
                }
                state is Throwable -> {
                    // Also do nothing, Throwable can only indicate that the CC
                    // is in REUSABLE_CLAIMED state, but with postponed cancellation
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
     * and returns cancellation cause if so, `null` otherwise.
     * If continuation was cancelled, it becomes non-reusable.
     *
     * ```
     * suspendCancellableCoroutineReusable { // <- claimed
     * // Any asynchronous cancellation is "postponed" while this block
     * // is being executed
     * } // postponed cancellation is checked here in `getResult`
     * ```
     *
     * See [CancellableContinuationImpl.getResult].
     *
     * @param continuation the instance to publish into the slot when no cancellation was postponed.
     * @return the postponed cancellation cause, or `null` if [continuation] was stored for reuse.
     */
    internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
        _reusableCancellableContinuation.loop { state ->
            // not when(state) to avoid Intrinsics.equals call
            when {
                state === REUSABLE_CLAIMED -> {
                    // Publish the CC for future reuse; on CAS failure re-read the slot and retry
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
                }
                state is Throwable -> {
                    // Cancellation was postponed: reset the slot to `null` (non-reusable) and report the cause.
                    // The CAS is required to succeed here; `require` throws if it does not.
                    require(_reusableCancellableContinuation.compareAndSet(state, null))
                    return state
                }
                else -> error("Inconsistent state $state")
            }
        }
    }

    /**
     * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
     * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
     *
     * @param cause the cancellation cause to store in the slot while the CC is claimed.
     */
    internal fun postponeCancellation(cause: Throwable): Boolean {
        _reusableCancellableContinuation.loop { state ->
            when (state) {
                REUSABLE_CLAIMED -> {
                    // Record the cause in place of the claim marker; on CAS failure retry
                    if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
                        return true
                }
                // A cause has already been recorded earlier, nothing more to do
                is Throwable -> return true
                else -> {
                    // Invalidate: the CC is not claimed, so reset the slot to `null`
                    // and report that the cancellation was not postponed
                    if (_reusableCancellableContinuation.compareAndSet(state, null))
                        return false
                }
            }
        }
    }

    /**
     * Returns the pending state stored in [_state] and resets the field back to `UNDEFINED`.
     */
    override fun takeState(): Any? {
        val state = _state
        assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
        _state = UNDEFINED
        return state
    }

    // This instance acts as its own delegate
    override val delegate: Continuation<T>
        get() = this

    /**
     * Resumes with [result] in `MODE_ATOMIC`: when [dispatcher] requires a dispatch, the state is
     * stored and this task is dispatched; otherwise the resumption goes through [executeUnconfined].
     */
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            // Park the state for the task and hand this task over to the dispatcher
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
    // It is used only in Continuation<T>.resumeCancellableWith
    // Same shape as resumeWith, but uses MODE_CANCELLABLE, and on the undispatched path the
    // continuation is resumed with `result` only if `resumeCancelled` returned `false`.
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

    // takeState had already cleared the state so we cancel takenState here
    override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
        // It is Ok to call onCancellation here without try/catch around it, since this function only faces
        // a "bound" cancellation handler that performs the safe call to the user-specified code.
        if (takenState is CompletedWithCancellation) {
            takenState.onCancellation(cause)
        }
    }

    // inline here is to save us an entry on the stack for the sake of better stacktraces
    // If the context has a Job that is no longer active: cancels the given completed `state`,
    // resumes with the job's cancellation exception and returns `true`; otherwise returns `false`.
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeCancelled(state: Any?): Boolean {
        val job = context[Job]
        if (job != null && !job.isActive) {
            val cause = job.getCancellationException()
            cancelCompletedResult(state, cause)
            resumeWithException(cause)
            return true
        }
        return false
    }

    // Resumes the wrapped continuation directly (without dispatching), inside `withContinuationContext`
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }

    // used by "yield" implementation
    // Stores `value` as the pending state in MODE_CANCELLABLE and hands this task to `dispatchYield`
    internal fun dispatchYield(context: CoroutineContext, value: T) {
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatchYield(context, this)
    }

    override fun toString(): String =
        "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}
266 
/**
 * Resumes this continuation with [result] in cancellable mode when it is a [DispatchedContinuation];
 * any other continuation is resumed with a plain [Continuation.resumeWith] and [onCancellation] is not used.
 *
 * The function is deliberately not inline to save bytecode (it is pretty big and used in many places),
 * and it is left public so that its name is not mangled in user stack traces if it shows there.
 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
 * @suppress **This is an internal API and should not be used from general code.**
 */
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) {
    if (this is DispatchedContinuation) {
        // Smart-cast receiver: this resolves to the member DispatchedContinuation.resumeCancellableWith
        resumeCancellableWith(result, onCancellation)
    } else {
        resumeWith(result)
    }
}
281 
/**
 * Yields this continuation through the unconfined event loop in `MODE_CANCELLABLE`.
 *
 * Returns `true` if the continuation was queued into the already active unconfined loop and
 * `false` otherwise, including the fast path where the unconfined queue is empty.
 */
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean {
    val wasQueued = executeUnconfined(contState = Unit, mode = MODE_CANCELLABLE, doYield = true) { run() }
    return wasQueued
}
286 
/**
 * Runs [block] as a part of the current thread's event loop. When the continuation cannot be
 * resumed right away, [contState] and [mode] are recorded on it and it is queued instead.
 *
 * [doYield] marks a yielding continuation, which enables a fast path: if the unconfined queue is
 * empty, nothing is done at all.
 *
 * Returns `true` if the continuation was queued (trampolined) and `false` otherwise.
 */
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED } // invalid execution mode
    val loop = ThreadLocalEventLoop.eventLoop
    // Fast path for yield: with an empty unconfined queue there is nothing to yield to
    if (doYield && loop.isUnconfinedQueueEmpty) return false
    if (!loop.isUnconfinedLoopActive) {
        // No unconfined loop is running yet -- run it here until all unconfined tasks are executed
        runUnconfinedEventLoop(loop, block = block)
        return false
    }
    // An unconfined loop is already active -- enqueue the continuation to avoid stack overflow
    _state = contState
    resumeMode = mode
    loop.dispatchUnconfined(this)
    return true // queued into the active loop
}
313