<lambda>null1 package kotlinx.coroutines.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 import kotlin.jvm.*
7
8 private val UNDEFINED = Symbol("UNDEFINED")
9 @JvmField
10 internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED")
11
12 @PublishedApi
13 internal class DispatchedContinuation<in T>(
14 @JvmField internal val dispatcher: CoroutineDispatcher,
15 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
16 @JvmField val continuation: Continuation<T>
17 ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
18 @JvmField
19 @Suppress("PropertyName")
20 internal var _state: Any? = UNDEFINED
21 override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
22 override fun getStackTraceElement(): StackTraceElement? = null
23 @JvmField // pre-cached value to avoid ctx.fold on every resumption
24 internal val countOrElement = threadContextElements(context)
25
26 /**
27 * Possible states of reusability:
28 *
29 * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or
30 * was used and then invalidated (e.g. because of the cancellation).
31 * 2) [CancellableContinuation]. Continuation to be/that is being reused.
32 * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block:
33 * ```
34 * // state == null | CC
35 * suspendCancellableCoroutineReusable { cont ->
36 * // state == REUSABLE_CLAIMED
37 * block(cont)
38 * }
39 * // state == CC
40 * ```
41 * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
42 * [CancellableContinuationImpl.getResult] will check for cancellation later.
43 *
44 * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
45 * In the `getResult`, we have the following code:
46 * ```
47 * if (trySuspend()) {
48 * // <- at this moment current continuation can be redispatched and claimed again.
49 * attachChildToParent()
50 * releaseClaimedContinuation()
51 * }
52 * ```
53 */
54 private val _reusableCancellableContinuation = atomic<Any?>(null)
55
56 private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
57 get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
58
59 internal fun isReusable(): Boolean {
60 /*
61 Invariant: caller.resumeMode.isReusableMode
62 * Reusability control:
63 * `null` -> no reusability at all, `false`
64 * anything else -> reusable.
65 */
66 return _reusableCancellableContinuation.value != null
67 }
68
69 /**
70 * Awaits until previous call to `suspendCancellableCoroutineReusable` will
71 * stop mutating cached instance
72 */
73 internal fun awaitReusability() {
74 _reusableCancellableContinuation.loop {
75 if (it !== REUSABLE_CLAIMED) return
76 }
77 }
78
79 internal fun release() {
80 /*
81 * Called from `releaseInterceptedContinuation`, can be concurrent with
82 * the code in `getResult` right after `trySuspend` returned `true`, so we have
83 * to wait for a release here.
84 */
85 awaitReusability()
86 reusableCancellableContinuation?.detachChild()
87 }
88
89 /**
90 * Claims the continuation for [suspendCancellableCoroutineReusable] block,
91 * so all cancellations will be postponed.
92 */
93 @Suppress("UNCHECKED_CAST")
94 internal fun claimReusableCancellableContinuation(): CancellableContinuationImpl<T>? {
95 /*
96 * Transitions:
97 * 1) `null` -> claimed, caller will instantiate CC instance
98 * 2) `CC` -> claimed, caller will reuse CC instance
99 */
100 _reusableCancellableContinuation.loop { state ->
101 when {
102 state === null -> {
103 /*
104 * null -> CC was not yet published -> we do not compete with cancel
105 * -> can use plain store instead of CAS
106 */
107 _reusableCancellableContinuation.value = REUSABLE_CLAIMED
108 return null
109 }
110 // potentially competing with cancel
111 state is CancellableContinuationImpl<*> -> {
112 if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
113 return state as CancellableContinuationImpl<T>
114 }
115 }
116 state === REUSABLE_CLAIMED -> {
117 // Do nothing, wait until reusable instance will be returned from
118 // getResult() of a previous `suspendCancellableCoroutineReusable`
119 }
120 state is Throwable -> {
121 // Also do nothing, Throwable can only indicate that the CC
122 // is in REUSABLE_CLAIMED state, but with postponed cancellation
123 }
124 else -> error("Inconsistent state $state")
125 }
126 }
127 }
128
129 /**
130 * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state
131 * and returns cancellation cause if so, `null` otherwise.
132 * If continuation was cancelled, it becomes non-reusable.
133 *
134 * ```
135 * suspendCancellableCoroutineReusable { // <- claimed
136 * // Any asynchronous cancellation is "postponed" while this block
137 * // is being executed
138 * } // postponed cancellation is checked here in `getResult`
139 * ```
140 *
141 * See [CancellableContinuationImpl.getResult].
142 */
143 internal fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
144 _reusableCancellableContinuation.loop { state ->
145 // not when(state) to avoid Intrinsics.equals call
146 when {
147 state === REUSABLE_CLAIMED -> {
148 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
149 }
150 state is Throwable -> {
151 require(_reusableCancellableContinuation.compareAndSet(state, null))
152 return state
153 }
154 else -> error("Inconsistent state $state")
155 }
156 }
157 }
158
159 /**
160 * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state.
161 * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise.
162 */
163 internal fun postponeCancellation(cause: Throwable): Boolean {
164 _reusableCancellableContinuation.loop { state ->
165 when (state) {
166 REUSABLE_CLAIMED -> {
167 if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause))
168 return true
169 }
170 is Throwable -> return true
171 else -> {
172 // Invalidate
173 if (_reusableCancellableContinuation.compareAndSet(state, null))
174 return false
175 }
176 }
177 }
178 }
179
180 override fun takeState(): Any? {
181 val state = _state
182 assert { state !== UNDEFINED } // fail-fast if repeatedly invoked
183 _state = UNDEFINED
184 return state
185 }
186
187 override val delegate: Continuation<T>
188 get() = this
189
190 override fun resumeWith(result: Result<T>) {
191 val context = continuation.context
192 val state = result.toState()
193 if (dispatcher.isDispatchNeeded(context)) {
194 _state = state
195 resumeMode = MODE_ATOMIC
196 dispatcher.dispatch(context, this)
197 } else {
198 executeUnconfined(state, MODE_ATOMIC) {
199 withCoroutineContext(this.context, countOrElement) {
200 continuation.resumeWith(result)
201 }
202 }
203 }
204 }
205
206 // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
207 // It is used only in Continuation<T>.resumeCancellableWith
208 @Suppress("NOTHING_TO_INLINE")
209 internal inline fun resumeCancellableWith(
210 result: Result<T>,
211 noinline onCancellation: ((cause: Throwable) -> Unit)?
212 ) {
213 val state = result.toState(onCancellation)
214 if (dispatcher.isDispatchNeeded(context)) {
215 _state = state
216 resumeMode = MODE_CANCELLABLE
217 dispatcher.dispatch(context, this)
218 } else {
219 executeUnconfined(state, MODE_CANCELLABLE) {
220 if (!resumeCancelled(state)) {
221 resumeUndispatchedWith(result)
222 }
223 }
224 }
225 }
226
227 // takeState had already cleared the state so we cancel takenState here
228 override fun cancelCompletedResult(takenState: Any?, cause: Throwable) {
229 // It is Ok to call onCancellation here without try/catch around it, since this function only faces
230 // a "bound" cancellation handler that performs the safe call to the user-specified code.
231 if (takenState is CompletedWithCancellation) {
232 takenState.onCancellation(cause)
233 }
234 }
235
236 // inline here is to save us an entry on the stack for the sake of better stacktraces
237 @Suppress("NOTHING_TO_INLINE")
238 internal inline fun resumeCancelled(state: Any?): Boolean {
239 val job = context[Job]
240 if (job != null && !job.isActive) {
241 val cause = job.getCancellationException()
242 cancelCompletedResult(state, cause)
243 resumeWithException(cause)
244 return true
245 }
246 return false
247 }
248
249 @Suppress("NOTHING_TO_INLINE")
250 internal inline fun resumeUndispatchedWith(result: Result<T>) {
251 withContinuationContext(continuation, countOrElement) {
252 continuation.resumeWith(result)
253 }
254 }
255
256 // used by "yield" implementation
257 internal fun dispatchYield(context: CoroutineContext, value: T) {
258 _state = value
259 resumeMode = MODE_CANCELLABLE
260 dispatcher.dispatchYield(context, this)
261 }
262
263 override fun toString(): String =
264 "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
265 }
266
267 /**
268 * It is not inline to save bytecode (it is pretty big and used in many places)
269 * and we leave it public so that its name is not mangled in use stack traces if it shows there.
270 * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
271 * @suppress **This an internal API and should not be used from general code.**
272 */
273 @InternalCoroutinesApi
resumeCancellableWithnull274 public fun <T> Continuation<T>.resumeCancellableWith(
275 result: Result<T>,
276 onCancellation: ((cause: Throwable) -> Unit)? = null
277 ): Unit = when (this) {
278 is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
279 else -> resumeWith(result)
280 }
281
yieldUndispatchednull282 internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
283 executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
284 run()
285 }
286
287 /**
288 * Executes given [block] as part of current event loop, updating current continuation
289 * mode and state if continuation is not resumed immediately.
290 * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
291 * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
292 */
executeUnconfinednull293 private inline fun DispatchedContinuation<*>.executeUnconfined(
294 contState: Any?, mode: Int, doYield: Boolean = false,
295 block: () -> Unit
296 ): Boolean {
297 assert { mode != MODE_UNINITIALIZED } // invalid execution mode
298 val eventLoop = ThreadLocalEventLoop.eventLoop
299 // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
300 if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
301 return if (eventLoop.isUnconfinedLoopActive) {
302 // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
303 _state = contState
304 resumeMode = mode
305 eventLoop.dispatchUnconfined(this)
306 true // queued into the active loop
307 } else {
308 // Was not active -- run event loop until all unconfined tasks are executed
309 runUnconfinedEventLoop(eventLoop, block = block)
310 false
311 }
312 }
313