xref: /aosp_15_r20/external/pigweed/pw_async_basic/dispatcher.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker // Copyright 2023 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker //     https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker #include "pw_async_basic/dispatcher.h"
15*61c4878aSAndroid Build Coastguard Worker 
16*61c4878aSAndroid Build Coastguard Worker #include <mutex>
17*61c4878aSAndroid Build Coastguard Worker #include <utility>
18*61c4878aSAndroid Build Coastguard Worker 
19*61c4878aSAndroid Build Coastguard Worker #include "pw_chrono/system_clock.h"
20*61c4878aSAndroid Build Coastguard Worker 
21*61c4878aSAndroid Build Coastguard Worker using namespace std::chrono_literals;
22*61c4878aSAndroid Build Coastguard Worker 
23*61c4878aSAndroid Build Coastguard Worker namespace pw::async {
24*61c4878aSAndroid Build Coastguard Worker 
~BasicDispatcher()25*61c4878aSAndroid Build Coastguard Worker BasicDispatcher::~BasicDispatcher() {
26*61c4878aSAndroid Build Coastguard Worker   RequestStop();
27*61c4878aSAndroid Build Coastguard Worker   lock_.lock();
28*61c4878aSAndroid Build Coastguard Worker   DrainTaskQueue();
29*61c4878aSAndroid Build Coastguard Worker   lock_.unlock();
30*61c4878aSAndroid Build Coastguard Worker }
31*61c4878aSAndroid Build Coastguard Worker 
Run()32*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::Run() {
33*61c4878aSAndroid Build Coastguard Worker   lock_.lock();
34*61c4878aSAndroid Build Coastguard Worker   while (!stop_requested_) {
35*61c4878aSAndroid Build Coastguard Worker     MaybeSleep();
36*61c4878aSAndroid Build Coastguard Worker     ExecuteDueTasks();
37*61c4878aSAndroid Build Coastguard Worker   }
38*61c4878aSAndroid Build Coastguard Worker   DrainTaskQueue();
39*61c4878aSAndroid Build Coastguard Worker   lock_.unlock();
40*61c4878aSAndroid Build Coastguard Worker }
41*61c4878aSAndroid Build Coastguard Worker 
RunUntilIdle()42*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::RunUntilIdle() {
43*61c4878aSAndroid Build Coastguard Worker   lock_.lock();
44*61c4878aSAndroid Build Coastguard Worker   ExecuteDueTasks();
45*61c4878aSAndroid Build Coastguard Worker   if (stop_requested_) {
46*61c4878aSAndroid Build Coastguard Worker     DrainTaskQueue();
47*61c4878aSAndroid Build Coastguard Worker   }
48*61c4878aSAndroid Build Coastguard Worker   lock_.unlock();
49*61c4878aSAndroid Build Coastguard Worker }
50*61c4878aSAndroid Build Coastguard Worker 
RunUntil(chrono::SystemClock::time_point end_time)51*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
52*61c4878aSAndroid Build Coastguard Worker   lock_.lock();
53*61c4878aSAndroid Build Coastguard Worker   while (end_time < now() && !stop_requested_) {
54*61c4878aSAndroid Build Coastguard Worker     MaybeSleep();
55*61c4878aSAndroid Build Coastguard Worker     ExecuteDueTasks();
56*61c4878aSAndroid Build Coastguard Worker   }
57*61c4878aSAndroid Build Coastguard Worker   if (stop_requested_) {
58*61c4878aSAndroid Build Coastguard Worker     DrainTaskQueue();
59*61c4878aSAndroid Build Coastguard Worker   }
60*61c4878aSAndroid Build Coastguard Worker   lock_.unlock();
61*61c4878aSAndroid Build Coastguard Worker }
62*61c4878aSAndroid Build Coastguard Worker 
RunFor(chrono::SystemClock::duration duration)63*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::RunFor(chrono::SystemClock::duration duration) {
64*61c4878aSAndroid Build Coastguard Worker   RunUntil(now() + duration);
65*61c4878aSAndroid Build Coastguard Worker }
66*61c4878aSAndroid Build Coastguard Worker 
MaybeSleep()67*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::MaybeSleep() {
68*61c4878aSAndroid Build Coastguard Worker   if (task_queue_.empty() || task_queue_.front().due_time_ > now()) {
69*61c4878aSAndroid Build Coastguard Worker     // Sleep until a notification is received or until the due time of the
70*61c4878aSAndroid Build Coastguard Worker     // next task. Notifications are sent when tasks are posted or 'stop' is
71*61c4878aSAndroid Build Coastguard Worker     // requested.
72*61c4878aSAndroid Build Coastguard Worker     std::optional<chrono::SystemClock::time_point> wake_time = std::nullopt;
73*61c4878aSAndroid Build Coastguard Worker     if (!task_queue_.empty()) {
74*61c4878aSAndroid Build Coastguard Worker       wake_time = task_queue_.front().due_time_;
75*61c4878aSAndroid Build Coastguard Worker     }
76*61c4878aSAndroid Build Coastguard Worker     lock_.unlock();
77*61c4878aSAndroid Build Coastguard Worker     if (wake_time.has_value()) {
78*61c4878aSAndroid Build Coastguard Worker       std::ignore = timed_notification_.try_acquire_until(*wake_time);
79*61c4878aSAndroid Build Coastguard Worker     } else {
80*61c4878aSAndroid Build Coastguard Worker       timed_notification_.acquire();
81*61c4878aSAndroid Build Coastguard Worker     }
82*61c4878aSAndroid Build Coastguard Worker     lock_.lock();
83*61c4878aSAndroid Build Coastguard Worker   }
84*61c4878aSAndroid Build Coastguard Worker }
85*61c4878aSAndroid Build Coastguard Worker 
ExecuteDueTasks()86*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::ExecuteDueTasks() {
87*61c4878aSAndroid Build Coastguard Worker   while (!task_queue_.empty() && task_queue_.front().due_time_ <= now() &&
88*61c4878aSAndroid Build Coastguard Worker          !stop_requested_) {
89*61c4878aSAndroid Build Coastguard Worker     backend::NativeTask& task = task_queue_.front();
90*61c4878aSAndroid Build Coastguard Worker     task_queue_.pop_front();
91*61c4878aSAndroid Build Coastguard Worker 
92*61c4878aSAndroid Build Coastguard Worker     lock_.unlock();
93*61c4878aSAndroid Build Coastguard Worker     Context ctx{this, &task.task_};
94*61c4878aSAndroid Build Coastguard Worker     task(ctx, OkStatus());
95*61c4878aSAndroid Build Coastguard Worker     lock_.lock();
96*61c4878aSAndroid Build Coastguard Worker   }
97*61c4878aSAndroid Build Coastguard Worker }
98*61c4878aSAndroid Build Coastguard Worker 
RequestStop()99*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::RequestStop() {
100*61c4878aSAndroid Build Coastguard Worker   {
101*61c4878aSAndroid Build Coastguard Worker     std::lock_guard lock(lock_);
102*61c4878aSAndroid Build Coastguard Worker     stop_requested_ = true;
103*61c4878aSAndroid Build Coastguard Worker   }
104*61c4878aSAndroid Build Coastguard Worker   timed_notification_.release();
105*61c4878aSAndroid Build Coastguard Worker }
106*61c4878aSAndroid Build Coastguard Worker 
DrainTaskQueue()107*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::DrainTaskQueue() {
108*61c4878aSAndroid Build Coastguard Worker   while (!task_queue_.empty()) {
109*61c4878aSAndroid Build Coastguard Worker     backend::NativeTask& task = task_queue_.front();
110*61c4878aSAndroid Build Coastguard Worker     task_queue_.pop_front();
111*61c4878aSAndroid Build Coastguard Worker 
112*61c4878aSAndroid Build Coastguard Worker     lock_.unlock();
113*61c4878aSAndroid Build Coastguard Worker     Context ctx{this, &task.task_};
114*61c4878aSAndroid Build Coastguard Worker     task(ctx, Status::Cancelled());
115*61c4878aSAndroid Build Coastguard Worker     lock_.lock();
116*61c4878aSAndroid Build Coastguard Worker   }
117*61c4878aSAndroid Build Coastguard Worker }
118*61c4878aSAndroid Build Coastguard Worker 
PostAt(Task & task,chrono::SystemClock::time_point time)119*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::PostAt(Task& task, chrono::SystemClock::time_point time) {
120*61c4878aSAndroid Build Coastguard Worker   PostTaskInternal(task.native_type(), time);
121*61c4878aSAndroid Build Coastguard Worker }
122*61c4878aSAndroid Build Coastguard Worker 
Cancel(Task & task)123*61c4878aSAndroid Build Coastguard Worker bool BasicDispatcher::Cancel(Task& task) {
124*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(lock_);
125*61c4878aSAndroid Build Coastguard Worker   return task_queue_.remove(task.native_type());
126*61c4878aSAndroid Build Coastguard Worker }
127*61c4878aSAndroid Build Coastguard Worker 
PostTaskInternal(backend::NativeTask & task,chrono::SystemClock::time_point time_due)128*61c4878aSAndroid Build Coastguard Worker void BasicDispatcher::PostTaskInternal(
129*61c4878aSAndroid Build Coastguard Worker     backend::NativeTask& task, chrono::SystemClock::time_point time_due) {
130*61c4878aSAndroid Build Coastguard Worker   lock_.lock();
131*61c4878aSAndroid Build Coastguard Worker   task.due_time_ = time_due;
132*61c4878aSAndroid Build Coastguard Worker   // Insert the new task in the queue after all tasks with the same or earlier
133*61c4878aSAndroid Build Coastguard Worker   // deadline to ensure FIFO execution order.
134*61c4878aSAndroid Build Coastguard Worker   auto it_front = task_queue_.begin();
135*61c4878aSAndroid Build Coastguard Worker   auto it_behind = task_queue_.before_begin();
136*61c4878aSAndroid Build Coastguard Worker   while (it_front != task_queue_.end() && time_due >= it_front->due_time_) {
137*61c4878aSAndroid Build Coastguard Worker     ++it_front;
138*61c4878aSAndroid Build Coastguard Worker     ++it_behind;
139*61c4878aSAndroid Build Coastguard Worker   }
140*61c4878aSAndroid Build Coastguard Worker   task_queue_.insert_after(it_behind, task);
141*61c4878aSAndroid Build Coastguard Worker   lock_.unlock();
142*61c4878aSAndroid Build Coastguard Worker   timed_notification_.release();
143*61c4878aSAndroid Build Coastguard Worker }
144*61c4878aSAndroid Build Coastguard Worker 
145*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::async
146