1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/delayed_task_manager.h"
6
7 #include <algorithm>
8 #include <optional>
9
10 #include "base/check.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/task/common/checked_lock.h"
14 #include "base/task/sequenced_task_runner.h"
15 #include "base/task/task_features.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/thread_pool/task.h"
18
19 namespace base {
20 namespace internal {
21
22 DelayedTaskManager::DelayedTask::DelayedTask() = default;
23
DelayedTask(Task task,PostTaskNowCallback callback)24 DelayedTaskManager::DelayedTask::DelayedTask(Task task,
25 PostTaskNowCallback callback)
26 : task(std::move(task)), callback(std::move(callback)) {}
27
28 DelayedTaskManager::DelayedTask::DelayedTask(
29 DelayedTaskManager::DelayedTask&& other) = default;
30
31 DelayedTaskManager::DelayedTask::~DelayedTask() = default;
32
33 DelayedTaskManager::DelayedTask& DelayedTaskManager::DelayedTask::operator=(
34 DelayedTaskManager::DelayedTask&& other) = default;
35
operator >(const DelayedTask & other) const36 bool DelayedTaskManager::DelayedTask::operator>(
37 const DelayedTask& other) const {
38 TimeTicks latest_delayed_run_time = task.latest_delayed_run_time();
39 TimeTicks other_latest_delayed_run_time =
40 other.task.latest_delayed_run_time();
41 return std::tie(latest_delayed_run_time, task.sequence_num) >
42 std::tie(other_latest_delayed_run_time, other.task.sequence_num);
43 }
44
DelayedTaskManager(const TickClock * tick_clock)45 DelayedTaskManager::DelayedTaskManager(const TickClock* tick_clock)
46 : process_ripe_tasks_closure_(
47 BindRepeating(&DelayedTaskManager::ProcessRipeTasks,
48 Unretained(this))),
49 schedule_process_ripe_tasks_closure_(BindRepeating(
50 &DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread,
51 Unretained(this))),
52 tick_clock_(tick_clock) {
53 DETACH_FROM_SEQUENCE(sequence_checker_);
54 DCHECK(tick_clock_);
55 }
56
~DelayedTaskManager()57 DelayedTaskManager::~DelayedTaskManager() {
58 delayed_task_handle_.CancelTask();
59 }
60
Start(scoped_refptr<SequencedTaskRunner> service_thread_task_runner)61 void DelayedTaskManager::Start(
62 scoped_refptr<SequencedTaskRunner> service_thread_task_runner) {
63 DCHECK(service_thread_task_runner);
64
65 TimeTicks process_ripe_tasks_time;
66 subtle::DelayPolicy delay_policy;
67 {
68 CheckedAutoLock auto_lock(queue_lock_);
69 DCHECK(!service_thread_task_runner_);
70 service_thread_task_runner_ = std::move(service_thread_task_runner);
71 max_precise_delay = kMaxPreciseDelay.Get();
72 std::tie(process_ripe_tasks_time, delay_policy) =
73 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
74 }
75 if (!process_ripe_tasks_time.is_max()) {
76 service_thread_task_runner_->PostTask(FROM_HERE,
77 schedule_process_ripe_tasks_closure_);
78 }
79 }
80
AddDelayedTask(Task task,PostTaskNowCallback post_task_now_callback)81 void DelayedTaskManager::AddDelayedTask(
82 Task task,
83 PostTaskNowCallback post_task_now_callback) {
84 DCHECK(task.task);
85 DCHECK(!task.delayed_run_time.is_null());
86 DCHECK(!task.queue_time.is_null());
87
88 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
89 // for details.
90 CHECK(task.task);
91 TimeTicks process_ripe_tasks_time;
92 subtle::DelayPolicy delay_policy;
93 {
94 CheckedAutoLock auto_lock(queue_lock_);
95 task.delay_policy = subtle::MaybeOverrideDelayPolicy(
96 task.delay_policy, task.delayed_run_time - task.queue_time,
97 max_precise_delay);
98
99 auto [old_process_ripe_tasks_time, old_delay_policy] =
100 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
101 delayed_task_queue_.insert(
102 DelayedTask(std::move(task), std::move(post_task_now_callback)));
103 // Not started or already shutdown.
104 if (service_thread_task_runner_ == nullptr)
105 return;
106
107 std::tie(process_ripe_tasks_time, delay_policy) =
108 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
109 // The next invocation of ProcessRipeTasks() doesn't need to change.
110 if (old_process_ripe_tasks_time == process_ripe_tasks_time &&
111 old_delay_policy == delay_policy) {
112 return;
113 }
114 }
115 if (!process_ripe_tasks_time.is_max()) {
116 service_thread_task_runner_->PostTask(FROM_HERE,
117 schedule_process_ripe_tasks_closure_);
118 }
119 }
120
ProcessRipeTasks()121 void DelayedTaskManager::ProcessRipeTasks() {
122 std::vector<DelayedTask> ripe_delayed_tasks;
123 TimeTicks process_ripe_tasks_time;
124
125 {
126 CheckedAutoLock auto_lock(queue_lock_);
127
128 // Already shutdown.
129 if (!service_thread_task_runner_)
130 return;
131
132 const TimeTicks now = tick_clock_->NowTicks();
133 // A delayed task is ripe if it reached its delayed run time or if it is
134 // canceled. If it is canceled, schedule its deletion on the correct
135 // sequence now rather than in the future, to minimize CPU wake ups and save
136 // power.
137 while (!delayed_task_queue_.empty() &&
138 (delayed_task_queue_.top().task.earliest_delayed_run_time() <= now ||
139 !delayed_task_queue_.top().task.task.MaybeValid())) {
140 // The const_cast on top is okay since the DelayedTask is
141 // transactionally being popped from |delayed_task_queue_| right after
142 // and the move doesn't alter the sort order.
143 ripe_delayed_tasks.push_back(
144 std::move(const_cast<DelayedTask&>(delayed_task_queue_.top())));
145 delayed_task_queue_.pop();
146 }
147 std::tie(process_ripe_tasks_time, std::ignore) =
148 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
149 }
150 if (!process_ripe_tasks_time.is_max()) {
151 if (service_thread_task_runner_->RunsTasksInCurrentSequence()) {
152 ScheduleProcessRipeTasksOnServiceThread();
153 } else {
154 // ProcessRipeTasks may be called on another thread under tests.
155 service_thread_task_runner_->PostTask(
156 FROM_HERE, schedule_process_ripe_tasks_closure_);
157 }
158 }
159
160 for (auto& delayed_task : ripe_delayed_tasks) {
161 std::move(delayed_task.callback).Run(std::move(delayed_task.task));
162 }
163 }
164
NextScheduledRunTime() const165 std::optional<TimeTicks> DelayedTaskManager::NextScheduledRunTime() const {
166 CheckedAutoLock auto_lock(queue_lock_);
167 if (delayed_task_queue_.empty())
168 return std::nullopt;
169 return delayed_task_queue_.top().task.delayed_run_time;
170 }
171
TopTaskDelayPolicyForTesting() const172 subtle::DelayPolicy DelayedTaskManager::TopTaskDelayPolicyForTesting() const {
173 CheckedAutoLock auto_lock(queue_lock_);
174 return delayed_task_queue_.top().task.delay_policy;
175 }
176
Shutdown()177 void DelayedTaskManager::Shutdown() {
178 scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
179
180 {
181 CheckedAutoLock auto_lock(queue_lock_);
182 // Prevent delayed tasks from being posted or processed after this.
183 service_thread_task_runner = service_thread_task_runner_;
184 }
185
186 if (service_thread_task_runner) {
187 // Cancel our delayed task on the service thread. This cannot be done from
188 // ~DelayedTaskManager because the delayed task handle is sequence-affine.
189 service_thread_task_runner->PostTask(
190 FROM_HERE,
191 base::BindOnce(
192 [](DelayedTaskManager* manager) {
193 DCHECK_CALLED_ON_VALID_SEQUENCE(manager->sequence_checker_);
194 manager->delayed_task_handle_.CancelTask();
195 },
196 // Unretained() is safe because the caller must flush tasks posted
197 // to the service thread before deleting `this`.
198 Unretained(this)));
199 }
200 }
201
202 std::pair<TimeTicks, subtle::DelayPolicy> DelayedTaskManager::
GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired()203 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired() {
204 queue_lock_.AssertAcquired();
205 if (delayed_task_queue_.empty()) {
206 return std::make_pair(TimeTicks::Max(),
207 subtle::DelayPolicy::kFlexibleNoSooner);
208 }
209
210 const DelayedTask& ripest_delayed_task = delayed_task_queue_.top();
211 subtle::DelayPolicy delay_policy = ripest_delayed_task.task.delay_policy;
212 return std::make_pair(ripest_delayed_task.task.delayed_run_time,
213 delay_policy);
214 }
215
ScheduleProcessRipeTasksOnServiceThread()216 void DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread() {
217 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
218
219 TimeTicks process_ripe_tasks_time;
220 subtle::DelayPolicy delay_policy;
221 {
222 CheckedAutoLock auto_lock(queue_lock_);
223 std::tie(process_ripe_tasks_time, delay_policy) =
224 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
225 }
226 DCHECK(!process_ripe_tasks_time.is_null());
227 if (process_ripe_tasks_time.is_max())
228 return;
229 delayed_task_handle_.CancelTask();
230 delayed_task_handle_ =
231 service_thread_task_runner_->PostCancelableDelayedTaskAt(
232 subtle::PostDelayedTaskPassKey(), FROM_HERE,
233 process_ripe_tasks_closure_, process_ripe_tasks_time, delay_policy);
234 }
235
236 } // namespace internal
237 } // namespace base
238