xref: /aosp_15_r20/system/chre/platform/linux/task_util/task_manager.cc (revision 84e339476a462649f82315436d70fd732297a399)
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <cassert>
18 #include <iostream>
19 #include <limits>
20 #include <memory>
21 
22 #include "chre/platform/linux/task_util/task_manager.h"
23 
24 namespace chre {
25 
TaskManager()26 TaskManager::TaskManager()
27     : mQueue(std::greater<Task>()),
28       mCurrentTask(nullptr),
29       mContinueRunningThread(true),
30       mCurrentId(0) {
31   mThread = std::thread(&TaskManager::run, this);
32 }
33 
~TaskManager()34 TaskManager::~TaskManager() {
35   flushAndStop();
36 }
37 
addTask(const Task::TaskFunction & func,std::chrono::nanoseconds intervalOrDelay,bool isOneShot)38 std::optional<uint32_t> TaskManager::addTask(
39     const Task::TaskFunction &func, std::chrono::nanoseconds intervalOrDelay,
40     bool isOneShot) {
41   bool success = false;
42   uint32_t returnId;
43   {
44     std::lock_guard<std::mutex> lock(mMutex);
45 
46     if (!mContinueRunningThread) {
47       LOGW("Execution thread is shutting down. Cannot add a task.");
48     } else {
49       // select the next ID
50       assert(mCurrentId < std::numeric_limits<uint32_t>::max());
51       returnId = mCurrentId++;
52       Task task(func, intervalOrDelay, returnId, isOneShot);
53       success = mQueue.push(task);
54     }
55   }
56 
57   if (success) {
58     mConditionVariable.notify_all();
59     return returnId;
60   }
61   return std::optional<uint32_t>();
62 }
63 
cancelTask(uint32_t taskId)64 bool TaskManager::cancelTask(uint32_t taskId) {
65   std::lock_guard<std::mutex> lock(mMutex);
66 
67   bool success = false;
68   if (!mContinueRunningThread) {
69     LOGW("Execution thread is shutting down. Cannot cancel a task.");
70   } else if (mCurrentTask != nullptr && mCurrentTask->getId() == taskId) {
71     // The currently executing task may want to cancel itself.
72     mCurrentTask->cancel();
73     success = true;
74   } else {
75     for (auto iter = mQueue.begin(); iter != mQueue.end(); ++iter) {
76       if (iter->getId() == taskId) {
77         iter->cancel();
78         success = true;
79         break;
80       }
81     }
82   }
83 
84   return success;
85 }
86 
flushTasks()87 void TaskManager::flushTasks() {
88   std::lock_guard<std::mutex> lock(mMutex);
89   while (!mQueue.empty()) {
90     mQueue.pop();
91   }
92 }
93 
flushAndStop()94 void TaskManager::flushAndStop() {
95   {
96     std::lock_guard<std::mutex> lock(mMutex);
97     if (!mContinueRunningThread) {
98       // Already shutting down.
99       return;
100     }
101 
102     while (!mQueue.empty()) {
103       mQueue.pop();
104     }
105     mContinueRunningThread = false;
106   }
107 
108   mConditionVariable.notify_all();
109   if (mThread.joinable()) {
110     mThread.join();
111   }
112 }
113 
run()114 void TaskManager::run() {
115   while (true) {
116     Task task;
117     {
118       std::unique_lock<std::mutex> lock(mMutex);
119       mConditionVariable.wait(lock, [this]() {
120         return !mContinueRunningThread || !mQueue.empty();
121       });
122       if (!mContinueRunningThread) {
123         return;
124       }
125 
126       task = mQueue.top();
127       if (!task.isReadyToExecute()) {
128         auto waitTime =
129             task.getExecutionTimestamp() - std::chrono::steady_clock::now();
130         if (waitTime.count() > 0) {
131           mConditionVariable.wait_for(lock, waitTime);
132         }
133 
134         /**
135          * We continue here instead of executing the same task because we are
136          * not guaranteed that the condition variable was not spuriously woken
137          * up, and another task with a timestamp < the current task could have
138          * been added in the current time.
139          */
140         continue;
141       }
142 
143       mQueue.pop();
144       mCurrentTask = &task;
145     }
146     task.execute();
147     {
148       std::lock_guard<std::mutex> lock(mMutex);
149       mCurrentTask = nullptr;
150 
151       if (task.isRepeating() && !mQueue.push(task)) {
152         LOGE("TaskManager: Could not push task to priority queue");
153       }
154     }
155   }
156 }
157 
158 }  // namespace chre
159