1 use super::support;
2 use futures_core::Stream;
3 use futures_util::{
4     future::{ready, Ready},
5     pin_mut,
6 };
7 use std::task::{Context, Poll};
8 use std::{cell::Cell, rc::Rc};
9 use tokio_test::{assert_pending, assert_ready, task};
10 use tower::util::ServiceExt;
11 use tower_service::*;
12 use tower_test::{assert_request_eq, mock};
13 
/// Boxed `Send + Sync` error used as the `Service::Error` type of `Srv`.
type Error = Box<dyn std::error::Error + Send + Sync>;
15 
/// Test service whose readiness gate and call counter are shared handles.
///
/// Because both fields are `Rc<Cell<_>>`, a test can keep clones of them and
/// flip or read the values while the service itself is owned elsewhere.
#[derive(Debug, PartialEq, Eq)]
struct Srv {
    /// Readiness gate: `poll_ready` reports ready only when this is `true`,
    /// and clears it again when it does.
    admit: Rc<Cell<bool>>,
    /// Number of times `call` has been invoked.
    count: Rc<Cell<usize>>,
}
21 impl Service<&'static str> for Srv {
22     type Response = &'static str;
23     type Error = Error;
24     type Future = Ready<Result<Self::Response, Error>>;
25 
poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>26     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
27         if !self.admit.get() {
28             return Poll::Pending;
29         }
30 
31         self.admit.set(false);
32         Poll::Ready(Ok(()))
33     }
34 
call(&mut self, req: &'static str) -> Self::Future35     fn call(&mut self, req: &'static str) -> Self::Future {
36         self.count.set(self.count.get() + 1);
37         ready(Ok(req))
38     }
39 }
40 
/// Polls an ordered `call_all` stream by hand and checks that each response
/// is yielded in request order, and only after `Srv`'s `admit` gate has been
/// opened for it.
#[test]
fn ordered() {
    let _trace = support::trace_init();

    let mut task = task::spawn(());

    // Shared handles: the test keeps these clones so it can open the gate and
    // read the call count after the service has been moved into `call_all`.
    let gate = Rc::new(Cell::new(false));
    let calls = Rc::new(Cell::new(0));
    let service = Srv {
        admit: gate.clone(),
        count: calls.clone(),
    };
    let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
    let responses = service.call_all(support::IntoStream::new(req_rx));
    pin_mut!(responses);

    // Nothing has been sent and the gate is closed.
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));

    // A request is available, but the service has not been allowed to admit it.
    req_tx.send("one").unwrap();
    task.is_woken();
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));

    // Opening the gate lets "one" through.
    gate.set(true);
    let polled = assert_ready!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    assert_eq!(polled.transpose().unwrap(), Some("one"));
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));

    // Two requests are sent against a single admission: only "two" comes out,
    // and "three" has to wait for the gate to be opened again.
    gate.set(true);
    req_tx.send("two").unwrap();
    task.is_woken();
    req_tx.send("three").unwrap();
    let polled = assert_ready!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    assert_eq!(polled.transpose().unwrap(), Some("two"));
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    gate.set(true);
    let polled = assert_ready!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    assert_eq!(polled.transpose().unwrap(), Some("three"));

    // With the gate open but no request sent, the stream stays pending.
    gate.set(true);
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    gate.set(true);
    req_tx.send("four").unwrap();
    task.is_woken();
    let polled = assert_ready!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    assert_eq!(polled.transpose().unwrap(), Some("four"));
    assert_pending!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));

    // need to be ready since impl doesn't know it'll get EOF
    gate.set(true);

    // When we drop the request stream, CallAll should return None.
    drop(req_tx);
    task.is_woken();
    let polled = assert_ready!(task.enter(|cx, _| responses.as_mut().poll_next(cx)));
    assert!(polled.transpose().unwrap().is_none());
    assert_eq!(calls.get(), 4);

    // We should also be able to recover the wrapped Service.
    assert_eq!(
        responses.take_service(),
        Srv {
            count: calls,
            admit: gate,
        }
    );
}
107 
108 #[tokio::test(flavor = "current_thread")]
unordered()109 async fn unordered() {
110     let _t = support::trace_init();
111 
112     let (mock, handle) = mock::pair::<_, &'static str>();
113     pin_mut!(handle);
114 
115     let mut task = task::spawn(());
116     let requests = futures_util::stream::iter(&["one", "two"]);
117 
118     let svc = mock.call_all(requests).unordered();
119     pin_mut!(svc);
120 
121     assert_pending!(task.enter(|cx, _| svc.as_mut().poll_next(cx)));
122 
123     let resp1 = assert_request_eq!(handle, &"one");
124     let resp2 = assert_request_eq!(handle, &"two");
125 
126     resp2.send_response("resp 1");
127 
128     let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
129         .transpose()
130         .unwrap();
131     assert_eq!(v, Some("resp 1"));
132     assert_pending!(task.enter(|cx, _| svc.as_mut().poll_next(cx)));
133 
134     resp1.send_response("resp 2");
135 
136     let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
137         .transpose()
138         .unwrap();
139     assert_eq!(v, Some("resp 2"));
140 
141     let v = assert_ready!(task.enter(|cx, _| svc.as_mut().poll_next(cx)))
142         .transpose()
143         .unwrap();
144     assert!(v.is_none());
145 }
146 
147 #[tokio::test]
pending()148 async fn pending() {
149     let _t = support::trace_init();
150 
151     let (mock, mut handle) = mock::pair::<_, &'static str>();
152 
153     let mut task = task::spawn(());
154 
155     let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
156     let ca = mock.call_all(support::IntoStream::new(rx));
157     pin_mut!(ca);
158 
159     assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
160     tx.send("req").unwrap();
161     assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
162     assert_request_eq!(handle, "req").send_response("res");
163     let res = assert_ready!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
164     assert_eq!(res.transpose().unwrap(), Some("res"));
165     assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
166 }
167