1 #![cfg(feature = "ready-cache")]
2 #[path = "../support.rs"]
3 mod support;
4
5 use std::pin::Pin;
6 use tokio_test::{assert_pending, assert_ready, task};
7 use tower::ready_cache::{error, ReadyCache};
8 use tower_test::mock;
9
10 type Req = &'static str;
11 type Mock = mock::Mock<Req, Req>;
12
13 #[test]
poll_ready_inner_failure()14 fn poll_ready_inner_failure() {
15 let _t = support::trace_init();
16
17 let mut task = task::spawn(());
18 let mut cache = ReadyCache::<usize, Mock, Req>::default();
19
20 let (service0, mut handle0) = mock::pair::<Req, Req>();
21 handle0.send_error("doom");
22 cache.push(0, service0);
23
24 let (service1, mut handle1) = mock::pair::<Req, Req>();
25 handle1.allow(1);
26 cache.push(1, service1);
27
28 let failed = assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap_err();
29
30 assert_eq!(failed.0, 0);
31 assert_eq!(format!("{}", failed.1), "doom");
32
33 assert_eq!(cache.len(), 1);
34 }
35
36 #[test]
poll_ready_not_ready()37 fn poll_ready_not_ready() {
38 let _t = support::trace_init();
39
40 let mut task = task::spawn(());
41 let mut cache = ReadyCache::<usize, Mock, Req>::default();
42
43 let (service0, mut handle0) = mock::pair::<Req, Req>();
44 handle0.allow(0);
45 cache.push(0, service0);
46
47 let (service1, mut handle1) = mock::pair::<Req, Req>();
48 handle1.allow(0);
49 cache.push(1, service1);
50
51 assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
52
53 assert_eq!(cache.ready_len(), 0);
54 assert_eq!(cache.pending_len(), 2);
55 assert_eq!(cache.len(), 2);
56 }
57
58 #[test]
poll_ready_promotes_inner()59 fn poll_ready_promotes_inner() {
60 let _t = support::trace_init();
61
62 let mut task = task::spawn(());
63 let mut cache = ReadyCache::<usize, Mock, Req>::default();
64
65 let (service0, mut handle0) = mock::pair::<Req, Req>();
66 handle0.allow(1);
67 cache.push(0, service0);
68
69 let (service1, mut handle1) = mock::pair::<Req, Req>();
70 handle1.allow(1);
71 cache.push(1, service1);
72
73 assert_eq!(cache.ready_len(), 0);
74 assert_eq!(cache.pending_len(), 2);
75 assert_eq!(cache.len(), 2);
76
77 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
78
79 assert_eq!(cache.ready_len(), 2);
80 assert_eq!(cache.pending_len(), 0);
81 assert_eq!(cache.len(), 2);
82 }
83
84 #[test]
evict_ready_then_error()85 fn evict_ready_then_error() {
86 let _t = support::trace_init();
87
88 let mut task = task::spawn(());
89 let mut cache = ReadyCache::<usize, Mock, Req>::default();
90
91 let (service, mut handle) = mock::pair::<Req, Req>();
92 handle.allow(0);
93 cache.push(0, service);
94
95 assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
96
97 handle.allow(1);
98 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
99
100 handle.send_error("doom");
101 assert!(cache.evict(&0));
102
103 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
104 }
105
106 #[test]
evict_pending_then_error()107 fn evict_pending_then_error() {
108 let _t = support::trace_init();
109
110 let mut task = task::spawn(());
111 let mut cache = ReadyCache::<usize, Mock, Req>::default();
112
113 let (service, mut handle) = mock::pair::<Req, Req>();
114 handle.allow(0);
115 cache.push(0, service);
116
117 assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
118
119 handle.send_error("doom");
120 assert!(cache.evict(&0));
121
122 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
123 }
124
125 #[test]
push_then_evict()126 fn push_then_evict() {
127 let _t = support::trace_init();
128
129 let mut task = task::spawn(());
130 let mut cache = ReadyCache::<usize, Mock, Req>::default();
131
132 let (service, mut handle) = mock::pair::<Req, Req>();
133 handle.allow(0);
134 cache.push(0, service);
135 handle.send_error("doom");
136 assert!(cache.evict(&0));
137
138 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
139 }
140
141 #[test]
error_after_promote()142 fn error_after_promote() {
143 let _t = support::trace_init();
144
145 let mut task = task::spawn(());
146 let mut cache = ReadyCache::<usize, Mock, Req>::default();
147
148 let (service, mut handle) = mock::pair::<Req, Req>();
149 handle.allow(0);
150 cache.push(0, service);
151
152 assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
153
154 handle.allow(1);
155 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
156
157 handle.send_error("doom");
158 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
159 }
160
161 #[test]
duplicate_key_by_index()162 fn duplicate_key_by_index() {
163 let _t = support::trace_init();
164
165 let mut task = task::spawn(());
166 let mut cache = ReadyCache::<usize, Mock, Req>::default();
167
168 let (service0, mut handle0) = mock::pair::<Req, Req>();
169 handle0.allow(1);
170 cache.push(0, service0);
171
172 let (service1, mut handle1) = mock::pair::<Req, Req>();
173 handle1.allow(1);
174 // this push should replace the old service (service0)
175 cache.push(0, service1);
176
177 // this evict should evict service1
178 cache.evict(&0);
179
180 // poll_pending should complete (there are no remaining pending services)
181 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
182 // but service 0 should not be ready (1 replaced, 1 evicted)
183 assert!(!task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
184
185 let (service2, mut handle2) = mock::pair::<Req, Req>();
186 handle2.allow(1);
187 // this push should ensure replace the evicted service1
188 cache.push(0, service2);
189
190 // there should be no more pending
191 assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
192 // _and_ service 0 should now be callable
193 assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
194 }
195
196 // Tests https://github.com/tower-rs/tower/issues/415
197 #[tokio::test(flavor = "current_thread")]
cancelation_observed()198 async fn cancelation_observed() {
199 let mut cache = ReadyCache::default();
200 let mut handles = vec![];
201
202 // NOTE This test passes at 129 items, but fails at 130 items (if coop
203 // schedulding interferes with cancelation).
204 for _ in 0..130 {
205 let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
206 handle.allow(1);
207 cache.push("ep0", svc);
208 handles.push(handle);
209 }
210
211 struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
212 impl Unpin for Ready {}
213 impl std::future::Future for Ready {
214 type Output = Result<(), error::Failed<&'static str>>;
215 fn poll(
216 self: Pin<&mut Self>,
217 cx: &mut std::task::Context<'_>,
218 ) -> std::task::Poll<Self::Output> {
219 self.get_mut().0.poll_pending(cx)
220 }
221 }
222 Ready(cache).await.unwrap();
223 }
224