// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

#include <atomic>
#include <cstdint>
#include <mutex>
#include <vector>

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner); operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are
// no stray occupied elements). The owner operates on one end of this region,
// the remote thread on the other end. Synchronization between these threads
// (potential consumption of the last element and claiming of the last empty
// element) happens by means of a state variable in each element. The states
// are: empty, busy (in the process of insertion or removal) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means
// of a CAS operation. The finishing transitions (busy->empty and busy->ready)
// are done with a plain store, as the element is exclusively owned by the
// current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as
// the state. But that would require a malloc/free per operation for large,
// complex values (and this queue is designed to store std::function<void()>).
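//
// Usage sketch (illustrative only; Work = std::function<void()> and the
// queue size 1024 are assumptions for this example, not requirements of
// this header):
//
//   RunQueue<std::function<void()>, 1024> q;
//   // Owner thread: operate on the front.
//   std::function<void()> overflow = q.PushFront([] { /* task */ });
//   if (overflow) { /* queue was full; run or requeue the task */ }
//   if (std::function<void()> w = q.PopFront()) w();
//   // Any other thread: steal from the back.
//   if (std::function<void()> stolen = q.PopBack()) stolen();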
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
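    // (Positions occupy the low log2(kSize) + 1 bits of front_/back_, so with
    // a 32-bit unsigned the 2^16 cap leaves at least 15 bits for the
    // modification counters; see the field comments below.)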
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
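    // Advance the front position by one and bump the modification counter
    // kept in the high bits ((kSize << 1) is one counter unit).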
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
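    // Step the front position back within the low kMask2 bits; the
    // modification counter in the high bits is left untouched.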
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
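    // Step the back position within the low kMask2 bits; the modification
    // counter in the high bits is left untouched.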
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns the last half of the elements in the
  // queue. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
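    // Pick mid so that the loop below transfers the elements at positions
    // back..mid, i.e. up to (size + 1) / 2 of them (fewer if the first CAS
    // loses a race with the owner thread).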
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
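    // The signed cast makes the termination test wraparound-safe for the
    // unsigned rolling indices.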
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(
                               s, kBusy, std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store a temporary kBusy; we exclusively own these
        // elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
  // front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used only
  // log(kSize) bits for position, these conditions would be
  // indistinguishable); (2) obtain a consistent snapshot of front_/back_ for
  // Size operation using the modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];
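
  // Worked example of the bit layout (illustrative, for kSize = 8 and 32-bit
  // unsigned):
  //   kMask  = 0b0111 extracts the array index;
  //   kMask2 = 0b1111 extracts the rolling position in [0, 2*kSize).
  //   An empty queue has front == back in the low 4 bits; a full queue has
  //   them differing by kSize there, which is why the extra position bit is
  //   needed. The remaining 28 bits form the modification counter, advanced
  //   one unit at a time by adding (kSize << 1).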

  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
  // only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template<bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claim a non-empty queue
    // as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
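        // front and back agree in the low kMask2 bits (position plus the
        // extra wrap bit) exactly when the queue is empty; the counter bits
        // are masked off.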
        // Queue size estimate must agree with maybe zero check on the queue
        // empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE
  unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_