1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/job_task_source.h"
6
7 #include <bit>
8 #include <type_traits>
9 #include <utility>
10
11 #include "base/check_op.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/notreached.h"
16 #include "base/task/common/checked_lock.h"
17 #include "base/task/task_features.h"
18 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
19 #include "base/template_util.h"
20 #include "base/threading/thread_restrictions.h"
21 #include "base/time/time.h"
22 #include "base/time/time_override.h"
23 #include "base/trace_event/base_tracing.h"
24
25 namespace base {
26 namespace internal {
27
28 namespace {
29
30 // Capped to allow assigning task_ids from a bitfield.
31 constexpr size_t kMaxWorkersPerJob = 32;
32 static_assert(
33 kMaxWorkersPerJob <=
34 std::numeric_limits<
35 std::invoke_result<decltype(&JobDelegate::GetTaskId),
36 JobDelegate>::type>::max(),
37 "AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");
38
39 } // namespace
40
41 JobTaskSource::State::State() = default;
42 JobTaskSource::State::~State() = default;
43
Cancel()44 JobTaskSource::State::Value JobTaskSource::State::Cancel() {
45 return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
46 }
47
DecrementWorkerCount()48 JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
49 const uint32_t value_before_sub =
50 value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
51 DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
52 return {value_before_sub};
53 }
54
IncrementWorkerCount()55 JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
56 uint32_t value_before_add =
57 value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
58 // The worker count must not overflow a uint8_t.
59 DCHECK((value_before_add >> kWorkerCountBitOffset) < ((1 << 8) - 1));
60 return {value_before_add};
61 }
62
Load() const63 JobTaskSource::State::Value JobTaskSource::State::Load() const {
64 return {value_.load(std::memory_order_relaxed)};
65 }
66
67 JobTaskSource::JoinFlag::JoinFlag() = default;
68 JobTaskSource::JoinFlag::~JoinFlag() = default;
69
Reset()70 void JobTaskSource::JoinFlag::Reset() {
71 value_.store(kNotWaiting, std::memory_order_relaxed);
72 }
73
SetWaiting()74 void JobTaskSource::JoinFlag::SetWaiting() {
75 value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed);
76 }
77
ShouldWorkerYield()78 bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
79 // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
80 // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
81 return value_.fetch_and(kWaitingForWorkerToSignal,
82 std::memory_order_relaxed) ==
83 kWaitingForWorkerToYield;
84 }
85
ShouldWorkerSignal()86 bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
87 return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
88 }
89
JobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback,PooledTaskRunnerDelegate * delegate)90 JobTaskSource::JobTaskSource(const Location& from_here,
91 const TaskTraits& traits,
92 RepeatingCallback<void(JobDelegate*)> worker_task,
93 MaxConcurrencyCallback max_concurrency_callback,
94 PooledTaskRunnerDelegate* delegate)
95 : TaskSource(traits, TaskSourceExecutionMode::kJob),
96 max_concurrency_callback_(std::move(max_concurrency_callback)),
97 worker_task_(std::move(worker_task)),
98 primary_task_(base::BindRepeating(
99 [](JobTaskSource* self) {
100 CheckedLock::AssertNoLockHeldOnCurrentThread();
101 // Each worker task has its own delegate with associated state.
102 JobDelegate job_delegate{self, self->delegate_};
103 self->worker_task_.Run(&job_delegate);
104 },
105 base::Unretained(this))),
106 task_metadata_(from_here),
107 ready_time_(TimeTicks::Now()),
108 delegate_(delegate) {
109 DCHECK(delegate_);
110 task_metadata_.sequence_num = -1;
111 }
112
~JobTaskSource()113 JobTaskSource::~JobTaskSource() {
114 // Make sure there's no outstanding active run operation left.
115 DCHECK_EQ(state_.Load().worker_count(), 0U);
116 }
117
GetExecutionEnvironment()118 ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
119 return {SequenceToken::Create()};
120 }
121
WillEnqueue(int sequence_num,TaskAnnotator & annotator)122 void JobTaskSource::WillEnqueue(int sequence_num, TaskAnnotator& annotator) {
123 if (task_metadata_.sequence_num != -1) {
124 // WillEnqueue() was already called.
125 return;
126 }
127 task_metadata_.sequence_num = sequence_num;
128 annotator.WillQueueTask("ThreadPool_PostJob", &task_metadata_);
129 }
130
WillJoin()131 bool JobTaskSource::WillJoin() {
132 TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
133 CheckedAutoLock auto_lock(worker_lock_);
134 DCHECK(!worker_released_condition_); // This may only be called once.
135 worker_lock_.CreateConditionVariableAndEmplace(worker_released_condition_);
136 // Prevent wait from triggering a ScopedBlockingCall as this would cause
137 // |ThreadGroup::lock_| to be acquired, causing lock inversion.
138 worker_released_condition_->declare_only_used_while_idle();
139 const auto state_before_add = state_.IncrementWorkerCount();
140
141 if (!state_before_add.is_canceled() &&
142 state_before_add.worker_count() <
143 GetMaxConcurrency(state_before_add.worker_count())) {
144 return true;
145 }
146 return WaitForParticipationOpportunity();
147 }
148
RunJoinTask()149 bool JobTaskSource::RunJoinTask() {
150 JobDelegate job_delegate{this, nullptr};
151 worker_task_.Run(&job_delegate);
152
153 // It is safe to read |state_| without a lock since this variable is atomic
154 // and the call to GetMaxConcurrency() is used for a best effort early exit.
155 // Stale values will only cause WaitForParticipationOpportunity() to be
156 // called.
157 const auto state = TS_UNCHECKED_READ(state_).Load();
158 // The condition is slightly different from the one in WillJoin() since we're
159 // using |state| that was already incremented to include the joining thread.
160 if (!state.is_canceled() &&
161 state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
162 return true;
163 }
164
165 TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
166 CheckedAutoLock auto_lock(worker_lock_);
167 return WaitForParticipationOpportunity();
168 }
169
Cancel(TaskSource::Transaction * transaction)170 void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
171 // Sets the kCanceledMask bit on |state_| so that further calls to
172 // WillRunTask() never succeed. std::memory_order_relaxed without a lock is
173 // safe because this task source never needs to be re-enqueued after Cancel().
174 TS_UNCHECKED_READ(state_).Cancel();
175 }
176
177 // EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
WaitForParticipationOpportunity()178 bool JobTaskSource::WaitForParticipationOpportunity() {
179 DCHECK(!join_flag_.IsWaiting());
180
181 // std::memory_order_relaxed is sufficient because no other state is
182 // synchronized with |state_| outside of |lock_|.
183 auto state = state_.Load();
184 // |worker_count - 1| to exclude the joining thread which is not active.
185 size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
186
187 // Wait until either:
188 // A) |worker_count| is below or equal to max concurrency and state is not
189 // canceled.
190 // B) All other workers returned and |worker_count| is 1.
191 while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
192 state.worker_count() == 1)) {
193 // std::memory_order_relaxed is sufficient because no other state is
194 // synchronized with |join_flag_| outside of |lock_|.
195 join_flag_.SetWaiting();
196
197 // To avoid unnecessarily waiting, if either condition A) or B) change
198 // |lock_| is taken and |worker_released_condition_| signaled if necessary:
199 // 1- In DidProcessTask(), after worker count is decremented.
200 // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
201 worker_released_condition_->Wait();
202 state = state_.Load();
203 // |worker_count - 1| to exclude the joining thread which is not active.
204 max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
205 }
206 // It's possible though unlikely that the joining thread got a participation
207 // opportunity without a worker signaling.
208 join_flag_.Reset();
209
210 // Case A:
211 if (state.worker_count() <= max_concurrency && !state.is_canceled()) {
212 return true;
213 }
214 // Case B:
215 // Only the joining thread remains.
216 DCHECK_EQ(state.worker_count(), 1U);
217 DCHECK(state.is_canceled() || max_concurrency == 0U);
218 state_.DecrementWorkerCount();
219 // Prevent subsequent accesses to user callbacks.
220 state_.Cancel();
221 return false;
222 }
223
WillRunTask()224 TaskSource::RunStatus JobTaskSource::WillRunTask() {
225 CheckedAutoLock auto_lock(worker_lock_);
226 auto state_before_add = state_.Load();
227
228 // Don't allow this worker to run the task if either:
229 // A) |state_| was canceled.
230 // B) |worker_count| is already at |max_concurrency|.
231 // C) |max_concurrency| was lowered below or to |worker_count|.
232 // Case A:
233 if (state_before_add.is_canceled()) {
234 return RunStatus::kDisallowed;
235 }
236
237 const size_t max_concurrency =
238 GetMaxConcurrency(state_before_add.worker_count());
239 if (state_before_add.worker_count() < max_concurrency) {
240 state_before_add = state_.IncrementWorkerCount();
241 }
242 const size_t worker_count_before_add = state_before_add.worker_count();
243 // Case B) or C):
244 if (worker_count_before_add >= max_concurrency) {
245 return RunStatus::kDisallowed;
246 }
247
248 DCHECK_LT(worker_count_before_add, max_concurrency);
249 return max_concurrency == worker_count_before_add + 1
250 ? RunStatus::kAllowedSaturated
251 : RunStatus::kAllowedNotSaturated;
252 }
253
GetRemainingConcurrency() const254 size_t JobTaskSource::GetRemainingConcurrency() const {
255 // It is safe to read |state_| without a lock since this variable is atomic,
256 // and no other state is synchronized with GetRemainingConcurrency().
257 const auto state = TS_UNCHECKED_READ(state_).Load();
258 if (state.is_canceled()) {
259 return 0;
260 }
261 const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
262 // Avoid underflows.
263 if (state.worker_count() > max_concurrency)
264 return 0;
265 return max_concurrency - state.worker_count();
266 }
267
IsActive() const268 bool JobTaskSource::IsActive() const {
269 CheckedAutoLock auto_lock(worker_lock_);
270 auto state = state_.Load();
271 return GetMaxConcurrency(state.worker_count()) != 0 ||
272 state.worker_count() != 0;
273 }
274
GetWorkerCount() const275 size_t JobTaskSource::GetWorkerCount() const {
276 return TS_UNCHECKED_READ(state_).Load().worker_count();
277 }
278
NotifyConcurrencyIncrease()279 void JobTaskSource::NotifyConcurrencyIncrease() {
280 // Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
281 // called.
282 if (GetRemainingConcurrency() == 0) {
283 return;
284 }
285
286 {
287 // Lock is taken to access |join_flag_| below and signal
288 // |worker_released_condition_|.
289 CheckedAutoLock auto_lock(worker_lock_);
290 if (join_flag_.ShouldWorkerSignal()) {
291 worker_released_condition_->Signal();
292 }
293 }
294
295 // Make sure the task source is in the queue if not already.
296 // Caveat: it's possible but unlikely that the task source has already reached
297 // its intended concurrency and doesn't need to be enqueued if there
298 // previously were too many worker. For simplicity, the task source is always
299 // enqueued and will get discarded if already saturated when it is popped from
300 // the priority queue.
301 delegate_->EnqueueJobTaskSource(this);
302 }
303
GetMaxConcurrency() const304 size_t JobTaskSource::GetMaxConcurrency() const {
305 return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
306 }
307
GetMaxConcurrency(size_t worker_count) const308 size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const {
309 return std::min(max_concurrency_callback_.Run(worker_count),
310 kMaxWorkersPerJob);
311 }
312
AcquireTaskId()313 uint8_t JobTaskSource::AcquireTaskId() {
314 static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
315 "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
316 uint32_t assigned_task_ids =
317 assigned_task_ids_.load(std::memory_order_relaxed);
318 uint32_t new_assigned_task_ids = 0;
319 int task_id = 0;
320 // memory_order_acquire on success, matched with memory_order_release in
321 // ReleaseTaskId() so that operations done by previous threads that had
322 // the same task_id become visible to the current thread.
323 do {
324 // Count trailing one bits. This is the id of the right-most 0-bit in
325 // |assigned_task_ids|.
326 task_id = std::countr_one(assigned_task_ids);
327 new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
328 } while (!assigned_task_ids_.compare_exchange_weak(
329 assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
330 std::memory_order_relaxed));
331 return static_cast<uint8_t>(task_id);
332 }
333
ReleaseTaskId(uint8_t task_id)334 void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
335 // memory_order_release to match AcquireTaskId().
336 uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
337 ~(uint32_t(1) << task_id), std::memory_order_release);
338 DCHECK(previous_task_ids & (uint32_t(1) << task_id));
339 }
340
ShouldYield()341 bool JobTaskSource::ShouldYield() {
342 // It is safe to read |join_flag_| and |state_| without a lock since these
343 // variables are atomic, keeping in mind that threads may not immediately see
344 // the new value when it is updated.
345 return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
346 TS_UNCHECKED_READ(state_).Load().is_canceled();
347 }
348
TakeTask(TaskSource::Transaction * transaction)349 Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
350 // JobTaskSource members are not lock-protected so no need to acquire a lock
351 // if |transaction| is nullptr.
352 DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
353 DCHECK(primary_task_);
354 return {task_metadata_, primary_task_};
355 }
356
DidProcessTask(TaskSource::Transaction *)357 bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
358 // Lock is needed to access |join_flag_| below and signal
359 // |worker_released_condition_|.
360 CheckedAutoLock auto_lock(worker_lock_);
361 const auto state_before_sub = state_.DecrementWorkerCount();
362
363 if (join_flag_.ShouldWorkerSignal()) {
364 worker_released_condition_->Signal();
365 }
366
367 // A canceled task source should never get re-enqueued.
368 if (state_before_sub.is_canceled()) {
369 return false;
370 }
371
372 DCHECK_GT(state_before_sub.worker_count(), 0U);
373
374 // Re-enqueue the TaskSource if the task ran and the worker count is below the
375 // max concurrency.
376 // |worker_count - 1| to exclude the returning thread.
377 return state_before_sub.worker_count() <=
378 GetMaxConcurrency(state_before_sub.worker_count() - 1);
379 }
380
381 // This is a no-op and should always return true.
WillReEnqueue(TimeTicks now,TaskSource::Transaction *)382 bool JobTaskSource::WillReEnqueue(TimeTicks now,
383 TaskSource::Transaction* /*transaction*/) {
384 return true;
385 }
386
387 // This is a no-op.
OnBecomeReady()388 bool JobTaskSource::OnBecomeReady() {
389 return false;
390 }
391
GetSortKey() const392 TaskSourceSortKey JobTaskSource::GetSortKey() const {
393 return TaskSourceSortKey(priority_racy(), ready_time_,
394 TS_UNCHECKED_READ(state_).Load().worker_count());
395 }
396
397 // This function isn't expected to be called since a job is never delayed.
398 // However, the class still needs to provide an override.
GetDelayedSortKey() const399 TimeTicks JobTaskSource::GetDelayedSortKey() const {
400 return TimeTicks();
401 }
402
403 // This function isn't expected to be called since a job is never delayed.
404 // However, the class still needs to provide an override.
HasReadyTasks(TimeTicks now) const405 bool JobTaskSource::HasReadyTasks(TimeTicks now) const {
406 NOTREACHED();
407 return true;
408 }
409
Clear(TaskSource::Transaction * transaction)410 std::optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
411 Cancel();
412
413 // Nothing is cleared since other workers might still racily run tasks. For
414 // simplicity, the destructor will take care of it once all references are
415 // released.
416 return std::nullopt;
417 }
418
419 } // namespace internal
420 } // namespace base
421