// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread (owner);
// operations on the back of the queue can be done by multiple threads
// concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are
// no stray occupied elements). The owner operates on one end of this region,
// the remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and claiming of the last
// empty element) happens by means of a state variable in each element. The
// states are: empty, busy (in the process of insertion or removal) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means of
// a CAS operation. The finishing transitions (busy->empty and busy->ready) are
// done with a plain store, as the element is exclusively owned by the current
// thread.
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state, but that would require malloc/free per operation for large, complex
// values (and this is designed to store std::function<()>).
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }
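
  // A worked illustration (not part of the implementation) of the front_
  // encoding used by PushFront above and PopFront below, assuming kSize == 4,
  // so kMask == 3 and kMask2 == 7. The low log2(kSize) + 1 bits of front_ hold
  // a rolling position in [0, 2*kSize); the higher bits hold a modification
  // counter, bumped by adding (kSize << 1) on a successful PushFront:
  //
  //   front_ == 0b00000'101   position 5, next slot is array_[5 & kMask] == array_[1]
  //   PushFront succeeds      front_ = 5 + 1 + 8 == 0b00001'110
  //                           (position 6, counter 1, array_[1] is now kReady)
  //   PopFront succeeds       front_ == 0b00001'101
  //                           (position back to 5, counter kept, array_[1] kEmpty)
  //
  // Because a successful push always changes the counter bits, a push/pop pair
  // never returns front_ to its previous value, which is what SizeOrNotEmpty()
  // relies on when it re-checks front_ to detect concurrent modification.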

  // PopFront removes and returns the first element in the queue.
  // If the queue is empty, returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue is empty, returns default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns roughly half of the last elements in the
  // queue. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(
                               s, kBusy, std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store the intermediate kBusy state, we exclusively
        // own these elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
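
  // A minimal usage sketch (illustrative only; LocalWorker and TryStealFrom
  // are hypothetical names, not part of Eigen):
  //
  //   RunQueue<std::function<void()>, 1024> q;
  //
  //   // Owner thread: only this thread may touch the front of the queue.
  //   void LocalWorker() {
  //     std::function<void()> overflow = q.PushFront([] { /* task body */ });
  //     if (overflow) overflow();  // queue was full, run the task inline
  //     std::function<void()> t = q.PopFront();
  //     if (t) t();
  //   }
  //
  //   // Any other thread: back operations are serialized by the mutex.
  //   void TryStealFrom(RunQueue<std::function<void()>, 1024>& victim) {
  //     std::function<void()> t = victim.PopBack();
  //     if (t) t();
  //   }
  //
  // A work-stealing scheduler might instead use PopBackHalf to move a batch of
  // tasks into the thief's own queue in one locked pass.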

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain the rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented on operations that advance the index (PushFront for
  // front_, PopBack for back_). This allows us to (1) distinguish between
  // empty and full conditions (if we used only log(kSize) bits for position,
  // these conditions would be indistinguishable); (2) obtain a consistent
  // snapshot of front_/back_ for the Size operation using the modification
  // counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is
  // false, only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great lengths not to produce false positives (claim a non-empty queue
    // as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // The queue size estimate must agree with the maybe-zero check on the
        // queue's empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE
  unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
    // The order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. a push can
    // increment the size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1; fix it.
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
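
// A worked example (illustrative, assuming kSize == 4, so kMask2 == 7) of why
// the extra position bit lets CalculateSize distinguish an empty queue from a
// full one:
//
//   empty:  front and back positions coincide, so (front & 7) - (back & 7) == 0
//   full:   starting from empty, four PushFront calls advance the front
//           position by 4 while back stays put, so the difference is 4 == kSize
//
// With only log2(kSize) == 2 position bits, both cases would yield 0. The
// negative-size branch in CalculateSize handles wraparound of the 3-bit rolling
// positions, and the final clamp to kSize covers the transient over-count that
// concurrent push/pop can produce.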