1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "async_manager.h"
18 
19 #include <fcntl.h>
20 #include <sys/select.h>
21 #include <unistd.h>
22 
23 #include <algorithm>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 
30 #include "log.h"
31 
#ifndef TEMP_FAILURE_RETRY
// Evaluates |exp| again and again for as long as it yields -1 with errno set
// to EINTR, and produces the value of the last evaluation. Only defined here
// when the platform headers did not already provide it.
#define TEMP_FAILURE_RETRY(exp)          \
  ({                                     \
    __typeof__(exp) _rc;                 \
    for (;;) {                           \
      _rc = (exp);                       \
      if (_rc != -1 || errno != EINTR) { \
        break;                           \
      }                                  \
    }                                    \
    _rc;                                 \
  })
#endif  // TEMP_FAILURE_RETRY
43 
44 namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
49 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched which is
// used to notify the thread of internal changes on the object state (like
// the addition of new FDs to watch). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
63 
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances so it is safe to use several objects of
// this class. Also, nothing interesting happens upon construction, but only
// after a Task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is
// scheduled a thread is started which monitors a queue of tasks. The queue is
// peeked to see when the next task should be carried out and then the thread
// performs an (absolute) timed wait on a condition variable. The wait ends
// because of a time out or a notify on the cond var: the former means a task
// is due for execution while the latter means there has been a change in
// internal state, like a task has been scheduled/canceled or the flag to stop
// has been set. Setting and querying the stop flag or modifying the task
// queue and the subsequent notification on the cond var is done atomically
// (i.e. while holding the lock on the internal mutex) to ensure that the
// thread never misses the notification, since notifying a cond var is not
// persistent the way writing to a pipe is (if not done this way, the thread
// could query the stopping flag and be put aside by the OS scheduler right
// after, then the 'stop thread' procedure could run, setting the flag,
// notifying a cond var that no one is waiting on and joining the thread; the
// thread then resumes execution believing that it needs to continue and waits
// on the cond var, possibly forever if there are no tasks scheduled,
// effectively causing a deadlock).
87 
// Largest task id that can be handed out: 2^16 - 1. Permissible ids are
// {1..2^16-1}, so this number is also the maximum number of scheduled tasks
// that can be handled at a given time.
static const uint16_t kMaxTaskId = 0xFFFF;

// Returns the id that follows |id|, wrapping around from kMaxTaskId to 1.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
// Size of the scratch buffer used to drain the notification pipe. A single
// notification is one byte and, although the thread may occasionally be
// notified more than once before it wakes up, that is unlikely, so 10 bytes
// is plenty. The reader still loops while a read fills the whole buffer, in
// case more notifications piled up. The thread does not care how many
// notifications were pending, it only needs to empty the pipe. If a read is
// interrupted before everything available was consumed, the next iteration of
// the thread loop comes straight back to the draining code, so that case
// needs no special treatment.
static constexpr int kNotificationBufferSize = 10;
105 
106 // Async File Descriptor Watcher Implementation:
107 class AsyncManager::AsyncFdWatcher {
108 public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)109   int WatchFdForNonBlockingReads(int file_descriptor,
110                                  const ReadCallback& on_read_fd_ready_callback) {
111     // add file descriptor and callback
112     {
113       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
114       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
115     }
116 
117     // start the thread if not started yet
118     int started = tryStartThread();
119     if (started != 0) {
120       ERROR("{}: Unable to start thread", __func__);
121       return started;
122     }
123 
124     // notify the thread so that it knows of the new FD
125     notifyThread();
126 
127     return 0;
128   }
129 
StopWatchingFileDescriptor(int file_descriptor)130   void StopWatchingFileDescriptor(int file_descriptor) {
131     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
132     watched_shared_fds_.erase(file_descriptor);
133   }
134 
135   AsyncFdWatcher() = default;
136   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
137   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
138 
139   ~AsyncFdWatcher() = default;
140 
stopThread()141   int stopThread() {
142     if (!std::atomic_exchange(&running_, false)) {
143       return 0;  // if not running already
144     }
145 
146     notifyThread();
147 
148     if (std::this_thread::get_id() != thread_.get_id()) {
149       thread_.join();
150     } else {
151       WARNING("{}: Starting thread stop from inside the reading thread itself", __func__);
152     }
153 
154     {
155       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
156       watched_shared_fds_.clear();
157     }
158 
159     return 0;
160   }
161 
162 private:
163   // Make sure to call this with at least one file descriptor ready to be
164   // watched upon or the thread routine will return immediately
tryStartThread()165   int tryStartThread() {
166     if (std::atomic_exchange(&running_, true)) {
167       return 0;  // if already running
168     }
169     // set up the communication channel
170     int pipe_fds[2];
171     if (pipe(pipe_fds)) {
172       ERROR("{}: Unable to establish a communication channel to the reading "
173             "thread",
174             __func__);
175       return -1;
176     }
177     // configure the fds as non blocking.
178     if (fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK) || fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK)) {
179       ERROR("{}: Unable to configure the communication channel to the reading "
180             "thread",
181             __func__);
182       return -1;
183     }
184 
185     notification_listen_fd_ = pipe_fds[0];
186     notification_write_fd_ = pipe_fds[1];
187 
188     thread_ = std::thread([this]() { ThreadRoutine(); });
189     if (!thread_.joinable()) {
190       ERROR("{}: Unable to start reading thread", __func__);
191       return -1;
192     }
193     return 0;
194   }
195 
notifyThread() const196   int notifyThread() const {
197     char buffer = '0';
198     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
199       ERROR("{}: Unable to send message to reading thread", __func__);
200       return -1;
201     }
202     return 0;
203   }
204 
setUpFileDescriptorSet(fd_set & read_fds)205   int setUpFileDescriptorSet(fd_set& read_fds) {
206     // add comm channel to the set
207     FD_SET(notification_listen_fd_, &read_fds);
208     int nfds = notification_listen_fd_;
209 
210     // add watched FDs to the set
211     {
212       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
213       for (auto& fdp : watched_shared_fds_) {
214         FD_SET(fdp.first, &read_fds);
215         nfds = std::max(fdp.first, nfds);
216       }
217     }
218     return nfds;
219   }
220 
221   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds) const222   bool consumeThreadNotifications(fd_set& read_fds) const {
223     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
224       char buffer[kNotificationBufferSize];
225       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer, kNotificationBufferSize)) ==
226              kNotificationBufferSize) {
227       }
228       return true;
229     }
230     return false;
231   }
232 
233   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)234   void runAppropriateCallbacks(fd_set& read_fds) {
235     std::vector<decltype(watched_shared_fds_)::value_type> fds;
236     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
237     for (auto& fdc : watched_shared_fds_) {
238       if (FD_ISSET(fdc.first, &read_fds)) {
239         fds.push_back(fdc);
240       }
241     }
242     for (auto& p : fds) {
243       p.second(p.first);
244     }
245   }
246 
ThreadRoutine()247   void ThreadRoutine() {
248     while (running_) {
249       fd_set read_fds;
250       FD_ZERO(&read_fds);
251       int nfds = setUpFileDescriptorSet(read_fds);
252 
253       // wait until there is data available to read on some FD
254       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
255       if (retval <= 0) {  // there was some error or a timeout
256         ERROR("{}: There was an error while waiting for data on the file "
257               "descriptors: {}",
258               __func__, strerror(errno));
259         continue;
260       }
261 
262       consumeThreadNotifications(read_fds);
263 
264       // Do not read if there was a call to stop running
265       if (!running_) {
266         break;
267       }
268 
269       runAppropriateCallbacks(read_fds);
270     }
271   }
272 
273   std::atomic_bool running_{false};
274   std::thread thread_;
275   std::recursive_mutex internal_mutex_;
276 
277   std::map<int, ReadCallback> watched_shared_fds_;
278 
279   // A pair of FD to send information to the reading thread
280   int notification_listen_fd_{};
281   int notification_write_fd_{};
282 };
283 
284 // Async task manager implementation
285 class AsyncManager::AsyncTaskManager {
286 public:
GetNextUserId()287   AsyncUserId GetNextUserId() { return lastUserId_++; }
288 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)289   AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
290                         const TaskCallback& callback) {
291     return scheduleTask(
292             std::make_shared<Task>(std::chrono::steady_clock::now() + delay, callback, user_id));
293   }
294 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)295   AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id, std::chrono::milliseconds delay,
296                                     std::chrono::milliseconds period,
297                                     const TaskCallback& callback) {
298     return scheduleTask(std::make_shared<Task>(std::chrono::steady_clock::now() + delay, period,
299                                                callback, user_id));
300   }
301 
CancelAsyncTask(AsyncTaskId async_task_id)302   bool CancelAsyncTask(AsyncTaskId async_task_id) {
303     // remove task from queue (and task id association) while holding lock
304     std::unique_lock<std::mutex> guard(internal_mutex_);
305     return cancel_task_with_lock_held(async_task_id);
306   }
307 
CancelAsyncTasksFromUser(AsyncUserId user_id)308   bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
309     // remove task from queue (and task id association) while holding lock
310     std::unique_lock<std::mutex> guard(internal_mutex_);
311     if (tasks_by_user_id_.count(user_id) == 0) {
312       return false;
313     }
314     for (auto task : tasks_by_user_id_[user_id]) {
315       cancel_task_with_lock_held(task);
316     }
317     tasks_by_user_id_.erase(user_id);
318     return true;
319   }
320 
Synchronize(const CriticalCallback & critical)321   void Synchronize(const CriticalCallback& critical) {
322     std::unique_lock<std::mutex> guard(synchronization_mutex_);
323     critical();
324   }
325 
326   AsyncTaskManager() = default;
327   AsyncTaskManager(const AsyncTaskManager&) = delete;
328   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
329 
330   ~AsyncTaskManager() = default;
331 
stopThread()332   int stopThread() {
333     {
334       std::unique_lock<std::mutex> guard(internal_mutex_);
335       tasks_by_id_.clear();
336       task_queue_.clear();
337       if (!running_) {
338         return 0;
339       }
340       running_ = false;
341       // notify the thread
342       internal_cond_var_.notify_one();
343     }  // release the lock before joining a thread that is likely waiting for it
344     if (std::this_thread::get_id() != thread_.get_id()) {
345       thread_.join();
346     } else {
347       WARNING("{}: Starting thread stop from inside the task thread itself", __func__);
348     }
349     return 0;
350   }
351 
352 private:
353   // Holds the data for each task
354   class Task {
355   public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)356     Task(std::chrono::steady_clock::time_point time, std::chrono::milliseconds period,
357          const TaskCallback& callback, AsyncUserId user)
358         : time(time),
359           periodic(true),
360           period(period),
361           callback(callback),
362           task_id(kInvalidTaskId),
363           user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)364     Task(std::chrono::steady_clock::time_point time, const TaskCallback& callback, AsyncUserId user)
365         : time(time), periodic(false), callback(callback), task_id(kInvalidTaskId), user_id(user) {}
366 
367     // Operators needed to be in a collection
operator <(const Task & another) const368     bool operator<(const Task& another) const {
369       return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id);
370     }
371 
isPeriodic() const372     bool isPeriodic() const { return periodic; }
373 
374     // These fields should no longer be public if the class ever becomes
375     // public or gets more complex
376     std::chrono::steady_clock::time_point time;
377     bool periodic;
378     std::chrono::milliseconds period{};
379     std::mutex in_callback;  // Taken when the callback is active
380     TaskCallback callback;
381     AsyncTaskId task_id;
382     AsyncUserId user_id;
383   };
384 
385   // A comparator class to put shared pointers to tasks in an ordered set
386   struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator387     bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const {
388       return *t1 < *t2;
389     }
390   };
391 
cancel_task_with_lock_held(AsyncTaskId async_task_id)392   bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
393     if (tasks_by_id_.count(async_task_id) == 0) {
394       return false;
395     }
396 
397     // Now make sure we are not running this task.
398     // 2 cases:
399     // - This is called from thread_, this means a running
400     //   scheduled task is actually unregistering. All bets are off.
401     // - Another thread is calling us, let's make sure the task is not active.
402     if (thread_.get_id() != std::this_thread::get_id()) {
403       auto task = tasks_by_id_[async_task_id];
404       const std::lock_guard<std::mutex> lock(task->in_callback);
405       task_queue_.erase(task);
406       tasks_by_id_.erase(async_task_id);
407     } else {
408       task_queue_.erase(tasks_by_id_[async_task_id]);
409       tasks_by_id_.erase(async_task_id);
410     }
411 
412     return true;
413   }
414 
scheduleTask(const std::shared_ptr<Task> & task)415   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
416     {
417       std::unique_lock<std::mutex> guard(internal_mutex_);
418       // no more room for new tasks, we need a larger type for IDs
419       if (tasks_by_id_.size() == kMaxTaskId) {  // TODO potentially type unsafe
420         return kInvalidTaskId;
421       }
422       do {
423         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
424       } while (isTaskIdInUse(lastTaskId_));
425       task->task_id = lastTaskId_;
426       // add task to the queue and map
427       tasks_by_id_[lastTaskId_] = task;
428       tasks_by_user_id_[task->user_id].insert(task->task_id);
429       task_queue_.insert(task);
430     }
431     // start thread if necessary
432     int started = tryStartThread();
433     if (started != 0) {
434       ERROR("{}: Unable to start thread", __func__);
435       return kInvalidTaskId;
436     }
437     // notify the thread so that it knows of the new task
438     internal_cond_var_.notify_one();
439     // return task id
440     return task->task_id;
441   }
442 
isTaskIdInUse(const AsyncTaskId & task_id) const443   bool isTaskIdInUse(const AsyncTaskId& task_id) const { return tasks_by_id_.count(task_id) != 0; }
444 
tryStartThread()445   int tryStartThread() {
446     // need the lock because of the running flag and the cond var
447     std::unique_lock<std::mutex> guard(internal_mutex_);
448     // check that the thread is not yet running
449     if (running_) {
450       return 0;
451     }
452     // start the thread
453     running_ = true;
454     thread_ = std::thread([this]() { ThreadRoutine(); });
455     if (!thread_.joinable()) {
456       ERROR("{}: Unable to start task thread", __func__);
457       return -1;
458     }
459     return 0;
460   }
461 
ThreadRoutine()462   void ThreadRoutine() {
463     while (running_) {
464       TaskCallback callback;
465       std::shared_ptr<Task> task_p;
466       bool run_it = false;
467       {
468         std::unique_lock<std::mutex> guard(internal_mutex_);
469         if (!task_queue_.empty()) {
470           task_p = *(task_queue_.begin());
471           if (task_p->time < std::chrono::steady_clock::now()) {
472             run_it = true;
473             callback = task_p->callback;
474             task_queue_.erase(task_p);  // need to remove and add again if
475                                         // periodic to update order
476             if (task_p->isPeriodic()) {
477               task_p->time += task_p->period;
478               task_queue_.insert(task_p);
479             } else {
480               tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
481               tasks_by_id_.erase(task_p->task_id);
482             }
483           }
484         }
485       }
486       if (run_it) {
487         const std::lock_guard<std::mutex> lock(task_p->in_callback);
488         Synchronize(callback);
489       }
490       {
491         std::unique_lock<std::mutex> guard(internal_mutex_);
492         // check for termination right before waiting
493         if (!running_) {
494           break;
495         }
496         // wait until time for the next task (if any)
497         if (!task_queue_.empty()) {
498           // Make a copy of the time_point because wait_until takes a reference
499           // to it and may read it after waiting, by which time the task may
500           // have been freed (e.g. via CancelAsyncTask).
501           std::chrono::steady_clock::time_point time = (*task_queue_.begin())->time;
502           internal_cond_var_.wait_until(guard, time);
503         } else {
504           internal_cond_var_.wait(guard);
505         }
506       }
507     }
508   }
509 
510   bool running_ = false;
511   std::thread thread_;
512   std::mutex internal_mutex_;
513   std::mutex synchronization_mutex_;
514   std::condition_variable internal_cond_var_;
515 
516   AsyncTaskId lastTaskId_ = kInvalidTaskId;
517   AsyncUserId lastUserId_{1};
518   std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
519   std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
520   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
521 };
522 
523 // Async Manager Implementation:
AsyncManager()524 AsyncManager::AsyncManager()
525     : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {}
526 
~AsyncManager()527 AsyncManager::~AsyncManager() {
528   // Make sure the threads are stopped before destroying the object.
529   // The threads need to be stopped here and not in each internal class'
530   // destructor because unique_ptr's reset() first assigns nullptr to the
531   // pointer and only then calls the destructor, so any callback running
532   // on these threads would dereference a null pointer if they called a member
533   // function of this class.
534   fdWatcher_p_->stopThread();
535   taskManager_p_->stopThread();
536 }
537 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)538 int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor,
539                                              const ReadCallback& on_read_fd_ready_callback) {
540   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback);
541 }
542 
StopWatchingFileDescriptor(int file_descriptor)543 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
544   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
545 }
546 
GetNextUserId()547 AsyncUserId AsyncManager::GetNextUserId() { return taskManager_p_->GetNextUserId(); }
548 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)549 AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
550                                     const TaskCallback& callback) {
551   return taskManager_p_->ExecAsync(user_id, delay, callback);
552 }
553 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)554 AsyncTaskId AsyncManager::ExecAsyncPeriodically(AsyncUserId user_id,
555                                                 std::chrono::milliseconds delay,
556                                                 std::chrono::milliseconds period,
557                                                 const TaskCallback& callback) {
558   return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period, callback);
559 }
560 
CancelAsyncTask(AsyncTaskId async_task_id)561 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
562   return taskManager_p_->CancelAsyncTask(async_task_id);
563 }
564 
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)565 bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
566   return taskManager_p_->CancelAsyncTasksFromUser(user_id);
567 }
568 
Synchronize(const CriticalCallback & critical)569 void AsyncManager::Synchronize(const CriticalCallback& critical) {
570   taskManager_p_->Synchronize(critical);
571 }
572 }  // namespace rootcanal
573