1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::mutex::MutexGuard; 9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::{deadlock, util}; 11 use core::{ 12 fmt, ptr, 13 sync::atomic::{AtomicPtr, Ordering}, 14 }; 15 use lock_api::RawMutex as RawMutex_; 16 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; 17 use std::ops::DerefMut; 18 use std::time::{Duration, Instant}; 19 20 /// A type indicating whether a timed wait on a condition variable returned 21 /// due to a time out or not. 22 #[derive(Debug, PartialEq, Eq, Copy, Clone)] 23 pub struct WaitTimeoutResult(bool); 24 25 impl WaitTimeoutResult { 26 /// Returns whether the wait was known to have timed out. 27 #[inline] timed_out(self) -> bool28 pub fn timed_out(self) -> bool { 29 self.0 30 } 31 } 32 33 /// A Condition Variable 34 /// 35 /// Condition variables represent the ability to block a thread such that it 36 /// consumes no CPU time while waiting for an event to occur. Condition 37 /// variables are typically associated with a boolean predicate (a condition) 38 /// and a mutex. The predicate is always verified inside of the mutex before 39 /// determining that thread must block. 40 /// 41 /// Note that this module places one additional restriction over the system 42 /// condition variables: each condvar can be used with only one mutex at a 43 /// time. Any attempt to use multiple mutexes on the same condition variable 44 /// simultaneously will result in a runtime panic. However it is possible to 45 /// switch to a different mutex if there are no threads currently waiting on 46 /// the condition variable. 
47 /// 48 /// # Differences from the standard library `Condvar` 49 /// 50 /// - No spurious wakeups: A wait will only return a non-timeout result if it 51 /// was woken up by `notify_one` or `notify_all`. 52 /// - `Condvar::notify_all` will only wake up a single thread, the rest are 53 /// requeued to wait for the `Mutex` to be unlocked by the thread that was 54 /// woken up. 55 /// - Only requires 1 word of space, whereas the standard library boxes the 56 /// `Condvar` due to platform limitations. 57 /// - Can be statically constructed. 58 /// - Does not require any drop glue when dropped. 59 /// - Inline fast path for the uncontended case. 60 /// 61 /// # Examples 62 /// 63 /// ``` 64 /// use parking_lot::{Mutex, Condvar}; 65 /// use std::sync::Arc; 66 /// use std::thread; 67 /// 68 /// let pair = Arc::new((Mutex::new(false), Condvar::new())); 69 /// let pair2 = pair.clone(); 70 /// 71 /// // Inside of our lock, spawn a new thread, and then wait for it to start 72 /// thread::spawn(move|| { 73 /// let &(ref lock, ref cvar) = &*pair2; 74 /// let mut started = lock.lock(); 75 /// *started = true; 76 /// cvar.notify_one(); 77 /// }); 78 /// 79 /// // wait for the thread to start up 80 /// let &(ref lock, ref cvar) = &*pair; 81 /// let mut started = lock.lock(); 82 /// if !*started { 83 /// cvar.wait(&mut started); 84 /// } 85 /// // Note that we used an if instead of a while loop above. This is only 86 /// // possible because parking_lot's Condvar will never spuriously wake up. 87 /// // This means that wait() will only return after notify_one or notify_all is 88 /// // called. 89 /// ``` 90 pub struct Condvar { 91 state: AtomicPtr<RawMutex>, 92 } 93 94 impl Condvar { 95 /// Creates a new condition variable which is ready to be waited on and 96 /// notified. 97 #[inline] new() -> Condvar98 pub const fn new() -> Condvar { 99 Condvar { 100 state: AtomicPtr::new(ptr::null_mut()), 101 } 102 } 103 104 /// Wakes up one blocked thread on this condvar. 
105 /// 106 /// Returns whether a thread was woken up. 107 /// 108 /// If there is a blocked thread on this condition variable, then it will 109 /// be woken up from its call to `wait` or `wait_timeout`. Calls to 110 /// `notify_one` are not buffered in any way. 111 /// 112 /// To wake up all threads, see `notify_all()`. 113 /// 114 /// # Examples 115 /// 116 /// ``` 117 /// use parking_lot::Condvar; 118 /// 119 /// let condvar = Condvar::new(); 120 /// 121 /// // do something with condvar, share it with other threads 122 /// 123 /// if !condvar.notify_one() { 124 /// println!("Nobody was listening for this."); 125 /// } 126 /// ``` 127 #[inline] notify_one(&self) -> bool128 pub fn notify_one(&self) -> bool { 129 // Nothing to do if there are no waiting threads 130 let state = self.state.load(Ordering::Relaxed); 131 if state.is_null() { 132 return false; 133 } 134 135 self.notify_one_slow(state) 136 } 137 138 #[cold] notify_one_slow(&self, mutex: *mut RawMutex) -> bool139 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { 140 // Unpark one thread and requeue the rest onto the mutex 141 let from = self as *const _ as usize; 142 let to = mutex as usize; 143 let validate = || { 144 // Make sure that our atomic state still points to the same 145 // mutex. If not then it means that all threads on the current 146 // mutex were woken up and a new waiting thread switched to a 147 // different mutex. In that case we can get away with doing 148 // nothing. 149 if self.state.load(Ordering::Relaxed) != mutex { 150 return RequeueOp::Abort; 151 } 152 153 // Unpark one thread if the mutex is unlocked, otherwise just 154 // requeue everything to the mutex. This is safe to do here 155 // since unlocking the mutex when the parked bit is set requires 156 // locking the queue. There is the possibility of a race if the 157 // mutex gets locked after we check, but that doesn't matter in 158 // this case. 
159 if unsafe { (*mutex).mark_parked_if_locked() } { 160 RequeueOp::RequeueOne 161 } else { 162 RequeueOp::UnparkOne 163 } 164 }; 165 let callback = |_op, result: UnparkResult| { 166 // Clear our state if there are no more waiting threads 167 if !result.have_more_threads { 168 self.state.store(ptr::null_mut(), Ordering::Relaxed); 169 } 170 TOKEN_NORMAL 171 }; 172 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) }; 173 174 res.unparked_threads + res.requeued_threads != 0 175 } 176 177 /// Wakes up all blocked threads on this condvar. 178 /// 179 /// Returns the number of threads woken up. 180 /// 181 /// This method will ensure that any current waiters on the condition 182 /// variable are awoken. Calls to `notify_all()` are not buffered in any 183 /// way. 184 /// 185 /// To wake up only one thread, see `notify_one()`. 186 #[inline] notify_all(&self) -> usize187 pub fn notify_all(&self) -> usize { 188 // Nothing to do if there are no waiting threads 189 let state = self.state.load(Ordering::Relaxed); 190 if state.is_null() { 191 return 0; 192 } 193 194 self.notify_all_slow(state) 195 } 196 197 #[cold] notify_all_slow(&self, mutex: *mut RawMutex) -> usize198 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { 199 // Unpark one thread and requeue the rest onto the mutex 200 let from = self as *const _ as usize; 201 let to = mutex as usize; 202 let validate = || { 203 // Make sure that our atomic state still points to the same 204 // mutex. If not then it means that all threads on the current 205 // mutex were woken up and a new waiting thread switched to a 206 // different mutex. In that case we can get away with doing 207 // nothing. 208 if self.state.load(Ordering::Relaxed) != mutex { 209 return RequeueOp::Abort; 210 } 211 212 // Clear our state since we are going to unpark or requeue all 213 // threads. 
214 self.state.store(ptr::null_mut(), Ordering::Relaxed); 215 216 // Unpark one thread if the mutex is unlocked, otherwise just 217 // requeue everything to the mutex. This is safe to do here 218 // since unlocking the mutex when the parked bit is set requires 219 // locking the queue. There is the possibility of a race if the 220 // mutex gets locked after we check, but that doesn't matter in 221 // this case. 222 if unsafe { (*mutex).mark_parked_if_locked() } { 223 RequeueOp::RequeueAll 224 } else { 225 RequeueOp::UnparkOneRequeueRest 226 } 227 }; 228 let callback = |op, result: UnparkResult| { 229 // If we requeued threads to the mutex, mark it as having 230 // parked threads. The RequeueAll case is already handled above. 231 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { 232 unsafe { (*mutex).mark_parked() }; 233 } 234 TOKEN_NORMAL 235 }; 236 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) }; 237 238 res.unparked_threads + res.requeued_threads 239 } 240 241 /// Blocks the current thread until this condition variable receives a 242 /// notification. 243 /// 244 /// This function will atomically unlock the mutex specified (represented by 245 /// `mutex_guard`) and block the current thread. This means that any calls 246 /// to `notify_*()` which happen logically after the mutex is unlocked are 247 /// candidates to wake this thread up. When this function call returns, the 248 /// lock specified will have been re-acquired. 249 /// 250 /// # Panics 251 /// 252 /// This function will panic if another thread is waiting on the `Condvar` 253 /// with a different `Mutex` object. 
254 #[inline] wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)255 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { 256 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); 257 } 258 259 /// Waits on this condition variable for a notification, timing out after 260 /// the specified time instant. 261 /// 262 /// The semantics of this function are equivalent to `wait()` except that 263 /// the thread will be blocked roughly until `timeout` is reached. This 264 /// method should not be used for precise timing due to anomalies such as 265 /// preemption or platform differences that may not cause the maximum 266 /// amount of time waited to be precisely `timeout`. 267 /// 268 /// Note that the best effort is made to ensure that the time waited is 269 /// measured with a monotonic clock, and not affected by the changes made to 270 /// the system time. 271 /// 272 /// The returned `WaitTimeoutResult` value indicates if the timeout is 273 /// known to have elapsed. 274 /// 275 /// Like `wait`, the lock specified will be re-acquired when this function 276 /// returns, regardless of whether the timeout elapsed or not. 277 /// 278 /// # Panics 279 /// 280 /// This function will panic if another thread is waiting on the `Condvar` 281 /// with a different `Mutex` object. 282 #[inline] wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult283 pub fn wait_until<T: ?Sized>( 284 &self, 285 mutex_guard: &mut MutexGuard<'_, T>, 286 timeout: Instant, 287 ) -> WaitTimeoutResult { 288 self.wait_until_internal( 289 unsafe { MutexGuard::mutex(mutex_guard).raw() }, 290 Some(timeout), 291 ) 292 } 293 294 // This is a non-generic function to reduce the monomorphization cost of 295 // using `wait_until`. 
wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult296 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { 297 let result; 298 let mut bad_mutex = false; 299 let mut requeued = false; 300 { 301 let addr = self as *const _ as usize; 302 let lock_addr = mutex as *const _ as *mut _; 303 let validate = || { 304 // Ensure we don't use two different mutexes with the same 305 // Condvar at the same time. This is done while locked to 306 // avoid races with notify_one 307 let state = self.state.load(Ordering::Relaxed); 308 if state.is_null() { 309 self.state.store(lock_addr, Ordering::Relaxed); 310 } else if state != lock_addr { 311 bad_mutex = true; 312 return false; 313 } 314 true 315 }; 316 let before_sleep = || { 317 // Unlock the mutex before sleeping... 318 unsafe { mutex.unlock() }; 319 }; 320 let timed_out = |k, was_last_thread| { 321 // If we were requeued to a mutex, then we did not time out. 322 // We'll just park ourselves on the mutex again when we try 323 // to lock it later. 324 requeued = k != addr; 325 326 // If we were the last thread on the queue then we need to 327 // clear our state. This is normally done by the 328 // notify_{one,all} functions when not timing out. 329 if !requeued && was_last_thread { 330 self.state.store(ptr::null_mut(), Ordering::Relaxed); 331 } 332 }; 333 result = unsafe { parking_lot_core::park( 334 addr, 335 validate, 336 before_sleep, 337 timed_out, 338 DEFAULT_PARK_TOKEN, 339 timeout, 340 ) }; 341 } 342 343 // Panic if we tried to use multiple mutexes with a Condvar. Note 344 // that at this point the MutexGuard is still locked. It will be 345 // unlocked by the unwinding logic. 346 if bad_mutex { 347 panic!("attempted to use a condition variable with more than one mutex"); 348 } 349 350 // ... 
and re-lock it once we are done sleeping 351 if result == ParkResult::Unparked(TOKEN_HANDOFF) { 352 unsafe { deadlock::acquire_resource(mutex as *const _ as usize) }; 353 } else { 354 mutex.lock(); 355 } 356 357 WaitTimeoutResult(!(result.is_unparked() || requeued)) 358 } 359 360 /// Waits on this condition variable for a notification, timing out after a 361 /// specified duration. 362 /// 363 /// The semantics of this function are equivalent to `wait()` except that 364 /// the thread will be blocked for roughly no longer than `timeout`. This 365 /// method should not be used for precise timing due to anomalies such as 366 /// preemption or platform differences that may not cause the maximum 367 /// amount of time waited to be precisely `timeout`. 368 /// 369 /// Note that the best effort is made to ensure that the time waited is 370 /// measured with a monotonic clock, and not affected by the changes made to 371 /// the system time. 372 /// 373 /// The returned `WaitTimeoutResult` value indicates if the timeout is 374 /// known to have elapsed. 375 /// 376 /// Like `wait`, the lock specified will be re-acquired when this function 377 /// returns, regardless of whether the timeout elapsed or not. 
378 #[inline] wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult379 pub fn wait_for<T: ?Sized>( 380 &self, 381 mutex_guard: &mut MutexGuard<'_, T>, 382 timeout: Duration, 383 ) -> WaitTimeoutResult { 384 let deadline = util::to_deadline(timeout); 385 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) 386 } 387 388 #[inline] wait_while_until_internal<T, F>( &self, mutex_guard: &mut MutexGuard<'_, T>, mut condition: F, timeout: Option<Instant>, ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool,389 fn wait_while_until_internal<T, F>( 390 &self, 391 mutex_guard: &mut MutexGuard<'_, T>, 392 mut condition: F, 393 timeout: Option<Instant>, 394 ) -> WaitTimeoutResult 395 where 396 T: ?Sized, 397 F: FnMut(&mut T) -> bool, 398 { 399 let mut result = WaitTimeoutResult(false); 400 401 while !result.timed_out() && condition(mutex_guard.deref_mut()) { 402 result = 403 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout); 404 } 405 406 result 407 } 408 /// Blocks the current thread until this condition variable receives a 409 /// notification. If the provided condition evaluates to `false`, then the 410 /// thread is no longer blocked and the operation is completed. If the 411 /// condition evaluates to `true`, then the thread is blocked again and 412 /// waits for another notification before repeating this process. 413 /// 414 /// This function will atomically unlock the mutex specified (represented by 415 /// `mutex_guard`) and block the current thread. This means that any calls 416 /// to `notify_*()` which happen logically after the mutex is unlocked are 417 /// candidates to wake this thread up. When this function call returns, the 418 /// lock specified will have been re-acquired. 419 /// 420 /// # Panics 421 /// 422 /// This function will panic if another thread is waiting on the `Condvar` 423 /// with a different `Mutex` object. 
424 #[inline] wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F) where T: ?Sized, F: FnMut(&mut T) -> bool,425 pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F) 426 where 427 T: ?Sized, 428 F: FnMut(&mut T) -> bool, 429 { 430 self.wait_while_until_internal(mutex_guard, condition, None); 431 } 432 433 /// Waits on this condition variable for a notification, timing out after 434 /// the specified time instant. If the provided condition evaluates to 435 /// `false`, then the thread is no longer blocked and the operation is 436 /// completed. If the condition evaluates to `true`, then the thread is 437 /// blocked again and waits for another notification before repeating 438 /// this process. 439 /// 440 /// The semantics of this function are equivalent to `wait()` except that 441 /// the thread will be blocked roughly until `timeout` is reached. This 442 /// method should not be used for precise timing due to anomalies such as 443 /// preemption or platform differences that may not cause the maximum 444 /// amount of time waited to be precisely `timeout`. 445 /// 446 /// Note that the best effort is made to ensure that the time waited is 447 /// measured with a monotonic clock, and not affected by the changes made to 448 /// the system time. 449 /// 450 /// The returned `WaitTimeoutResult` value indicates if the timeout is 451 /// known to have elapsed. 452 /// 453 /// Like `wait`, the lock specified will be re-acquired when this function 454 /// returns, regardless of whether the timeout elapsed or not. 455 /// 456 /// # Panics 457 /// 458 /// This function will panic if another thread is waiting on the `Condvar` 459 /// with a different `Mutex` object. 
460 #[inline] wait_while_until<T, F>( &self, mutex_guard: &mut MutexGuard<'_, T>, condition: F, timeout: Instant, ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool,461 pub fn wait_while_until<T, F>( 462 &self, 463 mutex_guard: &mut MutexGuard<'_, T>, 464 condition: F, 465 timeout: Instant, 466 ) -> WaitTimeoutResult 467 where 468 T: ?Sized, 469 F: FnMut(&mut T) -> bool, 470 { 471 self.wait_while_until_internal(mutex_guard, condition, Some(timeout)) 472 } 473 474 /// Waits on this condition variable for a notification, timing out after a 475 /// specified duration. If the provided condition evaluates to `false`, 476 /// then the thread is no longer blocked and the operation is completed. 477 /// If the condition evaluates to `true`, then the thread is blocked again 478 /// and waits for another notification before repeating this process. 479 /// 480 /// The semantics of this function are equivalent to `wait()` except that 481 /// the thread will be blocked for roughly no longer than `timeout`. This 482 /// method should not be used for precise timing due to anomalies such as 483 /// preemption or platform differences that may not cause the maximum 484 /// amount of time waited to be precisely `timeout`. 485 /// 486 /// Note that the best effort is made to ensure that the time waited is 487 /// measured with a monotonic clock, and not affected by the changes made to 488 /// the system time. 489 /// 490 /// The returned `WaitTimeoutResult` value indicates if the timeout is 491 /// known to have elapsed. 492 /// 493 /// Like `wait`, the lock specified will be re-acquired when this function 494 /// returns, regardless of whether the timeout elapsed or not. 
#[inline]
pub fn wait_while_for<T: ?Sized, F>(
    &self,
    mutex_guard: &mut MutexGuard<'_, T>,
    condition: F,
    timeout: Duration,
) -> WaitTimeoutResult
where
    F: FnMut(&mut T) -> bool,
{
    // The deadline is computed once, so repeated waits inside the loop all
    // share the same overall time budget.
    let deadline = util::to_deadline(timeout);
    self.wait_while_until_internal(mutex_guard, condition, deadline)
}
} // impl Condvar

impl Default for Condvar {
    /// Equivalent to `Condvar::new()`.
    #[inline]
    fn default() -> Condvar {
        Condvar::new()
    }
}

impl fmt::Debug for Condvar {
    /// Prints a fixed placeholder; the internal state is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Condvar { .. }")
    }
}

#[cfg(test)]
mod tests {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;
    use std::thread::sleep;
    use std::thread::JoinHandle;
    use std::time::Duration;
    use std::time::Instant;

    #[test]
    fn smoke() {
        let c = Condvar::new();
        c.notify_one();
        c.notify_all();
    }

    #[test]
    fn notify_one() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_all() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        cond.notify_all();
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    #[test]
    fn notify_one_return_true() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let t = thread::spawn(move || {
            let _g = m2.lock();
            assert!(c2.notify_one());
        });
        c.wait(&mut g);
        drop(g);

        // Join so that a failed assertion in the child thread fails the test
        // instead of being lost with the detached thread.
        t.join().unwrap();
    }

    #[test]
    fn notify_one_return_false() {
        let m = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());

        let t = thread::spawn(move || {
            let _g = m.lock();
            assert!(!c.notify_one());
        });

        // Join so that a failed assertion in the child thread fails the test
        // instead of being lost with the detached thread.
        t.join().unwrap();
    }

    #[test]
    fn notify_all_return() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        assert_eq!(cond.notify_all(), N);
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }

        assert_eq!(cond.notify_all(), 0);
    }

    #[test]
    fn wait_for() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // Nobody notifies yet, so this wait has to time out.
        let timed_out_res = c.wait_for(&mut g, Duration::from_millis(1));
        assert!(timed_out_res.timed_out());

        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let notified_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
        assert!(!notified_res.timed_out());

        drop(g);
    }

    #[test]
    fn wait_until() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // Nobody notifies yet, so this wait has to time out.
        let timed_out_res = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
        assert!(timed_out_res.timed_out());
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let notified_res = c.wait_until(
            &mut g,
            Instant::now() + Duration::from_millis(u32::max_value() as u64),
        );
        assert!(!notified_res.timed_out());
        drop(g);
    }

    /// Spawns a thread that, for each epoch in `1..=num_iters`, waits until
    /// the counter behind `mutex` equals the epoch and then calls
    /// `notify_one` on `cv` while holding the lock. The thread exits early
    /// once `timeout` (if any) has passed.
    fn spawn_wait_while_notifier(
        mutex: Arc<Mutex<u32>>,
        cv: Arc<Condvar>,
        num_iters: u32,
        timeout: Option<Instant>,
    ) -> JoinHandle<()> {
        thread::spawn(move || {
            for epoch in 1..=num_iters {
                // spin to wait for main test thread to block
                // before notifying it to wake back up and check
                // its condition.
                let mut sleep_backoff = Duration::from_millis(1);
                let _mutex_guard = loop {
                    let mutex_guard = mutex.lock();

                    if let Some(timeout) = timeout {
                        if Instant::now() >= timeout {
                            return;
                        }
                    }

                    if *mutex_guard == epoch {
                        break mutex_guard;
                    }

                    drop(mutex_guard);

                    // give main test thread a good chance to
                    // acquire the lock before this thread does.
                    sleep(sleep_backoff);
                    sleep_backoff *= 2;
                };

                cv.notify_one();
            }
        })
    }

    #[test]
    fn wait_while_until_internal_does_not_wait_if_initially_false() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let condition = |counter: &mut u32| {
            *counter += 1;
            false
        };

        let mut mutex_guard = mutex.lock();
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == 1);
    }

    #[test]
    fn wait_while_until_internal_times_out_before_false() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 3;
        let condition = |counter: &mut u32| {
            *counter += 1;
            true
        };

        let mut mutex_guard = mutex.lock();
        let timeout = Some(Instant::now() + Duration::from_millis(500));
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);

        assert!(timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 1);

        // prevent deadlock with notifier
        drop(mutex_guard);
        handle.join().unwrap();
    }

    #[test]
    fn wait_while_until_internal() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 4;

        let condition = |counter: &mut u32| {
            *counter += 1;
            *counter <= num_iters
        };

        let mut mutex_guard = mutex.lock();
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 1);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
        handle.join().unwrap();

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 2);
    }

    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        rx.recv().unwrap();
        let _g = m.lock();
        let _guard = PanicGuard(&*c);
        c.wait(&mut m3.lock());
    }

    #[test]
    fn two_mutexes_disjoint() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
        drop(g);

        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
    }

    #[test]
    fn test_debug_condvar() {
        let c = Condvar::new();
        assert_eq!(format!("{:?}", c), "Condvar { .. }");
    }

    #[test]
    fn test_condvar_requeue() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();
        let t = thread::spawn(move || {
            let mut g = m2.lock();
            c2.wait(&mut g);
        });

        let mut g = m.lock();
        while !c.notify_one() {
            // Wait for the thread to get into wait()
            MutexGuard::bump(&mut g);
            // Yield, so the other thread gets a chance to do something.
            // (At least Miri needs this, because it doesn't preempt threads.)
            thread::yield_now();
        }
        // The thread should have been requeued to the mutex, which we wake up now.
        drop(g);
        t.join().unwrap();
    }

    #[test]
    fn test_issue_129() {
        let locks = Arc::new((Mutex::new(()), Condvar::new()));

        let (tx, rx) = channel();
        for _ in 0..4 {
            let locks = locks.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let mut guard = locks.0.lock();
                locks.1.wait(&mut guard);
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
                locks.1.notify_one();
                tx.send(()).unwrap();
            });
        }

        thread::sleep(Duration::from_millis(100));
        locks.1.notify_one();

        for _ in 0..4 {
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
        }
    }
}

/// This module contains an integration test that is heavily inspired by WebKit's own integration
/// tests for its own Condvar.
#[cfg(test)]
mod webkit_queue_test {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};

    /// How long a single wait on a condition variable is allowed to block.
    #[derive(Clone, Copy)]
    enum Timeout {
        /// Wait with `Condvar::wait_for`, returning after at most this long.
        Bounded(Duration),
        /// Wait with `Condvar::wait`, which has no time limit.
        Forever,
    }

    /// Notification strategy used by producers and consumers.
    #[derive(Clone, Copy)]
    enum NotifyStyle {
        /// Call `notify_one` after every queue operation.
        One,
        /// Call `notify_all`, but only when the caller asks for a notification.
        All,
    }

    /// Shared state protected by the queue mutex.
    struct Queue {
        /// Messages produced but not yet consumed.
        items: VecDeque<usize>,
        /// Cleared by the test driver to tell consumers to exit once drained.
        should_continue: bool,
    }

    impl Queue {
        /// Creates an empty queue whose consumers should keep running.
        fn new() -> Self {
            Self {
                items: VecDeque::new(),
                should_continue: true,
            }
        }
    }

    /// Blocks on `condition` until `predicate` holds for the locked state.
    ///
    /// The predicate is checked before the first wait and again after every
    /// return from the condition variable, so a wait that ends (by wakeup or
    /// by timeout) without the predicate being true simply waits again.
    fn wait<T: ?Sized>(
        condition: &Condvar,
        lock: &mut MutexGuard<'_, T>,
        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
        timeout: Timeout,
    ) {
        while !predicate(lock) {
            match timeout {
                Timeout::Forever => condition.wait(lock),
                Timeout::Bounded(bound) => {
                    // The timed-out flag is deliberately ignored: the loop
                    // condition alone decides whether to wait again.
                    condition.wait_for(lock, bound);
                }
            }
        }
    }

    /// Signals `condition` according to `style`.
    ///
    /// `NotifyStyle::One` always notifies a single waiter and ignores
    /// `should_notify`; `NotifyStyle::All` notifies every waiter, but only
    /// when `should_notify` is true.
    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
        match style {
            NotifyStyle::One => {
                condition.notify_one();
            }
            NotifyStyle::All => {
                if should_notify {
                    condition.notify_all();
                }
            }
        }
    }

    /// Runs one producer/consumer scenario and checks that every message was
    /// delivered exactly once.
    ///
    /// Spawns `num_consumers` consumers and `num_producers` producers sharing
    /// a queue bounded to `max_queue_size` items, sleeps for `delay`, then
    /// joins the producers, tells the consumers to stop, and joins them too.
    /// Each producer sends the messages `0..messages_per_producer`, so the
    /// sorted output must contain every message value `num_producers` times.
    ///
    /// # Panics
    ///
    /// Panics if a worker thread panicked or if the collected output does not
    /// match the expected multiset of messages.
    fn run_queue_test(
        num_producers: usize,
        num_consumers: usize,
        max_queue_size: usize,
        messages_per_producer: usize,
        notify_style: NotifyStyle,
        timeout: Timeout,
        delay: Duration,
    ) {
        let input_queue = Arc::new(Mutex::new(Queue::new()));
        let empty_condition = Arc::new(Condvar::new());
        let full_condition = Arc::new(Condvar::new());

        let output_vec = Arc::new(Mutex::new(Vec::new()));

        // Consumers are started first, matching the original ordering.
        let consumers: Vec<_> = (0..num_consumers)
            .map(|_| {
                consumer_thread(
                    Arc::clone(&input_queue),
                    Arc::clone(&empty_condition),
                    Arc::clone(&full_condition),
                    timeout,
                    notify_style,
                    Arc::clone(&output_vec),
                    max_queue_size,
                )
            })
            .collect();
        let producers: Vec<_> = (0..num_producers)
            .map(|_| {
                producer_thread(
                    messages_per_producer,
                    Arc::clone(&input_queue),
                    Arc::clone(&empty_condition),
                    Arc::clone(&full_condition),
                    timeout,
                    notify_style,
                    max_queue_size,
                )
            })
            .collect();

        thread::sleep(delay);

        for producer in producers {
            producer.join().expect("Producer thread panicked");
        }

        // All messages have been queued: flip the flag under the lock (the
        // temporary guard is released at the end of the statement), then wake
        // every consumer so it can observe the flag.
        input_queue.lock().should_continue = false;
        empty_condition.notify_all();

        for consumer in consumers {
            consumer.join().expect("Consumer thread panicked");
        }

        let mut output_vec = output_vec.lock();
        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
        // Plain integers: an unstable sort yields the same order as a stable one.
        output_vec.sort_unstable();
        for msg_idx in 0..messages_per_producer {
            for producer_idx in 0..num_producers {
                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
            }
        }
    }

    /// Spawns a consumer that moves messages from `input_queue` into
    /// `output_queue` until the queue is empty and `should_continue` is false.
    ///
    /// After each pop it signals `full_condition`; under `NotifyStyle::All`
    /// that only happens when the queue held `max_queue_size` items before
    /// the pop.
    fn consumer_thread(
        input_queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        output_queue: Arc<Mutex<Vec<usize>>>,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let (should_notify, result) = {
                let mut queue = input_queue.lock();
                wait(
                    &empty_condition,
                    &mut queue,
                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
                    timeout,
                );
                if queue.items.is_empty() && !queue.should_continue {
                    return;
                }
                let should_notify = queue.items.len() == max_queue_size;
                let result = queue.items.pop_front();
                // Release the queue lock before notifying and before taking
                // the output lock.
                drop(queue);
                (should_notify, result)
            };
            notify(notify_style, &full_condition, should_notify);

            if let Some(result) = result {
                output_queue.lock().push(result);
            }
        })
    }

    /// Spawns a producer that pushes the messages `0..num_messages` onto
    /// `queue`, waiting on `full_condition` while the queue already holds
    /// `max_queue_size` items.
    ///
    /// After each push it signals `empty_condition`; under `NotifyStyle::All`
    /// that only happens when the queue was empty before the push.
    fn producer_thread(
        num_messages: usize,
        queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for message in 0..num_messages {
                let should_notify = {
                    let mut queue = queue.lock();
                    wait(
                        &full_condition,
                        &mut queue,
                        |state| state.items.len() < max_queue_size,
                        timeout,
                    );
                    let should_notify = queue.items.is_empty();
                    queue.items.push_back(message);
                    // Release the queue lock before notifying.
                    drop(queue);
                    should_notify
                };
                notify(notify_style, &empty_condition, should_notify);
            }
        })
    }

    /// Expands each `name(key: value, ...);` entry into a `#[test]` function
    /// that forwards the values to `run_queue_test`, converting
    /// `delay_seconds` into a `Duration`.
    macro_rules! run_queue_tests {
        ( $( $name:ident(
            num_producers: $num_producers:expr,
            num_consumers: $num_consumers:expr,
            max_queue_size: $max_queue_size:expr,
            messages_per_producer: $messages_per_producer:expr,
            notification_style: $notification_style:expr,
            timeout: $timeout:expr,
            delay_seconds: $delay_seconds:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_secs($delay_seconds);
                run_queue_test(
                    $num_producers,
                    $num_consumers,
                    $max_queue_size,
                    $messages_per_producer,
                    $notification_style,
                    $timeout,
                    delay,
                );
            })*
        };
    }

    // NOTE(review): the test names do not reliably describe the timeout in
    // use: `sanity_check_queue` is the only scenario with a bounded wait,
    // while the `*_timeout` scenarios use `Timeout::Forever`. Names are kept
    // as-is so existing test filters keep working; confirm the intent.
    run_queue_tests! {
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
}