1 //! Code that decides when workers should go to sleep. See README.md 2 //! for an overview. 3 4 use crate::latch::CoreLatch; 5 use crate::sync::{Condvar, Mutex}; 6 use crossbeam_utils::CachePadded; 7 use std::sync::atomic::Ordering; 8 use std::thread; 9 use std::usize; 10 11 mod counters; 12 pub(crate) use self::counters::THREADS_MAX; 13 use self::counters::{AtomicCounters, JobsEventCounter}; 14 15 /// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping 16 /// of workers. It has callbacks that are invoked periodically at significant events, 17 /// such as when workers are looping and looking for work, when latches are set, or when 18 /// jobs are published, and it either blocks threads or wakes them in response to these 19 /// events. See the [`README.md`] in this module for more details. 20 /// 21 /// [`README.md`] README.md 22 pub(super) struct Sleep { 23 /// One "sleep state" per worker. Used to track if a worker is sleeping and to have 24 /// them block. 25 worker_sleep_states: Vec<CachePadded<WorkerSleepState>>, 26 27 counters: AtomicCounters, 28 } 29 30 /// An instance of this struct is created when a thread becomes idle. 31 /// It is consumed when the thread finds work, and passed by `&mut` 32 /// reference for operations that preserve the idle state. (In other 33 /// words, producing one of these structs is evidence the thread is 34 /// idle.) It tracks state such as how long the thread has been idle. 35 pub(super) struct IdleState { 36 /// What is worker index of the idle thread? 37 worker_index: usize, 38 39 /// How many rounds have we been circling without sleeping? 40 rounds: u32, 41 42 /// Once we become sleepy, what was the sleepy counter value? 43 /// Set to `INVALID_SLEEPY_COUNTER` otherwise. 44 jobs_counter: JobsEventCounter, 45 } 46 47 /// The "sleep state" for an individual worker. 48 #[derive(Default)] 49 struct WorkerSleepState { 50 /// Set to true when the worker goes to sleep; set to false when 51 /// the worker is notified or when it wakes. 52 is_blocked: Mutex<bool>, 53 54 condvar: Condvar, 55 } 56 57 const ROUNDS_UNTIL_SLEEPY: u32 = 32; 58 const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1; 59 60 impl Sleep { new(n_threads: usize) -> Sleep61 pub(super) fn new(n_threads: usize) -> Sleep { 62 assert!(n_threads <= THREADS_MAX); 63 Sleep { 64 worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(), 65 counters: AtomicCounters::new(), 66 } 67 } 68 69 #[inline] start_looking(&self, worker_index: usize) -> IdleState70 pub(super) fn start_looking(&self, worker_index: usize) -> IdleState { 71 self.counters.add_inactive_thread(); 72 73 IdleState { 74 worker_index, 75 rounds: 0, 76 jobs_counter: JobsEventCounter::DUMMY, 77 } 78 } 79 80 #[inline] work_found(&self)81 pub(super) fn work_found(&self) { 82 // If we were the last idle thread and other threads are still sleeping, 83 // then we should wake up another thread. 84 let threads_to_wake = self.counters.sub_inactive_thread(); 85 self.wake_any_threads(threads_to_wake as u32); 86 } 87 88 #[inline] no_work_found( &self, idle_state: &mut IdleState, latch: &CoreLatch, has_injected_jobs: impl FnOnce() -> bool, )89 pub(super) fn no_work_found( 90 &self, 91 idle_state: &mut IdleState, 92 latch: &CoreLatch, 93 has_injected_jobs: impl FnOnce() -> bool, 94 ) { 95 if idle_state.rounds < ROUNDS_UNTIL_SLEEPY { 96 thread::yield_now(); 97 idle_state.rounds += 1; 98 } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY { 99 idle_state.jobs_counter = self.announce_sleepy(); 100 idle_state.rounds += 1; 101 thread::yield_now(); 102 } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING { 103 idle_state.rounds += 1; 104 thread::yield_now(); 105 } else { 106 debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING); 107 self.sleep(idle_state, latch, has_injected_jobs); 108 } 109 } 110 111 #[cold] announce_sleepy(&self) -> JobsEventCounter112 fn announce_sleepy(&self) -> JobsEventCounter { 113 self.counters 114 .increment_jobs_event_counter_if(JobsEventCounter::is_active) 115 .jobs_counter() 116 } 117 118 #[cold] sleep( &self, idle_state: &mut IdleState, latch: &CoreLatch, has_injected_jobs: impl FnOnce() -> bool, )119 fn sleep( 120 &self, 121 idle_state: &mut IdleState, 122 latch: &CoreLatch, 123 has_injected_jobs: impl FnOnce() -> bool, 124 ) { 125 let worker_index = idle_state.worker_index; 126 127 if !latch.get_sleepy() { 128 return; 129 } 130 131 let sleep_state = &self.worker_sleep_states[worker_index]; 132 let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); 133 debug_assert!(!*is_blocked); 134 135 // Our latch was signalled. We should wake back up fully as we 136 // will have some stuff to do. 137 if !latch.fall_asleep() { 138 idle_state.wake_fully(); 139 return; 140 } 141 142 loop { 143 let counters = self.counters.load(Ordering::SeqCst); 144 145 // Check if the JEC has changed since we got sleepy. 146 debug_assert!(idle_state.jobs_counter.is_sleepy()); 147 if counters.jobs_counter() != idle_state.jobs_counter { 148 // JEC has changed, so a new job was posted, but for some reason 149 // we didn't see it. We should return to just before the SLEEPY 150 // state so we can do another search and (if we fail to find 151 // work) go back to sleep. 152 idle_state.wake_partly(); 153 latch.wake_up(); 154 return; 155 } 156 157 // Otherwise, let's move from IDLE to SLEEPING. 158 if self.counters.try_add_sleeping_thread(counters) { 159 break; 160 } 161 } 162 163 // Successfully registered as asleep. 164 165 // We have one last check for injected jobs to do. This protects against 166 // deadlock in the very unlikely event that 167 // 168 // - an external job is being injected while we are sleepy 169 // - that job triggers the rollover over the JEC such that we don't see it 170 // - we are the last active worker thread 171 std::sync::atomic::fence(Ordering::SeqCst); 172 if has_injected_jobs() { 173 // If we see an externally injected job, then we have to 'wake 174 // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by 175 // the one that wakes us.) 176 self.counters.sub_sleeping_thread(); 177 } else { 178 // If we don't see an injected job (the normal case), then flag 179 // ourselves as asleep and wait till we are notified. 180 // 181 // (Note that `is_blocked` is held under a mutex and the mutex was 182 // acquired *before* we incremented the "sleepy counter". This means 183 // that whomever is coming to wake us will have to wait until we 184 // release the mutex in the call to `wait`, so they will see this 185 // boolean as true.) 186 *is_blocked = true; 187 while *is_blocked { 188 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap(); 189 } 190 } 191 192 // Update other state: 193 idle_state.wake_fully(); 194 latch.wake_up(); 195 } 196 197 /// Notify the given thread that it should wake up (if it is 198 /// sleeping). When this method is invoked, we typically know the 199 /// thread is asleep, though in rare cases it could have been 200 /// awoken by (e.g.) new work having been posted. notify_worker_latch_is_set(&self, target_worker_index: usize)201 pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { 202 self.wake_specific_thread(target_worker_index); 203 } 204 205 /// Signals that `num_jobs` new jobs were injected into the thread 206 /// pool from outside. This function will ensure that there are 207 /// threads available to process them, waking threads from sleep 208 /// if necessary. 209 /// 210 /// # Parameters 211 /// 212 /// - `num_jobs` -- lower bound on number of jobs available for stealing. 213 /// We'll try to get at least one thread per job. 214 #[inline] new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool)215 pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) { 216 // This fence is needed to guarantee that threads 217 // as they are about to fall asleep, observe any 218 // new jobs that may have been injected. 219 std::sync::atomic::fence(Ordering::SeqCst); 220 221 self.new_jobs(num_jobs, queue_was_empty) 222 } 223 224 /// Signals that `num_jobs` new jobs were pushed onto a thread's 225 /// local deque. This function will try to ensure that there are 226 /// threads available to process them, waking threads from sleep 227 /// if necessary. However, this is not guaranteed: under certain 228 /// race conditions, the function may fail to wake any new 229 /// threads; in that case the existing thread should eventually 230 /// pop the job. 231 /// 232 /// # Parameters 233 /// 234 /// - `num_jobs` -- lower bound on number of jobs available for stealing. 235 /// We'll try to get at least one thread per job. 236 #[inline] new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool)237 pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) { 238 self.new_jobs(num_jobs, queue_was_empty) 239 } 240 241 /// Common helper for `new_injected_jobs` and `new_internal_jobs`. 242 #[inline] new_jobs(&self, num_jobs: u32, queue_was_empty: bool)243 fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) { 244 // Read the counters and -- if sleepy workers have announced themselves 245 // -- announce that there is now work available. The final value of `counters` 246 // with which we exit the loop thus corresponds to a state when 247 let counters = self 248 .counters 249 .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy); 250 let num_awake_but_idle = counters.awake_but_idle_threads(); 251 let num_sleepers = counters.sleeping_threads(); 252 253 if num_sleepers == 0 { 254 // nobody to wake 255 return; 256 } 257 258 // Promote from u16 to u32 so we can interoperate with 259 // num_jobs more easily. 260 let num_awake_but_idle = num_awake_but_idle as u32; 261 let num_sleepers = num_sleepers as u32; 262 263 // If the queue is non-empty, then we always wake up a worker 264 // -- clearly the existing idle jobs aren't enough. Otherwise, 265 // check to see if we have enough idle workers. 266 if !queue_was_empty { 267 let num_to_wake = std::cmp::min(num_jobs, num_sleepers); 268 self.wake_any_threads(num_to_wake); 269 } else if num_awake_but_idle < num_jobs { 270 let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers); 271 self.wake_any_threads(num_to_wake); 272 } 273 } 274 275 #[cold] wake_any_threads(&self, mut num_to_wake: u32)276 fn wake_any_threads(&self, mut num_to_wake: u32) { 277 if num_to_wake > 0 { 278 for i in 0..self.worker_sleep_states.len() { 279 if self.wake_specific_thread(i) { 280 num_to_wake -= 1; 281 if num_to_wake == 0 { 282 return; 283 } 284 } 285 } 286 } 287 } 288 wake_specific_thread(&self, index: usize) -> bool289 fn wake_specific_thread(&self, index: usize) -> bool { 290 let sleep_state = &self.worker_sleep_states[index]; 291 292 let mut is_blocked = sleep_state.is_blocked.lock().unwrap(); 293 if *is_blocked { 294 *is_blocked = false; 295 sleep_state.condvar.notify_one(); 296 297 // When the thread went to sleep, it will have incremented 298 // this value. When we wake it, its our job to decrement 299 // it. We could have the thread do it, but that would 300 // introduce a delay between when the thread was 301 // *notified* and when this counter was decremented. That 302 // might mislead people with new work into thinking that 303 // there are sleeping threads that they should try to 304 // wake, when in fact there is nothing left for them to 305 // do. 306 self.counters.sub_sleeping_thread(); 307 308 true 309 } else { 310 false 311 } 312 } 313 } 314 315 impl IdleState { wake_fully(&mut self)316 fn wake_fully(&mut self) { 317 self.rounds = 0; 318 self.jobs_counter = JobsEventCounter::DUMMY; 319 } 320 wake_partly(&mut self)321 fn wake_partly(&mut self) { 322 self.rounds = ROUNDS_UNTIL_SLEEPY; 323 self.jobs_counter = JobsEventCounter::DUMMY; 324 } 325 } 326