xref: /aosp_15_r20/external/crosvm/cros_async/src/sync/mu.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::UnsafeCell;
6 use std::hint;
7 use std::mem;
8 use std::ops::Deref;
9 use std::ops::DerefMut;
10 use std::sync::atomic::AtomicUsize;
11 use std::sync::atomic::Ordering;
12 use std::sync::Arc;
13 use std::thread::yield_now;
14 
15 use super::super::sync::waiter::Kind as WaiterKind;
16 use super::super::sync::waiter::Waiter;
17 use super::super::sync::waiter::WaiterAdapter;
18 use super::super::sync::waiter::WaiterList;
19 use super::super::sync::waiter::WaitingFor;
20 
21 // Set when the rwlock is exclusively locked.
22 const LOCKED: usize = 1 << 0;
23 // Set when there are one or more threads waiting to acquire the lock.
24 const HAS_WAITERS: usize = 1 << 1;
25 // Set when a thread has been woken up from the wait queue. Cleared when that thread either acquires
26 // the lock or adds itself back into the wait queue. Used to prevent unnecessary wake ups when a
27 // thread has been removed from the wait queue but has not gotten CPU time yet.
28 const DESIGNATED_WAKER: usize = 1 << 2;
29 // Used to provide exclusive access to the `waiters` field in `RawRwLock`. Should only be held
30 // while modifying the waiter list.
31 const SPINLOCK: usize = 1 << 3;
32 // Set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
33 // attempting to acquire a shared lock will be prevented from getting it when this bit is set.
34 // However, this bit is ignored once a thread has gone through the wait queue at least once.
35 const WRITER_WAITING: usize = 1 << 4;
36 // Set when a thread has gone through the wait queue many times but has failed to acquire the lock
37 // every time it is woken up. When this bit is set, all other threads are prevented from acquiring
38 // the lock until the thread that set the `LONG_WAIT` bit has acquired the lock.
39 const LONG_WAIT: usize = 1 << 5;
40 // The bit that is added to the rwlock state in order to acquire a shared lock. Since more than one
41 // thread can acquire a shared lock, we cannot use a single bit. Instead we use all the remaining
42 // bits in the state to track the number of threads that have acquired a shared lock.
43 const READ_LOCK: usize = 1 << 8;
44 // Mask used for checking if any threads currently hold a shared lock.
45 const READ_MASK: usize = !0xff;
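// As an illustrative example of the layout: a state value of
// `(3 * READ_LOCK) | WRITER_WAITING | HAS_WAITERS` means that three threads currently hold a
// shared lock while a writer sits in the wait queue, which also prevents any new readers from
// acquiring the lock.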
46 
47 // The number of times the thread should just spin and attempt to re-acquire the lock.
48 const SPIN_THRESHOLD: usize = 7;
49 
50 // The number of times the thread needs to go through the wait queue before it sets the `LONG_WAIT`
51 // bit and forces all other threads to wait for it to acquire the lock. This value is set relatively
52 // high so that we don't lose the benefit of having running threads unless it is absolutely
53 // necessary.
54 const LONG_WAIT_THRESHOLD: usize = 19;
55 
56 // Common methods between shared and exclusive locks.
57 trait Kind {
58     // The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
59     // are not zero then the thread will first spin and retry a few times before adding itself to
60     // the wait queue.
61     fn zero_to_acquire() -> usize;
62 
63     // The bit that must be added in order to acquire this kind of lock. This should either be
64     // `LOCKED` or `READ_LOCK`.
65     fn add_to_acquire() -> usize;
66 
67     // The bits that should be set when a thread adds itself to the wait queue while waiting to
68     // acquire this kind of lock.
69     fn set_when_waiting() -> usize;
70 
71     // The bits that should be cleared when a thread acquires this kind of lock.
72     fn clear_on_acquire() -> usize;
73 
74     // The waiter that a thread should use when waiting to acquire this kind of lock.
75     fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
76 }
77 
78 // A lock type for shared read-only access to the data. More than one thread may hold this kind of
79 // lock simultaneously.
80 struct Shared;
81 
82 impl Kind for Shared {
83     fn zero_to_acquire() -> usize {
84         LOCKED | WRITER_WAITING | LONG_WAIT
85     }
86 
87     fn add_to_acquire() -> usize {
88         READ_LOCK
89     }
90 
91     fn set_when_waiting() -> usize {
92         0
93     }
94 
95     fn clear_on_acquire() -> usize {
96         0
97     }
98 
99     fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
100         Arc::new(Waiter::new(
101             WaiterKind::Shared,
102             cancel_waiter,
103             raw as *const RawRwLock as usize,
104             WaitingFor::Mutex,
105         ))
106     }
107 }
108 
109 // A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
110 // kind of lock at a time.
111 struct Exclusive;
112 
113 impl Kind for Exclusive {
114     fn zero_to_acquire() -> usize {
115         LOCKED | READ_MASK | LONG_WAIT
116     }
117 
118     fn add_to_acquire() -> usize {
119         LOCKED
120     }
121 
122     fn set_when_waiting() -> usize {
123         WRITER_WAITING
124     }
125 
126     fn clear_on_acquire() -> usize {
127         WRITER_WAITING
128     }
129 
130     fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
131         Arc::new(Waiter::new(
132             WaiterKind::Exclusive,
133             cancel_waiter,
134             raw as *const RawRwLock as usize,
135             WaitingFor::Mutex,
136         ))
137     }
138 }
139 
140 // Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
141 // set in the rwlock state when the current thread releases the spin lock protecting the waiter
142 // list.
143 //
144 // If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
145 // waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
146 // found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
147 // bit, which should be set when the thread releases the spin lock.
148 //
149 // If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
150 // no bits are set in the returned `usize`.
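//
// As an illustrative example: for a wait queue of [Reader, Reader, Writer, Reader], the three
// readers are returned for wake-up and `WRITER_WAITING` is included in the returned bits so that
// readers not already queued cannot overtake the writer. For a wait queue of [Writer, Reader],
// only the writer is returned and no extra bits are set.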
151 fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
152     let mut to_wake = WaiterList::new(WaiterAdapter::new());
153     let mut set_on_release = 0;
154     let mut cursor = waiters.front_mut();
155 
156     let mut waking_readers = false;
157     while let Some(w) = cursor.get() {
158         match w.kind() {
159             WaiterKind::Exclusive if !waking_readers => {
160                 // This is the first waiter and it's a writer. No need to check the other waiters.
161                 let waiter = cursor.remove().unwrap();
162                 waiter.set_waiting_for(WaitingFor::None);
163                 to_wake.push_back(waiter);
164                 break;
165             }
166 
167             WaiterKind::Shared => {
168                 // This is a reader and the first waiter in the list was not a writer so wake up all
169                 // the readers in the wait list.
170                 let waiter = cursor.remove().unwrap();
171                 waiter.set_waiting_for(WaitingFor::None);
172                 to_wake.push_back(waiter);
173                 waking_readers = true;
174             }
175 
176             WaiterKind::Exclusive => {
177                 // We found a writer while looking for more readers to wake up. Set the
178                 // `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
179                 // readers currently in the wait list will ignore this bit since they already waited
180                 // once.
181                 set_on_release |= WRITER_WAITING;
182                 cursor.move_next();
183             }
184         }
185     }
186 
187     (to_wake, set_on_release)
188 }
189 
190 #[inline]
191 fn cpu_relax(iterations: usize) {
192     for _ in 0..iterations {
193         hint::spin_loop();
194     }
195 }
196 
197 pub(crate) struct RawRwLock {
198     state: AtomicUsize,
199     waiters: UnsafeCell<WaiterList>,
200 }
201 
202 impl RawRwLock {
203     pub fn new() -> RawRwLock {
204         RawRwLock {
205             state: AtomicUsize::new(0),
206             waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
207         }
208     }
209 
210     #[inline]
211     pub async fn lock(&self) {
212         match self
213             .state
214             .compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
215         {
216             Ok(_) => {}
217             Err(oldstate) => {
218                 // If any bits that should be zero are not zero or if we fail to acquire the lock
219                 // with a single compare_exchange then go through the slow path.
220                 if (oldstate & Exclusive::zero_to_acquire()) != 0
221                     || self
222                         .state
223                         .compare_exchange_weak(
224                             oldstate,
225                             (oldstate + Exclusive::add_to_acquire())
226                                 & !Exclusive::clear_on_acquire(),
227                             Ordering::Acquire,
228                             Ordering::Relaxed,
229                         )
230                         .is_err()
231                 {
232                     self.lock_slow::<Exclusive>(0, 0).await;
233                 }
234             }
235         }
236     }
237 
238     #[inline]
239     pub async fn read_lock(&self) {
240         match self
241             .state
242             .compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
243         {
244             Ok(_) => {}
245             Err(oldstate) => {
246                 if (oldstate & Shared::zero_to_acquire()) != 0
247                     || self
248                         .state
249                         .compare_exchange_weak(
250                             oldstate,
251                             (oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
252                             Ordering::Acquire,
253                             Ordering::Relaxed,
254                         )
255                         .is_err()
256                 {
257                     self.lock_slow::<Shared>(0, 0).await;
258                 }
259             }
260         }
261     }
262 
263     // Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
264     // when the lock is acquired. Any bits set in `zero_mask` are cleared from the bits returned by
265     // `K::zero_to_acquire()`.
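    //
    // For example, the `Condvar` re-acquisition paths below use
    // `lock_slow::<Exclusive>(DESIGNATED_WAKER, 0)` so that the designated-waker bit is cleared
    // once the lock is acquired, and `lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)` so
    // that a queued writer does not block a reader that has already waited on the Condvar.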
266     #[cold]
267     async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
268         let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;
269 
270         let mut spin_count = 0;
271         let mut wait_count = 0;
272         let mut waiter = None;
273         loop {
274             let oldstate = self.state.load(Ordering::Relaxed);
275             //  If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
276             //  directly.
277             if (oldstate & zero_to_acquire) == 0 {
278                 if self
279                     .state
280                     .compare_exchange_weak(
281                         oldstate,
282                         (oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
283                         Ordering::Acquire,
284                         Ordering::Relaxed,
285                     )
286                     .is_ok()
287                 {
288                     return;
289                 }
290             } else if (oldstate & SPINLOCK) == 0 {
291                 // The rwlock is locked and the spin lock is available.  Try to add this thread to
292                 // the waiter queue.
293                 let w = waiter.get_or_insert_with(|| K::new_waiter(self));
294                 w.reset(WaitingFor::Mutex);
295 
296                 if self
297                     .state
298                     .compare_exchange_weak(
299                         oldstate,
300                         (oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
301                         Ordering::Acquire,
302                         Ordering::Relaxed,
303                     )
304                     .is_ok()
305                 {
306                     let mut set_on_release = 0;
307 
308                     if wait_count < LONG_WAIT_THRESHOLD {
309                         // Add the waiter to the back of the queue.
310                         // SAFETY:
311                         // Safe because we have acquired the spin lock and it provides exclusive
312                         // access to the waiter queue.
313                         unsafe { (*self.waiters.get()).push_back(w.clone()) };
314                     } else {
315                         // This waiter has gone through the queue too many times. Put it in the
316                         // front of the queue and block all other threads from acquiring the lock
317                         // until this one has acquired it at least once.
318                         // SAFETY:
319                         // Safe because we have acquired the spin lock and it provides exclusive
320                         // access to the waiter queue.
321                         unsafe { (*self.waiters.get()).push_front(w.clone()) };
322 
323                         // Set the LONG_WAIT bit to prevent all other threads from acquiring the
324                         // lock.
325                         set_on_release |= LONG_WAIT;
326 
327                         // Make sure we clear the LONG_WAIT bit when we do finally get the lock.
328                         clear |= LONG_WAIT;
329 
330                         // Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
331                         // from acquiring the lock.
332                         zero_to_acquire &= !LONG_WAIT;
333                     }
334 
335                     // Release the spin lock.
336                     let mut state = oldstate;
337                     loop {
338                         match self.state.compare_exchange_weak(
339                             state,
340                             (state | set_on_release) & !SPINLOCK,
341                             Ordering::Release,
342                             Ordering::Relaxed,
343                         ) {
344                             Ok(_) => break,
345                             Err(w) => state = w,
346                         }
347                     }
348 
349                     // Now wait until we are woken.
350                     w.wait().await;
351 
352                     // The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
353                     // thread that originally held the lock. While this bit is set, no other waiters
354                     // will be woken up so it's important to clear it the next time we try to
355                     // acquire the main lock or the spin lock.
356                     clear |= DESIGNATED_WAKER;
357 
358                     // Now that the thread has waited once, we no longer care if there is a writer
359                     // waiting. Only the limits of mutual exclusion can prevent us from acquiring
360                     // the lock.
361                     zero_to_acquire &= !WRITER_WAITING;
362 
363                     // Reset the spin count since we just went through the wait queue.
364                     spin_count = 0;
365 
366                     // Increment the wait count since we went through the wait queue.
367                     wait_count += 1;
368 
369                     // Skip the `cpu_relax` below.
370                     continue;
371                 }
372             }
373 
374             // Both the lock and the spin lock are held by one or more other threads. First, we'll
375             // spin a few times in case we can acquire the lock or the spin lock. If that fails then
376             // we yield because we might be preventing the threads that do hold the 2 locks from
377             // getting cpu time.
378             if spin_count < SPIN_THRESHOLD {
379                 cpu_relax(1 << spin_count);
380                 spin_count += 1;
381             } else {
382                 yield_now();
383             }
384         }
385     }
386 
387     #[inline]
388     pub fn unlock(&self) {
389         // Fast path, if possible. We can directly clear the locked bit since we have exclusive
390         // access to the rwlock.
391         let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
392 
393         // Panic if we just tried to unlock a rwlock that wasn't held by this thread. This shouldn't
394         // really be possible since `unlock` is not a public method.
395         debug_assert_eq!(
396             oldstate & READ_MASK,
397             0,
398             "`unlock` called on rwlock held in read-mode"
399         );
400         debug_assert_ne!(
401             oldstate & LOCKED,
402             0,
403             "`unlock` called on rwlock not held in write-mode"
404         );
405 
406         if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
407             // The oldstate has waiters but no designated waker has been chosen yet.
408             self.unlock_slow();
409         }
410     }
411 
412     #[inline]
413     pub fn read_unlock(&self) {
414         // Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
415         // previously added it.
416         let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
417 
418         debug_assert_eq!(
419             oldstate & LOCKED,
420             0,
421             "`read_unlock` called on rwlock held in write-mode"
422         );
423         debug_assert_ne!(
424             oldstate & READ_MASK,
425             0,
426             "`read_unlock` called on rwlock not held in read-mode"
427         );
428 
429         if (oldstate & HAS_WAITERS) != 0
430             && (oldstate & DESIGNATED_WAKER) == 0
431             && (oldstate & READ_MASK) == READ_LOCK
432         {
433             // There are waiters, no designated waker has been chosen yet, and the last reader is
434             // unlocking so we have to take the slow path.
435             self.unlock_slow();
436         }
437     }
438 
439     #[cold]
440     fn unlock_slow(&self) {
441         let mut spin_count = 0;
442 
443         loop {
444             let oldstate = self.state.load(Ordering::Relaxed);
445             if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
446                 // No more waiters or a designated waker has been chosen. Nothing left for us to do.
447                 return;
448             } else if (oldstate & SPINLOCK) == 0 {
449                 // The spin lock is not held by another thread. Try to acquire it. Also set the
450                 // `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
451                 if self
452                     .state
453                     .compare_exchange_weak(
454                         oldstate,
455                         oldstate | SPINLOCK | DESIGNATED_WAKER,
456                         Ordering::Acquire,
457                         Ordering::Relaxed,
458                     )
459                     .is_ok()
460                 {
461                     // Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
462                     // clear the HAS_WAITERS and DESIGNATED_WAKER bits so start collecting the bits
463                     // to be cleared.
464                     let mut clear = SPINLOCK;
465 
466                     // SAFETY:
467                     // Safe because the spinlock guarantees exclusive access to the waiter list and
468                     // the reference does not escape this function.
469                     let waiters = unsafe { &mut *self.waiters.get() };
470                     let (wake_list, set_on_release) = get_wake_list(waiters);
471 
472                     // If the waiter list is now empty, clear the HAS_WAITERS bit.
473                     if waiters.is_empty() {
474                         clear |= HAS_WAITERS;
475                     }
476 
477                     if wake_list.is_empty() {
478                         // Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
479                         // that we set when we acquired the spin lock.
480                         clear |= DESIGNATED_WAKER;
481                     }
482 
483                     // Release the spin lock and clear any other bits as necessary. Also, set any
484                     // bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
485                     // bit, which needs to be set when we are waking up a bunch of readers and there
486                     // are still writers in the wait queue. This will prevent any readers that
487                     // aren't in `wake_list` from acquiring the read lock.
488                     let mut state = oldstate;
489                     loop {
490                         match self.state.compare_exchange_weak(
491                             state,
492                             (state | set_on_release) & !clear,
493                             Ordering::Release,
494                             Ordering::Relaxed,
495                         ) {
496                             Ok(_) => break,
497                             Err(w) => state = w,
498                         }
499                     }
500 
501                     // Now wake the waiters, if any.
502                     for w in wake_list {
503                         w.wake();
504                     }
505 
506                     // We're done.
507                     return;
508                 }
509             }
510 
511             // Spin and try again.  It's ok to block here as we have already released the lock.
512             if spin_count < SPIN_THRESHOLD {
513                 cpu_relax(1 << spin_count);
514                 spin_count += 1;
515             } else {
516                 yield_now();
517             }
518         }
519     }
520 
521     fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
522         let mut oldstate = self.state.load(Ordering::Relaxed);
523         while oldstate & SPINLOCK != 0
524             || self
525                 .state
526                 .compare_exchange_weak(
527                     oldstate,
528                     oldstate | SPINLOCK,
529                     Ordering::Acquire,
530                     Ordering::Relaxed,
531                 )
532                 .is_err()
533         {
534             hint::spin_loop();
535             oldstate = self.state.load(Ordering::Relaxed);
536         }
537 
538         // SAFETY:
539         // Safe because the spin lock provides exclusive access and the reference does not escape
540         // this function.
541         let waiters = unsafe { &mut *self.waiters.get() };
542 
543         let mut clear = SPINLOCK;
544 
545         // If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
546         // bit. Also clear the bit if we are going to be waking some other waiters. In this case the
547         // waiter that set the bit may have already been removed from the waiter list (and could be
548         // the one that is currently being dropped). If it is still in the waiter list then clearing
549         // this bit may starve it for one more iteration through the lock_slow() loop, whereas not
550         // clearing this bit could cause a deadlock if the waiter that set it is the one that is
551         // being dropped.
552         if wake_next
553             || waiters
554                 .front()
555                 .get()
556                 .map(|front| std::ptr::eq(front, waiter))
557                 .unwrap_or(false)
558         {
559             clear |= LONG_WAIT;
560         }
561 
562         let waiting_for = waiter.is_waiting_for();
563 
564         // Don't drop the old waiter while holding the spin lock.
565         let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
566             // SAFETY:
567             // We know that the waiter is still linked and is waiting for the rwlock, which
568             // guarantees that it is still linked into `self.waiters`.
569             let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
570             cursor.remove()
571         } else {
572             None
573         };
574 
575         let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
576             // Either the waiter was already woken or it's been removed from the rwlock's waiter
577             // list and is going to be woken. Either way, we need to wake up another thread.
578             get_wake_list(waiters)
579         } else {
580             (WaiterList::new(WaiterAdapter::new()), 0)
581         };
582 
583         if waiters.is_empty() {
584             clear |= HAS_WAITERS;
585         }
586 
587         if wake_list.is_empty() {
588             // We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
589             // case this leads to an additional thread being woken up but we risk a deadlock if we
590             // don't clear it.
591             clear |= DESIGNATED_WAKER;
592         }
593 
594         if let WaiterKind::Exclusive = waiter.kind() {
595             // The waiter being dropped is a writer so clear the writer waiting bit for now. If we
596             // found more writers in the list while fetching waiters to wake up then this bit will
597             // be set again via `set_on_release`.
598             clear |= WRITER_WAITING;
599         }
600 
601         while self
602             .state
603             .compare_exchange_weak(
604                 oldstate,
605                 (oldstate & !clear) | set_on_release,
606                 Ordering::Release,
607                 Ordering::Relaxed,
608             )
609             .is_err()
610         {
611             hint::spin_loop();
612             oldstate = self.state.load(Ordering::Relaxed);
613         }
614 
615         for w in wake_list {
616             w.wake();
617         }
618 
619         mem::drop(old_waiter);
620     }
621 }
622 
623 // TODO(b/315998194): Add safety comment
624 #[allow(clippy::undocumented_unsafe_blocks)]
625 unsafe impl Send for RawRwLock {}
626 // TODO(b/315998194): Add safety comment
627 #[allow(clippy::undocumented_unsafe_blocks)]
628 unsafe impl Sync for RawRwLock {}
629 
630 fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
631     let raw_rwlock = raw as *const RawRwLock;
632 
633     // SAFETY:
634     // Safe because the thread that owns the waiter that is being canceled must also own a reference
635     // to the rwlock, which ensures that this pointer is valid.
636     unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
637 }
638 
639 /// A high-level primitive that provides safe, mutable access to a shared resource.
640 ///
641 /// `RwLock` safely provides both shared, immutable access (via `read_lock()`) as well as exclusive,
642 /// mutable access (via `lock()`) to an underlying resource asynchronously while ensuring fairness
643 /// with no loss of performance. If you don't need `read_lock()` nor fairness, try upstream
644 /// `futures::lock::Mutex` instead.
645 ///
646 /// # Poisoning
647 ///
648 /// `RwLock` does not support lock poisoning so if a thread panics while holding the lock, the
649 /// poisoned data will be accessible by other threads in your program. If you need to guarantee that
650 /// other threads cannot access poisoned data then you may wish to wrap this `RwLock` inside another
651 /// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
652 /// example of this. Note `futures::lock::Mutex` does not support poisoning either.
653 ///
654 ///
655 /// # Fairness
656 ///
657 /// This `RwLock` implementation does not guarantee that threads will acquire the lock in the same
658 /// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
659 /// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
660 /// threads will fail to acquire the lock until the starved thread has acquired it. Note, on the
661 /// other hand, `futures::lock::Mutex` does not guarantee fairness.
662 ///
663 /// Similarly, this `RwLock` will attempt to balance reader and writer threads: once there is a
664 /// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
665 /// However, any reader threads that were already waiting will still be allowed to acquire it.
666 ///
667 /// # Examples
668 ///
669 /// ```edition2018
670 /// use std::sync::Arc;
671 /// use std::thread;
672 /// use std::sync::mpsc::channel;
673 ///
674 /// use cros_async::{block_on, sync::RwLock};
675 ///
676 /// const N: usize = 10;
677 ///
678 /// // Spawn a few threads to increment a shared variable (non-atomically), and
679 /// // let the main thread know once all increments are done.
680 /// //
681 /// // Here we're using an Arc to share memory among threads, and the data inside
682 /// // the Arc is protected with a rwlock.
683 /// let data = Arc::new(RwLock::new(0));
684 ///
685 /// let (tx, rx) = channel();
686 /// for _ in 0..N {
687 ///     let (data, tx) = (Arc::clone(&data), tx.clone());
688 ///     thread::spawn(move || {
689 ///         // The shared state can only be accessed once the lock is held.
690 ///         // Our non-atomic increment is safe because we're the only thread
691 ///         // which can access the shared state when the lock is held.
692 ///         let mut data = block_on(data.lock());
693 ///         *data += 1;
694 ///         if *data == N {
695 ///             tx.send(()).unwrap();
696 ///         }
697 ///         // the lock is unlocked here when `data` goes out of scope.
698 ///     });
699 /// }
700 ///
701 /// rx.recv().unwrap();
702 /// ```
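///
/// A minimal sketch of shared, read-only access via `read_lock()`; any number of read guards may
/// be held at the same time:
///
/// ```edition2018
/// use cros_async::{block_on, sync::RwLock};
///
/// let data = RwLock::new(vec![1, 2, 3]);
///
/// let a = block_on(data.read_lock());
/// let b = block_on(data.read_lock());
/// assert_eq!(a.len() + b.len(), 6);
/// ```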
703 #[repr(align(128))]
704 pub struct RwLock<T: ?Sized> {
705     raw: RawRwLock,
706     value: UnsafeCell<T>,
707 }
708 
709 impl<T> RwLock<T> {
710     /// Create a new, unlocked `RwLock` ready for use.
711     pub fn new(v: T) -> RwLock<T> {
712         RwLock {
713             raw: RawRwLock::new(),
714             value: UnsafeCell::new(v),
715         }
716     }
717 
718     /// Consume the `RwLock` and return the contained value. This method does not perform any
719     /// locking as the compiler will guarantee that there are no other references to `self` and the
720     /// caller owns the `RwLock`.
721     pub fn into_inner(self) -> T {
722         // Don't need to acquire the lock because the compiler guarantees that there are
723         // no references to `self`.
724         self.value.into_inner()
725     }
726 }
727 
728 impl<T: ?Sized> RwLock<T> {
729     /// Acquires exclusive, mutable access to the resource protected by the `RwLock`, blocking the
730     /// current thread until it is able to do so. Upon returning, the current thread will be the
731     /// only thread with access to the resource. The `RwLock` will be released when the returned
732     /// `RwLockWriteGuard` is dropped.
733     ///
734     /// Calling `lock()` while holding a `RwLockWriteGuard` or a `RwLockReadGuard` will cause a
735     /// deadlock.
736     ///
737     /// Callers that are not in an async context may wish to use the `block_on` method to block the
738     /// thread until the `RwLock` is acquired.
739     #[inline]
740     pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741         self.raw.lock().await;
742 
743         RwLockWriteGuard {
744             mu: self,
745             // SAFETY:
746             // Safe because we have exclusive access to `self.value`.
747             value: unsafe { &mut *self.value.get() },
748         }
749     }
750 
751     /// Acquires shared, immutable access to the resource protected by the `RwLock`, blocking the
752     /// current thread until it is able to do so. Upon returning there may be other threads that
753     /// also have immutable access to the resource but there will not be any threads that have
754     /// mutable access to the resource. When the returned `RwLockReadGuard` is dropped the thread
755     /// releases its access to the resource.
756     ///
757     /// Calling `read_lock()` while holding a `RwLockReadGuard` may deadlock. Calling `read_lock()`
758     /// while holding a `RwLockWriteGuard` will deadlock.
759     ///
760     /// Callers that are not in an async context may wish to use the `block_on` method to block the
761     /// thread until the `RwLock` is acquired.
762     #[inline]
763     pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764         self.raw.read_lock().await;
765 
766         RwLockReadGuard {
767             mu: self,
768             // SAFETY:
769             // Safe because we have shared read-only access to `self.value`.
770             value: unsafe { &*self.value.get() },
771         }
772     }
773 
774     // Called from `Condvar::wait` when the thread wants to reacquire the lock.
775     #[inline]
776     pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
777         self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
778 
779         RwLockWriteGuard {
780             mu: self,
781             // SAFETY:
782             // Safe because we have exclusive access to `self.value`.
783             value: unsafe { &mut *self.value.get() },
784         }
785     }
786 
787     // Like `lock_from_cv` but for acquiring a shared lock.
788     #[inline]
789     pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
790         // Threads that have waited in the Condvar's waiter list don't have to care if there is a
791         // writer waiting since they have already waited once.
792         self.raw
793             .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
794             .await;
795 
796         RwLockReadGuard {
797             mu: self,
798             // SAFETY:
799             // Safe because we have shared read-only access to `self.value`.
800             value: unsafe { &*self.value.get() },
801         }
802     }
803 
804     #[inline]
805     fn unlock(&self) {
806         self.raw.unlock();
807     }
808 
809     #[inline]
810     fn read_unlock(&self) {
811         self.raw.read_unlock();
812     }
813 
814     pub fn get_mut(&mut self) -> &mut T {
815         // SAFETY:
816         // Safe because the compiler statically guarantees there are no other references to `self`.
817         // This is also why we don't need to acquire the lock first.
818         unsafe { &mut *self.value.get() }
819     }
820 }
821 
822 // TODO(b/315998194): Add safety comment
823 #[allow(clippy::undocumented_unsafe_blocks)]
824 unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
825 // TODO(b/315998194): Add safety comment
826 #[allow(clippy::undocumented_unsafe_blocks)]
827 unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
828 
829 impl<T: Default> Default for RwLock<T> {
830     fn default() -> Self {
831         Self::new(Default::default())
832     }
833 }
834 
835 impl<T> From<T> for RwLock<T> {
836     fn from(source: T) -> Self {
837         Self::new(source)
838     }
839 }
840 
841 /// An RAII implementation of a "scoped exclusive lock" for a `RwLock`. When this structure is
842 /// dropped, the lock will be released. The resource protected by the `RwLock` can be accessed via
843 /// the `Deref` and `DerefMut` implementations of this structure.
844 pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
845     mu: &'a RwLock<T>,
846     value: &'a mut T,
847 }
848 
849 impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
850     pub(crate) fn into_inner(self) -> &'a RwLock<T> {
851         self.mu
852     }
853 
854     pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
855         &self.mu.raw
856     }
857 }
858 
859 impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> {
860     type Target = T;
861 
862     fn deref(&self) -> &Self::Target {
863         self.value
864     }
865 }
866 
867 impl<'a, T: ?Sized> DerefMut for RwLockWriteGuard<'a, T> {
868     fn deref_mut(&mut self) -> &mut Self::Target {
869         self.value
870     }
871 }
872 
873 impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> {
874     fn drop(&mut self) {
875         self.mu.unlock()
876     }
877 }
878 
879 /// An RAII implementation of a "scoped shared lock" for a `RwLock`. When this structure is dropped,
880 /// the lock will be released. The resource protected by the `RwLock` can be accessed via the
881 /// `Deref` implementation of this structure.
882 pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
883     mu: &'a RwLock<T>,
884     value: &'a T,
885 }
886 
887 impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
888     pub(crate) fn into_inner(self) -> &'a RwLock<T> {
889         self.mu
890     }
891 
892     pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
893         &self.mu.raw
894     }
895 }
896 
897 impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> {
898     type Target = T;
899 
900     fn deref(&self) -> &Self::Target {
901         self.value
902     }
903 }
904 
905 impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> {
906     fn drop(&mut self) {
907         self.mu.read_unlock()
908     }
909 }
910 
911 // TODO(b/194338842): Fix tests for windows
912 #[cfg(any(target_os = "android", target_os = "linux"))]
913 #[cfg(test)]
914 mod test {
915     use std::future::Future;
916     use std::mem;
917     use std::pin::Pin;
918     use std::rc::Rc;
919     use std::sync::atomic::AtomicUsize;
920     use std::sync::atomic::Ordering;
921     use std::sync::mpsc::channel;
922     use std::sync::mpsc::Sender;
923     use std::sync::Arc;
924     use std::task::Context;
925     use std::task::Poll;
926     use std::task::Waker;
927     use std::thread;
928     use std::time::Duration;
929 
930     use futures::channel::oneshot;
931     use futures::pending;
932     use futures::select;
933     use futures::task::waker_ref;
934     use futures::task::ArcWake;
935     use futures::FutureExt;
936     use futures_executor::LocalPool;
937     use futures_executor::ThreadPool;
938     use futures_util::task::LocalSpawnExt;
939 
940     use super::super::super::block_on;
941     use super::super::super::sync::Condvar;
942     use super::super::super::sync::SpinLock;
943     use super::*;
944 
945     #[derive(Debug, Eq, PartialEq)]
946     struct NonCopy(u32);
947 
948     // Dummy waker used when we want to manually drive futures.
949     struct TestWaker;
950     impl ArcWake for TestWaker {
951         fn wake_by_ref(_arc_self: &Arc<Self>) {}
952     }
953 
954     #[test]
955     fn it_works() {
956         let mu = RwLock::new(NonCopy(13));
957 
958         assert_eq!(*block_on(mu.lock()), NonCopy(13));
959     }
960 
961     #[test]
962     fn smoke() {
963         let mu = RwLock::new(NonCopy(7));
964 
965         mem::drop(block_on(mu.lock()));
966         mem::drop(block_on(mu.lock()));
967     }
968 
969     #[test]
970     fn rw_smoke() {
971         let mu = RwLock::new(NonCopy(7));
972 
973         mem::drop(block_on(mu.lock()));
974         mem::drop(block_on(mu.read_lock()));
975         mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
976         mem::drop(block_on(mu.lock()));
977     }
978 
979     #[test]
980     fn async_smoke() {
981         async fn lock(mu: Rc<RwLock<NonCopy>>) {
982             mu.lock().await;
983         }
984 
985         async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
986             mu.read_lock().await;
987         }
988 
989         async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
990             let first = mu.read_lock().await;
991             mu.read_lock().await;
992 
993             // Make sure first lives past the second read lock.
994             first.as_raw_rwlock();
995         }
996 
997         let mu = Rc::new(RwLock::new(NonCopy(7)));
998 
999         let mut ex = LocalPool::new();
1000         let spawner = ex.spawner();
1001 
1002         spawner
1003             .spawn_local(lock(Rc::clone(&mu)))
1004             .expect("Failed to spawn future");
1005         spawner
1006             .spawn_local(read_lock(Rc::clone(&mu)))
1007             .expect("Failed to spawn future");
1008         spawner
1009             .spawn_local(double_read_lock(Rc::clone(&mu)))
1010             .expect("Failed to spawn future");
1011         spawner
1012             .spawn_local(lock(Rc::clone(&mu)))
1013             .expect("Failed to spawn future");
1014 
1015         ex.run();
1016     }
1017 
1018     #[test]
1019     fn send() {
1020         let mu = RwLock::new(NonCopy(19));
1021 
1022         thread::spawn(move || {
1023             let value = block_on(mu.lock());
1024             assert_eq!(*value, NonCopy(19));
1025         })
1026         .join()
1027         .unwrap();
1028     }
1029 
1030     #[test]
1031     fn arc_nested() {
1032         // Tests nested rwlocks and access to underlying data.
1033         let mu = RwLock::new(1);
1034         let arc = Arc::new(RwLock::new(mu));
1035         thread::spawn(move || {
1036             let nested = block_on(arc.lock());
1037             let lock2 = block_on(nested.lock());
1038             assert_eq!(*lock2, 1);
1039         })
1040         .join()
1041         .unwrap();
1042     }
1043 
1044     #[test]
1045     fn arc_access_in_unwind() {
1046         let arc = Arc::new(RwLock::new(1));
1047         let arc2 = arc.clone();
1048         thread::spawn(move || {
1049             struct Unwinder {
1050                 i: Arc<RwLock<i32>>,
1051             }
1052             impl Drop for Unwinder {
1053                 fn drop(&mut self) {
1054                     *block_on(self.i.lock()) += 1;
1055                 }
1056             }
1057             let _u = Unwinder { i: arc2 };
1058             panic!();
1059         })
1060         .join()
1061         .expect_err("thread did not panic");
1062         let lock = block_on(arc.lock());
1063         assert_eq!(*lock, 2);
1064     }
1065 
1066     #[test]
1067     fn unsized_value() {
1068         let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
1069         {
1070             let b = &mut *block_on(rwlock.lock());
1071             b[0] = 4;
1072             b[2] = 5;
1073         }
1074         let expected: &[i32] = &[4, 2, 5];
1075         assert_eq!(&*block_on(rwlock.lock()), expected);
1076     }
1077     #[test]
1078     fn high_contention() {
1079         const THREADS: usize = 17;
1080         const ITERATIONS: usize = 103;
1081 
1082         let mut threads = Vec::with_capacity(THREADS);
1083 
1084         let mu = Arc::new(RwLock::new(0usize));
1085         for _ in 0..THREADS {
1086             let mu2 = mu.clone();
1087             threads.push(thread::spawn(move || {
1088                 for _ in 0..ITERATIONS {
1089                     *block_on(mu2.lock()) += 1;
1090                 }
1091             }));
1092         }
1093 
1094         for t in threads.into_iter() {
1095             t.join().unwrap();
1096         }
1097 
1098         assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
1099         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1100     }
1101 
1102     #[test]
1103     fn high_contention_with_cancel() {
1104         const TASKS: usize = 17;
1105         const ITERATIONS: usize = 103;
1106 
1107         async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1108             for _ in 0..ITERATIONS {
1109                 select! {
1110                     mut count = mu.lock().fuse() => *count += 1,
1111                     mut count = alt_mu.lock().fuse() => *count += 1,
1112                 }
1113             }
1114             tx.send(()).expect("Failed to send completion signal");
1115         }
1116 
1117         let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1118 
1119         let mu = Arc::new(RwLock::new(0usize));
1120         let alt_mu = Arc::new(RwLock::new(0usize));
1121 
1122         let (tx, rx) = channel();
1123         for _ in 0..TASKS {
1124             ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
1125         }
1126 
1127         for _ in 0..TASKS {
1128             if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
1129                 panic!("Error while waiting for threads to complete: {}", e);
1130             }
1131         }
1132 
1133         assert_eq!(
1134             *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
1135             TASKS * ITERATIONS
1136         );
1137         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1138         assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
1139     }
1140 
1141     #[test]
1142     fn single_thread_async() {
1143         const TASKS: usize = 17;
1144         const ITERATIONS: usize = 103;
1145 
1146         // Async closures are unstable.
1147         async fn increment(mu: Rc<RwLock<usize>>) {
1148             for _ in 0..ITERATIONS {
1149                 *mu.lock().await += 1;
1150             }
1151         }
1152 
1153         let mut ex = LocalPool::new();
1154         let spawner = ex.spawner();
1155 
1156         let mu = Rc::new(RwLock::new(0usize));
1157         for _ in 0..TASKS {
1158             spawner
1159                 .spawn_local(increment(Rc::clone(&mu)))
1160                 .expect("Failed to spawn task");
1161         }
1162 
1163         ex.run();
1164 
1165         assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1166         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1167     }
1168 
1169     #[test]
1170     fn multi_thread_async() {
1171         const TASKS: usize = 17;
1172         const ITERATIONS: usize = 103;
1173 
1174         // Async closures are unstable.
1175         async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1176             for _ in 0..ITERATIONS {
1177                 *mu.lock().await += 1;
1178             }
1179             tx.send(()).expect("Failed to send completion signal");
1180         }
1181 
1182         let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1183 
1184         let mu = Arc::new(RwLock::new(0usize));
1185         let (tx, rx) = channel();
1186         for _ in 0..TASKS {
1187             ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
1188         }
1189 
1190         for _ in 0..TASKS {
1191             rx.recv_timeout(Duration::from_secs(5))
1192                 .expect("Failed to receive completion signal");
1193         }
1194         assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1195         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1196     }
1197 
1198     #[test]
1199     fn get_mut() {
1200         let mut mu = RwLock::new(NonCopy(13));
1201         *mu.get_mut() = NonCopy(17);
1202 
1203         assert_eq!(mu.into_inner(), NonCopy(17));
1204     }
1205 
1206     #[test]
1207     fn into_inner() {
1208         let mu = RwLock::new(NonCopy(29));
1209         assert_eq!(mu.into_inner(), NonCopy(29));
1210     }
1211 
1212     #[test]
1213     fn into_inner_drop() {
1214         struct NeedsDrop(Arc<AtomicUsize>);
1215         impl Drop for NeedsDrop {
1216             fn drop(&mut self) {
1217                 self.0.fetch_add(1, Ordering::AcqRel);
1218             }
1219         }
1220 
1221         let value = Arc::new(AtomicUsize::new(0));
1222         let needs_drop = RwLock::new(NeedsDrop(value.clone()));
1223         assert_eq!(value.load(Ordering::Acquire), 0);
1224 
1225         {
1226             let inner = needs_drop.into_inner();
1227             assert_eq!(inner.0.load(Ordering::Acquire), 0);
1228         }
1229 
1230         assert_eq!(value.load(Ordering::Acquire), 1);
1231     }
1232 
1233     #[test]
1234     fn rw_arc() {
1235         const THREADS: isize = 7;
1236         const ITERATIONS: isize = 13;
1237 
1238         let mu = Arc::new(RwLock::new(0isize));
1239         let mu2 = mu.clone();
1240 
1241         let (tx, rx) = channel();
1242         thread::spawn(move || {
1243             let mut guard = block_on(mu2.lock());
1244             for _ in 0..ITERATIONS {
1245                 let tmp = *guard;
1246                 *guard = -1;
1247                 thread::yield_now();
1248                 *guard = tmp + 1;
1249             }
1250             tx.send(()).unwrap();
1251         });
1252 
1253         let mut readers = Vec::with_capacity(10);
1254         for _ in 0..THREADS {
1255             let mu3 = mu.clone();
1256             let handle = thread::spawn(move || {
1257                 let guard = block_on(mu3.read_lock());
1258                 assert!(*guard >= 0);
1259             });
1260 
1261             readers.push(handle);
1262         }
1263 
1264         // Wait for the readers to finish their checks.
1265         for r in readers {
1266             r.join().expect("One or more readers saw a negative value");
1267         }
1268 
1269         // Wait for the writer to finish.
1270         rx.recv_timeout(Duration::from_secs(5)).unwrap();
1271         assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1272         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1273     }
1274 
1275     #[test]
1276     fn rw_single_thread_async() {
1277         // A Future that returns `Poll::pending` the first time it is polled and `Poll::Ready` every
1278         // A Future that returns `Poll::Pending` the first time it is polled and `Poll::Ready` every
1279         struct TestFuture {
1280             polled: bool,
1281             waker: Arc<SpinLock<Option<Waker>>>,
1282         }
1283 
1284         impl Future for TestFuture {
1285             type Output = ();
1286 
1287             fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1288                 if self.polled {
1289                     Poll::Ready(())
1290                 } else {
1291                     self.polled = true;
1292                     *self.waker.lock() = Some(cx.waker().clone());
1293                     Poll::Pending
1294                 }
1295             }
1296         }
1297 
1298         fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
1299             loop {
1300                 if let Some(w) = waker.lock().take() {
1301                     w.wake();
1302                     return;
1303                 }
1304 
1305                 // This sleep cannot be moved into an else branch because we would end up holding
1306                 // the lock while sleeping due to rust's drop ordering rules.
1307                 thread::sleep(Duration::from_millis(10));
1308             }
1309         }
1310 
1311         async fn writer(mu: Rc<RwLock<isize>>) {
1312             let mut guard = mu.lock().await;
1313             for _ in 0..ITERATIONS {
1314                 let tmp = *guard;
1315                 *guard = -1;
1316                 let waker = Arc::new(SpinLock::new(None));
1317                 let waker2 = Arc::clone(&waker);
1318                 thread::spawn(move || wake_future(waker2));
1319                 let fut = TestFuture {
1320                     polled: false,
1321                     waker,
1322                 };
1323                 fut.await;
1324                 *guard = tmp + 1;
1325             }
1326         }
1327 
1328         async fn reader(mu: Rc<RwLock<isize>>) {
1329             let guard = mu.read_lock().await;
1330             assert!(*guard >= 0);
1331         }
1332 
1333         const TASKS: isize = 7;
1334         const ITERATIONS: isize = 13;
1335 
1336         let mu = Rc::new(RwLock::new(0isize));
1337         let mut ex = LocalPool::new();
1338         let spawner = ex.spawner();
1339 
1340         spawner
1341             .spawn_local(writer(Rc::clone(&mu)))
1342             .expect("Failed to spawn writer");
1343 
1344         for _ in 0..TASKS {
1345             spawner
1346                 .spawn_local(reader(Rc::clone(&mu)))
1347                 .expect("Failed to spawn reader");
1348         }
1349 
1350         ex.run();
1351 
1352         assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1353         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1354     }
1355 
1356     #[test]
1357     fn rw_multi_thread_async() {
1358         async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1359             let mut guard = mu.lock().await;
1360             for _ in 0..ITERATIONS {
1361                 let tmp = *guard;
1362                 *guard = -1;
1363                 thread::yield_now();
1364                 *guard = tmp + 1;
1365             }
1366 
1367             mem::drop(guard);
1368             tx.send(()).unwrap();
1369         }
1370 
1371         async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1372             let guard = mu.read_lock().await;
1373             assert!(*guard >= 0);
1374 
1375             mem::drop(guard);
1376             tx.send(()).expect("Failed to send completion message");
1377         }
1378 
1379         const TASKS: isize = 7;
1380         const ITERATIONS: isize = 13;
1381 
1382         let mu = Arc::new(RwLock::new(0isize));
1383         let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1384 
1385         let (txw, rxw) = channel();
1386         ex.spawn_ok(writer(Arc::clone(&mu), txw));
1387 
1388         let (txr, rxr) = channel();
1389         for _ in 0..TASKS {
1390             ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
1391         }
1392 
1393         // Wait for the readers to finish their checks.
1394         for _ in 0..TASKS {
1395             rxr.recv_timeout(Duration::from_secs(5))
1396                 .expect("Failed to receive completion message from reader");
1397         }
1398 
1399         // Wait for the writer to finish.
1400         rxw.recv_timeout(Duration::from_secs(5))
1401             .expect("Failed to receive completion message from writer");
1402 
1403         assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1404         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1405     }
1406 
1407     #[test]
1408     fn wake_all_readers() {
1409         async fn read(mu: Arc<RwLock<()>>) {
1410             let g = mu.read_lock().await;
1411             pending!();
1412             mem::drop(g);
1413         }
1414 
1415         async fn write(mu: Arc<RwLock<()>>) {
1416             mu.lock().await;
1417         }
1418 
1419         let mu = Arc::new(RwLock::new(()));
1420         let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
1421             Box::pin(read(mu.clone())),
1422             Box::pin(read(mu.clone())),
1423             Box::pin(read(mu.clone())),
1424             Box::pin(write(mu.clone())),
1425             Box::pin(read(mu.clone())),
1426         ];
1427         const NUM_READERS: usize = 4;
1428 
1429         let arc_waker = Arc::new(TestWaker);
1430         let waker = waker_ref(&arc_waker);
1431         let mut cx = Context::from_waker(&waker);
1432 
1433         // Acquire the lock so that the futures cannot get it.
1434         let g = block_on(mu.lock());
1435 
1436         for r in &mut futures {
1437             if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
1438                 panic!("future unexpectedly ready");
1439             }
1440         }
1441 
1442         assert_eq!(
1443             mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1444             HAS_WAITERS
1445         );
1446 
1447         assert_eq!(
1448             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1449             WRITER_WAITING
1450         );
1451 
1452         // Drop the lock. This should allow all readers to make progress. Since they already waited
1453         // once, they should ignore the WRITER_WAITING bit that is currently set.
1454         mem::drop(g);
1455         for r in &mut futures {
1456             if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
1457                 panic!("future unexpectedly ready");
1458             }
1459         }
1460 
1461         // Check that all readers were able to acquire the lock.
1462         assert_eq!(
1463             mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1464             READ_LOCK * NUM_READERS
1465         );
1466         assert_eq!(
1467             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1468             WRITER_WAITING
1469         );
1470 
1471         let mut needs_poll = None;
1472 
1473         // All the readers can now finish but the writer needs to be polled again.
1474         for (i, r) in futures.iter_mut().enumerate() {
1475             match r.as_mut().poll(&mut cx) {
1476                 Poll::Ready(()) => {}
1477                 Poll::Pending => {
1478                     if needs_poll.is_some() {
1479                         panic!("More than one future unable to complete");
1480                     }
1481                     needs_poll = Some(i);
1482                 }
1483             }
1484         }
1485 
1486         if futures[needs_poll.expect("Writer unexpectedly able to complete")]
1487             .as_mut()
1488             .poll(&mut cx)
1489             .is_pending()
1490         {
1491             panic!("Writer unable to complete");
1492         }
1493 
1494         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1495     }
1496 
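         // A writer that keeps losing the lock to the tight loop eventually sets LONG_WAIT, after
         // which the tight loop is held back until the writer has made progress.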
1497     #[test]
1498     fn long_wait() {
1499         async fn tight_loop(mu: Arc<RwLock<bool>>) {
1500             loop {
1501                 let ready = mu.lock().await;
1502                 if *ready {
1503                     break;
1504                 }
1505                 pending!();
1506             }
1507         }
1508 
1509         async fn mark_ready(mu: Arc<RwLock<bool>>) {
1510             *mu.lock().await = true;
1511         }
1512 
1513         let mu = Arc::new(RwLock::new(false));
1514         let mut tl = Box::pin(tight_loop(mu.clone()));
1515         let mut mark = Box::pin(mark_ready(mu.clone()));
1516 
1517         let arc_waker = Arc::new(TestWaker);
1518         let waker = waker_ref(&arc_waker);
1519         let mut cx = Context::from_waker(&waker);
1520 
1521         for _ in 0..=LONG_WAIT_THRESHOLD {
1522             if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1523                 panic!("tight_loop unexpectedly ready");
1524             }
1525 
1526             if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1527                 panic!("mark_ready unexpectedly ready");
1528             }
1529         }
1530 
1531         assert_eq!(
1532             mu.raw.state.load(Ordering::Relaxed),
1533             LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1534         );
1535 
1536         // This time the tight loop will fail to acquire the lock.
1537         if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1538             panic!("tight_loop unexpectedly ready");
1539         }
1540 
1541         // Which will finally allow the mark_ready function to make progress.
1542         if mark.as_mut().poll(&mut cx).is_pending() {
1543             panic!("mark_ready not able to make progress");
1544         }
1545 
1546         // Now the tight loop will finish.
1547         if tl.as_mut().poll(&mut cx).is_pending() {
1548             panic!("tight_loop not able to finish");
1549         }
1550 
1551         assert!(*block_on(mu.lock()));
1552         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1553     }
1554 
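         // Dropping the future that set LONG_WAIT before it is woken should clear all waiter
         // state, leaving only LOCKED set by the remaining lock holder.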
1555     #[test]
1556     fn cancel_long_wait_before_wake() {
1557         async fn tight_loop(mu: Arc<RwLock<bool>>) {
1558             loop {
1559                 let ready = mu.lock().await;
1560                 if *ready {
1561                     break;
1562                 }
1563                 pending!();
1564             }
1565         }
1566 
1567         async fn mark_ready(mu: Arc<RwLock<bool>>) {
1568             *mu.lock().await = true;
1569         }
1570 
1571         let mu = Arc::new(RwLock::new(false));
1572         let mut tl = Box::pin(tight_loop(mu.clone()));
1573         let mut mark = Box::pin(mark_ready(mu.clone()));
1574 
1575         let arc_waker = Arc::new(TestWaker);
1576         let waker = waker_ref(&arc_waker);
1577         let mut cx = Context::from_waker(&waker);
1578 
1579         for _ in 0..=LONG_WAIT_THRESHOLD {
1580             if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1581                 panic!("tight_loop unexpectedly ready");
1582             }
1583 
1584             if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1585                 panic!("mark_ready unexpectedly ready");
1586             }
1587         }
1588 
1589         assert_eq!(
1590             mu.raw.state.load(Ordering::Relaxed),
1591             LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1592         );
1593 
1594         // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
1595         mem::drop(mark);
1596         assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);
1597 
1598         mem::drop(tl);
1599         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1600     }
1601 
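         // Dropping the future that set LONG_WAIT after it has been woken should still clear the
         // LONG_WAIT bit so that other threads can acquire the lock again.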
1602     #[test]
1603     fn cancel_long_wait_after_wake() {
1604         async fn tight_loop(mu: Arc<RwLock<bool>>) {
1605             loop {
1606                 let ready = mu.lock().await;
1607                 if *ready {
1608                     break;
1609                 }
1610                 pending!();
1611             }
1612         }
1613 
1614         async fn mark_ready(mu: Arc<RwLock<bool>>) {
1615             *mu.lock().await = true;
1616         }
1617 
1618         let mu = Arc::new(RwLock::new(false));
1619         let mut tl = Box::pin(tight_loop(mu.clone()));
1620         let mut mark = Box::pin(mark_ready(mu.clone()));
1621 
1622         let arc_waker = Arc::new(TestWaker);
1623         let waker = waker_ref(&arc_waker);
1624         let mut cx = Context::from_waker(&waker);
1625 
1626         for _ in 0..=LONG_WAIT_THRESHOLD {
1627             if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1628                 panic!("tight_loop unexpectedly ready");
1629             }
1630 
1631             if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
1632                 panic!("mark_ready unexpectedly ready");
1633             }
1634         }
1635 
1636         assert_eq!(
1637             mu.raw.state.load(Ordering::Relaxed),
1638             LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
1639         );
1640 
1641         // This time the tight loop will fail to acquire the lock.
1642         if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
1643             panic!("tight_loop unexpectedly ready");
1644         }
1645 
1646         // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
1647         mem::drop(mark);
1648         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);
1649 
1650         // Since the lock is not held, we can run a new mark_ready future to completion, setting the ready flag.
1651         block_on(mark_ready(mu.clone()));
1652 
1653         // Now the tight loop will finish.
1654         if tl.as_mut().poll(&mut cx).is_pending() {
1655             panic!("tight_loop not able to finish");
1656         }
1657 
1658         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1659     }
1660 
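         // DESIGNATED_WAKER should be set once a waiter has been woken, and a late-arriving
         // future can still acquire the lock before the woken waiter gets a chance to run.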
1661     #[test]
1662     fn designated_waker() {
1663         async fn inc(mu: Arc<RwLock<usize>>) {
1664             *mu.lock().await += 1;
1665         }
1666 
1667         let mu = Arc::new(RwLock::new(0));
1668 
1669         let mut futures = [
1670             Box::pin(inc(mu.clone())),
1671             Box::pin(inc(mu.clone())),
1672             Box::pin(inc(mu.clone())),
1673         ];
1674 
1675         let arc_waker = Arc::new(TestWaker);
1676         let waker = waker_ref(&arc_waker);
1677         let mut cx = Context::from_waker(&waker);
1678 
1679         let count = block_on(mu.lock());
1680 
1681         // Poll 2 futures. Since neither will be able to acquire the lock, they should get added to
1682         // the waiter list.
1683         if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
1684             panic!("future unexpectedly ready");
1685         }
1686         if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
1687             panic!("future unexpectedly ready");
1688         }
1689 
1690         assert_eq!(
1691             mu.raw.state.load(Ordering::Relaxed),
1692             LOCKED | HAS_WAITERS | WRITER_WAITING,
1693         );
1694 
1695         // Now drop the lock. This should set the DESIGNATED_WAKER bit and wake up the first future
1696         // in the wait list.
1697         mem::drop(count);
1698 
1699         assert_eq!(
1700             mu.raw.state.load(Ordering::Relaxed),
1701             DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
1702         );
1703 
1704         // Now poll the third future.  It should be able to acquire the lock immediately.
1705         if futures[2].as_mut().poll(&mut cx).is_pending() {
1706             panic!("future unable to complete");
1707         }
1708         assert_eq!(*block_on(mu.lock()), 1);
1709 
1710         // There should still be a waiter in the wait list and the DESIGNATED_WAKER bit should still
1711         // be set.
1712         assert_eq!(
1713             mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
1714             DESIGNATED_WAKER
1715         );
1716         assert_eq!(
1717             mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1718             HAS_WAITERS
1719         );
1720 
1721         // Now let the future that was woken up run.
1722         if futures[0].as_mut().poll(&mut cx).is_pending() {
1723             panic!("future unable to complete");
1724         }
1725         assert_eq!(*block_on(mu.lock()), 2);
1726 
1727         if futures[1].as_mut().poll(&mut cx).is_pending() {
1728             panic!("future unable to complete");
1729         }
1730         assert_eq!(*block_on(mu.lock()), 3);
1731 
1732         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1733     }
1734 
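         // Dropping a future after it has been woken but before it is polled again should clear
         // all rwlock state, including DESIGNATED_WAKER.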
1735     #[test]
1736     fn cancel_designated_waker() {
1737         async fn inc(mu: Arc<RwLock<usize>>) {
1738             *mu.lock().await += 1;
1739         }
1740 
1741         let mu = Arc::new(RwLock::new(0));
1742 
1743         let mut fut = Box::pin(inc(mu.clone()));
1744 
1745         let arc_waker = Arc::new(TestWaker);
1746         let waker = waker_ref(&arc_waker);
1747         let mut cx = Context::from_waker(&waker);
1748 
1749         let count = block_on(mu.lock());
1750 
1751         if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
1752             panic!("Future unexpectedly ready when lock is held");
1753         }
1754 
1755         // Drop the lock.  This will wake up the future.
1756         mem::drop(count);
1757 
1758         // Now drop the future without polling. This should clear all the state in the rwlock.
1759         mem::drop(fut);
1760 
1761         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1762     }
1763 
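         // Dropping a queued writer before it is woken should remove it from the wait queue and
         // clear WRITER_WAITING without waking the other waiter.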
1764     #[test]
1765     fn cancel_before_wake() {
1766         async fn inc(mu: Arc<RwLock<usize>>) {
1767             *mu.lock().await += 1;
1768         }
1769 
1770         let mu = Arc::new(RwLock::new(0));
1771 
1772         let mut fut1 = Box::pin(inc(mu.clone()));
1773 
1774         let mut fut2 = Box::pin(inc(mu.clone()));
1775 
1776         let arc_waker = Arc::new(TestWaker);
1777         let waker = waker_ref(&arc_waker);
1778         let mut cx = Context::from_waker(&waker);
1779 
1780         // First acquire the lock.
1781         let count = block_on(mu.lock());
1782 
1783         // Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1784         // list.
1785         match fut1.as_mut().poll(&mut cx) {
1786             Poll::Pending => {}
1787             Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1788         }
1789 
1790         match fut2.as_mut().poll(&mut cx) {
1791             Poll::Pending => {}
1792             Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1793         }
1794 
1795         assert_eq!(
1796             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1797             WRITER_WAITING
1798         );
1799 
1800         // Drop fut1.  This should remove it from the waiter list but shouldn't wake fut2.
1801         mem::drop(fut1);
1802 
1803         // There should be no designated waker.
1804         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);
1805 
1806         // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1807         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1808 
1809         match fut2.as_mut().poll(&mut cx) {
1810             Poll::Pending => {}
1811             Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1812         }
1813 
1814         // Now drop the lock.  This should mark fut2 as ready to make progress.
1815         mem::drop(count);
1816 
1817         match fut2.as_mut().poll(&mut cx) {
1818             Poll::Pending => panic!("Future is not ready to make progress"),
1819             Poll::Ready(()) => {}
1820         }
1821 
1822         // Verify that we only incremented the count once.
1823         assert_eq!(*block_on(mu.lock()), 1);
1824         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1825     }
1826 
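         // Dropping a woken writer should hand the wake-up off to the next waiter:
         // DESIGNATED_WAKER stays set while WRITER_WAITING is cleared.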
1827     #[test]
1828     fn cancel_after_wake() {
1829         async fn inc(mu: Arc<RwLock<usize>>) {
1830             *mu.lock().await += 1;
1831         }
1832 
1833         let mu = Arc::new(RwLock::new(0));
1834 
1835         let mut fut1 = Box::pin(inc(mu.clone()));
1836 
1837         let mut fut2 = Box::pin(inc(mu.clone()));
1838 
1839         let arc_waker = Arc::new(TestWaker);
1840         let waker = waker_ref(&arc_waker);
1841         let mut cx = Context::from_waker(&waker);
1842 
1843         // First acquire the lock.
1844         let count = block_on(mu.lock());
1845 
1846         // Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1847         // list.
1848         match fut1.as_mut().poll(&mut cx) {
1849             Poll::Pending => {}
1850             Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1851         }
1852 
1853         match fut2.as_mut().poll(&mut cx) {
1854             Poll::Pending => {}
1855             Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1856         }
1857 
1858         assert_eq!(
1859             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1860             WRITER_WAITING
1861         );
1862 
1863         // Drop the lock.  This should mark fut1 as ready to make progress.
1864         mem::drop(count);
1865 
1866         // Now drop fut1.  This should make fut2 ready to make progress.
1867         mem::drop(fut1);
1868 
1869         // Since there was still another waiter in the list we shouldn't have cleared the
1870         // DESIGNATED_WAKER bit.
1871         assert_eq!(
1872             mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
1873             DESIGNATED_WAKER
1874         );
1875 
1876         // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1877         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1878 
1879         match fut2.as_mut().poll(&mut cx) {
1880             Poll::Pending => panic!("Future is not ready to make progress"),
1881             Poll::Ready(()) => {}
1882         }
1883 
1884         // Verify that we only incremented the count once.
1885         assert_eq!(*block_on(mu.lock()), 1);
1886         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1887     }
1888 
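         // A waiter that gives up (here when the oneshot "timer" fires first) should be removed
         // cleanly from the wait queue without ever acquiring the lock.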
1889     #[test]
1890     fn timeout() {
1891         async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
1892             select! {
1893                 res = timer.fuse() => {
1894                     match res {
1895                         Ok(()) => {},
1896                         Err(e) => panic!("Timer unexpectedly canceled: {}", e),
1897                     }
1898                 }
1899                 _ = mu.lock().fuse() => panic!("Successfully acquired lock"),
1900             }
1901         }
1902 
1903         let mu = Arc::new(RwLock::new(()));
1904         let (tx, rx) = oneshot::channel();
1905 
1906         let mut timeout = Box::pin(timed_lock(rx, mu.clone()));
1907 
1908         let arc_waker = Arc::new(TestWaker);
1909         let waker = waker_ref(&arc_waker);
1910         let mut cx = Context::from_waker(&waker);
1911 
1912         // Acquire the lock.
1913         let g = block_on(mu.lock());
1914 
1915         // Poll the future.
1916         if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
1917             panic!("timed_lock unexpectedly ready");
1918         }
1919 
1920         assert_eq!(
1921             mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
1922             HAS_WAITERS
1923         );
1924 
1925         // Signal the channel, which should cancel the lock.
1926         tx.send(()).expect("Failed to send wakeup");
1927 
1928         // Now the future should have completed without acquiring the lock.
1929         if timeout.as_mut().poll(&mut cx).is_pending() {
1930             panic!("timed_lock not ready after timeout");
1931         }
1932 
1933         // The rwlock state should not show any waiters.
1934         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
1935 
1936         mem::drop(g);
1937 
1938         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1939     }
1940 
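         // A queued writer sets WRITER_WAITING, which keeps new readers from acquiring the lock
         // until the existing readers and the writer have finished.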
1941     #[test]
1942     fn writer_waiting() {
1943         async fn read_zero(mu: Arc<RwLock<usize>>) {
1944             let val = mu.read_lock().await;
1945             pending!();
1946 
1947             assert_eq!(*val, 0);
1948         }
1949 
1950         async fn inc(mu: Arc<RwLock<usize>>) {
1951             *mu.lock().await += 1;
1952         }
1953 
1954         async fn read_one(mu: Arc<RwLock<usize>>) {
1955             let val = mu.read_lock().await;
1956 
1957             assert_eq!(*val, 1);
1958         }
1959 
1960         let mu = Arc::new(RwLock::new(0));
1961 
1962         let mut r1 = Box::pin(read_zero(mu.clone()));
1963         let mut r2 = Box::pin(read_zero(mu.clone()));
1964 
1965         let mut w = Box::pin(inc(mu.clone()));
1966         let mut r3 = Box::pin(read_one(mu.clone()));
1967 
1968         let arc_waker = Arc::new(TestWaker);
1969         let waker = waker_ref(&arc_waker);
1970         let mut cx = Context::from_waker(&waker);
1971 
1972         if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
1973             panic!("read_zero unexpectedly ready");
1974         }
1975         if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
1976             panic!("read_zero unexpectedly ready");
1977         }
1978         assert_eq!(
1979             mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1980             2 * READ_LOCK
1981         );
1982 
1983         if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
1984             panic!("inc unexpectedly ready");
1985         }
1986         assert_eq!(
1987             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1988             WRITER_WAITING
1989         );
1990 
1991         // The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
1992         if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
1993             panic!("read_one unexpectedly ready");
1994         }
1995         assert_eq!(
1996             mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
1997             2 * READ_LOCK
1998         );
1999 
2000         if r1.as_mut().poll(&mut cx).is_pending() {
2001             panic!("read_zero unable to complete");
2002         }
2003         if r2.as_mut().poll(&mut cx).is_pending() {
2004             panic!("read_zero unable to complete");
2005         }
2006         if w.as_mut().poll(&mut cx).is_pending() {
2007             panic!("inc unable to complete");
2008         }
2009         if r3.as_mut().poll(&mut cx).is_pending() {
2010             panic!("read_one unable to complete");
2011         }
2012 
2013         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2014     }
2015 
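         // Readers and a writer waiting on a Condvar: once notified, they requeue on the rwlock
         // and all complete after the exclusive guard is dropped.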
2016     #[test]
2017     fn notify_one() {
2018         async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2019             let mut count = mu.read_lock().await;
2020             while *count == 0 {
2021                 count = cv.wait_read(count).await;
2022             }
2023         }
2024 
2025         async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2026             let mut count = mu.lock().await;
2027             while *count == 0 {
2028                 count = cv.wait(count).await;
2029             }
2030 
2031             *count -= 1;
2032         }
2033 
2034         let mu = Arc::new(RwLock::new(0));
2035         let cv = Arc::new(Condvar::new());
2036 
2037         let arc_waker = Arc::new(TestWaker);
2038         let waker = waker_ref(&arc_waker);
2039         let mut cx = Context::from_waker(&waker);
2040 
2041         let mut readers = [
2042             Box::pin(read(mu.clone(), cv.clone())),
2043             Box::pin(read(mu.clone(), cv.clone())),
2044             Box::pin(read(mu.clone(), cv.clone())),
2045             Box::pin(read(mu.clone(), cv.clone())),
2046         ];
2047         let mut writer = Box::pin(write(mu.clone(), cv.clone()));
2048 
2049         for r in &mut readers {
2050             if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
2051                 panic!("reader unexpectedly ready");
2052             }
2053         }
2054         if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
2055             panic!("writer unexpectedly ready");
2056         }
2057 
2058         let mut count = block_on(mu.lock());
2059         *count = 1;
2060 
2061         // This should wake all readers + one writer.
2062         cv.notify_one();
2063 
2064         // Poll the readers and the writer so they add themselves to the rwlock's waiter list.
2065         for r in &mut readers {
2066             if r.as_mut().poll(&mut cx).is_ready() {
2067                 panic!("reader unexpectedly ready");
2068             }
2069         }
2070 
2071         if writer.as_mut().poll(&mut cx).is_ready() {
2072             panic!("writer unexpectedly ready");
2073         }
2074 
2075         assert_eq!(
2076             mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
2077             HAS_WAITERS
2078         );
2079         assert_eq!(
2080             mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
2081             WRITER_WAITING
2082         );
2083 
2084         mem::drop(count);
2085 
2086         assert_eq!(
2087             mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
2088             HAS_WAITERS | WRITER_WAITING
2089         );
2090 
2091         for r in &mut readers {
2092             if r.as_mut().poll(&mut cx).is_pending() {
2093                 panic!("reader unable to complete");
2094             }
2095         }
2096 
2097         if writer.as_mut().poll(&mut cx).is_pending() {
2098             panic!("writer unable to complete");
2099         }
2100 
2101         assert_eq!(*block_on(mu.read_lock()), 0);
2102     }
2103 
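         // Notifying the Condvar while the lock is unlocked: each woken future should acquire the
         // lock directly when polled instead of queueing on it.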
2104     #[test]
2105     fn notify_when_unlocked() {
2106         async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2107             let mut count = mu.lock().await;
2108 
2109             while *count == 0 {
2110                 count = cv.wait(count).await;
2111             }
2112 
2113             *count -= 1;
2114         }
2115 
2116         let mu = Arc::new(RwLock::new(0));
2117         let cv = Arc::new(Condvar::new());
2118 
2119         let arc_waker = Arc::new(TestWaker);
2120         let waker = waker_ref(&arc_waker);
2121         let mut cx = Context::from_waker(&waker);
2122 
2123         let mut futures = [
2124             Box::pin(dec(mu.clone(), cv.clone())),
2125             Box::pin(dec(mu.clone(), cv.clone())),
2126             Box::pin(dec(mu.clone(), cv.clone())),
2127             Box::pin(dec(mu.clone(), cv.clone())),
2128         ];
2129 
2130         for f in &mut futures {
2131             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2132                 panic!("future unexpectedly ready");
2133             }
2134         }
2135 
2136         *block_on(mu.lock()) = futures.len();
2137         cv.notify_all();
2138 
2139         // Since we haven't polled `futures` yet, the rwlock should not have any waiters.
2140         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
2141 
2142         for f in &mut futures {
2143             if f.as_mut().poll(&mut cx).is_pending() {
2144                 panic!("future unexpectedly ready");
2145             }
2146         }
2147         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2148     }
2149 
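         // notify_all with a mix of readers and a writer waiting: the readers acquire the lock
         // together while the writer completes only after they drop it.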
2150     #[test]
2151     fn notify_reader_writer() {
2152         async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2153             let mut count = mu.read_lock().await;
2154             while *count == 0 {
2155                 count = cv.wait_read(count).await;
2156             }
2157 
2158             // Yield once while holding the read lock, which should prevent the writer from waking
2159             // up.
2160             pending!();
2161         }
2162 
2163         async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2164             let mut count = mu.lock().await;
2165             while *count == 0 {
2166                 count = cv.wait(count).await;
2167             }
2168 
2169             *count -= 1;
2170         }
2171 
2172         async fn lock(mu: Arc<RwLock<usize>>) {
2173             mem::drop(mu.lock().await);
2174         }
2175 
2176         let mu = Arc::new(RwLock::new(0));
2177         let cv = Arc::new(Condvar::new());
2178 
2179         let arc_waker = Arc::new(TestWaker);
2180         let waker = waker_ref(&arc_waker);
2181         let mut cx = Context::from_waker(&waker);
2182 
2183         let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
2184             Box::pin(read(mu.clone(), cv.clone())),
2185             Box::pin(read(mu.clone(), cv.clone())),
2186             Box::pin(read(mu.clone(), cv.clone())),
2187             Box::pin(write(mu.clone(), cv.clone())),
2188             Box::pin(read(mu.clone(), cv.clone())),
2189         ];
2190         const NUM_READERS: usize = 4;
2191 
2192         let mut l = Box::pin(lock(mu.clone()));
2193 
2194         for f in &mut futures {
2195             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2196                 panic!("future unexpectedly ready");
2197             }
2198         }
2199 
2200         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2201 
2202         let mut count = block_on(mu.lock());
2203         *count = 1;
2204 
2205         // Now poll the lock function. Since the lock is held by us, it will get queued on the
2206         // waiter list.
2207         if let Poll::Ready(()) = l.as_mut().poll(&mut cx) {
2208             panic!("lock() unexpectedly ready");
2209         }
2210 
2211         assert_eq!(
2212             mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
2213             HAS_WAITERS | WRITER_WAITING
2214         );
2215 
2216         // Wake up waiters while holding the lock.
2217         cv.notify_all();
2218 
2219         // Drop the lock.  This should wake up the lock function.
2220         mem::drop(count);
2221 
2222         if l.as_mut().poll(&mut cx).is_pending() {
2223             panic!("lock() unable to complete");
2224         }
2225 
2226         // Since we haven't re-polled `futures` since the notify, the rwlock state should now be empty.
2227         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2228 
2229         // Poll everything again. The readers should be able to make progress (but not complete),
2230         // while the writer should remain blocked.
2231         for f in &mut futures {
2232             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2233                 panic!("future unexpectedly ready");
2234             }
2235         }
2236 
2237         assert_eq!(
2238             mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2239             READ_LOCK * NUM_READERS
2240         );
2241 
2242         // All the readers can now finish but the writer needs to be polled again.
2243         let mut needs_poll = None;
2244         for (i, r) in futures.iter_mut().enumerate() {
2245             match r.as_mut().poll(&mut cx) {
2246                 Poll::Ready(()) => {}
2247                 Poll::Pending => {
2248                     if needs_poll.is_some() {
2249                         panic!("More than one future unable to complete");
2250                     }
2251                     needs_poll = Some(i);
2252                 }
2253             }
2254         }
2255 
2256         if futures[needs_poll.expect("Writer unexpectedly able to complete")]
2257             .as_mut()
2258             .poll(&mut cx)
2259             .is_pending()
2260         {
2261             panic!("Writer unable to complete");
2262         }
2263 
2264         assert_eq!(*block_on(mu.lock()), 0);
2265         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2266     }
2267 
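         // Notifying while holding a read lock: the woken readers should share the lock
         // immediately rather than being queued behind it.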
2268     #[test]
2269     fn notify_readers_with_read_lock() {
2270         async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
2271             let mut count = mu.read_lock().await;
2272             while *count == 0 {
2273                 count = cv.wait_read(count).await;
2274             }
2275 
2276             // Yield once while holding the read lock.
2277             pending!();
2278         }
2279 
2280         let mu = Arc::new(RwLock::new(0));
2281         let cv = Arc::new(Condvar::new());
2282 
2283         let arc_waker = Arc::new(TestWaker);
2284         let waker = waker_ref(&arc_waker);
2285         let mut cx = Context::from_waker(&waker);
2286 
2287         let mut futures = [
2288             Box::pin(read(mu.clone(), cv.clone())),
2289             Box::pin(read(mu.clone(), cv.clone())),
2290             Box::pin(read(mu.clone(), cv.clone())),
2291             Box::pin(read(mu.clone(), cv.clone())),
2292         ];
2293 
2294         for f in &mut futures {
2295             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2296                 panic!("future unexpectedly ready");
2297             }
2298         }
2299 
2300         // Increment the count and then grab a read lock.
2301         *block_on(mu.lock()) = 1;
2302 
2303         let g = block_on(mu.read_lock());
2304 
2305         // Notify the condvar while holding the read lock. This should wake up all the waiters.
2306         cv.notify_all();
2307 
2308         // Since the lock is held in shared mode, all the readers should immediately be able to
2309         // acquire the read lock.
2310         for f in &mut futures {
2311             if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
2312                 panic!("future unexpectedly ready");
2313             }
2314         }
2315         assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);
2316         assert_eq!(
2317             mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
2318             READ_LOCK * (futures.len() + 1)
2319         );
2320 
2321         mem::drop(g);
2322 
2323         for f in &mut futures {
2324             if f.as_mut().poll(&mut cx).is_pending() {
2325                 panic!("future unable to complete");
2326             }
2327         }
2328 
2329         assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
2330     }
2331 }
2332