1 #![cfg(feature = "ready-cache")]
2 #[path = "../support.rs"]
3 mod support;
4 
5 use std::pin::Pin;
6 use tokio_test::{assert_pending, assert_ready, task};
7 use tower::ready_cache::{error, ReadyCache};
8 use tower_test::mock;
9 
10 type Req = &'static str;
11 type Mock = mock::Mock<Req, Req>;
12 
13 #[test]
poll_ready_inner_failure()14 fn poll_ready_inner_failure() {
15     let _t = support::trace_init();
16 
17     let mut task = task::spawn(());
18     let mut cache = ReadyCache::<usize, Mock, Req>::default();
19 
20     let (service0, mut handle0) = mock::pair::<Req, Req>();
21     handle0.send_error("doom");
22     cache.push(0, service0);
23 
24     let (service1, mut handle1) = mock::pair::<Req, Req>();
25     handle1.allow(1);
26     cache.push(1, service1);
27 
28     let failed = assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap_err();
29 
30     assert_eq!(failed.0, 0);
31     assert_eq!(format!("{}", failed.1), "doom");
32 
33     assert_eq!(cache.len(), 1);
34 }
35 
36 #[test]
poll_ready_not_ready()37 fn poll_ready_not_ready() {
38     let _t = support::trace_init();
39 
40     let mut task = task::spawn(());
41     let mut cache = ReadyCache::<usize, Mock, Req>::default();
42 
43     let (service0, mut handle0) = mock::pair::<Req, Req>();
44     handle0.allow(0);
45     cache.push(0, service0);
46 
47     let (service1, mut handle1) = mock::pair::<Req, Req>();
48     handle1.allow(0);
49     cache.push(1, service1);
50 
51     assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
52 
53     assert_eq!(cache.ready_len(), 0);
54     assert_eq!(cache.pending_len(), 2);
55     assert_eq!(cache.len(), 2);
56 }
57 
58 #[test]
poll_ready_promotes_inner()59 fn poll_ready_promotes_inner() {
60     let _t = support::trace_init();
61 
62     let mut task = task::spawn(());
63     let mut cache = ReadyCache::<usize, Mock, Req>::default();
64 
65     let (service0, mut handle0) = mock::pair::<Req, Req>();
66     handle0.allow(1);
67     cache.push(0, service0);
68 
69     let (service1, mut handle1) = mock::pair::<Req, Req>();
70     handle1.allow(1);
71     cache.push(1, service1);
72 
73     assert_eq!(cache.ready_len(), 0);
74     assert_eq!(cache.pending_len(), 2);
75     assert_eq!(cache.len(), 2);
76 
77     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
78 
79     assert_eq!(cache.ready_len(), 2);
80     assert_eq!(cache.pending_len(), 0);
81     assert_eq!(cache.len(), 2);
82 }
83 
84 #[test]
evict_ready_then_error()85 fn evict_ready_then_error() {
86     let _t = support::trace_init();
87 
88     let mut task = task::spawn(());
89     let mut cache = ReadyCache::<usize, Mock, Req>::default();
90 
91     let (service, mut handle) = mock::pair::<Req, Req>();
92     handle.allow(0);
93     cache.push(0, service);
94 
95     assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
96 
97     handle.allow(1);
98     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
99 
100     handle.send_error("doom");
101     assert!(cache.evict(&0));
102 
103     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
104 }
105 
106 #[test]
evict_pending_then_error()107 fn evict_pending_then_error() {
108     let _t = support::trace_init();
109 
110     let mut task = task::spawn(());
111     let mut cache = ReadyCache::<usize, Mock, Req>::default();
112 
113     let (service, mut handle) = mock::pair::<Req, Req>();
114     handle.allow(0);
115     cache.push(0, service);
116 
117     assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
118 
119     handle.send_error("doom");
120     assert!(cache.evict(&0));
121 
122     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
123 }
124 
125 #[test]
push_then_evict()126 fn push_then_evict() {
127     let _t = support::trace_init();
128 
129     let mut task = task::spawn(());
130     let mut cache = ReadyCache::<usize, Mock, Req>::default();
131 
132     let (service, mut handle) = mock::pair::<Req, Req>();
133     handle.allow(0);
134     cache.push(0, service);
135     handle.send_error("doom");
136     assert!(cache.evict(&0));
137 
138     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
139 }
140 
141 #[test]
error_after_promote()142 fn error_after_promote() {
143     let _t = support::trace_init();
144 
145     let mut task = task::spawn(());
146     let mut cache = ReadyCache::<usize, Mock, Req>::default();
147 
148     let (service, mut handle) = mock::pair::<Req, Req>();
149     handle.allow(0);
150     cache.push(0, service);
151 
152     assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
153 
154     handle.allow(1);
155     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
156 
157     handle.send_error("doom");
158     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
159 }
160 
161 #[test]
duplicate_key_by_index()162 fn duplicate_key_by_index() {
163     let _t = support::trace_init();
164 
165     let mut task = task::spawn(());
166     let mut cache = ReadyCache::<usize, Mock, Req>::default();
167 
168     let (service0, mut handle0) = mock::pair::<Req, Req>();
169     handle0.allow(1);
170     cache.push(0, service0);
171 
172     let (service1, mut handle1) = mock::pair::<Req, Req>();
173     handle1.allow(1);
174     // this push should replace the old service (service0)
175     cache.push(0, service1);
176 
177     // this evict should evict service1
178     cache.evict(&0);
179 
180     // poll_pending should complete (there are no remaining pending services)
181     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
182     // but service 0 should not be ready (1 replaced, 1 evicted)
183     assert!(!task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
184 
185     let (service2, mut handle2) = mock::pair::<Req, Req>();
186     handle2.allow(1);
187     // this push should ensure replace the evicted service1
188     cache.push(0, service2);
189 
190     // there should be no more pending
191     assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
192     // _and_ service 0 should now be callable
193     assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
194 }
195 
196 // Tests https://github.com/tower-rs/tower/issues/415
197 #[tokio::test(flavor = "current_thread")]
cancelation_observed()198 async fn cancelation_observed() {
199     let mut cache = ReadyCache::default();
200     let mut handles = vec![];
201 
202     // NOTE This test passes at 129 items, but fails at 130 items (if coop
203     // schedulding interferes with cancelation).
204     for _ in 0..130 {
205         let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
206         handle.allow(1);
207         cache.push("ep0", svc);
208         handles.push(handle);
209     }
210 
211     struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
212     impl Unpin for Ready {}
213     impl std::future::Future for Ready {
214         type Output = Result<(), error::Failed<&'static str>>;
215         fn poll(
216             self: Pin<&mut Self>,
217             cx: &mut std::task::Context<'_>,
218         ) -> std::task::Poll<Self::Output> {
219             self.get_mut().0.poll_pending(cx)
220         }
221     }
222     Ready(cache).await.unwrap();
223 }
224