1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_async_basic/dispatcher.h"
15
16 #include <vector>
17
18 #include "pw_chrono/system_clock.h"
19 #include "pw_log/log.h"
20 #include "pw_sync/thread_notification.h"
21 #include "pw_thread/thread.h"
22 #include "pw_thread_stl/options.h"
23 #include "pw_unit_test/framework.h"
24
// Asserts that `status` equals the cancelled status. The Status type is fully
// qualified and the argument is parenthesized so the macro expands the same
// way whatever namespace or expression it is used with.
#define ASSERT_CANCELLED(status) ASSERT_EQ(::pw::Status::Cancelled(), (status))
26
27 using namespace std::chrono_literals;
28
29 namespace pw::async {
30 namespace {
31
32 // Lambdas can only capture one ptr worth of memory without allocating, so we
33 // group the data we want to share between tasks and their containing tests
34 // inside one struct.
35 struct TestPrimitives {
36 int count = 0;
37 sync::ThreadNotification notification;
38 };
39
// Posts three tasks to a dispatcher running on its own thread and checks that
// all three were invoked with an OK status.
TEST(DispatcherBasic, PostTasks) {
  BasicDispatcher dispatcher;
  Thread work_thread(thread::stl::Options(), dispatcher);

  TestPrimitives shared;

  // Body used by the first two tasks: verify the status, count the call.
  auto bump = [&shared](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    shared.count += 1;
  };

  Task first(bump);
  dispatcher.Post(first);

  Task second(bump);
  dispatcher.Post(second);

  // The last task additionally wakes the test thread once it has run.
  Task last([&shared](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    shared.count += 1;
    shared.notification.release();
  });
  dispatcher.Post(last);

  // Wait for the last task, then shut the dispatcher down before inspecting
  // the counter from this thread.
  shared.notification.acquire();
  dispatcher.RequestStop();
  work_thread.join();
  ASSERT_EQ(shared.count, 3);
}
68
// Verifies that a running task can post further work through the dispatcher
// handed to it in its Context: task3 posts task2, task2 posts task1, and task1
// wakes the test thread.
TEST(DispatcherBasic, ChainedTasks) {
  BasicDispatcher dispatcher;
  Thread work_thread(thread::stl::Options(), dispatcher);

  sync::ThreadNotification notification;
  // End of the chain. Captures the notification by reference so that the
  // release() here is observed by the acquire() on the test thread below.
  Task task1([&notification]([[maybe_unused]] Context& c, Status status) {
    PW_TEST_ASSERT_OK(status);
    notification.release();
  });

  // Middle link: re-posts task1 on the dispatcher that is running this task.
  Task task2([&task1](Context& c, Status status) {
    PW_TEST_ASSERT_OK(status);
    c.dispatcher->Post(task1);
  });

  // First link: the only task posted directly by the test thread.
  Task task3([&task2](Context& c, Status status) {
    PW_TEST_ASSERT_OK(status);
    c.dispatcher->Post(task2);
  });
  dispatcher.Post(task3);

  // Block until the whole chain has run, then shut the dispatcher down.
  notification.acquire();
  dispatcher.RequestStop();
  work_thread.join();
}
94
// Checks that two tasks posted for the same due time run in the order in
// which they were posted.
TEST(DispatcherBasic, TaskOrdering) {
  // Execution log plus the signal used to wake the test thread.
  struct TestState {
    std::vector<int> tasks;
    sync::ThreadNotification notification;
  };

  BasicDispatcher dispatcher;
  Thread work_thread(thread::stl::Options(), dispatcher);
  TestState log;

  Task posted_first([&log](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    log.tasks.push_back(1);
  });

  // Records its id and then wakes the test thread.
  Task posted_second([&log](Context&, Status status) {
    PW_TEST_ASSERT_OK(status);
    log.tasks.push_back(2);
    log.notification.release();
  });

  // Both tasks share one due time, so only post order distinguishes them;
  // they are expected to run FIFO.
  const auto shared_due_time = chrono::SystemClock::now();
  dispatcher.PostAt(posted_first, shared_due_time);
  dispatcher.PostAt(posted_second, shared_due_time);

  log.notification.acquire();
  dispatcher.RequestStop();
  work_thread.join();

  // The size check guards the element accesses that follow.
  ASSERT_EQ(log.tasks.size(), 2U);
  EXPECT_EQ(log.tasks.front(), 1);
  EXPECT_EQ(log.tasks.back(), 2);
}
129
130 // Test RequestStop() from inside task.
TEST(DispatcherBasic,RequestStopInsideTask)131 TEST(DispatcherBasic, RequestStopInsideTask) {
132 BasicDispatcher dispatcher;
133 Thread work_thread(thread::stl::Options(), dispatcher);
134
135 int count = 0;
136 auto inc_count = [&count]([[maybe_unused]] Context& c, Status status) {
137 ASSERT_CANCELLED(status);
138 ++count;
139 };
140
141 // These tasks are never executed and cleaned up in RequestStop().
142 Task task0(inc_count), task1(inc_count);
143 dispatcher.PostAfter(task0, 20ms);
144 dispatcher.PostAfter(task1, 21ms);
145
146 Task stop_task([&count]([[maybe_unused]] Context& c, Status status) {
147 PW_TEST_ASSERT_OK(status);
148 ++count;
149 static_cast<BasicDispatcher*>(c.dispatcher)->RequestStop();
150 });
151 dispatcher.Post(stop_task);
152
153 work_thread.join();
154 ASSERT_EQ(count, 3);
155 }
156
// Stops a dispatcher running on another thread while three tasks are still
// pending, and expects three invocations carrying a cancelled status.
TEST(DispatcherBasic, TasksCancelledByRequestStopInDifferentThread) {
  BasicDispatcher dispatcher;
  Thread work_thread(thread::stl::Options(), dispatcher);

  int cancelled = 0;
  auto on_cancel = [&cancelled](Context&, Status status) {
    ASSERT_CANCELLED(status);
    cancelled += 1;
  };

  // All three are due 10 seconds out, well after the stop request below.
  Task pending_a(on_cancel);
  Task pending_b(on_cancel);
  Task pending_c(on_cancel);
  dispatcher.PostAfter(pending_a, 10s);
  dispatcher.PostAfter(pending_b, 10s);
  dispatcher.PostAfter(pending_c, 10s);

  // The stop is requested from the test thread, not the dispatcher's thread.
  dispatcher.RequestStop();
  work_thread.join();
  ASSERT_EQ(cancelled, 3);
}
176
// Lets a dispatcher with three pending tasks go out of scope without ever
// running it, and expects three invocations carrying a cancelled status.
TEST(DispatcherBasic, TasksCancelledByDispatcherDestructor) {
  int cancelled = 0;
  auto on_cancel = [&cancelled](Context&, Status status) {
    ASSERT_CANCELLED(status);
    cancelled += 1;
  };

  // Declared outside the inner scope so the tasks outlive the dispatcher.
  Task pending_a(on_cancel);
  Task pending_b(on_cancel);
  Task pending_c(on_cancel);

  {
    BasicDispatcher dispatcher;
    dispatcher.PostAfter(pending_a, 10s);
    dispatcher.PostAfter(pending_b, 10s);
    dispatcher.PostAfter(pending_c, 10s);
  }  // The dispatcher is destroyed here with all three tasks still queued.

  ASSERT_EQ(cancelled, 3);
}
194
// Requests a stop and then calls RunUntilIdle() on the test thread with three
// tasks pending; expects three invocations carrying a cancelled status by the
// time RunUntilIdle() returns.
TEST(DispatcherBasic, TasksCancelledByRunUntilIdle) {
  int cancelled = 0;
  auto on_cancel = [&cancelled](Context&, Status status) {
    ASSERT_CANCELLED(status);
    cancelled += 1;
  };

  Task pending_a(on_cancel);
  Task pending_b(on_cancel);
  Task pending_c(on_cancel);

  // No worker thread here: the dispatcher is driven from this thread.
  BasicDispatcher dispatcher;
  dispatcher.PostAfter(pending_a, 10s);
  dispatcher.PostAfter(pending_b, 10s);
  dispatcher.PostAfter(pending_c, 10s);

  dispatcher.RequestStop();
  dispatcher.RunUntilIdle();
  ASSERT_EQ(cancelled, 3);
}
212
// Requests a stop and then calls RunFor() on the test thread with three tasks
// pending; expects three invocations carrying a cancelled status by the time
// RunFor() returns.
TEST(DispatcherBasic, TasksCancelledByRunFor) {
  int cancelled = 0;
  auto on_cancel = [&cancelled](Context&, Status status) {
    ASSERT_CANCELLED(status);
    cancelled += 1;
  };

  Task pending_a(on_cancel);
  Task pending_b(on_cancel);
  Task pending_c(on_cancel);

  // No worker thread here: the dispatcher is driven from this thread.
  BasicDispatcher dispatcher;
  dispatcher.PostAfter(pending_a, 10s);
  dispatcher.PostAfter(pending_b, 10s);
  dispatcher.PostAfter(pending_c, 10s);

  // The tasks are due after 10s, beyond the 5s run window requested here.
  dispatcher.RequestStop();
  dispatcher.RunFor(5s);
  ASSERT_EQ(cancelled, 3);
}
230
231 } // namespace
232 } // namespace pw::async
233