1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/base/build_config.h"
18
19 #include "perfetto/ext/base/platform.h"
20 #include "perfetto/ext/base/unix_task_runner.h"
21
22 #include <errno.h>
23 #include <stdlib.h>
24
25 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
26 #include <Windows.h>
27 #include <synchapi.h>
28 #else
29 #include <unistd.h>
30 #endif
31
32 #include <algorithm>
33 #include <limits>
34
35 #include "perfetto/ext/base/watchdog.h"
36
37 namespace perfetto {
38 namespace base {
39
UnixTaskRunner()40 UnixTaskRunner::UnixTaskRunner() {
41 AddFileDescriptorWatch(event_.fd(), [] {
42 // Not reached -- see PostFileDescriptorWatches().
43 PERFETTO_DFATAL("Should be unreachable.");
44 });
45 }
46
47 UnixTaskRunner::~UnixTaskRunner() = default;
48
WakeUp()49 void UnixTaskRunner::WakeUp() {
50 event_.Notify();
51 }
52
Run()53 void UnixTaskRunner::Run() {
54 PERFETTO_DCHECK_THREAD(thread_checker_);
55 created_thread_id_.store(GetThreadId(), std::memory_order_relaxed);
56 {
57 std::lock_guard<std::mutex> lock(lock_);
58 quit_ = false;
59 }
60 for (;;) {
61 int poll_timeout_ms;
62 {
63 std::lock_guard<std::mutex> lock(lock_);
64 if (quit_)
65 return;
66 poll_timeout_ms = GetDelayMsToNextTaskLocked();
67 UpdateWatchTasksLocked();
68 }
69
70 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
71 DWORD timeout =
72 poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
73 DWORD ret =
74 WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
75 &poll_fds_[0], /*bWaitAll=*/false, timeout);
76 // Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
77 // set, even when >1 is signalled. In order to avoid starvation,
78 // PostFileDescriptorWatches() will WaitForSingleObject() each other handle
79 // to ensure fairness. |ret| here is passed just to avoid an extra
80 // WaitForSingleObject() for the one handle that WaitForMultipleObject()
81 // returned.
82 PostFileDescriptorWatches(ret);
83 #else
84 platform::BeforeMaybeBlockingSyscall();
85 int ret = PERFETTO_EINTR(poll(
86 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
87 platform::AfterMaybeBlockingSyscall();
88 PERFETTO_CHECK(ret >= 0);
89 PostFileDescriptorWatches(0 /*ignored*/);
90 #endif
91
92 // To avoid starvation we always interleave all types of tasks -- immediate,
93 // delayed and file descriptor watches.
94 RunImmediateAndDelayedTask();
95 }
96 }
97
Quit()98 void UnixTaskRunner::Quit() {
99 std::lock_guard<std::mutex> lock(lock_);
100 quit_ = true;
101 WakeUp();
102 }
103
QuitCalled()104 bool UnixTaskRunner::QuitCalled() {
105 std::lock_guard<std::mutex> lock(lock_);
106 return quit_;
107 }
108
IsIdleForTesting()109 bool UnixTaskRunner::IsIdleForTesting() {
110 std::lock_guard<std::mutex> lock(lock_);
111 return immediate_tasks_.empty();
112 }
113
AdvanceTimeForTesting(uint32_t ms)114 void UnixTaskRunner::AdvanceTimeForTesting(uint32_t ms) {
115 std::lock_guard<std::mutex> lock(lock_);
116 advanced_time_for_testing_ += TimeMillis(ms);
117 }
118
UpdateWatchTasksLocked()119 void UnixTaskRunner::UpdateWatchTasksLocked() {
120 PERFETTO_DCHECK_THREAD(thread_checker_);
121 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
122 if (!watch_tasks_changed_)
123 return;
124 watch_tasks_changed_ = false;
125 #endif
126 poll_fds_.clear();
127 for (auto& it : watch_tasks_) {
128 PlatformHandle handle = it.first;
129 WatchTask& watch_task = it.second;
130 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
131 if (!watch_task.pending)
132 poll_fds_.push_back(handle);
133 #else
134 watch_task.poll_fd_index = poll_fds_.size();
135 poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
136 #endif
137 }
138 }
139
RunImmediateAndDelayedTask()140 void UnixTaskRunner::RunImmediateAndDelayedTask() {
141 // If locking overhead becomes an issue, add a separate work queue.
142 std::function<void()> immediate_task;
143 std::function<void()> delayed_task;
144 TimeMillis now = GetWallTimeMs();
145 {
146 std::lock_guard<std::mutex> lock(lock_);
147 if (!immediate_tasks_.empty()) {
148 immediate_task = std::move(immediate_tasks_.front());
149 immediate_tasks_.pop_front();
150 }
151 if (!delayed_tasks_.empty()) {
152 auto it = delayed_tasks_.begin();
153 if (now + advanced_time_for_testing_ >= it->first) {
154 delayed_task = std::move(it->second);
155 delayed_tasks_.erase(it);
156 }
157 }
158 }
159
160 errno = 0;
161 if (immediate_task)
162 RunTaskWithWatchdogGuard(immediate_task);
163 errno = 0;
164 if (delayed_task)
165 RunTaskWithWatchdogGuard(delayed_task);
166 }
167
PostFileDescriptorWatches(uint64_t windows_wait_result)168 void UnixTaskRunner::PostFileDescriptorWatches(uint64_t windows_wait_result) {
169 PERFETTO_DCHECK_THREAD(thread_checker_);
170 for (size_t i = 0; i < poll_fds_.size(); i++) {
171 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
172 const PlatformHandle handle = poll_fds_[i];
173 // |windows_wait_result| is the result of WaitForMultipleObjects() call. If
174 // one of the objects was signalled, it will have a value between
175 // [0, poll_fds_.size()].
176 if (i != windows_wait_result &&
177 WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
178 continue;
179 }
180 #else
181 base::ignore_result(windows_wait_result);
182 const PlatformHandle handle = poll_fds_[i].fd;
183 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
184 continue;
185 poll_fds_[i].revents = 0;
186 #endif
187
188 // The wake-up event is handled inline to avoid an infinite recursion of
189 // posted tasks.
190 if (handle == event_.fd()) {
191 event_.Clear();
192 continue;
193 }
194
195 // Binding to |this| is safe since we are the only object executing the
196 // task.
197 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
198
199 // Flag the task as pending.
200 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
201 // On Windows this is done by marking the WatchTask entry as pending. This
202 // is more expensive than Linux as requires rebuilding the |poll_fds_|
203 // vector on each call. There doesn't seem to be a good alternative though.
204 auto it = watch_tasks_.find(handle);
205 PERFETTO_CHECK(it != watch_tasks_.end());
206 PERFETTO_DCHECK(!it->second.pending);
207 it->second.pending = true;
208 #else
209 // On UNIX systems instead, we just make the fd negative while its task is
210 // pending. This makes poll(2) ignore the fd.
211 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
212 poll_fds_[i].fd = -poll_fds_[i].fd;
213 #endif
214 }
215 }
216
RunFileDescriptorWatch(PlatformHandle fd)217 void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
218 std::function<void()> task;
219 {
220 std::lock_guard<std::mutex> lock(lock_);
221 auto it = watch_tasks_.find(fd);
222 if (it == watch_tasks_.end())
223 return;
224 WatchTask& watch_task = it->second;
225
226 // Make poll(2) pay attention to the fd again. Since another thread may have
227 // updated this watch we need to refresh the set first.
228 UpdateWatchTasksLocked();
229
230 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
231 // On Windows we manually track the presence of outstanding tasks for the
232 // watch. The UpdateWatchTasksLocked() in the Run() loop will re-add the
233 // task to the |poll_fds_| vector.
234 PERFETTO_DCHECK(watch_task.pending);
235 watch_task.pending = false;
236 #else
237 size_t fd_index = watch_task.poll_fd_index;
238 PERFETTO_DCHECK(fd_index < poll_fds_.size());
239 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
240 poll_fds_[fd_index].fd = fd;
241 #endif
242 task = watch_task.callback;
243 }
244 errno = 0;
245 RunTaskWithWatchdogGuard(task);
246 }
247
GetDelayMsToNextTaskLocked() const248 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
249 PERFETTO_DCHECK_THREAD(thread_checker_);
250 if (!immediate_tasks_.empty())
251 return 0;
252 if (!delayed_tasks_.empty()) {
253 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs() -
254 advanced_time_for_testing_;
255 return std::max(0, static_cast<int>(diff.count()));
256 }
257 return -1;
258 }
259
PostTask(std::function<void ()> task)260 void UnixTaskRunner::PostTask(std::function<void()> task) {
261 bool was_empty;
262 {
263 std::lock_guard<std::mutex> lock(lock_);
264 was_empty = immediate_tasks_.empty();
265 immediate_tasks_.push_back(std::move(task));
266 }
267 if (was_empty)
268 WakeUp();
269 }
270
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)271 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
272 uint32_t delay_ms) {
273 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
274 {
275 std::lock_guard<std::mutex> lock(lock_);
276 delayed_tasks_.insert(
277 std::make_pair(runtime + advanced_time_for_testing_, std::move(task)));
278 }
279 WakeUp();
280 }
281
AddFileDescriptorWatch(PlatformHandle fd,std::function<void ()> task)282 void UnixTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
283 std::function<void()> task) {
284 PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
285 {
286 std::lock_guard<std::mutex> lock(lock_);
287 PERFETTO_DCHECK(!watch_tasks_.count(fd));
288 WatchTask& watch_task = watch_tasks_[fd];
289 watch_task.callback = std::move(task);
290 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
291 watch_task.pending = false;
292 #else
293 watch_task.poll_fd_index = SIZE_MAX;
294 #endif
295 watch_tasks_changed_ = true;
296 }
297 WakeUp();
298 }
299
RemoveFileDescriptorWatch(PlatformHandle fd)300 void UnixTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
301 PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
302 {
303 std::lock_guard<std::mutex> lock(lock_);
304 PERFETTO_DCHECK(watch_tasks_.count(fd));
305 watch_tasks_.erase(fd);
306 watch_tasks_changed_ = true;
307 }
308 // No need to schedule a wake-up for this.
309 }
310
RunsTasksOnCurrentThread() const311 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
312 return GetThreadId() == created_thread_id_.load(std::memory_order_relaxed);
313 }
314
315 } // namespace base
316 } // namespace perfetto
317