//! An asynchronously awaitable `CancellationToken`. //! The token allows to signal a cancellation request to one or more tasks. pub(crate) mod guard; mod tree_node; use crate::loom::sync::Arc; use crate::util::MaybeDangling; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; use guard::DropGuard; use pin_project_lite::pin_project; /// A token which can be used to signal a cancellation request to one or more /// tasks. /// /// Tasks can call [`CancellationToken::cancelled()`] in order to /// obtain a Future which will be resolved when cancellation is requested. /// /// Cancellation can be requested through the [`CancellationToken::cancel`] method. /// /// # Examples /// /// ```no_run /// use tokio::select; /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { /// let token = CancellationToken::new(); /// let cloned_token = token.clone(); /// /// let join_handle = tokio::spawn(async move { /// // Wait for either cancellation or a very long time /// select! { /// _ = cloned_token.cancelled() => { /// // The token was cancelled /// 5 /// } /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { /// 99 /// } /// } /// }); /// /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// /// assert_eq!(5, join_handle.await.unwrap()); /// } /// ``` pub struct CancellationToken { inner: Arc, } impl std::panic::UnwindSafe for CancellationToken {} impl std::panic::RefUnwindSafe for CancellationToken {} pin_project! { /// A Future that is resolved once the corresponding [`CancellationToken`] /// is cancelled. #[must_use = "futures do nothing unless polled"] pub struct WaitForCancellationFuture<'a> { cancellation_token: &'a CancellationToken, #[pin] future: tokio::sync::futures::Notified<'a>, } } pin_project! { /// A Future that is resolved once the corresponding [`CancellationToken`] /// is cancelled. /// /// This is the counterpart to [`WaitForCancellationFuture`] that takes /// [`CancellationToken`] by value instead of using a reference. #[must_use = "futures do nothing unless polled"] pub struct WaitForCancellationFutureOwned { // This field internally has a reference to the cancellation token, but camouflages // the relationship with `'static`. To avoid Undefined Behavior, we must ensure // that the reference is only used while the cancellation token is still alive. To // do that, we ensure that the future is the first field, so that it is dropped // before the cancellation token. // // We use `MaybeDanglingFuture` here because without it, the compiler could assert // the reference inside `future` to be valid even after the destructor of that // field runs. (Specifically, when the `WaitForCancellationFutureOwned` is passed // as an argument to a function, the reference can be asserted to be valid for the // rest of that function.) To avoid that, we use `MaybeDangling` which tells the // compiler that the reference stored inside it might not be valid. // // See // for more info. #[pin] future: MaybeDangling>, cancellation_token: CancellationToken, } } // ===== impl CancellationToken ===== impl core::fmt::Debug for CancellationToken { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("CancellationToken") .field("is_cancelled", &self.is_cancelled()) .finish() } } impl Clone for CancellationToken { /// Creates a clone of the `CancellationToken` which will get cancelled /// whenever the current token gets cancelled, and vice versa. fn clone(&self) -> Self { tree_node::increase_handle_refcount(&self.inner); CancellationToken { inner: self.inner.clone(), } } } impl Drop for CancellationToken { fn drop(&mut self) { tree_node::decrease_handle_refcount(&self.inner); } } impl Default for CancellationToken { fn default() -> CancellationToken { CancellationToken::new() } } impl CancellationToken { /// Creates a new `CancellationToken` in the non-cancelled state. pub fn new() -> CancellationToken { CancellationToken { inner: Arc::new(tree_node::TreeNode::new()), } } /// Creates a `CancellationToken` which will get cancelled whenever the /// current token gets cancelled. Unlike a cloned `CancellationToken`, /// cancelling a child token does not cancel the parent token. /// /// If the current token is already cancelled, the child token will get /// returned in cancelled state. /// /// # Examples /// /// ```no_run /// use tokio::select; /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { /// let token = CancellationToken::new(); /// let child_token = token.child_token(); /// /// let join_handle = tokio::spawn(async move { /// // Wait for either cancellation or a very long time /// select! { /// _ = child_token.cancelled() => { /// // The token was cancelled /// 5 /// } /// _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { /// 99 /// } /// } /// }); /// /// tokio::spawn(async move { /// tokio::time::sleep(std::time::Duration::from_millis(10)).await; /// token.cancel(); /// }); /// /// assert_eq!(5, join_handle.await.unwrap()); /// } /// ``` pub fn child_token(&self) -> CancellationToken { CancellationToken { inner: tree_node::child_node(&self.inner), } } /// Cancel the [`CancellationToken`] and all child tokens which had been /// derived from it. /// /// This will wake up all tasks which are waiting for cancellation. /// /// Be aware that cancellation is not an atomic operation. It is possible /// for another thread running in parallel with a call to `cancel` to first /// receive `true` from `is_cancelled` on one child node, and then receive /// `false` from `is_cancelled` on another child node. However, once the /// call to `cancel` returns, all child nodes have been fully cancelled. pub fn cancel(&self) { tree_node::cancel(&self.inner); } /// Returns `true` if the `CancellationToken` is cancelled. pub fn is_cancelled(&self) -> bool { tree_node::is_cancelled(&self.inner) } /// Returns a `Future` that gets fulfilled when cancellation is requested. /// /// The future will complete immediately if the token is already cancelled /// when this method is called. /// /// # Cancel safety /// /// This method is cancel safe. pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { WaitForCancellationFuture { cancellation_token: self, future: self.inner.notified(), } } /// Returns a `Future` that gets fulfilled when cancellation is requested. /// /// The future will complete immediately if the token is already cancelled /// when this method is called. /// /// The function takes self by value and returns a future that owns the /// token. /// /// # Cancel safety /// /// This method is cancel safe. pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned { WaitForCancellationFutureOwned::new(self) } /// Creates a `DropGuard` for this token. /// /// Returned guard will cancel this token (and all its children) on drop /// unless disarmed. pub fn drop_guard(self) -> DropGuard { DropGuard { inner: Some(self) } } /// Runs a future to completion and returns its result wrapped inside of an `Option` /// unless the `CancellationToken` is cancelled. In that case the function returns /// `None` and the future gets dropped. /// /// # Cancel safety /// /// This method is only cancel safe if `fut` is cancel safe. pub async fn run_until_cancelled(&self, fut: F) -> Option where F: Future, { pin_project! { /// A Future that is resolved once the corresponding [`CancellationToken`] /// is cancelled or a given Future gets resolved. It is biased towards the /// Future completion. #[must_use = "futures do nothing unless polled"] struct RunUntilCancelledFuture<'a, F: Future> { #[pin] cancellation: WaitForCancellationFuture<'a>, #[pin] future: F, } } impl<'a, F: Future> Future for RunUntilCancelledFuture<'a, F> { type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(res) = this.future.poll(cx) { Poll::Ready(Some(res)) } else if this.cancellation.poll(cx).is_ready() { Poll::Ready(None) } else { Poll::Pending } } } RunUntilCancelledFuture { cancellation: self.cancelled(), future: fut, } .await } } // ===== impl WaitForCancellationFuture ===== impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("WaitForCancellationFuture").finish() } } impl<'a> Future for WaitForCancellationFuture<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut this = self.project(); loop { if this.cancellation_token.is_cancelled() { return Poll::Ready(()); } // No wakeups can be lost here because there is always a call to // `is_cancelled` between the creation of the future and the call to // `poll`, and the code that sets the cancelled flag does so before // waking the `Notified`. if this.future.as_mut().poll(cx).is_pending() { return Poll::Pending; } this.future.set(this.cancellation_token.inner.notified()); } } } // ===== impl WaitForCancellationFutureOwned ===== impl core::fmt::Debug for WaitForCancellationFutureOwned { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("WaitForCancellationFutureOwned").finish() } } impl WaitForCancellationFutureOwned { fn new(cancellation_token: CancellationToken) -> Self { WaitForCancellationFutureOwned { // cancellation_token holds a heap allocation and is guaranteed to have a // stable deref, thus it would be ok to move the cancellation_token while // the future holds a reference to it. // // # Safety // // cancellation_token is dropped after future due to the field ordering. future: MaybeDangling::new(unsafe { Self::new_future(&cancellation_token) }), cancellation_token, } } /// # Safety /// The returned future must be destroyed before the cancellation token is /// destroyed. unsafe fn new_future( cancellation_token: &CancellationToken, ) -> tokio::sync::futures::Notified<'static> { let inner_ptr = Arc::as_ptr(&cancellation_token.inner); // SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains // valid until the strong count of the Arc drops to zero, and the caller // guarantees that they will drop the future before that happens. (*inner_ptr).notified() } } impl Future for WaitForCancellationFutureOwned { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut this = self.project(); loop { if this.cancellation_token.is_cancelled() { return Poll::Ready(()); } // No wakeups can be lost here because there is always a call to // `is_cancelled` between the creation of the future and the call to // `poll`, and the code that sets the cancelled flag does so before // waking the `Notified`. if this.future.as_mut().poll(cx).is_pending() { return Poll::Pending; } // # Safety // // cancellation_token is dropped after future due to the field ordering. this.future.set(MaybeDangling::new(unsafe { Self::new_future(this.cancellation_token) })); } } }