diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/limit | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/limit')
| -rw-r--r-- | vendor/tower/src/limit/concurrency/future.rs | 41 | ||||
| -rw-r--r-- | vendor/tower/src/limit/concurrency/layer.rs | 60 | ||||
| -rw-r--r-- | vendor/tower/src/limit/concurrency/mod.rs | 10 | ||||
| -rw-r--r-- | vendor/tower/src/limit/concurrency/service.rs | 118 | ||||
| -rw-r--r-- | vendor/tower/src/limit/mod.rs | 9 | ||||
| -rw-r--r-- | vendor/tower/src/limit/rate/layer.rs | 26 | ||||
| -rw-r--r-- | vendor/tower/src/limit/rate/mod.rs | 8 | ||||
| -rw-r--r-- | vendor/tower/src/limit/rate/rate.rs | 30 | ||||
| -rw-r--r-- | vendor/tower/src/limit/rate/service.rs | 130 |
9 files changed, 432 insertions, 0 deletions
diff --git a/vendor/tower/src/limit/concurrency/future.rs b/vendor/tower/src/limit/concurrency/future.rs new file mode 100644 index 00000000..6eb0100a --- /dev/null +++ b/vendor/tower/src/limit/concurrency/future.rs @@ -0,0 +1,41 @@ +//! [`Future`] types +//! +//! [`Future`]: std::future::Future +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::OwnedSemaphorePermit; + +pin_project! { + /// Future for the [`ConcurrencyLimit`] service. + /// + /// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit + #[derive(Debug)] + pub struct ResponseFuture<T> { + #[pin] + inner: T, + // Keep this around so that it is dropped when the future completes + _permit: OwnedSemaphorePermit, + } +} + +impl<T> ResponseFuture<T> { + pub(crate) fn new(inner: T, _permit: OwnedSemaphorePermit) -> ResponseFuture<T> { + ResponseFuture { inner, _permit } + } +} + +impl<F, T, E> Future for ResponseFuture<F> +where + F: Future<Output = Result<T, E>>, +{ + type Output = Result<T, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(ready!(self.project().inner.poll(cx))) + } +} diff --git a/vendor/tower/src/limit/concurrency/layer.rs b/vendor/tower/src/limit/concurrency/layer.rs new file mode 100644 index 00000000..30257c45 --- /dev/null +++ b/vendor/tower/src/limit/concurrency/layer.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use super::ConcurrencyLimit; +use tokio::sync::Semaphore; +use tower_layer::Layer; + +/// Enforces a limit on the concurrent number of requests the underlying +/// service can handle. +#[derive(Debug, Clone)] +pub struct ConcurrencyLimitLayer { + max: usize, +} + +impl ConcurrencyLimitLayer { + /// Create a new concurrency limit layer. + pub const fn new(max: usize) -> Self { + ConcurrencyLimitLayer { max } + } +} + +impl<S> Layer<S> for ConcurrencyLimitLayer { + type Service = ConcurrencyLimit<S>; + + fn layer(&self, service: S) -> Self::Service { + ConcurrencyLimit::new(service, self.max) + } +} + +/// Enforces a limit on the concurrent number of requests the underlying +/// service can handle. +/// +/// Unlike [`ConcurrencyLimitLayer`], which enforces a per-service concurrency +/// limit, this layer accepts a owned semaphore (`Arc<Semaphore>`) which can be +/// shared across multiple services. +/// +/// Cloning this layer will not create a new semaphore. +#[derive(Debug, Clone)] +pub struct GlobalConcurrencyLimitLayer { + semaphore: Arc<Semaphore>, +} + +impl GlobalConcurrencyLimitLayer { + /// Create a new `GlobalConcurrencyLimitLayer`. + pub fn new(max: usize) -> Self { + Self::with_semaphore(Arc::new(Semaphore::new(max))) + } + + /// Create a new `GlobalConcurrencyLimitLayer` from a `Arc<Semaphore>` + pub fn with_semaphore(semaphore: Arc<Semaphore>) -> Self { + GlobalConcurrencyLimitLayer { semaphore } + } +} + +impl<S> Layer<S> for GlobalConcurrencyLimitLayer { + type Service = ConcurrencyLimit<S>; + + fn layer(&self, service: S) -> Self::Service { + ConcurrencyLimit::with_semaphore(service, self.semaphore.clone()) + } +} diff --git a/vendor/tower/src/limit/concurrency/mod.rs b/vendor/tower/src/limit/concurrency/mod.rs new file mode 100644 index 00000000..ac0be8a5 --- /dev/null +++ b/vendor/tower/src/limit/concurrency/mod.rs @@ -0,0 +1,10 @@ +//! Limit the max number of requests being concurrently processed. + +pub mod future; +mod layer; +mod service; + +pub use self::{ + layer::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer}, + service::ConcurrencyLimit, +}; diff --git a/vendor/tower/src/limit/concurrency/service.rs b/vendor/tower/src/limit/concurrency/service.rs new file mode 100644 index 00000000..bb9cf5ee --- /dev/null +++ b/vendor/tower/src/limit/concurrency/service.rs @@ -0,0 +1,118 @@ +use super::future::ResponseFuture; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio_util::sync::PollSemaphore; +use tower_service::Service; + +use futures_core::ready; +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +/// Enforces a limit on the concurrent number of requests the underlying +/// service can handle. +#[derive(Debug)] +pub struct ConcurrencyLimit<T> { + inner: T, + semaphore: PollSemaphore, + /// The currently acquired semaphore permit, if there is sufficient + /// concurrency to send a new request. + /// + /// The permit is acquired in `poll_ready`, and taken in `call` when sending + /// a new request. + permit: Option<OwnedSemaphorePermit>, +} + +impl<T> ConcurrencyLimit<T> { + /// Create a new concurrency limiter. + pub fn new(inner: T, max: usize) -> Self { + Self::with_semaphore(inner, Arc::new(Semaphore::new(max))) + } + + /// Create a new concurrency limiter with a provided shared semaphore + pub fn with_semaphore(inner: T, semaphore: Arc<Semaphore>) -> Self { + ConcurrencyLimit { + inner, + semaphore: PollSemaphore::new(semaphore), + permit: None, + } + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<S, Request> Service<Request> for ConcurrencyLimit<S> +where + S: Service<Request>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture<S::Future>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + // If we haven't already acquired a permit from the semaphore, try to + // acquire one first. + if self.permit.is_none() { + self.permit = ready!(self.semaphore.poll_acquire(cx)); + debug_assert!( + self.permit.is_some(), + "ConcurrencyLimit semaphore is never closed, so `poll_acquire` \ + should never fail", + ); + } + + // Once we've acquired a permit (or if we already had one), poll the + // inner service. + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + // Take the permit + let permit = self + .permit + .take() + .expect("max requests in-flight; poll_ready must be called first"); + + // Call the inner service + let future = self.inner.call(request); + + ResponseFuture::new(future, permit) + } +} + +impl<T: Clone> Clone for ConcurrencyLimit<T> { + fn clone(&self) -> Self { + // Since we hold an `OwnedSemaphorePermit`, we can't derive `Clone`. + // Instead, when cloning the service, create a new service with the + // same semaphore, but with the permit in the un-acquired state. + Self { + inner: self.inner.clone(), + semaphore: self.semaphore.clone(), + permit: None, + } + } +} + +#[cfg(feature = "load")] +impl<S> crate::load::Load for ConcurrencyLimit<S> +where + S: crate::load::Load, +{ + type Metric = S::Metric; + fn load(&self) -> Self::Metric { + self.inner.load() + } +} diff --git a/vendor/tower/src/limit/mod.rs b/vendor/tower/src/limit/mod.rs new file mode 100644 index 00000000..6a10dcae --- /dev/null +++ b/vendor/tower/src/limit/mod.rs @@ -0,0 +1,9 @@ +//! Tower middleware for limiting requests. + +pub mod concurrency; +pub mod rate; + +pub use self::{ + concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer}, + rate::{RateLimit, RateLimitLayer}, +}; diff --git a/vendor/tower/src/limit/rate/layer.rs b/vendor/tower/src/limit/rate/layer.rs new file mode 100644 index 00000000..5f8d31aa --- /dev/null +++ b/vendor/tower/src/limit/rate/layer.rs @@ -0,0 +1,26 @@ +use super::{Rate, RateLimit}; +use std::time::Duration; +use tower_layer::Layer; + +/// Enforces a rate limit on the number of requests the underlying +/// service can handle over a period of time. +#[derive(Debug, Clone)] +pub struct RateLimitLayer { + rate: Rate, +} + +impl RateLimitLayer { + /// Create new rate limit layer. + pub const fn new(num: u64, per: Duration) -> Self { + let rate = Rate::new(num, per); + RateLimitLayer { rate } + } +} + +impl<S> Layer<S> for RateLimitLayer { + type Service = RateLimit<S>; + + fn layer(&self, service: S) -> Self::Service { + RateLimit::new(service, self.rate) + } +} diff --git a/vendor/tower/src/limit/rate/mod.rs b/vendor/tower/src/limit/rate/mod.rs new file mode 100644 index 00000000..52b179b8 --- /dev/null +++ b/vendor/tower/src/limit/rate/mod.rs @@ -0,0 +1,8 @@ +//! Limit the rate at which requests are processed. + +mod layer; +#[allow(clippy::module_inception)] +mod rate; +mod service; + +pub use self::{layer::RateLimitLayer, rate::Rate, service::RateLimit}; diff --git a/vendor/tower/src/limit/rate/rate.rs b/vendor/tower/src/limit/rate/rate.rs new file mode 100644 index 00000000..66736dea --- /dev/null +++ b/vendor/tower/src/limit/rate/rate.rs @@ -0,0 +1,30 @@ +use std::time::Duration; + +/// A rate of requests per time period. +#[derive(Debug, Copy, Clone)] +pub struct Rate { + num: u64, + per: Duration, +} + +impl Rate { + /// Create a new rate. + /// + /// # Panics + /// + /// This function panics if `num` or `per` is 0. + pub const fn new(num: u64, per: Duration) -> Self { + assert!(num > 0); + assert!(per.as_nanos() > 0); + + Rate { num, per } + } + + pub(crate) fn num(&self) -> u64 { + self.num + } + + pub(crate) fn per(&self) -> Duration { + self.per + } +} diff --git a/vendor/tower/src/limit/rate/service.rs b/vendor/tower/src/limit/rate/service.rs new file mode 100644 index 00000000..0aa1d694 --- /dev/null +++ b/vendor/tower/src/limit/rate/service.rs @@ -0,0 +1,130 @@ +use super::Rate; +use futures_core::ready; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::time::{Instant, Sleep}; +use tower_service::Service; + +/// Enforces a rate limit on the number of requests the underlying +/// service can handle over a period of time. +#[derive(Debug)] +pub struct RateLimit<T> { + inner: T, + rate: Rate, + state: State, + sleep: Pin<Box<Sleep>>, +} + +#[derive(Debug)] +enum State { + // The service has hit its limit + Limited, + Ready { until: Instant, rem: u64 }, +} + +impl<T> RateLimit<T> { + /// Create a new rate limiter + pub fn new(inner: T, rate: Rate) -> Self { + let until = Instant::now(); + let state = State::Ready { + until, + rem: rate.num(), + }; + + RateLimit { + inner, + rate, + state, + // The sleep won't actually be used with this duration, but + // we create it eagerly so that we can reset it in place rather than + // `Box::pin`ning a new `Sleep` every time we need one. + sleep: Box::pin(tokio::time::sleep_until(until)), + } + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<S, Request> Service<Request> for RateLimit<S> +where + S: Service<Request>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.state { + State::Ready { .. } => return Poll::Ready(ready!(self.inner.poll_ready(cx))), + State::Limited => { + if Pin::new(&mut self.sleep).poll(cx).is_pending() { + tracing::trace!("rate limit exceeded; sleeping."); + return Poll::Pending; + } + } + } + + self.state = State::Ready { + until: Instant::now() + self.rate.per(), + rem: self.rate.num(), + }; + + Poll::Ready(ready!(self.inner.poll_ready(cx))) + } + + fn call(&mut self, request: Request) -> Self::Future { + match self.state { + State::Ready { mut until, mut rem } => { + let now = Instant::now(); + + // If the period has elapsed, reset it. + if now >= until { + until = now + self.rate.per(); + rem = self.rate.num(); + } + + if rem > 1 { + rem -= 1; + self.state = State::Ready { until, rem }; + } else { + // The service is disabled until further notice + // Reset the sleep future in place, so that we don't have to + // deallocate the existing box and allocate a new one. + self.sleep.as_mut().reset(until); + self.state = State::Limited; + } + + // Call the inner future + self.inner.call(request) + } + State::Limited => panic!("service not ready; poll_ready must be called first"), + } + } +} + +#[cfg(feature = "load")] +impl<S> crate::load::Load for RateLimit<S> +where + S: crate::load::Load, +{ + type Metric = S::Metric; + fn load(&self) -> Self::Metric { + self.inner.load() + } +} |
