1 //! Abstracts out the entire chain of runtime sub-drivers into common types.
2 
3 // Eventually, this file will see significant refactoring / cleanup. For now, we
4 // don't need to worry much about dead code with certain feature permutations.
5 #![cfg_attr(
6     any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
7     allow(dead_code)
8 )]
9 
10 use crate::runtime::park::{ParkThread, UnparkThread};
11 
12 use std::io;
13 use std::time::Duration;
14 
/// Entry point to the runtime's driver stack.
///
/// Holds the outermost sub-driver layer (`TimeDriver`); `is_enabled`, `park`,
/// `park_timeout` and `shutdown` all forward to it.
#[derive(Debug)]
pub(crate) struct Driver {
    /// Outermost layer of the driver stack.
    inner: TimeDriver,
}
19 
/// Bundle of handles to every sub-driver, produced together with the `Driver`
/// by `Driver::new`.
#[derive(Debug)]
pub(crate) struct Handle {
    /// IO driver handle
    pub(crate) io: IoHandle,

    /// Signal driver handle
    // Only read through the `signal()` accessor, which is compiled for a subset
    // of configurations; silence the dead-code lint elsewhere.
    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
    pub(crate) signal: SignalHandle,

    /// Time driver handle
    pub(crate) time: TimeHandle,

    /// Source of `Instant::now()`
    // Only read through the `clock()` accessor; silence the dead-code lint when
    // the relevant features are off.
    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
    pub(crate) clock: Clock,
}
36 
/// Settings consumed by `Driver::new` when assembling the driver stack.
pub(crate) struct Cfg {
    /// Forwarded to `create_io_stack` as its `enabled` flag.
    pub(crate) enable_io: bool,
    /// Forwarded to `create_time_driver` as its `enable` flag.
    pub(crate) enable_time: bool,
    /// Forwarded to `create_clock` as its first argument (whether pausing is enabled).
    pub(crate) enable_pause_time: bool,
    /// Forwarded to `create_clock` as its second argument (whether the clock starts paused).
    pub(crate) start_paused: bool,
    /// Forwarded to `create_io_stack`, which hands it to the IO driver
    /// constructor when IO is enabled.
    pub(crate) nevents: usize,
    /// Forwarded to `create_time_driver`.
    // NOTE(review): presumably the number of runtime worker threads — confirm
    // against the runtime builder.
    pub(crate) workers: usize,
}
45 
46 impl Driver {
new(cfg: Cfg) -> io::Result<(Self, Handle)>47     pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
48         let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;
49 
50         let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
51 
52         let (time_driver, time_handle) =
53             create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers);
54 
55         Ok((
56             Self { inner: time_driver },
57             Handle {
58                 io: io_handle,
59                 signal: signal_handle,
60                 time: time_handle,
61                 clock,
62             },
63         ))
64     }
65 
is_enabled(&self) -> bool66     pub(crate) fn is_enabled(&self) -> bool {
67         self.inner.is_enabled()
68     }
69 
park(&mut self, handle: &Handle)70     pub(crate) fn park(&mut self, handle: &Handle) {
71         self.inner.park(handle);
72     }
73 
park_timeout(&mut self, handle: &Handle, duration: Duration)74     pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
75         self.inner.park_timeout(handle, duration);
76     }
77 
shutdown(&mut self, handle: &Handle)78     pub(crate) fn shutdown(&mut self, handle: &Handle) {
79         self.inner.shutdown(handle);
80     }
81 }
82 
83 impl Handle {
unpark(&self)84     pub(crate) fn unpark(&self) {
85         #[cfg(feature = "time")]
86         if let Some(handle) = &self.time {
87             handle.unpark();
88         }
89 
90         self.io.unpark();
91     }
92 
93     cfg_io_driver! {
94         #[track_caller]
95         pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
96             self.io
97                 .as_ref()
98                 .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
99         }
100     }
101 
102     cfg_signal_internal_and_unix! {
103         #[track_caller]
104         pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle {
105             self.signal
106                 .as_ref()
107                 .expect("there is no signal driver running, must be called from the context of Tokio runtime")
108         }
109     }
110 
111     cfg_time! {
112         /// Returns a reference to the time driver handle.
113         ///
114         /// Panics if no time driver is present.
115         #[track_caller]
116         pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
117             self.time
118                 .as_ref()
119                 .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
120         }
121 
122         pub(crate) fn clock(&self) -> &Clock {
123             &self.clock
124         }
125     }
126 }
127 
128 // ===== io driver =====
129 
130 cfg_io_driver! {
    /// Concrete IO driver type at the bottom of the enabled driver chain.
    pub(crate) type IoDriver = crate::runtime::io::Driver;
132 
    /// The IO layer of the driver stack.
    #[derive(Debug)]
    pub(crate) enum IoStack {
        /// IO was enabled: holds the driver chain built by `create_io_stack`
        /// (IO driver wrapped by the signal and process layers).
        Enabled(ProcessDriver),
        /// IO was not enabled: parking falls back to `ParkThread`.
        Disabled(ParkThread),
    }
138 
    /// Handle counterpart of `IoStack`.
    #[derive(Debug)]
    pub(crate) enum IoHandle {
        /// IO was enabled: handle to the IO driver.
        Enabled(crate::runtime::io::Handle),
        /// IO was not enabled: unparker paired with the fallback `ParkThread`.
        Disabled(UnparkThread),
    }
144 
145     fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
146         #[cfg(loom)]
147         assert!(!enabled);
148 
149         let ret = if enabled {
150             let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;
151 
152             let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
153             let process_driver = create_process_driver(signal_driver);
154 
155             (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
156         } else {
157             let park_thread = ParkThread::new();
158             let unpark_thread = park_thread.unpark();
159             (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
160         };
161 
162         Ok(ret)
163     }
164 
165     impl IoStack {
166         pub(crate) fn is_enabled(&self) -> bool {
167             match self {
168                 IoStack::Enabled(..) => true,
169                 IoStack::Disabled(..) => false,
170             }
171         }
172 
173         pub(crate) fn park(&mut self, handle: &Handle) {
174             match self {
175                 IoStack::Enabled(v) => v.park(handle),
176                 IoStack::Disabled(v) => v.park(),
177             }
178         }
179 
180         pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
181             match self {
182                 IoStack::Enabled(v) => v.park_timeout(handle, duration),
183                 IoStack::Disabled(v) => v.park_timeout(duration),
184             }
185         }
186 
187         pub(crate) fn shutdown(&mut self, handle: &Handle) {
188             match self {
189                 IoStack::Enabled(v) => v.shutdown(handle),
190                 IoStack::Disabled(v) => v.shutdown(),
191             }
192         }
193     }
194 
195     impl IoHandle {
196         pub(crate) fn unpark(&self) {
197             match self {
198                 IoHandle::Enabled(handle) => handle.unpark(),
199                 IoHandle::Disabled(handle) => handle.unpark(),
200             }
201         }
202 
203         pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
204             match self {
205                 IoHandle::Enabled(v) => Some(v),
206                 IoHandle::Disabled(..) => None,
207             }
208         }
209     }
210 }
211 
212 cfg_not_io_driver! {
213     pub(crate) type IoHandle = UnparkThread;
214 
215     #[derive(Debug)]
216     pub(crate) struct IoStack(ParkThread);
217 
218     fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
219         let park_thread = ParkThread::new();
220         let unpark_thread = park_thread.unpark();
221         Ok((IoStack(park_thread), unpark_thread, Default::default()))
222     }
223 
224     impl IoStack {
225         pub(crate) fn park(&mut self, _handle: &Handle) {
226             self.0.park();
227         }
228 
229         pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) {
230             self.0.park_timeout(duration);
231         }
232 
233         pub(crate) fn shutdown(&mut self, _handle: &Handle) {
234             self.0.shutdown();
235         }
236 
237         /// This is not a "real" driver, so it is not considered enabled.
238         pub(crate) fn is_enabled(&self) -> bool {
239             false
240         }
241     }
242 }
243 
244 // ===== signal driver =====
245 
246 cfg_signal_internal_and_unix! {
247     type SignalDriver = crate::runtime::signal::Driver;
248     pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>;
249 
250     fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
251         let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?;
252         let handle = driver.handle();
253         Ok((driver, Some(handle)))
254     }
255 }
256 
cfg_not_signal_internal! {
    /// No signal driver in this configuration, so the handle carries no data.
    pub(crate) type SignalHandle = ();

    cfg_io_driver! {
        /// With no signal layer, the "signal driver" is the IO driver itself.
        type SignalDriver = IoDriver;

        /// Passes `io_driver` through unchanged with a unit handle; never fails.
        fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> {
            Ok((io_driver, ()))
        }
    }
}
268 
269 // ===== process driver =====
270 
cfg_process_driver! {
    /// Process layer of the driver stack.
    type ProcessDriver = crate::runtime::process::Driver;

    /// Wraps `signal_driver` in a process driver.
    fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
        ProcessDriver::new(signal_driver)
    }
}
278 
cfg_not_process_driver! {
    cfg_io_driver! {
        /// With no process layer, the "process driver" is the signal driver itself.
        type ProcessDriver = SignalDriver;

        /// Passes `signal_driver` through unchanged.
        fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver {
            signal_driver
        }
    }
}
288 
289 // ===== time driver =====
290 
291 cfg_time! {
    /// The time layer of the driver stack.
    #[derive(Debug)]
    pub(crate) enum TimeDriver {
        /// Timers were enabled: holds the time driver, which was constructed
        /// from the IO stack.
        Enabled {
            driver: crate::runtime::time::Driver,
        },
        /// Timers were not enabled: every call goes straight to the IO stack.
        Disabled(IoStack),
    }
299 
    /// Clock type stored in `Handle` when the time driver is compiled in.
    pub(crate) type Clock = crate::time::Clock;
    /// Handle to the time driver; `None` when timers were not enabled.
    pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;
302 
303     fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
304         crate::time::Clock::new(enable_pausing, start_paused)
305     }
306 
307     fn create_time_driver(
308         enable: bool,
309         io_stack: IoStack,
310         clock: &Clock,
311         workers: usize,
312     ) -> (TimeDriver, TimeHandle) {
313         if enable {
314             let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32);
315 
316             (TimeDriver::Enabled { driver }, Some(handle))
317         } else {
318             (TimeDriver::Disabled(io_stack), None)
319         }
320     }
321 
322     impl TimeDriver {
323         pub(crate) fn is_enabled(&self) -> bool {
324             match self {
325                 TimeDriver::Enabled { .. } => true,
326                 TimeDriver::Disabled(inner) => inner.is_enabled(),
327             }
328         }
329 
330         pub(crate) fn park(&mut self, handle: &Handle) {
331             match self {
332                 TimeDriver::Enabled { driver, .. } => driver.park(handle),
333                 TimeDriver::Disabled(v) => v.park(handle),
334             }
335         }
336 
337         pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) {
338             match self {
339                 TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration),
340                 TimeDriver::Disabled(v) => v.park_timeout(handle, duration),
341             }
342         }
343 
344         pub(crate) fn shutdown(&mut self, handle: &Handle) {
345             match self {
346                 TimeDriver::Enabled { driver } => driver.shutdown(handle),
347                 TimeDriver::Disabled(v) => v.shutdown(handle),
348             }
349         }
350     }
351 }
352 
cfg_not_time! {
    /// With no time layer, the "time driver" is the IO stack itself.
    type TimeDriver = IoStack;

    /// No clock in this configuration; the unit type stands in for it.
    pub(crate) type Clock = ();
    /// No time handle in this configuration; the unit type stands in for it.
    pub(crate) type TimeHandle = ();

    /// Returns the unit stand-in clock; both flags are ignored.
    fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
        ()
    }

    /// Passes `io_stack` through unchanged with a unit handle; the other
    /// arguments are ignored.
    fn create_time_driver(
        _enable: bool,
        io_stack: IoStack,
        _clock: &Clock,
        _workers: usize,
    ) -> (TimeDriver, TimeHandle) {
        (io_stack, ())
    }
}
372