1 //! Various utility types and functions that are generally used with Tower.
2 
3 mod and_then;
4 mod boxed;
5 mod boxed_clone;
6 mod call_all;
7 mod either;
8 
9 mod future_service;
10 mod map_err;
11 mod map_request;
12 mod map_response;
13 mod map_result;
14 
15 mod map_future;
16 mod oneshot;
17 mod optional;
18 mod ready;
19 mod service_fn;
20 mod then;
21 
22 #[allow(deprecated)]
23 pub use self::{
24     and_then::{AndThen, AndThenLayer},
25     boxed::{BoxLayer, BoxService, UnsyncBoxService},
26     boxed_clone::BoxCloneService,
27     either::Either,
28     future_service::{future_service, FutureService},
29     map_err::{MapErr, MapErrLayer},
30     map_future::{MapFuture, MapFutureLayer},
31     map_request::{MapRequest, MapRequestLayer},
32     map_response::{MapResponse, MapResponseLayer},
33     map_result::{MapResult, MapResultLayer},
34     oneshot::Oneshot,
35     optional::Optional,
36     ready::{Ready, ReadyAnd, ReadyOneshot},
37     service_fn::{service_fn, ServiceFn},
38     then::{Then, ThenLayer},
39 };
40 
41 pub use self::call_all::{CallAll, CallAllUnordered};
42 use std::future::Future;
43 
44 use crate::layer::util::Identity;
45 
46 pub mod error {
47     //! Error types
48 
49     pub use super::optional::error as optional;
50 }
51 
52 pub mod future {
53     //! Future types
54 
55     pub use super::and_then::AndThenFuture;
56     pub use super::map_err::MapErrFuture;
57     pub use super::map_response::MapResponseFuture;
58     pub use super::map_result::MapResultFuture;
59     pub use super::optional::future as optional;
60     pub use super::then::ThenFuture;
61 }
62 
63 /// An extension trait for `Service`s that provides a variety of convenient
64 /// adapters
65 pub trait ServiceExt<Request>: tower_service::Service<Request> {
66     /// Yields a mutable reference to the service when it is ready to accept a request.
ready(&mut self) -> Ready<'_, Self, Request> where Self: Sized,67     fn ready(&mut self) -> Ready<'_, Self, Request>
68     where
69         Self: Sized,
70     {
71         Ready::new(self)
72     }
73 
74     /// Yields a mutable reference to the service when it is ready to accept a request.
75     #[deprecated(
76         since = "0.4.6",
77         note = "please use the `ServiceExt::ready` method instead"
78     )]
79     #[allow(deprecated)]
ready_and(&mut self) -> ReadyAnd<'_, Self, Request> where Self: Sized,80     fn ready_and(&mut self) -> ReadyAnd<'_, Self, Request>
81     where
82         Self: Sized,
83     {
84         ReadyAnd::new(self)
85     }
86 
87     /// Yields the service when it is ready to accept a request.
ready_oneshot(self) -> ReadyOneshot<Self, Request> where Self: Sized,88     fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
89     where
90         Self: Sized,
91     {
92         ReadyOneshot::new(self)
93     }
94 
95     /// Consume this `Service`, calling with the providing request once it is ready.
oneshot(self, req: Request) -> Oneshot<Self, Request> where Self: Sized,96     fn oneshot(self, req: Request) -> Oneshot<Self, Request>
97     where
98         Self: Sized,
99     {
100         Oneshot::new(self, req)
101     }
102 
103     /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
104     ///
105     /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
106     /// Response>`][stream]. See the documentation for [`CallAll`] for
107     /// details.
108     ///
109     /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
110     /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
call_all<S>(self, reqs: S) -> CallAll<Self, S> where Self: Sized, Self::Error: Into<crate::BoxError>, S: futures_core::Stream<Item = Request>,111     fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
112     where
113         Self: Sized,
114         Self::Error: Into<crate::BoxError>,
115         S: futures_core::Stream<Item = Request>,
116     {
117         CallAll::new(self, reqs)
118     }
119 
120     /// Executes a new future after this service's future resolves. This does
121     /// not alter the behaviour of the [`poll_ready`] method.
122     ///
123     /// This method can be used to change the [`Response`] type of the service
124     /// into a different type. You can use this method to chain along a computation once the
125     /// service's response has been resolved.
126     ///
127     /// [`Response`]: crate::Service::Response
128     /// [`poll_ready`]: crate::Service::poll_ready
129     ///
130     /// # Example
131     /// ```
132     /// # use std::task::{Poll, Context};
133     /// # use tower::{Service, ServiceExt};
134     /// #
135     /// # struct DatabaseService;
136     /// # impl DatabaseService {
137     /// #   fn new(address: &str) -> Self {
138     /// #       DatabaseService
139     /// #   }
140     /// # }
141     /// #
142     /// # struct Record {
143     /// #   pub name: String,
144     /// #   pub age: u16
145     /// # }
146     /// #
147     /// # impl Service<u32> for DatabaseService {
148     /// #   type Response = Record;
149     /// #   type Error = u8;
150     /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
151     /// #
152     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153     /// #       Poll::Ready(Ok(()))
154     /// #   }
155     /// #
156     /// #   fn call(&mut self, request: u32) -> Self::Future {
157     /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
158     /// #   }
159     /// # }
160     /// #
161     /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
162     /// #
163     /// # fn main() {
164     /// #    async {
165     /// // A service returning Result<Record, _>
166     /// let service = DatabaseService::new("127.0.0.1:8080");
167     ///
168     /// // Map the response into a new response
169     /// let mut new_service = service.and_then(|record: Record| async move {
170     ///     let name = record.name;
171     ///     avatar_lookup(name).await
172     /// });
173     ///
174     /// // Call the new service
175     /// let id = 13;
176     /// let avatar = new_service.call(id).await.unwrap();
177     /// #    };
178     /// # }
179     /// ```
and_then<F>(self, f: F) -> AndThen<Self, F> where Self: Sized, F: Clone,180     fn and_then<F>(self, f: F) -> AndThen<Self, F>
181     where
182         Self: Sized,
183         F: Clone,
184     {
185         AndThen::new(self, f)
186     }
187 
188     /// Maps this service's response value to a different value. This does not
189     /// alter the behaviour of the [`poll_ready`] method.
190     ///
191     /// This method can be used to change the [`Response`] type of the service
192     /// into a different type. It is similar to the [`Result::map`]
193     /// method. You can use this method to chain along a computation once the
194     /// service's response has been resolved.
195     ///
196     /// [`Response`]: crate::Service::Response
197     /// [`poll_ready`]: crate::Service::poll_ready
198     ///
199     /// # Example
200     /// ```
201     /// # use std::task::{Poll, Context};
202     /// # use tower::{Service, ServiceExt};
203     /// #
204     /// # struct DatabaseService;
205     /// # impl DatabaseService {
206     /// #   fn new(address: &str) -> Self {
207     /// #       DatabaseService
208     /// #   }
209     /// # }
210     /// #
211     /// # struct Record {
212     /// #   pub name: String,
213     /// #   pub age: u16
214     /// # }
215     /// #
216     /// # impl Service<u32> for DatabaseService {
217     /// #   type Response = Record;
218     /// #   type Error = u8;
219     /// #   type Future = futures_util::future::Ready<Result<Record, u8>>;
220     /// #
221     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
222     /// #       Poll::Ready(Ok(()))
223     /// #   }
224     /// #
225     /// #   fn call(&mut self, request: u32) -> Self::Future {
226     /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
227     /// #   }
228     /// # }
229     /// #
230     /// # fn main() {
231     /// #    async {
232     /// // A service returning Result<Record, _>
233     /// let service = DatabaseService::new("127.0.0.1:8080");
234     ///
235     /// // Map the response into a new response
236     /// let mut new_service = service.map_response(|record| record.name);
237     ///
238     /// // Call the new service
239     /// let id = 13;
240     /// let name = new_service
241     ///     .ready()
242     ///     .await?
243     ///     .call(id)
244     ///     .await?;
245     /// # Ok::<(), u8>(())
246     /// #    };
247     /// # }
248     /// ```
map_response<F, Response>(self, f: F) -> MapResponse<Self, F> where Self: Sized, F: FnOnce(Self::Response) -> Response + Clone,249     fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
250     where
251         Self: Sized,
252         F: FnOnce(Self::Response) -> Response + Clone,
253     {
254         MapResponse::new(self, f)
255     }
256 
257     /// Maps this service's error value to a different value. This does not
258     /// alter the behaviour of the [`poll_ready`] method.
259     ///
260     /// This method can be used to change the [`Error`] type of the service
261     /// into a different type. It is similar to the [`Result::map_err`] method.
262     ///
263     /// [`Error`]: crate::Service::Error
264     /// [`poll_ready`]: crate::Service::poll_ready
265     ///
266     /// # Example
267     /// ```
268     /// # use std::task::{Poll, Context};
269     /// # use tower::{Service, ServiceExt};
270     /// #
271     /// # struct DatabaseService;
272     /// # impl DatabaseService {
273     /// #   fn new(address: &str) -> Self {
274     /// #       DatabaseService
275     /// #   }
276     /// # }
277     /// #
278     /// # struct Error {
279     /// #   pub code: u32,
280     /// #   pub message: String
281     /// # }
282     /// #
283     /// # impl Service<u32> for DatabaseService {
284     /// #   type Response = String;
285     /// #   type Error = Error;
286     /// #   type Future = futures_util::future::Ready<Result<String, Error>>;
287     /// #
288     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
289     /// #       Poll::Ready(Ok(()))
290     /// #   }
291     /// #
292     /// #   fn call(&mut self, request: u32) -> Self::Future {
293     /// #       futures_util::future::ready(Ok(String::new()))
294     /// #   }
295     /// # }
296     /// #
297     /// # fn main() {
298     /// #   async {
299     /// // A service returning Result<_, Error>
300     /// let service = DatabaseService::new("127.0.0.1:8080");
301     ///
302     /// // Map the error to a new error
303     /// let mut new_service = service.map_err(|err| err.code);
304     ///
305     /// // Call the new service
306     /// let id = 13;
307     /// let code = new_service
308     ///     .ready()
309     ///     .await?
310     ///     .call(id)
311     ///     .await
312     ///     .unwrap_err();
313     /// # Ok::<(), u32>(())
314     /// #   };
315     /// # }
316     /// ```
map_err<F, Error>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnOnce(Self::Error) -> Error + Clone,317     fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
318     where
319         Self: Sized,
320         F: FnOnce(Self::Error) -> Error + Clone,
321     {
322         MapErr::new(self, f)
323     }
324 
325     /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
326     /// to a different value, regardless of whether the future succeeds or
327     /// fails.
328     ///
329     /// This is similar to the [`map_response`] and [`map_err`] combinators,
330     /// except that the *same* function is invoked when the service's future
331     /// completes, whether it completes successfully or fails. This function
332     /// takes the [`Result`] returned by the service's future, and returns a
333     /// [`Result`].
334     ///
335     /// Like the standard library's [`Result::and_then`], this method can be
336     /// used to implement control flow based on `Result` values. For example, it
337     /// may be used to implement error recovery, by turning some [`Err`]
338     /// responses from the service into [`Ok`] responses. Similarly, some
339     /// successful responses from the service could be rejected, by returning an
340     /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
341     /// this method can also be used to implement behaviors that must run when a
342     /// service's future completes, regardless of whether it succeeded or failed.
343     ///
344     /// This method can be used to change the [`Response`] type of the service
345     /// into a different type. It can also be used to change the [`Error`] type
346     /// of the service. However, because the [`map_result`] function is not applied
347     /// to the errors returned by the service's [`poll_ready`] method, it must
348     /// be possible to convert the service's [`Error`] type into the error type
349     /// returned by the [`map_result`] function. This is trivial when the function
350     /// returns the same error type as the service, but in other cases, it can
351     /// be useful to use [`BoxError`] to erase differing error types.
352     ///
353     /// # Examples
354     ///
355     /// Recovering from certain errors:
356     ///
357     /// ```
358     /// # use std::task::{Poll, Context};
359     /// # use tower::{Service, ServiceExt};
360     /// #
361     /// # struct DatabaseService;
362     /// # impl DatabaseService {
363     /// #   fn new(address: &str) -> Self {
364     /// #       DatabaseService
365     /// #   }
366     /// # }
367     /// #
368     /// # struct Record {
369     /// #   pub name: String,
370     /// #   pub age: u16
371     /// # }
372     /// # #[derive(Debug)]
373     /// # enum DbError {
374     /// #   Parse(std::num::ParseIntError),
375     /// #   NoRecordsFound,
376     /// # }
377     /// #
378     /// # impl Service<u32> for DatabaseService {
379     /// #   type Response = Vec<Record>;
380     /// #   type Error = DbError;
381     /// #   type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
382     /// #
383     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
384     /// #       Poll::Ready(Ok(()))
385     /// #   }
386     /// #
387     /// #   fn call(&mut self, request: u32) -> Self::Future {
388     /// #       futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
389     /// #   }
390     /// # }
391     /// #
392     /// # fn main() {
393     /// #    async {
394     /// // A service returning Result<Vec<Record>, DbError>
395     /// let service = DatabaseService::new("127.0.0.1:8080");
396     ///
397     /// // If the database returns no records for the query, we just want an empty `Vec`.
398     /// let mut new_service = service.map_result(|result| match result {
399     ///     // If the error indicates that no records matched the query, return an empty
400     ///     // `Vec` instead.
401     ///     Err(DbError::NoRecordsFound) => Ok(Vec::new()),
402     ///     // Propagate all other responses (`Ok` and `Err`) unchanged
403     ///     x => x,
404     /// });
405     ///
406     /// // Call the new service
407     /// let id = 13;
408     /// let name = new_service
409     ///     .ready()
410     ///     .await?
411     ///     .call(id)
412     ///     .await?;
413     /// # Ok::<(), DbError>(())
414     /// #    };
415     /// # }
416     /// ```
417     ///
418     /// Rejecting some `Ok` responses:
419     ///
420     /// ```
421     /// # use std::task::{Poll, Context};
422     /// # use tower::{Service, ServiceExt};
423     /// #
424     /// # struct DatabaseService;
425     /// # impl DatabaseService {
426     /// #   fn new(address: &str) -> Self {
427     /// #       DatabaseService
428     /// #   }
429     /// # }
430     /// #
431     /// # struct Record {
432     /// #   pub name: String,
433     /// #   pub age: u16
434     /// # }
435     /// # type DbError = String;
436     /// # type AppError = String;
437     /// #
438     /// # impl Service<u32> for DatabaseService {
439     /// #   type Response = Record;
440     /// #   type Error = DbError;
441     /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
442     /// #
443     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
444     /// #       Poll::Ready(Ok(()))
445     /// #   }
446     /// #
447     /// #   fn call(&mut self, request: u32) -> Self::Future {
448     /// #       futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
449     /// #   }
450     /// # }
451     /// #
452     /// # fn main() {
453     /// #    async {
454     /// use tower::BoxError;
455     ///
456     /// // A service returning Result<Record, DbError>
457     /// let service = DatabaseService::new("127.0.0.1:8080");
458     ///
459     /// // If the user is zero years old, return an error.
460     /// let mut new_service = service.map_result(|result| {
461     ///    let record = result?;
462     ///
463     ///    if record.age == 0 {
464     ///         // Users must have been born to use our app!
465     ///         let app_error = AppError::from("users cannot be 0 years old!");
466     ///
467     ///         // Box the error to erase its type (as it can be an `AppError`
468     ///         // *or* the inner service's `DbError`).
469     ///         return Err(BoxError::from(app_error));
470     ///     }
471     ///
472     ///     // Otherwise, return the record.
473     ///     Ok(record)
474     /// });
475     ///
476     /// // Call the new service
477     /// let id = 13;
478     /// let record = new_service
479     ///     .ready()
480     ///     .await?
481     ///     .call(id)
482     ///     .await?;
483     /// # Ok::<(), BoxError>(())
484     /// #    };
485     /// # }
486     /// ```
487     ///
488     /// Performing an action that must be run for both successes and failures:
489     ///
490     /// ```
491     /// # use std::convert::TryFrom;
492     /// # use std::task::{Poll, Context};
493     /// # use tower::{Service, ServiceExt};
494     /// #
495     /// # struct DatabaseService;
496     /// # impl DatabaseService {
497     /// #   fn new(address: &str) -> Self {
498     /// #       DatabaseService
499     /// #   }
500     /// # }
501     /// #
502     /// # impl Service<u32> for DatabaseService {
503     /// #   type Response = String;
504     /// #   type Error = u8;
505     /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
506     /// #
507     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
508     /// #       Poll::Ready(Ok(()))
509     /// #   }
510     /// #
511     /// #   fn call(&mut self, request: u32) -> Self::Future {
512     /// #       futures_util::future::ready(Ok(String::new()))
513     /// #   }
514     /// # }
515     /// #
516     /// # fn main() {
517     /// #   async {
518     /// // A service returning Result<Record, DbError>
519     /// let service = DatabaseService::new("127.0.0.1:8080");
520     ///
521     /// // Print a message whenever a query completes.
522     /// let mut new_service = service.map_result(|result| {
523     ///     println!("query completed; success={}", result.is_ok());
524     ///     result
525     /// });
526     ///
527     /// // Call the new service
528     /// let id = 13;
529     /// let response = new_service
530     ///     .ready()
531     ///     .await?
532     ///     .call(id)
533     ///     .await;
534     /// # response
535     /// #    };
536     /// # }
537     /// ```
538     ///
539     /// [`map_response`]: ServiceExt::map_response
540     /// [`map_err`]: ServiceExt::map_err
541     /// [`map_result`]: ServiceExt::map_result
542     /// [`Error`]: crate::Service::Error
543     /// [`Response`]: crate::Service::Response
544     /// [`poll_ready`]: crate::Service::poll_ready
545     /// [`BoxError`]: crate::BoxError
map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F> where Self: Sized, Error: From<Self::Error>, F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,546     fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
547     where
548         Self: Sized,
549         Error: From<Self::Error>,
550         F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
551     {
552         MapResult::new(self, f)
553     }
554 
555     /// Composes a function *in front of* the service.
556     ///
557     /// This adapter produces a new service that passes each value through the
558     /// given function `f` before sending it to `self`.
559     ///
560     /// # Example
561     /// ```
562     /// # use std::convert::TryFrom;
563     /// # use std::task::{Poll, Context};
564     /// # use tower::{Service, ServiceExt};
565     /// #
566     /// # struct DatabaseService;
567     /// # impl DatabaseService {
568     /// #   fn new(address: &str) -> Self {
569     /// #       DatabaseService
570     /// #   }
571     /// # }
572     /// #
573     /// # impl Service<String> for DatabaseService {
574     /// #   type Response = String;
575     /// #   type Error = u8;
576     /// #   type Future = futures_util::future::Ready<Result<String, u8>>;
577     /// #
578     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
579     /// #       Poll::Ready(Ok(()))
580     /// #   }
581     /// #
582     /// #   fn call(&mut self, request: String) -> Self::Future {
583     /// #       futures_util::future::ready(Ok(String::new()))
584     /// #   }
585     /// # }
586     /// #
587     /// # fn main() {
588     /// #   async {
589     /// // A service taking a String as a request
590     /// let service = DatabaseService::new("127.0.0.1:8080");
591     ///
592     /// // Map the request to a new request
593     /// let mut new_service = service.map_request(|id: u32| id.to_string());
594     ///
595     /// // Call the new service
596     /// let id = 13;
597     /// let response = new_service
598     ///     .ready()
599     ///     .await?
600     ///     .call(id)
601     ///     .await;
602     /// # response
603     /// #    };
604     /// # }
605     /// ```
map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F> where Self: Sized, F: FnMut(NewRequest) -> Request,606     fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
607     where
608         Self: Sized,
609         F: FnMut(NewRequest) -> Request,
610     {
611         MapRequest::new(self, f)
612     }
613 
614     /// Composes this service with a [`Filter`] that conditionally accepts or
615     /// rejects requests based on a [predicate].
616     ///
617     /// This adapter produces a new service that passes each value through the
618     /// given function `predicate` before sending it to `self`.
619     ///
620     /// # Example
621     /// ```
622     /// # use std::convert::TryFrom;
623     /// # use std::task::{Poll, Context};
624     /// # use tower::{Service, ServiceExt};
625     /// #
626     /// # struct DatabaseService;
627     /// # impl DatabaseService {
628     /// #   fn new(address: &str) -> Self {
629     /// #       DatabaseService
630     /// #   }
631     /// # }
632     /// #
633     /// # #[derive(Debug)] enum DbError {
634     /// #   Parse(std::num::ParseIntError)
635     /// # }
636     /// #
637     /// # impl std::fmt::Display for DbError {
638     /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
639     /// # }
640     /// # impl std::error::Error for DbError {}
641     /// # impl Service<u32> for DatabaseService {
642     /// #   type Response = String;
643     /// #   type Error = DbError;
644     /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
645     /// #
646     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
647     /// #       Poll::Ready(Ok(()))
648     /// #   }
649     /// #
650     /// #   fn call(&mut self, request: u32) -> Self::Future {
651     /// #       futures_util::future::ready(Ok(String::new()))
652     /// #   }
653     /// # }
654     /// #
655     /// # fn main() {
656     /// #    async {
657     /// // A service taking a u32 as a request and returning Result<_, DbError>
658     /// let service = DatabaseService::new("127.0.0.1:8080");
659     ///
660     /// // Fallibly map the request to a new request
661     /// let mut new_service = service
662     ///     .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
663     ///
664     /// // Call the new service
665     /// let id = "13";
666     /// let response = new_service
667     ///     .ready()
668     ///     .await?
669     ///     .call(id)
670     ///     .await;
671     /// # response
672     /// #    };
673     /// # }
674     /// ```
675     ///
676     /// [`Filter`]: crate::filter::Filter
677     /// [predicate]: crate::filter::Predicate
678     #[cfg(feature = "filter")]
679     #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F> where Self: Sized, F: crate::filter::Predicate<NewRequest>,680     fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
681     where
682         Self: Sized,
683         F: crate::filter::Predicate<NewRequest>,
684     {
685         crate::filter::Filter::new(self, filter)
686     }
687 
688     /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
689     /// rejects requests based on an [async predicate].
690     ///
691     /// This adapter produces a new service that passes each value through the
692     /// given function `predicate` before sending it to `self`.
693     ///
694     /// # Example
695     /// ```
696     /// # use std::convert::TryFrom;
697     /// # use std::task::{Poll, Context};
698     /// # use tower::{Service, ServiceExt};
699     /// #
700     /// # #[derive(Clone)] struct DatabaseService;
701     /// # impl DatabaseService {
702     /// #   fn new(address: &str) -> Self {
703     /// #       DatabaseService
704     /// #   }
705     /// # }
706     /// # #[derive(Debug)]
707     /// # enum DbError {
708     /// #   Rejected
709     /// # }
710     /// # impl std::fmt::Display for DbError {
711     /// #    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
712     /// # }
713     /// # impl std::error::Error for DbError {}
714     /// #
715     /// # impl Service<u32> for DatabaseService {
716     /// #   type Response = String;
717     /// #   type Error = DbError;
718     /// #   type Future = futures_util::future::Ready<Result<String, DbError>>;
719     /// #
720     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
721     /// #       Poll::Ready(Ok(()))
722     /// #   }
723     /// #
724     /// #   fn call(&mut self, request: u32) -> Self::Future {
725     /// #       futures_util::future::ready(Ok(String::new()))
726     /// #   }
727     /// # }
728     /// #
729     /// # fn main() {
730     /// #    async {
731     /// // A service taking a u32 as a request and returning Result<_, DbError>
732     /// let service = DatabaseService::new("127.0.0.1:8080");
733     ///
734     /// /// Returns `true` if we should query the database for an ID.
735     /// async fn should_query(id: u32) -> bool {
736     ///     // ...
737     ///     # true
738     /// }
739     ///
740     /// // Filter requests based on `should_query`.
741     /// let mut new_service = service
742     ///     .filter_async(|id: u32| async move {
743     ///         if should_query(id).await {
744     ///             return Ok(id);
745     ///         }
746     ///
747     ///         Err(DbError::Rejected)
748     ///     });
749     ///
750     /// // Call the new service
751     /// let id = 13;
752     /// # let id: u32 = id;
753     /// let response = new_service
754     ///     .ready()
755     ///     .await?
756     ///     .call(id)
757     ///     .await;
758     /// # response
759     /// #    };
760     /// # }
761     /// ```
762     ///
763     /// [`AsyncFilter`]: crate::filter::AsyncFilter
764     /// [asynchronous predicate]: crate::filter::AsyncPredicate
765     #[cfg(feature = "filter")]
766     #[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F> where Self: Sized, F: crate::filter::AsyncPredicate<NewRequest>,767     fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
768     where
769         Self: Sized,
770         F: crate::filter::AsyncPredicate<NewRequest>,
771     {
772         crate::filter::AsyncFilter::new(self, filter)
773     }
774 
775     /// Composes an asynchronous function *after* this service.
776     ///
777     /// This takes a function or closure returning a future, and returns a new
778     /// `Service` that chains that function after this service's [`Future`]. The
779     /// new `Service`'s future will consist of this service's future, followed
780     /// by the future returned by calling the chained function with the future's
781     /// [`Output`] type. The chained function is called regardless of whether
782     /// this service's future completes with a successful response or with an
783     /// error.
784     ///
785     /// This method can be thought of as an equivalent to the [`futures`
786     /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
787     /// _return_ futures, rather than on an individual future. Similarly to that
788     /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
789     /// error recovery, by calling some asynchronous function with errors
790     /// returned by this service. Alternatively, it may also be used to call a
791     /// fallible async function with the successful response of this service.
792     ///
793     /// This method can be used to change the [`Response`] type of the service
794     /// into a different type. It can also be used to change the [`Error`] type
795     /// of the service. However, because the `then` function is not applied
796     /// to the errors returned by the service's [`poll_ready`] method, it must
797     /// be possible to convert the service's [`Error`] type into the error type
798     /// returned by the `then` future. This is trivial when the function
799     /// returns the same error type as the service, but in other cases, it can
800     /// be useful to use [`BoxError`] to erase differing error types.
801     ///
802     /// # Examples
803     ///
804     /// ```
805     /// # use std::task::{Poll, Context};
806     /// # use tower::{Service, ServiceExt};
807     /// #
808     /// # struct DatabaseService;
809     /// # impl DatabaseService {
810     /// #   fn new(address: &str) -> Self {
811     /// #       DatabaseService
812     /// #   }
813     /// # }
814     /// #
815     /// # type Record = ();
816     /// # type DbError = ();
817     /// #
818     /// # impl Service<u32> for DatabaseService {
819     /// #   type Response = Record;
820     /// #   type Error = DbError;
821     /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
822     /// #
823     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
824     /// #       Poll::Ready(Ok(()))
825     /// #   }
826     /// #
827     /// #   fn call(&mut self, request: u32) -> Self::Future {
828     /// #       futures_util::future::ready(Ok(()))
829     /// #   }
830     /// # }
831     /// #
832     /// # fn main() {
833     /// // A service returning Result<Record, DbError>
834     /// let service = DatabaseService::new("127.0.0.1:8080");
835     ///
836     /// // An async function that attempts to recover from errors returned by the
837     /// // database.
838     /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
839     ///     // ...
840     ///     # Ok(())
841     /// }
842     /// #    async {
843     ///
844     /// // If the database service returns an error, attempt to recover by
845     /// // calling `recover_from_error`. Otherwise, return the successful response.
846     /// let mut new_service = service.then(|result| async move {
847     ///     match result {
848     ///         Ok(record) => Ok(record),
849     ///         Err(e) => recover_from_error(e).await,
850     ///     }
851     /// });
852     ///
853     /// // Call the new service
854     /// let id = 13;
855     /// let record = new_service
856     ///     .ready()
857     ///     .await?
858     ///     .call(id)
859     ///     .await?;
860     /// # Ok::<(), DbError>(())
861     /// #    };
862     /// # }
863     /// ```
864     ///
865     /// [`Future`]: crate::Service::Future
866     /// [`Output`]: std::future::Future::Output
867     /// [`futures` crate]: https://docs.rs/futures
868     /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
869     /// [`Error`]: crate::Service::Error
870     /// [`Response`]: crate::Service::Response
871     /// [`poll_ready`]: crate::Service::poll_ready
872     /// [`BoxError`]: crate::BoxError
then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F> where Self: Sized, Error: From<Self::Error>, F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, Fut: Future<Output = Result<Response, Error>>,873     fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
874     where
875         Self: Sized,
876         Error: From<Self::Error>,
877         F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
878         Fut: Future<Output = Result<Response, Error>>,
879     {
880         Then::new(self, f)
881     }
882 
883     /// Composes a function that transforms futures produced by the service.
884     ///
885     /// This takes a function or closure returning a future computed from the future returned by
886     /// the service's [`call`] method, as opposed to the responses produced by the future.
887     ///
888     /// # Examples
889     ///
890     /// ```
891     /// # use std::task::{Poll, Context};
892     /// # use tower::{Service, ServiceExt, BoxError};
893     /// #
894     /// # struct DatabaseService;
895     /// # impl DatabaseService {
896     /// #   fn new(address: &str) -> Self {
897     /// #       DatabaseService
898     /// #   }
899     /// # }
900     /// #
901     /// # type Record = ();
902     /// # type DbError = crate::BoxError;
903     /// #
904     /// # impl Service<u32> for DatabaseService {
905     /// #   type Response = Record;
906     /// #   type Error = DbError;
907     /// #   type Future = futures_util::future::Ready<Result<Record, DbError>>;
908     /// #
909     /// #   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
910     /// #       Poll::Ready(Ok(()))
911     /// #   }
912     /// #
913     /// #   fn call(&mut self, request: u32) -> Self::Future {
914     /// #       futures_util::future::ready(Ok(()))
915     /// #   }
916     /// # }
917     /// #
918     /// # fn main() {
919     /// use std::time::Duration;
920     /// use tokio::time::timeout;
921     ///
922     /// // A service returning Result<Record, DbError>
923     /// let service = DatabaseService::new("127.0.0.1:8080");
924     /// #    async {
925     ///
926     /// let mut new_service = service.map_future(|future| async move {
927     ///     let res = timeout(Duration::from_secs(1), future).await?;
928     ///     Ok::<_, BoxError>(res)
929     /// });
930     ///
931     /// // Call the new service
932     /// let id = 13;
933     /// let record = new_service
934     ///     .ready()
935     ///     .await?
936     ///     .call(id)
937     ///     .await?;
938     /// # Ok::<(), BoxError>(())
939     /// #    };
940     /// # }
941     /// ```
942     ///
    /// Note that you normally wouldn't implement timeouts like this; use [`Timeout`] instead.
944     ///
945     /// [`call`]: crate::Service::call
946     /// [`Timeout`]: crate::timeout::Timeout
map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F> where Self: Sized, F: FnMut(Self::Future) -> Fut, Error: From<Self::Error>, Fut: Future<Output = Result<Response, Error>>,947     fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
948     where
949         Self: Sized,
950         F: FnMut(Self::Future) -> Fut,
951         Error: From<Self::Error>,
952         Fut: Future<Output = Result<Response, Error>>,
953     {
954         MapFuture::new(self, f)
955     }
956 
957     /// Convert the service into a [`Service`] + [`Send`] trait object.
958     ///
959     /// See [`BoxService`] for more details.
960     ///
961     /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
962     /// can be used instead, to produce a boxed service which will also
963     /// implement [`Clone`].
964     ///
965     /// # Example
966     ///
967     /// ```
968     /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
969     /// #
970     /// # struct Request;
971     /// # struct Response;
972     /// # impl Response {
973     /// #     fn new() -> Self { Self }
974     /// # }
975     ///
976     /// let service = service_fn(|req: Request| async {
977     ///     Ok::<_, BoxError>(Response::new())
978     /// });
979     ///
980     /// let service: BoxService<Request, Response, BoxError> = service
981     ///     .map_request(|req| {
982     ///         println!("received request");
983     ///         req
984     ///     })
985     ///     .map_response(|res| {
986     ///         println!("response produced");
987     ///         res
988     ///     })
989     ///     .boxed();
990     /// # let service = assert_service(service);
991     /// # fn assert_service<S, R>(svc: S) -> S
992     /// # where S: Service<R> { svc }
993     /// ```
994     ///
995     /// [`Service`]: crate::Service
996     /// [`boxed_clone`]: Self::boxed_clone
boxed(self) -> BoxService<Request, Self::Response, Self::Error> where Self: Sized + Send + 'static, Self::Future: Send + 'static,997     fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
998     where
999         Self: Sized + Send + 'static,
1000         Self::Future: Send + 'static,
1001     {
1002         BoxService::new(self)
1003     }
1004 
1005     /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
1006     ///
1007     /// This is similar to the [`boxed`] method, but it requires that `Self` implement
1008     /// [`Clone`], and the returned boxed service implements [`Clone`].
1009     /// See [`BoxCloneService`] for more details.
1010     ///
1011     /// # Example
1012     ///
1013     /// ```
1014     /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
1015     /// #
1016     /// # struct Request;
1017     /// # struct Response;
1018     /// # impl Response {
1019     /// #     fn new() -> Self { Self }
1020     /// # }
1021     ///
1022     /// let service = service_fn(|req: Request| async {
1023     ///     Ok::<_, BoxError>(Response::new())
1024     /// });
1025     ///
1026     /// let service: BoxCloneService<Request, Response, BoxError> = service
1027     ///     .map_request(|req| {
1028     ///         println!("received request");
1029     ///         req
1030     ///     })
1031     ///     .map_response(|res| {
1032     ///         println!("response produced");
1033     ///         res
1034     ///     })
1035     ///     .boxed_clone();
1036     ///
1037     /// // The boxed service can still be cloned.
1038     /// service.clone();
1039     /// # let service = assert_service(service);
1040     /// # fn assert_service<S, R>(svc: S) -> S
1041     /// # where S: Service<R> { svc }
1042     /// ```
1043     ///
1044     /// [`Service`]: crate::Service
1045     /// [`boxed`]: Self::boxed
boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error> where Self: Clone + Sized + Send + 'static, Self::Future: Send + 'static,1046     fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
1047     where
1048         Self: Clone + Sized + Send + 'static,
1049         Self::Future: Send + 'static,
1050     {
1051         BoxCloneService::new(self)
1052     }
1053 }
1054 
1055 impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
1056 
1057 /// Convert an `Option<Layer>` into a [`Layer`].
1058 ///
1059 /// ```
1060 /// # use std::time::Duration;
1061 /// # use tower::Service;
1062 /// # use tower::builder::ServiceBuilder;
1063 /// use tower::util::option_layer;
1064 /// # use tower::timeout::TimeoutLayer;
1065 /// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
1066 /// # let timeout = Some(Duration::new(10, 0));
1067 /// // Layer to apply a timeout if configured
1068 /// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
1069 ///
1070 /// ServiceBuilder::new()
1071 ///     .layer(maybe_timeout)
1072 ///     .service(svc);
1073 /// # }
1074 /// ```
1075 ///
1076 /// [`Layer`]: crate::layer::Layer
option_layer<L>(layer: Option<L>) -> Either<L, Identity>1077 pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
1078     if let Some(layer) = layer {
1079         Either::A(layer)
1080     } else {
1081         Either::B(Identity::new())
1082     }
1083 }
1084