//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in the thread's owned
//! node, while a writer walks everything (but may also use some owned values).
//!
//! The list is prepend-only ‒ if a thread dies, the node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and a secondary fallback slot.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (load on each read, AcqRel on each
//! attempt to add another node). Note that certain parts never change after that (they aren't even
//! atomic) and other things that do change take care of themselves (the debt slots have their own
//! synchronization, etc).
//!
//! The ownership is an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers is an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! a value of the writers at least as up to date as when the cooldown started. That way, if we
//! see 0, we know it must have happened since then.
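//!
//! The following is a minimal standalone sketch of that release-acquire handshake (the names
//! are made up for the example and are not this module's actual fields):
//!
//! ```ignore
//! use std::sync::atomic::{AtomicUsize, Ordering::*};
//!
//! static WRITERS: AtomicUsize = AtomicUsize::new(0);
//! static STATE: AtomicUsize = AtomicUsize::new(0); // 1 = cooldown
//!
//! fn start_cooldown() {
//!     // Release publishes (at least) the current writer count along with the state change.
//!     STATE.store(1, Release);
//! }
//!
//! fn check_cooldown() -> bool {
//!     // Acquire pairs with the Release above: once we observe the cooldown state, the writer
//!     // count we read is no older than it was at start_cooldown, so a 0 here is trustworthy.
//!     STATE.load(Acquire) == 1 && WRITERS.load(Relaxed) == 0
//! }
//! ```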

use std::cell::Cell;
use std::ptr;
use std::slice::Iter;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;

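// Lifecycle states of a node: claimed by a thread (NODE_USED), released and waiting for the last
// writers to leave (NODE_COOLDOWN), and free to be claimed by another thread (NODE_UNUSED).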
const NODE_UNUSED: usize = 0;
const NODE_USED: usize = 1;
const NODE_COOLDOWN: usize = 2;

/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

/// A RAII reservation, marking the node as having an active writer.
///
/// Dropping it releases the reservation by decrementing the writer count.
pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
        self.0.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
#[repr(C, align(64))]
pub(crate) struct Node {
    /// The fast primary slots.
    fast: FastSlots,
    /// The secondary fallback slot, for the helping strategy.
    helping: HelpingSlots,
    /// Lifecycle state, one of the NODE_* constants above.
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a pointer because we touch it before synchronization (we don't _dereference_ it
    // before synchronization, only manipulate the pointer itself). Doing that through a reference
    // would be illegal under a strict interpretation of the rules, as checked by Miri.
    next: *const Node,
    /// The number of writers currently interacting with this node.
    active_writers: AtomicUsize,
}

impl Default for Node {
    fn default() -> Self {
        Node {
            fast: FastSlots::default(),
            helping: HelpingSlots::default(),
            in_use: AtomicUsize::new(NODE_USED),
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}

impl Node {
    /// Goes through the debt linked list.
    ///
    /// This traverses the linked list, calling the closure on each node. If the closure returns
    /// `Some`, it terminates with that value early, otherwise it runs to the end.
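    ///
    /// A hypothetical usage sketch (not a doctest, since this module is private):
    ///
    /// ```ignore
    /// // Find the first node that is currently claimed by some thread.
    /// let used = Node::traverse(|node| {
    ///     (node.in_use.load(Relaxed) == NODE_USED).then_some(node)
    /// });
    /// ```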
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of data at the end of the
        // pointer. Any write to LIST_HEAD is with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the
        // debts ‒ if a new node was added recently, we don't want a stale read, hence SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers. The
        // whole linked list is synchronized through the head.
        let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
        while let Some(node) = current {
            let result = f(node);
            if result.is_some() {
                return result;
            }
            current = unsafe { node.next.as_ref() };
        }
        None
    }

    /// Puts the current thread's node into cooldown.
    fn start_cooldown(&self) {
        // Trick: Make sure we have an up to date value of active_writers in this thread, so we
        // can properly release it below.
        let _reservation = self.reserve_writer();
        assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
    }

    /// Performs a cooldown if the node is ready.
    ///
    /// See the ABA protection in the [helping] module.
    fn check_cooldown(&self) {
        // Check if the node is in cooldown, for two reasons:
        // * Skip most of the nodes fast, without dealing with them.
        // * More importantly, sync the value of active_writers to be at least the value when the
        //   cooldown started. That way we know the 0 we observe happened some time after
        //   start_cooldown.
        if self.in_use.load(Acquire) == NODE_COOLDOWN {
            // The rest can be nicely relaxed ‒ no memory is being synchronized by these
            // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
            // node later on.
            if self.active_writers.load(Relaxed) == 0 {
                let _ = self
                    .in_use
                    .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
            }
        }
    }

    /// Marks this node as having a writer currently playing with it.
    pub fn reserve_writer(&self) -> NodeReservation {
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocates" a node.
    ///
    /// Either a new one is created, or a previous one is reused. The node is claimed and marked
    /// as in use.
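    ///
    /// A minimal sketch of the guarantee this provides (not a doctest, since this module is
    /// private):
    ///
    /// ```ignore
    /// let node = Node::get();
    /// // The caller has exclusively claimed the node.
    /// assert_eq!(NODE_USED, node.in_use.load(Relaxed));
    /// ```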
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        Self::traverse(|node| {
            node.check_cooldown();
            if node
                .in_use
                // We claim unique control over the generation and the right to write to the
                // slots if they are empty (`Debt::NONE`).
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .is_ok()
            {
                Some(node)
            } else {
                None
            }
        })
        // If that didn't work, create a new one and prepend it to the list.
        .unwrap_or_else(|| {
            let node = Box::leak(Box::<Node>::default());
            node.helping.init();
            // We don't want to read any data in addition to the head, so Relaxed is fine
            // here.
            //
            // We do need to release the data to others, but that is handled by the
            // compare_exchange below.
            let mut head = LIST_HEAD.load(Relaxed);
            loop {
                node.next = head;
                if let Err(old) = LIST_HEAD.compare_exchange_weak(
                    head, node,
                    // We need to release *the whole chain* here. For that, we need to
                    // acquire it first.
                    //
                    // SeqCst because we need to make sure it is properly set "before" we do
                    // anything to the debts.
                    SeqCst, Relaxed, // Nothing changed, go next round of the loop.
                ) {
                    head = old;
                } else {
                    return node;
                }
            }
        })
    }

    /// Iterate over the fast slots.
    pub(crate) fn fast_slots(&self) -> Iter<Debt> {
        self.fast.into_iter()
    }

    /// Access the helping slot.
    pub(crate) fn helping_slot(&self) -> &Debt {
        self.helping.slot()
    }
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
    /// Node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread-local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
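    /// Runs the closure with the thread-local node, claiming or allocating one first if needed.
    ///
    /// A hypothetical usage sketch (the pointer value is made up for the example):
    ///
    /// ```ignore
    /// let debt = LocalNode::with(|local| local.new_fast(0xdead_beef));
    /// ```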
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let f = Cell::new(Some(f));
        THREAD_HEAD
            .try_with(|head| {
                if head.node.get().is_none() {
                    head.node.set(Some(Node::get()));
                }
                let f = f.take().unwrap();
                f(head)
            })
            // During application shutdown, the thread local storage may already be deallocated.
            // In that case, the above fails but we still need something. So we just find or
            // allocate a node and use it only once.
            //
            // Note that this situation should be very rare, so the slower performance doesn't
            // matter that much.
            .unwrap_or_else(|_| {
                let tmp_node = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let f = f.take().unwrap();
                f(&tmp_node)
                // Drop of tmp_node -> sends the node we just used into cooldown.
            })
    }

    /// Creates a new debt.
    ///
    /// This stores the debt of the given pointer (untyped, cast into a `usize`) and returns a
    /// reference to that slot, or gives up with `None` if all the slots are currently full.
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.fast.get_debt(ptr, &self.fast)
    }

    /// Initializes a helping slot transaction.
    ///
    /// Returns the generation (with tag).
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
        if discard {
            // Too many generations happened, make sure the writers give the poor node a break for
            // a while so they don't observe the generation wrapping around.
            node.start_cooldown();
            self.node.take();
        }
        gen
    }

    /// Confirms the helping transaction.
    ///
    /// The generation comes from a previous `new_helping`.
    ///
    /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
    /// protected) address.
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let slot = node.helping_slot();
        node.helping
            .confirm(gen, ptr)
            .map(|()| slot)
            .map_err(|repl| (slot, repl))
    }

    /// The writer side of a helping slot.
    ///
    /// This potentially helps the `who` node (using `self` as the local node, which must be
    /// different) by loading the address that node is trying to load.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.helping.help(&who.helping, storage_addr, replacement)
    }
}

impl Drop for LocalNode {
    fn drop(&mut self) {
        if let Some(node) = self.node.get() {
            // Release - syncing writes/ownership of this Node
            node.start_cooldown();
        }
    }
}

thread_local! {
    /// A debt node assigned to this thread.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        fn is_empty(&self) -> bool {
            self.fast_slots()
                .chain(std::iter::once(self.helping_slot()))
                .all(|d| d.0.load(Relaxed) == Debt::NONE)
        }

        fn get_thread() -> &'static Self {
            LocalNode::with(|h| h.node.get().unwrap())
        }
    }

    /// A freshly acquired thread local node is empty.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
}