#![cfg(feature = "balance")]
#[path = "../support.rs"]
mod support;

use std::future::Future;
use std::task::{Context, Poll};
use tokio_test::{assert_pending, assert_ready, task};
use tower::balance::p2c::Balance;
use tower::discover::Change;
use tower_service::Service;
use tower_test::mock;

13 type Req = &'static str;
14 struct Mock(mock::Mock<Req, Req>);
15 
16 impl Service<Req> for Mock {
17     type Response = <mock::Mock<Req, Req> as Service<Req>>::Response;
18     type Error = <mock::Mock<Req, Req> as Service<Req>>::Error;
19     type Future = <mock::Mock<Req, Req> as Service<Req>>::Future;
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>>20     fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
21         self.0.poll_ready(cx)
22     }
call(&mut self, req: Req) -> Self::Future23     fn call(&mut self, req: Req) -> Self::Future {
24         self.0.call(req)
25     }
26 }
27 
28 impl tower::load::Load for Mock {
29     type Metric = usize;
load(&self) -> Self::Metric30     fn load(&self) -> Self::Metric {
31         rand::random()
32     }
33 }
34 
35 #[test]
stress()36 fn stress() {
37     let _t = support::trace_init();
38     let mut task = task::spawn(());
39     let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
40     let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
41 
42     let mut nready = 0;
43     let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();
44     let mut retired = Vec::<mock::Handle<Req, Req>>::new();
45     for _ in 0..100_000 {
46         for _ in 0..(rand::random::<u8>() % 8) {
47             if !services.is_empty() && rand::random() {
48                 if nready == 0 || rand::random::<u8>() > u8::max_value() / 4 {
49                     // ready a service
50                     // TODO: sometimes ready a removed service?
51                     for (_, (handle, ready)) in &mut services {
52                         if !*ready {
53                             handle.allow(1);
54                             *ready = true;
55                             nready += 1;
56                             break;
57                         }
58                     }
59                 } else {
60                     // use a service
61                     use std::task::Poll;
62                     match task.enter(|cx, _| cache.poll_ready(cx)) {
63                         Poll::Ready(Ok(())) => {
64                             assert_ne!(nready, 0, "got ready when no service is ready");
65                             let mut fut = cache.call("hello");
66                             let mut fut = std::pin::Pin::new(&mut fut);
67                             assert_pending!(task.enter(|cx, _| fut.as_mut().poll(cx)));
68                             let mut found = false;
69                             for (_, (handle, ready)) in &mut services {
70                                 if *ready {
71                                     if let Poll::Ready(Some((req, res))) = handle.poll_request() {
72                                         assert_eq!(req, "hello");
73                                         res.send_response("world");
74                                         *ready = false;
75                                         nready -= 1;
76                                         found = true;
77                                         break;
78                                     }
79                                 }
80                             }
81                             if !found {
82                                 // we must have been given a retired service
83                                 let mut at = None;
84                                 for (i, handle) in retired.iter_mut().enumerate() {
85                                     if let Poll::Ready(Some((req, res))) = handle.poll_request() {
86                                         assert_eq!(req, "hello");
87                                         res.send_response("world");
88                                         at = Some(i);
89                                         break;
90                                     }
91                                 }
92                                 let _ = retired.swap_remove(
93                                     at.expect("request was not sent to a ready service"),
94                                 );
95                                 nready -= 1;
96                             }
97                             assert_ready!(task.enter(|cx, _| fut.as_mut().poll(cx))).unwrap();
98                         }
99                         Poll::Ready(_) => unreachable!("discover stream has not failed"),
100                         Poll::Pending => {
101                             // assert_eq!(nready, 0, "got pending when a service is ready");
102                         }
103                     }
104                 }
105             } else if services.is_empty() || rand::random() {
106                 if services.is_empty() || nready == 0 || rand::random() {
107                     // add
108                     let (svc, mut handle) = mock::pair::<Req, Req>();
109                     let svc = Mock(svc);
110                     handle.allow(0);
111                     let k = services.insert((handle, false));
112                     let ok = tx.send(Ok(Change::Insert(k, svc)));
113                     assert!(ok.is_ok());
114                 } else {
115                     // remove
116                     while !services.is_empty() {
117                         let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
118                         if services.contains(k) {
119                             let (handle, ready) = services.remove(k);
120                             if ready {
121                                 retired.push(handle);
122                             }
123                             let ok = tx.send(Ok(Change::Remove(k)));
124                             assert!(ok.is_ok());
125                             break;
126                         }
127                     }
128                 }
129             } else {
130                 // fail a service
131                 while !services.is_empty() {
132                     let k = rand::random::<usize>() % (services.iter().last().unwrap().0 + 1);
133                     if services.contains(k) {
134                         let (mut handle, ready) = services.remove(k);
135                         if ready {
136                             nready -= 1;
137                         }
138                         handle.send_error("doom");
139                         break;
140                     }
141                 }
142             }
143         }
144 
145         let r = task.enter(|cx, _| cache.poll_ready(cx));
146 
147         // drop any retired services that the p2c has gotten rid of
148         let mut removed = Vec::new();
149         for (i, handle) in retired.iter_mut().enumerate() {
150             if let Poll::Ready(None) = handle.poll_request() {
151                 removed.push(i);
152             }
153         }
154         for i in removed.into_iter().rev() {
155             retired.swap_remove(i);
156             nready -= 1;
157         }
158 
159         use std::task::Poll;
160         match r {
161             Poll::Ready(Ok(())) => {
162                 assert_ne!(nready, 0, "got ready when no service is ready");
163             }
164             Poll::Ready(_) => unreachable!("discover stream has not failed"),
165             Poll::Pending => {
166                 assert_eq!(nready, 0, "got pending when a service is ready");
167             }
168         }
169     }
170 }
171