1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use core::{
9     ffi,
10     mem::{self, MaybeUninit},
11     ptr,
12 };
13 use std::sync::atomic::{AtomicUsize, Ordering};
14 use std::time::Instant;
15 
// Values stored in a thread's parking key (an `AtomicUsize`).
//
// Written by `unpark_lock` when it marks a thread for wake-up.
const STATE_UNPARKED: usize = 0;
// Written by `prepare_park` before the thread blocks.
const STATE_PARKED: usize = 1;
// Swapped in by `park_until` once its deadline has passed; `timed_out`
// checks for this value.
const STATE_TIMED_OUT: usize = 2;
19 
20 use super::bindings::*;
21 
22 #[allow(non_snake_case)]
23 pub struct KeyedEvent {
24     handle: HANDLE,
25     NtReleaseKeyedEvent: extern "system" fn(
26         EventHandle: HANDLE,
27         Key: *mut ffi::c_void,
28         Alertable: BOOLEAN,
29         Timeout: *mut i64,
30     ) -> NTSTATUS,
31     NtWaitForKeyedEvent: extern "system" fn(
32         EventHandle: HANDLE,
33         Key: *mut ffi::c_void,
34         Alertable: BOOLEAN,
35         Timeout: *mut i64,
36     ) -> NTSTATUS,
37 }
38 
39 impl KeyedEvent {
40     #[inline]
wait_for(&self, key: *mut ffi::c_void, timeout: *mut i64) -> NTSTATUS41     unsafe fn wait_for(&self, key: *mut ffi::c_void, timeout: *mut i64) -> NTSTATUS {
42         (self.NtWaitForKeyedEvent)(self.handle, key, false.into(), timeout)
43     }
44 
45     #[inline]
release(&self, key: *mut ffi::c_void) -> NTSTATUS46     unsafe fn release(&self, key: *mut ffi::c_void) -> NTSTATUS {
47         (self.NtReleaseKeyedEvent)(self.handle, key, false.into(), ptr::null_mut())
48     }
49 
50     #[allow(non_snake_case)]
create() -> Option<KeyedEvent>51     pub fn create() -> Option<KeyedEvent> {
52         let ntdll = unsafe { GetModuleHandleA(b"ntdll.dll\0".as_ptr()) };
53         if ntdll == 0 {
54             return None;
55         }
56 
57         let NtCreateKeyedEvent =
58             unsafe { GetProcAddress(ntdll, b"NtCreateKeyedEvent\0".as_ptr())? };
59         let NtReleaseKeyedEvent =
60             unsafe { GetProcAddress(ntdll, b"NtReleaseKeyedEvent\0".as_ptr())? };
61         let NtWaitForKeyedEvent =
62             unsafe { GetProcAddress(ntdll, b"NtWaitForKeyedEvent\0".as_ptr())? };
63 
64         let NtCreateKeyedEvent: extern "system" fn(
65             KeyedEventHandle: *mut HANDLE,
66             DesiredAccess: u32,
67             ObjectAttributes: *mut ffi::c_void,
68             Flags: u32,
69         ) -> NTSTATUS = unsafe { mem::transmute(NtCreateKeyedEvent) };
70         let mut handle = MaybeUninit::uninit();
71         let status = NtCreateKeyedEvent(
72             handle.as_mut_ptr(),
73             GENERIC_READ | GENERIC_WRITE,
74             ptr::null_mut(),
75             0,
76         );
77         if status != STATUS_SUCCESS {
78             return None;
79         }
80 
81         Some(KeyedEvent {
82             handle: unsafe { handle.assume_init() },
83             NtReleaseKeyedEvent: unsafe { mem::transmute(NtReleaseKeyedEvent) },
84             NtWaitForKeyedEvent: unsafe { mem::transmute(NtWaitForKeyedEvent) },
85         })
86     }
87 
88     #[inline]
prepare_park(&'static self, key: &AtomicUsize)89     pub fn prepare_park(&'static self, key: &AtomicUsize) {
90         key.store(STATE_PARKED, Ordering::Relaxed);
91     }
92 
93     #[inline]
timed_out(&'static self, key: &AtomicUsize) -> bool94     pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
95         key.load(Ordering::Relaxed) == STATE_TIMED_OUT
96     }
97 
98     #[inline]
park(&'static self, key: &AtomicUsize)99     pub unsafe fn park(&'static self, key: &AtomicUsize) {
100         let status = self.wait_for(key as *const _ as *mut ffi::c_void, ptr::null_mut());
101         debug_assert_eq!(status, STATUS_SUCCESS);
102     }
103 
104     #[inline]
park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool105     pub unsafe fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
106         let now = Instant::now();
107         if timeout <= now {
108             // If another thread unparked us, we need to call
109             // NtWaitForKeyedEvent otherwise that thread will stay stuck at
110             // NtReleaseKeyedEvent.
111             if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
112                 self.park(key);
113                 return true;
114             }
115             return false;
116         }
117 
118         // NT uses a timeout in units of 100ns. We use a negative value to
119         // indicate a relative timeout based on a monotonic clock.
120         let diff = timeout - now;
121         let value = (diff.as_secs() as i64)
122             .checked_mul(-10000000)
123             .and_then(|x| x.checked_sub((diff.subsec_nanos() as i64 + 99) / 100));
124 
125         let mut nt_timeout = match value {
126             Some(x) => x,
127             None => {
128                 // Timeout overflowed, just sleep indefinitely
129                 self.park(key);
130                 return true;
131             }
132         };
133 
134         let status = self.wait_for(key as *const _ as *mut ffi::c_void, &mut nt_timeout);
135         if status == STATUS_SUCCESS {
136             return true;
137         }
138         debug_assert_eq!(status, STATUS_TIMEOUT);
139 
140         // If another thread unparked us, we need to call NtWaitForKeyedEvent
141         // otherwise that thread will stay stuck at NtReleaseKeyedEvent.
142         if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
143             self.park(key);
144             return true;
145         }
146         false
147     }
148 
149     #[inline]
unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle150     pub unsafe fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
151         // If the state was STATE_PARKED then we need to wake up the thread
152         if key.swap(STATE_UNPARKED, Ordering::Relaxed) == STATE_PARKED {
153             UnparkHandle {
154                 key: key,
155                 keyed_event: self,
156             }
157         } else {
158             UnparkHandle {
159                 key: ptr::null(),
160                 keyed_event: self,
161             }
162         }
163     }
164 }
165 
166 impl Drop for KeyedEvent {
167     #[inline]
drop(&mut self)168     fn drop(&mut self) {
169         unsafe {
170             let ok = CloseHandle(self.handle);
171             debug_assert_eq!(ok, true.into());
172         }
173     }
174 }
175 
176 // Handle for a thread that is about to be unparked. We need to mark the thread
177 // as unparked while holding the queue lock, but we delay the actual unparking
178 // until after the queue lock is released.
179 pub struct UnparkHandle {
180     key: *const AtomicUsize,
181     keyed_event: &'static KeyedEvent,
182 }
183 
184 impl UnparkHandle {
185     // Wakes up the parked thread. This should be called after the queue lock is
186     // released to avoid blocking the queue for too long.
187     #[inline]
unpark(self)188     pub unsafe fn unpark(self) {
189         if !self.key.is_null() {
190             let status = self.keyed_event.release(self.key as *mut ffi::c_void);
191             debug_assert_eq!(status, STATUS_SUCCESS);
192         }
193     }
194 }
195