1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9 
10 use async_task::Runnable;
11 use atomic_waker::AtomicWaker;
12 use easy_parallel::Parallel;
13 use smol::future;
14 
15 // Creates a future with event counters.
16 //
17 // Usage: `future!(f, get_waker, POLL, DROP)`
18 //
19 // The future `f` always sleeps for 200 ms, and panics the second time it is polled.
20 // When it gets polled, `POLL` is incremented.
21 // When it gets dropped, `DROP` is incremented.
22 //
23 // Every time the future is run, it stores the waker into a global variable.
24 // This waker can be extracted using the `get_waker()` function.
25 macro_rules! future {
26     ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
27         static $poll: AtomicUsize = AtomicUsize::new(0);
28         static $drop: AtomicUsize = AtomicUsize::new(0);
29         static WAKER: AtomicWaker = AtomicWaker::new();
30 
31         let ($name, $get_waker) = {
32             struct Fut(Cell<bool>, #[allow(dead_code)] Box<i32>);
33 
34             impl Future for Fut {
35                 type Output = ();
36 
37                 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
38                     WAKER.register(cx.waker());
39                     $poll.fetch_add(1, Ordering::SeqCst);
40                     thread::sleep(ms(400));
41 
42                     if self.0.get() {
43                         panic!()
44                     } else {
45                         self.0.set(true);
46                         Poll::Pending
47                     }
48                 }
49             }
50 
51             impl Drop for Fut {
52                 fn drop(&mut self) {
53                     $drop.fetch_add(1, Ordering::SeqCst);
54                 }
55             }
56 
57             (Fut(Cell::new(false), Box::new(0)), || WAKER.take().unwrap())
58         };
59     };
60 }
61 
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, chan, SCHED, DROP)`
65 //
66 // The schedule function `s` pushes the task into `chan`.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 //
70 // Receiver `chan` extracts the task when it is scheduled.
71 macro_rules! schedule {
72     ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
73         static $drop: AtomicUsize = AtomicUsize::new(0);
74         static $sched: AtomicUsize = AtomicUsize::new(0);
75 
76         let ($name, $chan) = {
77             let (s, r) = flume::unbounded();
78 
79             struct Guard(#[allow(dead_code)] Box<i32>);
80 
81             impl Drop for Guard {
82                 fn drop(&mut self) {
83                     $drop.fetch_add(1, Ordering::SeqCst);
84                 }
85             }
86 
87             let guard = Guard(Box::new(0));
88             let sched = move |runnable: Runnable| {
89                 let _ = &guard;
90                 $sched.fetch_add(1, Ordering::SeqCst);
91                 s.send(runnable).unwrap();
92             };
93 
94             (sched, r)
95         };
96     };
97 }
98 
ms(ms: u64) -> Duration99 fn ms(ms: u64) -> Duration {
100     Duration::from_millis(ms)
101 }
102 
try_await<T>(f: impl Future<Output = T>) -> Option<T>103 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
104     future::block_on(future::poll_once(f))
105 }
106 
107 #[test]
wake_during_run()108 fn wake_during_run() {
109     future!(f, get_waker, POLL, DROP_F);
110     schedule!(s, chan, SCHEDULE, DROP_S);
111     let (runnable, task) = async_task::spawn(f, s);
112 
113     runnable.run();
114     let waker = get_waker();
115     waker.wake_by_ref();
116     let runnable = chan.recv().unwrap();
117 
118     Parallel::new()
119         .add(|| {
120             assert!(catch_unwind(|| runnable.run()).is_err());
121             drop(get_waker());
122             assert_eq!(POLL.load(Ordering::SeqCst), 2);
123             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
124             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
125             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
126             assert_eq!(chan.len(), 0);
127         })
128         .add(|| {
129             thread::sleep(ms(200));
130 
131             waker.wake();
132             task.detach();
133             assert_eq!(POLL.load(Ordering::SeqCst), 2);
134             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
135             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
136             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
137             assert_eq!(chan.len(), 0);
138 
139             thread::sleep(ms(400));
140 
141             assert_eq!(POLL.load(Ordering::SeqCst), 2);
142             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
143             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
144             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
145             assert_eq!(chan.len(), 0);
146         })
147         .run();
148 }
149 
150 #[test]
cancel_during_run()151 fn cancel_during_run() {
152     future!(f, get_waker, POLL, DROP_F);
153     schedule!(s, chan, SCHEDULE, DROP_S);
154     let (runnable, task) = async_task::spawn(f, s);
155 
156     runnable.run();
157     let waker = get_waker();
158     waker.wake();
159     let runnable = chan.recv().unwrap();
160 
161     Parallel::new()
162         .add(|| {
163             assert!(catch_unwind(|| runnable.run()).is_err());
164             drop(get_waker());
165             assert_eq!(POLL.load(Ordering::SeqCst), 2);
166             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
167             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
168             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
169             assert_eq!(chan.len(), 0);
170         })
171         .add(|| {
172             thread::sleep(ms(200));
173 
174             drop(task);
175             assert_eq!(POLL.load(Ordering::SeqCst), 2);
176             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
177             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
178             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
179             assert_eq!(chan.len(), 0);
180 
181             thread::sleep(ms(400));
182 
183             assert_eq!(POLL.load(Ordering::SeqCst), 2);
184             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
185             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
186             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
187             assert_eq!(chan.len(), 0);
188         })
189         .run();
190 }
191 
192 #[test]
wake_and_cancel_during_run()193 fn wake_and_cancel_during_run() {
194     future!(f, get_waker, POLL, DROP_F);
195     schedule!(s, chan, SCHEDULE, DROP_S);
196     let (runnable, task) = async_task::spawn(f, s);
197 
198     runnable.run();
199     let waker = get_waker();
200     waker.wake_by_ref();
201     let runnable = chan.recv().unwrap();
202 
203     Parallel::new()
204         .add(|| {
205             assert!(catch_unwind(|| runnable.run()).is_err());
206             drop(get_waker());
207             assert_eq!(POLL.load(Ordering::SeqCst), 2);
208             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
209             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
210             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
211             assert_eq!(chan.len(), 0);
212         })
213         .add(|| {
214             thread::sleep(ms(200));
215 
216             waker.wake();
217             assert_eq!(POLL.load(Ordering::SeqCst), 2);
218             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
219             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
220             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
221             assert_eq!(chan.len(), 0);
222 
223             drop(task);
224             assert_eq!(POLL.load(Ordering::SeqCst), 2);
225             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
226             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
227             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
228             assert_eq!(chan.len(), 0);
229 
230             thread::sleep(ms(400));
231 
232             assert_eq!(POLL.load(Ordering::SeqCst), 2);
233             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
234             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
235             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
236             assert_eq!(chan.len(), 0);
237         })
238         .run();
239 }
240 
241 #[flaky_test::flaky_test]
cancel_and_wake_during_run()242 fn cancel_and_wake_during_run() {
243     future!(f, get_waker, POLL, DROP_F);
244     schedule!(s, chan, SCHEDULE, DROP_S);
245     POLL.store(0, Ordering::SeqCst);
246     DROP_F.store(0, Ordering::SeqCst);
247     SCHEDULE.store(0, Ordering::SeqCst);
248     DROP_S.store(0, Ordering::SeqCst);
249 
250     let (runnable, task) = async_task::spawn(f, s);
251 
252     runnable.run();
253     let waker = get_waker();
254     waker.wake_by_ref();
255     let runnable = chan.recv().unwrap();
256 
257     Parallel::new()
258         .add(|| {
259             assert!(catch_unwind(|| runnable.run()).is_err());
260             drop(get_waker());
261             assert_eq!(POLL.load(Ordering::SeqCst), 2);
262             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
263             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
264             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
265             assert_eq!(chan.len(), 0);
266         })
267         .add(|| {
268             thread::sleep(ms(200));
269 
270             drop(task);
271             assert_eq!(POLL.load(Ordering::SeqCst), 2);
272             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
273             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
274             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
275             assert_eq!(chan.len(), 0);
276 
277             waker.wake();
278             assert_eq!(POLL.load(Ordering::SeqCst), 2);
279             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
280             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
281             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
282             assert_eq!(chan.len(), 0);
283 
284             thread::sleep(ms(400));
285 
286             assert_eq!(POLL.load(Ordering::SeqCst), 2);
287             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
288             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
289             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
290             assert_eq!(chan.len(), 0);
291         })
292         .run();
293 }
294 
295 #[test]
panic_and_poll()296 fn panic_and_poll() {
297     future!(f, get_waker, POLL, DROP_F);
298     schedule!(s, chan, SCHEDULE, DROP_S);
299     let (runnable, task) = async_task::spawn(f, s);
300 
301     runnable.run();
302     get_waker().wake();
303     assert_eq!(POLL.load(Ordering::SeqCst), 1);
304     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
305     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
306     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
307 
308     let mut task = task;
309     assert!(try_await(&mut task).is_none());
310 
311     let runnable = chan.recv().unwrap();
312     assert!(catch_unwind(|| runnable.run()).is_err());
313     assert_eq!(POLL.load(Ordering::SeqCst), 2);
314     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
315     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
316     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
317 
318     assert!(catch_unwind(AssertUnwindSafe(|| try_await(&mut task))).is_err());
319     assert_eq!(POLL.load(Ordering::SeqCst), 2);
320     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
321     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
322     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
323 
324     drop(get_waker());
325     drop(task);
326     assert_eq!(POLL.load(Ordering::SeqCst), 2);
327     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
328     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
329     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
330 }
331