1 use crate::enter;
2 use futures_core::future::Future;
3 use futures_core::stream::Stream;
4 use futures_core::task::{Context, Poll};
5 use futures_task::{waker_ref, ArcWake};
6 use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7 use futures_util::pin_mut;
8 use futures_util::stream::FuturesUnordered;
9 use futures_util::stream::StreamExt;
10 use std::cell::RefCell;
11 use std::ops::{Deref, DerefMut};
12 use std::rc::{Rc, Weak};
13 use std::sync::{
14     atomic::{AtomicBool, Ordering},
15     Arc,
16 };
17 use std::thread::{self, Thread};
18 use std::vec::Vec;
19 
20 /// A single-threaded task pool for polling futures to completion.
21 ///
22 /// This executor allows you to multiplex any number of tasks onto a single
23 /// thread. It's appropriate to poll strictly I/O-bound futures that do very
24 /// little work in between I/O actions.
25 ///
26 /// To get a handle to the pool that implements
27 /// [`Spawn`](futures_task::Spawn), use the
28 /// [`spawner()`](LocalPool::spawner) method. Because the executor is
29 /// single-threaded, it supports a special form of task spawning for non-`Send`
30 /// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
31 #[derive(Debug)]
32 pub struct LocalPool {
33     pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
34     incoming: Rc<Incoming>,
35 }
36 
37 /// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn).
38 #[derive(Clone, Debug)]
39 pub struct LocalSpawner {
40     incoming: Weak<Incoming>,
41 }
42 
43 type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
44 
45 pub(crate) struct ThreadNotify {
46     /// The (single) executor thread.
47     thread: Thread,
48     /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
49     /// before the next `park()`, which may otherwise happen if the code
50     /// being executed as part of the future(s) being polled makes use of
51     /// park / unpark calls of its own, i.e. we cannot assume that no other
52     /// code uses park / unpark on the executing `thread`.
53     unparked: AtomicBool,
54 }
55 
56 std::thread_local! {
57     static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
58         thread: thread::current(),
59         unparked: AtomicBool::new(false),
60     });
61 }
62 
63 impl ArcWake for ThreadNotify {
wake_by_ref(arc_self: &Arc<Self>)64     fn wake_by_ref(arc_self: &Arc<Self>) {
65         // Make sure the wakeup is remembered until the next `park()`.
66         let unparked = arc_self.unparked.swap(true, Ordering::Release);
67         if !unparked {
68             // If the thread has not been unparked yet, it must be done
69             // now. If it was actually parked, it will run again,
70             // otherwise the token made available by `unpark`
71             // may be consumed before reaching `park()`, but `unparked`
72             // ensures it is not forgotten.
73             arc_self.thread.unpark();
74         }
75     }
76 }
77 
78 // Set up and run a basic single-threaded spawner loop, invoking `f` on each
79 // turn.
run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T80 fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
81     let _enter = enter().expect(
82         "cannot execute `LocalPool` executor from within \
83          another executor",
84     );
85 
86     CURRENT_THREAD_NOTIFY.with(|thread_notify| {
87         let waker = waker_ref(thread_notify);
88         let mut cx = Context::from_waker(&waker);
89         loop {
90             if let Poll::Ready(t) = f(&mut cx) {
91                 return t;
92             }
93 
94             // Wait for a wakeup.
95             while !thread_notify.unparked.swap(false, Ordering::Acquire) {
96                 // No wakeup occurred. It may occur now, right before parking,
97                 // but in that case the token made available by `unpark()`
98                 // is guaranteed to still be available and `park()` is a no-op.
99                 thread::park();
100             }
101         }
102     })
103 }
104 
105 /// Check for a wakeup, but don't consume it.
woken() -> bool106 fn woken() -> bool {
107     CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
108 }
109 
110 impl LocalPool {
111     /// Create a new, empty pool of tasks.
new() -> Self112     pub fn new() -> Self {
113         Self { pool: FuturesUnordered::new(), incoming: Default::default() }
114     }
115 
116     /// Get a clonable handle to the pool as a [`Spawn`].
spawner(&self) -> LocalSpawner117     pub fn spawner(&self) -> LocalSpawner {
118         LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
119     }
120 
121     /// Run all tasks in the pool to completion.
122     ///
123     /// ```
124     /// use futures::executor::LocalPool;
125     ///
126     /// let mut pool = LocalPool::new();
127     ///
128     /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
129     ///
130     /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
131     /// pool.run();
132     /// ```
133     ///
134     /// The function will block the calling thread until *all* tasks in the pool
135     /// are complete, including any spawned while running existing tasks.
run(&mut self)136     pub fn run(&mut self) {
137         run_executor(|cx| self.poll_pool(cx))
138     }
139 
140     /// Runs all the tasks in the pool until the given future completes.
141     ///
142     /// ```
143     /// use futures::executor::LocalPool;
144     ///
145     /// let mut pool = LocalPool::new();
146     /// # let my_app  = async {};
147     ///
148     /// // run tasks in the pool until `my_app` completes
149     /// pool.run_until(my_app);
150     /// ```
151     ///
152     /// The function will block the calling thread *only* until the future `f`
153     /// completes; there may still be incomplete tasks in the pool, which will
154     /// be inert after the call completes, but can continue with further use of
155     /// one of the pool's run or poll methods. While the function is running,
156     /// however, all tasks in the pool will try to make progress.
run_until<F: Future>(&mut self, future: F) -> F::Output157     pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
158         pin_mut!(future);
159 
160         run_executor(|cx| {
161             {
162                 // if our main task is done, so are we
163                 let result = future.as_mut().poll(cx);
164                 if let Poll::Ready(output) = result {
165                     return Poll::Ready(output);
166                 }
167             }
168 
169             let _ = self.poll_pool(cx);
170             Poll::Pending
171         })
172     }
173 
174     /// Runs all tasks and returns after completing one future or until no more progress
175     /// can be made. Returns `true` if one future was completed, `false` otherwise.
176     ///
177     /// ```
178     /// use futures::executor::LocalPool;
179     /// use futures::task::LocalSpawnExt;
180     /// use futures::future::{ready, pending};
181     ///
182     /// let mut pool = LocalPool::new();
183     /// let spawner = pool.spawner();
184     ///
185     /// spawner.spawn_local(ready(())).unwrap();
186     /// spawner.spawn_local(ready(())).unwrap();
187     /// spawner.spawn_local(pending()).unwrap();
188     ///
189     /// // Run the two ready tasks and return true for them.
190     /// pool.try_run_one(); // returns true after completing one of the ready futures
191     /// pool.try_run_one(); // returns true after completing the other ready future
192     ///
193     /// // the remaining task can not be completed
194     /// assert!(!pool.try_run_one()); // returns false
195     /// ```
196     ///
197     /// This function will not block the calling thread and will return the moment
198     /// that there are no tasks left for which progress can be made or after exactly one
199     /// task was completed; Remaining incomplete tasks in the pool can continue with
200     /// further use of one of the pool's run or poll methods.
201     /// Though only one task will be completed, progress may be made on multiple tasks.
try_run_one(&mut self) -> bool202     pub fn try_run_one(&mut self) -> bool {
203         run_executor(|cx| {
204             loop {
205                 self.drain_incoming();
206 
207                 match self.pool.poll_next_unpin(cx) {
208                     // Success!
209                     Poll::Ready(Some(())) => return Poll::Ready(true),
210                     // The pool was empty.
211                     Poll::Ready(None) => return Poll::Ready(false),
212                     Poll::Pending => (),
213                 }
214 
215                 if !self.incoming.borrow().is_empty() {
216                     // New tasks were spawned; try again.
217                     continue;
218                 } else if woken() {
219                     // The pool yielded to us, but there's more progress to be made.
220                     return Poll::Pending;
221                 } else {
222                     return Poll::Ready(false);
223                 }
224             }
225         })
226     }
227 
228     /// Runs all tasks in the pool and returns if no more progress can be made
229     /// on any task.
230     ///
231     /// ```
232     /// use futures::executor::LocalPool;
233     /// use futures::task::LocalSpawnExt;
234     /// use futures::future::{ready, pending};
235     ///
236     /// let mut pool = LocalPool::new();
237     /// let spawner = pool.spawner();
238     ///
239     /// spawner.spawn_local(ready(())).unwrap();
240     /// spawner.spawn_local(ready(())).unwrap();
241     /// spawner.spawn_local(pending()).unwrap();
242     ///
243     /// // Runs the two ready task and returns.
244     /// // The empty task remains in the pool.
245     /// pool.run_until_stalled();
246     /// ```
247     ///
248     /// This function will not block the calling thread and will return the moment
249     /// that there are no tasks left for which progress can be made;
250     /// remaining incomplete tasks in the pool can continue with further use of one
251     /// of the pool's run or poll methods. While the function is running, all tasks
252     /// in the pool will try to make progress.
run_until_stalled(&mut self)253     pub fn run_until_stalled(&mut self) {
254         run_executor(|cx| match self.poll_pool(cx) {
255             // The pool is empty.
256             Poll::Ready(()) => Poll::Ready(()),
257             Poll::Pending => {
258                 if woken() {
259                     Poll::Pending
260                 } else {
261                     // We're stalled for now.
262                     Poll::Ready(())
263                 }
264             }
265         });
266     }
267 
268     /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
269     /// Repeat until either the pool is empty, or it returns `Pending`.
270     ///
271     /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
272     ///
273     /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
274     /// mean that the pool can't make progress.
poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()>275     fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
276         loop {
277             self.drain_incoming();
278 
279             let pool_ret = self.pool.poll_next_unpin(cx);
280 
281             // We queued up some new tasks; add them and poll again.
282             if !self.incoming.borrow().is_empty() {
283                 continue;
284             }
285 
286             match pool_ret {
287                 Poll::Ready(Some(())) => continue,
288                 Poll::Ready(None) => return Poll::Ready(()),
289                 Poll::Pending => return Poll::Pending,
290             }
291         }
292     }
293 
294     /// Empty the incoming queue of newly-spawned tasks.
drain_incoming(&mut self)295     fn drain_incoming(&mut self) {
296         let mut incoming = self.incoming.borrow_mut();
297         for task in incoming.drain(..) {
298             self.pool.push(task)
299         }
300     }
301 }
302 
303 impl Default for LocalPool {
default() -> Self304     fn default() -> Self {
305         Self::new()
306     }
307 }
308 
309 /// Run a future to completion on the current thread.
310 ///
311 /// This function will block the caller until the given future has completed.
312 ///
313 /// Use a [`LocalPool`] if you need finer-grained control over spawned tasks.
block_on<F: Future>(f: F) -> F::Output314 pub fn block_on<F: Future>(f: F) -> F::Output {
315     pin_mut!(f);
316     run_executor(|cx| f.as_mut().poll(cx))
317 }
318 
319 /// Turn a stream into a blocking iterator.
320 ///
321 /// When `next` is called on the resulting `BlockingStream`, the caller
322 /// will be blocked until the next element of the `Stream` becomes available.
block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S>323 pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
324     BlockingStream { stream }
325 }
326 
327 /// An iterator which blocks on values from a stream until they become available.
328 #[derive(Debug)]
329 pub struct BlockingStream<S: Stream + Unpin> {
330     stream: S,
331 }
332 
333 impl<S: Stream + Unpin> Deref for BlockingStream<S> {
334     type Target = S;
deref(&self) -> &Self::Target335     fn deref(&self) -> &Self::Target {
336         &self.stream
337     }
338 }
339 
340 impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
deref_mut(&mut self) -> &mut Self::Target341     fn deref_mut(&mut self) -> &mut Self::Target {
342         &mut self.stream
343     }
344 }
345 
346 impl<S: Stream + Unpin> BlockingStream<S> {
347     /// Convert this `BlockingStream` into the inner `Stream` type.
into_inner(self) -> S348     pub fn into_inner(self) -> S {
349         self.stream
350     }
351 }
352 
353 impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
354     type Item = S::Item;
355 
next(&mut self) -> Option<Self::Item>356     fn next(&mut self) -> Option<Self::Item> {
357         LocalPool::new().run_until(self.stream.next())
358     }
359 
size_hint(&self) -> (usize, Option<usize>)360     fn size_hint(&self) -> (usize, Option<usize>) {
361         self.stream.size_hint()
362     }
363 }
364 
365 impl Spawn for LocalSpawner {
spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>366     fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
367         if let Some(incoming) = self.incoming.upgrade() {
368             incoming.borrow_mut().push(future.into());
369             Ok(())
370         } else {
371             Err(SpawnError::shutdown())
372         }
373     }
374 
status(&self) -> Result<(), SpawnError>375     fn status(&self) -> Result<(), SpawnError> {
376         if self.incoming.upgrade().is_some() {
377             Ok(())
378         } else {
379             Err(SpawnError::shutdown())
380         }
381     }
382 }
383 
384 impl LocalSpawn for LocalSpawner {
spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>385     fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
386         if let Some(incoming) = self.incoming.upgrade() {
387             incoming.borrow_mut().push(future);
388             Ok(())
389         } else {
390             Err(SpawnError::shutdown())
391         }
392     }
393 
status_local(&self) -> Result<(), SpawnError>394     fn status_local(&self) -> Result<(), SpawnError> {
395         if self.incoming.upgrade().is_some() {
396             Ok(())
397         } else {
398             Err(SpawnError::shutdown())
399         }
400     }
401 }
402