1 use std::future::Future;
2 use std::pin::Pin;
3 use std::ptr::NonNull;
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use std::sync::Arc;
6 use std::task::{Context, Poll};
7 
8 use async_task::Runnable;
9 use smol::future;
10 
// Builds a future instrumented with event counters.
//
// Usage: `future!(f, POLL, DROP)`
//
// `f` is bound to a future that completes with `Poll::Ready` on every poll.
// Each poll bumps the `POLL` counter by one.
// Dropping the future bumps the `DROP` counter by one.
macro_rules! future {
    ($name:pat, $poll:ident, $drop:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // The boxed payload is never read, hence the `dead_code` allowance.
            struct CountingFuture(#[allow(dead_code)] Box<i32>);

            impl Drop for CountingFuture {
                fn drop(&mut self) {
                    // Record that the future itself has been destroyed.
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for CountingFuture {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
                    // Record the poll, then finish immediately with a fresh boxed zero.
                    $poll.fetch_add(1, Ordering::SeqCst);
                    let output = Box::new(0);
                    Poll::Ready(output)
                }
            }

            CountingFuture(Box::new(0))
        };
    };
}
45 
// Builds a schedule function instrumented with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// `s` is bound to a closure that performs no scheduling work of its own.
// Each invocation bumps the `SCHED` counter by one.
// Dropping the closure bumps the `DROP` counter by one.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // The boxed payload is never read, hence the `dead_code` allowance.
            struct DropCounter(#[allow(dead_code)] Box<i32>);

            impl Drop for DropCounter {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let drop_counter = DropCounter(Box::new(0));
            move |_runnable| {
                // Reference the guard so the `move` closure captures and owns it;
                // the guard is then dropped together with the closure.
                let _ = &drop_counter;
                $sched.fetch_add(1, Ordering::SeqCst);
            }
        };
    };
}
75 
try_await<T>(f: impl Future<Output = T>) -> Option<T>76 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
77     future::block_on(future::poll_once(f))
78 }
79 
#[test]
fn drop_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Freshly spawned: nothing has been polled, scheduled or dropped.
    assert_eq!(counters(), (0, 0, 0, 0));

    // Dropping the runnable is expected to drop the future but keep the schedule function.
    drop(runnable);
    assert_eq!(counters(), (0, 0, 1, 0));

    // Detaching the task is expected to release the schedule function as well.
    task.detach();
    assert_eq!(counters(), (0, 0, 1, 1));
}
103 
#[test]
fn detach_and_drop() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Detaching first must leave everything untouched while the runnable is alive.
    task.detach();
    assert_eq!(counters(), (0, 0, 0, 0));

    // Dropping the runnable is then expected to drop both the future and the schedule function.
    drop(runnable);
    assert_eq!(counters(), (0, 0, 1, 1));
}
122 
#[test]
fn detach_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Detaching first must leave everything untouched while the runnable is alive.
    task.detach();
    assert_eq!(counters(), (0, 0, 0, 0));

    // Running is expected to poll once and then drop the future and the schedule function.
    runnable.run();
    assert_eq!(counters(), (1, 0, 1, 1));
}
141 
#[test]
fn run_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Running is expected to poll once and drop the future; the schedule function survives.
    runnable.run();
    assert_eq!(counters(), (1, 0, 1, 0));

    // Detaching the task afterwards is expected to release the schedule function.
    task.detach();
    assert_eq!(counters(), (1, 0, 1, 1));
}
160 
#[test]
fn cancel_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Dropping the task handle alone must not touch any counter yet.
    drop(task);
    assert_eq!(counters(), (0, 0, 0, 0));

    // Running afterwards is expected to skip polling and drop the future and schedule function.
    runnable.run();
    assert_eq!(counters(), (0, 0, 1, 1));
}
179 
#[test]
fn run_and_cancel() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Running is expected to poll once and drop the future; the schedule function survives.
    runnable.run();
    assert_eq!(counters(), (1, 0, 1, 0));

    // Dropping the task handle afterwards is expected to release the schedule function.
    drop(task);
    assert_eq!(counters(), (1, 0, 1, 1));
}
198 
#[test]
fn cancel_join() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, mut task) = async_task::spawn(f, s);

    // Snapshot of the counters as (polls, schedules, future drops, schedule drops).
    let counters = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Before the runnable has run, the task has no output and no counter moves.
    let early = try_await(&mut task);
    assert!(early.is_none());
    assert_eq!(counters(), (0, 0, 0, 0));

    // Running is expected to poll once and drop the future; the schedule function survives.
    runnable.run();
    assert_eq!(counters(), (1, 0, 1, 0));

    // The output is now available, and fetching it must not change any counter.
    let finished = try_await(&mut task);
    assert!(finished.is_some());
    assert_eq!(counters(), (1, 0, 1, 0));

    // Dropping the task handle is expected to release the schedule function.
    drop(task);
    assert_eq!(counters(), (1, 0, 1, 1));
}
229 
#[test]
fn schedule() {
    // The schedule function forwards every runnable into a channel used as a run queue.
    let (sender, queue) = flume::unbounded();
    let enqueue = move |runnable| sender.send(runnable).unwrap();
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, enqueue);

    // Nothing is queued until the runnable is explicitly scheduled.
    assert!(queue.is_empty());

    // Schedule, take the runnable back out, and check the queue is empty again; twice.
    let mut current = runnable;
    for _ in 0..2 {
        current.schedule();
        current = queue.recv().unwrap();
        assert!(queue.is_empty());
    }

    // A third round trip, after which the received runnable is discarded.
    current.schedule();
    drop(queue.recv().unwrap());
}
249 
#[test]
fn schedule_counter() {
    // Number of times the schedule function has been invoked.
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    let (sender, receiver) = flume::unbounded();
    let counting_schedule = move |runnable: Runnable| {
        COUNT.fetch_add(1, Ordering::SeqCst);
        sender.send(runnable).unwrap();
    };
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, counting_schedule);

    // One direct schedule, followed by two reschedules of the received runnable.
    runnable.schedule();
    for _ in 0..2 {
        let rescheduled = receiver.recv().unwrap();
        rescheduled.schedule();
    }
    assert_eq!(COUNT.load(Ordering::SeqCst), 3);

    // The third schedule must have queued a runnable too; receive and discard it.
    drop(receiver.recv().unwrap());
}
267 
#[test]
fn drop_inside_schedule() {
    // Increments its own counter when dropped.
    struct DropCounter(AtomicUsize);
    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    let counter = DropCounter(AtomicUsize::new(0));

    // The `_` pattern does not bind the task handle, so it is dropped right away.
    let (runnable, _) = async_task::spawn(async {}, move |runnable| {
        // The counter is owned by this closure; it must still read zero both
        // before and after the runnable is dropped from inside the closure.
        let drops_before = counter.0.load(Ordering::SeqCst);
        assert_eq!(drops_before, 0);
        drop(runnable);
        let drops_after = counter.0.load(Ordering::SeqCst);
        assert_eq!(drops_after, 0);
    });
    runnable.schedule();
}
285 
#[test]
fn waker() {
    // The schedule function forwards every runnable into a channel used as a run queue.
    let (sender, queue) = flume::unbounded();
    let enqueue = move |runnable| sender.send(runnable).unwrap();
    let never_ready = future::poll_fn(|_| Poll::<()>::Pending);
    let (runnable, _task) = async_task::spawn(never_ready, enqueue);

    assert!(queue.is_empty());

    // Take a waker, run the task once, then wake it by reference.
    let waker = runnable.waker();
    runnable.run();
    waker.wake_by_ref();

    // The wake-up must have queued the runnable; run it again and wake by value this time.
    let requeued = queue.recv().unwrap();
    requeued.run();
    waker.wake();

    // The second wake-up must have queued the runnable once more.
    queue.recv().unwrap();
}
302 
#[test]
fn raw() {
    // Stand-in for a dispatcher that would run the callback at a later point;
    // for the test the callback is invoked immediately.
    fn dispatch(trampoline: extern "C" fn(NonNull<()>), context: NonNull<()>) {
        trampoline(context)
    }

    // Rebuilds the runnable from its raw pointer and runs it.
    extern "C" fn trampoline(runnable: NonNull<()>) {
        // SAFETY: within this test the pointer always comes from the
        // `Runnable::into_raw` call in the schedule closure below and is
        // handed to this function exactly once via `dispatch`.
        let task = unsafe { Runnable::<()>::from_raw(runnable) };
        task.run();
    }

    let executed = Arc::new(AtomicBool::new(false));
    let (runnable, _handle) = async_task::spawn(
        {
            // Second handle to the flag, moved into the task's future.
            let flag = Arc::clone(&executed);
            async move { flag.store(true, Ordering::SeqCst) }
        },
        |runnable: Runnable<()>| dispatch(trampoline, runnable.into_raw()),
    );
    runnable.schedule();

    // Scheduling dispatches synchronously, so the future has already run by now.
    assert!(executed.load(Ordering::SeqCst));
}
326