1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/sequence.h"
6
7 #include <utility>
8
9 #include "base/check.h"
10 #include "base/critical_closure.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/stack_allocated.h"
15 #include "base/task/task_features.h"
16 #include "base/time/time.h"
17
18 namespace base {
19 namespace internal {
20
21 namespace {
22
23 // Asserts that a lock is acquired and annotates the scope such that
24 // base/thread_annotations.h can recognize that the lock is acquired.
25 class SCOPED_LOCKABLE AnnotateLockAcquired {
26 STACK_ALLOCATED();
27
28 public:
29 explicit AnnotateLockAcquired(const CheckedLock& lock)
EXCLUSIVE_LOCK_FUNCTION(lock)30 EXCLUSIVE_LOCK_FUNCTION(lock)
31 : acquired_lock_(lock) {
32 acquired_lock_.AssertAcquired();
33 }
34
UNLOCK_FUNCTION()35 ~AnnotateLockAcquired() UNLOCK_FUNCTION() { acquired_lock_.AssertAcquired(); }
36
37 private:
38 const CheckedLock& acquired_lock_;
39 };
40
MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,Task & task)41 void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,
42 Task& task) {
43 switch (shutdown_behavior) {
44 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
45 // Nothing to do.
46 break;
47 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
48 // MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly
49 // in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is
50 // the default and hence the theoretical benefits don't warrant the
51 // performance implications.
52 break;
53 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
54 task.task =
55 MakeCriticalClosure(task.posted_from, std::move(task.task),
56 /*is_immediate=*/task.delayed_run_time.is_null());
57 break;
58 }
59 }
60
61 } // namespace
62
Transaction(Sequence * sequence)63 Sequence::Transaction::Transaction(Sequence* sequence)
64 : TaskSource::Transaction(sequence) {}
65
66 Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;
67
68 Sequence::Transaction::~Transaction() = default;
69
WillPushImmediateTask()70 bool Sequence::Transaction::WillPushImmediateTask() {
71 // In a Transaction.
72 AnnotateLockAcquired annotate(sequence()->lock_);
73
74 bool was_immediate =
75 sequence()->is_immediate_.exchange(true, std::memory_order_relaxed);
76 return !was_immediate;
77 }
78
PushImmediateTask(Task task)79 void Sequence::Transaction::PushImmediateTask(Task task) {
80 // In a Transaction.
81 AnnotateLockAcquired annotate(sequence()->lock_);
82
83 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
84 // for details.
85 CHECK(task.task);
86 DCHECK(!task.queue_time.is_null());
87 DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed));
88
89 bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_;
90 bool queue_was_empty = sequence()->queue_.empty();
91
92 MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
93
94 sequence()->queue_.push(std::move(task));
95
96 if (queue_was_empty)
97 sequence()->UpdateReadyTimes();
98
99 // AddRef() matched by manual Release() when the sequence has no more tasks
100 // to run (in DidProcessTask() or Clear()).
101 if (was_unretained && sequence()->task_runner())
102 sequence()->task_runner()->AddRef();
103 }
104
PushDelayedTask(Task task)105 bool Sequence::Transaction::PushDelayedTask(Task task) {
106 // In a Transaction.
107 AnnotateLockAcquired annotate(sequence()->lock_);
108
109 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
110 // for details.
111 CHECK(task.task);
112 DCHECK(!task.queue_time.is_null());
113 DCHECK(!task.delayed_run_time.is_null());
114
115 bool top_will_change = sequence()->DelayedSortKeyWillChange(task);
116 bool was_empty = sequence()->IsEmpty();
117
118 MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
119
120 sequence()->delayed_queue_.insert(std::move(task));
121
122 if (sequence()->queue_.empty())
123 sequence()->UpdateReadyTimes();
124
125 // AddRef() matched by manual Release() when the sequence has no more tasks
126 // to run (in DidProcessTask() or Clear()).
127 if (was_empty && !sequence()->has_worker_ && sequence()->task_runner())
128 sequence()->task_runner()->AddRef();
129
130 return top_will_change;
131 }
132
133 // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
134 // not be the first task eligible to run, but tasks will always become ripe
135 // before their latest_delayed_run_time().
operator ()(const Task & lhs,const Task & rhs) const136 bool Sequence::DelayedTaskGreater::operator()(const Task& lhs,
137 const Task& rhs) const {
138 TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
139 TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
140 return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) >
141 std::tie(rhs_latest_delayed_run_time, rhs.sequence_num);
142 }
143
WillRunTask()144 TaskSource::RunStatus Sequence::WillRunTask() {
145 // There should never be a second call to WillRunTask() before DidProcessTask
146 // since the RunStatus is always marked a saturated.
147 DCHECK(!has_worker_);
148
149 // It's ok to access |has_worker_| outside of a Transaction since
150 // WillRunTask() is externally synchronized, always called in sequence with
151 // TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which
152 // means it won't race with Push[Immediate/Delayed]Task().
153 has_worker_ = true;
154 return RunStatus::kAllowedSaturated;
155 }
156
OnBecomeReady()157 bool Sequence::OnBecomeReady() {
158 DCHECK(!has_worker_);
159 // std::memory_order_relaxed is sufficient because no other state is
160 // synchronized with |is_immediate_| outside of |lock_|.
161 return !is_immediate_.exchange(true, std::memory_order_relaxed);
162 }
163
GetRemainingConcurrency() const164 size_t Sequence::GetRemainingConcurrency() const {
165 return 1;
166 }
167
TakeNextImmediateTask()168 Task Sequence::TakeNextImmediateTask() {
169 Task next_task = std::move(queue_.front());
170 queue_.pop();
171 return next_task;
172 }
173
TakeEarliestTask()174 Task Sequence::TakeEarliestTask() {
175 if (queue_.empty())
176 return delayed_queue_.take_top();
177
178 if (delayed_queue_.empty())
179 return TakeNextImmediateTask();
180
181 // Both queues contain at least a task. Decide from which one the task should
182 // be taken.
183 if (queue_.front().queue_time <=
184 delayed_queue_.top().latest_delayed_run_time())
185 return TakeNextImmediateTask();
186
187 return delayed_queue_.take_top();
188 }
189
UpdateReadyTimes()190 void Sequence::UpdateReadyTimes() {
191 DCHECK(!IsEmpty());
192 if (queue_.empty()) {
193 latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(),
194 std::memory_order_relaxed);
195 earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(),
196 std::memory_order_relaxed);
197 return;
198 }
199
200 if (delayed_queue_.empty()) {
201 latest_ready_time_.store(queue_.front().queue_time,
202 std::memory_order_relaxed);
203 } else {
204 latest_ready_time_.store(
205 std::min(queue_.front().queue_time,
206 delayed_queue_.top().latest_delayed_run_time()),
207 std::memory_order_relaxed);
208 }
209 earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed);
210 }
211
TakeTask(TaskSource::Transaction * transaction)212 Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
213 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
214 AnnotateLockAcquired annotate(lock_);
215
216 DCHECK(has_worker_);
217 DCHECK(is_immediate_.load(std::memory_order_relaxed));
218 DCHECK(!queue_.empty() || !delayed_queue_.empty());
219
220 auto next_task = TakeEarliestTask();
221
222 if (!IsEmpty())
223 UpdateReadyTimes();
224
225 return next_task;
226 }
227
DidProcessTask(TaskSource::Transaction * transaction)228 bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
229 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
230 AnnotateLockAcquired annotate(lock_);
231
232 // There should never be a call to DidProcessTask without an associated
233 // WillRunTask().
234 DCHECK(has_worker_);
235 has_worker_ = false;
236
237 // See comment on TaskSource::task_runner_ for lifetime management details.
238 if (IsEmpty()) {
239 is_immediate_.store(false, std::memory_order_relaxed);
240 ReleaseTaskRunner();
241 return false;
242 }
243
244 // Let the caller re-enqueue this non-empty Sequence regardless of
245 // |run_result| so it can continue churning through this Sequence's tasks and
246 // skip/delete them in the proper scope.
247 return true;
248 }
249
WillReEnqueue(TimeTicks now,TaskSource::Transaction * transaction)250 bool Sequence::WillReEnqueue(TimeTicks now,
251 TaskSource::Transaction* transaction) {
252 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
253 AnnotateLockAcquired annotate(lock_);
254
255 // This should always be called from a worker thread and it will be
256 // called after DidProcessTask().
257 DCHECK(is_immediate_.load(std::memory_order_relaxed));
258
259 bool has_ready_tasks = HasReadyTasks(now);
260 if (!has_ready_tasks)
261 is_immediate_.store(false, std::memory_order_relaxed);
262
263 return has_ready_tasks;
264 }
265
DelayedSortKeyWillChange(const Task & delayed_task) const266 bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const {
267 // If sequence has already been picked up by a worker or moved, no need to
268 // proceed further here.
269 if (is_immediate_.load(std::memory_order_relaxed)) {
270 return false;
271 }
272
273 if (IsEmpty()) {
274 return true;
275 }
276
277 return delayed_task.latest_delayed_run_time() <
278 delayed_queue_.top().latest_delayed_run_time();
279 }
280
HasReadyTasks(TimeTicks now) const281 bool Sequence::HasReadyTasks(TimeTicks now) const {
282 return now >= TS_UNCHECKED_READ(earliest_ready_time_)
283 .load(std::memory_order_relaxed);
284 }
285
HasImmediateTasks() const286 bool Sequence::HasImmediateTasks() const {
287 return !queue_.empty();
288 }
289
GetSortKey() const290 TaskSourceSortKey Sequence::GetSortKey() const {
291 return TaskSourceSortKey(
292 priority_racy(),
293 TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed));
294 }
295
GetDelayedSortKey() const296 TimeTicks Sequence::GetDelayedSortKey() const {
297 return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed);
298 }
299
Clear(TaskSource::Transaction * transaction)300 std::optional<Task> Sequence::Clear(TaskSource::Transaction* transaction) {
301 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
302 AnnotateLockAcquired annotate(lock_);
303
304 // See comment on TaskSource::task_runner_ for lifetime management details.
305 if (!IsEmpty() && !has_worker_) {
306 ReleaseTaskRunner();
307 }
308
309 return Task(
310 FROM_HERE,
311 base::BindOnce(
312 [](base::queue<Task> queue,
313 base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) {
314 while (!queue.empty())
315 queue.pop();
316
317 while (!delayed_queue.empty())
318 delayed_queue.pop();
319 },
320 std::move(queue_), std::move(delayed_queue_)),
321 TimeTicks(), TimeDelta(), TimeDelta(),
322 static_cast<int>(reinterpret_cast<intptr_t>(this)));
323 }
324
ReleaseTaskRunner()325 void Sequence::ReleaseTaskRunner() {
326 if (!task_runner())
327 return;
328 // No member access after this point, releasing |task_runner()| might delete
329 // |this|.
330 task_runner()->Release();
331 }
332
Sequence(const TaskTraits & traits,SequencedTaskRunner * task_runner,TaskSourceExecutionMode execution_mode)333 Sequence::Sequence(const TaskTraits& traits,
334 SequencedTaskRunner* task_runner,
335 TaskSourceExecutionMode execution_mode)
336 : TaskSource(traits, execution_mode), task_runner_(task_runner) {}
337
338 Sequence::~Sequence() = default;
339
BeginTransaction()340 Sequence::Transaction Sequence::BeginTransaction() {
341 return Transaction(this);
342 }
343
GetExecutionEnvironment()344 ExecutionEnvironment Sequence::GetExecutionEnvironment() {
345 if (execution_mode() == TaskSourceExecutionMode::kSingleThread) {
346 return {token_, &sequence_local_storage_,
347 static_cast<SingleThreadTaskRunner*>(task_runner())};
348 }
349 return {token_, &sequence_local_storage_, task_runner()};
350 }
351
IsEmpty() const352 bool Sequence::IsEmpty() const {
353 return queue_.empty() && delayed_queue_.empty();
354 }
355
356 } // namespace internal
357 } // namespace base
358