author    mo khan <mo@mokhan.ca>    2025-07-02 18:36:06 -0600
committer mo khan <mo@mokhan.ca>    2025-07-02 18:36:06 -0600
commit    8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree      22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/limit
parent    4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/limit')
-rw-r--r--  vendor/tower/src/limit/concurrency/future.rs   |  41
-rw-r--r--  vendor/tower/src/limit/concurrency/layer.rs    |  60
-rw-r--r--  vendor/tower/src/limit/concurrency/mod.rs      |  10
-rw-r--r--  vendor/tower/src/limit/concurrency/service.rs  | 118
-rw-r--r--  vendor/tower/src/limit/mod.rs                  |   9
-rw-r--r--  vendor/tower/src/limit/rate/layer.rs           |  26
-rw-r--r--  vendor/tower/src/limit/rate/mod.rs             |   8
-rw-r--r--  vendor/tower/src/limit/rate/rate.rs            |  30
-rw-r--r--  vendor/tower/src/limit/rate/service.rs         | 130
9 files changed, 432 insertions, 0 deletions
diff --git a/vendor/tower/src/limit/concurrency/future.rs b/vendor/tower/src/limit/concurrency/future.rs
new file mode 100644
index 00000000..6eb0100a
--- /dev/null
+++ b/vendor/tower/src/limit/concurrency/future.rs
@@ -0,0 +1,41 @@
+//! [`Future`] types
+//!
+//! [`Future`]: std::future::Future
+use futures_core::ready;
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio::sync::OwnedSemaphorePermit;
+
+pin_project! {
+ /// Future for the [`ConcurrencyLimit`] service.
+ ///
+ /// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit
+ #[derive(Debug)]
+ pub struct ResponseFuture<T> {
+ #[pin]
+ inner: T,
+ // Keep this around so that it is dropped when the future completes
+ _permit: OwnedSemaphorePermit,
+ }
+}
+
+impl<T> ResponseFuture<T> {
+ pub(crate) fn new(inner: T, _permit: OwnedSemaphorePermit) -> ResponseFuture<T> {
+ ResponseFuture { inner, _permit }
+ }
+}
+
+impl<F, T, E> Future for ResponseFuture<F>
+where
+ F: Future<Output = Result<T, E>>,
+{
+ type Output = Result<T, E>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Poll::Ready(ready!(self.project().inner.poll(cx)))
+ }
+}
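
For orientation: the only job of `_permit` above is its destructor. The future forwards `poll` straight to the inner response future, and when that future completes or is dropped, the `OwnedSemaphorePermit` drops with it, handing the concurrency slot back to the semaphore. A minimal sketch of the same idiom outside Tower, assuming only tokio's `sync`, `rt`, and `macros` features (the permit count and task body are illustrative):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // illustrative: 2 slots

    // `acquire_owned` takes an `Arc<Semaphore>` and returns a permit that
    // is not tied to a borrow of the semaphore, so it can move into a task.
    let permit = semaphore.clone().acquire_owned().await.unwrap();

    let handle = tokio::spawn(async move {
        // ... do work while holding the slot ...
        drop(permit); // the slot returns to the semaphore here
    });
    handle.await.unwrap();
}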
diff --git a/vendor/tower/src/limit/concurrency/layer.rs b/vendor/tower/src/limit/concurrency/layer.rs
new file mode 100644
index 00000000..30257c45
--- /dev/null
+++ b/vendor/tower/src/limit/concurrency/layer.rs
@@ -0,0 +1,60 @@
+use std::sync::Arc;
+
+use super::ConcurrencyLimit;
+use tokio::sync::Semaphore;
+use tower_layer::Layer;
+
+/// Enforces a limit on the number of concurrent requests the underlying
+/// service can handle.
+#[derive(Debug, Clone)]
+pub struct ConcurrencyLimitLayer {
+ max: usize,
+}
+
+impl ConcurrencyLimitLayer {
+ /// Create a new concurrency limit layer.
+ pub const fn new(max: usize) -> Self {
+ ConcurrencyLimitLayer { max }
+ }
+}
+
+impl<S> Layer<S> for ConcurrencyLimitLayer {
+ type Service = ConcurrencyLimit<S>;
+
+ fn layer(&self, service: S) -> Self::Service {
+ ConcurrencyLimit::new(service, self.max)
+ }
+}
+
+/// Enforces a limit on the number of concurrent requests the underlying
+/// service can handle.
+///
+/// Unlike [`ConcurrencyLimitLayer`], which enforces a per-service concurrency
+/// limit, this layer accepts an owned semaphore (`Arc<Semaphore>`) which can
+/// be shared across multiple services.
+///
+/// Cloning this layer will not create a new semaphore.
+#[derive(Debug, Clone)]
+pub struct GlobalConcurrencyLimitLayer {
+ semaphore: Arc<Semaphore>,
+}
+
+impl GlobalConcurrencyLimitLayer {
+ /// Create a new `GlobalConcurrencyLimitLayer`.
+ pub fn new(max: usize) -> Self {
+ Self::with_semaphore(Arc::new(Semaphore::new(max)))
+ }
+
+    /// Create a new `GlobalConcurrencyLimitLayer` from an `Arc<Semaphore>`.
+ pub fn with_semaphore(semaphore: Arc<Semaphore>) -> Self {
+ GlobalConcurrencyLimitLayer { semaphore }
+ }
+}
+
+impl<S> Layer<S> for GlobalConcurrencyLimitLayer {
+ type Service = ConcurrencyLimit<S>;
+
+ fn layer(&self, service: S) -> Self::Service {
+ ConcurrencyLimit::with_semaphore(service, self.semaphore.clone())
+ }
+}
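
The practical difference between the two layers: `ConcurrencyLimitLayer::new` gives every stack it is applied to its own semaphore, while `GlobalConcurrencyLimitLayer` shares one `Arc<Semaphore>` across stacks (and, as noted above, cloning it does not create a new semaphore). A sketch, assuming tower's `util` feature for `service_fn`; the echo service and permit counts are placeholders:

use std::sync::Arc;
use tokio::sync::Semaphore;
use tower::limit::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer};
use tower::{service_fn, ServiceBuilder};

fn build() {
    let echo = || service_fn(|req: String| async move { Ok::<_, ()>(req) });

    // Per-service limit: this stack gets its own semaphore with 10 permits.
    let per_service = ServiceBuilder::new()
        .layer(ConcurrencyLimitLayer::new(10))
        .service(echo());

    // Shared limit: both stacks draw from one semaphore, so their combined
    // in-flight requests never exceed 10.
    let shared = GlobalConcurrencyLimitLayer::with_semaphore(Arc::new(Semaphore::new(10)));
    let svc_a = ServiceBuilder::new().layer(shared.clone()).service(echo());
    let svc_b = ServiceBuilder::new().layer(shared).service(echo());

    let _ = (per_service, svc_a, svc_b);
}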
diff --git a/vendor/tower/src/limit/concurrency/mod.rs b/vendor/tower/src/limit/concurrency/mod.rs
new file mode 100644
index 00000000..ac0be8a5
--- /dev/null
+++ b/vendor/tower/src/limit/concurrency/mod.rs
@@ -0,0 +1,10 @@
+//! Limit the max number of requests being concurrently processed.
+
+pub mod future;
+mod layer;
+mod service;
+
+pub use self::{
+ layer::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer},
+ service::ConcurrencyLimit,
+};
diff --git a/vendor/tower/src/limit/concurrency/service.rs b/vendor/tower/src/limit/concurrency/service.rs
new file mode 100644
index 00000000..bb9cf5ee
--- /dev/null
+++ b/vendor/tower/src/limit/concurrency/service.rs
@@ -0,0 +1,118 @@
+use super::future::ResponseFuture;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use tokio_util::sync::PollSemaphore;
+use tower_service::Service;
+
+use futures_core::ready;
+use std::{
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+/// Enforces a limit on the number of concurrent requests the underlying
+/// service can handle.
+#[derive(Debug)]
+pub struct ConcurrencyLimit<T> {
+ inner: T,
+ semaphore: PollSemaphore,
+ /// The currently acquired semaphore permit, if there is sufficient
+ /// concurrency to send a new request.
+ ///
+ /// The permit is acquired in `poll_ready`, and taken in `call` when sending
+ /// a new request.
+ permit: Option<OwnedSemaphorePermit>,
+}
+
+impl<T> ConcurrencyLimit<T> {
+ /// Create a new concurrency limiter.
+ pub fn new(inner: T, max: usize) -> Self {
+ Self::with_semaphore(inner, Arc::new(Semaphore::new(max)))
+ }
+
+    /// Create a new concurrency limiter with a provided shared semaphore.
+ pub fn with_semaphore(inner: T, semaphore: Arc<Semaphore>) -> Self {
+ ConcurrencyLimit {
+ inner,
+ semaphore: PollSemaphore::new(semaphore),
+ permit: None,
+ }
+ }
+
+ /// Get a reference to the inner service
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to the inner service
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consume `self`, returning the inner service
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<S, Request> Service<Request> for ConcurrencyLimit<S>
+where
+ S: Service<Request>,
+{
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = ResponseFuture<S::Future>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ // If we haven't already acquired a permit from the semaphore, try to
+ // acquire one first.
+ if self.permit.is_none() {
+ self.permit = ready!(self.semaphore.poll_acquire(cx));
+ debug_assert!(
+ self.permit.is_some(),
+ "ConcurrencyLimit semaphore is never closed, so `poll_acquire` \
+ should never fail",
+ );
+ }
+
+ // Once we've acquired a permit (or if we already had one), poll the
+ // inner service.
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ // Take the permit
+ let permit = self
+ .permit
+ .take()
+ .expect("max requests in-flight; poll_ready must be called first");
+
+ // Call the inner service
+ let future = self.inner.call(request);
+
+ ResponseFuture::new(future, permit)
+ }
+}
+
+impl<T: Clone> Clone for ConcurrencyLimit<T> {
+ fn clone(&self) -> Self {
+ // Since we hold an `OwnedSemaphorePermit`, we can't derive `Clone`.
+ // Instead, when cloning the service, create a new service with the
+ // same semaphore, but with the permit in the un-acquired state.
+ Self {
+ inner: self.inner.clone(),
+ semaphore: self.semaphore.clone(),
+ permit: None,
+ }
+ }
+}
+
+#[cfg(feature = "load")]
+impl<S> crate::load::Load for ConcurrencyLimit<S>
+where
+ S: crate::load::Load,
+{
+ type Metric = S::Metric;
+ fn load(&self) -> Self::Metric {
+ self.inner.load()
+ }
+}
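
The `poll_ready`/`call` contract above is load-bearing: the permit is acquired in `poll_ready` and taken in `call`, which panics if no permit is held, so callers must observe readiness first. A usage sketch, assuming tower's `util` feature for `ServiceExt::ready` and `service_fn`:

use tower::limit::ConcurrencyLimit;
use tower::{service_fn, Service, ServiceExt};

#[tokio::main]
async fn main() {
    // Illustrative: at most one request in flight at a time.
    let mut svc = ConcurrencyLimit::new(
        service_fn(|x: u32| async move { Ok::<_, ()>(x * 2) }),
        1,
    );

    // `ready()` drives `poll_ready`, which acquires the semaphore permit;
    // `call` then takes the stored permit and attaches it to the future.
    let resp = svc.ready().await.unwrap().call(21).await.unwrap();
    assert_eq!(resp, 42);
}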
diff --git a/vendor/tower/src/limit/mod.rs b/vendor/tower/src/limit/mod.rs
new file mode 100644
index 00000000..6a10dcae
--- /dev/null
+++ b/vendor/tower/src/limit/mod.rs
@@ -0,0 +1,9 @@
+//! Tower middleware for limiting requests.
+
+pub mod concurrency;
+pub mod rate;
+
+pub use self::{
+ concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer},
+ rate::{RateLimit, RateLimitLayer},
+};
diff --git a/vendor/tower/src/limit/rate/layer.rs b/vendor/tower/src/limit/rate/layer.rs
new file mode 100644
index 00000000..5f8d31aa
--- /dev/null
+++ b/vendor/tower/src/limit/rate/layer.rs
@@ -0,0 +1,26 @@
+use super::{Rate, RateLimit};
+use std::time::Duration;
+use tower_layer::Layer;
+
+/// Enforces a rate limit on the number of requests the underlying
+/// service can handle over a period of time.
+#[derive(Debug, Clone)]
+pub struct RateLimitLayer {
+ rate: Rate,
+}
+
+impl RateLimitLayer {
+ /// Create new rate limit layer.
+ pub const fn new(num: u64, per: Duration) -> Self {
+ let rate = Rate::new(num, per);
+ RateLimitLayer { rate }
+ }
+}
+
+impl<S> Layer<S> for RateLimitLayer {
+ type Service = RateLimit<S>;
+
+ fn layer(&self, service: S) -> Self::Service {
+ RateLimit::new(service, self.rate)
+ }
+}
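
A sketch of applying the layer, assuming tower's `util` feature for `service_fn` (the 100-per-second figure is an arbitrary example):

use std::time::Duration;
use tower::{limit::RateLimitLayer, service_fn, ServiceBuilder};

fn build() {
    // Allow at most 100 requests per one-second window.
    let svc = ServiceBuilder::new()
        .layer(RateLimitLayer::new(100, Duration::from_secs(1)))
        .service(service_fn(|req: String| async move { Ok::<_, ()>(req) }));
    let _ = svc;
}

With tower's `limit` feature enabled, `ServiceBuilder::rate_limit(num, per)` is a shorthand for applying this same layer.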
diff --git a/vendor/tower/src/limit/rate/mod.rs b/vendor/tower/src/limit/rate/mod.rs
new file mode 100644
index 00000000..52b179b8
--- /dev/null
+++ b/vendor/tower/src/limit/rate/mod.rs
@@ -0,0 +1,8 @@
+//! Limit the rate at which requests are processed.
+
+mod layer;
+#[allow(clippy::module_inception)]
+mod rate;
+mod service;
+
+pub use self::{layer::RateLimitLayer, rate::Rate, service::RateLimit};
diff --git a/vendor/tower/src/limit/rate/rate.rs b/vendor/tower/src/limit/rate/rate.rs
new file mode 100644
index 00000000..66736dea
--- /dev/null
+++ b/vendor/tower/src/limit/rate/rate.rs
@@ -0,0 +1,30 @@
+use std::time::Duration;
+
+/// A rate of requests per time period.
+#[derive(Debug, Copy, Clone)]
+pub struct Rate {
+ num: u64,
+ per: Duration,
+}
+
+impl Rate {
+ /// Create a new rate.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if `num` or `per` is 0.
+ pub const fn new(num: u64, per: Duration) -> Self {
+ assert!(num > 0);
+ assert!(per.as_nanos() > 0);
+
+ Rate { num, per }
+ }
+
+ pub(crate) fn num(&self) -> u64 {
+ self.num
+ }
+
+ pub(crate) fn per(&self) -> Duration {
+ self.per
+ }
+}
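
Because `Rate::new` is `const`, a `Rate` can be built in const context, where the asserts above fail at compile time instead of at runtime. A brief sketch (the figures are arbitrary):

use std::time::Duration;
use tower::limit::rate::Rate;

// Evaluated at compile time; a zero `num` here would be a compile error.
const PER_SECOND: Rate = Rate::new(50, Duration::from_secs(1));

fn main() {
    // Built at runtime; the asserts would panic on a zero `num` or `per`.
    let burst = Rate::new(10, Duration::from_millis(100));
    let _ = (PER_SECOND, burst);
}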
diff --git a/vendor/tower/src/limit/rate/service.rs b/vendor/tower/src/limit/rate/service.rs
new file mode 100644
index 00000000..0aa1d694
--- /dev/null
+++ b/vendor/tower/src/limit/rate/service.rs
@@ -0,0 +1,130 @@
+use super::Rate;
+use futures_core::ready;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio::time::{Instant, Sleep};
+use tower_service::Service;
+
+/// Enforces a rate limit on the number of requests the underlying
+/// service can handle over a period of time.
+#[derive(Debug)]
+pub struct RateLimit<T> {
+ inner: T,
+ rate: Rate,
+ state: State,
+ sleep: Pin<Box<Sleep>>,
+}
+
+#[derive(Debug)]
+enum State {
+ // The service has hit its limit
+ Limited,
+ Ready { until: Instant, rem: u64 },
+}
+
+impl<T> RateLimit<T> {
+ /// Create a new rate limiter
+ pub fn new(inner: T, rate: Rate) -> Self {
+ let until = Instant::now();
+ let state = State::Ready {
+ until,
+ rem: rate.num(),
+ };
+
+ RateLimit {
+ inner,
+ rate,
+ state,
+ // The sleep won't actually be used with this duration, but
+ // we create it eagerly so that we can reset it in place rather than
+ // `Box::pin`ning a new `Sleep` every time we need one.
+ sleep: Box::pin(tokio::time::sleep_until(until)),
+ }
+ }
+
+ /// Get a reference to the inner service
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Get a mutable reference to the inner service
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consume `self`, returning the inner service
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<S, Request> Service<Request> for RateLimit<S>
+where
+ S: Service<Request>,
+{
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = S::Future;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.state {
+ State::Ready { .. } => return Poll::Ready(ready!(self.inner.poll_ready(cx))),
+ State::Limited => {
+ if Pin::new(&mut self.sleep).poll(cx).is_pending() {
+ tracing::trace!("rate limit exceeded; sleeping.");
+ return Poll::Pending;
+ }
+ }
+ }
+
+ self.state = State::Ready {
+ until: Instant::now() + self.rate.per(),
+ rem: self.rate.num(),
+ };
+
+ Poll::Ready(ready!(self.inner.poll_ready(cx)))
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ match self.state {
+ State::Ready { mut until, mut rem } => {
+ let now = Instant::now();
+
+ // If the period has elapsed, reset it.
+ if now >= until {
+ until = now + self.rate.per();
+ rem = self.rate.num();
+ }
+
+ if rem > 1 {
+ rem -= 1;
+ self.state = State::Ready { until, rem };
+ } else {
+ // The service is disabled until further notice
+ // Reset the sleep future in place, so that we don't have to
+ // deallocate the existing box and allocate a new one.
+ self.sleep.as_mut().reset(until);
+ self.state = State::Limited;
+ }
+
+ // Call the inner future
+ self.inner.call(request)
+ }
+ State::Limited => panic!("service not ready; poll_ready must be called first"),
+ }
+ }
+}
+
+#[cfg(feature = "load")]
+impl<S> crate::load::Load for RateLimit<S>
+where
+ S: crate::load::Load,
+{
+ type Metric = S::Metric;
+ fn load(&self) -> Self::Metric {
+ self.inner.load()
+ }
+}
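
Seen from the caller's side, the state machine above behaves like a fixed window: `rem` counts down within the current window, consuming the last token flips the service to `Limited`, and `poll_ready` then parks on the `Sleep` until the window rolls over. A sketch, assuming tower's `util` feature:

use std::time::Duration;
use tower::limit::{rate::Rate, RateLimit};
use tower::{service_fn, Service, ServiceExt};

#[tokio::main]
async fn main() {
    // Illustrative: two requests per 100 ms window.
    let mut svc = RateLimit::new(
        service_fn(|x: u32| async move { Ok::<_, ()>(x) }),
        Rate::new(2, Duration::from_millis(100)),
    );

    // The first two calls pass immediately; the third `ready()` waits on
    // the `Sleep` until roughly 100 ms after the first call.
    for i in 0..3u32 {
        let _ = svc.ready().await.unwrap().call(i).await.unwrap();
    }
}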