// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use crate::util::UncheckedOptionExt;
use crate::word_lock::WordLock;
use core::{
    cell::{Cell, UnsafeCell},
    ptr,
    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use smallvec::SmallVec;
use std::time::{Duration, Instant};

// Don't use Instant on wasm32-unknown-unknown, it just panics.
cfg_if::cfg_if! {
    if #[cfg(all(
        target_family = "wasm",
        target_os = "unknown",
        target_vendor = "unknown"
    ))] {
        #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
        struct TimeoutInstant;
        impl TimeoutInstant {
            fn now() -> TimeoutInstant {
                TimeoutInstant
            }
        }
        impl core::ops::Add<Duration> for TimeoutInstant {
            type Output = Self;
            fn add(self, _rhs: Duration) -> Self::Output {
                TimeoutInstant
            }
        }
    } else {
        use std::time::Instant as TimeoutInstant;
    }
}

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Holds the pointer to the currently active `HashTable`.
///
/// # Safety
///
/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
/// Any `HashTable` this global static has ever pointed to must never be freed.
static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());

// Even with 3x more buckets than threads, the memory overhead per thread is
// still only a few hundred bytes.
const LOAD_FACTOR: usize = 3;

struct HashTable {
    // Hash buckets for the table
    entries: Box<[Bucket]>,

    // Number of bits used for the hash function
    hash_bits: u32,

    // Previous table. This is only kept to keep leak detectors happy.
    _prev: *const HashTable,
}

impl HashTable {
    #[inline]
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
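        // new_size is a power of two, so subtracting leading-zero counts below
        // yields hash_bits = log2(new_size).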
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

        let now = TimeoutInstant::now();
        let mut entries = Vec::with_capacity(new_size);
        for i in 0..new_size {
            // We must ensure the seed is not zero
            entries.push(Bucket::new(now, i as u32 + 1));
        }

        Box::new(HashTable {
            entries: entries.into_boxed_slice(),
            hash_bits,
            _prev: prev,
        })
    }
}

#[repr(align(64))]
struct Bucket {
    // Lock protecting the queue
    mutex: WordLock,

    // Linked list of threads waiting on this bucket
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which be_fair should be set
    fair_timeout: UnsafeCell<FairTimeout>,
}

impl Bucket {
    #[inline]
    pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
        Self {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
        }
    }
}

struct FairTimeout {
    // Next time at which be_fair should be set
    timeout: TimeoutInstant,

    // The PRNG state for calculating the next timeout
    seed: u32,
}

impl FairTimeout {
    #[inline]
    fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
        FairTimeout { timeout, seed }
    }

    // Determine whether we should force a fair unlock, and update the timeout
    #[inline]
    fn should_timeout(&mut self) -> bool {
        let now = TimeoutInstant::now();
        if now > self.timeout {
            // Time between 0 and 1ms.
            let nanos = self.gen_u32() % 1_000_000;
            self.timeout = now + Duration::new(0, nanos);
            true
        } else {
            false
        }
    }

    // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
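    // The shift constants (13, 17, 5) are the standard 32-bit xorshift triple.
    // Xorshift requires a nonzero state, which `HashTable::new` ensures by
    // seeding each bucket with `i + 1`.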
    fn gen_u32(&mut self) -> u32 {
        self.seed ^= self.seed << 13;
        self.seed ^= self.seed >> 17;
        self.seed ^= self.seed << 5;
        self.seed
    }
}

struct ThreadData {
    parker: ThreadParker,

    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,

    // Linked list of parked threads in a bucket
    next_in_queue: Cell<*const ThreadData>,

    // UnparkToken passed to this thread when it is unparked
    unpark_token: Cell<UnparkToken>,

    // ParkToken value set by the thread when it was parked
    park_token: Cell<ParkToken>,

    // Is the thread parked with a timeout?
    parked_with_timeout: Cell<bool>,

    // Extra data for deadlock detection
    #[cfg(feature = "deadlock_detection")]
    deadlock_data: deadlock::DeadlockData,
}

impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and resize
        // the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        grow_hashtable(num_threads);

        ThreadData {
            parker: ThreadParker::new(),
            key: AtomicUsize::new(0),
            next_in_queue: Cell::new(ptr::null()),
            unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
            park_token: Cell::new(DEFAULT_PARK_TOKEN),
            parked_with_timeout: Cell::new(false),
            #[cfg(feature = "deadlock_detection")]
            deadlock_data: deadlock::DeadlockData::new(),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline(always)]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
    // to construct. Try to use a thread-local version if possible. Otherwise just
    // create a ThreadData on the stack
    let mut thread_data_storage = None;
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    let thread_data_ptr = THREAD_DATA
        .try_with(|x| x as *const ThreadData)
        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));

    f(unsafe { &*thread_data_ptr })
}

impl Drop for ThreadData {
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
#[inline]
fn get_hashtable() -> &'static HashTable {
    let table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one
    if table.is_null() {
        create_hashtable()
    } else {
        // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
        unsafe { &*table }
    }
}

/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
#[cold]
fn create_hashtable() -> &'static HashTable {
    let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

    // If this fails then it means some other thread created the hash table first.
    let table = match HASHTABLE.compare_exchange(
        ptr::null_mut(),
        new_table,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        Ok(_) => new_table,
        Err(old_table) => {
            // Free the table we created
            // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
            unsafe {
                let _ = Box::from_raw(new_table);
            }
            old_table
        }
    };
    // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
    // created here, or it is one loaded from `HASHTABLE`.
    unsafe { &*table }
}

// Grow the hash table so that it is big enough for the given number of threads.
// This isn't performance-critical since it is only done when a ThreadData is
// created, which only happens once per thread.
fn grow_hashtable(num_threads: usize) {
    // Lock all buckets in the existing table and get a reference to it
    let old_table = loop {
        let table = get_hashtable();

        // Check if we need to resize the existing table
        if table.entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }

        // Lock all buckets in the old table
        for bucket in &table.entries[..] {
            bucket.mutex.lock();
        }

        // Now check if our table is still the latest one. Another thread could
        // have grown the hash table between us reading HASHTABLE and locking
        // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
            break table;
        }

        // Unlock buckets and try again
        for bucket in &table.entries[..] {
            // SAFETY: We hold the lock here, as required
            unsafe { bucket.mutex.unlock() };
        }
    };

    // Create the new table
    let mut new_table = HashTable::new(num_threads, old_table);

    // Move the entries from the old table to the new one
    for bucket in &old_table.entries[..] {
        // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
        // lists. All `ThreadData` instances in these lists will remain valid as long as they are
        // present in the lists, meaning as long as their threads are parked.
        unsafe { rehash_bucket_into(bucket, &mut new_table) };
    }

    // Publish the new table. No races are possible at this point because
    // any other thread trying to grow the hash table is blocked on the bucket
    // locks in the old table.
    HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);

    // Unlock all buckets in the old table
    for bucket in &old_table.entries[..] {
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
/// in the bucket their key corresponds to for this table.
///
/// # Safety
///
/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
///
/// The given `table` must only contain buckets with correctly constructed linked lists.
unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
    let mut current: *const ThreadData = bucket.queue_head.get();
    while !current.is_null() {
        let next = (*current).next_in_queue.get();
        let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
        if table.entries[hash].queue_tail.get().is_null() {
            table.entries[hash].queue_head.set(current);
        } else {
            (*table.entries[hash].queue_tail.get())
                .next_in_queue
                .set(current);
        }
        table.entries[hash].queue_tail.set(current);
        (*current).next_in_queue.set(ptr::null());
        current = next;
    }
}

// Hash function for addresses
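// The multipliers below are the golden ratio scaled to 32 and 64 bits
// (Fibonacci hashing): multiplying mixes the low bits of the address into the
// high bits, and the shift keeps only the top `bits` bits as the bucket index.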
#[cfg(target_pointer_width = "32")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
#[inline]
fn hash(key: usize, bits: u32) -> usize {
    key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}

/// Locks the bucket for the given key and returns a reference to it.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
fn lock_bucket(key: usize) -> &'static Bucket {
    loop {
        let hashtable = get_hashtable();

        let hash = hash(key, hashtable.hash_bits);
        let bucket = &hashtable.entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
            return bucket;
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Locks the bucket for the given key and returns a reference to it, but checks that the key
/// hasn't been changed in the meantime due to a requeue.
/// The returned bucket must be unlocked again in order to not cause deadlocks.
#[inline]
fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
    loop {
        let hashtable = get_hashtable();
        let current_key = key.load(Ordering::Relaxed);

        let hash = hash(current_key, hashtable.hash_bits);
        let bucket = &hashtable.entries[hash];

        // Lock the bucket
        bucket.mutex.lock();

        // Check that both the hash table and key are correct while the bucket
        // is locked. Note that the key can't change once we locked the proper
        // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
            && key.load(Ordering::Relaxed) == current_key
        {
            return (current_key, bucket);
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }
}

/// Locks the two buckets for the given pair of keys and returns references to them.
/// The returned buckets must be unlocked again in order to not cause deadlocks.
///
/// If both keys hash to the same value, both returned references will be to the same bucket. Be
/// careful to only unlock it once in this case; always use `unlock_bucket_pair`.
#[inline]
fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
    loop {
        let hashtable = get_hashtable();

        let hash1 = hash(key1, hashtable.hash_bits);
        let hash2 = hash(key2, hashtable.hash_bits);

        // Get the bucket at the lowest hash/index first
        let bucket1 = if hash1 <= hash2 {
            &hashtable.entries[hash1]
        } else {
            &hashtable.entries[hash2]
        };

        // Lock the first bucket
        bucket1.mutex.lock();

        // If no other thread has rehashed the table before we grabbed the lock
        // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
            // Now lock the second bucket and return the two buckets
            if hash1 == hash2 {
                return (bucket1, bucket1);
            } else if hash1 < hash2 {
                let bucket2 = &hashtable.entries[hash2];
                bucket2.mutex.lock();
                return (bucket1, bucket2);
            } else {
                let bucket2 = &hashtable.entries[hash1];
                bucket2.mutex.lock();
                return (bucket2, bucket1);
            }
        }

        // Unlock the bucket and try again
        // SAFETY: We hold the lock here, as required
        unsafe { bucket1.mutex.unlock() };
    }
}

/// Unlock a pair of buckets
///
/// # Safety
///
/// Both buckets must be locked
#[inline]
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
    bucket1.mutex.unlock();
    if !ptr::eq(bucket1, bucket2) {
        bucket2.mutex.unlock();
    }
}

/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),

    /// The validation callback returned false.
    Invalid,

    /// The timeout expired.
    TimedOut,
}

impl ParkResult {
    /// Returns true if we were unparked by another thread.
    #[inline]
    pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
    }
}

/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,

    /// The number of threads that were requeued.
    pub requeued_threads: usize,

    /// Whether there are any threads remaining in the queue. This is only
    /// true if a thread was unparked.
    pub have_more_threads: bool,

    /// This is set to true on average once every 0.5ms for any given key. It
    /// should be used to switch to a fair unlocking mechanism for a particular
    /// unlock.
    pub be_fair: bool,

    /// Private field so new fields can be added without breakage.
    _sealed: (),
}

/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,

    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,

    /// Requeue all threads onto the target queue.
    RequeueAll,

    /// Unpark one thread and leave the rest parked. No requeuing is done.
    UnparkOne,

    /// Requeue one thread and leave the rest parked on the original queue.
    RequeueOne,
}

/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the list of parked threads.
    Unpark,

    /// Don't unpark the thread and continue scanning the list of parked threads.
    Skip,

    /// Don't unpark the thread and stop scanning the list of parked threads.
    Stop,
}

/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);

/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);

/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);

/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);

/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked and can abort
/// the operation by returning false. If `validate` returns true then the
/// current thread is appended to the queue and the queue is unlocked.
///
/// The `before_sleep` function is called after the queue is unlocked but before
/// the thread is put to sleep. The thread will then sleep until it is unparked
/// or the given timeout is reached.
///
/// The `timed_out` function is also called while the queue is locked, but only
/// if the timeout was reached. It is passed the key of the queue it was in when
/// it timed out, which may be different from the original key if
/// `unpark_requeue` was called. It is also passed a bool which indicates
/// whether it was the last thread in the queue.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `timed_out` functions are called while the queue is
/// locked and must not panic or call into any function in `parking_lot`.
///
/// The `before_sleep` function is called outside the queue lock and is allowed
/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
/// it is not allowed to call `park` or panic.
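///
/// # Example
///
/// A minimal sketch (not taken from the crate itself) of a one-shot flag built
/// on `park`/`unpark_one`, using the usual address-as-key convention:
///
/// ```ignore
/// use core::sync::atomic::{AtomicBool, Ordering};
/// use parking_lot_core::{park, unpark_one, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
///
/// struct Flag(AtomicBool);
///
/// impl Flag {
///     fn wait(&self) {
///         let key = &self.0 as *const _ as usize;
///         while !self.0.load(Ordering::Acquire) {
///             // SAFETY: we control this address and the callbacks obey the rules above.
///             unsafe {
///                 park(
///                     key,
///                     || !self.0.load(Ordering::Acquire), // validate: only sleep if still unset
///                     || {},                              // before_sleep
///                     |_key, _was_last| {},               // timed_out (unused, no timeout)
///                     DEFAULT_PARK_TOKEN,
///                     None,
///                 );
///             }
///         }
///     }
///
///     fn set(&self) {
///         self.0.store(true, Ordering::Release);
///         let key = &self.0 as *const _ as usize;
///         // SAFETY: same address as above.
///         unsafe { unpark_one(key, |_| DEFAULT_UNPARK_TOKEN) };
///     }
/// }
/// ```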
#[inline]
pub unsafe fn park(
    key: usize,
    validate: impl FnOnce() -> bool,
    before_sleep: impl FnOnce(),
    timed_out: impl FnOnce(usize, bool),
    park_token: ParkToken,
    timeout: Option<Instant>,
) -> ParkResult {
    // Grab our thread data, this also ensures that the hash table exists
    with_thread_data(|thread_data| {
        // Lock the bucket for the given key
        let bucket = lock_bucket(key);

        // If the validation function fails, just return
        if !validate() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Invalid;
        }

        // Append our thread data to the queue and unlock the bucket
        thread_data.parked_with_timeout.set(timeout.is_some());
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.store(key, Ordering::Relaxed);
        thread_data.park_token.set(park_token);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();

        // Invoke the pre-sleep callback
        before_sleep();

        // Park our thread and determine whether we were woken up by an unpark
        // or by our timeout. Note that this isn't precise: we can still be
        // unparked since we are still in the queue.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                // call deadlock detection on_unpark hook
                deadlock::on_unpark(thread_data);
                true
            }
        };

        // If we were unparked, return now
        if unparked {
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // Lock our bucket again. Note that the hashtable may have been rehashed in
        // the meantime. Our key may also have changed if we were requeued.
        let (key, bucket) = lock_bucket_checked(&thread_data.key);

        // Now we need to check again if we were unparked or timed out. Unlike the
        // last check this is precise because we hold the bucket lock.
        if !thread_data.parker.timed_out() {
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            return ParkResult::Unparked(thread_data.unpark_token.get());
        }

        // We timed out, so we now need to remove our thread from the queue
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        let mut was_last_thread = true;
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                } else {
                    // Scan the rest of the queue to see if there are any other
                    // entries with the given key.
                    let mut scan = next;
                    while !scan.is_null() {
                        if (*scan).key.load(Ordering::Relaxed) == key {
                            was_last_thread = false;
                            break;
                        }
                        scan = (*scan).next_in_queue.get();
                    }
                }

                // Callback to indicate that we timed out, and whether we were the
                // last thread on the queue.
                timed_out(key, was_last_thread);
                break;
            } else {
                if (*current).key.load(Ordering::Relaxed) == key {
                    was_last_thread = false;
                }
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }

        // There should be no way for our thread to have been removed from the queue
        // if we timed out.
        debug_assert!(!current.is_null());

        // Unlock the bucket, we are done
        // SAFETY: We hold the lock here, as required
        bucket.mutex.unlock();
        ParkResult::TimedOut
    })
}

/// Unparks one thread from the queue associated with the given key.
///
/// The `callback` function is called while the queue is locked and before the
/// target thread is woken up. The `UnparkResult` argument to the function
/// indicates whether a thread was found in the queue and whether this was the
/// last thread in the queue. This value is also returned by `unpark_one`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `callback` function is called while the queue is locked and must not
/// panic or call into any function in `parking_lot`.
///
/// The `parking_lot` functions are not re-entrant and calling this method
/// from the context of an asynchronous signal handler may result in undefined
/// behavior, including corruption of internal state and/or deadlocks.
#[inline]
pub unsafe fn unpark_one(
    key: usize,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Find a thread with a matching key and remove it from the queue
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }

            // Invoke the callback before waking up the thread
            result.unparked_threads = 1;
            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
            let token = callback(result);

            // Set the token for the target thread
            (*current).unpark_token.set(token);

            // This is a bit tricky: we first lock the ThreadParker to prevent
            // the thread from exiting and freeing its ThreadData if its wait
            // times out. Then we unlock the queue since we don't want to keep
            // the queue locked while we perform a system call. Finally we wake
            // up the parked thread.
            let handle = (*current).parker.unpark_lock();
            // SAFETY: We hold the lock here, as required
            bucket.mutex.unlock();
            handle.unpark();

            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // No threads with a matching key were found in the bucket
    callback(result);
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();
    result
}

/// Unparks all threads in the queue associated with the given key.
///
/// The given `UnparkToken` is passed to all unparked threads.
///
/// This function returns the number of threads that were unparked.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `parking_lot` functions are not re-entrant and calling this method
/// from the context of an asynchronous signal handler may result in undefined
/// behavior, including corruption of internal state and/or deadlocks.
#[inline]
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Remove all threads with the given key in the bucket
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }

            // Set the token for the target thread
            (*current).unpark_token.set(unpark_token);

            // Don't wake up threads while holding the queue lock. See comment
            // in unpark_one. For now just record which threads we need to wake
            // up.
            threads.push((*current).parker.unpark_lock());
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Unlock the bucket
    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    let num_threads = threads.len();
    for handle in threads.into_iter() {
        handle.unpark();
    }

    num_threads
}

/// Removes all threads from the queue associated with `key_from`, optionally
/// unparks the first one and requeues the rest onto the queue associated with
/// `key_to`.
///
/// The `validate` function is called while both queues are locked. Its return
/// value will determine which operation is performed, or whether the operation
/// should be aborted. See `RequeueOp` for details about the different possible
/// return values.
///
/// The `callback` function is also called while both queues are locked. It is
/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
/// indicating whether a thread was unparked and whether there are threads still
/// parked in the new queue. This `UnparkResult` value is also returned by
/// `unpark_requeue`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to the thread that is unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `validate` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
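///
/// # Example
///
/// An illustrative sketch (not taken from the crate itself) of moving every
/// waiter from one queue to another, as a condvar-to-mutex handoff might do;
/// the `condvar_addr` and `mutex_addr` keys are hypothetical:
///
/// ```ignore
/// use parking_lot_core::{unpark_requeue, RequeueOp, DEFAULT_UNPARK_TOKEN};
///
/// // SAFETY: both addresses are controlled by the caller and the closures
/// // obey the locking rules documented above.
/// let result = unsafe {
///     unpark_requeue(
///         condvar_addr,
///         mutex_addr,
///         || RequeueOp::RequeueAll,            // validate: always requeue everything
///         |_op, _result| DEFAULT_UNPARK_TOKEN, // callback: token is unused here
///     )
/// };
/// assert_eq!(result.unparked_threads, 0); // RequeueAll never wakes a thread directly
/// ```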
#[inline]
pub unsafe fn unpark_requeue(
    key_from: usize,
    key_to: usize,
    validate: impl FnOnce() -> RequeueOp,
    callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the two buckets for the given keys
    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);

    // If the validation function fails, just return
    let mut result = UnparkResult::default();
    let op = validate();
    if op == RequeueOp::Abort {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        return result;
    }

    // Remove all threads with the given key in the source bucket
    let mut link = &bucket_from.queue_head;
    let mut current = bucket_from.queue_head.get();
    let mut previous = ptr::null();
    let mut requeue_threads: *const ThreadData = ptr::null();
    let mut requeue_threads_tail: *const ThreadData = ptr::null();
    let mut wakeup_thread = None;
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key_from {
            // Remove the thread from the queue
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket_from.queue_tail.get() == current {
                bucket_from.queue_tail.set(previous);
            }

            // Prepare the first thread for wakeup and requeue the rest.
            if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
                && wakeup_thread.is_none()
            {
                wakeup_thread = Some(current);
                result.unparked_threads = 1;
            } else {
                if !requeue_threads.is_null() {
                    (*requeue_threads_tail).next_in_queue.set(current);
                } else {
                    requeue_threads = current;
                }
                requeue_threads_tail = current;
                (*current).key.store(key_to, Ordering::Relaxed);
                result.requeued_threads += 1;
            }
            if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
                // Scan the rest of the queue to see if there are any other
                // entries with the given key.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.load(Ordering::Relaxed) == key_from {
                        result.have_more_threads = true;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
                break;
            }
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Add the requeued threads to the destination bucket
    if !requeue_threads.is_null() {
        (*requeue_threads_tail).next_in_queue.set(ptr::null());
        if !bucket_to.queue_head.get().is_null() {
            (*bucket_to.queue_tail.get())
                .next_in_queue
                .set(requeue_threads);
        } else {
            bucket_to.queue_head.set(requeue_threads);
        }
        bucket_to.queue_tail.set(requeue_threads_tail);
    }

    // Invoke the callback before waking up the thread
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
    }
    let token = callback(op, result);

    // See comment in unpark_one for why we mess with the locking
    if let Some(wakeup_thread) = wakeup_thread {
        (*wakeup_thread).unpark_token.set(token);
        let handle = (*wakeup_thread).parker.unpark_lock();
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
        handle.unpark();
    } else {
        // SAFETY: Both buckets are locked, as required.
        unlock_bucket_pair(bucket_from, bucket_to);
    }

    result
}

/// Unparks a number of threads from the front of the queue associated with
/// `key` depending on the results of a filter function which inspects the
/// `ParkToken` associated with each thread.
///
/// The `filter` function is called for each thread in the queue or until
/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
/// is returned.
///
/// The `callback` function is also called while the queue is locked. It is
/// passed an `UnparkResult` indicating the number of threads that were unparked
/// and whether there are still parked threads in the queue. This `UnparkResult`
/// value is also returned by `unpark_filter`.
///
/// The `callback` function should return an `UnparkToken` value which will be
/// passed to all threads that are unparked. If no thread is unparked then the
/// returned value is ignored.
///
/// # Safety
///
/// You should only call this function with an address that you control, since
/// you could otherwise interfere with the operation of other synchronization
/// primitives.
///
/// The `filter` and `callback` functions are called while the queue is locked
/// and must not panic or call into any function in `parking_lot`.
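///
/// # Example
///
/// An illustrative sketch (not taken from the crate itself): wake only the
/// waiters that parked with a hypothetical "writer" token of 1, leaving the
/// rest queued; `key` is an address the caller controls:
///
/// ```ignore
/// use parking_lot_core::{unpark_filter, FilterOp, DEFAULT_UNPARK_TOKEN};
///
/// // SAFETY: the closures obey the locking rules documented above.
/// let result = unsafe {
///     unpark_filter(
///         key,
///         |token| {
///             if token.0 == 1 {
///                 FilterOp::Unpark // wake this thread
///             } else {
///                 FilterOp::Skip   // leave it parked, keep scanning
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     )
/// };
/// println!("woke {} threads", result.unparked_threads);
/// ```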
#[inline]
pub unsafe fn unpark_filter(
    key: usize,
    mut filter: impl FnMut(ParkToken) -> FilterOp,
    callback: impl FnOnce(UnparkResult) -> UnparkToken,
) -> UnparkResult {
    // Lock the bucket for the given key
    let bucket = lock_bucket(key);

    // Go through the queue looking for threads with a matching key
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec::<[_; 8]>::new();
    let mut result = UnparkResult::default();
    while !current.is_null() {
        if (*current).key.load(Ordering::Relaxed) == key {
            // Call the filter function with the thread's ParkToken
            let next = (*current).next_in_queue.get();
            match filter((*current).park_token.get()) {
                FilterOp::Unpark => {
                    // Remove the thread from the queue
                    link.set(next);
                    if bucket.queue_tail.get() == current {
                        bucket.queue_tail.set(previous);
                    }

                    // Add the thread to our list of threads to unpark
                    threads.push((current, None));

                    current = next;
                }
                FilterOp::Skip => {
                    result.have_more_threads = true;
                    link = &(*current).next_in_queue;
                    previous = current;
                    current = link.get();
                }
                FilterOp::Stop => {
                    result.have_more_threads = true;
                    break;
                }
            }
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }

    // Invoke the callback before waking up the threads
    result.unparked_threads = threads.len();
    if result.unparked_threads != 0 {
        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
    }
    let token = callback(result);

    // Pass the token to all threads that are going to be unparked and prepare
    // them for unparking.
    for t in threads.iter_mut() {
        (*t.0).unpark_token.set(token);
        t.1 = Some((*t.0).parker.unpark_lock());
    }

    // SAFETY: We hold the lock here, as required
    bucket.mutex.unlock();

    // Now that we are outside the lock, wake up all the threads that we removed
    // from the queue.
    for (_, handle) in threads.into_iter() {
        handle.unchecked_unwrap().unpark();
    }

    result
}

/// \[Experimental\] Deadlock detection
///
/// Enabled via the `deadlock_detection` feature flag.
pub mod deadlock {
    #[cfg(feature = "deadlock_detection")]
    use super::deadlock_impl;

    #[cfg(feature = "deadlock_detection")]
    pub(super) use super::deadlock_impl::DeadlockData;

    /// Acquire a resource identified by key in the deadlock detector.
    /// Noop if the deadlock_detection feature isn't enabled.
    ///
    /// # Safety
    ///
    /// Call after the resource is acquired
    #[inline]
    pub unsafe fn acquire_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::acquire_resource(_key);
    }

    /// Release a resource identified by key in the deadlock detector.
    /// Noop if the deadlock_detection feature isn't enabled.
    ///
    /// # Panics
    ///
    /// Panics if the resource was already released or wasn't acquired in this thread.
    ///
    /// # Safety
    ///
    /// Call before the resource is released
    #[inline]
    pub unsafe fn release_resource(_key: usize) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::release_resource(_key);
    }

    /// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
    #[cfg(feature = "deadlock_detection")]
    #[inline]
    pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
        deadlock_impl::check_deadlock()
    }

    #[inline]
    pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
        #[cfg(feature = "deadlock_detection")]
        deadlock_impl::on_unpark(_td);
    }
}

#[cfg(feature = "deadlock_detection")]
mod deadlock_impl {
    use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
    use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
    use crate::word_lock::WordLock;
    use backtrace::Backtrace;
    use petgraph;
    use petgraph::graphmap::DiGraphMap;
    use std::cell::{Cell, UnsafeCell};
    use std::collections::HashSet;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use thread_id;

    /// Representation of a deadlocked thread
    pub struct DeadlockedThread {
        thread_id: usize,
        backtrace: Backtrace,
    }

    impl DeadlockedThread {
        /// The system thread id
        pub fn thread_id(&self) -> usize {
            self.thread_id
        }

        /// The thread backtrace
        pub fn backtrace(&self) -> &Backtrace {
            &self.backtrace
        }
    }

    pub struct DeadlockData {
        // Currently owned resources (keys)
        resources: UnsafeCell<Vec<usize>>,

        // Set when there's a pending callstack request
        deadlocked: Cell<bool>,

        // Sender used to report the backtrace
        backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,

        // System thread id
        thread_id: usize,
    }

    impl DeadlockData {
        pub fn new() -> Self {
            DeadlockData {
                resources: UnsafeCell::new(Vec::new()),
                deadlocked: Cell::new(false),
                backtrace_sender: UnsafeCell::new(None),
                thread_id: thread_id::get(),
            }
        }
    }

    pub(super) unsafe fn on_unpark(td: &ThreadData) {
        if td.deadlock_data.deadlocked.get() {
            let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
            sender
                .send(DeadlockedThread {
                    thread_id: td.deadlock_data.thread_id,
                    backtrace: Backtrace::new(),
                })
                .unwrap();
            // make sure to close this sender
            drop(sender);

            // park until the end of time
            td.parker.prepare_park();
            td.parker.park();
            unreachable!("unparked deadlocked thread!");
        }
    }

    pub unsafe fn acquire_resource(key: usize) {
        with_thread_data(|thread_data| {
            (*thread_data.deadlock_data.resources.get()).push(key);
        });
    }

    pub unsafe fn release_resource(key: usize) {
        with_thread_data(|thread_data| {
            let resources = &mut (*thread_data.deadlock_data.resources.get());

            // There is only one situation where we can fail to find the
            // resource: we are currently running TLS destructors and our
            // ThreadData has already been freed. There isn't much we can do
            // about it at this point, so just ignore it.
            if let Some(p) = resources.iter().rposition(|x| *x == key) {
                resources.swap_remove(p);
            }
        });
    }

    pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
        unsafe {
            // fast pass
            if check_wait_graph_fast() {
                // double check
                check_wait_graph_slow()
            } else {
                Vec::new()
            }
        }
    }

    // Simple algorithm that builds a wait graph of the threads and the resources,
    // then checks for the presence of cycles (deadlocks).
    // This variant isn't precise as it doesn't lock the entire table before checking
    unsafe fn check_wait_graph_fast() -> bool {
        let table = get_hashtable();
        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &(*table).entries[..] {
            b.mutex.lock();
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(resource, current as usize, ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
                }
                current = (*current).next_in_queue.get();
            }
            // SAFETY: We hold the lock here, as required
            b.mutex.unlock();
        }

        petgraph::algo::is_cyclic_directed(&graph)
    }

    #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
    enum WaitGraphNode {
        Thread(*const ThreadData),
        Resource(usize),
    }

    use self::WaitGraphNode::*;

    // Contrary to the _fast variant this locks the entries table before looking for cycles.
    // Returns all detected thread wait cycles.
    // Note that once a cycle is reported it's never reported again.
    unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
        static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
        DEADLOCK_DETECTION_LOCK.lock();

        let mut table = get_hashtable();
        loop {
            // Lock all buckets in the old table
            for b in &table.entries[..] {
                b.mutex.lock();
            }

            // Now check if our table is still the latest one. Another thread could
            // have grown the hash table between us getting and locking the hash table.
            let new_table = get_hashtable();
            if new_table as *const _ == table as *const _ {
                break;
            }

            // Unlock buckets and try again
            for b in &table.entries[..] {
                // SAFETY: We hold the lock here, as required
                b.mutex.unlock();
            }

            table = new_table;
        }

        let thread_count = NUM_THREADS.load(Ordering::Relaxed);
        let mut graph =
            DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);

        for b in &table.entries[..] {
            let mut current = b.queue_head.get();
            while !current.is_null() {
                if !(*current).parked_with_timeout.get()
                    && !(*current).deadlock_data.deadlocked.get()
                {
                    // .resources are waiting for their owner
                    for &resource in &(*(*current).deadlock_data.resources.get()) {
                        graph.add_edge(Resource(resource), Thread(current), ());
                    }
                    // owner waits for resource .key
                    graph.add_edge(
                        Thread(current),
                        Resource((*current).key.load(Ordering::Relaxed)),
                        (),
                    );
                }
                current = (*current).next_in_queue.get();
            }
        }

        for b in &table.entries[..] {
            // SAFETY: We hold the lock here, as required
            b.mutex.unlock();
        }

        // find cycles
        let cycles = graph_cycles(&graph);

        let mut results = Vec::with_capacity(cycles.len());

        for cycle in cycles {
            let (sender, receiver) = mpsc::channel();
            for td in cycle {
                let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
                (*td).deadlock_data.deadlocked.set(true);
                *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
                let handle = (*td).parker.unpark_lock();
                // SAFETY: We hold the lock here, as required
                bucket.mutex.unlock();
                // unpark the deadlocked thread!
                // on unpark it'll notice the deadlocked flag and report back
                handle.unpark();
            }
            // make sure to drop our sender before collecting results
            drop(sender);
            results.push(receiver.iter().collect());
        }

        DEADLOCK_DETECTION_LOCK.unlock();

        results
    }

1383     // normalize a cycle to start with the "smallest" node
normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T>1384     fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1385         let min_pos = input
1386             .iter()
1387             .enumerate()
1388             .min_by_key(|&(_, &t)| t)
1389             .map(|(p, _)| p)
1390             .unwrap_or(0);
1391         input
1392             .iter()
1393             .cycle()
1394             .skip(min_pos)
1395             .take(input.len())
1396             .cloned()
1397             .collect()
1398     }
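
    // For example, every rotation of the same cycle normalizes to one canonical
    // form, which lets the `HashSet` in `graph_cycles` below deduplicate cycles
    // that were discovered starting from different nodes:
    //
    //     normalize_cycle(&[3, 1, 2]) == [1, 2, 3]
    //     normalize_cycle(&[2, 3, 1]) == [1, 2, 3]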

    // returns all thread cycles in the wait graph
    fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
        use petgraph::visit::depth_first_search;
        use petgraph::visit::DfsEvent;
        use petgraph::visit::NodeIndexable;

        let mut cycles = HashSet::new();
        let mut path = Vec::with_capacity(g.node_bound());
        // start from threads to get the correct threads cycle
        let threads = g.nodes().filter(|n| matches!(n, &Thread(_)));

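        // A back edge found during the DFS points to a node that is still on the
        // current DFS path, so the slice of `path` from that node onward is a
        // cycle. Only Thread nodes are recorded in `path`; the Resource nodes in
        // between are implied.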
        depth_first_search(g, threads, |e| match e {
            DfsEvent::Discover(Thread(n), _) => path.push(n),
            DfsEvent::Finish(Thread(_), _) => {
                path.pop();
            }
            DfsEvent::BackEdge(_, Thread(n)) => {
                let from = path.iter().rposition(|&i| i == n).unwrap();
                cycles.insert(normalize_cycle(&path[from..]));
            }
            _ => (),
        });

        cycles.iter().cloned().collect()
    }
}

#[cfg(test)]
mod tests {
    use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
    use std::{
        ptr,
        sync::{
            atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::Duration,
    };

    /// Calls a closure for every `ThreadData` currently parked on a given key
    fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
        let bucket = super::lock_bucket(key);

        let mut current: *const ThreadData = bucket.queue_head.get();
        while !current.is_null() {
            let current_ref = unsafe { &*current };
            if current_ref.key.load(Ordering::Relaxed) == key {
                f(current_ref);
            }
            current = current_ref.next_in_queue.get();
        }

        // SAFETY: We hold the lock here, as required
        unsafe { bucket.mutex.unlock() };
    }

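    // Each generated test spawns `threads` threads per latch, parks them all on
    // the latch's semaphore, wakes `single_unparks` of them one at a time (with
    // `delay` between wake-ups), then releases the rest with unpark_all, and
    // repeats the whole run `repeats` times.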
    macro_rules! test {
        ( $( $name:ident(
            repeats: $repeats:expr,
            latches: $latches:expr,
            delay: $delay:expr,
            threads: $threads:expr,
            single_unparks: $single_unparks:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_micros($delay);
                for _ in 0..$repeats {
                    run_parking_test($latches, delay, $threads, $single_unparks);
                }
            })*
        };
    }

    test! {
        unpark_all_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 0
        );
        unpark_all_hundred_fast(
            repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
        );
        unpark_one_one_fast(
            repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
        );
        unpark_one_hundred_fast(
            repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
        );
        unpark_one_fifty_then_fifty_all_fast(
            repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
        );
        unpark_all_one(
            repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
        );
        unpark_all_hundred(
            repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
        );
        unpark_one_one(
            repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
        );
        unpark_one_fifty(
            repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
        );
        unpark_one_fifty_then_fifty_all(
            repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
        );
        hundred_unpark_all_one_fast(
            repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
        );
        hundred_unpark_all_one(
            repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
        );
    }

    fn run_parking_test(
        num_latches: usize,
        delay: Duration,
        num_threads: usize,
        num_single_unparks: usize,
    ) {
        let mut tests = Vec::with_capacity(num_latches);

        for _ in 0..num_latches {
            let test = Arc::new(SingleLatchTest::new(num_threads));
            let mut threads = Vec::with_capacity(num_threads);
            for _ in 0..num_threads {
                let test = test.clone();
                threads.push(thread::spawn(move || test.run()));
            }
            tests.push((test, threads));
        }

        for unpark_index in 0..num_single_unparks {
            thread::sleep(delay);
            for (test, _) in &tests {
                test.unpark_one(unpark_index);
            }
        }

        for (test, threads) in tests {
            test.finish(num_single_unparks);
            for thread in threads {
                thread.join().expect("Test thread panic");
            }
        }
    }

    struct SingleLatchTest {
        semaphore: AtomicIsize,
        num_awake: AtomicUsize,
        /// Holds the pointer to the last *unprocessed* woken up thread.
        last_awoken: AtomicPtr<ThreadData>,
        /// Total number of threads participating in this test.
        num_threads: usize,
    }

    impl SingleLatchTest {
        pub fn new(num_threads: usize) -> Self {
            Self {
                // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                semaphore: AtomicIsize::new(0),
                num_awake: AtomicUsize::new(0),
                last_awoken: AtomicPtr::new(ptr::null_mut()),
                num_threads,
            }
        }

        pub fn run(&self) {
            // Get one slot from the semaphore
            self.down();

            // Report back to the test verification code that this thread woke up
            let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
            self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
            self.num_awake.fetch_add(1, Ordering::SeqCst);
        }

        pub fn unpark_one(&self, single_unpark_index: usize) {
            // last_awoken should be null at all times except between the call to self.up() below
            // and the point at the bottom of this method where it is reset to null again.
            assert!(self.last_awoken.load(Ordering::SeqCst).is_null());

            let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
            for_each(self.semaphore_addr(), |thread_data| {
                queue.push(thread_data as *const _ as *mut _);
            });
            assert!(queue.len() <= self.num_threads - single_unpark_index);

            let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

            self.up();

            // Wait for a parked thread to wake up and update num_awake + last_awoken.
            while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
                thread::yield_now();
            }

            // At this point the other thread should have set last_awoken inside the run() method
            let last_awoken = self.last_awoken.load(Ordering::SeqCst);
            assert!(!last_awoken.is_null());
            if !queue.is_empty() && queue[0] != last_awoken {
                panic!(
                    "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
                    queue, last_awoken
                );
            }
            self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
        }

        pub fn finish(&self, num_single_unparks: usize) {
            // The number of threads that were not unparked via unpark_one
            let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();

            // Wake the remaining threads up with unpark_all. This has to be done in a loop,
            // because there might still be threads that have not yet parked.
            while num_threads_left > 0 {
                let mut num_waiting_on_address = 0;
                for_each(self.semaphore_addr(), |_thread_data| {
                    num_waiting_on_address += 1;
                });
                assert!(num_waiting_on_address <= num_threads_left);

                let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

                let num_unparked =
                    unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
                assert!(num_unparked >= num_waiting_on_address);
                assert!(num_unparked <= num_threads_left);

                // Wait for all unparked threads to wake up and update num_awake + last_awoken.
                while self.num_awake.load(Ordering::SeqCst)
                    != num_awake_before_unpark + num_unparked
                {
                    thread::yield_now()
                }

                num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
            }
            // By now, all threads should have been woken up
            assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

            // Make sure no thread is parked on our semaphore address
            let mut num_waiting_on_address = 0;
            for_each(self.semaphore_addr(), |_thread_data| {
                num_waiting_on_address += 1;
            });
            assert_eq!(num_waiting_on_address, 0);
        }

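        // The semaphore counter encodes the protocol: a positive value is the
        // number of free slots, zero means no slots and no waiters, and a
        // negative value is the number of threads that have claimed a slot in
        // down() and are parked (or about to park) waiting for an up().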
        pub fn down(&self) {
            let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);

            if old_semaphore_value > 0 {
                // We acquired the semaphore. Done.
                return;
            }

            // We need to wait.
            let validate = || true;
            let before_sleep = || {};
            let timed_out = |_, _| {};
            unsafe {
                super::park(
                    self.semaphore_addr(),
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        }

        pub fn up(&self) {
            let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);

            // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
            if old_semaphore_value < 0 {
                // We need to continue until we have actually unparked someone. It might be that
                // the thread we want to pass ownership to has decremented the semaphore counter,
                // but not yet parked.
                loop {
                    match unsafe {
                        super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
                            .unparked_threads
                    } {
                        1 => break,
                        0 => (),
                        i => panic!("Should not wake up {} threads", i),
                    }
                }
            }
        }

        fn semaphore_addr(&self) -> usize {
            &self.semaphore as *const _ as usize
        }
    }
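
    // A minimal sketch of driving the park API directly with a timeout, in the
    // spirit of the tests above. The test name and the 10ms deadline are
    // illustrative choices, not part of the original suite; it assumes
    // `super::park` returns `ParkResult::TimedOut` when the deadline passes
    // without any matching unpark.
    #[test]
    fn park_timeout_expires_sketch() {
        use std::time::Instant;

        // Park on the address of a local value that nobody will ever unpark.
        let dummy = 0usize;
        let key = &dummy as *const _ as usize;

        let result = unsafe {
            super::park(
                key,
                || true,       // validate: always proceed with parking
                || {},         // nothing to do right before sleeping
                |_key, _was_last| {}, // timeout callback; arguments unused here
                DEFAULT_PARK_TOKEN,
                Some(Instant::now() + Duration::from_millis(10)),
            )
        };
        assert!(matches!(result, super::ParkResult::TimedOut));
    }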
}