1 #![cfg(feature = "hedge")]
2 #[path = "../support.rs"]
3 mod support;
4 
5 use std::time::Duration;
6 use tokio::time;
7 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
8 use tower::hedge::{Hedge, Policy};
9 use tower_test::{assert_request_eq, mock};
10 
11 #[tokio::test(flavor = "current_thread")]
hedge_orig_completes_first()12 async fn hedge_orig_completes_first() {
13     let _t = support::trace_init();
14     time::pause();
15 
16     let (mut service, mut handle) = new_service(TestPolicy);
17 
18     assert_ready_ok!(service.poll_ready());
19     let mut fut = task::spawn(service.call("orig"));
20 
21     // Check that orig request has been issued.
22     let req = assert_request_eq!(handle, "orig");
23     // Check fut is not ready.
24     assert_pending!(fut.poll());
25 
26     // Check hedge has not been issued.
27     assert_pending!(handle.poll_request());
28     time::advance(Duration::from_millis(11)).await;
29     // Check fut is not ready.
30     assert_pending!(fut.poll());
31     // Check that the hedge has been issued.
32     let _hedge_req = assert_request_eq!(handle, "orig");
33 
34     req.send_response("orig-done");
35     // Check that fut gets orig response.
36     assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
37 }
38 
39 #[tokio::test(flavor = "current_thread")]
hedge_hedge_completes_first()40 async fn hedge_hedge_completes_first() {
41     let _t = support::trace_init();
42     time::pause();
43 
44     let (mut service, mut handle) = new_service(TestPolicy);
45 
46     assert_ready_ok!(service.poll_ready());
47     let mut fut = task::spawn(service.call("orig"));
48 
49     // Check that orig request has been issued.
50     let _req = assert_request_eq!(handle, "orig");
51 
52     // Check fut is not ready.
53     assert_pending!(fut.poll());
54 
55     // Check hedge has not been issued.
56     assert_pending!(handle.poll_request());
57     time::advance(Duration::from_millis(11)).await;
58     // Check fut is not ready.
59     assert_pending!(fut.poll());
60 
61     // Check that the hedge has been issued.
62     let hedge_req = assert_request_eq!(handle, "orig");
63     hedge_req.send_response("hedge-done");
64     // Check that fut gets hedge response.
65     assert_eq!(assert_ready_ok!(fut.poll()), "hedge-done");
66 }
67 
68 #[tokio::test(flavor = "current_thread")]
completes_before_hedge()69 async fn completes_before_hedge() {
70     let _t = support::trace_init();
71     let (mut service, mut handle) = new_service(TestPolicy);
72 
73     assert_ready_ok!(service.poll_ready());
74     let mut fut = task::spawn(service.call("orig"));
75 
76     // Check that orig request has been issued.
77     let req = assert_request_eq!(handle, "orig");
78     // Check fut is not ready.
79     assert_pending!(fut.poll());
80 
81     req.send_response("orig-done");
82     // Check hedge has not been issued.
83     assert_pending!(handle.poll_request());
84     // Check that fut gets orig response.
85     assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
86 }
87 
88 #[tokio::test(flavor = "current_thread")]
request_not_retyable()89 async fn request_not_retyable() {
90     let _t = support::trace_init();
91     time::pause();
92 
93     let (mut service, mut handle) = new_service(TestPolicy);
94 
95     assert_ready_ok!(service.poll_ready());
96     let mut fut = task::spawn(service.call(NOT_RETRYABLE));
97 
98     // Check that orig request has been issued.
99     let req = assert_request_eq!(handle, NOT_RETRYABLE);
100     // Check fut is not ready.
101     assert_pending!(fut.poll());
102 
103     // Check hedge has not been issued.
104     assert_pending!(handle.poll_request());
105     time::advance(Duration::from_millis(10)).await;
106     // Check fut is not ready.
107     assert_pending!(fut.poll());
108     // Check hedge has not been issued.
109     assert_pending!(handle.poll_request());
110 
111     req.send_response("orig-done");
112     // Check that fut gets orig response.
113     assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
114 }
115 
116 #[tokio::test(flavor = "current_thread")]
request_not_clonable()117 async fn request_not_clonable() {
118     let _t = support::trace_init();
119     time::pause();
120 
121     let (mut service, mut handle) = new_service(TestPolicy);
122 
123     assert_ready_ok!(service.poll_ready());
124     let mut fut = task::spawn(service.call(NOT_CLONABLE));
125 
126     // Check that orig request has been issued.
127     let req = assert_request_eq!(handle, NOT_CLONABLE);
128     // Check fut is not ready.
129     assert_pending!(fut.poll());
130 
131     // Check hedge has not been issued.
132     assert_pending!(handle.poll_request());
133     time::advance(Duration::from_millis(10)).await;
134     // Check fut is not ready.
135     assert_pending!(fut.poll());
136     // Check hedge has not been issued.
137     assert_pending!(handle.poll_request());
138 
139     req.send_response("orig-done");
140     // Check that fut gets orig response.
141     assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
142 }
143 
144 type Req = &'static str;
145 type Res = &'static str;
146 type Mock = tower_test::mock::Mock<Req, Res>;
147 type Handle = tower_test::mock::Handle<Req, Res>;
148 
149 static NOT_RETRYABLE: &str = "NOT_RETRYABLE";
150 static NOT_CLONABLE: &str = "NOT_CLONABLE";
151 
152 #[derive(Clone)]
153 struct TestPolicy;
154 
155 impl tower::hedge::Policy<Req> for TestPolicy {
can_retry(&self, req: &Req) -> bool156     fn can_retry(&self, req: &Req) -> bool {
157         *req != NOT_RETRYABLE
158     }
159 
clone_request(&self, req: &Req) -> Option<Req>160     fn clone_request(&self, req: &Req) -> Option<Req> {
161         if *req == NOT_CLONABLE {
162             None
163         } else {
164             Some(req)
165         }
166     }
167 }
168 
new_service<P: Policy<Req> + Clone>(policy: P) -> (mock::Spawn<Hedge<Mock, P>>, Handle)169 fn new_service<P: Policy<Req> + Clone>(policy: P) -> (mock::Spawn<Hedge<Mock, P>>, Handle) {
170     let (service, handle) = tower_test::mock::pair();
171 
172     let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10];
173 
174     let service = Hedge::new_with_mock_latencies(
175         service,
176         policy,
177         10,
178         0.9,
179         Duration::from_secs(60),
180         &mock_latencies,
181     );
182 
183     (mock::Spawn::new(service), handle)
184 }
185