<lambda>null1 @file:Suppress("DEPRECATION_ERROR")
2
3 package kotlinx.coroutines
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.internal.*
7 import kotlinx.coroutines.selects.*
8 import kotlin.coroutines.*
9 import kotlin.coroutines.intrinsics.*
10 import kotlin.js.*
11 import kotlin.jvm.*
12
13 /**
14 * A concrete implementation of [Job]. It is optionally a child to a parent job.
15 *
16 * This is an open class designed for extension by more specific classes that might augment the
17 * state and mare store addition state information for completed jobs, like their result values.
18 *
19 * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
20 * @suppress **This is unstable API and it is subject to change.**
21 */
22 @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
23 public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
24 final override val key: CoroutineContext.Key<*> get() = Job
25
26 /*
27 === Internal states ===
28
29 name state class public state description
30 ------ ------------ ------------ -----------
31 EMPTY_N EmptyNew : New no listeners
32 EMPTY_A EmptyActive : Active no listeners
33 SINGLE JobNode : Active a single listener
34 SINGLE+ JobNode : Active a single listener + NodeList added as its next
35 LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew)
36 LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive)
37 COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
38 CANCELLING Finishing : Cancelling -- " --
39 FINAL_C Cancelled : Cancelled Cancelled (final state)
40 FINAL_R <any> : Completed produced some result
41
42 === Transitions ===
43
44 New states Active states Inactive states
45
46 +---------+ +---------+ }
47 | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
48 +---------+ +---------+ | }
49 | | | ^ | +----------+
50 | | | | +--> | FINAL_* |
51 | | V | | +----------+
52 | | +---------+ | }
53 | | | SINGLE | ----+ } JobNode states
54 | | +---------+ | }
55 | | | | }
56 | | V | }
57 | | +---------+ | }
58 | +-------> | SINGLE+ | ----+ }
59 | +---------+ | }
60 | | |
61 V V |
62 +---------+ +---------+ | }
63 | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
64 +---------+ +---------+ | }
65 | | | | |
66 | | +--------+ | |
67 | | | V |
68 | | | +------------+ | +------------+ }
69 | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
70 | | +------------+ +------------+ }
71 | | | ^
72 | | | |
73 +--------+---------+--------------------+
74
75
76 This state machine and its transition matrix are optimized for the common case when a job is created in active
77 state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
78 successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
79 state without going to COMPLETING state)
80
81 Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
82
83 ---------- TIMELINE of state changes and notification in Job lifecycle ----------
84
85 | The longest possible chain of events in shown, shorter versions cut-through intermediate states,
86 | while still performing all the notifications in this order.
87
88 + Job object is created
89 ## NEW: state == EMPTY_NEW | is InactiveNodeList
90 + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
91 ~ waits for start
92 >> start / join / await invoked
93 ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
94 + onStart (lazy coroutine is started)
95 ~ active coroutine is working (or scheduled to execution)
96 >> childCancelled / cancelImpl invoked
97 ## CANCELLING: state is Finishing, state.rootCause != null
98 ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
99 ------ new children get immediately cancelled, but are still admitted to the list
100 + onCancelling
101 + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
102 + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
103 ~ waits for completion of coroutine body
104 >> makeCompleting / makeCompletingOnce invoked
105 ## COMPLETING: state is Finishing, state.isCompleting == true
106 ------ new children are not admitted anymore, attachChild returns NonDisposableHandle
107 ~ waits for children
108 >> last child completes
109 - computes the final exception
110 ## SEALED: state is Finishing, state.isSealed == true
111 ------ cancel/childCancelled returns false (cannot handle exceptions anymore)
112 + cancelParent (final exception is communicated to the parent, parent incorporates it)
113 + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
114 ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
115 ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
116 + parentHandle.dispose
117 + notifyCompletion (invoke all completion listeners)
118 + onCompletionInternal / onCompleted / onCancelled
119
120 ---------------------------------------------------------------------------------
121 */
122
123 // Note: use shared objects while we have no listeners
124 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
125 private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
126
127 private val _parentHandle = atomic<ChildHandle?>(null)
128 internal var parentHandle: ChildHandle?
129 get() = _parentHandle.value
130 set(value) { _parentHandle.value = value }
131
132 override val parent: Job?
133 get() = parentHandle?.parent
134
135 // ------------ initialization ------------
136
137 /**
138 * Initializes parent job.
139 * It shall be invoked at most once after construction after all other initialization.
140 */
141 protected fun initParentJob(parent: Job?) {
142 assert { parentHandle == null }
143 if (parent == null) {
144 parentHandle = NonDisposableHandle
145 return
146 }
147 parent.start() // make sure the parent is started
148 @Suppress("DEPRECATION")
149 val handle = parent.attachChild(this)
150 parentHandle = handle
151 // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
152 if (isCompleted) {
153 handle.dispose()
154 parentHandle = NonDisposableHandle // release it just in case, to aid GC
155 }
156 }
157
158 // ------------ state query ------------
159 /**
160 * Returns current state of this job.
161 * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
162 * and should be [unboxed][unboxState] before returning to user code.
163 */
164 internal val state: Any? get() {
165 _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
166 if (state !is OpDescriptor) return state
167 state.perform(this)
168 }
169 }
170
171 /**
172 * @suppress **This is unstable API and it is subject to change.**
173 */
174 private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
175 while (true) {
176 block(state)
177 }
178 }
179
180 public override val isActive: Boolean get() {
181 val state = this.state
182 return state is Incomplete && state.isActive
183 }
184
185 public final override val isCompleted: Boolean get() = state !is Incomplete
186
187 public final override val isCancelled: Boolean get() {
188 val state = this.state
189 return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
190 }
191
192 // ------------ state update ------------
193
194 // Finalizes Finishing -> Completed (terminal state) transition.
195 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
196 // Returns final state that was created and updated to
197 private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
198 /*
199 * Note: proposed state can be Incomplete, e.g.
200 * async {
201 * something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
202 * }
203 */
204 assert { this.state === state } // consistency check -- it cannot change
205 assert { !state.isSealed } // consistency check -- cannot be sealed yet
206 assert { state.isCompleting } // consistency check -- must be marked as completing
207 val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
208 // Create the final exception and seal the state so that no more exceptions can be added
209 val wasCancelling: Boolean
210 val finalException = synchronized(state) {
211 wasCancelling = state.isCancelling
212 val exceptions = state.sealLocked(proposedException)
213 val finalCause = getFinalRootCause(state, exceptions)
214 if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
215 finalCause
216 }
217 // Create the final state object
218 val finalState = when {
219 // was not cancelled (no exception) -> use proposed update value
220 finalException == null -> proposedUpdate
221 // small optimization when we can used proposeUpdate object as is on cancellation
222 finalException === proposedException -> proposedUpdate
223 // cancelled job final state
224 else -> CompletedExceptionally(finalException)
225 }
226 // Now handle the final exception
227 if (finalException != null) {
228 val handled = cancelParent(finalException) || handleJobException(finalException)
229 if (handled) (finalState as CompletedExceptionally).makeHandled()
230 }
231 // Process state updates for the final state before the state of the Job is actually set to the final state
232 // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
233 if (!wasCancelling) onCancelling(finalException)
234 onCompletionInternal(finalState)
235 // Then CAS to completed state -> it must succeed
236 val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
237 assert { casSuccess }
238 // And process all post-completion actions
239 completeStateFinalization(state, finalState)
240 return finalState
241 }
242
243 private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
244 // A case of no exceptions
245 if (exceptions.isEmpty()) {
246 // materialize cancellation exception if it was not materialized yet
247 if (state.isCancelling) return defaultCancellationException()
248 return null
249 }
250 /*
251 * 1) If we have non-CE, use it as root cause
252 * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
253 * - It is a CE, so it's not reported by children
254 * - The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
255 * - The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
256 * 3) Just return the very first CE
257 */
258 val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException }
259 if (firstNonCancellation != null) return firstNonCancellation
260 val first = exceptions[0]
261 if (first is TimeoutCancellationException) {
262 val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException }
263 if (detailedTimeoutException != null) return detailedTimeoutException
264 }
265 return first
266 }
267
268 private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
269 if (exceptions.size <= 1) return // nothing more to do here
270 val seenExceptions = identitySet<Throwable>(exceptions.size)
271 /*
272 * Note that root cause may be a recovered exception as well.
273 * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause,
274 * but add suppressed exceptions to the recovered root cause (as it is our final exception)
275 */
276 val unwrappedCause = unwrap(rootCause)
277 for (exception in exceptions) {
278 val unwrapped = unwrap(exception)
279 if (unwrapped !== rootCause && unwrapped !== unwrappedCause &&
280 unwrapped !is CancellationException && seenExceptions.add(unwrapped)) {
281 rootCause.addSuppressed(unwrapped)
282 }
283 }
284 }
285
286 // fast-path method to finalize normally completed coroutines without children
287 // returns true if complete, and afterCompletion(update) shall be called
288 private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
289 assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
290 assert { update !is CompletedExceptionally } // only for normal completion
291 if (!_state.compareAndSet(state, update.boxIncomplete())) return false
292 onCancelling(null) // simple state is not a failure
293 onCompletionInternal(update)
294 completeStateFinalization(state, update)
295 return true
296 }
297
298 // suppressed == true when any exceptions were suppressed while building the final completion cause
299 private fun completeStateFinalization(state: Incomplete, update: Any?) {
300 /*
301 * Now the job in THE FINAL state. We need to properly handle the resulting state.
302 * Order of various invocations here is important.
303 *
304 * 1) Unregister from parent job.
305 */
306 parentHandle?.let {
307 it.dispose() // volatile read parentHandle _after_ state was updated
308 parentHandle = NonDisposableHandle // release it just in case, to aid GC
309 }
310 val cause = (update as? CompletedExceptionally)?.cause
311 /*
312 * 2) Invoke completion handlers: .join(), callbacks etc.
313 * It's important to invoke them only AFTER exception handling and everything else, see #208
314 */
315 if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
316 try {
317 state.invoke(cause)
318 } catch (ex: Throwable) {
319 handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
320 }
321 } else {
322 state.list?.notifyCompletion(cause)
323 }
324 }
325
326 private fun notifyCancelling(list: NodeList, cause: Throwable) {
327 // first cancel our own children
328 onCancelling(cause)
329 notifyHandlers<JobCancellingNode>(list, cause)
330 // then cancel parent
331 cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
332 }
333
334 /**
335 * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
336 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
337 *
338 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
339 * may leak to the [CoroutineExceptionHandler].
340 */
341 private fun cancelParent(cause: Throwable): Boolean {
342 // Is scoped coroutine -- don't propagate, will be rethrown
343 if (isScopedCoroutine) return true
344
345 /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
346 * This allow parent to cancel its children (normally) without being cancelled itself, unless
347 * child crashes and produce some other exception during its completion.
348 */
349 val isCancellation = cause is CancellationException
350 val parent = parentHandle
351 // No parent -- ignore CE, report other exceptions.
352 if (parent === null || parent === NonDisposableHandle) {
353 return isCancellation
354 }
355
356 // Notify parent but don't forget to check cancellation
357 return parent.childCancelled(cause) || isCancellation
358 }
359
360 private fun NodeList.notifyCompletion(cause: Throwable?) =
361 notifyHandlers<JobNode>(this, cause)
362
363 private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
364 var exception: Throwable? = null
365 list.forEach<T> { node ->
366 try {
367 node.invoke(cause)
368 } catch (ex: Throwable) {
369 exception?.apply { addSuppressed(ex) } ?: run {
370 exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
371 }
372 }
373 }
374 exception?.let { handleOnCompletionException(it) }
375 }
376
377 public final override fun start(): Boolean {
378 loopOnState { state ->
379 when (startInternal(state)) {
380 FALSE -> return false
381 TRUE -> return true
382 }
383 }
384 }
385
386 // returns: RETRY/FALSE/TRUE:
387 // FALSE when not new,
388 // TRUE when started
389 // RETRY when need to retry
390 private fun startInternal(state: Any?): Int {
391 when (state) {
392 is Empty -> { // EMPTY_X state -- no completion handlers
393 if (state.isActive) return FALSE // already active
394 if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
395 onStart()
396 return TRUE
397 }
398 is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
399 if (!_state.compareAndSet(state, state.list)) return RETRY
400 onStart()
401 return TRUE
402 }
403 else -> return FALSE // not a new state
404 }
405 }
406
407 /**
408 * Override to provide the actual [start] action.
409 * This function is invoked exactly once when non-active coroutine is [started][start].
410 */
411 protected open fun onStart() {}
412
413 public final override fun getCancellationException(): CancellationException =
414 when (val state = this.state) {
415 is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling")
416 ?: error("Job is still new or active: $this")
417 is Incomplete -> error("Job is still new or active: $this")
418 is CompletedExceptionally -> state.cause.toCancellationException()
419 else -> JobCancellationException("$classSimpleName has completed normally", null, this)
420 }
421
422 protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
423 this as? CancellationException ?: defaultCancellationException(message, this)
424
425 /**
426 * Returns the cause that signals the completion of this job -- it returns the original
427 * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
428 * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor
429 * is being cancelled yet.
430 */
431 protected val completionCause: Throwable?
432 get() = when (val state = state) {
433 is Finishing -> state.rootCause
434 ?: error("Job is still new or active: $this")
435 is Incomplete -> error("Job is still new or active: $this")
436 is CompletedExceptionally -> state.cause
437 else -> null
438 }
439
440 /**
441 * Returns `true` when [completionCause] exception was handled by parent coroutine.
442 */
443 protected val completionCauseHandled: Boolean
444 get() = state.let { it is CompletedExceptionally && it.handled }
445
446 public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
447 invokeOnCompletionInternal(
448 onCancelling = false,
449 invokeImmediately = true,
450 handler = InternalCompletionHandler.UserSupplied(handler)
451 )
452
453 public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler): DisposableHandle =
454 invokeOnCompletionInternal(
455 onCancelling = onCancelling,
456 invokeImmediately = invokeImmediately,
457 handler = InternalCompletionHandler.UserSupplied(handler)
458 )
459
460 internal fun invokeOnCompletionInternal(
461 onCancelling: Boolean,
462 invokeImmediately: Boolean,
463 handler: InternalCompletionHandler
464 ): DisposableHandle {
465 // Create node upfront -- for common cases it just initializes JobNode.job field,
466 // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
467 val node: JobNode = makeNode(handler, onCancelling)
468 loopOnState { state ->
469 when (state) {
470 is Empty -> { // EMPTY_X state -- no completion handlers
471 if (state.isActive) {
472 // try move to SINGLE state
473 if (_state.compareAndSet(state, node)) return node
474 } else
475 promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
476 }
477 is Incomplete -> {
478 val list = state.list
479 if (list == null) { // SINGLE/SINGLE+
480 promoteSingleToNodeList(state as JobNode)
481 } else {
482 var rootCause: Throwable? = null
483 var handle: DisposableHandle = NonDisposableHandle
484 if (onCancelling && state is Finishing) {
485 synchronized(state) {
486 // check if we are installing cancellation handler on job that is being cancelled
487 rootCause = state.rootCause // != null if cancelling job
488 // We add node to the list in two cases --- either the job is not being cancelled
489 // or we are adding a child to a coroutine that is not completing yet
490 if (rootCause == null || handler is ChildHandleNode && !state.isCompleting) {
491 // Note: add node the list while holding lock on state (make sure it cannot change)
492 if (!addLastAtomic(state, list, node)) return@loopOnState // retry
493 // just return node if we don't have to invoke handler (not cancelling yet)
494 if (rootCause == null) return node
495 // otherwise handler is invoked immediately out of the synchronized section & handle returned
496 handle = node
497 }
498 }
499 }
500 if (rootCause != null) {
501 // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
502 if (invokeImmediately) handler.invoke(rootCause)
503 return handle
504 } else {
505 if (addLastAtomic(state, list, node)) return node
506 }
507 }
508 }
509 else -> { // is complete
510 // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
511 // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
512 if (invokeImmediately) handler.invoke((state as? CompletedExceptionally)?.cause)
513 return NonDisposableHandle
514 }
515 }
516 }
517 }
518
519 private fun makeNode(handler: InternalCompletionHandler, onCancelling: Boolean): JobNode {
520 val node = if (onCancelling) {
521 (handler as? JobCancellingNode)
522 ?: InvokeOnCancelling(handler)
523 } else {
524 (handler as? JobNode)
525 ?.also { assert { it !is JobCancellingNode } }
526 ?: InvokeOnCompletion(handler)
527 }
528 node.job = this
529 return node
530 }
531
532 private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
533 list.addLastIf(node) { this.state === expect }
534
535 private fun promoteEmptyToNodeList(state: Empty) {
536 // try to promote it to LIST state with the corresponding state
537 val list = NodeList()
538 val update = if (state.isActive) list else InactiveNodeList(list)
539 _state.compareAndSet(state, update)
540 }
541
542 private fun promoteSingleToNodeList(state: JobNode) {
543 // try to promote it to list (SINGLE+ state)
544 state.addOneIfEmpty(NodeList())
545 // it must be in SINGLE+ state or state has changed (node could have need removed from state)
546 val list = state.nextNode // either our NodeList or somebody else won the race, updated state
547 // just attempt converting it to list if state is still the same, then we'll continue lock-free loop
548 _state.compareAndSet(state, list)
549 }
550
551 public final override suspend fun join() {
552 if (!joinInternal()) { // fast-path no wait
553 coroutineContext.ensureActive()
554 return // do not suspend
555 }
556 return joinSuspend() // slow-path wait
557 }
558
559 private fun joinInternal(): Boolean {
560 loopOnState { state ->
561 if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait
562 if (startInternal(state) >= 0) return true // wait unless need to retry
563 }
564 }
565
566 private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
567 // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
568 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont)))
569 }
570
571 @Suppress("UNCHECKED_CAST")
572 public final override val onJoin: SelectClause0
573 get() = SelectClause0Impl(
574 clauseObject = this@JobSupport,
575 regFunc = JobSupport::registerSelectForOnJoin as RegistrationFunction
576 )
577
578 @Suppress("UNUSED_PARAMETER")
579 private fun registerSelectForOnJoin(select: SelectInstance<*>, ignoredParam: Any?) {
580 if (!joinInternal()) {
581 select.selectInRegistrationPhase(Unit)
582 return
583 }
584 val disposableHandle = invokeOnCompletion(handler = SelectOnJoinCompletionHandler(select))
585 select.disposeOnCompletion(disposableHandle)
586 }
587
588 private inner class SelectOnJoinCompletionHandler(
589 private val select: SelectInstance<*>
590 ) : JobNode() {
591 override fun invoke(cause: Throwable?) {
592 select.trySelect(this@JobSupport, Unit)
593 }
594 }
595
596 /**
597 * @suppress **This is unstable API and it is subject to change.**
598 */
599 internal fun removeNode(node: JobNode) {
600 // remove logic depends on the state of the job
601 loopOnState { state ->
602 when (state) {
603 is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
604 if (state !== node) return // a different job node --> we were already removed
605 // try remove and revert back to empty state
606 if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
607 }
608 is Incomplete -> { // may have a list of completion handlers
609 // remove node from the list if there is a list
610 if (state.list != null) node.remove()
611 return
612 }
613 else -> return // it is complete and does not have any completion handlers
614 }
615 }
616 }
617
618 /**
619 * Returns `true` for job that do not have "body block" to complete and should immediately go into
620 * completing state and start waiting for children.
621 *
622 * @suppress **This is unstable API and it is subject to change.**
623 */
624 internal open val onCancelComplete: Boolean get() = false
625
626 // external cancel with cause, never invoked implicitly from internal machinery
627 public override fun cancel(cause: CancellationException?) {
628 cancelInternal(cause ?: defaultCancellationException())
629 }
630
631 protected open fun cancellationExceptionMessage(): String = "Job was cancelled"
632
633 // HIDDEN in Job interface. Invoked only by legacy compiled code.
634 // external cancel with (optional) cause, never invoked implicitly from internal machinery
635 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
636 public override fun cancel(cause: Throwable?): Boolean {
637 cancelInternal(cause?.toCancellationException() ?: defaultCancellationException())
638 return true
639 }
640
641 // It is overridden in channel-linked implementation
642 public open fun cancelInternal(cause: Throwable) {
643 cancelImpl(cause)
644 }
645
646 // Parent is cancelling child
647 public final override fun parentCancelled(parentJob: ParentJob) {
648 cancelImpl(parentJob)
649 }
650
651 /**
652 * Child was cancelled with a cause.
653 * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
654 * It is overridden in supervisor implementations to completely ignore any child cancellation.
655 * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
656 *
657 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
658 * may leak to the [CoroutineExceptionHandler].
659 */
660 public open fun childCancelled(cause: Throwable): Boolean {
661 if (cause is CancellationException) return true
662 return cancelImpl(cause) && handlesException
663 }
664
665 /**
666 * Makes this [Job] cancelled with a specified [cause].
667 * It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
668 */
669 public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause)
670
671 // cause is Throwable or ParentJob when cancelChild was invoked
672 // returns true is exception was handled, false otherwise
673 internal fun cancelImpl(cause: Any?): Boolean {
674 var finalState: Any? = COMPLETING_ALREADY
675 if (onCancelComplete) {
676 // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
677 // completing and had recorded exception
678 finalState = cancelMakeCompleting(cause)
679 if (finalState === COMPLETING_WAITING_CHILDREN) return true
680 }
681 if (finalState === COMPLETING_ALREADY) {
682 finalState = makeCancelling(cause)
683 }
684 return when {
685 finalState === COMPLETING_ALREADY -> true
686 finalState === COMPLETING_WAITING_CHILDREN -> true
687 finalState === TOO_LATE_TO_CANCEL -> false
688 else -> {
689 afterCompletion(finalState)
690 true
691 }
692 }
693 }
694
695 // cause is Throwable or ParentJob when cancelChild was invoked
696 // It contains a loop and never returns COMPLETING_RETRY, can return
697 // COMPLETING_ALREADY -- if already completed/completing
698 // COMPLETING_WAITING_CHILDREN -- if started waiting for children
699 // final state -- when completed, for call to afterCompletion
700 private fun cancelMakeCompleting(cause: Any?): Any? {
701 loopOnState { state ->
702 if (state !is Incomplete || state is Finishing && state.isCompleting) {
703 // already completed/completing, do not even create exception to propose update
704 return COMPLETING_ALREADY
705 }
706 val proposedUpdate = CompletedExceptionally(createCauseException(cause))
707 val finalState = tryMakeCompleting(state, proposedUpdate)
708 if (finalState !== COMPLETING_RETRY) return finalState
709 }
710 }
711
712 @Suppress("NOTHING_TO_INLINE") // Save a stack frame
713 internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
714 JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
715
716 override fun getChildJobCancellationCause(): CancellationException {
717 // determine root cancellation cause of this job (why is it cancelling its children?)
718 val state = this.state
719 val rootCause = when (state) {
720 is Finishing -> state.rootCause
721 is CompletedExceptionally -> state.cause
722 is Incomplete -> error("Cannot be cancelling child in this state: $state")
723 else -> null // create exception with the below code on normal completion
724 }
725 return (rootCause as? CancellationException) ?: JobCancellationException("Parent job is ${stateString(state)}", rootCause, this)
726 }
727
728 // cause is Throwable or ParentJob when cancelChild was invoked
729 private fun createCauseException(cause: Any?): Throwable = when (cause) {
730 is Throwable? -> cause ?: defaultCancellationException()
731 else -> (cause as ParentJob).getChildJobCancellationCause()
732 }
733
734 // transitions to Cancelling state
735 // cause is Throwable or ParentJob when cancelChild was invoked
736 // It contains a loop and never returns COMPLETING_RETRY, can return
737 // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
738 // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
739 // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
740 // final state -- when completed, for call to afterCompletion
741 private fun makeCancelling(cause: Any?): Any? {
742 var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
743 loopOnState { state ->
744 when (state) {
745 is Finishing -> { // already finishing -- collect exceptions
746 val notifyRootCause = synchronized(state) {
747 if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
748 // add exception, do nothing is parent is cancelling child that is already being cancelled
749 val wasCancelling = state.isCancelling // will notify if was not cancelling
750 // Materialize missing exception if it is the first exception (otherwise -- don't)
751 if (cause != null || !wasCancelling) {
752 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
753 state.addExceptionLocked(causeException)
754 }
755 // take cause for notification if was not in cancelling state before
756 state.rootCause.takeIf { !wasCancelling }
757 }
758 notifyRootCause?.let { notifyCancelling(state.list, it) }
759 return COMPLETING_ALREADY
760 }
761 is Incomplete -> {
762 // Not yet finishing -- try to make it cancelling
763 val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
764 if (state.isActive) {
765 // active state becomes cancelling
766 if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
767 } else {
768 // non active state starts completing
769 val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
770 when {
771 finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
772 finalState === COMPLETING_RETRY -> return@loopOnState
773 else -> return finalState
774 }
775 }
776 }
777 else -> return TOO_LATE_TO_CANCEL // already complete
778 }
779 }
780 }
781
782 // Performs promotion of incomplete coroutine state to NodeList for the purpose of
783 // converting coroutine state to Cancelling, returns null when need to retry
784 private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
785 when (state) {
786 is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
787 is JobNode -> {
788 // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
789 // correctly capture a reference to it
790 promoteSingleToNodeList(state)
791 null // retry
792 }
793 else -> error("State should have list: $state")
794 }
795
796 // try make new Cancelling state on the condition that we're still in the expected state
797 private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
798 assert { state !is Finishing } // only for non-finishing states
799 assert { state.isActive } // only for active states
800 // get state's list or else promote to list to correctly operate on child lists
801 val list = getOrPromoteCancellingList(state) ?: return false
802 // Create cancelling state (with rootCause!)
803 val cancelling = Finishing(list, false, rootCause)
804 if (!_state.compareAndSet(state, cancelling)) return false
805 // Notify listeners
806 notifyCancelling(list, rootCause)
807 return true
808 }
809
810 /**
811 * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
812 * and by [JobImpl.cancel]. It returns `false` on repeated invocation
813 * (when this job is already completing).
814 */
815 internal fun makeCompleting(proposedUpdate: Any?): Boolean {
816 loopOnState { state ->
817 val finalState = tryMakeCompleting(state, proposedUpdate)
818 when {
819 finalState === COMPLETING_ALREADY -> return false
820 finalState === COMPLETING_WAITING_CHILDREN -> return true
821 finalState === COMPLETING_RETRY -> return@loopOnState
822 else -> {
823 afterCompletion(finalState)
824 return true
825 }
826 }
827 }
828 }
829
830 /**
831 * Completes this job. Used by [AbstractCoroutine.resume].
832 * It throws [IllegalStateException] on repeated invocation (when this job is already completing).
833 * Returns:
834 * - [COMPLETING_WAITING_CHILDREN] if started waiting for children.
835 * - Final state otherwise (caller should do [afterCompletion])
836 */
837 internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
838 loopOnState { state ->
839 val finalState = tryMakeCompleting(state, proposedUpdate)
840 when {
841 finalState === COMPLETING_ALREADY ->
842 throw IllegalStateException(
843 "Job $this is already complete or completing, " +
844 "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
845 )
846 finalState === COMPLETING_RETRY -> return@loopOnState
847 else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
848 }
849 }
850 }
851
852 // Returns one of COMPLETING symbols or final state:
853 // COMPLETING_ALREADY -- when already complete or completing
854 // COMPLETING_RETRY -- when need to retry due to interference
855 // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
856 // final state -- when completed, for call to afterCompletion
857 private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
858 if (state !is Incomplete)
859 return COMPLETING_ALREADY
860 /*
861 * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately
862 * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
863 * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
864 * which may miss unhandled exception.
865 */
866 if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
867 if (tryFinalizeSimpleState(state, proposedUpdate)) {
868 // Completed successfully on fast path -- return updated state
869 return proposedUpdate
870 }
871 return COMPLETING_RETRY
872 }
873 // The separate slow-path function to simplify profiling
874 return tryMakeCompletingSlowPath(state, proposedUpdate)
875 }
876
877 // Returns one of COMPLETING symbols or final state:
878 // COMPLETING_ALREADY -- when already complete or completing
879 // COMPLETING_RETRY -- when need to retry due to interference
880 // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
881 // final state -- when completed, for call to afterCompletion
882 private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
883 // get state's list or else promote to list to correctly operate on child lists
884 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
885 // promote to Finishing state if we are not in it yet
886 // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
887 // atomically transition to finishing & completing state
888 val finishing = state as? Finishing ?: Finishing(list, false, null)
889 // must synchronize updates to finishing state
890 var notifyRootCause: Throwable? = null
891 synchronized(finishing) {
892 // check if this state is already completing
893 if (finishing.isCompleting) return COMPLETING_ALREADY
894 // mark as completing
895 finishing.isCompleting = true
896 // if we need to promote to finishing then atomically do it here.
897 // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
898 // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
899 if (finishing !== state) {
900 if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
901 }
902 // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
903 assert { !finishing.isSealed } // cannot be sealed
904 // add new proposed exception to the finishing state
905 val wasCancelling = finishing.isCancelling
906 (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
907 // If it just becomes cancelling --> must process cancelling notifications
908 notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
909 }
910 // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
911 notifyRootCause?.let { notifyCancelling(list, it) }
912 // now wait for children
913 val child = firstChild(state)
914 if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
915 return COMPLETING_WAITING_CHILDREN
916 // otherwise -- we have not children left (all were already cancelled?)
917 return finalizeFinishingState(finishing, proposedUpdate)
918 }
919
920 private val Any?.exceptionOrNull: Throwable?
921 get() = (this as? CompletedExceptionally)?.cause
922
923 private fun firstChild(state: Incomplete) =
924 state as? ChildHandleNode ?: state.list?.nextChild()
925
926 // return false when there is no more incomplete children to wait
927 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
928 private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
929 val handle = child.childJob.invokeOnCompletion(
930 invokeImmediately = false,
931 handler = ChildCompletion(this, state, child, proposedUpdate)
932 )
933 if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
934 val nextChild = child.nextChild() ?: return false
935 return tryWaitForChild(state, nextChild, proposedUpdate)
936 }
937
938 // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
939 private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
940 assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
941 // figure out if we need to wait for next child
942 val waitChild = lastChild.nextChild()
943 // try wait for next child
944 if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
945 // no more children to wait -- try update state
946 val finalState = finalizeFinishingState(state, proposedUpdate)
947 afterCompletion(finalState)
948 }
949
950 private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? {
951 var cur = this
952 while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head)
953 while (true) {
954 cur = cur.nextNode
955 if (cur.isRemoved) continue
956 if (cur is ChildHandleNode) return cur
957 if (cur is NodeList) return null // checked all -- no more children
958 }
959 }
960
961 public final override val children: Sequence<Job> get() = sequence {
962 when (val state = this@JobSupport.state) {
963 is ChildHandleNode -> yield(state.childJob)
964 is Incomplete -> state.list?.let { list ->
965 list.forEach<ChildHandleNode> { yield(it.childJob) }
966 }
967 }
968 }
969
970 @Suppress("OverridingDeprecatedMember")
971 public final override fun attachChild(child: ChildJob): ChildHandle {
972 /*
973 * Note: This function attaches a special ChildHandleNode node object. This node object
974 * is handled in a special way on completion on the coroutine (we wait for all of them) and
975 * is handled specially by invokeOnCompletion itself -- it adds this node to the list even
976 * if the job is already cancelling. For cancelling state child is attached under state lock.
977 * It's required to properly wait all children before completion and provide linearizable hierarchy view:
978 * If child is attached when the job is already being cancelled, such child will receive immediate notification on
979 * cancellation, but parent *will* wait for that child before completion and will handle its exception.
980 */
981 return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child)) as ChildHandle
982 }
983
984 /**
985 * Override to process any exceptions that were encountered while invoking completion handlers
986 * installed via [invokeOnCompletion].
987 *
988 * @suppress **This is unstable API and it is subject to change.**
989 */
990 internal open fun handleOnCompletionException(exception: Throwable) {
991 throw exception
992 }
993
994 /**
995 * This function is invoked once as soon as this job is being cancelled for any reason or completes,
996 * similarly to [invokeOnCompletion] with `onCancelling` set to `true`.
997 *
998 * The meaning of [cause] parameter:
999 * - Cause is `null` when the job has completed normally.
1000 * - Cause is an instance of [CancellationException] when the job was cancelled _normally_.
1001 * **It should not be treated as an error**. In particular, it should not be reported to error logs.
1002 * - Otherwise, the job had been cancelled or failed with exception.
1003 *
1004 * The specified [cause] is not the final cancellation cause of this job.
1005 * A job may produce other exceptions while it is failing and the final cause might be different.
1006 *
1007 * @suppress **This is unstable API and it is subject to change.*
1008 */
1009 protected open fun onCancelling(cause: Throwable?) {}
1010
1011 /**
1012 * Returns `true` for scoped coroutines.
1013 * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
1014 * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
1015 * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
1016 */
1017 protected open val isScopedCoroutine: Boolean get() = false
1018
1019 /**
1020 * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal].
1021 * A valid implementation of this getter should recursively check parent as well before returning `false`.
1022 *
1023 * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl].
1024 * @suppress **This is unstable API and it is subject to change.*
1025 */
1026 internal open val handlesException: Boolean get() = true
1027
1028 /**
1029 * Handles the final job [exception] that was not handled by the parent coroutine.
1030 * Returns `true` if it handles exception (so handling at later stages is not needed).
1031 * It is designed to be overridden by launch-like coroutines
1032 * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type
1033 * that can represent exceptions.
1034 *
1035 * This method is invoked **exactly once** when the final exception of the job is determined
1036 * and before it becomes complete. At the moment of invocation the job and all its children are complete.
1037 */
1038 protected open fun handleJobException(exception: Throwable): Boolean = false
1039
1040 /**
1041 * Override for completion actions that need to update some external object depending on job's state,
1042 * right before all the waiters for coroutine's completion are notified.
1043 *
1044 * @param state the final state.
1045 *
1046 * @suppress **This is unstable API and it is subject to change.**
1047 */
1048 protected open fun onCompletionInternal(state: Any?) {}
1049
1050 /**
1051 * Override for the very last action on job's completion to resume the rest of the code in
1052 * scoped coroutines. It is called when this job is externally completed in an unknown
1053 * context and thus should resume with a default mode.
1054 *
1055 * @suppress **This is unstable API and it is subject to change.**
1056 */
1057 protected open fun afterCompletion(state: Any?) {}
1058
1059 // for nicer debugging
1060 public override fun toString(): String =
1061 "${toDebugString()}@$hexAddress"
1062
1063 @InternalCoroutinesApi
1064 public fun toDebugString(): String = "${nameString()}{${stateString(state)}}"
1065
1066 /**
1067 * @suppress **This is unstable API and it is subject to change.**
1068 */
1069 internal open fun nameString(): String = classSimpleName
1070
1071 private fun stateString(state: Any?): String = when (state) {
1072 is Finishing -> when {
1073 state.isCancelling -> "Cancelling"
1074 state.isCompleting -> "Completing"
1075 else -> "Active"
1076 }
1077 is Incomplete -> if (state.isActive) "Active" else "New"
1078 is CompletedExceptionally -> "Cancelled"
1079 else -> "Completed"
1080 }
1081
1082 // Completing & Cancelling states,
1083 // All updates are guarded by synchronized(this), reads are volatile
1084 @Suppress("UNCHECKED_CAST")
1085 private class Finishing(
1086 override val list: NodeList,
1087 isCompleting: Boolean,
1088 rootCause: Throwable?
1089 ) : SynchronizedObject(), Incomplete {
1090 private val _isCompleting = atomic(isCompleting)
1091 var isCompleting: Boolean
1092 get() = _isCompleting.value
1093 set(value) { _isCompleting.value = value }
1094
1095 private val _rootCause = atomic(rootCause)
1096 var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED
1097 get() = _rootCause.value
1098 set(value) { _rootCause.value = value }
1099
1100 private val _exceptionsHolder = atomic<Any?>(null)
1101 private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED
1102 get() = _exceptionsHolder.value
1103 set(value) { _exceptionsHolder.value = value }
1104
1105 // Note: cannot be modified when sealed
1106 val isSealed: Boolean get() = exceptionsHolder === SEALED
1107 val isCancelling: Boolean get() = rootCause != null
1108 override val isActive: Boolean get() = rootCause == null // !isCancelling
1109
1110 // Seals current state and returns list of exceptions
1111 // guarded by `synchronized(this)`
1112 fun sealLocked(proposedException: Throwable?): List<Throwable> {
1113 val list = when(val eh = exceptionsHolder) { // volatile read
1114 null -> allocateList()
1115 is Throwable -> allocateList().also { it.add(eh) }
1116 is ArrayList<*> -> eh as ArrayList<Throwable>
1117 else -> error("State is $eh") // already sealed -- cannot happen
1118 }
1119 val rootCause = this.rootCause // volatile read
1120 rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning
1121 if (proposedException != null && proposedException != rootCause) list.add(proposedException)
1122 exceptionsHolder = SEALED
1123 return list
1124 }
1125
1126 // guarded by `synchronized(this)`
1127 fun addExceptionLocked(exception: Throwable) {
1128 val rootCause = this.rootCause // volatile read
1129 if (rootCause == null) {
1130 this.rootCause = exception
1131 return
1132 }
1133 if (exception === rootCause) return // nothing to do
1134 when (val eh = exceptionsHolder) { // volatile read
1135 null -> exceptionsHolder = exception
1136 is Throwable -> {
1137 if (exception === eh) return // nothing to do
1138 exceptionsHolder = allocateList().apply {
1139 add(eh)
1140 add(exception)
1141
1142 }
1143 }
1144 is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception)
1145 else -> error("State is $eh") // already sealed -- cannot happen
1146 }
1147 }
1148
1149 private fun allocateList() = ArrayList<Throwable>(4)
1150
1151 override fun toString(): String =
1152 "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]"
1153 }
1154
1155 private val Incomplete.isCancelling: Boolean
1156 get() = this is Finishing && isCancelling
1157
1158 // Used by parent that is waiting for child completion
1159 private class ChildCompletion(
1160 private val parent: JobSupport,
1161 private val state: Finishing,
1162 private val child: ChildHandleNode,
1163 private val proposedUpdate: Any?
1164 ) : JobNode() {
1165 override fun invoke(cause: Throwable?) {
1166 parent.continueCompleting(state, child, proposedUpdate)
1167 }
1168 }
1169
1170 private class AwaitContinuation<T>(
1171 delegate: Continuation<T>,
1172 private val job: JobSupport
1173 ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
1174 override fun getContinuationCancellationCause(parent: Job): Throwable {
1175 val state = job.state
1176 /*
1177 * When the job we are waiting for had already completely completed exceptionally or
1178 * is failing, we shall use its root/completion cause for await's result.
1179 */
1180 if (state is Finishing) state.rootCause?.let { return it }
1181 if (state is CompletedExceptionally) return state.cause
1182 return parent.getCancellationException()
1183 }
1184
1185 protected override fun nameString(): String =
1186 "AwaitContinuation"
1187 }
1188
1189 /*
1190 * =================================================================================================
1191 * This is ready-to-use implementation for Deferred interface.
1192 * However, it is not type-safe. Conceptually it just exposes the value of the underlying
1193 * completed state as `Any?`
1194 * =================================================================================================
1195 */
1196
1197 public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
1198
1199 public fun getCompletionExceptionOrNull(): Throwable? {
1200 val state = this.state
1201 check(state !is Incomplete) { "This job has not completed yet" }
1202 return state.exceptionOrNull
1203 }
1204
1205 /**
1206 * @suppress **This is unstable API and it is subject to change.**
1207 */
1208 internal fun getCompletedInternal(): Any? {
1209 val state = this.state
1210 check(state !is Incomplete) { "This job has not completed yet" }
1211 if (state is CompletedExceptionally) throw state.cause
1212 return state.unboxState()
1213 }
1214
1215 /**
1216 * @suppress **This is unstable API and it is subject to change.**
1217 */
1218 protected suspend fun awaitInternal(): Any? {
1219 // fast-path -- check state (avoid extra object creation)
1220 while (true) { // lock-free loop on state
1221 val state = this.state
1222 if (state !is Incomplete) {
1223 // already complete -- just return result
1224 if (state is CompletedExceptionally) { // Slow path to recover stacktrace
1225 recoverAndThrow(state.cause)
1226 }
1227 return state.unboxState()
1228
1229 }
1230 if (startInternal(state) >= 0) break // break unless needs to retry
1231 }
1232 return awaitSuspend() // slow-path
1233 }
1234
1235 private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
1236 /*
1237 * Custom code here, so that parent coroutine that is using await
1238 * on its child deferred (async) coroutine would throw the exception that this child had
1239 * thrown and not a JobCancellationException.
1240 */
1241 val cont = AwaitContinuation(uCont.intercepted(), this)
1242 // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
1243 cont.initCancellability()
1244 cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))
1245 cont.getResult()
1246 }
1247
1248 @Suppress("UNCHECKED_CAST")
1249 protected val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
1250 clauseObject = this@JobSupport,
1251 regFunc = JobSupport::onAwaitInternalRegFunc as RegistrationFunction,
1252 processResFunc = JobSupport::onAwaitInternalProcessResFunc as ProcessResultFunction
1253 )
1254
1255 @Suppress("UNUSED_PARAMETER")
1256 private fun onAwaitInternalRegFunc(select: SelectInstance<*>, ignoredParam: Any?) {
1257 while (true) {
1258 val state = this.state
1259 if (state !is Incomplete) {
1260 val result = if (state is CompletedExceptionally) state else state.unboxState()
1261 select.selectInRegistrationPhase(result)
1262 return
1263 }
1264 if (startInternal(state) >= 0) break // break unless needs to retry
1265 }
1266 val disposableHandle = invokeOnCompletion(handler = SelectOnAwaitCompletionHandler(select))
1267 select.disposeOnCompletion(disposableHandle)
1268 }
1269
1270 @Suppress("UNUSED_PARAMETER")
1271 private fun onAwaitInternalProcessResFunc(ignoredParam: Any?, result: Any?): Any? {
1272 if (result is CompletedExceptionally) throw result.cause
1273 return result
1274 }
1275
1276 private inner class SelectOnAwaitCompletionHandler(
1277 private val select: SelectInstance<*>
1278 ) : JobNode() {
1279 override fun invoke(cause: Throwable?) {
1280 val state = this@JobSupport.state
1281 val result = if (state is CompletedExceptionally) state else state.unboxState()
1282 select.trySelect(this@JobSupport, result)
1283 }
1284 }
1285 }
1286
1287 /*
1288 * Class to represent object as the final state of the Job
1289 */
1290 private class IncompleteStateBox(@JvmField val state: Incomplete)
boxIncompletenull1291 internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
1292 internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
1293
1294 // --------------- helper classes & constants for job implementation
1295
1296 private val COMPLETING_ALREADY = Symbol("COMPLETING_ALREADY")
1297 @JvmField
1298 internal val COMPLETING_WAITING_CHILDREN = Symbol("COMPLETING_WAITING_CHILDREN")
1299 private val COMPLETING_RETRY = Symbol("COMPLETING_RETRY")
1300 private val TOO_LATE_TO_CANCEL = Symbol("TOO_LATE_TO_CANCEL")
1301
1302 private const val RETRY = -1
1303 private const val FALSE = 0
1304 private const val TRUE = 1
1305
1306 private val SEALED = Symbol("SEALED")
1307 private val EMPTY_NEW = Empty(false)
1308 private val EMPTY_ACTIVE = Empty(true)
1309
1310 private class Empty(override val isActive: Boolean) : Incomplete {
1311 override val list: NodeList? get() = null
1312 override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
1313 }
1314
1315 @PublishedApi // for a custom job in the test module
1316 internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
1317 init { initParentJob(parent) }
1318 override val onCancelComplete get() = true
1319 /*
1320 * Check whether parent is able to handle exceptions as well.
1321 * With this check, an exception in that pattern will be handled once:
1322 * ```
1323 * launch {
1324 * val child = Job(coroutineContext[Job])
1325 * launch(child) { throw ... }
1326 * }
1327 * ```
1328 */
1329 override val handlesException: Boolean = handlesException()
completenull1330 override fun complete() = makeCompleting(Unit)
1331 override fun completeExceptionally(exception: Throwable): Boolean =
1332 makeCompleting(CompletedExceptionally(exception))
1333
1334 @JsName("handlesExceptionF")
1335 private fun handlesException(): Boolean {
1336 var parentJob = (parentHandle as? ChildHandleNode)?.job ?: return false
1337 while (true) {
1338 if (parentJob.handlesException) return true
1339 parentJob = (parentJob.parentHandle as? ChildHandleNode)?.job ?: return false
1340 }
1341 }
1342 }
1343
1344 // -------- invokeOnCompletion nodes
1345
1346 internal interface Incomplete {
1347 val isActive: Boolean
1348 val list: NodeList? // is null only for Empty and JobNode incomplete state objects
1349 }
1350
1351 internal abstract class JobNode : LockFreeLinkedListNode(), InternalCompletionHandler, DisposableHandle, Incomplete {
1352 /**
1353 * Initialized by [JobSupport.makeNode].
1354 */
1355 lateinit var job: JobSupport
1356 override val isActive: Boolean get() = true
1357 override val list: NodeList? get() = null
disposenull1358 override fun dispose() = job.removeNode(this)
1359 override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
1360 }
1361
1362 internal class NodeList : LockFreeLinkedListHead(), Incomplete {
1363 override val isActive: Boolean get() = true
1364 override val list: NodeList get() = this
1365
1366 fun getString(state: String) = buildString {
1367 append("List{")
1368 append(state)
1369 append("}[")
1370 var first = true
1371 this@NodeList.forEach<JobNode> { node ->
1372 if (first) first = false else append(", ")
1373 append(node)
1374 }
1375 append("]")
1376 }
1377
1378 override fun toString(): String =
1379 if (DEBUG) getString("Active") else super.toString()
1380 }
1381
1382 internal class InactiveNodeList(
1383 override val list: NodeList
1384 ) : Incomplete {
1385 override val isActive: Boolean get() = false
toStringnull1386 override fun toString(): String = if (DEBUG) list.getString("New") else super.toString()
1387 }
1388
1389 private class InvokeOnCompletion(
1390 private val handler: InternalCompletionHandler
1391 ) : JobNode() {
1392 override fun invoke(cause: Throwable?) = handler.invoke(cause)
1393 }
1394
1395 private class ResumeOnCompletion(
1396 private val continuation: Continuation<Unit>
1397 ) : JobNode() {
invokenull1398 override fun invoke(cause: Throwable?) = continuation.resume(Unit)
1399 }
1400
1401 private class ResumeAwaitOnCompletion<T>(
1402 private val continuation: CancellableContinuationImpl<T>
1403 ) : JobNode() {
1404 override fun invoke(cause: Throwable?) {
1405 val state = job.state
1406 assert { state !is Incomplete }
1407 if (state is CompletedExceptionally) {
1408 // Resume with with the corresponding exception to preserve it
1409 continuation.resumeWithException(state.cause)
1410 } else {
1411 // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
1412 @Suppress("UNCHECKED_CAST")
1413 continuation.resume(state.unboxState() as T)
1414 }
1415 }
1416 }
1417
1418 internal class DisposeOnCompletion(
1419 private val handle: DisposableHandle
1420 ) : JobNode() {
invokenull1421 override fun invoke(cause: Throwable?) = handle.dispose()
1422 }
1423
1424 // -------- invokeOnCancellation nodes
1425
1426 /**
1427 * Marker for node that shall be invoked on in _cancelling_ state.
1428 * **Note: may be invoked multiple times.**
1429 */
1430 internal abstract class JobCancellingNode : JobNode()
1431
1432 private class InvokeOnCancelling(
1433 private val handler: InternalCompletionHandler
1434 ) : JobCancellingNode() {
1435 // delegate handler shall be invoked at most once, so here is an additional flag
1436 private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
1437 override fun invoke(cause: Throwable?) {
1438 if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
1439 }
1440 }
1441
1442 internal class ChildHandleNode(
1443 @JvmField val childJob: ChildJob
1444 ) : JobCancellingNode(), ChildHandle {
1445 override val parent: Job get() = job
invokenull1446 override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
1447 override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
1448 }
1449
1450 // Same as ChildHandleNode, but for cancellable continuation
1451 @PublishedApi
1452 internal class ChildContinuation(
1453 // Used by the IDEA debugger via reflection and must be kept binary-compatible, see KTIJ-24102
1454 @JvmField val child: CancellableContinuationImpl<*>
1455 ) : JobCancellingNode() {
1456 override fun invoke(cause: Throwable?) {
1457 child.parentCancelled(child.getContinuationCancellationCause(job))
1458 }
1459 }
1460
1461