1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_async2/dispatcher_native.h"
16
17 #include <fcntl.h>
18 #include <sys/epoll.h>
19 #include <unistd.h>
20
21 #include <cstring>
22 #include <mutex>
23
24 #include "pw_assert/check.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/status.h"
28
29 namespace pw::async2::backend {
30 namespace {
31
32 constexpr char kNotificationSignal = 'c';
33
34 } // namespace
35
NativeInit()36 Status NativeDispatcher::NativeInit() {
37 epoll_fd_ = epoll_create1(0);
38 if (epoll_fd_ == -1) {
39 PW_LOG_ERROR("Failed to open epoll: %s", std::strerror(errno));
40 return Status::Internal();
41 }
42
43 int pipefd[2];
44 if (pipe2(pipefd, O_DIRECT | O_NONBLOCK) == -1) {
45 PW_LOG_ERROR("Failed to create pipe: %s", std::strerror(errno));
46 return Status::Internal();
47 }
48
49 wait_fd_ = pipefd[0];
50 notify_fd_ = pipefd[1];
51
52 struct epoll_event event;
53 event.events = EPOLLIN;
54 event.data.fd = wait_fd_;
55 if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wait_fd_, &event) == -1) {
56 PW_LOG_ERROR("Failed to initialize epoll event for dispatcher");
57 return Status::Internal();
58 }
59
60 return OkStatus();
61 }
62
DoRunUntilStalled(Dispatcher & dispatcher,Task * task)63 Poll<> NativeDispatcher::DoRunUntilStalled(Dispatcher& dispatcher, Task* task) {
64 {
65 std::lock_guard lock(dispatcher_lock());
66 PW_CHECK(task == nullptr || HasPostedTask(*task),
67 "Attempted to run a dispatcher until a task was stalled, "
68 "but that task has not been `Post`ed to that `Dispatcher`.");
69 }
70 while (true) {
71 RunOneTaskResult result = RunOneTask(dispatcher, task);
72 if (result.completed_main_task() || result.completed_all_tasks()) {
73 return Ready();
74 }
75 if (!result.ran_a_task()) {
76 return Pending();
77 }
78 }
79 }
80
DoRunToCompletion(Dispatcher & dispatcher,Task * task)81 void NativeDispatcher::DoRunToCompletion(Dispatcher& dispatcher, Task* task) {
82 {
83 std::lock_guard lock(dispatcher_lock());
84 PW_CHECK(task == nullptr || HasPostedTask(*task),
85 "Attempted to run a dispatcher until a task was complete, "
86 "but that task has not been `Post`ed to that `Dispatcher`.");
87 }
88 while (true) {
89 RunOneTaskResult result = RunOneTask(dispatcher, task);
90 if (result.completed_main_task() || result.completed_all_tasks()) {
91 return;
92 }
93 if (!result.ran_a_task()) {
94 SleepInfo sleep_info = AttemptRequestWake(/*allow_empty=*/false);
95 if (sleep_info.should_sleep()) {
96 if (!NativeWaitForWake().ok()) {
97 break;
98 }
99 }
100 }
101 }
102 }
103
NativeWaitForWake()104 Status NativeDispatcher::NativeWaitForWake() {
105 std::array<epoll_event, kMaxEventsToProcessAtOnce> events;
106
107 int num_events =
108 epoll_wait(epoll_fd_, events.data(), events.size(), /*timeout=*/-1);
109 if (num_events < 0) {
110 if (errno == EINTR) {
111 return OkStatus();
112 }
113
114 PW_LOG_ERROR("Dispatcher failed to wait for incoming events: %s",
115 std::strerror(errno));
116 return Status::Internal();
117 }
118
119 for (int i = 0; i < num_events; ++i) {
120 epoll_event& event = events[i];
121 if (event.data.fd == wait_fd_) {
122 // Consume the wake notification.
123 char unused;
124 ssize_t bytes_read = read(wait_fd_, &unused, 1);
125 PW_CHECK_INT_EQ(
126 bytes_read, 1, "Dispatcher failed to read wake notification");
127 PW_DCHECK_INT_EQ(unused, kNotificationSignal);
128 continue;
129 }
130
131 // Debug log for missed events.
132 if (PW_LOG_LEVEL >= PW_LOG_LEVEL_DEBUG &&
133 wakers_[event.data.fd].read.IsEmpty() &&
134 wakers_[event.data.fd].write.IsEmpty()) {
135 PW_LOG_DEBUG(
136 "Received an event for registered file descriptor %d, but there is "
137 "no task to wake",
138 event.data.fd);
139 }
140
141 if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) {
142 std::move(wakers_[event.data.fd].read).Wake();
143 }
144 if ((event.events & EPOLLOUT) != 0) {
145 std::move(wakers_[event.data.fd].write).Wake();
146 }
147 }
148
149 return OkStatus();
150 }
151
NativeRegisterFileDescriptor(int fd,FileDescriptorType type)152 Status NativeDispatcher::NativeRegisterFileDescriptor(int fd,
153 FileDescriptorType type) {
154 epoll_event event;
155 event.events = EPOLLET;
156 event.data.fd = fd;
157
158 if ((type & FileDescriptorType::kReadable) != 0) {
159 event.events |= EPOLLIN | EPOLLRDHUP;
160 }
161 if ((type & FileDescriptorType::kWritable) != 0) {
162 event.events |= EPOLLOUT;
163 }
164
165 if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
166 PW_LOG_ERROR("Failed to register epoll event: %s", std::strerror(errno));
167 return Status::Internal();
168 }
169
170 return OkStatus();
171 }
172
NativeUnregisterFileDescriptor(int fd)173 Status NativeDispatcher::NativeUnregisterFileDescriptor(int fd) {
174 epoll_event event;
175 event.data.fd = fd;
176 if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) == -1) {
177 PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno));
178 return Status::Internal();
179 }
180 wakers_.erase(fd);
181 return OkStatus();
182 }
183
DoWake()184 void NativeDispatcher::DoWake() {
185 // Perform a write to unblock the waiting dispatcher.
186 //
187 // We ignore the result of the write, since nonblocking writes can
188 // fail due to there already being messages in the `notify_fd_` pipe.
189 // This is fine, since it means that the dispatcher thread is already queued
190 // to wake up.
191 write(notify_fd_, &kNotificationSignal, 1);
192 }
193
194 } // namespace pw::async2::backend
195