1 use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
2 use crate::primitive::sync::{Arc, Condvar, Mutex};
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::time::{Duration, Instant};
6 
7 /// A thread parking primitive.
8 ///
9 /// Conceptually, each `Parker` has an associated token which is initially not present:
10 ///
11 /// * The [`park`] method blocks the current thread unless or until the token is available, at
12 ///   which point it automatically consumes the token.
13 ///
14 /// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
15 ///   a specified maximum time.
16 ///
17 /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
18 ///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
19 ///   returning immediately.
20 ///
21 /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
22 /// [`park`] and [`unpark`].
23 ///
24 /// # Examples
25 ///
26 /// ```
27 /// use std::thread;
28 /// use std::time::Duration;
29 /// use crossbeam_utils::sync::Parker;
30 ///
31 /// let p = Parker::new();
32 /// let u = p.unparker().clone();
33 ///
34 /// // Make the token available.
35 /// u.unpark();
36 /// // Wakes up immediately and consumes the token.
37 /// p.park();
38 ///
39 /// thread::spawn(move || {
40 ///     thread::sleep(Duration::from_millis(500));
41 ///     u.unpark();
42 /// });
43 ///
44 /// // Wakes up when `u.unpark()` provides the token.
45 /// p.park();
46 /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
47 /// ```
48 ///
49 /// [`park`]: Parker::park
50 /// [`park_timeout`]: Parker::park_timeout
51 /// [`park_deadline`]: Parker::park_deadline
52 /// [`unpark`]: Unparker::unpark
53 pub struct Parker {
54     unparker: Unparker,
55     _marker: PhantomData<*const ()>,
56 }
57 
58 unsafe impl Send for Parker {}
59 
60 impl Default for Parker {
default() -> Self61     fn default() -> Self {
62         Self {
63             unparker: Unparker {
64                 inner: Arc::new(Inner {
65                     state: AtomicUsize::new(EMPTY),
66                     lock: Mutex::new(()),
67                     cvar: Condvar::new(),
68                 }),
69             },
70             _marker: PhantomData,
71         }
72     }
73 }
74 
75 impl Parker {
76     /// Creates a new `Parker`.
77     ///
78     /// # Examples
79     ///
80     /// ```
81     /// use crossbeam_utils::sync::Parker;
82     ///
83     /// let p = Parker::new();
84     /// ```
85     ///
new() -> Parker86     pub fn new() -> Parker {
87         Self::default()
88     }
89 
90     /// Blocks the current thread until the token is made available.
91     ///
92     /// # Examples
93     ///
94     /// ```
95     /// use crossbeam_utils::sync::Parker;
96     ///
97     /// let p = Parker::new();
98     /// let u = p.unparker().clone();
99     ///
100     /// // Make the token available.
101     /// u.unpark();
102     ///
103     /// // Wakes up immediately and consumes the token.
104     /// p.park();
105     /// ```
park(&self)106     pub fn park(&self) {
107         self.unparker.inner.park(None);
108     }
109 
110     /// Blocks the current thread until the token is made available, but only for a limited time.
111     ///
112     /// # Examples
113     ///
114     /// ```
115     /// use std::time::Duration;
116     /// use crossbeam_utils::sync::Parker;
117     ///
118     /// let p = Parker::new();
119     ///
120     /// // Waits for the token to become available, but will not wait longer than 500 ms.
121     /// p.park_timeout(Duration::from_millis(500));
122     /// ```
park_timeout(&self, timeout: Duration)123     pub fn park_timeout(&self, timeout: Duration) {
124         match Instant::now().checked_add(timeout) {
125             Some(deadline) => self.park_deadline(deadline),
126             None => self.park(),
127         }
128     }
129 
130     /// Blocks the current thread until the token is made available, or until a certain deadline.
131     ///
132     /// # Examples
133     ///
134     /// ```
135     /// use std::time::{Duration, Instant};
136     /// use crossbeam_utils::sync::Parker;
137     ///
138     /// let p = Parker::new();
139     /// let deadline = Instant::now() + Duration::from_millis(500);
140     ///
141     /// // Waits for the token to become available, but will not wait longer than 500 ms.
142     /// p.park_deadline(deadline);
143     /// ```
park_deadline(&self, deadline: Instant)144     pub fn park_deadline(&self, deadline: Instant) {
145         self.unparker.inner.park(Some(deadline))
146     }
147 
148     /// Returns a reference to an associated [`Unparker`].
149     ///
150     /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
151     ///
152     /// # Examples
153     ///
154     /// ```
155     /// use crossbeam_utils::sync::Parker;
156     ///
157     /// let p = Parker::new();
158     /// let u = p.unparker().clone();
159     ///
160     /// // Make the token available.
161     /// u.unpark();
162     /// // Wakes up immediately and consumes the token.
163     /// p.park();
164     /// ```
165     ///
166     /// [`park`]: Parker::park
167     /// [`park_timeout`]: Parker::park_timeout
unparker(&self) -> &Unparker168     pub fn unparker(&self) -> &Unparker {
169         &self.unparker
170     }
171 
172     /// Converts a `Parker` into a raw pointer.
173     ///
174     /// # Examples
175     ///
176     /// ```
177     /// use crossbeam_utils::sync::Parker;
178     ///
179     /// let p = Parker::new();
180     /// let raw = Parker::into_raw(p);
181     /// # let _ = unsafe { Parker::from_raw(raw) };
182     /// ```
into_raw(this: Parker) -> *const ()183     pub fn into_raw(this: Parker) -> *const () {
184         Unparker::into_raw(this.unparker)
185     }
186 
187     /// Converts a raw pointer into a `Parker`.
188     ///
189     /// # Safety
190     ///
191     /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
192     ///
193     /// # Examples
194     ///
195     /// ```
196     /// use crossbeam_utils::sync::Parker;
197     ///
198     /// let p = Parker::new();
199     /// let raw = Parker::into_raw(p);
200     /// let p = unsafe { Parker::from_raw(raw) };
201     /// ```
from_raw(ptr: *const ()) -> Parker202     pub unsafe fn from_raw(ptr: *const ()) -> Parker {
203         Parker {
204             unparker: Unparker::from_raw(ptr),
205             _marker: PhantomData,
206         }
207     }
208 }
209 
210 impl fmt::Debug for Parker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         f.pad("Parker { .. }")
213     }
214 }
215 
216 /// Unparks a thread parked by the associated [`Parker`].
217 pub struct Unparker {
218     inner: Arc<Inner>,
219 }
220 
221 unsafe impl Send for Unparker {}
222 unsafe impl Sync for Unparker {}
223 
224 impl Unparker {
225     /// Atomically makes the token available if it is not already.
226     ///
227     /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
228     /// any.
229     ///
230     /// # Examples
231     ///
232     /// ```
233     /// use std::thread;
234     /// use std::time::Duration;
235     /// use crossbeam_utils::sync::Parker;
236     ///
237     /// let p = Parker::new();
238     /// let u = p.unparker().clone();
239     ///
240     /// thread::spawn(move || {
241     ///     thread::sleep(Duration::from_millis(500));
242     ///     u.unpark();
243     /// });
244     ///
245     /// // Wakes up when `u.unpark()` provides the token.
246     /// p.park();
247     /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
248     /// ```
249     ///
250     /// [`park`]: Parker::park
251     /// [`park_timeout`]: Parker::park_timeout
unpark(&self)252     pub fn unpark(&self) {
253         self.inner.unpark()
254     }
255 
256     /// Converts an `Unparker` into a raw pointer.
257     ///
258     /// # Examples
259     ///
260     /// ```
261     /// use crossbeam_utils::sync::{Parker, Unparker};
262     ///
263     /// let p = Parker::new();
264     /// let u = p.unparker().clone();
265     /// let raw = Unparker::into_raw(u);
266     /// # let _ = unsafe { Unparker::from_raw(raw) };
267     /// ```
into_raw(this: Unparker) -> *const ()268     pub fn into_raw(this: Unparker) -> *const () {
269         Arc::into_raw(this.inner).cast::<()>()
270     }
271 
272     /// Converts a raw pointer into an `Unparker`.
273     ///
274     /// # Safety
275     ///
276     /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// use crossbeam_utils::sync::{Parker, Unparker};
282     ///
283     /// let p = Parker::new();
284     /// let u = p.unparker().clone();
285     ///
286     /// let raw = Unparker::into_raw(u);
287     /// let u = unsafe { Unparker::from_raw(raw) };
288     /// ```
from_raw(ptr: *const ()) -> Unparker289     pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
290         Unparker {
291             inner: Arc::from_raw(ptr.cast::<Inner>()),
292         }
293     }
294 }
295 
296 impl fmt::Debug for Unparker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result297     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298         f.pad("Unparker { .. }")
299     }
300 }
301 
302 impl Clone for Unparker {
clone(&self) -> Unparker303     fn clone(&self) -> Unparker {
304         Unparker {
305             inner: self.inner.clone(),
306         }
307     }
308 }
309 
310 const EMPTY: usize = 0;
311 const PARKED: usize = 1;
312 const NOTIFIED: usize = 2;
313 
314 struct Inner {
315     state: AtomicUsize,
316     lock: Mutex<()>,
317     cvar: Condvar,
318 }
319 
320 impl Inner {
park(&self, deadline: Option<Instant>)321     fn park(&self, deadline: Option<Instant>) {
322         // If we were previously notified then we consume this notification and return quickly.
323         if self
324             .state
325             .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
326             .is_ok()
327         {
328             return;
329         }
330 
331         // If the timeout is zero, then there is no need to actually block.
332         if let Some(deadline) = deadline {
333             if deadline <= Instant::now() {
334                 return;
335             }
336         }
337 
338         // Otherwise we need to coordinate going to sleep.
339         let mut m = self.lock.lock().unwrap();
340 
341         match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
342             Ok(_) => {}
343             // Consume this notification to avoid spurious wakeups in the next park.
344             Err(NOTIFIED) => {
345                 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
346                 // because `unpark` may have been called again since we read `NOTIFIED` in the
347                 // `compare_exchange` above. We must perform an acquire operation that synchronizes
348                 // with that `unpark` to observe any writes it made before the call to `unpark`. To
349                 // do that we must read from the write it made to `state`.
350                 let old = self.state.swap(EMPTY, SeqCst);
351                 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
352                 return;
353             }
354             Err(n) => panic!("inconsistent park_timeout state: {}", n),
355         }
356 
357         loop {
358             // Block the current thread on the conditional variable.
359             m = match deadline {
360                 None => self.cvar.wait(m).unwrap(),
361                 Some(deadline) => {
362                     let now = Instant::now();
363                     if now < deadline {
364                         // We could check for a timeout here, in the return value of wait_timeout,
365                         // but in the case that a timeout and an unpark arrive simultaneously, we
366                         // prefer to report the former.
367                         self.cvar.wait_timeout(m, deadline - now).unwrap().0
368                     } else {
369                         // We've timed out; swap out the state back to empty on our way out
370                         match self.state.swap(EMPTY, SeqCst) {
371                             NOTIFIED | PARKED => return,
372                             n => panic!("inconsistent park_timeout state: {}", n),
373                         };
374                     }
375                 }
376             };
377 
378             if self
379                 .state
380                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
381                 .is_ok()
382             {
383                 // got a notification
384                 return;
385             }
386 
387             // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
388             // in the branch above, when we discover the deadline is in the past
389         }
390     }
391 
unpark(&self)392     pub(crate) fn unpark(&self) {
393         // To ensure the unparked thread will observe any writes we made before this call, we must
394         // perform a release operation that `park` can synchronize with. To do that we must write
395         // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
396         // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
397         match self.state.swap(NOTIFIED, SeqCst) {
398             EMPTY => return,    // no one was waiting
399             NOTIFIED => return, // already unparked
400             PARKED => {}        // gotta go wake someone up
401             _ => panic!("inconsistent state in unpark"),
402         }
403 
404         // There is a period between when the parked thread sets `state` to `PARKED` (or last
405         // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
406         // If we were to notify during this period it would be ignored and then when the parked
407         // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
408         // stage so we can acquire `lock` to wait until it is ready to receive the notification.
409         //
410         // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
411         // it doesn't get woken only to have to wait for us to release `lock`.
412         drop(self.lock.lock().unwrap());
413         self.cvar.notify_one();
414     }
415 }
416