1 // Copyright 2023 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_worker_delegate.h"
6
7 #include "base/metrics/histogram_functions.h"
8 #include "base/strings/stringprintf.h"
9 #include "base/threading/scoped_blocking_call.h"
10 #include "base/threading/scoped_blocking_call_internal.h"
11 #include "base/threading/thread_checker.h"
12 #include "base/time/time.h"
13 #include "base/time/time_override.h"
14
15 namespace base::internal {
16
ThreadGroupWorkerDelegate(TrackedRef<ThreadGroup> outer,bool is_excess)17 ThreadGroup::ThreadGroupWorkerDelegate::ThreadGroupWorkerDelegate(
18 TrackedRef<ThreadGroup> outer,
19 bool is_excess)
20 : outer_(outer), is_excess_(is_excess) {
21 // Bound in OnMainEntry().
22 DETACH_FROM_THREAD(worker_thread_checker_);
23 }
24
25 ThreadGroup::ThreadGroupWorkerDelegate::~ThreadGroupWorkerDelegate() = default;
26
27 ThreadGroup::ThreadGroupWorkerDelegate::WorkerOnly::WorkerOnly() = default;
28 ThreadGroup::ThreadGroupWorkerDelegate::WorkerOnly::~WorkerOnly() = default;
29
ThreadPoolSleepTimeout()30 TimeDelta ThreadGroup::ThreadGroupWorkerDelegate::ThreadPoolSleepTimeout() {
31 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
32 if (!is_excess_) {
33 return TimeDelta::Max();
34 }
35 // Sleep for an extra 10% to avoid the following pathological case:
36 // 0) A task is running on a timer which matches
37 // |after_start().suggested_reclaim_time|.
38 // 1) The timer fires and this worker is created by
39 // MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
40 // worker was assigned the task.
41 // 2) This worker begins sleeping |after_start().suggested_reclaim_time|
42 // (at the front of the idle set).
43 // 3) The task assigned to the other worker completes and the worker goes
44 // back in the idle set (this worker may now second on the idle set;
45 // its GetLastUsedTime() is set to Now()).
46 // 4) The sleep in (2) expires. Since (3) was fast this worker is likely
47 // to have been second on the idle set long enough for
48 // CanCleanupLockRequired() to be satisfied in which case this worker
49 // is cleaned up.
50 // 5) The timer fires at roughly the same time and we're back to (1) if
51 // (4) resulted in a clean up; causing thread churn.
52 //
53 // Sleeping 10% longer in (2) makes it much less likely that (4) occurs
54 // before (5). In that case (5) will cause (3) and refresh this worker's
55 // GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
56 // and avoiding churn.
57 //
58 // Of course the same problem arises if in (0) the timer matches
59 // |after_start().suggested_reclaim_time * 1.1| but it's expected that any
60 // timer slower than |after_start().suggested_reclaim_time| will cause
61 // such churn during long idle periods. If this is a problem in practice,
62 // the standby thread configuration and algorithm should be revisited.
63 return outer_->after_start().suggested_reclaim_time * 1.1;
64 }
65
66 // BlockingObserver:
BlockingStarted(BlockingType blocking_type)67 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingStarted(
68 BlockingType blocking_type) {
69 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
70 DCHECK(worker_only().worker_thread_);
71 // Skip if this blocking scope happened outside of a RunTask.
72 if (!read_worker().current_task_priority) {
73 return;
74 }
75
76 worker_only().worker_thread_->MaybeUpdateThreadType();
77
78 // WillBlock is always used when time overrides is active. crbug.com/1038867
79 if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
80 blocking_type = BlockingType::WILL_BLOCK;
81 }
82
83 std::unique_ptr<BaseScopedCommandsExecutor> executor = outer_->GetExecutor();
84 CheckedAutoLock auto_lock(outer_->lock_);
85
86 DCHECK(!incremented_max_tasks_since_blocked_);
87 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
88 DCHECK(read_worker().blocking_start_time.is_null());
89 write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();
90
91 if (incremented_max_tasks_for_shutdown_) {
92 return;
93 }
94
95 if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
96 ++outer_->num_unresolved_best_effort_may_block_;
97 }
98
99 if (blocking_type == BlockingType::WILL_BLOCK) {
100 incremented_max_tasks_since_blocked_ = true;
101 outer_->IncrementMaxTasksLockRequired();
102 outer_->EnsureEnoughWorkersLockRequired(executor.get());
103 } else {
104 ++outer_->num_unresolved_may_block_;
105 }
106
107 outer_->MaybeScheduleAdjustMaxTasksLockRequired(executor.get());
108 }
109
BlockingTypeUpgraded()110 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingTypeUpgraded() {
111 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
112 // Skip if this blocking scope happened outside of a RunTask.
113 if (!read_worker().current_task_priority) {
114 return;
115 }
116
117 // The blocking type always being WILL_BLOCK in this experiment and with
118 // time overrides, it should never be considered "upgraded".
119 if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
120 return;
121 }
122
123 std::unique_ptr<BaseScopedCommandsExecutor> executor = outer_->GetExecutor();
124 CheckedAutoLock auto_lock(outer_->lock_);
125
126 // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
127 // same scope already caused the max tasks to be incremented.
128 if (incremented_max_tasks_since_blocked_) {
129 return;
130 }
131
132 // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
133 // same scope.
134 --outer_->num_unresolved_may_block_;
135
136 incremented_max_tasks_since_blocked_ = true;
137 outer_->IncrementMaxTasksLockRequired();
138 outer_->EnsureEnoughWorkersLockRequired(executor.get());
139 }
140
BlockingEnded()141 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingEnded() {
142 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
143 // Skip if this blocking scope happened outside of a RunTask.
144 if (!read_worker().current_task_priority) {
145 return;
146 }
147
148 CheckedAutoLock auto_lock(outer_->lock_);
149 DCHECK(!read_worker().blocking_start_time.is_null());
150 write_worker().blocking_start_time = TimeTicks();
151 if (!incremented_max_tasks_for_shutdown_) {
152 if (incremented_max_tasks_since_blocked_) {
153 outer_->DecrementMaxTasksLockRequired();
154 } else {
155 --outer_->num_unresolved_may_block_;
156 }
157
158 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
159 if (incremented_max_best_effort_tasks_since_blocked_) {
160 outer_->DecrementMaxBestEffortTasksLockRequired();
161 } else {
162 --outer_->num_unresolved_best_effort_may_block_;
163 }
164 }
165 }
166
167 incremented_max_tasks_since_blocked_ = false;
168 incremented_max_best_effort_tasks_since_blocked_ = false;
169 }
170
171 // Notifies the worker of shutdown, possibly marking the running task as
172 // MAY_BLOCK.
OnShutdownStartedLockRequired(BaseScopedCommandsExecutor * executor)173 void ThreadGroup::ThreadGroupWorkerDelegate::OnShutdownStartedLockRequired(
174 BaseScopedCommandsExecutor* executor) {
175 if (!read_any().is_running_task()) {
176 return;
177 }
178 // Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
179 // max_tasks/max_best_effort_tasks. The effect is reverted in
180 // DidProcessTask().
181 if (*read_any().current_shutdown_behavior ==
182 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
183 incremented_max_tasks_for_shutdown_ = true;
184 IncrementMaxTasksLockRequired();
185 }
186 }
187
188 // Increments max [best effort] tasks iff this worker has been within a
189 // ScopedBlockingCall for more than |may_block_threshold|.
190 void ThreadGroup::ThreadGroupWorkerDelegate::
MaybeIncrementMaxTasksLockRequired()191 MaybeIncrementMaxTasksLockRequired()
192 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
193 if (read_any().blocking_start_time.is_null() ||
194 subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
195 outer_->after_start().may_block_threshold) {
196 return;
197 }
198 IncrementMaxTasksLockRequired();
199 }
200
201 // Increments max [best effort] tasks.
IncrementMaxTasksLockRequired()202 void ThreadGroup::ThreadGroupWorkerDelegate::IncrementMaxTasksLockRequired()
203 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
204 if (!incremented_max_tasks_since_blocked_) {
205 outer_->IncrementMaxTasksLockRequired();
206 // Update state for an unresolved ScopedBlockingCall.
207 if (!read_any().blocking_start_time.is_null()) {
208 incremented_max_tasks_since_blocked_ = true;
209 --outer_->num_unresolved_may_block_;
210 }
211 }
212 if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
213 !incremented_max_best_effort_tasks_since_blocked_) {
214 outer_->IncrementMaxBestEffortTasksLockRequired();
215 // Update state for an unresolved ScopedBlockingCall.
216 if (!read_any().blocking_start_time.is_null()) {
217 incremented_max_best_effort_tasks_since_blocked_ = true;
218 --outer_->num_unresolved_best_effort_may_block_;
219 }
220 }
221 }
222
223 RegisteredTaskSource
GetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker)224 ThreadGroup::ThreadGroupWorkerDelegate::GetWorkLockRequired(
225 BaseScopedCommandsExecutor* executor,
226 WorkerThread* worker) {
227 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
228 DCHECK(ContainsWorker(outer_->workers_, worker));
229
230 // Use this opportunity, before assigning work to this worker, to
231 // create/signal additional workers if needed (doing this here allows us to
232 // reduce potentially expensive create/wake directly on PostTask()).
233 //
234 // Note: FlushWorkerCreation() below releases |outer_->lock_|. It is thus
235 // important that all other operations come after it to keep this method
236 // transactional.
237 outer_->EnsureEnoughWorkersLockRequired(executor);
238 executor->FlushWorkerCreation(&outer_->lock_);
239
240 if (!CanGetWorkLockRequired(executor, worker)) {
241 return nullptr;
242 }
243
244 RegisteredTaskSource task_source;
245 TaskPriority priority;
246 while (!task_source && !outer_->priority_queue_.IsEmpty()) {
247 // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
248 // BEST_EFFORT tasks run concurrently.
249 priority = outer_->priority_queue_.PeekSortKey().priority();
250 if (!outer_->task_tracker_->CanRunPriority(priority) ||
251 (priority == TaskPriority::BEST_EFFORT &&
252 outer_->num_running_best_effort_tasks_ >=
253 outer_->max_best_effort_tasks_)) {
254 break;
255 }
256
257 task_source = outer_->TakeRegisteredTaskSource(executor);
258 }
259 if (!task_source) {
260 OnWorkerBecomesIdleLockRequired(executor, worker);
261 return nullptr;
262 }
263
264 // Running task bookkeeping.
265 outer_->IncrementTasksRunningLockRequired(priority);
266
267 write_worker().current_task_priority = priority;
268 write_worker().current_shutdown_behavior = task_source->shutdown_behavior();
269
270 return task_source;
271 }
272
RecordUnnecessaryWakeupImpl()273 void ThreadGroup::ThreadGroupWorkerDelegate::RecordUnnecessaryWakeupImpl() {
274 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
275
276 base::BooleanHistogram::FactoryGet(
277 std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
278 base::Histogram::kUmaTargetedHistogramFlag)
279 ->Add(true);
280
281 TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
282 }
283
OnMainEntryImpl(WorkerThread * worker)284 void ThreadGroup::ThreadGroupWorkerDelegate::OnMainEntryImpl(
285 WorkerThread* worker) {
286 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
287
288 {
289 #if DCHECK_IS_ON()
290 CheckedAutoLock auto_lock(outer_->lock_);
291 DCHECK(
292 ContainsWorker(outer_->workers_, static_cast<WorkerThread*>(worker)));
293 #endif
294 }
295
296 #if BUILDFLAG(IS_WIN)
297 worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
298 outer_->after_start().worker_environment);
299 #endif // BUILDFLAG(IS_WIN)
300
301 PlatformThread::SetName(
302 StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));
303
304 outer_->BindToCurrentThread();
305 worker_only().worker_thread_ = static_cast<WorkerThread*>(worker);
306 SetBlockingObserverForCurrentThread(this);
307
308 if (outer_->worker_started_for_testing_) {
309 // When |worker_started_for_testing_| is set, the thread that starts workers
310 // should wait for a worker to have started before starting the next one,
311 // and there should only be one thread that wakes up workers at a time.
312 DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
313 outer_->worker_started_for_testing_->Signal();
314 }
315 }
316
317 } // namespace base::internal
318