1 use crate::loom::sync::atomic::AtomicBool;
2 use crate::loom::sync::Arc;
3 use crate::runtime::driver::{self, Driver};
4 use crate::runtime::scheduler::{self, Defer, Inject};
5 use crate::runtime::task::{
6     self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
7 };
8 use crate::runtime::{
9     blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10 };
11 use crate::sync::notify::Notify;
12 use crate::util::atomic_cell::AtomicCell;
13 use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14 
15 use std::cell::RefCell;
16 use std::collections::VecDeque;
17 use std::future::{poll_fn, Future};
18 use std::sync::atomic::Ordering::{AcqRel, Release};
19 use std::task::Poll::{Pending, Ready};
20 use std::task::Waker;
21 use std::thread::ThreadId;
22 use std::time::Duration;
23 use std::{fmt, thread};
24 
25 /// Executes tasks on the current thread
26 pub(crate) struct CurrentThread {
27     /// Core scheduler data is acquired by a thread entering `block_on`.
28     core: AtomicCell<Core>,
29 
30     /// Notifier for waking up other threads to steal the
31     /// driver.
32     notify: Notify,
33 }
34 
35 /// Handle to the current thread scheduler
36 pub(crate) struct Handle {
37     /// Scheduler state shared across threads
38     shared: Shared,
39 
40     /// Resource driver handles
41     pub(crate) driver: driver::Handle,
42 
43     /// Blocking pool spawner
44     pub(crate) blocking_spawner: blocking::Spawner,
45 
46     /// Current random number generator seed
47     pub(crate) seed_generator: RngSeedGenerator,
48 
49     /// User-supplied hooks to invoke for things
50     pub(crate) task_hooks: TaskHooks,
51 
52     /// If this is a `LocalRuntime`, flags the owning thread ID.
53     pub(crate) local_tid: Option<ThreadId>,
54 }
55 
56 /// Data required for executing the scheduler. The struct is passed around to
57 /// a function that will perform the scheduling work and acts as a capability token.
58 struct Core {
59     /// Scheduler run queue
60     tasks: VecDeque<Notified>,
61 
62     /// Current tick
63     tick: u32,
64 
65     /// Runtime driver
66     ///
67     /// The driver is removed before starting to park the thread
68     driver: Option<Driver>,
69 
70     /// Metrics batch
71     metrics: MetricsBatch,
72 
73     /// How often to check the global queue
74     global_queue_interval: u32,
75 
76     /// True if a task panicked without being handled and the runtime is
77     /// configured to shutdown on unhandled panic.
78     unhandled_panic: bool,
79 }
80 
81 /// Scheduler state shared between threads.
82 struct Shared {
83     /// Remote run queue
84     inject: Inject<Arc<Handle>>,
85 
86     /// Collection of all active tasks spawned onto this executor.
87     owned: OwnedTasks<Arc<Handle>>,
88 
89     /// Indicates whether the blocked on thread was woken.
90     woken: AtomicBool,
91 
92     /// Scheduler configuration options
93     config: Config,
94 
95     /// Keeps track of various runtime metrics.
96     scheduler_metrics: SchedulerMetrics,
97 
98     /// This scheduler only has one worker.
99     worker_metrics: WorkerMetrics,
100 }
101 
102 /// Thread-local context.
103 ///
104 /// pub(crate) to store in `runtime::context`.
105 pub(crate) struct Context {
106     /// Scheduler handle
107     handle: Arc<Handle>,
108 
109     /// Scheduler core, enabling the holder of `Context` to execute the
110     /// scheduler.
111     core: RefCell<Option<Box<Core>>>,
112 
113     /// Deferred tasks, usually ones that called `task::yield_now()`.
114     pub(crate) defer: Defer,
115 }
116 
117 type Notified = task::Notified<Arc<Handle>>;
118 
119 /// Initial queue capacity.
120 const INITIAL_CAPACITY: usize = 64;
121 
122 /// Used if none is specified. This is a temporary constant and will be removed
123 /// as we unify tuning logic between the multi-thread and current-thread
124 /// schedulers.
125 const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126 
127 impl CurrentThread {
new( driver: Driver, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, local_tid: Option<ThreadId>, ) -> (CurrentThread, Arc<Handle>)128     pub(crate) fn new(
129         driver: Driver,
130         driver_handle: driver::Handle,
131         blocking_spawner: blocking::Spawner,
132         seed_generator: RngSeedGenerator,
133         config: Config,
134         local_tid: Option<ThreadId>,
135     ) -> (CurrentThread, Arc<Handle>) {
136         let worker_metrics = WorkerMetrics::from_config(&config);
137         worker_metrics.set_thread_id(thread::current().id());
138 
139         // Get the configured global queue interval, or use the default.
140         let global_queue_interval = config
141             .global_queue_interval
142             .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
143 
144         let handle = Arc::new(Handle {
145             task_hooks: TaskHooks {
146                 task_spawn_callback: config.before_spawn.clone(),
147                 task_terminate_callback: config.after_termination.clone(),
148             },
149             shared: Shared {
150                 inject: Inject::new(),
151                 owned: OwnedTasks::new(1),
152                 woken: AtomicBool::new(false),
153                 config,
154                 scheduler_metrics: SchedulerMetrics::new(),
155                 worker_metrics,
156             },
157             driver: driver_handle,
158             blocking_spawner,
159             seed_generator,
160             local_tid,
161         });
162 
163         let core = AtomicCell::new(Some(Box::new(Core {
164             tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
165             tick: 0,
166             driver: Some(driver),
167             metrics: MetricsBatch::new(&handle.shared.worker_metrics),
168             global_queue_interval,
169             unhandled_panic: false,
170         })));
171 
172         let scheduler = CurrentThread {
173             core,
174             notify: Notify::new(),
175         };
176 
177         (scheduler, handle)
178     }
179 
180     #[track_caller]
block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output181     pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
182         pin!(future);
183 
184         crate::runtime::context::enter_runtime(handle, false, |blocking| {
185             let handle = handle.as_current_thread();
186 
187             // Attempt to steal the scheduler core and block_on the future if we can
188             // there, otherwise, lets select on a notification that the core is
189             // available or the future is complete.
190             loop {
191                 if let Some(core) = self.take_core(handle) {
192                     handle
193                         .shared
194                         .worker_metrics
195                         .set_thread_id(thread::current().id());
196                     return core.block_on(future);
197                 } else {
198                     let notified = self.notify.notified();
199                     pin!(notified);
200 
201                     if let Some(out) = blocking
202                         .block_on(poll_fn(|cx| {
203                             if notified.as_mut().poll(cx).is_ready() {
204                                 return Ready(None);
205                             }
206 
207                             if let Ready(out) = future.as_mut().poll(cx) {
208                                 return Ready(Some(out));
209                             }
210 
211                             Pending
212                         }))
213                         .expect("Failed to `Enter::block_on`")
214                     {
215                         return out;
216                     }
217                 }
218             }
219         })
220     }
221 
take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>>222     fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
223         let core = self.core.take()?;
224 
225         Some(CoreGuard {
226             context: scheduler::Context::CurrentThread(Context {
227                 handle: handle.clone(),
228                 core: RefCell::new(Some(core)),
229                 defer: Defer::new(),
230             }),
231             scheduler: self,
232         })
233     }
234 
shutdown(&mut self, handle: &scheduler::Handle)235     pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
236         let handle = handle.as_current_thread();
237 
238         // Avoid a double panic if we are currently panicking and
239         // the lock may be poisoned.
240 
241         let core = match self.take_core(handle) {
242             Some(core) => core,
243             None if std::thread::panicking() => return,
244             None => panic!("Oh no! We never placed the Core back, this is a bug!"),
245         };
246 
247         // Check that the thread-local is not being destroyed
248         let tls_available = context::with_current(|_| ()).is_ok();
249 
250         if tls_available {
251             core.enter(|core, _context| {
252                 let core = shutdown2(core, handle);
253                 (core, ())
254             });
255         } else {
256             // Shutdown without setting the context. `tokio::spawn` calls will
257             // fail, but those will fail either way because the thread-local is
258             // not available anymore.
259             let context = core.context.expect_current_thread();
260             let core = context.core.borrow_mut().take().unwrap();
261 
262             let core = shutdown2(core, handle);
263             *context.core.borrow_mut() = Some(core);
264         }
265     }
266 }
267 
shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core>268 fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
269     // Drain the OwnedTasks collection. This call also closes the
270     // collection, ensuring that no tasks are ever pushed after this
271     // call returns.
272     handle.shared.owned.close_and_shutdown_all(0);
273 
274     // Drain local queue
275     // We already shut down every task, so we just need to drop the task.
276     while let Some(task) = core.next_local_task(handle) {
277         drop(task);
278     }
279 
280     // Close the injection queue
281     handle.shared.inject.close();
282 
283     // Drain remote queue
284     while let Some(task) = handle.shared.inject.pop() {
285         drop(task);
286     }
287 
288     assert!(handle.shared.owned.is_empty());
289 
290     // Submit metrics
291     core.submit_metrics(handle);
292 
293     // Shutdown the resource drivers
294     if let Some(driver) = core.driver.as_mut() {
295         driver.shutdown(&handle.driver);
296     }
297 
298     core
299 }
300 
301 impl fmt::Debug for CurrentThread {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result302     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
303         fmt.debug_struct("CurrentThread").finish()
304     }
305 }
306 
307 // ===== impl Core =====
308 
309 impl Core {
310     /// Get and increment the current tick
tick(&mut self)311     fn tick(&mut self) {
312         self.tick = self.tick.wrapping_add(1);
313     }
314 
next_task(&mut self, handle: &Handle) -> Option<Notified>315     fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
316         if self.tick % self.global_queue_interval == 0 {
317             handle
318                 .next_remote_task()
319                 .or_else(|| self.next_local_task(handle))
320         } else {
321             self.next_local_task(handle)
322                 .or_else(|| handle.next_remote_task())
323         }
324     }
325 
next_local_task(&mut self, handle: &Handle) -> Option<Notified>326     fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
327         let ret = self.tasks.pop_front();
328         handle
329             .shared
330             .worker_metrics
331             .set_queue_depth(self.tasks.len());
332         ret
333     }
334 
push_task(&mut self, handle: &Handle, task: Notified)335     fn push_task(&mut self, handle: &Handle, task: Notified) {
336         self.tasks.push_back(task);
337         self.metrics.inc_local_schedule_count();
338         handle
339             .shared
340             .worker_metrics
341             .set_queue_depth(self.tasks.len());
342     }
343 
submit_metrics(&mut self, handle: &Handle)344     fn submit_metrics(&mut self, handle: &Handle) {
345         self.metrics.submit(&handle.shared.worker_metrics, 0);
346     }
347 }
348 
349 #[cfg(tokio_taskdump)]
wake_deferred_tasks_and_free(context: &Context)350 fn wake_deferred_tasks_and_free(context: &Context) {
351     let wakers = context.defer.take_deferred();
352     for waker in wakers {
353         waker.wake();
354     }
355 }
356 
357 // ===== impl Context =====
358 
359 impl Context {
360     /// Execute the closure with the given scheduler core stored in the
361     /// thread-local context.
run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)362     fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
363         core.metrics.start_poll();
364         let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
365         ret.0.metrics.end_poll();
366         ret
367     }
368 
369     /// Blocks the current thread until an event is received by the driver,
370     /// including I/O events, timer events, ...
park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>371     fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
372         let mut driver = core.driver.take().expect("driver missing");
373 
374         if let Some(f) = &handle.shared.config.before_park {
375             let (c, ()) = self.enter(core, || f());
376             core = c;
377         }
378 
379         // This check will fail if `before_park` spawns a task for us to run
380         // instead of parking the thread
381         if core.tasks.is_empty() {
382             // Park until the thread is signaled
383             core.metrics.about_to_park();
384             core.submit_metrics(handle);
385 
386             let (c, ()) = self.enter(core, || {
387                 driver.park(&handle.driver);
388                 self.defer.wake();
389             });
390 
391             core = c;
392 
393             core.metrics.unparked();
394             core.submit_metrics(handle);
395         }
396 
397         if let Some(f) = &handle.shared.config.after_unpark {
398             let (c, ()) = self.enter(core, || f());
399             core = c;
400         }
401 
402         core.driver = Some(driver);
403         core
404     }
405 
406     /// Checks the driver for new events without blocking the thread.
park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>407     fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
408         let mut driver = core.driver.take().expect("driver missing");
409 
410         core.submit_metrics(handle);
411 
412         let (mut core, ()) = self.enter(core, || {
413             driver.park_timeout(&handle.driver, Duration::from_millis(0));
414             self.defer.wake();
415         });
416 
417         core.driver = Some(driver);
418         core
419     }
420 
enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)421     fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
422         // Store the scheduler core in the thread-local context
423         //
424         // A drop-guard is employed at a higher level.
425         *self.core.borrow_mut() = Some(core);
426 
427         // Execute the closure while tracking the execution budget
428         let ret = f();
429 
430         // Take the scheduler core back
431         let core = self.core.borrow_mut().take().expect("core missing");
432         (core, ret)
433     }
434 
defer(&self, waker: &Waker)435     pub(crate) fn defer(&self, waker: &Waker) {
436         self.defer.defer(waker);
437     }
438 }
439 
440 // ===== impl Handle =====
441 
442 impl Handle {
443     /// Spawns a future onto the `CurrentThread` scheduler
spawn<F>( me: &Arc<Self>, future: F, id: crate::runtime::task::Id, ) -> JoinHandle<F::Output> where F: crate::future::Future + Send + 'static, F::Output: Send + 'static,444     pub(crate) fn spawn<F>(
445         me: &Arc<Self>,
446         future: F,
447         id: crate::runtime::task::Id,
448     ) -> JoinHandle<F::Output>
449     where
450         F: crate::future::Future + Send + 'static,
451         F::Output: Send + 'static,
452     {
453         let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
454 
455         me.task_hooks.spawn(&TaskMeta {
456             id,
457             _phantom: Default::default(),
458         });
459 
460         if let Some(notified) = notified {
461             me.schedule(notified);
462         }
463 
464         handle
465     }
466 
467     /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
468     ///
469     /// # Safety
470     /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
471     /// provably cannot be driven from or moved to different threads from the one on which the task
472     /// is spawned.
spawn_local<F>( me: &Arc<Self>, future: F, id: crate::runtime::task::Id, ) -> JoinHandle<F::Output> where F: crate::future::Future + 'static, F::Output: 'static,473     pub(crate) unsafe fn spawn_local<F>(
474         me: &Arc<Self>,
475         future: F,
476         id: crate::runtime::task::Id,
477     ) -> JoinHandle<F::Output>
478     where
479         F: crate::future::Future + 'static,
480         F::Output: 'static,
481     {
482         let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
483 
484         me.task_hooks.spawn(&TaskMeta {
485             id,
486             _phantom: Default::default(),
487         });
488 
489         if let Some(notified) = notified {
490             me.schedule(notified);
491         }
492 
493         handle
494     }
495 
496     /// Capture a snapshot of this runtime's state.
497     #[cfg(all(
498         tokio_unstable,
499         tokio_taskdump,
500         target_os = "linux",
501         any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
502     ))]
dump(&self) -> crate::runtime::Dump503     pub(crate) fn dump(&self) -> crate::runtime::Dump {
504         use crate::runtime::dump;
505         use task::trace::trace_current_thread;
506 
507         let mut traces = vec![];
508 
509         // todo: how to make this work outside of a runtime context?
510         context::with_scheduler(|maybe_context| {
511             // drain the local queue
512             let context = if let Some(context) = maybe_context {
513                 context.expect_current_thread()
514             } else {
515                 return;
516             };
517             let mut maybe_core = context.core.borrow_mut();
518             let core = if let Some(core) = maybe_core.as_mut() {
519                 core
520             } else {
521                 return;
522             };
523             let local = &mut core.tasks;
524 
525             if self.shared.inject.is_closed() {
526                 return;
527             }
528 
529             traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
530                 .into_iter()
531                 .map(|(id, trace)| dump::Task::new(id, trace))
532                 .collect();
533 
534             // Avoid double borrow panic
535             drop(maybe_core);
536 
537             // Taking a taskdump could wakes every task, but we probably don't want
538             // the `yield_now` vector to be that large under normal circumstances.
539             // Therefore, we free its allocation.
540             wake_deferred_tasks_and_free(context);
541         });
542 
543         dump::Dump::new(traces)
544     }
545 
next_remote_task(&self) -> Option<Notified>546     fn next_remote_task(&self) -> Option<Notified> {
547         self.shared.inject.pop()
548     }
549 
waker_ref(me: &Arc<Self>) -> WakerRef<'_>550     fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
551         // Set woken to true when enter block_on, ensure outer future
552         // be polled for the first time when enter loop
553         me.shared.woken.store(true, Release);
554         waker_ref(me)
555     }
556 
557     // reset woken to false and return original value
reset_woken(&self) -> bool558     pub(crate) fn reset_woken(&self) -> bool {
559         self.shared.woken.swap(false, AcqRel)
560     }
561 
num_alive_tasks(&self) -> usize562     pub(crate) fn num_alive_tasks(&self) -> usize {
563         self.shared.owned.num_alive_tasks()
564     }
565 
injection_queue_depth(&self) -> usize566     pub(crate) fn injection_queue_depth(&self) -> usize {
567         self.shared.inject.len()
568     }
569 }
570 
571 cfg_unstable_metrics! {
572     impl Handle {
573         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
574             &self.shared.scheduler_metrics
575         }
576 
577         pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
578             assert_eq!(0, worker);
579             &self.shared.worker_metrics
580         }
581 
582         pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
583             self.worker_metrics(worker).queue_depth()
584         }
585 
586         pub(crate) fn num_blocking_threads(&self) -> usize {
587             self.blocking_spawner.num_threads()
588         }
589 
590         pub(crate) fn num_idle_blocking_threads(&self) -> usize {
591             self.blocking_spawner.num_idle_threads()
592         }
593 
594         pub(crate) fn blocking_queue_depth(&self) -> usize {
595             self.blocking_spawner.queue_depth()
596         }
597 
598         cfg_64bit_metrics! {
599             pub(crate) fn spawned_tasks_count(&self) -> u64 {
600                 self.shared.owned.spawned_tasks_count()
601             }
602         }
603     }
604 }
605 
606 cfg_unstable! {
607     use std::num::NonZeroU64;
608 
609     impl Handle {
610         pub(crate) fn owned_id(&self) -> NonZeroU64 {
611             self.shared.owned.id
612         }
613     }
614 }
615 
616 impl fmt::Debug for Handle {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result617     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
618         fmt.debug_struct("current_thread::Handle { ... }").finish()
619     }
620 }
621 
622 // ===== impl Shared =====
623 
624 impl Schedule for Arc<Handle> {
release(&self, task: &Task<Self>) -> Option<Task<Self>>625     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
626         self.shared.owned.remove(task)
627     }
628 
schedule(&self, task: task::Notified<Self>)629     fn schedule(&self, task: task::Notified<Self>) {
630         use scheduler::Context::CurrentThread;
631 
632         context::with_scheduler(|maybe_cx| match maybe_cx {
633             Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
634                 let mut core = cx.core.borrow_mut();
635 
636                 // If `None`, the runtime is shutting down, so there is no need
637                 // to schedule the task.
638                 if let Some(core) = core.as_mut() {
639                     core.push_task(self, task);
640                 }
641             }
642             _ => {
643                 // Track that a task was scheduled from **outside** of the runtime.
644                 self.shared.scheduler_metrics.inc_remote_schedule_count();
645 
646                 // Schedule the task
647                 self.shared.inject.push(task);
648                 self.driver.unpark();
649             }
650         });
651     }
652 
hooks(&self) -> TaskHarnessScheduleHooks653     fn hooks(&self) -> TaskHarnessScheduleHooks {
654         TaskHarnessScheduleHooks {
655             task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
656         }
657     }
658 
659     cfg_unstable! {
660         fn unhandled_panic(&self) {
661             use crate::runtime::UnhandledPanic;
662 
663             match self.shared.config.unhandled_panic {
664                 UnhandledPanic::Ignore => {
665                     // Do nothing
666                 }
667                 UnhandledPanic::ShutdownRuntime => {
668                     use scheduler::Context::CurrentThread;
669 
670                     // This hook is only called from within the runtime, so
671                     // `context::with_scheduler` should match with `&self`, i.e.
672                     // there is no opportunity for a nested scheduler to be
673                     // called.
674                     context::with_scheduler(|maybe_cx| match maybe_cx {
675                         Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
676                             let mut core = cx.core.borrow_mut();
677 
678                             // If `None`, the runtime is shutting down, so there is no need to signal shutdown
679                             if let Some(core) = core.as_mut() {
680                                 core.unhandled_panic = true;
681                                 self.shared.owned.close_and_shutdown_all(0);
682                             }
683                         }
684                         _ => unreachable!("runtime core not set in CURRENT thread-local"),
685                     })
686                 }
687             }
688         }
689     }
690 }
691 
692 impl Wake for Handle {
wake(arc_self: Arc<Self>)693     fn wake(arc_self: Arc<Self>) {
694         Wake::wake_by_ref(&arc_self);
695     }
696 
697     /// Wake by reference
wake_by_ref(arc_self: &Arc<Self>)698     fn wake_by_ref(arc_self: &Arc<Self>) {
699         arc_self.shared.woken.store(true, Release);
700         arc_self.driver.unpark();
701     }
702 }
703 
704 // ===== CoreGuard =====
705 
706 /// Used to ensure we always place the `Core` value back into its slot in
707 /// `CurrentThread`, even if the future panics.
708 struct CoreGuard<'a> {
709     context: scheduler::Context,
710     scheduler: &'a CurrentThread,
711 }
712 
713 impl CoreGuard<'_> {
714     #[track_caller]
block_on<F: Future>(self, future: F) -> F::Output715     fn block_on<F: Future>(self, future: F) -> F::Output {
716         let ret = self.enter(|mut core, context| {
717             let waker = Handle::waker_ref(&context.handle);
718             let mut cx = std::task::Context::from_waker(&waker);
719 
720             pin!(future);
721 
722             core.metrics.start_processing_scheduled_tasks();
723 
724             'outer: loop {
725                 let handle = &context.handle;
726 
727                 if handle.reset_woken() {
728                     let (c, res) = context.enter(core, || {
729                         crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
730                     });
731 
732                     core = c;
733 
734                     if let Ready(v) = res {
735                         return (core, Some(v));
736                     }
737                 }
738 
739                 for _ in 0..handle.shared.config.event_interval {
740                     // Make sure we didn't hit an unhandled_panic
741                     if core.unhandled_panic {
742                         return (core, None);
743                     }
744 
745                     core.tick();
746 
747                     let entry = core.next_task(handle);
748 
749                     let task = match entry {
750                         Some(entry) => entry,
751                         None => {
752                             core.metrics.end_processing_scheduled_tasks();
753 
754                             core = if !context.defer.is_empty() {
755                                 context.park_yield(core, handle)
756                             } else {
757                                 context.park(core, handle)
758                             };
759 
760                             core.metrics.start_processing_scheduled_tasks();
761 
762                             // Try polling the `block_on` future next
763                             continue 'outer;
764                         }
765                     };
766 
767                     let task = context.handle.shared.owned.assert_owner(task);
768 
769                     let (c, ()) = context.run_task(core, || {
770                         task.run();
771                     });
772 
773                     core = c;
774                 }
775 
776                 core.metrics.end_processing_scheduled_tasks();
777 
778                 // Yield to the driver, this drives the timer and pulls any
779                 // pending I/O events.
780                 core = context.park_yield(core, handle);
781 
782                 core.metrics.start_processing_scheduled_tasks();
783             }
784         });
785 
786         match ret {
787             Some(ret) => ret,
788             None => {
789                 // `block_on` panicked.
790                 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
791             }
792         }
793     }
794 
795     /// Enters the scheduler context. This sets the queue and other necessary
796     /// scheduler state in the thread-local.
enter<F, R>(self, f: F) -> R where F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),797     fn enter<F, R>(self, f: F) -> R
798     where
799         F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
800     {
801         let context = self.context.expect_current_thread();
802 
803         // Remove `core` from `context` to pass into the closure.
804         let core = context.core.borrow_mut().take().expect("core missing");
805 
806         // Call the closure and place `core` back
807         let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
808 
809         *context.core.borrow_mut() = Some(core);
810 
811         ret
812     }
813 }
814 
815 impl Drop for CoreGuard<'_> {
drop(&mut self)816     fn drop(&mut self) {
817         let context = self.context.expect_current_thread();
818 
819         if let Some(core) = context.core.borrow_mut().take() {
820             // Replace old scheduler back into the state to allow
821             // other threads to pick it up and drive it.
822             self.scheduler.core.set(core);
823 
824             // Wake up other possible threads that could steal the driver.
825             self.scheduler.notify.notify_one();
826         }
827     }
828 }
829