1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_async2/dispatcher_base.h"
16
17 #include <mutex>
18
19 #include "pw_assert/check.h"
20 #include "pw_sync/lock_annotations.h"
21
22 namespace pw::async2 {
23
ReEnqueue()24 void Context::ReEnqueue() {
25 Waker waker;
26 waker_->InternalCloneInto(waker);
27 std::move(waker).Wake();
28 }
29
InternalStoreWaker(Waker & waker_out)30 void Context::InternalStoreWaker(Waker& waker_out) {
31 waker_->InternalCloneInto(waker_out);
32 }
33
RemoveAllWakersLocked()34 void Task::RemoveAllWakersLocked() {
35 while (wakers_ != nullptr) {
36 Waker* current = wakers_;
37 wakers_ = current->next_;
38 current->task_ = nullptr;
39 current->next_ = nullptr;
40 }
41 }
42
AddWakerLocked(Waker & waker)43 void Task::AddWakerLocked(Waker& waker) {
44 waker.task_ = this;
45 waker.next_ = wakers_;
46 wakers_ = &waker;
47 }
48
RemoveWakerLocked(Waker & waker)49 void Task::RemoveWakerLocked(Waker& waker) {
50 if (&waker == wakers_) {
51 wakers_ = wakers_->next_;
52 } else {
53 Waker* current = wakers_;
54 while (current->next_ != &waker) {
55 current = current->next_;
56 }
57 current->next_ = current->next_->next_;
58 }
59 waker.task_ = nullptr;
60 waker.next_ = nullptr;
61 }
62
IsRegistered() const63 bool Task::IsRegistered() const {
64 std::lock_guard lock(dispatcher_lock());
65 return state_ != Task::State::kUnposted;
66 }
67
Deregister()68 void Task::Deregister() {
69 pw::sync::Mutex* task_execution_lock;
70 {
71 // Fast path: the task is not running.
72 std::lock_guard lock(dispatcher_lock());
73 if (TryDeregister()) {
74 return;
75 }
76 // The task was running, so we have to wait for the task to stop being
77 // run by acquiring the `task_lock`.
78 task_execution_lock = &dispatcher_->task_execution_lock_;
79 }
80
81 // NOTE: there is a race here where `task_execution_lock_` may be
82 // invalidated by concurrent destruction of the dispatcher.
83 //
84 // This restriction is documented above, but is still fairly footgun-y.
85 std::lock_guard task_lock(*task_execution_lock);
86 std::lock_guard lock(dispatcher_lock());
87 PW_CHECK(TryDeregister());
88 }
89
TryDeregister()90 bool Task::TryDeregister() {
91 switch (state_) {
92 case Task::State::kUnposted:
93 return true;
94 case Task::State::kSleeping:
95 dispatcher_->RemoveSleepingTaskLocked(*this);
96 break;
97 case Task::State::kRunning:
98 return false;
99 case Task::State::kWoken:
100 dispatcher_->RemoveWokenTaskLocked(*this);
101 break;
102 }
103 state_ = Task::State::kUnposted;
104 RemoveAllWakersLocked();
105
106 // Wake the dispatcher up if this was the last task so that it can see that
107 // all tasks have completed.
108 if (dispatcher_->first_woken_ == nullptr &&
109 dispatcher_->sleeping_ == nullptr && dispatcher_->wants_wake_) {
110 dispatcher_->DoWake();
111 }
112 dispatcher_ = nullptr;
113 return true;
114 }
115
Waker(Waker && other)116 Waker::Waker(Waker&& other) noexcept {
117 std::lock_guard lock(dispatcher_lock());
118 if (other.task_ == nullptr) {
119 return;
120 }
121 Task& task = *other.task_;
122 task.RemoveWakerLocked(other);
123 task.AddWakerLocked(*this);
124 }
125
operator =(Waker && other)126 Waker& Waker::operator=(Waker&& other) noexcept {
127 std::lock_guard lock(dispatcher_lock());
128 RemoveFromTaskWakerListLocked();
129 if (other.task_ == nullptr) {
130 return *this;
131 }
132 Task& task = *other.task_;
133 task.RemoveWakerLocked(other);
134 task.AddWakerLocked(*this);
135 return *this;
136 }
137
Wake()138 void Waker::Wake() && {
139 std::lock_guard lock(dispatcher_lock());
140 if (task_ != nullptr) {
141 task_->dispatcher_->WakeTask(*task_);
142 RemoveFromTaskWakerListLocked();
143 }
144 }
145
InternalCloneInto(Waker & out)146 void Waker::InternalCloneInto(Waker& out) & {
147 std::lock_guard lock(dispatcher_lock());
148 // The `out` waker already points to this task, so no work is necessary.
149 if (out.task_ == task_) {
150 return;
151 }
152 // Remove the output waker from its existing task's list.
153 out.RemoveFromTaskWakerListLocked();
154 out.task_ = task_;
155 // Only add if the waker being cloned is actually associated with a task.
156 if (task_ != nullptr) {
157 task_->AddWakerLocked(out);
158 }
159 }
160
IsEmpty() const161 bool Waker::IsEmpty() const {
162 std::lock_guard lock(dispatcher_lock());
163 return task_ == nullptr;
164 }
165
InsertIntoTaskWakerList()166 void Waker::InsertIntoTaskWakerList() {
167 std::lock_guard lock(dispatcher_lock());
168 InsertIntoTaskWakerListLocked();
169 }
170
InsertIntoTaskWakerListLocked()171 void Waker::InsertIntoTaskWakerListLocked() {
172 if (task_ != nullptr) {
173 task_->AddWakerLocked(*this);
174 }
175 }
176
RemoveFromTaskWakerList()177 void Waker::RemoveFromTaskWakerList() {
178 std::lock_guard lock(dispatcher_lock());
179 RemoveFromTaskWakerListLocked();
180 }
181
RemoveFromTaskWakerListLocked()182 void Waker::RemoveFromTaskWakerListLocked() {
183 if (task_ != nullptr) {
184 task_->RemoveWakerLocked(*this);
185 }
186 }
187
Deregister()188 void NativeDispatcherBase::Deregister() {
189 std::lock_guard lock(dispatcher_lock());
190 UnpostTaskList(first_woken_);
191 first_woken_ = nullptr;
192 last_woken_ = nullptr;
193 UnpostTaskList(sleeping_);
194 sleeping_ = nullptr;
195 }
196
Post(Task & task)197 void NativeDispatcherBase::Post(Task& task) {
198 bool wake_dispatcher = false;
199 {
200 std::lock_guard lock(dispatcher_lock());
201 PW_DASSERT(task.state_ == Task::State::kUnposted);
202 PW_DASSERT(task.dispatcher_ == nullptr);
203 task.state_ = Task::State::kWoken;
204 task.dispatcher_ = this;
205 AddTaskToWokenList(task);
206 if (wants_wake_) {
207 wake_dispatcher = true;
208 wants_wake_ = false;
209 }
210 }
211 // Note: unlike in ``WakeTask``, here we know that the ``Dispatcher`` will
212 // not be destroyed out from under our feet because we're in a method being
213 // called on the ``Dispatcher`` by a user.
214 if (wake_dispatcher) {
215 DoWake();
216 }
217 }
218
AttemptRequestWake(bool allow_empty)219 NativeDispatcherBase::SleepInfo NativeDispatcherBase::AttemptRequestWake(
220 bool allow_empty) {
221 std::lock_guard lock(dispatcher_lock());
222 // Don't allow sleeping if there are already tasks waiting to be run.
223 if (first_woken_ != nullptr) {
224 return SleepInfo::DontSleep();
225 }
226 if (!allow_empty && sleeping_ == nullptr) {
227 return SleepInfo::DontSleep();
228 }
229 /// Indicate that the ``Dispatcher`` is sleeping and will need a ``DoWake``
230 /// call once more work can be done.
231 wants_wake_ = true;
232 // Once timers are added, this should check them.
233 return SleepInfo::Indefinitely();
234 }
235
RunOneTask(Dispatcher & dispatcher,Task * task_to_look_for)236 NativeDispatcherBase::RunOneTaskResult NativeDispatcherBase::RunOneTask(
237 Dispatcher& dispatcher, Task* task_to_look_for) {
238 std::lock_guard task_lock(task_execution_lock_);
239 Task* task;
240 {
241 std::lock_guard lock(dispatcher_lock());
242 task = PopWokenTask();
243 if (task == nullptr) {
244 bool all_complete = first_woken_ == nullptr && sleeping_ == nullptr;
245 return RunOneTaskResult(
246 /*completed_all_tasks=*/all_complete,
247 /*completed_main_task=*/false,
248 /*ran_a_task=*/false);
249 }
250 task->state_ = Task::State::kRunning;
251 }
252
253 bool complete;
254 {
255 Waker waker(*task);
256 Context context(dispatcher, waker);
257 complete = task->Pend(context).IsReady();
258 }
259 if (complete) {
260 bool all_complete;
261 {
262 std::lock_guard lock(dispatcher_lock());
263 switch (task->state_) {
264 case Task::State::kUnposted:
265 case Task::State::kSleeping:
266 PW_DASSERT(false);
267 PW_UNREACHABLE;
268 case Task::State::kRunning:
269 break;
270 case Task::State::kWoken:
271 RemoveWokenTaskLocked(*task);
272 break;
273 }
274 task->state_ = Task::State::kUnposted;
275 task->dispatcher_ = nullptr;
276 task->RemoveAllWakersLocked();
277 all_complete = first_woken_ == nullptr && sleeping_ == nullptr;
278 }
279 task->DoDestroy();
280 return RunOneTaskResult(
281 /*completed_all_tasks=*/all_complete,
282 /*completed_main_task=*/task == task_to_look_for,
283 /*ran_a_task=*/true);
284 } else {
285 std::lock_guard lock(dispatcher_lock());
286 if (task->state_ == Task::State::kRunning) {
287 task->state_ = Task::State::kSleeping;
288 AddTaskToSleepingList(*task);
289 }
290 return RunOneTaskResult(
291 /*completed_all_tasks=*/false,
292 /*completed_main_task=*/false,
293 /*ran_a_task=*/true);
294 }
295 }
296
UnpostTaskList(Task * task)297 void NativeDispatcherBase::UnpostTaskList(Task* task) {
298 while (task != nullptr) {
299 task->state_ = Task::State::kUnposted;
300 task->dispatcher_ = nullptr;
301 task->prev_ = nullptr;
302 Task* next = task->next_;
303 task->next_ = nullptr;
304 task->RemoveAllWakersLocked();
305 task = next;
306 }
307 }
308
RemoveTaskFromList(Task & task)309 void NativeDispatcherBase::RemoveTaskFromList(Task& task) {
310 if (task.prev_ != nullptr) {
311 task.prev_->next_ = task.next_;
312 }
313 if (task.next_ != nullptr) {
314 task.next_->prev_ = task.prev_;
315 }
316 task.prev_ = nullptr;
317 task.next_ = nullptr;
318 }
319
RemoveWokenTaskLocked(Task & task)320 void NativeDispatcherBase::RemoveWokenTaskLocked(Task& task) {
321 if (first_woken_ == &task) {
322 first_woken_ = task.next_;
323 }
324 if (last_woken_ == &task) {
325 last_woken_ = task.prev_;
326 }
327 RemoveTaskFromList(task);
328 }
329
RemoveSleepingTaskLocked(Task & task)330 void NativeDispatcherBase::RemoveSleepingTaskLocked(Task& task) {
331 if (sleeping_ == &task) {
332 sleeping_ = task.next_;
333 }
334 RemoveTaskFromList(task);
335 }
336
AddTaskToWokenList(Task & task)337 void NativeDispatcherBase::AddTaskToWokenList(Task& task) {
338 if (first_woken_ == nullptr) {
339 first_woken_ = &task;
340 } else {
341 last_woken_->next_ = &task;
342 task.prev_ = last_woken_;
343 }
344 last_woken_ = &task;
345 }
346
AddTaskToSleepingList(Task & task)347 void NativeDispatcherBase::AddTaskToSleepingList(Task& task) {
348 if (sleeping_ != nullptr) {
349 sleeping_->prev_ = &task;
350 }
351 task.next_ = sleeping_;
352 sleeping_ = &task;
353 }
354
WakeTask(Task & task)355 void NativeDispatcherBase::WakeTask(Task& task) {
356 switch (task.state_) {
357 case Task::State::kWoken:
358 // Do nothing-- this has already been woken.
359 return;
360 case Task::State::kUnposted:
361 // This should be unreachable.
362 PW_CHECK(false);
363 case Task::State::kRunning:
364 // Wake again to indicate that this task should be run once more,
365 // as the state of the world may have changed since the task
366 // started running.
367 break;
368 case Task::State::kSleeping:
369 RemoveSleepingTaskLocked(task);
370 // Wake away!
371 break;
372 }
373 task.state_ = Task::State::kWoken;
374 AddTaskToWokenList(task);
375 if (wants_wake_) {
376 // Note: it's quite annoying to make this call under the lock, as it can
377 // result in extra thread wakeup/sleep cycles.
378 //
379 // However, releasing the lock first would allow for the possibility that
380 // the ``Dispatcher`` has been destroyed, making the call invalid.
381 DoWake();
382 }
383 }
384
PopWokenTask()385 Task* NativeDispatcherBase::PopWokenTask() {
386 if (first_woken_ == nullptr) {
387 return nullptr;
388 }
389 Task& task = *first_woken_;
390 if (task.next_ != nullptr) {
391 task.next_->prev_ = nullptr;
392 } else {
393 last_woken_ = nullptr;
394 }
395 first_woken_ = task.next_;
396 task.prev_ = nullptr;
397 task.next_ = nullptr;
398 return &task;
399 }
400
401 } // namespace pw::async2
402