1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, FutureExt};
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use futures::{
8     join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
9 };
10 use std::mem;
11 
/// A future built from `pending!` reports `Pending` on its first poll and
/// `Ready` on the second, as observed through `poll!`.
#[test]
fn poll_and_pending() {
    block_on(async {
        let yields_once = async { pending!() };
        pin_mut!(yields_once);

        let first_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Pending, first_poll);

        let second_poll = poll!(&mut yields_once);
        assert_eq!(Poll::Ready(()), second_poll);
    });
}
21 
/// `join!` over two oneshot receivers stays pending until both senders have
/// fired, then yields both results.
#[test]
fn join() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    let joined = async {
        let (first, second) = join!(first_rx, second_rx);
        assert_eq!(Ok(1), first);
        assert_eq!(Ok(2), second);
    };

    block_on(async {
        pin_mut!(joined);

        // Nothing has been sent yet.
        assert!(poll!(&mut joined).is_pending());

        // Only one of the two values is available.
        first_tx.send(1).unwrap();
        assert!(poll!(&mut joined).is_pending());

        // Both values are available, so the join completes.
        second_tx.send(2).unwrap();
        assert_eq!(Poll::Ready(()), poll!(&mut joined));
    });
}
41 
/// `select!` runs the arm of the receiver whose value was already sent; the
/// other receiver's sender is kept alive and never used.
#[test]
fn select() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    // The value of the chosen arm doubles as the "arm body ran" flag.
    let ran = block_on(async {
        select! {
            outcome = ready_rx.fuse() => {
                assert_eq!(Ok(1), outcome);
                true
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(ran);
}
59 
/// Checks that `select!` accepts an arm whose body is a `{}` block followed,
/// without a separating comma, by another arm starting with a `()` pattern.
#[test]
fn select_grammar() {
    // Parsing after `=>` using Expr::parse would parse `{}() = future::ready(())`
    // as one expression.
    block_on(async {
        select! {
            // Deliberately no comma after `{}`: the next arm follows the block directly.
            () = future::pending::<()>() => {}
            () = future::ready(()) => {}
        }
    });
}
71 
/// `select_biased!` runs the arm of the receiver whose value was already
/// sent; the second receiver's sender is kept alive and never used.
#[test]
fn select_biased() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    // The value of the chosen arm doubles as the "arm body ran" flag.
    let ran = block_on(async {
        select_biased! {
            outcome = ready_rx.fuse() => {
                assert_eq!(Ok(1), outcome);
                true
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(ran);
}
89 
/// Drives `select!` over two fused mpsc receivers through its `default` and
/// `complete` arms.
///
/// The first `select!` must take `default` (both stream arms panic), which sends
/// `2` and `3` and parks the senders in `Option`s. The loop then sums the received
/// items, checks the total of `5` in `default`, drops both senders there, and
/// leaves through `complete`.
#[test]
fn select_streams() {
    let (mut tx1, rx1) = mpsc::channel::<i32>(1);
    let (mut tx2, rx2) = mpsc::channel::<i32>(1);
    let mut rx1 = rx1.fuse();
    let mut rx2 = rx2.fuse();
    let mut ran = false;
    let mut total = 0;
    block_on(async {
        // Assigned in the `default` arm below; kept as `Option`s so the loop can
        // `take()` and drop the senders later.
        let mut tx1_opt;
        let mut tx2_opt;
        select! {
            // Nothing has been sent yet, so neither stream arm may fire.
            _ = rx1.next() => panic!(),
            _ = rx2.next() => panic!(),
            default => {
                tx1.send(2).await.unwrap();
                tx2.send(3).await.unwrap();
                tx1_opt = Some(tx1);
                tx2_opt = Some(tx2);
            }
            complete => panic!(),
        }
        loop {
            select! {
                // runs first and again after default
                x = rx1.next() => if let Some(x) = x { total += x; },
                // runs second and again after default
                x = rx2.next()  => if let Some(x) = x { total += x; },
                // runs third
                default => {
                    assert_eq!(total, 5);
                    ran = true;
                    // Dropping the senders here is what lets both receivers finish.
                    drop(tx1_opt.take().unwrap());
                    drop(tx2_opt.take().unwrap());
                },
                // runs last
                complete => break,
            };
        }
    });
    // Confirms the loop's `default` arm actually executed.
    assert!(ran);
}
132 
/// Inside a `select!` arm, the future that was not selected can still be
/// moved and awaited to completion.
#[test]
fn select_can_move_uncompleted_futures() {
    let (sender_a, receiver_a) = oneshot::channel::<i32>();
    let (sender_b, receiver_b) = oneshot::channel::<i32>();
    let mut fused_a = receiver_a.fuse();
    let mut fused_b = receiver_b.fuse();

    // Both values are available before selecting, so either arm may win.
    sender_a.send(1).unwrap();
    sender_b.send(2).unwrap();

    let mut ran = false;
    block_on(async {
        select! {
            first = fused_a => {
                assert_eq!(Ok(1), first);
                // The other future is consumed by value here.
                assert_eq!(Ok(2), fused_b.await);
                ran = true;
            },
            first = fused_b => {
                assert_eq!(Ok(2), first);
                // The other future is consumed by value here.
                assert_eq!(Ok(1), fused_a.await);
                ran = true;
            },
        }
    });
    assert!(ran);
}
158 
/// A `select!` may appear inside the arm body of another `select!`, and the
/// inner arm can use the value bound by the outer one.
#[test]
fn select_nested() {
    let mut outer = future::ready(1);
    let mut inner = future::ready(2);

    let nested = async {
        select! {
            outer_value = outer => {
                select! {
                    inner_value = inner => outer_value + inner_value,
                }
            }
        }
    };

    assert_eq!(block_on(nested), 3);
}
174 
/// Pins the in-memory size of async blocks that `select!` over one and two
/// `future::ready` values bound to local variables.
///
/// The expected byte counts are only asserted on targets with 64-bit pointers;
/// elsewhere the test is ignored.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
    // One selected future.
    let fut = async {
        let mut ready = future::ready(0i32);
        select! {
            _ = ready => {},
        }
    };
    assert_eq!(mem::size_of_val(&fut), 24);

    // Two selected futures.
    let fut = async {
        let mut ready1 = future::ready(0i32);
        let mut ready2 = future::ready(0i32);
        select! {
            _ = ready1 => {},
            _ = ready2 => {},
        }
    };
    assert_eq!(mem::size_of_val(&fut), 40);
}
196 
/// `select!` accepts call expressions that produce fused async-block futures
/// directly in its arms.
#[test]
fn select_on_non_unpin_expressions() {
    // The returned Future is !Unpin
    let non_unpin = || async { 5 };

    let picked = block_on(async {
        select! {
            first = non_unpin().fuse() => first,
            second = non_unpin().fuse() => second,
        }
    });
    assert_eq!(picked, 5);
}
212 
/// Same as the plain non-`Unpin` expression test, but with a `default` arm
/// present; the expected result is still the value from a future arm.
#[test]
fn select_on_non_unpin_expressions_with_default() {
    // The returned Future is !Unpin
    let non_unpin = || async { 5 };

    let picked = block_on(async {
        select! {
            first = non_unpin().fuse() => first,
            second = non_unpin().fuse() => second,
            default => 7,
        }
    });
    assert_eq!(picked, 5);
}
229 
/// Pins the in-memory size of an async block that `select!`s over two fused
/// async-block futures created inline in the arms.
///
/// The expected byte count is only asserted on targets with 64-bit pointers;
/// elsewhere the test is ignored.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
    // The returned Future is !Unpin
    let make_non_unpin_fut = || async { 5 };

    // Never polled: only the size of the resulting future is inspected.
    let fut = async {
        let select_res;
        select! {
            value_1 = make_non_unpin_fut().fuse() => select_res = value_1,
            value_2 = make_non_unpin_fut().fuse() => select_res = value_2,
        };
        select_res
    };

    assert_eq!(32, mem::size_of_val(&fut));
}
247 
/// `select!` evaluates to the value of whichever arm ran; either arm is an
/// acceptable winner here.
#[test]
fn select_can_be_used_as_expression() {
    let picked = block_on(async {
        select! {
            a = future::ready(7) => a,
            b = future::ready(3) => b + 1,
        }
    });

    // 7 comes from the first arm, 4 (= 3 + 1) from the second.
    assert!(matches!(picked, 7 | 4));
}
258 
/// When every future arm is pending, `select!` evaluates to the value of its
/// `default` arm.
#[test]
fn select_with_default_can_be_used_as_expression() {
    /// Poll function that never produces a value.
    fn never_ready(_cx: &mut Context<'_>) -> Poll<i32> {
        Poll::Pending
    }

    let chosen = block_on(async {
        select! {
            a = poll_fn(never_ready).fuse() => a,
            b = poll_fn(never_ready).fuse() => b + 1,
            default => 99,
        }
    });
    assert_eq!(chosen, 99);
}
274 
/// With both `default` and `complete` arms present over two
/// `future::pending` futures, the expression is expected to take the value of
/// the `complete` arm.
#[test]
fn select_with_complete_can_be_used_as_expression() {
    let outcome = block_on(async {
        select! {
            a = future::pending::<i32>() => a,
            b = future::pending::<i32>() => b + 1,
            default => 99,
            complete => 237,
        }
    });
    assert_eq!(outcome, 237);
}
287 
/// Checks that a `select!` arm body may mutate `value` even though another
/// arm's future was created from `&mut value`.
///
/// The test has no runtime assertions; it passes if this compiles and runs
/// without panicking.
#[test]
// `value` is written in an arm body but never read afterwards.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
    async fn require_mutable(_: &mut i32) {}
    async fn async_noop() {}

    block_on(async {
        let mut value = 234;
        select! {
            // This future holds the mutable borrow of `value`.
            _ = require_mutable(&mut value).fuse() => { },
            _ = async_noop().fuse() => {
                // Uses `value` again inside an arm body.
                value += 5;
            },
        }
    });
}
304 
/// Checks that both a future arm body and the `default` arm body of a
/// `select!` may mutate `value` even though another arm's future was created
/// from `&mut value`.
///
/// The test has no runtime assertions; it passes if this compiles and runs
/// without panicking.
#[test]
// `value` is written in the arm bodies but never read afterwards.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
    async fn require_mutable(_: &mut i32) {}
    async fn async_noop() {}

    block_on(async {
        let mut value = 234;
        select! {
            // This future holds the mutable borrow of `value`.
            _ = require_mutable(&mut value).fuse() => { },
            _ = async_noop().fuse() => {
                // Uses `value` again inside an arm body.
                value += 5;
            },
            default => {
                // The `default` arm body uses `value` as well.
                value += 27;
            },
        }
    });
}
324 
/// Exercises the `stream_select!` macro in three scenarios:
///
/// 1. an endless stream merged with a pending one keeps yielding items,
/// 2. two one-element streams yield two items and then `None`,
/// 3. three endless streams are sampled and each value must appear often enough.
#[test]
// NOTE(review): attribute kept from the original test; confirm whether it is still needed.
#[allow(unused_assignments)]
fn stream_select() {
    /// Number of items sampled from the merged endless streams.
    const SAMPLES: usize = 1000;
    /// Minimum number of occurrences required for each of the three values.
    const MIN_PER_VALUE: usize = 299;

    block_on(async {
        let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());

        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
        assert_eq!(endless_ones.next().await, Some(1));
        assert_eq!(endless_ones.next().await, Some(1));

        let mut finite_list =
            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, None);

        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
        // Take exactly `SAMPLES` items and assert a somewhat even distribution of values.
        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
        let results = endless_mixed.take(SAMPLES).collect::<Vec<_>>().await;
        assert_eq!(results.len(), SAMPLES);

        for expected in 1..=3 {
            let hits = results.iter().filter(|&&x| x == expected).count();
            assert!(
                hits >= MIN_PER_VALUE,
                "value {} appeared only {} times in {} samples",
                expected,
                hits,
                results.len()
            );
        }
    });
}
360 
/// Pins the in-memory size of async blocks that `join!` one and two
/// `future::ready` values bound to local variables.
///
/// The expected byte counts are only asserted on targets with 64-bit pointers;
/// elsewhere the test is ignored.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
    // One joined future.
    let fut = async {
        let ready = future::ready(0i32);
        join!(ready)
    };
    assert_eq!(mem::size_of_val(&fut), 24);

    // Two joined futures.
    let fut = async {
        let ready1 = future::ready(0i32);
        let ready2 = future::ready(0i32);
        join!(ready1, ready2)
    };
    assert_eq!(mem::size_of_val(&fut), 40);
}
377 
/// Pins the in-memory size of async blocks that `try_join!` one and two
/// `future::ready(Ok(..))` values bound to local variables.
///
/// The expected byte counts are only asserted on targets with 64-bit pointers;
/// elsewhere the test is ignored.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
    // One joined future.
    let fut = async {
        let ready = future::ready(Ok::<i32, i32>(0));
        try_join!(ready)
    };
    assert_eq!(mem::size_of_val(&fut), 24);

    // Two joined futures.
    let fut = async {
        let ready1 = future::ready(Ok::<i32, i32>(0));
        let ready2 = future::ready(Ok::<i32, i32>(0));
        try_join!(ready1, ready2)
    };
    assert_eq!(mem::size_of_val(&fut), 48);
}
394 
/// Compile-only check: `join!` accepts async-block futures. The resulting
/// future is built and discarded without being polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
    let _ = async {
        let left = async {};
        let right = async {};
        join!(left, right)
    };
}
400 
/// Compile-only check: `try_join!` accepts async-block futures (and a
/// trailing comma). The resulting future is built and discarded without being
/// polled.
#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
    let _ = async {
        let left = async { Ok::<(), ()>(()) };
        let right = async { Ok::<(), ()>(()) };
        try_join!(left, right,)
    };
}
406