1 use crate::loom::sync::Arc;
2 use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3 use crate::sync::mpsc::chan;
4 use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5 
6 cfg_time! {
7     use crate::sync::mpsc::error::SendTimeoutError;
8     use crate::time::Duration;
9 }
10 
11 use std::fmt;
12 use std::task::{Context, Poll};
13 
14 /// Sends values to the associated `Receiver`.
15 ///
16 /// Instances are created by the [`channel`] function.
17 ///
18 /// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19 /// use the [`PollSender`] utility.
20 ///
21 /// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22 pub struct Sender<T> {
23     chan: chan::Tx<T, Semaphore>,
24 }
25 
26 /// A sender that does not prevent the channel from being closed.
27 ///
28 /// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29 /// instances remain, the channel is closed.
30 ///
31 /// In order to send messages, the `WeakSender` needs to be upgraded using
32 /// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33 /// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34 ///
35 /// [`Sender`]: Sender
36 /// [`WeakSender::upgrade`]: WeakSender::upgrade
37 ///
38 /// # Examples
39 ///
40 /// ```
41 /// use tokio::sync::mpsc::channel;
42 ///
43 /// #[tokio::main]
44 /// async fn main() {
45 ///     let (tx, _rx) = channel::<i32>(15);
46 ///     let tx_weak = tx.downgrade();
47 ///
48 ///     // Upgrading will succeed because `tx` still exists.
49 ///     assert!(tx_weak.upgrade().is_some());
50 ///
51 ///     // If we drop `tx`, then it will fail.
52 ///     drop(tx);
53 ///     assert!(tx_weak.clone().upgrade().is_none());
54 /// }
55 /// ```
56 pub struct WeakSender<T> {
57     chan: Arc<chan::Chan<T, Semaphore>>,
58 }
59 
60 /// Permits to send one value into the channel.
61 ///
62 /// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63 /// and are used to guarantee channel capacity before generating a message to send.
64 ///
65 /// [`Sender::reserve()`]: Sender::reserve
66 /// [`Sender::try_reserve()`]: Sender::try_reserve
67 pub struct Permit<'a, T> {
68     chan: &'a chan::Tx<T, Semaphore>,
69 }
70 
71 /// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72 ///
73 /// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74 /// and are used to guarantee channel capacity before generating `n` messages to send.
75 ///
76 /// [`Sender::reserve_many()`]: Sender::reserve_many
77 /// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78 pub struct PermitIterator<'a, T> {
79     chan: &'a chan::Tx<T, Semaphore>,
80     n: usize,
81 }
82 
83 /// Owned permit to send one value into the channel.
84 ///
85 /// This is identical to the [`Permit`] type, except that it moves the sender
86 /// rather than borrowing it.
87 ///
88 /// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89 /// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90 /// before generating a message to send.
91 ///
92 /// [`Permit`]: Permit
93 /// [`Sender::reserve_owned()`]: Sender::reserve_owned
94 /// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95 pub struct OwnedPermit<T> {
96     chan: Option<chan::Tx<T, Semaphore>>,
97 }
98 
99 /// Receives values from the associated `Sender`.
100 ///
101 /// Instances are created by the [`channel`] function.
102 ///
103 /// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104 ///
105 /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106 pub struct Receiver<T> {
107     /// The channel receiver.
108     chan: chan::Rx<T, Semaphore>,
109 }
110 
111 /// Creates a bounded mpsc channel for communicating between asynchronous tasks
112 /// with backpressure.
113 ///
114 /// The channel will buffer up to the provided number of messages.  Once the
115 /// buffer is full, attempts to send new messages will wait until a message is
116 /// received from the channel. The provided buffer capacity must be at least 1.
117 ///
118 /// All data sent on `Sender` will become available on `Receiver` in the same
119 /// order as it was sent.
120 ///
121 /// The `Sender` can be cloned to `send` to the same channel from multiple code
122 /// locations. Only one `Receiver` is supported.
123 ///
124 /// If the `Receiver` is disconnected while trying to `send`, the `send` method
125 /// will return a `SendError`. Similarly, if `Sender` is disconnected while
126 /// trying to `recv`, the `recv` method will return `None`.
127 ///
128 /// # Panics
129 ///
130 /// Panics if the buffer capacity is 0.
131 ///
132 /// # Examples
133 ///
134 /// ```rust
135 /// use tokio::sync::mpsc;
136 ///
137 /// #[tokio::main]
138 /// async fn main() {
139 ///     let (tx, mut rx) = mpsc::channel(100);
140 ///
141 ///     tokio::spawn(async move {
142 ///         for i in 0..10 {
143 ///             if let Err(_) = tx.send(i).await {
144 ///                 println!("receiver dropped");
145 ///                 return;
146 ///             }
147 ///         }
148 ///     });
149 ///
150 ///     while let Some(i) = rx.recv().await {
151 ///         println!("got = {}", i);
152 ///     }
153 /// }
154 /// ```
155 #[track_caller]
channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>)156 pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157     assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158     let semaphore = Semaphore {
159         semaphore: semaphore::Semaphore::new(buffer),
160         bound: buffer,
161     };
162     let (tx, rx) = chan::channel(semaphore);
163 
164     let tx = Sender::new(tx);
165     let rx = Receiver::new(rx);
166 
167     (tx, rx)
168 }
169 
170 /// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171 /// representing the channel bound.
172 #[derive(Debug)]
173 pub(crate) struct Semaphore {
174     pub(crate) semaphore: semaphore::Semaphore,
175     pub(crate) bound: usize,
176 }
177 
178 impl<T> Receiver<T> {
new(chan: chan::Rx<T, Semaphore>) -> Receiver<T>179     pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180         Receiver { chan }
181     }
182 
183     /// Receives the next value for this receiver.
184     ///
185     /// This method returns `None` if the channel has been closed and there are
186     /// no remaining messages in the channel's buffer. This indicates that no
187     /// further values can ever be received from this `Receiver`. The channel is
188     /// closed when all senders have been dropped, or when [`close`] is called.
189     ///
190     /// If there are no messages in the channel's buffer, but the channel has
191     /// not yet been closed, this method will sleep until a message is sent or
192     /// the channel is closed.  Note that if [`close`] is called, but there are
193     /// still outstanding [`Permits`] from before it was closed, the channel is
194     /// not considered closed by `recv` until the permits are released.
195     ///
196     /// # Cancel safety
197     ///
198     /// This method is cancel safe. If `recv` is used as the event in a
199     /// [`tokio::select!`](crate::select) statement and some other branch
200     /// completes first, it is guaranteed that no messages were received on this
201     /// channel.
202     ///
203     /// [`close`]: Self::close
204     /// [`Permits`]: struct@crate::sync::mpsc::Permit
205     ///
206     /// # Examples
207     ///
208     /// ```
209     /// use tokio::sync::mpsc;
210     ///
211     /// #[tokio::main]
212     /// async fn main() {
213     ///     let (tx, mut rx) = mpsc::channel(100);
214     ///
215     ///     tokio::spawn(async move {
216     ///         tx.send("hello").await.unwrap();
217     ///     });
218     ///
219     ///     assert_eq!(Some("hello"), rx.recv().await);
220     ///     assert_eq!(None, rx.recv().await);
221     /// }
222     /// ```
223     ///
224     /// Values are buffered:
225     ///
226     /// ```
227     /// use tokio::sync::mpsc;
228     ///
229     /// #[tokio::main]
230     /// async fn main() {
231     ///     let (tx, mut rx) = mpsc::channel(100);
232     ///
233     ///     tx.send("hello").await.unwrap();
234     ///     tx.send("world").await.unwrap();
235     ///
236     ///     assert_eq!(Some("hello"), rx.recv().await);
237     ///     assert_eq!(Some("world"), rx.recv().await);
238     /// }
239     /// ```
recv(&mut self) -> Option<T>240     pub async fn recv(&mut self) -> Option<T> {
241         use std::future::poll_fn;
242         poll_fn(|cx| self.chan.recv(cx)).await
243     }
244 
245     /// Receives the next values for this receiver and extends `buffer`.
246     ///
247     /// This method extends `buffer` by no more than a fixed number of values
248     /// as specified by `limit`. If `limit` is zero, the function immediately
249     /// returns `0`. The return value is the number of values added to `buffer`.
250     ///
251     /// For `limit > 0`, if there are no messages in the channel's queue, but
252     /// the channel has not yet been closed, this method will sleep until a
253     /// message is sent or the channel is closed. Note that if [`close`] is
254     /// called, but there are still outstanding [`Permits`] from before it was
255     /// closed, the channel is not considered closed by `recv_many` until the
256     /// permits are released.
257     ///
258     /// For non-zero values of `limit`, this method will never return `0` unless
259     /// the channel has been closed and there are no remaining messages in the
260     /// channel's queue. This indicates that no further values can ever be
261     /// received from this `Receiver`. The channel is closed when all senders
262     /// have been dropped, or when [`close`] is called.
263     ///
264     /// The capacity of `buffer` is increased as needed.
265     ///
266     /// # Cancel safety
267     ///
268     /// This method is cancel safe. If `recv_many` is used as the event in a
269     /// [`tokio::select!`](crate::select) statement and some other branch
270     /// completes first, it is guaranteed that no messages were received on this
271     /// channel.
272     ///
273     /// [`close`]: Self::close
274     /// [`Permits`]: struct@crate::sync::mpsc::Permit
275     ///
276     /// # Examples
277     ///
278     /// ```
279     /// use tokio::sync::mpsc;
280     ///
281     /// #[tokio::main]
282     /// async fn main() {
283     ///     let mut buffer: Vec<&str> = Vec::with_capacity(2);
284     ///     let limit = 2;
285     ///     let (tx, mut rx) = mpsc::channel(100);
286     ///     let tx2 = tx.clone();
287     ///     tx2.send("first").await.unwrap();
288     ///     tx2.send("second").await.unwrap();
289     ///     tx2.send("third").await.unwrap();
290     ///
291     ///     // Call `recv_many` to receive up to `limit` (2) values.
292     ///     assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293     ///     assert_eq!(vec!["first", "second"], buffer);
294     ///
295     ///     // If the buffer is full, the next call to `recv_many`
296     ///     // reserves additional capacity.
297     ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298     ///
299     ///     tokio::spawn(async move {
300     ///         tx.send("fourth").await.unwrap();
301     ///     });
302     ///
303     ///     // 'tx' is dropped, but `recv_many`
304     ///     // is guaranteed not to return 0 as the channel
305     ///     // is not yet closed.
306     ///     assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307     ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308     ///
309     ///     // Once the last sender is dropped, the channel is
310     ///     // closed and `recv_many` returns 0, capacity unchanged.
311     ///     drop(tx2);
312     ///     assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313     ///     assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314     /// }
315     /// ```
recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize316     pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317         use std::future::poll_fn;
318         poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319     }
320 
321     /// Tries to receive the next value for this receiver.
322     ///
323     /// This method returns the [`Empty`] error if the channel is currently
324     /// empty, but there are still outstanding [senders] or [permits].
325     ///
326     /// This method returns the [`Disconnected`] error if the channel is
327     /// currently empty, and there are no outstanding [senders] or [permits].
328     ///
329     /// Unlike the [`poll_recv`] method, this method will never return an
330     /// [`Empty`] error spuriously.
331     ///
332     /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333     /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334     /// [`poll_recv`]: Self::poll_recv
335     /// [senders]: crate::sync::mpsc::Sender
336     /// [permits]: crate::sync::mpsc::Permit
337     ///
338     /// # Examples
339     ///
340     /// ```
341     /// use tokio::sync::mpsc;
342     /// use tokio::sync::mpsc::error::TryRecvError;
343     ///
344     /// #[tokio::main]
345     /// async fn main() {
346     ///     let (tx, mut rx) = mpsc::channel(100);
347     ///
348     ///     tx.send("hello").await.unwrap();
349     ///
350     ///     assert_eq!(Ok("hello"), rx.try_recv());
351     ///     assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352     ///
353     ///     tx.send("hello").await.unwrap();
354     ///     // Drop the last sender, closing the channel.
355     ///     drop(tx);
356     ///
357     ///     assert_eq!(Ok("hello"), rx.try_recv());
358     ///     assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359     /// }
360     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>361     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362         self.chan.try_recv()
363     }
364 
365     /// Blocking receive to call outside of asynchronous contexts.
366     ///
367     /// This method returns `None` if the channel has been closed and there are
368     /// no remaining messages in the channel's buffer. This indicates that no
369     /// further values can ever be received from this `Receiver`. The channel is
370     /// closed when all senders have been dropped, or when [`close`] is called.
371     ///
372     /// If there are no messages in the channel's buffer, but the channel has
373     /// not yet been closed, this method will block until a message is sent or
374     /// the channel is closed.
375     ///
376     /// This method is intended for use cases where you are sending from
377     /// asynchronous code to synchronous code, and will work even if the sender
378     /// is not using [`blocking_send`] to send the message.
379     ///
380     /// Note that if [`close`] is called, but there are still outstanding
381     /// [`Permits`] from before it was closed, the channel is not considered
382     /// closed by `blocking_recv` until the permits are released.
383     ///
384     /// [`close`]: Self::close
385     /// [`Permits`]: struct@crate::sync::mpsc::Permit
386     /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387     ///
388     /// # Panics
389     ///
390     /// This function panics if called within an asynchronous execution
391     /// context.
392     ///
393     /// # Examples
394     ///
395     /// ```
396     /// use std::thread;
397     /// use tokio::runtime::Runtime;
398     /// use tokio::sync::mpsc;
399     ///
400     /// fn main() {
401     ///     let (tx, mut rx) = mpsc::channel::<u8>(10);
402     ///
403     ///     let sync_code = thread::spawn(move || {
404     ///         assert_eq!(Some(10), rx.blocking_recv());
405     ///     });
406     ///
407     ///     Runtime::new()
408     ///         .unwrap()
409     ///         .block_on(async move {
410     ///             let _ = tx.send(10).await;
411     ///         });
412     ///     sync_code.join().unwrap()
413     /// }
414     /// ```
415     #[track_caller]
416     #[cfg(feature = "sync")]
417     #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
blocking_recv(&mut self) -> Option<T>418     pub fn blocking_recv(&mut self) -> Option<T> {
419         crate::future::block_on(self.recv())
420     }
421 
422     /// Variant of [`Self::recv_many`] for blocking contexts.
423     ///
424     /// The same conditions as in [`Self::blocking_recv`] apply.
425     #[track_caller]
426     #[cfg(feature = "sync")]
427     #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize428     pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
429         crate::future::block_on(self.recv_many(buffer, limit))
430     }
431 
432     /// Closes the receiving half of a channel without dropping it.
433     ///
434     /// This prevents any further messages from being sent on the channel while
435     /// still enabling the receiver to drain messages that are buffered. Any
436     /// outstanding [`Permit`] values will still be able to send messages.
437     ///
438     /// To guarantee that no messages are dropped, after calling `close()`,
439     /// `recv()` must be called until `None` is returned. If there are
440     /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
441     /// not return `None` until those are released.
442     ///
443     /// [`Permit`]: Permit
444     /// [`OwnedPermit`]: OwnedPermit
445     ///
446     /// # Examples
447     ///
448     /// ```
449     /// use tokio::sync::mpsc;
450     ///
451     /// #[tokio::main]
452     /// async fn main() {
453     ///     let (tx, mut rx) = mpsc::channel(20);
454     ///
455     ///     tokio::spawn(async move {
456     ///         let mut i = 0;
457     ///         while let Ok(permit) = tx.reserve().await {
458     ///             permit.send(i);
459     ///             i += 1;
460     ///         }
461     ///     });
462     ///
463     ///     rx.close();
464     ///
465     ///     while let Some(msg) = rx.recv().await {
466     ///         println!("got {}", msg);
467     ///     }
468     ///
469     ///     // Channel closed and no messages are lost.
470     /// }
471     /// ```
close(&mut self)472     pub fn close(&mut self) {
473         self.chan.close();
474     }
475 
476     /// Checks if a channel is closed.
477     ///
478     /// This method returns `true` if the channel has been closed. The channel is closed
479     /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
480     ///
481     /// [`Sender`]: crate::sync::mpsc::Sender
482     /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
483     ///
484     /// # Examples
485     /// ```
486     /// use tokio::sync::mpsc;
487     ///
488     /// #[tokio::main]
489     /// async fn main() {
490     ///     let (_tx, mut rx) = mpsc::channel::<()>(10);
491     ///     assert!(!rx.is_closed());
492     ///
493     ///     rx.close();
494     ///
495     ///     assert!(rx.is_closed());
496     /// }
497     /// ```
is_closed(&self) -> bool498     pub fn is_closed(&self) -> bool {
499         self.chan.is_closed()
500     }
501 
502     /// Checks if a channel is empty.
503     ///
504     /// This method returns `true` if the channel has no messages.
505     ///
506     /// # Examples
507     /// ```
508     /// use tokio::sync::mpsc;
509     ///
510     /// #[tokio::main]
511     /// async fn main() {
512     ///     let (tx, rx) = mpsc::channel(10);
513     ///     assert!(rx.is_empty());
514     ///
515     ///     tx.send(0).await.unwrap();
516     ///     assert!(!rx.is_empty());
517     /// }
518     ///
519     /// ```
is_empty(&self) -> bool520     pub fn is_empty(&self) -> bool {
521         self.chan.is_empty()
522     }
523 
524     /// Returns the number of messages in the channel.
525     ///
526     /// # Examples
527     /// ```
528     /// use tokio::sync::mpsc;
529     ///
530     /// #[tokio::main]
531     /// async fn main() {
532     ///     let (tx, rx) = mpsc::channel(10);
533     ///     assert_eq!(0, rx.len());
534     ///
535     ///     tx.send(0).await.unwrap();
536     ///     assert_eq!(1, rx.len());
537     /// }
538     /// ```
len(&self) -> usize539     pub fn len(&self) -> usize {
540         self.chan.len()
541     }
542 
543     /// Returns the current capacity of the channel.
544     ///
545     /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
546     /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
547     /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
548     /// specified when calling [`channel`].
549     ///
550     /// # Examples
551     ///
552     /// ```
553     /// use tokio::sync::mpsc;
554     ///
555     /// #[tokio::main]
556     /// async fn main() {
557     ///     let (tx, mut rx) = mpsc::channel::<()>(5);
558     ///
559     ///     assert_eq!(rx.capacity(), 5);
560     ///
561     ///     // Making a reservation drops the capacity by one.
562     ///     let permit = tx.reserve().await.unwrap();
563     ///     assert_eq!(rx.capacity(), 4);
564     ///     assert_eq!(rx.len(), 0);
565     ///
566     ///     // Sending and receiving a value increases the capacity by one.
567     ///     permit.send(());
568     ///     assert_eq!(rx.len(), 1);
569     ///     rx.recv().await.unwrap();
570     ///     assert_eq!(rx.capacity(), 5);
571     ///
572     ///     // Directly sending a message drops the capacity by one.
573     ///     tx.send(()).await.unwrap();
574     ///     assert_eq!(rx.capacity(), 4);
575     ///     assert_eq!(rx.len(), 1);
576     ///
577     ///     // Receiving the message increases the capacity by one.
578     ///     rx.recv().await.unwrap();
579     ///     assert_eq!(rx.capacity(), 5);
580     ///     assert_eq!(rx.len(), 0);
581     /// }
582     /// ```
583     /// [`capacity`]: Receiver::capacity
584     /// [`max_capacity`]: Receiver::max_capacity
capacity(&self) -> usize585     pub fn capacity(&self) -> usize {
586         self.chan.semaphore().semaphore.available_permits()
587     }
588 
589     /// Returns the maximum buffer capacity of the channel.
590     ///
591     /// The maximum capacity is the buffer capacity initially specified when calling
592     /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
593     /// available buffer capacity: as messages are sent and received, the value
594     /// returned by [`capacity`] will go up or down, whereas the value
595     /// returned by [`max_capacity`] will remain constant.
596     ///
597     /// # Examples
598     ///
599     /// ```
600     /// use tokio::sync::mpsc;
601     ///
602     /// #[tokio::main]
603     /// async fn main() {
604     ///     let (tx, rx) = mpsc::channel::<()>(5);
605     ///
606     ///     // both max capacity and capacity are the same at first
607     ///     assert_eq!(rx.max_capacity(), 5);
608     ///     assert_eq!(rx.capacity(), 5);
609     ///
610     ///     // Making a reservation doesn't change the max capacity.
611     ///     let permit = tx.reserve().await.unwrap();
612     ///     assert_eq!(rx.max_capacity(), 5);
613     ///     // but drops the capacity by one
614     ///     assert_eq!(rx.capacity(), 4);
615     /// }
616     /// ```
617     /// [`capacity`]: Receiver::capacity
618     /// [`max_capacity`]: Receiver::max_capacity
max_capacity(&self) -> usize619     pub fn max_capacity(&self) -> usize {
620         self.chan.semaphore().bound
621     }
622 
623     /// Polls to receive the next message on this channel.
624     ///
625     /// This method returns:
626     ///
627     ///  * `Poll::Pending` if no messages are available but the channel is not
628     ///    closed, or if a spurious failure happens.
629     ///  * `Poll::Ready(Some(message))` if a message is available.
630     ///  * `Poll::Ready(None)` if the channel has been closed and all messages
631     ///    sent before it was closed have been received.
632     ///
633     /// When the method returns `Poll::Pending`, the `Waker` in the provided
634     /// `Context` is scheduled to receive a wakeup when a message is sent on any
635     /// receiver, or when the channel is closed.  Note that on multiple calls to
636     /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
637     /// passed to the most recent call is scheduled to receive a wakeup.
638     ///
639     /// If this method returns `Poll::Pending` due to a spurious failure, then
640     /// the `Waker` will be notified when the situation causing the spurious
641     /// failure has been resolved. Note that receiving such a wakeup does not
642     /// guarantee that the next call will succeed — it could fail with another
643     /// spurious failure.
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>644     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
645         self.chan.recv(cx)
646     }
647 
648     /// Polls to receive multiple messages on this channel, extending the provided buffer.
649     ///
650     /// This method returns:
651     /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
652     ///   spurious failure happens.
653     /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
654     ///   stored in `buffer`. This can be less than, or equal to, `limit`.
655     /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
656     ///
657     /// When the method returns `Poll::Pending`, the `Waker` in the provided
658     /// `Context` is scheduled to receive a wakeup when a message is sent on any
659     /// receiver, or when the channel is closed.  Note that on multiple calls to
660     /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
661     /// passed to the most recent call is scheduled to receive a wakeup.
662     ///
663     /// Note that this method does not guarantee that exactly `limit` messages
664     /// are received. Rather, if at least one message is available, it returns
665     /// as many messages as it can up to the given limit. This method returns
666     /// zero only if the channel is closed (or if `limit` is zero).
667     ///
668     /// # Examples
669     ///
670     /// ```
671     /// use std::task::{Context, Poll};
672     /// use std::pin::Pin;
673     /// use tokio::sync::mpsc;
674     /// use futures::Future;
675     ///
676     /// struct MyReceiverFuture<'a> {
677     ///     receiver: mpsc::Receiver<i32>,
678     ///     buffer: &'a mut Vec<i32>,
679     ///     limit: usize,
680     /// }
681     ///
682     /// impl<'a> Future for MyReceiverFuture<'a> {
683     ///     type Output = usize; // Number of messages received
684     ///
685     ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
686     ///         let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
687     ///
688     ///         // Now `receiver` and `buffer` are mutable references, and `limit` is copied
689     ///         match receiver.poll_recv_many(cx, *buffer, *limit) {
690     ///             Poll::Pending => Poll::Pending,
691     ///             Poll::Ready(count) => Poll::Ready(count),
692     ///         }
693     ///     }
694     /// }
695     ///
696     /// #[tokio::main]
697     /// async fn main() {
698     ///     let (tx, rx) = mpsc::channel(32);
699     ///     let mut buffer = Vec::new();
700     ///
701     ///     let my_receiver_future = MyReceiverFuture {
702     ///         receiver: rx,
703     ///         buffer: &mut buffer,
704     ///         limit: 3,
705     ///     };
706     ///
707     ///     for i in 0..10 {
708     ///         tx.send(i).await.unwrap();
709     ///     }
710     ///
711     ///     let count = my_receiver_future.await;
712     ///     assert_eq!(count, 3);
713     ///     assert_eq!(buffer, vec![0,1,2])
714     /// }
715     /// ```
poll_recv_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<T>, limit: usize, ) -> Poll<usize>716     pub fn poll_recv_many(
717         &mut self,
718         cx: &mut Context<'_>,
719         buffer: &mut Vec<T>,
720         limit: usize,
721     ) -> Poll<usize> {
722         self.chan.recv_many(cx, buffer, limit)
723     }
724 
725     /// Returns the number of [`Sender`] handles.
sender_strong_count(&self) -> usize726     pub fn sender_strong_count(&self) -> usize {
727         self.chan.sender_strong_count()
728     }
729 
730     /// Returns the number of [`WeakSender`] handles.
sender_weak_count(&self) -> usize731     pub fn sender_weak_count(&self) -> usize {
732         self.chan.sender_weak_count()
733     }
734 }
735 
736 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result737     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
738         fmt.debug_struct("Receiver")
739             .field("chan", &self.chan)
740             .finish()
741     }
742 }
743 
744 impl<T> Unpin for Receiver<T> {}
745 
746 impl<T> Sender<T> {
new(chan: chan::Tx<T, Semaphore>) -> Sender<T>747     pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
748         Sender { chan }
749     }
750 
751     /// Sends a value, waiting until there is capacity.
752     ///
753     /// A successful send occurs when it is determined that the other end of the
754     /// channel has not hung up already. An unsuccessful send would be one where
755     /// the corresponding receiver has already been closed. Note that a return
756     /// value of `Err` means that the data will never be received, but a return
757     /// value of `Ok` does not mean that the data will be received. It is
758     /// possible for the corresponding receiver to hang up immediately after
759     /// this function returns `Ok`.
760     ///
761     /// # Errors
762     ///
763     /// If the receive half of the channel is closed, either due to [`close`]
764     /// being called or the [`Receiver`] handle dropping, the function returns
765     /// an error. The error includes the value passed to `send`.
766     ///
767     /// [`close`]: Receiver::close
768     /// [`Receiver`]: Receiver
769     ///
770     /// # Cancel safety
771     ///
772     /// If `send` is used as the event in a [`tokio::select!`](crate::select)
773     /// statement and some other branch completes first, then it is guaranteed
774     /// that the message was not sent. **However, in that case, the message
775     /// is dropped and will be lost.**
776     ///
777     /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
778     /// capacity, then use the returned [`Permit`] to send the message.
779     ///
780     /// This channel uses a queue to ensure that calls to `send` and `reserve`
781     /// complete in the order they were requested.  Cancelling a call to
782     /// `send` makes you lose your place in the queue.
783     ///
784     /// # Examples
785     ///
786     /// In the following example, each call to `send` will block until the
787     /// previously sent value was received.
788     ///
789     /// ```rust
790     /// use tokio::sync::mpsc;
791     ///
792     /// #[tokio::main]
793     /// async fn main() {
794     ///     let (tx, mut rx) = mpsc::channel(1);
795     ///
796     ///     tokio::spawn(async move {
797     ///         for i in 0..10 {
798     ///             if let Err(_) = tx.send(i).await {
799     ///                 println!("receiver dropped");
800     ///                 return;
801     ///             }
802     ///         }
803     ///     });
804     ///
805     ///     while let Some(i) = rx.recv().await {
806     ///         println!("got = {}", i);
807     ///     }
808     /// }
809     /// ```
send(&self, value: T) -> Result<(), SendError<T>>810     pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
811         match self.reserve().await {
812             Ok(permit) => {
813                 permit.send(value);
814                 Ok(())
815             }
816             Err(_) => Err(SendError(value)),
817         }
818     }
819 
820     /// Completes when the receiver has dropped.
821     ///
822     /// This allows the producers to get notified when interest in the produced
823     /// values is canceled and immediately stop doing work.
824     ///
825     /// # Cancel safety
826     ///
827     /// This method is cancel safe. Once the channel is closed, it stays closed
828     /// forever and all future calls to `closed` will return immediately.
829     ///
830     /// # Examples
831     ///
832     /// ```
833     /// use tokio::sync::mpsc;
834     ///
835     /// #[tokio::main]
836     /// async fn main() {
837     ///     let (tx1, rx) = mpsc::channel::<()>(1);
838     ///     let tx2 = tx1.clone();
839     ///     let tx3 = tx1.clone();
840     ///     let tx4 = tx1.clone();
841     ///     let tx5 = tx1.clone();
842     ///     tokio::spawn(async move {
843     ///         drop(rx);
844     ///     });
845     ///
846     ///     futures::join!(
847     ///         tx1.closed(),
848     ///         tx2.closed(),
849     ///         tx3.closed(),
850     ///         tx4.closed(),
851     ///         tx5.closed()
852     ///     );
853     ///     println!("Receiver dropped");
854     /// }
855     /// ```
closed(&self)856     pub async fn closed(&self) {
857         self.chan.closed().await;
858     }
859 
860     /// Attempts to immediately send a message on this `Sender`
861     ///
862     /// This method differs from [`send`] by returning immediately if the channel's
863     /// buffer is full or no receiver is waiting to acquire some data. Compared
864     /// with [`send`], this function has two failure cases instead of one (one for
865     /// disconnection, one for a full buffer).
866     ///
867     /// # Errors
868     ///
869     /// If the channel capacity has been reached, i.e., the channel has `n`
870     /// buffered values where `n` is the argument passed to [`channel`], then an
871     /// error is returned.
872     ///
873     /// If the receive half of the channel is closed, either due to [`close`]
874     /// being called or the [`Receiver`] handle dropping, the function returns
875     /// an error. The error includes the value passed to `send`.
876     ///
877     /// [`send`]: Sender::send
878     /// [`channel`]: channel
879     /// [`close`]: Receiver::close
880     ///
881     /// # Examples
882     ///
883     /// ```
884     /// use tokio::sync::mpsc;
885     ///
886     /// #[tokio::main]
887     /// async fn main() {
888     ///     // Create a channel with buffer size 1
889     ///     let (tx1, mut rx) = mpsc::channel(1);
890     ///     let tx2 = tx1.clone();
891     ///
892     ///     tokio::spawn(async move {
893     ///         tx1.send(1).await.unwrap();
894     ///         tx1.send(2).await.unwrap();
895     ///         // task waits until the receiver receives a value.
896     ///     });
897     ///
898     ///     tokio::spawn(async move {
899     ///         // This will return an error and send
900     ///         // no message if the buffer is full
901     ///         let _ = tx2.try_send(3);
902     ///     });
903     ///
904     ///     let mut msg;
905     ///     msg = rx.recv().await.unwrap();
906     ///     println!("message {} received", msg);
907     ///
908     ///     msg = rx.recv().await.unwrap();
909     ///     println!("message {} received", msg);
910     ///
911     ///     // Third message may have never been sent
912     ///     match rx.recv().await {
913     ///         Some(msg) => println!("message {} received", msg),
914     ///         None => println!("the third message was never sent"),
915     ///     }
916     /// }
917     /// ```
try_send(&self, message: T) -> Result<(), TrySendError<T>>918     pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
919         match self.chan.semaphore().semaphore.try_acquire(1) {
920             Ok(()) => {}
921             Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
922             Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
923         }
924 
925         // Send the message
926         self.chan.send(message);
927         Ok(())
928     }
929 
930     /// Sends a value, waiting until there is capacity, but only for a limited time.
931     ///
932     /// Shares the same success and error conditions as [`send`], adding one more
933     /// condition for an unsuccessful send, which is when the provided timeout has
934     /// elapsed, and there is no capacity available.
935     ///
936     /// [`send`]: Sender::send
937     ///
938     /// # Errors
939     ///
940     /// If the receive half of the channel is closed, either due to [`close`]
941     /// being called or the [`Receiver`] having been dropped,
942     /// the function returns an error. The error includes the value passed to `send`.
943     ///
944     /// [`close`]: Receiver::close
945     /// [`Receiver`]: Receiver
946     ///
947     /// # Panics
948     ///
949     /// This function panics if it is called outside the context of a Tokio
950     /// runtime [with time enabled](crate::runtime::Builder::enable_time).
951     ///
952     /// # Examples
953     ///
954     /// In the following example, each call to `send_timeout` will block until the
955     /// previously sent value was received, unless the timeout has elapsed.
956     ///
957     /// ```rust
958     /// use tokio::sync::mpsc;
959     /// use tokio::time::{sleep, Duration};
960     ///
961     /// #[tokio::main]
962     /// async fn main() {
963     ///     let (tx, mut rx) = mpsc::channel(1);
964     ///
965     ///     tokio::spawn(async move {
966     ///         for i in 0..10 {
967     ///             if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
968     ///                 println!("send error: #{:?}", e);
969     ///                 return;
970     ///             }
971     ///         }
972     ///     });
973     ///
974     ///     while let Some(i) = rx.recv().await {
975     ///         println!("got = {}", i);
976     ///         sleep(Duration::from_millis(200)).await;
977     ///     }
978     /// }
979     /// ```
980     #[cfg(feature = "time")]
981     #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
send_timeout( &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>>982     pub async fn send_timeout(
983         &self,
984         value: T,
985         timeout: Duration,
986     ) -> Result<(), SendTimeoutError<T>> {
987         let permit = match crate::time::timeout(timeout, self.reserve()).await {
988             Err(_) => {
989                 return Err(SendTimeoutError::Timeout(value));
990             }
991             Ok(Err(_)) => {
992                 return Err(SendTimeoutError::Closed(value));
993             }
994             Ok(Ok(permit)) => permit,
995         };
996 
997         permit.send(value);
998         Ok(())
999     }
1000 
1001     /// Blocking send to call outside of asynchronous contexts.
1002     ///
1003     /// This method is intended for use cases where you are sending from
1004     /// synchronous code to asynchronous code, and will work even if the
1005     /// receiver is not using [`blocking_recv`] to receive the message.
1006     ///
1007     /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1008     ///
1009     /// # Panics
1010     ///
1011     /// This function panics if called within an asynchronous execution
1012     /// context.
1013     ///
1014     /// # Examples
1015     ///
1016     /// ```
1017     /// use std::thread;
1018     /// use tokio::runtime::Runtime;
1019     /// use tokio::sync::mpsc;
1020     ///
1021     /// fn main() {
1022     ///     let (tx, mut rx) = mpsc::channel::<u8>(1);
1023     ///
1024     ///     let sync_code = thread::spawn(move || {
1025     ///         tx.blocking_send(10).unwrap();
1026     ///     });
1027     ///
1028     ///     Runtime::new().unwrap().block_on(async move {
1029     ///         assert_eq!(Some(10), rx.recv().await);
1030     ///     });
1031     ///     sync_code.join().unwrap()
1032     /// }
1033     /// ```
1034     #[track_caller]
1035     #[cfg(feature = "sync")]
1036     #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
blocking_send(&self, value: T) -> Result<(), SendError<T>>1037     pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1038         crate::future::block_on(self.send(value))
1039     }
1040 
1041     /// Checks if the channel has been closed. This happens when the
1042     /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1043     /// called.
1044     ///
1045     /// [`Receiver`]: crate::sync::mpsc::Receiver
1046     /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1047     ///
1048     /// ```
1049     /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1050     /// assert!(!tx.is_closed());
1051     ///
1052     /// let tx2 = tx.clone();
1053     /// assert!(!tx2.is_closed());
1054     ///
1055     /// drop(rx);
1056     /// assert!(tx.is_closed());
1057     /// assert!(tx2.is_closed());
1058     /// ```
is_closed(&self) -> bool1059     pub fn is_closed(&self) -> bool {
1060         self.chan.is_closed()
1061     }
1062 
1063     /// Waits for channel capacity. Once capacity to send one message is
1064     /// available, it is reserved for the caller.
1065     ///
1066     /// If the channel is full, the function waits for the number of unreceived
1067     /// messages to become less than the channel capacity. Capacity to send one
1068     /// message is reserved for the caller. A [`Permit`] is returned to track
1069     /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1070     /// reserved capacity.
1071     ///
1072     /// Dropping [`Permit`] without sending a message releases the capacity back
1073     /// to the channel.
1074     ///
1075     /// [`Permit`]: Permit
1076     /// [`send`]: Permit::send
1077     ///
1078     /// # Cancel safety
1079     ///
1080     /// This channel uses a queue to ensure that calls to `send` and `reserve`
1081     /// complete in the order they were requested.  Cancelling a call to
1082     /// `reserve` makes you lose your place in the queue.
1083     ///
1084     /// # Examples
1085     ///
1086     /// ```
1087     /// use tokio::sync::mpsc;
1088     ///
1089     /// #[tokio::main]
1090     /// async fn main() {
1091     ///     let (tx, mut rx) = mpsc::channel(1);
1092     ///
1093     ///     // Reserve capacity
1094     ///     let permit = tx.reserve().await.unwrap();
1095     ///
1096     ///     // Trying to send directly on the `tx` will fail due to no
1097     ///     // available capacity.
1098     ///     assert!(tx.try_send(123).is_err());
1099     ///
1100     ///     // Sending on the permit succeeds
1101     ///     permit.send(456);
1102     ///
1103     ///     // The value sent on the permit is received
1104     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1105     /// }
1106     /// ```
reserve(&self) -> Result<Permit<'_, T>, SendError<()>>1107     pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1108         self.reserve_inner(1).await?;
1109         Ok(Permit { chan: &self.chan })
1110     }
1111 
1112     /// Waits for channel capacity. Once capacity to send `n` messages is
1113     /// available, it is reserved for the caller.
1114     ///
1115     /// If the channel is full or if there are fewer than `n` permits available, the function waits
1116     /// for the number of unreceived messages to become `n` less than the channel capacity.
1117     /// Capacity to send `n` message is then reserved for the caller.
1118     ///
1119     /// A [`PermitIterator`] is returned to track the reserved capacity.
1120     /// You can call this [`Iterator`] until it is exhausted to
1121     /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1122     /// [`try_reserve_many`] except it awaits for the slots to become available.
1123     ///
1124     /// If the channel is closed, the function returns a [`SendError`].
1125     ///
1126     /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1127     /// permits back to the channel.
1128     ///
1129     /// [`PermitIterator`]: PermitIterator
1130     /// [`Permit`]: Permit
1131     /// [`send`]: Permit::send
1132     /// [`try_reserve_many`]: Sender::try_reserve_many
1133     ///
1134     /// # Cancel safety
1135     ///
1136     /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1137     /// complete in the order they were requested. Cancelling a call to
1138     /// `reserve_many` makes you lose your place in the queue.
1139     ///
1140     /// # Examples
1141     ///
1142     /// ```
1143     /// use tokio::sync::mpsc;
1144     ///
1145     /// #[tokio::main]
1146     /// async fn main() {
1147     ///     let (tx, mut rx) = mpsc::channel(2);
1148     ///
1149     ///     // Reserve capacity
1150     ///     let mut permit = tx.reserve_many(2).await.unwrap();
1151     ///
1152     ///     // Trying to send directly on the `tx` will fail due to no
1153     ///     // available capacity.
1154     ///     assert!(tx.try_send(123).is_err());
1155     ///
1156     ///     // Sending with the permit iterator succeeds
1157     ///     permit.next().unwrap().send(456);
1158     ///     permit.next().unwrap().send(457);
1159     ///
1160     ///     // The iterator should now be exhausted
1161     ///     assert!(permit.next().is_none());
1162     ///
1163     ///     // The value sent on the permit is received
1164     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1165     ///     assert_eq!(rx.recv().await.unwrap(), 457);
1166     /// }
1167     /// ```
reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>>1168     pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1169         self.reserve_inner(n).await?;
1170         Ok(PermitIterator {
1171             chan: &self.chan,
1172             n,
1173         })
1174     }
1175 
1176     /// Waits for channel capacity, moving the `Sender` and returning an owned
1177     /// permit. Once capacity to send one message is available, it is reserved
1178     /// for the caller.
1179     ///
1180     /// This moves the sender _by value_, and returns an owned permit that can
1181     /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1182     /// this method may be used in cases where the permit must be valid for the
1183     /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1184     /// essentially a reference count increment, comparable to [`Arc::clone`]),
1185     /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1186     /// moved, it can be cloned prior to calling `reserve_owned`.
1187     ///
1188     /// If the channel is full, the function waits for the number of unreceived
1189     /// messages to become less than the channel capacity. Capacity to send one
1190     /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1191     /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1192     /// consumes the reserved capacity.
1193     ///
1194     /// Dropping the [`OwnedPermit`] without sending a message releases the
1195     /// capacity back to the channel.
1196     ///
1197     /// # Cancel safety
1198     ///
1199     /// This channel uses a queue to ensure that calls to `send` and `reserve`
1200     /// complete in the order they were requested.  Cancelling a call to
1201     /// `reserve_owned` makes you lose your place in the queue.
1202     ///
1203     /// # Examples
1204     /// Sending a message using an [`OwnedPermit`]:
1205     /// ```
1206     /// use tokio::sync::mpsc;
1207     ///
1208     /// #[tokio::main]
1209     /// async fn main() {
1210     ///     let (tx, mut rx) = mpsc::channel(1);
1211     ///
1212     ///     // Reserve capacity, moving the sender.
1213     ///     let permit = tx.reserve_owned().await.unwrap();
1214     ///
1215     ///     // Send a message, consuming the permit and returning
1216     ///     // the moved sender.
1217     ///     let tx = permit.send(123);
1218     ///
1219     ///     // The value sent on the permit is received.
1220     ///     assert_eq!(rx.recv().await.unwrap(), 123);
1221     ///
1222     ///     // The sender can now be used again.
1223     ///     tx.send(456).await.unwrap();
1224     /// }
1225     /// ```
1226     ///
1227     /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1228     /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1229     ///
1230     /// ```
1231     /// use tokio::sync::mpsc;
1232     ///
1233     /// #[tokio::main]
1234     /// async fn main() {
1235     ///     let (tx, mut rx) = mpsc::channel(1);
1236     ///
1237     ///     // Clone the sender and reserve capacity.
1238     ///     let permit = tx.clone().reserve_owned().await.unwrap();
1239     ///
1240     ///     // Trying to send directly on the `tx` will fail due to no
1241     ///     // available capacity.
1242     ///     assert!(tx.try_send(123).is_err());
1243     ///
1244     ///     // Sending on the permit succeeds.
1245     ///     permit.send(456);
1246     ///
1247     ///     // The value sent on the permit is received
1248     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1249     /// }
1250     /// ```
1251     ///
1252     /// [`Sender::reserve`]: Sender::reserve
1253     /// [`OwnedPermit`]: OwnedPermit
1254     /// [`send`]: OwnedPermit::send
1255     /// [`Arc::clone`]: std::sync::Arc::clone
reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>1256     pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1257         self.reserve_inner(1).await?;
1258         Ok(OwnedPermit {
1259             chan: Some(self.chan),
1260         })
1261     }
1262 
reserve_inner(&self, n: usize) -> Result<(), SendError<()>>1263     async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1264         crate::trace::async_trace_leaf().await;
1265 
1266         if n > self.max_capacity() {
1267             return Err(SendError(()));
1268         }
1269         match self.chan.semaphore().semaphore.acquire(n).await {
1270             Ok(()) => Ok(()),
1271             Err(_) => Err(SendError(())),
1272         }
1273     }
1274 
1275     /// Tries to acquire a slot in the channel without waiting for the slot to become
1276     /// available.
1277     ///
1278     /// If the channel is full this function will return [`TrySendError`], otherwise
1279     /// if there is a slot available it will return a [`Permit`] that will then allow you
1280     /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1281     /// [`reserve`] except it does not await for the slot to become available.
1282     ///
1283     /// Dropping [`Permit`] without sending a message releases the capacity back
1284     /// to the channel.
1285     ///
1286     /// [`Permit`]: Permit
1287     /// [`send`]: Permit::send
1288     /// [`reserve`]: Sender::reserve
1289     ///
1290     /// # Examples
1291     ///
1292     /// ```
1293     /// use tokio::sync::mpsc;
1294     ///
1295     /// #[tokio::main]
1296     /// async fn main() {
1297     ///     let (tx, mut rx) = mpsc::channel(1);
1298     ///
1299     ///     // Reserve capacity
1300     ///     let permit = tx.try_reserve().unwrap();
1301     ///
1302     ///     // Trying to send directly on the `tx` will fail due to no
1303     ///     // available capacity.
1304     ///     assert!(tx.try_send(123).is_err());
1305     ///
1306     ///     // Trying to reserve an additional slot on the `tx` will
1307     ///     // fail because there is no capacity.
1308     ///     assert!(tx.try_reserve().is_err());
1309     ///
1310     ///     // Sending on the permit succeeds
1311     ///     permit.send(456);
1312     ///
1313     ///     // The value sent on the permit is received
1314     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1315     ///
1316     /// }
1317     /// ```
try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>1318     pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1319         match self.chan.semaphore().semaphore.try_acquire(1) {
1320             Ok(()) => {}
1321             Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1322             Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1323         }
1324 
1325         Ok(Permit { chan: &self.chan })
1326     }
1327 
1328     /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1329     /// available.
1330     ///
1331     /// A [`PermitIterator`] is returned to track the reserved capacity.
1332     /// You can call this [`Iterator`] until it is exhausted to
1333     /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1334     /// [`reserve_many`] except it does not await for the slots to become available.
1335     ///
1336     /// If there are fewer than `n` permits available on the channel, then
1337     /// this function will return a [`TrySendError::Full`]. If the channel is closed
1338     /// this function will return a [`TrySendError::Closed`].
1339     ///
1340     /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1341     /// permits back to the channel.
1342     ///
1343     /// [`PermitIterator`]: PermitIterator
1344     /// [`send`]: Permit::send
1345     /// [`reserve_many`]: Sender::reserve_many
1346     ///
1347     /// # Examples
1348     ///
1349     /// ```
1350     /// use tokio::sync::mpsc;
1351     ///
1352     /// #[tokio::main]
1353     /// async fn main() {
1354     ///     let (tx, mut rx) = mpsc::channel(2);
1355     ///
1356     ///     // Reserve capacity
1357     ///     let mut permit = tx.try_reserve_many(2).unwrap();
1358     ///
1359     ///     // Trying to send directly on the `tx` will fail due to no
1360     ///     // available capacity.
1361     ///     assert!(tx.try_send(123).is_err());
1362     ///
1363     ///     // Trying to reserve an additional slot on the `tx` will
1364     ///     // fail because there is no capacity.
1365     ///     assert!(tx.try_reserve().is_err());
1366     ///
1367     ///     // Sending with the permit iterator succeeds
1368     ///     permit.next().unwrap().send(456);
1369     ///     permit.next().unwrap().send(457);
1370     ///
1371     ///     // The iterator should now be exhausted
1372     ///     assert!(permit.next().is_none());
1373     ///
1374     ///     // The value sent on the permit is received
1375     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1376     ///     assert_eq!(rx.recv().await.unwrap(), 457);
1377     ///
1378     ///     // Trying to call try_reserve_many with 0 will return an empty iterator
1379     ///     let mut permit = tx.try_reserve_many(0).unwrap();
1380     ///     assert!(permit.next().is_none());
1381     ///
1382     ///     // Trying to call try_reserve_many with a number greater than the channel
1383     ///     // capacity will return an error
1384     ///     let permit = tx.try_reserve_many(3);
1385     ///     assert!(permit.is_err());
1386     ///
1387     ///     // Trying to call try_reserve_many on a closed channel will return an error
1388     ///     drop(rx);
1389     ///     let permit = tx.try_reserve_many(1);
1390     ///     assert!(permit.is_err());
1391     ///
1392     ///     let permit = tx.try_reserve_many(0);
1393     ///     assert!(permit.is_err());
1394     /// }
1395     /// ```
try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>>1396     pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1397         if n > self.max_capacity() {
1398             return Err(TrySendError::Full(()));
1399         }
1400 
1401         match self.chan.semaphore().semaphore.try_acquire(n) {
1402             Ok(()) => {}
1403             Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1404             Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1405         }
1406 
1407         Ok(PermitIterator {
1408             chan: &self.chan,
1409             n,
1410         })
1411     }
1412 
1413     /// Tries to acquire a slot in the channel without waiting for the slot to become
1414     /// available, returning an owned permit.
1415     ///
1416     /// This moves the sender _by value_, and returns an owned permit that can
1417     /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1418     /// this method may be used in cases where the permit must be valid for the
1419     /// `'static` lifetime.  `Sender`s may be cloned cheaply (`Sender::clone` is
1420     /// essentially a reference count increment, comparable to [`Arc::clone`]),
1421     /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1422     /// moved, it can be cloned prior to calling `try_reserve_owned`.
1423     ///
1424     /// If the channel is full this function will return a [`TrySendError`].
1425     /// Since the sender is taken by value, the `TrySendError` returned in this
1426     /// case contains the sender, so that it may be used again. Otherwise, if
1427     /// there is a slot available, this method will return an [`OwnedPermit`]
1428     /// that can then be used to [`send`] on the channel with a guaranteed slot.
1429     /// This function is similar to  [`reserve_owned`] except it does not await
1430     /// for the slot to become available.
1431     ///
1432     /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1433     /// to the channel.
1434     ///
1435     /// [`OwnedPermit`]: OwnedPermit
1436     /// [`send`]: OwnedPermit::send
1437     /// [`reserve_owned`]: Sender::reserve_owned
1438     /// [`Arc::clone`]: std::sync::Arc::clone
1439     ///
1440     /// # Examples
1441     ///
1442     /// ```
1443     /// use tokio::sync::mpsc;
1444     ///
1445     /// #[tokio::main]
1446     /// async fn main() {
1447     ///     let (tx, mut rx) = mpsc::channel(1);
1448     ///
1449     ///     // Reserve capacity
1450     ///     let permit = tx.clone().try_reserve_owned().unwrap();
1451     ///
1452     ///     // Trying to send directly on the `tx` will fail due to no
1453     ///     // available capacity.
1454     ///     assert!(tx.try_send(123).is_err());
1455     ///
1456     ///     // Trying to reserve an additional slot on the `tx` will
1457     ///     // fail because there is no capacity.
1458     ///     assert!(tx.try_reserve().is_err());
1459     ///
1460     ///     // Sending on the permit succeeds
1461     ///     permit.send(456);
1462     ///
1463     ///     // The value sent on the permit is received
1464     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1465     ///
1466     /// }
1467     /// ```
try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>1468     pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1469         match self.chan.semaphore().semaphore.try_acquire(1) {
1470             Ok(()) => {}
1471             Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1472             Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1473         }
1474 
1475         Ok(OwnedPermit {
1476             chan: Some(self.chan),
1477         })
1478     }
1479 
1480     /// Returns `true` if senders belong to the same channel.
1481     ///
1482     /// # Examples
1483     ///
1484     /// ```
1485     /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1486     /// let  tx2 = tx.clone();
1487     /// assert!(tx.same_channel(&tx2));
1488     ///
1489     /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1490     /// assert!(!tx3.same_channel(&tx2));
1491     /// ```
same_channel(&self, other: &Self) -> bool1492     pub fn same_channel(&self, other: &Self) -> bool {
1493         self.chan.same_channel(&other.chan)
1494     }
1495 
1496     /// Returns the current capacity of the channel.
1497     ///
1498     /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1499     /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1500     /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1501     /// specified when calling [`channel`]
1502     ///
1503     /// # Examples
1504     ///
1505     /// ```
1506     /// use tokio::sync::mpsc;
1507     ///
1508     /// #[tokio::main]
1509     /// async fn main() {
1510     ///     let (tx, mut rx) = mpsc::channel::<()>(5);
1511     ///
1512     ///     assert_eq!(tx.capacity(), 5);
1513     ///
1514     ///     // Making a reservation drops the capacity by one.
1515     ///     let permit = tx.reserve().await.unwrap();
1516     ///     assert_eq!(tx.capacity(), 4);
1517     ///
1518     ///     // Sending and receiving a value increases the capacity by one.
1519     ///     permit.send(());
1520     ///     rx.recv().await.unwrap();
1521     ///     assert_eq!(tx.capacity(), 5);
1522     /// }
1523     /// ```
1524     ///
1525     /// [`send`]: Sender::send
1526     /// [`reserve`]: Sender::reserve
1527     /// [`channel`]: channel
1528     /// [`max_capacity`]: Sender::max_capacity
capacity(&self) -> usize1529     pub fn capacity(&self) -> usize {
1530         self.chan.semaphore().semaphore.available_permits()
1531     }
1532 
1533     /// Converts the `Sender` to a [`WeakSender`] that does not count
1534     /// towards RAII semantics, i.e. if all `Sender` instances of the
1535     /// channel were dropped and only `WeakSender` instances remain,
1536     /// the channel is closed.
1537     #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
downgrade(&self) -> WeakSender<T>1538     pub fn downgrade(&self) -> WeakSender<T> {
1539         WeakSender {
1540             chan: self.chan.downgrade(),
1541         }
1542     }
1543 
1544     /// Returns the maximum buffer capacity of the channel.
1545     ///
1546     /// The maximum capacity is the buffer capacity initially specified when calling
1547     /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1548     /// available buffer capacity: as messages are sent and received, the
1549     /// value returned by [`capacity`] will go up or down, whereas the value
1550     /// returned by [`max_capacity`] will remain constant.
1551     ///
1552     /// # Examples
1553     ///
1554     /// ```
1555     /// use tokio::sync::mpsc;
1556     ///
1557     /// #[tokio::main]
1558     /// async fn main() {
1559     ///     let (tx, _rx) = mpsc::channel::<()>(5);
1560     ///
1561     ///     // both max capacity and capacity are the same at first
1562     ///     assert_eq!(tx.max_capacity(), 5);
1563     ///     assert_eq!(tx.capacity(), 5);
1564     ///
1565     ///     // Making a reservation doesn't change the max capacity.
1566     ///     let permit = tx.reserve().await.unwrap();
1567     ///     assert_eq!(tx.max_capacity(), 5);
1568     ///     // but drops the capacity by one
1569     ///     assert_eq!(tx.capacity(), 4);
1570     /// }
1571     /// ```
1572     ///
1573     /// [`channel`]: channel
1574     /// [`max_capacity`]: Sender::max_capacity
1575     /// [`capacity`]: Sender::capacity
max_capacity(&self) -> usize1576     pub fn max_capacity(&self) -> usize {
1577         self.chan.semaphore().bound
1578     }
1579 
1580     /// Returns the number of [`Sender`] handles.
strong_count(&self) -> usize1581     pub fn strong_count(&self) -> usize {
1582         self.chan.strong_count()
1583     }
1584 
1585     /// Returns the number of [`WeakSender`] handles.
weak_count(&self) -> usize1586     pub fn weak_count(&self) -> usize {
1587         self.chan.weak_count()
1588     }
1589 }
1590 
1591 impl<T> Clone for Sender<T> {
clone(&self) -> Self1592     fn clone(&self) -> Self {
1593         Sender {
1594             chan: self.chan.clone(),
1595         }
1596     }
1597 }
1598 
1599 impl<T> fmt::Debug for Sender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1600     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1601         fmt.debug_struct("Sender")
1602             .field("chan", &self.chan)
1603             .finish()
1604     }
1605 }
1606 
1607 impl<T> Clone for WeakSender<T> {
clone(&self) -> Self1608     fn clone(&self) -> Self {
1609         self.chan.increment_weak_count();
1610 
1611         WeakSender {
1612             chan: self.chan.clone(),
1613         }
1614     }
1615 }
1616 
1617 impl<T> Drop for WeakSender<T> {
drop(&mut self)1618     fn drop(&mut self) {
1619         self.chan.decrement_weak_count();
1620     }
1621 }
1622 
1623 impl<T> WeakSender<T> {
1624     /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1625     /// if there are other `Sender` instances alive and the channel wasn't
1626     /// previously dropped, otherwise `None` is returned.
upgrade(&self) -> Option<Sender<T>>1627     pub fn upgrade(&self) -> Option<Sender<T>> {
1628         chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1629     }
1630 
1631     /// Returns the number of [`Sender`] handles.
strong_count(&self) -> usize1632     pub fn strong_count(&self) -> usize {
1633         self.chan.strong_count()
1634     }
1635 
1636     /// Returns the number of [`WeakSender`] handles.
weak_count(&self) -> usize1637     pub fn weak_count(&self) -> usize {
1638         self.chan.weak_count()
1639     }
1640 }
1641 
1642 impl<T> fmt::Debug for WeakSender<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1643     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1644         fmt.debug_struct("WeakSender").finish()
1645     }
1646 }
1647 
1648 // ===== impl Permit =====
1649 
1650 impl<T> Permit<'_, T> {
1651     /// Sends a value using the reserved capacity.
1652     ///
1653     /// Capacity for the message has already been reserved. The message is sent
1654     /// to the receiver and the permit is consumed. The operation will succeed
1655     /// even if the receiver half has been closed. See [`Receiver::close`] for
1656     /// more details on performing a clean shutdown.
1657     ///
1658     /// [`Receiver::close`]: Receiver::close
1659     ///
1660     /// # Examples
1661     ///
1662     /// ```
1663     /// use tokio::sync::mpsc;
1664     ///
1665     /// #[tokio::main]
1666     /// async fn main() {
1667     ///     let (tx, mut rx) = mpsc::channel(1);
1668     ///
1669     ///     // Reserve capacity
1670     ///     let permit = tx.reserve().await.unwrap();
1671     ///
1672     ///     // Trying to send directly on the `tx` will fail due to no
1673     ///     // available capacity.
1674     ///     assert!(tx.try_send(123).is_err());
1675     ///
1676     ///     // Send a message on the permit
1677     ///     permit.send(456);
1678     ///
1679     ///     // The value sent on the permit is received
1680     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1681     /// }
1682     /// ```
send(self, value: T)1683     pub fn send(self, value: T) {
1684         use std::mem;
1685 
1686         self.chan.send(value);
1687 
1688         // Avoid the drop logic
1689         mem::forget(self);
1690     }
1691 }
1692 
1693 impl<T> Drop for Permit<'_, T> {
drop(&mut self)1694     fn drop(&mut self) {
1695         use chan::Semaphore;
1696 
1697         let semaphore = self.chan.semaphore();
1698 
1699         // Add the permit back to the semaphore
1700         semaphore.add_permit();
1701 
1702         // If this is the last sender for this channel, wake the receiver so
1703         // that it can be notified that the channel is closed.
1704         if semaphore.is_closed() && semaphore.is_idle() {
1705             self.chan.wake_rx();
1706         }
1707     }
1708 }
1709 
1710 impl<T> fmt::Debug for Permit<'_, T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1711     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1712         fmt.debug_struct("Permit")
1713             .field("chan", &self.chan)
1714             .finish()
1715     }
1716 }
1717 
1718 // ===== impl PermitIterator =====
1719 
1720 impl<'a, T> Iterator for PermitIterator<'a, T> {
1721     type Item = Permit<'a, T>;
1722 
next(&mut self) -> Option<Self::Item>1723     fn next(&mut self) -> Option<Self::Item> {
1724         if self.n == 0 {
1725             return None;
1726         }
1727 
1728         self.n -= 1;
1729         Some(Permit { chan: self.chan })
1730     }
1731 
size_hint(&self) -> (usize, Option<usize>)1732     fn size_hint(&self) -> (usize, Option<usize>) {
1733         let n = self.n;
1734         (n, Some(n))
1735     }
1736 }
1737 impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
1738 impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1739 
1740 impl<T> Drop for PermitIterator<'_, T> {
drop(&mut self)1741     fn drop(&mut self) {
1742         use chan::Semaphore;
1743 
1744         if self.n == 0 {
1745             return;
1746         }
1747 
1748         let semaphore = self.chan.semaphore();
1749 
1750         // Add the remaining permits back to the semaphore
1751         semaphore.add_permits(self.n);
1752 
1753         // If this is the last sender for this channel, wake the receiver so
1754         // that it can be notified that the channel is closed.
1755         if semaphore.is_closed() && semaphore.is_idle() {
1756             self.chan.wake_rx();
1757         }
1758     }
1759 }
1760 
1761 impl<T> fmt::Debug for PermitIterator<'_, T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1762     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1763         fmt.debug_struct("PermitIterator")
1764             .field("chan", &self.chan)
1765             .field("capacity", &self.n)
1766             .finish()
1767     }
1768 }
1769 
1770 // ===== impl Permit =====
1771 
1772 impl<T> OwnedPermit<T> {
1773     /// Sends a value using the reserved capacity.
1774     ///
1775     /// Capacity for the message has already been reserved. The message is sent
1776     /// to the receiver and the permit is consumed. The operation will succeed
1777     /// even if the receiver half has been closed. See [`Receiver::close`] for
1778     /// more details on performing a clean shutdown.
1779     ///
1780     /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1781     /// the `OwnedPermit` was reserved.
1782     ///
1783     /// [`Receiver::close`]: Receiver::close
1784     ///
1785     /// # Examples
1786     ///
1787     /// ```
1788     /// use tokio::sync::mpsc;
1789     ///
1790     /// #[tokio::main]
1791     /// async fn main() {
1792     ///     let (tx, mut rx) = mpsc::channel(1);
1793     ///
1794     ///     // Reserve capacity
1795     ///     let permit = tx.reserve_owned().await.unwrap();
1796     ///
1797     ///     // Send a message on the permit, returning the sender.
1798     ///     let tx = permit.send(456);
1799     ///
1800     ///     // The value sent on the permit is received
1801     ///     assert_eq!(rx.recv().await.unwrap(), 456);
1802     ///
1803     ///     // We may now reuse `tx` to send another message.
1804     ///     tx.send(789).await.unwrap();
1805     /// }
1806     /// ```
send(mut self, value: T) -> Sender<T>1807     pub fn send(mut self, value: T) -> Sender<T> {
1808         let chan = self.chan.take().unwrap_or_else(|| {
1809             unreachable!("OwnedPermit channel is only taken when the permit is moved")
1810         });
1811         chan.send(value);
1812 
1813         Sender { chan }
1814     }
1815 
1816     /// Releases the reserved capacity *without* sending a message, returning the
1817     /// [`Sender`].
1818     ///
1819     /// # Examples
1820     ///
1821     /// ```
1822     /// use tokio::sync::mpsc;
1823     ///
1824     /// #[tokio::main]
1825     /// async fn main() {
1826     ///     let (tx, rx) = mpsc::channel(1);
1827     ///
1828     ///     // Clone the sender and reserve capacity
1829     ///     let permit = tx.clone().reserve_owned().await.unwrap();
1830     ///
1831     ///     // Trying to send on the original `tx` will fail, since the `permit`
1832     ///     // has reserved all the available capacity.
1833     ///     assert!(tx.try_send(123).is_err());
1834     ///
1835     ///     // Release the permit without sending a message, returning the clone
1836     ///     // of the sender.
1837     ///     let tx2 = permit.release();
1838     ///
1839     ///     // We may now reuse `tx` to send another message.
1840     ///     tx.send(789).await.unwrap();
1841     ///     # drop(rx); drop(tx2);
1842     /// }
1843     /// ```
1844     ///
1845     /// [`Sender`]: Sender
release(mut self) -> Sender<T>1846     pub fn release(mut self) -> Sender<T> {
1847         use chan::Semaphore;
1848 
1849         let chan = self.chan.take().unwrap_or_else(|| {
1850             unreachable!("OwnedPermit channel is only taken when the permit is moved")
1851         });
1852 
1853         // Add the permit back to the semaphore
1854         chan.semaphore().add_permit();
1855         Sender { chan }
1856     }
1857 }
1858 
1859 impl<T> Drop for OwnedPermit<T> {
drop(&mut self)1860     fn drop(&mut self) {
1861         use chan::Semaphore;
1862 
1863         // Are we still holding onto the sender?
1864         if let Some(chan) = self.chan.take() {
1865             let semaphore = chan.semaphore();
1866 
1867             // Add the permit back to the semaphore
1868             semaphore.add_permit();
1869 
1870             // If this `OwnedPermit` is holding the last sender for this
1871             // channel, wake the receiver so that it can be notified that the
1872             // channel is closed.
1873             if semaphore.is_closed() && semaphore.is_idle() {
1874                 chan.wake_rx();
1875             }
1876         }
1877 
1878         // Otherwise, do nothing.
1879     }
1880 }
1881 
1882 impl<T> fmt::Debug for OwnedPermit<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1883     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1884         fmt.debug_struct("OwnedPermit")
1885             .field("chan", &self.chan)
1886             .finish()
1887     }
1888 }
1889