1 package kotlinx.atomicfu.locks
2 
3 import platform.posix.*
4 import interop.*
5 import kotlinx.cinterop.*
6 import kotlin.concurrent.*
7 import kotlin.native.internal.NativePtr
8 import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
9 import kotlin.concurrent.AtomicNativePtr
10 import kotlin.concurrent.AtomicReference
11 import kotlin.native.SharedImmutable
12 import kotlin.native.concurrent.*
13 
/**
 * Kotlin/Native implementation of a reentrant synchronization primitive.
 *
 * Uses an adaptive scheme: an uncontended lock is represented purely by an
 * atomically updated [LockState] snapshot (a "thin" lock — no OS resources).
 * On first contention the lock is inflated to a "fat" lock backed by a native
 * mutex taken from [mutexPool]; when the last waiter is gone it deflates back
 * to thin and the mutex is returned to the pool. Every state transition is a
 * single compare-and-set on [lock], retried in a loop.
 */
public actual open class SynchronizedObject {

    // Immutable snapshot of the current lock state; replaced wholesale via CAS on every transition.
    protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0))

    /**
     * Acquires the lock, blocking the calling thread while another thread holds it.
     * Reentrant: the owning thread may call [lock] again; each nested acquisition
     * must be balanced by a matching [unlock].
     */
    public fun lock() {
        val currentThreadId = pthread_self()!!
        while (true) {
            val state = lock.value
            when (state.status) {
                UNLOCKED -> {
                    // Fast path: claim the free lock as a thin lock — no native mutex involved.
                    val thinLock = LockState(THIN, 1, 0, currentThreadId)
                    if (lock.compareAndSet(state, thinLock))
                        return
                }
                THIN -> {
                    if (currentThreadId == state.ownerThreadId) {
                        // reentrant lock
                        val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId)
                        if (lock.compareAndSet(state, thinNested))
                            return
                    } else {
                        // another thread is trying to take this lock -> allocate native mutex
                        val mutex = mutexPool.allocate()
                        // Pre-acquire the mutex on behalf of the current owner, so the second
                        // lock() call below blocks until the owner releases the permit in unlock().
                        mutex.lock()
                        val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex)
                        if (lock.compareAndSet(state, fatLock)) {
                            //block the current thread waiting for the owner thread to release the permit
                            mutex.lock()
                            tryLockAfterResume(currentThreadId)
                            return
                        } else {
                            // return permit taken for the owner thread and release mutex back to the pool
                            mutex.unlock()
                            mutexPool.release(mutex)
                        }
                    }
                }
                FAT -> {
                    if (currentThreadId == state.ownerThreadId) {
                        // reentrant lock
                        val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
                        if (lock.compareAndSet(state, nestedFatLock)) return
                    } else if (state.ownerThreadId != null) {
                        // Held by another thread: register as one more waiter, then park on the mutex.
                        val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
                        if (lock.compareAndSet(state, fatLock)) {
                            fatLock.mutex!!.lock()
                            tryLockAfterResume(currentThreadId)
                            return
                        }
                    }
                    // ownerThreadId == null: the lock is mid-release (see unlock FAT path) — retry.
                }
            }
        }
    }

    /**
     * Tries to acquire the lock without blocking.
     * Returns `true` on success (including a reentrant acquisition by the current
     * owner) and `false` when the lock is held by another thread.
     */
    public fun tryLock(): Boolean {
        val currentThreadId = pthread_self()!!
        while (true) {
            val state = lock.value
            if (state.status == UNLOCKED) {
                val thinLock = LockState(THIN, 1, 0, currentThreadId)
                if (lock.compareAndSet(state, thinLock))
                    return true
            } else {
                if (currentThreadId == state.ownerThreadId) {
                    // Reentrant acquisition: bump the nesting counter, keep the rest of the state.
                    val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
                    if (lock.compareAndSet(state, nestedLock))
                        return true
                } else {
                    return false
                }
            }
        }
    }

    /**
     * Releases one level of lock nesting.
     * Must be called by the owning thread (enforced by the `require` below);
     * the outermost unlock fully releases the lock and, for a fat lock, hands
     * the mutex permit to one parked waiter.
     */
    public fun unlock() {
        val currentThreadId = pthread_self()!!
        while (true) {
            val state = lock.value
            require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" }
            when (state.status) {
                THIN -> {
                    // nested unlock
                    if (state.nestedLocks == 1) {
                        val unlocked = LockState(UNLOCKED, 0, 0)
                        if (lock.compareAndSet(state, unlocked))
                            return
                    } else {
                        val releasedNestedLock =
                            LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId)
                        if (lock.compareAndSet(state, releasedNestedLock))
                            return
                    }
                }
                FAT -> {
                    if (state.nestedLocks == 1) {
                        // last nested unlock -> release completely, resume some waiter
                        // waiters - 1 accounts for the single waiter the unlock below resumes;
                        // ownerThreadId = null marks the hand-over window (see lock FAT path).
                        val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex)
                        if (lock.compareAndSet(state, releasedLock)) {
                            releasedLock.mutex!!.unlock()
                            return
                        }
                    } else {
                        // lock is still owned by the current thread
                        val releasedLock =
                            LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex)
                        if (lock.compareAndSet(state, releasedLock))
                            return
                    }
                }
                else -> error("It is not possible to unlock the mutex that is not obtained")
            }
        }
    }

    /**
     * Installs the resumed waiter [threadId] as the new owner after it acquired
     * the mutex permit. If no other waiters remain, deflates the lock back to
     * THIN and returns the now-unused mutex to [mutexPool].
     */
    private fun tryLockAfterResume(threadId: pthread_t) {
        while (true) {
            val state = lock.value
            val newState = if (state.waiters == 0) // deflate
                LockState(THIN, 1, 0, threadId)
            else
                LockState(FAT, 1, state.waiters, threadId, state.mutex)
            if (lock.compareAndSet(state, newState)) {
                if (state.waiters == 0) {
                    // Deflated: release the permit we hold and recycle the mutex node.
                    state.mutex!!.unlock()
                    mutexPool.release(state.mutex)
                }
                return
            }
        }
    }

    /**
     * Immutable lock-state snapshot.
     *
     * @param nestedLocks   reentrancy depth of the current owner (0 when unowned).
     * @param waiters       number of threads parked on [mutex] (FAT state only).
     * @param ownerThreadId pthread id of the owner, or `null` when unowned / mid-release.
     * @param mutex         pooled native mutex backing a FAT lock, `null` otherwise.
     */
    protected class LockState(
        val status: Status,
        val nestedLocks: Int,
        val waiters: Int,
        val ownerThreadId: pthread_t? = null,
        val mutex: CPointer<mutex_node_t>? = null
    ) {
        // Frozen so the snapshot can be shared across threads under the legacy
        // (strict) Kotlin/Native memory model.
        init { freeze() }
    }

    // UNLOCKED: free; THIN: owned, no contention observed; FAT: backed by a native mutex.
    protected enum class Status { UNLOCKED, THIN, FAT }

    // Lock/unlock the native mutex stored in this pool node — delegates to the
    // cinterop helpers imported from interop.*.
    private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex)

    private fun CPointer<mutex_node_t>.unlock() = unlock(this.pointed.mutex)
}
162 
/**
 * Creates a new [ReentrantLock] instance.
 *
 * On Kotlin/Native this is simply a fresh [SynchronizedObject]
 * (see the typealias below).
 */
// Explicit return type added: public API functions should not rely on inference.
public actual fun reentrantLock(): ReentrantLock = ReentrantLock()

/** On Kotlin/Native a reentrant lock is the [SynchronizedObject] itself. */
public actual typealias ReentrantLock = SynchronizedObject
166 
/**
 * Runs [block] while holding this lock.
 *
 * The lock is acquired before [block] starts and released in a `finally`
 * clause, so it is freed even when [block] throws. Returns whatever
 * [block] returns.
 */
public actual inline fun <T> ReentrantLock.withLock(block: () -> T): T {
    lock()
    return try {
        block()
    } finally {
        unlock()
    }
}
175 
synchronizednull176 public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T {
177     lock.lock()
178     try {
179         return block()
180     } finally {
181         lock.unlock()
182     }
183 }
184 
// Number of mutex nodes pre-allocated when the shared pool is first created.
private const val INITIAL_POOL_CAPACITY = 64

// Process-wide pool of native mutexes backing fat locks.
// Created lazily on the first lock inflation; shared by all SynchronizedObject instances.
@SharedImmutable
private val mutexPool by lazy { MutexPool(INITIAL_POOL_CAPACITY) }
189 
/**
 * Lock-free pool of native mutex nodes used to back inflated (FAT) locks.
 *
 * Implemented as an intrusive Treiber stack of `mutex_node_t` linked through
 * their `next` fields. Nodes are taken with [allocate] on lock inflation and
 * pushed back with [release] on deflation. Memory is never returned to
 * `nativeHeap` — the pool lives for the lifetime of the process.
 */
class MutexPool(capacity: Int) {
    // Raw pointer to the top of the free-node stack; NULL when the stack is empty.
    private val top = AtomicNativePtr(NativePtr.NULL)

    // Contiguous backing array for the initial `capacity` nodes, each initialized via mutex_node_init.
    private val mutexes = nativeHeap.allocArray<mutex_node_t>(capacity) { mutex_node_init(ptr) }

    init {
        // Seed the free stack with every pre-allocated node, addressed by manual
        // pointer arithmetic over the backing array.
        for (i in 0 until capacity) {
            release(interpretCPointer<mutex_node_t>(mutexes.rawValue.plus(i * sizeOf<mutex_node_t>()))!!)
        }
    }

    // Fallback allocation of a single extra node when the pool runs dry;
    // once released, the node joins the pool and is never freed.
    private fun allocMutexNode() = nativeHeap.alloc<mutex_node_t> { mutex_node_init(ptr) }.ptr

    /** Takes a free node from the stack, or allocates a fresh one when the pool is exhausted. */
    fun allocate(): CPointer<mutex_node_t> = pop() ?: allocMutexNode()

    /** Pushes [mutexNode] back onto the free stack (lock-free CAS loop). */
    fun release(mutexNode: CPointer<mutex_node_t>) {
        while (true) {
            val oldTop = interpretCPointer<mutex_node_t>(top.value)
            mutexNode.pointed.next = oldTop
            if (top.compareAndSet(oldTop.rawValue, mutexNode.rawValue))
                return
        }
    }

    // Pops the top free node, or returns null when the stack is empty.
    // NOTE(review): this is a classic Treiber-stack pop on raw pointers and is
    // in principle subject to the ABA problem — confirm that the interleavings
    // possible for allocate/release callers here cannot corrupt the free list.
    private fun pop(): CPointer<mutex_node_t>? {
        while (true) {
            val oldTop = interpretCPointer<mutex_node_t>(top.value)
            if (oldTop.rawValue === NativePtr.NULL)
                return null
            val newHead = oldTop!!.pointed.next
            if (top.compareAndSet(oldTop.rawValue, newHead.rawValue))
                return oldTop
        }
    }
}
225