// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
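
// Sketch only (not part of Eigen): a hypothetical Allocator implementation
// that forwards to Eigen's aligned heap routines. The name
// "AlignedHeapAllocator" is illustrative; any allocator handed to
// ThreadPoolDevice must outlive the device and return suitably aligned memory.
//
//   class AlignedHeapAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       return internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override {
//       internal::aligned_free(buffer);
//     }
//   };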

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template<typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // TODO(rmlarsen): Align blocks on cache lines.
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
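      // Illustrative arithmetic (assuming the cost model picks 4 threads):
      // for n = 1,000,000 bytes, blocksize = (1,000,000 + 3) / 4 = 250,000;
      // blocks 1..3 are copied on worker threads below, block 0 on the
      // calling thread, and barrier.Wait() returns once the worker copies
      // have all signalled the Barrier.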
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Launch the first block on the main thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
                                                 Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }
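
  // Sketch of typical enqueue usage (illustrative only; variable names and
  // the pool size are assumptions, and Eigen::ThreadPool is the pool type
  // from unsupported/Eigen/CXX11/ThreadPool). The caller owns the returned
  // Notification and must delete it after waiting on it.
  //
  //   Eigen::ThreadPool pool(4);
  //   Eigen::ThreadPoolDevice device(&pool, 4);
  //   Eigen::Notification* done = device.enqueue([] { /* work */ });
  //   done->Wait();   // or wait_until_ready(done)
  //   delete done;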

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    // Compute small problems directly in the caller thread.
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                  Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
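
  // Illustrative sketch only (the buffer name and cost figures are
  // assumptions, not part of Eigen): filling a float buffer in parallel,
  // where TensorOpCost(bytes_loaded, bytes_stored, compute_cycles) describes
  // the per-element cost used by the cost model to pick the block size.
  //
  //   float* data = ...;  // hypothetical output buffer of size n
  //   device.parallelFor(
  //       n, Eigen::TensorOpCost(0, sizeof(float), 1),
  //       [data](Eigen::Index first, Eigen::Index last) {
  //         for (Eigen::Index i = first; i < last; ++i) data[i] = 0.5f * i;
  //       });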

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block is finished, it calls
  // the 'done' callback. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
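
  // Sketch only (names are illustrative; a Notification is just one way to
  // observe completion): the same fill as the synchronous example above, but
  // returning immediately and signalling completion via the 'done' callback.
  //
  //   Eigen::Notification all_done;
  //   device.parallelForAsync(
  //       n, Eigen::TensorOpCost(0, sizeof(float), 1),
  //       [data](Eigen::Index first, Eigen::Index last) {
  //         for (Eigen::Index i = first; i < last; ++i) data[i] = 0.5f * i;
  //       },
  //       [&all_done]() { all_done.Notify(); });
  //   // ... other work on the calling thread ...
  //   all_done.Wait();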

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep the passed-in closures on the heap and
  // delete them only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates the block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks to be not too small, to mitigate
  // parallelization overheads, and not too large, to mitigate the tail effect
  // and potential load imbalance. We also want the number of blocks to divide
  // evenly across threads.
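  //
  // Worked example with assumed numbers (illustrative only): with n = 1000,
  // numThreads() = 4, and an initial block_size of 100 from the cost model,
  // block_count = 10 and the efficiency is 10 / (ceil(10/4) * 4) = 10/12
  // ~= 0.83. Coarsening to 9 blocks (block size divup(1000, 9) = 112) gives
  // 9/12 = 0.75, which is more than 0.01 worse and is rejected; coarsening to
  // 8 blocks (block size 125) gives 8/8 = 1.0 and is accepted, so larger
  // blocks are taken only when they keep the threads evenly loaded.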
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};


}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H