1 #![cfg(feature = "hedge")]
2 #[path = "../support.rs"]
3 mod support;
4
5 use std::time::Duration;
6 use tokio::time;
7 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
8 use tower::hedge::{Hedge, Policy};
9 use tower_test::{assert_request_eq, mock};
10
11 #[tokio::test(flavor = "current_thread")]
hedge_orig_completes_first()12 async fn hedge_orig_completes_first() {
13 let _t = support::trace_init();
14 time::pause();
15
16 let (mut service, mut handle) = new_service(TestPolicy);
17
18 assert_ready_ok!(service.poll_ready());
19 let mut fut = task::spawn(service.call("orig"));
20
21 // Check that orig request has been issued.
22 let req = assert_request_eq!(handle, "orig");
23 // Check fut is not ready.
24 assert_pending!(fut.poll());
25
26 // Check hedge has not been issued.
27 assert_pending!(handle.poll_request());
28 time::advance(Duration::from_millis(11)).await;
29 // Check fut is not ready.
30 assert_pending!(fut.poll());
31 // Check that the hedge has been issued.
32 let _hedge_req = assert_request_eq!(handle, "orig");
33
34 req.send_response("orig-done");
35 // Check that fut gets orig response.
36 assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
37 }
38
39 #[tokio::test(flavor = "current_thread")]
hedge_hedge_completes_first()40 async fn hedge_hedge_completes_first() {
41 let _t = support::trace_init();
42 time::pause();
43
44 let (mut service, mut handle) = new_service(TestPolicy);
45
46 assert_ready_ok!(service.poll_ready());
47 let mut fut = task::spawn(service.call("orig"));
48
49 // Check that orig request has been issued.
50 let _req = assert_request_eq!(handle, "orig");
51
52 // Check fut is not ready.
53 assert_pending!(fut.poll());
54
55 // Check hedge has not been issued.
56 assert_pending!(handle.poll_request());
57 time::advance(Duration::from_millis(11)).await;
58 // Check fut is not ready.
59 assert_pending!(fut.poll());
60
61 // Check that the hedge has been issued.
62 let hedge_req = assert_request_eq!(handle, "orig");
63 hedge_req.send_response("hedge-done");
64 // Check that fut gets hedge response.
65 assert_eq!(assert_ready_ok!(fut.poll()), "hedge-done");
66 }
67
68 #[tokio::test(flavor = "current_thread")]
completes_before_hedge()69 async fn completes_before_hedge() {
70 let _t = support::trace_init();
71 let (mut service, mut handle) = new_service(TestPolicy);
72
73 assert_ready_ok!(service.poll_ready());
74 let mut fut = task::spawn(service.call("orig"));
75
76 // Check that orig request has been issued.
77 let req = assert_request_eq!(handle, "orig");
78 // Check fut is not ready.
79 assert_pending!(fut.poll());
80
81 req.send_response("orig-done");
82 // Check hedge has not been issued.
83 assert_pending!(handle.poll_request());
84 // Check that fut gets orig response.
85 assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
86 }
87
88 #[tokio::test(flavor = "current_thread")]
request_not_retyable()89 async fn request_not_retyable() {
90 let _t = support::trace_init();
91 time::pause();
92
93 let (mut service, mut handle) = new_service(TestPolicy);
94
95 assert_ready_ok!(service.poll_ready());
96 let mut fut = task::spawn(service.call(NOT_RETRYABLE));
97
98 // Check that orig request has been issued.
99 let req = assert_request_eq!(handle, NOT_RETRYABLE);
100 // Check fut is not ready.
101 assert_pending!(fut.poll());
102
103 // Check hedge has not been issued.
104 assert_pending!(handle.poll_request());
105 time::advance(Duration::from_millis(10)).await;
106 // Check fut is not ready.
107 assert_pending!(fut.poll());
108 // Check hedge has not been issued.
109 assert_pending!(handle.poll_request());
110
111 req.send_response("orig-done");
112 // Check that fut gets orig response.
113 assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
114 }
115
116 #[tokio::test(flavor = "current_thread")]
request_not_clonable()117 async fn request_not_clonable() {
118 let _t = support::trace_init();
119 time::pause();
120
121 let (mut service, mut handle) = new_service(TestPolicy);
122
123 assert_ready_ok!(service.poll_ready());
124 let mut fut = task::spawn(service.call(NOT_CLONABLE));
125
126 // Check that orig request has been issued.
127 let req = assert_request_eq!(handle, NOT_CLONABLE);
128 // Check fut is not ready.
129 assert_pending!(fut.poll());
130
131 // Check hedge has not been issued.
132 assert_pending!(handle.poll_request());
133 time::advance(Duration::from_millis(10)).await;
134 // Check fut is not ready.
135 assert_pending!(fut.poll());
136 // Check hedge has not been issued.
137 assert_pending!(handle.poll_request());
138
139 req.send_response("orig-done");
140 // Check that fut gets orig response.
141 assert_eq!(assert_ready_ok!(fut.poll()), "orig-done");
142 }
143
144 type Req = &'static str;
145 type Res = &'static str;
146 type Mock = tower_test::mock::Mock<Req, Res>;
147 type Handle = tower_test::mock::Handle<Req, Res>;
148
149 static NOT_RETRYABLE: &str = "NOT_RETRYABLE";
150 static NOT_CLONABLE: &str = "NOT_CLONABLE";
151
152 #[derive(Clone)]
153 struct TestPolicy;
154
155 impl tower::hedge::Policy<Req> for TestPolicy {
can_retry(&self, req: &Req) -> bool156 fn can_retry(&self, req: &Req) -> bool {
157 *req != NOT_RETRYABLE
158 }
159
clone_request(&self, req: &Req) -> Option<Req>160 fn clone_request(&self, req: &Req) -> Option<Req> {
161 if *req == NOT_CLONABLE {
162 None
163 } else {
164 Some(req)
165 }
166 }
167 }
168
new_service<P: Policy<Req> + Clone>(policy: P) -> (mock::Spawn<Hedge<Mock, P>>, Handle)169 fn new_service<P: Policy<Req> + Clone>(policy: P) -> (mock::Spawn<Hedge<Mock, P>>, Handle) {
170 let (service, handle) = tower_test::mock::pair();
171
172 let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10];
173
174 let service = Hedge::new_with_mock_latencies(
175 service,
176 policy,
177 10,
178 0.9,
179 Duration::from_secs(60),
180 &mock_latencies,
181 );
182
183 (mock::Spawn::new(service), handle)
184 }
185