1 use crate::Stream; 2 3 use core::future::Future; 4 use core::marker::PhantomPinned; 5 use core::pin::Pin; 6 use core::task::{ready, Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`all`](super::StreamExt::all) method. 11 #[derive(Debug)] 12 #[must_use = "futures do nothing unless you `.await` or poll them"] 13 pub struct AllFuture<'a, St: ?Sized, F> { 14 stream: &'a mut St, 15 f: F, 16 // Make this future `!Unpin` for compatibility with async trait methods. 17 #[pin] 18 _pin: PhantomPinned, 19 } 20 } 21 22 impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { new(stream: &'a mut St, f: F) -> Self23 pub(super) fn new(stream: &'a mut St, f: F) -> Self { 24 Self { 25 stream, 26 f, 27 _pin: PhantomPinned, 28 } 29 } 30 } 31 32 impl<St, F> Future for AllFuture<'_, St, F> 33 where 34 St: ?Sized + Stream + Unpin, 35 F: FnMut(St::Item) -> bool, 36 { 37 type Output = bool; 38 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>39 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 40 let me = self.project(); 41 let mut stream = Pin::new(me.stream); 42 43 // Take a maximum of 32 items from the stream before yielding. 44 for _ in 0..32 { 45 match ready!(stream.as_mut().poll_next(cx)) { 46 Some(v) => { 47 if !(me.f)(v) { 48 return Poll::Ready(false); 49 } 50 } 51 None => return Poll::Ready(true), 52 } 53 } 54 55 cx.waker().wake_by_ref(); 56 Poll::Pending 57 } 58 } 59