1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 use std::thread;
6 use std::time::Duration;
7
8 use async_task::Runnable;
9 use atomic_waker::AtomicWaker;
10 use easy_parallel::Parallel;
11
12 // Creates a future with event counters.
13 //
14 // Usage: `future!(f, get_waker, POLL, DROP)`
15 //
// The future `f` always sleeps for 400 ms and returns `Poll::Pending`.
17 // When it gets polled, `POLL` is incremented.
18 // When it gets dropped, `DROP` is incremented.
19 //
20 // Every time the future is run, it stores the waker into a global variable.
21 // This waker can be extracted using the `get_waker()` function.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        // Slot for the waker handed to the most recent `poll` call.
        static WAKER: AtomicWaker = AtomicWaker::new();
        // Event counters named by the caller.
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $poll: AtomicUsize = AtomicUsize::new(0);

        let ($name, $get_waker) = {
            // The boxed payload is never read; it only gives the future an owned allocation.
            struct Fut(#[allow(dead_code)] Box<i32>);

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for Fut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<()> {
                    // Publish the waker first so the test can grab it via `get_waker`.
                    WAKER.register(ctx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    // Block the polling thread for 400 ms so other threads can act "during the run".
                    thread::sleep(ms(400));
                    Poll::Pending
                }
            }

            let pending = Fut(Box::new(0));
            // Removes the stored waker; panics if none has been registered since the last take.
            let take_waker = || WAKER.take().unwrap();

            (pending, take_waker)
        };
    };
}
52
53 // Creates a schedule function with event counters.
54 //
55 // Usage: `schedule!(s, chan, SCHED, DROP)`
56 //
57 // The schedule function `s` pushes the task into `chan`.
58 // When it gets invoked, `SCHED` is incremented.
59 // When it gets dropped, `DROP` is incremented.
60 //
61 // Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        // Event counters named by the caller.
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            // Bumps `$drop` when dropped; it is moved into the schedule closure below.
            struct Guard(#[allow(dead_code)] Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let (sender, receiver) = flume::unbounded();
            let guard = Guard(Box::new(0));

            let schedule_fn = move |task: Runnable| {
                // Mention `guard` so the `move` closure captures it and owns its lifetime.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                sender.send(task).unwrap();
            };

            (schedule_fn, receiver)
        };
    };
}
89
/// Shorthand for building a [`Duration`] from a number of milliseconds.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
93
// Wakes the task from a second thread while its poll is still sleeping and checks that the
// schedule counter only advances after that poll has returned.
#[test]
fn wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, _task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll registers the waker; one wake-up puts the task back on the channel.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll; it sleeps for 400 ms, so the other thread's wake lands mid-run.
            runnable.run();
            assert_eq!(counts(), (2, 2, 0, 0, 1));
        })
        .add(|| {
            thread::sleep(ms(200));

            // The second poll is still in progress: nothing new may be scheduled yet.
            waker.wake_by_ref();
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            thread::sleep(ms(400));

            // The poll has returned by now and the task has been scheduled a second time.
            assert_eq!(counts(), (2, 2, 0, 0, 1));
        })
        .run();

    chan.recv().unwrap();
    drop(get_waker());
}
137
// Drops the `Task` handle from a second thread while the poll is still sleeping and checks that
// the future and the schedule function are dropped only after that poll has returned.
#[test]
fn cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll registers the waker; consuming it with `wake` reschedules the task once.
    runnable.run();
    get_waker().wake();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll; the task handle is dropped by the other thread while this sleeps.
            runnable.run();
            drop(get_waker());
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            thread::sleep(ms(200));

            // The poll is still in progress, so nothing has been dropped yet.
            drop(task);
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            thread::sleep(ms(400));

            // The poll has returned; future and schedule function are gone, nothing was queued.
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .run();
}
179
// While the poll is still sleeping, a second thread first wakes the task and then drops the
// `Task` handle; checks that neither step schedules the task again and that the drops happen
// only after the poll has returned.
#[test]
fn wake_and_cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll registers the waker; one wake-up puts the task back on the channel.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll; wake and cancel both arrive from the other thread while this sleeps.
            runnable.run();
            drop(get_waker());
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            thread::sleep(ms(200));

            // Wake first: the poll is still in progress, so no new schedule call is observed.
            waker.wake();
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            // Then cancel: still nothing dropped while the poll is running.
            drop(task);
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            thread::sleep(ms(400));

            // The poll has returned; everything is dropped and nothing was queued.
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .run();
}
228
// While the poll is still sleeping, a second thread first drops the `Task` handle and then wakes
// the task; checks that neither step schedules the task again and that the drops happen only
// after the poll has returned.
#[test]
fn cancel_and_wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll registers the waker; one wake-up puts the task back on the channel.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll; cancel and wake both arrive from the other thread while this sleeps.
            runnable.run();
            drop(get_waker());
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            thread::sleep(ms(200));

            // Cancel first: the poll is still in progress, so nothing is dropped yet.
            drop(task);
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            // Then wake: no new schedule call is observed.
            waker.wake();
            assert_eq!(counts(), (2, 1, 0, 0, 0));

            thread::sleep(ms(400));

            // The poll has returned; everything is dropped and nothing was queued.
            assert_eq!(counts(), (2, 1, 1, 1, 0));
        })
        .run();
}
277
// Detaches the task and then drops its only waker; checks that the drop schedules the task one
// more time and that running it drops the future and schedule function without another poll.
#[test]
fn drop_last_waker() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    runnable.run();
    let waker = get_waker();

    // Detaching alone changes nothing while a waker is still alive.
    task.detach();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Dropping the waker queues the task once more.
    drop(waker);
    assert_eq!(counts(), (1, 1, 0, 0, 1));

    // Running the queued runnable cleans up without polling again.
    chan.recv().unwrap().run();
    assert_eq!(counts(), (1, 1, 1, 1, 0));
}
308
// Drops the waker first and then drops the `Task` handle; checks that dropping the handle
// schedules the task one more time and that running it drops the future and schedule function
// without another poll.
#[test]
fn cancel_last_task() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Dropping the waker changes nothing while the task handle is still alive.
    runnable.run();
    drop(get_waker());
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Dropping the handle queues the task once more.
    drop(task);
    assert_eq!(counts(), (1, 1, 0, 0, 1));

    // Running the queued runnable cleans up without polling again.
    chan.recv().unwrap().run();
    assert_eq!(counts(), (1, 1, 1, 1, 0));
}
337
// Drops the waker first and then detaches the `Task` handle; checks that detaching schedules the
// task one more time and that running it drops the future and schedule function without
// another poll.
#[test]
fn drop_last_task() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot: (polls, schedule calls, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Dropping the waker changes nothing while the task handle is still alive.
    runnable.run();
    drop(get_waker());
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Detaching the handle queues the task once more.
    task.detach();
    assert_eq!(counts(), (1, 1, 0, 0, 1));

    // Running the queued runnable cleans up without polling again.
    chan.recv().unwrap().run();
    assert_eq!(counts(), (1, 1, 1, 1, 0));
}
366