1 package kotlinx.atomicfu.locks
2
3 import platform.posix.*
4 import interop.*
5 import kotlinx.cinterop.*
6 import kotlin.concurrent.*
7 import kotlin.native.internal.NativePtr
8 import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
9 import kotlin.concurrent.AtomicNativePtr
10 import kotlin.concurrent.AtomicReference
11 import kotlin.native.SharedImmutable
12 import kotlin.native.concurrent.*
13
/**
 * Reentrant lock whose complete state is one immutable [LockState] that is swapped
 * atomically through [lock].
 *
 * The state moves between three statuses:
 * - `UNLOCKED`: no owner, no nesting, no mutex node attached.
 * - `THIN`: owned by one thread; only the nesting counter is tracked.
 * - `FAT`: a mutex node taken from the file-level pool is attached and `waiters`
 *   counts the threads that registered themselves on that node.
 *
 * NOTE(review): `lock(...)` / `unlock(...)` on the node's mutex come from the `interop`
 * bindings, which are not visible in this file; the comments below assume they behave
 * like a blocking mutex acquire / release — confirm against the C definitions.
 */
public actual open class SynchronizedObject {

    /** Holder of the current state; every transition below is a compare-and-set on it. */
    protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0))

    /**
     * Acquires the lock for the calling thread, re-entering when the caller already owns it.
     * Retries in a loop until one of the state transitions succeeds.
     */
    public fun lock() {
        val self = pthread_self()!!
        while (true) {
            val seen = lock.value
            val acquired = when (seen.status) {
                UNLOCKED -> lock.compareAndSet(seen, LockState(THIN, 1, 0, self))
                THIN -> acquireFromThin(seen, self)
                FAT -> acquireFromFat(seen, self)
            }
            if (acquired) return
        }
    }

    /**
     * One acquisition attempt against a `THIN` snapshot.
     * Returns `true` when the caller now owns the lock, `false` when the attempt must be retried.
     */
    private fun acquireFromThin(seen: LockState, self: pthread_t): Boolean {
        if (self == seen.ownerThreadId) {
            // Reentrant acquisition: only the nesting counter grows.
            val reentered = LockState(THIN, seen.nestedLocks + 1, seen.waiters, self)
            return lock.compareAndSet(seen, reentered)
        }
        // Another thread owns the lock -> inflate it with a pooled native mutex.
        val node = mutexPool.allocate()
        // Take the permit on behalf of the owner, so the second lock() below has to wait for it.
        node.lock()
        val inflated = LockState(FAT, seen.nestedLocks, seen.waiters + 1, seen.ownerThreadId, node)
        if (!lock.compareAndSet(seen, inflated)) {
            // Lost the race: give the permit back and return the node to the pool.
            node.unlock()
            mutexPool.release(node)
            return false
        }
        // Wait until the owner thread releases the permit, then claim ownership.
        node.lock()
        tryLockAfterResume(self)
        return true
    }

    /**
     * One acquisition attempt against a `FAT` snapshot.
     * Returns `true` when the caller now owns the lock, `false` when the attempt must be retried.
     */
    private fun acquireFromFat(seen: LockState, self: pthread_t): Boolean {
        if (self == seen.ownerThreadId) {
            // Reentrant acquisition on an inflated lock.
            val reentered = LockState(FAT, seen.nestedLocks + 1, seen.waiters, seen.ownerThreadId, seen.mutex)
            return lock.compareAndSet(seen, reentered)
        }
        // No owner recorded in this snapshot: do not register as a waiter, just re-read the state.
        if (seen.ownerThreadId == null) return false
        val queued = LockState(FAT, seen.nestedLocks, seen.waiters + 1, seen.ownerThreadId, seen.mutex)
        if (!lock.compareAndSet(seen, queued)) return false
        queued.mutex!!.lock()
        tryLockAfterResume(self)
        return true
    }

    /**
     * Attempts to acquire the lock without waiting on a mutex.
     *
     * @return `true` if the lock was free or is already owned by the calling thread
     * (the nesting counter is then incremented); `false` if another thread is recorded
     * as owner or the state has no owner but is not `UNLOCKED`.
     */
    public fun tryLock(): Boolean {
        val self = pthread_self()!!
        while (true) {
            val seen = lock.value
            val next = when {
                seen.status == UNLOCKED -> LockState(THIN, 1, 0, self)
                self == seen.ownerThreadId ->
                    LockState(seen.status, seen.nestedLocks + 1, seen.waiters, self, seen.mutex)
                else -> return false
            }
            if (lock.compareAndSet(seen, next)) return true
        }
    }

    /**
     * Releases one level of ownership held by the calling thread.
     *
     * @throws IllegalArgumentException if the calling thread is not the recorded owner
     * (this also covers the `UNLOCKED` state, whose owner is `null`).
     */
    public fun unlock() {
        val self = pthread_self()!!
        while (true) {
            val seen = lock.value
            require(self == seen.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${seen.ownerThreadId}, real: $self" }
            when (seen.status) {
                THIN -> {
                    val next =
                        if (seen.nestedLocks == 1) LockState(UNLOCKED, 0, 0)
                        else LockState(THIN, seen.nestedLocks - 1, seen.waiters, seen.ownerThreadId)
                    if (lock.compareAndSet(seen, next)) return
                }
                FAT -> {
                    val lastRelease = seen.nestedLocks == 1
                    val next =
                        if (lastRelease) {
                            // Fully released: clear the owner and account for the waiter about to resume.
                            LockState(FAT, 0, seen.waiters - 1, null, seen.mutex)
                        } else {
                            // Still owned by the current thread, one nesting level less.
                            LockState(FAT, seen.nestedLocks - 1, seen.waiters, seen.ownerThreadId, seen.mutex)
                        }
                    if (lock.compareAndSet(seen, next)) {
                        // Hand the permit over only after the new state is published.
                        if (lastRelease) next.mutex!!.unlock()
                        return
                    }
                }
                UNLOCKED -> error("It is not possible to unlock the mutex that is not obtained")
            }
        }
    }

    /**
     * Installs [threadId] as owner with a nesting count of 1 after it obtained the mutex permit.
     * When no waiters remain the lock is deflated to `THIN` and the mutex node is unlocked
     * and returned to the pool; otherwise it stays `FAT` with the same node.
     */
    private fun tryLockAfterResume(threadId: pthread_t) {
        while (true) {
            val seen = lock.value
            val deflate = seen.waiters == 0
            val next =
                if (deflate) LockState(THIN, 1, 0, threadId)
                else LockState(FAT, 1, seen.waiters, threadId, seen.mutex)
            if (!lock.compareAndSet(seen, next)) continue
            if (deflate) {
                val node = seen.mutex!!
                node.unlock()
                mutexPool.release(node)
            }
            return
        }
    }

    /**
     * Immutable snapshot of the lock.
     *
     * @property status which of the three modes the lock is in.
     * @property nestedLocks how many times the owner currently holds the lock.
     * @property waiters number of threads registered on [mutex].
     * @property ownerThreadId thread recorded as owner, or `null` when there is none.
     * @property mutex pooled mutex node attached to the lock, or `null` when none is attached.
     */
    protected class LockState(
        val status: Status,
        val nestedLocks: Int,
        val waiters: Int,
        val ownerThreadId: pthread_t? = null,
        val mutex: CPointer<mutex_node_t>? = null
    ) {
        // Every snapshot is frozen as soon as it is constructed.
        init {
            freeze()
        }
    }

    /** Modes of the lock; see the class documentation for their meaning. */
    protected enum class Status { UNLOCKED, THIN, FAT }

    /** Calls the interop `lock` on the mutex stored in this node. */
    private fun CPointer<mutex_node_t>.lock() = lock(pointed.mutex)

    /** Calls the interop `unlock` on the mutex stored in this node. */
    private fun CPointer<mutex_node_t>.unlock() = unlock(pointed.mutex)
}
162
/** Creates a new, initially unlocked [ReentrantLock]. */
public actual fun reentrantLock(): ReentrantLock = ReentrantLock()

/** On this platform a reentrant lock is simply a [SynchronizedObject]. */
public actual typealias ReentrantLock = SynchronizedObject
166
/**
 * Runs [block] while holding this lock and returns its result.
 * The lock is released in a `finally` clause, so it is also released when [block] throws.
 */
public actual inline fun <T> ReentrantLock.withLock(block: () -> T): T {
    lock()
    return try {
        block()
    } finally {
        unlock()
    }
}
175
/**
 * Runs [block] while holding [lock] and returns its result.
 * The lock is released in a `finally` clause, so it is also released when [block] throws.
 */
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T {
    lock.lock()
    return try {
        block()
    } finally {
        lock.unlock()
    }
}
184
/** Number of mutex nodes handed to the shared pool's constructor. */
private const val INITIAL_POOL_CAPACITY = 64

/** File-wide pool of native mutex nodes, created on first access. */
@SharedImmutable
private val mutexPool by lazy { MutexPool(capacity = INITIAL_POOL_CAPACITY) }
189
/**
 * Pool of native `mutex_node_t` nodes kept as an intrusive singly linked stack.
 * The stack head is stored in an [AtomicNativePtr] and changed only with compare-and-set.
 *
 * Construction allocates [capacity] nodes as one array on the native heap, initialises each
 * with `mutex_node_init`, and pushes all of them onto the stack. When the stack is empty,
 * [allocate] creates an additional node on the native heap.
 *
 * NOTE(review): nothing in this class frees the native allocations, and the pop path does
 * not guard against a node being popped and re-pushed between the read and the
 * compare-and-set — confirm whether callers rely on either property.
 */
class MutexPool(capacity: Int) {
    /** Raw pointer to the node on top of the stack; `NativePtr.NULL` when the stack is empty. */
    private val top = AtomicNativePtr(NativePtr.NULL)

    /** Backing array holding the initial nodes. */
    private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }

    init {
        // Push every pre-allocated node; element `index` lives `index * sizeOf` bytes into the array.
        repeat(capacity) { index ->
            val address = mutexes.rawValue.plus(index * sizeOf<mutex_node_t>())
            release(interpretCPointer<mutex_node_t>(address)!!)
        }
    }

    /** Allocates and initialises one extra node outside of the backing array. */
    private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr

    /** Returns a node from the stack, or a freshly allocated one when the stack is empty. */
    fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode()

    /** Pushes [mutexNode] onto the stack, retrying until the head swap succeeds. */
    fun release(mutexNode: CPointer<mutex_node_t>) {
        do {
            val headRaw = top.value
            // Link the node in front of the head observed in this iteration.
            mutexNode.pointed.next = interpretCPointer<mutex_node_t>(headRaw)
        } while (!top.compareAndSet(headRaw, mutexNode.rawValue))
    }

    /** Removes and returns the node on top of the stack, or `null` when the stack is empty. */
    private fun pop(): CPointer<mutex_node_t>? {
        while (true) {
            val headRaw = top.value
            val head = interpretCPointer<mutex_node_t>(headRaw) ?: return null
            val successor = head.pointed.next
            if (top.compareAndSet(headRaw, successor.rawValue)) return head
        }
    }
}
225