1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 6 #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 7 8 #include <stddef.h> 9 10 #include <atomic> 11 #include <limits> 12 #include <optional> 13 #include <utility> 14 15 #include "base/base_export.h" 16 #include "base/functional/callback.h" 17 #include "base/memory/raw_ptr.h" 18 #include "base/synchronization/condition_variable.h" 19 #include "base/task/common/checked_lock.h" 20 #include "base/task/common/task_annotator.h" 21 #include "base/task/post_job.h" 22 #include "base/task/task_traits.h" 23 #include "base/task/thread_pool/task.h" 24 #include "base/task/thread_pool/task_source.h" 25 #include "base/task/thread_pool/task_source_sort_key.h" 26 27 namespace base { 28 namespace internal { 29 30 class PooledTaskRunnerDelegate; 31 32 // A JobTaskSource generates many Tasks from a single RepeatingClosure. 33 // 34 // Derived classes control the intended concurrency with GetMaxConcurrency(). 35 class BASE_EXPORT JobTaskSource : public TaskSource { 36 public: 37 JobTaskSource(const Location& from_here, 38 const TaskTraits& traits, 39 RepeatingCallback<void(JobDelegate*)> worker_task, 40 MaxConcurrencyCallback max_concurrency_callback, 41 PooledTaskRunnerDelegate* delegate); 42 JobTaskSource(const JobTaskSource&) = delete; 43 JobTaskSource& operator=(const JobTaskSource&) = delete; 44 CreateJobHandle(scoped_refptr<internal::JobTaskSource> task_source)45 static JobHandle CreateJobHandle( 46 scoped_refptr<internal::JobTaskSource> task_source) { 47 return JobHandle(std::move(task_source)); 48 } 49 50 // Called before the task source is enqueued to initialize task metadata. 51 void WillEnqueue(int sequence_num, TaskAnnotator& annotator); 52 53 // Notifies this task source that max concurrency was increased, and the 54 // number of worker should be adjusted. 55 void NotifyConcurrencyIncrease(); 56 57 // Informs this JobTaskSource that the current thread would like to join and 58 // contribute to running |worker_task|. Returns true if the joining thread can 59 // contribute (RunJoinTask() can be called), or false if joining was completed 60 // and all other workers returned because either there's no work remaining or 61 // Job was cancelled. 62 bool WillJoin(); 63 64 // Contributes to running |worker_task| and returns true if the joining thread 65 // can contribute again (RunJoinTask() can be called again), or false if 66 // joining was completed and all other workers returned because either there's 67 // no work remaining or Job was cancelled. This should be called only after 68 // WillJoin() or RunJoinTask() previously returned true. 69 bool RunJoinTask(); 70 71 // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() 72 // to return RunStatus::kDisallowed. 73 void Cancel(TaskSource::Transaction* transaction = nullptr); 74 75 // TaskSource: 76 ExecutionEnvironment GetExecutionEnvironment() override; 77 size_t GetRemainingConcurrency() const override; 78 TaskSourceSortKey GetSortKey() const override; 79 TimeTicks GetDelayedSortKey() const override; 80 bool HasReadyTasks(TimeTicks now) const override; 81 82 bool IsActive() const; 83 size_t GetWorkerCount() const; 84 85 // Returns the maximum number of tasks from this TaskSource that can run 86 // concurrently. 87 size_t GetMaxConcurrency() const; 88 89 uint8_t AcquireTaskId(); 90 void ReleaseTaskId(uint8_t task_id); 91 92 // Returns true if a worker should return from the worker task on the current 93 // thread ASAP. 94 bool ShouldYield(); 95 delegate()96 PooledTaskRunnerDelegate* delegate() const { return delegate_; } 97 98 private: 99 // Atomic internal state to track the number of workers running a task from 100 // this JobTaskSource and whether this JobTaskSource is canceled. All 101 // operations are performed with std::memory_order_relaxed as State is only 102 // ever modified under a lock or read atomically (optimistic read). 103 class State { 104 public: 105 static constexpr uint32_t kCanceledMask = 1; 106 static constexpr int kWorkerCountBitOffset = 1; 107 static constexpr uint32_t kWorkerCountIncrement = 1 108 << kWorkerCountBitOffset; 109 110 struct Value { worker_countValue111 uint8_t worker_count() const { 112 return static_cast<uint8_t>(value >> kWorkerCountBitOffset); 113 } 114 // Returns true if canceled. is_canceledValue115 bool is_canceled() const { return value & kCanceledMask; } 116 117 uint32_t value; 118 }; 119 120 State(); 121 ~State(); 122 123 // Sets as canceled. Returns the state 124 // before the operation. 125 Value Cancel(); 126 127 // Increments the worker count by 1. Returns the state before the operation. 128 Value IncrementWorkerCount(); 129 130 // Decrements the worker count by 1. Returns the state before the operation. 131 Value DecrementWorkerCount(); 132 133 // Loads and returns the state. 134 Value Load() const; 135 136 private: 137 std::atomic<uint32_t> value_{0}; 138 }; 139 140 // Atomic flag that indicates if the joining thread is currently waiting on 141 // another worker to yield or to signal. 142 class JoinFlag { 143 public: 144 static constexpr uint32_t kNotWaiting = 0; 145 static constexpr uint32_t kWaitingForWorkerToSignal = 1; 146 static constexpr uint32_t kWaitingForWorkerToYield = 3; 147 // kWaitingForWorkerToYield is 3 because the impl relies on the following 148 // property. 149 static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == 150 kWaitingForWorkerToSignal, 151 ""); 152 153 JoinFlag(); 154 ~JoinFlag(); 155 156 // Returns true if the status is not kNotWaiting, using 157 // std::memory_order_relaxed. IsWaiting()158 bool IsWaiting() { 159 return value_.load(std::memory_order_relaxed) != kNotWaiting; 160 } 161 162 // Resets the status as kNotWaiting using std::memory_order_relaxed. 163 void Reset(); 164 165 // Sets the status as kWaitingForWorkerToYield using 166 // std::memory_order_relaxed. 167 void SetWaiting(); 168 169 // If the flag is kWaitingForWorkerToYield, returns true indicating that the 170 // worker should yield, and atomically updates to kWaitingForWorkerToSignal 171 // (using std::memory_order_relaxed) to ensure that a single worker yields 172 // in response to SetWaiting(). 173 bool ShouldWorkerYield(); 174 175 // If the flag is kWaiting*, returns true indicating that the worker should 176 // signal, and atomically updates to kNotWaiting (using 177 // std::memory_order_relaxed) to ensure that a single worker signals in 178 // response to SetWaiting(). 179 bool ShouldWorkerSignal(); 180 181 private: 182 std::atomic<uint32_t> value_{kNotWaiting}; 183 }; 184 185 ~JobTaskSource() override; 186 187 // Called from the joining thread. Waits for the worker count to be below or 188 // equal to max concurrency (will happen when a worker calls 189 // DidProcessTask()). Returns true if the joining thread should run a task, or 190 // false if joining was completed and all other workers returned because 191 // either there's no work remaining or Job was cancelled. 192 bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_); 193 194 size_t GetMaxConcurrency(size_t worker_count) const; 195 196 // TaskSource: 197 RunStatus WillRunTask() override; 198 Task TakeTask(TaskSource::Transaction* transaction) override; 199 std::optional<Task> Clear(TaskSource::Transaction* transaction) override; 200 bool DidProcessTask(TaskSource::Transaction* transaction) override; 201 bool WillReEnqueue(TimeTicks now, 202 TaskSource::Transaction* transaction) override; 203 bool OnBecomeReady() override; 204 205 // Synchronizes access to workers state. 206 mutable CheckedLock worker_lock_{UniversalSuccessor()}; 207 208 // Current atomic state (atomic despite the lock to allow optimistic reads 209 // and cancellation without the lock). 210 State state_ GUARDED_BY(worker_lock_); 211 // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() 212 // hence the use of atomics. 213 JoinFlag join_flag_ GUARDED_BY(worker_lock_); 214 // Signaled when |join_flag_| is kWaiting* and a worker returns. 215 std::optional<ConditionVariable> worker_released_condition_ 216 GUARDED_BY(worker_lock_); 217 218 std::atomic<uint32_t> assigned_task_ids_{0}; 219 220 RepeatingCallback<size_t(size_t)> max_concurrency_callback_; 221 222 // Worker task set by the job owner. 223 RepeatingCallback<void(JobDelegate*)> worker_task_; 224 // Task returned from TakeTask(), that calls |worker_task_| internally. 225 RepeatingClosure primary_task_; 226 227 TaskMetadata task_metadata_; 228 229 const TimeTicks ready_time_; 230 raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_; 231 }; 232 233 } // namespace internal 234 } // namespace base 235 236 #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 237