1 //! Tests copied from Go and manually rewritten in Rust.
2 //!
3 //! Source:
4 //! - https://github.com/golang/go
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2009 The Go Authors
8 //! - https://golang.org/AUTHORS
9 //! - https://golang.org/LICENSE
10 //! - https://golang.org/PATENTS
11
12 #![allow(clippy::redundant_clone)]
13
14 use std::alloc::{GlobalAlloc, Layout, System};
15 use std::any::Any;
16 use std::cell::Cell;
17 use std::collections::HashMap;
18 use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst};
19 use std::sync::{Arc, Condvar, Mutex};
20 use std::thread;
21 use std::time::Duration;
22
23 use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender};
24
/// Shorthand for building a [`Duration`] from a whole number of milliseconds.
fn ms(ms: u64) -> Duration {
    let millis = ms;
    Duration::from_millis(millis)
}
28
29 struct Chan<T> {
30 inner: Arc<Mutex<ChanInner<T>>>,
31 }
32
33 struct ChanInner<T> {
34 s: Option<Sender<T>>,
35 r: Option<Receiver<T>>,
36 // Receiver to use when r is None (Go blocks on receiving from nil)
37 nil_r: Receiver<T>,
38 // Sender to use when s is None (Go blocks on sending to nil)
39 nil_s: Sender<T>,
40 // Hold this receiver to prevent nil sender channel from disconnection
41 _nil_sr: Receiver<T>,
42 }
43
44 impl<T> Clone for Chan<T> {
clone(&self) -> Chan<T>45 fn clone(&self) -> Chan<T> {
46 Chan {
47 inner: self.inner.clone(),
48 }
49 }
50 }
51
52 impl<T> Chan<T> {
send(&self, msg: T)53 fn send(&self, msg: T) {
54 let s = self
55 .inner
56 .lock()
57 .unwrap()
58 .s
59 .as_ref()
60 .expect("sending into closed channel")
61 .clone();
62 let _ = s.send(msg);
63 }
64
try_recv(&self) -> Option<T>65 fn try_recv(&self) -> Option<T> {
66 let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
67 r.try_recv().ok()
68 }
69
recv(&self) -> Option<T>70 fn recv(&self) -> Option<T> {
71 let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone();
72 r.recv().ok()
73 }
74
close_s(&self)75 fn close_s(&self) {
76 self.inner
77 .lock()
78 .unwrap()
79 .s
80 .take()
81 .expect("channel sender already closed");
82 }
83
close_r(&self)84 fn close_r(&self) {
85 self.inner
86 .lock()
87 .unwrap()
88 .r
89 .take()
90 .expect("channel receiver already closed");
91 }
92
has_rx(&self) -> bool93 fn has_rx(&self) -> bool {
94 self.inner.lock().unwrap().r.is_some()
95 }
96
has_tx(&self) -> bool97 fn has_tx(&self) -> bool {
98 self.inner.lock().unwrap().s.is_some()
99 }
100
rx(&self) -> Receiver<T>101 fn rx(&self) -> Receiver<T> {
102 let inner = self.inner.lock().unwrap();
103 match inner.r.as_ref() {
104 None => inner.nil_r.clone(),
105 Some(r) => r.clone(),
106 }
107 }
108
tx(&self) -> Sender<T>109 fn tx(&self) -> Sender<T> {
110 let inner = self.inner.lock().unwrap();
111 match inner.s.as_ref() {
112 None => inner.nil_s.clone(),
113 Some(s) => s.clone(),
114 }
115 }
116 }
117
118 impl<T> Iterator for Chan<T> {
119 type Item = T;
120
next(&mut self) -> Option<Self::Item>121 fn next(&mut self) -> Option<Self::Item> {
122 self.recv()
123 }
124 }
125
126 impl<'a, T> IntoIterator for &'a Chan<T> {
127 type Item = T;
128 type IntoIter = Chan<T>;
129
into_iter(self) -> Self::IntoIter130 fn into_iter(self) -> Self::IntoIter {
131 self.clone()
132 }
133 }
134
make<T>(cap: usize) -> Chan<T>135 fn make<T>(cap: usize) -> Chan<T> {
136 let (s, r) = bounded(cap);
137 let (nil_s, _nil_sr) = bounded(0);
138 Chan {
139 inner: Arc::new(Mutex::new(ChanInner {
140 s: Some(s),
141 r: Some(r),
142 nil_r: never(),
143 nil_s,
144 _nil_sr,
145 })),
146 }
147 }
148
make_unbounded<T>() -> Chan<T>149 fn make_unbounded<T>() -> Chan<T> {
150 let (s, r) = unbounded();
151 let (nil_s, _nil_sr) = bounded(0);
152 Chan {
153 inner: Arc::new(Mutex::new(ChanInner {
154 s: Some(s),
155 r: Some(r),
156 nil_r: never(),
157 nil_s,
158 _nil_sr,
159 })),
160 }
161 }
162
/// A counter that threads can wait on until it drops to zero, in the spirit of
/// Go's `sync.WaitGroup`. Clones share the same counter.
#[derive(Clone)]
struct WaitGroup(Arc<WaitGroupInner>);

/// Counter and the condition variable used to wake waiters when it changes.
struct WaitGroupInner {
    cond: Condvar,
    count: Mutex<i32>,
}

impl WaitGroup {
    /// Creates a wait group whose counter starts at zero.
    fn new() -> WaitGroup {
        let inner = WaitGroupInner {
            cond: Condvar::new(),
            count: Mutex::new(0),
        };
        WaitGroup(Arc::new(inner))
    }

    /// Adjusts the counter by `delta` and wakes every waiter.
    ///
    /// Panics if the counter would become negative.
    fn add(&self, delta: i32) {
        let shared = &*self.0;
        let mut pending = shared.count.lock().unwrap();
        *pending += delta;
        assert!(*pending >= 0);
        shared.cond.notify_all();
    }

    /// Decrements the counter by one.
    fn done(&self) {
        self.add(-1)
    }

    /// Blocks until the counter is no longer positive.
    fn wait(&self) {
        let shared = &*self.0;
        let mut pending = shared.count.lock().unwrap();
        loop {
            if *pending <= 0 {
                break;
            }
            // Re-check after every wake-up: condvar wake-ups may be spurious.
            pending = shared.cond.wait(pending).unwrap();
        }
    }
}
197
/// Scope guard that runs a closure when it is dropped, emulating Go's `defer`.
struct Defer<F: FnOnce()> {
    // The pending closure; taken (and therefore run at most once) on drop.
    f: Option<Box<F>>,
}

impl<F: FnOnce()> Drop for Defer<F> {
    /// Runs the stored closure, if any.
    ///
    /// A guard whose slot is empty is a no-op rather than a panic inside
    /// `drop`.
    fn drop(&mut self) {
        if let Some(f) = self.f.take() {
            f();
        }
    }
}
210
/// Global allocator wrapper that forwards to [`System`] and keeps a running
/// byte count in [`ALLOCATED`].
struct Counter;

/// Bytes currently accounted for by [`Counter`] (allocations minus
/// deallocations, by layout size).
static ALLOCATED: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for Counter {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract, which is
        // forwarded unchanged to the system allocator.
        let ptr = System.alloc(layout);
        if ptr.is_null() {
            // Failed allocations are not counted.
            return ptr;
        }
        ALLOCATED.fetch_add(layout.size(), SeqCst);
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr` and `layout` come from a matching `alloc` call, as the
        // `GlobalAlloc::dealloc` contract requires.
        System.dealloc(ptr, layout);
        ALLOCATED.fetch_sub(layout.size(), SeqCst);
    }
}

#[global_allocator]
static A: Counter = Counter;
231
/// Schedules an expression to run when the enclosing scope ends, like Go's
/// `defer`. Expands to a hidden `Defer` guard bound in the current scope.
macro_rules! defer {
    ($action:expr) => {
        let _guard = Defer {
            f: Some(Box::new(|| $action)),
        };
    };
}
239
240 macro_rules! go {
241 (@parse $v:ident, $($tail:tt)*) => {{
242 let $v = $v.clone();
243 go!(@parse $($tail)*)
244 }};
245 (@parse $body:expr) => {
246 ::std::thread::spawn(move || {
247 let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| {
248 $body
249 }));
250 if res.is_err() {
251 eprintln!("goroutine panicked: {:?}", res);
252 ::std::process::abort();
253 }
254 })
255 };
256 (@parse $($tail:tt)*) => {
257 compile_error!("invalid `go!` syntax")
258 };
259 ($($tail:tt)*) => {{
260 go!(@parse $($tail)*)
261 }};
262 }
263
264 // https://github.com/golang/go/blob/master/test/chan/doubleselect.go
265 mod doubleselect {
266 use super::*;
267
268 #[cfg(miri)]
269 const ITERATIONS: i32 = 100;
270 #[cfg(not(miri))]
271 const ITERATIONS: i32 = 10_000;
272
sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>)273 fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
274 defer! { c1.close_s() }
275 defer! { c2.close_s() }
276 defer! { c3.close_s() }
277 defer! { c4.close_s() }
278
279 for i in 0..n {
280 select! {
281 send(c1.tx(), i) -> _ => {}
282 send(c2.tx(), i) -> _ => {}
283 send(c3.tx(), i) -> _ => {}
284 send(c4.tx(), i) -> _ => {}
285 }
286 }
287 }
288
mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>)289 fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) {
290 for v in inp {
291 out.send(v);
292 }
293 done.send(true);
294 }
295
recver(inp: Chan<i32>)296 fn recver(inp: Chan<i32>) {
297 let mut seen = HashMap::new();
298
299 for v in &inp {
300 if seen.contains_key(&v) {
301 panic!("got duplicate value for {}", v);
302 }
303 seen.insert(v, true);
304 }
305 }
306
307 #[test]
main()308 fn main() {
309 let c1 = make::<i32>(0);
310 let c2 = make::<i32>(0);
311 let c3 = make::<i32>(0);
312 let c4 = make::<i32>(0);
313 let done = make::<bool>(0);
314 let cmux = make::<i32>(0);
315
316 go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4));
317 go!(cmux, c1, done, mux(cmux, c1, done));
318 go!(cmux, c2, done, mux(cmux, c2, done));
319 go!(cmux, c3, done, mux(cmux, c3, done));
320 go!(cmux, c4, done, mux(cmux, c4, done));
321 go!(done, cmux, {
322 done.recv();
323 done.recv();
324 done.recv();
325 done.recv();
326 cmux.close_s();
327 });
328 recver(cmux);
329 }
330 }
331
332 // https://github.com/golang/go/blob/master/test/chan/fifo.go
333 mod fifo {
334 use super::*;
335
336 const N: i32 = 10;
337
338 #[test]
asynch_fifo()339 fn asynch_fifo() {
340 let ch = make::<i32>(N as usize);
341 for i in 0..N {
342 ch.send(i);
343 }
344 for i in 0..N {
345 if ch.recv() != Some(i) {
346 panic!("bad receive");
347 }
348 }
349 }
350
chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>)351 fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) {
352 inp.recv();
353 if ch.recv() != Some(val) {
354 panic!("{}", val);
355 }
356 out.send(1);
357 }
358
359 #[test]
synch_fifo()360 fn synch_fifo() {
361 let ch = make::<i32>(0);
362 let mut inp = make::<i32>(0);
363 let start = inp.clone();
364
365 for i in 0..N {
366 let out = make::<i32>(0);
367 go!(ch, i, inp, out, chain(ch, i, inp, out));
368 inp = out;
369 }
370
371 start.send(0);
372 for i in 0..N {
373 ch.send(i);
374 }
375 inp.recv();
376 }
377 }
378
379 // https://github.com/golang/go/blob/master/test/chan/goroutines.go
380 mod goroutines {
381 use super::*;
382
f(left: Chan<i32>, right: Chan<i32>)383 fn f(left: Chan<i32>, right: Chan<i32>) {
384 left.send(right.recv().unwrap());
385 }
386
387 #[test]
main()388 fn main() {
389 let n = 100i32;
390
391 let leftmost = make::<i32>(0);
392 let mut right = leftmost.clone();
393 let mut left = leftmost.clone();
394
395 for _ in 0..n {
396 right = make::<i32>(0);
397 go!(left, right, f(left, right));
398 left = right.clone();
399 }
400
401 go!(right, right.send(1));
402 leftmost.recv().unwrap();
403 }
404 }
405
406 // https://github.com/golang/go/blob/master/test/chan/nonblock.go
407 mod nonblock {
408 use super::*;
409
i32receiver(c: Chan<i32>, strobe: Chan<bool>)410 fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) {
411 if c.recv().unwrap() != 123 {
412 panic!("i32 value");
413 }
414 strobe.send(true);
415 }
416
i32sender(c: Chan<i32>, strobe: Chan<bool>)417 fn i32sender(c: Chan<i32>, strobe: Chan<bool>) {
418 c.send(234);
419 strobe.send(true);
420 }
421
i64receiver(c: Chan<i64>, strobe: Chan<bool>)422 fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) {
423 if c.recv().unwrap() != 123456 {
424 panic!("i64 value");
425 }
426 strobe.send(true);
427 }
428
i64sender(c: Chan<i64>, strobe: Chan<bool>)429 fn i64sender(c: Chan<i64>, strobe: Chan<bool>) {
430 c.send(234567);
431 strobe.send(true);
432 }
433
breceiver(c: Chan<bool>, strobe: Chan<bool>)434 fn breceiver(c: Chan<bool>, strobe: Chan<bool>) {
435 if !c.recv().unwrap() {
436 panic!("b value");
437 }
438 strobe.send(true);
439 }
440
bsender(c: Chan<bool>, strobe: Chan<bool>)441 fn bsender(c: Chan<bool>, strobe: Chan<bool>) {
442 c.send(true);
443 strobe.send(true);
444 }
445
sreceiver(c: Chan<String>, strobe: Chan<bool>)446 fn sreceiver(c: Chan<String>, strobe: Chan<bool>) {
447 if c.recv().unwrap() != "hello" {
448 panic!("x value");
449 }
450 strobe.send(true);
451 }
452
ssender(c: Chan<String>, strobe: Chan<bool>)453 fn ssender(c: Chan<String>, strobe: Chan<bool>) {
454 c.send("hello again".to_string());
455 strobe.send(true);
456 }
457
458 const MAX_TRIES: usize = 10000; // Up to 100ms per test.
459
460 #[test]
main()461 fn main() {
462 let ticker = tick(Duration::new(0, 10_000)); // 10 us
463 let sleep = || {
464 ticker.recv().unwrap();
465 ticker.recv().unwrap();
466 thread::yield_now();
467 thread::yield_now();
468 thread::yield_now();
469 };
470
471 let sync = make::<bool>(0);
472
473 for buffer in 0..2 {
474 let c32 = make::<i32>(buffer);
475 let c64 = make::<i64>(buffer);
476 let cb = make::<bool>(buffer);
477 let cs = make::<String>(buffer);
478
479 select! {
480 recv(c32.rx()) -> _ => panic!("blocked i32sender"),
481 default => {}
482 }
483
484 select! {
485 recv(c64.rx()) -> _ => panic!("blocked i64sender"),
486 default => {}
487 }
488
489 select! {
490 recv(cb.rx()) -> _ => panic!("blocked bsender"),
491 default => {}
492 }
493
494 select! {
495 recv(cs.rx()) -> _ => panic!("blocked ssender"),
496 default => {}
497 }
498
499 go!(c32, sync, i32receiver(c32, sync));
500 let mut r#try = 0;
501 loop {
502 select! {
503 send(c32.tx(), 123) -> _ => break,
504 default => {
505 r#try += 1;
506 if r#try > MAX_TRIES {
507 println!("i32receiver buffer={}", buffer);
508 panic!("fail")
509 }
510 sleep();
511 }
512 }
513 }
514 sync.recv();
515 go!(c32, sync, i32sender(c32, sync));
516 if buffer > 0 {
517 sync.recv();
518 }
519 let mut r#try = 0;
520 loop {
521 select! {
522 recv(c32.rx()) -> v => {
523 if v != Ok(234) {
524 panic!("i32sender value");
525 }
526 break;
527 }
528 default => {
529 r#try += 1;
530 if r#try > MAX_TRIES {
531 println!("i32sender buffer={}", buffer);
532 panic!("fail");
533 }
534 sleep();
535 }
536 }
537 }
538 if buffer == 0 {
539 sync.recv();
540 }
541
542 go!(c64, sync, i64receiver(c64, sync));
543 let mut r#try = 0;
544 loop {
545 select! {
546 send(c64.tx(), 123456) -> _ => break,
547 default => {
548 r#try += 1;
549 if r#try > MAX_TRIES {
550 println!("i64receiver buffer={}", buffer);
551 panic!("fail")
552 }
553 sleep();
554 }
555 }
556 }
557 sync.recv();
558 go!(c64, sync, i64sender(c64, sync));
559 if buffer > 0 {
560 sync.recv();
561 }
562 let mut r#try = 0;
563 loop {
564 select! {
565 recv(c64.rx()) -> v => {
566 if v != Ok(234567) {
567 panic!("i64sender value");
568 }
569 break;
570 }
571 default => {
572 r#try += 1;
573 if r#try > MAX_TRIES {
574 println!("i64sender buffer={}", buffer);
575 panic!("fail");
576 }
577 sleep();
578 }
579 }
580 }
581 if buffer == 0 {
582 sync.recv();
583 }
584
585 go!(cb, sync, breceiver(cb, sync));
586 let mut r#try = 0;
587 loop {
588 select! {
589 send(cb.tx(), true) -> _ => break,
590 default => {
591 r#try += 1;
592 if r#try > MAX_TRIES {
593 println!("breceiver buffer={}", buffer);
594 panic!("fail")
595 }
596 sleep();
597 }
598 }
599 }
600 sync.recv();
601 go!(cb, sync, bsender(cb, sync));
602 if buffer > 0 {
603 sync.recv();
604 }
605 let mut r#try = 0;
606 loop {
607 select! {
608 recv(cb.rx()) -> v => {
609 if v != Ok(true) {
610 panic!("bsender value");
611 }
612 break;
613 }
614 default => {
615 r#try += 1;
616 if r#try > MAX_TRIES {
617 println!("bsender buffer={}", buffer);
618 panic!("fail");
619 }
620 sleep();
621 }
622 }
623 }
624 if buffer == 0 {
625 sync.recv();
626 }
627
628 go!(cs, sync, sreceiver(cs, sync));
629 let mut r#try = 0;
630 loop {
631 select! {
632 send(cs.tx(), "hello".to_string()) -> _ => break,
633 default => {
634 r#try += 1;
635 if r#try > MAX_TRIES {
636 println!("sreceiver buffer={}", buffer);
637 panic!("fail")
638 }
639 sleep();
640 }
641 }
642 }
643 sync.recv();
644 go!(cs, sync, ssender(cs, sync));
645 if buffer > 0 {
646 sync.recv();
647 }
648 let mut r#try = 0;
649 loop {
650 select! {
651 recv(cs.rx()) -> v => {
652 if v != Ok("hello again".to_string()) {
653 panic!("ssender value");
654 }
655 break;
656 }
657 default => {
658 r#try += 1;
659 if r#try > MAX_TRIES {
660 println!("ssender buffer={}", buffer);
661 panic!("fail");
662 }
663 sleep();
664 }
665 }
666 }
667 if buffer == 0 {
668 sync.recv();
669 }
670 }
671 }
672 }
673
674 // https://github.com/golang/go/blob/master/test/chan/select.go
mod select {
    use super::*;

    /// Checks that `select!` send arms evaluate their operands on every pass
    /// and that a channel replaced by `None` is never selected again.
    #[test]
    fn main() {
        let shift = Cell::new(0);
        let counter = Cell::new(0);

        // Counts its own invocations and returns a single bit at the current
        // shift position.
        let get_value = || {
            let calls = counter.get();
            counter.set(calls + 1);
            1 << shift.get()
        };

        // Keeps selecting over the two optional channels until neither send
        // can proceed; returns how many sends succeeded.
        let send_all = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| {
            let blocked = make::<u32>(0);
            let mut sent = 0;
            loop {
                let fallback_a = blocked.tx();
                let fallback_b = blocked.tx();
                let first = get_value();
                let second = get_value();
                select! {
                    send(a.map_or(fallback_a, |c| c.tx()), first) -> _ => {
                        sent += 1;
                        a = None;
                    }
                    send(b.map_or(fallback_b, |c| c.tx()), second) -> _ => {
                        sent += 1;
                        b = None;
                    }
                    default => break,
                }
                shift.set(shift.get() + 1);
            }
            sent
        };

        let a = make::<u32>(1);
        let b = make::<u32>(1);

        assert_eq!(send_all(Some(&a), Some(&b)), 2);

        let av = a.recv().unwrap();
        let bv = b.recv().unwrap();
        assert_eq!(av | bv, 3);

        assert_eq!(send_all(Some(&a), None), 1);
        assert_eq!(counter.get(), 10);
    }
}
725
726 // https://github.com/golang/go/blob/master/test/chan/select2.go
mod select2 {
    use super::*;

    // Messages per round; reduced when running under Miri.
    #[cfg(miri)]
    const N: i32 = 200;
    #[cfg(not(miri))]
    const N: i32 = 100000;

    /// Runs the same send/select-receive workload twice and checks that the
    /// second round does not grow the tracked allocation total by more than
    /// `N + 10000` bytes.
    #[test]
    fn main() {
        fn sender(c: &Chan<i32>, n: i32) {
            (0..n).for_each(|_| c.send(1));
        }

        fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) {
            let mut remaining = n;
            while remaining > 0 {
                select! {
                    recv(c.rx()) -> _ => {}
                    recv(dummy.rx()) -> _ => {
                        panic!("dummy");
                    }
                }
                remaining -= 1;
            }
        }

        let c = make_unbounded::<i32>();
        let dummy = make_unbounded::<i32>();

        ALLOCATED.store(0, SeqCst);

        // Warm-up round.
        go!(c, sender(&c, N));
        receiver(&c, &dummy, N);

        let alloc = ALLOCATED.load(SeqCst);

        // Measured round.
        go!(c, sender(&c, N));
        receiver(&c, &dummy, N);

        assert!(
            !(ALLOCATED.load(SeqCst) > alloc
                && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000))
        )
    }
}
773
774 // https://github.com/golang/go/blob/master/test/chan/select3.go
mod select3 {
    // TODO: no tests here yet; presumably the port of the corresponding Go
    // test file is still pending.
}
778
779 // https://github.com/golang/go/blob/master/test/chan/select4.go
mod select4 {
    use super::*;

    /// With one empty channel and one holding a value, `select!` must take the
    /// arm that is ready.
    #[test]
    fn main() {
        let ready = make::<i32>(1);
        let empty = make::<i32>(0);
        ready.send(42);
        select! {
            recv(empty.rx()) -> _ => panic!("BUG"),
            recv(ready.rx()) -> v => assert_eq!(v, Ok(42)),
        }
    }
}
794
795 // https://github.com/golang/go/blob/master/test/chan/select6.go
mod select6 {
    use super::*;

    /// One goroutine receives on `c1` directly while another has `c1` as a
    /// `select!` arm; the test then sends twice on `c1` and expects both sends
    /// to complete.
    #[test]
    fn main() {
        let c1 = make::<bool>(0);
        let c2 = make::<bool>(0);
        let c3 = make::<bool>(0);

        // Plain receiver waiting on c1.
        go!(c1, c1.recv());

        // Selecting receiver: must be woken through c2, then reads c1.
        go!(c1, c2, c3, {
            select! {
                recv(c1.rx()) -> _ => panic!("dummy"),
                recv(c2.rx()) -> _ => c3.send(true),
            }
            c1.recv();
        });

        go!(c2, c2.send(true));

        c3.recv();
        for _ in 0..2 {
            c1.send(true);
        }
    }
}
820
821 // https://github.com/golang/go/blob/master/test/chan/select7.go
822 mod select7 {
823 use super::*;
824
recv1(c: Chan<i32>)825 fn recv1(c: Chan<i32>) {
826 c.recv().unwrap();
827 }
828
recv2(c: Chan<i32>)829 fn recv2(c: Chan<i32>) {
830 select! {
831 recv(c.rx()) -> _ => ()
832 }
833 }
834
recv3(c: Chan<i32>)835 fn recv3(c: Chan<i32>) {
836 let c2 = make::<i32>(1);
837 select! {
838 recv(c.rx()) -> _ => (),
839 recv(c2.rx()) -> _ => ()
840 }
841 }
842
send1(recv: fn(Chan<i32>))843 fn send1(recv: fn(Chan<i32>)) {
844 let c = make::<i32>(1);
845 go!(c, recv(c));
846 thread::yield_now();
847 c.send(1);
848 }
849
send2(recv: fn(Chan<i32>))850 fn send2(recv: fn(Chan<i32>)) {
851 let c = make::<i32>(1);
852 go!(c, recv(c));
853 thread::yield_now();
854 select! {
855 send(c.tx(), 1) -> _ => ()
856 }
857 }
858
send3(recv: fn(Chan<i32>))859 fn send3(recv: fn(Chan<i32>)) {
860 let c = make::<i32>(1);
861 go!(c, recv(c));
862 thread::yield_now();
863 let c2 = make::<i32>(1);
864 select! {
865 send(c.tx(), 1) -> _ => (),
866 send(c2.tx(), 1) -> _ => ()
867 }
868 }
869
870 #[test]
main()871 fn main() {
872 send1(recv1);
873 send2(recv1);
874 send3(recv1);
875 send1(recv2);
876 send2(recv2);
877 send3(recv2);
878 send1(recv3);
879 send2(recv3);
880 send3(recv3);
881 }
882 }
883
884 // https://github.com/golang/go/blob/master/test/chan/sieve1.go
885 mod sieve1 {
886 use super::*;
887
generate(ch: Chan<i32>)888 fn generate(ch: Chan<i32>) {
889 let mut i = 2;
890 loop {
891 ch.send(i);
892 i += 1;
893 }
894 }
895
filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32)896 fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) {
897 for i in in_ch {
898 if i % prime != 0 {
899 out_ch.send(i);
900 }
901 }
902 }
903
sieve(primes: Chan<i32>)904 fn sieve(primes: Chan<i32>) {
905 let mut ch = make::<i32>(1);
906 go!(ch, generate(ch));
907 loop {
908 let prime = ch.recv().unwrap();
909 primes.send(prime);
910
911 let ch1 = make::<i32>(1);
912 go!(ch, ch1, prime, filter(ch, ch1, prime));
913 ch = ch1;
914 }
915 }
916
917 #[test]
main()918 fn main() {
919 let primes = make::<i32>(1);
920 go!(primes, sieve(primes));
921
922 let a = [
923 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
924 89, 97,
925 ];
926 #[cfg(miri)]
927 let a = &a[..10];
928
929 for item in a.iter() {
930 let x = primes.recv().unwrap();
931 if x != *item {
932 println!("{} != {}", x, item);
933 panic!("fail");
934 }
935 }
936 }
937 }
938
939 // https://github.com/golang/go/blob/master/test/chan/zerosize.go
mod zerosize {
    use super::*;

    /// A channel of a zero-sized struct can be created and dropped.
    #[test]
    fn zero_size_struct() {
        struct ZeroSize;
        drop(make::<ZeroSize>(0));
    }

    /// A channel of a zero-length array can be created and dropped.
    #[test]
    fn zero_size_array() {
        drop(make::<[u8; 0]>(0));
    }
}
954
955 // https://github.com/golang/go/blob/master/src/runtime/chan_test.go
956 mod chan_test {
957 use super::*;
958
959 #[test]
test_chan()960 fn test_chan() {
961 #[cfg(miri)]
962 const N: i32 = 12;
963 #[cfg(not(miri))]
964 const N: i32 = 200;
965
966 #[cfg(miri)]
967 const MESSAGES_COUNT: i32 = 20;
968 #[cfg(not(miri))]
969 const MESSAGES_COUNT: i32 = 100;
970
971 for cap in 0..N {
972 {
973 // Ensure that receive from empty chan blocks.
974 let c = make::<i32>(cap as usize);
975
976 let recv1 = Arc::new(Mutex::new(false));
977 go!(c, recv1, {
978 c.recv();
979 *recv1.lock().unwrap() = true;
980 });
981
982 let recv2 = Arc::new(Mutex::new(false));
983 go!(c, recv2, {
984 c.recv();
985 *recv2.lock().unwrap() = true;
986 });
987
988 thread::sleep(ms(1));
989
990 if *recv1.lock().unwrap() || *recv2.lock().unwrap() {
991 panic!();
992 }
993
994 // Ensure that non-blocking receive does not block.
995 select! {
996 recv(c.rx()) -> _ => panic!(),
997 default => {}
998 }
999 select! {
1000 recv(c.rx()) -> _ => panic!(),
1001 default => {}
1002 }
1003
1004 c.send(0);
1005 c.send(0);
1006 }
1007
1008 {
1009 // Ensure that send to full chan blocks.
1010 let c = make::<i32>(cap as usize);
1011 for i in 0..cap {
1012 c.send(i);
1013 }
1014
1015 let sent = Arc::new(Mutex::new(0));
1016 go!(sent, c, {
1017 c.send(0);
1018 *sent.lock().unwrap() = 1;
1019 });
1020
1021 thread::sleep(ms(1));
1022
1023 if *sent.lock().unwrap() != 0 {
1024 panic!();
1025 }
1026
1027 // Ensure that non-blocking send does not block.
1028 select! {
1029 send(c.tx(), 0) -> _ => panic!(),
1030 default => {}
1031 }
1032 c.recv();
1033 }
1034
1035 {
1036 // Ensure that we receive 0 from closed chan.
1037 let c = make::<i32>(cap as usize);
1038 for i in 0..cap {
1039 c.send(i);
1040 }
1041 c.close_s();
1042
1043 for i in 0..cap {
1044 let v = c.recv();
1045 if v != Some(i) {
1046 panic!();
1047 }
1048 }
1049
1050 if c.recv() != None {
1051 panic!();
1052 }
1053 if c.try_recv() != None {
1054 panic!();
1055 }
1056 }
1057
1058 {
1059 // Ensure that close unblocks receive.
1060 let c = make::<i32>(cap as usize);
1061 let done = make::<bool>(0);
1062
1063 go!(c, done, {
1064 let v = c.try_recv();
1065 done.send(v.is_none());
1066 });
1067
1068 thread::sleep(ms(1));
1069 c.close_s();
1070
1071 if !done.recv().unwrap() {
1072 panic!();
1073 }
1074 }
1075
1076 {
1077 // Send many integers,
1078 // ensure that we receive them non-corrupted in FIFO order.
1079 let c = make::<i32>(cap as usize);
1080 go!(c, {
1081 for i in 0..MESSAGES_COUNT {
1082 c.send(i);
1083 }
1084 });
1085 for i in 0..MESSAGES_COUNT {
1086 if c.recv() != Some(i) {
1087 panic!();
1088 }
1089 }
1090
1091 // Same, but using recv2.
1092 go!(c, {
1093 for i in 0..MESSAGES_COUNT {
1094 c.send(i);
1095 }
1096 });
1097 for i in 0..MESSAGES_COUNT {
1098 if c.recv() != Some(i) {
1099 panic!();
1100 }
1101 }
1102 }
1103 }
1104 }
1105
1106 #[test]
test_nonblock_recv_race()1107 fn test_nonblock_recv_race() {
1108 #[cfg(miri)]
1109 const N: usize = 100;
1110 #[cfg(not(miri))]
1111 const N: usize = 1000;
1112
1113 for _ in 0..N {
1114 let c = make::<i32>(1);
1115 c.send(1);
1116
1117 let t = go!(c, {
1118 select! {
1119 recv(c.rx()) -> _ => {}
1120 default => panic!("chan is not ready"),
1121 }
1122 });
1123
1124 c.close_s();
1125 c.recv();
1126 t.join().unwrap();
1127 }
1128 }
1129
1130 #[test]
test_nonblock_select_race()1131 fn test_nonblock_select_race() {
1132 #[cfg(miri)]
1133 const N: usize = 100;
1134 #[cfg(not(miri))]
1135 const N: usize = 1000;
1136
1137 let done = make::<bool>(1);
1138 for _ in 0..N {
1139 let c1 = make::<i32>(1);
1140 let c2 = make::<i32>(1);
1141 c1.send(1);
1142
1143 go!(c1, c2, done, {
1144 select! {
1145 recv(c1.rx()) -> _ => {}
1146 recv(c2.rx()) -> _ => {}
1147 default => {
1148 done.send(false);
1149 return;
1150 }
1151 }
1152 done.send(true);
1153 });
1154
1155 c2.send(1);
1156 select! {
1157 recv(c1.rx()) -> _ => {}
1158 default => {}
1159 }
1160 if !done.recv().unwrap() {
1161 panic!("no chan is ready");
1162 }
1163 }
1164 }
1165
1166 #[test]
test_nonblock_select_race2()1167 fn test_nonblock_select_race2() {
1168 #[cfg(miri)]
1169 const N: usize = 100;
1170 #[cfg(not(miri))]
1171 const N: usize = 1000;
1172
1173 let done = make::<bool>(1);
1174 for _ in 0..N {
1175 let c1 = make::<i32>(1);
1176 let c2 = make::<i32>(0);
1177 c1.send(1);
1178
1179 go!(c1, c2, done, {
1180 select! {
1181 recv(c1.rx()) -> _ => {}
1182 recv(c2.rx()) -> _ => {}
1183 default => {
1184 done.send(false);
1185 return;
1186 }
1187 }
1188 done.send(true);
1189 });
1190
1191 c2.close_s();
1192 select! {
1193 recv(c1.rx()) -> _ => {}
1194 default => {}
1195 }
1196 if !done.recv().unwrap() {
1197 panic!("no chan is ready");
1198 }
1199 }
1200 }
1201
1202 #[test]
test_self_select()1203 fn test_self_select() {
1204 // Ensure that send/recv on the same chan in select
1205 // does not crash nor deadlock.
1206
1207 #[cfg(miri)]
1208 const N: usize = 100;
1209 #[cfg(not(miri))]
1210 const N: usize = 1000;
1211
1212 for &cap in &[0, 10] {
1213 let wg = WaitGroup::new();
1214 wg.add(2);
1215 let c = make::<i32>(cap);
1216
1217 for p in 0..2 {
1218 let p = p;
1219 go!(wg, p, c, {
1220 defer! { wg.done() }
1221 for i in 0..N {
1222 if p == 0 || i % 2 == 0 {
1223 select! {
1224 send(c.tx(), p) -> _ => {}
1225 recv(c.rx()) -> v => {
1226 if cap == 0 && v.ok() == Some(p) {
1227 panic!("self receive");
1228 }
1229 }
1230 }
1231 } else {
1232 select! {
1233 recv(c.rx()) -> v => {
1234 if cap == 0 && v.ok() == Some(p) {
1235 panic!("self receive");
1236 }
1237 }
1238 send(c.tx(), p) -> _ => {}
1239 }
1240 }
1241 }
1242 });
1243 }
1244 wg.wait();
1245 }
1246 }
1247
1248 #[test]
test_select_stress()1249 fn test_select_stress() {
1250 #[cfg(miri)]
1251 const N: usize = 100;
1252 #[cfg(not(miri))]
1253 const N: usize = 10000;
1254
1255 let c = vec![
1256 make::<i32>(0),
1257 make::<i32>(0),
1258 make::<i32>(2),
1259 make::<i32>(3),
1260 ];
1261
1262 // There are 4 goroutines that send N values on each of the chans,
1263 // + 4 goroutines that receive N values on each of the chans,
1264 // + 1 goroutine that sends N values on each of the chans in a single select,
1265 // + 1 goroutine that receives N values on each of the chans in a single select.
1266 // All these sends, receives and selects interact chaotically at runtime,
1267 // but we are careful that this whole construct does not deadlock.
1268 let wg = WaitGroup::new();
1269 wg.add(10);
1270
1271 for k in 0..4 {
1272 go!(k, c, wg, {
1273 for _ in 0..N {
1274 c[k].send(0);
1275 }
1276 wg.done();
1277 });
1278 go!(k, c, wg, {
1279 for _ in 0..N {
1280 c[k].recv();
1281 }
1282 wg.done();
1283 });
1284 }
1285
1286 go!(c, wg, {
1287 let mut n = [0; 4];
1288 let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>();
1289
1290 for _ in 0..4 * N {
1291 let index = {
1292 let mut sel = Select::new();
1293 let mut opers = [!0; 4];
1294 for &i in &[3, 2, 0, 1] {
1295 if let Some(c) = &c1[i] {
1296 opers[i] = sel.recv(c);
1297 }
1298 }
1299
1300 let oper = sel.select();
1301 let mut index = !0;
1302 for i in 0..4 {
1303 if opers[i] == oper.index() {
1304 index = i;
1305 let _ = oper.recv(c1[i].as_ref().unwrap());
1306 break;
1307 }
1308 }
1309 index
1310 };
1311
1312 n[index] += 1;
1313 if n[index] == N {
1314 c1[index] = None;
1315 }
1316 }
1317 wg.done();
1318 });
1319
1320 go!(c, wg, {
1321 let mut n = [0; 4];
1322 let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>();
1323
1324 for _ in 0..4 * N {
1325 let index = {
1326 let mut sel = Select::new();
1327 let mut opers = [!0; 4];
1328 for &i in &[0, 1, 2, 3] {
1329 if let Some(c) = &c1[i] {
1330 opers[i] = sel.send(c);
1331 }
1332 }
1333
1334 let oper = sel.select();
1335 let mut index = !0;
1336 for i in 0..4 {
1337 if opers[i] == oper.index() {
1338 index = i;
1339 let _ = oper.send(c1[i].as_ref().unwrap(), 0);
1340 break;
1341 }
1342 }
1343 index
1344 };
1345
1346 n[index] += 1;
1347 if n[index] == N {
1348 c1[index] = None;
1349 }
1350 }
1351 wg.done();
1352 });
1353
1354 wg.wait();
1355 }
1356
1357 #[test]
test_select_fairness()1358 fn test_select_fairness() {
1359 #[cfg(miri)]
1360 const TRIALS: usize = 100;
1361 #[cfg(not(miri))]
1362 const TRIALS: usize = 10000;
1363
1364 let c1 = make::<u8>(TRIALS + 1);
1365 let c2 = make::<u8>(TRIALS + 1);
1366
1367 for _ in 0..TRIALS + 1 {
1368 c1.send(1);
1369 c2.send(2);
1370 }
1371
1372 let c3 = make::<u8>(0);
1373 let c4 = make::<u8>(0);
1374 let out = make::<u8>(0);
1375 let done = make::<u8>(0);
1376 let wg = WaitGroup::new();
1377
1378 wg.add(1);
1379 go!(wg, c1, c2, c3, c4, out, done, {
1380 defer! { wg.done() };
1381 loop {
1382 let b;
1383 select! {
1384 recv(c3.rx()) -> m => b = m.unwrap(),
1385 recv(c4.rx()) -> m => b = m.unwrap(),
1386 recv(c1.rx()) -> m => b = m.unwrap(),
1387 recv(c2.rx()) -> m => b = m.unwrap(),
1388 }
1389 select! {
1390 send(out.tx(), b) -> _ => {}
1391 recv(done.rx()) -> _ => return,
1392 }
1393 }
1394 });
1395
1396 let (mut cnt1, mut cnt2) = (0, 0);
1397 for _ in 0..TRIALS {
1398 match out.recv() {
1399 Some(1) => cnt1 += 1,
1400 Some(2) => cnt2 += 1,
1401 b => panic!("unexpected value {:?} on channel", b),
1402 }
1403 }
1404
1405 // If the select in the goroutine is fair,
1406 // cnt1 and cnt2 should be about the same value.
1407 // With 10,000 trials, the expected margin of error at
1408 // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
1409
1410 let r = cnt1 as f64 / TRIALS as f64;
1411 let e = (r - 0.5).abs();
1412
1413 if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) {
1414 panic!(
1415 "unfair select: in {} trials, results were {}, {}",
1416 TRIALS, cnt1, cnt2,
1417 );
1418 }
1419
1420 done.close_s();
1421 wg.wait();
1422 }
1423
1424 #[test]
test_chan_send_interface()1425 fn test_chan_send_interface() {
1426 struct Mt;
1427
1428 let c = make::<Box<dyn Any>>(1);
1429 c.send(Box::new(Mt));
1430
1431 select! {
1432 send(c.tx(), Box::new(Mt)) -> _ => {}
1433 default => {}
1434 }
1435
1436 select! {
1437 send(c.tx(), Box::new(Mt)) -> _ => {}
1438 send(c.tx(), Box::new(Mt)) -> _ => {}
1439 default => {}
1440 }
1441 }
1442
1443 #[test]
test_pseudo_random_send()1444 fn test_pseudo_random_send() {
1445 #[cfg(miri)]
1446 const N: usize = 20;
1447 #[cfg(not(miri))]
1448 const N: usize = 100;
1449
1450 for cap in 0..N {
1451 let c = make::<i32>(cap);
1452 let l = Arc::new(Mutex::new(vec![0i32; N]));
1453 let done = make::<bool>(0);
1454
1455 go!(c, done, l, {
1456 let mut l = l.lock().unwrap();
1457 for i in 0..N {
1458 thread::yield_now();
1459 l[i] = c.recv().unwrap();
1460 }
1461 done.send(true);
1462 });
1463
1464 for _ in 0..N {
1465 select! {
1466 send(c.tx(), 1) -> _ => {}
1467 send(c.tx(), 0) -> _ => {}
1468 }
1469 }
1470 done.recv();
1471
1472 let mut n0 = 0;
1473 let mut n1 = 0;
1474 for &i in l.lock().unwrap().iter() {
1475 n0 += (i + 1) % 2;
1476 n1 += i;
1477 }
1478
1479 if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 {
1480 panic!(
1481 "Want pseudorandom, got {} zeros and {} ones (chan cap {})",
1482 n0, n1, cap,
1483 );
1484 }
1485 }
1486 }
1487
#[test]
fn test_multi_consumer() {
    const NWORK: usize = 23;
    #[cfg(miri)]
    const NITER: usize = 50;
    #[cfg(not(miri))]
    const NITER: usize = 271828;

    let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];

    let q = make::<i32>(NWORK * 3);
    let r = make::<i32>(NWORK * 3);

    // Workers: forward every value read from `q` to `r`.
    let wg = WaitGroup::new();
    for i in 0..NWORK {
        wg.add(1);
        let w = i;
        go!(q, r, wg, pn, {
            for v in &q {
                // Perturb worker timing: yield when the value matches this worker's entry.
                if pn[w % pn.len()] == v {
                    thread::yield_now();
                }
                r.send(v);
            }
            wg.done();
        });
    }

    // Feeder: pushes NITER values into `q`, accumulating their sum, then
    // closes `q`, waits for the workers, and closes `r`.
    let expect = Arc::new(Mutex::new(0));
    go!(q, r, expect, wg, pn, {
        for i in 0..NITER {
            let v = pn[i % pn.len()];
            *expect.lock().unwrap() += v;
            q.send(v);
        }
        q.close_s();
        wg.wait();
        r.close_s();
    });

    // Consume everything the workers forwarded.
    let mut n = 0;
    let mut s = 0;
    for v in &r {
        n += 1;
        s += v;
    }

    // Report what went wrong instead of failing with an empty panic message.
    let expected = *expect.lock().unwrap();
    if n != NITER || s != expected {
        panic!(
            "Expected sum {} (got {}) from {} iter (saw {})",
            expected, s, NITER, n,
        );
    }
}
1539
#[test]
fn test_select_duplicate_channel() {
    // Checks that one "goroutine" can be queued on the same channel
    // more than once.
    let ch_c = make::<i32>(0);
    let ch_d = make::<i32>(0);
    let ch_e = make::<i32>(0);

    // First goroutine: selects over receives on all three channels, then sends on `ch_e`.
    go!(ch_c, ch_d, ch_e, {
        select! {
            recv(ch_c.rx()) -> _ => {}
            recv(ch_d.rx()) -> _ => {}
            recv(ch_e.rx()) -> _ => {}
        }
        ch_e.send(9);
    });
    thread::sleep(ms(1));

    // Second goroutine: a plain receive on `ch_c`.
    go!(ch_c, ch_c.recv());
    thread::sleep(ms(1));

    ch_d.send(7);
    ch_e.recv();
    ch_c.send(8);
}
1565 }
1566
1567 // https://github.com/golang/go/blob/master/test/closedchan.go
mod closedchan {
    // TODO: no tests have been written here yet; see the Go file linked above.
}
1571
1572 // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go
mod chanbarrier_test {
    // TODO: no tests have been written here yet; see the Go file linked above.
}
1576
1577 // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go
mod race_chan_test {
    // TODO: no tests have been written here yet; see the Go file linked above.
}
1581
1582 // https://github.com/golang/go/blob/master/test/ken/chan.go
1583 mod chan {
1584 use super::*;
1585
// Number of values sent over each channel; `expect` switches a counter to
// `END` once the value at offset `MESSAGES_PER_CHANEL - 1` has been handled.
const MESSAGES_PER_CHANEL: u32 = 76;
// Gap between the starting values of consecutively created channels (see `mkchan`).
const MESSAGES_RANGE_LEN: u32 = 100;
// Sentinel counter value meaning "this side of the channel is finished".
const END: i32 = 10000;
1589
/// A channel together with the counters that track its expected traffic.
///
/// The counters live behind `Arc`, so clones of a `ChanWithVals` share them.
struct ChanWithVals {
    /// The underlying channel of `i32` values
    chan: Chan<i32>,
    /// Next value to send
    sv: Arc<AtomicI32>,
    /// Next value to receive
    rv: Arc<AtomicI32>,
}
1597
/// Global message counters, bumped by `ChanWithVals::send` and
/// `ChanWithVals::recv` and checked at the end of the test.
struct Totals {
    /// Total sent messages
    tots: u32,
    /// Total received messages
    totr: u32,
}
1604
/// Shared state handed to every spawned "goroutine" of this test.
///
/// All fields are reference-counted (or contain reference-counted parts),
/// so cloning a `Context` yields another handle to the same state.
struct Context {
    /// Count of goroutines still running; adjusted through `change_nproc`
    nproc: Arc<Mutex<i32>>,
    /// Running base value from which `mkchan` assigns each new channel its starting value
    cval: Arc<Mutex<i32>>,
    /// Totals of sent and received messages
    tot: Arc<Mutex<Totals>>,
    /// Channel built with `ChanWithVals::closed()`; passed to `sel` for unused slots
    nc: ChanWithVals,
    /// State of the pseudo-random generator used by `nrand`
    randx: Arc<Mutex<i32>>,
}
1612
1613 impl ChanWithVals {
with_capacity(capacity: usize) -> Self1614 fn with_capacity(capacity: usize) -> Self {
1615 ChanWithVals {
1616 chan: make(capacity),
1617 sv: Arc::new(AtomicI32::new(0)),
1618 rv: Arc::new(AtomicI32::new(0)),
1619 }
1620 }
1621
closed() -> Self1622 fn closed() -> Self {
1623 let ch = ChanWithVals::with_capacity(0);
1624 ch.chan.close_r();
1625 ch.chan.close_s();
1626 ch
1627 }
1628
rv(&self) -> i321629 fn rv(&self) -> i32 {
1630 self.rv.load(SeqCst)
1631 }
1632
sv(&self) -> i321633 fn sv(&self) -> i32 {
1634 self.sv.load(SeqCst)
1635 }
1636
send(&mut self, tot: &Mutex<Totals>) -> bool1637 fn send(&mut self, tot: &Mutex<Totals>) -> bool {
1638 {
1639 let mut tot = tot.lock().unwrap();
1640 tot.tots += 1
1641 }
1642 let esv = expect(self.sv(), self.sv());
1643 self.sv.store(esv, SeqCst);
1644 if self.sv() == END {
1645 self.chan.close_s();
1646 return true;
1647 }
1648 false
1649 }
1650
recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool1651 fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool {
1652 {
1653 let mut tot = tot.lock().unwrap();
1654 tot.totr += 1
1655 }
1656 let erv = expect(self.rv(), v);
1657 self.rv.store(erv, SeqCst);
1658 if self.rv() == END {
1659 self.chan.close_r();
1660 return true;
1661 }
1662 false
1663 }
1664 }
1665
1666 impl Clone for ChanWithVals {
clone(&self) -> Self1667 fn clone(&self) -> Self {
1668 ChanWithVals {
1669 chan: self.chan.clone(),
1670 sv: self.sv.clone(),
1671 rv: self.rv.clone(),
1672 }
1673 }
1674 }
1675
1676 impl Context {
nproc(&self) -> &Mutex<i32>1677 fn nproc(&self) -> &Mutex<i32> {
1678 self.nproc.as_ref()
1679 }
1680
cval(&self) -> &Mutex<i32>1681 fn cval(&self) -> &Mutex<i32> {
1682 self.cval.as_ref()
1683 }
1684
tot(&self) -> &Mutex<Totals>1685 fn tot(&self) -> &Mutex<Totals> {
1686 self.tot.as_ref()
1687 }
1688
randx(&self) -> &Mutex<i32>1689 fn randx(&self) -> &Mutex<i32> {
1690 self.randx.as_ref()
1691 }
1692 }
1693
1694 impl Clone for Context {
clone(&self) -> Self1695 fn clone(&self) -> Self {
1696 Context {
1697 nproc: self.nproc.clone(),
1698 cval: self.cval.clone(),
1699 tot: self.tot.clone(),
1700 nc: self.nc.clone(),
1701 randx: self.randx.clone(),
1702 }
1703 }
1704 }
1705
nrand(n: i32, randx: &Mutex<i32>) -> i321706 fn nrand(n: i32, randx: &Mutex<i32>) -> i32 {
1707 let mut randx = randx.lock().unwrap();
1708 *randx += 10007;
1709 if *randx >= 1000000 {
1710 *randx -= 1000000
1711 }
1712 *randx % n
1713 }
1714
change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i321715 fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 {
1716 let mut nproc = nproc.lock().unwrap();
1717 *nproc += adjust;
1718 *nproc
1719 }
1720
mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals>1721 fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> {
1722 let mut ca = Vec::<ChanWithVals>::with_capacity(n);
1723 let mut cval = cval.lock().unwrap();
1724 for _ in 0..n {
1725 *cval += MESSAGES_RANGE_LEN as i32;
1726 let chl = ChanWithVals::with_capacity(c);
1727 chl.sv.store(*cval, SeqCst);
1728 chl.rv.store(*cval, SeqCst);
1729 ca.push(chl);
1730 }
1731 ca
1732 }
1733
expect(v: i32, v0: i32) -> i321734 fn expect(v: i32, v0: i32) -> i32 {
1735 if v == v0 {
1736 return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 {
1737 END
1738 } else {
1739 v + 1
1740 };
1741 }
1742 panic!("got {}, expected {}", v, v0 + 1);
1743 }
1744
send(mut c: ChanWithVals, ctx: Context)1745 fn send(mut c: ChanWithVals, ctx: Context) {
1746 loop {
1747 for _ in 0..=nrand(10, ctx.randx()) {
1748 thread::yield_now();
1749 }
1750 c.chan.tx().send(c.sv()).unwrap();
1751 if c.send(ctx.tot()) {
1752 break;
1753 }
1754 }
1755 change_nproc(-1, ctx.nproc());
1756 }
1757
recv(mut c: ChanWithVals, ctx: Context)1758 fn recv(mut c: ChanWithVals, ctx: Context) {
1759 loop {
1760 for _ in (0..nrand(10, ctx.randx())).rev() {
1761 thread::yield_now();
1762 }
1763 let v = c.chan.rx().recv().unwrap();
1764 if c.recv(v, ctx.tot()) {
1765 break;
1766 }
1767 }
1768 change_nproc(-1, ctx.nproc());
1769 }
1770
1771 #[allow(clippy::too_many_arguments)]
sel( mut r0: ChanWithVals, mut r1: ChanWithVals, mut r2: ChanWithVals, mut r3: ChanWithVals, mut s0: ChanWithVals, mut s1: ChanWithVals, mut s2: ChanWithVals, mut s3: ChanWithVals, ctx: Context, )1772 fn sel(
1773 mut r0: ChanWithVals,
1774 mut r1: ChanWithVals,
1775 mut r2: ChanWithVals,
1776 mut r3: ChanWithVals,
1777 mut s0: ChanWithVals,
1778 mut s1: ChanWithVals,
1779 mut s2: ChanWithVals,
1780 mut s3: ChanWithVals,
1781 ctx: Context,
1782 ) {
1783 let mut a = 0; // local chans running
1784
1785 if r0.chan.has_rx() {
1786 a += 1;
1787 }
1788 if r1.chan.has_rx() {
1789 a += 1;
1790 }
1791 if r2.chan.has_rx() {
1792 a += 1;
1793 }
1794 if r3.chan.has_rx() {
1795 a += 1;
1796 }
1797 if s0.chan.has_tx() {
1798 a += 1;
1799 }
1800 if s1.chan.has_tx() {
1801 a += 1;
1802 }
1803 if s2.chan.has_tx() {
1804 a += 1;
1805 }
1806 if s3.chan.has_tx() {
1807 a += 1;
1808 }
1809
1810 loop {
1811 for _ in 0..=nrand(5, ctx.randx()) {
1812 thread::yield_now();
1813 }
1814 select! {
1815 recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1816 recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1817 recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1818 recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 },
1819 send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 },
1820 send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 },
1821 send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 },
1822 send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 },
1823 }
1824 if a == 0 {
1825 break;
1826 }
1827 }
1828 change_nproc(-1, ctx.nproc());
1829 }
1830
get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals1831 fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals {
1832 vec.get(idx).unwrap().clone()
1833 }
1834
/// Direct send to direct recv
///
/// Spawns one `send` goroutine and one `recv` goroutine on the same channel `c`.
fn test1(c: ChanWithVals, ctx: &mut Context) {
    // Register both goroutines before they start; each one decrements the count when it ends.
    change_nproc(2, ctx.nproc());
    go!(c, ctx, send(c, ctx));
    go!(c, ctx, recv(c, ctx));
}
1841
1842 /// Direct send to select recv
test2(c: usize, ctx: &mut Context)1843 fn test2(c: usize, ctx: &mut Context) {
1844 let ca = mkchan(c, 4, ctx.cval());
1845
1846 change_nproc(4, ctx.nproc());
1847 go!(ca, ctx, send(get(&ca, 0), ctx));
1848 go!(ca, ctx, send(get(&ca, 1), ctx));
1849 go!(ca, ctx, send(get(&ca, 2), ctx));
1850 go!(ca, ctx, send(get(&ca, 3), ctx));
1851
1852 change_nproc(1, ctx.nproc());
1853 go!(
1854 ca,
1855 ctx,
1856 sel(
1857 get(&ca, 0),
1858 get(&ca, 1),
1859 get(&ca, 2),
1860 get(&ca, 3),
1861 ctx.nc.clone(),
1862 ctx.nc.clone(),
1863 ctx.nc.clone(),
1864 ctx.nc.clone(),
1865 ctx,
1866 )
1867 );
1868 }
1869
1870 /// Select send to direct recv
test3(c: usize, ctx: &mut Context)1871 fn test3(c: usize, ctx: &mut Context) {
1872 let ca = mkchan(c, 4, ctx.cval());
1873
1874 change_nproc(4, ctx.nproc());
1875 go!(ca, ctx, recv(get(&ca, 0), ctx));
1876 go!(ca, ctx, recv(get(&ca, 1), ctx));
1877 go!(ca, ctx, recv(get(&ca, 2), ctx));
1878 go!(ca, ctx, recv(get(&ca, 3), ctx));
1879
1880 change_nproc(1, ctx.nproc());
1881 go!(
1882 ca,
1883 ctx,
1884 sel(
1885 ctx.nc.clone(),
1886 ctx.nc.clone(),
1887 ctx.nc.clone(),
1888 ctx.nc.clone(),
1889 get(&ca, 0),
1890 get(&ca, 1),
1891 get(&ca, 2),
1892 get(&ca, 3),
1893 ctx,
1894 )
1895 );
1896 }
1897
1898 /// Select send to select recv, 4 channels
test4(c: usize, ctx: &mut Context)1899 fn test4(c: usize, ctx: &mut Context) {
1900 let ca = mkchan(c, 4, ctx.cval());
1901
1902 change_nproc(2, ctx.nproc());
1903 go!(
1904 ca,
1905 ctx,
1906 sel(
1907 ctx.nc.clone(),
1908 ctx.nc.clone(),
1909 ctx.nc.clone(),
1910 ctx.nc.clone(),
1911 get(&ca, 0),
1912 get(&ca, 1),
1913 get(&ca, 2),
1914 get(&ca, 3),
1915 ctx,
1916 )
1917 );
1918 go!(
1919 ca,
1920 ctx,
1921 sel(
1922 get(&ca, 0),
1923 get(&ca, 1),
1924 get(&ca, 2),
1925 get(&ca, 3),
1926 ctx.nc.clone(),
1927 ctx.nc.clone(),
1928 ctx.nc.clone(),
1929 ctx.nc.clone(),
1930 ctx,
1931 )
1932 );
1933 }
1934
1935 /// Select send to select recv, 8 channels
test5(c: usize, ctx: &mut Context)1936 fn test5(c: usize, ctx: &mut Context) {
1937 let ca = mkchan(c, 8, ctx.cval());
1938
1939 change_nproc(2, ctx.nproc());
1940 go!(
1941 ca,
1942 ctx,
1943 sel(
1944 get(&ca, 4),
1945 get(&ca, 5),
1946 get(&ca, 6),
1947 get(&ca, 7),
1948 get(&ca, 0),
1949 get(&ca, 1),
1950 get(&ca, 2),
1951 get(&ca, 3),
1952 ctx,
1953 )
1954 );
1955 go!(
1956 ca,
1957 ctx,
1958 sel(
1959 get(&ca, 0),
1960 get(&ca, 1),
1961 get(&ca, 2),
1962 get(&ca, 3),
1963 get(&ca, 4),
1964 get(&ca, 5),
1965 get(&ca, 6),
1966 get(&ca, 7),
1967 ctx,
1968 )
1969 );
1970 }
1971
1972 // Direct and select send to direct and select recv
test6(c: usize, ctx: &mut Context)1973 fn test6(c: usize, ctx: &mut Context) {
1974 let ca = mkchan(c, 12, ctx.cval());
1975
1976 change_nproc(4, ctx.nproc());
1977 go!(ca, ctx, send(get(&ca, 4), ctx));
1978 go!(ca, ctx, send(get(&ca, 5), ctx));
1979 go!(ca, ctx, send(get(&ca, 6), ctx));
1980 go!(ca, ctx, send(get(&ca, 7), ctx));
1981
1982 change_nproc(4, ctx.nproc());
1983 go!(ca, ctx, recv(get(&ca, 8), ctx));
1984 go!(ca, ctx, recv(get(&ca, 9), ctx));
1985 go!(ca, ctx, recv(get(&ca, 10), ctx));
1986 go!(ca, ctx, recv(get(&ca, 11), ctx));
1987
1988 change_nproc(2, ctx.nproc());
1989 go!(
1990 ca,
1991 ctx,
1992 sel(
1993 get(&ca, 4),
1994 get(&ca, 5),
1995 get(&ca, 6),
1996 get(&ca, 7),
1997 get(&ca, 0),
1998 get(&ca, 1),
1999 get(&ca, 2),
2000 get(&ca, 3),
2001 ctx,
2002 )
2003 );
2004 go!(
2005 ca,
2006 ctx,
2007 sel(
2008 get(&ca, 0),
2009 get(&ca, 1),
2010 get(&ca, 2),
2011 get(&ca, 3),
2012 get(&ca, 8),
2013 get(&ca, 9),
2014 get(&ca, 10),
2015 get(&ca, 11),
2016 ctx,
2017 )
2018 );
2019 }
2020
wait(ctx: &mut Context)2021 fn wait(ctx: &mut Context) {
2022 thread::yield_now();
2023 while change_nproc(0, ctx.nproc()) != 0 {
2024 thread::yield_now();
2025 }
2026 }
2027
tests(c: usize, ctx: &mut Context)2028 fn tests(c: usize, ctx: &mut Context) {
2029 let ca = mkchan(c, 4, ctx.cval());
2030 test1(get(&ca, 0), ctx);
2031 test1(get(&ca, 1), ctx);
2032 test1(get(&ca, 2), ctx);
2033 test1(get(&ca, 3), ctx);
2034 wait(ctx);
2035
2036 test2(c, ctx);
2037 wait(ctx);
2038
2039 test3(c, ctx);
2040 wait(ctx);
2041
2042 test4(c, ctx);
2043 wait(ctx);
2044
2045 test5(c, ctx);
2046 wait(ctx);
2047
2048 test6(c, ctx);
2049 wait(ctx);
2050 }
2051
#[test]
#[cfg_attr(miri, ignore)] // Miri is too slow
fn main() {
    // Channel buffer sizes exercised by the test.
    const BUFFER_SIZES: [usize; 4] = [0, 1, 10, 100];

    let mut ctx = Context {
        nproc: Arc::new(Mutex::new(0)),
        cval: Arc::new(Mutex::new(0)),
        tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })),
        nc: ChanWithVals::closed(),
        randx: Arc::new(Mutex::new(0)),
    };

    for &capacity in BUFFER_SIZES.iter() {
        tests(capacity, &mut ctx);
    }

    // Channels created per buffer size: 4 each for tests 1-4, 8 for test 5, 12 for test 6.
    let channels_per_size: u32 = 4 * 4 + 8 + 12;
    // Every channel carries MESSAGES_PER_CHANEL sends and as many receives.
    let t = BUFFER_SIZES.len() as u32 * channels_per_size * MESSAGES_PER_CHANEL;

    let totals = ctx.tot.lock().unwrap();
    let (sent, received) = (totals.tots, totals.totr);
    if sent != t || received != t {
        panic!("tots={} totr={} sb={}", sent, received, t);
    }
}
2080 }
2081
2082 // https://github.com/golang/go/blob/master/test/ken/chan1.go
2083 mod chan1 {
2084 use super::*;
2085
// Number of messages sent by `s` (a smaller value is used under Miri).
#[cfg(miri)]
const N: usize = 20;
#[cfg(not(miri))]
const N: usize = 1000;
// Number of receiving "goroutines" spawned by `main`.
const M: usize = 10;
// Buffer capacity of the channel created in `main`.
const W: usize = 2;
2095
r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>)2096 fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) {
2097 loop {
2098 select! {
2099 recv(c.rx()) -> rr => {
2100 let r = rr.unwrap();
2101 let mut data = h.lock().unwrap();
2102 if data[r] != 1 {
2103 println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]);
2104 panic!("fail")
2105 }
2106 data[r] = 2;
2107 }
2108 }
2109 }
2110 }
2111
s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>)2112 fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) {
2113 for n in 0..N {
2114 let r = n;
2115 let mut data = h.lock().unwrap();
2116 if data[r] != 0 {
2117 println!("s");
2118 panic!("fail");
2119 }
2120 data[r] = 1;
2121 // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094
2122 drop(data);
2123 c.send(r);
2124 }
2125 }
2126
#[test]
fn main() {
    let states = Arc::new(Mutex::new([0usize; N]));
    let chan = make::<usize>(W);

    // Start the M receivers, yielding after each spawn.
    for m in 0..M {
        go!(chan, states, {
            r(chan, m, states);
        });
        thread::yield_now();
    }

    // Two more yields before the sender runs on this thread.
    for _ in 0..2 {
        thread::yield_now();
    }
    s(chan, states);
}
2141 }
2142