1 #![cfg(loom)]
2
3 use oneshot::TryRecvError;
4
5 use loom::hint;
6 use loom::thread;
7 #[cfg(feature = "async")]
8 use std::future::Future;
9 #[cfg(feature = "async")]
10 use std::pin::Pin;
11 #[cfg(feature = "async")]
12 use std::task::{self, Poll};
13 #[cfg(feature = "std")]
14 use std::time::Duration;
15
16 mod helpers;
17
/// Spins on `try_recv` in a spawned thread while the main thread sends `19`,
/// and expects the spawned thread to come back with that value.
#[test]
fn try_recv() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        // Retry for as long as the channel reports `Empty`. Seeing `Disconnected`
        // here is treated as a test failure.
        let poller = thread::spawn(move || loop {
            match rx.try_recv() {
                Err(TryRecvError::Empty) => hint::spin_loop(),
                Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
                Ok(value) => break value,
            }
        });

        assert!(tx.send(19).is_ok());

        let received = poller.join().unwrap();
        assert_eq!(received, 19);
    })
}
35
/// Sends `9` from one spawned thread and receives it with `recv_timeout` on
/// another, expecting the receive to succeed.
#[cfg(feature = "std")]
#[test]
fn send_recv_different_threads() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        // The receiving thread is spawned before the sending one.
        let receiving = thread::spawn(move || {
            let timeout = Duration::from_millis(1);
            let outcome = rx.recv_timeout(timeout);
            assert_eq!(outcome, Ok(9));
        });
        let sending = thread::spawn(move || {
            tx.send(9).unwrap();
        });

        // Join the sending thread first, then the receiving one.
        sending.join().unwrap();
        receiving.join().unwrap();
    })
}
51
/// Drops the sender on one spawned thread while another calls `recv_timeout`
/// with a zero timeout, and expects that receive to return an error.
#[cfg(feature = "std")]
#[test]
fn recv_drop_sender_different_threads() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        // The receiving thread is spawned before the one that drops the sender.
        let receiving = thread::spawn(move || {
            let timeout = Duration::from_millis(0);
            let outcome = rx.recv_timeout(timeout);
            // Nothing is ever sent, so only the error side is acceptable.
            assert!(outcome.is_err());
        });
        let dropping = thread::spawn(move || {
            drop(tx);
        });

        // Join the dropping thread first, then the receiving one.
        dropping.join().unwrap();
        receiving.join().unwrap();
    })
}
67
/// Blocks on the receiver as a future while a spawned thread sends `987`, and
/// expects the future to resolve to that value.
#[cfg(feature = "async")]
#[test]
fn async_recv() {
    loom::model(|| {
        let (tx, rx) = oneshot::channel::<u128>();

        let sending = thread::spawn(move || {
            tx.send(987).unwrap();
        });

        // The receiver itself is handed to `block_on`, so it is driven as a future.
        let outcome = loom::future::block_on(rx);
        assert_eq!(outcome, Ok(987));

        sending.join().unwrap();
    })
}
80
/// Sends before the first poll and expects that poll to be `Ready`, with the
/// waker never cloned, dropped or woken.
#[cfg(feature = "async")]
#[test]
fn send_then_poll() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::channel::<u128>();
        tx.send(1234).unwrap();

        let (waker, counters) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        let polled = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(polled, Poll::Ready(Ok(1234)));

        // (clone, drop, wake) counts: the waker is expected to be left untouched.
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (0, 0, 0)
        );
    })
}
100
/// Polls once before sending, then checks the waker's (clone, drop, wake)
/// counts after the send and again after the poll that follows it.
#[cfg(feature = "async")]
#[test]
fn poll_then_send() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::channel::<u128>();

        let (waker, counters) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        // Nothing has been sent yet: expect `Pending` and exactly one waker clone.
        let before_send = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(before_send, Poll::Pending);
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 0, 0)
        );

        // After the send, expect one wake and one drop, and no further clone.
        tx.send(1234).unwrap();
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 1, 1)
        );

        // The message is now available, and the counts are expected to stay put.
        let after_send = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(after_send, Poll::Ready(Ok(1234)));
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 1, 1)
        );
    })
}
129
/// Polls the receiver with two different wakers in turn and expects only the
/// waker from the most recent poll to be woken by the send.
#[cfg(feature = "async")]
#[test]
fn poll_with_different_wakers() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::channel::<u128>();

        let (waker_a, first) = helpers::waker::waker();
        let mut cx_a = task::Context::from_waker(&waker_a);

        // First pending poll: the first waker is expected to be cloned once.
        let poll_a = Future::poll(Pin::new(&mut rx), &mut cx_a);
        assert_eq!(poll_a, Poll::Pending);
        assert_eq!(
            (first.clone_count(), first.drop_count(), first.wake_count()),
            (1, 0, 0)
        );

        let (waker_b, second) = helpers::waker::waker();
        let mut cx_b = task::Context::from_waker(&waker_b);

        // Second pending poll with another waker: the first waker's clone is
        // expected to be dropped without being woken, and the second cloned once.
        let poll_b = Future::poll(Pin::new(&mut rx), &mut cx_b);
        assert_eq!(poll_b, Poll::Pending);
        assert_eq!(
            (first.clone_count(), first.drop_count(), first.wake_count()),
            (1, 1, 0)
        );
        assert_eq!(
            (second.clone_count(), second.drop_count(), second.wake_count()),
            (1, 0, 0)
        );

        // Sending should cause the waker from the latest poll to be woken up
        tx.send(1234).unwrap();
        assert_eq!(
            (first.clone_count(), first.drop_count(), first.wake_count()),
            (1, 1, 0)
        );
        assert_eq!(
            (second.clone_count(), second.drop_count(), second.wake_count()),
            (1, 1, 1)
        );
    })
}
167
/// Interleaves a `try_recv` between two pending polls (nothing is ever sent)
/// and checks the waker's (clone, drop, wake) counts after each poll.
#[cfg(feature = "async")]
#[test]
fn poll_then_try_recv() {
    loom::model(|| {
        // Bound to a named `_tx` rather than `_`, so the sender lives until the
        // end of the closure.
        let (_tx, mut rx) = oneshot::channel::<u128>();

        let (waker, counters) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        // First pending poll: expect exactly one waker clone.
        let first_poll = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(first_poll, Poll::Pending);
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 0, 0)
        );

        let attempt = rx.try_recv();
        assert_eq!(attempt, Err(TryRecvError::Empty));

        // Second pending poll: expect one more clone and one drop, still no wake.
        let second_poll = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(second_poll, Poll::Pending);
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (2, 1, 0)
        );
    })
}
190
/// Registers a waker with a pending poll, then spins on `try_recv` while a
/// spawned thread sends `1234`, and checks the waker counts once the message
/// has been received.
#[cfg(feature = "async")]
#[test]
fn poll_then_try_recv_while_sending() {
    loom::model(|| {
        let (tx, mut rx) = oneshot::channel::<u128>();

        let (waker, counters) = helpers::waker::waker();
        let mut cx = task::Context::from_waker(&waker);

        // Pending poll before anything is sent: expect exactly one waker clone.
        let first_poll = Future::poll(Pin::new(&mut rx), &mut cx);
        assert_eq!(first_poll, Poll::Pending);
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 0, 0)
        );

        let sending = thread::spawn(move || {
            tx.send(1234).unwrap();
        });

        // Retry for as long as the channel reports `Empty`. Seeing `Disconnected`
        // here is treated as a test failure.
        let received = loop {
            match rx.try_recv() {
                Err(TryRecvError::Empty) => hint::spin_loop(),
                Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"),
                Ok(value) => break value,
            }
        };
        assert_eq!(received, 1234);

        // Once the message is in hand, expect one wake and one drop of the clone.
        assert_eq!(
            (counters.clone_count(), counters.drop_count(), counters.wake_count()),
            (1, 1, 1)
        );

        sending.join().unwrap();
    })
}
224