1 package kotlinx.coroutines
2
3 import kotlinx.coroutines.internal.*
4 import java.util.concurrent.*
5 import kotlin.coroutines.*
6
7 private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false)
8
9 @PublishedApi
10 internal actual val DefaultDelay: Delay = initializeDefaultDelay()
11
initializeDefaultDelaynull12 private fun initializeDefaultDelay(): Delay {
13 // Opt-out flag
14 if (!defaultMainDelayOptIn) return DefaultExecutor
15 val main = Dispatchers.Main
16 /*
17 * When we already are working with UI and Main threads, it makes
18 * no sense to create a separate thread with timer that cannot be controller
19 * by the UI runtime.
20 */
21 return if (main.isMissing() || main !is Delay) DefaultExecutor else main
22 }
23
24 @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
25 internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
26 const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"
27
28 init {
29 incrementUseCount() // this event loop is never completed
30 }
31
32 private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds
33
34 private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
35 try {
36 java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
37 } catch (e: SecurityException) {
38 DEFAULT_KEEP_ALIVE_MS
39 })
40
41 @Suppress("ObjectPropertyName")
42 @Volatile
43 private var _thread: Thread? = null
44
45 override val thread: Thread
46 get() = _thread ?: createThreadSync()
47
48 private const val FRESH = 0
49 private const val ACTIVE = 1
50 private const val SHUTDOWN_REQ = 2
51 private const val SHUTDOWN_ACK = 3
52 private const val SHUTDOWN = 4
53
54 @Volatile
55 private var debugStatus: Int = FRESH
56
57 private val isShutDown: Boolean get() = debugStatus == SHUTDOWN
58
59 private val isShutdownRequested: Boolean get() {
60 val debugStatus = debugStatus
61 return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
62 }
63
enqueuenull64 actual override fun enqueue(task: Runnable) {
65 if (isShutDown) shutdownError()
66 super.enqueue(task)
67 }
68
reschedulenull69 override fun reschedule(now: Long, delayedTask: DelayedTask) {
70 // Reschedule on default executor can only be invoked after Dispatchers.shutdown
71 shutdownError()
72 }
73
shutdownErrornull74 private fun shutdownError() {
75 throw RejectedExecutionException("DefaultExecutor was shut down. " +
76 "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
77 "Please refer to Dispatchers.shutdown documentation for more details")
78 }
79
shutdownnull80 override fun shutdown() {
81 debugStatus = SHUTDOWN
82 super.shutdown()
83 }
84
85 /**
86 * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
87 * ```
88 * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
89 * ```
90 *
91 * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
92 * but it's not exposed as public API.
93 */
invokeOnTimeoutnull94 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
95 scheduleInvokeOnTimeout(timeMillis, block)
96
97 override fun run() {
98 ThreadLocalEventLoop.setEventLoop(this)
99 registerTimeLoopThread()
100 try {
101 var shutdownNanos = Long.MAX_VALUE
102 if (!notifyStartup()) return
103 while (true) {
104 Thread.interrupted() // just reset interruption flag
105 var parkNanos = processNextEvent()
106 if (parkNanos == Long.MAX_VALUE) {
107 // nothing to do, initialize shutdown timeout
108 val now = nanoTime()
109 if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
110 val tillShutdown = shutdownNanos - now
111 if (tillShutdown <= 0) return // shut thread down
112 parkNanos = parkNanos.coerceAtMost(tillShutdown)
113 } else
114 shutdownNanos = Long.MAX_VALUE
115 if (parkNanos > 0) {
116 // check if shutdown was requested and bail out in this case
117 if (isShutdownRequested) return
118 parkNanos(this, parkNanos)
119 }
120 }
121 } finally {
122 _thread = null // this thread is dead
123 acknowledgeShutdownIfNeeded()
124 unregisterTimeLoopThread()
125 // recheck if queues are empty after _thread reference was set to null (!!!)
126 if (!isEmpty) thread // recreate thread if it is needed
127 }
128 }
129
130 @Synchronized
createThreadSyncnull131 private fun createThreadSync(): Thread {
132 return _thread ?: Thread(this, THREAD_NAME).apply {
133 _thread = this
134 /*
135 * `DefaultExecutor` is a global singleton that creates its thread lazily.
136 * To isolate the classloaders properly, we are inherting the context classloader from
137 * the singleton itself instead of using parent' thread one
138 * in order not to accidentally capture temporary application classloader.
139 */
140 contextClassLoader = this@DefaultExecutor.javaClass.classLoader
141 isDaemon = true
142 start()
143 }
144 }
145
146 // used for tests
147 @Synchronized
ensureStartednull148 internal fun ensureStarted() {
149 assert { _thread == null } // ensure we are at a clean state
150 assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
151 debugStatus = FRESH
152 createThreadSync() // create fresh thread
153 while (debugStatus == FRESH) (this as Object).wait()
154 }
155
156 @Synchronized
notifyStartupnull157 private fun notifyStartup(): Boolean {
158 if (isShutdownRequested) return false
159 debugStatus = ACTIVE
160 (this as Object).notifyAll()
161 return true
162 }
163
164 @Synchronized // used _only_ for tests
shutdownForTestsnull165 fun shutdownForTests(timeout: Long) {
166 val deadline = System.currentTimeMillis() + timeout
167 if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
168 // loop while there is anything to do immediately or deadline passes
169 while (debugStatus != SHUTDOWN_ACK && _thread != null) {
170 _thread?.let { unpark(it) } // wake up thread if present
171 val remaining = deadline - System.currentTimeMillis()
172 if (remaining <= 0) break
173 (this as Object).wait(timeout)
174 }
175 // restore fresh status
176 debugStatus = FRESH
177 }
178
179 @Synchronized
acknowledgeShutdownIfNeedednull180 private fun acknowledgeShutdownIfNeeded() {
181 if (!isShutdownRequested) return
182 debugStatus = SHUTDOWN_ACK
183 resetAll() // clear queues
184 (this as Object).notifyAll()
185 }
186
187 internal val isThreadPresent
188 get() = _thread != null
189 }
190