1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <bluetooth/log.h>
20 #include <fcntl.h>
21 #include <sys/epoll.h>
22 #include <sys/eventfd.h>
23 #include <unistd.h>
24
25 #include <algorithm>
26 #include <cerrno>
27 #include <cinttypes>
28 #include <cstring>
29
30 namespace {
31
32 // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory
33 constexpr int kEpollMaxEvents = 64;
34 constexpr uint64_t kStopReactor = 1 << 0;
35 constexpr uint64_t kWaitForIdle = 1 << 1;
36
37 } // namespace
38
39 namespace bluetooth {
40 namespace os {
41 using common::Closure;
42
43 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl44 impl() {
45 fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
46 log::assert_that(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
47 }
~implbluetooth::os::Reactor::Event::impl48 ~impl() {
49 if (fd_ != -1) {
50 close(fd_);
51 fd_ = -1;
52 }
53 }
54 int fd_ = -1;
55 };
56
Event()57 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()58 Reactor::Event::~Event() { delete pimpl_; }
59
Read()60 bool Reactor::Event::Read() {
61 uint64_t val = 0;
62 return eventfd_read(pimpl_->fd_, &val) == 0;
63 }
Id() const64 int Reactor::Event::Id() const { return pimpl_->fd_; }
Clear()65 void Reactor::Event::Clear() {
66 uint64_t val;
67 while (eventfd_read(pimpl_->fd_, &val) == 0) {
68 }
69 }
Close()70 void Reactor::Event::Close() {
71 int close_status;
72 RUN_NO_INTR(close_status = close(pimpl_->fd_));
73 log::assert_that(close_status != -1, "assert failed: close_status != -1");
74 pimpl_->fd_ = -1;
75 }
Notify()76 void Reactor::Event::Notify() {
77 uint64_t val = 1;
78 auto write_result = eventfd_write(pimpl_->fd_, val);
79 log::assert_that(write_result != -1, "assert failed: write_result != -1");
80 }
81
82 class Reactor::Reactable {
83 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)84 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
85 : fd_(fd),
86 on_read_ready_(std::move(on_read_ready)),
87 on_write_ready_(std::move(on_write_ready)),
88 is_executing_(false),
89 removed_(false) {}
90 const int fd_;
91 Closure on_read_ready_;
92 Closure on_write_ready_;
93 bool is_executing_;
94 bool removed_;
95 std::mutex mutex_;
96 std::unique_ptr<std::promise<void>> finished_promise_;
97 };
98
Reactor()99 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
100 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
101 log::assert_that(epoll_fd_ != -1, "could not create epoll fd: {}", strerror(errno));
102
103 control_fd_ = eventfd(0, EFD_NONBLOCK);
104 log::assert_that(control_fd_ != -1, "assert failed: control_fd_ != -1");
105
106 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
107 int result;
108 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
109 log::assert_that(result != -1, "assert failed: result != -1");
110 }
111
~Reactor()112 Reactor::~Reactor() {
113 int result;
114 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
115 log::assert_that(result != -1, "assert failed: result != -1");
116
117 RUN_NO_INTR(result = close(control_fd_));
118 log::assert_that(result != -1, "assert failed: result != -1");
119
120 RUN_NO_INTR(result = close(epoll_fd_));
121 log::assert_that(result != -1, "assert failed: result != -1");
122 }
123
Run()124 void Reactor::Run() {
125 bool already_running = is_running_.exchange(true);
126 log::assert_that(!already_running, "assert failed: !already_running");
127
128 int timeout_ms = -1;
129 bool waiting_for_idle = false;
130 for (;;) {
131 {
132 std::unique_lock<std::mutex> lock(mutex_);
133 invalidation_list_.clear();
134 }
135 epoll_event events[kEpollMaxEvents];
136 int count;
137 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
138 log::assert_that(count != -1, "epoll_wait failed: fd={}, err={}", epoll_fd_, strerror(errno));
139 if (waiting_for_idle && count == 0) {
140 timeout_ms = -1;
141 waiting_for_idle = false;
142 std::scoped_lock<std::mutex> lock(mutex_);
143 idle_promise_->set_value();
144 idle_promise_ = nullptr;
145 }
146
147 for (int i = 0; i < count; ++i) {
148 auto event = events[i];
149 log::assert_that(event.events != 0u, "assert failed: event.events != 0u");
150
151 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
152 if (event.data.ptr == nullptr) {
153 uint64_t value;
154 eventfd_read(control_fd_, &value);
155 if ((value & kStopReactor) != 0) {
156 is_running_ = false;
157 return;
158 } else if ((value & kWaitForIdle) != 0) {
159 timeout_ms = 30;
160 waiting_for_idle = true;
161 continue;
162 } else {
163 log::error("Unknown control_fd value {:x}", value);
164 continue;
165 }
166 }
167 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
168 std::unique_lock<std::mutex> lock(mutex_);
169 executing_reactable_finished_ = nullptr;
170 // See if this reactable has been removed in the meantime.
171 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) !=
172 invalidation_list_.end()) {
173 continue;
174 }
175
176 {
177 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
178 lock.unlock();
179 reactable->is_executing_ = true;
180 }
181 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
182 !reactable->on_read_ready_.is_null()) {
183 reactable->on_read_ready_.Run();
184 }
185 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
186 reactable->on_write_ready_.Run();
187 }
188 {
189 std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
190 reactable->is_executing_ = false;
191 if (reactable->removed_) {
192 reactable->finished_promise_->set_value();
193 reactable_lock.unlock();
194 delete reactable;
195 }
196 }
197 }
198 }
199 }
200
Stop()201 void Reactor::Stop() {
202 if (!is_running_) {
203 log::warn("not running, will stop once it's started");
204 }
205 auto control = eventfd_write(control_fd_, kStopReactor);
206 log::assert_that(control != -1, "assert failed: control != -1");
207 }
208
NewEvent() const209 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
210 return std::make_unique<Reactor::Event>();
211 }
212
Register(int fd,Closure on_read_ready,Closure on_write_ready)213 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
214 uint32_t poll_event_type = 0;
215 if (!on_read_ready.is_null()) {
216 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
217 }
218 if (!on_write_ready.is_null()) {
219 poll_event_type |= EPOLLOUT;
220 }
221 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
222 epoll_event event = {
223 .events = poll_event_type,
224 .data = {.ptr = reactable},
225 };
226 int register_fd;
227 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
228 log::assert_that(register_fd != -1, "assert failed: register_fd != -1");
229 return reactable;
230 }
231
Unregister(Reactor::Reactable * reactable)232 void Reactor::Unregister(Reactor::Reactable* reactable) {
233 log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
234 {
235 std::lock_guard<std::mutex> lock(mutex_);
236 invalidation_list_.push_back(reactable);
237 }
238 bool delaying_delete_until_callback_finished = false;
239 {
240 int result;
241 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
242 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
243 if (result == -1 && errno == ENOENT) {
244 log::info("reactable is invalid or unregistered");
245 } else {
246 log::assert_that(result != -1, "could not unregister epoll fd: {}", strerror(errno));
247 }
248
249 // If we are unregistering during the callback event from this reactable, we delete it after the
250 // callback is executed. reactable->is_executing_ is protected by reactable->mutex_, so it's
251 // thread safe.
252 if (reactable->is_executing_) {
253 reactable->removed_ = true;
254 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
255 executing_reactable_finished_ =
256 std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
257 delaying_delete_until_callback_finished = true;
258 }
259 }
260 // If we are unregistering outside of the callback event from this reactable, we delete it now
261 if (!delaying_delete_until_callback_finished) {
262 delete reactable;
263 }
264 }
265
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)266 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
267 std::lock_guard<std::mutex> lock(mutex_);
268 if (executing_reactable_finished_ == nullptr) {
269 return true;
270 }
271 auto stop_status = executing_reactable_finished_->wait_for(timeout);
272 if (stop_status != std::future_status::ready) {
273 log::error("Unregister reactable timed out");
274 }
275 return stop_status == std::future_status::ready;
276 }
277
WaitForIdle(std::chrono::milliseconds timeout)278 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
279 auto promise = std::make_shared<std::promise<void>>();
280 auto future = std::make_unique<std::future<void>>(promise->get_future());
281 {
282 std::lock_guard<std::mutex> lock(mutex_);
283 idle_promise_ = promise;
284 }
285
286 auto control = eventfd_write(control_fd_, kWaitForIdle);
287 log::assert_that(control != -1, "assert failed: control != -1");
288
289 auto idle_status = future->wait_for(timeout);
290 return idle_status == std::future_status::ready;
291 }
292
ModifyRegistration(Reactor::Reactable * reactable,ReactOn react_on)293 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) {
294 log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
295
296 uint32_t poll_event_type = 0;
297 if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) {
298 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
299 }
300 if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) {
301 poll_event_type |= EPOLLOUT;
302 }
303 epoll_event event = {
304 .events = poll_event_type,
305 .data = {.ptr = reactable},
306 };
307 int modify_fd;
308 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
309 log::assert_that(modify_fd != -1, "assert failed: modify_fd != -1");
310 }
311
312 } // namespace os
313 } // namespace bluetooth
314