1 #![cfg(feature = "retry")]
2 #[path = "../support.rs"]
3 mod support;
4 
5 use futures_util::future;
6 use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
7 use tower::retry::Policy;
8 use tower_test::{assert_request_eq, mock};
9 
10 #[tokio::test(flavor = "current_thread")]
retry_errors()11 async fn retry_errors() {
12     let _t = support::trace_init();
13 
14     let (mut service, mut handle) = new_service(RetryErrors);
15 
16     assert_ready_ok!(service.poll_ready());
17 
18     let mut fut = task::spawn(service.call("hello"));
19 
20     assert_request_eq!(handle, "hello").send_error("retry me");
21 
22     assert_pending!(fut.poll());
23 
24     assert_request_eq!(handle, "hello").send_response("world");
25 
26     assert_eq!(fut.into_inner().await.unwrap(), "world");
27 }
28 
29 #[tokio::test(flavor = "current_thread")]
retry_limit()30 async fn retry_limit() {
31     let _t = support::trace_init();
32 
33     let (mut service, mut handle) = new_service(Limit(2));
34 
35     assert_ready_ok!(service.poll_ready());
36 
37     let mut fut = task::spawn(service.call("hello"));
38 
39     assert_request_eq!(handle, "hello").send_error("retry 1");
40     assert_pending!(fut.poll());
41 
42     assert_request_eq!(handle, "hello").send_error("retry 2");
43     assert_pending!(fut.poll());
44 
45     assert_request_eq!(handle, "hello").send_error("retry 3");
46     assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 3");
47 }
48 
49 #[tokio::test(flavor = "current_thread")]
retry_error_inspection()50 async fn retry_error_inspection() {
51     let _t = support::trace_init();
52 
53     let (mut service, mut handle) = new_service(UnlessErr("reject"));
54 
55     assert_ready_ok!(service.poll_ready());
56     let mut fut = task::spawn(service.call("hello"));
57 
58     assert_request_eq!(handle, "hello").send_error("retry 1");
59     assert_pending!(fut.poll());
60 
61     assert_request_eq!(handle, "hello").send_error("reject");
62     assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject");
63 }
64 
65 #[tokio::test(flavor = "current_thread")]
retry_cannot_clone_request()66 async fn retry_cannot_clone_request() {
67     let _t = support::trace_init();
68 
69     let (mut service, mut handle) = new_service(CannotClone);
70 
71     assert_ready_ok!(service.poll_ready());
72     let mut fut = task::spawn(service.call("hello"));
73 
74     assert_request_eq!(handle, "hello").send_error("retry 1");
75     assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1");
76 }
77 
78 #[tokio::test(flavor = "current_thread")]
success_with_cannot_clone()79 async fn success_with_cannot_clone() {
80     let _t = support::trace_init();
81 
82     // Even though the request couldn't be cloned, if the first request succeeds,
83     // it should succeed overall.
84     let (mut service, mut handle) = new_service(CannotClone);
85 
86     assert_ready_ok!(service.poll_ready());
87     let mut fut = task::spawn(service.call("hello"));
88 
89     assert_request_eq!(handle, "hello").send_response("world");
90     assert_ready_ok!(fut.poll(), "world");
91 }
92 
93 type Req = &'static str;
94 type Res = &'static str;
95 type InnerError = &'static str;
96 type Error = Box<dyn std::error::Error + Send + Sync>;
97 type Mock = mock::Mock<Req, Res>;
98 type Handle = mock::Handle<Req, Res>;
99 
100 #[derive(Clone)]
101 struct RetryErrors;
102 
103 impl Policy<Req, Res, Error> for RetryErrors {
104     type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>105     fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
106         if result.is_err() {
107             Some(future::ready(RetryErrors))
108         } else {
109             None
110         }
111     }
112 
clone_request(&self, req: &Req) -> Option<Req>113     fn clone_request(&self, req: &Req) -> Option<Req> {
114         Some(*req)
115     }
116 }
117 
118 #[derive(Clone)]
119 struct Limit(usize);
120 
121 impl Policy<Req, Res, Error> for Limit {
122     type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>123     fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
124         if result.is_err() && self.0 > 0 {
125             Some(future::ready(Limit(self.0 - 1)))
126         } else {
127             None
128         }
129     }
130 
clone_request(&self, req: &Req) -> Option<Req>131     fn clone_request(&self, req: &Req) -> Option<Req> {
132         Some(*req)
133     }
134 }
135 
136 #[derive(Clone)]
137 struct UnlessErr(InnerError);
138 
139 impl Policy<Req, Res, Error> for UnlessErr {
140     type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>141     fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
142         result.err().and_then(|err| {
143             if err.to_string() != self.0 {
144                 Some(future::ready(self.clone()))
145             } else {
146                 None
147             }
148         })
149     }
150 
clone_request(&self, req: &Req) -> Option<Req>151     fn clone_request(&self, req: &Req) -> Option<Req> {
152         Some(*req)
153     }
154 }
155 
156 #[derive(Clone)]
157 struct CannotClone;
158 
159 impl Policy<Req, Res, Error> for CannotClone {
160     type Future = future::Ready<Self>;
retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future>161     fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
162         unreachable!("retry cannot be called since request isn't cloned");
163     }
164 
clone_request(&self, _req: &Req) -> Option<Req>165     fn clone_request(&self, _req: &Req) -> Option<Req> {
166         None
167     }
168 }
169 
new_service<P: Policy<Req, Res, Error> + Clone>( policy: P, ) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle)170 fn new_service<P: Policy<Req, Res, Error> + Clone>(
171     policy: P,
172 ) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle) {
173     let retry = tower::retry::RetryLayer::new(policy);
174     mock::spawn_layer(retry)
175 }
176