// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular it is not vulnerable to write starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
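// Example `state` encodings, derived from the bit definitions above
// (illustrative only, not an exhaustive list):
//   3 * ONE_READER                                    three shared readers
//   ONE_READER | UPGRADABLE_BIT                       one upgradable reader
//   WRITER_BIT | PARKED_BIT                           exclusively locked, with
//                                                     threads queued on the
//                                                     primary queue
//   2 * ONE_READER | WRITER_BIT | WRITER_PARKED_BIT   a writer has claimed
//                                                     WRITER_BIT and parked,
//                                                     waiting for 2 readers to
//                                                     exit the lock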

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}
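// Usage sketch (illustrative; the `RwLock` alias shown is an assumption that
// mirrors the crate's public wrapper, not something defined in this file):
// the raw lock is consumed through `lock_api` rather than used directly.
//
//     pub type RwLock<T> = lock_api::RwLock<RawRwLock, T>;
//
//     let lock = RwLock::new(0u32);
//     *lock.write() += 1;          // exclusive access
//     assert_eq!(*lock.read(), 1); // shared access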

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
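        // `state` is the value before the decrement: if we were the last
        // reader and a writer is parked on the secondary queue waiting for
        // the readers to drain, wake it up.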
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
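        // Only yield the lock if we are the sole reader and a writer is
        // waiting for the readers to exit (WRITER_BIT set with a non-zero
        // reader count).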
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
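        // Adding `ONE_READER - WRITER_BIT` clears WRITER_BIT (which is known
        // to be set here) and adds one reader in a single atomic RMW.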
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
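        // Subtracting `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT` removes our
        // upgradable-reader entry and sets WRITER_BIT in a single atomic RMW.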
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
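        // Adding `(ONE_READER | UPGRADABLE_BIT) - WRITER_BIT` clears WRITER_BIT
        // (which is known to be set here) and registers us as an upgradable
        // reader in a single atomic RMW.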
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing WRITER_BIT or
    /// UPGRADABLE_BIT.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}