xref: /aosp_15_r20/external/cronet/base/task/thread_pool/delayed_task_manager.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/delayed_task_manager.h"
6 
7 #include <algorithm>
8 #include <optional>
9 
10 #include "base/check.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/task/common/checked_lock.h"
14 #include "base/task/sequenced_task_runner.h"
15 #include "base/task/task_features.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/thread_pool/task.h"
18 
19 namespace base {
20 namespace internal {
21 
22 DelayedTaskManager::DelayedTask::DelayedTask() = default;
23 
DelayedTask(Task task,PostTaskNowCallback callback)24 DelayedTaskManager::DelayedTask::DelayedTask(Task task,
25                                              PostTaskNowCallback callback)
26     : task(std::move(task)), callback(std::move(callback)) {}
27 
28 DelayedTaskManager::DelayedTask::DelayedTask(
29     DelayedTaskManager::DelayedTask&& other) = default;
30 
31 DelayedTaskManager::DelayedTask::~DelayedTask() = default;
32 
33 DelayedTaskManager::DelayedTask& DelayedTaskManager::DelayedTask::operator=(
34     DelayedTaskManager::DelayedTask&& other) = default;
35 
operator >(const DelayedTask & other) const36 bool DelayedTaskManager::DelayedTask::operator>(
37     const DelayedTask& other) const {
38   TimeTicks latest_delayed_run_time = task.latest_delayed_run_time();
39   TimeTicks other_latest_delayed_run_time =
40       other.task.latest_delayed_run_time();
41   return std::tie(latest_delayed_run_time, task.sequence_num) >
42          std::tie(other_latest_delayed_run_time, other.task.sequence_num);
43 }
44 
DelayedTaskManager(const TickClock * tick_clock)45 DelayedTaskManager::DelayedTaskManager(const TickClock* tick_clock)
46     : process_ripe_tasks_closure_(
47           BindRepeating(&DelayedTaskManager::ProcessRipeTasks,
48                         Unretained(this))),
49       schedule_process_ripe_tasks_closure_(BindRepeating(
50           &DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread,
51           Unretained(this))),
52       tick_clock_(tick_clock) {
53   DETACH_FROM_SEQUENCE(sequence_checker_);
54   DCHECK(tick_clock_);
55 }
56 
~DelayedTaskManager()57 DelayedTaskManager::~DelayedTaskManager() {
58   delayed_task_handle_.CancelTask();
59 }
60 
Start(scoped_refptr<SequencedTaskRunner> service_thread_task_runner)61 void DelayedTaskManager::Start(
62     scoped_refptr<SequencedTaskRunner> service_thread_task_runner) {
63   DCHECK(service_thread_task_runner);
64 
65   TimeTicks process_ripe_tasks_time;
66   subtle::DelayPolicy delay_policy;
67   {
68     CheckedAutoLock auto_lock(queue_lock_);
69     DCHECK(!service_thread_task_runner_);
70     service_thread_task_runner_ = std::move(service_thread_task_runner);
71     max_precise_delay = kMaxPreciseDelay.Get();
72     std::tie(process_ripe_tasks_time, delay_policy) =
73         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
74   }
75   if (!process_ripe_tasks_time.is_max()) {
76     service_thread_task_runner_->PostTask(FROM_HERE,
77                                           schedule_process_ripe_tasks_closure_);
78   }
79 }
80 
AddDelayedTask(Task task,PostTaskNowCallback post_task_now_callback)81 void DelayedTaskManager::AddDelayedTask(
82     Task task,
83     PostTaskNowCallback post_task_now_callback) {
84   DCHECK(task.task);
85   DCHECK(!task.delayed_run_time.is_null());
86   DCHECK(!task.queue_time.is_null());
87 
88   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
89   // for details.
90   CHECK(task.task);
91   TimeTicks process_ripe_tasks_time;
92   subtle::DelayPolicy delay_policy;
93   {
94     CheckedAutoLock auto_lock(queue_lock_);
95     task.delay_policy = subtle::MaybeOverrideDelayPolicy(
96         task.delay_policy, task.delayed_run_time - task.queue_time,
97         max_precise_delay);
98 
99     auto [old_process_ripe_tasks_time, old_delay_policy] =
100         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
101     delayed_task_queue_.insert(
102         DelayedTask(std::move(task), std::move(post_task_now_callback)));
103     // Not started or already shutdown.
104     if (service_thread_task_runner_ == nullptr)
105       return;
106 
107     std::tie(process_ripe_tasks_time, delay_policy) =
108         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
109     // The next invocation of ProcessRipeTasks() doesn't need to change.
110     if (old_process_ripe_tasks_time == process_ripe_tasks_time &&
111         old_delay_policy == delay_policy) {
112       return;
113     }
114   }
115   if (!process_ripe_tasks_time.is_max()) {
116     service_thread_task_runner_->PostTask(FROM_HERE,
117                                           schedule_process_ripe_tasks_closure_);
118   }
119 }
120 
ProcessRipeTasks()121 void DelayedTaskManager::ProcessRipeTasks() {
122   std::vector<DelayedTask> ripe_delayed_tasks;
123   TimeTicks process_ripe_tasks_time;
124 
125   {
126     CheckedAutoLock auto_lock(queue_lock_);
127 
128     // Already shutdown.
129     if (!service_thread_task_runner_)
130       return;
131 
132     const TimeTicks now = tick_clock_->NowTicks();
133     // A delayed task is ripe if it reached its delayed run time or if it is
134     // canceled. If it is canceled, schedule its deletion on the correct
135     // sequence now rather than in the future, to minimize CPU wake ups and save
136     // power.
137     while (!delayed_task_queue_.empty() &&
138            (delayed_task_queue_.top().task.earliest_delayed_run_time() <= now ||
139             !delayed_task_queue_.top().task.task.MaybeValid())) {
140       // The const_cast on top is okay since the DelayedTask is
141       // transactionally being popped from |delayed_task_queue_| right after
142       // and the move doesn't alter the sort order.
143       ripe_delayed_tasks.push_back(
144           std::move(const_cast<DelayedTask&>(delayed_task_queue_.top())));
145       delayed_task_queue_.pop();
146     }
147     std::tie(process_ripe_tasks_time, std::ignore) =
148         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
149   }
150   if (!process_ripe_tasks_time.is_max()) {
151     if (service_thread_task_runner_->RunsTasksInCurrentSequence()) {
152       ScheduleProcessRipeTasksOnServiceThread();
153     } else {
154       // ProcessRipeTasks may be called on another thread under tests.
155       service_thread_task_runner_->PostTask(
156           FROM_HERE, schedule_process_ripe_tasks_closure_);
157     }
158   }
159 
160   for (auto& delayed_task : ripe_delayed_tasks) {
161     std::move(delayed_task.callback).Run(std::move(delayed_task.task));
162   }
163 }
164 
NextScheduledRunTime() const165 std::optional<TimeTicks> DelayedTaskManager::NextScheduledRunTime() const {
166   CheckedAutoLock auto_lock(queue_lock_);
167   if (delayed_task_queue_.empty())
168     return std::nullopt;
169   return delayed_task_queue_.top().task.delayed_run_time;
170 }
171 
TopTaskDelayPolicyForTesting() const172 subtle::DelayPolicy DelayedTaskManager::TopTaskDelayPolicyForTesting() const {
173   CheckedAutoLock auto_lock(queue_lock_);
174   return delayed_task_queue_.top().task.delay_policy;
175 }
176 
Shutdown()177 void DelayedTaskManager::Shutdown() {
178   scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
179 
180   {
181     CheckedAutoLock auto_lock(queue_lock_);
182     // Prevent delayed tasks from being posted or processed after this.
183     service_thread_task_runner = service_thread_task_runner_;
184   }
185 
186   if (service_thread_task_runner) {
187     // Cancel our delayed task on the service thread. This cannot be done from
188     // ~DelayedTaskManager because the delayed task handle is sequence-affine.
189     service_thread_task_runner->PostTask(
190         FROM_HERE,
191         base::BindOnce(
192             [](DelayedTaskManager* manager) {
193               DCHECK_CALLED_ON_VALID_SEQUENCE(manager->sequence_checker_);
194               manager->delayed_task_handle_.CancelTask();
195             },
196             // Unretained() is safe because the caller must flush tasks posted
197             // to the service thread before deleting `this`.
198             Unretained(this)));
199   }
200 }
201 
202 std::pair<TimeTicks, subtle::DelayPolicy> DelayedTaskManager::
GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired()203     GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired() {
204   queue_lock_.AssertAcquired();
205   if (delayed_task_queue_.empty()) {
206     return std::make_pair(TimeTicks::Max(),
207                           subtle::DelayPolicy::kFlexibleNoSooner);
208   }
209 
210   const DelayedTask& ripest_delayed_task = delayed_task_queue_.top();
211   subtle::DelayPolicy delay_policy = ripest_delayed_task.task.delay_policy;
212   return std::make_pair(ripest_delayed_task.task.delayed_run_time,
213                         delay_policy);
214 }
215 
ScheduleProcessRipeTasksOnServiceThread()216 void DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread() {
217   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
218 
219   TimeTicks process_ripe_tasks_time;
220   subtle::DelayPolicy delay_policy;
221   {
222     CheckedAutoLock auto_lock(queue_lock_);
223     std::tie(process_ripe_tasks_time, delay_policy) =
224         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
225   }
226   DCHECK(!process_ripe_tasks_time.is_null());
227   if (process_ripe_tasks_time.is_max())
228     return;
229   delayed_task_handle_.CancelTask();
230   delayed_task_handle_ =
231       service_thread_task_runner_->PostCancelableDelayedTaskAt(
232           subtle::PostDelayedTaskPassKey(), FROM_HERE,
233           process_ripe_tasks_closure_, process_ripe_tasks_time, delay_policy);
234 }
235 
236 }  // namespace internal
237 }  // namespace base
238