1 #![cfg(feature = "bilock")]
2 
3 use futures::executor::block_on;
4 use futures::future;
5 use futures::stream;
6 use futures::task::{Context, Poll};
7 use futures::Future;
8 use futures::StreamExt;
9 use futures_test::task::noop_context;
10 use futures_util::lock::BiLock;
11 use std::pin::Pin;
12 use std::thread;
13 
14 #[test]
smoke()15 fn smoke() {
16     let future = future::lazy(|cx| {
17         let (a, b) = BiLock::new(1);
18 
19         {
20             let mut lock = match a.poll_lock(cx) {
21                 Poll::Ready(l) => l,
22                 Poll::Pending => panic!("poll not ready"),
23             };
24             assert_eq!(*lock, 1);
25             *lock = 2;
26 
27             assert!(b.poll_lock(cx).is_pending());
28             assert!(a.poll_lock(cx).is_pending());
29         }
30 
31         assert!(b.poll_lock(cx).is_ready());
32         assert!(a.poll_lock(cx).is_ready());
33 
34         {
35             let lock = match b.poll_lock(cx) {
36                 Poll::Ready(l) => l,
37                 Poll::Pending => panic!("poll not ready"),
38             };
39             assert_eq!(*lock, 2);
40         }
41 
42         assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
43 
44         Ok::<(), ()>(())
45     });
46 
47     assert_eq!(block_on(future), Ok(()));
48 }
49 
50 #[test]
concurrent()51 fn concurrent() {
52     const N: usize = 10000;
53     let mut cx = noop_context();
54     let (a, b) = BiLock::new(0);
55 
56     let a = Increment { a: Some(a), remaining: N };
57     let b = stream::iter(0..N).fold(b, |b, _n| async {
58         let mut g = b.lock().await;
59         *g += 1;
60         drop(g);
61         b
62     });
63 
64     let t1 = thread::spawn(move || block_on(a));
65     let b = block_on(b);
66     let a = t1.join().unwrap();
67 
68     match a.poll_lock(&mut cx) {
69         Poll::Ready(l) => assert_eq!(*l, 2 * N),
70         Poll::Pending => panic!("poll not ready"),
71     }
72     match b.poll_lock(&mut cx) {
73         Poll::Ready(l) => assert_eq!(*l, 2 * N),
74         Poll::Pending => panic!("poll not ready"),
75     }
76 
77     assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
78 
79     struct Increment {
80         remaining: usize,
81         a: Option<BiLock<usize>>,
82     }
83 
84     impl Future for Increment {
85         type Output = BiLock<usize>;
86 
87         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
88             loop {
89                 if self.remaining == 0 {
90                     return self.a.take().unwrap().into();
91                 }
92 
93                 let a = self.a.as_mut().unwrap();
94                 let mut a = match a.poll_lock(cx) {
95                     Poll::Ready(l) => l,
96                     Poll::Pending => return Poll::Pending,
97                 };
98                 *a += 1;
99                 drop(a);
100                 self.remaining -= 1;
101             }
102         }
103     }
104 }
105