1 use futures_util::ready;
2 use pin_project_lite::pin_project;
3 use std::time::Duration;
4 use std::{
5     future::Future,
6     pin::Pin,
7     task::{Context, Poll},
8 };
9 use tokio::time::Instant;
10 use tower_service::Service;
11 
12 /// Record is the interface for accepting request latency measurements.  When
13 /// a request completes, record is called with the elapsed duration between
14 /// when the service was called and when the future completed.
15 pub trait Record {
record(&mut self, latency: Duration)16     fn record(&mut self, latency: Duration);
17 }
18 
19 /// Latency is a middleware that measures request latency and records it to the
20 /// provided Record instance.
21 #[derive(Clone, Debug)]
22 pub struct Latency<R, S> {
23     rec: R,
24     service: S,
25 }
26 
27 pin_project! {
28     #[derive(Debug)]
29     pub struct ResponseFuture<R, F> {
30         start: Instant,
31         rec: R,
32         #[pin]
33         inner: F,
34     }
35 }
36 
37 impl<S, R> Latency<R, S>
38 where
39     R: Record + Clone,
40 {
new<Request>(rec: R, service: S) -> Self where S: Service<Request>, S::Error: Into<crate::BoxError>,41     pub fn new<Request>(rec: R, service: S) -> Self
42     where
43         S: Service<Request>,
44         S::Error: Into<crate::BoxError>,
45     {
46         Latency { rec, service }
47     }
48 }
49 
50 impl<S, R, Request> Service<Request> for Latency<R, S>
51 where
52     S: Service<Request>,
53     S::Error: Into<crate::BoxError>,
54     R: Record + Clone,
55 {
56     type Response = S::Response;
57     type Error = crate::BoxError;
58     type Future = ResponseFuture<R, S::Future>;
59 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>60     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61         self.service.poll_ready(cx).map_err(Into::into)
62     }
63 
call(&mut self, request: Request) -> Self::Future64     fn call(&mut self, request: Request) -> Self::Future {
65         ResponseFuture {
66             start: Instant::now(),
67             rec: self.rec.clone(),
68             inner: self.service.call(request),
69         }
70     }
71 }
72 
73 impl<R, F, T, E> Future for ResponseFuture<R, F>
74 where
75     R: Record,
76     F: Future<Output = Result<T, E>>,
77     E: Into<crate::BoxError>,
78 {
79     type Output = Result<T, crate::BoxError>;
80 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>81     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82         let this = self.project();
83 
84         let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?;
85         let duration = Instant::now().saturating_duration_since(*this.start);
86         this.rec.record(duration);
87         Poll::Ready(Ok(rsp))
88     }
89 }
90