1 use super::future::ResponseFuture;
2 use crate::{util::ServiceExt, BoxError};
3 use futures_core::ready;
4 use futures_util::future::TryFutureExt;
5 use std::{
6     future::Future,
7     pin::Pin,
8     task::{Context, Poll},
9 };
10 use tower_service::Service;
11 use tracing::Instrument;
12 
13 /// Spawns tasks to drive an inner service to readiness.
14 ///
15 /// See crate level documentation for more details.
16 #[derive(Debug)]
17 pub struct SpawnReady<S> {
18     inner: Inner<S>,
19 }
20 
21 #[derive(Debug)]
22 enum Inner<S> {
23     Service(Option<S>),
24     Future(tokio::task::JoinHandle<Result<S, BoxError>>),
25 }
26 
27 impl<S> SpawnReady<S> {
28     /// Creates a new [`SpawnReady`] wrapping `service`.
new(service: S) -> Self29     pub fn new(service: S) -> Self {
30         Self {
31             inner: Inner::Service(Some(service)),
32         }
33     }
34 }
35 
36 impl<S> Drop for SpawnReady<S> {
drop(&mut self)37     fn drop(&mut self) {
38         if let Inner::Future(ref mut task) = self.inner {
39             task.abort();
40         }
41     }
42 }
43 
44 impl<S, Req> Service<Req> for SpawnReady<S>
45 where
46     Req: 'static,
47     S: Service<Req> + Send + 'static,
48     S::Error: Into<BoxError>,
49 {
50     type Response = S::Response;
51     type Error = BoxError;
52     type Future = ResponseFuture<S::Future, S::Error>;
53 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>>54     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
55         loop {
56             self.inner = match self.inner {
57                 Inner::Service(ref mut svc) => {
58                     if let Poll::Ready(r) = svc.as_mut().expect("illegal state").poll_ready(cx) {
59                         return Poll::Ready(r.map_err(Into::into));
60                     }
61 
62                     let svc = svc.take().expect("illegal state");
63                     let rx =
64                         tokio::spawn(svc.ready_oneshot().map_err(Into::into).in_current_span());
65                     Inner::Future(rx)
66                 }
67                 Inner::Future(ref mut fut) => {
68                     let svc = ready!(Pin::new(fut).poll(cx))??;
69                     Inner::Service(Some(svc))
70                 }
71             }
72         }
73     }
74 
call(&mut self, request: Req) -> Self::Future75     fn call(&mut self, request: Req) -> Self::Future {
76         match self.inner {
77             Inner::Service(Some(ref mut svc)) => {
78                 ResponseFuture::new(svc.call(request).map_err(Into::into))
79             }
80             _ => unreachable!("poll_ready must be called"),
81         }
82     }
83 }
84