package kotlinx.coroutines.scheduling

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.util.concurrent.*
import kotlin.coroutines.*

// The scheduler-backed singleton that serves as Dispatchers.Default.
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    corePoolSize = CORE_POOL_SIZE,
    maxPoolSize = MAX_POOL_SIZE,
    idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS,
    schedulerName = DEFAULT_SCHEDULER_NAME,
) {

    // A limit of CORE_POOL_SIZE or more is answered with this dispatcher itself;
    // smaller limits are delegated to the inherited implementation.
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism >= CORE_POOL_SIZE) this else super.limitedParallelism(parallelism)
    }

    // The real shutdown path; only Dispatchers.shutdown() is expected to call it.
    // It goes through super.close() because close() on this object always throws.
    internal fun shutdown(): Unit = super.close()

    // Guards against (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
    override fun close() {
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
    }

    override fun toString(): String = "Dispatchers.Default"
}

// Unbounded flavour of Dispatchers.IO: may use every thread CoroutineScheduler provides.
// All work is handed to DefaultScheduler and tagged with BlockingContext.
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    // Same as dispatch, but requests a tail dispatch.
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = true)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = false)
    }

    // A limit of MAX_POOL_SIZE or more is answered with this dispatcher itself;
    // smaller limits are delegated to the inherited implementation.
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism >= MAX_POOL_SIZE) this else super.limitedParallelism(parallelism)
    }
}

// The singleton that serves as Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // Limited view over UnlimitedIoScheduler. The limit is read via systemProp from
    // IO_PARALLELISM_PROPERTY_NAME, with max(64, AVAILABLE_PROCESSORS) passed as the default.
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(IO_PARALLELISM_PROPERTY_NAME, AVAILABLE_PROCESSORS.coerceAtLeast(64))
    )

    // This object is its own Executor.
    override val executor: Executor
        get() = this

    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    // Views are carved out of the unlimited scheduler rather than out of `default`;
    // see documentation to Dispatchers.IO for the rationale.
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    // Closing is not supported: always fails via error(...).
    override fun close() {
        error("Cannot be invoked on Dispatchers.IO")
    }

    override fun toString(): String = "Dispatchers.IO"
}

// Open and directly instantiable so that tests can exercise it in isolation
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // Kept as a `var` for test purposes, so that it can be reinitialized from a clean state
    private var coroutineScheduler = createScheduler()

    override val executor: Executor
        get() = coroutineScheduler

    // Builds a scheduler from this dispatcher's constructor arguments.
    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    // Same as dispatch, but requests a tail dispatch.
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    // Dispatches `block` with an explicit task context and tail-dispatch flag.
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean): Unit =
        coroutineScheduler.dispatch(block, context, tailDispatch)

    override fun close(): Unit = coroutineScheduler.close()

    // for tests only: shuts the current scheduler down (passing 1_000L as the timeout)
    // and replaces it with a freshly created one
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1_000L)
        coroutineScheduler = createScheduler()
    }

    // for tests only
    @Synchronized
    internal fun shutdown(timeout: Long): Unit = coroutineScheduler.shutdown(timeout)

    // for tests only: recreates the scheduler
    internal fun restore() {
        usePrivateScheduler()
    }
}