1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::cell::UnsafeCell;
6 use std::hint;
7 use std::mem;
8 use std::ops::Deref;
9 use std::ops::DerefMut;
10 use std::sync::atomic::AtomicUsize;
11 use std::sync::atomic::Ordering;
12 use std::sync::Arc;
13 use std::thread::yield_now;
14
15 use super::super::sync::waiter::Kind as WaiterKind;
16 use super::super::sync::waiter::Waiter;
17 use super::super::sync::waiter::WaiterAdapter;
18 use super::super::sync::waiter::WaiterList;
19 use super::super::sync::waiter::WaitingFor;
20
// Layout of the rwlock state word: the low 8 bits are flags, all higher bits count the threads
// that currently hold a shared lock.

// Bit 0: set while the rwlock is exclusively locked.
const LOCKED: usize = 1;
// Bit 1: set while one or more threads are waiting to acquire the lock.
const HAS_WAITERS: usize = 1 << 1;
// Bit 2: set when a thread has been woken up from the wait queue. Cleared when that thread either
// acquires the lock or adds itself back into the wait queue. Prevents unnecessary wake ups while a
// thread has been removed from the wait queue but has not gotten CPU time yet.
const DESIGNATED_WAKER: usize = 1 << 2;
// Bit 3: provides exclusive access to the `waiters` field in `RawRwLock`. Should only be held
// while modifying the waiter list.
const SPINLOCK: usize = 1 << 3;
// Bit 4: set when a thread that wants an exclusive lock adds itself to the wait queue. New threads
// attempting to acquire a shared lock are prevented from getting it while this bit is set.
// However, this bit is ignored once a thread has gone through the wait queue at least once.
const WRITER_WAITING: usize = 1 << 4;
// Bit 5: set when a thread has gone through the wait queue many times but has failed to acquire
// the lock every time it was woken up. While this bit is set, all other threads are prevented from
// acquiring the lock until the thread that set `LONG_WAIT` has acquired it.
const LONG_WAIT: usize = 1 << 5;
// The increment added to the state to acquire a shared lock. Since more than one thread can hold a
// shared lock, a single bit is not enough; every bit from this one upwards is used to count the
// threads that have acquired a shared lock.
const READ_LOCK: usize = 1 << 8;
// Mask selecting the shared-lock counter, i.e. everything at or above `READ_LOCK`. Non-zero when
// any thread currently holds a shared lock.
const READ_MASK: usize = !(READ_LOCK - 1);

// The number of times a thread should just spin and attempt to re-acquire the lock before it
// starts yielding.
const SPIN_THRESHOLD: usize = 7;

// The number of times a thread needs to go through the wait queue before it sets the `LONG_WAIT`
// bit and forces all other threads to wait for it to acquire the lock. This value is set
// relatively high so that we don't lose the benefit of having running threads unless it is
// absolutely necessary.
const LONG_WAIT_THRESHOLD: usize = 19;
55
// Common methods between shared and exclusive locks.
//
// Every method is an associated function (no `self`): implementors are marker types that only
// select which state bits and which waiter kind the generic locking code should use.
trait Kind {
    // The bits that must be zero for the thread to acquire this kind of lock. If any of these bits
    // are not zero then the thread will first spin and retry a few times before adding itself to
    // the wait queue.
    fn zero_to_acquire() -> usize;

    // The bit that must be added in order to acquire this kind of lock. This should either be
    // `LOCKED` or `READ_LOCK`.
    fn add_to_acquire() -> usize;

    // The bits that should be set when a thread adds itself to the wait queue while waiting to
    // acquire this kind of lock.
    fn set_when_waiting() -> usize;

    // The bits that should be cleared when a thread acquires this kind of lock.
    fn clear_on_acquire() -> usize;

    // The waiter that a thread should use when waiting to acquire this kind of lock. `raw` is the
    // lock the waiter will be queued on.
    fn new_waiter(raw: &RawRwLock) -> Arc<Waiter>;
}
77
78 // A lock type for shared read-only access to the data. More than one thread may hold this kind of
79 // lock simultaneously.
80 struct Shared;
81
82 impl Kind for Shared {
zero_to_acquire() -> usize83 fn zero_to_acquire() -> usize {
84 LOCKED | WRITER_WAITING | LONG_WAIT
85 }
86
add_to_acquire() -> usize87 fn add_to_acquire() -> usize {
88 READ_LOCK
89 }
90
set_when_waiting() -> usize91 fn set_when_waiting() -> usize {
92 0
93 }
94
clear_on_acquire() -> usize95 fn clear_on_acquire() -> usize {
96 0
97 }
98
new_waiter(raw: &RawRwLock) -> Arc<Waiter>99 fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
100 Arc::new(Waiter::new(
101 WaiterKind::Shared,
102 cancel_waiter,
103 raw as *const RawRwLock as usize,
104 WaitingFor::Mutex,
105 ))
106 }
107 }
108
109 // A lock type for mutually exclusive read-write access to the data. Only one thread can hold this
110 // kind of lock at a time.
111 struct Exclusive;
112
113 impl Kind for Exclusive {
zero_to_acquire() -> usize114 fn zero_to_acquire() -> usize {
115 LOCKED | READ_MASK | LONG_WAIT
116 }
117
add_to_acquire() -> usize118 fn add_to_acquire() -> usize {
119 LOCKED
120 }
121
set_when_waiting() -> usize122 fn set_when_waiting() -> usize {
123 WRITER_WAITING
124 }
125
clear_on_acquire() -> usize126 fn clear_on_acquire() -> usize {
127 WRITER_WAITING
128 }
129
new_waiter(raw: &RawRwLock) -> Arc<Waiter>130 fn new_waiter(raw: &RawRwLock) -> Arc<Waiter> {
131 Arc::new(Waiter::new(
132 WaiterKind::Exclusive,
133 cancel_waiter,
134 raw as *const RawRwLock as usize,
135 WaitingFor::Mutex,
136 ))
137 }
138 }
139
140 // Scan `waiters` and return the ones that should be woken up. Also returns any bits that should be
141 // set in the rwlock state when the current thread releases the spin lock protecting the waiter
142 // list.
143 //
144 // If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
145 // waiting for a shared lock are also woken up. If any waiters waiting for an exclusive lock are
146 // found when iterating through the list, then the returned `usize` contains the `WRITER_WAITING`
147 // bit, which should be set when the thread releases the spin lock.
148 //
149 // If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
150 // no bits are set in the returned `usize`.
get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize)151 fn get_wake_list(waiters: &mut WaiterList) -> (WaiterList, usize) {
152 let mut to_wake = WaiterList::new(WaiterAdapter::new());
153 let mut set_on_release = 0;
154 let mut cursor = waiters.front_mut();
155
156 let mut waking_readers = false;
157 while let Some(w) = cursor.get() {
158 match w.kind() {
159 WaiterKind::Exclusive if !waking_readers => {
160 // This is the first waiter and it's a writer. No need to check the other waiters.
161 let waiter = cursor.remove().unwrap();
162 waiter.set_waiting_for(WaitingFor::None);
163 to_wake.push_back(waiter);
164 break;
165 }
166
167 WaiterKind::Shared => {
168 // This is a reader and the first waiter in the list was not a writer so wake up all
169 // the readers in the wait list.
170 let waiter = cursor.remove().unwrap();
171 waiter.set_waiting_for(WaitingFor::None);
172 to_wake.push_back(waiter);
173 waking_readers = true;
174 }
175
176 WaiterKind::Exclusive => {
177 // We found a writer while looking for more readers to wake up. Set the
178 // `WRITER_WAITING` bit to prevent any new readers from acquiring the lock. All
179 // readers currently in the wait list will ignore this bit since they already waited
180 // once.
181 set_on_release |= WRITER_WAITING;
182 cursor.move_next();
183 }
184 }
185 }
186
187 (to_wake, set_on_release)
188 }
189
// Busy-waits by issuing the CPU spin-loop hint `iterations` times.
#[inline]
fn cpu_relax(iterations: usize) {
    (0..iterations).for_each(|_| hint::spin_loop());
}
196
// The untyped core of `RwLock`: it tracks who holds the lock and who is waiting for it, but does
// not own the protected value.
pub(crate) struct RawRwLock {
    // Flag bits (`LOCKED`, `HAS_WAITERS`, `DESIGNATED_WAKER`, `SPINLOCK`, `WRITER_WAITING`,
    // `LONG_WAIT`) in the low byte plus the count of shared holders in the bits covered by
    // `READ_MASK`.
    state: AtomicUsize,
    // Queue of waiters blocked on this lock. Must only be accessed while the `SPINLOCK` bit in
    // `state` is held.
    waiters: UnsafeCell<WaiterList>,
}
201
202 impl RawRwLock {
new() -> RawRwLock203 pub fn new() -> RawRwLock {
204 RawRwLock {
205 state: AtomicUsize::new(0),
206 waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
207 }
208 }
209
210 #[inline]
lock(&self)211 pub async fn lock(&self) {
212 match self
213 .state
214 .compare_exchange_weak(0, LOCKED, Ordering::Acquire, Ordering::Relaxed)
215 {
216 Ok(_) => {}
217 Err(oldstate) => {
218 // If any bits that should be zero are not zero or if we fail to acquire the lock
219 // with a single compare_exchange then go through the slow path.
220 if (oldstate & Exclusive::zero_to_acquire()) != 0
221 || self
222 .state
223 .compare_exchange_weak(
224 oldstate,
225 (oldstate + Exclusive::add_to_acquire())
226 & !Exclusive::clear_on_acquire(),
227 Ordering::Acquire,
228 Ordering::Relaxed,
229 )
230 .is_err()
231 {
232 self.lock_slow::<Exclusive>(0, 0).await;
233 }
234 }
235 }
236 }
237
238 #[inline]
read_lock(&self)239 pub async fn read_lock(&self) {
240 match self
241 .state
242 .compare_exchange_weak(0, READ_LOCK, Ordering::Acquire, Ordering::Relaxed)
243 {
244 Ok(_) => {}
245 Err(oldstate) => {
246 if (oldstate & Shared::zero_to_acquire()) != 0
247 || self
248 .state
249 .compare_exchange_weak(
250 oldstate,
251 (oldstate + Shared::add_to_acquire()) & !Shared::clear_on_acquire(),
252 Ordering::Acquire,
253 Ordering::Relaxed,
254 )
255 .is_err()
256 {
257 self.lock_slow::<Shared>(0, 0).await;
258 }
259 }
260 }
261 }
262
// Slow path for acquiring the lock. `clear` should contain any bits that need to be cleared
// when the lock is acquired; the same bits are also cleared when this thread takes the spin lock
// to add itself to the wait queue. Any bits set in `zero_mask` are cleared from the bits returned
// by `K::zero_to_acquire()`, i.e. they are not allowed to block this acquisition.
//
// Each loop iteration does one of three things based on a fresh load of the state:
//   1. tries to acquire the lock if none of the blocking bits are set,
//   2. otherwise tries to take the spin lock, queue a waiter and sleep until woken,
//   3. otherwise (or if the compare-exchange for 1 or 2 failed) spins or yields, then retries.
#[cold]
async fn lock_slow<K: Kind>(&self, mut clear: usize, zero_mask: usize) {
    let mut zero_to_acquire = K::zero_to_acquire() & !zero_mask;

    // Number of consecutive spin rounds since the last time this thread slept in the queue.
    let mut spin_count = 0;
    // Number of times this thread has gone through the wait queue.
    let mut wait_count = 0;
    // Created lazily the first time the thread has to queue and reused afterwards.
    let mut waiter = None;
    loop {
        let oldstate = self.state.load(Ordering::Relaxed);
        // If all the bits in `zero_to_acquire` are actually zero then try to acquire the lock
        // directly.
        if (oldstate & zero_to_acquire) == 0 {
            if self
                .state
                .compare_exchange_weak(
                    oldstate,
                    (oldstate + K::add_to_acquire()) & !(clear | K::clear_on_acquire()),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        } else if (oldstate & SPINLOCK) == 0 {
            // The rwlock is locked and the spin lock is available. Try to add this thread to
            // the waiter queue.
            let w = waiter.get_or_insert_with(|| K::new_waiter(self));
            w.reset(WaitingFor::Mutex);

            if self
                .state
                .compare_exchange_weak(
                    oldstate,
                    (oldstate | SPINLOCK | HAS_WAITERS | K::set_when_waiting()) & !clear,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                let mut set_on_release = 0;

                if wait_count < LONG_WAIT_THRESHOLD {
                    // Add the waiter to the back of the queue.
                    // SAFETY:
                    // Safe because we have acquired the spin lock and it provides exclusive
                    // access to the waiter queue.
                    unsafe { (*self.waiters.get()).push_back(w.clone()) };
                } else {
                    // This waiter has gone through the queue too many times. Put it in the
                    // front of the queue and block all other threads from acquiring the lock
                    // until this one has acquired it at least once.
                    // SAFETY:
                    // Safe because we have acquired the spin lock and it provides exclusive
                    // access to the waiter queue.
                    unsafe { (*self.waiters.get()).push_front(w.clone()) };

                    // Set the LONG_WAIT bit to prevent all other threads from acquiring the
                    // lock.
                    set_on_release |= LONG_WAIT;

                    // Make sure we clear the LONG_WAIT bit when we do finally get the lock.
                    clear |= LONG_WAIT;

                    // Since we set the LONG_WAIT bit we shouldn't allow that bit to prevent us
                    // from acquiring the lock.
                    zero_to_acquire &= !LONG_WAIT;
                }

                // Release the spin lock. `oldstate` was observed before the `SPINLOCK` bit was
                // set, so a failed attempt simply refreshes `state` with the current value and
                // the loop retries until the release succeeds.
                let mut state = oldstate;
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        (state | set_on_release) & !SPINLOCK,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(w) => state = w,
                    }
                }

                // Now wait until we are woken.
                w.wait().await;

                // The `DESIGNATED_WAKER` bit gets set when this thread is woken up by the
                // thread that originally held the lock. While this bit is set, no other waiters
                // will be woken up so it's important to clear it the next time we try to
                // acquire the main lock or the spin lock.
                clear |= DESIGNATED_WAKER;

                // Now that the thread has waited once, we no longer care if there is a writer
                // waiting. Only the limits of mutual exclusion can prevent us from acquiring
                // the lock.
                zero_to_acquire &= !WRITER_WAITING;

                // Reset the spin count since we just went through the wait queue.
                spin_count = 0;

                // Increment the wait count since we went through the wait queue.
                wait_count += 1;

                // Skip the `cpu_relax` below.
                continue;
            }
        }

        // Both the lock and the spin lock are held by one or more other threads, or one of the
        // compare-exchanges above failed. First, we'll spin a few times in case we can acquire
        // the lock or the spin lock. If that fails then we yield because we might be preventing
        // the threads that do hold the 2 locks from getting cpu time.
        if spin_count < SPIN_THRESHOLD {
            // Exponential back-off: 1, 2, 4, ... spin-loop hints per round.
            cpu_relax(1 << spin_count);
            spin_count += 1;
        } else {
            yield_now();
        }
    }
}
386
387 #[inline]
unlock(&self)388 pub fn unlock(&self) {
389 // Fast path, if possible. We can directly clear the locked bit since we have exclusive
390 // access to the rwlock.
391 let oldstate = self.state.fetch_sub(LOCKED, Ordering::Release);
392
393 // Panic if we just tried to unlock a rwlock that wasn't held by this thread. This shouldn't
394 // really be possible since `unlock` is not a public method.
395 debug_assert_eq!(
396 oldstate & READ_MASK,
397 0,
398 "`unlock` called on rwlock held in read-mode"
399 );
400 debug_assert_ne!(
401 oldstate & LOCKED,
402 0,
403 "`unlock` called on rwlock not held in write-mode"
404 );
405
406 if (oldstate & HAS_WAITERS) != 0 && (oldstate & DESIGNATED_WAKER) == 0 {
407 // The oldstate has waiters but no designated waker has been chosen yet.
408 self.unlock_slow();
409 }
410 }
411
412 #[inline]
read_unlock(&self)413 pub fn read_unlock(&self) {
414 // Fast path, if possible. We can directly subtract the READ_LOCK bit since we had
415 // previously added it.
416 let oldstate = self.state.fetch_sub(READ_LOCK, Ordering::Release);
417
418 debug_assert_eq!(
419 oldstate & LOCKED,
420 0,
421 "`read_unlock` called on rwlock held in write-mode"
422 );
423 debug_assert_ne!(
424 oldstate & READ_MASK,
425 0,
426 "`read_unlock` called on rwlock not held in read-mode"
427 );
428
429 if (oldstate & HAS_WAITERS) != 0
430 && (oldstate & DESIGNATED_WAKER) == 0
431 && (oldstate & READ_MASK) == READ_LOCK
432 {
433 // There are waiters, no designated waker has been chosen yet, and the last reader is
434 // unlocking so we have to take the slow path.
435 self.unlock_slow();
436 }
437 }
438
// Slow path for releasing the lock: picks the waiters that should run next and wakes them.
//
// Called after the lock bit / reader count has already been released. Loops until either there is
// nothing to do (no waiters, or another thread already designated a waker) or this thread has
// taken the spin lock, detached the waiters to wake via `get_wake_list`, released the spin lock
// and woken them.
#[cold]
fn unlock_slow(&self) {
    let mut spin_count = 0;

    loop {
        let oldstate = self.state.load(Ordering::Relaxed);
        if (oldstate & HAS_WAITERS) == 0 || (oldstate & DESIGNATED_WAKER) != 0 {
            // No more waiters or a designated waker has been chosen. Nothing left for us to do.
            return;
        } else if (oldstate & SPINLOCK) == 0 {
            // The spin lock is not held by another thread. Try to acquire it. Also set the
            // `DESIGNATED_WAKER` bit since we are likely going to wake up one or more threads.
            if self
                .state
                .compare_exchange_weak(
                    oldstate,
                    oldstate | SPINLOCK | DESIGNATED_WAKER,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                // Acquired the spinlock. Try to wake a waiter. We may also end up wanting to
                // clear the HAS_WAITER and DESIGNATED_WAKER bits so start collecting the bits
                // to be cleared.
                let mut clear = SPINLOCK;

                // SAFETY:
                // Safe because the spinlock guarantees exclusive access to the waiter list and
                // the reference does not escape this function.
                let waiters = unsafe { &mut *self.waiters.get() };
                let (wake_list, set_on_release) = get_wake_list(waiters);

                // If the waiter list is now empty, clear the HAS_WAITERS bit.
                if waiters.is_empty() {
                    clear |= HAS_WAITERS;
                }

                if wake_list.is_empty() {
                    // Since we are not going to wake any waiters clear the DESIGNATED_WAKER bit
                    // that we set when we acquired the spin lock.
                    clear |= DESIGNATED_WAKER;
                }

                // Release the spin lock and clear any other bits as necessary. Also, set any
                // bits returned by `get_wake_list`. For now, this is just the `WRITER_WAITING`
                // bit, which needs to be set when we are waking up a bunch of readers and there
                // are still writers in the wait queue. This will prevent any readers that
                // aren't in `wake_list` from acquiring the read lock.
                //
                // `oldstate` was observed before the spin lock bits were set, so a failed attempt
                // simply refreshes `state` with the current value and retries.
                let mut state = oldstate;
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        (state | set_on_release) & !clear,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(w) => state = w,
                    }
                }

                // Now wake the waiters, if any. This happens after the spin lock is released.
                for w in wake_list {
                    w.wake();
                }

                // We're done.
                return;
            }
        }

        // Spin and try again. It's ok to block here as we have already released the lock.
        if spin_count < SPIN_THRESHOLD {
            // Exponential back-off: 1, 2, 4, ... spin-loop hints per round.
            cpu_relax(1 << spin_count);
            spin_count += 1;
        } else {
            yield_now();
        }
    }
}
520
// Cancels `waiter`, which was created for this lock by `Kind::new_waiter`.
//
// Takes the spin lock, unlinks `waiter` from the wait queue if it is still queued for the rwlock,
// fixes up the state bits (`LONG_WAIT`, `HAS_WAITERS`, `DESIGNATED_WAKER`, `WRITER_WAITING`) and
// releases the spin lock. If `wake_next` is true, or if `waiter` had already been selected for
// wake-up (`WaitingFor::None`), the next waiter(s) are woken in its place so the wake-up is not
// lost.
fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
    // Acquire the spin lock: spin while it is held by someone else or while our attempt to set
    // the bit fails.
    let mut oldstate = self.state.load(Ordering::Relaxed);
    while oldstate & SPINLOCK != 0
        || self
            .state
            .compare_exchange_weak(
                oldstate,
                oldstate | SPINLOCK,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_err()
    {
        hint::spin_loop();
        oldstate = self.state.load(Ordering::Relaxed);
    }

    // SAFETY:
    // Safe because the spin lock provides exclusive access and the reference does not escape
    // this function.
    let waiters = unsafe { &mut *self.waiters.get() };

    // Bits to clear when the spin lock is released below.
    let mut clear = SPINLOCK;

    // If we are about to remove the first waiter in the wait list, then clear the LONG_WAIT
    // bit. Also clear the bit if we are going to be waking some other waiters. In this case the
    // waiter that set the bit may have already been removed from the waiter list (and could be
    // the one that is currently being dropped). If it is still in the waiter list then clearing
    // this bit may starve it for one more iteration through the lock_slow() loop, whereas not
    // clearing this bit could cause a deadlock if the waiter that set it is the one that is
    // being dropped.
    if wake_next
        || waiters
            .front()
            .get()
            .map(|front| std::ptr::eq(front, waiter))
            .unwrap_or(false)
    {
        clear |= LONG_WAIT;
    }

    let waiting_for = waiter.is_waiting_for();

    // Don't drop the old waiter while holding the spin lock.
    let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Mutex {
        // SAFETY:
        // We know that the waiter is still linked and is waiting for the rwlock, which
        // guarantees that it is still linked into `self.waiters`.
        let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
        cursor.remove()
    } else {
        None
    };

    let (wake_list, set_on_release) = if wake_next || waiting_for == WaitingFor::None {
        // Either the waiter was already woken or it's been removed from the rwlock's waiter
        // list and is going to be woken. Either way, we need to wake up another thread.
        get_wake_list(waiters)
    } else {
        (WaiterList::new(WaiterAdapter::new()), 0)
    };

    if waiters.is_empty() {
        clear |= HAS_WAITERS;
    }

    if wake_list.is_empty() {
        // We're not waking any other threads so clear the DESIGNATED_WAKER bit. In the worst
        // case this leads to an additional thread being woken up but we risk a deadlock if we
        // don't clear it.
        clear |= DESIGNATED_WAKER;
    }

    if let WaiterKind::Exclusive = waiter.kind() {
        // The waiter being dropped is a writer so clear the writer waiting bit for now. If we
        // found more writers in the list while fetching waiters to wake up then this bit will
        // be set again via `set_on_release`.
        clear |= WRITER_WAITING;
    }

    // Release the spin lock, applying `clear` and `set_on_release` in the same update. `oldstate`
    // still holds the value seen before the spin lock was taken, so a failed attempt reloads the
    // current state and retries.
    while self
        .state
        .compare_exchange_weak(
            oldstate,
            (oldstate & !clear) | set_on_release,
            Ordering::Release,
            Ordering::Relaxed,
        )
        .is_err()
    {
        hint::spin_loop();
        oldstate = self.state.load(Ordering::Relaxed);
    }

    // Wake the replacement waiters, if any, now that the spin lock has been released.
    for w in wake_list {
        w.wake();
    }

    // Only now, outside the spin lock, release the reference removed from the queue.
    mem::drop(old_waiter);
}
621 }
622
// TODO(b/315998194): Add safety comment
// NOTE(review): presumably sound because `state` is an atomic and `waiters` is only accessed while
// the `SPINLOCK` bit is held. This also relies on `WaiterList` (defined in `sync::waiter`) being
// safe to move to another thread -- confirm before replacing the TODO with a SAFETY comment.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Send for RawRwLock {}
// TODO(b/315998194): Add safety comment
// NOTE(review): presumably sound for the same reason: every access to `waiters` through a shared
// reference is serialized by the `SPINLOCK` bit in `state` -- confirm.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Sync for RawRwLock {}
629
cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool)630 fn cancel_waiter(raw: usize, waiter: &Waiter, wake_next: bool) {
631 let raw_rwlock = raw as *const RawRwLock;
632
633 // SAFETY:
634 // Safe because the thread that owns the waiter that is being canceled must also own a reference
635 // to the rwlock, which ensures that this pointer is valid.
636 unsafe { (*raw_rwlock).cancel_waiter(waiter, wake_next) }
637 }
638
/// A high-level primitive that provides safe, mutable access to a shared resource.
///
/// `RwLock` safely provides both shared, immutable access (via `read_lock()`) as well as exclusive,
/// mutable access (via `lock()`) to an underlying resource asynchronously while ensuring fairness
/// with no loss of performance. If you need neither `read_lock()` nor fairness, try upstream
/// `futures::lock::Mutex` instead.
///
/// # Poisoning
///
/// `RwLock` does not support lock poisoning so if a thread panics while holding the lock, the
/// poisoned data will be accessible by other threads in your program. If you need to guarantee that
/// other threads cannot access poisoned data then you may wish to wrap this `RwLock` inside another
/// type that provides the poisoning feature. See the implementation of `std::sync::Mutex` for an
/// example of this. Note `futures::lock::Mutex` does not support poisoning either.
///
///
/// # Fairness
///
/// This `RwLock` implementation does not guarantee that threads will acquire the lock in the same
/// order that they call `lock()` or `read_lock()`. However it will attempt to prevent long-term
/// starvation: if a thread repeatedly fails to acquire the lock beyond a threshold then all other
/// threads will fail to acquire the lock until the starved thread has acquired it. Note, on the
/// other hand, `futures::lock::Mutex` does not guarantee fairness.
///
/// Similarly, this `RwLock` will attempt to balance reader and writer threads: once there is a
/// writer thread waiting to acquire the lock no new reader threads will be allowed to acquire it.
/// However, any reader threads that were already waiting will still be allowed to acquire it.
///
/// # Examples
///
/// ```edition2018
/// use std::sync::Arc;
/// use std::thread;
/// use std::sync::mpsc::channel;
///
/// use cros_async::{block_on, sync::RwLock};
///
/// const N: usize = 10;
///
/// // Spawn a few threads to increment a shared variable (non-atomically), and
/// // let the main thread know once all increments are done.
/// //
/// // Here we're using an Arc to share memory among threads, and the data inside
/// // the Arc is protected with a rwlock.
/// let data = Arc::new(RwLock::new(0));
///
/// let (tx, rx) = channel();
/// for _ in 0..N {
///     let (data, tx) = (Arc::clone(&data), tx.clone());
///     thread::spawn(move || {
///         // The shared state can only be accessed once the lock is held.
///         // Our non-atomic increment is safe because we're the only thread
///         // which can access the shared state when the lock is held.
///         let mut data = block_on(data.lock());
///         *data += 1;
///         if *data == N {
///             tx.send(()).unwrap();
///         }
///         // the lock is unlocked here when `data` goes out of scope.
///     });
/// }
///
/// rx.recv().unwrap();
/// ```
// NOTE(review): the 128-byte alignment presumably keeps separate locks on separate cache lines to
// avoid false sharing -- the reason is not stated in this file, confirm.
#[repr(align(128))]
pub struct RwLock<T: ?Sized> {
    // Lock state and wait queue; knows nothing about `T`.
    raw: RawRwLock,
    // The protected resource. Only accessed through guards, `get_mut` or `into_inner`.
    value: UnsafeCell<T>,
}
708
709 impl<T> RwLock<T> {
710 /// Create a new, unlocked `RwLock` ready for use.
new(v: T) -> RwLock<T>711 pub fn new(v: T) -> RwLock<T> {
712 RwLock {
713 raw: RawRwLock::new(),
714 value: UnsafeCell::new(v),
715 }
716 }
717
718 /// Consume the `RwLock` and return the contained value. This method does not perform any
719 /// locking as the compiler will guarantee that there are no other references to `self` and the
720 /// caller owns the `RwLock`.
into_inner(self) -> T721 pub fn into_inner(self) -> T {
722 // Don't need to acquire the lock because the compiler guarantees that there are
723 // no references to `self`.
724 self.value.into_inner()
725 }
726 }
727
728 impl<T: ?Sized> RwLock<T> {
729 /// Acquires exclusive, mutable access to the resource protected by the `RwLock`, blocking the
730 /// current thread until it is able to do so. Upon returning, the current thread will be the
731 /// only thread with access to the resource. The `RwLock` will be released when the returned
732 /// `RwLockWriteGuard` is dropped.
733 ///
734 /// Calling `lock()` while holding a `RwLockWriteGuard` or a `RwLockReadGuard` will cause a
735 /// deadlock.
736 ///
737 /// Callers that are not in an async context may wish to use the `block_on` method to block the
738 /// thread until the `RwLock` is acquired.
739 #[inline]
lock(&self) -> RwLockWriteGuard<'_, T>740 pub async fn lock(&self) -> RwLockWriteGuard<'_, T> {
741 self.raw.lock().await;
742
743 RwLockWriteGuard {
744 mu: self,
745 // SAFETY:
746 // Safe because we have exclusive access to `self.value`.
747 value: unsafe { &mut *self.value.get() },
748 }
749 }
750
751 /// Acquires shared, immutable access to the resource protected by the `RwLock`, blocking the
752 /// current thread until it is able to do so. Upon returning there may be other threads that
753 /// also have immutable access to the resource but there will not be any threads that have
754 /// mutable access to the resource. When the returned `RwLockReadGuard` is dropped the thread
755 /// releases its access to the resource.
756 ///
757 /// Calling `read_lock()` while holding a `RwLockReadGuard` may deadlock. Calling `read_lock()`
758 /// while holding a `RwLockWriteGuard` will deadlock.
759 ///
760 /// Callers that are not in an async context may wish to use the `block_on` method to block the
761 /// thread until the `RwLock` is acquired.
762 #[inline]
read_lock(&self) -> RwLockReadGuard<'_, T>763 pub async fn read_lock(&self) -> RwLockReadGuard<'_, T> {
764 self.raw.read_lock().await;
765
766 RwLockReadGuard {
767 mu: self,
768 // SAFETY:
769 // Safe because we have shared read-only access to `self.value`.
770 value: unsafe { &*self.value.get() },
771 }
772 }
773
774 // Called from `Condvar::wait` when the thread wants to reacquire the lock.
775 #[inline]
lock_from_cv(&self) -> RwLockWriteGuard<'_, T>776 pub(crate) async fn lock_from_cv(&self) -> RwLockWriteGuard<'_, T> {
777 self.raw.lock_slow::<Exclusive>(DESIGNATED_WAKER, 0).await;
778
779 RwLockWriteGuard {
780 mu: self,
781 // SAFETY:
782 // Safe because we have exclusive access to `self.value`.
783 value: unsafe { &mut *self.value.get() },
784 }
785 }
786
787 // Like `lock_from_cv` but for acquiring a shared lock.
788 #[inline]
read_lock_from_cv(&self) -> RwLockReadGuard<'_, T>789 pub(crate) async fn read_lock_from_cv(&self) -> RwLockReadGuard<'_, T> {
790 // Threads that have waited in the Condvar's waiter list don't have to care if there is a
791 // writer waiting since they have already waited once.
792 self.raw
793 .lock_slow::<Shared>(DESIGNATED_WAKER, WRITER_WAITING)
794 .await;
795
796 RwLockReadGuard {
797 mu: self,
798 // SAFETY:
799 // Safe because we have exclusive access to `self.value`.
800 value: unsafe { &*self.value.get() },
801 }
802 }
803
804 #[inline]
unlock(&self)805 fn unlock(&self) {
806 self.raw.unlock();
807 }
808
809 #[inline]
read_unlock(&self)810 fn read_unlock(&self) {
811 self.raw.read_unlock();
812 }
813
get_mut(&mut self) -> &mut T814 pub fn get_mut(&mut self) -> &mut T {
815 // SAFETY:
816 // Safe because the compiler statically guarantees that are no other references to `self`.
817 // This is also why we don't need to acquire the lock first.
818 unsafe { &mut *self.value.get() }
819 }
820 }
821
// TODO(b/315998194): Add safety comment
// NOTE(review): moving the lock to another thread moves the contained `T`, hence the `T: Send`
// bound.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// TODO(b/315998194): Add safety comment
// NOTE(review): `read_lock()` lets several threads hold a `&T` at the same time, which normally
// requires `T: Sync` in addition to `T: Send` (compare the bounds on `std::sync::RwLock`). With
// only `T: Send` here, a type such as `RwLock<Cell<_>>` can be shared between threads and read
// concurrently. Tightening the bound would break any caller that relies on the current one, so
// this needs an audit of the users before it is changed -- confirm and fix.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl<T: ?Sized + Send> Sync for RwLock<T> {}
828
829 impl<T: Default> Default for RwLock<T> {
default() -> Self830 fn default() -> Self {
831 Self::new(Default::default())
832 }
833 }
834
835 impl<T> From<T> for RwLock<T> {
from(source: T) -> Self836 fn from(source: T) -> Self {
837 Self::new(source)
838 }
839 }
840
841 /// An RAII implementation of a "scoped exclusive lock" for a `RwLock`. When this structure is
842 /// dropped, the lock will be released. The resource protected by the `RwLock` can be accessed via
843 /// the `Deref` and `DerefMut` implementations of this structure.
844 pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
845 mu: &'a RwLock<T>,
846 value: &'a mut T,
847 }
848
849 impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
into_inner(self) -> &'a RwLock<T>850 pub(crate) fn into_inner(self) -> &'a RwLock<T> {
851 self.mu
852 }
853
as_raw_rwlock(&self) -> &RawRwLock854 pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
855 &self.mu.raw
856 }
857 }
858
859 impl<'a, T: ?Sized> Deref for RwLockWriteGuard<'a, T> {
860 type Target = T;
861
deref(&self) -> &Self::Target862 fn deref(&self) -> &Self::Target {
863 self.value
864 }
865 }
866
867 impl<'a, T: ?Sized> DerefMut for RwLockWriteGuard<'a, T> {
deref_mut(&mut self) -> &mut Self::Target868 fn deref_mut(&mut self) -> &mut Self::Target {
869 self.value
870 }
871 }
872
873 impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> {
drop(&mut self)874 fn drop(&mut self) {
875 self.mu.unlock()
876 }
877 }
878
879 /// An RAII implementation of a "scoped shared lock" for a `RwLock`. When this structure is dropped,
880 /// the lock will be released. The resource protected by the `RwLock` can be accessed via the
881 /// `Deref` implementation of this structure.
882 pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
883 mu: &'a RwLock<T>,
884 value: &'a T,
885 }
886
887 impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
into_inner(self) -> &'a RwLock<T>888 pub(crate) fn into_inner(self) -> &'a RwLock<T> {
889 self.mu
890 }
891
as_raw_rwlock(&self) -> &RawRwLock892 pub(crate) fn as_raw_rwlock(&self) -> &RawRwLock {
893 &self.mu.raw
894 }
895 }
896
897 impl<'a, T: ?Sized> Deref for RwLockReadGuard<'a, T> {
898 type Target = T;
899
deref(&self) -> &Self::Target900 fn deref(&self) -> &Self::Target {
901 self.value
902 }
903 }
904
905 impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> {
drop(&mut self)906 fn drop(&mut self) {
907 self.mu.read_unlock()
908 }
909 }
910
911 // TODO(b/194338842): Fix tests for windows
912 #[cfg(any(target_os = "android", target_os = "linux"))]
913 #[cfg(test)]
914 mod test {
915 use std::future::Future;
916 use std::mem;
917 use std::pin::Pin;
918 use std::rc::Rc;
919 use std::sync::atomic::AtomicUsize;
920 use std::sync::atomic::Ordering;
921 use std::sync::mpsc::channel;
922 use std::sync::mpsc::Sender;
923 use std::sync::Arc;
924 use std::task::Context;
925 use std::task::Poll;
926 use std::task::Waker;
927 use std::thread;
928 use std::time::Duration;
929
930 use futures::channel::oneshot;
931 use futures::pending;
932 use futures::select;
933 use futures::task::waker_ref;
934 use futures::task::ArcWake;
935 use futures::FutureExt;
936 use futures_executor::LocalPool;
937 use futures_executor::ThreadPool;
938 use futures_util::task::LocalSpawnExt;
939
940 use super::super::super::block_on;
941 use super::super::super::sync::Condvar;
942 use super::super::super::sync::SpinLock;
943 use super::*;
944
    // Wrapper around a `u32` that does not derive `Copy` or `Clone`. Used as the protected value
    // in several of the tests below.
    #[derive(Debug, Eq, PartialEq)]
    struct NonCopy(u32);
947
    // Dummy waker used when we want to manually drive futures. Waking it does nothing, so tests
    // that use it have to call `poll` again themselves.
    struct TestWaker;
    impl ArcWake for TestWaker {
        // Intentionally empty: wake-ups are ignored.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }
953
954 #[test]
it_works()955 fn it_works() {
956 let mu = RwLock::new(NonCopy(13));
957
958 assert_eq!(*block_on(mu.lock()), NonCopy(13));
959 }
960
961 #[test]
smoke()962 fn smoke() {
963 let mu = RwLock::new(NonCopy(7));
964
965 mem::drop(block_on(mu.lock()));
966 mem::drop(block_on(mu.lock()));
967 }
968
969 #[test]
rw_smoke()970 fn rw_smoke() {
971 let mu = RwLock::new(NonCopy(7));
972
973 mem::drop(block_on(mu.lock()));
974 mem::drop(block_on(mu.read_lock()));
975 mem::drop((block_on(mu.read_lock()), block_on(mu.read_lock())));
976 mem::drop(block_on(mu.lock()));
977 }
978
979 #[test]
async_smoke()980 fn async_smoke() {
981 async fn lock(mu: Rc<RwLock<NonCopy>>) {
982 mu.lock().await;
983 }
984
985 async fn read_lock(mu: Rc<RwLock<NonCopy>>) {
986 mu.read_lock().await;
987 }
988
989 async fn double_read_lock(mu: Rc<RwLock<NonCopy>>) {
990 let first = mu.read_lock().await;
991 mu.read_lock().await;
992
993 // Make sure first lives past the second read lock.
994 first.as_raw_rwlock();
995 }
996
997 let mu = Rc::new(RwLock::new(NonCopy(7)));
998
999 let mut ex = LocalPool::new();
1000 let spawner = ex.spawner();
1001
1002 spawner
1003 .spawn_local(lock(Rc::clone(&mu)))
1004 .expect("Failed to spawn future");
1005 spawner
1006 .spawn_local(read_lock(Rc::clone(&mu)))
1007 .expect("Failed to spawn future");
1008 spawner
1009 .spawn_local(double_read_lock(Rc::clone(&mu)))
1010 .expect("Failed to spawn future");
1011 spawner
1012 .spawn_local(lock(Rc::clone(&mu)))
1013 .expect("Failed to spawn future");
1014
1015 ex.run();
1016 }
1017
1018 #[test]
send()1019 fn send() {
1020 let mu = RwLock::new(NonCopy(19));
1021
1022 thread::spawn(move || {
1023 let value = block_on(mu.lock());
1024 assert_eq!(*value, NonCopy(19));
1025 })
1026 .join()
1027 .unwrap();
1028 }
1029
1030 #[test]
arc_nested()1031 fn arc_nested() {
1032 // Tests nested rwlocks and access to underlying data.
1033 let mu = RwLock::new(1);
1034 let arc = Arc::new(RwLock::new(mu));
1035 thread::spawn(move || {
1036 let nested = block_on(arc.lock());
1037 let lock2 = block_on(nested.lock());
1038 assert_eq!(*lock2, 1);
1039 })
1040 .join()
1041 .unwrap();
1042 }
1043
1044 #[test]
arc_access_in_unwind()1045 fn arc_access_in_unwind() {
1046 let arc = Arc::new(RwLock::new(1));
1047 let arc2 = arc.clone();
1048 thread::spawn(move || {
1049 struct Unwinder {
1050 i: Arc<RwLock<i32>>,
1051 }
1052 impl Drop for Unwinder {
1053 fn drop(&mut self) {
1054 *block_on(self.i.lock()) += 1;
1055 }
1056 }
1057 let _u = Unwinder { i: arc2 };
1058 panic!();
1059 })
1060 .join()
1061 .expect_err("thread did not panic");
1062 let lock = block_on(arc.lock());
1063 assert_eq!(*lock, 2);
1064 }
1065
1066 #[test]
unsized_value()1067 fn unsized_value() {
1068 let rwlock: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
1069 {
1070 let b = &mut *block_on(rwlock.lock());
1071 b[0] = 4;
1072 b[2] = 5;
1073 }
1074 let expected: &[i32] = &[4, 2, 5];
1075 assert_eq!(&*block_on(rwlock.lock()), expected);
1076 }
1077 #[test]
high_contention()1078 fn high_contention() {
1079 const THREADS: usize = 17;
1080 const ITERATIONS: usize = 103;
1081
1082 let mut threads = Vec::with_capacity(THREADS);
1083
1084 let mu = Arc::new(RwLock::new(0usize));
1085 for _ in 0..THREADS {
1086 let mu2 = mu.clone();
1087 threads.push(thread::spawn(move || {
1088 for _ in 0..ITERATIONS {
1089 *block_on(mu2.lock()) += 1;
1090 }
1091 }));
1092 }
1093
1094 for t in threads.into_iter() {
1095 t.join().unwrap();
1096 }
1097
1098 assert_eq!(*block_on(mu.read_lock()), THREADS * ITERATIONS);
1099 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1100 }
1101
    // Spawns `TASKS` tasks on a thread pool. Each task performs `ITERATIONS` increments, and for
    // every increment it races `mu.lock()` against `alt_mu.lock()` with `select!`, incrementing
    // whichever counter it gets. The sum of the two counters must equal `TASKS * ITERATIONS` and
    // both locks must end with a zero state word.
    #[test]
    fn high_contention_with_cancel() {
        const TASKS: usize = 17;
        const ITERATIONS: usize = 103;

        // Performs `ITERATIONS` increments spread over the two locks, then signals completion
        // on `tx`.
        async fn increment(mu: Arc<RwLock<usize>>, alt_mu: Arc<RwLock<usize>>, tx: Sender<()>) {
            for _ in 0..ITERATIONS {
                select! {
                    mut count = mu.lock().fuse() => *count += 1,
                    mut count = alt_mu.lock().fuse() => *count += 1,
                }
            }
            tx.send(()).expect("Failed to send completion signal");
        }

        let ex = ThreadPool::new().expect("Failed to create ThreadPool");

        let mu = Arc::new(RwLock::new(0usize));
        let alt_mu = Arc::new(RwLock::new(0usize));

        let (tx, rx) = channel();
        for _ in 0..TASKS {
            ex.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&alt_mu), tx.clone()));
        }

        // Wait for one completion message per task, failing the test if any takes longer than
        // ten seconds to arrive.
        for _ in 0..TASKS {
            if let Err(e) = rx.recv_timeout(Duration::from_secs(10)) {
                panic!("Error while waiting for threads to complete: {}", e);
            }
        }

        assert_eq!(
            *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
            TASKS * ITERATIONS
        );
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
        assert_eq!(alt_mu.raw.state.load(Ordering::Relaxed), 0);
    }
1140
1141 #[test]
single_thread_async()1142 fn single_thread_async() {
1143 const TASKS: usize = 17;
1144 const ITERATIONS: usize = 103;
1145
1146 // Async closures are unstable.
1147 async fn increment(mu: Rc<RwLock<usize>>) {
1148 for _ in 0..ITERATIONS {
1149 *mu.lock().await += 1;
1150 }
1151 }
1152
1153 let mut ex = LocalPool::new();
1154 let spawner = ex.spawner();
1155
1156 let mu = Rc::new(RwLock::new(0usize));
1157 for _ in 0..TASKS {
1158 spawner
1159 .spawn_local(increment(Rc::clone(&mu)))
1160 .expect("Failed to spawn task");
1161 }
1162
1163 ex.run();
1164
1165 assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1166 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1167 }
1168
1169 #[test]
multi_thread_async()1170 fn multi_thread_async() {
1171 const TASKS: usize = 17;
1172 const ITERATIONS: usize = 103;
1173
1174 // Async closures are unstable.
1175 async fn increment(mu: Arc<RwLock<usize>>, tx: Sender<()>) {
1176 for _ in 0..ITERATIONS {
1177 *mu.lock().await += 1;
1178 }
1179 tx.send(()).expect("Failed to send completion signal");
1180 }
1181
1182 let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1183
1184 let mu = Arc::new(RwLock::new(0usize));
1185 let (tx, rx) = channel();
1186 for _ in 0..TASKS {
1187 ex.spawn_ok(increment(Arc::clone(&mu), tx.clone()));
1188 }
1189
1190 for _ in 0..TASKS {
1191 rx.recv_timeout(Duration::from_secs(5))
1192 .expect("Failed to receive completion signal");
1193 }
1194 assert_eq!(*block_on(mu.read_lock()), TASKS * ITERATIONS);
1195 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1196 }
1197
1198 #[test]
get_mut()1199 fn get_mut() {
1200 let mut mu = RwLock::new(NonCopy(13));
1201 *mu.get_mut() = NonCopy(17);
1202
1203 assert_eq!(mu.into_inner(), NonCopy(17));
1204 }
1205
1206 #[test]
into_inner()1207 fn into_inner() {
1208 let mu = RwLock::new(NonCopy(29));
1209 assert_eq!(mu.into_inner(), NonCopy(29));
1210 }
1211
1212 #[test]
into_inner_drop()1213 fn into_inner_drop() {
1214 struct NeedsDrop(Arc<AtomicUsize>);
1215 impl Drop for NeedsDrop {
1216 fn drop(&mut self) {
1217 self.0.fetch_add(1, Ordering::AcqRel);
1218 }
1219 }
1220
1221 let value = Arc::new(AtomicUsize::new(0));
1222 let needs_drop = RwLock::new(NeedsDrop(value.clone()));
1223 assert_eq!(value.load(Ordering::Acquire), 0);
1224
1225 {
1226 let inner = needs_drop.into_inner();
1227 assert_eq!(inner.0.load(Ordering::Acquire), 0);
1228 }
1229
1230 assert_eq!(value.load(Ordering::Acquire), 1);
1231 }
1232
1233 #[test]
rw_arc()1234 fn rw_arc() {
1235 const THREADS: isize = 7;
1236 const ITERATIONS: isize = 13;
1237
1238 let mu = Arc::new(RwLock::new(0isize));
1239 let mu2 = mu.clone();
1240
1241 let (tx, rx) = channel();
1242 thread::spawn(move || {
1243 let mut guard = block_on(mu2.lock());
1244 for _ in 0..ITERATIONS {
1245 let tmp = *guard;
1246 *guard = -1;
1247 thread::yield_now();
1248 *guard = tmp + 1;
1249 }
1250 tx.send(()).unwrap();
1251 });
1252
1253 let mut readers = Vec::with_capacity(10);
1254 for _ in 0..THREADS {
1255 let mu3 = mu.clone();
1256 let handle = thread::spawn(move || {
1257 let guard = block_on(mu3.read_lock());
1258 assert!(*guard >= 0);
1259 });
1260
1261 readers.push(handle);
1262 }
1263
1264 // Wait for the readers to finish their checks.
1265 for r in readers {
1266 r.join().expect("One or more readers saw a negative value");
1267 }
1268
1269 // Wait for the writer to finish.
1270 rx.recv_timeout(Duration::from_secs(5)).unwrap();
1271 assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1272 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1273 }
1274
    // Single-threaded variant of the reader/writer consistency check: one writer task and `TASKS`
    // reader tasks run on a `LocalPool`. The writer suspends on a `TestFuture` while the protected
    // value is `-1`; readers assert that they never see a negative value.
    #[test]
    fn rw_single_thread_async() {
        // A Future that returns `Poll::Pending` the first time it is polled and `Poll::Ready` every
        // time after that.
        struct TestFuture {
            // Whether `poll` has already been called once.
            polled: bool,
            // Slot where the first `poll` stores the task's waker for `wake_future` to pick up.
            waker: Arc<SpinLock<Option<Waker>>>,
        }

        impl Future for TestFuture {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                if self.polled {
                    Poll::Ready(())
                } else {
                    self.polled = true;
                    *self.waker.lock() = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
        }

        // Polls the shared slot every 10ms until a waker shows up, then wakes it and returns.
        fn wake_future(waker: Arc<SpinLock<Option<Waker>>>) {
            loop {
                if let Some(w) = waker.lock().take() {
                    w.wake();
                    return;
                }

                // This sleep cannot be moved into an else branch because we would end up holding
                // the lock while sleeping due to Rust's drop ordering rules.
                thread::sleep(Duration::from_millis(10));
            }
        }

        // Holds the exclusive lock for the whole loop. In every iteration it stores `-1`, suspends
        // on a `TestFuture` that a helper thread wakes, then stores the incremented value.
        async fn writer(mu: Rc<RwLock<isize>>) {
            let mut guard = mu.lock().await;
            for _ in 0..ITERATIONS {
                let tmp = *guard;
                *guard = -1;
                let waker = Arc::new(SpinLock::new(None));
                let waker2 = Arc::clone(&waker);
                thread::spawn(move || wake_future(waker2));
                let fut = TestFuture {
                    polled: false,
                    waker,
                };
                fut.await;
                *guard = tmp + 1;
            }
        }

        // Takes a shared lock and checks that the intermediate `-1` is not visible.
        async fn reader(mu: Rc<RwLock<isize>>) {
            let guard = mu.read_lock().await;
            assert!(*guard >= 0);
        }

        const TASKS: isize = 7;
        const ITERATIONS: isize = 13;

        let mu = Rc::new(RwLock::new(0isize));
        let mut ex = LocalPool::new();
        let spawner = ex.spawner();

        spawner
            .spawn_local(writer(Rc::clone(&mu)))
            .expect("Failed to spawn writer");

        for _ in 0..TASKS {
            spawner
                .spawn_local(reader(Rc::clone(&mu)))
                .expect("Failed to spawn reader");
        }

        ex.run();

        assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1355
1356 #[test]
rw_multi_thread_async()1357 fn rw_multi_thread_async() {
1358 async fn writer(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1359 let mut guard = mu.lock().await;
1360 for _ in 0..ITERATIONS {
1361 let tmp = *guard;
1362 *guard = -1;
1363 thread::yield_now();
1364 *guard = tmp + 1;
1365 }
1366
1367 mem::drop(guard);
1368 tx.send(()).unwrap();
1369 }
1370
1371 async fn reader(mu: Arc<RwLock<isize>>, tx: Sender<()>) {
1372 let guard = mu.read_lock().await;
1373 assert!(*guard >= 0);
1374
1375 mem::drop(guard);
1376 tx.send(()).expect("Failed to send completion message");
1377 }
1378
1379 const TASKS: isize = 7;
1380 const ITERATIONS: isize = 13;
1381
1382 let mu = Arc::new(RwLock::new(0isize));
1383 let ex = ThreadPool::new().expect("Failed to create ThreadPool");
1384
1385 let (txw, rxw) = channel();
1386 ex.spawn_ok(writer(Arc::clone(&mu), txw));
1387
1388 let (txr, rxr) = channel();
1389 for _ in 0..TASKS {
1390 ex.spawn_ok(reader(Arc::clone(&mu), txr.clone()));
1391 }
1392
1393 // Wait for the readers to finish their checks.
1394 for _ in 0..TASKS {
1395 rxr.recv_timeout(Duration::from_secs(5))
1396 .expect("Failed to receive completion message from reader");
1397 }
1398
1399 // Wait for the writer to finish.
1400 rxw.recv_timeout(Duration::from_secs(5))
1401 .expect("Failed to receive completion message from writer");
1402
1403 assert_eq!(*block_on(mu.read_lock()), ITERATIONS);
1404 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1405 }
1406
    // Queues four readers and one writer behind a held exclusive lock, then checks that releasing
    // the lock lets all four readers acquire it while the writer stays pending until the readers
    // are done. Futures are polled by hand with a no-op waker.
    #[test]
    fn wake_all_readers() {
        // Takes a shared lock, yields once while holding it, then releases it.
        async fn read(mu: Arc<RwLock<()>>) {
            let g = mu.read_lock().await;
            pending!();
            mem::drop(g);
        }

        // Takes the exclusive lock and releases it immediately.
        async fn write(mu: Arc<RwLock<()>>) {
            mu.lock().await;
        }

        let mu = Arc::new(RwLock::new(()));
        // The writer sits between the readers so that one reader is queued behind it.
        let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(read(mu.clone())),
            Box::pin(write(mu.clone())),
            Box::pin(read(mu.clone())),
        ];
        const NUM_READERS: usize = 4;

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock so that the futures cannot get it.
        let g = block_on(mu.lock());

        // First poll: every future must fail to get the lock and stay pending.
        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Drop the lock. This should allow all readers to make progress. Since they already waited
        // once they should ignore the WRITER_WAITING bit that is currently set.
        mem::drop(g);
        // Second poll: readers are expected to stop at their `pending!()` and the writer is still
        // expected to be pending, so none of the futures may complete here.
        for r in &mut futures {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("future unexpectedly ready");
            }
        }

        // Check that all readers were able to acquire the lock.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            READ_LOCK * NUM_READERS
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // Index of the single future that is still pending after the next round of polls.
        let mut needs_poll = None;

        // All the readers can now finish but the writer needs to be polled again.
        for (i, r) in futures.iter_mut().enumerate() {
            match r.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {}
                Poll::Pending => {
                    if needs_poll.is_some() {
                        panic!("More than one future unable to complete");
                    }
                    needs_poll = Some(i);
                }
            }
        }

        if futures[needs_poll.expect("Writer unexpectedly able to complete")]
            .as_mut()
            .poll(&mut cx)
            .is_pending()
        {
            panic!("Writer unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1496
    // Checks the LONG_WAIT bit: `tight_loop` and `mark_ready` are polled alternately
    // `LONG_WAIT_THRESHOLD + 1` times, after which the state is expected to contain LONG_WAIT.
    // The next poll of `tight_loop` must then fail to take the lock so that `mark_ready` can
    // finally acquire it.
    #[test]
    fn long_wait() {
        // Re-acquires the exclusive lock on every poll until the protected flag is `true`. The
        // guard is still alive at `pending!()`, so the lock stays held between polls.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the protected flag to `true` once it gets the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Neither future may complete during these rounds.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This time the tight loop will fail to acquire the lock.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        // Which will finally allow the mark_ready function to make progress.
        if mark.as_mut().poll(&mut cx).is_pending() {
            panic!("mark_ready not able to make progress");
        }

        // Now the tight loop will finish.
        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert!(*block_on(mu.lock()));
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1554
    // Drives the lock into the LONG_WAIT state in the same way as `long_wait`, then drops the
    // long-waiting future before it is polled again and checks that the waiter-related bits are
    // cleared.
    #[test]
    fn cancel_long_wait_before_wake() {
        // Re-acquires the exclusive lock on every poll until the protected flag is `true`. The
        // guard is still alive at `pending!()`, so the lock stays held between polls.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the protected flag to `true` once it gets the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Neither future may complete during these rounds.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
        mem::drop(mark);
        // Only LOCKED is expected to remain: `tl` still holds the lock.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), LOCKED);

        // Dropping `tl` drops the guard it was holding, which releases the lock.
        mem::drop(tl);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1601
    // Drives the lock into the LONG_WAIT state in the same way as `long_wait`, polls `tight_loop`
    // once more so that it gives up the lock, and then drops the long-waiting future instead of
    // polling it. LONG_WAIT must be cleared and the lock must remain usable.
    #[test]
    fn cancel_long_wait_after_wake() {
        // Re-acquires the exclusive lock on every poll until the protected flag is `true`. The
        // guard is still alive at `pending!()`, so the lock stays held between polls.
        async fn tight_loop(mu: Arc<RwLock<bool>>) {
            loop {
                let ready = mu.lock().await;
                if *ready {
                    break;
                }
                pending!();
            }
        }

        // Sets the protected flag to `true` once it gets the exclusive lock.
        async fn mark_ready(mu: Arc<RwLock<bool>>) {
            *mu.lock().await = true;
        }

        let mu = Arc::new(RwLock::new(false));
        let mut tl = Box::pin(tight_loop(mu.clone()));
        let mut mark = Box::pin(mark_ready(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Neither future may complete during these rounds.
        for _ in 0..=LONG_WAIT_THRESHOLD {
            if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
                panic!("tight_loop unexpectedly ready");
            }

            if let Poll::Ready(()) = mark.as_mut().poll(&mut cx) {
                panic!("mark_ready unexpectedly ready");
            }
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING | LONG_WAIT
        );

        // This time the tight loop will fail to acquire the lock.
        if let Poll::Ready(()) = tl.as_mut().poll(&mut cx) {
            panic!("tight_loop unexpectedly ready");
        }

        // Now drop the mark_ready future, which should clear the LONG_WAIT bit.
        mem::drop(mark);
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & LONG_WAIT, 0);

        // Since the lock is not held, we should be able to spawn a future to set the ready flag.
        block_on(mark_ready(mu.clone()));

        // Now the tight loop will finish.
        if tl.as_mut().poll(&mut cx).is_pending() {
            panic!("tight_loop not able to finish");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1660
    // Checks the DESIGNATED_WAKER bit: after the lock is released with two writers queued, the bit
    // is expected to be set, and a third writer that was never queued must still be able to take
    // the lock before the woken waiter runs.
    #[test]
    fn designated_waker() {
        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        let mu = Arc::new(RwLock::new(0));

        let mut futures = [
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
            Box::pin(inc(mu.clone())),
        ];

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Hold the exclusive lock so that the futures polled below cannot acquire it.
        let count = block_on(mu.lock());

        // Poll 2 futures. Since neither will be able to acquire the lock, they should get added to
        // the waiter list.
        if let Poll::Ready(()) = futures[0].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }
        if let Poll::Ready(()) = futures[1].as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            LOCKED | HAS_WAITERS | WRITER_WAITING,
        );

        // Now drop the lock. This should set the DESIGNATED_WAKER bit and wake up the first future
        // in the wait list.
        mem::drop(count);

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed),
            DESIGNATED_WAKER | HAS_WAITERS | WRITER_WAITING,
        );

        // Now poll the third future. It should be able to acquire the lock immediately.
        if futures[2].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 1);

        // There should still be a waiter in the wait list and the DESIGNATED_WAKER bit should still
        // be set.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
            DESIGNATED_WAKER
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Now let the future that was woken up run.
        if futures[0].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 2);

        // The remaining queued future must be able to finish as well.
        if futures[1].as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
        assert_eq!(*block_on(mu.lock()), 3);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1734
1735 #[test]
cancel_designated_waker()1736 fn cancel_designated_waker() {
1737 async fn inc(mu: Arc<RwLock<usize>>) {
1738 *mu.lock().await += 1;
1739 }
1740
1741 let mu = Arc::new(RwLock::new(0));
1742
1743 let mut fut = Box::pin(inc(mu.clone()));
1744
1745 let arc_waker = Arc::new(TestWaker);
1746 let waker = waker_ref(&arc_waker);
1747 let mut cx = Context::from_waker(&waker);
1748
1749 let count = block_on(mu.lock());
1750
1751 if let Poll::Ready(()) = fut.as_mut().poll(&mut cx) {
1752 panic!("Future unexpectedly ready when lock is held");
1753 }
1754
1755 // Drop the lock. This will wake up the future.
1756 mem::drop(count);
1757
1758 // Now drop the future without polling. This should clear all the state in the rwlock.
1759 mem::drop(fut);
1760
1761 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1762 }
1763
1764 #[test]
cancel_before_wake()1765 fn cancel_before_wake() {
1766 async fn inc(mu: Arc<RwLock<usize>>) {
1767 *mu.lock().await += 1;
1768 }
1769
1770 let mu = Arc::new(RwLock::new(0));
1771
1772 let mut fut1 = Box::pin(inc(mu.clone()));
1773
1774 let mut fut2 = Box::pin(inc(mu.clone()));
1775
1776 let arc_waker = Arc::new(TestWaker);
1777 let waker = waker_ref(&arc_waker);
1778 let mut cx = Context::from_waker(&waker);
1779
1780 // First acquire the lock.
1781 let count = block_on(mu.lock());
1782
1783 // Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1784 // list.
1785 match fut1.as_mut().poll(&mut cx) {
1786 Poll::Pending => {}
1787 Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1788 }
1789
1790 match fut2.as_mut().poll(&mut cx) {
1791 Poll::Pending => {}
1792 Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1793 }
1794
1795 assert_eq!(
1796 mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1797 WRITER_WAITING
1798 );
1799
1800 // Drop fut1. This should remove it from the waiter list but shouldn't wake fut2.
1801 mem::drop(fut1);
1802
1803 // There should be no designated waker.
1804 assert_eq!(mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER, 0);
1805
1806 // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1807 assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1808
1809 match fut2.as_mut().poll(&mut cx) {
1810 Poll::Pending => {}
1811 Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1812 }
1813
1814 // Now drop the lock. This should mark fut2 as ready to make progress.
1815 mem::drop(count);
1816
1817 match fut2.as_mut().poll(&mut cx) {
1818 Poll::Pending => panic!("Future is not ready to make progress"),
1819 Poll::Ready(()) => {}
1820 }
1821
1822 // Verify that we only incremented the count once.
1823 assert_eq!(*block_on(mu.lock()), 1);
1824 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1825 }
1826
1827 #[test]
cancel_after_wake()1828 fn cancel_after_wake() {
1829 async fn inc(mu: Arc<RwLock<usize>>) {
1830 *mu.lock().await += 1;
1831 }
1832
1833 let mu = Arc::new(RwLock::new(0));
1834
1835 let mut fut1 = Box::pin(inc(mu.clone()));
1836
1837 let mut fut2 = Box::pin(inc(mu.clone()));
1838
1839 let arc_waker = Arc::new(TestWaker);
1840 let waker = waker_ref(&arc_waker);
1841 let mut cx = Context::from_waker(&waker);
1842
1843 // First acquire the lock.
1844 let count = block_on(mu.lock());
1845
1846 // Now poll the futures. Since the lock is acquired they will both get queued in the waiter
1847 // list.
1848 match fut1.as_mut().poll(&mut cx) {
1849 Poll::Pending => {}
1850 Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1851 }
1852
1853 match fut2.as_mut().poll(&mut cx) {
1854 Poll::Pending => {}
1855 Poll::Ready(()) => panic!("Future is unexpectedly ready"),
1856 }
1857
1858 assert_eq!(
1859 mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
1860 WRITER_WAITING
1861 );
1862
1863 // Drop the lock. This should mark fut1 as ready to make progress.
1864 mem::drop(count);
1865
1866 // Now drop fut1. This should make fut2 ready to make progress.
1867 mem::drop(fut1);
1868
1869 // Since there was still another waiter in the list we shouldn't have cleared the
1870 // DESIGNATED_WAKER bit.
1871 assert_eq!(
1872 mu.raw.state.load(Ordering::Relaxed) & DESIGNATED_WAKER,
1873 DESIGNATED_WAKER
1874 );
1875
1876 // Since the waiter was a writer, we should clear the WRITER_WAITING bit.
1877 assert_eq!(mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING, 0);
1878
1879 match fut2.as_mut().poll(&mut cx) {
1880 Poll::Pending => panic!("Future is not ready to make progress"),
1881 Poll::Ready(()) => {}
1882 }
1883
1884 // Verify that we only incremented the count once.
1885 assert_eq!(*block_on(mu.lock()), 1);
1886 assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
1887 }
1888
    // Races a lock acquisition against a oneshot "timer" with `select!`. The lock is held for the
    // whole race, so the timer branch must win, and afterwards the rwlock must not report any
    // waiters.
    #[test]
    fn timeout() {
        // Completes when `timer` fires; panics if the lock is acquired first or if the timer's
        // sender is dropped without sending.
        async fn timed_lock(timer: oneshot::Receiver<()>, mu: Arc<RwLock<()>>) {
            select! {
                res = timer.fuse() => {
                    match res {
                        Ok(()) => {},
                        Err(e) => panic!("Timer unexpectedly canceled: {}", e),
                    }
                }
                _ = mu.lock().fuse() => panic!("Successfuly acquired lock"),
            }
        }

        let mu = Arc::new(RwLock::new(()));
        let (tx, rx) = oneshot::channel();

        let mut timeout = Box::pin(timed_lock(rx, mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Acquire the lock.
        let g = block_on(mu.lock());

        // Poll the future.
        if let Poll::Ready(()) = timeout.as_mut().poll(&mut cx) {
            panic!("timed_lock unexpectedly ready");
        }

        // The pending lock attempt is expected to have registered itself as a waiter.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );

        // Signal the channel, which should cancel the lock.
        tx.send(()).expect("Failed to send wakeup");

        // Now the future should have completed without acquiring the lock.
        if timeout.as_mut().poll(&mut cx).is_pending() {
            panic!("timed_lock not ready after timeout");
        }

        // The rwlock state should not show any waiters.
        assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

        mem::drop(g);

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
1940
    // Checks that a queued writer blocks new readers: two readers hold the lock, a writer queues
    // up behind them, and a third reader polled afterwards must not be able to join the existing
    // readers. The third reader later asserts that it sees the writer's increment.
    #[test]
    fn writer_waiting() {
        // Takes a shared lock, yields once while holding it, then checks the value is still 0.
        async fn read_zero(mu: Arc<RwLock<usize>>) {
            let val = mu.read_lock().await;
            pending!();

            assert_eq!(*val, 0);
        }

        // Increments the protected counter under the exclusive lock.
        async fn inc(mu: Arc<RwLock<usize>>) {
            *mu.lock().await += 1;
        }

        // Takes a shared lock and checks that the value is 1.
        async fn read_one(mu: Arc<RwLock<usize>>) {
            let val = mu.read_lock().await;

            assert_eq!(*val, 1);
        }

        let mu = Arc::new(RwLock::new(0));

        let mut r1 = Box::pin(read_zero(mu.clone()));
        let mut r2 = Box::pin(read_zero(mu.clone()));

        let mut w = Box::pin(inc(mu.clone()));
        let mut r3 = Box::pin(read_one(mu.clone()));

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        // Both readers acquire the shared lock and then stop at their `pending!()`.
        if let Poll::Ready(()) = r1.as_mut().poll(&mut cx) {
            panic!("read_zero unexpectedly ready");
        }
        if let Poll::Ready(()) = r2.as_mut().poll(&mut cx) {
            panic!("read_zero unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            2 * READ_LOCK
        );

        // The writer cannot get the lock while readers hold it.
        if let Poll::Ready(()) = w.as_mut().poll(&mut cx) {
            panic!("inc unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        // The WRITER_WAITING bit should prevent the next reader from acquiring the lock.
        if let Poll::Ready(()) = r3.as_mut().poll(&mut cx) {
            panic!("read_one unexpectedly ready");
        }
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & READ_MASK,
            2 * READ_LOCK
        );

        // Let everything finish in order: the two original readers, the writer, then the late
        // reader.
        if r1.as_mut().poll(&mut cx).is_pending() {
            panic!("read_zero unable to complete");
        }
        if r2.as_mut().poll(&mut cx).is_pending() {
            panic!("read_zero unable to complete");
        }
        if w.as_mut().poll(&mut cx).is_pending() {
            panic!("inc unable to complete");
        }
        if r3.as_mut().poll(&mut cx).is_pending() {
            panic!("read_one unable to complete");
        }

        assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
    }
2015
    // Exercises `Condvar::notify_one` with four readers waiting through `wait_read` and one writer
    // waiting through `wait`, all on the same rwlock. The notification is sent while the test
    // holds the exclusive lock; once that lock is released, every reader and the writer must be
    // able to complete.
    #[test]
    fn notify_one() {
        // Waits on the condvar, holding a shared lock, until the counter is non-zero.
        async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.read_lock().await;
            while *count == 0 {
                count = cv.wait_read(count).await;
            }
        }

        // Waits on the condvar, holding the exclusive lock, until the counter is non-zero, then
        // decrements it.
        async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
            let mut count = mu.lock().await;
            while *count == 0 {
                count = cv.wait(count).await;
            }

            *count -= 1;
        }

        let mu = Arc::new(RwLock::new(0));
        let cv = Arc::new(Condvar::new());

        let arc_waker = Arc::new(TestWaker);
        let waker = waker_ref(&arc_waker);
        let mut cx = Context::from_waker(&waker);

        let mut readers = [
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
            Box::pin(read(mu.clone(), cv.clone())),
        ];
        let mut writer = Box::pin(write(mu.clone(), cv.clone()));

        // First poll: the counter is 0, so every future ends up waiting on the condvar.
        for r in &mut readers {
            if let Poll::Ready(()) = r.as_mut().poll(&mut cx) {
                panic!("reader unexpectedly ready");
            }
        }
        if let Poll::Ready(()) = writer.as_mut().poll(&mut cx) {
            panic!("writer unexpectedly ready");
        }

        // Take the exclusive lock and make the wait condition false for everyone.
        let mut count = block_on(mu.lock());
        *count = 1;

        // This should wake all readers + one writer.
        cv.notify_one();

        // Poll the readers and the writer so they add themselves to the rwlock's waiter list.
        for r in &mut readers {
            if r.as_mut().poll(&mut cx).is_ready() {
                panic!("reader unexpectedly ready");
            }
        }

        if writer.as_mut().poll(&mut cx).is_ready() {
            panic!("writer unexpectedly ready");
        }

        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS,
            HAS_WAITERS
        );
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & WRITER_WAITING,
            WRITER_WAITING
        );

        mem::drop(count);

        // Releasing the lock alone must not clear the waiter bits; the waiters have not run yet.
        assert_eq!(
            mu.raw.state.load(Ordering::Relaxed) & (HAS_WAITERS | WRITER_WAITING),
            HAS_WAITERS | WRITER_WAITING
        );

        for r in &mut readers {
            if r.as_mut().poll(&mut cx).is_pending() {
                panic!("reader unable to complete");
            }
        }

        if writer.as_mut().poll(&mut cx).is_pending() {
            panic!("writer unable to complete");
        }

        // The writer decremented the counter from 1 back to 0.
        assert_eq!(*block_on(mu.read_lock()), 0);
    }
2103
#[test]
fn notify_when_unlocked() {
    // Takes the exclusive lock, waits on the condvar for as long as the counter is zero, and then
    // decrements the counter by one.
    async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut count = mu.lock().await;

        while *count == 0 {
            count = cv.wait(count).await;
        }

        *count -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut futures = [
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
        Box::pin(dec(mu.clone(), cv.clone())),
    ];

    // The counter starts at zero, so the first poll must leave every future waiting.
    for f in &mut futures {
        if let Poll::Ready(()) = f.as_mut().poll(&mut cx) {
            panic!("future unexpectedly ready");
        }
    }

    // Store one unit per future. The temporary guard is dropped at the end of the statement, so
    // the notification below is issued while the rwlock is not held.
    *block_on(mu.lock()) = futures.len();
    cv.notify_all();

    // Since we haven't polled `futures` yet, the rwlock should not have any waiters.
    assert_eq!(mu.raw.state.load(Ordering::Relaxed) & HAS_WAITERS, 0);

    // Each future is polled in turn and must finish on this poll; a pending result here means
    // the future failed to complete.
    for f in &mut futures {
        if f.as_mut().poll(&mut cx).is_pending() {
            panic!("future unable to complete");
        }
    }
    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
2149
#[test]
fn notify_reader_writer() {
    // Reader task: takes a shared lock and waits on the condvar for as long as the counter is
    // zero.
    async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.read_lock().await;
        while *guard == 0 {
            guard = cv.wait_read(guard).await;
        }

        // Yield once while holding the read lock, which should prevent the writer from waking
        // up.
        pending!();
    }

    // Writer task: takes the exclusive lock, waits on the condvar for as long as the counter is
    // zero, and then decrements the counter by one.
    async fn write(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;
        while *guard == 0 {
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    // Acquires the exclusive lock and releases it right away.
    async fn lock(mu: Arc<RwLock<usize>>) {
        let guard = mu.lock().await;
        mem::drop(guard);
    }

    const NUM_READERS: usize = 4;

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    // Four readers and one writer, with the writer placed between the readers.
    let mut futures: [Pin<Box<dyn Future<Output = ()>>>; 5] = [
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(write(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
    ];

    let mut l = Box::pin(lock(mu.clone()));

    // The counter is zero, so the first poll must leave every future waiting.
    for fut in futures.iter_mut() {
        if fut.as_mut().poll(&mut cx).is_ready() {
            panic!("future unexpectedly ready");
        }
    }

    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);

    let mut guard = block_on(mu.lock());
    *guard = 1;

    // Now poll the lock function. Since the lock is held by us, it will get queued on the
    // waiter list.
    if l.as_mut().poll(&mut cx).is_ready() {
        panic!("lock() unexpectedly ready");
    }

    let waiting_bits = HAS_WAITERS | WRITER_WAITING;
    assert_eq!(
        mu.raw.state.load(Ordering::Relaxed) & waiting_bits,
        waiting_bits
    );

    // Wake up waiters while holding the lock.
    cv.notify_all();

    // Drop the lock. This should wake up the lock function.
    mem::drop(guard);

    if let Poll::Pending = l.as_mut().poll(&mut cx) {
        panic!("lock() unable to complete");
    }

    // Since we haven't polled `futures` yet, the rwlock state should now be empty.
    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);

    // Poll everything again. The readers should be able to make progress (but not complete) but
    // the writer should be blocked.
    for fut in futures.iter_mut() {
        match fut.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("future unexpectedly ready"),
        }
    }

    let shared_holders = mu.raw.state.load(Ordering::Relaxed) & READ_MASK;
    assert_eq!(shared_holders, READ_LOCK * NUM_READERS);

    // All the readers can now finish but the writer needs to be polled again. Remember the index
    // of the single future that is still pending.
    let mut stalled = None;
    for (idx, fut) in futures.iter_mut().enumerate() {
        if fut.as_mut().poll(&mut cx).is_ready() {
            continue;
        }

        if stalled.is_some() {
            panic!("More than one future unable to complete");
        }
        stalled = Some(idx);
    }

    let writer_idx = stalled.expect("Writer unexpectedly able to complete");
    if let Poll::Pending = futures[writer_idx].as_mut().poll(&mut cx) {
        panic!("Writer unable to complete");
    }

    // The writer consumed the single unit and nothing holds or waits for the lock any more.
    let remaining = *block_on(mu.lock());
    assert_eq!(remaining, 0);
    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
2267
#[test]
fn notify_readers_with_read_lock() {
    // Reader task: takes a shared lock and waits on the condvar for as long as the counter is
    // zero.
    async fn read(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.read_lock().await;
        while *guard == 0 {
            guard = cv.wait_read(guard).await;
        }

        // Yield once while holding the read lock.
        pending!();
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut futures = [
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
        Box::pin(read(mu.clone(), cv.clone())),
    ];

    // The counter is zero, so the first poll must leave every reader waiting.
    for fut in futures.iter_mut() {
        if fut.as_mut().poll(&mut cx).is_ready() {
            panic!("future unexpectedly ready");
        }
    }

    // Increment the count and then grab a read lock. The exclusive guard is released at the end
    // of the inner scope, before the shared lock is requested.
    {
        let mut exclusive = block_on(mu.lock());
        *exclusive = 1;
    }

    let shared = block_on(mu.read_lock());

    // Notify the condvar while holding the read lock. This should wake up all the waiters.
    cv.notify_all();

    // Since the lock is held in shared mode, all the readers should immediately be able to
    // acquire the read lock. They then stop at their `pending!()`, so none may finish yet.
    for fut in futures.iter_mut() {
        match fut.as_mut().poll(&mut cx) {
            Poll::Pending => {}
            Poll::Ready(()) => panic!("future unexpectedly ready"),
        }
    }

    // No waiters are queued, and the shared count covers every reader plus our own guard.
    let state = mu.raw.state.load(Ordering::Relaxed);
    assert_eq!(state & HAS_WAITERS, 0);
    assert_eq!(state & READ_MASK, READ_LOCK * (futures.len() + 1));

    mem::drop(shared);

    // Every reader must now run to completion.
    for fut in futures.iter_mut() {
        if let Poll::Pending = fut.as_mut().poll(&mut cx) {
            panic!("future unable to complete");
        }
    }

    assert_eq!(mu.raw.state.load(Ordering::Relaxed), 0);
}
2331 }
2332