xref: /aosp_15_r20/external/cronet/base/task/thread_pool/thread_group_worker_delegate.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2023 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group_worker_delegate.h"
6 
7 #include "base/metrics/histogram_functions.h"
8 #include "base/strings/stringprintf.h"
9 #include "base/threading/scoped_blocking_call.h"
10 #include "base/threading/scoped_blocking_call_internal.h"
11 #include "base/threading/thread_checker.h"
12 #include "base/time/time.h"
13 #include "base/time/time_override.h"
14 
15 namespace base::internal {
16 
ThreadGroupWorkerDelegate(TrackedRef<ThreadGroup> outer,bool is_excess)17 ThreadGroup::ThreadGroupWorkerDelegate::ThreadGroupWorkerDelegate(
18     TrackedRef<ThreadGroup> outer,
19     bool is_excess)
20     : outer_(outer), is_excess_(is_excess) {
21   // Bound in OnMainEntry().
22   DETACH_FROM_THREAD(worker_thread_checker_);
23 }
24 
25 ThreadGroup::ThreadGroupWorkerDelegate::~ThreadGroupWorkerDelegate() = default;
26 
27 ThreadGroup::ThreadGroupWorkerDelegate::WorkerOnly::WorkerOnly() = default;
28 ThreadGroup::ThreadGroupWorkerDelegate::WorkerOnly::~WorkerOnly() = default;
29 
ThreadPoolSleepTimeout()30 TimeDelta ThreadGroup::ThreadGroupWorkerDelegate::ThreadPoolSleepTimeout() {
31   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
32   if (!is_excess_) {
33     return TimeDelta::Max();
34   }
35   // Sleep for an extra 10% to avoid the following pathological case:
36   //   0) A task is running on a timer which matches
37   //      |after_start().suggested_reclaim_time|.
38   //   1) The timer fires and this worker is created by
39   //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
40   //      worker was assigned the task.
41   //   2) This worker begins sleeping |after_start().suggested_reclaim_time|
42   //      (at the front of the idle set).
43   //   3) The task assigned to the other worker completes and the worker goes
44   //      back in the idle set (this worker may now second on the idle set;
45   //      its GetLastUsedTime() is set to Now()).
46   //   4) The sleep in (2) expires. Since (3) was fast this worker is likely
47   //      to have been second on the idle set long enough for
48   //      CanCleanupLockRequired() to be satisfied in which case this worker
49   //      is cleaned up.
50   //   5) The timer fires at roughly the same time and we're back to (1) if
51   //      (4) resulted in a clean up; causing thread churn.
52   //
53   //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
54   //   before (5). In that case (5) will cause (3) and refresh this worker's
55   //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
56   //   and avoiding churn.
57   //
58   //   Of course the same problem arises if in (0) the timer matches
59   //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
60   //   timer slower than |after_start().suggested_reclaim_time| will cause
61   //   such churn during long idle periods. If this is a problem in practice,
62   //   the standby thread configuration and algorithm should be revisited.
63   return outer_->after_start().suggested_reclaim_time * 1.1;
64 }
65 
66 // BlockingObserver:
BlockingStarted(BlockingType blocking_type)67 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingStarted(
68     BlockingType blocking_type) {
69   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
70   DCHECK(worker_only().worker_thread_);
71   // Skip if this blocking scope happened outside of a RunTask.
72   if (!read_worker().current_task_priority) {
73     return;
74   }
75 
76   worker_only().worker_thread_->MaybeUpdateThreadType();
77 
78   // WillBlock is always used when time overrides is active. crbug.com/1038867
79   if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
80     blocking_type = BlockingType::WILL_BLOCK;
81   }
82 
83   std::unique_ptr<BaseScopedCommandsExecutor> executor = outer_->GetExecutor();
84   CheckedAutoLock auto_lock(outer_->lock_);
85 
86   DCHECK(!incremented_max_tasks_since_blocked_);
87   DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
88   DCHECK(read_worker().blocking_start_time.is_null());
89   write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();
90 
91   if (incremented_max_tasks_for_shutdown_) {
92     return;
93   }
94 
95   if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
96     ++outer_->num_unresolved_best_effort_may_block_;
97   }
98 
99   if (blocking_type == BlockingType::WILL_BLOCK) {
100     incremented_max_tasks_since_blocked_ = true;
101     outer_->IncrementMaxTasksLockRequired();
102     outer_->EnsureEnoughWorkersLockRequired(executor.get());
103   } else {
104     ++outer_->num_unresolved_may_block_;
105   }
106 
107   outer_->MaybeScheduleAdjustMaxTasksLockRequired(executor.get());
108 }
109 
BlockingTypeUpgraded()110 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingTypeUpgraded() {
111   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
112   // Skip if this blocking scope happened outside of a RunTask.
113   if (!read_worker().current_task_priority) {
114     return;
115   }
116 
117   // The blocking type always being WILL_BLOCK in this experiment and with
118   // time overrides, it should never be considered "upgraded".
119   if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
120     return;
121   }
122 
123   std::unique_ptr<BaseScopedCommandsExecutor> executor = outer_->GetExecutor();
124   CheckedAutoLock auto_lock(outer_->lock_);
125 
126   // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
127   // same scope already caused the max tasks to be incremented.
128   if (incremented_max_tasks_since_blocked_) {
129     return;
130   }
131 
132   // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
133   // same scope.
134   --outer_->num_unresolved_may_block_;
135 
136   incremented_max_tasks_since_blocked_ = true;
137   outer_->IncrementMaxTasksLockRequired();
138   outer_->EnsureEnoughWorkersLockRequired(executor.get());
139 }
140 
BlockingEnded()141 void ThreadGroup::ThreadGroupWorkerDelegate::BlockingEnded() {
142   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
143   // Skip if this blocking scope happened outside of a RunTask.
144   if (!read_worker().current_task_priority) {
145     return;
146   }
147 
148   CheckedAutoLock auto_lock(outer_->lock_);
149   DCHECK(!read_worker().blocking_start_time.is_null());
150   write_worker().blocking_start_time = TimeTicks();
151   if (!incremented_max_tasks_for_shutdown_) {
152     if (incremented_max_tasks_since_blocked_) {
153       outer_->DecrementMaxTasksLockRequired();
154     } else {
155       --outer_->num_unresolved_may_block_;
156     }
157 
158     if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
159       if (incremented_max_best_effort_tasks_since_blocked_) {
160         outer_->DecrementMaxBestEffortTasksLockRequired();
161       } else {
162         --outer_->num_unresolved_best_effort_may_block_;
163       }
164     }
165   }
166 
167   incremented_max_tasks_since_blocked_ = false;
168   incremented_max_best_effort_tasks_since_blocked_ = false;
169 }
170 
171 // Notifies the worker of shutdown, possibly marking the running task as
172 // MAY_BLOCK.
OnShutdownStartedLockRequired(BaseScopedCommandsExecutor * executor)173 void ThreadGroup::ThreadGroupWorkerDelegate::OnShutdownStartedLockRequired(
174     BaseScopedCommandsExecutor* executor) {
175   if (!read_any().is_running_task()) {
176     return;
177   }
178   // Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
179   // max_tasks/max_best_effort_tasks. The effect is reverted in
180   // DidProcessTask().
181   if (*read_any().current_shutdown_behavior ==
182       TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
183     incremented_max_tasks_for_shutdown_ = true;
184     IncrementMaxTasksLockRequired();
185   }
186 }
187 
188 // Increments max [best effort] tasks iff this worker has been within a
189 // ScopedBlockingCall for more than |may_block_threshold|.
190 void ThreadGroup::ThreadGroupWorkerDelegate::
MaybeIncrementMaxTasksLockRequired()191     MaybeIncrementMaxTasksLockRequired()
192         EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
193   if (read_any().blocking_start_time.is_null() ||
194       subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
195           outer_->after_start().may_block_threshold) {
196     return;
197   }
198   IncrementMaxTasksLockRequired();
199 }
200 
201 // Increments max [best effort] tasks.
IncrementMaxTasksLockRequired()202 void ThreadGroup::ThreadGroupWorkerDelegate::IncrementMaxTasksLockRequired()
203     EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
204   if (!incremented_max_tasks_since_blocked_) {
205     outer_->IncrementMaxTasksLockRequired();
206     // Update state for an unresolved ScopedBlockingCall.
207     if (!read_any().blocking_start_time.is_null()) {
208       incremented_max_tasks_since_blocked_ = true;
209       --outer_->num_unresolved_may_block_;
210     }
211   }
212   if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
213       !incremented_max_best_effort_tasks_since_blocked_) {
214     outer_->IncrementMaxBestEffortTasksLockRequired();
215     // Update state for an unresolved ScopedBlockingCall.
216     if (!read_any().blocking_start_time.is_null()) {
217       incremented_max_best_effort_tasks_since_blocked_ = true;
218       --outer_->num_unresolved_best_effort_may_block_;
219     }
220   }
221 }
222 
223 RegisteredTaskSource
GetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker)224 ThreadGroup::ThreadGroupWorkerDelegate::GetWorkLockRequired(
225     BaseScopedCommandsExecutor* executor,
226     WorkerThread* worker) {
227   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
228   DCHECK(ContainsWorker(outer_->workers_, worker));
229 
230   // Use this opportunity, before assigning work to this worker, to
231   // create/signal additional workers if needed (doing this here allows us to
232   // reduce potentially expensive create/wake directly on PostTask()).
233   //
234   // Note: FlushWorkerCreation() below releases |outer_->lock_|. It is thus
235   // important that all other operations come after it to keep this method
236   // transactional.
237   outer_->EnsureEnoughWorkersLockRequired(executor);
238   executor->FlushWorkerCreation(&outer_->lock_);
239 
240   if (!CanGetWorkLockRequired(executor, worker)) {
241     return nullptr;
242   }
243 
244   RegisteredTaskSource task_source;
245   TaskPriority priority;
246   while (!task_source && !outer_->priority_queue_.IsEmpty()) {
247     // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
248     // BEST_EFFORT tasks run concurrently.
249     priority = outer_->priority_queue_.PeekSortKey().priority();
250     if (!outer_->task_tracker_->CanRunPriority(priority) ||
251         (priority == TaskPriority::BEST_EFFORT &&
252          outer_->num_running_best_effort_tasks_ >=
253              outer_->max_best_effort_tasks_)) {
254       break;
255     }
256 
257     task_source = outer_->TakeRegisteredTaskSource(executor);
258   }
259   if (!task_source) {
260     OnWorkerBecomesIdleLockRequired(executor, worker);
261     return nullptr;
262   }
263 
264   // Running task bookkeeping.
265   outer_->IncrementTasksRunningLockRequired(priority);
266 
267   write_worker().current_task_priority = priority;
268   write_worker().current_shutdown_behavior = task_source->shutdown_behavior();
269 
270   return task_source;
271 }
272 
RecordUnnecessaryWakeupImpl()273 void ThreadGroup::ThreadGroupWorkerDelegate::RecordUnnecessaryWakeupImpl() {
274   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
275 
276   base::BooleanHistogram::FactoryGet(
277       std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
278       base::Histogram::kUmaTargetedHistogramFlag)
279       ->Add(true);
280 
281   TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
282 }
283 
OnMainEntryImpl(WorkerThread * worker)284 void ThreadGroup::ThreadGroupWorkerDelegate::OnMainEntryImpl(
285     WorkerThread* worker) {
286   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
287 
288   {
289 #if DCHECK_IS_ON()
290     CheckedAutoLock auto_lock(outer_->lock_);
291     DCHECK(
292         ContainsWorker(outer_->workers_, static_cast<WorkerThread*>(worker)));
293 #endif
294   }
295 
296 #if BUILDFLAG(IS_WIN)
297   worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
298       outer_->after_start().worker_environment);
299 #endif  // BUILDFLAG(IS_WIN)
300 
301   PlatformThread::SetName(
302       StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));
303 
304   outer_->BindToCurrentThread();
305   worker_only().worker_thread_ = static_cast<WorkerThread*>(worker);
306   SetBlockingObserverForCurrentThread(this);
307 
308   if (outer_->worker_started_for_testing_) {
309     // When |worker_started_for_testing_| is set, the thread that starts workers
310     // should wait for a worker to have started before starting the next one,
311     // and there should only be one thread that wakes up workers at a time.
312     DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
313     outer_->worker_started_for_testing_->Signal();
314   }
315 }
316 
317 }  // namespace base::internal
318