1 #![cfg(feature = "buffer")]
2 #[path = "../support.rs"]
3 mod support;
4 use std::thread;
5 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
6 use tower::buffer::{error, Buffer};
7 use tower::{util::ServiceExt, Service};
8 use tower_test::{assert_request_eq, mock};
9
let_worker_work()10 fn let_worker_work() {
11 // Allow the Buffer's executor to do work
12 thread::sleep(::std::time::Duration::from_millis(100));
13 }
14
15 #[tokio::test(flavor = "current_thread")]
req_and_res()16 async fn req_and_res() {
17 let _t = support::trace_init();
18
19 let (mut service, mut handle) = new_service();
20
21 assert_ready_ok!(service.poll_ready());
22 let mut response = task::spawn(service.call("hello"));
23
24 assert_request_eq!(handle, "hello").send_response("world");
25
26 let_worker_work();
27 assert_eq!(assert_ready_ok!(response.poll()), "world");
28 }
29
30 #[tokio::test(flavor = "current_thread")]
clears_canceled_requests()31 async fn clears_canceled_requests() {
32 let _t = support::trace_init();
33
34 let (mut service, mut handle) = new_service();
35
36 handle.allow(1);
37
38 assert_ready_ok!(service.poll_ready());
39 let mut res1 = task::spawn(service.call("hello"));
40
41 let send_response1 = assert_request_eq!(handle, "hello");
42
43 // don't respond yet, new requests will get buffered
44 assert_ready_ok!(service.poll_ready());
45 let res2 = task::spawn(service.call("hello2"));
46
47 assert_pending!(handle.poll_request());
48
49 assert_ready_ok!(service.poll_ready());
50 let mut res3 = task::spawn(service.call("hello3"));
51
52 drop(res2);
53
54 send_response1.send_response("world");
55
56 let_worker_work();
57 assert_eq!(assert_ready_ok!(res1.poll()), "world");
58
59 // res2 was dropped, so it should have been canceled in the buffer
60 handle.allow(1);
61
62 assert_request_eq!(handle, "hello3").send_response("world3");
63
64 let_worker_work();
65 assert_eq!(assert_ready_ok!(res3.poll()), "world3");
66 }
67
68 #[tokio::test(flavor = "current_thread")]
when_inner_is_not_ready()69 async fn when_inner_is_not_ready() {
70 let _t = support::trace_init();
71
72 let (mut service, mut handle) = new_service();
73
74 // Make the service NotReady
75 handle.allow(0);
76
77 assert_ready_ok!(service.poll_ready());
78 let mut res1 = task::spawn(service.call("hello"));
79
80 let_worker_work();
81 assert_pending!(res1.poll());
82 assert_pending!(handle.poll_request());
83
84 handle.allow(1);
85
86 assert_request_eq!(handle, "hello").send_response("world");
87
88 let_worker_work();
89 assert_eq!(assert_ready_ok!(res1.poll()), "world");
90 }
91
92 #[tokio::test(flavor = "current_thread")]
when_inner_fails()93 async fn when_inner_fails() {
94 use std::error::Error as StdError;
95 let _t = support::trace_init();
96
97 let (mut service, mut handle) = new_service();
98
99 // Make the service NotReady
100 handle.allow(0);
101 handle.send_error("foobar");
102
103 assert_ready_ok!(service.poll_ready());
104 let mut res1 = task::spawn(service.call("hello"));
105
106 let_worker_work();
107 let e = assert_ready_err!(res1.poll());
108 if let Some(e) = e.downcast_ref::<error::ServiceError>() {
109 let e = e.source().unwrap();
110
111 assert_eq!(e.to_string(), "foobar");
112 } else {
113 panic!("unexpected error type: {:?}", e);
114 }
115 }
116
117 #[tokio::test(flavor = "current_thread")]
poll_ready_when_worker_is_dropped_early()118 async fn poll_ready_when_worker_is_dropped_early() {
119 let _t = support::trace_init();
120
121 let (service, _handle) = mock::pair::<(), ()>();
122
123 let (service, worker) = Buffer::pair(service, 1);
124
125 let mut service = mock::Spawn::new(service);
126
127 drop(worker);
128
129 let err = assert_ready_err!(service.poll_ready());
130
131 assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
132 }
133
134 #[tokio::test(flavor = "current_thread")]
response_future_when_worker_is_dropped_early()135 async fn response_future_when_worker_is_dropped_early() {
136 let _t = support::trace_init();
137
138 let (service, mut handle) = mock::pair::<_, ()>();
139
140 let (service, worker) = Buffer::pair(service, 1);
141
142 let mut service = mock::Spawn::new(service);
143
144 // keep the request in the worker
145 handle.allow(0);
146 assert_ready_ok!(service.poll_ready());
147 let mut response = task::spawn(service.call("hello"));
148
149 drop(worker);
150
151 let_worker_work();
152 let err = assert_ready_err!(response.poll());
153 assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
154 }
155
156 #[tokio::test(flavor = "current_thread")]
waits_for_channel_capacity()157 async fn waits_for_channel_capacity() {
158 let _t = support::trace_init();
159
160 let (service, mut handle) = mock::pair::<&'static str, &'static str>();
161
162 let (service, worker) = Buffer::pair(service, 3);
163
164 let mut service = mock::Spawn::new(service);
165 let mut worker = task::spawn(worker);
166
167 // keep requests in the worker
168 handle.allow(0);
169 assert_ready_ok!(service.poll_ready());
170 let mut response1 = task::spawn(service.call("hello"));
171 assert_pending!(worker.poll());
172
173 assert_ready_ok!(service.poll_ready());
174 let mut response2 = task::spawn(service.call("hello"));
175 assert_pending!(worker.poll());
176
177 assert_ready_ok!(service.poll_ready());
178 let mut response3 = task::spawn(service.call("hello"));
179 assert_pending!(service.poll_ready());
180 assert_pending!(worker.poll());
181
182 handle.allow(1);
183 assert_pending!(worker.poll());
184
185 handle
186 .next_request()
187 .await
188 .unwrap()
189 .1
190 .send_response("world");
191 assert_pending!(worker.poll());
192 assert_ready_ok!(response1.poll());
193
194 assert_ready_ok!(service.poll_ready());
195 let mut response4 = task::spawn(service.call("hello"));
196 assert_pending!(worker.poll());
197
198 handle.allow(3);
199 assert_pending!(worker.poll());
200
201 handle
202 .next_request()
203 .await
204 .unwrap()
205 .1
206 .send_response("world");
207 assert_pending!(worker.poll());
208 assert_ready_ok!(response2.poll());
209
210 assert_pending!(worker.poll());
211 handle
212 .next_request()
213 .await
214 .unwrap()
215 .1
216 .send_response("world");
217 assert_pending!(worker.poll());
218 assert_ready_ok!(response3.poll());
219
220 assert_pending!(worker.poll());
221 handle
222 .next_request()
223 .await
224 .unwrap()
225 .1
226 .send_response("world");
227 assert_pending!(worker.poll());
228 assert_ready_ok!(response4.poll());
229 }
230
231 #[tokio::test(flavor = "current_thread")]
wakes_pending_waiters_on_close()232 async fn wakes_pending_waiters_on_close() {
233 let _t = support::trace_init();
234
235 let (service, mut handle) = mock::pair::<_, ()>();
236
237 let (mut service, worker) = Buffer::pair(service, 1);
238 let mut worker = task::spawn(worker);
239
240 // keep the request in the worker
241 handle.allow(0);
242 let service1 = service.ready().await.unwrap();
243 assert_pending!(worker.poll());
244 let mut response = task::spawn(service1.call("hello"));
245
246 let mut service1 = service.clone();
247 let mut ready1 = task::spawn(service1.ready());
248 assert_pending!(worker.poll());
249 assert_pending!(ready1.poll(), "no capacity");
250
251 let mut service1 = service.clone();
252 let mut ready2 = task::spawn(service1.ready());
253 assert_pending!(worker.poll());
254 assert_pending!(ready2.poll(), "no capacity");
255
256 // kill the worker task
257 drop(worker);
258
259 let err = assert_ready_err!(response.poll());
260 assert!(
261 err.is::<error::Closed>(),
262 "response should fail with a Closed, got: {:?}",
263 err
264 );
265
266 assert!(
267 ready1.is_woken(),
268 "dropping worker should wake ready task 1"
269 );
270 let err = assert_ready_err!(ready1.poll());
271 assert!(
272 err.is::<error::Closed>(),
273 "ready 1 should fail with a Closed, got: {:?}",
274 err
275 );
276
277 assert!(
278 ready2.is_woken(),
279 "dropping worker should wake ready task 2"
280 );
281 let err = assert_ready_err!(ready1.poll());
282 assert!(
283 err.is::<error::Closed>(),
284 "ready 2 should fail with a Closed, got: {:?}",
285 err
286 );
287 }
288
289 #[tokio::test(flavor = "current_thread")]
wakes_pending_waiters_on_failure()290 async fn wakes_pending_waiters_on_failure() {
291 let _t = support::trace_init();
292
293 let (service, mut handle) = mock::pair::<_, ()>();
294
295 let (mut service, worker) = Buffer::pair(service, 1);
296 let mut worker = task::spawn(worker);
297
298 // keep the request in the worker
299 handle.allow(0);
300 let service1 = service.ready().await.unwrap();
301 assert_pending!(worker.poll());
302 let mut response = task::spawn(service1.call("hello"));
303
304 let mut service1 = service.clone();
305 let mut ready1 = task::spawn(service1.ready());
306 assert_pending!(worker.poll());
307 assert_pending!(ready1.poll(), "no capacity");
308
309 let mut service1 = service.clone();
310 let mut ready2 = task::spawn(service1.ready());
311 assert_pending!(worker.poll());
312 assert_pending!(ready2.poll(), "no capacity");
313
314 // fail the inner service
315 handle.send_error("foobar");
316 // worker task terminates
317 assert_ready!(worker.poll());
318
319 let err = assert_ready_err!(response.poll());
320 assert!(
321 err.is::<error::ServiceError>(),
322 "response should fail with a ServiceError, got: {:?}",
323 err
324 );
325
326 assert!(
327 ready1.is_woken(),
328 "dropping worker should wake ready task 1"
329 );
330 let err = assert_ready_err!(ready1.poll());
331 assert!(
332 err.is::<error::ServiceError>(),
333 "ready 1 should fail with a ServiceError, got: {:?}",
334 err
335 );
336
337 assert!(
338 ready2.is_woken(),
339 "dropping worker should wake ready task 2"
340 );
341 let err = assert_ready_err!(ready1.poll());
342 assert!(
343 err.is::<error::ServiceError>(),
344 "ready 2 should fail with a ServiceError, got: {:?}",
345 err
346 );
347 }
348
349 #[tokio::test(flavor = "current_thread")]
propagates_trace_spans()350 async fn propagates_trace_spans() {
351 use tower::util::ServiceExt;
352 use tracing::Instrument;
353
354 let _t = support::trace_init();
355
356 let span = tracing::info_span!("my_span");
357
358 let service = support::AssertSpanSvc::new(span.clone());
359 let (service, worker) = Buffer::pair(service, 5);
360 let worker = tokio::spawn(worker);
361
362 let result = tokio::spawn(service.oneshot(()).instrument(span));
363
364 result.await.expect("service panicked").expect("failed");
365 worker.await.expect("worker panicked");
366 }
367
368 #[tokio::test(flavor = "current_thread")]
doesnt_leak_permits()369 async fn doesnt_leak_permits() {
370 let _t = support::trace_init();
371
372 let (service, mut handle) = mock::pair::<_, ()>();
373
374 let (mut service1, worker) = Buffer::pair(service, 2);
375 let mut worker = task::spawn(worker);
376 let mut service2 = service1.clone();
377 let mut service3 = service1.clone();
378
379 // Attempt to poll the first clone of the buffer to readiness multiple
380 // times. These should all succeed, because the readiness is never
381 // *consumed* --- no request is sent.
382 assert_ready_ok!(task::spawn(service1.ready()).poll());
383 assert_ready_ok!(task::spawn(service1.ready()).poll());
384 assert_ready_ok!(task::spawn(service1.ready()).poll());
385
386 // It should also be possible to drive the second clone of the service to
387 // readiness --- it should only acquire one permit, as well.
388 assert_ready_ok!(task::spawn(service2.ready()).poll());
389 assert_ready_ok!(task::spawn(service2.ready()).poll());
390 assert_ready_ok!(task::spawn(service2.ready()).poll());
391
392 // The third clone *doesn't* poll ready, because the first two clones have
393 // each acquired one permit.
394 let mut ready3 = task::spawn(service3.ready());
395 assert_pending!(ready3.poll());
396
397 // Consume the first service's readiness.
398 let mut response = task::spawn(service1.call(()));
399 handle.allow(1);
400 assert_pending!(worker.poll());
401
402 handle.next_request().await.unwrap().1.send_response(());
403 assert_pending!(worker.poll());
404 assert_ready_ok!(response.poll());
405
406 // Now, the third service should acquire a permit...
407 assert!(ready3.is_woken());
408 assert_ready_ok!(ready3.poll());
409 }
410
411 type Mock = mock::Mock<&'static str, &'static str>;
412 type Handle = mock::Handle<&'static str, &'static str>;
413
new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle)414 fn new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
415 // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
416 new_service_with_bound(10)
417 }
418
new_service_with_bound(bound: usize) -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle)419 fn new_service_with_bound(bound: usize) -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
420 mock::spawn_with(|s| {
421 let (svc, worker) = Buffer::pair(s, bound);
422
423 thread::spawn(move || {
424 let mut fut = tokio_test::task::spawn(worker);
425 while fut.poll().is_pending() {}
426 });
427
428 svc
429 })
430 }
431