1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use crate::mutex::MutexGuard;
9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::{deadlock, util};
11 use core::{
12     fmt, ptr,
13     sync::atomic::{AtomicPtr, Ordering},
14 };
15 use lock_api::RawMutex as RawMutex_;
16 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17 use std::ops::DerefMut;
18 use std::time::{Duration, Instant};
19 
/// Outcome of a timed wait on a condition variable, recording whether the
/// wait ended because the timeout elapsed rather than because of a
/// notification.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait is known to have ended by timing out.
    #[inline]
    pub fn timed_out(self) -> bool {
        let WaitTimeoutResult(elapsed) = self;
        elapsed
    }
}
32 
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that a thread must block.
///
/// Note that this module places one additional restriction over the system
/// condition variables: each condvar can be used with only one mutex at a
/// time. Any attempt to use multiple mutexes on the same condition variable
/// simultaneously will result in a runtime panic. However it is possible to
/// switch to a different mutex if there are no threads currently waiting on
/// the condition variable.
///
/// # Differences from the standard library `Condvar`
///
/// - No spurious wakeups: A wait will only return a non-timeout result if it
///   was woken up by `notify_one` or `notify_all`.
/// - `Condvar::notify_all` will only wake up a single thread, the rest are
///   requeued to wait for the `Mutex` to be unlocked by the thread that was
///   woken up.
/// - Only requires 1 word of space, whereas the standard library boxes the
///   `Condvar` due to platform limitations.
/// - Can be statically constructed.
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
///
/// # Examples
///
/// ```
/// use parking_lot::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// thread::spawn(move|| {
///     let &(ref lock, ref cvar) = &*pair2;
///     let mut started = lock.lock();
///     *started = true;
///     cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock();
/// if !*started {
///     cvar.wait(&mut started);
/// }
/// // Note that we used an if instead of a while loop above. This is only
/// // possible because parking_lot's Condvar will never spuriously wake up.
/// // This means that wait() will only return after notify_one or notify_all is
/// // called.
/// ```
pub struct Condvar {
    // Address of the `RawMutex` that the current waiters passed in, or null
    // when the condvar has no associated mutex. Waiters install their mutex
    // here when the pointer is null; the notify paths and a timed-out last
    // waiter reset it to null.
    state: AtomicPtr<RawMutex>,
}
93 
94 impl Condvar {
95     /// Creates a new condition variable which is ready to be waited on and
96     /// notified.
97     #[inline]
new() -> Condvar98     pub const fn new() -> Condvar {
99         Condvar {
100             state: AtomicPtr::new(ptr::null_mut()),
101         }
102     }
103 
104     /// Wakes up one blocked thread on this condvar.
105     ///
106     /// Returns whether a thread was woken up.
107     ///
108     /// If there is a blocked thread on this condition variable, then it will
109     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110     /// `notify_one` are not buffered in any way.
111     ///
112     /// To wake up all threads, see `notify_all()`.
113     ///
114     /// # Examples
115     ///
116     /// ```
117     /// use parking_lot::Condvar;
118     ///
119     /// let condvar = Condvar::new();
120     ///
121     /// // do something with condvar, share it with other threads
122     ///
123     /// if !condvar.notify_one() {
124     ///     println!("Nobody was listening for this.");
125     /// }
126     /// ```
127     #[inline]
notify_one(&self) -> bool128     pub fn notify_one(&self) -> bool {
129         // Nothing to do if there are no waiting threads
130         let state = self.state.load(Ordering::Relaxed);
131         if state.is_null() {
132             return false;
133         }
134 
135         self.notify_one_slow(state)
136     }
137 
138     #[cold]
notify_one_slow(&self, mutex: *mut RawMutex) -> bool139     fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140         // Unpark one thread and requeue the rest onto the mutex
141         let from = self as *const _ as usize;
142         let to = mutex as usize;
143             let validate = || {
144                 // Make sure that our atomic state still points to the same
145                 // mutex. If not then it means that all threads on the current
146                 // mutex were woken up and a new waiting thread switched to a
147                 // different mutex. In that case we can get away with doing
148                 // nothing.
149                 if self.state.load(Ordering::Relaxed) != mutex {
150                     return RequeueOp::Abort;
151                 }
152 
153                 // Unpark one thread if the mutex is unlocked, otherwise just
154                 // requeue everything to the mutex. This is safe to do here
155                 // since unlocking the mutex when the parked bit is set requires
156                 // locking the queue. There is the possibility of a race if the
157                 // mutex gets locked after we check, but that doesn't matter in
158                 // this case.
159                 if unsafe { (*mutex).mark_parked_if_locked() } {
160                     RequeueOp::RequeueOne
161                 } else {
162                     RequeueOp::UnparkOne
163                 }
164             };
165             let callback = |_op, result: UnparkResult| {
166                 // Clear our state if there are no more waiting threads
167                 if !result.have_more_threads {
168                     self.state.store(ptr::null_mut(), Ordering::Relaxed);
169                 }
170                 TOKEN_NORMAL
171             };
172             let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173 
174             res.unparked_threads + res.requeued_threads != 0
175     }
176 
177     /// Wakes up all blocked threads on this condvar.
178     ///
179     /// Returns the number of threads woken up.
180     ///
181     /// This method will ensure that any current waiters on the condition
182     /// variable are awoken. Calls to `notify_all()` are not buffered in any
183     /// way.
184     ///
185     /// To wake up only one thread, see `notify_one()`.
186     #[inline]
notify_all(&self) -> usize187     pub fn notify_all(&self) -> usize {
188         // Nothing to do if there are no waiting threads
189         let state = self.state.load(Ordering::Relaxed);
190         if state.is_null() {
191             return 0;
192         }
193 
194         self.notify_all_slow(state)
195     }
196 
197     #[cold]
notify_all_slow(&self, mutex: *mut RawMutex) -> usize198     fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199             // Unpark one thread and requeue the rest onto the mutex
200             let from = self as *const _ as usize;
201             let to = mutex as usize;
202             let validate = || {
203                 // Make sure that our atomic state still points to the same
204                 // mutex. If not then it means that all threads on the current
205                 // mutex were woken up and a new waiting thread switched to a
206                 // different mutex. In that case we can get away with doing
207                 // nothing.
208                 if self.state.load(Ordering::Relaxed) != mutex {
209                     return RequeueOp::Abort;
210                 }
211 
212                 // Clear our state since we are going to unpark or requeue all
213                 // threads.
214                 self.state.store(ptr::null_mut(), Ordering::Relaxed);
215 
216                 // Unpark one thread if the mutex is unlocked, otherwise just
217                 // requeue everything to the mutex. This is safe to do here
218                 // since unlocking the mutex when the parked bit is set requires
219                 // locking the queue. There is the possibility of a race if the
220                 // mutex gets locked after we check, but that doesn't matter in
221                 // this case.
222                 if unsafe { (*mutex).mark_parked_if_locked() } {
223                     RequeueOp::RequeueAll
224                 } else {
225                     RequeueOp::UnparkOneRequeueRest
226                 }
227             };
228             let callback = |op, result: UnparkResult| {
229                 // If we requeued threads to the mutex, mark it as having
230                 // parked threads. The RequeueAll case is already handled above.
231                 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232                     unsafe { (*mutex).mark_parked() };
233                 }
234                 TOKEN_NORMAL
235             };
236             let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237 
238             res.unparked_threads + res.requeued_threads
239     }
240 
241     /// Blocks the current thread until this condition variable receives a
242     /// notification.
243     ///
244     /// This function will atomically unlock the mutex specified (represented by
245     /// `mutex_guard`) and block the current thread. This means that any calls
246     /// to `notify_*()` which happen logically after the mutex is unlocked are
247     /// candidates to wake this thread up. When this function call returns, the
248     /// lock specified will have been re-acquired.
249     ///
250     /// # Panics
251     ///
252     /// This function will panic if another thread is waiting on the `Condvar`
253     /// with a different `Mutex` object.
254     #[inline]
wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)255     pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257     }
258 
259     /// Waits on this condition variable for a notification, timing out after
260     /// the specified time instant.
261     ///
262     /// The semantics of this function are equivalent to `wait()` except that
263     /// the thread will be blocked roughly until `timeout` is reached. This
264     /// method should not be used for precise timing due to anomalies such as
265     /// preemption or platform differences that may not cause the maximum
266     /// amount of time waited to be precisely `timeout`.
267     ///
268     /// Note that the best effort is made to ensure that the time waited is
269     /// measured with a monotonic clock, and not affected by the changes made to
270     /// the system time.
271     ///
272     /// The returned `WaitTimeoutResult` value indicates if the timeout is
273     /// known to have elapsed.
274     ///
275     /// Like `wait`, the lock specified will be re-acquired when this function
276     /// returns, regardless of whether the timeout elapsed or not.
277     ///
278     /// # Panics
279     ///
280     /// This function will panic if another thread is waiting on the `Condvar`
281     /// with a different `Mutex` object.
282     #[inline]
wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult283     pub fn wait_until<T: ?Sized>(
284         &self,
285         mutex_guard: &mut MutexGuard<'_, T>,
286         timeout: Instant,
287     ) -> WaitTimeoutResult {
288         self.wait_until_internal(
289             unsafe { MutexGuard::mutex(mutex_guard).raw() },
290             Some(timeout),
291         )
292     }
293 
    // Shared implementation behind `wait`, `wait_until`, `wait_for` and the
    // `wait_while*` family.
    //
    // Parks the current thread on this condvar's address, unlocking `mutex`
    // right before going to sleep, and makes sure `mutex` is held again before
    // returning. `timeout` is forwarded to `parking_lot_core::park`; `None`
    // means no deadline is passed.
    //
    // The returned `WaitTimeoutResult` reports a timeout only when the park
    // result is not "unparked" and the thread was not requeued onto the mutex.
    //
    // Panics if the condvar is currently associated with a different mutex.
    //
    // This is a non-generic function to reduce the monomorphization cost of
    // using `wait_until`.
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
            let result;
            // Set by `validate` when the condvar is already bound to another mutex.
            let mut bad_mutex = false;
            // Set by `timed_out` when this thread had been moved to the mutex's queue.
            let mut requeued = false;
            {
                // Parking key: the address of this condvar.
                let addr = self as *const _ as usize;
                // Address of the caller's mutex, in the pointer form stored in `state`.
                let lock_addr = mutex as *const _ as *mut _;
                let validate = || {
                    // Ensure we don't use two different mutexes with the same
                    // Condvar at the same time. This is done while locked to
                    // avoid races with notify_one
                    let state = self.state.load(Ordering::Relaxed);
                    if state.is_null() {
                        // First waiter: associate the condvar with this mutex.
                        self.state.store(lock_addr, Ordering::Relaxed);
                    } else if state != lock_addr {
                        // Returning false makes `park` bail out; the panic is
                        // raised after `park` returns (see below).
                        bad_mutex = true;
                        return false;
                    }
                    true
                };
                let before_sleep = || {
                    // Unlock the mutex before sleeping...
                    // SAFETY: NOTE(review): every caller in this file passes the
                    // raw mutex of a live `MutexGuard`, so the mutex is
                    // presumably locked by this thread here — confirm.
                    unsafe { mutex.unlock() };
                };
                let timed_out = |k, was_last_thread| {
                    // If we were requeued to a mutex, then we did not time out.
                    // We'll just park ourselves on the mutex again when we try
                    // to lock it later.
                    requeued = k != addr;

                    // If we were the last thread on the queue then we need to
                    // clear our state. This is normally done by the
                    // notify_{one,all} functions when not timing out.
                    if !requeued && was_last_thread {
                        self.state.store(ptr::null_mut(), Ordering::Relaxed);
                    }
                };
                // SAFETY: NOTE(review): `addr` is this condvar's own address;
                // see the `parking_lot_core::park` contract for the
                // requirements on the key and the callbacks.
                result = unsafe { parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    timeout,
                ) };
            }

            // Panic if we tried to use multiple mutexes with a Condvar. Note
            // that at this point the MutexGuard is still locked. It will be
            // unlocked by the unwinding logic.
            if bad_mutex {
                panic!("attempted to use a condition variable with more than one mutex");
            }

            // ... and re-lock it once we are done sleeping
            if result == ParkResult::Unparked(TOKEN_HANDOFF) {
                // The mutex is not locked again on this branch; only the
                // deadlock tracker is informed. NOTE(review): presumably
                // `TOKEN_HANDOFF` means the unlocking thread passed ownership
                // of the mutex directly to us — confirm in `raw_mutex`.
                unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
            } else {
                mutex.lock();
            }

            // A requeued thread counts as notified even if `park` itself did
            // not report an unpark.
            WaitTimeoutResult(!(result.is_unparked() || requeued))
    }
359 
360     /// Waits on this condition variable for a notification, timing out after a
361     /// specified duration.
362     ///
363     /// The semantics of this function are equivalent to `wait()` except that
364     /// the thread will be blocked for roughly no longer than `timeout`. This
365     /// method should not be used for precise timing due to anomalies such as
366     /// preemption or platform differences that may not cause the maximum
367     /// amount of time waited to be precisely `timeout`.
368     ///
369     /// Note that the best effort is made to ensure that the time waited is
370     /// measured with a monotonic clock, and not affected by the changes made to
371     /// the system time.
372     ///
373     /// The returned `WaitTimeoutResult` value indicates if the timeout is
374     /// known to have elapsed.
375     ///
376     /// Like `wait`, the lock specified will be re-acquired when this function
377     /// returns, regardless of whether the timeout elapsed or not.
378     #[inline]
wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult379     pub fn wait_for<T: ?Sized>(
380         &self,
381         mutex_guard: &mut MutexGuard<'_, T>,
382         timeout: Duration,
383     ) -> WaitTimeoutResult {
384         let deadline = util::to_deadline(timeout);
385         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
386     }
387 
388     #[inline]
wait_while_until_internal<T, F>( &self, mutex_guard: &mut MutexGuard<'_, T>, mut condition: F, timeout: Option<Instant>, ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool,389     fn wait_while_until_internal<T, F>(
390         &self,
391         mutex_guard: &mut MutexGuard<'_, T>,
392         mut condition: F,
393         timeout: Option<Instant>,
394     ) -> WaitTimeoutResult
395     where
396         T: ?Sized,
397         F: FnMut(&mut T) -> bool,
398     {
399         let mut result = WaitTimeoutResult(false);
400 
401         while !result.timed_out() && condition(mutex_guard.deref_mut()) {
402             result =
403                 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
404         }
405 
406         result
407     }
408     /// Blocks the current thread until this condition variable receives a
409     /// notification. If the provided condition evaluates to `false`, then the
410     /// thread is no longer blocked and the operation is completed. If the
411     /// condition evaluates to `true`, then the thread is blocked again and
412     /// waits for another notification before repeating this process.
413     ///
414     /// This function will atomically unlock the mutex specified (represented by
415     /// `mutex_guard`) and block the current thread. This means that any calls
416     /// to `notify_*()` which happen logically after the mutex is unlocked are
417     /// candidates to wake this thread up. When this function call returns, the
418     /// lock specified will have been re-acquired.
419     ///
420     /// # Panics
421     ///
422     /// This function will panic if another thread is waiting on the `Condvar`
423     /// with a different `Mutex` object.
424     #[inline]
wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F) where T: ?Sized, F: FnMut(&mut T) -> bool,425     pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
426     where
427         T: ?Sized,
428         F: FnMut(&mut T) -> bool,
429     {
430         self.wait_while_until_internal(mutex_guard, condition, None);
431     }
432 
433     /// Waits on this condition variable for a notification, timing out after
434     /// the specified time instant. If the provided condition evaluates to
435     /// `false`, then the thread is no longer blocked and the operation is
436     /// completed. If the condition evaluates to `true`, then the thread is
437     /// blocked again and waits for another notification before repeating
438     /// this process.
439     ///
440     /// The semantics of this function are equivalent to `wait()` except that
441     /// the thread will be blocked roughly until `timeout` is reached. This
442     /// method should not be used for precise timing due to anomalies such as
443     /// preemption or platform differences that may not cause the maximum
444     /// amount of time waited to be precisely `timeout`.
445     ///
446     /// Note that the best effort is made to ensure that the time waited is
447     /// measured with a monotonic clock, and not affected by the changes made to
448     /// the system time.
449     ///
450     /// The returned `WaitTimeoutResult` value indicates if the timeout is
451     /// known to have elapsed.
452     ///
453     /// Like `wait`, the lock specified will be re-acquired when this function
454     /// returns, regardless of whether the timeout elapsed or not.
455     ///
456     /// # Panics
457     ///
458     /// This function will panic if another thread is waiting on the `Condvar`
459     /// with a different `Mutex` object.
460     #[inline]
wait_while_until<T, F>( &self, mutex_guard: &mut MutexGuard<'_, T>, condition: F, timeout: Instant, ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool,461     pub fn wait_while_until<T, F>(
462         &self,
463         mutex_guard: &mut MutexGuard<'_, T>,
464         condition: F,
465         timeout: Instant,
466     ) -> WaitTimeoutResult
467     where
468         T: ?Sized,
469         F: FnMut(&mut T) -> bool,
470     {
471         self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
472     }
473 
474     /// Waits on this condition variable for a notification, timing out after a
475     /// specified duration. If the provided condition evaluates to `false`,
476     /// then the thread is no longer blocked and the operation is completed.
477     /// If the condition evaluates to `true`, then the thread is blocked again
478     /// and waits for another notification before repeating this process.
479     ///
480     /// The semantics of this function are equivalent to `wait()` except that
481     /// the thread will be blocked for roughly no longer than `timeout`. This
482     /// method should not be used for precise timing due to anomalies such as
483     /// preemption or platform differences that may not cause the maximum
484     /// amount of time waited to be precisely `timeout`.
485     ///
486     /// Note that the best effort is made to ensure that the time waited is
487     /// measured with a monotonic clock, and not affected by the changes made to
488     /// the system time.
489     ///
490     /// The returned `WaitTimeoutResult` value indicates if the timeout is
491     /// known to have elapsed.
492     ///
493     /// Like `wait`, the lock specified will be re-acquired when this function
494     /// returns, regardless of whether the timeout elapsed or not.
495     #[inline]
wait_while_for<T: ?Sized, F>( &self, mutex_guard: &mut MutexGuard<'_, T>, condition: F, timeout: Duration, ) -> WaitTimeoutResult where F: FnMut(&mut T) -> bool,496     pub fn wait_while_for<T: ?Sized, F>(
497         &self,
498         mutex_guard: &mut MutexGuard<'_, T>,
499         condition: F,
500         timeout: Duration,
501     ) -> WaitTimeoutResult
502     where
503         F: FnMut(&mut T) -> bool,
504     {
505         let deadline = util::to_deadline(timeout);
506         self.wait_while_until_internal(mutex_guard, condition, deadline)
507     }
508 }
509 
510 impl Default for Condvar {
511     #[inline]
default() -> Condvar512     fn default() -> Condvar {
513         Condvar::new()
514     }
515 }
516 
517 impl fmt::Debug for Condvar {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result518     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
519         f.pad("Condvar { .. }")
520     }
521 }
522 
523 #[cfg(test)]
524 mod tests {
525     use crate::{Condvar, Mutex, MutexGuard};
526     use std::sync::mpsc::channel;
527     use std::sync::Arc;
528     use std::thread;
529     use std::thread::sleep;
530     use std::thread::JoinHandle;
531     use std::time::Duration;
532     use std::time::Instant;
533 
534     #[test]
smoke()535     fn smoke() {
536         let c = Condvar::new();
537         c.notify_one();
538         c.notify_all();
539     }
540 
541     #[test]
notify_one()542     fn notify_one() {
543         let m = Arc::new(Mutex::new(()));
544         let m2 = m.clone();
545         let c = Arc::new(Condvar::new());
546         let c2 = c.clone();
547 
548         let mut g = m.lock();
549         let _t = thread::spawn(move || {
550             let _g = m2.lock();
551             c2.notify_one();
552         });
553         c.wait(&mut g);
554     }
555 
556     #[test]
notify_all()557     fn notify_all() {
558         const N: usize = 10;
559 
560         let data = Arc::new((Mutex::new(0), Condvar::new()));
561         let (tx, rx) = channel();
562         for _ in 0..N {
563             let data = data.clone();
564             let tx = tx.clone();
565             thread::spawn(move || {
566                 let &(ref lock, ref cond) = &*data;
567                 let mut cnt = lock.lock();
568                 *cnt += 1;
569                 if *cnt == N {
570                     tx.send(()).unwrap();
571                 }
572                 while *cnt != 0 {
573                     cond.wait(&mut cnt);
574                 }
575                 tx.send(()).unwrap();
576             });
577         }
578         drop(tx);
579 
580         let &(ref lock, ref cond) = &*data;
581         rx.recv().unwrap();
582         let mut cnt = lock.lock();
583         *cnt = 0;
584         cond.notify_all();
585         drop(cnt);
586 
587         for _ in 0..N {
588             rx.recv().unwrap();
589         }
590     }
591 
592     #[test]
notify_one_return_true()593     fn notify_one_return_true() {
594         let m = Arc::new(Mutex::new(()));
595         let m2 = m.clone();
596         let c = Arc::new(Condvar::new());
597         let c2 = c.clone();
598 
599         let mut g = m.lock();
600         let _t = thread::spawn(move || {
601             let _g = m2.lock();
602             assert!(c2.notify_one());
603         });
604         c.wait(&mut g);
605     }
606 
607     #[test]
notify_one_return_false()608     fn notify_one_return_false() {
609         let m = Arc::new(Mutex::new(()));
610         let c = Arc::new(Condvar::new());
611 
612         let _t = thread::spawn(move || {
613             let _g = m.lock();
614             assert!(!c.notify_one());
615         });
616     }
617 
618     #[test]
notify_all_return()619     fn notify_all_return() {
620         const N: usize = 10;
621 
622         let data = Arc::new((Mutex::new(0), Condvar::new()));
623         let (tx, rx) = channel();
624         for _ in 0..N {
625             let data = data.clone();
626             let tx = tx.clone();
627             thread::spawn(move || {
628                 let &(ref lock, ref cond) = &*data;
629                 let mut cnt = lock.lock();
630                 *cnt += 1;
631                 if *cnt == N {
632                     tx.send(()).unwrap();
633                 }
634                 while *cnt != 0 {
635                     cond.wait(&mut cnt);
636                 }
637                 tx.send(()).unwrap();
638             });
639         }
640         drop(tx);
641 
642         let &(ref lock, ref cond) = &*data;
643         rx.recv().unwrap();
644         let mut cnt = lock.lock();
645         *cnt = 0;
646         assert_eq!(cond.notify_all(), N);
647         drop(cnt);
648 
649         for _ in 0..N {
650             rx.recv().unwrap();
651         }
652 
653         assert_eq!(cond.notify_all(), 0);
654     }
655 
656     #[test]
wait_for()657     fn wait_for() {
658         let m = Arc::new(Mutex::new(()));
659         let m2 = m.clone();
660         let c = Arc::new(Condvar::new());
661         let c2 = c.clone();
662 
663         let mut g = m.lock();
664         let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
665         assert!(no_timeout.timed_out());
666 
667         let _t = thread::spawn(move || {
668             let _g = m2.lock();
669             c2.notify_one();
670         });
671         let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
672         assert!(!timeout_res.timed_out());
673 
674         drop(g);
675     }
676 
677     #[test]
wait_until()678     fn wait_until() {
679         let m = Arc::new(Mutex::new(()));
680         let m2 = m.clone();
681         let c = Arc::new(Condvar::new());
682         let c2 = c.clone();
683 
684         let mut g = m.lock();
685         let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
686         assert!(no_timeout.timed_out());
687         let _t = thread::spawn(move || {
688             let _g = m2.lock();
689             c2.notify_one();
690         });
691         let timeout_res = c.wait_until(
692             &mut g,
693             Instant::now() + Duration::from_millis(u32::max_value() as u64),
694         );
695         assert!(!timeout_res.timed_out());
696         drop(g);
697     }
698 
spawn_wait_while_notifier( mutex: Arc<Mutex<u32>>, cv: Arc<Condvar>, num_iters: u32, timeout: Option<Instant>, ) -> JoinHandle<()>699     fn spawn_wait_while_notifier(
700         mutex: Arc<Mutex<u32>>,
701         cv: Arc<Condvar>,
702         num_iters: u32,
703         timeout: Option<Instant>,
704     ) -> JoinHandle<()> {
705         thread::spawn(move || {
706             for epoch in 1..=num_iters {
707                 // spin to wait for main test thread to block
708                 // before notifying it to wake back up and check
709                 // its condition.
710                 let mut sleep_backoff = Duration::from_millis(1);
711                 let _mutex_guard = loop {
712                     let mutex_guard = mutex.lock();
713 
714                     if let Some(timeout) = timeout {
715                         if Instant::now() >= timeout {
716                             return;
717                         }
718                     }
719 
720                     if *mutex_guard == epoch {
721                         break mutex_guard;
722                     }
723 
724                     drop(mutex_guard);
725 
726                     // give main test thread a good chance to
727                     // acquire the lock before this thread does.
728                     sleep(sleep_backoff);
729                     sleep_backoff *= 2;
730                 };
731 
732                 cv.notify_one();
733             }
734         })
735     }
736 
737     #[test]
wait_while_until_internal_does_not_wait_if_initially_false()738     fn wait_while_until_internal_does_not_wait_if_initially_false() {
739         let mutex = Arc::new(Mutex::new(0));
740         let cv = Arc::new(Condvar::new());
741 
742         let condition = |counter: &mut u32| {
743             *counter += 1;
744             false
745         };
746 
747         let mut mutex_guard = mutex.lock();
748         let timeout_result = cv
749             .wait_while_until_internal(&mut mutex_guard, condition, None);
750 
751         assert!(!timeout_result.timed_out());
752         assert!(*mutex_guard == 1);
753     }
754 
755     #[test]
wait_while_until_internal_times_out_before_false()756     fn wait_while_until_internal_times_out_before_false() {
757         let mutex = Arc::new(Mutex::new(0));
758         let cv = Arc::new(Condvar::new());
759 
760         let num_iters = 3;
761         let condition = |counter: &mut u32| {
762             *counter += 1;
763             true
764         };
765 
766         let mut mutex_guard = mutex.lock();
767         let timeout = Some(Instant::now() + Duration::from_millis(500));
768         let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);
769 
770         let timeout_result =
771             cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);
772 
773         assert!(timeout_result.timed_out());
774         assert!(*mutex_guard == num_iters + 1);
775 
776         // prevent deadlock with notifier
777         drop(mutex_guard);
778         handle.join().unwrap();
779     }
780 
781     #[test]
wait_while_until_internal()782     fn wait_while_until_internal() {
783         let mutex = Arc::new(Mutex::new(0));
784         let cv = Arc::new(Condvar::new());
785 
786         let num_iters = 4;
787 
788         let condition = |counter: &mut u32| {
789             *counter += 1;
790             *counter <= num_iters
791         };
792 
793         let mut mutex_guard = mutex.lock();
794         let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);
795 
796         let timeout_result =
797             cv.wait_while_until_internal(&mut mutex_guard, condition, None);
798 
799         assert!(!timeout_result.timed_out());
800         assert!(*mutex_guard == num_iters + 1);
801 
802         let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
803         handle.join().unwrap();
804 
805         assert!(!timeout_result.timed_out());
806         assert!(*mutex_guard == num_iters + 2);
807     }
808 
809     #[test]
810     #[should_panic]
two_mutexes()811     fn two_mutexes() {
812         let m = Arc::new(Mutex::new(()));
813         let m2 = m.clone();
814         let m3 = Arc::new(Mutex::new(()));
815         let c = Arc::new(Condvar::new());
816         let c2 = c.clone();
817 
818         // Make sure we don't leave the child thread dangling
819         struct PanicGuard<'a>(&'a Condvar);
820         impl<'a> Drop for PanicGuard<'a> {
821             fn drop(&mut self) {
822                 self.0.notify_one();
823             }
824         }
825 
826         let (tx, rx) = channel();
827         let g = m.lock();
828         let _t = thread::spawn(move || {
829             let mut g = m2.lock();
830             tx.send(()).unwrap();
831             c2.wait(&mut g);
832         });
833         drop(g);
834         rx.recv().unwrap();
835         let _g = m.lock();
836         let _guard = PanicGuard(&*c);
837         c.wait(&mut m3.lock());
838     }
839 
840     #[test]
two_mutexes_disjoint()841     fn two_mutexes_disjoint() {
842         let m = Arc::new(Mutex::new(()));
843         let m2 = m.clone();
844         let m3 = Arc::new(Mutex::new(()));
845         let c = Arc::new(Condvar::new());
846         let c2 = c.clone();
847 
848         let mut g = m.lock();
849         let _t = thread::spawn(move || {
850             let _g = m2.lock();
851             c2.notify_one();
852         });
853         c.wait(&mut g);
854         drop(g);
855 
856         let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
857     }
858 
859     #[test]
test_debug_condvar()860     fn test_debug_condvar() {
861         let c = Condvar::new();
862         assert_eq!(format!("{:?}", c), "Condvar { .. }");
863     }
864 
865     #[test]
test_condvar_requeue()866     fn test_condvar_requeue() {
867         let m = Arc::new(Mutex::new(()));
868         let m2 = m.clone();
869         let c = Arc::new(Condvar::new());
870         let c2 = c.clone();
871         let t = thread::spawn(move || {
872             let mut g = m2.lock();
873             c2.wait(&mut g);
874         });
875 
876         let mut g = m.lock();
877         while !c.notify_one() {
878             // Wait for the thread to get into wait()
879             MutexGuard::bump(&mut g);
880             // Yield, so the other thread gets a chance to do something.
881             // (At least Miri needs this, because it doesn't preempt threads.)
882             thread::yield_now();
883         }
884         // The thread should have been requeued to the mutex, which we wake up now.
885         drop(g);
886         t.join().unwrap();
887     }
888 
889     #[test]
test_issue_129()890     fn test_issue_129() {
891         let locks = Arc::new((Mutex::new(()), Condvar::new()));
892 
893         let (tx, rx) = channel();
894         for _ in 0..4 {
895             let locks = locks.clone();
896             let tx = tx.clone();
897             thread::spawn(move || {
898                 let mut guard = locks.0.lock();
899                 locks.1.wait(&mut guard);
900                 locks.1.wait_for(&mut guard, Duration::from_millis(1));
901                 locks.1.notify_one();
902                 tx.send(()).unwrap();
903             });
904         }
905 
906         thread::sleep(Duration::from_millis(100));
907         locks.1.notify_one();
908 
909         for _ in 0..4 {
910             assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
911         }
912     }
913 }
914 
/// This module contains an integration test that is heavily inspired by WebKit's own integration
/// tests for its own Condvar.
917 #[cfg(test)]
918 mod webkit_queue_test {
919     use crate::{Condvar, Mutex, MutexGuard};
920     use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
921 
    /// Selects how the `wait` helper blocks on a condition variable.
    #[derive(Clone, Copy)]
    enum Timeout {
        /// Block with `Condvar::wait_for`, using this duration as the limit of each wait.
        Bounded(Duration),
        /// Block with `Condvar::wait`, which takes no time limit.
        Forever,
    }
927 
    /// Selects which notification call the `notify` helper issues.
    #[derive(Clone, Copy)]
    enum NotifyStyle {
        /// Always call `notify_one`, whatever the `should_notify` flag says.
        One,
        /// Call `notify_all`, but only when the `should_notify` flag is set.
        All,
    }
933 
934     struct Queue {
935         items: VecDeque<usize>,
936         should_continue: bool,
937     }
938 
939     impl Queue {
new() -> Self940         fn new() -> Self {
941             Self {
942                 items: VecDeque::new(),
943                 should_continue: true,
944             }
945         }
946     }
947 
wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )948     fn wait<T: ?Sized>(
949         condition: &Condvar,
950         lock: &mut MutexGuard<'_, T>,
951         predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
952         timeout: &Timeout,
953     ) {
954         while !predicate(lock) {
955             match timeout {
956                 Timeout::Forever => condition.wait(lock),
957                 Timeout::Bounded(bound) => {
958                     condition.wait_for(lock, *bound);
959                 }
960             }
961         }
962     }
963 
notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)964     fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
965         match style {
966             NotifyStyle::One => {
967                 condition.notify_one();
968             }
969             NotifyStyle::All => {
970                 if should_notify {
971                     condition.notify_all();
972                 }
973             }
974         }
975     }
976 
run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )977     fn run_queue_test(
978         num_producers: usize,
979         num_consumers: usize,
980         max_queue_size: usize,
981         messages_per_producer: usize,
982         notify_style: NotifyStyle,
983         timeout: Timeout,
984         delay: Duration,
985     ) {
986         let input_queue = Arc::new(Mutex::new(Queue::new()));
987         let empty_condition = Arc::new(Condvar::new());
988         let full_condition = Arc::new(Condvar::new());
989 
990         let output_vec = Arc::new(Mutex::new(vec![]));
991 
992         let consumers = (0..num_consumers)
993             .map(|_| {
994                 consumer_thread(
995                     input_queue.clone(),
996                     empty_condition.clone(),
997                     full_condition.clone(),
998                     timeout,
999                     notify_style,
1000                     output_vec.clone(),
1001                     max_queue_size,
1002                 )
1003             })
1004             .collect::<Vec<_>>();
1005         let producers = (0..num_producers)
1006             .map(|_| {
1007                 producer_thread(
1008                     messages_per_producer,
1009                     input_queue.clone(),
1010                     empty_condition.clone(),
1011                     full_condition.clone(),
1012                     timeout,
1013                     notify_style,
1014                     max_queue_size,
1015                 )
1016             })
1017             .collect::<Vec<_>>();
1018 
1019         thread::sleep(delay);
1020 
1021         for producer in producers.into_iter() {
1022             producer.join().expect("Producer thread panicked");
1023         }
1024 
1025         {
1026             let mut input_queue = input_queue.lock();
1027             input_queue.should_continue = false;
1028         }
1029         empty_condition.notify_all();
1030 
1031         for consumer in consumers.into_iter() {
1032             consumer.join().expect("Consumer thread panicked");
1033         }
1034 
1035         let mut output_vec = output_vec.lock();
1036         assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1037         output_vec.sort();
1038         for msg_idx in 0..messages_per_producer {
1039             for producer_idx in 0..num_producers {
1040                 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1041             }
1042         }
1043     }
1044 
consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>1045     fn consumer_thread(
1046         input_queue: Arc<Mutex<Queue>>,
1047         empty_condition: Arc<Condvar>,
1048         full_condition: Arc<Condvar>,
1049         timeout: Timeout,
1050         notify_style: NotifyStyle,
1051         output_queue: Arc<Mutex<Vec<usize>>>,
1052         max_queue_size: usize,
1053     ) -> thread::JoinHandle<()> {
1054         thread::spawn(move || loop {
1055             let (should_notify, result) = {
1056                 let mut queue = input_queue.lock();
1057                 wait(
1058                     &*empty_condition,
1059                     &mut queue,
1060                     |state| -> bool { !state.items.is_empty() || !state.should_continue },
1061                     &timeout,
1062                 );
1063                 if queue.items.is_empty() && !queue.should_continue {
1064                     return;
1065                 }
1066                 let should_notify = queue.items.len() == max_queue_size;
1067                 let result = queue.items.pop_front();
1068                 std::mem::drop(queue);
1069                 (should_notify, result)
1070             };
1071             notify(notify_style, &*full_condition, should_notify);
1072 
1073             if let Some(result) = result {
1074                 output_queue.lock().push(result);
1075             }
1076         })
1077     }
1078 
producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>1079     fn producer_thread(
1080         num_messages: usize,
1081         queue: Arc<Mutex<Queue>>,
1082         empty_condition: Arc<Condvar>,
1083         full_condition: Arc<Condvar>,
1084         timeout: Timeout,
1085         notify_style: NotifyStyle,
1086         max_queue_size: usize,
1087     ) -> thread::JoinHandle<()> {
1088         thread::spawn(move || {
1089             for message in 0..num_messages {
1090                 let should_notify = {
1091                     let mut queue = queue.lock();
1092                     wait(
1093                         &*full_condition,
1094                         &mut queue,
1095                         |state| state.items.len() < max_queue_size,
1096                         &timeout,
1097                     );
1098                     let should_notify = queue.items.is_empty();
1099                     queue.items.push_back(message);
1100                     std::mem::drop(queue);
1101                     should_notify
1102                 };
1103                 notify(notify_style, &*empty_condition, should_notify);
1104             }
1105         })
1106     }
1107 
1108     macro_rules! run_queue_tests {
1109         ( $( $name:ident(
1110             num_producers: $num_producers:expr,
1111             num_consumers: $num_consumers:expr,
1112             max_queue_size: $max_queue_size:expr,
1113             messages_per_producer: $messages_per_producer:expr,
1114             notification_style: $notification_style:expr,
1115             timeout: $timeout:expr,
1116             delay_seconds: $delay_seconds:expr);
1117         )* ) => {
1118             $(#[test]
1119             fn $name() {
1120                 let delay = Duration::from_secs($delay_seconds);
1121                 run_queue_test(
1122                     $num_producers,
1123                     $num_consumers,
1124                     $max_queue_size,
1125                     $messages_per_producer,
1126                     $notification_style,
1127                     $timeout,
1128                     delay,
1129                     );
1130             })*
1131         };
1132     }
1133 
    // Each entry below expands to one `#[test]` function that forwards its
    // fields to `run_queue_test`.
    run_queue_tests! {
        // NOTE(review): despite the names, this entry is the one using a
        // bounded timeout, while `sanity_check_queue_timeout` below uses
        // `Timeout::Forever` - confirm this is intended.
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // NOTE(review): named `_timeout` but waits with `Timeout::Forever`; it
        // differs from `one_producer_one_consumer_one_slot` only in its
        // one-second delay - confirm this is intended.
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
1271 }
1272