1 //! Thread-local context used in select. 2 3 use std::cell::Cell; 4 use std::ptr; 5 use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; 6 use std::sync::Arc; 7 use std::thread::{self, Thread, ThreadId}; 8 use std::time::Instant; 9 10 use crossbeam_utils::Backoff; 11 12 use crate::select::Selected; 13 14 /// Thread-local context used in select. 15 // This is a private API that is used by the select macro. 16 #[derive(Debug, Clone)] 17 pub struct Context { 18 inner: Arc<Inner>, 19 } 20 21 /// Inner representation of `Context`. 22 #[derive(Debug)] 23 struct Inner { 24 /// Selected operation. 25 select: AtomicUsize, 26 27 /// A slot into which another thread may store a pointer to its `Packet`. 28 packet: AtomicPtr<()>, 29 30 /// Thread handle. 31 thread: Thread, 32 33 /// Thread id. 34 thread_id: ThreadId, 35 } 36 37 impl Context { 38 /// Creates a new context for the duration of the closure. 39 #[inline] with<F, R>(f: F) -> R where F: FnOnce(&Context) -> R,40 pub fn with<F, R>(f: F) -> R 41 where 42 F: FnOnce(&Context) -> R, 43 { 44 std::thread_local! { 45 /// Cached thread-local context. 46 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); 47 } 48 49 let mut f = Some(f); 50 let mut f = |cx: &Context| -> R { 51 let f = f.take().unwrap(); 52 f(cx) 53 }; 54 55 CONTEXT 56 .try_with(|cell| match cell.take() { 57 None => f(&Context::new()), 58 Some(cx) => { 59 cx.reset(); 60 let res = f(&cx); 61 cell.set(Some(cx)); 62 res 63 } 64 }) 65 .unwrap_or_else(|_| f(&Context::new())) 66 } 67 68 /// Creates a new `Context`. 69 #[cold] new() -> Context70 fn new() -> Context { 71 Context { 72 inner: Arc::new(Inner { 73 select: AtomicUsize::new(Selected::Waiting.into()), 74 packet: AtomicPtr::new(ptr::null_mut()), 75 thread: thread::current(), 76 thread_id: thread::current().id(), 77 }), 78 } 79 } 80 81 /// Resets `select` and `packet`. 82 #[inline] reset(&self)83 fn reset(&self) { 84 self.inner 85 .select 86 .store(Selected::Waiting.into(), Ordering::Release); 87 self.inner.packet.store(ptr::null_mut(), Ordering::Release); 88 } 89 90 /// Attempts to select an operation. 91 /// 92 /// On failure, the previously selected operation is returned. 93 #[inline] try_select(&self, select: Selected) -> Result<(), Selected>94 pub fn try_select(&self, select: Selected) -> Result<(), Selected> { 95 self.inner 96 .select 97 .compare_exchange( 98 Selected::Waiting.into(), 99 select.into(), 100 Ordering::AcqRel, 101 Ordering::Acquire, 102 ) 103 .map(|_| ()) 104 .map_err(|e| e.into()) 105 } 106 107 /// Returns the selected operation. 108 #[inline] selected(&self) -> Selected109 pub fn selected(&self) -> Selected { 110 Selected::from(self.inner.select.load(Ordering::Acquire)) 111 } 112 113 /// Stores a packet. 114 /// 115 /// This method must be called after `try_select` succeeds and there is a packet to provide. 116 #[inline] store_packet(&self, packet: *mut ())117 pub fn store_packet(&self, packet: *mut ()) { 118 if !packet.is_null() { 119 self.inner.packet.store(packet, Ordering::Release); 120 } 121 } 122 123 /// Waits until a packet is provided and returns it. 124 #[inline] wait_packet(&self) -> *mut ()125 pub fn wait_packet(&self) -> *mut () { 126 let backoff = Backoff::new(); 127 loop { 128 let packet = self.inner.packet.load(Ordering::Acquire); 129 if !packet.is_null() { 130 return packet; 131 } 132 backoff.snooze(); 133 } 134 } 135 136 /// Waits until an operation is selected and returns it. 137 /// 138 /// If the deadline is reached, `Selected::Aborted` will be selected. 139 #[inline] wait_until(&self, deadline: Option<Instant>) -> Selected140 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { 141 // Spin for a short time, waiting until an operation is selected. 142 let backoff = Backoff::new(); 143 loop { 144 let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); 145 if sel != Selected::Waiting { 146 return sel; 147 } 148 149 if backoff.is_completed() { 150 break; 151 } else { 152 backoff.snooze(); 153 } 154 } 155 156 loop { 157 // Check whether an operation has been selected. 158 let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); 159 if sel != Selected::Waiting { 160 return sel; 161 } 162 163 // If there's a deadline, park the current thread until the deadline is reached. 164 if let Some(end) = deadline { 165 let now = Instant::now(); 166 167 if now < end { 168 thread::park_timeout(end - now); 169 } else { 170 // The deadline has been reached. Try aborting select. 171 return match self.try_select(Selected::Aborted) { 172 Ok(()) => Selected::Aborted, 173 Err(s) => s, 174 }; 175 } 176 } else { 177 thread::park(); 178 } 179 } 180 } 181 182 /// Unparks the thread this context belongs to. 183 #[inline] unpark(&self)184 pub fn unpark(&self) { 185 self.inner.thread.unpark(); 186 } 187 188 /// Returns the id of the thread this context belongs to. 189 #[inline] thread_id(&self) -> ThreadId190 pub fn thread_id(&self) -> ThreadId { 191 self.inner.thread_id 192 } 193 } 194