xref: /aosp_15_r20/external/cronet/base/files/file_descriptor_watcher_posix.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/files/file_descriptor_watcher_posix.h"
6 
7 #include <utility>
8 
9 #include "base/functional/bind.h"
10 #include "base/functional/callback_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/raw_ref.h"
13 #include "base/message_loop/message_pump_for_io.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/task/current_thread.h"
16 #include "base/task/sequenced_task_runner.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "third_party/abseil-cpp/absl/base/attributes.h"
21 
22 namespace base {
23 
24 namespace {
25 
26 // Per-thread FileDescriptorWatcher registration.
27 ABSL_CONST_INIT thread_local FileDescriptorWatcher* fd_watcher = nullptr;
28 
29 }  // namespace
30 
31 class FileDescriptorWatcher::Controller::Watcher
32     : public MessagePumpForIO::FdWatcher,
33       public CurrentThread::DestructionObserver {
34  public:
35   Watcher(WeakPtr<Controller> controller,
36           base::WaitableEvent& on_destroyed,
37           MessagePumpForIO::Mode mode,
38           int fd);
39   Watcher(const Watcher&) = delete;
40   Watcher& operator=(const Watcher&) = delete;
41   ~Watcher() override;
42 
43   void StartWatching();
44 
45  private:
46   friend class FileDescriptorWatcher;
47 
48   // MessagePumpForIO::FdWatcher:
49   void OnFileCanReadWithoutBlocking(int fd) override;
50   void OnFileCanWriteWithoutBlocking(int fd) override;
51 
52   // CurrentThread::DestructionObserver:
53   void WillDestroyCurrentMessageLoop() override;
54 
55   // The MessagePumpForIO's watch handle (stops the watch when destroyed).
56   MessagePumpForIO::FdWatchController fd_watch_controller_;
57 
58   // Runs tasks on the sequence on which this was instantiated (i.e. the
59   // sequence on which the callback must run).
60   const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
61       SequencedTaskRunner::GetCurrentDefault();
62 
63   // The Controller that created this Watcher. This WeakPtr is bound to the
64   // |controller_| thread and can only be used by this Watcher to post back to
65   // |callback_task_runner_|.
66   WeakPtr<Controller> controller_;
67 
68   // WaitableEvent to signal to ensure that the Watcher is always destroyed
69   // before the Controller.
70   const raw_ref<base::WaitableEvent, AcrossTasksDanglingUntriaged>
71       on_destroyed_;
72 
73   // Whether this Watcher is notified when |fd_| becomes readable or writable
74   // without blocking.
75   const MessagePumpForIO::Mode mode_;
76 
77   // The watched file descriptor.
78   const int fd_;
79 
80   // Except for the constructor, every method of this class must run on the same
81   // MessagePumpForIO thread.
82   ThreadChecker thread_checker_;
83 
84   // Whether this Watcher was registered as a DestructionObserver on the
85   // MessagePumpForIO thread.
86   bool registered_as_destruction_observer_ = false;
87 };
88 
Watcher(WeakPtr<Controller> controller,base::WaitableEvent & on_destroyed,MessagePumpForIO::Mode mode,int fd)89 FileDescriptorWatcher::Controller::Watcher::Watcher(
90     WeakPtr<Controller> controller,
91     base::WaitableEvent& on_destroyed,
92     MessagePumpForIO::Mode mode,
93     int fd)
94     : fd_watch_controller_(FROM_HERE),
95       controller_(controller),
96       on_destroyed_(on_destroyed),
97       mode_(mode),
98       fd_(fd) {
99   DCHECK(callback_task_runner_);
100   thread_checker_.DetachFromThread();
101 }
102 
~Watcher()103 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
104   DCHECK(thread_checker_.CalledOnValidThread());
105   CurrentIOThread::Get()->RemoveDestructionObserver(this);
106 
107   // Stop watching the descriptor before signalling |on_destroyed_|.
108   CHECK(fd_watch_controller_.StopWatchingFileDescriptor());
109   on_destroyed_->Signal();
110 }
111 
StartWatching()112 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
113   DCHECK(thread_checker_.CalledOnValidThread());
114   DCHECK(CurrentIOThread::IsSet());
115 
116   const bool watch_success = CurrentIOThread::Get()->WatchFileDescriptor(
117       fd_, false, mode_, &fd_watch_controller_, this);
118   DCHECK(watch_success) << "Failed to watch fd=" << fd_;
119 
120   if (!registered_as_destruction_observer_) {
121     CurrentIOThread::Get()->AddDestructionObserver(this);
122     registered_as_destruction_observer_ = true;
123   }
124 }
125 
OnFileCanReadWithoutBlocking(int fd)126 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
127     int fd) {
128   DCHECK_EQ(fd_, fd);
129   DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
130   DCHECK(thread_checker_.CalledOnValidThread());
131 
132   // Run the callback on the sequence on which the watch was initiated.
133   callback_task_runner_->PostTask(
134       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
135 }
136 
OnFileCanWriteWithoutBlocking(int fd)137 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
138     int fd) {
139   DCHECK_EQ(fd_, fd);
140   DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
141   DCHECK(thread_checker_.CalledOnValidThread());
142 
143   // Run the callback on the sequence on which the watch was initiated.
144   callback_task_runner_->PostTask(
145       FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
146 }
147 
148 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()149     WillDestroyCurrentMessageLoop() {
150   DCHECK(thread_checker_.CalledOnValidThread());
151 
152   if (callback_task_runner_->RunsTasksInCurrentSequence()) {
153     // |controller_| can be accessed directly when Watcher runs on the same
154     // thread.
155     Watcher* watcher = controller_->watcher_;
156     controller_->watcher_ = nullptr;
157     delete watcher;
158   } else {
159     // If the Watcher and the Controller live on different threads, delete
160     // |this| synchronously. Pending tasks bound to an unretained Watcher* will
161     // not run since this loop is dead. The associated Controller will not know
162     // whether the Watcher has been destroyed but it never uses it directly and
163     // will ultimately send it to this thread for deletion (and that also  won't
164     // run since the loop being dead).
165     delete this;
166   }
167 }
168 
Controller(MessagePumpForIO::Mode mode,int fd,const RepeatingClosure & callback)169 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
170                                               int fd,
171                                               const RepeatingClosure& callback)
172     : callback_(callback),
173       io_thread_task_runner_(fd_watcher->io_thread_task_runner()) {
174   DCHECK(!callback_.is_null());
175   DCHECK(io_thread_task_runner_);
176   watcher_ =
177       new Watcher(weak_factory_.GetWeakPtr(), on_watcher_destroyed_, mode, fd);
178   StartWatching();
179 }
180 
~Controller()181 FileDescriptorWatcher::Controller::~Controller() {
182   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
183 
184   if (io_thread_task_runner_->BelongsToCurrentThread()) {
185     // If the MessagePumpForIO and the Controller live on the same thread.
186     if (watcher_)
187       delete watcher_;
188   } else {
189     // Synchronously wait until |watcher_| is deleted on the MessagePumpForIO
190     // thread. This ensures that the file descriptor is never accessed after
191     // this destructor returns.
192     //
193     // We considered associating "generations" to file descriptors to avoid the
194     // synchronous wait. For example, if the IO thread gets a "cancel" for fd=6,
195     // generation=1 after getting a "start watching" for fd=6, generation=2, it
196     // can ignore the "Cancel". However, "generations" didn't solve this race:
197     //
198     // T1 (client) Start watching fd = 6 with WatchReadable()
199     //             Stop watching fd = 6
200     //             Close fd = 6
201     //             Open a new file, fd = 6 gets reused.
202     // T2 (io)     Watcher::StartWatching()
203     //               Incorrectly starts watching fd = 6 which now refers to a
204     //               different file than when WatchReadable() was called.
205     auto delete_task = BindOnce(
206         [](Watcher* watcher) {
207           // Since |watcher| is a raw pointer, it isn't deleted if this callback
208           // is deleted before it gets to run.
209           delete watcher;
210         },
211         UnsafeDanglingUntriaged(watcher_));
212     io_thread_task_runner_->PostTask(FROM_HERE, std::move(delete_task));
213     ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow;
214     on_watcher_destroyed_.Wait();
215   }
216 
217   // Since WeakPtrs are invalidated by the destructor, any pending RunCallback()
218   // won't be invoked after this returns.
219 }
220 
StartWatching()221 void FileDescriptorWatcher::Controller::StartWatching() {
222   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
223   if (io_thread_task_runner_->BelongsToCurrentThread()) {
224     // If the MessagePumpForIO and the Controller live on the same thread.
225     watcher_->StartWatching();
226   } else {
227     // It is safe to use Unretained() below because |watcher_| can only be
228     // deleted by a delete task posted to |io_thread_task_runner_| by this
229     // Controller's destructor. Since this delete task hasn't been posted yet,
230     // it can't run before the task posted below.
231     io_thread_task_runner_->PostTask(
232         FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_)));
233   }
234 }
235 
RunCallback()236 void FileDescriptorWatcher::Controller::RunCallback() {
237   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
238 
239   WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
240 
241   // Run a copy of the callback in case this Controller is deleted by the
242   // callback. This would cause the callback itself to be deleted while it is
243   // being run.
244   RepeatingClosure callback_copy = callback_;
245   callback_copy.Run();
246 
247   // If |this| wasn't deleted, re-enable the watch.
248   if (weak_this)
249     StartWatching();
250 }
251 
FileDescriptorWatcher(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)252 FileDescriptorWatcher::FileDescriptorWatcher(
253     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)
254     : resetter_(&fd_watcher, this, nullptr),
255       io_thread_task_runner_(std::move(io_thread_task_runner)) {}
256 
257 FileDescriptorWatcher::~FileDescriptorWatcher() = default;
258 
259 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const RepeatingClosure & callback)260 FileDescriptorWatcher::WatchReadable(int fd, const RepeatingClosure& callback) {
261   return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
262 }
263 
264 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const RepeatingClosure & callback)265 FileDescriptorWatcher::WatchWritable(int fd, const RepeatingClosure& callback) {
266   return WrapUnique(
267       new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
268 }
269 
270 #if DCHECK_IS_ON()
AssertAllowed()271 void FileDescriptorWatcher::AssertAllowed() {
272   DCHECK(fd_watcher);
273 }
274 #endif
275 
276 }  // namespace base
277