1 // Allow `unreachable_pub` warnings when sync is not enabled
2 // due to the usage of `Notify` within the `rt` feature set.
3 // When this module is compiled with `sync` enabled we will warn on
4 // this lint. When `rt` is enabled we use `pub(crate)` which
5 // triggers this warning but it is safe to ignore in this case.
6 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7 
8 use crate::loom::cell::UnsafeCell;
9 use crate::loom::sync::atomic::AtomicUsize;
10 use crate::loom::sync::Mutex;
11 use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12 use crate::util::WakeList;
13 
14 use std::future::Future;
15 use std::marker::PhantomPinned;
16 use std::panic::{RefUnwindSafe, UnwindSafe};
17 use std::pin::Pin;
18 use std::ptr::NonNull;
19 use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20 use std::task::{Context, Poll, Waker};
21 
// Intrusive list of `Waiter` nodes; this is the list type stored behind the
// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
// Guarded variant of the same list, produced by `WaitList::into_guarded` in
// `NotifyWaitersList::new` and used while `notify_waiters` drains the waiters.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
24 
25 /// Notifies a single task to wake up.
26 ///
27 /// `Notify` provides a basic mechanism to notify a single task of an event.
28 /// `Notify` itself does not carry any data. Instead, it is to be used to signal
29 /// another task to perform an operation.
30 ///
31 /// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
32 /// [`notified().await`] method waits for a permit to become available, and
33 /// [`notify_one()`] sets a permit **if there currently are no available
34 /// permits**.
35 ///
36 /// The synchronization details of `Notify` are similar to
37 /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
38 /// value contains a single permit. [`notified().await`] waits for the permit to
39 /// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
40 /// the permit, waking a pending task if there is one.
41 ///
42 /// If `notify_one()` is called **before** `notified().await`, then the next
43 /// call to `notified().await` will complete immediately, consuming the permit.
44 /// Any subsequent calls to `notified().await` will wait for a new permit.
45 ///
46 /// If `notify_one()` is called **multiple** times before `notified().await`,
47 /// only a **single** permit is stored. The next call to `notified().await` will
48 /// complete immediately, but the one after will wait for a new permit.
49 ///
50 /// # Examples
51 ///
52 /// Basic usage.
53 ///
54 /// ```
55 /// use tokio::sync::Notify;
56 /// use std::sync::Arc;
57 ///
58 /// #[tokio::main]
59 /// async fn main() {
60 ///     let notify = Arc::new(Notify::new());
61 ///     let notify2 = notify.clone();
62 ///
63 ///     let handle = tokio::spawn(async move {
64 ///         notify2.notified().await;
65 ///         println!("received notification");
66 ///     });
67 ///
68 ///     println!("sending notification");
69 ///     notify.notify_one();
70 ///
71 ///     // Wait for task to receive notification.
72 ///     handle.await.unwrap();
73 /// }
74 /// ```
75 ///
/// Unbounded multi-producer single-consumer (mpsc) channel.
77 ///
78 /// No wakeups can be lost when using this channel because the call to
79 /// `notify_one()` will store a permit in the `Notify`, which the following call
80 /// to `notified()` will consume.
81 ///
82 /// ```
83 /// use tokio::sync::Notify;
84 ///
85 /// use std::collections::VecDeque;
86 /// use std::sync::Mutex;
87 ///
88 /// struct Channel<T> {
89 ///     values: Mutex<VecDeque<T>>,
90 ///     notify: Notify,
91 /// }
92 ///
93 /// impl<T> Channel<T> {
94 ///     pub fn send(&self, value: T) {
95 ///         self.values.lock().unwrap()
96 ///             .push_back(value);
97 ///
98 ///         // Notify the consumer a value is available
99 ///         self.notify.notify_one();
100 ///     }
101 ///
102 ///     // This is a single-consumer channel, so several concurrent calls to
103 ///     // `recv` are not allowed.
104 ///     pub async fn recv(&self) -> T {
105 ///         loop {
106 ///             // Drain values
107 ///             if let Some(value) = self.values.lock().unwrap().pop_front() {
108 ///                 return value;
109 ///             }
110 ///
111 ///             // Wait for values to be available
112 ///             self.notify.notified().await;
113 ///         }
114 ///     }
115 /// }
116 /// ```
117 ///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
119 ///
120 /// The call to [`enable`] is important because otherwise if you have two
121 /// calls to `recv` and two calls to `send` in parallel, the following could
122 /// happen:
123 ///
124 ///  1. Both calls to `try_recv` return `None`.
125 ///  2. Both new elements are added to the vector.
126 ///  3. The `notify_one` method is called twice, adding only a single
127 ///     permit to the `Notify`.
128 ///  4. Both calls to `recv` reach the `Notified` future. One of them
129 ///     consumes the permit, and the other sleeps forever.
130 ///
131 /// By adding the `Notified` futures to the list by calling `enable` before
132 /// `try_recv`, the `notify_one` calls in step three would remove the
133 /// futures from the list and mark them notified instead of adding a permit
134 /// to the `Notify`. This ensures that both futures are woken.
135 ///
136 /// Notice that this failure can only happen if there are two concurrent calls
137 /// to `recv`. This is why the mpsc example above does not require a call to
138 /// `enable`.
139 ///
140 /// ```
141 /// use tokio::sync::Notify;
142 ///
143 /// use std::collections::VecDeque;
144 /// use std::sync::Mutex;
145 ///
146 /// struct Channel<T> {
147 ///     messages: Mutex<VecDeque<T>>,
148 ///     notify_on_sent: Notify,
149 /// }
150 ///
151 /// impl<T> Channel<T> {
152 ///     pub fn send(&self, msg: T) {
153 ///         let mut locked_queue = self.messages.lock().unwrap();
154 ///         locked_queue.push_back(msg);
155 ///         drop(locked_queue);
156 ///
157 ///         // Send a notification to one of the calls currently
158 ///         // waiting in a call to `recv`.
159 ///         self.notify_on_sent.notify_one();
160 ///     }
161 ///
162 ///     pub fn try_recv(&self) -> Option<T> {
163 ///         let mut locked_queue = self.messages.lock().unwrap();
164 ///         locked_queue.pop_front()
165 ///     }
166 ///
167 ///     pub async fn recv(&self) -> T {
168 ///         let future = self.notify_on_sent.notified();
169 ///         tokio::pin!(future);
170 ///
171 ///         loop {
172 ///             // Make sure that no wakeup is lost if we get
173 ///             // `None` from `try_recv`.
174 ///             future.as_mut().enable();
175 ///
176 ///             if let Some(msg) = self.try_recv() {
177 ///                 return msg;
178 ///             }
179 ///
180 ///             // Wait for a call to `notify_one`.
181 ///             //
182 ///             // This uses `.as_mut()` to avoid consuming the future,
183 ///             // which lets us call `Pin::set` below.
184 ///             future.as_mut().await;
185 ///
186 ///             // Reset the future in case another call to
187 ///             // `try_recv` got the message before us.
188 ///             future.set(self.notify_on_sent.notified());
189 ///         }
190 ///     }
191 /// }
192 /// ```
193 ///
194 /// [park]: std::thread::park
195 /// [unpark]: std::thread::Thread::unpark
196 /// [`notified().await`]: Notify::notified()
197 /// [`notify_one()`]: Notify::notify_one()
198 /// [`enable`]: Notified::enable()
199 /// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // `state` uses 2 bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // List of waiters, guarded by a mutex. `notify_one` / `notify_last`
    // dequeue a single entry from it, while `notify_waiters` takes the
    // whole list at once.
    waiters: Mutex<WaitList>,
}
215 
/// A single entry in the wait list of a `Notify`.
///
/// A `Waiter` is embedded in every `Notified` future and is also used as the
/// pinned guard node in `Notify::notify_waiters`.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. Depending on the value of `notification`,
    /// this field is either protected by the `waiters` lock in
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
    waker: UnsafeCell<Option<Waker>>,

    /// Notification for this waiter. Uses 2 bits to store whether and how
    /// the waiter was notified, 1 bit to store whether it was woken up using
    /// FIFO or LIFO, and the rest of the bits are unused.
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
    /// * if it's `Some`, then `waker` is exclusively owned by the
    ///   enclosing `Waiter` and can be accessed without locking.
    notification: AtomicNotification,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
237 
238 impl Waiter {
new() -> Waiter239     fn new() -> Waiter {
240         Waiter {
241             pointers: linked_list::Pointers::new(),
242             waker: UnsafeCell::new(None),
243             notification: AtomicNotification::none(),
244             _p: PhantomPinned,
245         }
246     }
247 }
248 
// Generates the `Waiter::addr_of_pointers` accessor for the `pointers` field.
// NOTE(review): presumably the macro projects the `NonNull<Waiter>` to the
// field without creating an intermediate reference — confirm against the
// definition of `generate_addr_of_methods!`.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
256 
// Encodings stored in `AtomicNotification`. `NOTIFICATION_ONE` and
// `NOTIFICATION_LAST` differ only in bit 2, which distinguishes the LIFO
// variant of a single-waiter notification from the FIFO one.

// No notification.
const NOTIFICATION_NONE: usize = 0b000;

// Notification type used by `notify_one` (FIFO strategy).
const NOTIFICATION_ONE: usize = 0b001;

// Notification type used by `notify_last` (LIFO strategy).
const NOTIFICATION_LAST: usize = 0b101;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 0b010;
268 
/// Notification for a `Waiter`.
///
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations. The stored value is always
/// one of the `NOTIFICATION_*` constants.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);
274 
275 impl AtomicNotification {
none() -> Self276     fn none() -> Self {
277         AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
278     }
279 
280     /// Store-release a notification.
281     /// This method should be called exactly once.
store_release(&self, notification: Notification)282     fn store_release(&self, notification: Notification) {
283         let data: usize = match notification {
284             Notification::All => NOTIFICATION_ALL,
285             Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
286             Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
287         };
288         self.0.store(data, Release);
289     }
290 
load(&self, ordering: Ordering) -> Option<Notification>291     fn load(&self, ordering: Ordering) -> Option<Notification> {
292         let data = self.0.load(ordering);
293         match data {
294             NOTIFICATION_NONE => None,
295             NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
296             NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
297             NOTIFICATION_ALL => Some(Notification::All),
298             _ => unreachable!(),
299         }
300     }
301 
302     /// Clears the notification.
303     /// This method is used by a `Notified` future to consume the
304     /// notification. It uses relaxed ordering and should be only
305     /// used once the atomic notification is no longer shared.
clear(&self)306     fn clear(&self) {
307         self.0.store(NOTIFICATION_NONE, Relaxed);
308     }
309 }
310 
/// Selects which waiter a single-waiter notification dequeues.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
    /// Used by `Notify::notify_one`.
    Fifo,
    /// Used by `Notify::notify_last`.
    Lifo,
}
317 
/// How a waiter was notified; this is the decoded form of the value kept in
/// `AtomicNotification`.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
    /// A single waiter was dequeued with the given strategy.
    One(NotifyOneStrategy),
    /// Stored on every waiter drained by `notify_waiters`.
    All,
}
324 
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
/// and gates the access to it on `notify.waiters` mutex. It also empties
/// the list on drop.
struct NotifyWaitersList<'a> {
    /// The guarded list holding the waiters taken out of `Notify::waiters`.
    list: GuardedWaitList,
    /// Set once `pop_back_locked` has observed an empty list, which lets the
    /// destructor skip taking the lock.
    is_empty: bool,
    /// The `Notify` whose `waiters` mutex protects `list`.
    notify: &'a Notify,
}
333 
334 impl<'a> NotifyWaitersList<'a> {
new( unguarded_list: WaitList, guard: Pin<&'a Waiter>, notify: &'a Notify, ) -> NotifyWaitersList<'a>335     fn new(
336         unguarded_list: WaitList,
337         guard: Pin<&'a Waiter>,
338         notify: &'a Notify,
339     ) -> NotifyWaitersList<'a> {
340         let guard_ptr = NonNull::from(guard.get_ref());
341         let list = unguarded_list.into_guarded(guard_ptr);
342         NotifyWaitersList {
343             list,
344             is_empty: false,
345             notify,
346         }
347     }
348 
349     /// Removes the last element from the guarded list. Modifying this list
350     /// requires an exclusive access to the main list in `Notify`.
pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>>351     fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
352         let result = self.list.pop_back();
353         if result.is_none() {
354             // Save information about emptiness to avoid waiting for lock
355             // in the destructor.
356             self.is_empty = true;
357         }
358         result
359     }
360 }
361 
362 impl Drop for NotifyWaitersList<'_> {
drop(&mut self)363     fn drop(&mut self) {
364         // If the list is not empty, we unlink all waiters from it.
365         // We do not wake the waiters to avoid double panics.
366         if !self.is_empty {
367             let _lock_guard = self.notify.waiters.lock();
368             while let Some(waiter) = self.list.pop_back() {
369                 // Safety: we never make mutable references to waiters.
370                 let waiter = unsafe { waiter.as_ref() };
371                 waiter.notification.store_release(Notification::All);
372             }
373         }
374     }
375 }
376 
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process. Starts out as
    /// `State::Init`.
    state: State,

    /// Number of calls to `notify_waiters` at the time of creation, as read
    /// from the counter bits of `Notify::state`.
    notify_waiters_calls: usize,

    /// Entry in the waiter `LinkedList`.
    waiter: Waiter,
}
396 
// `Notified` embeds a `Waiter`, which holds an `UnsafeCell` and intrusive
// list pointers, so these impls are written by hand. The `Waiter` field docs
// state that `waker` is either protected by the `waiters` lock or exclusively
// owned by the waiter.
// NOTE(review): the full soundness argument depends on how the `Notified`
// impl accesses the waiter — confirm against that code.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
399 
/// Progress of a `Notified` future.
#[derive(Debug)]
enum State {
    /// Initial state, set by `Notify::notified()`.
    Init,
    /// NOTE(review): presumably the waiter is registered and awaiting a
    /// notification — confirm against the `Notified` impl.
    Waiting,
    /// NOTE(review): presumably the future has completed — confirm against
    /// the `Notified` impl.
    Done,
}
406 
/// Number of low bits of `Notify::state` that hold the state; the
/// `notify_waiters` call counter is stored in the bits above them.
const NOTIFY_WAITERS_SHIFT: usize = 2;
/// Mask selecting the state bits.
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
/// Mask selecting the `notify_waiters` call counter bits.
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification.
const NOTIFIED: usize = 2;
419 
set_state(data: usize, state: usize) -> usize420 fn set_state(data: usize, state: usize) -> usize {
421     (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
422 }
423 
get_state(data: usize) -> usize424 fn get_state(data: usize) -> usize {
425     data & STATE_MASK
426 }
427 
get_num_notify_waiters_calls(data: usize) -> usize428 fn get_num_notify_waiters_calls(data: usize) -> usize {
429     (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
430 }
431 
inc_num_notify_waiters_calls(data: usize) -> usize432 fn inc_num_notify_waiters_calls(data: usize) -> usize {
433     data + (1 << NOTIFY_WAITERS_SHIFT)
434 }
435 
atomic_inc_num_notify_waiters_calls(data: &AtomicUsize)436 fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
437     data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
438 }
439 
440 impl Notify {
441     /// Create a new `Notify`, initialized without a permit.
442     ///
443     /// # Examples
444     ///
445     /// ```
446     /// use tokio::sync::Notify;
447     ///
448     /// let notify = Notify::new();
449     /// ```
new() -> Notify450     pub fn new() -> Notify {
451         Notify {
452             state: AtomicUsize::new(0),
453             waiters: Mutex::new(LinkedList::new()),
454         }
455     }
456 
457     /// Create a new `Notify`, initialized without a permit.
458     ///
459     /// When using the `tracing` [unstable feature], a `Notify` created with
460     /// `const_new` will not be instrumented. As such, it will not be visible
461     /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
462     /// an instrumented object if that is needed.
463     ///
464     /// # Examples
465     ///
466     /// ```
467     /// use tokio::sync::Notify;
468     ///
469     /// static NOTIFY: Notify = Notify::const_new();
470     /// ```
471     ///
472     /// [`tokio-console`]: https://github.com/tokio-rs/console
473     /// [unstable feature]: crate#unstable-features
474     #[cfg(not(all(loom, test)))]
const_new() -> Notify475     pub const fn const_new() -> Notify {
476         Notify {
477             state: AtomicUsize::new(0),
478             waiters: Mutex::const_new(LinkedList::new()),
479         }
480     }
481 
482     /// Wait for a notification.
483     ///
484     /// Equivalent to:
485     ///
486     /// ```ignore
487     /// async fn notified(&self);
488     /// ```
489     ///
490     /// Each `Notify` value holds a single permit. If a permit is available from
491     /// an earlier call to [`notify_one()`], then `notified().await` will complete
492     /// immediately, consuming that permit. Otherwise, `notified().await` waits
493     /// for a permit to be made available by the next call to `notify_one()`.
494     ///
495     /// The `Notified` future is not guaranteed to receive wakeups from calls to
496     /// `notify_one()` if it has not yet been polled. See the documentation for
497     /// [`Notified::enable()`] for more details.
498     ///
499     /// The `Notified` future is guaranteed to receive wakeups from
500     /// `notify_waiters()` as soon as it has been created, even if it has not
501     /// yet been polled.
502     ///
503     /// [`notify_one()`]: Notify::notify_one
504     /// [`Notified::enable()`]: Notified::enable
505     ///
506     /// # Cancel safety
507     ///
508     /// This method uses a queue to fairly distribute notifications in the order
509     /// they were requested. Cancelling a call to `notified` makes you lose your
510     /// place in the queue.
511     ///
512     /// # Examples
513     ///
514     /// ```
515     /// use tokio::sync::Notify;
516     /// use std::sync::Arc;
517     ///
518     /// #[tokio::main]
519     /// async fn main() {
520     ///     let notify = Arc::new(Notify::new());
521     ///     let notify2 = notify.clone();
522     ///
523     ///     tokio::spawn(async move {
524     ///         notify2.notified().await;
525     ///         println!("received notification");
526     ///     });
527     ///
528     ///     println!("sending notification");
529     ///     notify.notify_one();
530     /// }
531     /// ```
notified(&self) -> Notified<'_>532     pub fn notified(&self) -> Notified<'_> {
533         // we load the number of times notify_waiters
534         // was called and store that in the future.
535         let state = self.state.load(SeqCst);
536         Notified {
537             notify: self,
538             state: State::Init,
539             notify_waiters_calls: get_num_notify_waiters_calls(state),
540             waiter: Waiter::new(),
541         }
542     }
543 
544     /// Notifies the first waiting task.
545     ///
546     /// If a task is currently waiting, that task is notified. Otherwise, a
547     /// permit is stored in this `Notify` value and the **next** call to
548     /// [`notified().await`] will complete immediately consuming the permit made
549     /// available by this call to `notify_one()`.
550     ///
551     /// At most one permit may be stored by `Notify`. Many sequential calls to
552     /// `notify_one` will result in a single permit being stored. The next call to
553     /// `notified().await` will complete immediately, but the one after that
554     /// will wait.
555     ///
556     /// [`notified().await`]: Notify::notified()
557     ///
558     /// # Examples
559     ///
560     /// ```
561     /// use tokio::sync::Notify;
562     /// use std::sync::Arc;
563     ///
564     /// #[tokio::main]
565     /// async fn main() {
566     ///     let notify = Arc::new(Notify::new());
567     ///     let notify2 = notify.clone();
568     ///
569     ///     tokio::spawn(async move {
570     ///         notify2.notified().await;
571     ///         println!("received notification");
572     ///     });
573     ///
574     ///     println!("sending notification");
575     ///     notify.notify_one();
576     /// }
577     /// ```
578     // Alias for old name in 0.x
579     #[cfg_attr(docsrs, doc(alias = "notify"))]
notify_one(&self)580     pub fn notify_one(&self) {
581         self.notify_with_strategy(NotifyOneStrategy::Fifo);
582     }
583 
584     /// Notifies the last waiting task.
585     ///
586     /// This function behaves similar to `notify_one`. The only difference is that it wakes
587     /// the most recently added waiter instead of the oldest waiter.
588     ///
589     /// Check the [`notify_one()`] documentation for more info and
590     /// examples.
591     ///
592     /// [`notify_one()`]: Notify::notify_one
notify_last(&self)593     pub fn notify_last(&self) {
594         self.notify_with_strategy(NotifyOneStrategy::Lifo);
595     }
596 
notify_with_strategy(&self, strategy: NotifyOneStrategy)597     fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
598         // Load the current state
599         let mut curr = self.state.load(SeqCst);
600 
601         // If the state is `EMPTY`, transition to `NOTIFIED` and return.
602         while let EMPTY | NOTIFIED = get_state(curr) {
603             // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
604             // happens-before synchronization must happen between this atomic
605             // operation and a task calling `notified().await`.
606             let new = set_state(curr, NOTIFIED);
607             let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
608 
609             match res {
610                 // No waiters, no further work to do
611                 Ok(_) => return,
612                 Err(actual) => {
613                     curr = actual;
614                 }
615             }
616         }
617 
618         // There are waiters, the lock must be acquired to notify.
619         let mut waiters = self.waiters.lock();
620 
621         // The state must be reloaded while the lock is held. The state may only
622         // transition out of WAITING while the lock is held.
623         curr = self.state.load(SeqCst);
624 
625         if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
626             drop(waiters);
627             waker.wake();
628         }
629     }
630 
631     /// Notifies all waiting tasks.
632     ///
633     /// If a task is currently waiting, that task is notified. Unlike with
634     /// `notify_one()`, no permit is stored to be used by the next call to
635     /// `notified().await`. The purpose of this method is to notify all
636     /// already registered waiters. Registering for notification is done by
637     /// acquiring an instance of the `Notified` future via calling `notified()`.
638     ///
639     /// # Examples
640     ///
641     /// ```
642     /// use tokio::sync::Notify;
643     /// use std::sync::Arc;
644     ///
645     /// #[tokio::main]
646     /// async fn main() {
647     ///     let notify = Arc::new(Notify::new());
648     ///     let notify2 = notify.clone();
649     ///
650     ///     let notified1 = notify.notified();
651     ///     let notified2 = notify.notified();
652     ///
653     ///     let handle = tokio::spawn(async move {
654     ///         println!("sending notifications");
655     ///         notify2.notify_waiters();
656     ///     });
657     ///
658     ///     notified1.await;
659     ///     notified2.await;
660     ///     println!("received notifications");
661     /// }
662     /// ```
    pub fn notify_waiters(&self) {
        // Every step below that touches the wait list or leaves `WAITING`
        // is performed while holding this lock.
        let mut waiters = self.waiters.lock();

        // The state must be loaded while the lock is held. The state may only
        // transition out of WAITING while the lock is held.
        let curr = self.state.load(SeqCst);

        if matches!(get_state(curr), EMPTY | NOTIFIED) {
            // There are no waiting tasks. All we need to do is increment the
            // number of times this method was called.
            atomic_inc_num_notify_waiters_calls(&self.state);
            return;
        }

        // Increment the number of times this method was called
        // and transition to empty.
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
        self.state.store(new_state, SeqCst);

        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);

        // Wakers are collected in batches: the inner loop fills `wakers`
        // until it cannot take more, then the batch is woken with the lock
        // released and the lock is re-acquired for the next batch.
        let mut wakers = WakeList::new();
        'outer: loop {
            while wakers.can_push() {
                match list.pop_back_locked(&mut waiters) {
                    Some(waiter) => {
                        // Safety: we never make mutable references to waiters.
                        let waiter = unsafe { waiter.as_ref() };

                        // Safety: we hold the lock, so we can access the waker.
                        if let Some(waker) =
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
                        {
                            wakers.push(waker);
                        }

                        // This waiter is unlinked and will not be shared ever again, release it.
                        waiter.notification.store_release(Notification::All);
                    }
                    None => {
                        // The list is drained; the final batch is woken
                        // after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before notifying.
            drop(waiters);

            // One of the wakers may panic, but the remaining waiters will still
            // be unlinked from the list in `NotifyWaitersList` destructor.
            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
737 }
738 
739 impl Default for Notify {
default() -> Notify740     fn default() -> Notify {
741         Notify::new()
742     }
743 }
744 
// Explicitly mark `Notify` as unwind safe.
// NOTE(review): presumably required because the `UnsafeCell` inside `Waiter`
// prevents these auto traits from being derived for the wait list — confirm.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
747 
notify_locked( waiters: &mut WaitList, state: &AtomicUsize, curr: usize, strategy: NotifyOneStrategy, ) -> Option<Waker>748 fn notify_locked(
749     waiters: &mut WaitList,
750     state: &AtomicUsize,
751     curr: usize,
752     strategy: NotifyOneStrategy,
753 ) -> Option<Waker> {
754     match get_state(curr) {
755         EMPTY | NOTIFIED => {
756             let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
757 
758             match res {
759                 Ok(_) => None,
760                 Err(actual) => {
761                     let actual_state = get_state(actual);
762                     assert!(actual_state == EMPTY || actual_state == NOTIFIED);
763                     state.store(set_state(actual, NOTIFIED), SeqCst);
764                     None
765                 }
766             }
767         }
768         WAITING => {
769             // At this point, it is guaranteed that the state will not
770             // concurrently change as holding the lock is required to
771             // transition **out** of `WAITING`.
772             //
773             // Get a pending waiter using one of the available dequeue strategies.
774             let waiter = match strategy {
775                 NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
776                 NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
777             };
778 
779             // Safety: we never make mutable references to waiters.
780             let waiter = unsafe { waiter.as_ref() };
781 
782             // Safety: we hold the lock, so we can access the waker.
783             let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
784 
785             // This waiter is unlinked and will not be shared ever again, release it.
786             waiter
787                 .notification
788                 .store_release(Notification::One(strategy));
789 
790             if waiters.is_empty() {
791                 // As this the **final** waiter in the list, the state
792                 // must be transitioned to `EMPTY`. As transitioning
793                 // **from** `WAITING` requires the lock to be held, a
794                 // `store` is sufficient.
795                 state.store(set_state(curr, EMPTY), SeqCst);
796             }
797             waker
798         }
799         _ => unreachable!(),
800     }
801 }
802 
803 // ===== impl Notified =====
804 
805 impl Notified<'_> {
806     /// Adds this future to the list of futures that are ready to receive
807     /// wakeups from calls to [`notify_one`].
808     ///
809     /// Polling the future also adds it to the list, so this method should only
810     /// be used if you want to add the future to the list before the first call
811     /// to `poll`. (In fact, this method is equivalent to calling `poll` except
812     /// that no `Waker` is registered.)
813     ///
814     /// This has no effect on notifications sent using [`notify_waiters`], which
815     /// are received as long as they happen after the creation of the `Notified`
816     /// regardless of whether `enable` or `poll` has been called.
817     ///
818     /// This method returns true if the `Notified` is ready. This happens in the
819     /// following situations:
820     ///
821     ///  1. The `notify_waiters` method was called between the creation of the
822     ///     `Notified` and the call to this method.
823     ///  2. This is the first call to `enable` or `poll` on this future, and the
824     ///     `Notify` was holding a permit from a previous call to `notify_one`.
825     ///     The call consumes the permit in that case.
826     ///  3. The future has previously been enabled or polled, and it has since
827     ///     then been marked ready by either consuming a permit from the
828     ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
829     ///     removed it from the list of futures ready to receive wakeups.
830     ///
831     /// If this method returns true, any future calls to poll on the same future
832     /// will immediately return `Poll::Ready`.
833     ///
834     /// # Examples
835     ///
    /// Unbounded multi-producer multi-consumer (mpmc) channel.
837     ///
838     /// The call to `enable` is important because otherwise if you have two
839     /// calls to `recv` and two calls to `send` in parallel, the following could
840     /// happen:
841     ///
842     ///  1. Both calls to `try_recv` return `None`.
843     ///  2. Both new elements are added to the vector.
844     ///  3. The `notify_one` method is called twice, adding only a single
845     ///     permit to the `Notify`.
846     ///  4. Both calls to `recv` reach the `Notified` future. One of them
847     ///     consumes the permit, and the other sleeps forever.
848     ///
849     /// By adding the `Notified` futures to the list by calling `enable` before
850     /// `try_recv`, the `notify_one` calls in step three would remove the
851     /// futures from the list and mark them notified instead of adding a permit
852     /// to the `Notify`. This ensures that both futures are woken.
853     ///
854     /// ```
855     /// use tokio::sync::Notify;
856     ///
857     /// use std::collections::VecDeque;
858     /// use std::sync::Mutex;
859     ///
860     /// struct Channel<T> {
861     ///     messages: Mutex<VecDeque<T>>,
862     ///     notify_on_sent: Notify,
863     /// }
864     ///
865     /// impl<T> Channel<T> {
866     ///     pub fn send(&self, msg: T) {
867     ///         let mut locked_queue = self.messages.lock().unwrap();
868     ///         locked_queue.push_back(msg);
869     ///         drop(locked_queue);
870     ///
871     ///         // Send a notification to one of the calls currently
872     ///         // waiting in a call to `recv`.
873     ///         self.notify_on_sent.notify_one();
874     ///     }
875     ///
876     ///     pub fn try_recv(&self) -> Option<T> {
877     ///         let mut locked_queue = self.messages.lock().unwrap();
878     ///         locked_queue.pop_front()
879     ///     }
880     ///
881     ///     pub async fn recv(&self) -> T {
882     ///         let future = self.notify_on_sent.notified();
883     ///         tokio::pin!(future);
884     ///
885     ///         loop {
886     ///             // Make sure that no wakeup is lost if we get
887     ///             // `None` from `try_recv`.
888     ///             future.as_mut().enable();
889     ///
890     ///             if let Some(msg) = self.try_recv() {
891     ///                 return msg;
892     ///             }
893     ///
894     ///             // Wait for a call to `notify_one`.
895     ///             //
896     ///             // This uses `.as_mut()` to avoid consuming the future,
897     ///             // which lets us call `Pin::set` below.
898     ///             future.as_mut().await;
899     ///
900     ///             // Reset the future in case another call to
901     ///             // `try_recv` got the message before us.
902     ///             future.set(self.notify_on_sent.notified());
903     ///         }
904     ///     }
905     /// }
906     /// ```
907     ///
908     /// [`notify_one`]: Notify::notify_one()
909     /// [`notify_waiters`]: Notify::notify_waiters()
enable(self: Pin<&mut Self>) -> bool910     pub fn enable(self: Pin<&mut Self>) -> bool {
911         self.poll_notified(None).is_ready()
912     }
913 
914     /// A custom `project` implementation is used in place of `pin-project-lite`
915     /// as a custom drop implementation is needed.
project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter)916     fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
917         unsafe {
918             // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
919 
920             is_unpin::<&Notify>();
921             is_unpin::<State>();
922             is_unpin::<usize>();
923 
924             let me = self.get_unchecked_mut();
925             (
926                 me.notify,
927                 &mut me.state,
928                 &me.notify_waiters_calls,
929                 &me.waiter,
930             )
931         }
932     }
933 
    /// Advances the `Notified` state machine, optionally registering `waker`.
    ///
    /// This is the shared implementation behind `enable` (which passes `None`)
    /// and `Future::poll` (which passes `Some(cx.waker())`).
    ///
    /// Behavior per state:
    ///
    /// * `State::Init`: first tries to consume a pending `NOTIFIED` permit
    ///   without locking. Otherwise, with the `waiters` lock held, it either
    ///   finishes immediately (a `notify_waiters` call happened since the
    ///   future was created, or a permit showed up in the meantime) or moves
    ///   the `Notify` to `WAITING`, stores the waker (if any), links the waiter
    ///   into the list, and returns `Poll::Pending`.
    /// * `State::Waiting`: returns `Poll::Ready(())` if the waiter has received
    ///   a notification. Otherwise, under the lock, it either unlinks the
    ///   waiter because a `notify_waiters` call is in progress, or refreshes
    ///   the stored waker and returns `Poll::Pending`.
    /// * `State::Done`: returns `Poll::Ready(())`.
    ///
    /// Wakers are always cloned before the lock is taken and dropped after it
    /// is released.
    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
        let (notify, state, notify_waiters_calls, waiter) = self.project();

        'outer_loop: loop {
            match *state {
                State::Init => {
                    let curr = notify.state.load(SeqCst);

                    // Optimistically try acquiring a pending notification
                    // without taking the lock.
                    let res = notify.state.compare_exchange(
                        set_state(curr, NOTIFIED),
                        set_state(curr, EMPTY),
                        SeqCst,
                        SeqCst,
                    );

                    if res.is_ok() {
                        // Acquired the notification
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Clone the waker before locking, as a waker clone can
                    // trigger arbitrary code.
                    let waker = waker.cloned();

                    // Acquire the lock and attempt to transition to the waiting
                    // state.
                    let mut waiters = notify.waiters.lock();

                    // Reload the state with the lock held
                    let mut curr = notify.state.load(SeqCst);

                    // If `notify_waiters` has been called after the future
                    // was created, then we are done.
                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        *state = State::Done;
                        continue 'outer_loop;
                    }

                    // Transition the state to WAITING. The loop retries with
                    // the freshly observed value whenever a compare-exchange
                    // fails.
                    loop {
                        match get_state(curr) {
                            EMPTY => {
                                // Transition to WAITING
                                let res = notify.state.compare_exchange(
                                    set_state(curr, EMPTY),
                                    set_state(curr, WAITING),
                                    SeqCst,
                                    SeqCst,
                                );

                                if let Err(actual) = res {
                                    // The only expected reason for failure is
                                    // that a permit was stored concurrently.
                                    assert_eq!(get_state(actual), NOTIFIED);
                                    curr = actual;
                                } else {
                                    break;
                                }
                            }
                            // Already WAITING: other waiters are queued, so
                            // there is nothing to transition.
                            WAITING => break,
                            NOTIFIED => {
                                // Try consuming the notification
                                let res = notify.state.compare_exchange(
                                    set_state(curr, NOTIFIED),
                                    set_state(curr, EMPTY),
                                    SeqCst,
                                    SeqCst,
                                );

                                match res {
                                    Ok(_) => {
                                        // Acquired the notification
                                        *state = State::Done;
                                        continue 'outer_loop;
                                    }
                                    Err(actual) => {
                                        // The only expected reason for failure
                                        // is that the permit was consumed
                                        // concurrently.
                                        assert_eq!(get_state(actual), EMPTY);
                                        curr = actual;
                                    }
                                }
                            }
                            _ => unreachable!(),
                        }
                    }

                    let mut old_waker = None;
                    if waker.is_some() {
                        // Safety: called while locked.
                        //
                        // The use of `old_waker` here is not necessary, as the field is always
                        // None when we reach this line.
                        unsafe {
                            old_waker =
                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
                        }
                    }

                    // Insert the waiter into the linked list
                    waiters.push_front(NonNull::from(waiter));

                    *state = State::Waiting;

                    // Release the lock before dropping the old waker.
                    drop(waiters);
                    drop(old_waker);

                    return Poll::Pending;
                }
                State::Waiting => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                    }

                    // Lock-free fast path: check whether a notification has
                    // already been delivered to this waiter.
                    if waiter.notification.load(Acquire).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });

                        waiter.notification.clear();
                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Our waiter was not notified, implying it is still stored in a waiter
                    // list (guarded by `notify.waiters`). In order to access the waker
                    // fields, we must acquire the lock.

                    let mut old_waker = None;
                    let mut waiters = notify.waiters.lock();

                    // We hold the lock and notifications are set only with the lock held,
                    // so this can be relaxed, because the happens-before relationship is
                    // established through the mutex.
                    if waiter.notification.load(Relaxed).is_some() {
                        // Safety: waiter is already unlinked and will not be shared again,
                        // so we have an exclusive access to `waker`.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        waiter.notification.clear();

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        *state = State::Done;
                        return Poll::Ready(());
                    }

                    // Load the state with the lock held.
                    let curr = notify.state.load(SeqCst);

                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
                        // Before we add a waiter to the list we check if these numbers are
                        // different while holding the lock. If these numbers are different now,
                        // it means that there is a call to `notify_waiters` in progress and this
                        // waiter must be contained by a guarded list used in `notify_waiters`.
                        // We can treat the waiter as notified and remove it from the list, as
                        // it would have been notified in the `notify_waiters` call anyways.

                        // Safety: we hold the lock, so we can modify the waker.
                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };

                        // Safety: we hold the lock, so we have an exclusive access to the list.
                        // The list is used in `notify_waiters`, so it must be guarded.
                        unsafe { waiters.remove(NonNull::from(waiter)) };

                        *state = State::Done;
                    } else {
                        // Still waiting: store the caller's waker unless the
                        // one already stored would wake the same task.
                        //
                        // Safety: we hold the lock, so we can modify the waker.
                        unsafe {
                            waiter.waker.with_mut(|v| {
                                if let Some(waker) = waker {
                                    let should_update = match &*v {
                                        Some(current_waker) => !current_waker.will_wake(waker),
                                        None => true,
                                    };
                                    if should_update {
                                        old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
                                    }
                                }
                            });
                        }

                        // Drop the old waker after releasing the lock.
                        drop(waiters);
                        drop(old_waker);

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that the
                    // lock is held. Because holding the lock is required to
                    // ensure safe access to fields not held within the lock, it
                    // is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);

                    // Drop the old waker after releasing the lock.
                    drop(old_waker);

                    // The state is now `Done`; the next loop iteration
                    // returns `Poll::Ready(())`.
                }
                State::Done => {
                    #[cfg(tokio_taskdump)]
                    if let Some(waker) = waker {
                        let mut ctx = Context::from_waker(waker);
                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
                    }
                    return Poll::Ready(());
                }
            }
        }
    }
1146 }
1147 
1148 impl Future for Notified<'_> {
1149     type Output = ();
1150 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>1151     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1152         self.poll_notified(Some(cx.waker()))
1153     }
1154 }
1155 
1156 impl Drop for Notified<'_> {
drop(&mut self)1157     fn drop(&mut self) {
1158         // Safety: The type only transitions to a "Waiting" state when pinned.
1159         let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1160 
1161         // This is where we ensure safety. The `Notified` value is being
1162         // dropped, which means we must ensure that the waiter entry is no
1163         // longer stored in the linked list.
1164         if matches!(*state, State::Waiting) {
1165             let mut waiters = notify.waiters.lock();
1166             let mut notify_state = notify.state.load(SeqCst);
1167 
1168             // We hold the lock, so this field is not concurrently accessed by
1169             // `notify_*` functions and we can use the relaxed ordering.
1170             let notification = waiter.notification.load(Relaxed);
1171 
1172             // remove the entry from the list (if not already removed)
1173             //
1174             // Safety: we hold the lock, so we have an exclusive access to every list the
1175             // waiter may be contained in. If the node is not contained in the `waiters`
1176             // list, then it is contained by a guarded list used by `notify_waiters`.
1177             unsafe { waiters.remove(NonNull::from(waiter)) };
1178 
1179             if waiters.is_empty() && get_state(notify_state) == WAITING {
1180                 notify_state = set_state(notify_state, EMPTY);
1181                 notify.state.store(notify_state, SeqCst);
1182             }
1183 
1184             // See if the node was notified but not received. In this case, if
1185             // the notification was triggered via `notify_one`, it must be sent
1186             // to the next waiter.
1187             if let Some(Notification::One(strategy)) = notification {
1188                 if let Some(waker) =
1189                     notify_locked(&mut waiters, &notify.state, notify_state, strategy)
1190                 {
1191                     drop(waiters);
1192                     waker.wake();
1193                 }
1194             }
1195         }
1196     }
1197 }
1198 
1199 /// # Safety
1200 ///
1201 /// `Waiter` is forced to be !Unpin.
1202 unsafe impl linked_list::Link for Waiter {
1203     type Handle = NonNull<Waiter>;
1204     type Target = Waiter;
1205 
as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter>1206     fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1207         *handle
1208     }
1209 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>1210     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1211         ptr
1212     }
1213 
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>1214     unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1215         Waiter::addr_of_pointers(target)
1216     }
1217 }
1218 
/// Compile-time assertion that `T` implements `Unpin`.
///
/// The function body is empty; a call such as `is_unpin::<Foo>()` only
/// type-checks when `Foo: Unpin`.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1220