1 //! Code that decides when workers should go to sleep. See README.md
2 //! for an overview.
3 
4 use crate::latch::CoreLatch;
5 use crate::sync::{Condvar, Mutex};
6 use crossbeam_utils::CachePadded;
7 use std::sync::atomic::Ordering;
8 use std::thread;
9 use std::usize;
10 
11 mod counters;
12 pub(crate) use self::counters::THREADS_MAX;
13 use self::counters::{AtomicCounters, JobsEventCounter};
14 
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Each entry is cache-padded to avoid false sharing between workers.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking inactive/sleeping thread counts and the
    /// jobs event counter (JEC); see the `counters` submodule.
    counters: AtomicCounters,
}
29 
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
46 
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable the sleeping worker blocks on while `is_blocked`
    /// is true; signalled (under the `is_blocked` mutex) to wake it.
    condvar: Condvar,
}
56 
/// Number of rounds an idle worker spins (yielding each round) before it
/// announces itself as "sleepy" by recording the jobs event counter.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// Total rounds before the worker actually tries to block; the one extra
/// round after becoming sleepy gives concurrently posted jobs a chance to
/// be observed before we commit to sleeping.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
59 
impl Sleep {
    /// Creates the sleep state for a registry with `n_threads` workers.
    ///
    /// # Panics
    ///
    /// Panics if `n_threads` exceeds `THREADS_MAX`, the limit the packed
    /// counters can represent.
    pub(super) fn new(n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker begins searching for work: registers the
    /// thread as inactive and returns the `IdleState` token that tracks
    /// its progress towards sleeping.
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when an idle worker finds work: deregisters the thread as
    /// inactive and wakes as many sleeping threads as the counters say
    /// are needed to compensate.
    #[inline]
    pub(super) fn work_found(&self) {
        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker fails to find work. Spins
    /// (yielding each round) for `ROUNDS_UNTIL_SLEEPY` rounds, then
    /// announces sleepiness and records the JEC, then after one more
    /// round attempts to actually sleep via `sleep`.
    ///
    /// `has_injected_jobs` is consulted only at the last moment before
    /// blocking, to avoid a missed-wakeup deadlock with injected jobs.
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy();
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Bumps the jobs event counter if it currently reads "active", so
    /// that job publishers can tell a worker has become sleepy, and
    /// returns the resulting (sleepy) JEC value to stash in `IdleState`.
    #[cold]
    fn announce_sleepy(&self) -> JobsEventCounter {
        self.counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
            .jobs_counter()
    }

    /// Attempts to put the current worker to sleep, blocking on its
    /// condvar until notified. May return without blocking if the latch
    /// was set, if the JEC changed since we became sleepy (i.e. new jobs
    /// were published), or if injected jobs are detected at the last
    /// moment.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // If the latch refuses the transition to the sleepy state (see
        // `CoreLatch::get_sleepy`), there is no point in sleeping.
        if !latch.get_sleepy() {
            return;
        }

        // NOTE: the mutex is acquired *before* we register as sleeping
        // below; `wake_specific_thread` relies on this ordering (see the
        // comment on `*is_blocked = true` further down).
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
            // CAS failed: another thread changed the counters; reload and
            // re-check the JEC before retrying.
        }

        // Successfully registered as asleep.

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        //
        // This fence pairs with the one in `new_injected_jobs`.
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop guards against spurious wakeups: only a genuine waker
            // resets `is_blocked` to false.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping).  When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected. It pairs with the fence
        // in `sleep` just before the `has_injected_jobs` check.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of
        // `counters` with which we exit thus corresponds to a state in which
        // the JEC is no longer sleepy: any sleepiness announced before this
        // point has been answered by our increment.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers, scanning them in index
    /// order. Best-effort: fewer threads are woken if fewer are actually
    /// blocked.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if (and only if) it is currently
    /// blocked on its condvar; returns true if a thread was woken.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, its our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            true
        } else {
            false
        }
    }
}
314 
315 impl IdleState {
wake_fully(&mut self)316     fn wake_fully(&mut self) {
317         self.rounds = 0;
318         self.jobs_counter = JobsEventCounter::DUMMY;
319     }
320 
wake_partly(&mut self)321     fn wake_partly(&mut self) {
322         self.rounds = ROUNDS_UNTIL_SLEEPY;
323         self.jobs_counter = JobsEventCounter::DUMMY;
324     }
325 }
326