1 use core::fmt;
2 use core::future::Future;
3 use core::marker::PhantomData;
4 use core::mem;
5 use core::ptr::NonNull;
6 use core::sync::atomic::Ordering;
7 use core::task::Waker;
8 
9 use alloc::boxed::Box;
10 
11 use crate::header::Header;
12 use crate::raw::RawTask;
13 use crate::state::*;
14 use crate::Task;
15 
16 mod sealed {
17     use super::*;
18     pub trait Sealed<M> {}
19 
20     impl<M, F> Sealed<M> for F where F: Fn(Runnable<M>) {}
21 
22     impl<M, F> Sealed<M> for WithInfo<F> where F: Fn(Runnable<M>, ScheduleInfo) {}
23 }
24 
/// A builder that creates a new task.
///
/// Start from [`Builder::new()`], optionally attach metadata with
/// [`Builder::metadata()`], and finish with one of the `spawn*` methods.
#[derive(Debug)]
pub struct Builder<M> {
    /// The metadata associated with the task.
    pub(crate) metadata: M,

    /// Whether or not a panic that occurs in the task should be propagated.
    ///
    /// This field only exists when the `std` feature is enabled.
    #[cfg(feature = "std")]
    pub(crate) propagate_panic: bool,
}
35 
36 impl<M: Default> Default for Builder<M> {
default() -> Self37     fn default() -> Self {
38         Builder::new().metadata(M::default())
39     }
40 }
41 
42 /// Extra scheduling information that can be passed to the scheduling function.
43 ///
44 /// The data source of this struct is directly from the actual implementation
45 /// of the crate itself, different from [`Runnable`]'s metadata, which is
46 /// managed by the caller.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use async_task::{Runnable, ScheduleInfo, WithInfo};
52 /// use std::sync::{Arc, Mutex};
53 ///
54 /// // The future inside the task.
55 /// let future = async {
56 ///     println!("Hello, world!");
57 /// };
58 ///
59 /// // If the task gets woken up while running, it will be sent into this channel.
60 /// let (s, r) = flume::unbounded();
61 /// // Otherwise, it will be placed into this slot.
62 /// let lifo_slot = Arc::new(Mutex::new(None));
63 /// let schedule = move |runnable: Runnable, info: ScheduleInfo| {
64 ///     if info.woken_while_running {
65 ///         s.send(runnable).unwrap()
66 ///     } else {
67 ///         let last = lifo_slot.lock().unwrap().replace(runnable);
68 ///         if let Some(last) = last {
69 ///             s.send(last).unwrap()
70 ///         }
71 ///     }
72 /// };
73 ///
74 /// // Create the actual scheduler to be spawned with some future.
75 /// let scheduler = WithInfo(schedule);
76 /// // Create a task with the future and the scheduler.
77 /// let (runnable, task) = async_task::spawn(future, scheduler);
78 /// ```
#[derive(Debug, Copy, Clone)]
// Non-exhaustive: code outside this crate cannot build the struct with a literal, which
// leaves room for adding more fields later.
#[non_exhaustive]
pub struct ScheduleInfo {
    /// Indicates whether the task gets woken up while running.
    ///
    /// This is usually `true` because the task has yielded itself to the
    /// scheduler.
    pub woken_while_running: bool,
}
88 
89 impl ScheduleInfo {
new(woken_while_running: bool) -> Self90     pub(crate) fn new(woken_while_running: bool) -> Self {
91         ScheduleInfo {
92             woken_while_running,
93         }
94     }
95 }
96 
/// The trait for scheduling functions.
///
/// `M` is the type of the task metadata and defaults to `()`. The trait is sealed through
/// the private `sealed::Sealed` supertrait; in this file it is implemented for closures
/// taking a [`Runnable`] and for [`WithInfo`] wrappers.
pub trait Schedule<M = ()>: sealed::Sealed<M> {
    /// The actual scheduling procedure.
    ///
    /// Receives the `runnable` to be scheduled together with extra scheduling `info`.
    fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo);
}
102 
103 impl<M, F> Schedule<M> for F
104 where
105     F: Fn(Runnable<M>),
106 {
schedule(&self, runnable: Runnable<M>, _: ScheduleInfo)107     fn schedule(&self, runnable: Runnable<M>, _: ScheduleInfo) {
108         self(runnable)
109     }
110 }
111 
112 /// Pass a scheduling function with more scheduling information - a.k.a.
113 /// [`ScheduleInfo`].
114 ///
115 /// Sometimes, it's useful to pass the runnable's state directly to the
116 /// scheduling function, such as whether it's woken up while running. The
117 /// scheduler can thus use the information to determine its scheduling
118 /// strategy.
119 ///
120 /// The data source of [`ScheduleInfo`] is directly from the actual
121 /// implementation of the crate itself, different from [`Runnable`]'s metadata,
122 /// which is managed by the caller.
123 ///
124 /// # Examples
125 ///
126 /// ```
127 /// use async_task::{ScheduleInfo, WithInfo};
128 /// use std::sync::{Arc, Mutex};
129 ///
130 /// // The future inside the task.
131 /// let future = async {
132 ///     println!("Hello, world!");
133 /// };
134 ///
135 /// // If the task gets woken up while running, it will be sent into this channel.
136 /// let (s, r) = flume::unbounded();
137 /// // Otherwise, it will be placed into this slot.
138 /// let lifo_slot = Arc::new(Mutex::new(None));
139 /// let schedule = move |runnable, info: ScheduleInfo| {
140 ///     if info.woken_while_running {
141 ///         s.send(runnable).unwrap()
142 ///     } else {
143 ///         let last = lifo_slot.lock().unwrap().replace(runnable);
144 ///         if let Some(last) = last {
145 ///             s.send(last).unwrap()
146 ///         }
147 ///     }
148 /// };
149 ///
150 /// // Create a task with the future and the schedule function.
151 /// let (runnable, task) = async_task::spawn(future, WithInfo(schedule));
152 /// ```
#[derive(Debug)]
// Tuple struct with a public field, so a closure can be wrapped as `WithInfo(f)`.
pub struct WithInfo<F>(pub F);
155 
156 impl<F> From<F> for WithInfo<F> {
from(value: F) -> Self157     fn from(value: F) -> Self {
158         WithInfo(value)
159     }
160 }
161 
162 impl<M, F> Schedule<M> for WithInfo<F>
163 where
164     F: Fn(Runnable<M>, ScheduleInfo),
165 {
schedule(&self, runnable: Runnable<M>, info: ScheduleInfo)166     fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo) {
167         (self.0)(runnable, info)
168     }
169 }
170 
171 impl Builder<()> {
172     /// Creates a new task builder.
173     ///
174     /// By default, this task builder has no metadata. Use the [`metadata`] method to
175     /// set the metadata.
176     ///
177     /// # Examples
178     ///
179     /// ```
180     /// use async_task::Builder;
181     ///
182     /// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {});
183     /// ```
new() -> Builder<()>184     pub fn new() -> Builder<()> {
185         Builder {
186             metadata: (),
187             #[cfg(feature = "std")]
188             propagate_panic: false,
189         }
190     }
191 
192     /// Adds metadata to the task.
193     ///
194     /// In certain cases, it may be useful to associate some metadata with a task. For instance,
195     /// you may want to associate a name with a task, or a priority for a priority queue. This
196     /// method allows the user to attach arbitrary metadata to a task that is available through
197     /// the [`Runnable`] or the [`Task`].
198     ///
199     /// # Examples
200     ///
201     /// This example creates an executor that associates a "priority" number with each task, and
202     /// then runs the tasks in order of priority.
203     ///
204     /// ```
205     /// use async_task::{Builder, Runnable};
206     /// use once_cell::sync::Lazy;
207     /// use std::cmp;
208     /// use std::collections::BinaryHeap;
209     /// use std::sync::Mutex;
210     ///
211     /// # smol::future::block_on(async {
212     /// /// A wrapper around a `Runnable<usize>` that implements `Ord` so that it can be used in a
213     /// /// priority queue.
214     /// struct TaskWrapper(Runnable<usize>);
215     ///
216     /// impl PartialEq for TaskWrapper {
217     ///     fn eq(&self, other: &Self) -> bool {
218     ///         self.0.metadata() == other.0.metadata()
219     ///     }
220     /// }
221     ///
222     /// impl Eq for TaskWrapper {}
223     ///
224     /// impl PartialOrd for TaskWrapper {
225     ///    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
226     ///       Some(self.cmp(other))
227     ///    }
228     /// }
229     ///
230     /// impl Ord for TaskWrapper {
231     ///    fn cmp(&self, other: &Self) -> cmp::Ordering {
232     ///        self.0.metadata().cmp(other.0.metadata())
233     ///    }
234     /// }
235     ///
236     /// static EXECUTOR: Lazy<Mutex<BinaryHeap<TaskWrapper>>> = Lazy::new(|| {
237     ///     Mutex::new(BinaryHeap::new())
238     /// });
239     ///
240     /// let schedule = |runnable| {
241     ///     EXECUTOR.lock().unwrap().push(TaskWrapper(runnable));
242     /// };
243     ///
244     /// // Spawn a few tasks with different priorities.
245     /// let spawn_task = move |priority| {
246     ///     let (runnable, task) = Builder::new().metadata(priority).spawn(
247     ///         move |_| async move { priority },
248     ///         schedule,
249     ///     );
250     ///     runnable.schedule();
251     ///     task
252     /// };
253     ///
254     /// let t1 = spawn_task(1);
255     /// let t2 = spawn_task(2);
256     /// let t3 = spawn_task(3);
257     ///
258     /// // Run the tasks in order of priority.
259     /// let mut metadata_seen = vec![];
260     /// while let Some(TaskWrapper(runnable)) = EXECUTOR.lock().unwrap().pop() {
261     ///     metadata_seen.push(*runnable.metadata());
262     ///     runnable.run();
263     /// }
264     ///
265     /// assert_eq!(metadata_seen, vec![3, 2, 1]);
266     /// assert_eq!(t1.await, 1);
267     /// assert_eq!(t2.await, 2);
268     /// assert_eq!(t3.await, 3);
269     /// # });
270     /// ```
metadata<M>(self, metadata: M) -> Builder<M>271     pub fn metadata<M>(self, metadata: M) -> Builder<M> {
272         Builder {
273             metadata,
274             #[cfg(feature = "std")]
275             propagate_panic: self.propagate_panic,
276         }
277     }
278 }
279 
280 impl<M> Builder<M> {
281     /// Propagates panics that occur in the task.
282     ///
283     /// When this is `true`, panics that occur in the task will be propagated to the caller of
284     /// the [`Task`]. When this is false, no special action is taken when a panic occurs in the
285     /// task, meaning that the caller of [`Runnable::run`] will observe a panic.
286     ///
287     /// This is only available when the `std` feature is enabled. By default, this is `false`.
288     ///
289     /// # Examples
290     ///
291     /// ```
292     /// use async_task::Builder;
293     /// use futures_lite::future::poll_fn;
294     /// use std::future::Future;
295     /// use std::panic;
296     /// use std::pin::Pin;
297     /// use std::task::{Context, Poll};
298     ///
299     /// fn did_panic<F: FnOnce()>(f: F) -> bool {
300     ///     panic::catch_unwind(panic::AssertUnwindSafe(f)).is_err()
301     /// }
302     ///
303     /// # smol::future::block_on(async {
304     /// let (runnable1, mut task1) = Builder::new()
305     ///    .propagate_panic(true)
306     ///    .spawn(|()| async move { panic!() }, |_| {});
307     ///
308     /// let (runnable2, mut task2) = Builder::new()
309     ///    .propagate_panic(false)
310     ///    .spawn(|()| async move { panic!() }, |_| {});
311     ///
312     /// assert!(!did_panic(|| { runnable1.run(); }));
313     /// assert!(did_panic(|| { runnable2.run(); }));
314     ///
315     /// let waker = poll_fn(|cx| Poll::Ready(cx.waker().clone())).await;
316     /// let mut cx = Context::from_waker(&waker);
317     /// assert!(did_panic(|| { let _ = Pin::new(&mut task1).poll(&mut cx); }));
318     /// assert!(did_panic(|| { let _ = Pin::new(&mut task2).poll(&mut cx); }));
319     /// # });
320     /// ```
321     #[cfg(feature = "std")]
propagate_panic(self, propagate_panic: bool) -> Builder<M>322     pub fn propagate_panic(self, propagate_panic: bool) -> Builder<M> {
323         Builder {
324             metadata: self.metadata,
325             propagate_panic,
326         }
327     }
328 
329     /// Creates a new task.
330     ///
331     /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
332     /// output.
333     ///
334     /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
335     /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
336     /// again.
337     ///
338     /// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
339     /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
340     /// should push it into a task queue so that it can be processed later.
341     ///
342     /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
343     /// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
344     ///
345     /// # Examples
346     ///
347     /// ```
348     /// use async_task::Builder;
349     ///
350     /// // The future inside the task.
351     /// let future = async {
352     ///     println!("Hello, world!");
353     /// };
354     ///
355     /// // A function that schedules the task when it gets woken up.
356     /// let (s, r) = flume::unbounded();
357     /// let schedule = move |runnable| s.send(runnable).unwrap();
358     ///
359     /// // Create a task with the future and the schedule function.
360     /// let (runnable, task) = Builder::new().spawn(|()| future, schedule);
361     /// ```
spawn<F, Fut, S>(self, future: F, schedule: S) -> (Runnable<M>, Task<Fut::Output, M>) where F: FnOnce(&M) -> Fut, Fut: Future + Send + 'static, Fut::Output: Send + 'static, S: Schedule<M> + Send + Sync + 'static,362     pub fn spawn<F, Fut, S>(self, future: F, schedule: S) -> (Runnable<M>, Task<Fut::Output, M>)
363     where
364         F: FnOnce(&M) -> Fut,
365         Fut: Future + Send + 'static,
366         Fut::Output: Send + 'static,
367         S: Schedule<M> + Send + Sync + 'static,
368     {
369         unsafe { self.spawn_unchecked(future, schedule) }
370     }
371 
372     /// Creates a new thread-local task.
373     ///
    /// This function is the same as [`spawn()`], except it does not require [`Send`] on `future`. If the
375     /// [`Runnable`] is used or dropped on another thread, a panic will occur.
376     ///
377     /// This function is only available when the `std` feature for this crate is enabled.
378     ///
379     /// # Examples
380     ///
381     /// ```
382     /// use async_task::{Builder, Runnable};
383     /// use flume::{Receiver, Sender};
384     /// use std::rc::Rc;
385     ///
386     /// thread_local! {
387     ///     // A queue that holds scheduled tasks.
388     ///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
389     /// }
390     ///
391     /// // Make a non-Send future.
392     /// let msg: Rc<str> = "Hello, world!".into();
393     /// let future = async move {
394     ///     println!("{}", msg);
395     /// };
396     ///
397     /// // A function that schedules the task when it gets woken up.
398     /// let s = QUEUE.with(|(s, _)| s.clone());
399     /// let schedule = move |runnable| s.send(runnable).unwrap();
400     ///
401     /// // Create a task with the future and the schedule function.
402     /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule);
403     /// ```
404     #[cfg(feature = "std")]
spawn_local<F, Fut, S>( self, future: F, schedule: S, ) -> (Runnable<M>, Task<Fut::Output, M>) where F: FnOnce(&M) -> Fut, Fut: Future + 'static, Fut::Output: 'static, S: Schedule<M> + Send + Sync + 'static,405     pub fn spawn_local<F, Fut, S>(
406         self,
407         future: F,
408         schedule: S,
409     ) -> (Runnable<M>, Task<Fut::Output, M>)
410     where
411         F: FnOnce(&M) -> Fut,
412         Fut: Future + 'static,
413         Fut::Output: 'static,
414         S: Schedule<M> + Send + Sync + 'static,
415     {
416         use std::mem::ManuallyDrop;
417         use std::pin::Pin;
418         use std::task::{Context, Poll};
419         use std::thread::{self, ThreadId};
420 
421         #[inline]
422         fn thread_id() -> ThreadId {
423             std::thread_local! {
424                 static ID: ThreadId = thread::current().id();
425             }
426             ID.try_with(|id| *id)
427                 .unwrap_or_else(|_| thread::current().id())
428         }
429 
430         struct Checked<F> {
431             id: ThreadId,
432             inner: ManuallyDrop<F>,
433         }
434 
435         impl<F> Drop for Checked<F> {
436             fn drop(&mut self) {
437                 assert!(
438                     self.id == thread_id(),
439                     "local task dropped by a thread that didn't spawn it"
440                 );
441                 unsafe {
442                     ManuallyDrop::drop(&mut self.inner);
443                 }
444             }
445         }
446 
447         impl<F: Future> Future for Checked<F> {
448             type Output = F::Output;
449 
450             fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451                 assert!(
452                     self.id == thread_id(),
453                     "local task polled by a thread that didn't spawn it"
454                 );
455                 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
456             }
457         }
458 
459         // Wrap the future into one that checks which thread it's on.
460         let future = move |meta| {
461             let future = future(meta);
462 
463             Checked {
464                 id: thread_id(),
465                 inner: ManuallyDrop::new(future),
466             }
467         };
468 
469         unsafe { self.spawn_unchecked(future, schedule) }
470     }
471 
472     /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
473     ///
    /// This function is the same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
475     /// `'static` on `future` and `schedule`.
476     ///
477     /// # Safety
478     ///
479     /// - If `Fut` is not [`Send`], its [`Runnable`] must be used and dropped on the original
480     ///   thread.
481     /// - If `Fut` is not `'static`, borrowed non-metadata variables must outlive its [`Runnable`].
482     /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
483     ///   must be used and dropped on the original thread.
484     /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
485     ///   [`Runnable`]'s [`Waker`].
486     ///
487     /// # Examples
488     ///
489     /// ```
490     /// use async_task::Builder;
491     ///
492     /// // The future inside the task.
493     /// let future = async {
494     ///     println!("Hello, world!");
495     /// };
496     ///
497     /// // If the task gets woken up, it will be sent into this channel.
498     /// let (s, r) = flume::unbounded();
499     /// let schedule = move |runnable| s.send(runnable).unwrap();
500     ///
501     /// // Create a task with the future and the schedule function.
502     /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) };
503     /// ```
spawn_unchecked<'a, F, Fut, S>( self, future: F, schedule: S, ) -> (Runnable<M>, Task<Fut::Output, M>) where F: FnOnce(&'a M) -> Fut, Fut: Future + 'a, S: Schedule<M>, M: 'a,504     pub unsafe fn spawn_unchecked<'a, F, Fut, S>(
505         self,
506         future: F,
507         schedule: S,
508     ) -> (Runnable<M>, Task<Fut::Output, M>)
509     where
510         F: FnOnce(&'a M) -> Fut,
511         Fut: Future + 'a,
512         S: Schedule<M>,
513         M: 'a,
514     {
515         // Allocate large futures on the heap.
516         let ptr = if mem::size_of::<Fut>() >= 2048 {
517             let future = |meta| {
518                 let future = future(meta);
519                 Box::pin(future)
520             };
521 
522             RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self)
523         } else {
524             RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, self)
525         };
526 
527         let runnable = Runnable::from_raw(ptr);
528         let task = Task {
529             ptr,
530             _marker: PhantomData,
531         };
532         (runnable, task)
533     }
534 }
535 
536 /// Creates a new task.
537 ///
538 /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
539 /// output.
540 ///
541 /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
542 /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
543 /// again.
544 ///
545 /// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
546 /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
547 /// should push it into a task queue so that it can be processed later.
548 ///
549 /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
550 /// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// // The future inside the task.
556 /// let future = async {
557 ///     println!("Hello, world!");
558 /// };
559 ///
560 /// // A function that schedules the task when it gets woken up.
561 /// let (s, r) = flume::unbounded();
562 /// let schedule = move |runnable| s.send(runnable).unwrap();
563 ///
564 /// // Create a task with the future and the schedule function.
565 /// let (runnable, task) = async_task::spawn(future, schedule);
566 /// ```
spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) where F: Future + Send + 'static, F::Output: Send + 'static, S: Schedule + Send + Sync + 'static,567 pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
568 where
569     F: Future + Send + 'static,
570     F::Output: Send + 'static,
571     S: Schedule + Send + Sync + 'static,
572 {
573     unsafe { spawn_unchecked(future, schedule) }
574 }
575 
576 /// Creates a new thread-local task.
577 ///
/// This function is the same as [`spawn()`], except it does not require [`Send`] on `future`. If the
579 /// [`Runnable`] is used or dropped on another thread, a panic will occur.
580 ///
581 /// This function is only available when the `std` feature for this crate is enabled.
582 ///
583 /// # Examples
584 ///
585 /// ```
586 /// use async_task::Runnable;
587 /// use flume::{Receiver, Sender};
588 /// use std::rc::Rc;
589 ///
590 /// thread_local! {
591 ///     // A queue that holds scheduled tasks.
592 ///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
593 /// }
594 ///
595 /// // Make a non-Send future.
596 /// let msg: Rc<str> = "Hello, world!".into();
597 /// let future = async move {
598 ///     println!("{}", msg);
599 /// };
600 ///
601 /// // A function that schedules the task when it gets woken up.
602 /// let s = QUEUE.with(|(s, _)| s.clone());
603 /// let schedule = move |runnable| s.send(runnable).unwrap();
604 ///
605 /// // Create a task with the future and the schedule function.
606 /// let (runnable, task) = async_task::spawn_local(future, schedule);
607 /// ```
608 #[cfg(feature = "std")]
spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) where F: Future + 'static, F::Output: 'static, S: Schedule + Send + Sync + 'static,609 pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
610 where
611     F: Future + 'static,
612     F::Output: 'static,
613     S: Schedule + Send + Sync + 'static,
614 {
615     Builder::new().spawn_local(move |()| future, schedule)
616 }
617 
618 /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
619 ///
/// This function is the same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
621 /// `'static` on `future` and `schedule`.
622 ///
623 /// # Safety
624 ///
625 /// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
626 ///   thread.
627 /// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
628 /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
629 ///   must be used and dropped on the original thread.
630 /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
631 ///   [`Runnable`]'s [`Waker`].
632 ///
633 /// # Examples
634 ///
635 /// ```
636 /// // The future inside the task.
637 /// let future = async {
638 ///     println!("Hello, world!");
639 /// };
640 ///
641 /// // If the task gets woken up, it will be sent into this channel.
642 /// let (s, r) = flume::unbounded();
643 /// let schedule = move |runnable| s.send(runnable).unwrap();
644 ///
645 /// // Create a task with the future and the schedule function.
646 /// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
647 /// ```
spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) where F: Future, S: Schedule,648 pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
649 where
650     F: Future,
651     S: Schedule,
652 {
653     Builder::new().spawn_unchecked(move |()| future, schedule)
654 }
655 
656 /// A handle to a runnable task.
657 ///
658 /// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
659 /// scheduled for running.
660 ///
661 /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
662 /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
663 /// again.
664 ///
665 /// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
666 /// awaiting the [`Task`] after that will result in a panic.
667 ///
668 /// # Examples
669 ///
670 /// ```
671 /// use async_task::Runnable;
672 /// use once_cell::sync::Lazy;
673 /// use std::{panic, thread};
674 ///
675 /// // A simple executor.
676 /// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
677 ///     let (sender, receiver) = flume::unbounded::<Runnable>();
678 ///     thread::spawn(|| {
679 ///         for runnable in receiver {
680 ///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
681 ///         }
682 ///     });
683 ///     sender
684 /// });
685 ///
686 /// // Create a task with a simple future.
687 /// let schedule = |runnable| QUEUE.send(runnable).unwrap();
688 /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
689 ///
690 /// // Schedule the task and await its output.
691 /// runnable.schedule();
692 /// assert_eq!(smol::future::block_on(task), 3);
693 /// ```
pub struct Runnable<M = ()> {
    /// A pointer to the heap-allocated task.
    ///
    /// Methods of this type read a `Header<M>` through this pointer.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic type `M`.
    ///
    /// `M` is not stored in the struct itself, so `PhantomData` ties the parameter to it.
    pub(crate) _marker: PhantomData<M>,
}
701 
// `NonNull` is neither `Send` nor `Sync`, so both traits are opted into manually.
// NOTE(review): the `M: Send + Sync` bounds presumably exist because the metadata is
// reachable through the handle (see `metadata()`) — confirm against the task internals.
unsafe impl<M: Send + Sync> Send for Runnable<M> {}
unsafe impl<M: Send + Sync> Sync for Runnable<M> {}

// Unwind-safety is declared explicitly; these impls only exist with the `std` feature
// because the traits live in `std::panic`.
#[cfg(feature = "std")]
impl<M> std::panic::UnwindSafe for Runnable<M> {}
#[cfg(feature = "std")]
impl<M> std::panic::RefUnwindSafe for Runnable<M> {}
709 
710 impl<M> Runnable<M> {
711     /// Get the metadata associated with this task.
712     ///
713     /// Tasks can be created with a metadata object associated with them; by default, this
714     /// is a `()` value. See the [`Builder::metadata()`] method for more information.
metadata(&self) -> &M715     pub fn metadata(&self) -> &M {
716         &self.header().metadata
717     }
718 
719     /// Schedules the task.
720     ///
721     /// This is a convenience method that passes the [`Runnable`] to the schedule function.
722     ///
723     /// # Examples
724     ///
725     /// ```
726     /// // A function that schedules the task when it gets woken up.
727     /// let (s, r) = flume::unbounded();
728     /// let schedule = move |runnable| s.send(runnable).unwrap();
729     ///
730     /// // Create a task with a simple future and the schedule function.
731     /// let (runnable, task) = async_task::spawn(async {}, schedule);
732     ///
733     /// // Schedule the task.
734     /// assert_eq!(r.len(), 0);
735     /// runnable.schedule();
736     /// assert_eq!(r.len(), 1);
737     /// ```
schedule(self)738     pub fn schedule(self) {
739         let ptr = self.ptr.as_ptr();
740         let header = ptr as *const Header<M>;
741         mem::forget(self);
742 
743         unsafe {
744             ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
745         }
746     }
747 
748     /// Runs the task by polling its future.
749     ///
750     /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
751     /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
752     /// [`Runnable`] vanishes until the task is woken.
753     /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
754     /// it woke itself and then gave the control back to the executor.
755     ///
756     /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
757     /// this method simply destroys the task.
758     ///
759     /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
760     /// after that will also result in a panic.
761     ///
762     /// # Examples
763     ///
764     /// ```
765     /// // A function that schedules the task when it gets woken up.
766     /// let (s, r) = flume::unbounded();
767     /// let schedule = move |runnable| s.send(runnable).unwrap();
768     ///
769     /// // Create a task with a simple future and the schedule function.
770     /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
771     ///
772     /// // Run the task and check its output.
773     /// runnable.run();
774     /// assert_eq!(smol::future::block_on(task), 3);
775     /// ```
run(self) -> bool776     pub fn run(self) -> bool {
777         let ptr = self.ptr.as_ptr();
778         let header = ptr as *const Header<M>;
779         mem::forget(self);
780 
781         unsafe { ((*header).vtable.run)(ptr) }
782     }
783 
784     /// Returns a waker associated with this task.
785     ///
786     /// # Examples
787     ///
788     /// ```
789     /// use smol::future;
790     ///
791     /// // A function that schedules the task when it gets woken up.
792     /// let (s, r) = flume::unbounded();
793     /// let schedule = move |runnable| s.send(runnable).unwrap();
794     ///
795     /// // Create a task with a simple future and the schedule function.
796     /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
797     ///
798     /// // Take a waker and run the task.
799     /// let waker = runnable.waker();
800     /// runnable.run();
801     ///
802     /// // Reschedule the task by waking it.
803     /// assert_eq!(r.len(), 0);
804     /// waker.wake();
805     /// assert_eq!(r.len(), 1);
806     /// ```
waker(&self) -> Waker807     pub fn waker(&self) -> Waker {
808         let ptr = self.ptr.as_ptr();
809         let header = ptr as *const Header<M>;
810 
811         unsafe {
812             let raw_waker = ((*header).vtable.clone_waker)(ptr);
813             Waker::from_raw(raw_waker)
814         }
815     }
816 
header(&self) -> &Header<M>817     fn header(&self) -> &Header<M> {
818         unsafe { &*(self.ptr.as_ptr() as *const Header<M>) }
819     }
820 
821     /// Converts this task into a raw pointer.
822     ///
823     /// To avoid a memory leak the pointer must be converted back to a Runnable using [`Runnable<M>::from_raw`][from_raw].
824     ///
825     /// `into_raw` does not change the state of the [`Task`], but there is no guarantee that it will be in the same state after calling [`Runnable<M>::from_raw`][from_raw],
826     /// as the corresponding [`Task`] might have been dropped or cancelled.
827     ///
828     /// # Examples
829     ///
830     /// ```rust
831     /// use async_task::{Runnable, spawn};
832 
833     /// let (runnable, task) = spawn(async {}, |_| {});
834     /// let runnable_pointer = runnable.into_raw();
835     ///
836     /// unsafe {
837     ///     // Convert back to an `Runnable` to prevent leak.
838     ///     let runnable = Runnable::<()>::from_raw(runnable_pointer);
839     ///     runnable.run();
840     ///     // Further calls to `Runnable::from_raw(runnable_pointer)` would be memory-unsafe.
841     /// }
842     /// // The memory was freed when `x` went out of scope above, so `runnable_pointer` is now dangling!
843     /// ```
844     /// [from_raw]: #method.from_raw
into_raw(self) -> NonNull<()>845     pub fn into_raw(self) -> NonNull<()> {
846         let ptr = self.ptr;
847         mem::forget(self);
848         ptr
849     }
850 
851     /// Converts a raw pointer into a Runnable.
852     ///
853     /// # Safety
854     ///
855     /// This method should only be used with raw pointers returned from [`Runnable<M>::into_raw`][into_raw].
856     /// It is not safe to use the provided pointer once it is passed to `from_raw`.
857     /// Crucially, it is unsafe to call `from_raw` multiple times with the same pointer - even if the resulting [`Runnable`] is not used -
858     /// as internally `async-task` uses reference counting.
859     ///
860     /// It is however safe to call [`Runnable<M>::into_raw`][into_raw] on a [`Runnable`] created with `from_raw` or
861     /// after the [`Task`] associated with a given Runnable has been dropped or cancelled.
862     ///
863     /// The state of the [`Runnable`] created with `from_raw` is not specified.
864     ///
865     /// # Examples
866     ///
867     /// ```rust
868     /// use async_task::{Runnable, spawn};
869 
870     /// let (runnable, task) = spawn(async {}, |_| {});
871     /// let runnable_pointer = runnable.into_raw();
872     ///
873     /// drop(task);
874     /// unsafe {
875     ///     // Convert back to an `Runnable` to prevent leak.
876     ///     let runnable = Runnable::<()>::from_raw(runnable_pointer);
877     ///     let did_poll = runnable.run();
878     ///     assert!(!did_poll);
879     ///     // Further calls to `Runnable::from_raw(runnable_pointer)` would be memory-unsafe.
880     /// }
881     /// // The memory was freed when `x` went out of scope above, so `runnable_pointer` is now dangling!
882     /// ```
883 
884     /// [into_raw]: #method.into_raw
from_raw(ptr: NonNull<()>) -> Self885     pub unsafe fn from_raw(ptr: NonNull<()>) -> Self {
886         Self {
887             ptr,
888             _marker: Default::default(),
889         }
890     }
891 }
892 
893 impl<M> Drop for Runnable<M> {
drop(&mut self)894     fn drop(&mut self) {
895         let ptr = self.ptr.as_ptr();
896         let header = self.header();
897 
898         unsafe {
899             let mut state = header.state.load(Ordering::Acquire);
900 
901             loop {
902                 // If the task has been completed or closed, it can't be canceled.
903                 if state & (COMPLETED | CLOSED) != 0 {
904                     break;
905                 }
906 
907                 // Mark the task as closed.
908                 match header.state.compare_exchange_weak(
909                     state,
910                     state | CLOSED,
911                     Ordering::AcqRel,
912                     Ordering::Acquire,
913                 ) {
914                     Ok(_) => break,
915                     Err(s) => state = s,
916                 }
917             }
918 
919             // Drop the future.
920             (header.vtable.drop_future)(ptr);
921 
922             // Mark the task as unscheduled.
923             let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel);
924 
925             // Notify the awaiter that the future has been dropped.
926             if state & AWAITER != 0 {
927                 (*header).notify(None);
928             }
929 
930             // Drop the task reference.
931             (header.vtable.drop_ref)(ptr);
932         }
933     }
934 }
935 
936 impl<M: fmt::Debug> fmt::Debug for Runnable<M> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result937     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
938         let ptr = self.ptr.as_ptr();
939         let header = ptr as *const Header<M>;
940 
941         f.debug_struct("Runnable")
942             .field("header", unsafe { &(*header) })
943             .finish()
944     }
945 }
946