1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, lazy, poll_fn, Future};
4 use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::marker::PhantomData;
7 use std::pin::Pin;
8 use std::rc::Rc;
9 use std::sync::atomic::{AtomicBool, Ordering};
10 use std::sync::Arc;
11 use std::thread;
12 use std::time::Duration;
13 
14 struct Pending(PhantomData<Rc<()>>);
15 
16 impl Future for Pending {
17     type Output = ();
18 
poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()>19     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
20         Poll::Pending
21     }
22 }
23 
pending() -> Pending24 fn pending() -> Pending {
25     Pending(PhantomData)
26 }
27 
/// `run_until` runs the future handed to it: the closure must have been
/// invoked exactly once by the time the call returns.
#[test]
fn run_until_single_future() {
    let mut calls = 0;

    // The pool is a temporary and the closure's mutable borrow of `calls`
    // ends with this statement, so `calls` can be inspected afterwards.
    LocalPool::new().run_until(lazy(|_| calls += 1));

    assert_eq!(calls, 1);
}
42 
/// `run_until` must return once its own future is done, even though a
/// spawned task that never completes is still in the pool.
#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let never_done = Box::pin(pending());
    spawner.spawn_local_obj(never_done.into()).unwrap();

    pool.run_until(lazy(|_| ()));
}
50 
/// `run_until` must also drive spawned tasks: the awaited receiver can only
/// resolve successfully if the spawned task gets to send on the channel.
#[test]
fn run_until_executes_spawned() {
    let (sender, receiver) = oneshot::channel();
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let notify = Box::pin(lazy(move |_| {
        sender.send(()).unwrap();
    }));
    spawner.spawn_local_obj(notify.into()).unwrap();

    pool.run_until(receiver).unwrap();
}
66 
/// `run` on a pool without tasks must return, including on a repeated call.
#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run();
    }
}
73 
/// `run` must execute a task that is spawned from inside another task while
/// the pool is already running.
#[test]
fn run_executes_spawned() {
    let counter = Rc::new(Cell::new(0));
    let task_counter = Rc::clone(&counter);

    let mut pool = LocalPool::new();
    let outer_spawner = pool.spawner();
    let inner_spawner = pool.spawner();

    // The outer task does nothing but spawn the inner one, which bumps the
    // counter.
    let outer = Box::pin(lazy(move |_| {
        let inner = Box::pin(lazy(move |_| {
            task_counter.set(task_counter.get() + 1);
        }));
        inner_spawner.spawn_local_obj(inner.into()).unwrap();
    }));
    outer_spawner.spawn_local_obj(outer.into()).unwrap();

    pool.run();

    assert_eq!(counter.get(), 1);
}
103 
/// `run` must complete every one of a large batch of spawned tasks.
#[test]
fn run_spawn_many() {
    const TASKS: usize = 200;

    let completed = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    for _ in 0..TASKS {
        let completed = Rc::clone(&completed);
        let task = Box::pin(lazy(move |_| {
            completed.set(completed.get() + 1);
        }));
        spawner.spawn_local_obj(task.into()).unwrap();
    }

    pool.run();

    assert_eq!(completed.get(), TASKS);
}
129 
/// `try_run_one` on a pool without tasks must report `false`.
#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    let completed_a_task = pool.try_run_one();
    assert!(!completed_a_task);
}
135 
/// Each `try_run_one` call is expected to complete exactly one ready task,
/// regardless of the never-completing tasks queued around it.
#[test]
fn try_run_one_executes_one_ready() {
    const READY_TASKS: usize = 200;

    let completed = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Sandwich every ready task between two tasks that never complete.
    for _ in 0..READY_TASKS {
        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let completed = Rc::clone(&completed);
        let ready = Box::pin(lazy(move |_| {
            completed.set(completed.get() + 1);
        }));
        spawner.spawn_local_obj(ready.into()).unwrap();

        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    // Every call must succeed and advance the counter by exactly one.
    for done in 0..READY_TASKS {
        assert_eq!(completed.get(), done);
        assert!(pool.try_run_one());
        assert_eq!(completed.get(), done + 1);
    }

    // Only the never-completing tasks are left, so nothing more can finish.
    assert!(!pool.try_run_one());
}
168 
/// `try_run_one` must report `false` when the only task was polled but did
/// not complete, and `true` on the call in which it finally completes.
///
/// The task counts its polls, publishes its waker on every poll, and
/// completes on poll number `ITER`. The test wakes it by hand between calls.
#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    // Slot through which the task hands its most recent waker to the test.
    let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
    {
        let cnt = cnt.clone();
        let waker = waker.clone();
        spawn
            .spawn_local_obj(
                Box::pin(poll_fn(move |ctx| {
                    cnt.set(cnt.get() + 1);
                    waker.set(Some(ctx.waker().clone()));
                    if cnt.get() == ITER {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }))
                .into(),
            )
            .unwrap();
    }

    for i in 0..ITER - 1 {
        assert_eq!(cnt.get(), i);
        // Polled exactly once, but the task is still pending.
        assert!(!pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
        // Re-arm the task so the next `try_run_one` polls it again.
        waker.take().expect("the task stores its waker on every poll").wake();
    }
    assert!(pool.try_run_one());
    assert_eq!(cnt.get(), ITER);
}
209 
/// A single `try_run_one` call must poll the parent task and also run the
/// child task that the parent spawns while being polled.
#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let polls = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_polls = Rc::clone(&polls);
    let parent = Box::pin(poll_fn(move |_| {
        parent_polls.set(parent_polls.get() + 1);

        let child_polls = Rc::clone(&parent_polls);
        let child = Box::pin(lazy(move |_| child_polls.set(child_polls.get() + 1)));
        child_spawner.spawn_local_obj(child.into()).unwrap();

        Poll::Pending
    }));
    spawner.spawn_local_obj(parent.into()).unwrap();

    pool.try_run_one();
    // One increment from the parent, one from the child.
    assert_eq!(polls.get(), 2);
}
237 
/// `run_until_stalled` on a pool without tasks must return, including on a
/// repeated call.
#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run_until_stalled();
    }
}
244 
/// `run_until_stalled` can be called repeatedly on one pool; each call runs
/// the task spawned since the previous call.
#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let completed = Rc::new(Cell::new(0));

    for expected in 1..=2 {
        let tally = Rc::clone(&completed);
        let task = Box::pin(lazy(move |_| tally.set(tally.get() + 1)));
        spawner.spawn_local_obj(task.into()).unwrap();

        pool.run_until_stalled();
        assert_eq!(completed.get(), expected);
    }
}
261 
/// A single `run_until_stalled` call must poll the parent task and also run
/// the child task that the parent spawns while being polled.
#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let polls = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_polls = Rc::clone(&polls);
    let parent = Box::pin(poll_fn(move |_| {
        parent_polls.set(parent_polls.get() + 1);

        let child_polls = Rc::clone(&parent_polls);
        let child = Box::pin(lazy(move |_| child_polls.set(child_polls.get() + 1)));
        child_spawner.spawn_local_obj(child.into()).unwrap();

        Poll::Pending
    }));
    spawner.spawn_local_obj(parent.into()).unwrap();

    pool.run_until_stalled();
    // One increment from the parent, one from the child.
    assert_eq!(polls.get(), 2);
}
289 
/// Every `run_until_stalled` call must complete all ready tasks spawned
/// since the previous call, ignoring the never-completing tasks that pile up
/// alongside them.
#[test]
fn run_until_stalled_executes_all_ready() {
    // Fewer rounds when built for Miri.
    const ROUNDS: usize = if cfg!(miri) { 50 } else { 200 };
    const READY_PER_ROUND: usize = 3;

    let completed = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    for round in 0..ROUNDS {
        for _ in 0..READY_PER_ROUND {
            // Never-completing tasks before and after each ready task check
            // that pending tasks are ignored.
            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let completed = Rc::clone(&completed);
            let ready = Box::pin(lazy(move |_| {
                completed.set(completed.get() + 1);
            }));
            spawner.spawn_local_obj(ready.into()).unwrap();

            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }

        let before = round * READY_PER_ROUND;
        assert_eq!(completed.get(), before);
        pool.run_until_stalled();
        assert_eq!(completed.get(), before + READY_PER_ROUND);
    }
}
322 
/// Calling `run` on a second `LocalPool` from inside a task that is being
/// run by another `LocalPool` is expected to panic.
#[test]
#[should_panic]
fn nesting_run() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let nested = Box::pin(lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run();
    }));
    spawner.spawn_obj(nested.into()).unwrap();

    outer.run();
}
341 
/// Calling `run_until_stalled` on a second `LocalPool` from inside a task
/// that is being run by another `LocalPool` is expected to panic.
#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let nested = Box::pin(lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run_until_stalled();
    }));
    spawner.spawn_obj(nested.into()).unwrap();

    outer.run();
}
360 
/// Two self-waking tasks each bump their own counter on every poll. Task 0
/// asserts on each of its polls that the two counters differ by at most one.
#[test]
fn tasks_are_scheduled_fairly() {
    /// A task that increments `state[idx]` each time it is polled and
    /// immediately asks to be polled again.
    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let mut counts = self.state.borrow_mut();

            // Only task 0 performs the fairness check, and it stops early
            // once its own counter has reached 50.
            if self.idx == 0 {
                let skew = counts[0] - counts[1];

                assert!(skew.abs() <= 1);

                if counts[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            let own = &mut counts[self.idx];
            *own += 1;

            if *own < 100 {
                // Not finished yet: request another poll right away.
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    let counters = Rc::new(RefCell::new([0, 0]));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let first = Spin { state: Rc::clone(&counters), idx: 0 };
    spawner.spawn_local_obj(Box::pin(first).into()).unwrap();

    let second = Spin { state: counters, idx: 1 };
    spawner.spawn_local_obj(Box::pin(second).into()).unwrap();

    pool.run();
}
406 
// Tests that the use of park/unpark in user-code has no
// effect on the expected behavior of the executor.
#[test]
fn park_unpark_independence() {
    let mut polled_before = false;

    let future = future::poll_fn(move |cx| {
        // Second poll: finish. First poll: remember that it happened.
        if std::mem::replace(&mut polled_before, true) {
            return Poll::Ready(());
        }

        // (*) Request a re-poll before doing any parking of our own.
        cx.waker().clone().wake();

        // User code that temporarily parks the current thread until a helper
        // thread raises the flag and unparks it.
        let parked_thread = thread::current();
        let flag = Arc::new(AtomicBool::new(false));
        let flag_setter = Arc::clone(&flag);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            flag_setter.store(true, Ordering::SeqCst);
            parked_thread.unpark()
        });
        loop {
            if flag.load(Ordering::Relaxed) {
                break;
            }
            thread::park();
        }

        // Expect to be called again due to (*).
        Poll::Pending
    });

    futures::executor::block_on(future)
}
436 
437 struct SelfWaking {
438     wakeups_remaining: Rc<RefCell<usize>>,
439 }
440 
441 impl Future for SelfWaking {
442     type Output = ();
443 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>444     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
445         if *self.wakeups_remaining.borrow() != 0 {
446             *self.wakeups_remaining.borrow_mut() -= 1;
447             cx.waker().wake_by_ref();
448         }
449 
450         Poll::Pending
451     }
452 }
453 
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `run_until_stalled` return early even
/// though more progress was still possible.
#[test]
fn self_waking_run_until_stalled() {
    let budget = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&budget) };
        spawner.spawn_local(task).unwrap();
    }

    // Polling has to continue until the whole wake-up budget is used up.
    pool.run_until_stalled();

    assert_eq!(*budget.borrow(), 0);
}
474 
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `try_run_one` return early even though
/// more progress was still possible.
#[test]
fn self_waking_try_run_one() {
    let budget = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&budget) };
        spawner.spawn_local(task).unwrap();
    }

    spawner.spawn(future::ready(())).unwrap();

    // The `ready` future is the one task that can complete.
    let completed_a_task = pool.try_run_one();
    assert!(completed_a_task);

    // Each of the three self-waking futures was polled once, using up one
    // wake-up apiece.
    assert_eq!(*budget.borrow(), 7);
}
498