1 use async_stream::stream;
2 
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_util::pin_mut;
5 use futures_util::stream::StreamExt;
6 use tokio::sync::mpsc;
7 use tokio_test::assert_ok;
8 
9 #[tokio::test]
noop_stream()10 async fn noop_stream() {
11     let s = stream! {};
12     pin_mut!(s);
13 
14     while s.next().await.is_some() {
15         unreachable!();
16     }
17 }
18 
19 #[tokio::test]
empty_stream()20 async fn empty_stream() {
21     let mut ran = false;
22 
23     {
24         let r = &mut ran;
25         let s = stream! {
26             *r = true;
27             println!("hello world!");
28         };
29         pin_mut!(s);
30 
31         while s.next().await.is_some() {
32             unreachable!();
33         }
34     }
35 
36     assert!(ran);
37 }
38 
39 #[tokio::test]
yield_single_value()40 async fn yield_single_value() {
41     let s = stream! {
42         yield "hello";
43     };
44 
45     let values: Vec<_> = s.collect().await;
46 
47     assert_eq!(1, values.len());
48     assert_eq!("hello", values[0]);
49 }
50 
51 #[tokio::test]
fused()52 async fn fused() {
53     let s = stream! {
54         yield "hello";
55     };
56     pin_mut!(s);
57 
58     assert!(!s.is_terminated());
59     assert_eq!(s.next().await, Some("hello"));
60     assert_eq!(s.next().await, None);
61 
62     assert!(s.is_terminated());
63     // This should return None from now on
64     assert_eq!(s.next().await, None);
65 }
66 
67 #[tokio::test]
yield_multi_value()68 async fn yield_multi_value() {
69     let s = stream! {
70         yield "hello";
71         yield "world";
72         yield "dizzy";
73     };
74 
75     let values: Vec<_> = s.collect().await;
76 
77     assert_eq!(3, values.len());
78     assert_eq!("hello", values[0]);
79     assert_eq!("world", values[1]);
80     assert_eq!("dizzy", values[2]);
81 }
82 
83 #[tokio::test]
unit_yield_in_select()84 async fn unit_yield_in_select() {
85     use tokio::select;
86 
87     async fn do_stuff_async() {}
88 
89     let s = stream! {
90         select! {
91             _ = do_stuff_async() => yield,
92             else => yield,
93         }
94     };
95 
96     let values: Vec<_> = s.collect().await;
97     assert_eq!(values.len(), 1);
98 }
99 
100 #[tokio::test]
yield_with_select()101 async fn yield_with_select() {
102     use tokio::select;
103 
104     async fn do_stuff_async() {}
105     async fn more_async_work() {}
106 
107     let s = stream! {
108         select! {
109             _ = do_stuff_async() => yield "hey",
110             _ = more_async_work() => yield "hey",
111             else => yield "hey",
112         }
113     };
114 
115     let values: Vec<_> = s.collect().await;
116     assert_eq!(values, vec!["hey"]);
117 }
118 
119 #[tokio::test]
return_stream()120 async fn return_stream() {
121     fn build_stream() -> impl Stream<Item = u32> {
122         stream! {
123             yield 1;
124             yield 2;
125             yield 3;
126         }
127     }
128 
129     let s = build_stream();
130 
131     let values: Vec<_> = s.collect().await;
132     assert_eq!(3, values.len());
133     assert_eq!(1, values[0]);
134     assert_eq!(2, values[1]);
135     assert_eq!(3, values[2]);
136 }
137 
138 #[tokio::test]
consume_channel()139 async fn consume_channel() {
140     let (tx, mut rx) = mpsc::channel(10);
141 
142     let s = stream! {
143         while let Some(v) = rx.recv().await {
144             yield v;
145         }
146     };
147 
148     pin_mut!(s);
149 
150     for i in 0..3 {
151         assert_ok!(tx.send(i).await);
152         assert_eq!(Some(i), s.next().await);
153     }
154 
155     drop(tx);
156     assert_eq!(None, s.next().await);
157 }
158 
159 #[tokio::test]
borrow_self()160 async fn borrow_self() {
161     struct Data(String);
162 
163     impl Data {
164         fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
165             stream! {
166                 yield &self.0[..];
167             }
168         }
169     }
170 
171     let data = Data("hello".to_string());
172     let s = data.stream();
173     pin_mut!(s);
174 
175     assert_eq!(Some("hello"), s.next().await);
176 }
177 
178 #[tokio::test]
stream_in_stream()179 async fn stream_in_stream() {
180     let s = stream! {
181         let s = stream! {
182             for i in 0..3 {
183                 yield i;
184             }
185         };
186 
187         pin_mut!(s);
188         while let Some(v) = s.next().await {
189             yield v;
190         }
191     };
192 
193     let values: Vec<_> = s.collect().await;
194     assert_eq!(3, values.len());
195 }
196 
197 #[tokio::test]
yield_non_unpin_value()198 async fn yield_non_unpin_value() {
199     let s: Vec<_> = stream! {
200         for i in 0..3 {
201             yield async move { i };
202         }
203     }
204     .buffered(1)
205     .collect()
206     .await;
207 
208     assert_eq!(s, vec![0, 1, 2]);
209 }
210 
/// Builds a `stream!` whose `select!` branch creates and polls a nested
/// `try_stream!`. The outer stream is discarded without being polled.
#[test]
fn inner_try_stream() {
    use tokio::select;
    use async_stream::try_stream;

    async fn do_stuff_async() {}

    // NOTE(review): presumably a compile-only check of macro expansion, since
    // nothing here is awaited. The exact `yield;` and trailing `yield` forms
    // are kept as written for that reason.
    let _ = stream! {
        select! {
            _ = do_stuff_async() => {
                let nested = try_stream! {
                    yield;
                };
                let _: Result<(), ()> = Box::pin(nested).next().await.unwrap();
            },
            else => {},
        }
        yield
    };
}
231 
232 #[rustversion::attr(not(stable), ignore)]
233 #[test]
test()234 fn test() {
235     let t = trybuild::TestCases::new();
236     t.compile_fail("tests/ui/*.rs");
237 }
238