use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::usize;

use crate::registry::{Registry, WorkerThread};
use crate::sync::{Condvar, Mutex};

/// We define various kinds of latches, each of which is a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
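
// Illustrative call pattern (a sketch, not an API in this module): one
// thread eventually calls `set` exactly once, while others poll `probe`
// on the concrete latch type. `set` takes a raw pointer because the
// woken thread may free the latch before `set` returns:
//
//     // signaling side: `ptr: *const SomeLatch` must be valid on entry
//     unsafe { SomeLatch::set(ptr) };
//
//     // observing side:
//     while !latch.probe() {
//         // spin, steal work, or go to sleep
//     }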

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
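
// State machine, schematically (the owning thread drives the
// left-to-right transitions; `set()` may fire from any of the first
// three states):
//
//     UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//
//     UNSET | SLEEPY | SLEEPING --set()--> SET
//     SLEEPING --wake_up()--> UNSET        (only if the latch was not set)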

/// The core latch is a four-state atomic that the other latch types
/// build on. Beyond the set/unset flag, it tracks whether its owning
/// thread is awake, about to sleep, or asleep, so that `set()` can
/// report when a wakeup is needed.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it wakes up: resets the state from
    /// `SLEEPING` back to `UNSET`, unless the latch was set in the
    /// meantime (in which case it stays set).
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
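
// The owning thread's sleep protocol, schematically (the real loop
// lives in the sleep module and interleaves work stealing):
//
//     if latch.get_sleepy() {          // UNSET -> SLEEPY
//         // ...one final check for incoming work...
//         if latch.fall_asleep() {     // SLEEPY -> SLEEPING
//             // block on the registry's condvar until notified
//         }
//     }
//     latch.wake_up();                 // SLEEPING -> UNSET, unless SET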

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
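
// Lifecycle sketch (illustrative, not code from this module): a worker
// blocks on a stack-allocated spin latch while continuing to steal
// work, and whichever thread finishes the job sets the latch:
//
//     let latch = SpinLatch::new(worker_thread);
//     // ...hand `&latch` to the job that will call `set` on completion...
//     worker_thread.wait_until(&latch);  // steals work while probing
//
// Use `SpinLatch::cross` instead when the job runs in a different
// registry, so that `set` keeps the waiting thread's registry alive.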

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}
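
// Cross-thread sketch (illustrative): `set` is unsafe only because the
// pointer must stay valid for the duration of the call, which the
// `Arc` guarantees here:
//
//     let latch = Arc::new(LockLatch::new());
//     let signaler = Arc::clone(&latch);
//     std::thread::spawn(move || unsafe { LockLatch::set(&*signaler) });
//     latch.wait(); // returns once the spawned thread sets the latch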

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
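
// Usage sketch (field names here are hypothetical): because a
// `OnceLatch` holds no registry reference, the owning registry passes
// itself in at set time, e.g. when asking a worker to terminate:
//
//     unsafe {
//         OnceLatch::set_and_tickle_one(&thread_info.terminate, &registry, index);
//     }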

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, a call to `set()` does not
/// necessarily leave the latch in the set state; it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}
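
// Counting sketch (illustrative): a scope starts the latch at count 1
// for itself and increments once per spawned job; every completion
// calls `set`, and the final decrement sets the inner latch, waking or
// unblocking the waiter:
//
//     let latch = CountLatch::new(None); // Blocking kind, count = 1
//     latch.increment();                 // one spawned job
//     // on each completion (the job, then the scope's own "job"):
//     //     unsafe { CountLatch::set(&latch) };
//     latch.wait(None);                  // returns once the count hits 0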

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}

/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
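
// Why `LatchRef` exists, sketched (illustrative): storing a plain `&L`
// in a job would assert the reference is dereferenceable for its whole
// lifetime, but `Latch::set` may run just as the latch's owner frees
// it. Routing through a raw pointer drops that assertion while `Deref`
// keeps the ergonomics:
//
//     let latch = LockLatch::new();
//     let latch_ref = LatchRef::new(&latch);
//     unsafe { LatchRef::set(&latch_ref) }; // forwards to `LockLatch::set`
//     latch_ref.wait();                     // `Deref` exposes `LockLatch::wait`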