1 use crate::Stream; 2 3 use core::fmt; 4 use core::future::Future; 5 use core::pin::Pin; 6 use core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream for the [`then`](super::StreamExt::then) method. 11 #[must_use = "streams do nothing unless polled"] 12 pub struct Then<St, Fut, F> { 13 #[pin] 14 stream: St, 15 #[pin] 16 future: Option<Fut>, 17 f: F, 18 } 19 } 20 21 impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> 22 where 23 St: fmt::Debug, 24 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 26 f.debug_struct("Then") 27 .field("stream", &self.stream) 28 .finish() 29 } 30 } 31 32 impl<St, Fut, F> Then<St, Fut, F> { new(stream: St, f: F) -> Self33 pub(super) fn new(stream: St, f: F) -> Self { 34 Then { 35 stream, 36 future: None, 37 f, 38 } 39 } 40 } 41 42 impl<St, F, Fut> Stream for Then<St, Fut, F> 43 where 44 St: Stream, 45 Fut: Future, 46 F: FnMut(St::Item) -> Fut, 47 { 48 type Item = Fut::Output; 49 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>>50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { 51 let mut me = self.project(); 52 53 loop { 54 if let Some(future) = me.future.as_mut().as_pin_mut() { 55 match future.poll(cx) { 56 Poll::Ready(item) => { 57 me.future.set(None); 58 return Poll::Ready(Some(item)); 59 } 60 Poll::Pending => return Poll::Pending, 61 } 62 } 63 64 match me.stream.as_mut().poll_next(cx) { 65 Poll::Ready(Some(item)) => { 66 me.future.set(Some((me.f)(item))); 67 } 68 Poll::Ready(None) => return Poll::Ready(None), 69 Poll::Pending => return Poll::Pending, 70 } 71 } 72 } 73 size_hint(&self) -> (usize, Option<usize>)74 fn size_hint(&self) -> (usize, Option<usize>) { 75 let future_len = usize::from(self.future.is_some()); 76 let (lower, upper) = self.stream.size_hint(); 77 78 let lower = lower.saturating_add(future_len); 79 let upper = upper.and_then(|upper| upper.checked_add(future_len)); 80 81 (lower, upper) 82 } 83 } 84