1 #![cfg(feature = "balance")]
2 #[path = "../support.rs"]
3 mod support;
4
5 use std::future::Future;
6 use std::task::{Context, Poll};
7 use tokio_test::{assert_pending, assert_ready, task};
8 use tower::balance::p2c::Balance;
9 use tower::discover::Change;
10 use tower_service::Service;
11 use tower_test::mock;
12
13 type Req = &'static str;
14 struct Mock(mock::Mock<Req, Req>);
15
16 impl Service<Req> for Mock {
17 type Response = <mock::Mock<Req, Req> as Service<Req>>::Response;
18 type Error = <mock::Mock<Req, Req> as Service<Req>>::Error;
19 type Future = <mock::Mock<Req, Req> as Service<Req>>::Future;
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>20 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
21 self.0.poll_ready(cx)
22 }
call(&mut self, req: Req) -> Self::Future23 fn call(&mut self, req: Req) -> Self::Future {
24 self.0.call(req)
25 }
26 }
27
28 impl tower::load::Load for Mock {
29 type Metric = usize;
load(&self) -> Self::Metric30 fn load(&self) -> Self::Metric {
31 rand::random()
32 }
33 }
34
35 #[test]
stress()36 fn stress() {
37 let _t = support::trace_init();
38 let mut task = task::spawn(());
39 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
40 let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
41
42 let mut nready = 0;
43 let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();
44 let mut retired = Vec::<mock::Handle<Req, Req>>::new();
45 for _ in 0..100_000 {
46 for _ in 0..(rand::random::<u8>() % 8) {
47 if !services.is_empty() && rand::random() {
48 if nready == 0 || rand::random::<u8>() > u8::max_value() / 4 {
49 // ready a service
50 // TODO: sometimes ready a removed service?
51 for (_, (handle, ready)) in &mut services {
52 if !*ready {
53 handle.allow(1);
54 *ready = true;
55 nready += 1;
56 break;
57 }
58 }
59 } else {
60 // use a service
61 use std::task::Poll;
62 match task.enter(|cx, _| cache.poll_ready(cx)) {
63 Poll::Ready(Ok(())) => {
64 assert_ne!(nready, 0, "got ready when no service is ready");
65 let mut fut = cache.call("hello");
66 let mut fut = std::pin::Pin::new(&mut fut);
67 assert_pending!(task.enter(|cx, _| fut.as_mut().poll(cx)));
68 let mut found = false;
69 for (_, (handle, ready)) in &mut services {
70 if *ready {
71 if let Poll::Ready(Some((req, res))) = handle.poll_request() {
72 assert_eq!(req, "hello");
73 res.send_response("world");
74 *ready = false;
75 nready -= 1;
76 found = true;
77 break;
78 }
79 }
80 }
81 if !found {
82 // we must have been given a retired service
83 let mut at = None;
84 for (i, handle) in retired.iter_mut().enumerate() {
85 if let Poll::Ready(Some((req, res))) = handle.poll_request() {
86 assert_eq!(req, "hello");
87 res.send_response("world");
88 at = Some(i);
89 break;
90 }
91 }
92 let _ = retired.swap_remove(
93 at.expect("request was not sent to a ready service"),
94 );
95 nready -= 1;
96 }
97 assert_ready!(task.enter(|cx, _| fut.as_mut().poll(cx))).unwrap();
98 }
99 Poll::Ready(_) => unreachable!("discover stream has not failed"),
100 Poll::Pending => {
101 // assert_eq!(nready, 0, "got pending when a service is ready");
102 }
103 }
104 }
105 } else if services.is_empty() || rand::random() {
106 if services.is_empty() || nready == 0 || rand::random() {
107 // add
108 let (svc, mut handle) = mock::pair::<Req, Req>();
109 let svc = Mock(svc);
110 handle.allow(0);
111 let k = services.insert((handle, false));
112 let ok = tx.send(Ok(Change::Insert(k, svc)));
113 assert!(ok.is_ok());
114 } else {
115 // remove
116 while !services.is_empty() {
117 let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
118 if services.contains(k) {
119 let (handle, ready) = services.remove(k);
120 if ready {
121 retired.push(handle);
122 }
123 let ok = tx.send(Ok(Change::Remove(k)));
124 assert!(ok.is_ok());
125 break;
126 }
127 }
128 }
129 } else {
130 // fail a service
131 while !services.is_empty() {
132 let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
133 if services.contains(k) {
134 let (mut handle, ready) = services.remove(k);
135 if ready {
136 nready -= 1;
137 }
138 handle.send_error("doom");
139 break;
140 }
141 }
142 }
143 }
144
145 let r = task.enter(|cx, _| cache.poll_ready(cx));
146
147 // drop any retired services that the p2c has gotten rid of
148 let mut removed = Vec::new();
149 for (i, handle) in retired.iter_mut().enumerate() {
150 if let Poll::Ready(None) = handle.poll_request() {
151 removed.push(i);
152 }
153 }
154 for i in removed.into_iter().rev() {
155 retired.swap_remove(i);
156 nready -= 1;
157 }
158
159 use std::task::Poll;
160 match r {
161 Poll::Ready(Ok(())) => {
162 assert_ne!(nready, 0, "got ready when no service is ready");
163 }
164 Poll::Ready(_) => unreachable!("discover stream has not failed"),
165 Poll::Pending => {
166 assert_eq!(nready, 0, "got pending when a service is ready");
167 }
168 }
169 }
170 }
171