<lambda>null1 package kotlinx.coroutines.guava
2 
3 import com.google.common.util.concurrent.*
4 import com.google.common.util.concurrent.internal.*
5 import kotlinx.coroutines.*
6 import java.util.concurrent.*
7 import java.util.concurrent.CancellationException
8 import kotlin.coroutines.*
9 
10 /**
11  * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
12  *
13  * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
14  * [IllegalArgumentException], because Futures don't have a way to start lazily.
15  *
16  * When the created coroutine [isCompleted][Job.isCompleted], it will try to
17  * *synchronously* complete the returned Future with the same outcome. This will
18  * succeed, barring a race with external cancellation of returned [ListenableFuture].
19  *
20  * Cancellation is propagated bidirectionally.
21  *
22  * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
23  * added/overlaid by passing [context].
24  *
25  * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
26  * member, [Dispatchers.Default] is used.
27  *
28  * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
29  * a [Job] in [context].
30  *
31  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
32  * facilities.
33  *
34  * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
35  * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
36  * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
37  * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
38  * no cause. This is to match the specification and behavior of
39  * `java.util.concurrent.FutureTask`.
40  *
41  * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
42  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
43  * @param block the code to execute.
44  */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    // A Future cannot be handed out "not yet started", so lazy start modes are rejected up front.
    if (start.isLazy) {
        throw IllegalArgumentException("$start start is not supported")
    }
    // Overlay the caller-supplied context on the scope's context, then build the coroutine whose
    // `future` property is the handle returned to the caller.
    val futureCoroutine = ListenableFutureCoroutine<T>(newCoroutineContext(context))
    // The coroutine doubles as the receiver (the CoroutineScope) that `block` runs against.
    futureCoroutine.start(start, futureCoroutine, block)
    return futureCoroutine.future
}
56 
57 /**
58  * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
59  *
60  * Completion is non-atomic between the two promises.
61  *
62  * Cancellation is propagated bidirectionally.
63  *
64  * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
65  * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
66  * race with cancellation of the `Deferred`.
67  *
68  * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
69  * it will cancel the returned `Deferred`.
70  *
71  * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
72  * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
73  * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
74  * will complete with a different outcome than `this` `ListenableFuture`.
75  */
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
    /* How the two promises are tied together by this method:
     *
     * - Future -> Deferred: a callback registered on `this` future (run on a direct executor)
     *   forwards the future's outcome to the `Deferred`. A success completes it with the value;
     *   anything reported through the failure path, which is also how a cancelled future reports
     *   its `CancellationException`, completes it exceptionally. So cancelling the future moves
     *   the `Deferred` towards its cancelled state as well.
     *
     * - Deferred -> Future: a completion handler on the `Deferred` calls `cancel(false)` on
     *   `this` future. This is only an attempt: the future may already be on its way to a
     *   non-cancelled terminal state, in which case the two promises end with different outcomes.
     *
     * - The `Boolean` returned by that `cancel(false)` call is discarded here. A holder of the
     *   original future who relies on seeing `true` from `cancel()` to run one-time cancellation
     *   work will not see it when the cancellation came in through the `Deferred`.
     *
     * All of this is inherently racy. See `Future.cancel()` for `Future` cancellation semantics
     * and `Job` for coroutine cancellation semantics.
     */

    // Fast path 1: Guava futures that expose their failure directly. Reusing the Throwable the
    // future already holds avoids going through get() and its wrapping exception.
    if (this is InternalFutureFailureAccess) {
        val knownFailure: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
        if (knownFailure != null) {
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(knownFailure)
            return failed
        }
    }

    // Fast path 2: the future is already done, so fetching its outcome will not block and the
    // Deferred can be returned already completed.
    if (isDone) {
        return try {
            CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
        } catch (cancellation: CancellationException) {
            val cancelled = CompletableDeferred<T>()
            cancelled.cancel(cancellation)
            cancelled
        } catch (failure: ExecutionException) {
            // A completed Future is only expected to throw ExecutionException (or the
            // CancellationException handled above); anything else would be a bug in the
            // Future implementation and is deliberately not caught.
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(failure.nonNullCause())
            failed
        }
    }

    // Slow path: the future is still pending, so wire it to a fresh Deferred.
    val source = this
    val deferred = CompletableDeferred<T>()
    Futures.addCallback(source, object : FutureCallback<T> {
        override fun onSuccess(result: T) {
            // Anything thrown while completing the Deferred is routed to the coroutine exception
            // handling machinery instead of escaping into the future's listener executor.
            try {
                deferred.complete(result)
            } catch (unexpected: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, unexpected)
            }
        }

        override fun onFailure(t: Throwable) {
            try {
                deferred.completeExceptionally(t)
            } catch (unexpected: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, unexpected)
            }
        }
    }, MoreExecutors.directExecutor())

    // Callers only ever see a plain Deferred, so the only thing they can do to it is cancel it.
    // If this handler fires while the future is still pending, the Deferred must therefore have
    // been cancelled, and the cancellation is pushed on to the future. If the future has already
    // completed, cancel(false) has no effect.
    deferred.invokeOnCompletion {
        source.cancel(false)
    }

    // Wrap by delegation so the result cannot be cast back to CompletableDeferred.
    return object : Deferred<T> by deferred {}
}
156 
157 /**
158  * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
159  *
160  * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
161  * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
162  * [Exception].
163  *
164  * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
165  * state - a serious fundamental bug.
166  */
// `!!` is intentional: a missing cause surfaces immediately as a NullPointerException.
private fun ExecutionException.nonNullCause(): Throwable = cause!!
170 
171 /**
172  * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
173  *
174  * Completion is non-atomic between the two promises.
175  *
176  * When either promise successfully completes, it will attempt to synchronously complete its
177  * counterpart with the same value. This will succeed barring a race with cancellation.
178  *
179  * When either promise completes with an Exception, it will attempt to synchronously complete its
180  * counterpart with the same Exception. This will succeed barring a race with cancellation.
181  *
182  * Cancellation is propagated bidirectionally.
183  *
184  * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
185  * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
186  * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
187  * non-cancelled terminal state.
188  *
189  * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
190  * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
191  * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
192  * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
193  * successfully cancelled, for their different meanings of "successfully cancelled".
194  *
195  * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
196  * semantics. See [Job] for a description of coroutine cancellation semantics. See
197  * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
198  * corner cases of this method.
199  */
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
    val source = this
    // The JobListenableFuture receives `source` as the job it cancels when the future is cancelled.
    return JobListenableFuture<T>(source).also { bridge ->
        // Mirror the Deferred's outcome into the future once the Deferred completes. The future
        // may already be complete by then if it was cancelled first; see JobListenableFuture.cancel().
        source.invokeOnCompletion { failure ->
            if (failure != null) {
                bridge.completeExceptionallyOrCancel(failure)
            } else {
                // No failure was reported, so the completed value can be read directly.
                bridge.complete(source.getCompleted())
            }
        }
    }
}
213 
214 /**
215  * Awaits completion of `this` [ListenableFuture] without blocking a thread.
216  *
217  * This suspend function is cancellable.
218  *
219  * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
220  * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
221  *
222  * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
223  * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
224  * [kotlinx.coroutines.NonCancellable].
225  */
public suspend fun <T> ListenableFuture<T>.await(): T {
    val awaited = this

    // Already finished: hand back the outcome without suspending.
    if (awaited.isDone) {
        return try {
            Uninterruptibles.getUninterruptibly(awaited)
        } catch (failure: ExecutionException) {
            // Unwrap so the caller sees the original failure rather than the ExecutionException.
            // A CancellationException from a cancelled Future is not caught here and propagates
            // to the calling coroutine as is. Any other exception type would point to a bug in
            // the Future implementation.
            throw failure.nonNullCause()
        }
    }

    // Still pending: suspend, resume from a listener on the future, and cancel the future if the
    // continuation is cancelled while waiting.
    return suspendCancellableCoroutine<T> { cont ->
        awaited.addListener(ToContinuation(awaited, cont), MoreExecutors.directExecutor())
        cont.invokeOnCancellation {
            awaited.cancel(false)
        }
    }
}
247 
248 /**
249  * Propagates the outcome of [futureToObserve] to [continuation] on completion.
250  *
251  * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
252  * and fails, the cause of the Future will be propagated without a wrapping
253  * [ExecutionException] when thrown.
254  */
private class ToContinuation<T>(
    val futureToObserve: ListenableFuture<T>,
    val continuation: CancellableContinuation<T>
): Runnable {
    /** Runs as a listener of [futureToObserve] and forwards its outcome to [continuation]. */
    override fun run() {
        // A cancelled future cancels the continuation instead of resuming it.
        if (futureToObserve.isCancelled) {
            continuation.cancel()
            return
        }

        try {
            continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
        } catch (failure: ExecutionException) {
            // A non-cancelled Future is only expected to fail with ExecutionException; its cause
            // is passed on unwrapped. Any other exception type would point to a bug in the Future
            // implementation and is not handled here.
            continuation.resumeWithException(failure.nonNullCause())
        }
    }
}
274 
275 /**
276  * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
277  * completion.
278  *
279  * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
280  * By documented contract, a [Future] has been cancelled if
281  * and only if its `isCancelled()` method returns true.
282  *
283  * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
284  * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
285  * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
286  * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
287  * In contrast to [Future] which can't change its outcome after a successful cancellation,
288  * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
289  * which _can_ show the error.
290  *
291  * This may be counterintuitive, but it maintains the error and cancellation contracts of both
292  * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
293  * to the same running task.
294  */
private class ListenableFutureCoroutine<T>(
    context: CoroutineContext
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {

    // The future handed to callers. It is given `this` coroutine as the job to cancel when the
    // future itself is cancelled from outside; see JobListenableFuture.
    @JvmField
    val future: JobListenableFuture<T> = JobListenableFuture(this)

    /** Forwards the coroutine's successful result to [future]. */
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    /** Forwards the coroutine's failure or cancellation [cause] to [future]. */
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        // A Future can only be completed once. If the future was already cancelled before the
        // coroutine got here, this call has no effect and `cause` is dropped from the future's
        // point of view.
        //
        // FutureTask behaves the same way: when Future.cancel() races with setException() and
        // cancel wins, the failure is lost.
        future.completeExceptionallyOrCancel(cause)
    }
}
318 
319 /**
320  * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
321  *
322  * This setup allows the returned [ListenableFuture] to maintain the following properties:
323  *
324  * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
325  *   and [isCancelled] methods
326  * - Cancellation propagation both to and from [Deferred]
327  * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
328  *   with different concrete implementations of [ListenableFuture]
329  *   - Fully correct cancellation and listener happens-after obeying [Future] and
330  *     [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
331  *     The best way to be correct, especially given the fun corner cases from
332  *     [AbstractFuture.setFuture], is to just use an [AbstractFuture].
333  *   - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
334  *     around coroutine's result as a state engine to establish happens-after-completion. This
335  *     could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
336  *     cost of the implementation's readability.
337  */
private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
    /**
     * The delegate that holds this future's state and listeners.
     *
     * It ends up in one of these states: set to the coroutine's value of type `T`; set to a
     * [Cancelled] wrapper when the coroutine finished with a [CancellationException], so that the
     * exception is kept; failed via `setException`; or cancelled directly through [cancel].
     */
    private val auxFuture = SettableFuture.create<Any?>()

    /**
     * Cached hint that getting [auxFuture] throws [ExecutionException].
     *
     * It is set when `setException` succeeds, or lazily when [isCancelled] observes the
     * [ExecutionException] itself, and lets [isCancelled] skip the throwing `get` afterwards.
     *
     * Note: this is a plain field, so it is only eventually consistent with [auxFuture].
     */
    private var auxFutureIsFailed = false

    /**
     * Completes this future with the successful [result] of the attached coroutine.
     *
     * Returns whatever [SettableFuture.set] reports: `false` means the future had already been
     * completed, for example by an external [cancel].
     */
    fun complete(result: T): Boolean {
        return auxFuture.set(result)
    }

    /**
     * Completes this future from the exceptional outcome [t] of the attached coroutine.
     *
     * A [CancellationException] is stored as a [Cancelled] value so that the original exception
     * is preserved; any other throwable goes to [SettableFuture.setException].
     *
     * Returns `false` if the future had already been completed, for example by an external [cancel].
     */
    fun completeExceptionallyOrCancel(t: Throwable): Boolean {
        if (t is CancellationException) {
            return auxFuture.set(Cancelled(t))
        }
        val failureRecorded = auxFuture.setException(t)
        if (failureRecorded) {
            auxFutureIsFailed = true
        }
        return failureRecorded
    }

    /**
     * Reports cancellation _in the sense of [Future]_, which is _not_ the same as [Job.isCancelled].
     *
     * This future counts as cancelled when [auxFuture] itself was cancelled, or when it is done
     * and holds a [Cancelled] value.
     *
     * See [cancel].
     */
    override fun isCancelled(): Boolean {
        // Cancelled directly through cancel().
        if (auxFuture.isCancelled) return true
        // Not done yet, or known to hold a failure: not cancelled. In particular this never
        // returns true while isDone() is still false, even if the Job is already "cancelling".
        if (!isDone || auxFutureIsFailed) return false
        // Done: inspect what the delegate holds.
        return try {
            Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
        } catch (e: CancellationException) {
            // `auxFuture` was cancelled after the `auxFuture.isCancelled` check above.
            true
        } catch (e: ExecutionException) {
            // The delegate holds a failure that `auxFutureIsFailed` did not reflect yet.
            auxFutureIsFailed = true
            false
        }
    }

    /**
     * Blocks until [auxFuture] completes and returns the `T` it holds.
     *
     * Throws a [CancellationException] if the delegate holds a [Cancelled] value (see
     * [getInternal]); exceptions thrown by the delegate's own `get` propagate unchanged.
     */
    override fun get(): T = getInternal(auxFuture.get())

    /** Timed variant of [get()]; the timeout is applied to the wait on [auxFuture]. */
    override fun get(timeout: Long, unit: TimeUnit): T = getInternal(auxFuture.get(timeout, unit))

    /**
     * Translates the raw value held by [auxFuture] into this future's result: a [Cancelled]
     * wrapper becomes a thrown [CancellationException] whose cause is the wrapped exception,
     * and anything else is returned as `T`.
     */
    private fun getInternal(result: Any?): T {
        if (result is Cancelled) {
            throw CancellationException().initCause(result.exception)
        }
        // `auxFuture` is only ever set to a `T` or a `Cancelled`, so this cast is safe.
        @Suppress("UNCHECKED_CAST")
        return result as T
    }

    /** Registers [listener] on [auxFuture], which is the future that tracks completion. */
    override fun addListener(listener: Runnable, executor: Executor) {
        auxFuture.addListener(listener, executor)
    }

    /** This future is done exactly when [auxFuture] is done. */
    override fun isDone(): Boolean = auxFuture.isDone

    /**
     * Cancels this future and, only if that succeeds, cancels [jobToCancel]. This is fundamentally racy.
     *
     * [Job.cancel] is called if and only if cancelling [auxFuture] returned `true`. As a result
     * [jobToCancel] _might not successfully cancel_ if the race resolves the other way, and it may
     * still be in its "cancelling" state while this future is already complete and cancelled.
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        // TODO: call jobToCancel.cancel() _before_ running the listeners.
        //  `auxFuture.cancel()` executes auxFuture's listeners, so `jobToCancel` is only cancelled
        //  after those listeners have already run.
        //  Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
        if (!auxFuture.cancel(mayInterruptIfRunning)) {
            return false
        }
        jobToCancel.cancel()
        return true
    }

    /** Renders the default object string followed by a `[status=...]` description of the outcome. */
    override fun toString(): String {
        val identity = super.toString()
        val status = if (!isDone) {
            "PENDING, delegate=[$auxFuture]"
        } else {
            try {
                val outcome = Uninterruptibles.getUninterruptibly(auxFuture)
                if (outcome is Cancelled) {
                    "CANCELLED, cause=[${outcome.exception}]"
                } else {
                    "SUCCESS, result=[$outcome]"
                }
            } catch (e: CancellationException) {
                // Cancelled through `Future.cancel`: there is no cause or message to show.
                "CANCELLED"
            } catch (e: ExecutionException) {
                "FAILURE, cause=[${e.cause}]"
            } catch (t: Throwable) {
                // Violation of Future's contract, should never happen.
                "UNKNOWN, cause=[${t.javaClass} thrown from get()]"
            }
        }
        return "$identity[status=$status]"
    }
}
496 
497 /**
498  * A wrapper for `Coroutine`'s [CancellationException].
499  *
500  * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
501  * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
 * class and pass it into [SettableFuture.set]. See implementation of [JobListenableFuture].
503  */
private class Cancelled(
    // The coroutine's own cancellation exception, read back by the future that stored this wrapper.
    @JvmField val exception: CancellationException
)
505