1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 use std::thread;
6 use std::time::Duration;
7 
8 use async_task::Runnable;
9 use atomic_waker::AtomicWaker;
10 use easy_parallel::Parallel;
11 
// Creates a future with event counters.
//
// Usage: `future!(f, get_waker, POLL, DROP)`
//
// The future `f` always sleeps for 400 ms and returns `Poll::Pending`.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Every time the future is polled, it registers the current waker in a static
// `AtomicWaker`. Calling `get_waker()` takes that waker out again; it panics if
// no waker is stored at that moment.
//
// The macro expands to statements (statics plus a `let`), so it has to be
// invoked inside a function body.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static WAKER: AtomicWaker = AtomicWaker::new();

        let ($name, $get_waker) = {
            // The boxed field is never read; it only gives the future a heap
            // allocation to own, hence the `dead_code` allowance.
            struct Fut(#[allow(dead_code)] Box<i32>);

            impl Future for Fut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    // Block inside the poll so that other threads get a window in
                    // which the task is observably "running".
                    thread::sleep(ms(400));
                    Poll::Pending
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            (Fut(Box::new(0)), || WAKER.take().unwrap())
        };
    };
}
52 
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// The schedule function `s` pushes the task into `chan`.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Receiver `chan` extracts the task when it is scheduled.
//
// The macro expands to statements (statics plus a `let`), so it has to be
// invoked inside a function body.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            let (s, r) = flume::unbounded();

            // The boxed field is never read; the guard exists only for its
            // `Drop` impl, hence the `dead_code` allowance.
            struct Guard(#[allow(dead_code)] Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            let sched = move |runnable: Runnable| {
                // Mention `guard` so the `move` closure captures it; the guard is
                // then dropped together with the closure, bumping `$drop`.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                s.send(runnable).unwrap();
            };

            (sched, r)
        };
    };
}
89 
// Shorthand for `Duration::from_millis`, used for the sleeps in this file.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
93 
// Wakes the task from a second thread while its future is still being polled.
//
// Thread 1 runs the task; that poll blocks for 400 ms. Thread 2 sleeps 200 ms
// first, so its `wake_by_ref()` lands in the middle of the poll. The assertions
// expect the wake-up to have no visible effect right away (`SCHEDULE` still 1,
// channel empty) and the task to have been scheduled exactly once more by the
// time `run()` has returned.
//
// The test is timing-based: it relies on the 200 ms / 400 ms sleeps to order
// the two threads.
#[test]
fn wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, _task) = async_task::spawn(f, s);

    // The first poll stores a waker; waking through it puts the task into the
    // channel, from where it is picked up for the second run.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Blocks for the duration of the second poll. When it returns, the
            // other thread has already woken the task, so a second schedule is
            // expected and the runnable must be waiting in the channel.
            runnable.run();
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 1);
        })
        .add(|| {
            // Wait until the other thread is inside the second poll.
            thread::sleep(ms(200));

            // Waking mid-poll must not schedule the task immediately.
            waker.wake_by_ref();
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait past the end of the poll.
            thread::sleep(ms(400));

            // The deferred wake-up has now turned into one more schedule.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 1);
        })
        .run();

    // Receive (and immediately drop) the pending runnable, then drop the waker
    // stored by the second poll.
    chan.recv().unwrap();
    drop(get_waker());
}
137 
// Drops the `Task` handle from a second thread while the future is still being
// polled (the "cancel" in the test name).
//
// Thread 1 runs the task; that poll blocks for 400 ms. Thread 2 sleeps 200 ms
// and then drops `task`. The assertions expect the drop to have no visible
// effect while the poll is in progress, and expect the future and the schedule
// function to each have been dropped exactly once after `run()` has returned
// and the last stored waker has been dropped. The task is never scheduled
// again (`SCHEDULE` stays 1).
//
// The test is timing-based: it relies on the 200 ms / 400 ms sleeps to order
// the two threads.
#[test]
fn cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // The first poll stores a waker; `wake()` consumes it and puts the task into
    // the channel, from where it is picked up for the second run.
    runnable.run();
    let waker = get_waker();
    waker.wake();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Blocks for the duration of the second poll; afterwards the waker
            // registered by that poll is taken out and dropped.
            runnable.run();
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // Wait until the other thread is inside the second poll.
            thread::sleep(ms(200));

            // Dropping the task mid-poll must not drop anything yet.
            drop(task);
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait past the end of the poll.
            thread::sleep(ms(400));

            // Everything has been torn down without another schedule.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
179 
// Wakes the task and then drops the `Task` handle, both from a second thread
// while the future is still being polled.
//
// Thread 1 runs the task; that poll blocks for 400 ms. Thread 2 sleeps 200 ms,
// calls `wake()` and then drops `task`. The assertions expect neither action to
// have a visible effect while the poll is in progress, expect no further
// schedule at all (`SCHEDULE` stays 1), and expect the future and the schedule
// function to each have been dropped exactly once after `run()` has returned
// and the last stored waker has been dropped.
//
// The test is timing-based: it relies on the 200 ms / 400 ms sleeps to order
// the two threads.
#[test]
fn wake_and_cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // The first poll stores a waker; waking through it puts the task into the
    // channel, from where it is picked up for the second run.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Blocks for the duration of the second poll; afterwards the waker
            // registered by that poll is taken out and dropped.
            runnable.run();
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // Wait until the other thread is inside the second poll.
            thread::sleep(ms(200));

            // Step 1: wake (consuming the waker) - no immediate effect expected.
            waker.wake();
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Step 2: drop the task - still no immediate effect expected.
            drop(task);
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait past the end of the poll.
            thread::sleep(ms(400));

            // Everything has been torn down without another schedule.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
228 
// Drops the `Task` handle and then wakes the task, both from a second thread
// while the future is still being polled. Same scenario as
// `wake_and_cancel_during_run` with the two actions in the opposite order.
//
// Thread 1 runs the task; that poll blocks for 400 ms. Thread 2 sleeps 200 ms,
// drops `task` and then calls `wake()`. The assertions expect neither action to
// have a visible effect while the poll is in progress, expect no further
// schedule at all (`SCHEDULE` stays 1), and expect the future and the schedule
// function to each have been dropped exactly once after `run()` has returned
// and the last stored waker has been dropped.
//
// The test is timing-based: it relies on the 200 ms / 400 ms sleeps to order
// the two threads.
#[test]
fn cancel_and_wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // The first poll stores a waker; waking through it puts the task into the
    // channel, from where it is picked up for the second run.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Blocks for the duration of the second poll; afterwards the waker
            // registered by that poll is taken out and dropped.
            runnable.run();
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // Wait until the other thread is inside the second poll.
            thread::sleep(ms(200));

            // Step 1: drop the task - no immediate effect expected.
            drop(task);
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Step 2: wake (consuming the waker) - still no immediate effect.
            waker.wake();
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait past the end of the poll.
            thread::sleep(ms(400));

            // Everything has been torn down without another schedule.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
277 
// Detaches the task and then drops its only remaining waker.
//
// The assertions expect that detaching alone changes nothing, that dropping the
// waker afterwards schedules the task one more time, and that running this
// final runnable drops the future and the schedule function without polling
// the future again (`POLL` stays 1).
#[test]
fn drop_last_waker() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // One poll, which leaves a waker behind for us to hold on to.
    runnable.run();
    let waker = get_waker();

    // Detaching while a waker is still alive: nothing observable happens.
    task.detach();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Dropping the waker: the task is expected to be scheduled once.
    drop(waker);
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 1);

    // Running that runnable: no new poll, but future and schedule function are
    // each dropped.
    chan.recv().unwrap().run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
308 
// Drops the stored waker first and then drops the `Task` handle (the "cancel"
// in the test name).
//
// The assertions expect that dropping the waker alone changes nothing, that
// dropping the task afterwards schedules the task one more time, and that
// running this final runnable drops the future and the schedule function
// without polling the future again (`POLL` stays 1).
#[test]
fn cancel_last_task() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // One poll; the waker it stored is discarded while `task` is still alive.
    runnable.run();
    drop(get_waker());
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Dropping the task: the task is expected to be scheduled once.
    drop(task);
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 1);

    // Running that runnable: no new poll, but future and schedule function are
    // each dropped.
    chan.recv().unwrap().run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
337 
// Drops the stored waker first and then detaches the `Task` handle.
//
// Same sequence as `cancel_last_task`, except that the handle is released with
// `detach()` instead of being dropped. The assertions expect that dropping the
// waker alone changes nothing, that detaching afterwards schedules the task one
// more time, and that running this final runnable drops the future and the
// schedule function without polling the future again (`POLL` stays 1).
#[test]
fn drop_last_task() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // One poll; the waker it stored is discarded while `task` is still alive.
    runnable.run();
    drop(get_waker());
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 0);

    // Detaching the task: the task is expected to be scheduled once.
    task.detach();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
    assert_eq!(chan.len(), 1);

    // Running that runnable: no new poll, but future and schedule function are
    // each dropped.
    chan.recv().unwrap().run();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
    assert_eq!(chan.len(), 0);
}
366