package kotlinx.coroutines.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6
7 /**
8 * The result of .limitedParallelism(x) call, a dispatcher
9 * that wraps the given dispatcher, but limits the parallelism level, while
10 * trying to emulate fairness.
11 *
12 * ### Implementation details
13 *
14 * By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
15 * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
16 * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
17 * in order to avoid starvation of the underlying dispatcher.
18 *
19 * Such behavior is crucial to be compatible with any underlying dispatcher implementation without
20 * direct cooperation.
21 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Atomic is necessary here for the sake of K/N memory ordering,
    // there is no need in atomic operations for this property.
    // Invariant: never exceeds [parallelism]; allocation/deallocation decisions
    // are made under [workerAllocationLock], while the fast-path read in
    // [dispatchInternal] happens without the lock.
    private val runningWorkers = atomic(0)

    // FIFO queue of tasks submitted to this dispatcher; polled by worker loops.
    // Not single-consumer: up to [parallelism] workers drain it concurrently.
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        // A view with an equal or larger limit cannot be more restrictive than
        // this dispatcher itself, so reuse this instance instead of wrapping it again.
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatch(this, worker)
        }
    }

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatchYield(this, worker)
        }
    }

    /**
     * Tries to dispatch the given [block].
     * If there are not enough workers, it starts a new one via [startWorker].
     *
     * The task is always enqueued first; a worker is only dispatched to the
     * underlying dispatcher when a parallelism permit can be acquired AND
     * a task is still available (the one just added may already have been
     * taken by a concurrently running worker).
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Add task to queue so running workers will be able to see that
        queue.addLast(block)
        // Fast path: enough workers are already running to eventually observe the task.
        if (runningWorkers.value >= parallelism) return
        // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
        // `runningWorkers` when they observed an empty queue.
        if (!tryAllocateWorker()) return
        // If null, the task was consumed concurrently and the permit we just
        // acquired was released inside obtainTaskOrDeallocateWorker.
        val task = obtainTaskOrDeallocateWorker() ?: return
        startWorker(Worker(task))
    }

    /**
     * Tries to obtain the permit to start a new worker.
     *
     * Returns `false` when [parallelism] workers are already running; otherwise
     * increments [runningWorkers] and returns `true`. The check-and-increment is
     * done under [workerAllocationLock] so the count can never overshoot the limit.
     */
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }

    /**
     * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
     *
     * The null branch releases the permit and then re-checks the queue under the lock:
     * if a task slipped in between the failed poll and the decrement, the permit is
     * re-acquired and polling retries, so tasks are never stranded in the queue
     * without a worker committed to them.
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            when (val nextTask = queue.removeFirstOrNull()) {
                null -> synchronized(workerAllocationLock) {
                    runningWorkers.decrementAndGet()
                    // Queue became non-empty concurrently -- take the permit back and retry.
                    if (queue.size == 0) return null
                    runningWorkers.incrementAndGet()
                }
                else -> return nextTask
            }
        }
    }

    /**
     * A worker that polls the queue and runs tasks until there are no more of them.
     *
     * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
     * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
     * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
     * perform any more dispatches.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            var fairnessCounter = 0
            while (true) {
                try {
                    currentTask.run()
                } catch (e: Throwable) {
                    // A failing task must not kill the worker loop; report and keep going.
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                // Grab the next task (or retire this worker) BEFORE deciding to re-dispatch,
                // see the class-level note above on why the order matters.
                currentTask = obtainTaskOrDeallocateWorker() ?: return
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
                    // Do "yield" to let other views execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
                    dispatcher.dispatch(this@LimitedDispatcher, this)
                    return
                }
            }
        }
    }
}
127
128 // Save a few bytecode ops
// Save a few bytecode ops
/**
 * Validates that this value is a legal parallelism level (at least 1),
 * throwing [IllegalArgumentException] otherwise.
 */
internal fun Int.checkParallelism() {
    require(this >= 1) { "Expected positive parallelism level, but got $this" }
}
130