1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9
10 use async_task::Runnable;
11 use easy_parallel::Parallel;
12 use smol::future;
13
14 // Creates a future with event counters.
15 //
16 // Usage: `future!(f, POLL, DROP_F, DROP_T)`
17 //
18 // The future `f` outputs `Poll::Ready`.
19 // When it gets polled, `POLL` is incremented.
20 // When it gets dropped, `DROP_F` is incremented.
21 // When the output gets dropped, `DROP_T` is incremented.
22 macro_rules! future {
23 ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => {
24 static $poll: AtomicUsize = AtomicUsize::new(0);
25 static $drop_f: AtomicUsize = AtomicUsize::new(0);
26 static $drop_t: AtomicUsize = AtomicUsize::new(0);
27
28 let $name = {
29 struct Fut(#[allow(dead_code)] Box<i32>);
30
31 impl Future for Fut {
32 type Output = Out;
33
34 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
35 $poll.fetch_add(1, Ordering::SeqCst);
36 Poll::Ready(Out(Box::new(0), true))
37 }
38 }
39
40 impl Drop for Fut {
41 fn drop(&mut self) {
42 $drop_f.fetch_add(1, Ordering::SeqCst);
43 }
44 }
45
46 #[derive(Default)]
47 struct Out(#[allow(dead_code)] Box<i32>, bool);
48
49 impl Drop for Out {
50 fn drop(&mut self) {
51 if self.1 {
52 $drop_t.fetch_add(1, Ordering::SeqCst);
53 }
54 }
55 }
56
57 Fut(Box::new(0))
58 };
59 };
60 }
61
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, SCHED, DROP)`
65 //
66 // The schedule function `s` does nothing.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 macro_rules! schedule {
70 ($name:pat, $sched:ident, $drop:ident) => {
71 static $drop: AtomicUsize = AtomicUsize::new(0);
72 static $sched: AtomicUsize = AtomicUsize::new(0);
73
74 let $name = {
75 struct Guard(#[allow(dead_code)] Box<i32>);
76
77 impl Drop for Guard {
78 fn drop(&mut self) {
79 $drop.fetch_add(1, Ordering::SeqCst);
80 }
81 }
82
83 let guard = Guard(Box::new(0));
84 move |runnable: Runnable| {
85 let _ = &guard;
86 runnable.schedule();
87 $sched.fetch_add(1, Ordering::SeqCst);
88 }
89 };
90 };
91 }
92
ms(ms: u64) -> Duration93 fn ms(ms: u64) -> Duration {
94 Duration::from_millis(ms)
95 }
96
97 #[test]
drop_and_join()98 fn drop_and_join() {
99 future!(f, POLL, DROP_F, DROP_T);
100 schedule!(s, SCHEDULE, DROP_S);
101 let (runnable, task) = async_task::spawn(f, s);
102
103 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
104
105 drop(runnable);
106 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
107
108 assert!(catch_unwind(|| future::block_on(task)).is_err());
109 assert_eq!(POLL.load(Ordering::SeqCst), 0);
110 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
111 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
112 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
113 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
114 }
115
116 #[test]
run_and_join()117 fn run_and_join() {
118 future!(f, POLL, DROP_F, DROP_T);
119 schedule!(s, SCHEDULE, DROP_S);
120 let (runnable, task) = async_task::spawn(f, s);
121
122 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
123
124 runnable.run();
125 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
126
127 assert!(catch_unwind(|| future::block_on(task)).is_ok());
128 assert_eq!(POLL.load(Ordering::SeqCst), 1);
129 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
130 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
131 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
132 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
133 }
134
135 #[test]
detach_and_run()136 fn detach_and_run() {
137 future!(f, POLL, DROP_F, DROP_T);
138 schedule!(s, SCHEDULE, DROP_S);
139 let (runnable, task) = async_task::spawn(f, s);
140
141 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
142
143 task.detach();
144 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
145
146 runnable.run();
147 assert_eq!(POLL.load(Ordering::SeqCst), 1);
148 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
149 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
150 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
151 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
152 }
153
154 #[test]
join_twice()155 fn join_twice() {
156 future!(f, POLL, DROP_F, DROP_T);
157 schedule!(s, SCHEDULE, DROP_S);
158 let (runnable, mut task) = async_task::spawn(f, s);
159
160 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
161
162 runnable.run();
163 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
164
165 future::block_on(&mut task);
166 assert_eq!(POLL.load(Ordering::SeqCst), 1);
167 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
168 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
169 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
170 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
171
172 assert!(catch_unwind(AssertUnwindSafe(|| future::block_on(&mut task))).is_err());
173 assert_eq!(POLL.load(Ordering::SeqCst), 1);
174 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
175 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
176 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
177 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
178
179 task.detach();
180 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
181 }
182
183 #[test]
join_and_cancel()184 fn join_and_cancel() {
185 future!(f, POLL, DROP_F, DROP_T);
186 schedule!(s, SCHEDULE, DROP_S);
187 let (runnable, task) = async_task::spawn(f, s);
188
189 Parallel::new()
190 .add(|| {
191 thread::sleep(ms(200));
192 drop(runnable);
193
194 thread::sleep(ms(400));
195 assert_eq!(POLL.load(Ordering::SeqCst), 0);
196 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
197 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
198 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
199 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
200 })
201 .add(|| {
202 assert!(catch_unwind(|| future::block_on(task)).is_err());
203 assert_eq!(POLL.load(Ordering::SeqCst), 0);
204 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
205
206 thread::sleep(ms(200));
207 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
208 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
209 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
210 })
211 .run();
212 }
213
214 #[test]
join_and_run()215 fn join_and_run() {
216 future!(f, POLL, DROP_F, DROP_T);
217 schedule!(s, SCHEDULE, DROP_S);
218 let (runnable, task) = async_task::spawn(f, s);
219
220 Parallel::new()
221 .add(|| {
222 thread::sleep(ms(400));
223
224 runnable.run();
225 assert_eq!(POLL.load(Ordering::SeqCst), 1);
226 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
227 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
228
229 thread::sleep(ms(200));
230 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
231 })
232 .add(|| {
233 future::block_on(task);
234 assert_eq!(POLL.load(Ordering::SeqCst), 1);
235 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
236 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
237 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
238
239 thread::sleep(ms(200));
240 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
241 })
242 .run();
243 }
244
245 #[test]
try_join_and_run_and_join()246 fn try_join_and_run_and_join() {
247 future!(f, POLL, DROP_F, DROP_T);
248 schedule!(s, SCHEDULE, DROP_S);
249 let (runnable, mut task) = async_task::spawn(f, s);
250
251 Parallel::new()
252 .add(|| {
253 thread::sleep(ms(400));
254
255 runnable.run();
256 assert_eq!(POLL.load(Ordering::SeqCst), 1);
257 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
258 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
259
260 thread::sleep(ms(200));
261 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
262 })
263 .add(|| {
264 future::block_on(future::or(&mut task, future::ready(Default::default())));
265 assert_eq!(POLL.load(Ordering::SeqCst), 0);
266 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
267 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
268 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
269 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
270
271 future::block_on(task);
272 assert_eq!(POLL.load(Ordering::SeqCst), 1);
273 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
274 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
275 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
276
277 thread::sleep(ms(200));
278 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
279 })
280 .run();
281 }
282
283 #[test]
try_join_and_cancel_and_run()284 fn try_join_and_cancel_and_run() {
285 future!(f, POLL, DROP_F, DROP_T);
286 schedule!(s, SCHEDULE, DROP_S);
287 let (runnable, mut task) = async_task::spawn(f, s);
288
289 Parallel::new()
290 .add(|| {
291 thread::sleep(ms(200));
292
293 runnable.run();
294 assert_eq!(POLL.load(Ordering::SeqCst), 0);
295 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
296 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
297 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
298 })
299 .add(|| {
300 future::block_on(future::or(&mut task, future::ready(Default::default())));
301 assert_eq!(POLL.load(Ordering::SeqCst), 0);
302 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
303 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
304 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
305 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
306
307 drop(task);
308 assert_eq!(POLL.load(Ordering::SeqCst), 0);
309 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
310 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
311 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
312 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
313 })
314 .run();
315 }
316
317 #[test]
try_join_and_run_and_cancel()318 fn try_join_and_run_and_cancel() {
319 future!(f, POLL, DROP_F, DROP_T);
320 schedule!(s, SCHEDULE, DROP_S);
321 let (runnable, mut task) = async_task::spawn(f, s);
322
323 Parallel::new()
324 .add(|| {
325 thread::sleep(ms(200));
326
327 runnable.run();
328 assert_eq!(POLL.load(Ordering::SeqCst), 1);
329 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
330 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
331 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
332 })
333 .add(|| {
334 future::block_on(future::or(&mut task, future::ready(Default::default())));
335 assert_eq!(POLL.load(Ordering::SeqCst), 0);
336 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
337 assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
338 assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
339 assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
340
341 thread::sleep(ms(400));
342
343 drop(task);
344 assert_eq!(POLL.load(Ordering::SeqCst), 1);
345 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
346 assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
347 assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
348 assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
349 })
350 .run();
351 }
352
353 #[test]
await_output()354 fn await_output() {
355 struct Fut<T>(Cell<Option<T>>);
356
357 impl<T> Fut<T> {
358 fn new(t: T) -> Fut<T> {
359 Fut(Cell::new(Some(t)))
360 }
361 }
362
363 impl<T> Future for Fut<T> {
364 type Output = T;
365
366 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
367 Poll::Ready(self.0.take().unwrap())
368 }
369 }
370
371 for i in 0..10 {
372 let (runnable, task) = async_task::spawn(Fut::new(i), drop);
373 runnable.run();
374 assert_eq!(future::block_on(task), i);
375 }
376
377 for i in 0..10 {
378 let (runnable, task) = async_task::spawn(Fut::new(vec![7; i]), drop);
379 runnable.run();
380 assert_eq!(future::block_on(task), vec![7; i]);
381 }
382
383 let (runnable, task) = async_task::spawn(Fut::new("foo".to_string()), drop);
384 runnable.run();
385 assert_eq!(future::block_on(task), "foo");
386 }
387