1 //! Tokio context aware futures utilities. 2 //! 3 //! This module includes utilities around integrating tokio with other runtimes 4 //! by allowing the context to be attached to futures. This allows spawning 5 //! futures on other executors while still using tokio to drive them. This 6 //! can be useful if you need to use a tokio based library in an executor/runtime 7 //! that does not provide a tokio context. 8 9 use pin_project_lite::pin_project; 10 use std::{ 11 future::Future, 12 pin::Pin, 13 task::{Context, Poll}, 14 }; 15 use tokio::runtime::{Handle, Runtime}; 16 17 pin_project! { 18 /// `TokioContext` allows running futures that must be inside Tokio's 19 /// context on a non-Tokio runtime. 20 /// 21 /// It contains a [`Handle`] to the runtime. A handle to the runtime can be 22 /// obtain by calling the [`Runtime::handle()`] method. 23 /// 24 /// Note that the `TokioContext` wrapper only works if the `Runtime` it is 25 /// connected to has not yet been destroyed. You must keep the `Runtime` 26 /// alive until the future has finished executing. 27 /// 28 /// **Warning:** If `TokioContext` is used together with a [current thread] 29 /// runtime, that runtime must be inside a call to `block_on` for the 30 /// wrapped future to work. For this reason, it is recommended to use a 31 /// [multi thread] runtime, even if you configure it to only spawn one 32 /// worker thread. 33 /// 34 /// # Examples 35 /// 36 /// This example creates two runtimes, but only [enables time] on one of 37 /// them. It then uses the context of the runtime with the timer enabled to 38 /// execute a [`sleep`] future on the runtime with timing disabled. 39 /// ``` 40 /// use tokio::time::{sleep, Duration}; 41 /// use tokio_util::context::RuntimeExt; 42 /// 43 /// // This runtime has timers enabled. 44 /// let rt = tokio::runtime::Builder::new_multi_thread() 45 /// .enable_all() 46 /// .build() 47 /// .unwrap(); 48 /// 49 /// // This runtime has timers disabled. 50 /// let rt2 = tokio::runtime::Builder::new_multi_thread() 51 /// .build() 52 /// .unwrap(); 53 /// 54 /// // Wrap the sleep future in the context of rt. 55 /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); 56 /// 57 /// // Execute the future on rt2. 58 /// rt2.block_on(fut); 59 /// ``` 60 /// 61 /// [`Handle`]: struct@tokio::runtime::Handle 62 /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle 63 /// [`RuntimeExt`]: trait@crate::context::RuntimeExt 64 /// [`new_static`]: fn@Self::new_static 65 /// [`sleep`]: fn@tokio::time::sleep 66 /// [current thread]: fn@tokio::runtime::Builder::new_current_thread 67 /// [enables time]: fn@tokio::runtime::Builder::enable_time 68 /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread 69 pub struct TokioContext<F> { 70 #[pin] 71 inner: F, 72 handle: Handle, 73 } 74 } 75 76 impl<F> TokioContext<F> { 77 /// Associate the provided future with the context of the runtime behind 78 /// the provided `Handle`. 79 /// 80 /// This constructor uses a `'static` lifetime to opt-out of checking that 81 /// the runtime still exists. 82 /// 83 /// # Examples 84 /// 85 /// This is the same as the example above, but uses the `new` constructor 86 /// rather than [`RuntimeExt::wrap`]. 87 /// 88 /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap 89 /// 90 /// ``` 91 /// use tokio::time::{sleep, Duration}; 92 /// use tokio_util::context::TokioContext; 93 /// 94 /// // This runtime has timers enabled. 95 /// let rt = tokio::runtime::Builder::new_multi_thread() 96 /// .enable_all() 97 /// .build() 98 /// .unwrap(); 99 /// 100 /// // This runtime has timers disabled. 101 /// let rt2 = tokio::runtime::Builder::new_multi_thread() 102 /// .build() 103 /// .unwrap(); 104 /// 105 /// let fut = TokioContext::new( 106 /// async { sleep(Duration::from_millis(2)).await }, 107 /// rt.handle().clone(), 108 /// ); 109 /// 110 /// // Execute the future on rt2. 111 /// rt2.block_on(fut); 112 /// ``` new(future: F, handle: Handle) -> TokioContext<F>113 pub fn new(future: F, handle: Handle) -> TokioContext<F> { 114 TokioContext { 115 inner: future, 116 handle, 117 } 118 } 119 120 /// Obtain a reference to the handle inside this `TokioContext`. handle(&self) -> &Handle121 pub fn handle(&self) -> &Handle { 122 &self.handle 123 } 124 125 /// Remove the association between the Tokio runtime and the wrapped future. into_inner(self) -> F126 pub fn into_inner(self) -> F { 127 self.inner 128 } 129 } 130 131 impl<F: Future> Future for TokioContext<F> { 132 type Output = F::Output; 133 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>134 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 135 let me = self.project(); 136 let handle = me.handle; 137 let fut = me.inner; 138 139 let _enter = handle.enter(); 140 fut.poll(cx) 141 } 142 } 143 144 /// Extension trait that simplifies bundling a `Handle` with a `Future`. 145 pub trait RuntimeExt { 146 /// Create a [`TokioContext`] that wraps the provided future and runs it in 147 /// this runtime's context. 148 /// 149 /// # Examples 150 /// 151 /// This example creates two runtimes, but only [enables time] on one of 152 /// them. It then uses the context of the runtime with the timer enabled to 153 /// execute a [`sleep`] future on the runtime with timing disabled. 154 /// 155 /// ``` 156 /// use tokio::time::{sleep, Duration}; 157 /// use tokio_util::context::RuntimeExt; 158 /// 159 /// // This runtime has timers enabled. 160 /// let rt = tokio::runtime::Builder::new_multi_thread() 161 /// .enable_all() 162 /// .build() 163 /// .unwrap(); 164 /// 165 /// // This runtime has timers disabled. 166 /// let rt2 = tokio::runtime::Builder::new_multi_thread() 167 /// .build() 168 /// .unwrap(); 169 /// 170 /// // Wrap the sleep future in the context of rt. 171 /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); 172 /// 173 /// // Execute the future on rt2. 174 /// rt2.block_on(fut); 175 /// ``` 176 /// 177 /// [`TokioContext`]: struct@crate::context::TokioContext 178 /// [`sleep`]: fn@tokio::time::sleep 179 /// [enables time]: fn@tokio::runtime::Builder::enable_time wrap<F: Future>(&self, fut: F) -> TokioContext<F>180 fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>; 181 } 182 183 impl RuntimeExt for Runtime { wrap<F: Future>(&self, fut: F) -> TokioContext<F>184 fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> { 185 TokioContext { 186 inner: fut, 187 handle: self.handle().clone(), 188 } 189 } 190 } 191