1 use std::cell::Cell;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::task::{Context, Poll};
6 use std::thread;
7 use std::time::Duration;
8
9 use async_task::Runnable;
10 use atomic_waker::AtomicWaker;
11
// Builds a test future together with counters that record what happens to it.
//
// Usage: `future!(f, get_waker, POLL, DROP)`
//
// * `f` is the future. Every poll sleeps for 200 ms; the first poll returns `Poll::Pending` and
//   any later poll returns `Poll::Ready`.
// * `POLL` is incremented on every poll.
// * `DROP` is incremented when the future is dropped.
//
// Each poll also registers the current waker in a static `AtomicWaker`. Calling `get_waker()`
// takes that waker back out, and panics if none is stored.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        static WAKER: AtomicWaker = AtomicWaker::new();
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let ($name, $get_waker) = {
            struct Fut {
                // `false` until the first poll has happened.
                polled_before: Cell<bool>,
                // Heap allocation owned by the future; never read.
                _payload: Box<i32>,
            }

            impl Future for Fut {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(200));

                    // `replace` returns the previous flag, so only the first poll sees `false`.
                    if self.polled_before.replace(true) {
                        Poll::Ready(Box::new(0))
                    } else {
                        Poll::Pending
                    }
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let fut = Fut {
                polled_before: Cell::new(false),
                _payload: Box::new(0),
            };

            (fut, || WAKER.take().unwrap())
        };
    };
}
58
// Builds a schedule function together with counters that record what happens to it.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// * `s` is the schedule function; it sends every runnable it is given into a channel.
// * `chan` is the receiving end of that channel, used to pick up scheduled runnables.
// * `SCHED` is incremented every time `s` is invoked.
// * `DROP` is incremented when `s` is dropped.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        static $sched: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            // Bumps `$drop` when it is destroyed.
            struct Guard(#[allow(dead_code)] Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let (sender, receiver) = flume::unbounded();
            let guard = Guard(Box::new(0));

            let schedule_fn = move |runnable: Runnable| {
                // Mention the guard so the `move` closure takes ownership of it; the guard is
                // then dropped together with the closure.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                sender.send(runnable).unwrap();
            };

            (schedule_fn, receiver)
        };
    };
}
95
// Shorthand for turning a millisecond count into a `Duration`.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
99
// Drives a detached task to completion using `Waker::wake` and checks the counters at each step.
#[test]
fn wake() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    assert!(chan.is_empty());

    // First poll: the future returns `Pending`.
    runnable.run();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Waking schedules the task once; receiving the runnable empties the channel again.
    get_waker().wake();
    runnable = chan.recv().unwrap();
    assert_eq!(counts(), (1, 1, 0, 0, 0));

    // Second poll: the future completes and is expected to be dropped.
    runnable.run();
    assert_eq!(counts(), (2, 1, 1, 0, 0));

    // Waking after completion must not schedule again; once this waker is consumed the
    // schedule function is expected to be dropped as well.
    get_waker().wake();
    assert_eq!(counts(), (2, 1, 1, 1, 0));
}
138
// Same scenario as `wake`, but every wake-up goes through `Waker::wake_by_ref` on a temporary.
#[test]
fn wake_by_ref() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    assert!(chan.is_empty());

    // First poll: the future returns `Pending`.
    runnable.run();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Waking by reference schedules the task once; the temporary waker is dropped afterwards.
    get_waker().wake_by_ref();
    runnable = chan.recv().unwrap();
    assert_eq!(counts(), (1, 1, 0, 0, 0));

    // Second poll: the future completes and is expected to be dropped.
    runnable.run();
    assert_eq!(counts(), (2, 1, 1, 0, 0));

    // Waking after completion must not schedule again; once the temporary waker is gone the
    // schedule function is expected to be dropped as well.
    get_waker().wake_by_ref();
    assert_eq!(counts(), (2, 1, 1, 1, 0));
}
177
// Checks that cloned wakers behave like the original and that the schedule function is only
// dropped after the last outstanding waker is gone.
#[allow(clippy::redundant_clone)] // This is intentional
#[test]
fn clone() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll: the future returns `Pending`.
    runnable.run();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Build a chain of clones; the waker taken out of the static is dropped right away.
    let waker_b = get_waker().clone();
    let waker_c = waker_b.clone();
    let waker_d = waker_c.clone();
    waker_d.wake();

    // The wake-up from the last clone schedules the task; the second poll completes it.
    runnable = chan.recv().unwrap();
    runnable.run();
    assert_eq!(counts(), (2, 1, 1, 0, 0));

    // Waking the finished task through another clone changes nothing; wakers are still alive,
    // so the schedule function must not have been dropped yet.
    waker_c.wake();
    assert_eq!(counts(), (2, 1, 1, 0, 0));

    // Dropping the remaining clone and the waker registered by the second poll is expected to
    // release the schedule function.
    drop(waker_b);
    drop(get_waker());
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
}
217
// Wakes a task whose scheduled runnable was dropped instead of being run.
#[test]
fn wake_dropped() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll: the future returns `Pending`.
    runnable.run();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    let waker = get_waker();

    // Schedule the task, then throw the runnable away without running it: no further poll
    // happens and the future is expected to be dropped.
    waker.wake_by_ref();
    drop(chan.recv().unwrap());
    assert_eq!(counts(), (1, 1, 1, 0, 0));

    // Consuming the last waker must not schedule again and is expected to release the
    // schedule function.
    waker.wake();
    assert_eq!(counts(), (1, 1, 1, 1, 0));
}
249
// Wakes a task that has already run to completion.
#[test]
fn wake_completed() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First poll: the future returns `Pending`; keep hold of the waker it registered.
    runnable.run();
    let waker = get_waker();
    assert_eq!(counts(), (1, 0, 0, 0, 0));

    // Wake, then run the scheduled runnable: the second poll completes the future.
    waker.wake();
    chan.recv().unwrap().run();
    assert_eq!(counts(), (2, 1, 1, 0, 0));

    // The waker registered by the second poll belongs to a finished task: waking it must not
    // schedule again, and the schedule function is expected to be dropped afterwards.
    get_waker().wake();
    assert_eq!(counts(), (2, 1, 1, 1, 0));
}
280