1 use async_stream::stream;
2
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_util::pin_mut;
5 use futures_util::stream::StreamExt;
6 use tokio::sync::mpsc;
7 use tokio_test::assert_ok;
8
9 #[tokio::test]
noop_stream()10 async fn noop_stream() {
11 let s = stream! {};
12 pin_mut!(s);
13
14 while s.next().await.is_some() {
15 unreachable!();
16 }
17 }
18
19 #[tokio::test]
empty_stream()20 async fn empty_stream() {
21 let mut ran = false;
22
23 {
24 let r = &mut ran;
25 let s = stream! {
26 *r = true;
27 println!("hello world!");
28 };
29 pin_mut!(s);
30
31 while s.next().await.is_some() {
32 unreachable!();
33 }
34 }
35
36 assert!(ran);
37 }
38
39 #[tokio::test]
yield_single_value()40 async fn yield_single_value() {
41 let s = stream! {
42 yield "hello";
43 };
44
45 let values: Vec<_> = s.collect().await;
46
47 assert_eq!(1, values.len());
48 assert_eq!("hello", values[0]);
49 }
50
51 #[tokio::test]
fused()52 async fn fused() {
53 let s = stream! {
54 yield "hello";
55 };
56 pin_mut!(s);
57
58 assert!(!s.is_terminated());
59 assert_eq!(s.next().await, Some("hello"));
60 assert_eq!(s.next().await, None);
61
62 assert!(s.is_terminated());
63 // This should return None from now on
64 assert_eq!(s.next().await, None);
65 }
66
67 #[tokio::test]
yield_multi_value()68 async fn yield_multi_value() {
69 let s = stream! {
70 yield "hello";
71 yield "world";
72 yield "dizzy";
73 };
74
75 let values: Vec<_> = s.collect().await;
76
77 assert_eq!(3, values.len());
78 assert_eq!("hello", values[0]);
79 assert_eq!("world", values[1]);
80 assert_eq!("dizzy", values[2]);
81 }
82
83 #[tokio::test]
unit_yield_in_select()84 async fn unit_yield_in_select() {
85 use tokio::select;
86
87 async fn do_stuff_async() {}
88
89 let s = stream! {
90 select! {
91 _ = do_stuff_async() => yield,
92 else => yield,
93 }
94 };
95
96 let values: Vec<_> = s.collect().await;
97 assert_eq!(values.len(), 1);
98 }
99
100 #[tokio::test]
yield_with_select()101 async fn yield_with_select() {
102 use tokio::select;
103
104 async fn do_stuff_async() {}
105 async fn more_async_work() {}
106
107 let s = stream! {
108 select! {
109 _ = do_stuff_async() => yield "hey",
110 _ = more_async_work() => yield "hey",
111 else => yield "hey",
112 }
113 };
114
115 let values: Vec<_> = s.collect().await;
116 assert_eq!(values, vec!["hey"]);
117 }
118
119 #[tokio::test]
return_stream()120 async fn return_stream() {
121 fn build_stream() -> impl Stream<Item = u32> {
122 stream! {
123 yield 1;
124 yield 2;
125 yield 3;
126 }
127 }
128
129 let s = build_stream();
130
131 let values: Vec<_> = s.collect().await;
132 assert_eq!(3, values.len());
133 assert_eq!(1, values[0]);
134 assert_eq!(2, values[1]);
135 assert_eq!(3, values[2]);
136 }
137
138 #[tokio::test]
consume_channel()139 async fn consume_channel() {
140 let (tx, mut rx) = mpsc::channel(10);
141
142 let s = stream! {
143 while let Some(v) = rx.recv().await {
144 yield v;
145 }
146 };
147
148 pin_mut!(s);
149
150 for i in 0..3 {
151 assert_ok!(tx.send(i).await);
152 assert_eq!(Some(i), s.next().await);
153 }
154
155 drop(tx);
156 assert_eq!(None, s.next().await);
157 }
158
159 #[tokio::test]
borrow_self()160 async fn borrow_self() {
161 struct Data(String);
162
163 impl Data {
164 fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
165 stream! {
166 yield &self.0[..];
167 }
168 }
169 }
170
171 let data = Data("hello".to_string());
172 let s = data.stream();
173 pin_mut!(s);
174
175 assert_eq!(Some("hello"), s.next().await);
176 }
177
178 #[tokio::test]
stream_in_stream()179 async fn stream_in_stream() {
180 let s = stream! {
181 let s = stream! {
182 for i in 0..3 {
183 yield i;
184 }
185 };
186
187 pin_mut!(s);
188 while let Some(v) = s.next().await {
189 yield v;
190 }
191 };
192
193 let values: Vec<_> = s.collect().await;
194 assert_eq!(3, values.len());
195 }
196
197 #[tokio::test]
yield_non_unpin_value()198 async fn yield_non_unpin_value() {
199 let s: Vec<_> = stream! {
200 for i in 0..3 {
201 yield async move { i };
202 }
203 }
204 .buffered(1)
205 .collect()
206 .await;
207
208 assert_eq!(s, vec![0, 1, 2]);
209 }
210
// `try_stream!` can be nested inside a `select!` arm within `stream!`.
// The outer stream is built and immediately discarded without being polled,
// so this test only checks that the nesting compiles.
#[test]
fn inner_try_stream() {
    use async_stream::try_stream;
    use tokio::select;

    async fn do_stuff_async() {}

    let _ = stream! {
        select! {
            _ = do_stuff_async() => {
                let nested = try_stream! {
                    yield;
                };
                // The annotation fixes the error type of the nested stream's items.
                let _: Result<(), ()> = Box::pin(nested).next().await.unwrap();
            },
            else => {},
        }
        yield
    };
}
231
232 #[rustversion::attr(not(stable), ignore)]
233 #[test]
test()234 fn test() {
235 let t = trybuild::TestCases::new();
236 t.compile_fail("tests/ui/*.rs");
237 }
238