1 //! The channel interface.
2
3 use std::fmt;
4 use std::iter::FusedIterator;
5 use std::mem;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9
10 use crate::context::Context;
11 use crate::counter;
12 use crate::err::{
13 RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14 };
15 use crate::flavors;
16 use crate::select::{Operation, SelectHandle, Token};
17
18 /// Creates a channel of unbounded capacity.
19 ///
20 /// This channel has a growable buffer that can hold any number of messages at a time.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use std::thread;
26 /// use crossbeam_channel::unbounded;
27 ///
28 /// let (s, r) = unbounded();
29 ///
30 /// // Computes the n-th Fibonacci number.
31 /// fn fib(n: i32) -> i32 {
32 /// if n <= 1 {
33 /// n
34 /// } else {
35 /// fib(n - 1) + fib(n - 2)
36 /// }
37 /// }
38 ///
39 /// // Spawn an asynchronous computation.
40 /// thread::spawn(move || s.send(fib(20)).unwrap());
41 ///
42 /// // Print the result of the computation.
43 /// println!("{}", r.recv().unwrap());
44 /// ```
unbounded<T>() -> (Sender<T>, Receiver<T>)45 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46 let (s, r) = counter::new(flavors::list::Channel::new());
47 let s = Sender {
48 flavor: SenderFlavor::List(s),
49 };
50 let r = Receiver {
51 flavor: ReceiverFlavor::List(r),
52 };
53 (s, r)
54 }
55
56 /// Creates a channel of bounded capacity.
57 ///
58 /// This channel has a buffer that can hold at most `cap` messages at a time.
59 ///
60 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61 /// receive operations must appear at the same time in order to pair up and pass the message over.
62 ///
63 /// # Examples
64 ///
65 /// A channel of capacity 1:
66 ///
67 /// ```
68 /// use std::thread;
69 /// use std::time::Duration;
70 /// use crossbeam_channel::bounded;
71 ///
72 /// let (s, r) = bounded(1);
73 ///
74 /// // This call returns immediately because there is enough space in the channel.
75 /// s.send(1).unwrap();
76 ///
77 /// thread::spawn(move || {
78 /// // This call blocks the current thread because the channel is full.
79 /// // It will be able to complete only after the first message is received.
80 /// s.send(2).unwrap();
81 /// });
82 ///
83 /// thread::sleep(Duration::from_secs(1));
84 /// assert_eq!(r.recv(), Ok(1));
85 /// assert_eq!(r.recv(), Ok(2));
86 /// ```
87 ///
88 /// A zero-capacity channel:
89 ///
90 /// ```
91 /// use std::thread;
92 /// use std::time::Duration;
93 /// use crossbeam_channel::bounded;
94 ///
95 /// let (s, r) = bounded(0);
96 ///
97 /// thread::spawn(move || {
98 /// // This call blocks the current thread until a receive operation appears
99 /// // on the other side of the channel.
100 /// s.send(1).unwrap();
101 /// });
102 ///
103 /// thread::sleep(Duration::from_secs(1));
104 /// assert_eq!(r.recv(), Ok(1));
105 /// ```
bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)106 pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107 if cap == 0 {
108 let (s, r) = counter::new(flavors::zero::Channel::new());
109 let s = Sender {
110 flavor: SenderFlavor::Zero(s),
111 };
112 let r = Receiver {
113 flavor: ReceiverFlavor::Zero(r),
114 };
115 (s, r)
116 } else {
117 let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118 let s = Sender {
119 flavor: SenderFlavor::Array(s),
120 };
121 let r = Receiver {
122 flavor: ReceiverFlavor::Array(r),
123 };
124 (s, r)
125 }
126 }
127
128 /// Creates a receiver that delivers a message after a certain duration of time.
129 ///
130 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
132 /// sent.
133 ///
134 /// # Examples
135 ///
136 /// Using an `after` channel for timeouts:
137 ///
138 /// ```
139 /// use std::time::Duration;
140 /// use crossbeam_channel::{after, select, unbounded};
141 ///
142 /// let (s, r) = unbounded::<i32>();
143 /// let timeout = Duration::from_millis(100);
144 ///
145 /// select! {
146 /// recv(r) -> msg => println!("received {:?}", msg),
147 /// recv(after(timeout)) -> _ => println!("timed out"),
148 /// }
149 /// ```
150 ///
151 /// When the message gets sent:
152 ///
153 /// ```
154 /// use std::thread;
155 /// use std::time::{Duration, Instant};
156 /// use crossbeam_channel::after;
157 ///
158 /// // Converts a number of milliseconds into a `Duration`.
159 /// let ms = |ms| Duration::from_millis(ms);
160 ///
161 /// // Returns `true` if `a` and `b` are very close `Instant`s.
162 /// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
163 ///
164 /// let start = Instant::now();
165 /// let r = after(ms(100));
166 ///
167 /// thread::sleep(ms(500));
168 ///
169 /// // This message was sent 100 ms from the start and received 500 ms from the start.
170 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
171 /// assert!(eq(Instant::now(), start + ms(500)));
172 /// ```
after(duration: Duration) -> Receiver<Instant>173 pub fn after(duration: Duration) -> Receiver<Instant> {
174 match Instant::now().checked_add(duration) {
175 Some(deadline) => Receiver {
176 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
177 },
178 None => never(),
179 }
180 }
181
182 /// Creates a receiver that delivers a message at a certain instant in time.
183 ///
184 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
185 /// be sent into the channel at the moment in time `when`. The message is the instant at which it
186 /// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
187 /// instantly to the receiver.
188 ///
189 /// # Examples
190 ///
191 /// Using an `at` channel for timeouts:
192 ///
193 /// ```
194 /// use std::time::{Instant, Duration};
195 /// use crossbeam_channel::{at, select, unbounded};
196 ///
197 /// let (s, r) = unbounded::<i32>();
198 /// let deadline = Instant::now() + Duration::from_millis(500);
199 ///
200 /// select! {
201 /// recv(r) -> msg => println!("received {:?}", msg),
202 /// recv(at(deadline)) -> _ => println!("timed out"),
203 /// }
204 /// ```
205 ///
206 /// When the message gets sent:
207 ///
208 /// ```
209 /// use std::time::{Duration, Instant};
210 /// use crossbeam_channel::at;
211 ///
212 /// // Converts a number of milliseconds into a `Duration`.
213 /// let ms = |ms| Duration::from_millis(ms);
214 ///
215 /// let start = Instant::now();
216 /// let end = start + ms(100);
217 ///
218 /// let r = at(end);
219 ///
220 /// // This message was sent 100 ms from the start
221 /// assert_eq!(r.recv().unwrap(), end);
222 /// assert!(Instant::now() > start + ms(100));
223 /// ```
at(when: Instant) -> Receiver<Instant>224 pub fn at(when: Instant) -> Receiver<Instant> {
225 Receiver {
226 flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
227 }
228 }
229
230 /// Creates a receiver that never delivers messages.
231 ///
232 /// The channel is bounded with capacity of 0 and never gets disconnected.
233 ///
234 /// # Examples
235 ///
236 /// Using a `never` channel to optionally add a timeout to [`select!`]:
237 ///
238 /// [`select!`]: crate::select!
239 ///
240 /// ```
241 /// use std::thread;
242 /// use std::time::Duration;
243 /// use crossbeam_channel::{after, select, never, unbounded};
244 ///
245 /// let (s, r) = unbounded();
246 ///
247 /// thread::spawn(move || {
248 /// thread::sleep(Duration::from_secs(1));
249 /// s.send(1).unwrap();
250 /// });
251 ///
252 /// // Suppose this duration can be a `Some` or a `None`.
253 /// let duration = Some(Duration::from_millis(100));
254 ///
255 /// // Create a channel that times out after the specified duration.
256 /// let timeout = duration
257 /// .map(|d| after(d))
258 /// .unwrap_or(never());
259 ///
260 /// select! {
261 /// recv(r) -> msg => assert_eq!(msg, Ok(1)),
262 /// recv(timeout) -> _ => println!("timed out"),
263 /// }
264 /// ```
never<T>() -> Receiver<T>265 pub fn never<T>() -> Receiver<T> {
266 Receiver {
267 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
268 }
269 }
270
271 /// Creates a receiver that delivers messages periodically.
272 ///
273 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
274 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
275 /// sent.
276 ///
277 /// # Examples
278 ///
279 /// Using a `tick` channel to periodically print elapsed time:
280 ///
281 /// ```
282 /// use std::time::{Duration, Instant};
283 /// use crossbeam_channel::tick;
284 ///
285 /// let start = Instant::now();
286 /// let ticker = tick(Duration::from_millis(100));
287 ///
288 /// for _ in 0..5 {
289 /// ticker.recv().unwrap();
290 /// println!("elapsed: {:?}", start.elapsed());
291 /// }
292 /// ```
293 ///
294 /// When messages get sent:
295 ///
296 /// ```
297 /// use std::thread;
298 /// use std::time::{Duration, Instant};
299 /// use crossbeam_channel::tick;
300 ///
301 /// // Converts a number of milliseconds into a `Duration`.
302 /// let ms = |ms| Duration::from_millis(ms);
303 ///
304 /// // Returns `true` if `a` and `b` are very close `Instant`s.
305 /// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
306 ///
307 /// let start = Instant::now();
308 /// let r = tick(ms(100));
309 ///
310 /// // This message was sent 100 ms from the start and received 100 ms from the start.
311 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
312 /// assert!(eq(Instant::now(), start + ms(100)));
313 ///
314 /// thread::sleep(ms(500));
315 ///
316 /// // This message was sent 200 ms from the start and received 600 ms from the start.
317 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
318 /// assert!(eq(Instant::now(), start + ms(600)));
319 ///
320 /// // This message was sent 700 ms from the start and received 700 ms from the start.
321 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
322 /// assert!(eq(Instant::now(), start + ms(700)));
323 /// ```
tick(duration: Duration) -> Receiver<Instant>324 pub fn tick(duration: Duration) -> Receiver<Instant> {
325 match Instant::now().checked_add(duration) {
326 Some(delivery_time) => Receiver {
327 flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
328 delivery_time,
329 duration,
330 ))),
331 },
332 None => never(),
333 }
334 }
335
336 /// The sending side of a channel.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use std::thread;
342 /// use crossbeam_channel::unbounded;
343 ///
344 /// let (s1, r) = unbounded();
345 /// let s2 = s1.clone();
346 ///
347 /// thread::spawn(move || s1.send(1).unwrap());
348 /// thread::spawn(move || s2.send(2).unwrap());
349 ///
350 /// let msg1 = r.recv().unwrap();
351 /// let msg2 = r.recv().unwrap();
352 ///
353 /// assert_eq!(msg1 + msg2, 3);
354 /// ```
355 pub struct Sender<T> {
356 flavor: SenderFlavor<T>,
357 }
358
359 /// Sender flavors.
360 enum SenderFlavor<T> {
361 /// Bounded channel based on a preallocated array.
362 Array(counter::Sender<flavors::array::Channel<T>>),
363
364 /// Unbounded channel implemented as a linked list.
365 List(counter::Sender<flavors::list::Channel<T>>),
366
367 /// Zero-capacity channel.
368 Zero(counter::Sender<flavors::zero::Channel<T>>),
369 }
370
371 unsafe impl<T: Send> Send for Sender<T> {}
372 unsafe impl<T: Send> Sync for Sender<T> {}
373
374 impl<T> UnwindSafe for Sender<T> {}
375 impl<T> RefUnwindSafe for Sender<T> {}
376
377 impl<T> Sender<T> {
378 /// Attempts to send a message into the channel without blocking.
379 ///
380 /// This method will either send a message into the channel immediately or return an error if
381 /// the channel is full or disconnected. The returned error contains the original message.
382 ///
383 /// If called on a zero-capacity channel, this method will send the message only if there
384 /// happens to be a receive operation on the other side of the channel at the same time.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// use crossbeam_channel::{bounded, TrySendError};
390 ///
391 /// let (s, r) = bounded(1);
392 ///
393 /// assert_eq!(s.try_send(1), Ok(()));
394 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
395 ///
396 /// drop(r);
397 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
398 /// ```
try_send(&self, msg: T) -> Result<(), TrySendError<T>>399 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
400 match &self.flavor {
401 SenderFlavor::Array(chan) => chan.try_send(msg),
402 SenderFlavor::List(chan) => chan.try_send(msg),
403 SenderFlavor::Zero(chan) => chan.try_send(msg),
404 }
405 }
406
407 /// Blocks the current thread until a message is sent or the channel is disconnected.
408 ///
409 /// If the channel is full and not disconnected, this call will block until the send operation
410 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
411 /// error. The returned error contains the original message.
412 ///
413 /// If called on a zero-capacity channel, this method will wait for a receive operation to
414 /// appear on the other side of the channel.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// use std::thread;
420 /// use std::time::Duration;
421 /// use crossbeam_channel::{bounded, SendError};
422 ///
423 /// let (s, r) = bounded(1);
424 /// assert_eq!(s.send(1), Ok(()));
425 ///
426 /// thread::spawn(move || {
427 /// assert_eq!(r.recv(), Ok(1));
428 /// thread::sleep(Duration::from_secs(1));
429 /// drop(r);
430 /// });
431 ///
432 /// assert_eq!(s.send(2), Ok(()));
433 /// assert_eq!(s.send(3), Err(SendError(3)));
434 /// ```
send(&self, msg: T) -> Result<(), SendError<T>>435 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
436 match &self.flavor {
437 SenderFlavor::Array(chan) => chan.send(msg, None),
438 SenderFlavor::List(chan) => chan.send(msg, None),
439 SenderFlavor::Zero(chan) => chan.send(msg, None),
440 }
441 .map_err(|err| match err {
442 SendTimeoutError::Disconnected(msg) => SendError(msg),
443 SendTimeoutError::Timeout(_) => unreachable!(),
444 })
445 }
446
447 /// Waits for a message to be sent into the channel, but only for a limited time.
448 ///
449 /// If the channel is full and not disconnected, this call will block until the send operation
450 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
451 /// wake up and return an error. The returned error contains the original message.
452 ///
453 /// If called on a zero-capacity channel, this method will wait for a receive operation to
454 /// appear on the other side of the channel.
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// use std::thread;
460 /// use std::time::Duration;
461 /// use crossbeam_channel::{bounded, SendTimeoutError};
462 ///
463 /// let (s, r) = bounded(0);
464 ///
465 /// thread::spawn(move || {
466 /// thread::sleep(Duration::from_secs(1));
467 /// assert_eq!(r.recv(), Ok(2));
468 /// drop(r);
469 /// });
470 ///
471 /// assert_eq!(
472 /// s.send_timeout(1, Duration::from_millis(500)),
473 /// Err(SendTimeoutError::Timeout(1)),
474 /// );
475 /// assert_eq!(
476 /// s.send_timeout(2, Duration::from_secs(1)),
477 /// Ok(()),
478 /// );
479 /// assert_eq!(
480 /// s.send_timeout(3, Duration::from_millis(500)),
481 /// Err(SendTimeoutError::Disconnected(3)),
482 /// );
483 /// ```
send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>484 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
485 match Instant::now().checked_add(timeout) {
486 Some(deadline) => self.send_deadline(msg, deadline),
487 None => self.send(msg).map_err(SendTimeoutError::from),
488 }
489 }
490
491 /// Waits for a message to be sent into the channel, but only until a given deadline.
492 ///
493 /// If the channel is full and not disconnected, this call will block until the send operation
494 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
495 /// wake up and return an error. The returned error contains the original message.
496 ///
497 /// If called on a zero-capacity channel, this method will wait for a receive operation to
498 /// appear on the other side of the channel.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use std::thread;
504 /// use std::time::{Duration, Instant};
505 /// use crossbeam_channel::{bounded, SendTimeoutError};
506 ///
507 /// let (s, r) = bounded(0);
508 ///
509 /// thread::spawn(move || {
510 /// thread::sleep(Duration::from_secs(1));
511 /// assert_eq!(r.recv(), Ok(2));
512 /// drop(r);
513 /// });
514 ///
515 /// let now = Instant::now();
516 ///
517 /// assert_eq!(
518 /// s.send_deadline(1, now + Duration::from_millis(500)),
519 /// Err(SendTimeoutError::Timeout(1)),
520 /// );
521 /// assert_eq!(
522 /// s.send_deadline(2, now + Duration::from_millis(1500)),
523 /// Ok(()),
524 /// );
525 /// assert_eq!(
526 /// s.send_deadline(3, now + Duration::from_millis(2000)),
527 /// Err(SendTimeoutError::Disconnected(3)),
528 /// );
529 /// ```
send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>>530 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
531 match &self.flavor {
532 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
533 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
534 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
535 }
536 }
537
538 /// Returns `true` if the channel is empty.
539 ///
540 /// Note: Zero-capacity channels are always empty.
541 ///
542 /// # Examples
543 ///
544 /// ```
545 /// use crossbeam_channel::unbounded;
546 ///
547 /// let (s, r) = unbounded();
548 /// assert!(s.is_empty());
549 ///
550 /// s.send(0).unwrap();
551 /// assert!(!s.is_empty());
552 /// ```
is_empty(&self) -> bool553 pub fn is_empty(&self) -> bool {
554 match &self.flavor {
555 SenderFlavor::Array(chan) => chan.is_empty(),
556 SenderFlavor::List(chan) => chan.is_empty(),
557 SenderFlavor::Zero(chan) => chan.is_empty(),
558 }
559 }
560
561 /// Returns `true` if the channel is full.
562 ///
563 /// Note: Zero-capacity channels are always full.
564 ///
565 /// # Examples
566 ///
567 /// ```
568 /// use crossbeam_channel::bounded;
569 ///
570 /// let (s, r) = bounded(1);
571 ///
572 /// assert!(!s.is_full());
573 /// s.send(0).unwrap();
574 /// assert!(s.is_full());
575 /// ```
is_full(&self) -> bool576 pub fn is_full(&self) -> bool {
577 match &self.flavor {
578 SenderFlavor::Array(chan) => chan.is_full(),
579 SenderFlavor::List(chan) => chan.is_full(),
580 SenderFlavor::Zero(chan) => chan.is_full(),
581 }
582 }
583
584 /// Returns the number of messages in the channel.
585 ///
586 /// # Examples
587 ///
588 /// ```
589 /// use crossbeam_channel::unbounded;
590 ///
591 /// let (s, r) = unbounded();
592 /// assert_eq!(s.len(), 0);
593 ///
594 /// s.send(1).unwrap();
595 /// s.send(2).unwrap();
596 /// assert_eq!(s.len(), 2);
597 /// ```
len(&self) -> usize598 pub fn len(&self) -> usize {
599 match &self.flavor {
600 SenderFlavor::Array(chan) => chan.len(),
601 SenderFlavor::List(chan) => chan.len(),
602 SenderFlavor::Zero(chan) => chan.len(),
603 }
604 }
605
606 /// If the channel is bounded, returns its capacity.
607 ///
608 /// # Examples
609 ///
610 /// ```
611 /// use crossbeam_channel::{bounded, unbounded};
612 ///
613 /// let (s, _) = unbounded::<i32>();
614 /// assert_eq!(s.capacity(), None);
615 ///
616 /// let (s, _) = bounded::<i32>(5);
617 /// assert_eq!(s.capacity(), Some(5));
618 ///
619 /// let (s, _) = bounded::<i32>(0);
620 /// assert_eq!(s.capacity(), Some(0));
621 /// ```
capacity(&self) -> Option<usize>622 pub fn capacity(&self) -> Option<usize> {
623 match &self.flavor {
624 SenderFlavor::Array(chan) => chan.capacity(),
625 SenderFlavor::List(chan) => chan.capacity(),
626 SenderFlavor::Zero(chan) => chan.capacity(),
627 }
628 }
629
630 /// Returns `true` if senders belong to the same channel.
631 ///
632 /// # Examples
633 ///
634 /// ```rust
635 /// use crossbeam_channel::unbounded;
636 ///
637 /// let (s, _) = unbounded::<usize>();
638 ///
639 /// let s2 = s.clone();
640 /// assert!(s.same_channel(&s2));
641 ///
642 /// let (s3, _) = unbounded();
643 /// assert!(!s.same_channel(&s3));
644 /// ```
same_channel(&self, other: &Sender<T>) -> bool645 pub fn same_channel(&self, other: &Sender<T>) -> bool {
646 match (&self.flavor, &other.flavor) {
647 (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
648 (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
649 (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
650 _ => false,
651 }
652 }
653 }
654
655 impl<T> Drop for Sender<T> {
drop(&mut self)656 fn drop(&mut self) {
657 unsafe {
658 match &self.flavor {
659 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
660 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
661 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
662 }
663 }
664 }
665 }
666
667 impl<T> Clone for Sender<T> {
clone(&self) -> Self668 fn clone(&self) -> Self {
669 let flavor = match &self.flavor {
670 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
671 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
672 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
673 };
674
675 Sender { flavor }
676 }
677 }
678
679 impl<T> fmt::Debug for Sender<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result680 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681 f.pad("Sender { .. }")
682 }
683 }
684
685 /// The receiving side of a channel.
686 ///
687 /// # Examples
688 ///
689 /// ```
690 /// use std::thread;
691 /// use std::time::Duration;
692 /// use crossbeam_channel::unbounded;
693 ///
694 /// let (s, r) = unbounded();
695 ///
696 /// thread::spawn(move || {
697 /// let _ = s.send(1);
698 /// thread::sleep(Duration::from_secs(1));
699 /// let _ = s.send(2);
700 /// });
701 ///
702 /// assert_eq!(r.recv(), Ok(1)); // Received immediately.
703 /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
704 /// ```
705 pub struct Receiver<T> {
706 flavor: ReceiverFlavor<T>,
707 }
708
709 /// Receiver flavors.
710 enum ReceiverFlavor<T> {
711 /// Bounded channel based on a preallocated array.
712 Array(counter::Receiver<flavors::array::Channel<T>>),
713
714 /// Unbounded channel implemented as a linked list.
715 List(counter::Receiver<flavors::list::Channel<T>>),
716
717 /// Zero-capacity channel.
718 Zero(counter::Receiver<flavors::zero::Channel<T>>),
719
720 /// The after flavor.
721 At(Arc<flavors::at::Channel>),
722
723 /// The tick flavor.
724 Tick(Arc<flavors::tick::Channel>),
725
726 /// The never flavor.
727 Never(flavors::never::Channel<T>),
728 }
729
730 unsafe impl<T: Send> Send for Receiver<T> {}
731 unsafe impl<T: Send> Sync for Receiver<T> {}
732
733 impl<T> UnwindSafe for Receiver<T> {}
734 impl<T> RefUnwindSafe for Receiver<T> {}
735
736 impl<T> Receiver<T> {
737 /// Attempts to receive a message from the channel without blocking.
738 ///
739 /// This method will either receive a message from the channel immediately or return an error
740 /// if the channel is empty.
741 ///
742 /// If called on a zero-capacity channel, this method will receive a message only if there
743 /// happens to be a send operation on the other side of the channel at the same time.
744 ///
745 /// # Examples
746 ///
747 /// ```
748 /// use crossbeam_channel::{unbounded, TryRecvError};
749 ///
750 /// let (s, r) = unbounded();
751 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
752 ///
753 /// s.send(5).unwrap();
754 /// drop(s);
755 ///
756 /// assert_eq!(r.try_recv(), Ok(5));
757 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
758 /// ```
try_recv(&self) -> Result<T, TryRecvError>759 pub fn try_recv(&self) -> Result<T, TryRecvError> {
760 match &self.flavor {
761 ReceiverFlavor::Array(chan) => chan.try_recv(),
762 ReceiverFlavor::List(chan) => chan.try_recv(),
763 ReceiverFlavor::Zero(chan) => chan.try_recv(),
764 ReceiverFlavor::At(chan) => {
765 let msg = chan.try_recv();
766 unsafe {
767 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
768 &msg,
769 )
770 }
771 }
772 ReceiverFlavor::Tick(chan) => {
773 let msg = chan.try_recv();
774 unsafe {
775 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
776 &msg,
777 )
778 }
779 }
780 ReceiverFlavor::Never(chan) => chan.try_recv(),
781 }
782 }
783
784 /// Blocks the current thread until a message is received or the channel is empty and
785 /// disconnected.
786 ///
787 /// If the channel is empty and not disconnected, this call will block until the receive
788 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
789 /// wake up and return an error.
790 ///
791 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
792 /// on the other side of the channel.
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// use std::thread;
798 /// use std::time::Duration;
799 /// use crossbeam_channel::{unbounded, RecvError};
800 ///
801 /// let (s, r) = unbounded();
802 ///
803 /// thread::spawn(move || {
804 /// thread::sleep(Duration::from_secs(1));
805 /// s.send(5).unwrap();
806 /// drop(s);
807 /// });
808 ///
809 /// assert_eq!(r.recv(), Ok(5));
810 /// assert_eq!(r.recv(), Err(RecvError));
811 /// ```
recv(&self) -> Result<T, RecvError>812 pub fn recv(&self) -> Result<T, RecvError> {
813 match &self.flavor {
814 ReceiverFlavor::Array(chan) => chan.recv(None),
815 ReceiverFlavor::List(chan) => chan.recv(None),
816 ReceiverFlavor::Zero(chan) => chan.recv(None),
817 ReceiverFlavor::At(chan) => {
818 let msg = chan.recv(None);
819 unsafe {
820 mem::transmute_copy::<
821 Result<Instant, RecvTimeoutError>,
822 Result<T, RecvTimeoutError>,
823 >(&msg)
824 }
825 }
826 ReceiverFlavor::Tick(chan) => {
827 let msg = chan.recv(None);
828 unsafe {
829 mem::transmute_copy::<
830 Result<Instant, RecvTimeoutError>,
831 Result<T, RecvTimeoutError>,
832 >(&msg)
833 }
834 }
835 ReceiverFlavor::Never(chan) => chan.recv(None),
836 }
837 .map_err(|_| RecvError)
838 }
839
840 /// Waits for a message to be received from the channel, but only for a limited time.
841 ///
842 /// If the channel is empty and not disconnected, this call will block until the receive
843 /// operation can proceed or the operation times out. If the channel is empty and becomes
844 /// disconnected, this call will wake up and return an error.
845 ///
846 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
847 /// on the other side of the channel.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// use std::thread;
853 /// use std::time::Duration;
854 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
855 ///
856 /// let (s, r) = unbounded();
857 ///
858 /// thread::spawn(move || {
859 /// thread::sleep(Duration::from_secs(1));
860 /// s.send(5).unwrap();
861 /// drop(s);
862 /// });
863 ///
864 /// assert_eq!(
865 /// r.recv_timeout(Duration::from_millis(500)),
866 /// Err(RecvTimeoutError::Timeout),
867 /// );
868 /// assert_eq!(
869 /// r.recv_timeout(Duration::from_secs(1)),
870 /// Ok(5),
871 /// );
872 /// assert_eq!(
873 /// r.recv_timeout(Duration::from_secs(1)),
874 /// Err(RecvTimeoutError::Disconnected),
875 /// );
876 /// ```
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>877 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
878 match Instant::now().checked_add(timeout) {
879 Some(deadline) => self.recv_deadline(deadline),
880 None => self.recv().map_err(RecvTimeoutError::from),
881 }
882 }
883
884 /// Waits for a message to be received from the channel, but only before a given deadline.
885 ///
886 /// If the channel is empty and not disconnected, this call will block until the receive
887 /// operation can proceed or the operation times out. If the channel is empty and becomes
888 /// disconnected, this call will wake up and return an error.
889 ///
890 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
891 /// on the other side of the channel.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// use std::thread;
897 /// use std::time::{Instant, Duration};
898 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
899 ///
900 /// let (s, r) = unbounded();
901 ///
902 /// thread::spawn(move || {
903 /// thread::sleep(Duration::from_secs(1));
904 /// s.send(5).unwrap();
905 /// drop(s);
906 /// });
907 ///
908 /// let now = Instant::now();
909 ///
910 /// assert_eq!(
911 /// r.recv_deadline(now + Duration::from_millis(500)),
912 /// Err(RecvTimeoutError::Timeout),
913 /// );
914 /// assert_eq!(
915 /// r.recv_deadline(now + Duration::from_millis(1500)),
916 /// Ok(5),
917 /// );
918 /// assert_eq!(
919 /// r.recv_deadline(now + Duration::from_secs(5)),
920 /// Err(RecvTimeoutError::Disconnected),
921 /// );
922 /// ```
recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError>923 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
924 match &self.flavor {
925 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
926 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
927 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
928 ReceiverFlavor::At(chan) => {
929 let msg = chan.recv(Some(deadline));
930 unsafe {
931 mem::transmute_copy::<
932 Result<Instant, RecvTimeoutError>,
933 Result<T, RecvTimeoutError>,
934 >(&msg)
935 }
936 }
937 ReceiverFlavor::Tick(chan) => {
938 let msg = chan.recv(Some(deadline));
939 unsafe {
940 mem::transmute_copy::<
941 Result<Instant, RecvTimeoutError>,
942 Result<T, RecvTimeoutError>,
943 >(&msg)
944 }
945 }
946 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
947 }
948 }
949
950 /// Returns `true` if the channel is empty.
951 ///
952 /// Note: Zero-capacity channels are always empty.
953 ///
954 /// # Examples
955 ///
956 /// ```
957 /// use crossbeam_channel::unbounded;
958 ///
959 /// let (s, r) = unbounded();
960 ///
961 /// assert!(r.is_empty());
962 /// s.send(0).unwrap();
963 /// assert!(!r.is_empty());
964 /// ```
is_empty(&self) -> bool965 pub fn is_empty(&self) -> bool {
966 match &self.flavor {
967 ReceiverFlavor::Array(chan) => chan.is_empty(),
968 ReceiverFlavor::List(chan) => chan.is_empty(),
969 ReceiverFlavor::Zero(chan) => chan.is_empty(),
970 ReceiverFlavor::At(chan) => chan.is_empty(),
971 ReceiverFlavor::Tick(chan) => chan.is_empty(),
972 ReceiverFlavor::Never(chan) => chan.is_empty(),
973 }
974 }
975
976 /// Returns `true` if the channel is full.
977 ///
978 /// Note: Zero-capacity channels are always full.
979 ///
980 /// # Examples
981 ///
982 /// ```
983 /// use crossbeam_channel::bounded;
984 ///
985 /// let (s, r) = bounded(1);
986 ///
987 /// assert!(!r.is_full());
988 /// s.send(0).unwrap();
989 /// assert!(r.is_full());
990 /// ```
is_full(&self) -> bool991 pub fn is_full(&self) -> bool {
992 match &self.flavor {
993 ReceiverFlavor::Array(chan) => chan.is_full(),
994 ReceiverFlavor::List(chan) => chan.is_full(),
995 ReceiverFlavor::Zero(chan) => chan.is_full(),
996 ReceiverFlavor::At(chan) => chan.is_full(),
997 ReceiverFlavor::Tick(chan) => chan.is_full(),
998 ReceiverFlavor::Never(chan) => chan.is_full(),
999 }
1000 }
1001
1002 /// Returns the number of messages in the channel.
1003 ///
1004 /// # Examples
1005 ///
1006 /// ```
1007 /// use crossbeam_channel::unbounded;
1008 ///
1009 /// let (s, r) = unbounded();
1010 /// assert_eq!(r.len(), 0);
1011 ///
1012 /// s.send(1).unwrap();
1013 /// s.send(2).unwrap();
1014 /// assert_eq!(r.len(), 2);
1015 /// ```
len(&self) -> usize1016 pub fn len(&self) -> usize {
1017 match &self.flavor {
1018 ReceiverFlavor::Array(chan) => chan.len(),
1019 ReceiverFlavor::List(chan) => chan.len(),
1020 ReceiverFlavor::Zero(chan) => chan.len(),
1021 ReceiverFlavor::At(chan) => chan.len(),
1022 ReceiverFlavor::Tick(chan) => chan.len(),
1023 ReceiverFlavor::Never(chan) => chan.len(),
1024 }
1025 }
1026
1027 /// If the channel is bounded, returns its capacity.
1028 ///
1029 /// # Examples
1030 ///
1031 /// ```
1032 /// use crossbeam_channel::{bounded, unbounded};
1033 ///
1034 /// let (_, r) = unbounded::<i32>();
1035 /// assert_eq!(r.capacity(), None);
1036 ///
1037 /// let (_, r) = bounded::<i32>(5);
1038 /// assert_eq!(r.capacity(), Some(5));
1039 ///
1040 /// let (_, r) = bounded::<i32>(0);
1041 /// assert_eq!(r.capacity(), Some(0));
1042 /// ```
capacity(&self) -> Option<usize>1043 pub fn capacity(&self) -> Option<usize> {
1044 match &self.flavor {
1045 ReceiverFlavor::Array(chan) => chan.capacity(),
1046 ReceiverFlavor::List(chan) => chan.capacity(),
1047 ReceiverFlavor::Zero(chan) => chan.capacity(),
1048 ReceiverFlavor::At(chan) => chan.capacity(),
1049 ReceiverFlavor::Tick(chan) => chan.capacity(),
1050 ReceiverFlavor::Never(chan) => chan.capacity(),
1051 }
1052 }
1053
1054 /// A blocking iterator over messages in the channel.
1055 ///
1056 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1057 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1058 ///
1059 /// [`next`]: Iterator::next
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use std::thread;
1065 /// use crossbeam_channel::unbounded;
1066 ///
1067 /// let (s, r) = unbounded();
1068 ///
1069 /// thread::spawn(move || {
1070 /// s.send(1).unwrap();
1071 /// s.send(2).unwrap();
1072 /// s.send(3).unwrap();
1073 /// drop(s); // Disconnect the channel.
1074 /// });
1075 ///
1076 /// // Collect all messages from the channel.
1077 /// // Note that the call to `collect` blocks until the sender is dropped.
1078 /// let v: Vec<_> = r.iter().collect();
1079 ///
1080 /// assert_eq!(v, [1, 2, 3]);
1081 /// ```
iter(&self) -> Iter<'_, T>1082 pub fn iter(&self) -> Iter<'_, T> {
1083 Iter { receiver: self }
1084 }
1085
1086 /// A non-blocking iterator over messages in the channel.
1087 ///
1088 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1089 /// never blocks waiting for the next message.
1090 ///
1091 /// [`next`]: Iterator::next
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use std::thread;
1097 /// use std::time::Duration;
1098 /// use crossbeam_channel::unbounded;
1099 ///
1100 /// let (s, r) = unbounded::<i32>();
1101 ///
1102 /// thread::spawn(move || {
1103 /// s.send(1).unwrap();
1104 /// thread::sleep(Duration::from_secs(1));
1105 /// s.send(2).unwrap();
1106 /// thread::sleep(Duration::from_secs(2));
1107 /// s.send(3).unwrap();
1108 /// });
1109 ///
1110 /// thread::sleep(Duration::from_secs(2));
1111 ///
1112 /// // Collect all messages from the channel without blocking.
1113 /// // The third message hasn't been sent yet so we'll collect only the first two.
1114 /// let v: Vec<_> = r.try_iter().collect();
1115 ///
1116 /// assert_eq!(v, [1, 2]);
1117 /// ```
try_iter(&self) -> TryIter<'_, T>1118 pub fn try_iter(&self) -> TryIter<'_, T> {
1119 TryIter { receiver: self }
1120 }
1121
1122 /// Returns `true` if receivers belong to the same channel.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```rust
1127 /// use crossbeam_channel::unbounded;
1128 ///
1129 /// let (_, r) = unbounded::<usize>();
1130 ///
1131 /// let r2 = r.clone();
1132 /// assert!(r.same_channel(&r2));
1133 ///
1134 /// let (_, r3) = unbounded();
1135 /// assert!(!r.same_channel(&r3));
1136 /// ```
same_channel(&self, other: &Receiver<T>) -> bool1137 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1138 match (&self.flavor, &other.flavor) {
1139 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1140 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1141 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1142 (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1143 (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1144 (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1145 _ => false,
1146 }
1147 }
1148 }
1149
1150 impl<T> Drop for Receiver<T> {
drop(&mut self)1151 fn drop(&mut self) {
1152 unsafe {
1153 match &self.flavor {
1154 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1155 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1156 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1157 ReceiverFlavor::At(_) => {}
1158 ReceiverFlavor::Tick(_) => {}
1159 ReceiverFlavor::Never(_) => {}
1160 }
1161 }
1162 }
1163 }
1164
1165 impl<T> Clone for Receiver<T> {
clone(&self) -> Self1166 fn clone(&self) -> Self {
1167 let flavor = match &self.flavor {
1168 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1169 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1170 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1171 ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1172 ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1173 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1174 };
1175
1176 Receiver { flavor }
1177 }
1178 }
1179
1180 impl<T> fmt::Debug for Receiver<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182 f.pad("Receiver { .. }")
1183 }
1184 }
1185
1186 impl<'a, T> IntoIterator for &'a Receiver<T> {
1187 type Item = T;
1188 type IntoIter = Iter<'a, T>;
1189
into_iter(self) -> Self::IntoIter1190 fn into_iter(self) -> Self::IntoIter {
1191 self.iter()
1192 }
1193 }
1194
1195 impl<T> IntoIterator for Receiver<T> {
1196 type Item = T;
1197 type IntoIter = IntoIter<T>;
1198
into_iter(self) -> Self::IntoIter1199 fn into_iter(self) -> Self::IntoIter {
1200 IntoIter { receiver: self }
1201 }
1202 }
1203
1204 /// A blocking iterator over messages in a channel.
1205 ///
1206 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1207 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1208 ///
1209 /// [`next`]: Iterator::next
1210 ///
1211 /// # Examples
1212 ///
1213 /// ```
1214 /// use std::thread;
1215 /// use crossbeam_channel::unbounded;
1216 ///
1217 /// let (s, r) = unbounded();
1218 ///
1219 /// thread::spawn(move || {
1220 /// s.send(1).unwrap();
1221 /// s.send(2).unwrap();
1222 /// s.send(3).unwrap();
1223 /// drop(s); // Disconnect the channel.
1224 /// });
1225 ///
1226 /// // Collect all messages from the channel.
1227 /// // Note that the call to `collect` blocks until the sender is dropped.
1228 /// let v: Vec<_> = r.iter().collect();
1229 ///
1230 /// assert_eq!(v, [1, 2, 3]);
1231 /// ```
1232 pub struct Iter<'a, T> {
1233 receiver: &'a Receiver<T>,
1234 }
1235
1236 impl<T> FusedIterator for Iter<'_, T> {}
1237
1238 impl<T> Iterator for Iter<'_, T> {
1239 type Item = T;
1240
next(&mut self) -> Option<Self::Item>1241 fn next(&mut self) -> Option<Self::Item> {
1242 self.receiver.recv().ok()
1243 }
1244 }
1245
1246 impl<T> fmt::Debug for Iter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248 f.pad("Iter { .. }")
1249 }
1250 }
1251
1252 /// A non-blocking iterator over messages in a channel.
1253 ///
1254 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1255 /// never blocks waiting for the next message.
1256 ///
1257 /// [`next`]: Iterator::next
1258 ///
1259 /// # Examples
1260 ///
1261 /// ```
1262 /// use std::thread;
1263 /// use std::time::Duration;
1264 /// use crossbeam_channel::unbounded;
1265 ///
1266 /// let (s, r) = unbounded::<i32>();
1267 ///
1268 /// thread::spawn(move || {
1269 /// s.send(1).unwrap();
1270 /// thread::sleep(Duration::from_secs(1));
1271 /// s.send(2).unwrap();
1272 /// thread::sleep(Duration::from_secs(2));
1273 /// s.send(3).unwrap();
1274 /// });
1275 ///
1276 /// thread::sleep(Duration::from_secs(2));
1277 ///
1278 /// // Collect all messages from the channel without blocking.
1279 /// // The third message hasn't been sent yet so we'll collect only the first two.
1280 /// let v: Vec<_> = r.try_iter().collect();
1281 ///
1282 /// assert_eq!(v, [1, 2]);
1283 /// ```
1284 pub struct TryIter<'a, T> {
1285 receiver: &'a Receiver<T>,
1286 }
1287
1288 impl<T> Iterator for TryIter<'_, T> {
1289 type Item = T;
1290
next(&mut self) -> Option<Self::Item>1291 fn next(&mut self) -> Option<Self::Item> {
1292 self.receiver.try_recv().ok()
1293 }
1294 }
1295
1296 impl<T> fmt::Debug for TryIter<'_, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1298 f.pad("TryIter { .. }")
1299 }
1300 }
1301
1302 /// A blocking iterator over messages in a channel.
1303 ///
1304 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1305 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1306 ///
1307 /// [`next`]: Iterator::next
1308 ///
1309 /// # Examples
1310 ///
1311 /// ```
1312 /// use std::thread;
1313 /// use crossbeam_channel::unbounded;
1314 ///
1315 /// let (s, r) = unbounded();
1316 ///
1317 /// thread::spawn(move || {
1318 /// s.send(1).unwrap();
1319 /// s.send(2).unwrap();
1320 /// s.send(3).unwrap();
1321 /// drop(s); // Disconnect the channel.
1322 /// });
1323 ///
1324 /// // Collect all messages from the channel.
1325 /// // Note that the call to `collect` blocks until the sender is dropped.
1326 /// let v: Vec<_> = r.into_iter().collect();
1327 ///
1328 /// assert_eq!(v, [1, 2, 3]);
1329 /// ```
1330 pub struct IntoIter<T> {
1331 receiver: Receiver<T>,
1332 }
1333
1334 impl<T> FusedIterator for IntoIter<T> {}
1335
1336 impl<T> Iterator for IntoIter<T> {
1337 type Item = T;
1338
next(&mut self) -> Option<Self::Item>1339 fn next(&mut self) -> Option<Self::Item> {
1340 self.receiver.recv().ok()
1341 }
1342 }
1343
1344 impl<T> fmt::Debug for IntoIter<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346 f.pad("IntoIter { .. }")
1347 }
1348 }
1349
1350 impl<T> SelectHandle for Sender<T> {
try_select(&self, token: &mut Token) -> bool1351 fn try_select(&self, token: &mut Token) -> bool {
1352 match &self.flavor {
1353 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1354 SenderFlavor::List(chan) => chan.sender().try_select(token),
1355 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1356 }
1357 }
1358
deadline(&self) -> Option<Instant>1359 fn deadline(&self) -> Option<Instant> {
1360 None
1361 }
1362
register(&self, oper: Operation, cx: &Context) -> bool1363 fn register(&self, oper: Operation, cx: &Context) -> bool {
1364 match &self.flavor {
1365 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1366 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1367 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1368 }
1369 }
1370
unregister(&self, oper: Operation)1371 fn unregister(&self, oper: Operation) {
1372 match &self.flavor {
1373 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1374 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1375 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1376 }
1377 }
1378
accept(&self, token: &mut Token, cx: &Context) -> bool1379 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1380 match &self.flavor {
1381 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1382 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1383 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1384 }
1385 }
1386
is_ready(&self) -> bool1387 fn is_ready(&self) -> bool {
1388 match &self.flavor {
1389 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1390 SenderFlavor::List(chan) => chan.sender().is_ready(),
1391 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1392 }
1393 }
1394
watch(&self, oper: Operation, cx: &Context) -> bool1395 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1396 match &self.flavor {
1397 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1398 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1399 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1400 }
1401 }
1402
unwatch(&self, oper: Operation)1403 fn unwatch(&self, oper: Operation) {
1404 match &self.flavor {
1405 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1406 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1407 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1408 }
1409 }
1410 }
1411
1412 impl<T> SelectHandle for Receiver<T> {
try_select(&self, token: &mut Token) -> bool1413 fn try_select(&self, token: &mut Token) -> bool {
1414 match &self.flavor {
1415 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1416 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1417 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1418 ReceiverFlavor::At(chan) => chan.try_select(token),
1419 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1420 ReceiverFlavor::Never(chan) => chan.try_select(token),
1421 }
1422 }
1423
deadline(&self) -> Option<Instant>1424 fn deadline(&self) -> Option<Instant> {
1425 match &self.flavor {
1426 ReceiverFlavor::Array(_) => None,
1427 ReceiverFlavor::List(_) => None,
1428 ReceiverFlavor::Zero(_) => None,
1429 ReceiverFlavor::At(chan) => chan.deadline(),
1430 ReceiverFlavor::Tick(chan) => chan.deadline(),
1431 ReceiverFlavor::Never(chan) => chan.deadline(),
1432 }
1433 }
1434
register(&self, oper: Operation, cx: &Context) -> bool1435 fn register(&self, oper: Operation, cx: &Context) -> bool {
1436 match &self.flavor {
1437 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1438 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1439 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1440 ReceiverFlavor::At(chan) => chan.register(oper, cx),
1441 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1442 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1443 }
1444 }
1445
unregister(&self, oper: Operation)1446 fn unregister(&self, oper: Operation) {
1447 match &self.flavor {
1448 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1449 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1450 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1451 ReceiverFlavor::At(chan) => chan.unregister(oper),
1452 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1453 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1454 }
1455 }
1456
accept(&self, token: &mut Token, cx: &Context) -> bool1457 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1458 match &self.flavor {
1459 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1460 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1461 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1462 ReceiverFlavor::At(chan) => chan.accept(token, cx),
1463 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1464 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1465 }
1466 }
1467
is_ready(&self) -> bool1468 fn is_ready(&self) -> bool {
1469 match &self.flavor {
1470 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1471 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1472 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1473 ReceiverFlavor::At(chan) => chan.is_ready(),
1474 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1475 ReceiverFlavor::Never(chan) => chan.is_ready(),
1476 }
1477 }
1478
watch(&self, oper: Operation, cx: &Context) -> bool1479 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1480 match &self.flavor {
1481 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1482 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1483 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1484 ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1485 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1486 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1487 }
1488 }
1489
unwatch(&self, oper: Operation)1490 fn unwatch(&self, oper: Operation) {
1491 match &self.flavor {
1492 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1493 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1494 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1495 ReceiverFlavor::At(chan) => chan.unwatch(oper),
1496 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1497 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1498 }
1499 }
1500 }
1501
1502 /// Writes a message into the channel.
write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T>1503 pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1504 match &s.flavor {
1505 SenderFlavor::Array(chan) => chan.write(token, msg),
1506 SenderFlavor::List(chan) => chan.write(token, msg),
1507 SenderFlavor::Zero(chan) => chan.write(token, msg),
1508 }
1509 }
1510
1511 /// Reads a message from the channel.
read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()>1512 pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1513 match &r.flavor {
1514 ReceiverFlavor::Array(chan) => chan.read(token),
1515 ReceiverFlavor::List(chan) => chan.read(token),
1516 ReceiverFlavor::Zero(chan) => chan.read(token),
1517 ReceiverFlavor::At(chan) => {
1518 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1519 }
1520 ReceiverFlavor::Tick(chan) => {
1521 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1522 }
1523 ReceiverFlavor::Never(chan) => chan.read(token),
1524 }
1525 }
1526