<lambda>null1 package kotlinx.coroutines.guava
2
3 import com.google.common.util.concurrent.*
4 import com.google.common.util.concurrent.internal.*
5 import kotlinx.coroutines.*
6 import java.util.concurrent.*
7 import java.util.concurrent.CancellationException
8 import kotlin.coroutines.*
9
10 /**
11 * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
12 *
13 * The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
14 * [IllegalArgumentException], because Futures don't have a way to start lazily.
15 *
16 * When the created coroutine [isCompleted][Job.isCompleted], it will try to
17 * *synchronously* complete the returned Future with the same outcome. This will
18 * succeed, barring a race with external cancellation of returned [ListenableFuture].
19 *
20 * Cancellation is propagated bidirectionally.
21 *
22 * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
23 * added/overlaid by passing [context].
24 *
25 * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
26 * member, [Dispatchers.Default] is used.
27 *
28 * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
29 * a [Job] in [context].
30 *
31 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
32 * facilities.
33 *
34 * Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
35 * In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
36 * the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
37 * error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
38 * no cause. This is to match the specification and behavior of
39 * `java.util.concurrent.FutureTask`.
40 *
41 * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
42 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
43 * @param block the code to execute.
44 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    // A Future cannot represent a "not started yet" state, so lazy start modes are rejected up front.
    require(!start.isLazy) { "$start start is not supported" }
    // The coroutine is passed as the receiver when `block` is started. Callers only ever
    // see the ListenableFuture that the coroutine owns.
    return ListenableFutureCoroutine<T>(newCoroutineContext(context))
        .also { it.start(start, it, block) }
        .future
}
56
57 /**
58 * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
59 *
60 * Completion is non-atomic between the two promises.
61 *
62 * Cancellation is propagated bidirectionally.
63 *
64 * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
65 * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
66 * race with cancellation of the `Deferred`.
67 *
68 * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
69 * it will cancel the returned `Deferred`.
70 *
71 * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
72 * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
73 * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
74 * will complete with a different outcome than `this` `ListenableFuture`.
75 */
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
    /* Entangling a `Deferred` with a `ListenableFuture` is a compromise between the states and
     * contracts of a `Future` and the states of a `Deferred`. The resulting behaviour:
     *
     * - If `this` `ListenableFuture` is successfully cancelled (`ListenableFuture.cancel()`
     *   returned `true`), the returned `Deferred` is cancelled synchronously. The only thing this
     *   can race with is cancellation of the `Deferred` itself, so once either promise is
     *   successfully cancelled the `Deferred` always enters "cancelling" and (barring
     *   uncooperative cancellation) _eventually_ reaches "cancelled".
     *
     * - If the returned `Deferred` is cancelled, `ListenableFuture.cancel()` is called
     *   synchronously on `this` `ListenableFuture`. That is only an attempt: the
     *   `ListenableFuture` may still finish in a non-cancelled terminal state.
     *
     * - When cancellation arrives through the `Deferred`, the `true` returned by
     *   `ListenableFuture.cancel()` is consumed here and never seen by other holders of the
     *   `ListenableFuture`. This cannot be avoided, so holders of the `ListenableFuture` must not
     *   tie cancellation work to observing that return value; such work will not run when the
     *   task was cancelled via its `Deferred` representation.
     *
     * All of this is inherently racy. See `Future.cancel()` for `Future` cancellation semantics
     * and `Job` for coroutine cancellation semantics.
     */

    // Phase 1: fast failure path for Guava futures. Re-using the Throwable already stored in the
    // Future avoids allocating a new exception.
    val fastFailure: Throwable? =
        if (this is InternalFutureFailureAccess) InternalFutures.tryInternalFastPathGetFailure(this) else null
    if (fastFailure != null) {
        val failed = CompletableDeferred<T>()
        failed.completeExceptionally(fastFailure)
        return failed
    }

    // Phase 2: fast path for an already completed Future. Because the Future is done, get() does
    // not block and therefore cannot be interrupted; getUninterruptibly() is used instead of
    // getDone() to skip the volatile read getDone() performs for interruption handling.
    if (isDone) {
        return try {
            CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
        } catch (e: CancellationException) {
            val cancelled = CompletableDeferred<T>()
            cancelled.cancel(e)
            cancelled
        } catch (e: ExecutionException) {
            // A gotten Future may only throw ExecutionException (besides cancellation); anything
            // else would indicate a fundamental bug in the Future implementation.
            val failed = CompletableDeferred<T>()
            failed.completeExceptionally(e.nonNullCause())
            failed
        }
    }

    // Phase 3: the Future is still pending, so register a callback that completes the Deferred.
    val deferred = CompletableDeferred<T>()
    val completer = object : FutureCallback<T> {
        override fun onSuccess(result: T) {
            try {
                deferred.complete(result)
            } catch (e: Throwable) {
                // Anything thrown while completing is routed to the coroutine exception handling.
                handleCoroutineException(EmptyCoroutineContext, e)
            }
        }

        override fun onFailure(t: Throwable) {
            try {
                deferred.completeExceptionally(t)
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
        }
    }
    Futures.addCallback(this, completer, MoreExecutors.directExecutor())

    // The caller only gets a Deferred, so the single thing it can do to it is cancel it. If this
    // handler fires before the Future completed, the Deferred must have been cancelled and that
    // cancellation is forwarded to the Future. If it fires after the Future completed, cancel()
    // has no effect.
    deferred.invokeOnCompletion {
        cancel(false)
    }
    // Wrap via delegation so the CompletableDeferred is not exposed and cannot be reached by casting.
    return object : Deferred<T> by deferred {}
}
156
157 /**
158 * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
159 *
160 * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
161 * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
162 * [Exception].
163 *
164 * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
165 * state - a serious fundamental bug.
166 */
// `!!` is deliberate: a missing cause means the Future broke its contract, and that surfaces as an NPE.
private fun ExecutionException.nonNullCause(): Throwable = cause!!
170
171 /**
172 * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
173 *
174 * Completion is non-atomic between the two promises.
175 *
176 * When either promise successfully completes, it will attempt to synchronously complete its
177 * counterpart with the same value. This will succeed barring a race with cancellation.
178 *
179 * When either promise completes with an Exception, it will attempt to synchronously complete its
180 * counterpart with the same Exception. This will succeed barring a race with cancellation.
181 *
182 * Cancellation is propagated bidirectionally.
183 *
184 * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
185 * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
186 * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
187 * non-cancelled terminal state.
188 *
189 * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
190 * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
191 * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
192 * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
193 * successfully cancelled, for their different meanings of "successfully cancelled".
194 *
195 * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
196 * semantics. See [Job] for a description of coroutine cancellation semantics. See
197 * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
198 * corner cases of this method.
199 */
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
    val bridge = JobListenableFuture<T>(this)
    // Mirror the final state of `this` Deferred into the bridge once the Deferred completes.
    // The bridge may already be complete by then if it was cancelled first;
    // see JobListenableFuture.cancel().
    invokeOnCompletion { failure ->
        if (failure != null) {
            bridge.completeExceptionallyOrCancel(failure)
        } else {
            bridge.complete(getCompleted())
        }
    }
    return bridge
}
213
214 /**
215 * Awaits completion of `this` [ListenableFuture] without blocking a thread.
216 *
217 * This suspend function is cancellable.
218 *
219 * If the [Job] of the current coroutine is cancelled while this suspending function is waiting, this function
220 * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
221 *
222 * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
223 * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
224 * [kotlinx.coroutines.NonCancellable].
225 */
public suspend fun <T> ListenableFuture<T>.await(): T {
    // Fast path: an already completed Future is read directly, without suspending.
    if (isDone) {
        try {
            return Uninterruptibles.getUninterruptibly(this)
        } catch (e: ExecutionException) {
            // Besides CancellationException, a gotten Future may only throw ExecutionException.
            // The wrapper is stripped so the caller sees the real cause. A CancellationException
            // is not caught here and propagates to the calling coroutine. Any other exception
            // type would indicate a fundamental bug in the Future implementation.
            throw e.nonNullCause()
        }
    }

    // Slow path: suspend until the Future runs its listeners.
    return suspendCancellableCoroutine<T> { cont ->
        val resumer = ToContinuation(this, cont)
        addListener(resumer, MoreExecutors.directExecutor())
        // Cancelling the awaiting coroutine also cancels the Future.
        cont.invokeOnCancellation {
            cancel(false)
        }
    }
}
247
248 /**
249 * Propagates the outcome of [futureToObserve] to [continuation] on completion.
250 *
251 * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
252 * and fails, the cause of the Future will be propagated without a wrapping
253 * [ExecutionException] when thrown.
254 */
private class ToContinuation<T>(
    val futureToObserve: ListenableFuture<T>,
    val continuation: CancellableContinuation<T>
) : Runnable {
    /** Forwards the observed future's outcome to the continuation. */
    override fun run() {
        // A cancelled Future is translated into cancellation of the continuation.
        if (futureToObserve.isCancelled) {
            continuation.cancel()
            return
        }
        try {
            continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
        } catch (e: ExecutionException) {
            // A gotten Future may only throw ExecutionException here; the wrapper is removed so
            // that the continuation resumes with the underlying cause. Any other exception type
            // would indicate a fundamental bug in the Future implementation.
            continuation.resumeWithException(e.nonNullCause())
        }
    }
}
274
275 /**
276 * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
277 * completion.
278 *
279 * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
280 * By documented contract, a [Future] has been cancelled if
281 * and only if its `isCancelled()` method returns true.
282 *
283 * Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
284 * The contract of [Future] does not permit it to return an error after it is successfully cancelled.
285 * On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
286 * otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
287 * In contrast to [Future] which can't change its outcome after a successful cancellation,
288 * cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
289 * which _can_ show the error.
290 *
291 * This may be counterintuitive, but it maintains the error and cancellation contracts of both
292 * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
293 * to the same running task.
294 */
private class ListenableFutureCoroutine<T>(
    context: CoroutineContext
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {

    // The future handed out to callers. It receives `this` coroutine as the job to cancel, so
    // external cancellation of the future is propagated back here. See JobListenableFuture.
    @JvmField
    val future: JobListenableFuture<T> = JobListenableFuture(this)

    // Normal completion: publish the value through the future.
    override fun onCompleted(value: T) {
        future.complete(value)
    }

    // Failure or cancellation: publish the cause through the future.
    override fun onCancelled(cause: Throwable, handled: Boolean) {
        // A Future can only be completed once. If the future was cancelled in a race with the
        // cancellation of this coroutine and the future's cancellation won, the coroutine's
        // cause is dropped from this promise.
        //
        // FutureTask behaves the same way: when Future.cancel() races with
        // FutureTask.setException() on one Future, the cause of a failure-after-cancellation
        // is dropped as well.
        future.completeExceptionallyOrCancel(cause)
    }
}
318
319 /**
320 * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
321 *
322 * This setup allows the returned [ListenableFuture] to maintain the following properties:
323 *
324 * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
325 * and [isCancelled] methods
326 * - Cancellation propagation both to and from [Deferred]
327 * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
328 * with different concrete implementations of [ListenableFuture]
329 * - Fully correct cancellation and listener happens-after obeying [Future] and
330 * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
331 * The best way to be correct, especially given the fun corner cases from
332 * [AbstractFuture.setFuture], is to just use an [AbstractFuture].
333 * - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
334 * around coroutine's result as a state engine to establish happens-after-completion. This
335 * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
336 * cost of the implementation's readability.
337 */
private class JobListenableFuture<T>(private val jobToCancel: Job) : ListenableFuture<T> {
    /**
     * State machine backing every [Future] operation of this class.
     *
     * Completion and cancellation semantics are delegated to this [SettableFuture] rather than
     * re-implemented here. It ends up in one of these states:
     * - set to a `T` value by [complete];
     * - set to a [Cancelled] wrapper, which keeps the coroutine's [CancellationException];
     * - failed through [SettableFuture.setException];
     * - cancelled directly by [cancel].
     */
    private val auxFuture: SettableFuture<Any?> = SettableFuture.create()

    /**
     * Hint that getting the value of [auxFuture] throws [ExecutionException].
     *
     * The flag is only eventually consistent with [auxFuture]: it may still be `false` shortly
     * after the future has failed, in which case [isCancelled] corrects it. It exists purely as an
     * optimization, because the only other way to learn that a [ListenableFuture] failed is to call
     * `get` on it and pay for the thrown [ExecutionException].
     */
    private var auxFutureIsFailed: Boolean = false

    /**
     * Publishes the successful [result] of the attached coroutine.
     *
     * Returns the outcome of [SettableFuture.set]; this should be `true` unless external
     * cancellation won the race.
     */
    fun complete(result: T): Boolean {
        return auxFuture.set(result)
    }

    /**
     * Publishes the exceptional outcome [t] of the attached coroutine, translating it into the
     * matching [Future] outcome.
     *
     * A [CancellationException] is stored as a [Cancelled] _value_ so that its cause and message
     * survive; every other throwable goes to [SettableFuture.setException]. The returned flag
     * should be `true` unless external cancellation won the race.
     */
    fun completeExceptionallyOrCancel(t: Throwable): Boolean {
        if (t is CancellationException) {
            return auxFuture.set(Cancelled(t))
        }
        val accepted = auxFuture.setException(t)
        // Only remember the failure if this call is the one that actually completed the future.
        if (accepted) auxFutureIsFailed = true
        return accepted
    }

    /**
     * Reports cancellation _in the sense of [Future]_, which is _not_ the same as
     * [Job.isCancelled].
     *
     * Once done, this future counts as cancelled when [auxFuture] itself was cancelled or when it
     * holds a [Cancelled] wrapper.
     *
     * See [cancel].
     */
    override fun isCancelled(): Boolean {
        // The checks below guarantee that isCancelled() never returns true while isDone() is false.
        // A coroutine that finished in its "cancelled" state with a CancellationException is
        // treated exactly like a cancelled auxFuture. A Job that is merely "cancelling", while this
        // future has not itself been cancelled, yields false. This is the only discovered way to
        // reconcile the two cancellation contracts.
        if (auxFuture.isCancelled) return true
        if (!isDone) return false
        // Known failure: skip get() and the ExecutionException it would allocate.
        if (auxFutureIsFailed) return false
        return try {
            Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
        } catch (e: CancellationException) {
            // auxFuture was cancelled right after the `auxFuture.isCancelled` check above.
            true
        } catch (e: ExecutionException) {
            // The failure flag had not been updated yet; record it now.
            auxFutureIsFailed = true
            false
        }
    }

    /**
     * Blocks until [auxFuture] completes, then either returns the `T` value or throws
     * [CancellationException]. Waiting on [auxFuture] establishes happens-after ordering with the
     * completion of the entangled coroutine.
     *
     * [SettableFuture.get] itself throws [CancellationException] only when the future was cancelled
     * externally; a cancelled coroutine is reported through a returned [Cancelled] value instead.
     *
     * [auxFuture] _must be complete_ for the [isDone] and [isCancelled] happens-after contract of
     * [Future] to hold.
     */
    override fun get(): T = getInternal(auxFuture.get())

    /** Timed variant of [get]; the result is unwrapped in the same way. */
    override fun get(timeout: Long, unit: TimeUnit): T = getInternal(auxFuture.get(timeout, unit))

    /**
     * Unwraps a value obtained from [auxFuture]: a [Cancelled] wrapper becomes a thrown
     * [CancellationException] whose cause is the wrapped exception, anything else is returned as `T`.
     */
    // The cast is safe because `auxFuture` is only ever set to a `T` or a `Cancelled`.
    @Suppress("UNCHECKED_CAST")
    private fun getInternal(result: Any?): T {
        if (result is Cancelled) {
            throw CancellationException().initCause(result.exception)
        }
        return result as T
    }

    override fun addListener(listener: Runnable, executor: Executor) {
        auxFuture.addListener(listener, executor)
    }

    override fun isDone(): Boolean = auxFuture.isDone

    /**
     * Cancels this future and, only if that succeeded, asks [jobToCancel] to cancel as well.
     * This is fundamentally racy.
     *
     * [Job.cancel] is invoked on [jobToCancel] if and only if cancelling [auxFuture] succeeded.
     * Depending on how the race resolves, [jobToCancel] _might not successfully cancel_, and it may
     * still be in its "cancelling" state while this future is already complete and cancelled.
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        // TODO: call jobToCancel.cancel() _before_ running the listeners.
        //  `auxFuture.cancel()` runs auxFuture's listeners, so `jobToCancel` is only cancelled
        //  after those listeners have already executed.
        //  Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
        val cancelledHere = auxFuture.cancel(mayInterruptIfRunning)
        if (cancelledHere) {
            jobToCancel.cancel()
        }
        return cancelledHere
    }

    /** Renders the default object description followed by the current status of this future. */
    override fun toString(): String {
        val identity = super.toString()
        val status = if (!isDone) {
            "PENDING, delegate=[$auxFuture]"
        } else {
            try {
                val outcome = Uninterruptibles.getUninterruptibly(auxFuture)
                if (outcome is Cancelled) {
                    "CANCELLED, cause=[${outcome.exception}]"
                } else {
                    "SUCCESS, result=[$outcome]"
                }
            } catch (e: CancellationException) {
                // Cancelled through `Future.cancel`, so there is no cause or message to show.
                "CANCELLED"
            } catch (e: ExecutionException) {
                "FAILURE, cause=[${e.cause}]"
            } catch (t: Throwable) {
                // Would be a violation of Future's contract; should never happen.
                "UNKNOWN, cause=[${t.javaClass} thrown from get()]"
            }
        }
        return buildString {
            append(identity)
            append("[status=")
            append(status)
            append(']')
        }
    }
}
496
/**
 * Holder for the [CancellationException] of a coroutine that was _cancelled normally_.
 *
 * The reason of cancellation should remain visible to the user, but a [SettableFuture] cannot
 * store one. Instead, the exception is wrapped into this class and stored in the future as a
 * regular value via [SettableFuture.set]. See the implementation of [JobListenableFuture].
 */
private class Cancelled(
    @JvmField val exception: CancellationException
)
505