1 use crate::discover::ServiceList;
2 use crate::load;
3 use futures_util::pin_mut;
4 use std::task::Poll;
5 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
6 use tower_test::{assert_request_eq, mock};
7 
8 use super::*;
9 
10 #[tokio::test]
empty()11 async fn empty() {
12     let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
13     let disco = ServiceList::new(empty);
14     let mut svc = mock::Spawn::new(Balance::new(disco));
15     assert_pending!(svc.poll_ready());
16 }
17 
18 #[tokio::test]
single_endpoint()19 async fn single_endpoint() {
20     let (mut svc, mut handle) = mock::spawn_with(|s| {
21         let mock = load::Constant::new(s, 0);
22         let disco = ServiceList::new(vec![mock].into_iter());
23         Balance::new(disco)
24     });
25 
26     handle.allow(0);
27     assert_pending!(svc.poll_ready());
28     assert_eq!(
29         svc.get_ref().len(),
30         1,
31         "balancer must have discovered endpoint"
32     );
33 
34     handle.allow(1);
35     assert_ready_ok!(svc.poll_ready());
36 
37     let mut fut = task::spawn(svc.call(()));
38 
39     assert_request_eq!(handle, ()).send_response(1);
40 
41     assert_eq!(assert_ready_ok!(fut.poll()), 1);
42     handle.allow(1);
43     assert_ready_ok!(svc.poll_ready());
44 
45     handle.send_error("endpoint lost");
46     assert_pending!(svc.poll_ready());
47     assert!(
48         svc.get_ref().is_empty(),
49         "balancer must drop failed endpoints"
50     );
51 }
52 
53 #[tokio::test]
two_endpoints_with_equal_load()54 async fn two_endpoints_with_equal_load() {
55     let (mock_a, handle_a) = mock::pair();
56     let (mock_b, handle_b) = mock::pair();
57     let mock_a = load::Constant::new(mock_a, 1);
58     let mock_b = load::Constant::new(mock_b, 1);
59 
60     pin_mut!(handle_a);
61     pin_mut!(handle_b);
62 
63     let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
64     let mut svc = mock::Spawn::new(Balance::new(disco));
65 
66     handle_a.allow(0);
67     handle_b.allow(0);
68     assert_pending!(svc.poll_ready());
69     assert_eq!(
70         svc.get_ref().len(),
71         2,
72         "balancer must have discovered both endpoints"
73     );
74 
75     handle_a.allow(1);
76     handle_b.allow(0);
77     assert_ready_ok!(
78         svc.poll_ready(),
79         "must be ready when one of two services is ready"
80     );
81     {
82         let mut fut = task::spawn(svc.call(()));
83         assert_request_eq!(handle_a, ()).send_response("a");
84         assert_eq!(assert_ready_ok!(fut.poll()), "a");
85     }
86 
87     handle_a.allow(0);
88     handle_b.allow(1);
89     assert_ready_ok!(
90         svc.poll_ready(),
91         "must be ready when both endpoints are ready"
92     );
93     {
94         let mut fut = task::spawn(svc.call(()));
95         assert_request_eq!(handle_b, ()).send_response("b");
96         assert_eq!(assert_ready_ok!(fut.poll()), "b");
97     }
98 
99     handle_a.allow(1);
100     handle_b.allow(1);
101     for _ in 0..2 {
102         assert_ready_ok!(
103             svc.poll_ready(),
104             "must be ready when both endpoints are ready"
105         );
106         let mut fut = task::spawn(svc.call(()));
107 
108         for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
109             if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() {
110                 tracing::info!("using {}", c);
111                 tx.send_response(c);
112                 h.allow(0);
113             }
114         }
115         assert_ready_ok!(fut.poll());
116     }
117 
118     handle_a.send_error("endpoint lost");
119     assert_pending!(svc.poll_ready());
120     assert_eq!(
121         svc.get_ref().len(),
122         1,
123         "balancer must drop failed endpoints",
124     );
125 }
126