1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker
15*61c4878aSAndroid Build Coastguard Worker #include "pw_async2/dispatcher_native.h"
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Worker #include <fcntl.h>
18*61c4878aSAndroid Build Coastguard Worker #include <sys/epoll.h>
19*61c4878aSAndroid Build Coastguard Worker #include <unistd.h>
20*61c4878aSAndroid Build Coastguard Worker
21*61c4878aSAndroid Build Coastguard Worker #include <cstring>
22*61c4878aSAndroid Build Coastguard Worker #include <mutex>
23*61c4878aSAndroid Build Coastguard Worker
24*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_preprocessor/compiler.h"
27*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
28*61c4878aSAndroid Build Coastguard Worker
29*61c4878aSAndroid Build Coastguard Worker namespace pw::async2::backend {
namespace {

// Single byte that `DoWake()` writes into the notification pipe. The
// dispatcher reads it back in `NativeWaitForWake()` and (in debug builds)
// verifies that the byte it consumed matches this value.
constexpr char kNotificationSignal{'c'};

}  // namespace
35*61c4878aSAndroid Build Coastguard Worker
NativeInit()36*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeInit() {
37*61c4878aSAndroid Build Coastguard Worker epoll_fd_ = epoll_create1(0);
38*61c4878aSAndroid Build Coastguard Worker if (epoll_fd_ == -1) {
39*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to open epoll: %s", std::strerror(errno));
40*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
41*61c4878aSAndroid Build Coastguard Worker }
42*61c4878aSAndroid Build Coastguard Worker
43*61c4878aSAndroid Build Coastguard Worker int pipefd[2];
44*61c4878aSAndroid Build Coastguard Worker if (pipe2(pipefd, O_DIRECT | O_NONBLOCK) == -1) {
45*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to create pipe: %s", std::strerror(errno));
46*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
47*61c4878aSAndroid Build Coastguard Worker }
48*61c4878aSAndroid Build Coastguard Worker
49*61c4878aSAndroid Build Coastguard Worker wait_fd_ = pipefd[0];
50*61c4878aSAndroid Build Coastguard Worker notify_fd_ = pipefd[1];
51*61c4878aSAndroid Build Coastguard Worker
52*61c4878aSAndroid Build Coastguard Worker struct epoll_event event;
53*61c4878aSAndroid Build Coastguard Worker event.events = EPOLLIN;
54*61c4878aSAndroid Build Coastguard Worker event.data.fd = wait_fd_;
55*61c4878aSAndroid Build Coastguard Worker if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wait_fd_, &event) == -1) {
56*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to initialize epoll event for dispatcher");
57*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
58*61c4878aSAndroid Build Coastguard Worker }
59*61c4878aSAndroid Build Coastguard Worker
60*61c4878aSAndroid Build Coastguard Worker return OkStatus();
61*61c4878aSAndroid Build Coastguard Worker }
62*61c4878aSAndroid Build Coastguard Worker
DoRunUntilStalled(Dispatcher & dispatcher,Task * task)63*61c4878aSAndroid Build Coastguard Worker Poll<> NativeDispatcher::DoRunUntilStalled(Dispatcher& dispatcher, Task* task) {
64*61c4878aSAndroid Build Coastguard Worker {
65*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(dispatcher_lock());
66*61c4878aSAndroid Build Coastguard Worker PW_CHECK(task == nullptr || HasPostedTask(*task),
67*61c4878aSAndroid Build Coastguard Worker "Attempted to run a dispatcher until a task was stalled, "
68*61c4878aSAndroid Build Coastguard Worker "but that task has not been `Post`ed to that `Dispatcher`.");
69*61c4878aSAndroid Build Coastguard Worker }
70*61c4878aSAndroid Build Coastguard Worker while (true) {
71*61c4878aSAndroid Build Coastguard Worker RunOneTaskResult result = RunOneTask(dispatcher, task);
72*61c4878aSAndroid Build Coastguard Worker if (result.completed_main_task() || result.completed_all_tasks()) {
73*61c4878aSAndroid Build Coastguard Worker return Ready();
74*61c4878aSAndroid Build Coastguard Worker }
75*61c4878aSAndroid Build Coastguard Worker if (!result.ran_a_task()) {
76*61c4878aSAndroid Build Coastguard Worker return Pending();
77*61c4878aSAndroid Build Coastguard Worker }
78*61c4878aSAndroid Build Coastguard Worker }
79*61c4878aSAndroid Build Coastguard Worker }
80*61c4878aSAndroid Build Coastguard Worker
DoRunToCompletion(Dispatcher & dispatcher,Task * task)81*61c4878aSAndroid Build Coastguard Worker void NativeDispatcher::DoRunToCompletion(Dispatcher& dispatcher, Task* task) {
82*61c4878aSAndroid Build Coastguard Worker {
83*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(dispatcher_lock());
84*61c4878aSAndroid Build Coastguard Worker PW_CHECK(task == nullptr || HasPostedTask(*task),
85*61c4878aSAndroid Build Coastguard Worker "Attempted to run a dispatcher until a task was complete, "
86*61c4878aSAndroid Build Coastguard Worker "but that task has not been `Post`ed to that `Dispatcher`.");
87*61c4878aSAndroid Build Coastguard Worker }
88*61c4878aSAndroid Build Coastguard Worker while (true) {
89*61c4878aSAndroid Build Coastguard Worker RunOneTaskResult result = RunOneTask(dispatcher, task);
90*61c4878aSAndroid Build Coastguard Worker if (result.completed_main_task() || result.completed_all_tasks()) {
91*61c4878aSAndroid Build Coastguard Worker return;
92*61c4878aSAndroid Build Coastguard Worker }
93*61c4878aSAndroid Build Coastguard Worker if (!result.ran_a_task()) {
94*61c4878aSAndroid Build Coastguard Worker SleepInfo sleep_info = AttemptRequestWake(/*allow_empty=*/false);
95*61c4878aSAndroid Build Coastguard Worker if (sleep_info.should_sleep()) {
96*61c4878aSAndroid Build Coastguard Worker if (!NativeWaitForWake().ok()) {
97*61c4878aSAndroid Build Coastguard Worker break;
98*61c4878aSAndroid Build Coastguard Worker }
99*61c4878aSAndroid Build Coastguard Worker }
100*61c4878aSAndroid Build Coastguard Worker }
101*61c4878aSAndroid Build Coastguard Worker }
102*61c4878aSAndroid Build Coastguard Worker }
103*61c4878aSAndroid Build Coastguard Worker
NativeWaitForWake()104*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeWaitForWake() {
105*61c4878aSAndroid Build Coastguard Worker std::array<epoll_event, kMaxEventsToProcessAtOnce> events;
106*61c4878aSAndroid Build Coastguard Worker
107*61c4878aSAndroid Build Coastguard Worker int num_events =
108*61c4878aSAndroid Build Coastguard Worker epoll_wait(epoll_fd_, events.data(), events.size(), /*timeout=*/-1);
109*61c4878aSAndroid Build Coastguard Worker if (num_events < 0) {
110*61c4878aSAndroid Build Coastguard Worker if (errno == EINTR) {
111*61c4878aSAndroid Build Coastguard Worker return OkStatus();
112*61c4878aSAndroid Build Coastguard Worker }
113*61c4878aSAndroid Build Coastguard Worker
114*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Dispatcher failed to wait for incoming events: %s",
115*61c4878aSAndroid Build Coastguard Worker std::strerror(errno));
116*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
117*61c4878aSAndroid Build Coastguard Worker }
118*61c4878aSAndroid Build Coastguard Worker
119*61c4878aSAndroid Build Coastguard Worker for (int i = 0; i < num_events; ++i) {
120*61c4878aSAndroid Build Coastguard Worker epoll_event& event = events[i];
121*61c4878aSAndroid Build Coastguard Worker if (event.data.fd == wait_fd_) {
122*61c4878aSAndroid Build Coastguard Worker // Consume the wake notification.
123*61c4878aSAndroid Build Coastguard Worker char unused;
124*61c4878aSAndroid Build Coastguard Worker ssize_t bytes_read = read(wait_fd_, &unused, 1);
125*61c4878aSAndroid Build Coastguard Worker PW_CHECK_INT_EQ(
126*61c4878aSAndroid Build Coastguard Worker bytes_read, 1, "Dispatcher failed to read wake notification");
127*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_INT_EQ(unused, kNotificationSignal);
128*61c4878aSAndroid Build Coastguard Worker continue;
129*61c4878aSAndroid Build Coastguard Worker }
130*61c4878aSAndroid Build Coastguard Worker
131*61c4878aSAndroid Build Coastguard Worker // Debug log for missed events.
132*61c4878aSAndroid Build Coastguard Worker if (PW_LOG_LEVEL >= PW_LOG_LEVEL_DEBUG &&
133*61c4878aSAndroid Build Coastguard Worker wakers_[event.data.fd].read.IsEmpty() &&
134*61c4878aSAndroid Build Coastguard Worker wakers_[event.data.fd].write.IsEmpty()) {
135*61c4878aSAndroid Build Coastguard Worker PW_LOG_DEBUG(
136*61c4878aSAndroid Build Coastguard Worker "Received an event for registered file descriptor %d, but there is "
137*61c4878aSAndroid Build Coastguard Worker "no task to wake",
138*61c4878aSAndroid Build Coastguard Worker event.data.fd);
139*61c4878aSAndroid Build Coastguard Worker }
140*61c4878aSAndroid Build Coastguard Worker
141*61c4878aSAndroid Build Coastguard Worker if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) {
142*61c4878aSAndroid Build Coastguard Worker std::move(wakers_[event.data.fd].read).Wake();
143*61c4878aSAndroid Build Coastguard Worker }
144*61c4878aSAndroid Build Coastguard Worker if ((event.events & EPOLLOUT) != 0) {
145*61c4878aSAndroid Build Coastguard Worker std::move(wakers_[event.data.fd].write).Wake();
146*61c4878aSAndroid Build Coastguard Worker }
147*61c4878aSAndroid Build Coastguard Worker }
148*61c4878aSAndroid Build Coastguard Worker
149*61c4878aSAndroid Build Coastguard Worker return OkStatus();
150*61c4878aSAndroid Build Coastguard Worker }
151*61c4878aSAndroid Build Coastguard Worker
NativeRegisterFileDescriptor(int fd,FileDescriptorType type)152*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeRegisterFileDescriptor(int fd,
153*61c4878aSAndroid Build Coastguard Worker FileDescriptorType type) {
154*61c4878aSAndroid Build Coastguard Worker epoll_event event;
155*61c4878aSAndroid Build Coastguard Worker event.events = EPOLLET;
156*61c4878aSAndroid Build Coastguard Worker event.data.fd = fd;
157*61c4878aSAndroid Build Coastguard Worker
158*61c4878aSAndroid Build Coastguard Worker if ((type & FileDescriptorType::kReadable) != 0) {
159*61c4878aSAndroid Build Coastguard Worker event.events |= EPOLLIN | EPOLLRDHUP;
160*61c4878aSAndroid Build Coastguard Worker }
161*61c4878aSAndroid Build Coastguard Worker if ((type & FileDescriptorType::kWritable) != 0) {
162*61c4878aSAndroid Build Coastguard Worker event.events |= EPOLLOUT;
163*61c4878aSAndroid Build Coastguard Worker }
164*61c4878aSAndroid Build Coastguard Worker
165*61c4878aSAndroid Build Coastguard Worker if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
166*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to register epoll event: %s", std::strerror(errno));
167*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
168*61c4878aSAndroid Build Coastguard Worker }
169*61c4878aSAndroid Build Coastguard Worker
170*61c4878aSAndroid Build Coastguard Worker return OkStatus();
171*61c4878aSAndroid Build Coastguard Worker }
172*61c4878aSAndroid Build Coastguard Worker
NativeUnregisterFileDescriptor(int fd)173*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeUnregisterFileDescriptor(int fd) {
174*61c4878aSAndroid Build Coastguard Worker epoll_event event;
175*61c4878aSAndroid Build Coastguard Worker event.data.fd = fd;
176*61c4878aSAndroid Build Coastguard Worker if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) == -1) {
177*61c4878aSAndroid Build Coastguard Worker PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno));
178*61c4878aSAndroid Build Coastguard Worker return Status::Internal();
179*61c4878aSAndroid Build Coastguard Worker }
180*61c4878aSAndroid Build Coastguard Worker wakers_.erase(fd);
181*61c4878aSAndroid Build Coastguard Worker return OkStatus();
182*61c4878aSAndroid Build Coastguard Worker }
183*61c4878aSAndroid Build Coastguard Worker
DoWake()184*61c4878aSAndroid Build Coastguard Worker void NativeDispatcher::DoWake() {
185*61c4878aSAndroid Build Coastguard Worker // Perform a write to unblock the waiting dispatcher.
186*61c4878aSAndroid Build Coastguard Worker //
187*61c4878aSAndroid Build Coastguard Worker // We ignore the result of the write, since nonblocking writes can
188*61c4878aSAndroid Build Coastguard Worker // fail due to there already being messages in the `notify_fd_` pipe.
189*61c4878aSAndroid Build Coastguard Worker // This is fine, since it means that the dispatcher thread is already queued
190*61c4878aSAndroid Build Coastguard Worker // to wake up.
191*61c4878aSAndroid Build Coastguard Worker write(notify_fd_, &kNotificationSignal, 1);
192*61c4878aSAndroid Build Coastguard Worker }
193*61c4878aSAndroid Build Coastguard Worker
194*61c4878aSAndroid Build Coastguard Worker } // namespace pw::async2::backend
195