1 use crate::discover::ServiceList;
2 use crate::load;
3 use futures_util::pin_mut;
4 use std::task::Poll;
5 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
6 use tower_test::{assert_request_eq, mock};
7
8 use super::*;
9
10 #[tokio::test]
empty()11 async fn empty() {
12 let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
13 let disco = ServiceList::new(empty);
14 let mut svc = mock::Spawn::new(Balance::new(disco));
15 assert_pending!(svc.poll_ready());
16 }
17
18 #[tokio::test]
single_endpoint()19 async fn single_endpoint() {
20 let (mut svc, mut handle) = mock::spawn_with(|s| {
21 let mock = load::Constant::new(s, 0);
22 let disco = ServiceList::new(vec![mock].into_iter());
23 Balance::new(disco)
24 });
25
26 handle.allow(0);
27 assert_pending!(svc.poll_ready());
28 assert_eq!(
29 svc.get_ref().len(),
30 1,
31 "balancer must have discovered endpoint"
32 );
33
34 handle.allow(1);
35 assert_ready_ok!(svc.poll_ready());
36
37 let mut fut = task::spawn(svc.call(()));
38
39 assert_request_eq!(handle, ()).send_response(1);
40
41 assert_eq!(assert_ready_ok!(fut.poll()), 1);
42 handle.allow(1);
43 assert_ready_ok!(svc.poll_ready());
44
45 handle.send_error("endpoint lost");
46 assert_pending!(svc.poll_ready());
47 assert!(
48 svc.get_ref().is_empty(),
49 "balancer must drop failed endpoints"
50 );
51 }
52
53 #[tokio::test]
two_endpoints_with_equal_load()54 async fn two_endpoints_with_equal_load() {
55 let (mock_a, handle_a) = mock::pair();
56 let (mock_b, handle_b) = mock::pair();
57 let mock_a = load::Constant::new(mock_a, 1);
58 let mock_b = load::Constant::new(mock_b, 1);
59
60 pin_mut!(handle_a);
61 pin_mut!(handle_b);
62
63 let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
64 let mut svc = mock::Spawn::new(Balance::new(disco));
65
66 handle_a.allow(0);
67 handle_b.allow(0);
68 assert_pending!(svc.poll_ready());
69 assert_eq!(
70 svc.get_ref().len(),
71 2,
72 "balancer must have discovered both endpoints"
73 );
74
75 handle_a.allow(1);
76 handle_b.allow(0);
77 assert_ready_ok!(
78 svc.poll_ready(),
79 "must be ready when one of two services is ready"
80 );
81 {
82 let mut fut = task::spawn(svc.call(()));
83 assert_request_eq!(handle_a, ()).send_response("a");
84 assert_eq!(assert_ready_ok!(fut.poll()), "a");
85 }
86
87 handle_a.allow(0);
88 handle_b.allow(1);
89 assert_ready_ok!(
90 svc.poll_ready(),
91 "must be ready when both endpoints are ready"
92 );
93 {
94 let mut fut = task::spawn(svc.call(()));
95 assert_request_eq!(handle_b, ()).send_response("b");
96 assert_eq!(assert_ready_ok!(fut.poll()), "b");
97 }
98
99 handle_a.allow(1);
100 handle_b.allow(1);
101 for _ in 0..2 {
102 assert_ready_ok!(
103 svc.poll_ready(),
104 "must be ready when both endpoints are ready"
105 );
106 let mut fut = task::spawn(svc.call(()));
107
108 for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
109 if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() {
110 tracing::info!("using {}", c);
111 tx.send_response(c);
112 h.allow(0);
113 }
114 }
115 assert_ready_ok!(fut.poll());
116 }
117
118 handle_a.send_error("endpoint lost");
119 assert_pending!(svc.poll_ready());
120 assert_eq!(
121 svc.get_ref().len(),
122 1,
123 "balancer must drop failed endpoints",
124 );
125 }
126