1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
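//!
//! As a rough illustration of the hand-off from the user's point of view (a
//! sketch, not a doctest; `tokio::task::block_in_place` is the public API that
//! funnels into the `block_in_place` function in this module):
//!
//! ```ignore
//! tokio::task::block_in_place(|| {
//!     // The worker's core has been handed to another thread, so this
//!     // blocking call does not stall the rest of the scheduler.
//!     std::thread::sleep(std::time::Duration::from_millis(50));
//! });
//! ```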
6 //!
7 //! # Shutdown
8 //!
9 //! Shutting down the runtime involves the following steps:
10 //!
11 //! 1. The `Shared::close` method is called. This closes the inject queue and
12 //! `OwnedTasks` instance and wakes up all worker threads.
13 //!
14 //! 2. Each worker thread observes the close signal the next time it runs
15 //! maintenance by checking whether the inject queue is closed. The worker's
16 //! `is_shutdown` flag is set to true.
17 //!
18 //! 3. Each worker thread calls `pre_shutdown` in parallel. Here, the worker
19 //! keeps removing tasks from `OwnedTasks` until the collection is empty. No
20 //! new tasks can be pushed to `OwnedTasks` during or after this step, as the
21 //! collection was closed in step 1.
22 //!
23 //! 4. The workers call `Shared::shutdown_core` to enter the single-threaded
24 //! phase of shutdown. These calls push their cores to `Synced::shutdown_cores`,
25 //! and the last thread to push its core finishes the shutdown procedure.
26 //!
27 //! 5. The local run queue of each core is emptied, then the inject queue is
28 //! emptied.
29 //!
30 //! At this point, shutdown has completed. It is not possible for any of the
31 //! collections to contain any tasks, as each collection was closed first and
32 //! then emptied afterwards.
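//!
//! In terms of the code in this file, the sequence is roughly the following
//! (sketch only; see the methods below for the details):
//!
//! ```ignore
//! shared.close(&handle);        // step 1: close the inject queue, wake all workers
//! // then, on each worker thread:
//! //   update_global_flags(..)  // step 2: observe the close, set `is_shutdown`
//! //   shutdown_core(..)        // steps 3-4: drain `OwnedTasks`, hand in the core
//! //   shutdown_finalize(..)    // step 5: the last core drains the queues and
//! //                            //         shuts down the driver
//! ```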
33 //!
34 //! ## Spawns during shutdown
35 //!
36 //! When spawning tasks during shutdown, there are two cases:
37 //!
38 //! * The spawner observes the `OwnedTasks` being open, and the inject queue is
39 //! closed.
40 //! * The spawner observes the `OwnedTasks` being closed and doesn't check the
41 //! inject queue.
42 //!
43 //! The first case can only happen if the `OwnedTasks::bind` call happens before
44 //! or during step 1 of shutdown. In this case, the runtime will clean up the
45 //! task in step 3 of shutdown.
46 //!
47 //! In the second case, the task was never spawned and is immediately
48 //! cancelled by the spawner.
49 //!
50 //! The correctness of shutdown requires both the inject queue and `OwnedTasks`
51 //! collection to have a closed bit. With a close bit on only the inject queue,
52 //! spawning could run into a situation where a task is successfully bound long
53 //! after the runtime has shut down. With a close bit on only the `OwnedTasks`,
54 //! the first spawning situation could result in the notification being pushed
55 //! to the inject queue after step 5 of shutdown, which would leave a task in
56 //! the inject queue indefinitely. This would be a ref-count cycle and a memory
57 //! leak.
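//!
//! As a sketch of the spawn path implied by the two cases above (hypothetical
//! helper names; the real checks live in `OwnedTasks::bind` and the inject
//! queue):
//!
//! ```ignore
//! fn spawn_during_shutdown(task: Task) {
//!     if owned_tasks_is_closed() {
//!         // Second case: the task was never bound, cancel it immediately.
//!         cancel(task);
//!     } else if inject_is_closed() {
//!         // First case: the task is already in `OwnedTasks`, so the workers
//!         // clean it up in step 3 of shutdown.
//!     } else {
//!         push_to_inject(task);
//!     }
//! }
//! ```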
58
59 use crate::loom::sync::{Arc, Condvar, Mutex, MutexGuard};
60 use crate::runtime;
61 use crate::runtime::driver::Driver;
62 use crate::runtime::scheduler::multi_thread_alt::{
63 idle, queue, stats, Counters, Handle, Idle, Overflow, Stats, TraceStatus,
64 };
65 use crate::runtime::scheduler::{self, inject, Lock};
66 use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
67 use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
68 use crate::runtime::{context, TaskHooks};
69 use crate::util::atomic_cell::AtomicCell;
70 use crate::util::rand::{FastRand, RngSeedGenerator};
71
72 use std::cell::{Cell, RefCell};
73 use std::task::Waker;
74 use std::time::Duration;
75 use std::{cmp, thread};
76
77 cfg_unstable_metrics! {
78 mod metrics;
79 }
80
81 mod taskdump_mock;
82
83 /// A scheduler worker
84 ///
85 /// Data is stack-allocated and never migrates threads
86 pub(super) struct Worker {
87 /// Used to schedule bookkeeping tasks every so often.
88 tick: u32,
89
90 /// True if the scheduler is being shut down
91 pub(super) is_shutdown: bool,
92
93 /// True if the scheduler is being traced
94 is_traced: bool,
95
96 /// Counter used to track when to poll from the local queue vs. the
97 /// injection queue
98 num_seq_local_queue_polls: u32,
99
100 /// How often to check the global queue
101 global_queue_interval: u32,
102
103 /// Used to collect a list of workers to notify
104 workers_to_notify: Vec<usize>,
105
106 /// Snapshot of the idle core list. This helps speed up stealing.
107 idle_snapshot: idle::Snapshot,
108
109 stats: stats::Ephemeral,
110 }
111
112 /// Core data
113 ///
114 /// Data is heap-allocated and migrates threads.
115 #[repr(align(128))]
116 pub(super) struct Core {
117 /// Index holding this core's remote/shared state.
118 pub(super) index: usize,
119
120 lifo_slot: Option<Notified>,
121
122 /// The worker-local run queue.
123 run_queue: queue::Local<Arc<Handle>>,
124
125 /// True if the worker is currently searching for more work. Searching
126 /// involves attempting to steal from other workers.
127 pub(super) is_searching: bool,
128
129 /// Per-worker runtime stats
130 stats: Stats,
131
132 /// Fast random number generator.
133 rand: FastRand,
134 }
135
136 /// State shared across all workers
137 pub(crate) struct Shared {
138 /// Per-core remote state.
139 remotes: Box<[Remote]>,
140
141 /// Global task queue used for:
142 /// 1. Submitting work to the scheduler while **not** currently on a worker thread.
143 /// 2. Submitting work to the scheduler when a worker's run queue is saturated.
144 pub(super) inject: inject::Shared<Arc<Handle>>,
145
146 /// Coordinates idle workers
147 idle: Idle,
148
149 /// Collection of all active tasks spawned onto this executor.
150 pub(super) owned: OwnedTasks<Arc<Handle>>,
151
152 /// Data synchronized by the scheduler mutex
153 pub(super) synced: Mutex<Synced>,
154
155 /// Powers Tokio's I/O, timers, etc. The responsibility of polling the
156 /// driver is shared across workers.
157 driver: AtomicCell<Driver>,
158
159 /// Condition variables used to unblock worker threads. Each worker thread
160 /// has its own `condvar` it waits on.
161 pub(super) condvars: Vec<Condvar>,
162
163 /// The number of cores that have observed the trace signal.
164 pub(super) trace_status: TraceStatus,
165
166 /// Scheduler configuration options
167 config: Config,
168
169 /// Collects metrics from the runtime.
170 pub(super) scheduler_metrics: SchedulerMetrics,
171
172 pub(super) worker_metrics: Box<[WorkerMetrics]>,
173
174 /// Only held to trigger some code on drop. This is used to get internal
175 /// runtime metrics that can be useful when doing performance
176 /// investigations. This does nothing (empty struct, no drop impl) unless
177 /// the `tokio_internal_mt_counters` `cfg` flag is set.
178 _counters: Counters,
179 }
180
181 /// Data synchronized by the scheduler mutex
182 pub(crate) struct Synced {
183 /// When a worker is notified, it is assigned a core. The core is placed here
184 /// until the worker wakes up to take it.
185 pub(super) assigned_cores: Vec<Option<Box<Core>>>,
186
187 /// Cores that have observed the shutdown signal
188 ///
189 /// The core is **not** placed back in the worker to prevent it from being
190 /// stolen by a thread that was spawned as part of `block_in_place`.
191 shutdown_cores: Vec<Box<Core>>,
192
193 /// The driver goes here when shutting down
194 shutdown_driver: Option<Box<Driver>>,
195
196 /// Synchronized state for `Idle`.
197 pub(super) idle: idle::Synced,
198
199 /// Synchronized state for `Inject`.
200 pub(crate) inject: inject::Synced,
201 }
202
203 /// Used to communicate with a worker from other threads.
204 struct Remote {
205 /// When a task is scheduled from a worker, it is stored in this slot. The
206 /// worker will check this slot for a task **before** checking the run
207 /// queue. This effectively results in the **last** scheduled task to be run
208 /// next (LIFO). This is an optimization for improving locality which
209 /// benefits message passing patterns and helps to reduce latency.
210 // lifo_slot: Lifo,
211
212 /// Steals tasks from this worker.
213 pub(super) steal: queue::Steal<Arc<Handle>>,
214 }
215
216 /// Thread-local context
217 pub(crate) struct Context {
218 /// Current scheduler's handle.
219 handle: Arc<Handle>,
220
221 /// Worker index
222 index: usize,
223
224 /// True when the LIFO slot is enabled
225 lifo_enabled: Cell<bool>,
226
227 /// Core data
228 core: RefCell<Option<Box<Core>>>,
229
230 /// Used to pass cores to other threads when `block_in_place` is called
231 handoff_core: Arc<AtomicCell<Core>>,
232
233 /// Tasks to wake after resource drivers are polled. This is mostly to
234 /// handle yielded tasks.
235 pub(crate) defer: RefCell<Vec<Notified>>,
236 }
237
238 /// Running a task may consume the core. If the core is still available when
239 /// the task completes, it is returned. Otherwise, the worker will need to
240 /// stop processing.
241 type RunResult = Result<Box<Core>, ()>;
242 type NextTaskResult = Result<(Option<Notified>, Box<Core>), ()>;
243
244 /// A task handle
245 type Task = task::Task<Arc<Handle>>;
246
247 /// A notified task handle
248 type Notified = task::Notified<Arc<Handle>>;
249
250 /// Value picked out of thin air. Running the LIFO slot a handful of times
251 /// seems sufficient to benefit from locality. More than 3 times is probably
252 /// excessive. The value can be tuned in the future with data that shows
253 /// improvements.
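///
/// For intuition, a hypothetical ping-pong workload that this cap guards
/// against might look like the sketch below (made-up channel names, not run
/// as a doctest): every wake lands in the LIFO slot, so without a cap the
/// worker would keep polling only these two tasks and starve its run queue.
///
/// ```ignore
/// tokio::spawn(async move {
///     loop {
///         tx_to_b.send(()).await.unwrap();
///         rx_from_b.recv().await;
///     }
/// });
/// tokio::spawn(async move {
///     loop {
///         rx_from_a.recv().await;
///         tx_to_a.send(()).await.unwrap();
///     }
/// });
/// ```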
254 const MAX_LIFO_POLLS_PER_TICK: usize = 3;
255
256 pub(super) fn create(
257 num_cores: usize,
258 driver: Driver,
259 driver_handle: driver::Handle,
260 blocking_spawner: blocking::Spawner,
261 seed_generator: RngSeedGenerator,
262 config: Config,
263 ) -> runtime::Handle {
264 let mut num_workers = num_cores;
265
266 // If the driver is enabled, we need an extra thread to handle polling the
267 // driver when all cores are busy.
268 if driver.is_enabled() {
269 num_workers += 1;
270 }
271
272 let mut cores = Vec::with_capacity(num_cores);
273 let mut remotes = Vec::with_capacity(num_cores);
274 // Worker metrics are actually core based
275 let mut worker_metrics = Vec::with_capacity(num_cores);
276
277 // Create the local queues
278 for i in 0..num_cores {
279 let (steal, run_queue) = queue::local(config.local_queue_capacity);
280
281 let metrics = WorkerMetrics::from_config(&config);
282 let stats = Stats::new(&metrics);
283
284 cores.push(Box::new(Core {
285 index: i,
286 lifo_slot: None,
287 run_queue,
288 is_searching: false,
289 stats,
290 rand: FastRand::from_seed(config.seed_generator.next_seed()),
291 }));
292
293 remotes.push(Remote {
294 steal,
295 // lifo_slot: Lifo::new(),
296 });
297 worker_metrics.push(metrics);
298 }
299
300 // Allocate num-cores + 1 workers, so one worker can handle the I/O driver,
301 // if needed.
302 let (idle, idle_synced) = Idle::new(cores, num_workers);
303 let (inject, inject_synced) = inject::Shared::new();
304
305 let handle = Arc::new(Handle {
306 task_hooks: TaskHooks {
307 task_spawn_callback: config.before_spawn.clone(),
308 task_terminate_callback: config.after_termination.clone(),
309 },
310 shared: Shared {
311 remotes: remotes.into_boxed_slice(),
312 inject,
313 idle,
314 owned: OwnedTasks::new(num_cores),
315 synced: Mutex::new(Synced {
316 assigned_cores: (0..num_workers).map(|_| None).collect(),
317 shutdown_cores: Vec::with_capacity(num_cores),
318 shutdown_driver: None,
319 idle: idle_synced,
320 inject: inject_synced,
321 }),
322 driver: AtomicCell::new(Some(Box::new(driver))),
323 condvars: (0..num_workers).map(|_| Condvar::new()).collect(),
324 trace_status: TraceStatus::new(num_cores),
325 config,
326 scheduler_metrics: SchedulerMetrics::new(),
327 worker_metrics: worker_metrics.into_boxed_slice(),
328 _counters: Counters,
329 },
330 driver: driver_handle,
331 blocking_spawner,
332 seed_generator,
333 });
334
335 let rt_handle = runtime::Handle {
336 inner: scheduler::Handle::MultiThreadAlt(handle),
337 };
338
339 // Eagerly start worker threads
340 for index in 0..num_workers {
341 let handle = rt_handle.inner.expect_multi_thread_alt();
342 let h2 = handle.clone();
343 let handoff_core = Arc::new(AtomicCell::new(None));
344
345 handle
346 .blocking_spawner
347 .spawn_blocking(&rt_handle, move || run(index, h2, handoff_core, false));
348 }
349
350 rt_handle
351 }
352
353 #[track_caller]
354 pub(crate) fn block_in_place<F, R>(f: F) -> R
355 where
356 F: FnOnce() -> R,
357 {
358 // Try to steal the worker core back
359 struct Reset(coop::Budget);
360
361 impl Drop for Reset {
362 fn drop(&mut self) {
363 with_current(|maybe_cx| {
364 if let Some(cx) = maybe_cx {
365 let core = cx.handoff_core.take();
366 let mut cx_core = cx.core.borrow_mut();
367 assert!(cx_core.is_none());
368 *cx_core = core;
369
370 // Reset the task budget as we are re-entering the
371 // runtime.
372 coop::set(self.0);
373 }
374 });
375 }
376 }
377
378 let mut had_entered = false;
379
380 let setup_result = with_current(|maybe_cx| {
381 match (
382 crate::runtime::context::current_enter_context(),
383 maybe_cx.is_some(),
384 ) {
385 (context::EnterRuntime::Entered { .. }, true) => {
386 // We are on a thread pool runtime thread, so we just need to
387 // set up blocking.
388 had_entered = true;
389 }
390 (
391 context::EnterRuntime::Entered {
392 allow_block_in_place,
393 },
394 false,
395 ) => {
396 // We are on an executor, but _not_ on the thread pool. That is
397 // _only_ okay if we are in a thread pool runtime's block_on
398 // method:
399 if allow_block_in_place {
400 had_entered = true;
401 return Ok(());
402 } else {
403 // This probably means we are on the current_thread runtime or in a
404 // LocalSet, where it is _not_ okay to block.
405 return Err(
406 "can call blocking only when running on the multi-threaded runtime",
407 );
408 }
409 }
410 (context::EnterRuntime::NotEntered, true) => {
411 // This is a nested call to block_in_place (we already exited).
412 // All the necessary setup has already been done.
413 return Ok(());
414 }
415 (context::EnterRuntime::NotEntered, false) => {
416 // We are outside of the tokio runtime, so blocking is fine.
417 // We can also skip all of the thread pool blocking setup steps.
418 return Ok(());
419 }
420 }
421
422 let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
423
424 // Get the worker core. If none is set, then blocking is fine!
425 let core = match cx.core.borrow_mut().take() {
426 Some(core) => core,
427 None => return Ok(()),
428 };
429
430 // In order to block, the core must be sent to another thread for
431 // execution.
432 //
433 // First, move the core back into the worker's shared core slot.
434 cx.handoff_core.set(core);
435
436 // Next, clone the worker handle and send it to a new thread for
437 // processing.
438 //
439 // Once the blocking task is done executing, we will attempt to
440 // steal the core back.
441 let index = cx.index;
442 let handle = cx.handle.clone();
443 let handoff_core = cx.handoff_core.clone();
444 runtime::spawn_blocking(move || run(index, handle, handoff_core, true));
445 Ok(())
446 });
447
448 if let Err(panic_message) = setup_result {
449 panic!("{}", panic_message);
450 }
451
452 if had_entered {
453 // Unset the current task's budget. Blocking sections are not
454 // constrained by task budgets.
455 let _reset = Reset(coop::stop());
456
457 crate::runtime::context::exit_runtime(f)
458 } else {
459 f()
460 }
461 }
462
463 fn run(
464 index: usize,
465 handle: Arc<Handle>,
466 handoff_core: Arc<AtomicCell<Core>>,
467 blocking_in_place: bool,
468 ) {
469 struct AbortOnPanic;
470
471 impl Drop for AbortOnPanic {
472 fn drop(&mut self) {
473 if std::thread::panicking() {
474 eprintln!("worker thread panicking; aborting process");
475 std::process::abort();
476 }
477 }
478 }
479
480 // Catching panics on worker threads in tests is quite tricky. Instead, when
481 // debug assertions are enabled, we just abort the process.
482 #[cfg(debug_assertions)]
483 let _abort_on_panic = AbortOnPanic;
484
485 let num_workers = handle.shared.condvars.len();
486
487 let mut worker = Worker {
488 tick: 0,
489 num_seq_local_queue_polls: 0,
490 global_queue_interval: Stats::DEFAULT_GLOBAL_QUEUE_INTERVAL,
491 is_shutdown: false,
492 is_traced: false,
493 workers_to_notify: Vec::with_capacity(num_workers - 1),
494 idle_snapshot: idle::Snapshot::new(&handle.shared.idle),
495 stats: stats::Ephemeral::new(),
496 };
497
498 let sched_handle = scheduler::Handle::MultiThreadAlt(handle.clone());
499
500 crate::runtime::context::enter_runtime(&sched_handle, true, |_| {
501 // Set the worker context.
502 let cx = scheduler::Context::MultiThreadAlt(Context {
503 index,
504 lifo_enabled: Cell::new(!handle.shared.config.disable_lifo_slot),
505 handle,
506 core: RefCell::new(None),
507 handoff_core,
508 defer: RefCell::new(Vec::with_capacity(64)),
509 });
510
511 context::set_scheduler(&cx, || {
512 let cx = cx.expect_multi_thread_alt();
513
514 // Run the worker
515 let res = worker.run(&cx, blocking_in_place);
516 // `Err` here signifies that the core was lost; this is an expected end
517 // state for a worker.
518 debug_assert!(res.is_err());
519
520 // Check if there are any deferred tasks to notify. This can happen when
521 // the worker core is lost due to `block_in_place()` being called from
522 // within the task.
523 if !cx.defer.borrow().is_empty() {
524 worker.schedule_deferred_without_core(&cx, &mut cx.shared().synced.lock());
525 }
526 });
527 });
528 }
529
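// The two macros below wrap expressions of type `NextTaskResult`: they
// propagate errors, return early when a task was found, and otherwise evaluate
// to the core so the caller can keep using it. `try_task_new_batch!`
// additionally records the start of a new batch of scheduled tasks in the
// worker stats.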
530 macro_rules! try_task {
531 ($e:expr) => {{
532 let (task, core) = $e?;
533 if task.is_some() {
534 return Ok((task, core));
535 }
536 core
537 }};
538 }
539
540 macro_rules! try_task_new_batch {
541 ($w:expr, $e:expr) => {{
542 let (task, mut core) = $e?;
543 if task.is_some() {
544 core.stats.start_processing_scheduled_tasks(&mut $w.stats);
545 return Ok((task, core));
546 }
547 core
548 }};
549 }
550
551 impl Worker {
552 fn run(&mut self, cx: &Context, blocking_in_place: bool) -> RunResult {
553 let (maybe_task, mut core) = {
554 if blocking_in_place {
555 if let Some(core) = cx.handoff_core.take() {
556 (None, core)
557 } else {
558 // Just shutdown
559 return Err(());
560 }
561 } else {
562 let mut synced = cx.shared().synced.lock();
563
564 // First try to acquire an available core
565 if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
566 // Try to poll a task from the global queue
567 let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
568 (maybe_task, core)
569 } else {
570 // block the thread to wait for a core to be assigned to us
571 self.wait_for_core(cx, synced)?
572 }
573 }
574 };
575
576 cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
577 core.stats.start_processing_scheduled_tasks(&mut self.stats);
578
579 if let Some(task) = maybe_task {
580 core = self.run_task(cx, core, task)?;
581 }
582
583 while !self.is_shutdown {
584 let (maybe_task, c) = self.next_task(cx, core)?;
585 core = c;
586
587 if let Some(task) = maybe_task {
588 core = self.run_task(cx, core, task)?;
589 } else {
590 // The only reason to get `None` from `next_task` is that we have
591 // entered the shutdown phase.
592 assert!(self.is_shutdown);
593 break;
594 }
595 }
596
597 cx.shared().shutdown_core(&cx.handle, core);
598
599 // It is possible that tasks wake others during drop, so we need to
600 // clear the defer list.
601 self.shutdown_clear_defer(cx);
602
603 Err(())
604 }
605
606 // Try to acquire an available core, but do not block the thread
607 fn try_acquire_available_core(
608 &mut self,
609 cx: &Context,
610 synced: &mut Synced,
611 ) -> Option<Box<Core>> {
612 if let Some(mut core) = cx
613 .shared()
614 .idle
615 .try_acquire_available_core(&mut synced.idle)
616 {
617 self.reset_acquired_core(cx, synced, &mut core);
618 Some(core)
619 } else {
620 None
621 }
622 }
623
624 // Block the current thread, waiting for an available core
625 fn wait_for_core(
626 &mut self,
627 cx: &Context,
628 mut synced: MutexGuard<'_, Synced>,
629 ) -> NextTaskResult {
630 if cx.shared().idle.needs_searching() {
631 if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
632 cx.shared().idle.transition_worker_to_searching(&mut core);
633 return Ok((None, core));
634 }
635 }
636
637 cx.shared()
638 .idle
639 .transition_worker_to_parked(&mut synced, cx.index);
640
641 // Wait until a core is available, then exit the loop.
642 let mut core = loop {
643 if let Some(core) = synced.assigned_cores[cx.index].take() {
644 break core;
645 }
646
647 // If shutting down, abort
648 if cx.shared().inject.is_closed(&synced.inject) {
649 self.shutdown_clear_defer(cx);
650 return Err(());
651 }
652
653 synced = cx.shared().condvars[cx.index].wait(synced).unwrap();
654 };
655
656 self.reset_acquired_core(cx, &mut synced, &mut core);
657
658 if self.is_shutdown {
659 // Currently shutting down, don't do any more work
660 return Ok((None, core));
661 }
662
663 let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
664 let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
665
666 core.stats.unparked();
667 self.flush_metrics(cx, &mut core);
668
669 Ok((maybe_task, core))
670 }
671
672 /// Ensure core's state is set correctly for the worker to start using.
673 fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
674 self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
675
676 // Reset `lifo_enabled` here in case the core was previously stolen from
677 // a task that had the LIFO slot disabled.
678 self.reset_lifo_enabled(cx);
679
680 // At this point, the local queue should be empty
681 #[cfg(not(loom))]
682 debug_assert!(core.run_queue.is_empty());
683
684 // Update shutdown state while locked
685 self.update_global_flags(cx, synced);
686 }
687
688 /// Finds the next task to run; this could come from a queue or from stealing.
689 /// If no task is available, the thread sleeps and tries again.
690 fn next_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
691 self.assert_lifo_enabled_is_correct(cx);
692
693 if self.is_traced {
694 core = cx.handle.trace_core(core);
695 }
696
697 // Increment the tick
698 self.tick = self.tick.wrapping_add(1);
699
700 // Runs maintenance every so often. When maintenance is run, the
701 // driver is checked, which may result in a task being found.
702 core = try_task!(self.maybe_maintenance(&cx, core));
703
704 // Check the LIFO slot, local run queue, and the injection queue for
705 // a notified task.
706 core = try_task!(self.next_notified_task(cx, core));
707
708 // We consumed all work in the queues and will start searching for work.
709 core.stats.end_processing_scheduled_tasks(&mut self.stats);
710
711 super::counters::inc_num_no_local_work();
712
713 if !cx.defer.borrow().is_empty() {
714 // We are deferring tasks, so poll the resource driver and schedule
715 // the deferred tasks.
716 try_task_new_batch!(self, self.park_yield(cx, core));
717
718 panic!("what happened to the deferred tasks? ");
719 }
720
721 while !self.is_shutdown {
722 // Search for more work, this involves trying to poll the resource
723 // driver, steal from other workers, and check the global queue
724 // again.
725 core = try_task_new_batch!(self, self.search_for_work(cx, core));
726
727 debug_assert!(cx.defer.borrow().is_empty());
728 core = try_task_new_batch!(self, self.park(cx, core));
729 }
730
731 // Shutting down, drop any deferred tasks
732 self.shutdown_clear_defer(cx);
733
734 Ok((None, core))
735 }
736
737 fn next_notified_task(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
738 self.num_seq_local_queue_polls += 1;
739
740 if self.num_seq_local_queue_polls % self.global_queue_interval == 0 {
741 super::counters::inc_global_queue_interval();
742
743 self.num_seq_local_queue_polls = 0;
744
745 // Update the global queue interval, if needed
746 self.tune_global_queue_interval(cx, &mut core);
747
748 if let Some(task) = self.next_remote_task(cx) {
749 return Ok((Some(task), core));
750 }
751 }
752
753 if let Some(task) = core.next_local_task() {
754 return Ok((Some(task), core));
755 }
756
757 self.next_remote_task_batch(cx, core)
758 }
759
760 fn next_remote_task(&self, cx: &Context) -> Option<Notified> {
761 if cx.shared().inject.is_empty() {
762 return None;
763 }
764
765 let mut synced = cx.shared().synced.lock();
766 cx.shared().next_remote_task_synced(&mut synced)
767 }
768
769 fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
770 if cx.shared().inject.is_empty() {
771 return Ok((None, core));
772 }
773
774 // Other threads can only **remove** tasks from the current worker's
775 // `run_queue`. So, we can be confident that by the time we call
776 // `run_queue.push_back` below, there will be *at least* `cap`
777 // available slots in the queue.
778 let cap = usize::min(
779 core.run_queue.remaining_slots(),
780 usize::max(core.run_queue.max_capacity() / 2, 1),
781 );
782
783 let mut synced = cx.shared().synced.lock();
784 let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, cap);
785 Ok((maybe_task, core))
786 }
787
788 fn next_remote_task_batch_synced(
789 &self,
790 cx: &Context,
791 synced: &mut Synced,
792 core: &mut Core,
793 max: usize,
794 ) -> Option<Notified> {
795 super::counters::inc_num_remote_batch();
796
797 // The worker is currently idle; pull a batch of work from the
798 // injection queue. We don't want to pull *all* the work so that
799 // other workers can also get some.
800 let n = if core.is_searching {
801 cx.shared().inject.len() / cx.shared().idle.num_searching() + 1
802 } else {
803 cx.shared().inject.len() / cx.shared().remotes.len() + 1
804 };
805
806 let n = usize::min(n, max) + 1;
807
808 // safety: passing in the correct `inject::Synced`.
809 let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
810
811 // Pop the first task to return immediately
812 let ret = tasks.next();
813
814 // Push the rest of the tasks onto the run queue
815 core.run_queue.push_back(tasks);
816
817 ret
818 }
819
820 /// Function responsible for stealing tasks from another worker
821 ///
822 /// Note: A worker will only try to steal if fewer than half of the workers
823 /// are currently searching for tasks to steal. The idea is to make sure not
824 /// all workers will be trying to steal at the same time.
825 fn search_for_work(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
826 #[cfg(not(loom))]
827 const ROUNDS: usize = 4;
828
829 #[cfg(loom)]
830 const ROUNDS: usize = 1;
831
832 debug_assert!(core.lifo_slot.is_none());
833 #[cfg(not(loom))]
834 debug_assert!(core.run_queue.is_empty());
835
836 if !core.run_queue.can_steal() {
837 return Ok((None, core));
838 }
839
840 if !self.transition_to_searching(cx, &mut core) {
841 return Ok((None, core));
842 }
843
844 // core = try_task!(self, self.poll_driver(cx, core));
845
846 // Get a snapshot of which workers are idle
847 cx.shared().idle.snapshot(&mut self.idle_snapshot);
848
849 let num = cx.shared().remotes.len();
850
851 for i in 0..ROUNDS {
852 // Start from a random worker
853 let start = core.rand.fastrand_n(num as u32) as usize;
854
855 if let Some(task) = self.steal_one_round(cx, &mut core, start) {
856 return Ok((Some(task), core));
857 }
858
859 core = try_task!(self.next_remote_task_batch(cx, core));
860
861 if i > 0 {
862 super::counters::inc_num_spin_stall();
863 std::thread::sleep(std::time::Duration::from_micros(i as u64));
864 }
865 }
866
867 Ok((None, core))
868 }
869
870 fn steal_one_round(&self, cx: &Context, core: &mut Core, start: usize) -> Option<Notified> {
871 let num = cx.shared().remotes.len();
872
873 for i in 0..num {
874 let i = (start + i) % num;
875
876 // Don't steal from ourselves! We know we don't have work.
877 if i == core.index {
878 continue;
879 }
880
881 // If the core is currently idle, then there is nothing to steal.
882 if self.idle_snapshot.is_idle(i) {
883 continue;
884 }
885
886 let target = &cx.shared().remotes[i];
887
888 if let Some(task) = target
889 .steal
890 .steal_into(&mut core.run_queue, &mut core.stats)
891 {
892 return Some(task);
893 }
894 }
895
896 None
897 }
898
899 fn run_task(&mut self, cx: &Context, mut core: Box<Core>, task: Notified) -> RunResult {
900 let task = cx.shared().owned.assert_owner(task);
901
902 // Make sure the worker is not in the **searching** state. This enables
903 // another idle worker to try to steal work.
904 if self.transition_from_searching(cx, &mut core) {
905 super::counters::inc_num_relay_search();
906 cx.shared().notify_parked_local();
907 }
908
909 self.assert_lifo_enabled_is_correct(cx);
910
911 // Measure the poll start time. Note that we may end up polling other
912 // tasks under this measurement. In this case, the tasks came from the
913 // LIFO slot and are considered part of the current task for scheduling
914 // purposes. These tasks inherit the "parent" task's limits.
915 core.stats.start_poll(&mut self.stats);
916
917 // Make the core available to the runtime context
918 *cx.core.borrow_mut() = Some(core);
919
920 // Run the task
921 coop::budget(|| {
922 super::counters::inc_num_polls();
923 task.run();
924 let mut lifo_polls = 0;
925
926 // As long as there is budget remaining and a task exists in the
927 // `lifo_slot`, then keep running.
928 loop {
929 // Check if we still have the core. If not, the core was stolen
930 // by another worker.
931 let mut core = match cx.core.borrow_mut().take() {
932 Some(core) => core,
933 None => {
934 // In this case, we cannot call `reset_lifo_enabled()`
935 // because the core was stolen. The stealer will handle
936 // that at the top of `Context::run`
937 return Err(());
938 }
939 };
940
941 // Check for a task in the LIFO slot
942 let task = match core.next_lifo_task() {
943 Some(task) => task,
944 None => {
945 self.reset_lifo_enabled(cx);
946 core.stats.end_poll();
947 return Ok(core);
948 }
949 };
950
951 if !coop::has_budget_remaining() {
952 core.stats.end_poll();
953
954 // Not enough budget left to run the LIFO task; push it to
955 // the back of the queue and return.
956 core.run_queue
957 .push_back_or_overflow(task, cx.shared(), &mut core.stats);
958 // If we hit this point, the LIFO slot should be enabled.
959 // There is no need to reset it.
960 debug_assert!(cx.lifo_enabled.get());
961 return Ok(core);
962 }
963
964 // Track that we are about to run a task from the LIFO slot.
965 lifo_polls += 1;
966 super::counters::inc_lifo_schedules();
967
968 // Disable the LIFO slot if we reach our limit
969 //
970 // In ping-pong style workloads where task A notifies task B,
971 // which notifies task A again, continuously prioritizing the
972 // LIFO slot can cause starvation as these two tasks will
973 // repeatedly schedule the other. To mitigate this, we limit the
974 // number of times the LIFO slot is prioritized.
975 if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
976 cx.lifo_enabled.set(false);
977 super::counters::inc_lifo_capped();
978 }
979
980 // Run the LIFO task, then loop
981 *cx.core.borrow_mut() = Some(core);
982 let task = cx.shared().owned.assert_owner(task);
983 super::counters::inc_num_lifo_polls();
984 task.run();
985 }
986 })
987 }
988
989 fn schedule_deferred_with_core<'a>(
990 &mut self,
991 cx: &'a Context,
992 mut core: Box<Core>,
993 synced: impl FnOnce() -> MutexGuard<'a, Synced>,
994 ) -> NextTaskResult {
995 let mut defer = cx.defer.borrow_mut();
996
997 // Grab a task to run next
998 let task = defer.pop();
999
1000 if task.is_none() {
1001 return Ok((None, core));
1002 }
1003
1004 if !defer.is_empty() {
1005 let mut synced = synced();
1006
1007 // Number of tasks we want to try to spread across idle workers
1008 let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle));
1009
1010 // Cap the number of threads woken up at one time. This is to limit
1011 // the number of no-op wakes and reduce mutex contention.
1012 //
1013 // This number was picked after some basic benchmarks, but it can
1014 // probably be tuned using the mean poll time value (slower task
1015 // polls can leverage more woken workers).
1016 let num_fanout = cmp::min(2, num_fanout);
1017
1018 if num_fanout > 0 {
1019 cx.shared()
1020 .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout));
1021
1022 cx.shared()
1023 .idle
1024 .notify_mult(&mut synced, &mut self.workers_to_notify, num_fanout);
1025 }
1026
1027 // Do not run the task while holding the lock...
1028 drop(synced);
1029 }
1030
1031 // Notify any workers
1032 for worker in self.workers_to_notify.drain(..) {
1033 cx.shared().condvars[worker].notify_one()
1034 }
1035
1036 if !defer.is_empty() {
1037 // Push the rest of the tasks on the local queue
1038 for task in defer.drain(..) {
1039 core.run_queue
1040 .push_back_or_overflow(task, cx.shared(), &mut core.stats);
1041 }
1042
1043 cx.shared().notify_parked_local();
1044 }
1045
1046 Ok((task, core))
1047 }
1048
1049 fn schedule_deferred_without_core<'a>(&mut self, cx: &Context, synced: &mut Synced) {
1050 let mut defer = cx.defer.borrow_mut();
1051 let num = defer.len();
1052
1053 if num > 0 {
1054 // Push all tasks to the injection queue
1055 cx.shared()
1056 .push_remote_task_batch_synced(synced, defer.drain(..));
1057
1058 debug_assert!(self.workers_to_notify.is_empty());
1059
1060 // Notify workers
1061 cx.shared()
1062 .idle
1063 .notify_mult(synced, &mut self.workers_to_notify, num);
1064
1065 // Notify any workers
1066 for worker in self.workers_to_notify.drain(..) {
1067 cx.shared().condvars[worker].notify_one()
1068 }
1069 }
1070 }
1071
1072 fn maybe_maintenance(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1073 if self.tick % cx.shared().config.event_interval == 0 {
1074 super::counters::inc_num_maintenance();
1075
1076 core.stats.end_processing_scheduled_tasks(&mut self.stats);
1077
1078 // Run regularly scheduled maintenance
1079 core = try_task_new_batch!(self, self.park_yield(cx, core));
1080
1081 core.stats.start_processing_scheduled_tasks(&mut self.stats);
1082 }
1083
1084 Ok((None, core))
1085 }
1086
1087 fn flush_metrics(&self, cx: &Context, core: &mut Core) {
1088 core.stats.submit(&cx.shared().worker_metrics[core.index]);
1089 }
1090
1091 fn update_global_flags(&mut self, cx: &Context, synced: &mut Synced) {
1092 if !self.is_shutdown {
1093 self.is_shutdown = cx.shared().inject.is_closed(&synced.inject);
1094 }
1095
1096 if !self.is_traced {
1097 self.is_traced = cx.shared().trace_status.trace_requested();
1098 }
1099 }
1100
1101 fn park_yield(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1102 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1103 // to run without actually putting the thread to sleep.
1104 if let Some(mut driver) = cx.shared().driver.take() {
1105 driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1106
1107 cx.shared().driver.set(driver);
1108 }
1109
1110 // If there are more I/O events, schedule them.
1111 let (maybe_task, mut core) =
1112 self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())?;
1113
1114 self.flush_metrics(cx, &mut core);
1115 self.update_global_flags(cx, &mut cx.shared().synced.lock());
1116
1117 Ok((maybe_task, core))
1118 }
1119
1120 /*
1121 fn poll_driver(&mut self, cx: &Context, core: Box<Core>) -> NextTaskResult {
1122 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
1123 // to run without actually putting the thread to sleep.
1124 if let Some(mut driver) = cx.shared().driver.take() {
1125 driver.park_timeout(&cx.handle.driver, Duration::from_millis(0));
1126
1127 cx.shared().driver.set(driver);
1128
1129 // If there are more I/O events, schedule them.
1130 self.schedule_deferred_with_core(cx, core, || cx.shared().synced.lock())
1131 } else {
1132 Ok((None, core))
1133 }
1134 }
1135 */
1136
1137 fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1138 if let Some(f) = &cx.shared().config.before_park {
1139 f();
1140 }
1141
1142 if self.can_transition_to_parked(&mut core) {
1143 debug_assert!(!self.is_shutdown);
1144 debug_assert!(!self.is_traced);
1145
1146 core = try_task!(self.do_park(cx, core));
1147 }
1148
1149 if let Some(f) = &cx.shared().config.after_unpark {
1150 f();
1151 }
1152
1153 Ok((None, core))
1154 }
1155
1156 fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
1157 let was_searching = core.is_searching;
1158
1159 // Acquire the lock
1160 let mut synced = cx.shared().synced.lock();
1161
1162 // The local queue should be empty at this point
1163 #[cfg(not(loom))]
1164 debug_assert!(core.run_queue.is_empty());
1165
1166 // Try one last time to get tasks
1167 let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
1168 if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
1169 return Ok((Some(task), core));
1170 }
1171
1172 if !was_searching {
1173 if cx
1174 .shared()
1175 .idle
1176 .transition_worker_to_searching_if_needed(&mut synced.idle, &mut core)
1177 {
1178 // Skip parking, go back to searching
1179 return Ok((None, core));
1180 }
1181 }
1182
1183 super::counters::inc_num_parks();
1184 core.stats.about_to_park();
1185 // Flush metrics to the runtime metrics aggregator
1186 self.flush_metrics(cx, &mut core);
1187
1188 // If the runtime is shutting down, skip parking
1189 self.update_global_flags(cx, &mut synced);
1190
1191 if self.is_shutdown {
1192 return Ok((None, core));
1193 }
1194
1195 // Release the core
1196 core.is_searching = false;
1197 cx.shared().idle.release_core(&mut synced, core);
1198
1199 drop(synced);
1200
1201 if was_searching {
1202 if cx.shared().idle.transition_worker_from_searching() {
1203 // cx.shared().idle.snapshot(&mut self.idle_snapshot);
1204 // We were the last searching worker; we need to do one last check
1205 for i in 0..cx.shared().remotes.len() {
1206 if !cx.shared().remotes[i].steal.is_empty() {
1207 let mut synced = cx.shared().synced.lock();
1208
1209 // Try to get a core
1210 if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
1211 cx.shared().idle.transition_worker_to_searching(&mut core);
1212 return Ok((None, core));
1213 } else {
1214 // Fall back to the park routine
1215 break;
1216 }
1217 }
1218 }
1219 }
1220 }
1221
1222 if let Some(mut driver) = cx.shared().take_driver() {
1223 // Wait for driver events
1224 driver.park(&cx.handle.driver);
1225
1226 synced = cx.shared().synced.lock();
1227
1228 if cx.shared().inject.is_closed(&mut synced.inject) {
1229 synced.shutdown_driver = Some(driver);
1230 self.shutdown_clear_defer(cx);
1231 cx.shared().shutdown_finalize(&cx.handle, &mut synced);
1232 return Err(());
1233 }
1234
1235 // Put the driver back
1236 cx.shared().driver.set(driver);
1237
1238 // Try to acquire an available core to schedule I/O events
1239 if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
1240 // This may result in a task being run
1241 self.schedule_deferred_with_core(cx, core, move || synced)
1242 } else {
1243 // Schedule any deferred tasks
1244 self.schedule_deferred_without_core(cx, &mut synced);
1245
1246 // Wait for a core.
1247 self.wait_for_core(cx, synced)
1248 }
1249 } else {
1250 synced = cx.shared().synced.lock();
1251
1252 // Wait for a core to be assigned to us
1253 self.wait_for_core(cx, synced)
1254 }
1255 }
1256
1257 fn transition_to_searching(&self, cx: &Context, core: &mut Core) -> bool {
1258 if !core.is_searching {
1259 cx.shared().idle.try_transition_worker_to_searching(core);
1260 }
1261
1262 core.is_searching
1263 }
1264
1265 /// Returns `true` if another worker must be notified
1266 fn transition_from_searching(&self, cx: &Context, core: &mut Core) -> bool {
1267 if !core.is_searching {
1268 return false;
1269 }
1270
1271 core.is_searching = false;
1272 cx.shared().idle.transition_worker_from_searching()
1273 }
1274
1275 fn can_transition_to_parked(&self, core: &mut Core) -> bool {
1276 !self.has_tasks(core) && !self.is_shutdown && !self.is_traced
1277 }
1278
1279 fn has_tasks(&self, core: &Core) -> bool {
1280 core.lifo_slot.is_some() || !core.run_queue.is_empty()
1281 }
1282
1283 fn reset_lifo_enabled(&self, cx: &Context) {
1284 cx.lifo_enabled
1285 .set(!cx.handle.shared.config.disable_lifo_slot);
1286 }
1287
1288 fn assert_lifo_enabled_is_correct(&self, cx: &Context) {
1289 debug_assert_eq!(
1290 cx.lifo_enabled.get(),
1291 !cx.handle.shared.config.disable_lifo_slot
1292 );
1293 }
1294
1295 fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
1296 let next = core.stats.tuned_global_queue_interval(&cx.shared().config);
1297
1298 // Smooth out jitter
1299 if u32::abs_diff(self.global_queue_interval, next) > 2 {
1300 self.global_queue_interval = next;
1301 }
1302 }
1303
1304 fn shutdown_clear_defer(&self, cx: &Context) {
1305 let mut defer = cx.defer.borrow_mut();
1306
1307 for task in defer.drain(..) {
1308 drop(task);
1309 }
1310 }
1311 }
1312
1313 impl Context {
1314 pub(crate) fn defer(&self, waker: &Waker) {
1315 // TODO: refactor defer across all runtimes
1316 waker.wake_by_ref();
1317 }
1318
1319 fn shared(&self) -> &Shared {
1320 &self.handle.shared
1321 }
1322
1323 #[cfg_attr(not(feature = "time"), allow(dead_code))]
1324 pub(crate) fn get_worker_index(&self) -> usize {
1325 self.index
1326 }
1327 }
1328
1329 impl Core {
1330 fn next_local_task(&mut self) -> Option<Notified> {
1331 self.next_lifo_task().or_else(|| self.run_queue.pop())
1332 }
1333
1334 fn next_lifo_task(&mut self) -> Option<Notified> {
1335 self.lifo_slot.take()
1336 }
1337 }
1338
1339 impl Shared {
1340 fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
1341 // safety: we only have access to a valid `Synced` in this file.
1342 unsafe { self.inject.pop(&mut synced.inject) }
1343 }
1344
1345 pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1346 use std::ptr;
1347
1348 with_current(|maybe_cx| {
1349 if let Some(cx) = maybe_cx {
1350 // Make sure the task is part of the **current** scheduler.
1351 if ptr::eq(self, &cx.handle.shared) {
1352 // And the current thread still holds a core
1353 if let Some(core) = cx.core.borrow_mut().as_mut() {
1354 if is_yield {
1355 cx.defer.borrow_mut().push(task);
1356 } else {
1357 self.schedule_local(cx, core, task);
1358 }
1359 } else {
1360 // This can happen if either the core was stolen
1361 // (`block_in_place`) or the notification happens from
1362 // the driver.
1363 cx.defer.borrow_mut().push(task);
1364 }
1365 return;
1366 }
1367 }
1368
1369 // Otherwise, use the inject queue.
1370 self.schedule_remote(task);
1371 })
1372 }
1373
1374 fn schedule_local(&self, cx: &Context, core: &mut Core, task: Notified) {
1375 core.stats.inc_local_schedule_count();
1376
1377 if cx.lifo_enabled.get() {
1378 // Push to the LIFO slot
1379 let prev = std::mem::replace(&mut core.lifo_slot, Some(task));
1380 // let prev = cx.shared().remotes[core.index].lifo_slot.swap_local(task);
1381
1382 if let Some(prev) = prev {
1383 core.run_queue
1384 .push_back_or_overflow(prev, self, &mut core.stats);
1385 } else {
1386 return;
1387 }
1388 } else {
1389 core.run_queue
1390 .push_back_or_overflow(task, self, &mut core.stats);
1391 }
1392
1393 self.notify_parked_local();
1394 }
1395
1396 fn notify_parked_local(&self) {
1397 super::counters::inc_num_inc_notify_local();
1398 self.idle.notify_local(self);
1399 }
1400
1401 fn schedule_remote(&self, task: Notified) {
1402 super::counters::inc_num_notify_remote();
1403 self.scheduler_metrics.inc_remote_schedule_count();
1404
1405 let mut synced = self.synced.lock();
1406 // Push the task into the inject queue
1407 self.push_remote_task(&mut synced, task);
1408
1409 // Notify a worker. The mutex is passed in and will be released as part
1410 // of the method call.
1411 self.idle.notify_remote(synced, self);
1412 }
1413
1414 pub(super) fn close(&self, handle: &Handle) {
1415 {
1416 let mut synced = self.synced.lock();
1417
1418 if let Some(driver) = self.driver.take() {
1419 synced.shutdown_driver = Some(driver);
1420 }
1421
1422 if !self.inject.close(&mut synced.inject) {
1423 return;
1424 }
1425
1426 // Set the shutdown flag on all available cores
1427 self.idle.shutdown(&mut synced, self);
1428 }
1429
1430 // Any unassigned cores need to be shut down, but we first have to drop
1431 // the lock
1432 self.idle.shutdown_unassigned_cores(handle, self);
1433 }
1434
1435 fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
1436 // safety: passing in the correct `inject::Synced`
1437 unsafe {
1438 self.inject.push(&mut synced.inject, task);
1439 }
1440 }
1441
1442 fn push_remote_task_batch<I>(&self, iter: I)
1443 where
1444 I: Iterator<Item = task::Notified<Arc<Handle>>>,
1445 {
1446 unsafe {
1447 self.inject.push_batch(self, iter);
1448 }
1449 }
1450
1451 fn push_remote_task_batch_synced<I>(&self, synced: &mut Synced, iter: I)
1452 where
1453 I: Iterator<Item = task::Notified<Arc<Handle>>>,
1454 {
1455 unsafe {
1456 self.inject.push_batch(&mut synced.inject, iter);
1457 }
1458 }
1459
1460 fn take_driver(&self) -> Option<Box<Driver>> {
1461 if !self.driver_enabled() {
1462 return None;
1463 }
1464
1465 self.driver.take()
1466 }
1467
1468 fn driver_enabled(&self) -> bool {
1469 self.condvars.len() > self.remotes.len()
1470 }
1471
1472 pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
1473 // Start from a random inner list
1474 let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
1475 self.owned.close_and_shutdown_all(start as usize);
1476
1477 core.stats.submit(&self.worker_metrics[core.index]);
1478
1479 let mut synced = self.synced.lock();
1480 synced.shutdown_cores.push(core);
1481
1482 self.shutdown_finalize(handle, &mut synced);
1483 }
1484
1485 pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
1486 // Wait for all cores
1487 if synced.shutdown_cores.len() != self.remotes.len() {
1488 return;
1489 }
1490
1491 let driver = synced.shutdown_driver.take();
1492
1493 if self.driver_enabled() && driver.is_none() {
1494 return;
1495 }
1496
1497 debug_assert!(self.owned.is_empty());
1498
1499 for mut core in synced.shutdown_cores.drain(..) {
1500 // Drain tasks from the local queue
1501 while core.next_local_task().is_some() {}
1502 }
1503
1504 // Shutdown the driver
1505 if let Some(mut driver) = driver {
1506 driver.shutdown(&handle.driver);
1507 }
1508
1509 // Drain the injection queue
1510 //
1511 // We already shut down every task, so we can simply drop the tasks. We
1512 // cannot call `next_remote_task()` because we already hold the lock.
1513 //
1514 // safety: passing in the correct `inject::Synced`
1515 while let Some(task) = self.next_remote_task_synced(synced) {
1516 drop(task);
1517 }
1518 }
1519 }
1520
1521 impl Overflow<Arc<Handle>> for Shared {
1522 fn push(&self, task: task::Notified<Arc<Handle>>) {
1523 self.push_remote_task(&mut self.synced.lock(), task);
1524 }
1525
1526 fn push_batch<I>(&self, iter: I)
1527 where
1528 I: Iterator<Item = task::Notified<Arc<Handle>>>,
1529 {
1530 self.push_remote_task_batch(iter)
1531 }
1532 }
1533
1534 impl<'a> Lock<inject::Synced> for &'a Shared {
1535 type Handle = SyncedGuard<'a>;
1536
1537 fn lock(self) -> Self::Handle {
1538 SyncedGuard {
1539 lock: self.synced.lock(),
1540 }
1541 }
1542 }
1543
1544 impl<'a> Lock<Synced> for &'a Shared {
1545 type Handle = SyncedGuard<'a>;
1546
1547 fn lock(self) -> Self::Handle {
1548 SyncedGuard {
1549 lock: self.synced.lock(),
1550 }
1551 }
1552 }
1553
1554 impl task::Schedule for Arc<Handle> {
1555 fn release(&self, task: &Task) -> Option<Task> {
1556 self.shared.owned.remove(task)
1557 }
1558
1559 fn schedule(&self, task: Notified) {
1560 self.shared.schedule_task(task, false);
1561 }
1562
1563 fn hooks(&self) -> TaskHarnessScheduleHooks {
1564 TaskHarnessScheduleHooks {
1565 task_terminate_callback: self.task_hooks.task_terminate_callback.clone(),
1566 }
1567 }
1568
1569 fn yield_now(&self, task: Notified) {
1570 self.shared.schedule_task(task, true);
1571 }
1572 }
1573
1574 impl AsMut<Synced> for Synced {
1575 fn as_mut(&mut self) -> &mut Synced {
1576 self
1577 }
1578 }
1579
1580 pub(crate) struct SyncedGuard<'a> {
1581 lock: crate::loom::sync::MutexGuard<'a, Synced>,
1582 }
1583
1584 impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
1585 fn as_mut(&mut self) -> &mut inject::Synced {
1586 &mut self.lock.inject
1587 }
1588 }
1589
1590 impl<'a> AsMut<Synced> for SyncedGuard<'a> {
1591 fn as_mut(&mut self) -> &mut Synced {
1592 &mut self.lock
1593 }
1594 }
1595
1596 #[track_caller]
1597 fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1598 use scheduler::Context::MultiThreadAlt;
1599
1600 context::with_scheduler(|ctx| match ctx {
1601 Some(MultiThreadAlt(ctx)) => f(Some(ctx)),
1602 _ => f(None),
1603 })
1604 }
1605