1 //! A cache of services.
2 
3 use super::error;
4 use futures_core::Stream;
5 use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
6 pub use indexmap::Equivalent;
7 use indexmap::IndexMap;
8 use std::fmt;
9 use std::future::Future;
10 use std::hash::Hash;
11 use std::pin::Pin;
12 use std::sync::atomic::{AtomicBool, Ordering};
13 use std::sync::Arc;
14 use std::task::{Context, Poll};
15 use tower_service::Service;
16 use tracing::{debug, trace};
17 
/// Drives readiness over a set of services.
///
/// The cache maintains two internal data structures:
///
/// * a set of _pending_ services that have not yet become ready; and
/// * a set of _ready_ services that have previously polled ready.
///
/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
/// pending services are polled and added to the _ready set_.
///
/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
/// request to the specified service, but panics if the specified service is not
/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
/// that a service is ready before dispatching a request.
///
/// The ready set can hold services for an arbitrarily long time. During this
/// time, the runtime may process events that invalidate that ready state (for
/// instance, if a keepalive detects a lost connection). In such cases, callers
/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
/// immediately before dispatching a request to ensure that the service has not
/// become unavailable.
///
/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
/// the _pending_ set to be driven to readiness again (unless a newer service
/// with the same key is already pending, in which case the used service is
/// dropped).
///
/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
/// specified service is _not_ ready. If an error is returned, this indicates that
/// the service failed and has been removed from the cache entirely.
///
/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
/// though the service may not be dropped (if it is currently pending) until
/// [`ReadyCache::poll_pending`] is invoked.
///
/// Note that the by-index accessors are provided to support use cases (like
/// power-of-two-choices load balancing) where the caller does not care to keep
/// track of each service's key. Instead, it needs only to access _some_ ready
/// service. In such a case, it should be noted that calls to
/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
/// the ready set, so any cached indexes should be discarded after such a call.
pub struct ReadyCache<K, S, Req>
where
    K: Eq + Hash,
{
    /// A stream of services that are not yet ready.
    ///
    /// Canceled entries stay in this set until they are polled and observed
    /// as canceled by `poll_pending`.
    pending: FuturesUnordered<Pending<K, S, Req>>,
    /// An index of cancelation handles for pending services, keyed by service
    /// key. Only the most recently pushed (non-canceled) pending service for a
    /// key has an entry here.
    pending_cancel_txs: IndexMap<K, CancelTx>,

    /// Services that have previously become ready. Readiness can become stale,
    /// so a given service should be polled immediately before use.
    ///
    /// The cancelation oneshot is preserved (though unused) while the service is
    /// ready so that it need not be reallocated each time a request is
    /// dispatched.
    ready: IndexMap<K, (S, CancelPair)>,
}
75 
76 // Safety: This is safe because we do not use `Pin::new_unchecked`.
77 impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
78 
/// State shared between a `CancelTx` and its matching `CancelRx`.
#[derive(Debug)]
struct Cancel {
    /// Waker registered by a pending future so that a later cancelation
    /// wakes the task that polls it.
    waker: AtomicWaker,
    /// Set to `true` once cancelation has been requested.
    canceled: AtomicBool,
}

/// Receiving half of a cancelation pair. It is held by a `Pending` future and
/// kept alongside a service while that service sits in the ready set.
#[derive(Debug)]
struct CancelRx(Arc<Cancel>);

/// Sending half of a cancelation pair. It is consumed by `CancelTx::cancel`.
#[derive(Debug)]
struct CancelTx(Arc<Cancel>);

/// A cancelation sender and receiver that share the same `Cancel` state.
type CancelPair = (CancelTx, CancelRx);

/// The ways in which a `Pending` future can fail.
#[derive(Debug)]
enum PendingError<K, E> {
    /// The service with this key was canceled (evicted, or replaced by a newer
    /// service pushed under the same key) before it became ready.
    Canceled(K),
    /// The service with this key returned an error while being polled for
    /// readiness.
    Inner(K, E),
}
98 
pin_project_lite::pin_project! {
    /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
    ///
    /// May fail due to cancelation, i.e. if the service is evicted from the
    /// cache or replaced by a newer service pushed under the same key.
    struct Pending<K, S, Req> {
        // Key of the service; taken when the future completes.
        key: Option<K>,
        // Cancelation receiver; checked before every readiness poll and handed
        // back to the cache once the service becomes ready.
        cancel: Option<CancelRx>,
        // The service being driven to readiness; taken on success.
        ready: Option<S>,
        // Ties the request type to this future without storing a value of it.
        _pd: std::marker::PhantomData<Req>,
    }
}
110 
111 // === ReadyCache ===
112 
113 impl<K, S, Req> Default for ReadyCache<K, S, Req>
114 where
115     K: Eq + Hash,
116     S: Service<Req>,
117 {
default() -> Self118     fn default() -> Self {
119         Self {
120             ready: IndexMap::default(),
121             pending: FuturesUnordered::new(),
122             pending_cancel_txs: IndexMap::default(),
123         }
124     }
125 }
126 
127 impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
128 where
129     K: fmt::Debug + Eq + Hash,
130     S: fmt::Debug,
131 {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result132     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133         let Self {
134             pending,
135             pending_cancel_txs,
136             ready,
137         } = self;
138         f.debug_struct("ReadyCache")
139             .field("pending", pending)
140             .field("pending_cancel_txs", pending_cancel_txs)
141             .field("ready", ready)
142             .finish()
143     }
144 }
145 
146 impl<K, S, Req> ReadyCache<K, S, Req>
147 where
148     K: Eq + Hash,
149 {
150     /// Returns the total number of services in the cache.
len(&self) -> usize151     pub fn len(&self) -> usize {
152         self.ready_len() + self.pending_len()
153     }
154 
155     /// Returns whether or not there are any services in the cache.
is_empty(&self) -> bool156     pub fn is_empty(&self) -> bool {
157         self.ready.is_empty() && self.pending.is_empty()
158     }
159 
160     /// Returns the number of services in the ready set.
ready_len(&self) -> usize161     pub fn ready_len(&self) -> usize {
162         self.ready.len()
163     }
164 
165     /// Returns the number of services in the unready set.
pending_len(&self) -> usize166     pub fn pending_len(&self) -> usize {
167         self.pending.len()
168     }
169 
170     /// Returns true iff the given key is in the unready set.
pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool171     pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
172         self.pending_cancel_txs.contains_key(key)
173     }
174 
175     /// Obtains a reference to a service in the ready set by key.
get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)>176     pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
177         self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
178     }
179 
180     /// Obtains a mutable reference to a service in the ready set by key.
get_ready_mut<Q: Hash + Equivalent<K>>( &mut self, key: &Q, ) -> Option<(usize, &K, &mut S)>181     pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
182         &mut self,
183         key: &Q,
184     ) -> Option<(usize, &K, &mut S)> {
185         self.ready
186             .get_full_mut(key)
187             .map(|(i, k, v)| (i, k, &mut v.0))
188     }
189 
190     /// Obtains a reference to a service in the ready set by index.
get_ready_index(&self, idx: usize) -> Option<(&K, &S)>191     pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
192         self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
193     }
194 
195     /// Obtains a mutable reference to a service in the ready set by index.
get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)>196     pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
197         self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
198     }
199 
200     /// Evicts an item from the cache.
201     ///
202     /// Returns true if a service was marked for eviction.
203     ///
204     /// Services are dropped from the ready set immediately. Services in the
205     /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
206     /// must be called to cause the service to be dropped.
evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool207     pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
208         let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
209             c.cancel();
210             true
211         } else {
212             false
213         };
214 
215         self.ready
216             .swap_remove_full(key)
217             .map(|_| true)
218             .unwrap_or(canceled)
219     }
220 }
221 
222 impl<K, S, Req> ReadyCache<K, S, Req>
223 where
224     K: Clone + Eq + Hash,
225     S: Service<Req>,
226     <S as Service<Req>>::Error: Into<crate::BoxError>,
227     S::Error: Into<crate::BoxError>,
228 {
229     /// Pushes a new service onto the pending set.
230     ///
231     /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
232     ///
233     /// Note that this does **not** remove services from the ready set. Once the
234     /// old service is used, it will be dropped instead of being added back to
235     /// the pending set; OR, when the new service becomes ready, it will replace
236     /// the prior service in the ready set.
237     ///
238     /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
push(&mut self, key: K, svc: S)239     pub fn push(&mut self, key: K, svc: S) {
240         let cancel = cancelable();
241         self.push_pending(key, svc, cancel);
242     }
243 
push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair)244     fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
245         if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
246             // If there is already a service for this key, cancel it.
247             c.cancel();
248         }
249         self.pending.push(Pending {
250             key: Some(key),
251             cancel: Some(cancel_rx),
252             ready: Some(svc),
253             _pd: std::marker::PhantomData,
254         });
255     }
256 
257     /// Polls services pending readiness, adding ready services to the ready set.
258     ///
259     /// Returns [`Poll::Ready`] when there are no remaining unready services.
260     /// [`poll_pending`] should be called again after [`push`] or
261     /// [`call_ready_index`] are invoked.
262     ///
263     /// Failures indicate that an individual pending service failed to become
264     /// ready (and has been removed from the cache). In such a case,
265     /// [`poll_pending`] should typically be called again to continue driving
266     /// pending services to readiness.
267     ///
268     /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
269     /// [`push`]: crate::ready_cache::cache::ReadyCache::push
270     /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>>271     pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
272         loop {
273             match Pin::new(&mut self.pending).poll_next(cx) {
274                 Poll::Pending => return Poll::Pending,
275                 Poll::Ready(None) => return Poll::Ready(Ok(())),
276                 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
277                     trace!("endpoint ready");
278                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
279                     if let Some(cancel_tx) = cancel_tx {
280                         // Keep track of the cancelation so that it need not be
281                         // recreated after the service is used.
282                         self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
283                     } else {
284                         assert!(
285                             cancel_tx.is_some(),
286                             "services that become ready must have a pending cancelation"
287                         );
288                     }
289                 }
290                 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
291                     debug!("endpoint canceled");
292                     // The cancellation for this service was removed in order to
293                     // cause this cancellation.
294                 }
295                 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
296                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
297                     assert!(
298                         cancel_tx.is_some(),
299                         "services that return an error must have a pending cancelation"
300                     );
301                     return Err(error::Failed(key, e.into())).into();
302                 }
303             }
304         }
305     }
306 
307     /// Checks whether the referenced endpoint is ready.
308     ///
309     /// Returns true if the endpoint is ready and false if it is not. An error is
310     /// returned if the endpoint fails.
check_ready<Q: Hash + Equivalent<K>>( &mut self, cx: &mut Context<'_>, key: &Q, ) -> Result<bool, error::Failed<K>>311     pub fn check_ready<Q: Hash + Equivalent<K>>(
312         &mut self,
313         cx: &mut Context<'_>,
314         key: &Q,
315     ) -> Result<bool, error::Failed<K>> {
316         match self.ready.get_full_mut(key) {
317             Some((index, _, _)) => self.check_ready_index(cx, index),
318             None => Ok(false),
319         }
320     }
321 
322     /// Checks whether the referenced endpoint is ready.
323     ///
324     /// If the service is no longer ready, it is moved back into the pending set
325     /// and `false` is returned.
326     ///
327     /// If the service errors, it is removed and dropped and the error is returned.
check_ready_index( &mut self, cx: &mut Context<'_>, index: usize, ) -> Result<bool, error::Failed<K>>328     pub fn check_ready_index(
329         &mut self,
330         cx: &mut Context<'_>,
331         index: usize,
332     ) -> Result<bool, error::Failed<K>> {
333         let svc = match self.ready.get_index_mut(index) {
334             None => return Ok(false),
335             Some((_, (svc, _))) => svc,
336         };
337         match svc.poll_ready(cx) {
338             Poll::Ready(Ok(())) => Ok(true),
339             Poll::Pending => {
340                 // became unready; so move it back there.
341                 let (key, (svc, cancel)) = self
342                     .ready
343                     .swap_remove_index(index)
344                     .expect("invalid ready index");
345 
346                 // If a new version of this service has been added to the
347                 // unready set, don't overwrite it.
348                 if !self.pending_contains(&key) {
349                     self.push_pending(key, svc, cancel);
350                 }
351 
352                 Ok(false)
353             }
354             Poll::Ready(Err(e)) => {
355                 // failed, so drop it.
356                 let (key, _) = self
357                     .ready
358                     .swap_remove_index(index)
359                     .expect("invalid ready index");
360                 Err(error::Failed(key, e.into()))
361             }
362         }
363     }
364 
365     /// Calls a ready service by key.
366     ///
367     /// # Panics
368     ///
369     /// If the specified key does not exist in the ready
call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future370     pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
371         let (index, _, _) = self
372             .ready
373             .get_full_mut(key)
374             .expect("check_ready was not called");
375         self.call_ready_index(index, req)
376     }
377 
378     /// Calls a ready service by index.
379     ///
380     /// # Panics
381     ///
382     /// If the specified index is out of range.
call_ready_index(&mut self, index: usize, req: Req) -> S::Future383     pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
384         let (key, (mut svc, cancel)) = self
385             .ready
386             .swap_remove_index(index)
387             .expect("check_ready_index was not called");
388 
389         let fut = svc.call(req);
390 
391         // If a new version of this service has been added to the
392         // unready set, don't overwrite it.
393         if !self.pending_contains(&key) {
394             self.push_pending(key, svc, cancel);
395         }
396 
397         fut
398     }
399 }
400 
401 // === impl Cancel ===
402 
403 /// Creates a cancelation sender and receiver.
404 ///
405 /// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
406 /// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
407 /// the state to be observed as soon as the cancelation is triggered.
cancelable() -> CancelPair408 fn cancelable() -> CancelPair {
409     let cx = Arc::new(Cancel {
410         waker: AtomicWaker::new(),
411         canceled: AtomicBool::new(false),
412     });
413     (CancelTx(cx.clone()), CancelRx(cx))
414 }
415 
416 impl CancelTx {
cancel(self)417     fn cancel(self) {
418         self.0.canceled.store(true, Ordering::SeqCst);
419         self.0.waker.wake();
420     }
421 }
422 
423 // === Pending ===
424 
425 impl<K, S, Req> Future for Pending<K, S, Req>
426 where
427     S: Service<Req>,
428 {
429     type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
430 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>431     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
432         let this = self.project();
433         // Before checking whether the service is ready, check to see whether
434         // readiness has been canceled.
435         let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
436         if cancel.canceled.load(Ordering::SeqCst) {
437             let key = this.key.take().expect("polled after complete");
438             return Err(PendingError::Canceled(key)).into();
439         }
440 
441         match this
442             .ready
443             .as_mut()
444             .expect("polled after ready")
445             .poll_ready(cx)
446         {
447             Poll::Pending => {
448                 // Before returning Pending, register interest in cancelation so
449                 // that this future is polled again if the state changes.
450                 let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
451                 cancel.waker.register(cx.waker());
452                 // Because both the cancel receiver and cancel sender are held
453                 // by the `ReadyCache` (i.e., on a single task), then it must
454                 // not be possible for the cancelation state to change while
455                 // polling a `Pending` service.
456                 assert!(
457                     !cancel.canceled.load(Ordering::SeqCst),
458                     "cancelation cannot be notified while polling a pending service"
459                 );
460                 Poll::Pending
461             }
462             Poll::Ready(Ok(())) => {
463                 let key = this.key.take().expect("polled after complete");
464                 let cancel = this.cancel.take().expect("polled after complete");
465                 Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
466             }
467             Poll::Ready(Err(e)) => {
468                 let key = this.key.take().expect("polled after compete");
469                 Err(PendingError::Inner(key, e)).into()
470             }
471         }
472     }
473 }
474 
475 impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
476 where
477     K: fmt::Debug,
478     S: fmt::Debug,
479 {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result480     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481         let Self {
482             key,
483             cancel,
484             ready,
485             _pd,
486         } = self;
487         f.debug_struct("Pending")
488             .field("key", key)
489             .field("cancel", cancel)
490             .field("ready", ready)
491             .finish()
492     }
493 }
494