1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, FutureExt};
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use futures::{
8 join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
9 };
10 use std::mem;
11
// `pending!()` yields control once: the first `poll!` of the future observes
// `Pending`, and the second poll runs it to completion.
#[test]
fn poll_and_pending() {
    block_on(async {
        let yields_once = async { pending!() };
        pin_mut!(yields_once);

        let first_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Pending, first_poll);

        let second_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Ready(()), second_poll);
    });
}
21
// `join!` over two oneshot receivers stays pending until *both* senders have
// fired, then yields both results as a tuple.
#[test]
fn join() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    let joined = async {
        let outcome = join!(first_rx, second_rx);
        assert_eq!((Ok(1), Ok(2)), outcome);
    };

    block_on(async {
        pin_mut!(joined);

        // Nothing has been sent yet.
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Only one of the two inputs is ready.
        first_tx.send(1).unwrap();
        assert_eq!(Poll::Pending, poll!(&mut joined));

        // Both inputs are ready, so the join completes.
        second_tx.send(2).unwrap();
        assert_eq!(Poll::Ready(()), poll!(&mut joined));
    });
}
41
// `select!` must run the arm of the only receiver that already holds a value;
// the other receiver's sender is kept alive and never sends.
#[test]
fn select() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut ran = false;
    let race = async {
        select! {
            res = ready_rx.fuse() => {
                assert_eq!(Ok(1), res);
                ran = true;
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    };
    block_on(race);

    assert!(ran);
}
59
// Grammar regression test: two block-bodied arms written back to back with no
// separating comma must be parsed as two arms.
#[test]
fn select_grammar() {
    // Parsing after `=>` using Expr::parse would parse `{}() = future::ready(())`
    // as one expression.
    let two_arms = async {
        select! {
            () = future::pending::<()>() => {}
            () = future::ready(()) => {}
        }
    };
    block_on(two_arms);
}
71
// `select_biased!` must run the arm of the only receiver that already holds a
// value; the other receiver's sender is kept alive and never sends.
#[test]
fn select_biased() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut ran = false;
    let race = async {
        select_biased! {
            res = ready_rx.fuse() => {
                assert_eq!(Ok(1), res);
                ran = true;
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    };
    block_on(race);

    assert!(ran);
}
89
// Exercises `select!` over two fused mpsc receivers together with the
// `default` and `complete` arms.
//
// Phase 1: both channels are empty, so `default` must run; it sends one value
// on each channel and parks the senders.
// Phase 2: loop until `complete`, summing received values and dropping the
// parked senders from `default` once nothing more is immediately ready.
#[test]
fn select_streams() {
    let (mut tx_a, rx_a) = mpsc::channel::<i32>(1);
    let (mut tx_b, rx_b) = mpsc::channel::<i32>(1);
    let mut rx_a = rx_a.fuse();
    let mut rx_b = rx_b.fuse();
    let mut total = 0;
    let mut ran = false;

    block_on(async {
        // Assigned only in the `default` arm below; every other arm panics.
        let mut parked_tx_a;
        let mut parked_tx_b;
        select! {
            _ = rx_a.next() => panic!(),
            _ = rx_b.next() => panic!(),
            default => {
                tx_a.send(2).await.unwrap();
                tx_b.send(3).await.unwrap();
                parked_tx_a = Some(tx_a);
                parked_tx_b = Some(tx_b);
            }
            complete => panic!(),
        }

        loop {
            select! {
                // runs first and again after default
                item = rx_a.next() => if let Some(n) = item { total += n; },
                // runs second and again after default
                item = rx_b.next() => if let Some(n) = item { total += n; },
                // runs third
                default => {
                    assert_eq!(total, 5);
                    ran = true;
                    // Dropping the senders closes both channels.
                    drop(parked_tx_a.take().unwrap());
                    drop(parked_tx_b.take().unwrap());
                },
                // runs last
                complete => break,
            };
        }
    });

    assert!(ran);
}
132
// Inside the arm of the future that won, the *other* future must still be
// usable by value (here: awaited to completion).
#[test]
fn select_can_move_uncompleted_futures() {
    let (tx_one, rx_one) = oneshot::channel::<i32>();
    let (tx_two, rx_two) = oneshot::channel::<i32>();
    tx_one.send(1).unwrap();
    tx_two.send(2).unwrap();

    let mut rx_one = rx_one.fuse();
    let mut rx_two = rx_two.fuse();
    let mut ran = false;

    let race = async {
        select! {
            got = rx_one => {
                assert_eq!(Ok(1), got);
                // The losing receiver is moved into `.await` here.
                assert_eq!(Ok(2), rx_two.await);
                ran = true;
            },
            got = rx_two => {
                assert_eq!(Ok(2), got);
                // The losing receiver is moved into `.await` here.
                assert_eq!(Ok(1), rx_one.await);
                ran = true;
            },
        }
    };
    block_on(race);

    assert!(ran);
}
158
// A `select!` may appear inside the arm of another `select!`, and the inner
// arm can use the value bound by the outer one.
#[test]
fn select_nested() {
    let mut outer = future::ready(1);
    let mut inner = future::ready(2);

    let sum = block_on(async {
        select! {
            lhs = outer => {
                select! {
                    rhs = inner => lhs + rhs,
                }
            }
        }
    });

    assert_eq!(sum, 3);
}
174
// Pins the byte size of async blocks that `select!` over one and two
// `future::ready` values. The expected sizes are only checked on 64-bit
// targets; elsewhere the test is ignored.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
    let one_arm = async {
        let mut immediate = future::ready(0i32);
        select! {
            _ = immediate => {},
        }
    };
    assert_eq!(mem::size_of_val(&one_arm), 24);

    let two_arms = async {
        let mut immediate_a = future::ready(0i32);
        let mut immediate_b = future::ready(0i32);
        select! {
            _ = immediate_a => {},
            _ = immediate_b => {},
        }
    };
    assert_eq!(mem::size_of_val(&two_arms), 40);
}
196
// `select!` must accept arm expressions that produce `!Unpin` futures
// (fused async blocks created inline).
#[test]
fn select_on_non_unpin_expressions() {
    // The returned Future is !Unpin
    let five_later = || async { 5 };

    let res = block_on(async {
        let picked;
        select! {
            first = five_later().fuse() => picked = first,
            second = five_later().fuse() => picked = second,
        };
        picked
    });

    assert_eq!(res, 5);
}
212
// Same as the plain `!Unpin` expression case but with a `default` arm present;
// the assertion expects one of the future arms (value 5), not `default` (7).
#[test]
fn select_on_non_unpin_expressions_with_default() {
    // The returned Future is !Unpin
    let five_later = || async { 5 };

    let res = block_on(async {
        let picked;
        select! {
            first = five_later().fuse() => picked = first,
            second = five_later().fuse() => picked = second,
            default => picked = 7,
        };
        picked
    });

    assert_eq!(res, 5);
}
229
// Pins the byte size of an async block that `select!`s over two inline
// `!Unpin` futures. Only checked on 64-bit targets; ignored elsewhere.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
    // The returned Future is !Unpin
    let five_later = || async { 5 };

    let measured = async {
        let picked;
        select! {
            first = five_later().fuse() => picked = first,
            second = five_later().fuse() => picked = second,
        };
        picked
    };

    assert_eq!(32, mem::size_of_val(&measured));
}
247
// `select!` evaluates to the value of whichever arm ran. Both futures are
// ready, so either arm's result (7, or 3 + 1) is acceptable.
#[test]
fn select_can_be_used_as_expression() {
    let race = async {
        let res = select! {
            x = future::ready(7) => x,
            y = future::ready(3) => y + 1,
        };
        assert!(matches!(res, 7 | 4));
    };
    block_on(race);
}
258
// With every future arm pending, `select!` used as an expression must take
// the value of the `default` arm.
#[test]
fn select_with_default_can_be_used_as_expression() {
    // Poll function that never resolves.
    fn never_ready<T>(_cx: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }

    let race = async {
        let res = select! {
            x = poll_fn(never_ready::<i32>).fuse() => x,
            y = poll_fn(never_ready::<i32>).fuse() => y + 1,
            default => 99,
        };
        assert_eq!(res, 99);
    };
    block_on(race);
}
274
// With two `future::pending` arms plus both `default` and `complete`, the
// assertion expects the `complete` arm's value (237), not `default`'s (99).
#[test]
fn select_with_complete_can_be_used_as_expression() {
    let race = async {
        let res = select! {
            x = future::pending::<i32>() => x,
            y = future::pending::<i32>() => y + 1,
            default => 99,
            complete => 237,
        };
        assert_eq!(res, 237);
    };
    block_on(race);
}
287
// Compile-level check: one arm's future mutably borrows `counter`, while the
// body of a different arm mutates the same variable.
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    let race = async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => { },
            _ = does_nothing().fuse() => {
                counter += 5;
            },
        }
    };
    block_on(race);
}
304
// Compile-level check: one arm's future mutably borrows `counter`, while the
// body of another arm and the `default` arm both mutate the same variable.
#[test]
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
    async fn borrows_mutably(_: &mut i32) {}
    async fn does_nothing() {}

    let race = async {
        let mut counter = 234;
        select! {
            _ = borrows_mutably(&mut counter).fuse() => { },
            _ = does_nothing().fuse() => {
                counter += 5;
            },
            default => {
                counter += 27;
            },
        }
    };
    block_on(race);
}
324
// Exercises the `stream_select!` macro in three scenarios:
//   1. an endless stream merged with a never-ready stream keeps yielding,
//   2. two finite streams are both drained before the merged stream ends,
//   3. three endless streams are sampled with a roughly even distribution.
#[test]
#[allow(unused_assignments)]
fn stream_select() {
    // stream_select! macro
    block_on(async {
        // Endless stream that yields `i` forever.
        let endless_ints = |i| stream::iter(std::iter::repeat(i));

        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
        assert_eq!(endless_ones.next().await, Some(1));
        assert_eq!(endless_ones.next().await, Some(1));

        let mut finite_list =
            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, None);

        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
        // Take 1000, and assert a somewhat even distribution of values.
        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
        let results = endless_mixed.take(1000).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 1000);

        // Number of samples equal to `wanted`.
        let occurrences = |wanted: i32| results.iter().filter(|&&v| v == wanted).count();
        assert!(occurrences(1) >= 299);
        assert!(occurrences(2) >= 299);
        assert!(occurrences(3) >= 299);
    });
}
360
// Pins the byte size of async blocks that `join!` one and two
// `future::ready` values. Only checked on 64-bit targets; ignored elsewhere.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
    let single = async {
        let immediate = future::ready(0i32);
        join!(immediate)
    };
    assert_eq!(mem::size_of_val(&single), 24);

    let pair = async {
        let immediate_a = future::ready(0i32);
        let immediate_b = future::ready(0i32);
        join!(immediate_a, immediate_b)
    };
    assert_eq!(mem::size_of_val(&pair), 40);
}
377
// Pins the byte size of async blocks that `try_join!` one and two ready
// `Result` futures. Only checked on 64-bit targets; ignored elsewhere.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
    let single = async {
        let immediate = future::ready(Ok::<i32, i32>(0));
        try_join!(immediate)
    };
    assert_eq!(mem::size_of_val(&single), 24);

    let pair = async {
        let immediate_a = future::ready(Ok::<i32, i32>(0));
        let immediate_b = future::ready(Ok::<i32, i32>(0));
        try_join!(immediate_a, immediate_b)
    };
    assert_eq!(mem::size_of_val(&pair), 48);
}
394
// Compile-only check: `join!` accepts bare async blocks as arguments. The
// outer future is built and discarded without ever being polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
    let _ = async {
        join!(async {}, async {})
    };
}
400
// Compile-only check: `try_join!` accepts bare async blocks as arguments
// (note the trailing comma). The outer future is built and discarded without
// ever being polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
    let _ = async {
        try_join!(
            async { Ok::<(), ()>(()) },
            async { Ok::<(), ()>(()) },
        )
    };
}
406