use futures_util::ready; use pin_project_lite::pin_project; use std::time::Duration; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; use tokio::time::Instant; use tower_service::Service; /// Record is the interface for accepting request latency measurements. When /// a request completes, record is called with the elapsed duration between /// when the service was called and when the future completed. pub trait Record { fn record(&mut self, latency: Duration); } /// Latency is a middleware that measures request latency and records it to the /// provided Record instance. #[derive(Clone, Debug)] pub struct Latency { rec: R, service: S, } pin_project! { #[derive(Debug)] pub struct ResponseFuture { start: Instant, rec: R, #[pin] inner: F, } } impl Latency where R: Record + Clone, { pub fn new(rec: R, service: S) -> Self where S: Service, S::Error: Into, { Latency { rec, service } } } impl Service for Latency where S: Service, S::Error: Into, R: Record + Clone, { type Response = S::Response; type Error = crate::BoxError; type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { ResponseFuture { start: Instant::now(), rec: self.rec.clone(), inner: self.service.call(request), } } } impl Future for ResponseFuture where R: Record, F: Future>, E: Into, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?; let duration = Instant::now().saturating_duration_since(*this.start); this.rec.record(duration); Poll::Ready(Ok(rsp)) } }