1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::future::Future;
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use std::pin::Pin;
8 use std::sync::{Arc, Weak};
9 use std::thread;
10 use std::time::{Duration, Instant};
11 
12 #[test]
smoke()13 fn smoke() {
14     let (mut sender, receiver) = mpsc::channel(1);
15 
16     let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
17 
18     // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
19     block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
20 
21     t.join().unwrap()
22 }
23 
24 #[test]
multiple_senders_disconnect()25 fn multiple_senders_disconnect() {
26     {
27         let (mut tx1, mut rx) = mpsc::channel(1);
28         let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
29 
30         // disconnect, dropping and Sink::poll_close should all close this sender but leave the
31         // channel open for other senders
32         tx1.disconnect();
33         drop(tx2);
34         block_on(tx3.close()).unwrap();
35 
36         assert!(tx1.is_closed());
37         assert!(tx3.is_closed());
38         assert!(!tx4.is_closed());
39 
40         block_on(tx4.send(5)).unwrap();
41         assert_eq!(block_on(rx.next()), Some(5));
42 
43         // dropping the final sender will close the channel
44         drop(tx4);
45         assert_eq!(block_on(rx.next()), None);
46     }
47 
48     {
49         let (mut tx1, mut rx) = mpsc::unbounded();
50         let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
51 
52         // disconnect, dropping and Sink::poll_close should all close this sender but leave the
53         // channel open for other senders
54         tx1.disconnect();
55         drop(tx2);
56         block_on(tx3.close()).unwrap();
57 
58         assert!(tx1.is_closed());
59         assert!(tx3.is_closed());
60         assert!(!tx4.is_closed());
61 
62         block_on(tx4.send(5)).unwrap();
63         assert_eq!(block_on(rx.next()), Some(5));
64 
65         // dropping the final sender will close the channel
66         drop(tx4);
67         assert_eq!(block_on(rx.next()), None);
68     }
69 }
70 
71 #[test]
multiple_senders_close_channel()72 fn multiple_senders_close_channel() {
73     {
74         let (mut tx1, mut rx) = mpsc::channel(1);
75         let mut tx2 = tx1.clone();
76 
77         // close_channel should shut down the whole channel
78         tx1.close_channel();
79 
80         assert!(tx1.is_closed());
81         assert!(tx2.is_closed());
82 
83         let err = block_on(tx2.send(5)).unwrap_err();
84         assert!(err.is_disconnected());
85 
86         assert_eq!(block_on(rx.next()), None);
87     }
88 
89     {
90         let (tx1, mut rx) = mpsc::unbounded();
91         let mut tx2 = tx1.clone();
92 
93         // close_channel should shut down the whole channel
94         tx1.close_channel();
95 
96         assert!(tx1.is_closed());
97         assert!(tx2.is_closed());
98 
99         let err = block_on(tx2.send(5)).unwrap_err();
100         assert!(err.is_disconnected());
101 
102         assert_eq!(block_on(rx.next()), None);
103     }
104 }
105 
106 #[test]
single_receiver_drop_closes_channel_and_drains()107 fn single_receiver_drop_closes_channel_and_drains() {
108     {
109         let ref_count = Arc::new(0);
110         let weak_ref = Arc::downgrade(&ref_count);
111 
112         let (sender, receiver) = mpsc::unbounded();
113         sender.unbounded_send(ref_count).expect("failed to send");
114 
115         // Verify that the sent message is still live.
116         assert!(weak_ref.upgrade().is_some());
117 
118         drop(receiver);
119 
120         // The sender should know the channel is closed.
121         assert!(sender.is_closed());
122 
123         // Verify that the sent message has been dropped.
124         assert!(weak_ref.upgrade().is_none());
125     }
126 
127     {
128         let ref_count = Arc::new(0);
129         let weak_ref = Arc::downgrade(&ref_count);
130 
131         let (mut sender, receiver) = mpsc::channel(1);
132         sender.try_send(ref_count).expect("failed to send");
133 
134         // Verify that the sent message is still live.
135         assert!(weak_ref.upgrade().is_some());
136 
137         drop(receiver);
138 
139         // The sender should know the channel is closed.
140         assert!(sender.is_closed());
141 
142         // Verify that the sent message has been dropped.
143         assert!(weak_ref.upgrade().is_none());
144         assert!(sender.is_closed());
145     }
146 }
147 
148 // Stress test that `try_send()`s occurring concurrently with receiver
149 // close/drops don't appear as successful sends.
150 #[cfg_attr(miri, ignore)] // Miri is too slow
151 #[test]
stress_try_send_as_receiver_closes()152 fn stress_try_send_as_receiver_closes() {
153     const AMT: usize = 10000;
154     // To provide variable timing characteristics (in the hopes of
155     // reproducing the collision that leads to a race), we busy-re-poll
156     // the test MPSC receiver a variable number of times before actually
157     // stopping.  We vary this countdown between 1 and the following
158     // value.
159     const MAX_COUNTDOWN: usize = 20;
160     // When we detect that a successfully sent item is still in the
161     // queue after a disconnect, we spin for up to 100ms to confirm that
162     // it is a persistent condition and not a concurrency illusion.
163     const SPIN_TIMEOUT_S: u64 = 10;
164     const SPIN_SLEEP_MS: u64 = 10;
165     struct TestRx {
166         rx: mpsc::Receiver<Arc<()>>,
167         // The number of times to query `rx` before dropping it.
168         poll_count: usize,
169     }
170     struct TestTask {
171         command_rx: mpsc::Receiver<TestRx>,
172         test_rx: Option<mpsc::Receiver<Arc<()>>>,
173         countdown: usize,
174     }
175     impl TestTask {
176         /// Create a new TestTask
177         fn new() -> (Self, mpsc::Sender<TestRx>) {
178             let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
179             (
180                 Self {
181                     command_rx,
182                     test_rx: None,
183                     countdown: 0, // 0 means no countdown is in progress.
184                 },
185                 command_tx,
186             )
187         }
188     }
189     impl Future for TestTask {
190         type Output = ();
191 
192         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193             // Poll the test channel, if one is present.
194             if let Some(rx) = &mut self.test_rx {
195                 if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
196                     let _ = v.expect("test finished unexpectedly!");
197                 }
198                 self.countdown -= 1;
199                 // Busy-poll until the countdown is finished.
200                 cx.waker().wake_by_ref();
201             }
202             // Accept any newly submitted MPSC channels for testing.
203             match self.command_rx.poll_next_unpin(cx) {
204                 Poll::Ready(Some(TestRx { rx, poll_count })) => {
205                     self.test_rx = Some(rx);
206                     self.countdown = poll_count;
207                     cx.waker().wake_by_ref();
208                 }
209                 Poll::Ready(None) => return Poll::Ready(()),
210                 Poll::Pending => {}
211             }
212             if self.countdown == 0 {
213                 // Countdown complete -- drop the Receiver.
214                 self.test_rx = None;
215             }
216             Poll::Pending
217         }
218     }
219     let (f, mut cmd_tx) = TestTask::new();
220     let bg = thread::spawn(move || block_on(f));
221     for i in 0..AMT {
222         let (mut test_tx, rx) = mpsc::channel(0);
223         let poll_count = i % MAX_COUNTDOWN;
224         cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
225         let mut prev_weak: Option<Weak<()>> = None;
226         let mut attempted_sends = 0;
227         let mut successful_sends = 0;
228         loop {
229             // Create a test item.
230             let item = Arc::new(());
231             let weak = Arc::downgrade(&item);
232             match test_tx.try_send(item) {
233                 Ok(_) => {
234                     prev_weak = Some(weak);
235                     successful_sends += 1;
236                 }
237                 Err(ref e) if e.is_full() => {}
238                 Err(ref e) if e.is_disconnected() => {
239                     // Test for evidence of the race condition.
240                     if let Some(prev_weak) = prev_weak {
241                         if prev_weak.upgrade().is_some() {
242                             // The previously sent item is still allocated.
243                             // However, there appears to be some aspect of the
244                             // concurrency that can legitimately cause the Arc
245                             // to be momentarily valid.  Spin for up to 100ms
246                             // waiting for the previously sent item to be
247                             // dropped.
248                             let t0 = Instant::now();
249                             let mut spins = 0;
250                             loop {
251                                 if prev_weak.upgrade().is_none() {
252                                     break;
253                                 }
254                                 assert!(
255                                     t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
256                                     "item not dropped on iteration {} after \
257                                      {} sends ({} successful). spin=({})",
258                                     i,
259                                     attempted_sends,
260                                     successful_sends,
261                                     spins
262                                 );
263                                 spins += 1;
264                                 thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
265                             }
266                         }
267                     }
268                     break;
269                 }
270                 Err(ref e) => panic!("unexpected error: {}", e),
271             }
272             attempted_sends += 1;
273         }
274     }
275     drop(cmd_tx);
276     bg.join().expect("background thread join");
277 }
278 
279 #[test]
unbounded_try_next_after_none()280 fn unbounded_try_next_after_none() {
281     let (tx, mut rx) = mpsc::unbounded::<String>();
282     // Drop the sender, close the channel.
283     drop(tx);
284     // Receive the end of channel.
285     assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
286     // None received, check we can call `try_next` again.
287     assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
288 }
289 
290 #[test]
bounded_try_next_after_none()291 fn bounded_try_next_after_none() {
292     let (tx, mut rx) = mpsc::channel::<String>(17);
293     // Drop the sender, close the channel.
294     drop(tx);
295     // Receive the end of channel.
296     assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
297     // None received, check we can call `try_next` again.
298     assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
299 }
300