summaryrefslogtreecommitdiff
path: root/vendor/tower/src/ready_cache/cache.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tower/src/ready_cache/cache.rs')
-rw-r--r--vendor/tower/src/ready_cache/cache.rs503
1 files changed, 503 insertions, 0 deletions
diff --git a/vendor/tower/src/ready_cache/cache.rs b/vendor/tower/src/ready_cache/cache.rs
new file mode 100644
index 00000000..a6299033
--- /dev/null
+++ b/vendor/tower/src/ready_cache/cache.rs
@@ -0,0 +1,503 @@
+//! A cache of services.
+
+use super::error;
+use futures_core::Stream;
+use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
+pub use indexmap::Equivalent;
+use indexmap::IndexMap;
+use std::fmt;
+use std::future::Future;
+use std::hash::Hash;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tower_service::Service;
+use tracing::{debug, trace};
+
+/// Drives readiness over a set of services.
+///
+/// The cache maintains two internal data structures:
+///
+/// * a set of _pending_ services that have not yet become ready; and
+/// * a set of _ready_ services that have previously polled ready.
+///
+/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
+/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
+/// pending services are polled and added to the _ready set_.
+///
+/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
+/// request to the specified service, but panics if the specified service is not
+/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
+/// that a service is ready before dispatching a request.
+///
+/// The ready set can hold services for an arbitrarily long time. During this
+/// time, the runtime may process events that invalidate that ready state (for
+/// instance, if a keepalive detects a lost connection). In such cases, callers
+/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
+/// immediately before dispatching a request to ensure that the service has not
+/// become unavailable.
+///
+/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
+/// the _pending_ set to be driven to readiness again.
+///
+/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
+/// specified service is _not_ ready. If an error is returned, this indicates that
+/// the server failed and has been removed from the cache entirely.
+///
+/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
+/// though the service may not be dropped (if it is currently pending) until
+/// [`ReadyCache::poll_pending`] is invoked.
+///
+/// Note that the by-index accessors are provided to support use cases (like
+/// power-of-two-choices load balancing) where the caller does not care to keep
+/// track of each service's key. Instead, it needs only to access _some_ ready
+/// service. In such a case, it should be noted that calls to
+/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
+/// the ready set, so any cached indexes should be discarded after such a call.
+pub struct ReadyCache<K, S, Req>
+where
+ K: Eq + Hash,
+{
+ /// A stream of services that are not yet ready.
+ pending: FuturesUnordered<Pending<K, S, Req>>,
+ /// An index of cancelation handles for pending streams.
+ pending_cancel_txs: IndexMap<K, CancelTx>,
+
+ /// Services that have previously become ready. Readiness can become stale,
+ /// so a given service should be polled immediately before use.
+ ///
+ /// The cancelation oneshot is preserved (though unused) while the service is
+ /// ready so that it need not be reallocated each time a request is
+ /// dispatched.
+ ready: IndexMap<K, (S, CancelPair)>,
+}
+
+// Safety: This is safe because we do not use `Pin::new_unchecked`.
+impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
+
+#[derive(Debug)]
+struct Cancel {
+ waker: AtomicWaker,
+ canceled: AtomicBool,
+}
+
+#[derive(Debug)]
+struct CancelRx(Arc<Cancel>);
+
+#[derive(Debug)]
+struct CancelTx(Arc<Cancel>);
+
+type CancelPair = (CancelTx, CancelRx);
+
+#[derive(Debug)]
+enum PendingError<K, E> {
+ Canceled(K),
+ Inner(K, E),
+}
+
+pin_project_lite::pin_project! {
+ /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
+ ///
+ /// May fail due to cancelation, i.e. if the service is evicted from the balancer.
+ struct Pending<K, S, Req> {
+ key: Option<K>,
+ cancel: Option<CancelRx>,
+ ready: Option<S>,
+ _pd: std::marker::PhantomData<Req>,
+ }
+}
+
+// === ReadyCache ===
+
+impl<K, S, Req> Default for ReadyCache<K, S, Req>
+where
+ K: Eq + Hash,
+ S: Service<Req>,
+{
+ fn default() -> Self {
+ Self {
+ ready: IndexMap::default(),
+ pending: FuturesUnordered::new(),
+ pending_cancel_txs: IndexMap::default(),
+ }
+ }
+}
+
+impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
+where
+ K: fmt::Debug + Eq + Hash,
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let Self {
+ pending,
+ pending_cancel_txs,
+ ready,
+ } = self;
+ f.debug_struct("ReadyCache")
+ .field("pending", pending)
+ .field("pending_cancel_txs", pending_cancel_txs)
+ .field("ready", ready)
+ .finish()
+ }
+}
+
+impl<K, S, Req> ReadyCache<K, S, Req>
+where
+ K: Eq + Hash,
+{
+ /// Returns the total number of services in the cache.
+ pub fn len(&self) -> usize {
+ self.ready_len() + self.pending_len()
+ }
+
+ /// Returns whether or not there are any services in the cache.
+ pub fn is_empty(&self) -> bool {
+ self.ready.is_empty() && self.pending.is_empty()
+ }
+
+ /// Returns the number of services in the ready set.
+ pub fn ready_len(&self) -> usize {
+ self.ready.len()
+ }
+
+ /// Returns the number of services in the unready set.
+ pub fn pending_len(&self) -> usize {
+ self.pending.len()
+ }
+
+ /// Returns true iff the given key is in the unready set.
+ pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
+ self.pending_cancel_txs.contains_key(key)
+ }
+
+ /// Obtains a reference to a service in the ready set by key.
+ pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
+ self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
+ }
+
+ /// Obtains a mutable reference to a service in the ready set by key.
+ pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
+ &mut self,
+ key: &Q,
+ ) -> Option<(usize, &K, &mut S)> {
+ self.ready
+ .get_full_mut(key)
+ .map(|(i, k, v)| (i, k, &mut v.0))
+ }
+
+ /// Obtains a reference to a service in the ready set by index.
+ pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
+ self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
+ }
+
+ /// Obtains a mutable reference to a service in the ready set by index.
+ pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&K, &mut S)> {
+ self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
+ }
+
+ /// Returns an iterator over the ready keys and services.
+ pub fn iter_ready(&self) -> impl Iterator<Item = (&K, &S)> {
+ self.ready.iter().map(|(k, s)| (k, &s.0))
+ }
+
+ /// Returns a mutable iterator over the ready keys and services.
+ pub fn iter_ready_mut(&mut self) -> impl Iterator<Item = (&K, &mut S)> {
+ self.ready.iter_mut().map(|(k, s)| (k, &mut s.0))
+ }
+
+ /// Evicts an item from the cache.
+ ///
+ /// Returns true if a service was marked for eviction.
+ ///
+ /// Services are dropped from the ready set immediately. Services in the
+ /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
+ /// must be called to cause the service to be dropped.
+ pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
+ let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
+ c.cancel();
+ true
+ } else {
+ false
+ };
+
+ self.ready
+ .swap_remove_full(key)
+ .map(|_| true)
+ .unwrap_or(canceled)
+ }
+}
+
+impl<K, S, Req> ReadyCache<K, S, Req>
+where
+ K: Clone + Eq + Hash,
+ S: Service<Req>,
+ <S as Service<Req>>::Error: Into<crate::BoxError>,
+ S::Error: Into<crate::BoxError>,
+{
+ /// Pushes a new service onto the pending set.
+ ///
+ /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
+ ///
+ /// Note that this does **not** remove services from the ready set. Once the
+ /// old service is used, it will be dropped instead of being added back to
+ /// the pending set; OR, when the new service becomes ready, it will replace
+ /// the prior service in the ready set.
+ ///
+ /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
+ pub fn push(&mut self, key: K, svc: S) {
+ let cancel = cancelable();
+ self.push_pending(key, svc, cancel);
+ }
+
+ fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
+ if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
+ // If there is already a service for this key, cancel it.
+ c.cancel();
+ }
+ self.pending.push(Pending {
+ key: Some(key),
+ cancel: Some(cancel_rx),
+ ready: Some(svc),
+ _pd: std::marker::PhantomData,
+ });
+ }
+
+ /// Polls services pending readiness, adding ready services to the ready set.
+ ///
+ /// Returns [`Poll::Ready`] when there are no remaining unready services.
+ /// [`poll_pending`] should be called again after [`push`] or
+ /// [`call_ready_index`] are invoked.
+ ///
+ /// Failures indicate that an individual pending service failed to become
+ /// ready (and has been removed from the cache). In such a case,
+ /// [`poll_pending`] should typically be called again to continue driving
+ /// pending services to readiness.
+ ///
+ /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
+ /// [`push`]: crate::ready_cache::cache::ReadyCache::push
+ /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
+ pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
+ loop {
+ match Pin::new(&mut self.pending).poll_next(cx) {
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(None) => return Poll::Ready(Ok(())),
+ Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
+ trace!("endpoint ready");
+ let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
+ if let Some(cancel_tx) = cancel_tx {
+ // Keep track of the cancelation so that it need not be
+ // recreated after the service is used.
+ self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
+ } else {
+ assert!(
+ cancel_tx.is_some(),
+ "services that become ready must have a pending cancelation"
+ );
+ }
+ }
+ Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
+ debug!("endpoint canceled");
+ // The cancellation for this service was removed in order to
+ // cause this cancellation.
+ }
+ Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
+ let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
+ assert!(
+ cancel_tx.is_some(),
+ "services that return an error must have a pending cancelation"
+ );
+ return Err(error::Failed(key, e.into())).into();
+ }
+ }
+ }
+ }
+
+ /// Checks whether the referenced endpoint is ready.
+ ///
+ /// Returns true if the endpoint is ready and false if it is not. An error is
+ /// returned if the endpoint fails.
+ pub fn check_ready<Q: Hash + Equivalent<K>>(
+ &mut self,
+ cx: &mut Context<'_>,
+ key: &Q,
+ ) -> Result<bool, error::Failed<K>> {
+ match self.ready.get_full_mut(key) {
+ Some((index, _, _)) => self.check_ready_index(cx, index),
+ None => Ok(false),
+ }
+ }
+
+ /// Checks whether the referenced endpoint is ready.
+ ///
+ /// If the service is no longer ready, it is moved back into the pending set
+ /// and `false` is returned.
+ ///
+ /// If the service errors, it is removed and dropped and the error is returned.
+ pub fn check_ready_index(
+ &mut self,
+ cx: &mut Context<'_>,
+ index: usize,
+ ) -> Result<bool, error::Failed<K>> {
+ let svc = match self.ready.get_index_mut(index) {
+ None => return Ok(false),
+ Some((_, (svc, _))) => svc,
+ };
+ match svc.poll_ready(cx) {
+ Poll::Ready(Ok(())) => Ok(true),
+ Poll::Pending => {
+ // became unready; so move it back there.
+ let (key, (svc, cancel)) = self
+ .ready
+ .swap_remove_index(index)
+ .expect("invalid ready index");
+
+ // If a new version of this service has been added to the
+ // unready set, don't overwrite it.
+ if !self.pending_contains(&key) {
+ self.push_pending(key, svc, cancel);
+ }
+
+ Ok(false)
+ }
+ Poll::Ready(Err(e)) => {
+ // failed, so drop it.
+ let (key, _) = self
+ .ready
+ .swap_remove_index(index)
+ .expect("invalid ready index");
+ Err(error::Failed(key, e.into()))
+ }
+ }
+ }
+
+ /// Calls a ready service by key.
+ ///
+ /// # Panics
+ ///
+ /// If the specified key does not exist in the ready
+ pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
+ let (index, _, _) = self
+ .ready
+ .get_full_mut(key)
+ .expect("check_ready was not called");
+ self.call_ready_index(index, req)
+ }
+
+ /// Calls a ready service by index.
+ ///
+ /// # Panics
+ ///
+ /// If the specified index is out of range.
+ pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
+ let (key, (mut svc, cancel)) = self
+ .ready
+ .swap_remove_index(index)
+ .expect("check_ready_index was not called");
+
+ let fut = svc.call(req);
+
+ // If a new version of this service has been added to the
+ // unready set, don't overwrite it.
+ if !self.pending_contains(&key) {
+ self.push_pending(key, svc, cancel);
+ }
+
+ fut
+ }
+}
+
+// === impl Cancel ===
+
+/// Creates a cancelation sender and receiver.
+///
+/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
+/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
+/// the state to be observed as soon as the cancelation is triggered.
+fn cancelable() -> CancelPair {
+ let cx = Arc::new(Cancel {
+ waker: AtomicWaker::new(),
+ canceled: AtomicBool::new(false),
+ });
+ (CancelTx(cx.clone()), CancelRx(cx))
+}
+
+impl CancelTx {
+ fn cancel(self) {
+ self.0.canceled.store(true, Ordering::SeqCst);
+ self.0.waker.wake();
+ }
+}
+
+// === Pending ===
+
+impl<K, S, Req> Future for Pending<K, S, Req>
+where
+ S: Service<Req>,
+{
+ type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.project();
+ // Before checking whether the service is ready, check to see whether
+ // readiness has been canceled.
+ let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
+ if cancel.canceled.load(Ordering::SeqCst) {
+ let key = this.key.take().expect("polled after complete");
+ return Err(PendingError::Canceled(key)).into();
+ }
+
+ match this
+ .ready
+ .as_mut()
+ .expect("polled after ready")
+ .poll_ready(cx)
+ {
+ Poll::Pending => {
+ // Before returning Pending, register interest in cancelation so
+ // that this future is polled again if the state changes.
+ let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
+ cancel.waker.register(cx.waker());
+ // Because both the cancel receiver and cancel sender are held
+ // by the `ReadyCache` (i.e., on a single task), then it must
+ // not be possible for the cancelation state to change while
+ // polling a `Pending` service.
+ assert!(
+ !cancel.canceled.load(Ordering::SeqCst),
+ "cancelation cannot be notified while polling a pending service"
+ );
+ Poll::Pending
+ }
+ Poll::Ready(Ok(())) => {
+ let key = this.key.take().expect("polled after complete");
+ let cancel = this.cancel.take().expect("polled after complete");
+ Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
+ }
+ Poll::Ready(Err(e)) => {
+ let key = this.key.take().expect("polled after compete");
+ Err(PendingError::Inner(key, e)).into()
+ }
+ }
+ }
+}
+
+impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
+where
+ K: fmt::Debug,
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let Self {
+ key,
+ cancel,
+ ready,
+ _pd,
+ } = self;
+ f.debug_struct("Pending")
+ .field("key", key)
+ .field("cancel", cancel)
+ .field("ready", ready)
+ .finish()
+ }
+}