1 //! The channel interface.
2 
3 use std::fmt;
4 use std::iter::FusedIterator;
5 use std::mem;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9 
10 use crate::context::Context;
11 use crate::counter;
12 use crate::err::{
13     RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14 };
15 use crate::flavors;
16 use crate::select::{Operation, SelectHandle, Token};
17 
18 /// Creates a channel of unbounded capacity.
19 ///
20 /// This channel has a growable buffer that can hold any number of messages at a time.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use std::thread;
26 /// use crossbeam_channel::unbounded;
27 ///
28 /// let (s, r) = unbounded();
29 ///
30 /// // Computes the n-th Fibonacci number.
31 /// fn fib(n: i32) -> i32 {
32 ///     if n <= 1 {
33 ///         n
34 ///     } else {
35 ///         fib(n - 1) + fib(n - 2)
36 ///     }
37 /// }
38 ///
39 /// // Spawn an asynchronous computation.
40 /// thread::spawn(move || s.send(fib(20)).unwrap());
41 ///
42 /// // Print the result of the computation.
43 /// println!("{}", r.recv().unwrap());
44 /// ```
unbounded<T>() -> (Sender<T>, Receiver<T>)45 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46     let (s, r) = counter::new(flavors::list::Channel::new());
47     let s = Sender {
48         flavor: SenderFlavor::List(s),
49     };
50     let r = Receiver {
51         flavor: ReceiverFlavor::List(r),
52     };
53     (s, r)
54 }
55 
56 /// Creates a channel of bounded capacity.
57 ///
58 /// This channel has a buffer that can hold at most `cap` messages at a time.
59 ///
60 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61 /// receive operations must appear at the same time in order to pair up and pass the message over.
62 ///
63 /// # Examples
64 ///
65 /// A channel of capacity 1:
66 ///
67 /// ```
68 /// use std::thread;
69 /// use std::time::Duration;
70 /// use crossbeam_channel::bounded;
71 ///
72 /// let (s, r) = bounded(1);
73 ///
74 /// // This call returns immediately because there is enough space in the channel.
75 /// s.send(1).unwrap();
76 ///
77 /// thread::spawn(move || {
78 ///     // This call blocks the current thread because the channel is full.
79 ///     // It will be able to complete only after the first message is received.
80 ///     s.send(2).unwrap();
81 /// });
82 ///
83 /// thread::sleep(Duration::from_secs(1));
84 /// assert_eq!(r.recv(), Ok(1));
85 /// assert_eq!(r.recv(), Ok(2));
86 /// ```
87 ///
88 /// A zero-capacity channel:
89 ///
90 /// ```
91 /// use std::thread;
92 /// use std::time::Duration;
93 /// use crossbeam_channel::bounded;
94 ///
95 /// let (s, r) = bounded(0);
96 ///
97 /// thread::spawn(move || {
98 ///     // This call blocks the current thread until a receive operation appears
99 ///     // on the other side of the channel.
100 ///     s.send(1).unwrap();
101 /// });
102 ///
103 /// thread::sleep(Duration::from_secs(1));
104 /// assert_eq!(r.recv(), Ok(1));
105 /// ```
bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)106 pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107     if cap == 0 {
108         let (s, r) = counter::new(flavors::zero::Channel::new());
109         let s = Sender {
110             flavor: SenderFlavor::Zero(s),
111         };
112         let r = Receiver {
113             flavor: ReceiverFlavor::Zero(r),
114         };
115         (s, r)
116     } else {
117         let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118         let s = Sender {
119             flavor: SenderFlavor::Array(s),
120         };
121         let r = Receiver {
122             flavor: ReceiverFlavor::Array(r),
123         };
124         (s, r)
125     }
126 }
127 
128 /// Creates a receiver that delivers a message after a certain duration of time.
129 ///
130 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
132 /// sent.
133 ///
134 /// # Examples
135 ///
136 /// Using an `after` channel for timeouts:
137 ///
138 /// ```
139 /// use std::time::Duration;
140 /// use crossbeam_channel::{after, select, unbounded};
141 ///
142 /// let (s, r) = unbounded::<i32>();
143 /// let timeout = Duration::from_millis(100);
144 ///
145 /// select! {
146 ///     recv(r) -> msg => println!("received {:?}", msg),
147 ///     recv(after(timeout)) -> _ => println!("timed out"),
148 /// }
149 /// ```
150 ///
151 /// When the message gets sent:
152 ///
153 /// ```
154 /// use std::thread;
155 /// use std::time::{Duration, Instant};
156 /// use crossbeam_channel::after;
157 ///
158 /// // Converts a number of milliseconds into a `Duration`.
159 /// let ms = |ms| Duration::from_millis(ms);
160 ///
161 /// // Returns `true` if `a` and `b` are very close `Instant`s.
162 /// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
163 ///
164 /// let start = Instant::now();
165 /// let r = after(ms(100));
166 ///
167 /// thread::sleep(ms(500));
168 ///
169 /// // This message was sent 100 ms from the start and received 500 ms from the start.
170 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
171 /// assert!(eq(Instant::now(), start + ms(500)));
172 /// ```
after(duration: Duration) -> Receiver<Instant>173 pub fn after(duration: Duration) -> Receiver<Instant> {
174     match Instant::now().checked_add(duration) {
175         Some(deadline) => Receiver {
176             flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
177         },
178         None => never(),
179     }
180 }
181 
182 /// Creates a receiver that delivers a message at a certain instant in time.
183 ///
184 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
185 /// be sent into the channel at the moment in time `when`. The message is the instant at which it
186 /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
187 /// instantly to the receiver.
188 ///
189 /// # Examples
190 ///
191 /// Using an `at` channel for timeouts:
192 ///
193 /// ```
194 /// use std::time::{Instant, Duration};
195 /// use crossbeam_channel::{at, select, unbounded};
196 ///
197 /// let (s, r) = unbounded::<i32>();
198 /// let deadline = Instant::now() + Duration::from_millis(500);
199 ///
200 /// select! {
201 ///     recv(r) -> msg => println!("received {:?}", msg),
202 ///     recv(at(deadline)) -> _ => println!("timed out"),
203 /// }
204 /// ```
205 ///
206 /// When the message gets sent:
207 ///
208 /// ```
209 /// use std::time::{Duration, Instant};
210 /// use crossbeam_channel::at;
211 ///
212 /// // Converts a number of milliseconds into a `Duration`.
213 /// let ms = |ms| Duration::from_millis(ms);
214 ///
215 /// let start = Instant::now();
216 /// let end = start + ms(100);
217 ///
218 /// let r = at(end);
219 ///
220 /// // This message was sent 100 ms from the start
221 /// assert_eq!(r.recv().unwrap(), end);
222 /// assert!(Instant::now() > start + ms(100));
223 /// ```
at(when: Instant) -> Receiver<Instant>224 pub fn at(when: Instant) -> Receiver<Instant> {
225     Receiver {
226         flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
227     }
228 }
229 
230 /// Creates a receiver that never delivers messages.
231 ///
232 /// The channel is bounded with capacity of 0 and never gets disconnected.
233 ///
234 /// # Examples
235 ///
236 /// Using a `never` channel to optionally add a timeout to [`select!`]:
237 ///
238 /// [`select!`]: crate::select!
239 ///
240 /// ```
241 /// use std::thread;
242 /// use std::time::Duration;
243 /// use crossbeam_channel::{after, select, never, unbounded};
244 ///
245 /// let (s, r) = unbounded();
246 ///
247 /// thread::spawn(move || {
248 ///     thread::sleep(Duration::from_secs(1));
249 ///     s.send(1).unwrap();
250 /// });
251 ///
252 /// // Suppose this duration can be a `Some` or a `None`.
253 /// let duration = Some(Duration::from_millis(100));
254 ///
255 /// // Create a channel that times out after the specified duration.
256 /// let timeout = duration
257 ///     .map(|d| after(d))
258 ///     .unwrap_or(never());
259 ///
260 /// select! {
261 ///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
262 ///     recv(timeout) -> _ => println!("timed out"),
263 /// }
264 /// ```
never<T>() -> Receiver<T>265 pub fn never<T>() -> Receiver<T> {
266     Receiver {
267         flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
268     }
269 }
270 
271 /// Creates a receiver that delivers messages periodically.
272 ///
273 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
274 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
275 /// sent.
276 ///
277 /// # Examples
278 ///
279 /// Using a `tick` channel to periodically print elapsed time:
280 ///
281 /// ```
282 /// use std::time::{Duration, Instant};
283 /// use crossbeam_channel::tick;
284 ///
285 /// let start = Instant::now();
286 /// let ticker = tick(Duration::from_millis(100));
287 ///
288 /// for _ in 0..5 {
289 ///     ticker.recv().unwrap();
290 ///     println!("elapsed: {:?}", start.elapsed());
291 /// }
292 /// ```
293 ///
294 /// When messages get sent:
295 ///
296 /// ```
297 /// use std::thread;
298 /// use std::time::{Duration, Instant};
299 /// use crossbeam_channel::tick;
300 ///
301 /// // Converts a number of milliseconds into a `Duration`.
302 /// let ms = |ms| Duration::from_millis(ms);
303 ///
304 /// // Returns `true` if `a` and `b` are very close `Instant`s.
305 /// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
306 ///
307 /// let start = Instant::now();
308 /// let r = tick(ms(100));
309 ///
310 /// // This message was sent 100 ms from the start and received 100 ms from the start.
311 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
312 /// assert!(eq(Instant::now(), start + ms(100)));
313 ///
314 /// thread::sleep(ms(500));
315 ///
316 /// // This message was sent 200 ms from the start and received 600 ms from the start.
317 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
318 /// assert!(eq(Instant::now(), start + ms(600)));
319 ///
320 /// // This message was sent 700 ms from the start and received 700 ms from the start.
321 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
322 /// assert!(eq(Instant::now(), start + ms(700)));
323 /// ```
tick(duration: Duration) -> Receiver<Instant>324 pub fn tick(duration: Duration) -> Receiver<Instant> {
325     match Instant::now().checked_add(duration) {
326         Some(delivery_time) => Receiver {
327             flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
328                 delivery_time,
329                 duration,
330             ))),
331         },
332         None => never(),
333     }
334 }
335 
336 /// The sending side of a channel.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use std::thread;
342 /// use crossbeam_channel::unbounded;
343 ///
344 /// let (s1, r) = unbounded();
345 /// let s2 = s1.clone();
346 ///
347 /// thread::spawn(move || s1.send(1).unwrap());
348 /// thread::spawn(move || s2.send(2).unwrap());
349 ///
350 /// let msg1 = r.recv().unwrap();
351 /// let msg2 = r.recv().unwrap();
352 ///
353 /// assert_eq!(msg1 + msg2, 3);
354 /// ```
355 pub struct Sender<T> {
356     flavor: SenderFlavor<T>,
357 }
358 
359 /// Sender flavors.
360 enum SenderFlavor<T> {
361     /// Bounded channel based on a preallocated array.
362     Array(counter::Sender<flavors::array::Channel<T>>),
363 
364     /// Unbounded channel implemented as a linked list.
365     List(counter::Sender<flavors::list::Channel<T>>),
366 
367     /// Zero-capacity channel.
368     Zero(counter::Sender<flavors::zero::Channel<T>>),
369 }
370 
371 unsafe impl<T: Send> Send for Sender<T> {}
372 unsafe impl<T: Send> Sync for Sender<T> {}
373 
374 impl<T> UnwindSafe for Sender<T> {}
375 impl<T> RefUnwindSafe for Sender<T> {}
376 
377 impl<T> Sender<T> {
378     /// Attempts to send a message into the channel without blocking.
379     ///
380     /// This method will either send a message into the channel immediately or return an error if
381     /// the channel is full or disconnected. The returned error contains the original message.
382     ///
383     /// If called on a zero-capacity channel, this method will send the message only if there
384     /// happens to be a receive operation on the other side of the channel at the same time.
385     ///
386     /// # Examples
387     ///
388     /// ```
389     /// use crossbeam_channel::{bounded, TrySendError};
390     ///
391     /// let (s, r) = bounded(1);
392     ///
393     /// assert_eq!(s.try_send(1), Ok(()));
394     /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
395     ///
396     /// drop(r);
397     /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
398     /// ```
try_send(&self, msg: T) -> Result<(), TrySendError<T>>399     pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
400         match &self.flavor {
401             SenderFlavor::Array(chan) => chan.try_send(msg),
402             SenderFlavor::List(chan) => chan.try_send(msg),
403             SenderFlavor::Zero(chan) => chan.try_send(msg),
404         }
405     }
406 
407     /// Blocks the current thread until a message is sent or the channel is disconnected.
408     ///
409     /// If the channel is full and not disconnected, this call will block until the send operation
410     /// can proceed. If the channel becomes disconnected, this call will wake up and return an
411     /// error. The returned error contains the original message.
412     ///
413     /// If called on a zero-capacity channel, this method will wait for a receive operation to
414     /// appear on the other side of the channel.
415     ///
416     /// # Examples
417     ///
418     /// ```
419     /// use std::thread;
420     /// use std::time::Duration;
421     /// use crossbeam_channel::{bounded, SendError};
422     ///
423     /// let (s, r) = bounded(1);
424     /// assert_eq!(s.send(1), Ok(()));
425     ///
426     /// thread::spawn(move || {
427     ///     assert_eq!(r.recv(), Ok(1));
428     ///     thread::sleep(Duration::from_secs(1));
429     ///     drop(r);
430     /// });
431     ///
432     /// assert_eq!(s.send(2), Ok(()));
433     /// assert_eq!(s.send(3), Err(SendError(3)));
434     /// ```
send(&self, msg: T) -> Result<(), SendError<T>>435     pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
436         match &self.flavor {
437             SenderFlavor::Array(chan) => chan.send(msg, None),
438             SenderFlavor::List(chan) => chan.send(msg, None),
439             SenderFlavor::Zero(chan) => chan.send(msg, None),
440         }
441         .map_err(|err| match err {
442             SendTimeoutError::Disconnected(msg) => SendError(msg),
443             SendTimeoutError::Timeout(_) => unreachable!(),
444         })
445     }
446 
447     /// Waits for a message to be sent into the channel, but only for a limited time.
448     ///
449     /// If the channel is full and not disconnected, this call will block until the send operation
450     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
451     /// wake up and return an error. The returned error contains the original message.
452     ///
453     /// If called on a zero-capacity channel, this method will wait for a receive operation to
454     /// appear on the other side of the channel.
455     ///
456     /// # Examples
457     ///
458     /// ```
459     /// use std::thread;
460     /// use std::time::Duration;
461     /// use crossbeam_channel::{bounded, SendTimeoutError};
462     ///
463     /// let (s, r) = bounded(0);
464     ///
465     /// thread::spawn(move || {
466     ///     thread::sleep(Duration::from_secs(1));
467     ///     assert_eq!(r.recv(), Ok(2));
468     ///     drop(r);
469     /// });
470     ///
471     /// assert_eq!(
472     ///     s.send_timeout(1, Duration::from_millis(500)),
473     ///     Err(SendTimeoutError::Timeout(1)),
474     /// );
475     /// assert_eq!(
476     ///     s.send_timeout(2, Duration::from_secs(1)),
477     ///     Ok(()),
478     /// );
479     /// assert_eq!(
480     ///     s.send_timeout(3, Duration::from_millis(500)),
481     ///     Err(SendTimeoutError::Disconnected(3)),
482     /// );
483     /// ```
send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>484     pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
485         match Instant::now().checked_add(timeout) {
486             Some(deadline) => self.send_deadline(msg, deadline),
487             None => self.send(msg).map_err(SendTimeoutError::from),
488         }
489     }
490 
491     /// Waits for a message to be sent into the channel, but only until a given deadline.
492     ///
493     /// If the channel is full and not disconnected, this call will block until the send operation
494     /// can proceed or the operation times out. If the channel becomes disconnected, this call will
495     /// wake up and return an error. The returned error contains the original message.
496     ///
497     /// If called on a zero-capacity channel, this method will wait for a receive operation to
498     /// appear on the other side of the channel.
499     ///
500     /// # Examples
501     ///
502     /// ```
503     /// use std::thread;
504     /// use std::time::{Duration, Instant};
505     /// use crossbeam_channel::{bounded, SendTimeoutError};
506     ///
507     /// let (s, r) = bounded(0);
508     ///
509     /// thread::spawn(move || {
510     ///     thread::sleep(Duration::from_secs(1));
511     ///     assert_eq!(r.recv(), Ok(2));
512     ///     drop(r);
513     /// });
514     ///
515     /// let now = Instant::now();
516     ///
517     /// assert_eq!(
518     ///     s.send_deadline(1, now + Duration::from_millis(500)),
519     ///     Err(SendTimeoutError::Timeout(1)),
520     /// );
521     /// assert_eq!(
522     ///     s.send_deadline(2, now + Duration::from_millis(1500)),
523     ///     Ok(()),
524     /// );
525     /// assert_eq!(
526     ///     s.send_deadline(3, now + Duration::from_millis(2000)),
527     ///     Err(SendTimeoutError::Disconnected(3)),
528     /// );
529     /// ```
send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>>530     pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
531         match &self.flavor {
532             SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
533             SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
534             SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
535         }
536     }
537 
538     /// Returns `true` if the channel is empty.
539     ///
540     /// Note: Zero-capacity channels are always empty.
541     ///
542     /// # Examples
543     ///
544     /// ```
545     /// use crossbeam_channel::unbounded;
546     ///
547     /// let (s, r) = unbounded();
548     /// assert!(s.is_empty());
549     ///
550     /// s.send(0).unwrap();
551     /// assert!(!s.is_empty());
552     /// ```
is_empty(&self) -> bool553     pub fn is_empty(&self) -> bool {
554         match &self.flavor {
555             SenderFlavor::Array(chan) => chan.is_empty(),
556             SenderFlavor::List(chan) => chan.is_empty(),
557             SenderFlavor::Zero(chan) => chan.is_empty(),
558         }
559     }
560 
561     /// Returns `true` if the channel is full.
562     ///
563     /// Note: Zero-capacity channels are always full.
564     ///
565     /// # Examples
566     ///
567     /// ```
568     /// use crossbeam_channel::bounded;
569     ///
570     /// let (s, r) = bounded(1);
571     ///
572     /// assert!(!s.is_full());
573     /// s.send(0).unwrap();
574     /// assert!(s.is_full());
575     /// ```
is_full(&self) -> bool576     pub fn is_full(&self) -> bool {
577         match &self.flavor {
578             SenderFlavor::Array(chan) => chan.is_full(),
579             SenderFlavor::List(chan) => chan.is_full(),
580             SenderFlavor::Zero(chan) => chan.is_full(),
581         }
582     }
583 
584     /// Returns the number of messages in the channel.
585     ///
586     /// # Examples
587     ///
588     /// ```
589     /// use crossbeam_channel::unbounded;
590     ///
591     /// let (s, r) = unbounded();
592     /// assert_eq!(s.len(), 0);
593     ///
594     /// s.send(1).unwrap();
595     /// s.send(2).unwrap();
596     /// assert_eq!(s.len(), 2);
597     /// ```
len(&self) -> usize598     pub fn len(&self) -> usize {
599         match &self.flavor {
600             SenderFlavor::Array(chan) => chan.len(),
601             SenderFlavor::List(chan) => chan.len(),
602             SenderFlavor::Zero(chan) => chan.len(),
603         }
604     }
605 
606     /// If the channel is bounded, returns its capacity.
607     ///
608     /// # Examples
609     ///
610     /// ```
611     /// use crossbeam_channel::{bounded, unbounded};
612     ///
613     /// let (s, _) = unbounded::<i32>();
614     /// assert_eq!(s.capacity(), None);
615     ///
616     /// let (s, _) = bounded::<i32>(5);
617     /// assert_eq!(s.capacity(), Some(5));
618     ///
619     /// let (s, _) = bounded::<i32>(0);
620     /// assert_eq!(s.capacity(), Some(0));
621     /// ```
capacity(&self) -> Option<usize>622     pub fn capacity(&self) -> Option<usize> {
623         match &self.flavor {
624             SenderFlavor::Array(chan) => chan.capacity(),
625             SenderFlavor::List(chan) => chan.capacity(),
626             SenderFlavor::Zero(chan) => chan.capacity(),
627         }
628     }
629 
630     /// Returns `true` if senders belong to the same channel.
631     ///
632     /// # Examples
633     ///
634     /// ```rust
635     /// use crossbeam_channel::unbounded;
636     ///
637     /// let (s, _) = unbounded::<usize>();
638     ///
639     /// let s2 = s.clone();
640     /// assert!(s.same_channel(&s2));
641     ///
642     /// let (s3, _) = unbounded();
643     /// assert!(!s.same_channel(&s3));
644     /// ```
same_channel(&self, other: &Sender<T>) -> bool645     pub fn same_channel(&self, other: &Sender<T>) -> bool {
646         match (&self.flavor, &other.flavor) {
647             (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
648             (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
649             (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
650             _ => false,
651         }
652     }
653 }
654 
655 impl<T> Drop for Sender<T> {
drop(&mut self)656     fn drop(&mut self) {
657         unsafe {
658             match &self.flavor {
659                 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
660                 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
661                 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
662             }
663         }
664     }
665 }
666 
667 impl<T> Clone for Sender<T> {
clone(&self) -> Self668     fn clone(&self) -> Self {
669         let flavor = match &self.flavor {
670             SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
671             SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
672             SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
673         };
674 
675         Sender { flavor }
676     }
677 }
678 
679 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result680     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681         f.pad("Sender { .. }")
682     }
683 }
684 
685 /// The receiving side of a channel.
686 ///
687 /// # Examples
688 ///
689 /// ```
690 /// use std::thread;
691 /// use std::time::Duration;
692 /// use crossbeam_channel::unbounded;
693 ///
694 /// let (s, r) = unbounded();
695 ///
696 /// thread::spawn(move || {
697 ///     let _ = s.send(1);
698 ///     thread::sleep(Duration::from_secs(1));
699 ///     let _ = s.send(2);
700 /// });
701 ///
702 /// assert_eq!(r.recv(), Ok(1)); // Received immediately.
703 /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
704 /// ```
705 pub struct Receiver<T> {
706     flavor: ReceiverFlavor<T>,
707 }
708 
709 /// Receiver flavors.
710 enum ReceiverFlavor<T> {
711     /// Bounded channel based on a preallocated array.
712     Array(counter::Receiver<flavors::array::Channel<T>>),
713 
714     /// Unbounded channel implemented as a linked list.
715     List(counter::Receiver<flavors::list::Channel<T>>),
716 
717     /// Zero-capacity channel.
718     Zero(counter::Receiver<flavors::zero::Channel<T>>),
719 
720     /// The after flavor.
721     At(Arc<flavors::at::Channel>),
722 
723     /// The tick flavor.
724     Tick(Arc<flavors::tick::Channel>),
725 
726     /// The never flavor.
727     Never(flavors::never::Channel<T>),
728 }
729 
730 unsafe impl<T: Send> Send for Receiver<T> {}
731 unsafe impl<T: Send> Sync for Receiver<T> {}
732 
733 impl<T> UnwindSafe for Receiver<T> {}
734 impl<T> RefUnwindSafe for Receiver<T> {}
735 
736 impl<T> Receiver<T> {
737     /// Attempts to receive a message from the channel without blocking.
738     ///
739     /// This method will either receive a message from the channel immediately or return an error
740     /// if the channel is empty.
741     ///
742     /// If called on a zero-capacity channel, this method will receive a message only if there
743     /// happens to be a send operation on the other side of the channel at the same time.
744     ///
745     /// # Examples
746     ///
747     /// ```
748     /// use crossbeam_channel::{unbounded, TryRecvError};
749     ///
750     /// let (s, r) = unbounded();
751     /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
752     ///
753     /// s.send(5).unwrap();
754     /// drop(s);
755     ///
756     /// assert_eq!(r.try_recv(), Ok(5));
757     /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
758     /// ```
try_recv(&self) -> Result<T, TryRecvError>759     pub fn try_recv(&self) -> Result<T, TryRecvError> {
760         match &self.flavor {
761             ReceiverFlavor::Array(chan) => chan.try_recv(),
762             ReceiverFlavor::List(chan) => chan.try_recv(),
763             ReceiverFlavor::Zero(chan) => chan.try_recv(),
764             ReceiverFlavor::At(chan) => {
765                 let msg = chan.try_recv();
766                 unsafe {
767                     mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
768                         &msg,
769                     )
770                 }
771             }
772             ReceiverFlavor::Tick(chan) => {
773                 let msg = chan.try_recv();
774                 unsafe {
775                     mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
776                         &msg,
777                     )
778                 }
779             }
780             ReceiverFlavor::Never(chan) => chan.try_recv(),
781         }
782     }
783 
784     /// Blocks the current thread until a message is received or the channel is empty and
785     /// disconnected.
786     ///
787     /// If the channel is empty and not disconnected, this call will block until the receive
788     /// operation can proceed. If the channel is empty and becomes disconnected, this call will
789     /// wake up and return an error.
790     ///
791     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
792     /// on the other side of the channel.
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// use std::thread;
798     /// use std::time::Duration;
799     /// use crossbeam_channel::{unbounded, RecvError};
800     ///
801     /// let (s, r) = unbounded();
802     ///
803     /// thread::spawn(move || {
804     ///     thread::sleep(Duration::from_secs(1));
805     ///     s.send(5).unwrap();
806     ///     drop(s);
807     /// });
808     ///
809     /// assert_eq!(r.recv(), Ok(5));
810     /// assert_eq!(r.recv(), Err(RecvError));
811     /// ```
recv(&self) -> Result<T, RecvError>812     pub fn recv(&self) -> Result<T, RecvError> {
813         match &self.flavor {
814             ReceiverFlavor::Array(chan) => chan.recv(None),
815             ReceiverFlavor::List(chan) => chan.recv(None),
816             ReceiverFlavor::Zero(chan) => chan.recv(None),
817             ReceiverFlavor::At(chan) => {
818                 let msg = chan.recv(None);
819                 unsafe {
820                     mem::transmute_copy::<
821                         Result<Instant, RecvTimeoutError>,
822                         Result<T, RecvTimeoutError>,
823                     >(&msg)
824                 }
825             }
826             ReceiverFlavor::Tick(chan) => {
827                 let msg = chan.recv(None);
828                 unsafe {
829                     mem::transmute_copy::<
830                         Result<Instant, RecvTimeoutError>,
831                         Result<T, RecvTimeoutError>,
832                     >(&msg)
833                 }
834             }
835             ReceiverFlavor::Never(chan) => chan.recv(None),
836         }
837         .map_err(|_| RecvError)
838     }
839 
840     /// Waits for a message to be received from the channel, but only for a limited time.
841     ///
842     /// If the channel is empty and not disconnected, this call will block until the receive
843     /// operation can proceed or the operation times out. If the channel is empty and becomes
844     /// disconnected, this call will wake up and return an error.
845     ///
846     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
847     /// on the other side of the channel.
848     ///
849     /// # Examples
850     ///
851     /// ```
852     /// use std::thread;
853     /// use std::time::Duration;
854     /// use crossbeam_channel::{unbounded, RecvTimeoutError};
855     ///
856     /// let (s, r) = unbounded();
857     ///
858     /// thread::spawn(move || {
859     ///     thread::sleep(Duration::from_secs(1));
860     ///     s.send(5).unwrap();
861     ///     drop(s);
862     /// });
863     ///
864     /// assert_eq!(
865     ///     r.recv_timeout(Duration::from_millis(500)),
866     ///     Err(RecvTimeoutError::Timeout),
867     /// );
868     /// assert_eq!(
869     ///     r.recv_timeout(Duration::from_secs(1)),
870     ///     Ok(5),
871     /// );
872     /// assert_eq!(
873     ///     r.recv_timeout(Duration::from_secs(1)),
874     ///     Err(RecvTimeoutError::Disconnected),
875     /// );
876     /// ```
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>877     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
878         match Instant::now().checked_add(timeout) {
879             Some(deadline) => self.recv_deadline(deadline),
880             None => self.recv().map_err(RecvTimeoutError::from),
881         }
882     }
883 
884     /// Waits for a message to be received from the channel, but only before a given deadline.
885     ///
886     /// If the channel is empty and not disconnected, this call will block until the receive
887     /// operation can proceed or the operation times out. If the channel is empty and becomes
888     /// disconnected, this call will wake up and return an error.
889     ///
890     /// If called on a zero-capacity channel, this method will wait for a send operation to appear
891     /// on the other side of the channel.
892     ///
893     /// # Examples
894     ///
895     /// ```
896     /// use std::thread;
897     /// use std::time::{Instant, Duration};
898     /// use crossbeam_channel::{unbounded, RecvTimeoutError};
899     ///
900     /// let (s, r) = unbounded();
901     ///
902     /// thread::spawn(move || {
903     ///     thread::sleep(Duration::from_secs(1));
904     ///     s.send(5).unwrap();
905     ///     drop(s);
906     /// });
907     ///
908     /// let now = Instant::now();
909     ///
910     /// assert_eq!(
911     ///     r.recv_deadline(now + Duration::from_millis(500)),
912     ///     Err(RecvTimeoutError::Timeout),
913     /// );
914     /// assert_eq!(
915     ///     r.recv_deadline(now + Duration::from_millis(1500)),
916     ///     Ok(5),
917     /// );
918     /// assert_eq!(
919     ///     r.recv_deadline(now + Duration::from_secs(5)),
920     ///     Err(RecvTimeoutError::Disconnected),
921     /// );
922     /// ```
recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError>923     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
924         match &self.flavor {
925             ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
926             ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
927             ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
928             ReceiverFlavor::At(chan) => {
929                 let msg = chan.recv(Some(deadline));
930                 unsafe {
931                     mem::transmute_copy::<
932                         Result<Instant, RecvTimeoutError>,
933                         Result<T, RecvTimeoutError>,
934                     >(&msg)
935                 }
936             }
937             ReceiverFlavor::Tick(chan) => {
938                 let msg = chan.recv(Some(deadline));
939                 unsafe {
940                     mem::transmute_copy::<
941                         Result<Instant, RecvTimeoutError>,
942                         Result<T, RecvTimeoutError>,
943                     >(&msg)
944                 }
945             }
946             ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
947         }
948     }
949 
950     /// Returns `true` if the channel is empty.
951     ///
952     /// Note: Zero-capacity channels are always empty.
953     ///
954     /// # Examples
955     ///
956     /// ```
957     /// use crossbeam_channel::unbounded;
958     ///
959     /// let (s, r) = unbounded();
960     ///
961     /// assert!(r.is_empty());
962     /// s.send(0).unwrap();
963     /// assert!(!r.is_empty());
964     /// ```
is_empty(&self) -> bool965     pub fn is_empty(&self) -> bool {
966         match &self.flavor {
967             ReceiverFlavor::Array(chan) => chan.is_empty(),
968             ReceiverFlavor::List(chan) => chan.is_empty(),
969             ReceiverFlavor::Zero(chan) => chan.is_empty(),
970             ReceiverFlavor::At(chan) => chan.is_empty(),
971             ReceiverFlavor::Tick(chan) => chan.is_empty(),
972             ReceiverFlavor::Never(chan) => chan.is_empty(),
973         }
974     }
975 
976     /// Returns `true` if the channel is full.
977     ///
978     /// Note: Zero-capacity channels are always full.
979     ///
980     /// # Examples
981     ///
982     /// ```
983     /// use crossbeam_channel::bounded;
984     ///
985     /// let (s, r) = bounded(1);
986     ///
987     /// assert!(!r.is_full());
988     /// s.send(0).unwrap();
989     /// assert!(r.is_full());
990     /// ```
is_full(&self) -> bool991     pub fn is_full(&self) -> bool {
992         match &self.flavor {
993             ReceiverFlavor::Array(chan) => chan.is_full(),
994             ReceiverFlavor::List(chan) => chan.is_full(),
995             ReceiverFlavor::Zero(chan) => chan.is_full(),
996             ReceiverFlavor::At(chan) => chan.is_full(),
997             ReceiverFlavor::Tick(chan) => chan.is_full(),
998             ReceiverFlavor::Never(chan) => chan.is_full(),
999         }
1000     }
1001 
1002     /// Returns the number of messages in the channel.
1003     ///
1004     /// # Examples
1005     ///
1006     /// ```
1007     /// use crossbeam_channel::unbounded;
1008     ///
1009     /// let (s, r) = unbounded();
1010     /// assert_eq!(r.len(), 0);
1011     ///
1012     /// s.send(1).unwrap();
1013     /// s.send(2).unwrap();
1014     /// assert_eq!(r.len(), 2);
1015     /// ```
len(&self) -> usize1016     pub fn len(&self) -> usize {
1017         match &self.flavor {
1018             ReceiverFlavor::Array(chan) => chan.len(),
1019             ReceiverFlavor::List(chan) => chan.len(),
1020             ReceiverFlavor::Zero(chan) => chan.len(),
1021             ReceiverFlavor::At(chan) => chan.len(),
1022             ReceiverFlavor::Tick(chan) => chan.len(),
1023             ReceiverFlavor::Never(chan) => chan.len(),
1024         }
1025     }
1026 
1027     /// If the channel is bounded, returns its capacity.
1028     ///
1029     /// # Examples
1030     ///
1031     /// ```
1032     /// use crossbeam_channel::{bounded, unbounded};
1033     ///
1034     /// let (_, r) = unbounded::<i32>();
1035     /// assert_eq!(r.capacity(), None);
1036     ///
1037     /// let (_, r) = bounded::<i32>(5);
1038     /// assert_eq!(r.capacity(), Some(5));
1039     ///
1040     /// let (_, r) = bounded::<i32>(0);
1041     /// assert_eq!(r.capacity(), Some(0));
1042     /// ```
capacity(&self) -> Option<usize>1043     pub fn capacity(&self) -> Option<usize> {
1044         match &self.flavor {
1045             ReceiverFlavor::Array(chan) => chan.capacity(),
1046             ReceiverFlavor::List(chan) => chan.capacity(),
1047             ReceiverFlavor::Zero(chan) => chan.capacity(),
1048             ReceiverFlavor::At(chan) => chan.capacity(),
1049             ReceiverFlavor::Tick(chan) => chan.capacity(),
1050             ReceiverFlavor::Never(chan) => chan.capacity(),
1051         }
1052     }
1053 
1054     /// A blocking iterator over messages in the channel.
1055     ///
1056     /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1057     /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1058     ///
1059     /// [`next`]: Iterator::next
1060     ///
1061     /// # Examples
1062     ///
1063     /// ```
1064     /// use std::thread;
1065     /// use crossbeam_channel::unbounded;
1066     ///
1067     /// let (s, r) = unbounded();
1068     ///
1069     /// thread::spawn(move || {
1070     ///     s.send(1).unwrap();
1071     ///     s.send(2).unwrap();
1072     ///     s.send(3).unwrap();
1073     ///     drop(s); // Disconnect the channel.
1074     /// });
1075     ///
1076     /// // Collect all messages from the channel.
1077     /// // Note that the call to `collect` blocks until the sender is dropped.
1078     /// let v: Vec<_> = r.iter().collect();
1079     ///
1080     /// assert_eq!(v, [1, 2, 3]);
1081     /// ```
iter(&self) -> Iter<'_, T>1082     pub fn iter(&self) -> Iter<'_, T> {
1083         Iter { receiver: self }
1084     }
1085 
1086     /// A non-blocking iterator over messages in the channel.
1087     ///
1088     /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1089     /// never blocks waiting for the next message.
1090     ///
1091     /// [`next`]: Iterator::next
1092     ///
1093     /// # Examples
1094     ///
1095     /// ```
1096     /// use std::thread;
1097     /// use std::time::Duration;
1098     /// use crossbeam_channel::unbounded;
1099     ///
1100     /// let (s, r) = unbounded::<i32>();
1101     ///
1102     /// thread::spawn(move || {
1103     ///     s.send(1).unwrap();
1104     ///     thread::sleep(Duration::from_secs(1));
1105     ///     s.send(2).unwrap();
1106     ///     thread::sleep(Duration::from_secs(2));
1107     ///     s.send(3).unwrap();
1108     /// });
1109     ///
1110     /// thread::sleep(Duration::from_secs(2));
1111     ///
1112     /// // Collect all messages from the channel without blocking.
1113     /// // The third message hasn't been sent yet so we'll collect only the first two.
1114     /// let v: Vec<_> = r.try_iter().collect();
1115     ///
1116     /// assert_eq!(v, [1, 2]);
1117     /// ```
try_iter(&self) -> TryIter<'_, T>1118     pub fn try_iter(&self) -> TryIter<'_, T> {
1119         TryIter { receiver: self }
1120     }
1121 
1122     /// Returns `true` if receivers belong to the same channel.
1123     ///
1124     /// # Examples
1125     ///
1126     /// ```rust
1127     /// use crossbeam_channel::unbounded;
1128     ///
1129     /// let (_, r) = unbounded::<usize>();
1130     ///
1131     /// let r2 = r.clone();
1132     /// assert!(r.same_channel(&r2));
1133     ///
1134     /// let (_, r3) = unbounded();
1135     /// assert!(!r.same_channel(&r3));
1136     /// ```
same_channel(&self, other: &Receiver<T>) -> bool1137     pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1138         match (&self.flavor, &other.flavor) {
1139             (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1140             (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1141             (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1142             (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1143             (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1144             (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1145             _ => false,
1146         }
1147     }
1148 }
1149 
1150 impl<T> Drop for Receiver<T> {
drop(&mut self)1151     fn drop(&mut self) {
1152         unsafe {
1153             match &self.flavor {
1154                 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1155                 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1156                 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1157                 ReceiverFlavor::At(_) => {}
1158                 ReceiverFlavor::Tick(_) => {}
1159                 ReceiverFlavor::Never(_) => {}
1160             }
1161         }
1162     }
1163 }
1164 
1165 impl<T> Clone for Receiver<T> {
clone(&self) -> Self1166     fn clone(&self) -> Self {
1167         let flavor = match &self.flavor {
1168             ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1169             ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1170             ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1171             ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1172             ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1173             ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1174         };
1175 
1176         Receiver { flavor }
1177     }
1178 }
1179 
1180 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1181     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182         f.pad("Receiver { .. }")
1183     }
1184 }
1185 
1186 impl<'a, T> IntoIterator for &'a Receiver<T> {
1187     type Item = T;
1188     type IntoIter = Iter<'a, T>;
1189 
into_iter(self) -> Self::IntoIter1190     fn into_iter(self) -> Self::IntoIter {
1191         self.iter()
1192     }
1193 }
1194 
1195 impl<T> IntoIterator for Receiver<T> {
1196     type Item = T;
1197     type IntoIter = IntoIter<T>;
1198 
into_iter(self) -> Self::IntoIter1199     fn into_iter(self) -> Self::IntoIter {
1200         IntoIter { receiver: self }
1201     }
1202 }
1203 
1204 /// A blocking iterator over messages in a channel.
1205 ///
1206 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1207 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1208 ///
1209 /// [`next`]: Iterator::next
1210 ///
1211 /// # Examples
1212 ///
1213 /// ```
1214 /// use std::thread;
1215 /// use crossbeam_channel::unbounded;
1216 ///
1217 /// let (s, r) = unbounded();
1218 ///
1219 /// thread::spawn(move || {
1220 ///     s.send(1).unwrap();
1221 ///     s.send(2).unwrap();
1222 ///     s.send(3).unwrap();
1223 ///     drop(s); // Disconnect the channel.
1224 /// });
1225 ///
1226 /// // Collect all messages from the channel.
1227 /// // Note that the call to `collect` blocks until the sender is dropped.
1228 /// let v: Vec<_> = r.iter().collect();
1229 ///
1230 /// assert_eq!(v, [1, 2, 3]);
1231 /// ```
1232 pub struct Iter<'a, T> {
1233     receiver: &'a Receiver<T>,
1234 }
1235 
1236 impl<T> FusedIterator for Iter<'_, T> {}
1237 
1238 impl<T> Iterator for Iter<'_, T> {
1239     type Item = T;
1240 
next(&mut self) -> Option<Self::Item>1241     fn next(&mut self) -> Option<Self::Item> {
1242         self.receiver.recv().ok()
1243     }
1244 }
1245 
1246 impl<T> fmt::Debug for Iter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1247     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248         f.pad("Iter { .. }")
1249     }
1250 }
1251 
1252 /// A non-blocking iterator over messages in a channel.
1253 ///
1254 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1255 /// never blocks waiting for the next message.
1256 ///
1257 /// [`next`]: Iterator::next
1258 ///
1259 /// # Examples
1260 ///
1261 /// ```
1262 /// use std::thread;
1263 /// use std::time::Duration;
1264 /// use crossbeam_channel::unbounded;
1265 ///
1266 /// let (s, r) = unbounded::<i32>();
1267 ///
1268 /// thread::spawn(move || {
1269 ///     s.send(1).unwrap();
1270 ///     thread::sleep(Duration::from_secs(1));
1271 ///     s.send(2).unwrap();
1272 ///     thread::sleep(Duration::from_secs(2));
1273 ///     s.send(3).unwrap();
1274 /// });
1275 ///
1276 /// thread::sleep(Duration::from_secs(2));
1277 ///
1278 /// // Collect all messages from the channel without blocking.
1279 /// // The third message hasn't been sent yet so we'll collect only the first two.
1280 /// let v: Vec<_> = r.try_iter().collect();
1281 ///
1282 /// assert_eq!(v, [1, 2]);
1283 /// ```
1284 pub struct TryIter<'a, T> {
1285     receiver: &'a Receiver<T>,
1286 }
1287 
1288 impl<T> Iterator for TryIter<'_, T> {
1289     type Item = T;
1290 
next(&mut self) -> Option<Self::Item>1291     fn next(&mut self) -> Option<Self::Item> {
1292         self.receiver.try_recv().ok()
1293     }
1294 }
1295 
1296 impl<T> fmt::Debug for TryIter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1297     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1298         f.pad("TryIter { .. }")
1299     }
1300 }
1301 
1302 /// A blocking iterator over messages in a channel.
1303 ///
1304 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1305 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1306 ///
1307 /// [`next`]: Iterator::next
1308 ///
1309 /// # Examples
1310 ///
1311 /// ```
1312 /// use std::thread;
1313 /// use crossbeam_channel::unbounded;
1314 ///
1315 /// let (s, r) = unbounded();
1316 ///
1317 /// thread::spawn(move || {
1318 ///     s.send(1).unwrap();
1319 ///     s.send(2).unwrap();
1320 ///     s.send(3).unwrap();
1321 ///     drop(s); // Disconnect the channel.
1322 /// });
1323 ///
1324 /// // Collect all messages from the channel.
1325 /// // Note that the call to `collect` blocks until the sender is dropped.
1326 /// let v: Vec<_> = r.into_iter().collect();
1327 ///
1328 /// assert_eq!(v, [1, 2, 3]);
1329 /// ```
1330 pub struct IntoIter<T> {
1331     receiver: Receiver<T>,
1332 }
1333 
1334 impl<T> FusedIterator for IntoIter<T> {}
1335 
1336 impl<T> Iterator for IntoIter<T> {
1337     type Item = T;
1338 
next(&mut self) -> Option<Self::Item>1339     fn next(&mut self) -> Option<Self::Item> {
1340         self.receiver.recv().ok()
1341     }
1342 }
1343 
1344 impl<T> fmt::Debug for IntoIter<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1345     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346         f.pad("IntoIter { .. }")
1347     }
1348 }
1349 
1350 impl<T> SelectHandle for Sender<T> {
try_select(&self, token: &mut Token) -> bool1351     fn try_select(&self, token: &mut Token) -> bool {
1352         match &self.flavor {
1353             SenderFlavor::Array(chan) => chan.sender().try_select(token),
1354             SenderFlavor::List(chan) => chan.sender().try_select(token),
1355             SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1356         }
1357     }
1358 
deadline(&self) -> Option<Instant>1359     fn deadline(&self) -> Option<Instant> {
1360         None
1361     }
1362 
register(&self, oper: Operation, cx: &Context) -> bool1363     fn register(&self, oper: Operation, cx: &Context) -> bool {
1364         match &self.flavor {
1365             SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1366             SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1367             SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1368         }
1369     }
1370 
unregister(&self, oper: Operation)1371     fn unregister(&self, oper: Operation) {
1372         match &self.flavor {
1373             SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1374             SenderFlavor::List(chan) => chan.sender().unregister(oper),
1375             SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1376         }
1377     }
1378 
accept(&self, token: &mut Token, cx: &Context) -> bool1379     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1380         match &self.flavor {
1381             SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1382             SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1383             SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1384         }
1385     }
1386 
is_ready(&self) -> bool1387     fn is_ready(&self) -> bool {
1388         match &self.flavor {
1389             SenderFlavor::Array(chan) => chan.sender().is_ready(),
1390             SenderFlavor::List(chan) => chan.sender().is_ready(),
1391             SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1392         }
1393     }
1394 
watch(&self, oper: Operation, cx: &Context) -> bool1395     fn watch(&self, oper: Operation, cx: &Context) -> bool {
1396         match &self.flavor {
1397             SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1398             SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1399             SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1400         }
1401     }
1402 
unwatch(&self, oper: Operation)1403     fn unwatch(&self, oper: Operation) {
1404         match &self.flavor {
1405             SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1406             SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1407             SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1408         }
1409     }
1410 }
1411 
1412 impl<T> SelectHandle for Receiver<T> {
try_select(&self, token: &mut Token) -> bool1413     fn try_select(&self, token: &mut Token) -> bool {
1414         match &self.flavor {
1415             ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1416             ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1417             ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1418             ReceiverFlavor::At(chan) => chan.try_select(token),
1419             ReceiverFlavor::Tick(chan) => chan.try_select(token),
1420             ReceiverFlavor::Never(chan) => chan.try_select(token),
1421         }
1422     }
1423 
deadline(&self) -> Option<Instant>1424     fn deadline(&self) -> Option<Instant> {
1425         match &self.flavor {
1426             ReceiverFlavor::Array(_) => None,
1427             ReceiverFlavor::List(_) => None,
1428             ReceiverFlavor::Zero(_) => None,
1429             ReceiverFlavor::At(chan) => chan.deadline(),
1430             ReceiverFlavor::Tick(chan) => chan.deadline(),
1431             ReceiverFlavor::Never(chan) => chan.deadline(),
1432         }
1433     }
1434 
register(&self, oper: Operation, cx: &Context) -> bool1435     fn register(&self, oper: Operation, cx: &Context) -> bool {
1436         match &self.flavor {
1437             ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1438             ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1439             ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1440             ReceiverFlavor::At(chan) => chan.register(oper, cx),
1441             ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1442             ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1443         }
1444     }
1445 
unregister(&self, oper: Operation)1446     fn unregister(&self, oper: Operation) {
1447         match &self.flavor {
1448             ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1449             ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1450             ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1451             ReceiverFlavor::At(chan) => chan.unregister(oper),
1452             ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1453             ReceiverFlavor::Never(chan) => chan.unregister(oper),
1454         }
1455     }
1456 
accept(&self, token: &mut Token, cx: &Context) -> bool1457     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1458         match &self.flavor {
1459             ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1460             ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1461             ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1462             ReceiverFlavor::At(chan) => chan.accept(token, cx),
1463             ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1464             ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1465         }
1466     }
1467 
is_ready(&self) -> bool1468     fn is_ready(&self) -> bool {
1469         match &self.flavor {
1470             ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1471             ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1472             ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1473             ReceiverFlavor::At(chan) => chan.is_ready(),
1474             ReceiverFlavor::Tick(chan) => chan.is_ready(),
1475             ReceiverFlavor::Never(chan) => chan.is_ready(),
1476         }
1477     }
1478 
watch(&self, oper: Operation, cx: &Context) -> bool1479     fn watch(&self, oper: Operation, cx: &Context) -> bool {
1480         match &self.flavor {
1481             ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1482             ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1483             ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1484             ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1485             ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1486             ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1487         }
1488     }
1489 
unwatch(&self, oper: Operation)1490     fn unwatch(&self, oper: Operation) {
1491         match &self.flavor {
1492             ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1493             ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1494             ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1495             ReceiverFlavor::At(chan) => chan.unwatch(oper),
1496             ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1497             ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1498         }
1499     }
1500 }
1501 
1502 /// Writes a message into the channel.
write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T>1503 pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1504     match &s.flavor {
1505         SenderFlavor::Array(chan) => chan.write(token, msg),
1506         SenderFlavor::List(chan) => chan.write(token, msg),
1507         SenderFlavor::Zero(chan) => chan.write(token, msg),
1508     }
1509 }
1510 
1511 /// Reads a message from the channel.
read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()>1512 pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1513     match &r.flavor {
1514         ReceiverFlavor::Array(chan) => chan.read(token),
1515         ReceiverFlavor::List(chan) => chan.read(token),
1516         ReceiverFlavor::Zero(chan) => chan.read(token),
1517         ReceiverFlavor::At(chan) => {
1518             mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1519         }
1520         ReceiverFlavor::Tick(chan) => {
1521             mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1522         }
1523         ReceiverFlavor::Never(chan) => chan.read(token),
1524     }
1525 }
1526