1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/task_tracker.h"
6
7 #include <atomic>
8 #include <optional>
9 #include <string>
10 #include <utility>
11
12 #include "base/base_switches.h"
13 #include "base/command_line.h"
14 #include "base/compiler_specific.h"
15 #include "base/debug/alias.h"
16 #include "base/functional/callback.h"
17 #include "base/json/json_writer.h"
18 #include "base/logging.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/notreached.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/string_util.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/waitable_event.h"
26 #include "base/task/scoped_set_task_priority_for_current_thread.h"
27 #include "base/task/sequenced_task_runner.h"
28 #include "base/task/single_thread_task_runner.h"
29 #include "base/task/thread_pool/job_task_source.h"
30 #include "base/task/thread_pool/task_source.h"
31 #include "base/threading/sequence_local_storage_map.h"
32 #include "base/threading/thread_restrictions.h"
33 #include "base/time/time.h"
34 #include "base/trace_event/base_tracing.h"
35 #include "base/values.h"
36 #include "build/build_config.h"
37 #include "third_party/abseil-cpp/absl/base/attributes.h"
38
39 namespace base {
40 namespace internal {
41
42 namespace {
43
44 #if BUILDFLAG(ENABLE_BASE_TRACING)
45 using perfetto::protos::pbzero::ChromeThreadPoolTask;
46 using perfetto::protos::pbzero::ChromeTrackEvent;
47 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
48
49 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
50 "single thread", "job"};
51 static_assert(
52 std::size(kExecutionModeString) ==
53 static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
54 "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
55
HasLogBestEffortTasksSwitch()56 bool HasLogBestEffortTasksSwitch() {
57 // The CommandLine might not be initialized if ThreadPool is initialized in a
58 // dynamic library which doesn't have access to argc/argv.
59 return CommandLine::InitializedForCurrentProcess() &&
60 CommandLine::ForCurrentProcess()->HasSwitch(
61 switches::kLogBestEffortTasks);
62 }
63
64 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)65 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
66 switch (priority) {
67 case TaskPriority::BEST_EFFORT:
68 return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
69 case TaskPriority::USER_VISIBLE:
70 return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
71 case TaskPriority::USER_BLOCKING:
72 return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
73 }
74 }
75
ExecutionModeToProto(TaskSourceExecutionMode mode)76 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
77 TaskSourceExecutionMode mode) {
78 switch (mode) {
79 case TaskSourceExecutionMode::kParallel:
80 return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
81 case TaskSourceExecutionMode::kSequenced:
82 return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
83 case TaskSourceExecutionMode::kSingleThread:
84 return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
85 case TaskSourceExecutionMode::kJob:
86 return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
87 }
88 }
89
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)90 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
91 TaskShutdownBehavior shutdown_behavior) {
92 switch (shutdown_behavior) {
93 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
94 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
95 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
96 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
97 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
98 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
99 }
100 }
101 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
102
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)103 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
104 const TaskTraits& traits,
105 TaskSource* task_source,
106 const SequenceToken& token) {
107 #if BUILDFLAG(ENABLE_BASE_TRACING)
108 // Other parameters are included only when "scheduler" category is enabled.
109 const uint8_t* scheduler_category_enabled =
110 TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
111
112 if (!*scheduler_category_enabled)
113 return;
114 auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
115 ->set_thread_pool_task();
116 task->set_task_priority(TaskPriorityToProto(traits.priority()));
117 task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
118 task->set_shutdown_behavior(
119 ShutdownBehaviorToProto(traits.shutdown_behavior()));
120 if (token.IsValid())
121 task->set_sequence_token(token.ToInternalValue());
122 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
123 }
124
125 // If this is greater than 0 on a given thread, it will ignore the DCHECK which
126 // prevents posting BLOCK_SHUTDOWN tasks after shutdown. There are cases where
127 // posting back to a BLOCK_SHUTDOWN sequence is a coincidence rather than part
128 // of a shutdown blocking series of tasks, this prevents racy DCHECKs in those
129 // cases.
130 ABSL_CONST_INIT thread_local int fizzle_block_shutdown_tasks_ref = 0;
131
132 } // namespace
133
134 // Atomic internal state used by TaskTracker to track items that are blocking
135 // Shutdown. An "item" consist of either:
136 // - A running SKIP_ON_SHUTDOWN task
137 // - A queued/running BLOCK_SHUTDOWN TaskSource.
138 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
139 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
140 // made before |StartShutdown()| on the thread that invoked it).
141 class TaskTracker::State {
142 public:
143 State() = default;
144 State(const State&) = delete;
145 State& operator=(const State&) = delete;
146
147 // Sets a flag indicating that shutdown has started. Returns true if there are
148 // items blocking shutdown. Can only be called once.
StartShutdown()149 bool StartShutdown() {
150 const auto new_value =
151 subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
152
153 // Check that the "shutdown has started" bit isn't zero. This would happen
154 // if it was incremented twice.
155 DCHECK(new_value & kShutdownHasStartedMask);
156
157 const auto num_items_blocking_shutdown =
158 new_value >> kNumItemsBlockingShutdownBitOffset;
159 return num_items_blocking_shutdown != 0;
160 }
161
162 // Returns true if shutdown has started.
HasShutdownStarted() const163 bool HasShutdownStarted() const {
164 return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
165 }
166
167 // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const168 bool AreItemsBlockingShutdown() const {
169 const auto num_items_blocking_shutdown =
170 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
171 DCHECK_GE(num_items_blocking_shutdown, 0);
172 return num_items_blocking_shutdown != 0;
173 }
174
175 // Increments the number of items blocking shutdown. Returns true if
176 // shutdown has started.
IncrementNumItemsBlockingShutdown()177 bool IncrementNumItemsBlockingShutdown() {
178 #if DCHECK_IS_ON()
179 // Verify that no overflow will occur.
180 const auto num_items_blocking_shutdown =
181 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
182 DCHECK_LT(num_items_blocking_shutdown,
183 std::numeric_limits<subtle::Atomic32>::max() -
184 kNumItemsBlockingShutdownIncrement);
185 #endif
186
187 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
188 &bits_, kNumItemsBlockingShutdownIncrement);
189 return new_bits & kShutdownHasStartedMask;
190 }
191
192 // Decrements the number of items blocking shutdown. Returns true if shutdown
193 // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()194 bool DecrementNumItemsBlockingShutdown() {
195 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
196 &bits_, -kNumItemsBlockingShutdownIncrement);
197 const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
198 const auto num_items_blocking_shutdown =
199 new_bits >> kNumItemsBlockingShutdownBitOffset;
200 DCHECK_GE(num_items_blocking_shutdown, 0);
201 return shutdown_has_started && num_items_blocking_shutdown == 0;
202 }
203
204 private:
205 static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
206 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
207 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
208 1 << kNumItemsBlockingShutdownBitOffset;
209
210 // The LSB indicates whether shutdown has started. The other bits count the
211 // number of items blocking shutdown.
212 // No barriers are required to read/write |bits_| as this class is only used
213 // as an atomic state checker, it doesn't provide sequential consistency
214 // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
215 // operations themselves is guaranteed by the AtomicIncrement RMW (read-
216 // modify-write) semantics however. For example, if two threads are racing to
217 // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
218 // either the first thread will win and the StartShutdown() call will see the
219 // blocking task or the second thread will win and
220 // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
221 subtle::Atomic32 bits_ = 0;
222 };
223
TaskTracker()224 TaskTracker::TaskTracker()
225 : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
226 state_(new State),
227 can_run_policy_(CanRunPolicy::kAll),
228 flush_cv_(flush_lock_.CreateConditionVariable()),
229 shutdown_lock_(&flush_lock_),
230 tracked_ref_factory_(this) {
231 // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
232 // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
233 // idling themselves to wait on the ThreadPool.
234 flush_cv_.declare_only_used_while_idle();
235 }
236
237 TaskTracker::~TaskTracker() = default;
238
StartShutdown()239 void TaskTracker::StartShutdown() {
240 CheckedAutoLock auto_lock(shutdown_lock_);
241
242 // This method can only be called once.
243 DCHECK(!shutdown_event_);
244 DCHECK(!state_->HasShutdownStarted());
245
246 shutdown_event_ = std::make_unique<WaitableEvent>();
247
248 const bool tasks_are_blocking_shutdown = state_->StartShutdown();
249
250 // From now, if a thread causes the number of tasks blocking shutdown to
251 // become zero, it will call OnBlockingShutdownTasksComplete().
252
253 if (!tasks_are_blocking_shutdown) {
254 // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
255 // block until this method releases |shutdown_lock_|. Then, it will fail
256 // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
257 // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
258 // tasks are blocking shutdown isn't allowed.
259 shutdown_event_->Signal();
260 return;
261 }
262 }
263
CompleteShutdown()264 void TaskTracker::CompleteShutdown() {
265 // It is safe to access |shutdown_event_| without holding |lock_| because the
266 // pointer never changes after being set by StartShutdown(), which must
267 // happen-before this.
268 DCHECK(TS_UNCHECKED_READ(shutdown_event_));
269
270 {
271 base::ScopedAllowBaseSyncPrimitives allow_wait;
272 // Allow tests to wait for and introduce logging about the shutdown tasks
273 // before we block this thread.
274 BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
275 // Now block the thread until all tasks are done.
276 TS_UNCHECKED_READ(shutdown_event_)->Wait();
277 }
278
279 // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
280 // when shutdown completes.
281 {
282 CheckedAutoLock auto_lock(flush_lock_);
283 flush_cv_.Broadcast();
284 }
285 InvokeFlushCallbacksForTesting();
286 }
287
FlushForTesting()288 void TaskTracker::FlushForTesting() {
289 AssertFlushForTestingAllowed();
290 CheckedAutoLock auto_lock(flush_lock_);
291 while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
292 !IsShutdownComplete()) {
293 flush_cv_.Wait();
294 }
295 }
296
FlushAsyncForTesting(OnceClosure flush_callback)297 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
298 DCHECK(flush_callback);
299 {
300 CheckedAutoLock auto_lock(flush_lock_);
301 flush_callbacks_for_testing_.push_back(std::move(flush_callback));
302 }
303
304 if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
305 IsShutdownComplete()) {
306 InvokeFlushCallbacksForTesting();
307 }
308 }
309
SetCanRunPolicy(CanRunPolicy can_run_policy)310 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
311 can_run_policy_.store(can_run_policy);
312 }
313
WillEnqueueJob(JobTaskSource * task_source)314 void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
315 task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
316 }
317
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)318 bool TaskTracker::WillPostTask(Task* task,
319 TaskShutdownBehavior shutdown_behavior) {
320 DCHECK(task);
321 DCHECK(task->task);
322
323 task->sequence_num = sequence_nums_.GetNext();
324 if (state_->HasShutdownStarted()) {
325 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
326 // started and the task is not delayed.
327 if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
328 !task->delayed_run_time.is_null() ||
329 fizzle_block_shutdown_tasks_ref > 0) {
330 return false;
331 }
332
333 // A BLOCK_SHUTDOWN task posted after shutdown has completed is an ordering
334 // bug. This aims to catch those early. In some cases it's a racy
335 // coincidence (i.e. posting back to a BLOCK_SHUTDOWN sequence from a task
336 // that wasn't itself guaranteed to finish before shutdown), in those cases
337 // a ScopedFizzleBlockShutdownTasks can bump
338 // `fizzle_block_shutdown_tasks_ref` to bypass this DCHECK.
339 CheckedAutoLock auto_lock(shutdown_lock_);
340 DCHECK(shutdown_event_);
341 DCHECK(!shutdown_event_->IsSignaled())
342 << "posted_from: " << task->posted_from.ToString();
343 }
344
345 // TODO(scheduler-dev): Record the task traits here.
346 task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
347
348 return true;
349 }
350
WillPostTaskNow(const Task & task,TaskPriority priority) const351 bool TaskTracker::WillPostTaskNow(const Task& task,
352 TaskPriority priority) const {
353 // Delayed tasks's TaskShutdownBehavior is implicitly capped at
354 // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
355 // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
356 // delayed tasks that happen to become ripe during shutdown.
357 if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
358 return false;
359
360 if (has_log_best_effort_tasks_switch_ &&
361 priority == TaskPriority::BEST_EFFORT) {
362 // A TaskPriority::BEST_EFFORT task is being posted.
363 LOG(INFO) << task.posted_from.ToString();
364 }
365 return true;
366 }
367
RegisterTaskSource(scoped_refptr<TaskSource> task_source)368 RegisteredTaskSource TaskTracker::RegisterTaskSource(
369 scoped_refptr<TaskSource> task_source) {
370 DCHECK(task_source);
371
372 TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
373 if (!BeforeQueueTaskSource(shutdown_behavior))
374 return nullptr;
375
376 num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
377 return RegisteredTaskSource(std::move(task_source), this);
378 }
379
CanRunPriority(TaskPriority priority) const380 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
381 auto can_run_policy = can_run_policy_.load();
382
383 if (can_run_policy == CanRunPolicy::kAll)
384 return true;
385
386 if (can_run_policy == CanRunPolicy::kForegroundOnly &&
387 priority >= TaskPriority::USER_VISIBLE) {
388 return true;
389 }
390
391 return false;
392 }
393
RunAndPopNextTask(RegisteredTaskSource task_source)394 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
395 RegisteredTaskSource task_source) {
396 DCHECK(task_source);
397
398 const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
399
400 // Run the next task in |task_source|.
401 std::optional<Task> task;
402 TaskTraits traits;
403 {
404 auto transaction = task_source->BeginTransaction();
405 task = should_run_tasks ? task_source.TakeTask(&transaction)
406 : task_source.Clear(&transaction);
407 traits = transaction.traits();
408 }
409
410 if (task) {
411 // Skip delayed tasks if shutdown started.
412 if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
413 task->task = base::DoNothingWithBoundArgs(std::move(task->task));
414
415 // Run the |task| (whether it's a worker task or the Clear() closure).
416 RunTask(std::move(task.value()), task_source.get(), traits);
417 }
418 if (should_run_tasks)
419 AfterRunTask(task_source->shutdown_behavior());
420
421 const bool task_source_must_be_queued = task_source.DidProcessTask();
422 // |task_source| should be reenqueued iff requested by DidProcessTask().
423 if (task_source_must_be_queued)
424 return task_source;
425 return nullptr;
426 }
427
HasShutdownStarted() const428 bool TaskTracker::HasShutdownStarted() const {
429 return state_->HasShutdownStarted();
430 }
431
IsShutdownComplete() const432 bool TaskTracker::IsShutdownComplete() const {
433 CheckedAutoLock auto_lock(shutdown_lock_);
434 return shutdown_event_ && shutdown_event_->IsSignaled();
435 }
436
BeginFizzlingBlockShutdownTasks()437 void TaskTracker::BeginFizzlingBlockShutdownTasks() {
438 ++fizzle_block_shutdown_tasks_ref;
439 }
440
EndFizzlingBlockShutdownTasks()441 void TaskTracker::EndFizzlingBlockShutdownTasks() {
442 CHECK_GE(--fizzle_block_shutdown_tasks_ref, 0);
443 }
444
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)445 void TaskTracker::RunTask(Task task,
446 TaskSource* task_source,
447 const TaskTraits& traits) {
448 DCHECK(task_source);
449
450 const auto environment = task_source->GetExecutionEnvironment();
451
452 struct BlockShutdownTaskFizzler {
453 BlockShutdownTaskFizzler() {
454 // Nothing outside RunTask should be bumping
455 // `fizzle_block_shutdown_tasks_ref`.
456 DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
457 ++fizzle_block_shutdown_tasks_ref;
458 }
459 ~BlockShutdownTaskFizzler() {
460 --fizzle_block_shutdown_tasks_ref;
461 // The refs should be balanced after running the task.
462 DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
463 }
464 };
465 std::optional<ScopedDisallowSingleton> disallow_singleton;
466 std::optional<ScopedDisallowBlocking> disallow_blocking;
467 std::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
468 std::optional<BlockShutdownTaskFizzler> fizzle_block_shutdown_tasks;
469 if (traits.shutdown_behavior() ==
470 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
471 disallow_singleton.emplace();
472 fizzle_block_shutdown_tasks.emplace();
473 }
474 if (!traits.may_block())
475 disallow_blocking.emplace();
476 if (!traits.with_base_sync_primitives())
477 disallow_sync_primitives.emplace();
478
479 {
480 DCHECK(environment.token.IsValid());
481 TaskScope task_scope(environment.token,
482 /* is_thread_bound=*/task_source->execution_mode() ==
483 TaskSourceExecutionMode::kSingleThread);
484 ScopedSetTaskPriorityForCurrentThread
485 scoped_set_task_priority_for_current_thread(traits.priority());
486
487 // Local storage map used if none is provided by |environment|.
488 std::optional<SequenceLocalStorageMap> local_storage_map;
489 if (!environment.sequence_local_storage)
490 local_storage_map.emplace();
491
492 ScopedSetSequenceLocalStorageMapForCurrentThread
493 scoped_set_sequence_local_storage_map_for_current_thread(
494 environment.sequence_local_storage
495 ? environment.sequence_local_storage
496 : &local_storage_map.value());
497
498 // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
499 // task.
500 std::optional<SequencedTaskRunner::CurrentDefaultHandle>
501 sequenced_task_runner_current_default_handle;
502 std::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
503 single_thread_task_runner_current_default_handle;
504 if (environment.sequenced_task_runner) {
505 DCHECK_EQ(TaskSourceExecutionMode::kSequenced,
506 task_source->execution_mode());
507 sequenced_task_runner_current_default_handle.emplace(
508 environment.sequenced_task_runner);
509 } else if (environment.single_thread_task_runner) {
510 DCHECK_EQ(TaskSourceExecutionMode::kSingleThread,
511 task_source->execution_mode());
512 single_thread_task_runner_current_default_handle.emplace(
513 environment.single_thread_task_runner);
514 } else {
515 DCHECK_NE(TaskSourceExecutionMode::kSequenced,
516 task_source->execution_mode());
517 DCHECK_NE(TaskSourceExecutionMode::kSingleThread,
518 task_source->execution_mode());
519 }
520
521 RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
522
523 // Make sure the arguments bound to the callback are deleted within the
524 // scope in which the callback runs.
525 task.task = OnceClosure();
526 }
527 }
528
BeginCompleteShutdown(base::WaitableEvent & shutdown_event)529 void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
530 // Do nothing in production, tests may override this.
531 }
532
HasIncompleteTaskSourcesForTesting() const533 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
534 return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
535 }
536
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)537 bool TaskTracker::BeforeQueueTaskSource(
538 TaskShutdownBehavior shutdown_behavior) {
539 if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
540 // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
541 // queued and the moment their last task completes its execution.
542 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
543
544 if (shutdown_started) {
545 // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
546 // ordering bug. This aims to catch those early.
547 CheckedAutoLock auto_lock(shutdown_lock_);
548 DCHECK(shutdown_event_);
549 DCHECK(!shutdown_event_->IsSignaled());
550 }
551
552 return true;
553 }
554
555 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
556 // started.
557 return !state_->HasShutdownStarted();
558 }
559
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)560 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
561 switch (shutdown_behavior) {
562 case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
563 // The number of tasks blocking shutdown has been incremented when the
564 // task was posted.
565 DCHECK(state_->AreItemsBlockingShutdown());
566
567 // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
568 // unexpected as it either shouldn't have been posted if shutdown
569 // completed or should be blocking shutdown if it was posted before it
570 // did.
571 DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
572
573 return true;
574 }
575
576 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
577 // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
578 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
579
580 if (shutdown_started) {
581 // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
582 // Decrement the number of tasks blocking shutdown that was wrongly
583 // incremented.
584 DecrementNumItemsBlockingShutdown();
585 return false;
586 }
587
588 return true;
589 }
590
591 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
592 return !state_->HasShutdownStarted();
593 }
594 }
595
596 NOTREACHED();
597 return false;
598 }
599
AfterRunTask(TaskShutdownBehavior shutdown_behavior)600 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
601 if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
602 DecrementNumItemsBlockingShutdown();
603 }
604 }
605
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)606 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
607 scoped_refptr<TaskSource> task_source) {
608 DCHECK(task_source);
609 if (task_source->shutdown_behavior() ==
610 TaskShutdownBehavior::BLOCK_SHUTDOWN) {
611 DecrementNumItemsBlockingShutdown();
612 }
613 DecrementNumIncompleteTaskSources();
614 return task_source;
615 }
616
DecrementNumItemsBlockingShutdown()617 void TaskTracker::DecrementNumItemsBlockingShutdown() {
618 const bool shutdown_started_and_no_items_block_shutdown =
619 state_->DecrementNumItemsBlockingShutdown();
620 if (!shutdown_started_and_no_items_block_shutdown)
621 return;
622
623 CheckedAutoLock auto_lock(shutdown_lock_);
624 DCHECK(shutdown_event_);
625 shutdown_event_->Signal();
626 }
627
DecrementNumIncompleteTaskSources()628 void TaskTracker::DecrementNumIncompleteTaskSources() {
629 const auto prev_num_incomplete_task_sources =
630 num_incomplete_task_sources_.fetch_sub(1);
631 DCHECK_GE(prev_num_incomplete_task_sources, 1);
632 if (prev_num_incomplete_task_sources == 1) {
633 {
634 CheckedAutoLock auto_lock(flush_lock_);
635 flush_cv_.Broadcast();
636 }
637 InvokeFlushCallbacksForTesting();
638 }
639 }
640
InvokeFlushCallbacksForTesting()641 void TaskTracker::InvokeFlushCallbacksForTesting() {
642 base::circular_deque<OnceClosure> flush_callbacks;
643 {
644 CheckedAutoLock auto_lock(flush_lock_);
645 flush_callbacks = std::move(flush_callbacks_for_testing_);
646 }
647 for (auto& flush_callback : flush_callbacks)
648 std::move(flush_callback).Run();
649 }
650
RunContinueOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)651 NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
652 const TaskTraits& traits,
653 TaskSource* task_source,
654 const SequenceToken& token) {
655 NO_CODE_FOLDING();
656 RunTaskImpl(task, traits, task_source, token);
657 }
658
RunSkipOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)659 NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
660 const TaskTraits& traits,
661 TaskSource* task_source,
662 const SequenceToken& token) {
663 NO_CODE_FOLDING();
664 RunTaskImpl(task, traits, task_source, token);
665 }
666
RunBlockShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)667 NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
668 const TaskTraits& traits,
669 TaskSource* task_source,
670 const SequenceToken& token) {
671 NO_CODE_FOLDING();
672 RunTaskImpl(task, traits, task_source, token);
673 }
674
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)675 void TaskTracker::RunTaskImpl(Task& task,
676 const TaskTraits& traits,
677 TaskSource* task_source,
678 const SequenceToken& token) {
679 task_annotator_.RunTask(
680 "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
681 EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
682 });
683 }
684
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)685 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
686 const TaskTraits& traits,
687 TaskSource* task_source,
688 const SequenceToken& token) {
689 switch (traits.shutdown_behavior()) {
690 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
691 RunContinueOnShutdown(task, traits, task_source, token);
692 return;
693 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
694 RunSkipOnShutdown(task, traits, task_source, token);
695 return;
696 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
697 RunBlockShutdown(task, traits, task_source, token);
698 return;
699 }
700 }
701
702 } // namespace internal
703 } // namespace base
704