1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
12 //! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13 //!
14 //! Source:
15 //!   - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //!   - Copyright 2013-2014 The Rust Project Developers
19 //!   - Apache License, Version 2.0 or MIT license, at your option
20 //!   - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //!   - https://www.rust-lang.org/en-US/legal.html
22 
23 #![allow(clippy::match_single_binding, clippy::redundant_clone)]
24 
25 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
26 use std::sync::mpsc::{SendError, TrySendError};
27 use std::thread::JoinHandle;
28 use std::time::Duration;
29 
30 use crossbeam_channel as cc;
31 
32 pub struct Sender<T> {
33     pub inner: cc::Sender<T>,
34 }
35 
36 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>37     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
38         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
39     }
40 }
41 
42 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>43     fn clone(&self) -> Sender<T> {
44         Sender {
45             inner: self.inner.clone(),
46         }
47     }
48 }
49 
50 pub struct SyncSender<T> {
51     pub inner: cc::Sender<T>,
52 }
53 
54 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>55     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
56         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
57     }
58 
try_send(&self, t: T) -> Result<(), TrySendError<T>>59     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
60         self.inner.try_send(t).map_err(|err| match err {
61             cc::TrySendError::Full(m) => TrySendError::Full(m),
62             cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
63         })
64     }
65 }
66 
67 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>68     fn clone(&self) -> SyncSender<T> {
69         SyncSender {
70             inner: self.inner.clone(),
71         }
72     }
73 }
74 
75 pub struct Receiver<T> {
76     pub inner: cc::Receiver<T>,
77 }
78 
79 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>80     pub fn try_recv(&self) -> Result<T, TryRecvError> {
81         self.inner.try_recv().map_err(|err| match err {
82             cc::TryRecvError::Empty => TryRecvError::Empty,
83             cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
84         })
85     }
86 
recv(&self) -> Result<T, RecvError>87     pub fn recv(&self) -> Result<T, RecvError> {
88         self.inner.recv().map_err(|_| RecvError)
89     }
90 
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>91     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
92         self.inner.recv_timeout(timeout).map_err(|err| match err {
93             cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
94             cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
95         })
96     }
97 
iter(&self) -> Iter<T>98     pub fn iter(&self) -> Iter<T> {
99         Iter { inner: self }
100     }
101 
try_iter(&self) -> TryIter<T>102     pub fn try_iter(&self) -> TryIter<T> {
103         TryIter { inner: self }
104     }
105 }
106 
107 impl<'a, T> IntoIterator for &'a Receiver<T> {
108     type Item = T;
109     type IntoIter = Iter<'a, T>;
110 
into_iter(self) -> Iter<'a, T>111     fn into_iter(self) -> Iter<'a, T> {
112         self.iter()
113     }
114 }
115 
116 impl<T> IntoIterator for Receiver<T> {
117     type Item = T;
118     type IntoIter = IntoIter<T>;
119 
into_iter(self) -> IntoIter<T>120     fn into_iter(self) -> IntoIter<T> {
121         IntoIter { inner: self }
122     }
123 }
124 
125 pub struct TryIter<'a, T: 'a> {
126     inner: &'a Receiver<T>,
127 }
128 
129 impl<'a, T> Iterator for TryIter<'a, T> {
130     type Item = T;
131 
next(&mut self) -> Option<T>132     fn next(&mut self) -> Option<T> {
133         self.inner.try_recv().ok()
134     }
135 }
136 
137 pub struct Iter<'a, T: 'a> {
138     inner: &'a Receiver<T>,
139 }
140 
141 impl<'a, T> Iterator for Iter<'a, T> {
142     type Item = T;
143 
next(&mut self) -> Option<T>144     fn next(&mut self) -> Option<T> {
145         self.inner.recv().ok()
146     }
147 }
148 
149 pub struct IntoIter<T> {
150     inner: Receiver<T>,
151 }
152 
153 impl<T> Iterator for IntoIter<T> {
154     type Item = T;
155 
next(&mut self) -> Option<T>156     fn next(&mut self) -> Option<T> {
157         self.inner.recv().ok()
158     }
159 }
160 
channel<T>() -> (Sender<T>, Receiver<T>)161 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
162     let (s, r) = cc::unbounded();
163     let s = Sender { inner: s };
164     let r = Receiver { inner: r };
165     (s, r)
166 }
167 
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)168 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
169     let (s, r) = cc::bounded(bound);
170     let s = SyncSender { inner: s };
171     let r = Receiver { inner: r };
172     (s, r)
173 }
174 
// Test-local `select!` with the shape `pat = rx.method() => expr, ...`.
//
// Every arm is forwarded to `crossbeam_channel_internal!` as an operation on the
// wrapper's `inner` receiver. The operation's result has its error replaced by
// `std::sync::mpsc::RecvError` before being bound to the arm's pattern.
macro_rules! select {
    (
        $($binding:pat = $receiver:ident.$op:ident() => $body:expr),+
    ) => ({
        // Constant defined ahead of the `crossbeam_channel_internal!` invocation
        // below; this wrapper always sets it to `false`.
        const _IS_BIASED: bool = false;

        cc::crossbeam_channel_internal! {
            $(
                $op(($receiver).inner) -> outcome => {
                    let $binding = outcome.map_err(|_| ::std::sync::mpsc::RecvError);
                    $body
                }
            )+
        }
    })
}
191 
192 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
193 mod channel_tests {
194     use super::*;
195 
196     use std::env;
197     use std::thread;
198     use std::time::Instant;
199 
stress_factor() -> usize200     pub fn stress_factor() -> usize {
201         match env::var("RUST_TEST_STRESS") {
202             Ok(val) => val.parse().unwrap(),
203             Err(..) => 1,
204         }
205     }
206 
207     #[test]
smoke()208     fn smoke() {
209         let (tx, rx) = channel::<i32>();
210         tx.send(1).unwrap();
211         assert_eq!(rx.recv().unwrap(), 1);
212     }
213 
214     #[test]
drop_full()215     fn drop_full() {
216         let (tx, _rx) = channel::<Box<isize>>();
217         tx.send(Box::new(1)).unwrap();
218     }
219 
220     #[test]
drop_full_shared()221     fn drop_full_shared() {
222         let (tx, _rx) = channel::<Box<isize>>();
223         drop(tx.clone());
224         drop(tx.clone());
225         tx.send(Box::new(1)).unwrap();
226     }
227 
228     #[test]
smoke_shared()229     fn smoke_shared() {
230         let (tx, rx) = channel::<i32>();
231         tx.send(1).unwrap();
232         assert_eq!(rx.recv().unwrap(), 1);
233         let tx = tx.clone();
234         tx.send(1).unwrap();
235         assert_eq!(rx.recv().unwrap(), 1);
236     }
237 
238     #[test]
smoke_threads()239     fn smoke_threads() {
240         let (tx, rx) = channel::<i32>();
241         let t = thread::spawn(move || {
242             tx.send(1).unwrap();
243         });
244         assert_eq!(rx.recv().unwrap(), 1);
245         t.join().unwrap();
246     }
247 
248     #[test]
smoke_port_gone()249     fn smoke_port_gone() {
250         let (tx, rx) = channel::<i32>();
251         drop(rx);
252         assert!(tx.send(1).is_err());
253     }
254 
255     #[test]
smoke_shared_port_gone()256     fn smoke_shared_port_gone() {
257         let (tx, rx) = channel::<i32>();
258         drop(rx);
259         assert!(tx.send(1).is_err())
260     }
261 
262     #[test]
smoke_shared_port_gone2()263     fn smoke_shared_port_gone2() {
264         let (tx, rx) = channel::<i32>();
265         drop(rx);
266         let tx2 = tx.clone();
267         drop(tx);
268         assert!(tx2.send(1).is_err());
269     }
270 
271     #[test]
port_gone_concurrent()272     fn port_gone_concurrent() {
273         let (tx, rx) = channel::<i32>();
274         let t = thread::spawn(move || {
275             rx.recv().unwrap();
276         });
277         while tx.send(1).is_ok() {}
278         t.join().unwrap();
279     }
280 
281     #[test]
port_gone_concurrent_shared()282     fn port_gone_concurrent_shared() {
283         let (tx, rx) = channel::<i32>();
284         let tx2 = tx.clone();
285         let t = thread::spawn(move || {
286             rx.recv().unwrap();
287         });
288         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
289         t.join().unwrap();
290     }
291 
292     #[test]
smoke_chan_gone()293     fn smoke_chan_gone() {
294         let (tx, rx) = channel::<i32>();
295         drop(tx);
296         assert!(rx.recv().is_err());
297     }
298 
299     #[test]
smoke_chan_gone_shared()300     fn smoke_chan_gone_shared() {
301         let (tx, rx) = channel::<()>();
302         let tx2 = tx.clone();
303         drop(tx);
304         drop(tx2);
305         assert!(rx.recv().is_err());
306     }
307 
308     #[test]
chan_gone_concurrent()309     fn chan_gone_concurrent() {
310         let (tx, rx) = channel::<i32>();
311         let t = thread::spawn(move || {
312             tx.send(1).unwrap();
313             tx.send(1).unwrap();
314         });
315         while rx.recv().is_ok() {}
316         t.join().unwrap();
317     }
318 
319     #[test]
stress()320     fn stress() {
321         #[cfg(miri)]
322         const COUNT: usize = 100;
323         #[cfg(not(miri))]
324         const COUNT: usize = 10000;
325 
326         let (tx, rx) = channel::<i32>();
327         let t = thread::spawn(move || {
328             for _ in 0..COUNT {
329                 tx.send(1).unwrap();
330             }
331         });
332         for _ in 0..COUNT {
333             assert_eq!(rx.recv().unwrap(), 1);
334         }
335         t.join().ok().unwrap();
336     }
337 
338     #[test]
stress_shared()339     fn stress_shared() {
340         let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
341         let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
342         let (tx, rx) = channel::<i32>();
343 
344         let t = thread::spawn(move || {
345             for _ in 0..amt * nthreads {
346                 assert_eq!(rx.recv().unwrap(), 1);
347             }
348             assert!(rx.try_recv().is_err());
349         });
350 
351         let mut ts = Vec::with_capacity(nthreads as usize);
352         for _ in 0..nthreads {
353             let tx = tx.clone();
354             let t = thread::spawn(move || {
355                 for _ in 0..amt {
356                     tx.send(1).unwrap();
357                 }
358             });
359             ts.push(t);
360         }
361         drop(tx);
362         t.join().ok().unwrap();
363         for t in ts {
364             t.join().unwrap();
365         }
366     }
367 
368     #[test]
send_from_outside_runtime()369     fn send_from_outside_runtime() {
370         let (tx1, rx1) = channel::<()>();
371         let (tx2, rx2) = channel::<i32>();
372         let t1 = thread::spawn(move || {
373             tx1.send(()).unwrap();
374             for _ in 0..40 {
375                 assert_eq!(rx2.recv().unwrap(), 1);
376             }
377         });
378         rx1.recv().unwrap();
379         let t2 = thread::spawn(move || {
380             for _ in 0..40 {
381                 tx2.send(1).unwrap();
382             }
383         });
384         t1.join().ok().unwrap();
385         t2.join().ok().unwrap();
386     }
387 
388     #[test]
recv_from_outside_runtime()389     fn recv_from_outside_runtime() {
390         let (tx, rx) = channel::<i32>();
391         let t = thread::spawn(move || {
392             for _ in 0..40 {
393                 assert_eq!(rx.recv().unwrap(), 1);
394             }
395         });
396         for _ in 0..40 {
397             tx.send(1).unwrap();
398         }
399         t.join().ok().unwrap();
400     }
401 
402     #[test]
no_runtime()403     fn no_runtime() {
404         let (tx1, rx1) = channel::<i32>();
405         let (tx2, rx2) = channel::<i32>();
406         let t1 = thread::spawn(move || {
407             assert_eq!(rx1.recv().unwrap(), 1);
408             tx2.send(2).unwrap();
409         });
410         let t2 = thread::spawn(move || {
411             tx1.send(1).unwrap();
412             assert_eq!(rx2.recv().unwrap(), 2);
413         });
414         t1.join().ok().unwrap();
415         t2.join().ok().unwrap();
416     }
417 
418     #[test]
oneshot_single_thread_close_port_first()419     fn oneshot_single_thread_close_port_first() {
420         // Simple test of closing without sending
421         let (_tx, rx) = channel::<i32>();
422         drop(rx);
423     }
424 
425     #[test]
oneshot_single_thread_close_chan_first()426     fn oneshot_single_thread_close_chan_first() {
427         // Simple test of closing without sending
428         let (tx, _rx) = channel::<i32>();
429         drop(tx);
430     }
431 
432     #[test]
oneshot_single_thread_send_port_close()433     fn oneshot_single_thread_send_port_close() {
434         // Testing that the sender cleans up the payload if receiver is closed
435         let (tx, rx) = channel::<Box<i32>>();
436         drop(rx);
437         assert!(tx.send(Box::new(0)).is_err());
438     }
439 
440     #[test]
oneshot_single_thread_recv_chan_close()441     fn oneshot_single_thread_recv_chan_close() {
442         let (tx, rx) = channel::<i32>();
443         drop(tx);
444         assert_eq!(rx.recv(), Err(RecvError));
445     }
446 
447     #[test]
oneshot_single_thread_send_then_recv()448     fn oneshot_single_thread_send_then_recv() {
449         let (tx, rx) = channel::<Box<i32>>();
450         tx.send(Box::new(10)).unwrap();
451         assert!(*rx.recv().unwrap() == 10);
452     }
453 
454     #[test]
oneshot_single_thread_try_send_open()455     fn oneshot_single_thread_try_send_open() {
456         let (tx, rx) = channel::<i32>();
457         assert!(tx.send(10).is_ok());
458         assert!(rx.recv().unwrap() == 10);
459     }
460 
461     #[test]
oneshot_single_thread_try_send_closed()462     fn oneshot_single_thread_try_send_closed() {
463         let (tx, rx) = channel::<i32>();
464         drop(rx);
465         assert!(tx.send(10).is_err());
466     }
467 
468     #[test]
oneshot_single_thread_try_recv_open()469     fn oneshot_single_thread_try_recv_open() {
470         let (tx, rx) = channel::<i32>();
471         tx.send(10).unwrap();
472         assert!(rx.recv() == Ok(10));
473     }
474 
475     #[test]
oneshot_single_thread_try_recv_closed()476     fn oneshot_single_thread_try_recv_closed() {
477         let (tx, rx) = channel::<i32>();
478         drop(tx);
479         assert!(rx.recv().is_err());
480     }
481 
482     #[test]
oneshot_single_thread_peek_data()483     fn oneshot_single_thread_peek_data() {
484         let (tx, rx) = channel::<i32>();
485         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
486         tx.send(10).unwrap();
487         assert_eq!(rx.try_recv(), Ok(10));
488     }
489 
490     #[test]
oneshot_single_thread_peek_close()491     fn oneshot_single_thread_peek_close() {
492         let (tx, rx) = channel::<i32>();
493         drop(tx);
494         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
495         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
496     }
497 
498     #[test]
oneshot_single_thread_peek_open()499     fn oneshot_single_thread_peek_open() {
500         let (_tx, rx) = channel::<i32>();
501         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
502     }
503 
504     #[test]
oneshot_multi_task_recv_then_send()505     fn oneshot_multi_task_recv_then_send() {
506         let (tx, rx) = channel::<Box<i32>>();
507         let t = thread::spawn(move || {
508             assert!(*rx.recv().unwrap() == 10);
509         });
510 
511         tx.send(Box::new(10)).unwrap();
512         t.join().unwrap();
513     }
514 
515     #[test]
oneshot_multi_task_recv_then_close()516     fn oneshot_multi_task_recv_then_close() {
517         let (tx, rx) = channel::<Box<i32>>();
518         let t = thread::spawn(move || {
519             drop(tx);
520         });
521         thread::spawn(move || {
522             assert_eq!(rx.recv(), Err(RecvError));
523         })
524         .join()
525         .unwrap();
526         t.join().unwrap();
527     }
528 
529     #[test]
oneshot_multi_thread_close_stress()530     fn oneshot_multi_thread_close_stress() {
531         let stress_factor = stress_factor();
532         let mut ts = Vec::with_capacity(stress_factor);
533         for _ in 0..stress_factor {
534             let (tx, rx) = channel::<i32>();
535             let t = thread::spawn(move || {
536                 drop(rx);
537             });
538             ts.push(t);
539             drop(tx);
540         }
541         for t in ts {
542             t.join().unwrap();
543         }
544     }
545 
546     #[test]
oneshot_multi_thread_send_close_stress()547     fn oneshot_multi_thread_send_close_stress() {
548         let stress_factor = stress_factor();
549         let mut ts = Vec::with_capacity(2 * stress_factor);
550         for _ in 0..stress_factor {
551             let (tx, rx) = channel::<i32>();
552             let t = thread::spawn(move || {
553                 drop(rx);
554             });
555             ts.push(t);
556             thread::spawn(move || {
557                 let _ = tx.send(1);
558             })
559             .join()
560             .unwrap();
561         }
562         for t in ts {
563             t.join().unwrap();
564         }
565     }
566 
567     #[test]
oneshot_multi_thread_recv_close_stress()568     fn oneshot_multi_thread_recv_close_stress() {
569         let stress_factor = stress_factor();
570         let mut ts = Vec::with_capacity(2 * stress_factor);
571         for _ in 0..stress_factor {
572             let (tx, rx) = channel::<i32>();
573             let t = thread::spawn(move || {
574                 thread::spawn(move || {
575                     assert_eq!(rx.recv(), Err(RecvError));
576                 })
577                 .join()
578                 .unwrap();
579             });
580             ts.push(t);
581             let t2 = thread::spawn(move || {
582                 let t = thread::spawn(move || {
583                     drop(tx);
584                 });
585                 t.join().unwrap();
586             });
587             ts.push(t2);
588         }
589         for t in ts {
590             t.join().unwrap();
591         }
592     }
593 
594     #[test]
oneshot_multi_thread_send_recv_stress()595     fn oneshot_multi_thread_send_recv_stress() {
596         let stress_factor = stress_factor();
597         let mut ts = Vec::with_capacity(stress_factor);
598         for _ in 0..stress_factor {
599             let (tx, rx) = channel::<Box<isize>>();
600             let t = thread::spawn(move || {
601                 tx.send(Box::new(10)).unwrap();
602             });
603             ts.push(t);
604             assert!(*rx.recv().unwrap() == 10);
605         }
606         for t in ts {
607             t.join().unwrap();
608         }
609     }
610 
611     #[test]
stream_send_recv_stress()612     fn stream_send_recv_stress() {
613         let stress_factor = stress_factor();
614         let mut ts = Vec::with_capacity(2 * stress_factor);
615         for _ in 0..stress_factor {
616             let (tx, rx) = channel();
617 
618             if let Some(t) = send(tx, 0) {
619                 ts.push(t);
620             }
621             if let Some(t2) = recv(rx, 0) {
622                 ts.push(t2);
623             }
624 
625             fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
626                 if i == 10 {
627                     return None;
628                 }
629 
630                 Some(thread::spawn(move || {
631                     tx.send(Box::new(i)).unwrap();
632                     send(tx, i + 1);
633                 }))
634             }
635 
636             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
637                 if i == 10 {
638                     return None;
639                 }
640 
641                 Some(thread::spawn(move || {
642                     assert!(*rx.recv().unwrap() == i);
643                     recv(rx, i + 1);
644                 }))
645             }
646         }
647         for t in ts {
648             t.join().unwrap();
649         }
650     }
651 
652     #[test]
oneshot_single_thread_recv_timeout()653     fn oneshot_single_thread_recv_timeout() {
654         let (tx, rx) = channel();
655         tx.send(()).unwrap();
656         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
657         assert_eq!(
658             rx.recv_timeout(Duration::from_millis(1)),
659             Err(RecvTimeoutError::Timeout)
660         );
661         tx.send(()).unwrap();
662         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
663     }
664 
665     #[test]
stress_recv_timeout_two_threads()666     fn stress_recv_timeout_two_threads() {
667         let (tx, rx) = channel();
668         let stress = stress_factor() + 100;
669         let timeout = Duration::from_millis(100);
670 
671         let t = thread::spawn(move || {
672             for i in 0..stress {
673                 if i % 2 == 0 {
674                     thread::sleep(timeout * 2);
675                 }
676                 tx.send(1usize).unwrap();
677             }
678         });
679 
680         let mut recv_count = 0;
681         loop {
682             match rx.recv_timeout(timeout) {
683                 Ok(n) => {
684                     assert_eq!(n, 1usize);
685                     recv_count += 1;
686                 }
687                 Err(RecvTimeoutError::Timeout) => continue,
688                 Err(RecvTimeoutError::Disconnected) => break,
689             }
690         }
691 
692         assert_eq!(recv_count, stress);
693         t.join().unwrap()
694     }
695 
696     #[test]
recv_timeout_upgrade()697     fn recv_timeout_upgrade() {
698         let (tx, rx) = channel::<()>();
699         let timeout = Duration::from_millis(1);
700         let _tx_clone = tx.clone();
701 
702         let start = Instant::now();
703         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
704         assert!(Instant::now() >= start + timeout);
705     }
706 
707     #[test]
stress_recv_timeout_shared()708     fn stress_recv_timeout_shared() {
709         let (tx, rx) = channel();
710         let stress = stress_factor() + 100;
711 
712         let mut ts = Vec::with_capacity(stress);
713         for i in 0..stress {
714             let tx = tx.clone();
715             let t = thread::spawn(move || {
716                 thread::sleep(Duration::from_millis(i as u64 * 10));
717                 tx.send(1usize).unwrap();
718             });
719             ts.push(t);
720         }
721 
722         drop(tx);
723 
724         let mut recv_count = 0;
725         loop {
726             match rx.recv_timeout(Duration::from_millis(10)) {
727                 Ok(n) => {
728                     assert_eq!(n, 1usize);
729                     recv_count += 1;
730                 }
731                 Err(RecvTimeoutError::Timeout) => continue,
732                 Err(RecvTimeoutError::Disconnected) => break,
733             }
734         }
735 
736         assert_eq!(recv_count, stress);
737         for t in ts {
738             t.join().unwrap();
739         }
740     }
741 
742     #[test]
recv_a_lot()743     fn recv_a_lot() {
744         #[cfg(miri)]
745         const N: usize = 50;
746         #[cfg(not(miri))]
747         const N: usize = 10000;
748 
749         // Regression test that we don't run out of stack in scheduler context
750         let (tx, rx) = channel();
751         for _ in 0..N {
752             tx.send(()).unwrap();
753         }
754         for _ in 0..N {
755             rx.recv().unwrap();
756         }
757     }
758 
759     #[test]
shared_recv_timeout()760     fn shared_recv_timeout() {
761         let (tx, rx) = channel();
762         let total = 5;
763         let mut ts = Vec::with_capacity(total);
764         for _ in 0..total {
765             let tx = tx.clone();
766             let t = thread::spawn(move || {
767                 tx.send(()).unwrap();
768             });
769             ts.push(t);
770         }
771 
772         for _ in 0..total {
773             rx.recv().unwrap();
774         }
775 
776         assert_eq!(
777             rx.recv_timeout(Duration::from_millis(1)),
778             Err(RecvTimeoutError::Timeout)
779         );
780         tx.send(()).unwrap();
781         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
782         for t in ts {
783             t.join().unwrap();
784         }
785     }
786 
787     #[test]
shared_chan_stress()788     fn shared_chan_stress() {
789         let (tx, rx) = channel();
790         let total = stress_factor() + 100;
791         let mut ts = Vec::with_capacity(total);
792         for _ in 0..total {
793             let tx = tx.clone();
794             let t = thread::spawn(move || {
795                 tx.send(()).unwrap();
796             });
797             ts.push(t);
798         }
799 
800         for _ in 0..total {
801             rx.recv().unwrap();
802         }
803         for t in ts {
804             t.join().unwrap();
805         }
806     }
807 
808     #[test]
test_nested_recv_iter()809     fn test_nested_recv_iter() {
810         let (tx, rx) = channel::<i32>();
811         let (total_tx, total_rx) = channel::<i32>();
812 
813         let t = thread::spawn(move || {
814             let mut acc = 0;
815             for x in rx.iter() {
816                 acc += x;
817             }
818             total_tx.send(acc).unwrap();
819         });
820 
821         tx.send(3).unwrap();
822         tx.send(1).unwrap();
823         tx.send(2).unwrap();
824         drop(tx);
825         assert_eq!(total_rx.recv().unwrap(), 6);
826         t.join().unwrap();
827     }
828 
829     #[test]
test_recv_iter_break()830     fn test_recv_iter_break() {
831         let (tx, rx) = channel::<i32>();
832         let (count_tx, count_rx) = channel();
833 
834         let t = thread::spawn(move || {
835             let mut count = 0;
836             for x in rx.iter() {
837                 if count >= 3 {
838                     break;
839                 } else {
840                     count += x;
841                 }
842             }
843             count_tx.send(count).unwrap();
844         });
845 
846         tx.send(2).unwrap();
847         tx.send(2).unwrap();
848         tx.send(2).unwrap();
849         let _ = tx.send(2);
850         drop(tx);
851         assert_eq!(count_rx.recv().unwrap(), 4);
852         t.join().unwrap();
853     }
854 
855     #[test]
test_recv_try_iter()856     fn test_recv_try_iter() {
857         let (request_tx, request_rx) = channel();
858         let (response_tx, response_rx) = channel();
859 
860         // Request `x`s until we have `6`.
861         let t = thread::spawn(move || {
862             let mut count = 0;
863             loop {
864                 for x in response_rx.try_iter() {
865                     count += x;
866                     if count == 6 {
867                         return count;
868                     }
869                 }
870                 request_tx.send(()).unwrap();
871             }
872         });
873 
874         for _ in request_rx.iter() {
875             if response_tx.send(2).is_err() {
876                 break;
877             }
878         }
879 
880         assert_eq!(t.join().unwrap(), 6);
881     }
882 
883     #[test]
test_recv_into_iter_owned()884     fn test_recv_into_iter_owned() {
885         let mut iter = {
886             let (tx, rx) = channel::<i32>();
887             tx.send(1).unwrap();
888             tx.send(2).unwrap();
889 
890             rx.into_iter()
891         };
892         assert_eq!(iter.next().unwrap(), 1);
893         assert_eq!(iter.next().unwrap(), 2);
894         assert!(iter.next().is_none());
895     }
896 
897     #[test]
test_recv_into_iter_borrowed()898     fn test_recv_into_iter_borrowed() {
899         let (tx, rx) = channel::<i32>();
900         tx.send(1).unwrap();
901         tx.send(2).unwrap();
902         drop(tx);
903         let mut iter = (&rx).into_iter();
904         assert_eq!(iter.next().unwrap(), 1);
905         assert_eq!(iter.next().unwrap(), 2);
906         assert!(iter.next().is_none());
907     }
908 
909     #[test]
try_recv_states()910     fn try_recv_states() {
911         let (tx1, rx1) = channel::<i32>();
912         let (tx2, rx2) = channel::<()>();
913         let (tx3, rx3) = channel::<()>();
914         let t = thread::spawn(move || {
915             rx2.recv().unwrap();
916             tx1.send(1).unwrap();
917             tx3.send(()).unwrap();
918             rx2.recv().unwrap();
919             drop(tx1);
920             tx3.send(()).unwrap();
921         });
922 
923         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
924         tx2.send(()).unwrap();
925         rx3.recv().unwrap();
926         assert_eq!(rx1.try_recv(), Ok(1));
927         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
928         tx2.send(()).unwrap();
929         rx3.recv().unwrap();
930         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
931         t.join().unwrap();
932     }
933 
934     // This bug used to end up in a livelock inside of the Receiver destructor
935     // because the internal state of the Shared packet was corrupted
936     #[test]
destroy_upgraded_shared_port_when_sender_still_active()937     fn destroy_upgraded_shared_port_when_sender_still_active() {
938         let (tx, rx) = channel();
939         let (tx2, rx2) = channel();
940         let t = thread::spawn(move || {
941             rx.recv().unwrap(); // wait on a oneshot
942             drop(rx); // destroy a shared
943             tx2.send(()).unwrap();
944         });
945         // make sure the other thread has gone to sleep
946         for _ in 0..5000 {
947             thread::yield_now();
948         }
949 
950         // upgrade to a shared chan and send a message
951         let tx2 = tx.clone();
952         drop(tx);
953         tx2.send(()).unwrap();
954 
955         // wait for the child thread to exit before we exit
956         rx2.recv().unwrap();
957         t.join().unwrap();
958     }
959 
960     #[test]
issue_32114()961     fn issue_32114() {
962         let (tx, _) = channel();
963         let _ = tx.send(123);
964         assert_eq!(tx.send(123), Err(SendError(123)));
965     }
966 }
967 
968 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
969 mod sync_channel_tests {
970     use super::*;
971 
972     use std::env;
973     use std::thread;
974 
stress_factor() -> usize975     pub fn stress_factor() -> usize {
976         match env::var("RUST_TEST_STRESS") {
977             Ok(val) => val.parse().unwrap(),
978             Err(..) => 1,
979         }
980     }
981 
982     #[test]
smoke()983     fn smoke() {
984         let (tx, rx) = sync_channel::<i32>(1);
985         tx.send(1).unwrap();
986         assert_eq!(rx.recv().unwrap(), 1);
987     }
988 
989     #[test]
drop_full()990     fn drop_full() {
991         let (tx, _rx) = sync_channel::<Box<isize>>(1);
992         tx.send(Box::new(1)).unwrap();
993     }
994 
995     #[test]
smoke_shared()996     fn smoke_shared() {
997         let (tx, rx) = sync_channel::<i32>(1);
998         tx.send(1).unwrap();
999         assert_eq!(rx.recv().unwrap(), 1);
1000         let tx = tx.clone();
1001         tx.send(1).unwrap();
1002         assert_eq!(rx.recv().unwrap(), 1);
1003     }
1004 
1005     #[test]
recv_timeout()1006     fn recv_timeout() {
1007         let (tx, rx) = sync_channel::<i32>(1);
1008         assert_eq!(
1009             rx.recv_timeout(Duration::from_millis(1)),
1010             Err(RecvTimeoutError::Timeout)
1011         );
1012         tx.send(1).unwrap();
1013         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1014     }
1015 
1016     #[test]
smoke_threads()1017     fn smoke_threads() {
1018         let (tx, rx) = sync_channel::<i32>(0);
1019         let t = thread::spawn(move || {
1020             tx.send(1).unwrap();
1021         });
1022         assert_eq!(rx.recv().unwrap(), 1);
1023         t.join().unwrap();
1024     }
1025 
1026     #[test]
smoke_port_gone()1027     fn smoke_port_gone() {
1028         let (tx, rx) = sync_channel::<i32>(0);
1029         drop(rx);
1030         assert!(tx.send(1).is_err());
1031     }
1032 
1033     #[test]
smoke_shared_port_gone2()1034     fn smoke_shared_port_gone2() {
1035         let (tx, rx) = sync_channel::<i32>(0);
1036         drop(rx);
1037         let tx2 = tx.clone();
1038         drop(tx);
1039         assert!(tx2.send(1).is_err());
1040     }
1041 
1042     #[test]
port_gone_concurrent()1043     fn port_gone_concurrent() {
1044         let (tx, rx) = sync_channel::<i32>(0);
1045         let t = thread::spawn(move || {
1046             rx.recv().unwrap();
1047         });
1048         while tx.send(1).is_ok() {}
1049         t.join().unwrap();
1050     }
1051 
1052     #[test]
port_gone_concurrent_shared()1053     fn port_gone_concurrent_shared() {
1054         let (tx, rx) = sync_channel::<i32>(0);
1055         let tx2 = tx.clone();
1056         let t = thread::spawn(move || {
1057             rx.recv().unwrap();
1058         });
1059         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1060         t.join().unwrap();
1061     }
1062 
1063     #[test]
smoke_chan_gone()1064     fn smoke_chan_gone() {
1065         let (tx, rx) = sync_channel::<i32>(0);
1066         drop(tx);
1067         assert!(rx.recv().is_err());
1068     }
1069 
1070     #[test]
smoke_chan_gone_shared()1071     fn smoke_chan_gone_shared() {
1072         let (tx, rx) = sync_channel::<()>(0);
1073         let tx2 = tx.clone();
1074         drop(tx);
1075         drop(tx2);
1076         assert!(rx.recv().is_err());
1077     }
1078 
1079     #[test]
chan_gone_concurrent()1080     fn chan_gone_concurrent() {
1081         let (tx, rx) = sync_channel::<i32>(0);
1082         let t = thread::spawn(move || {
1083             tx.send(1).unwrap();
1084             tx.send(1).unwrap();
1085         });
1086         while rx.recv().is_ok() {}
1087         t.join().unwrap();
1088     }
1089 
1090     #[test]
stress()1091     fn stress() {
1092         #[cfg(miri)]
1093         const N: usize = 100;
1094         #[cfg(not(miri))]
1095         const N: usize = 10000;
1096 
1097         let (tx, rx) = sync_channel::<i32>(0);
1098         let t = thread::spawn(move || {
1099             for _ in 0..N {
1100                 tx.send(1).unwrap();
1101             }
1102         });
1103         for _ in 0..N {
1104             assert_eq!(rx.recv().unwrap(), 1);
1105         }
1106         t.join().unwrap();
1107     }
1108 
1109     #[test]
stress_recv_timeout_two_threads()1110     fn stress_recv_timeout_two_threads() {
1111         #[cfg(miri)]
1112         const N: usize = 100;
1113         #[cfg(not(miri))]
1114         const N: usize = 10000;
1115 
1116         let (tx, rx) = sync_channel::<i32>(0);
1117 
1118         let t = thread::spawn(move || {
1119             for _ in 0..N {
1120                 tx.send(1).unwrap();
1121             }
1122         });
1123 
1124         let mut recv_count = 0;
1125         loop {
1126             match rx.recv_timeout(Duration::from_millis(1)) {
1127                 Ok(v) => {
1128                     assert_eq!(v, 1);
1129                     recv_count += 1;
1130                 }
1131                 Err(RecvTimeoutError::Timeout) => continue,
1132                 Err(RecvTimeoutError::Disconnected) => break,
1133             }
1134         }
1135 
1136         assert_eq!(recv_count, N);
1137         t.join().unwrap();
1138     }
1139 
1140     #[test]
stress_recv_timeout_shared()1141     fn stress_recv_timeout_shared() {
1142         #[cfg(miri)]
1143         const AMT: u32 = 100;
1144         #[cfg(not(miri))]
1145         const AMT: u32 = 1000;
1146         const NTHREADS: u32 = 8;
1147         let (tx, rx) = sync_channel::<i32>(0);
1148         let (dtx, drx) = sync_channel::<()>(0);
1149 
1150         let t = thread::spawn(move || {
1151             let mut recv_count = 0;
1152             loop {
1153                 match rx.recv_timeout(Duration::from_millis(10)) {
1154                     Ok(v) => {
1155                         assert_eq!(v, 1);
1156                         recv_count += 1;
1157                     }
1158                     Err(RecvTimeoutError::Timeout) => continue,
1159                     Err(RecvTimeoutError::Disconnected) => break,
1160                 }
1161             }
1162 
1163             assert_eq!(recv_count, AMT * NTHREADS);
1164             assert!(rx.try_recv().is_err());
1165 
1166             dtx.send(()).unwrap();
1167         });
1168 
1169         let mut ts = Vec::with_capacity(NTHREADS as usize);
1170         for _ in 0..NTHREADS {
1171             let tx = tx.clone();
1172             let t = thread::spawn(move || {
1173                 for _ in 0..AMT {
1174                     tx.send(1).unwrap();
1175                 }
1176             });
1177             ts.push(t);
1178         }
1179 
1180         drop(tx);
1181 
1182         drx.recv().unwrap();
1183         for t in ts {
1184             t.join().unwrap();
1185         }
1186         t.join().unwrap();
1187     }
1188 
1189     #[test]
stress_shared()1190     fn stress_shared() {
1191         #[cfg(miri)]
1192         const AMT: u32 = 100;
1193         #[cfg(not(miri))]
1194         const AMT: u32 = 1000;
1195         const NTHREADS: u32 = 8;
1196         let (tx, rx) = sync_channel::<i32>(0);
1197         let (dtx, drx) = sync_channel::<()>(0);
1198 
1199         let t = thread::spawn(move || {
1200             for _ in 0..AMT * NTHREADS {
1201                 assert_eq!(rx.recv().unwrap(), 1);
1202             }
1203             assert!(rx.try_recv().is_err());
1204             dtx.send(()).unwrap();
1205         });
1206 
1207         let mut ts = Vec::with_capacity(NTHREADS as usize);
1208         for _ in 0..NTHREADS {
1209             let tx = tx.clone();
1210             let t = thread::spawn(move || {
1211                 for _ in 0..AMT {
1212                     tx.send(1).unwrap();
1213                 }
1214             });
1215             ts.push(t);
1216         }
1217         drop(tx);
1218         drx.recv().unwrap();
1219         for t in ts {
1220             t.join().unwrap();
1221         }
1222         t.join().unwrap();
1223     }
1224 
1225     #[test]
oneshot_single_thread_close_port_first()1226     fn oneshot_single_thread_close_port_first() {
1227         // Simple test of closing without sending
1228         let (_tx, rx) = sync_channel::<i32>(0);
1229         drop(rx);
1230     }
1231 
1232     #[test]
oneshot_single_thread_close_chan_first()1233     fn oneshot_single_thread_close_chan_first() {
1234         // Simple test of closing without sending
1235         let (tx, _rx) = sync_channel::<i32>(0);
1236         drop(tx);
1237     }
1238 
1239     #[test]
oneshot_single_thread_send_port_close()1240     fn oneshot_single_thread_send_port_close() {
1241         // Testing that the sender cleans up the payload if receiver is closed
1242         let (tx, rx) = sync_channel::<Box<i32>>(0);
1243         drop(rx);
1244         assert!(tx.send(Box::new(0)).is_err());
1245     }
1246 
1247     #[test]
oneshot_single_thread_recv_chan_close()1248     fn oneshot_single_thread_recv_chan_close() {
1249         let (tx, rx) = sync_channel::<i32>(0);
1250         drop(tx);
1251         assert_eq!(rx.recv(), Err(RecvError));
1252     }
1253 
1254     #[test]
oneshot_single_thread_send_then_recv()1255     fn oneshot_single_thread_send_then_recv() {
1256         let (tx, rx) = sync_channel::<Box<i32>>(1);
1257         tx.send(Box::new(10)).unwrap();
1258         assert!(*rx.recv().unwrap() == 10);
1259     }
1260 
1261     #[test]
oneshot_single_thread_try_send_open()1262     fn oneshot_single_thread_try_send_open() {
1263         let (tx, rx) = sync_channel::<i32>(1);
1264         assert_eq!(tx.try_send(10), Ok(()));
1265         assert!(rx.recv().unwrap() == 10);
1266     }
1267 
1268     #[test]
oneshot_single_thread_try_send_closed()1269     fn oneshot_single_thread_try_send_closed() {
1270         let (tx, rx) = sync_channel::<i32>(0);
1271         drop(rx);
1272         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1273     }
1274 
1275     #[test]
oneshot_single_thread_try_send_closed2()1276     fn oneshot_single_thread_try_send_closed2() {
1277         let (tx, _rx) = sync_channel::<i32>(0);
1278         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1279     }
1280 
1281     #[test]
oneshot_single_thread_try_recv_open()1282     fn oneshot_single_thread_try_recv_open() {
1283         let (tx, rx) = sync_channel::<i32>(1);
1284         tx.send(10).unwrap();
1285         assert!(rx.recv() == Ok(10));
1286     }
1287 
1288     #[test]
oneshot_single_thread_try_recv_closed()1289     fn oneshot_single_thread_try_recv_closed() {
1290         let (tx, rx) = sync_channel::<i32>(0);
1291         drop(tx);
1292         assert!(rx.recv().is_err());
1293     }
1294 
1295     #[test]
oneshot_single_thread_try_recv_closed_with_data()1296     fn oneshot_single_thread_try_recv_closed_with_data() {
1297         let (tx, rx) = sync_channel::<i32>(1);
1298         tx.send(10).unwrap();
1299         drop(tx);
1300         assert_eq!(rx.try_recv(), Ok(10));
1301         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1302     }
1303 
1304     #[test]
oneshot_single_thread_peek_data()1305     fn oneshot_single_thread_peek_data() {
1306         let (tx, rx) = sync_channel::<i32>(1);
1307         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1308         tx.send(10).unwrap();
1309         assert_eq!(rx.try_recv(), Ok(10));
1310     }
1311 
1312     #[test]
oneshot_single_thread_peek_close()1313     fn oneshot_single_thread_peek_close() {
1314         let (tx, rx) = sync_channel::<i32>(0);
1315         drop(tx);
1316         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1317         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1318     }
1319 
1320     #[test]
oneshot_single_thread_peek_open()1321     fn oneshot_single_thread_peek_open() {
1322         let (_tx, rx) = sync_channel::<i32>(0);
1323         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1324     }
1325 
1326     #[test]
oneshot_multi_task_recv_then_send()1327     fn oneshot_multi_task_recv_then_send() {
1328         let (tx, rx) = sync_channel::<Box<i32>>(0);
1329         let t = thread::spawn(move || {
1330             assert!(*rx.recv().unwrap() == 10);
1331         });
1332 
1333         tx.send(Box::new(10)).unwrap();
1334         t.join().unwrap();
1335     }
1336 
1337     #[test]
oneshot_multi_task_recv_then_close()1338     fn oneshot_multi_task_recv_then_close() {
1339         let (tx, rx) = sync_channel::<Box<i32>>(0);
1340         let t = thread::spawn(move || {
1341             drop(tx);
1342         });
1343         thread::spawn(move || {
1344             assert_eq!(rx.recv(), Err(RecvError));
1345         })
1346         .join()
1347         .unwrap();
1348         t.join().unwrap();
1349     }
1350 
1351     #[test]
oneshot_multi_thread_close_stress()1352     fn oneshot_multi_thread_close_stress() {
1353         let stress_factor = stress_factor();
1354         let mut ts = Vec::with_capacity(stress_factor);
1355         for _ in 0..stress_factor {
1356             let (tx, rx) = sync_channel::<i32>(0);
1357             let t = thread::spawn(move || {
1358                 drop(rx);
1359             });
1360             ts.push(t);
1361             drop(tx);
1362         }
1363         for t in ts {
1364             t.join().unwrap();
1365         }
1366     }
1367 
1368     #[test]
oneshot_multi_thread_send_close_stress()1369     fn oneshot_multi_thread_send_close_stress() {
1370         let stress_factor = stress_factor();
1371         let mut ts = Vec::with_capacity(stress_factor);
1372         for _ in 0..stress_factor {
1373             let (tx, rx) = sync_channel::<i32>(0);
1374             let t = thread::spawn(move || {
1375                 drop(rx);
1376             });
1377             ts.push(t);
1378             thread::spawn(move || {
1379                 let _ = tx.send(1);
1380             })
1381             .join()
1382             .unwrap();
1383         }
1384         for t in ts {
1385             t.join().unwrap();
1386         }
1387     }
1388 
1389     #[test]
oneshot_multi_thread_recv_close_stress()1390     fn oneshot_multi_thread_recv_close_stress() {
1391         let stress_factor = stress_factor();
1392         let mut ts = Vec::with_capacity(2 * stress_factor);
1393         for _ in 0..stress_factor {
1394             let (tx, rx) = sync_channel::<i32>(0);
1395             let t = thread::spawn(move || {
1396                 thread::spawn(move || {
1397                     assert_eq!(rx.recv(), Err(RecvError));
1398                 })
1399                 .join()
1400                 .unwrap();
1401             });
1402             ts.push(t);
1403             let t2 = thread::spawn(move || {
1404                 thread::spawn(move || {
1405                     drop(tx);
1406                 });
1407             });
1408             ts.push(t2);
1409         }
1410         for t in ts {
1411             t.join().unwrap();
1412         }
1413     }
1414 
1415     #[test]
oneshot_multi_thread_send_recv_stress()1416     fn oneshot_multi_thread_send_recv_stress() {
1417         let stress_factor = stress_factor();
1418         let mut ts = Vec::with_capacity(stress_factor);
1419         for _ in 0..stress_factor {
1420             let (tx, rx) = sync_channel::<Box<i32>>(0);
1421             let t = thread::spawn(move || {
1422                 tx.send(Box::new(10)).unwrap();
1423             });
1424             ts.push(t);
1425             assert!(*rx.recv().unwrap() == 10);
1426         }
1427         for t in ts {
1428             t.join().unwrap();
1429         }
1430     }
1431 
1432     #[test]
stream_send_recv_stress()1433     fn stream_send_recv_stress() {
1434         let stress_factor = stress_factor();
1435         let mut ts = Vec::with_capacity(2 * stress_factor);
1436         for _ in 0..stress_factor {
1437             let (tx, rx) = sync_channel::<Box<i32>>(0);
1438 
1439             if let Some(t) = send(tx, 0) {
1440                 ts.push(t);
1441             }
1442             if let Some(t) = recv(rx, 0) {
1443                 ts.push(t);
1444             }
1445 
1446             fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1447                 if i == 10 {
1448                     return None;
1449                 }
1450 
1451                 Some(thread::spawn(move || {
1452                     tx.send(Box::new(i)).unwrap();
1453                     send(tx, i + 1);
1454                 }))
1455             }
1456 
1457             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1458                 if i == 10 {
1459                     return None;
1460                 }
1461 
1462                 Some(thread::spawn(move || {
1463                     assert!(*rx.recv().unwrap() == i);
1464                     recv(rx, i + 1);
1465                 }))
1466             }
1467         }
1468         for t in ts {
1469             t.join().unwrap();
1470         }
1471     }
1472 
1473     #[test]
recv_a_lot()1474     fn recv_a_lot() {
1475         #[cfg(miri)]
1476         const N: usize = 100;
1477         #[cfg(not(miri))]
1478         const N: usize = 10000;
1479 
1480         // Regression test that we don't run out of stack in scheduler context
1481         let (tx, rx) = sync_channel(N);
1482         for _ in 0..N {
1483             tx.send(()).unwrap();
1484         }
1485         for _ in 0..N {
1486             rx.recv().unwrap();
1487         }
1488     }
1489 
1490     #[test]
shared_chan_stress()1491     fn shared_chan_stress() {
1492         let (tx, rx) = sync_channel(0);
1493         let total = stress_factor() + 100;
1494         let mut ts = Vec::with_capacity(total);
1495         for _ in 0..total {
1496             let tx = tx.clone();
1497             let t = thread::spawn(move || {
1498                 tx.send(()).unwrap();
1499             });
1500             ts.push(t);
1501         }
1502 
1503         for _ in 0..total {
1504             rx.recv().unwrap();
1505         }
1506         for t in ts {
1507             t.join().unwrap();
1508         }
1509     }
1510 
1511     #[test]
test_nested_recv_iter()1512     fn test_nested_recv_iter() {
1513         let (tx, rx) = sync_channel::<i32>(0);
1514         let (total_tx, total_rx) = sync_channel::<i32>(0);
1515 
1516         let t = thread::spawn(move || {
1517             let mut acc = 0;
1518             for x in rx.iter() {
1519                 acc += x;
1520             }
1521             total_tx.send(acc).unwrap();
1522         });
1523 
1524         tx.send(3).unwrap();
1525         tx.send(1).unwrap();
1526         tx.send(2).unwrap();
1527         drop(tx);
1528         assert_eq!(total_rx.recv().unwrap(), 6);
1529         t.join().unwrap();
1530     }
1531 
1532     #[test]
test_recv_iter_break()1533     fn test_recv_iter_break() {
1534         let (tx, rx) = sync_channel::<i32>(0);
1535         let (count_tx, count_rx) = sync_channel(0);
1536 
1537         let t = thread::spawn(move || {
1538             let mut count = 0;
1539             for x in rx.iter() {
1540                 if count >= 3 {
1541                     break;
1542                 } else {
1543                     count += x;
1544                 }
1545             }
1546             count_tx.send(count).unwrap();
1547         });
1548 
1549         tx.send(2).unwrap();
1550         tx.send(2).unwrap();
1551         tx.send(2).unwrap();
1552         let _ = tx.try_send(2);
1553         drop(tx);
1554         assert_eq!(count_rx.recv().unwrap(), 4);
1555         t.join().unwrap();
1556     }
1557 
1558     #[test]
try_recv_states()1559     fn try_recv_states() {
1560         let (tx1, rx1) = sync_channel::<i32>(1);
1561         let (tx2, rx2) = sync_channel::<()>(1);
1562         let (tx3, rx3) = sync_channel::<()>(1);
1563         let t = thread::spawn(move || {
1564             rx2.recv().unwrap();
1565             tx1.send(1).unwrap();
1566             tx3.send(()).unwrap();
1567             rx2.recv().unwrap();
1568             drop(tx1);
1569             tx3.send(()).unwrap();
1570         });
1571 
1572         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1573         tx2.send(()).unwrap();
1574         rx3.recv().unwrap();
1575         assert_eq!(rx1.try_recv(), Ok(1));
1576         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1577         tx2.send(()).unwrap();
1578         rx3.recv().unwrap();
1579         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1580         t.join().unwrap();
1581     }
1582 
1583     // This bug used to end up in a livelock inside of the Receiver destructor
1584     // because the internal state of the Shared packet was corrupted
1585     #[test]
destroy_upgraded_shared_port_when_sender_still_active()1586     fn destroy_upgraded_shared_port_when_sender_still_active() {
1587         let (tx, rx) = sync_channel::<()>(0);
1588         let (tx2, rx2) = sync_channel::<()>(0);
1589         let t = thread::spawn(move || {
1590             rx.recv().unwrap(); // wait on a oneshot
1591             drop(rx); // destroy a shared
1592             tx2.send(()).unwrap();
1593         });
1594         // make sure the other thread has gone to sleep
1595         for _ in 0..5000 {
1596             thread::yield_now();
1597         }
1598 
1599         // upgrade to a shared chan and send a message
1600         let tx2 = tx.clone();
1601         drop(tx);
1602         tx2.send(()).unwrap();
1603 
1604         // wait for the child thread to exit before we exit
1605         rx2.recv().unwrap();
1606         t.join().unwrap();
1607     }
1608 
1609     #[test]
send1()1610     fn send1() {
1611         let (tx, rx) = sync_channel::<i32>(0);
1612         let t = thread::spawn(move || {
1613             rx.recv().unwrap();
1614         });
1615         assert_eq!(tx.send(1), Ok(()));
1616         t.join().unwrap();
1617     }
1618 
1619     #[test]
send2()1620     fn send2() {
1621         let (tx, rx) = sync_channel::<i32>(0);
1622         let t = thread::spawn(move || {
1623             drop(rx);
1624         });
1625         assert!(tx.send(1).is_err());
1626         t.join().unwrap();
1627     }
1628 
1629     #[test]
send3()1630     fn send3() {
1631         let (tx, rx) = sync_channel::<i32>(1);
1632         assert_eq!(tx.send(1), Ok(()));
1633         let t = thread::spawn(move || {
1634             drop(rx);
1635         });
1636         assert!(tx.send(1).is_err());
1637         t.join().unwrap();
1638     }
1639 
1640     #[test]
send4()1641     fn send4() {
1642         let (tx, rx) = sync_channel::<i32>(0);
1643         let tx2 = tx.clone();
1644         let (done, donerx) = channel();
1645         let done2 = done.clone();
1646         let t = thread::spawn(move || {
1647             assert!(tx.send(1).is_err());
1648             done.send(()).unwrap();
1649         });
1650         let t2 = thread::spawn(move || {
1651             assert!(tx2.send(2).is_err());
1652             done2.send(()).unwrap();
1653         });
1654         drop(rx);
1655         donerx.recv().unwrap();
1656         donerx.recv().unwrap();
1657         t.join().unwrap();
1658         t2.join().unwrap();
1659     }
1660 
1661     #[test]
try_send1()1662     fn try_send1() {
1663         let (tx, _rx) = sync_channel::<i32>(0);
1664         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1665     }
1666 
1667     #[test]
try_send2()1668     fn try_send2() {
1669         let (tx, _rx) = sync_channel::<i32>(1);
1670         assert_eq!(tx.try_send(1), Ok(()));
1671         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1672     }
1673 
1674     #[test]
try_send3()1675     fn try_send3() {
1676         let (tx, rx) = sync_channel::<i32>(1);
1677         assert_eq!(tx.try_send(1), Ok(()));
1678         drop(rx);
1679         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1680     }
1681 
1682     #[test]
issue_15761()1683     fn issue_15761() {
1684         fn repro() {
1685             let (tx1, rx1) = sync_channel::<()>(3);
1686             let (tx2, rx2) = sync_channel::<()>(3);
1687 
1688             let _t = thread::spawn(move || {
1689                 rx1.recv().unwrap();
1690                 tx2.try_send(()).unwrap();
1691             });
1692 
1693             tx1.try_send(()).unwrap();
1694             rx2.recv().unwrap();
1695         }
1696 
1697         for _ in 0..100 {
1698             repro()
1699         }
1700     }
1701 }
1702 
1703 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1704 mod select_tests {
1705     use super::*;
1706 
1707     use std::thread;
1708 
1709     #[test]
smoke()1710     fn smoke() {
1711         let (tx1, rx1) = channel::<i32>();
1712         let (tx2, rx2) = channel::<i32>();
1713         tx1.send(1).unwrap();
1714         select! {
1715             foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1716             _bar = rx2.recv() => panic!()
1717         }
1718         tx2.send(2).unwrap();
1719         select! {
1720             _foo = rx1.recv() => panic!(),
1721             bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1722         }
1723         drop(tx1);
1724         select! {
1725             foo = rx1.recv() => assert!(foo.is_err()),
1726             _bar = rx2.recv() => panic!()
1727         }
1728         drop(tx2);
1729         select! {
1730             bar = rx2.recv() => assert!(bar.is_err())
1731         }
1732     }
1733 
1734     #[test]
smoke2()1735     fn smoke2() {
1736         let (_tx1, rx1) = channel::<i32>();
1737         let (_tx2, rx2) = channel::<i32>();
1738         let (_tx3, rx3) = channel::<i32>();
1739         let (_tx4, rx4) = channel::<i32>();
1740         let (tx5, rx5) = channel::<i32>();
1741         tx5.send(4).unwrap();
1742         select! {
1743             _foo = rx1.recv() => panic!("1"),
1744             _foo = rx2.recv() => panic!("2"),
1745             _foo = rx3.recv() => panic!("3"),
1746             _foo = rx4.recv() => panic!("4"),
1747             foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1748         }
1749     }
1750 
1751     #[test]
closed()1752     fn closed() {
1753         let (_tx1, rx1) = channel::<i32>();
1754         let (tx2, rx2) = channel::<i32>();
1755         drop(tx2);
1756 
1757         select! {
1758             _a1 = rx1.recv() => panic!(),
1759             a2 = rx2.recv() => assert!(a2.is_err())
1760         }
1761     }
1762 
1763     #[test]
unblocks()1764     fn unblocks() {
1765         let (tx1, rx1) = channel::<i32>();
1766         let (_tx2, rx2) = channel::<i32>();
1767         let (tx3, rx3) = channel::<i32>();
1768 
1769         let t = thread::spawn(move || {
1770             for _ in 0..20 {
1771                 thread::yield_now();
1772             }
1773             tx1.send(1).unwrap();
1774             rx3.recv().unwrap();
1775             for _ in 0..20 {
1776                 thread::yield_now();
1777             }
1778         });
1779 
1780         select! {
1781             a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1782             _b = rx2.recv() => panic!()
1783         }
1784         tx3.send(1).unwrap();
1785         select! {
1786             a = rx1.recv() => assert!(a.is_err()),
1787             _b = rx2.recv() => panic!()
1788         }
1789         t.join().unwrap();
1790     }
1791 
1792     #[test]
both_ready()1793     fn both_ready() {
1794         let (tx1, rx1) = channel::<i32>();
1795         let (tx2, rx2) = channel::<i32>();
1796         let (tx3, rx3) = channel::<()>();
1797 
1798         let t = thread::spawn(move || {
1799             for _ in 0..20 {
1800                 thread::yield_now();
1801             }
1802             tx1.send(1).unwrap();
1803             tx2.send(2).unwrap();
1804             rx3.recv().unwrap();
1805         });
1806 
1807         select! {
1808             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1809             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1810         }
1811         select! {
1812             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1813             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1814         }
1815         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1816         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1817         tx3.send(()).unwrap();
1818         t.join().unwrap();
1819     }
1820 
1821     #[test]
stress()1822     fn stress() {
1823         #[cfg(miri)]
1824         const AMT: i32 = 100;
1825         #[cfg(not(miri))]
1826         const AMT: i32 = 10000;
1827 
1828         let (tx1, rx1) = channel::<i32>();
1829         let (tx2, rx2) = channel::<i32>();
1830         let (tx3, rx3) = channel::<()>();
1831 
1832         let t = thread::spawn(move || {
1833             for i in 0..AMT {
1834                 if i % 2 == 0 {
1835                     tx1.send(i).unwrap();
1836                 } else {
1837                     tx2.send(i).unwrap();
1838                 }
1839                 rx3.recv().unwrap();
1840             }
1841         });
1842 
1843         for i in 0..AMT {
1844             select! {
1845                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1846                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1847             }
1848             tx3.send(()).unwrap();
1849         }
1850         t.join().unwrap();
1851     }
1852 
1853     #[allow(unused_must_use)]
1854     #[test]
cloning()1855     fn cloning() {
1856         let (tx1, rx1) = channel::<i32>();
1857         let (_tx2, rx2) = channel::<i32>();
1858         let (tx3, rx3) = channel::<()>();
1859 
1860         let t = thread::spawn(move || {
1861             rx3.recv().unwrap();
1862             tx1.clone();
1863             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1864             tx1.send(2).unwrap();
1865             rx3.recv().unwrap();
1866         });
1867 
1868         tx3.send(()).unwrap();
1869         select! {
1870             _i1 = rx1.recv() => {},
1871             _i2 = rx2.recv() => panic!()
1872         }
1873         tx3.send(()).unwrap();
1874         t.join().unwrap();
1875     }
1876 
1877     #[allow(unused_must_use)]
1878     #[test]
cloning2()1879     fn cloning2() {
1880         let (tx1, rx1) = channel::<i32>();
1881         let (_tx2, rx2) = channel::<i32>();
1882         let (tx3, rx3) = channel::<()>();
1883 
1884         let t = thread::spawn(move || {
1885             rx3.recv().unwrap();
1886             tx1.clone();
1887             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1888             tx1.send(2).unwrap();
1889             rx3.recv().unwrap();
1890         });
1891 
1892         tx3.send(()).unwrap();
1893         select! {
1894             _i1 = rx1.recv() => {},
1895             _i2 = rx2.recv() => panic!()
1896         }
1897         tx3.send(()).unwrap();
1898         t.join().unwrap();
1899     }
1900 
1901     #[test]
cloning3()1902     fn cloning3() {
1903         let (tx1, rx1) = channel::<()>();
1904         let (tx2, rx2) = channel::<()>();
1905         let (tx3, rx3) = channel::<()>();
1906         let t = thread::spawn(move || {
1907             select! {
1908                 _ = rx1.recv() => panic!(),
1909                 _ = rx2.recv() => {}
1910             }
1911             tx3.send(()).unwrap();
1912         });
1913 
1914         for _ in 0..1000 {
1915             thread::yield_now();
1916         }
1917         drop(tx1.clone());
1918         tx2.send(()).unwrap();
1919         rx3.recv().unwrap();
1920         t.join().unwrap();
1921     }
1922 
1923     #[test]
preflight1()1924     fn preflight1() {
1925         let (tx, rx) = channel();
1926         tx.send(()).unwrap();
1927         select! {
1928             _n = rx.recv() => {}
1929         }
1930     }
1931 
1932     #[test]
preflight2()1933     fn preflight2() {
1934         let (tx, rx) = channel();
1935         tx.send(()).unwrap();
1936         tx.send(()).unwrap();
1937         select! {
1938             _n = rx.recv() => {}
1939         }
1940     }
1941 
1942     #[test]
preflight3()1943     fn preflight3() {
1944         let (tx, rx) = channel();
1945         drop(tx.clone());
1946         tx.send(()).unwrap();
1947         select! {
1948             _n = rx.recv() => {}
1949         }
1950     }
1951 
1952     #[test]
preflight4()1953     fn preflight4() {
1954         let (tx, rx) = channel();
1955         tx.send(()).unwrap();
1956         select! {
1957             _ = rx.recv() => {}
1958         }
1959     }
1960 
1961     #[test]
preflight5()1962     fn preflight5() {
1963         let (tx, rx) = channel();
1964         tx.send(()).unwrap();
1965         tx.send(()).unwrap();
1966         select! {
1967             _ = rx.recv() => {}
1968         }
1969     }
1970 
1971     #[test]
preflight6()1972     fn preflight6() {
1973         let (tx, rx) = channel();
1974         drop(tx.clone());
1975         tx.send(()).unwrap();
1976         select! {
1977             _ = rx.recv() => {}
1978         }
1979     }
1980 
1981     #[test]
preflight7()1982     fn preflight7() {
1983         let (tx, rx) = channel::<()>();
1984         drop(tx);
1985         select! {
1986             _ = rx.recv() => {}
1987         }
1988     }
1989 
1990     #[test]
preflight8()1991     fn preflight8() {
1992         let (tx, rx) = channel();
1993         tx.send(()).unwrap();
1994         drop(tx);
1995         rx.recv().unwrap();
1996         select! {
1997             _ = rx.recv() => {}
1998         }
1999     }
2000 
2001     #[test]
preflight9()2002     fn preflight9() {
2003         let (tx, rx) = channel();
2004         drop(tx.clone());
2005         tx.send(()).unwrap();
2006         drop(tx);
2007         rx.recv().unwrap();
2008         select! {
2009             _ = rx.recv() => {}
2010         }
2011     }
2012 
2013     #[test]
oneshot_data_waiting()2014     fn oneshot_data_waiting() {
2015         let (tx1, rx1) = channel();
2016         let (tx2, rx2) = channel();
2017         let t = thread::spawn(move || {
2018             select! {
2019                 _n = rx1.recv() => {}
2020             }
2021             tx2.send(()).unwrap();
2022         });
2023 
2024         for _ in 0..100 {
2025             thread::yield_now()
2026         }
2027         tx1.send(()).unwrap();
2028         rx2.recv().unwrap();
2029         t.join().unwrap();
2030     }
2031 
2032     #[test]
stream_data_waiting()2033     fn stream_data_waiting() {
2034         let (tx1, rx1) = channel();
2035         let (tx2, rx2) = channel();
2036         tx1.send(()).unwrap();
2037         tx1.send(()).unwrap();
2038         rx1.recv().unwrap();
2039         rx1.recv().unwrap();
2040         let t = thread::spawn(move || {
2041             select! {
2042                 _n = rx1.recv() => {}
2043             }
2044             tx2.send(()).unwrap();
2045         });
2046 
2047         for _ in 0..100 {
2048             thread::yield_now()
2049         }
2050         tx1.send(()).unwrap();
2051         rx2.recv().unwrap();
2052         t.join().unwrap();
2053     }
2054 
2055     #[test]
shared_data_waiting()2056     fn shared_data_waiting() {
2057         let (tx1, rx1) = channel();
2058         let (tx2, rx2) = channel();
2059         drop(tx1.clone());
2060         tx1.send(()).unwrap();
2061         rx1.recv().unwrap();
2062         let t = thread::spawn(move || {
2063             select! {
2064                 _n = rx1.recv() => {}
2065             }
2066             tx2.send(()).unwrap();
2067         });
2068 
2069         for _ in 0..100 {
2070             thread::yield_now()
2071         }
2072         tx1.send(()).unwrap();
2073         rx2.recv().unwrap();
2074         t.join().unwrap();
2075     }
2076 
2077     #[test]
sync1()2078     fn sync1() {
2079         let (tx, rx) = sync_channel::<i32>(1);
2080         tx.send(1).unwrap();
2081         select! {
2082             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2083         }
2084     }
2085 
2086     #[test]
sync2()2087     fn sync2() {
2088         let (tx, rx) = sync_channel::<i32>(0);
2089         let t = thread::spawn(move || {
2090             for _ in 0..100 {
2091                 thread::yield_now()
2092             }
2093             tx.send(1).unwrap();
2094         });
2095         select! {
2096             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2097         }
2098         t.join().unwrap();
2099     }
2100 
2101     #[test]
sync3()2102     fn sync3() {
2103         let (tx1, rx1) = sync_channel::<i32>(0);
2104         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2105         let t = thread::spawn(move || {
2106             tx1.send(1).unwrap();
2107         });
2108         let t2 = thread::spawn(move || {
2109             tx2.send(2).unwrap();
2110         });
2111         select! {
2112             n = rx1.recv() => {
2113                 let n = n.unwrap();
2114                 assert_eq!(n, 1);
2115                 assert_eq!(rx2.recv().unwrap(), 2);
2116             },
2117             n = rx2.recv() => {
2118                 let n = n.unwrap();
2119                 assert_eq!(n, 2);
2120                 assert_eq!(rx1.recv().unwrap(), 1);
2121             }
2122         }
2123         t.join().unwrap();
2124         t2.join().unwrap();
2125     }
2126 }
2127