1 /******************************************************************************
2 *
3 * Copyright 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #define LOG_TAG "bt_osi_reactor"
20
21 #include "osi/include/reactor.h"
22
23 #include <bluetooth/log.h>
24 #include <pthread.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/epoll.h>
28 #include <sys/eventfd.h>
29 #include <unistd.h>
30
31 #include <mutex>
32
33 #include "osi/include/allocator.h"
34 #include "osi/include/list.h"
35
36 #if !defined(EFD_SEMAPHORE)
37 #define EFD_SEMAPHORE (1 << 0)
38 #endif
39
40 using namespace bluetooth;
41
42 struct reactor_t {
43 int epoll_fd;
44 int event_fd;
45 std::mutex* list_mutex;
46 list_t* invalidation_list; // reactor objects that have been unregistered.
47 pthread_t run_thread; // the pthread on which reactor_run is executing.
48 bool is_running; // indicates whether |run_thread| is valid.
49 bool object_removed;
50 };
51
52 struct reactor_object_t {
53 int fd; // the file descriptor to monitor for events.
54 void* context; // a context that's passed back to the *_ready functions.
55 reactor_t* reactor; // the reactor instance this object is registered with.
56 std::mutex* mutex; // protects the lifetime of this object and all variables.
57
58 void (*read_ready)(void* context); // function to call when the file
59 // descriptor becomes readable.
60 void (*write_ready)(void* context); // function to call when the file
61 // descriptor becomes writeable.
62 };
63
64 static reactor_status_t run_reactor(reactor_t* reactor, int iterations);
65
66 static const size_t MAX_EVENTS = 64;
67 static const eventfd_t EVENT_REACTOR_STOP = 1;
68
reactor_new(void)69 reactor_t* reactor_new(void) {
70 reactor_t* ret = (reactor_t*)osi_calloc(sizeof(reactor_t));
71
72 ret->epoll_fd = INVALID_FD;
73 ret->event_fd = INVALID_FD;
74
75 ret->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
76 if (ret->epoll_fd == INVALID_FD) {
77 log::error("unable to create epoll instance: {}", strerror(errno));
78 goto error;
79 }
80
81 ret->event_fd = eventfd(0, 0);
82 if (ret->event_fd == INVALID_FD) {
83 log::error("unable to create eventfd: {}", strerror(errno));
84 goto error;
85 }
86
87 ret->list_mutex = new std::mutex;
88 ret->invalidation_list = list_new(NULL);
89 if (!ret->invalidation_list) {
90 log::error("unable to allocate object invalidation list.");
91 goto error;
92 }
93
94 struct epoll_event event;
95 memset(&event, 0, sizeof(event));
96 event.events = EPOLLIN;
97 event.data.ptr = NULL;
98 if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {
99 log::error("unable to register eventfd with epoll set: {}", strerror(errno));
100 goto error;
101 }
102
103 return ret;
104
105 error:;
106 reactor_free(ret);
107 return NULL;
108 }
109
reactor_free(reactor_t * reactor)110 void reactor_free(reactor_t* reactor) {
111 if (!reactor) {
112 return;
113 }
114
115 list_free(reactor->invalidation_list);
116 close(reactor->event_fd);
117 close(reactor->epoll_fd);
118 delete reactor->list_mutex;
119 osi_free(reactor);
120 }
121
reactor_start(reactor_t * reactor)122 reactor_status_t reactor_start(reactor_t* reactor) {
123 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
124 return run_reactor(reactor, 0);
125 }
126
reactor_run_once(reactor_t * reactor)127 reactor_status_t reactor_run_once(reactor_t* reactor) {
128 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
129 return run_reactor(reactor, 1);
130 }
131
reactor_stop(reactor_t * reactor)132 void reactor_stop(reactor_t* reactor) {
133 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
134
135 eventfd_write(reactor->event_fd, EVENT_REACTOR_STOP);
136 }
137
reactor_register(reactor_t * reactor,int fd,void * context,void (* read_ready)(void * context),void (* write_ready)(void * context))138 reactor_object_t* reactor_register(reactor_t* reactor, int fd, void* context,
139 void (*read_ready)(void* context),
140 void (*write_ready)(void* context)) {
141 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
142 log::assert_that(fd != INVALID_FD, "assert failed: fd != INVALID_FD");
143
144 reactor_object_t* object = (reactor_object_t*)osi_calloc(sizeof(reactor_object_t));
145
146 object->reactor = reactor;
147 object->fd = fd;
148 object->context = context;
149 object->read_ready = read_ready;
150 object->write_ready = write_ready;
151 object->mutex = new std::mutex;
152
153 struct epoll_event event;
154 memset(&event, 0, sizeof(event));
155 if (read_ready) {
156 event.events |= (EPOLLIN | EPOLLRDHUP);
157 }
158 if (write_ready) {
159 event.events |= EPOLLOUT;
160 }
161 event.data.ptr = object;
162
163 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
164 log::error("unable to register fd {} to epoll set: {}", fd, strerror(errno));
165 delete object->mutex;
166 osi_free(object);
167 return NULL;
168 }
169
170 return object;
171 }
172
reactor_change_registration(reactor_object_t * object,void (* read_ready)(void * context),void (* write_ready)(void * context))173 bool reactor_change_registration(reactor_object_t* object, void (*read_ready)(void* context),
174 void (*write_ready)(void* context)) {
175 log::assert_that(object != NULL, "assert failed: object != NULL");
176
177 struct epoll_event event;
178 memset(&event, 0, sizeof(event));
179 if (read_ready) {
180 event.events |= (EPOLLIN | EPOLLRDHUP);
181 }
182 if (write_ready) {
183 event.events |= EPOLLOUT;
184 }
185 event.data.ptr = object;
186
187 if (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {
188 log::error("unable to modify interest set for fd {}: {}", object->fd, strerror(errno));
189 return false;
190 }
191
192 std::lock_guard<std::mutex> lock(*object->mutex);
193 object->read_ready = read_ready;
194 object->write_ready = write_ready;
195
196 return true;
197 }
198
reactor_unregister(reactor_object_t * obj)199 void reactor_unregister(reactor_object_t* obj) {
200 log::assert_that(obj != NULL, "assert failed: obj != NULL");
201
202 reactor_t* reactor = obj->reactor;
203
204 if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_DEL, obj->fd, NULL) == -1) {
205 log::error("unable to unregister fd {} from epoll set: {}", obj->fd, strerror(errno));
206 }
207
208 if (reactor->is_running && pthread_equal(pthread_self(), reactor->run_thread)) {
209 reactor->object_removed = true;
210 return;
211 }
212
213 {
214 std::unique_lock<std::mutex> lock(*reactor->list_mutex);
215 list_append(reactor->invalidation_list, obj);
216 }
217
218 // Taking the object lock here makes sure a callback for |obj| isn't
219 // currently executing. The reactor thread must then either be before
220 // the callbacks or after. If after, we know that the object won't be
221 // referenced because it has been taken out of the epoll set. If before,
222 // it won't be referenced because the reactor thread will check the
223 // invalidation_list and find it in there. So by taking this lock, we
224 // are waiting until the reactor thread drops all references to |obj|.
225 // One the wait completes, we can unlock and destroy |obj| safely.
226 obj->mutex->lock();
227 obj->mutex->unlock();
228 delete obj->mutex;
229 osi_free(obj);
230 }
231
232 // Runs the reactor loop for a maximum of |iterations|.
233 // 0 |iterations| means loop forever.
234 // |reactor| may not be NULL.
run_reactor(reactor_t * reactor,int iterations)235 static reactor_status_t run_reactor(reactor_t* reactor, int iterations) {
236 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
237
238 reactor->run_thread = pthread_self();
239 reactor->is_running = true;
240
241 struct epoll_event events[MAX_EVENTS];
242 for (int i = 0; iterations == 0 || i < iterations; ++i) {
243 {
244 std::lock_guard<std::mutex> lock(*reactor->list_mutex);
245 list_clear(reactor->invalidation_list);
246 }
247
248 int ret;
249 OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));
250 if (ret == -1) {
251 log::error("error in epoll_wait: {}", strerror(errno));
252 reactor->is_running = false;
253 return REACTOR_STATUS_ERROR;
254 }
255
256 for (int j = 0; j < ret; ++j) {
257 // The event file descriptor is the only one that registers with
258 // a NULL data pointer. We use the NULL to identify it and break
259 // out of the reactor loop.
260 if (events[j].data.ptr == NULL) {
261 eventfd_t value;
262 eventfd_read(reactor->event_fd, &value);
263 reactor->is_running = false;
264 return REACTOR_STATUS_STOP;
265 }
266
267 reactor_object_t* object = (reactor_object_t*)events[j].data.ptr;
268
269 std::unique_lock<std::mutex> lock(*reactor->list_mutex);
270 if (list_contains(reactor->invalidation_list, object)) {
271 continue;
272 }
273
274 // Downgrade the list lock to an object lock.
275 {
276 std::lock_guard<std::mutex> obj_lock(*object->mutex);
277 lock.unlock();
278
279 reactor->object_removed = false;
280 if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready) {
281 object->read_ready(object->context);
282 }
283 if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready) {
284 object->write_ready(object->context);
285 }
286 }
287
288 if (reactor->object_removed) {
289 delete object->mutex;
290 osi_free(object);
291 }
292 }
293 }
294
295 reactor->is_running = false;
296 return REACTOR_STATUS_DONE;
297 }
298