1 use crate::Stream;
2 
3 use core::fmt;
4 use core::future::Future;
5 use core::pin::Pin;
6 use core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Stream for the [`then`](super::StreamExt::then) method.
11     #[must_use = "streams do nothing unless polled"]
12     pub struct Then<St, Fut, F> {
13         #[pin]
14         stream: St,
15         #[pin]
16         future: Option<Fut>,
17         f: F,
18     }
19 }
20 
21 impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
22 where
23     St: fmt::Debug,
24 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result25     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26         f.debug_struct("Then")
27             .field("stream", &self.stream)
28             .finish()
29     }
30 }
31 
32 impl<St, Fut, F> Then<St, Fut, F> {
new(stream: St, f: F) -> Self33     pub(super) fn new(stream: St, f: F) -> Self {
34         Then {
35             stream,
36             future: None,
37             f,
38         }
39     }
40 }
41 
42 impl<St, F, Fut> Stream for Then<St, Fut, F>
43 where
44     St: Stream,
45     Fut: Future,
46     F: FnMut(St::Item) -> Fut,
47 {
48     type Item = Fut::Output;
49 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>>50     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
51         let mut me = self.project();
52 
53         loop {
54             if let Some(future) = me.future.as_mut().as_pin_mut() {
55                 match future.poll(cx) {
56                     Poll::Ready(item) => {
57                         me.future.set(None);
58                         return Poll::Ready(Some(item));
59                     }
60                     Poll::Pending => return Poll::Pending,
61                 }
62             }
63 
64             match me.stream.as_mut().poll_next(cx) {
65                 Poll::Ready(Some(item)) => {
66                     me.future.set(Some((me.f)(item)));
67                 }
68                 Poll::Ready(None) => return Poll::Ready(None),
69                 Poll::Pending => return Poll::Pending,
70             }
71         }
72     }
73 
size_hint(&self) -> (usize, Option<usize>)74     fn size_hint(&self) -> (usize, Option<usize>) {
75         let future_len = usize::from(self.future.is_some());
76         let (lower, upper) = self.stream.size_hint();
77 
78         let lower = lower.saturating_add(future_len);
79         let upper = upper.and_then(|upper| upper.checked_add(future_len));
80 
81         (lower, upper)
82     }
83 }
84