// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
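
// A minimal sketch of a concrete allocator built on this interface
// (illustrative only; `MyArena` is a hypothetical arena type, not part of
// Eigen):
//
//   class ArenaAllocator : public Allocator {
//    public:
//     explicit ArenaAllocator(MyArena* arena) : arena_(arena) {}
//     void* allocate(size_t num_bytes) const override { return arena_->Alloc(num_bytes); }
//     void deallocate(void* buffer) const override { arena_->Free(buffer); }
//    private:
//     MyArena* arena_;
//   };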

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
        : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template<typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // TODO(rmlarsen): Align blocks on cache lines.
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Launch the first block on the main thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
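
  // Example (illustrative sketch; in this usage the caller owns and deletes
  // the returned Notification):
  //
  //   Notification* done = device.enqueue([] { /* do some work */ });
  //   wait_until_ready(done);  // blocks until the task has called Notify()
  //   delete done;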

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
                                                 Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                  Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
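
  // Example (illustrative sketch; the pool size and cost estimate are
  // assumptions, not taken from the original source):
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, 4);
  //   device.parallelFor(
  //       n,
  //       Eigen::TensorOpCost(/*bytes_loaded=*/8, /*bytes_stored=*/8,
  //                           /*compute_cycles=*/1),
  //       [&](Eigen::Index first, Eigen::Index last) {
  //         for (Eigen::Index i = first; i < last; ++i) out[i] = 2 * in[i];
  //       });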

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block is finished, it will
  // call the 'done' callback. F accepts a half-open interval [first, last).
  // Block size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete async context if it was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
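
  // Example (illustrative sketch; the lambda bodies and cost are placeholders):
  //
  //   device.parallelForAsync(
  //       n, Eigen::TensorOpCost(8, 8, 1),
  //       [&](Eigen::Index first, Eigen::Index last) { /* process [first, last) */ },
  //       [&]() { /* all blocks done; safe to release captured resources */ });
  //   // Returns immediately; completion is signalled via the 'done' callback.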

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep the passed-in closures on the heap, and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks to be not too small, to mitigate
  // parallelization overheads; not too large, to mitigate the tail effect and
  // potential load imbalance; and we also want the number of blocks to divide
  // evenly across threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
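    // For example (illustrative numbers, not from the original source): with
    // numThreads() == 4, a block_count of 6 gives an efficiency of
    // 6 / (2 * 4) = 0.75, whereas a coarser choice of 4 blocks gives
    // 4 / (1 * 4) = 1.0, so the coarser block size is preferred as long as it
    // does not exceed max_block_size.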
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};


}  // end namespace Eigen

#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H