1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::elision::{have_elision, AtomicElisionExt}; 9 use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::util; 11 use core::{ 12 cell::Cell, 13 sync::atomic::{AtomicUsize, Ordering}, 14 }; 15 use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade}; 16 use parking_lot_core::{ 17 self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken, 18 }; 19 use std::time::{Duration, Instant}; 20 21 // This reader-writer lock implementation is based on Boost's upgrade_mutex: 22 // https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432 23 // 24 // This implementation uses 2 wait queues, one at key [addr] and one at key 25 // [addr + 1]. The primary queue is used for all new waiting threads, and the 26 // secondary queue is used by the thread which has acquired WRITER_BIT but is 27 // waiting for the remaining readers to exit the lock. 28 // 29 // This implementation is fair between readers and writers since it uses the 30 // order in which threads first started queuing to alternate between read phases 31 // and write phases. In particular is it not vulnerable to write starvation 32 // since readers will block if there is a pending writer. 33 34 // There is at least one thread in the main queue. 35 const PARKED_BIT: usize = 0b0001; 36 // There is a parked thread holding WRITER_BIT. WRITER_BIT must be set. 37 const WRITER_PARKED_BIT: usize = 0b0010; 38 // A reader is holding an upgradable lock. The reader count must be non-zero and 39 // WRITER_BIT must not be set. 40 const UPGRADABLE_BIT: usize = 0b0100; 41 // If the reader count is zero: a writer is currently holding an exclusive lock. 42 // Otherwise: a writer is waiting for the remaining readers to exit the lock. 43 const WRITER_BIT: usize = 0b1000; 44 // Mask of bits used to count readers. 45 const READERS_MASK: usize = !0b1111; 46 // Base unit for counting readers. 47 const ONE_READER: usize = 0b10000; 48 49 // Token indicating what type of lock a queued thread is trying to acquire 50 const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER); 51 const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT); 52 const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT); 53 54 /// Raw reader-writer lock type backed by the parking lot. 55 pub struct RawRwLock { 56 state: AtomicUsize, 57 } 58 59 unsafe impl lock_api::RawRwLock for RawRwLock { 60 const INIT: RawRwLock = RawRwLock { 61 state: AtomicUsize::new(0), 62 }; 63 64 type GuardMarker = crate::GuardMarker; 65 66 #[inline] lock_exclusive(&self)67 fn lock_exclusive(&self) { 68 if self 69 .state 70 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 71 .is_err() 72 { 73 let result = self.lock_exclusive_slow(None); 74 debug_assert!(result); 75 } 76 self.deadlock_acquire(); 77 } 78 79 #[inline] try_lock_exclusive(&self) -> bool80 fn try_lock_exclusive(&self) -> bool { 81 if self 82 .state 83 .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 84 .is_ok() 85 { 86 self.deadlock_acquire(); 87 true 88 } else { 89 false 90 } 91 } 92 93 #[inline] unlock_exclusive(&self)94 unsafe fn unlock_exclusive(&self) { 95 self.deadlock_release(); 96 if self 97 .state 98 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 99 .is_ok() 100 { 101 return; 102 } 103 self.unlock_exclusive_slow(false); 104 } 105 106 #[inline] lock_shared(&self)107 fn lock_shared(&self) { 108 if !self.try_lock_shared_fast(false) { 109 let result = self.lock_shared_slow(false, None); 110 debug_assert!(result); 111 } 112 self.deadlock_acquire(); 113 } 114 115 #[inline] try_lock_shared(&self) -> bool116 fn try_lock_shared(&self) -> bool { 117 let result = if self.try_lock_shared_fast(false) { 118 true 119 } else { 120 self.try_lock_shared_slow(false) 121 }; 122 if result { 123 self.deadlock_acquire(); 124 } 125 result 126 } 127 128 #[inline] unlock_shared(&self)129 unsafe fn unlock_shared(&self) { 130 self.deadlock_release(); 131 let state = if have_elision() { 132 self.state.elision_fetch_sub_release(ONE_READER) 133 } else { 134 self.state.fetch_sub(ONE_READER, Ordering::Release) 135 }; 136 if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) { 137 self.unlock_shared_slow(); 138 } 139 } 140 141 #[inline] is_locked(&self) -> bool142 fn is_locked(&self) -> bool { 143 let state = self.state.load(Ordering::Relaxed); 144 state & (WRITER_BIT | READERS_MASK) != 0 145 } 146 147 #[inline] is_locked_exclusive(&self) -> bool148 fn is_locked_exclusive(&self) -> bool { 149 let state = self.state.load(Ordering::Relaxed); 150 state & (WRITER_BIT) != 0 151 } 152 } 153 154 unsafe impl lock_api::RawRwLockFair for RawRwLock { 155 #[inline] unlock_shared_fair(&self)156 unsafe fn unlock_shared_fair(&self) { 157 // Shared unlocking is always fair in this implementation. 158 self.unlock_shared(); 159 } 160 161 #[inline] unlock_exclusive_fair(&self)162 unsafe fn unlock_exclusive_fair(&self) { 163 self.deadlock_release(); 164 if self 165 .state 166 .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed) 167 .is_ok() 168 { 169 return; 170 } 171 self.unlock_exclusive_slow(true); 172 } 173 174 #[inline] bump_shared(&self)175 unsafe fn bump_shared(&self) { 176 if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT) 177 == ONE_READER | WRITER_BIT 178 { 179 self.bump_shared_slow(); 180 } 181 } 182 183 #[inline] bump_exclusive(&self)184 unsafe fn bump_exclusive(&self) { 185 if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 { 186 self.bump_exclusive_slow(); 187 } 188 } 189 } 190 191 unsafe impl lock_api::RawRwLockDowngrade for RawRwLock { 192 #[inline] downgrade(&self)193 unsafe fn downgrade(&self) { 194 let state = self 195 .state 196 .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release); 197 198 // Wake up parked shared and upgradable threads if there are any 199 if state & PARKED_BIT != 0 { 200 self.downgrade_slow(); 201 } 202 } 203 } 204 205 unsafe impl lock_api::RawRwLockTimed for RawRwLock { 206 type Duration = Duration; 207 type Instant = Instant; 208 209 #[inline] try_lock_shared_for(&self, timeout: Self::Duration) -> bool210 fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool { 211 let result = if self.try_lock_shared_fast(false) { 212 true 213 } else { 214 self.lock_shared_slow(false, util::to_deadline(timeout)) 215 }; 216 if result { 217 self.deadlock_acquire(); 218 } 219 result 220 } 221 222 #[inline] try_lock_shared_until(&self, timeout: Self::Instant) -> bool223 fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool { 224 let result = if self.try_lock_shared_fast(false) { 225 true 226 } else { 227 self.lock_shared_slow(false, Some(timeout)) 228 }; 229 if result { 230 self.deadlock_acquire(); 231 } 232 result 233 } 234 235 #[inline] try_lock_exclusive_for(&self, timeout: Duration) -> bool236 fn try_lock_exclusive_for(&self, timeout: Duration) -> bool { 237 let result = if self 238 .state 239 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 240 .is_ok() 241 { 242 true 243 } else { 244 self.lock_exclusive_slow(util::to_deadline(timeout)) 245 }; 246 if result { 247 self.deadlock_acquire(); 248 } 249 result 250 } 251 252 #[inline] try_lock_exclusive_until(&self, timeout: Instant) -> bool253 fn try_lock_exclusive_until(&self, timeout: Instant) -> bool { 254 let result = if self 255 .state 256 .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed) 257 .is_ok() 258 { 259 true 260 } else { 261 self.lock_exclusive_slow(Some(timeout)) 262 }; 263 if result { 264 self.deadlock_acquire(); 265 } 266 result 267 } 268 } 269 270 unsafe impl lock_api::RawRwLockRecursive for RawRwLock { 271 #[inline] lock_shared_recursive(&self)272 fn lock_shared_recursive(&self) { 273 if !self.try_lock_shared_fast(true) { 274 let result = self.lock_shared_slow(true, None); 275 debug_assert!(result); 276 } 277 self.deadlock_acquire(); 278 } 279 280 #[inline] try_lock_shared_recursive(&self) -> bool281 fn try_lock_shared_recursive(&self) -> bool { 282 let result = if self.try_lock_shared_fast(true) { 283 true 284 } else { 285 self.try_lock_shared_slow(true) 286 }; 287 if result { 288 self.deadlock_acquire(); 289 } 290 result 291 } 292 } 293 294 unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock { 295 #[inline] try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool296 fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool { 297 let result = if self.try_lock_shared_fast(true) { 298 true 299 } else { 300 self.lock_shared_slow(true, util::to_deadline(timeout)) 301 }; 302 if result { 303 self.deadlock_acquire(); 304 } 305 result 306 } 307 308 #[inline] try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool309 fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool { 310 let result = if self.try_lock_shared_fast(true) { 311 true 312 } else { 313 self.lock_shared_slow(true, Some(timeout)) 314 }; 315 if result { 316 self.deadlock_acquire(); 317 } 318 result 319 } 320 } 321 322 unsafe impl lock_api::RawRwLockUpgrade for RawRwLock { 323 #[inline] lock_upgradable(&self)324 fn lock_upgradable(&self) { 325 if !self.try_lock_upgradable_fast() { 326 let result = self.lock_upgradable_slow(None); 327 debug_assert!(result); 328 } 329 self.deadlock_acquire(); 330 } 331 332 #[inline] try_lock_upgradable(&self) -> bool333 fn try_lock_upgradable(&self) -> bool { 334 let result = if self.try_lock_upgradable_fast() { 335 true 336 } else { 337 self.try_lock_upgradable_slow() 338 }; 339 if result { 340 self.deadlock_acquire(); 341 } 342 result 343 } 344 345 #[inline] unlock_upgradable(&self)346 unsafe fn unlock_upgradable(&self) { 347 self.deadlock_release(); 348 let state = self.state.load(Ordering::Relaxed); 349 if state & PARKED_BIT == 0 { 350 if self 351 .state 352 .compare_exchange_weak( 353 state, 354 state - (ONE_READER | UPGRADABLE_BIT), 355 Ordering::Release, 356 Ordering::Relaxed, 357 ) 358 .is_ok() 359 { 360 return; 361 } 362 } 363 self.unlock_upgradable_slow(false); 364 } 365 366 #[inline] upgrade(&self)367 unsafe fn upgrade(&self) { 368 let state = self.state.fetch_sub( 369 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 370 Ordering::Acquire, 371 ); 372 if state & READERS_MASK != ONE_READER { 373 let result = self.upgrade_slow(None); 374 debug_assert!(result); 375 } 376 } 377 378 #[inline] try_upgrade(&self) -> bool379 unsafe fn try_upgrade(&self) -> bool { 380 if self 381 .state 382 .compare_exchange_weak( 383 ONE_READER | UPGRADABLE_BIT, 384 WRITER_BIT, 385 Ordering::Acquire, 386 Ordering::Relaxed, 387 ) 388 .is_ok() 389 { 390 true 391 } else { 392 self.try_upgrade_slow() 393 } 394 } 395 } 396 397 unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock { 398 #[inline] unlock_upgradable_fair(&self)399 unsafe fn unlock_upgradable_fair(&self) { 400 self.deadlock_release(); 401 let state = self.state.load(Ordering::Relaxed); 402 if state & PARKED_BIT == 0 { 403 if self 404 .state 405 .compare_exchange_weak( 406 state, 407 state - (ONE_READER | UPGRADABLE_BIT), 408 Ordering::Release, 409 Ordering::Relaxed, 410 ) 411 .is_ok() 412 { 413 return; 414 } 415 } 416 self.unlock_upgradable_slow(false); 417 } 418 419 #[inline] bump_upgradable(&self)420 unsafe fn bump_upgradable(&self) { 421 if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT { 422 self.bump_upgradable_slow(); 423 } 424 } 425 } 426 427 unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock { 428 #[inline] downgrade_upgradable(&self)429 unsafe fn downgrade_upgradable(&self) { 430 let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed); 431 432 // Wake up parked upgradable threads if there are any 433 if state & PARKED_BIT != 0 { 434 self.downgrade_slow(); 435 } 436 } 437 438 #[inline] downgrade_to_upgradable(&self)439 unsafe fn downgrade_to_upgradable(&self) { 440 let state = self.state.fetch_add( 441 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 442 Ordering::Release, 443 ); 444 445 // Wake up parked shared threads if there are any 446 if state & PARKED_BIT != 0 { 447 self.downgrade_to_upgradable_slow(); 448 } 449 } 450 } 451 452 unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock { 453 #[inline] try_lock_upgradable_until(&self, timeout: Instant) -> bool454 fn try_lock_upgradable_until(&self, timeout: Instant) -> bool { 455 let result = if self.try_lock_upgradable_fast() { 456 true 457 } else { 458 self.lock_upgradable_slow(Some(timeout)) 459 }; 460 if result { 461 self.deadlock_acquire(); 462 } 463 result 464 } 465 466 #[inline] try_lock_upgradable_for(&self, timeout: Duration) -> bool467 fn try_lock_upgradable_for(&self, timeout: Duration) -> bool { 468 let result = if self.try_lock_upgradable_fast() { 469 true 470 } else { 471 self.lock_upgradable_slow(util::to_deadline(timeout)) 472 }; 473 if result { 474 self.deadlock_acquire(); 475 } 476 result 477 } 478 479 #[inline] try_upgrade_until(&self, timeout: Instant) -> bool480 unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool { 481 let state = self.state.fetch_sub( 482 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 483 Ordering::Relaxed, 484 ); 485 if state & READERS_MASK == ONE_READER { 486 true 487 } else { 488 self.upgrade_slow(Some(timeout)) 489 } 490 } 491 492 #[inline] try_upgrade_for(&self, timeout: Duration) -> bool493 unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool { 494 let state = self.state.fetch_sub( 495 (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT, 496 Ordering::Relaxed, 497 ); 498 if state & READERS_MASK == ONE_READER { 499 true 500 } else { 501 self.upgrade_slow(util::to_deadline(timeout)) 502 } 503 } 504 } 505 506 impl RawRwLock { 507 #[inline(always)] try_lock_shared_fast(&self, recursive: bool) -> bool508 fn try_lock_shared_fast(&self, recursive: bool) -> bool { 509 let state = self.state.load(Ordering::Relaxed); 510 511 // We can't allow grabbing a shared lock if there is a writer, even if 512 // the writer is still waiting for the remaining readers to exit. 513 if state & WRITER_BIT != 0 { 514 // To allow recursive locks, we make an exception and allow readers 515 // to skip ahead of a pending writer to avoid deadlocking, at the 516 // cost of breaking the fairness guarantees. 517 if !recursive || state & READERS_MASK == 0 { 518 return false; 519 } 520 } 521 522 // Use hardware lock elision to avoid cache conflicts when multiple 523 // readers try to acquire the lock. We only do this if the lock is 524 // completely empty since elision handles conflicts poorly. 525 if have_elision() && state == 0 { 526 self.state 527 .elision_compare_exchange_acquire(0, ONE_READER) 528 .is_ok() 529 } else if let Some(new_state) = state.checked_add(ONE_READER) { 530 self.state 531 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 532 .is_ok() 533 } else { 534 false 535 } 536 } 537 538 #[cold] try_lock_shared_slow(&self, recursive: bool) -> bool539 fn try_lock_shared_slow(&self, recursive: bool) -> bool { 540 let mut state = self.state.load(Ordering::Relaxed); 541 loop { 542 // This mirrors the condition in try_lock_shared_fast 543 if state & WRITER_BIT != 0 { 544 if !recursive || state & READERS_MASK == 0 { 545 return false; 546 } 547 } 548 if have_elision() && state == 0 { 549 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 550 Ok(_) => return true, 551 Err(x) => state = x, 552 } 553 } else { 554 match self.state.compare_exchange_weak( 555 state, 556 state 557 .checked_add(ONE_READER) 558 .expect("RwLock reader count overflow"), 559 Ordering::Acquire, 560 Ordering::Relaxed, 561 ) { 562 Ok(_) => return true, 563 Err(x) => state = x, 564 } 565 } 566 } 567 } 568 569 #[inline(always)] try_lock_upgradable_fast(&self) -> bool570 fn try_lock_upgradable_fast(&self) -> bool { 571 let state = self.state.load(Ordering::Relaxed); 572 573 // We can't grab an upgradable lock if there is already a writer or 574 // upgradable reader. 575 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 576 return false; 577 } 578 579 if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) { 580 self.state 581 .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed) 582 .is_ok() 583 } else { 584 false 585 } 586 } 587 588 #[cold] try_lock_upgradable_slow(&self) -> bool589 fn try_lock_upgradable_slow(&self) -> bool { 590 let mut state = self.state.load(Ordering::Relaxed); 591 loop { 592 // This mirrors the condition in try_lock_upgradable_fast 593 if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 594 return false; 595 } 596 597 match self.state.compare_exchange_weak( 598 state, 599 state 600 .checked_add(ONE_READER | UPGRADABLE_BIT) 601 .expect("RwLock reader count overflow"), 602 Ordering::Acquire, 603 Ordering::Relaxed, 604 ) { 605 Ok(_) => return true, 606 Err(x) => state = x, 607 } 608 } 609 } 610 611 #[cold] lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool612 fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool { 613 let try_lock = |state: &mut usize| { 614 loop { 615 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 616 return false; 617 } 618 619 // Grab WRITER_BIT if it isn't set, even if there are parked threads. 620 match self.state.compare_exchange_weak( 621 *state, 622 *state | WRITER_BIT, 623 Ordering::Acquire, 624 Ordering::Relaxed, 625 ) { 626 Ok(_) => return true, 627 Err(x) => *state = x, 628 } 629 } 630 }; 631 632 // Step 1: grab exclusive ownership of WRITER_BIT 633 let timed_out = !self.lock_common( 634 timeout, 635 TOKEN_EXCLUSIVE, 636 try_lock, 637 WRITER_BIT | UPGRADABLE_BIT, 638 ); 639 if timed_out { 640 return false; 641 } 642 643 // Step 2: wait for all remaining readers to exit the lock. 644 self.wait_for_readers(timeout, 0) 645 } 646 647 #[cold] unlock_exclusive_slow(&self, force_fair: bool)648 fn unlock_exclusive_slow(&self, force_fair: bool) { 649 // There are threads to unpark. Try to unpark as many as we can. 650 let callback = |mut new_state, result: UnparkResult| { 651 // If we are using a fair unlock then we should keep the 652 // rwlock locked and hand it off to the unparked threads. 653 if result.unparked_threads != 0 && (force_fair || result.be_fair) { 654 if result.have_more_threads { 655 new_state |= PARKED_BIT; 656 } 657 self.state.store(new_state, Ordering::Release); 658 TOKEN_HANDOFF 659 } else { 660 // Clear the parked bit if there are no more parked threads. 661 if result.have_more_threads { 662 self.state.store(PARKED_BIT, Ordering::Release); 663 } else { 664 self.state.store(0, Ordering::Release); 665 } 666 TOKEN_NORMAL 667 } 668 }; 669 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 670 unsafe { 671 self.wake_parked_threads(0, callback); 672 } 673 } 674 675 #[cold] lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool676 fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool { 677 let try_lock = |state: &mut usize| { 678 let mut spinwait_shared = SpinWait::new(); 679 loop { 680 // Use hardware lock elision to avoid cache conflicts when multiple 681 // readers try to acquire the lock. We only do this if the lock is 682 // completely empty since elision handles conflicts poorly. 683 if have_elision() && *state == 0 { 684 match self.state.elision_compare_exchange_acquire(0, ONE_READER) { 685 Ok(_) => return true, 686 Err(x) => *state = x, 687 } 688 } 689 690 // This is the same condition as try_lock_shared_fast 691 if *state & WRITER_BIT != 0 { 692 if !recursive || *state & READERS_MASK == 0 { 693 return false; 694 } 695 } 696 697 if self 698 .state 699 .compare_exchange_weak( 700 *state, 701 state 702 .checked_add(ONE_READER) 703 .expect("RwLock reader count overflow"), 704 Ordering::Acquire, 705 Ordering::Relaxed, 706 ) 707 .is_ok() 708 { 709 return true; 710 } 711 712 // If there is high contention on the reader count then we want 713 // to leave some time between attempts to acquire the lock to 714 // let other threads make progress. 715 spinwait_shared.spin_no_yield(); 716 *state = self.state.load(Ordering::Relaxed); 717 } 718 }; 719 self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT) 720 } 721 722 #[cold] unlock_shared_slow(&self)723 fn unlock_shared_slow(&self) { 724 // At this point WRITER_PARKED_BIT is set and READER_MASK is empty. We 725 // just need to wake up a potentially sleeping pending writer. 726 // Using the 2nd key at addr + 1 727 let addr = self as *const _ as usize + 1; 728 let callback = |_result: UnparkResult| { 729 // Clear the WRITER_PARKED_BIT here since there can only be one 730 // parked writer thread. 731 self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed); 732 TOKEN_NORMAL 733 }; 734 // SAFETY: 735 // * `addr` is an address we control. 736 // * `callback` does not panic or call into any function of `parking_lot`. 737 unsafe { 738 parking_lot_core::unpark_one(addr, callback); 739 } 740 } 741 742 #[cold] lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool743 fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool { 744 let try_lock = |state: &mut usize| { 745 let mut spinwait_shared = SpinWait::new(); 746 loop { 747 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 { 748 return false; 749 } 750 751 if self 752 .state 753 .compare_exchange_weak( 754 *state, 755 state 756 .checked_add(ONE_READER | UPGRADABLE_BIT) 757 .expect("RwLock reader count overflow"), 758 Ordering::Acquire, 759 Ordering::Relaxed, 760 ) 761 .is_ok() 762 { 763 return true; 764 } 765 766 // If there is high contention on the reader count then we want 767 // to leave some time between attempts to acquire the lock to 768 // let other threads make progress. 769 spinwait_shared.spin_no_yield(); 770 *state = self.state.load(Ordering::Relaxed); 771 } 772 }; 773 self.lock_common( 774 timeout, 775 TOKEN_UPGRADABLE, 776 try_lock, 777 WRITER_BIT | UPGRADABLE_BIT, 778 ) 779 } 780 781 #[cold] unlock_upgradable_slow(&self, force_fair: bool)782 fn unlock_upgradable_slow(&self, force_fair: bool) { 783 // Just release the lock if there are no parked threads. 784 let mut state = self.state.load(Ordering::Relaxed); 785 while state & PARKED_BIT == 0 { 786 match self.state.compare_exchange_weak( 787 state, 788 state - (ONE_READER | UPGRADABLE_BIT), 789 Ordering::Release, 790 Ordering::Relaxed, 791 ) { 792 Ok(_) => return, 793 Err(x) => state = x, 794 } 795 } 796 797 // There are threads to unpark. Try to unpark as many as we can. 798 let callback = |new_state, result: UnparkResult| { 799 // If we are using a fair unlock then we should keep the 800 // rwlock locked and hand it off to the unparked threads. 801 let mut state = self.state.load(Ordering::Relaxed); 802 if force_fair || result.be_fair { 803 // Fall back to normal unpark on overflow. Panicking is 804 // not allowed in parking_lot callbacks. 805 while let Some(mut new_state) = 806 (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state) 807 { 808 if result.have_more_threads { 809 new_state |= PARKED_BIT; 810 } else { 811 new_state &= !PARKED_BIT; 812 } 813 match self.state.compare_exchange_weak( 814 state, 815 new_state, 816 Ordering::Relaxed, 817 Ordering::Relaxed, 818 ) { 819 Ok(_) => return TOKEN_HANDOFF, 820 Err(x) => state = x, 821 } 822 } 823 } 824 825 // Otherwise just release the upgradable lock and update PARKED_BIT. 826 loop { 827 let mut new_state = state - (ONE_READER | UPGRADABLE_BIT); 828 if result.have_more_threads { 829 new_state |= PARKED_BIT; 830 } else { 831 new_state &= !PARKED_BIT; 832 } 833 match self.state.compare_exchange_weak( 834 state, 835 new_state, 836 Ordering::Relaxed, 837 Ordering::Relaxed, 838 ) { 839 Ok(_) => return TOKEN_NORMAL, 840 Err(x) => state = x, 841 } 842 } 843 }; 844 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 845 unsafe { 846 self.wake_parked_threads(0, callback); 847 } 848 } 849 850 #[cold] try_upgrade_slow(&self) -> bool851 fn try_upgrade_slow(&self) -> bool { 852 let mut state = self.state.load(Ordering::Relaxed); 853 loop { 854 if state & READERS_MASK != ONE_READER { 855 return false; 856 } 857 match self.state.compare_exchange_weak( 858 state, 859 state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT, 860 Ordering::Relaxed, 861 Ordering::Relaxed, 862 ) { 863 Ok(_) => return true, 864 Err(x) => state = x, 865 } 866 } 867 } 868 869 #[cold] upgrade_slow(&self, timeout: Option<Instant>) -> bool870 fn upgrade_slow(&self, timeout: Option<Instant>) -> bool { 871 self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT) 872 } 873 874 #[cold] downgrade_slow(&self)875 fn downgrade_slow(&self) { 876 // We only reach this point if PARKED_BIT is set. 877 let callback = |_, result: UnparkResult| { 878 // Clear the parked bit if there no more parked threads 879 if !result.have_more_threads { 880 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 881 } 882 TOKEN_NORMAL 883 }; 884 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 885 unsafe { 886 self.wake_parked_threads(ONE_READER, callback); 887 } 888 } 889 890 #[cold] downgrade_to_upgradable_slow(&self)891 fn downgrade_to_upgradable_slow(&self) { 892 // We only reach this point if PARKED_BIT is set. 893 let callback = |_, result: UnparkResult| { 894 // Clear the parked bit if there no more parked threads 895 if !result.have_more_threads { 896 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 897 } 898 TOKEN_NORMAL 899 }; 900 // SAFETY: `callback` does not panic or call into any function of `parking_lot`. 901 unsafe { 902 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 903 } 904 } 905 906 #[cold] bump_shared_slow(&self)907 unsafe fn bump_shared_slow(&self) { 908 self.unlock_shared(); 909 self.lock_shared(); 910 } 911 912 #[cold] bump_exclusive_slow(&self)913 fn bump_exclusive_slow(&self) { 914 self.deadlock_release(); 915 self.unlock_exclusive_slow(true); 916 self.lock_exclusive(); 917 } 918 919 #[cold] bump_upgradable_slow(&self)920 fn bump_upgradable_slow(&self) { 921 self.deadlock_release(); 922 self.unlock_upgradable_slow(true); 923 self.lock_upgradable(); 924 } 925 926 /// Common code for waking up parked threads after releasing WRITER_BIT or 927 /// UPGRADABLE_BIT. 928 /// 929 /// # Safety 930 /// 931 /// `callback` must uphold the requirements of the `callback` parameter to 932 /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in 933 /// `parking_lot`. 934 #[inline] wake_parked_threads( &self, new_state: usize, callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, )935 unsafe fn wake_parked_threads( 936 &self, 937 new_state: usize, 938 callback: impl FnOnce(usize, UnparkResult) -> UnparkToken, 939 ) { 940 // We must wake up at least one upgrader or writer if there is one, 941 // otherwise they may end up parked indefinitely since unlock_shared 942 // does not call wake_parked_threads. 943 let new_state = Cell::new(new_state); 944 let addr = self as *const _ as usize; 945 let filter = |ParkToken(token)| { 946 let s = new_state.get(); 947 948 // If we are waking up a writer, don't wake anything else. 949 if s & WRITER_BIT != 0 { 950 return FilterOp::Stop; 951 } 952 953 // Otherwise wake *all* readers and one upgrader/writer. 954 if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 { 955 // Skip writers and upgradable readers if we already have 956 // a writer/upgradable reader. 957 FilterOp::Skip 958 } else { 959 new_state.set(s + token); 960 FilterOp::Unpark 961 } 962 }; 963 let callback = |result| callback(new_state.get(), result); 964 // SAFETY: 965 // * `addr` is an address we control. 966 // * `filter` does not panic or call into any function of `parking_lot`. 967 // * `callback` safety responsibility is on caller 968 parking_lot_core::unpark_filter(addr, filter, callback); 969 } 970 971 // Common code for waiting for readers to exit the lock after acquiring 972 // WRITER_BIT. 973 #[inline] wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool974 fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool { 975 // At this point WRITER_BIT is already set, we just need to wait for the 976 // remaining readers to exit the lock. 977 let mut spinwait = SpinWait::new(); 978 let mut state = self.state.load(Ordering::Acquire); 979 while state & READERS_MASK != 0 { 980 // Spin a few times to wait for readers to exit 981 if spinwait.spin() { 982 state = self.state.load(Ordering::Acquire); 983 continue; 984 } 985 986 // Set the parked bit 987 if state & WRITER_PARKED_BIT == 0 { 988 if let Err(x) = self.state.compare_exchange_weak( 989 state, 990 state | WRITER_PARKED_BIT, 991 Ordering::Acquire, 992 Ordering::Acquire, 993 ) { 994 state = x; 995 continue; 996 } 997 } 998 999 // Park our thread until we are woken up by an unlock 1000 // Using the 2nd key at addr + 1 1001 let addr = self as *const _ as usize + 1; 1002 let validate = || { 1003 let state = self.state.load(Ordering::Relaxed); 1004 state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0 1005 }; 1006 let before_sleep = || {}; 1007 let timed_out = |_, _| {}; 1008 // SAFETY: 1009 // * `addr` is an address we control. 1010 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 1011 // * `before_sleep` does not call `park`, nor does it panic. 1012 let park_result = unsafe { 1013 parking_lot_core::park( 1014 addr, 1015 validate, 1016 before_sleep, 1017 timed_out, 1018 TOKEN_EXCLUSIVE, 1019 timeout, 1020 ) 1021 }; 1022 match park_result { 1023 // We still need to re-check the state if we are unparked 1024 // since a previous writer timing-out could have allowed 1025 // another reader to sneak in before we parked. 1026 ParkResult::Unparked(_) | ParkResult::Invalid => { 1027 state = self.state.load(Ordering::Acquire); 1028 continue; 1029 } 1030 1031 // Timeout expired 1032 ParkResult::TimedOut => { 1033 // We need to release WRITER_BIT and revert back to 1034 // our previous value. We also wake up any threads that 1035 // might be waiting on WRITER_BIT. 1036 let state = self.state.fetch_add( 1037 prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT), 1038 Ordering::Relaxed, 1039 ); 1040 if state & PARKED_BIT != 0 { 1041 let callback = |_, result: UnparkResult| { 1042 // Clear the parked bit if there no more parked threads 1043 if !result.have_more_threads { 1044 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1045 } 1046 TOKEN_NORMAL 1047 }; 1048 // SAFETY: `callback` does not panic or call any function of `parking_lot`. 1049 unsafe { 1050 self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback); 1051 } 1052 } 1053 return false; 1054 } 1055 } 1056 } 1057 true 1058 } 1059 1060 /// Common code for acquiring a lock 1061 #[inline] lock_common( &self, timeout: Option<Instant>, token: ParkToken, mut try_lock: impl FnMut(&mut usize) -> bool, validate_flags: usize, ) -> bool1062 fn lock_common( 1063 &self, 1064 timeout: Option<Instant>, 1065 token: ParkToken, 1066 mut try_lock: impl FnMut(&mut usize) -> bool, 1067 validate_flags: usize, 1068 ) -> bool { 1069 let mut spinwait = SpinWait::new(); 1070 let mut state = self.state.load(Ordering::Relaxed); 1071 loop { 1072 // Attempt to grab the lock 1073 if try_lock(&mut state) { 1074 return true; 1075 } 1076 1077 // If there are no parked threads, try spinning a few times. 1078 if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() { 1079 state = self.state.load(Ordering::Relaxed); 1080 continue; 1081 } 1082 1083 // Set the parked bit 1084 if state & PARKED_BIT == 0 { 1085 if let Err(x) = self.state.compare_exchange_weak( 1086 state, 1087 state | PARKED_BIT, 1088 Ordering::Relaxed, 1089 Ordering::Relaxed, 1090 ) { 1091 state = x; 1092 continue; 1093 } 1094 } 1095 1096 // Park our thread until we are woken up by an unlock 1097 let addr = self as *const _ as usize; 1098 let validate = || { 1099 let state = self.state.load(Ordering::Relaxed); 1100 state & PARKED_BIT != 0 && (state & validate_flags != 0) 1101 }; 1102 let before_sleep = || {}; 1103 let timed_out = |_, was_last_thread| { 1104 // Clear the parked bit if we were the last parked thread 1105 if was_last_thread { 1106 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed); 1107 } 1108 }; 1109 1110 // SAFETY: 1111 // * `addr` is an address we control. 1112 // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`. 1113 // * `before_sleep` does not call `park`, nor does it panic. 1114 let park_result = unsafe { 1115 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout) 1116 }; 1117 match park_result { 1118 // The thread that unparked us passed the lock on to us 1119 // directly without unlocking it. 1120 ParkResult::Unparked(TOKEN_HANDOFF) => return true, 1121 1122 // We were unparked normally, try acquiring the lock again 1123 ParkResult::Unparked(_) => (), 1124 1125 // The validation function failed, try locking again 1126 ParkResult::Invalid => (), 1127 1128 // Timeout expired 1129 ParkResult::TimedOut => return false, 1130 } 1131 1132 // Loop back and try locking again 1133 spinwait.reset(); 1134 state = self.state.load(Ordering::Relaxed); 1135 } 1136 } 1137 1138 #[inline] deadlock_acquire(&self)1139 fn deadlock_acquire(&self) { 1140 unsafe { deadlock::acquire_resource(self as *const _ as usize) }; 1141 unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) }; 1142 } 1143 1144 #[inline] deadlock_release(&self)1145 fn deadlock_release(&self) { 1146 unsafe { deadlock::release_resource(self as *const _ as usize) }; 1147 unsafe { deadlock::release_resource(self as *const _ as usize + 1) }; 1148 } 1149 } 1150