xref: /aosp_15_r20/external/cronet/base/task/thread_pool/pooled_single_thread_task_runner_manager.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
6 
7 #include <memory>
8 #include <string>
9 #include <utility>
10 
11 #include "base/check.h"
12 #include "base/debug/leak_annotations.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/memory/raw_ptr.h"
17 #include "base/message_loop/message_pump.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/strings/stringprintf.h"
20 #include "base/synchronization/atomic_flag.h"
21 #include "base/task/default_delayed_task_handle_delegate.h"
22 #include "base/task/single_thread_task_runner.h"
23 #include "base/task/task_traits.h"
24 #include "base/task/thread_pool/delayed_task_manager.h"
25 #include "base/task/thread_pool/priority_queue.h"
26 #include "base/task/thread_pool/sequence.h"
27 #include "base/task/thread_pool/task.h"
28 #include "base/task/thread_pool/task_source.h"
29 #include "base/task/thread_pool/task_tracker.h"
30 #include "base/task/thread_pool/worker_thread_waitable_event.h"
31 #include "base/threading/platform_thread.h"
32 #include "base/time/time.h"
33 #include "build/build_config.h"
34 
35 #if BUILDFLAG(IS_WIN)
36 #include <windows.h>
37 
38 #include "base/win/scoped_com_initializer.h"
39 #endif  // BUILDFLAG(IS_WIN)
40 
41 namespace base {
42 namespace internal {
43 
44 namespace {
45 
46 // Boolean indicating whether there's a PooledSingleThreadTaskRunnerManager
47 // instance alive in this process. This variable should only be set when the
48 // PooledSingleThreadTaskRunnerManager instance is brought up (on the main
49 // thread; before any tasks are posted) and decremented when the instance is
50 // brought down (i.e., only when unit tests tear down the task environment and
51 // never in production). This makes the variable const while worker threads are
52 // up and as such it doesn't need to be atomic. It is used to tell when a task
53 // is posted from the main thread after the task environment was brought down in
54 // unit tests so that PooledSingleThreadTaskRunnerManager bound TaskRunners
55 // can return false on PostTask, letting such callers know they should complete
56 // necessary work synchronously. Note: |!g_manager_is_alive| is generally
57 // equivalent to |!ThreadPoolInstance::Get()| but has the advantage of being
58 // valid in thread_pool unit tests that don't instantiate a full
59 // thread pool.
60 bool g_manager_is_alive = false;
61 
62 bool g_use_utility_thread_group = false;
63 
GetEnvironmentIndexForTraits(const TaskTraits & traits)64 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) {
65   const bool is_background =
66       traits.priority() == TaskPriority::BEST_EFFORT &&
67       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
68       CanUseBackgroundThreadTypeForWorkerThread();
69   const bool is_utility =
70       !is_background && traits.priority() <= TaskPriority::USER_VISIBLE &&
71       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
72       g_use_utility_thread_group;
73   if (traits.may_block() || traits.with_base_sync_primitives()) {
74     return is_background ? BACKGROUND_BLOCKING
75            : is_utility  ? UTILITY_BLOCKING
76                          : FOREGROUND_BLOCKING;
77   }
78   return is_background ? BACKGROUND : is_utility ? UTILITY : FOREGROUND;
79 }
80 
81 // Allows for checking the PlatformThread::CurrentRef() against a set
82 // PlatformThreadRef atomically without using locks.
83 class AtomicThreadRefChecker {
84  public:
85   AtomicThreadRefChecker() = default;
86   AtomicThreadRefChecker(const AtomicThreadRefChecker&) = delete;
87   AtomicThreadRefChecker& operator=(const AtomicThreadRefChecker&) = delete;
88   ~AtomicThreadRefChecker() = default;
89 
Set()90   void Set() {
91     thread_ref_ = PlatformThread::CurrentRef();
92     is_set_.Set();
93   }
94 
IsCurrentThreadSameAsSetThread()95   bool IsCurrentThreadSameAsSetThread() {
96     return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
97   }
98 
99  private:
100   AtomicFlag is_set_;
101   PlatformThreadRef thread_ref_;
102 };
103 
104 class WorkerThreadDelegate : public WorkerThreadWaitableEvent::Delegate {
105  public:
WorkerThreadDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)106   WorkerThreadDelegate(const std::string& thread_name,
107                        WorkerThread::ThreadLabel thread_label,
108                        TrackedRef<TaskTracker> task_tracker)
109       : task_tracker_(std::move(task_tracker)),
110         thread_name_(thread_name),
111         thread_label_(thread_label) {}
112   WorkerThreadDelegate(const WorkerThreadDelegate&) = delete;
113   WorkerThreadDelegate& operator=(const WorkerThreadDelegate&) = delete;
114 
set_worker(WorkerThreadWaitableEvent * worker)115   void set_worker(WorkerThreadWaitableEvent* worker) {
116     DCHECK(!worker_);
117     worker_ = worker;
118   }
119 
GetThreadLabel() const120   WorkerThread::ThreadLabel GetThreadLabel() const final {
121     return thread_label_;
122   }
123 
OnMainEntry(WorkerThread *)124   void OnMainEntry(WorkerThread* /* worker */) override {
125     thread_ref_checker_.Set();
126     PlatformThread::SetName(thread_name_);
127   }
128 
GetWork(WorkerThread * worker)129   RegisteredTaskSource GetWork(WorkerThread* worker) override {
130     CheckedAutoLock auto_lock(lock_);
131     DCHECK(worker_awake_);
132 
133     auto task_source = GetWorkLockRequired(worker);
134     if (!task_source) {
135       // The worker will sleep after this returns nullptr.
136       worker_awake_ = false;
137       return nullptr;
138     }
139     auto run_status = task_source.WillRunTask();
140     DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
141     return task_source;
142   }
143 
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)144   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
145                                          WorkerThread* worker) override {
146     std::optional<RegisteredTaskSourceAndTransaction>
147         task_source_with_transaction;
148     if (task_source) {
149       task_source_with_transaction.emplace(
150           RegisteredTaskSourceAndTransaction::FromTaskSource(
151               std::move(task_source)));
152       task_source_with_transaction->task_source.WillReEnqueue(
153           TimeTicks::Now(), &task_source_with_transaction->transaction);
154     }
155     CheckedAutoLock auto_lock(lock_);
156     if (task_source_with_transaction.has_value()) {
157       EnqueueTaskSourceLockRequired(std::move(*task_source_with_transaction));
158     }
159 
160     // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
161     // TaskSources returned by the GetWork() method of |delegate_| until it
162     // returns nullptr. Resetting |wake_up_event_| here doesn't break this
163     // invariant and avoids a useless loop iteration before going to sleep if
164     // WakeUp() is called while this WorkerThread is awake.
165     wake_up_event_.Reset();
166 
167     auto new_task_source = GetWorkLockRequired(worker);
168     if (!new_task_source) {
169       // The worker will sleep after this returns nullptr.
170       worker_awake_ = false;
171       return nullptr;
172     }
173     auto run_status = new_task_source.WillRunTask();
174     DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
175     return new_task_source;
176   }
177 
GetSleepTimeout()178   TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
179 
180   // `task_runner` isn't used but is forwarded to keep the task runner
181   // alive while the task is pending.
PostTaskNow(scoped_refptr<Sequence> sequence,scoped_refptr<SingleThreadTaskRunner> task_runner,Task task)182   bool PostTaskNow(scoped_refptr<Sequence> sequence,
183                    scoped_refptr<SingleThreadTaskRunner> task_runner,
184                    Task task) {
185     auto transaction = sequence->BeginTransaction();
186 
187     // |task| will be pushed to |sequence|, and |sequence| will be queued
188     // to |priority_queue_| iff |sequence_should_be_queued| is true.
189     const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
190     RegisteredTaskSource task_source;
191     if (sequence_should_be_queued) {
192       task_source = task_tracker_->RegisterTaskSource(sequence);
193       // We shouldn't push |task| if we're not allowed to queue |task_source|.
194       if (!task_source)
195         return false;
196     }
197     if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
198       return false;
199     transaction.PushImmediateTask(std::move(task));
200     if (task_source) {
201       bool should_wakeup;
202       {
203         CheckedAutoLock auto_lock(lock_);
204         should_wakeup = EnqueueTaskSourceLockRequired(
205             {std::move(task_source), std::move(transaction)});
206       }
207       if (should_wakeup) {
208         worker_->WakeUp();
209       }
210     }
211     return true;
212   }
213 
RunsTasksInCurrentSequence()214   bool RunsTasksInCurrentSequence() {
215     // We check the thread ref instead of the sequence for the benefit of COM
216     // callbacks which may execute without a sequence context.
217     return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
218   }
219 
OnMainExit(WorkerThread *)220   void OnMainExit(WorkerThread* /* worker */) override {}
221 
DidUpdateCanRunPolicy()222   void DidUpdateCanRunPolicy() {
223     bool should_wakeup = false;
224     {
225       CheckedAutoLock auto_lock(lock_);
226       if (!worker_awake_ && CanRunNextTaskSource()) {
227         should_wakeup = true;
228         worker_awake_ = true;
229       }
230     }
231     if (should_wakeup)
232       worker_->WakeUp();
233   }
234 
EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting()235   void EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting() {
236     CheckedAutoLock auto_lock(lock_);
237     priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
238   }
239 
240  protected:
GetWorkLockRequired(WorkerThread * worker)241   RegisteredTaskSource GetWorkLockRequired(WorkerThread* worker)
242       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
243     if (!CanRunNextTaskSource()) {
244       return nullptr;
245     }
246     return priority_queue_.PopTaskSource();
247   }
248 
task_tracker()249   const TrackedRef<TaskTracker>& task_tracker() { return task_tracker_; }
250 
251   CheckedLock lock_;
252   bool worker_awake_ GUARDED_BY(lock_) = false;
253 
254   const TrackedRef<TaskTracker> task_tracker_;
255 
256  private:
257   // Enqueues a task source in this single-threaded worker's priority queue.
258   // Returns true iff the worker must wakeup, i.e. task source is allowed to run
259   // and the worker was not awake.
EnqueueTaskSourceLockRequired(RegisteredTaskSourceAndTransaction transaction_with_task_source)260   bool EnqueueTaskSourceLockRequired(
261       RegisteredTaskSourceAndTransaction transaction_with_task_source)
262       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
263     auto sort_key = transaction_with_task_source.task_source->GetSortKey();
264     // When moving |task_source| into |priority_queue_|, it may be destroyed
265     // on another thread as soon as |lock_| is released, since we're no longer
266     // holding a reference to it. To prevent UAF, release |transaction| before
267     // moving |task_source|. Ref. crbug.com/1412008
268     transaction_with_task_source.transaction.Release();
269     priority_queue_.Push(std::move(transaction_with_task_source.task_source),
270                          sort_key);
271     if (!worker_awake_ && CanRunNextTaskSource()) {
272       worker_awake_ = true;
273       return true;
274     }
275     return false;
276   }
277 
CanRunNextTaskSource()278   bool CanRunNextTaskSource() EXCLUSIVE_LOCKS_REQUIRED(lock_) {
279     return !priority_queue_.IsEmpty() &&
280            task_tracker_->CanRunPriority(
281                priority_queue_.PeekSortKey().priority());
282   }
283 
284   const std::string thread_name_;
285   const WorkerThread::ThreadLabel thread_label_;
286 
287   // The WorkerThread that has |this| as a delegate. Must be set before
288   // starting or posting a task to the WorkerThread, because it's used in
289   // OnMainEntry() and PostTaskNow().
290   raw_ptr<WorkerThreadWaitableEvent> worker_ = nullptr;
291 
292   PriorityQueue priority_queue_ GUARDED_BY(lock_);
293 
294   AtomicThreadRefChecker thread_ref_checker_;
295 };
296 
297 #if BUILDFLAG(IS_WIN)
298 
299 class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
300  public:
WorkerThreadCOMDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)301   WorkerThreadCOMDelegate(const std::string& thread_name,
302                           WorkerThread::ThreadLabel thread_label,
303                           TrackedRef<TaskTracker> task_tracker)
304       : WorkerThreadDelegate(thread_name,
305                              thread_label,
306                              std::move(task_tracker)) {}
307 
308   WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
309   WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;
~WorkerThreadCOMDelegate()310   ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
311 
312   // WorkerThreadWaitableEvent::Delegate:
OnMainEntry(WorkerThread * worker)313   void OnMainEntry(WorkerThread* worker) override {
314     WorkerThreadDelegate::OnMainEntry(worker);
315 
316     scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
317 
318     // CHECK to make sure this COM thread is initialized correctly in an STA.
319     CHECK(scoped_com_initializer_->Succeeded());
320   }
321 
GetWork(WorkerThread * worker)322   RegisteredTaskSource GetWork(WorkerThread* worker) override {
323     // This scheme below allows us to cover the following scenarios:
324     // * Only WorkerThreadDelegate::GetWork() has work:
325     //   Always return the task source from GetWork().
326     // * Only the Windows Message Queue has work:
327     //   Always return the task source from GetWorkFromWindowsMessageQueue();
328     // * Both WorkerThreadDelegate::GetWork() and the Windows Message Queue
329     //   have work:
330     //   Process task sources from each source round-robin style.
331     CheckedAutoLock auto_lock(lock_);
332 
333     // |worker_awake_| is always set before a call to WakeUp(), but it is
334     // not set when messages are added to the Windows Message Queue. Ensure that
335     // it is set before getting work, to avoid unnecessary wake ups.
336     //
337     // Note: It wouldn't be sufficient to set |worker_awake_| in WaitForWork()
338     // when MsgWaitForMultipleObjectsEx() indicates that it was woken up by a
339     // Windows Message, because of the following scenario:
340     //  T1: PostTask
341     //      Queue task
342     //      Set |worker_awake_| to true
343     //  T2: Woken up by a Windows Message
344     //      Set |worker_awake_| to true
345     //      Run the task posted by T1
346     //      Wait for work
347     //  T1: WakeUp()
348     //  T2: Woken up by Waitable Event
349     //      Does not set |worker_awake_| (wake up not from Windows Message)
350     //      GetWork
351     //      !! Getting work while |worker_awake_| is false !!
352     worker_awake_ = true;
353     RegisteredTaskSource task_source;
354     if (get_work_first_) {
355       task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
356       if (task_source)
357         get_work_first_ = false;
358     }
359 
360     if (!task_source) {
361       CheckedAutoUnlock auto_unlock(lock_);
362       task_source = GetWorkFromWindowsMessageQueue();
363       if (task_source)
364         get_work_first_ = true;
365     }
366 
367     if (!task_source && !get_work_first_) {
368       // This case is important if we checked the Windows Message Queue first
369       // and found there was no work. We don't want to return null immediately
370       // as that could cause the thread to go to sleep while work is waiting via
371       // WorkerThreadDelegate::GetWork().
372       task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
373     }
374     if (!task_source) {
375       // The worker will sleep after this returns nullptr.
376       worker_awake_ = false;
377       return nullptr;
378     }
379     auto run_status = task_source.WillRunTask();
380     DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
381     return task_source;
382   }
383 
OnMainExit(WorkerThread *)384   void OnMainExit(WorkerThread* /* worker */) override {
385     scoped_com_initializer_.reset();
386   }
387 
WaitForWork()388   void WaitForWork() override {
389     const TimeDelta sleep_time = GetSleepTimeout();
390     const DWORD milliseconds_wait = checked_cast<DWORD>(
391         sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
392     const HANDLE wake_up_event_handle = wake_up_event_.handle();
393     MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
394                                 QS_ALLINPUT, 0);
395   }
396 
397  private:
GetWorkFromWindowsMessageQueue()398   RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
399     MSG msg;
400     if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
401       Task pump_message_task(FROM_HERE,
402                              BindOnce(
403                                  [](MSG msg) {
404                                    TranslateMessage(&msg);
405                                    DispatchMessage(&msg);
406                                  },
407                                  std::move(msg)),
408                              TimeTicks::Now(), TimeDelta());
409       if (task_tracker()->WillPostTask(
410               &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
411         auto transaction = message_pump_sequence_->BeginTransaction();
412         const bool sequence_should_be_queued =
413             transaction.WillPushImmediateTask();
414         DCHECK(sequence_should_be_queued)
415             << "GetWorkFromWindowsMessageQueue() does not expect "
416                "queueing of pump tasks.";
417         auto registered_task_source = task_tracker_->RegisterTaskSource(
418             std::move(message_pump_sequence_));
419         if (!registered_task_source)
420           return nullptr;
421         transaction.PushImmediateTask(std::move(pump_message_task));
422         return registered_task_source;
423       } else {
424         // `pump_message_task`'s destructor may run sequence-affine code, so it
425         // must be leaked when `WillPostTask` returns false.
426         auto leak = std::make_unique<Task>(std::move(pump_message_task));
427         ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
428         leak.release();
429       }
430     }
431     return nullptr;
432   }
433 
434   bool get_work_first_ = true;
435   const scoped_refptr<Sequence> message_pump_sequence_ =
436       MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
437                                nullptr,
438                                TaskSourceExecutionMode::kParallel);
439   std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
440 };
441 
442 #endif  // BUILDFLAG(IS_WIN)
443 
444 }  // namespace
445 
446 class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
447     : public SingleThreadTaskRunner {
448  public:
449   // Constructs a PooledSingleThreadTaskRunner that indirectly controls the
450   // lifetime of a dedicated |worker| for |traits|.
PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager * const outer,const TaskTraits & traits,WorkerThreadWaitableEvent * worker,SingleThreadTaskRunnerThreadMode thread_mode)451   PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager* const outer,
452                                const TaskTraits& traits,
453                                WorkerThreadWaitableEvent* worker,
454                                SingleThreadTaskRunnerThreadMode thread_mode)
455       : outer_(outer),
456         worker_(worker),
457         thread_mode_(thread_mode),
458         sequence_(
459             MakeRefCounted<Sequence>(traits,
460                                      this,
461                                      TaskSourceExecutionMode::kSingleThread)) {
462     DCHECK(outer_);
463     DCHECK(worker_);
464   }
465   PooledSingleThreadTaskRunner(const PooledSingleThreadTaskRunner&) = delete;
466   PooledSingleThreadTaskRunner& operator=(const PooledSingleThreadTaskRunner&) =
467       delete;
468 
469   // SingleThreadTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)470   bool PostDelayedTask(const Location& from_here,
471                        OnceClosure closure,
472                        TimeDelta delay) override {
473     if (!g_manager_is_alive)
474       return false;
475 
476     Task task(from_here, std::move(closure), TimeTicks::Now(), delay,
477               MessagePump::GetLeewayIgnoringThreadOverride());
478     return PostTask(std::move(task));
479   }
480 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & from_here,OnceClosure closure,TimeTicks delayed_run_time,subtle::DelayPolicy delay_policy)481   bool PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,
482                          const Location& from_here,
483                          OnceClosure closure,
484                          TimeTicks delayed_run_time,
485                          subtle::DelayPolicy delay_policy) override {
486     if (!g_manager_is_alive)
487       return false;
488 
489     Task task(from_here, std::move(closure), TimeTicks::Now(), delayed_run_time,
490               MessagePump::GetLeewayIgnoringThreadOverride(), delay_policy);
491     return PostTask(std::move(task));
492   }
493 
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)494   bool PostNonNestableDelayedTask(const Location& from_here,
495                                   OnceClosure closure,
496                                   TimeDelta delay) override {
497     // Tasks are never nested within the thread pool.
498     return PostDelayedTask(from_here, std::move(closure), delay);
499   }
500 
RunsTasksInCurrentSequence() const501   bool RunsTasksInCurrentSequence() const override {
502     if (!g_manager_is_alive)
503       return false;
504     return GetDelegate()->RunsTasksInCurrentSequence();
505   }
506 
507  private:
~PooledSingleThreadTaskRunner()508   ~PooledSingleThreadTaskRunner() override {
509     // Only unregister if this is a DEDICATED SingleThreadTaskRunner. SHARED
510     // task runner WorkerThreads are managed separately as they are reused.
511     // |g_manager_is_alive| avoids a use-after-free should this
512     // PooledSingleThreadTaskRunner outlive its manager. It is safe to access
513     // |g_manager_is_alive| without synchronization primitives as it is const
514     // for the lifetime of the manager and ~PooledSingleThreadTaskRunner()
515     // either happens prior to the end of JoinForTesting() (which happens-before
516     // manager's destruction) or on main thread after the task environment's
517     // entire destruction (which happens-after the manager's destruction). Yes,
518     // there's a theoretical use case where the last ref to this
519     // PooledSingleThreadTaskRunner is handed to a thread not controlled by
520     // thread_pool and that this ends up causing
521     // ~PooledSingleThreadTaskRunner() to race with
522     // ~PooledSingleThreadTaskRunnerManager() but this is intentionally not
523     // supported (and it doesn't matter in production where we leak the task
524     // environment for such reasons). TSan should catch this weird paradigm
525     // should anyone elect to use it in a unit test and the error would point
526     // here.
527     if (g_manager_is_alive &&
528         thread_mode_ == SingleThreadTaskRunnerThreadMode::DEDICATED) {
529       outer_->UnregisterWorkerThread(worker_);
530     }
531   }
532 
PostTask(Task task)533   bool PostTask(Task task) {
534     if (!outer_->task_tracker_->WillPostTask(&task,
535                                              sequence_->shutdown_behavior())) {
536       // `task`'s destructor may run sequence-affine code, so it must be leaked
537       // when `WillPostTask` returns false.
538       auto leak = std::make_unique<Task>(std::move(task));
539       ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
540       leak.release();
541       return false;
542     }
543 
544     if (task.delayed_run_time.is_null())
545       return GetDelegate()->PostTaskNow(sequence_, nullptr, std::move(task));
546 
547     // Unretained(GetDelegate()) is safe because this TaskRunner and its
548     // worker are kept alive as long as there are pending Tasks.
549     outer_->delayed_task_manager_->AddDelayedTask(
550         std::move(task),
551         BindOnce(IgnoreResult(&WorkerThreadDelegate::PostTaskNow),
552                  Unretained(GetDelegate()), sequence_,
553                  base::WrapRefCounted(this)));
554     return true;
555   }
556 
GetDelegate() const557   WorkerThreadDelegate* GetDelegate() const {
558     return static_cast<WorkerThreadDelegate*>(worker_->delegate());
559   }
560 
561   // Dangling but safe since use is controlled by `g_manager_is_alive`.
562   const raw_ptr<PooledSingleThreadTaskRunnerManager,
563                 DisableDanglingPtrDetection>
564       outer_;
565 
566   const raw_ptr<WorkerThreadWaitableEvent, AcrossTasksDanglingUntriaged>
567       worker_;
568   const SingleThreadTaskRunnerThreadMode thread_mode_;
569   const scoped_refptr<Sequence> sequence_;
570 };
571 
PooledSingleThreadTaskRunnerManager(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)572 PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
573     TrackedRef<TaskTracker> task_tracker,
574     DelayedTaskManager* delayed_task_manager)
575     : task_tracker_(std::move(task_tracker)),
576       delayed_task_manager_(delayed_task_manager) {
577   DCHECK(task_tracker_);
578   DCHECK(delayed_task_manager_);
579 #if BUILDFLAG(IS_WIN)
580   static_assert(std::extent<decltype(shared_com_worker_threads_)>() ==
581                     std::extent<decltype(shared_worker_threads_)>(),
582                 "The size of |shared_com_worker_threads_| must match "
583                 "|shared_worker_threads_|");
584   static_assert(
585       std::extent<
586           std::remove_reference<decltype(shared_com_worker_threads_[0])>>() ==
587           std::extent<
588               std::remove_reference<decltype(shared_worker_threads_[0])>>(),
589       "The size of |shared_com_worker_threads_| must match "
590       "|shared_worker_threads_|");
591 #endif  // BUILDFLAG(IS_WIN)
592   DCHECK(!g_manager_is_alive);
593   g_manager_is_alive = true;
594 }
595 
~PooledSingleThreadTaskRunnerManager()596 PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() {
597   DCHECK(g_manager_is_alive);
598   g_manager_is_alive = false;
599   g_use_utility_thread_group = false;
600 }
601 
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)602 void PooledSingleThreadTaskRunnerManager::Start(
603     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
604     WorkerThreadObserver* worker_thread_observer) {
605   DCHECK(!worker_thread_observer_);
606   worker_thread_observer_ = worker_thread_observer;
607 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
608   DCHECK(io_thread_task_runner);
609   io_thread_task_runner_ = std::move(io_thread_task_runner);
610 #endif  // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
611 
612   g_use_utility_thread_group = CanUseUtilityThreadTypeForWorkerThread() &&
613                                FeatureList::IsEnabled(kUseUtilityThreadGroup);
614 
615   decltype(workers_) workers_to_start;
616   {
617     CheckedAutoLock auto_lock(lock_);
618     started_ = true;
619     workers_to_start = workers_;
620   }
621 
622   // Start workers that were created before this method was called.
623   // Workers that already need to wake up are already signaled as part of
624   // PooledSingleThreadTaskRunner::PostTaskNow(). As a result, it's
625   // unnecessary to call WakeUp() for each worker (in fact, an extraneous
626   // WakeUp() would be racy and wrong - see https://crbug.com/862582).
627   for (scoped_refptr<WorkerThreadWaitableEvent> worker : workers_to_start) {
628     worker->Start(io_thread_task_runner_, worker_thread_observer_);
629   }
630 }
631 
DidUpdateCanRunPolicy()632 void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() {
633   decltype(workers_) workers_to_update;
634 
635   {
636     CheckedAutoLock auto_lock(lock_);
637     if (!started_)
638       return;
639     workers_to_update = workers_;
640   }
641   // Any worker created after the lock is released will see the latest
642   // CanRunPolicy if tasks are posted to it and thus doesn't need a
643   // DidUpdateCanRunPolicy() notification.
644   for (auto& worker : workers_to_update) {
645     static_cast<WorkerThreadDelegate*>(worker->delegate())
646         ->DidUpdateCanRunPolicy();
647   }
648 }
649 
650 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)651 PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
652     const TaskTraits& traits,
653     SingleThreadTaskRunnerThreadMode thread_mode) {
654   return CreateTaskRunnerImpl<WorkerThreadDelegate>(traits, thread_mode);
655 }
656 
657 #if BUILDFLAG(IS_WIN)
658 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)659 PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
660     const TaskTraits& traits,
661     SingleThreadTaskRunnerThreadMode thread_mode) {
662   return CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
663 }
664 #endif  // BUILDFLAG(IS_WIN)
665 
666 // static
667 PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
TraitsToContinueOnShutdown(const TaskTraits & traits)668 PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
669     const TaskTraits& traits) {
670   if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
671     return IS_CONTINUE_ON_SHUTDOWN;
672   return IS_NOT_CONTINUE_ON_SHUTDOWN;
673 }
674 
675 template <typename DelegateType>
676 scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
CreateTaskRunnerImpl(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)677 PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
678     const TaskTraits& traits,
679     SingleThreadTaskRunnerThreadMode thread_mode) {
680   DCHECK(thread_mode != SingleThreadTaskRunnerThreadMode::SHARED ||
681          !traits.with_base_sync_primitives())
682       << "Using WithBaseSyncPrimitives() on a shared SingleThreadTaskRunner "
683          "may cause deadlocks. Either reevaluate your usage (e.g. use "
684          "SequencedTaskRunner) or use "
685          "SingleThreadTaskRunnerThreadMode::DEDICATED.";
686   // To simplify the code, |dedicated_worker| is a local only variable that
687   // allows the code to treat both the DEDICATED and SHARED cases similarly for
688   // SingleThreadTaskRunnerThreadMode. In DEDICATED, the scoped_refptr is backed
689   // by a local variable and in SHARED, the scoped_refptr is backed by a member
690   // variable.
691   WorkerThreadWaitableEvent* dedicated_worker = nullptr;
692   WorkerThreadWaitableEvent*& worker =
693       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
694           ? dedicated_worker
695           : GetSharedWorkerThreadForTraits<DelegateType>(traits);
696   bool new_worker = false;
697   bool started;
698   {
699     CheckedAutoLock auto_lock(lock_);
700     if (!worker) {
701       const auto& environment_params =
702           kEnvironmentParams[GetEnvironmentIndexForTraits(traits)];
703       std::string worker_name;
704       if (thread_mode == SingleThreadTaskRunnerThreadMode::SHARED)
705         worker_name += "Shared";
706       worker_name += environment_params.name_suffix;
707       worker = CreateAndRegisterWorkerThread<DelegateType>(
708           worker_name, thread_mode, environment_params.thread_type_hint);
709       new_worker = true;
710     }
711     started = started_;
712   }
713 
714   if (new_worker && started)
715     worker->Start(io_thread_task_runner_, worker_thread_observer_);
716 
717   return MakeRefCounted<PooledSingleThreadTaskRunner>(this, traits, worker,
718                                                       thread_mode);
719 }
720 
JoinForTesting()721 void PooledSingleThreadTaskRunnerManager::JoinForTesting() {
722   decltype(workers_) local_workers;
723   {
724     CheckedAutoLock auto_lock(lock_);
725     local_workers = std::move(workers_);
726   }
727 
728   for (const auto& worker : local_workers) {
729     static_cast<WorkerThreadDelegate*>(worker->delegate())
730         ->EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting();
731     worker->JoinForTesting();
732   }
733 
734   {
735     CheckedAutoLock auto_lock(lock_);
736     DCHECK(workers_.empty())
737         << "New worker(s) unexpectedly registered during join.";
738     workers_ = std::move(local_workers);
739   }
740 
741   // Release shared WorkerThreads at the end so they get joined above. If
742   // this call happens before the joins, the WorkerThreads are effectively
743   // detached and may outlive the PooledSingleThreadTaskRunnerManager.
744   ReleaseSharedWorkerThreads();
745 }
746 
747 template <>
748 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)749 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
750     WorkerThreadDelegate>(const std::string& name,
751                           int id,
752                           SingleThreadTaskRunnerThreadMode thread_mode) {
753   return std::make_unique<WorkerThreadDelegate>(
754       StringPrintf("ThreadPoolSingleThread%s%d", name.c_str(), id),
755       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
756           ? WorkerThread::ThreadLabel::DEDICATED
757           : WorkerThread::ThreadLabel::SHARED,
758       task_tracker_);
759 }
760 
761 #if BUILDFLAG(IS_WIN)
762 template <>
763 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)764 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
765     WorkerThreadCOMDelegate>(const std::string& name,
766                              int id,
767                              SingleThreadTaskRunnerThreadMode thread_mode) {
768   return std::make_unique<WorkerThreadCOMDelegate>(
769       StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id),
770       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
771           ? WorkerThread::ThreadLabel::DEDICATED_COM
772           : WorkerThread::ThreadLabel::SHARED_COM,
773       task_tracker_);
774 }
775 #endif  // BUILDFLAG(IS_WIN)
776 
777 template <typename DelegateType>
778 WorkerThreadWaitableEvent*
CreateAndRegisterWorkerThread(const std::string & name,SingleThreadTaskRunnerThreadMode thread_mode,ThreadType thread_type_hint)779 PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
780     const std::string& name,
781     SingleThreadTaskRunnerThreadMode thread_mode,
782     ThreadType thread_type_hint) {
783   int id = next_worker_id_++;
784   std::unique_ptr<WorkerThreadDelegate> delegate =
785       CreateWorkerThreadDelegate<DelegateType>(name, id, thread_mode);
786   WorkerThreadDelegate* delegate_raw = delegate.get();
787   scoped_refptr<WorkerThreadWaitableEvent> worker =
788       MakeRefCounted<WorkerThreadWaitableEvent>(thread_type_hint,
789                                                 std::move(delegate),
790                                                 task_tracker_, workers_.size());
791   delegate_raw->set_worker(worker.get());
792   workers_.emplace_back(std::move(worker));
793   return workers_.back().get();
794 }
795 
796 template <>
797 WorkerThreadWaitableEvent*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)798 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
799     WorkerThreadDelegate>(const TaskTraits& traits) {
800   return shared_worker_threads_[GetEnvironmentIndexForTraits(traits)]
801                                [TraitsToContinueOnShutdown(traits)];
802 }
803 
804 #if BUILDFLAG(IS_WIN)
805 template <>
806 WorkerThreadWaitableEvent*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)807 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
808     WorkerThreadCOMDelegate>(const TaskTraits& traits) {
809   return shared_com_worker_threads_[GetEnvironmentIndexForTraits(traits)]
810                                    [TraitsToContinueOnShutdown(traits)];
811 }
812 #endif  // BUILDFLAG(IS_WIN)
813 
UnregisterWorkerThread(WorkerThreadWaitableEvent * worker)814 void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
815     WorkerThreadWaitableEvent* worker) {
816   // Cleanup uses a CheckedLock, so call Cleanup() after releasing |lock_|.
817   scoped_refptr<WorkerThreadWaitableEvent> worker_to_destroy;
818   {
819     CheckedAutoLock auto_lock(lock_);
820 
821     // Skip when joining (the join logic takes care of the rest).
822     if (workers_.empty())
823       return;
824 
825     auto worker_iter = ranges::find(workers_, worker);
826     CHECK(worker_iter != workers_.end(), base::NotFatalUntil::M125);
827     worker_to_destroy = std::move(*worker_iter);
828     workers_.erase(worker_iter);
829   }
830   worker_to_destroy->Cleanup();
831 }
832 
ReleaseSharedWorkerThreads()833 void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() {
834   decltype(shared_worker_threads_) local_shared_worker_threads;
835 #if BUILDFLAG(IS_WIN)
836   decltype(shared_com_worker_threads_) local_shared_com_worker_threads;
837 #endif
838   {
839     CheckedAutoLock auto_lock(lock_);
840     for (size_t i = 0; i < std::size(shared_worker_threads_); ++i) {
841       for (size_t j = 0; j < std::size(shared_worker_threads_[i]); ++j) {
842         local_shared_worker_threads[i][j] = shared_worker_threads_[i][j];
843         shared_worker_threads_[i][j] = nullptr;
844 #if BUILDFLAG(IS_WIN)
845         local_shared_com_worker_threads[i][j] =
846             shared_com_worker_threads_[i][j];
847         shared_com_worker_threads_[i][j] = nullptr;
848 #endif
849       }
850     }
851   }
852 
853   for (size_t i = 0; i < std::size(local_shared_worker_threads); ++i) {
854     for (size_t j = 0; j < std::size(local_shared_worker_threads[i]); ++j) {
855       if (local_shared_worker_threads[i][j])
856         UnregisterWorkerThread(local_shared_worker_threads[i][j]);
857 #if BUILDFLAG(IS_WIN)
858       if (local_shared_com_worker_threads[i][j])
859         UnregisterWorkerThread(local_shared_com_worker_threads[i][j]);
860 #endif
861     }
862   }
863 }
864 
865 }  // namespace internal
866 }  // namespace base
867