1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
4 use futures::never::Never;
5 use futures::ready;
6 use futures::sink::{self, Sink, SinkErrInto, SinkExt};
7 use futures::stream::{self, Stream, StreamExt};
8 use futures::task::{self, ArcWake, Context, Poll, Waker};
9 use futures_test::task::panic_context;
10 use std::cell::{Cell, RefCell};
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::mem;
14 use std::pin::Pin;
15 use std::rc::Rc;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18
sassert_next<S>(s: &mut S, item: S::Item) where S: Stream + Unpin, S::Item: Eq + fmt::Debug,19 fn sassert_next<S>(s: &mut S, item: S::Item)
20 where
21 S: Stream + Unpin,
22 S::Item: Eq + fmt::Debug,
23 {
24 match s.poll_next_unpin(&mut panic_context()) {
25 Poll::Ready(None) => panic!("stream is at its end"),
26 Poll::Ready(Some(e)) => assert_eq!(e, item),
27 Poll::Pending => panic!("stream wasn't ready"),
28 }
29 }
30
// Extracts the success value from a poll result that is expected to be
// `Poll::Ready(Ok(_))`.
//
// Panics with "Poll::Pending" if the poll is not ready. If the poll is
// `Poll::Ready(Err(e))`, panics with a message that contains the `Debug`
// rendering of `e`, so a failing test shows the actual error.
fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
    match x {
        Poll::Ready(Ok(value)) => value,
        // Report the concrete error; this is what the `Debug` bound is for.
        Poll::Ready(Err(err)) => panic!("Poll::Ready(Err({:?}))", err),
        Poll::Pending => panic!("Poll::Pending"),
    }
}
38
// A wake-up recorder: the flag becomes `true` when the waker built from it
// is woken, so a test can check whether a wake-up happened.
struct Flag(AtomicBool);

impl Flag {
    // Creates a cleared flag behind an `Arc`.
    fn new() -> Arc<Self> {
        let cleared = Flag(AtomicBool::new(false));
        Arc::new(cleared)
    }

    // Clears the flag and reports whether it was set beforehand.
    fn take(&self) -> bool {
        let Flag(raised) = self;
        raised.swap(false, Ordering::SeqCst)
    }

    // Overwrites the flag with `v`.
    fn set(&self, v: bool) {
        let Flag(raised) = self;
        raised.store(v, Ordering::SeqCst);
    }
}
55
56 impl ArcWake for Flag {
wake_by_ref(arc_self: &Arc<Self>)57 fn wake_by_ref(arc_self: &Arc<Self>) {
58 arc_self.set(true)
59 }
60 }
61
flag_cx<F, R>(f: F) -> R where F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,62 fn flag_cx<F, R>(f: F) -> R
63 where
64 F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
65 {
66 let flag = Flag::new();
67 let waker = task::waker_ref(&flag);
68 let cx = &mut Context::from_waker(&waker);
69 f(flag.clone(), cx)
70 }
71
72 // Sends a value on an i32 channel sink
73 struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
74
75 impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
new(sink: S, item: Item) -> Self76 fn new(sink: S, item: Item) -> Self {
77 Self(Some(sink), Some(item))
78 }
79 }
80
81 impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
82 type Output = Result<S, S::Error>;
83
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>84 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85 let Self(inner, item) = self.get_mut();
86 {
87 let mut inner = inner.as_mut().unwrap();
88 ready!(Pin::new(&mut inner).poll_ready(cx))?;
89 Pin::new(&mut inner).start_send(item.take().unwrap())?;
90 }
91 Poll::Ready(Ok(inner.take().unwrap()))
92 }
93 }
94
// A sink that accepts every item right away, but only reports itself as
// flushed once its buffer has been emptied by a manual flush.
struct ManualFlush<T: Unpin> {
    // Items accepted so far and not yet flushed.
    data: Vec<T>,
    // Wakers of tasks that polled for a flush while `data` was non-empty.
    waiting_tasks: Vec<Waker>,
}
101
102 impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
103 type Error = ();
104
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>105 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 Poll::Ready(Ok(()))
107 }
108
start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error>109 fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
110 if let Some(item) = item {
111 self.data.push(item);
112 } else {
113 self.force_flush();
114 }
115 Ok(())
116 }
117
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 if self.data.is_empty() {
120 Poll::Ready(Ok(()))
121 } else {
122 self.waiting_tasks.push(cx.waker().clone());
123 Poll::Pending
124 }
125 }
126
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>127 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128 self.poll_flush(cx)
129 }
130 }
131
132 impl<T: Unpin> ManualFlush<T> {
new() -> Self133 fn new() -> Self {
134 Self { data: Vec::new(), waiting_tasks: Vec::new() }
135 }
136
force_flush(&mut self) -> Vec<T>137 fn force_flush(&mut self) -> Vec<T> {
138 for task in self.waiting_tasks.drain(..) {
139 task.wake()
140 }
141 mem::take(&mut self.data)
142 }
143 }
144
145 struct ManualAllow<T: Unpin> {
146 data: Vec<T>,
147 allow: Rc<Allow>,
148 }
149
// A single-threaded gate: it starts closed, remembers the wakers of tasks
// that checked it while closed, and wakes them all when it is opened.
struct Allow {
    // `true` once `start` has been called.
    flag: Cell<bool>,
    // Wakers registered by `check` calls made while the gate was closed.
    tasks: RefCell<Vec<Waker>>,
}

impl Allow {
    // Creates a closed gate with no registered wakers.
    fn new() -> Self {
        Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
    }

    // Returns `true` if the gate is open. Otherwise registers the waker of
    // `cx` so that `start` can wake the task, and returns `false`.
    fn check(&self, cx: &mut Context<'_>) -> bool {
        if self.flag.get() {
            true
        } else {
            self.tasks.borrow_mut().push(cx.waker().clone());
            false
        }
    }

    // Opens the gate and wakes every task registered by `check`.
    fn start(&self) {
        self.flag.set(true);
        // Move the wakers out first so the `RefCell` borrow is released
        // before any of them runs: a waker that synchronously re-enters
        // `start` would otherwise panic with `BorrowMutError`.
        let tasks = mem::take(&mut *self.tasks.borrow_mut());
        for task in tasks {
            task.wake();
        }
    }
}
177
178 impl<T: Unpin> Sink<T> for ManualAllow<T> {
179 type Error = ();
180
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 if self.allow.check(cx) {
183 Poll::Ready(Ok(()))
184 } else {
185 Poll::Pending
186 }
187 }
188
start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>189 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
190 self.data.push(item);
191 Ok(())
192 }
193
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>194 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 Poll::Ready(Ok(()))
196 }
197
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>198 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 Poll::Ready(Ok(()))
200 }
201 }
202
manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>)203 fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
204 let allow = Rc::new(Allow::new());
205 let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
206 (manual_allow, allow)
207 }
208
// `left_sink`/`right_sink` unify two different sink types into one type that
// can still be sent to.
#[test]
fn either_sink() {
    let mut either = if true {
        Vec::<i32>::new().left_sink()
    } else {
        VecDeque::<i32>::new().right_sink()
    };

    let started = Pin::new(&mut either).start_send(0);
    started.unwrap();
}
216
// A `Vec` used as a sink appends every item that is sent to it.
#[test]
fn vec_sink() {
    let mut buf = Vec::new();
    for item in 0..2 {
        Pin::new(&mut buf).start_send(item).unwrap();
    }

    let expected = vec![0, 1];
    assert_eq!(buf, expected);

    // Flushing must leave the contents untouched.
    block_on(buf.flush()).unwrap();
    assert_eq!(buf, expected);
}
226
// A `VecDeque` used as a sink pushes items to the back.
#[test]
fn vecdeque_sink() {
    let mut queue = VecDeque::new();
    for item in 2..4 {
        Pin::new(&mut queue).start_send(item).unwrap();
    }

    // The items come back out of the front in send order, then nothing.
    for expected in vec![Some(2), Some(3), None] {
        assert_eq!(queue.pop_front(), expected);
    }
}
237
// Each completed `send` must leave the item visible in the sink.
#[test]
fn send() {
    let mut sink = Vec::new();
    let mut expected = Vec::new();

    for item in 0..3 {
        block_on(sink.send(item)).unwrap();
        expected.push(item);
        assert_eq!(sink, expected);
    }
}
251
// Each `send_all` must append the whole batch to what is already in the sink.
#[test]
fn send_all() {
    let mut sink = Vec::new();
    let mut expected = Vec::new();

    for batch in vec![vec![0, 1], vec![2, 3], vec![4, 5]] {
        expected.extend_from_slice(&batch);
        block_on(sink.send_all(&mut stream::iter(batch).map(Ok))).unwrap();
        assert_eq!(sink, expected);
    }
}
265
// Checks that sending on an `mpsc` channel really has to wait while the
// channel is full, and resumes once the receiver has made room.
#[test]
fn mpsc_blocking_start_send() {
    let (mut sender, mut receiver) = mpsc::channel::<i32>(0);

    block_on(future::lazy(|_| {
        // The first message is accepted without waiting.
        sender.start_send(0).unwrap();

        flag_cx(|woken, cx| {
            let mut second_send = StartSendFut::new(sender, 1);

            // The channel is full now, so the second send stays pending
            // and nobody has been woken yet.
            assert!(second_send.poll_unpin(cx).is_pending());
            assert!(!woken.take());

            // Receiving the first message wakes the sending task ...
            sassert_next(&mut receiver, 0);
            assert!(woken.take());

            // ... which can then complete and deliver the second message.
            unwrap(second_send.poll_unpin(cx));
            assert!(!woken.take());
            sassert_next(&mut receiver, 1);
        })
    }));
}
288
// Exercises `flush` through `with`: the mapping future for the first item
// waits on a oneshot, so the flush cannot finish before the oneshot fires.
#[test]
fn with_flush() {
    let (unblock, blocked) = oneshot::channel();
    let mut gate = blocked.boxed();
    let mut sink = Vec::new().with(|elem| {
        // Only the first call gets the blocking future; later calls get one
        // that is ready immediately.
        let pending = mem::replace(&mut gate, future::ok(()).boxed());
        pending.map_ok(move |()| elem + 1).map_err(|_| -> Never { panic!() })
    });

    assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));

    flag_cx(|woken, cx| {
        let mut flush = sink.flush();
        assert!(flush.poll_unpin(cx).is_pending());
        unblock.send(()).unwrap();
        assert!(woken.take());

        unwrap(flush.poll_unpin(cx));

        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[1, 2]);
    })
}
315
// `with` used as a plain mapping step: every item is doubled on its way in.
#[test]
fn with_as_map() {
    let mut doubling = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
    for item in 0..3 {
        block_on(doubling.send(item)).unwrap();
    }
    assert_eq!(doubling.get_ref(), &[0, 2, 4]);
}
325
// `with_flat_map` expands each item `n` into `n` copies of itself, so `0`
// contributes nothing.
#[test]
fn with_flat_map() {
    let mut repeating = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
    for item in 0..4 {
        block_on(repeating.send(item)).unwrap();
    }
    assert_eq!(repeating.get_ref(), &[1, 2, 2, 3, 3, 3]);
}
336
// Check that `with` forwards `poll_ready` to the sink it wraps.
// Regression test for the issue #1834.
#[test]
fn with_propagates_poll_ready() {
    let (sender, mut receiver) = mpsc::channel::<i32>(0);
    let mut mapped = sender.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));

    block_on(future::lazy(|_| {
        flag_cx(|woken, cx| {
            let mut sink = Pin::new(&mut mapped);

            // The first item can be sent right away.
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(sink.as_mut().start_send(0), Ok(()));

            // The second item has to wait until the first one was received.
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Pending);
            assert!(!woken.take());
            sassert_next(&mut receiver, 10);
            assert!(woken.take());
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(sink.as_mut().start_send(1), Ok(()));
        })
    }));
}
362
// A `with` sink can accept items without the underlying sink flushing, but
// it must not report itself as flushed before the underlying sink is.
#[test]
fn with_flush_propagate() {
    let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
    flag_cx(|woken, cx| {
        for item in 0..2 {
            unwrap(Pin::new(&mut sink).poll_ready(cx));
            Pin::new(&mut sink).start_send(Some(item)).unwrap();
        }

        {
            // The inner sink still holds both items, so flushing is pending.
            let mut flush = sink.flush();
            assert!(flush.poll_unpin(cx).is_pending());
            assert!(!woken.take());
        }

        // Draining the inner sink wakes the flushing task; a new flush
        // then completes.
        assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
        assert!(woken.take());
        unwrap(sink.flush().poll_unpin(cx));
    })
}
384
// `with` sinks must be `Clone`, and clones must feed the same channel.
#[test]
fn with_implements_clone() {
    let (mut sender, receiver) = mpsc::channel(5);

    {
        let mut positive =
            sender.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));

        let mut long =
            sender.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));

        // Send through clones first, then through the original adapters.
        block_on(positive.clone().send(-1)).unwrap();
        block_on(long.clone().send("123456")).unwrap();
        block_on(long.send("123")).unwrap();
        block_on(positive.send(1)).unwrap();
    }

    block_on(sender.send(false)).unwrap();

    block_on(sender.close()).unwrap();

    // One mapped boolean per send above, in send order.
    let received = block_on(receiver.collect::<Vec<_>>());
    assert_eq!(received, vec![false, true, false, true, false]);
}
408
// test that a buffer is a no-op around a sink that always accepts sends,
// for both a zero and a non-zero capacity
#[test]
fn buffer_noop() {
    for capacity in 0..=1 {
        let mut sink = Vec::new().buffer(capacity);
        block_on(sink.send(0)).unwrap();
        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[0, 1]);
    }
}
422
// Basic buffer behaviour: fill the buffer up to its capacity while the
// underlying sink is not ready, then check that everything is written out
// once the underlying sink allows it.
#[test]
fn buffer() {
    let (inner, allow) = manual_allow::<i32>();

    // Start sending two items, one per buffer slot.
    let mut buffered = (0..2).fold(inner.buffer(2), |sink, item| {
        block_on(StartSendFut::new(sink, item)).unwrap()
    });

    flag_cx(|woken, cx| {
        // The third item has to wait for the underlying sink.
        let mut third = buffered.send(2);
        assert!(third.poll_unpin(cx).is_pending());
        assert!(!woken.take());

        allow.start();
        assert!(woken.take());
        unwrap(third.poll_unpin(cx));
        assert_eq!(buffered.get_ref().data, vec![0, 1, 2]);
    })
}
443
// Every item sent into a fanout must end up in both underlying sinks.
#[test]
fn fanout_smoke() {
    let mut fanout = Vec::new().fanout(Vec::new());
    block_on(fanout.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();

    let (left, right) = fanout.into_inner();
    let expected = vec![1, 2, 3];
    assert_eq!(left, expected);
    assert_eq!(right, expected);
}
454
// A fanout over two bounded channels only makes progress as both receivers
// drain, and the sending task is woken after each receive.
#[test]
fn fanout_backpressure() {
    let (left_send, mut left_recv) = mpsc::channel(0);
    let (right_send, mut right_recv) = mpsc::channel(0);
    let fanout = left_send.fanout(right_send);

    let mut fanout = block_on(StartSendFut::new(fanout, 0)).unwrap();

    flag_cx(|woken, cx| {
        let mut send = fanout.send(2);
        assert!(!woken.take());

        // For each of the two items, the send stays pending while first the
        // left and then the right receiver takes the item.
        for expected in vec![0, 2] {
            assert!(send.poll_unpin(cx).is_pending());
            assert_eq!(block_on(left_recv.next()), Some(expected));
            assert!(woken.take());
            assert!(send.poll_unpin(cx).is_pending());
            assert_eq!(block_on(right_recv.next()), Some(expected));
            assert!(woken.take());
        }

        unwrap(send.poll_unpin(cx));
        // make sure receivers live until end of test to prevent send errors
        drop(left_recv);
        drop(right_recv);
    })
}
486
// `sink_map_err` must pass successes through and map errors.
#[test]
fn sink_map_err() {
    {
        // With the receiver alive, sending and flushing succeed.
        let cx = &mut panic_context();
        let (sender, _receiver) = mpsc::channel(1);
        let mut mapped = sender.sink_map_err(|_| ());
        assert_eq!(Pin::new(&mut mapped).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut mapped).poll_flush(cx), Poll::Ready(Ok(())));
    }

    // With the receiver gone, the send error is mapped to `()`.
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    let mut mapped = sender.sink_map_err(|_| ());
    assert_eq!(Pin::new(&mut mapped).start_send(()), Err(()));
}
500
// Drives a `sink::unfold` sink by hand; each item is forwarded into an
// `mpsc` channel by the future the closure returns.
#[test]
fn sink_unfold() {
    block_on(poll_fn(|cx| {
        let (sender, mut receiver) = mpsc::channel(1);
        let sink = sink::unfold((), |(), i: i32| {
            let mut sender = sender.clone();
            async move {
                sender.send(i).await.unwrap();
                Ok::<_, String>(())
            }
        });
        futures::pin_mut!(sink);

        // After a completed flush the item is in the channel.
        assert_eq!(sink.as_mut().start_send(1), Ok(()));
        assert_eq!(sink.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
        assert_eq!(receiver.try_next().unwrap(), Some(1));

        // Without a flush, only `2` has reached the channel after these calls.
        assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(sink.as_mut().start_send(2), Ok(()));
        assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(sink.as_mut().start_send(3), Ok(()));
        assert_eq!(receiver.try_next().unwrap(), Some(2));
        assert!(receiver.try_next().is_err());

        assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(sink.as_mut().start_send(4), Ok(()));
        assert_eq!(sink.as_mut().poll_flush(cx), Poll::Pending); // Channel full
        assert_eq!(receiver.try_next().unwrap(), Some(3));
        assert_eq!(receiver.try_next().unwrap(), Some(4));

        Poll::Ready(())
    }))
}
532
// `sink_err_into` must pass successes through and convert errors via `From`.
#[test]
fn err_into() {
    // Target error type; every `mpsc::SendError` converts to it.
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct ErrIntoTest;

    impl From<mpsc::SendError> for ErrIntoTest {
        fn from(_: mpsc::SendError) -> Self {
            ErrIntoTest
        }
    }

    {
        // With the receiver alive, sending and flushing succeed.
        let cx = &mut panic_context();
        let (sender, _receiver) = mpsc::channel(1);
        let mut converted: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = sender.sink_err_into();
        assert_eq!(Pin::new(&mut converted).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut converted).poll_flush(cx), Poll::Ready(Ok(())));
    }

    // With the receiver gone, the send error is converted to `ErrIntoTest`.
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    assert_eq!(Pin::new(&mut sender.sink_err_into()).start_send(()), Err(ErrIntoTest));
}
555