xref: /aosp_15_r20/external/perfetto/src/base/unix_task_runner.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/build_config.h"
18 
19 #include "perfetto/ext/base/platform.h"
20 #include "perfetto/ext/base/unix_task_runner.h"
21 
22 #include <errno.h>
23 #include <stdlib.h>
24 
25 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
26 #include <Windows.h>
27 #include <synchapi.h>
28 #else
29 #include <unistd.h>
30 #endif
31 
32 #include <algorithm>
33 #include <limits>
34 
35 #include "perfetto/ext/base/watchdog.h"
36 
37 namespace perfetto {
38 namespace base {
39 
UnixTaskRunner()40 UnixTaskRunner::UnixTaskRunner() {
41   AddFileDescriptorWatch(event_.fd(), [] {
42     // Not reached -- see PostFileDescriptorWatches().
43     PERFETTO_DFATAL("Should be unreachable.");
44   });
45 }
46 
47 UnixTaskRunner::~UnixTaskRunner() = default;
48 
WakeUp()49 void UnixTaskRunner::WakeUp() {
50   event_.Notify();
51 }
52 
Run()53 void UnixTaskRunner::Run() {
54   PERFETTO_DCHECK_THREAD(thread_checker_);
55   created_thread_id_.store(GetThreadId(), std::memory_order_relaxed);
56   {
57     std::lock_guard<std::mutex> lock(lock_);
58     quit_ = false;
59   }
60   for (;;) {
61     int poll_timeout_ms;
62     {
63       std::lock_guard<std::mutex> lock(lock_);
64       if (quit_)
65         return;
66       poll_timeout_ms = GetDelayMsToNextTaskLocked();
67       UpdateWatchTasksLocked();
68     }
69 
70 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
71     DWORD timeout =
72         poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
73     DWORD ret =
74         WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
75                                &poll_fds_[0], /*bWaitAll=*/false, timeout);
76     // Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
77     // set, even when >1 is signalled. In order to avoid starvation,
78     // PostFileDescriptorWatches() will WaitForSingleObject() each other handle
79     // to ensure fairness. |ret| here is passed just to avoid an extra
80     // WaitForSingleObject() for the one handle that WaitForMultipleObject()
81     // returned.
82     PostFileDescriptorWatches(ret);
83 #else
84     platform::BeforeMaybeBlockingSyscall();
85     int ret = PERFETTO_EINTR(poll(
86         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
87     platform::AfterMaybeBlockingSyscall();
88     PERFETTO_CHECK(ret >= 0);
89     PostFileDescriptorWatches(0 /*ignored*/);
90 #endif
91 
92     // To avoid starvation we always interleave all types of tasks -- immediate,
93     // delayed and file descriptor watches.
94     RunImmediateAndDelayedTask();
95   }
96 }
97 
Quit()98 void UnixTaskRunner::Quit() {
99   std::lock_guard<std::mutex> lock(lock_);
100   quit_ = true;
101   WakeUp();
102 }
103 
QuitCalled()104 bool UnixTaskRunner::QuitCalled() {
105   std::lock_guard<std::mutex> lock(lock_);
106   return quit_;
107 }
108 
IsIdleForTesting()109 bool UnixTaskRunner::IsIdleForTesting() {
110   std::lock_guard<std::mutex> lock(lock_);
111   return immediate_tasks_.empty();
112 }
113 
AdvanceTimeForTesting(uint32_t ms)114 void UnixTaskRunner::AdvanceTimeForTesting(uint32_t ms) {
115   std::lock_guard<std::mutex> lock(lock_);
116   advanced_time_for_testing_ += TimeMillis(ms);
117 }
118 
UpdateWatchTasksLocked()119 void UnixTaskRunner::UpdateWatchTasksLocked() {
120   PERFETTO_DCHECK_THREAD(thread_checker_);
121 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
122   if (!watch_tasks_changed_)
123     return;
124   watch_tasks_changed_ = false;
125 #endif
126   poll_fds_.clear();
127   for (auto& it : watch_tasks_) {
128     PlatformHandle handle = it.first;
129     WatchTask& watch_task = it.second;
130 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
131     if (!watch_task.pending)
132       poll_fds_.push_back(handle);
133 #else
134     watch_task.poll_fd_index = poll_fds_.size();
135     poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
136 #endif
137   }
138 }
139 
RunImmediateAndDelayedTask()140 void UnixTaskRunner::RunImmediateAndDelayedTask() {
141   // If locking overhead becomes an issue, add a separate work queue.
142   std::function<void()> immediate_task;
143   std::function<void()> delayed_task;
144   TimeMillis now = GetWallTimeMs();
145   {
146     std::lock_guard<std::mutex> lock(lock_);
147     if (!immediate_tasks_.empty()) {
148       immediate_task = std::move(immediate_tasks_.front());
149       immediate_tasks_.pop_front();
150     }
151     if (!delayed_tasks_.empty()) {
152       auto it = delayed_tasks_.begin();
153       if (now + advanced_time_for_testing_ >= it->first) {
154         delayed_task = std::move(it->second);
155         delayed_tasks_.erase(it);
156       }
157     }
158   }
159 
160   errno = 0;
161   if (immediate_task)
162     RunTaskWithWatchdogGuard(immediate_task);
163   errno = 0;
164   if (delayed_task)
165     RunTaskWithWatchdogGuard(delayed_task);
166 }
167 
PostFileDescriptorWatches(uint64_t windows_wait_result)168 void UnixTaskRunner::PostFileDescriptorWatches(uint64_t windows_wait_result) {
169   PERFETTO_DCHECK_THREAD(thread_checker_);
170   for (size_t i = 0; i < poll_fds_.size(); i++) {
171 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
172     const PlatformHandle handle = poll_fds_[i];
173     // |windows_wait_result| is the result of WaitForMultipleObjects() call. If
174     // one of the objects was signalled, it will have a value between
175     // [0, poll_fds_.size()].
176     if (i != windows_wait_result &&
177         WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
178       continue;
179     }
180 #else
181     base::ignore_result(windows_wait_result);
182     const PlatformHandle handle = poll_fds_[i].fd;
183     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
184       continue;
185     poll_fds_[i].revents = 0;
186 #endif
187 
188     // The wake-up event is handled inline to avoid an infinite recursion of
189     // posted tasks.
190     if (handle == event_.fd()) {
191       event_.Clear();
192       continue;
193     }
194 
195     // Binding to |this| is safe since we are the only object executing the
196     // task.
197     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
198 
199     // Flag the task as pending.
200 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
201     // On Windows this is done by marking the WatchTask entry as pending. This
202     // is more expensive than Linux as requires rebuilding the |poll_fds_|
203     // vector on each call. There doesn't seem to be a good alternative though.
204     auto it = watch_tasks_.find(handle);
205     PERFETTO_CHECK(it != watch_tasks_.end());
206     PERFETTO_DCHECK(!it->second.pending);
207     it->second.pending = true;
208 #else
209     // On UNIX systems instead, we just make the fd negative while its task is
210     // pending. This makes poll(2) ignore the fd.
211     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
212     poll_fds_[i].fd = -poll_fds_[i].fd;
213 #endif
214   }
215 }
216 
RunFileDescriptorWatch(PlatformHandle fd)217 void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
218   std::function<void()> task;
219   {
220     std::lock_guard<std::mutex> lock(lock_);
221     auto it = watch_tasks_.find(fd);
222     if (it == watch_tasks_.end())
223       return;
224     WatchTask& watch_task = it->second;
225 
226     // Make poll(2) pay attention to the fd again. Since another thread may have
227     // updated this watch we need to refresh the set first.
228     UpdateWatchTasksLocked();
229 
230 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
231     // On Windows we manually track the presence of outstanding tasks for the
232     // watch. The UpdateWatchTasksLocked() in the Run() loop will re-add the
233     // task to the |poll_fds_| vector.
234     PERFETTO_DCHECK(watch_task.pending);
235     watch_task.pending = false;
236 #else
237     size_t fd_index = watch_task.poll_fd_index;
238     PERFETTO_DCHECK(fd_index < poll_fds_.size());
239     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
240     poll_fds_[fd_index].fd = fd;
241 #endif
242     task = watch_task.callback;
243   }
244   errno = 0;
245   RunTaskWithWatchdogGuard(task);
246 }
247 
GetDelayMsToNextTaskLocked() const248 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
249   PERFETTO_DCHECK_THREAD(thread_checker_);
250   if (!immediate_tasks_.empty())
251     return 0;
252   if (!delayed_tasks_.empty()) {
253     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs() -
254                       advanced_time_for_testing_;
255     return std::max(0, static_cast<int>(diff.count()));
256   }
257   return -1;
258 }
259 
PostTask(std::function<void ()> task)260 void UnixTaskRunner::PostTask(std::function<void()> task) {
261   bool was_empty;
262   {
263     std::lock_guard<std::mutex> lock(lock_);
264     was_empty = immediate_tasks_.empty();
265     immediate_tasks_.push_back(std::move(task));
266   }
267   if (was_empty)
268     WakeUp();
269 }
270 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)271 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
272                                      uint32_t delay_ms) {
273   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
274   {
275     std::lock_guard<std::mutex> lock(lock_);
276     delayed_tasks_.insert(
277         std::make_pair(runtime + advanced_time_for_testing_, std::move(task)));
278   }
279   WakeUp();
280 }
281 
AddFileDescriptorWatch(PlatformHandle fd,std::function<void ()> task)282 void UnixTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
283                                             std::function<void()> task) {
284   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
285   {
286     std::lock_guard<std::mutex> lock(lock_);
287     PERFETTO_DCHECK(!watch_tasks_.count(fd));
288     WatchTask& watch_task = watch_tasks_[fd];
289     watch_task.callback = std::move(task);
290 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
291     watch_task.pending = false;
292 #else
293     watch_task.poll_fd_index = SIZE_MAX;
294 #endif
295     watch_tasks_changed_ = true;
296   }
297   WakeUp();
298 }
299 
RemoveFileDescriptorWatch(PlatformHandle fd)300 void UnixTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
301   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
302   {
303     std::lock_guard<std::mutex> lock(lock_);
304     PERFETTO_DCHECK(watch_tasks_.count(fd));
305     watch_tasks_.erase(fd);
306     watch_tasks_changed_ = true;
307   }
308   // No need to schedule a wake-up for this.
309 }
310 
RunsTasksOnCurrentThread() const311 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
312   return GetThreadId() == created_thread_id_.load(std::memory_order_relaxed);
313 }
314 
315 }  // namespace base
316 }  // namespace perfetto
317