1 use std::ptr::NonNull;
2 use std::sync::atomic::Ordering;
3 
4 use crate::loom::sync::{Mutex, MutexGuard};
5 use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
6 
7 use super::linked_list::{Link, LinkedList};
8 
9 /// An intrusive linked list supporting highly concurrent updates.
10 ///
11 /// It currently relies on `LinkedList`, so it is the caller's
12 /// responsibility to ensure the list is empty before dropping it.
13 ///
14 /// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
15 pub(crate) struct ShardedList<L, T> {
16     lists: Box<[Mutex<LinkedList<L, T>>]>,
17     added: MetricAtomicU64,
18     count: MetricAtomicUsize,
19     shard_mask: usize,
20 }
21 
22 /// Determines which linked list an item should be stored in.
23 ///
24 /// # Safety
25 ///
26 /// Implementations must guarantee that the id of an item does not change from
27 /// call to call.
28 pub(crate) unsafe trait ShardedListItem: Link {
29     /// # Safety
30     /// The provided pointer must point at a valid list item.
get_shard_id(target: NonNull<Self::Target>) -> usize31     unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
32 }
33 
34 impl<L, T> ShardedList<L, T> {
35     /// Creates a new and empty sharded linked list with the specified size.
new(sharded_size: usize) -> Self36     pub(crate) fn new(sharded_size: usize) -> Self {
37         assert!(sharded_size.is_power_of_two());
38 
39         let shard_mask = sharded_size - 1;
40         let mut lists = Vec::with_capacity(sharded_size);
41         for _ in 0..sharded_size {
42             lists.push(Mutex::new(LinkedList::<L, T>::new()))
43         }
44         Self {
45             lists: lists.into_boxed_slice(),
46             added: MetricAtomicU64::new(0),
47             count: MetricAtomicUsize::new(0),
48             shard_mask,
49         }
50     }
51 }
52 
53 /// Used to get the lock of shard.
54 pub(crate) struct ShardGuard<'a, L, T> {
55     lock: MutexGuard<'a, LinkedList<L, T>>,
56     added: &'a MetricAtomicU64,
57     count: &'a MetricAtomicUsize,
58     id: usize,
59 }
60 
61 impl<L: ShardedListItem> ShardedList<L, L::Target> {
62     /// Removes the last element from a list specified by `shard_id` and returns it, or None if it is
63     /// empty.
pop_back(&self, shard_id: usize) -> Option<L::Handle>64     pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
65         let mut lock = self.shard_inner(shard_id);
66         let node = lock.pop_back();
67         if node.is_some() {
68             self.count.decrement();
69         }
70         node
71     }
72 
73     /// Removes the specified node from the list.
74     ///
75     /// # Safety
76     ///
77     /// The caller **must** ensure that exactly one of the following is true:
78     /// - `node` is currently contained by `self`,
79     /// - `node` is not contained by any list,
80     /// - `node` is currently contained by some other `GuardedLinkedList`.
remove(&self, node: NonNull<L::Target>) -> Option<L::Handle>81     pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
82         let id = L::get_shard_id(node);
83         let mut lock = self.shard_inner(id);
84         // SAFETY: Since the shard id cannot change, it's not possible for this node
85         // to be in any other list of the same sharded list.
86         let node = unsafe { lock.remove(node) };
87         if node.is_some() {
88             self.count.decrement();
89         }
90         node
91     }
92 
93     /// Gets the lock of `ShardedList`, makes us have the write permission.
lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target>94     pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
95         let id = unsafe { L::get_shard_id(L::as_raw(val)) };
96         ShardGuard {
97             lock: self.shard_inner(id),
98             added: &self.added,
99             count: &self.count,
100             id,
101         }
102     }
103 
104     /// Gets the count of elements in this list.
len(&self) -> usize105     pub(crate) fn len(&self) -> usize {
106         self.count.load(Ordering::Relaxed)
107     }
108 
109     cfg_64bit_metrics! {
110         /// Gets the total number of elements added to this list.
111         pub(crate) fn added(&self) -> u64 {
112             self.added.load(Ordering::Relaxed)
113         }
114     }
115 
116     /// Returns whether the linked list does not contain any node.
is_empty(&self) -> bool117     pub(crate) fn is_empty(&self) -> bool {
118         self.len() == 0
119     }
120 
121     /// Gets the shard size of this `SharedList`.
122     ///
123     /// Used to help us to decide the parameter `shard_id` of the `pop_back` method.
shard_size(&self) -> usize124     pub(crate) fn shard_size(&self) -> usize {
125         self.shard_mask + 1
126     }
127 
128     #[inline]
shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>>129     fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
130         // Safety: This modulo operation ensures that the index is not out of bounds.
131         unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
132     }
133 }
134 
135 impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
136     /// Push a value to this shard.
push(mut self, val: L::Handle)137     pub(crate) fn push(mut self, val: L::Handle) {
138         let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
139         assert_eq!(id, self.id);
140         self.lock.push_front(val);
141         self.added.add(1, Ordering::Relaxed);
142         self.count.increment();
143     }
144 }
145 
146 cfg_taskdump! {
147     impl<L: ShardedListItem> ShardedList<L, L::Target> {
148         pub(crate) fn for_each<F>(&self, mut f: F)
149         where
150             F: FnMut(&L::Handle),
151         {
152             let mut guards = Vec::with_capacity(self.lists.len());
153             for list in self.lists.iter() {
154                 guards.push(list.lock());
155             }
156             for g in &mut guards {
157                 g.for_each(&mut f);
158             }
159         }
160     }
161 }
162