xref: /aosp_15_r20/external/cronet/base/task/thread_pool/task_tracker.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/task_tracker.h"
6 
7 #include <atomic>
8 #include <optional>
9 #include <string>
10 #include <utility>
11 
12 #include "base/base_switches.h"
13 #include "base/command_line.h"
14 #include "base/compiler_specific.h"
15 #include "base/debug/alias.h"
16 #include "base/functional/callback.h"
17 #include "base/json/json_writer.h"
18 #include "base/logging.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/notreached.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/string_util.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/waitable_event.h"
26 #include "base/task/scoped_set_task_priority_for_current_thread.h"
27 #include "base/task/sequenced_task_runner.h"
28 #include "base/task/single_thread_task_runner.h"
29 #include "base/task/thread_pool/job_task_source.h"
30 #include "base/task/thread_pool/task_source.h"
31 #include "base/threading/sequence_local_storage_map.h"
32 #include "base/threading/thread_restrictions.h"
33 #include "base/time/time.h"
34 #include "base/trace_event/base_tracing.h"
35 #include "base/values.h"
36 #include "build/build_config.h"
37 #include "third_party/abseil-cpp/absl/base/attributes.h"
38 
39 namespace base {
40 namespace internal {
41 
42 namespace {
43 
44 #if BUILDFLAG(ENABLE_BASE_TRACING)
45 using perfetto::protos::pbzero::ChromeThreadPoolTask;
46 using perfetto::protos::pbzero::ChromeTrackEvent;
47 #endif  // BUILDFLAG(ENABLE_BASE_TRACING)
48 
49 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
50                                                 "single thread", "job"};
51 static_assert(
52     std::size(kExecutionModeString) ==
53         static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
54     "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
55 
HasLogBestEffortTasksSwitch()56 bool HasLogBestEffortTasksSwitch() {
57   // The CommandLine might not be initialized if ThreadPool is initialized in a
58   // dynamic library which doesn't have access to argc/argv.
59   return CommandLine::InitializedForCurrentProcess() &&
60          CommandLine::ForCurrentProcess()->HasSwitch(
61              switches::kLogBestEffortTasks);
62 }
63 
64 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)65 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
66   switch (priority) {
67     case TaskPriority::BEST_EFFORT:
68       return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
69     case TaskPriority::USER_VISIBLE:
70       return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
71     case TaskPriority::USER_BLOCKING:
72       return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
73   }
74 }
75 
ExecutionModeToProto(TaskSourceExecutionMode mode)76 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
77     TaskSourceExecutionMode mode) {
78   switch (mode) {
79     case TaskSourceExecutionMode::kParallel:
80       return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
81     case TaskSourceExecutionMode::kSequenced:
82       return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
83     case TaskSourceExecutionMode::kSingleThread:
84       return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
85     case TaskSourceExecutionMode::kJob:
86       return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
87   }
88 }
89 
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)90 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
91     TaskShutdownBehavior shutdown_behavior) {
92   switch (shutdown_behavior) {
93     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
94       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
95     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
96       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
97     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
98       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
99   }
100 }
101 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
102 
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)103 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
104                                       const TaskTraits& traits,
105                                       TaskSource* task_source,
106                                       const SequenceToken& token) {
107 #if BUILDFLAG(ENABLE_BASE_TRACING)
108   // Other parameters are included only when "scheduler" category is enabled.
109   const uint8_t* scheduler_category_enabled =
110       TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
111 
112   if (!*scheduler_category_enabled)
113     return;
114   auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
115                    ->set_thread_pool_task();
116   task->set_task_priority(TaskPriorityToProto(traits.priority()));
117   task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
118   task->set_shutdown_behavior(
119       ShutdownBehaviorToProto(traits.shutdown_behavior()));
120   if (token.IsValid())
121     task->set_sequence_token(token.ToInternalValue());
122 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
123 }
124 
125 // If this is greater than 0 on a given thread, it will ignore the DCHECK which
126 // prevents posting BLOCK_SHUTDOWN tasks after shutdown. There are cases where
127 // posting back to a BLOCK_SHUTDOWN sequence is a coincidence rather than part
128 // of a shutdown blocking series of tasks, this prevents racy DCHECKs in those
129 // cases.
130 ABSL_CONST_INIT thread_local int fizzle_block_shutdown_tasks_ref = 0;
131 
132 }  // namespace
133 
134 // Atomic internal state used by TaskTracker to track items that are blocking
135 // Shutdown. An "item" consist of either:
136 // - A running SKIP_ON_SHUTDOWN task
137 // - A queued/running BLOCK_SHUTDOWN TaskSource.
138 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
139 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
140 // made before |StartShutdown()| on the thread that invoked it).
141 class TaskTracker::State {
142  public:
143   State() = default;
144   State(const State&) = delete;
145   State& operator=(const State&) = delete;
146 
147   // Sets a flag indicating that shutdown has started. Returns true if there are
148   // items blocking shutdown. Can only be called once.
StartShutdown()149   bool StartShutdown() {
150     const auto new_value =
151         subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
152 
153     // Check that the "shutdown has started" bit isn't zero. This would happen
154     // if it was incremented twice.
155     DCHECK(new_value & kShutdownHasStartedMask);
156 
157     const auto num_items_blocking_shutdown =
158         new_value >> kNumItemsBlockingShutdownBitOffset;
159     return num_items_blocking_shutdown != 0;
160   }
161 
162   // Returns true if shutdown has started.
HasShutdownStarted() const163   bool HasShutdownStarted() const {
164     return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
165   }
166 
167   // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const168   bool AreItemsBlockingShutdown() const {
169     const auto num_items_blocking_shutdown =
170         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
171     DCHECK_GE(num_items_blocking_shutdown, 0);
172     return num_items_blocking_shutdown != 0;
173   }
174 
175   // Increments the number of items blocking shutdown. Returns true if
176   // shutdown has started.
IncrementNumItemsBlockingShutdown()177   bool IncrementNumItemsBlockingShutdown() {
178 #if DCHECK_IS_ON()
179     // Verify that no overflow will occur.
180     const auto num_items_blocking_shutdown =
181         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
182     DCHECK_LT(num_items_blocking_shutdown,
183               std::numeric_limits<subtle::Atomic32>::max() -
184                   kNumItemsBlockingShutdownIncrement);
185 #endif
186 
187     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
188         &bits_, kNumItemsBlockingShutdownIncrement);
189     return new_bits & kShutdownHasStartedMask;
190   }
191 
192   // Decrements the number of items blocking shutdown. Returns true if shutdown
193   // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()194   bool DecrementNumItemsBlockingShutdown() {
195     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
196         &bits_, -kNumItemsBlockingShutdownIncrement);
197     const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
198     const auto num_items_blocking_shutdown =
199         new_bits >> kNumItemsBlockingShutdownBitOffset;
200     DCHECK_GE(num_items_blocking_shutdown, 0);
201     return shutdown_has_started && num_items_blocking_shutdown == 0;
202   }
203 
204  private:
205   static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
206   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
207   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
208       1 << kNumItemsBlockingShutdownBitOffset;
209 
210   // The LSB indicates whether shutdown has started. The other bits count the
211   // number of items blocking shutdown.
212   // No barriers are required to read/write |bits_| as this class is only used
213   // as an atomic state checker, it doesn't provide sequential consistency
214   // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
215   // operations themselves is guaranteed by the AtomicIncrement RMW (read-
216   // modify-write) semantics however. For example, if two threads are racing to
217   // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
218   // either the first thread will win and the StartShutdown() call will see the
219   // blocking task or the second thread will win and
220   // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
221   subtle::Atomic32 bits_ = 0;
222 };
223 
TaskTracker()224 TaskTracker::TaskTracker()
225     : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
226       state_(new State),
227       can_run_policy_(CanRunPolicy::kAll),
228       flush_cv_(flush_lock_.CreateConditionVariable()),
229       shutdown_lock_(&flush_lock_),
230       tracked_ref_factory_(this) {
231   // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
232   // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
233   // idling themselves to wait on the ThreadPool.
234   flush_cv_.declare_only_used_while_idle();
235 }
236 
237 TaskTracker::~TaskTracker() = default;
238 
StartShutdown()239 void TaskTracker::StartShutdown() {
240   CheckedAutoLock auto_lock(shutdown_lock_);
241 
242   // This method can only be called once.
243   DCHECK(!shutdown_event_);
244   DCHECK(!state_->HasShutdownStarted());
245 
246   shutdown_event_ = std::make_unique<WaitableEvent>();
247 
248   const bool tasks_are_blocking_shutdown = state_->StartShutdown();
249 
250   // From now, if a thread causes the number of tasks blocking shutdown to
251   // become zero, it will call OnBlockingShutdownTasksComplete().
252 
253   if (!tasks_are_blocking_shutdown) {
254     // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
255     // block until this method releases |shutdown_lock_|. Then, it will fail
256     // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
257     // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
258     // tasks are blocking shutdown isn't allowed.
259     shutdown_event_->Signal();
260     return;
261   }
262 }
263 
CompleteShutdown()264 void TaskTracker::CompleteShutdown() {
265   // It is safe to access |shutdown_event_| without holding |lock_| because the
266   // pointer never changes after being set by StartShutdown(), which must
267   // happen-before this.
268   DCHECK(TS_UNCHECKED_READ(shutdown_event_));
269 
270   {
271     base::ScopedAllowBaseSyncPrimitives allow_wait;
272     // Allow tests to wait for and introduce logging about the shutdown tasks
273     // before we block this thread.
274     BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
275     // Now block the thread until all tasks are done.
276     TS_UNCHECKED_READ(shutdown_event_)->Wait();
277   }
278 
279   // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
280   // when shutdown completes.
281   {
282     CheckedAutoLock auto_lock(flush_lock_);
283     flush_cv_.Broadcast();
284   }
285   InvokeFlushCallbacksForTesting();
286 }
287 
FlushForTesting()288 void TaskTracker::FlushForTesting() {
289   AssertFlushForTestingAllowed();
290   CheckedAutoLock auto_lock(flush_lock_);
291   while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
292          !IsShutdownComplete()) {
293     flush_cv_.Wait();
294   }
295 }
296 
FlushAsyncForTesting(OnceClosure flush_callback)297 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
298   DCHECK(flush_callback);
299   {
300     CheckedAutoLock auto_lock(flush_lock_);
301     flush_callbacks_for_testing_.push_back(std::move(flush_callback));
302   }
303 
304   if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
305       IsShutdownComplete()) {
306     InvokeFlushCallbacksForTesting();
307   }
308 }
309 
SetCanRunPolicy(CanRunPolicy can_run_policy)310 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
311   can_run_policy_.store(can_run_policy);
312 }
313 
WillEnqueueJob(JobTaskSource * task_source)314 void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
315   task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
316 }
317 
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)318 bool TaskTracker::WillPostTask(Task* task,
319                                TaskShutdownBehavior shutdown_behavior) {
320   DCHECK(task);
321   DCHECK(task->task);
322 
323   task->sequence_num = sequence_nums_.GetNext();
324   if (state_->HasShutdownStarted()) {
325     // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
326     // started and the task is not delayed.
327     if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
328         !task->delayed_run_time.is_null() ||
329         fizzle_block_shutdown_tasks_ref > 0) {
330       return false;
331     }
332 
333     // A BLOCK_SHUTDOWN task posted after shutdown has completed is an ordering
334     // bug. This aims to catch those early. In some cases it's a racy
335     // coincidence (i.e. posting back to a BLOCK_SHUTDOWN sequence from a task
336     // that wasn't itself guaranteed to finish before shutdown), in those cases
337     // a ScopedFizzleBlockShutdownTasks can bump
338     // `fizzle_block_shutdown_tasks_ref` to bypass this DCHECK.
339     CheckedAutoLock auto_lock(shutdown_lock_);
340     DCHECK(shutdown_event_);
341     DCHECK(!shutdown_event_->IsSignaled())
342         << "posted_from: " << task->posted_from.ToString();
343   }
344 
345   // TODO(scheduler-dev): Record the task traits here.
346   task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
347 
348   return true;
349 }
350 
WillPostTaskNow(const Task & task,TaskPriority priority) const351 bool TaskTracker::WillPostTaskNow(const Task& task,
352                                   TaskPriority priority) const {
353   // Delayed tasks's TaskShutdownBehavior is implicitly capped at
354   // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
355   // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
356   // delayed tasks that happen to become ripe during shutdown.
357   if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
358     return false;
359 
360   if (has_log_best_effort_tasks_switch_ &&
361       priority == TaskPriority::BEST_EFFORT) {
362     // A TaskPriority::BEST_EFFORT task is being posted.
363     LOG(INFO) << task.posted_from.ToString();
364   }
365   return true;
366 }
367 
RegisterTaskSource(scoped_refptr<TaskSource> task_source)368 RegisteredTaskSource TaskTracker::RegisterTaskSource(
369     scoped_refptr<TaskSource> task_source) {
370   DCHECK(task_source);
371 
372   TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
373   if (!BeforeQueueTaskSource(shutdown_behavior))
374     return nullptr;
375 
376   num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
377   return RegisteredTaskSource(std::move(task_source), this);
378 }
379 
CanRunPriority(TaskPriority priority) const380 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
381   auto can_run_policy = can_run_policy_.load();
382 
383   if (can_run_policy == CanRunPolicy::kAll)
384     return true;
385 
386   if (can_run_policy == CanRunPolicy::kForegroundOnly &&
387       priority >= TaskPriority::USER_VISIBLE) {
388     return true;
389   }
390 
391   return false;
392 }
393 
RunAndPopNextTask(RegisteredTaskSource task_source)394 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
395     RegisteredTaskSource task_source) {
396   DCHECK(task_source);
397 
398   const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
399 
400   // Run the next task in |task_source|.
401   std::optional<Task> task;
402   TaskTraits traits;
403   {
404     auto transaction = task_source->BeginTransaction();
405     task = should_run_tasks ? task_source.TakeTask(&transaction)
406                             : task_source.Clear(&transaction);
407     traits = transaction.traits();
408   }
409 
410   if (task) {
411     // Skip delayed tasks if shutdown started.
412     if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
413       task->task = base::DoNothingWithBoundArgs(std::move(task->task));
414 
415     // Run the |task| (whether it's a worker task or the Clear() closure).
416     RunTask(std::move(task.value()), task_source.get(), traits);
417   }
418   if (should_run_tasks)
419     AfterRunTask(task_source->shutdown_behavior());
420 
421   const bool task_source_must_be_queued = task_source.DidProcessTask();
422   // |task_source| should be reenqueued iff requested by DidProcessTask().
423   if (task_source_must_be_queued)
424     return task_source;
425   return nullptr;
426 }
427 
HasShutdownStarted() const428 bool TaskTracker::HasShutdownStarted() const {
429   return state_->HasShutdownStarted();
430 }
431 
IsShutdownComplete() const432 bool TaskTracker::IsShutdownComplete() const {
433   CheckedAutoLock auto_lock(shutdown_lock_);
434   return shutdown_event_ && shutdown_event_->IsSignaled();
435 }
436 
BeginFizzlingBlockShutdownTasks()437 void TaskTracker::BeginFizzlingBlockShutdownTasks() {
438   ++fizzle_block_shutdown_tasks_ref;
439 }
440 
EndFizzlingBlockShutdownTasks()441 void TaskTracker::EndFizzlingBlockShutdownTasks() {
442   CHECK_GE(--fizzle_block_shutdown_tasks_ref, 0);
443 }
444 
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)445 void TaskTracker::RunTask(Task task,
446                           TaskSource* task_source,
447                           const TaskTraits& traits) {
448   DCHECK(task_source);
449 
450   const auto environment = task_source->GetExecutionEnvironment();
451 
452   struct BlockShutdownTaskFizzler {
453     BlockShutdownTaskFizzler() {
454       // Nothing outside RunTask should be bumping
455       // `fizzle_block_shutdown_tasks_ref`.
456       DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
457       ++fizzle_block_shutdown_tasks_ref;
458     }
459     ~BlockShutdownTaskFizzler() {
460       --fizzle_block_shutdown_tasks_ref;
461       // The refs should be balanced after running the task.
462       DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
463     }
464   };
465   std::optional<ScopedDisallowSingleton> disallow_singleton;
466   std::optional<ScopedDisallowBlocking> disallow_blocking;
467   std::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
468   std::optional<BlockShutdownTaskFizzler> fizzle_block_shutdown_tasks;
469   if (traits.shutdown_behavior() ==
470       TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
471     disallow_singleton.emplace();
472     fizzle_block_shutdown_tasks.emplace();
473   }
474   if (!traits.may_block())
475     disallow_blocking.emplace();
476   if (!traits.with_base_sync_primitives())
477     disallow_sync_primitives.emplace();
478 
479   {
480     DCHECK(environment.token.IsValid());
481     TaskScope task_scope(environment.token,
482                          /* is_thread_bound=*/task_source->execution_mode() ==
483                              TaskSourceExecutionMode::kSingleThread);
484     ScopedSetTaskPriorityForCurrentThread
485         scoped_set_task_priority_for_current_thread(traits.priority());
486 
487     // Local storage map used if none is provided by |environment|.
488     std::optional<SequenceLocalStorageMap> local_storage_map;
489     if (!environment.sequence_local_storage)
490       local_storage_map.emplace();
491 
492     ScopedSetSequenceLocalStorageMapForCurrentThread
493         scoped_set_sequence_local_storage_map_for_current_thread(
494             environment.sequence_local_storage
495                 ? environment.sequence_local_storage
496                 : &local_storage_map.value());
497 
498     // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
499     // task.
500     std::optional<SequencedTaskRunner::CurrentDefaultHandle>
501         sequenced_task_runner_current_default_handle;
502     std::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
503         single_thread_task_runner_current_default_handle;
504     if (environment.sequenced_task_runner) {
505       DCHECK_EQ(TaskSourceExecutionMode::kSequenced,
506                 task_source->execution_mode());
507       sequenced_task_runner_current_default_handle.emplace(
508           environment.sequenced_task_runner);
509     } else if (environment.single_thread_task_runner) {
510       DCHECK_EQ(TaskSourceExecutionMode::kSingleThread,
511                 task_source->execution_mode());
512       single_thread_task_runner_current_default_handle.emplace(
513           environment.single_thread_task_runner);
514     } else {
515       DCHECK_NE(TaskSourceExecutionMode::kSequenced,
516                 task_source->execution_mode());
517       DCHECK_NE(TaskSourceExecutionMode::kSingleThread,
518                 task_source->execution_mode());
519     }
520 
521     RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
522 
523     // Make sure the arguments bound to the callback are deleted within the
524     // scope in which the callback runs.
525     task.task = OnceClosure();
526   }
527 }
528 
BeginCompleteShutdown(base::WaitableEvent & shutdown_event)529 void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
530   // Do nothing in production, tests may override this.
531 }
532 
HasIncompleteTaskSourcesForTesting() const533 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
534   return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
535 }
536 
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)537 bool TaskTracker::BeforeQueueTaskSource(
538     TaskShutdownBehavior shutdown_behavior) {
539   if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
540     // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
541     // queued and the moment their last task completes its execution.
542     const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
543 
544     if (shutdown_started) {
545       // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
546       // ordering bug. This aims to catch those early.
547       CheckedAutoLock auto_lock(shutdown_lock_);
548       DCHECK(shutdown_event_);
549       DCHECK(!shutdown_event_->IsSignaled());
550     }
551 
552     return true;
553   }
554 
555   // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
556   // started.
557   return !state_->HasShutdownStarted();
558 }
559 
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)560 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
561   switch (shutdown_behavior) {
562     case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
563       // The number of tasks blocking shutdown has been incremented when the
564       // task was posted.
565       DCHECK(state_->AreItemsBlockingShutdown());
566 
567       // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
568       // unexpected as it either shouldn't have been posted if shutdown
569       // completed or should be blocking shutdown if it was posted before it
570       // did.
571       DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
572 
573       return true;
574     }
575 
576     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
577       // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
578       const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
579 
580       if (shutdown_started) {
581         // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
582         // Decrement the number of tasks blocking shutdown that was wrongly
583         // incremented.
584         DecrementNumItemsBlockingShutdown();
585         return false;
586       }
587 
588       return true;
589     }
590 
591     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
592       return !state_->HasShutdownStarted();
593     }
594   }
595 
596   NOTREACHED();
597   return false;
598 }
599 
AfterRunTask(TaskShutdownBehavior shutdown_behavior)600 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
601   if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
602     DecrementNumItemsBlockingShutdown();
603   }
604 }
605 
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)606 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
607     scoped_refptr<TaskSource> task_source) {
608   DCHECK(task_source);
609   if (task_source->shutdown_behavior() ==
610       TaskShutdownBehavior::BLOCK_SHUTDOWN) {
611     DecrementNumItemsBlockingShutdown();
612   }
613   DecrementNumIncompleteTaskSources();
614   return task_source;
615 }
616 
DecrementNumItemsBlockingShutdown()617 void TaskTracker::DecrementNumItemsBlockingShutdown() {
618   const bool shutdown_started_and_no_items_block_shutdown =
619       state_->DecrementNumItemsBlockingShutdown();
620   if (!shutdown_started_and_no_items_block_shutdown)
621     return;
622 
623   CheckedAutoLock auto_lock(shutdown_lock_);
624   DCHECK(shutdown_event_);
625   shutdown_event_->Signal();
626 }
627 
DecrementNumIncompleteTaskSources()628 void TaskTracker::DecrementNumIncompleteTaskSources() {
629   const auto prev_num_incomplete_task_sources =
630       num_incomplete_task_sources_.fetch_sub(1);
631   DCHECK_GE(prev_num_incomplete_task_sources, 1);
632   if (prev_num_incomplete_task_sources == 1) {
633     {
634       CheckedAutoLock auto_lock(flush_lock_);
635       flush_cv_.Broadcast();
636     }
637     InvokeFlushCallbacksForTesting();
638   }
639 }
640 
InvokeFlushCallbacksForTesting()641 void TaskTracker::InvokeFlushCallbacksForTesting() {
642   base::circular_deque<OnceClosure> flush_callbacks;
643   {
644     CheckedAutoLock auto_lock(flush_lock_);
645     flush_callbacks = std::move(flush_callbacks_for_testing_);
646   }
647   for (auto& flush_callback : flush_callbacks)
648     std::move(flush_callback).Run();
649 }
650 
RunContinueOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)651 NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
652                                                  const TaskTraits& traits,
653                                                  TaskSource* task_source,
654                                                  const SequenceToken& token) {
655   NO_CODE_FOLDING();
656   RunTaskImpl(task, traits, task_source, token);
657 }
658 
RunSkipOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)659 NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
660                                              const TaskTraits& traits,
661                                              TaskSource* task_source,
662                                              const SequenceToken& token) {
663   NO_CODE_FOLDING();
664   RunTaskImpl(task, traits, task_source, token);
665 }
666 
RunBlockShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)667 NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
668                                             const TaskTraits& traits,
669                                             TaskSource* task_source,
670                                             const SequenceToken& token) {
671   NO_CODE_FOLDING();
672   RunTaskImpl(task, traits, task_source, token);
673 }
674 
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)675 void TaskTracker::RunTaskImpl(Task& task,
676                               const TaskTraits& traits,
677                               TaskSource* task_source,
678                               const SequenceToken& token) {
679   task_annotator_.RunTask(
680       "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
681         EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
682       });
683 }
684 
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)685 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
686                                               const TaskTraits& traits,
687                                               TaskSource* task_source,
688                                               const SequenceToken& token) {
689   switch (traits.shutdown_behavior()) {
690     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
691       RunContinueOnShutdown(task, traits, task_source, token);
692       return;
693     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
694       RunSkipOnShutdown(task, traits, task_source, token);
695       return;
696     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
697       RunBlockShutdown(task, traits, task_source, token);
698       return;
699   }
700 }
701 
702 }  // namespace internal
703 }  // namespace base
704