1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/thread.h"
6
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/message_loop/message_pump.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/current_thread.h"
22 #include "base/task/sequence_manager/sequence_manager_impl.h"
23 #include "base/task/sequence_manager/task_queue.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/threading/thread_id_name_manager.h"
26 #include "base/threading/thread_restrictions.h"
27 #include "base/types/pass_key.h"
28 #include "build/build_config.h"
29 #include "third_party/abseil-cpp/absl/base/attributes.h"
30 #include "third_party/abseil-cpp/absl/base/dynamic_annotations.h"
31
32 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
33 #include <optional>
34
35 #include "base/files/file_descriptor_watcher_posix.h"
36 #endif
37
38 #if BUILDFLAG(IS_WIN)
39 #include "base/win/scoped_com_initializer.h"
40 #endif
41
42 namespace base {
43
44 #if DCHECK_IS_ON()
45 namespace {
46
47 // We use this thread-local variable to record whether or not a thread exited
48 // because its Stop method was called. This allows us to catch cases where
49 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
50 // using a Thread to setup and run a MessageLoop.
51 ABSL_CONST_INIT thread_local bool was_quit_properly = false;
52
53 } // namespace
54 #endif
55
56 namespace internal {
57
58 class SequenceManagerThreadDelegate : public Thread::Delegate {
59 public:
SequenceManagerThreadDelegate(MessagePumpType message_pump_type,OnceCallback<std::unique_ptr<MessagePump> ()> message_pump_factory)60 explicit SequenceManagerThreadDelegate(
61 MessagePumpType message_pump_type,
62 OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory)
63 : sequence_manager_(
64 sequence_manager::internal::CreateUnboundSequenceManagerImpl(
65 PassKey<base::internal::SequenceManagerThreadDelegate>(),
66 sequence_manager::SequenceManager::Settings::Builder()
67 .SetMessagePumpType(message_pump_type)
68 .Build())),
69 default_task_queue_(sequence_manager_->CreateTaskQueue(
70 sequence_manager::TaskQueue::Spec(
71 sequence_manager::QueueName::DEFAULT_TQ))),
72 message_pump_factory_(std::move(message_pump_factory)) {
73 sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
74 }
75
76 ~SequenceManagerThreadDelegate() override = default;
77
GetDefaultTaskRunner()78 scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override {
79 // Surprisingly this might not be default_task_queue_->task_runner() which
80 // we set in the constructor. The Thread::Init() method could create a
81 // SequenceManager on top of the current one and call
82 // SequenceManager::SetDefaultTaskRunner which would propagate the new
83 // TaskRunner down to our SequenceManager. Turns out, code actually relies
84 // on this and somehow relies on
85 // SequenceManagerThreadDelegate::GetDefaultTaskRunner returning this new
86 // TaskRunner. So instead of returning default_task_queue_->task_runner() we
87 // need to query the SequenceManager for it.
88 // The underlying problem here is that Subclasses of Thread can do crazy
89 // stuff in Init() but they are not really in control of what happens in the
90 // Thread::Delegate, as this is passed in on calling StartWithOptions which
91 // could happen far away from where the Thread is created. We should
92 // consider getting rid of StartWithOptions, and pass them as a constructor
93 // argument instead.
94 return sequence_manager_->GetTaskRunner();
95 }
96
BindToCurrentThread()97 void BindToCurrentThread() override {
98 sequence_manager_->BindToMessagePump(
99 std::move(message_pump_factory_).Run());
100 }
101
102 private:
103 std::unique_ptr<sequence_manager::internal::SequenceManagerImpl>
104 sequence_manager_;
105 sequence_manager::TaskQueue::Handle default_task_queue_;
106 OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory_;
107 };
108
109 } // namespace internal
110
111 Thread::Options::Options() = default;
112
Options(MessagePumpType type,size_t size)113 Thread::Options::Options(MessagePumpType type, size_t size)
114 : message_pump_type(type), stack_size(size) {}
115
Options(ThreadType thread_type)116 Thread::Options::Options(ThreadType thread_type) : thread_type(thread_type) {}
117
Options(Options && other)118 Thread::Options::Options(Options&& other)
119 : message_pump_type(std::move(other.message_pump_type)),
120 delegate(std::move(other.delegate)),
121 message_pump_factory(std::move(other.message_pump_factory)),
122 stack_size(std::move(other.stack_size)),
123 thread_type(std::move(other.thread_type)),
124 joinable(std::move(other.joinable)) {
125 other.moved_from = true;
126 }
127
operator =(Thread::Options && other)128 Thread::Options& Thread::Options::operator=(Thread::Options&& other) {
129 DCHECK_NE(this, &other);
130
131 message_pump_type = std::move(other.message_pump_type);
132 delegate = std::move(other.delegate);
133 message_pump_factory = std::move(other.message_pump_factory);
134 stack_size = std::move(other.stack_size);
135 thread_type = std::move(other.thread_type);
136 joinable = std::move(other.joinable);
137 other.moved_from = true;
138
139 return *this;
140 }
141
142 Thread::Options::~Options() = default;
143
Thread(const std::string & name)144 Thread::Thread(const std::string& name)
145 : id_event_(WaitableEvent::ResetPolicy::MANUAL,
146 WaitableEvent::InitialState::NOT_SIGNALED),
147 name_(name),
148 start_event_(WaitableEvent::ResetPolicy::MANUAL,
149 WaitableEvent::InitialState::NOT_SIGNALED) {
150 // Only bind the sequence on Start(): the state is constant between
151 // construction and Start() and it's thus valid for Start() to be called on
152 // another sequence as long as every other operation is then performed on that
153 // sequence.
154 owning_sequence_checker_.DetachFromSequence();
155 }
156
~Thread()157 Thread::~Thread() {
158 Stop();
159 }
160
Start()161 bool Thread::Start() {
162 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
163
164 Options options;
165 #if BUILDFLAG(IS_WIN)
166 if (com_status_ == STA)
167 options.message_pump_type = MessagePumpType::UI;
168 #endif
169 return StartWithOptions(std::move(options));
170 }
171
StartWithOptions(Options options)172 bool Thread::StartWithOptions(Options options) {
173 DCHECK(options.IsValid());
174 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
175 DCHECK(!delegate_);
176 DCHECK(!IsRunning());
177 DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
178 << "not allowed!";
179 #if BUILDFLAG(IS_WIN)
180 DCHECK((com_status_ != STA) ||
181 (options.message_pump_type == MessagePumpType::UI));
182 #endif
183
184 // Reset |id_| here to support restarting the thread.
185 id_event_.Reset();
186 id_ = kInvalidThreadId;
187
188 SetThreadWasQuitProperly(false);
189
190 if (options.delegate) {
191 DCHECK(!options.message_pump_factory);
192 delegate_ = std::move(options.delegate);
193 } else if (options.message_pump_factory) {
194 delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
195 MessagePumpType::CUSTOM, options.message_pump_factory);
196 } else {
197 delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
198 options.message_pump_type,
199 BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
200 options.message_pump_type));
201 }
202
203 start_event_.Reset();
204
205 // Hold |thread_lock_| while starting the new thread to synchronize with
206 // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
207 // fixed).
208 {
209 AutoLock lock(thread_lock_);
210 bool success = options.joinable
211 ? PlatformThread::CreateWithType(
212 options.stack_size, this, &thread_,
213 options.thread_type, options.message_pump_type)
214 : PlatformThread::CreateNonJoinableWithType(
215 options.stack_size, this, options.thread_type,
216 options.message_pump_type);
217 if (!success) {
218 DLOG(ERROR) << "failed to create thread";
219 return false;
220 }
221 }
222
223 joinable_ = options.joinable;
224
225 return true;
226 }
227
StartAndWaitForTesting()228 bool Thread::StartAndWaitForTesting() {
229 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
230 bool result = Start();
231 if (!result)
232 return false;
233 WaitUntilThreadStarted();
234 return true;
235 }
236
WaitUntilThreadStarted() const237 bool Thread::WaitUntilThreadStarted() const {
238 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
239 if (!delegate_)
240 return false;
241 // https://crbug.com/918039
242 base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
243 start_event_.Wait();
244 return true;
245 }
246
FlushForTesting()247 void Thread::FlushForTesting() {
248 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
249 if (!delegate_)
250 return;
251
252 WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
253 WaitableEvent::InitialState::NOT_SIGNALED);
254 task_runner()->PostTask(FROM_HERE,
255 BindOnce(&WaitableEvent::Signal, Unretained(&done)));
256 done.Wait();
257 }
258
Stop()259 void Thread::Stop() {
260 DCHECK(joinable_);
261
262 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
263 // enable this check, until then synchronization with Start() via
264 // |thread_lock_| is required...
265 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
266 AutoLock lock(thread_lock_);
267
268 StopSoon();
269
270 // Can't join if the |thread_| is either already gone or is non-joinable.
271 if (thread_.is_null())
272 return;
273
274 // Wait for the thread to exit.
275 //
276 // TODO(darin): Unfortunately, we need to keep |delegate_| around
277 // until the thread exits. Some consumers are abusing the API. Make them stop.
278 PlatformThread::Join(thread_);
279 thread_ = base::PlatformThreadHandle();
280
281 // The thread should release |delegate_| on exit (note: Join() adds
282 // an implicit memory barrier and no lock is thus required for this check).
283 DCHECK(!delegate_);
284
285 stopping_ = false;
286 }
287
StopSoon()288 void Thread::StopSoon() {
289 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
290 // enable this check.
291 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
292
293 if (stopping_ || !delegate_)
294 return;
295
296 stopping_ = true;
297
298 task_runner()->PostTask(
299 FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this)));
300 }
301
DetachFromSequence()302 void Thread::DetachFromSequence() {
303 DCHECK(owning_sequence_checker_.CalledOnValidSequence());
304 owning_sequence_checker_.DetachFromSequence();
305 }
306
GetThreadId() const307 PlatformThreadId Thread::GetThreadId() const {
308 if (!id_event_.IsSignaled()) {
309 // If the thread is created but not started yet, wait for |id_| being ready.
310 base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
311 id_event_.Wait();
312 }
313 return id_;
314 }
315
IsRunning() const316 bool Thread::IsRunning() const {
317 // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
318 // enable this check.
319 // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
320
321 // If the thread's already started (i.e. |delegate_| is non-null) and
322 // not yet requested to stop (i.e. |stopping_| is false) we can just return
323 // true. (Note that |stopping_| is touched only on the same sequence that
324 // starts / started the new thread so we need no locking here.)
325 if (delegate_ && !stopping_)
326 return true;
327 // Otherwise check the |running_| flag, which is set to true by the new thread
328 // only while it is inside Run().
329 AutoLock lock(running_lock_);
330 return running_;
331 }
332
Run(RunLoop * run_loop)333 void Thread::Run(RunLoop* run_loop) {
334 // Overridable protected method to be called from our |thread_| only.
335 DCHECK(id_event_.IsSignaled());
336 DCHECK_EQ(id_, PlatformThread::CurrentId());
337
338 run_loop->Run();
339 }
340
341 // static
SetThreadWasQuitProperly(bool flag)342 void Thread::SetThreadWasQuitProperly(bool flag) {
343 #if DCHECK_IS_ON()
344 was_quit_properly = flag;
345 #endif
346 }
347
348 // static
GetThreadWasQuitProperly()349 bool Thread::GetThreadWasQuitProperly() {
350 #if DCHECK_IS_ON()
351 return was_quit_properly;
352 #else
353 return true;
354 #endif
355 }
356
ThreadMain()357 void Thread::ThreadMain() {
358 // First, make GetThreadId() available to avoid deadlocks. It could be called
359 // any place in the following thread initialization code.
360 DCHECK(!id_event_.IsSignaled());
361 // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
362 // okay because ThreadMain has a happens-after relationship with the other
363 // write in StartWithOptions().
364 DCHECK_EQ(kInvalidThreadId, id_);
365 id_ = PlatformThread::CurrentId();
366 DCHECK_NE(kInvalidThreadId, id_);
367 id_event_.Signal();
368
369 // Complete the initialization of our Thread object.
370 PlatformThread::SetName(name_.c_str());
371 ABSL_ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector.
372
373 // Lazily initialize the |message_loop| so that it can run on this thread.
374 DCHECK(delegate_);
375 // This binds CurrentThread and SingleThreadTaskRunner::CurrentDefaultHandle.
376 delegate_->BindToCurrentThread();
377 DCHECK(CurrentThread::Get());
378 DCHECK(SingleThreadTaskRunner::HasCurrentDefault());
379 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
380 // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
381 std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
382 if (CurrentIOThread::IsSet()) {
383 file_descriptor_watcher = std::make_unique<FileDescriptorWatcher>(
384 delegate_->GetDefaultTaskRunner());
385 }
386 #endif // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
387
388 #if BUILDFLAG(IS_WIN)
389 std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
390 if (com_status_ != NONE) {
391 com_initializer.reset(
392 (com_status_ == STA)
393 ? new win::ScopedCOMInitializer()
394 : new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
395 }
396 #endif
397
398 // Let the thread do extra initialization.
399 Init();
400
401 {
402 AutoLock lock(running_lock_);
403 running_ = true;
404 }
405
406 start_event_.Signal();
407
408 RunLoop run_loop;
409 run_loop_ = &run_loop;
410 Run(run_loop_);
411
412 {
413 AutoLock lock(running_lock_);
414 running_ = false;
415 }
416
417 // Let the thread do extra cleanup.
418 CleanUp();
419
420 #if BUILDFLAG(IS_WIN)
421 com_initializer.reset();
422 #endif
423
424 DCHECK(GetThreadWasQuitProperly());
425
426 // We can't receive messages anymore.
427 // (The message loop is destructed at the end of this block)
428 delegate_.reset();
429 run_loop_ = nullptr;
430 }
431
ThreadQuitHelper()432 void Thread::ThreadQuitHelper() {
433 DCHECK(run_loop_);
434 run_loop_->QuitWhenIdle();
435 SetThreadWasQuitProperly(true);
436 }
437
438 } // namespace base
439