1 //! Future types
2 
3 use super::{Policy, Retry};
4 use futures_core::ready;
5 use pin_project_lite::pin_project;
6 use std::future::Future;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use tower_service::Service;
10 
pin_project! {
    /// The [`Future`] returned by a [`Retry`] service.
    ///
    /// Polling it drives the in-flight call on the inner service to completion. When a
    /// copy of the request was retained and the policy asks for a retry, it waits for the
    /// policy's future, waits for the inner service to become ready, and calls it again.
    #[derive(Debug)]
    pub struct ResponseFuture<P, S, Request>
    where
        P: Policy<Request, S::Response, S::Error>,
        S: Service<Request>,
    {
        // Copy of the request kept for a possible retry. When this is `None` the result
        // of the current call is returned without consulting the policy.
        request: Option<Request>,
        // The retry middleware; its `policy` is consulted after each attempt and its
        // `service` is used to re-send the request.
        #[pin]
        retry: Retry<P, S>,
        // Current step of the retry state machine (see `State`).
        #[pin]
        state: State<S::Future, P::Future>,
    }
}
26 
pin_project! {
    #[project = StateProj]
    #[derive(Debug)]
    // Steps of the retry loop driven by `ResponseFuture::poll`. `F` is the inner
    // service's response future and `P` is the future returned by the policy.
    // The cycle is `Called` -> `Checking` -> `Retrying` -> `Called` -> ...
    enum State<F, P> {
        // Polling the future from [`Service::call`]
        Called {
            #[pin]
            future: F
        },
        // Polling the future from [`Policy::retry`]; its output replaces the
        // policy stored in the `Retry` before the next attempt.
        Checking {
            #[pin]
            checking: P
        },
        // Polling [`Service::poll_ready`] after [`Checking`] was OK. Once the service is
        // ready the retained request is sent again and the state returns to `Called`.
        Retrying,
    }
}
45 
46 impl<P, S, Request> ResponseFuture<P, S, Request>
47 where
48     P: Policy<Request, S::Response, S::Error>,
49     S: Service<Request>,
50 {
new( request: Option<Request>, retry: Retry<P, S>, future: S::Future, ) -> ResponseFuture<P, S, Request>51     pub(crate) fn new(
52         request: Option<Request>,
53         retry: Retry<P, S>,
54         future: S::Future,
55     ) -> ResponseFuture<P, S, Request> {
56         ResponseFuture {
57             request,
58             retry,
59             state: State::Called { future },
60         }
61     }
62 }
63 
64 impl<P, S, Request> Future for ResponseFuture<P, S, Request>
65 where
66     P: Policy<Request, S::Response, S::Error> + Clone,
67     S: Service<Request> + Clone,
68 {
69     type Output = Result<S::Response, S::Error>;
70 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>71     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72         let mut this = self.project();
73 
74         loop {
75             match this.state.as_mut().project() {
76                 StateProj::Called { future } => {
77                     let result = ready!(future.poll(cx));
78                     if let Some(ref req) = this.request {
79                         match this.retry.policy.retry(req, result.as_ref()) {
80                             Some(checking) => {
81                                 this.state.set(State::Checking { checking });
82                             }
83                             None => return Poll::Ready(result),
84                         }
85                     } else {
86                         // request wasn't cloned, so no way to retry it
87                         return Poll::Ready(result);
88                     }
89                 }
90                 StateProj::Checking { checking } => {
91                     this.retry
92                         .as_mut()
93                         .project()
94                         .policy
95                         .set(ready!(checking.poll(cx)));
96                     this.state.set(State::Retrying);
97                 }
98                 StateProj::Retrying => {
99                     // NOTE: we assume here that
100                     //
101                     //   this.retry.poll_ready()
102                     //
103                     // is equivalent to
104                     //
105                     //   this.retry.service.poll_ready()
106                     //
107                     // we need to make that assumption to avoid adding an Unpin bound to the Policy
108                     // in Ready to make it Unpin so that we can get &mut Ready as needed to call
109                     // poll_ready on it.
110                     ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
111                     let req = this
112                         .request
113                         .take()
114                         .expect("retrying requires cloned request");
115                     *this.request = this.retry.policy.clone_request(&req);
116                     this.state.set(State::Called {
117                         future: this.retry.as_mut().project().service.call(req),
118                     });
119                 }
120             }
121         }
122     }
123 }
124