summaryrefslogtreecommitdiff
path: root/vendor/tower/src/buffer
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
committermo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
commit8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/buffer
parent4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/buffer')
-rw-r--r--vendor/tower/src/buffer/error.rs68
-rw-r--r--vendor/tower/src/buffer/future.rs79
-rw-r--r--vendor/tower/src/buffer/layer.rs75
-rw-r--r--vendor/tower/src/buffer/message.rs16
-rw-r--r--vendor/tower/src/buffer/mod.rs47
-rw-r--r--vendor/tower/src/buffer/service.rs144
-rw-r--r--vendor/tower/src/buffer/worker.rs227
7 files changed, 656 insertions, 0 deletions
diff --git a/vendor/tower/src/buffer/error.rs b/vendor/tower/src/buffer/error.rs
new file mode 100644
index 00000000..f046cbca
--- /dev/null
+++ b/vendor/tower/src/buffer/error.rs
@@ -0,0 +1,68 @@
+//! Error types for the `Buffer` middleware.
+
+use crate::BoxError;
+use std::{fmt, sync::Arc};
+
+/// An error produced by a [`Service`] wrapped by a [`Buffer`]
+///
+/// [`Service`]: crate::Service
+/// [`Buffer`]: crate::buffer::Buffer
+#[derive(Debug)]
+pub struct ServiceError {
+ inner: Arc<BoxError>,
+}
+
+/// An error produced when the a buffer's worker closes unexpectedly.
+pub struct Closed {
+ _p: (),
+}
+
+// ===== impl ServiceError =====
+
+impl ServiceError {
+ pub(crate) fn new(inner: BoxError) -> ServiceError {
+ let inner = Arc::new(inner);
+ ServiceError { inner }
+ }
+
+ // Private to avoid exposing `Clone` trait as part of the public API
+ pub(crate) fn clone(&self) -> ServiceError {
+ ServiceError {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl fmt::Display for ServiceError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "buffered service failed: {}", self.inner)
+ }
+}
+
+impl std::error::Error for ServiceError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ Some(&**self.inner)
+ }
+}
+
+// ===== impl Closed =====
+
+impl Closed {
+ pub(crate) fn new() -> Self {
+ Closed { _p: () }
+ }
+}
+
+impl fmt::Debug for Closed {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("Closed").finish()
+ }
+}
+
+impl fmt::Display for Closed {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.write_str("buffer's worker closed unexpectedly")
+ }
+}
+
+impl std::error::Error for Closed {}
diff --git a/vendor/tower/src/buffer/future.rs b/vendor/tower/src/buffer/future.rs
new file mode 100644
index 00000000..41178900
--- /dev/null
+++ b/vendor/tower/src/buffer/future.rs
@@ -0,0 +1,79 @@
+//! Future types for the [`Buffer`] middleware.
+//!
+//! [`Buffer`]: crate::buffer::Buffer
+
+use super::{error::Closed, message};
+use futures_core::ready;
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+pin_project! {
+ /// Future that completes when the buffered service eventually services the submitted request.
+ #[derive(Debug)]
+ pub struct ResponseFuture<T> {
+ #[pin]
+ state: ResponseState<T>,
+ }
+}
+
+pin_project! {
+ #[project = ResponseStateProj]
+ #[derive(Debug)]
+ enum ResponseState<T> {
+ Failed {
+ error: Option<crate::BoxError>,
+ },
+ Rx {
+ #[pin]
+ rx: message::Rx<T>,
+ },
+ Poll {
+ #[pin]
+ fut: T,
+ },
+ }
+}
+
+impl<T> ResponseFuture<T> {
+ pub(crate) fn new(rx: message::Rx<T>) -> Self {
+ ResponseFuture {
+ state: ResponseState::Rx { rx },
+ }
+ }
+
+ pub(crate) fn failed(err: crate::BoxError) -> Self {
+ ResponseFuture {
+ state: ResponseState::Failed { error: Some(err) },
+ }
+ }
+}
+
+impl<F, T, E> Future for ResponseFuture<F>
+where
+ F: Future<Output = Result<T, E>>,
+ E: Into<crate::BoxError>,
+{
+ type Output = Result<T, crate::BoxError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ loop {
+ match this.state.as_mut().project() {
+ ResponseStateProj::Failed { error } => {
+ return Poll::Ready(Err(error.take().expect("polled after error")));
+ }
+ ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
+ Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
+ Ok(Err(e)) => return Poll::Ready(Err(e.into())),
+ Err(_) => return Poll::Ready(Err(Closed::new().into())),
+ },
+ ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
+ }
+ }
+ }
+}
diff --git a/vendor/tower/src/buffer/layer.rs b/vendor/tower/src/buffer/layer.rs
new file mode 100644
index 00000000..3fc26ab5
--- /dev/null
+++ b/vendor/tower/src/buffer/layer.rs
@@ -0,0 +1,75 @@
+use super::service::Buffer;
+use std::{fmt, marker::PhantomData};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Adds an mpsc buffer in front of an inner service.
+///
+/// The default Tokio executor is used to run the given service,
+/// which means that this layer can only be used on the Tokio runtime.
+///
+/// See the module documentation for more details.
+pub struct BufferLayer<Request> {
+ bound: usize,
+ _p: PhantomData<fn(Request)>,
+}
+
+impl<Request> BufferLayer<Request> {
+ /// Creates a new [`BufferLayer`] with the provided `bound`.
+ ///
+ /// `bound` gives the maximal number of requests that can be queued for the service before
+ /// backpressure is applied to callers.
+ ///
+ /// # A note on choosing a `bound`
+ ///
+ /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
+ /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
+ /// this reserved slot may be held up for a long time. As a result, it's advisable to set
+ /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
+ /// If you do not, all the slots in the buffer may be held up by futures that have just called
+ /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
+ /// requests.
+ ///
+ /// [`Poll::Ready`]: std::task::Poll::Ready
+ /// [`call`]: crate::Service::call
+ /// [`poll_ready`]: crate::Service::poll_ready
+ pub const fn new(bound: usize) -> Self {
+ BufferLayer {
+ bound,
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<S, Request> Layer<S> for BufferLayer<Request>
+where
+ S: Service<Request> + Send + 'static,
+ S::Future: Send,
+ S::Error: Into<crate::BoxError> + Send + Sync,
+ Request: Send + 'static,
+{
+ type Service = Buffer<Request, S::Future>;
+
+ fn layer(&self, service: S) -> Self::Service {
+ Buffer::new(service, self.bound)
+ }
+}
+
+impl<Request> fmt::Debug for BufferLayer<Request> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("BufferLayer")
+ .field("bound", &self.bound)
+ .finish()
+ }
+}
+
+impl<Request> Clone for BufferLayer<Request> {
+ fn clone(&self) -> Self {
+ Self {
+ bound: self.bound,
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<Request> Copy for BufferLayer<Request> {}
diff --git a/vendor/tower/src/buffer/message.rs b/vendor/tower/src/buffer/message.rs
new file mode 100644
index 00000000..6d13aa12
--- /dev/null
+++ b/vendor/tower/src/buffer/message.rs
@@ -0,0 +1,16 @@
+use super::error::ServiceError;
+use tokio::sync::oneshot;
+
+/// Message sent over buffer
+#[derive(Debug)]
+pub(crate) struct Message<Request, Fut> {
+ pub(crate) request: Request,
+ pub(crate) tx: Tx<Fut>,
+ pub(crate) span: tracing::Span,
+}
+
+/// Response sender
+pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;
+
+/// Response receiver
+pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;
diff --git a/vendor/tower/src/buffer/mod.rs b/vendor/tower/src/buffer/mod.rs
new file mode 100644
index 00000000..923b4420
--- /dev/null
+++ b/vendor/tower/src/buffer/mod.rs
@@ -0,0 +1,47 @@
+//! Middleware that provides a buffered mpsc channel to a service.
+//!
+//! Sometimes you want to give out multiple handles to a single service, and allow each handle to
+//! enqueue requests. That is, you want a [`Service`] to be [`Clone`]. This module allows you to do
+//! that by placing the service behind a multi-producer, single-consumer buffering channel. Clients
+//! enqueue requests by sending on the channel from any of the handles ([`Buffer`]), and the single
+//! service running elsewhere (usually spawned) receives and services the requests one by one. Each
+//! request is enqueued alongside a response channel that allows the service to report the result
+//! of the request back to the caller.
+//!
+//! # Examples
+//!
+//! ```rust
+//! # #[cfg(feature = "util")]
+//! use tower::buffer::Buffer;
+//! # #[cfg(feature = "util")]
+//! use tower::{Service, ServiceExt};
+//! # #[cfg(feature = "util")]
+//! async fn mass_produce<S: Service<usize>>(svc: S)
+//! where
+//! S: 'static + Send,
+//! S::Error: Send + Sync + std::error::Error,
+//! S::Future: Send
+//! {
+//! let svc = Buffer::new(svc, 10 /* buffer length */);
+//! for _ in 0..10 {
+//! let mut svc = svc.clone();
+//! tokio::spawn(async move {
+//! for i in 0usize.. {
+//! svc.ready().await.expect("service crashed").call(i).await;
+//! }
+//! });
+//! }
+//! }
+//! ```
+//!
+//! [`Service`]: crate::Service
+
+pub mod error;
+pub mod future;
+mod layer;
+mod message;
+mod service;
+mod worker;
+
+pub use self::layer::BufferLayer;
+pub use self::service::Buffer;
diff --git a/vendor/tower/src/buffer/service.rs b/vendor/tower/src/buffer/service.rs
new file mode 100644
index 00000000..9493f107
--- /dev/null
+++ b/vendor/tower/src/buffer/service.rs
@@ -0,0 +1,144 @@
+use super::{
+ future::ResponseFuture,
+ message::Message,
+ worker::{Handle, Worker},
+};
+
+use std::{
+ future::Future,
+ task::{Context, Poll},
+};
+use tokio::sync::{mpsc, oneshot};
+use tokio_util::sync::PollSender;
+use tower_service::Service;
+
+/// Adds an mpsc buffer in front of an inner service.
+///
+/// See the module documentation for more details.
+#[derive(Debug)]
+pub struct Buffer<Req, F> {
+ tx: PollSender<Message<Req, F>>,
+ handle: Handle,
+}
+
+impl<Req, F> Buffer<Req, F>
+where
+ F: 'static,
+{
+ /// Creates a new [`Buffer`] wrapping `service`.
+ ///
+ /// `bound` gives the maximal number of requests that can be queued for the service before
+ /// backpressure is applied to callers.
+ ///
+ /// The default Tokio executor is used to run the given service, which means that this method
+ /// must be called while on the Tokio runtime.
+ ///
+ /// # A note on choosing a `bound`
+ ///
+ /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
+ /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
+ /// this reserved slot may be held up for a long time. As a result, it's advisable to set
+ /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
+ /// If you do not, all the slots in the buffer may be held up by futures that have just called
+ /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
+ /// requests.
+ ///
+ /// [`Poll::Ready`]: std::task::Poll::Ready
+ /// [`call`]: crate::Service::call
+ /// [`poll_ready`]: crate::Service::poll_ready
+ pub fn new<S>(service: S, bound: usize) -> Self
+ where
+ S: Service<Req, Future = F> + Send + 'static,
+ F: Send,
+ S::Error: Into<crate::BoxError> + Send + Sync,
+ Req: Send + 'static,
+ {
+ let (service, worker) = Self::pair(service, bound);
+ tokio::spawn(worker);
+ service
+ }
+
+ /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker.
+ ///
+ /// This is useful if you do not want to spawn directly onto the tokio runtime
+ /// but instead want to use your own executor. This will return the [`Buffer`] and
+ /// the background `Worker` that you can then spawn.
+ pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>)
+ where
+ S: Service<Req, Future = F> + Send + 'static,
+ F: Send,
+ S::Error: Into<crate::BoxError> + Send + Sync,
+ Req: Send + 'static,
+ {
+ let (tx, rx) = mpsc::channel(bound);
+ let (handle, worker) = Worker::new(service, rx);
+ let buffer = Self {
+ tx: PollSender::new(tx),
+ handle,
+ };
+ (buffer, worker)
+ }
+
+ fn get_worker_error(&self) -> crate::BoxError {
+ self.handle.get_error_on_closed()
+ }
+}
+
+impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F>
+where
+ F: Future<Output = Result<Rsp, E>> + Send + 'static,
+ E: Into<crate::BoxError>,
+ Req: Send + 'static,
+{
+ type Response = Rsp;
+ type Error = crate::BoxError;
+ type Future = ResponseFuture<F>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ // First, check if the worker is still alive.
+ if self.tx.is_closed() {
+ // If the inner service has errored, then we error here.
+ return Poll::Ready(Err(self.get_worker_error()));
+ }
+
+ // Poll the sender to acquire a permit.
+ self.tx
+ .poll_reserve(cx)
+ .map_err(|_| self.get_worker_error())
+ }
+
+ fn call(&mut self, request: Req) -> Self::Future {
+ tracing::trace!("sending request to buffer worker");
+
+ // get the current Span so that we can explicitly propagate it to the worker
+ // if we didn't do this, events on the worker related to this span wouldn't be counted
+ // towards that span since the worker would have no way of entering it.
+ let span = tracing::Span::current();
+
+ // If we've made it here, then a channel permit has already been
+ // acquired, so we can freely allocate a oneshot.
+ let (tx, rx) = oneshot::channel();
+
+ match self.tx.send_item(Message { request, span, tx }) {
+ Ok(_) => ResponseFuture::new(rx),
+ // If the channel is closed, propagate the error from the worker.
+ Err(_) => {
+ tracing::trace!("buffer channel closed");
+ ResponseFuture::failed(self.get_worker_error())
+ }
+ }
+ }
+}
+
+impl<Req, F> Clone for Buffer<Req, F>
+where
+ Req: Send + 'static,
+ F: Send + 'static,
+{
+ fn clone(&self) -> Self {
+ Self {
+ handle: self.handle.clone(),
+ tx: self.tx.clone(),
+ }
+ }
+}
diff --git a/vendor/tower/src/buffer/worker.rs b/vendor/tower/src/buffer/worker.rs
new file mode 100644
index 00000000..7f4416d6
--- /dev/null
+++ b/vendor/tower/src/buffer/worker.rs
@@ -0,0 +1,227 @@
+use super::{
+ error::{Closed, ServiceError},
+ message::Message,
+};
+use futures_core::ready;
+use std::sync::{Arc, Mutex};
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tokio::sync::mpsc;
+use tower_service::Service;
+
+pin_project_lite::pin_project! {
+ /// Task that handles processing the buffer. This type should not be used
+ /// directly, instead `Buffer` requires an `Executor` that can accept this task.
+ ///
+ /// The struct is `pub` in the private module and the type is *not* re-exported
+ /// as part of the public API. This is the "sealed" pattern to include "private"
+ /// types in public traits that are not meant for consumers of the library to
+ /// implement (only call).
+ #[derive(Debug)]
+ pub struct Worker<T, Request>
+ where
+ T: Service<Request>,
+ {
+ current_message: Option<Message<Request, T::Future>>,
+ rx: mpsc::Receiver<Message<Request, T::Future>>,
+ service: T,
+ finish: bool,
+ failed: Option<ServiceError>,
+ handle: Handle,
+ }
+}
+
+/// Get the error out
+#[derive(Debug)]
+pub(crate) struct Handle {
+ inner: Arc<Mutex<Option<ServiceError>>>,
+}
+
+impl<T, Request> Worker<T, Request>
+where
+ T: Service<Request>,
+ T::Error: Into<crate::BoxError>,
+{
+ pub(crate) fn new(
+ service: T,
+ rx: mpsc::Receiver<Message<Request, T::Future>>,
+ ) -> (Handle, Worker<T, Request>) {
+ let handle = Handle {
+ inner: Arc::new(Mutex::new(None)),
+ };
+
+ let worker = Worker {
+ current_message: None,
+ finish: false,
+ failed: None,
+ rx,
+ service,
+ handle: handle.clone(),
+ };
+
+ (handle, worker)
+ }
+
+ /// Return the next queued Message that hasn't been canceled.
+ ///
+ /// If a `Message` is returned, the `bool` is true if this is the first time we received this
+ /// message, and false otherwise (i.e., we tried to forward it to the backing service before).
+ fn poll_next_msg(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
+ if self.finish {
+ // We've already received None and are shutting down
+ return Poll::Ready(None);
+ }
+
+ tracing::trace!("worker polling for next message");
+ if let Some(msg) = self.current_message.take() {
+ // If the oneshot sender is closed, then the receiver is dropped,
+ // and nobody cares about the response. If this is the case, we
+ // should continue to the next request.
+ if !msg.tx.is_closed() {
+ tracing::trace!("resuming buffered request");
+ return Poll::Ready(Some((msg, false)));
+ }
+
+ tracing::trace!("dropping cancelled buffered request");
+ }
+
+ // Get the next request
+ while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
+ if !msg.tx.is_closed() {
+ tracing::trace!("processing new request");
+ return Poll::Ready(Some((msg, true)));
+ }
+ // Otherwise, request is canceled, so pop the next one.
+ tracing::trace!("dropping cancelled request");
+ }
+
+ Poll::Ready(None)
+ }
+
+ fn failed(&mut self, error: crate::BoxError) {
+ // The underlying service failed when we called `poll_ready` on it with the given `error`. We
+ // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
+ // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
+ // requests will also fail with the same error.
+
+ // Note that we need to handle the case where some handle is concurrently trying to send us
+ // a request. We need to make sure that *either* the send of the request fails *or* it
+ // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the
+ // case where we send errors to all outstanding requests, and *then* the caller sends its
+ // request. We do this by *first* exposing the error, *then* closing the channel used to
+ // send more requests (so the client will see the error when the send fails), and *then*
+ // sending the error to all outstanding requests.
+ let error = ServiceError::new(error);
+
+ let mut inner = self.handle.inner.lock().unwrap();
+
+ if inner.is_some() {
+ // Future::poll was called after we've already errored out!
+ return;
+ }
+
+ *inner = Some(error.clone());
+ drop(inner);
+
+ self.rx.close();
+
+ // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None),
+ // which will trigger the `self.finish == true` phase. We just need to make sure that any
+ // requests that we receive before we've exhausted the receiver receive the error:
+ self.failed = Some(error);
+ }
+}
+
+impl<T, Request> Future for Worker<T, Request>
+where
+ T: Service<Request>,
+ T::Error: Into<crate::BoxError>,
+{
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.finish {
+ return Poll::Ready(());
+ }
+
+ loop {
+ match ready!(self.poll_next_msg(cx)) {
+ Some((msg, first)) => {
+ let _guard = msg.span.enter();
+ if let Some(ref failed) = self.failed {
+ tracing::trace!("notifying caller about worker failure");
+ let _ = msg.tx.send(Err(failed.clone()));
+ continue;
+ }
+
+ // Wait for the service to be ready
+ tracing::trace!(
+ resumed = !first,
+ message = "worker received request; waiting for service readiness"
+ );
+ match self.service.poll_ready(cx) {
+ Poll::Ready(Ok(())) => {
+ tracing::debug!(service.ready = true, message = "processing request");
+ let response = self.service.call(msg.request);
+
+ // Send the response future back to the sender.
+ //
+ // An error means the request had been canceled in-between
+ // our calls, the response future will just be dropped.
+ tracing::trace!("returning response future");
+ let _ = msg.tx.send(Ok(response));
+ }
+ Poll::Pending => {
+ tracing::trace!(service.ready = false, message = "delay");
+ // Put out current message back in its slot.
+ drop(_guard);
+ self.current_message = Some(msg);
+ return Poll::Pending;
+ }
+ Poll::Ready(Err(e)) => {
+ let error = e.into();
+ tracing::debug!({ %error }, "service failed");
+ drop(_guard);
+ self.failed(error);
+ let _ = msg.tx.send(Err(self
+ .failed
+ .as_ref()
+ .expect("Worker::failed did not set self.failed?")
+ .clone()));
+ }
+ }
+ }
+ None => {
+ // No more more requests _ever_.
+ self.finish = true;
+ return Poll::Ready(());
+ }
+ }
+ }
+ }
+}
+
+impl Handle {
+ pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
+ self.inner
+ .lock()
+ .unwrap()
+ .as_ref()
+ .map(|svc_err| svc_err.clone().into())
+ .unwrap_or_else(|| Closed::new().into())
+ }
+}
+
+impl Clone for Handle {
+ fn clone(&self) -> Handle {
+ Handle {
+ inner: self.inner.clone(),
+ }
+ }
+}