1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
6 //!
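//! As an illustration only, a minimal sketch of the public
//! `tokio::task::block_in_place` API that triggers this handoff (the file
//! name below is hypothetical):
//!
//! ```ignore
//! async fn load() -> Vec<u8> {
//!     // While the closure runs, this worker's core is handed to another
//!     // thread so the runtime's other tasks keep making progress.
//!     tokio::task::block_in_place(|| {
//!         std::fs::read("data.bin").expect("read failed")
//!     })
//! }
//! ```
//!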
7 //! # Shutdown
8 //!
9 //! Shutting down the runtime involves the following steps:
10 //!
11 //!  1. The `Shared::close` method is called. This closes the inject queue and
12 //!     `OwnedTasks` instance and wakes up all worker threads.
13 //!
14 //!  2. Each worker thread observes the close signal the next time it runs
15 //!     maintenance by checking whether the inject queue is closed.
16 //!     The `Worker::is_shutdown` flag is set to true.
17 //!
18 //!  3. Each worker thread, in parallel, keeps removing tasks from `OwnedTasks`
19 //!     until it is empty. No new tasks can be pushed to `OwnedTasks` during or
20 //!     after this step, as it was closed in step 1.
21 //!
22 //!  4. The workers call `Shared::shutdown_core` to enter the single-threaded
23 //!     phase of shutdown. These calls push their core to
24 //!     `Shared::shutdown_cores`, and the last thread to push its core finishes
25 //!     the shutdown procedure.
26 //!
27 //!  5. The local run queue of each core is emptied, then the inject queue is
28 //!     emptied.
29 //!
30 //! At this point, shutdown has completed. It is not possible for any of the
31 //! collections to contain any tasks, as each collection was closed first and
32 //! then emptied afterwards.
33 //!
34 //! ## Spawns during shutdown
35 //!
36 //! When spawning tasks during shutdown, there are two cases:
37 //!
38 //!  * The spawner observes the `OwnedTasks` being open, and the inject queue is
39 //!    closed.
40 //!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
41 //!    inject queue.
42 //!
43 //! The first case can only happen if the `OwnedTasks::bind` call happens before
44 //! or during step 1 of shutdown. In this case, the runtime will clean up the
45 //! task in step 3 of shutdown.
46 //!
47 //! In the second case, the task was never spawned and is immediately
48 //! cancelled by the spawner.
49 //!
50 //! The correctness of shutdown requires both the inject queue and `OwnedTasks`
51 //! collection to have a closed bit. With a close bit on only the inject queue,
52 //! spawning could run into a situation where a task is successfully bound long
53 //! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54 //! the first spawning situation could result in the notification being pushed
55 //! to the inject queue after step 5 of shutdown, which would leave a task in
56 //! the inject queue indefinitely. This would be a ref-count cycle and a memory
57 //! leak.
58 
59 use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
60 use crate::runtime;
61 use crate::runtime::driver::Driver;
62 use crate::runtime::scheduler::multi_thread_alt::{
63     idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
64 };
65 use crate::runtime::scheduler::{self, inject, Lock};
66 use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
67 use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
68 use crate::runtime::{context, TaskHooks};
69 use crate::util::atomic_cell::AtomicCell;
70 use crate::util::rand::{FastRand, RngSeedGenerator};
71 
72 use std::cell::{Cell, RefCell};
73 use std::task::Waker;
74 use std::time::Duration;
75 use std::{cmp, thread};
76 
77 cfg_unstable_metrics! {
78     mod metrics;
79 }
80 
81 mod taskdump_mock;
82 
83 /// A scheduler worker
84 ///
85 /// Data is stack-allocated and never migrates threads
86 pub(super) struct Worker {
87     /// Used to schedule bookkeeping tasks every so often.
88     tick: u32,
89 
90     /// True if the scheduler is being shut down
91     pub(super) is_shutdown: bool,
92 
93     /// True if the scheduler is being traced
94     is_traced: bool,
95 
96     /// Counter used to track when to poll from the local queue vs. the
97     /// injection queue
98     num_seq_local_queue_polls: u32,
99 
100     /// How often to check the global queue
101     global_queue_interval: u32,
102 
103     /// Used to collect a list of workers to notify
104     workers_to_notify: Vec<usize>,
105 
106     /// Snapshot of the idle core list. This helps speed up stealing.
107     idle_snapshot: idle::Snapshot,
108 
109     stats: stats::Ephemeral,
110 }
111 
112 /// Core data
113 ///
114 /// Data is heap-allocated and migrates threads.
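// Note: the 128-byte alignment below is intended to keep each `Core` on its
// own cache line(s) and avoid false sharing between worker threads.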
115 #[repr(align(128))]
116 pub(super) struct Core {
117     /// Index holding this core's remote/shared state.
118     pub(super) index: usize,
119 
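    /// The LIFO slot: holds the most recently scheduled task, which is checked
    /// before the local run queue to improve locality.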
120     lifo_slot: Option<Notified>,
121 
122     /// The worker-local run queue.
123     run_queue: queue::Local<Arc<Handle>>,
124 
125     /// True if the worker is currently searching for more work. Searching
126     /// involves attempting to steal from other workers.
127     pub(super) is_searching: bool,
128 
129     /// Per-worker runtime stats
130     stats: Stats,
131 
132     /// Fast random number generator.
133     rand: FastRand,
134 }
135 
136 /// State shared across all workers
137 pub(crate) struct Shared {
138     /// Per-core remote state.
139     remotes: Box<[Remote]>,
140 
141     /// Global task queue used for:
142     ///  1. Submit work to the scheduler while **not** currently on a worker thread.
143     ///  2. Submit work to the scheduler when a worker run queue is saturated
144     pub(super) inject: inject::Shared<Arc<Handle>>,
145 
146     /// Coordinates idle workers
147     idle: Idle,
148 
149     /// Collection of all active tasks spawned onto this executor.
150     pub(super) owned: OwnedTasks<Arc<Handle>>,
151 
152     /// Data synchronized by the scheduler mutex
153     pub(super) synced: Mutex<Synced>,
154 
155     /// Powers Tokio's I/O, timers, etc. The responsibility of polling the
156     /// driver is shared across workers.
157     driver: AtomicCell<Driver>,
158 
159     /// Condition variables used to unblock worker threads. Each worker thread
160     /// has its own `condvar` it waits on.
161     pub(super) condvars: Vec<Condvar>,
162 
163     /// The number of cores that have observed the trace signal.
164     pub(super) trace_status: TraceStatus,
165 
166     /// Scheduler configuration options
167     config: Config,
168 
169     /// Collects metrics from the runtime.
170     pub(super) scheduler_metrics: SchedulerMetrics,
171 
172     pub(super) worker_metrics: Box<[WorkerMetrics]>,
173 
174     /// Only held to trigger some code on drop. This is used to get internal
175     /// runtime metrics that can be useful when doing performance
176     /// investigations. This does nothing (empty struct, no drop impl) unless
177     /// the `tokio_internal_mt_counters` `cfg` flag is set.
178     _counters: Counters,
179 }
180 
181 /// Data synchronized by the scheduler mutex
182 pub(crate) struct Synced {
183     /// When a worker is notified, it is assigned a core. The core is placed here
184     /// until the worker wakes up to take it.
185     pub(super) assigned_cores: Vec<Option<Box<Core>>>,
186 
187     /// Cores that have observed the shutdown signal
188     ///
189     /// The core is **not** placed back in the worker to prevent it from being
190     /// stolen by a thread that was spawned as part of `block_in_place`.
191     shutdown_cores: Vec<Box<Core>>,
192 
193     /// The driver goes here when shutting down
194     shutdown_driver: Option<Box<Driver>>,
195 
196     /// Synchronized state for `Idle`.
197     pub(super) idle: idle::Synced,
198 
199     /// Synchronized state for `Inject`.
200     pub(crate) inject: inject::Synced,
201 }
202 
203 /// Used to communicate with a worker from other threads.
204 struct Remote {
205     /// When a task is scheduled from a worker, it is stored in this slot. The
206     /// worker will check this slot for a task **before** checking the run
207     /// queue. This effectively results in the **last** scheduled task being run
208     /// next (LIFO). This is an optimization for improving locality which
209     /// benefits message passing patterns and helps to reduce latency.
210     // lifo_slot: Lifo,
211 
212     /// Steals tasks from this worker.
213     pub(super) steal: queue::Steal<Arc<Handle>>,
214 }
215 
216 /// Thread-local context
217 pub(crate) struct Context {
218     /// Current scheduler's handle
219     handle: Arc<Handle>,
220 
221     /// Worker index
222     index: usize,
223 
224     /// True when the LIFO slot is enabled
225     lifo_enabled: Cell<bool>,
226 
227     /// Core data
228     core: RefCell<Option<Box<Core>>>,
229 
230     /// Used to pass cores to other threads when `block_in_place` is called
231     handoff_core: Arc<AtomicCell<Core>>,
232 
233     /// Tasks to wake after resource drivers are polled. This is mostly to
234     /// handle yielded tasks.
235     pub(crate) defer: RefCell<Vec<Notified>>,
236 }
237 
238 /// Running a task may consume the core. If the core is still available when
239 /// running the task completes, it is returned. Otherwise, the worker will need
240 /// to stop processing.
241 type RunResult = Result<Box<Core>, ()>;
242 type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;
243 
244 /// A task handle
245 type Task = task::Task<Arc<Handle>>;
246 
247 /// A notified task handle
248 type Notified = task::Notified<Arc<Handle>>;
249 
250 /// Value picked out of thin air. Running the LIFO slot a handful of times
251 /// seems sufficient to benefit from locality. More than 3 times is likely
252 /// past the point of diminishing returns. The value can be tuned in the
253 /// future with data that shows improvements.
254 const MAX_LIFO_POLLS_PER_TICK: usize = 3;
255 
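/// Creates the scheduler state and eagerly spawns one thread per worker. When
/// a driver is enabled, one extra worker is allocated so the driver can be
/// polled even while all cores are busy.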
256 pub(super) fn create(
257     num_cores: usize,
258     driver: Driver,
259     driver_handle: driver::Handle,
260     blocking_spawner: blocking::Spawner,
261     seed_generator: RngSeedGenerator,
262     config: Config,
263 ) -> runtime::Handle {
264     let mut num_workers = num_cores;
265 
266     // If the driver is enabled, we need an extra thread to handle polling the
267     // driver when all cores are busy.
268     if driver.is_enabled() {
269         num_workers += 1;
270     }
271 
272     let mut cores = Vec::with_capacity(num_cores);
273     let mut remotes = Vec::with_capacity(num_cores);
274     // Worker metrics are actually core based
275     let mut worker_metrics = Vec::with_capacity(num_cores);
276 
277     // Create the local queues
278     for i in 0..num_cores {
279         let (steal, run_queue) = queue::local(config.local_queue_capacity);
280 
281         let metrics = WorkerMetrics::from_config(&config);
282         let stats = Stats::new(&metrics);
283 
284         cores.push(Box::new(Core {
285             index: i,
286             lifo_slot: None,
287             run_queue,
288             is_searching: false,
289             stats,
290             rand: FastRand::from_seed(config.seed_generator.next_seed()),
291         }));
292 
293         remotes.push(Remote {
294             steal,
295             // lifo_slot: Lifo::new(),
296         });
297         worker_metrics.push(metrics);
298     }
299 
300     // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
301     // if needed.
302     let (idle, idle_synced) = Idle::new(cores, num_workers);
303     let (inject, inject_synced) = inject::Shared::new();
304 
305     let handle = Arc::new(Handle {
306         task_hooks: TaskHooks {
307             task_spawn_callback: config.before_spawn.clone(),
308             task_terminate_callback: config.after_termination.clone(),
309         },
310         shared: Shared {
311             remotes: remotes.into_boxed_slice(),
312             inject,
313             idle,
314             owned: OwnedTasks::new(num_cores),
315             synced: Mutex::new(Synced {
316                 assigned_cores: (0..num_workers).map(|_| None).collect(),
317                 shutdown_cores: Vec::with_capacity(num_cores),
318                 shutdown_driver: None,
319                 idle: idle_synced,
320                 inject: inject_synced,
321             }),
322             driver: AtomicCell::new(Some(Box::new(driver))),
323             condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
324             trace_status: TraceStatus::new(num_cores),
325             config,
326             scheduler_metrics: SchedulerMetrics::new(),
327             worker_metrics: worker_metrics.into_boxed_slice(),
328             _counters: Counters,
329         },
330         driver: driver_handle,
331         blocking_spawner,
332         seed_generator,
333     });
334 
335     let rt_handle = runtime::Handle {
336         inner: scheduler::Handle::MultiThreadAlt(handle),
337     };
338 
339     // Eagerly start worker threads
340     for index in 0..num_workers {
341         let handle = rt_handle.inner.expect_multi_thread_alt();
342         let h2 = handle.clone();
343         let handoff_core = Arc::new(AtomicCell::new(None));
344 
345         handle
346             .blocking_spawner
347             .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
348     }
349 
350     rt_handle
351 }
352 
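/// Runs the provided closure on the current thread. If the thread currently
/// holds a worker core, the core is handed off to a newly spawned blocking
/// thread first so the scheduler keeps making progress (see the module docs).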
353 #[track_caller]
354 pub(crate) fn block_in_place<F, R>(f: F) -> R
355 where
356     F: FnOnce() -> R,
357 {
358     // Try to steal the worker core back
359     struct Reset(coop::Budget);
360 
361     impl Drop for Reset {
362         fn drop(&mut self) {
363             with_current(|maybe_cx| {
364                 if let Some(cx) = maybe_cx {
365                     let core = cx.handoff_core.take();
366                     let mut cx_core = cx.core.borrow_mut();
367                     assert!(cx_core.is_none());
368                     *cx_core = core;
369 
370                     // Reset the task budget as we are re-entering the
371                     // runtime.
372                     coop::set(self.0);
373                 }
374             });
375         }
376     }
377 
378     let mut had_entered = false;
379 
380     let setup_result = with_current(|maybe_cx| {
381         match (
382             crate::runtime::context::current_enter_context(),
383             maybe_cx.is_some(),
384         ) {
385             (context::EnterRuntime::Entered { .. }, true) => {
386                 // We are on a thread pool runtime thread, so we just need to
387                 // set up blocking.
388                 had_entered = true;
389             }
390             (
391                 context::EnterRuntime::Entered {
392                     allow_block_in_place,
393                 },
394                 false,
395             ) => {
396                 // We are on an executor, but _not_ on the thread pool.  That is
397                 // _only_ okay if we are in a thread pool runtime's block_on
398                 // method:
399                 if allow_block_in_place {
400                     had_entered = true;
401                     return Ok(());
402                 } else {
403                     // This probably means we are on the current_thread runtime or in a
404                     // LocalSet, where it is _not_ okay to block.
405                     return Err(
406                         "can call blocking only when running on the multi-threaded runtime",
407                     );
408                 }
409             }
410             (context::EnterRuntime::NotEntered, true) => {
411                 // This is a nested call to block_in_place (we already exited).
412                 // All the necessary setup has already been done.
413                 return Ok(());
414             }
415             (context::EnterRuntime::NotEntered, false) => {
416                 // We are outside of the tokio runtime, so blocking is fine.
417                 // We can also skip all of the thread pool blocking setup steps.
418                 return Ok(());
419             }
420         }
421 
422         let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
423 
424         // Get the worker core. If none is set, then blocking is fine!
425         let core = match cx.core.borrow_mut().take() {
426             Some(core) => core,
427             None => return Ok(()),
428         };
429 
430         // In order to block, the core must be sent to another thread for
431         // execution.
432         //
433         // First, move the core back into the worker's shared core slot.
434         cx.handoff_core.set(core);
435 
436         // Next, clone the worker handle and send it to a new thread for
437         // processing.
438         //
439         // Once the blocking task is done executing, we will attempt to
440         // steal the core back.
441         let index = cx.index;
442         let handle = cx.handle.clone();
443         let handoff_core = cx.handoff_core.clone();
444         runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
445         Ok(())
446     });
447 
448     if let Err(panic_message) = setup_result {
449         panic!("{}", panic_message);
450     }
451 
452     if had_entered {
453         // Unset the current task's budget. Blocking sections are not
454         // constrained by task budgets.
455         let _reset = Reset(coop::stop());
456 
457         crate::runtime::context::exit_runtime(f)
458     } else {
459         f()
460     }
461 }
462 
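/// Body of a worker thread. `blocking_in_place` is `true` when this thread was
/// spawned from `block_in_place` and should take over the handed-off core
/// rather than waiting to be assigned one.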
463 fn run(
464     index: usize,
465     handle: Arc<Handle>,
466     handoff_core: Arc<AtomicCell<Core>>,
467     blocking_in_place: bool,
468 ) {
469     struct AbortOnPanic;
470 
471     impl Drop for AbortOnPanic {
472         fn drop(&mut self) {
473             if std::thread::panicking() {
474                 eprintln!("worker thread panicking; aborting process");
475                 std::process::abort();
476             }
477         }
478     }
479 
480     // Catching panics on worker threads in tests is quite tricky. Instead, when
481     // debug assertions are enabled, we just abort the process.
482     #[cfg(debug_assertions)]
483     let _abort_on_panic = AbortOnPanic;
484 
485     let num_workers = handle.shared.condvars.len();
486 
487     let mut worker = Worker {
488         tick: 0,
489         num_seq_local_queue_polls: 0,
490         global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
491         is_shutdown: false,
492         is_traced: false,
493         workers_to_notify: Vec::with_capacity(num_workers - 1),
494         idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
495         stats: stats::Ephemeral::new(),
496     };
497 
498     let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());
499 
500     crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
501         // Set the worker context.
502         let cx = scheduler::Context::MultiThreadAlt(Context {
503             index,
504             lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
505             handle,
506             core: RefCell::new(None),
507             handoff_core,
508             defer: RefCell::new(Vec::with_capacity(64)),
509         });
510 
511         context::set_scheduler(&cx, || {
512             let cx = cx.expect_multi_thread_alt();
513 
514             // Run the worker
515             let res = worker.run(&cx, blocking_in_place);
516             // An `Err` here signifies that the core was lost; this is an
517             // expected end state for a worker.
518             debug_assert!(res.is_err());
519 
520             // Check if there are any deferred tasks to notify. This can happen when
521             // the worker core is lost due to `block_in_place()` being called from
522             // within the task.
523             if !cx.defer.borrow().is_empty() {
524                 worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
525             }
526         });
527     });
528 }
529 
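// Evaluates a `NextTaskResult` expression: returns early from the enclosing
// function if a task was found, otherwise yields the core back to the caller.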
530 macro_rules! try_task {
531     ($e:expr) => {{
532         let (task, core) = $e?;
533         if task.is_some() {
534             return Ok((task, core));
535         }
536         core
537     }};
538 }
539 
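// Like `try_task!`, but additionally marks the start of a new batch of
// scheduled-task processing in the worker stats when a task is found.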
540 macro_rules! try_task_new_batch {
541     ($w:expr, $e:expr) => {{
542         let (task, mut core) = $e?;
543         if task.is_some() {
544             core.stats.start_processing_scheduled_tasks(&mut $w.stats);
545             return Ok((task, core));
546         }
547         core
548     }};
549 }
550 
551 impl Worker {
552     fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
553         let (maybe_task, mut core) = {
554             if blocking_in_place {
555                 if let Some(core) = cx.handoff_core.take() {
556                     (None, core)
557                 } else {
558                     // The core was already taken back; just shut down.
559                     return Err(());
560                 }
561             } else {
562                 let mut synced = cx.shared().synced.lock();
563 
564                 // First try to acquire an available core
565                 if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
566                     // Try to poll a task from the global queue
567                     let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
568                     (maybe_task, core)
569                 } else {
570                     // block the thread to wait for a core to be assigned to us
571                     self.wait_for_core(cx, synced)?
572                 }
573             }
574         };
575 
576         cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
577         core.stats.start_processing_scheduled_tasks(&mut self.stats);
578 
579         if let Some(task) = maybe_task {
580             core = self.run_task(cx, core, task)?;
581         }
582 
583         while !self.is_shutdown {
584             let (maybe_task, c) = self.next_task(cx, core)?;
585             core = c;
586 
587             if let Some(task) = maybe_task {
588                 core = self.run_task(cx, core, task)?;
589             } else {
590                 // The only reason to get `None` from `next_task` is that we
591                 // have entered the shutdown phase.
592                 assert!(self.is_shutdown);
593                 break;
594             }
595         }
596 
597         cx.shared().shutdown_core(&cx.handle, core);
598 
599         // It is possible that tasks wake others during drop, so we need to
600         // clear the defer list.
601         self.shutdown_clear_defer(cx);
602 
603         Err(())
604     }
605 
606     // Try to acquire an available core, but do not block the thread
607     fn try_acquire_available_core(
608         &mut self,
609         cx: &Context,
610         synced: &mut Synced,
611     ) -> Option<Box<Core>> {
612         if let Some(mut core) = cx
613             .shared()
614             .idle
615             .try_acquire_available_core(&mut synced.idle)
616         {
617             self.reset_acquired_core(cx, synced, &mut core);
618             Some(core)
619         } else {
620             None
621         }
622     }
623 
624     // Block the current thread, waiting for an available core
625     fn wait_for_core(
626         &mut self,
627         cx: &Context,
628         mut synced: MutexGuard<'_, Synced>,
629     ) -> NextTaskResult {
630         if cx.shared().idle.needs_searching() {
631             if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
632                 cx.shared().idle.transition_worker_to_searching(&mut core);
633                 return Ok((None, core));
634             }
635         }
636 
637         cx.shared()
638             .idle
639             .transition_worker_to_parked(&mut synced, cx.index);
640 
641         // Wait until a core is available, then exit the loop.
642         let mut core = loop {
643             if let Some(core) = synced.assigned_cores[cx.index].take() {
644                 break core;
645             }
646 
647             // If shutting down, abort
648             if cx.shared().inject.is_closed(&synced.inject) {
649                 self.shutdown_clear_defer(cx);
650                 return Err(());
651             }
652 
653             synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
654         };
655 
656         self.reset_acquired_core(cx, &mut synced, &mut core);
657 
658         if self.is_shutdown {
659             // Currently shutting down, don't do any more work
660             return Ok((None, core));
661         }
662 
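        // Take up to half of the local queue's free capacity (at least one
        // task) from the inject queue before starting to run.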
663         let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
664         let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
665 
666         core.stats.unparked();
667         self.flush_metrics(cx, &mut core);
668 
669         Ok((maybe_task, core))
670     }
671 
672     /// Ensure the core's state is set correctly for the worker to start using it.
673     fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
674         self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
675 
676         // Reset `lifo_enabled` here in case the core was previously stolen from
677         // a task that had the LIFO slot disabled.
678         self.reset_lifo_enabled(cx);
679 
680         // At this point, the local queue should be empty
681         #[cfg(not(loom))]
682         debug_assert!(core.run_queue.is_empty());
683 
684         // Update shutdown state while locked
685         self.update_global_flags(cx, synced);
686     }
687 
688     /// Finds the next task to run; this could come from a queue or from
689     /// stealing. If no task is available, the thread sleeps and tries again.
690     fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
691         self.assert_lifo_enabled_is_correct(cx);
692 
693         if self.is_traced {
694             core = cx.handle.trace_core(core);
695         }
696 
697         // Increment the tick
698         self.tick = self.tick.wrapping_add(1);
699 
700         // Runs maintenance every so often. When maintenance is run, the
701         // driver is checked, which may result in a task being found.
702         core = try_task!(self.maybe_maintenance(&cx, core));
703 
704         // Check the LIFO slot, local run queue, and the injection queue for
705         // a notified task.
706         core = try_task!(self.next_notified_task(cx, core));
707 
708         // We consumed all work in the queues and will start searching for work.
709         core.stats.end_processing_scheduled_tasks(&mut self.stats);
710 
711         super::counters::inc_num_no_local_work();
712 
713         if !cx.defer.borrow().is_empty() {
714             // We are deferring tasks, so poll the resource driver and schedule
715             // the deferred tasks.
716             try_task_new_batch!(self, self.park_yield(cx, core));
717 
718             panic!("what happened to the deferred tasks?");
719         }
720 
721         while !self.is_shutdown {
722             // Search for more work, this involves trying to poll the resource
723             // driver, steal from other workers, and check the global queue
724             // again.
725             core = try_task_new_batch!(self, self.search_for_work(cx, core));
726 
727             debug_assert!(cx.defer.borrow().is_empty());
728             core = try_task_new_batch!(self, self.park(cx, core));
729         }
730 
731         // Shutting down, drop any deferred tasks
732         self.shutdown_clear_defer(cx);
733 
734         Ok((None, core))
735     }
736 
737     fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
738         self.num_seq_local_queue_polls += 1;
739 
740         if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
741             super::counters::inc_global_queue_interval();
742 
743             self.num_seq_local_queue_polls = 0;
744 
745             // Update the global queue interval, if needed
746             self.tune_global_queue_interval(cx, &mut core);
747 
748             if let Some(task) = self.next_remote_task(cx) {
749                 return Ok((Some(task), core));
750             }
751         }
752 
753         if let Some(task) = core.next_local_task() {
754             return Ok((Some(task), core));
755         }
756 
757         self.next_remote_task_batch(cx, core)
758     }
759 
760     fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
761         if cx.shared().inject.is_empty() {
762             return None;
763         }
764 
765         let mut synced = cx.shared().synced.lock();
766         cx.shared().next_remote_task_synced(&mut synced)
767     }
768 
769     fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
770         if cx.shared().inject.is_empty() {
771             return Ok((None, core));
772         }
773 
774         // Other threads can only **remove** tasks from the current worker's
775         // `run_queue`. So, we can be confident that by the time we call
776         // `run_queue.push_back` below, there will be *at least* `cap`
777         // available slots in the queue.
778         let cap = usize::min(
779             core.run_queue.remaining_slots(),
780             usize::max(core.run_queue.max_capacity() / 2, 1),
781         );
782 
783         let mut synced = cx.shared().synced.lock();
784         let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
785         Ok((maybe_task, core))
786     }
787 
788     fn next_remote_task_batch_synced(
789         &self,
790         cx: &Context,
791         synced: &mut Synced,
792         core: &mut Core,
793         max: usize,
794     ) -> Option<Notified> {
795         super::counters::inc_num_remote_batch();
796 
797         // The worker is currently idle, pull a batch of work from the
798         // injection queue. We don't want to pull *all* the work so other
799         // workers can also get some.
800         let n = if core.is_searching {
801             cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
802         } else {
803             cx.shared().inject.len() / cx.shared().remotes.len() + 1
804         };
805 
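        // For example: with 64 tasks in the inject queue, 8 remotes, `max` of
        // 16, and a non-searching worker, n = min(64 / 8 + 1, 16) + 1 = 10.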
806         let n = usize::min(n, max) + 1;
807 
808         // safety: passing in the correct `inject::Synced`.
809         let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
810 
811         // Pop the first task to return immediately
812         let ret = tasks.next();
813 
814         // Push the rest of the tasks onto the run queue.
815         core.run_queue.push_back(tasks);
816 
817         ret
818     }
819 
820     /// Function responsible for stealing tasks from another worker
821     ///
822     /// Note: a worker will only try to steal if fewer than half of the
823     /// workers are already searching for tasks. The idea is to make sure not
824     /// all workers will be trying to steal at the same time.
825     fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
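        // Number of steal rounds to attempt before giving up. Kept at 1 under
        // loom to keep the model checker's state space small.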
826         #[cfg(not(loom))]
827         const ROUNDS: usize = 4;
828 
829         #[cfg(loom)]
830         const ROUNDS: usize = 1;
831 
832         debug_assert!(core.lifo_slot.is_none());
833         #[cfg(not(loom))]
834         debug_assert!(core.run_queue.is_empty());
835 
836         if !core.run_queue.can_steal() {
837             return Ok((None, core));
838         }
839 
840         if !self.transition_to_searching(cx, &mut core) {
841             return Ok((None, core));
842         }
843 
844         // core = try_task!(self, self.poll_driver(cx, core));
845 
846         // Get a snapshot of which workers are idle
847         cx.shared().idle.snapshot(&mut self.idle_snapshot);
848 
849         let num = cx.shared().remotes.len();
850 
851         for i in 0..ROUNDS {
852             // Start from a random worker
853             let start = core.rand.fastrand_n(num as u32) as usize;
854 
855             if let Some(task) = self.steal_one_round(cx, &mut core, start) {
856                 return Ok((Some(task), core));
857             }
858 
859             core = try_task!(self.next_remote_task_batch(cx, core));
860 
861             if i > 0 {
862                 super::counters::inc_num_spin_stall();
863                 std::thread::sleep(std::time::Duration::from_micros(i as u64));
864             }
865         }
866 
867         Ok((None, core))
868     }
869 
870     fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
871         let num = cx.shared().remotes.len();
872 
873         for i in 0..num {
874             let i = (start + i) % num;
875 
876             // Don't steal from ourself! We know we don't have work.
877             if i == core.index {
878                 continue;
879             }
880 
881             // If the core is currently idle, then there is nothing to steal.
882             if self.idle_snapshot.is_idle(i) {
883                 continue;
884             }
885 
886             let target = &cx.shared().remotes[i];
887 
888             if let Some(task) = target
889                 .steal
890                 .steal_into(&mut core.run_queue, &mut core.stats)
891             {
892                 return Some(task);
893             }
894         }
895 
896         None
897     }
898 
899     fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
900         let task = cx.shared().owned.assert_owner(task);
901 
902         // Make sure the worker is not in the **searching** state. This enables
903         // another idle worker to try to steal work.
904         if self.transition_from_searching(cx, &mut core) {
905             super::counters::inc_num_relay_search();
906             cx.shared().notify_parked_local();
907         }
908 
909         self.assert_lifo_enabled_is_correct(cx);
910 
911         // Measure the poll start time. Note that we may end up polling other
912         // tasks under this measurement. In this case, the tasks came from the
913         // LIFO slot and are considered part of the current task for scheduling
914         // purposes. These tasks inherit the "parent" task's limits.
915         core.stats.start_poll(&mut self.stats);
916 
917         // Make the core available to the runtime context
918         *cx.core.borrow_mut() = Some(core);
919 
920         // Run the task
921         coop::budget(|| {
922             super::counters::inc_num_polls();
923             task.run();
924             let mut lifo_polls = 0;
925 
926             // As long as there is budget remaining and a task exists in the
927             // `lifo_slot`, then keep running.
928             loop {
929                 // Check if we still have the core. If not, the core was stolen
930                 // by another worker.
931                 let mut core = match cx.core.borrow_mut().take() {
932                     Some(core) => core,
933                     None => {
934                         // In this case, we cannot call `reset_lifo_enabled()`
935                         // because the core was stolen. The stealer will handle
936                         // that at the top of `Context::run`
937                         return Err(());
938                     }
939                 };
940 
941                 // Check for a task in the LIFO slot
942                 let task = match core.next_lifo_task() {
943                     Some(task) => task,
944                     None => {
945                         self.reset_lifo_enabled(cx);
946                         core.stats.end_poll();
947                         return Ok(core);
948                     }
949                 };
950 
951                 if !coop::has_budget_remaining() {
952                     core.stats.end_poll();
953 
954                     // Not enough budget left to run the LIFO task, push it to
955                     // the back of the queue and return.
956                     core.run_queue
957                         .push_back_or_overflow(task, cx.shared(), &mut core.stats);
958                     // If we hit this point, the LIFO slot should be enabled.
959                     // There is no need to reset it.
960                     debug_assert!(cx.lifo_enabled.get());
961                     return Ok(core);
962                 }
963 
964                 // Track that we are about to run a task from the LIFO slot.
965                 lifo_polls += 1;
966                 super::counters::inc_lifo_schedules();
967 
968                 // Disable the LIFO slot if we reach our limit
969                 //
970                 // In ping-pong style workloads where task A notifies task B,
971                 // which notifies task A again, continuously prioritizing the
972                 // LIFO slot can cause starvation as these two tasks will
973                 // repeatedly schedule the other. To mitigate this, we limit the
974                 // number of times the LIFO slot is prioritized.
975                 if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
976                     cx.lifo_enabled.set(false);
977                     super::counters::inc_lifo_capped();
978                 }
979 
980                 // Run the LIFO task, then loop
981                 *cx.core.borrow_mut() = Some(core);
982                 let task = cx.shared().owned.assert_owner(task);
983                 super::counters::inc_num_lifo_polls();
984                 task.run();
985             }
986         })
987     }
988 
989     fn schedule_deferred_with_core<'a>(
990         &mut self,
991         cx: &'a Context,
992         mut core: Box<Core>,
993         synced: impl FnOnce() -> MutexGuard<'a, Synced>,
994     ) -> NextTaskResult {
995         let mut defer = cx.defer.borrow_mut();
996 
997         // Grab a task to run next
998         let task = defer.pop();
999 
1000         if task.is_none() {
1001             return Ok((None, core));
1002         }
1003 
1004         if !defer.is_empty() {
1005             let mut synced = synced();
1006 
1007             // Number of tasks we want to try to spread across idle workers
1008             let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
1009 
1010             // Cap the number of threads woken up at one time. This is to limit
1011                 // the number of no-op wakes and reduce mutex contention.
1012             //
1013             // This number was picked after some basic benchmarks, but it can
1014             // probably be tuned using the mean poll time value (slower task
1015             // polls can leverage more woken workers).
1016             let num_fanout = cmp::min(2, num_fanout);
1017 
1018             if num_fanout > 0 {
1019                 cx.shared()
1020                     .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
1021 
1022                 cx.shared()
1023                     .idle
1024                     .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
1025             }
1026 
1027             // Do not run the task while holding the lock...
1028             drop(synced);
1029         }
1030 
1031         // Notify any workers
1032         for worker in self.workers_to_notify.drain(..) {
1033             cx.shared().condvars[worker].notify_one()
1034         }
1035 
1036         if !defer.is_empty() {
1037             // Push the rest of the tasks on the local queue
1038             for task in defer.drain(..) {
1039                 core.run_queue
1040                     .push_back_or_overflow(task, cx.shared(), &mut core.stats);
1041             }
1042 
1043             cx.shared().notify_parked_local();
1044         }
1045 
1046         Ok((task, core))
1047     }
1048 
1049     fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
1050         let mut defer = cx.defer.borrow_mut();
1051         let num = defer.len();
1052 
1053         if num > 0 {
1054             // Push all tasks to the injection queue
1055             cx.shared()
1056                 .push_remote_task_batch_synced(synced, defer.drain(..));
1057 
1058             debug_assert!(self.workers_to_notify.is_empty());
1059 
1060             // Notify workers
1061             cx.shared()
1062                 .idle
1063                 .notify_mult(synced, &mut self.workers_to_notify, num);
1064 
1065             // Notify any workers
1066             for worker in self.workers_to_notify.drain(..) {
1067                 cx.shared().condvars[worker].notify_one()
1068             }
1069         }
1070     }
1071 
1072     fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1073         if self.tick % cx.shared().config.event_interval == 0 {
1074             super::counters::inc_num_maintenance();
1075 
1076             core.stats.end_processing_scheduled_tasks(&mut self.stats);
1077 
1078             // Run regularly scheduled maintenance
1079             core = try_task_new_batch!(self, self.park_yield(cx, core));
1080 
1081             core.stats.start_processing_scheduled_tasks(&mut self.stats);
1082         }
1083 
1084         Ok((None, core))
1085     }
1086 
1087     fn flush_metrics(&self, cx: &Context, core: &mut Core) {
1088         core.stats.submit(&cx.shared().worker_metrics[core.index]);
1089     }
1090 
1091     fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
1092         if !self.is_shutdown {
1093             self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
1094         }
1095 
1096         if !self.is_traced {
1097             self.is_traced = cx.shared().trace_status.trace_requested();
1098         }
1099     }
1100 
1101     fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1102         // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1103         // to run without actually putting the thread to sleep.
1104         if let Some(mut driver) = cx.shared().driver.take() {
1105             driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1106 
1107             cx.shared().driver.set(driver);
1108         }
1109 
1110         // If there are more I/O events, schedule them.
1111         let (maybe_task, mut core) =
1112             self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;
1113 
1114         self.flush_metrics(cx, &mut core);
1115         self.update_global_flags(cx, &mut cx.shared().synced.lock());
1116 
1117         Ok((maybe_task, core))
1118     }
1119 
1120     /*
1121     fn poll_driver(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1122         // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1123         // to run without actually putting the thread to sleep.
1124         if let Some(mut driver) = cx.shared().driver.take() {
1125             driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1126 
1127             cx.shared().driver.set(driver);
1128 
1129             // If there are more I/O events, schedule them.
1130             self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())
1131         } else {
1132             Ok((None, core))
1133         }
1134     }
1135     */
1136 
1137     fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1138         if let Some(f) = &cx.shared().config.before_park {
1139             f();
1140         }
1141 
1142         if self.can_transition_to_parked(&mut core) {
1143             debug_assert!(!self.is_shutdown);
1144             debug_assert!(!self.is_traced);
1145 
1146             core = try_task!(self.do_park(cx, core));
1147         }
1148 
1149         if let Some(f) = &cx.shared().config.after_unpark {
1150             f();
1151         }
1152 
1153         Ok((None, core))
1154     }
1155 
1156     fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1157         let was_searching = core.is_searching;
1158 
1159         // Acquire the lock
1160         let mut synced = cx.shared().synced.lock();
1161 
1162         // The local queue should be empty at this point
1163         #[cfg(not(loom))]
1164         debug_assert!(core.run_queue.is_empty());
1165 
1166         // Try one last time to get tasks
1167         let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
1168         if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
1169             return Ok((Some(task), core));
1170         }
1171 
1172         if !was_searching {
1173             if cx
1174                 .shared()
1175                 .idle
1176                 .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
1177             {
1178                 // Skip parking, go back to searching
1179                 return Ok((None, core));
1180             }
1181         }
1182 
1183         super::counters::inc_num_parks();
1184         core.stats.about_to_park();
1185         // Flush metrics to the runtime metrics aggregator
1186         self.flush_metrics(cx, &mut core);
1187 
1188         // If the runtime is shutdown, skip parking
1189         self.update_global_flags(cx, &mut synced);
1190 
1191         if self.is_shutdown {
1192             return Ok((None, core));
1193         }
1194 
1195         // Release the core
1196         core.is_searching = false;
1197         cx.shared().idle.release_core(&mut synced, core);
1198 
1199         drop(synced);
1200 
1201         if was_searching {
1202             if cx.shared().idle.transition_worker_from_searching() {
1203                 // cx.shared().idle.snapshot(&mut self.idle_snapshot);
1204                 // We were the last searching worker, we need to do one last check
1205                 for i in 0..cx.shared().remotes.len() {
1206                     if !cx.shared().remotes[i].steal.is_empty() {
1207                         let mut synced = cx.shared().synced.lock();
1208 
1209                         // Try to get a core
1210                         if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
1211                             cx.shared().idle.transition_worker_to_searching(&mut core);
1212                             return Ok((None, core));
1213                         } else {
1214                             // Fall back to the park routine
1215                             break;
1216                         }
1217                     }
1218                 }
1219             }
1220         }
1221 
1222         if let Some(mut driver) = cx.shared().take_driver() {
1223             // Wait for driver events
1224             driver.park(&cx.handle.driver);
1225 
1226             synced = cx.shared().synced.lock();
1227 
1228             if cx.shared().inject.is_closed(&mut synced.inject) {
1229                 synced.shutdown_driver = Some(driver);
1230                 self.shutdown_clear_defer(cx);
1231                 cx.shared().shutdown_finalize(&cx.handle, &mut synced);
1232                 return Err(());
1233             }
1234 
1235             // Put the driver back
1236             cx.shared().driver.set(driver);
1237 
1238             // Try to acquire an available core to schedule I/O events
1239             if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
1240                 // This may result in a task being run
1241                 self.schedule_deferred_with_core(cx, core, move || synced)
1242             } else {
1243                 // Schedule any deferred tasks
1244                 self.schedule_deferred_without_core(cx, &mut synced);
1245 
1246                 // Wait for a core.
1247                 self.wait_for_core(cx, synced)
1248             }
1249         } else {
1250             synced = cx.shared().synced.lock();
1251 
1252             // Wait for a core to be assigned to us
1253             self.wait_for_core(cx, synced)
1254         }
1255     }
1256 
1257     fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
1258         if !core.is_searching {
1259             cx.shared().idle.try_transition_worker_to_searching(core);
1260         }
1261 
1262         core.is_searching
1263     }
1264 
1265     /// Returns `true` if another worker must be notified
1266     fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
1267         if !core.is_searching {
1268             return false;
1269         }
1270 
1271         core.is_searching = false;
1272         cx.shared().idle.transition_worker_from_searching()
1273     }
1274 
1275     fn can_transition_to_parked(&self, core: &mut Core) -> bool {
1276         !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
1277     }
1278 
1279     fn has_tasks(&self, core: &Core) -> bool {
1280         core.lifo_slot.is_some() || !core.run_queue.is_empty()
1281     }
1282 
1283     fn reset_lifo_enabled(&self, cx: &Context) {
1284         cx.lifo_enabled
1285             .set(!cx.handle.shared.config.disable_lifo_slot);
1286     }
1287 
1288     fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
1289         debug_assert_eq!(
1290             cx.lifo_enabled.get(),
1291             !cx.handle.shared.config.disable_lifo_slot
1292         );
1293     }
1294 
1295     fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
1296         let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
1297 
1298         // Smooth out jitter
1299         if u32::abs_diff(self.global_queue_interval, next) > 2 {
1300             self.global_queue_interval = next;
1301         }
1302     }
1303 
1304     fn shutdown_clear_defer(&self, cx: &Context) {
1305         let mut defer = cx.defer.borrow_mut();
1306 
1307         for task in defer.drain(..) {
1308             drop(task);
1309         }
1310     }
1311 }
1312 
1313 impl Context {
1314     pub(crate) fn defer(&self, waker: &Waker) {
1315         // TODO: refactor defer across all runtimes
1316         waker.wake_by_ref();
1317     }
1318 
1319     fn shared(&self) -> &Shared {
1320         &self.handle.shared
1321     }
1322 
1323     #[cfg_attr(not(feature = "time"), allow(dead_code))]
1324     pub(crate) fn get_worker_index(&self) -> usize {
1325         self.index
1326     }
1327 }
1328 
1329 impl Core {
1330     fn next_local_task(&mut self) -> Option<Notified> {
1331         self.next_lifo_task().or_else(|| self.run_queue.pop())
1332     }
1333 
1334     fn next_lifo_task(&mut self) -> Option<Notified> {
1335         self.lifo_slot.take()
1336     }
1337 }
1338 
1339 impl Shared {
1340     fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
1341         // safety: we only have access to a valid `Synced` in this file.
1342         unsafe { self.inject.pop(&mut synced.inject) }
1343     }
1344 
1345     pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1346         use std::ptr;
1347 
1348         with_current(|maybe_cx| {
1349             if let Some(cx) = maybe_cx {
1350                 // Make sure the task is part of the **current** scheduler.
1351                 if ptr::eq(self, &cx.handle.shared) {
1352                     // And the current thread still holds a core
1353                     if let Some(core) = cx.core.borrow_mut().as_mut() {
1354                         if is_yield {
1355                             cx.defer.borrow_mut().push(task);
1356                         } else {
1357                             self.schedule_local(cx, core, task);
1358                         }
1359                     } else {
1360                         // This can happen if either the core was stolen
1361                         // (`block_in_place`) or the notification happens from
1362                         // the driver.
1363                         cx.defer.borrow_mut().push(task);
1364                     }
1365                     return;
1366                 }
1367             }
1368 
1369             // Otherwise, use the inject queue.
1370             self.schedule_remote(task);
1371         })
1372     }
1373 
1374     fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
1375         core.stats.inc_local_schedule_count();
1376 
1377         if cx.lifo_enabled.get() {
1378             // Push to the LIFO slot
1379             let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
1380             // let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
1381 
1382             if let Some(prev) = prev {
1383                 core.run_queue
1384                     .push_back_or_overflow(prev, self, &mut core.stats);
1385             } else {
1386                 return;
1387             }
1388         } else {
1389             core.run_queue
1390                 .push_back_or_overflow(task, self, &mut core.stats);
1391         }
1392 
1393         self.notify_parked_local();
1394     }
1395 
1396     fn notify_parked_local(&self) {
1397         super::counters::inc_num_inc_notify_local();
1398         self.idle.notify_local(self);
1399     }
1400 
1401     fn schedule_remote(&self, task: Notified) {
1402         super::counters::inc_num_notify_remote();
1403         self.scheduler_metrics.inc_remote_schedule_count();
1404 
1405         let mut synced = self.synced.lock();
1406         // Push the task onto the inject queue.
1407         self.push_remote_task(&mut synced, task);
1408 
1409         // Notify a worker. The mutex is passed in and will be released as part
1410         // of the method call.
1411         self.idle.notify_remote(synced, self);
1412     }
1413 
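    /// Begins scheduler shutdown: the driver is stashed for the final phase,
    /// the inject queue is closed (this only happens once), and idle cores are
    /// flagged for shutdown.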
    pub(super) fn close(&self, handle: &Handle) {
        {
            let mut synced = self.synced.lock();

            if let Some(driver) = self.driver.take() {
                synced.shutdown_driver = Some(driver);
            }

            if !self.inject.close(&mut synced.inject) {
                return;
            }

            // Set the shutdown flag on all available cores
            self.idle.shutdown(&mut synced, self);
        }

        // Any unassigned cores need to be shut down, but we have to drop the
        // lock first.
        self.idle.shutdown_unassigned_cores(handle, self);
    }

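    /// Pushes a task onto the inject queue. Requiring `&mut Synced` ensures
    /// the caller holds the scheduler lock.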
    fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
        // safety: passing in the correct `inject::Synced`
        unsafe {
            self.inject.push(&mut synced.inject, task);
        }
    }

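    /// Pushes a batch of tasks onto the inject queue, acquiring the `Synced`
    /// lock internally via the `Lock` impl on `&Shared`.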
    fn push_remote_task_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(self, iter);
        }
    }

    fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.inject.push_batch(&mut synced.inject, iter);
        }
    }

    fn take_driver(&self) -> Option<Box<Driver>> {
        if !self.driver_enabled() {
            return None;
        }

        self.driver.take()
    }

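    /// Whether this scheduler owns a driver, inferred from an extra condvar
    /// having been allocated beyond the per-worker set.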
    fn driver_enabled(&self) -> bool {
        self.condvars.len() > self.remotes.len()
    }

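    /// Called by a worker as it shuts down: remaining tasks in `OwnedTasks`
    /// are closed and shut down starting from a random shard, worker metrics
    /// are flushed, and the core is pushed to `Synced::shutdown_cores` before
    /// attempting to finalize shutdown.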
    pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
        // Start from a random inner list
        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
        self.owned.close_and_shutdown_all(start as usize);

        core.stats.submit(&self.worker_metrics[core.index]);

        let mut synced = self.synced.lock();
        synced.shutdown_cores.push(core);

        self.shutdown_finalize(handle, &mut synced);
    }

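    /// Completes shutdown once every core has been returned (and the driver,
    /// if enabled, has been stashed): local queues are drained, the driver is
    /// shut down, and the inject queue is emptied.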
    pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
        // Wait for all cores
        if synced.shutdown_cores.len() != self.remotes.len() {
            return;
        }

        let driver = synced.shutdown_driver.take();

        if self.driver_enabled() && driver.is_none() {
            return;
        }

        debug_assert!(self.owned.is_empty());

        for mut core in synced.shutdown_cores.drain(..) {
            // Drain tasks from the local queue
            while core.next_local_task().is_some() {}
        }

        // Shut down the driver
        if let Some(mut driver) = driver {
            driver.shutdown(&handle.driver);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks. We
        // cannot call `next_remote_task()` because we already hold the lock.
        //
        // safety: passing in the correct `inject::Synced`
        while let Some(task) = self.next_remote_task_synced(synced) {
            drop(task);
        }
    }
}

impl Overflow<Arc<Handle>> for Shared {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(&mut self.synced.lock(), task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        self.push_remote_task_batch(iter)
    }
}

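// `&Shared` can act as a lock over either the scheduler's full `Synced` state
// or just its `inject` portion: both impls lock the shared mutex and expose
// the relevant field through `SyncedGuard`'s `AsMut` impls.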
impl<'a> Lock<inject::Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl<'a> Lock<Synced> for &'a Shared {
    type Handle = SyncedGuard<'a>;

    fn lock(self) -> Self::Handle {
        SyncedGuard {
            lock: self.synced.lock(),
        }
    }
}

impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule_task(task, false);
    }

    fn hooks(&self) -> TaskHarnessScheduleHooks {
        TaskHarnessScheduleHooks {
            task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
        }
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule_task(task, true);
    }
}

impl AsMut<Synced> for Synced {
    fn as_mut(&mut self) -> &mut Synced {
        self
    }
}

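/// Guard over the scheduler's `Synced` mutex. The `AsMut` impls below let the
/// guard be handed to code that only needs the `inject::Synced` sub-state.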
pub(crate) struct SyncedGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> AsMut<Synced> for SyncedGuard<'a> {
    fn as_mut(&mut self) -> &mut Synced {
        &mut self.lock
    }
}

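/// Runs `f` with a reference to the current thread's `MultiThreadAlt`
/// scheduler context, or with `None` if the thread is not running inside this
/// scheduler.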
#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThreadAlt;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}