xref: /aosp_15_r20/external/webrtc/rtc_base/task_queue_libevent.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/task_queue_libevent.h"
12 
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <pthread.h>
16 #include <signal.h>
17 #include <stdint.h>
18 #include <time.h>
19 #include <unistd.h>
20 
21 #include <list>
22 #include <memory>
23 #include <type_traits>
24 #include <utility>
25 
26 #include "absl/container/inlined_vector.h"
27 #include "absl/functional/any_invocable.h"
28 #include "absl/strings/string_view.h"
29 #include "api/task_queue/task_queue_base.h"
30 #include "api/units/time_delta.h"
31 #include "rtc_base/checks.h"
32 #include "rtc_base/logging.h"
33 #include "rtc_base/numerics/safe_conversions.h"
34 #include "rtc_base/platform_thread.h"
35 #include "rtc_base/platform_thread_types.h"
36 #include "rtc_base/synchronization/mutex.h"
37 #include "rtc_base/thread_annotations.h"
38 #include "rtc_base/time_utils.h"
39 #include "third_party/libevent/event.h"
40 
41 namespace webrtc {
42 namespace {
43 constexpr char kQuit = 1;
44 constexpr char kRunTasks = 2;
45 
46 using Priority = TaskQueueFactory::Priority;
47 
48 // This ignores the SIGPIPE signal on the calling thread.
49 // This signal can be fired when trying to write() to a pipe that's being
50 // closed or while closing a pipe that's being written to.
51 // We can run into that situation so we ignore this signal and continue as
52 // normal.
53 // As a side note for this implementation, it would be great if we could safely
54 // restore the sigmask, but unfortunately the operation of restoring it, can
55 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
56 // The SIGPIPE signal by default causes the process to be terminated, so we
57 // don't want to risk that.
58 // An alternative to this approach is to ignore the signal for the whole
59 // process:
60 //   signal(SIGPIPE, SIG_IGN);
IgnoreSigPipeSignalOnCurrentThread()61 void IgnoreSigPipeSignalOnCurrentThread() {
62   sigset_t sigpipe_mask;
63   sigemptyset(&sigpipe_mask);
64   sigaddset(&sigpipe_mask, SIGPIPE);
65   pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
66 }
67 
SetNonBlocking(int fd)68 bool SetNonBlocking(int fd) {
69   const int flags = fcntl(fd, F_GETFL);
70   RTC_CHECK(flags != -1);
71   return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72 }
73 
74 // TODO(tommi): This is a hack to support two versions of libevent that we're
75 // compatible with.  The method we really want to call is event_assign(),
76 // since event_set() has been marked as deprecated (and doesn't accept
77 // passing event_base__ as a parameter).  However, the version of libevent
78 // that we have in Chromium, doesn't have event_assign(), so we need to call
79 // event_set() there.
EventAssign(struct event * ev,struct event_base * base,int fd,short events,void (* callback)(int,short,void *),void * arg)80 void EventAssign(struct event* ev,
81                  struct event_base* base,
82                  int fd,
83                  short events,
84                  void (*callback)(int, short, void*),
85                  void* arg) {
86 #if defined(_EVENT2_EVENT_H_)
87   RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88 #else
89   event_set(ev, fd, events, callback, arg);
90   RTC_CHECK_EQ(0, event_base_set(base, ev));
91 #endif
92 }
93 
TaskQueuePriorityToThreadPriority(Priority priority)94 rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
95   switch (priority) {
96     case Priority::HIGH:
97       return rtc::ThreadPriority::kRealtime;
98     case Priority::LOW:
99       return rtc::ThreadPriority::kLow;
100     case Priority::NORMAL:
101       return rtc::ThreadPriority::kNormal;
102   }
103 }
104 
105 class TaskQueueLibevent final : public TaskQueueBase {
106  public:
107   TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
108 
109   void Delete() override;
110   void PostTask(absl::AnyInvocable<void() &&> task) override;
111   void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112                        TimeDelta delay) override;
113   void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114                                     TimeDelta delay) override;
115 
116  private:
117   struct TimerEvent;
118 
119   void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120                                   TimeDelta delay);
121 
122   ~TaskQueueLibevent() override = default;
123 
124   static void OnWakeup(int socket, short flags, void* context);  // NOLINT
125   static void RunTimer(int fd, short flags, void* context);      // NOLINT
126 
127   bool is_active_ = true;
128   int wakeup_pipe_in_ = -1;
129   int wakeup_pipe_out_ = -1;
130   event_base* event_base_;
131   event wakeup_event_;
132   rtc::PlatformThread thread_;
133   Mutex pending_lock_;
134   absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
135       RTC_GUARDED_BY(pending_lock_);
136   // Holds a list of events pending timers for cleanup when the loop exits.
137   std::list<TimerEvent*> pending_timers_;
138 };
139 
140 struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent141   TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
142       : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anon8d624fd20111::TaskQueueLibevent::TimerEvent143   ~TimerEvent() { event_del(&ev); }
144 
145   event ev;
146   TaskQueueLibevent* task_queue;
147   absl::AnyInvocable<void() &&> task;
148 };
149 
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)150 TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151                                      rtc::ThreadPriority priority)
152     : event_base_(event_base_new()) {
153   int fds[2];
154   RTC_CHECK(pipe(fds) == 0);
155   SetNonBlocking(fds[0]);
156   SetNonBlocking(fds[1]);
157   wakeup_pipe_out_ = fds[0];
158   wakeup_pipe_in_ = fds[1];
159 
160   EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
161               EV_READ | EV_PERSIST, OnWakeup, this);
162   event_add(&wakeup_event_, 0);
163   thread_ = rtc::PlatformThread::SpawnJoinable(
164       [this] {
165         {
166           CurrentTaskQueueSetter set_current(this);
167           while (is_active_)
168             event_base_loop(event_base_, 0);
169         }
170 
171         for (TimerEvent* timer : pending_timers_)
172           delete timer;
173       },
174       queue_name, rtc::ThreadAttributes().SetPriority(priority));
175 }
176 
Delete()177 void TaskQueueLibevent::Delete() {
178   RTC_DCHECK(!IsCurrent());
179   struct timespec ts;
180   char message = kQuit;
181   while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
182     // The queue is full, so we have no choice but to wait and retry.
183     RTC_CHECK_EQ(EAGAIN, errno);
184     ts.tv_sec = 0;
185     ts.tv_nsec = 1000000;
186     nanosleep(&ts, nullptr);
187   }
188 
189   thread_.Finalize();
190 
191   event_del(&wakeup_event_);
192 
193   IgnoreSigPipeSignalOnCurrentThread();
194 
195   close(wakeup_pipe_in_);
196   close(wakeup_pipe_out_);
197   wakeup_pipe_in_ = -1;
198   wakeup_pipe_out_ = -1;
199 
200   event_base_free(event_base_);
201   delete this;
202 }
203 
PostTask(absl::AnyInvocable<void ()&&> task)204 void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
205   {
206     MutexLock lock(&pending_lock_);
207     bool had_pending_tasks = !pending_.empty();
208     pending_.push_back(std::move(task));
209 
210     // Only write to the pipe if there were no pending tasks before this one
211     // since the thread could be sleeping. If there were already pending tasks
212     // then we know there's either a pending write in the pipe or the thread has
213     // not yet processed the pending tasks. In either case, the thread will
214     // eventually wake up and process all pending tasks including this one.
215     if (had_pending_tasks) {
216       return;
217     }
218   }
219 
220   // Note: This behvior outlined above ensures we never fill up the pipe write
221   // buffer since there will only ever be 1 byte pending.
222   char message = kRunTasks;
223   RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
224                sizeof(message));
225 }
226 
PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void ()&&> task,TimeDelta delay)227 void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
228     absl::AnyInvocable<void() &&> task,
229     TimeDelta delay) {
230   // libevent api is not thread safe by default, thus event_add need to be
231   // called on the `thread_`.
232   RTC_DCHECK(IsCurrent());
233 
234   TimerEvent* timer = new TimerEvent(this, std::move(task));
235   EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
236               timer);
237   pending_timers_.push_back(timer);
238   timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
239                 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
240   event_add(&timer->ev, &tv);
241 }
242 
PostDelayedTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)243 void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
244                                         TimeDelta delay) {
245   if (IsCurrent()) {
246     PostDelayedTaskOnTaskQueue(std::move(task), delay);
247   } else {
248     int64_t posted_us = rtc::TimeMicros();
249     PostTask([posted_us, delay, task = std::move(task), this]() mutable {
250       // Compensate for the time that has passed since the posting.
251       TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
252       PostDelayedTaskOnTaskQueue(
253           std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
254     });
255   }
256 }
257 
PostDelayedHighPrecisionTask(absl::AnyInvocable<void ()&&> task,TimeDelta delay)258 void TaskQueueLibevent::PostDelayedHighPrecisionTask(
259     absl::AnyInvocable<void() &&> task,
260     TimeDelta delay) {
261   PostDelayedTask(std::move(task), delay);
262 }
263 
264 // static
OnWakeup(int socket,short flags,void * context)265 void TaskQueueLibevent::OnWakeup(int socket,
266                                  short flags,  // NOLINT
267                                  void* context) {
268   TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
269   RTC_DCHECK(me->wakeup_pipe_out_ == socket);
270   char buf;
271   RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
272   switch (buf) {
273     case kQuit:
274       me->is_active_ = false;
275       event_base_loopbreak(me->event_base_);
276       break;
277     case kRunTasks: {
278       absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
279       {
280         MutexLock lock(&me->pending_lock_);
281         tasks.swap(me->pending_);
282       }
283       RTC_DCHECK(!tasks.empty());
284       for (auto& task : tasks) {
285         std::move(task)();
286         // Prefer to delete the `task` before running the next one.
287         task = nullptr;
288       }
289       break;
290     }
291     default:
292       RTC_DCHECK_NOTREACHED();
293       break;
294   }
295 }
296 
297 // static
RunTimer(int fd,short flags,void * context)298 void TaskQueueLibevent::RunTimer(int fd,
299                                  short flags,  // NOLINT
300                                  void* context) {
301   TimerEvent* timer = static_cast<TimerEvent*>(context);
302   std::move(timer->task)();
303   timer->task_queue->pending_timers_.remove(timer);
304   delete timer;
305 }
306 
307 class TaskQueueLibeventFactory final : public TaskQueueFactory {
308  public:
CreateTaskQueue(absl::string_view name,Priority priority) const309   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310       absl::string_view name,
311       Priority priority) const override {
312     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313         new TaskQueueLibevent(name,
314                               TaskQueuePriorityToThreadPriority(priority)));
315   }
316 };
317 
318 }  // namespace
319 
CreateTaskQueueLibeventFactory()320 std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
321   return std::make_unique<TaskQueueLibeventFactory>();
322 }
323 
324 }  // namespace webrtc
325