1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2020 The ChromiumOS Authors
2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker
5*bb4ee6a4SAndroid Build Coastguard Worker use std::cell::UnsafeCell;
6*bb4ee6a4SAndroid Build Coastguard Worker use std::hint;
7*bb4ee6a4SAndroid Build Coastguard Worker use std::mem;
8*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::AtomicUsize;
9*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::atomic::Ordering;
10*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
11*bb4ee6a4SAndroid Build Coastguard Worker
12*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::mu::RawRwLock;
13*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::mu::RwLockReadGuard;
14*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::mu::RwLockWriteGuard;
15*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::waiter::Kind as WaiterKind;
16*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::waiter::Waiter;
17*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::waiter::WaiterAdapter;
18*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::waiter::WaiterList;
19*bb4ee6a4SAndroid Build Coastguard Worker use super::super::sync::waiter::WaitingFor;
20*bb4ee6a4SAndroid Build Coastguard Worker
// Bit in `Condvar::state` that is set while a thread holds the internal spin lock, i.e. while it
// has exclusive access to the `waiters` list and the `mu` cell.
const SPINLOCK: usize = 1 << 0;
// Bit in `Condvar::state` that is set when a waiter is added and cleared again once the `waiters`
// list has been emptied. The notify methods use it to return early without taking the spin lock.
const HAS_WAITERS: usize = 1 << 1;
23*bb4ee6a4SAndroid Build Coastguard Worker
24*bb4ee6a4SAndroid Build Coastguard Worker /// A primitive to wait for an event to occur without consuming CPU time.
25*bb4ee6a4SAndroid Build Coastguard Worker ///
26*bb4ee6a4SAndroid Build Coastguard Worker /// Condition variables are used in combination with a `RwLock` when a thread wants to wait for some
27*bb4ee6a4SAndroid Build Coastguard Worker /// condition to become true. The condition must always be verified while holding the `RwLock` lock.
28*bb4ee6a4SAndroid Build Coastguard Worker /// It is an error to use a `Condvar` with more than one `RwLock` while there are threads waiting on
29*bb4ee6a4SAndroid Build Coastguard Worker /// the `Condvar`.
30*bb4ee6a4SAndroid Build Coastguard Worker ///
31*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples
32*bb4ee6a4SAndroid Build Coastguard Worker ///
33*bb4ee6a4SAndroid Build Coastguard Worker /// ```edition2018
34*bb4ee6a4SAndroid Build Coastguard Worker /// use std::sync::Arc;
35*bb4ee6a4SAndroid Build Coastguard Worker /// use std::thread;
36*bb4ee6a4SAndroid Build Coastguard Worker /// use std::sync::mpsc::channel;
37*bb4ee6a4SAndroid Build Coastguard Worker ///
38*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::{
39*bb4ee6a4SAndroid Build Coastguard Worker /// block_on,
40*bb4ee6a4SAndroid Build Coastguard Worker /// sync::{Condvar, RwLock},
41*bb4ee6a4SAndroid Build Coastguard Worker /// };
42*bb4ee6a4SAndroid Build Coastguard Worker ///
43*bb4ee6a4SAndroid Build Coastguard Worker /// const N: usize = 13;
44*bb4ee6a4SAndroid Build Coastguard Worker ///
45*bb4ee6a4SAndroid Build Coastguard Worker /// // Spawn a few threads to increment a shared variable (non-atomically), and
46*bb4ee6a4SAndroid Build Coastguard Worker /// // let all threads waiting on the Condvar know once the increments are done.
47*bb4ee6a4SAndroid Build Coastguard Worker /// let data = Arc::new(RwLock::new(0));
48*bb4ee6a4SAndroid Build Coastguard Worker /// let cv = Arc::new(Condvar::new());
49*bb4ee6a4SAndroid Build Coastguard Worker ///
50*bb4ee6a4SAndroid Build Coastguard Worker /// for _ in 0..N {
51*bb4ee6a4SAndroid Build Coastguard Worker /// let (data, cv) = (data.clone(), cv.clone());
52*bb4ee6a4SAndroid Build Coastguard Worker /// thread::spawn(move || {
53*bb4ee6a4SAndroid Build Coastguard Worker /// let mut data = block_on(data.lock());
54*bb4ee6a4SAndroid Build Coastguard Worker /// *data += 1;
55*bb4ee6a4SAndroid Build Coastguard Worker /// if *data == N {
56*bb4ee6a4SAndroid Build Coastguard Worker /// cv.notify_all();
57*bb4ee6a4SAndroid Build Coastguard Worker /// }
58*bb4ee6a4SAndroid Build Coastguard Worker /// });
59*bb4ee6a4SAndroid Build Coastguard Worker /// }
60*bb4ee6a4SAndroid Build Coastguard Worker ///
61*bb4ee6a4SAndroid Build Coastguard Worker /// let mut val = block_on(data.lock());
62*bb4ee6a4SAndroid Build Coastguard Worker /// while *val != N {
63*bb4ee6a4SAndroid Build Coastguard Worker /// val = block_on(cv.wait(val));
64*bb4ee6a4SAndroid Build Coastguard Worker /// }
65*bb4ee6a4SAndroid Build Coastguard Worker /// ```
// NOTE(review): the 128-byte alignment is presumably there to keep the `Condvar` from sharing a
// cache line with neighbouring data -- confirm.
#[repr(align(128))]
pub struct Condvar {
    // Bit set built from `SPINLOCK` and `HAS_WAITERS`.
    state: AtomicUsize,
    // Queue of waiters. Only accessed while the `SPINLOCK` bit of `state` is held.
    waiters: UnsafeCell<WaiterList>,
    // Address of the `RawRwLock` currently associated with this `Condvar`, or 0 when there is
    // none. Only accessed while the `SPINLOCK` bit of `state` is held.
    mu: UnsafeCell<usize>,
}
72*bb4ee6a4SAndroid Build Coastguard Worker
73*bb4ee6a4SAndroid Build Coastguard Worker impl Condvar {
74*bb4ee6a4SAndroid Build Coastguard Worker /// Creates a new condition variable ready to be waited on and notified.
new() -> Condvar75*bb4ee6a4SAndroid Build Coastguard Worker pub fn new() -> Condvar {
76*bb4ee6a4SAndroid Build Coastguard Worker Condvar {
77*bb4ee6a4SAndroid Build Coastguard Worker state: AtomicUsize::new(0),
78*bb4ee6a4SAndroid Build Coastguard Worker waiters: UnsafeCell::new(WaiterList::new(WaiterAdapter::new())),
79*bb4ee6a4SAndroid Build Coastguard Worker mu: UnsafeCell::new(0),
80*bb4ee6a4SAndroid Build Coastguard Worker }
81*bb4ee6a4SAndroid Build Coastguard Worker }
82*bb4ee6a4SAndroid Build Coastguard Worker
83*bb4ee6a4SAndroid Build Coastguard Worker /// Block the current thread until this `Condvar` is notified by another thread.
84*bb4ee6a4SAndroid Build Coastguard Worker ///
85*bb4ee6a4SAndroid Build Coastguard Worker /// This method will atomically unlock the `RwLock` held by `guard` and then block the current
86*bb4ee6a4SAndroid Build Coastguard Worker /// thread. Any call to `notify_one` or `notify_all` after the `RwLock` is unlocked may wake up
87*bb4ee6a4SAndroid Build Coastguard Worker /// the thread.
88*bb4ee6a4SAndroid Build Coastguard Worker ///
89*bb4ee6a4SAndroid Build Coastguard Worker /// To allow for more efficient scheduling, this call may return even when the programmer
90*bb4ee6a4SAndroid Build Coastguard Worker /// doesn't expect the thread to be woken. Therefore, calls to `wait()` should be used inside a
91*bb4ee6a4SAndroid Build Coastguard Worker /// loop that checks the predicate before continuing.
92*bb4ee6a4SAndroid Build Coastguard Worker ///
93*bb4ee6a4SAndroid Build Coastguard Worker /// Callers that are not in an async context may wish to use the `block_on` method to block the
94*bb4ee6a4SAndroid Build Coastguard Worker /// thread until the `Condvar` is notified.
95*bb4ee6a4SAndroid Build Coastguard Worker ///
96*bb4ee6a4SAndroid Build Coastguard Worker /// # Panics
97*bb4ee6a4SAndroid Build Coastguard Worker ///
98*bb4ee6a4SAndroid Build Coastguard Worker /// This method will panic if used with more than one `RwLock` at the same time.
99*bb4ee6a4SAndroid Build Coastguard Worker ///
100*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples
101*bb4ee6a4SAndroid Build Coastguard Worker ///
102*bb4ee6a4SAndroid Build Coastguard Worker /// ```
103*bb4ee6a4SAndroid Build Coastguard Worker /// # use std::sync::Arc;
104*bb4ee6a4SAndroid Build Coastguard Worker /// # use std::thread;
105*bb4ee6a4SAndroid Build Coastguard Worker ///
106*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::{
107*bb4ee6a4SAndroid Build Coastguard Worker /// # block_on,
108*bb4ee6a4SAndroid Build Coastguard Worker /// # sync::{Condvar, RwLock},
109*bb4ee6a4SAndroid Build Coastguard Worker /// # };
110*bb4ee6a4SAndroid Build Coastguard Worker ///
111*bb4ee6a4SAndroid Build Coastguard Worker /// # let mu = Arc::new(RwLock::new(false));
112*bb4ee6a4SAndroid Build Coastguard Worker /// # let cv = Arc::new(Condvar::new());
113*bb4ee6a4SAndroid Build Coastguard Worker /// # let (mu2, cv2) = (mu.clone(), cv.clone());
114*bb4ee6a4SAndroid Build Coastguard Worker ///
115*bb4ee6a4SAndroid Build Coastguard Worker /// # let t = thread::spawn(move || {
116*bb4ee6a4SAndroid Build Coastguard Worker /// # *block_on(mu2.lock()) = true;
117*bb4ee6a4SAndroid Build Coastguard Worker /// # cv2.notify_all();
118*bb4ee6a4SAndroid Build Coastguard Worker /// # });
119*bb4ee6a4SAndroid Build Coastguard Worker ///
120*bb4ee6a4SAndroid Build Coastguard Worker /// let mut ready = block_on(mu.lock());
121*bb4ee6a4SAndroid Build Coastguard Worker /// while !*ready {
122*bb4ee6a4SAndroid Build Coastguard Worker /// ready = block_on(cv.wait(ready));
123*bb4ee6a4SAndroid Build Coastguard Worker /// }
124*bb4ee6a4SAndroid Build Coastguard Worker ///
125*bb4ee6a4SAndroid Build Coastguard Worker /// # t.join().expect("failed to join thread");
126*bb4ee6a4SAndroid Build Coastguard Worker /// ```
127*bb4ee6a4SAndroid Build Coastguard Worker // Clippy doesn't like the lifetime parameters here but doing what it suggests leads to code
128*bb4ee6a4SAndroid Build Coastguard Worker // that doesn't compile.
129*bb4ee6a4SAndroid Build Coastguard Worker #[allow(clippy::needless_lifetimes)]
wait<'g, T>(&self, guard: RwLockWriteGuard<'g, T>) -> RwLockWriteGuard<'g, T>130*bb4ee6a4SAndroid Build Coastguard Worker pub async fn wait<'g, T>(&self, guard: RwLockWriteGuard<'g, T>) -> RwLockWriteGuard<'g, T> {
131*bb4ee6a4SAndroid Build Coastguard Worker let waiter = Arc::new(Waiter::new(
132*bb4ee6a4SAndroid Build Coastguard Worker WaiterKind::Exclusive,
133*bb4ee6a4SAndroid Build Coastguard Worker cancel_waiter,
134*bb4ee6a4SAndroid Build Coastguard Worker self as *const Condvar as usize,
135*bb4ee6a4SAndroid Build Coastguard Worker WaitingFor::Condvar,
136*bb4ee6a4SAndroid Build Coastguard Worker ));
137*bb4ee6a4SAndroid Build Coastguard Worker
138*bb4ee6a4SAndroid Build Coastguard Worker self.add_waiter(waiter.clone(), guard.as_raw_rwlock());
139*bb4ee6a4SAndroid Build Coastguard Worker
140*bb4ee6a4SAndroid Build Coastguard Worker // Get a reference to the rwlock and then drop the lock.
141*bb4ee6a4SAndroid Build Coastguard Worker let mu = guard.into_inner();
142*bb4ee6a4SAndroid Build Coastguard Worker
143*bb4ee6a4SAndroid Build Coastguard Worker // Wait to be woken up.
144*bb4ee6a4SAndroid Build Coastguard Worker waiter.wait().await;
145*bb4ee6a4SAndroid Build Coastguard Worker
146*bb4ee6a4SAndroid Build Coastguard Worker // Now re-acquire the lock.
147*bb4ee6a4SAndroid Build Coastguard Worker mu.lock_from_cv().await
148*bb4ee6a4SAndroid Build Coastguard Worker }
149*bb4ee6a4SAndroid Build Coastguard Worker
150*bb4ee6a4SAndroid Build Coastguard Worker /// Like `wait()` but takes and returns a `RwLockReadGuard` instead.
151*bb4ee6a4SAndroid Build Coastguard Worker // Clippy doesn't like the lifetime parameters here but doing what it suggests leads to code
152*bb4ee6a4SAndroid Build Coastguard Worker // that doesn't compile.
153*bb4ee6a4SAndroid Build Coastguard Worker #[allow(clippy::needless_lifetimes)]
wait_read<'g, T>(&self, guard: RwLockReadGuard<'g, T>) -> RwLockReadGuard<'g, T>154*bb4ee6a4SAndroid Build Coastguard Worker pub async fn wait_read<'g, T>(&self, guard: RwLockReadGuard<'g, T>) -> RwLockReadGuard<'g, T> {
155*bb4ee6a4SAndroid Build Coastguard Worker let waiter = Arc::new(Waiter::new(
156*bb4ee6a4SAndroid Build Coastguard Worker WaiterKind::Shared,
157*bb4ee6a4SAndroid Build Coastguard Worker cancel_waiter,
158*bb4ee6a4SAndroid Build Coastguard Worker self as *const Condvar as usize,
159*bb4ee6a4SAndroid Build Coastguard Worker WaitingFor::Condvar,
160*bb4ee6a4SAndroid Build Coastguard Worker ));
161*bb4ee6a4SAndroid Build Coastguard Worker
162*bb4ee6a4SAndroid Build Coastguard Worker self.add_waiter(waiter.clone(), guard.as_raw_rwlock());
163*bb4ee6a4SAndroid Build Coastguard Worker
164*bb4ee6a4SAndroid Build Coastguard Worker // Get a reference to the rwlock and then drop the lock.
165*bb4ee6a4SAndroid Build Coastguard Worker let mu = guard.into_inner();
166*bb4ee6a4SAndroid Build Coastguard Worker
167*bb4ee6a4SAndroid Build Coastguard Worker // Wait to be woken up.
168*bb4ee6a4SAndroid Build Coastguard Worker waiter.wait().await;
169*bb4ee6a4SAndroid Build Coastguard Worker
170*bb4ee6a4SAndroid Build Coastguard Worker // Now re-acquire the lock.
171*bb4ee6a4SAndroid Build Coastguard Worker mu.read_lock_from_cv().await
172*bb4ee6a4SAndroid Build Coastguard Worker }
173*bb4ee6a4SAndroid Build Coastguard Worker
add_waiter(&self, waiter: Arc<Waiter>, raw_rwlock: &RawRwLock)174*bb4ee6a4SAndroid Build Coastguard Worker fn add_waiter(&self, waiter: Arc<Waiter>, raw_rwlock: &RawRwLock) {
175*bb4ee6a4SAndroid Build Coastguard Worker // Acquire the spin lock.
176*bb4ee6a4SAndroid Build Coastguard Worker let mut oldstate = self.state.load(Ordering::Relaxed);
177*bb4ee6a4SAndroid Build Coastguard Worker while (oldstate & SPINLOCK) != 0
178*bb4ee6a4SAndroid Build Coastguard Worker || self
179*bb4ee6a4SAndroid Build Coastguard Worker .state
180*bb4ee6a4SAndroid Build Coastguard Worker .compare_exchange_weak(
181*bb4ee6a4SAndroid Build Coastguard Worker oldstate,
182*bb4ee6a4SAndroid Build Coastguard Worker oldstate | SPINLOCK | HAS_WAITERS,
183*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Acquire,
184*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Relaxed,
185*bb4ee6a4SAndroid Build Coastguard Worker )
186*bb4ee6a4SAndroid Build Coastguard Worker .is_err()
187*bb4ee6a4SAndroid Build Coastguard Worker {
188*bb4ee6a4SAndroid Build Coastguard Worker hint::spin_loop();
189*bb4ee6a4SAndroid Build Coastguard Worker oldstate = self.state.load(Ordering::Relaxed);
190*bb4ee6a4SAndroid Build Coastguard Worker }
191*bb4ee6a4SAndroid Build Coastguard Worker
192*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
193*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the spin lock guarantees exclusive access and the reference does not escape
194*bb4ee6a4SAndroid Build Coastguard Worker // this function.
195*bb4ee6a4SAndroid Build Coastguard Worker let mu = unsafe { &mut *self.mu.get() };
196*bb4ee6a4SAndroid Build Coastguard Worker let muptr = raw_rwlock as *const RawRwLock as usize;
197*bb4ee6a4SAndroid Build Coastguard Worker
198*bb4ee6a4SAndroid Build Coastguard Worker match *mu {
199*bb4ee6a4SAndroid Build Coastguard Worker 0 => *mu = muptr,
200*bb4ee6a4SAndroid Build Coastguard Worker p if p == muptr => {}
201*bb4ee6a4SAndroid Build Coastguard Worker _ => panic!("Attempting to use Condvar with more than one RwLock at the same time"),
202*bb4ee6a4SAndroid Build Coastguard Worker }
203*bb4ee6a4SAndroid Build Coastguard Worker
204*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
205*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the spin lock guarantees exclusive access.
206*bb4ee6a4SAndroid Build Coastguard Worker unsafe { (*self.waiters.get()).push_back(waiter) };
207*bb4ee6a4SAndroid Build Coastguard Worker
208*bb4ee6a4SAndroid Build Coastguard Worker // Release the spin lock. Use a direct store here because no other thread can modify
209*bb4ee6a4SAndroid Build Coastguard Worker // `self.state` while we hold the spin lock. Keep the `HAS_WAITERS` bit that we set earlier
210*bb4ee6a4SAndroid Build Coastguard Worker // because we just added a waiter.
211*bb4ee6a4SAndroid Build Coastguard Worker self.state.store(HAS_WAITERS, Ordering::Release);
212*bb4ee6a4SAndroid Build Coastguard Worker }
213*bb4ee6a4SAndroid Build Coastguard Worker
214*bb4ee6a4SAndroid Build Coastguard Worker /// Notify at most one thread currently waiting on the `Condvar`.
215*bb4ee6a4SAndroid Build Coastguard Worker ///
216*bb4ee6a4SAndroid Build Coastguard Worker /// If there is a thread currently waiting on the `Condvar` it will be woken up from its call to
217*bb4ee6a4SAndroid Build Coastguard Worker /// `wait`.
218*bb4ee6a4SAndroid Build Coastguard Worker ///
219*bb4ee6a4SAndroid Build Coastguard Worker /// Unlike more traditional condition variable interfaces, this method requires a reference to
220*bb4ee6a4SAndroid Build Coastguard Worker /// the `RwLock` associated with this `Condvar`. This is because it is inherently racy to call
221*bb4ee6a4SAndroid Build Coastguard Worker /// `notify_one` or `notify_all` without first acquiring the `RwLock` lock. Additionally, taking
222*bb4ee6a4SAndroid Build Coastguard Worker /// a reference to the `RwLock` here allows us to make some optimizations that can improve
223*bb4ee6a4SAndroid Build Coastguard Worker /// performance by reducing unnecessary wakeups.
notify_one(&self)224*bb4ee6a4SAndroid Build Coastguard Worker pub fn notify_one(&self) {
225*bb4ee6a4SAndroid Build Coastguard Worker let mut oldstate = self.state.load(Ordering::Relaxed);
226*bb4ee6a4SAndroid Build Coastguard Worker if (oldstate & HAS_WAITERS) == 0 {
227*bb4ee6a4SAndroid Build Coastguard Worker // No waiters.
228*bb4ee6a4SAndroid Build Coastguard Worker return;
229*bb4ee6a4SAndroid Build Coastguard Worker }
230*bb4ee6a4SAndroid Build Coastguard Worker
231*bb4ee6a4SAndroid Build Coastguard Worker while (oldstate & SPINLOCK) != 0
232*bb4ee6a4SAndroid Build Coastguard Worker || self
233*bb4ee6a4SAndroid Build Coastguard Worker .state
234*bb4ee6a4SAndroid Build Coastguard Worker .compare_exchange_weak(
235*bb4ee6a4SAndroid Build Coastguard Worker oldstate,
236*bb4ee6a4SAndroid Build Coastguard Worker oldstate | SPINLOCK,
237*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Acquire,
238*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Relaxed,
239*bb4ee6a4SAndroid Build Coastguard Worker )
240*bb4ee6a4SAndroid Build Coastguard Worker .is_err()
241*bb4ee6a4SAndroid Build Coastguard Worker {
242*bb4ee6a4SAndroid Build Coastguard Worker hint::spin_loop();
243*bb4ee6a4SAndroid Build Coastguard Worker oldstate = self.state.load(Ordering::Relaxed);
244*bb4ee6a4SAndroid Build Coastguard Worker }
245*bb4ee6a4SAndroid Build Coastguard Worker
246*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
247*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the spin lock guarantees exclusive access and the reference does not escape
248*bb4ee6a4SAndroid Build Coastguard Worker // this function.
249*bb4ee6a4SAndroid Build Coastguard Worker let waiters = unsafe { &mut *self.waiters.get() };
250*bb4ee6a4SAndroid Build Coastguard Worker let wake_list = get_wake_list(waiters);
251*bb4ee6a4SAndroid Build Coastguard Worker
252*bb4ee6a4SAndroid Build Coastguard Worker let newstate = if waiters.is_empty() {
253*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
254*bb4ee6a4SAndroid Build Coastguard Worker // Also clear the rwlock associated with this Condvar since there are no longer any
255*bb4ee6a4SAndroid Build Coastguard Worker // waiters. Safe because the spin lock guarantees exclusive access.
256*bb4ee6a4SAndroid Build Coastguard Worker unsafe { *self.mu.get() = 0 };
257*bb4ee6a4SAndroid Build Coastguard Worker
258*bb4ee6a4SAndroid Build Coastguard Worker // We are releasing the spin lock and there are no more waiters so we can clear all bits
259*bb4ee6a4SAndroid Build Coastguard Worker // in `self.state`.
260*bb4ee6a4SAndroid Build Coastguard Worker 0
261*bb4ee6a4SAndroid Build Coastguard Worker } else {
262*bb4ee6a4SAndroid Build Coastguard Worker // There are still waiters so we need to keep the HAS_WAITERS bit in the state.
263*bb4ee6a4SAndroid Build Coastguard Worker HAS_WAITERS
264*bb4ee6a4SAndroid Build Coastguard Worker };
265*bb4ee6a4SAndroid Build Coastguard Worker
266*bb4ee6a4SAndroid Build Coastguard Worker // Release the spin lock.
267*bb4ee6a4SAndroid Build Coastguard Worker self.state.store(newstate, Ordering::Release);
268*bb4ee6a4SAndroid Build Coastguard Worker
269*bb4ee6a4SAndroid Build Coastguard Worker // Now wake any waiters in the wake list.
270*bb4ee6a4SAndroid Build Coastguard Worker for w in wake_list {
271*bb4ee6a4SAndroid Build Coastguard Worker w.wake();
272*bb4ee6a4SAndroid Build Coastguard Worker }
273*bb4ee6a4SAndroid Build Coastguard Worker }
274*bb4ee6a4SAndroid Build Coastguard Worker
275*bb4ee6a4SAndroid Build Coastguard Worker /// Notify all threads currently waiting on the `Condvar`.
276*bb4ee6a4SAndroid Build Coastguard Worker ///
277*bb4ee6a4SAndroid Build Coastguard Worker /// All threads currently waiting on the `Condvar` will be woken up from their call to `wait`.
278*bb4ee6a4SAndroid Build Coastguard Worker ///
279*bb4ee6a4SAndroid Build Coastguard Worker /// Unlike more traditional condition variable interfaces, this method requires a reference to
280*bb4ee6a4SAndroid Build Coastguard Worker /// the `RwLock` associated with this `Condvar`. This is because it is inherently racy to call
281*bb4ee6a4SAndroid Build Coastguard Worker /// `notify_one` or `notify_all` without first acquiring the `RwLock` lock. Additionally, taking
282*bb4ee6a4SAndroid Build Coastguard Worker /// a reference to the `RwLock` here allows us to make some optimizations that can improve
283*bb4ee6a4SAndroid Build Coastguard Worker /// performance by reducing unnecessary wakeups.
notify_all(&self)284*bb4ee6a4SAndroid Build Coastguard Worker pub fn notify_all(&self) {
285*bb4ee6a4SAndroid Build Coastguard Worker let mut oldstate = self.state.load(Ordering::Relaxed);
286*bb4ee6a4SAndroid Build Coastguard Worker if (oldstate & HAS_WAITERS) == 0 {
287*bb4ee6a4SAndroid Build Coastguard Worker // No waiters.
288*bb4ee6a4SAndroid Build Coastguard Worker return;
289*bb4ee6a4SAndroid Build Coastguard Worker }
290*bb4ee6a4SAndroid Build Coastguard Worker
291*bb4ee6a4SAndroid Build Coastguard Worker while (oldstate & SPINLOCK) != 0
292*bb4ee6a4SAndroid Build Coastguard Worker || self
293*bb4ee6a4SAndroid Build Coastguard Worker .state
294*bb4ee6a4SAndroid Build Coastguard Worker .compare_exchange_weak(
295*bb4ee6a4SAndroid Build Coastguard Worker oldstate,
296*bb4ee6a4SAndroid Build Coastguard Worker oldstate | SPINLOCK,
297*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Acquire,
298*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Relaxed,
299*bb4ee6a4SAndroid Build Coastguard Worker )
300*bb4ee6a4SAndroid Build Coastguard Worker .is_err()
301*bb4ee6a4SAndroid Build Coastguard Worker {
302*bb4ee6a4SAndroid Build Coastguard Worker hint::spin_loop();
303*bb4ee6a4SAndroid Build Coastguard Worker oldstate = self.state.load(Ordering::Relaxed);
304*bb4ee6a4SAndroid Build Coastguard Worker }
305*bb4ee6a4SAndroid Build Coastguard Worker
306*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
307*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the spin lock guarantees exclusive access to `self.waiters`.
308*bb4ee6a4SAndroid Build Coastguard Worker let wake_list = unsafe { (*self.waiters.get()).take() };
309*bb4ee6a4SAndroid Build Coastguard Worker
310*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
311*bb4ee6a4SAndroid Build Coastguard Worker // Clear the rwlock associated with this Condvar since there are no longer any waiters. Safe
312*bb4ee6a4SAndroid Build Coastguard Worker // because we the spin lock guarantees exclusive access.
313*bb4ee6a4SAndroid Build Coastguard Worker unsafe { *self.mu.get() = 0 };
314*bb4ee6a4SAndroid Build Coastguard Worker
315*bb4ee6a4SAndroid Build Coastguard Worker // Mark any waiters left as no longer waiting for the Condvar.
316*bb4ee6a4SAndroid Build Coastguard Worker for w in &wake_list {
317*bb4ee6a4SAndroid Build Coastguard Worker w.set_waiting_for(WaitingFor::None);
318*bb4ee6a4SAndroid Build Coastguard Worker }
319*bb4ee6a4SAndroid Build Coastguard Worker
320*bb4ee6a4SAndroid Build Coastguard Worker // Release the spin lock. We can clear all bits in the state since we took all the waiters.
321*bb4ee6a4SAndroid Build Coastguard Worker self.state.store(0, Ordering::Release);
322*bb4ee6a4SAndroid Build Coastguard Worker
323*bb4ee6a4SAndroid Build Coastguard Worker // Now wake any waiters in the wake list.
324*bb4ee6a4SAndroid Build Coastguard Worker for w in wake_list {
325*bb4ee6a4SAndroid Build Coastguard Worker w.wake();
326*bb4ee6a4SAndroid Build Coastguard Worker }
327*bb4ee6a4SAndroid Build Coastguard Worker }
328*bb4ee6a4SAndroid Build Coastguard Worker
cancel_waiter(&self, waiter: &Waiter, wake_next: bool)329*bb4ee6a4SAndroid Build Coastguard Worker fn cancel_waiter(&self, waiter: &Waiter, wake_next: bool) {
330*bb4ee6a4SAndroid Build Coastguard Worker let mut oldstate = self.state.load(Ordering::Relaxed);
331*bb4ee6a4SAndroid Build Coastguard Worker while oldstate & SPINLOCK != 0
332*bb4ee6a4SAndroid Build Coastguard Worker || self
333*bb4ee6a4SAndroid Build Coastguard Worker .state
334*bb4ee6a4SAndroid Build Coastguard Worker .compare_exchange_weak(
335*bb4ee6a4SAndroid Build Coastguard Worker oldstate,
336*bb4ee6a4SAndroid Build Coastguard Worker oldstate | SPINLOCK,
337*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Acquire,
338*bb4ee6a4SAndroid Build Coastguard Worker Ordering::Relaxed,
339*bb4ee6a4SAndroid Build Coastguard Worker )
340*bb4ee6a4SAndroid Build Coastguard Worker .is_err()
341*bb4ee6a4SAndroid Build Coastguard Worker {
342*bb4ee6a4SAndroid Build Coastguard Worker hint::spin_loop();
343*bb4ee6a4SAndroid Build Coastguard Worker oldstate = self.state.load(Ordering::Relaxed);
344*bb4ee6a4SAndroid Build Coastguard Worker }
345*bb4ee6a4SAndroid Build Coastguard Worker
346*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
347*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the spin lock provides exclusive access and the reference does not escape
348*bb4ee6a4SAndroid Build Coastguard Worker // this function.
349*bb4ee6a4SAndroid Build Coastguard Worker let waiters = unsafe { &mut *self.waiters.get() };
350*bb4ee6a4SAndroid Build Coastguard Worker
351*bb4ee6a4SAndroid Build Coastguard Worker let waiting_for = waiter.is_waiting_for();
352*bb4ee6a4SAndroid Build Coastguard Worker // Don't drop the old waiter now as we're still holding the spin lock.
353*bb4ee6a4SAndroid Build Coastguard Worker let old_waiter = if waiter.is_linked() && waiting_for == WaitingFor::Condvar {
354*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
355*bb4ee6a4SAndroid Build Coastguard Worker // Safe because we know that the waiter is still linked and is waiting for the Condvar,
356*bb4ee6a4SAndroid Build Coastguard Worker // which guarantees that it is still in `self.waiters`.
357*bb4ee6a4SAndroid Build Coastguard Worker let mut cursor = unsafe { waiters.cursor_mut_from_ptr(waiter as *const Waiter) };
358*bb4ee6a4SAndroid Build Coastguard Worker cursor.remove()
359*bb4ee6a4SAndroid Build Coastguard Worker } else {
360*bb4ee6a4SAndroid Build Coastguard Worker None
361*bb4ee6a4SAndroid Build Coastguard Worker };
362*bb4ee6a4SAndroid Build Coastguard Worker
363*bb4ee6a4SAndroid Build Coastguard Worker let wake_list = if wake_next || waiting_for == WaitingFor::None {
364*bb4ee6a4SAndroid Build Coastguard Worker // Either the waiter was already woken or it's been removed from the condvar's waiter
365*bb4ee6a4SAndroid Build Coastguard Worker // list and is going to be woken. Either way, we need to wake up another thread.
366*bb4ee6a4SAndroid Build Coastguard Worker get_wake_list(waiters)
367*bb4ee6a4SAndroid Build Coastguard Worker } else {
368*bb4ee6a4SAndroid Build Coastguard Worker WaiterList::new(WaiterAdapter::new())
369*bb4ee6a4SAndroid Build Coastguard Worker };
370*bb4ee6a4SAndroid Build Coastguard Worker
371*bb4ee6a4SAndroid Build Coastguard Worker let set_on_release = if waiters.is_empty() {
372*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
373*bb4ee6a4SAndroid Build Coastguard Worker // Clear the rwlock associated with this Condvar since there are no longer any waiters.
374*bb4ee6a4SAndroid Build Coastguard Worker // Safe because we the spin lock guarantees exclusive access.
375*bb4ee6a4SAndroid Build Coastguard Worker unsafe { *self.mu.get() = 0 };
376*bb4ee6a4SAndroid Build Coastguard Worker
377*bb4ee6a4SAndroid Build Coastguard Worker 0
378*bb4ee6a4SAndroid Build Coastguard Worker } else {
379*bb4ee6a4SAndroid Build Coastguard Worker HAS_WAITERS
380*bb4ee6a4SAndroid Build Coastguard Worker };
381*bb4ee6a4SAndroid Build Coastguard Worker
382*bb4ee6a4SAndroid Build Coastguard Worker self.state.store(set_on_release, Ordering::Release);
383*bb4ee6a4SAndroid Build Coastguard Worker
384*bb4ee6a4SAndroid Build Coastguard Worker // Now wake any waiters still left in the wake list.
385*bb4ee6a4SAndroid Build Coastguard Worker for w in wake_list {
386*bb4ee6a4SAndroid Build Coastguard Worker w.wake();
387*bb4ee6a4SAndroid Build Coastguard Worker }
388*bb4ee6a4SAndroid Build Coastguard Worker
389*bb4ee6a4SAndroid Build Coastguard Worker mem::drop(old_waiter);
390*bb4ee6a4SAndroid Build Coastguard Worker }
391*bb4ee6a4SAndroid Build Coastguard Worker }
392*bb4ee6a4SAndroid Build Coastguard Worker
// TODO(b/315998194): Add safety comment
// NOTE(review): the accesses to the `waiters` and `mu` cells visible in `add_waiter`,
// `notify_one`, `notify_all` and `cancel_waiter` all happen while the `SPINLOCK` bit of `state`
// is held. Moving a `Condvar` to another thread presumably also relies on `WaiterList` and the
// `Arc<Waiter>`s it holds being safe to send -- confirm before writing the real SAFETY comment.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Send for Condvar {}
// TODO(b/315998194): Add safety comment
// NOTE(review): shared access from several threads goes through the atomic `state` word, and the
// accesses to the `UnsafeCell` fields visible in `add_waiter`, `notify_one`, `notify_all` and
// `cancel_waiter` are serialized by the `SPINLOCK` bit. Whether that is the complete argument
// depends on code outside this impl -- confirm before writing the real SAFETY comment.
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe impl Sync for Condvar {}
399*bb4ee6a4SAndroid Build Coastguard Worker
400*bb4ee6a4SAndroid Build Coastguard Worker impl Default for Condvar {
default() -> Self401*bb4ee6a4SAndroid Build Coastguard Worker fn default() -> Self {
402*bb4ee6a4SAndroid Build Coastguard Worker Self::new()
403*bb4ee6a4SAndroid Build Coastguard Worker }
404*bb4ee6a4SAndroid Build Coastguard Worker }
405*bb4ee6a4SAndroid Build Coastguard Worker
406*bb4ee6a4SAndroid Build Coastguard Worker // Scan `waiters` and return all waiters that should be woken up.
407*bb4ee6a4SAndroid Build Coastguard Worker //
408*bb4ee6a4SAndroid Build Coastguard Worker // If the first waiter is trying to acquire a shared lock, then all waiters in the list that are
409*bb4ee6a4SAndroid Build Coastguard Worker // waiting for a shared lock are also woken up. In addition one writer is woken up, if possible.
410*bb4ee6a4SAndroid Build Coastguard Worker //
411*bb4ee6a4SAndroid Build Coastguard Worker // If the first waiter is trying to acquire an exclusive lock, then only that waiter is returned and
412*bb4ee6a4SAndroid Build Coastguard Worker // the rest of the list is not scanned.
get_wake_list(waiters: &mut WaiterList) -> WaiterList413*bb4ee6a4SAndroid Build Coastguard Worker fn get_wake_list(waiters: &mut WaiterList) -> WaiterList {
414*bb4ee6a4SAndroid Build Coastguard Worker let mut to_wake = WaiterList::new(WaiterAdapter::new());
415*bb4ee6a4SAndroid Build Coastguard Worker let mut cursor = waiters.front_mut();
416*bb4ee6a4SAndroid Build Coastguard Worker
417*bb4ee6a4SAndroid Build Coastguard Worker let mut waking_readers = false;
418*bb4ee6a4SAndroid Build Coastguard Worker let mut all_readers = true;
419*bb4ee6a4SAndroid Build Coastguard Worker while let Some(w) = cursor.get() {
420*bb4ee6a4SAndroid Build Coastguard Worker match w.kind() {
421*bb4ee6a4SAndroid Build Coastguard Worker WaiterKind::Exclusive if !waking_readers => {
422*bb4ee6a4SAndroid Build Coastguard Worker // This is the first waiter and it's a writer. No need to check the other waiters.
423*bb4ee6a4SAndroid Build Coastguard Worker // Also mark the waiter as having been removed from the Condvar's waiter list.
424*bb4ee6a4SAndroid Build Coastguard Worker let waiter = cursor.remove().unwrap();
425*bb4ee6a4SAndroid Build Coastguard Worker waiter.set_waiting_for(WaitingFor::None);
426*bb4ee6a4SAndroid Build Coastguard Worker to_wake.push_back(waiter);
427*bb4ee6a4SAndroid Build Coastguard Worker break;
428*bb4ee6a4SAndroid Build Coastguard Worker }
429*bb4ee6a4SAndroid Build Coastguard Worker
430*bb4ee6a4SAndroid Build Coastguard Worker WaiterKind::Shared => {
431*bb4ee6a4SAndroid Build Coastguard Worker // This is a reader and the first waiter in the list was not a writer so wake up all
432*bb4ee6a4SAndroid Build Coastguard Worker // the readers in the wait list.
433*bb4ee6a4SAndroid Build Coastguard Worker let waiter = cursor.remove().unwrap();
434*bb4ee6a4SAndroid Build Coastguard Worker waiter.set_waiting_for(WaitingFor::None);
435*bb4ee6a4SAndroid Build Coastguard Worker to_wake.push_back(waiter);
436*bb4ee6a4SAndroid Build Coastguard Worker waking_readers = true;
437*bb4ee6a4SAndroid Build Coastguard Worker }
438*bb4ee6a4SAndroid Build Coastguard Worker
439*bb4ee6a4SAndroid Build Coastguard Worker WaiterKind::Exclusive => {
440*bb4ee6a4SAndroid Build Coastguard Worker debug_assert!(waking_readers);
441*bb4ee6a4SAndroid Build Coastguard Worker if all_readers {
442*bb4ee6a4SAndroid Build Coastguard Worker // We are waking readers but we need to ensure that at least one writer is woken
443*bb4ee6a4SAndroid Build Coastguard Worker // up. Since we haven't yet woken up a writer, wake up this one.
444*bb4ee6a4SAndroid Build Coastguard Worker let waiter = cursor.remove().unwrap();
445*bb4ee6a4SAndroid Build Coastguard Worker waiter.set_waiting_for(WaitingFor::None);
446*bb4ee6a4SAndroid Build Coastguard Worker to_wake.push_back(waiter);
447*bb4ee6a4SAndroid Build Coastguard Worker all_readers = false;
448*bb4ee6a4SAndroid Build Coastguard Worker } else {
449*bb4ee6a4SAndroid Build Coastguard Worker // We are waking readers and have already woken one writer. Skip this one.
450*bb4ee6a4SAndroid Build Coastguard Worker cursor.move_next();
451*bb4ee6a4SAndroid Build Coastguard Worker }
452*bb4ee6a4SAndroid Build Coastguard Worker }
453*bb4ee6a4SAndroid Build Coastguard Worker }
454*bb4ee6a4SAndroid Build Coastguard Worker }
455*bb4ee6a4SAndroid Build Coastguard Worker
456*bb4ee6a4SAndroid Build Coastguard Worker to_wake
457*bb4ee6a4SAndroid Build Coastguard Worker }
458*bb4ee6a4SAndroid Build Coastguard Worker
cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool)459*bb4ee6a4SAndroid Build Coastguard Worker fn cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool) {
460*bb4ee6a4SAndroid Build Coastguard Worker let condvar = cv as *const Condvar;
461*bb4ee6a4SAndroid Build Coastguard Worker
462*bb4ee6a4SAndroid Build Coastguard Worker // SAFETY:
463*bb4ee6a4SAndroid Build Coastguard Worker // Safe because the thread that owns the waiter being canceled must also own a reference to the
464*bb4ee6a4SAndroid Build Coastguard Worker // Condvar, which guarantees that this pointer is valid.
465*bb4ee6a4SAndroid Build Coastguard Worker unsafe { (*condvar).cancel_waiter(waiter, wake_next) }
466*bb4ee6a4SAndroid Build Coastguard Worker }
467*bb4ee6a4SAndroid Build Coastguard Worker
468*bb4ee6a4SAndroid Build Coastguard Worker // TODO(b/194338842): Fix tests for windows
469*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
470*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)]
471*bb4ee6a4SAndroid Build Coastguard Worker mod test {
472*bb4ee6a4SAndroid Build Coastguard Worker use std::future::Future;
473*bb4ee6a4SAndroid Build Coastguard Worker use std::mem;
474*bb4ee6a4SAndroid Build Coastguard Worker use std::ptr;
475*bb4ee6a4SAndroid Build Coastguard Worker use std::rc::Rc;
476*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc::channel;
477*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::mpsc::Sender;
478*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
479*bb4ee6a4SAndroid Build Coastguard Worker use std::task::Context;
480*bb4ee6a4SAndroid Build Coastguard Worker use std::task::Poll;
481*bb4ee6a4SAndroid Build Coastguard Worker use std::thread;
482*bb4ee6a4SAndroid Build Coastguard Worker use std::thread::JoinHandle;
483*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
484*bb4ee6a4SAndroid Build Coastguard Worker
485*bb4ee6a4SAndroid Build Coastguard Worker use futures::channel::oneshot;
486*bb4ee6a4SAndroid Build Coastguard Worker use futures::select;
487*bb4ee6a4SAndroid Build Coastguard Worker use futures::task::waker_ref;
488*bb4ee6a4SAndroid Build Coastguard Worker use futures::task::ArcWake;
489*bb4ee6a4SAndroid Build Coastguard Worker use futures::FutureExt;
490*bb4ee6a4SAndroid Build Coastguard Worker use futures_executor::LocalPool;
491*bb4ee6a4SAndroid Build Coastguard Worker use futures_executor::LocalSpawner;
492*bb4ee6a4SAndroid Build Coastguard Worker use futures_executor::ThreadPool;
493*bb4ee6a4SAndroid Build Coastguard Worker use futures_util::task::LocalSpawnExt;
494*bb4ee6a4SAndroid Build Coastguard Worker
495*bb4ee6a4SAndroid Build Coastguard Worker use super::super::super::block_on;
496*bb4ee6a4SAndroid Build Coastguard Worker use super::super::super::sync::RwLock;
497*bb4ee6a4SAndroid Build Coastguard Worker use super::*;
498*bb4ee6a4SAndroid Build Coastguard Worker
499*bb4ee6a4SAndroid Build Coastguard Worker // Dummy waker used when we want to manually drive futures.
500*bb4ee6a4SAndroid Build Coastguard Worker struct TestWaker;
501*bb4ee6a4SAndroid Build Coastguard Worker impl ArcWake for TestWaker {
wake_by_ref(_arc_self: &Arc<Self>)502*bb4ee6a4SAndroid Build Coastguard Worker fn wake_by_ref(_arc_self: &Arc<Self>) {}
503*bb4ee6a4SAndroid Build Coastguard Worker }
504*bb4ee6a4SAndroid Build Coastguard Worker
// Both notification flavours can be called on a freshly created Condvar that this test never
// waits on.
#[test]
fn smoke() {
    let condvar = Condvar::new();

    condvar.notify_one();
    condvar.notify_all();
}
511*bb4ee6a4SAndroid Build Coastguard Worker
// A thread blocked in `wait` is released by a `notify_one` issued from another thread.
#[test]
fn notify_one() {
    let mu = Arc::new(RwLock::new(()));
    let cv = Arc::new(Condvar::new());

    // The lock is taken before the notifier thread is started, so the notifier has to get the
    // lock after this thread in order to send its notification.
    let held = block_on(mu.lock());

    let notifier_mu = Arc::clone(&mu);
    let notifier_cv = Arc::clone(&cv);
    thread::spawn(move || {
        let _held = block_on(notifier_mu.lock());
        notifier_cv.notify_one();
    });

    let reacquired = block_on(cv.wait(held));
    mem::drop(reacquired);
}
529*bb4ee6a4SAndroid Build Coastguard Worker
// The same Condvar is used first with one RwLock and then, once all of the first lock's waiters
// have finished, with a second, unrelated RwLock.
#[test]
fn multi_rwlock() {
    const NUM_THREADS: usize = 5;

    let mu = Arc::new(RwLock::new(false));
    let cv = Arc::new(Condvar::new());

    // Each thread waits on `cv` until the flag guarded by `mu` is set.
    let threads: Vec<_> = (0..NUM_THREADS)
        .map(|_| {
            let mu = Arc::clone(&mu);
            let cv = Arc::clone(&cv);

            thread::spawn(move || {
                let mut ready = block_on(mu.lock());
                while !*ready {
                    ready = block_on(cv.wait(ready));
                }
            })
        })
        .collect();

    // Set the flag, release the lock, and only then notify.
    {
        let mut ready = block_on(mu.lock());
        *ready = true;
    }
    cv.notify_all();

    threads
        .into_iter()
        .try_for_each(JoinHandle::join)
        .expect("Failed to join threads");

    // Now use the Condvar with a different rwlock.
    let alt_mu = Arc::new(RwLock::new(None));
    let alt_waiter = {
        let alt_mu = Arc::clone(&alt_mu);
        let cv = Arc::clone(&cv);

        thread::spawn(move || {
            let mut value = block_on(alt_mu.lock());
            while value.is_none() {
                value = block_on(cv.wait(value));
            }
        })
    };

    {
        let mut value = block_on(alt_mu.lock());
        *value = Some(());
    }
    cv.notify_all();

    alt_waiter
        .join()
        .expect("Failed to join thread alternate rwlock");
}
580*bb4ee6a4SAndroid Build Coastguard Worker
// `notify_one` wakes a waiting task when both the waiter and the notifier are tasks on one
// single-threaded `LocalPool`.
#[test]
fn notify_one_single_thread_async() {
    // Takes the lock and sends a single notification while holding it.
    async fn notify(mu: Rc<RwLock<()>>, cv: Rc<Condvar>) {
        let _held = mu.lock().await;
        cv.notify_one();
    }

    // Takes the lock, spawns the notifier, then waits on the Condvar.
    async fn wait(mu: Rc<RwLock<()>>, cv: Rc<Condvar>, spawner: LocalSpawner) {
        let held = mu.lock().await;

        // Has to be spawned _after_ acquiring the lock to prevent a race
        // where the notify happens before the waiter has acquired the lock.
        let notifier = notify(Rc::clone(&mu), Rc::clone(&cv));
        spawner
            .spawn_local(notifier)
            .expect("Failed to spawn `notify` task");

        let _held = cv.wait(held).await;
    }

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let waiter = wait(
        Rc::new(RwLock::new(())),
        Rc::new(Condvar::new()),
        spawner.clone(),
    );
    spawner
        .spawn_local(waiter)
        .expect("Failed to spawn `wait` task");

    pool.run();
}
613*bb4ee6a4SAndroid Build Coastguard Worker
// `notify_one` wakes a waiting task when the waiter and the notifier are tasks on a `ThreadPool`.
// Completion is reported back to the test thread over a channel, with a five second limit.
#[test]
fn notify_one_multi_thread_async() {
    // Takes the lock and sends a single notification while holding it.
    async fn notify(mu: Arc<RwLock<()>>, cv: Arc<Condvar>) {
        let _held = mu.lock().await;
        cv.notify_one();
    }

    // Takes the lock, spawns the notifier, waits on the Condvar and finally reports completion.
    async fn wait(mu: Arc<RwLock<()>>, cv: Arc<Condvar>, tx: Sender<()>, pool: ThreadPool) {
        let held = mu.lock().await;

        // Has to be spawned _after_ acquiring the lock to prevent a race
        // where the notify happens before the waiter has acquired the lock.
        pool.spawn_ok(notify(Arc::clone(&mu), Arc::clone(&cv)));

        let _held = cv.wait(held).await;

        tx.send(()).expect("Failed to send completion notification");
    }

    let pool = ThreadPool::new().expect("Failed to create ThreadPool");
    let (tx, rx) = channel();

    let mu = Arc::new(RwLock::new(()));
    let cv = Arc::new(Condvar::new());
    pool.spawn_ok(wait(mu, cv, tx, pool.clone()));

    rx.recv_timeout(Duration::from_secs(5))
        .expect("Failed to receive completion notification");
}
645*bb4ee6a4SAndroid Build Coastguard Worker
// Stress test for `notify_one` combined with cancellation.
//
// Decrementing and observing tasks each race a wait on two independent (lock, Condvar) pairs via
// `select!`; whichever wait loses is dropped part-way through. Two incrementing tasks feed the
// two counters. When every task has reported completion, the counters must add up to the number
// of increments minus the number of decrements, and both Condvars must be back at state zero.
#[test]
fn notify_one_with_cancel() {
    const TASKS: usize = 17;
    const OBSERVERS: usize = 7;
    const ITERATIONS: usize = 103;

    // Waits under a shared lock until the counter is non-zero, then reads it once.
    async fn observe(mu: &Arc<RwLock<usize>>, cv: &Arc<Condvar>) {
        let mut count = mu.read_lock().await;
        loop {
            if *count != 0 {
                break;
            }
            count = cv.wait_read(count).await;
        }
        // SAFETY: Safe because the pointer comes from a live reference to the `usize` behind the
        // read guard, so it is valid and properly aligned.
        let _ = unsafe { ptr::read_volatile(&*count as *const usize) };
    }

    // Waits under an exclusive lock until the counter is non-zero, then takes one off it.
    async fn decrement(mu: &Arc<RwLock<usize>>, cv: &Arc<Condvar>) {
        let mut count = mu.lock().await;
        loop {
            if *count != 0 {
                break;
            }
            count = cv.wait(count).await;
        }
        *count -= 1;
    }

    // Bumps the counter `TASKS * OBSERVERS * ITERATIONS` times, notifying one waiter after each
    // bump, then reports completion.
    async fn increment(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>, done: Sender<()>) {
        let rounds = TASKS * OBSERVERS * ITERATIONS;
        for _ in 0..rounds {
            {
                let mut count = mu.lock().await;
                *count += 1;
            }
            cv.notify_one();
        }

        done.send(()).expect("Failed to send completion message");
    }

    // Runs `ITERATIONS` rounds of racing `observe` on the two pairs, then reports completion.
    async fn observe_either(
        mu: Arc<RwLock<usize>>,
        cv: Arc<Condvar>,
        alt_mu: Arc<RwLock<usize>>,
        alt_cv: Arc<Condvar>,
        done: Sender<()>,
    ) {
        for _ in 0..ITERATIONS {
            select! {
                () = observe(&mu, &cv).fuse() => {},
                () = observe(&alt_mu, &alt_cv).fuse() => {},
            }
        }

        done.send(()).expect("Failed to send completion message");
    }

    // Runs `ITERATIONS` rounds of racing `decrement` on the two pairs, then reports completion.
    async fn decrement_either(
        mu: Arc<RwLock<usize>>,
        cv: Arc<Condvar>,
        alt_mu: Arc<RwLock<usize>>,
        alt_cv: Arc<Condvar>,
        done: Sender<()>,
    ) {
        for _ in 0..ITERATIONS {
            select! {
                () = decrement(&mu, &cv).fuse() => {},
                () = decrement(&alt_mu, &alt_cv).fuse() => {},
            }
        }

        done.send(()).expect("Failed to send completion message");
    }

    let pool = ThreadPool::new().expect("Failed to create ThreadPool");

    let mu = Arc::new(RwLock::new(0usize));
    let cv = Arc::new(Condvar::new());
    let alt_mu = Arc::new(RwLock::new(0usize));
    let alt_cv = Arc::new(Condvar::new());

    let (tx, rx) = channel();

    for _ in 0..TASKS {
        let task = decrement_either(
            Arc::clone(&mu),
            Arc::clone(&cv),
            Arc::clone(&alt_mu),
            Arc::clone(&alt_cv),
            tx.clone(),
        );
        pool.spawn_ok(task);
    }

    for _ in 0..OBSERVERS {
        let task = observe_either(
            Arc::clone(&mu),
            Arc::clone(&cv),
            Arc::clone(&alt_mu),
            Arc::clone(&alt_cv),
            tx.clone(),
        );
        pool.spawn_ok(task);
    }

    pool.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&cv), tx.clone()));
    pool.spawn_ok(increment(Arc::clone(&alt_mu), Arc::clone(&alt_cv), tx));

    // One completion message per decrementing task, per observing task, and per incrementer.
    let expected_messages = TASKS + OBSERVERS + 2;
    for _ in 0..expected_messages {
        rx.recv_timeout(Duration::from_secs(20))
            .unwrap_or_else(|e| panic!("Error while waiting for threads to complete: {}", e));
    }

    assert_eq!(
        *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
        (TASKS * OBSERVERS * ITERATIONS * 2) - (TASKS * ITERATIONS)
    );
    assert_eq!(cv.state.load(Ordering::Relaxed), 0);
    assert_eq!(alt_cv.state.load(Ordering::Relaxed), 0);
}
757*bb4ee6a4SAndroid Build Coastguard Worker
// Stress test for `notify_all` combined with cancellation.
//
// Decrementing tasks race a wait on two independent (lock, Condvar) pairs via `select!`;
// whichever wait loses is dropped part-way through. Two incrementing tasks feed the two counters
// and call `notify_all` after every increment. When every task has reported completion, the
// counters must add up to the number of increments minus the number of decrements, and both
// Condvars must be back at state zero.
#[test]
fn notify_all_with_cancel() {
    const TASKS: usize = 17;
    const ITERATIONS: usize = 103;

    // Waits under an exclusive lock until the counter is non-zero, then takes one off it.
    async fn decrement(mu: &Arc<RwLock<usize>>, cv: &Arc<Condvar>) {
        let mut count = mu.lock().await;
        loop {
            if *count != 0 {
                break;
            }
            count = cv.wait(count).await;
        }
        *count -= 1;
    }

    // Bumps the counter `TASKS * ITERATIONS` times, notifying all waiters after each bump, then
    // reports completion.
    async fn increment(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>, done: Sender<()>) {
        let rounds = TASKS * ITERATIONS;
        for _ in 0..rounds {
            {
                let mut count = mu.lock().await;
                *count += 1;
            }
            cv.notify_all();
        }

        done.send(()).expect("Failed to send completion message");
    }

    // Runs `ITERATIONS` rounds of racing `decrement` on the two pairs, then reports completion.
    async fn decrement_either(
        mu: Arc<RwLock<usize>>,
        cv: Arc<Condvar>,
        alt_mu: Arc<RwLock<usize>>,
        alt_cv: Arc<Condvar>,
        done: Sender<()>,
    ) {
        for _ in 0..ITERATIONS {
            select! {
                () = decrement(&mu, &cv).fuse() => {},
                () = decrement(&alt_mu, &alt_cv).fuse() => {},
            }
        }

        done.send(()).expect("Failed to send completion message");
    }

    let pool = ThreadPool::new().expect("Failed to create ThreadPool");

    let mu = Arc::new(RwLock::new(0usize));
    let cv = Arc::new(Condvar::new());
    let alt_mu = Arc::new(RwLock::new(0usize));
    let alt_cv = Arc::new(Condvar::new());

    let (tx, rx) = channel();

    for _ in 0..TASKS {
        let task = decrement_either(
            Arc::clone(&mu),
            Arc::clone(&cv),
            Arc::clone(&alt_mu),
            Arc::clone(&alt_cv),
            tx.clone(),
        );
        pool.spawn_ok(task);
    }

    pool.spawn_ok(increment(Arc::clone(&mu), Arc::clone(&cv), tx.clone()));
    pool.spawn_ok(increment(Arc::clone(&alt_mu), Arc::clone(&alt_cv), tx));

    // One completion message per decrementing task plus one per incrementer.
    let expected_messages = TASKS + 2;
    for _ in 0..expected_messages {
        rx.recv_timeout(Duration::from_secs(10))
            .unwrap_or_else(|e| panic!("Error while waiting for threads to complete: {}", e));
    }

    assert_eq!(
        *block_on(mu.read_lock()) + *block_on(alt_mu.read_lock()),
        TASKS * ITERATIONS
    );
    assert_eq!(cv.state.load(Ordering::Relaxed), 0);
    assert_eq!(alt_cv.state.load(Ordering::Relaxed), 0);
}
// `notify_all` releases every thread blocked in `wait`.
//
// Each thread bumps a shared counter and then waits until the counter is zero again. The thread
// that brings the counter up to `THREADS` signals the test thread, which then zeroes the counter
// and notifies everyone.
#[test]
fn notify_all() {
    const THREADS: usize = 13;

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());
    let (tx, rx) = channel();

    let threads: Vec<_> = (0..THREADS)
        .map(|_| {
            let mu = Arc::clone(&mu);
            let cv = Arc::clone(&cv);
            let tx = tx.clone();

            thread::spawn(move || {
                let mut count = block_on(mu.lock());
                *count += 1;
                if *count == THREADS {
                    tx.send(()).unwrap();
                }

                while *count != 0 {
                    count = block_on(cv.wait(count));
                }
            })
        })
        .collect();

    // Only the clones owned by the worker threads remain after this.
    mem::drop(tx);

    // Wait till all threads have started.
    rx.recv_timeout(Duration::from_secs(5)).unwrap();

    let mut count = block_on(mu.lock());
    *count = 0;
    mem::drop(count);
    cv.notify_all();

    threads.into_iter().for_each(|t| t.join().unwrap());
}
873*bb4ee6a4SAndroid Build Coastguard Worker
// `notify_all` releases every waiting task when all tasks run on one single-threaded `LocalPool`.
//
// Each watcher bumps a shared counter and then waits until the counter is zero again. The watcher
// that brings the counter up to `TASKS` spawns the task that zeroes it and notifies everyone.
#[test]
fn notify_all_single_thread_async() {
    const TASKS: usize = 13;

    // Zeroes the counter and notifies all waiters while still holding the lock.
    async fn reset(mu: Rc<RwLock<usize>>, cv: Rc<Condvar>) {
        let mut count = mu.lock().await;
        *count = 0;
        cv.notify_all();
    }

    // Bumps the counter, spawns `reset` if this was the last watcher to arrive, then waits for the
    // counter to drop back to zero.
    async fn watcher(mu: Rc<RwLock<usize>>, cv: Rc<Condvar>, spawner: LocalSpawner) {
        let mut count = mu.lock().await;
        *count += 1;

        let last_to_arrive = *count == TASKS;
        if last_to_arrive {
            let resetter = reset(Rc::clone(&mu), Rc::clone(&cv));
            spawner
                .spawn_local(resetter)
                .expect("Failed to spawn reset task");
        }

        loop {
            if *count == 0 {
                break;
            }
            count = cv.wait(count).await;
        }
    }

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let mu = Rc::new(RwLock::new(0));
    let cv = Rc::new(Condvar::new());

    for _ in 0..TASKS {
        let task = watcher(Rc::clone(&mu), Rc::clone(&cv), spawner.clone());
        spawner
            .spawn_local(task)
            .expect("Failed to spawn watcher task");
    }

    pool.run();
}
912*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn notify_all_multi_thread_async() {
    const TASKS: usize = 13;

    // Sets the shared counter back to zero and calls `notify_all` while still holding the lock.
    async fn reset(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;
        *guard = 0;
        cv.notify_all();
    }

    // Increments the counter, spawns `reset` on the pool once the counter reaches `TASKS`, waits
    // on the condvar until the counter reads zero, and finally reports completion through `tx`.
    async fn watcher(
        mu: Arc<RwLock<usize>>,
        cv: Arc<Condvar>,
        pool: ThreadPool,
        tx: Sender<()>,
    ) {
        let mut guard = mu.lock().await;
        *guard += 1;
        if *guard == TASKS {
            let reset_task = reset(Arc::clone(&mu), Arc::clone(&cv));
            pool.spawn_ok(reset_task);
        }

        loop {
            if *guard == 0 {
                break;
            }
            guard = cv.wait(guard).await;
        }

        tx.send(()).expect("Failed to send completion notification");
    }

    let pool = ThreadPool::new().expect("Failed to create ThreadPool");

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let (tx, rx) = channel();
    (0..TASKS).for_each(|_| {
        let task = watcher(Arc::clone(&mu), Arc::clone(&cv), pool.clone(), tx.clone());
        pool.spawn_ok(task);
    });

    // Each watcher sends exactly one message; bound every receive so a lost wakeup fails the test
    // instead of hanging it.
    let per_task_timeout = Duration::from_secs(5);
    (0..TASKS).for_each(|_| {
        rx.recv_timeout(per_task_timeout)
            .expect("Failed to receive completion notification");
    });
}
957*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn wake_all_readers() {
    // Takes a read lock and waits on the condvar until the flag becomes true.
    async fn read(mu: Arc<RwLock<bool>>, cv: Arc<Condvar>) {
        let mut flag = mu.read_lock().await;
        loop {
            if *flag {
                break;
            }
            flag = cv.wait_read(flag).await;
        }
    }

    let mu = Arc::new(RwLock::new(false));
    let cv = Arc::new(Condvar::new());
    let mut readers: Vec<_> = (0..4)
        .map(|_| Box::pin(read(Arc::clone(&mu), Arc::clone(&cv))))
        .collect();

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    // Park every reader on the Condvar first.
    for reader in readers.iter_mut() {
        assert!(
            reader.as_mut().poll(&mut cx).is_pending(),
            "reader unexpectedly ready"
        );
    }

    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state & HAS_WAITERS, HAS_WAITERS);

    // Flip the condition and notify the condvar. Only notify_one is called, but every reader is
    // expected to be woken up.
    {
        let mut flag = block_on(mu.lock());
        *flag = true;
    }
    cv.notify_one();

    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state, 0);

    // Every reader should now run to completion.
    for reader in readers.iter_mut() {
        assert!(
            reader.as_mut().poll(&mut cx).is_ready(),
            "reader unable to complete"
        );
    }
}
1003*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn cancel_before_notify() {
    // Waits on the condvar until the counter is non-zero, then decrements it by one.
    async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;

        loop {
            if *guard != 0 {
                break;
            }
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut fut1 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));
    let mut fut2 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));

    // Both futures find the counter at zero and end up waiting on the condvar.
    assert!(
        fut1.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    assert!(
        fut2.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state & HAS_WAITERS, HAS_WAITERS);

    {
        let mut count = block_on(mu.lock());
        *count = 2;
    }
    // Cancel fut1 before the cv is notified.
    mem::drop(fut1);
    cv.notify_one();

    // The notification should have reached fut2, leaving no waiters behind.
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state, 0);

    assert!(
        fut2.as_mut().poll(&mut cx).is_ready(),
        "future unable to complete"
    );

    let remaining = *block_on(mu.lock());
    assert_eq!(remaining, 1);
}
1048*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn cancel_after_notify_one() {
    // Waits on the condvar until the counter is non-zero, then decrements it by one.
    async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;

        loop {
            if *guard != 0 {
                break;
            }
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut fut1 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));
    let mut fut2 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));

    // Both futures find the counter at zero and end up waiting on the condvar.
    assert!(
        fut1.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    assert!(
        fut2.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state & HAS_WAITERS, HAS_WAITERS);

    {
        let mut count = block_on(mu.lock());
        *count = 2;
    }
    cv.notify_one();

    // fut1 should have received the notification. Cancel it without polling it again; the
    // wakeup is then expected to be handed on to fut2.
    mem::drop(fut1);
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state, 0);

    assert!(
        fut2.as_mut().poll(&mut cx).is_ready(),
        "future unable to complete"
    );

    let remaining = *block_on(mu.lock());
    assert_eq!(remaining, 1);
}
1092*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn cancel_after_notify_all() {
    // Waits on the condvar until the counter is non-zero, then decrements it by one.
    async fn dec(mu: Arc<RwLock<usize>>, cv: Arc<Condvar>) {
        let mut guard = mu.lock().await;

        loop {
            if *guard != 0 {
                break;
            }
            guard = cv.wait(guard).await;
        }

        *guard -= 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let mut fut1 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));
    let mut fut2 = Box::pin(dec(Arc::clone(&mu), Arc::clone(&cv)));

    // Both futures find the counter at zero and end up waiting on the condvar.
    assert!(
        fut1.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    assert!(
        fut2.as_mut().poll(&mut cx).is_pending(),
        "future unexpectedly ready"
    );
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state & HAS_WAITERS, HAS_WAITERS);

    {
        let mut count = block_on(mu.lock());
        *count = 2;

        // Notify the cv with the lock still held. Both waiters should be woken up.
        cv.notify_all();
        assert_eq!(cv.state.load(Ordering::Relaxed), 0);
        // The guard goes out of scope here, releasing the lock before fut1 is cancelled.
    }

    mem::drop(fut1);

    assert!(
        fut2.as_mut().poll(&mut cx).is_ready(),
        "future unable to complete"
    );

    let remaining = *block_on(mu.lock());
    assert_eq!(remaining, 1);
}
1140*bb4ee6a4SAndroid Build Coastguard Worker
#[test]
fn timed_wait() {
    // Waits on the condvar until the counter is non-zero and then increments it. If `timeout`
    // resolves first, the function returns early and leaves the counter untouched.
    async fn wait_deadline(
        mu: Arc<RwLock<usize>>,
        cv: Arc<Condvar>,
        timeout: oneshot::Receiver<()>,
    ) {
        let mut guard = mu.lock().await;

        if *guard == 0 {
            let mut deadline = timeout.fuse();

            while *guard == 0 {
                select! {
                    res = deadline => {
                        match res {
                            Ok(()) => return,
                            Err(e) => panic!("Error while receiving timeout notification: {}", e),
                        }
                    },
                    reacquired = cv.wait(guard).fuse() => guard = reacquired,
                }
            }
        }

        *guard += 1;
    }

    let mu = Arc::new(RwLock::new(0));
    let cv = Arc::new(Condvar::new());

    let arc_waker = Arc::new(TestWaker);
    let waker = waker_ref(&arc_waker);
    let mut cx = Context::from_waker(&waker);

    let (tx, rx) = oneshot::channel();
    let mut wait = Box::pin(wait_deadline(Arc::clone(&mu), Arc::clone(&cv), rx));

    // Neither the channel nor the condvar has been signalled yet, so the first poll must park.
    assert!(
        wait.as_mut().poll(&mut cx).is_pending(),
        "wait_deadline unexpectedly ready"
    );

    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state, HAS_WAITERS);

    // Signal the channel, which should cancel the wait.
    tx.send(()).expect("Failed to send wakeup");

    // The next poll should see the signalled channel and finish the future.
    assert!(
        wait.as_mut().poll(&mut cx).is_ready(),
        "wait_deadline unable to complete in time"
    );

    // The cancelled wait must leave no waiter behind, and the counter must be unchanged.
    let state = cv.state.load(Ordering::Relaxed);
    assert_eq!(state, 0);
    let count = *block_on(mu.lock());
    assert_eq!(count, 0);
}
1197*bb4ee6a4SAndroid Build Coastguard Worker }
1198