1 //! Abstracts out the entire chain of runtime sub-drivers into common types. 2 3 // Eventually, this file will see significant refactoring / cleanup. For now, we 4 // don't need to worry much about dead code with certain feature permutations. 5 #![cfg_attr( 6 any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"), 7 allow(dead_code) 8 )] 9 10 use crate::runtime::park::{ParkThread, UnparkThread}; 11 12 use std::io; 13 use std::time::Duration; 14 15 #[derive(Debug)] 16 pub(crate) struct Driver { 17 inner: TimeDriver, 18 } 19 20 #[derive(Debug)] 21 pub(crate) struct Handle { 22 /// IO driver handle 23 pub(crate) io: IoHandle, 24 25 /// Signal driver handle 26 #[cfg_attr(any(not(unix), loom), allow(dead_code))] 27 pub(crate) signal: SignalHandle, 28 29 /// Time driver handle 30 pub(crate) time: TimeHandle, 31 32 /// Source of `Instant::now()` 33 #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))] 34 pub(crate) clock: Clock, 35 } 36 37 pub(crate) struct Cfg { 38 pub(crate) enable_io: bool, 39 pub(crate) enable_time: bool, 40 pub(crate) enable_pause_time: bool, 41 pub(crate) start_paused: bool, 42 pub(crate) nevents: usize, 43 pub(crate) workers: usize, 44 } 45 46 impl Driver { new(cfg: Cfg) -> io::Result<(Self, Handle)>47 pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { 48 let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?; 49 50 let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); 51 52 let (time_driver, time_handle) = 53 create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers); 54 55 Ok(( 56 Self { inner: time_driver }, 57 Handle { 58 io: io_handle, 59 signal: signal_handle, 60 time: time_handle, 61 clock, 62 }, 63 )) 64 } 65 is_enabled(&self) -> bool66 pub(crate) fn is_enabled(&self) -> bool { 67 self.inner.is_enabled() 68 } 69 park(&mut self, handle: &Handle)70 pub(crate) fn park(&mut self, handle: &Handle) { 71 self.inner.park(handle); 72 } 73 park_timeout(&mut self, handle: &Handle, duration: Duration)74 pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { 75 self.inner.park_timeout(handle, duration); 76 } 77 shutdown(&mut self, handle: &Handle)78 pub(crate) fn shutdown(&mut self, handle: &Handle) { 79 self.inner.shutdown(handle); 80 } 81 } 82 83 impl Handle { unpark(&self)84 pub(crate) fn unpark(&self) { 85 #[cfg(feature = "time")] 86 if let Some(handle) = &self.time { 87 handle.unpark(); 88 } 89 90 self.io.unpark(); 91 } 92 93 cfg_io_driver! { 94 #[track_caller] 95 pub(crate) fn io(&self) -> &crate::runtime::io::Handle { 96 self.io 97 .as_ref() 98 .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") 99 } 100 } 101 102 cfg_signal_internal_and_unix! { 103 #[track_caller] 104 pub(crate) fn signal(&self) -> &crate::runtime::signal::Handle { 105 self.signal 106 .as_ref() 107 .expect("there is no signal driver running, must be called from the context of Tokio runtime") 108 } 109 } 110 111 cfg_time! { 112 /// Returns a reference to the time driver handle. 113 /// 114 /// Panics if no time driver is present. 115 #[track_caller] 116 pub(crate) fn time(&self) -> &crate::runtime::time::Handle { 117 self.time 118 .as_ref() 119 .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") 120 } 121 122 pub(crate) fn clock(&self) -> &Clock { 123 &self.clock 124 } 125 } 126 } 127 128 // ===== io driver ===== 129 130 cfg_io_driver! { 131 pub(crate) type IoDriver = crate::runtime::io::Driver; 132 133 #[derive(Debug)] 134 pub(crate) enum IoStack { 135 Enabled(ProcessDriver), 136 Disabled(ParkThread), 137 } 138 139 #[derive(Debug)] 140 pub(crate) enum IoHandle { 141 Enabled(crate::runtime::io::Handle), 142 Disabled(UnparkThread), 143 } 144 145 fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { 146 #[cfg(loom)] 147 assert!(!enabled); 148 149 let ret = if enabled { 150 let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?; 151 152 let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?; 153 let process_driver = create_process_driver(signal_driver); 154 155 (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle) 156 } else { 157 let park_thread = ParkThread::new(); 158 let unpark_thread = park_thread.unpark(); 159 (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default()) 160 }; 161 162 Ok(ret) 163 } 164 165 impl IoStack { 166 pub(crate) fn is_enabled(&self) -> bool { 167 match self { 168 IoStack::Enabled(..) => true, 169 IoStack::Disabled(..) => false, 170 } 171 } 172 173 pub(crate) fn park(&mut self, handle: &Handle) { 174 match self { 175 IoStack::Enabled(v) => v.park(handle), 176 IoStack::Disabled(v) => v.park(), 177 } 178 } 179 180 pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { 181 match self { 182 IoStack::Enabled(v) => v.park_timeout(handle, duration), 183 IoStack::Disabled(v) => v.park_timeout(duration), 184 } 185 } 186 187 pub(crate) fn shutdown(&mut self, handle: &Handle) { 188 match self { 189 IoStack::Enabled(v) => v.shutdown(handle), 190 IoStack::Disabled(v) => v.shutdown(), 191 } 192 } 193 } 194 195 impl IoHandle { 196 pub(crate) fn unpark(&self) { 197 match self { 198 IoHandle::Enabled(handle) => handle.unpark(), 199 IoHandle::Disabled(handle) => handle.unpark(), 200 } 201 } 202 203 pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> { 204 match self { 205 IoHandle::Enabled(v) => Some(v), 206 IoHandle::Disabled(..) => None, 207 } 208 } 209 } 210 } 211 212 cfg_not_io_driver! { 213 pub(crate) type IoHandle = UnparkThread; 214 215 #[derive(Debug)] 216 pub(crate) struct IoStack(ParkThread); 217 218 fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { 219 let park_thread = ParkThread::new(); 220 let unpark_thread = park_thread.unpark(); 221 Ok((IoStack(park_thread), unpark_thread, Default::default())) 222 } 223 224 impl IoStack { 225 pub(crate) fn park(&mut self, _handle: &Handle) { 226 self.0.park(); 227 } 228 229 pub(crate) fn park_timeout(&mut self, _handle: &Handle, duration: Duration) { 230 self.0.park_timeout(duration); 231 } 232 233 pub(crate) fn shutdown(&mut self, _handle: &Handle) { 234 self.0.shutdown(); 235 } 236 237 /// This is not a "real" driver, so it is not considered enabled. 238 pub(crate) fn is_enabled(&self) -> bool { 239 false 240 } 241 } 242 } 243 244 // ===== signal driver ===== 245 246 cfg_signal_internal_and_unix! { 247 type SignalDriver = crate::runtime::signal::Driver; 248 pub(crate) type SignalHandle = Option<crate::runtime::signal::Handle>; 249 250 fn create_signal_driver(io_driver: IoDriver, io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> { 251 let driver = crate::runtime::signal::Driver::new(io_driver, io_handle)?; 252 let handle = driver.handle(); 253 Ok((driver, Some(handle))) 254 } 255 } 256 257 cfg_not_signal_internal! { 258 pub(crate) type SignalHandle = (); 259 260 cfg_io_driver! { 261 type SignalDriver = IoDriver; 262 263 fn create_signal_driver(io_driver: IoDriver, _io_handle: &crate::runtime::io::Handle) -> io::Result<(SignalDriver, SignalHandle)> { 264 Ok((io_driver, ())) 265 } 266 } 267 } 268 269 // ===== process driver ===== 270 271 cfg_process_driver! { 272 type ProcessDriver = crate::runtime::process::Driver; 273 274 fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { 275 ProcessDriver::new(signal_driver) 276 } 277 } 278 279 cfg_not_process_driver! { 280 cfg_io_driver! { 281 type ProcessDriver = SignalDriver; 282 283 fn create_process_driver(signal_driver: SignalDriver) -> ProcessDriver { 284 signal_driver 285 } 286 } 287 } 288 289 // ===== time driver ===== 290 291 cfg_time! { 292 #[derive(Debug)] 293 pub(crate) enum TimeDriver { 294 Enabled { 295 driver: crate::runtime::time::Driver, 296 }, 297 Disabled(IoStack), 298 } 299 300 pub(crate) type Clock = crate::time::Clock; 301 pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>; 302 303 fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock { 304 crate::time::Clock::new(enable_pausing, start_paused) 305 } 306 307 fn create_time_driver( 308 enable: bool, 309 io_stack: IoStack, 310 clock: &Clock, 311 workers: usize, 312 ) -> (TimeDriver, TimeHandle) { 313 if enable { 314 let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32); 315 316 (TimeDriver::Enabled { driver }, Some(handle)) 317 } else { 318 (TimeDriver::Disabled(io_stack), None) 319 } 320 } 321 322 impl TimeDriver { 323 pub(crate) fn is_enabled(&self) -> bool { 324 match self { 325 TimeDriver::Enabled { .. } => true, 326 TimeDriver::Disabled(inner) => inner.is_enabled(), 327 } 328 } 329 330 pub(crate) fn park(&mut self, handle: &Handle) { 331 match self { 332 TimeDriver::Enabled { driver, .. } => driver.park(handle), 333 TimeDriver::Disabled(v) => v.park(handle), 334 } 335 } 336 337 pub(crate) fn park_timeout(&mut self, handle: &Handle, duration: Duration) { 338 match self { 339 TimeDriver::Enabled { driver } => driver.park_timeout(handle, duration), 340 TimeDriver::Disabled(v) => v.park_timeout(handle, duration), 341 } 342 } 343 344 pub(crate) fn shutdown(&mut self, handle: &Handle) { 345 match self { 346 TimeDriver::Enabled { driver } => driver.shutdown(handle), 347 TimeDriver::Disabled(v) => v.shutdown(handle), 348 } 349 } 350 } 351 } 352 353 cfg_not_time! { 354 type TimeDriver = IoStack; 355 356 pub(crate) type Clock = (); 357 pub(crate) type TimeHandle = (); 358 359 fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock { 360 () 361 } 362 363 fn create_time_driver( 364 _enable: bool, 365 io_stack: IoStack, 366 _clock: &Clock, 367 _workers: usize, 368 ) -> (TimeDriver, TimeHandle) { 369 (io_stack, ()) 370 } 371 } 372