1 #![warn( 2 missing_debug_implementations, 3 missing_docs, 4 rust_2018_idioms, 5 unreachable_pub 6 )] 7 #![forbid(unsafe_code)] 8 // `rustdoc::broken_intra_doc_links` is checked on CI 9 10 //! Definition of the core `Service` trait to Tower 11 //! 12 //! The [`Service`] trait provides the necessary abstractions for defining 13 //! request / response clients and servers. It is simple but powerful and is 14 //! used as the foundation for the rest of Tower. 15 16 use std::future::Future; 17 use std::task::{Context, Poll}; 18 19 /// An asynchronous function from a `Request` to a `Response`. 20 /// 21 /// The `Service` trait is a simplified interface making it easy to write 22 /// network applications in a modular and reusable way, decoupled from the 23 /// underlying protocol. It is one of Tower's fundamental abstractions. 24 /// 25 /// # Functional 26 /// 27 /// A `Service` is a function of a `Request`. It immediately returns a 28 /// `Future` representing the eventual completion of processing the 29 /// request. The actual request processing may happen at any time in the 30 /// future, on any thread or executor. The processing may depend on calling 31 /// other services. At some point in the future, the processing will complete, 32 /// and the `Future` will resolve to a response or error. 33 /// 34 /// At a high level, the `Service::call` function represents an RPC request. The 35 /// `Service` value can be a server or a client. 36 /// 37 /// # Server 38 /// 39 /// An RPC server *implements* the `Service` trait. Requests received by the 40 /// server over the network are deserialized and then passed as an argument to the 41 /// server value. The returned response is sent back over the network. 42 /// 43 /// As an example, here is how an HTTP request is processed by a server: 44 /// 45 /// ```rust 46 /// # use std::pin::Pin; 47 /// # use std::task::{Poll, Context}; 48 /// # use std::future::Future; 49 /// # use tower_service::Service; 50 /// use http::{Request, Response, StatusCode}; 51 /// 52 /// struct HelloWorld; 53 /// 54 /// impl Service<Request<Vec<u8>>> for HelloWorld { 55 /// type Response = Response<Vec<u8>>; 56 /// type Error = http::Error; 57 /// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; 58 /// 59 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 60 /// Poll::Ready(Ok(())) 61 /// } 62 /// 63 /// fn call(&mut self, req: Request<Vec<u8>>) -> Self::Future { 64 /// // create the body 65 /// let body: Vec<u8> = "hello, world!\n" 66 /// .as_bytes() 67 /// .to_owned(); 68 /// // Create the HTTP response 69 /// let resp = Response::builder() 70 /// .status(StatusCode::OK) 71 /// .body(body) 72 /// .expect("Unable to create `http::Response`"); 73 /// 74 /// // create a response in a future. 75 /// let fut = async { 76 /// Ok(resp) 77 /// }; 78 /// 79 /// // Return the response as an immediate future 80 /// Box::pin(fut) 81 /// } 82 /// } 83 /// ``` 84 /// 85 /// # Client 86 /// 87 /// A client consumes a service by using a `Service` value. The client may 88 /// issue requests by invoking `call` and passing the request as an argument. 89 /// It then receives the response by waiting for the returned future. 90 /// 91 /// As an example, here is how a Redis request would be issued: 92 /// 93 /// ```rust,ignore 94 /// let client = redis::Client::new() 95 /// .connect("127.0.0.1:6379".parse().unwrap()) 96 /// .unwrap(); 97 /// 98 /// let resp = client.call(Cmd::set("foo", "this is the value of foo")).await?; 99 /// 100 /// // Wait for the future to resolve 101 /// println!("Redis response: {:?}", resp); 102 /// ``` 103 /// 104 /// # Middleware / Layer 105 /// 106 /// More often than not, all the pieces needed for writing robust, scalable 107 /// network applications are the same no matter the underlying protocol. By 108 /// unifying the API for both clients and servers in a protocol agnostic way, 109 /// it is possible to write middleware that provide these pieces in a 110 /// reusable way. 111 /// 112 /// Take timeouts as an example: 113 /// 114 /// ```rust 115 /// use tower_service::Service; 116 /// use tower_layer::Layer; 117 /// use futures::FutureExt; 118 /// use std::future::Future; 119 /// use std::task::{Context, Poll}; 120 /// use std::time::Duration; 121 /// use std::pin::Pin; 122 /// use std::fmt; 123 /// use std::error::Error; 124 /// 125 /// // Our timeout service, which wraps another service and 126 /// // adds a timeout to its response future. 127 /// pub struct Timeout<T> { 128 /// inner: T, 129 /// timeout: Duration, 130 /// } 131 /// 132 /// impl<T> Timeout<T> { 133 /// pub fn new(inner: T, timeout: Duration) -> Timeout<T> { 134 /// Timeout { 135 /// inner, 136 /// timeout 137 /// } 138 /// } 139 /// } 140 /// 141 /// // The error returned if processing a request timed out 142 /// #[derive(Debug)] 143 /// pub struct Expired; 144 /// 145 /// impl fmt::Display for Expired { 146 /// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 147 /// write!(f, "expired") 148 /// } 149 /// } 150 /// 151 /// impl Error for Expired {} 152 /// 153 /// // We can implement `Service` for `Timeout<T>` if `T` is a `Service` 154 /// impl<T, Request> Service<Request> for Timeout<T> 155 /// where 156 /// T: Service<Request>, 157 /// T::Future: 'static, 158 /// T::Error: Into<Box<dyn Error + Send + Sync>> + 'static, 159 /// T::Response: 'static, 160 /// { 161 /// // `Timeout` doesn't modify the response type, so we use `T`'s response type 162 /// type Response = T::Response; 163 /// // Errors may be either `Expired` if the timeout expired, or the inner service's 164 /// // `Error` type. Therefore, we return a boxed `dyn Error + Send + Sync` trait object to erase 165 /// // the error's type. 166 /// type Error = Box<dyn Error + Send + Sync>; 167 /// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; 168 /// 169 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 170 /// // Our timeout service is ready if the inner service is ready. 171 /// // This is how backpressure can be propagated through a tree of nested services. 172 /// self.inner.poll_ready(cx).map_err(Into::into) 173 /// } 174 /// 175 /// fn call(&mut self, req: Request) -> Self::Future { 176 /// // Create a future that completes after `self.timeout` 177 /// let timeout = tokio::time::sleep(self.timeout); 178 /// 179 /// // Call the inner service and get a future that resolves to the response 180 /// let fut = self.inner.call(req); 181 /// 182 /// // Wrap those two futures in another future that completes when either one completes 183 /// // 184 /// // If the inner service is too slow the `sleep` future will complete first 185 /// // And an error will be returned and `fut` will be dropped and not polled again 186 /// // 187 /// // We have to box the errors so the types match 188 /// let f = async move { 189 /// tokio::select! { 190 /// res = fut => { 191 /// res.map_err(|err| err.into()) 192 /// }, 193 /// _ = timeout => { 194 /// Err(Box::new(Expired) as Box<dyn Error + Send + Sync>) 195 /// }, 196 /// } 197 /// }; 198 /// 199 /// Box::pin(f) 200 /// } 201 /// } 202 /// 203 /// // A layer for wrapping services in `Timeout` 204 /// pub struct TimeoutLayer(Duration); 205 /// 206 /// impl TimeoutLayer { 207 /// pub fn new(delay: Duration) -> Self { 208 /// TimeoutLayer(delay) 209 /// } 210 /// } 211 /// 212 /// impl<S> Layer<S> for TimeoutLayer { 213 /// type Service = Timeout<S>; 214 /// 215 /// fn layer(&self, service: S) -> Timeout<S> { 216 /// Timeout::new(service, self.0) 217 /// } 218 /// } 219 /// ``` 220 /// 221 /// The above timeout implementation is decoupled from the underlying protocol 222 /// and is also decoupled from client or server concerns. In other words, the 223 /// same timeout middleware could be used in either a client or a server. 224 /// 225 /// # Backpressure 226 /// 227 /// Calling a `Service` which is at capacity (i.e., it is temporarily unable to process a 228 /// request) should result in an error. The caller is responsible for ensuring 229 /// that the service is ready to receive the request before calling it. 230 /// 231 /// `Service` provides a mechanism by which the caller is able to coordinate 232 /// readiness. `Service::poll_ready` returns `Ready` if the service expects that 233 /// it is able to process a request. 234 /// 235 /// # Be careful when cloning inner services 236 /// 237 /// Services are permitted to panic if `call` is invoked without obtaining `Poll::Ready(Ok(()))` 238 /// from `poll_ready`. You should therefore be careful when cloning services for example to move 239 /// them into boxed futures. Even though the original service is ready, the clone might not be. 240 /// 241 /// Therefore this kind of code is wrong and might panic: 242 /// 243 /// ```rust 244 /// # use std::pin::Pin; 245 /// # use std::task::{Poll, Context}; 246 /// # use std::future::Future; 247 /// # use tower_service::Service; 248 /// # 249 /// struct Wrapper<S> { 250 /// inner: S, 251 /// } 252 /// 253 /// impl<R, S> Service<R> for Wrapper<S> 254 /// where 255 /// S: Service<R> + Clone + 'static, 256 /// R: 'static, 257 /// { 258 /// type Response = S::Response; 259 /// type Error = S::Error; 260 /// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; 261 /// 262 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 263 /// Poll::Ready(Ok(())) 264 /// } 265 /// 266 /// fn call(&mut self, req: R) -> Self::Future { 267 /// let mut inner = self.inner.clone(); 268 /// Box::pin(async move { 269 /// // `inner` might not be ready since its a clone 270 /// inner.call(req).await 271 /// }) 272 /// } 273 /// } 274 /// ``` 275 /// 276 /// You should instead use [`std::mem::replace`] to take the service that was ready: 277 /// 278 /// ```rust 279 /// # use std::pin::Pin; 280 /// # use std::task::{Poll, Context}; 281 /// # use std::future::Future; 282 /// # use tower_service::Service; 283 /// # 284 /// struct Wrapper<S> { 285 /// inner: S, 286 /// } 287 /// 288 /// impl<R, S> Service<R> for Wrapper<S> 289 /// where 290 /// S: Service<R> + Clone + 'static, 291 /// R: 'static, 292 /// { 293 /// type Response = S::Response; 294 /// type Error = S::Error; 295 /// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; 296 /// 297 /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 298 /// Poll::Ready(Ok(())) 299 /// } 300 /// 301 /// fn call(&mut self, req: R) -> Self::Future { 302 /// let clone = self.inner.clone(); 303 /// // take the service that was ready 304 /// let mut inner = std::mem::replace(&mut self.inner, clone); 305 /// Box::pin(async move { 306 /// inner.call(req).await 307 /// }) 308 /// } 309 /// } 310 /// ``` 311 pub trait Service<Request> { 312 /// Responses given by the service. 313 type Response; 314 315 /// Errors produced by the service. 316 type Error; 317 318 /// The future response value. 319 type Future: Future<Output = Result<Self::Response, Self::Error>>; 320 321 /// Returns `Poll::Ready(Ok(()))` when the service is able to process requests. 322 /// 323 /// If the service is at capacity, then `Poll::Pending` is returned and the task 324 /// is notified when the service becomes ready again. This function is 325 /// expected to be called while on a task. Generally, this can be done with 326 /// a simple `futures::future::poll_fn` call. 327 /// 328 /// If `Poll::Ready(Err(_))` is returned, the service is no longer able to service requests 329 /// and the caller should discard the service instance. 330 /// 331 /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a request may be dispatched to the 332 /// service using `call`. Until a request is dispatched, repeated calls to 333 /// `poll_ready` must return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`. 334 /// 335 /// Note that `poll_ready` may reserve shared resources that are consumed in a subsequent 336 /// invocation of `call`. Thus, it is critical for implementations to not assume that `call` 337 /// will always be invoked and to ensure that such resources are released if the service is 338 /// dropped before `call` is invoked or the future returned by `call` is dropped before it 339 /// is polled. poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>340 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; 341 342 /// Process the request and return the response asynchronously. 343 /// 344 /// This function is expected to be callable off task. As such, 345 /// implementations should take care to not call `poll_ready`. 346 /// 347 /// Before dispatching a request, `poll_ready` must be called and return 348 /// `Poll::Ready(Ok(()))`. 349 /// 350 /// # Panics 351 /// 352 /// Implementations are permitted to panic if `call` is invoked without 353 /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. call(&mut self, req: Request) -> Self::Future354 fn call(&mut self, req: Request) -> Self::Future; 355 } 356 357 impl<'a, S, Request> Service<Request> for &'a mut S 358 where 359 S: Service<Request> + 'a, 360 { 361 type Response = S::Response; 362 type Error = S::Error; 363 type Future = S::Future; 364 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>365 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { 366 (**self).poll_ready(cx) 367 } 368 call(&mut self, request: Request) -> S::Future369 fn call(&mut self, request: Request) -> S::Future { 370 (**self).call(request) 371 } 372 } 373 374 impl<S, Request> Service<Request> for Box<S> 375 where 376 S: Service<Request> + ?Sized, 377 { 378 type Response = S::Response; 379 type Error = S::Error; 380 type Future = S::Future; 381 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>382 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { 383 (**self).poll_ready(cx) 384 } 385 call(&mut self, request: Request) -> S::Future386 fn call(&mut self, request: Request) -> S::Future { 387 (**self).call(request) 388 } 389 } 390