summaryrefslogtreecommitdiff
path: root/vendor/tower/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tower/src/util')
-rw-r--r--vendor/tower/src/util/and_then.rs130
-rw-r--r--vendor/tower/src/util/boxed/layer.rs97
-rw-r--r--vendor/tower/src/util/boxed/layer_clone.rs128
-rw-r--r--vendor/tower/src/util/boxed/layer_clone_sync.rs129
-rw-r--r--vendor/tower/src/util/boxed/mod.rs11
-rw-r--r--vendor/tower/src/util/boxed/sync.rs111
-rw-r--r--vendor/tower/src/util/boxed/unsync.rs86
-rw-r--r--vendor/tower/src/util/boxed_clone.rs136
-rw-r--r--vendor/tower/src/util/boxed_clone_sync.rs101
-rw-r--r--vendor/tower/src/util/call_all/common.rs141
-rw-r--r--vendor/tower/src/util/call_all/mod.rs11
-rw-r--r--vendor/tower/src/util/call_all/ordered.rs177
-rw-r--r--vendor/tower/src/util/call_all/unordered.rs98
-rw-r--r--vendor/tower/src/util/either.rs103
-rw-r--r--vendor/tower/src/util/future_service.rs215
-rw-r--r--vendor/tower/src/util/map_err.rs98
-rw-r--r--vendor/tower/src/util/map_future.rs113
-rw-r--r--vendor/tower/src/util/map_request.rs90
-rw-r--r--vendor/tower/src/util/map_response.rs98
-rw-r--r--vendor/tower/src/util/map_result.rs99
-rw-r--r--vendor/tower/src/util/mod.rs1073
-rw-r--r--vendor/tower/src/util/oneshot.rs105
-rw-r--r--vendor/tower/src/util/optional/error.rs21
-rw-r--r--vendor/tower/src/util/optional/future.rs40
-rw-r--r--vendor/tower/src/util/optional/mod.rs59
-rw-r--r--vendor/tower/src/util/ready.rs103
-rw-r--r--vendor/tower/src/util/rng.rs181
-rw-r--r--vendor/tower/src/util/service_fn.rs82
-rw-r--r--vendor/tower/src/util/then.rs103
29 files changed, 3939 insertions, 0 deletions
diff --git a/vendor/tower/src/util/and_then.rs b/vendor/tower/src/util/and_then.rs
new file mode 100644
index 00000000..adb9ada7
--- /dev/null
+++ b/vendor/tower/src/util/and_then.rs
@@ -0,0 +1,130 @@
+use futures_core::TryFuture;
+use futures_util::{future, TryFutureExt};
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Service returned by the [`and_then`] combinator.
+///
+/// [`and_then`]: crate::util::ServiceExt::and_then
+#[derive(Clone)]
+pub struct AndThen<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for AndThen<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AndThen")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+pin_project_lite::pin_project! {
+ /// Response future from [`AndThen`] services.
+ ///
+ /// [`AndThen`]: crate::util::AndThen
+ pub struct AndThenFuture<F1, F2: TryFuture, N> {
+ #[pin]
+ inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>,
+ }
+}
+
+impl<F1, F2: TryFuture, N> AndThenFuture<F1, F2, N> {
+ pub(crate) fn new(inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>) -> Self {
+ Self { inner }
+ }
+}
+
+impl<F1, F2: TryFuture, N> std::fmt::Debug for AndThenFuture<F1, F2, N> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple("AndThenFuture")
+ .field(&format_args!("..."))
+ .finish()
+ }
+}
+
+impl<F1, F2: TryFuture, N> Future for AndThenFuture<F1, F2, N>
+where
+ future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>: Future,
+{
+ type Output = <future::AndThen<future::ErrInto<F1, F2::Error>, F2, N> as Future>::Output;
+
+ #[inline]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.project().inner.poll(cx)
+ }
+}
+
+/// A [`Layer`] that produces a [`AndThen`] service.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Clone, Debug)]
+pub struct AndThenLayer<F> {
+ f: F,
+}
+
+impl<S, F> AndThen<S, F> {
+ /// Creates a new `AndThen` service.
+ pub const fn new(inner: S, f: F) -> Self {
+ AndThen { f, inner }
+ }
+
+ /// Returns a new [`Layer`] that produces [`AndThen`] services.
+ ///
+ /// This is a convenience function that simply calls [`AndThenLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> AndThenLayer<F> {
+ AndThenLayer { f }
+ }
+}
+
+impl<S, F, Request, Fut> Service<Request> for AndThen<S, F>
+where
+ S: Service<Request>,
+ S::Error: Into<Fut::Error>,
+ F: FnOnce(S::Response) -> Fut + Clone,
+ Fut: TryFuture,
+{
+ type Response = Fut::Ok;
+ type Error = Fut::Error;
+ type Future = AndThenFuture<S::Future, Fut, F>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(Into::into)
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ AndThenFuture::new(self.inner.call(request).err_into().and_then(self.f.clone()))
+ }
+}
+
+impl<F> AndThenLayer<F> {
+ /// Creates a new [`AndThenLayer`] layer.
+ pub const fn new(f: F) -> Self {
+ AndThenLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for AndThenLayer<F>
+where
+ F: Clone,
+{
+ type Service = AndThen<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ AndThen {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}
diff --git a/vendor/tower/src/util/boxed/layer.rs b/vendor/tower/src/util/boxed/layer.rs
new file mode 100644
index 00000000..34e65fa4
--- /dev/null
+++ b/vendor/tower/src/util/boxed/layer.rs
@@ -0,0 +1,97 @@
+use crate::util::BoxService;
+use std::{fmt, sync::Arc};
+use tower_layer::{layer_fn, Layer};
+use tower_service::Service;
+
+/// A boxed [`Layer`] trait object.
+///
+/// [`BoxLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself
+/// and the output [`Service`] to be dynamic, while having consistent types.
+///
+/// This [`Layer`] produces [`BoxService`] instances erasing the type of the
+/// [`Service`] produced by the wrapped [`Layer`].
+///
+/// # Example
+///
+/// `BoxLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have
+/// the same types. In this example, we include a [`Timeout`] layer
+/// only if an environment variable is set. We can use `BoxLayer`
+/// to return a consistent type regardless of runtime configuration:
+///
+/// ```
+/// use std::time::Duration;
+/// use tower::{Service, ServiceBuilder, BoxError, util::BoxLayer};
+///
+/// fn common_layer<S, T>() -> BoxLayer<S, T, S::Response, BoxError>
+/// where
+/// S: Service<T> + Send + 'static,
+/// S::Future: Send + 'static,
+/// S::Error: Into<BoxError> + 'static,
+/// {
+/// let builder = ServiceBuilder::new()
+/// .concurrency_limit(100);
+///
+/// if std::env::var("SET_TIMEOUT").is_ok() {
+/// let layer = builder
+/// .timeout(Duration::from_secs(30))
+/// .into_inner();
+///
+/// BoxLayer::new(layer)
+/// } else {
+/// let layer = builder
+/// .map_err(Into::into)
+/// .into_inner();
+///
+/// BoxLayer::new(layer)
+/// }
+/// }
+/// ```
+///
+/// [`Layer`]: tower_layer::Layer
+/// [`Service`]: tower_service::Service
+/// [`BoxService`]: super::BoxService
+/// [`Timeout`]: crate::timeout
+pub struct BoxLayer<In, T, U, E> {
+ boxed: Arc<dyn Layer<In, Service = BoxService<T, U, E>> + Send + Sync + 'static>,
+}
+
+impl<In, T, U, E> BoxLayer<In, T, U, E> {
+ /// Create a new [`BoxLayer`].
+ pub fn new<L>(inner_layer: L) -> Self
+ where
+ L: Layer<In> + Send + Sync + 'static,
+ L::Service: Service<T, Response = U, Error = E> + Send + 'static,
+ <L::Service as Service<T>>::Future: Send + 'static,
+ {
+ let layer = layer_fn(move |inner: In| {
+ let out = inner_layer.layer(inner);
+ BoxService::new(out)
+ });
+
+ Self {
+ boxed: Arc::new(layer),
+ }
+ }
+}
+
+impl<In, T, U, E> Layer<In> for BoxLayer<In, T, U, E> {
+ type Service = BoxService<T, U, E>;
+
+ fn layer(&self, inner: In) -> Self::Service {
+ self.boxed.layer(inner)
+ }
+}
+
+impl<In, T, U, E> Clone for BoxLayer<In, T, U, E> {
+ fn clone(&self) -> Self {
+ Self {
+ boxed: Arc::clone(&self.boxed),
+ }
+ }
+}
+
+impl<In, T, U, E> fmt::Debug for BoxLayer<In, T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxLayer").finish()
+ }
+}
diff --git a/vendor/tower/src/util/boxed/layer_clone.rs b/vendor/tower/src/util/boxed/layer_clone.rs
new file mode 100644
index 00000000..1f626899
--- /dev/null
+++ b/vendor/tower/src/util/boxed/layer_clone.rs
@@ -0,0 +1,128 @@
+use crate::util::BoxCloneService;
+use std::{fmt, sync::Arc};
+use tower_layer::{layer_fn, Layer};
+use tower_service::Service;
+
+/// A [`Clone`] + [`Send`] boxed [`Layer`].
+///
+/// [`BoxCloneServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself
+/// and the output [`Service`] to be dynamic, while having consistent types.
+///
+/// This [`Layer`] produces [`BoxCloneService`] instances erasing the type of the
+/// [`Service`] produced by the wrapped [`Layer`].
+///
+/// This is similar to [`BoxLayer`](super::BoxLayer) except the layer and resulting
+/// service implements [`Clone`].
+///
+/// # Example
+///
+/// `BoxCloneServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have
+/// the same types, when the underlying service must be clone (for example, when building a MakeService)
+/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use
+/// `BoxCloneService` to return a consistent type regardless of runtime configuration:
+///
+/// ```
+/// use std::time::Duration;
+/// use tower::{Service, ServiceBuilder, BoxError};
+/// use tower::util::{BoxCloneServiceLayer, BoxCloneService};
+///
+/// #
+/// # struct Request;
+/// # struct Response;
+/// # impl Response {
+/// # fn new() -> Self { Self }
+/// # }
+///
+/// fn common_layer<S, T>() -> BoxCloneServiceLayer<S, T, S::Response, BoxError>
+/// where
+/// S: Service<T> + Clone + Send + 'static,
+/// S::Future: Send + 'static,
+/// S::Error: Into<BoxError> + 'static,
+/// {
+/// let builder = ServiceBuilder::new()
+/// .concurrency_limit(100);
+///
+/// if std::env::var("SET_TIMEOUT").is_ok() {
+/// let layer = builder
+/// .timeout(Duration::from_secs(30))
+/// .into_inner();
+///
+/// BoxCloneServiceLayer::new(layer)
+/// } else {
+/// let layer = builder
+/// .map_err(Into::into)
+/// .into_inner();
+///
+/// BoxCloneServiceLayer::new(layer)
+/// }
+/// }
+///
+/// // We can clone the layer (this is true of BoxLayer as well)
+/// let boxed_clone_layer = common_layer();
+///
+/// let cloned_layer = boxed_clone_layer.clone();
+///
+/// // Using the `BoxCloneServiceLayer` we can create a `BoxCloneService`
+/// let service: BoxCloneService<Request, Response, BoxError> = ServiceBuilder::new().layer(boxed_clone_layer)
+/// .service_fn(|req: Request| async {
+/// Ok::<_, BoxError>(Response::new())
+/// });
+///
+/// # let service = assert_service(service);
+///
+/// // And we can still clone the service
+/// let cloned_service = service.clone();
+/// #
+/// # fn assert_service<S, R>(svc: S) -> S
+/// # where S: Service<R> { svc }
+///
+/// ```
+///
+/// [`Layer`]: tower_layer::Layer
+/// [`Service`]: tower_service::Service
+/// [`BoxService`]: super::BoxService
+/// [`Timeout`]: crate::timeout
+pub struct BoxCloneServiceLayer<In, T, U, E> {
+ boxed: Arc<dyn Layer<In, Service = BoxCloneService<T, U, E>> + Send + Sync + 'static>,
+}
+
+impl<In, T, U, E> BoxCloneServiceLayer<In, T, U, E> {
+ /// Create a new [`BoxCloneServiceLayer`].
+ pub fn new<L>(inner_layer: L) -> Self
+ where
+ L: Layer<In> + Send + Sync + 'static,
+ L::Service: Service<T, Response = U, Error = E> + Send + Clone + 'static,
+ <L::Service as Service<T>>::Future: Send + 'static,
+ {
+ let layer = layer_fn(move |inner: In| {
+ let out = inner_layer.layer(inner);
+ BoxCloneService::new(out)
+ });
+
+ Self {
+ boxed: Arc::new(layer),
+ }
+ }
+}
+
+impl<In, T, U, E> Layer<In> for BoxCloneServiceLayer<In, T, U, E> {
+ type Service = BoxCloneService<T, U, E>;
+
+ fn layer(&self, inner: In) -> Self::Service {
+ self.boxed.layer(inner)
+ }
+}
+
+impl<In, T, U, E> Clone for BoxCloneServiceLayer<In, T, U, E> {
+ fn clone(&self) -> Self {
+ Self {
+ boxed: Arc::clone(&self.boxed),
+ }
+ }
+}
+
+impl<In, T, U, E> fmt::Debug for BoxCloneServiceLayer<In, T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxCloneServiceLayer").finish()
+ }
+}
diff --git a/vendor/tower/src/util/boxed/layer_clone_sync.rs b/vendor/tower/src/util/boxed/layer_clone_sync.rs
new file mode 100644
index 00000000..950e66be
--- /dev/null
+++ b/vendor/tower/src/util/boxed/layer_clone_sync.rs
@@ -0,0 +1,129 @@
+use std::{fmt, sync::Arc};
+use tower_layer::{layer_fn, Layer};
+use tower_service::Service;
+
+use crate::util::BoxCloneSyncService;
+
+/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Layer`].
+///
+/// [`BoxCloneSyncServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself
+/// and the output [`Service`] to be dynamic, while having consistent types.
+///
+/// This [`Layer`] produces [`BoxCloneSyncService`] instances erasing the type of the
+/// [`Service`] produced by the wrapped [`Layer`].
+///
+/// This is similar to [`BoxCloneServiceLayer`](super::BoxCloneServiceLayer) except the layer and resulting
+/// service implements [`Sync`].
+///
+/// # Example
+///
+/// `BoxCloneSyncServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have
+/// the same types, when the underlying service must be clone and sync (for example, when building a Hyper connector).
+/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use
+/// `BoxCloneSyncServiceLayer` to return a consistent type regardless of runtime configuration:
+///
+/// ```
+/// use std::time::Duration;
+/// use tower::{Service, ServiceBuilder, BoxError};
+/// use tower::util::{BoxCloneSyncServiceLayer, BoxCloneSyncService};
+///
+/// #
+/// # struct Request;
+/// # struct Response;
+/// # impl Response {
+/// # fn new() -> Self { Self }
+/// # }
+///
+/// fn common_layer<S, T>() -> BoxCloneSyncServiceLayer<S, T, S::Response, BoxError>
+/// where
+/// S: Service<T> + Clone + Send + Sync + 'static,
+/// S::Future: Send + 'static,
+/// S::Error: Into<BoxError> + 'static,
+/// {
+/// let builder = ServiceBuilder::new()
+/// .concurrency_limit(100);
+///
+/// if std::env::var("SET_TIMEOUT").is_ok() {
+/// let layer = builder
+/// .timeout(Duration::from_secs(30))
+/// .into_inner();
+///
+/// BoxCloneSyncServiceLayer::new(layer)
+/// } else {
+/// let layer = builder
+/// .map_err(Into::into)
+/// .into_inner();
+///
+/// BoxCloneSyncServiceLayer::new(layer)
+/// }
+/// }
+///
+/// // We can clone the layer (this is true of BoxLayer as well)
+/// let boxed_clone_sync_layer = common_layer();
+///
+/// let cloned_sync_layer = boxed_clone_sync_layer.clone();
+///
+/// // Using the `BoxCloneSyncServiceLayer` we can create a `BoxCloneSyncService`
+/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new().layer(cloned_sync_layer)
+/// .service_fn(|req: Request| async {
+/// Ok::<_, BoxError>(Response::new())
+/// });
+///
+/// # let service = assert_service(service);
+///
+/// // And we can still clone the service
+/// let cloned_service = service.clone();
+/// #
+/// # fn assert_service<S, R>(svc: S) -> S
+/// # where S: Service<R> { svc }
+///
+/// ```
+///
+/// [`Layer`]: tower_layer::Layer
+/// [`Service`]: tower_service::Service
+/// [`BoxService`]: super::BoxService
+/// [`Timeout`]: crate::timeout
+pub struct BoxCloneSyncServiceLayer<In, T, U, E> {
+ boxed: Arc<dyn Layer<In, Service = BoxCloneSyncService<T, U, E>> + Send + Sync + 'static>,
+}
+
+impl<In, T, U, E> BoxCloneSyncServiceLayer<In, T, U, E> {
+ /// Create a new [`BoxCloneSyncServiceLayer`].
+ pub fn new<L>(inner_layer: L) -> Self
+ where
+ L: Layer<In> + Send + Sync + 'static,
+ L::Service: Service<T, Response = U, Error = E> + Send + Sync + Clone + 'static,
+ <L::Service as Service<T>>::Future: Send + 'static,
+ {
+ let layer = layer_fn(move |inner: In| {
+ let out = inner_layer.layer(inner);
+ BoxCloneSyncService::new(out)
+ });
+
+ Self {
+ boxed: Arc::new(layer),
+ }
+ }
+}
+
+impl<In, T, U, E> Layer<In> for BoxCloneSyncServiceLayer<In, T, U, E> {
+ type Service = BoxCloneSyncService<T, U, E>;
+
+ fn layer(&self, inner: In) -> Self::Service {
+ self.boxed.layer(inner)
+ }
+}
+
+impl<In, T, U, E> Clone for BoxCloneSyncServiceLayer<In, T, U, E> {
+ fn clone(&self) -> Self {
+ Self {
+ boxed: Arc::clone(&self.boxed),
+ }
+ }
+}
+
+impl<In, T, U, E> fmt::Debug for BoxCloneSyncServiceLayer<In, T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxCloneSyncServiceLayer").finish()
+ }
+}
diff --git a/vendor/tower/src/util/boxed/mod.rs b/vendor/tower/src/util/boxed/mod.rs
new file mode 100644
index 00000000..7da5d63c
--- /dev/null
+++ b/vendor/tower/src/util/boxed/mod.rs
@@ -0,0 +1,11 @@
+mod layer;
+mod layer_clone;
+mod layer_clone_sync;
+mod sync;
+mod unsync;
+
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::{
+ layer::BoxLayer, layer_clone::BoxCloneServiceLayer, layer_clone_sync::BoxCloneSyncServiceLayer,
+ sync::BoxService, unsync::UnsyncBoxService,
+};
diff --git a/vendor/tower/src/util/boxed/sync.rs b/vendor/tower/src/util/boxed/sync.rs
new file mode 100644
index 00000000..57dcfec7
--- /dev/null
+++ b/vendor/tower/src/util/boxed/sync.rs
@@ -0,0 +1,111 @@
+use crate::ServiceExt;
+use tower_layer::{layer_fn, LayerFn};
+use tower_service::Service;
+
+use sync_wrapper::SyncWrapper;
+
+use std::fmt;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// A boxed `Service + Send` trait object.
+///
+/// [`BoxService`] turns a service into a trait object, allowing the response
+/// future type to be dynamic. This type requires both the service and the
+/// response future to be [`Send`].
+///
+/// If you need a boxed [`Service`] that implements [`Clone`] consider using
+/// [`BoxCloneService`](crate::util::BoxCloneService).
+///
+/// Dynamically dispatched [`Service`] objects allow for erasing the underlying
+/// [`Service`] type and using the `Service` instances as opaque handles. This can
+/// be useful when the service instance cannot be explicitly named for whatever
+/// reason.
+///
+/// # Examples
+///
+/// ```
+/// use futures_util::future::ready;
+/// # use tower_service::Service;
+/// # use tower::util::{BoxService, service_fn};
+/// // Respond to requests using a closure, but closures cannot be named...
+/// # pub fn main() {
+/// let svc = service_fn(|mut request: String| {
+/// request.push_str(" response");
+/// ready(Ok(request))
+/// });
+///
+/// let service: BoxService<String, String, ()> = BoxService::new(svc);
+/// # drop(service);
+/// }
+/// ```
+///
+/// [`Service`]: crate::Service
+/// [`Rc`]: std::rc::Rc
+pub struct BoxService<T, U, E> {
+ inner:
+ SyncWrapper<Box<dyn Service<T, Response = U, Error = E, Future = BoxFuture<U, E>> + Send>>,
+}
+
+/// A boxed `Future + Send` trait object.
+///
+/// This type alias represents a boxed future that is [`Send`] and can be moved
+/// across threads.
+type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
+
+impl<T, U, E> BoxService<T, U, E> {
+ #[allow(missing_docs)]
+ pub fn new<S>(inner: S) -> Self
+ where
+ S: Service<T, Response = U, Error = E> + Send + 'static,
+ S::Future: Send + 'static,
+ {
+ // rust can't infer the type
+ let inner: Box<dyn Service<T, Response = U, Error = E, Future = BoxFuture<U, E>> + Send> =
+ Box::new(inner.map_future(|f: S::Future| Box::pin(f) as _));
+ let inner = SyncWrapper::new(inner);
+ BoxService { inner }
+ }
+
+ /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxService`]
+ /// middleware.
+ ///
+ /// [`Layer`]: crate::Layer
+ pub fn layer<S>() -> LayerFn<fn(S) -> Self>
+ where
+ S: Service<T, Response = U, Error = E> + Send + 'static,
+ S::Future: Send + 'static,
+ {
+ layer_fn(Self::new)
+ }
+}
+
+impl<T, U, E> Service<T> for BoxService<T, U, E> {
+ type Response = U;
+ type Error = E;
+ type Future = BoxFuture<U, E>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
+ self.inner.get_mut().poll_ready(cx)
+ }
+
+ fn call(&mut self, request: T) -> BoxFuture<U, E> {
+ self.inner.get_mut().call(request)
+ }
+}
+
+impl<T, U, E> fmt::Debug for BoxService<T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxService").finish()
+ }
+}
+
+#[test]
+fn is_sync() {
+ fn assert_sync<T: Sync>() {}
+
+ assert_sync::<BoxService<(), (), ()>>();
+}
diff --git a/vendor/tower/src/util/boxed/unsync.rs b/vendor/tower/src/util/boxed/unsync.rs
new file mode 100644
index 00000000..f645f169
--- /dev/null
+++ b/vendor/tower/src/util/boxed/unsync.rs
@@ -0,0 +1,86 @@
+use tower_layer::{layer_fn, LayerFn};
+use tower_service::Service;
+
+use std::fmt;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// A boxed [`Service`] trait object.
+pub struct UnsyncBoxService<T, U, E> {
+ inner: Box<dyn Service<T, Response = U, Error = E, Future = UnsyncBoxFuture<U, E>>>,
+}
+
+/// A boxed [`Future`] trait object.
+///
+/// This type alias represents a boxed future that is *not* [`Send`] and must
+/// remain on the current thread.
+type UnsyncBoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>;
+
+#[derive(Debug)]
+struct UnsyncBoxed<S> {
+ inner: S,
+}
+
+impl<T, U, E> UnsyncBoxService<T, U, E> {
+ #[allow(missing_docs)]
+ pub fn new<S>(inner: S) -> Self
+ where
+ S: Service<T, Response = U, Error = E> + 'static,
+ S::Future: 'static,
+ {
+ let inner = Box::new(UnsyncBoxed { inner });
+ UnsyncBoxService { inner }
+ }
+
+ /// Returns a [`Layer`] for wrapping a [`Service`] in an [`UnsyncBoxService`] middleware.
+ ///
+ /// [`Layer`]: crate::Layer
+ pub fn layer<S>() -> LayerFn<fn(S) -> Self>
+ where
+ S: Service<T, Response = U, Error = E> + 'static,
+ S::Future: 'static,
+ {
+ layer_fn(Self::new)
+ }
+}
+
+impl<T, U, E> Service<T> for UnsyncBoxService<T, U, E> {
+ type Response = U;
+ type Error = E;
+ type Future = UnsyncBoxFuture<U, E>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, request: T) -> UnsyncBoxFuture<U, E> {
+ self.inner.call(request)
+ }
+}
+
+impl<T, U, E> fmt::Debug for UnsyncBoxService<T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("UnsyncBoxService").finish()
+ }
+}
+
+impl<S, Request> Service<Request> for UnsyncBoxed<S>
+where
+ S: Service<Request> + 'static,
+ S::Future: 'static,
+{
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ Box::pin(self.inner.call(request))
+ }
+}
diff --git a/vendor/tower/src/util/boxed_clone.rs b/vendor/tower/src/util/boxed_clone.rs
new file mode 100644
index 00000000..1209fd2e
--- /dev/null
+++ b/vendor/tower/src/util/boxed_clone.rs
@@ -0,0 +1,136 @@
+use super::ServiceExt;
+use futures_util::future::BoxFuture;
+use std::{
+ fmt,
+ task::{Context, Poll},
+};
+use tower_layer::{layer_fn, LayerFn};
+use tower_service::Service;
+
+/// A [`Clone`] + [`Send`] boxed [`Service`].
+///
+/// [`BoxCloneService`] turns a service into a trait object, allowing the
+/// response future type to be dynamic, and allowing the service to be cloned.
+///
+/// This is similar to [`BoxService`](super::BoxService) except the resulting
+/// service implements [`Clone`].
+///
+/// # Example
+///
+/// ```
+/// use tower::{Service, ServiceBuilder, BoxError, util::BoxCloneService};
+/// use std::time::Duration;
+/// #
+/// # struct Request;
+/// # struct Response;
+/// # impl Response {
+/// # fn new() -> Self { Self }
+/// # }
+///
+/// // This service has a complex type that is hard to name
+/// let service = ServiceBuilder::new()
+/// .map_request(|req| {
+/// println!("received request");
+/// req
+/// })
+/// .map_response(|res| {
+/// println!("response produced");
+/// res
+/// })
+/// .load_shed()
+/// .concurrency_limit(64)
+/// .timeout(Duration::from_secs(10))
+/// .service_fn(|req: Request| async {
+/// Ok::<_, BoxError>(Response::new())
+/// });
+/// # let service = assert_service(service);
+///
+/// // `BoxCloneService` will erase the type so it's nameable
+/// let service: BoxCloneService<Request, Response, BoxError> = BoxCloneService::new(service);
+/// # let service = assert_service(service);
+///
+/// // And we can still clone the service
+/// let cloned_service = service.clone();
+/// #
+/// # fn assert_service<S, R>(svc: S) -> S
+/// # where S: Service<R> { svc }
+/// ```
+pub struct BoxCloneService<T, U, E>(
+ Box<
+ dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
+ + Send,
+ >,
+);
+
+impl<T, U, E> BoxCloneService<T, U, E> {
+ /// Create a new `BoxCloneService`.
+ pub fn new<S>(inner: S) -> Self
+ where
+ S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
+ S::Future: Send + 'static,
+ {
+ let inner = inner.map_future(|f| Box::pin(f) as _);
+ BoxCloneService(Box::new(inner))
+ }
+
+ /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneService`]
+ /// middleware.
+ ///
+ /// [`Layer`]: crate::Layer
+ pub fn layer<S>() -> LayerFn<fn(S) -> Self>
+ where
+ S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
+ S::Future: Send + 'static,
+ {
+ layer_fn(Self::new)
+ }
+}
+
+impl<T, U, E> Service<T> for BoxCloneService<T, U, E> {
+ type Response = U;
+ type Error = E;
+ type Future = BoxFuture<'static, Result<U, E>>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
+ self.0.poll_ready(cx)
+ }
+
+ #[inline]
+ fn call(&mut self, request: T) -> Self::Future {
+ self.0.call(request)
+ }
+}
+
+impl<T, U, E> Clone for BoxCloneService<T, U, E> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone_box())
+ }
+}
+
+trait CloneService<R>: Service<R> {
+ fn clone_box(
+ &self,
+ ) -> Box<
+ dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future>
+ + Send,
+ >;
+}
+
+impl<R, T> CloneService<R> for T
+where
+ T: Service<R> + Send + Clone + 'static,
+{
+ fn clone_box(
+ &self,
+ ) -> Box<dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future> + Send>
+ {
+ Box::new(self.clone())
+ }
+}
+
+impl<T, U, E> fmt::Debug for BoxCloneService<T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxCloneService").finish()
+ }
+}
diff --git a/vendor/tower/src/util/boxed_clone_sync.rs b/vendor/tower/src/util/boxed_clone_sync.rs
new file mode 100644
index 00000000..d62e8ff2
--- /dev/null
+++ b/vendor/tower/src/util/boxed_clone_sync.rs
@@ -0,0 +1,101 @@
+use super::ServiceExt;
+use futures_util::future::BoxFuture;
+use std::{
+ fmt,
+ task::{Context, Poll},
+};
+use tower_layer::{layer_fn, LayerFn};
+use tower_service::Service;
+
+/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Service`].
+///
+/// [`BoxCloneSyncService`] turns a service into a trait object, allowing the
+/// response future type to be dynamic, and allowing the service to be cloned and shared.
+///
+/// This is similar to [`BoxCloneService`](super::BoxCloneService) except the resulting
+/// service implements [`Sync`].
+/// ```
+pub struct BoxCloneSyncService<T, U, E>(
+ Box<
+ dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
+ + Send
+ + Sync,
+ >,
+);
+
+impl<T, U, E> BoxCloneSyncService<T, U, E> {
+ /// Create a new `BoxCloneSyncService`.
+ pub fn new<S>(inner: S) -> Self
+ where
+ S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
+ S::Future: Send + 'static,
+ {
+ let inner = inner.map_future(|f| Box::pin(f) as _);
+ BoxCloneSyncService(Box::new(inner))
+ }
+
+ /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneSyncService`]
+ /// middleware.
+ ///
+ /// [`Layer`]: crate::Layer
+ pub fn layer<S>() -> LayerFn<fn(S) -> Self>
+ where
+ S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static,
+ S::Future: Send + 'static,
+ {
+ layer_fn(Self::new)
+ }
+}
+
+impl<T, U, E> Service<T> for BoxCloneSyncService<T, U, E> {
+ type Response = U;
+ type Error = E;
+ type Future = BoxFuture<'static, Result<U, E>>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> {
+ self.0.poll_ready(cx)
+ }
+
+ #[inline]
+ fn call(&mut self, request: T) -> Self::Future {
+ self.0.call(request)
+ }
+}
+
+impl<T, U, E> Clone for BoxCloneSyncService<T, U, E> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone_box())
+ }
+}
+
+trait CloneService<R>: Service<R> {
+ fn clone_box(
+ &self,
+ ) -> Box<
+ dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future>
+ + Send
+ + Sync,
+ >;
+}
+
+impl<R, T> CloneService<R> for T
+where
+ T: Service<R> + Send + Sync + Clone + 'static,
+{
+ fn clone_box(
+ &self,
+ ) -> Box<
+ dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future>
+ + Send
+ + Sync,
+ > {
+ Box::new(self.clone())
+ }
+}
+
+impl<T, U, E> fmt::Debug for BoxCloneSyncService<T, U, E> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("BoxCloneSyncService").finish()
+ }
+}
diff --git a/vendor/tower/src/util/call_all/common.rs b/vendor/tower/src/util/call_all/common.rs
new file mode 100644
index 00000000..9f2490b6
--- /dev/null
+++ b/vendor/tower/src/util/call_all/common.rs
@@ -0,0 +1,141 @@
+use futures_core::{ready, Stream};
+use pin_project_lite::pin_project;
+use std::{
+ fmt,
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+pin_project! {
+ /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator.
+ pub(crate) struct CallAll<Svc, S, Q>
+ where
+ S: Stream,
+ {
+ service: Option<Svc>,
+ #[pin]
+ stream: S,
+ queue: Q,
+ eof: bool,
+ curr_req: Option<S::Item>
+ }
+}
+
+impl<Svc, S, Q> fmt::Debug for CallAll<Svc, S, Q>
+where
+ Svc: fmt::Debug,
+ S: Stream + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("CallAll")
+ .field("service", &self.service)
+ .field("stream", &self.stream)
+ .field("eof", &self.eof)
+ .finish()
+ }
+}
+
+pub(crate) trait Drive<F: Future> {
+ fn is_empty(&self) -> bool;
+
+ fn push(&mut self, future: F);
+
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
+}
+
+impl<Svc, S, Q> CallAll<Svc, S, Q>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+ Q: Drive<Svc::Future>,
+{
+ pub(crate) const fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
+ CallAll {
+ service: Some(service),
+ stream,
+ queue,
+ eof: false,
+ curr_req: None,
+ }
+ }
+
+ /// Extract the wrapped [`Service`].
+ pub(crate) fn into_inner(mut self) -> Svc {
+ self.service.take().expect("Service already taken")
+ }
+
+ /// Extract the wrapped [`Service`].
+ pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc {
+ self.project()
+ .service
+ .take()
+ .expect("Service already taken")
+ }
+
+ pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
+ assert!(self.queue.is_empty() && !self.eof);
+
+ super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
+ }
+}
+
+impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+ Q: Drive<Svc::Future>,
+{
+ type Item = Result<Svc::Response, Svc::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ loop {
+ // First, see if we have any responses to yield
+ if let Poll::Ready(r) = this.queue.poll(cx) {
+ if let Some(rsp) = r.transpose()? {
+ return Poll::Ready(Some(Ok(rsp)));
+ }
+ }
+
+ // If there are no more requests coming, check if we're done
+ if *this.eof {
+ if this.queue.is_empty() {
+ return Poll::Ready(None);
+ } else {
+ return Poll::Pending;
+ }
+ }
+
+ // If not done, and we don't have a stored request, gather the next request from the
+ // stream (if there is one), or return `Pending` if the stream is not ready.
+ if this.curr_req.is_none() {
+ *this.curr_req = match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(next_req) => Some(next_req),
+ None => {
+ // Mark that there will be no more requests.
+ *this.eof = true;
+ continue;
+ }
+ };
+ }
+
+ // Then, see that the service is ready for another request
+ let svc = this
+ .service
+ .as_mut()
+ .expect("Using CallAll after extracting inner Service");
+
+ if let Err(e) = ready!(svc.poll_ready(cx)) {
+ // Set eof to prevent the service from being called again after a `poll_ready` error
+ *this.eof = true;
+ return Poll::Ready(Some(Err(e)));
+ }
+
+ // Unwrap: The check above always sets `this.curr_req` if none.
+ this.queue.push(svc.call(this.curr_req.take().unwrap()));
+ }
+ }
+}
diff --git a/vendor/tower/src/util/call_all/mod.rs b/vendor/tower/src/util/call_all/mod.rs
new file mode 100644
index 00000000..0cac72d1
--- /dev/null
+++ b/vendor/tower/src/util/call_all/mod.rs
@@ -0,0 +1,11 @@
+//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
+//!
+//! [`Service<Request>`]: crate::Service
+//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+
+mod common;
+mod ordered;
+mod unordered;
+
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::{ordered::CallAll, unordered::CallAllUnordered};
diff --git a/vendor/tower/src/util/call_all/ordered.rs b/vendor/tower/src/util/call_all/ordered.rs
new file mode 100644
index 00000000..9a283916
--- /dev/null
+++ b/vendor/tower/src/util/call_all/ordered.rs
@@ -0,0 +1,177 @@
+//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
+//!
+//! [`Service<Request>`]: crate::Service
+//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+
+use super::common;
+use futures_core::Stream;
+use futures_util::stream::FuturesOrdered;
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+pin_project! {
+ /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each
+ /// request received on the wrapped [`Stream`].
+ ///
+ /// ```rust
+ /// # use std::task::{Poll, Context};
+ /// # use std::cell::Cell;
+ /// # use std::error::Error;
+ /// # use std::rc::Rc;
+ /// #
+ /// use futures::future::{ready, Ready};
+ /// use futures::StreamExt;
+ /// use futures::channel::mpsc;
+ /// use tower_service::Service;
+ /// use tower::util::ServiceExt;
+ ///
+ /// // First, we need to have a Service to process our requests.
+ /// #[derive(Debug, Eq, PartialEq)]
+ /// struct FirstLetter;
+ /// impl Service<&'static str> for FirstLetter {
+ /// type Response = &'static str;
+ /// type Error = Box<dyn Error + Send + Sync>;
+ /// type Future = Ready<Result<Self::Response, Self::Error>>;
+ ///
+ /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// Poll::Ready(Ok(()))
+ /// }
+ ///
+ /// fn call(&mut self, req: &'static str) -> Self::Future {
+ /// ready(Ok(&req[..1]))
+ /// }
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// // Next, we need a Stream of requests.
+ // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams,
+ // tokio::sync::mpsc again?
+ /// let (mut reqs, rx) = mpsc::unbounded();
+ /// // Note that we have to help Rust out here by telling it what error type to use.
+ /// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
+ /// let mut rsps = FirstLetter.call_all(rx);
+ ///
+ /// // Now, let's send a few requests and then check that we get the corresponding responses.
+ /// reqs.unbounded_send("one").unwrap();
+ /// reqs.unbounded_send("two").unwrap();
+ /// reqs.unbounded_send("three").unwrap();
+ /// drop(reqs);
+ ///
+ /// // We then loop over the response `Stream` that we get back from call_all.
+ /// let mut i = 0usize;
+ /// while let Some(rsp) = rsps.next().await {
+ /// // Each response is a Result (we could also have used TryStream::try_next)
+ /// match (i + 1, rsp.unwrap()) {
+ /// (1, "o") |
+ /// (2, "t") |
+ /// (3, "t") => {}
+ /// (n, i) => {
+ /// unreachable!("{}. response was '{}'", n, i);
+ /// }
+ /// }
+ /// i += 1;
+ /// }
+ ///
+ /// // And at the end, we can get the Service back when there are no more requests.
+ /// assert_eq!(rsps.into_inner(), FirstLetter);
+ /// }
+ /// ```
+ ///
+ /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+ #[derive(Debug)]
+ pub struct CallAll<Svc, S>
+ where
+ Svc: Service<S::Item>,
+ S: Stream,
+ {
+ #[pin]
+ inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
+ }
+}
+
+impl<Svc, S> CallAll<Svc, S>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+{
+ /// Create new [`CallAll`] combinator.
+ ///
+ /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are
+ /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`].
+ ///
+ /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+ pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
+ CallAll {
+ inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
+ }
+ }
+
+ /// Extract the wrapped [`Service`].
+ ///
+ /// # Panics
+ ///
+ /// Panics if [`take_service`] was already called.
+ ///
+ /// [`take_service`]: crate::util::CallAll::take_service
+ pub fn into_inner(self) -> Svc {
+ self.inner.into_inner()
+ }
+
+ /// Extract the wrapped [`Service`].
+ ///
+ /// This [`CallAll`] can no longer be used after this function has been called.
+ ///
+ /// # Panics
+ ///
+ /// Panics if [`take_service`] was already called.
+ ///
+ /// [`take_service`]: crate::util::CallAll::take_service
+ pub fn take_service(self: Pin<&mut Self>) -> Svc {
+ self.project().inner.take_service()
+ }
+
+ /// Return responses as they are ready, regardless of the initial order.
+ ///
+ /// This function must be called before the stream is polled.
+ ///
+ /// # Panics
+ ///
+ /// Panics if [`poll`] was called.
+ ///
+ /// [`poll`]: std::future::Future::poll
+ pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
+ self.inner.unordered()
+ }
+}
+
+impl<Svc, S> Stream for CallAll<Svc, S>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+{
+ type Item = Result<Svc::Response, Svc::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.poll_next(cx)
+ }
+}
+
+impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
+ fn is_empty(&self) -> bool {
+ FuturesOrdered::is_empty(self)
+ }
+
+ fn push(&mut self, future: F) {
+ FuturesOrdered::push_back(self, future)
+ }
+
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
+ Stream::poll_next(Pin::new(self), cx)
+ }
+}
diff --git a/vendor/tower/src/util/call_all/unordered.rs b/vendor/tower/src/util/call_all/unordered.rs
new file mode 100644
index 00000000..3038932f
--- /dev/null
+++ b/vendor/tower/src/util/call_all/unordered.rs
@@ -0,0 +1,98 @@
+//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream].
+//!
+//! [`Service<Request>`]: crate::Service
+//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+
+use super::common;
+use futures_core::Stream;
+use futures_util::stream::FuturesUnordered;
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+pin_project! {
+ /// A stream of responses received from the inner service in received order.
+ ///
+ /// Similar to [`CallAll`] except, instead of yielding responses in request order,
+ /// responses are returned as they are available.
+ ///
+ /// [`CallAll`]: crate::util::CallAll
+ #[derive(Debug)]
+ pub struct CallAllUnordered<Svc, S>
+ where
+ Svc: Service<S::Item>,
+ S: Stream,
+ {
+ #[pin]
+ inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
+ }
+}
+
+impl<Svc, S> CallAllUnordered<Svc, S>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+{
+ /// Create new [`CallAllUnordered`] combinator.
+ ///
+ /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+ pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
+ CallAllUnordered {
+ inner: common::CallAll::new(service, stream, FuturesUnordered::new()),
+ }
+ }
+
+ /// Extract the wrapped [`Service`].
+ ///
+ /// # Panics
+ ///
+ /// Panics if [`take_service`] was already called.
+ ///
+ /// [`take_service`]: crate::util::CallAllUnordered::take_service
+ pub fn into_inner(self) -> Svc {
+ self.inner.into_inner()
+ }
+
+ /// Extract the wrapped `Service`.
+ ///
+ /// This [`CallAllUnordered`] can no longer be used after this function has been called.
+ ///
+ /// # Panics
+ ///
+ /// Panics if [`take_service`] was already called.
+ ///
+ /// [`take_service`]: crate::util::CallAllUnordered::take_service
+ pub fn take_service(self: Pin<&mut Self>) -> Svc {
+ self.project().inner.take_service()
+ }
+}
+
+impl<Svc, S> Stream for CallAllUnordered<Svc, S>
+where
+ Svc: Service<S::Item>,
+ S: Stream,
+{
+ type Item = Result<Svc::Response, Svc::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.poll_next(cx)
+ }
+}
+
+impl<F: Future> common::Drive<F> for FuturesUnordered<F> {
+ fn is_empty(&self) -> bool {
+ FuturesUnordered::is_empty(self)
+ }
+
+ fn push(&mut self, future: F) {
+ FuturesUnordered::push(self, future)
+ }
+
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
+ Stream::poll_next(Pin::new(self), cx)
+ }
+}
diff --git a/vendor/tower/src/util/either.rs b/vendor/tower/src/util/either.rs
new file mode 100644
index 00000000..371abb4d
--- /dev/null
+++ b/vendor/tower/src/util/either.rs
@@ -0,0 +1,103 @@
+//! Contains [`Either`] and related types and functions.
+//!
+//! See [`Either`] documentation for more details.
+
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Combine two different service types into a single type.
+///
+/// Both services must be of the same request, response, and error types.
+/// [`Either`] is useful for handling conditional branching in service middleware
+/// to different inner service types.
+#[derive(Clone, Copy, Debug)]
+pub enum Either<A, B> {
+ #[allow(missing_docs)]
+ Left(A),
+ #[allow(missing_docs)]
+ Right(B),
+}
+
+impl<A, B, Request> Service<Request> for Either<A, B>
+where
+ A: Service<Request>,
+ B: Service<Request, Response = A::Response, Error = A::Error>,
+{
+ type Response = A::Response;
+ type Error = A::Error;
+ type Future = EitherResponseFuture<A::Future, B::Future>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self {
+ Either::Left(service) => service.poll_ready(cx),
+ Either::Right(service) => service.poll_ready(cx),
+ }
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ match self {
+ Either::Left(service) => EitherResponseFuture {
+ kind: Kind::Left {
+ inner: service.call(request),
+ },
+ },
+ Either::Right(service) => EitherResponseFuture {
+ kind: Kind::Right {
+ inner: service.call(request),
+ },
+ },
+ }
+ }
+}
+
+pin_project! {
+ /// Response future for [`Either`].
+ pub struct EitherResponseFuture<A, B> {
+ #[pin]
+ kind: Kind<A, B>
+ }
+}
+
+pin_project! {
+ #[project = KindProj]
+ enum Kind<A, B> {
+ Left { #[pin] inner: A },
+ Right { #[pin] inner: B },
+ }
+}
+
+impl<A, B> Future for EitherResponseFuture<A, B>
+where
+ A: Future,
+ B: Future<Output = A::Output>,
+{
+ type Output = A::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.project().kind.project() {
+ KindProj::Left { inner } => inner.poll(cx),
+ KindProj::Right { inner } => inner.poll(cx),
+ }
+ }
+}
+
+impl<S, A, B> Layer<S> for Either<A, B>
+where
+ A: Layer<S>,
+ B: Layer<S>,
+{
+ type Service = Either<A::Service, B::Service>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ match self {
+ Either::Left(layer) => Either::Left(layer.layer(inner)),
+ Either::Right(layer) => Either::Right(layer.layer(inner)),
+ }
+ }
+}
diff --git a/vendor/tower/src/util/future_service.rs b/vendor/tower/src/util/future_service.rs
new file mode 100644
index 00000000..c0a36df2
--- /dev/null
+++ b/vendor/tower/src/util/future_service.rs
@@ -0,0 +1,215 @@
+use std::fmt;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+/// Returns a new [`FutureService`] for the given future.
+///
+/// A [`FutureService`] allows you to treat a future that resolves to a service as a service. This
+/// can be useful for services that are created asynchronously.
+///
+/// # Example
+/// ```
+/// use tower::{service_fn, Service, ServiceExt};
+/// use tower::util::future_service;
+/// use std::convert::Infallible;
+///
+/// # fn main() {
+/// # async {
+/// // A future which outputs a type implementing `Service`.
+/// let future_of_a_service = async {
+/// let svc = service_fn(|_req: ()| async { Ok::<_, Infallible>("ok") });
+/// Ok::<_, Infallible>(svc)
+/// };
+///
+/// // Wrap the future with a `FutureService`, allowing it to be used
+/// // as a service without awaiting the future's completion:
+/// let mut svc = future_service(Box::pin(future_of_a_service));
+///
+/// // Now, when we wait for the service to become ready, it will
+/// // drive the future to completion internally.
+/// let svc = svc.ready().await.unwrap();
+/// let res = svc.call(()).await.unwrap();
+/// # };
+/// # }
+/// ```
+///
+/// # Regarding the [`Unpin`] bound
+///
+/// The [`Unpin`] bound on `F` is necessary because the future will be polled in
+/// [`Service::poll_ready`] which doesn't have a pinned receiver (it takes `&mut self` and not `self:
+/// Pin<&mut Self>`). So we cannot put the future into a `Pin` without requiring `Unpin`.
+///
+/// This will most likely come up if you're calling `future_service` with an async block. In that
+/// case you can use `Box::pin(async { ... })` as shown in the example.
+pub fn future_service<F, S, R, E>(future: F) -> FutureService<F, S>
+where
+ F: Future<Output = Result<S, E>> + Unpin,
+ S: Service<R, Error = E>,
+{
+ FutureService::new(future)
+}
+
+/// A type that implements [`Service`] for a [`Future`] that produces a [`Service`].
+///
+/// See [`future_service`] for more details.
+#[derive(Clone)]
+pub struct FutureService<F, S> {
+ state: State<F, S>,
+}
+
+impl<F, S> FutureService<F, S> {
+ /// Returns a new [`FutureService`] for the given future.
+ ///
+ /// A [`FutureService`] allows you to treat a future that resolves to a service as a service. This
+ /// can be useful for services that are created asynchronously.
+ ///
+ /// # Example
+ /// ```
+ /// use tower::{service_fn, Service, ServiceExt};
+ /// use tower::util::FutureService;
+ /// use std::convert::Infallible;
+ ///
+ /// # fn main() {
+ /// # async {
+ /// // A future which outputs a type implementing `Service`.
+ /// let future_of_a_service = async {
+ /// let svc = service_fn(|_req: ()| async { Ok::<_, Infallible>("ok") });
+ /// Ok::<_, Infallible>(svc)
+ /// };
+ ///
+ /// // Wrap the future with a `FutureService`, allowing it to be used
+ /// // as a service without awaiting the future's completion:
+ /// let mut svc = FutureService::new(Box::pin(future_of_a_service));
+ ///
+ /// // Now, when we wait for the service to become ready, it will
+ /// // drive the future to completion internally.
+ /// let svc = svc.ready().await.unwrap();
+ /// let res = svc.call(()).await.unwrap();
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// # Regarding the [`Unpin`] bound
+ ///
+ /// The [`Unpin`] bound on `F` is necessary because the future will be polled in
+ /// [`Service::poll_ready`] which doesn't have a pinned receiver (it takes `&mut self` and not `self:
+ /// Pin<&mut Self>`). So we cannot put the future into a `Pin` without requiring `Unpin`.
+ ///
+ /// This will most likely come up if you're calling `future_service` with an async block. In that
+ /// case you can use `Box::pin(async { ... })` as shown in the example.
+ pub const fn new(future: F) -> Self {
+ Self {
+ state: State::Future(future),
+ }
+ }
+}
+
+impl<F, S> fmt::Debug for FutureService<F, S>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FutureService")
+ .field("state", &format_args!("{:?}", self.state))
+ .finish()
+ }
+}
+
+#[derive(Clone)]
+enum State<F, S> {
+ Future(F),
+ Service(S),
+}
+
+impl<F, S> fmt::Debug for State<F, S>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ State::Future(_) => f
+ .debug_tuple("State::Future")
+ .field(&format_args!("<{}>", std::any::type_name::<F>()))
+ .finish(),
+ State::Service(svc) => f.debug_tuple("State::Service").field(svc).finish(),
+ }
+ }
+}
+
+impl<F, S, R, E> Service<R> for FutureService<F, S>
+where
+ F: Future<Output = Result<S, E>> + Unpin,
+ S: Service<R, Error = E>,
+{
+ type Response = S::Response;
+ type Error = E;
+ type Future = S::Future;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ loop {
+ self.state = match &mut self.state {
+ State::Future(fut) => {
+ let fut = Pin::new(fut);
+ let svc = futures_core::ready!(fut.poll(cx)?);
+ State::Service(svc)
+ }
+ State::Service(svc) => return svc.poll_ready(cx),
+ };
+ }
+ }
+
+ fn call(&mut self, req: R) -> Self::Future {
+ if let State::Service(svc) = &mut self.state {
+ svc.call(req)
+ } else {
+ panic!("FutureService::call was called before FutureService::poll_ready")
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::util::{future_service, ServiceExt};
+ use crate::Service;
+ use futures::future::{ready, Ready};
+ use std::convert::Infallible;
+
+ #[tokio::test]
+ async fn pending_service_debug_impl() {
+ let mut pending_svc = future_service(ready(Ok(DebugService)));
+
+ assert_eq!(
+ format!("{:?}", pending_svc),
+ "FutureService { state: State::Future(<futures_util::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }"
+ );
+
+ pending_svc.ready().await.unwrap();
+
+ assert_eq!(
+ format!("{:?}", pending_svc),
+ "FutureService { state: State::Service(DebugService) }"
+ );
+ }
+
+ #[derive(Debug)]
+ struct DebugService;
+
+ impl Service<()> for DebugService {
+ type Response = ();
+ type Error = Infallible;
+ type Future = Ready<Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Ok(()).into()
+ }
+
+ fn call(&mut self, _req: ()) -> Self::Future {
+ ready(Ok(()))
+ }
+ }
+}
diff --git a/vendor/tower/src/util/map_err.rs b/vendor/tower/src/util/map_err.rs
new file mode 100644
index 00000000..1b936acb
--- /dev/null
+++ b/vendor/tower/src/util/map_err.rs
@@ -0,0 +1,98 @@
+use futures_util::{future, TryFutureExt};
+use std::fmt;
+use std::task::{Context, Poll};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Service returned by the [`map_err`] combinator.
+///
+/// [`map_err`]: crate::util::ServiceExt::map_err
+#[derive(Clone)]
+pub struct MapErr<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for MapErr<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapErr")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+/// A [`Layer`] that produces [`MapErr`] services.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Clone, Debug)]
+pub struct MapErrLayer<F> {
+ f: F,
+}
+
+opaque_future! {
+ /// Response future from [`MapErr`] services.
+ ///
+ /// [`MapErr`]: crate::util::MapErr
+ pub type MapErrFuture<F, N> = future::MapErr<F, N>;
+}
+
+impl<S, F> MapErr<S, F> {
+ /// Creates a new [`MapErr`] service.
+ pub const fn new(inner: S, f: F) -> Self {
+ MapErr { f, inner }
+ }
+
+ /// Returns a new [`Layer`] that produces [`MapErr`] services.
+ ///
+ /// This is a convenience function that simply calls [`MapErrLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> MapErrLayer<F> {
+ MapErrLayer { f }
+ }
+}
+
+impl<S, F, Request, Error> Service<Request> for MapErr<S, F>
+where
+ S: Service<Request>,
+ F: FnOnce(S::Error) -> Error + Clone,
+{
+ type Response = S::Response;
+ type Error = Error;
+ type Future = MapErrFuture<S::Future, F>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(self.f.clone())
+ }
+
+ #[inline]
+ fn call(&mut self, request: Request) -> Self::Future {
+ MapErrFuture::new(self.inner.call(request).map_err(self.f.clone()))
+ }
+}
+
+impl<F> MapErrLayer<F> {
+ /// Creates a new [`MapErrLayer`].
+ pub const fn new(f: F) -> Self {
+ MapErrLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for MapErrLayer<F>
+where
+ F: Clone,
+{
+ type Service = MapErr<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ MapErr {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}
diff --git a/vendor/tower/src/util/map_future.rs b/vendor/tower/src/util/map_future.rs
new file mode 100644
index 00000000..55bf96d0
--- /dev/null
+++ b/vendor/tower/src/util/map_future.rs
@@ -0,0 +1,113 @@
+use std::{
+ fmt,
+ future::Future,
+ task::{Context, Poll},
+};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// [`Service`] returned by the [`map_future`] combinator.
+///
+/// [`map_future`]: crate::util::ServiceExt::map_future
+#[derive(Clone)]
+pub struct MapFuture<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> MapFuture<S, F> {
+ /// Creates a new [`MapFuture`] service.
+ pub const fn new(inner: S, f: F) -> Self {
+ Self { inner, f }
+ }
+
+ /// Returns a new [`Layer`] that produces [`MapFuture`] services.
+ ///
+ /// This is a convenience function that simply calls [`MapFutureLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> MapFutureLayer<F> {
+ MapFutureLayer::new(f)
+ }
+
+ /// Get a reference to the inner service
+ pub fn get_ref(&self) -> &S {
+ &self.inner
+ }
+
+ /// Get a mutable reference to the inner service
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.inner
+ }
+
+ /// Consume `self`, returning the inner service
+ pub fn into_inner(self) -> S {
+ self.inner
+ }
+}
+
+impl<R, S, F, T, E, Fut> Service<R> for MapFuture<S, F>
+where
+ S: Service<R>,
+ F: FnMut(S::Future) -> Fut,
+ E: From<S::Error>,
+ Fut: Future<Output = Result<T, E>>,
+{
+ type Response = T;
+ type Error = E;
+ type Future = Fut;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(From::from)
+ }
+
+ fn call(&mut self, req: R) -> Self::Future {
+ (self.f)(self.inner.call(req))
+ }
+}
+
+impl<S, F> fmt::Debug for MapFuture<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapFuture")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+/// A [`Layer`] that produces a [`MapFuture`] service.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Clone)]
+pub struct MapFutureLayer<F> {
+ f: F,
+}
+
+impl<F> MapFutureLayer<F> {
+ /// Creates a new [`MapFutureLayer`] layer.
+ pub const fn new(f: F) -> Self {
+ Self { f }
+ }
+}
+
+impl<S, F> Layer<S> for MapFutureLayer<F>
+where
+ F: Clone,
+{
+ type Service = MapFuture<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ MapFuture::new(inner, self.f.clone())
+ }
+}
+
+impl<F> fmt::Debug for MapFutureLayer<F> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapFutureLayer")
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
diff --git a/vendor/tower/src/util/map_request.rs b/vendor/tower/src/util/map_request.rs
new file mode 100644
index 00000000..62f2de3c
--- /dev/null
+++ b/vendor/tower/src/util/map_request.rs
@@ -0,0 +1,90 @@
+use std::fmt;
+use std::task::{Context, Poll};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Service returned by the [`MapRequest`] combinator.
+///
+/// [`MapRequest`]: crate::util::ServiceExt::map_request
+#[derive(Clone)]
+pub struct MapRequest<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for MapRequest<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapRequest")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+impl<S, F> MapRequest<S, F> {
+ /// Creates a new [`MapRequest`] service.
+ pub const fn new(inner: S, f: F) -> Self {
+ MapRequest { inner, f }
+ }
+
+ /// Returns a new [`Layer`] that produces [`MapRequest`] services.
+ ///
+ /// This is a convenience function that simply calls [`MapRequestLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> MapRequestLayer<F> {
+ MapRequestLayer { f }
+ }
+}
+
+impl<S, F, R1, R2> Service<R1> for MapRequest<S, F>
+where
+ S: Service<R2>,
+ F: FnMut(R1) -> R2,
+{
+ type Response = S::Response;
+ type Error = S::Error;
+ type Future = S::Future;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ #[inline]
+ fn call(&mut self, request: R1) -> S::Future {
+ self.inner.call((self.f)(request))
+ }
+}
+
+/// A [`Layer`] that produces [`MapRequest`] services.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Clone, Debug)]
+pub struct MapRequestLayer<F> {
+ f: F,
+}
+
+impl<F> MapRequestLayer<F> {
+ /// Creates a new [`MapRequestLayer`].
+ pub const fn new(f: F) -> Self {
+ MapRequestLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for MapRequestLayer<F>
+where
+ F: Clone,
+{
+ type Service = MapRequest<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ MapRequest {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}
diff --git a/vendor/tower/src/util/map_response.rs b/vendor/tower/src/util/map_response.rs
new file mode 100644
index 00000000..8edac10a
--- /dev/null
+++ b/vendor/tower/src/util/map_response.rs
@@ -0,0 +1,98 @@
+use futures_util::{future::MapOk, TryFutureExt};
+use std::fmt;
+use std::task::{Context, Poll};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Service returned by the [`map_response`] combinator.
+///
+/// [`map_response`]: crate::util::ServiceExt::map_response
+#[derive(Clone)]
+pub struct MapResponse<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for MapResponse<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapResponse")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+/// A [`Layer`] that produces a [`MapResponse`] service.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Debug, Clone)]
+pub struct MapResponseLayer<F> {
+ f: F,
+}
+
+opaque_future! {
+ /// Response future from [`MapResponse`] services.
+ ///
+ /// [`MapResponse`]: crate::util::MapResponse
+ pub type MapResponseFuture<F, N> = MapOk<F, N>;
+}
+
+impl<S, F> MapResponse<S, F> {
+ /// Creates a new `MapResponse` service.
+ pub const fn new(inner: S, f: F) -> Self {
+ MapResponse { f, inner }
+ }
+
+ /// Returns a new [`Layer`] that produces [`MapResponse`] services.
+ ///
+ /// This is a convenience function that simply calls [`MapResponseLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> MapResponseLayer<F> {
+ MapResponseLayer { f }
+ }
+}
+
+impl<S, F, Request, Response> Service<Request> for MapResponse<S, F>
+where
+ S: Service<Request>,
+ F: FnOnce(S::Response) -> Response + Clone,
+{
+ type Response = Response;
+ type Error = S::Error;
+ type Future = MapResponseFuture<S::Future, F>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx)
+ }
+
+ #[inline]
+ fn call(&mut self, request: Request) -> Self::Future {
+ MapResponseFuture::new(self.inner.call(request).map_ok(self.f.clone()))
+ }
+}
+
+impl<F> MapResponseLayer<F> {
+ /// Creates a new [`MapResponseLayer`] layer.
+ pub const fn new(f: F) -> Self {
+ MapResponseLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for MapResponseLayer<F>
+where
+ F: Clone,
+{
+ type Service = MapResponse<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ MapResponse {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}
diff --git a/vendor/tower/src/util/map_result.rs b/vendor/tower/src/util/map_result.rs
new file mode 100644
index 00000000..5a96af2d
--- /dev/null
+++ b/vendor/tower/src/util/map_result.rs
@@ -0,0 +1,99 @@
+use futures_util::{future::Map, FutureExt};
+use std::fmt;
+use std::task::{Context, Poll};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// Service returned by the [`map_result`] combinator.
+///
+/// [`map_result`]: crate::util::ServiceExt::map_result
+#[derive(Clone)]
+pub struct MapResult<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for MapResult<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MapResult")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+/// A [`Layer`] that produces a [`MapResult`] service.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Debug, Clone)]
+pub struct MapResultLayer<F> {
+ f: F,
+}
+
+opaque_future! {
+ /// Response future from [`MapResult`] services.
+ ///
+ /// [`MapResult`]: crate::util::MapResult
+ pub type MapResultFuture<F, N> = Map<F, N>;
+}
+
+impl<S, F> MapResult<S, F> {
+ /// Creates a new [`MapResult`] service.
+ pub const fn new(inner: S, f: F) -> Self {
+ MapResult { f, inner }
+ }
+
+ /// Returns a new [`Layer`] that produces [`MapResult`] services.
+ ///
+ /// This is a convenience function that simply calls [`MapResultLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> MapResultLayer<F> {
+ MapResultLayer { f }
+ }
+}
+
+impl<S, F, Request, Response, Error> Service<Request> for MapResult<S, F>
+where
+ S: Service<Request>,
+ Error: From<S::Error>,
+ F: FnOnce(Result<S::Response, S::Error>) -> Result<Response, Error> + Clone,
+{
+ type Response = Response;
+ type Error = Error;
+ type Future = MapResultFuture<S::Future, F>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(Into::into)
+ }
+
+ #[inline]
+ fn call(&mut self, request: Request) -> Self::Future {
+ MapResultFuture::new(self.inner.call(request).map(self.f.clone()))
+ }
+}
+
+impl<F> MapResultLayer<F> {
+ /// Creates a new [`MapResultLayer`] layer.
+ pub const fn new(f: F) -> Self {
+ MapResultLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for MapResultLayer<F>
+where
+ F: Clone,
+{
+ type Service = MapResult<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ MapResult {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}
diff --git a/vendor/tower/src/util/mod.rs b/vendor/tower/src/util/mod.rs
new file mode 100644
index 00000000..4c56de81
--- /dev/null
+++ b/vendor/tower/src/util/mod.rs
@@ -0,0 +1,1073 @@
+//! Various utility types and functions that are generally used with Tower.
+
+mod and_then;
+mod boxed;
+mod boxed_clone;
+mod boxed_clone_sync;
+mod call_all;
+mod either;
+
+mod future_service;
+mod map_err;
+mod map_request;
+mod map_response;
+mod map_result;
+
+mod map_future;
+mod oneshot;
+mod optional;
+mod ready;
+mod service_fn;
+mod then;
+
+pub mod rng;
+
+pub use self::{
+ and_then::{AndThen, AndThenLayer},
+ boxed::{
+ BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService,
+ },
+ boxed_clone::BoxCloneService,
+ boxed_clone_sync::BoxCloneSyncService,
+ either::Either,
+ future_service::{future_service, FutureService},
+ map_err::{MapErr, MapErrLayer},
+ map_future::{MapFuture, MapFutureLayer},
+ map_request::{MapRequest, MapRequestLayer},
+ map_response::{MapResponse, MapResponseLayer},
+ map_result::{MapResult, MapResultLayer},
+ oneshot::Oneshot,
+ optional::Optional,
+ ready::{Ready, ReadyOneshot},
+ service_fn::{service_fn, ServiceFn},
+ then::{Then, ThenLayer},
+};
+
+pub use self::call_all::{CallAll, CallAllUnordered};
+use std::future::Future;
+
+use crate::layer::util::Identity;
+
+pub mod error {
+ //! Error types
+
+ pub use super::optional::error as optional;
+}
+
+pub mod future {
+ //! Future types
+
+ pub use super::and_then::AndThenFuture;
+ pub use super::either::EitherResponseFuture;
+ pub use super::map_err::MapErrFuture;
+ pub use super::map_response::MapResponseFuture;
+ pub use super::map_result::MapResultFuture;
+ pub use super::optional::future as optional;
+ pub use super::then::ThenFuture;
+}
+
+/// An extension trait for `Service`s that provides a variety of convenient
+/// adapters
+pub trait ServiceExt<Request>: tower_service::Service<Request> {
+ /// Yields a mutable reference to the service when it is ready to accept a request.
+ fn ready(&mut self) -> Ready<'_, Self, Request>
+ where
+ Self: Sized,
+ {
+ Ready::new(self)
+ }
+
+ /// Yields the service when it is ready to accept a request.
+ fn ready_oneshot(self) -> ReadyOneshot<Self, Request>
+ where
+ Self: Sized,
+ {
+ ReadyOneshot::new(self)
+ }
+
+ /// Consume this `Service`, calling it with the provided request once it is ready.
+ fn oneshot(self, req: Request) -> Oneshot<Self, Request>
+ where
+ Self: Sized,
+ {
+ Oneshot::new(self, req)
+ }
+
+ /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses.
+ ///
+ /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item =
+ /// Response>`][stream]. See the documentation for [`CallAll`] for
+ /// details.
+ ///
+ /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+ /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
+ fn call_all<S>(self, reqs: S) -> CallAll<Self, S>
+ where
+ Self: Sized,
+ S: futures_core::Stream<Item = Request>,
+ {
+ CallAll::new(self, reqs)
+ }
+
+ /// Executes a new future after this service's future resolves. This does
+ /// not alter the behaviour of the [`poll_ready`] method.
+ ///
+ /// This method can be used to change the [`Response`] type of the service
+ /// into a different type. You can use this method to chain along a computation once the
+ /// service's response has been resolved.
+ ///
+ /// [`Response`]: crate::Service::Response
+ /// [`poll_ready`]: crate::Service::poll_ready
+ ///
+ /// # Example
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # struct Record {
+ /// # pub name: String,
+ /// # pub age: u16
+ /// # }
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Record;
+ /// # type Error = u8;
+ /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
+ /// # }
+ /// # }
+ /// #
+ /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service returning Result<Record, _>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Map the response into a new response
+ /// let mut new_service = service.and_then(|record: Record| async move {
+ /// let name = record.name;
+ /// avatar_lookup(name).await
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let avatar = new_service.call(id).await.unwrap();
+ /// # };
+ /// # }
+ /// ```
+ fn and_then<F>(self, f: F) -> AndThen<Self, F>
+ where
+ Self: Sized,
+ F: Clone,
+ {
+ AndThen::new(self, f)
+ }
+
+ /// Maps this service's response value to a different value. This does not
+ /// alter the behaviour of the [`poll_ready`] method.
+ ///
+ /// This method can be used to change the [`Response`] type of the service
+ /// into a different type. It is similar to the [`Result::map`]
+ /// method. You can use this method to chain along a computation once the
+ /// service's response has been resolved.
+ ///
+ /// [`Response`]: crate::Service::Response
+ /// [`poll_ready`]: crate::Service::poll_ready
+ ///
+ /// # Example
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # struct Record {
+ /// # pub name: String,
+ /// # pub age: u16
+ /// # }
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Record;
+ /// # type Error = u8;
+ /// # type Future = futures_util::future::Ready<Result<Record, u8>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service returning Result<Record, _>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Map the response into a new response
+ /// let mut new_service = service.map_response(|record| record.name);
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let name = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await?;
+ /// # Ok::<(), u8>(())
+ /// # };
+ /// # }
+ /// ```
+ fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
+ where
+ Self: Sized,
+ F: FnOnce(Self::Response) -> Response + Clone,
+ {
+ MapResponse::new(self, f)
+ }
+
+ /// Maps this service's error value to a different value. This does not
+ /// alter the behaviour of the [`poll_ready`] method.
+ ///
+ /// This method can be used to change the [`Error`] type of the service
+ /// into a different type. It is similar to the [`Result::map_err`] method.
+ ///
+ /// [`Error`]: crate::Service::Error
+ /// [`poll_ready`]: crate::Service::poll_ready
+ ///
+ /// # Example
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # struct Error {
+ /// # pub code: u32,
+ /// # pub message: String
+ /// # }
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = String;
+ /// # type Error = Error;
+ /// # type Future = futures_util::future::Ready<Result<String, Error>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(String::new()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service returning Result<_, Error>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Map the error to a new error
+ /// let mut new_service = service.map_err(|err| err.code);
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let code = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await
+ /// .unwrap_err();
+ /// # Ok::<(), u32>(())
+ /// # };
+ /// # }
+ /// ```
+ fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
+ where
+ Self: Sized,
+ F: FnOnce(Self::Error) -> Error + Clone,
+ {
+ MapErr::new(self, f)
+ }
+
+ /// Maps this service's result type (`Result<Self::Response, Self::Error>`)
+ /// to a different value, regardless of whether the future succeeds or
+ /// fails.
+ ///
+ /// This is similar to the [`map_response`] and [`map_err`] combinators,
+ /// except that the *same* function is invoked when the service's future
+ /// completes, whether it completes successfully or fails. This function
+ /// takes the [`Result`] returned by the service's future, and returns a
+ /// [`Result`].
+ ///
+ /// Like the standard library's [`Result::and_then`], this method can be
+ /// used to implement control flow based on `Result` values. For example, it
+ /// may be used to implement error recovery, by turning some [`Err`]
+ /// responses from the service into [`Ok`] responses. Similarly, some
+ /// successful responses from the service could be rejected, by returning an
+ /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally,
+ /// this method can also be used to implement behaviors that must run when a
+ /// service's future completes, regardless of whether it succeeded or failed.
+ ///
+ /// This method can be used to change the [`Response`] type of the service
+ /// into a different type. It can also be used to change the [`Error`] type
+ /// of the service. However, because the [`map_result`] function is not applied
+ /// to the errors returned by the service's [`poll_ready`] method, it must
+ /// be possible to convert the service's [`Error`] type into the error type
+ /// returned by the [`map_result`] function. This is trivial when the function
+ /// returns the same error type as the service, but in other cases, it can
+ /// be useful to use [`BoxError`] to erase differing error types.
+ ///
+ /// # Examples
+ ///
+ /// Recovering from certain errors:
+ ///
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # struct Record {
+ /// # pub name: String,
+ /// # pub age: u16
+ /// # }
+ /// # #[derive(Debug)]
+ /// # enum DbError {
+ /// # Parse(std::num::ParseIntError),
+ /// # NoRecordsFound,
+ /// # }
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Vec<Record>;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }]))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service returning Result<Vec<Record>, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // If the database returns no records for the query, we just want an empty `Vec`.
+ /// let mut new_service = service.map_result(|result| match result {
+ /// // If the error indicates that no records matched the query, return an empty
+ /// // `Vec` instead.
+ /// Err(DbError::NoRecordsFound) => Ok(Vec::new()),
+ /// // Propagate all other responses (`Ok` and `Err`) unchanged
+ /// x => x,
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let name = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await?;
+ /// # Ok::<(), DbError>(())
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// Rejecting some `Ok` responses:
+ ///
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # struct Record {
+ /// # pub name: String,
+ /// # pub age: u16
+ /// # }
+ /// # type DbError = String;
+ /// # type AppError = String;
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Record;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 }))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// use tower::BoxError;
+ ///
+ /// // A service returning Result<Record, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // If the user is zero years old, return an error.
+ /// let mut new_service = service.map_result(|result| {
+ /// let record = result?;
+ ///
+ /// if record.age == 0 {
+ /// // Users must have been born to use our app!
+ /// let app_error = AppError::from("users cannot be 0 years old!");
+ ///
+ /// // Box the error to erase its type (as it can be an `AppError`
+ /// // *or* the inner service's `DbError`).
+ /// return Err(BoxError::from(app_error));
+ /// }
+ ///
+ /// // Otherwise, return the record.
+ /// Ok(record)
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let record = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await?;
+ /// # Ok::<(), BoxError>(())
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// Performing an action that must be run for both successes and failures:
+ ///
+ /// ```
+ /// # use std::convert::TryFrom;
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = String;
+ /// # type Error = u8;
+ /// # type Future = futures_util::future::Ready<Result<String, u8>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(String::new()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service returning Result<Record, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Print a message whenever a query completes.
+ /// let mut new_service = service.map_result(|result| {
+ /// println!("query completed; success={}", result.is_ok());
+ /// result
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let response = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await;
+ /// # response
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// [`map_response`]: ServiceExt::map_response
+ /// [`map_err`]: ServiceExt::map_err
+ /// [`map_result`]: ServiceExt::map_result
+ /// [`Error`]: crate::Service::Error
+ /// [`Response`]: crate::Service::Response
+ /// [`poll_ready`]: crate::Service::poll_ready
+ /// [`BoxError`]: crate::BoxError
+ fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
+ where
+ Self: Sized,
+ Error: From<Self::Error>,
+ F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone,
+ {
+ MapResult::new(self, f)
+ }
+
+ /// Composes a function *in front of* the service.
+ ///
+ /// This adapter produces a new service that passes each value through the
+ /// given function `f` before sending it to `self`.
+ ///
+ /// # Example
+ /// ```
+ /// # use std::convert::TryFrom;
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # impl Service<String> for DatabaseService {
+ /// # type Response = String;
+ /// # type Error = u8;
+ /// # type Future = futures_util::future::Ready<Result<String, u8>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: String) -> Self::Future {
+ /// # futures_util::future::ready(Ok(String::new()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service taking a String as a request
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Map the request to a new request
+ /// let mut new_service = service.map_request(|id: u32| id.to_string());
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let response = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await;
+ /// # response
+ /// # };
+ /// # }
+ /// ```
+ fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
+ where
+ Self: Sized,
+ F: FnMut(NewRequest) -> Request,
+ {
+ MapRequest::new(self, f)
+ }
+
+ /// Composes this service with a [`Filter`] that conditionally accepts or
+ /// rejects requests based on a [predicate].
+ ///
+ /// This adapter produces a new service that passes each value through the
+ /// given function `predicate` before sending it to `self`.
+ ///
+ /// # Example
+ /// ```
+ /// # use std::convert::TryFrom;
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # #[derive(Debug)] enum DbError {
+ /// # Parse(std::num::ParseIntError)
+ /// # }
+ /// #
+ /// # impl std::fmt::Display for DbError {
+ /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
+ /// # }
+ /// # impl std::error::Error for DbError {}
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = String;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(String::new()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service taking a u32 as a request and returning Result<_, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // Fallibly map the request to a new request
+ /// let mut new_service = service
+ /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse));
+ ///
+ /// // Call the new service
+ /// let id = "13";
+ /// let response = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await;
+ /// # response
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// [`Filter`]: crate::filter::Filter
+ /// [predicate]: crate::filter::Predicate
+ #[cfg(feature = "filter")]
+ fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
+ where
+ Self: Sized,
+ F: crate::filter::Predicate<NewRequest>,
+ {
+ crate::filter::Filter::new(self, filter)
+ }
+
+ /// Composes this service with an [`AsyncFilter`] that conditionally accepts or
+ /// rejects requests based on an [async predicate].
+ ///
+ /// This adapter produces a new service that passes each value through the
+ /// given function `predicate` before sending it to `self`.
+ ///
+ /// # Example
+ /// ```
+ /// # use std::convert::TryFrom;
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # #[derive(Clone)] struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// # #[derive(Debug)]
+ /// # enum DbError {
+ /// # Rejected
+ /// # }
+ /// # impl std::fmt::Display for DbError {
+ /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) }
+ /// # }
+ /// # impl std::error::Error for DbError {}
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = String;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<String, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(String::new()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// # async {
+ /// // A service taking a u32 as a request and returning Result<_, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// /// Returns `true` if we should query the database for an ID.
+ /// async fn should_query(id: u32) -> bool {
+ /// // ...
+ /// # true
+ /// }
+ ///
+ /// // Filter requests based on `should_query`.
+ /// let mut new_service = service
+ /// .filter_async(|id: u32| async move {
+ /// if should_query(id).await {
+ /// return Ok(id);
+ /// }
+ ///
+ /// Err(DbError::Rejected)
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// # let id: u32 = id;
+ /// let response = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await;
+ /// # response
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// [`AsyncFilter`]: crate::filter::AsyncFilter
+ /// [asynchronous predicate]: crate::filter::AsyncPredicate
+ #[cfg(feature = "filter")]
+ fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
+ where
+ Self: Sized,
+ F: crate::filter::AsyncPredicate<NewRequest>,
+ {
+ crate::filter::AsyncFilter::new(self, filter)
+ }
+
+ /// Composes an asynchronous function *after* this service.
+ ///
+ /// This takes a function or closure returning a future, and returns a new
+ /// `Service` that chains that function after this service's [`Future`]. The
+ /// new `Service`'s future will consist of this service's future, followed
+ /// by the future returned by calling the chained function with the future's
+ /// [`Output`] type. The chained function is called regardless of whether
+ /// this service's future completes with a successful response or with an
+ /// error.
+ ///
+ /// This method can be thought of as an equivalent to the [`futures`
+ /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that
+ /// _return_ futures, rather than on an individual future. Similarly to that
+ /// combinator, [`ServiceExt::then`] can be used to implement asynchronous
+ /// error recovery, by calling some asynchronous function with errors
+ /// returned by this service. Alternatively, it may also be used to call a
+ /// fallible async function with the successful response of this service.
+ ///
+ /// This method can be used to change the [`Response`] type of the service
+ /// into a different type. It can also be used to change the [`Error`] type
+ /// of the service. However, because the `then` function is not applied
+ /// to the errors returned by the service's [`poll_ready`] method, it must
+ /// be possible to convert the service's [`Error`] type into the error type
+ /// returned by the `then` future. This is trivial when the function
+ /// returns the same error type as the service, but in other cases, it can
+ /// be useful to use [`BoxError`] to erase differing error types.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # type Record = ();
+ /// # type DbError = ();
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Record;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// // A service returning Result<Record, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ ///
+ /// // An async function that attempts to recover from errors returned by the
+ /// // database.
+ /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> {
+ /// // ...
+ /// # Ok(())
+ /// }
+ /// # async {
+ ///
+ /// // If the database service returns an error, attempt to recover by
+ /// // calling `recover_from_error`. Otherwise, return the successful response.
+ /// let mut new_service = service.then(|result| async move {
+ /// match result {
+ /// Ok(record) => Ok(record),
+ /// Err(e) => recover_from_error(e).await,
+ /// }
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let record = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await?;
+ /// # Ok::<(), DbError>(())
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// [`Future`]: crate::Service::Future
+ /// [`Output`]: std::future::Future::Output
+ /// [`futures` crate]: https://docs.rs/futures
+ /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then
+ /// [`Error`]: crate::Service::Error
+ /// [`Response`]: crate::Service::Response
+ /// [`poll_ready`]: crate::Service::poll_ready
+ /// [`BoxError`]: crate::BoxError
+ fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
+ where
+ Self: Sized,
+ Error: From<Self::Error>,
+ F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone,
+ Fut: Future<Output = Result<Response, Error>>,
+ {
+ Then::new(self, f)
+ }
+
+ /// Composes a function that transforms futures produced by the service.
+ ///
+ /// This takes a function or closure returning a future computed from the future returned by
+ /// the service's [`call`] method, as opposed to the responses produced by the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::task::{Poll, Context};
+ /// # use tower::{Service, ServiceExt, BoxError};
+ /// #
+ /// # struct DatabaseService;
+ /// # impl DatabaseService {
+ /// # fn new(address: &str) -> Self {
+ /// # DatabaseService
+ /// # }
+ /// # }
+ /// #
+ /// # type Record = ();
+ /// # type DbError = crate::BoxError;
+ /// #
+ /// # impl Service<u32> for DatabaseService {
+ /// # type Response = Record;
+ /// # type Error = DbError;
+ /// # type Future = futures_util::future::Ready<Result<Record, DbError>>;
+ /// #
+ /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ /// # Poll::Ready(Ok(()))
+ /// # }
+ /// #
+ /// # fn call(&mut self, request: u32) -> Self::Future {
+ /// # futures_util::future::ready(Ok(()))
+ /// # }
+ /// # }
+ /// #
+ /// # fn main() {
+ /// use std::time::Duration;
+ /// use tokio::time::timeout;
+ ///
+ /// // A service returning Result<Record, DbError>
+ /// let service = DatabaseService::new("127.0.0.1:8080");
+ /// # async {
+ ///
+ /// let mut new_service = service.map_future(|future| async move {
+ /// let res = timeout(Duration::from_secs(1), future).await?;
+ /// Ok::<_, BoxError>(res)
+ /// });
+ ///
+ /// // Call the new service
+ /// let id = 13;
+ /// let record = new_service
+ /// .ready()
+ /// .await?
+ /// .call(id)
+ /// .await?;
+ /// # Ok::<(), BoxError>(())
+ /// # };
+ /// # }
+ /// ```
+ ///
+ /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`].
+ ///
+ /// [`call`]: crate::Service::call
+ /// [`Timeout`]: crate::timeout::Timeout
+ fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
+ where
+ Self: Sized,
+ F: FnMut(Self::Future) -> Fut,
+ Error: From<Self::Error>,
+ Fut: Future<Output = Result<Response, Error>>,
+ {
+ MapFuture::new(self, f)
+ }
+
+ /// Convert the service into a [`Service`] + [`Send`] trait object.
+ ///
+ /// See [`BoxService`] for more details.
+ ///
+ /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method
+ /// can be used instead, to produce a boxed service which will also
+ /// implement [`Clone`].
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService};
+ /// #
+ /// # struct Request;
+ /// # struct Response;
+ /// # impl Response {
+ /// # fn new() -> Self { Self }
+ /// # }
+ ///
+ /// let service = service_fn(|req: Request| async {
+ /// Ok::<_, BoxError>(Response::new())
+ /// });
+ ///
+ /// let service: BoxService<Request, Response, BoxError> = service
+ /// .map_request(|req| {
+ /// println!("received request");
+ /// req
+ /// })
+ /// .map_response(|res| {
+ /// println!("response produced");
+ /// res
+ /// })
+ /// .boxed();
+ /// # let service = assert_service(service);
+ /// # fn assert_service<S, R>(svc: S) -> S
+ /// # where S: Service<R> { svc }
+ /// ```
+ ///
+ /// [`Service`]: crate::Service
+ /// [`boxed_clone`]: Self::boxed_clone
+ fn boxed(self) -> BoxService<Request, Self::Response, Self::Error>
+ where
+ Self: Sized + Send + 'static,
+ Self::Future: Send + 'static,
+ {
+ BoxService::new(self)
+ }
+
+ /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object.
+ ///
+ /// This is similar to the [`boxed`] method, but it requires that `Self` implement
+ /// [`Clone`], and the returned boxed service implements [`Clone`].
+ /// See [`BoxCloneService`] for more details.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService};
+ /// #
+ /// # struct Request;
+ /// # struct Response;
+ /// # impl Response {
+ /// # fn new() -> Self { Self }
+ /// # }
+ ///
+ /// let service = service_fn(|req: Request| async {
+ /// Ok::<_, BoxError>(Response::new())
+ /// });
+ ///
+ /// let service: BoxCloneService<Request, Response, BoxError> = service
+ /// .map_request(|req| {
+ /// println!("received request");
+ /// req
+ /// })
+ /// .map_response(|res| {
+ /// println!("response produced");
+ /// res
+ /// })
+ /// .boxed_clone();
+ ///
+ /// // The boxed service can still be cloned.
+ /// service.clone();
+ /// # let service = assert_service(service);
+ /// # fn assert_service<S, R>(svc: S) -> S
+ /// # where S: Service<R> { svc }
+ /// ```
+ ///
+ /// [`Service`]: crate::Service
+ /// [`boxed`]: Self::boxed
+ fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error>
+ where
+ Self: Clone + Sized + Send + 'static,
+ Self::Future: Send + 'static,
+ {
+ BoxCloneService::new(self)
+ }
+}
+
+impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {}
+
+/// Convert an `Option<Layer>` into a [`Layer`].
+///
+/// ```
+/// # use std::time::Duration;
+/// # use tower::Service;
+/// # use tower::builder::ServiceBuilder;
+/// use tower::util::option_layer;
+/// # use tower::timeout::TimeoutLayer;
+/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
+/// # let timeout = Some(Duration::new(10, 0));
+/// // Layer to apply a timeout if configured
+/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new));
+///
+/// ServiceBuilder::new()
+/// .layer(maybe_timeout)
+/// .service(svc);
+/// # }
+/// ```
+///
+/// [`Layer`]: crate::layer::Layer
+pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> {
+ if let Some(layer) = layer {
+ Either::Left(layer)
+ } else {
+ Either::Right(Identity::new())
+ }
+}
diff --git a/vendor/tower/src/util/oneshot.rs b/vendor/tower/src/util/oneshot.rs
new file mode 100644
index 00000000..114b2f82
--- /dev/null
+++ b/vendor/tower/src/util/oneshot.rs
@@ -0,0 +1,105 @@
+use futures_core::ready;
+use pin_project_lite::pin_project;
+use std::{
+ fmt,
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+pin_project! {
+ /// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`]
+ /// is ready, and then calling [`Service::call`] with the request, and
+ /// waiting for that [`Future`].
+ #[derive(Debug)]
+ pub struct Oneshot<S: Service<Req>, Req> {
+ #[pin]
+ state: State<S, Req>,
+ }
+}
+
+pin_project! {
+ #[project = StateProj]
+ enum State<S: Service<Req>, Req> {
+ NotReady {
+ svc: S,
+ req: Option<Req>,
+ },
+ Called {
+ #[pin]
+ fut: S::Future,
+ },
+ Done,
+ }
+}
+
+impl<S: Service<Req>, Req> State<S, Req> {
+ const fn not_ready(svc: S, req: Option<Req>) -> Self {
+ Self::NotReady { svc, req }
+ }
+
+ const fn called(fut: S::Future) -> Self {
+ Self::Called { fut }
+ }
+}
+
+impl<S, Req> fmt::Debug for State<S, Req>
+where
+ S: Service<Req> + fmt::Debug,
+ Req: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ State::NotReady {
+ svc,
+ req: Some(req),
+ } => f
+ .debug_tuple("State::NotReady")
+ .field(svc)
+ .field(req)
+ .finish(),
+ State::NotReady { req: None, .. } => unreachable!(),
+ State::Called { .. } => f.debug_tuple("State::Called").field(&"S::Future").finish(),
+ State::Done => f.debug_tuple("State::Done").finish(),
+ }
+ }
+}
+
+impl<S, Req> Oneshot<S, Req>
+where
+ S: Service<Req>,
+{
+ #[allow(missing_docs)]
+ pub const fn new(svc: S, req: Req) -> Self {
+ Oneshot {
+ state: State::not_ready(svc, Some(req)),
+ }
+ }
+}
+
+impl<S, Req> Future for Oneshot<S, Req>
+where
+ S: Service<Req>,
+{
+ type Output = Result<S::Response, S::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ match this.state.as_mut().project() {
+ StateProj::NotReady { svc, req } => {
+ let _ = ready!(svc.poll_ready(cx))?;
+ let f = svc.call(req.take().expect("already called"));
+ this.state.set(State::called(f));
+ }
+ StateProj::Called { fut } => {
+ let res = ready!(fut.poll(cx))?;
+ this.state.set(State::Done);
+ return Poll::Ready(Ok(res));
+ }
+ StateProj::Done => panic!("polled after complete"),
+ }
+ }
+ }
+}
diff --git a/vendor/tower/src/util/optional/error.rs b/vendor/tower/src/util/optional/error.rs
new file mode 100644
index 00000000..78061335
--- /dev/null
+++ b/vendor/tower/src/util/optional/error.rs
@@ -0,0 +1,21 @@
+use std::{error, fmt};
+
+/// Error returned if the inner [`Service`] has not been set.
+///
+/// [`Service`]: crate::Service
+#[derive(Debug)]
+pub struct None(());
+
+impl None {
+ pub(crate) fn new() -> None {
+ None(())
+ }
+}
+
+impl fmt::Display for None {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "None")
+ }
+}
+
+impl error::Error for None {}
diff --git a/vendor/tower/src/util/optional/future.rs b/vendor/tower/src/util/optional/future.rs
new file mode 100644
index 00000000..7d289b7b
--- /dev/null
+++ b/vendor/tower/src/util/optional/future.rs
@@ -0,0 +1,40 @@
+use super::error;
+use futures_core::ready;
+use pin_project_lite::pin_project;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+pin_project! {
+ /// Response future returned by [`Optional`].
+ ///
+ /// [`Optional`]: crate::util::Optional
+ #[derive(Debug)]
+ pub struct ResponseFuture<T> {
+ #[pin]
+ inner: Option<T>,
+ }
+}
+
+impl<T> ResponseFuture<T> {
+ pub(crate) fn new(inner: Option<T>) -> ResponseFuture<T> {
+ ResponseFuture { inner }
+ }
+}
+
+impl<F, T, E> Future for ResponseFuture<F>
+where
+ F: Future<Output = Result<T, E>>,
+ E: Into<crate::BoxError>,
+{
+ type Output = Result<T, crate::BoxError>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.project().inner.as_pin_mut() {
+ Some(inner) => Poll::Ready(Ok(ready!(inner.poll(cx)).map_err(Into::into)?)),
+ None => Poll::Ready(Err(error::None::new().into())),
+ }
+ }
+}
diff --git a/vendor/tower/src/util/optional/mod.rs b/vendor/tower/src/util/optional/mod.rs
new file mode 100644
index 00000000..4d020709
--- /dev/null
+++ b/vendor/tower/src/util/optional/mod.rs
@@ -0,0 +1,59 @@
+//! Contains [`Optional`] and related types and functions.
+//!
+//! See [`Optional`] documentation for more details.
+
+/// Error types for [`Optional`].
+pub mod error;
+/// Future types for [`Optional`].
+pub mod future;
+
+use self::future::ResponseFuture;
+use std::task::{Context, Poll};
+use tower_service::Service;
+
+/// Optionally forwards requests to an inner service.
+///
+/// If the inner service is [`None`], [`optional::None`] is returned as the response.
+///
+/// [`optional::None`]: crate::util::error::optional::None
+#[derive(Debug)]
+pub struct Optional<T> {
+ inner: Option<T>,
+}
+
+impl<T> Optional<T> {
+ /// Create a new [`Optional`].
+ pub const fn new<Request>(inner: Option<T>) -> Optional<T>
+ where
+ T: Service<Request>,
+ T::Error: Into<crate::BoxError>,
+ {
+ Optional { inner }
+ }
+}
+
+impl<T, Request> Service<Request> for Optional<T>
+where
+ T: Service<Request>,
+ T::Error: Into<crate::BoxError>,
+{
+ type Response = T::Response;
+ type Error = crate::BoxError;
+ type Future = ResponseFuture<T::Future>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.inner {
+ Some(ref mut inner) => match inner.poll_ready(cx) {
+ Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)),
+ Poll::Pending => Poll::Pending,
+ },
+ // None services are always ready
+ None => Poll::Ready(Ok(())),
+ }
+ }
+
+ fn call(&mut self, request: Request) -> Self::Future {
+ let inner = self.inner.as_mut().map(|i| i.call(request));
+ ResponseFuture::new(inner)
+ }
+}
diff --git a/vendor/tower/src/util/ready.rs b/vendor/tower/src/util/ready.rs
new file mode 100644
index 00000000..750db872
--- /dev/null
+++ b/vendor/tower/src/util/ready.rs
@@ -0,0 +1,103 @@
+use std::{fmt, marker::PhantomData};
+
+use futures_core::ready;
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tower_service::Service;
+
+/// A [`Future`] that yields the service when it is ready to accept a request.
+///
+/// [`ReadyOneshot`] values are produced by [`ServiceExt::ready_oneshot`].
+///
+/// [`ServiceExt::ready_oneshot`]: crate::util::ServiceExt::ready_oneshot
+pub struct ReadyOneshot<T, Request> {
+ inner: Option<T>,
+ _p: PhantomData<fn() -> Request>,
+}
+
+// Safety: This is safe because `Services`'s are always `Unpin`.
+impl<T, Request> Unpin for ReadyOneshot<T, Request> {}
+
+impl<T, Request> ReadyOneshot<T, Request>
+where
+ T: Service<Request>,
+{
+ #[allow(missing_docs)]
+ pub const fn new(service: T) -> Self {
+ Self {
+ inner: Some(service),
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<T, Request> Future for ReadyOneshot<T, Request>
+where
+ T: Service<Request>,
+{
+ type Output = Result<T, T::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ ready!(self
+ .inner
+ .as_mut()
+ .expect("poll after Poll::Ready")
+ .poll_ready(cx))?;
+
+ Poll::Ready(Ok(self.inner.take().expect("poll after Poll::Ready")))
+ }
+}
+
+impl<T, Request> fmt::Debug for ReadyOneshot<T, Request>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("ReadyOneshot")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+/// A future that yields a mutable reference to the service when it is ready to accept a request.
+///
+/// [`Ready`] values are produced by [`ServiceExt::ready`].
+///
+/// [`ServiceExt::ready`]: crate::util::ServiceExt::ready
+pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>);
+
+// Safety: This is safe for the same reason that the impl for ReadyOneshot is safe.
+impl<'a, T, Request> Unpin for Ready<'a, T, Request> {}
+
+impl<'a, T, Request> Ready<'a, T, Request>
+where
+ T: Service<Request>,
+{
+ #[allow(missing_docs)]
+ pub fn new(service: &'a mut T) -> Self {
+ Self(ReadyOneshot::new(service))
+ }
+}
+
+impl<'a, T, Request> Future for Ready<'a, T, Request>
+where
+ T: Service<Request>,
+{
+ type Output = Result<&'a mut T, T::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.0).poll(cx)
+ }
+}
+
+impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_tuple("Ready").field(&self.0).finish()
+ }
+}
diff --git a/vendor/tower/src/util/rng.rs b/vendor/tower/src/util/rng.rs
new file mode 100644
index 00000000..5b2f9fce
--- /dev/null
+++ b/vendor/tower/src/util/rng.rs
@@ -0,0 +1,181 @@
+//! [PRNG] utilities for tower middleware.
+//!
+//! This module provides a generic [`Rng`] trait and a [`HasherRng`] that
+//! implements the trait based on [`RandomState`] or any other [`Hasher`].
+//!
+//! These utilities replace tower's internal usage of `rand` with these smaller,
+//! more lightweight methods. Most of the implementations are extracted from
+//! their corresponding `rand` implementations.
+//!
+//! [PRNG]: https://en.wikipedia.org/wiki/Pseudorandom_number_generator
+
+use std::{
+ collections::hash_map::RandomState,
+ hash::{BuildHasher, Hasher},
+ ops::Range,
+};
+
+/// A simple [PRNG] trait for use within tower middleware.
+///
+/// [PRNG]: https://en.wikipedia.org/wiki/Pseudorandom_number_generator
+pub trait Rng {
+ /// Generate a random [`u64`].
+ fn next_u64(&mut self) -> u64;
+
+ /// Generate a random [`f64`] between `[0, 1)`.
+ fn next_f64(&mut self) -> f64 {
+ // Borrowed from:
+ // https://github.com/rust-random/rand/blob/master/src/distributions/float.rs#L106
+ let float_size = std::mem::size_of::<f64>() as u32 * 8;
+ let precision = 52 + 1;
+ let scale = 1.0 / ((1u64 << precision) as f64);
+
+ let value = self.next_u64();
+ let value = value >> (float_size - precision);
+
+ scale * value as f64
+ }
+
+ /// Randomly pick a value within the range.
+ ///
+ /// # Panic
+ ///
+ /// - If start < end this will panic in debug mode.
+ fn next_range(&mut self, range: Range<u64>) -> u64 {
+ debug_assert!(
+ range.start < range.end,
+ "The range start must be smaller than the end"
+ );
+ let start = range.start;
+ let end = range.end;
+
+ let range = end - start;
+
+ let n = self.next_u64();
+
+ (n % range) + start
+ }
+}
+
+impl<R: Rng + ?Sized> Rng for Box<R> {
+ fn next_u64(&mut self) -> u64 {
+ (**self).next_u64()
+ }
+}
+
+/// A [`Rng`] implementation that uses a [`Hasher`] to generate the random
+/// values. The implementation uses an internal counter to pass to the hasher
+/// for each iteration of [`Rng::next_u64`].
+///
+/// # Default
+///
+/// This hasher has a default type of [`RandomState`] which just uses the
+/// libstd method of getting a random u64.
+#[derive(Clone, Debug)]
+pub struct HasherRng<H = RandomState> {
+ hasher: H,
+ counter: u64,
+}
+
+impl HasherRng {
+ /// Create a new default [`HasherRng`].
+ pub fn new() -> Self {
+ HasherRng::default()
+ }
+}
+
+impl Default for HasherRng {
+ fn default() -> Self {
+ HasherRng::with_hasher(RandomState::default())
+ }
+}
+
+impl<H> HasherRng<H> {
+ /// Create a new [`HasherRng`] with the provided hasher.
+ pub fn with_hasher(hasher: H) -> Self {
+ HasherRng { hasher, counter: 0 }
+ }
+}
+
+impl<H> Rng for HasherRng<H>
+where
+ H: BuildHasher,
+{
+ fn next_u64(&mut self) -> u64 {
+ let mut hasher = self.hasher.build_hasher();
+ hasher.write_u64(self.counter);
+ self.counter = self.counter.wrapping_add(1);
+ hasher.finish()
+ }
+}
+
+/// A sampler modified from the Rand implementation for use internally for the balance middleware.
+///
+/// It's an implementation of Floyd's combination algorithm with amount fixed at 2. This uses no allocated
+/// memory and finishes in constant time (only 2 random calls).
+///
+/// ref: This was borrowed and modified from the following Rand implementation
+/// https://github.com/rust-random/rand/blob/b73640705d6714509f8ceccc49e8df996fa19f51/src/seq/index.rs#L375-L411
+#[cfg(feature = "balance")]
+pub(crate) fn sample_floyd2<R: Rng>(rng: &mut R, length: u64) -> [u64; 2] {
+ debug_assert!(2 <= length);
+ let aidx = rng.next_range(0..length - 1);
+ let bidx = rng.next_range(0..length);
+ let aidx = if aidx == bidx { length - 1 } else { aidx };
+ [aidx, bidx]
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use quickcheck::*;
+
+ quickcheck! {
+ fn next_f64(counter: u64) -> TestResult {
+ let mut rng = HasherRng::default();
+ rng.counter = counter;
+ let n = rng.next_f64();
+
+ TestResult::from_bool(n < 1.0 && n >= 0.0)
+ }
+
+ fn next_range(counter: u64, range: Range<u64>) -> TestResult {
+ if range.start >= range.end{
+ return TestResult::discard();
+ }
+
+ let mut rng = HasherRng::default();
+ rng.counter = counter;
+
+ let n = rng.next_range(range.clone());
+
+ TestResult::from_bool(n >= range.start && (n < range.end || range.start == range.end))
+ }
+
+ fn sample_floyd2(counter: u64, length: u64) -> TestResult {
+ if length < 2 || length > 256 {
+ return TestResult::discard();
+ }
+
+ let mut rng = HasherRng::default();
+ rng.counter = counter;
+
+ let [a, b] = super::sample_floyd2(&mut rng, length);
+
+ if a >= length || b >= length || a == b {
+ return TestResult::failed();
+ }
+
+ TestResult::passed()
+ }
+ }
+
+ #[test]
+ fn sample_inplace_boundaries() {
+ let mut r = HasherRng::default();
+ match super::sample_floyd2(&mut r, 2) {
+ [0, 1] | [1, 0] => (),
+ array => panic!("unexpected inplace boundaries: {:?}", array),
+ }
+ }
+}
diff --git a/vendor/tower/src/util/service_fn.rs b/vendor/tower/src/util/service_fn.rs
new file mode 100644
index 00000000..d6e6be87
--- /dev/null
+++ b/vendor/tower/src/util/service_fn.rs
@@ -0,0 +1,82 @@
+use std::fmt;
+use std::future::Future;
+use std::task::{Context, Poll};
+use tower_service::Service;
+
+/// Returns a new [`ServiceFn`] with the given closure.
+///
+/// This lets you build a [`Service`] from an async function that returns a [`Result`].
+///
+/// # Example
+///
+/// ```
+/// use tower::{service_fn, Service, ServiceExt, BoxError};
+/// # struct Request;
+/// # impl Request {
+/// # fn new() -> Self { Self }
+/// # }
+/// # struct Response(&'static str);
+/// # impl Response {
+/// # fn new(body: &'static str) -> Self {
+/// # Self(body)
+/// # }
+/// # fn into_body(self) -> &'static str { self.0 }
+/// # }
+///
+/// # #[tokio::main]
+/// # async fn main() -> Result<(), BoxError> {
+/// async fn handle(request: Request) -> Result<Response, BoxError> {
+/// let response = Response::new("Hello, World!");
+/// Ok(response)
+/// }
+///
+/// let mut service = service_fn(handle);
+///
+/// let response = service
+/// .ready()
+/// .await?
+/// .call(Request::new())
+/// .await?;
+///
+/// assert_eq!("Hello, World!", response.into_body());
+/// #
+/// # Ok(())
+/// # }
+/// ```
+pub fn service_fn<T>(f: T) -> ServiceFn<T> {
+ ServiceFn { f }
+}
+
+/// A [`Service`] implemented by a closure.
+///
+/// See [`service_fn`] for more details.
+#[derive(Copy, Clone)]
+pub struct ServiceFn<T> {
+ f: T,
+}
+
+impl<T> fmt::Debug for ServiceFn<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ServiceFn")
+ .field("f", &format_args!("{}", std::any::type_name::<T>()))
+ .finish()
+ }
+}
+
+impl<T, F, Request, R, E> Service<Request> for ServiceFn<T>
+where
+ T: FnMut(Request) -> F,
+ F: Future<Output = Result<R, E>>,
+{
+ type Response = R;
+ type Error = E;
+ type Future = F;
+
+ fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), E>> {
+ Ok(()).into()
+ }
+
+ fn call(&mut self, req: Request) -> Self::Future {
+ (self.f)(req)
+ }
+}
diff --git a/vendor/tower/src/util/then.rs b/vendor/tower/src/util/then.rs
new file mode 100644
index 00000000..5e934506
--- /dev/null
+++ b/vendor/tower/src/util/then.rs
@@ -0,0 +1,103 @@
+use futures_util::{future, FutureExt};
+use std::{
+ fmt,
+ future::Future,
+ task::{Context, Poll},
+};
+use tower_layer::Layer;
+use tower_service::Service;
+
+/// [`Service`] returned by the [`then`] combinator.
+///
+/// [`then`]: crate::util::ServiceExt::then
+#[derive(Clone)]
+pub struct Then<S, F> {
+ inner: S,
+ f: F,
+}
+
+impl<S, F> fmt::Debug for Then<S, F>
+where
+ S: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Then")
+ .field("inner", &self.inner)
+ .field("f", &format_args!("{}", std::any::type_name::<F>()))
+ .finish()
+ }
+}
+
+/// A [`Layer`] that produces a [`Then`] service.
+///
+/// [`Layer`]: tower_layer::Layer
+#[derive(Debug, Clone)]
+pub struct ThenLayer<F> {
+ f: F,
+}
+
+impl<S, F> Then<S, F> {
+ /// Creates a new `Then` service.
+ pub const fn new(inner: S, f: F) -> Self {
+ Then { f, inner }
+ }
+
+ /// Returns a new [`Layer`] that produces [`Then`] services.
+ ///
+ /// This is a convenience function that simply calls [`ThenLayer::new`].
+ ///
+ /// [`Layer`]: tower_layer::Layer
+ pub fn layer(f: F) -> ThenLayer<F> {
+ ThenLayer { f }
+ }
+}
+
+opaque_future! {
+ /// Response future from [`Then`] services.
+ ///
+ /// [`Then`]: crate::util::Then
+ pub type ThenFuture<F1, F2, N> = future::Then<F1, F2, N>;
+}
+
+impl<S, F, Request, Response, Error, Fut> Service<Request> for Then<S, F>
+where
+ S: Service<Request>,
+ S::Error: Into<Error>,
+ F: FnOnce(Result<S::Response, S::Error>) -> Fut + Clone,
+ Fut: Future<Output = Result<Response, Error>>,
+{
+ type Response = Response;
+ type Error = Error;
+ type Future = ThenFuture<S::Future, Fut, F>;
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner.poll_ready(cx).map_err(Into::into)
+ }
+
+ #[inline]
+ fn call(&mut self, request: Request) -> Self::Future {
+ ThenFuture::new(self.inner.call(request).then(self.f.clone()))
+ }
+}
+
+impl<F> ThenLayer<F> {
+ /// Creates a new [`ThenLayer`] layer.
+ pub const fn new(f: F) -> Self {
+ ThenLayer { f }
+ }
+}
+
+impl<S, F> Layer<S> for ThenLayer<F>
+where
+ F: Clone,
+{
+ type Service = Then<S, F>;
+
+ fn layer(&self, inner: S) -> Self::Service {
+ Then {
+ f: self.f.clone(),
+ inner,
+ }
+ }
+}