1 use std::cell::Cell;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::task::{Context, Poll};
6 use std::thread;
7 use std::time::Duration;
8 
9 use async_task::Runnable;
10 use atomic_waker::AtomicWaker;
11 
// Builds a test future instrumented with event counters.
//
// Usage: `future!(f, get_waker, POLL, DROP)`
//
// Every poll of `f` blocks the calling thread for 200 ms. The first poll
// yields `Poll::Pending`; every later poll yields `Poll::Ready`.
// `POLL` is incremented on each poll of `f`.
// `DROP` is incremented each time `f` is dropped.
//
// Each poll also registers the current waker in the static `WAKER`.
// `get_waker()` takes that waker back out and panics if none is stored.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static WAKER: AtomicWaker = AtomicWaker::new();

        let ($name, $get_waker) = {
            struct CountedFuture {
                // `false` until the first poll has happened.
                polled_before: Cell<bool>,
                // Never read; the future merely owns this allocation.
                #[allow(dead_code)]
                payload: Box<i32>,
            }

            impl Future for CountedFuture {
                type Output = Box<i32>;

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(200));

                    // `replace` returns the previous flag, so only the very
                    // first poll observes `false` and reports `Pending`.
                    if self.polled_before.replace(true) {
                        Poll::Ready(Box::new(0))
                    } else {
                        Poll::Pending
                    }
                }
            }

            impl Drop for CountedFuture {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let fut = CountedFuture {
                polled_before: Cell::new(false),
                payload: Box::new(0),
            };
            let take_waker = || WAKER.take().unwrap();

            (fut, take_waker)
        };
    };
}
58 
// Builds a schedule function instrumented with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// Calling the schedule function `s` sends the runnable into an unbounded channel.
// `SCHED` is incremented on each call of `s`.
// `DROP` is incremented when `s` is dropped.
//
// `chan` is the receiving end of that channel; scheduled runnables are
// pulled out of it.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            // Owned by the schedule closure; its destructor records the drop.
            struct DropCounter(#[allow(dead_code)] Box<i32>);

            impl Drop for DropCounter {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let (sender, receiver) = flume::unbounded();
            let on_drop = DropCounter(Box::new(0));

            (
                move |runnable: Runnable| {
                    // Mention the counter so the closure captures (and thus owns) it.
                    let _ = &on_drop;
                    $sched.fetch_add(1, Ordering::SeqCst);
                    sender.send(runnable).unwrap();
                },
                receiver,
            )
        };
    };
}
95 
/// Converts a millisecond count into a [`Duration`].
///
/// Short alias that keeps the sleep calls in the test futures compact.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
99 
/// Wakes the task through `Waker::wake`, both while the future is pending
/// and after it has completed, and checks the counters at each step.
#[test]
fn wake() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    assert!(chan.is_empty());

    // First run: a single poll, nothing scheduled or dropped yet.
    runnable.run();
    assert_eq!(snapshot(), (1, 0, 0, 0, 0));

    // Waking the pending task schedules it exactly once.
    get_waker().wake();
    runnable = chan.recv().unwrap();
    assert_eq!(snapshot(), (1, 1, 0, 0, 0));

    // Second run: the future is polled again and then dropped.
    runnable.run();
    assert_eq!(snapshot(), (2, 1, 1, 0, 0));

    // Waking afterwards must not reschedule; the schedule function is dropped.
    get_waker().wake();
    assert_eq!(snapshot(), (2, 1, 1, 1, 0));
}
138 
/// Same scenario as waking by value, but every wake-up goes through
/// `Waker::wake_by_ref` on a waker that is dropped right afterwards.
#[test]
fn wake_by_ref() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    assert!(chan.is_empty());

    // First run: a single poll, nothing scheduled or dropped yet.
    runnable.run();
    assert_eq!(snapshot(), (1, 0, 0, 0, 0));

    // Waking the pending task by reference schedules it exactly once.
    get_waker().wake_by_ref();
    runnable = chan.recv().unwrap();
    assert_eq!(snapshot(), (1, 1, 0, 0, 0));

    // Second run: the future is polled again and then dropped.
    runnable.run();
    assert_eq!(snapshot(), (2, 1, 1, 0, 0));

    // No reschedule after completion; once the temporary waker is gone the
    // schedule function has been dropped as well.
    get_waker().wake_by_ref();
    assert_eq!(snapshot(), (2, 1, 1, 1, 0));
}
177 
/// Clones the waker several times and checks that the schedule function is
/// only dropped once every outstanding waker handle is gone.
#[allow(clippy::redundant_clone)] // This is intentional
#[test]
fn clone() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (mut runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First run: a single poll, nothing scheduled or dropped yet.
    runnable.run();
    assert_eq!(snapshot(), (1, 0, 0, 0, 0));

    // Build a chain of clones and wake through the last one.
    let w2 = get_waker().clone();
    let w3 = w2.clone();
    let w4 = w3.clone();
    w4.wake();

    // The wake-up scheduled the task; running it completes and drops the future.
    runnable = chan.recv().unwrap();
    runnable.run();
    assert_eq!(snapshot(), (2, 1, 1, 0, 0));

    // Waking a finished task changes nothing: `w2` and the waker registered
    // by the second poll are still alive, so the schedule function survives.
    w3.wake();
    assert_eq!(snapshot(), (2, 1, 1, 0, 0));

    // Releasing the remaining wakers finally drops the schedule function.
    drop(w2);
    drop(get_waker());
    assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
}
217 
/// Drops a scheduled runnable without running it, then wakes the task again
/// and checks that nothing further is scheduled.
#[test]
fn wake_dropped() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First run: a single poll, nothing scheduled or dropped yet.
    runnable.run();
    assert_eq!(snapshot(), (1, 0, 0, 0, 0));

    let waker = get_waker();

    // Schedule the task, then discard the runnable instead of running it:
    // the future is dropped without being polled a second time.
    waker.wake_by_ref();
    drop(chan.recv().unwrap());
    assert_eq!(snapshot(), (1, 1, 1, 0, 0));

    // Consuming the waker must not reschedule; the schedule function is dropped.
    waker.wake();
    assert_eq!(snapshot(), (1, 1, 1, 1, 0));
}
249 
/// Runs the task to completion and then wakes it once more, checking that a
/// completed task is not scheduled again.
#[test]
fn wake_completed() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);
    task.detach();

    // Snapshot: (polls, schedules, future drops, schedule-fn drops, queued runnables).
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // First run: a single poll; grab the waker it registered.
    runnable.run();
    let waker = get_waker();
    assert_eq!(snapshot(), (1, 0, 0, 0, 0));

    // Wake, then run the scheduled runnable: the future completes and is dropped.
    waker.wake();
    chan.recv().unwrap().run();
    assert_eq!(snapshot(), (2, 1, 1, 0, 0));

    // Waking the completed task must not reschedule; the schedule function is dropped.
    get_waker().wake();
    assert_eq!(snapshot(), (2, 1, 1, 1, 0));
}
280