1 //! Tests for channel selection using the `Select` struct.
2 
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10 
/// Builds a [`Duration`] lasting `ms` milliseconds.
///
/// Exists purely to keep the timeouts and sleeps in the tests below terse.
fn ms(ms: u64) -> Duration {
    // Split into whole seconds plus the sub-second remainder. The remainder is
    // always below 1000, so the cast and the multiplication cannot overflow.
    let whole_secs = ms / 1_000;
    let leftover_nanos = (ms % 1_000) as u32 * 1_000_000;
    Duration::new(whole_secs, leftover_nanos)
}
14 
/// Selecting over two unbounded receivers picks whichever one holds a message.
///
/// A message is queued first on channel 1 only, then on channel 2 only. Each
/// time, `select` must report the operation registered for the channel that
/// holds the message, and completing it must yield that value.
#[test]
fn smoke1() {
    let (tx_a, rx_a) = unbounded::<usize>();
    let (tx_b, rx_b) = unbounded::<usize>();

    // Round 1: only the first channel has data.
    tx_a.send(1).unwrap();

    let mut select = Select::new();
    let idx_a = select.recv(&rx_a);
    let idx_b = select.recv(&rx_b);
    let chosen = select.select();
    let which = chosen.index();
    if which == idx_a {
        assert_eq!(chosen.recv(&rx_a), Ok(1));
    } else if which == idx_b {
        panic!();
    } else {
        unreachable!();
    }

    // Round 2: the first channel has been drained, only the second has data.
    tx_b.send(2).unwrap();

    let mut select = Select::new();
    let idx_a = select.recv(&rx_a);
    let idx_b = select.recv(&rx_b);
    let chosen = select.select();
    let which = chosen.index();
    if which == idx_a {
        panic!();
    } else if which == idx_b {
        assert_eq!(chosen.recv(&rx_b), Ok(2));
    } else {
        unreachable!();
    }
}
44 
/// With five receivers registered and only the last one holding a message,
/// `select` must pick that last operation.
#[test]
fn smoke2() {
    // The first four senders are kept alive (bound to `_`-prefixed names) but
    // never send, so their channels stay open and empty.
    let (_tx1, rx1) = unbounded::<i32>();
    let (_tx2, rx2) = unbounded::<i32>();
    let (_tx3, rx3) = unbounded::<i32>();
    let (_tx4, rx4) = unbounded::<i32>();
    let (tx5, rx5) = unbounded::<i32>();

    tx5.send(5).unwrap();

    let mut select = Select::new();
    // Registration order is preserved: the four empty channels first ...
    let silent = [
        select.recv(&rx1),
        select.recv(&rx2),
        select.recv(&rx3),
        select.recv(&rx4),
    ];
    // ... and the channel with the queued message last.
    let live = select.recv(&rx5);

    let chosen = select.select();
    let which = chosen.index();
    if which == live {
        assert_eq!(chosen.recv(&rx5), Ok(5));
    } else if silent.contains(&which) {
        panic!();
    } else {
        unreachable!();
    }
}
71 
/// A receive on a disconnected channel gets selected and fails when completed.
///
/// Three phases:
/// 1. Channel 1 is disconnected by a helper thread while channel 2 only gets
///    a message 500 ms later; the select must pick channel 1.
/// 2. With channel 1 still disconnected and channel 2 drained, the select
///    must pick channel 1 again.
/// 3. Selecting on channel 2 alone must succeed once its sender is dropped
///    by a helper thread.
#[test]
fn disconnected() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            drop(tx1);
            thread::sleep(ms(500));
            tx2.send(5).unwrap();
        });

        let mut select = Select::new();
        let idx1 = select.recv(&rx1);
        let idx2 = select.recv(&rx2);
        let chosen = match select.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let which = chosen.index();
        if which == idx1 {
            assert!(chosen.recv(&rx1).is_err());
        } else if which == idx2 {
            panic!();
        } else {
            unreachable!();
        }

        // Take the delayed message so channel 2 is empty for the next phase.
        rx2.recv().unwrap();
    })
    .unwrap();

    let mut select = Select::new();
    let idx1 = select.recv(&rx1);
    let idx2 = select.recv(&rx2);
    let chosen = match select.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let which = chosen.index();
    if which == idx1 {
        assert!(chosen.recv(&rx1).is_err());
    } else if which == idx2 {
        panic!();
    } else {
        unreachable!();
    }

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(tx2);
        });

        let mut select = Select::new();
        let idx = select.recv(&rx2);
        let chosen = match select.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() == idx {
            assert!(chosen.recv(&rx2).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
133 
/// Exercises the non-blocking `try_select` path through several channel states.
///
/// `try_select` must fail when nothing is ready (open and empty channels, or
/// no registered operations at all) and succeed when a channel is either
/// disconnected or holds a message.
#[test]
fn default() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    // Both channels open and empty: nothing is ready.
    let mut select = Select::new();
    let _idx1 = select.recv(&rx1);
    let _idx2 = select.recv(&rx2);
    let attempt = select.try_select();
    if attempt.is_ok() {
        panic!();
    }

    drop(tx1);

    // Channel 1 is now disconnected: its receive is selected and errors out.
    let mut select = Select::new();
    let idx1 = select.recv(&rx1);
    let idx2 = select.recv(&rx2);
    let chosen = match select.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let which = chosen.index();
    if which == idx1 {
        assert!(chosen.recv(&rx1).is_err());
    } else if which == idx2 {
        panic!();
    } else {
        unreachable!();
    }

    tx2.send(2).unwrap();

    // Channel 2 holds a message: its receive is selected and yields it.
    let mut select = Select::new();
    let idx = select.recv(&rx2);
    let chosen = match select.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == idx {
        assert_eq!(chosen.recv(&rx2), Ok(2));
    } else {
        unreachable!();
    }

    // Channel 2 has been drained again: nothing is ready.
    let mut select = Select::new();
    let _idx = select.recv(&rx2);
    let attempt = select.try_select();
    if attempt.is_ok() {
        panic!();
    }

    // No operations registered at all.
    let mut select = Select::new();
    let attempt = select.try_select();
    if attempt.is_ok() {
        panic!();
    }
}
191 
/// Checks that `select_timeout` gives up when nothing becomes ready in time
/// and succeeds when something does.
///
/// The first half waits on two receivers while a helper thread sends after
/// 1500 ms: a 1000 ms wait must time out and a following 1000 ms wait must
/// deliver the message. The second half waits 1000 ms with no operations
/// registered, which must time out; by then the helper thread has dropped
/// the sender, so a `try_select` on the receiver must succeed and its `recv`
/// must fail.
#[test]
fn timeout() {
    let (_tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            tx2.send(2).unwrap();
        });

        // First wait: neither channel may be selected within the timeout.
        let mut select = Select::new();
        let idx1 = select.recv(&rx1);
        let idx2 = select.recv(&rx2);
        let outcome = select.select_timeout(ms(1000));
        if let Ok(op) = outcome {
            let which = op.index();
            if which == idx1 || which == idx2 {
                panic!();
            } else {
                unreachable!();
            }
        }

        // Second wait: the delayed message arrives on channel 2.
        let mut select = Select::new();
        let idx1 = select.recv(&rx1);
        let idx2 = select.recv(&rx2);
        let chosen = match select.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let which = chosen.index();
        if which == idx1 {
            panic!();
        } else if which == idx2 {
            assert_eq!(chosen.recv(&rx2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|scope| {
        let (tx, rx) = unbounded::<i32>();

        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(tx);
        });

        // Nothing is registered, so the only possible outcome is a timeout.
        let mut empty = Select::new();
        let outcome = empty.select_timeout(ms(1000));
        if outcome.is_ok() {
            unreachable!();
        }

        let mut select = Select::new();
        let idx = select.recv(&rx);
        let chosen = match select.try_select() {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() == idx {
            assert!(chosen.recv(&rx).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
259 
/// An operation on a channel whose other half is already gone is selected
/// immediately and fails when completed.
///
/// Covers all four combinations of receive/send and
/// `try_select`/`select_timeout`.
#[test]
fn default_when_disconnected() {
    // Receive, non-blocking. The `_` pattern drops the sender on the spot.
    let (_, rx) = unbounded::<i32>();

    let mut select = Select::new();
    let idx = select.recv(&rx);
    let chosen = match select.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == idx {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }

    // Receive, with a timeout.
    let (_, rx) = unbounded::<i32>();

    let mut select = Select::new();
    let idx = select.recv(&rx);
    let chosen = match select.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == idx {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }

    // Send, non-blocking. Here the receiver is the half dropped on the spot.
    let (tx, _) = bounded::<i32>(0);

    let mut select = Select::new();
    let idx = select.send(&tx);
    let chosen = match select.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == idx {
        assert!(chosen.send(&tx, 0).is_err());
    } else {
        unreachable!();
    }

    // Send, with a timeout.
    let (tx, _) = bounded::<i32>(0);

    let mut select = Select::new();
    let idx = select.send(&tx);
    let chosen = match select.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == idx {
        assert!(chosen.send(&tx, 0).is_err());
    } else {
        unreachable!();
    }
}
314 
/// A `Select` with no registered operations never selects anything.
///
/// `try_select` must fail within 50 ms, and `select_timeout(500 ms)` must
/// fail after waiting between 450 ms and 550 ms.
#[test]
fn default_only() {
    let began = Instant::now();

    let mut select = Select::new();
    let attempt = select.try_select();
    assert!(attempt.is_err());
    let elapsed = Instant::now() - began;
    assert!(elapsed <= ms(50));

    let began = Instant::now();
    let mut select = Select::new();
    let attempt = select.select_timeout(ms(500));
    assert!(attempt.is_err());
    // Measure once and check both bounds against the same reading.
    let elapsed = Instant::now() - began;
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
333 
/// A select blocked on zero-capacity channels wakes up when another thread
/// shows up on the other side of one of them.
///
/// First a helper thread sends on channel 2 after 500 ms while the main
/// thread selects over two receives; then a helper thread receives on
/// channel 1 after 500 ms while the main thread selects over two sends.
#[test]
fn unblocks() {
    let (tx1, rx1) = bounded::<i32>(0);
    let (tx2, rx2) = bounded::<i32>(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            tx2.send(2).unwrap();
        });

        let mut select = Select::new();
        let idx1 = select.recv(&rx1);
        let idx2 = select.recv(&rx2);
        let chosen = match select.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let which = chosen.index();
        if which == idx1 {
            panic!();
        } else if which == idx2 {
            assert_eq!(chosen.recv(&rx2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(rx1.recv().unwrap(), 1);
        });

        let mut select = Select::new();
        let idx1 = select.send(&tx1);
        let idx2 = select.send(&tx2);
        let chosen = match select.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let which = chosen.index();
        if which == idx1 {
            chosen.send(&tx1, 1).unwrap();
        } else if which == idx2 {
            panic!();
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
381 
/// Two rounds of selecting between a receive and a send on zero-capacity
/// channels, against a peer that performs the matching send and then the
/// matching receive.
///
/// The peer sends `1` on channel 1 and then expects `2` from channel 2, so
/// across the two rounds the main thread must complete both operations.
#[test]
fn both_ready() {
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });

        let mut rounds_left = 2;
        while rounds_left > 0 {
            let mut select = Select::new();
            let recv_idx = select.recv(&rx1);
            let send_idx = select.send(&tx2);
            let chosen = select.select();
            let which = chosen.index();
            if which == recv_idx {
                assert_eq!(chosen.recv(&rx1), Ok(1));
            } else if which == send_idx {
                chosen.send(&tx2, 2).unwrap();
            } else {
                unreachable!();
            }
            rounds_left -= 1;
        }
    })
    .unwrap();
}
408 
/// Runs two busy-polling threads built on `try_select` against a third
/// thread that performs a single `select_timeout`, repeated `RUNS` times.
///
/// Per run, on zero-capacity channels:
/// * thread 1 keeps trying to send `1` on `s1`,
/// * thread 2 keeps trying to receive `2` from `r2`,
/// * thread 3 sleeps 500 ms, selects once between receiving from `r1` and
///   sending on `s2` (with a 1000 ms timeout), and then drops `s_end`.
///
/// Both pollers also try to select a receive on `r_end` and stop once that
/// operation is selected, which is how whichever poller was not served by
/// thread 3 terminates after `s_end` has been dropped.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            // Poller 1: non-blocking attempts to send on `s1`.
            scope.spawn(|_| loop {
                let mut done = false;

                let mut sel = Select::new();
                let oper1 = sel.send(&s1);
                let oper = sel.try_select();
                match oper {
                    Err(_) => {}
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => {
                            // The result of the send is deliberately ignored.
                            let _ = oper.send(&s1, 1);
                            done = true;
                        }
                        _ => unreachable!(),
                    },
                }
                if done {
                    break;
                }

                // Check for the end-of-run signal on `r_end`.
                let mut sel = Select::new();
                let oper1 = sel.recv(&r_end);
                let oper = sel.try_select();
                match oper {
                    Err(_) => {}
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => {
                            // The result of the receive is deliberately ignored.
                            let _ = oper.recv(&r_end);
                            done = true;
                        }
                        _ => unreachable!(),
                    },
                }
                if done {
                    break;
                }
            });

            // Poller 2: non-blocking attempts to receive from `r2`.
            scope.spawn(|_| loop {
                if let Ok(x) = r2.try_recv() {
                    assert_eq!(x, 2);
                    break;
                }

                // Check for the end-of-run signal on `r_end`.
                let mut done = false;
                let mut sel = Select::new();
                let oper1 = sel.recv(&r_end);
                let oper = sel.try_select();
                match oper {
                    Err(_) => {}
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => {
                            let _ = oper.recv(&r_end);
                            done = true;
                        }
                        _ => unreachable!(),
                    },
                }
                if done {
                    break;
                }
            });

            // Thread 3: a single select against the two pollers.
            scope.spawn(|_| {
                thread::sleep(ms(500));

                let mut sel = Select::new();
                let oper1 = sel.recv(&r1);
                let oper2 = sel.send(&s2);
                let oper = sel.select_timeout(ms(1000));
                match oper {
                    // A timeout is tolerated here; only a selected operation is checked.
                    Err(_) => {}
                    Ok(oper) => match oper.index() {
                        i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
                        i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
                        _ => unreachable!(),
                    },
                }

                // Dropping the only sender of the end channel signals the pollers to stop.
                drop(s_end);
            });
        })
        .unwrap();
    }
}
504 
/// Cloning and dropping a sender must not wake a select that is waiting on
/// the matching receiver.
///
/// The helper thread clones and immediately drops the sender of channel 1,
/// checks that the main thread has not yet signalled completion on `rx3`,
/// and only then sends a real message for the select to pick up.
#[test]
fn cloning1() {
    scope(|scope| {
        let (tx1, rx1) = unbounded::<i32>();
        let (_tx2, rx2) = unbounded::<i32>();
        let (tx3, rx3) = unbounded::<()>();

        scope.spawn(move |_| {
            rx3.recv().unwrap();
            drop(tx1.clone());
            // The second signal is only sent after the select has returned.
            assert!(rx3.try_recv().is_err());
            tx1.send(1).unwrap();
            rx3.recv().unwrap();
        });

        // First signal: lets the helper thread start.
        tx3.send(()).unwrap();

        let mut select = Select::new();
        let idx1 = select.recv(&rx1);
        let idx2 = select.recv(&rx2);
        let chosen = select.select();
        let which = chosen.index();
        if which == idx1 {
            drop(chosen.recv(&rx1));
        } else if which == idx2 {
            drop(chosen.recv(&rx2));
        } else {
            unreachable!();
        }

        // Second signal: the select has completed.
        tx3.send(()).unwrap();
    })
    .unwrap();
}
536 
/// Cloning and dropping the sender of channel 1 must not cause a select over
/// channels 1 and 2 to pick channel 1; the real message sent on channel 2
/// afterwards is what it must pick.
#[test]
fn cloning2() {
    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (_tx3, _rx3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut select = Select::new();
            let idx1 = select.recv(&rx1);
            let idx2 = select.recv(&rx2);
            let chosen = select.select();
            let which = chosen.index();
            if which == idx1 {
                panic!();
            } else if which == idx2 {
                drop(chosen.recv(&rx2));
            } else {
                unreachable!();
            }
        });

        // Sleep first so the clone/drop below happens after the helper has had
        // 500 ms to start its select.
        thread::sleep(ms(500));
        drop(tx1.clone());
        tx2.send(()).unwrap();
    })
    .unwrap();
}
562 
/// A message that is already queued before the select starts is picked up.
#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    let mut select = Select::new();
    let idx = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == idx {
        drop(chosen.recv(&rx));
    } else {
        unreachable!();
    }
}
576 
/// A message queued before every sender was dropped is still delivered by the
/// select; only afterwards does the channel report itself as disconnected.
#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    let mut select = Select::new();
    let idx = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == idx {
        assert_eq!(chosen.recv(&rx), Ok(()));
    } else {
        unreachable!();
    }

    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
594 
/// Once the only queued message has been consumed and every sender is gone,
/// the select still picks the receive, and completing it fails.
#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    rx.recv().unwrap();

    let mut select = Select::new();
    let idx = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == idx {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }
}
611 
/// Registering the same receiver twice and the same sender twice yields four
/// distinct operations, each of which gets selected at some point.
///
/// The loop keeps selecting until every one of the four operations has been
/// chosen at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    // One flag per registered operation, in registration order.
    let seen = vec![Cell::new(false); 4];

    while !seen.iter().all(|flag| flag.get()) {
        let mut select = Select::new();
        let recv_a = select.recv(&r);
        let recv_b = select.recv(&r);
        let send_a = select.send(&s);
        let send_b = select.send(&s);

        let chosen = select.select();
        let which = chosen.index();
        // Complete the chosen operation and work out which flag it maps to.
        let slot = if which == recv_a {
            assert!(chosen.recv(&r).is_ok());
            0
        } else if which == recv_b {
            assert!(chosen.recv(&r).is_ok());
            1
        } else if which == send_a {
            assert!(chosen.send(&s, 0).is_ok());
            2
        } else if which == send_b {
            assert!(chosen.send(&s, 0).is_ok());
            3
        } else {
            unreachable!()
        };
        seen[slot].set(true);
    }
}
645 
/// Starts a new select from inside the handler of a previous one, four levels
/// deep, alternating send and receive on a single unbounded channel.
///
/// The sequence is: send `0`, receive `0`, send `1`, receive `1`.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut outer = Select::new();
    let outer_idx = outer.send(&s);
    let outer_op = outer.select();
    if outer_op.index() == outer_idx {
        assert!(outer_op.send(&s, 0).is_ok());

        let mut second = Select::new();
        let second_idx = second.recv(&r);
        let second_op = second.select();
        if second_op.index() == second_idx {
            assert_eq!(second_op.recv(&r), Ok(0));

            let mut third = Select::new();
            let third_idx = third.send(&s);
            let third_op = third.select();
            if third_op.index() == third_idx {
                assert!(third_op.send(&s, 1).is_ok());

                let mut inner = Select::new();
                let inner_idx = inner.recv(&r);
                let inner_op = inner.select();
                if inner_op.index() == inner_idx {
                    assert_eq!(inner_op.recv(&r), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
690 
/// Stress test: selects between two receivers `COUNT * 2` times.
///
/// The producer alternates between an unbounded channel and a bounded one,
/// sending the current counter on each and waiting for an acknowledgement on
/// channel 3 after every send. The consumer must therefore see every counter
/// value exactly twice, in order.
#[test]
fn stress_recv() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (ack_tx, ack_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for value in 0..COUNT {
                // First the unbounded channel, then the bounded one, waiting
                // for the consumer's acknowledgement after each send.
                for tx in [&s1, &s2].iter() {
                    tx.send(value).unwrap();
                    ack_rx.recv().unwrap();
                }
            }
        });

        for expected in 0..COUNT {
            for _ in 0..2 {
                let mut select = Select::new();
                let idx1 = select.recv(&r1);
                let idx2 = select.recv(&r2);
                let chosen = select.select();
                let which = chosen.index();
                if which == idx1 {
                    assert_eq!(chosen.recv(&r1), Ok(expected));
                } else if which == idx2 {
                    assert_eq!(chosen.recv(&r2), Ok(expected));
                } else {
                    unreachable!();
                }

                ack_tx.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
731 
/// Stress test: selects between two senders on zero-capacity channels.
///
/// For each counter value the consumer receives from channel 1, then from
/// channel 2, then waits for a marker on channel 3. The producer performs
/// two selects over both sends per value and then emits the marker.
#[test]
fn stress_send() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (mark_tx, mark_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for expected in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), expected);
                assert_eq!(r2.recv().unwrap(), expected);
                mark_rx.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut select = Select::new();
                let idx1 = select.send(&s1);
                let idx2 = select.send(&s2);
                let chosen = select.select();
                let which = chosen.index();
                if which == idx1 {
                    assert!(chosen.send(&s1, value).is_ok());
                } else if which == idx2 {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            mark_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
769 
/// Stress test: selects between a receive and a send on zero-capacity
/// channels.
///
/// For each counter value the peer sends it on channel 1, expects it back on
/// channel 2, and then waits for a marker on channel 3. The main thread
/// performs two selects per value and then emits the marker.
#[test]
fn stress_mixed() {
    #[cfg(miri)]
    const COUNT: usize = 100;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (mark_tx, mark_rx) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for value in 0..COUNT {
                s1.send(value).unwrap();
                assert_eq!(r2.recv().unwrap(), value);
                mark_rx.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut select = Select::new();
                let recv_idx = select.recv(&r1);
                let send_idx = select.send(&s2);
                let chosen = select.select();
                let which = chosen.index();
                if which == recv_idx {
                    assert_eq!(chosen.recv(&r1), Ok(value));
                } else if which == send_idx {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            mark_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
807 
/// Stress test: a sender and a receiver both retry `select_timeout` with a
/// 100 ms timeout until their operation goes through.
///
/// Each side sleeps 500 ms before every even-numbered item, so the retry
/// loops are exercised against a capacity-2 channel. The receiver must see
/// the items `0..COUNT` in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        // Sending side.
        scope.spawn(|_| {
            for item in 0..COUNT {
                if item % 2 == 0 {
                    thread::sleep(ms(500));
                }

                loop {
                    let mut select = Select::new();
                    let idx = select.send(&s);
                    // On timeout simply go around again.
                    if let Ok(chosen) = select.select_timeout(ms(100)) {
                        if chosen.index() == idx {
                            assert!(chosen.send(&s, item).is_ok());
                            break;
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
        });

        // Receiving side.
        scope.spawn(|_| {
            for item in 0..COUNT {
                if item % 2 == 0 {
                    thread::sleep(ms(500));
                }

                loop {
                    let mut select = Select::new();
                    let idx = select.recv(&r);
                    // On timeout simply go around again.
                    if let Ok(chosen) = select.select_timeout(ms(100)) {
                        if chosen.index() == idx {
                            assert_eq!(chosen.recv(&r), Ok(item));
                            break;
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
        });
    })
    .unwrap();
}
865 
/// Registers both the send and the receive side of one channel in a single
/// select.
///
/// On a zero-capacity channel neither operation may be selected, so the
/// select must time out. On an unbounded channel the send must be selected.
#[test]
fn send_recv_same_channel() {
    // Zero-capacity channel: the select must not pair with itself.
    let (tx, rx) = bounded::<i32>(0);
    let mut select = Select::new();
    let send_idx = select.send(&tx);
    let recv_idx = select.recv(&rx);
    let outcome = select.select_timeout(ms(100));
    if let Ok(chosen) = outcome {
        let which = chosen.index();
        if which == send_idx || which == recv_idx {
            panic!();
        } else {
            unreachable!();
        }
    }

    // Unbounded channel: sending is the operation that gets selected.
    let (tx, rx) = unbounded::<i32>();
    let mut select = Select::new();
    let send_idx = select.send(&tx);
    let recv_idx = select.recv(&rx);
    let chosen = match select.select_timeout(ms(100)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let which = chosen.index();
    if which == send_idx {
        assert!(chosen.send(&tx, 0).is_ok());
    } else if which == recv_idx {
        panic!();
    } else {
        unreachable!();
    }
}
896 
/// `THREADS` threads each select once between receiving from and sending on
/// the same zero-capacity channel.
///
/// A thread that receives must not get back its own index, every send must
/// succeed, and the channel must be empty once all threads have finished.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                let mut select = Select::new();
                let recv_idx = select.recv(r);
                let send_idx = select.send(s);
                let chosen = select.select();
                let which = chosen.index();
                if which == recv_idx {
                    assert_ne!(chosen.recv(r), Ok(id));
                } else if which == send_idx {
                    assert!(chosen.send(s, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
922 
/// Like `matching`, but with an odd number of selecting threads plus one
/// extra plain send from the main thread.
///
/// A thread that receives must not get back its own index, every send must
/// succeed, and the channel must be empty once everything has finished.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                let mut select = Select::new();
                let recv_idx = select.recv(r);
                let send_idx = select.send(s);
                let chosen = select.select();
                let which = chosen.index();
                if which == recv_idx {
                    assert_ne!(chosen.recv(r), Ok(id));
                } else if which == send_idx {
                    assert!(chosen.send(s, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
        // The main thread's send uses `!0`, which is not any thread's index.
        s.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
949 
/// Passes receivers through channels: every message carries the receiver of
/// the next channel in the chain.
///
/// For capacities 0, 1 and 2, one thread creates `COUNT` fresh channels,
/// sending each new receiver (boxed as `dyn Any + Send`) over the current
/// sender before switching to the new sender. The other thread receives
/// each box, downcasts it back to a receiver and switches over to it.
#[test]
fn channel_through_channel() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut current_tx = s;

                for _ in 0..COUNT {
                    let (next_tx, next_rx) = bounded(cap);
                    // Wrapped in an `Option` so the other side can `take` it.
                    let payload: T = Box::new(Some(next_rx));

                    {
                        let mut select = Select::new();
                        let idx = select.send(&current_tx);
                        let chosen = select.select();
                        if chosen.index() == idx {
                            assert!(chosen.send(&current_tx, payload).is_ok());
                        } else {
                            unreachable!();
                        }
                    }

                    current_tx = next_tx;
                }
            });

            scope.spawn(move |_| {
                let mut current_rx = r;

                for _ in 0..COUNT {
                    let next_rx = {
                        let mut select = Select::new();
                        let idx = select.recv(&current_rx);
                        let chosen = select.select();
                        if chosen.index() != idx {
                            unreachable!();
                        }

                        let mut payload = chosen.recv(&current_rx).unwrap();
                        let slot = payload.downcast_mut::<Option<Receiver<T>>>().unwrap();
                        let taken = slot.take().unwrap();
                        taken
                    };
                    current_rx = next_rx;
                }
            });
        })
        .unwrap();
    }
}
1010 
/// Checks that `try_select` never reports "nothing ready" while at least one
/// of two channels is known to hold a message.
///
/// Runs once with capacity-1 bounded channels (`step == 0`) and once with
/// unbounded channels. In every iteration the spawned thread sends on
/// channel 1 and then calls `try_select` over both receivers, while the main
/// thread sends on channel 2 and only afterwards tries to take the message
/// from channel 1. The `start`/`end` zero-capacity channels keep the two
/// threads in lock-step per iteration.
#[test]
fn linearizable_try() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        // Same test body, two channel flavors.
        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread before racing.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r1);
                    let oper2 = sel.recv(&r2);
                    let oper = sel.try_select();
                    match oper {
                        // The main thread only drains `r1` after sending on
                        // `s2`, so the test expects one receive to be ready.
                        Err(_) => unreachable!(),
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
                            ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
                            _ => unreachable!(),
                        },
                    }

                    end_s.send(()).unwrap();
                    // Best-effort drain of channel 2; the result is ignored.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                // Best-effort attempt to take channel 1's message; the result is ignored.
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1065 
/// Same scenario as a linearizability check for `try_select`, but driven
/// through `Select::select_timeout` with a zero timeout.
///
/// The spawned thread sends on `s1` and then selects over `r1` and `r2`; the
/// main thread sends on `s2` before trying to drain `r1`. A timeout result is
/// treated as unreachable. Runs once with capacity-1 bounded channels and once
/// with unbounded channels.
#[test]
fn linearizable_timeout() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Rendezvous channels marking the start and end of each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        // Pick the channel flavour for this pass.
        let (first_chan, second_chan) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };
        let (s1, r1) = first_chan;
        let (s2, r2) = second_chan;

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let from_r1 = sel.recv(&r1);
                    let from_r2 = sel.recv(&r2);

                    // With a message already sent on `s1`, even a zero
                    // timeout is expected to yield a ready operation.
                    let ready = match sel.select_timeout(ms(0)) {
                        Ok(ready) => ready,
                        Err(_) => unreachable!(),
                    };

                    let chosen = ready.index();
                    if chosen == from_r1 {
                        assert!(ready.recv(&r1).is_ok());
                    } else if chosen == from_r2 {
                        assert!(ready.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    end_s.send(()).unwrap();
                    // Drop any leftover message in `r2` before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                // Send on the second channel before attempting to take from
                // the first one.
                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1120 
/// Checks that `Select::select` spreads its choices over four ready receivers.
///
/// Two channels are pre-filled with `COUNT` messages each. Every iteration
/// then selects over those two receivers plus a fresh `after(ms(0))` and a
/// fresh `tick(ms(0))` receiver, and tallies which one was picked. At the end
/// each of the four must have been chosen at least `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Fill both channels up front.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    // One counter per registered operation, in registration order.
    let hits = vec![Cell::new(0usize); 4];
    for _ in 0..COUNT {
        let timer = after(ms(0));
        let ticker = tick(ms(0));

        let mut sel = Select::new();
        let from_r1 = sel.recv(&r1);
        let from_r2 = sel.recv(&r2);
        let from_timer = sel.recv(&timer);
        let from_ticker = sel.recv(&ticker);

        let ready = sel.select();
        let chosen = ready.index();

        // Complete the selected receive and work out which counter to bump.
        let slot = if chosen == from_r1 {
            ready.recv(&r1).unwrap();
            0
        } else if chosen == from_r2 {
            ready.recv(&r2).unwrap();
            1
        } else if chosen == from_timer {
            ready.recv(&timer).unwrap();
            2
        } else if chosen == from_ticker {
            ready.recv(&ticker).unwrap();
            3
        } else {
            unreachable!()
        };
        hits[slot].set(hits[slot].get() + 1);
    }

    // Every operation must get at least half of an even share.
    let minimum = COUNT / hits.len() / 2;
    assert!(hits.iter().all(|count| count.get() >= minimum));
}
1169 
/// Checks selection fairness across an unbounded, a capacity-1, and a
/// zero-capacity channel while a second thread is sending concurrently.
///
/// The spawned thread performs `COUNT` send selections: the sends on `s1` and
/// `s2` are only registered when the respective channel is observed empty,
/// while the send on `s3` is always registered. The main thread performs
/// `COUNT` receive selections over all three receivers and asserts that each
/// one was picked at least `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();

                // Only offer a send on the buffered channels when they are
                // currently empty.
                let to_s1 = if s1.is_empty() {
                    Some(sel.send(&s1))
                } else {
                    None
                };
                let to_s2 = if s2.is_empty() {
                    Some(sel.send(&s2))
                } else {
                    None
                };
                let to_s3 = sel.send(&s3);

                let ready = sel.select();
                let chosen = ready.index();
                if to_s1 == Some(chosen) {
                    assert!(ready.send(&s1, ()).is_ok());
                } else if to_s2 == Some(chosen) {
                    assert!(ready.send(&s2, ()).is_ok());
                } else if chosen == to_s3 {
                    assert!(ready.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        // One counter per receiver, in registration order.
        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            let mut sel = Select::new();
            let from_r1 = sel.recv(&r1);
            let from_r2 = sel.recv(&r2);
            let from_r3 = sel.recv(&r3);

            let ready = sel.select();
            let chosen = ready.index();

            // Complete the selected receive and record which one it was.
            let slot = if chosen == from_r1 {
                ready.recv(&r1).unwrap();
                0
            } else if chosen == from_r2 {
                ready.recv(&r2).unwrap();
                1
            } else if chosen == from_r3 {
                ready.recv(&r3).unwrap();
                2
            } else {
                unreachable!()
            };
            hits[slot].set(hits[slot].get() + 1);
        }

        let minimum = COUNT / hits.len() / 50;
        assert!(hits.iter().all(|count| count.get() >= minimum));
    })
    .unwrap();
}
1231 
/// Shares one `Select` by reference across threads and clones it inside each
/// thread.
///
/// The shared `Select` holds a receive and a send on the same zero-capacity
/// channel. Each of the `THREADS` workers clones it, runs one selection, and
/// either sends its own index or receives a value that must differ from its
/// own index. Afterwards the channel must be empty.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let (tx, rx) = bounded::<usize>(0);
    let (s, r) = (&tx, &rx);

    let mut sel = Select::new();
    let recv_op = sel.recv(r);
    let send_op = sel.send(s);
    // Workers only ever see this shared reference.
    let shared = &sel;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                // Clone on the worker thread, through the shared reference.
                let mut local = shared.clone();
                let ready = local.select();
                let chosen = ready.index();
                if chosen == recv_op {
                    // A worker cannot be paired with itself.
                    assert_ne!(ready.recv(r), Ok(i));
                } else if chosen == send_op {
                    assert!(ready.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1260 
/// Clones a `Select` on the spawning thread and moves each clone into a
/// worker thread.
///
/// The template `Select` holds a receive and a send on the same zero-capacity
/// channel. Each of the `THREADS` workers runs one selection on its own clone
/// and either sends its index or receives a value that must differ from its
/// own index. Afterwards the channel must be empty.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let (tx, rx) = bounded::<usize>(0);
    let (s, r) = (&tx, &rx);

    let mut template = Select::new();
    let recv_op = template.recv(r);
    let send_op = template.send(s);

    scope(|scope| {
        for i in 0..THREADS {
            // The clone is made here and then moved into the worker.
            let mut owned = template.clone();
            scope.spawn(move |_| {
                let ready = owned.select();
                let chosen = ready.index();
                if chosen == recv_op {
                    // A worker cannot be paired with itself.
                    assert_ne!(ready.recv(r), Ok(i));
                } else if chosen == send_op {
                    assert!(ready.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1288 
/// Builds a single `Select` once and reuses it for many selections.
///
/// The `Select` holds a receive on `r1` and a send on `s2`, both
/// zero-capacity channels. In round `i` the helper thread sends `i` on `s1`,
/// expects `i` back from `r2`, and then waits for a token on `r3`. The main
/// thread runs two selections per round on the same `Select` and then sends
/// the token on `s3`.
#[test]
fn reuse() {
    #[cfg(miri)]
    const COUNT: usize = 50;
    #[cfg(not(miri))]
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<usize>(0);
    let (s2, r2) = bounded::<usize>(0);
    let (s3, r3) = bounded::<()>(100);

    scope(|scope| {
        scope.spawn(|_| {
            for round in 0..COUNT {
                s1.send(round).unwrap();
                let echoed = r2.recv().unwrap();
                assert_eq!(echoed, round);
                r3.recv().unwrap();
            }
        });

        // Registered once, selected on `2 * COUNT` times below.
        let mut sel = Select::new();
        let recv_op = sel.recv(&r1);
        let send_op = sel.send(&s2);

        for round in 0..COUNT {
            // Two selections per round on the same `Select`.
            for _ in 0..2 {
                let ready = sel.select();
                let chosen = ready.index();
                if chosen == recv_op {
                    assert_eq!(ready.recv(&r1), Ok(round));
                } else if chosen == send_op {
                    assert!(ready.send(&s2, round).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
1327