1 //! Tests copied from Go and manually rewritten in Rust.
2 //!
3 //! Source:
4 //!   - https://github.com/golang/go
5 //!
6 //! Copyright & License:
7 //!   - Copyright (c) 2009 The Go Authors
8 //!   - https://golang.org/AUTHORS
9 //!   - https://golang.org/LICENSE
10 //!   - https://golang.org/PATENTS
11 
12 #![allow(clippy::redundant_clone)]
13 
14 use std::alloc::{GlobalAlloc, Layout, System};
15 use std::any::Any;
16 use std::cell::Cell;
17 use std::collections::HashMap;
18 use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst};
19 use std::sync::{Arc, Condvar, Mutex};
20 use std::thread;
21 use std::time::Duration;
22 
23 use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender};
24 
ms(ms: u64) -> Duration25 fn ms(ms: u64) -> Duration {
26     Duration::from_millis(ms)
27 }
28 
29 struct Chan<T> {
30     inner: Arc<Mutex<ChanInner<T>>>,
31 }
32 
33 struct ChanInner<T> {
34     s: Option<Sender<T>>,
35     r: Option<Receiver<T>>,
36     // Receiver to use when r is None (Go blocks on receiving from nil)
37     nil_r: Receiver<T>,
38     // Sender to use when s is None (Go blocks on sending to nil)
39     nil_s: Sender<T>,
40     // Hold this receiver to prevent nil sender channel from disconnection
41     _nil_sr: Receiver<T>,
42 }
43 
44 impl<T> Clone for Chan<T> {
clone(&self) -> Chan<T>45     fn clone(&self) -> Chan<T> {
46         Chan {
47             inner: self.inner.clone(),
48         }
49     }
50 }
51 
52 impl<T> Chan<T> {
send(&self, msg: T)53     fn send(&self, msg: T) {
54         let s = self
55             .inner
56             .lock()
57             .unwrap()
58             .s
59             .as_ref()
60             .expect("sending into closed channel")
61             .clone();
62         let _ = s.send(msg);
63     }
64 
try_recv(&self) -> Option<T>65     fn try_recv(&self) -> Option<T> {
66         let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
67         r.try_recv().ok()
68     }
69 
recv(&self) -> Option<T>70     fn recv(&self) -> Option<T> {
71         let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
72         r.recv().ok()
73     }
74 
close_s(&self)75     fn close_s(&self) {
76         self.inner
77             .lock()
78             .unwrap()
79             .s
80             .take()
81             .expect("channel sender already closed");
82     }
83 
close_r(&self)84     fn close_r(&self) {
85         self.inner
86             .lock()
87             .unwrap()
88             .r
89             .take()
90             .expect("channel receiver already closed");
91     }
92 
has_rx(&self) -> bool93     fn has_rx(&self) -> bool {
94         self.inner.lock().unwrap().r.is_some()
95     }
96 
has_tx(&self) -> bool97     fn has_tx(&self) -> bool {
98         self.inner.lock().unwrap().s.is_some()
99     }
100 
rx(&self) -> Receiver<T>101     fn rx(&self) -> Receiver<T> {
102         let inner = self.inner.lock().unwrap();
103         match inner.r.as_ref() {
104             None => inner.nil_r.clone(),
105             Some(r) => r.clone(),
106         }
107     }
108 
tx(&self) -> Sender<T>109     fn tx(&self) -> Sender<T> {
110         let inner = self.inner.lock().unwrap();
111         match inner.s.as_ref() {
112             None => inner.nil_s.clone(),
113             Some(s) => s.clone(),
114         }
115     }
116 }
117 
118 impl<T> Iterator for Chan<T> {
119     type Item = T;
120 
next(&mut self) -> Option<Self::Item>121     fn next(&mut self) -> Option<Self::Item> {
122         self.recv()
123     }
124 }
125 
126 impl<'a, T> IntoIterator for &'a Chan<T> {
127     type Item = T;
128     type IntoIter = Chan<T>;
129 
into_iter(self) -> Self::IntoIter130     fn into_iter(self) -> Self::IntoIter {
131         self.clone()
132     }
133 }
134 
make<T>(cap: usize) -> Chan<T>135 fn make<T>(cap: usize) -> Chan<T> {
136     let (s, r) = bounded(cap);
137     let (nil_s, _nil_sr) = bounded(0);
138     Chan {
139         inner: Arc::new(Mutex::new(ChanInner {
140             s: Some(s),
141             r: Some(r),
142             nil_r: never(),
143             nil_s,
144             _nil_sr,
145         })),
146     }
147 }
148 
make_unbounded<T>() -> Chan<T>149 fn make_unbounded<T>() -> Chan<T> {
150     let (s, r) = unbounded();
151     let (nil_s, _nil_sr) = bounded(0);
152     Chan {
153         inner: Arc::new(Mutex::new(ChanInner {
154             s: Some(s),
155             r: Some(r),
156             nil_r: never(),
157             nil_s,
158             _nil_sr,
159         })),
160     }
161 }
162 
163 #[derive(Clone)]
164 struct WaitGroup(Arc<WaitGroupInner>);
165 
166 struct WaitGroupInner {
167     cond: Condvar,
168     count: Mutex<i32>,
169 }
170 
171 impl WaitGroup {
new() -> WaitGroup172     fn new() -> WaitGroup {
173         WaitGroup(Arc::new(WaitGroupInner {
174             cond: Condvar::new(),
175             count: Mutex::new(0),
176         }))
177     }
178 
add(&self, delta: i32)179     fn add(&self, delta: i32) {
180         let mut count = self.0.count.lock().unwrap();
181         *count += delta;
182         assert!(*count >= 0);
183         self.0.cond.notify_all();
184     }
185 
done(&self)186     fn done(&self) {
187         self.add(-1);
188     }
189 
wait(&self)190     fn wait(&self) {
191         let mut count = self.0.count.lock().unwrap();
192         while *count > 0 {
193             count = self.0.cond.wait(count).unwrap();
194         }
195     }
196 }
197 
198 struct Defer<F: FnOnce()> {
199     f: Option<Box<F>>,
200 }
201 
202 impl<F: FnOnce()> Drop for Defer<F> {
drop(&mut self)203     fn drop(&mut self) {
204         let f = self.f.take().unwrap();
205         let mut f = Some(f);
206         let mut f = move || f.take().unwrap()();
207         f();
208     }
209 }
210 
211 struct Counter;
212 
213 static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
214 unsafe impl GlobalAlloc for Counter {
alloc(&self, layout: Layout) -> *mut u8215     unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
216         let ret = System.alloc(layout);
217         if !ret.is_null() {
218             ALLOCATED.fetch_add(layout.size(), SeqCst);
219         }
220         ret
221     }
222 
dealloc(&self, ptr: *mut u8, layout: Layout)223     unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
224         System.dealloc(ptr, layout);
225         ALLOCATED.fetch_sub(layout.size(), SeqCst);
226     }
227 }
228 
229 #[global_allocator]
230 static A: Counter = Counter;
231 
232 macro_rules! defer {
233     ($body:expr) => {
234         let _defer = Defer {
235             f: Some(Box::new(|| $body)),
236         };
237     };
238 }
239 
240 macro_rules! go {
241     (@parse $v:ident, $($tail:tt)*) => {{
242         let $v = $v.clone();
243         go!(@parse $($tail)*)
244     }};
245     (@parse $body:expr) => {
246         ::std::thread::spawn(move || {
247             let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| {
248                 $body
249             }));
250             if res.is_err() {
251                 eprintln!("goroutine panicked: {:?}", res);
252                 ::std::process::abort();
253             }
254         })
255     };
256     (@parse $($tail:tt)*) => {
257         compile_error!("invalid `go!` syntax")
258     };
259     ($($tail:tt)*) => {{
260         go!(@parse $($tail)*)
261     }};
262 }
263 
264 // https://github.com/golang/go/blob/master/test/chan/doubleselect.go
265 mod doubleselect {
266     use super::*;
267 
268     #[cfg(miri)]
269     const ITERATIONS: i32 = 100;
270     #[cfg(not(miri))]
271     const ITERATIONS: i32 = 10_000;
272 
sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>)273     fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
274         defer! { c1.close_s() }
275         defer! { c2.close_s() }
276         defer! { c3.close_s() }
277         defer! { c4.close_s() }
278 
279         for i in 0..n {
280             select! {
281                 send(c1.tx(), i) -> _ => {}
282                 send(c2.tx(), i) -> _ => {}
283                 send(c3.tx(), i) -> _ => {}
284                 send(c4.tx(), i) -> _ => {}
285             }
286         }
287     }
288 
mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>)289     fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) {
290         for v in inp {
291             out.send(v);
292         }
293         done.send(true);
294     }
295 
recver(inp: Chan<i32>)296     fn recver(inp: Chan<i32>) {
297         let mut seen = HashMap::new();
298 
299         for v in &inp {
300             if seen.contains_key(&v) {
301                 panic!("got duplicate value for {}", v);
302             }
303             seen.insert(v, true);
304         }
305     }
306 
307     #[test]
main()308     fn main() {
309         let c1 = make::<i32>(0);
310         let c2 = make::<i32>(0);
311         let c3 = make::<i32>(0);
312         let c4 = make::<i32>(0);
313         let done = make::<bool>(0);
314         let cmux = make::<i32>(0);
315 
316         go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4));
317         go!(cmux, c1, done, mux(cmux, c1, done));
318         go!(cmux, c2, done, mux(cmux, c2, done));
319         go!(cmux, c3, done, mux(cmux, c3, done));
320         go!(cmux, c4, done, mux(cmux, c4, done));
321         go!(done, cmux, {
322             done.recv();
323             done.recv();
324             done.recv();
325             done.recv();
326             cmux.close_s();
327         });
328         recver(cmux);
329     }
330 }
331 
332 // https://github.com/golang/go/blob/master/test/chan/fifo.go
333 mod fifo {
334     use super::*;
335 
336     const N: i32 = 10;
337 
338     #[test]
asynch_fifo()339     fn asynch_fifo() {
340         let ch = make::<i32>(N as usize);
341         for i in 0..N {
342             ch.send(i);
343         }
344         for i in 0..N {
345             if ch.recv() != Some(i) {
346                 panic!("bad receive");
347             }
348         }
349     }
350 
chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>)351     fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) {
352         inp.recv();
353         if ch.recv() != Some(val) {
354             panic!("{}", val);
355         }
356         out.send(1);
357     }
358 
359     #[test]
synch_fifo()360     fn synch_fifo() {
361         let ch = make::<i32>(0);
362         let mut inp = make::<i32>(0);
363         let start = inp.clone();
364 
365         for i in 0..N {
366             let out = make::<i32>(0);
367             go!(ch, i, inp, out, chain(ch, i, inp, out));
368             inp = out;
369         }
370 
371         start.send(0);
372         for i in 0..N {
373             ch.send(i);
374         }
375         inp.recv();
376     }
377 }
378 
379 // https://github.com/golang/go/blob/master/test/chan/goroutines.go
380 mod goroutines {
381     use super::*;
382 
f(left: Chan<i32>, right: Chan<i32>)383     fn f(left: Chan<i32>, right: Chan<i32>) {
384         left.send(right.recv().unwrap());
385     }
386 
387     #[test]
main()388     fn main() {
389         let n = 100i32;
390 
391         let leftmost = make::<i32>(0);
392         let mut right = leftmost.clone();
393         let mut left = leftmost.clone();
394 
395         for _ in 0..n {
396             right = make::<i32>(0);
397             go!(left, right, f(left, right));
398             left = right.clone();
399         }
400 
401         go!(right, right.send(1));
402         leftmost.recv().unwrap();
403     }
404 }
405 
406 // https://github.com/golang/go/blob/master/test/chan/nonblock.go
407 mod nonblock {
408     use super::*;
409 
i32receiver(c: Chan<i32>, strobe: Chan<bool>)410     fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) {
411         if c.recv().unwrap() != 123 {
412             panic!("i32 value");
413         }
414         strobe.send(true);
415     }
416 
i32sender(c: Chan<i32>, strobe: Chan<bool>)417     fn i32sender(c: Chan<i32>, strobe: Chan<bool>) {
418         c.send(234);
419         strobe.send(true);
420     }
421 
i64receiver(c: Chan<i64>, strobe: Chan<bool>)422     fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) {
423         if c.recv().unwrap() != 123456 {
424             panic!("i64 value");
425         }
426         strobe.send(true);
427     }
428 
i64sender(c: Chan<i64>, strobe: Chan<bool>)429     fn i64sender(c: Chan<i64>, strobe: Chan<bool>) {
430         c.send(234567);
431         strobe.send(true);
432     }
433 
breceiver(c: Chan<bool>, strobe: Chan<bool>)434     fn breceiver(c: Chan<bool>, strobe: Chan<bool>) {
435         if !c.recv().unwrap() {
436             panic!("b value");
437         }
438         strobe.send(true);
439     }
440 
bsender(c: Chan<bool>, strobe: Chan<bool>)441     fn bsender(c: Chan<bool>, strobe: Chan<bool>) {
442         c.send(true);
443         strobe.send(true);
444     }
445 
sreceiver(c: Chan<String>, strobe: Chan<bool>)446     fn sreceiver(c: Chan<String>, strobe: Chan<bool>) {
447         if c.recv().unwrap() != "hello" {
448             panic!("x value");
449         }
450         strobe.send(true);
451     }
452 
ssender(c: Chan<String>, strobe: Chan<bool>)453     fn ssender(c: Chan<String>, strobe: Chan<bool>) {
454         c.send("hello again".to_string());
455         strobe.send(true);
456     }
457 
458     const MAX_TRIES: usize = 10000; // Up to 100ms per test.
459 
460     #[test]
main()461     fn main() {
462         let ticker = tick(Duration::new(0, 10_000)); // 10 us
463         let sleep = || {
464             ticker.recv().unwrap();
465             ticker.recv().unwrap();
466             thread::yield_now();
467             thread::yield_now();
468             thread::yield_now();
469         };
470 
471         let sync = make::<bool>(0);
472 
473         for buffer in 0..2 {
474             let c32 = make::<i32>(buffer);
475             let c64 = make::<i64>(buffer);
476             let cb = make::<bool>(buffer);
477             let cs = make::<String>(buffer);
478 
479             select! {
480                 recv(c32.rx()) -> _ => panic!("blocked i32sender"),
481                 default => {}
482             }
483 
484             select! {
485                 recv(c64.rx()) -> _ => panic!("blocked i64sender"),
486                 default => {}
487             }
488 
489             select! {
490                 recv(cb.rx()) -> _ => panic!("blocked bsender"),
491                 default => {}
492             }
493 
494             select! {
495                 recv(cs.rx()) -> _ => panic!("blocked ssender"),
496                 default => {}
497             }
498 
499             go!(c32, sync, i32receiver(c32, sync));
500             let mut r#try = 0;
501             loop {
502                 select! {
503                     send(c32.tx(), 123) -> _ => break,
504                     default => {
505                         r#try += 1;
506                         if r#try > MAX_TRIES {
507                             println!("i32receiver buffer={}", buffer);
508                             panic!("fail")
509                         }
510                         sleep();
511                     }
512                 }
513             }
514             sync.recv();
515             go!(c32, sync, i32sender(c32, sync));
516             if buffer > 0 {
517                 sync.recv();
518             }
519             let mut r#try = 0;
520             loop {
521                 select! {
522                     recv(c32.rx()) -> v => {
523                         if v != Ok(234) {
524                             panic!("i32sender value");
525                         }
526                         break;
527                     }
528                     default => {
529                         r#try += 1;
530                         if r#try > MAX_TRIES {
531                             println!("i32sender buffer={}", buffer);
532                             panic!("fail");
533                         }
534                         sleep();
535                     }
536                 }
537             }
538             if buffer == 0 {
539                 sync.recv();
540             }
541 
542             go!(c64, sync, i64receiver(c64, sync));
543             let mut r#try = 0;
544             loop {
545                 select! {
546                     send(c64.tx(), 123456) -> _ => break,
547                     default => {
548                         r#try += 1;
549                         if r#try > MAX_TRIES {
550                             println!("i64receiver buffer={}", buffer);
551                             panic!("fail")
552                         }
553                         sleep();
554                     }
555                 }
556             }
557             sync.recv();
558             go!(c64, sync, i64sender(c64, sync));
559             if buffer > 0 {
560                 sync.recv();
561             }
562             let mut r#try = 0;
563             loop {
564                 select! {
565                     recv(c64.rx()) -> v => {
566                         if v != Ok(234567) {
567                             panic!("i64sender value");
568                         }
569                         break;
570                     }
571                     default => {
572                         r#try += 1;
573                         if r#try > MAX_TRIES {
574                             println!("i64sender buffer={}", buffer);
575                             panic!("fail");
576                         }
577                         sleep();
578                     }
579                 }
580             }
581             if buffer == 0 {
582                 sync.recv();
583             }
584 
585             go!(cb, sync, breceiver(cb, sync));
586             let mut r#try = 0;
587             loop {
588                 select! {
589                     send(cb.tx(), true) -> _ => break,
590                     default => {
591                         r#try += 1;
592                         if r#try > MAX_TRIES {
593                             println!("breceiver buffer={}", buffer);
594                             panic!("fail")
595                         }
596                         sleep();
597                     }
598                 }
599             }
600             sync.recv();
601             go!(cb, sync, bsender(cb, sync));
602             if buffer > 0 {
603                 sync.recv();
604             }
605             let mut r#try = 0;
606             loop {
607                 select! {
608                     recv(cb.rx()) -> v => {
609                         if v != Ok(true) {
610                             panic!("bsender value");
611                         }
612                         break;
613                     }
614                     default => {
615                         r#try += 1;
616                         if r#try > MAX_TRIES {
617                             println!("bsender buffer={}", buffer);
618                             panic!("fail");
619                         }
620                         sleep();
621                     }
622                 }
623             }
624             if buffer == 0 {
625                 sync.recv();
626             }
627 
628             go!(cs, sync, sreceiver(cs, sync));
629             let mut r#try = 0;
630             loop {
631                 select! {
632                     send(cs.tx(), "hello".to_string()) -> _ => break,
633                     default => {
634                         r#try += 1;
635                         if r#try > MAX_TRIES {
636                             println!("sreceiver buffer={}", buffer);
637                             panic!("fail")
638                         }
639                         sleep();
640                     }
641                 }
642             }
643             sync.recv();
644             go!(cs, sync, ssender(cs, sync));
645             if buffer > 0 {
646                 sync.recv();
647             }
648             let mut r#try = 0;
649             loop {
650                 select! {
651                     recv(cs.rx()) -> v => {
652                         if v != Ok("hello again".to_string()) {
653                             panic!("ssender value");
654                         }
655                         break;
656                     }
657                     default => {
658                         r#try += 1;
659                         if r#try > MAX_TRIES {
660                             println!("ssender buffer={}", buffer);
661                             panic!("fail");
662                         }
663                         sleep();
664                     }
665                 }
666             }
667             if buffer == 0 {
668                 sync.recv();
669             }
670         }
671     }
672 }
673 
674 // https://github.com/golang/go/blob/master/test/chan/select.go
675 mod select {
676     use super::*;
677 
678     #[test]
main()679     fn main() {
680         let shift = Cell::new(0);
681         let counter = Cell::new(0);
682 
683         let get_value = || {
684             counter.set(counter.get() + 1);
685             1 << shift.get()
686         };
687 
688         let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| {
689             let mut i = 0;
690             let never = make::<u32>(0);
691             loop {
692                 let nil1 = never.tx();
693                 let nil2 = never.tx();
694                 let v1 = get_value();
695                 let v2 = get_value();
696                 select! {
697                     send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => {
698                         i += 1;
699                         a = None;
700                     }
701                     send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => {
702                         i += 1;
703                         b = None;
704                     }
705                     default => break,
706                 }
707                 shift.set(shift.get() + 1);
708             }
709             i
710         };
711 
712         let a = make::<u32>(1);
713         let b = make::<u32>(1);
714 
715         assert_eq!(send(Some(&a), Some(&b)), 2);
716 
717         let av = a.recv().unwrap();
718         let bv = b.recv().unwrap();
719         assert_eq!(av | bv, 3);
720 
721         assert_eq!(send(Some(&a), None), 1);
722         assert_eq!(counter.get(), 10);
723     }
724 }
725 
726 // https://github.com/golang/go/blob/master/test/chan/select2.go
727 mod select2 {
728     use super::*;
729 
730     #[cfg(miri)]
731     const N: i32 = 200;
732     #[cfg(not(miri))]
733     const N: i32 = 100000;
734 
735     #[test]
main()736     fn main() {
737         fn sender(c: &Chan<i32>, n: i32) {
738             for _ in 0..n {
739                 c.send(1);
740             }
741         }
742 
743         fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) {
744             for _ in 0..n {
745                 select! {
746                     recv(c.rx()) -> _ => {}
747                     recv(dummy.rx()) -> _ => {
748                         panic!("dummy");
749                     }
750                 }
751             }
752         }
753 
754         let c = make_unbounded::<i32>();
755         let dummy = make_unbounded::<i32>();
756 
757         ALLOCATED.store(0, SeqCst);
758 
759         go!(c, sender(&c, N));
760         receiver(&c, &dummy, N);
761 
762         let alloc = ALLOCATED.load(SeqCst);
763 
764         go!(c, sender(&c, N));
765         receiver(&c, &dummy, N);
766 
767         assert!(
768             !(ALLOCATED.load(SeqCst) > alloc
769                 && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000))
770         )
771     }
772 }
773 
774 // https://github.com/golang/go/blob/master/test/chan/select3.go
775 mod select3 {
776     // TODO
777 }
778 
779 // https://github.com/golang/go/blob/master/test/chan/select4.go
780 mod select4 {
781     use super::*;
782 
783     #[test]
main()784     fn main() {
785         let c = make::<i32>(1);
786         let c1 = make::<i32>(0);
787         c.send(42);
788         select! {
789             recv(c1.rx()) -> _ => panic!("BUG"),
790             recv(c.rx()) -> v => assert_eq!(v, Ok(42)),
791         }
792     }
793 }
794 
795 // https://github.com/golang/go/blob/master/test/chan/select6.go
796 mod select6 {
797     use super::*;
798 
799     #[test]
main()800     fn main() {
801         let c1 = make::<bool>(0);
802         let c2 = make::<bool>(0);
803         let c3 = make::<bool>(0);
804 
805         go!(c1, c1.recv());
806         go!(c1, c2, c3, {
807             select! {
808                 recv(c1.rx()) -> _ => panic!("dummy"),
809                 recv(c2.rx()) -> _ => c3.send(true),
810             }
811             c1.recv();
812         });
813         go!(c2, c2.send(true));
814 
815         c3.recv();
816         c1.send(true);
817         c1.send(true);
818     }
819 }
820 
821 // https://github.com/golang/go/blob/master/test/chan/select7.go
822 mod select7 {
823     use super::*;
824 
recv1(c: Chan<i32>)825     fn recv1(c: Chan<i32>) {
826         c.recv().unwrap();
827     }
828 
recv2(c: Chan<i32>)829     fn recv2(c: Chan<i32>) {
830         select! {
831             recv(c.rx()) -> _ => ()
832         }
833     }
834 
recv3(c: Chan<i32>)835     fn recv3(c: Chan<i32>) {
836         let c2 = make::<i32>(1);
837         select! {
838             recv(c.rx()) -> _ => (),
839             recv(c2.rx()) -> _ => ()
840         }
841     }
842 
send1(recv: fn(Chan<i32>))843     fn send1(recv: fn(Chan<i32>)) {
844         let c = make::<i32>(1);
845         go!(c, recv(c));
846         thread::yield_now();
847         c.send(1);
848     }
849 
send2(recv: fn(Chan<i32>))850     fn send2(recv: fn(Chan<i32>)) {
851         let c = make::<i32>(1);
852         go!(c, recv(c));
853         thread::yield_now();
854         select! {
855             send(c.tx(), 1) -> _ => ()
856         }
857     }
858 
send3(recv: fn(Chan<i32>))859     fn send3(recv: fn(Chan<i32>)) {
860         let c = make::<i32>(1);
861         go!(c, recv(c));
862         thread::yield_now();
863         let c2 = make::<i32>(1);
864         select! {
865             send(c.tx(), 1) -> _ => (),
866             send(c2.tx(), 1) -> _ => ()
867         }
868     }
869 
870     #[test]
main()871     fn main() {
872         send1(recv1);
873         send2(recv1);
874         send3(recv1);
875         send1(recv2);
876         send2(recv2);
877         send3(recv2);
878         send1(recv3);
879         send2(recv3);
880         send3(recv3);
881     }
882 }
883 
884 // https://github.com/golang/go/blob/master/test/chan/sieve1.go
885 mod sieve1 {
886     use super::*;
887 
generate(ch: Chan<i32>)888     fn generate(ch: Chan<i32>) {
889         let mut i = 2;
890         loop {
891             ch.send(i);
892             i += 1;
893         }
894     }
895 
filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32)896     fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) {
897         for i in in_ch {
898             if i % prime != 0 {
899                 out_ch.send(i);
900             }
901         }
902     }
903 
sieve(primes: Chan<i32>)904     fn sieve(primes: Chan<i32>) {
905         let mut ch = make::<i32>(1);
906         go!(ch, generate(ch));
907         loop {
908             let prime = ch.recv().unwrap();
909             primes.send(prime);
910 
911             let ch1 = make::<i32>(1);
912             go!(ch, ch1, prime, filter(ch, ch1, prime));
913             ch = ch1;
914         }
915     }
916 
917     #[test]
main()918     fn main() {
919         let primes = make::<i32>(1);
920         go!(primes, sieve(primes));
921 
922         let a = [
923             2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
924             89, 97,
925         ];
926         #[cfg(miri)]
927         let a = &a[..10];
928 
929         for item in a.iter() {
930             let x = primes.recv().unwrap();
931             if x != *item {
932                 println!("{} != {}", x, item);
933                 panic!("fail");
934             }
935         }
936     }
937 }
938 
939 // https://github.com/golang/go/blob/master/test/chan/zerosize.go
940 mod zerosize {
941     use super::*;
942 
943     #[test]
zero_size_struct()944     fn zero_size_struct() {
945         struct ZeroSize;
946         let _ = make::<ZeroSize>(0);
947     }
948 
949     #[test]
zero_size_array()950     fn zero_size_array() {
951         let _ = make::<[u8; 0]>(0);
952     }
953 }
954 
955 // https://github.com/golang/go/blob/master/src/runtime/chan_test.go
956 mod chan_test {
957     use super::*;
958 
959     #[test]
test_chan()960     fn test_chan() {
961         #[cfg(miri)]
962         const N: i32 = 12;
963         #[cfg(not(miri))]
964         const N: i32 = 200;
965 
966         #[cfg(miri)]
967         const MESSAGES_COUNT: i32 = 20;
968         #[cfg(not(miri))]
969         const MESSAGES_COUNT: i32 = 100;
970 
971         for cap in 0..N {
972             {
973                 // Ensure that receive from empty chan blocks.
974                 let c = make::<i32>(cap as usize);
975 
976                 let recv1 = Arc::new(Mutex::new(false));
977                 go!(c, recv1, {
978                     c.recv();
979                     *recv1.lock().unwrap() = true;
980                 });
981 
982                 let recv2 = Arc::new(Mutex::new(false));
983                 go!(c, recv2, {
984                     c.recv();
985                     *recv2.lock().unwrap() = true;
986                 });
987 
988                 thread::sleep(ms(1));
989 
990                 if *recv1.lock().unwrap() || *recv2.lock().unwrap() {
991                     panic!();
992                 }
993 
994                 // Ensure that non-blocking receive does not block.
995                 select! {
996                     recv(c.rx()) -> _ => panic!(),
997                     default => {}
998                 }
999                 select! {
1000                     recv(c.rx()) -> _ => panic!(),
1001                     default => {}
1002                 }
1003 
1004                 c.send(0);
1005                 c.send(0);
1006             }
1007 
1008             {
1009                 // Ensure that send to full chan blocks.
1010                 let c = make::<i32>(cap as usize);
1011                 for i in 0..cap {
1012                     c.send(i);
1013                 }
1014 
1015                 let sent = Arc::new(Mutex::new(0));
1016                 go!(sent, c, {
1017                     c.send(0);
1018                     *sent.lock().unwrap() = 1;
1019                 });
1020 
1021                 thread::sleep(ms(1));
1022 
1023                 if *sent.lock().unwrap() != 0 {
1024                     panic!();
1025                 }
1026 
1027                 // Ensure that non-blocking send does not block.
1028                 select! {
1029                     send(c.tx(), 0) -> _ => panic!(),
1030                     default => {}
1031                 }
1032                 c.recv();
1033             }
1034 
1035             {
1036                 // Ensure that we receive 0 from closed chan.
1037                 let c = make::<i32>(cap as usize);
1038                 for i in 0..cap {
1039                     c.send(i);
1040                 }
1041                 c.close_s();
1042 
1043                 for i in 0..cap {
1044                     let v = c.recv();
1045                     if v != Some(i) {
1046                         panic!();
1047                     }
1048                 }
1049 
1050                 if c.recv() != None {
1051                     panic!();
1052                 }
1053                 if c.try_recv() != None {
1054                     panic!();
1055                 }
1056             }
1057 
1058             {
1059                 // Ensure that close unblocks receive.
1060                 let c = make::<i32>(cap as usize);
1061                 let done = make::<bool>(0);
1062 
1063                 go!(c, done, {
1064                     let v = c.try_recv();
1065                     done.send(v.is_none());
1066                 });
1067 
1068                 thread::sleep(ms(1));
1069                 c.close_s();
1070 
1071                 if !done.recv().unwrap() {
1072                     panic!();
1073                 }
1074             }
1075 
1076             {
1077                 // Send many integers,
1078                 // ensure that we receive them non-corrupted in FIFO order.
1079                 let c = make::<i32>(cap as usize);
1080                 go!(c, {
1081                     for i in 0..MESSAGES_COUNT {
1082                         c.send(i);
1083                     }
1084                 });
1085                 for i in 0..MESSAGES_COUNT {
1086                     if c.recv() != Some(i) {
1087                         panic!();
1088                     }
1089                 }
1090 
1091                 // Same, but using recv2.
1092                 go!(c, {
1093                     for i in 0..MESSAGES_COUNT {
1094                         c.send(i);
1095                     }
1096                 });
1097                 for i in 0..MESSAGES_COUNT {
1098                     if c.recv() != Some(i) {
1099                         panic!();
1100                     }
1101                 }
1102             }
1103         }
1104     }
1105 
1106     #[test]
test_nonblock_recv_race()1107     fn test_nonblock_recv_race() {
1108         #[cfg(miri)]
1109         const N: usize = 100;
1110         #[cfg(not(miri))]
1111         const N: usize = 1000;
1112 
1113         for _ in 0..N {
1114             let c = make::<i32>(1);
1115             c.send(1);
1116 
1117             let t = go!(c, {
1118                 select! {
1119                     recv(c.rx()) -> _ => {}
1120                     default => panic!("chan is not ready"),
1121                 }
1122             });
1123 
1124             c.close_s();
1125             c.recv();
1126             t.join().unwrap();
1127         }
1128     }
1129 
1130     #[test]
test_nonblock_select_race()1131     fn test_nonblock_select_race() {
1132         #[cfg(miri)]
1133         const N: usize = 100;
1134         #[cfg(not(miri))]
1135         const N: usize = 1000;
1136 
1137         let done = make::<bool>(1);
1138         for _ in 0..N {
1139             let c1 = make::<i32>(1);
1140             let c2 = make::<i32>(1);
1141             c1.send(1);
1142 
1143             go!(c1, c2, done, {
1144                 select! {
1145                     recv(c1.rx()) -> _ => {}
1146                     recv(c2.rx()) -> _ => {}
1147                     default => {
1148                         done.send(false);
1149                         return;
1150                     }
1151                 }
1152                 done.send(true);
1153             });
1154 
1155             c2.send(1);
1156             select! {
1157                 recv(c1.rx()) -> _ => {}
1158                 default => {}
1159             }
1160             if !done.recv().unwrap() {
1161                 panic!("no chan is ready");
1162             }
1163         }
1164     }
1165 
1166     #[test]
test_nonblock_select_race2()1167     fn test_nonblock_select_race2() {
1168         #[cfg(miri)]
1169         const N: usize = 100;
1170         #[cfg(not(miri))]
1171         const N: usize = 1000;
1172 
1173         let done = make::<bool>(1);
1174         for _ in 0..N {
1175             let c1 = make::<i32>(1);
1176             let c2 = make::<i32>(0);
1177             c1.send(1);
1178 
1179             go!(c1, c2, done, {
1180                 select! {
1181                     recv(c1.rx()) -> _ => {}
1182                     recv(c2.rx()) -> _ => {}
1183                     default => {
1184                         done.send(false);
1185                         return;
1186                     }
1187                 }
1188                 done.send(true);
1189             });
1190 
1191             c2.close_s();
1192             select! {
1193                 recv(c1.rx()) -> _ => {}
1194                 default => {}
1195             }
1196             if !done.recv().unwrap() {
1197                 panic!("no chan is ready");
1198             }
1199         }
1200     }
1201 
1202     #[test]
test_self_select()1203     fn test_self_select() {
1204         // Ensure that send/recv on the same chan in select
1205         // does not crash nor deadlock.
1206 
1207         #[cfg(miri)]
1208         const N: usize = 100;
1209         #[cfg(not(miri))]
1210         const N: usize = 1000;
1211 
1212         for &cap in &[0, 10] {
1213             let wg = WaitGroup::new();
1214             wg.add(2);
1215             let c = make::<i32>(cap);
1216 
1217             for p in 0..2 {
1218                 let p = p;
1219                 go!(wg, p, c, {
1220                     defer! { wg.done() }
1221                     for i in 0..N {
1222                         if p == 0 || i % 2 == 0 {
1223                             select! {
1224                                 send(c.tx(), p) -> _ => {}
1225                                 recv(c.rx()) -> v => {
1226                                     if cap == 0 && v.ok() == Some(p) {
1227                                         panic!("self receive");
1228                                     }
1229                                 }
1230                             }
1231                         } else {
1232                             select! {
1233                                 recv(c.rx()) -> v => {
1234                                     if cap == 0 && v.ok() == Some(p) {
1235                                         panic!("self receive");
1236                                     }
1237                                 }
1238                                 send(c.tx(), p) -> _ => {}
1239                             }
1240                         }
1241                     }
1242                 });
1243             }
1244             wg.wait();
1245         }
1246     }
1247 
1248     #[test]
test_select_stress()1249     fn test_select_stress() {
1250         #[cfg(miri)]
1251         const N: usize = 100;
1252         #[cfg(not(miri))]
1253         const N: usize = 10000;
1254 
1255         let c = vec![
1256             make::<i32>(0),
1257             make::<i32>(0),
1258             make::<i32>(2),
1259             make::<i32>(3),
1260         ];
1261 
1262         // There are 4 goroutines that send N values on each of the chans,
1263         // + 4 goroutines that receive N values on each of the chans,
1264         // + 1 goroutine that sends N values on each of the chans in a single select,
1265         // + 1 goroutine that receives N values on each of the chans in a single select.
1266         // All these sends, receives and selects interact chaotically at runtime,
1267         // but we are careful that this whole construct does not deadlock.
1268         let wg = WaitGroup::new();
1269         wg.add(10);
1270 
1271         for k in 0..4 {
1272             go!(k, c, wg, {
1273                 for _ in 0..N {
1274                     c[k].send(0);
1275                 }
1276                 wg.done();
1277             });
1278             go!(k, c, wg, {
1279                 for _ in 0..N {
1280                     c[k].recv();
1281                 }
1282                 wg.done();
1283             });
1284         }
1285 
1286         go!(c, wg, {
1287             let mut n = [0; 4];
1288             let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>();
1289 
1290             for _ in 0..4 * N {
1291                 let index = {
1292                     let mut sel = Select::new();
1293                     let mut opers = [!0; 4];
1294                     for &i in &[3, 2, 0, 1] {
1295                         if let Some(c) = &c1[i] {
1296                             opers[i] = sel.recv(c);
1297                         }
1298                     }
1299 
1300                     let oper = sel.select();
1301                     let mut index = !0;
1302                     for i in 0..4 {
1303                         if opers[i] == oper.index() {
1304                             index = i;
1305                             let _ = oper.recv(c1[i].as_ref().unwrap());
1306                             break;
1307                         }
1308                     }
1309                     index
1310                 };
1311 
1312                 n[index] += 1;
1313                 if n[index] == N {
1314                     c1[index] = None;
1315                 }
1316             }
1317             wg.done();
1318         });
1319 
1320         go!(c, wg, {
1321             let mut n = [0; 4];
1322             let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>();
1323 
1324             for _ in 0..4 * N {
1325                 let index = {
1326                     let mut sel = Select::new();
1327                     let mut opers = [!0; 4];
1328                     for &i in &[0, 1, 2, 3] {
1329                         if let Some(c) = &c1[i] {
1330                             opers[i] = sel.send(c);
1331                         }
1332                     }
1333 
1334                     let oper = sel.select();
1335                     let mut index = !0;
1336                     for i in 0..4 {
1337                         if opers[i] == oper.index() {
1338                             index = i;
1339                             let _ = oper.send(c1[i].as_ref().unwrap(), 0);
1340                             break;
1341                         }
1342                     }
1343                     index
1344                 };
1345 
1346                 n[index] += 1;
1347                 if n[index] == N {
1348                     c1[index] = None;
1349                 }
1350             }
1351             wg.done();
1352         });
1353 
1354         wg.wait();
1355     }
1356 
1357     #[test]
test_select_fairness()1358     fn test_select_fairness() {
1359         #[cfg(miri)]
1360         const TRIALS: usize = 100;
1361         #[cfg(not(miri))]
1362         const TRIALS: usize = 10000;
1363 
1364         let c1 = make::<u8>(TRIALS + 1);
1365         let c2 = make::<u8>(TRIALS + 1);
1366 
1367         for _ in 0..TRIALS + 1 {
1368             c1.send(1);
1369             c2.send(2);
1370         }
1371 
1372         let c3 = make::<u8>(0);
1373         let c4 = make::<u8>(0);
1374         let out = make::<u8>(0);
1375         let done = make::<u8>(0);
1376         let wg = WaitGroup::new();
1377 
1378         wg.add(1);
1379         go!(wg, c1, c2, c3, c4, out, done, {
1380             defer! { wg.done() };
1381             loop {
1382                 let b;
1383                 select! {
1384                     recv(c3.rx()) -> m => b = m.unwrap(),
1385                     recv(c4.rx()) -> m => b = m.unwrap(),
1386                     recv(c1.rx()) -> m => b = m.unwrap(),
1387                     recv(c2.rx()) -> m => b = m.unwrap(),
1388                 }
1389                 select! {
1390                     send(out.tx(), b) -> _ => {}
1391                     recv(done.rx()) -> _ => return,
1392                 }
1393             }
1394         });
1395 
1396         let (mut cnt1, mut cnt2) = (0, 0);
1397         for _ in 0..TRIALS {
1398             match out.recv() {
1399                 Some(1) => cnt1 += 1,
1400                 Some(2) => cnt2 += 1,
1401                 b => panic!("unexpected value {:?} on channel", b),
1402             }
1403         }
1404 
1405         // If the select in the goroutine is fair,
1406         // cnt1 and cnt2 should be about the same value.
1407         // With 10,000 trials, the expected margin of error at
1408         // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
1409 
1410         let r = cnt1 as f64 / TRIALS as f64;
1411         let e = (r - 0.5).abs();
1412 
1413         if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) {
1414             panic!(
1415                 "unfair select: in {} trials, results were {}, {}",
1416                 TRIALS, cnt1, cnt2,
1417             );
1418         }
1419 
1420         done.close_s();
1421         wg.wait();
1422     }
1423 
1424     #[test]
test_chan_send_interface()1425     fn test_chan_send_interface() {
1426         struct Mt;
1427 
1428         let c = make::<Box<dyn Any>>(1);
1429         c.send(Box::new(Mt));
1430 
1431         select! {
1432             send(c.tx(), Box::new(Mt)) -> _ => {}
1433             default => {}
1434         }
1435 
1436         select! {
1437             send(c.tx(), Box::new(Mt)) -> _ => {}
1438             send(c.tx(), Box::new(Mt)) -> _ => {}
1439             default => {}
1440         }
1441     }
1442 
1443     #[test]
test_pseudo_random_send()1444     fn test_pseudo_random_send() {
1445         #[cfg(miri)]
1446         const N: usize = 20;
1447         #[cfg(not(miri))]
1448         const N: usize = 100;
1449 
1450         for cap in 0..N {
1451             let c = make::<i32>(cap);
1452             let l = Arc::new(Mutex::new(vec![0i32; N]));
1453             let done = make::<bool>(0);
1454 
1455             go!(c, done, l, {
1456                 let mut l = l.lock().unwrap();
1457                 for i in 0..N {
1458                     thread::yield_now();
1459                     l[i] = c.recv().unwrap();
1460                 }
1461                 done.send(true);
1462             });
1463 
1464             for _ in 0..N {
1465                 select! {
1466                     send(c.tx(), 1) -> _ => {}
1467                     send(c.tx(), 0) -> _ => {}
1468                 }
1469             }
1470             done.recv();
1471 
1472             let mut n0 = 0;
1473             let mut n1 = 0;
1474             for &i in l.lock().unwrap().iter() {
1475                 n0 += (i + 1) % 2;
1476                 n1 += i;
1477             }
1478 
1479             if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 {
1480                 panic!(
1481                     "Want pseudorandom, got {} zeros and {} ones (chan cap {})",
1482                     n0, n1, cap,
1483                 );
1484             }
1485         }
1486     }
1487 
1488     #[test]
test_multi_consumer()1489     fn test_multi_consumer() {
1490         const NWORK: usize = 23;
1491         #[cfg(miri)]
1492         const NITER: usize = 50;
1493         #[cfg(not(miri))]
1494         const NITER: usize = 271828;
1495 
1496         let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];
1497 
1498         let q = make::<i32>(NWORK * 3);
1499         let r = make::<i32>(NWORK * 3);
1500 
1501         let wg = WaitGroup::new();
1502         for i in 0..NWORK {
1503             wg.add(1);
1504             let w = i;
1505             go!(q, r, wg, pn, {
1506                 for v in &q {
1507                     if pn[w % pn.len()] == v {
1508                         thread::yield_now();
1509                     }
1510                     r.send(v);
1511                 }
1512                 wg.done();
1513             });
1514         }
1515 
1516         let expect = Arc::new(Mutex::new(0));
1517         go!(q, r, expect, wg, pn, {
1518             for i in 0..NITER {
1519                 let v = pn[i % pn.len()];
1520                 *expect.lock().unwrap() += v;
1521                 q.send(v);
1522             }
1523             q.close_s();
1524             wg.wait();
1525             r.close_s();
1526         });
1527 
1528         let mut n = 0;
1529         let mut s = 0;
1530         for v in &r {
1531             n += 1;
1532             s += v;
1533         }
1534 
1535         if n != NITER || s != *expect.lock().unwrap() {
1536             panic!();
1537         }
1538     }
1539 
1540     #[test]
test_select_duplicate_channel()1541     fn test_select_duplicate_channel() {
1542         // This test makes sure we can queue a G on
1543         // the same channel multiple times.
1544         let c = make::<i32>(0);
1545         let d = make::<i32>(0);
1546         let e = make::<i32>(0);
1547 
1548         go!(c, d, e, {
1549             select! {
1550                 recv(c.rx()) -> _ => {}
1551                 recv(d.rx()) -> _ => {}
1552                 recv(e.rx()) -> _ => {}
1553             }
1554             e.send(9);
1555         });
1556         thread::sleep(ms(1));
1557 
1558         go!(c, c.recv());
1559         thread::sleep(ms(1));
1560 
1561         d.send(7);
1562         e.recv();
1563         c.send(8);
1564     }
1565 }
1566 
1567 // https://github.com/golang/go/blob/master/test/closedchan.go
1568 mod closedchan {
1569     // TODO
1570 }
1571 
1572 // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go
1573 mod chanbarrier_test {
1574     // TODO
1575 }
1576 
1577 // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go
1578 mod race_chan_test {
1579     // TODO
1580 }
1581 
1582 // https://github.com/golang/go/blob/master/test/ken/chan.go
1583 mod chan {
1584     use super::*;
1585 
1586     const MESSAGES_PER_CHANEL: u32 = 76;
1587     const MESSAGES_RANGE_LEN: u32 = 100;
1588     const END: i32 = 10000;
1589 
1590     struct ChanWithVals {
1591         chan: Chan<i32>,
1592         /// Next value to send
1593         sv: Arc<AtomicI32>,
1594         /// Next value to receive
1595         rv: Arc<AtomicI32>,
1596     }
1597 
1598     struct Totals {
1599         /// Total sent messages
1600         tots: u32,
1601         /// Total received messages
1602         totr: u32,
1603     }
1604 
1605     struct Context {
1606         nproc: Arc<Mutex<i32>>,
1607         cval: Arc<Mutex<i32>>,
1608         tot: Arc<Mutex<Totals>>,
1609         nc: ChanWithVals,
1610         randx: Arc<Mutex<i32>>,
1611     }
1612 
1613     impl ChanWithVals {
with_capacity(capacity: usize) -> Self1614         fn with_capacity(capacity: usize) -> Self {
1615             ChanWithVals {
1616                 chan: make(capacity),
1617                 sv: Arc::new(AtomicI32::new(0)),
1618                 rv: Arc::new(AtomicI32::new(0)),
1619             }
1620         }
1621 
closed() -> Self1622         fn closed() -> Self {
1623             let ch = ChanWithVals::with_capacity(0);
1624             ch.chan.close_r();
1625             ch.chan.close_s();
1626             ch
1627         }
1628 
rv(&self) -> i321629         fn rv(&self) -> i32 {
1630             self.rv.load(SeqCst)
1631         }
1632 
sv(&self) -> i321633         fn sv(&self) -> i32 {
1634             self.sv.load(SeqCst)
1635         }
1636 
send(&mut self, tot: &Mutex<Totals>) -> bool1637         fn send(&mut self, tot: &Mutex<Totals>) -> bool {
1638             {
1639                 let mut tot = tot.lock().unwrap();
1640                 tot.tots += 1
1641             }
1642             let esv = expect(self.sv(), self.sv());
1643             self.sv.store(esv, SeqCst);
1644             if self.sv() == END {
1645                 self.chan.close_s();
1646                 return true;
1647             }
1648             false
1649         }
1650 
recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool1651         fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool {
1652             {
1653                 let mut tot = tot.lock().unwrap();
1654                 tot.totr += 1
1655             }
1656             let erv = expect(self.rv(), v);
1657             self.rv.store(erv, SeqCst);
1658             if self.rv() == END {
1659                 self.chan.close_r();
1660                 return true;
1661             }
1662             false
1663         }
1664     }
1665 
1666     impl Clone for ChanWithVals {
clone(&self) -> Self1667         fn clone(&self) -> Self {
1668             ChanWithVals {
1669                 chan: self.chan.clone(),
1670                 sv: self.sv.clone(),
1671                 rv: self.rv.clone(),
1672             }
1673         }
1674     }
1675 
1676     impl Context {
nproc(&self) -> &Mutex<i32>1677         fn nproc(&self) -> &Mutex<i32> {
1678             self.nproc.as_ref()
1679         }
1680 
cval(&self) -> &Mutex<i32>1681         fn cval(&self) -> &Mutex<i32> {
1682             self.cval.as_ref()
1683         }
1684 
tot(&self) -> &Mutex<Totals>1685         fn tot(&self) -> &Mutex<Totals> {
1686             self.tot.as_ref()
1687         }
1688 
randx(&self) -> &Mutex<i32>1689         fn randx(&self) -> &Mutex<i32> {
1690             self.randx.as_ref()
1691         }
1692     }
1693 
1694     impl Clone for Context {
clone(&self) -> Self1695         fn clone(&self) -> Self {
1696             Context {
1697                 nproc: self.nproc.clone(),
1698                 cval: self.cval.clone(),
1699                 tot: self.tot.clone(),
1700                 nc: self.nc.clone(),
1701                 randx: self.randx.clone(),
1702             }
1703         }
1704     }
1705 
nrand(n: i32, randx: &Mutex<i32>) -> i321706     fn nrand(n: i32, randx: &Mutex<i32>) -> i32 {
1707         let mut randx = randx.lock().unwrap();
1708         *randx += 10007;
1709         if *randx >= 1000000 {
1710             *randx -= 1000000
1711         }
1712         *randx % n
1713     }
1714 
change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i321715     fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 {
1716         let mut nproc = nproc.lock().unwrap();
1717         *nproc += adjust;
1718         *nproc
1719     }
1720 
mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals>1721     fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> {
1722         let mut ca = Vec::<ChanWithVals>::with_capacity(n);
1723         let mut cval = cval.lock().unwrap();
1724         for _ in 0..n {
1725             *cval += MESSAGES_RANGE_LEN as i32;
1726             let chl = ChanWithVals::with_capacity(c);
1727             chl.sv.store(*cval, SeqCst);
1728             chl.rv.store(*cval, SeqCst);
1729             ca.push(chl);
1730         }
1731         ca
1732     }
1733 
expect(v: i32, v0: i32) -> i321734     fn expect(v: i32, v0: i32) -> i32 {
1735         if v == v0 {
1736             return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 {
1737                 END
1738             } else {
1739                 v + 1
1740             };
1741         }
1742         panic!("got {}, expected {}", v, v0 + 1);
1743     }
1744 
send(mut c: ChanWithVals, ctx: Context)1745     fn send(mut c: ChanWithVals, ctx: Context) {
1746         loop {
1747             for _ in 0..=nrand(10, ctx.randx()) {
1748                 thread::yield_now();
1749             }
1750             c.chan.tx().send(c.sv()).unwrap();
1751             if c.send(ctx.tot()) {
1752                 break;
1753             }
1754         }
1755         change_nproc(-1, ctx.nproc());
1756     }
1757 
recv(mut c: ChanWithVals, ctx: Context)1758     fn recv(mut c: ChanWithVals, ctx: Context) {
1759         loop {
1760             for _ in (0..nrand(10, ctx.randx())).rev() {
1761                 thread::yield_now();
1762             }
1763             let v = c.chan.rx().recv().unwrap();
1764             if c.recv(v, ctx.tot()) {
1765                 break;
1766             }
1767         }
1768         change_nproc(-1, ctx.nproc());
1769     }
1770 
1771     #[allow(clippy::too_many_arguments)]
sel( mut r0: ChanWithVals, mut r1: ChanWithVals, mut r2: ChanWithVals, mut r3: ChanWithVals, mut s0: ChanWithVals, mut s1: ChanWithVals, mut s2: ChanWithVals, mut s3: ChanWithVals, ctx: Context, )1772     fn sel(
1773         mut r0: ChanWithVals,
1774         mut r1: ChanWithVals,
1775         mut r2: ChanWithVals,
1776         mut r3: ChanWithVals,
1777         mut s0: ChanWithVals,
1778         mut s1: ChanWithVals,
1779         mut s2: ChanWithVals,
1780         mut s3: ChanWithVals,
1781         ctx: Context,
1782     ) {
1783         let mut a = 0; // local chans running
1784 
1785         if r0.chan.has_rx() {
1786             a += 1;
1787         }
1788         if r1.chan.has_rx() {
1789             a += 1;
1790         }
1791         if r2.chan.has_rx() {
1792             a += 1;
1793         }
1794         if r3.chan.has_rx() {
1795             a += 1;
1796         }
1797         if s0.chan.has_tx() {
1798             a += 1;
1799         }
1800         if s1.chan.has_tx() {
1801             a += 1;
1802         }
1803         if s2.chan.has_tx() {
1804             a += 1;
1805         }
1806         if s3.chan.has_tx() {
1807             a += 1;
1808         }
1809 
1810         loop {
1811             for _ in 0..=nrand(5, ctx.randx()) {
1812                 thread::yield_now();
1813             }
1814             select! {
1815                 recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1816                 recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1817                 recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1818                 recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1819                 send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 },
1820                 send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 },
1821                 send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 },
1822                 send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 },
1823             }
1824             if a == 0 {
1825                 break;
1826             }
1827         }
1828         change_nproc(-1, ctx.nproc());
1829     }
1830 
get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals1831     fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals {
1832         vec.get(idx).unwrap().clone()
1833     }
1834 
1835     /// Direct send to direct recv
test1(c: ChanWithVals, ctx: &mut Context)1836     fn test1(c: ChanWithVals, ctx: &mut Context) {
1837         change_nproc(2, ctx.nproc());
1838         go!(c, ctx, send(c, ctx));
1839         go!(c, ctx, recv(c, ctx));
1840     }
1841 
1842     /// Direct send to select recv
test2(c: usize, ctx: &mut Context)1843     fn test2(c: usize, ctx: &mut Context) {
1844         let ca = mkchan(c, 4, ctx.cval());
1845 
1846         change_nproc(4, ctx.nproc());
1847         go!(ca, ctx, send(get(&ca, 0), ctx));
1848         go!(ca, ctx, send(get(&ca, 1), ctx));
1849         go!(ca, ctx, send(get(&ca, 2), ctx));
1850         go!(ca, ctx, send(get(&ca, 3), ctx));
1851 
1852         change_nproc(1, ctx.nproc());
1853         go!(
1854             ca,
1855             ctx,
1856             sel(
1857                 get(&ca, 0),
1858                 get(&ca, 1),
1859                 get(&ca, 2),
1860                 get(&ca, 3),
1861                 ctx.nc.clone(),
1862                 ctx.nc.clone(),
1863                 ctx.nc.clone(),
1864                 ctx.nc.clone(),
1865                 ctx,
1866             )
1867         );
1868     }
1869 
1870     /// Select send to direct recv
test3(c: usize, ctx: &mut Context)1871     fn test3(c: usize, ctx: &mut Context) {
1872         let ca = mkchan(c, 4, ctx.cval());
1873 
1874         change_nproc(4, ctx.nproc());
1875         go!(ca, ctx, recv(get(&ca, 0), ctx));
1876         go!(ca, ctx, recv(get(&ca, 1), ctx));
1877         go!(ca, ctx, recv(get(&ca, 2), ctx));
1878         go!(ca, ctx, recv(get(&ca, 3), ctx));
1879 
1880         change_nproc(1, ctx.nproc());
1881         go!(
1882             ca,
1883             ctx,
1884             sel(
1885                 ctx.nc.clone(),
1886                 ctx.nc.clone(),
1887                 ctx.nc.clone(),
1888                 ctx.nc.clone(),
1889                 get(&ca, 0),
1890                 get(&ca, 1),
1891                 get(&ca, 2),
1892                 get(&ca, 3),
1893                 ctx,
1894             )
1895         );
1896     }
1897 
1898     /// Select send to select recv, 4 channels
test4(c: usize, ctx: &mut Context)1899     fn test4(c: usize, ctx: &mut Context) {
1900         let ca = mkchan(c, 4, ctx.cval());
1901 
1902         change_nproc(2, ctx.nproc());
1903         go!(
1904             ca,
1905             ctx,
1906             sel(
1907                 ctx.nc.clone(),
1908                 ctx.nc.clone(),
1909                 ctx.nc.clone(),
1910                 ctx.nc.clone(),
1911                 get(&ca, 0),
1912                 get(&ca, 1),
1913                 get(&ca, 2),
1914                 get(&ca, 3),
1915                 ctx,
1916             )
1917         );
1918         go!(
1919             ca,
1920             ctx,
1921             sel(
1922                 get(&ca, 0),
1923                 get(&ca, 1),
1924                 get(&ca, 2),
1925                 get(&ca, 3),
1926                 ctx.nc.clone(),
1927                 ctx.nc.clone(),
1928                 ctx.nc.clone(),
1929                 ctx.nc.clone(),
1930                 ctx,
1931             )
1932         );
1933     }
1934 
1935     /// Select send to select recv, 8 channels
test5(c: usize, ctx: &mut Context)1936     fn test5(c: usize, ctx: &mut Context) {
1937         let ca = mkchan(c, 8, ctx.cval());
1938 
1939         change_nproc(2, ctx.nproc());
1940         go!(
1941             ca,
1942             ctx,
1943             sel(
1944                 get(&ca, 4),
1945                 get(&ca, 5),
1946                 get(&ca, 6),
1947                 get(&ca, 7),
1948                 get(&ca, 0),
1949                 get(&ca, 1),
1950                 get(&ca, 2),
1951                 get(&ca, 3),
1952                 ctx,
1953             )
1954         );
1955         go!(
1956             ca,
1957             ctx,
1958             sel(
1959                 get(&ca, 0),
1960                 get(&ca, 1),
1961                 get(&ca, 2),
1962                 get(&ca, 3),
1963                 get(&ca, 4),
1964                 get(&ca, 5),
1965                 get(&ca, 6),
1966                 get(&ca, 7),
1967                 ctx,
1968             )
1969         );
1970     }
1971 
1972     // Direct and select send to direct and select recv
test6(c: usize, ctx: &mut Context)1973     fn test6(c: usize, ctx: &mut Context) {
1974         let ca = mkchan(c, 12, ctx.cval());
1975 
1976         change_nproc(4, ctx.nproc());
1977         go!(ca, ctx, send(get(&ca, 4), ctx));
1978         go!(ca, ctx, send(get(&ca, 5), ctx));
1979         go!(ca, ctx, send(get(&ca, 6), ctx));
1980         go!(ca, ctx, send(get(&ca, 7), ctx));
1981 
1982         change_nproc(4, ctx.nproc());
1983         go!(ca, ctx, recv(get(&ca, 8), ctx));
1984         go!(ca, ctx, recv(get(&ca, 9), ctx));
1985         go!(ca, ctx, recv(get(&ca, 10), ctx));
1986         go!(ca, ctx, recv(get(&ca, 11), ctx));
1987 
1988         change_nproc(2, ctx.nproc());
1989         go!(
1990             ca,
1991             ctx,
1992             sel(
1993                 get(&ca, 4),
1994                 get(&ca, 5),
1995                 get(&ca, 6),
1996                 get(&ca, 7),
1997                 get(&ca, 0),
1998                 get(&ca, 1),
1999                 get(&ca, 2),
2000                 get(&ca, 3),
2001                 ctx,
2002             )
2003         );
2004         go!(
2005             ca,
2006             ctx,
2007             sel(
2008                 get(&ca, 0),
2009                 get(&ca, 1),
2010                 get(&ca, 2),
2011                 get(&ca, 3),
2012                 get(&ca, 8),
2013                 get(&ca, 9),
2014                 get(&ca, 10),
2015                 get(&ca, 11),
2016                 ctx,
2017             )
2018         );
2019     }
2020 
wait(ctx: &mut Context)2021     fn wait(ctx: &mut Context) {
2022         thread::yield_now();
2023         while change_nproc(0, ctx.nproc()) != 0 {
2024             thread::yield_now();
2025         }
2026     }
2027 
tests(c: usize, ctx: &mut Context)2028     fn tests(c: usize, ctx: &mut Context) {
2029         let ca = mkchan(c, 4, ctx.cval());
2030         test1(get(&ca, 0), ctx);
2031         test1(get(&ca, 1), ctx);
2032         test1(get(&ca, 2), ctx);
2033         test1(get(&ca, 3), ctx);
2034         wait(ctx);
2035 
2036         test2(c, ctx);
2037         wait(ctx);
2038 
2039         test3(c, ctx);
2040         wait(ctx);
2041 
2042         test4(c, ctx);
2043         wait(ctx);
2044 
2045         test5(c, ctx);
2046         wait(ctx);
2047 
2048         test6(c, ctx);
2049         wait(ctx);
2050     }
2051 
2052     #[test]
2053     #[cfg_attr(miri, ignore)] // Miri is too slow
main()2054     fn main() {
2055         let mut ctx = Context {
2056             nproc: Arc::new(Mutex::new(0)),
2057             cval: Arc::new(Mutex::new(0)),
2058             tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })),
2059             nc: ChanWithVals::closed(),
2060             randx: Arc::new(Mutex::new(0)),
2061         };
2062 
2063         tests(0, &mut ctx);
2064         tests(1, &mut ctx);
2065         tests(10, &mut ctx);
2066         tests(100, &mut ctx);
2067 
2068         #[rustfmt::skip]
2069         let t = 4 * // buffer sizes
2070                     (4*4 + // tests 1,2,3,4 channels
2071                             8 + // test 5 channels
2072                             12) * // test 6 channels
2073                         MESSAGES_PER_CHANEL; // sends/recvs on a channel
2074 
2075         let tot = ctx.tot.lock().unwrap();
2076         if tot.tots != t || tot.totr != t {
2077             panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t);
2078         }
2079     }
2080 }
2081 
2082 // https://github.com/golang/go/blob/master/test/ken/chan1.go
2083 mod chan1 {
2084     use super::*;
2085 
2086     // sent messages
2087     #[cfg(miri)]
2088     const N: usize = 20;
2089     #[cfg(not(miri))]
2090     const N: usize = 1000;
2091     // receiving "goroutines"
2092     const M: usize = 10;
2093     // channel buffering
2094     const W: usize = 2;
2095 
r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>)2096     fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) {
2097         loop {
2098             select! {
2099                 recv(c.rx()) -> rr => {
2100                     let r = rr.unwrap();
2101                     let mut data = h.lock().unwrap();
2102                     if data[r] != 1 {
2103                         println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]);
2104                         panic!("fail")
2105                     }
2106                     data[r] = 2;
2107                 }
2108             }
2109         }
2110     }
2111 
s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>)2112     fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) {
2113         for n in 0..N {
2114             let r = n;
2115             let mut data = h.lock().unwrap();
2116             if data[r] != 0 {
2117                 println!("s");
2118                 panic!("fail");
2119             }
2120             data[r] = 1;
2121             // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094
2122             drop(data);
2123             c.send(r);
2124         }
2125     }
2126 
2127     #[test]
main()2128     fn main() {
2129         let h = Arc::new(Mutex::new([0usize; N]));
2130         let c = make::<usize>(W);
2131         for m in 0..M {
2132             go!(c, h, {
2133                 r(c, m, h);
2134             });
2135             thread::yield_now();
2136         }
2137         thread::yield_now();
2138         thread::yield_now();
2139         s(c, h);
2140     }
2141 }
2142