use futures_util::ready; use pin_project_lite::pin_project; use std::time::Duration; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tower_service::Service; use crate::util::Oneshot; /// A policy which specifies how long each request should be delayed for. pub trait Policy { fn delay(&self, req: &Request) -> Duration; } /// A middleware which delays sending the request to the underlying service /// for an amount of time specified by the policy. #[derive(Debug)] pub struct Delay { policy: P, service: S, } pin_project! { #[derive(Debug)] pub struct ResponseFuture where S: Service, { service: Option, #[pin] state: State>, } } pin_project! { #[project = StateProj] #[derive(Debug)] enum State { Delaying { #[pin] delay: tokio::time::Sleep, req: Option, }, Called { #[pin] fut: F, }, } } impl State { fn delaying(delay: tokio::time::Sleep, req: Option) -> Self { Self::Delaying { delay, req } } fn called(fut: F) -> Self { Self::Called { fut } } } impl Delay { pub fn new(policy: P, service: S) -> Self where P: Policy, S: Service + Clone, S::Error: Into, { Delay { policy, service } } } impl Service for Delay where P: Policy, S: Service + Clone, S::Error: Into, { type Response = S::Response; type Error = crate::BoxError; type Future = ResponseFuture; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { // Calling self.service.poll_ready would reserve a slot for the delayed request, // potentially well in advance of actually making it. Instead, signal readiness here and // treat the service as a Oneshot in the future. Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { let delay = self.policy.delay(&request); ResponseFuture { service: Some(self.service.clone()), state: State::delaying(tokio::time::sleep(delay), Some(request)), } } } impl Future for ResponseFuture where E: Into, S: Service, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); loop { match this.state.as_mut().project() { StateProj::Delaying { delay, req } => { ready!(delay.poll(cx)); let req = req.take().expect("Missing request in delay"); let svc = this.service.take().expect("Missing service in delay"); let fut = Oneshot::new(svc, req); this.state.set(State::called(fut)); } StateProj::Called { fut } => { return fut.poll(cx).map_err(Into::into); } }; } } }