1 use super::super::error; 2 use crate::discover::{Change, Discover}; 3 use crate::load::Load; 4 use crate::ready_cache::{error::Failed, ReadyCache}; 5 use futures_core::ready; 6 use futures_util::future::{self, TryFutureExt}; 7 use pin_project_lite::pin_project; 8 use rand::{rngs::SmallRng, Rng, SeedableRng}; 9 use std::hash::Hash; 10 use std::marker::PhantomData; 11 use std::{ 12 fmt, 13 future::Future, 14 pin::Pin, 15 task::{Context, Poll}, 16 }; 17 use tokio::sync::oneshot; 18 use tower_service::Service; 19 use tracing::{debug, trace}; 20 21 /// Efficiently distributes requests across an arbitrary number of services. 22 /// 23 /// See the [module-level documentation](..) for details. 24 /// 25 /// Note that [`Balance`] requires that the [`Discover`] you use is [`Unpin`] in order to implement 26 /// [`Service`]. This is because it needs to be accessed from [`Service::poll_ready`], which takes 27 /// `&mut self`. You can achieve this easily by wrapping your [`Discover`] in [`Box::pin`] before you 28 /// construct the [`Balance`] instance. For more details, see [#319]. 29 /// 30 /// [`Box::pin`]: std::boxed::Box::pin() 31 /// [#319]: https://github.com/tower-rs/tower/issues/319 32 pub struct Balance<D, Req> 33 where 34 D: Discover, 35 D::Key: Hash, 36 { 37 discover: D, 38 39 services: ReadyCache<D::Key, D::Service, Req>, 40 ready_index: Option<usize>, 41 42 rng: SmallRng, 43 44 _req: PhantomData<Req>, 45 } 46 47 impl<D: Discover, Req> fmt::Debug for Balance<D, Req> 48 where 49 D: fmt::Debug, 50 D::Key: Hash + fmt::Debug, 51 D::Service: fmt::Debug, 52 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 54 f.debug_struct("Balance") 55 .field("discover", &self.discover) 56 .field("services", &self.services) 57 .finish() 58 } 59 } 60 61 pin_project! { 62 /// A Future that becomes satisfied when an `S`-typed service is ready. 63 /// 64 /// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set. 65 struct UnreadyService<K, S, Req> { 66 key: Option<K>, 67 #[pin] 68 cancel: oneshot::Receiver<()>, 69 service: Option<S>, 70 71 _req: PhantomData<Req>, 72 } 73 } 74 75 enum Error<E> { 76 Inner(E), 77 Canceled, 78 } 79 80 impl<D, Req> Balance<D, Req> 81 where 82 D: Discover, 83 D::Key: Hash, 84 D::Service: Service<Req>, 85 <D::Service as Service<Req>>::Error: Into<crate::BoxError>, 86 { 87 /// Constructs a load balancer that uses operating system entropy. new(discover: D) -> Self88 pub fn new(discover: D) -> Self { 89 Self::from_rng(discover, &mut rand::thread_rng()).expect("ThreadRNG must be valid") 90 } 91 92 /// Constructs a load balancer seeded with the provided random number generator. from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error>93 pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> { 94 let rng = SmallRng::from_rng(rng)?; 95 Ok(Self { 96 rng, 97 discover, 98 services: ReadyCache::default(), 99 ready_index: None, 100 101 _req: PhantomData, 102 }) 103 } 104 105 /// Returns the number of endpoints currently tracked by the balancer. len(&self) -> usize106 pub fn len(&self) -> usize { 107 self.services.len() 108 } 109 110 /// Returns whether or not the balancer is empty. is_empty(&self) -> bool111 pub fn is_empty(&self) -> bool { 112 self.services.is_empty() 113 } 114 } 115 116 impl<D, Req> Balance<D, Req> 117 where 118 D: Discover + Unpin, 119 D::Key: Hash + Clone, 120 D::Error: Into<crate::BoxError>, 121 D::Service: Service<Req> + Load, 122 <D::Service as Load>::Metric: std::fmt::Debug, 123 <D::Service as Service<Req>>::Error: Into<crate::BoxError>, 124 { 125 /// Polls `discover` for updates, adding new items to `not_ready`. 126 /// 127 /// Removals may alter the order of either `ready` or `not_ready`. update_pending_from_discover( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(), error::Discover>>>128 fn update_pending_from_discover( 129 &mut self, 130 cx: &mut Context<'_>, 131 ) -> Poll<Option<Result<(), error::Discover>>> { 132 debug!("updating from discover"); 133 loop { 134 match ready!(Pin::new(&mut self.discover).poll_discover(cx)) 135 .transpose() 136 .map_err(|e| error::Discover(e.into()))? 137 { 138 None => return Poll::Ready(None), 139 Some(Change::Remove(key)) => { 140 trace!("remove"); 141 self.services.evict(&key); 142 } 143 Some(Change::Insert(key, svc)) => { 144 trace!("insert"); 145 // If this service already existed in the set, it will be 146 // replaced as the new one becomes ready. 147 self.services.push(key, svc); 148 } 149 } 150 } 151 } 152 promote_pending_to_ready(&mut self, cx: &mut Context<'_>)153 fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) { 154 loop { 155 match self.services.poll_pending(cx) { 156 Poll::Ready(Ok(())) => { 157 // There are no remaining pending services. 158 debug_assert_eq!(self.services.pending_len(), 0); 159 break; 160 } 161 Poll::Pending => { 162 // None of the pending services are ready. 163 debug_assert!(self.services.pending_len() > 0); 164 break; 165 } 166 Poll::Ready(Err(error)) => { 167 // An individual service was lost; continue processing 168 // pending services. 169 debug!(%error, "dropping failed endpoint"); 170 } 171 } 172 } 173 trace!( 174 ready = %self.services.ready_len(), 175 pending = %self.services.pending_len(), 176 "poll_unready" 177 ); 178 } 179 180 /// Performs P2C on inner services to find a suitable endpoint. p2c_ready_index(&mut self) -> Option<usize>181 fn p2c_ready_index(&mut self) -> Option<usize> { 182 match self.services.ready_len() { 183 0 => None, 184 1 => Some(0), 185 len => { 186 // Get two distinct random indexes (in a random order) and 187 // compare the loads of the service at each index. 188 let idxs = rand::seq::index::sample(&mut self.rng, len, 2); 189 190 let aidx = idxs.index(0); 191 let bidx = idxs.index(1); 192 debug_assert_ne!(aidx, bidx, "random indices must be distinct"); 193 194 let aload = self.ready_index_load(aidx); 195 let bload = self.ready_index_load(bidx); 196 let chosen = if aload <= bload { aidx } else { bidx }; 197 198 trace!( 199 a.index = aidx, 200 a.load = ?aload, 201 b.index = bidx, 202 b.load = ?bload, 203 chosen = if chosen == aidx { "a" } else { "b" }, 204 "p2c", 205 ); 206 Some(chosen) 207 } 208 } 209 } 210 211 /// Accesses a ready endpoint by index and returns its current load. ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric212 fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric { 213 let (_, svc) = self.services.get_ready_index(index).expect("invalid index"); 214 svc.load() 215 } 216 discover_mut(&mut self) -> &mut D217 pub(crate) fn discover_mut(&mut self) -> &mut D { 218 &mut self.discover 219 } 220 } 221 222 impl<D, Req> Service<Req> for Balance<D, Req> 223 where 224 D: Discover + Unpin, 225 D::Key: Hash + Clone, 226 D::Error: Into<crate::BoxError>, 227 D::Service: Service<Req> + Load, 228 <D::Service as Load>::Metric: std::fmt::Debug, 229 <D::Service as Service<Req>>::Error: Into<crate::BoxError>, 230 { 231 type Response = <D::Service as Service<Req>>::Response; 232 type Error = crate::BoxError; 233 type Future = future::MapErr< 234 <D::Service as Service<Req>>::Future, 235 fn(<D::Service as Service<Req>>::Error) -> crate::BoxError, 236 >; 237 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>238 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 239 // `ready_index` may have already been set by a prior invocation. These 240 // updates cannot disturb the order of existing ready services. 241 let _ = self.update_pending_from_discover(cx)?; 242 self.promote_pending_to_ready(cx); 243 244 loop { 245 // If a service has already been selected, ensure that it is ready. 246 // This ensures that the underlying service is ready immediately 247 // before a request is dispatched to it (i.e. in the same task 248 // invocation). If, e.g., a failure detector has changed the state 249 // of the service, it may be evicted from the ready set so that 250 // another service can be selected. 251 if let Some(index) = self.ready_index.take() { 252 match self.services.check_ready_index(cx, index) { 253 Ok(true) => { 254 // The service remains ready. 255 self.ready_index = Some(index); 256 return Poll::Ready(Ok(())); 257 } 258 Ok(false) => { 259 // The service is no longer ready. Try to find a new one. 260 trace!("ready service became unavailable"); 261 } 262 Err(Failed(_, error)) => { 263 // The ready endpoint failed, so log the error and try 264 // to find a new one. 265 debug!(%error, "endpoint failed"); 266 } 267 } 268 } 269 270 // Select a new service by comparing two at random and using the 271 // lesser-loaded service. 272 self.ready_index = self.p2c_ready_index(); 273 if self.ready_index.is_none() { 274 debug_assert_eq!(self.services.ready_len(), 0); 275 // We have previously registered interest in updates from 276 // discover and pending services. 277 return Poll::Pending; 278 } 279 } 280 } 281 call(&mut self, request: Req) -> Self::Future282 fn call(&mut self, request: Req) -> Self::Future { 283 let index = self.ready_index.take().expect("called before ready"); 284 self.services 285 .call_ready_index(index, request) 286 .map_err(Into::into) 287 } 288 } 289 290 impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> { 291 type Output = Result<(K, S), (K, Error<S::Error>)>; 292 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>293 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 294 let this = self.project(); 295 296 if let Poll::Ready(Ok(())) = this.cancel.poll(cx) { 297 let key = this.key.take().expect("polled after ready"); 298 return Poll::Ready(Err((key, Error::Canceled))); 299 } 300 301 let res = ready!(this 302 .service 303 .as_mut() 304 .expect("poll after ready") 305 .poll_ready(cx)); 306 307 let key = this.key.take().expect("polled after ready"); 308 let svc = this.service.take().expect("polled after ready"); 309 310 match res { 311 Ok(()) => Poll::Ready(Ok((key, svc))), 312 Err(e) => Poll::Ready(Err((key, Error::Inner(e)))), 313 } 314 } 315 } 316 317 impl<K, S, Req> fmt::Debug for UnreadyService<K, S, Req> 318 where 319 K: fmt::Debug, 320 S: fmt::Debug, 321 { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result322 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 323 let Self { 324 key, 325 cancel, 326 service, 327 _req, 328 } = self; 329 f.debug_struct("UnreadyService") 330 .field("key", key) 331 .field("cancel", cancel) 332 .field("service", service) 333 .finish() 334 } 335 } 336