1 //! Tests for channel selection using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10
ms(ms: u64) -> Duration11 fn ms(ms: u64) -> Duration {
12 Duration::from_millis(ms)
13 }
14
15 #[test]
smoke1()16 fn smoke1() {
17 let (s1, r1) = unbounded::<usize>();
18 let (s2, r2) = unbounded::<usize>();
19
20 s1.send(1).unwrap();
21
22 let mut sel = Select::new();
23 let oper1 = sel.recv(&r1);
24 let oper2 = sel.recv(&r2);
25 let oper = sel.select();
26 match oper.index() {
27 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
28 i if i == oper2 => panic!(),
29 _ => unreachable!(),
30 }
31
32 s2.send(2).unwrap();
33
34 let mut sel = Select::new();
35 let oper1 = sel.recv(&r1);
36 let oper2 = sel.recv(&r2);
37 let oper = sel.select();
38 match oper.index() {
39 i if i == oper1 => panic!(),
40 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
41 _ => unreachable!(),
42 }
43 }
44
45 #[test]
smoke2()46 fn smoke2() {
47 let (_s1, r1) = unbounded::<i32>();
48 let (_s2, r2) = unbounded::<i32>();
49 let (_s3, r3) = unbounded::<i32>();
50 let (_s4, r4) = unbounded::<i32>();
51 let (s5, r5) = unbounded::<i32>();
52
53 s5.send(5).unwrap();
54
55 let mut sel = Select::new();
56 let oper1 = sel.recv(&r1);
57 let oper2 = sel.recv(&r2);
58 let oper3 = sel.recv(&r3);
59 let oper4 = sel.recv(&r4);
60 let oper5 = sel.recv(&r5);
61 let oper = sel.select();
62 match oper.index() {
63 i if i == oper1 => panic!(),
64 i if i == oper2 => panic!(),
65 i if i == oper3 => panic!(),
66 i if i == oper4 => panic!(),
67 i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)),
68 _ => unreachable!(),
69 }
70 }
71
72 #[test]
disconnected()73 fn disconnected() {
74 let (s1, r1) = unbounded::<i32>();
75 let (s2, r2) = unbounded::<i32>();
76
77 scope(|scope| {
78 scope.spawn(|_| {
79 drop(s1);
80 thread::sleep(ms(500));
81 s2.send(5).unwrap();
82 });
83
84 let mut sel = Select::new();
85 let oper1 = sel.recv(&r1);
86 let oper2 = sel.recv(&r2);
87 let oper = sel.select_timeout(ms(1000));
88 match oper {
89 Err(_) => panic!(),
90 Ok(oper) => match oper.index() {
91 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
92 i if i == oper2 => panic!(),
93 _ => unreachable!(),
94 },
95 }
96
97 r2.recv().unwrap();
98 })
99 .unwrap();
100
101 let mut sel = Select::new();
102 let oper1 = sel.recv(&r1);
103 let oper2 = sel.recv(&r2);
104 let oper = sel.select_timeout(ms(1000));
105 match oper {
106 Err(_) => panic!(),
107 Ok(oper) => match oper.index() {
108 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
109 i if i == oper2 => panic!(),
110 _ => unreachable!(),
111 },
112 }
113
114 scope(|scope| {
115 scope.spawn(|_| {
116 thread::sleep(ms(500));
117 drop(s2);
118 });
119
120 let mut sel = Select::new();
121 let oper1 = sel.recv(&r2);
122 let oper = sel.select_timeout(ms(1000));
123 match oper {
124 Err(_) => panic!(),
125 Ok(oper) => match oper.index() {
126 i if i == oper1 => assert!(oper.recv(&r2).is_err()),
127 _ => unreachable!(),
128 },
129 }
130 })
131 .unwrap();
132 }
133
134 #[test]
default()135 fn default() {
136 let (s1, r1) = unbounded::<i32>();
137 let (s2, r2) = unbounded::<i32>();
138
139 let mut sel = Select::new();
140 let _oper1 = sel.recv(&r1);
141 let _oper2 = sel.recv(&r2);
142 let oper = sel.try_select();
143 match oper {
144 Err(_) => {}
145 Ok(_) => panic!(),
146 }
147
148 drop(s1);
149
150 let mut sel = Select::new();
151 let oper1 = sel.recv(&r1);
152 let oper2 = sel.recv(&r2);
153 let oper = sel.try_select();
154 match oper {
155 Err(_) => panic!(),
156 Ok(oper) => match oper.index() {
157 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
158 i if i == oper2 => panic!(),
159 _ => unreachable!(),
160 },
161 }
162
163 s2.send(2).unwrap();
164
165 let mut sel = Select::new();
166 let oper1 = sel.recv(&r2);
167 let oper = sel.try_select();
168 match oper {
169 Err(_) => panic!(),
170 Ok(oper) => match oper.index() {
171 i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)),
172 _ => unreachable!(),
173 },
174 }
175
176 let mut sel = Select::new();
177 let _oper1 = sel.recv(&r2);
178 let oper = sel.try_select();
179 match oper {
180 Err(_) => {}
181 Ok(_) => panic!(),
182 }
183
184 let mut sel = Select::new();
185 let oper = sel.try_select();
186 match oper {
187 Err(_) => {}
188 Ok(_) => panic!(),
189 }
190 }
191
192 #[test]
timeout()193 fn timeout() {
194 let (_s1, r1) = unbounded::<i32>();
195 let (s2, r2) = unbounded::<i32>();
196
197 scope(|scope| {
198 scope.spawn(|_| {
199 thread::sleep(ms(1500));
200 s2.send(2).unwrap();
201 });
202
203 let mut sel = Select::new();
204 let oper1 = sel.recv(&r1);
205 let oper2 = sel.recv(&r2);
206 let oper = sel.select_timeout(ms(1000));
207 match oper {
208 Err(_) => {}
209 Ok(oper) => match oper.index() {
210 i if i == oper1 => panic!(),
211 i if i == oper2 => panic!(),
212 _ => unreachable!(),
213 },
214 }
215
216 let mut sel = Select::new();
217 let oper1 = sel.recv(&r1);
218 let oper2 = sel.recv(&r2);
219 let oper = sel.select_timeout(ms(1000));
220 match oper {
221 Err(_) => panic!(),
222 Ok(oper) => match oper.index() {
223 i if i == oper1 => panic!(),
224 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
225 _ => unreachable!(),
226 },
227 }
228 })
229 .unwrap();
230
231 scope(|scope| {
232 let (s, r) = unbounded::<i32>();
233
234 scope.spawn(move |_| {
235 thread::sleep(ms(500));
236 drop(s);
237 });
238
239 let mut sel = Select::new();
240 let oper = sel.select_timeout(ms(1000));
241 match oper {
242 Err(_) => {
243 let mut sel = Select::new();
244 let oper1 = sel.recv(&r);
245 let oper = sel.try_select();
246 match oper {
247 Err(_) => panic!(),
248 Ok(oper) => match oper.index() {
249 i if i == oper1 => assert!(oper.recv(&r).is_err()),
250 _ => unreachable!(),
251 },
252 }
253 }
254 Ok(_) => unreachable!(),
255 }
256 })
257 .unwrap();
258 }
259
260 #[test]
default_when_disconnected()261 fn default_when_disconnected() {
262 let (_, r) = unbounded::<i32>();
263
264 let mut sel = Select::new();
265 let oper1 = sel.recv(&r);
266 let oper = sel.try_select();
267 match oper {
268 Err(_) => panic!(),
269 Ok(oper) => match oper.index() {
270 i if i == oper1 => assert!(oper.recv(&r).is_err()),
271 _ => unreachable!(),
272 },
273 }
274
275 let (_, r) = unbounded::<i32>();
276
277 let mut sel = Select::new();
278 let oper1 = sel.recv(&r);
279 let oper = sel.select_timeout(ms(1000));
280 match oper {
281 Err(_) => panic!(),
282 Ok(oper) => match oper.index() {
283 i if i == oper1 => assert!(oper.recv(&r).is_err()),
284 _ => unreachable!(),
285 },
286 }
287
288 let (s, _) = bounded::<i32>(0);
289
290 let mut sel = Select::new();
291 let oper1 = sel.send(&s);
292 let oper = sel.try_select();
293 match oper {
294 Err(_) => panic!(),
295 Ok(oper) => match oper.index() {
296 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
297 _ => unreachable!(),
298 },
299 }
300
301 let (s, _) = bounded::<i32>(0);
302
303 let mut sel = Select::new();
304 let oper1 = sel.send(&s);
305 let oper = sel.select_timeout(ms(1000));
306 match oper {
307 Err(_) => panic!(),
308 Ok(oper) => match oper.index() {
309 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
310 _ => unreachable!(),
311 },
312 }
313 }
314
315 #[test]
default_only()316 fn default_only() {
317 let start = Instant::now();
318
319 let mut sel = Select::new();
320 let oper = sel.try_select();
321 assert!(oper.is_err());
322 let now = Instant::now();
323 assert!(now - start <= ms(50));
324
325 let start = Instant::now();
326 let mut sel = Select::new();
327 let oper = sel.select_timeout(ms(500));
328 assert!(oper.is_err());
329 let now = Instant::now();
330 assert!(now - start >= ms(450));
331 assert!(now - start <= ms(550));
332 }
333
334 #[test]
unblocks()335 fn unblocks() {
336 let (s1, r1) = bounded::<i32>(0);
337 let (s2, r2) = bounded::<i32>(0);
338
339 scope(|scope| {
340 scope.spawn(|_| {
341 thread::sleep(ms(500));
342 s2.send(2).unwrap();
343 });
344
345 let mut sel = Select::new();
346 let oper1 = sel.recv(&r1);
347 let oper2 = sel.recv(&r2);
348 let oper = sel.select_timeout(ms(1000));
349 match oper {
350 Err(_) => panic!(),
351 Ok(oper) => match oper.index() {
352 i if i == oper1 => panic!(),
353 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
354 _ => unreachable!(),
355 },
356 }
357 })
358 .unwrap();
359
360 scope(|scope| {
361 scope.spawn(|_| {
362 thread::sleep(ms(500));
363 assert_eq!(r1.recv().unwrap(), 1);
364 });
365
366 let mut sel = Select::new();
367 let oper1 = sel.send(&s1);
368 let oper2 = sel.send(&s2);
369 let oper = sel.select_timeout(ms(1000));
370 match oper {
371 Err(_) => panic!(),
372 Ok(oper) => match oper.index() {
373 i if i == oper1 => oper.send(&s1, 1).unwrap(),
374 i if i == oper2 => panic!(),
375 _ => unreachable!(),
376 },
377 }
378 })
379 .unwrap();
380 }
381
382 #[test]
both_ready()383 fn both_ready() {
384 let (s1, r1) = bounded(0);
385 let (s2, r2) = bounded(0);
386
387 scope(|scope| {
388 scope.spawn(|_| {
389 thread::sleep(ms(500));
390 s1.send(1).unwrap();
391 assert_eq!(r2.recv().unwrap(), 2);
392 });
393
394 for _ in 0..2 {
395 let mut sel = Select::new();
396 let oper1 = sel.recv(&r1);
397 let oper2 = sel.send(&s2);
398 let oper = sel.select();
399 match oper.index() {
400 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
401 i if i == oper2 => oper.send(&s2, 2).unwrap(),
402 _ => unreachable!(),
403 }
404 }
405 })
406 .unwrap();
407 }
408
409 #[test]
loop_try()410 fn loop_try() {
411 const RUNS: usize = 20;
412
413 for _ in 0..RUNS {
414 let (s1, r1) = bounded::<i32>(0);
415 let (s2, r2) = bounded::<i32>(0);
416 let (s_end, r_end) = bounded::<()>(0);
417
418 scope(|scope| {
419 scope.spawn(|_| loop {
420 let mut done = false;
421
422 let mut sel = Select::new();
423 let oper1 = sel.send(&s1);
424 let oper = sel.try_select();
425 match oper {
426 Err(_) => {}
427 Ok(oper) => match oper.index() {
428 i if i == oper1 => {
429 let _ = oper.send(&s1, 1);
430 done = true;
431 }
432 _ => unreachable!(),
433 },
434 }
435 if done {
436 break;
437 }
438
439 let mut sel = Select::new();
440 let oper1 = sel.recv(&r_end);
441 let oper = sel.try_select();
442 match oper {
443 Err(_) => {}
444 Ok(oper) => match oper.index() {
445 i if i == oper1 => {
446 let _ = oper.recv(&r_end);
447 done = true;
448 }
449 _ => unreachable!(),
450 },
451 }
452 if done {
453 break;
454 }
455 });
456
457 scope.spawn(|_| loop {
458 if let Ok(x) = r2.try_recv() {
459 assert_eq!(x, 2);
460 break;
461 }
462
463 let mut done = false;
464 let mut sel = Select::new();
465 let oper1 = sel.recv(&r_end);
466 let oper = sel.try_select();
467 match oper {
468 Err(_) => {}
469 Ok(oper) => match oper.index() {
470 i if i == oper1 => {
471 let _ = oper.recv(&r_end);
472 done = true;
473 }
474 _ => unreachable!(),
475 },
476 }
477 if done {
478 break;
479 }
480 });
481
482 scope.spawn(|_| {
483 thread::sleep(ms(500));
484
485 let mut sel = Select::new();
486 let oper1 = sel.recv(&r1);
487 let oper2 = sel.send(&s2);
488 let oper = sel.select_timeout(ms(1000));
489 match oper {
490 Err(_) => {}
491 Ok(oper) => match oper.index() {
492 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
493 i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
494 _ => unreachable!(),
495 },
496 }
497
498 drop(s_end);
499 });
500 })
501 .unwrap();
502 }
503 }
504
505 #[test]
cloning1()506 fn cloning1() {
507 scope(|scope| {
508 let (s1, r1) = unbounded::<i32>();
509 let (_s2, r2) = unbounded::<i32>();
510 let (s3, r3) = unbounded::<()>();
511
512 scope.spawn(move |_| {
513 r3.recv().unwrap();
514 drop(s1.clone());
515 assert!(r3.try_recv().is_err());
516 s1.send(1).unwrap();
517 r3.recv().unwrap();
518 });
519
520 s3.send(()).unwrap();
521
522 let mut sel = Select::new();
523 let oper1 = sel.recv(&r1);
524 let oper2 = sel.recv(&r2);
525 let oper = sel.select();
526 match oper.index() {
527 i if i == oper1 => drop(oper.recv(&r1)),
528 i if i == oper2 => drop(oper.recv(&r2)),
529 _ => unreachable!(),
530 }
531
532 s3.send(()).unwrap();
533 })
534 .unwrap();
535 }
536
537 #[test]
cloning2()538 fn cloning2() {
539 let (s1, r1) = unbounded::<()>();
540 let (s2, r2) = unbounded::<()>();
541 let (_s3, _r3) = unbounded::<()>();
542
543 scope(|scope| {
544 scope.spawn(move |_| {
545 let mut sel = Select::new();
546 let oper1 = sel.recv(&r1);
547 let oper2 = sel.recv(&r2);
548 let oper = sel.select();
549 match oper.index() {
550 i if i == oper1 => panic!(),
551 i if i == oper2 => drop(oper.recv(&r2)),
552 _ => unreachable!(),
553 }
554 });
555
556 thread::sleep(ms(500));
557 drop(s1.clone());
558 s2.send(()).unwrap();
559 })
560 .unwrap();
561 }
562
563 #[test]
preflight1()564 fn preflight1() {
565 let (s, r) = unbounded();
566 s.send(()).unwrap();
567
568 let mut sel = Select::new();
569 let oper1 = sel.recv(&r);
570 let oper = sel.select();
571 match oper.index() {
572 i if i == oper1 => drop(oper.recv(&r)),
573 _ => unreachable!(),
574 }
575 }
576
577 #[test]
preflight2()578 fn preflight2() {
579 let (s, r) = unbounded();
580 drop(s.clone());
581 s.send(()).unwrap();
582 drop(s);
583
584 let mut sel = Select::new();
585 let oper1 = sel.recv(&r);
586 let oper = sel.select();
587 match oper.index() {
588 i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())),
589 _ => unreachable!(),
590 }
591
592 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
593 }
594
595 #[test]
preflight3()596 fn preflight3() {
597 let (s, r) = unbounded();
598 drop(s.clone());
599 s.send(()).unwrap();
600 drop(s);
601 r.recv().unwrap();
602
603 let mut sel = Select::new();
604 let oper1 = sel.recv(&r);
605 let oper = sel.select();
606 match oper.index() {
607 i if i == oper1 => assert!(oper.recv(&r).is_err()),
608 _ => unreachable!(),
609 }
610 }
611
612 #[test]
duplicate_operations()613 fn duplicate_operations() {
614 let (s, r) = unbounded::<i32>();
615 let hit = vec![Cell::new(false); 4];
616
617 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
618 let mut sel = Select::new();
619 let oper0 = sel.recv(&r);
620 let oper1 = sel.recv(&r);
621 let oper2 = sel.send(&s);
622 let oper3 = sel.send(&s);
623 let oper = sel.select();
624 match oper.index() {
625 i if i == oper0 => {
626 assert!(oper.recv(&r).is_ok());
627 hit[0].set(true);
628 }
629 i if i == oper1 => {
630 assert!(oper.recv(&r).is_ok());
631 hit[1].set(true);
632 }
633 i if i == oper2 => {
634 assert!(oper.send(&s, 0).is_ok());
635 hit[2].set(true);
636 }
637 i if i == oper3 => {
638 assert!(oper.send(&s, 0).is_ok());
639 hit[3].set(true);
640 }
641 _ => unreachable!(),
642 }
643 }
644 }
645
646 #[test]
nesting()647 fn nesting() {
648 let (s, r) = unbounded::<i32>();
649
650 let mut sel = Select::new();
651 let oper1 = sel.send(&s);
652 let oper = sel.select();
653 match oper.index() {
654 i if i == oper1 => {
655 assert!(oper.send(&s, 0).is_ok());
656
657 let mut sel = Select::new();
658 let oper1 = sel.recv(&r);
659 let oper = sel.select();
660 match oper.index() {
661 i if i == oper1 => {
662 assert_eq!(oper.recv(&r), Ok(0));
663
664 let mut sel = Select::new();
665 let oper1 = sel.send(&s);
666 let oper = sel.select();
667 match oper.index() {
668 i if i == oper1 => {
669 assert!(oper.send(&s, 1).is_ok());
670
671 let mut sel = Select::new();
672 let oper1 = sel.recv(&r);
673 let oper = sel.select();
674 match oper.index() {
675 i if i == oper1 => {
676 assert_eq!(oper.recv(&r), Ok(1));
677 }
678 _ => unreachable!(),
679 }
680 }
681 _ => unreachable!(),
682 }
683 }
684 _ => unreachable!(),
685 }
686 }
687 _ => unreachable!(),
688 }
689 }
690
691 #[test]
stress_recv()692 fn stress_recv() {
693 #[cfg(miri)]
694 const COUNT: usize = 50;
695 #[cfg(not(miri))]
696 const COUNT: usize = 10_000;
697
698 let (s1, r1) = unbounded();
699 let (s2, r2) = bounded(5);
700 let (s3, r3) = bounded(100);
701
702 scope(|scope| {
703 scope.spawn(|_| {
704 for i in 0..COUNT {
705 s1.send(i).unwrap();
706 r3.recv().unwrap();
707
708 s2.send(i).unwrap();
709 r3.recv().unwrap();
710 }
711 });
712
713 for i in 0..COUNT {
714 for _ in 0..2 {
715 let mut sel = Select::new();
716 let oper1 = sel.recv(&r1);
717 let oper2 = sel.recv(&r2);
718 let oper = sel.select();
719 match oper.index() {
720 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
721 ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)),
722 _ => unreachable!(),
723 }
724
725 s3.send(()).unwrap();
726 }
727 }
728 })
729 .unwrap();
730 }
731
732 #[test]
stress_send()733 fn stress_send() {
734 #[cfg(miri)]
735 const COUNT: usize = 50;
736 #[cfg(not(miri))]
737 const COUNT: usize = 10_000;
738
739 let (s1, r1) = bounded(0);
740 let (s2, r2) = bounded(0);
741 let (s3, r3) = bounded(100);
742
743 scope(|scope| {
744 scope.spawn(|_| {
745 for i in 0..COUNT {
746 assert_eq!(r1.recv().unwrap(), i);
747 assert_eq!(r2.recv().unwrap(), i);
748 r3.recv().unwrap();
749 }
750 });
751
752 for i in 0..COUNT {
753 for _ in 0..2 {
754 let mut sel = Select::new();
755 let oper1 = sel.send(&s1);
756 let oper2 = sel.send(&s2);
757 let oper = sel.select();
758 match oper.index() {
759 ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()),
760 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
761 _ => unreachable!(),
762 }
763 }
764 s3.send(()).unwrap();
765 }
766 })
767 .unwrap();
768 }
769
770 #[test]
stress_mixed()771 fn stress_mixed() {
772 #[cfg(miri)]
773 const COUNT: usize = 100;
774 #[cfg(not(miri))]
775 const COUNT: usize = 10_000;
776
777 let (s1, r1) = bounded(0);
778 let (s2, r2) = bounded(0);
779 let (s3, r3) = bounded(100);
780
781 scope(|scope| {
782 scope.spawn(|_| {
783 for i in 0..COUNT {
784 s1.send(i).unwrap();
785 assert_eq!(r2.recv().unwrap(), i);
786 r3.recv().unwrap();
787 }
788 });
789
790 for i in 0..COUNT {
791 for _ in 0..2 {
792 let mut sel = Select::new();
793 let oper1 = sel.recv(&r1);
794 let oper2 = sel.send(&s2);
795 let oper = sel.select();
796 match oper.index() {
797 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
798 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
799 _ => unreachable!(),
800 }
801 }
802 s3.send(()).unwrap();
803 }
804 })
805 .unwrap();
806 }
807
808 #[test]
stress_timeout_two_threads()809 fn stress_timeout_two_threads() {
810 const COUNT: usize = 20;
811
812 let (s, r) = bounded(2);
813
814 scope(|scope| {
815 scope.spawn(|_| {
816 for i in 0..COUNT {
817 if i % 2 == 0 {
818 thread::sleep(ms(500));
819 }
820
821 loop {
822 let mut sel = Select::new();
823 let oper1 = sel.send(&s);
824 let oper = sel.select_timeout(ms(100));
825 match oper {
826 Err(_) => {}
827 Ok(oper) => match oper.index() {
828 ix if ix == oper1 => {
829 assert!(oper.send(&s, i).is_ok());
830 break;
831 }
832 _ => unreachable!(),
833 },
834 }
835 }
836 }
837 });
838
839 scope.spawn(|_| {
840 for i in 0..COUNT {
841 if i % 2 == 0 {
842 thread::sleep(ms(500));
843 }
844
845 loop {
846 let mut sel = Select::new();
847 let oper1 = sel.recv(&r);
848 let oper = sel.select_timeout(ms(100));
849 match oper {
850 Err(_) => {}
851 Ok(oper) => match oper.index() {
852 ix if ix == oper1 => {
853 assert_eq!(oper.recv(&r), Ok(i));
854 break;
855 }
856 _ => unreachable!(),
857 },
858 }
859 }
860 }
861 });
862 })
863 .unwrap();
864 }
865
866 #[test]
send_recv_same_channel()867 fn send_recv_same_channel() {
868 let (s, r) = bounded::<i32>(0);
869 let mut sel = Select::new();
870 let oper1 = sel.send(&s);
871 let oper2 = sel.recv(&r);
872 let oper = sel.select_timeout(ms(100));
873 match oper {
874 Err(_) => {}
875 Ok(oper) => match oper.index() {
876 ix if ix == oper1 => panic!(),
877 ix if ix == oper2 => panic!(),
878 _ => unreachable!(),
879 },
880 }
881
882 let (s, r) = unbounded::<i32>();
883 let mut sel = Select::new();
884 let oper1 = sel.send(&s);
885 let oper2 = sel.recv(&r);
886 let oper = sel.select_timeout(ms(100));
887 match oper {
888 Err(_) => panic!(),
889 Ok(oper) => match oper.index() {
890 ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()),
891 ix if ix == oper2 => panic!(),
892 _ => unreachable!(),
893 },
894 }
895 }
896
897 #[test]
matching()898 fn matching() {
899 const THREADS: usize = 44;
900
901 let (s, r) = &bounded::<usize>(0);
902
903 scope(|scope| {
904 for i in 0..THREADS {
905 scope.spawn(move |_| {
906 let mut sel = Select::new();
907 let oper1 = sel.recv(r);
908 let oper2 = sel.send(s);
909 let oper = sel.select();
910 match oper.index() {
911 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
912 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
913 _ => unreachable!(),
914 }
915 });
916 }
917 })
918 .unwrap();
919
920 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
921 }
922
923 #[test]
matching_with_leftover()924 fn matching_with_leftover() {
925 const THREADS: usize = 55;
926
927 let (s, r) = &bounded::<usize>(0);
928
929 scope(|scope| {
930 for i in 0..THREADS {
931 scope.spawn(move |_| {
932 let mut sel = Select::new();
933 let oper1 = sel.recv(r);
934 let oper2 = sel.send(s);
935 let oper = sel.select();
936 match oper.index() {
937 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
938 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
939 _ => unreachable!(),
940 }
941 });
942 }
943 s.send(!0).unwrap();
944 })
945 .unwrap();
946
947 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
948 }
949
950 #[test]
channel_through_channel()951 fn channel_through_channel() {
952 #[cfg(miri)]
953 const COUNT: usize = 50;
954 #[cfg(not(miri))]
955 const COUNT: usize = 1000;
956
957 type T = Box<dyn Any + Send>;
958
959 for cap in 0..3 {
960 let (s, r) = bounded::<T>(cap);
961
962 scope(|scope| {
963 scope.spawn(move |_| {
964 let mut s = s;
965
966 for _ in 0..COUNT {
967 let (new_s, new_r) = bounded(cap);
968 let new_r: T = Box::new(Some(new_r));
969
970 {
971 let mut sel = Select::new();
972 let oper1 = sel.send(&s);
973 let oper = sel.select();
974 match oper.index() {
975 ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()),
976 _ => unreachable!(),
977 }
978 }
979
980 s = new_s;
981 }
982 });
983
984 scope.spawn(move |_| {
985 let mut r = r;
986
987 for _ in 0..COUNT {
988 let new = {
989 let mut sel = Select::new();
990 let oper1 = sel.recv(&r);
991 let oper = sel.select();
992 match oper.index() {
993 ix if ix == oper1 => oper
994 .recv(&r)
995 .unwrap()
996 .downcast_mut::<Option<Receiver<T>>>()
997 .unwrap()
998 .take()
999 .unwrap(),
1000 _ => unreachable!(),
1001 }
1002 };
1003 r = new;
1004 }
1005 });
1006 })
1007 .unwrap();
1008 }
1009 }
1010
1011 #[test]
linearizable_try()1012 fn linearizable_try() {
1013 #[cfg(miri)]
1014 const COUNT: usize = 50;
1015 #[cfg(not(miri))]
1016 const COUNT: usize = 100_000;
1017
1018 for step in 0..2 {
1019 let (start_s, start_r) = bounded::<()>(0);
1020 let (end_s, end_r) = bounded::<()>(0);
1021
1022 let ((s1, r1), (s2, r2)) = if step == 0 {
1023 (bounded::<i32>(1), bounded::<i32>(1))
1024 } else {
1025 (unbounded::<i32>(), unbounded::<i32>())
1026 };
1027
1028 scope(|scope| {
1029 scope.spawn(|_| {
1030 for _ in 0..COUNT {
1031 start_s.send(()).unwrap();
1032
1033 s1.send(1).unwrap();
1034
1035 let mut sel = Select::new();
1036 let oper1 = sel.recv(&r1);
1037 let oper2 = sel.recv(&r2);
1038 let oper = sel.try_select();
1039 match oper {
1040 Err(_) => unreachable!(),
1041 Ok(oper) => match oper.index() {
1042 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1043 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1044 _ => unreachable!(),
1045 },
1046 }
1047
1048 end_s.send(()).unwrap();
1049 let _ = r2.try_recv();
1050 }
1051 });
1052
1053 for _ in 0..COUNT {
1054 start_r.recv().unwrap();
1055
1056 s2.send(1).unwrap();
1057 let _ = r1.try_recv();
1058
1059 end_r.recv().unwrap();
1060 }
1061 })
1062 .unwrap();
1063 }
1064 }
1065
1066 #[test]
linearizable_timeout()1067 fn linearizable_timeout() {
1068 #[cfg(miri)]
1069 const COUNT: usize = 50;
1070 #[cfg(not(miri))]
1071 const COUNT: usize = 100_000;
1072
1073 for step in 0..2 {
1074 let (start_s, start_r) = bounded::<()>(0);
1075 let (end_s, end_r) = bounded::<()>(0);
1076
1077 let ((s1, r1), (s2, r2)) = if step == 0 {
1078 (bounded::<i32>(1), bounded::<i32>(1))
1079 } else {
1080 (unbounded::<i32>(), unbounded::<i32>())
1081 };
1082
1083 scope(|scope| {
1084 scope.spawn(|_| {
1085 for _ in 0..COUNT {
1086 start_s.send(()).unwrap();
1087
1088 s1.send(1).unwrap();
1089
1090 let mut sel = Select::new();
1091 let oper1 = sel.recv(&r1);
1092 let oper2 = sel.recv(&r2);
1093 let oper = sel.select_timeout(ms(0));
1094 match oper {
1095 Err(_) => unreachable!(),
1096 Ok(oper) => match oper.index() {
1097 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1098 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1099 _ => unreachable!(),
1100 },
1101 }
1102
1103 end_s.send(()).unwrap();
1104 let _ = r2.try_recv();
1105 }
1106 });
1107
1108 for _ in 0..COUNT {
1109 start_r.recv().unwrap();
1110
1111 s2.send(1).unwrap();
1112 let _ = r1.try_recv();
1113
1114 end_r.recv().unwrap();
1115 }
1116 })
1117 .unwrap();
1118 }
1119 }
1120
1121 #[test]
fairness1()1122 fn fairness1() {
1123 #[cfg(miri)]
1124 const COUNT: usize = 50;
1125 #[cfg(not(miri))]
1126 const COUNT: usize = 10_000;
1127
1128 let (s1, r1) = bounded::<()>(COUNT);
1129 let (s2, r2) = unbounded::<()>();
1130
1131 for _ in 0..COUNT {
1132 s1.send(()).unwrap();
1133 s2.send(()).unwrap();
1134 }
1135
1136 let hits = vec![Cell::new(0usize); 4];
1137 for _ in 0..COUNT {
1138 let after = after(ms(0));
1139 let tick = tick(ms(0));
1140
1141 let mut sel = Select::new();
1142 let oper1 = sel.recv(&r1);
1143 let oper2 = sel.recv(&r2);
1144 let oper3 = sel.recv(&after);
1145 let oper4 = sel.recv(&tick);
1146 let oper = sel.select();
1147 match oper.index() {
1148 i if i == oper1 => {
1149 oper.recv(&r1).unwrap();
1150 hits[0].set(hits[0].get() + 1);
1151 }
1152 i if i == oper2 => {
1153 oper.recv(&r2).unwrap();
1154 hits[1].set(hits[1].get() + 1);
1155 }
1156 i if i == oper3 => {
1157 oper.recv(&after).unwrap();
1158 hits[2].set(hits[2].get() + 1);
1159 }
1160 i if i == oper4 => {
1161 oper.recv(&tick).unwrap();
1162 hits[3].set(hits[3].get() + 1);
1163 }
1164 _ => unreachable!(),
1165 }
1166 }
1167 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
1168 }
1169
1170 #[test]
fairness2()1171 fn fairness2() {
1172 #[cfg(miri)]
1173 const COUNT: usize = 50;
1174 #[cfg(not(miri))]
1175 const COUNT: usize = 10_000;
1176
1177 let (s1, r1) = unbounded::<()>();
1178 let (s2, r2) = bounded::<()>(1);
1179 let (s3, r3) = bounded::<()>(0);
1180
1181 scope(|scope| {
1182 scope.spawn(|_| {
1183 for _ in 0..COUNT {
1184 let mut sel = Select::new();
1185 let mut oper1 = None;
1186 let mut oper2 = None;
1187 if s1.is_empty() {
1188 oper1 = Some(sel.send(&s1));
1189 }
1190 if s2.is_empty() {
1191 oper2 = Some(sel.send(&s2));
1192 }
1193 let oper3 = sel.send(&s3);
1194 let oper = sel.select();
1195 match oper.index() {
1196 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
1197 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
1198 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
1199 _ => unreachable!(),
1200 }
1201 }
1202 });
1203
1204 let hits = vec![Cell::new(0usize); 3];
1205 for _ in 0..COUNT {
1206 let mut sel = Select::new();
1207 let oper1 = sel.recv(&r1);
1208 let oper2 = sel.recv(&r2);
1209 let oper3 = sel.recv(&r3);
1210 let oper = sel.select();
1211 match oper.index() {
1212 i if i == oper1 => {
1213 oper.recv(&r1).unwrap();
1214 hits[0].set(hits[0].get() + 1);
1215 }
1216 i if i == oper2 => {
1217 oper.recv(&r2).unwrap();
1218 hits[1].set(hits[1].get() + 1);
1219 }
1220 i if i == oper3 => {
1221 oper.recv(&r3).unwrap();
1222 hits[2].set(hits[2].get() + 1);
1223 }
1224 _ => unreachable!(),
1225 }
1226 }
1227 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
1228 })
1229 .unwrap();
1230 }
1231
1232 #[test]
sync_and_clone()1233 fn sync_and_clone() {
1234 const THREADS: usize = 20;
1235
1236 let (s, r) = &bounded::<usize>(0);
1237
1238 let mut sel = Select::new();
1239 let oper1 = sel.recv(r);
1240 let oper2 = sel.send(s);
1241 let sel = &sel;
1242
1243 scope(|scope| {
1244 for i in 0..THREADS {
1245 scope.spawn(move |_| {
1246 let mut sel = sel.clone();
1247 let oper = sel.select();
1248 match oper.index() {
1249 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
1250 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
1251 _ => unreachable!(),
1252 }
1253 });
1254 }
1255 })
1256 .unwrap();
1257
1258 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1259 }
1260
1261 #[test]
send_and_clone()1262 fn send_and_clone() {
1263 const THREADS: usize = 20;
1264
1265 let (s, r) = &bounded::<usize>(0);
1266
1267 let mut sel = Select::new();
1268 let oper1 = sel.recv(r);
1269 let oper2 = sel.send(s);
1270
1271 scope(|scope| {
1272 for i in 0..THREADS {
1273 let mut sel = sel.clone();
1274 scope.spawn(move |_| {
1275 let oper = sel.select();
1276 match oper.index() {
1277 ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
1278 ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
1279 _ => unreachable!(),
1280 }
1281 });
1282 }
1283 })
1284 .unwrap();
1285
1286 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1287 }
1288
1289 #[test]
reuse()1290 fn reuse() {
1291 #[cfg(miri)]
1292 const COUNT: usize = 50;
1293 #[cfg(not(miri))]
1294 const COUNT: usize = 10_000;
1295
1296 let (s1, r1) = bounded(0);
1297 let (s2, r2) = bounded(0);
1298 let (s3, r3) = bounded(100);
1299
1300 scope(|scope| {
1301 scope.spawn(|_| {
1302 for i in 0..COUNT {
1303 s1.send(i).unwrap();
1304 assert_eq!(r2.recv().unwrap(), i);
1305 r3.recv().unwrap();
1306 }
1307 });
1308
1309 let mut sel = Select::new();
1310 let oper1 = sel.recv(&r1);
1311 let oper2 = sel.send(&s2);
1312
1313 for i in 0..COUNT {
1314 for _ in 0..2 {
1315 let oper = sel.select();
1316 match oper.index() {
1317 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
1318 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
1319 _ => unreachable!(),
1320 }
1321 }
1322 s3.send(()).unwrap();
1323 }
1324 })
1325 .unwrap();
1326 }
1327