xref: /aosp_15_r20/external/cronet/base/task/thread_pool/sequence.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/sequence.h"
6 
7 #include <utility>
8 
9 #include "base/check.h"
10 #include "base/critical_closure.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/stack_allocated.h"
15 #include "base/task/task_features.h"
16 #include "base/time/time.h"
17 
18 namespace base {
19 namespace internal {
20 
21 namespace {
22 
23 // Asserts that a lock is acquired and annotates the scope such that
24 // base/thread_annotations.h can recognize that the lock is acquired.
25 class SCOPED_LOCKABLE AnnotateLockAcquired {
26   STACK_ALLOCATED();
27 
28  public:
29   explicit AnnotateLockAcquired(const CheckedLock& lock)
EXCLUSIVE_LOCK_FUNCTION(lock)30       EXCLUSIVE_LOCK_FUNCTION(lock)
31       : acquired_lock_(lock) {
32     acquired_lock_.AssertAcquired();
33   }
34 
UNLOCK_FUNCTION()35   ~AnnotateLockAcquired() UNLOCK_FUNCTION() { acquired_lock_.AssertAcquired(); }
36 
37  private:
38   const CheckedLock& acquired_lock_;
39 };
40 
MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,Task & task)41 void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,
42                               Task& task) {
43   switch (shutdown_behavior) {
44     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
45       // Nothing to do.
46       break;
47     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
48       // MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly
49       // in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is
50       // the default and hence the theoretical benefits don't warrant the
51       // performance implications.
52       break;
53     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
54       task.task =
55           MakeCriticalClosure(task.posted_from, std::move(task.task),
56                               /*is_immediate=*/task.delayed_run_time.is_null());
57       break;
58   }
59 }
60 
61 }  // namespace
62 
Transaction(Sequence * sequence)63 Sequence::Transaction::Transaction(Sequence* sequence)
64     : TaskSource::Transaction(sequence) {}
65 
66 Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;
67 
68 Sequence::Transaction::~Transaction() = default;
69 
WillPushImmediateTask()70 bool Sequence::Transaction::WillPushImmediateTask() {
71   // In a Transaction.
72   AnnotateLockAcquired annotate(sequence()->lock_);
73 
74   bool was_immediate =
75       sequence()->is_immediate_.exchange(true, std::memory_order_relaxed);
76   return !was_immediate;
77 }
78 
PushImmediateTask(Task task)79 void Sequence::Transaction::PushImmediateTask(Task task) {
80   // In a Transaction.
81   AnnotateLockAcquired annotate(sequence()->lock_);
82 
83   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
84   // for details.
85   CHECK(task.task);
86   DCHECK(!task.queue_time.is_null());
87   DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed));
88 
89   bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_;
90   bool queue_was_empty = sequence()->queue_.empty();
91 
92   MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
93 
94   sequence()->queue_.push(std::move(task));
95 
96   if (queue_was_empty)
97     sequence()->UpdateReadyTimes();
98 
99   // AddRef() matched by manual Release() when the sequence has no more tasks
100   // to run (in DidProcessTask() or Clear()).
101   if (was_unretained && sequence()->task_runner())
102     sequence()->task_runner()->AddRef();
103 }
104 
PushDelayedTask(Task task)105 bool Sequence::Transaction::PushDelayedTask(Task task) {
106   // In a Transaction.
107   AnnotateLockAcquired annotate(sequence()->lock_);
108 
109   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
110   // for details.
111   CHECK(task.task);
112   DCHECK(!task.queue_time.is_null());
113   DCHECK(!task.delayed_run_time.is_null());
114 
115   bool top_will_change = sequence()->DelayedSortKeyWillChange(task);
116   bool was_empty = sequence()->IsEmpty();
117 
118   MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
119 
120   sequence()->delayed_queue_.insert(std::move(task));
121 
122   if (sequence()->queue_.empty())
123     sequence()->UpdateReadyTimes();
124 
125   // AddRef() matched by manual Release() when the sequence has no more tasks
126   // to run (in DidProcessTask() or Clear()).
127   if (was_empty && !sequence()->has_worker_ && sequence()->task_runner())
128     sequence()->task_runner()->AddRef();
129 
130   return top_will_change;
131 }
132 
133 // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
134 // not be the first task eligible to run, but tasks will always become ripe
135 // before their latest_delayed_run_time().
operator ()(const Task & lhs,const Task & rhs) const136 bool Sequence::DelayedTaskGreater::operator()(const Task& lhs,
137                                               const Task& rhs) const {
138   TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
139   TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
140   return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) >
141          std::tie(rhs_latest_delayed_run_time, rhs.sequence_num);
142 }
143 
WillRunTask()144 TaskSource::RunStatus Sequence::WillRunTask() {
145   // There should never be a second call to WillRunTask() before DidProcessTask
146   // since the RunStatus is always marked a saturated.
147   DCHECK(!has_worker_);
148 
149   // It's ok to access |has_worker_| outside of a Transaction since
150   // WillRunTask() is externally synchronized, always called in sequence with
151   // TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which
152   // means it won't race with Push[Immediate/Delayed]Task().
153   has_worker_ = true;
154   return RunStatus::kAllowedSaturated;
155 }
156 
OnBecomeReady()157 bool Sequence::OnBecomeReady() {
158   DCHECK(!has_worker_);
159   // std::memory_order_relaxed is sufficient because no other state is
160   // synchronized with |is_immediate_| outside of |lock_|.
161   return !is_immediate_.exchange(true, std::memory_order_relaxed);
162 }
163 
GetRemainingConcurrency() const164 size_t Sequence::GetRemainingConcurrency() const {
165   return 1;
166 }
167 
TakeNextImmediateTask()168 Task Sequence::TakeNextImmediateTask() {
169   Task next_task = std::move(queue_.front());
170   queue_.pop();
171   return next_task;
172 }
173 
TakeEarliestTask()174 Task Sequence::TakeEarliestTask() {
175   if (queue_.empty())
176     return delayed_queue_.take_top();
177 
178   if (delayed_queue_.empty())
179     return TakeNextImmediateTask();
180 
181   // Both queues contain at least a task. Decide from which one the task should
182   // be taken.
183   if (queue_.front().queue_time <=
184       delayed_queue_.top().latest_delayed_run_time())
185     return TakeNextImmediateTask();
186 
187   return delayed_queue_.take_top();
188 }
189 
UpdateReadyTimes()190 void Sequence::UpdateReadyTimes() {
191   DCHECK(!IsEmpty());
192   if (queue_.empty()) {
193     latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(),
194                              std::memory_order_relaxed);
195     earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(),
196                                std::memory_order_relaxed);
197     return;
198   }
199 
200   if (delayed_queue_.empty()) {
201     latest_ready_time_.store(queue_.front().queue_time,
202                              std::memory_order_relaxed);
203   } else {
204     latest_ready_time_.store(
205         std::min(queue_.front().queue_time,
206                  delayed_queue_.top().latest_delayed_run_time()),
207         std::memory_order_relaxed);
208   }
209   earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed);
210 }
211 
TakeTask(TaskSource::Transaction * transaction)212 Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
213   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
214   AnnotateLockAcquired annotate(lock_);
215 
216   DCHECK(has_worker_);
217   DCHECK(is_immediate_.load(std::memory_order_relaxed));
218   DCHECK(!queue_.empty() || !delayed_queue_.empty());
219 
220   auto next_task = TakeEarliestTask();
221 
222   if (!IsEmpty())
223     UpdateReadyTimes();
224 
225   return next_task;
226 }
227 
DidProcessTask(TaskSource::Transaction * transaction)228 bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
229   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
230   AnnotateLockAcquired annotate(lock_);
231 
232   // There should never be a call to DidProcessTask without an associated
233   // WillRunTask().
234   DCHECK(has_worker_);
235   has_worker_ = false;
236 
237   // See comment on TaskSource::task_runner_ for lifetime management details.
238   if (IsEmpty()) {
239     is_immediate_.store(false, std::memory_order_relaxed);
240     ReleaseTaskRunner();
241     return false;
242   }
243 
244   // Let the caller re-enqueue this non-empty Sequence regardless of
245   // |run_result| so it can continue churning through this Sequence's tasks and
246   // skip/delete them in the proper scope.
247   return true;
248 }
249 
WillReEnqueue(TimeTicks now,TaskSource::Transaction * transaction)250 bool Sequence::WillReEnqueue(TimeTicks now,
251                              TaskSource::Transaction* transaction) {
252   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
253   AnnotateLockAcquired annotate(lock_);
254 
255   // This should always be called from a worker thread and it will be
256   // called after DidProcessTask().
257   DCHECK(is_immediate_.load(std::memory_order_relaxed));
258 
259   bool has_ready_tasks = HasReadyTasks(now);
260   if (!has_ready_tasks)
261     is_immediate_.store(false, std::memory_order_relaxed);
262 
263   return has_ready_tasks;
264 }
265 
DelayedSortKeyWillChange(const Task & delayed_task) const266 bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const {
267   // If sequence has already been picked up by a worker or moved, no need to
268   // proceed further here.
269   if (is_immediate_.load(std::memory_order_relaxed)) {
270     return false;
271   }
272 
273   if (IsEmpty()) {
274     return true;
275   }
276 
277   return delayed_task.latest_delayed_run_time() <
278          delayed_queue_.top().latest_delayed_run_time();
279 }
280 
HasReadyTasks(TimeTicks now) const281 bool Sequence::HasReadyTasks(TimeTicks now) const {
282   return now >= TS_UNCHECKED_READ(earliest_ready_time_)
283                     .load(std::memory_order_relaxed);
284 }
285 
HasImmediateTasks() const286 bool Sequence::HasImmediateTasks() const {
287   return !queue_.empty();
288 }
289 
GetSortKey() const290 TaskSourceSortKey Sequence::GetSortKey() const {
291   return TaskSourceSortKey(
292       priority_racy(),
293       TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed));
294 }
295 
GetDelayedSortKey() const296 TimeTicks Sequence::GetDelayedSortKey() const {
297   return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed);
298 }
299 
Clear(TaskSource::Transaction * transaction)300 std::optional<Task> Sequence::Clear(TaskSource::Transaction* transaction) {
301   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
302   AnnotateLockAcquired annotate(lock_);
303 
304   // See comment on TaskSource::task_runner_ for lifetime management details.
305   if (!IsEmpty() && !has_worker_) {
306     ReleaseTaskRunner();
307   }
308 
309   return Task(
310       FROM_HERE,
311       base::BindOnce(
312           [](base::queue<Task> queue,
313              base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) {
314             while (!queue.empty())
315               queue.pop();
316 
317             while (!delayed_queue.empty())
318               delayed_queue.pop();
319           },
320           std::move(queue_), std::move(delayed_queue_)),
321       TimeTicks(), TimeDelta(), TimeDelta(),
322       static_cast<int>(reinterpret_cast<intptr_t>(this)));
323 }
324 
ReleaseTaskRunner()325 void Sequence::ReleaseTaskRunner() {
326   if (!task_runner())
327     return;
328   // No member access after this point, releasing |task_runner()| might delete
329   // |this|.
330   task_runner()->Release();
331 }
332 
Sequence(const TaskTraits & traits,SequencedTaskRunner * task_runner,TaskSourceExecutionMode execution_mode)333 Sequence::Sequence(const TaskTraits& traits,
334                    SequencedTaskRunner* task_runner,
335                    TaskSourceExecutionMode execution_mode)
336     : TaskSource(traits, execution_mode), task_runner_(task_runner) {}
337 
338 Sequence::~Sequence() = default;
339 
BeginTransaction()340 Sequence::Transaction Sequence::BeginTransaction() {
341   return Transaction(this);
342 }
343 
GetExecutionEnvironment()344 ExecutionEnvironment Sequence::GetExecutionEnvironment() {
345   if (execution_mode() == TaskSourceExecutionMode::kSingleThread) {
346     return {token_, &sequence_local_storage_,
347             static_cast<SingleThreadTaskRunner*>(task_runner())};
348   }
349   return {token_, &sequence_local_storage_, task_runner()};
350 }
351 
IsEmpty() const352 bool Sequence::IsEmpty() const {
353   return queue_.empty() && delayed_queue_.empty();
354 }
355 
356 }  // namespace internal
357 }  // namespace base
358