1 use super::future::ResponseFuture; 2 use crate::{util::ServiceExt, BoxError}; 3 use futures_core::ready; 4 use futures_util::future::TryFutureExt; 5 use std::{ 6 future::Future, 7 pin::Pin, 8 task::{Context, Poll}, 9 }; 10 use tower_service::Service; 11 use tracing::Instrument; 12 13 /// Spawns tasks to drive an inner service to readiness. 14 /// 15 /// See crate level documentation for more details. 16 #[derive(Debug)] 17 pub struct SpawnReady<S> { 18 inner: Inner<S>, 19 } 20 21 #[derive(Debug)] 22 enum Inner<S> { 23 Service(Option<S>), 24 Future(tokio::task::JoinHandle<Result<S, BoxError>>), 25 } 26 27 impl<S> SpawnReady<S> { 28 /// Creates a new [`SpawnReady`] wrapping `service`. new(service: S) -> Self29 pub fn new(service: S) -> Self { 30 Self { 31 inner: Inner::Service(Some(service)), 32 } 33 } 34 } 35 36 impl<S> Drop for SpawnReady<S> { drop(&mut self)37 fn drop(&mut self) { 38 if let Inner::Future(ref mut task) = self.inner { 39 task.abort(); 40 } 41 } 42 } 43 44 impl<S, Req> Service<Req> for SpawnReady<S> 45 where 46 Req: 'static, 47 S: Service<Req> + Send + 'static, 48 S::Error: Into<BoxError>, 49 { 50 type Response = S::Response; 51 type Error = BoxError; 52 type Future = ResponseFuture<S::Future, S::Error>; 53 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>>54 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> { 55 loop { 56 self.inner = match self.inner { 57 Inner::Service(ref mut svc) => { 58 if let Poll::Ready(r) = svc.as_mut().expect("illegal state").poll_ready(cx) { 59 return Poll::Ready(r.map_err(Into::into)); 60 } 61 62 let svc = svc.take().expect("illegal state"); 63 let rx = 64 tokio::spawn(svc.ready_oneshot().map_err(Into::into).in_current_span()); 65 Inner::Future(rx) 66 } 67 Inner::Future(ref mut fut) => { 68 let svc = ready!(Pin::new(fut).poll(cx))??; 69 Inner::Service(Some(svc)) 70 } 71 } 72 } 73 } 74 call(&mut self, request: Req) -> Self::Future75 fn call(&mut self, request: Req) -> Self::Future { 76 match self.inner { 77 Inner::Service(Some(ref mut svc)) => { 78 ResponseFuture::new(svc.call(request).map_err(Into::into)) 79 } 80 _ => unreachable!("poll_ready must be called"), 81 } 82 } 83 } 84