1 use core::fmt;
2 use core::future::Future;
3 use core::marker::PhantomData;
4 use core::mem;
5 use core::pin::Pin;
6 use core::ptr::NonNull;
7 use core::sync::atomic::Ordering;
8 use core::task::{Context, Poll};
9 
10 use crate::header::Header;
11 use crate::raw::Panic;
12 use crate::runnable::ScheduleInfo;
13 use crate::state::*;
14 
/// A spawned task.
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules it one last time. Then, the
/// executor can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by
/// invoking [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T, M = ()> {
    /// A type-erased raw task pointer.
    ///
    /// The methods of this type reinterpret it as a `*const Header<M>` to reach the task's
    /// state word, vtable and metadata.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic types `T` (the task's output) and `M` (its metadata), which
    /// are otherwise erased by the untyped `ptr`.
    pub(crate) _marker: PhantomData<(T, M)>,
}
56 
// A `Task` moved to another thread can produce the output `T` there (by awaiting it) and can
// reach the metadata through `metadata()`, hence the `T: Send` and `M: Send + Sync` bounds.
// NOTE(review): soundness also depends on how the task allocation behind `ptr` is synchronized,
// which is implemented outside this file — confirm there.
unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {}
// No bound on `T`: the methods that can yield the output (`poll`, `cancel`, `detach`) take the
// handle by value or by mutable reference, so a shared `&Task` never exposes a `T`. A shared
// reference does hand out `&M` via `metadata()`, hence the bound on `M`.
unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {}

// The handle consists only of a raw pointer and a `PhantomData`; it holds no `T` or `M` inline,
// so it is declared `Unpin` for every `T` and `M`.
impl<T, M> Unpin for Task<T, M> {}

// Implemented by hand (only when the `std` feature is enabled) so that the impls hold for
// every `T` and `M` instead of being derived from the type parameters captured in the marker.
#[cfg(feature = "std")]
impl<T, M> std::panic::UnwindSafe for Task<T, M> {}
#[cfg(feature = "std")]
impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {}
66 
67 impl<T, M> Task<T, M> {
68     /// Detaches the task to let it keep running in the background.
69     ///
70     /// # Examples
71     ///
72     /// ```
73     /// use smol::{Executor, Timer};
74     /// use std::time::Duration;
75     ///
76     /// let ex = Executor::new();
77     ///
78     /// // Spawn a deamon future.
79     /// ex.spawn(async {
80     ///     loop {
81     ///         println!("I'm a daemon task looping forever.");
82     ///         Timer::after(Duration::from_secs(1)).await;
83     ///     }
84     /// })
85     /// .detach();
86     /// ```
detach(self)87     pub fn detach(self) {
88         let mut this = self;
89         let _out = this.set_detached();
90         mem::forget(this);
91     }
92 
93     /// Cancels the task and waits for it to stop running.
94     ///
95     /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
96     /// it didn't complete.
97     ///
98     /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
99     /// canceling because it also waits for the task to stop running.
100     ///
101     /// # Examples
102     ///
103     /// ```
104     /// # if cfg!(miri) { return; } // Miri does not support epoll
105     /// use smol::{future, Executor, Timer};
106     /// use std::thread;
107     /// use std::time::Duration;
108     ///
109     /// let ex = Executor::new();
110     ///
111     /// // Spawn a deamon future.
112     /// let task = ex.spawn(async {
113     ///     loop {
114     ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
115     ///         Timer::after(Duration::from_secs(1)).await;
116     ///     }
117     /// });
118     ///
119     /// // Run an executor thread.
120     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
121     ///
122     /// future::block_on(async {
123     ///     Timer::after(Duration::from_secs(3)).await;
124     ///     task.cancel().await;
125     /// });
126     /// ```
cancel(self) -> Option<T>127     pub async fn cancel(self) -> Option<T> {
128         let mut this = self;
129         this.set_canceled();
130         this.fallible().await
131     }
132 
133     /// Converts this task into a [`FallibleTask`].
134     ///
135     /// Like [`Task`], a fallible task will poll the task's output until it is
136     /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
137     /// dropped without being run. Resolves to the task's output when completed,
138     /// or [`None`] if it didn't complete.
139     ///
140     /// # Examples
141     ///
142     /// ```
143     /// use smol::{future, Executor};
144     /// use std::thread;
145     ///
146     /// let ex = Executor::new();
147     ///
148     /// // Spawn a future onto the executor.
149     /// let task = ex.spawn(async {
150     ///     println!("Hello from a task!");
151     ///     1 + 2
152     /// })
153     /// .fallible();
154     ///
155     /// // Run an executor thread.
156     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
157     ///
158     /// // Wait for the task's output.
159     /// assert_eq!(future::block_on(task), Some(3));
160     /// ```
161     ///
162     /// ```
163     /// use smol::future;
164     ///
165     /// // Schedule function which drops the runnable without running it.
166     /// let schedule = move |runnable| drop(runnable);
167     ///
168     /// // Create a task with the future and the schedule function.
169     /// let (runnable, task) = async_task::spawn(async {
170     ///     println!("Hello from a task!");
171     ///     1 + 2
172     /// }, schedule);
173     /// runnable.schedule();
174     ///
175     /// // Wait for the task's output.
176     /// assert_eq!(future::block_on(task.fallible()), None);
177     /// ```
fallible(self) -> FallibleTask<T, M>178     pub fn fallible(self) -> FallibleTask<T, M> {
179         FallibleTask { task: self }
180     }
181 
182     /// Puts the task in canceled state.
set_canceled(&mut self)183     fn set_canceled(&mut self) {
184         let ptr = self.ptr.as_ptr();
185         let header = ptr as *const Header<M>;
186 
187         unsafe {
188             let mut state = (*header).state.load(Ordering::Acquire);
189 
190             loop {
191                 // If the task has been completed or closed, it can't be canceled.
192                 if state & (COMPLETED | CLOSED) != 0 {
193                     break;
194                 }
195 
196                 // If the task is not scheduled nor running, we'll need to schedule it.
197                 let new = if state & (SCHEDULED | RUNNING) == 0 {
198                     (state | SCHEDULED | CLOSED) + REFERENCE
199                 } else {
200                     state | CLOSED
201                 };
202 
203                 // Mark the task as closed.
204                 match (*header).state.compare_exchange_weak(
205                     state,
206                     new,
207                     Ordering::AcqRel,
208                     Ordering::Acquire,
209                 ) {
210                     Ok(_) => {
211                         // If the task is not scheduled nor running, schedule it one more time so
212                         // that its future gets dropped by the executor.
213                         if state & (SCHEDULED | RUNNING) == 0 {
214                             ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
215                         }
216 
217                         // Notify the awaiter that the task has been closed.
218                         if state & AWAITER != 0 {
219                             (*header).notify(None);
220                         }
221 
222                         break;
223                     }
224                     Err(s) => state = s,
225                 }
226             }
227         }
228     }
229 
230     /// Puts the task in detached state.
set_detached(&mut self) -> Option<Result<T, Panic>>231     fn set_detached(&mut self) -> Option<Result<T, Panic>> {
232         let ptr = self.ptr.as_ptr();
233         let header = ptr as *const Header<M>;
234 
235         unsafe {
236             // A place where the output will be stored in case it needs to be dropped.
237             let mut output = None;
238 
239             // Optimistically assume the `Task` is being detached just after creating the task.
240             // This is a common case so if the `Task` is datached, the overhead of it is only one
241             // compare-exchange operation.
242             if let Err(mut state) = (*header).state.compare_exchange_weak(
243                 SCHEDULED | TASK | REFERENCE,
244                 SCHEDULED | REFERENCE,
245                 Ordering::AcqRel,
246                 Ordering::Acquire,
247             ) {
248                 loop {
249                     // If the task has been completed but not yet closed, that means its output
250                     // must be dropped.
251                     if state & COMPLETED != 0 && state & CLOSED == 0 {
252                         // Mark the task as closed in order to grab its output.
253                         match (*header).state.compare_exchange_weak(
254                             state,
255                             state | CLOSED,
256                             Ordering::AcqRel,
257                             Ordering::Acquire,
258                         ) {
259                             Ok(_) => {
260                                 // Read the output.
261                                 output = Some(
262                                     (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
263                                         .read(),
264                                 );
265 
266                                 // Update the state variable because we're continuing the loop.
267                                 state |= CLOSED;
268                             }
269                             Err(s) => state = s,
270                         }
271                     } else {
272                         // If this is the last reference to the task and it's not closed, then
273                         // close it and schedule one more time so that its future gets dropped by
274                         // the executor.
275                         let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
276                             SCHEDULED | CLOSED | REFERENCE
277                         } else {
278                             state & !TASK
279                         };
280 
281                         // Unset the `TASK` flag.
282                         match (*header).state.compare_exchange_weak(
283                             state,
284                             new,
285                             Ordering::AcqRel,
286                             Ordering::Acquire,
287                         ) {
288                             Ok(_) => {
289                                 // If this is the last reference to the task, we need to either
290                                 // schedule dropping its future or destroy it.
291                                 if state & !(REFERENCE - 1) == 0 {
292                                     if state & CLOSED == 0 {
293                                         ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
294                                     } else {
295                                         ((*header).vtable.destroy)(ptr);
296                                     }
297                                 }
298 
299                                 break;
300                             }
301                             Err(s) => state = s,
302                         }
303                     }
304                 }
305             }
306 
307             output
308         }
309     }
310 
311     /// Polls the task to retrieve its output.
312     ///
313     /// Returns `Some` if the task has completed or `None` if it was closed.
314     ///
315     /// A task becomes closed in the following cases:
316     ///
317     /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
318     /// 2. Its output gets awaited by the `Task`.
319     /// 3. It panics while polling the future.
320     /// 4. It is completed and the `Task` gets dropped.
poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>321     fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
322         let ptr = self.ptr.as_ptr();
323         let header = ptr as *const Header<M>;
324 
325         unsafe {
326             let mut state = (*header).state.load(Ordering::Acquire);
327 
328             loop {
329                 // If the task has been closed, notify the awaiter and return `None`.
330                 if state & CLOSED != 0 {
331                     // If the task is scheduled or running, we need to wait until its future is
332                     // dropped.
333                     if state & (SCHEDULED | RUNNING) != 0 {
334                         // Replace the waker with one associated with the current task.
335                         (*header).register(cx.waker());
336 
337                         // Reload the state after registering. It is possible changes occurred just
338                         // before registration so we need to check for that.
339                         state = (*header).state.load(Ordering::Acquire);
340 
341                         // If the task is still scheduled or running, we need to wait because its
342                         // future is not dropped yet.
343                         if state & (SCHEDULED | RUNNING) != 0 {
344                             return Poll::Pending;
345                         }
346                     }
347 
348                     // Even though the awaiter is most likely the current task, it could also be
349                     // another task.
350                     (*header).notify(Some(cx.waker()));
351                     return Poll::Ready(None);
352                 }
353 
354                 // If the task is not completed, register the current task.
355                 if state & COMPLETED == 0 {
356                     // Replace the waker with one associated with the current task.
357                     (*header).register(cx.waker());
358 
359                     // Reload the state after registering. It is possible that the task became
360                     // completed or closed just before registration so we need to check for that.
361                     state = (*header).state.load(Ordering::Acquire);
362 
363                     // If the task has been closed, restart.
364                     if state & CLOSED != 0 {
365                         continue;
366                     }
367 
368                     // If the task is still not completed, we're blocked on it.
369                     if state & COMPLETED == 0 {
370                         return Poll::Pending;
371                     }
372                 }
373 
374                 // Since the task is now completed, mark it as closed in order to grab its output.
375                 match (*header).state.compare_exchange(
376                     state,
377                     state | CLOSED,
378                     Ordering::AcqRel,
379                     Ordering::Acquire,
380                 ) {
381                     Ok(_) => {
382                         // Notify the awaiter. Even though the awaiter is most likely the current
383                         // task, it could also be another task.
384                         if state & AWAITER != 0 {
385                             (*header).notify(Some(cx.waker()));
386                         }
387 
388                         // Take the output from the task.
389                         let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
390                         let output = output.read();
391 
392                         // Propagate the panic if the task panicked.
393                         let output = match output {
394                             Ok(output) => output,
395                             Err(panic) => {
396                                 #[cfg(feature = "std")]
397                                 std::panic::resume_unwind(panic);
398 
399                                 #[cfg(not(feature = "std"))]
400                                 match panic {}
401                             }
402                         };
403 
404                         return Poll::Ready(Some(output));
405                     }
406                     Err(s) => state = s,
407                 }
408             }
409         }
410     }
411 
header(&self) -> &Header<M>412     fn header(&self) -> &Header<M> {
413         let ptr = self.ptr.as_ptr();
414         let header = ptr as *const Header<M>;
415         unsafe { &*header }
416     }
417 
418     /// Returns `true` if the current task is finished.
419     ///
420     /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
is_finished(&self) -> bool421     pub fn is_finished(&self) -> bool {
422         let ptr = self.ptr.as_ptr();
423         let header = ptr as *const Header<M>;
424 
425         unsafe {
426             let state = (*header).state.load(Ordering::Acquire);
427             state & (CLOSED | COMPLETED) != 0
428         }
429     }
430 
431     /// Get the metadata associated with this task.
432     ///
433     /// Tasks can be created with a metadata object associated with them; by default, this
434     /// is a `()` value. See the [`Builder::metadata()`] method for more information.
metadata(&self) -> &M435     pub fn metadata(&self) -> &M {
436         &self.header().metadata
437     }
438 }
439 
440 impl<T, M> Drop for Task<T, M> {
drop(&mut self)441     fn drop(&mut self) {
442         self.set_canceled();
443         self.set_detached();
444     }
445 }
446 
447 impl<T, M> Future for Task<T, M> {
448     type Output = T;
449 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>450     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451         match self.poll_task(cx) {
452             Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")),
453             Poll::Pending => Poll::Pending,
454         }
455     }
456 }
457 
458 impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result459     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460         f.debug_struct("Task")
461             .field("header", self.header())
462             .finish()
463     }
464 }
465 
/// A spawned task with a fallible response.
///
/// This type behaves like [`Task`], however it produces an `Option<T>` when
/// polled and will return `None` if the executor dropped its
/// [`Runnable`][`super::Runnable`] without being run.
///
/// This can be useful to avoid the panic produced when polling the `Task`
/// future if the executor dropped its `Runnable`.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct FallibleTask<T, M = ()> {
    /// The wrapped task handle; every operation on a `FallibleTask` delegates to it.
    task: Task<T, M>,
}
478 
479 impl<T, M> FallibleTask<T, M> {
480     /// Detaches the task to let it keep running in the background.
481     ///
482     /// # Examples
483     ///
484     /// ```
485     /// use smol::{Executor, Timer};
486     /// use std::time::Duration;
487     ///
488     /// let ex = Executor::new();
489     ///
490     /// // Spawn a deamon future.
491     /// ex.spawn(async {
492     ///     loop {
493     ///         println!("I'm a daemon task looping forever.");
494     ///         Timer::after(Duration::from_secs(1)).await;
495     ///     }
496     /// })
497     /// .fallible()
498     /// .detach();
499     /// ```
detach(self)500     pub fn detach(self) {
501         self.task.detach()
502     }
503 
504     /// Cancels the task and waits for it to stop running.
505     ///
506     /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
507     /// it didn't complete.
508     ///
509     /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
510     /// canceling because it also waits for the task to stop running.
511     ///
512     /// # Examples
513     ///
514     /// ```
515     /// # if cfg!(miri) { return; } // Miri does not support epoll
516     /// use smol::{future, Executor, Timer};
517     /// use std::thread;
518     /// use std::time::Duration;
519     ///
520     /// let ex = Executor::new();
521     ///
522     /// // Spawn a deamon future.
523     /// let task = ex.spawn(async {
524     ///     loop {
525     ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
526     ///         Timer::after(Duration::from_secs(1)).await;
527     ///     }
528     /// })
529     /// .fallible();
530     ///
531     /// // Run an executor thread.
532     /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
533     ///
534     /// future::block_on(async {
535     ///     Timer::after(Duration::from_secs(3)).await;
536     ///     task.cancel().await;
537     /// });
538     /// ```
cancel(self) -> Option<T>539     pub async fn cancel(self) -> Option<T> {
540         self.task.cancel().await
541     }
542 
543     /// Returns `true` if the current task is finished.
544     ///
545     /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
is_finished(&self) -> bool546     pub fn is_finished(&self) -> bool {
547         self.task.is_finished()
548     }
549 }
550 
551 impl<T, M> Future for FallibleTask<T, M> {
552     type Output = Option<T>;
553 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>554     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
555         self.task.poll_task(cx)
556     }
557 }
558 
559 impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result560     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561         f.debug_struct("FallibleTask")
562             .field("header", self.task.header())
563             .finish()
564     }
565 }
566