use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};

use std::alloc::Layout;
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The header fields.
    header: BlockHeader<T>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
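
// Taken together, these constants describe the layout of the `ready_slots`
// word. For illustration, assuming `BLOCK_CAP` is 32, the layout is:
//
//   bits 0..32   one "ready" bit per slot in the block
//   bit  32      `RELEASED`
//   bit  33      `TX_CLOSED`
//
// `READY_MASK` covers only the per-slot bits, so the two flags never alias a
// slot's ready bit.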

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
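
// For illustration (again assuming `BLOCK_CAP` is 32): slot index 70
// decomposes into `start_index(70) == 64` and `offset(70) == 6`, i.e. the
// seventh slot of the third block. This only works because `BLOCK_CAP` is a
// power of two, which the masks above rely on.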

generate_addr_of_methods! {
    impl<T> Block<T> {
        unsafe fn addr_of_header(self: NonNull<Self>) -> NonNull<BlockHeader<T>> {
            &self.header
        }

        unsafe fn addr_of_values(self: NonNull<Self>) -> NonNull<Values<T>> {
            &self.values
        }
    }
}

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Box<Block<T>> {
        unsafe {
            // Allocate the block on the heap.
            // SAFETY: The size of the Block<T> is non-zero, since it is at least the size of the header.
            let block = std::alloc::alloc(Layout::new::<Block<T>>()) as *mut Block<T>;
            let block = match NonNull::new(block) {
                Some(block) => block,
                None => std::alloc::handle_alloc_error(Layout::new::<Block<T>>()),
            };

            // Write the header to the block.
            Block::addr_of_header(block).as_ptr().write(BlockHeader {
                // The absolute index in the channel of the first slot in the block.
                start_index,

                // Pointer to the next block in the linked list.
                next: AtomicPtr::new(ptr::null_mut()),

                ready_slots: AtomicUsize::new(0),

                observed_tail_position: UnsafeCell::new(0),
            });

            // Initialize the values array.
            Values::initialize(Block::addr_of_values(block));

            // Convert the pointer to a `Box`.
            // Safety: The raw pointer was allocated using the global allocator, and with
            // the layout for a `Block<T>`, so it's valid to convert it to a `Box`.
            Box::from_raw(block.as_ptr())
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.header.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.header.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Returns true if *this* block has a value in the given slot.
    ///
    /// Always returns false when given an index from a different block.
    pub(crate) fn has_value(&self, slot_index: usize) -> bool {
        if slot_index < self.header.start_index {
            return false;
        }
        if slot_index >= self.header.start_index + super::BLOCK_CAP {
            return false;
        }

        let offset = offset(slot_index);
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_ready(ready_bits, offset)
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.header.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    pub(crate) unsafe fn is_closed(&self) -> bool {
        let ready_bits = self.header.ready_slots.load(Acquire);
        is_tx_closed(ready_bits)
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.header.start_index = 0;
        self.header.next = AtomicPtr::new(ptr::null_mut());
        self.header.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.header
            .observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.header.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.header.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    pub(crate) fn is_final(&self) -> bool {
        self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.header.ready_slots.load(Acquire) {
            None
        } else {
            Some(
                self.header
                    .observed_tail_position
                    .with(|ptr| unsafe { *ptr }),
            )
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.header.next.load(ordering));

        debug_assert!(unsafe {
            ret.map_or(true, |block| {
                block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP)
            })
        });

        ret
    }

    /// Pushes `block` as the next block in the link.
    ///
    /// Returns `Ok(())` if successful; otherwise, the pointer to the actual
    /// next block in the list is returned as the error.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the provided `success`
    /// and `failure` orderings.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .header
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with `AcqRel` memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Block::new(self.header.start_index + BLOCK_CAP);

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.header
                .next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(()) => {
                    return next;
                }
                Err(curr) => curr,
            };

            crate::loom::thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    /// Initialize a `Values` struct from a pointer.
    ///
    /// # Safety
    ///
    /// The raw pointer must be valid for writing a `Values<T>`.
    unsafe fn initialize(_value: NonNull<Values<T>>) {
        // When fuzzing, `UnsafeCell` needs to be initialized.
        if_loom! {
            let p = _value.as_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

#[cfg(all(test, not(loom)))]
#[test]
fn assert_no_stack_overflow() {
    // https://github.com/tokio-rs/tokio/issues/5293

    struct Foo {
        _a: [u8; 2_000_000],
    }

    assert_eq!(
        Layout::new::<MaybeUninit<Block<Foo>>>(),
        Layout::new::<Block<Foo>>()
    );

    let _block = Block::<Foo>::new(0);
}
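
// A minimal sanity-check sketch of the index and readiness-bit helpers above.
// It assumes only what the constants already encode: `BLOCK_CAP` is a power of
// two and the `RELEASED`/`TX_CLOSED` flags live above the per-slot bits.
#[cfg(all(test, not(loom)))]
#[test]
fn index_and_ready_bit_helpers() {
    // The mask arithmetic in `start_index`/`offset` requires a power of two.
    assert!(BLOCK_CAP.is_power_of_two());

    // A slot index decomposes into a block start plus an in-block offset.
    let slot_index = 3 * BLOCK_CAP + 5;
    assert_eq!(start_index(slot_index), 3 * BLOCK_CAP);
    assert_eq!(offset(slot_index), 5);
    assert_eq!(start_index(slot_index) + offset(slot_index), slot_index);

    // Setting a slot's bit marks only that slot as ready.
    let bits: usize = 1 << 5;
    assert!(is_ready(bits, 5));
    assert!(!is_ready(bits, 4));

    // The flag bits sit outside `READY_MASK`, so they never alias a slot bit.
    assert_eq!(RELEASED & READY_MASK, 0);
    assert_eq!(TX_CLOSED & READY_MASK, 0);
    assert!(is_tx_closed(TX_CLOSED));
    assert!(!is_tx_closed(READY_MASK));
}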