xref: /aosp_15_r20/external/pigweed/pw_async_basic/dispatcher_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_async_basic/dispatcher.h"
15 
16 #include <vector>
17 
18 #include "pw_chrono/system_clock.h"
19 #include "pw_log/log.h"
20 #include "pw_sync/thread_notification.h"
21 #include "pw_thread/thread.h"
22 #include "pw_thread_stl/options.h"
23 #include "pw_unit_test/framework.h"
24 
// Shorthand used by task callbacks in this file to assert that they were
// invoked with Status::Cancelled(). Expands to ASSERT_EQ.
#define ASSERT_CANCELLED(actual) ASSERT_EQ(Status::Cancelled(), actual)
26 
27 using namespace std::chrono_literals;
28 
29 namespace pw::async {
30 namespace {
31 
32 // Lambdas can only capture one ptr worth of memory without allocating, so we
33 // group the data we want to share between tasks and their containing tests
34 // inside one struct.
35 struct TestPrimitives {
36   int count = 0;
37   sync::ThreadNotification notification;
38 };
39 
// Posts three tasks to a dispatcher that runs on its own thread and checks
// that each of them was invoked with an OK status.
TEST(DispatcherBasic, PostTasks) {
  BasicDispatcher basic_dispatcher;
  Thread worker(thread::stl::Options(), basic_dispatcher);

  TestPrimitives tp;
  auto bump_count = [&tp](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    tp.count += 1;
  };

  Task first(bump_count);
  basic_dispatcher.Post(first);

  Task second(bump_count);
  basic_dispatcher.Post(second);

  // The final task also releases the notification so the test thread knows
  // when it may stop the dispatcher.
  Task last([&tp](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    tp.count += 1;
    tp.notification.release();
  });
  basic_dispatcher.Post(last);

  // Block until the final task has run, then shut the dispatcher down.
  tp.notification.acquire();
  basic_dispatcher.RequestStop();
  worker.join();
  ASSERT_EQ(tp.count, 3);
}
68 
// Verifies that a running task can post another task through the dispatcher
// handed to it in its Context: the third task posts the second, which posts
// the first, which finally signals the test thread.
TEST(DispatcherBasic, ChainedTasks) {
  BasicDispatcher basic_dispatcher;
  Thread worker(thread::stl::Options(), basic_dispatcher);

  sync::ThreadNotification notification;

  // End of the chain: wakes up the test thread.
  Task final_link([&notification](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    notification.release();
  });

  // Middle of the chain: schedules the final link.
  Task middle_link([&final_link](Context& context, Status status) {
    PW_TEST_ASSERT_OK(status);
    context.dispatcher->Post(final_link);
  });

  // Start of the chain: schedules the middle link.
  Task first_link([&middle_link](Context& context, Status status) {
    PW_TEST_ASSERT_OK(status);
    context.dispatcher->Post(middle_link);
  });
  basic_dispatcher.Post(first_link);

  notification.acquire();
  basic_dispatcher.RequestStop();
  worker.join();
}
94 
// Checks that two tasks posted for the same due time are run in the order in
// which they were posted.
TEST(DispatcherBasic, TaskOrdering) {
  // Records the order in which tasks ran and lets the last one wake the test
  // thread.
  struct SharedState {
    std::vector<int> tasks;
    sync::ThreadNotification notification;
  };

  BasicDispatcher basic_dispatcher;
  Thread worker(thread::stl::Options(), basic_dispatcher);
  SharedState state;

  Task posted_first([&state](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    state.tasks.push_back(1);
  });

  Task posted_second([&state](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    state.tasks.push_back(2);
    state.notification.release();
  });

  // Both tasks share one due time, so they should be ordered FIFO.
  const chrono::SystemClock::time_point shared_due_time =
      chrono::SystemClock::now();
  basic_dispatcher.PostAt(posted_first, shared_due_time);
  basic_dispatcher.PostAt(posted_second, shared_due_time);

  state.notification.acquire();
  basic_dispatcher.RequestStop();
  worker.join();

  ASSERT_EQ(state.tasks.size(), 2U);
  EXPECT_EQ(state.tasks[0], 1);
  EXPECT_EQ(state.tasks[1], 2);
}
129 
130 // Test RequestStop() from inside task.
TEST(DispatcherBasic,RequestStopInsideTask)131 TEST(DispatcherBasic, RequestStopInsideTask) {
132   BasicDispatcher dispatcher;
133   Thread work_thread(thread::stl::Options(), dispatcher);
134 
135   int count = 0;
136   auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) {
137     ASSERT_CANCELLED(status);
138     ++count;
139   };
140 
141   // These tasks are never executed and cleaned up in RequestStop().
142   Task task0(inc_count), task1(inc_count);
143   dispatcher.PostAfter(task0, 20ms);
144   dispatcher.PostAfter(task1, 21ms);
145 
146   Task stop_task([&count]([[maybe_unused]] Context& c, Status status) {
147     PW_TEST_ASSERT_OK(status);
148     ++count;
149     static_cast<BasicDispatcher*>(c.dispatcher)->RequestStop();
150   });
151   dispatcher.Post(stop_task);
152 
153   work_thread.join();
154   ASSERT_EQ(count, 3);
155 }
156 
// Queues three far-future tasks on a dispatcher running on a worker thread,
// then calls RequestStop() from the test thread and expects every task
// callback to be invoked with a cancelled status.
TEST(DispatcherBasic, TasksCancelledByRequestStopInDifferentThread) {
  BasicDispatcher basic_dispatcher;
  Thread worker(thread::stl::Options(), basic_dispatcher);

  int count = 0;
  auto expect_cancelled = [&count](Context&, Status status) {
    ASSERT_CANCELLED(status);
    count += 1;
  };

  Task task0(expect_cancelled);
  Task task1(expect_cancelled);
  Task task2(expect_cancelled);
  basic_dispatcher.PostAfter(task0, 10s);
  basic_dispatcher.PostAfter(task1, 10s);
  basic_dispatcher.PostAfter(task2, 10s);

  basic_dispatcher.RequestStop();
  worker.join();
  ASSERT_EQ(count, 3);
}
176 
// Lets a dispatcher with three pending tasks go out of scope without ever
// running it, and expects each task callback to have been invoked with a
// cancelled status by the time the dispatcher is gone.
TEST(DispatcherBasic, TasksCancelledByDispatcherDestructor) {
  int count = 0;
  auto expect_cancelled = [&count](Context&, Status status) {
    ASSERT_CANCELLED(status);
    count += 1;
  };
  Task task0(expect_cancelled);
  Task task1(expect_cancelled);
  Task task2(expect_cancelled);

  // The dispatcher lives only inside this scope; the tasks outlive it.
  {
    BasicDispatcher scoped_dispatcher;
    scoped_dispatcher.PostAfter(task0, 10s);
    scoped_dispatcher.PostAfter(task1, 10s);
    scoped_dispatcher.PostAfter(task2, 10s);
  }

  ASSERT_EQ(count, 3);
}
194 
// Single-threaded variant: after RequestStop(), a call to RunUntilIdle() on
// the test thread is expected to invoke all three pending task callbacks with
// a cancelled status.
TEST(DispatcherBasic, TasksCancelledByRunUntilIdle) {
  int count = 0;
  auto expect_cancelled = [&count](Context&, Status status) {
    ASSERT_CANCELLED(status);
    count += 1;
  };
  Task task0(expect_cancelled);
  Task task1(expect_cancelled);
  Task task2(expect_cancelled);

  BasicDispatcher basic_dispatcher;
  basic_dispatcher.PostAfter(task0, 10s);
  basic_dispatcher.PostAfter(task1, 10s);
  basic_dispatcher.PostAfter(task2, 10s);

  basic_dispatcher.RequestStop();
  basic_dispatcher.RunUntilIdle();
  ASSERT_EQ(count, 3);
}
212 
// Single-threaded variant: after RequestStop(), a call to RunFor() on the test
// thread is expected to invoke all three pending task callbacks with a
// cancelled status.
TEST(DispatcherBasic, TasksCancelledByRunFor) {
  int count = 0;
  auto expect_cancelled = [&count](Context&, Status status) {
    ASSERT_CANCELLED(status);
    count += 1;
  };
  Task task0(expect_cancelled);
  Task task1(expect_cancelled);
  Task task2(expect_cancelled);

  BasicDispatcher basic_dispatcher;
  basic_dispatcher.PostAfter(task0, 10s);
  basic_dispatcher.PostAfter(task1, 10s);
  basic_dispatcher.PostAfter(task2, 10s);

  basic_dispatcher.RequestStop();
  basic_dispatcher.RunFor(5s);
  ASSERT_EQ(count, 3);
}
230 
231 }  // namespace
232 }  // namespace pw::async
233