diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/buffer | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/buffer')
| -rw-r--r-- | vendor/tower/src/buffer/error.rs | 68 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/future.rs | 79 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/layer.rs | 75 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/message.rs | 16 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/mod.rs | 47 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/service.rs | 144 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/worker.rs | 227 |
7 files changed, 656 insertions, 0 deletions
diff --git a/vendor/tower/src/buffer/error.rs b/vendor/tower/src/buffer/error.rs new file mode 100644 index 00000000..f046cbca --- /dev/null +++ b/vendor/tower/src/buffer/error.rs @@ -0,0 +1,68 @@ +//! Error types for the `Buffer` middleware. + +use crate::BoxError; +use std::{fmt, sync::Arc}; + +/// An error produced by a [`Service`] wrapped by a [`Buffer`] +/// +/// [`Service`]: crate::Service +/// [`Buffer`]: crate::buffer::Buffer +#[derive(Debug)] +pub struct ServiceError { + inner: Arc<BoxError>, +} + +/// An error produced when the a buffer's worker closes unexpectedly. +pub struct Closed { + _p: (), +} + +// ===== impl ServiceError ===== + +impl ServiceError { + pub(crate) fn new(inner: BoxError) -> ServiceError { + let inner = Arc::new(inner); + ServiceError { inner } + } + + // Private to avoid exposing `Clone` trait as part of the public API + pub(crate) fn clone(&self) -> ServiceError { + ServiceError { + inner: self.inner.clone(), + } + } +} + +impl fmt::Display for ServiceError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "buffered service failed: {}", self.inner) + } +} + +impl std::error::Error for ServiceError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&**self.inner) + } +} + +// ===== impl Closed ===== + +impl Closed { + pub(crate) fn new() -> Self { + Closed { _p: () } + } +} + +impl fmt::Debug for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("Closed").finish() + } +} + +impl fmt::Display for Closed { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("buffer's worker closed unexpectedly") + } +} + +impl std::error::Error for Closed {} diff --git a/vendor/tower/src/buffer/future.rs b/vendor/tower/src/buffer/future.rs new file mode 100644 index 00000000..41178900 --- /dev/null +++ b/vendor/tower/src/buffer/future.rs @@ -0,0 +1,79 @@ +//! Future types for the [`Buffer`] middleware. +//! +//! [`Buffer`]: crate::buffer::Buffer + +use super::{error::Closed, message}; +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Future that completes when the buffered service eventually services the submitted request. + #[derive(Debug)] + pub struct ResponseFuture<T> { + #[pin] + state: ResponseState<T>, + } +} + +pin_project! { + #[project = ResponseStateProj] + #[derive(Debug)] + enum ResponseState<T> { + Failed { + error: Option<crate::BoxError>, + }, + Rx { + #[pin] + rx: message::Rx<T>, + }, + Poll { + #[pin] + fut: T, + }, + } +} + +impl<T> ResponseFuture<T> { + pub(crate) fn new(rx: message::Rx<T>) -> Self { + ResponseFuture { + state: ResponseState::Rx { rx }, + } + } + + pub(crate) fn failed(err: crate::BoxError) -> Self { + ResponseFuture { + state: ResponseState::Failed { error: Some(err) }, + } + } +} + +impl<F, T, E> Future for ResponseFuture<F> +where + F: Future<Output = Result<T, E>>, + E: Into<crate::BoxError>, +{ + type Output = Result<T, crate::BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + ResponseStateProj::Failed { error } => { + return Poll::Ready(Err(error.take().expect("polled after error"))); + } + ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { + Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), + Ok(Err(e)) => return Poll::Ready(Err(e.into())), + Err(_) => return Poll::Ready(Err(Closed::new().into())), + }, + ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), + } + } + } +} diff --git a/vendor/tower/src/buffer/layer.rs b/vendor/tower/src/buffer/layer.rs new file mode 100644 index 00000000..3fc26ab5 --- /dev/null +++ b/vendor/tower/src/buffer/layer.rs @@ -0,0 +1,75 @@ +use super::service::Buffer; +use std::{fmt, marker::PhantomData}; +use tower_layer::Layer; +use tower_service::Service; + +/// Adds an mpsc buffer in front of an inner service. +/// +/// The default Tokio executor is used to run the given service, +/// which means that this layer can only be used on the Tokio runtime. +/// +/// See the module documentation for more details. +pub struct BufferLayer<Request> { + bound: usize, + _p: PhantomData<fn(Request)>, +} + +impl<Request> BufferLayer<Request> { + /// Creates a new [`BufferLayer`] with the provided `bound`. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// # A note on choosing a `bound` + /// + /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a + /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive, + /// this reserved slot may be held up for a long time. As a result, it's advisable to set + /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see. + /// If you do not, all the slots in the buffer may be held up by futures that have just called + /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new + /// requests. + /// + /// [`Poll::Ready`]: std::task::Poll::Ready + /// [`call`]: crate::Service::call + /// [`poll_ready`]: crate::Service::poll_ready + pub const fn new(bound: usize) -> Self { + BufferLayer { + bound, + _p: PhantomData, + } + } +} + +impl<S, Request> Layer<S> for BufferLayer<Request> +where + S: Service<Request> + Send + 'static, + S::Future: Send, + S::Error: Into<crate::BoxError> + Send + Sync, + Request: Send + 'static, +{ + type Service = Buffer<Request, S::Future>; + + fn layer(&self, service: S) -> Self::Service { + Buffer::new(service, self.bound) + } +} + +impl<Request> fmt::Debug for BufferLayer<Request> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("BufferLayer") + .field("bound", &self.bound) + .finish() + } +} + +impl<Request> Clone for BufferLayer<Request> { + fn clone(&self) -> Self { + Self { + bound: self.bound, + _p: PhantomData, + } + } +} + +impl<Request> Copy for BufferLayer<Request> {} diff --git a/vendor/tower/src/buffer/message.rs b/vendor/tower/src/buffer/message.rs new file mode 100644 index 00000000..6d13aa12 --- /dev/null +++ b/vendor/tower/src/buffer/message.rs @@ -0,0 +1,16 @@ +use super::error::ServiceError; +use tokio::sync::oneshot; + +/// Message sent over buffer +#[derive(Debug)] +pub(crate) struct Message<Request, Fut> { + pub(crate) request: Request, + pub(crate) tx: Tx<Fut>, + pub(crate) span: tracing::Span, +} + +/// Response sender +pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>; + +/// Response receiver +pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>; diff --git a/vendor/tower/src/buffer/mod.rs b/vendor/tower/src/buffer/mod.rs new file mode 100644 index 00000000..923b4420 --- /dev/null +++ b/vendor/tower/src/buffer/mod.rs @@ -0,0 +1,47 @@ +//! Middleware that provides a buffered mpsc channel to a service. +//! +//! Sometimes you want to give out multiple handles to a single service, and allow each handle to +//! enqueue requests. That is, you want a [`Service`] to be [`Clone`]. This module allows you to do +//! that by placing the service behind a multi-producer, single-consumer buffering channel. Clients +//! enqueue requests by sending on the channel from any of the handles ([`Buffer`]), and the single +//! service running elsewhere (usually spawned) receives and services the requests one by one. Each +//! request is enqueued alongside a response channel that allows the service to report the result +//! of the request back to the caller. +//! +//! # Examples +//! +//! ```rust +//! # #[cfg(feature = "util")] +//! use tower::buffer::Buffer; +//! # #[cfg(feature = "util")] +//! use tower::{Service, ServiceExt}; +//! # #[cfg(feature = "util")] +//! async fn mass_produce<S: Service<usize>>(svc: S) +//! where +//! S: 'static + Send, +//! S::Error: Send + Sync + std::error::Error, +//! S::Future: Send +//! { +//! let svc = Buffer::new(svc, 10 /* buffer length */); +//! for _ in 0..10 { +//! let mut svc = svc.clone(); +//! tokio::spawn(async move { +//! for i in 0usize.. { +//! svc.ready().await.expect("service crashed").call(i).await; +//! } +//! }); +//! } +//! } +//! ``` +//! +//! [`Service`]: crate::Service + +pub mod error; +pub mod future; +mod layer; +mod message; +mod service; +mod worker; + +pub use self::layer::BufferLayer; +pub use self::service::Buffer; diff --git a/vendor/tower/src/buffer/service.rs b/vendor/tower/src/buffer/service.rs new file mode 100644 index 00000000..9493f107 --- /dev/null +++ b/vendor/tower/src/buffer/service.rs @@ -0,0 +1,144 @@ +use super::{ + future::ResponseFuture, + message::Message, + worker::{Handle, Worker}, +}; + +use std::{ + future::Future, + task::{Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::sync::PollSender; +use tower_service::Service; + +/// Adds an mpsc buffer in front of an inner service. +/// +/// See the module documentation for more details. +#[derive(Debug)] +pub struct Buffer<Req, F> { + tx: PollSender<Message<Req, F>>, + handle: Handle, +} + +impl<Req, F> Buffer<Req, F> +where + F: 'static, +{ + /// Creates a new [`Buffer`] wrapping `service`. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// The default Tokio executor is used to run the given service, which means that this method + /// must be called while on the Tokio runtime. + /// + /// # A note on choosing a `bound` + /// + /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a + /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive, + /// this reserved slot may be held up for a long time. As a result, it's advisable to set + /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see. + /// If you do not, all the slots in the buffer may be held up by futures that have just called + /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new + /// requests. + /// + /// [`Poll::Ready`]: std::task::Poll::Ready + /// [`call`]: crate::Service::call + /// [`poll_ready`]: crate::Service::poll_ready + pub fn new<S>(service: S, bound: usize) -> Self + where + S: Service<Req, Future = F> + Send + 'static, + F: Send, + S::Error: Into<crate::BoxError> + Send + Sync, + Req: Send + 'static, + { + let (service, worker) = Self::pair(service, bound); + tokio::spawn(worker); + service + } + + /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker. + /// + /// This is useful if you do not want to spawn directly onto the tokio runtime + /// but instead want to use your own executor. This will return the [`Buffer`] and + /// the background `Worker` that you can then spawn. + pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>) + where + S: Service<Req, Future = F> + Send + 'static, + F: Send, + S::Error: Into<crate::BoxError> + Send + Sync, + Req: Send + 'static, + { + let (tx, rx) = mpsc::channel(bound); + let (handle, worker) = Worker::new(service, rx); + let buffer = Self { + tx: PollSender::new(tx), + handle, + }; + (buffer, worker) + } + + fn get_worker_error(&self) -> crate::BoxError { + self.handle.get_error_on_closed() + } +} + +impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F> +where + F: Future<Output = Result<Rsp, E>> + Send + 'static, + E: Into<crate::BoxError>, + Req: Send + 'static, +{ + type Response = Rsp; + type Error = crate::BoxError; + type Future = ResponseFuture<F>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + // First, check if the worker is still alive. + if self.tx.is_closed() { + // If the inner service has errored, then we error here. + return Poll::Ready(Err(self.get_worker_error())); + } + + // Poll the sender to acquire a permit. + self.tx + .poll_reserve(cx) + .map_err(|_| self.get_worker_error()) + } + + fn call(&mut self, request: Req) -> Self::Future { + tracing::trace!("sending request to buffer worker"); + + // get the current Span so that we can explicitly propagate it to the worker + // if we didn't do this, events on the worker related to this span wouldn't be counted + // towards that span since the worker would have no way of entering it. + let span = tracing::Span::current(); + + // If we've made it here, then a channel permit has already been + // acquired, so we can freely allocate a oneshot. + let (tx, rx) = oneshot::channel(); + + match self.tx.send_item(Message { request, span, tx }) { + Ok(_) => ResponseFuture::new(rx), + // If the channel is closed, propagate the error from the worker. + Err(_) => { + tracing::trace!("buffer channel closed"); + ResponseFuture::failed(self.get_worker_error()) + } + } + } +} + +impl<Req, F> Clone for Buffer<Req, F> +where + Req: Send + 'static, + F: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + handle: self.handle.clone(), + tx: self.tx.clone(), + } + } +} diff --git a/vendor/tower/src/buffer/worker.rs b/vendor/tower/src/buffer/worker.rs new file mode 100644 index 00000000..7f4416d6 --- /dev/null +++ b/vendor/tower/src/buffer/worker.rs @@ -0,0 +1,227 @@ +use super::{ + error::{Closed, ServiceError}, + message::Message, +}; +use futures_core::ready; +use std::sync::{Arc, Mutex}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; +use tower_service::Service; + +pin_project_lite::pin_project! { + /// Task that handles processing the buffer. This type should not be used + /// directly, instead `Buffer` requires an `Executor` that can accept this task. + /// + /// The struct is `pub` in the private module and the type is *not* re-exported + /// as part of the public API. This is the "sealed" pattern to include "private" + /// types in public traits that are not meant for consumers of the library to + /// implement (only call). + #[derive(Debug)] + pub struct Worker<T, Request> + where + T: Service<Request>, + { + current_message: Option<Message<Request, T::Future>>, + rx: mpsc::Receiver<Message<Request, T::Future>>, + service: T, + finish: bool, + failed: Option<ServiceError>, + handle: Handle, + } +} + +/// Get the error out +#[derive(Debug)] +pub(crate) struct Handle { + inner: Arc<Mutex<Option<ServiceError>>>, +} + +impl<T, Request> Worker<T, Request> +where + T: Service<Request>, + T::Error: Into<crate::BoxError>, +{ + pub(crate) fn new( + service: T, + rx: mpsc::Receiver<Message<Request, T::Future>>, + ) -> (Handle, Worker<T, Request>) { + let handle = Handle { + inner: Arc::new(Mutex::new(None)), + }; + + let worker = Worker { + current_message: None, + finish: false, + failed: None, + rx, + service, + handle: handle.clone(), + }; + + (handle, worker) + } + + /// Return the next queued Message that hasn't been canceled. + /// + /// If a `Message` is returned, the `bool` is true if this is the first time we received this + /// message, and false otherwise (i.e., we tried to forward it to the backing service before). + fn poll_next_msg( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<(Message<Request, T::Future>, bool)>> { + if self.finish { + // We've already received None and are shutting down + return Poll::Ready(None); + } + + tracing::trace!("worker polling for next message"); + if let Some(msg) = self.current_message.take() { + // If the oneshot sender is closed, then the receiver is dropped, + // and nobody cares about the response. If this is the case, we + // should continue to the next request. + if !msg.tx.is_closed() { + tracing::trace!("resuming buffered request"); + return Poll::Ready(Some((msg, false))); + } + + tracing::trace!("dropping cancelled buffered request"); + } + + // Get the next request + while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + if !msg.tx.is_closed() { + tracing::trace!("processing new request"); + return Poll::Ready(Some((msg, true))); + } + // Otherwise, request is canceled, so pop the next one. + tracing::trace!("dropping cancelled request"); + } + + Poll::Ready(None) + } + + fn failed(&mut self, error: crate::BoxError) { + // The underlying service failed when we called `poll_ready` on it with the given `error`. We + // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in + // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent + // requests will also fail with the same error. + + // Note that we need to handle the case where some handle is concurrently trying to send us + // a request. We need to make sure that *either* the send of the request fails *or* it + // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the + // case where we send errors to all outstanding requests, and *then* the caller sends its + // request. We do this by *first* exposing the error, *then* closing the channel used to + // send more requests (so the client will see the error when the send fails), and *then* + // sending the error to all outstanding requests. + let error = ServiceError::new(error); + + let mut inner = self.handle.inner.lock().unwrap(); + + if inner.is_some() { + // Future::poll was called after we've already errored out! + return; + } + + *inner = Some(error.clone()); + drop(inner); + + self.rx.close(); + + // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None), + // which will trigger the `self.finish == true` phase. We just need to make sure that any + // requests that we receive before we've exhausted the receiver receive the error: + self.failed = Some(error); + } +} + +impl<T, Request> Future for Worker<T, Request> +where + T: Service<Request>, + T::Error: Into<crate::BoxError>, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.finish { + return Poll::Ready(()); + } + + loop { + match ready!(self.poll_next_msg(cx)) { + Some((msg, first)) => { + let _guard = msg.span.enter(); + if let Some(ref failed) = self.failed { + tracing::trace!("notifying caller about worker failure"); + let _ = msg.tx.send(Err(failed.clone())); + continue; + } + + // Wait for the service to be ready + tracing::trace!( + resumed = !first, + message = "worker received request; waiting for service readiness" + ); + match self.service.poll_ready(cx) { + Poll::Ready(Ok(())) => { + tracing::debug!(service.ready = true, message = "processing request"); + let response = self.service.call(msg.request); + + // Send the response future back to the sender. + // + // An error means the request had been canceled in-between + // our calls, the response future will just be dropped. + tracing::trace!("returning response future"); + let _ = msg.tx.send(Ok(response)); + } + Poll::Pending => { + tracing::trace!(service.ready = false, message = "delay"); + // Put out current message back in its slot. + drop(_guard); + self.current_message = Some(msg); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + let error = e.into(); + tracing::debug!({ %error }, "service failed"); + drop(_guard); + self.failed(error); + let _ = msg.tx.send(Err(self + .failed + .as_ref() + .expect("Worker::failed did not set self.failed?") + .clone())); + } + } + } + None => { + // No more more requests _ever_. + self.finish = true; + return Poll::Ready(()); + } + } + } + } +} + +impl Handle { + pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { + self.inner + .lock() + .unwrap() + .as_ref() + .map(|svc_err| svc_err.clone().into()) + .unwrap_or_else(|| Closed::new().into()) + } +} + +impl Clone for Handle { + fn clone(&self) -> Handle { + Handle { + inner: self.inner.clone(), + } + } +} |
