1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
12 //! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13 //!
14 //! Source:
15 //! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //! - Copyright 2013-2014 The Rust Project Developers
19 //! - Apache License, Version 2.0 or MIT license, at your option
20 //! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //! - https://www.rust-lang.org/en-US/legal.html
22
23 #![allow(clippy::match_single_binding, clippy::redundant_clone)]
24
25 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
26 use std::sync::mpsc::{SendError, TrySendError};
27 use std::thread::JoinHandle;
28 use std::time::Duration;
29
30 use crossbeam_channel as cc;
31
32 pub struct Sender<T> {
33 pub inner: cc::Sender<T>,
34 }
35
36 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>37 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
38 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
39 }
40 }
41
42 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>43 fn clone(&self) -> Sender<T> {
44 Sender {
45 inner: self.inner.clone(),
46 }
47 }
48 }
49
50 pub struct SyncSender<T> {
51 pub inner: cc::Sender<T>,
52 }
53
54 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>55 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
56 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
57 }
58
try_send(&self, t: T) -> Result<(), TrySendError<T>>59 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
60 self.inner.try_send(t).map_err(|err| match err {
61 cc::TrySendError::Full(m) => TrySendError::Full(m),
62 cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
63 })
64 }
65 }
66
67 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>68 fn clone(&self) -> SyncSender<T> {
69 SyncSender {
70 inner: self.inner.clone(),
71 }
72 }
73 }
74
75 pub struct Receiver<T> {
76 pub inner: cc::Receiver<T>,
77 }
78
79 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>80 pub fn try_recv(&self) -> Result<T, TryRecvError> {
81 self.inner.try_recv().map_err(|err| match err {
82 cc::TryRecvError::Empty => TryRecvError::Empty,
83 cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
84 })
85 }
86
recv(&self) -> Result<T, RecvError>87 pub fn recv(&self) -> Result<T, RecvError> {
88 self.inner.recv().map_err(|_| RecvError)
89 }
90
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>91 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
92 self.inner.recv_timeout(timeout).map_err(|err| match err {
93 cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
94 cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
95 })
96 }
97
iter(&self) -> Iter<T>98 pub fn iter(&self) -> Iter<T> {
99 Iter { inner: self }
100 }
101
try_iter(&self) -> TryIter<T>102 pub fn try_iter(&self) -> TryIter<T> {
103 TryIter { inner: self }
104 }
105 }
106
107 impl<'a, T> IntoIterator for &'a Receiver<T> {
108 type Item = T;
109 type IntoIter = Iter<'a, T>;
110
into_iter(self) -> Iter<'a, T>111 fn into_iter(self) -> Iter<'a, T> {
112 self.iter()
113 }
114 }
115
116 impl<T> IntoIterator for Receiver<T> {
117 type Item = T;
118 type IntoIter = IntoIter<T>;
119
into_iter(self) -> IntoIter<T>120 fn into_iter(self) -> IntoIter<T> {
121 IntoIter { inner: self }
122 }
123 }
124
125 pub struct TryIter<'a, T: 'a> {
126 inner: &'a Receiver<T>,
127 }
128
129 impl<'a, T> Iterator for TryIter<'a, T> {
130 type Item = T;
131
next(&mut self) -> Option<T>132 fn next(&mut self) -> Option<T> {
133 self.inner.try_recv().ok()
134 }
135 }
136
137 pub struct Iter<'a, T: 'a> {
138 inner: &'a Receiver<T>,
139 }
140
141 impl<'a, T> Iterator for Iter<'a, T> {
142 type Item = T;
143
next(&mut self) -> Option<T>144 fn next(&mut self) -> Option<T> {
145 self.inner.recv().ok()
146 }
147 }
148
149 pub struct IntoIter<T> {
150 inner: Receiver<T>,
151 }
152
153 impl<T> Iterator for IntoIter<T> {
154 type Item = T;
155
next(&mut self) -> Option<T>156 fn next(&mut self) -> Option<T> {
157 self.inner.recv().ok()
158 }
159 }
160
channel<T>() -> (Sender<T>, Receiver<T>)161 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
162 let (s, r) = cc::unbounded();
163 let s = Sender { inner: s };
164 let r = Receiver { inner: r };
165 (s, r)
166 }
167
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)168 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
169 let (s, r) = cc::bounded(bound);
170 let s = SyncSender { inner: s };
171 let r = Receiver { inner: r };
172 (s, r)
173 }
174
// Test-local `select!` shim.
//
// Accepts one or more comma-separated arms of the form
// `PATTERN = rx.method() => EXPR`, where `rx` is an identifier naming one of
// the wrapper receivers defined in this file. Each arm is rewritten into a
// `method((rx).inner) -> res => { ... }` arm of
// `cc::crossbeam_channel_internal!`; the arm's result has its error mapped to
// `std::sync::mpsc::RecvError` and is bound to PATTERN before EXPR runs.
macro_rules! select {
    (
        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
    ) => ({
        // NOTE(review): presumably read by `crossbeam_channel_internal!` to
        // choose unbiased selection -- confirm against the crossbeam sources.
        const _IS_BIASED: bool = false;

        cc::crossbeam_channel_internal! {
            $(
                $meth(($rx).inner) -> res => {
                    // Present the outcome with std's error type to the arm body.
                    let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
                    $code
                }
            )+
        }
    })
}
191
192 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
193 mod channel_tests {
194 use super::*;
195
196 use std::env;
197 use std::thread;
198 use std::time::Instant;
199
/// Returns the multiplier used by the stress tests.
///
/// Reads the `RUST_TEST_STRESS` environment variable and parses it as a
/// `usize`. When the variable cannot be read, the factor defaults to `1`.
///
/// # Panics
///
/// Panics with a message naming `RUST_TEST_STRESS` when the variable is set
/// but is not a valid `usize`.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        // Name the variable in the panic so a bad value is easy to diagnose.
        Ok(val) => val
            .parse()
            .expect("RUST_TEST_STRESS must be a non-negative integer"),
        Err(..) => 1,
    }
}
206
/// A value sent on an unbounded channel is received unchanged.
#[test]
fn smoke() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
213
/// Leaves a boxed message queued when both halves go out of scope.
#[test]
fn drop_full() {
    let (sender, _receiver) = channel::<Box<isize>>();
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
219
/// Same as `drop_full`, but after two sender clones were created and dropped.
#[test]
fn drop_full_shared() {
    let (sender, _receiver) = channel::<Box<isize>>();
    for _ in 0..2 {
        drop(sender.clone());
    }
    sender.send(Box::new(1)).unwrap();
}
227
/// A round trip works through the original sender and through a clone of it.
#[test]
fn smoke_shared() {
    let (first_tx, receiver) = channel::<i32>();
    first_tx.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);

    let second_tx = first_tx.clone();
    second_tx.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
237
/// A value sent from a spawned thread is received on the spawning thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
    producer.join().unwrap();
}
247
/// Sending after the receiver was dropped reports an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
254
/// Sending after the receiver was dropped reports an error.
#[test]
fn smoke_shared_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
261
/// A sender cloned after the receiver was dropped also fails to send.
#[test]
fn smoke_shared_port_gone2() {
    let (original, receiver) = channel::<i32>();
    drop(receiver);
    let survivor = original.clone();
    drop(original);
    assert!(survivor.send(1).is_err());
}
270
/// Keeps sending until the receiving thread, which takes one message and
/// exits, has dropped its receiver.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
280
/// Like `port_gone_concurrent`, but alternates between two sender handles.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = channel::<i32>();
    let twin = sender.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits: `twin` is only tried when `sender` succeeded.
        if sender.send(1).is_err() || twin.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
291
/// Receiving after the only sender was dropped reports an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
298
/// Receiving after both sender handles were dropped reports an error.
#[test]
fn smoke_chan_gone_shared() {
    let (original, receiver) = channel::<()>();
    let duplicate = original.clone();
    drop(original);
    drop(duplicate);
    assert!(receiver.recv().is_err());
}
307
/// Drains the channel until the sending thread, which sends twice and exits,
/// has dropped its sender.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
    producer.join().unwrap();
}
318
/// One producer thread sends a fixed number of messages that the test thread
/// receives one by one (100 under Miri, 10000 otherwise).
#[test]
fn stress() {
    let count: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        (0..count).for_each(|_| sender.send(1).unwrap());
    });
    (0..count).for_each(|_| assert_eq!(receiver.recv().unwrap(), 1));
    producer.join().ok().unwrap();
}
337
/// Several producer threads share one channel; a consumer thread expects
/// exactly `amt * nthreads` messages and an error from `try_recv` afterwards.
#[test]
fn stress_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 10_000 };
    let nthreads: u32 = if cfg!(miri) { 4 } else { 8 };
    let (sender, receiver) = channel::<i32>();

    let consumer = thread::spawn(move || {
        let expected = amt * nthreads;
        for _ in 0..expected {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
        assert!(receiver.try_recv().is_err());
    });

    let mut producers = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            for _ in 0..amt {
                handle.send(1).unwrap();
            }
        }));
    }
    drop(sender);
    consumer.join().ok().unwrap();
    for producer in producers {
        producer.join().unwrap();
    }
}
367
/// A consumer thread signals that it started, then receives 40 messages sent
/// by a second thread that is spawned only after the signal arrived.
#[test]
fn send_from_outside_runtime() {
    let (ready_tx, ready_rx) = channel::<()>();
    let (data_tx, data_rx) = channel::<i32>();
    let consumer = thread::spawn(move || {
        ready_tx.send(()).unwrap();
        for _ in 0..40 {
            let got = data_rx.recv().unwrap();
            assert_eq!(got, 1);
        }
    });
    ready_rx.recv().unwrap();
    let producer = thread::spawn(move || {
        for _ in 0..40 {
            data_tx.send(1).unwrap();
        }
    });
    consumer.join().ok().unwrap();
    producer.join().ok().unwrap();
}
387
/// A spawned thread receives 40 messages sent by the test thread.
#[test]
fn recv_from_outside_runtime() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        for _ in 0..40 {
            let got = receiver.recv().unwrap();
            assert_eq!(got, 1);
        }
    });
    for _ in 0..40 {
        sender.send(1).unwrap();
    }
    consumer.join().ok().unwrap();
}
401
/// Two threads exchange one message each over a pair of channels.
#[test]
fn no_runtime() {
    let (ping_tx, ping_rx) = channel::<i32>();
    let (pong_tx, pong_rx) = channel::<i32>();
    let responder = thread::spawn(move || {
        assert_eq!(ping_rx.recv().unwrap(), 1);
        pong_tx.send(2).unwrap();
    });
    let initiator = thread::spawn(move || {
        ping_tx.send(1).unwrap();
        assert_eq!(pong_rx.recv().unwrap(), 2);
    });
    responder.join().ok().unwrap();
    initiator.join().ok().unwrap();
}
417
/// Closes an unused channel, dropping the receiver before the sender.
#[test]
fn oneshot_single_thread_close_port_first() {
    // Nothing is ever sent; the sender is dropped at the end of scope.
    let (_sender, receiver) = channel::<i32>();
    drop(receiver);
}
424
/// Closes an unused channel, dropping the sender before the receiver.
#[test]
fn oneshot_single_thread_close_chan_first() {
    // Nothing is ever sent; the receiver is dropped at the end of scope.
    let (sender, _receiver) = channel::<i32>();
    drop(sender);
}
431
/// Sending a boxed payload after the receiver was dropped reports an error.
#[test]
fn oneshot_single_thread_send_port_close() {
    // The sender is expected to clean up the payload once the receiver is gone.
    let (sender, receiver) = channel::<Box<i32>>();
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
439
/// `recv` yields `Err(RecvError)` once the sender has been dropped.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
446
/// A boxed value sent on the channel comes back with the same contents.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = channel::<Box<i32>>();
    sender.send(Box::new(10)).unwrap();
    let boxed = receiver.recv().unwrap();
    assert!(*boxed == 10);
}
453
/// `send` succeeds while the receiver is alive and the value is delivered.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<i32>();
    let outcome = sender.send(10);
    assert!(outcome.is_ok());
    assert!(receiver.recv().unwrap() == 10);
}
460
/// `send` fails once the receiver has been dropped.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(10);
    assert!(outcome.is_err());
}
467
/// `recv` returns `Ok(10)` after 10 was sent.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<i32>();
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
474
/// `recv` fails once the sender has been dropped.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
481
/// `try_recv` reports `Empty` before a send and the value after it.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = channel::<i32>();
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
489
/// `try_recv` keeps reporting `Disconnected` after the sender was dropped.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
497
/// `try_recv` reports `Empty` while the sender is alive and nothing was sent.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<i32>();
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
503
/// A receiver thread is spawned first; the boxed value sent afterwards
/// reaches it.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = channel::<Box<i32>>();
    let consumer = thread::spawn(move || {
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
514
/// One thread drops the sender while another expects `Err(RecvError)` from
/// `recv`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    let closer = thread::spawn(move || {
        drop(sender);
    });
    let waiter = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });
    waiter.join().unwrap();
    closer.join().unwrap();
}
528
/// Repeats `stress_factor()` times: the receiver is dropped on a spawned
/// thread while the sender is dropped on the test thread.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        droppers.push(thread::spawn(move || {
            drop(receiver);
        }));
        drop(sender);
    }
    for dropper in droppers {
        dropper.join().unwrap();
    }
}
545
/// Repeats `stress_factor()` times: the receiver is dropped on one thread
/// while another thread sends; the outcome of the send is ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        droppers.push(thread::spawn(move || {
            drop(receiver);
        }));
        // The sending thread is joined right away, inside the loop.
        let producer = thread::spawn(move || {
            let _ = sender.send(1);
        });
        producer.join().unwrap();
    }
    for dropper in droppers {
        dropper.join().unwrap();
    }
}
566
/// Repeats `stress_factor()` times: a nested thread expects `Err(RecvError)`
/// from `recv` while another nested thread drops the sender.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    let mut outer_threads = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();

        let recv_side = thread::spawn(move || {
            let waiter = thread::spawn(move || {
                assert_eq!(receiver.recv(), Err(RecvError));
            });
            waiter.join().unwrap();
        });
        outer_threads.push(recv_side);

        let close_side = thread::spawn(move || {
            let closer = thread::spawn(move || {
                drop(sender);
            });
            closer.join().unwrap();
        });
        outer_threads.push(close_side);
    }
    for outer in outer_threads {
        outer.join().unwrap();
    }
}
593
/// Repeats `stress_factor()` times: a spawned thread sends one boxed value
/// that the test thread receives.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    let mut producers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<Box<isize>>();
        producers.push(thread::spawn(move || {
            sender.send(Box::new(10)).unwrap();
        }));
        let boxed = receiver.recv().unwrap();
        assert!(*boxed == 10);
    }
    for producer in producers {
        producer.join().unwrap();
    }
}
610
/// Streams the values `0..10` through a channel, moving the sender and the
/// receiver to a freshly spawned thread for every step. Repeated
/// `stress_factor()` times.
///
/// Every nested thread is joined by the thread that spawned it, so a failed
/// `unwrap` or assertion anywhere in the chain reaches the test thread
/// instead of being lost with a dropped `JoinHandle`.
#[test]
fn stream_send_recv_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = channel();

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t2) = recv(rx, 0) {
            ts.push(t2);
        }

        // Sends `i` on a new thread, then hands `tx` to the next step.
        // Returns `None` (dropping `tx`) once `i` reaches 10.
        fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                tx.send(Box::new(i)).unwrap();
                // Join the next step so its panics propagate up the chain.
                if let Some(next) = send(tx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }

        // Receives on a new thread and checks the value equals `i`, then
        // hands `rx` to the next step. Returns `None` (dropping `rx`) once
        // `i` reaches 10.
        fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
            if i == 10 {
                return None;
            }

            Some(thread::spawn(move || {
                assert!(*rx.recv().unwrap() == i);
                // Join the next step so its panics propagate up the chain.
                if let Some(next) = recv(rx, i + 1) {
                    next.join().unwrap();
                }
            }))
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
651
/// `recv_timeout` returns queued messages and reports `Timeout` on an empty
/// channel whose sender is still alive.
#[test]
fn oneshot_single_thread_recv_timeout() {
    let (sender, receiver) = channel();
    let wait = Duration::from_millis(1);

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
664
/// A producer sleeps for twice the receive timeout before every other send;
/// the consumer retries on `Timeout` and must still count every message.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;
    let timeout = Duration::from_millis(100);

    let producer = thread::spawn(move || {
        for i in 0..stress {
            let is_even = i % 2 == 0;
            if is_even {
                thread::sleep(timeout * 2);
            }
            sender.send(1usize).unwrap();
        }
    });

    let mut recv_count = 0;
    loop {
        let value = match receiver.recv_timeout(timeout) {
            Ok(value) => value,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1usize);
        recv_count += 1;
    }

    assert_eq!(recv_count, stress);
    producer.join().unwrap()
}
695
/// With a cloned sender alive, `recv_timeout` on an empty channel reports
/// `Timeout`, and at least the requested time has elapsed by then.
#[test]
fn recv_timeout_upgrade() {
    let (sender, receiver) = channel::<()>();
    let timeout = Duration::from_millis(1);
    let _second_sender = sender.clone();

    let started = Instant::now();
    let outcome = receiver.recv_timeout(timeout);
    assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
    assert!(started.elapsed() >= timeout);
}
706
/// Many producer threads each send one message after a staggered sleep; the
/// consumer polls with a 10 ms timeout until the channel disconnects and must
/// have counted every message.
#[test]
fn stress_recv_timeout_shared() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;

    let mut producers = Vec::with_capacity(stress);
    for i in 0..stress {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            let delay = Duration::from_millis(i as u64 * 10);
            thread::sleep(delay);
            handle.send(1usize).unwrap();
        }));
    }

    // Only the clones remain, so the channel disconnects once they finish.
    drop(sender);

    let mut recv_count = 0;
    loop {
        let value = match receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(value) => value,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1usize);
        recv_count += 1;
    }

    assert_eq!(recv_count, stress);
    for producer in producers {
        producer.join().unwrap();
    }
}
741
/// Queues many unit messages and then receives them all on the same thread
/// (50 under Miri, 10000 otherwise).
#[test]
fn recv_a_lot() {
    let n: usize = if cfg!(miri) { 50 } else { 10000 };

    // Regression test that we don't run out of stack in scheduler context
    let (sender, receiver) = channel();
    (0..n).for_each(|_| sender.send(()).unwrap());
    (0..n).for_each(|_| receiver.recv().unwrap());
}
758
/// After five cloned senders delivered one message each, `recv_timeout`
/// reports `Timeout` on the empty channel and then delivers a further message
/// sent through the original sender.
#[test]
fn shared_recv_timeout() {
    let (sender, receiver) = channel();
    let total = 5;
    let mut producers = Vec::with_capacity(total);
    for _ in 0..total {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            handle.send(()).unwrap();
        }));
    }

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    let wait = Duration::from_millis(1);
    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    for producer in producers {
        producer.join().unwrap();
    }
}
786
/// `stress_factor() + 100` threads each send one message through a cloned
/// sender; the test thread receives that many messages.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let total = stress_factor() + 100;
    let mut producers = Vec::with_capacity(total);
    for _ in 0..total {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            handle.send(()).unwrap();
        }));
    }

    (0..total).for_each(|_| receiver.recv().unwrap());
    for producer in producers {
        producer.join().unwrap();
    }
}
807
/// A thread sums everything yielded by `iter()` until the sender is dropped,
/// then reports the total over a second channel.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = channel::<i32>();
    let (total_tx, total_rx) = channel::<i32>();

    let summer = thread::spawn(move || {
        let acc: i32 = receiver.iter().sum();
        total_tx.send(acc).unwrap();
    });

    for value in [3, 1, 2] {
        sender.send(value).unwrap();
    }
    drop(sender);
    assert_eq!(total_rx.recv().unwrap(), 6);
    summer.join().unwrap();
}
828
/// A thread accumulates values from `iter()` and leaves the loop early once
/// the running total has reached 3; the reported total must be 4.
#[test]
fn test_recv_iter_break() {
    let (sender, receiver) = channel::<i32>();
    let (count_tx, count_rx) = channel();

    let counter = thread::spawn(move || {
        let mut count = 0;
        for x in receiver.iter() {
            if count >= 3 {
                break;
            }
            count += x;
        }
        count_tx.send(count).unwrap();
    });

    for _ in 0..3 {
        sender.send(2).unwrap();
    }
    // The counting thread may already have finished, so this send may fail.
    let _ = sender.send(2);
    drop(sender);
    assert_eq!(count_rx.recv().unwrap(), 4);
    counter.join().unwrap();
}
854
/// A worker drains `try_iter()` and asks for another value whenever it runs
/// dry; it returns once its running total equals 6.
#[test]
fn test_recv_try_iter() {
    let (request_tx, request_rx) = channel();
    let (response_tx, response_rx) = channel();

    // Request `x`s until we have `6`.
    let worker = thread::spawn(move || {
        let mut total = 0;
        loop {
            for x in response_rx.try_iter() {
                total += x;
                if total == 6 {
                    return total;
                }
            }
            request_tx.send(()).unwrap();
        }
    });

    // Answer each request with a 2 until the worker has gone away.
    for _ in request_rx.iter() {
        let delivered = response_tx.send(2);
        if delivered.is_err() {
            break;
        }
    }

    assert_eq!(worker.join().unwrap(), 6);
}
882
/// The owning iterator yields the queued values and then `None` once the
/// sender, dropped at the end of the inner scope, is gone.
#[test]
fn test_recv_into_iter_owned() {
    let mut messages = {
        let (sender, receiver) = channel::<i32>();
        for value in [1, 2] {
            sender.send(value).unwrap();
        }

        receiver.into_iter()
    };
    assert_eq!(messages.next().unwrap(), 1);
    assert_eq!(messages.next().unwrap(), 2);
    assert!(messages.next().is_none());
}
896
/// The borrowing iterator yields the queued values and then `None` after the
/// sender was dropped.
#[test]
fn test_recv_into_iter_borrowed() {
    let (sender, receiver) = channel::<i32>();
    for value in [1, 2] {
        sender.send(value).unwrap();
    }
    drop(sender);
    let mut messages = IntoIterator::into_iter(&receiver);
    assert_eq!(messages.next().unwrap(), 1);
    assert_eq!(messages.next().unwrap(), 2);
    assert!(messages.next().is_none());
}
908
/// Steps a helper thread through "send a value" and "drop the sender", and
/// checks that `try_recv` reports `Empty`, the value, `Empty`, and finally
/// `Disconnected`.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (go_tx, go_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();
    let helper = thread::spawn(move || {
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    helper.join().unwrap();
}
933
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = channel();
    let (done_tx, done_rx) = channel();
    let child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        done_tx.send(()).unwrap();
    });
    // make sure the other thread has gone to sleep
    (0..5000).for_each(|_| thread::yield_now());

    // upgrade to a shared chan and send a message
    let cloned = sender.clone();
    drop(sender);
    cloned.send(()).unwrap();

    // wait for the child thread to exit before we exit
    done_rx.recv().unwrap();
    child.join().unwrap();
}
959
/// With the receiver discarded at creation, a repeated `send` still hands the
/// value back inside `SendError`.
#[test]
fn issue_32114() {
    // `_` (not a named binding) drops the receiver immediately.
    let (sender, _) = channel();
    let _ = sender.send(123);
    let second = sender.send(123);
    assert_eq!(second, Err(SendError(123)));
}
966 }
967
968 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
969 mod sync_channel_tests {
970 use super::*;
971
972 use std::env;
973 use std::thread;
974
/// Returns the multiplier used by the stress tests.
///
/// Reads the `RUST_TEST_STRESS` environment variable and parses it as a
/// `usize`. When the variable cannot be read, the factor defaults to `1`.
///
/// # Panics
///
/// Panics with a message naming `RUST_TEST_STRESS` when the variable is set
/// but is not a valid `usize`.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        // Name the variable in the panic so a bad value is easy to diagnose.
        Ok(val) => val
            .parse()
            .expect("RUST_TEST_STRESS must be a non-negative integer"),
        Err(..) => 1,
    }
}
981
/// A value sent on a channel with bound 1 is received unchanged.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
}
988
/// Leaves a boxed message queued in a bound-1 channel when both halves go out
/// of scope.
#[test]
fn drop_full() {
    let (sender, _receiver) = sync_channel::<Box<isize>>(1);
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
994
/// A round trip works through the original sender and through a clone of it.
#[test]
fn smoke_shared() {
    let (first_tx, receiver) = sync_channel::<i32>(1);
    first_tx.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);

    let second_tx = first_tx.clone();
    second_tx.send(1).unwrap();
    assert_eq!(receiver.recv().unwrap(), 1);
}
1004
/// `recv_timeout` reports `Timeout` on an empty bound-1 channel and delivers
/// a value once one has been sent.
#[test]
fn recv_timeout() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let wait = Duration::from_millis(1);

    assert_eq!(receiver.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    sender.send(1).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(1));
}
1015
/// A value sent from a spawned thread over a bound-0 channel is received on
/// the spawning thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let got = receiver.recv().unwrap();
    assert_eq!(got, 1);
    producer.join().unwrap();
}
1025
/// Sending after the receiver was dropped reports an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1032
/// A sender cloned after the receiver was dropped also fails to send.
#[test]
fn smoke_shared_port_gone2() {
    let (original, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let survivor = original.clone();
    drop(original);
    assert!(survivor.send(1).is_err());
}
1041
/// Keeps sending until the receiving thread, which takes one message and
/// exits, has dropped its receiver.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1051
/// Like `port_gone_concurrent`, but alternates between two sender handles.
#[test]
fn port_gone_concurrent_shared() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let twin = sender.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits: `twin` is only tried when `sender` succeeded.
        if sender.send(1).is_err() || twin.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1062
/// Receiving after the only sender was dropped reports an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1069
/// Receiving after both sender handles were dropped reports an error.
#[test]
fn smoke_chan_gone_shared() {
    let (original, receiver) = sync_channel::<()>(0);
    let duplicate = original.clone();
    drop(original);
    drop(duplicate);
    assert!(receiver.recv().is_err());
}
1078
/// Drains the channel until the sending thread, which sends twice and exits,
/// has dropped its sender.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    loop {
        if receiver.recv().is_err() {
            break;
        }
    }
    producer.join().unwrap();
}
1089
/// One producer thread pushes a fixed number of messages through a bound-0
/// channel to the test thread (100 under Miri, 10000 otherwise).
#[test]
fn stress() {
    let n: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        (0..n).for_each(|_| sender.send(1).unwrap());
    });
    (0..n).for_each(|_| assert_eq!(receiver.recv().unwrap(), 1));
    producer.join().unwrap();
}
1108
/// One producer pushes a fixed number of messages through a bound-0 channel
/// while the consumer polls with a 1 ms timeout until the channel
/// disconnects; every message must have been counted.
#[test]
fn stress_recv_timeout_two_threads() {
    let n: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender, receiver) = sync_channel::<i32>(0);

    let producer = thread::spawn(move || {
        for _ in 0..n {
            sender.send(1).unwrap();
        }
    });

    let mut recv_count = 0;
    loop {
        let value = match receiver.recv_timeout(Duration::from_millis(1)) {
            Ok(value) => value,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        };
        assert_eq!(value, 1);
        recv_count += 1;
    }

    assert_eq!(recv_count, n);
    producer.join().unwrap();
}
1139
/// Eight producers share a bound-0 channel; a consumer thread polls with a
/// 10 ms timeout until the channel disconnects, checks the total count, and
/// signals completion over a second bound-0 channel.
#[test]
fn stress_recv_timeout_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 1000 };
    let nthreads: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        let mut recv_count = 0;
        loop {
            let value = match receiver.recv_timeout(Duration::from_millis(10)) {
                Ok(value) => value,
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            };
            assert_eq!(value, 1);
            recv_count += 1;
        }

        assert_eq!(recv_count, amt * nthreads);
        assert!(receiver.try_recv().is_err());

        done_tx.send(()).unwrap();
    });

    let mut producers = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            for _ in 0..amt {
                handle.send(1).unwrap();
            }
        }));
    }

    // Only the clones remain, so the channel disconnects once they finish.
    drop(sender);

    done_rx.recv().unwrap();
    for producer in producers {
        producer.join().unwrap();
    }
    consumer.join().unwrap();
}
1188
/// Eight producers share a bound-0 channel; a consumer thread receives
/// exactly `amt * nthreads` messages, expects an error from `try_recv`
/// afterwards, and signals completion over a second bound-0 channel.
#[test]
fn stress_shared() {
    let amt: u32 = if cfg!(miri) { 100 } else { 1000 };
    let nthreads: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_tx, done_rx) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        let expected = amt * nthreads;
        for _ in 0..expected {
            assert_eq!(receiver.recv().unwrap(), 1);
        }
        assert!(receiver.try_recv().is_err());
        done_tx.send(()).unwrap();
    });

    let mut producers = Vec::with_capacity(nthreads as usize);
    for _ in 0..nthreads {
        let handle = sender.clone();
        producers.push(thread::spawn(move || {
            for _ in 0..amt {
                handle.send(1).unwrap();
            }
        }));
    }
    drop(sender);
    done_rx.recv().unwrap();
    for producer in producers {
        producer.join().unwrap();
    }
    consumer.join().unwrap();
}
1224
/// Closes an unused bound-0 channel, dropping the receiver before the sender.
#[test]
fn oneshot_single_thread_close_port_first() {
    // Nothing is ever sent; the sender is dropped at the end of scope.
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
1231
/// Closes an unused bound-0 channel, dropping the sender before the receiver.
#[test]
fn oneshot_single_thread_close_chan_first() {
    // Nothing is ever sent; the receiver is dropped at the end of scope.
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
1238
/// Sending a boxed payload after the receiver was dropped reports an error.
#[test]
fn oneshot_single_thread_send_port_close() {
    // The sender is expected to clean up the payload once the receiver is gone.
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
1246
/// `recv` yields `Err(RecvError)` once the sender has been dropped.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
1253
/// A boxed value sent on a bound-1 channel comes back with the same contents.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = sync_channel::<Box<i32>>(1);
    sender.send(Box::new(10)).unwrap();
    let boxed = receiver.recv().unwrap();
    assert!(*boxed == 10);
}
1260
/// `try_send` succeeds on an empty bound-1 channel and the value is delivered.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Ok(()));
    assert!(receiver.recv().unwrap() == 10);
}
1267
// `try_send` after the receiver was dropped reports `Disconnected` and returns the value.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Disconnected(10)));
}
1274
// With bound 0 and a live receiver that is not currently receiving, `try_send` reports `Full`.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Full(10)));
}
1280
// A value buffered in a bound-1 channel is returned by `recv`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
1287
// `recv` returns an error once the only sender has been dropped.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1294
// A message buffered before the sender was dropped is still delivered; only the
// following `try_recv` reports `Disconnected`.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);

    let buffered = receiver.try_recv();
    assert_eq!(buffered, Ok(10));

    let after_drain = receiver.try_recv();
    assert_eq!(after_drain, Err(TryRecvError::Disconnected));
}
1303
// `try_recv` reports `Empty` before anything was sent and returns the value afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let before_send = receiver.try_recv();
    assert_eq!(before_send, Err(TryRecvError::Empty));

    sender.send(10).unwrap();

    let after_send = receiver.try_recv();
    assert_eq!(after_send, Ok(10));
}
1311
// After the sender is dropped, `try_recv` keeps reporting `Disconnected` on repeated calls.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
1319
// With the sender alive and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
1325
// A spawned thread receives the boxed value that the test thread sends over a bound-0 channel.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let consumer = thread::spawn(move || {
        let payload = receiver.recv().unwrap();
        assert!(*payload == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
1336
// One thread drops the sender while another calls `recv`, which must fail with `RecvError`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    let closer = thread::spawn(move || drop(sender));
    let waiter = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });
    // The receiving thread is joined first, then the one that dropped the sender.
    waiter.join().unwrap();
    closer.join().unwrap();
}
1350
// Repeats `stress_factor()` times: drop the receiver on a spawned thread while the
// sender is dropped on the test thread.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        handles.push(thread::spawn(move || drop(receiver)));
        drop(sender);
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
1367
// Repeats `stress_factor()` times: one thread drops the receiver while another thread
// sends. The send result is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);
        droppers.push(thread::spawn(move || drop(receiver)));

        // The sending thread is joined right away, inside the loop.
        let producer = thread::spawn(move || {
            let _ = sender.send(1);
        });
        producer.join().unwrap();
    }
    for dropper in droppers {
        dropper.join().unwrap();
    }
}
1388
// Repeats `stress_factor()` times: a nested thread calls `recv` while another nested
// thread drops the only sender; the receive must fail with `RecvError`.
//
// Every nested thread is joined by the thread that spawned it, so joining the outer
// handles at the end waits for all threads started by this test.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<i32>(0);
        let t = thread::spawn(move || {
            thread::spawn(move || {
                assert_eq!(rx.recv(), Err(RecvError));
            })
            .join()
            .unwrap();
        });
        ts.push(t);
        let t2 = thread::spawn(move || {
            // Join the nested thread too; previously it was detached and could
            // outlive the test.
            thread::spawn(move || {
                drop(tx);
            })
            .join()
            .unwrap();
        });
        ts.push(t2);
    }
    for t in ts {
        t.join().unwrap();
    }
}
1414
// Repeats `stress_factor()` times: a spawned thread sends a boxed 10 over a fresh
// bound-0 channel and the test thread receives and checks it.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    let mut producers = Vec::with_capacity(rounds);
    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<Box<i32>>(0);
        producers.push(thread::spawn(move || {
            sender.send(Box::new(10)).unwrap();
        }));
        let payload = receiver.recv().unwrap();
        assert!(*payload == 10);
    }
    for producer in producers {
        producer.join().unwrap();
    }
}
1431
// Repeats `stress_factor()` times: a chain of threads sends the values 0..10 over a
// bound-0 channel, one value per thread, while a parallel chain of threads receives
// them and checks each value.
//
// Each thread in a chain joins the thread it spawns for the next value. Joining the
// first handle of a chain therefore waits for the whole chain, and a panic in any
// nested thread (a failed assertion, for instance) propagates to the test instead of
// being lost in a detached thread.
#[test]
fn stream_send_recv_stress() {
    // Sends `i`, then hands the sender to a new thread for `i + 1`; stops at 10.
    fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            if let Some(next) = send(tx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    // Receives and checks `i`, then hands the receiver to a new thread for `i + 1`;
    // stops at 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            assert!(*rx.recv().unwrap() == i);
            if let Some(next) = recv(rx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t) = recv(rx, 0) {
            ts.push(t);
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
1472
// Regression test that we don't run out of stack in scheduler context: fill a channel
// with bound `N`, then drain it, all on one thread. `N` is reduced under Miri.
#[test]
fn recv_a_lot() {
    #[cfg(miri)]
    const N: usize = 100;
    #[cfg(not(miri))]
    const N: usize = 10000;

    let (sender, receiver) = sync_channel::<()>(N);

    let mut sent = 0;
    while sent < N {
        sender.send(()).unwrap();
        sent += 1;
    }

    let mut received = 0;
    while received < N {
        receiver.recv().unwrap();
        received += 1;
    }
}
1489
// Spawns `stress_factor() + 100` threads that each send one unit message through a
// clone of the same sender; the test thread receives exactly that many messages.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel::<()>(0);
    let total = stress_factor() + 100;

    let mut producers = Vec::with_capacity(total);
    for _ in 0..total {
        let sender = sender.clone();
        producers.push(thread::spawn(move || {
            sender.send(()).unwrap();
        }));
    }

    let mut remaining = total;
    while remaining > 0 {
        receiver.recv().unwrap();
        remaining -= 1;
    }

    for producer in producers {
        producer.join().unwrap();
    }
}
1510
// A worker thread sums everything yielded by `rx.iter()` and reports the sum on a
// second channel once the iteration ends (after the sender is dropped).
#[test]
fn test_nested_recv_iter() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let summer = thread::spawn(move || {
        let mut sum = 0;
        for value in value_rx.iter() {
            sum += value;
        }
        sum_tx.send(sum).unwrap();
    });

    for value in [3, 1, 2] {
        value_tx.send(value).unwrap();
    }
    drop(value_tx);

    let reported = sum_rx.recv().unwrap();
    assert_eq!(reported, 6);
    summer.join().unwrap();
}
1531
// A worker thread adds up values from `rx.iter()` but breaks out of the loop as soon
// as the running total is at least 3; the total it reports must be 4.
#[test]
fn test_recv_iter_break() {
    let (value_tx, value_rx) = sync_channel::<i32>(0);
    let (total_tx, total_rx) = sync_channel(0);

    let worker = thread::spawn(move || {
        let mut total = 0;
        for value in value_rx.iter() {
            if total >= 3 {
                break;
            }
            total += value;
        }
        total_tx.send(total).unwrap();
    });

    value_tx.send(2).unwrap();
    value_tx.send(2).unwrap();
    value_tx.send(2).unwrap();
    // Best-effort extra message; its result is intentionally ignored.
    let _ = value_tx.try_send(2);
    drop(value_tx);

    let reported = total_rx.recv().unwrap();
    assert_eq!(reported, 4);
    worker.join().unwrap();
}
1557
// Steps a helper thread through "send a value" and "drop the sender" using two
// signalling channels, and checks that `try_recv` reports Empty, Ok(1), Empty and
// finally Disconnected at the corresponding points.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (ack_tx, ack_rx) = sync_channel::<()>(1);

    let helper = thread::spawn(move || {
        // Step 1: wait for the go signal, send one value, acknowledge.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        ack_tx.send(()).unwrap();

        // Step 2: wait for the go signal, drop the data sender, acknowledge.
        go_rx.recv().unwrap();
        drop(data_tx);
        ack_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    ack_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    helper.join().unwrap();
}
1582
// This bug used to end up in a livelock inside of the Receiver destructor
// because the internal state of the Shared packet was corrupted
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (tx, rx) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);
    let child = thread::spawn(move || {
        rx.recv().unwrap(); // wait on a oneshot
        drop(rx); // destroy a shared
        exit_tx.send(()).unwrap();
    });

    // Yield repeatedly to give the child thread a chance to start waiting in `recv`.
    let mut spins = 0;
    while spins < 5000 {
        thread::yield_now();
        spins += 1;
    }

    // Clone the sender, drop the original, and send through the clone.
    let cloned_tx = tx.clone();
    drop(tx);
    cloned_tx.send(()).unwrap();

    // Wait for the child thread to signal that it is done before we exit.
    exit_rx.recv().unwrap();
    child.join().unwrap();
}
1608
// `send` on a bound-0 channel returns `Ok(())` when another thread receives the value.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    let outcome = sender.send(1);
    assert_eq!(outcome, Ok(()));
    consumer.join().unwrap();
}
1618
// `send` on a bound-0 channel fails when the other thread only drops the receiver.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let dropper = thread::spawn(move || drop(receiver));
    let outcome = sender.send(1);
    assert!(outcome.is_err());
    dropper.join().unwrap();
}
1628
// With bound 1, the first `send` succeeds immediately; the second one fails because
// another thread drops the receiver without ever receiving.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let first = sender.send(1);
    assert_eq!(first, Ok(()));

    let dropper = thread::spawn(move || drop(receiver));

    let second = sender.send(1);
    assert!(second.is_err());
    dropper.join().unwrap();
}
1639
// Two threads send through clones of the same bound-0 sender; the test thread drops
// the receiver, so both sends must fail. Each thread then reports completion on a
// separate channel.
#[test]
fn send4() {
    let (first_tx, receiver) = sync_channel::<i32>(0);
    let second_tx = first_tx.clone();
    let (first_done, done_rx) = channel::<()>();
    let second_done = first_done.clone();

    let first = thread::spawn(move || {
        assert!(first_tx.send(1).is_err());
        first_done.send(()).unwrap();
    });
    let second = thread::spawn(move || {
        assert!(second_tx.send(2).is_err());
        second_done.send(()).unwrap();
    });

    drop(receiver);

    // One completion message per sending thread.
    for _ in 0..2 {
        done_rx.recv().unwrap();
    }
    first.join().unwrap();
    second.join().unwrap();
}
1660
// `try_send` on a bound-0 channel with nobody receiving reports `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let outcome = sender.try_send(1);
    assert_eq!(outcome, Err(TrySendError::Full(1)));
}
1666
// With bound 1, the first `try_send` succeeds and the second reports `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
1673
// After the receiver is dropped, `try_send` reports `Disconnected` even though an
// earlier message was accepted.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    drop(receiver);

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
1681
// Runs a ping-pong over two bound-3 channels 100 times: the test thread `try_send`s a
// ping, a spawned thread receives it and `try_send`s a pong, and the test thread
// receives the pong. The spawned thread's handle is not joined.
#[test]
fn issue_15761() {
    fn repro() {
        let (ping_tx, ping_rx) = sync_channel::<()>(3);
        let (pong_tx, pong_rx) = sync_channel::<()>(3);

        let _responder = thread::spawn(move || {
            ping_rx.recv().unwrap();
            pong_tx.try_send(()).unwrap();
        });

        ping_tx.try_send(()).unwrap();
        pong_rx.recv().unwrap();
    }

    let mut iteration = 0;
    while iteration < 100 {
        repro();
        iteration += 1;
    }
}
1701 }
1702
1703 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1704 mod select_tests {
1705 use super::*;
1706
1707 use std::thread;
1708
// Checks which `select!` arm fires as two channels go through: first has data,
// second has data, first disconnected, second disconnected.
#[test]
fn smoke() {
    let (first_tx, first_rx) = channel::<i32>();
    let (second_tx, second_rx) = channel::<i32>();

    // Only the first channel holds a message.
    first_tx.send(1).unwrap();
    select! {
        got = first_rx.recv() => assert_eq!(got.unwrap(), 1),
        _other = second_rx.recv() => panic!()
    }

    // Only the second channel holds a message.
    second_tx.send(2).unwrap();
    select! {
        _other = first_rx.recv() => panic!(),
        got = second_rx.recv() => assert_eq!(got.unwrap(), 2)
    }

    // The first channel is disconnected; the second is still open and empty.
    drop(first_tx);
    select! {
        got = first_rx.recv() => assert!(got.is_err()),
        _other = second_rx.recv() => panic!()
    }

    // Selecting over the disconnected second channel alone yields an error.
    drop(second_tx);
    select! {
        got = second_rx.recv() => assert!(got.is_err())
    }
}
1733
// Of five channels only the last one holds a message, so only its `select!` arm may run.
#[test]
fn smoke2() {
    let (_idle_tx1, idle_rx1) = channel::<i32>();
    let (_idle_tx2, idle_rx2) = channel::<i32>();
    let (_idle_tx3, idle_rx3) = channel::<i32>();
    let (_idle_tx4, idle_rx4) = channel::<i32>();
    let (ready_tx, ready_rx) = channel::<i32>();

    ready_tx.send(4).unwrap();
    select! {
        _unexpected = idle_rx1.recv() => panic!("1"),
        _unexpected = idle_rx2.recv() => panic!("2"),
        _unexpected = idle_rx3.recv() => panic!("3"),
        _unexpected = idle_rx4.recv() => panic!("4"),
        got = ready_rx.recv() => assert_eq!(got.unwrap(), 4)
    }
}
1750
// A disconnected channel makes its `select!` arm fire with an error, while the arm of
// an open, empty channel must not run.
#[test]
fn closed() {
    let (_open_tx, open_rx) = channel::<i32>();
    let (closed_tx, closed_rx) = channel::<i32>();
    drop(closed_tx);

    select! {
        _unexpected = open_rx.recv() => panic!(),
        got = closed_rx.recv() => assert!(got.is_err())
    }
}
1762
// `select!` first waits for a value sent by a helper thread after some yields, then
// waits until that helper exits and thereby drops its sender, which makes the same
// arm fire with an error.
#[test]
fn unblocks() {
    let (data_tx, data_rx) = channel::<i32>();
    let (_idle_tx, idle_rx) = channel::<i32>();
    let (resume_tx, resume_rx) = channel::<i32>();

    let helper = thread::spawn(move || {
        for _ in 0..20 {
            thread::yield_now();
        }
        data_tx.send(1).unwrap();
        resume_rx.recv().unwrap();
        for _ in 0..20 {
            thread::yield_now();
        }
        // `data_tx` is dropped when this closure returns.
    });

    select! {
        got = data_rx.recv() => assert_eq!(got.unwrap(), 1),
        _unexpected = idle_rx.recv() => panic!()
    }
    resume_tx.send(1).unwrap();
    select! {
        got = data_rx.recv() => assert!(got.is_err()),
        _unexpected = idle_rx.recv() => panic!()
    }
    helper.join().unwrap();
}
1791
// A helper thread sends one value on each of two channels. Two consecutive `select!`
// calls must each deliver a matching value, after which both channels are empty.
#[test]
fn both_ready() {
    let (one_tx, one_rx) = channel::<i32>();
    let (two_tx, two_rx) = channel::<i32>();
    let (finish_tx, finish_rx) = channel::<()>();

    let helper = thread::spawn(move || {
        for _ in 0..20 {
            thread::yield_now();
        }
        one_tx.send(1).unwrap();
        two_tx.send(2).unwrap();
        // Keep both senders alive until the test thread has finished its checks.
        finish_rx.recv().unwrap();
    });

    select! {
        got = one_rx.recv() => { assert_eq!(got.unwrap(), 1); },
        got = two_rx.recv() => { assert_eq!(got.unwrap(), 2); }
    }
    select! {
        got = one_rx.recv() => { assert_eq!(got.unwrap(), 1); },
        got = two_rx.recv() => { assert_eq!(got.unwrap(), 2); }
    }

    assert_eq!(one_rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(two_rx.try_recv(), Err(TryRecvError::Empty));

    finish_tx.send(()).unwrap();
    helper.join().unwrap();
}
1820
// A producer sends `0..AMT`, even numbers on one channel and odd numbers on the other,
// waiting for an acknowledgement after each send. The test thread selects over both
// channels and checks that value `i` arrives on the channel matching its parity.
// `AMT` is reduced under Miri.
#[test]
fn stress() {
    #[cfg(miri)]
    const AMT: i32 = 100;
    #[cfg(not(miri))]
    const AMT: i32 = 10000;

    let (even_tx, even_rx) = channel::<i32>();
    let (odd_tx, odd_rx) = channel::<i32>();
    let (ack_tx, ack_rx) = channel::<()>();

    let producer = thread::spawn(move || {
        for i in 0..AMT {
            let target = if i % 2 == 0 { &even_tx } else { &odd_tx };
            target.send(i).unwrap();
            ack_rx.recv().unwrap();
        }
    });

    for i in 0..AMT {
        select! {
            even = even_rx.recv() => { assert!(i % 2 == 0 && i == even.unwrap()); },
            odd = odd_rx.recv() => { assert!(i % 2 == 1 && i == odd.unwrap()); }
        }
        ack_tx.send(()).unwrap();
    }
    producer.join().unwrap();
}
1852
// A helper thread creates and immediately drops a clone of the sender before sending
// through the original; `select!` on the test thread must still see the message on
// the first channel and never run the second channel's arm.
#[test]
fn cloning() {
    let (tx1, rx1) = channel::<i32>();
    let (_tx2, rx2) = channel::<i32>();
    let (tx3, rx3) = channel::<()>();

    let t = thread::spawn(move || {
        rx3.recv().unwrap();
        // Explicitly drop the temporary clone instead of discarding it as a bare
        // expression statement, so no lint suppression is needed.
        drop(tx1.clone());
        assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
        tx1.send(2).unwrap();
        // Wait for the second signal so `tx1` stays alive until the select is done.
        rx3.recv().unwrap();
    });

    tx3.send(()).unwrap();
    select! {
        _i1 = rx1.recv() => {},
        _i2 = rx2.recv() => panic!()
    }
    tx3.send(()).unwrap();
    t.join().unwrap();
}
1876
// A helper thread creates and immediately drops a clone of the sender before sending
// through the original; `select!` on the test thread must still see the message on
// the first channel and never run the second channel's arm.
#[test]
fn cloning2() {
    let (tx1, rx1) = channel::<i32>();
    let (_tx2, rx2) = channel::<i32>();
    let (tx3, rx3) = channel::<()>();

    let t = thread::spawn(move || {
        rx3.recv().unwrap();
        // Explicitly drop the temporary clone instead of discarding it as a bare
        // expression statement, so no lint suppression is needed.
        drop(tx1.clone());
        assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
        tx1.send(2).unwrap();
        // Wait for the second signal so `tx1` stays alive until the select is done.
        rx3.recv().unwrap();
    });

    tx3.send(()).unwrap();
    select! {
        _i1 = rx1.recv() => {},
        _i2 = rx2.recv() => panic!()
    }
    tx3.send(()).unwrap();
    t.join().unwrap();
}
1900
// While a helper thread sits in `select!` over two channels, the test thread clones
// and drops the first channel's sender and then sends on the second channel. Only the
// second channel's arm may run.
#[test]
fn cloning3() {
    let (quiet_tx, quiet_rx) = channel::<()>();
    let (wake_tx, wake_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();

    let selector = thread::spawn(move || {
        select! {
            _ = quiet_rx.recv() => panic!(),
            _ = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    // Yield repeatedly to give the helper a chance to enter `select!`.
    let mut spins = 0;
    while spins < 1000 {
        thread::yield_now();
        spins += 1;
    }

    drop(quiet_tx.clone());
    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    selector.join().unwrap();
}
1922
// `select!` completes when one message is already queued (result bound to a named pattern).
#[test]
fn preflight1() {
    let (sender, receiver) = channel::<()>();
    sender.send(()).unwrap();
    select! {
        _n = receiver.recv() => {}
    }
}
1931
// `select!` completes when two messages are already queued (result bound to a named pattern).
#[test]
fn preflight2() {
    let (sender, receiver) = channel::<()>();
    for _ in 0..2 {
        sender.send(()).unwrap();
    }
    select! {
        _n = receiver.recv() => {}
    }
}
1941
// `select!` completes on a queued message after a sender clone was created and dropped
// (result bound to a named pattern).
#[test]
fn preflight3() {
    let (sender, receiver) = channel::<()>();
    drop(sender.clone());
    sender.send(()).unwrap();
    select! {
        _n = receiver.recv() => {}
    }
}
1951
// `select!` completes when one message is already queued (result matched with `_`).
#[test]
fn preflight4() {
    let (sender, receiver) = channel::<()>();
    sender.send(()).unwrap();
    select! {
        _ = receiver.recv() => {}
    }
}
1960
// `select!` completes when two messages are already queued (result matched with `_`).
#[test]
fn preflight5() {
    let (sender, receiver) = channel::<()>();
    for _ in 0..2 {
        sender.send(()).unwrap();
    }
    select! {
        _ = receiver.recv() => {}
    }
}
1970
// `select!` completes on a queued message after a sender clone was created and dropped
// (result matched with `_`).
#[test]
fn preflight6() {
    let (sender, receiver) = channel::<()>();
    drop(sender.clone());
    sender.send(()).unwrap();
    select! {
        _ = receiver.recv() => {}
    }
}
1980
// `select!` completes on a channel whose only sender was dropped before anything was sent.
#[test]
fn preflight7() {
    let (sender, receiver) = channel::<()>();
    drop(sender);
    select! {
        _ = receiver.recv() => {}
    }
}
1989
// `select!` completes on a channel that was drained after its sender had been dropped.
#[test]
fn preflight8() {
    let (sender, receiver) = channel::<()>();
    sender.send(()).unwrap();
    drop(sender);

    // Consume the single queued message, leaving the channel empty and disconnected.
    receiver.recv().unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
2000
// Same as draining a disconnected channel before `select!`, but a sender clone is
// created and dropped before the message is sent.
#[test]
fn preflight9() {
    let (sender, receiver) = channel::<()>();
    drop(sender.clone());
    sender.send(()).unwrap();
    drop(sender);

    // Consume the single queued message, leaving the channel empty and disconnected.
    receiver.recv().unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
2012
// A helper thread runs `select!` on a fresh channel; the test thread yields for a
// while, then sends, and the helper confirms on a second channel that it woke up.
#[test]
fn oneshot_data_waiting() {
    let (wake_tx, wake_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();

    let selector = thread::spawn(move || {
        select! {
            _n = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    let mut spins = 0;
    while spins < 100 {
        thread::yield_now();
        spins += 1;
    }

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    selector.join().unwrap();
}
2031
// Two messages are sent and received on the channel first. A helper thread then runs
// `select!` on it; the test thread yields for a while, sends once more, and the helper
// confirms on a second channel that it woke up.
#[test]
fn stream_data_waiting() {
    let (wake_tx, wake_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();

    // Send two messages, then receive both, before the helper starts.
    wake_tx.send(()).unwrap();
    wake_tx.send(()).unwrap();
    wake_rx.recv().unwrap();
    wake_rx.recv().unwrap();

    let selector = thread::spawn(move || {
        select! {
            _n = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    let mut spins = 0;
    while spins < 100 {
        thread::yield_now();
        spins += 1;
    }

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    selector.join().unwrap();
}
2054
// A sender clone is created and dropped, then one message is sent and received. A
// helper thread then runs `select!` on the channel; the test thread yields for a
// while, sends once more, and the helper confirms on a second channel that it woke up.
#[test]
fn shared_data_waiting() {
    let (wake_tx, wake_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();

    drop(wake_tx.clone());
    wake_tx.send(()).unwrap();
    wake_rx.recv().unwrap();

    let selector = thread::spawn(move || {
        select! {
            _n = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    let mut spins = 0;
    while spins < 100 {
        thread::yield_now();
        spins += 1;
    }

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    selector.join().unwrap();
}
2076
// `select!` delivers a value that is already buffered in a `sync_channel` with bound 1.
#[test]
fn sync1() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    select! {
        got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
    }
}
2085
// `select!` delivers a value sent over a bound-0 `sync_channel` by a thread that
// yields for a while before sending.
#[test]
fn sync2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        let mut spins = 0;
        while spins < 100 {
            thread::yield_now();
            spins += 1;
        }
        sender.send(1).unwrap();
    });

    select! {
        got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
    }
    producer.join().unwrap();
}
2100
// Selects over a bound-0 `sync_channel` and a `channel`, each fed by its own thread.
// Whichever arm fires first checks its own value and then receives the other
// channel's value directly.
#[test]
fn sync3() {
    let (sync_tx, sync_rx) = sync_channel::<i32>(0);
    let (plain_tx, plain_rx) = channel::<i32>();

    let sync_producer = thread::spawn(move || {
        sync_tx.send(1).unwrap();
    });
    let plain_producer = thread::spawn(move || {
        plain_tx.send(2).unwrap();
    });

    select! {
        first = sync_rx.recv() => {
            let value = first.unwrap();
            assert_eq!(value, 1);
            assert_eq!(plain_rx.recv().unwrap(), 2);
        },
        first = plain_rx.recv() => {
            let value = first.unwrap();
            assert_eq!(value, 2);
            assert_eq!(sync_rx.recv().unwrap(), 1);
        }
    }

    sync_producer.join().unwrap();
    plain_producer.join().unwrap();
}
2126 }
2127