1 use super::super::error;
2 use crate::discover::{Change, Discover};
3 use crate::load::Load;
4 use crate::ready_cache::{error::Failed, ReadyCache};
5 use futures_core::ready;
6 use futures_util::future::{self, TryFutureExt};
7 use pin_project_lite::pin_project;
8 use rand::{rngs::SmallRng, Rng, SeedableRng};
9 use std::hash::Hash;
10 use std::marker::PhantomData;
11 use std::{
12     fmt,
13     future::Future,
14     pin::Pin,
15     task::{Context, Poll},
16 };
17 use tokio::sync::oneshot;
18 use tower_service::Service;
19 use tracing::{debug, trace};
20 
21 /// Efficiently distributes requests across an arbitrary number of services.
22 ///
23 /// See the [module-level documentation](..) for details.
24 ///
25 /// Note that [`Balance`] requires that the [`Discover`] you use is [`Unpin`] in order to implement
26 /// [`Service`]. This is because it needs to be accessed from [`Service::poll_ready`], which takes
27 /// `&mut self`. You can achieve this easily by wrapping your [`Discover`] in [`Box::pin`] before you
28 /// construct the [`Balance`] instance. For more details, see [#319].
29 ///
30 /// [`Box::pin`]: std::boxed::Box::pin()
31 /// [#319]: https://github.com/tower-rs/tower/issues/319
32 pub struct Balance<D, Req>
33 where
34     D: Discover,
35     D::Key: Hash,
36 {
37     discover: D,
38 
39     services: ReadyCache<D::Key, D::Service, Req>,
40     ready_index: Option<usize>,
41 
42     rng: SmallRng,
43 
44     _req: PhantomData<Req>,
45 }
46 
47 impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
48 where
49     D: fmt::Debug,
50     D::Key: Hash + fmt::Debug,
51     D::Service: fmt::Debug,
52 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result53     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54         f.debug_struct("Balance")
55             .field("discover", &self.discover)
56             .field("services", &self.services)
57             .finish()
58     }
59 }
60 
61 pin_project! {
62     /// A Future that becomes satisfied when an `S`-typed service is ready.
63     ///
64     /// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
65     struct UnreadyService<K, S, Req> {
66         key: Option<K>,
67         #[pin]
68         cancel: oneshot::Receiver<()>,
69         service: Option<S>,
70 
71         _req: PhantomData<Req>,
72     }
73 }
74 
75 enum Error<E> {
76     Inner(E),
77     Canceled,
78 }
79 
80 impl<D, Req> Balance<D, Req>
81 where
82     D: Discover,
83     D::Key: Hash,
84     D::Service: Service<Req>,
85     <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
86 {
87     /// Constructs a load balancer that uses operating system entropy.
new(discover: D) -> Self88     pub fn new(discover: D) -> Self {
89         Self::from_rng(discover, &mut rand::thread_rng()).expect("ThreadRNG must be valid")
90     }
91 
92     /// Constructs a load balancer seeded with the provided random number generator.
from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error>93     pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
94         let rng = SmallRng::from_rng(rng)?;
95         Ok(Self {
96             rng,
97             discover,
98             services: ReadyCache::default(),
99             ready_index: None,
100 
101             _req: PhantomData,
102         })
103     }
104 
105     /// Returns the number of endpoints currently tracked by the balancer.
len(&self) -> usize106     pub fn len(&self) -> usize {
107         self.services.len()
108     }
109 
110     /// Returns whether or not the balancer is empty.
is_empty(&self) -> bool111     pub fn is_empty(&self) -> bool {
112         self.services.is_empty()
113     }
114 }
115 
116 impl<D, Req> Balance<D, Req>
117 where
118     D: Discover + Unpin,
119     D::Key: Hash + Clone,
120     D::Error: Into<crate::BoxError>,
121     D::Service: Service<Req> + Load,
122     <D::Service as Load>::Metric: std::fmt::Debug,
123     <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
124 {
125     /// Polls `discover` for updates, adding new items to `not_ready`.
126     ///
127     /// Removals may alter the order of either `ready` or `not_ready`.
update_pending_from_discover( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(), error::Discover>>>128     fn update_pending_from_discover(
129         &mut self,
130         cx: &mut Context<'_>,
131     ) -> Poll<Option<Result<(), error::Discover>>> {
132         debug!("updating from discover");
133         loop {
134             match ready!(Pin::new(&mut self.discover).poll_discover(cx))
135                 .transpose()
136                 .map_err(|e| error::Discover(e.into()))?
137             {
138                 None => return Poll::Ready(None),
139                 Some(Change::Remove(key)) => {
140                     trace!("remove");
141                     self.services.evict(&key);
142                 }
143                 Some(Change::Insert(key, svc)) => {
144                     trace!("insert");
145                     // If this service already existed in the set, it will be
146                     // replaced as the new one becomes ready.
147                     self.services.push(key, svc);
148                 }
149             }
150         }
151     }
152 
promote_pending_to_ready(&mut self, cx: &mut Context<'_>)153     fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
154         loop {
155             match self.services.poll_pending(cx) {
156                 Poll::Ready(Ok(())) => {
157                     // There are no remaining pending services.
158                     debug_assert_eq!(self.services.pending_len(), 0);
159                     break;
160                 }
161                 Poll::Pending => {
162                     // None of the pending services are ready.
163                     debug_assert!(self.services.pending_len() > 0);
164                     break;
165                 }
166                 Poll::Ready(Err(error)) => {
167                     // An individual service was lost; continue processing
168                     // pending services.
169                     debug!(%error, "dropping failed endpoint");
170                 }
171             }
172         }
173         trace!(
174             ready = %self.services.ready_len(),
175             pending = %self.services.pending_len(),
176             "poll_unready"
177         );
178     }
179 
180     /// Performs P2C on inner services to find a suitable endpoint.
p2c_ready_index(&mut self) -> Option<usize>181     fn p2c_ready_index(&mut self) -> Option<usize> {
182         match self.services.ready_len() {
183             0 => None,
184             1 => Some(0),
185             len => {
186                 // Get two distinct random indexes (in a random order) and
187                 // compare the loads of the service at each index.
188                 let idxs = rand::seq::index::sample(&mut self.rng, len, 2);
189 
190                 let aidx = idxs.index(0);
191                 let bidx = idxs.index(1);
192                 debug_assert_ne!(aidx, bidx, "random indices must be distinct");
193 
194                 let aload = self.ready_index_load(aidx);
195                 let bload = self.ready_index_load(bidx);
196                 let chosen = if aload <= bload { aidx } else { bidx };
197 
198                 trace!(
199                     a.index = aidx,
200                     a.load = ?aload,
201                     b.index = bidx,
202                     b.load = ?bload,
203                     chosen = if chosen == aidx { "a" } else { "b" },
204                     "p2c",
205                 );
206                 Some(chosen)
207             }
208         }
209     }
210 
211     /// Accesses a ready endpoint by index and returns its current load.
ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric212     fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
213         let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
214         svc.load()
215     }
216 
discover_mut(&mut self) -> &mut D217     pub(crate) fn discover_mut(&mut self) -> &mut D {
218         &mut self.discover
219     }
220 }
221 
222 impl<D, Req> Service<Req> for Balance<D, Req>
223 where
224     D: Discover + Unpin,
225     D::Key: Hash + Clone,
226     D::Error: Into<crate::BoxError>,
227     D::Service: Service<Req> + Load,
228     <D::Service as Load>::Metric: std::fmt::Debug,
229     <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
230 {
231     type Response = <D::Service as Service<Req>>::Response;
232     type Error = crate::BoxError;
233     type Future = future::MapErr<
234         <D::Service as Service<Req>>::Future,
235         fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
236     >;
237 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>238     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
239         // `ready_index` may have already been set by a prior invocation. These
240         // updates cannot disturb the order of existing ready services.
241         let _ = self.update_pending_from_discover(cx)?;
242         self.promote_pending_to_ready(cx);
243 
244         loop {
245             // If a service has already been selected, ensure that it is ready.
246             // This ensures that the underlying service is ready immediately
247             // before a request is dispatched to it (i.e. in the same task
248             // invocation). If, e.g., a failure detector has changed the state
249             // of the service, it may be evicted from the ready set so that
250             // another service can be selected.
251             if let Some(index) = self.ready_index.take() {
252                 match self.services.check_ready_index(cx, index) {
253                     Ok(true) => {
254                         // The service remains ready.
255                         self.ready_index = Some(index);
256                         return Poll::Ready(Ok(()));
257                     }
258                     Ok(false) => {
259                         // The service is no longer ready. Try to find a new one.
260                         trace!("ready service became unavailable");
261                     }
262                     Err(Failed(_, error)) => {
263                         // The ready endpoint failed, so log the error and try
264                         // to find a new one.
265                         debug!(%error, "endpoint failed");
266                     }
267                 }
268             }
269 
270             // Select a new service by comparing two at random and using the
271             // lesser-loaded service.
272             self.ready_index = self.p2c_ready_index();
273             if self.ready_index.is_none() {
274                 debug_assert_eq!(self.services.ready_len(), 0);
275                 // We have previously registered interest in updates from
276                 // discover and pending services.
277                 return Poll::Pending;
278             }
279         }
280     }
281 
call(&mut self, request: Req) -> Self::Future282     fn call(&mut self, request: Req) -> Self::Future {
283         let index = self.ready_index.take().expect("called before ready");
284         self.services
285             .call_ready_index(index, request)
286             .map_err(Into::into)
287     }
288 }
289 
290 impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
291     type Output = Result<(K, S), (K, Error<S::Error>)>;
292 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>293     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294         let this = self.project();
295 
296         if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
297             let key = this.key.take().expect("polled after ready");
298             return Poll::Ready(Err((key, Error::Canceled)));
299         }
300 
301         let res = ready!(this
302             .service
303             .as_mut()
304             .expect("poll after ready")
305             .poll_ready(cx));
306 
307         let key = this.key.take().expect("polled after ready");
308         let svc = this.service.take().expect("polled after ready");
309 
310         match res {
311             Ok(()) => Poll::Ready(Ok((key, svc))),
312             Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
313         }
314     }
315 }
316 
317 impl<K, S, Req> fmt::Debug for UnreadyService<K, S, Req>
318 where
319     K: fmt::Debug,
320     S: fmt::Debug,
321 {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result322     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
323         let Self {
324             key,
325             cancel,
326             service,
327             _req,
328         } = self;
329         f.debug_struct("UnreadyService")
330             .field("key", key)
331             .field("cancel", cancel)
332             .field("service", service)
333             .finish()
334     }
335 }
336