/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
#define TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/threadpool_interface.h"
#include "tensorflow/core/platform/types.h"

namespace Eigen {
class Allocator;
class ThreadPoolInterface;
struct ThreadPoolDevice;

template <typename Environment>
class ThreadPoolTempl;
}  // namespace Eigen

namespace tensorflow {
namespace thread {

struct EigenEnvironment;

class ThreadPool {
 public:
  // Scheduling strategies for ParallelFor. The strategy governs how the given
  // units of work are distributed among the available threads in the
  // threadpool.
  enum class SchedulingStrategy {
    // The Adaptive scheduling strategy adaptively chooses the shard sizes
    // based on the cost of each unit of work, and the cost model of the
    // underlying threadpool device.
    //
    // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
    // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
    // creates too many shards and CPU time will be dominated by per-shard
    // overhead, such as Context creation. Underestimating may not fully make
    // use of the specified parallelism, and may also cause inefficiencies due
    // to load balancing issues and stragglers.
    kAdaptive,
    // The Fixed Block Size scheduling strategy shards the given units of work
    // into shards of fixed size. If the total number of units is not evenly
    // divisible by 'block_size', at most one of the shards may be of smaller
    // size. The exact number of shards may be found by a call to
    // NumShardsUsedByFixedBlockSizeScheduling.
    //
    // Each shard may be executed on a different thread in parallel, depending
    // on the number of threads available in the pool. Note that when there
    // aren't enough threads in the pool to achieve full parallelism, function
    // calls will be automatically queued.
    kFixedBlockSize
  };
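  // Illustrative sketch of how each strategy is selected via the
  // SchedulingParams class declared below. The numeric values are made-up
  // placeholders, not recommendations:
  //
  //   // Adaptive: supply cost_per_unit, leave block_size unset.
  //   ThreadPool::SchedulingParams adaptive_params(
  //       ThreadPool::SchedulingStrategy::kAdaptive,
  //       /*cost_per_unit=*/10000, /*block_size=*/absl::nullopt);
  //
  //   // Fixed Block Size: supply block_size, leave cost_per_unit unset.
  //   ThreadPool::SchedulingParams fixed_params(
  //       ThreadPool::SchedulingStrategy::kFixedBlockSize,
  //       /*cost_per_unit=*/absl::nullopt, /*block_size=*/1024);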
  // Contains additional parameters for either the Adaptive or the Fixed Block
  // Size scheduling strategy.
  class SchedulingParams {
   public:
    explicit SchedulingParams(SchedulingStrategy strategy,
                              absl::optional<int64_t> cost_per_unit,
                              absl::optional<int64_t> block_size)
        : strategy_(strategy),
          cost_per_unit_(cost_per_unit),
          block_size_(block_size) {}

    SchedulingStrategy strategy() const { return strategy_; }
    absl::optional<int64_t> cost_per_unit() const { return cost_per_unit_; }
    absl::optional<int64_t> block_size() const { return block_size_; }

   private:
    // The underlying Scheduling Strategy for which this instance contains
    // additional parameters.
    SchedulingStrategy strategy_;

    // The estimated cost per unit of work in number of CPU cycles (or
    // nanoseconds if not CPU-bound). Only applicable for the Adaptive
    // scheduling strategy.
    absl::optional<int64_t> cost_per_unit_;

    // The block size of each shard. Only applicable for the Fixed Block Size
    // scheduling strategy.
    absl::optional<int64_t> block_size_;
  };

  // Constructs a pool that contains "num_threads" threads with specified
  // "name". env->StartThread() is used to create individual threads with the
  // given ThreadOptions. If "low_latency_hint" is true the thread pool
  // implementation may use it as a hint that lower latency is preferred at the
  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
  // wait. Conversely, if the threadpool is used to schedule high-latency
  // operations like I/O the hint should be set to false.
  //
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads, bool low_latency_hint,
             Eigen::Allocator* allocator = nullptr);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const std::string& name, int num_threads);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads with the given ThreadOptions.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads);

  // Constructs a pool that wraps around the thread::ThreadPoolInterface
  // instance provided by the caller. The caller retains ownership of
  // `user_threadpool` and must ensure its lifetime is longer than the
  // ThreadPool instance.
  explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Schedules fn() for execution in the pool of threads.
  void Schedule(std::function<void()> fn);

  // Sets the work-stealing partitions consulted by ScheduleWithHint.
  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions);

  // Schedules fn() for execution in the pool, hinting that threads in the
  // half-open range [start, limit) are preferred.
  void ScheduleWithHint(std::function<void()> fn, int start, int limit);
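  // A minimal usage sketch (illustrative only; the pool name and thread count
  // are arbitrary placeholders):
  //
  //   thread::ThreadPool pool(Env::Default(), "example_pool",
  //                           /*num_threads=*/4);
  //   pool.Schedule([]() {
  //     // Work to run on some thread in the pool.
  //   });
  //   // ~ThreadPool blocks until all scheduled work has finished.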
  // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
  // with these parameters.
  int NumShardsUsedByFixedBlockSizeScheduling(const int64_t total,
                                              const int64_t block_size);

  // Returns the number of shards used by TransformRangeConcurrently with
  // these parameters.
  // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
  int NumShardsUsedByTransformRangeConcurrently(const int64_t block_size,
                                                const int64_t total);

  // ParallelFor shards the "total" units of work, assuming each unit of work
  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is indexed
  // 0, 1, ..., total - 1. Each shard contains 1 or more units of work, and the
  // total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism, and may also cause inefficiencies due to load balancing
  // issues and stragglers.
  void ParallelFor(int64_t total, int64_t cost_per_unit,
                   const std::function<void(int64_t, int64_t)>& fn);

  // Similar to ParallelFor above, but takes the specified scheduling strategy
  // into account.
  void ParallelFor(int64_t total, const SchedulingParams& scheduling_params,
                   const std::function<void(int64_t, int64_t)>& fn);

  // Same as ParallelFor with the Fixed Block Size scheduling strategy.
  // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
  void TransformRangeConcurrently(
      const int64_t block_size, const int64_t total,
      const std::function<void(int64_t, int64_t)>& fn);

  // Shards the "total" units of work. For more details, see "ParallelFor".
  //
  // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
  // This is because some work can happen on the caller thread while the
  // threads in the pool are also being used.
  //
  // The caller can allocate NumThreads() + 1 separate buffers for each thread.
  // Each thread can safely write to the buffer given by its id without
  // synchronization. However, the worker fn may be called multiple times
  // sequentially with the same id.
  //
  // At most NumThreads() unique ids will actually be used, and only a few may
  // be used for small workloads. If each buffer is expensive, the buffers
  // should be stored in an array initially filled with null, and a buffer
  // should be allocated by fn the first time that the id is used.
  void ParallelForWithWorkerId(
      int64_t total, int64_t cost_per_unit,
      const std::function<void(int64_t, int64_t, int)>& fn);

  // Similar to ParallelForWithWorkerId above, but takes the specified
  // scheduling strategy into account.
  void ParallelForWithWorkerId(
      int64_t total, const SchedulingParams& scheduling_params,
      const std::function<void(int64_t, int64_t, int)>& fn);

  // Returns the number of threads in the pool.
  int NumThreads() const;

  // Returns the current thread id between 0 and NumThreads() - 1, if called
  // from a thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;
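  // A ParallelFor sketch (illustrative only; `data` and `Process` are
  // hypothetical, and the cost estimate is a made-up placeholder):
  //
  //   std::vector<float> data = ...;
  //   pool.ParallelFor(data.size(), /*cost_per_unit=*/100,
  //                    [&data](int64_t start, int64_t end) {
  //                      // Each shard receives a disjoint range [start, end).
  //                      for (int64_t i = start; i < end; ++i) {
  //                        data[i] = Process(data[i]);
  //                      }
  //                    });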
  // If the ThreadPool implementation is compatible with
  // Eigen::ThreadPoolInterface, returns a non-null pointer. The caller does
  // not own the object the returned pointer points to, and should not attempt
  // to delete it.
  Eigen::ThreadPoolInterface* AsEigenThreadPool() const;

 private:
  // Divides the work represented by the range [0, total) into k shards.
  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
  // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
  // For example, total = 10 and block_size = 4 yield the three shards
  // [0, 4), [4, 8), and [8, 10).
  // Each shard may be executed on a different thread in parallel, depending on
  // the number of threads available in the pool.
  // Requires 0 < block_size <= total.
  void ParallelForFixedBlockSizeScheduling(
      const int64_t total, const int64_t block_size,
      const std::function<void(int64_t, int64_t)>& fn);

  // underlying_threadpool_ is user_threadpool if that was provided in the
  // constructor. Otherwise it is eigen_threadpool_.
  Eigen::ThreadPoolInterface* underlying_threadpool_;
  // eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
  // user_threadpool is not provided in the constructor.
  std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
  std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
  TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

}  // namespace thread
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_