xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines.scheduling
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.internal.*
5 import java.util.concurrent.*
6 import kotlin.coroutines.*
7 
/**
 * Instance of `Dispatchers.Default`.
 *
 * A [SchedulerCoroutineDispatcher] configured with `CORE_POOL_SIZE`, `MAX_POOL_SIZE`,
 * `IDLE_WORKER_KEEP_ALIVE_NS` and `DEFAULT_SCHEDULER_NAME`.
 */
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    corePoolSize = CORE_POOL_SIZE,
    maxPoolSize = MAX_POOL_SIZE,
    idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS,
    schedulerName = DEFAULT_SCHEDULER_NAME,
) {

    /**
     * Validates [parallelism] with `checkParallelism()` and then returns this dispatcher itself
     * when the requested limit is at least `CORE_POOL_SIZE`; smaller limits are delegated to the
     * superclass implementation.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < CORE_POOL_SIZE) super.limitedParallelism(parallelism) else this
    }

    /**
     * Shuts down the dispatcher by invoking the superclass `close()`.
     * Used only by `Dispatchers.shutdown()`.
     */
    internal fun shutdown(): Unit = super.close()

    /**
     * Always throws [UnsupportedOperationException].
     * Overridden in case anyone writes `(Dispatchers.Default as ExecutorCoroutineDispatcher).close()`.
     */
    override fun close(): Unit =
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")

    override fun toString(): String {
        return "Dispatchers.Default"
    }
}
33 
/**
 * The unlimited instance of `Dispatchers.IO` that utilizes all the threads `CoroutineScheduler` provides.
 *
 * Every task is forwarded to [DefaultScheduler] together with `BlockingContext`.
 */
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    /** Hands [block] to [DefaultScheduler] with `BlockingContext` as a regular (non-tail) dispatch. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block = block, context = BlockingContext, tailDispatch = false)

    /** Same as [dispatch], but requests a tail dispatch from [DefaultScheduler]. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block = block, context = BlockingContext, tailDispatch = true)

    /**
     * Validates [parallelism] with `checkParallelism()` and then returns this dispatcher itself
     * when the requested limit is at least `MAX_POOL_SIZE`; smaller limits are delegated to the
     * superclass implementation.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < MAX_POOL_SIZE) super.limitedParallelism(parallelism) else this
    }
}
53 
/**
 * `Dispatchers.IO`.
 *
 * Acts both as an [ExecutorCoroutineDispatcher] and as its own [Executor]; all work is forwarded
 * to a parallelism-limited view of [UnlimitedIoScheduler].
 */
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // Limit comes from systemProp(IO_PARALLELISM_PROPERTY_NAME, ...); the second argument
    // (the larger of 64 and AVAILABLE_PROCESSORS) is presumably the fallback when the property
    // is not set -- confirm against systemProp.
    private val default: CoroutineDispatcher = UnlimitedIoScheduler.limitedParallelism(
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS))
    )

    /** This object serves as its own executor. */
    override val executor: Executor get() = this

    /** [Executor] entry point: dispatches [command] with an empty coroutine context. */
    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    /**
     * Creates the limited view directly on top of [UnlimitedIoScheduler] rather than on top of
     * this dispatcher's own limited view.
     * See documentation to Dispatchers.IO for the rationale.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism)

    /** Forwards [block] to the limited view held by this object. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    /** Forwards the yield-dispatch of [block] to the limited view held by this object. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    /** Always fails: closing is not supported on this dispatcher. */
    override fun close(): Unit = error("Cannot be invoked on Dispatchers.IO")

    override fun toString(): String {
        return "Dispatchers.IO"
    }
}
90 
/**
 * Dispatcher that forwards every task to a `CoroutineScheduler` created from the constructor arguments.
 *
 * Instantiated in tests so we can test it in isolation.
 */
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // Kept as a variable for test purposes, so that it can be reinitialized from a clean state
    private var coroutineScheduler: CoroutineScheduler = createScheduler()

    /** The current underlying scheduler, exposed as an [Executor]. */
    override val executor: Executor
        get() = coroutineScheduler

    /** Builds a new scheduler from the constructor arguments of this dispatcher. */
    private fun createScheduler(): CoroutineScheduler {
        return CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    }

    /** Submits [block] to the underlying scheduler; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    /** Submits [block] to the underlying scheduler as a tail dispatch; [context] is not used. */
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    /** Submits [block] to the underlying scheduler with an explicit task [context] and [tailDispatch] flag. */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        coroutineScheduler.dispatch(block, context, tailDispatch)
    }

    /** Closes the underlying scheduler. */
    override fun close() {
        coroutineScheduler.close()
    }

    // for tests only: shuts the current scheduler down (timeout argument 1_000L) and replaces it with a fresh one
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = createScheduler()
    }

    // for tests only: shuts the current scheduler down, passing the given timeout through
    @Synchronized
    internal fun shutdown(timeout: Long) {
        coroutineScheduler.shutdown(timeout)
    }

    // for tests only: recreates the scheduler
    internal fun restore() {
        usePrivateScheduler()
    }
}
137