1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
6
7 #include <memory>
8 #include <string>
9 #include <utility>
10
11 #include "base/check.h"
12 #include "base/debug/leak_annotations.h"
13 #include "base/functional/bind.h"
14 #include "base/functional/callback.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/memory/raw_ptr.h"
17 #include "base/message_loop/message_pump.h"
18 #include "base/ranges/algorithm.h"
19 #include "base/strings/stringprintf.h"
20 #include "base/synchronization/atomic_flag.h"
21 #include "base/task/default_delayed_task_handle_delegate.h"
22 #include "base/task/single_thread_task_runner.h"
23 #include "base/task/task_traits.h"
24 #include "base/task/thread_pool/delayed_task_manager.h"
25 #include "base/task/thread_pool/priority_queue.h"
26 #include "base/task/thread_pool/sequence.h"
27 #include "base/task/thread_pool/task.h"
28 #include "base/task/thread_pool/task_source.h"
29 #include "base/task/thread_pool/task_tracker.h"
30 #include "base/task/thread_pool/worker_thread_waitable_event.h"
31 #include "base/threading/platform_thread.h"
32 #include "base/time/time.h"
33 #include "build/build_config.h"
34
35 #if BUILDFLAG(IS_WIN)
36 #include <windows.h>
37
38 #include "base/win/scoped_com_initializer.h"
39 #endif // BUILDFLAG(IS_WIN)
40
41 namespace base {
42 namespace internal {
43
44 namespace {
45
46 // Boolean indicating whether there's a PooledSingleThreadTaskRunnerManager
47 // instance alive in this process. This variable should only be set when the
48 // PooledSingleThreadTaskRunnerManager instance is brought up (on the main
49 // thread; before any tasks are posted) and decremented when the instance is
50 // brought down (i.e., only when unit tests tear down the task environment and
51 // never in production). This makes the variable const while worker threads are
52 // up and as such it doesn't need to be atomic. It is used to tell when a task
53 // is posted from the main thread after the task environment was brought down in
54 // unit tests so that PooledSingleThreadTaskRunnerManager bound TaskRunners
55 // can return false on PostTask, letting such callers know they should complete
56 // necessary work synchronously. Note: |!g_manager_is_alive| is generally
57 // equivalent to |!ThreadPoolInstance::Get()| but has the advantage of being
58 // valid in thread_pool unit tests that don't instantiate a full
59 // thread pool.
60 bool g_manager_is_alive = false;
61
62 bool g_use_utility_thread_group = false;
63
GetEnvironmentIndexForTraits(const TaskTraits & traits)64 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) {
65 const bool is_background =
66 traits.priority() == TaskPriority::BEST_EFFORT &&
67 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
68 CanUseBackgroundThreadTypeForWorkerThread();
69 const bool is_utility =
70 !is_background && traits.priority() <= TaskPriority::USER_VISIBLE &&
71 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
72 g_use_utility_thread_group;
73 if (traits.may_block() || traits.with_base_sync_primitives()) {
74 return is_background ? BACKGROUND_BLOCKING
75 : is_utility ? UTILITY_BLOCKING
76 : FOREGROUND_BLOCKING;
77 }
78 return is_background ? BACKGROUND : is_utility ? UTILITY : FOREGROUND;
79 }
80
81 // Allows for checking the PlatformThread::CurrentRef() against a set
82 // PlatformThreadRef atomically without using locks.
83 class AtomicThreadRefChecker {
84 public:
85 AtomicThreadRefChecker() = default;
86 AtomicThreadRefChecker(const AtomicThreadRefChecker&) = delete;
87 AtomicThreadRefChecker& operator=(const AtomicThreadRefChecker&) = delete;
88 ~AtomicThreadRefChecker() = default;
89
Set()90 void Set() {
91 thread_ref_ = PlatformThread::CurrentRef();
92 is_set_.Set();
93 }
94
IsCurrentThreadSameAsSetThread()95 bool IsCurrentThreadSameAsSetThread() {
96 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
97 }
98
99 private:
100 AtomicFlag is_set_;
101 PlatformThreadRef thread_ref_;
102 };
103
104 class WorkerThreadDelegate : public WorkerThreadWaitableEvent::Delegate {
105 public:
WorkerThreadDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)106 WorkerThreadDelegate(const std::string& thread_name,
107 WorkerThread::ThreadLabel thread_label,
108 TrackedRef<TaskTracker> task_tracker)
109 : task_tracker_(std::move(task_tracker)),
110 thread_name_(thread_name),
111 thread_label_(thread_label) {}
112 WorkerThreadDelegate(const WorkerThreadDelegate&) = delete;
113 WorkerThreadDelegate& operator=(const WorkerThreadDelegate&) = delete;
114
set_worker(WorkerThreadWaitableEvent * worker)115 void set_worker(WorkerThreadWaitableEvent* worker) {
116 DCHECK(!worker_);
117 worker_ = worker;
118 }
119
GetThreadLabel() const120 WorkerThread::ThreadLabel GetThreadLabel() const final {
121 return thread_label_;
122 }
123
OnMainEntry(WorkerThread *)124 void OnMainEntry(WorkerThread* /* worker */) override {
125 thread_ref_checker_.Set();
126 PlatformThread::SetName(thread_name_);
127 }
128
GetWork(WorkerThread * worker)129 RegisteredTaskSource GetWork(WorkerThread* worker) override {
130 CheckedAutoLock auto_lock(lock_);
131 DCHECK(worker_awake_);
132
133 auto task_source = GetWorkLockRequired(worker);
134 if (!task_source) {
135 // The worker will sleep after this returns nullptr.
136 worker_awake_ = false;
137 return nullptr;
138 }
139 auto run_status = task_source.WillRunTask();
140 DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
141 return task_source;
142 }
143
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)144 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
145 WorkerThread* worker) override {
146 std::optional<RegisteredTaskSourceAndTransaction>
147 task_source_with_transaction;
148 if (task_source) {
149 task_source_with_transaction.emplace(
150 RegisteredTaskSourceAndTransaction::FromTaskSource(
151 std::move(task_source)));
152 task_source_with_transaction->task_source.WillReEnqueue(
153 TimeTicks::Now(), &task_source_with_transaction->transaction);
154 }
155 CheckedAutoLock auto_lock(lock_);
156 if (task_source_with_transaction.has_value()) {
157 EnqueueTaskSourceLockRequired(std::move(*task_source_with_transaction));
158 }
159
160 // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
161 // TaskSources returned by the GetWork() method of |delegate_| until it
162 // returns nullptr. Resetting |wake_up_event_| here doesn't break this
163 // invariant and avoids a useless loop iteration before going to sleep if
164 // WakeUp() is called while this WorkerThread is awake.
165 wake_up_event_.Reset();
166
167 auto new_task_source = GetWorkLockRequired(worker);
168 if (!new_task_source) {
169 // The worker will sleep after this returns nullptr.
170 worker_awake_ = false;
171 return nullptr;
172 }
173 auto run_status = new_task_source.WillRunTask();
174 DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
175 return new_task_source;
176 }
177
GetSleepTimeout()178 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
179
180 // `task_runner` isn't used but is forwarded to keep the task runner
181 // alive while the task is pending.
PostTaskNow(scoped_refptr<Sequence> sequence,scoped_refptr<SingleThreadTaskRunner> task_runner,Task task)182 bool PostTaskNow(scoped_refptr<Sequence> sequence,
183 scoped_refptr<SingleThreadTaskRunner> task_runner,
184 Task task) {
185 auto transaction = sequence->BeginTransaction();
186
187 // |task| will be pushed to |sequence|, and |sequence| will be queued
188 // to |priority_queue_| iff |sequence_should_be_queued| is true.
189 const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
190 RegisteredTaskSource task_source;
191 if (sequence_should_be_queued) {
192 task_source = task_tracker_->RegisterTaskSource(sequence);
193 // We shouldn't push |task| if we're not allowed to queue |task_source|.
194 if (!task_source)
195 return false;
196 }
197 if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
198 return false;
199 transaction.PushImmediateTask(std::move(task));
200 if (task_source) {
201 bool should_wakeup;
202 {
203 CheckedAutoLock auto_lock(lock_);
204 should_wakeup = EnqueueTaskSourceLockRequired(
205 {std::move(task_source), std::move(transaction)});
206 }
207 if (should_wakeup) {
208 worker_->WakeUp();
209 }
210 }
211 return true;
212 }
213
RunsTasksInCurrentSequence()214 bool RunsTasksInCurrentSequence() {
215 // We check the thread ref instead of the sequence for the benefit of COM
216 // callbacks which may execute without a sequence context.
217 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
218 }
219
OnMainExit(WorkerThread *)220 void OnMainExit(WorkerThread* /* worker */) override {}
221
DidUpdateCanRunPolicy()222 void DidUpdateCanRunPolicy() {
223 bool should_wakeup = false;
224 {
225 CheckedAutoLock auto_lock(lock_);
226 if (!worker_awake_ && CanRunNextTaskSource()) {
227 should_wakeup = true;
228 worker_awake_ = true;
229 }
230 }
231 if (should_wakeup)
232 worker_->WakeUp();
233 }
234
EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting()235 void EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting() {
236 CheckedAutoLock auto_lock(lock_);
237 priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
238 }
239
240 protected:
GetWorkLockRequired(WorkerThread * worker)241 RegisteredTaskSource GetWorkLockRequired(WorkerThread* worker)
242 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
243 if (!CanRunNextTaskSource()) {
244 return nullptr;
245 }
246 return priority_queue_.PopTaskSource();
247 }
248
task_tracker()249 const TrackedRef<TaskTracker>& task_tracker() { return task_tracker_; }
250
251 CheckedLock lock_;
252 bool worker_awake_ GUARDED_BY(lock_) = false;
253
254 const TrackedRef<TaskTracker> task_tracker_;
255
256 private:
257 // Enqueues a task source in this single-threaded worker's priority queue.
258 // Returns true iff the worker must wakeup, i.e. task source is allowed to run
259 // and the worker was not awake.
EnqueueTaskSourceLockRequired(RegisteredTaskSourceAndTransaction transaction_with_task_source)260 bool EnqueueTaskSourceLockRequired(
261 RegisteredTaskSourceAndTransaction transaction_with_task_source)
262 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
263 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
264 // When moving |task_source| into |priority_queue_|, it may be destroyed
265 // on another thread as soon as |lock_| is released, since we're no longer
266 // holding a reference to it. To prevent UAF, release |transaction| before
267 // moving |task_source|. Ref. crbug.com/1412008
268 transaction_with_task_source.transaction.Release();
269 priority_queue_.Push(std::move(transaction_with_task_source.task_source),
270 sort_key);
271 if (!worker_awake_ && CanRunNextTaskSource()) {
272 worker_awake_ = true;
273 return true;
274 }
275 return false;
276 }
277
CanRunNextTaskSource()278 bool CanRunNextTaskSource() EXCLUSIVE_LOCKS_REQUIRED(lock_) {
279 return !priority_queue_.IsEmpty() &&
280 task_tracker_->CanRunPriority(
281 priority_queue_.PeekSortKey().priority());
282 }
283
284 const std::string thread_name_;
285 const WorkerThread::ThreadLabel thread_label_;
286
287 // The WorkerThread that has |this| as a delegate. Must be set before
288 // starting or posting a task to the WorkerThread, because it's used in
289 // OnMainEntry() and PostTaskNow().
290 raw_ptr<WorkerThreadWaitableEvent> worker_ = nullptr;
291
292 PriorityQueue priority_queue_ GUARDED_BY(lock_);
293
294 AtomicThreadRefChecker thread_ref_checker_;
295 };
296
297 #if BUILDFLAG(IS_WIN)
298
299 class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
300 public:
WorkerThreadCOMDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)301 WorkerThreadCOMDelegate(const std::string& thread_name,
302 WorkerThread::ThreadLabel thread_label,
303 TrackedRef<TaskTracker> task_tracker)
304 : WorkerThreadDelegate(thread_name,
305 thread_label,
306 std::move(task_tracker)) {}
307
308 WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
309 WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;
~WorkerThreadCOMDelegate()310 ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
311
312 // WorkerThreadWaitableEvent::Delegate:
OnMainEntry(WorkerThread * worker)313 void OnMainEntry(WorkerThread* worker) override {
314 WorkerThreadDelegate::OnMainEntry(worker);
315
316 scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
317
318 // CHECK to make sure this COM thread is initialized correctly in an STA.
319 CHECK(scoped_com_initializer_->Succeeded());
320 }
321
GetWork(WorkerThread * worker)322 RegisteredTaskSource GetWork(WorkerThread* worker) override {
323 // This scheme below allows us to cover the following scenarios:
324 // * Only WorkerThreadDelegate::GetWork() has work:
325 // Always return the task source from GetWork().
326 // * Only the Windows Message Queue has work:
327 // Always return the task source from GetWorkFromWindowsMessageQueue();
328 // * Both WorkerThreadDelegate::GetWork() and the Windows Message Queue
329 // have work:
330 // Process task sources from each source round-robin style.
331 CheckedAutoLock auto_lock(lock_);
332
333 // |worker_awake_| is always set before a call to WakeUp(), but it is
334 // not set when messages are added to the Windows Message Queue. Ensure that
335 // it is set before getting work, to avoid unnecessary wake ups.
336 //
337 // Note: It wouldn't be sufficient to set |worker_awake_| in WaitForWork()
338 // when MsgWaitForMultipleObjectsEx() indicates that it was woken up by a
339 // Windows Message, because of the following scenario:
340 // T1: PostTask
341 // Queue task
342 // Set |worker_awake_| to true
343 // T2: Woken up by a Windows Message
344 // Set |worker_awake_| to true
345 // Run the task posted by T1
346 // Wait for work
347 // T1: WakeUp()
348 // T2: Woken up by Waitable Event
349 // Does not set |worker_awake_| (wake up not from Windows Message)
350 // GetWork
351 // !! Getting work while |worker_awake_| is false !!
352 worker_awake_ = true;
353 RegisteredTaskSource task_source;
354 if (get_work_first_) {
355 task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
356 if (task_source)
357 get_work_first_ = false;
358 }
359
360 if (!task_source) {
361 CheckedAutoUnlock auto_unlock(lock_);
362 task_source = GetWorkFromWindowsMessageQueue();
363 if (task_source)
364 get_work_first_ = true;
365 }
366
367 if (!task_source && !get_work_first_) {
368 // This case is important if we checked the Windows Message Queue first
369 // and found there was no work. We don't want to return null immediately
370 // as that could cause the thread to go to sleep while work is waiting via
371 // WorkerThreadDelegate::GetWork().
372 task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
373 }
374 if (!task_source) {
375 // The worker will sleep after this returns nullptr.
376 worker_awake_ = false;
377 return nullptr;
378 }
379 auto run_status = task_source.WillRunTask();
380 DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
381 return task_source;
382 }
383
OnMainExit(WorkerThread *)384 void OnMainExit(WorkerThread* /* worker */) override {
385 scoped_com_initializer_.reset();
386 }
387
WaitForWork()388 void WaitForWork() override {
389 const TimeDelta sleep_time = GetSleepTimeout();
390 const DWORD milliseconds_wait = checked_cast<DWORD>(
391 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
392 const HANDLE wake_up_event_handle = wake_up_event_.handle();
393 MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
394 QS_ALLINPUT, 0);
395 }
396
397 private:
GetWorkFromWindowsMessageQueue()398 RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
399 MSG msg;
400 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
401 Task pump_message_task(FROM_HERE,
402 BindOnce(
403 [](MSG msg) {
404 TranslateMessage(&msg);
405 DispatchMessage(&msg);
406 },
407 std::move(msg)),
408 TimeTicks::Now(), TimeDelta());
409 if (task_tracker()->WillPostTask(
410 &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
411 auto transaction = message_pump_sequence_->BeginTransaction();
412 const bool sequence_should_be_queued =
413 transaction.WillPushImmediateTask();
414 DCHECK(sequence_should_be_queued)
415 << "GetWorkFromWindowsMessageQueue() does not expect "
416 "queueing of pump tasks.";
417 auto registered_task_source = task_tracker_->RegisterTaskSource(
418 std::move(message_pump_sequence_));
419 if (!registered_task_source)
420 return nullptr;
421 transaction.PushImmediateTask(std::move(pump_message_task));
422 return registered_task_source;
423 } else {
424 // `pump_message_task`'s destructor may run sequence-affine code, so it
425 // must be leaked when `WillPostTask` returns false.
426 auto leak = std::make_unique<Task>(std::move(pump_message_task));
427 ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
428 leak.release();
429 }
430 }
431 return nullptr;
432 }
433
434 bool get_work_first_ = true;
435 const scoped_refptr<Sequence> message_pump_sequence_ =
436 MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
437 nullptr,
438 TaskSourceExecutionMode::kParallel);
439 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
440 };
441
442 #endif // BUILDFLAG(IS_WIN)
443
444 } // namespace
445
446 class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
447 : public SingleThreadTaskRunner {
448 public:
449 // Constructs a PooledSingleThreadTaskRunner that indirectly controls the
450 // lifetime of a dedicated |worker| for |traits|.
PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager * const outer,const TaskTraits & traits,WorkerThreadWaitableEvent * worker,SingleThreadTaskRunnerThreadMode thread_mode)451 PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager* const outer,
452 const TaskTraits& traits,
453 WorkerThreadWaitableEvent* worker,
454 SingleThreadTaskRunnerThreadMode thread_mode)
455 : outer_(outer),
456 worker_(worker),
457 thread_mode_(thread_mode),
458 sequence_(
459 MakeRefCounted<Sequence>(traits,
460 this,
461 TaskSourceExecutionMode::kSingleThread)) {
462 DCHECK(outer_);
463 DCHECK(worker_);
464 }
465 PooledSingleThreadTaskRunner(const PooledSingleThreadTaskRunner&) = delete;
466 PooledSingleThreadTaskRunner& operator=(const PooledSingleThreadTaskRunner&) =
467 delete;
468
469 // SingleThreadTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)470 bool PostDelayedTask(const Location& from_here,
471 OnceClosure closure,
472 TimeDelta delay) override {
473 if (!g_manager_is_alive)
474 return false;
475
476 Task task(from_here, std::move(closure), TimeTicks::Now(), delay,
477 MessagePump::GetLeewayIgnoringThreadOverride());
478 return PostTask(std::move(task));
479 }
480
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & from_here,OnceClosure closure,TimeTicks delayed_run_time,subtle::DelayPolicy delay_policy)481 bool PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,
482 const Location& from_here,
483 OnceClosure closure,
484 TimeTicks delayed_run_time,
485 subtle::DelayPolicy delay_policy) override {
486 if (!g_manager_is_alive)
487 return false;
488
489 Task task(from_here, std::move(closure), TimeTicks::Now(), delayed_run_time,
490 MessagePump::GetLeewayIgnoringThreadOverride(), delay_policy);
491 return PostTask(std::move(task));
492 }
493
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)494 bool PostNonNestableDelayedTask(const Location& from_here,
495 OnceClosure closure,
496 TimeDelta delay) override {
497 // Tasks are never nested within the thread pool.
498 return PostDelayedTask(from_here, std::move(closure), delay);
499 }
500
RunsTasksInCurrentSequence() const501 bool RunsTasksInCurrentSequence() const override {
502 if (!g_manager_is_alive)
503 return false;
504 return GetDelegate()->RunsTasksInCurrentSequence();
505 }
506
507 private:
~PooledSingleThreadTaskRunner()508 ~PooledSingleThreadTaskRunner() override {
509 // Only unregister if this is a DEDICATED SingleThreadTaskRunner. SHARED
510 // task runner WorkerThreads are managed separately as they are reused.
511 // |g_manager_is_alive| avoids a use-after-free should this
512 // PooledSingleThreadTaskRunner outlive its manager. It is safe to access
513 // |g_manager_is_alive| without synchronization primitives as it is const
514 // for the lifetime of the manager and ~PooledSingleThreadTaskRunner()
515 // either happens prior to the end of JoinForTesting() (which happens-before
516 // manager's destruction) or on main thread after the task environment's
517 // entire destruction (which happens-after the manager's destruction). Yes,
518 // there's a theoretical use case where the last ref to this
519 // PooledSingleThreadTaskRunner is handed to a thread not controlled by
520 // thread_pool and that this ends up causing
521 // ~PooledSingleThreadTaskRunner() to race with
522 // ~PooledSingleThreadTaskRunnerManager() but this is intentionally not
523 // supported (and it doesn't matter in production where we leak the task
524 // environment for such reasons). TSan should catch this weird paradigm
525 // should anyone elect to use it in a unit test and the error would point
526 // here.
527 if (g_manager_is_alive &&
528 thread_mode_ == SingleThreadTaskRunnerThreadMode::DEDICATED) {
529 outer_->UnregisterWorkerThread(worker_);
530 }
531 }
532
PostTask(Task task)533 bool PostTask(Task task) {
534 if (!outer_->task_tracker_->WillPostTask(&task,
535 sequence_->shutdown_behavior())) {
536 // `task`'s destructor may run sequence-affine code, so it must be leaked
537 // when `WillPostTask` returns false.
538 auto leak = std::make_unique<Task>(std::move(task));
539 ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
540 leak.release();
541 return false;
542 }
543
544 if (task.delayed_run_time.is_null())
545 return GetDelegate()->PostTaskNow(sequence_, nullptr, std::move(task));
546
547 // Unretained(GetDelegate()) is safe because this TaskRunner and its
548 // worker are kept alive as long as there are pending Tasks.
549 outer_->delayed_task_manager_->AddDelayedTask(
550 std::move(task),
551 BindOnce(IgnoreResult(&WorkerThreadDelegate::PostTaskNow),
552 Unretained(GetDelegate()), sequence_,
553 base::WrapRefCounted(this)));
554 return true;
555 }
556
GetDelegate() const557 WorkerThreadDelegate* GetDelegate() const {
558 return static_cast<WorkerThreadDelegate*>(worker_->delegate());
559 }
560
561 // Dangling but safe since use is controlled by `g_manager_is_alive`.
562 const raw_ptr<PooledSingleThreadTaskRunnerManager,
563 DisableDanglingPtrDetection>
564 outer_;
565
566 const raw_ptr<WorkerThreadWaitableEvent, AcrossTasksDanglingUntriaged>
567 worker_;
568 const SingleThreadTaskRunnerThreadMode thread_mode_;
569 const scoped_refptr<Sequence> sequence_;
570 };
571
PooledSingleThreadTaskRunnerManager(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)572 PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
573 TrackedRef<TaskTracker> task_tracker,
574 DelayedTaskManager* delayed_task_manager)
575 : task_tracker_(std::move(task_tracker)),
576 delayed_task_manager_(delayed_task_manager) {
577 DCHECK(task_tracker_);
578 DCHECK(delayed_task_manager_);
579 #if BUILDFLAG(IS_WIN)
580 static_assert(std::extent<decltype(shared_com_worker_threads_)>() ==
581 std::extent<decltype(shared_worker_threads_)>(),
582 "The size of |shared_com_worker_threads_| must match "
583 "|shared_worker_threads_|");
584 static_assert(
585 std::extent<
586 std::remove_reference<decltype(shared_com_worker_threads_[0])>>() ==
587 std::extent<
588 std::remove_reference<decltype(shared_worker_threads_[0])>>(),
589 "The size of |shared_com_worker_threads_| must match "
590 "|shared_worker_threads_|");
591 #endif // BUILDFLAG(IS_WIN)
592 DCHECK(!g_manager_is_alive);
593 g_manager_is_alive = true;
594 }
595
~PooledSingleThreadTaskRunnerManager()596 PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() {
597 DCHECK(g_manager_is_alive);
598 g_manager_is_alive = false;
599 g_use_utility_thread_group = false;
600 }
601
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)602 void PooledSingleThreadTaskRunnerManager::Start(
603 scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
604 WorkerThreadObserver* worker_thread_observer) {
605 DCHECK(!worker_thread_observer_);
606 worker_thread_observer_ = worker_thread_observer;
607 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
608 DCHECK(io_thread_task_runner);
609 io_thread_task_runner_ = std::move(io_thread_task_runner);
610 #endif // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
611
612 g_use_utility_thread_group = CanUseUtilityThreadTypeForWorkerThread() &&
613 FeatureList::IsEnabled(kUseUtilityThreadGroup);
614
615 decltype(workers_) workers_to_start;
616 {
617 CheckedAutoLock auto_lock(lock_);
618 started_ = true;
619 workers_to_start = workers_;
620 }
621
622 // Start workers that were created before this method was called.
623 // Workers that already need to wake up are already signaled as part of
624 // PooledSingleThreadTaskRunner::PostTaskNow(). As a result, it's
625 // unnecessary to call WakeUp() for each worker (in fact, an extraneous
626 // WakeUp() would be racy and wrong - see https://crbug.com/862582).
627 for (scoped_refptr<WorkerThreadWaitableEvent> worker : workers_to_start) {
628 worker->Start(io_thread_task_runner_, worker_thread_observer_);
629 }
630 }
631
DidUpdateCanRunPolicy()632 void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() {
633 decltype(workers_) workers_to_update;
634
635 {
636 CheckedAutoLock auto_lock(lock_);
637 if (!started_)
638 return;
639 workers_to_update = workers_;
640 }
641 // Any worker created after the lock is released will see the latest
642 // CanRunPolicy if tasks are posted to it and thus doesn't need a
643 // DidUpdateCanRunPolicy() notification.
644 for (auto& worker : workers_to_update) {
645 static_cast<WorkerThreadDelegate*>(worker->delegate())
646 ->DidUpdateCanRunPolicy();
647 }
648 }
649
650 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)651 PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
652 const TaskTraits& traits,
653 SingleThreadTaskRunnerThreadMode thread_mode) {
654 return CreateTaskRunnerImpl<WorkerThreadDelegate>(traits, thread_mode);
655 }
656
657 #if BUILDFLAG(IS_WIN)
658 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)659 PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
660 const TaskTraits& traits,
661 SingleThreadTaskRunnerThreadMode thread_mode) {
662 return CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
663 }
664 #endif // BUILDFLAG(IS_WIN)
665
666 // static
667 PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
TraitsToContinueOnShutdown(const TaskTraits & traits)668 PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
669 const TaskTraits& traits) {
670 if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
671 return IS_CONTINUE_ON_SHUTDOWN;
672 return IS_NOT_CONTINUE_ON_SHUTDOWN;
673 }
674
675 template <typename DelegateType>
676 scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
CreateTaskRunnerImpl(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)677 PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
678 const TaskTraits& traits,
679 SingleThreadTaskRunnerThreadMode thread_mode) {
680 DCHECK(thread_mode != SingleThreadTaskRunnerThreadMode::SHARED ||
681 !traits.with_base_sync_primitives())
682 << "Using WithBaseSyncPrimitives() on a shared SingleThreadTaskRunner "
683 "may cause deadlocks. Either reevaluate your usage (e.g. use "
684 "SequencedTaskRunner) or use "
685 "SingleThreadTaskRunnerThreadMode::DEDICATED.";
686 // To simplify the code, |dedicated_worker| is a local only variable that
687 // allows the code to treat both the DEDICATED and SHARED cases similarly for
688 // SingleThreadTaskRunnerThreadMode. In DEDICATED, the scoped_refptr is backed
689 // by a local variable and in SHARED, the scoped_refptr is backed by a member
690 // variable.
691 WorkerThreadWaitableEvent* dedicated_worker = nullptr;
692 WorkerThreadWaitableEvent*& worker =
693 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
694 ? dedicated_worker
695 : GetSharedWorkerThreadForTraits<DelegateType>(traits);
696 bool new_worker = false;
697 bool started;
698 {
699 CheckedAutoLock auto_lock(lock_);
700 if (!worker) {
701 const auto& environment_params =
702 kEnvironmentParams[GetEnvironmentIndexForTraits(traits)];
703 std::string worker_name;
704 if (thread_mode == SingleThreadTaskRunnerThreadMode::SHARED)
705 worker_name += "Shared";
706 worker_name += environment_params.name_suffix;
707 worker = CreateAndRegisterWorkerThread<DelegateType>(
708 worker_name, thread_mode, environment_params.thread_type_hint);
709 new_worker = true;
710 }
711 started = started_;
712 }
713
714 if (new_worker && started)
715 worker->Start(io_thread_task_runner_, worker_thread_observer_);
716
717 return MakeRefCounted<PooledSingleThreadTaskRunner>(this, traits, worker,
718 thread_mode);
719 }
720
JoinForTesting()721 void PooledSingleThreadTaskRunnerManager::JoinForTesting() {
722 decltype(workers_) local_workers;
723 {
724 CheckedAutoLock auto_lock(lock_);
725 local_workers = std::move(workers_);
726 }
727
728 for (const auto& worker : local_workers) {
729 static_cast<WorkerThreadDelegate*>(worker->delegate())
730 ->EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting();
731 worker->JoinForTesting();
732 }
733
734 {
735 CheckedAutoLock auto_lock(lock_);
736 DCHECK(workers_.empty())
737 << "New worker(s) unexpectedly registered during join.";
738 workers_ = std::move(local_workers);
739 }
740
741 // Release shared WorkerThreads at the end so they get joined above. If
742 // this call happens before the joins, the WorkerThreads are effectively
743 // detached and may outlive the PooledSingleThreadTaskRunnerManager.
744 ReleaseSharedWorkerThreads();
745 }
746
747 template <>
748 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)749 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
750 WorkerThreadDelegate>(const std::string& name,
751 int id,
752 SingleThreadTaskRunnerThreadMode thread_mode) {
753 return std::make_unique<WorkerThreadDelegate>(
754 StringPrintf("ThreadPoolSingleThread%s%d", name.c_str(), id),
755 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
756 ? WorkerThread::ThreadLabel::DEDICATED
757 : WorkerThread::ThreadLabel::SHARED,
758 task_tracker_);
759 }
760
761 #if BUILDFLAG(IS_WIN)
762 template <>
763 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)764 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
765 WorkerThreadCOMDelegate>(const std::string& name,
766 int id,
767 SingleThreadTaskRunnerThreadMode thread_mode) {
768 return std::make_unique<WorkerThreadCOMDelegate>(
769 StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id),
770 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
771 ? WorkerThread::ThreadLabel::DEDICATED_COM
772 : WorkerThread::ThreadLabel::SHARED_COM,
773 task_tracker_);
774 }
775 #endif // BUILDFLAG(IS_WIN)
776
777 template <typename DelegateType>
778 WorkerThreadWaitableEvent*
CreateAndRegisterWorkerThread(const std::string & name,SingleThreadTaskRunnerThreadMode thread_mode,ThreadType thread_type_hint)779 PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
780 const std::string& name,
781 SingleThreadTaskRunnerThreadMode thread_mode,
782 ThreadType thread_type_hint) {
783 int id = next_worker_id_++;
784 std::unique_ptr<WorkerThreadDelegate> delegate =
785 CreateWorkerThreadDelegate<DelegateType>(name, id, thread_mode);
786 WorkerThreadDelegate* delegate_raw = delegate.get();
787 scoped_refptr<WorkerThreadWaitableEvent> worker =
788 MakeRefCounted<WorkerThreadWaitableEvent>(thread_type_hint,
789 std::move(delegate),
790 task_tracker_, workers_.size());
791 delegate_raw->set_worker(worker.get());
792 workers_.emplace_back(std::move(worker));
793 return workers_.back().get();
794 }
795
796 template <>
797 WorkerThreadWaitableEvent*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)798 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
799 WorkerThreadDelegate>(const TaskTraits& traits) {
800 return shared_worker_threads_[GetEnvironmentIndexForTraits(traits)]
801 [TraitsToContinueOnShutdown(traits)];
802 }
803
804 #if BUILDFLAG(IS_WIN)
805 template <>
806 WorkerThreadWaitableEvent*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)807 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
808 WorkerThreadCOMDelegate>(const TaskTraits& traits) {
809 return shared_com_worker_threads_[GetEnvironmentIndexForTraits(traits)]
810 [TraitsToContinueOnShutdown(traits)];
811 }
812 #endif // BUILDFLAG(IS_WIN)
813
UnregisterWorkerThread(WorkerThreadWaitableEvent * worker)814 void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
815 WorkerThreadWaitableEvent* worker) {
816 // Cleanup uses a CheckedLock, so call Cleanup() after releasing |lock_|.
817 scoped_refptr<WorkerThreadWaitableEvent> worker_to_destroy;
818 {
819 CheckedAutoLock auto_lock(lock_);
820
821 // Skip when joining (the join logic takes care of the rest).
822 if (workers_.empty())
823 return;
824
825 auto worker_iter = ranges::find(workers_, worker);
826 CHECK(worker_iter != workers_.end(), base::NotFatalUntil::M125);
827 worker_to_destroy = std::move(*worker_iter);
828 workers_.erase(worker_iter);
829 }
830 worker_to_destroy->Cleanup();
831 }
832
ReleaseSharedWorkerThreads()833 void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() {
834 decltype(shared_worker_threads_) local_shared_worker_threads;
835 #if BUILDFLAG(IS_WIN)
836 decltype(shared_com_worker_threads_) local_shared_com_worker_threads;
837 #endif
838 {
839 CheckedAutoLock auto_lock(lock_);
840 for (size_t i = 0; i < std::size(shared_worker_threads_); ++i) {
841 for (size_t j = 0; j < std::size(shared_worker_threads_[i]); ++j) {
842 local_shared_worker_threads[i][j] = shared_worker_threads_[i][j];
843 shared_worker_threads_[i][j] = nullptr;
844 #if BUILDFLAG(IS_WIN)
845 local_shared_com_worker_threads[i][j] =
846 shared_com_worker_threads_[i][j];
847 shared_com_worker_threads_[i][j] = nullptr;
848 #endif
849 }
850 }
851 }
852
853 for (size_t i = 0; i < std::size(local_shared_worker_threads); ++i) {
854 for (size_t j = 0; j < std::size(local_shared_worker_threads[i]); ++j) {
855 if (local_shared_worker_threads[i][j])
856 UnregisterWorkerThread(local_shared_worker_threads[i][j]);
857 #if BUILDFLAG(IS_WIN)
858 if (local_shared_com_worker_threads[i][j])
859 UnregisterWorkerThread(local_shared_com_worker_threads[i][j]);
860 #endif
861 }
862 }
863 }
864
865 } // namespace internal
866 } // namespace base
867