<lambda>null1 package kotlinx.coroutines.internal
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6 
/**
 * The dispatcher returned by a `.limitedParallelism(x)` call: a view that wraps the given [dispatcher]
 * and caps the parallelism level at [parallelism], while trying to emulate fairness.
 *
 * ### Implementation details
 *
 * Tasks sent to a 'LimitedDispatcher' are never handed to the underlying dispatcher's
 * [dispatch][CoroutineDispatcher.dispatch] directly. They are appended to a queue owned by this class,
 * and the underlying dispatcher only ever receives "worker-loop" tasks, at most [parallelism] of them.
 * Each worker loop polls that queue and cooperatively preempts itself so that the underlying
 * dispatcher is not starved.
 *
 * This is what keeps the view compatible with any underlying dispatcher implementation without
 * direct cooperation from it.
 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Number of currently allocated workers. It is kept in an atomic for the sake of K/N memory ordering,
    // not for atomic read-modify-write semantics: every update happens while holding `workerAllocationLock`.
    private val runningWorkers = atomic(0)

    // Tasks that were sent to this dispatcher and are waiting for a worker to pick them up
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A dedicated object to synchronize on, as K/N needs one
    private val workerAllocationLock = SynchronizedObject()

    /**
     * Returns this very dispatcher when the requested [parallelism] is not lower than the current limit,
     * otherwise delegates to the base implementation.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < this.parallelism) super.limitedParallelism(parallelism) else this
    }

    /**
     * Enqueues [block]; a worker that has to be started for it is submitted
     * via [CoroutineDispatcher.dispatch] of the underlying dispatcher.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { newWorker -> dispatcher.dispatch(this, newWorker) }
    }

    /**
     * Enqueues [block]; a worker that has to be started for it is submitted
     * via [CoroutineDispatcher.dispatchYield] of the underlying dispatcher.
     */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { newWorker -> dispatcher.dispatchYield(this, newWorker) }
    }

    /**
     * Puts the given [block] into the queue and, when the worker limit is not reached yet,
     * launches one more worker through [startWorker].
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Publish the task first, so that already running workers are able to see it
        queue.addLast(block)
        // The unlocked read is only a fast path; the lock-protected allocation may still fail if other workers
        // were launched in parallel or if a worker that observed an empty queue temporarily decreased
        // `runningWorkers`.
        val workerAllocated = runningWorkers.value < parallelism && tryAllocateWorker()
        if (!workerAllocated) return
        // The task may already have been taken by another worker; in that case the permit is given back
        val firstTask = obtainTaskOrDeallocateWorker()
        if (firstTask != null) startWorker(Worker(firstTask))
    }

    /**
     * Attempts to acquire a permit for starting one more worker.
     *
     * Returns `true` when the permit was acquired and `false` when [parallelism] workers are already allocated.
     */
    private fun tryAllocateWorker(): Boolean = synchronized(workerAllocationLock) {
        if (runningWorkers.value >= parallelism) {
            false
        } else {
            runningWorkers.incrementAndGet()
            true
        }
    }

    /**
     * Returns the next queued task or, when the queue is empty, logically deallocates the calling worker
     * and returns `null`.
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            val polled = queue.removeFirstOrNull()
            if (polled != null) return polled
            synchronized(workerAllocationLock) {
                // Release the permit, then look at the queue once more under the lock
                runningWorkers.decrementAndGet()
                if (queue.size == 0) return null
                // The queue is not empty after all: take the permit back and poll again
                runningWorkers.incrementAndGet()
            }
        }
    }

    /**
     * A worker loop: it keeps running queued tasks until the queue has nothing left for it.
     *
     * The task to run next is always held by the worker itself. That way the fairness re-dispatch can never
     * happen when there are no more tasks in the queue, which matters because once all the actual tasks
     * are done, nothing prevents the user from closing the dispatcher, making any further dispatch incorrect.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            var tasksCompleted = 0
            while (true) {
                runCurrentTask()
                val upcoming = obtainTaskOrDeallocateWorker()
                if (upcoming == null) return
                currentTask = upcoming
                tasksCompleted++
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                val timeToYield = tasksCompleted >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)
                if (timeToYield) {
                    // Do "yield" to let other views execute their runnable as well.
                    // 'runningWorkers' is deliberately left as is: this worker is still committed to its part of work
                    dispatcher.dispatch(this@LimitedDispatcher, this)
                    return
                }
            }
        }

        // Runs the task held by this worker and reports anything it throws, so that the loop keeps going
        private fun runCurrentTask() {
            try {
                currentTask.run()
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
        }
    }
}
127 
/**
 * Validates that the receiver is a usable parallelism level, i.e. that it is at least 1.
 *
 * Kept as a shared function to save a few bytecode ops.
 *
 * @throws IllegalArgumentException if the receiver is less than 1
 */
internal fun Int.checkParallelism() {
    val isPositive = this >= 1
    require(isPositive) { "Expected positive parallelism level, but got $this" }
}
130