use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::usize;

use crate::registry::{Registry, WorkerThread};
use crate::sync::{Condvar, Mutex};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

/// The atomic core shared by the latches whose owning thread may go to
/// sleep while waiting (`SpinLatch`, `OnceLatch`, and the stealing kind
/// of `CountLatch`). It tracks both whether the latch has been set and
/// whether the owning thread is asleep and must be awoken.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(0),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by the owning thread when it wakes up. If the latch was
    /// not set in the meantime, this resets the state from `SLEEPING`
    /// back to `UNSET` so the thread is no longer recorded as asleep on
    /// this latch.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}
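// An illustrative, single-threaded sketch of the sleep protocol above
// (module and test names are ours, not part of the original source): the
// owning thread announces it is getting sleepy, commits to sleeping, and a
// later `set` then reports that a wakeup is needed.
#[cfg(test)]
mod core_latch_protocol_sketch {
    use super::*;

    #[test]
    fn set_while_sleeping_requests_wakeup() {
        let latch = CoreLatch::new();
        assert!(latch.get_sleepy()); // UNSET -> SLEEPY
        assert!(latch.fall_asleep()); // SLEEPY -> SLEEPING
        // `set` swaps in SET and reports that the owner was sleeping.
        assert!(unsafe { CoreLatch::set(&latch) });
        assert!(latch.probe());
    }
}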
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused again.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}
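// An illustrative sketch of `LockLatch` in use (module and test names are
// ours; assumes a test build and a toolchain with `std::thread::scope`,
// Rust 1.63+): one thread blocks in `wait()` while another sets the latch,
// the blocking counterpart of the spinning latches above.
#[cfg(test)]
mod lock_latch_sketch {
    use super::*;

    #[test]
    fn set_unblocks_wait() {
        let latch = LockLatch::new();
        std::thread::scope(|scope| {
            // The setter runs on another thread; `set` takes `*const Self`,
            // so we pass a pointer to the shared latch.
            scope.spawn(|| unsafe { LockLatch::set(&latch) });
            latch.wait();
        });
    }
}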
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work-
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}
impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}

/// `&L` without any implication of `dereferenceable` for `Latch::set`
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}
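// An illustrative sketch of the counting behavior (module and test names are
// ours; assumes a test build of this module): with the blocking kind
// (`owner: None`), `wait` is only released once `set` has been called as many
// times as the current count.
#[cfg(test)]
mod count_latch_sketch {
    use super::*;

    #[test]
    fn counts_down_to_zero_before_releasing() {
        let latch = CountLatch::with_count(2, None);
        latch.increment(); // count: 2 -> 3
        unsafe {
            CountLatch::set(&latch); // 3 -> 2, not yet set
            CountLatch::set(&latch); // 2 -> 1, not yet set
            CountLatch::set(&latch); // 1 -> 0, sets the inner LockLatch
        }
        latch.wait(None); // returns immediately, the latch is set
    }
}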