1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <cassert>
18 #include <iostream>
19 #include <limits>
20 #include <memory>
21
22 #include "chre/platform/linux/task_util/task_manager.h"
23
24 namespace chre {
25
TaskManager()26 TaskManager::TaskManager()
27 : mQueue(std::greater<Task>()),
28 mCurrentTask(nullptr),
29 mContinueRunningThread(true),
30 mCurrentId(0) {
31 mThread = std::thread(&TaskManager::run, this);
32 }
33
~TaskManager()34 TaskManager::~TaskManager() {
35 flushAndStop();
36 }
37
addTask(const Task::TaskFunction & func,std::chrono::nanoseconds intervalOrDelay,bool isOneShot)38 std::optional<uint32_t> TaskManager::addTask(
39 const Task::TaskFunction &func, std::chrono::nanoseconds intervalOrDelay,
40 bool isOneShot) {
41 bool success = false;
42 uint32_t returnId;
43 {
44 std::lock_guard<std::mutex> lock(mMutex);
45
46 if (!mContinueRunningThread) {
47 LOGW("Execution thread is shutting down. Cannot add a task.");
48 } else {
49 // select the next ID
50 assert(mCurrentId < std::numeric_limits<uint32_t>::max());
51 returnId = mCurrentId++;
52 Task task(func, intervalOrDelay, returnId, isOneShot);
53 success = mQueue.push(task);
54 }
55 }
56
57 if (success) {
58 mConditionVariable.notify_all();
59 return returnId;
60 }
61 return std::optional<uint32_t>();
62 }
63
cancelTask(uint32_t taskId)64 bool TaskManager::cancelTask(uint32_t taskId) {
65 std::lock_guard<std::mutex> lock(mMutex);
66
67 bool success = false;
68 if (!mContinueRunningThread) {
69 LOGW("Execution thread is shutting down. Cannot cancel a task.");
70 } else if (mCurrentTask != nullptr && mCurrentTask->getId() == taskId) {
71 // The currently executing task may want to cancel itself.
72 mCurrentTask->cancel();
73 success = true;
74 } else {
75 for (auto iter = mQueue.begin(); iter != mQueue.end(); ++iter) {
76 if (iter->getId() == taskId) {
77 iter->cancel();
78 success = true;
79 break;
80 }
81 }
82 }
83
84 return success;
85 }
86
flushTasks()87 void TaskManager::flushTasks() {
88 std::lock_guard<std::mutex> lock(mMutex);
89 while (!mQueue.empty()) {
90 mQueue.pop();
91 }
92 }
93
flushAndStop()94 void TaskManager::flushAndStop() {
95 {
96 std::lock_guard<std::mutex> lock(mMutex);
97 if (!mContinueRunningThread) {
98 // Already shutting down.
99 return;
100 }
101
102 while (!mQueue.empty()) {
103 mQueue.pop();
104 }
105 mContinueRunningThread = false;
106 }
107
108 mConditionVariable.notify_all();
109 if (mThread.joinable()) {
110 mThread.join();
111 }
112 }
113
run()114 void TaskManager::run() {
115 while (true) {
116 Task task;
117 {
118 std::unique_lock<std::mutex> lock(mMutex);
119 mConditionVariable.wait(lock, [this]() {
120 return !mContinueRunningThread || !mQueue.empty();
121 });
122 if (!mContinueRunningThread) {
123 return;
124 }
125
126 task = mQueue.top();
127 if (!task.isReadyToExecute()) {
128 auto waitTime =
129 task.getExecutionTimestamp() - std::chrono::steady_clock::now();
130 if (waitTime.count() > 0) {
131 mConditionVariable.wait_for(lock, waitTime);
132 }
133
134 /**
135 * We continue here instead of executing the same task because we are
136 * not guaranteed that the condition variable was not spuriously woken
137 * up, and another task with a timestamp < the current task could have
138 * been added in the current time.
139 */
140 continue;
141 }
142
143 mQueue.pop();
144 mCurrentTask = &task;
145 }
146 task.execute();
147 {
148 std::lock_guard<std::mutex> lock(mMutex);
149 mCurrentTask = nullptr;
150
151 if (task.isRepeating() && !mQueue.push(task)) {
152 LOGE("TaskManager: Could not push task to priority queue");
153 }
154 }
155 }
156 }
157
158 } // namespace chre
159