1 use futures_util::ready; 2 use pin_project_lite::pin_project; 3 use std::time::Duration; 4 use std::{ 5 future::Future, 6 pin::Pin, 7 task::{Context, Poll}, 8 }; 9 use tokio::time::Instant; 10 use tower_service::Service; 11 12 /// Record is the interface for accepting request latency measurements. When 13 /// a request completes, record is called with the elapsed duration between 14 /// when the service was called and when the future completed. 15 pub trait Record { record(&mut self, latency: Duration)16 fn record(&mut self, latency: Duration); 17 } 18 19 /// Latency is a middleware that measures request latency and records it to the 20 /// provided Record instance. 21 #[derive(Clone, Debug)] 22 pub struct Latency<R, S> { 23 rec: R, 24 service: S, 25 } 26 27 pin_project! { 28 #[derive(Debug)] 29 pub struct ResponseFuture<R, F> { 30 start: Instant, 31 rec: R, 32 #[pin] 33 inner: F, 34 } 35 } 36 37 impl<S, R> Latency<R, S> 38 where 39 R: Record + Clone, 40 { new<Request>(rec: R, service: S) -> Self where S: Service<Request>, S::Error: Into<crate::BoxError>,41 pub fn new<Request>(rec: R, service: S) -> Self 42 where 43 S: Service<Request>, 44 S::Error: Into<crate::BoxError>, 45 { 46 Latency { rec, service } 47 } 48 } 49 50 impl<S, R, Request> Service<Request> for Latency<R, S> 51 where 52 S: Service<Request>, 53 S::Error: Into<crate::BoxError>, 54 R: Record + Clone, 55 { 56 type Response = S::Response; 57 type Error = crate::BoxError; 58 type Future = ResponseFuture<R, S::Future>; 59 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>60 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 61 self.service.poll_ready(cx).map_err(Into::into) 62 } 63 call(&mut self, request: Request) -> Self::Future64 fn call(&mut self, request: Request) -> Self::Future { 65 ResponseFuture { 66 start: Instant::now(), 67 rec: self.rec.clone(), 68 inner: self.service.call(request), 69 } 70 } 71 } 72 73 impl<R, F, T, E> Future for ResponseFuture<R, F> 74 where 75 R: Record, 76 F: Future<Output = Result<T, E>>, 77 E: Into<crate::BoxError>, 78 { 79 type Output = Result<T, crate::BoxError>; 80 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>81 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 82 let this = self.project(); 83 84 let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?; 85 let duration = Instant::now().saturating_duration_since(*this.start); 86 this.rec.record(duration); 87 Poll::Ready(Ok(rsp)) 88 } 89 } 90