use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use crate::primitive::sync::{Arc, Condvar, Mutex};
use std::fmt;
use std::marker::PhantomData;
use std::time::{Duration, Instant};

/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token.
///
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
///   a specified maximum time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token.
/// p.park();
/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`park_deadline`]: Parker::park_deadline
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    unparker: Unparker,
    // A raw-pointer marker suppresses the automatic `Send`/`Sync` impls; `Send` is restored
    // explicitly below, which leaves `Parker` without a `Sync` impl.
    _marker: PhantomData<*const ()>,
}

unsafe impl Send for Parker {}

impl Default for Parker {
    /// Creates a `Parker` whose token is initially absent.
    fn default() -> Self {
        Self {
            unparker: Unparker {
                inner: Arc::new(Inner {
                    state: AtomicUsize::new(EMPTY),
                    lock: Mutex::new(()),
                    cvar: Condvar::new(),
                }),
            },
            _marker: PhantomData,
        }
    }
}

impl Parker {
    /// Creates a new `Parker`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// ```
    pub fn new() -> Parker {
        Self::default()
    }

    /// Blocks the current thread until the token is made available.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    ///
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn park(&self) {
        self.unparker.inner.park(None);
    }

    /// Blocks the current thread until the token is made available, but only for a limited time.
    ///
    /// If adding `timeout` to the current instant overflows, this behaves like [`park`] and
    /// waits without a deadline.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_timeout(Duration::from_millis(500));
    /// ```
    ///
    /// [`park`]: Parker::park
    pub fn park_timeout(&self, timeout: Duration) {
        match Instant::now().checked_add(timeout) {
            Some(deadline) => self.park_deadline(deadline),
            None => self.park(),
        }
    }

    /// Blocks the current thread until the token is made available, or until a certain deadline.
    ///
    /// If the token is not available and `deadline` has already passed, this returns without
    /// blocking.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let deadline = Instant::now() + Duration::from_millis(500);
    ///
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
    /// p.park_deadline(deadline);
    /// ```
    pub fn park_deadline(&self, deadline: Instant) {
        self.unparker.inner.park(Some(deadline))
    }

    /// Returns a reference to an associated [`Unparker`].
    ///
    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// // Make the token available.
    /// u.unpark();
    /// // Wakes up immediately and consumes the token.
    /// p.park();
    /// ```
    pub fn unparker(&self) -> &Unparker {
        &self.unparker
    }

    /// Converts a `Parker` into a raw pointer.
    ///
    /// The pointer should eventually be passed back to [`Parker::from_raw`]; otherwise the
    /// shared state it refers to is never released.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// # let _ = unsafe { Parker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Parker) -> *const () {
        Unparker::into_raw(this.unparker)
    }

    /// Converts a raw pointer into a `Parker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. Each such
    /// pointer must be converted back at most once.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let raw = Parker::into_raw(p);
    /// let p = unsafe { Parker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
        Parker {
            // SAFETY: the caller guarantees that `ptr` was produced by `Parker::into_raw`, which
            // forwards to `Unparker::into_raw`, and that it has not been converted back before.
            unparker: unsafe { Unparker::from_raw(ptr) },
            _marker: PhantomData,
        }
    }
}

impl fmt::Debug for Parker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Parker { .. }")
    }
}

/// Unparks a thread parked by the associated [`Parker`].
pub struct Unparker {
    inner: Arc<Inner>,
}

unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}

impl Unparker {
    /// Atomically makes the token available if it is not already.
    ///
    /// This method will wake up the thread blocked on [`park`], [`park_timeout`] or
    /// [`park_deadline`], if there is any.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    /// use std::time::Duration;
    /// use crossbeam_utils::sync::Parker;
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// thread::spawn(move || {
    ///     thread::sleep(Duration::from_millis(500));
    ///     u.unpark();
    /// });
    ///
    /// // Wakes up when `u.unpark()` provides the token.
    /// p.park();
    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
    /// ```
    ///
    /// [`park`]: Parker::park
    /// [`park_timeout`]: Parker::park_timeout
    /// [`park_deadline`]: Parker::park_deadline
    pub fn unpark(&self) {
        self.inner.unpark()
    }

    /// Converts an `Unparker` into a raw pointer.
    ///
    /// The pointer should eventually be passed back to [`Unparker::from_raw`]; otherwise the
    /// reference it holds on the shared state is never released.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    /// let raw = Unparker::into_raw(u);
    /// # let _ = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub fn into_raw(this: Unparker) -> *const () {
        Arc::into_raw(this.inner).cast::<()>()
    }

    /// Converts a raw pointer into an `Unparker`.
    ///
    /// # Safety
    ///
    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. Each
    /// such pointer must be converted back at most once.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_utils::sync::{Parker, Unparker};
    ///
    /// let p = Parker::new();
    /// let u = p.unparker().clone();
    ///
    /// let raw = Unparker::into_raw(u);
    /// let u = unsafe { Unparker::from_raw(raw) };
    /// ```
    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
        Unparker {
            // SAFETY: the caller guarantees that `ptr` was produced by `Unparker::into_raw`,
            // i.e. by `Arc::into_raw` on an `Arc<Inner>`, and that it has not been converted
            // back before.
            inner: unsafe { Arc::from_raw(ptr.cast::<Inner>()) },
        }
    }
}

impl fmt::Debug for Unparker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Unparker { .. }")
    }
}

impl Clone for Unparker {
    /// Returns another handle to the same shared parking state.
    fn clone(&self) -> Unparker {
        Unparker {
            inner: self.inner.clone(),
        }
    }
}

// Values stored in `Inner::state`.
// No token is available and no thread is parked.
const EMPTY: usize = 0;
// A thread is parked (or about to park) waiting for the token.
const PARKED: usize = 1;
// The token is available.
const NOTIFIED: usize = 2;

/// State shared between a `Parker` and all of its `Unparker`s.
struct Inner {
    // One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    // Held by the parking thread from the moment it sets `PARKED` until it waits on `cvar`.
    lock: Mutex<()>,
    // Signalled by `unpark` when it finds the state `PARKED`.
    cvar: Condvar,
}

impl Inner {
    /// Blocks until the token is available and consumes it, or until `deadline` (if any) passes.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    fn park(&self, deadline: Option<Instant>) {
        // If we were previously notified then we consume this notification and return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // If the timeout is zero, then there is no need to actually block.
        if let Some(deadline) = deadline {
            if deadline <= Instant::now() {
                return;
            }
        }

        // Otherwise we need to coordinate going to sleep.
        let mut m = self.lock.lock().unwrap();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            // Consume this notification to avoid spurious wakeups in the next park.
            Err(NOTIFIED) => {
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
                // because `unpark` may have been called again since we read `NOTIFIED` in the
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
                // do that we must read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
        }

        loop {
            // Block the current thread on the conditional variable.
            m = match deadline {
                None => self.cvar.wait(m).unwrap(),
                Some(deadline) => {
                    let now = Instant::now();
                    if now < deadline {
                        // We could check for a timeout here, in the return value of wait_timeout,
                        // but in the case that a timeout and an unpark arrive simultaneously, we
                        // prefer to report the former.
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
                    } else {
                        // We've timed out; swap out the state back to empty on our way out
                        match self.state.swap(EMPTY, SeqCst) {
                            NOTIFIED | PARKED => return,
                            n => panic!("inconsistent park_timeout state: {}", n),
                        };
                    }
                }
            };

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
            // in the branch above, when we discover the deadline is in the past
        }
    }

    /// Makes the token available and wakes the parked thread, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if `state` holds an unexpected value or if `lock` is poisoned.
    pub(crate) fn unpark(&self) {
        // To ensure the unparked thread will observe any writes we made before this call, we must
        // perform a release operation that `park` can synchronize with. To do that we must write
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY => return,    // no one was waiting
            NOTIFIED => return, // already unparked
            PARKED => {}        // gotta go wake someone up
            _ => panic!("inconsistent state in unpark"),
        }

        // There is a period between when the parked thread sets `state` to `PARKED` (or last
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
        // If we were to notify during this period it would be ignored and then when the parked
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
        //
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
        // it doesn't get woken only to have to wait for us to release `lock`.
        drop(self.lock.lock().unwrap());
        self.cvar.notify_one();
    }
}