xref: /aosp_15_r20/external/pigweed/pw_async2_epoll/dispatcher_native.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2024 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_async2/dispatcher_native.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker #include <fcntl.h>
18*61c4878aSAndroid Build Coastguard Worker #include <sys/epoll.h>
19*61c4878aSAndroid Build Coastguard Worker #include <unistd.h>
20*61c4878aSAndroid Build Coastguard Worker 
21*61c4878aSAndroid Build Coastguard Worker #include <cstring>
22*61c4878aSAndroid Build Coastguard Worker #include <mutex>
23*61c4878aSAndroid Build Coastguard Worker 
24*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
26*61c4878aSAndroid Build Coastguard Worker #include "pw_preprocessor/compiler.h"
27*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
28*61c4878aSAndroid Build Coastguard Worker 
29*61c4878aSAndroid Build Coastguard Worker namespace pw::async2::backend {
namespace {

// Byte sent through the dispatcher's wake pipe. `DoWake` writes it to
// `notify_fd_`; `NativeWaitForWake` reads it back from `wait_fd_` and
// debug-checks that the value matches.
constexpr char kNotificationSignal{'c'};

}  // namespace
35*61c4878aSAndroid Build Coastguard Worker 
NativeInit()36*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeInit() {
37*61c4878aSAndroid Build Coastguard Worker   epoll_fd_ = epoll_create1(0);
38*61c4878aSAndroid Build Coastguard Worker   if (epoll_fd_ == -1) {
39*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to open epoll: %s", std::strerror(errno));
40*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
41*61c4878aSAndroid Build Coastguard Worker   }
42*61c4878aSAndroid Build Coastguard Worker 
43*61c4878aSAndroid Build Coastguard Worker   int pipefd[2];
44*61c4878aSAndroid Build Coastguard Worker   if (pipe2(pipefd, O_DIRECT | O_NONBLOCK) == -1) {
45*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to create pipe: %s", std::strerror(errno));
46*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
47*61c4878aSAndroid Build Coastguard Worker   }
48*61c4878aSAndroid Build Coastguard Worker 
49*61c4878aSAndroid Build Coastguard Worker   wait_fd_ = pipefd[0];
50*61c4878aSAndroid Build Coastguard Worker   notify_fd_ = pipefd[1];
51*61c4878aSAndroid Build Coastguard Worker 
52*61c4878aSAndroid Build Coastguard Worker   struct epoll_event event;
53*61c4878aSAndroid Build Coastguard Worker   event.events = EPOLLIN;
54*61c4878aSAndroid Build Coastguard Worker   event.data.fd = wait_fd_;
55*61c4878aSAndroid Build Coastguard Worker   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, wait_fd_, &event) == -1) {
56*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to initialize epoll event for dispatcher");
57*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
58*61c4878aSAndroid Build Coastguard Worker   }
59*61c4878aSAndroid Build Coastguard Worker 
60*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
61*61c4878aSAndroid Build Coastguard Worker }
62*61c4878aSAndroid Build Coastguard Worker 
DoRunUntilStalled(Dispatcher & dispatcher,Task * task)63*61c4878aSAndroid Build Coastguard Worker Poll<> NativeDispatcher::DoRunUntilStalled(Dispatcher& dispatcher, Task* task) {
64*61c4878aSAndroid Build Coastguard Worker   {
65*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(dispatcher_lock());
66*61c4878aSAndroid Build Coastguard Worker     PW_CHECK(task == nullptr || HasPostedTask(*task),
67*61c4878aSAndroid Build Coastguard Worker              "Attempted to run a dispatcher until a task was stalled, "
68*61c4878aSAndroid Build Coastguard Worker              "but that task has not been `Post`ed to that `Dispatcher`.");
69*61c4878aSAndroid Build Coastguard Worker   }
70*61c4878aSAndroid Build Coastguard Worker   while (true) {
71*61c4878aSAndroid Build Coastguard Worker     RunOneTaskResult result = RunOneTask(dispatcher, task);
72*61c4878aSAndroid Build Coastguard Worker     if (result.completed_main_task() || result.completed_all_tasks()) {
73*61c4878aSAndroid Build Coastguard Worker       return Ready();
74*61c4878aSAndroid Build Coastguard Worker     }
75*61c4878aSAndroid Build Coastguard Worker     if (!result.ran_a_task()) {
76*61c4878aSAndroid Build Coastguard Worker       return Pending();
77*61c4878aSAndroid Build Coastguard Worker     }
78*61c4878aSAndroid Build Coastguard Worker   }
79*61c4878aSAndroid Build Coastguard Worker }
80*61c4878aSAndroid Build Coastguard Worker 
DoRunToCompletion(Dispatcher & dispatcher,Task * task)81*61c4878aSAndroid Build Coastguard Worker void NativeDispatcher::DoRunToCompletion(Dispatcher& dispatcher, Task* task) {
82*61c4878aSAndroid Build Coastguard Worker   {
83*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(dispatcher_lock());
84*61c4878aSAndroid Build Coastguard Worker     PW_CHECK(task == nullptr || HasPostedTask(*task),
85*61c4878aSAndroid Build Coastguard Worker              "Attempted to run a dispatcher until a task was complete, "
86*61c4878aSAndroid Build Coastguard Worker              "but that task has not been `Post`ed to that `Dispatcher`.");
87*61c4878aSAndroid Build Coastguard Worker   }
88*61c4878aSAndroid Build Coastguard Worker   while (true) {
89*61c4878aSAndroid Build Coastguard Worker     RunOneTaskResult result = RunOneTask(dispatcher, task);
90*61c4878aSAndroid Build Coastguard Worker     if (result.completed_main_task() || result.completed_all_tasks()) {
91*61c4878aSAndroid Build Coastguard Worker       return;
92*61c4878aSAndroid Build Coastguard Worker     }
93*61c4878aSAndroid Build Coastguard Worker     if (!result.ran_a_task()) {
94*61c4878aSAndroid Build Coastguard Worker       SleepInfo sleep_info = AttemptRequestWake(/*allow_empty=*/false);
95*61c4878aSAndroid Build Coastguard Worker       if (sleep_info.should_sleep()) {
96*61c4878aSAndroid Build Coastguard Worker         if (!NativeWaitForWake().ok()) {
97*61c4878aSAndroid Build Coastguard Worker           break;
98*61c4878aSAndroid Build Coastguard Worker         }
99*61c4878aSAndroid Build Coastguard Worker       }
100*61c4878aSAndroid Build Coastguard Worker     }
101*61c4878aSAndroid Build Coastguard Worker   }
102*61c4878aSAndroid Build Coastguard Worker }
103*61c4878aSAndroid Build Coastguard Worker 
NativeWaitForWake()104*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeWaitForWake() {
105*61c4878aSAndroid Build Coastguard Worker   std::array<epoll_event, kMaxEventsToProcessAtOnce> events;
106*61c4878aSAndroid Build Coastguard Worker 
107*61c4878aSAndroid Build Coastguard Worker   int num_events =
108*61c4878aSAndroid Build Coastguard Worker       epoll_wait(epoll_fd_, events.data(), events.size(), /*timeout=*/-1);
109*61c4878aSAndroid Build Coastguard Worker   if (num_events < 0) {
110*61c4878aSAndroid Build Coastguard Worker     if (errno == EINTR) {
111*61c4878aSAndroid Build Coastguard Worker       return OkStatus();
112*61c4878aSAndroid Build Coastguard Worker     }
113*61c4878aSAndroid Build Coastguard Worker 
114*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Dispatcher failed to wait for incoming events: %s",
115*61c4878aSAndroid Build Coastguard Worker                  std::strerror(errno));
116*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
117*61c4878aSAndroid Build Coastguard Worker   }
118*61c4878aSAndroid Build Coastguard Worker 
119*61c4878aSAndroid Build Coastguard Worker   for (int i = 0; i < num_events; ++i) {
120*61c4878aSAndroid Build Coastguard Worker     epoll_event& event = events[i];
121*61c4878aSAndroid Build Coastguard Worker     if (event.data.fd == wait_fd_) {
122*61c4878aSAndroid Build Coastguard Worker       // Consume the wake notification.
123*61c4878aSAndroid Build Coastguard Worker       char unused;
124*61c4878aSAndroid Build Coastguard Worker       ssize_t bytes_read = read(wait_fd_, &unused, 1);
125*61c4878aSAndroid Build Coastguard Worker       PW_CHECK_INT_EQ(
126*61c4878aSAndroid Build Coastguard Worker           bytes_read, 1, "Dispatcher failed to read wake notification");
127*61c4878aSAndroid Build Coastguard Worker       PW_DCHECK_INT_EQ(unused, kNotificationSignal);
128*61c4878aSAndroid Build Coastguard Worker       continue;
129*61c4878aSAndroid Build Coastguard Worker     }
130*61c4878aSAndroid Build Coastguard Worker 
131*61c4878aSAndroid Build Coastguard Worker     // Debug log for missed events.
132*61c4878aSAndroid Build Coastguard Worker     if (PW_LOG_LEVEL >= PW_LOG_LEVEL_DEBUG &&
133*61c4878aSAndroid Build Coastguard Worker         wakers_[event.data.fd].read.IsEmpty() &&
134*61c4878aSAndroid Build Coastguard Worker         wakers_[event.data.fd].write.IsEmpty()) {
135*61c4878aSAndroid Build Coastguard Worker       PW_LOG_DEBUG(
136*61c4878aSAndroid Build Coastguard Worker           "Received an event for registered file descriptor %d, but there is "
137*61c4878aSAndroid Build Coastguard Worker           "no task to wake",
138*61c4878aSAndroid Build Coastguard Worker           event.data.fd);
139*61c4878aSAndroid Build Coastguard Worker     }
140*61c4878aSAndroid Build Coastguard Worker 
141*61c4878aSAndroid Build Coastguard Worker     if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) {
142*61c4878aSAndroid Build Coastguard Worker       std::move(wakers_[event.data.fd].read).Wake();
143*61c4878aSAndroid Build Coastguard Worker     }
144*61c4878aSAndroid Build Coastguard Worker     if ((event.events & EPOLLOUT) != 0) {
145*61c4878aSAndroid Build Coastguard Worker       std::move(wakers_[event.data.fd].write).Wake();
146*61c4878aSAndroid Build Coastguard Worker     }
147*61c4878aSAndroid Build Coastguard Worker   }
148*61c4878aSAndroid Build Coastguard Worker 
149*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
150*61c4878aSAndroid Build Coastguard Worker }
151*61c4878aSAndroid Build Coastguard Worker 
NativeRegisterFileDescriptor(int fd,FileDescriptorType type)152*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeRegisterFileDescriptor(int fd,
153*61c4878aSAndroid Build Coastguard Worker                                                       FileDescriptorType type) {
154*61c4878aSAndroid Build Coastguard Worker   epoll_event event;
155*61c4878aSAndroid Build Coastguard Worker   event.events = EPOLLET;
156*61c4878aSAndroid Build Coastguard Worker   event.data.fd = fd;
157*61c4878aSAndroid Build Coastguard Worker 
158*61c4878aSAndroid Build Coastguard Worker   if ((type & FileDescriptorType::kReadable) != 0) {
159*61c4878aSAndroid Build Coastguard Worker     event.events |= EPOLLIN | EPOLLRDHUP;
160*61c4878aSAndroid Build Coastguard Worker   }
161*61c4878aSAndroid Build Coastguard Worker   if ((type & FileDescriptorType::kWritable) != 0) {
162*61c4878aSAndroid Build Coastguard Worker     event.events |= EPOLLOUT;
163*61c4878aSAndroid Build Coastguard Worker   }
164*61c4878aSAndroid Build Coastguard Worker 
165*61c4878aSAndroid Build Coastguard Worker   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) == -1) {
166*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to register epoll event: %s", std::strerror(errno));
167*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
168*61c4878aSAndroid Build Coastguard Worker   }
169*61c4878aSAndroid Build Coastguard Worker 
170*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
171*61c4878aSAndroid Build Coastguard Worker }
172*61c4878aSAndroid Build Coastguard Worker 
NativeUnregisterFileDescriptor(int fd)173*61c4878aSAndroid Build Coastguard Worker Status NativeDispatcher::NativeUnregisterFileDescriptor(int fd) {
174*61c4878aSAndroid Build Coastguard Worker   epoll_event event;
175*61c4878aSAndroid Build Coastguard Worker   event.data.fd = fd;
176*61c4878aSAndroid Build Coastguard Worker   if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) == -1) {
177*61c4878aSAndroid Build Coastguard Worker     PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno));
178*61c4878aSAndroid Build Coastguard Worker     return Status::Internal();
179*61c4878aSAndroid Build Coastguard Worker   }
180*61c4878aSAndroid Build Coastguard Worker   wakers_.erase(fd);
181*61c4878aSAndroid Build Coastguard Worker   return OkStatus();
182*61c4878aSAndroid Build Coastguard Worker }
183*61c4878aSAndroid Build Coastguard Worker 
DoWake()184*61c4878aSAndroid Build Coastguard Worker void NativeDispatcher::DoWake() {
185*61c4878aSAndroid Build Coastguard Worker   // Perform a write to unblock the waiting dispatcher.
186*61c4878aSAndroid Build Coastguard Worker   //
187*61c4878aSAndroid Build Coastguard Worker   // We ignore the result of the write, since nonblocking writes can
188*61c4878aSAndroid Build Coastguard Worker   // fail due to there already being messages in the `notify_fd_` pipe.
189*61c4878aSAndroid Build Coastguard Worker   // This is fine, since it means that the dispatcher thread is already queued
190*61c4878aSAndroid Build Coastguard Worker   // to wake up.
191*61c4878aSAndroid Build Coastguard Worker   write(notify_fd_, &kNotificationSignal, 1);
192*61c4878aSAndroid Build Coastguard Worker }
193*61c4878aSAndroid Build Coastguard Worker 
194*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::async2::backend
195