1 #![cfg(feature = "retry")]
2 #[path = "../support.rs"]
3 mod support;
4
5 use futures_util::future;
6 use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
7 use tower::retry::Policy;
8 use tower_test::{assert_request_eq, mock};
9
10 #[tokio::test(flavor = "current_thread")]
retry_errors()11 async fn retry_errors() {
12 let _t = support::trace_init();
13
14 let (mut service, mut handle) = new_service(RetryErrors);
15
16 assert_ready_ok!(service.poll_ready());
17
18 let mut fut = task::spawn(service.call("hello"));
19
20 assert_request_eq!(handle, "hello").send_error("retry me");
21
22 assert_pending!(fut.poll());
23
24 assert_request_eq!(handle, "hello").send_response("world");
25
26 assert_eq!(fut.into_inner().await.unwrap(), "world");
27 }
28
29 #[tokio::test(flavor = "current_thread")]
retry_limit()30 async fn retry_limit() {
31 let _t = support::trace_init();
32
33 let (mut service, mut handle) = new_service(Limit(2));
34
35 assert_ready_ok!(service.poll_ready());
36
37 let mut fut = task::spawn(service.call("hello"));
38
39 assert_request_eq!(handle, "hello").send_error("retry 1");
40 assert_pending!(fut.poll());
41
42 assert_request_eq!(handle, "hello").send_error("retry 2");
43 assert_pending!(fut.poll());
44
45 assert_request_eq!(handle, "hello").send_error("retry 3");
46 assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 3");
47 }
48
49 #[tokio::test(flavor = "current_thread")]
retry_error_inspection()50 async fn retry_error_inspection() {
51 let _t = support::trace_init();
52
53 let (mut service, mut handle) = new_service(UnlessErr("reject"));
54
55 assert_ready_ok!(service.poll_ready());
56 let mut fut = task::spawn(service.call("hello"));
57
58 assert_request_eq!(handle, "hello").send_error("retry 1");
59 assert_pending!(fut.poll());
60
61 assert_request_eq!(handle, "hello").send_error("reject");
62 assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject");
63 }
64
65 #[tokio::test(flavor = "current_thread")]
retry_cannot_clone_request()66 async fn retry_cannot_clone_request() {
67 let _t = support::trace_init();
68
69 let (mut service, mut handle) = new_service(CannotClone);
70
71 assert_ready_ok!(service.poll_ready());
72 let mut fut = task::spawn(service.call("hello"));
73
74 assert_request_eq!(handle, "hello").send_error("retry 1");
75 assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1");
76 }
77
78 #[tokio::test(flavor = "current_thread")]
success_with_cannot_clone()79 async fn success_with_cannot_clone() {
80 let _t = support::trace_init();
81
82 // Even though the request couldn't be cloned, if the first request succeeds,
83 // it should succeed overall.
84 let (mut service, mut handle) = new_service(CannotClone);
85
86 assert_ready_ok!(service.poll_ready());
87 let mut fut = task::spawn(service.call("hello"));
88
89 assert_request_eq!(handle, "hello").send_response("world");
90 assert_ready_ok!(fut.poll(), "world");
91 }
92
93 type Req = &'static str;
94 type Res = &'static str;
95 type InnerError = &'static str;
96 type Error = Box<dyn std::error::Error + Send + Sync>;
97 type Mock = mock::Mock<Req, Res>;
98 type Handle = mock::Handle<Req, Res>;
99
100 #[derive(Clone)]
101 struct RetryErrors;
102
103 impl Policy<Req, Res, Error> for RetryErrors {
104 type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>105 fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
106 if result.is_err() {
107 Some(future::ready(RetryErrors))
108 } else {
109 None
110 }
111 }
112
clone_request(&self, req: &Req) -> Option<Req>113 fn clone_request(&self, req: &Req) -> Option<Req> {
114 Some(*req)
115 }
116 }
117
118 #[derive(Clone)]
119 struct Limit(usize);
120
121 impl Policy<Req, Res, Error> for Limit {
122 type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>123 fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
124 if result.is_err() && self.0 > 0 {
125 Some(future::ready(Limit(self.0 - 1)))
126 } else {
127 None
128 }
129 }
130
clone_request(&self, req: &Req) -> Option<Req>131 fn clone_request(&self, req: &Req) -> Option<Req> {
132 Some(*req)
133 }
134 }
135
136 #[derive(Clone)]
137 struct UnlessErr(InnerError);
138
139 impl Policy<Req, Res, Error> for UnlessErr {
140 type Future = future::Ready<Self>;
retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future>141 fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
142 result.err().and_then(|err| {
143 if err.to_string() != self.0 {
144 Some(future::ready(self.clone()))
145 } else {
146 None
147 }
148 })
149 }
150
clone_request(&self, req: &Req) -> Option<Req>151 fn clone_request(&self, req: &Req) -> Option<Req> {
152 Some(*req)
153 }
154 }
155
156 #[derive(Clone)]
157 struct CannotClone;
158
159 impl Policy<Req, Res, Error> for CannotClone {
160 type Future = future::Ready<Self>;
retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future>161 fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
162 unreachable!("retry cannot be called since request isn't cloned");
163 }
164
clone_request(&self, _req: &Req) -> Option<Req>165 fn clone_request(&self, _req: &Req) -> Option<Req> {
166 None
167 }
168 }
169
new_service<P: Policy<Req, Res, Error> + Clone>( policy: P, ) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle)170 fn new_service<P: Policy<Req, Res, Error> + Clone>(
171 policy: P,
172 ) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle) {
173 let retry = tower::retry::RetryLayer::new(policy);
174 mock::spawn_layer(retry)
175 }
176