1 //! A multi-producer, single-consumer queue for sending values across
2 //! asynchronous tasks.
3 //!
4 //! Similarly to the `std`, channel creation provides [`Receiver`] and
5 //! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6 //! read values out of the channel. If there is no message to read from the
7 //! channel, the current task will be notified when a new value is sent.
8 //! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9 //! the channel. If the channel is at capacity, the send will be rejected and
10 //! the task will be notified when additional capacity is available. In other
11 //! words, the channel provides backpressure.
12 //!
13 //! Unbounded channels are also available using the `unbounded` constructor.
14 //!
15 //! # Disconnection
16 //!
17 //! When all [`Sender`] handles have been dropped, it is no longer
18 //! possible to send values into the channel. This is considered the termination
19 //! event of the stream. As such, [`Receiver::poll_next`]
20 //! will return `Ok(Ready(None))`.
21 //!
22 //! If the [`Receiver`] handle is dropped, then messages can no longer
23 //! be read out of the channel. In this case, all further attempts to send will
24 //! result in an error.
25 //!
26 //! # Clean Shutdown
27 //!
28 //! If the [`Receiver`] is simply dropped, then it is possible for
29 //! there to be messages still in the channel that will not be processed. As
30 //! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31 //! receiver will first call `close`, which will prevent any further messages to
32 //! be sent into the channel. Then, the receiver consumes the channel to
33 //! completion, at which point the receiver can be dropped.
34 //!
35 //! [`Sender`]: struct.Sender.html
36 //! [`Receiver`]: struct.Receiver.html
37 //! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38 //! [`Receiver::poll_next`]:
39 //!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40 
41 // At the core, the channel uses an atomic FIFO queue for message passing. This
42 // queue is used as the primary coordination primitive. In order to enforce
43 // capacity limits and handle back pressure, a secondary FIFO queue is used to
44 // send parked task handles.
45 //
46 // The general idea is that the channel is created with a `buffer` size of `n`.
47 // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48 // slot to hold a message. This allows `Sender` to know for a fact that a send
49 // will succeed *before* starting to do the actual work of sending the value.
50 // Since most of this work is lock-free, once the work starts, it is impossible
51 // to safely revert.
52 //
53 // If the sender is unable to process a send operation, then the current
54 // task is parked and the handle is sent on the parked task queue.
55 //
56 // Note that the implementation guarantees that the channel capacity will never
57 // exceed the configured limit, however there is no *strict* guarantee that the
58 // receiver will wake up a parked task *immediately* when a slot becomes
59 // available. However, it will almost always unpark a task when a slot becomes
60 // available and it is *guaranteed* that a sender will be unparked when the
61 // message that caused the sender to become parked is read out of the channel.
62 //
63 // The steps for sending a message are roughly:
64 //
65 // 1) Increment the channel message count
66 // 2) If the channel is at capacity, push the task handle onto the wait queue
67 // 3) Push the message onto the message queue.
68 //
69 // The steps for receiving a message are roughly:
70 //
71 // 1) Pop a message from the message queue
72 // 2) Pop a task handle from the wait queue
73 // 3) Decrement the channel message count.
74 //
75 // It's important for the order of operations on lock-free structures to happen
76 // in reverse order between the sender and receiver. This makes the message
77 // queue the primary coordination structure and establishes the necessary
78 // happens-before semantics required for the acquire / release semantics used
79 // by the queue structure.
80 
81 use futures_core::stream::{FusedStream, Stream};
82 use futures_core::task::__internal::AtomicWaker;
83 use futures_core::task::{Context, Poll, Waker};
84 use std::fmt;
85 use std::pin::Pin;
86 use std::sync::atomic::AtomicUsize;
87 use std::sync::atomic::Ordering::SeqCst;
88 use std::sync::{Arc, Mutex};
89 use std::thread;
90 
91 use crate::mpsc::queue::Queue;
92 
93 mod queue;
94 #[cfg(feature = "sink")]
95 mod sink_impl;
96 
97 struct UnboundedSenderInner<T> {
98     // Channel state shared between the sender and receiver.
99     inner: Arc<UnboundedInner<T>>,
100 }
101 
102 struct BoundedSenderInner<T> {
103     // Channel state shared between the sender and receiver.
104     inner: Arc<BoundedInner<T>>,
105 
106     // Handle to the task that is blocked on this sender. This handle is sent
107     // to the receiver half in order to be notified when the sender becomes
108     // unblocked.
109     sender_task: Arc<Mutex<SenderTask>>,
110 
111     // `true` if the sender might be blocked. This is an optimization to avoid
112     // having to lock the mutex most of the time.
113     maybe_parked: bool,
114 }
115 
116 // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
117 impl<T> Unpin for UnboundedSenderInner<T> {}
118 impl<T> Unpin for BoundedSenderInner<T> {}
119 
120 /// The transmission end of a bounded mpsc channel.
121 ///
122 /// This value is created by the [`channel`] function.
123 pub struct Sender<T>(Option<BoundedSenderInner<T>>);
124 
125 /// The transmission end of an unbounded mpsc channel.
126 ///
127 /// This value is created by the [`unbounded`] function.
128 pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
129 
130 #[allow(dead_code)]
131 trait AssertKinds: Send + Sync + Clone {}
132 impl AssertKinds for UnboundedSender<u32> {}
133 
134 /// The receiving end of a bounded mpsc channel.
135 ///
136 /// This value is created by the [`channel`] function.
137 pub struct Receiver<T> {
138     inner: Option<Arc<BoundedInner<T>>>,
139 }
140 
141 /// The receiving end of an unbounded mpsc channel.
142 ///
143 /// This value is created by the [`unbounded`] function.
144 pub struct UnboundedReceiver<T> {
145     inner: Option<Arc<UnboundedInner<T>>>,
146 }
147 
148 // `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
149 impl<T> Unpin for UnboundedReceiver<T> {}
150 
151 /// The error type for [`Sender`s](Sender) used as `Sink`s.
152 #[derive(Clone, Debug, PartialEq, Eq)]
153 pub struct SendError {
154     kind: SendErrorKind,
155 }
156 
157 /// The error type returned from [`try_send`](Sender::try_send).
158 #[derive(Clone, PartialEq, Eq)]
159 pub struct TrySendError<T> {
160     err: SendError,
161     val: T,
162 }
163 
164 #[derive(Clone, Debug, PartialEq, Eq)]
165 enum SendErrorKind {
166     Full,
167     Disconnected,
168 }
169 
170 /// The error type returned from [`try_next`](Receiver::try_next).
171 pub struct TryRecvError {
172     _priv: (),
173 }
174 
175 impl fmt::Display for SendError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result176     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177         if self.is_full() {
178             write!(f, "send failed because channel is full")
179         } else {
180             write!(f, "send failed because receiver is gone")
181         }
182     }
183 }
184 
185 impl std::error::Error for SendError {}
186 
187 impl SendError {
188     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool189     pub fn is_full(&self) -> bool {
190         match self.kind {
191             SendErrorKind::Full => true,
192             _ => false,
193         }
194     }
195 
196     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool197     pub fn is_disconnected(&self) -> bool {
198         match self.kind {
199             SendErrorKind::Disconnected => true,
200             _ => false,
201         }
202     }
203 }
204 
205 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result206     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207         f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
208     }
209 }
210 
211 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result212     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213         if self.is_full() {
214             write!(f, "send failed because channel is full")
215         } else {
216             write!(f, "send failed because receiver is gone")
217         }
218     }
219 }
220 
221 impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
222 
223 impl<T> TrySendError<T> {
224     /// Returns `true` if this error is a result of the channel being full.
is_full(&self) -> bool225     pub fn is_full(&self) -> bool {
226         self.err.is_full()
227     }
228 
229     /// Returns `true` if this error is a result of the receiver being dropped.
is_disconnected(&self) -> bool230     pub fn is_disconnected(&self) -> bool {
231         self.err.is_disconnected()
232     }
233 
234     /// Returns the message that was attempted to be sent but failed.
into_inner(self) -> T235     pub fn into_inner(self) -> T {
236         self.val
237     }
238 
239     /// Drops the message and converts into a `SendError`.
into_send_error(self) -> SendError240     pub fn into_send_error(self) -> SendError {
241         self.err
242     }
243 }
244 
245 impl fmt::Debug for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result246     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247         f.debug_tuple("TryRecvError").finish()
248     }
249 }
250 
251 impl fmt::Display for TryRecvError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result252     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253         write!(f, "receiver channel is empty")
254     }
255 }
256 
257 impl std::error::Error for TryRecvError {}
258 
259 struct UnboundedInner<T> {
260     // Internal channel state. Consists of the number of messages stored in the
261     // channel as well as a flag signalling that the channel is closed.
262     state: AtomicUsize,
263 
264     // Atomic, FIFO queue used to send messages to the receiver
265     message_queue: Queue<T>,
266 
267     // Number of senders in existence
268     num_senders: AtomicUsize,
269 
270     // Handle to the receiver's task.
271     recv_task: AtomicWaker,
272 }
273 
274 struct BoundedInner<T> {
275     // Max buffer size of the channel. If `None` then the channel is unbounded.
276     buffer: usize,
277 
278     // Internal channel state. Consists of the number of messages stored in the
279     // channel as well as a flag signalling that the channel is closed.
280     state: AtomicUsize,
281 
282     // Atomic, FIFO queue used to send messages to the receiver
283     message_queue: Queue<T>,
284 
285     // Atomic, FIFO queue used to send parked task handles to the receiver.
286     parked_queue: Queue<Arc<Mutex<SenderTask>>>,
287 
288     // Number of senders in existence
289     num_senders: AtomicUsize,
290 
291     // Handle to the receiver's task.
292     recv_task: AtomicWaker,
293 }
294 
295 // Struct representation of `Inner::state`.
296 #[derive(Clone, Copy)]
297 struct State {
298     // `true` when the channel is open
299     is_open: bool,
300 
301     // Number of messages in the channel
302     num_messages: usize,
303 }
304 
305 // The `is_open` flag is stored in the left-most bit of `Inner::state`
306 const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
307 
308 // When a new channel is created, it is created in the open state with no
309 // pending messages.
310 const INIT_STATE: usize = OPEN_MASK;
311 
312 // The maximum number of messages that a channel can track is `usize::MAX >> 1`
313 const MAX_CAPACITY: usize = !(OPEN_MASK);
314 
315 // The maximum requested buffer size must be less than the maximum capacity of
316 // a channel. This is because each sender gets a guaranteed slot.
317 const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
318 
319 // Sent to the consumer to wake up blocked producers
320 struct SenderTask {
321     task: Option<Waker>,
322     is_parked: bool,
323 }
324 
325 impl SenderTask {
new() -> Self326     fn new() -> Self {
327         Self { task: None, is_parked: false }
328     }
329 
notify(&mut self)330     fn notify(&mut self) {
331         self.is_parked = false;
332 
333         if let Some(task) = self.task.take() {
334             task.wake();
335         }
336     }
337 }
338 
339 /// Creates a bounded mpsc channel for communicating between asynchronous tasks.
340 ///
341 /// Being bounded, this channel provides backpressure to ensure that the sender
342 /// outpaces the receiver by only a limited amount. The channel's capacity is
343 /// equal to `buffer + num-senders`. In other words, each sender gets a
344 /// guaranteed slot in the channel capacity, and on top of that there are
345 /// `buffer` "first come, first serve" slots available to all senders.
346 ///
347 /// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
348 /// implements `Sink`.
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)349 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
350     // Check that the requested buffer size does not exceed the maximum buffer
351     // size permitted by the system.
352     assert!(buffer < MAX_BUFFER, "requested buffer size too large");
353 
354     let inner = Arc::new(BoundedInner {
355         buffer,
356         state: AtomicUsize::new(INIT_STATE),
357         message_queue: Queue::new(),
358         parked_queue: Queue::new(),
359         num_senders: AtomicUsize::new(1),
360         recv_task: AtomicWaker::new(),
361     });
362 
363     let tx = BoundedSenderInner {
364         inner: inner.clone(),
365         sender_task: Arc::new(Mutex::new(SenderTask::new())),
366         maybe_parked: false,
367     };
368 
369     let rx = Receiver { inner: Some(inner) };
370 
371     (Sender(Some(tx)), rx)
372 }
373 
374 /// Creates an unbounded mpsc channel for communicating between asynchronous
375 /// tasks.
376 ///
377 /// A `send` on this channel will always succeed as long as the receive half has
378 /// not been closed. If the receiver falls behind, messages will be arbitrarily
379 /// buffered.
380 ///
381 /// **Note** that the amount of available system memory is an implicit bound to
382 /// the channel. Using an `unbounded` channel has the ability of causing the
383 /// process to run out of memory. In this case, the process will be aborted.
unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>)384 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
385     let inner = Arc::new(UnboundedInner {
386         state: AtomicUsize::new(INIT_STATE),
387         message_queue: Queue::new(),
388         num_senders: AtomicUsize::new(1),
389         recv_task: AtomicWaker::new(),
390     });
391 
392     let tx = UnboundedSenderInner { inner: inner.clone() };
393 
394     let rx = UnboundedReceiver { inner: Some(inner) };
395 
396     (UnboundedSender(Some(tx)), rx)
397 }
398 
399 /*
400  *
401  * ===== impl Sender =====
402  *
403  */
404 
405 impl<T> UnboundedSenderInner<T> {
poll_ready_nb(&self) -> Poll<Result<(), SendError>>406     fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
407         let state = decode_state(self.inner.state.load(SeqCst));
408         if state.is_open {
409             Poll::Ready(Ok(()))
410         } else {
411             Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
412         }
413     }
414 
415     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)416     fn queue_push_and_signal(&self, msg: T) {
417         // Push the message onto the message queue
418         self.inner.message_queue.push(msg);
419 
420         // Signal to the receiver that a message has been enqueued. If the
421         // receiver is parked, this will unpark the task.
422         self.inner.recv_task.wake();
423     }
424 
425     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>426     fn inc_num_messages(&self) -> Option<usize> {
427         let mut curr = self.inner.state.load(SeqCst);
428 
429         loop {
430             let mut state = decode_state(curr);
431 
432             // The receiver end closed the channel.
433             if !state.is_open {
434                 return None;
435             }
436 
437             // This probably is never hit? Odds are the process will run out of
438             // memory first. It may be worth to return something else in this
439             // case?
440             assert!(
441                 state.num_messages < MAX_CAPACITY,
442                 "buffer space \
443                     exhausted; sending this messages would overflow the state"
444             );
445 
446             state.num_messages += 1;
447 
448             let next = encode_state(&state);
449             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
450                 Ok(_) => return Some(state.num_messages),
451                 Err(actual) => curr = actual,
452             }
453         }
454     }
455 
456     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool457     fn same_receiver(&self, other: &Self) -> bool {
458         Arc::ptr_eq(&self.inner, &other.inner)
459     }
460 
461     /// Returns whether the sender send to this receiver.
is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool462     fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
463         Arc::ptr_eq(&self.inner, inner)
464     }
465 
466     /// Returns pointer to the Arc containing sender
467     ///
468     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const UnboundedInner<T>469     fn ptr(&self) -> *const UnboundedInner<T> {
470         &*self.inner
471     }
472 
473     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool474     fn is_closed(&self) -> bool {
475         !decode_state(self.inner.state.load(SeqCst)).is_open
476     }
477 
478     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)479     fn close_channel(&self) {
480         // There's no need to park this sender, its dropping,
481         // and we don't want to check for capacity, so skip
482         // that stuff from `do_send`.
483 
484         self.inner.set_closed();
485         self.inner.recv_task.wake();
486     }
487 }
488 
489 impl<T> BoundedSenderInner<T> {
490     /// Attempts to send a message on this `Sender`, returning the message
491     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>492     fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
493         // If the sender is currently blocked, reject the message
494         if !self.poll_unparked(None).is_ready() {
495             return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
496         }
497 
498         // The channel has capacity to accept the message, so send it
499         self.do_send_b(msg)
500     }
501 
502     // Do the send without failing.
503     // Can be called only by bounded sender.
do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>>504     fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
505         // Anyone calling do_send *should* make sure there is room first,
506         // but assert here for tests as a sanity check.
507         debug_assert!(self.poll_unparked(None).is_ready());
508 
509         // First, increment the number of messages contained by the channel.
510         // This operation will also atomically determine if the sender task
511         // should be parked.
512         //
513         // `None` is returned in the case that the channel has been closed by the
514         // receiver. This happens when `Receiver::close` is called or the
515         // receiver is dropped.
516         let park_self = match self.inc_num_messages() {
517             Some(num_messages) => {
518                 // Block if the current number of pending messages has exceeded
519                 // the configured buffer size
520                 num_messages > self.inner.buffer
521             }
522             None => {
523                 return Err(TrySendError {
524                     err: SendError { kind: SendErrorKind::Disconnected },
525                     val: msg,
526                 })
527             }
528         };
529 
530         // If the channel has reached capacity, then the sender task needs to
531         // be parked. This will send the task handle on the parked task queue.
532         //
533         // However, when `do_send` is called while dropping the `Sender`,
534         // `task::current()` can't be called safely. In this case, in order to
535         // maintain internal consistency, a blank message is pushed onto the
536         // parked task queue.
537         if park_self {
538             self.park();
539         }
540 
541         self.queue_push_and_signal(msg);
542 
543         Ok(())
544     }
545 
546     // Push message to the queue and signal to the receiver
queue_push_and_signal(&self, msg: T)547     fn queue_push_and_signal(&self, msg: T) {
548         // Push the message onto the message queue
549         self.inner.message_queue.push(msg);
550 
551         // Signal to the receiver that a message has been enqueued. If the
552         // receiver is parked, this will unpark the task.
553         self.inner.recv_task.wake();
554     }
555 
556     // Increment the number of queued messages. Returns the resulting number.
inc_num_messages(&self) -> Option<usize>557     fn inc_num_messages(&self) -> Option<usize> {
558         let mut curr = self.inner.state.load(SeqCst);
559 
560         loop {
561             let mut state = decode_state(curr);
562 
563             // The receiver end closed the channel.
564             if !state.is_open {
565                 return None;
566             }
567 
568             // This probably is never hit? Odds are the process will run out of
569             // memory first. It may be worth to return something else in this
570             // case?
571             assert!(
572                 state.num_messages < MAX_CAPACITY,
573                 "buffer space \
574                     exhausted; sending this messages would overflow the state"
575             );
576 
577             state.num_messages += 1;
578 
579             let next = encode_state(&state);
580             match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
581                 Ok(_) => return Some(state.num_messages),
582                 Err(actual) => curr = actual,
583             }
584         }
585     }
586 
park(&mut self)587     fn park(&mut self) {
588         {
589             let mut sender = self.sender_task.lock().unwrap();
590             sender.task = None;
591             sender.is_parked = true;
592         }
593 
594         // Send handle over queue
595         let t = self.sender_task.clone();
596         self.inner.parked_queue.push(t);
597 
598         // Check to make sure we weren't closed after we sent our task on the
599         // queue
600         let state = decode_state(self.inner.state.load(SeqCst));
601         self.maybe_parked = state.is_open;
602     }
603 
604     /// Polls the channel to determine if there is guaranteed capacity to send
605     /// at least one item without waiting.
606     ///
607     /// # Return value
608     ///
609     /// This method returns:
610     ///
611     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
612     /// - `Poll::Pending` if the channel may not have
613     ///   capacity, in which case the current task is queued to be notified once
614     ///   capacity is available;
615     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>616     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
617         let state = decode_state(self.inner.state.load(SeqCst));
618         if !state.is_open {
619             return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
620         }
621 
622         self.poll_unparked(Some(cx)).map(Ok)
623     }
624 
625     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool626     fn same_receiver(&self, other: &Self) -> bool {
627         Arc::ptr_eq(&self.inner, &other.inner)
628     }
629 
630     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool631     fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
632         Arc::ptr_eq(&self.inner, receiver)
633     }
634 
635     /// Returns pointer to the Arc containing sender
636     ///
637     /// The returned pointer is not referenced and should be only used for hashing!
ptr(&self) -> *const BoundedInner<T>638     fn ptr(&self) -> *const BoundedInner<T> {
639         &*self.inner
640     }
641 
642     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool643     fn is_closed(&self) -> bool {
644         !decode_state(self.inner.state.load(SeqCst)).is_open
645     }
646 
647     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)648     fn close_channel(&self) {
649         // There's no need to park this sender, its dropping,
650         // and we don't want to check for capacity, so skip
651         // that stuff from `do_send`.
652 
653         self.inner.set_closed();
654         self.inner.recv_task.wake();
655     }
656 
poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()>657     fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
658         // First check the `maybe_parked` variable. This avoids acquiring the
659         // lock in most cases
660         if self.maybe_parked {
661             // Get a lock on the task handle
662             let mut task = self.sender_task.lock().unwrap();
663 
664             if !task.is_parked {
665                 self.maybe_parked = false;
666                 return Poll::Ready(());
667             }
668 
669             // At this point, an unpark request is pending, so there will be an
670             // unpark sometime in the future. We just need to make sure that
671             // the correct task will be notified.
672             //
673             // Update the task in case the `Sender` has been moved to another
674             // task
675             task.task = cx.map(|cx| cx.waker().clone());
676 
677             Poll::Pending
678         } else {
679             Poll::Ready(())
680         }
681     }
682 }
683 
684 impl<T> Sender<T> {
685     /// Attempts to send a message on this `Sender`, returning the message
686     /// if there was an error.
try_send(&mut self, msg: T) -> Result<(), TrySendError<T>>687     pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
688         if let Some(inner) = &mut self.0 {
689             inner.try_send(msg)
690         } else {
691             Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
692         }
693     }
694 
695     /// Send a message on the channel.
696     ///
697     /// This function should only be called after
698     /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
699     /// ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>700     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
701         self.try_send(msg).map_err(|e| e.err)
702     }
703 
704     /// Polls the channel to determine if there is guaranteed capacity to send
705     /// at least one item without waiting.
706     ///
707     /// # Return value
708     ///
709     /// This method returns:
710     ///
711     /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
712     /// - `Poll::Pending` if the channel may not have
713     ///   capacity, in which case the current task is queued to be notified once
714     ///   capacity is available;
715     /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>>716     pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
717         let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
718         inner.poll_ready(cx)
719     }
720 
721     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool722     pub fn is_closed(&self) -> bool {
723         self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
724     }
725 
726     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&mut self)727     pub fn close_channel(&mut self) {
728         if let Some(inner) = &mut self.0 {
729             inner.close_channel();
730         }
731     }
732 
733     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)734     pub fn disconnect(&mut self) {
735         self.0 = None;
736     }
737 
738     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool739     pub fn same_receiver(&self, other: &Self) -> bool {
740         match (&self.0, &other.0) {
741             (Some(inner), Some(other)) => inner.same_receiver(other),
742             _ => false,
743         }
744     }
745 
746     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &Receiver<T>) -> bool747     pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
748         match (&self.0, &receiver.inner) {
749             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
750             _ => false,
751         }
752     }
753 
754     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,755     pub fn hash_receiver<H>(&self, hasher: &mut H)
756     where
757         H: std::hash::Hasher,
758     {
759         use std::hash::Hash;
760 
761         let ptr = self.0.as_ref().map(|inner| inner.ptr());
762         ptr.hash(hasher);
763     }
764 }
765 
766 impl<T> UnboundedSender<T> {
767     /// Check if the channel is ready to receive a message.
poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>>768     pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
769         let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
770         inner.poll_ready_nb()
771     }
772 
773     /// Returns whether this channel is closed without needing a context.
is_closed(&self) -> bool774     pub fn is_closed(&self) -> bool {
775         self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
776     }
777 
778     /// Closes this channel from the sender side, preventing any new messages.
close_channel(&self)779     pub fn close_channel(&self) {
780         if let Some(inner) = &self.0 {
781             inner.close_channel();
782         }
783     }
784 
785     /// Disconnects this sender from the channel, closing it if there are no more senders left.
disconnect(&mut self)786     pub fn disconnect(&mut self) {
787         self.0 = None;
788     }
789 
790     // Do the send without parking current task.
do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>>791     fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
792         if let Some(inner) = &self.0 {
793             if inner.inc_num_messages().is_some() {
794                 inner.queue_push_and_signal(msg);
795                 return Ok(());
796             }
797         }
798 
799         Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
800     }
801 
802     /// Send a message on the channel.
803     ///
804     /// This method should only be called after `poll_ready` has been used to
805     /// verify that the channel is ready to receive a message.
start_send(&mut self, msg: T) -> Result<(), SendError>806     pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
807         self.do_send_nb(msg).map_err(|e| e.err)
808     }
809 
810     /// Sends a message along this channel.
811     ///
812     /// This is an unbounded sender, so this function differs from `Sink::send`
813     /// by ensuring the return type reflects that the channel is always ready to
814     /// receive messages.
unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>>815     pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
816         self.do_send_nb(msg)
817     }
818 
819     /// Returns whether the senders send to the same receiver.
same_receiver(&self, other: &Self) -> bool820     pub fn same_receiver(&self, other: &Self) -> bool {
821         match (&self.0, &other.0) {
822             (Some(inner), Some(other)) => inner.same_receiver(other),
823             _ => false,
824         }
825     }
826 
827     /// Returns whether the sender send to this receiver.
is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool828     pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
829         match (&self.0, &receiver.inner) {
830             (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
831             _ => false,
832         }
833     }
834 
835     /// Hashes the receiver into the provided hasher
hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher,836     pub fn hash_receiver<H>(&self, hasher: &mut H)
837     where
838         H: std::hash::Hasher,
839     {
840         use std::hash::Hash;
841 
842         let ptr = self.0.as_ref().map(|inner| inner.ptr());
843         ptr.hash(hasher);
844     }
845 
846     /// Return the number of messages in the queue or 0 if channel is disconnected.
len(&self) -> usize847     pub fn len(&self) -> usize {
848         if let Some(sender) = &self.0 {
849             decode_state(sender.inner.state.load(SeqCst)).num_messages
850         } else {
851             0
852         }
853     }
854 
855     /// Return false is channel has no queued messages, true otherwise.
is_empty(&self) -> bool856     pub fn is_empty(&self) -> bool {
857         self.len() == 0
858     }
859 }
860 
861 impl<T> Clone for Sender<T> {
clone(&self) -> Self862     fn clone(&self) -> Self {
863         Self(self.0.clone())
864     }
865 }
866 
867 impl<T> Clone for UnboundedSender<T> {
clone(&self) -> Self868     fn clone(&self) -> Self {
869         Self(self.0.clone())
870     }
871 }
872 
873 impl<T> Clone for UnboundedSenderInner<T> {
clone(&self) -> Self874     fn clone(&self) -> Self {
875         // Since this atomic op isn't actually guarding any memory and we don't
876         // care about any orderings besides the ordering on the single atomic
877         // variable, a relaxed ordering is acceptable.
878         let mut curr = self.inner.num_senders.load(SeqCst);
879 
880         loop {
881             // If the maximum number of senders has been reached, then fail
882             if curr == MAX_BUFFER {
883                 panic!("cannot clone `Sender` -- too many outstanding senders");
884             }
885 
886             debug_assert!(curr < MAX_BUFFER);
887 
888             let next = curr + 1;
889             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
890                 Ok(_) => {
891                     // The ABA problem doesn't matter here. We only care that the
892                     // number of senders never exceeds the maximum.
893                     return Self { inner: self.inner.clone() };
894                 }
895                 Err(actual) => curr = actual,
896             }
897         }
898     }
899 }
900 
901 impl<T> Clone for BoundedSenderInner<T> {
clone(&self) -> Self902     fn clone(&self) -> Self {
903         // Since this atomic op isn't actually guarding any memory and we don't
904         // care about any orderings besides the ordering on the single atomic
905         // variable, a relaxed ordering is acceptable.
906         let mut curr = self.inner.num_senders.load(SeqCst);
907 
908         loop {
909             // If the maximum number of senders has been reached, then fail
910             if curr == self.inner.max_senders() {
911                 panic!("cannot clone `Sender` -- too many outstanding senders");
912             }
913 
914             debug_assert!(curr < self.inner.max_senders());
915 
916             let next = curr + 1;
917             match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
918                 Ok(_) => {
919                     // The ABA problem doesn't matter here. We only care that the
920                     // number of senders never exceeds the maximum.
921                     return Self {
922                         inner: self.inner.clone(),
923                         sender_task: Arc::new(Mutex::new(SenderTask::new())),
924                         maybe_parked: false,
925                     };
926                 }
927                 Err(actual) => curr = actual,
928             }
929         }
930     }
931 }
932 
933 impl<T> Drop for UnboundedSenderInner<T> {
drop(&mut self)934     fn drop(&mut self) {
935         // Ordering between variables don't matter here
936         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
937 
938         if prev == 1 {
939             self.close_channel();
940         }
941     }
942 }
943 
944 impl<T> Drop for BoundedSenderInner<T> {
drop(&mut self)945     fn drop(&mut self) {
946         // Ordering between variables don't matter here
947         let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
948 
949         if prev == 1 {
950             self.close_channel();
951         }
952     }
953 }
954 
955 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result956     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
957         f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
958     }
959 }
960 
961 impl<T> fmt::Debug for UnboundedSender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result962     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
963         f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
964     }
965 }
966 
967 /*
968  *
969  * ===== impl Receiver =====
970  *
971  */
972 
973 impl<T> Receiver<T> {
974     /// Closes the receiving half of a channel, without dropping it.
975     ///
976     /// This prevents any further messages from being sent on the channel while
977     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)978     pub fn close(&mut self) {
979         if let Some(inner) = &mut self.inner {
980             inner.set_closed();
981 
982             // Wake up any threads waiting as they'll see that we've closed the
983             // channel and will continue on their merry way.
984             while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
985                 task.lock().unwrap().notify();
986             }
987         }
988     }
989 
990     /// Tries to receive the next message without notifying a context if empty.
991     ///
992     /// It is not recommended to call this function from inside of a future,
993     /// only when you've otherwise arranged to be notified when the channel is
994     /// no longer empty.
995     ///
996     /// This function returns:
997     /// * `Ok(Some(t))` when message is fetched
998     /// * `Ok(None)` when channel is closed and no messages left in the queue
999     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>1000     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1001         match self.next_message() {
1002             Poll::Ready(msg) => Ok(msg),
1003             Poll::Pending => Err(TryRecvError { _priv: () }),
1004         }
1005     }
1006 
next_message(&mut self) -> Poll<Option<T>>1007     fn next_message(&mut self) -> Poll<Option<T>> {
1008         let inner = match self.inner.as_mut() {
1009             None => return Poll::Ready(None),
1010             Some(inner) => inner,
1011         };
1012         // Pop off a message
1013         match unsafe { inner.message_queue.pop_spin() } {
1014             Some(msg) => {
1015                 // If there are any parked task handles in the parked queue,
1016                 // pop one and unpark it.
1017                 self.unpark_one();
1018 
1019                 // Decrement number of messages
1020                 self.dec_num_messages();
1021 
1022                 Poll::Ready(Some(msg))
1023             }
1024             None => {
1025                 let state = decode_state(inner.state.load(SeqCst));
1026                 if state.is_closed() {
1027                     // If closed flag is set AND there are no pending messages
1028                     // it means end of stream
1029                     self.inner = None;
1030                     Poll::Ready(None)
1031                 } else {
1032                     // If queue is open, we need to return Pending
1033                     // to be woken up when new messages arrive.
1034                     // If queue is closed but num_messages is non-zero,
1035                     // it means that senders updated the state,
1036                     // but didn't put message to queue yet,
1037                     // so we need to park until sender unparks the task
1038                     // after queueing the message.
1039                     Poll::Pending
1040                 }
1041             }
1042         }
1043     }
1044 
1045     // Unpark a single task handle if there is one pending in the parked queue
unpark_one(&mut self)1046     fn unpark_one(&mut self) {
1047         if let Some(inner) = &mut self.inner {
1048             if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1049                 task.lock().unwrap().notify();
1050             }
1051         }
1052     }
1053 
dec_num_messages(&self)1054     fn dec_num_messages(&self) {
1055         if let Some(inner) = &self.inner {
1056             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1057             // unless there's underflow, and we know there's no underflow
1058             // because number of messages at this point is always > 0.
1059             inner.state.fetch_sub(1, SeqCst);
1060         }
1061     }
1062 }
1063 
1064 // The receiver does not ever take a Pin to the inner T
1065 impl<T> Unpin for Receiver<T> {}
1066 
1067 impl<T> FusedStream for Receiver<T> {
is_terminated(&self) -> bool1068     fn is_terminated(&self) -> bool {
1069         self.inner.is_none()
1070     }
1071 }
1072 
1073 impl<T> Stream for Receiver<T> {
1074     type Item = T;
1075 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1076     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1077         // Try to read a message off of the message queue.
1078         match self.next_message() {
1079             Poll::Ready(msg) => {
1080                 if msg.is_none() {
1081                     self.inner = None;
1082                 }
1083                 Poll::Ready(msg)
1084             }
1085             Poll::Pending => {
1086                 // There are no messages to read, in this case, park.
1087                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1088                 // Check queue again after parking to prevent race condition:
1089                 // a message could be added to the queue after previous `next_message`
1090                 // before `register` call.
1091                 self.next_message()
1092             }
1093         }
1094     }
1095 
size_hint(&self) -> (usize, Option<usize>)1096     fn size_hint(&self) -> (usize, Option<usize>) {
1097         if let Some(inner) = &self.inner {
1098             decode_state(inner.state.load(SeqCst)).size_hint()
1099         } else {
1100             (0, Some(0))
1101         }
1102     }
1103 }
1104 
1105 impl<T> Drop for Receiver<T> {
drop(&mut self)1106     fn drop(&mut self) {
1107         // Drain the channel of all pending messages
1108         self.close();
1109         if self.inner.is_some() {
1110             loop {
1111                 match self.next_message() {
1112                     Poll::Ready(Some(_)) => {}
1113                     Poll::Ready(None) => break,
1114                     Poll::Pending => {
1115                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1116 
1117                         // If the channel is closed, then there is no need to park.
1118                         if state.is_closed() {
1119                             break;
1120                         }
1121 
1122                         // TODO: Spinning isn't ideal, it might be worth
1123                         // investigating using a condvar or some other strategy
1124                         // here. That said, if this case is hit, then another thread
1125                         // is about to push the value into the queue and this isn't
1126                         // the only spinlock in the impl right now.
1127                         thread::yield_now();
1128                     }
1129                 }
1130             }
1131         }
1132     }
1133 }
1134 
1135 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1136     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1137         let closed = if let Some(ref inner) = self.inner {
1138             decode_state(inner.state.load(SeqCst)).is_closed()
1139         } else {
1140             false
1141         };
1142 
1143         f.debug_struct("Receiver").field("closed", &closed).finish()
1144     }
1145 }
1146 
1147 impl<T> UnboundedReceiver<T> {
1148     /// Closes the receiving half of a channel, without dropping it.
1149     ///
1150     /// This prevents any further messages from being sent on the channel while
1151     /// still enabling the receiver to drain messages that are buffered.
close(&mut self)1152     pub fn close(&mut self) {
1153         if let Some(inner) = &mut self.inner {
1154             inner.set_closed();
1155         }
1156     }
1157 
1158     /// Tries to receive the next message without notifying a context if empty.
1159     ///
1160     /// It is not recommended to call this function from inside of a future,
1161     /// only when you've otherwise arranged to be notified when the channel is
1162     /// no longer empty.
1163     ///
1164     /// This function returns:
1165     /// * `Ok(Some(t))` when message is fetched
1166     /// * `Ok(None)` when channel is closed and no messages left in the queue
1167     /// * `Err(e)` when there are no messages available, but channel is not yet closed
try_next(&mut self) -> Result<Option<T>, TryRecvError>1168     pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1169         match self.next_message() {
1170             Poll::Ready(msg) => Ok(msg),
1171             Poll::Pending => Err(TryRecvError { _priv: () }),
1172         }
1173     }
1174 
next_message(&mut self) -> Poll<Option<T>>1175     fn next_message(&mut self) -> Poll<Option<T>> {
1176         let inner = match self.inner.as_mut() {
1177             None => return Poll::Ready(None),
1178             Some(inner) => inner,
1179         };
1180         // Pop off a message
1181         match unsafe { inner.message_queue.pop_spin() } {
1182             Some(msg) => {
1183                 // Decrement number of messages
1184                 self.dec_num_messages();
1185 
1186                 Poll::Ready(Some(msg))
1187             }
1188             None => {
1189                 let state = decode_state(inner.state.load(SeqCst));
1190                 if state.is_closed() {
1191                     // If closed flag is set AND there are no pending messages
1192                     // it means end of stream
1193                     self.inner = None;
1194                     Poll::Ready(None)
1195                 } else {
1196                     // If queue is open, we need to return Pending
1197                     // to be woken up when new messages arrive.
1198                     // If queue is closed but num_messages is non-zero,
1199                     // it means that senders updated the state,
1200                     // but didn't put message to queue yet,
1201                     // so we need to park until sender unparks the task
1202                     // after queueing the message.
1203                     Poll::Pending
1204                 }
1205             }
1206         }
1207     }
1208 
dec_num_messages(&self)1209     fn dec_num_messages(&self) {
1210         if let Some(inner) = &self.inner {
1211             // OPEN_MASK is highest bit, so it's unaffected by subtraction
1212             // unless there's underflow, and we know there's no underflow
1213             // because number of messages at this point is always > 0.
1214             inner.state.fetch_sub(1, SeqCst);
1215         }
1216     }
1217 }
1218 
1219 impl<T> FusedStream for UnboundedReceiver<T> {
is_terminated(&self) -> bool1220     fn is_terminated(&self) -> bool {
1221         self.inner.is_none()
1222     }
1223 }
1224 
1225 impl<T> Stream for UnboundedReceiver<T> {
1226     type Item = T;
1227 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>>1228     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1229         // Try to read a message off of the message queue.
1230         match self.next_message() {
1231             Poll::Ready(msg) => {
1232                 if msg.is_none() {
1233                     self.inner = None;
1234                 }
1235                 Poll::Ready(msg)
1236             }
1237             Poll::Pending => {
1238                 // There are no messages to read, in this case, park.
1239                 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1240                 // Check queue again after parking to prevent race condition:
1241                 // a message could be added to the queue after previous `next_message`
1242                 // before `register` call.
1243                 self.next_message()
1244             }
1245         }
1246     }
1247 
size_hint(&self) -> (usize, Option<usize>)1248     fn size_hint(&self) -> (usize, Option<usize>) {
1249         if let Some(inner) = &self.inner {
1250             decode_state(inner.state.load(SeqCst)).size_hint()
1251         } else {
1252             (0, Some(0))
1253         }
1254     }
1255 }
1256 
1257 impl<T> Drop for UnboundedReceiver<T> {
drop(&mut self)1258     fn drop(&mut self) {
1259         // Drain the channel of all pending messages
1260         self.close();
1261         if self.inner.is_some() {
1262             loop {
1263                 match self.next_message() {
1264                     Poll::Ready(Some(_)) => {}
1265                     Poll::Ready(None) => break,
1266                     Poll::Pending => {
1267                         let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1268 
1269                         // If the channel is closed, then there is no need to park.
1270                         if state.is_closed() {
1271                             break;
1272                         }
1273 
1274                         // TODO: Spinning isn't ideal, it might be worth
1275                         // investigating using a condvar or some other strategy
1276                         // here. That said, if this case is hit, then another thread
1277                         // is about to push the value into the queue and this isn't
1278                         // the only spinlock in the impl right now.
1279                         thread::yield_now();
1280                     }
1281                 }
1282             }
1283         }
1284     }
1285 }
1286 
1287 impl<T> fmt::Debug for UnboundedReceiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1288     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1289         let closed = if let Some(ref inner) = self.inner {
1290             decode_state(inner.state.load(SeqCst)).is_closed()
1291         } else {
1292             false
1293         };
1294 
1295         f.debug_struct("Receiver").field("closed", &closed).finish()
1296     }
1297 }
1298 
1299 /*
1300  *
1301  * ===== impl Inner =====
1302  *
1303  */
1304 
1305 impl<T> UnboundedInner<T> {
1306     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1307     fn set_closed(&self) {
1308         let curr = self.state.load(SeqCst);
1309         if !decode_state(curr).is_open {
1310             return;
1311         }
1312 
1313         self.state.fetch_and(!OPEN_MASK, SeqCst);
1314     }
1315 }
1316 
1317 impl<T> BoundedInner<T> {
1318     // The return value is such that the total number of messages that can be
1319     // enqueued into the channel will never exceed MAX_CAPACITY
max_senders(&self) -> usize1320     fn max_senders(&self) -> usize {
1321         MAX_CAPACITY - self.buffer
1322     }
1323 
1324     // Clear `open` flag in the state, keep `num_messages` intact.
set_closed(&self)1325     fn set_closed(&self) {
1326         let curr = self.state.load(SeqCst);
1327         if !decode_state(curr).is_open {
1328             return;
1329         }
1330 
1331         self.state.fetch_and(!OPEN_MASK, SeqCst);
1332     }
1333 }
1334 
1335 unsafe impl<T: Send> Send for UnboundedInner<T> {}
1336 unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1337 
1338 unsafe impl<T: Send> Send for BoundedInner<T> {}
1339 unsafe impl<T: Send> Sync for BoundedInner<T> {}
1340 
1341 impl State {
is_closed(&self) -> bool1342     fn is_closed(&self) -> bool {
1343         !self.is_open && self.num_messages == 0
1344     }
1345 
size_hint(&self) -> (usize, Option<usize>)1346     fn size_hint(&self) -> (usize, Option<usize>) {
1347         if self.is_open {
1348             (self.num_messages, None)
1349         } else {
1350             (self.num_messages, Some(self.num_messages))
1351         }
1352     }
1353 }
1354 
1355 /*
1356  *
1357  * ===== Helpers =====
1358  *
1359  */
1360 
decode_state(num: usize) -> State1361 fn decode_state(num: usize) -> State {
1362     State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1363 }
1364 
encode_state(state: &State) -> usize1365 fn encode_state(state: &State) -> usize {
1366     let mut num = state.num_messages;
1367 
1368     if state.is_open {
1369         num |= OPEN_MASK;
1370     }
1371 
1372     num
1373 }
1374