1 #![cfg(feature = "buffer")]
2 #[path = "../support.rs"]
3 mod support;
4 use std::thread;
5 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
6 use tower::buffer::{error, Buffer};
7 use tower::{util::ServiceExt, Service};
8 use tower_test::{assert_request_eq, mock};
9 
let_worker_work()10 fn let_worker_work() {
11     // Allow the Buffer's executor to do work
12     thread::sleep(::std::time::Duration::from_millis(100));
13 }
14 
15 #[tokio::test(flavor = "current_thread")]
req_and_res()16 async fn req_and_res() {
17     let _t = support::trace_init();
18 
19     let (mut service, mut handle) = new_service();
20 
21     assert_ready_ok!(service.poll_ready());
22     let mut response = task::spawn(service.call("hello"));
23 
24     assert_request_eq!(handle, "hello").send_response("world");
25 
26     let_worker_work();
27     assert_eq!(assert_ready_ok!(response.poll()), "world");
28 }
29 
30 #[tokio::test(flavor = "current_thread")]
clears_canceled_requests()31 async fn clears_canceled_requests() {
32     let _t = support::trace_init();
33 
34     let (mut service, mut handle) = new_service();
35 
36     handle.allow(1);
37 
38     assert_ready_ok!(service.poll_ready());
39     let mut res1 = task::spawn(service.call("hello"));
40 
41     let send_response1 = assert_request_eq!(handle, "hello");
42 
43     // don't respond yet, new requests will get buffered
44     assert_ready_ok!(service.poll_ready());
45     let res2 = task::spawn(service.call("hello2"));
46 
47     assert_pending!(handle.poll_request());
48 
49     assert_ready_ok!(service.poll_ready());
50     let mut res3 = task::spawn(service.call("hello3"));
51 
52     drop(res2);
53 
54     send_response1.send_response("world");
55 
56     let_worker_work();
57     assert_eq!(assert_ready_ok!(res1.poll()), "world");
58 
59     // res2 was dropped, so it should have been canceled in the buffer
60     handle.allow(1);
61 
62     assert_request_eq!(handle, "hello3").send_response("world3");
63 
64     let_worker_work();
65     assert_eq!(assert_ready_ok!(res3.poll()), "world3");
66 }
67 
68 #[tokio::test(flavor = "current_thread")]
when_inner_is_not_ready()69 async fn when_inner_is_not_ready() {
70     let _t = support::trace_init();
71 
72     let (mut service, mut handle) = new_service();
73 
74     // Make the service NotReady
75     handle.allow(0);
76 
77     assert_ready_ok!(service.poll_ready());
78     let mut res1 = task::spawn(service.call("hello"));
79 
80     let_worker_work();
81     assert_pending!(res1.poll());
82     assert_pending!(handle.poll_request());
83 
84     handle.allow(1);
85 
86     assert_request_eq!(handle, "hello").send_response("world");
87 
88     let_worker_work();
89     assert_eq!(assert_ready_ok!(res1.poll()), "world");
90 }
91 
92 #[tokio::test(flavor = "current_thread")]
when_inner_fails()93 async fn when_inner_fails() {
94     use std::error::Error as StdError;
95     let _t = support::trace_init();
96 
97     let (mut service, mut handle) = new_service();
98 
99     // Make the service NotReady
100     handle.allow(0);
101     handle.send_error("foobar");
102 
103     assert_ready_ok!(service.poll_ready());
104     let mut res1 = task::spawn(service.call("hello"));
105 
106     let_worker_work();
107     let e = assert_ready_err!(res1.poll());
108     if let Some(e) = e.downcast_ref::<error::ServiceError>() {
109         let e = e.source().unwrap();
110 
111         assert_eq!(e.to_string(), "foobar");
112     } else {
113         panic!("unexpected error type: {:?}", e);
114     }
115 }
116 
117 #[tokio::test(flavor = "current_thread")]
poll_ready_when_worker_is_dropped_early()118 async fn poll_ready_when_worker_is_dropped_early() {
119     let _t = support::trace_init();
120 
121     let (service, _handle) = mock::pair::<(), ()>();
122 
123     let (service, worker) = Buffer::pair(service, 1);
124 
125     let mut service = mock::Spawn::new(service);
126 
127     drop(worker);
128 
129     let err = assert_ready_err!(service.poll_ready());
130 
131     assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
132 }
133 
134 #[tokio::test(flavor = "current_thread")]
response_future_when_worker_is_dropped_early()135 async fn response_future_when_worker_is_dropped_early() {
136     let _t = support::trace_init();
137 
138     let (service, mut handle) = mock::pair::<_, ()>();
139 
140     let (service, worker) = Buffer::pair(service, 1);
141 
142     let mut service = mock::Spawn::new(service);
143 
144     // keep the request in the worker
145     handle.allow(0);
146     assert_ready_ok!(service.poll_ready());
147     let mut response = task::spawn(service.call("hello"));
148 
149     drop(worker);
150 
151     let_worker_work();
152     let err = assert_ready_err!(response.poll());
153     assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
154 }
155 
156 #[tokio::test(flavor = "current_thread")]
waits_for_channel_capacity()157 async fn waits_for_channel_capacity() {
158     let _t = support::trace_init();
159 
160     let (service, mut handle) = mock::pair::<&'static str, &'static str>();
161 
162     let (service, worker) = Buffer::pair(service, 3);
163 
164     let mut service = mock::Spawn::new(service);
165     let mut worker = task::spawn(worker);
166 
167     // keep requests in the worker
168     handle.allow(0);
169     assert_ready_ok!(service.poll_ready());
170     let mut response1 = task::spawn(service.call("hello"));
171     assert_pending!(worker.poll());
172 
173     assert_ready_ok!(service.poll_ready());
174     let mut response2 = task::spawn(service.call("hello"));
175     assert_pending!(worker.poll());
176 
177     assert_ready_ok!(service.poll_ready());
178     let mut response3 = task::spawn(service.call("hello"));
179     assert_pending!(service.poll_ready());
180     assert_pending!(worker.poll());
181 
182     handle.allow(1);
183     assert_pending!(worker.poll());
184 
185     handle
186         .next_request()
187         .await
188         .unwrap()
189         .1
190         .send_response("world");
191     assert_pending!(worker.poll());
192     assert_ready_ok!(response1.poll());
193 
194     assert_ready_ok!(service.poll_ready());
195     let mut response4 = task::spawn(service.call("hello"));
196     assert_pending!(worker.poll());
197 
198     handle.allow(3);
199     assert_pending!(worker.poll());
200 
201     handle
202         .next_request()
203         .await
204         .unwrap()
205         .1
206         .send_response("world");
207     assert_pending!(worker.poll());
208     assert_ready_ok!(response2.poll());
209 
210     assert_pending!(worker.poll());
211     handle
212         .next_request()
213         .await
214         .unwrap()
215         .1
216         .send_response("world");
217     assert_pending!(worker.poll());
218     assert_ready_ok!(response3.poll());
219 
220     assert_pending!(worker.poll());
221     handle
222         .next_request()
223         .await
224         .unwrap()
225         .1
226         .send_response("world");
227     assert_pending!(worker.poll());
228     assert_ready_ok!(response4.poll());
229 }
230 
231 #[tokio::test(flavor = "current_thread")]
wakes_pending_waiters_on_close()232 async fn wakes_pending_waiters_on_close() {
233     let _t = support::trace_init();
234 
235     let (service, mut handle) = mock::pair::<_, ()>();
236 
237     let (mut service, worker) = Buffer::pair(service, 1);
238     let mut worker = task::spawn(worker);
239 
240     // keep the request in the worker
241     handle.allow(0);
242     let service1 = service.ready().await.unwrap();
243     assert_pending!(worker.poll());
244     let mut response = task::spawn(service1.call("hello"));
245 
246     let mut service1 = service.clone();
247     let mut ready1 = task::spawn(service1.ready());
248     assert_pending!(worker.poll());
249     assert_pending!(ready1.poll(), "no capacity");
250 
251     let mut service1 = service.clone();
252     let mut ready2 = task::spawn(service1.ready());
253     assert_pending!(worker.poll());
254     assert_pending!(ready2.poll(), "no capacity");
255 
256     // kill the worker task
257     drop(worker);
258 
259     let err = assert_ready_err!(response.poll());
260     assert!(
261         err.is::<error::Closed>(),
262         "response should fail with a Closed, got: {:?}",
263         err
264     );
265 
266     assert!(
267         ready1.is_woken(),
268         "dropping worker should wake ready task 1"
269     );
270     let err = assert_ready_err!(ready1.poll());
271     assert!(
272         err.is::<error::Closed>(),
273         "ready 1 should fail with a Closed, got: {:?}",
274         err
275     );
276 
277     assert!(
278         ready2.is_woken(),
279         "dropping worker should wake ready task 2"
280     );
281     let err = assert_ready_err!(ready1.poll());
282     assert!(
283         err.is::<error::Closed>(),
284         "ready 2 should fail with a Closed, got: {:?}",
285         err
286     );
287 }
288 
289 #[tokio::test(flavor = "current_thread")]
wakes_pending_waiters_on_failure()290 async fn wakes_pending_waiters_on_failure() {
291     let _t = support::trace_init();
292 
293     let (service, mut handle) = mock::pair::<_, ()>();
294 
295     let (mut service, worker) = Buffer::pair(service, 1);
296     let mut worker = task::spawn(worker);
297 
298     // keep the request in the worker
299     handle.allow(0);
300     let service1 = service.ready().await.unwrap();
301     assert_pending!(worker.poll());
302     let mut response = task::spawn(service1.call("hello"));
303 
304     let mut service1 = service.clone();
305     let mut ready1 = task::spawn(service1.ready());
306     assert_pending!(worker.poll());
307     assert_pending!(ready1.poll(), "no capacity");
308 
309     let mut service1 = service.clone();
310     let mut ready2 = task::spawn(service1.ready());
311     assert_pending!(worker.poll());
312     assert_pending!(ready2.poll(), "no capacity");
313 
314     // fail the inner service
315     handle.send_error("foobar");
316     // worker task terminates
317     assert_ready!(worker.poll());
318 
319     let err = assert_ready_err!(response.poll());
320     assert!(
321         err.is::<error::ServiceError>(),
322         "response should fail with a ServiceError, got: {:?}",
323         err
324     );
325 
326     assert!(
327         ready1.is_woken(),
328         "dropping worker should wake ready task 1"
329     );
330     let err = assert_ready_err!(ready1.poll());
331     assert!(
332         err.is::<error::ServiceError>(),
333         "ready 1 should fail with a ServiceError, got: {:?}",
334         err
335     );
336 
337     assert!(
338         ready2.is_woken(),
339         "dropping worker should wake ready task 2"
340     );
341     let err = assert_ready_err!(ready1.poll());
342     assert!(
343         err.is::<error::ServiceError>(),
344         "ready 2 should fail with a ServiceError, got: {:?}",
345         err
346     );
347 }
348 
349 #[tokio::test(flavor = "current_thread")]
propagates_trace_spans()350 async fn propagates_trace_spans() {
351     use tower::util::ServiceExt;
352     use tracing::Instrument;
353 
354     let _t = support::trace_init();
355 
356     let span = tracing::info_span!("my_span");
357 
358     let service = support::AssertSpanSvc::new(span.clone());
359     let (service, worker) = Buffer::pair(service, 5);
360     let worker = tokio::spawn(worker);
361 
362     let result = tokio::spawn(service.oneshot(()).instrument(span));
363 
364     result.await.expect("service panicked").expect("failed");
365     worker.await.expect("worker panicked");
366 }
367 
368 #[tokio::test(flavor = "current_thread")]
doesnt_leak_permits()369 async fn doesnt_leak_permits() {
370     let _t = support::trace_init();
371 
372     let (service, mut handle) = mock::pair::<_, ()>();
373 
374     let (mut service1, worker) = Buffer::pair(service, 2);
375     let mut worker = task::spawn(worker);
376     let mut service2 = service1.clone();
377     let mut service3 = service1.clone();
378 
379     // Attempt to poll the first clone of the buffer to readiness multiple
380     // times. These should all succeed, because the readiness is never
381     // *consumed* --- no request is sent.
382     assert_ready_ok!(task::spawn(service1.ready()).poll());
383     assert_ready_ok!(task::spawn(service1.ready()).poll());
384     assert_ready_ok!(task::spawn(service1.ready()).poll());
385 
386     // It should also be possible to drive the second clone of the service to
387     // readiness --- it should only acquire one permit, as well.
388     assert_ready_ok!(task::spawn(service2.ready()).poll());
389     assert_ready_ok!(task::spawn(service2.ready()).poll());
390     assert_ready_ok!(task::spawn(service2.ready()).poll());
391 
392     // The third clone *doesn't* poll ready, because the first two clones have
393     // each acquired one permit.
394     let mut ready3 = task::spawn(service3.ready());
395     assert_pending!(ready3.poll());
396 
397     // Consume the first service's readiness.
398     let mut response = task::spawn(service1.call(()));
399     handle.allow(1);
400     assert_pending!(worker.poll());
401 
402     handle.next_request().await.unwrap().1.send_response(());
403     assert_pending!(worker.poll());
404     assert_ready_ok!(response.poll());
405 
406     // Now, the third service should acquire a permit...
407     assert!(ready3.is_woken());
408     assert_ready_ok!(ready3.poll());
409 }
410 
411 type Mock = mock::Mock<&'static str, &'static str>;
412 type Handle = mock::Handle<&'static str, &'static str>;
413 
new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle)414 fn new_service() -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
415     // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
416     new_service_with_bound(10)
417 }
418 
new_service_with_bound(bound: usize) -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle)419 fn new_service_with_bound(bound: usize) -> (mock::Spawn<Buffer<Mock, &'static str>>, Handle) {
420     mock::spawn_with(|s| {
421         let (svc, worker) = Buffer::pair(s, bound);
422 
423         thread::spawn(move || {
424             let mut fut = tokio_test::task::spawn(worker);
425             while fut.poll().is_pending() {}
426         });
427 
428         svc
429     })
430 }
431