1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9 
10 use async_task::Runnable;
11 use easy_parallel::Parallel;
12 use smol::future;
13 
14 // Creates a future with event counters.
15 //
16 // Usage: `future!(f, POLL, DROP_F, DROP_T)`
17 //
18 // The future `f` outputs `Poll::Ready`.
19 // When it gets polled, `POLL` is incremented.
20 // When it gets dropped, `DROP_F` is incremented.
21 // When the output gets dropped, `DROP_T` is incremented.
22 macro_rules! future {
23     ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => {
24         static $poll: AtomicUsize = AtomicUsize::new(0);
25         static $drop_f: AtomicUsize = AtomicUsize::new(0);
26         static $drop_t: AtomicUsize = AtomicUsize::new(0);
27 
28         let $name = {
29             struct Fut(#[allow(dead_code)] Box<i32>);
30 
31             impl Future for Fut {
32                 type Output = Out;
33 
34                 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
35                     $poll.fetch_add(1, Ordering::SeqCst);
36                     Poll::Ready(Out(Box::new(0), true))
37                 }
38             }
39 
40             impl Drop for Fut {
41                 fn drop(&mut self) {
42                     $drop_f.fetch_add(1, Ordering::SeqCst);
43                 }
44             }
45 
46             #[derive(Default)]
47             struct Out(#[allow(dead_code)] Box<i32>, bool);
48 
49             impl Drop for Out {
50                 fn drop(&mut self) {
51                     if self.1 {
52                         $drop_t.fetch_add(1, Ordering::SeqCst);
53                     }
54                 }
55             }
56 
57             Fut(Box::new(0))
58         };
59     };
60 }
61 
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, SCHED, DROP)`
65 //
66 // The schedule function `s` does nothing.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 macro_rules! schedule {
70     ($name:pat, $sched:ident, $drop:ident) => {
71         static $drop: AtomicUsize = AtomicUsize::new(0);
72         static $sched: AtomicUsize = AtomicUsize::new(0);
73 
74         let $name = {
75             struct Guard(#[allow(dead_code)] Box<i32>);
76 
77             impl Drop for Guard {
78                 fn drop(&mut self) {
79                     $drop.fetch_add(1, Ordering::SeqCst);
80                 }
81             }
82 
83             let guard = Guard(Box::new(0));
84             move |runnable: Runnable| {
85                 let _ = &guard;
86                 runnable.schedule();
87                 $sched.fetch_add(1, Ordering::SeqCst);
88             }
89         };
90     };
91 }
92 
ms(ms: u64) -> Duration93 fn ms(ms: u64) -> Duration {
94     Duration::from_millis(ms)
95 }
96 
97 #[test]
drop_and_join()98 fn drop_and_join() {
99     future!(f, POLL, DROP_F, DROP_T);
100     schedule!(s, SCHEDULE, DROP_S);
101     let (runnable, task) = async_task::spawn(f, s);
102 
103     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
104 
105     drop(runnable);
106     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
107 
108     assert!(catch_unwind(|| future::block_on(task)).is_err());
109     assert_eq!(POLL.load(Ordering::SeqCst), 0);
110     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
111     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
112     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
113     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
114 }
115 
116 #[test]
run_and_join()117 fn run_and_join() {
118     future!(f, POLL, DROP_F, DROP_T);
119     schedule!(s, SCHEDULE, DROP_S);
120     let (runnable, task) = async_task::spawn(f, s);
121 
122     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
123 
124     runnable.run();
125     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
126 
127     assert!(catch_unwind(|| future::block_on(task)).is_ok());
128     assert_eq!(POLL.load(Ordering::SeqCst), 1);
129     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
130     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
131     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
132     assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
133 }
134 
135 #[test]
detach_and_run()136 fn detach_and_run() {
137     future!(f, POLL, DROP_F, DROP_T);
138     schedule!(s, SCHEDULE, DROP_S);
139     let (runnable, task) = async_task::spawn(f, s);
140 
141     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
142 
143     task.detach();
144     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
145 
146     runnable.run();
147     assert_eq!(POLL.load(Ordering::SeqCst), 1);
148     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
149     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
150     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
151     assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
152 }
153 
154 #[test]
join_twice()155 fn join_twice() {
156     future!(f, POLL, DROP_F, DROP_T);
157     schedule!(s, SCHEDULE, DROP_S);
158     let (runnable, mut task) = async_task::spawn(f, s);
159 
160     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
161 
162     runnable.run();
163     assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
164 
165     future::block_on(&mut task);
166     assert_eq!(POLL.load(Ordering::SeqCst), 1);
167     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
168     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
169     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
170     assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
171 
172     assert!(catch_unwind(AssertUnwindSafe(|| future::block_on(&mut task))).is_err());
173     assert_eq!(POLL.load(Ordering::SeqCst), 1);
174     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
175     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
176     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
177     assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
178 
179     task.detach();
180     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
181 }
182 
183 #[test]
join_and_cancel()184 fn join_and_cancel() {
185     future!(f, POLL, DROP_F, DROP_T);
186     schedule!(s, SCHEDULE, DROP_S);
187     let (runnable, task) = async_task::spawn(f, s);
188 
189     Parallel::new()
190         .add(|| {
191             thread::sleep(ms(200));
192             drop(runnable);
193 
194             thread::sleep(ms(400));
195             assert_eq!(POLL.load(Ordering::SeqCst), 0);
196             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
197             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
198             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
199             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
200         })
201         .add(|| {
202             assert!(catch_unwind(|| future::block_on(task)).is_err());
203             assert_eq!(POLL.load(Ordering::SeqCst), 0);
204             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
205 
206             thread::sleep(ms(200));
207             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
208             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
209             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
210         })
211         .run();
212 }
213 
214 #[test]
join_and_run()215 fn join_and_run() {
216     future!(f, POLL, DROP_F, DROP_T);
217     schedule!(s, SCHEDULE, DROP_S);
218     let (runnable, task) = async_task::spawn(f, s);
219 
220     Parallel::new()
221         .add(|| {
222             thread::sleep(ms(400));
223 
224             runnable.run();
225             assert_eq!(POLL.load(Ordering::SeqCst), 1);
226             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
227             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
228 
229             thread::sleep(ms(200));
230             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
231         })
232         .add(|| {
233             future::block_on(task);
234             assert_eq!(POLL.load(Ordering::SeqCst), 1);
235             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
236             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
237             assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
238 
239             thread::sleep(ms(200));
240             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
241         })
242         .run();
243 }
244 
245 #[test]
try_join_and_run_and_join()246 fn try_join_and_run_and_join() {
247     future!(f, POLL, DROP_F, DROP_T);
248     schedule!(s, SCHEDULE, DROP_S);
249     let (runnable, mut task) = async_task::spawn(f, s);
250 
251     Parallel::new()
252         .add(|| {
253             thread::sleep(ms(400));
254 
255             runnable.run();
256             assert_eq!(POLL.load(Ordering::SeqCst), 1);
257             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
258             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
259 
260             thread::sleep(ms(200));
261             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
262         })
263         .add(|| {
264             future::block_on(future::or(&mut task, future::ready(Default::default())));
265             assert_eq!(POLL.load(Ordering::SeqCst), 0);
266             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
267             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
268             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
269             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
270 
271             future::block_on(task);
272             assert_eq!(POLL.load(Ordering::SeqCst), 1);
273             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
274             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
275             assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
276 
277             thread::sleep(ms(200));
278             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
279         })
280         .run();
281 }
282 
283 #[test]
try_join_and_cancel_and_run()284 fn try_join_and_cancel_and_run() {
285     future!(f, POLL, DROP_F, DROP_T);
286     schedule!(s, SCHEDULE, DROP_S);
287     let (runnable, mut task) = async_task::spawn(f, s);
288 
289     Parallel::new()
290         .add(|| {
291             thread::sleep(ms(200));
292 
293             runnable.run();
294             assert_eq!(POLL.load(Ordering::SeqCst), 0);
295             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
296             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
297             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
298         })
299         .add(|| {
300             future::block_on(future::or(&mut task, future::ready(Default::default())));
301             assert_eq!(POLL.load(Ordering::SeqCst), 0);
302             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
303             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
304             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
305             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
306 
307             drop(task);
308             assert_eq!(POLL.load(Ordering::SeqCst), 0);
309             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
310             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
311             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
312             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
313         })
314         .run();
315 }
316 
317 #[test]
try_join_and_run_and_cancel()318 fn try_join_and_run_and_cancel() {
319     future!(f, POLL, DROP_F, DROP_T);
320     schedule!(s, SCHEDULE, DROP_S);
321     let (runnable, mut task) = async_task::spawn(f, s);
322 
323     Parallel::new()
324         .add(|| {
325             thread::sleep(ms(200));
326 
327             runnable.run();
328             assert_eq!(POLL.load(Ordering::SeqCst), 1);
329             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
330             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
331             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
332         })
333         .add(|| {
334             future::block_on(future::or(&mut task, future::ready(Default::default())));
335             assert_eq!(POLL.load(Ordering::SeqCst), 0);
336             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
337             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
338             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
339             assert_eq!(DROP_T.load(Ordering::SeqCst), 0);
340 
341             thread::sleep(ms(400));
342 
343             drop(task);
344             assert_eq!(POLL.load(Ordering::SeqCst), 1);
345             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
346             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
347             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
348             assert_eq!(DROP_T.load(Ordering::SeqCst), 1);
349         })
350         .run();
351 }
352 
353 #[test]
await_output()354 fn await_output() {
355     struct Fut<T>(Cell<Option<T>>);
356 
357     impl<T> Fut<T> {
358         fn new(t: T) -> Fut<T> {
359             Fut(Cell::new(Some(t)))
360         }
361     }
362 
363     impl<T> Future for Fut<T> {
364         type Output = T;
365 
366         fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
367             Poll::Ready(self.0.take().unwrap())
368         }
369     }
370 
371     for i in 0..10 {
372         let (runnable, task) = async_task::spawn(Fut::new(i), drop);
373         runnable.run();
374         assert_eq!(future::block_on(task), i);
375     }
376 
377     for i in 0..10 {
378         let (runnable, task) = async_task::spawn(Fut::new(vec![7; i]), drop);
379         runnable.run();
380         assert_eq!(future::block_on(task), vec![7; i]);
381     }
382 
383     let (runnable, task) = async_task::spawn(Fut::new("foo".to_string()), drop);
384     runnable.run();
385     assert_eq!(future::block_on(task), "foo");
386 }
387