1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_async_fuchsia/fake_dispatcher.h"
16
17 #include "pw_async_fuchsia/util.h"
18
19 namespace pw::async::test::backend {
20
NativeFakeDispatcher(Dispatcher & test_dispatcher)21 NativeFakeDispatcher::NativeFakeDispatcher(Dispatcher& test_dispatcher)
22 : dispatcher_(test_dispatcher) {}
23
DestroyLoop()24 bool NativeFakeDispatcher::DestroyLoop() { return fake_loop_.Shutdown(); }
25
now()26 chrono::SystemClock::time_point NativeFakeDispatcher::now() {
27 return fake_loop_.Now();
28 }
29
Post(Task & task)30 void NativeFakeDispatcher::Post(Task& task) { PostAt(task, now()); }
31
PostAfter(Task & task,chrono::SystemClock::duration delay)32 void NativeFakeDispatcher::PostAfter(Task& task,
33 chrono::SystemClock::duration delay) {
34 PostAt(task, now() + delay);
35 }
36
PostAt(Task & task,chrono::SystemClock::time_point time)37 void NativeFakeDispatcher::PostAt(Task& task,
38 chrono::SystemClock::time_point time) {
39 // TODO: https://fxbug.dev/42075952 - Return errors once these methods return
40 // a Status.
41 if (!fake_loop_.Runnable()) {
42 Context ctx{.dispatcher = &dispatcher_, .task = &task};
43 task(ctx, Status::Cancelled());
44 return;
45 }
46 ::pw::async::backend::NativeTask& native_task = task.native_type();
47 native_task.set_due_time(time);
48 native_task.dispatcher_ = &dispatcher_;
49 fake_loop_.PostTask(&native_task);
50 }
51
Cancel(Task & task)52 bool NativeFakeDispatcher::Cancel(Task& task) {
53 return fake_loop_.Runnable() &&
54 fake_loop_.CancelTask(&task.native_type()) == ZX_OK;
55 }
56
RunUntilIdle()57 bool NativeFakeDispatcher::RunUntilIdle() {
58 if (stop_requested_) {
59 return DestroyLoop();
60 }
61 return fake_loop_.RunUntilIdle();
62 }
63
RunUntil(chrono::SystemClock::time_point end_time)64 bool NativeFakeDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
65 if (stop_requested_) {
66 return DestroyLoop();
67 }
68 return fake_loop_.Run(pw::async_fuchsia::TimepointToZxTime(end_time).get(),
69 false);
70 }
71
RunFor(chrono::SystemClock::duration duration)72 bool NativeFakeDispatcher::RunFor(chrono::SystemClock::duration duration) {
73 return RunUntil(now() + duration);
74 }
75
FakeAsyncLoop()76 NativeFakeDispatcher::FakeAsyncLoop::FakeAsyncLoop() {
77 list_initialize(&task_list_);
78 list_initialize(&due_list_);
79 }
80
Now() const81 chrono::SystemClock::time_point NativeFakeDispatcher::FakeAsyncLoop::Now()
82 const {
83 return pw::async_fuchsia::ZxTimeToTimepoint(zx::time{now_});
84 }
85
PostTask(async_task_t * task)86 zx_status_t NativeFakeDispatcher::FakeAsyncLoop::PostTask(async_task_t* task) {
87 if (state_ == ASYNC_LOOP_SHUTDOWN) {
88 return ZX_ERR_BAD_STATE;
89 }
90
91 InsertTask(task);
92 if (!dispatching_tasks_ && TaskToNode(task)->prev == &task_list_) {
93 // Task inserted at head. Earliest deadline changed.
94 RestartTimer();
95 }
96
97 return ZX_OK;
98 }
99
CancelTask(async_task_t * task)100 zx_status_t NativeFakeDispatcher::FakeAsyncLoop::CancelTask(
101 async_task_t* task) {
102 // Note: We need to process cancellations even while the loop is being
103 // destroyed in case the client is counting on the handler not being
104 // invoked again past this point. Also, the task we're removing here
105 // might be present in the dispatcher's |due_list| if it is pending
106 // dispatch instead of in the loop's |task_list| as usual. The same
107 // logic works in both cases.
108
109 list_node_t* node = TaskToNode(task);
110 if (!list_in_list(node)) {
111 return ZX_ERR_NOT_FOUND;
112 }
113
114 // Determine whether the head task was canceled and following task has
115 // a later deadline. If so, we will bump the timer along to that deadline.
116 bool must_restart = !dispatching_tasks_ && node->prev == &task_list_ &&
117 (node->next == &task_list_ ||
118 NodeToTask(node->next)->deadline > task->deadline);
119 list_delete(node);
120 if (must_restart) {
121 RestartTimer();
122 }
123
124 return ZX_OK;
125 }
126
RunUntilIdle()127 bool NativeFakeDispatcher::FakeAsyncLoop::RunUntilIdle() {
128 return Run(now_, false);
129 }
130
Run(zx_time_t deadline,bool once)131 bool NativeFakeDispatcher::FakeAsyncLoop::Run(zx_time_t deadline, bool once) {
132 zx_status_t status;
133 bool task_invoked = false;
134 do {
135 status = RunOnce(deadline, &task_invoked);
136 } while (status == ZX_OK && !once);
137 return task_invoked;
138 }
139
InsertTask(async_task_t * task)140 void NativeFakeDispatcher::FakeAsyncLoop::InsertTask(async_task_t* task) {
141 list_node_t* node;
142 for (node = task_list_.prev; node != &task_list_; node = node->prev) {
143 if (task->deadline >= NodeToTask(node)->deadline) {
144 break;
145 }
146 }
147 list_add_after(node, TaskToNode(task));
148 }
149
RestartTimer()150 void NativeFakeDispatcher::FakeAsyncLoop::RestartTimer() {
151 zx_time_t deadline = NextDeadline();
152
153 if (deadline == ZX_TIME_INFINITE) {
154 // Nothing is left on the queue to fire.
155 if (timer_armed_) {
156 next_timer_expiration_ =
157 ZX_TIME_INFINITE; // Simulate timer cancellation.
158 timer_armed_ = false;
159 }
160 return;
161 }
162
163 next_timer_expiration_ = deadline;
164
165 if (!timer_armed_) {
166 timer_armed_ = true;
167 }
168 }
169
NextDeadline()170 zx_time_t NativeFakeDispatcher::FakeAsyncLoop::NextDeadline() {
171 if (list_is_empty(&due_list_)) {
172 list_node_t* head = list_peek_head(&task_list_);
173 if (!head) {
174 return ZX_TIME_INFINITE;
175 }
176 async_task_t* task = NodeToTask(head);
177 return task->deadline;
178 }
179 // Fire now.
180 return 0ULL;
181 }
182
RunOnce(zx_time_t deadline,bool * task_invoked)183 zx_status_t NativeFakeDispatcher::FakeAsyncLoop::RunOnce(zx_time_t deadline,
184 bool* task_invoked) {
185 if (state_ == ASYNC_LOOP_SHUTDOWN) {
186 return ZX_ERR_BAD_STATE;
187 }
188 if (state_ != ASYNC_LOOP_RUNNABLE) {
189 return ZX_ERR_CANCELED;
190 }
191
192 // Simulate timeout of zx_port_wait() syscall.
193 if (deadline < next_timer_expiration_) {
194 now_ = deadline;
195 return ZX_ERR_TIMED_OUT;
196 }
197 // Otherwise, a timer would have expired at or before `deadline`.
198 now_ = next_timer_expiration_;
199 next_timer_expiration_ = ZX_TIME_INFINITE;
200 *task_invoked |= DispatchTasks();
201 return ZX_OK;
202 }
203
DispatchTasks()204 bool NativeFakeDispatcher::FakeAsyncLoop::DispatchTasks() {
205 bool task_invoked = false;
206 // Dequeue and dispatch one task at a time in case an earlier task wants
207 // to cancel a later task which has also come due. Timer restarts are
208 // suppressed until we run out of tasks to dispatch.
209 if (!dispatching_tasks_) {
210 dispatching_tasks_ = true;
211
212 // Extract all of the tasks that are due into |due_list| for dispatch
213 // unless we already have some waiting from a previous iteration which
214 // we would like to process in order.
215 list_node_t* node;
216 if (list_is_empty(&due_list_)) {
217 zx_time_t due_time = now_;
218 list_node_t* tail = nullptr;
219 list_for_every(&task_list_, node) {
220 if (NodeToTask(node)->deadline > due_time) {
221 break;
222 }
223 tail = node;
224 }
225 if (tail) {
226 list_node_t* head = task_list_.next;
227 task_list_.next = tail->next;
228 tail->next->prev = &task_list_;
229 due_list_.next = head;
230 head->prev = &due_list_;
231 due_list_.prev = tail;
232 tail->next = &due_list_;
233 }
234 }
235
236 // Dispatch all due tasks.
237 while ((node = list_remove_head(&due_list_))) {
238 // Invoke the handler. Note that it might destroy itself.
239 async_task_t* task = NodeToTask(node);
240
241 task->handler(nullptr, task, ZX_OK);
242 task_invoked = true;
243
244 if (state_ != ASYNC_LOOP_RUNNABLE) {
245 break;
246 }
247 }
248
249 dispatching_tasks_ = false;
250 timer_armed_ = false;
251 RestartTimer();
252 }
253
254 return task_invoked;
255 }
256
Shutdown()257 bool NativeFakeDispatcher::FakeAsyncLoop::Shutdown() {
258 if (state_ == ASYNC_LOOP_SHUTDOWN) {
259 return false;
260 }
261 state_ = ASYNC_LOOP_SHUTDOWN;
262
263 // Cancel any remaining pending tasks on our queues.
264 return CancelAll();
265 }
266
CancelAll()267 bool NativeFakeDispatcher::FakeAsyncLoop::CancelAll() {
268 ZX_DEBUG_ASSERT(state_ == ASYNC_LOOP_SHUTDOWN);
269 bool task_invoked = false;
270
271 list_node_t* node;
272
273 while ((node = list_remove_head(&due_list_))) {
274 async_task_t* task = NodeToTask(node);
275 task->handler(nullptr, task, ZX_ERR_CANCELED);
276 task_invoked = true;
277 }
278 while ((node = list_remove_head(&task_list_))) {
279 async_task_t* task = NodeToTask(node);
280 task->handler(nullptr, task, ZX_ERR_CANCELED);
281 task_invoked = true;
282 }
283 return task_invoked;
284 }
285
286 } // namespace pw::async::test::backend
287