1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/reactor.h"
18 
19 #include <bluetooth/log.h>
20 #include <fcntl.h>
21 #include <sys/epoll.h>
22 #include <sys/eventfd.h>
23 #include <unistd.h>
24 
25 #include <algorithm>
26 #include <cerrno>
27 #include <cinttypes>
28 #include <cstring>
29 
namespace {

// Upper bound on the number of events fetched by a single epoll_wait() call; it is also the
// length of the epoll_event buffer that Reactor::Run() declares for each wait.
constexpr int kEpollMaxEvents = 64;

// Values written to the reactor's control eventfd.
// Written by Reactor::Stop(): asks the Run() loop to return.
constexpr uint64_t kStopReactor = uint64_t{1} << 0;
// Written by Reactor::WaitForIdle(): asks the Run() loop to signal once it has gone idle.
constexpr uint64_t kWaitForIdle = uint64_t{1} << 1;

}  // namespace
38 
39 namespace bluetooth {
40 namespace os {
41 using common::Closure;
42 
43 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl44   impl() {
45     fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
46     log::assert_that(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
47   }
~implbluetooth::os::Reactor::Event::impl48   ~impl() {
49     if (fd_ != -1) {
50       close(fd_);
51       fd_ = -1;
52     }
53   }
54   int fd_ = -1;
55 };
56 
Event()57 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()58 Reactor::Event::~Event() { delete pimpl_; }
59 
Read()60 bool Reactor::Event::Read() {
61   uint64_t val = 0;
62   return eventfd_read(pimpl_->fd_, &val) == 0;
63 }
Id() const64 int Reactor::Event::Id() const { return pimpl_->fd_; }
Clear()65 void Reactor::Event::Clear() {
66   uint64_t val;
67   while (eventfd_read(pimpl_->fd_, &val) == 0) {
68   }
69 }
Close()70 void Reactor::Event::Close() {
71   int close_status;
72   RUN_NO_INTR(close_status = close(pimpl_->fd_));
73   log::assert_that(close_status != -1, "assert failed: close_status != -1");
74   pimpl_->fd_ = -1;
75 }
Notify()76 void Reactor::Event::Notify() {
77   uint64_t val = 1;
78   auto write_result = eventfd_write(pimpl_->fd_, val);
79   log::assert_that(write_result != -1, "assert failed: write_result != -1");
80 }
81 
82 class Reactor::Reactable {
83 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)84   Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
85       : fd_(fd),
86         on_read_ready_(std::move(on_read_ready)),
87         on_write_ready_(std::move(on_write_ready)),
88         is_executing_(false),
89         removed_(false) {}
90   const int fd_;
91   Closure on_read_ready_;
92   Closure on_write_ready_;
93   bool is_executing_;
94   bool removed_;
95   std::mutex mutex_;
96   std::unique_ptr<std::promise<void>> finished_promise_;
97 };
98 
Reactor()99 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
100   RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
101   log::assert_that(epoll_fd_ != -1, "could not create epoll fd: {}", strerror(errno));
102 
103   control_fd_ = eventfd(0, EFD_NONBLOCK);
104   log::assert_that(control_fd_ != -1, "assert failed: control_fd_ != -1");
105 
106   epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
107   int result;
108   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
109   log::assert_that(result != -1, "assert failed: result != -1");
110 }
111 
~Reactor()112 Reactor::~Reactor() {
113   int result;
114   RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
115   log::assert_that(result != -1, "assert failed: result != -1");
116 
117   RUN_NO_INTR(result = close(control_fd_));
118   log::assert_that(result != -1, "assert failed: result != -1");
119 
120   RUN_NO_INTR(result = close(epoll_fd_));
121   log::assert_that(result != -1, "assert failed: result != -1");
122 }
123 
Run()124 void Reactor::Run() {
125   bool already_running = is_running_.exchange(true);
126   log::assert_that(!already_running, "assert failed: !already_running");
127 
128   int timeout_ms = -1;
129   bool waiting_for_idle = false;
130   for (;;) {
131     {
132       std::unique_lock<std::mutex> lock(mutex_);
133       invalidation_list_.clear();
134     }
135     epoll_event events[kEpollMaxEvents];
136     int count;
137     RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
138     log::assert_that(count != -1, "epoll_wait failed: fd={}, err={}", epoll_fd_, strerror(errno));
139     if (waiting_for_idle && count == 0) {
140       timeout_ms = -1;
141       waiting_for_idle = false;
142       std::scoped_lock<std::mutex> lock(mutex_);
143       idle_promise_->set_value();
144       idle_promise_ = nullptr;
145     }
146 
147     for (int i = 0; i < count; ++i) {
148       auto event = events[i];
149       log::assert_that(event.events != 0u, "assert failed: event.events != 0u");
150 
151       // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
152       if (event.data.ptr == nullptr) {
153         uint64_t value;
154         eventfd_read(control_fd_, &value);
155         if ((value & kStopReactor) != 0) {
156           is_running_ = false;
157           return;
158         } else if ((value & kWaitForIdle) != 0) {
159           timeout_ms = 30;
160           waiting_for_idle = true;
161           continue;
162         } else {
163           log::error("Unknown control_fd value {:x}", value);
164           continue;
165         }
166       }
167       auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
168       std::unique_lock<std::mutex> lock(mutex_);
169       executing_reactable_finished_ = nullptr;
170       // See if this reactable has been removed in the meantime.
171       if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) !=
172           invalidation_list_.end()) {
173         continue;
174       }
175 
176       {
177         std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
178         lock.unlock();
179         reactable->is_executing_ = true;
180       }
181       if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) &&
182           !reactable->on_read_ready_.is_null()) {
183         reactable->on_read_ready_.Run();
184       }
185       if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
186         reactable->on_write_ready_.Run();
187       }
188       {
189         std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
190         reactable->is_executing_ = false;
191         if (reactable->removed_) {
192           reactable->finished_promise_->set_value();
193           reactable_lock.unlock();
194           delete reactable;
195         }
196       }
197     }
198   }
199 }
200 
Stop()201 void Reactor::Stop() {
202   if (!is_running_) {
203     log::warn("not running, will stop once it's started");
204   }
205   auto control = eventfd_write(control_fd_, kStopReactor);
206   log::assert_that(control != -1, "assert failed: control != -1");
207 }
208 
NewEvent() const209 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
210   return std::make_unique<Reactor::Event>();
211 }
212 
Register(int fd,Closure on_read_ready,Closure on_write_ready)213 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
214   uint32_t poll_event_type = 0;
215   if (!on_read_ready.is_null()) {
216     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
217   }
218   if (!on_write_ready.is_null()) {
219     poll_event_type |= EPOLLOUT;
220   }
221   auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
222   epoll_event event = {
223           .events = poll_event_type,
224           .data = {.ptr = reactable},
225   };
226   int register_fd;
227   RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
228   log::assert_that(register_fd != -1, "assert failed: register_fd != -1");
229   return reactable;
230 }
231 
Unregister(Reactor::Reactable * reactable)232 void Reactor::Unregister(Reactor::Reactable* reactable) {
233   log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
234   {
235     std::lock_guard<std::mutex> lock(mutex_);
236     invalidation_list_.push_back(reactable);
237   }
238   bool delaying_delete_until_callback_finished = false;
239   {
240     int result;
241     std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
242     RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
243     if (result == -1 && errno == ENOENT) {
244       log::info("reactable is invalid or unregistered");
245     } else {
246       log::assert_that(result != -1, "could not unregister epoll fd: {}", strerror(errno));
247     }
248 
249     // If we are unregistering during the callback event from this reactable, we delete it after the
250     // callback is executed. reactable->is_executing_ is protected by reactable->mutex_, so it's
251     // thread safe.
252     if (reactable->is_executing_) {
253       reactable->removed_ = true;
254       reactable->finished_promise_ = std::make_unique<std::promise<void>>();
255       executing_reactable_finished_ =
256               std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
257       delaying_delete_until_callback_finished = true;
258     }
259   }
260   // If we are unregistering outside of the callback event from this reactable, we delete it now
261   if (!delaying_delete_until_callback_finished) {
262     delete reactable;
263   }
264 }
265 
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)266 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
267   std::lock_guard<std::mutex> lock(mutex_);
268   if (executing_reactable_finished_ == nullptr) {
269     return true;
270   }
271   auto stop_status = executing_reactable_finished_->wait_for(timeout);
272   if (stop_status != std::future_status::ready) {
273     log::error("Unregister reactable timed out");
274   }
275   return stop_status == std::future_status::ready;
276 }
277 
WaitForIdle(std::chrono::milliseconds timeout)278 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
279   auto promise = std::make_shared<std::promise<void>>();
280   auto future = std::make_unique<std::future<void>>(promise->get_future());
281   {
282     std::lock_guard<std::mutex> lock(mutex_);
283     idle_promise_ = promise;
284   }
285 
286   auto control = eventfd_write(control_fd_, kWaitForIdle);
287   log::assert_that(control != -1, "assert failed: control != -1");
288 
289   auto idle_status = future->wait_for(timeout);
290   return idle_status == std::future_status::ready;
291 }
292 
ModifyRegistration(Reactor::Reactable * reactable,ReactOn react_on)293 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) {
294   log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
295 
296   uint32_t poll_event_type = 0;
297   if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) {
298     poll_event_type |= (EPOLLIN | EPOLLRDHUP);
299   }
300   if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) {
301     poll_event_type |= EPOLLOUT;
302   }
303   epoll_event event = {
304           .events = poll_event_type,
305           .data = {.ptr = reactable},
306   };
307   int modify_fd;
308   RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
309   log::assert_that(modify_fd != -1, "assert failed: modify_fd != -1");
310 }
311 
312 }  // namespace os
313 }  // namespace bluetooth
314