Diffstat (limited to 'vendor/tower/src/buffer/service.rs')
-rw-r--r--  vendor/tower/src/buffer/service.rs | 144
1 file changed, 144 insertions(+), 0 deletions(-)
diff --git a/vendor/tower/src/buffer/service.rs b/vendor/tower/src/buffer/service.rs
new file mode 100644
index 00000000..9493f107
--- /dev/null
+++ b/vendor/tower/src/buffer/service.rs
@@ -0,0 +1,144 @@
+use super::{
+ future::ResponseFuture,
+ message::Message,
+ worker::{Handle, Worker},
+};
+
+use std::{
+ future::Future,
+ task::{Context, Poll},
+};
+use tokio::sync::{mpsc, oneshot};
+use tokio_util::sync::PollSender;
+use tower_service::Service;
+
+/// Adds an mpsc buffer in front of an inner service.
+///
+/// See the module documentation for more details.
+#[derive(Debug)]
+pub struct Buffer<Req, F> {
+ tx: PollSender<Message<Req, F>>,
+ handle: Handle,
+}
+
+impl<Req, F> Buffer<Req, F>
+where
+ F: 'static,
+{
+ /// Creates a new [`Buffer`] wrapping `service`.
+ ///
+ /// `bound` gives the maximum number of requests that can be queued for the
+ /// service before backpressure is applied to callers.
+ ///
+ /// The default Tokio executor is used to run the given service, which means that this method
+ /// must be called while on the Tokio runtime.
+ ///
+ /// # A note on choosing a `bound`
+ ///
+ /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
+ /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
+ /// this reserved slot may be held up for a long time. As a result, it's advisable to set
+ /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
+ /// If you do not, all the slots in the buffer may be held up by futures that have just called
+ /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
+ /// requests.
+ ///
+ /// [`Poll::Ready`]: std::task::Poll::Ready
+ /// [`call`]: crate::Service::call
+ /// [`poll_ready`]: crate::Service::poll_ready
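+ ///
+ /// # Example
+ ///
+ /// A minimal sketch of buffering a service. The `service_fn` echo service
+ /// here is illustrative, not part of this module:
+ ///
+ /// ```no_run
+ /// use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
+ ///
+ /// # async fn example() {
+ /// // An inner service that echoes its request back.
+ /// let svc = service_fn(|req: String| async move {
+ ///     Ok::<_, std::convert::Infallible>(req)
+ /// });
+ ///
+ /// // Queue at most 64 requests before callers see backpressure.
+ /// let mut buffered = Buffer::new(svc, 64);
+ ///
+ /// // `ready()` reserves a slot in the channel; `call()` consumes it.
+ /// let rsp = buffered.ready().await.unwrap().call("hello".into()).await.unwrap();
+ /// assert_eq!(rsp, "hello");
+ /// # }
+ /// ```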
+ pub fn new<S>(service: S, bound: usize) -> Self
+ where
+ S: Service<Req, Future = F> + Send + 'static,
+ F: Send,
+ S::Error: Into<crate::BoxError> + Send + Sync,
+ Req: Send + 'static,
+ {
+ let (service, worker) = Self::pair(service, bound);
+ tokio::spawn(worker);
+ service
+ }
+
+ /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker.
+ ///
+ /// This is useful if you do not want to spawn directly onto the Tokio runtime
+ /// but instead want to use your own executor. It returns the [`Buffer`] and
+ /// the background `Worker`, which you can then spawn.
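+ ///
+ /// A sketch of wiring this up by hand; `svc` is any compatible service and
+ /// `my_executor` stands in for whatever spawner you use, not a real API:
+ ///
+ /// ```ignore
+ /// let (buffer, worker) = Buffer::pair(svc, 64);
+ /// // The worker must be polled for the buffer to make progress.
+ /// my_executor.spawn(worker);
+ /// ```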
+ pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>)
+ where
+ S: Service<Req, Future = F> + Send + 'static,
+ F: Send,
+ S::Error: Into<crate::BoxError> + Send + Sync,
+ Req: Send + 'static,
+ {
+ let (tx, rx) = mpsc::channel(bound);
+ let (handle, worker) = Worker::new(service, rx);
+ let buffer = Self {
+ tx: PollSender::new(tx),
+ handle,
+ };
+ (buffer, worker)
+ }
+
+ fn get_worker_error(&self) -> crate::BoxError {
+ self.handle.get_error_on_closed()
+ }
+}
+
+impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F>
+where
+ F: Future<Output = Result<Rsp, E>> + Send + 'static,
+ E: Into<crate::BoxError>,
+ Req: Send + 'static,
+{
+ type Response = Rsp;
+ type Error = crate::BoxError;
+ type Future = ResponseFuture<F>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ // First, check if the worker is still alive.
+ if self.tx.is_closed() {
+ // If the inner service has errored, then we error here.
+ return Poll::Ready(Err(self.get_worker_error()));
+ }
+
+ // Poll the sender to acquire a permit.
+ self.tx
+ .poll_reserve(cx)
+ .map_err(|_| self.get_worker_error())
+ }
+
+ fn call(&mut self, request: Req) -> Self::Future {
+ tracing::trace!("sending request to buffer worker");
+
+ // Get the current Span so that we can explicitly propagate it to the worker.
+ // If we didn't do this, events on the worker related to this span wouldn't be
+ // counted towards that span, since the worker would have no way of entering it.
+ let span = tracing::Span::current();
+
+ // If we've made it here, then a channel permit has already been
+ // acquired, so we can freely allocate a oneshot.
+ let (tx, rx) = oneshot::channel();
+
+ match self.tx.send_item(Message { request, span, tx }) {
+ Ok(_) => ResponseFuture::new(rx),
+ // If the channel is closed, propagate the error from the worker.
+ Err(_) => {
+ tracing::trace!("buffer channel closed");
+ ResponseFuture::failed(self.get_worker_error())
+ }
+ }
+ }
+}
+
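+// Clones are cheap: each clone shares the channel and worker handle of the
+// original, so one buffered service can be driven from many tasks at once.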
+impl<Req, F> Clone for Buffer<Req, F>
+where
+ Req: Send + 'static,
+ F: Send + 'static,
+{
+ fn clone(&self) -> Self {
+ Self {
+ handle: self.handle.clone(),
+ tx: self.tx.clone(),
+ }
+ }
+}