Diffstat (limited to 'vendor/tower/src/buffer')
-rw-r--r--  vendor/tower/src/buffer/error.rs     68
-rw-r--r--  vendor/tower/src/buffer/future.rs    79
-rw-r--r--  vendor/tower/src/buffer/layer.rs     75
-rw-r--r--  vendor/tower/src/buffer/message.rs   16
-rw-r--r--  vendor/tower/src/buffer/mod.rs       47
-rw-r--r--  vendor/tower/src/buffer/service.rs  144
-rw-r--r--  vendor/tower/src/buffer/worker.rs   227
7 files changed, 0 insertions, 656 deletions
diff --git a/vendor/tower/src/buffer/error.rs b/vendor/tower/src/buffer/error.rs
deleted file mode 100644
index f046cbca..00000000
--- a/vendor/tower/src/buffer/error.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-//! Error types for the `Buffer` middleware.
-
-use crate::BoxError;
-use std::{fmt, sync::Arc};
-
-/// An error produced by a [`Service`] wrapped by a [`Buffer`].
-///
-/// [`Service`]: crate::Service
-/// [`Buffer`]: crate::buffer::Buffer
-#[derive(Debug)]
-pub struct ServiceError {
- inner: Arc<BoxError>,
-}
-
-/// An error produced when a buffer's worker closes unexpectedly.
-pub struct Closed {
- _p: (),
-}
-
-// ===== impl ServiceError =====
-
-impl ServiceError {
- pub(crate) fn new(inner: BoxError) -> ServiceError {
- let inner = Arc::new(inner);
- ServiceError { inner }
- }
-
- // Private to avoid exposing `Clone` trait as part of the public API
- pub(crate) fn clone(&self) -> ServiceError {
- ServiceError {
- inner: self.inner.clone(),
- }
- }
-}
-
-impl fmt::Display for ServiceError {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- write!(fmt, "buffered service failed: {}", self.inner)
- }
-}
-
-impl std::error::Error for ServiceError {
- fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
- Some(&**self.inner)
- }
-}
-
-// ===== impl Closed =====
-
-impl Closed {
- pub(crate) fn new() -> Self {
- Closed { _p: () }
- }
-}
-
-impl fmt::Debug for Closed {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.debug_tuple("Closed").finish()
- }
-}
-
-impl fmt::Display for Closed {
- fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
- fmt.write_str("buffer's worker closed unexpectedly")
- }
-}
-
-impl std::error::Error for Closed {}
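Since `error` is a public module, a caller holding the `BoxError` returned by a buffered call can downcast it to these two types. A minimal sketch, assuming the `buffer` feature is enabled (`describe_buffer_error` is a hypothetical helper, not tower API):

```rust
use tower::buffer::error::{Closed, ServiceError};
use tower::BoxError;

// Hypothetical helper: classify an error returned by a Buffer-wrapped service.
fn describe_buffer_error(err: &BoxError) {
    if err.is::<Closed>() {
        // The worker task went away (e.g. its runtime shut down).
        eprintln!("buffer worker closed: {err}");
    } else if let Some(svc_err) = err.downcast_ref::<ServiceError>() {
        // The inner service failed; every outstanding and subsequent
        // request observes this same shared error.
        eprintln!("inner service failed: {svc_err}");
    }
}
```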
diff --git a/vendor/tower/src/buffer/future.rs b/vendor/tower/src/buffer/future.rs
deleted file mode 100644
index 41178900..00000000
--- a/vendor/tower/src/buffer/future.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-//! Future types for the [`Buffer`] middleware.
-//!
-//! [`Buffer`]: crate::buffer::Buffer
-
-use super::{error::Closed, message};
-use futures_core::ready;
-use pin_project_lite::pin_project;
-use std::{
- future::Future,
- pin::Pin,
- task::{Context, Poll},
-};
-
-pin_project! {
- /// Future that completes when the buffered service eventually services the submitted request.
- #[derive(Debug)]
- pub struct ResponseFuture<T> {
- #[pin]
- state: ResponseState<T>,
- }
-}
-
-pin_project! {
- #[project = ResponseStateProj]
- #[derive(Debug)]
- enum ResponseState<T> {
- Failed {
- error: Option<crate::BoxError>,
- },
- Rx {
- #[pin]
- rx: message::Rx<T>,
- },
- Poll {
- #[pin]
- fut: T,
- },
- }
-}
-
-impl<T> ResponseFuture<T> {
- pub(crate) fn new(rx: message::Rx<T>) -> Self {
- ResponseFuture {
- state: ResponseState::Rx { rx },
- }
- }
-
- pub(crate) fn failed(err: crate::BoxError) -> Self {
- ResponseFuture {
- state: ResponseState::Failed { error: Some(err) },
- }
- }
-}
-
-impl<F, T, E> Future for ResponseFuture<F>
-where
- F: Future<Output = Result<T, E>>,
- E: Into<crate::BoxError>,
-{
- type Output = Result<T, crate::BoxError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
-
- loop {
- match this.state.as_mut().project() {
- ResponseStateProj::Failed { error } => {
- return Poll::Ready(Err(error.take().expect("polled after error")));
- }
- ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
- Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
- Ok(Err(e)) => return Poll::Ready(Err(e.into())),
- Err(_) => return Poll::Ready(Err(Closed::new().into())),
- },
- ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
- }
- }
- }
-}
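The future thus resolves in two phases: the oneshot first yields the inner service's response future (or an error), and that future is then polled to completion. A usage sketch, assuming tower's `buffer` and `util` features:

```rust
use tower::{Service, ServiceExt, buffer::Buffer, service_fn};

#[tokio::main]
async fn main() -> Result<(), tower::BoxError> {
    let mut svc = Buffer::new(
        service_fn(|x: u8| async move { Ok::<_, std::convert::Infallible>(x + 1) }),
        4,
    );

    // `call` only enqueues the request; the worker has not run it yet.
    let fut = svc.ready().await?.call(1);
    // Awaiting drives both phases: receive the inner future, then poll it.
    let rsp = fut.await?;
    assert_eq!(rsp, 2);
    Ok(())
}
```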
diff --git a/vendor/tower/src/buffer/layer.rs b/vendor/tower/src/buffer/layer.rs
deleted file mode 100644
index 3fc26ab5..00000000
--- a/vendor/tower/src/buffer/layer.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use super::service::Buffer;
-use std::{fmt, marker::PhantomData};
-use tower_layer::Layer;
-use tower_service::Service;
-
-/// Adds an mpsc buffer in front of an inner service.
-///
-/// The default Tokio executor is used to run the given service,
-/// which means that this layer can only be used on the Tokio runtime.
-///
-/// See the module documentation for more details.
-pub struct BufferLayer<Request> {
- bound: usize,
- _p: PhantomData<fn(Request)>,
-}
-
-impl<Request> BufferLayer<Request> {
- /// Creates a new [`BufferLayer`] with the provided `bound`.
- ///
- /// `bound` gives the maximal number of requests that can be queued for the service before
- /// backpressure is applied to callers.
- ///
- /// # A note on choosing a `bound`
- ///
- /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
- /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
- /// this reserved slot may be held up for a long time. As a result, it's advisable to set
- /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
- /// If you do not, all the slots in the buffer may be held up by futures that have just called
- /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
- /// requests.
- ///
- /// [`Poll::Ready`]: std::task::Poll::Ready
- /// [`call`]: crate::Service::call
- /// [`poll_ready`]: crate::Service::poll_ready
- pub const fn new(bound: usize) -> Self {
- BufferLayer {
- bound,
- _p: PhantomData,
- }
- }
-}
-
-impl<S, Request> Layer<S> for BufferLayer<Request>
-where
- S: Service<Request> + Send + 'static,
- S::Future: Send,
- S::Error: Into<crate::BoxError> + Send + Sync,
- Request: Send + 'static,
-{
- type Service = Buffer<Request, S::Future>;
-
- fn layer(&self, service: S) -> Self::Service {
- Buffer::new(service, self.bound)
- }
-}
-
-impl<Request> fmt::Debug for BufferLayer<Request> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("BufferLayer")
- .field("bound", &self.bound)
- .finish()
- }
-}
-
-impl<Request> Clone for BufferLayer<Request> {
- fn clone(&self) -> Self {
- Self {
- bound: self.bound,
- _p: PhantomData,
- }
- }
-}
-
-impl<Request> Copy for BufferLayer<Request> {}
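In a middleware stack, the layer composes like any other. A minimal sketch, assuming the `buffer` and `util` features:

```rust
use tower::{Service, ServiceBuilder, ServiceExt, buffer::BufferLayer, service_fn};

#[tokio::main]
async fn main() -> Result<(), tower::BoxError> {
    // An echo service stands in for whatever inner service you have.
    let echo = service_fn(|req: String| async move {
        Ok::<_, std::convert::Infallible>(req)
    });

    // Queue up to 64 requests before applying backpressure. This must be
    // built on a Tokio runtime, since the worker is spawned onto it.
    let mut svc = ServiceBuilder::new()
        .layer(BufferLayer::new(64))
        .service(echo);

    let rsp = svc.ready().await?.call("hello".to_owned()).await?;
    assert_eq!(rsp, "hello");
    Ok(())
}
```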
diff --git a/vendor/tower/src/buffer/message.rs b/vendor/tower/src/buffer/message.rs
deleted file mode 100644
index 6d13aa12..00000000
--- a/vendor/tower/src/buffer/message.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-use super::error::ServiceError;
-use tokio::sync::oneshot;
-
-/// Message sent over buffer
-#[derive(Debug)]
-pub(crate) struct Message<Request, Fut> {
- pub(crate) request: Request,
- pub(crate) tx: Tx<Fut>,
- pub(crate) span: tracing::Span,
-}
-
-/// Response sender
-pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>;
-
-/// Response receiver
-pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>;
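These aliases encode a per-request handshake: every queued request travels with a oneshot sender on which the worker returns the inner service's response future (or a shared error). A stripped-down sketch of the same pattern using plain Tokio channels (the names here are illustrative, not tower API):

```rust
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Each queued item pairs a request with its dedicated reply channel.
    let (tx, mut rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(8);

    // "Worker" side: receive a request plus its reply sender, respond on it.
    tokio::spawn(async move {
        while let Some((req, reply)) = rx.recv().await {
            let _ = reply.send(req * 2); // ignore callers that gave up
        }
    });

    // "Handle" side: enqueue the request alongside the oneshot receiver.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send((21, reply_tx)).await.unwrap();
    assert_eq!(reply_rx.await.unwrap(), 42);
}
```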
diff --git a/vendor/tower/src/buffer/mod.rs b/vendor/tower/src/buffer/mod.rs
deleted file mode 100644
index 923b4420..00000000
--- a/vendor/tower/src/buffer/mod.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-//! Middleware that provides a buffered mpsc channel to a service.
-//!
-//! Sometimes you want to give out multiple handles to a single service, and allow each handle to
-//! enqueue requests. That is, you want a [`Service`] to be [`Clone`]. This module allows you to do
-//! that by placing the service behind a multi-producer, single-consumer buffering channel. Clients
-//! enqueue requests by sending on the channel from any of the handles ([`Buffer`]), and the single
-//! service running elsewhere (usually spawned) receives and services the requests one by one. Each
-//! request is enqueued alongside a response channel that allows the service to report the result
-//! of the request back to the caller.
-//!
-//! # Examples
-//!
-//! ```rust
-//! # #[cfg(feature = "util")]
-//! use tower::buffer::Buffer;
-//! # #[cfg(feature = "util")]
-//! use tower::{Service, ServiceExt};
-//! # #[cfg(feature = "util")]
-//! async fn mass_produce<S: Service<usize>>(svc: S)
-//! where
-//! S: 'static + Send,
-//! S::Error: Send + Sync + std::error::Error,
-//! S::Future: Send
-//! {
-//! let svc = Buffer::new(svc, 10 /* buffer length */);
-//! for _ in 0..10 {
-//! let mut svc = svc.clone();
-//! tokio::spawn(async move {
-//! for i in 0usize.. {
-//! svc.ready().await.expect("service crashed").call(i).await;
-//! }
-//! });
-//! }
-//! }
-//! ```
-//!
-//! [`Service`]: crate::Service
-
-pub mod error;
-pub mod future;
-mod layer;
-mod message;
-mod service;
-mod worker;
-
-pub use self::layer::BufferLayer;
-pub use self::service::Buffer;
diff --git a/vendor/tower/src/buffer/service.rs b/vendor/tower/src/buffer/service.rs
deleted file mode 100644
index 9493f107..00000000
--- a/vendor/tower/src/buffer/service.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-use super::{
- future::ResponseFuture,
- message::Message,
- worker::{Handle, Worker},
-};
-
-use std::{
- future::Future,
- task::{Context, Poll},
-};
-use tokio::sync::{mpsc, oneshot};
-use tokio_util::sync::PollSender;
-use tower_service::Service;
-
-/// Adds an mpsc buffer in front of an inner service.
-///
-/// See the module documentation for more details.
-#[derive(Debug)]
-pub struct Buffer<Req, F> {
- tx: PollSender<Message<Req, F>>,
- handle: Handle,
-}
-
-impl<Req, F> Buffer<Req, F>
-where
- F: 'static,
-{
- /// Creates a new [`Buffer`] wrapping `service`.
- ///
- /// `bound` gives the maximal number of requests that can be queued for the service before
- /// backpressure is applied to callers.
- ///
- /// The default Tokio executor is used to run the given service, which means that this method
- /// must be called while on the Tokio runtime.
- ///
- /// # A note on choosing a `bound`
- ///
- /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
- /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
- /// this reserved slot may be held up for a long time. As a result, it's advisable to set
- /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
- /// If you do not, all the slots in the buffer may be held up by futures that have just called
- /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
- /// requests.
- ///
- /// [`Poll::Ready`]: std::task::Poll::Ready
- /// [`call`]: crate::Service::call
- /// [`poll_ready`]: crate::Service::poll_ready
- pub fn new<S>(service: S, bound: usize) -> Self
- where
- S: Service<Req, Future = F> + Send + 'static,
- F: Send,
- S::Error: Into<crate::BoxError> + Send + Sync,
- Req: Send + 'static,
- {
- let (service, worker) = Self::pair(service, bound);
- tokio::spawn(worker);
- service
- }
-
- /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker.
- ///
- /// This is useful if you do not want to spawn directly onto the tokio runtime
- /// but instead want to use your own executor. This will return the [`Buffer`] and
- /// the background `Worker` that you can then spawn.
- pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>)
- where
- S: Service<Req, Future = F> + Send + 'static,
- F: Send,
- S::Error: Into<crate::BoxError> + Send + Sync,
- Req: Send + 'static,
- {
- let (tx, rx) = mpsc::channel(bound);
- let (handle, worker) = Worker::new(service, rx);
- let buffer = Self {
- tx: PollSender::new(tx),
- handle,
- };
- (buffer, worker)
- }
-
- fn get_worker_error(&self) -> crate::BoxError {
- self.handle.get_error_on_closed()
- }
-}
-
-impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F>
-where
- F: Future<Output = Result<Rsp, E>> + Send + 'static,
- E: Into<crate::BoxError>,
- Req: Send + 'static,
-{
- type Response = Rsp;
- type Error = crate::BoxError;
- type Future = ResponseFuture<F>;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- // First, check if the worker is still alive.
- if self.tx.is_closed() {
- // If the inner service has errored, then we error here.
- return Poll::Ready(Err(self.get_worker_error()));
- }
-
- // Poll the sender to acquire a permit.
- self.tx
- .poll_reserve(cx)
- .map_err(|_| self.get_worker_error())
- }
-
- fn call(&mut self, request: Req) -> Self::Future {
- tracing::trace!("sending request to buffer worker");
-
- // Get the current Span so that we can explicitly propagate it to the worker.
- // If we didn't do this, events on the worker related to this span wouldn't be
- // counted towards that span, since the worker would have no way of entering it.
- let span = tracing::Span::current();
-
- // If we've made it here, then a channel permit has already been
- // acquired, so we can freely allocate a oneshot.
- let (tx, rx) = oneshot::channel();
-
- match self.tx.send_item(Message { request, span, tx }) {
- Ok(_) => ResponseFuture::new(rx),
- // If the channel is closed, propagate the error from the worker.
- Err(_) => {
- tracing::trace!("buffer channel closed");
- ResponseFuture::failed(self.get_worker_error())
- }
- }
- }
-}
-
-impl<Req, F> Clone for Buffer<Req, F>
-where
- Req: Send + 'static,
- F: Send + 'static,
-{
- fn clone(&self) -> Self {
- Self {
- handle: self.handle.clone(),
- tx: self.tx.clone(),
- }
- }
-}
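When spawning onto the ambient Tokio runtime is undesirable, `pair` returns the worker future for you to place yourself. A minimal sketch, assuming the `buffer` and `util` features:

```rust
use tower::{buffer::Buffer, service_fn};

#[tokio::main]
async fn main() {
    let svc = service_fn(|x: u32| async move {
        Ok::<_, std::convert::Infallible>(x * 2)
    });

    // `worker` is a plain Future; hand it to whichever executor you like.
    let (buffer, worker) = Buffer::pair(svc, 32);
    tokio::spawn(worker);

    // `buffer` is now a cloneable handle backed by that worker.
    let _handles: Vec<_> = (0..4).map(|_| buffer.clone()).collect();
}
```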
diff --git a/vendor/tower/src/buffer/worker.rs b/vendor/tower/src/buffer/worker.rs
deleted file mode 100644
index 7f4416d6..00000000
--- a/vendor/tower/src/buffer/worker.rs
+++ /dev/null
@@ -1,227 +0,0 @@
-use super::{
- error::{Closed, ServiceError},
- message::Message,
-};
-use futures_core::ready;
-use std::sync::{Arc, Mutex};
-use std::{
- future::Future,
- pin::Pin,
- task::{Context, Poll},
-};
-use tokio::sync::mpsc;
-use tower_service::Service;
-
-pin_project_lite::pin_project! {
- /// Task that handles processing the buffer. This type should not be used
- /// directly; instead, `Buffer` requires an `Executor` that can accept this task.
- ///
- /// The struct is `pub` in the private module and the type is *not* re-exported
- /// as part of the public API. This is the "sealed" pattern to include "private"
- /// types in public traits that are not meant for consumers of the library to
- /// implement (only call).
- #[derive(Debug)]
- pub struct Worker<T, Request>
- where
- T: Service<Request>,
- {
- current_message: Option<Message<Request, T::Future>>,
- rx: mpsc::Receiver<Message<Request, T::Future>>,
- service: T,
- finish: bool,
- failed: Option<ServiceError>,
- handle: Handle,
- }
-}
-
-/// Shared handle for retrieving the error that caused the worker to close.
-#[derive(Debug)]
-pub(crate) struct Handle {
- inner: Arc<Mutex<Option<ServiceError>>>,
-}
-
-impl<T, Request> Worker<T, Request>
-where
- T: Service<Request>,
- T::Error: Into<crate::BoxError>,
-{
- pub(crate) fn new(
- service: T,
- rx: mpsc::Receiver<Message<Request, T::Future>>,
- ) -> (Handle, Worker<T, Request>) {
- let handle = Handle {
- inner: Arc::new(Mutex::new(None)),
- };
-
- let worker = Worker {
- current_message: None,
- finish: false,
- failed: None,
- rx,
- service,
- handle: handle.clone(),
- };
-
- (handle, worker)
- }
-
- /// Return the next queued Message that hasn't been canceled.
- ///
- /// If a `Message` is returned, the `bool` is true if this is the first time we received this
- /// message, and false otherwise (i.e., we tried to forward it to the backing service before).
- fn poll_next_msg(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<Option<(Message<Request, T::Future>, bool)>> {
- if self.finish {
- // We've already received None and are shutting down
- return Poll::Ready(None);
- }
-
- tracing::trace!("worker polling for next message");
- if let Some(msg) = self.current_message.take() {
- // If the oneshot sender is closed, then the receiver is dropped,
- // and nobody cares about the response. If this is the case, we
- // should continue to the next request.
- if !msg.tx.is_closed() {
- tracing::trace!("resuming buffered request");
- return Poll::Ready(Some((msg, false)));
- }
-
- tracing::trace!("dropping cancelled buffered request");
- }
-
- // Get the next request
- while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
- if !msg.tx.is_closed() {
- tracing::trace!("processing new request");
- return Poll::Ready(Some((msg, true)));
- }
- // Otherwise, request is canceled, so pop the next one.
- tracing::trace!("dropping cancelled request");
- }
-
- Poll::Ready(None)
- }
-
- fn failed(&mut self, error: crate::BoxError) {
- // The underlying service failed when we called `poll_ready` on it with the given `error`. We
- // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
- // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
- // requests will also fail with the same error.
-
- // Note that we need to handle the case where some handle is concurrently trying to send us
- // a request. We need to make sure that *either* the send of the request fails *or* it
- // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the
- // case where we send errors to all outstanding requests, and *then* the caller sends its
- // request. We do this by *first* exposing the error, *then* closing the channel used to
- // send more requests (so the client will see the error when the send fails), and *then*
- // sending the error to all outstanding requests.
- let error = ServiceError::new(error);
-
- let mut inner = self.handle.inner.lock().unwrap();
-
- if inner.is_some() {
- // Future::poll was called after we've already errored out!
- return;
- }
-
- *inner = Some(error.clone());
- drop(inner);
-
- self.rx.close();
-
- // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None),
- // which will trigger the `self.finish == true` phase. We just need to make sure that any
- // requests that we receive before we've exhausted the receiver receive the error:
- self.failed = Some(error);
- }
-}
-
-impl<T, Request> Future for Worker<T, Request>
-where
- T: Service<Request>,
- T::Error: Into<crate::BoxError>,
-{
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- if self.finish {
- return Poll::Ready(());
- }
-
- loop {
- match ready!(self.poll_next_msg(cx)) {
- Some((msg, first)) => {
- let _guard = msg.span.enter();
- if let Some(ref failed) = self.failed {
- tracing::trace!("notifying caller about worker failure");
- let _ = msg.tx.send(Err(failed.clone()));
- continue;
- }
-
- // Wait for the service to be ready
- tracing::trace!(
- resumed = !first,
- message = "worker received request; waiting for service readiness"
- );
- match self.service.poll_ready(cx) {
- Poll::Ready(Ok(())) => {
- tracing::debug!(service.ready = true, message = "processing request");
- let response = self.service.call(msg.request);
-
- // Send the response future back to the sender.
- //
- // An error means the request was canceled in between our
- // calls; in that case, the response future is simply dropped.
- tracing::trace!("returning response future");
- let _ = msg.tx.send(Ok(response));
- }
- Poll::Pending => {
- tracing::trace!(service.ready = false, message = "delay");
- // Put our current message back in its slot.
- drop(_guard);
- self.current_message = Some(msg);
- return Poll::Pending;
- }
- Poll::Ready(Err(e)) => {
- let error = e.into();
- tracing::debug!({ %error }, "service failed");
- drop(_guard);
- self.failed(error);
- let _ = msg.tx.send(Err(self
- .failed
- .as_ref()
- .expect("Worker::failed did not set self.failed?")
- .clone()));
- }
- }
- }
- None => {
- // No more requests, _ever_.
- self.finish = true;
- return Poll::Ready(());
- }
- }
- }
- }
-}
-
-impl Handle {
- pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
- self.inner
- .lock()
- .unwrap()
- .as_ref()
- .map(|svc_err| svc_err.clone().into())
- .unwrap_or_else(|| Closed::new().into())
- }
-}
-
-impl Clone for Handle {
- fn clone(&self) -> Handle {
- Handle {
- inner: self.inner.clone(),
- }
- }
-}
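The effect of `failed` is observable from the outside: the first caller sees the error through its response future, and every clone thereafter fails fast in `poll_ready` with the same shared error. A sketch with a toy always-failing service (`AlwaysFails` is illustrative, not part of tower), assuming the `buffer` and `util` features:

```rust
use std::task::{Context, Poll};
use tower::{Service, ServiceExt, buffer::Buffer};

// Toy inner service whose readiness check always fails, exercising the
// worker's `failed` path described above.
struct AlwaysFails;

impl Service<()> for AlwaysFails {
    type Response = ();
    type Error = std::io::Error;
    type Future = std::future::Ready<Result<(), std::io::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, "boom")))
    }

    fn call(&mut self, _req: ()) -> Self::Future {
        std::future::ready(Ok(()))
    }
}

#[tokio::main]
async fn main() {
    let mut first = Buffer::new(AlwaysFails, 8);
    let mut second = first.clone();

    // The first request drives the worker into its failed state; its
    // response future resolves with the wrapped ServiceError.
    let err = first.ready().await.unwrap().call(()).await.unwrap_err();
    assert!(err.to_string().contains("buffered service failed"));

    // Clones now fail fast: poll_ready reports the same shared error.
    let err = second.ready().await.unwrap_err();
    assert!(err.to_string().contains("buffered service failed"));
}
```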