//! A cache of services. use super::error; use futures_core::Stream; use futures_util::{stream::FuturesUnordered, task::AtomicWaker}; pub use indexmap::Equivalent; use indexmap::IndexMap; use std::fmt; use std::future::Future; use std::hash::Hash; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use tower_service::Service; use tracing::{debug, trace}; /// Drives readiness over a set of services. /// /// The cache maintains two internal data structures: /// /// * a set of _pending_ services that have not yet become ready; and /// * a set of _ready_ services that have previously polled ready. /// /// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it /// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked, /// pending services are polled and added to the _ready set_. /// /// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a /// request to the specified service, but panics if the specified service is not /// in the ready set. The `ReadyCache::check_*` functions can be used to ensure /// that a service is ready before dispatching a request. /// /// The ready set can hold services for an abitrarily long time. During this /// time, the runtime may process events that invalidate that ready state (for /// instance, if a keepalive detects a lost connection). In such cases, callers /// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`]) /// immediately before dispatching a request to ensure that the service has not /// become unavailable. /// /// Once `ReadyCache::call_ready*` is invoked, the service is placed back into /// the _pending_ set to be driven to readiness again. /// /// When `ReadyCache::check_ready*` returns `false`, it indicates that the /// specified service is _not_ ready. If an error is returned, this indicats that /// the server failed and has been removed from the cache entirely. /// /// [`ReadyCache::evict`] can be used to remove a service from the cache (by key), /// though the service may not be dropped (if it is currently pending) until /// [`ReadyCache::poll_pending`] is invoked. /// /// Note that the by-index accessors are provided to support use cases (like /// power-of-two-choices load balancing) where the caller does not care to keep /// track of each service's key. Instead, it needs only to access _some_ ready /// service. In such a case, it should be noted that calls to /// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of /// the ready set, so any cached indexes should be discarded after such a call. pub struct ReadyCache where K: Eq + Hash, { /// A stream of services that are not yet ready. pending: FuturesUnordered>, /// An index of cancelation handles for pending streams. pending_cancel_txs: IndexMap, /// Services that have previously become ready. Readiness can become stale, /// so a given service should be polled immediately before use. /// /// The cancelation oneshot is preserved (though unused) while the service is /// ready so that it need not be reallocated each time a request is /// dispatched. ready: IndexMap, } // Safety: This is safe because we do not use `Pin::new_unchecked`. impl Unpin for ReadyCache {} #[derive(Debug)] struct Cancel { waker: AtomicWaker, canceled: AtomicBool, } #[derive(Debug)] struct CancelRx(Arc); #[derive(Debug)] struct CancelTx(Arc); type CancelPair = (CancelTx, CancelRx); #[derive(Debug)] enum PendingError { Canceled(K), Inner(K, E), } pin_project_lite::pin_project! { /// A [`Future`] that becomes satisfied when an `S`-typed service is ready. /// /// May fail due to cancelation, i.e. if the service is evicted from the balancer. struct Pending { key: Option, cancel: Option, ready: Option, _pd: std::marker::PhantomData, } } // === ReadyCache === impl Default for ReadyCache where K: Eq + Hash, S: Service, { fn default() -> Self { Self { ready: IndexMap::default(), pending: FuturesUnordered::new(), pending_cancel_txs: IndexMap::default(), } } } impl fmt::Debug for ReadyCache where K: fmt::Debug + Eq + Hash, S: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let Self { pending, pending_cancel_txs, ready, } = self; f.debug_struct("ReadyCache") .field("pending", pending) .field("pending_cancel_txs", pending_cancel_txs) .field("ready", ready) .finish() } } impl ReadyCache where K: Eq + Hash, { /// Returns the total number of services in the cache. pub fn len(&self) -> usize { self.ready_len() + self.pending_len() } /// Returns whether or not there are any services in the cache. pub fn is_empty(&self) -> bool { self.ready.is_empty() && self.pending.is_empty() } /// Returns the number of services in the ready set. pub fn ready_len(&self) -> usize { self.ready.len() } /// Returns the number of services in the unready set. pub fn pending_len(&self) -> usize { self.pending.len() } /// Returns true iff the given key is in the unready set. pub fn pending_contains>(&self, key: &Q) -> bool { self.pending_cancel_txs.contains_key(key) } /// Obtains a reference to a service in the ready set by key. pub fn get_ready>(&self, key: &Q) -> Option<(usize, &K, &S)> { self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0)) } /// Obtains a mutable reference to a service in the ready set by key. pub fn get_ready_mut>( &mut self, key: &Q, ) -> Option<(usize, &K, &mut S)> { self.ready .get_full_mut(key) .map(|(i, k, v)| (i, k, &mut v.0)) } /// Obtains a reference to a service in the ready set by index. pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> { self.ready.get_index(idx).map(|(k, v)| (k, &v.0)) } /// Obtains a mutable reference to a service in the ready set by index. pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> { self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0)) } /// Evicts an item from the cache. /// /// Returns true if a service was marked for eviction. /// /// Services are dropped from the ready set immediately. Services in the /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`] /// must be called to cause the service to be dropped. pub fn evict>(&mut self, key: &Q) -> bool { let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) { c.cancel(); true } else { false }; self.ready .swap_remove_full(key) .map(|_| true) .unwrap_or(canceled) } } impl ReadyCache where K: Clone + Eq + Hash, S: Service, >::Error: Into, S::Error: Into, { /// Pushes a new service onto the pending set. /// /// The service will be promoted to the ready set as [`poll_pending`] is invoked. /// /// Note that this does **not** remove services from the ready set. Once the /// old service is used, it will be dropped instead of being added back to /// the pending set; OR, when the new service becomes ready, it will replace /// the prior service in the ready set. /// /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending pub fn push(&mut self, key: K, svc: S) { let cancel = cancelable(); self.push_pending(key, svc, cancel); } fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) { if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) { // If there is already a service for this key, cancel it. c.cancel(); } self.pending.push(Pending { key: Some(key), cancel: Some(cancel_rx), ready: Some(svc), _pd: std::marker::PhantomData, }); } /// Polls services pending readiness, adding ready services to the ready set. /// /// Returns [`Poll::Ready`] when there are no remaining unready services. /// [`poll_pending`] should be called again after [`push`] or /// [`call_ready_index`] are invoked. /// /// Failures indicate that an individual pending service failed to become /// ready (and has been removed from the cache). In such a case, /// [`poll_pending`] should typically be called again to continue driving /// pending services to readiness. /// /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending /// [`push`]: crate::ready_cache::cache::ReadyCache::push /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match Pin::new(&mut self.pending).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(Ok(())), Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => { trace!("endpoint ready"); let cancel_tx = self.pending_cancel_txs.swap_remove(&key); if let Some(cancel_tx) = cancel_tx { // Keep track of the cancelation so that it need not be // recreated after the service is used. self.ready.insert(key, (svc, (cancel_tx, cancel_rx))); } else { assert!( cancel_tx.is_some(), "services that become ready must have a pending cancelation" ); } } Poll::Ready(Some(Err(PendingError::Canceled(_)))) => { debug!("endpoint canceled"); // The cancellation for this service was removed in order to // cause this cancellation. } Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => { let cancel_tx = self.pending_cancel_txs.swap_remove(&key); assert!( cancel_tx.is_some(), "services that return an error must have a pending cancelation" ); return Err(error::Failed(key, e.into())).into(); } } } } /// Checks whether the referenced endpoint is ready. /// /// Returns true if the endpoint is ready and false if it is not. An error is /// returned if the endpoint fails. pub fn check_ready>( &mut self, cx: &mut Context<'_>, key: &Q, ) -> Result> { match self.ready.get_full_mut(key) { Some((index, _, _)) => self.check_ready_index(cx, index), None => Ok(false), } } /// Checks whether the referenced endpoint is ready. /// /// If the service is no longer ready, it is moved back into the pending set /// and `false` is returned. /// /// If the service errors, it is removed and dropped and the error is returned. pub fn check_ready_index( &mut self, cx: &mut Context<'_>, index: usize, ) -> Result> { let svc = match self.ready.get_index_mut(index) { None => return Ok(false), Some((_, (svc, _))) => svc, }; match svc.poll_ready(cx) { Poll::Ready(Ok(())) => Ok(true), Poll::Pending => { // became unready; so move it back there. let (key, (svc, cancel)) = self .ready .swap_remove_index(index) .expect("invalid ready index"); // If a new version of this service has been added to the // unready set, don't overwrite it. if !self.pending_contains(&key) { self.push_pending(key, svc, cancel); } Ok(false) } Poll::Ready(Err(e)) => { // failed, so drop it. let (key, _) = self .ready .swap_remove_index(index) .expect("invalid ready index"); Err(error::Failed(key, e.into())) } } } /// Calls a ready service by key. /// /// # Panics /// /// If the specified key does not exist in the ready pub fn call_ready>(&mut self, key: &Q, req: Req) -> S::Future { let (index, _, _) = self .ready .get_full_mut(key) .expect("check_ready was not called"); self.call_ready_index(index, req) } /// Calls a ready service by index. /// /// # Panics /// /// If the specified index is out of range. pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future { let (key, (mut svc, cancel)) = self .ready .swap_remove_index(index) .expect("check_ready_index was not called"); let fut = svc.call(req); // If a new version of this service has been added to the // unready set, don't overwrite it. if !self.pending_contains(&key) { self.push_pending(key, svc, cancel); } fut } } // === impl Cancel === /// Creates a cancelation sender and receiver. /// /// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to /// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows /// the state to be observed as soon as the cancelation is triggered. fn cancelable() -> CancelPair { let cx = Arc::new(Cancel { waker: AtomicWaker::new(), canceled: AtomicBool::new(false), }); (CancelTx(cx.clone()), CancelRx(cx)) } impl CancelTx { fn cancel(self) { self.0.canceled.store(true, Ordering::SeqCst); self.0.waker.wake(); } } // === Pending === impl Future for Pending where S: Service, { type Output = Result<(K, S, CancelRx), PendingError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); // Before checking whether the service is ready, check to see whether // readiness has been canceled. let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete"); if cancel.canceled.load(Ordering::SeqCst) { let key = this.key.take().expect("polled after complete"); return Err(PendingError::Canceled(key)).into(); } match this .ready .as_mut() .expect("polled after ready") .poll_ready(cx) { Poll::Pending => { // Before returning Pending, register interest in cancelation so // that this future is polled again if the state changes. let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete"); cancel.waker.register(cx.waker()); // Because both the cancel receiver and cancel sender are held // by the `ReadyCache` (i.e., on a single task), then it must // not be possible for the cancelation state to change while // polling a `Pending` service. assert!( !cancel.canceled.load(Ordering::SeqCst), "cancelation cannot be notified while polling a pending service" ); Poll::Pending } Poll::Ready(Ok(())) => { let key = this.key.take().expect("polled after complete"); let cancel = this.cancel.take().expect("polled after complete"); Ok((key, this.ready.take().expect("polled after ready"), cancel)).into() } Poll::Ready(Err(e)) => { let key = this.key.take().expect("polled after compete"); Err(PendingError::Inner(key, e)).into() } } } } impl fmt::Debug for Pending where K: fmt::Debug, S: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let Self { key, cancel, ready, _pd, } = self; f.debug_struct("Pending") .field("key", key) .field("cancel", cancel) .field("ready", ready) .finish() } }