//! Tokio context-aware futures utilities.
2 //!
3 //! This module includes utilities around integrating tokio with other runtimes
4 //! by allowing the context to be attached to futures. This allows spawning
5 //! futures on other executors while still using tokio to drive them. This
6 //! can be useful if you need to use a tokio based library in an executor/runtime
7 //! that does not provide a tokio context.
8 
9 use pin_project_lite::pin_project;
10 use std::{
11     future::Future,
12     pin::Pin,
13     task::{Context, Poll},
14 };
15 use tokio::runtime::{Handle, Runtime};
16 
pin_project! {
    /// `TokioContext` allows running futures that must be inside Tokio's
    /// context on a non-Tokio runtime.
    ///
    /// It contains a [`Handle`] to the runtime. A handle to the runtime can be
    /// obtained by calling the [`Runtime::handle()`] method.
    ///
    /// Note that the `TokioContext` wrapper only works if the `Runtime` it is
    /// connected to has not yet been destroyed. You must keep the `Runtime`
    /// alive until the future has finished executing.
    ///
    /// **Warning:** If `TokioContext` is used together with a [current thread]
    /// runtime, that runtime must be inside a call to `block_on` for the
    /// wrapped future to work. For this reason, it is recommended to use a
    /// [multi thread] runtime, even if you configure it to only spawn one
    /// worker thread.
    ///
    /// # Examples
    ///
    /// This example creates two runtimes, but only [enables time] on one of
    /// them. It then uses the context of the runtime with the timer enabled to
    /// execute a [`sleep`] future on the runtime with timing disabled. The
    /// wrapper is built with the `wrap` method of the [`RuntimeExt`] trait.
    ///
    /// ```
    /// use tokio::time::{sleep, Duration};
    /// use tokio_util::context::RuntimeExt;
    ///
    /// // This runtime has timers enabled.
    /// let rt = tokio::runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // This runtime has timers disabled.
    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Wrap the sleep future in the context of rt.
    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
    ///
    /// // Execute the future on rt2.
    /// rt2.block_on(fut);
    /// ```
    ///
    /// [`Handle`]: struct@tokio::runtime::Handle
    /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle
    /// [`RuntimeExt`]: trait@crate::context::RuntimeExt
    /// [`sleep`]: fn@tokio::time::sleep
    /// [current thread]: fn@tokio::runtime::Builder::new_current_thread
    /// [enables time]: fn@tokio::runtime::Builder::enable_time
    /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread
    pub struct TokioContext<F> {
        // The wrapped future. Marked `#[pin]` so the generated projection
        // hands it out as a pinned reference that can be polled.
        #[pin]
        inner: F,
        // Handle of the runtime whose context is entered around every poll
        // of `inner`.
        handle: Handle,
    }
}
75 
76 impl<F> TokioContext<F> {
77     /// Associate the provided future with the context of the runtime behind
78     /// the provided `Handle`.
79     ///
80     /// This constructor uses a `'static` lifetime to opt-out of checking that
81     /// the runtime still exists.
82     ///
83     /// # Examples
84     ///
85     /// This is the same as the example above, but uses the `new` constructor
86     /// rather than [`RuntimeExt::wrap`].
87     ///
88     /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap
89     ///
90     /// ```
91     /// use tokio::time::{sleep, Duration};
92     /// use tokio_util::context::TokioContext;
93     ///
94     /// // This runtime has timers enabled.
95     /// let rt = tokio::runtime::Builder::new_multi_thread()
96     ///     .enable_all()
97     ///     .build()
98     ///     .unwrap();
99     ///
100     /// // This runtime has timers disabled.
101     /// let rt2 = tokio::runtime::Builder::new_multi_thread()
102     ///     .build()
103     ///     .unwrap();
104     ///
105     /// let fut = TokioContext::new(
106     ///     async { sleep(Duration::from_millis(2)).await },
107     ///     rt.handle().clone(),
108     /// );
109     ///
110     /// // Execute the future on rt2.
111     /// rt2.block_on(fut);
112     /// ```
new(future: F, handle: Handle) -> TokioContext<F>113     pub fn new(future: F, handle: Handle) -> TokioContext<F> {
114         TokioContext {
115             inner: future,
116             handle,
117         }
118     }
119 
120     /// Obtain a reference to the handle inside this `TokioContext`.
handle(&self) -> &Handle121     pub fn handle(&self) -> &Handle {
122         &self.handle
123     }
124 
125     /// Remove the association between the Tokio runtime and the wrapped future.
into_inner(self) -> F126     pub fn into_inner(self) -> F {
127         self.inner
128     }
129 }
130 
131 impl<F: Future> Future for TokioContext<F> {
132     type Output = F::Output;
133 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>134     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
135         let me = self.project();
136         let handle = me.handle;
137         let fut = me.inner;
138 
139         let _enter = handle.enter();
140         fut.poll(cx)
141     }
142 }
143 
/// Extension trait that simplifies bundling a `Handle` with a `Future`.
pub trait RuntimeExt {
    /// Create a [`TokioContext`] that wraps the provided future and runs it in
    /// this runtime's context.
    ///
    /// The returned [`TokioContext`] resolves to the same output as `fut` and
    /// enters the runtime context each time it is polled, so it can be driven
    /// by a different executor.
    ///
    /// # Examples
    ///
    /// This example creates two runtimes, but only [enables time] on one of
    /// them. It then uses the context of the runtime with the timer enabled to
    /// execute a [`sleep`] future on the runtime with timing disabled.
    ///
    /// ```
    /// use tokio::time::{sleep, Duration};
    /// use tokio_util::context::RuntimeExt;
    ///
    /// // This runtime has timers enabled.
    /// let rt = tokio::runtime::Builder::new_multi_thread()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    ///
    /// // This runtime has timers disabled.
    /// let rt2 = tokio::runtime::Builder::new_multi_thread()
    ///     .build()
    ///     .unwrap();
    ///
    /// // Wrap the sleep future in the context of rt.
    /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await });
    ///
    /// // Execute the future on rt2.
    /// rt2.block_on(fut);
    /// ```
    ///
    /// [`TokioContext`]: struct@crate::context::TokioContext
    /// [`sleep`]: fn@tokio::time::sleep
    /// [enables time]: fn@tokio::runtime::Builder::enable_time
    fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}
182 
183 impl RuntimeExt for Runtime {
wrap<F: Future>(&self, fut: F) -> TokioContext<F>184     fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
185         TokioContext {
186             inner: fut,
187             handle: self.handle().clone(),
188         }
189     }
190 }
191