1 use super::support;
2 use futures_core::Stream;
3 use futures_util::{
4 future::{ready, Ready},
5 pin_mut,
6 };
7 use std::task::{Context, Poll};
8 use std::{cell::Cell, rc::Rc};
9 use tokio_test::{assert_pending, assert_ready, task};
10 use tower::util::ServiceExt;
11 use tower_service::*;
12 use tower_test::{assert_request_eq, mock};
13
14 type Error = Box<dyn std::error::Error + Send + Sync>;
15
16 #[derive(Debug, Eq, PartialEq)]
17 struct Srv {
18 admit: Rc<Cell<bool>>,
19 count: Rc<Cell<usize>>,
20 }
21 impl Service<&'static str> for Srv {
22 type Response = &'static str;
23 type Error = Error;
24 type Future = Ready<Result<Self::Response, Error>>;
25
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>26 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
27 if !self.admit.get() {
28 return Poll::Pending;
29 }
30
31 self.admit.set(false);
32 Poll::Ready(Ok(()))
33 }
34
call(&mut self, req: &'static str) -> Self::Future35 fn call(&mut self, req: &'static str) -> Self::Future {
36 self.count.set(self.count.get() + 1);
37 ready(Ok(req))
38 }
39 }
40
41 #[test]
ordered()42 fn ordered() {
43 let _t = support::trace_init();
44
45 let mut mock = task::spawn(());
46
47 let admit = Rc::new(Cell::new(false));
48 let count = Rc::new(Cell::new(0));
49 let srv = Srv {
50 count: count.clone(),
51 admit: admit.clone(),
52 };
53 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
54 let ca = srv.call_all(support::IntoStream::new(rx));
55 pin_mut!(ca);
56
57 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
58 tx.send("one").unwrap();
59 mock.is_woken();
60 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
61 admit.set(true);
62 let v = assert_ready!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)))
63 .transpose()
64 .unwrap();
65 assert_eq!(v, Some("one"));
66 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
67 admit.set(true);
68 tx.send("two").unwrap();
69 mock.is_woken();
70 tx.send("three").unwrap();
71 let v = assert_ready!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)))
72 .transpose()
73 .unwrap();
74 assert_eq!(v, Some("two"));
75 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
76 admit.set(true);
77 let v = assert_ready!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)))
78 .transpose()
79 .unwrap();
80 assert_eq!(v, Some("three"));
81 admit.set(true);
82 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
83 admit.set(true);
84 tx.send("four").unwrap();
85 mock.is_woken();
86 let v = assert_ready!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)))
87 .transpose()
88 .unwrap();
89 assert_eq!(v, Some("four"));
90 assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
91
92 // need to be ready since impl doesn't know it'll get EOF
93 admit.set(true);
94
95 // When we drop the request stream, CallAll should return None.
96 drop(tx);
97 mock.is_woken();
98 let v = assert_ready!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)))
99 .transpose()
100 .unwrap();
101 assert!(v.is_none());
102 assert_eq!(count.get(), 4);
103
104 // We should also be able to recover the wrapped Service.
105 assert_eq!(ca.take_service(), Srv { count, admit });
106 }
107
108 #[tokio::test(flavor = "current_thread")]
unordered()109 async fn unordered() {
110 let _t = support::trace_init();
111
112 let (mock, handle) = mock::pair::<_, &'static str>();
113 pin_mut!(handle);
114
115 let mut task = task::spawn(());
116 let requests = futures_util::stream::iter(&["one", "two"]);
117
118 let svc = mock.call_all(requests).unordered();
119 pin_mut!(svc);
120
121 assert_pending!(task.enter(|cx, _| svc.as_mut().poll_next(cx)));
122
123 let resp1 = assert_request_eq!(handle, &"one");
124 let resp2 = assert_request_eq!(handle, &"two");
125
126 resp2.send_response("resp 1");
127
128 let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
129 .transpose()
130 .unwrap();
131 assert_eq!(v, Some("resp 1"));
132 assert_pending!(task.enter(|cx, _| svc.as_mut().poll_next(cx)));
133
134 resp1.send_response("resp 2");
135
136 let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
137 .transpose()
138 .unwrap();
139 assert_eq!(v, Some("resp 2"));
140
141 let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
142 .transpose()
143 .unwrap();
144 assert!(v.is_none());
145 }
146
147 #[tokio::test]
pending()148 async fn pending() {
149 let _t = support::trace_init();
150
151 let (mock, mut handle) = mock::pair::<_, &'static str>();
152
153 let mut task = task::spawn(());
154
155 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
156 let ca = mock.call_all(support::IntoStream::new(rx));
157 pin_mut!(ca);
158
159 assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
160 tx.send("req").unwrap();
161 assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
162 assert_request_eq!(handle, "req").send_response("res");
163 let res = assert_ready!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
164 assert_eq!(res.transpose().unwrap(), Some("res"));
165 assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
166 }
167