xref: /aosp_15_r20/external/cronet/base/threading/thread.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/threading/thread.h"
6 
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10 
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/message_loop/message_pump.h"
19 #include "base/run_loop.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/current_thread.h"
22 #include "base/task/sequence_manager/sequence_manager_impl.h"
23 #include "base/task/sequence_manager/task_queue.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/threading/thread_id_name_manager.h"
26 #include "base/threading/thread_restrictions.h"
27 #include "base/types/pass_key.h"
28 #include "build/build_config.h"
29 #include "third_party/abseil-cpp/absl/base/attributes.h"
30 #include "third_party/abseil-cpp/absl/base/dynamic_annotations.h"
31 
32 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
33 #include <optional>
34 
35 #include "base/files/file_descriptor_watcher_posix.h"
36 #endif
37 
38 #if BUILDFLAG(IS_WIN)
39 #include "base/win/scoped_com_initializer.h"
40 #endif
41 
42 namespace base {
43 
44 #if DCHECK_IS_ON()
45 namespace {
46 
47 // We use this thread-local variable to record whether or not a thread exited
48 // because its Stop method was called.  This allows us to catch cases where
49 // MessageLoop::QuitWhenIdle() is called directly, which is unexpected when
50 // using a Thread to setup and run a MessageLoop.
51 ABSL_CONST_INIT thread_local bool was_quit_properly = false;
52 
53 }  // namespace
54 #endif
55 
56 namespace internal {
57 
58 class SequenceManagerThreadDelegate : public Thread::Delegate {
59  public:
SequenceManagerThreadDelegate(MessagePumpType message_pump_type,OnceCallback<std::unique_ptr<MessagePump> ()> message_pump_factory)60   explicit SequenceManagerThreadDelegate(
61       MessagePumpType message_pump_type,
62       OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory)
63       : sequence_manager_(
64             sequence_manager::internal::CreateUnboundSequenceManagerImpl(
65                 PassKey<base::internal::SequenceManagerThreadDelegate>(),
66                 sequence_manager::SequenceManager::Settings::Builder()
67                     .SetMessagePumpType(message_pump_type)
68                     .Build())),
69         default_task_queue_(sequence_manager_->CreateTaskQueue(
70             sequence_manager::TaskQueue::Spec(
71                 sequence_manager::QueueName::DEFAULT_TQ))),
72         message_pump_factory_(std::move(message_pump_factory)) {
73     sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
74   }
75 
76   ~SequenceManagerThreadDelegate() override = default;
77 
GetDefaultTaskRunner()78   scoped_refptr<SingleThreadTaskRunner> GetDefaultTaskRunner() override {
79     // Surprisingly this might not be default_task_queue_->task_runner() which
80     // we set in the constructor. The Thread::Init() method could create a
81     // SequenceManager on top of the current one and call
82     // SequenceManager::SetDefaultTaskRunner which would propagate the new
83     // TaskRunner down to our SequenceManager. Turns out, code actually relies
84     // on this and somehow relies on
85     // SequenceManagerThreadDelegate::GetDefaultTaskRunner returning this new
86     // TaskRunner. So instead of returning default_task_queue_->task_runner() we
87     // need to query the SequenceManager for it.
88     // The underlying problem here is that Subclasses of Thread can do crazy
89     // stuff in Init() but they are not really in control of what happens in the
90     // Thread::Delegate, as this is passed in on calling StartWithOptions which
91     // could happen far away from where the Thread is created. We should
92     // consider getting rid of StartWithOptions, and pass them as a constructor
93     // argument instead.
94     return sequence_manager_->GetTaskRunner();
95   }
96 
BindToCurrentThread()97   void BindToCurrentThread() override {
98     sequence_manager_->BindToMessagePump(
99         std::move(message_pump_factory_).Run());
100   }
101 
102  private:
103   std::unique_ptr<sequence_manager::internal::SequenceManagerImpl>
104       sequence_manager_;
105   sequence_manager::TaskQueue::Handle default_task_queue_;
106   OnceCallback<std::unique_ptr<MessagePump>()> message_pump_factory_;
107 };
108 
109 }  // namespace internal
110 
111 Thread::Options::Options() = default;
112 
Options(MessagePumpType type,size_t size)113 Thread::Options::Options(MessagePumpType type, size_t size)
114     : message_pump_type(type), stack_size(size) {}
115 
Options(ThreadType thread_type)116 Thread::Options::Options(ThreadType thread_type) : thread_type(thread_type) {}
117 
Options(Options && other)118 Thread::Options::Options(Options&& other)
119     : message_pump_type(std::move(other.message_pump_type)),
120       delegate(std::move(other.delegate)),
121       message_pump_factory(std::move(other.message_pump_factory)),
122       stack_size(std::move(other.stack_size)),
123       thread_type(std::move(other.thread_type)),
124       joinable(std::move(other.joinable)) {
125   other.moved_from = true;
126 }
127 
operator =(Thread::Options && other)128 Thread::Options& Thread::Options::operator=(Thread::Options&& other) {
129   DCHECK_NE(this, &other);
130 
131   message_pump_type = std::move(other.message_pump_type);
132   delegate = std::move(other.delegate);
133   message_pump_factory = std::move(other.message_pump_factory);
134   stack_size = std::move(other.stack_size);
135   thread_type = std::move(other.thread_type);
136   joinable = std::move(other.joinable);
137   other.moved_from = true;
138 
139   return *this;
140 }
141 
142 Thread::Options::~Options() = default;
143 
Thread(const std::string & name)144 Thread::Thread(const std::string& name)
145     : id_event_(WaitableEvent::ResetPolicy::MANUAL,
146                 WaitableEvent::InitialState::NOT_SIGNALED),
147       name_(name),
148       start_event_(WaitableEvent::ResetPolicy::MANUAL,
149                    WaitableEvent::InitialState::NOT_SIGNALED) {
150   // Only bind the sequence on Start(): the state is constant between
151   // construction and Start() and it's thus valid for Start() to be called on
152   // another sequence as long as every other operation is then performed on that
153   // sequence.
154   owning_sequence_checker_.DetachFromSequence();
155 }
156 
~Thread()157 Thread::~Thread() {
158   Stop();
159 }
160 
Start()161 bool Thread::Start() {
162   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
163 
164   Options options;
165 #if BUILDFLAG(IS_WIN)
166   if (com_status_ == STA)
167     options.message_pump_type = MessagePumpType::UI;
168 #endif
169   return StartWithOptions(std::move(options));
170 }
171 
StartWithOptions(Options options)172 bool Thread::StartWithOptions(Options options) {
173   DCHECK(options.IsValid());
174   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
175   DCHECK(!delegate_);
176   DCHECK(!IsRunning());
177   DCHECK(!stopping_) << "Starting a non-joinable thread a second time? That's "
178                      << "not allowed!";
179 #if BUILDFLAG(IS_WIN)
180   DCHECK((com_status_ != STA) ||
181          (options.message_pump_type == MessagePumpType::UI));
182 #endif
183 
184   // Reset |id_| here to support restarting the thread.
185   id_event_.Reset();
186   id_ = kInvalidThreadId;
187 
188   SetThreadWasQuitProperly(false);
189 
190   if (options.delegate) {
191     DCHECK(!options.message_pump_factory);
192     delegate_ = std::move(options.delegate);
193   } else if (options.message_pump_factory) {
194     delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
195         MessagePumpType::CUSTOM, options.message_pump_factory);
196   } else {
197     delegate_ = std::make_unique<internal::SequenceManagerThreadDelegate>(
198         options.message_pump_type,
199         BindOnce([](MessagePumpType type) { return MessagePump::Create(type); },
200                  options.message_pump_type));
201   }
202 
203   start_event_.Reset();
204 
205   // Hold |thread_lock_| while starting the new thread to synchronize with
206   // Stop() while it's not guaranteed to be sequenced (until crbug/629139 is
207   // fixed).
208   {
209     AutoLock lock(thread_lock_);
210     bool success = options.joinable
211                        ? PlatformThread::CreateWithType(
212                              options.stack_size, this, &thread_,
213                              options.thread_type, options.message_pump_type)
214                        : PlatformThread::CreateNonJoinableWithType(
215                              options.stack_size, this, options.thread_type,
216                              options.message_pump_type);
217     if (!success) {
218       DLOG(ERROR) << "failed to create thread";
219       return false;
220     }
221   }
222 
223   joinable_ = options.joinable;
224 
225   return true;
226 }
227 
StartAndWaitForTesting()228 bool Thread::StartAndWaitForTesting() {
229   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
230   bool result = Start();
231   if (!result)
232     return false;
233   WaitUntilThreadStarted();
234   return true;
235 }
236 
WaitUntilThreadStarted() const237 bool Thread::WaitUntilThreadStarted() const {
238   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
239   if (!delegate_)
240     return false;
241   // https://crbug.com/918039
242   base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
243   start_event_.Wait();
244   return true;
245 }
246 
FlushForTesting()247 void Thread::FlushForTesting() {
248   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
249   if (!delegate_)
250     return;
251 
252   WaitableEvent done(WaitableEvent::ResetPolicy::AUTOMATIC,
253                      WaitableEvent::InitialState::NOT_SIGNALED);
254   task_runner()->PostTask(FROM_HERE,
255                           BindOnce(&WaitableEvent::Signal, Unretained(&done)));
256   done.Wait();
257 }
258 
Stop()259 void Thread::Stop() {
260   DCHECK(joinable_);
261 
262   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
263   // enable this check, until then synchronization with Start() via
264   // |thread_lock_| is required...
265   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
266   AutoLock lock(thread_lock_);
267 
268   StopSoon();
269 
270   // Can't join if the |thread_| is either already gone or is non-joinable.
271   if (thread_.is_null())
272     return;
273 
274   // Wait for the thread to exit.
275   //
276   // TODO(darin): Unfortunately, we need to keep |delegate_| around
277   // until the thread exits. Some consumers are abusing the API. Make them stop.
278   PlatformThread::Join(thread_);
279   thread_ = base::PlatformThreadHandle();
280 
281   // The thread should release |delegate_| on exit (note: Join() adds
282   // an implicit memory barrier and no lock is thus required for this check).
283   DCHECK(!delegate_);
284 
285   stopping_ = false;
286 }
287 
StopSoon()288 void Thread::StopSoon() {
289   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
290   // enable this check.
291   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
292 
293   if (stopping_ || !delegate_)
294     return;
295 
296   stopping_ = true;
297 
298   task_runner()->PostTask(
299       FROM_HERE, base::BindOnce(&Thread::ThreadQuitHelper, Unretained(this)));
300 }
301 
DetachFromSequence()302 void Thread::DetachFromSequence() {
303   DCHECK(owning_sequence_checker_.CalledOnValidSequence());
304   owning_sequence_checker_.DetachFromSequence();
305 }
306 
GetThreadId() const307 PlatformThreadId Thread::GetThreadId() const {
308   if (!id_event_.IsSignaled()) {
309     // If the thread is created but not started yet, wait for |id_| being ready.
310     base::ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
311     id_event_.Wait();
312   }
313   return id_;
314 }
315 
IsRunning() const316 bool Thread::IsRunning() const {
317   // TODO(gab): Fix improper usage of this API (http://crbug.com/629139) and
318   // enable this check.
319   // DCHECK(owning_sequence_checker_.CalledOnValidSequence());
320 
321   // If the thread's already started (i.e. |delegate_| is non-null) and
322   // not yet requested to stop (i.e. |stopping_| is false) we can just return
323   // true. (Note that |stopping_| is touched only on the same sequence that
324   // starts / started the new thread so we need no locking here.)
325   if (delegate_ && !stopping_)
326     return true;
327   // Otherwise check the |running_| flag, which is set to true by the new thread
328   // only while it is inside Run().
329   AutoLock lock(running_lock_);
330   return running_;
331 }
332 
Run(RunLoop * run_loop)333 void Thread::Run(RunLoop* run_loop) {
334   // Overridable protected method to be called from our |thread_| only.
335   DCHECK(id_event_.IsSignaled());
336   DCHECK_EQ(id_, PlatformThread::CurrentId());
337 
338   run_loop->Run();
339 }
340 
341 // static
SetThreadWasQuitProperly(bool flag)342 void Thread::SetThreadWasQuitProperly(bool flag) {
343 #if DCHECK_IS_ON()
344   was_quit_properly = flag;
345 #endif
346 }
347 
348 // static
GetThreadWasQuitProperly()349 bool Thread::GetThreadWasQuitProperly() {
350 #if DCHECK_IS_ON()
351   return was_quit_properly;
352 #else
353   return true;
354 #endif
355 }
356 
ThreadMain()357 void Thread::ThreadMain() {
358   // First, make GetThreadId() available to avoid deadlocks. It could be called
359   // any place in the following thread initialization code.
360   DCHECK(!id_event_.IsSignaled());
361   // Note: this read of |id_| while |id_event_| isn't signaled is exceptionally
362   // okay because ThreadMain has a happens-after relationship with the other
363   // write in StartWithOptions().
364   DCHECK_EQ(kInvalidThreadId, id_);
365   id_ = PlatformThread::CurrentId();
366   DCHECK_NE(kInvalidThreadId, id_);
367   id_event_.Signal();
368 
369   // Complete the initialization of our Thread object.
370   PlatformThread::SetName(name_.c_str());
371   ABSL_ANNOTATE_THREAD_NAME(name_.c_str());  // Tell the name to race detector.
372 
373   // Lazily initialize the |message_loop| so that it can run on this thread.
374   DCHECK(delegate_);
375   // This binds CurrentThread and SingleThreadTaskRunner::CurrentDefaultHandle.
376   delegate_->BindToCurrentThread();
377   DCHECK(CurrentThread::Get());
378   DCHECK(SingleThreadTaskRunner::HasCurrentDefault());
379 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
380   // Allow threads running a MessageLoopForIO to use FileDescriptorWatcher API.
381   std::unique_ptr<FileDescriptorWatcher> file_descriptor_watcher;
382   if (CurrentIOThread::IsSet()) {
383     file_descriptor_watcher = std::make_unique<FileDescriptorWatcher>(
384         delegate_->GetDefaultTaskRunner());
385   }
386 #endif  // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
387 
388 #if BUILDFLAG(IS_WIN)
389   std::unique_ptr<win::ScopedCOMInitializer> com_initializer;
390   if (com_status_ != NONE) {
391     com_initializer.reset(
392         (com_status_ == STA)
393             ? new win::ScopedCOMInitializer()
394             : new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA));
395   }
396 #endif
397 
398   // Let the thread do extra initialization.
399   Init();
400 
401   {
402     AutoLock lock(running_lock_);
403     running_ = true;
404   }
405 
406   start_event_.Signal();
407 
408   RunLoop run_loop;
409   run_loop_ = &run_loop;
410   Run(run_loop_);
411 
412   {
413     AutoLock lock(running_lock_);
414     running_ = false;
415   }
416 
417   // Let the thread do extra cleanup.
418   CleanUp();
419 
420 #if BUILDFLAG(IS_WIN)
421   com_initializer.reset();
422 #endif
423 
424   DCHECK(GetThreadWasQuitProperly());
425 
426   // We can't receive messages anymore.
427   // (The message loop is destructed at the end of this block)
428   delegate_.reset();
429   run_loop_ = nullptr;
430 }
431 
ThreadQuitHelper()432 void Thread::ThreadQuitHelper() {
433   DCHECK(run_loop_);
434   run_loop_->QuitWhenIdle();
435   SetThreadWasQuitProperly(true);
436 }
437 
438 }  // namespace base
439