xref: /aosp_15_r20/tools/netsim/src/hci/async_manager.cc (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1*cf78ab8cSAndroid Build Coastguard Worker /*
2*cf78ab8cSAndroid Build Coastguard Worker  * Copyright 2016 The Android Open Source Project
3*cf78ab8cSAndroid Build Coastguard Worker  *
4*cf78ab8cSAndroid Build Coastguard Worker  * Licensed under the Apache License, Version 2.0 (the "License");
5*cf78ab8cSAndroid Build Coastguard Worker  * you may not use this file except in compliance with the License.
6*cf78ab8cSAndroid Build Coastguard Worker  * You may obtain a copy of the License at
7*cf78ab8cSAndroid Build Coastguard Worker  *
8*cf78ab8cSAndroid Build Coastguard Worker  *      http://www.apache.org/licenses/LICENSE-2.0
9*cf78ab8cSAndroid Build Coastguard Worker  *
10*cf78ab8cSAndroid Build Coastguard Worker  * Unless required by applicable law or agreed to in writing, software
11*cf78ab8cSAndroid Build Coastguard Worker  * distributed under the License is distributed on an "AS IS" BASIS,
12*cf78ab8cSAndroid Build Coastguard Worker  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*cf78ab8cSAndroid Build Coastguard Worker  * See the License for the specific language governing permissions and
14*cf78ab8cSAndroid Build Coastguard Worker  * limitations under the License.
15*cf78ab8cSAndroid Build Coastguard Worker  */
16*cf78ab8cSAndroid Build Coastguard Worker #include "model/setup/async_manager.h"  // for AsyncManager
17*cf78ab8cSAndroid Build Coastguard Worker 
18*cf78ab8cSAndroid Build Coastguard Worker #include <errno.h>  // for errno
19*cf78ab8cSAndroid Build Coastguard Worker 
20*cf78ab8cSAndroid Build Coastguard Worker #include <atomic>              // for atomic_bool, atomic_e...
21*cf78ab8cSAndroid Build Coastguard Worker #include <condition_variable>  // for condition_variable
22*cf78ab8cSAndroid Build Coastguard Worker #include <cstring>             // for strerror
23*cf78ab8cSAndroid Build Coastguard Worker #include <limits>              // for numeric_limits
24*cf78ab8cSAndroid Build Coastguard Worker #include <map>                 // for map<>::value_type, map
25*cf78ab8cSAndroid Build Coastguard Worker #include <mutex>               // for unique_lock, mutex
26*cf78ab8cSAndroid Build Coastguard Worker #include <ratio>               // for ratio
27*cf78ab8cSAndroid Build Coastguard Worker #include <set>                 // for set
28*cf78ab8cSAndroid Build Coastguard Worker #include <thread>              // for thread
29*cf78ab8cSAndroid Build Coastguard Worker #include <type_traits>         // for remove_extent_t
30*cf78ab8cSAndroid Build Coastguard Worker #include <utility>             // for pair, make_pair, oper...
31*cf78ab8cSAndroid Build Coastguard Worker #include <vector>              // for vector
32*cf78ab8cSAndroid Build Coastguard Worker 
33*cf78ab8cSAndroid Build Coastguard Worker #include "aemu/base/EintrWrapper.h"  // for HANDLE_EINTR
34*cf78ab8cSAndroid Build Coastguard Worker #include "aemu/base/Log.h"           // for LogStreamVoidify, Log...
35*cf78ab8cSAndroid Build Coastguard Worker #include "aemu/base/logging/CLog.h"
36*cf78ab8cSAndroid Build Coastguard Worker #include "aemu/base/sockets/SocketUtils.h"   // for socketRecv, socketSet...
37*cf78ab8cSAndroid Build Coastguard Worker #include "aemu/base/sockets/SocketWaiter.h"  // for SocketWaiter, SocketW...
38*cf78ab8cSAndroid Build Coastguard Worker 
39*cf78ab8cSAndroid Build Coastguard Worker namespace rootcanal {
40*cf78ab8cSAndroid Build Coastguard Worker // Implementation of AsyncManager is divided between two classes, three if
41*cf78ab8cSAndroid Build Coastguard Worker // AsyncManager itself is taken into account, but its only responsability
42*cf78ab8cSAndroid Build Coastguard Worker // besides being a proxy for the other two classes is to provide a global
43*cf78ab8cSAndroid Build Coastguard Worker // synchronization mechanism for callbacks and client code to use.
44*cf78ab8cSAndroid Build Coastguard Worker 
45*cf78ab8cSAndroid Build Coastguard Worker // The watching of file descriptors is done through AsyncFdWatcher. Several
46*cf78ab8cSAndroid Build Coastguard Worker // objects of this class may coexist simultaneosly as they share no state.
47*cf78ab8cSAndroid Build Coastguard Worker // After construction of this objects nothing happens beyond some very simple
48*cf78ab8cSAndroid Build Coastguard Worker // member initialization. When the first FD is set up for watching the object
49*cf78ab8cSAndroid Build Coastguard Worker // starts a new thread which watches the given (and later provided) FDs using
50*cf78ab8cSAndroid Build Coastguard Worker // select() inside a loop. A special FD (a pipe) is also watched which is
51*cf78ab8cSAndroid Build Coastguard Worker // used to notify the thread of internal changes on the object state (like
52*cf78ab8cSAndroid Build Coastguard Worker // the addition of new FDs to watch on). Every access to internal state is
53*cf78ab8cSAndroid Build Coastguard Worker // synchronized using a single internal mutex. The thread is only stopped on
54*cf78ab8cSAndroid Build Coastguard Worker // destruction of the object, by modifying a flag, which is the only member
55*cf78ab8cSAndroid Build Coastguard Worker // variable accessed without acquiring the lock (because the notification to
56*cf78ab8cSAndroid Build Coastguard Worker // the thread is done later by writing to a pipe which means the thread will
57*cf78ab8cSAndroid Build Coastguard Worker // be notified regardless of what phase of the loop it is in that moment)
58*cf78ab8cSAndroid Build Coastguard Worker 
59*cf78ab8cSAndroid Build Coastguard Worker // The scheduling of asynchronous tasks, periodic or not, is handled by the
60*cf78ab8cSAndroid Build Coastguard Worker // AsyncTaskManager class. Like the one for FDs, this class shares no internal
61*cf78ab8cSAndroid Build Coastguard Worker // state between different instances so it is safe to use several objects of
62*cf78ab8cSAndroid Build Coastguard Worker // this class, also nothing interesting happens upon construction, but only
63*cf78ab8cSAndroid Build Coastguard Worker // after a Task has been scheduled and access to internal state is synchronized
64*cf78ab8cSAndroid Build Coastguard Worker // using a single internal mutex. When the first task is scheduled a thread
65*cf78ab8cSAndroid Build Coastguard Worker // is started which monitors a queue of tasks. The queue is peeked to see
66*cf78ab8cSAndroid Build Coastguard Worker // when the next task should be carried out and then the thread performs a
67*cf78ab8cSAndroid Build Coastguard Worker // (absolute) timed wait on a condition variable. The wait ends because of a
68*cf78ab8cSAndroid Build Coastguard Worker // time out or a notify on the cond var, the former means a task is due
69*cf78ab8cSAndroid Build Coastguard Worker // for execution while the later means there has been a change in internal
70*cf78ab8cSAndroid Build Coastguard Worker // state, like a task has been scheduled/canceled or the flag to stop has
71*cf78ab8cSAndroid Build Coastguard Worker // been set. Setting and querying the stop flag or modifying the task queue
72*cf78ab8cSAndroid Build Coastguard Worker // and subsequent notification on the cond var is done atomically (e.g while
73*cf78ab8cSAndroid Build Coastguard Worker // holding the lock on the internal mutex) to ensure that the thread never
74*cf78ab8cSAndroid Build Coastguard Worker // misses the notification, since notifying a cond var is not persistent as
75*cf78ab8cSAndroid Build Coastguard Worker // writing on a pipe (if not done this way, the thread could query the
76*cf78ab8cSAndroid Build Coastguard Worker // stopping flag and be put aside by the OS scheduler right after, then the
77*cf78ab8cSAndroid Build Coastguard Worker // 'stop thread' procedure could run, setting the flag, notifying a cond
78*cf78ab8cSAndroid Build Coastguard Worker // var that no one is waiting on and joining the thread, the thread then
79*cf78ab8cSAndroid Build Coastguard Worker // resumes execution believing that it needs to continue and waits on the
80*cf78ab8cSAndroid Build Coastguard Worker // cond var possibly forever if there are no tasks scheduled, efectively
81*cf78ab8cSAndroid Build Coastguard Worker // causing a deadlock).
82*cf78ab8cSAndroid Build Coastguard Worker 
83*cf78ab8cSAndroid Build Coastguard Worker // This number also states the maximum number of scheduled tasks we can handle
84*cf78ab8cSAndroid Build Coastguard Worker // at a given time
85*cf78ab8cSAndroid Build Coastguard Worker static const uint16_t kMaxTaskId =
86*cf78ab8cSAndroid Build Coastguard Worker     -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/
NextAsyncTaskId(const AsyncTaskId id)87*cf78ab8cSAndroid Build Coastguard Worker static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
88*cf78ab8cSAndroid Build Coastguard Worker   return (id == kMaxTaskId) ? 1 : id + 1;
89*cf78ab8cSAndroid Build Coastguard Worker }
90*cf78ab8cSAndroid Build Coastguard Worker // The buffer is only 10 bytes because the expected number of bytes
91*cf78ab8cSAndroid Build Coastguard Worker // written on this socket is 1. It is possible that the thread is notified
92*cf78ab8cSAndroid Build Coastguard Worker // more than once but highly unlikely, so a buffer of size 10 seems enough
93*cf78ab8cSAndroid Build Coastguard Worker // and the reads are performed inside a while just in case it isn't. From
94*cf78ab8cSAndroid Build Coastguard Worker // the thread routine's point of view it is the same to have been notified
95*cf78ab8cSAndroid Build Coastguard Worker // just once or 100 times so it just tries to consume the entire buffer.
96*cf78ab8cSAndroid Build Coastguard Worker // In the cases where an interrupt would cause read to return without
97*cf78ab8cSAndroid Build Coastguard Worker // having read everything that was available a new iteration of the thread
98*cf78ab8cSAndroid Build Coastguard Worker // loop will bring execution to this point almost immediately, so there is
99*cf78ab8cSAndroid Build Coastguard Worker // no need to treat that case.
100*cf78ab8cSAndroid Build Coastguard Worker static const int kNotificationBufferSize = 10;
101*cf78ab8cSAndroid Build Coastguard Worker 
102*cf78ab8cSAndroid Build Coastguard Worker using android::base::SocketWaiter;
103*cf78ab8cSAndroid Build Coastguard Worker 
104*cf78ab8cSAndroid Build Coastguard Worker // Async File Descriptor Watcher Implementation:
105*cf78ab8cSAndroid Build Coastguard Worker class AsyncManager::AsyncFdWatcher {
106*cf78ab8cSAndroid Build Coastguard Worker  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)107*cf78ab8cSAndroid Build Coastguard Worker   int WatchFdForNonBlockingReads(
108*cf78ab8cSAndroid Build Coastguard Worker       int file_descriptor, const ReadCallback &on_read_fd_ready_callback) {
109*cf78ab8cSAndroid Build Coastguard Worker     // add file descriptor and callback
110*cf78ab8cSAndroid Build Coastguard Worker     {
111*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
112*cf78ab8cSAndroid Build Coastguard Worker       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
113*cf78ab8cSAndroid Build Coastguard Worker     }
114*cf78ab8cSAndroid Build Coastguard Worker 
115*cf78ab8cSAndroid Build Coastguard Worker     // start the thread if not started yet
116*cf78ab8cSAndroid Build Coastguard Worker     int started = tryStartThread();
117*cf78ab8cSAndroid Build Coastguard Worker     if (started != 0) {
118*cf78ab8cSAndroid Build Coastguard Worker       derror("%s: Unable to start thread", __func__);
119*cf78ab8cSAndroid Build Coastguard Worker       return started;
120*cf78ab8cSAndroid Build Coastguard Worker     }
121*cf78ab8cSAndroid Build Coastguard Worker 
122*cf78ab8cSAndroid Build Coastguard Worker     // notify the thread so that it knows of the new FD
123*cf78ab8cSAndroid Build Coastguard Worker     notifyThread();
124*cf78ab8cSAndroid Build Coastguard Worker 
125*cf78ab8cSAndroid Build Coastguard Worker     return 0;
126*cf78ab8cSAndroid Build Coastguard Worker   }
127*cf78ab8cSAndroid Build Coastguard Worker 
StopWatchingFileDescriptor(int file_descriptor)128*cf78ab8cSAndroid Build Coastguard Worker   void StopWatchingFileDescriptor(int file_descriptor) {
129*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
130*cf78ab8cSAndroid Build Coastguard Worker     watched_shared_fds_.erase(file_descriptor);
131*cf78ab8cSAndroid Build Coastguard Worker   }
132*cf78ab8cSAndroid Build Coastguard Worker 
133*cf78ab8cSAndroid Build Coastguard Worker   AsyncFdWatcher() = default;
134*cf78ab8cSAndroid Build Coastguard Worker   AsyncFdWatcher(const AsyncFdWatcher &) = delete;
135*cf78ab8cSAndroid Build Coastguard Worker   AsyncFdWatcher &operator=(const AsyncFdWatcher &) = delete;
136*cf78ab8cSAndroid Build Coastguard Worker 
137*cf78ab8cSAndroid Build Coastguard Worker   ~AsyncFdWatcher() = default;
138*cf78ab8cSAndroid Build Coastguard Worker 
stopThread()139*cf78ab8cSAndroid Build Coastguard Worker   int stopThread() {
140*cf78ab8cSAndroid Build Coastguard Worker     if (!std::atomic_exchange(&running_, false)) {
141*cf78ab8cSAndroid Build Coastguard Worker       return 0;  // if not running already
142*cf78ab8cSAndroid Build Coastguard Worker     }
143*cf78ab8cSAndroid Build Coastguard Worker 
144*cf78ab8cSAndroid Build Coastguard Worker     notifyThread();
145*cf78ab8cSAndroid Build Coastguard Worker 
146*cf78ab8cSAndroid Build Coastguard Worker     if (std::this_thread::get_id() != thread_.get_id()) {
147*cf78ab8cSAndroid Build Coastguard Worker       thread_.join();
148*cf78ab8cSAndroid Build Coastguard Worker     } else {
149*cf78ab8cSAndroid Build Coastguard Worker       dwarning("%s: Starting thread stop from inside the reading thread itself",
150*cf78ab8cSAndroid Build Coastguard Worker                __func__);
151*cf78ab8cSAndroid Build Coastguard Worker     }
152*cf78ab8cSAndroid Build Coastguard Worker 
153*cf78ab8cSAndroid Build Coastguard Worker     {
154*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
155*cf78ab8cSAndroid Build Coastguard Worker       watched_shared_fds_.clear();
156*cf78ab8cSAndroid Build Coastguard Worker     }
157*cf78ab8cSAndroid Build Coastguard Worker 
158*cf78ab8cSAndroid Build Coastguard Worker     return 0;
159*cf78ab8cSAndroid Build Coastguard Worker   }
160*cf78ab8cSAndroid Build Coastguard Worker 
161*cf78ab8cSAndroid Build Coastguard Worker  private:
162*cf78ab8cSAndroid Build Coastguard Worker   // Make sure to call this with at least one file descriptor ready to be
163*cf78ab8cSAndroid Build Coastguard Worker   // watched upon or the thread routine will return immediately
tryStartThread()164*cf78ab8cSAndroid Build Coastguard Worker   int tryStartThread() {
165*cf78ab8cSAndroid Build Coastguard Worker     if (std::atomic_exchange(&running_, true)) {
166*cf78ab8cSAndroid Build Coastguard Worker       return 0;  // if already running
167*cf78ab8cSAndroid Build Coastguard Worker     }
168*cf78ab8cSAndroid Build Coastguard Worker     // set up the communication channel
169*cf78ab8cSAndroid Build Coastguard Worker     if (android::base::socketCreatePair(&notification_listen_fd_,
170*cf78ab8cSAndroid Build Coastguard Worker                                         &notification_write_fd_)) {
171*cf78ab8cSAndroid Build Coastguard Worker       derror(
172*cf78ab8cSAndroid Build Coastguard Worker           "%s:Unable to establish a communication channel to the reading "
173*cf78ab8cSAndroid Build Coastguard Worker           "thread",
174*cf78ab8cSAndroid Build Coastguard Worker           __func__);
175*cf78ab8cSAndroid Build Coastguard Worker       return -1;
176*cf78ab8cSAndroid Build Coastguard Worker     }
177*cf78ab8cSAndroid Build Coastguard Worker     android::base::socketSetNonBlocking(notification_listen_fd_);
178*cf78ab8cSAndroid Build Coastguard Worker     android::base::socketSetNonBlocking(notification_write_fd_);
179*cf78ab8cSAndroid Build Coastguard Worker 
180*cf78ab8cSAndroid Build Coastguard Worker     thread_ = std::thread([this]() { ThreadRoutine(); });
181*cf78ab8cSAndroid Build Coastguard Worker     if (!thread_.joinable()) {
182*cf78ab8cSAndroid Build Coastguard Worker       derror("%s: Unable to start reading thread", __func__);
183*cf78ab8cSAndroid Build Coastguard Worker       return -1;
184*cf78ab8cSAndroid Build Coastguard Worker     }
185*cf78ab8cSAndroid Build Coastguard Worker     return 0;
186*cf78ab8cSAndroid Build Coastguard Worker   }
187*cf78ab8cSAndroid Build Coastguard Worker 
notifyThread()188*cf78ab8cSAndroid Build Coastguard Worker   int notifyThread() {
189*cf78ab8cSAndroid Build Coastguard Worker     char buffer = '0';
190*cf78ab8cSAndroid Build Coastguard Worker     if (android::base::socketSend(notification_write_fd_, &buffer, 1) < 0) {
191*cf78ab8cSAndroid Build Coastguard Worker       derror("%s: Unable to send message to reading thread", __func__);
192*cf78ab8cSAndroid Build Coastguard Worker       return -1;
193*cf78ab8cSAndroid Build Coastguard Worker     }
194*cf78ab8cSAndroid Build Coastguard Worker     return 0;
195*cf78ab8cSAndroid Build Coastguard Worker   }
196*cf78ab8cSAndroid Build Coastguard Worker 
setUpFileDescriptorSet(SocketWaiter * read_fds)197*cf78ab8cSAndroid Build Coastguard Worker   void setUpFileDescriptorSet(SocketWaiter *read_fds) {
198*cf78ab8cSAndroid Build Coastguard Worker     // add comm channel to the set
199*cf78ab8cSAndroid Build Coastguard Worker     read_fds->update(notification_listen_fd_, SocketWaiter::Event::kEventRead);
200*cf78ab8cSAndroid Build Coastguard Worker 
201*cf78ab8cSAndroid Build Coastguard Worker     // add watched FDs to the set
202*cf78ab8cSAndroid Build Coastguard Worker     {
203*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
204*cf78ab8cSAndroid Build Coastguard Worker       for (auto &fdp : watched_shared_fds_) {
205*cf78ab8cSAndroid Build Coastguard Worker         read_fds->update(fdp.first, SocketWaiter::Event::kEventRead);
206*cf78ab8cSAndroid Build Coastguard Worker       }
207*cf78ab8cSAndroid Build Coastguard Worker     }
208*cf78ab8cSAndroid Build Coastguard Worker   }
209*cf78ab8cSAndroid Build Coastguard Worker 
210*cf78ab8cSAndroid Build Coastguard Worker   // check the comm channel and read everything there
consumeThreadNotifications(SocketWaiter * read_fds)211*cf78ab8cSAndroid Build Coastguard Worker   bool consumeThreadNotifications(SocketWaiter *read_fds) {
212*cf78ab8cSAndroid Build Coastguard Worker     if (read_fds->pendingEventsFor(notification_listen_fd_)) {
213*cf78ab8cSAndroid Build Coastguard Worker       char buffer[kNotificationBufferSize];
214*cf78ab8cSAndroid Build Coastguard Worker       while (HANDLE_EINTR(android::base::socketRecv(
215*cf78ab8cSAndroid Build Coastguard Worker                  notification_listen_fd_, buffer, kNotificationBufferSize)) ==
216*cf78ab8cSAndroid Build Coastguard Worker              kNotificationBufferSize) {
217*cf78ab8cSAndroid Build Coastguard Worker       }
218*cf78ab8cSAndroid Build Coastguard Worker       return true;
219*cf78ab8cSAndroid Build Coastguard Worker     }
220*cf78ab8cSAndroid Build Coastguard Worker     return false;
221*cf78ab8cSAndroid Build Coastguard Worker   }
222*cf78ab8cSAndroid Build Coastguard Worker 
223*cf78ab8cSAndroid Build Coastguard Worker   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(SocketWaiter * read_fds)224*cf78ab8cSAndroid Build Coastguard Worker   void runAppropriateCallbacks(SocketWaiter *read_fds) {
225*cf78ab8cSAndroid Build Coastguard Worker     // not a good idea to call a callback while holding the FD lock,
226*cf78ab8cSAndroid Build Coastguard Worker     // nor to release the lock while traversing the map
227*cf78ab8cSAndroid Build Coastguard Worker     std::vector<decltype(watched_shared_fds_)::value_type> fds;
228*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
229*cf78ab8cSAndroid Build Coastguard Worker     for (auto &fdc : watched_shared_fds_) {
230*cf78ab8cSAndroid Build Coastguard Worker       auto pending = read_fds->pendingEventsFor(fdc.first);
231*cf78ab8cSAndroid Build Coastguard Worker       if (pending == SocketWaiter::kEventRead) {
232*cf78ab8cSAndroid Build Coastguard Worker         fds.push_back(fdc);
233*cf78ab8cSAndroid Build Coastguard Worker       }
234*cf78ab8cSAndroid Build Coastguard Worker     }
235*cf78ab8cSAndroid Build Coastguard Worker 
236*cf78ab8cSAndroid Build Coastguard Worker     for (auto &p : fds) {
237*cf78ab8cSAndroid Build Coastguard Worker       p.second(p.first);
238*cf78ab8cSAndroid Build Coastguard Worker     }
239*cf78ab8cSAndroid Build Coastguard Worker   }
240*cf78ab8cSAndroid Build Coastguard Worker 
ThreadRoutine()241*cf78ab8cSAndroid Build Coastguard Worker   void ThreadRoutine() {
242*cf78ab8cSAndroid Build Coastguard Worker     auto read_fds = std::unique_ptr<SocketWaiter>(SocketWaiter::create());
243*cf78ab8cSAndroid Build Coastguard Worker     while (running_) {
244*cf78ab8cSAndroid Build Coastguard Worker       read_fds->reset();
245*cf78ab8cSAndroid Build Coastguard Worker       setUpFileDescriptorSet(read_fds.get());
246*cf78ab8cSAndroid Build Coastguard Worker 
247*cf78ab8cSAndroid Build Coastguard Worker       // wait until there is data available to read on some FD
248*cf78ab8cSAndroid Build Coastguard Worker       int retval = read_fds->wait(std::numeric_limits<int64_t>::max());
249*cf78ab8cSAndroid Build Coastguard Worker       if (retval <= 0) {  // there was some error or a timeout
250*cf78ab8cSAndroid Build Coastguard Worker         derror(
251*cf78ab8cSAndroid Build Coastguard Worker             "%s: There was an error while waiting for data on the file "
252*cf78ab8cSAndroid Build Coastguard Worker             "descriptors: %s",
253*cf78ab8cSAndroid Build Coastguard Worker             __func__, strerror(errno));
254*cf78ab8cSAndroid Build Coastguard Worker         continue;
255*cf78ab8cSAndroid Build Coastguard Worker       }
256*cf78ab8cSAndroid Build Coastguard Worker 
257*cf78ab8cSAndroid Build Coastguard Worker       consumeThreadNotifications(read_fds.get());
258*cf78ab8cSAndroid Build Coastguard Worker 
259*cf78ab8cSAndroid Build Coastguard Worker       // Do not read if there was a call to stop running
260*cf78ab8cSAndroid Build Coastguard Worker       if (!running_) {
261*cf78ab8cSAndroid Build Coastguard Worker         break;
262*cf78ab8cSAndroid Build Coastguard Worker       }
263*cf78ab8cSAndroid Build Coastguard Worker 
264*cf78ab8cSAndroid Build Coastguard Worker       runAppropriateCallbacks(read_fds.get());
265*cf78ab8cSAndroid Build Coastguard Worker     }
266*cf78ab8cSAndroid Build Coastguard Worker   }
267*cf78ab8cSAndroid Build Coastguard Worker 
268*cf78ab8cSAndroid Build Coastguard Worker   std::atomic_bool running_{false};
269*cf78ab8cSAndroid Build Coastguard Worker   std::thread thread_;
270*cf78ab8cSAndroid Build Coastguard Worker   std::recursive_mutex internal_mutex_;
271*cf78ab8cSAndroid Build Coastguard Worker 
272*cf78ab8cSAndroid Build Coastguard Worker   // android::base::SocketWaiter socket_waiter_;
273*cf78ab8cSAndroid Build Coastguard Worker   std::map<int, ReadCallback> watched_shared_fds_;
274*cf78ab8cSAndroid Build Coastguard Worker 
275*cf78ab8cSAndroid Build Coastguard Worker   // A pair of FD to send information to the reading thread
276*cf78ab8cSAndroid Build Coastguard Worker   int notification_listen_fd_{};
277*cf78ab8cSAndroid Build Coastguard Worker   int notification_write_fd_{};
278*cf78ab8cSAndroid Build Coastguard Worker };
279*cf78ab8cSAndroid Build Coastguard Worker 
280*cf78ab8cSAndroid Build Coastguard Worker // Async task manager implementation
281*cf78ab8cSAndroid Build Coastguard Worker class AsyncManager::AsyncTaskManager {
282*cf78ab8cSAndroid Build Coastguard Worker  public:
GetNextUserId()283*cf78ab8cSAndroid Build Coastguard Worker   AsyncUserId GetNextUserId() { return lastUserId_++; }
284*cf78ab8cSAndroid Build Coastguard Worker 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)285*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
286*cf78ab8cSAndroid Build Coastguard Worker                         const TaskCallback &callback) {
287*cf78ab8cSAndroid Build Coastguard Worker     return scheduleTask(std::make_shared<Task>(
288*cf78ab8cSAndroid Build Coastguard Worker         std::chrono::steady_clock::now() + delay, callback, user_id));
289*cf78ab8cSAndroid Build Coastguard Worker   }
290*cf78ab8cSAndroid Build Coastguard Worker 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)291*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
292*cf78ab8cSAndroid Build Coastguard Worker                                     std::chrono::milliseconds delay,
293*cf78ab8cSAndroid Build Coastguard Worker                                     std::chrono::milliseconds period,
294*cf78ab8cSAndroid Build Coastguard Worker                                     const TaskCallback &callback) {
295*cf78ab8cSAndroid Build Coastguard Worker     return scheduleTask(std::make_shared<Task>(
296*cf78ab8cSAndroid Build Coastguard Worker         std::chrono::steady_clock::now() + delay, period, callback, user_id));
297*cf78ab8cSAndroid Build Coastguard Worker   }
298*cf78ab8cSAndroid Build Coastguard Worker 
CancelAsyncTask(AsyncTaskId async_task_id)299*cf78ab8cSAndroid Build Coastguard Worker   bool CancelAsyncTask(AsyncTaskId async_task_id) {
300*cf78ab8cSAndroid Build Coastguard Worker     // remove task from queue (and task id association) while holding lock
301*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::mutex> guard(internal_mutex_);
302*cf78ab8cSAndroid Build Coastguard Worker     return cancel_task_with_lock_held(async_task_id);
303*cf78ab8cSAndroid Build Coastguard Worker   }
304*cf78ab8cSAndroid Build Coastguard Worker 
CancelAsyncTasksFromUser(AsyncUserId user_id)305*cf78ab8cSAndroid Build Coastguard Worker   bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
306*cf78ab8cSAndroid Build Coastguard Worker     // remove task from queue (and task id association) while holding lock
307*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::mutex> guard(internal_mutex_);
308*cf78ab8cSAndroid Build Coastguard Worker     if (tasks_by_user_id_.count(user_id) == 0) {
309*cf78ab8cSAndroid Build Coastguard Worker       return false;
310*cf78ab8cSAndroid Build Coastguard Worker     }
311*cf78ab8cSAndroid Build Coastguard Worker     for (auto task : tasks_by_user_id_[user_id]) {
312*cf78ab8cSAndroid Build Coastguard Worker       cancel_task_with_lock_held(task);
313*cf78ab8cSAndroid Build Coastguard Worker     }
314*cf78ab8cSAndroid Build Coastguard Worker     tasks_by_user_id_.erase(user_id);
315*cf78ab8cSAndroid Build Coastguard Worker     return true;
316*cf78ab8cSAndroid Build Coastguard Worker   }
317*cf78ab8cSAndroid Build Coastguard Worker 
Synchronize(const CriticalCallback & critical)318*cf78ab8cSAndroid Build Coastguard Worker   void Synchronize(const CriticalCallback &critical) {
319*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::mutex> guard(synchronization_mutex_);
320*cf78ab8cSAndroid Build Coastguard Worker     critical();
321*cf78ab8cSAndroid Build Coastguard Worker   }
322*cf78ab8cSAndroid Build Coastguard Worker 
323*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskManager() = default;
324*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskManager(const AsyncTaskManager &) = delete;
325*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskManager &operator=(const AsyncTaskManager &) = delete;
326*cf78ab8cSAndroid Build Coastguard Worker 
327*cf78ab8cSAndroid Build Coastguard Worker   ~AsyncTaskManager() = default;
328*cf78ab8cSAndroid Build Coastguard Worker 
stopThread()329*cf78ab8cSAndroid Build Coastguard Worker   int stopThread() {
330*cf78ab8cSAndroid Build Coastguard Worker     {
331*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::mutex> guard(internal_mutex_);
332*cf78ab8cSAndroid Build Coastguard Worker       tasks_by_id_.clear();
333*cf78ab8cSAndroid Build Coastguard Worker       task_queue_.clear();
334*cf78ab8cSAndroid Build Coastguard Worker       if (!running_) {
335*cf78ab8cSAndroid Build Coastguard Worker         return 0;
336*cf78ab8cSAndroid Build Coastguard Worker       }
337*cf78ab8cSAndroid Build Coastguard Worker       running_ = false;
338*cf78ab8cSAndroid Build Coastguard Worker       // notify the thread
339*cf78ab8cSAndroid Build Coastguard Worker       internal_cond_var_.notify_one();
340*cf78ab8cSAndroid Build Coastguard Worker     }  // release the lock before joining a thread that is likely waiting for it
341*cf78ab8cSAndroid Build Coastguard Worker     if (std::this_thread::get_id() != thread_.get_id()) {
342*cf78ab8cSAndroid Build Coastguard Worker       thread_.join();
343*cf78ab8cSAndroid Build Coastguard Worker     } else {
344*cf78ab8cSAndroid Build Coastguard Worker       dwarning("%s: Starting thread stop from inside the task thread itself",
345*cf78ab8cSAndroid Build Coastguard Worker                __func__);
346*cf78ab8cSAndroid Build Coastguard Worker     }
347*cf78ab8cSAndroid Build Coastguard Worker     return 0;
348*cf78ab8cSAndroid Build Coastguard Worker   }
349*cf78ab8cSAndroid Build Coastguard Worker 
350*cf78ab8cSAndroid Build Coastguard Worker  private:
351*cf78ab8cSAndroid Build Coastguard Worker   // Holds the data for each task
352*cf78ab8cSAndroid Build Coastguard Worker   class Task {
353*cf78ab8cSAndroid Build Coastguard Worker    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)354*cf78ab8cSAndroid Build Coastguard Worker     Task(std::chrono::steady_clock::time_point time,
355*cf78ab8cSAndroid Build Coastguard Worker          std::chrono::milliseconds period, const TaskCallback &callback,
356*cf78ab8cSAndroid Build Coastguard Worker          AsyncUserId user)
357*cf78ab8cSAndroid Build Coastguard Worker         : time(time),
358*cf78ab8cSAndroid Build Coastguard Worker           periodic(true),
359*cf78ab8cSAndroid Build Coastguard Worker           period(period),
360*cf78ab8cSAndroid Build Coastguard Worker           callback(callback),
361*cf78ab8cSAndroid Build Coastguard Worker           task_id(kInvalidTaskId),
362*cf78ab8cSAndroid Build Coastguard Worker           user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)363*cf78ab8cSAndroid Build Coastguard Worker     Task(std::chrono::steady_clock::time_point time,
364*cf78ab8cSAndroid Build Coastguard Worker          const TaskCallback &callback, AsyncUserId user)
365*cf78ab8cSAndroid Build Coastguard Worker         : time(time),
366*cf78ab8cSAndroid Build Coastguard Worker           periodic(false),
367*cf78ab8cSAndroid Build Coastguard Worker           callback(callback),
368*cf78ab8cSAndroid Build Coastguard Worker           task_id(kInvalidTaskId),
369*cf78ab8cSAndroid Build Coastguard Worker           user_id(user) {}
370*cf78ab8cSAndroid Build Coastguard Worker 
371*cf78ab8cSAndroid Build Coastguard Worker     // Operators needed to be in a collection
operator <(const Task & another) const372*cf78ab8cSAndroid Build Coastguard Worker     bool operator<(const Task &another) const {
373*cf78ab8cSAndroid Build Coastguard Worker       return std::make_pair(time, task_id) <
374*cf78ab8cSAndroid Build Coastguard Worker              std::make_pair(another.time, another.task_id);
375*cf78ab8cSAndroid Build Coastguard Worker     }
376*cf78ab8cSAndroid Build Coastguard Worker 
isPeriodic() const377*cf78ab8cSAndroid Build Coastguard Worker     bool isPeriodic() const { return periodic; }
378*cf78ab8cSAndroid Build Coastguard Worker 
379*cf78ab8cSAndroid Build Coastguard Worker     // These fields should no longer be public if the class ever becomes
380*cf78ab8cSAndroid Build Coastguard Worker     // public or gets more complex
381*cf78ab8cSAndroid Build Coastguard Worker     std::chrono::steady_clock::time_point time;
382*cf78ab8cSAndroid Build Coastguard Worker     bool periodic;
383*cf78ab8cSAndroid Build Coastguard Worker     std::chrono::milliseconds period{};
384*cf78ab8cSAndroid Build Coastguard Worker     std::mutex in_callback;  // Taken when the callback is active
385*cf78ab8cSAndroid Build Coastguard Worker     TaskCallback callback;
386*cf78ab8cSAndroid Build Coastguard Worker     AsyncTaskId task_id;
387*cf78ab8cSAndroid Build Coastguard Worker     AsyncUserId user_id;
388*cf78ab8cSAndroid Build Coastguard Worker   };
389*cf78ab8cSAndroid Build Coastguard Worker 
390*cf78ab8cSAndroid Build Coastguard Worker   // A comparator class to put shared pointers to tasks in an ordered set
391*cf78ab8cSAndroid Build Coastguard Worker   struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator392*cf78ab8cSAndroid Build Coastguard Worker     bool operator()(const std::shared_ptr<Task> &t1,
393*cf78ab8cSAndroid Build Coastguard Worker                     const std::shared_ptr<Task> &t2) const {
394*cf78ab8cSAndroid Build Coastguard Worker       return *t1 < *t2;
395*cf78ab8cSAndroid Build Coastguard Worker     }
396*cf78ab8cSAndroid Build Coastguard Worker   };
397*cf78ab8cSAndroid Build Coastguard Worker 
cancel_task_with_lock_held(AsyncTaskId async_task_id)398*cf78ab8cSAndroid Build Coastguard Worker   bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
399*cf78ab8cSAndroid Build Coastguard Worker     if (tasks_by_id_.count(async_task_id) == 0) {
400*cf78ab8cSAndroid Build Coastguard Worker       return false;
401*cf78ab8cSAndroid Build Coastguard Worker     }
402*cf78ab8cSAndroid Build Coastguard Worker 
403*cf78ab8cSAndroid Build Coastguard Worker     // Now make sure we are not running this task.
404*cf78ab8cSAndroid Build Coastguard Worker     // 2 cases
405*cf78ab8cSAndroid Build Coastguard Worker     // - This is called from thread_, this means a scheduled task is actually
406*cf78ab8cSAndroid Build Coastguard Worker     //   unregistering.
407*cf78ab8cSAndroid Build Coastguard Worker     // - Another thread is calling us, let's make sure the task is not active.
408*cf78ab8cSAndroid Build Coastguard Worker     if (thread_.get_id() != std::this_thread::get_id()) {
409*cf78ab8cSAndroid Build Coastguard Worker       auto task = tasks_by_id_[async_task_id];
410*cf78ab8cSAndroid Build Coastguard Worker       const std::lock_guard<std::mutex> lock(task->in_callback);
411*cf78ab8cSAndroid Build Coastguard Worker       task_queue_.erase(task);
412*cf78ab8cSAndroid Build Coastguard Worker       tasks_by_id_.erase(async_task_id);
413*cf78ab8cSAndroid Build Coastguard Worker     } else {
414*cf78ab8cSAndroid Build Coastguard Worker       task_queue_.erase(tasks_by_id_[async_task_id]);
415*cf78ab8cSAndroid Build Coastguard Worker       tasks_by_id_.erase(async_task_id);
416*cf78ab8cSAndroid Build Coastguard Worker     }
417*cf78ab8cSAndroid Build Coastguard Worker 
418*cf78ab8cSAndroid Build Coastguard Worker     return true;
419*cf78ab8cSAndroid Build Coastguard Worker   }
420*cf78ab8cSAndroid Build Coastguard Worker 
scheduleTask(const std::shared_ptr<Task> & task)421*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskId scheduleTask(const std::shared_ptr<Task> &task) {
422*cf78ab8cSAndroid Build Coastguard Worker     {
423*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::mutex> guard(internal_mutex_);
424*cf78ab8cSAndroid Build Coastguard Worker       // no more room for new tasks, we need a larger type for IDs
425*cf78ab8cSAndroid Build Coastguard Worker       if (tasks_by_id_.size() == kMaxTaskId)  // TODO potentially type unsafe
426*cf78ab8cSAndroid Build Coastguard Worker         return kInvalidTaskId;
427*cf78ab8cSAndroid Build Coastguard Worker       do {
428*cf78ab8cSAndroid Build Coastguard Worker         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
429*cf78ab8cSAndroid Build Coastguard Worker       } while (isTaskIdInUse(lastTaskId_));
430*cf78ab8cSAndroid Build Coastguard Worker       task->task_id = lastTaskId_;
431*cf78ab8cSAndroid Build Coastguard Worker       // add task to the queue and map
432*cf78ab8cSAndroid Build Coastguard Worker       tasks_by_id_[lastTaskId_] = task;
433*cf78ab8cSAndroid Build Coastguard Worker       tasks_by_user_id_[task->user_id].insert(task->task_id);
434*cf78ab8cSAndroid Build Coastguard Worker       task_queue_.insert(task);
435*cf78ab8cSAndroid Build Coastguard Worker     }
436*cf78ab8cSAndroid Build Coastguard Worker     // start thread if necessary
437*cf78ab8cSAndroid Build Coastguard Worker     int started = tryStartThread();
438*cf78ab8cSAndroid Build Coastguard Worker     if (started != 0) {
439*cf78ab8cSAndroid Build Coastguard Worker       derror("%s: Unable to start thread", __func__);
440*cf78ab8cSAndroid Build Coastguard Worker       return kInvalidTaskId;
441*cf78ab8cSAndroid Build Coastguard Worker     }
442*cf78ab8cSAndroid Build Coastguard Worker     // notify the thread so that it knows of the new task
443*cf78ab8cSAndroid Build Coastguard Worker     internal_cond_var_.notify_one();
444*cf78ab8cSAndroid Build Coastguard Worker     // return task id
445*cf78ab8cSAndroid Build Coastguard Worker     return task->task_id;
446*cf78ab8cSAndroid Build Coastguard Worker   }
447*cf78ab8cSAndroid Build Coastguard Worker 
isTaskIdInUse(const AsyncTaskId & task_id) const448*cf78ab8cSAndroid Build Coastguard Worker   bool isTaskIdInUse(const AsyncTaskId &task_id) const {
449*cf78ab8cSAndroid Build Coastguard Worker     return tasks_by_id_.count(task_id) != 0;
450*cf78ab8cSAndroid Build Coastguard Worker   }
451*cf78ab8cSAndroid Build Coastguard Worker 
tryStartThread()452*cf78ab8cSAndroid Build Coastguard Worker   int tryStartThread() {
453*cf78ab8cSAndroid Build Coastguard Worker     // need the lock because of the running flag and the cond var
454*cf78ab8cSAndroid Build Coastguard Worker     std::unique_lock<std::mutex> guard(internal_mutex_);
455*cf78ab8cSAndroid Build Coastguard Worker     // check that the thread is not yet running
456*cf78ab8cSAndroid Build Coastguard Worker     if (running_) {
457*cf78ab8cSAndroid Build Coastguard Worker       return 0;
458*cf78ab8cSAndroid Build Coastguard Worker     }
459*cf78ab8cSAndroid Build Coastguard Worker     // start the thread
460*cf78ab8cSAndroid Build Coastguard Worker     running_ = true;
461*cf78ab8cSAndroid Build Coastguard Worker     thread_ = std::thread([this]() { ThreadRoutine(); });
462*cf78ab8cSAndroid Build Coastguard Worker     if (!thread_.joinable()) {
463*cf78ab8cSAndroid Build Coastguard Worker       derror("%s: Unable to start task thread", __func__);
464*cf78ab8cSAndroid Build Coastguard Worker       return -1;
465*cf78ab8cSAndroid Build Coastguard Worker     }
466*cf78ab8cSAndroid Build Coastguard Worker     return 0;
467*cf78ab8cSAndroid Build Coastguard Worker   }
468*cf78ab8cSAndroid Build Coastguard Worker 
ThreadRoutine()469*cf78ab8cSAndroid Build Coastguard Worker   void ThreadRoutine() {
470*cf78ab8cSAndroid Build Coastguard Worker     while (running_) {
471*cf78ab8cSAndroid Build Coastguard Worker       TaskCallback callback;
472*cf78ab8cSAndroid Build Coastguard Worker       std::shared_ptr<Task> task_p;
473*cf78ab8cSAndroid Build Coastguard Worker       bool run_it = false;
474*cf78ab8cSAndroid Build Coastguard Worker       {
475*cf78ab8cSAndroid Build Coastguard Worker         std::unique_lock<std::mutex> guard(internal_mutex_);
476*cf78ab8cSAndroid Build Coastguard Worker         if (!task_queue_.empty()) {
477*cf78ab8cSAndroid Build Coastguard Worker           task_p = *(task_queue_.begin());
478*cf78ab8cSAndroid Build Coastguard Worker           if (task_p->time < std::chrono::steady_clock::now()) {
479*cf78ab8cSAndroid Build Coastguard Worker             run_it = true;
480*cf78ab8cSAndroid Build Coastguard Worker             callback = task_p->callback;
481*cf78ab8cSAndroid Build Coastguard Worker             task_queue_.erase(task_p);  // need to remove and add again if
482*cf78ab8cSAndroid Build Coastguard Worker                                         // periodic to update order
483*cf78ab8cSAndroid Build Coastguard Worker             if (task_p->isPeriodic()) {
484*cf78ab8cSAndroid Build Coastguard Worker               task_p->time += task_p->period;
485*cf78ab8cSAndroid Build Coastguard Worker               task_queue_.insert(task_p);
486*cf78ab8cSAndroid Build Coastguard Worker             } else {
487*cf78ab8cSAndroid Build Coastguard Worker               tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
488*cf78ab8cSAndroid Build Coastguard Worker               tasks_by_id_.erase(task_p->task_id);
489*cf78ab8cSAndroid Build Coastguard Worker             }
490*cf78ab8cSAndroid Build Coastguard Worker           }
491*cf78ab8cSAndroid Build Coastguard Worker         }
492*cf78ab8cSAndroid Build Coastguard Worker       }
493*cf78ab8cSAndroid Build Coastguard Worker       if (run_it) {
494*cf78ab8cSAndroid Build Coastguard Worker         const std::lock_guard<std::mutex> lock(task_p->in_callback);
495*cf78ab8cSAndroid Build Coastguard Worker         Synchronize(callback);
496*cf78ab8cSAndroid Build Coastguard Worker       }
497*cf78ab8cSAndroid Build Coastguard Worker       {
498*cf78ab8cSAndroid Build Coastguard Worker         std::unique_lock<std::mutex> guard(internal_mutex_);
499*cf78ab8cSAndroid Build Coastguard Worker         // check for termination right before waiting
500*cf78ab8cSAndroid Build Coastguard Worker         if (!running_) break;
501*cf78ab8cSAndroid Build Coastguard Worker         // wait until time for the next task (if any)
502*cf78ab8cSAndroid Build Coastguard Worker         if (task_queue_.size() > 0) {
503*cf78ab8cSAndroid Build Coastguard Worker           // Make a copy of the time_point because wait_until takes a reference
504*cf78ab8cSAndroid Build Coastguard Worker           // to it and may read it after waiting, by which time the task may
505*cf78ab8cSAndroid Build Coastguard Worker           // have been freed (e.g. via CancelAsyncTask).
506*cf78ab8cSAndroid Build Coastguard Worker           std::chrono::steady_clock::time_point time =
507*cf78ab8cSAndroid Build Coastguard Worker               (*task_queue_.begin())->time;
508*cf78ab8cSAndroid Build Coastguard Worker           internal_cond_var_.wait_until(guard, time);
509*cf78ab8cSAndroid Build Coastguard Worker         } else {
510*cf78ab8cSAndroid Build Coastguard Worker           internal_cond_var_.wait(guard);
511*cf78ab8cSAndroid Build Coastguard Worker         }
512*cf78ab8cSAndroid Build Coastguard Worker       }
513*cf78ab8cSAndroid Build Coastguard Worker     }
514*cf78ab8cSAndroid Build Coastguard Worker   }
515*cf78ab8cSAndroid Build Coastguard Worker 
516*cf78ab8cSAndroid Build Coastguard Worker   bool running_ = false;
517*cf78ab8cSAndroid Build Coastguard Worker   std::thread thread_;
518*cf78ab8cSAndroid Build Coastguard Worker   std::mutex internal_mutex_;
519*cf78ab8cSAndroid Build Coastguard Worker   std::mutex synchronization_mutex_;
520*cf78ab8cSAndroid Build Coastguard Worker   std::condition_variable internal_cond_var_;
521*cf78ab8cSAndroid Build Coastguard Worker 
522*cf78ab8cSAndroid Build Coastguard Worker   AsyncTaskId lastTaskId_ = kInvalidTaskId;
523*cf78ab8cSAndroid Build Coastguard Worker   AsyncUserId lastUserId_{1};
524*cf78ab8cSAndroid Build Coastguard Worker   std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
525*cf78ab8cSAndroid Build Coastguard Worker   std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
526*cf78ab8cSAndroid Build Coastguard Worker   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
527*cf78ab8cSAndroid Build Coastguard Worker };
528*cf78ab8cSAndroid Build Coastguard Worker 
529*cf78ab8cSAndroid Build Coastguard Worker // Async Manager Implementation:
AsyncManager()530*cf78ab8cSAndroid Build Coastguard Worker AsyncManager::AsyncManager()
531*cf78ab8cSAndroid Build Coastguard Worker     : fdWatcher_p_(new AsyncFdWatcher()),
532*cf78ab8cSAndroid Build Coastguard Worker       taskManager_p_(new AsyncTaskManager()) {}
533*cf78ab8cSAndroid Build Coastguard Worker 
~AsyncManager()534*cf78ab8cSAndroid Build Coastguard Worker AsyncManager::~AsyncManager() {
535*cf78ab8cSAndroid Build Coastguard Worker   // Make sure the threads are stopped before destroying the object.
536*cf78ab8cSAndroid Build Coastguard Worker   // The threads need to be stopped here and not in each internal class'
537*cf78ab8cSAndroid Build Coastguard Worker   // destructor because unique_ptr's reset() first assigns nullptr to the
538*cf78ab8cSAndroid Build Coastguard Worker   // pointer and only then calls the destructor, so any callback running
539*cf78ab8cSAndroid Build Coastguard Worker   // on these threads would dereference a null pointer if they called a member
540*cf78ab8cSAndroid Build Coastguard Worker   // function of this class.
541*cf78ab8cSAndroid Build Coastguard Worker   fdWatcher_p_->stopThread();
542*cf78ab8cSAndroid Build Coastguard Worker   taskManager_p_->stopThread();
543*cf78ab8cSAndroid Build Coastguard Worker }
544*cf78ab8cSAndroid Build Coastguard Worker 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)545*cf78ab8cSAndroid Build Coastguard Worker int AsyncManager::WatchFdForNonBlockingReads(
546*cf78ab8cSAndroid Build Coastguard Worker     int file_descriptor, const ReadCallback &on_read_fd_ready_callback) {
547*cf78ab8cSAndroid Build Coastguard Worker   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
548*cf78ab8cSAndroid Build Coastguard Worker                                                   on_read_fd_ready_callback);
549*cf78ab8cSAndroid Build Coastguard Worker }
550*cf78ab8cSAndroid Build Coastguard Worker 
StopWatchingFileDescriptor(int file_descriptor)551*cf78ab8cSAndroid Build Coastguard Worker void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
552*cf78ab8cSAndroid Build Coastguard Worker   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
553*cf78ab8cSAndroid Build Coastguard Worker }
554*cf78ab8cSAndroid Build Coastguard Worker 
GetNextUserId()555*cf78ab8cSAndroid Build Coastguard Worker AsyncUserId AsyncManager::GetNextUserId() {
556*cf78ab8cSAndroid Build Coastguard Worker   return taskManager_p_->GetNextUserId();
557*cf78ab8cSAndroid Build Coastguard Worker }
558*cf78ab8cSAndroid Build Coastguard Worker 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)559*cf78ab8cSAndroid Build Coastguard Worker AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
560*cf78ab8cSAndroid Build Coastguard Worker                                     std::chrono::milliseconds delay,
561*cf78ab8cSAndroid Build Coastguard Worker                                     const TaskCallback &callback) {
562*cf78ab8cSAndroid Build Coastguard Worker   return taskManager_p_->ExecAsync(user_id, delay, callback);
563*cf78ab8cSAndroid Build Coastguard Worker }
564*cf78ab8cSAndroid Build Coastguard Worker 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)565*cf78ab8cSAndroid Build Coastguard Worker AsyncTaskId AsyncManager::ExecAsyncPeriodically(
566*cf78ab8cSAndroid Build Coastguard Worker     AsyncUserId user_id, std::chrono::milliseconds delay,
567*cf78ab8cSAndroid Build Coastguard Worker     std::chrono::milliseconds period, const TaskCallback &callback) {
568*cf78ab8cSAndroid Build Coastguard Worker   return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
569*cf78ab8cSAndroid Build Coastguard Worker                                                callback);
570*cf78ab8cSAndroid Build Coastguard Worker }
571*cf78ab8cSAndroid Build Coastguard Worker 
CancelAsyncTask(AsyncTaskId async_task_id)572*cf78ab8cSAndroid Build Coastguard Worker bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
573*cf78ab8cSAndroid Build Coastguard Worker   return taskManager_p_->CancelAsyncTask(async_task_id);
574*cf78ab8cSAndroid Build Coastguard Worker }
575*cf78ab8cSAndroid Build Coastguard Worker 
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)576*cf78ab8cSAndroid Build Coastguard Worker bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
577*cf78ab8cSAndroid Build Coastguard Worker   return taskManager_p_->CancelAsyncTasksFromUser(user_id);
578*cf78ab8cSAndroid Build Coastguard Worker }
579*cf78ab8cSAndroid Build Coastguard Worker 
Synchronize(const CriticalCallback & critical)580*cf78ab8cSAndroid Build Coastguard Worker void AsyncManager::Synchronize(const CriticalCallback &critical) {
581*cf78ab8cSAndroid Build Coastguard Worker   taskManager_p_->Synchronize(critical);
582*cf78ab8cSAndroid Build Coastguard Worker }
583*cf78ab8cSAndroid Build Coastguard Worker }  // namespace rootcanal
584