//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in the thread's owned
//! node, while a writer walks all the nodes (but may also use some of its own owned values).
//!
//! The list is prepend-only ‒ if a thread dies, its node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (a load on each read, a SeqCst
//! compare_exchange on each attempt to add another node). Note that certain parts of a node
//! never change after insertion (they aren't even atomic) and other things that do change take
//! care of themselves (the debt slots have their own synchronization, etc).
//!
//! The ownership of a node is an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers is an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! a value of the writers at least as up to date as when the cooldown started. That way, if we
//! see 0, we know it must have happened since then.
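//!
//! # Sketch of the head synchronization
//!
//! Below is a minimal, self-contained sketch of the prepend-only pattern described above. The
//! names (`HEAD`, `prepend`, `traverse`) are illustrative only and the orderings are simplified
//! to a plain release/acquire pair; the real code in this module uses SeqCst for the extra
//! reasons explained at the relevant places.
//!
//! ```rust
//! use std::ptr;
//! use std::sync::atomic::{AtomicPtr, Ordering::*};
//!
//! struct Node {
//!     value: usize,
//!     // An ordinary pointer ‒ the chain is synchronized purely through HEAD.
//!     next: *mut Node,
//! }
//!
//! static HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
//!
//! fn prepend(value: usize) {
//!     // Leaked on purpose ‒ the list is 'static and prepend-only.
//!     let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
//!     let mut head = HEAD.load(Relaxed);
//!     loop {
//!         unsafe { (*node).next = head };
//!         // Release publishes the new node (and, transitively, the chain behind it).
//!         match HEAD.compare_exchange_weak(head, node, Release, Relaxed) {
//!             Ok(_) => return,
//!             Err(new_head) => head = new_head,
//!         }
//!     }
//! }
//!
//! fn traverse(mut visit: impl FnMut(usize)) {
//!     // Acquire pairs with the Release above, making the node contents visible.
//!     let mut current = unsafe { HEAD.load(Acquire).as_ref() };
//!     while let Some(node) = current {
//!         visit(node.value);
//!         current = unsafe { node.next.as_ref() };
//!     }
//! }
//!
//! fn main() {
//!     prepend(42);
//!     traverse(|v| assert_eq!(v, 42));
//! }
//! ```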

use std::cell::Cell;
use std::ptr;
use std::slice::Iter;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;

const NODE_UNUSED: usize = 0;
const NODE_USED: usize = 1;
const NODE_COOLDOWN: usize = 2;

/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

/// A RAII guard for a writer reservation of a node.
pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
        self.0.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
#[repr(C, align(64))]
pub(crate) struct Node {
    fast: FastSlots,
    helping: HelpingSlots,
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a raw pointer because we touch it before synchronization (we don't _dereference_ it
    // before synchronization, only manipulate the pointer itself). That would be illegal on a
    // reference according to a strict interpretation of the rules by MIRI.
    next: *const Node,
    active_writers: AtomicUsize,
}

impl Default for Node {
    fn default() -> Self {
        Node {
            fast: FastSlots::default(),
            helping: HelpingSlots::default(),
            in_use: AtomicUsize::new(NODE_USED),
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}

impl Node {
    /// Goes through the debt linked list.
    ///
    /// This traverses the linked list, calling the closure on each node. If the closure returns
    /// `Some`, it terminates with that value early, otherwise it runs to the end.
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of data at the end of the
        // pointer. Any write to LIST_HEAD is with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the
        // debts ‒ if a new node was added recently, we don't want a stale read -> SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers.
        // The whole linked list is synchronized through the head.
        let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
        while let Some(node) = current {
            let result = f(node);
            if result.is_some() {
                return result;
            }
            current = unsafe { node.next.as_ref() };
        }
        None
    }

    /// Puts the current thread's node into cooldown.
    fn start_cooldown(&self) {
        // Trick: Make sure we have an up to date value of active_writers in this thread, so we
        // can properly release it below.
        let _reservation = self.reserve_writer();
        assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
    }

    /// Takes the node out of cooldown if it is ready.
    ///
    /// See the ABA protection in the [helping] module.
    fn check_cooldown(&self) {
        // Check if the node is in cooldown, for two reasons:
        // * To skip most nodes fast, without dealing with them.
        // * More importantly, to sync the value of active_writers to at least the value it had
        //   when the cooldown started. That way we know the 0 we observe happened some time
        //   after start_cooldown.
        if self.in_use.load(Acquire) == NODE_COOLDOWN {
            // The rest can be nicely relaxed ‒ no memory is being synchronized by these
            // operations. We just see an up to date 0 and allow someone (possibly us) to claim
            // the node later on.
            if self.active_writers.load(Relaxed) == 0 {
                let _ = self
                    .in_use
                    .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
            }
        }
    }

    /// Marks this node as one a writer is currently playing with.
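    ///
    /// Together with [`NodeReservation`], this forms the reader half of an acquire-release lock
    /// pattern: the Acquire here pairs with the Release in the reservation's drop. A minimal,
    /// self-contained sketch of that counter pattern (the names `Guard` and `reserve` are
    /// illustrative, not this module's actual types):
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicUsize, Ordering::*};
    ///
    /// struct Guard<'a>(&'a AtomicUsize);
    ///
    /// impl Drop for Guard<'_> {
    ///     fn drop(&mut self) {
    ///         // Release: what happened under the reservation is visible to whoever later
    ///         // observes the counter dropping to 0.
    ///         self.0.fetch_sub(1, Release);
    ///     }
    /// }
    ///
    /// fn reserve(counter: &AtomicUsize) -> Guard<'_> {
    ///     // Acquire: see the writes published by previous holders of the counter.
    ///     counter.fetch_add(1, Acquire);
    ///     Guard(counter)
    /// }
    ///
    /// let counter = AtomicUsize::new(0);
    /// let guard = reserve(&counter);
    /// assert_eq!(counter.load(Relaxed), 1);
    /// drop(guard);
    /// assert_eq!(counter.load(Relaxed), 0);
    /// ```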
    pub fn reserve_writer(&self) -> NodeReservation {
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocates" a node.
    ///
    /// Either a new one is created, or a previous one is reused. The node is claimed and becomes
    /// in_use.
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        Self::traverse(|node| {
            node.check_cooldown();
            if node
                .in_use
                // We claim unique control over the generation and the right to write to the
                // slots if they are `Debt::NONE`.
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .is_ok()
            {
                Some(node)
            } else {
                None
            }
        })
        // If that didn't work, create a new one and prepend it to the list.
        .unwrap_or_else(|| {
            let node = Box::leak(Box::<Node>::default());
            node.helping.init();
            // We don't want to read any data in addition to the head, so Relaxed is fine here.
            //
            // We do need to release the data to others, but for that, we acquire in the
            // compare_exchange below.
            let mut head = LIST_HEAD.load(Relaxed);
            loop {
                node.next = head;
                if let Err(old) = LIST_HEAD.compare_exchange_weak(
                    head,
                    node,
                    // We need to release *the whole chain* here. For that, we need to acquire
                    // it first.
                    //
                    // SeqCst because we need to make sure it is properly set "before" we do
                    // anything to the debts.
                    SeqCst,
                    Relaxed, // Nothing changed, go for another round of the loop.
                ) {
                    head = old;
                } else {
                    return node;
                }
            }
        })
    }

    /// Iterates over the fast slots.
    pub(crate) fn fast_slots(&self) -> Iter<Debt> {
        self.fast.into_iter()
    }

    /// Accesses the helping slot.
    pub(crate) fn helping_slot(&self) -> &Debt {
        self.helping.slot()
    }
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
    /// Node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first
    /// use.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread-local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let f = Cell::new(Some(f));
        THREAD_HEAD
            .try_with(|head| {
                if head.node.get().is_none() {
                    head.node.set(Some(Node::get()));
                }
                let f = f.take().unwrap();
                f(head)
            })
            // During application shutdown, the thread local storage may already be deallocated.
            // In that case, the above fails but we still need something. So we just find or
            // allocate a node and use it only once.
            //
            // Note that this situation should be very rare, so the slower performance doesn't
            // matter that much.
            .unwrap_or_else(|_| {
                let tmp_node = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let f = f.take().unwrap();
                f(&tmp_node)
                // Drop of tmp_node -> sends the node we just used into cooldown.
            })
    }

    /// Creates a new debt.
    ///
    /// This stores the debt of the given pointer (untyped, cast into a usize) and returns a
    /// reference to that slot, or gives up with `None` if all the slots are currently full.
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.fast.get_debt(ptr, &self.fast)
    }

    /// Initializes a helping slot transaction.
    ///
    /// Returns the generation (with tag).
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
        if discard {
            // Too many generations have happened; make sure the writers give the poor node a
            // break for a while so they don't observe the generation wrapping around.
            node.start_cooldown();
            self.node.take();
        }
        gen
    }

    /// Confirms the helping transaction.
    ///
    /// The generation comes from a previous `new_helping`.
    ///
    /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
    /// protected) address.
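    ///
    /// A hedged sketch of how a caller might drive the two-phase transaction (the surrounding
    /// reader logic ‒ re-checking the storage, the `storage_addr` and `ptr_addr` values ‒ is
    /// assumed here, not part of this module):
    ///
    /// ```ignore
    /// LocalNode::with(|local| {
    ///     // Phase 1: publish the address we are about to read, get a tagged generation.
    ///     let gen = local.new_helping(storage_addr);
    ///     // ... re-read the storage and grab the pointer here ...
    ///     // Phase 2: either we keep the debt, or a writer has already helped us.
    ///     match local.confirm_helping(gen, ptr_addr) {
    ///         Ok(debt) => {
    ///             // We own the debt for the pointer; pay it off later.
    ///         }
    ///         Err((debt, replacement)) => {
    ///             // A writer installed a replacement (already protected) for us;
    ///             // pay off `debt` now and continue with `replacement`.
    ///         }
    ///     }
    /// });
    /// ```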
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let slot = node.helping_slot();
        node.helping
            .confirm(gen, ptr)
            .map(|()| slot)
            .map_err(|repl| (slot, repl))
    }

    /// The writer side of a helping slot.
    ///
    /// This potentially helps the `who` node (it uses `self` as the local node, which must be a
    /// different one) by loading the address that one is trying to load.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.helping.help(&who.helping, storage_addr, replacement)
    }
}

impl Drop for LocalNode {
    fn drop(&mut self) {
        if let Some(node) = self.node.get() {
            // Release ‒ syncing the writes/ownership of this Node.
            node.start_cooldown();
        }
    }
}

thread_local! {
    /// A debt node assigned to this thread.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        fn is_empty(&self) -> bool {
            self.fast_slots()
                .chain(std::iter::once(self.helping_slot()))
                .all(|d| d.0.load(Relaxed) == Debt::NONE)
        }

        fn get_thread() -> &'static Self {
            LocalNode::with(|h| h.node.get().unwrap())
        }
    }

    /// A freshly acquired thread local node is empty.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
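
    /// The node claimed by the current thread is reachable from the list head.
    ///
    /// This is a sketch-level check of an invariant assumed throughout the module: nodes are
    /// only ever prepended and never removed, so a claimed node must be found by traversal.
    #[test]
    fn thread_node_in_list() {
        let mine = Node::get_thread();
        let found = Node::traverse(|node| if ptr::eq(node, mine) { Some(()) } else { None });
        assert!(found.is_some());
    }
}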