1 //! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2 //! all consumers.
3 //!
4 //! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5 //! values. [`Sender`] handles are clone-able, allowing concurrent send and
6 //! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7 //! long as `T` is `Send`.
8 //!
9 //! When a value is sent, **all** [`Receiver`] handles are notified and will
10 //! receive the value. The value is stored once inside the channel and cloned on
11 //! demand for each receiver. Once all receivers have received a clone of the
12 //! value, the value is released from the channel.
13 //!
14 //! A channel is created by calling [`channel`], specifying the maximum number
15 //! of messages the channel can retain at any given time.
16 //!
17 //! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18 //! returned [`Receiver`] will receive values sent **after** the call to
19 //! `subscribe`.
20 //!
21 //! This channel is also suitable for the single-producer multi-consumer
22 //! use-case, where a single sender broadcasts values to many receivers.
23 //!
24 //! ## Lagging
25 //!
26 //! As sent messages must be retained until **all** [`Receiver`] handles receive
27 //! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28 //! In this case, all but one receiver are able to receive values at the rate
29 //! they are sent. Because one receiver is stalled, the channel starts to fill
30 //! up.
31 //!
32 //! This broadcast channel implementation handles this case by setting a hard
33 //! upper bound on the number of values the channel may retain at any given
34 //! time. This upper bound is passed to the [`channel`] function as an argument.
35 //!
36 //! If a value is sent when the channel is at capacity, the oldest value
37 //! currently held by the channel is released. This frees up space for the new
38 //! value. Any receiver that has not yet seen the released value will return
39 //! [`RecvError::Lagged`] the next time [`recv`] is called.
40 //!
41 //! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42 //! updated to the oldest value contained by the channel. The next call to
43 //! [`recv`] will return this value.
44 //!
45 //! This behavior enables a receiver to detect when it has lagged so far behind
46 //! that data has been dropped. The caller may decide how to respond to this:
47 //! either by aborting its task or by tolerating lost messages and resuming
48 //! consumption of the channel.
49 //!
50 //! ## Closing
51 //!
52 //! When **all** [`Sender`] handles have been dropped, no new values may be
53 //! sent. At this point, the channel is "closed". Once a receiver has received
54 //! all values retained by the channel, the next call to [`recv`] will return
55 //! with [`RecvError::Closed`].
56 //!
57 //! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58 //! will be marked as read. If this receiver was the only one not to have read
59 //! that message, the message will be dropped at this point.
60 //!
61 //! [`Sender`]: crate::sync::broadcast::Sender
62 //! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63 //! [`Receiver`]: crate::sync::broadcast::Receiver
64 //! [`channel`]: crate::sync::broadcast::channel
65 //! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66 //! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67 //! [`recv`]: crate::sync::broadcast::Receiver::recv
68 //!
69 //! # Examples
70 //!
71 //! Basic usage
72 //!
73 //! ```
74 //! use tokio::sync::broadcast;
75 //!
76 //! #[tokio::main]
77 //! async fn main() {
78 //!     let (tx, mut rx1) = broadcast::channel(16);
79 //!     let mut rx2 = tx.subscribe();
80 //!
81 //!     tokio::spawn(async move {
82 //!         assert_eq!(rx1.recv().await.unwrap(), 10);
83 //!         assert_eq!(rx1.recv().await.unwrap(), 20);
84 //!     });
85 //!
86 //!     tokio::spawn(async move {
87 //!         assert_eq!(rx2.recv().await.unwrap(), 10);
88 //!         assert_eq!(rx2.recv().await.unwrap(), 20);
89 //!     });
90 //!
91 //!     tx.send(10).unwrap();
92 //!     tx.send(20).unwrap();
93 //! }
94 //! ```
95 //!
96 //! Handling lag
97 //!
98 //! ```
99 //! use tokio::sync::broadcast;
100 //!
101 //! #[tokio::main]
102 //! async fn main() {
103 //!     let (tx, mut rx) = broadcast::channel(2);
104 //!
105 //!     tx.send(10).unwrap();
106 //!     tx.send(20).unwrap();
107 //!     tx.send(30).unwrap();
108 //!
109 //!     // The receiver lagged behind
110 //!     assert!(rx.recv().await.is_err());
111 //!
112 //!     // At this point, we can abort or continue with lost messages
113 //!
114 //!     assert_eq!(20, rx.recv().await.unwrap());
115 //!     assert_eq!(30, rx.recv().await.unwrap());
116 //! }
117 //! ```
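//!
//! Receiving after all senders have been dropped (a minimal sketch of the
//! behavior described in the "Closing" section above)
//!
//! ```
//! use tokio::sync::broadcast;
//! use tokio::sync::broadcast::error::RecvError;
//!
//! #[tokio::main]
//! async fn main() {
//!     let (tx, mut rx) = broadcast::channel(16);
//!
//!     tx.send(10).unwrap();
//!
//!     // Dropping every `Sender` closes the channel.
//!     drop(tx);
//!
//!     // Values sent before the close are still delivered.
//!     assert_eq!(rx.recv().await.unwrap(), 10);
//!
//!     // Once the retained values are drained, `recv` reports `Closed`.
//!     assert_eq!(rx.recv().await, Err(RecvError::Closed));
//! }
//! ```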
118 
119 use crate::loom::cell::UnsafeCell;
120 use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121 use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122 use crate::runtime::coop::cooperative;
123 use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124 use crate::util::WakeList;
125 
126 use std::fmt;
127 use std::future::Future;
128 use std::marker::PhantomPinned;
129 use std::pin::Pin;
130 use std::ptr::NonNull;
131 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
132 use std::task::{ready, Context, Poll, Waker};
133 
134 /// Sending-half of the [`broadcast`] channel.
135 ///
136 /// May be used from many threads. Messages can be sent with
137 /// [`send`][Sender::send].
138 ///
139 /// # Examples
140 ///
141 /// ```
142 /// use tokio::sync::broadcast;
143 ///
144 /// #[tokio::main]
145 /// async fn main() {
146 ///     let (tx, mut rx1) = broadcast::channel(16);
147 ///     let mut rx2 = tx.subscribe();
148 ///
149 ///     tokio::spawn(async move {
150 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
151 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
152 ///     });
153 ///
154 ///     tokio::spawn(async move {
155 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
156 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
157 ///     });
158 ///
159 ///     tx.send(10).unwrap();
160 ///     tx.send(20).unwrap();
161 /// }
162 /// ```
163 ///
164 /// [`broadcast`]: crate::sync::broadcast
165 pub struct Sender<T> {
166     shared: Arc<Shared<T>>,
167 }
168 
169 /// Receiving-half of the [`broadcast`] channel.
170 ///
171 /// Must not be used concurrently. Messages may be retrieved using
172 /// [`recv`][Receiver::recv].
173 ///
174 /// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175 /// wrapper.
176 ///
177 /// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use tokio::sync::broadcast;
183 ///
184 /// #[tokio::main]
185 /// async fn main() {
186 ///     let (tx, mut rx1) = broadcast::channel(16);
187 ///     let mut rx2 = tx.subscribe();
188 ///
189 ///     tokio::spawn(async move {
190 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
191 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
192 ///     });
193 ///
194 ///     tokio::spawn(async move {
195 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
196 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
197 ///     });
198 ///
199 ///     tx.send(10).unwrap();
200 ///     tx.send(20).unwrap();
201 /// }
202 /// ```
203 ///
204 /// [`broadcast`]: crate::sync::broadcast
205 pub struct Receiver<T> {
206     /// State shared with all receivers and senders.
207     shared: Arc<Shared<T>>,
208 
209     /// Next position to read from
210     next: u64,
211 }
212 
213 pub mod error {
214     //! Broadcast error types
215 
216     use std::fmt;
217 
218     /// Error returned by the [`send`] function on a [`Sender`].
219     ///
220     /// A **send** operation can only fail if there are no active receivers,
221     /// implying that the message could never be received. The error contains the
222     /// message being sent as a payload so it can be recovered.
223     ///
224     /// [`send`]: crate::sync::broadcast::Sender::send
225     /// [`Sender`]: crate::sync::broadcast::Sender
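    ///
    /// # Examples
    ///
    /// A minimal sketch of recovering the payload from a failed send:
    ///
    /// ```
    /// use tokio::sync::broadcast;
    ///
    /// let (tx, rx) = broadcast::channel::<i32>(16);
    ///
    /// // With no receivers left, `send` fails and hands the value back.
    /// drop(rx);
    /// let err = tx.send(10).unwrap_err();
    /// assert_eq!(err.0, 10);
    /// ```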
226     #[derive(Debug)]
227     pub struct SendError<T>(pub T);
228 
229     impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231             write!(f, "channel closed")
232         }
233     }
234 
235     impl<T: fmt::Debug> std::error::Error for SendError<T> {}
236 
237     /// An error returned from the [`recv`] function on a [`Receiver`].
238     ///
239     /// [`recv`]: crate::sync::broadcast::Receiver::recv
240     /// [`Receiver`]: crate::sync::broadcast::Receiver
241     #[derive(Debug, PartialEq, Eq, Clone)]
242     pub enum RecvError {
243         /// There are no more active senders implying no further messages will ever
244         /// be sent.
245         Closed,
246 
247         /// The receiver lagged too far behind. Attempting to receive again will
248         /// return the oldest message still retained by the channel.
249         ///
250         /// Includes the number of skipped messages.
251         Lagged(u64),
252     }
253 
254     impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256             match self {
257                 RecvError::Closed => write!(f, "channel closed"),
258                 RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
259             }
260         }
261     }
262 
263     impl std::error::Error for RecvError {}
264 
265     /// An error returned from the [`try_recv`] function on a [`Receiver`].
266     ///
267     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
268     /// [`Receiver`]: crate::sync::broadcast::Receiver
269     #[derive(Debug, PartialEq, Eq, Clone)]
270     pub enum TryRecvError {
271         /// The channel is currently empty. There are still active
272         /// [`Sender`] handles, so data may yet become available.
273         ///
274         /// [`Sender`]: crate::sync::broadcast::Sender
275         Empty,
276 
277         /// There are no more active senders implying no further messages will ever
278         /// be sent.
279         Closed,
280 
281         /// The receiver lagged too far behind and has been forcibly disconnected.
282         /// Attempting to receive again will return the oldest message still
283         /// retained by the channel.
284         ///
285         /// Includes the number of skipped messages.
286         Lagged(u64),
287     }
288 
289     impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291             match self {
292                 TryRecvError::Empty => write!(f, "channel empty"),
293                 TryRecvError::Closed => write!(f, "channel closed"),
294                 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
295             }
296         }
297     }
298 
299     impl std::error::Error for TryRecvError {}
300 }
301 
302 use self::error::{RecvError, SendError, TryRecvError};
303 
304 /// Data shared between senders and receivers.
305 struct Shared<T> {
    /// Slots in the channel.
307     buffer: Box<[RwLock<Slot<T>>]>,
308 
309     /// Mask a position -> index.
310     mask: usize,
311 
312     /// Tail of the queue. Includes the rx wait list.
313     tail: Mutex<Tail>,
314 
315     /// Number of outstanding Sender handles.
316     num_tx: AtomicUsize,
317 }
318 
319 /// Next position to write a value.
320 struct Tail {
321     /// Next position to write to.
322     pos: u64,
323 
324     /// Number of active receivers.
325     rx_cnt: usize,
326 
327     /// True if the channel is closed.
328     closed: bool,
329 
330     /// Receivers waiting for a value.
331     waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
332 }
333 
334 /// Slot in the buffer.
335 struct Slot<T> {
336     /// Remaining number of receivers that are expected to see this value.
337     ///
338     /// When this goes to zero, the value is released.
339     ///
340     /// An atomic is used as it is mutated concurrently with the slot read lock
341     /// acquired.
342     rem: AtomicUsize,
343 
344     /// Uniquely identifies the `send` stored in the slot.
345     pos: u64,
346 
347     /// The value being broadcast.
348     ///
349     /// The value is set by `send` when the write lock is held. When a reader
350     /// drops, `rem` is decremented. When it hits zero, the value is dropped.
351     val: UnsafeCell<Option<T>>,
352 }
353 
354 /// An entry in the wait queue.
355 struct Waiter {
356     /// True if queued.
357     queued: AtomicBool,
358 
359     /// Task waiting on the broadcast channel.
360     waker: Option<Waker>,
361 
362     /// Intrusive linked-list pointers.
363     pointers: linked_list::Pointers<Waiter>,
364 
365     /// Should not be `Unpin`.
366     _p: PhantomPinned,
367 }
368 
369 impl Waiter {
    fn new() -> Self {
371         Self {
372             queued: AtomicBool::new(false),
373             waker: None,
374             pointers: linked_list::Pointers::new(),
375             _p: PhantomPinned,
376         }
377     }
378 }
379 
380 generate_addr_of_methods! {
381     impl<> Waiter {
382         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
383             &self.pointers
384         }
385     }
386 }
387 
388 struct RecvGuard<'a, T> {
389     slot: RwLockReadGuard<'a, Slot<T>>,
390 }
391 
392 /// Receive a value future.
393 struct Recv<'a, T> {
394     /// Receiver being waited on.
395     receiver: &'a mut Receiver<T>,
396 
397     /// Entry in the waiter `LinkedList`.
398     waiter: UnsafeCell<Waiter>,
399 }
400 
401 unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
402 unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
403 
404 /// Max number of receivers. Reserve space to lock.
405 const MAX_RECEIVERS: usize = usize::MAX >> 2;
406 
407 /// Create a bounded, multi-producer, multi-consumer channel where each sent
408 /// value is broadcasted to all active receivers.
409 ///
410 /// **Note:** The actual capacity may be greater than the provided `capacity`.
411 ///
412 /// All data sent on [`Sender`] will become available on every active
413 /// [`Receiver`] in the same order as it was sent.
414 ///
415 /// The `Sender` can be cloned to `send` to the same channel from multiple
416 /// points in the process or it can be used concurrently from an `Arc`. New
417 /// `Receiver` handles are created by calling [`Sender::subscribe`].
418 ///
419 /// If all [`Receiver`] handles are dropped, the `send` method will return a
420 /// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
421 /// method will return a [`RecvError`].
422 ///
423 /// [`Sender`]: crate::sync::broadcast::Sender
424 /// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
425 /// [`Receiver`]: crate::sync::broadcast::Receiver
426 /// [`recv`]: crate::sync::broadcast::Receiver::recv
427 /// [`SendError`]: crate::sync::broadcast::error::SendError
428 /// [`RecvError`]: crate::sync::broadcast::error::RecvError
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// use tokio::sync::broadcast;
434 ///
435 /// #[tokio::main]
436 /// async fn main() {
437 ///     let (tx, mut rx1) = broadcast::channel(16);
438 ///     let mut rx2 = tx.subscribe();
439 ///
440 ///     tokio::spawn(async move {
441 ///         assert_eq!(rx1.recv().await.unwrap(), 10);
442 ///         assert_eq!(rx1.recv().await.unwrap(), 20);
443 ///     });
444 ///
445 ///     tokio::spawn(async move {
446 ///         assert_eq!(rx2.recv().await.unwrap(), 10);
447 ///         assert_eq!(rx2.recv().await.unwrap(), 20);
448 ///     });
449 ///
450 ///     tx.send(10).unwrap();
451 ///     tx.send(20).unwrap();
452 /// }
453 /// ```
454 ///
455 /// # Panics
456 ///
457 /// This will panic if `capacity` is equal to `0` or larger
458 /// than `usize::MAX / 2`.
459 #[track_caller]
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    // SAFETY: Exactly one receiver is created below, so the receiver count of 1 is correct.
462     let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
463     let rx = Receiver {
464         shared: tx.shared.clone(),
465         next: 0,
466     };
467     (tx, rx)
468 }
469 
470 unsafe impl<T: Send> Send for Sender<T> {}
471 unsafe impl<T: Send> Sync for Sender<T> {}
472 
473 unsafe impl<T: Send> Send for Receiver<T> {}
474 unsafe impl<T: Send> Sync for Receiver<T> {}
475 
476 impl<T> Sender<T> {
477     /// Creates the sending-half of the [`broadcast`] channel.
478     ///
479     /// See the documentation of [`broadcast::channel`] for more information on this method.
480     ///
481     /// [`broadcast`]: crate::sync::broadcast
482     /// [`broadcast::channel`]: crate::sync::broadcast::channel
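    ///
    /// # Examples
    ///
    /// A minimal sketch (mirroring the receiver-count test at the bottom of this
    /// file): a `Sender` created this way starts with no attached receivers.
    ///
    /// ```
    /// use tokio::sync::broadcast::Sender;
    ///
    /// let tx = Sender::<i32>::new(16);
    /// assert_eq!(tx.receiver_count(), 0);
    ///
    /// // A receiver can be attached later via `subscribe`.
    /// let _rx = tx.subscribe();
    /// assert_eq!(tx.receiver_count(), 1);
    /// ```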
483     #[track_caller]
    pub fn new(capacity: usize) -> Self {
485         // SAFETY: We don't create extra receivers, so there are 0.
486         unsafe { Self::new_with_receiver_count(0, capacity) }
487     }
488 
    /// Creates the sending-half of the [`broadcast`](self) channel and provides the receiver
    /// count.
    ///
    /// See the documentation of [`broadcast::channel`](self::channel) for the panics that may
    /// occur when calling this function.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the receiver count is correct (i.e. matches the number of
    /// existing receivers) before the channel is used. The count is zero by default, as this
    /// function does not create any receivers by itself.
500     #[track_caller]
    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
502         assert!(capacity > 0, "broadcast channel capacity cannot be zero");
503         assert!(
504             capacity <= usize::MAX >> 1,
505             "broadcast channel capacity exceeded `usize::MAX / 2`"
506         );
507 
508         // Round to a power of two
509         capacity = capacity.next_power_of_two();
510 
511         let mut buffer = Vec::with_capacity(capacity);
512 
513         for i in 0..capacity {
514             buffer.push(RwLock::new(Slot {
515                 rem: AtomicUsize::new(0),
516                 pos: (i as u64).wrapping_sub(capacity as u64),
517                 val: UnsafeCell::new(None),
518             }));
519         }
520 
521         let shared = Arc::new(Shared {
522             buffer: buffer.into_boxed_slice(),
523             mask: capacity - 1,
524             tail: Mutex::new(Tail {
525                 pos: 0,
526                 rx_cnt: receiver_count,
527                 closed: false,
528                 waiters: LinkedList::new(),
529             }),
530             num_tx: AtomicUsize::new(1),
531         });
532 
533         Sender { shared }
534     }
535 
536     /// Attempts to send a value to all active [`Receiver`] handles, returning
537     /// it back if it could not be sent.
538     ///
539     /// A successful send occurs when there is at least one active [`Receiver`]
540     /// handle. An unsuccessful send would be one where all associated
541     /// [`Receiver`] handles have already been dropped.
542     ///
543     /// # Return
544     ///
545     /// On success, the number of subscribed [`Receiver`] handles is returned.
546     /// This does not mean that this number of receivers will see the message as
547     /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
548     /// the message.
549     ///
550     /// # Note
551     ///
552     /// A return value of `Ok` **does not** mean that the sent value will be
553     /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
554     /// handles may be dropped before receiving the sent message.
555     ///
556     /// A return value of `Err` **does not** mean that future calls to `send`
557     /// will fail. New [`Receiver`] handles may be created by calling
558     /// [`subscribe`].
559     ///
560     /// [`Receiver`]: crate::sync::broadcast::Receiver
561     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
562     ///
563     /// # Examples
564     ///
565     /// ```
566     /// use tokio::sync::broadcast;
567     ///
568     /// #[tokio::main]
569     /// async fn main() {
570     ///     let (tx, mut rx1) = broadcast::channel(16);
571     ///     let mut rx2 = tx.subscribe();
572     ///
573     ///     tokio::spawn(async move {
574     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
575     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
576     ///     });
577     ///
578     ///     tokio::spawn(async move {
579     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
580     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
581     ///     });
582     ///
583     ///     tx.send(10).unwrap();
584     ///     tx.send(20).unwrap();
585     /// }
586     /// ```
    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
588         let mut tail = self.shared.tail.lock();
589 
590         if tail.rx_cnt == 0 {
591             return Err(SendError(value));
592         }
593 
594         // Position to write into
595         let pos = tail.pos;
596         let rem = tail.rx_cnt;
597         let idx = (pos & self.shared.mask as u64) as usize;
598 
599         // Update the tail position
600         tail.pos = tail.pos.wrapping_add(1);
601 
602         // Get the slot
603         let mut slot = self.shared.buffer[idx].write();
604 
605         // Track the position
606         slot.pos = pos;
607 
608         // Set remaining receivers
609         slot.rem.with_mut(|v| *v = rem);
610 
611         // Write the value
612         slot.val = UnsafeCell::new(Some(value));
613 
614         // Release the slot lock before notifying the receivers.
615         drop(slot);
616 
617         // Notify and release the mutex. This must happen after the slot lock is
618         // released, otherwise the writer lock bit could be cleared while another
619         // thread is in the critical section.
620         self.shared.notify_rx(tail);
621 
622         Ok(rem)
623     }
624 
625     /// Creates a new [`Receiver`] handle that will receive values sent **after**
626     /// this call to `subscribe`.
627     ///
628     /// # Examples
629     ///
630     /// ```
631     /// use tokio::sync::broadcast;
632     ///
633     /// #[tokio::main]
634     /// async fn main() {
635     ///     let (tx, _rx) = broadcast::channel(16);
636     ///
637     ///     // Will not be seen
638     ///     tx.send(10).unwrap();
639     ///
640     ///     let mut rx = tx.subscribe();
641     ///
642     ///     tx.send(20).unwrap();
643     ///
644     ///     let value = rx.recv().await.unwrap();
645     ///     assert_eq!(20, value);
646     /// }
647     /// ```
    pub fn subscribe(&self) -> Receiver<T> {
649         let shared = self.shared.clone();
650         new_receiver(shared)
651     }
652 
653     /// Returns the number of queued values.
654     ///
655     /// A value is queued until it has either been seen by all receivers that were alive at the time
656     /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
657     /// queue's capacity.
658     ///
659     /// # Note
660     ///
661     /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
662     /// have been evicted from the queue before being seen by all receivers.
663     ///
664     /// # Examples
665     ///
666     /// ```
667     /// use tokio::sync::broadcast;
668     ///
669     /// #[tokio::main]
670     /// async fn main() {
671     ///     let (tx, mut rx1) = broadcast::channel(16);
672     ///     let mut rx2 = tx.subscribe();
673     ///
674     ///     tx.send(10).unwrap();
675     ///     tx.send(20).unwrap();
676     ///     tx.send(30).unwrap();
677     ///
678     ///     assert_eq!(tx.len(), 3);
679     ///
680     ///     rx1.recv().await.unwrap();
681     ///
682     ///     // The len is still 3 since rx2 hasn't seen the first value yet.
683     ///     assert_eq!(tx.len(), 3);
684     ///
685     ///     rx2.recv().await.unwrap();
686     ///
687     ///     assert_eq!(tx.len(), 2);
688     /// }
689     /// ```
    pub fn len(&self) -> usize {
691         let tail = self.shared.tail.lock();
692 
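        // Slots are examined starting from the oldest position (`tail.pos` is the
        // next write index, which in a full ring also indexes the oldest slot).
        // Fully-consumed or never-written slots have `rem == 0` and, because every
        // receiver consumes values in order, they form a prefix of that traversal.
        // Binary search for the first slot that is still queued.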
693         let base_idx = (tail.pos & self.shared.mask as u64) as usize;
694         let mut low = 0;
695         let mut high = self.shared.buffer.len();
696         while low < high {
697             let mid = low + (high - low) / 2;
698             let idx = base_idx.wrapping_add(mid) & self.shared.mask;
699             if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
700                 low = mid + 1;
701             } else {
702                 high = mid;
703             }
704         }
705 
706         self.shared.buffer.len() - low
707     }
708 
709     /// Returns true if there are no queued values.
710     ///
711     /// # Examples
712     ///
713     /// ```
714     /// use tokio::sync::broadcast;
715     ///
716     /// #[tokio::main]
717     /// async fn main() {
718     ///     let (tx, mut rx1) = broadcast::channel(16);
719     ///     let mut rx2 = tx.subscribe();
720     ///
721     ///     assert!(tx.is_empty());
722     ///
723     ///     tx.send(10).unwrap();
724     ///
725     ///     assert!(!tx.is_empty());
726     ///
727     ///     rx1.recv().await.unwrap();
728     ///
729     ///     // The queue is still not empty since rx2 hasn't seen the value.
730     ///     assert!(!tx.is_empty());
731     ///
732     ///     rx2.recv().await.unwrap();
733     ///
734     ///     assert!(tx.is_empty());
735     /// }
736     /// ```
    pub fn is_empty(&self) -> bool {
738         let tail = self.shared.tail.lock();
739 
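        // Only the most recently written slot needs to be checked: if it has been
        // seen by every receiver (`rem == 0`), all older slots have been as well.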
740         let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
741         self.shared.buffer[idx].read().rem.load(SeqCst) == 0
742     }
743 
744     /// Returns the number of active receivers.
745     ///
746     /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
747     /// [`subscribe`]. These are the handles that will receive values sent on
748     /// this [`Sender`].
749     ///
750     /// # Note
751     ///
752     /// It is not guaranteed that a sent message will reach this number of
753     /// receivers. Active receivers may never call [`recv`] again before
754     /// dropping.
755     ///
756     /// [`recv`]: crate::sync::broadcast::Receiver::recv
757     /// [`Receiver`]: crate::sync::broadcast::Receiver
758     /// [`Sender`]: crate::sync::broadcast::Sender
759     /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
760     /// [`channel`]: crate::sync::broadcast::channel
761     ///
762     /// # Examples
763     ///
764     /// ```
765     /// use tokio::sync::broadcast;
766     ///
767     /// #[tokio::main]
768     /// async fn main() {
769     ///     let (tx, _rx1) = broadcast::channel(16);
770     ///
771     ///     assert_eq!(1, tx.receiver_count());
772     ///
773     ///     let mut _rx2 = tx.subscribe();
774     ///
775     ///     assert_eq!(2, tx.receiver_count());
776     ///
777     ///     tx.send(10).unwrap();
778     /// }
779     /// ```
    pub fn receiver_count(&self) -> usize {
781         let tail = self.shared.tail.lock();
782         tail.rx_cnt
783     }
784 
785     /// Returns `true` if senders belong to the same channel.
786     ///
787     /// # Examples
788     ///
789     /// ```
790     /// use tokio::sync::broadcast;
791     ///
792     /// #[tokio::main]
793     /// async fn main() {
794     ///     let (tx, _rx) = broadcast::channel::<()>(16);
795     ///     let tx2 = tx.clone();
796     ///
797     ///     assert!(tx.same_channel(&tx2));
798     ///
799     ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
800     ///
801     ///     assert!(!tx3.same_channel(&tx2));
802     /// }
803     /// ```
    pub fn same_channel(&self, other: &Self) -> bool {
805         Arc::ptr_eq(&self.shared, &other.shared)
806     }
807 
    fn close_channel(&self) {
809         let mut tail = self.shared.tail.lock();
810         tail.closed = true;
811 
812         self.shared.notify_rx(tail);
813     }
814 }
815 
816 /// Create a new `Receiver` which reads starting from the tail.
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
818     let mut tail = shared.tail.lock();
819 
820     assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
821 
822     tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
823 
824     let next = tail.pos;
825 
826     drop(tail);
827 
828     Receiver { shared, next }
829 }
830 
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
834 struct WaitersList<'a, T> {
835     list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
836     is_empty: bool,
837     shared: &'a Shared<T>,
838 }
839 
840 impl<'a, T> Drop for WaitersList<'a, T> {
    fn drop(&mut self) {
842         // If the list is not empty, we unlink all waiters from it.
843         // We do not wake the waiters to avoid double panics.
844         if !self.is_empty {
845             let _lock_guard = self.shared.tail.lock();
846             while self.list.pop_back().is_some() {}
847         }
848     }
849 }
850 
851 impl<'a, T> WaitersList<'a, T> {
    fn new(
853         unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
854         guard: Pin<&'a Waiter>,
855         shared: &'a Shared<T>,
856     ) -> Self {
857         let guard_ptr = NonNull::from(guard.get_ref());
858         let list = unguarded_list.into_guarded(guard_ptr);
859         WaitersList {
860             list,
861             is_empty: false,
862             shared,
863         }
864     }
865 
    /// Removes the last element from the guarded list. Modifying this list
    /// requires exclusive access to the main waiter list, i.e. the `tail` lock
    /// must be held (hence the `&mut Tail` parameter).
    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
869         let result = self.list.pop_back();
870         if result.is_none() {
871             // Save information about emptiness to avoid waiting for lock
872             // in the destructor.
873             self.is_empty = true;
874         }
875         result
876     }
877 }
878 
879 impl<T> Shared<T> {
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
881         // It is critical for `GuardedLinkedList` safety that the guard node is
882         // pinned in memory and is not dropped until the guarded list is dropped.
883         let guard = Waiter::new();
884         pin!(guard);
885 
886         // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
887         // underneath to allow every waiter to safely remove itself from it.
888         //
        // * This list will still be guarded by the `tail` lock.
        //   The `WaitersList` wrapper makes sure we hold the lock to modify it.
891         // * This wrapper will empty the list on drop. It is critical for safety
892         //   that we will not leave any list entry with a pointer to the local
893         //   guard node after this function returns / panics.
894         let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
895 
896         let mut wakers = WakeList::new();
897         'outer: loop {
898             while wakers.can_push() {
899                 match list.pop_back_locked(&mut tail) {
900                     Some(waiter) => {
901                         unsafe {
902                             // Safety: accessing `waker` is safe because
903                             // the tail lock is held.
904                             if let Some(waker) = (*waiter.as_ptr()).waker.take() {
905                                 wakers.push(waker);
906                             }
907 
908                             // Safety: `queued` is atomic.
909                             let queued = &(*waiter.as_ptr()).queued;
910                             // `Relaxed` suffices because the tail lock is held.
911                             assert!(queued.load(Relaxed));
912                             // `Release` is needed to synchronize with `Recv::drop`.
913                             // It is critical to set this variable **after** waker
914                             // is extracted, otherwise we may data race with `Recv::drop`.
915                             queued.store(false, Release);
916                         }
917                     }
918                     None => {
919                         break 'outer;
920                     }
921                 }
922             }
923 
924             // Release the lock before waking.
925             drop(tail);
926 
927             // Before we acquire the lock again all sorts of things can happen:
928             // some waiters may remove themselves from the list and new waiters
929             // may be added. This is fine since at worst we will unnecessarily
930             // wake up waiters which will then queue themselves again.
931 
932             wakers.wake_all();
933 
934             // Acquire the lock again.
935             tail = self.tail.lock();
936         }
937 
938         // Release the lock before waking.
939         drop(tail);
940 
941         wakers.wake_all();
942     }
943 }
944 
945 impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
947         let shared = self.shared.clone();
948         shared.num_tx.fetch_add(1, SeqCst);
949 
950         Sender { shared }
951     }
952 }
953 
954 impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
956         if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
957             self.close_channel();
958         }
959     }
960 }
961 
962 impl<T> Receiver<T> {
963     /// Returns the number of messages that were sent into the channel and that
964     /// this [`Receiver`] has yet to receive.
965     ///
    /// If the value returned by `len` is larger than the capacity of the channel
    /// rounded up to the next power of two, any call to [`recv`] will return an
    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
    /// `Err(TryRecvError::Lagged)`. For example, if the capacity of the channel is 10,
    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
    /// values larger than 16.
972     ///
973     /// [`Receiver`]: crate::sync::broadcast::Receiver
974     /// [`recv`]: crate::sync::broadcast::Receiver::recv
975     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
976     ///
977     /// # Examples
978     ///
979     /// ```
980     /// use tokio::sync::broadcast;
981     ///
982     /// #[tokio::main]
983     /// async fn main() {
984     ///     let (tx, mut rx1) = broadcast::channel(16);
985     ///
986     ///     tx.send(10).unwrap();
987     ///     tx.send(20).unwrap();
988     ///
989     ///     assert_eq!(rx1.len(), 2);
990     ///     assert_eq!(rx1.recv().await.unwrap(), 10);
991     ///     assert_eq!(rx1.len(), 1);
992     ///     assert_eq!(rx1.recv().await.unwrap(), 20);
993     ///     assert_eq!(rx1.len(), 0);
994     /// }
995     /// ```
    pub fn len(&self) -> usize {
997         let next_send_pos = self.shared.tail.lock().pos;
998         (next_send_pos - self.next) as usize
999     }
1000 
    /// Returns true if there are no messages in the channel that this [`Receiver`]
    /// has yet to receive.
    ///
    /// [`Receiver`]: crate::sync::broadcast::Receiver
1005     ///
1006     /// # Examples
1007     ///
1008     /// ```
1009     /// use tokio::sync::broadcast;
1010     ///
1011     /// #[tokio::main]
1012     /// async fn main() {
1013     ///     let (tx, mut rx1) = broadcast::channel(16);
1014     ///
1015     ///     assert!(rx1.is_empty());
1016     ///
1017     ///     tx.send(10).unwrap();
1018     ///     tx.send(20).unwrap();
1019     ///
1020     ///     assert!(!rx1.is_empty());
1021     ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1022     ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1023     ///     assert!(rx1.is_empty());
1024     /// }
1025     /// ```
    pub fn is_empty(&self) -> bool {
1027         self.len() == 0
1028     }
1029 
1030     /// Returns `true` if receivers belong to the same channel.
1031     ///
1032     /// # Examples
1033     ///
1034     /// ```
1035     /// use tokio::sync::broadcast;
1036     ///
1037     /// #[tokio::main]
1038     /// async fn main() {
1039     ///     let (tx, rx) = broadcast::channel::<()>(16);
1040     ///     let rx2 = tx.subscribe();
1041     ///
1042     ///     assert!(rx.same_channel(&rx2));
1043     ///
1044     ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1045     ///
1046     ///     assert!(!rx3.same_channel(&rx2));
1047     /// }
1048     /// ```
    pub fn same_channel(&self, other: &Self) -> bool {
1050         Arc::ptr_eq(&self.shared, &other.shared)
1051     }
1052 
1053     /// Locks the next value if there is one.
    fn recv_ref(
1055         &mut self,
1056         waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1057     ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1058         let idx = (self.next & self.shared.mask as u64) as usize;
1059 
1060         // The slot holding the next value to read
1061         let mut slot = self.shared.buffer[idx].read();
1062 
1063         if slot.pos != self.next {
1064             // Release the `slot` lock before attempting to acquire the `tail`
1065             // lock. This is required because `send2` acquires the tail lock
1066             // first followed by the slot lock. Acquiring the locks in reverse
1067             // order here would result in a potential deadlock: `recv_ref`
1068             // acquires the `slot` lock and attempts to acquire the `tail` lock
1069             // while `send2` acquired the `tail` lock and attempts to acquire
1070             // the slot lock.
1071             drop(slot);
1072 
1073             let mut old_waker = None;
1074 
1075             let mut tail = self.shared.tail.lock();
1076 
1077             // Acquire slot lock again
1078             slot = self.shared.buffer[idx].read();
1079 
1080             // Make sure the position did not change. This could happen in the
1081             // unlikely event that the buffer is wrapped between dropping the
1082             // read lock and acquiring the tail lock.
1083             if slot.pos != self.next {
1084                 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1085 
1086                 if next_pos == self.next {
1087                     // At this point the channel is empty for *this* receiver. If
1088                     // it's been closed, then that's what we return, otherwise we
1089                     // set a waker and return empty.
1090                     if tail.closed {
1091                         return Err(TryRecvError::Closed);
1092                     }
1093 
1094                     // Store the waker
1095                     if let Some((waiter, waker)) = waiter {
1096                         // Safety: called while locked.
1097                         unsafe {
1098                             // Only queue if not already queued
1099                             waiter.with_mut(|ptr| {
1100                                 // If there is no waker **or** if the currently
1101                                 // stored waker references a **different** task,
1102                                 // track the tasks' waker to be notified on
1103                                 // receipt of a new value.
1104                                 match (*ptr).waker {
1105                                     Some(ref w) if w.will_wake(waker) => {}
1106                                     _ => {
1107                                         old_waker = std::mem::replace(
1108                                             &mut (*ptr).waker,
1109                                             Some(waker.clone()),
1110                                         );
1111                                     }
1112                                 }
1113 
1114                                 // If the waiter is not already queued, enqueue it.
1115                                 // `Relaxed` order suffices: we have synchronized with
1116                                 // all writers through the tail lock that we hold.
1117                                 if !(*ptr).queued.load(Relaxed) {
1118                                     // `Relaxed` order suffices: all the readers will
1119                                     // synchronize with this write through the tail lock.
1120                                     (*ptr).queued.store(true, Relaxed);
1121                                     tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1122                                 }
1123                             });
1124                         }
1125                     }
1126 
1127                     // Drop the old waker after releasing the locks.
1128                     drop(slot);
1129                     drop(tail);
1130                     drop(old_waker);
1131 
1132                     return Err(TryRecvError::Empty);
1133                 }
1134 
1135                 // At this point, the receiver has lagged behind the sender by
1136                 // more than the channel capacity. The receiver will attempt to
1137                 // catch up by skipping dropped messages and setting the
1138                 // internal cursor to the **oldest** message stored by the
1139                 // channel.
1140                 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1141 
1142                 let missed = next.wrapping_sub(self.next);
1143 
1144                 drop(tail);
1145 
1146                 // The receiver is slow but no values have been missed
1147                 if missed == 0 {
1148                     self.next = self.next.wrapping_add(1);
1149 
1150                     return Ok(RecvGuard { slot });
1151                 }
1152 
1153                 self.next = next;
1154 
1155                 return Err(TryRecvError::Lagged(missed));
1156             }
1157         }
1158 
1159         self.next = self.next.wrapping_add(1);
1160 
1161         Ok(RecvGuard { slot })
1162     }
1163 }
1164 
1165 impl<T: Clone> Receiver<T> {
1166     /// Re-subscribes to the channel starting from the current tail element.
1167     ///
1168     /// This [`Receiver`] handle will receive a clone of all values sent
1169     /// **after** it has resubscribed. This will not include elements that are
1170     /// in the queue of the current receiver. Consider the following example.
1171     ///
1172     /// # Examples
1173     ///
1174     /// ```
1175     /// use tokio::sync::broadcast;
1176     ///
1177     /// #[tokio::main]
1178     /// async fn main() {
1179     ///   let (tx, mut rx) = broadcast::channel(2);
1180     ///
1181     ///   tx.send(1).unwrap();
1182     ///   let mut rx2 = rx.resubscribe();
1183     ///   tx.send(2).unwrap();
1184     ///
1185     ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1186     ///   assert_eq!(rx.recv().await.unwrap(), 1);
1187     /// }
1188     /// ```
    pub fn resubscribe(&self) -> Self {
1190         let shared = self.shared.clone();
1191         new_receiver(shared)
    }

    /// Receives the next value for this receiver.
1194     ///
1195     /// Each [`Receiver`] handle will receive a clone of all values sent
1196     /// **after** it has subscribed.
1197     ///
1198     /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1199     /// dropped, indicating that no further values can be sent on the channel.
1200     ///
1201     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1202     /// sent values will overwrite old values. At this point, a call to [`recv`]
1203     /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1204     /// internal cursor is updated to point to the oldest value still held by
1205     /// the channel. A subsequent call to [`recv`] will return this value
1206     /// **unless** it has been since overwritten.
1207     ///
1208     /// # Cancel safety
1209     ///
1210     /// This method is cancel safe. If `recv` is used as the event in a
1211     /// [`tokio::select!`](crate::select) statement and some other branch
1212     /// completes first, it is guaranteed that no messages were received on this
1213     /// channel.
1214     ///
1215     /// [`Receiver`]: crate::sync::broadcast::Receiver
1216     /// [`recv`]: crate::sync::broadcast::Receiver::recv
1217     ///
1218     /// # Examples
1219     ///
1220     /// ```
1221     /// use tokio::sync::broadcast;
1222     ///
1223     /// #[tokio::main]
1224     /// async fn main() {
1225     ///     let (tx, mut rx1) = broadcast::channel(16);
1226     ///     let mut rx2 = tx.subscribe();
1227     ///
1228     ///     tokio::spawn(async move {
1229     ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1230     ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1231     ///     });
1232     ///
1233     ///     tokio::spawn(async move {
1234     ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1235     ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1236     ///     });
1237     ///
1238     ///     tx.send(10).unwrap();
1239     ///     tx.send(20).unwrap();
1240     /// }
1241     /// ```
1242     ///
1243     /// Handling lag
1244     ///
1245     /// ```
1246     /// use tokio::sync::broadcast;
1247     ///
1248     /// #[tokio::main]
1249     /// async fn main() {
1250     ///     let (tx, mut rx) = broadcast::channel(2);
1251     ///
1252     ///     tx.send(10).unwrap();
1253     ///     tx.send(20).unwrap();
1254     ///     tx.send(30).unwrap();
1255     ///
1256     ///     // The receiver lagged behind
1257     ///     assert!(rx.recv().await.is_err());
1258     ///
1259     ///     // At this point, we can abort or continue with lost messages
1260     ///
1261     ///     assert_eq!(20, rx.recv().await.unwrap());
1262     ///     assert_eq!(30, rx.recv().await.unwrap());
1263     /// }
1264     /// ```
    pub async fn recv(&mut self) -> Result<T, RecvError> {
1266         cooperative(Recv::new(self)).await
1267     }
1268 
1269     /// Attempts to return a pending value on this receiver without awaiting.
1270     ///
1271     /// This is useful for a flavor of "optimistic check" before deciding to
1272     /// await on a receiver.
1273     ///
1274     /// Compared with [`recv`], this function has three failure cases instead of two
1275     /// (one for closed, one for an empty buffer, one for a lagging receiver).
1276     ///
1277     /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1278     /// dropped, indicating that no further values can be sent on the channel.
1279     ///
1280     /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1281     /// sent values will overwrite old values. At this point, a call to [`recv`]
1282     /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1283     /// internal cursor is updated to point to the oldest value still held by
1284     /// the channel. A subsequent call to [`try_recv`] will return this value
1285     /// **unless** it has been since overwritten. If there are no values to
1286     /// receive, `Err(TryRecvError::Empty)` is returned.
1287     ///
1288     /// [`recv`]: crate::sync::broadcast::Receiver::recv
1289     /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1290     /// [`Receiver`]: crate::sync::broadcast::Receiver
1291     ///
1292     /// # Examples
1293     ///
1294     /// ```
1295     /// use tokio::sync::broadcast;
1296     ///
1297     /// #[tokio::main]
1298     /// async fn main() {
1299     ///     let (tx, mut rx) = broadcast::channel(16);
1300     ///
1301     ///     assert!(rx.try_recv().is_err());
1302     ///
1303     ///     tx.send(10).unwrap();
1304     ///
1305     ///     let value = rx.try_recv().unwrap();
1306     ///     assert_eq!(10, value);
1307     /// }
1308     /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1310         let guard = self.recv_ref(None)?;
1311         guard.clone_value().ok_or(TryRecvError::Closed)
1312     }
1313 
1314     /// Blocking receive to call outside of asynchronous contexts.
1315     ///
1316     /// # Panics
1317     ///
1318     /// This function panics if called within an asynchronous execution
1319     /// context.
1320     ///
1321     /// # Examples
1322     /// ```
1323     /// use std::thread;
1324     /// use tokio::sync::broadcast;
1325     ///
1326     /// #[tokio::main]
1327     /// async fn main() {
1328     ///     let (tx, mut rx) = broadcast::channel(16);
1329     ///
1330     ///     let sync_code = thread::spawn(move || {
1331     ///         assert_eq!(rx.blocking_recv(), Ok(10));
1332     ///     });
1333     ///
1334     ///     let _ = tx.send(10);
1335     ///     sync_code.join().unwrap();
1336     /// }
1337     /// ```
    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1339         crate::future::block_on(self.recv())
1340     }
1341 }
1342 
1343 impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
1345         let mut tail = self.shared.tail.lock();
1346 
1347         tail.rx_cnt -= 1;
1348         let until = tail.pos;
1349 
1350         drop(tail);
1351 
1352         while self.next < until {
1353             match self.recv_ref(None) {
1354                 Ok(_) => {}
1355                 // The channel is closed
1356                 Err(TryRecvError::Closed) => break,
1357                 // Ignore lagging, we will catch up
1358                 Err(TryRecvError::Lagged(..)) => {}
1359                 // Can't be empty
1360                 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1361             }
1362         }
1363     }
1364 }
1365 
1366 impl<'a, T> Recv<'a, T> {
    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1368         Recv {
1369             receiver,
1370             waiter: UnsafeCell::new(Waiter {
1371                 queued: AtomicBool::new(false),
1372                 waker: None,
1373                 pointers: linked_list::Pointers::new(),
1374                 _p: PhantomPinned,
1375             }),
1376         }
1377     }
1378 
1379     /// A custom `project` implementation is used in place of `pin-project-lite`
1380     /// as a custom drop implementation is needed.
    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1382         unsafe {
1383             // Safety: Receiver is Unpin
1384             is_unpin::<&mut Receiver<T>>();
1385 
1386             let me = self.get_unchecked_mut();
1387             (me.receiver, &me.waiter)
1388         }
1389     }
1390 }
1391 
1392 impl<'a, T> Future for Recv<'a, T>
1393 where
1394     T: Clone,
1395 {
1396     type Output = Result<T, RecvError>;
1397 
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1399         ready!(crate::trace::trace_leaf(cx));
1400 
1401         let (receiver, waiter) = self.project();
1402 
1403         let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1404             Ok(value) => value,
1405             Err(TryRecvError::Empty) => return Poll::Pending,
1406             Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1407             Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1408         };
1409 
1410         Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1411     }
1412 }
1413 
1414 impl<'a, T> Drop for Recv<'a, T> {
    fn drop(&mut self) {
1416         // Safety: `waiter.queued` is atomic.
1417         // Acquire ordering is required to synchronize with
1418         // `Shared::notify_rx` before we drop the object.
1419         let queued = self
1420             .waiter
1421             .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1422 
1423         // If the waiter is queued, we need to unlink it from the waiters list.
1424         // If not, no further synchronization is required, since the waiter
1425         // is not in the list and, as such, is not shared with any other threads.
1426         if queued {
1427             // Acquire the tail lock. This is required for safety before accessing
1428             // the waiter node.
1429             let mut tail = self.receiver.shared.tail.lock();
1430 
1431             // Safety: tail lock is held.
1432             // `Relaxed` order suffices because we hold the tail lock.
1433             let queued = self
1434                 .waiter
1435                 .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1436 
1437             if queued {
1438                 // Remove the node
1439                 //
1440                 // safety: tail lock is held and the wait node is verified to be in
1441                 // the list.
1442                 unsafe {
1443                     self.waiter.with_mut(|ptr| {
1444                         tail.waiters.remove((&mut *ptr).into());
1445                     });
1446                 }
1447             }
1448         }
1449     }
1450 }
1451 
1452 /// # Safety
1453 ///
1454 /// `Waiter` is forced to be !Unpin.
1455 unsafe impl linked_list::Link for Waiter {
1456     type Handle = NonNull<Waiter>;
1457     type Target = Waiter;
1458 
    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1460         *handle
1461     }
1462 
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1464         ptr
1465     }
1466 
    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1468         Waiter::addr_of_pointers(target)
1469     }
1470 }
1471 
1472 impl<T> fmt::Debug for Sender<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1474         write!(fmt, "broadcast::Sender")
1475     }
1476 }
1477 
1478 impl<T> fmt::Debug for Receiver<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1480         write!(fmt, "broadcast::Receiver")
1481     }
1482 }
1483 
1484 impl<'a, T> RecvGuard<'a, T> {
    fn clone_value(&self) -> Option<T>
1486     where
1487         T: Clone,
1488     {
1489         self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1490     }
1491 }
1492 
1493 impl<'a, T> Drop for RecvGuard<'a, T> {
    fn drop(&mut self) {
1495         // Decrement the remaining counter
1496         if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497             // Safety: Last receiver, drop the value
1498             self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1499         }
1500     }
1501 }
1502 
fn is_unpin<T: Unpin>() {}
1504 
1505 #[cfg(not(loom))]
1506 #[cfg(test)]
1507 mod tests {
1508     use super::*;
1509 
1510     #[test]
    fn receiver_count_on_sender_constructor() {
1512         let sender = Sender::<i32>::new(16);
1513         assert_eq!(sender.receiver_count(), 0);
1514 
1515         let rx_1 = sender.subscribe();
1516         assert_eq!(sender.receiver_count(), 1);
1517 
1518         let rx_2 = rx_1.resubscribe();
1519         assert_eq!(sender.receiver_count(), 2);
1520 
1521         let rx_3 = sender.subscribe();
1522         assert_eq!(sender.receiver_count(), 3);
1523 
1524         drop(rx_3);
1525         drop(rx_1);
1526         assert_eq!(sender.receiver_count(), 1);
1527 
1528         drop(rx_2);
1529         assert_eq!(sender.receiver_count(), 0);
1530     }
1531 
1532     #[cfg(not(loom))]
1533     #[test]
    fn receiver_count_on_channel_constructor() {
1535         let (sender, rx) = channel::<i32>(16);
1536         assert_eq!(sender.receiver_count(), 1);
1537 
1538         let _rx_2 = rx.resubscribe();
1539         assert_eq!(sender.receiver_count(), 2);
1540     }
1541 }
1542