1 //! Thread-local context used in select.
2 
3 use std::cell::Cell;
4 use std::ptr;
5 use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
6 use std::sync::Arc;
7 use std::thread::{self, Thread, ThreadId};
8 use std::time::Instant;
9 
10 use crossbeam_utils::Backoff;
11 
12 use crate::select::Selected;
13 
14 /// Thread-local context used in select.
15 // This is a private API that is used by the select macro.
16 #[derive(Debug, Clone)]
17 pub struct Context {
18     inner: Arc<Inner>,
19 }
20 
/// State shared by every handle to one `Context`.
#[derive(Debug)]
struct Inner {
    /// The selected operation, kept in an atomic `usize`.
    select: AtomicUsize,

    /// Slot in which another thread may leave a pointer to its `Packet`.
    packet: AtomicPtr<()>,

    /// Handle of the thread this context belongs to.
    thread: Thread,

    /// Id of the thread this context belongs to.
    thread_id: ThreadId,
}
36 
37 impl Context {
38     /// Creates a new context for the duration of the closure.
39     #[inline]
with<F, R>(f: F) -> R where F: FnOnce(&Context) -> R,40     pub fn with<F, R>(f: F) -> R
41     where
42         F: FnOnce(&Context) -> R,
43     {
44         std::thread_local! {
45             /// Cached thread-local context.
46             static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
47         }
48 
49         let mut f = Some(f);
50         let mut f = |cx: &Context| -> R {
51             let f = f.take().unwrap();
52             f(cx)
53         };
54 
55         CONTEXT
56             .try_with(|cell| match cell.take() {
57                 None => f(&Context::new()),
58                 Some(cx) => {
59                     cx.reset();
60                     let res = f(&cx);
61                     cell.set(Some(cx));
62                     res
63                 }
64             })
65             .unwrap_or_else(|_| f(&Context::new()))
66     }
67 
68     /// Creates a new `Context`.
69     #[cold]
new() -> Context70     fn new() -> Context {
71         Context {
72             inner: Arc::new(Inner {
73                 select: AtomicUsize::new(Selected::Waiting.into()),
74                 packet: AtomicPtr::new(ptr::null_mut()),
75                 thread: thread::current(),
76                 thread_id: thread::current().id(),
77             }),
78         }
79     }
80 
81     /// Resets `select` and `packet`.
82     #[inline]
reset(&self)83     fn reset(&self) {
84         self.inner
85             .select
86             .store(Selected::Waiting.into(), Ordering::Release);
87         self.inner.packet.store(ptr::null_mut(), Ordering::Release);
88     }
89 
90     /// Attempts to select an operation.
91     ///
92     /// On failure, the previously selected operation is returned.
93     #[inline]
try_select(&self, select: Selected) -> Result<(), Selected>94     pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
95         self.inner
96             .select
97             .compare_exchange(
98                 Selected::Waiting.into(),
99                 select.into(),
100                 Ordering::AcqRel,
101                 Ordering::Acquire,
102             )
103             .map(|_| ())
104             .map_err(|e| e.into())
105     }
106 
107     /// Returns the selected operation.
108     #[inline]
selected(&self) -> Selected109     pub fn selected(&self) -> Selected {
110         Selected::from(self.inner.select.load(Ordering::Acquire))
111     }
112 
113     /// Stores a packet.
114     ///
115     /// This method must be called after `try_select` succeeds and there is a packet to provide.
116     #[inline]
store_packet(&self, packet: *mut ())117     pub fn store_packet(&self, packet: *mut ()) {
118         if !packet.is_null() {
119             self.inner.packet.store(packet, Ordering::Release);
120         }
121     }
122 
123     /// Waits until a packet is provided and returns it.
124     #[inline]
wait_packet(&self) -> *mut ()125     pub fn wait_packet(&self) -> *mut () {
126         let backoff = Backoff::new();
127         loop {
128             let packet = self.inner.packet.load(Ordering::Acquire);
129             if !packet.is_null() {
130                 return packet;
131             }
132             backoff.snooze();
133         }
134     }
135 
136     /// Waits until an operation is selected and returns it.
137     ///
138     /// If the deadline is reached, `Selected::Aborted` will be selected.
139     #[inline]
wait_until(&self, deadline: Option<Instant>) -> Selected140     pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
141         // Spin for a short time, waiting until an operation is selected.
142         let backoff = Backoff::new();
143         loop {
144             let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
145             if sel != Selected::Waiting {
146                 return sel;
147             }
148 
149             if backoff.is_completed() {
150                 break;
151             } else {
152                 backoff.snooze();
153             }
154         }
155 
156         loop {
157             // Check whether an operation has been selected.
158             let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
159             if sel != Selected::Waiting {
160                 return sel;
161             }
162 
163             // If there's a deadline, park the current thread until the deadline is reached.
164             if let Some(end) = deadline {
165                 let now = Instant::now();
166 
167                 if now < end {
168                     thread::park_timeout(end - now);
169                 } else {
170                     // The deadline has been reached. Try aborting select.
171                     return match self.try_select(Selected::Aborted) {
172                         Ok(()) => Selected::Aborted,
173                         Err(s) => s,
174                     };
175                 }
176             } else {
177                 thread::park();
178             }
179         }
180     }
181 
182     /// Unparks the thread this context belongs to.
183     #[inline]
unpark(&self)184     pub fn unpark(&self) {
185         self.inner.thread.unpark();
186     }
187 
188     /// Returns the id of the thread this context belongs to.
189     #[inline]
thread_id(&self) -> ThreadId190     pub fn thread_id(&self) -> ThreadId {
191         self.inner.thread_id
192     }
193 }
194