1 #![cfg(not(target_os = "wasi"))]
2
3 use std::{task::Context, time::Duration};
4
5 #[cfg(not(loom))]
6 use futures::task::noop_waker_ref;
7
8 use crate::loom::sync::atomic::{AtomicBool, Ordering};
9 use crate::loom::sync::Arc;
10 use crate::loom::thread;
11
12 use super::TimerEntry;
13
block_on<T>(f: impl std::future::Future<Output = T>) -> T14 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
15 #[cfg(loom)]
16 return loom::future::block_on(f);
17
18 #[cfg(not(loom))]
19 {
20 let rt = crate::runtime::Builder::new_current_thread()
21 .build()
22 .unwrap();
23 rt.block_on(f)
24 }
25 }
26
/// Runs the test body `f`.
///
/// Without loom the closure is simply called once; with `cfg(loom)` it is
/// passed to `loom::model` instead.
fn model(f: impl Fn() + Send + Sync + 'static) {
    #[cfg(not(loom))]
    f();

    #[cfg(loom)]
    loom::model(f);
}
34
rt(start_paused: bool) -> crate::runtime::Runtime35 fn rt(start_paused: bool) -> crate::runtime::Runtime {
36 crate::runtime::Builder::new_current_thread()
37 .enable_time()
38 .start_paused(start_paused)
39 .build()
40 .unwrap()
41 }
42
/// A timer awaited on a spawned thread completes once the test thread
/// advances the time driver beyond the timer's deadline.
#[test]
fn single_timer() {
    model(|| {
        let rt = rt(false);
        let handle = rt.handle();

        let thread_handle = handle.clone();
        let worker = thread::spawn(move || {
            let inner = thread_handle.inner.clone();
            let deadline = thread_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(inner, deadline);
            pin!(entry);

            // Block this thread until the entry reports that it has elapsed.
            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let driver = handle.inner.driver();
        let time = driver.time();
        let clock = driver.clock();

        // Depending on how this races with the spawned thread, processing may
        // or may not yield `Some`. If it yields `None`, the timer is expected
        // to complete synchronously.
        let target = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target);

        worker.join().unwrap();
    })
}
73
/// A timer that has been polled (but not awaited to completion) is dropped on
/// a spawned thread while the test thread advances the time driver.
#[test]
fn drop_timer() {
    model(|| {
        let rt = rt(false);
        let handle = rt.handle();

        let thread_handle = handle.clone();
        let worker = thread::spawn(move || {
            let inner = thread_handle.inner.clone();
            let deadline = thread_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(inner, deadline);
            pin!(entry);

            // Poll twice with a no-op waker and ignore both results; the
            // entry goes out of scope when this closure returns.
            for _ in 0..2 {
                let mut cx = Context::from_waker(futures::task::noop_waker_ref());
                let _ = entry.as_mut().poll_elapsed(&mut cx);
            }
        });

        thread::yield_now();

        let driver = handle.inner.driver();
        let time = driver.time();
        let clock = driver.clock();

        // Move the driver forward, past the timer's deadline.
        let target = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target);

        worker.join().unwrap();
    })
}
107
/// A timer first polled with a no-op waker and then awaited through
/// `block_on` (which polls it with a different context) still completes once
/// the driver is advanced.
#[test]
fn change_waker() {
    model(|| {
        let rt = rt(false);
        let handle = rt.handle();

        let thread_handle = handle.clone();
        let worker = thread::spawn(move || {
            let inner = thread_handle.inner.clone();
            let deadline = thread_handle.inner.driver().clock().now() + Duration::from_secs(1);
            let entry = TimerEntry::new(inner, deadline);
            pin!(entry);

            // First poll uses a no-op waker; its result is ignored.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            // Subsequent polls happen with whatever context `block_on` supplies.
            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();
        });

        thread::yield_now();

        let driver = handle.inner.driver();
        let time = driver.time();
        let clock = driver.clock();

        // Move the driver forward, past the timer's deadline.
        let target = time.time_source().now(clock) + 2_000_000_000;
        time.process_at_time(0, target);

        worker.join().unwrap();
    })
}
140
/// A timer whose deadline is reset from `start + 1s` to `start + 2s` must not
/// complete when the driver is advanced to `start + 1.5s`, and must complete
/// after the driver is advanced to `start + 2.5s`.
#[test]
fn reset_future() {
    model(|| {
        // Set by the spawned thread only after the reset timer has elapsed.
        let finished_early = Arc::new(AtomicBool::new(false));

        let rt = rt(false);
        let handle = rt.handle();

        let thread_handle = handle.clone();
        let thread_flag = finished_early.clone();
        let start = handle.inner.driver().clock().now();

        let worker = thread::spawn(move || {
            let entry = TimerEntry::new(
                thread_handle.inner.clone(),
                start + Duration::from_secs(1),
            );
            pin!(entry);

            // Poll once with a no-op waker before changing the deadline.
            let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut noop_cx);

            entry.as_mut().reset(start + Duration::from_secs(2), true);

            // This wait should not finish before the 2s mark.
            let elapsed = std::future::poll_fn(|cx| entry.as_mut().poll_elapsed(cx));
            block_on(elapsed).unwrap();

            thread_flag.store(true, Ordering::Relaxed);
        });

        thread::yield_now();

        let time = handle.inner.driver().time();

        // Processes the driver at `start + offset_ms` milliseconds.
        let advance_to = |offset_ms: u64| {
            let tick = time
                .time_source()
                .instant_to_tick(start + Duration::from_millis(offset_ms));
            time.process_at_time(0, tick);
        };

        // This step may or may not produce a wakeup time.
        advance_to(1500);

        assert!(!finished_early.load(Ordering::Relaxed));

        advance_to(2500);

        worker.join().unwrap();

        assert!(finished_early.load(Ordering::Relaxed));
    })
}
195
/// Selects between two values depending on whether the build has `cfg(miri)`
/// set: returns `miri` when it is, `normal` otherwise.
#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
    if !cfg!(miri) {
        return normal;
    }
    miri
}
204
/// Registers one timer per millisecond offset, then steps the driver forward
/// one tick at a time and checks after every step that exactly the timers
/// whose offset is `<=` the current tick report ready.
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
    let rt = rt(true);
    let handle = rt.handle();

    // Entry `i` has a deadline of "now + i ms"; each is polled once with a
    // no-op waker before being stored.
    let mut entries: Vec<_> = (0..normal_or_miri(1024, 64))
        .map(|offset_ms| {
            let inner = handle.inner.clone();
            let deadline = handle.inner.driver().clock().now() + Duration::from_millis(offset_ms);
            let mut entry = Box::pin(TimerEntry::new(inner, deadline));

            let mut cx = Context::from_waker(noop_waker_ref());
            let _ = entry.as_mut().poll_elapsed(&mut cx);

            entry
        })
        .collect();

    let time = handle.inner.driver().time();

    for now in 1..normal_or_miri(1024, 64) {
        time.process_at_time(0, now as u64);

        let mut cx = Context::from_waker(noop_waker_ref());
        for (deadline, timer) in entries.iter_mut().enumerate() {
            let poll = timer.as_mut().poll_elapsed(&mut cx);
            if deadline <= now {
                assert!(poll.is_ready());
            } else {
                assert!(poll.is_pending());
            }
        }
    }
}
239
/// Creates a single timer 193ms out, processes the driver at tick 62, checks
/// the timer is still pending, then processes tick 192 twice.
#[test]
#[cfg(not(loom))]
fn poll_process_levels_targeted() {
    let mut cx = Context::from_waker(noop_waker_ref());

    let rt = rt(true);
    let handle = rt.handle();

    let inner = handle.inner.clone();
    let deadline = handle.inner.driver().clock().now() + Duration::from_millis(193);
    let timer = TimerEntry::new(inner, deadline);
    pin!(timer);

    let time = handle.inner.driver().time();

    time.process_at_time(0, 62);
    assert!(timer.as_mut().poll_elapsed(&mut cx).is_pending());

    // The same tick is processed twice on purpose.
    // NOTE(review): presumably this guards against a problem when a tick is
    // re-processed just before the deadline — confirm against history.
    for _ in 0..2 {
        time.process_at_time(0, 192);
    }
}
261
/// Converting an instant that lies `MAX_SAFE_MILLIS_DURATION + 1` milliseconds
/// after the time source's start must yield a tick no larger than
/// `MAX_SAFE_MILLIS_DURATION`.
#[test]
#[cfg(not(loom))]
fn instant_to_tick_max() {
    use crate::runtime::time::entry::MAX_SAFE_MILLIS_DURATION;

    let rt = rt(true);
    let time = rt.handle().inner.driver().time();
    let source = &time.time_source;

    // One millisecond beyond the maximum duration.
    let beyond_max = source.start_time() + Duration::from_millis(MAX_SAFE_MILLIS_DURATION + 1);
    let tick = source.instant_to_tick(beyond_max);

    assert!(tick <= MAX_SAFE_MILLIS_DURATION);
}
275