xref: /aosp_15_r20/external/pigweed/pw_async2_epoll/dispatcher_native.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_async2/dispatcher_native.h"
16 
17 #include <fcntl.h>
18 #include <sys/epoll.h>
19 #include <unistd.h>
20 
21 #include <cstring>
22 #include <mutex>
23 
24 #include "pw_assert/check.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/status.h"
28 
29 namespace pw::async2::backend {
namespace {

// Single byte written to the notification pipe by `DoWake()` and consumed by
// `NativeWaitForWake()`. The value is only sanity-checked on read.
constexpr char kNotificationSignal{'c'};

}  // namespace
35 
NativeInit()36 Status NativeDispatcher::NativeInit() {
37   epoll_fd_ = epoll_create1(0);
38   if (epoll_fd_ == -1) {
39     PW_LOG_ERROR("Failed to open epoll: %s", std::strerror(errno));
40     return Status::Internal();
41   }
42 
43   int pipefd[2];
44   if (pipe2(pipefd, O_DIRECT | O_NONBLOCK) == -1) {
45     PW_LOG_ERROR("Failed to create pipe: %s", std::strerror(errno));
46     return Status::Internal();
47   }
48 
49   wait_fd_ = pipefd[0];
50   notify_fd_ = pipefd[1];
51 
52   struct epoll_event event;
53   event.events = EPOLLIN;
54   event.data.fd = wait_fd_;
55   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wait_fd_, &event) == -1) {
56     PW_LOG_ERROR("Failed to initialize epoll event for dispatcher");
57     return Status::Internal();
58   }
59 
60   return OkStatus();
61 }
62 
DoRunUntilStalled(Dispatcher & dispatcher,Task * task)63 Poll<> NativeDispatcher::DoRunUntilStalled(Dispatcher& dispatcher, Task* task) {
64   {
65     std::lock_guard lock(dispatcher_lock());
66     PW_CHECK(task == nullptr || HasPostedTask(*task),
67              "Attempted to run a dispatcher until a task was stalled, "
68              "but that task has not been `Post`ed to that `Dispatcher`.");
69   }
70   while (true) {
71     RunOneTaskResult result = RunOneTask(dispatcher, task);
72     if (result.completed_main_task() || result.completed_all_tasks()) {
73       return Ready();
74     }
75     if (!result.ran_a_task()) {
76       return Pending();
77     }
78   }
79 }
80 
DoRunToCompletion(Dispatcher & dispatcher,Task * task)81 void NativeDispatcher::DoRunToCompletion(Dispatcher& dispatcher, Task* task) {
82   {
83     std::lock_guard lock(dispatcher_lock());
84     PW_CHECK(task == nullptr || HasPostedTask(*task),
85              "Attempted to run a dispatcher until a task was complete, "
86              "but that task has not been `Post`ed to that `Dispatcher`.");
87   }
88   while (true) {
89     RunOneTaskResult result = RunOneTask(dispatcher, task);
90     if (result.completed_main_task() || result.completed_all_tasks()) {
91       return;
92     }
93     if (!result.ran_a_task()) {
94       SleepInfo sleep_info = AttemptRequestWake(/*allow_empty=*/false);
95       if (sleep_info.should_sleep()) {
96         if (!NativeWaitForWake().ok()) {
97           break;
98         }
99       }
100     }
101   }
102 }
103 
NativeWaitForWake()104 Status NativeDispatcher::NativeWaitForWake() {
105   std::array<epoll_event, kMaxEventsToProcessAtOnce> events;
106 
107   int num_events =
108       epoll_wait(epoll_fd_, events.data(), events.size(), /*timeout=*/-1);
109   if (num_events < 0) {
110     if (errno == EINTR) {
111       return OkStatus();
112     }
113 
114     PW_LOG_ERROR("Dispatcher failed to wait for incoming events: %s",
115                  std::strerror(errno));
116     return Status::Internal();
117   }
118 
119   for (int i = 0; i < num_events; ++i) {
120     epoll_event& event = events[i];
121     if (event.data.fd == wait_fd_) {
122       // Consume the wake notification.
123       char unused;
124       ssize_t bytes_read = read(wait_fd_, &unused, 1);
125       PW_CHECK_INT_EQ(
126           bytes_read, 1, "Dispatcher failed to read wake notification");
127       PW_DCHECK_INT_EQ(unused, kNotificationSignal);
128       continue;
129     }
130 
131     // Debug log for missed events.
132     if (PW_LOG_LEVEL >= PW_LOG_LEVEL_DEBUG &&
133         wakers_[event.data.fd].read.IsEmpty() &&
134         wakers_[event.data.fd].write.IsEmpty()) {
135       PW_LOG_DEBUG(
136           "Received an event for registered file descriptor %d, but there is "
137           "no task to wake",
138           event.data.fd);
139     }
140 
141     if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) {
142       std::move(wakers_[event.data.fd].read).Wake();
143     }
144     if ((event.events & EPOLLOUT) != 0) {
145       std::move(wakers_[event.data.fd].write).Wake();
146     }
147   }
148 
149   return OkStatus();
150 }
151 
NativeRegisterFileDescriptor(int fd,FileDescriptorType type)152 Status NativeDispatcher::NativeRegisterFileDescriptor(int fd,
153                                                       FileDescriptorType type) {
154   epoll_event event;
155   event.events = EPOLLET;
156   event.data.fd = fd;
157 
158   if ((type & FileDescriptorType::kReadable) != 0) {
159     event.events |= EPOLLIN | EPOLLRDHUP;
160   }
161   if ((type & FileDescriptorType::kWritable) != 0) {
162     event.events |= EPOLLOUT;
163   }
164 
165   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
166     PW_LOG_ERROR("Failed to register epoll event: %s", std::strerror(errno));
167     return Status::Internal();
168   }
169 
170   return OkStatus();
171 }
172 
NativeUnregisterFileDescriptor(int fd)173 Status NativeDispatcher::NativeUnregisterFileDescriptor(int fd) {
174   epoll_event event;
175   event.data.fd = fd;
176   if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) == -1) {
177     PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno));
178     return Status::Internal();
179   }
180   wakers_.erase(fd);
181   return OkStatus();
182 }
183 
DoWake()184 void NativeDispatcher::DoWake() {
185   // Perform a write to unblock the waiting dispatcher.
186   //
187   // We ignore the result of the write, since nonblocking writes can
188   // fail due to there already being messages in the `notify_fd_` pipe.
189   // This is fine, since it means that the dispatcher thread is already queued
190   // to wake up.
191   write(notify_fd_, &kNotificationSignal, 1);
192 }
193 
194 }  // namespace pw::async2::backend
195