1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/files/file_descriptor_watcher_posix.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/functional/callback_helpers.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/raw_ref.h"
13 #include "base/message_loop/message_pump_for_io.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/task/current_thread.h"
16 #include "base/task/sequenced_task_runner.h"
17 #include "base/task/single_thread_task_runner.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "third_party/abseil-cpp/absl/base/attributes.h"
21
22 namespace base {
23
24 namespace {
25
26 // Per-thread FileDescriptorWatcher registration.
27 ABSL_CONST_INIT thread_local FileDescriptorWatcher* fd_watcher = nullptr;
28
29 } // namespace
30
31 class FileDescriptorWatcher::Controller::Watcher
32 : public MessagePumpForIO::FdWatcher,
33 public CurrentThread::DestructionObserver {
34 public:
35 Watcher(WeakPtr<Controller> controller,
36 base::WaitableEvent& on_destroyed,
37 MessagePumpForIO::Mode mode,
38 int fd);
39 Watcher(const Watcher&) = delete;
40 Watcher& operator=(const Watcher&) = delete;
41 ~Watcher() override;
42
43 void StartWatching();
44
45 private:
46 friend class FileDescriptorWatcher;
47
48 // MessagePumpForIO::FdWatcher:
49 void OnFileCanReadWithoutBlocking(int fd) override;
50 void OnFileCanWriteWithoutBlocking(int fd) override;
51
52 // CurrentThread::DestructionObserver:
53 void WillDestroyCurrentMessageLoop() override;
54
55 // The MessagePumpForIO's watch handle (stops the watch when destroyed).
56 MessagePumpForIO::FdWatchController fd_watch_controller_;
57
58 // Runs tasks on the sequence on which this was instantiated (i.e. the
59 // sequence on which the callback must run).
60 const scoped_refptr<SequencedTaskRunner> callback_task_runner_ =
61 SequencedTaskRunner::GetCurrentDefault();
62
63 // The Controller that created this Watcher. This WeakPtr is bound to the
64 // |controller_| thread and can only be used by this Watcher to post back to
65 // |callback_task_runner_|.
66 WeakPtr<Controller> controller_;
67
68 // WaitableEvent to signal to ensure that the Watcher is always destroyed
69 // before the Controller.
70 const raw_ref<base::WaitableEvent, AcrossTasksDanglingUntriaged>
71 on_destroyed_;
72
73 // Whether this Watcher is notified when |fd_| becomes readable or writable
74 // without blocking.
75 const MessagePumpForIO::Mode mode_;
76
77 // The watched file descriptor.
78 const int fd_;
79
80 // Except for the constructor, every method of this class must run on the same
81 // MessagePumpForIO thread.
82 ThreadChecker thread_checker_;
83
84 // Whether this Watcher was registered as a DestructionObserver on the
85 // MessagePumpForIO thread.
86 bool registered_as_destruction_observer_ = false;
87 };
88
Watcher(WeakPtr<Controller> controller,base::WaitableEvent & on_destroyed,MessagePumpForIO::Mode mode,int fd)89 FileDescriptorWatcher::Controller::Watcher::Watcher(
90 WeakPtr<Controller> controller,
91 base::WaitableEvent& on_destroyed,
92 MessagePumpForIO::Mode mode,
93 int fd)
94 : fd_watch_controller_(FROM_HERE),
95 controller_(controller),
96 on_destroyed_(on_destroyed),
97 mode_(mode),
98 fd_(fd) {
99 DCHECK(callback_task_runner_);
100 thread_checker_.DetachFromThread();
101 }
102
~Watcher()103 FileDescriptorWatcher::Controller::Watcher::~Watcher() {
104 DCHECK(thread_checker_.CalledOnValidThread());
105 CurrentIOThread::Get()->RemoveDestructionObserver(this);
106
107 // Stop watching the descriptor before signalling |on_destroyed_|.
108 CHECK(fd_watch_controller_.StopWatchingFileDescriptor());
109 on_destroyed_->Signal();
110 }
111
StartWatching()112 void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
113 DCHECK(thread_checker_.CalledOnValidThread());
114 DCHECK(CurrentIOThread::IsSet());
115
116 const bool watch_success = CurrentIOThread::Get()->WatchFileDescriptor(
117 fd_, false, mode_, &fd_watch_controller_, this);
118 DCHECK(watch_success) << "Failed to watch fd=" << fd_;
119
120 if (!registered_as_destruction_observer_) {
121 CurrentIOThread::Get()->AddDestructionObserver(this);
122 registered_as_destruction_observer_ = true;
123 }
124 }
125
OnFileCanReadWithoutBlocking(int fd)126 void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
127 int fd) {
128 DCHECK_EQ(fd_, fd);
129 DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
130 DCHECK(thread_checker_.CalledOnValidThread());
131
132 // Run the callback on the sequence on which the watch was initiated.
133 callback_task_runner_->PostTask(
134 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
135 }
136
OnFileCanWriteWithoutBlocking(int fd)137 void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
138 int fd) {
139 DCHECK_EQ(fd_, fd);
140 DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
141 DCHECK(thread_checker_.CalledOnValidThread());
142
143 // Run the callback on the sequence on which the watch was initiated.
144 callback_task_runner_->PostTask(
145 FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
146 }
147
148 void FileDescriptorWatcher::Controller::Watcher::
WillDestroyCurrentMessageLoop()149 WillDestroyCurrentMessageLoop() {
150 DCHECK(thread_checker_.CalledOnValidThread());
151
152 if (callback_task_runner_->RunsTasksInCurrentSequence()) {
153 // |controller_| can be accessed directly when Watcher runs on the same
154 // thread.
155 Watcher* watcher = controller_->watcher_;
156 controller_->watcher_ = nullptr;
157 delete watcher;
158 } else {
159 // If the Watcher and the Controller live on different threads, delete
160 // |this| synchronously. Pending tasks bound to an unretained Watcher* will
161 // not run since this loop is dead. The associated Controller will not know
162 // whether the Watcher has been destroyed but it never uses it directly and
163 // will ultimately send it to this thread for deletion (and that also won't
164 // run since the loop being dead).
165 delete this;
166 }
167 }
168
Controller(MessagePumpForIO::Mode mode,int fd,const RepeatingClosure & callback)169 FileDescriptorWatcher::Controller::Controller(MessagePumpForIO::Mode mode,
170 int fd,
171 const RepeatingClosure& callback)
172 : callback_(callback),
173 io_thread_task_runner_(fd_watcher->io_thread_task_runner()) {
174 DCHECK(!callback_.is_null());
175 DCHECK(io_thread_task_runner_);
176 watcher_ =
177 new Watcher(weak_factory_.GetWeakPtr(), on_watcher_destroyed_, mode, fd);
178 StartWatching();
179 }
180
~Controller()181 FileDescriptorWatcher::Controller::~Controller() {
182 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
183
184 if (io_thread_task_runner_->BelongsToCurrentThread()) {
185 // If the MessagePumpForIO and the Controller live on the same thread.
186 if (watcher_)
187 delete watcher_;
188 } else {
189 // Synchronously wait until |watcher_| is deleted on the MessagePumpForIO
190 // thread. This ensures that the file descriptor is never accessed after
191 // this destructor returns.
192 //
193 // We considered associating "generations" to file descriptors to avoid the
194 // synchronous wait. For example, if the IO thread gets a "cancel" for fd=6,
195 // generation=1 after getting a "start watching" for fd=6, generation=2, it
196 // can ignore the "Cancel". However, "generations" didn't solve this race:
197 //
198 // T1 (client) Start watching fd = 6 with WatchReadable()
199 // Stop watching fd = 6
200 // Close fd = 6
201 // Open a new file, fd = 6 gets reused.
202 // T2 (io) Watcher::StartWatching()
203 // Incorrectly starts watching fd = 6 which now refers to a
204 // different file than when WatchReadable() was called.
205 auto delete_task = BindOnce(
206 [](Watcher* watcher) {
207 // Since |watcher| is a raw pointer, it isn't deleted if this callback
208 // is deleted before it gets to run.
209 delete watcher;
210 },
211 UnsafeDanglingUntriaged(watcher_));
212 io_thread_task_runner_->PostTask(FROM_HERE, std::move(delete_task));
213 ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow;
214 on_watcher_destroyed_.Wait();
215 }
216
217 // Since WeakPtrs are invalidated by the destructor, any pending RunCallback()
218 // won't be invoked after this returns.
219 }
220
StartWatching()221 void FileDescriptorWatcher::Controller::StartWatching() {
222 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
223 if (io_thread_task_runner_->BelongsToCurrentThread()) {
224 // If the MessagePumpForIO and the Controller live on the same thread.
225 watcher_->StartWatching();
226 } else {
227 // It is safe to use Unretained() below because |watcher_| can only be
228 // deleted by a delete task posted to |io_thread_task_runner_| by this
229 // Controller's destructor. Since this delete task hasn't been posted yet,
230 // it can't run before the task posted below.
231 io_thread_task_runner_->PostTask(
232 FROM_HERE, BindOnce(&Watcher::StartWatching, Unretained(watcher_)));
233 }
234 }
235
RunCallback()236 void FileDescriptorWatcher::Controller::RunCallback() {
237 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
238
239 WeakPtr<Controller> weak_this = weak_factory_.GetWeakPtr();
240
241 // Run a copy of the callback in case this Controller is deleted by the
242 // callback. This would cause the callback itself to be deleted while it is
243 // being run.
244 RepeatingClosure callback_copy = callback_;
245 callback_copy.Run();
246
247 // If |this| wasn't deleted, re-enable the watch.
248 if (weak_this)
249 StartWatching();
250 }
251
FileDescriptorWatcher(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)252 FileDescriptorWatcher::FileDescriptorWatcher(
253 scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner)
254 : resetter_(&fd_watcher, this, nullptr),
255 io_thread_task_runner_(std::move(io_thread_task_runner)) {}
256
257 FileDescriptorWatcher::~FileDescriptorWatcher() = default;
258
259 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchReadable(int fd,const RepeatingClosure & callback)260 FileDescriptorWatcher::WatchReadable(int fd, const RepeatingClosure& callback) {
261 return WrapUnique(new Controller(MessagePumpForIO::WATCH_READ, fd, callback));
262 }
263
264 std::unique_ptr<FileDescriptorWatcher::Controller>
WatchWritable(int fd,const RepeatingClosure & callback)265 FileDescriptorWatcher::WatchWritable(int fd, const RepeatingClosure& callback) {
266 return WrapUnique(
267 new Controller(MessagePumpForIO::WATCH_WRITE, fd, callback));
268 }
269
270 #if DCHECK_IS_ON()
AssertAllowed()271 void FileDescriptorWatcher::AssertAllowed() {
272 DCHECK(fd_watcher);
273 }
274 #endif
275
276 } // namespace base
277