1 use crate::Stream;
2 
3 use core::future::Future;
4 use core::marker::PhantomPinned;
5 use core::pin::Pin;
6 use core::task::{ready, Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`all`](super::StreamExt::all) method.
11     #[derive(Debug)]
12     #[must_use = "futures do nothing unless you `.await` or poll them"]
13     pub struct AllFuture<'a, St: ?Sized, F> {
14         stream: &'a mut St,
15         f: F,
16         // Make this future `!Unpin` for compatibility with async trait methods.
17         #[pin]
18         _pin: PhantomPinned,
19     }
20 }
21 
22 impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
new(stream: &'a mut St, f: F) -> Self23     pub(super) fn new(stream: &'a mut St, f: F) -> Self {
24         Self {
25             stream,
26             f,
27             _pin: PhantomPinned,
28         }
29     }
30 }
31 
32 impl<St, F> Future for AllFuture<'_, St, F>
33 where
34     St: ?Sized + Stream + Unpin,
35     F: FnMut(St::Item) -> bool,
36 {
37     type Output = bool;
38 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>39     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40         let me = self.project();
41         let mut stream = Pin::new(me.stream);
42 
43         // Take a maximum of 32 items from the stream before yielding.
44         for _ in 0..32 {
45             match ready!(stream.as_mut().poll_next(cx)) {
46                 Some(v) => {
47                     if !(me.f)(v) {
48                         return Poll::Ready(false);
49                     }
50                 }
51                 None => return Poll::Ready(true),
52             }
53         }
54 
55         cx.waker().wake_by_ref();
56         Poll::Pending
57     }
58 }
59