xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.internal.*
4 import java.util.concurrent.*
5 import kotlin.coroutines.*
6 
/**
 * Value of the `kotlinx.coroutines.main.delay` system property as returned by [systemProp],
 * which is given `false` as its second argument (presumably the fallback when the property is unset).
 *
 * When this is `false`, [initializeDefaultDelay] returns [DefaultExecutor] without consulting `Dispatchers.Main`.
 */
private val defaultMainDelayOptIn: Boolean = systemProp("kotlinx.coroutines.main.delay", false)

/**
 * The [Delay] implementation selected by [initializeDefaultDelay] when this property is initialized.
 */
@PublishedApi
internal actual val DefaultDelay: Delay = initializeDefaultDelay()
11 
/**
 * Picks the [Delay] implementation that backs [DefaultDelay].
 *
 * `Dispatchers.Main` is considered only when the opt-in flag [defaultMainDelayOptIn] is set.
 * It is used when it is present (`isMissing()` is `false`) and itself implements [Delay];
 * in every other case [DefaultExecutor] is returned.
 */
private fun initializeDefaultDelay(): Delay {
    if (defaultMainDelayOptIn) {
        val mainDispatcher = Dispatchers.Main
        /*
         * When we are already working with the UI and Main threads, it makes
         * no sense to create a separate timer thread that cannot be controlled
         * by the UI runtime, so the Main dispatcher handles delays itself.
         */
        if (!mainDispatcher.isMissing() && mainDispatcher is Delay) return mainDispatcher
    }
    return DefaultExecutor
}
23 
/**
 * Singleton event loop that runs on a lazily created daemon thread named [THREAD_NAME].
 *
 * The worker thread is created on first access to [thread], exits after the loop has been idle
 * for the keep-alive period, and is recreated at the end of [run] if the queues are not empty.
 *
 * [debugStatus] tracks a small state machine:
 * - `FRESH`: initial value; also restored by [ensureStarted] and [shutdownForTests].
 * - `ACTIVE`: set by the worker thread in `notifyStartup`.
 * - `SHUTDOWN_REQ`: set by [shutdownForTests] to ask the worker thread to exit.
 * - `SHUTDOWN_ACK`: set by the worker thread once it has observed a shutdown request.
 * - `SHUTDOWN`: set by [shutdown]; afterwards [enqueue] throws [RejectedExecutionException].
 */
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
    /** Name given to the worker thread created by [createThreadSync]. */
    const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor"

    init {
        incrementUseCount() // this event loop is never completed
    }

    private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds

    /**
     * Idle time, in nanoseconds, after which the worker thread exits.
     * Read from the `kotlinx.coroutines.DefaultExecutor.keepAlive` system property (milliseconds);
     * falls back to [DEFAULT_KEEP_ALIVE_MS] when the property is absent or reading it is denied.
     */
    private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
        try {
            java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
        } catch (e: SecurityException) {
            DEFAULT_KEEP_ALIVE_MS
        })

    /** Current worker thread, or `null` when no worker thread is alive. */
    @Suppress("ObjectPropertyName")
    @Volatile
    private var _thread: Thread? = null

    /** Returns the current worker thread, creating and starting one if there is none. */
    override val thread: Thread
        get() = _thread ?: createThreadSync()

    private const val FRESH = 0
    private const val ACTIVE = 1
    private const val SHUTDOWN_REQ = 2
    private const val SHUTDOWN_ACK = 3
    private const val SHUTDOWN = 4

    @Volatile
    private var debugStatus: Int = FRESH

    private val isShutDown: Boolean get() = debugStatus == SHUTDOWN

    private val isShutdownRequested: Boolean get() {
        // Read the volatile field once so both comparisons see the same value.
        val debugStatus = debugStatus
        return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
    }

    /**
     * Enqueues [task] for execution.
     *
     * @throws RejectedExecutionException if [shutdown] has already been invoked.
     */
    actual override fun enqueue(task: Runnable) {
        if (isShutDown) shutdownError()
        super.enqueue(task)
    }

    /**
     * Always fails with [RejectedExecutionException].
     */
    override fun reschedule(now: Long, delayedTask: DelayedTask) {
        // Reschedule on default executor can only be invoked after Dispatchers.shutdown
        shutdownError()
    }

    /** Throws the [RejectedExecutionException] that reports use of the executor after shutdown. */
    private fun shutdownError() {
        throw RejectedExecutionException("DefaultExecutor was shut down. " +
            "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
            "Please refer to Dispatchers.shutdown documentation for more details")
    }

    /** Marks the executor as `SHUTDOWN` and then delegates to the superclass shutdown. */
    override fun shutdown() {
        debugStatus = SHUTDOWN
        super.shutdown()
    }

    /**
     * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
     * ```
     * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
     * ```
     *
     * Livelock is possible only if `runBlocking` is called on the internal default executor
     * (which is used by default [delay]), but it's not exposed as public API.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduleInvokeOnTimeout(timeMillis, block)

    /**
     * Body of the worker thread: processes events until a shutdown is requested
     * or the loop has had nothing to do for [KEEP_ALIVE_NANOS].
     */
    override fun run() {
        ThreadLocalEventLoop.setEventLoop(this)
        registerTimeLoopThread()
        try {
            // Long.MAX_VALUE means "no idle deadline is currently armed".
            var shutdownNanos = Long.MAX_VALUE
            if (!notifyStartup()) return
            while (true) {
                Thread.interrupted() // just reset interruption flag
                var parkNanos = processNextEvent()
                if (parkNanos == Long.MAX_VALUE) {
                    // nothing to do, initialize shutdown timeout
                    val now = nanoTime()
                    if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                    val tillShutdown = shutdownNanos - now
                    if (tillShutdown <= 0) return // shut thread down
                    parkNanos = parkNanos.coerceAtMost(tillShutdown)
                } else
                    shutdownNanos = Long.MAX_VALUE // there is pending work, disarm the idle deadline
                if (parkNanos > 0) {
                    // check if shutdown was requested and bail out in this case
                    if (isShutdownRequested) return
                    parkNanos(this, parkNanos)
                }
            }
        } finally {
            _thread = null // this thread is dead
            acknowledgeShutdownIfNeeded()
            unregisterTimeLoopThread()
            // recheck if queues are empty after _thread reference was set to null (!!!)
            if (!isEmpty) thread // recreate thread if it is needed
        }
    }

    /**
     * Returns the existing worker thread or creates, publishes and starts a new daemon thread.
     * Synchronized so that concurrent callers do not start more than one thread.
     */
    @Synchronized
    private fun createThreadSync(): Thread {
        return _thread ?: Thread(this, THREAD_NAME).apply {
            _thread = this
            /*
             * `DefaultExecutor` is a global singleton that creates its thread lazily.
             * To isolate the classloaders properly, we are inheriting the context classloader from
             * the singleton itself instead of using the parent thread's one
             * in order not to accidentally capture a temporary application classloader.
             */
            contextClassLoader = this@DefaultExecutor.javaClass.classLoader
            isDaemon = true
            start()
        }
    }

    /**
     * Starts a fresh worker thread and blocks until it has left the `FRESH` state.
     * Used for tests.
     */
    @Synchronized
    internal fun ensureStarted() {
        assert { _thread == null } // ensure we are at a clean state
        assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK }
        debugStatus = FRESH
        createThreadSync() // create fresh thread
        // notifyStartup() changes the status and calls notifyAll() on this monitor
        while (debugStatus == FRESH) (this as Object).wait()
    }

    /**
     * Called by the worker thread at the start of [run].
     *
     * @return `false` if a shutdown was already requested (the thread must exit immediately),
     * `true` after switching the status to `ACTIVE` and waking up waiters on this monitor.
     */
    @Synchronized
    private fun notifyStartup(): Boolean {
        if (isShutdownRequested) return false
        debugStatus = ACTIVE
        (this as Object).notifyAll()
        return true
    }

    /**
     * Requests the worker thread to stop and waits until it acknowledges the request,
     * no worker thread is present, or [timeout] milliseconds have elapsed.
     * The status is reset to `FRESH` before returning.
     *
     * @param timeout maximum time to wait, in milliseconds.
     */
    @Synchronized // used _only_ for tests
    fun shutdownForTests(timeout: Long) {
        val deadline = System.currentTimeMillis() + timeout
        if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
        // loop while there is anything to do immediately or deadline passes
        while (debugStatus != SHUTDOWN_ACK && _thread != null) {
            _thread?.let { unpark(it) } // wake up thread if present
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) break
            // Wait only for the time left until the deadline: waiting for the full timeout again
            // after each wake-up would let the call overshoot the deadline.
            (this as Object).wait(remaining)
        }
        // restore fresh status
        debugStatus = FRESH
    }

    /**
     * Called by the worker thread when it exits: if a shutdown was requested, marks it as
     * acknowledged, clears the queues and wakes up threads waiting on this monitor.
     */
    @Synchronized
    private fun acknowledgeShutdownIfNeeded() {
        if (!isShutdownRequested) return
        debugStatus = SHUTDOWN_ACK
        resetAll() // clear queues
        (this as Object).notifyAll()
    }

    /** `true` while a worker thread reference is set. */
    internal val isThreadPresent
        get() = _thread != null
}
190