1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_POST_JOB_H_ 6 #define BASE_TASK_POST_JOB_H_ 7 8 #include <limits> 9 10 #include "base/base_export.h" 11 #include "base/dcheck_is_on.h" 12 #include "base/functional/callback.h" 13 #include "base/location.h" 14 #include "base/memory/stack_allocated.h" 15 16 namespace base { 17 namespace internal { 18 class JobTaskSource; 19 class PooledTaskRunnerDelegate; 20 } 21 22 class TaskTraits; 23 enum class TaskPriority : uint8_t; 24 25 // Delegate that's passed to Job's worker task, providing an entry point to 26 // communicate with the scheduler. To prevent deadlocks, JobDelegate methods 27 // should never be called while holding a user lock. 28 class BASE_EXPORT JobDelegate { 29 STACK_ALLOCATED(); 30 31 public: 32 // A JobDelegate is instantiated for each worker task that is run. 33 // |task_source| is the task source whose worker task is running with this 34 // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to 35 // check whether the pool wants this worker task to yield (null if this worker 36 // should never yield -- e.g. when the main thread is a worker). 37 JobDelegate(internal::JobTaskSource* task_source, 38 internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate); 39 40 JobDelegate(const JobDelegate&) = delete; 41 JobDelegate& operator=(const JobDelegate&) = delete; 42 43 ~JobDelegate(); 44 45 // Returns true if this thread *must* return from the worker task on the 46 // current thread ASAP. Workers should periodically invoke ShouldYield (or 47 // YieldIfNeeded()) as often as is reasonable. 48 bool ShouldYield(); 49 50 // If ShouldYield(), this will pause the current thread (allowing it to be 51 // replaced in the pool); no-ops otherwise. If it pauses, it will resume and 52 // return from this call whenever higher priority work completes. 53 // Prefer ShouldYield() over this (only use YieldIfNeeded() when unwinding 54 // the stack is not possible). 55 void YieldIfNeeded(); 56 57 // Notifies the scheduler that max concurrency was increased, and the number 58 // of worker should be adjusted accordingly. See PostJob() for more details. 59 void NotifyConcurrencyIncrease(); 60 61 // Returns a task_id unique among threads currently running this job, such 62 // that GetTaskId() < worker count. To achieve this, the same task_id may be 63 // reused by a different thread after a worker_task returns. 64 uint8_t GetTaskId(); 65 66 // Returns true if the current task is called from the thread currently 67 // running JobHandle::Join(). IsJoiningThread()68 bool IsJoiningThread() const { 69 return pooled_task_runner_delegate_ == nullptr; 70 } 71 72 private: 73 static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max(); 74 75 internal::JobTaskSource* task_source_ = nullptr; 76 internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate_ = nullptr; 77 uint8_t task_id_ = kInvalidTaskId; 78 79 #if DCHECK_IS_ON() 80 // Value returned by the last call to ShouldYield(). 81 bool last_should_yield_ = false; 82 #endif 83 }; 84 85 // Handle returned when posting a Job. Provides methods to control execution of 86 // the posted Job. To prevent deadlocks, JobHandle methods should never be 87 // called while holding a user lock. 88 class BASE_EXPORT JobHandle { 89 public: 90 JobHandle(); 91 92 JobHandle(const JobHandle&) = delete; 93 JobHandle& operator=(const JobHandle&) = delete; 94 95 // A job must either be joined, canceled or detached before the JobHandle is 96 // destroyed. 97 ~JobHandle(); 98 99 JobHandle(JobHandle&&); 100 JobHandle& operator=(JobHandle&&); 101 102 // Returns true if associated with a Job. 103 explicit operator bool() const { return task_source_ != nullptr; } 104 105 // Returns true if there's any work pending or any worker running. 106 bool IsActive() const; 107 108 // Update this Job's priority. 109 void UpdatePriority(TaskPriority new_priority); 110 111 // Notifies the scheduler that max concurrency was increased, and the number 112 // of workers should be adjusted accordingly. See PostJob() for more details. 113 void NotifyConcurrencyIncrease(); 114 115 // Contributes to the job on this thread. Doesn't return until all tasks have 116 // completed and max concurrency becomes 0. This also promotes this Job's 117 // priority to be at least as high as the calling thread's priority. When 118 // called immediately, prefer CreateJob(...).Join() over PostJob(...).Join() 119 // to avoid having too many workers scheduled for executing the workload. 120 void Join(); 121 122 // Forces all existing workers to yield ASAP. Waits until they have all 123 // returned from the Job's callback before returning. 124 void Cancel(); 125 126 // Forces all existing workers to yield ASAP but doesn’t wait for them. 127 // Warning, this is dangerous if the Job's callback is bound to or has access 128 // to state which may be deleted after this call. 129 void CancelAndDetach(); 130 131 // Can be invoked before ~JobHandle() to avoid waiting on the job completing. 132 void Detach(); 133 134 private: 135 friend class internal::JobTaskSource; 136 137 explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source); 138 139 scoped_refptr<internal::JobTaskSource> task_source_; 140 }; 141 142 // Callback used in PostJob() to control the maximum number of threads calling 143 // the worker task concurrently. 144 145 // Returns the maximum number of threads which may call a job's worker task 146 // concurrently. |worker_count| is the number of threads currently assigned to 147 // this job which some callers may need to determine their return value. 148 using MaxConcurrencyCallback = 149 RepeatingCallback<size_t(size_t /*worker_count*/)>; 150 151 // Posts a repeating |worker_task| with specific |traits| to run in parallel on 152 // base::ThreadPool. 153 // Returns a JobHandle associated with the Job, which can be joined, canceled or 154 // detached. 155 // ThreadPool APIs, including PostJob() and methods of the returned JobHandle, 156 // must never be called while holding a lock that could be acquired by 157 // |worker_task| or |max_concurrency_callback| -- that could result in a 158 // deadlock. This is because [1] |max_concurrency_callback| may be invoked while 159 // holding internal ThreadPool lock (A), hence |max_concurrency_callback| can 160 // only use a lock (B) if that lock is *never* held while calling back into a 161 // ThreadPool entry point from any thread (A=>B/B=>A deadlock) and [2] 162 // |worker_task| or |max_concurrency_callback| is invoked synchronously from 163 // JobHandle::Join() (A=>JobHandle::Join()=>A deadlock). 164 // To avoid scheduling overhead, |worker_task| should do as much work as 165 // possible in a loop when invoked, and JobDelegate::ShouldYield() should be 166 // periodically invoked to conditionally exit and let the scheduler prioritize 167 // work. 168 // 169 // A canonical implementation of |worker_task| looks like: 170 // void WorkerTask(JobDelegate* job_delegate) { 171 // while (!job_delegate->ShouldYield()) { 172 // auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work. 173 // if (!work_item) 174 // return: 175 // ProcessWork(work_item); 176 // } 177 // } 178 // 179 // |max_concurrency_callback| controls the maximum number of threads calling 180 // |worker_task| concurrently. |worker_task| is only invoked if the number of 181 // threads previously running |worker_task| was less than the value returned by 182 // |max_concurrency_callback|. In general, |max_concurrency_callback| should 183 // return the latest number of incomplete work items (smallest unit of work) 184 // left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must* 185 // be invoked shortly after |max_concurrency_callback| starts returning a value 186 // larger than previously returned values. This usually happens when new work 187 // items are added and the API user wants additional threads to invoke 188 // |worker_task| concurrently. The callbacks may be called concurrently on any 189 // thread until the job is complete. If the job handle is detached, the 190 // callbacks may still be called, so they must not access global state that 191 // could be destroyed. 192 // 193 // |traits| requirements: 194 // - base::ThreadPolicy must be specified if the priority of the task runner 195 // will ever be increased from BEST_EFFORT. 196 JobHandle BASE_EXPORT PostJob(const Location& from_here, 197 const TaskTraits& traits, 198 RepeatingCallback<void(JobDelegate*)> worker_task, 199 MaxConcurrencyCallback max_concurrency_callback); 200 201 // Creates and returns a JobHandle associated with a Job. Unlike PostJob(), this 202 // doesn't immediately schedules |worker_task| to run on base::ThreadPool 203 // workers; the Job is then scheduled by calling either 204 // NotifyConcurrencyIncrease() or Join(). 205 JobHandle BASE_EXPORT 206 CreateJob(const Location& from_here, 207 const TaskTraits& traits, 208 RepeatingCallback<void(JobDelegate*)> worker_task, 209 MaxConcurrencyCallback max_concurrency_callback); 210 211 } // namespace base 212 213 #endif // BASE_TASK_POST_JOB_H_ 214