// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "pw_async_fuchsia/fake_dispatcher.h"

#include "pw_async_fuchsia/util.h"

namespace pw::async::test::backend {

NativeFakeDispatcher::NativeFakeDispatcher(Dispatcher& test_dispatcher)
    : dispatcher_(test_dispatcher) {}

bool NativeFakeDispatcher::DestroyLoop() { return fake_loop_.Shutdown(); }

chrono::SystemClock::time_point NativeFakeDispatcher::now() {
  return fake_loop_.Now();
}

void NativeFakeDispatcher::Post(Task& task) { PostAt(task, now()); }

void NativeFakeDispatcher::PostAfter(Task& task,
                                     chrono::SystemClock::duration delay) {
  PostAt(task, now() + delay);
}

void NativeFakeDispatcher::PostAt(Task& task,
                                  chrono::SystemClock::time_point time) {
  // TODO: https://fxbug.dev/42075952 - Return errors once these methods return
  // a Status.
  if (!fake_loop_.Runnable()) {
    Context ctx{.dispatcher = &dispatcher_, .task = &task};
    task(ctx, Status::Cancelled());
    return;
  }
  ::pw::async::backend::NativeTask& native_task = task.native_type();
  native_task.set_due_time(time);
  native_task.dispatcher_ = &dispatcher_;
  fake_loop_.PostTask(&native_task);
}

bool NativeFakeDispatcher::Cancel(Task& task) {
  return fake_loop_.Runnable() &&
         fake_loop_.CancelTask(&task.native_type()) == ZX_OK;
}

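// The Run* methods report whether any task handler ran. Once a stop has been
// requested, they instead tear down the fake loop, which cancels every task
// still queued.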
bool NativeFakeDispatcher::RunUntilIdle() {
  if (stop_requested_) {
    return DestroyLoop();
  }
  return fake_loop_.RunUntilIdle();
}

bool NativeFakeDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
  if (stop_requested_) {
    return DestroyLoop();
  }
  return fake_loop_.Run(pw::async_fuchsia::TimepointToZxTime(end_time).get(),
                        false);
}

bool NativeFakeDispatcher::RunFor(chrono::SystemClock::duration duration) {
  return RunUntil(now() + duration);
}

NativeFakeDispatcher::FakeAsyncLoop::FakeAsyncLoop() {
  list_initialize(&task_list_);
  list_initialize(&due_list_);
}

chrono::SystemClock::time_point NativeFakeDispatcher::FakeAsyncLoop::Now()
    const {
  return pw::async_fuchsia::ZxTimeToTimepoint(zx::time{now_});
}

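// Queues |task| in deadline order. Posting is refused once the loop has shut
// down; if the new task becomes the head of the queue while no dispatch is in
// progress, the simulated timer is restarted for its deadline.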
zx_status_t NativeFakeDispatcher::FakeAsyncLoop::PostTask(async_task_t* task) {
  if (state_ == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }

  InsertTask(task);
  if (!dispatching_tasks_ && TaskToNode(task)->prev == &task_list_) {
    // Task inserted at head.  Earliest deadline changed.
    RestartTimer();
  }

  return ZX_OK;
}

zx_status_t NativeFakeDispatcher::FakeAsyncLoop::CancelTask(
    async_task_t* task) {
  // Note: We need to process cancellations even while the loop is being
  // destroyed in case the client is counting on the handler not being
  // invoked again past this point.  Also, the task we're removing here
  // might be present in the dispatcher's |due_list| if it is pending
  // dispatch instead of in the loop's |task_list| as usual.  The same
  // logic works in both cases.

  list_node_t* node = TaskToNode(task);
  if (!list_in_list(node)) {
    return ZX_ERR_NOT_FOUND;
  }

  // Determine whether the head task was canceled and the following task has
  // a later deadline.  If so, we will bump the timer along to that deadline.
  bool must_restart = !dispatching_tasks_ && node->prev == &task_list_ &&
                      (node->next == &task_list_ ||
                       NodeToTask(node->next)->deadline > task->deadline);
  list_delete(node);
  if (must_restart) {
    RestartTimer();
  }

  return ZX_OK;
}

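// Dispatches any tasks that are already due, without advancing the fake clock
// past the current time.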
bool NativeFakeDispatcher::FakeAsyncLoop::RunUntilIdle() {
  return Run(now_, false);
}

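// Repeatedly performs simulated wait-and-dispatch iterations until |deadline|
// is reached, the loop stops being runnable, or, when |once| is set, after a
// single iteration. Returns true if at least one task handler was invoked.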
bool NativeFakeDispatcher::FakeAsyncLoop::Run(zx_time_t deadline, bool once) {
  zx_status_t status;
  bool task_invoked = false;
  do {
    status = RunOnce(deadline, &task_invoked);
  } while (status == ZX_OK && !once);
  return task_invoked;
}

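// Inserts |task| into |task_list_|, keeping the list sorted by deadline.
// Scanning backwards from the tail preserves FIFO order among tasks that
// share a deadline.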
void NativeFakeDispatcher::FakeAsyncLoop::InsertTask(async_task_t* task) {
  list_node_t* node;
  for (node = task_list_.prev; node != &task_list_; node = node->prev) {
    if (task->deadline >= NodeToTask(node)->deadline) {
      break;
    }
  }
  list_add_after(node, TaskToNode(task));
}

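// Simulates (re)arming the timer for the earliest pending deadline, or
// cancelling it when nothing remains queued.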
void NativeFakeDispatcher::FakeAsyncLoop::RestartTimer() {
  zx_time_t deadline = NextDeadline();

  if (deadline == ZX_TIME_INFINITE) {
    // Nothing is left on the queue to fire.
    if (timer_armed_) {
      next_timer_expiration_ =
          ZX_TIME_INFINITE;  // Simulate timer cancellation.
      timer_armed_ = false;
    }
    return;
  }

  next_timer_expiration_ = deadline;

  if (!timer_armed_) {
    timer_armed_ = true;
  }
}

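// Returns the time at which the simulated timer should next fire: 0 if tasks
// are already waiting in |due_list_|, ZX_TIME_INFINITE if nothing is queued,
// and the deadline at the head of |task_list_| otherwise.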
zx_time_t NativeFakeDispatcher::FakeAsyncLoop::NextDeadline() {
  if (list_is_empty(&due_list_)) {
    list_node_t* head = list_peek_head(&task_list_);
    if (!head) {
      return ZX_TIME_INFINITE;
    }
    async_task_t* task = NodeToTask(head);
    return task->deadline;
  }
  // Fire now.
  return 0ULL;
}

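// Simulates a single port-wait iteration: advances the fake clock to the next
// timer expiration and dispatches due tasks, or reports ZX_ERR_TIMED_OUT if
// |deadline| arrives first.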
zx_status_t NativeFakeDispatcher::FakeAsyncLoop::RunOnce(zx_time_t deadline,
                                                         bool* task_invoked) {
  if (state_ == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }
  if (state_ != ASYNC_LOOP_RUNNABLE) {
    return ZX_ERR_CANCELED;
  }

  // Simulate timeout of zx_port_wait() syscall.
  if (deadline < next_timer_expiration_) {
    now_ = deadline;
    return ZX_ERR_TIMED_OUT;
  }
  // Otherwise, a timer would have expired at or before `deadline`.
  now_ = next_timer_expiration_;
  next_timer_expiration_ = ZX_TIME_INFINITE;
  *task_invoked |= DispatchTasks();
  return ZX_OK;
}

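// Unless a dispatch is already in progress, moves every task due at the
// current fake time onto |due_list_| and invokes each handler in order.
// Returns true if at least one handler ran.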
bool NativeFakeDispatcher::FakeAsyncLoop::DispatchTasks() {
  bool task_invoked = false;
  // Dequeue and dispatch one task at a time in case an earlier task wants
  // to cancel a later task which has also come due. Timer restarts are
  // suppressed until we run out of tasks to dispatch.
  if (!dispatching_tasks_) {
    dispatching_tasks_ = true;

    // Extract all of the tasks that are due into |due_list| for dispatch
    // unless we already have some waiting from a previous iteration which
    // we would like to process in order.
    list_node_t* node;
    if (list_is_empty(&due_list_)) {
      zx_time_t due_time = now_;
      list_node_t* tail = nullptr;
      list_for_every(&task_list_, node) {
        if (NodeToTask(node)->deadline > due_time) {
          break;
        }
        tail = node;
      }
      if (tail) {
        list_node_t* head = task_list_.next;
        task_list_.next = tail->next;
        tail->next->prev = &task_list_;
        due_list_.next = head;
        head->prev = &due_list_;
        due_list_.prev = tail;
        tail->next = &due_list_;
      }
    }

    // Dispatch all due tasks.
    while ((node = list_remove_head(&due_list_))) {
      // Invoke the handler.  Note that it might destroy itself.
      async_task_t* task = NodeToTask(node);

      task->handler(nullptr, task, ZX_OK);
      task_invoked = true;

      if (state_ != ASYNC_LOOP_RUNNABLE) {
        break;
      }
    }

    dispatching_tasks_ = false;
    timer_armed_ = false;
    RestartTimer();
  }

  return task_invoked;
}

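// Marks the loop as shut down and cancels every remaining task. Returns true
// if any handler was invoked with ZX_ERR_CANCELED.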
bool NativeFakeDispatcher::FakeAsyncLoop::Shutdown() {
  if (state_ == ASYNC_LOOP_SHUTDOWN) {
    return false;
  }
  state_ = ASYNC_LOOP_SHUTDOWN;

  // Cancel any remaining pending tasks on our queues.
  return CancelAll();
}

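// Invokes every handler still queued on |due_list_| or |task_list_| with
// ZX_ERR_CANCELED. Only valid once the loop has shut down.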
bool NativeFakeDispatcher::FakeAsyncLoop::CancelAll() {
  ZX_DEBUG_ASSERT(state_ == ASYNC_LOOP_SHUTDOWN);
  bool task_invoked = false;

  list_node_t* node;

  while ((node = list_remove_head(&due_list_))) {
    async_task_t* task = NodeToTask(node);
    task->handler(nullptr, task, ZX_ERR_CANCELED);
    task_invoked = true;
  }
  while ((node = list_remove_head(&task_list_))) {
    async_task_t* task = NodeToTask(node);
    task->handler(nullptr, task, ZX_ERR_CANCELED);
    task_invoked = true;
  }
  return task_invoked;
}

}  // namespace pw::async::test::backend