1 use tokio_stream::{self as stream, Stream, StreamExt};
2 use tokio_test::{assert_pending, assert_ready, task};
3 
// Test-support module; `mpsc` supplies the `unbounded_channel_stream` helper
// used by the channel-driven test in this file.
mod support {
    pub(crate) mod mpsc;
}
7 
8 use support::mpsc;
9 use tokio_stream::adapters::Chain;
10 
11 #[tokio::test]
basic_usage()12 async fn basic_usage() {
13     let one = stream::iter(vec![1, 2, 3]);
14     let two = stream::iter(vec![4, 5, 6]);
15 
16     let mut stream = visibility_test(one, two);
17 
18     assert_eq!(stream.size_hint(), (6, Some(6)));
19     assert_eq!(stream.next().await, Some(1));
20 
21     assert_eq!(stream.size_hint(), (5, Some(5)));
22     assert_eq!(stream.next().await, Some(2));
23 
24     assert_eq!(stream.size_hint(), (4, Some(4)));
25     assert_eq!(stream.next().await, Some(3));
26 
27     assert_eq!(stream.size_hint(), (3, Some(3)));
28     assert_eq!(stream.next().await, Some(4));
29 
30     assert_eq!(stream.size_hint(), (2, Some(2)));
31     assert_eq!(stream.next().await, Some(5));
32 
33     assert_eq!(stream.size_hint(), (1, Some(1)));
34     assert_eq!(stream.next().await, Some(6));
35 
36     assert_eq!(stream.size_hint(), (0, Some(0)));
37     assert_eq!(stream.next().await, None);
38 
39     assert_eq!(stream.size_hint(), (0, Some(0)));
40     assert_eq!(stream.next().await, None);
41 }
42 
visibility_test<I, S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2> where S1: Stream<Item = I>, S2: Stream<Item = I>,43 fn visibility_test<I, S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
44 where
45     S1: Stream<Item = I>,
46     S2: Stream<Item = I>,
47 {
48     s1.chain(s2)
49 }
50 
51 #[tokio::test]
pending_first()52 async fn pending_first() {
53     let (tx1, rx1) = mpsc::unbounded_channel_stream();
54     let (tx2, rx2) = mpsc::unbounded_channel_stream();
55 
56     let mut stream = task::spawn(rx1.chain(rx2));
57     assert_eq!(stream.size_hint(), (0, None));
58 
59     assert_pending!(stream.poll_next());
60 
61     tx2.send(2).unwrap();
62     assert!(!stream.is_woken());
63 
64     assert_pending!(stream.poll_next());
65 
66     tx1.send(1).unwrap();
67     assert!(stream.is_woken());
68     assert_eq!(Some(1), assert_ready!(stream.poll_next()));
69 
70     assert_pending!(stream.poll_next());
71 
72     drop(tx1);
73 
74     assert_eq!(stream.size_hint(), (0, None));
75 
76     assert!(stream.is_woken());
77     assert_eq!(Some(2), assert_ready!(stream.poll_next()));
78 
79     assert_eq!(stream.size_hint(), (0, None));
80 
81     drop(tx2);
82 
83     assert_eq!(stream.size_hint(), (0, None));
84     assert_eq!(None, assert_ready!(stream.poll_next()));
85 }
86 
/// Chains two streams that each report `usize::MAX` items and checks the
/// combined size hint: the lower bound is expected to stay at `usize::MAX`
/// and the upper bound to be `None`.
#[test]
fn size_overflow() {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Stream that is never polled in this test; only its size hint matters.
    struct Monster;

    impl tokio_stream::Stream for Monster {
        type Item = ();

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            let everything = usize::MAX;
            (everything, Some(everything))
        }
    }

    let chained = Monster.chain(Monster);
    let (lower, upper) = chained.size_hint();
    assert_eq!((lower, upper), (usize::MAX, None));
}
110