1 use std::cell::Cell;
2 use std::iter;
3 use std::pin::Pin;
4 use std::rc::Rc;
5 use std::sync::Arc;
6 use std::task::Context;
7 
8 use futures::channel::mpsc;
9 use futures::executor::block_on;
10 use futures::future::{self, Future};
11 use futures::lock::Mutex;
12 use futures::sink::SinkExt;
13 use futures::stream::{self, StreamExt};
14 use futures::task::Poll;
15 use futures::{ready, FutureExt};
16 use futures_core::Stream;
17 use futures_executor::ThreadPool;
18 use futures_test::task::noop_context;
19 
20 #[test]
select()21 fn select() {
22     fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
23         let a = stream::iter(a);
24         let b = stream::iter(b);
25         let vec = block_on(stream::select(a, b).collect::<Vec<_>>());
26         assert_eq!(vec, expected);
27     }
28 
29     select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]);
30     select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]);
31     select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
32 }
33 
34 #[test]
flat_map()35 fn flat_map() {
36     block_on(async {
37         let st =
38             stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
39 
40         let values: Vec<_> =
41             st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await;
42 
43         assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
44     });
45 }
46 
47 #[test]
scan()48 fn scan() {
49     block_on(async {
50         let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
51             .scan(1, |state, e| {
52                 *state += 1;
53                 futures::future::ready(if e < *state { Some(e) } else { None })
54             })
55             .collect::<Vec<_>>()
56             .await;
57 
58         assert_eq!(values, vec![1u8, 2, 3, 4]);
59     });
60 }
61 
62 #[test]
flatten_unordered()63 fn flatten_unordered() {
64     use futures::executor::block_on;
65     use futures::stream::*;
66     use futures::task::*;
67     use std::convert::identity;
68     use std::pin::Pin;
69     use std::sync::atomic::{AtomicBool, Ordering};
70     use std::thread;
71     use std::time::Duration;
72 
73     struct DataStream {
74         data: Vec<u8>,
75         polled: bool,
76         wake_immediately: bool,
77     }
78 
79     impl Stream for DataStream {
80         type Item = u8;
81 
82         fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83             if !self.polled {
84                 if !self.wake_immediately {
85                     let waker = ctx.waker().clone();
86                     let sleep_time =
87                         Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
88                     thread::spawn(move || {
89                         thread::sleep(sleep_time);
90                         waker.wake_by_ref();
91                     });
92                 } else {
93                     ctx.waker().wake_by_ref();
94                 }
95                 self.polled = true;
96                 Poll::Pending
97             } else {
98                 self.polled = false;
99                 Poll::Ready(self.data.pop())
100             }
101         }
102     }
103 
104     struct Interchanger {
105         polled: bool,
106         base: u8,
107         wake_immediately: bool,
108     }
109 
110     impl Stream for Interchanger {
111         type Item = DataStream;
112 
113         fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114             if !self.polled {
115                 self.polled = true;
116                 if !self.wake_immediately {
117                     let waker = ctx.waker().clone();
118                     let sleep_time = Duration::from_millis(self.base as u64);
119                     thread::spawn(move || {
120                         thread::sleep(sleep_time);
121                         waker.wake_by_ref();
122                     });
123                 } else {
124                     ctx.waker().wake_by_ref();
125                 }
126                 Poll::Pending
127             } else {
128                 let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
129                 self.base += 1;
130                 self.polled = false;
131                 Poll::Ready(Some(DataStream {
132                     polled: false,
133                     data,
134                     wake_immediately: self.wake_immediately && self.base % 2 == 0,
135                 }))
136             }
137         }
138     }
139 
140     // basic behaviour
141     {
142         block_on(async {
143             let st = stream::iter(vec![
144                 stream::iter(0..=4u8),
145                 stream::iter(6..=10),
146                 stream::iter(10..=12),
147             ]);
148 
149             let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
150 
151             assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
152         });
153 
154         block_on(async {
155             let st = stream::iter(vec![
156                 stream::iter(0..=4u8),
157                 stream::iter(6..=10),
158                 stream::iter(0..=2),
159             ]);
160 
161             let mut fm_unordered = st
162                 .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
163                 .collect::<Vec<_>>()
164                 .await;
165 
166             fm_unordered.sort_unstable();
167 
168             assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
169         });
170     }
171 
172     // wake up immediately
173     {
174         block_on(async {
175             let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
176                 .take(10)
177                 .map(|s| s.map(identity))
178                 .flatten_unordered(10)
179                 .collect::<Vec<_>>()
180                 .await;
181 
182             fl_unordered.sort_unstable();
183 
184             assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
185         });
186 
187         block_on(async {
188             let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
189                 .take(10)
190                 .flat_map_unordered(10, |s| s.map(identity))
191                 .collect::<Vec<_>>()
192                 .await;
193 
194             fm_unordered.sort_unstable();
195 
196             assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
197         });
198     }
199 
200     // wake up after delay
201     {
202         block_on(async {
203             let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
204                 .take(10)
205                 .map(|s| s.map(identity))
206                 .flatten_unordered(10)
207                 .collect::<Vec<_>>()
208                 .await;
209 
210             fl_unordered.sort_unstable();
211 
212             assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
213         });
214 
215         block_on(async {
216             let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
217                 .take(10)
218                 .flat_map_unordered(10, |s| s.map(identity))
219                 .collect::<Vec<_>>()
220                 .await;
221 
222             fm_unordered.sort_unstable();
223 
224             assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
225         });
226 
227         block_on(async {
228             let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
229                 Interchanger { polled: false, base: 0, wake_immediately: false }
230                     .take(10)
231                     .flat_map_unordered(10, |s| s.map(identity))
232                     .collect::<Vec<_>>(),
233                 Interchanger { polled: false, base: 0, wake_immediately: false }
234                     .take(10)
235                     .map(|s| s.map(identity))
236                     .flatten_unordered(10)
237                     .collect::<Vec<_>>()
238             );
239 
240             fm_unordered.sort_unstable();
241             fl_unordered.sort_unstable();
242 
243             assert_eq!(fm_unordered, fl_unordered);
244             assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
245         });
246     }
247 
248     // waker panics
249     {
250         let stream = Arc::new(Mutex::new(
251             Interchanger { polled: false, base: 0, wake_immediately: true }
252                 .take(10)
253                 .flat_map_unordered(10, |s| s.map(identity)),
254         ));
255 
256         struct PanicWaker;
257 
258         impl ArcWake for PanicWaker {
259             fn wake_by_ref(_arc_self: &Arc<Self>) {
260                 panic!("WAKE UP");
261             }
262         }
263 
264         std::thread::spawn({
265             let stream = stream.clone();
266             move || {
267                 let mut st = poll_fn(|cx| {
268                     let mut lock = ready!(stream.lock().poll_unpin(cx));
269 
270                     let panic_waker = waker(Arc::new(PanicWaker));
271                     let mut panic_cx = Context::from_waker(&panic_waker);
272                     let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
273 
274                     Poll::Ready(Some(()))
275                 });
276 
277                 block_on(st.next())
278             }
279         })
280         .join()
281         .unwrap_err();
282 
283         block_on(async move {
284             let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
285             values.sort_unstable();
286 
287             assert_eq!(values, (0..60).collect::<Vec<u8>>());
288         });
289     }
290 
291     // stream panics
292     {
293         let st = stream::iter(iter::once(
294             once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
295         ))
296         .chain(
297             Interchanger { polled: false, base: 0, wake_immediately: true }
298                 .map(|stream| stream.right_stream())
299                 .take(10),
300         );
301 
302         let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
303 
304         std::thread::spawn({
305             let stream = stream.clone();
306             move || {
307                 let mut st = poll_fn(|cx| {
308                     let mut lock = ready!(stream.lock().poll_unpin(cx));
309                     let data = ready!(lock.poll_next_unpin(cx));
310 
311                     Poll::Ready(data)
312                 });
313 
314                 block_on(st.next())
315             }
316         })
317         .join()
318         .unwrap_err();
319 
320         block_on(async move {
321             let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
322             values.sort_unstable();
323 
324             assert_eq!(values, (0..60).collect::<Vec<u8>>());
325         });
326     }
327 
328     fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
329         let ready = Arc::new(AtomicBool::new(false));
330         let mut spawned = false;
331 
332         future::poll_fn(move |cx| {
333             if !spawned {
334                 let waker = cx.waker().clone();
335                 let ready = ready.clone();
336 
337                 std::thread::spawn(move || {
338                     std::thread::sleep(time);
339                     ready.store(true, Ordering::Release);
340 
341                     waker.wake_by_ref()
342                 });
343                 spawned = true;
344             }
345 
346             if ready.load(Ordering::Acquire) {
347                 Poll::Ready(value.clone())
348             } else {
349                 Poll::Pending
350             }
351         })
352     }
353 
354     fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
355     where
356         S::Item: Clone,
357     {
358         let inner = st
359             .then(|item| timeout(Duration::from_millis(50), item))
360             .enumerate()
361             .map(|(idx, value)| {
362                 stream::once(if idx % 2 == 0 {
363                     future::ready(value).left_future()
364                 } else {
365                     timeout(Duration::from_millis(100), value).right_future()
366                 })
367             })
368             .flatten_unordered(None);
369 
370         stream::once(future::ready(inner)).flatten_unordered(None)
371     }
372 
373     // nested `flatten_unordered`
374     let te = ThreadPool::new().unwrap();
375     let base_handle = te
376         .spawn_with_handle(async move {
377             let fu = build_nested_fu(stream::iter(1..=10));
378 
379             assert_eq!(fu.count().await, 10);
380         })
381         .unwrap();
382 
383     block_on(base_handle);
384 
385     let empty_state_move_handle = te
386         .spawn_with_handle(async move {
387             let mut fu = build_nested_fu(stream::iter(1..10));
388             {
389                 let mut cx = noop_context();
390                 let _ = fu.poll_next_unpin(&mut cx);
391                 let _ = fu.poll_next_unpin(&mut cx);
392             }
393 
394             assert_eq!(fu.count().await, 9);
395         })
396         .unwrap();
397 
398     block_on(empty_state_move_handle);
399 }
400 
401 #[test]
take_until()402 fn take_until() {
403     fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
404         let mut i = 0;
405         future::poll_fn(move |_cx| {
406             i += 1;
407             if i <= stop_on {
408                 Poll::Pending
409             } else {
410                 Poll::Ready(())
411             }
412         })
413     }
414 
415     block_on(async {
416         // Verify stopping works:
417         let stream = stream::iter(1u32..=10);
418         let stop_fut = make_stop_fut(5);
419 
420         let stream = stream.take_until(stop_fut);
421         let last = stream.fold(0, |_, i| async move { i }).await;
422         assert_eq!(last, 5);
423 
424         // Verify take_future() works:
425         let stream = stream::iter(1..=10);
426         let stop_fut = make_stop_fut(5);
427 
428         let mut stream = stream.take_until(stop_fut);
429 
430         assert_eq!(stream.next().await, Some(1));
431         assert_eq!(stream.next().await, Some(2));
432 
433         stream.take_future();
434 
435         let last = stream.fold(0, |_, i| async move { i }).await;
436         assert_eq!(last, 10);
437 
438         // Verify take_future() returns None if stream is stopped:
439         let stream = stream::iter(1u32..=10);
440         let stop_fut = make_stop_fut(1);
441         let mut stream = stream.take_until(stop_fut);
442         assert_eq!(stream.next().await, Some(1));
443         assert_eq!(stream.next().await, None);
444         assert!(stream.take_future().is_none());
445 
446         // Verify TakeUntil is fused:
447         let mut i = 0;
448         let stream = stream::poll_fn(move |_cx| {
449             i += 1;
450             match i {
451                 1 => Poll::Ready(Some(1)),
452                 2 => Poll::Ready(None),
453                 _ => panic!("TakeUntil not fused"),
454             }
455         });
456 
457         let stop_fut = make_stop_fut(1);
458         let mut stream = stream.take_until(stop_fut);
459         assert_eq!(stream.next().await, Some(1));
460         assert_eq!(stream.next().await, None);
461         assert_eq!(stream.next().await, None);
462     });
463 }
464 
465 #[test]
466 #[should_panic]
chunks_panic_on_cap_zero()467 fn chunks_panic_on_cap_zero() {
468     let (_, rx1) = mpsc::channel::<()>(1);
469 
470     let _ = rx1.chunks(0);
471 }
472 
473 #[test]
474 #[should_panic]
ready_chunks_panic_on_cap_zero()475 fn ready_chunks_panic_on_cap_zero() {
476     let (_, rx1) = mpsc::channel::<()>(1);
477 
478     let _ = rx1.ready_chunks(0);
479 }
480 
481 #[test]
ready_chunks()482 fn ready_chunks() {
483     let (mut tx, rx1) = mpsc::channel::<i32>(16);
484 
485     let mut s = rx1.ready_chunks(2);
486 
487     let mut cx = noop_context();
488     assert!(s.next().poll_unpin(&mut cx).is_pending());
489 
490     block_on(async {
491         tx.send(1).await.unwrap();
492 
493         assert_eq!(s.next().await.unwrap(), vec![1]);
494         tx.send(2).await.unwrap();
495         tx.send(3).await.unwrap();
496         tx.send(4).await.unwrap();
497         assert_eq!(s.next().await.unwrap(), vec![2, 3]);
498         assert_eq!(s.next().await.unwrap(), vec![4]);
499     });
500 }
501 
502 struct SlowStream {
503     times_should_poll: usize,
504     times_polled: Rc<Cell<usize>>,
505 }
506 impl Stream for SlowStream {
507     type Item = usize;
508 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>509     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
510         self.times_polled.set(self.times_polled.get() + 1);
511         if self.times_polled.get() % 2 == 0 {
512             cx.waker().wake_by_ref();
513             return Poll::Pending;
514         }
515         if self.times_polled.get() >= self.times_should_poll {
516             return Poll::Ready(None);
517         }
518         Poll::Ready(Some(self.times_polled.get()))
519     }
520 }
521 
522 #[test]
select_with_strategy_doesnt_terminate_early()523 fn select_with_strategy_doesnt_terminate_early() {
524     for side in [stream::PollNext::Left, stream::PollNext::Right] {
525         let times_should_poll = 10;
526         let count = Rc::new(Cell::new(0));
527         let b = stream::iter([10, 20]);
528 
529         let mut selected = stream::select_with_strategy(
530             SlowStream { times_should_poll, times_polled: count.clone() },
531             b,
532             |_: &mut ()| side,
533         );
534         block_on(async move { while selected.next().await.is_some() {} });
535         assert_eq!(count.get(), times_should_poll + 1);
536     }
537 }
538 
is_even(number: u8) -> bool539 async fn is_even(number: u8) -> bool {
540     number % 2 == 0
541 }
542 
543 #[test]
all()544 fn all() {
545     block_on(async {
546         let empty: [u8; 0] = [];
547         let st = stream::iter(empty);
548         let all = st.all(is_even).await;
549         assert!(all);
550 
551         let st = stream::iter([2, 4, 6, 8]);
552         let all = st.all(is_even).await;
553         assert!(all);
554 
555         let st = stream::iter([2, 3, 4]);
556         let all = st.all(is_even).await;
557         assert!(!all);
558     });
559 }
560 
561 #[test]
any()562 fn any() {
563     block_on(async {
564         let empty: [u8; 0] = [];
565         let st = stream::iter(empty);
566         let any = st.any(is_even).await;
567         assert!(!any);
568 
569         let st = stream::iter([1, 2, 3]);
570         let any = st.any(is_even).await;
571         assert!(any);
572 
573         let st = stream::iter([1, 3, 5]);
574         let any = st.any(is_even).await;
575         assert!(!any);
576     });
577 }
578