1 #![cfg(not(target_os = "wasi"))]
2 
3 use std::{task::Context, time::Duration};
4 
5 #[cfg(not(loom))]
6 use futures::task::noop_waker_ref;
7 
8 use crate::loom::sync::atomic::{AtomicBool, Ordering};
9 use crate::loom::sync::Arc;
10 use crate::loom::thread;
11 
12 use super::TimerEntry;
13 
block_on<T>(f: impl std::future::Future<Output = T>) -> T14 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
15     #[cfg(loom)]
16     return loom::future::block_on(f);
17 
18     #[cfg(not(loom))]
19     {
20         let rt = crate::runtime::Builder::new_current_thread()
21             .build()
22             .unwrap();
23         rt.block_on(f)
24     }
25 }
26 
/// Runs a test body, either through loom or directly.
///
/// With `cfg(loom)` the closure is passed to `loom::model`; without it the
/// closure is simply invoked once on the current thread.
fn model(f: impl Fn() + Send + Sync + 'static) {
    #[cfg(loom)]
    {
        loom::model(f);
    }

    #[cfg(not(loom))]
    {
        f();
    }
}
34 
rt(start_paused: bool) -> crate::runtime::Runtime35 fn rt(start_paused: bool) -> crate::runtime::Runtime {
36     crate::runtime::Builder::new_current_thread()
37         .enable_time()
38         .start_paused(start_paused)
39         .build()
40         .unwrap()
41 }
42 
/// A timer armed and awaited on a second thread must complete once the time
/// driver is processed past its deadline from the main thread.
#[test]
fn single_timer() {
    model(|| {
        let runtime = rt(false);
        let rt_handle = runtime.handle();

        let worker_handle = rt_handle.clone();
        let worker = thread::spawn(move || {
            // Arm a timer one second after the driver clock's current instant.
            let scheduler = worker_handle.inner.clone();
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(scheduler, deadline);
            pin!(entry);

            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let time = rt_handle.inner.driver().time();
        let clock = rt_handle.inner.driver().clock();

        // Depending on how this races with the worker thread, the call may or
        // may not return `Some`. If it returns `None`, the timer should still
        // complete synchronously.
        let target_tick = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target_tick);

        worker.join().unwrap();
    })
}
73 
/// A timer that is polled (with a no-op waker) and then dropped on a second
/// thread must not cause problems while the main thread concurrently
/// processes the time driver past the timer's deadline.
#[test]
fn drop_timer() {
    model(|| {
        let runtime = rt(false);
        let rt_handle = runtime.handle();

        let worker_handle = rt_handle.clone();
        let worker = thread::spawn(move || {
            let scheduler = worker_handle.inner.clone();
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(scheduler, deadline);
            pin!(entry);

            // Poll twice and ignore the outcome; the entry is dropped when
            // this closure returns.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            for _ in 0..2 {
                let _ = entry.as_mut().poll_elapsed(&mut noop_cx);
            }
        });

        thread::yield_now();

        let time = rt_handle.inner.driver().time();
        let clock = rt_handle.inner.driver().clock();

        // Process the driver at a tick far beyond the timer's deadline.
        let target_tick = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target_tick);

        worker.join().unwrap();
    })
}
107 
/// A timer first polled with a no-op waker and then awaited through
/// `block_on` (which polls with a different waker) must still complete once
/// the driver is processed past the deadline.
#[test]
fn change_waker() {
    model(|| {
        let runtime = rt(false);
        let rt_handle = runtime.handle();

        let worker_handle = rt_handle.clone();
        let worker = thread::spawn(move || {
            let scheduler = worker_handle.inner.clone();
            let deadline = worker_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(scheduler, deadline);
            pin!(entry);

            // First poll uses the no-op waker; its result is irrelevant.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            // Subsequent polls come from `block_on` with its own waker.
            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let time = rt_handle.inner.driver().time();
        let clock = rt_handle.inner.driver().clock();

        // Process the driver at a tick far beyond the timer's deadline.
        let target_tick = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target_tick);

        worker.join().unwrap();
    })
}
140 
/// A timer reset from `start + 1s` to `start + 2s` must not complete when the
/// driver is processed at `start + 1.5s`, and must complete after it is
/// processed at `start + 2.5s`.
#[test]
fn reset_future() {
    model(|| {
        // Set by the worker only after the reset timer has elapsed.
        let finished = Arc::new(AtomicBool::new(false));

        let runtime = rt(false);
        let rt_handle = runtime.handle();

        let worker_handle = rt_handle.clone();
        let worker_finished = finished.clone();
        let start = rt_handle.inner.driver().clock().now();

        let worker = thread::spawn(move || {
            let entry = TimerEntry::new(
                worker_handle.inner.clone(),
                start + Duration::from_secs(1),
            );
            pin!(entry);

            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            // Push the deadline out to two seconds after `start`.
            entry.as_mut().reset(start + Duration::from_secs(2), true);

            // Must not resolve before the two-second mark.
            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();

            worker_finished.store(true, Ordering::Relaxed);
        });

        thread::yield_now();

        let time = rt_handle.inner.driver().time();

        // The return value (a possible next wakeup time) is not inspected.
        let tick_1500 = time
            .time_source()
            .instant_to_tick(start + Duration::from_millis(1500));
        time.process_at_time(0, tick_1500);

        // 1.5s is before the reset deadline, so the worker cannot be done yet.
        assert!(!finished.load(Ordering::Relaxed));

        let tick_2500 = time
            .time_source()
            .instant_to_tick(start + Duration::from_millis(2500));
        time.process_at_time(0, tick_2500);

        worker.join().unwrap();

        assert!(finished.load(Ordering::Relaxed));
    })
}
195 
/// Selects between two values depending on whether the code was compiled
/// with `cfg(miri)`: returns `miri` when it was, `normal` otherwise.
#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
    if !cfg!(miri) {
        return normal;
    }
    miri
}
204 
/// Registers one timer per millisecond offset and then advances the driver
/// one tick at a time, checking after every step that exactly the timers
/// whose offset is at or before the current tick report ready.
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
    let runtime = rt(true);
    let rt_handle = runtime.handle();

    // Create each timer and poll it once with a no-op waker before storing it.
    let mut entries: Vec<_> = (0..normal_or_miri(1024, 64))
        .map(|offset_ms| {
            let deadline =
                rt_handle.inner.driver().clock().now() + Duration::from_millis(offset_ms);
            let mut entry = Box::pin(TimerEntry::new(rt_handle.inner.clone(), deadline));

            let _ = entry
                .as_mut()
                .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));

            entry
        })
        .collect();

    for t in 1..normal_or_miri(1024, 64) {
        rt_handle
            .inner
            .driver()
            .time()
            .process_at_time(0, t as u64);

        // The index of an entry equals its deadline offset in milliseconds.
        for (deadline, future) in entries.iter_mut().enumerate() {
            let mut context = Context::from_waker(noop_waker_ref());
            let poll = future.as_mut().poll_elapsed(&mut context);
            if deadline <= t {
                assert!(poll.is_ready());
            } else {
                assert!(poll.is_pending());
            }
        }
    }
}
239 
/// Targeted scenario with a single 193 ms timer: after processing the driver
/// at tick 62 the timer must still be pending, and processing twice at tick
/// 192 afterwards must run without panicking.
#[test]
#[cfg(not(loom))]
fn poll_process_levels_targeted() {
    let mut noop_cx = Context::from_waker(noop_waker_ref());

    let runtime = rt(true);
    let rt_handle = runtime.handle();

    let deadline = rt_handle.inner.driver().clock().now() + Duration::from_millis(193);
    let e1 = TimerEntry::new(rt_handle.inner.clone(), deadline);
    pin!(e1);

    let time = rt_handle.inner.driver().time();

    time.process_at_time(0, 62);
    assert!(e1.as_mut().poll_elapsed(&mut noop_cx).is_pending());

    // The same tick is intentionally processed twice in a row.
    time.process_at_time(0, 192);
    time.process_at_time(0, 192);
}
261 
/// Converting an instant that lies more than `MAX_SAFE_MILLIS_DURATION`
/// milliseconds after the time source's start must yield a tick no greater
/// than `MAX_SAFE_MILLIS_DURATION`.
#[test]
#[cfg(not(loom))]
fn instant_to_tick_max() {
    use crate::runtime::time::entry::MAX_SAFE_MILLIS_DURATION;

    let runtime = rt(true);
    let time = runtime.handle().inner.driver().time();
    let source = &time.time_source;

    // One millisecond beyond the largest duration considered safe.
    let start_time = source.start_time();
    let long_future = start_time + std::time::Duration::from_millis(MAX_SAFE_MILLIS_DURATION + 1);
    let tick = source.instant_to_tick(long_future);

    assert!(tick <= MAX_SAFE_MILLIS_DURATION);
}
275