1 cfg_rt! { 2 pub(crate) mod current_thread; 3 pub(crate) use current_thread::CurrentThread; 4 5 mod defer; 6 use defer::Defer; 7 8 pub(crate) mod inject; 9 pub(crate) use inject::Inject; 10 11 use crate::runtime::TaskHooks; 12 } 13 14 cfg_rt_multi_thread! { 15 mod block_in_place; 16 pub(crate) use block_in_place::block_in_place; 17 18 mod lock; 19 use lock::Lock; 20 21 pub(crate) mod multi_thread; 22 pub(crate) use multi_thread::MultiThread; 23 24 cfg_unstable! { 25 pub(crate) mod multi_thread_alt; 26 pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt; 27 } 28 } 29 30 use crate::runtime::driver; 31 32 #[derive(Debug, Clone)] 33 pub(crate) enum Handle { 34 #[cfg(feature = "rt")] 35 CurrentThread(Arc<current_thread::Handle>), 36 37 #[cfg(feature = "rt-multi-thread")] 38 MultiThread(Arc<multi_thread::Handle>), 39 40 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 41 MultiThreadAlt(Arc<multi_thread_alt::Handle>), 42 43 // TODO: This is to avoid triggering "dead code" warnings many other places 44 // in the codebase. Remove this during a later cleanup 45 #[cfg(not(feature = "rt"))] 46 #[allow(dead_code)] 47 Disabled, 48 } 49 50 #[cfg(feature = "rt")] 51 pub(super) enum Context { 52 CurrentThread(current_thread::Context), 53 54 #[cfg(feature = "rt-multi-thread")] 55 MultiThread(multi_thread::Context), 56 57 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 58 MultiThreadAlt(multi_thread_alt::Context), 59 } 60 61 impl Handle { 62 #[cfg_attr(not(feature = "full"), allow(dead_code))] driver(&self) -> &driver::Handle63 pub(crate) fn driver(&self) -> &driver::Handle { 64 match *self { 65 #[cfg(feature = "rt")] 66 Handle::CurrentThread(ref h) => &h.driver, 67 68 #[cfg(feature = "rt-multi-thread")] 69 Handle::MultiThread(ref h) => &h.driver, 70 71 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 72 Handle::MultiThreadAlt(ref h) => &h.driver, 73 74 #[cfg(not(feature = "rt"))] 75 Handle::Disabled => unreachable!(), 76 } 77 } 78 } 79 80 cfg_rt! { 81 use crate::future::Future; 82 use crate::loom::sync::Arc; 83 use crate::runtime::{blocking, task::Id}; 84 use crate::runtime::context; 85 use crate::task::JoinHandle; 86 use crate::util::RngSeedGenerator; 87 use std::task::Waker; 88 89 macro_rules! match_flavor { 90 ($self:expr, $ty:ident($h:ident) => $e:expr) => { 91 match $self { 92 $ty::CurrentThread($h) => $e, 93 94 #[cfg(feature = "rt-multi-thread")] 95 $ty::MultiThread($h) => $e, 96 97 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 98 $ty::MultiThreadAlt($h) => $e, 99 } 100 } 101 } 102 103 impl Handle { 104 #[track_caller] 105 pub(crate) fn current() -> Handle { 106 match context::with_current(Clone::clone) { 107 Ok(handle) => handle, 108 Err(e) => panic!("{}", e), 109 } 110 } 111 112 pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { 113 match_flavor!(self, Handle(h) => &h.blocking_spawner) 114 } 115 116 pub(crate) fn is_local(&self) -> bool { 117 match self { 118 Handle::CurrentThread(h) => h.local_tid.is_some(), 119 120 #[cfg(feature = "rt-multi-thread")] 121 Handle::MultiThread(_) => false, 122 123 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 124 Handle::MultiThreadAlt(_) => false, 125 } 126 } 127 128 /// Returns true if this is a local runtime and the runtime is owned by the current thread. 129 pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool { 130 match self { 131 Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false), 132 133 #[cfg(feature = "rt-multi-thread")] 134 Handle::MultiThread(_) => false, 135 136 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 137 Handle::MultiThreadAlt(_) => false, 138 } 139 } 140 141 pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output> 142 where 143 F: Future + Send + 'static, 144 F::Output: Send + 'static, 145 { 146 match self { 147 Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), 148 149 #[cfg(feature = "rt-multi-thread")] 150 Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), 151 152 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 153 Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id), 154 } 155 } 156 157 /// Spawn a local task 158 /// 159 /// # Safety 160 /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned 161 /// by the current thread. 162 #[allow(irrefutable_let_patterns)] 163 pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output> 164 where 165 F: Future + 'static, 166 F::Output: 'static, 167 { 168 if let Handle::CurrentThread(h) = self { 169 current_thread::Handle::spawn_local(h, future, id) 170 } else { 171 panic!("Only current_thread and LocalSet have spawn_local internals implemented") 172 } 173 } 174 175 pub(crate) fn shutdown(&self) { 176 match *self { 177 Handle::CurrentThread(_) => {}, 178 179 #[cfg(feature = "rt-multi-thread")] 180 Handle::MultiThread(ref h) => h.shutdown(), 181 182 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 183 Handle::MultiThreadAlt(ref h) => h.shutdown(), 184 } 185 } 186 187 pub(crate) fn seed_generator(&self) -> &RngSeedGenerator { 188 match_flavor!(self, Handle(h) => &h.seed_generator) 189 } 190 191 pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> { 192 match self { 193 Handle::CurrentThread(handle) => handle, 194 #[cfg(feature = "rt-multi-thread")] 195 _ => panic!("not a CurrentThread handle"), 196 } 197 } 198 199 pub(crate) fn hooks(&self) -> &TaskHooks { 200 match self { 201 Handle::CurrentThread(h) => &h.task_hooks, 202 #[cfg(feature = "rt-multi-thread")] 203 Handle::MultiThread(h) => &h.task_hooks, 204 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 205 Handle::MultiThreadAlt(h) => &h.task_hooks, 206 } 207 } 208 209 cfg_rt_multi_thread! { 210 cfg_unstable! { 211 pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> { 212 match self { 213 Handle::MultiThreadAlt(handle) => handle, 214 _ => panic!("not a `MultiThreadAlt` handle"), 215 } 216 } 217 } 218 } 219 } 220 221 impl Handle { 222 pub(crate) fn num_workers(&self) -> usize { 223 match self { 224 Handle::CurrentThread(_) => 1, 225 #[cfg(feature = "rt-multi-thread")] 226 Handle::MultiThread(handle) => handle.num_workers(), 227 #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] 228 Handle::MultiThreadAlt(handle) => handle.num_workers(), 229 } 230 } 231 232 pub(crate) fn num_alive_tasks(&self) -> usize { 233 match_flavor!(self, Handle(handle) => handle.num_alive_tasks()) 234 } 235 236 pub(crate) fn injection_queue_depth(&self) -> usize { 237 match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) 238 } 239 } 240 241 cfg_unstable_metrics! { 242 use crate::runtime::{SchedulerMetrics, WorkerMetrics}; 243 244 impl Handle { 245 cfg_64bit_metrics! { 246 pub(crate) fn spawned_tasks_count(&self) -> u64 { 247 match_flavor!(self, Handle(handle) => handle.spawned_tasks_count()) 248 } 249 } 250 251 pub(crate) fn num_blocking_threads(&self) -> usize { 252 match_flavor!(self, Handle(handle) => handle.num_blocking_threads()) 253 } 254 255 pub(crate) fn num_idle_blocking_threads(&self) -> usize { 256 match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads()) 257 } 258 259 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { 260 match_flavor!(self, Handle(handle) => handle.scheduler_metrics()) 261 } 262 263 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { 264 match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) 265 } 266 267 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { 268 match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker)) 269 } 270 271 pub(crate) fn blocking_queue_depth(&self) -> usize { 272 match_flavor!(self, Handle(handle) => handle.blocking_queue_depth()) 273 } 274 } 275 } 276 277 impl Context { 278 #[track_caller] 279 pub(crate) fn expect_current_thread(&self) -> ¤t_thread::Context { 280 match self { 281 Context::CurrentThread(context) => context, 282 #[cfg(feature = "rt-multi-thread")] 283 _ => panic!("expected `CurrentThread::Context`") 284 } 285 } 286 287 pub(crate) fn defer(&self, waker: &Waker) { 288 match_flavor!(self, Context(context) => context.defer(waker)); 289 } 290 291 cfg_rt_multi_thread! { 292 #[track_caller] 293 pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context { 294 match self { 295 Context::MultiThread(context) => context, 296 _ => panic!("expected `MultiThread::Context`") 297 } 298 } 299 300 cfg_unstable! { 301 #[track_caller] 302 pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context { 303 match self { 304 Context::MultiThreadAlt(context) => context, 305 _ => panic!("expected `MultiThreadAlt::Context`") 306 } 307 } 308 } 309 } 310 } 311 } 312 313 cfg_not_rt! { 314 #[cfg(any( 315 feature = "net", 316 all(unix, feature = "process"), 317 all(unix, feature = "signal"), 318 feature = "time", 319 ))] 320 impl Handle { 321 #[track_caller] 322 pub(crate) fn current() -> Handle { 323 panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) 324 } 325 } 326 } 327