diff options
Diffstat (limited to 'vendor/tower/src/buffer')
| -rw-r--r-- | vendor/tower/src/buffer/error.rs | 68 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/future.rs | 79 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/layer.rs | 75 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/message.rs | 16 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/mod.rs | 47 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/service.rs | 144 | ||||
| -rw-r--r-- | vendor/tower/src/buffer/worker.rs | 227 |
7 files changed, 0 insertions, 656 deletions
diff --git a/vendor/tower/src/buffer/error.rs b/vendor/tower/src/buffer/error.rs deleted file mode 100644 index f046cbca..00000000 --- a/vendor/tower/src/buffer/error.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! Error types for the `Buffer` middleware. - -use crate::BoxError; -use std::{fmt, sync::Arc}; - -/// An error produced by a [`Service`] wrapped by a [`Buffer`] -/// -/// [`Service`]: crate::Service -/// [`Buffer`]: crate::buffer::Buffer -#[derive(Debug)] -pub struct ServiceError { - inner: Arc<BoxError>, -} - -/// An error produced when the a buffer's worker closes unexpectedly. -pub struct Closed { - _p: (), -} - -// ===== impl ServiceError ===== - -impl ServiceError { - pub(crate) fn new(inner: BoxError) -> ServiceError { - let inner = Arc::new(inner); - ServiceError { inner } - } - - // Private to avoid exposing `Clone` trait as part of the public API - pub(crate) fn clone(&self) -> ServiceError { - ServiceError { - inner: self.inner.clone(), - } - } -} - -impl fmt::Display for ServiceError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "buffered service failed: {}", self.inner) - } -} - -impl std::error::Error for ServiceError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&**self.inner) - } -} - -// ===== impl Closed ===== - -impl Closed { - pub(crate) fn new() -> Self { - Closed { _p: () } - } -} - -impl fmt::Debug for Closed { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("Closed").finish() - } -} - -impl fmt::Display for Closed { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.write_str("buffer's worker closed unexpectedly") - } -} - -impl std::error::Error for Closed {} diff --git a/vendor/tower/src/buffer/future.rs b/vendor/tower/src/buffer/future.rs deleted file mode 100644 index 41178900..00000000 --- a/vendor/tower/src/buffer/future.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! Future types for the [`Buffer`] middleware. -//! -//! [`Buffer`]: crate::buffer::Buffer - -use super::{error::Closed, message}; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -pin_project! { - /// Future that completes when the buffered service eventually services the submitted request. - #[derive(Debug)] - pub struct ResponseFuture<T> { - #[pin] - state: ResponseState<T>, - } -} - -pin_project! { - #[project = ResponseStateProj] - #[derive(Debug)] - enum ResponseState<T> { - Failed { - error: Option<crate::BoxError>, - }, - Rx { - #[pin] - rx: message::Rx<T>, - }, - Poll { - #[pin] - fut: T, - }, - } -} - -impl<T> ResponseFuture<T> { - pub(crate) fn new(rx: message::Rx<T>) -> Self { - ResponseFuture { - state: ResponseState::Rx { rx }, - } - } - - pub(crate) fn failed(err: crate::BoxError) -> Self { - ResponseFuture { - state: ResponseState::Failed { error: Some(err) }, - } - } -} - -impl<F, T, E> Future for ResponseFuture<F> -where - F: Future<Output = Result<T, E>>, - E: Into<crate::BoxError>, -{ - type Output = Result<T, crate::BoxError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - - loop { - match this.state.as_mut().project() { - ResponseStateProj::Failed { error } => { - return Poll::Ready(Err(error.take().expect("polled after error"))); - } - ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { - Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), - Ok(Err(e)) => return Poll::Ready(Err(e.into())), - Err(_) => return Poll::Ready(Err(Closed::new().into())), - }, - ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), - } - } - } -} diff --git a/vendor/tower/src/buffer/layer.rs b/vendor/tower/src/buffer/layer.rs deleted file mode 100644 index 3fc26ab5..00000000 --- a/vendor/tower/src/buffer/layer.rs +++ /dev/null @@ -1,75 +0,0 @@ -use super::service::Buffer; -use std::{fmt, marker::PhantomData}; -use tower_layer::Layer; -use tower_service::Service; - -/// Adds an mpsc buffer in front of an inner service. -/// -/// The default Tokio executor is used to run the given service, -/// which means that this layer can only be used on the Tokio runtime. -/// -/// See the module documentation for more details. -pub struct BufferLayer<Request> { - bound: usize, - _p: PhantomData<fn(Request)>, -} - -impl<Request> BufferLayer<Request> { - /// Creates a new [`BufferLayer`] with the provided `bound`. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - /// - /// # A note on choosing a `bound` - /// - /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a - /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive, - /// this reserved slot may be held up for a long time. As a result, it's advisable to set - /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see. - /// If you do not, all the slots in the buffer may be held up by futures that have just called - /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new - /// requests. - /// - /// [`Poll::Ready`]: std::task::Poll::Ready - /// [`call`]: crate::Service::call - /// [`poll_ready`]: crate::Service::poll_ready - pub const fn new(bound: usize) -> Self { - BufferLayer { - bound, - _p: PhantomData, - } - } -} - -impl<S, Request> Layer<S> for BufferLayer<Request> -where - S: Service<Request> + Send + 'static, - S::Future: Send, - S::Error: Into<crate::BoxError> + Send + Sync, - Request: Send + 'static, -{ - type Service = Buffer<Request, S::Future>; - - fn layer(&self, service: S) -> Self::Service { - Buffer::new(service, self.bound) - } -} - -impl<Request> fmt::Debug for BufferLayer<Request> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BufferLayer") - .field("bound", &self.bound) - .finish() - } -} - -impl<Request> Clone for BufferLayer<Request> { - fn clone(&self) -> Self { - Self { - bound: self.bound, - _p: PhantomData, - } - } -} - -impl<Request> Copy for BufferLayer<Request> {} diff --git a/vendor/tower/src/buffer/message.rs b/vendor/tower/src/buffer/message.rs deleted file mode 100644 index 6d13aa12..00000000 --- a/vendor/tower/src/buffer/message.rs +++ /dev/null @@ -1,16 +0,0 @@ -use super::error::ServiceError; -use tokio::sync::oneshot; - -/// Message sent over buffer -#[derive(Debug)] -pub(crate) struct Message<Request, Fut> { - pub(crate) request: Request, - pub(crate) tx: Tx<Fut>, - pub(crate) span: tracing::Span, -} - -/// Response sender -pub(crate) type Tx<Fut> = oneshot::Sender<Result<Fut, ServiceError>>; - -/// Response receiver -pub(crate) type Rx<Fut> = oneshot::Receiver<Result<Fut, ServiceError>>; diff --git a/vendor/tower/src/buffer/mod.rs b/vendor/tower/src/buffer/mod.rs deleted file mode 100644 index 923b4420..00000000 --- a/vendor/tower/src/buffer/mod.rs +++ /dev/null @@ -1,47 +0,0 @@ -//! Middleware that provides a buffered mpsc channel to a service. -//! -//! Sometimes you want to give out multiple handles to a single service, and allow each handle to -//! enqueue requests. That is, you want a [`Service`] to be [`Clone`]. This module allows you to do -//! that by placing the service behind a multi-producer, single-consumer buffering channel. Clients -//! enqueue requests by sending on the channel from any of the handles ([`Buffer`]), and the single -//! service running elsewhere (usually spawned) receives and services the requests one by one. Each -//! request is enqueued alongside a response channel that allows the service to report the result -//! of the request back to the caller. -//! -//! # Examples -//! -//! ```rust -//! # #[cfg(feature = "util")] -//! use tower::buffer::Buffer; -//! # #[cfg(feature = "util")] -//! use tower::{Service, ServiceExt}; -//! # #[cfg(feature = "util")] -//! async fn mass_produce<S: Service<usize>>(svc: S) -//! where -//! S: 'static + Send, -//! S::Error: Send + Sync + std::error::Error, -//! S::Future: Send -//! { -//! let svc = Buffer::new(svc, 10 /* buffer length */); -//! for _ in 0..10 { -//! let mut svc = svc.clone(); -//! tokio::spawn(async move { -//! for i in 0usize.. { -//! svc.ready().await.expect("service crashed").call(i).await; -//! } -//! }); -//! } -//! } -//! ``` -//! -//! [`Service`]: crate::Service - -pub mod error; -pub mod future; -mod layer; -mod message; -mod service; -mod worker; - -pub use self::layer::BufferLayer; -pub use self::service::Buffer; diff --git a/vendor/tower/src/buffer/service.rs b/vendor/tower/src/buffer/service.rs deleted file mode 100644 index 9493f107..00000000 --- a/vendor/tower/src/buffer/service.rs +++ /dev/null @@ -1,144 +0,0 @@ -use super::{ - future::ResponseFuture, - message::Message, - worker::{Handle, Worker}, -}; - -use std::{ - future::Future, - task::{Context, Poll}, -}; -use tokio::sync::{mpsc, oneshot}; -use tokio_util::sync::PollSender; -use tower_service::Service; - -/// Adds an mpsc buffer in front of an inner service. -/// -/// See the module documentation for more details. -#[derive(Debug)] -pub struct Buffer<Req, F> { - tx: PollSender<Message<Req, F>>, - handle: Handle, -} - -impl<Req, F> Buffer<Req, F> -where - F: 'static, -{ - /// Creates a new [`Buffer`] wrapping `service`. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - /// - /// The default Tokio executor is used to run the given service, which means that this method - /// must be called while on the Tokio runtime. - /// - /// # A note on choosing a `bound` - /// - /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a - /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive, - /// this reserved slot may be held up for a long time. As a result, it's advisable to set - /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see. - /// If you do not, all the slots in the buffer may be held up by futures that have just called - /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new - /// requests. - /// - /// [`Poll::Ready`]: std::task::Poll::Ready - /// [`call`]: crate::Service::call - /// [`poll_ready`]: crate::Service::poll_ready - pub fn new<S>(service: S, bound: usize) -> Self - where - S: Service<Req, Future = F> + Send + 'static, - F: Send, - S::Error: Into<crate::BoxError> + Send + Sync, - Req: Send + 'static, - { - let (service, worker) = Self::pair(service, bound); - tokio::spawn(worker); - service - } - - /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker. - /// - /// This is useful if you do not want to spawn directly onto the tokio runtime - /// but instead want to use your own executor. This will return the [`Buffer`] and - /// the background `Worker` that you can then spawn. - pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>) - where - S: Service<Req, Future = F> + Send + 'static, - F: Send, - S::Error: Into<crate::BoxError> + Send + Sync, - Req: Send + 'static, - { - let (tx, rx) = mpsc::channel(bound); - let (handle, worker) = Worker::new(service, rx); - let buffer = Self { - tx: PollSender::new(tx), - handle, - }; - (buffer, worker) - } - - fn get_worker_error(&self) -> crate::BoxError { - self.handle.get_error_on_closed() - } -} - -impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F> -where - F: Future<Output = Result<Rsp, E>> + Send + 'static, - E: Into<crate::BoxError>, - Req: Send + 'static, -{ - type Response = Rsp; - type Error = crate::BoxError; - type Future = ResponseFuture<F>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - // First, check if the worker is still alive. - if self.tx.is_closed() { - // If the inner service has errored, then we error here. - return Poll::Ready(Err(self.get_worker_error())); - } - - // Poll the sender to acquire a permit. - self.tx - .poll_reserve(cx) - .map_err(|_| self.get_worker_error()) - } - - fn call(&mut self, request: Req) -> Self::Future { - tracing::trace!("sending request to buffer worker"); - - // get the current Span so that we can explicitly propagate it to the worker - // if we didn't do this, events on the worker related to this span wouldn't be counted - // towards that span since the worker would have no way of entering it. - let span = tracing::Span::current(); - - // If we've made it here, then a channel permit has already been - // acquired, so we can freely allocate a oneshot. - let (tx, rx) = oneshot::channel(); - - match self.tx.send_item(Message { request, span, tx }) { - Ok(_) => ResponseFuture::new(rx), - // If the channel is closed, propagate the error from the worker. - Err(_) => { - tracing::trace!("buffer channel closed"); - ResponseFuture::failed(self.get_worker_error()) - } - } - } -} - -impl<Req, F> Clone for Buffer<Req, F> -where - Req: Send + 'static, - F: Send + 'static, -{ - fn clone(&self) -> Self { - Self { - handle: self.handle.clone(), - tx: self.tx.clone(), - } - } -} diff --git a/vendor/tower/src/buffer/worker.rs b/vendor/tower/src/buffer/worker.rs deleted file mode 100644 index 7f4416d6..00000000 --- a/vendor/tower/src/buffer/worker.rs +++ /dev/null @@ -1,227 +0,0 @@ -use super::{ - error::{Closed, ServiceError}, - message::Message, -}; -use futures_core::ready; -use std::sync::{Arc, Mutex}; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use tokio::sync::mpsc; -use tower_service::Service; - -pin_project_lite::pin_project! { - /// Task that handles processing the buffer. This type should not be used - /// directly, instead `Buffer` requires an `Executor` that can accept this task. - /// - /// The struct is `pub` in the private module and the type is *not* re-exported - /// as part of the public API. This is the "sealed" pattern to include "private" - /// types in public traits that are not meant for consumers of the library to - /// implement (only call). - #[derive(Debug)] - pub struct Worker<T, Request> - where - T: Service<Request>, - { - current_message: Option<Message<Request, T::Future>>, - rx: mpsc::Receiver<Message<Request, T::Future>>, - service: T, - finish: bool, - failed: Option<ServiceError>, - handle: Handle, - } -} - -/// Get the error out -#[derive(Debug)] -pub(crate) struct Handle { - inner: Arc<Mutex<Option<ServiceError>>>, -} - -impl<T, Request> Worker<T, Request> -where - T: Service<Request>, - T::Error: Into<crate::BoxError>, -{ - pub(crate) fn new( - service: T, - rx: mpsc::Receiver<Message<Request, T::Future>>, - ) -> (Handle, Worker<T, Request>) { - let handle = Handle { - inner: Arc::new(Mutex::new(None)), - }; - - let worker = Worker { - current_message: None, - finish: false, - failed: None, - rx, - service, - handle: handle.clone(), - }; - - (handle, worker) - } - - /// Return the next queued Message that hasn't been canceled. - /// - /// If a `Message` is returned, the `bool` is true if this is the first time we received this - /// message, and false otherwise (i.e., we tried to forward it to the backing service before). - fn poll_next_msg( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<Option<(Message<Request, T::Future>, bool)>> { - if self.finish { - // We've already received None and are shutting down - return Poll::Ready(None); - } - - tracing::trace!("worker polling for next message"); - if let Some(msg) = self.current_message.take() { - // If the oneshot sender is closed, then the receiver is dropped, - // and nobody cares about the response. If this is the case, we - // should continue to the next request. - if !msg.tx.is_closed() { - tracing::trace!("resuming buffered request"); - return Poll::Ready(Some((msg, false))); - } - - tracing::trace!("dropping cancelled buffered request"); - } - - // Get the next request - while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) { - if !msg.tx.is_closed() { - tracing::trace!("processing new request"); - return Poll::Ready(Some((msg, true))); - } - // Otherwise, request is canceled, so pop the next one. - tracing::trace!("dropping cancelled request"); - } - - Poll::Ready(None) - } - - fn failed(&mut self, error: crate::BoxError) { - // The underlying service failed when we called `poll_ready` on it with the given `error`. We - // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in - // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent - // requests will also fail with the same error. - - // Note that we need to handle the case where some handle is concurrently trying to send us - // a request. We need to make sure that *either* the send of the request fails *or* it - // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the - // case where we send errors to all outstanding requests, and *then* the caller sends its - // request. We do this by *first* exposing the error, *then* closing the channel used to - // send more requests (so the client will see the error when the send fails), and *then* - // sending the error to all outstanding requests. - let error = ServiceError::new(error); - - let mut inner = self.handle.inner.lock().unwrap(); - - if inner.is_some() { - // Future::poll was called after we've already errored out! - return; - } - - *inner = Some(error.clone()); - drop(inner); - - self.rx.close(); - - // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None), - // which will trigger the `self.finish == true` phase. We just need to make sure that any - // requests that we receive before we've exhausted the receiver receive the error: - self.failed = Some(error); - } -} - -impl<T, Request> Future for Worker<T, Request> -where - T: Service<Request>, - T::Error: Into<crate::BoxError>, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - if self.finish { - return Poll::Ready(()); - } - - loop { - match ready!(self.poll_next_msg(cx)) { - Some((msg, first)) => { - let _guard = msg.span.enter(); - if let Some(ref failed) = self.failed { - tracing::trace!("notifying caller about worker failure"); - let _ = msg.tx.send(Err(failed.clone())); - continue; - } - - // Wait for the service to be ready - tracing::trace!( - resumed = !first, - message = "worker received request; waiting for service readiness" - ); - match self.service.poll_ready(cx) { - Poll::Ready(Ok(())) => { - tracing::debug!(service.ready = true, message = "processing request"); - let response = self.service.call(msg.request); - - // Send the response future back to the sender. - // - // An error means the request had been canceled in-between - // our calls, the response future will just be dropped. - tracing::trace!("returning response future"); - let _ = msg.tx.send(Ok(response)); - } - Poll::Pending => { - tracing::trace!(service.ready = false, message = "delay"); - // Put out current message back in its slot. - drop(_guard); - self.current_message = Some(msg); - return Poll::Pending; - } - Poll::Ready(Err(e)) => { - let error = e.into(); - tracing::debug!({ %error }, "service failed"); - drop(_guard); - self.failed(error); - let _ = msg.tx.send(Err(self - .failed - .as_ref() - .expect("Worker::failed did not set self.failed?") - .clone())); - } - } - } - None => { - // No more more requests _ever_. - self.finish = true; - return Poll::Ready(()); - } - } - } - } -} - -impl Handle { - pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { - self.inner - .lock() - .unwrap() - .as_ref() - .map(|svc_err| svc_err.clone().into()) - .unwrap_or_else(|| Closed::new().into()) - } -} - -impl Clone for Handle { - fn clone(&self) -> Handle { - Handle { - inner: self.inner.clone(), - } - } -} |
