1 use std::future::Future;
2 use std::pin::Pin;
3 use std::ptr::NonNull;
4 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5 use std::sync::Arc;
6 use std::task::{Context, Poll};
7
8 use async_task::Runnable;
9 use smol::future;
10
// Builds a future instrumented with event counters.
//
// Usage: `future!(f, POLL, DROP)`
//
// Declares the statics `POLL` and `DROP` (both starting at zero) and binds `f`
// to a future whose every poll returns `Poll::Ready(Box::new(0))`.
// Each poll of `f` adds one to `POLL`.
// Dropping `f` adds one to `DROP`.
macro_rules! future {
    ($name:pat, $poll:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $poll: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // The boxed payload is never read; the future merely owns it.
            struct Counted(#[allow(dead_code)] Box<i32>);

            impl Drop for Counted {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for Counted {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Box<i32>> {
                    $poll.fetch_add(1, Ordering::SeqCst);
                    Poll::Ready(Box::new(0))
                }
            }

            Counted(Box::new(0))
        };
    };
}
45
// Builds a schedule function instrumented with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// Declares the statics `SCHED` and `DROP` (both starting at zero) and binds `s`
// to a closure that ignores the runnable it is handed.
// Each call of `s` adds one to `SCHED`.
// Dropping `s` adds one to `DROP`.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // The boxed payload is never read; the probe merely owns it.
            struct DropProbe(#[allow(dead_code)] Box<i32>);

            impl Drop for DropProbe {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let probe = DropProbe(Box::new(0));
            move |_runnable| {
                $sched.fetch_add(1, Ordering::SeqCst);
                // Mention the probe so the closure takes ownership of it and
                // dropping the closure is what drops the probe.
                let _ = &probe;
            }
        };
    };
}
75
try_await<T>(f: impl Future<Output = T>) -> Option<T>76 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
77 future::block_on(future::poll_once(f))
78 }
79
// Drops the runnable first, then detaches the task handle, checking the
// event counters after every step.
#[test]
fn drop_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Spawning alone touches nothing.
    assert_eq!(counts(), (0, 0, 0, 0));

    // Dropping the runnable is expected to drop the future only.
    drop(runnable);
    assert_eq!(counts(), (0, 0, 1, 0));

    // Detaching afterwards is expected to release the schedule function.
    task.detach();
    assert_eq!(counts(), (0, 0, 1, 1));
}
103
// Detaches the task handle first, then drops the runnable, checking the
// event counters after every step.
#[test]
fn detach_and_drop() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Detaching while the runnable is still alive changes nothing.
    task.detach();
    assert_eq!(counts(), (0, 0, 0, 0));

    // Dropping the runnable is expected to drop both the future and the
    // schedule function, without ever polling.
    drop(runnable);
    assert_eq!(counts(), (0, 0, 1, 1));
}
122
// Detaches the task handle first, then runs the runnable, checking the
// event counters after every step.
#[test]
fn detach_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Detaching while the runnable is still alive changes nothing.
    task.detach();
    assert_eq!(counts(), (0, 0, 0, 0));

    // Running is expected to poll once, then drop the future and the
    // schedule function.
    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 1));
}
141
// Runs the runnable first, then detaches the task handle, checking the
// event counters after every step.
#[test]
fn run_and_detach() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Running is expected to poll once and drop the future; the schedule
    // function stays alive while the task handle exists.
    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    // Detaching is expected to release the schedule function.
    task.detach();
    assert_eq!(counts(), (1, 0, 1, 1));
}
160
// Drops the task handle first, then runs the runnable, checking the event
// counters after every step.
#[test]
fn cancel_and_run() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Dropping the task handle while the runnable is alive changes nothing.
    drop(task);
    assert_eq!(counts(), (0, 0, 0, 0));

    // Running afterwards is expected to drop the future and the schedule
    // function without polling the future.
    runnable.run();
    assert_eq!(counts(), (0, 0, 1, 1));
}
179
// Runs the runnable first, then drops the task handle, checking the event
// counters after every step.
#[test]
fn run_and_cancel() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Running is expected to poll once and drop the future; the schedule
    // function stays alive while the task handle exists.
    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    // Dropping the task handle is expected to release the schedule function.
    drop(task);
    assert_eq!(counts(), (1, 0, 1, 1));
}
198
// Awaits the task handle before and after the runnable has run, then drops
// the handle, checking the event counters after every step.
#[test]
fn cancel_join() {
    future!(f, POLL, DROP_F);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, mut task) = async_task::spawn(f, s);

    // Snapshot of (polls, schedule calls, future drops, schedule-fn drops).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // Before the runnable has run, the handle has no output to give.
    assert!(try_await(&mut task).is_none());
    assert_eq!(counts(), (0, 0, 0, 0));

    // Running is expected to poll once and drop the future.
    runnable.run();
    assert_eq!(counts(), (1, 0, 1, 0));

    // Now the handle yields the output; taking it changes no counter.
    assert!(try_await(&mut task).is_some());
    assert_eq!(counts(), (1, 0, 1, 0));

    // Dropping the handle is expected to release the schedule function.
    drop(task);
    assert_eq!(counts(), (1, 0, 1, 1));
}
229
// Checks that every `Runnable::schedule` call hands exactly one runnable to
// the schedule function, which forwards it over a channel.
#[test]
fn schedule() {
    let (s, r) = flume::unbounded();
    let schedule = move |runnable| s.send(runnable).unwrap();
    let (mut runnable, _task) =
        async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);

    // Two rounds in which the received runnable is scheduled again.
    for _ in 0..2 {
        assert!(r.is_empty());
        runnable.schedule();
        runnable = r.recv().unwrap();
    }

    // Final round: the received runnable is simply discarded.
    assert!(r.is_empty());
    runnable.schedule();

    r.recv().unwrap();
}
249
// Counts invocations of the schedule function across three consecutive
// `Runnable::schedule` calls.
#[test]
fn schedule_counter() {
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    let (tx, rx) = flume::unbounded();
    let schedule = move |runnable: Runnable| {
        COUNT.fetch_add(1, Ordering::SeqCst);
        tx.send(runnable).unwrap();
    };
    let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);

    // First schedule call, made directly on the freshly spawned runnable.
    runnable.schedule();

    // Two more, each on the runnable that the previous call sent over.
    for _ in 0..2 {
        let received = rx.recv().unwrap();
        received.schedule();
    }
    assert_eq!(COUNT.load(Ordering::SeqCst), 3);

    // Take the last runnable out of the channel and discard it.
    rx.recv().unwrap();
}
267
// Drops the runnable from inside the schedule function and asserts that the
// guard captured by that function has not been dropped while it is executing.
#[test]
fn drop_inside_schedule() {
    struct DropGuard(AtomicUsize);

    impl Drop for DropGuard {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    let guard = DropGuard(AtomicUsize::new(0));

    // The guard's own counter must read zero both before and after the
    // runnable is dropped.
    let schedule = move |runnable: Runnable| {
        assert_eq!(guard.0.load(Ordering::SeqCst), 0);
        drop(runnable);
        assert_eq!(guard.0.load(Ordering::SeqCst), 0);
    };

    // The task handle is discarded right away by the `_` pattern.
    let (runnable, _) = async_task::spawn(async {}, schedule);
    runnable.schedule();
}
285
// Checks that a waker obtained from a runnable reschedules the task, both
// through `wake_by_ref` and through the consuming `wake`.
#[test]
fn waker() {
    let (tx, rx) = flume::unbounded();
    let schedule = move |runnable| tx.send(runnable).unwrap();
    let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);

    assert!(rx.is_empty());

    // Take the waker before `run` consumes the runnable.
    let waker = runnable.waker();
    runnable.run();
    waker.wake_by_ref();

    // The wake-up must have sent a runnable through the channel.
    let rescheduled = rx.recv().unwrap();
    rescheduled.run();
    waker.wake();
    rx.recv().unwrap();
}
302
// Round-trips a runnable through its raw pointer form: the schedule function
// turns it into a `NonNull<()>` and an `extern "C"` trampoline rebuilds and
// runs it.
#[test]
fn raw() {
    extern "C" fn trampoline(runnable: NonNull<()>) {
        // SAFETY: the only caller is `dispatch`, which forwards the pointer
        // produced by `Runnable::into_raw` in the schedule closure below, and
        // that pointer is turned back into a `Runnable` exactly once, here.
        let task = unsafe { Runnable::<()>::from_raw(runnable) };
        task.run();
    }

    // Dispatch schedules a function for execution at a later point. For tests, we execute it
    // straight away.
    fn dispatch(callback: extern "C" fn(NonNull<()>), context: NonNull<()>) {
        callback(context)
    }

    let executed = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&executed);

    let (runnable, _handle) = async_task::spawn(
        async move { flag.store(true, Ordering::SeqCst) },
        |runnable: Runnable<()>| dispatch(trampoline, runnable.into_raw()),
    );
    runnable.schedule();

    // `dispatch` ran the trampoline synchronously, so the flag is already set.
    assert!(executed.load(Ordering::SeqCst));
}
326