1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, lazy, poll_fn, Future};
4 use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::marker::PhantomData;
7 use std::pin::Pin;
8 use std::rc::Rc;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11 use std::thread;
12 use std::time::Duration;
13
/// A future that never completes.
///
/// The `PhantomData<Rc<()>>` marker makes the type `!Send`, so it has to go
/// through the local (non-`Send`) spawning APIs.
struct Pending(PhantomData<Rc<()>>);

impl Future for Pending {
    type Output = ();

    /// Always reports `Poll::Pending`; the waker is never touched, so nothing
    /// in this future asks to be polled again.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}
23
pending() -> Pending24 fn pending() -> Pending {
25 Pending(PhantomData)
26 }
27
/// `run_until` runs the future handed to it; the closure must have executed
/// exactly once by the time it returns.
#[test]
fn run_until_single_future() {
    let mut polls = 0;

    {
        // Scoped so the pool (and the closure's mutable borrow of `polls`)
        // is gone before the assertion reads the counter.
        let mut pool = LocalPool::new();
        pool.run_until(lazy(|_| polls += 1));
    }

    assert_eq!(polls, 1);
}
42
/// `run_until` returns once its own future is done, even though a spawned
/// task that never completes is still sitting in the pool.
#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let never_done = Box::pin(pending());
    spawner.spawn_local_obj(never_done.into()).unwrap();

    pool.run_until(lazy(|_| ()));
}
50
/// The future passed to `run_until` only resolves after a spawned task sends
/// on the channel, so `run_until` must also drive spawned tasks.
#[test]
fn run_until_executes_spawned() {
    let (sender, receiver) = oneshot::channel();
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let notify = Box::pin(lazy(move |_| {
        sender.send(()).unwrap();
    }));
    spawner.spawn_local_obj(notify.into()).unwrap();

    pool.run_until(receiver).unwrap();
}
66
/// Calling `run` on a pool with no tasks returns, and can be repeated.
#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run();
    }
}
73
/// A task spawned from inside another task is still executed by the same
/// `run` call.
#[test]
fn run_executes_spawned() {
    let counter = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let outer_spawner = pool.spawner();
    let inner_spawner = pool.spawner();

    let inner_counter = Rc::clone(&counter);
    let outer = Box::pin(lazy(move |_| {
        // The outer task does nothing except spawn the task that counts.
        let inner = Box::pin(lazy(move |_| {
            inner_counter.set(inner_counter.get() + 1);
        }));
        inner_spawner.spawn_local_obj(inner.into()).unwrap();
    }));
    outer_spawner.spawn_local_obj(outer.into()).unwrap();

    pool.run();

    assert_eq!(counter.get(), 1);
}
103
/// Spawns `ITER` immediately-ready tasks and checks that one `run` call
/// executes every one of them.
#[test]
fn run_spawn_many() {
    const ITER: usize = 200;

    let total = Rc::new(Cell::new(0));
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    (0..ITER).for_each(|_| {
        let total = Rc::clone(&total);
        let bump = Box::pin(lazy(move |_| {
            total.set(total.get() + 1);
        }));
        spawner.spawn_local_obj(bump.into()).unwrap();
    });

    pool.run();

    assert_eq!(total.get(), ITER);
}
129
/// With nothing spawned, `try_run_one` reports `false`.
#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    let completed = pool.try_run_one();
    assert!(!completed);
}
135
/// Each `try_run_one` call completes exactly one ready task, and returns
/// `false` once only never-completing tasks remain.
#[test]
fn try_run_one_executes_one_ready() {
    const ITER: usize = 200;

    let done = Rc::new(Cell::new(0));
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    for _ in 0..ITER {
        // Every ready task is sandwiched between two never-completing ones.
        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let done = Rc::clone(&done);
        let ready = Box::pin(lazy(move |_| {
            done.set(done.get() + 1);
        }));
        spawner.spawn_local_obj(ready.into()).unwrap();

        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    for expected in 1..=ITER {
        assert_eq!(done.get(), expected - 1);
        assert!(pool.try_run_one());
        assert_eq!(done.get(), expected);
    }
    assert!(!pool.try_run_one());
}
168
/// A task that stays pending is polled once per `try_run_one` call (which
/// returns `false`); only the poll that finishes the task yields `true`.
#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let polls = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // The task parks its waker here so the test can wake it between rounds.
    let stored_waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));

    let task = {
        let polls = Rc::clone(&polls);
        let stored_waker = Rc::clone(&stored_waker);
        Box::pin(poll_fn(move |cx| {
            let seen = polls.get() + 1;
            polls.set(seen);
            stored_waker.set(Some(cx.waker().clone()));
            if seen == ITER {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }))
    };
    spawner.spawn_local_obj(task.into()).unwrap();

    for round in 0..ITER - 1 {
        assert_eq!(polls.get(), round);
        assert!(!pool.try_run_one());
        assert_eq!(polls.get(), round + 1);

        let pending_waker = stored_waker.take();
        assert!(pending_waker.is_some());
        pending_waker.unwrap().wake();
    }

    assert!(pool.try_run_one());
    assert_eq!(polls.get(), ITER);
}
209
/// A task spawned while another task is being polled is executed by the same
/// `try_run_one` call: the counter is bumped by both parent and child.
#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let counter = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_counter = Rc::clone(&counter);
    let parent = Box::pin(poll_fn(move |_| {
        parent_counter.set(parent_counter.get() + 1);

        let child_counter = Rc::clone(&parent_counter);
        let child = Box::pin(lazy(move |_| child_counter.set(child_counter.get() + 1)));
        child_spawner.spawn_local_obj(child.into()).unwrap();

        // The parent never finishes and never registers a wakeup.
        Poll::Pending
    }));
    spawner.spawn_local_obj(parent.into()).unwrap();

    pool.try_run_one();
    assert_eq!(counter.get(), 2);
}
237
/// `run_until_stalled` on a pool with no tasks returns, and can be repeated.
#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run_until_stalled();
    }
}
244
/// The pool can be reused: each spawn followed by `run_until_stalled` runs
/// the newly spawned task.
#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let counter = Rc::new(Cell::new(0));

    for expected in 1..=2 {
        let handle = Rc::clone(&counter);
        let bump = Box::pin(lazy(move |_| handle.set(handle.get() + 1)));
        spawner.spawn_local_obj(bump.into()).unwrap();

        pool.run_until_stalled();
        assert_eq!(counter.get(), expected);
    }
}
261
/// A task spawned while another task is being polled is executed by the same
/// `run_until_stalled` call: the counter is bumped by both parent and child.
#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let counter = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_counter = Rc::clone(&counter);
    let parent = Box::pin(poll_fn(move |_| {
        parent_counter.set(parent_counter.get() + 1);

        let child_counter = Rc::clone(&parent_counter);
        let child = Box::pin(lazy(move |_| child_counter.set(child_counter.get() + 1)));
        child_spawner.spawn_local_obj(child.into()).unwrap();

        // The parent never finishes and never registers a wakeup.
        Poll::Pending
    }));
    spawner.spawn_local_obj(parent.into()).unwrap();

    pool.run_until_stalled();
    assert_eq!(counter.get(), 2);
}
289
/// Every round spawns `PER_ITER` ready tasks mixed with never-completing
/// ones; a single `run_until_stalled` must finish exactly the ready ones.
#[test]
fn run_until_stalled_executes_all_ready() {
    const ITER: usize = if cfg!(miri) { 50 } else { 200 };
    const PER_ITER: usize = 3;

    let completed = Rc::new(Cell::new(0));
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    for round in 0..ITER {
        for _ in 0..PER_ITER {
            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let completed = Rc::clone(&completed);
            let ready = Box::pin(lazy(move |_| {
                completed.set(completed.get() + 1);
            }));
            spawner.spawn_local_obj(ready.into()).unwrap();

            // also add some pending tasks to test if they are ignored
            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }

        let before = round * PER_ITER;
        assert_eq!(completed.get(), before);
        pool.run_until_stalled();
        assert_eq!(completed.get(), before + PER_ITER);
    }
}
322
/// Calling `run` on a second pool from inside a task that the first pool is
/// running is expected to panic.
#[test]
#[should_panic]
fn nesting_run() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let nested = Box::pin(lazy(|_| {
        LocalPool::new().run();
    }));
    spawner.spawn_obj(nested.into()).unwrap();

    outer.run();
}
341
/// Calling `run_until_stalled` on a second pool from inside a task that the
/// first pool is running via `run` is expected to panic.
#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let nested = Box::pin(lazy(|_| {
        LocalPool::new().run_until_stalled();
    }));
    spawner.spawn_obj(nested.into()).unwrap();

    outer.run();
}
360
/// Two self-waking tasks share a pair of poll counters; task 0 asserts on
/// every poll that the counters never drift apart by more than one.
#[test]
fn tasks_are_scheduled_fairly() {
    /// Bumps its own slot in the shared counters on every poll and
    /// immediately asks to be polled again.
    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let me = self.idx;
            let mut counts = self.state.borrow_mut();

            // Only task 0 performs the fairness check and the early exit.
            if me == 0 {
                let lead = counts[0] - counts[1];
                assert!(lead.abs() <= 1);

                if counts[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            counts[me] += 1;
            if counts[me] >= 100 {
                return Poll::Ready(());
            }

            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let state = Rc::new(RefCell::new([0, 0]));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let first = Box::pin(Spin { state: Rc::clone(&state), idx: 0 });
    spawner.spawn_local_obj(first.into()).unwrap();

    let second = Box::pin(Spin { state, idx: 1 });
    spawner.spawn_local_obj(second.into()).unwrap();

    pool.run();
}
406
// Tests that the use of park/unpark in user-code has no
// effect on the expected behavior of the executor.
#[test]
fn park_unpark_independence() {
    let mut polled_before = false;

    let future = future::poll_fn(move |cx| {
        // Second poll: the wakeup requested at (*) was delivered, so finish.
        if std::mem::replace(&mut polled_before, true) {
            return Poll::Ready(());
        }
        cx.waker().clone().wake(); // (*)

        // some user-code that temporarily parks the thread
        let parked_thread = thread::current();
        let latch = Arc::new(AtomicBool::new(false));
        let signal = Arc::clone(&latch);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            signal.store(true, Ordering::SeqCst);
            parked_thread.unpark()
        });
        // Keep parking until the helper thread has flipped the latch.
        loop {
            if latch.load(Ordering::Relaxed) {
                break;
            }
            thread::park();
        }

        Poll::Pending // Expect to be called again due to (*).
    });

    futures::executor::block_on(future)
}
436
/// A future that never completes but wakes itself from inside `poll` for as
/// long as the shared wakeup budget is non-zero.
struct SelfWaking {
    /// Number of self-wakeups still allowed; shared between all clones of the `Rc`.
    wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
    type Output = ();

    /// Consumes one unit of the budget and wakes the task if any budget is
    /// left; always returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let budget = &self.wakeups_remaining;

        let exhausted = *budget.borrow() == 0;
        if !exhausted {
            *budget.borrow_mut() -= 1;
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}
453
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `run_until_stalled` return early even
/// though progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
    let budget = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    (0..3).for_each(|_| {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&budget) };
        spawner.spawn_local(task).unwrap();
    });

    // This should keep polling until there are no more wakeups.
    pool.run_until_stalled();

    assert_eq!(*budget.borrow(), 0);
}
474
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `try_run_one` return early even though
/// progress could still be made.
#[test]
fn self_waking_try_run_one() {
    let budget = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    (0..3).for_each(|_| {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&budget) };
        spawner.spawn_local(task).unwrap();
    });

    spawner.spawn(future::ready(())).unwrap();

    // The `ready` future should complete.
    let completed = pool.try_run_one();
    assert!(completed);

    // The self-waking futures are each polled once.
    assert_eq!(*budget.borrow(), 7);
}
498