1 //! This module defines an `IdleNotifiedSet`, which is a collection of elements.
2 //! Each element is intended to correspond to a task, and the collection will
3 //! keep track of which tasks have had their waker notified, and which have not.
4 //!
5 //! Each entry in the set holds some user-specified value. The value's type is
6 //! specified using the `T` parameter. It will usually be a `JoinHandle` or
7 //! similar.
8 
9 use std::marker::PhantomPinned;
10 use std::mem::ManuallyDrop;
11 use std::ptr::NonNull;
12 use std::task::{Context, Waker};
13 
14 use crate::loom::cell::UnsafeCell;
15 use crate::loom::sync::{Arc, Mutex};
16 use crate::util::linked_list::{self, Link};
17 use crate::util::{waker_ref, Wake};
18 
/// Linked list of `ListEntry<T>` nodes. The list pointers live inside each
/// entry (its `pointers` field) and the list handles are `Arc<ListEntry<T>>`
/// (see the `Link` impl for `ListEntry`), so a list holds a strong reference
/// to every entry it contains.
type LinkedList<T> =
    linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
21 
/// This is the main handle to the collection.
///
/// Values are added with `insert_idle`, and dropping the handle drains both
/// lists (see the `Drop` impl).
pub(crate) struct IdleNotifiedSet<T> {
    /// The idle and notified lists plus the set's waker, shared with every
    /// `ListEntry` through its `parent` field.
    lists: Arc<Lists<T>>,
    /// Number of entries currently stored in the idle and notified lists
    /// combined. It lives outside the mutex, so `len` and `is_empty` do not
    /// need to lock.
    length: usize,
}
27 
/// A handle to an entry that is guaranteed to be stored in the idle or notified
/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
/// mutably to prevent the entry from being moved to the `Neither` list, which
/// only the `IdleNotifiedSet` may do.
///
/// The main consequence of being stored in one of the lists is that the `value`
/// field has not yet been consumed.
///
/// Note: This entry can be moved from the idle to the notified list while this
/// object exists by waking its waker.
pub(crate) struct EntryInOneOfTheLists<'a, T> {
    /// The entry this handle refers to.
    entry: Arc<ListEntry<T>>,
    /// The set that owns `entry`. Holding it mutably is what grants this
    /// handle access to the entry's value.
    set: &'a mut IdleNotifiedSet<T>,
}
42 
/// The state shared between the set and its entries: both lists and the set's
/// waker, all behind a single mutex.
type Lists<T> = Mutex<ListsInner<T>>;
44 
/// The linked lists hold strong references to the `ListEntry` items, and the
/// `ListEntry` items also hold a strong reference back to the Lists object, but
/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
/// that object is destroyed, no ref-cycles will remain.
struct ListsInner<T> {
    /// Entries whose waker has been woken since they were last popped.
    notified: LinkedList<T>,
    /// Entries that have not been woken since they were inserted or last
    /// popped from the `notified` list.
    idle: LinkedList<T>,
    /// The waker most recently passed to `pop_notified`. Whenever an entry's
    /// waker moves that entry from the `idle` list to the `notified` list,
    /// this waker is taken (consumed) and woken, if it exists.
    waker: Option<Waker>,
}
56 
/// Which of the two lists in the shared Lists object is this entry stored in?
///
/// If the value is `Idle`, then an entry's waker may move it to the notified
/// list. Otherwise, only the `IdleNotifiedSet` may move it.
///
/// If the value is `Neither`, then it is still possible that the entry is in
/// some third external list (this happens in `drain`).
#[derive(Copy, Clone, Eq, PartialEq)]
enum List {
    /// The entry is in the `notified` list of its parent.
    Notified,
    /// The entry is in the `idle` list of its parent.
    Idle,
    /// The entry is in neither of the two shared lists; its value has been,
    /// or is about to be, taken out.
    Neither,
}
70 
/// An entry in the list.
///
/// # Safety
///
/// The `my_list` field must only be accessed while holding the mutex in
/// `parent`. It is an invariant that the value of `my_list` corresponds to
/// which linked list in the `parent` holds this entry. Once this field takes
/// the value `Neither`, then it may never be modified again.
///
/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
/// must only be accessed while holding the mutex. If the value of `my_list` is
/// `Neither`, then the `pointers` field may be accessed by the
/// `IdleNotifiedSet` (this happens inside `drain`).
///
/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
/// `Neither` assumes ownership of the `value`, and it must either drop it or
/// move it out from this entry to prevent it from getting leaked. (Since the
/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
/// value should not be leaked.)
struct ListEntry<T> {
    /// The linked list pointers of the list this entry is in.
    pointers: linked_list::Pointers<ListEntry<T>>,
    /// Pointer to the shared `Lists` struct.
    parent: Arc<Lists<T>>,
    /// The value stored in this entry.
    ///
    /// It is wrapped in `ManuallyDrop`, so dropping the entry itself never
    /// runs the destructor of `T`; the value has to be taken out explicitly.
    value: UnsafeCell<ManuallyDrop<T>>,
    /// Used to remember which list this entry is in.
    my_list: UnsafeCell<List>,
    /// Required by the `linked_list::Pointers` field. Makes the entry
    /// `!Unpin`, as the `Link` impl for this type requires.
    _pin: PhantomPinned,
}
103 
// Generates `ListEntry::addr_of_pointers`, which the `Link::pointers` impl for
// `ListEntry` calls to get from a pointer to an entry to a pointer to its
// `pointers` field.
//
// NOTE(review): presumably the macro computes the field address without
// creating an intermediate reference to the whole entry -- confirm against the
// definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<T> ListEntry<T> {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
            &self.pointers
        }
    }
}
111 
// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
// the values (`for_each`) and even take them out by value (`drain`,
// `EntryInOneOfTheLists::remove`), so moving the set to another thread
// requires `T: Send`.
unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
// With the current API we strictly speaking don't even need `T: Sync`, because
// the only `&self` methods (`len` and `is_empty`) never touch the values, but
// we require it anyway to support adding &self APIs that access the values in
// the future.
unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
119 
// These impls control when it is safe to create a Waker. Since the waker does
// not allow access to the value in any way (including its destructor), it is
// not necessary for `T` to be Send or Sync.
//
// Concretely: the `Wake` impl only touches `parent`, `my_list` and the list
// pointers, all while holding the mutex, and because `value` is wrapped in
// `ManuallyDrop`, dropping the last `Arc<ListEntry<T>>` from a waker does not
// run the destructor of `T`.
unsafe impl<T> Send for ListEntry<T> {}
unsafe impl<T> Sync for ListEntry<T> {}
125 
126 impl<T> IdleNotifiedSet<T> {
127     /// Create a new `IdleNotifiedSet`.
new() -> Self128     pub(crate) fn new() -> Self {
129         let lists = Mutex::new(ListsInner {
130             notified: LinkedList::new(),
131             idle: LinkedList::new(),
132             waker: None,
133         });
134 
135         IdleNotifiedSet {
136             lists: Arc::new(lists),
137             length: 0,
138         }
139     }
140 
    /// Returns the number of entries stored in the set, counting both the
    /// idle and the notified list. Reads the cached `length` field, so the
    /// mutex is not taken.
    pub(crate) fn len(&self) -> usize {
        self.length
    }
144 
    /// Returns `true` if the set holds no entries in either list. Reads the
    /// cached `length` field, so the mutex is not taken.
    pub(crate) fn is_empty(&self) -> bool {
        self.length == 0
    }
148 
149     /// Insert the given value into the `idle` list.
insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T>150     pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
151         self.length += 1;
152 
153         let entry = Arc::new(ListEntry {
154             parent: self.lists.clone(),
155             value: UnsafeCell::new(ManuallyDrop::new(value)),
156             my_list: UnsafeCell::new(List::Idle),
157             pointers: linked_list::Pointers::new(),
158             _pin: PhantomPinned,
159         });
160 
161         {
162             let mut lock = self.lists.lock();
163             lock.idle.push_front(entry.clone());
164         }
165 
166         // Safety: We just put the entry in the idle list, so it is in one of the lists.
167         EntryInOneOfTheLists { entry, set: self }
168     }
169 
170     /// Pop an entry from the notified list to poll it. The entry is moved to
171     /// the idle list atomically.
pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>>172     pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
173         // We don't decrement the length because this call moves the entry to
174         // the idle list rather than removing it.
175         if self.length == 0 {
176             // Fast path.
177             return None;
178         }
179 
180         let mut lock = self.lists.lock();
181 
182         let should_update_waker = match lock.waker.as_mut() {
183             Some(cur_waker) => !waker.will_wake(cur_waker),
184             None => true,
185         };
186         if should_update_waker {
187             lock.waker = Some(waker.clone());
188         }
189 
190         // Pop the entry, returning None if empty.
191         let entry = lock.notified.pop_back()?;
192 
193         lock.idle.push_front(entry.clone());
194 
195         // Safety: We are holding the lock.
196         entry.my_list.with_mut(|ptr| unsafe {
197             *ptr = List::Idle;
198         });
199 
200         drop(lock);
201 
202         // Safety: We just put the entry in the idle list, so it is in one of the lists.
203         Some(EntryInOneOfTheLists { entry, set: self })
204     }
205 
206     /// Tries to pop an entry from the notified list to poll it. The entry is moved to
207     /// the idle list atomically.
try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>>208     pub(crate) fn try_pop_notified(&mut self) -> Option<EntryInOneOfTheLists<'_, T>> {
209         // We don't decrement the length because this call moves the entry to
210         // the idle list rather than removing it.
211         if self.length == 0 {
212             // Fast path.
213             return None;
214         }
215 
216         let mut lock = self.lists.lock();
217 
218         // Pop the entry, returning None if empty.
219         let entry = lock.notified.pop_back()?;
220 
221         lock.idle.push_front(entry.clone());
222 
223         // Safety: We are holding the lock.
224         entry.my_list.with_mut(|ptr| unsafe {
225             *ptr = List::Idle;
226         });
227 
228         drop(lock);
229 
230         // Safety: We just put the entry in the idle list, so it is in one of the lists.
231         Some(EntryInOneOfTheLists { entry, set: self })
232     }
233 
234     /// Call a function on every element in this list.
for_each<F: FnMut(&mut T)>(&mut self, mut func: F)235     pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
236         fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
237             let mut node = list.last();
238 
239             while let Some(entry) = node {
240                 ptrs.push(entry.value.with_mut(|ptr| {
241                     let ptr: *mut ManuallyDrop<T> = ptr;
242                     let ptr: *mut T = ptr.cast();
243                     ptr
244                 }));
245 
246                 let prev = entry.pointers.get_prev();
247                 node = prev.map(|prev| unsafe { &*prev.as_ptr() });
248             }
249         }
250 
251         // Atomically get a raw pointer to the value of every entry.
252         //
253         // Since this only locks the mutex once, it is not possible for a value
254         // to get moved from the idle list to the notified list during the
255         // operation, which would otherwise result in some value being listed
256         // twice.
257         let mut ptrs = Vec::with_capacity(self.len());
258         {
259             let mut lock = self.lists.lock();
260 
261             get_ptrs(&mut lock.idle, &mut ptrs);
262             get_ptrs(&mut lock.notified, &mut ptrs);
263         }
264         debug_assert_eq!(ptrs.len(), ptrs.capacity());
265 
266         for ptr in ptrs {
267             // Safety: When we grabbed the pointers, the entries were in one of
268             // the two lists. This means that their value was valid at the time,
269             // and it must still be valid because we are the IdleNotifiedSet,
270             // and only we can remove an entry from the two lists. (It's
271             // possible that an entry is moved from one list to the other during
272             // this loop, but that is ok.)
273             func(unsafe { &mut *ptr });
274         }
275     }
276 
    /// Remove all entries in both lists, applying some function to each element.
    ///
    /// The closure is called on all elements even if it panics. Having it panic
    /// twice is a double-panic, and will abort the application.
    ///
    /// `func` receives every value by value, so it takes over responsibility
    /// for dropping it. Once this returns (or unwinds), both lists are empty
    /// and `len()` is 0.
    pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
        if self.length == 0 {
            // Fast path.
            return;
        }
        // Every entry leaves the two shared lists below, so the count is reset
        // up front; it is then already correct if `func` panics later on.
        self.length = 0;

        // The LinkedList is not cleared on panic, so we use a bomb to clear it.
        //
        // This value has the invariant that any entry in its `all_entries` list
        // has `my_list` set to `Neither` and that the value has not yet been
        // dropped.
        struct AllEntries<T, F: FnMut(T)> {
            /// Entries taken out of the shared lists whose value has not been
            /// handed to `func` yet.
            all_entries: LinkedList<T>,
            /// The user callback that consumes each value.
            func: F,
        }

        impl<T, F: FnMut(T)> AllEntries<T, F> {
            /// Pops one entry and passes its value to `func`. Returns `false`
            /// once the list is empty.
            fn pop_next(&mut self) -> bool {
                if let Some(entry) = self.all_entries.pop_back() {
                    // Safety: We just took this value from the list, so we can
                    // destroy the value in the entry.
                    entry
                        .value
                        .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
                    true
                } else {
                    false
                }
            }
        }

        impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
            /// Hands any remaining values to `func`. This is what keeps the
            /// drain going when an earlier call to `func` panicked.
            fn drop(&mut self) {
                while self.pop_next() {}
            }
        }

        let mut all_entries = AllEntries {
            all_entries: LinkedList::new(),
            func,
        };

        // Atomically move all entries to the new linked list in the AllEntries
        // object.
        {
            let mut lock = self.lists.lock();
            unsafe {
                // Safety: We are holding the lock and `all_entries` is a new
                // LinkedList.
                move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
                move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
            }
        }

        // Keep destroying entries in the list until it is empty.
        //
        // If the closure panics, then the destructor of the `AllEntries` bomb
        // ensures that we keep running the destructor on the remaining values.
        // A second panic will abort the program.
        while all_entries.pop_next() {}
    }
343 }
344 
345 /// # Safety
346 ///
347 /// The mutex for the entries must be held, and the target list must be such
348 /// that setting `my_list` to `Neither` is ok.
move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>)349 unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
350     while let Some(entry) = from.pop_back() {
351         entry.my_list.with_mut(|ptr| {
352             *ptr = List::Neither;
353         });
354         to.push_front(entry);
355     }
356 }
357 
358 impl<'a, T> EntryInOneOfTheLists<'a, T> {
359     /// Remove this entry from the list it is in, returning the value associated
360     /// with the entry.
361     ///
362     /// This consumes the value, since it is no longer guaranteed to be in a
363     /// list.
remove(self) -> T364     pub(crate) fn remove(self) -> T {
365         self.set.length -= 1;
366 
367         {
368             let mut lock = self.set.lists.lock();
369 
370             // Safety: We are holding the lock so there is no race, and we will
371             // remove the entry afterwards to uphold invariants.
372             let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
373                 let old_my_list = *ptr;
374                 *ptr = List::Neither;
375                 old_my_list
376             });
377 
378             let list = match old_my_list {
379                 List::Idle => &mut lock.idle,
380                 List::Notified => &mut lock.notified,
381                 // An entry in one of the lists is in one of the lists.
382                 List::Neither => unreachable!(),
383             };
384 
385             unsafe {
386                 // Safety: We just checked that the entry is in this particular
387                 // list.
388                 list.remove(ListEntry::as_raw(&self.entry)).unwrap();
389             }
390         }
391 
392         // By setting `my_list` to `Neither`, we have taken ownership of the
393         // value. We return it to the caller.
394         //
395         // Safety: We have a mutable reference to the `IdleNotifiedSet` that
396         // owns this entry, so we can use its permission to access the value.
397         self.entry
398             .value
399             .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
400     }
401 
402     /// Access the value in this entry together with a context for its waker.
with_value_and_context<F, U>(&mut self, func: F) -> U where F: FnOnce(&mut T, &mut Context<'_>) -> U, T: 'static,403     pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
404     where
405         F: FnOnce(&mut T, &mut Context<'_>) -> U,
406         T: 'static,
407     {
408         let waker = waker_ref(&self.entry);
409 
410         let mut context = Context::from_waker(&waker);
411 
412         // Safety: We have a mutable reference to the `IdleNotifiedSet` that
413         // owns this entry, so we can use its permission to access the value.
414         self.entry
415             .value
416             .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
417     }
418 }
419 
impl<T> Drop for IdleNotifiedSet<T> {
    /// Drops every value still stored in the set and empties both lists,
    /// which removes the strong references the lists hold to the entries.
    fn drop(&mut self) {
        // Clear both lists.
        self.drain(drop);

        // Debug builds double-check that draining really emptied both lists.
        // The check is skipped while the thread is already panicking --
        // presumably so that a failing assertion cannot turn the unwind into
        // a double panic.
        #[cfg(debug_assertions)]
        if !std::thread::panicking() {
            let lock = self.lists.lock();
            assert!(lock.idle.is_empty());
            assert!(lock.notified.is_empty());
        }
    }
}
433 
434 impl<T: 'static> Wake for ListEntry<T> {
wake_by_ref(me: &Arc<Self>)435     fn wake_by_ref(me: &Arc<Self>) {
436         let mut lock = me.parent.lock();
437 
438         // Safety: We are holding the lock and we will update the lists to
439         // maintain invariants.
440         let old_my_list = me.my_list.with_mut(|ptr| unsafe {
441             let old_my_list = *ptr;
442             if old_my_list == List::Idle {
443                 *ptr = List::Notified;
444             }
445             old_my_list
446         });
447 
448         if old_my_list == List::Idle {
449             // We move ourself to the notified list.
450             let me = unsafe {
451                 // Safety: We just checked that we are in this particular list.
452                 lock.idle.remove(ListEntry::as_raw(me)).unwrap()
453             };
454             lock.notified.push_front(me);
455 
456             if let Some(waker) = lock.waker.take() {
457                 drop(lock);
458                 waker.wake();
459             }
460         }
461     }
462 
wake(me: Arc<Self>)463     fn wake(me: Arc<Self>) {
464         Self::wake_by_ref(&me);
465     }
466 }
467 
/// # Safety
///
/// `ListEntry` is forced to be !Unpin.
unsafe impl<T> linked_list::Link for ListEntry<T> {
    /// Lists refer to their entries through `Arc` handles.
    type Handle = Arc<ListEntry<T>>;
    /// The node type that carries the list pointers.
    type Target = ListEntry<T>;

    /// Returns the address of the entry behind `handle`. The handle is only
    /// borrowed, so its reference count is left unchanged.
    fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
        let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
        // Safety: We can't get a null pointer from `Arc::as_ptr`.
        unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
    }

    /// Rebuilds the `Arc` handle for the entry at `ptr`.
    ///
    /// NOTE(review): presumably `ptr` must originate from a handle that was
    /// previously handed over to a list, so that a strong count is available
    /// to reclaim -- confirm against the `Link` trait documentation.
    unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
        Arc::from_raw(ptr.as_ptr())
    }

    /// Returns a pointer to the `pointers` field of the entry at `target`,
    /// using the generated `ListEntry::addr_of_pointers` accessor.
    unsafe fn pointers(
        target: NonNull<ListEntry<T>>,
    ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
        ListEntry::addr_of_pointers(target)
    }
}
491 
#[cfg(all(test, not(loom)))]
mod tests {
    use crate::runtime::Builder;
    use crate::task::JoinSet;

    // A test that runs under miri.
    //
    // https://github.com/tokio-rs/tokio/pull/5693
    //
    // Spawns a single, already-completed future into a `JoinSet` on a
    // current-thread runtime and joins it, unwrapping both the `Option` and
    // the join result.
    //
    // NOTE(review): presumably `JoinSet` is backed by `IdleNotifiedSet`, which
    // is why this test lives in this module -- confirm against `JoinSet`.
    #[test]
    fn join_set_test() {
        let rt = Builder::new_current_thread().build().unwrap();

        let mut set = JoinSet::new();
        set.spawn_on(futures::future::ready(()), rt.handle());

        rt.block_on(set.join_next()).unwrap().unwrap();
    }
}
510