1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9
10 use async_task::Runnable;
11 use atomic_waker::AtomicWaker;
12 use easy_parallel::Parallel;
13 use smol::future;
14
15 // Creates a future with event counters.
16 //
17 // Usage: `future!(f, get_waker, POLL, DROP)`
18 //
// The future `f` sleeps for 400 ms every time it is polled, and panics the second time it is polled.
20 // When it gets polled, `POLL` is incremented.
21 // When it gets dropped, `DROP` is incremented.
22 //
23 // Every time the future is run, it stores the waker into a global variable.
24 // This waker can be extracted using the `get_waker()` function.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        // The waker slot and both event counters are statics so the test body can observe them.
        static WAKER: AtomicWaker = AtomicWaker::new();
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $poll: AtomicUsize = AtomicUsize::new(0);

        let ($name, $get_waker) = {
            // `first_poll_done` starts out `false` and flips to `true` on the first poll.
            // `payload` is never read; it only gives the future a heap allocation to own.
            struct Fut {
                first_poll_done: Cell<bool>,
                #[allow(dead_code)]
                payload: Box<i32>,
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    // Record that the future was destroyed.
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for Fut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    // Publish the current waker, count the poll, then block for 400 ms.
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(400));

                    // `replace` returns the previous flag: `false` on the first poll,
                    // `true` on every later one, which is when the future panics.
                    if self.first_poll_done.replace(true) {
                        panic!()
                    }
                    Poll::Pending
                }
            }

            let fut = Fut {
                first_poll_done: Cell::new(false),
                payload: Box::new(0),
            };
            // Hands out the most recently registered waker; panics if none is stored.
            let take_waker = || WAKER.take().unwrap();

            (fut, take_waker)
        };
    };
}
61
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, chan, SCHED, DROP)`
65 //
66 // The schedule function `s` pushes the task into `chan`.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 //
70 // Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        // Counters for "schedule function invoked" and "schedule function dropped".
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            // Bumps `$drop` when the closure that owns it is destroyed.
            struct Guard(#[allow(dead_code)] Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let (sender, receiver) = flume::unbounded();
            let guard = Guard(Box::new(0));

            let schedule_fn = move |runnable: Runnable| {
                // Mention `guard` so the closure captures it and its drop is tied to the closure.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                sender.send(runnable).unwrap();
            };

            (schedule_fn, receiver)
        };
    };
}
98
// Shorthand for building a `Duration` of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
102
try_await<T>(f: impl Future<Output = T>) -> Option<T>103 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
104 future::block_on(future::poll_once(f))
105 }
106
// Wakes and detaches the task from a second thread while the first thread is inside the
// second (panicking) poll, and checks that no extra poll or schedule happens and that the
// future and the schedule function are each dropped exactly once.
#[test]
fn wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll: the future returns `Pending` and leaves its waker in the global slot.
    runnable.run();
    let waker = get_waker();
    // Waking by reference keeps `waker` alive for the second thread and schedules the task once.
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            // Release the waker that the second poll registered.
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // 200 ms is meant to land in the middle of the other thread's 400 ms poll.
            thread::sleep(ms(200));

            waker.wake();
            task.detach();
            // `POLL` is already 2 because the counter is bumped before the sleep in `poll`.
            // Nothing has been rescheduled or dropped while the poll is still in progress.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait until well after the panicking poll is expected to have finished.
            thread::sleep(ms(400));

            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
149
// Drops the `Task` handle from a second thread while the first thread is inside the second
// (panicking) poll, and checks that the future and the schedule function are each dropped
// exactly once, without any extra poll or schedule.
#[test]
fn cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll: the future returns `Pending` and leaves its waker in the global slot.
    runnable.run();
    let waker = get_waker();
    // Consuming the waker here schedules the task once; no waker is kept for the second thread.
    waker.wake();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            // Release the waker that the second poll registered.
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // 200 ms is meant to land in the middle of the other thread's 400 ms poll.
            thread::sleep(ms(200));

            // Dropping the handle is the "cancel" in this test's name.
            drop(task);
            // While the poll is still in progress nothing has been dropped or rescheduled.
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait until well after the panicking poll is expected to have finished.
            thread::sleep(ms(400));

            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
191
// From a second thread, first wakes the task and then drops its `Task` handle while the first
// thread is inside the second (panicking) poll. Checks the counters after each step.
#[test]
fn wake_and_cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll: the future returns `Pending` and leaves its waker in the global slot.
    runnable.run();
    let waker = get_waker();
    // Waking by reference keeps `waker` alive for the second thread and schedules the task once.
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            // Release the waker that the second poll registered.
            drop(get_waker());
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .add(|| {
            // 200 ms is meant to land in the middle of the other thread's 400 ms poll.
            thread::sleep(ms(200));

            // Step 1: wake. No new schedule and no drops are expected while the poll is running.
            waker.wake();
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Step 2: drop the handle (the "cancel"). Still nothing is dropped yet.
            drop(task);
            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
            assert_eq!(chan.len(), 0);

            // Wait until well after the panicking poll is expected to have finished.
            thread::sleep(ms(400));

            assert_eq!(POLL.load(Ordering::SeqCst), 2);
            assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
            assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
            assert_eq!(chan.len(), 0);
        })
        .run();
}
240
241 #[flaky_test::flaky_test]
cancel_and_wake_during_run()242 fn cancel_and_wake_during_run() {
243 future!(f, get_waker, POLL, DROP_F);
244 schedule!(s, chan, SCHEDULE, DROP_S);
245 POLL.store(0, Ordering::SeqCst);
246 DROP_F.store(0, Ordering::SeqCst);
247 SCHEDULE.store(0, Ordering::SeqCst);
248 DROP_S.store(0, Ordering::SeqCst);
249
250 let (runnable, task) = async_task::spawn(f, s);
251
252 runnable.run();
253 let waker = get_waker();
254 waker.wake_by_ref();
255 let runnable = chan.recv().unwrap();
256
257 Parallel::new()
258 .add(|| {
259 assert!(catch_unwind(|| runnable.run()).is_err());
260 drop(get_waker());
261 assert_eq!(POLL.load(Ordering::SeqCst), 2);
262 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
263 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
264 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
265 assert_eq!(chan.len(), 0);
266 })
267 .add(|| {
268 thread::sleep(ms(200));
269
270 drop(task);
271 assert_eq!(POLL.load(Ordering::SeqCst), 2);
272 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
273 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
274 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
275 assert_eq!(chan.len(), 0);
276
277 waker.wake();
278 assert_eq!(POLL.load(Ordering::SeqCst), 2);
279 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
280 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
281 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
282 assert_eq!(chan.len(), 0);
283
284 thread::sleep(ms(400));
285
286 assert_eq!(POLL.load(Ordering::SeqCst), 2);
287 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
288 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
289 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
290 assert_eq!(chan.len(), 0);
291 })
292 .run();
293 }
294
// Single-threaded check: after the future panics inside `Runnable::run`, polling the `Task`
// handle panics as well, and the schedule function is dropped only once the last waker and
// the handle are gone.
#[test]
fn panic_and_poll() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking afterwards schedules the task exactly once.
    runnable.run();
    get_waker().wake();
    assert_eq!(POLL.load(Ordering::SeqCst), 1);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);

    // The task has not produced an output yet, so a single poll of the handle yields nothing.
    let mut task = task;
    assert!(try_await(&mut task).is_none());

    // Second poll panics; the future is dropped, but the schedule function is still alive.
    let runnable = chan.recv().unwrap();
    assert!(catch_unwind(|| runnable.run()).is_err());
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);

    // Polling the handle now panics; the counters are unchanged.
    assert!(catch_unwind(AssertUnwindSafe(|| try_await(&mut task))).is_err());
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 0);

    // Once the waker from the second poll and the handle are dropped, the schedule function
    // is dropped too.
    drop(get_waker());
    drop(task);
    assert_eq!(POLL.load(Ordering::SeqCst), 2);
    assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
}
331