1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/task_queue_libevent.h"
12
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <pthread.h>
16 #include <signal.h>
17 #include <stdint.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <list>
22 #include <memory>
23 #include <type_traits>
24 #include <utility>
25
26 #include "absl/container/inlined_vector.h"
27 #include "absl/functional/any_invocable.h"
28 #include "absl/strings/string_view.h"
29 #include "api/task_queue/task_queue_base.h"
30 #include "api/units/time_delta.h"
31 #include "rtc_base/checks.h"
32 #include "rtc_base/logging.h"
33 #include "rtc_base/numerics/safe_conversions.h"
34 #include "rtc_base/platform_thread.h"
35 #include "rtc_base/platform_thread_types.h"
36 #include "rtc_base/synchronization/mutex.h"
37 #include "rtc_base/thread_annotations.h"
38 #include "rtc_base/time_utils.h"
39 #include "third_party/libevent/event.h"
40
41 namespace webrtc {
42 namespace {
// Single-byte commands written to the wakeup pipe and decoded by
// TaskQueueLibevent::OnWakeup().
// kQuit asks the event loop to stop; kRunTasks asks it to drain the pending
// task list.
constexpr char kQuit{1};
constexpr char kRunTasks{2};
45
46 using Priority = TaskQueueFactory::Priority;
47
// Blocks delivery of SIGPIPE on the calling thread by adding it to the
// thread's signal mask.
//
// SIGPIPE can be raised when write()-ing to a pipe that is being closed, or
// while closing a pipe that is being written to. Its default action is to
// terminate the process, and we can legitimately run into that situation, so
// the signal is masked and execution continues as normal.
//
// The previous mask is intentionally not restored: restoring it can itself
// cause SIGPIPE to be signaled (e.g. on MacOS).
//
// An alternative is to ignore the signal for the whole process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t signals_to_block;
  sigemptyset(&signals_to_block);
  sigaddset(&signals_to_block, SIGPIPE);
  // A null third argument means the old mask is not reported back.
  pthread_sigmask(SIG_BLOCK, &signals_to_block, nullptr);
}
67
SetNonBlocking(int fd)68 bool SetNonBlocking(int fd) {
69 const int flags = fcntl(fd, F_GETFL);
70 RTC_CHECK(flags != -1);
71 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72 }
73
74 // TODO(tommi): This is a hack to support two versions of libevent that we're
75 // compatible with. The method we really want to call is event_assign(),
76 // since event_set() has been marked as deprecated (and doesn't accept
77 // passing event_base__ as a parameter). However, the version of libevent
78 // that we have in Chromium, doesn't have event_assign(), so we need to call
79 // event_set() there.
EventAssign(struct event * ev,struct event_base * base,int fd,short events,void (* callback)(int,short,void *),void * arg)80 void EventAssign(struct event* ev,
81 struct event_base* base,
82 int fd,
83 short events,
84 void (*callback)(int, short, void*),
85 void* arg) {
86 #if defined(_EVENT2_EVENT_H_)
87 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88 #else
89 event_set(ev, fd, events, callback, arg);
90 RTC_CHECK_EQ(0, event_base_set(base, ev));
91 #endif
92 }
93
TaskQueuePriorityToThreadPriority(Priority priority)94 rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
95 switch (priority) {
96 case Priority::HIGH:
97 return rtc::ThreadPriority::kRealtime;
98 case Priority::LOW:
99 return rtc::ThreadPriority::kLow;
100 case Priority::NORMAL:
101 return rtc::ThreadPriority::kNormal;
102 }
103 }
104
105 class TaskQueueLibevent final : public TaskQueueBase {
106 public:
107 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
108
109 void Delete() override;
110 void PostTask(absl::AnyInvocable<void() &&> task) override;
111 void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112 TimeDelta delay) override;
113 void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114 TimeDelta delay) override;
115
116 private:
117 struct TimerEvent;
118
119 void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120 TimeDelta delay);
121
122 ~TaskQueueLibevent() override = default;
123
124 static void OnWakeup(int socket, short flags, void* context); // NOLINT
125 static void RunTimer(int fd, short flags, void* context); // NOLINT
126
127 bool is_active_ = true;
128 int wakeup_pipe_in_ = -1;
129 int wakeup_pipe_out_ = -1;
130 event_base* event_base_;
131 event wakeup_event_;
132 rtc::PlatformThread thread_;
133 Mutex pending_lock_;
134 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
135 RTC_GUARDED_BY(pending_lock_);
136 // Holds a list of events pending timers for cleanup when the loop exits.
137 std::list<TimerEvent*> pending_timers_;
138 };
139
140 struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent141 TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
142 : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent143 ~TimerEvent() { event_del(&ev); }
144
145 event ev;
146 TaskQueueLibevent* task_queue;
147 absl::AnyInvocable<void() &&> task;
148 };
149
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)150 TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151 rtc::ThreadPriority priority)
152 : event_base_(event_base_new()) {
153 int fds[2];
154 RTC_CHECK(pipe(fds) == 0);
155 SetNonBlocking(fds[0]);
156 SetNonBlocking(fds[1]);
157 wakeup_pipe_out_ = fds[0];
158 wakeup_pipe_in_ = fds[1];
159
160 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
161 EV_READ | EV_PERSIST, OnWakeup, this);
162 event_add(&wakeup_event_, 0);
163 thread_ = rtc::PlatformThread::SpawnJoinable(
164 [this] {
165 {
166 CurrentTaskQueueSetter set_current(this);
167 while (is_active_)
168 event_base_loop(event_base_, 0);
169 }
170
171 for (TimerEvent* timer : pending_timers_)
172 delete timer;
173 },
174 queue_name, rtc::ThreadAttributes().SetPriority(priority));
175 }
176
Delete()177 void TaskQueueLibevent::Delete() {
178 RTC_DCHECK(!IsCurrent());
179 struct timespec ts;
180 char message = kQuit;
181 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
182 // The queue is full, so we have no choice but to wait and retry.
183 RTC_CHECK_EQ(EAGAIN, errno);
184 ts.tv_sec = 0;
185 ts.tv_nsec = 1000000;
186 nanosleep(&ts, nullptr);
187 }
188
189 thread_.Finalize();
190
191 event_del(&wakeup_event_);
192
193 IgnoreSigPipeSignalOnCurrentThread();
194
195 close(wakeup_pipe_in_);
196 close(wakeup_pipe_out_);
197 wakeup_pipe_in_ = -1;
198 wakeup_pipe_out_ = -1;
199
200 event_base_free(event_base_);
201 delete this;
202 }
203
PostTask(absl::AnyInvocable<void ()&&> task)204 void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
205 {
206 MutexLock lock(&pending_lock_);
207 bool had_pending_tasks = !pending_.empty();
208 pending_.push_back(std::move(task));
209
210 // Only write to the pipe if there were no pending tasks before this one
211 // since the thread could be sleeping. If there were already pending tasks
212 // then we know there's either a pending write in the pipe or the thread has
213 // not yet processed the pending tasks. In either case, the thread will
214 // eventually wake up and process all pending tasks including this one.
215 if (had_pending_tasks) {
216 return;
217 }
218 }
219
220 // Note: This behvior outlined above ensures we never fill up the pipe write
221 // buffer since there will only ever be 1 byte pending.
222 char message = kRunTasks;
223 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
224 sizeof(message));
225 }
226
PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void ()&&> task,TimeDelta delay)227 void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
228 absl::AnyInvocable<void() &&> task,
229 TimeDelta delay) {
230 // libevent api is not thread safe by default, thus event_add need to be
231 // called on the `thread_`.
232 RTC_DCHECK(IsCurrent());
233
234 TimerEvent* timer = new TimerEvent(this, std::move(task));
235 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
236 timer);
237 pending_timers_.push_back(timer);
238 timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
239 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
240 event_add(&timer->ev, &tv);
241 }
242
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)243 void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
244 TimeDelta delay) {
245 if (IsCurrent()) {
246 PostDelayedTaskOnTaskQueue(std::move(task), delay);
247 } else {
248 int64_t posted_us = rtc::TimeMicros();
249 PostTask([posted_us, delay, task = std::move(task), this]() mutable {
250 // Compensate for the time that has passed since the posting.
251 TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
252 PostDelayedTaskOnTaskQueue(
253 std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
254 });
255 }
256 }
257
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)258 void TaskQueueLibevent::PostDelayedHighPrecisionTask(
259 absl::AnyInvocable<void() &&> task,
260 TimeDelta delay) {
261 PostDelayedTask(std::move(task), delay);
262 }
263
264 // static
OnWakeup(int socket,short flags,void * context)265 void TaskQueueLibevent::OnWakeup(int socket,
266 short flags, // NOLINT
267 void* context) {
268 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
269 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
270 char buf;
271 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
272 switch (buf) {
273 case kQuit:
274 me->is_active_ = false;
275 event_base_loopbreak(me->event_base_);
276 break;
277 case kRunTasks: {
278 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
279 {
280 MutexLock lock(&me->pending_lock_);
281 tasks.swap(me->pending_);
282 }
283 RTC_DCHECK(!tasks.empty());
284 for (auto& task : tasks) {
285 std::move(task)();
286 // Prefer to delete the `task` before running the next one.
287 task = nullptr;
288 }
289 break;
290 }
291 default:
292 RTC_DCHECK_NOTREACHED();
293 break;
294 }
295 }
296
297 // static
RunTimer(int fd,short flags,void * context)298 void TaskQueueLibevent::RunTimer(int fd,
299 short flags, // NOLINT
300 void* context) {
301 TimerEvent* timer = static_cast<TimerEvent*>(context);
302 std::move(timer->task)();
303 timer->task_queue->pending_timers_.remove(timer);
304 delete timer;
305 }
306
307 class TaskQueueLibeventFactory final : public TaskQueueFactory {
308 public:
CreateTaskQueue(absl::string_view name,Priority priority) const309 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310 absl::string_view name,
311 Priority priority) const override {
312 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313 new TaskQueueLibevent(name,
314 TaskQueuePriorityToThreadPriority(priority)));
315 }
316 };
317
318 } // namespace
319
CreateTaskQueueLibeventFactory()320 std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
321 return std::make_unique<TaskQueueLibeventFactory>();
322 }
323
324 } // namespace webrtc
325