1 #![cfg(loom)]
2 
3 use oneshot::TryRecvError;
4 
5 use loom::hint;
6 use loom::thread;
7 #[cfg(feature = "async")]
8 use std::future::Future;
9 #[cfg(feature = "async")]
10 use std::pin::Pin;
11 #[cfg(feature = "async")]
12 use std::task::{self, Poll};
13 #[cfg(feature = "std")]
14 use std::time::Duration;
15 
16 mod helpers;
17 
18 #[test]
try_recv()19 fn try_recv() {
20     loom::model(|| {
21         let (sender, receiver) = oneshot::channel::<u128>();
22 
23         let t = thread::spawn(move || loop {
24             match receiver.try_recv() {
25                 Ok(msg) => break msg,
26                 Err(TryRecvError::Empty) => hint::spin_loop(),
27                 Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
28             }
29         });
30 
31         assert!(sender.send(19).is_ok());
32         assert_eq!(t.join().unwrap(), 19);
33     })
34 }
35 
36 #[cfg(feature = "std")]
37 #[test]
send_recv_different_threads()38 fn send_recv_different_threads() {
39     loom::model(|| {
40         let (sender, receiver) = oneshot::channel();
41         let t2 = thread::spawn(move || {
42             assert_eq!(receiver.recv_timeout(Duration::from_millis(1)), Ok(9));
43         });
44         let t1 = thread::spawn(move || {
45             sender.send(9u128).unwrap();
46         });
47         t1.join().unwrap();
48         t2.join().unwrap();
49     })
50 }
51 
52 #[cfg(feature = "std")]
53 #[test]
recv_drop_sender_different_threads()54 fn recv_drop_sender_different_threads() {
55     loom::model(|| {
56         let (sender, receiver) = oneshot::channel::<u128>();
57         let t2 = thread::spawn(move || {
58             assert!(receiver.recv_timeout(Duration::from_millis(0)).is_err());
59         });
60         let t1 = thread::spawn(move || {
61             drop(sender);
62         });
63         t1.join().unwrap();
64         t2.join().unwrap();
65     })
66 }
67 
68 #[cfg(feature = "async")]
69 #[test]
async_recv()70 fn async_recv() {
71     loom::model(|| {
72         let (sender, receiver) = oneshot::channel::<u128>();
73         let t1 = thread::spawn(move || {
74             sender.send(987).unwrap();
75         });
76         assert_eq!(loom::future::block_on(receiver), Ok(987));
77         t1.join().unwrap();
78     })
79 }
80 
81 #[cfg(feature = "async")]
82 #[test]
send_then_poll()83 fn send_then_poll() {
84     loom::model(|| {
85         let (sender, mut receiver) = oneshot::channel::<u128>();
86         sender.send(1234).unwrap();
87 
88         let (waker, waker_handle) = helpers::waker::waker();
89         let mut context = task::Context::from_waker(&waker);
90 
91         assert_eq!(
92             Pin::new(&mut receiver).poll(&mut context),
93             Poll::Ready(Ok(1234))
94         );
95         assert_eq!(waker_handle.clone_count(), 0);
96         assert_eq!(waker_handle.drop_count(), 0);
97         assert_eq!(waker_handle.wake_count(), 0);
98     })
99 }
100 
101 #[cfg(feature = "async")]
102 #[test]
poll_then_send()103 fn poll_then_send() {
104     loom::model(|| {
105         let (sender, mut receiver) = oneshot::channel::<u128>();
106 
107         let (waker, waker_handle) = helpers::waker::waker();
108         let mut context = task::Context::from_waker(&waker);
109 
110         assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
111         assert_eq!(waker_handle.clone_count(), 1);
112         assert_eq!(waker_handle.drop_count(), 0);
113         assert_eq!(waker_handle.wake_count(), 0);
114 
115         sender.send(1234).unwrap();
116         assert_eq!(waker_handle.clone_count(), 1);
117         assert_eq!(waker_handle.drop_count(), 1);
118         assert_eq!(waker_handle.wake_count(), 1);
119 
120         assert_eq!(
121             Pin::new(&mut receiver).poll(&mut context),
122             Poll::Ready(Ok(1234))
123         );
124         assert_eq!(waker_handle.clone_count(), 1);
125         assert_eq!(waker_handle.drop_count(), 1);
126         assert_eq!(waker_handle.wake_count(), 1);
127     })
128 }
129 
130 #[cfg(feature = "async")]
131 #[test]
poll_with_different_wakers()132 fn poll_with_different_wakers() {
133     loom::model(|| {
134         let (sender, mut receiver) = oneshot::channel::<u128>();
135 
136         let (waker1, waker_handle1) = helpers::waker::waker();
137         let mut context1 = task::Context::from_waker(&waker1);
138 
139         assert_eq!(Pin::new(&mut receiver).poll(&mut context1), Poll::Pending);
140         assert_eq!(waker_handle1.clone_count(), 1);
141         assert_eq!(waker_handle1.drop_count(), 0);
142         assert_eq!(waker_handle1.wake_count(), 0);
143 
144         let (waker2, waker_handle2) = helpers::waker::waker();
145         let mut context2 = task::Context::from_waker(&waker2);
146 
147         assert_eq!(Pin::new(&mut receiver).poll(&mut context2), Poll::Pending);
148         assert_eq!(waker_handle1.clone_count(), 1);
149         assert_eq!(waker_handle1.drop_count(), 1);
150         assert_eq!(waker_handle1.wake_count(), 0);
151 
152         assert_eq!(waker_handle2.clone_count(), 1);
153         assert_eq!(waker_handle2.drop_count(), 0);
154         assert_eq!(waker_handle2.wake_count(), 0);
155 
156         // Sending should cause the waker from the latest poll to be woken up
157         sender.send(1234).unwrap();
158         assert_eq!(waker_handle1.clone_count(), 1);
159         assert_eq!(waker_handle1.drop_count(), 1);
160         assert_eq!(waker_handle1.wake_count(), 0);
161 
162         assert_eq!(waker_handle2.clone_count(), 1);
163         assert_eq!(waker_handle2.drop_count(), 1);
164         assert_eq!(waker_handle2.wake_count(), 1);
165     })
166 }
167 
168 #[cfg(feature = "async")]
169 #[test]
poll_then_try_recv()170 fn poll_then_try_recv() {
171     loom::model(|| {
172         let (_sender, mut receiver) = oneshot::channel::<u128>();
173 
174         let (waker, waker_handle) = helpers::waker::waker();
175         let mut context = task::Context::from_waker(&waker);
176 
177         assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
178         assert_eq!(waker_handle.clone_count(), 1);
179         assert_eq!(waker_handle.drop_count(), 0);
180         assert_eq!(waker_handle.wake_count(), 0);
181 
182         assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
183 
184         assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
185         assert_eq!(waker_handle.clone_count(), 2);
186         assert_eq!(waker_handle.drop_count(), 1);
187         assert_eq!(waker_handle.wake_count(), 0);
188     })
189 }
190 
191 #[cfg(feature = "async")]
192 #[test]
poll_then_try_recv_while_sending()193 fn poll_then_try_recv_while_sending() {
194     loom::model(|| {
195         let (sender, mut receiver) = oneshot::channel::<u128>();
196 
197         let (waker, waker_handle) = helpers::waker::waker();
198         let mut context = task::Context::from_waker(&waker);
199 
200         assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending);
201         assert_eq!(waker_handle.clone_count(), 1);
202         assert_eq!(waker_handle.drop_count(), 0);
203         assert_eq!(waker_handle.wake_count(), 0);
204 
205         let t = thread::spawn(move || {
206             sender.send(1234).unwrap();
207         });
208 
209         let msg = loop {
210             match receiver.try_recv() {
211                 Ok(msg) => break msg,
212                 Err(TryRecvError::Empty) => hint::spin_loop(),
213                 Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
214             }
215         };
216         assert_eq!(msg, 1234);
217         assert_eq!(waker_handle.clone_count(), 1);
218         assert_eq!(waker_handle.drop_count(), 1);
219         assert_eq!(waker_handle.wake_count(), 1);
220 
221         t.join().unwrap();
222     })
223 }
224