1 use crate::loom::sync::atomic::AtomicBool;
2 use crate::loom::sync::Arc;
3 use crate::runtime::driver::{self, Driver};
4 use crate::runtime::scheduler::{self, Defer, Inject};
5 use crate::runtime::task::{
6 self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
7 };
8 use crate::runtime::{
9 blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
10 };
11 use crate::sync::notify::Notify;
12 use crate::util::atomic_cell::AtomicCell;
13 use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
14
15 use std::cell::RefCell;
16 use std::collections::VecDeque;
17 use std::future::{poll_fn, Future};
18 use std::sync::atomic::Ordering::{AcqRel, Release};
19 use std::task::Poll::{Pending, Ready};
20 use std::task::Waker;
21 use std::thread::ThreadId;
22 use std::time::Duration;
23 use std::{fmt, thread};
24
25 /// Executes tasks on the current thread
26 pub(crate) struct CurrentThread {
27 /// Core scheduler data is acquired by a thread entering `block_on`.
28 core: AtomicCell<Core>,
29
30 /// Notifier for waking up other threads to steal the
31 /// driver.
32 notify: Notify,
33 }
34
35 /// Handle to the current thread scheduler
36 pub(crate) struct Handle {
37 /// Scheduler state shared across threads
38 shared: Shared,
39
40 /// Resource driver handles
41 pub(crate) driver: driver::Handle,
42
43 /// Blocking pool spawner
44 pub(crate) blocking_spawner: blocking::Spawner,
45
46 /// Current random number generator seed
47 pub(crate) seed_generator: RngSeedGenerator,
48
49 /// User-supplied hooks to invoke for things
50 pub(crate) task_hooks: TaskHooks,
51
52 /// If this is a `LocalRuntime`, flags the owning thread ID.
53 pub(crate) local_tid: Option<ThreadId>,
54 }
55
56 /// Data required for executing the scheduler. The struct is passed around to
57 /// a function that will perform the scheduling work and acts as a capability token.
58 struct Core {
59 /// Scheduler run queue
60 tasks: VecDeque<Notified>,
61
62 /// Current tick
63 tick: u32,
64
65 /// Runtime driver
66 ///
67 /// The driver is removed before starting to park the thread
68 driver: Option<Driver>,
69
70 /// Metrics batch
71 metrics: MetricsBatch,
72
73 /// How often to check the global queue
74 global_queue_interval: u32,
75
76 /// True if a task panicked without being handled and the runtime is
77 /// configured to shutdown on unhandled panic.
78 unhandled_panic: bool,
79 }
80
81 /// Scheduler state shared between threads.
82 struct Shared {
83 /// Remote run queue
84 inject: Inject<Arc<Handle>>,
85
86 /// Collection of all active tasks spawned onto this executor.
87 owned: OwnedTasks<Arc<Handle>>,
88
89 /// Indicates whether the blocked on thread was woken.
90 woken: AtomicBool,
91
92 /// Scheduler configuration options
93 config: Config,
94
95 /// Keeps track of various runtime metrics.
96 scheduler_metrics: SchedulerMetrics,
97
98 /// This scheduler only has one worker.
99 worker_metrics: WorkerMetrics,
100 }
101
102 /// Thread-local context.
103 ///
104 /// pub(crate) to store in `runtime::context`.
105 pub(crate) struct Context {
106 /// Scheduler handle
107 handle: Arc<Handle>,
108
109 /// Scheduler core, enabling the holder of `Context` to execute the
110 /// scheduler.
111 core: RefCell<Option<Box<Core>>>,
112
113 /// Deferred tasks, usually ones that called `task::yield_now()`.
114 pub(crate) defer: Defer,
115 }
116
117 type Notified = task::Notified<Arc<Handle>>;
118
119 /// Initial queue capacity.
120 const INITIAL_CAPACITY: usize = 64;
121
122 /// Used if none is specified. This is a temporary constant and will be removed
123 /// as we unify tuning logic between the multi-thread and current-thread
124 /// schedulers.
125 const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
126
127 impl CurrentThread {
new( driver: Driver, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, local_tid: Option<ThreadId>, ) -> (CurrentThread, Arc<Handle>)128 pub(crate) fn new(
129 driver: Driver,
130 driver_handle: driver::Handle,
131 blocking_spawner: blocking::Spawner,
132 seed_generator: RngSeedGenerator,
133 config: Config,
134 local_tid: Option<ThreadId>,
135 ) -> (CurrentThread, Arc<Handle>) {
136 let worker_metrics = WorkerMetrics::from_config(&config);
137 worker_metrics.set_thread_id(thread::current().id());
138
139 // Get the configured global queue interval, or use the default.
140 let global_queue_interval = config
141 .global_queue_interval
142 .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
143
144 let handle = Arc::new(Handle {
145 task_hooks: TaskHooks {
146 task_spawn_callback: config.before_spawn.clone(),
147 task_terminate_callback: config.after_termination.clone(),
148 },
149 shared: Shared {
150 inject: Inject::new(),
151 owned: OwnedTasks::new(1),
152 woken: AtomicBool::new(false),
153 config,
154 scheduler_metrics: SchedulerMetrics::new(),
155 worker_metrics,
156 },
157 driver: driver_handle,
158 blocking_spawner,
159 seed_generator,
160 local_tid,
161 });
162
163 let core = AtomicCell::new(Some(Box::new(Core {
164 tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
165 tick: 0,
166 driver: Some(driver),
167 metrics: MetricsBatch::new(&handle.shared.worker_metrics),
168 global_queue_interval,
169 unhandled_panic: false,
170 })));
171
172 let scheduler = CurrentThread {
173 core,
174 notify: Notify::new(),
175 };
176
177 (scheduler, handle)
178 }
179
180 #[track_caller]
block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output181 pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
182 pin!(future);
183
184 crate::runtime::context::enter_runtime(handle, false, |blocking| {
185 let handle = handle.as_current_thread();
186
187 // Attempt to steal the scheduler core and block_on the future if we can
188 // there, otherwise, lets select on a notification that the core is
189 // available or the future is complete.
190 loop {
191 if let Some(core) = self.take_core(handle) {
192 handle
193 .shared
194 .worker_metrics
195 .set_thread_id(thread::current().id());
196 return core.block_on(future);
197 } else {
198 let notified = self.notify.notified();
199 pin!(notified);
200
201 if let Some(out) = blocking
202 .block_on(poll_fn(|cx| {
203 if notified.as_mut().poll(cx).is_ready() {
204 return Ready(None);
205 }
206
207 if let Ready(out) = future.as_mut().poll(cx) {
208 return Ready(Some(out));
209 }
210
211 Pending
212 }))
213 .expect("Failed to `Enter::block_on`")
214 {
215 return out;
216 }
217 }
218 }
219 })
220 }
221
take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>>222 fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
223 let core = self.core.take()?;
224
225 Some(CoreGuard {
226 context: scheduler::Context::CurrentThread(Context {
227 handle: handle.clone(),
228 core: RefCell::new(Some(core)),
229 defer: Defer::new(),
230 }),
231 scheduler: self,
232 })
233 }
234
shutdown(&mut self, handle: &scheduler::Handle)235 pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
236 let handle = handle.as_current_thread();
237
238 // Avoid a double panic if we are currently panicking and
239 // the lock may be poisoned.
240
241 let core = match self.take_core(handle) {
242 Some(core) => core,
243 None if std::thread::panicking() => return,
244 None => panic!("Oh no! We never placed the Core back, this is a bug!"),
245 };
246
247 // Check that the thread-local is not being destroyed
248 let tls_available = context::with_current(|_| ()).is_ok();
249
250 if tls_available {
251 core.enter(|core, _context| {
252 let core = shutdown2(core, handle);
253 (core, ())
254 });
255 } else {
256 // Shutdown without setting the context. `tokio::spawn` calls will
257 // fail, but those will fail either way because the thread-local is
258 // not available anymore.
259 let context = core.context.expect_current_thread();
260 let core = context.core.borrow_mut().take().unwrap();
261
262 let core = shutdown2(core, handle);
263 *context.core.borrow_mut() = Some(core);
264 }
265 }
266 }
267
shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core>268 fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
269 // Drain the OwnedTasks collection. This call also closes the
270 // collection, ensuring that no tasks are ever pushed after this
271 // call returns.
272 handle.shared.owned.close_and_shutdown_all(0);
273
274 // Drain local queue
275 // We already shut down every task, so we just need to drop the task.
276 while let Some(task) = core.next_local_task(handle) {
277 drop(task);
278 }
279
280 // Close the injection queue
281 handle.shared.inject.close();
282
283 // Drain remote queue
284 while let Some(task) = handle.shared.inject.pop() {
285 drop(task);
286 }
287
288 assert!(handle.shared.owned.is_empty());
289
290 // Submit metrics
291 core.submit_metrics(handle);
292
293 // Shutdown the resource drivers
294 if let Some(driver) = core.driver.as_mut() {
295 driver.shutdown(&handle.driver);
296 }
297
298 core
299 }
300
301 impl fmt::Debug for CurrentThread {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result302 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
303 fmt.debug_struct("CurrentThread").finish()
304 }
305 }
306
307 // ===== impl Core =====
308
309 impl Core {
310 /// Get and increment the current tick
tick(&mut self)311 fn tick(&mut self) {
312 self.tick = self.tick.wrapping_add(1);
313 }
314
next_task(&mut self, handle: &Handle) -> Option<Notified>315 fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
316 if self.tick % self.global_queue_interval == 0 {
317 handle
318 .next_remote_task()
319 .or_else(|| self.next_local_task(handle))
320 } else {
321 self.next_local_task(handle)
322 .or_else(|| handle.next_remote_task())
323 }
324 }
325
next_local_task(&mut self, handle: &Handle) -> Option<Notified>326 fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
327 let ret = self.tasks.pop_front();
328 handle
329 .shared
330 .worker_metrics
331 .set_queue_depth(self.tasks.len());
332 ret
333 }
334
push_task(&mut self, handle: &Handle, task: Notified)335 fn push_task(&mut self, handle: &Handle, task: Notified) {
336 self.tasks.push_back(task);
337 self.metrics.inc_local_schedule_count();
338 handle
339 .shared
340 .worker_metrics
341 .set_queue_depth(self.tasks.len());
342 }
343
submit_metrics(&mut self, handle: &Handle)344 fn submit_metrics(&mut self, handle: &Handle) {
345 self.metrics.submit(&handle.shared.worker_metrics, 0);
346 }
347 }
348
/// Takes all deferred wakers out of `context` and wakes each of them.
#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free(context: &Context) {
    context
        .defer
        .take_deferred()
        .into_iter()
        .for_each(|waker| waker.wake());
}
356
357 // ===== impl Context =====
358
359 impl Context {
360 /// Execute the closure with the given scheduler core stored in the
361 /// thread-local context.
run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)362 fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
363 core.metrics.start_poll();
364 let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
365 ret.0.metrics.end_poll();
366 ret
367 }
368
369 /// Blocks the current thread until an event is received by the driver,
370 /// including I/O events, timer events, ...
park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>371 fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
372 let mut driver = core.driver.take().expect("driver missing");
373
374 if let Some(f) = &handle.shared.config.before_park {
375 let (c, ()) = self.enter(core, || f());
376 core = c;
377 }
378
379 // This check will fail if `before_park` spawns a task for us to run
380 // instead of parking the thread
381 if core.tasks.is_empty() {
382 // Park until the thread is signaled
383 core.metrics.about_to_park();
384 core.submit_metrics(handle);
385
386 let (c, ()) = self.enter(core, || {
387 driver.park(&handle.driver);
388 self.defer.wake();
389 });
390
391 core = c;
392
393 core.metrics.unparked();
394 core.submit_metrics(handle);
395 }
396
397 if let Some(f) = &handle.shared.config.after_unpark {
398 let (c, ()) = self.enter(core, || f());
399 core = c;
400 }
401
402 core.driver = Some(driver);
403 core
404 }
405
406 /// Checks the driver for new events without blocking the thread.
park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>407 fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
408 let mut driver = core.driver.take().expect("driver missing");
409
410 core.submit_metrics(handle);
411
412 let (mut core, ()) = self.enter(core, || {
413 driver.park_timeout(&handle.driver, Duration::from_millis(0));
414 self.defer.wake();
415 });
416
417 core.driver = Some(driver);
418 core
419 }
420
enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)421 fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
422 // Store the scheduler core in the thread-local context
423 //
424 // A drop-guard is employed at a higher level.
425 *self.core.borrow_mut() = Some(core);
426
427 // Execute the closure while tracking the execution budget
428 let ret = f();
429
430 // Take the scheduler core back
431 let core = self.core.borrow_mut().take().expect("core missing");
432 (core, ret)
433 }
434
defer(&self, waker: &Waker)435 pub(crate) fn defer(&self, waker: &Waker) {
436 self.defer.defer(waker);
437 }
438 }
439
440 // ===== impl Handle =====
441
442 impl Handle {
443 /// Spawns a future onto the `CurrentThread` scheduler
spawn<F>( me: &Arc<Self>, future: F, id: crate::runtime::task::Id, ) -> JoinHandle<F::Output> where F: crate::future::Future + Send + 'static, F::Output: Send + 'static,444 pub(crate) fn spawn<F>(
445 me: &Arc<Self>,
446 future: F,
447 id: crate::runtime::task::Id,
448 ) -> JoinHandle<F::Output>
449 where
450 F: crate::future::Future + Send + 'static,
451 F::Output: Send + 'static,
452 {
453 let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
454
455 me.task_hooks.spawn(&TaskMeta {
456 id,
457 _phantom: Default::default(),
458 });
459
460 if let Some(notified) = notified {
461 me.schedule(notified);
462 }
463
464 handle
465 }
466
467 /// Spawn a task which isn't safe to send across thread boundaries onto the runtime.
468 ///
469 /// # Safety
470 /// This should only be used when this is a `LocalRuntime` or in another case where the runtime
471 /// provably cannot be driven from or moved to different threads from the one on which the task
472 /// is spawned.
spawn_local<F>( me: &Arc<Self>, future: F, id: crate::runtime::task::Id, ) -> JoinHandle<F::Output> where F: crate::future::Future + 'static, F::Output: 'static,473 pub(crate) unsafe fn spawn_local<F>(
474 me: &Arc<Self>,
475 future: F,
476 id: crate::runtime::task::Id,
477 ) -> JoinHandle<F::Output>
478 where
479 F: crate::future::Future + 'static,
480 F::Output: 'static,
481 {
482 let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
483
484 me.task_hooks.spawn(&TaskMeta {
485 id,
486 _phantom: Default::default(),
487 });
488
489 if let Some(notified) = notified {
490 me.schedule(notified);
491 }
492
493 handle
494 }
495
496 /// Capture a snapshot of this runtime's state.
497 #[cfg(all(
498 tokio_unstable,
499 tokio_taskdump,
500 target_os = "linux",
501 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
502 ))]
dump(&self) -> crate::runtime::Dump503 pub(crate) fn dump(&self) -> crate::runtime::Dump {
504 use crate::runtime::dump;
505 use task::trace::trace_current_thread;
506
507 let mut traces = vec![];
508
509 // todo: how to make this work outside of a runtime context?
510 context::with_scheduler(|maybe_context| {
511 // drain the local queue
512 let context = if let Some(context) = maybe_context {
513 context.expect_current_thread()
514 } else {
515 return;
516 };
517 let mut maybe_core = context.core.borrow_mut();
518 let core = if let Some(core) = maybe_core.as_mut() {
519 core
520 } else {
521 return;
522 };
523 let local = &mut core.tasks;
524
525 if self.shared.inject.is_closed() {
526 return;
527 }
528
529 traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
530 .into_iter()
531 .map(|(id, trace)| dump::Task::new(id, trace))
532 .collect();
533
534 // Avoid double borrow panic
535 drop(maybe_core);
536
537 // Taking a taskdump could wakes every task, but we probably don't want
538 // the `yield_now` vector to be that large under normal circumstances.
539 // Therefore, we free its allocation.
540 wake_deferred_tasks_and_free(context);
541 });
542
543 dump::Dump::new(traces)
544 }
545
next_remote_task(&self) -> Option<Notified>546 fn next_remote_task(&self) -> Option<Notified> {
547 self.shared.inject.pop()
548 }
549
waker_ref(me: &Arc<Self>) -> WakerRef<'_>550 fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
551 // Set woken to true when enter block_on, ensure outer future
552 // be polled for the first time when enter loop
553 me.shared.woken.store(true, Release);
554 waker_ref(me)
555 }
556
557 // reset woken to false and return original value
reset_woken(&self) -> bool558 pub(crate) fn reset_woken(&self) -> bool {
559 self.shared.woken.swap(false, AcqRel)
560 }
561
num_alive_tasks(&self) -> usize562 pub(crate) fn num_alive_tasks(&self) -> usize {
563 self.shared.owned.num_alive_tasks()
564 }
565
injection_queue_depth(&self) -> usize566 pub(crate) fn injection_queue_depth(&self) -> usize {
567 self.shared.inject.len()
568 }
569 }
570
571 cfg_unstable_metrics! {
572 impl Handle {
573 pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
574 &self.shared.scheduler_metrics
575 }
576
577 pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
578 assert_eq!(0, worker);
579 &self.shared.worker_metrics
580 }
581
582 pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
583 self.worker_metrics(worker).queue_depth()
584 }
585
586 pub(crate) fn num_blocking_threads(&self) -> usize {
587 self.blocking_spawner.num_threads()
588 }
589
590 pub(crate) fn num_idle_blocking_threads(&self) -> usize {
591 self.blocking_spawner.num_idle_threads()
592 }
593
594 pub(crate) fn blocking_queue_depth(&self) -> usize {
595 self.blocking_spawner.queue_depth()
596 }
597
598 cfg_64bit_metrics! {
599 pub(crate) fn spawned_tasks_count(&self) -> u64 {
600 self.shared.owned.spawned_tasks_count()
601 }
602 }
603 }
604 }
605
606 cfg_unstable! {
607 use std::num::NonZeroU64;
608
609 impl Handle {
610 pub(crate) fn owned_id(&self) -> NonZeroU64 {
611 self.shared.owned.id
612 }
613 }
614 }
615
616 impl fmt::Debug for Handle {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result617 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
618 fmt.debug_struct("current_thread::Handle { ... }").finish()
619 }
620 }
621
// ===== impl Schedule for Arc<Handle> =====
623
624 impl Schedule for Arc<Handle> {
release(&self, task: &Task<Self>) -> Option<Task<Self>>625 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
626 self.shared.owned.remove(task)
627 }
628
schedule(&self, task: task::Notified<Self>)629 fn schedule(&self, task: task::Notified<Self>) {
630 use scheduler::Context::CurrentThread;
631
632 context::with_scheduler(|maybe_cx| match maybe_cx {
633 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
634 let mut core = cx.core.borrow_mut();
635
636 // If `None`, the runtime is shutting down, so there is no need
637 // to schedule the task.
638 if let Some(core) = core.as_mut() {
639 core.push_task(self, task);
640 }
641 }
642 _ => {
643 // Track that a task was scheduled from **outside** of the runtime.
644 self.shared.scheduler_metrics.inc_remote_schedule_count();
645
646 // Schedule the task
647 self.shared.inject.push(task);
648 self.driver.unpark();
649 }
650 });
651 }
652
hooks(&self) -> TaskHarnessScheduleHooks653 fn hooks(&self) -> TaskHarnessScheduleHooks {
654 TaskHarnessScheduleHooks {
655 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
656 }
657 }
658
659 cfg_unstable! {
660 fn unhandled_panic(&self) {
661 use crate::runtime::UnhandledPanic;
662
663 match self.shared.config.unhandled_panic {
664 UnhandledPanic::Ignore => {
665 // Do nothing
666 }
667 UnhandledPanic::ShutdownRuntime => {
668 use scheduler::Context::CurrentThread;
669
670 // This hook is only called from within the runtime, so
671 // `context::with_scheduler` should match with `&self`, i.e.
672 // there is no opportunity for a nested scheduler to be
673 // called.
674 context::with_scheduler(|maybe_cx| match maybe_cx {
675 Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
676 let mut core = cx.core.borrow_mut();
677
678 // If `None`, the runtime is shutting down, so there is no need to signal shutdown
679 if let Some(core) = core.as_mut() {
680 core.unhandled_panic = true;
681 self.shared.owned.close_and_shutdown_all(0);
682 }
683 }
684 _ => unreachable!("runtime core not set in CURRENT thread-local"),
685 })
686 }
687 }
688 }
689 }
690 }
691
692 impl Wake for Handle {
wake(arc_self: Arc<Self>)693 fn wake(arc_self: Arc<Self>) {
694 Wake::wake_by_ref(&arc_self);
695 }
696
697 /// Wake by reference
wake_by_ref(arc_self: &Arc<Self>)698 fn wake_by_ref(arc_self: &Arc<Self>) {
699 arc_self.shared.woken.store(true, Release);
700 arc_self.driver.unpark();
701 }
702 }
703
704 // ===== CoreGuard =====
705
706 /// Used to ensure we always place the `Core` value back into its slot in
707 /// `CurrentThread`, even if the future panics.
708 struct CoreGuard<'a> {
709 context: scheduler::Context,
710 scheduler: &'a CurrentThread,
711 }
712
713 impl CoreGuard<'_> {
714 #[track_caller]
block_on<F: Future>(self, future: F) -> F::Output715 fn block_on<F: Future>(self, future: F) -> F::Output {
716 let ret = self.enter(|mut core, context| {
717 let waker = Handle::waker_ref(&context.handle);
718 let mut cx = std::task::Context::from_waker(&waker);
719
720 pin!(future);
721
722 core.metrics.start_processing_scheduled_tasks();
723
724 'outer: loop {
725 let handle = &context.handle;
726
727 if handle.reset_woken() {
728 let (c, res) = context.enter(core, || {
729 crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
730 });
731
732 core = c;
733
734 if let Ready(v) = res {
735 return (core, Some(v));
736 }
737 }
738
739 for _ in 0..handle.shared.config.event_interval {
740 // Make sure we didn't hit an unhandled_panic
741 if core.unhandled_panic {
742 return (core, None);
743 }
744
745 core.tick();
746
747 let entry = core.next_task(handle);
748
749 let task = match entry {
750 Some(entry) => entry,
751 None => {
752 core.metrics.end_processing_scheduled_tasks();
753
754 core = if !context.defer.is_empty() {
755 context.park_yield(core, handle)
756 } else {
757 context.park(core, handle)
758 };
759
760 core.metrics.start_processing_scheduled_tasks();
761
762 // Try polling the `block_on` future next
763 continue 'outer;
764 }
765 };
766
767 let task = context.handle.shared.owned.assert_owner(task);
768
769 let (c, ()) = context.run_task(core, || {
770 task.run();
771 });
772
773 core = c;
774 }
775
776 core.metrics.end_processing_scheduled_tasks();
777
778 // Yield to the driver, this drives the timer and pulls any
779 // pending I/O events.
780 core = context.park_yield(core, handle);
781
782 core.metrics.start_processing_scheduled_tasks();
783 }
784 });
785
786 match ret {
787 Some(ret) => ret,
788 None => {
789 // `block_on` panicked.
790 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
791 }
792 }
793 }
794
795 /// Enters the scheduler context. This sets the queue and other necessary
796 /// scheduler state in the thread-local.
enter<F, R>(self, f: F) -> R where F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),797 fn enter<F, R>(self, f: F) -> R
798 where
799 F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
800 {
801 let context = self.context.expect_current_thread();
802
803 // Remove `core` from `context` to pass into the closure.
804 let core = context.core.borrow_mut().take().expect("core missing");
805
806 // Call the closure and place `core` back
807 let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
808
809 *context.core.borrow_mut() = Some(core);
810
811 ret
812 }
813 }
814
815 impl Drop for CoreGuard<'_> {
drop(&mut self)816 fn drop(&mut self) {
817 let context = self.context.expect_current_thread();
818
819 if let Some(core) = context.core.borrow_mut().take() {
820 // Replace old scheduler back into the state to allow
821 // other threads to pick it up and drive it.
822 self.scheduler.core.set(core);
823
824 // Wake up other possible threads that could steal the driver.
825 self.scheduler.notify.notify_one();
826 }
827 }
828 }
829