xref: /aosp_15_r20/external/pigweed/pw_async2/dispatcher_base.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_async2/dispatcher_base.h"
16 
17 #include <mutex>
18 
19 #include "pw_assert/check.h"
20 #include "pw_sync/lock_annotations.h"
21 
22 namespace pw::async2 {
23 
ReEnqueue()24 void Context::ReEnqueue() {
25   Waker waker;
26   waker_->InternalCloneInto(waker);
27   std::move(waker).Wake();
28 }
29 
InternalStoreWaker(Waker & waker_out)30 void Context::InternalStoreWaker(Waker& waker_out) {
31   waker_->InternalCloneInto(waker_out);
32 }
33 
RemoveAllWakersLocked()34 void Task::RemoveAllWakersLocked() {
35   while (wakers_ != nullptr) {
36     Waker* current = wakers_;
37     wakers_ = current->next_;
38     current->task_ = nullptr;
39     current->next_ = nullptr;
40   }
41 }
42 
AddWakerLocked(Waker & waker)43 void Task::AddWakerLocked(Waker& waker) {
44   waker.task_ = this;
45   waker.next_ = wakers_;
46   wakers_ = &waker;
47 }
48 
RemoveWakerLocked(Waker & waker)49 void Task::RemoveWakerLocked(Waker& waker) {
50   if (&waker == wakers_) {
51     wakers_ = wakers_->next_;
52   } else {
53     Waker* current = wakers_;
54     while (current->next_ != &waker) {
55       current = current->next_;
56     }
57     current->next_ = current->next_->next_;
58   }
59   waker.task_ = nullptr;
60   waker.next_ = nullptr;
61 }
62 
IsRegistered() const63 bool Task::IsRegistered() const {
64   std::lock_guard lock(dispatcher_lock());
65   return state_ != Task::State::kUnposted;
66 }
67 
Deregister()68 void Task::Deregister() {
69   pw::sync::Mutex* task_execution_lock;
70   {
71     // Fast path: the task is not running.
72     std::lock_guard lock(dispatcher_lock());
73     if (TryDeregister()) {
74       return;
75     }
76     // The task was running, so we have to wait for the task to stop being
77     // run by acquiring the `task_lock`.
78     task_execution_lock = &dispatcher_->task_execution_lock_;
79   }
80 
81   // NOTE: there is a race here where `task_execution_lock_` may be
82   // invalidated by concurrent destruction of the dispatcher.
83   //
84   // This restriction is documented above, but is still fairly footgun-y.
85   std::lock_guard task_lock(*task_execution_lock);
86   std::lock_guard lock(dispatcher_lock());
87   PW_CHECK(TryDeregister());
88 }
89 
TryDeregister()90 bool Task::TryDeregister() {
91   switch (state_) {
92     case Task::State::kUnposted:
93       return true;
94     case Task::State::kSleeping:
95       dispatcher_->RemoveSleepingTaskLocked(*this);
96       break;
97     case Task::State::kRunning:
98       return false;
99     case Task::State::kWoken:
100       dispatcher_->RemoveWokenTaskLocked(*this);
101       break;
102   }
103   state_ = Task::State::kUnposted;
104   RemoveAllWakersLocked();
105 
106   // Wake the dispatcher up if this was the last task so that it can see that
107   // all tasks have completed.
108   if (dispatcher_->first_woken_ == nullptr &&
109       dispatcher_->sleeping_ == nullptr && dispatcher_->wants_wake_) {
110     dispatcher_->DoWake();
111   }
112   dispatcher_ = nullptr;
113   return true;
114 }
115 
Waker(Waker && other)116 Waker::Waker(Waker&& other) noexcept {
117   std::lock_guard lock(dispatcher_lock());
118   if (other.task_ == nullptr) {
119     return;
120   }
121   Task& task = *other.task_;
122   task.RemoveWakerLocked(other);
123   task.AddWakerLocked(*this);
124 }
125 
operator =(Waker && other)126 Waker& Waker::operator=(Waker&& other) noexcept {
127   std::lock_guard lock(dispatcher_lock());
128   RemoveFromTaskWakerListLocked();
129   if (other.task_ == nullptr) {
130     return *this;
131   }
132   Task& task = *other.task_;
133   task.RemoveWakerLocked(other);
134   task.AddWakerLocked(*this);
135   return *this;
136 }
137 
Wake()138 void Waker::Wake() && {
139   std::lock_guard lock(dispatcher_lock());
140   if (task_ != nullptr) {
141     task_->dispatcher_->WakeTask(*task_);
142     RemoveFromTaskWakerListLocked();
143   }
144 }
145 
InternalCloneInto(Waker & out)146 void Waker::InternalCloneInto(Waker& out) & {
147   std::lock_guard lock(dispatcher_lock());
148   // The `out` waker already points to this task, so no work is necessary.
149   if (out.task_ == task_) {
150     return;
151   }
152   // Remove the output waker from its existing task's list.
153   out.RemoveFromTaskWakerListLocked();
154   out.task_ = task_;
155   // Only add if the waker being cloned is actually associated with a task.
156   if (task_ != nullptr) {
157     task_->AddWakerLocked(out);
158   }
159 }
160 
IsEmpty() const161 bool Waker::IsEmpty() const {
162   std::lock_guard lock(dispatcher_lock());
163   return task_ == nullptr;
164 }
165 
InsertIntoTaskWakerList()166 void Waker::InsertIntoTaskWakerList() {
167   std::lock_guard lock(dispatcher_lock());
168   InsertIntoTaskWakerListLocked();
169 }
170 
InsertIntoTaskWakerListLocked()171 void Waker::InsertIntoTaskWakerListLocked() {
172   if (task_ != nullptr) {
173     task_->AddWakerLocked(*this);
174   }
175 }
176 
RemoveFromTaskWakerList()177 void Waker::RemoveFromTaskWakerList() {
178   std::lock_guard lock(dispatcher_lock());
179   RemoveFromTaskWakerListLocked();
180 }
181 
RemoveFromTaskWakerListLocked()182 void Waker::RemoveFromTaskWakerListLocked() {
183   if (task_ != nullptr) {
184     task_->RemoveWakerLocked(*this);
185   }
186 }
187 
Deregister()188 void NativeDispatcherBase::Deregister() {
189   std::lock_guard lock(dispatcher_lock());
190   UnpostTaskList(first_woken_);
191   first_woken_ = nullptr;
192   last_woken_ = nullptr;
193   UnpostTaskList(sleeping_);
194   sleeping_ = nullptr;
195 }
196 
Post(Task & task)197 void NativeDispatcherBase::Post(Task& task) {
198   bool wake_dispatcher = false;
199   {
200     std::lock_guard lock(dispatcher_lock());
201     PW_DASSERT(task.state_ == Task::State::kUnposted);
202     PW_DASSERT(task.dispatcher_ == nullptr);
203     task.state_ = Task::State::kWoken;
204     task.dispatcher_ = this;
205     AddTaskToWokenList(task);
206     if (wants_wake_) {
207       wake_dispatcher = true;
208       wants_wake_ = false;
209     }
210   }
211   // Note: unlike in ``WakeTask``, here we know that the ``Dispatcher`` will
212   // not be destroyed out from under our feet because we're in a method being
213   // called on the ``Dispatcher`` by a user.
214   if (wake_dispatcher) {
215     DoWake();
216   }
217 }
218 
AttemptRequestWake(bool allow_empty)219 NativeDispatcherBase::SleepInfo NativeDispatcherBase::AttemptRequestWake(
220     bool allow_empty) {
221   std::lock_guard lock(dispatcher_lock());
222   // Don't allow sleeping if there are already tasks waiting to be run.
223   if (first_woken_ != nullptr) {
224     return SleepInfo::DontSleep();
225   }
226   if (!allow_empty && sleeping_ == nullptr) {
227     return SleepInfo::DontSleep();
228   }
229   /// Indicate that the ``Dispatcher`` is sleeping and will need a ``DoWake``
230   /// call once more work can be done.
231   wants_wake_ = true;
232   // Once timers are added, this should check them.
233   return SleepInfo::Indefinitely();
234 }
235 
RunOneTask(Dispatcher & dispatcher,Task * task_to_look_for)236 NativeDispatcherBase::RunOneTaskResult NativeDispatcherBase::RunOneTask(
237     Dispatcher& dispatcher, Task* task_to_look_for) {
238   std::lock_guard task_lock(task_execution_lock_);
239   Task* task;
240   {
241     std::lock_guard lock(dispatcher_lock());
242     task = PopWokenTask();
243     if (task == nullptr) {
244       bool all_complete = first_woken_ == nullptr && sleeping_ == nullptr;
245       return RunOneTaskResult(
246           /*completed_all_tasks=*/all_complete,
247           /*completed_main_task=*/false,
248           /*ran_a_task=*/false);
249     }
250     task->state_ = Task::State::kRunning;
251   }
252 
253   bool complete;
254   {
255     Waker waker(*task);
256     Context context(dispatcher, waker);
257     complete = task->Pend(context).IsReady();
258   }
259   if (complete) {
260     bool all_complete;
261     {
262       std::lock_guard lock(dispatcher_lock());
263       switch (task->state_) {
264         case Task::State::kUnposted:
265         case Task::State::kSleeping:
266           PW_DASSERT(false);
267           PW_UNREACHABLE;
268         case Task::State::kRunning:
269           break;
270         case Task::State::kWoken:
271           RemoveWokenTaskLocked(*task);
272           break;
273       }
274       task->state_ = Task::State::kUnposted;
275       task->dispatcher_ = nullptr;
276       task->RemoveAllWakersLocked();
277       all_complete = first_woken_ == nullptr && sleeping_ == nullptr;
278     }
279     task->DoDestroy();
280     return RunOneTaskResult(
281         /*completed_all_tasks=*/all_complete,
282         /*completed_main_task=*/task == task_to_look_for,
283         /*ran_a_task=*/true);
284   } else {
285     std::lock_guard lock(dispatcher_lock());
286     if (task->state_ == Task::State::kRunning) {
287       task->state_ = Task::State::kSleeping;
288       AddTaskToSleepingList(*task);
289     }
290     return RunOneTaskResult(
291         /*completed_all_tasks=*/false,
292         /*completed_main_task=*/false,
293         /*ran_a_task=*/true);
294   }
295 }
296 
UnpostTaskList(Task * task)297 void NativeDispatcherBase::UnpostTaskList(Task* task) {
298   while (task != nullptr) {
299     task->state_ = Task::State::kUnposted;
300     task->dispatcher_ = nullptr;
301     task->prev_ = nullptr;
302     Task* next = task->next_;
303     task->next_ = nullptr;
304     task->RemoveAllWakersLocked();
305     task = next;
306   }
307 }
308 
RemoveTaskFromList(Task & task)309 void NativeDispatcherBase::RemoveTaskFromList(Task& task) {
310   if (task.prev_ != nullptr) {
311     task.prev_->next_ = task.next_;
312   }
313   if (task.next_ != nullptr) {
314     task.next_->prev_ = task.prev_;
315   }
316   task.prev_ = nullptr;
317   task.next_ = nullptr;
318 }
319 
RemoveWokenTaskLocked(Task & task)320 void NativeDispatcherBase::RemoveWokenTaskLocked(Task& task) {
321   if (first_woken_ == &task) {
322     first_woken_ = task.next_;
323   }
324   if (last_woken_ == &task) {
325     last_woken_ = task.prev_;
326   }
327   RemoveTaskFromList(task);
328 }
329 
RemoveSleepingTaskLocked(Task & task)330 void NativeDispatcherBase::RemoveSleepingTaskLocked(Task& task) {
331   if (sleeping_ == &task) {
332     sleeping_ = task.next_;
333   }
334   RemoveTaskFromList(task);
335 }
336 
AddTaskToWokenList(Task & task)337 void NativeDispatcherBase::AddTaskToWokenList(Task& task) {
338   if (first_woken_ == nullptr) {
339     first_woken_ = &task;
340   } else {
341     last_woken_->next_ = &task;
342     task.prev_ = last_woken_;
343   }
344   last_woken_ = &task;
345 }
346 
AddTaskToSleepingList(Task & task)347 void NativeDispatcherBase::AddTaskToSleepingList(Task& task) {
348   if (sleeping_ != nullptr) {
349     sleeping_->prev_ = &task;
350   }
351   task.next_ = sleeping_;
352   sleeping_ = &task;
353 }
354 
WakeTask(Task & task)355 void NativeDispatcherBase::WakeTask(Task& task) {
356   switch (task.state_) {
357     case Task::State::kWoken:
358       // Do nothing-- this has already been woken.
359       return;
360     case Task::State::kUnposted:
361       // This should be unreachable.
362       PW_CHECK(false);
363     case Task::State::kRunning:
364       // Wake again to indicate that this task should be run once more,
365       // as the state of the world may have changed since the task
366       // started running.
367       break;
368     case Task::State::kSleeping:
369       RemoveSleepingTaskLocked(task);
370       // Wake away!
371       break;
372   }
373   task.state_ = Task::State::kWoken;
374   AddTaskToWokenList(task);
375   if (wants_wake_) {
376     // Note: it's quite annoying to make this call under the lock, as it can
377     // result in extra thread wakeup/sleep cycles.
378     //
379     // However, releasing the lock first would allow for the possibility that
380     // the ``Dispatcher`` has been destroyed, making the call invalid.
381     DoWake();
382   }
383 }
384 
PopWokenTask()385 Task* NativeDispatcherBase::PopWokenTask() {
386   if (first_woken_ == nullptr) {
387     return nullptr;
388   }
389   Task& task = *first_woken_;
390   if (task.next_ != nullptr) {
391     task.next_->prev_ = nullptr;
392   } else {
393     last_woken_ = nullptr;
394   }
395   first_woken_ = task.next_;
396   task.prev_ = nullptr;
397   task.next_ = nullptr;
398   return &task;
399 }
400 
401 }  // namespace pw::async2
402