diff options
Diffstat (limited to 'vendor/tower/src/util')
29 files changed, 3939 insertions, 0 deletions
diff --git a/vendor/tower/src/util/and_then.rs b/vendor/tower/src/util/and_then.rs new file mode 100644 index 00000000..adb9ada7 --- /dev/null +++ b/vendor/tower/src/util/and_then.rs @@ -0,0 +1,130 @@ +use futures_core::TryFuture; +use futures_util::{future, TryFutureExt}; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`and_then`] combinator. +/// +/// [`and_then`]: crate::util::ServiceExt::and_then +#[derive(Clone)] +pub struct AndThen<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for AndThen<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AndThen") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +pin_project_lite::pin_project! { + /// Response future from [`AndThen`] services. + /// + /// [`AndThen`]: crate::util::AndThen + pub struct AndThenFuture<F1, F2: TryFuture, N> { + #[pin] + inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>, + } +} + +impl<F1, F2: TryFuture, N> AndThenFuture<F1, F2, N> { + pub(crate) fn new(inner: future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>) -> Self { + Self { inner } + } +} + +impl<F1, F2: TryFuture, N> std::fmt::Debug for AndThenFuture<F1, F2, N> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("AndThenFuture") + .field(&format_args!("...")) + .finish() + } +} + +impl<F1, F2: TryFuture, N> Future for AndThenFuture<F1, F2, N> +where + future::AndThen<future::ErrInto<F1, F2::Error>, F2, N>: Future, +{ + type Output = <future::AndThen<future::ErrInto<F1, F2::Error>, F2, N> as Future>::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.project().inner.poll(cx) + } +} + +/// A [`Layer`] that produces a [`AndThen`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone, Debug)] +pub struct AndThenLayer<F> { + f: F, +} + +impl<S, F> AndThen<S, F> { + /// Creates a new `AndThen` service. + pub const fn new(inner: S, f: F) -> Self { + AndThen { f, inner } + } + + /// Returns a new [`Layer`] that produces [`AndThen`] services. + /// + /// This is a convenience function that simply calls [`AndThenLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> AndThenLayer<F> { + AndThenLayer { f } + } +} + +impl<S, F, Request, Fut> Service<Request> for AndThen<S, F> +where + S: Service<Request>, + S::Error: Into<Fut::Error>, + F: FnOnce(S::Response) -> Fut + Clone, + Fut: TryFuture, +{ + type Response = Fut::Ok; + type Error = Fut::Error; + type Future = AndThenFuture<S::Future, Fut, F>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, request: Request) -> Self::Future { + AndThenFuture::new(self.inner.call(request).err_into().and_then(self.f.clone())) + } +} + +impl<F> AndThenLayer<F> { + /// Creates a new [`AndThenLayer`] layer. + pub const fn new(f: F) -> Self { + AndThenLayer { f } + } +} + +impl<S, F> Layer<S> for AndThenLayer<F> +where + F: Clone, +{ + type Service = AndThen<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + AndThen { + f: self.f.clone(), + inner, + } + } +} diff --git a/vendor/tower/src/util/boxed/layer.rs b/vendor/tower/src/util/boxed/layer.rs new file mode 100644 index 00000000..34e65fa4 --- /dev/null +++ b/vendor/tower/src/util/boxed/layer.rs @@ -0,0 +1,97 @@ +use crate::util::BoxService; +use std::{fmt, sync::Arc}; +use tower_layer::{layer_fn, Layer}; +use tower_service::Service; + +/// A boxed [`Layer`] trait object. +/// +/// [`BoxLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself +/// and the output [`Service`] to be dynamic, while having consistent types. +/// +/// This [`Layer`] produces [`BoxService`] instances erasing the type of the +/// [`Service`] produced by the wrapped [`Layer`]. +/// +/// # Example +/// +/// `BoxLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have +/// the same types. In this example, we include a [`Timeout`] layer +/// only if an environment variable is set. We can use `BoxLayer` +/// to return a consistent type regardless of runtime configuration: +/// +/// ``` +/// use std::time::Duration; +/// use tower::{Service, ServiceBuilder, BoxError, util::BoxLayer}; +/// +/// fn common_layer<S, T>() -> BoxLayer<S, T, S::Response, BoxError> +/// where +/// S: Service<T> + Send + 'static, +/// S::Future: Send + 'static, +/// S::Error: Into<BoxError> + 'static, +/// { +/// let builder = ServiceBuilder::new() +/// .concurrency_limit(100); +/// +/// if std::env::var("SET_TIMEOUT").is_ok() { +/// let layer = builder +/// .timeout(Duration::from_secs(30)) +/// .into_inner(); +/// +/// BoxLayer::new(layer) +/// } else { +/// let layer = builder +/// .map_err(Into::into) +/// .into_inner(); +/// +/// BoxLayer::new(layer) +/// } +/// } +/// ``` +/// +/// [`Layer`]: tower_layer::Layer +/// [`Service`]: tower_service::Service +/// [`BoxService`]: super::BoxService +/// [`Timeout`]: crate::timeout +pub struct BoxLayer<In, T, U, E> { + boxed: Arc<dyn Layer<In, Service = BoxService<T, U, E>> + Send + Sync + 'static>, +} + +impl<In, T, U, E> BoxLayer<In, T, U, E> { + /// Create a new [`BoxLayer`]. + pub fn new<L>(inner_layer: L) -> Self + where + L: Layer<In> + Send + Sync + 'static, + L::Service: Service<T, Response = U, Error = E> + Send + 'static, + <L::Service as Service<T>>::Future: Send + 'static, + { + let layer = layer_fn(move |inner: In| { + let out = inner_layer.layer(inner); + BoxService::new(out) + }); + + Self { + boxed: Arc::new(layer), + } + } +} + +impl<In, T, U, E> Layer<In> for BoxLayer<In, T, U, E> { + type Service = BoxService<T, U, E>; + + fn layer(&self, inner: In) -> Self::Service { + self.boxed.layer(inner) + } +} + +impl<In, T, U, E> Clone for BoxLayer<In, T, U, E> { + fn clone(&self) -> Self { + Self { + boxed: Arc::clone(&self.boxed), + } + } +} + +impl<In, T, U, E> fmt::Debug for BoxLayer<In, T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxLayer").finish() + } +} diff --git a/vendor/tower/src/util/boxed/layer_clone.rs b/vendor/tower/src/util/boxed/layer_clone.rs new file mode 100644 index 00000000..1f626899 --- /dev/null +++ b/vendor/tower/src/util/boxed/layer_clone.rs @@ -0,0 +1,128 @@ +use crate::util::BoxCloneService; +use std::{fmt, sync::Arc}; +use tower_layer::{layer_fn, Layer}; +use tower_service::Service; + +/// A [`Clone`] + [`Send`] boxed [`Layer`]. +/// +/// [`BoxCloneServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself +/// and the output [`Service`] to be dynamic, while having consistent types. +/// +/// This [`Layer`] produces [`BoxCloneService`] instances erasing the type of the +/// [`Service`] produced by the wrapped [`Layer`]. +/// +/// This is similar to [`BoxLayer`](super::BoxLayer) except the layer and resulting +/// service implements [`Clone`]. +/// +/// # Example +/// +/// `BoxCloneServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have +/// the same types, when the underlying service must be clone (for example, when building a MakeService) +/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use +/// `BoxCloneService` to return a consistent type regardless of runtime configuration: +/// +/// ``` +/// use std::time::Duration; +/// use tower::{Service, ServiceBuilder, BoxError}; +/// use tower::util::{BoxCloneServiceLayer, BoxCloneService}; +/// +/// # +/// # struct Request; +/// # struct Response; +/// # impl Response { +/// # fn new() -> Self { Self } +/// # } +/// +/// fn common_layer<S, T>() -> BoxCloneServiceLayer<S, T, S::Response, BoxError> +/// where +/// S: Service<T> + Clone + Send + 'static, +/// S::Future: Send + 'static, +/// S::Error: Into<BoxError> + 'static, +/// { +/// let builder = ServiceBuilder::new() +/// .concurrency_limit(100); +/// +/// if std::env::var("SET_TIMEOUT").is_ok() { +/// let layer = builder +/// .timeout(Duration::from_secs(30)) +/// .into_inner(); +/// +/// BoxCloneServiceLayer::new(layer) +/// } else { +/// let layer = builder +/// .map_err(Into::into) +/// .into_inner(); +/// +/// BoxCloneServiceLayer::new(layer) +/// } +/// } +/// +/// // We can clone the layer (this is true of BoxLayer as well) +/// let boxed_clone_layer = common_layer(); +/// +/// let cloned_layer = boxed_clone_layer.clone(); +/// +/// // Using the `BoxCloneServiceLayer` we can create a `BoxCloneService` +/// let service: BoxCloneService<Request, Response, BoxError> = ServiceBuilder::new().layer(boxed_clone_layer) +/// .service_fn(|req: Request| async { +/// Ok::<_, BoxError>(Response::new()) +/// }); +/// +/// # let service = assert_service(service); +/// +/// // And we can still clone the service +/// let cloned_service = service.clone(); +/// # +/// # fn assert_service<S, R>(svc: S) -> S +/// # where S: Service<R> { svc } +/// +/// ``` +/// +/// [`Layer`]: tower_layer::Layer +/// [`Service`]: tower_service::Service +/// [`BoxService`]: super::BoxService +/// [`Timeout`]: crate::timeout +pub struct BoxCloneServiceLayer<In, T, U, E> { + boxed: Arc<dyn Layer<In, Service = BoxCloneService<T, U, E>> + Send + Sync + 'static>, +} + +impl<In, T, U, E> BoxCloneServiceLayer<In, T, U, E> { + /// Create a new [`BoxCloneServiceLayer`]. + pub fn new<L>(inner_layer: L) -> Self + where + L: Layer<In> + Send + Sync + 'static, + L::Service: Service<T, Response = U, Error = E> + Send + Clone + 'static, + <L::Service as Service<T>>::Future: Send + 'static, + { + let layer = layer_fn(move |inner: In| { + let out = inner_layer.layer(inner); + BoxCloneService::new(out) + }); + + Self { + boxed: Arc::new(layer), + } + } +} + +impl<In, T, U, E> Layer<In> for BoxCloneServiceLayer<In, T, U, E> { + type Service = BoxCloneService<T, U, E>; + + fn layer(&self, inner: In) -> Self::Service { + self.boxed.layer(inner) + } +} + +impl<In, T, U, E> Clone for BoxCloneServiceLayer<In, T, U, E> { + fn clone(&self) -> Self { + Self { + boxed: Arc::clone(&self.boxed), + } + } +} + +impl<In, T, U, E> fmt::Debug for BoxCloneServiceLayer<In, T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneServiceLayer").finish() + } +} diff --git a/vendor/tower/src/util/boxed/layer_clone_sync.rs b/vendor/tower/src/util/boxed/layer_clone_sync.rs new file mode 100644 index 00000000..950e66be --- /dev/null +++ b/vendor/tower/src/util/boxed/layer_clone_sync.rs @@ -0,0 +1,129 @@ +use std::{fmt, sync::Arc}; +use tower_layer::{layer_fn, Layer}; +use tower_service::Service; + +use crate::util::BoxCloneSyncService; + +/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Layer`]. +/// +/// [`BoxCloneSyncServiceLayer`] turns a layer into a trait object, allowing both the [`Layer`] itself +/// and the output [`Service`] to be dynamic, while having consistent types. +/// +/// This [`Layer`] produces [`BoxCloneSyncService`] instances erasing the type of the +/// [`Service`] produced by the wrapped [`Layer`]. +/// +/// This is similar to [`BoxCloneServiceLayer`](super::BoxCloneServiceLayer) except the layer and resulting +/// service implements [`Sync`]. +/// +/// # Example +/// +/// `BoxCloneSyncServiceLayer` can, for example, be useful to create layers dynamically that otherwise wouldn't have +/// the same types, when the underlying service must be clone and sync (for example, when building a Hyper connector). +/// In this example, we include a [`Timeout`] layer only if an environment variable is set. We can use +/// `BoxCloneSyncServiceLayer` to return a consistent type regardless of runtime configuration: +/// +/// ``` +/// use std::time::Duration; +/// use tower::{Service, ServiceBuilder, BoxError}; +/// use tower::util::{BoxCloneSyncServiceLayer, BoxCloneSyncService}; +/// +/// # +/// # struct Request; +/// # struct Response; +/// # impl Response { +/// # fn new() -> Self { Self } +/// # } +/// +/// fn common_layer<S, T>() -> BoxCloneSyncServiceLayer<S, T, S::Response, BoxError> +/// where +/// S: Service<T> + Clone + Send + Sync + 'static, +/// S::Future: Send + 'static, +/// S::Error: Into<BoxError> + 'static, +/// { +/// let builder = ServiceBuilder::new() +/// .concurrency_limit(100); +/// +/// if std::env::var("SET_TIMEOUT").is_ok() { +/// let layer = builder +/// .timeout(Duration::from_secs(30)) +/// .into_inner(); +/// +/// BoxCloneSyncServiceLayer::new(layer) +/// } else { +/// let layer = builder +/// .map_err(Into::into) +/// .into_inner(); +/// +/// BoxCloneSyncServiceLayer::new(layer) +/// } +/// } +/// +/// // We can clone the layer (this is true of BoxLayer as well) +/// let boxed_clone_sync_layer = common_layer(); +/// +/// let cloned_sync_layer = boxed_clone_sync_layer.clone(); +/// +/// // Using the `BoxCloneSyncServiceLayer` we can create a `BoxCloneSyncService` +/// let service: BoxCloneSyncService<Request, Response, BoxError> = ServiceBuilder::new().layer(cloned_sync_layer) +/// .service_fn(|req: Request| async { +/// Ok::<_, BoxError>(Response::new()) +/// }); +/// +/// # let service = assert_service(service); +/// +/// // And we can still clone the service +/// let cloned_service = service.clone(); +/// # +/// # fn assert_service<S, R>(svc: S) -> S +/// # where S: Service<R> { svc } +/// +/// ``` +/// +/// [`Layer`]: tower_layer::Layer +/// [`Service`]: tower_service::Service +/// [`BoxService`]: super::BoxService +/// [`Timeout`]: crate::timeout +pub struct BoxCloneSyncServiceLayer<In, T, U, E> { + boxed: Arc<dyn Layer<In, Service = BoxCloneSyncService<T, U, E>> + Send + Sync + 'static>, +} + +impl<In, T, U, E> BoxCloneSyncServiceLayer<In, T, U, E> { + /// Create a new [`BoxCloneSyncServiceLayer`]. + pub fn new<L>(inner_layer: L) -> Self + where + L: Layer<In> + Send + Sync + 'static, + L::Service: Service<T, Response = U, Error = E> + Send + Sync + Clone + 'static, + <L::Service as Service<T>>::Future: Send + 'static, + { + let layer = layer_fn(move |inner: In| { + let out = inner_layer.layer(inner); + BoxCloneSyncService::new(out) + }); + + Self { + boxed: Arc::new(layer), + } + } +} + +impl<In, T, U, E> Layer<In> for BoxCloneSyncServiceLayer<In, T, U, E> { + type Service = BoxCloneSyncService<T, U, E>; + + fn layer(&self, inner: In) -> Self::Service { + self.boxed.layer(inner) + } +} + +impl<In, T, U, E> Clone for BoxCloneSyncServiceLayer<In, T, U, E> { + fn clone(&self) -> Self { + Self { + boxed: Arc::clone(&self.boxed), + } + } +} + +impl<In, T, U, E> fmt::Debug for BoxCloneSyncServiceLayer<In, T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneSyncServiceLayer").finish() + } +} diff --git a/vendor/tower/src/util/boxed/mod.rs b/vendor/tower/src/util/boxed/mod.rs new file mode 100644 index 00000000..7da5d63c --- /dev/null +++ b/vendor/tower/src/util/boxed/mod.rs @@ -0,0 +1,11 @@ +mod layer; +mod layer_clone; +mod layer_clone_sync; +mod sync; +mod unsync; + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::{ + layer::BoxLayer, layer_clone::BoxCloneServiceLayer, layer_clone_sync::BoxCloneSyncServiceLayer, + sync::BoxService, unsync::UnsyncBoxService, +}; diff --git a/vendor/tower/src/util/boxed/sync.rs b/vendor/tower/src/util/boxed/sync.rs new file mode 100644 index 00000000..57dcfec7 --- /dev/null +++ b/vendor/tower/src/util/boxed/sync.rs @@ -0,0 +1,111 @@ +use crate::ServiceExt; +use tower_layer::{layer_fn, LayerFn}; +use tower_service::Service; + +use sync_wrapper::SyncWrapper; + +use std::fmt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A boxed `Service + Send` trait object. +/// +/// [`BoxService`] turns a service into a trait object, allowing the response +/// future type to be dynamic. This type requires both the service and the +/// response future to be [`Send`]. +/// +/// If you need a boxed [`Service`] that implements [`Clone`] consider using +/// [`BoxCloneService`](crate::util::BoxCloneService). +/// +/// Dynamically dispatched [`Service`] objects allow for erasing the underlying +/// [`Service`] type and using the `Service` instances as opaque handles. This can +/// be useful when the service instance cannot be explicitly named for whatever +/// reason. +/// +/// # Examples +/// +/// ``` +/// use futures_util::future::ready; +/// # use tower_service::Service; +/// # use tower::util::{BoxService, service_fn}; +/// // Respond to requests using a closure, but closures cannot be named... +/// # pub fn main() { +/// let svc = service_fn(|mut request: String| { +/// request.push_str(" response"); +/// ready(Ok(request)) +/// }); +/// +/// let service: BoxService<String, String, ()> = BoxService::new(svc); +/// # drop(service); +/// } +/// ``` +/// +/// [`Service`]: crate::Service +/// [`Rc`]: std::rc::Rc +pub struct BoxService<T, U, E> { + inner: + SyncWrapper<Box<dyn Service<T, Response = U, Error = E, Future = BoxFuture<U, E>> + Send>>, +} + +/// A boxed `Future + Send` trait object. +/// +/// This type alias represents a boxed future that is [`Send`] and can be moved +/// across threads. +type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>; + +impl<T, U, E> BoxService<T, U, E> { + #[allow(missing_docs)] + pub fn new<S>(inner: S) -> Self + where + S: Service<T, Response = U, Error = E> + Send + 'static, + S::Future: Send + 'static, + { + // rust can't infer the type + let inner: Box<dyn Service<T, Response = U, Error = E, Future = BoxFuture<U, E>> + Send> = + Box::new(inner.map_future(|f: S::Future| Box::pin(f) as _)); + let inner = SyncWrapper::new(inner); + BoxService { inner } + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxService`] + /// middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer<S>() -> LayerFn<fn(S) -> Self> + where + S: Service<T, Response = U, Error = E> + Send + 'static, + S::Future: Send + 'static, + { + layer_fn(Self::new) + } +} + +impl<T, U, E> Service<T> for BoxService<T, U, E> { + type Response = U; + type Error = E; + type Future = BoxFuture<U, E>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> { + self.inner.get_mut().poll_ready(cx) + } + + fn call(&mut self, request: T) -> BoxFuture<U, E> { + self.inner.get_mut().call(request) + } +} + +impl<T, U, E> fmt::Debug for BoxService<T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxService").finish() + } +} + +#[test] +fn is_sync() { + fn assert_sync<T: Sync>() {} + + assert_sync::<BoxService<(), (), ()>>(); +} diff --git a/vendor/tower/src/util/boxed/unsync.rs b/vendor/tower/src/util/boxed/unsync.rs new file mode 100644 index 00000000..f645f169 --- /dev/null +++ b/vendor/tower/src/util/boxed/unsync.rs @@ -0,0 +1,86 @@ +use tower_layer::{layer_fn, LayerFn}; +use tower_service::Service; + +use std::fmt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A boxed [`Service`] trait object. +pub struct UnsyncBoxService<T, U, E> { + inner: Box<dyn Service<T, Response = U, Error = E, Future = UnsyncBoxFuture<U, E>>>, +} + +/// A boxed [`Future`] trait object. +/// +/// This type alias represents a boxed future that is *not* [`Send`] and must +/// remain on the current thread. +type UnsyncBoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>; + +#[derive(Debug)] +struct UnsyncBoxed<S> { + inner: S, +} + +impl<T, U, E> UnsyncBoxService<T, U, E> { + #[allow(missing_docs)] + pub fn new<S>(inner: S) -> Self + where + S: Service<T, Response = U, Error = E> + 'static, + S::Future: 'static, + { + let inner = Box::new(UnsyncBoxed { inner }); + UnsyncBoxService { inner } + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in an [`UnsyncBoxService`] middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer<S>() -> LayerFn<fn(S) -> Self> + where + S: Service<T, Response = U, Error = E> + 'static, + S::Future: 'static, + { + layer_fn(Self::new) + } +} + +impl<T, U, E> Service<T> for UnsyncBoxService<T, U, E> { + type Response = U; + type Error = E; + type Future = UnsyncBoxFuture<U, E>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: T) -> UnsyncBoxFuture<U, E> { + self.inner.call(request) + } +} + +impl<T, U, E> fmt::Debug for UnsyncBoxService<T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("UnsyncBoxService").finish() + } +} + +impl<S, Request> Service<Request> for UnsyncBoxed<S> +where + S: Service<Request> + 'static, + S::Future: 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + Box::pin(self.inner.call(request)) + } +} diff --git a/vendor/tower/src/util/boxed_clone.rs b/vendor/tower/src/util/boxed_clone.rs new file mode 100644 index 00000000..1209fd2e --- /dev/null +++ b/vendor/tower/src/util/boxed_clone.rs @@ -0,0 +1,136 @@ +use super::ServiceExt; +use futures_util::future::BoxFuture; +use std::{ + fmt, + task::{Context, Poll}, +}; +use tower_layer::{layer_fn, LayerFn}; +use tower_service::Service; + +/// A [`Clone`] + [`Send`] boxed [`Service`]. +/// +/// [`BoxCloneService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned. +/// +/// This is similar to [`BoxService`](super::BoxService) except the resulting +/// service implements [`Clone`]. +/// +/// # Example +/// +/// ``` +/// use tower::{Service, ServiceBuilder, BoxError, util::BoxCloneService}; +/// use std::time::Duration; +/// # +/// # struct Request; +/// # struct Response; +/// # impl Response { +/// # fn new() -> Self { Self } +/// # } +/// +/// // This service has a complex type that is hard to name +/// let service = ServiceBuilder::new() +/// .map_request(|req| { +/// println!("received request"); +/// req +/// }) +/// .map_response(|res| { +/// println!("response produced"); +/// res +/// }) +/// .load_shed() +/// .concurrency_limit(64) +/// .timeout(Duration::from_secs(10)) +/// .service_fn(|req: Request| async { +/// Ok::<_, BoxError>(Response::new()) +/// }); +/// # let service = assert_service(service); +/// +/// // `BoxCloneService` will erase the type so it's nameable +/// let service: BoxCloneService<Request, Response, BoxError> = BoxCloneService::new(service); +/// # let service = assert_service(service); +/// +/// // And we can still clone the service +/// let cloned_service = service.clone(); +/// # +/// # fn assert_service<S, R>(svc: S) -> S +/// # where S: Service<R> { svc } +/// ``` +pub struct BoxCloneService<T, U, E>( + Box< + dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>> + + Send, + >, +); + +impl<T, U, E> BoxCloneService<T, U, E> { + /// Create a new `BoxCloneService`. + pub fn new<S>(inner: S) -> Self + where + S: Service<T, Response = U, Error = E> + Clone + Send + 'static, + S::Future: Send + 'static, + { + let inner = inner.map_future(|f| Box::pin(f) as _); + BoxCloneService(Box::new(inner)) + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneService`] + /// middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer<S>() -> LayerFn<fn(S) -> Self> + where + S: Service<T, Response = U, Error = E> + Clone + Send + 'static, + S::Future: Send + 'static, + { + layer_fn(Self::new) + } +} + +impl<T, U, E> Service<T> for BoxCloneService<T, U, E> { + type Response = U; + type Error = E; + type Future = BoxFuture<'static, Result<U, E>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> { + self.0.poll_ready(cx) + } + + #[inline] + fn call(&mut self, request: T) -> Self::Future { + self.0.call(request) + } +} + +impl<T, U, E> Clone for BoxCloneService<T, U, E> { + fn clone(&self) -> Self { + Self(self.0.clone_box()) + } +} + +trait CloneService<R>: Service<R> { + fn clone_box( + &self, + ) -> Box< + dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future> + + Send, + >; +} + +impl<R, T> CloneService<R> for T +where + T: Service<R> + Send + Clone + 'static, +{ + fn clone_box( + &self, + ) -> Box<dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future> + Send> + { + Box::new(self.clone()) + } +} + +impl<T, U, E> fmt::Debug for BoxCloneService<T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneService").finish() + } +} diff --git a/vendor/tower/src/util/boxed_clone_sync.rs b/vendor/tower/src/util/boxed_clone_sync.rs new file mode 100644 index 00000000..d62e8ff2 --- /dev/null +++ b/vendor/tower/src/util/boxed_clone_sync.rs @@ -0,0 +1,101 @@ +use super::ServiceExt; +use futures_util::future::BoxFuture; +use std::{ + fmt, + task::{Context, Poll}, +}; +use tower_layer::{layer_fn, LayerFn}; +use tower_service::Service; + +/// A [`Clone`] + [`Send`] + [`Sync`] boxed [`Service`]. +/// +/// [`BoxCloneSyncService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned and shared. +/// +/// This is similar to [`BoxCloneService`](super::BoxCloneService) except the resulting +/// service implements [`Sync`]. +/// ``` +pub struct BoxCloneSyncService<T, U, E>( + Box< + dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>> + + Send + + Sync, + >, +); + +impl<T, U, E> BoxCloneSyncService<T, U, E> { + /// Create a new `BoxCloneSyncService`. + pub fn new<S>(inner: S) -> Self + where + S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static, + S::Future: Send + 'static, + { + let inner = inner.map_future(|f| Box::pin(f) as _); + BoxCloneSyncService(Box::new(inner)) + } + + /// Returns a [`Layer`] for wrapping a [`Service`] in a [`BoxCloneSyncService`] + /// middleware. + /// + /// [`Layer`]: crate::Layer + pub fn layer<S>() -> LayerFn<fn(S) -> Self> + where + S: Service<T, Response = U, Error = E> + Clone + Send + Sync + 'static, + S::Future: Send + 'static, + { + layer_fn(Self::new) + } +} + +impl<T, U, E> Service<T> for BoxCloneSyncService<T, U, E> { + type Response = U; + type Error = E; + type Future = BoxFuture<'static, Result<U, E>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), E>> { + self.0.poll_ready(cx) + } + + #[inline] + fn call(&mut self, request: T) -> Self::Future { + self.0.call(request) + } +} + +impl<T, U, E> Clone for BoxCloneSyncService<T, U, E> { + fn clone(&self) -> Self { + Self(self.0.clone_box()) + } +} + +trait CloneService<R>: Service<R> { + fn clone_box( + &self, + ) -> Box< + dyn CloneService<R, Response = Self::Response, Error = Self::Error, Future = Self::Future> + + Send + + Sync, + >; +} + +impl<R, T> CloneService<R> for T +where + T: Service<R> + Send + Sync + Clone + 'static, +{ + fn clone_box( + &self, + ) -> Box< + dyn CloneService<R, Response = T::Response, Error = T::Error, Future = T::Future> + + Send + + Sync, + > { + Box::new(self.clone()) + } +} + +impl<T, U, E> fmt::Debug for BoxCloneSyncService<T, U, E> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("BoxCloneSyncService").finish() + } +} diff --git a/vendor/tower/src/util/call_all/common.rs b/vendor/tower/src/util/call_all/common.rs new file mode 100644 index 00000000..9f2490b6 --- /dev/null +++ b/vendor/tower/src/util/call_all/common.rs @@ -0,0 +1,141 @@ +use futures_core::{ready, Stream}; +use pin_project_lite::pin_project; +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +pin_project! { + /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator. + pub(crate) struct CallAll<Svc, S, Q> + where + S: Stream, + { + service: Option<Svc>, + #[pin] + stream: S, + queue: Q, + eof: bool, + curr_req: Option<S::Item> + } +} + +impl<Svc, S, Q> fmt::Debug for CallAll<Svc, S, Q> +where + Svc: fmt::Debug, + S: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CallAll") + .field("service", &self.service) + .field("stream", &self.stream) + .field("eof", &self.eof) + .finish() + } +} + +pub(crate) trait Drive<F: Future> { + fn is_empty(&self) -> bool; + + fn push(&mut self, future: F); + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>; +} + +impl<Svc, S, Q> CallAll<Svc, S, Q> +where + Svc: Service<S::Item>, + S: Stream, + Q: Drive<Svc::Future>, +{ + pub(crate) const fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> { + CallAll { + service: Some(service), + stream, + queue, + eof: false, + curr_req: None, + } + } + + /// Extract the wrapped [`Service`]. + pub(crate) fn into_inner(mut self) -> Svc { + self.service.take().expect("Service already taken") + } + + /// Extract the wrapped [`Service`]. + pub(crate) fn take_service(self: Pin<&mut Self>) -> Svc { + self.project() + .service + .take() + .expect("Service already taken") + } + + pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> { + assert!(self.queue.is_empty() && !self.eof); + + super::CallAllUnordered::new(self.service.take().unwrap(), self.stream) + } +} + +impl<Svc, S, Q> Stream for CallAll<Svc, S, Q> +where + Svc: Service<S::Item>, + S: Stream, + Q: Drive<Svc::Future>, +{ + type Item = Result<Svc::Response, Svc::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + loop { + // First, see if we have any responses to yield + if let Poll::Ready(r) = this.queue.poll(cx) { + if let Some(rsp) = r.transpose()? { + return Poll::Ready(Some(Ok(rsp))); + } + } + + // If there are no more requests coming, check if we're done + if *this.eof { + if this.queue.is_empty() { + return Poll::Ready(None); + } else { + return Poll::Pending; + } + } + + // If not done, and we don't have a stored request, gather the next request from the + // stream (if there is one), or return `Pending` if the stream is not ready. + if this.curr_req.is_none() { + *this.curr_req = match ready!(this.stream.as_mut().poll_next(cx)) { + Some(next_req) => Some(next_req), + None => { + // Mark that there will be no more requests. + *this.eof = true; + continue; + } + }; + } + + // Then, see that the service is ready for another request + let svc = this + .service + .as_mut() + .expect("Using CallAll after extracting inner Service"); + + if let Err(e) = ready!(svc.poll_ready(cx)) { + // Set eof to prevent the service from being called again after a `poll_ready` error + *this.eof = true; + return Poll::Ready(Some(Err(e))); + } + + // Unwrap: The check above always sets `this.curr_req` if none. + this.queue.push(svc.call(this.curr_req.take().unwrap())); + } + } +} diff --git a/vendor/tower/src/util/call_all/mod.rs b/vendor/tower/src/util/call_all/mod.rs new file mode 100644 index 00000000..0cac72d1 --- /dev/null +++ b/vendor/tower/src/util/call_all/mod.rs @@ -0,0 +1,11 @@ +//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. +//! +//! [`Service<Request>`]: crate::Service +//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + +mod common; +mod ordered; +mod unordered; + +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::{ordered::CallAll, unordered::CallAllUnordered}; diff --git a/vendor/tower/src/util/call_all/ordered.rs b/vendor/tower/src/util/call_all/ordered.rs new file mode 100644 index 00000000..9a283916 --- /dev/null +++ b/vendor/tower/src/util/call_all/ordered.rs @@ -0,0 +1,177 @@ +//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. +//! +//! [`Service<Request>`]: crate::Service +//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + +use super::common; +use futures_core::Stream; +use futures_util::stream::FuturesOrdered; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +pin_project! { + /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each + /// request received on the wrapped [`Stream`]. + /// + /// ```rust + /// # use std::task::{Poll, Context}; + /// # use std::cell::Cell; + /// # use std::error::Error; + /// # use std::rc::Rc; + /// # + /// use futures::future::{ready, Ready}; + /// use futures::StreamExt; + /// use futures::channel::mpsc; + /// use tower_service::Service; + /// use tower::util::ServiceExt; + /// + /// // First, we need to have a Service to process our requests. + /// #[derive(Debug, Eq, PartialEq)] + /// struct FirstLetter; + /// impl Service<&'static str> for FirstLetter { + /// type Response = &'static str; + /// type Error = Box<dyn Error + Send + Sync>; + /// type Future = Ready<Result<Self::Response, Self::Error>>; + /// + /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// Poll::Ready(Ok(())) + /// } + /// + /// fn call(&mut self, req: &'static str) -> Self::Future { + /// ready(Ok(&req[..1])) + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// // Next, we need a Stream of requests. + // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams, + // tokio::sync::mpsc again? + /// let (mut reqs, rx) = mpsc::unbounded(); + /// // Note that we have to help Rust out here by telling it what error type to use. + /// // Specifically, it has to be From<Service::Error> + From<Stream::Error>. + /// let mut rsps = FirstLetter.call_all(rx); + /// + /// // Now, let's send a few requests and then check that we get the corresponding responses. + /// reqs.unbounded_send("one").unwrap(); + /// reqs.unbounded_send("two").unwrap(); + /// reqs.unbounded_send("three").unwrap(); + /// drop(reqs); + /// + /// // We then loop over the response `Stream` that we get back from call_all. + /// let mut i = 0usize; + /// while let Some(rsp) = rsps.next().await { + /// // Each response is a Result (we could also have used TryStream::try_next) + /// match (i + 1, rsp.unwrap()) { + /// (1, "o") | + /// (2, "t") | + /// (3, "t") => {} + /// (n, i) => { + /// unreachable!("{}. response was '{}'", n, i); + /// } + /// } + /// i += 1; + /// } + /// + /// // And at the end, we can get the Service back when there are no more requests. + /// assert_eq!(rsps.into_inner(), FirstLetter); + /// } + /// ``` + /// + /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + #[derive(Debug)] + pub struct CallAll<Svc, S> + where + Svc: Service<S::Item>, + S: Stream, + { + #[pin] + inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>, + } +} + +impl<Svc, S> CallAll<Svc, S> +where + Svc: Service<S::Item>, + S: Stream, +{ + /// Create new [`CallAll`] combinator. + /// + /// Each request yielded by `stream` is passed to `svc`, and the resulting responses are + /// yielded in the same order by the implementation of [`Stream`] for [`CallAll`]. + /// + /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> { + CallAll { + inner: common::CallAll::new(service, stream, FuturesOrdered::new()), + } + } + + /// Extract the wrapped [`Service`]. + /// + /// # Panics + /// + /// Panics if [`take_service`] was already called. + /// + /// [`take_service`]: crate::util::CallAll::take_service + pub fn into_inner(self) -> Svc { + self.inner.into_inner() + } + + /// Extract the wrapped [`Service`]. + /// + /// This [`CallAll`] can no longer be used after this function has been called. + /// + /// # Panics + /// + /// Panics if [`take_service`] was already called. + /// + /// [`take_service`]: crate::util::CallAll::take_service + pub fn take_service(self: Pin<&mut Self>) -> Svc { + self.project().inner.take_service() + } + + /// Return responses as they are ready, regardless of the initial order. + /// + /// This function must be called before the stream is polled. + /// + /// # Panics + /// + /// Panics if [`poll`] was called. + /// + /// [`poll`]: std::future::Future::poll + pub fn unordered(self) -> super::CallAllUnordered<Svc, S> { + self.inner.unordered() + } +} + +impl<Svc, S> Stream for CallAll<Svc, S> +where + Svc: Service<S::Item>, + S: Stream, +{ + type Item = Result<Svc::Response, Svc::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project().inner.poll_next(cx) + } +} + +impl<F: Future> common::Drive<F> for FuturesOrdered<F> { + fn is_empty(&self) -> bool { + FuturesOrdered::is_empty(self) + } + + fn push(&mut self, future: F) { + FuturesOrdered::push_back(self, future) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { + Stream::poll_next(Pin::new(self), cx) + } +} diff --git a/vendor/tower/src/util/call_all/unordered.rs b/vendor/tower/src/util/call_all/unordered.rs new file mode 100644 index 00000000..3038932f --- /dev/null +++ b/vendor/tower/src/util/call_all/unordered.rs @@ -0,0 +1,98 @@ +//! [`Stream<Item = Request>`][stream] + [`Service<Request>`] => [`Stream<Item = Response>`][stream]. +//! +//! [`Service<Request>`]: crate::Service +//! [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + +use super::common; +use futures_core::Stream; +use futures_util::stream::FuturesUnordered; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +pin_project! { + /// A stream of responses received from the inner service in received order. + /// + /// Similar to [`CallAll`] except, instead of yielding responses in request order, + /// responses are returned as they are available. + /// + /// [`CallAll`]: crate::util::CallAll + #[derive(Debug)] + pub struct CallAllUnordered<Svc, S> + where + Svc: Service<S::Item>, + S: Stream, + { + #[pin] + inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>, + } +} + +impl<Svc, S> CallAllUnordered<Svc, S> +where + Svc: Service<S::Item>, + S: Stream, +{ + /// Create new [`CallAllUnordered`] combinator. + /// + /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> { + CallAllUnordered { + inner: common::CallAll::new(service, stream, FuturesUnordered::new()), + } + } + + /// Extract the wrapped [`Service`]. + /// + /// # Panics + /// + /// Panics if [`take_service`] was already called. + /// + /// [`take_service`]: crate::util::CallAllUnordered::take_service + pub fn into_inner(self) -> Svc { + self.inner.into_inner() + } + + /// Extract the wrapped `Service`. + /// + /// This [`CallAllUnordered`] can no longer be used after this function has been called. + /// + /// # Panics + /// + /// Panics if [`take_service`] was already called. + /// + /// [`take_service`]: crate::util::CallAllUnordered::take_service + pub fn take_service(self: Pin<&mut Self>) -> Svc { + self.project().inner.take_service() + } +} + +impl<Svc, S> Stream for CallAllUnordered<Svc, S> +where + Svc: Service<S::Item>, + S: Stream, +{ + type Item = Result<Svc::Response, Svc::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.project().inner.poll_next(cx) + } +} + +impl<F: Future> common::Drive<F> for FuturesUnordered<F> { + fn is_empty(&self) -> bool { + FuturesUnordered::is_empty(self) + } + + fn push(&mut self, future: F) { + FuturesUnordered::push(self, future) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> { + Stream::poll_next(Pin::new(self), cx) + } +} diff --git a/vendor/tower/src/util/either.rs b/vendor/tower/src/util/either.rs new file mode 100644 index 00000000..371abb4d --- /dev/null +++ b/vendor/tower/src/util/either.rs @@ -0,0 +1,103 @@ +//! Contains [`Either`] and related types and functions. +//! +//! See [`Either`] documentation for more details. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_layer::Layer; +use tower_service::Service; + +/// Combine two different service types into a single type. +/// +/// Both services must be of the same request, response, and error types. +/// [`Either`] is useful for handling conditional branching in service middleware +/// to different inner service types. +#[derive(Clone, Copy, Debug)] +pub enum Either<A, B> { + #[allow(missing_docs)] + Left(A), + #[allow(missing_docs)] + Right(B), +} + +impl<A, B, Request> Service<Request> for Either<A, B> +where + A: Service<Request>, + B: Service<Request, Response = A::Response, Error = A::Error>, +{ + type Response = A::Response; + type Error = A::Error; + type Future = EitherResponseFuture<A::Future, B::Future>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self { + Either::Left(service) => service.poll_ready(cx), + Either::Right(service) => service.poll_ready(cx), + } + } + + fn call(&mut self, request: Request) -> Self::Future { + match self { + Either::Left(service) => EitherResponseFuture { + kind: Kind::Left { + inner: service.call(request), + }, + }, + Either::Right(service) => EitherResponseFuture { + kind: Kind::Right { + inner: service.call(request), + }, + }, + } + } +} + +pin_project! { + /// Response future for [`Either`]. + pub struct EitherResponseFuture<A, B> { + #[pin] + kind: Kind<A, B> + } +} + +pin_project! { + #[project = KindProj] + enum Kind<A, B> { + Left { #[pin] inner: A }, + Right { #[pin] inner: B }, + } +} + +impl<A, B> Future for EitherResponseFuture<A, B> +where + A: Future, + B: Future<Output = A::Output>, +{ + type Output = A::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.project().kind.project() { + KindProj::Left { inner } => inner.poll(cx), + KindProj::Right { inner } => inner.poll(cx), + } + } +} + +impl<S, A, B> Layer<S> for Either<A, B> +where + A: Layer<S>, + B: Layer<S>, +{ + type Service = Either<A::Service, B::Service>; + + fn layer(&self, inner: S) -> Self::Service { + match self { + Either::Left(layer) => Either::Left(layer.layer(inner)), + Either::Right(layer) => Either::Right(layer.layer(inner)), + } + } +} diff --git a/vendor/tower/src/util/future_service.rs b/vendor/tower/src/util/future_service.rs new file mode 100644 index 00000000..c0a36df2 --- /dev/null +++ b/vendor/tower/src/util/future_service.rs @@ -0,0 +1,215 @@ +use std::fmt; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +/// Returns a new [`FutureService`] for the given future. +/// +/// A [`FutureService`] allows you to treat a future that resolves to a service as a service. This +/// can be useful for services that are created asynchronously. +/// +/// # Example +/// ``` +/// use tower::{service_fn, Service, ServiceExt}; +/// use tower::util::future_service; +/// use std::convert::Infallible; +/// +/// # fn main() { +/// # async { +/// // A future which outputs a type implementing `Service`. +/// let future_of_a_service = async { +/// let svc = service_fn(|_req: ()| async { Ok::<_, Infallible>("ok") }); +/// Ok::<_, Infallible>(svc) +/// }; +/// +/// // Wrap the future with a `FutureService`, allowing it to be used +/// // as a service without awaiting the future's completion: +/// let mut svc = future_service(Box::pin(future_of_a_service)); +/// +/// // Now, when we wait for the service to become ready, it will +/// // drive the future to completion internally. +/// let svc = svc.ready().await.unwrap(); +/// let res = svc.call(()).await.unwrap(); +/// # }; +/// # } +/// ``` +/// +/// # Regarding the [`Unpin`] bound +/// +/// The [`Unpin`] bound on `F` is necessary because the future will be polled in +/// [`Service::poll_ready`] which doesn't have a pinned receiver (it takes `&mut self` and not `self: +/// Pin<&mut Self>`). So we cannot put the future into a `Pin` without requiring `Unpin`. +/// +/// This will most likely come up if you're calling `future_service` with an async block. In that +/// case you can use `Box::pin(async { ... })` as shown in the example. +pub fn future_service<F, S, R, E>(future: F) -> FutureService<F, S> +where + F: Future<Output = Result<S, E>> + Unpin, + S: Service<R, Error = E>, +{ + FutureService::new(future) +} + +/// A type that implements [`Service`] for a [`Future`] that produces a [`Service`]. +/// +/// See [`future_service`] for more details. +#[derive(Clone)] +pub struct FutureService<F, S> { + state: State<F, S>, +} + +impl<F, S> FutureService<F, S> { + /// Returns a new [`FutureService`] for the given future. + /// + /// A [`FutureService`] allows you to treat a future that resolves to a service as a service. This + /// can be useful for services that are created asynchronously. + /// + /// # Example + /// ``` + /// use tower::{service_fn, Service, ServiceExt}; + /// use tower::util::FutureService; + /// use std::convert::Infallible; + /// + /// # fn main() { + /// # async { + /// // A future which outputs a type implementing `Service`. + /// let future_of_a_service = async { + /// let svc = service_fn(|_req: ()| async { Ok::<_, Infallible>("ok") }); + /// Ok::<_, Infallible>(svc) + /// }; + /// + /// // Wrap the future with a `FutureService`, allowing it to be used + /// // as a service without awaiting the future's completion: + /// let mut svc = FutureService::new(Box::pin(future_of_a_service)); + /// + /// // Now, when we wait for the service to become ready, it will + /// // drive the future to completion internally. + /// let svc = svc.ready().await.unwrap(); + /// let res = svc.call(()).await.unwrap(); + /// # }; + /// # } + /// ``` + /// + /// # Regarding the [`Unpin`] bound + /// + /// The [`Unpin`] bound on `F` is necessary because the future will be polled in + /// [`Service::poll_ready`] which doesn't have a pinned receiver (it takes `&mut self` and not `self: + /// Pin<&mut Self>`). So we cannot put the future into a `Pin` without requiring `Unpin`. + /// + /// This will most likely come up if you're calling `future_service` with an async block. In that + /// case you can use `Box::pin(async { ... })` as shown in the example. + pub const fn new(future: F) -> Self { + Self { + state: State::Future(future), + } + } +} + +impl<F, S> fmt::Debug for FutureService<F, S> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FutureService") + .field("state", &format_args!("{:?}", self.state)) + .finish() + } +} + +#[derive(Clone)] +enum State<F, S> { + Future(F), + Service(S), +} + +impl<F, S> fmt::Debug for State<F, S> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::Future(_) => f + .debug_tuple("State::Future") + .field(&format_args!("<{}>", std::any::type_name::<F>())) + .finish(), + State::Service(svc) => f.debug_tuple("State::Service").field(svc).finish(), + } + } +} + +impl<F, S, R, E> Service<R> for FutureService<F, S> +where + F: Future<Output = Result<S, E>> + Unpin, + S: Service<R, Error = E>, +{ + type Response = S::Response; + type Error = E; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + loop { + self.state = match &mut self.state { + State::Future(fut) => { + let fut = Pin::new(fut); + let svc = futures_core::ready!(fut.poll(cx)?); + State::Service(svc) + } + State::Service(svc) => return svc.poll_ready(cx), + }; + } + } + + fn call(&mut self, req: R) -> Self::Future { + if let State::Service(svc) = &mut self.state { + svc.call(req) + } else { + panic!("FutureService::call was called before FutureService::poll_ready") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::util::{future_service, ServiceExt}; + use crate::Service; + use futures::future::{ready, Ready}; + use std::convert::Infallible; + + #[tokio::test] + async fn pending_service_debug_impl() { + let mut pending_svc = future_service(ready(Ok(DebugService))); + + assert_eq!( + format!("{:?}", pending_svc), + "FutureService { state: State::Future(<futures_util::future::ready::Ready<core::result::Result<tower::util::future_service::tests::DebugService, core::convert::Infallible>>>) }" + ); + + pending_svc.ready().await.unwrap(); + + assert_eq!( + format!("{:?}", pending_svc), + "FutureService { state: State::Service(DebugService) }" + ); + } + + #[derive(Debug)] + struct DebugService; + + impl Service<()> for DebugService { + type Response = (); + type Error = Infallible; + type Future = Ready<Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Ok(()).into() + } + + fn call(&mut self, _req: ()) -> Self::Future { + ready(Ok(())) + } + } +} diff --git a/vendor/tower/src/util/map_err.rs b/vendor/tower/src/util/map_err.rs new file mode 100644 index 00000000..1b936acb --- /dev/null +++ b/vendor/tower/src/util/map_err.rs @@ -0,0 +1,98 @@ +use futures_util::{future, TryFutureExt}; +use std::fmt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`map_err`] combinator. +/// +/// [`map_err`]: crate::util::ServiceExt::map_err +#[derive(Clone)] +pub struct MapErr<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for MapErr<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapErr") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +/// A [`Layer`] that produces [`MapErr`] services. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone, Debug)] +pub struct MapErrLayer<F> { + f: F, +} + +opaque_future! { + /// Response future from [`MapErr`] services. + /// + /// [`MapErr`]: crate::util::MapErr + pub type MapErrFuture<F, N> = future::MapErr<F, N>; +} + +impl<S, F> MapErr<S, F> { + /// Creates a new [`MapErr`] service. + pub const fn new(inner: S, f: F) -> Self { + MapErr { f, inner } + } + + /// Returns a new [`Layer`] that produces [`MapErr`] services. + /// + /// This is a convenience function that simply calls [`MapErrLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> MapErrLayer<F> { + MapErrLayer { f } + } +} + +impl<S, F, Request, Error> Service<Request> for MapErr<S, F> +where + S: Service<Request>, + F: FnOnce(S::Error) -> Error + Clone, +{ + type Response = S::Response; + type Error = Error; + type Future = MapErrFuture<S::Future, F>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx).map_err(self.f.clone()) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + MapErrFuture::new(self.inner.call(request).map_err(self.f.clone())) + } +} + +impl<F> MapErrLayer<F> { + /// Creates a new [`MapErrLayer`]. + pub const fn new(f: F) -> Self { + MapErrLayer { f } + } +} + +impl<S, F> Layer<S> for MapErrLayer<F> +where + F: Clone, +{ + type Service = MapErr<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + MapErr { + f: self.f.clone(), + inner, + } + } +} diff --git a/vendor/tower/src/util/map_future.rs b/vendor/tower/src/util/map_future.rs new file mode 100644 index 00000000..55bf96d0 --- /dev/null +++ b/vendor/tower/src/util/map_future.rs @@ -0,0 +1,113 @@ +use std::{ + fmt, + future::Future, + task::{Context, Poll}, +}; +use tower_layer::Layer; +use tower_service::Service; + +/// [`Service`] returned by the [`map_future`] combinator. +/// +/// [`map_future`]: crate::util::ServiceExt::map_future +#[derive(Clone)] +pub struct MapFuture<S, F> { + inner: S, + f: F, +} + +impl<S, F> MapFuture<S, F> { + /// Creates a new [`MapFuture`] service. + pub const fn new(inner: S, f: F) -> Self { + Self { inner, f } + } + + /// Returns a new [`Layer`] that produces [`MapFuture`] services. + /// + /// This is a convenience function that simply calls [`MapFutureLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> MapFutureLayer<F> { + MapFutureLayer::new(f) + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &S { + &self.inner + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut S { + &mut self.inner + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> S { + self.inner + } +} + +impl<R, S, F, T, E, Fut> Service<R> for MapFuture<S, F> +where + S: Service<R>, + F: FnMut(S::Future) -> Fut, + E: From<S::Error>, + Fut: Future<Output = Result<T, E>>, +{ + type Response = T; + type Error = E; + type Future = Fut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx).map_err(From::from) + } + + fn call(&mut self, req: R) -> Self::Future { + (self.f)(self.inner.call(req)) + } +} + +impl<S, F> fmt::Debug for MapFuture<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapFuture") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +/// A [`Layer`] that produces a [`MapFuture`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone)] +pub struct MapFutureLayer<F> { + f: F, +} + +impl<F> MapFutureLayer<F> { + /// Creates a new [`MapFutureLayer`] layer. + pub const fn new(f: F) -> Self { + Self { f } + } +} + +impl<S, F> Layer<S> for MapFutureLayer<F> +where + F: Clone, +{ + type Service = MapFuture<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + MapFuture::new(inner, self.f.clone()) + } +} + +impl<F> fmt::Debug for MapFutureLayer<F> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapFutureLayer") + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} diff --git a/vendor/tower/src/util/map_request.rs b/vendor/tower/src/util/map_request.rs new file mode 100644 index 00000000..62f2de3c --- /dev/null +++ b/vendor/tower/src/util/map_request.rs @@ -0,0 +1,90 @@ +use std::fmt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`MapRequest`] combinator. +/// +/// [`MapRequest`]: crate::util::ServiceExt::map_request +#[derive(Clone)] +pub struct MapRequest<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for MapRequest<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapRequest") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +impl<S, F> MapRequest<S, F> { + /// Creates a new [`MapRequest`] service. + pub const fn new(inner: S, f: F) -> Self { + MapRequest { inner, f } + } + + /// Returns a new [`Layer`] that produces [`MapRequest`] services. + /// + /// This is a convenience function that simply calls [`MapRequestLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> MapRequestLayer<F> { + MapRequestLayer { f } + } +} + +impl<S, F, R1, R2> Service<R1> for MapRequest<S, F> +where + S: Service<R2>, + F: FnMut(R1) -> R2, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, request: R1) -> S::Future { + self.inner.call((self.f)(request)) + } +} + +/// A [`Layer`] that produces [`MapRequest`] services. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Clone, Debug)] +pub struct MapRequestLayer<F> { + f: F, +} + +impl<F> MapRequestLayer<F> { + /// Creates a new [`MapRequestLayer`]. + pub const fn new(f: F) -> Self { + MapRequestLayer { f } + } +} + +impl<S, F> Layer<S> for MapRequestLayer<F> +where + F: Clone, +{ + type Service = MapRequest<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + MapRequest { + f: self.f.clone(), + inner, + } + } +} diff --git a/vendor/tower/src/util/map_response.rs b/vendor/tower/src/util/map_response.rs new file mode 100644 index 00000000..8edac10a --- /dev/null +++ b/vendor/tower/src/util/map_response.rs @@ -0,0 +1,98 @@ +use futures_util::{future::MapOk, TryFutureExt}; +use std::fmt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`map_response`] combinator. +/// +/// [`map_response`]: crate::util::ServiceExt::map_response +#[derive(Clone)] +pub struct MapResponse<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for MapResponse<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapResponse") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +/// A [`Layer`] that produces a [`MapResponse`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct MapResponseLayer<F> { + f: F, +} + +opaque_future! { + /// Response future from [`MapResponse`] services. + /// + /// [`MapResponse`]: crate::util::MapResponse + pub type MapResponseFuture<F, N> = MapOk<F, N>; +} + +impl<S, F> MapResponse<S, F> { + /// Creates a new `MapResponse` service. + pub const fn new(inner: S, f: F) -> Self { + MapResponse { f, inner } + } + + /// Returns a new [`Layer`] that produces [`MapResponse`] services. + /// + /// This is a convenience function that simply calls [`MapResponseLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> MapResponseLayer<F> { + MapResponseLayer { f } + } +} + +impl<S, F, Request, Response> Service<Request> for MapResponse<S, F> +where + S: Service<Request>, + F: FnOnce(S::Response) -> Response + Clone, +{ + type Response = Response; + type Error = S::Error; + type Future = MapResponseFuture<S::Future, F>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + MapResponseFuture::new(self.inner.call(request).map_ok(self.f.clone())) + } +} + +impl<F> MapResponseLayer<F> { + /// Creates a new [`MapResponseLayer`] layer. + pub const fn new(f: F) -> Self { + MapResponseLayer { f } + } +} + +impl<S, F> Layer<S> for MapResponseLayer<F> +where + F: Clone, +{ + type Service = MapResponse<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + MapResponse { + f: self.f.clone(), + inner, + } + } +} diff --git a/vendor/tower/src/util/map_result.rs b/vendor/tower/src/util/map_result.rs new file mode 100644 index 00000000..5a96af2d --- /dev/null +++ b/vendor/tower/src/util/map_result.rs @@ -0,0 +1,99 @@ +use futures_util::{future::Map, FutureExt}; +use std::fmt; +use std::task::{Context, Poll}; +use tower_layer::Layer; +use tower_service::Service; + +/// Service returned by the [`map_result`] combinator. +/// +/// [`map_result`]: crate::util::ServiceExt::map_result +#[derive(Clone)] +pub struct MapResult<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for MapResult<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapResult") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +/// A [`Layer`] that produces a [`MapResult`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct MapResultLayer<F> { + f: F, +} + +opaque_future! { + /// Response future from [`MapResult`] services. + /// + /// [`MapResult`]: crate::util::MapResult + pub type MapResultFuture<F, N> = Map<F, N>; +} + +impl<S, F> MapResult<S, F> { + /// Creates a new [`MapResult`] service. + pub const fn new(inner: S, f: F) -> Self { + MapResult { f, inner } + } + + /// Returns a new [`Layer`] that produces [`MapResult`] services. + /// + /// This is a convenience function that simply calls [`MapResultLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> MapResultLayer<F> { + MapResultLayer { f } + } +} + +impl<S, F, Request, Response, Error> Service<Request> for MapResult<S, F> +where + S: Service<Request>, + Error: From<S::Error>, + F: FnOnce(Result<S::Response, S::Error>) -> Result<Response, Error> + Clone, +{ + type Response = Response; + type Error = Error; + type Future = MapResultFuture<S::Future, F>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + MapResultFuture::new(self.inner.call(request).map(self.f.clone())) + } +} + +impl<F> MapResultLayer<F> { + /// Creates a new [`MapResultLayer`] layer. + pub const fn new(f: F) -> Self { + MapResultLayer { f } + } +} + +impl<S, F> Layer<S> for MapResultLayer<F> +where + F: Clone, +{ + type Service = MapResult<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + MapResult { + f: self.f.clone(), + inner, + } + } +} diff --git a/vendor/tower/src/util/mod.rs b/vendor/tower/src/util/mod.rs new file mode 100644 index 00000000..4c56de81 --- /dev/null +++ b/vendor/tower/src/util/mod.rs @@ -0,0 +1,1073 @@ +//! Various utility types and functions that are generally used with Tower. + +mod and_then; +mod boxed; +mod boxed_clone; +mod boxed_clone_sync; +mod call_all; +mod either; + +mod future_service; +mod map_err; +mod map_request; +mod map_response; +mod map_result; + +mod map_future; +mod oneshot; +mod optional; +mod ready; +mod service_fn; +mod then; + +pub mod rng; + +pub use self::{ + and_then::{AndThen, AndThenLayer}, + boxed::{ + BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService, + }, + boxed_clone::BoxCloneService, + boxed_clone_sync::BoxCloneSyncService, + either::Either, + future_service::{future_service, FutureService}, + map_err::{MapErr, MapErrLayer}, + map_future::{MapFuture, MapFutureLayer}, + map_request::{MapRequest, MapRequestLayer}, + map_response::{MapResponse, MapResponseLayer}, + map_result::{MapResult, MapResultLayer}, + oneshot::Oneshot, + optional::Optional, + ready::{Ready, ReadyOneshot}, + service_fn::{service_fn, ServiceFn}, + then::{Then, ThenLayer}, +}; + +pub use self::call_all::{CallAll, CallAllUnordered}; +use std::future::Future; + +use crate::layer::util::Identity; + +pub mod error { + //! Error types + + pub use super::optional::error as optional; +} + +pub mod future { + //! Future types + + pub use super::and_then::AndThenFuture; + pub use super::either::EitherResponseFuture; + pub use super::map_err::MapErrFuture; + pub use super::map_response::MapResponseFuture; + pub use super::map_result::MapResultFuture; + pub use super::optional::future as optional; + pub use super::then::ThenFuture; +} + +/// An extension trait for `Service`s that provides a variety of convenient +/// adapters +pub trait ServiceExt<Request>: tower_service::Service<Request> { + /// Yields a mutable reference to the service when it is ready to accept a request. + fn ready(&mut self) -> Ready<'_, Self, Request> + where + Self: Sized, + { + Ready::new(self) + } + + /// Yields the service when it is ready to accept a request. + fn ready_oneshot(self) -> ReadyOneshot<Self, Request> + where + Self: Sized, + { + ReadyOneshot::new(self) + } + + /// Consume this `Service`, calling it with the provided request once it is ready. + fn oneshot(self, req: Request) -> Oneshot<Self, Request> + where + Self: Sized, + { + Oneshot::new(self, req) + } + + /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses. + /// + /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item = + /// Response>`][stream]. See the documentation for [`CallAll`] for + /// details. + /// + /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + fn call_all<S>(self, reqs: S) -> CallAll<Self, S> + where + Self: Sized, + S: futures_core::Stream<Item = Request>, + { + CallAll::new(self, reqs) + } + + /// Executes a new future after this service's future resolves. This does + /// not alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. You can use this method to chain along a computation once the + /// service's response has been resolved. + /// + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Record; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready<Result<Record, u8>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<Record, _> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the response into a new response + /// let mut new_service = service.and_then(|record: Record| async move { + /// let name = record.name; + /// avatar_lookup(name).await + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let avatar = new_service.call(id).await.unwrap(); + /// # }; + /// # } + /// ``` + fn and_then<F>(self, f: F) -> AndThen<Self, F> + where + Self: Sized, + F: Clone, + { + AndThen::new(self, f) + } + + /// Maps this service's response value to a different value. This does not + /// alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It is similar to the [`Result::map`] + /// method. You can use this method to chain along a computation once the + /// service's response has been resolved. + /// + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Record; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready<Result<Record, u8>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<Record, _> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the response into a new response + /// let mut new_service = service.map_response(|record| record.name); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await?; + /// # Ok::<(), u8>(()) + /// # }; + /// # } + /// ``` + fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F> + where + Self: Sized, + F: FnOnce(Self::Response) -> Response + Clone, + { + MapResponse::new(self, f) + } + + /// Maps this service's error value to a different value. This does not + /// alter the behaviour of the [`poll_ready`] method. + /// + /// This method can be used to change the [`Error`] type of the service + /// into a different type. It is similar to the [`Result::map_err`] method. + /// + /// [`Error`]: crate::Service::Error + /// [`poll_ready`]: crate::Service::poll_ready + /// + /// # Example + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Error { + /// # pub code: u32, + /// # pub message: String + /// # } + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = String; + /// # type Error = Error; + /// # type Future = futures_util::future::Ready<Result<String, Error>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<_, Error> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the error to a new error + /// let mut new_service = service.map_err(|err| err.code); + /// + /// // Call the new service + /// let id = 13; + /// let code = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await + /// .unwrap_err(); + /// # Ok::<(), u32>(()) + /// # }; + /// # } + /// ``` + fn map_err<F, Error>(self, f: F) -> MapErr<Self, F> + where + Self: Sized, + F: FnOnce(Self::Error) -> Error + Clone, + { + MapErr::new(self, f) + } + + /// Maps this service's result type (`Result<Self::Response, Self::Error>`) + /// to a different value, regardless of whether the future succeeds or + /// fails. + /// + /// This is similar to the [`map_response`] and [`map_err`] combinators, + /// except that the *same* function is invoked when the service's future + /// completes, whether it completes successfully or fails. This function + /// takes the [`Result`] returned by the service's future, and returns a + /// [`Result`]. + /// + /// Like the standard library's [`Result::and_then`], this method can be + /// used to implement control flow based on `Result` values. For example, it + /// may be used to implement error recovery, by turning some [`Err`] + /// responses from the service into [`Ok`] responses. Similarly, some + /// successful responses from the service could be rejected, by returning an + /// [`Err`] conditionally, depending on the value inside the [`Ok`.] Finally, + /// this method can also be used to implement behaviors that must run when a + /// service's future completes, regardless of whether it succeeded or failed. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It can also be used to change the [`Error`] type + /// of the service. However, because the [`map_result`] function is not applied + /// to the errors returned by the service's [`poll_ready`] method, it must + /// be possible to convert the service's [`Error`] type into the error type + /// returned by the [`map_result`] function. This is trivial when the function + /// returns the same error type as the service, but in other cases, it can + /// be useful to use [`BoxError`] to erase differing error types. + /// + /// # Examples + /// + /// Recovering from certain errors: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # #[derive(Debug)] + /// # enum DbError { + /// # Parse(std::num::ParseIntError), + /// # NoRecordsFound, + /// # } + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Vec<Record>; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(vec![Record { name: "Jack".into(), age: 32 }])) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<Vec<Record>, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the database returns no records for the query, we just want an empty `Vec`. + /// let mut new_service = service.map_result(|result| match result { + /// // If the error indicates that no records matched the query, return an empty + /// // `Vec` instead. + /// Err(DbError::NoRecordsFound) => Ok(Vec::new()), + /// // Propagate all other responses (`Ok` and `Err`) unchanged + /// x => x, + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let name = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await?; + /// # Ok::<(), DbError>(()) + /// # }; + /// # } + /// ``` + /// + /// Rejecting some `Ok` responses: + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # struct Record { + /// # pub name: String, + /// # pub age: u16 + /// # } + /// # type DbError = String; + /// # type AppError = String; + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(Record { name: "Jack".into(), age: 32 })) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// use tower::BoxError; + /// + /// // A service returning Result<Record, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // If the user is zero years old, return an error. + /// let mut new_service = service.map_result(|result| { + /// let record = result?; + /// + /// if record.age == 0 { + /// // Users must have been born to use our app! + /// let app_error = AppError::from("users cannot be 0 years old!"); + /// + /// // Box the error to erase its type (as it can be an `AppError` + /// // *or* the inner service's `DbError`). + /// return Err(BoxError::from(app_error)); + /// } + /// + /// // Otherwise, return the record. + /// Ok(record) + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let record = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await?; + /// # Ok::<(), BoxError>(()) + /// # }; + /// # } + /// ``` + /// + /// Performing an action that must be run for both successes and failures: + /// + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = String; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready<Result<String, u8>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service returning Result<Record, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Print a message whenever a query completes. + /// let mut new_service = service.map_result(|result| { + /// println!("query completed; success={}", result.is_ok()); + /// result + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let response = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await; + /// # response + /// # }; + /// # } + /// ``` + /// + /// [`map_response`]: ServiceExt::map_response + /// [`map_err`]: ServiceExt::map_err + /// [`map_result`]: ServiceExt::map_result + /// [`Error`]: crate::Service::Error + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// [`BoxError`]: crate::BoxError + fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F> + where + Self: Sized, + Error: From<Self::Error>, + F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone, + { + MapResult::new(self, f) + } + + /// Composes a function *in front of* the service. + /// + /// This adapter produces a new service that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// # Example + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # impl Service<String> for DatabaseService { + /// # type Response = String; + /// # type Error = u8; + /// # type Future = futures_util::future::Ready<Result<String, u8>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: String) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service taking a String as a request + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Map the request to a new request + /// let mut new_service = service.map_request(|id: u32| id.to_string()); + /// + /// // Call the new service + /// let id = 13; + /// let response = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await; + /// # response + /// # }; + /// # } + /// ``` + fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F> + where + Self: Sized, + F: FnMut(NewRequest) -> Request, + { + MapRequest::new(self, f) + } + + /// Composes this service with a [`Filter`] that conditionally accepts or + /// rejects requests based on a [predicate]. + /// + /// This adapter produces a new service that passes each value through the + /// given function `predicate` before sending it to `self`. + /// + /// # Example + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # #[derive(Debug)] enum DbError { + /// # Parse(std::num::ParseIntError) + /// # } + /// # + /// # impl std::fmt::Display for DbError { + /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) } + /// # } + /// # impl std::error::Error for DbError {} + /// # impl Service<u32> for DatabaseService { + /// # type Response = String; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<String, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service taking a u32 as a request and returning Result<_, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // Fallibly map the request to a new request + /// let mut new_service = service + /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse)); + /// + /// // Call the new service + /// let id = "13"; + /// let response = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await; + /// # response + /// # }; + /// # } + /// ``` + /// + /// [`Filter`]: crate::filter::Filter + /// [predicate]: crate::filter::Predicate + #[cfg(feature = "filter")] + fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F> + where + Self: Sized, + F: crate::filter::Predicate<NewRequest>, + { + crate::filter::Filter::new(self, filter) + } + + /// Composes this service with an [`AsyncFilter`] that conditionally accepts or + /// rejects requests based on an [async predicate]. + /// + /// This adapter produces a new service that passes each value through the + /// given function `predicate` before sending it to `self`. + /// + /// # Example + /// ``` + /// # use std::convert::TryFrom; + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # #[derive(Clone)] struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # #[derive(Debug)] + /// # enum DbError { + /// # Rejected + /// # } + /// # impl std::fmt::Display for DbError { + /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) } + /// # } + /// # impl std::error::Error for DbError {} + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = String; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<String, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(String::new())) + /// # } + /// # } + /// # + /// # fn main() { + /// # async { + /// // A service taking a u32 as a request and returning Result<_, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// /// Returns `true` if we should query the database for an ID. + /// async fn should_query(id: u32) -> bool { + /// // ... + /// # true + /// } + /// + /// // Filter requests based on `should_query`. + /// let mut new_service = service + /// .filter_async(|id: u32| async move { + /// if should_query(id).await { + /// return Ok(id); + /// } + /// + /// Err(DbError::Rejected) + /// }); + /// + /// // Call the new service + /// let id = 13; + /// # let id: u32 = id; + /// let response = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await; + /// # response + /// # }; + /// # } + /// ``` + /// + /// [`AsyncFilter`]: crate::filter::AsyncFilter + /// [asynchronous predicate]: crate::filter::AsyncPredicate + #[cfg(feature = "filter")] + fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F> + where + Self: Sized, + F: crate::filter::AsyncPredicate<NewRequest>, + { + crate::filter::AsyncFilter::new(self, filter) + } + + /// Composes an asynchronous function *after* this service. + /// + /// This takes a function or closure returning a future, and returns a new + /// `Service` that chains that function after this service's [`Future`]. The + /// new `Service`'s future will consist of this service's future, followed + /// by the future returned by calling the chained function with the future's + /// [`Output`] type. The chained function is called regardless of whether + /// this service's future completes with a successful response or with an + /// error. + /// + /// This method can be thought of as an equivalent to the [`futures` + /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that + /// _return_ futures, rather than on an individual future. Similarly to that + /// combinator, [`ServiceExt::then`] can be used to implement asynchronous + /// error recovery, by calling some asynchronous function with errors + /// returned by this service. Alternatively, it may also be used to call a + /// fallible async function with the successful response of this service. + /// + /// This method can be used to change the [`Response`] type of the service + /// into a different type. It can also be used to change the [`Error`] type + /// of the service. However, because the `then` function is not applied + /// to the errors returned by the service's [`poll_ready`] method, it must + /// be possible to convert the service's [`Error`] type into the error type + /// returned by the `then` future. This is trivial when the function + /// returns the same error type as the service, but in other cases, it can + /// be useful to use [`BoxError`] to erase differing error types. + /// + /// # Examples + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # type Record = (); + /// # type DbError = (); + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(())) + /// # } + /// # } + /// # + /// # fn main() { + /// // A service returning Result<Record, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// + /// // An async function that attempts to recover from errors returned by the + /// // database. + /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> { + /// // ... + /// # Ok(()) + /// } + /// # async { + /// + /// // If the database service returns an error, attempt to recover by + /// // calling `recover_from_error`. Otherwise, return the successful response. + /// let mut new_service = service.then(|result| async move { + /// match result { + /// Ok(record) => Ok(record), + /// Err(e) => recover_from_error(e).await, + /// } + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let record = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await?; + /// # Ok::<(), DbError>(()) + /// # }; + /// # } + /// ``` + /// + /// [`Future`]: crate::Service::Future + /// [`Output`]: std::future::Future::Output + /// [`futures` crate]: https://docs.rs/futures + /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then + /// [`Error`]: crate::Service::Error + /// [`Response`]: crate::Service::Response + /// [`poll_ready`]: crate::Service::poll_ready + /// [`BoxError`]: crate::BoxError + fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F> + where + Self: Sized, + Error: From<Self::Error>, + F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, + Fut: Future<Output = Result<Response, Error>>, + { + Then::new(self, f) + } + + /// Composes a function that transforms futures produced by the service. + /// + /// This takes a function or closure returning a future computed from the future returned by + /// the service's [`call`] method, as opposed to the responses produced by the future. + /// + /// # Examples + /// + /// ``` + /// # use std::task::{Poll, Context}; + /// # use tower::{Service, ServiceExt, BoxError}; + /// # + /// # struct DatabaseService; + /// # impl DatabaseService { + /// # fn new(address: &str) -> Self { + /// # DatabaseService + /// # } + /// # } + /// # + /// # type Record = (); + /// # type DbError = crate::BoxError; + /// # + /// # impl Service<u32> for DatabaseService { + /// # type Response = Record; + /// # type Error = DbError; + /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; + /// # + /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + /// # Poll::Ready(Ok(())) + /// # } + /// # + /// # fn call(&mut self, request: u32) -> Self::Future { + /// # futures_util::future::ready(Ok(())) + /// # } + /// # } + /// # + /// # fn main() { + /// use std::time::Duration; + /// use tokio::time::timeout; + /// + /// // A service returning Result<Record, DbError> + /// let service = DatabaseService::new("127.0.0.1:8080"); + /// # async { + /// + /// let mut new_service = service.map_future(|future| async move { + /// let res = timeout(Duration::from_secs(1), future).await?; + /// Ok::<_, BoxError>(res) + /// }); + /// + /// // Call the new service + /// let id = 13; + /// let record = new_service + /// .ready() + /// .await? + /// .call(id) + /// .await?; + /// # Ok::<(), BoxError>(()) + /// # }; + /// # } + /// ``` + /// + /// Note that normally you wouldn't implement timeouts like this and instead use [`Timeout`]. + /// + /// [`call`]: crate::Service::call + /// [`Timeout`]: crate::timeout::Timeout + fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F> + where + Self: Sized, + F: FnMut(Self::Future) -> Fut, + Error: From<Self::Error>, + Fut: Future<Output = Result<Response, Error>>, + { + MapFuture::new(self, f) + } + + /// Convert the service into a [`Service`] + [`Send`] trait object. + /// + /// See [`BoxService`] for more details. + /// + /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method + /// can be used instead, to produce a boxed service which will also + /// implement [`Clone`]. + /// + /// # Example + /// + /// ``` + /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService}; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service = service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// + /// let service: BoxService<Request, Response, BoxError> = service + /// .map_request(|req| { + /// println!("received request"); + /// req + /// }) + /// .map_response(|res| { + /// println!("response produced"); + /// res + /// }) + /// .boxed(); + /// # let service = assert_service(service); + /// # fn assert_service<S, R>(svc: S) -> S + /// # where S: Service<R> { svc } + /// ``` + /// + /// [`Service`]: crate::Service + /// [`boxed_clone`]: Self::boxed_clone + fn boxed(self) -> BoxService<Request, Self::Response, Self::Error> + where + Self: Sized + Send + 'static, + Self::Future: Send + 'static, + { + BoxService::new(self) + } + + /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object. + /// + /// This is similar to the [`boxed`] method, but it requires that `Self` implement + /// [`Clone`], and the returned boxed service implements [`Clone`]. + /// See [`BoxCloneService`] for more details. + /// + /// # Example + /// + /// ``` + /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService}; + /// # + /// # struct Request; + /// # struct Response; + /// # impl Response { + /// # fn new() -> Self { Self } + /// # } + /// + /// let service = service_fn(|req: Request| async { + /// Ok::<_, BoxError>(Response::new()) + /// }); + /// + /// let service: BoxCloneService<Request, Response, BoxError> = service + /// .map_request(|req| { + /// println!("received request"); + /// req + /// }) + /// .map_response(|res| { + /// println!("response produced"); + /// res + /// }) + /// .boxed_clone(); + /// + /// // The boxed service can still be cloned. + /// service.clone(); + /// # let service = assert_service(service); + /// # fn assert_service<S, R>(svc: S) -> S + /// # where S: Service<R> { svc } + /// ``` + /// + /// [`Service`]: crate::Service + /// [`boxed`]: Self::boxed + fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error> + where + Self: Clone + Sized + Send + 'static, + Self::Future: Send + 'static, + { + BoxCloneService::new(self) + } +} + +impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {} + +/// Convert an `Option<Layer>` into a [`Layer`]. +/// +/// ``` +/// # use std::time::Duration; +/// # use tower::Service; +/// # use tower::builder::ServiceBuilder; +/// use tower::util::option_layer; +/// # use tower::timeout::TimeoutLayer; +/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { +/// # let timeout = Some(Duration::new(10, 0)); +/// // Layer to apply a timeout if configured +/// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new)); +/// +/// ServiceBuilder::new() +/// .layer(maybe_timeout) +/// .service(svc); +/// # } +/// ``` +/// +/// [`Layer`]: crate::layer::Layer +pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> { + if let Some(layer) = layer { + Either::Left(layer) + } else { + Either::Right(Identity::new()) + } +} diff --git a/vendor/tower/src/util/oneshot.rs b/vendor/tower/src/util/oneshot.rs new file mode 100644 index 00000000..114b2f82 --- /dev/null +++ b/vendor/tower/src/util/oneshot.rs @@ -0,0 +1,105 @@ +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +pin_project! { + /// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`] + /// is ready, and then calling [`Service::call`] with the request, and + /// waiting for that [`Future`]. + #[derive(Debug)] + pub struct Oneshot<S: Service<Req>, Req> { + #[pin] + state: State<S, Req>, + } +} + +pin_project! { + #[project = StateProj] + enum State<S: Service<Req>, Req> { + NotReady { + svc: S, + req: Option<Req>, + }, + Called { + #[pin] + fut: S::Future, + }, + Done, + } +} + +impl<S: Service<Req>, Req> State<S, Req> { + const fn not_ready(svc: S, req: Option<Req>) -> Self { + Self::NotReady { svc, req } + } + + const fn called(fut: S::Future) -> Self { + Self::Called { fut } + } +} + +impl<S, Req> fmt::Debug for State<S, Req> +where + S: Service<Req> + fmt::Debug, + Req: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::NotReady { + svc, + req: Some(req), + } => f + .debug_tuple("State::NotReady") + .field(svc) + .field(req) + .finish(), + State::NotReady { req: None, .. } => unreachable!(), + State::Called { .. } => f.debug_tuple("State::Called").field(&"S::Future").finish(), + State::Done => f.debug_tuple("State::Done").finish(), + } + } +} + +impl<S, Req> Oneshot<S, Req> +where + S: Service<Req>, +{ + #[allow(missing_docs)] + pub const fn new(svc: S, req: Req) -> Self { + Oneshot { + state: State::not_ready(svc, Some(req)), + } + } +} + +impl<S, Req> Future for Oneshot<S, Req> +where + S: Service<Req>, +{ + type Output = Result<S::Response, S::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + loop { + match this.state.as_mut().project() { + StateProj::NotReady { svc, req } => { + let _ = ready!(svc.poll_ready(cx))?; + let f = svc.call(req.take().expect("already called")); + this.state.set(State::called(f)); + } + StateProj::Called { fut } => { + let res = ready!(fut.poll(cx))?; + this.state.set(State::Done); + return Poll::Ready(Ok(res)); + } + StateProj::Done => panic!("polled after complete"), + } + } + } +} diff --git a/vendor/tower/src/util/optional/error.rs b/vendor/tower/src/util/optional/error.rs new file mode 100644 index 00000000..78061335 --- /dev/null +++ b/vendor/tower/src/util/optional/error.rs @@ -0,0 +1,21 @@ +use std::{error, fmt}; + +/// Error returned if the inner [`Service`] has not been set. +/// +/// [`Service`]: crate::Service +#[derive(Debug)] +pub struct None(()); + +impl None { + pub(crate) fn new() -> None { + None(()) + } +} + +impl fmt::Display for None { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "None") + } +} + +impl error::Error for None {} diff --git a/vendor/tower/src/util/optional/future.rs b/vendor/tower/src/util/optional/future.rs new file mode 100644 index 00000000..7d289b7b --- /dev/null +++ b/vendor/tower/src/util/optional/future.rs @@ -0,0 +1,40 @@ +use super::error; +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Response future returned by [`Optional`]. + /// + /// [`Optional`]: crate::util::Optional + #[derive(Debug)] + pub struct ResponseFuture<T> { + #[pin] + inner: Option<T>, + } +} + +impl<T> ResponseFuture<T> { + pub(crate) fn new(inner: Option<T>) -> ResponseFuture<T> { + ResponseFuture { inner } + } +} + +impl<F, T, E> Future for ResponseFuture<F> +where + F: Future<Output = Result<T, E>>, + E: Into<crate::BoxError>, +{ + type Output = Result<T, crate::BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.project().inner.as_pin_mut() { + Some(inner) => Poll::Ready(Ok(ready!(inner.poll(cx)).map_err(Into::into)?)), + None => Poll::Ready(Err(error::None::new().into())), + } + } +} diff --git a/vendor/tower/src/util/optional/mod.rs b/vendor/tower/src/util/optional/mod.rs new file mode 100644 index 00000000..4d020709 --- /dev/null +++ b/vendor/tower/src/util/optional/mod.rs @@ -0,0 +1,59 @@ +//! Contains [`Optional`] and related types and functions. +//! +//! See [`Optional`] documentation for more details. + +/// Error types for [`Optional`]. +pub mod error; +/// Future types for [`Optional`]. +pub mod future; + +use self::future::ResponseFuture; +use std::task::{Context, Poll}; +use tower_service::Service; + +/// Optionally forwards requests to an inner service. +/// +/// If the inner service is [`None`], [`optional::None`] is returned as the response. +/// +/// [`optional::None`]: crate::util::error::optional::None +#[derive(Debug)] +pub struct Optional<T> { + inner: Option<T>, +} + +impl<T> Optional<T> { + /// Create a new [`Optional`]. + pub const fn new<Request>(inner: Option<T>) -> Optional<T> + where + T: Service<Request>, + T::Error: Into<crate::BoxError>, + { + Optional { inner } + } +} + +impl<T, Request> Service<Request> for Optional<T> +where + T: Service<Request>, + T::Error: Into<crate::BoxError>, +{ + type Response = T::Response; + type Error = crate::BoxError; + type Future = ResponseFuture<T::Future>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + match self.inner { + Some(ref mut inner) => match inner.poll_ready(cx) { + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + Poll::Pending => Poll::Pending, + }, + // None services are always ready + None => Poll::Ready(Ok(())), + } + } + + fn call(&mut self, request: Request) -> Self::Future { + let inner = self.inner.as_mut().map(|i| i.call(request)); + ResponseFuture::new(inner) + } +} diff --git a/vendor/tower/src/util/ready.rs b/vendor/tower/src/util/ready.rs new file mode 100644 index 00000000..750db872 --- /dev/null +++ b/vendor/tower/src/util/ready.rs @@ -0,0 +1,103 @@ +use std::{fmt, marker::PhantomData}; + +use futures_core::ready; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower_service::Service; + +/// A [`Future`] that yields the service when it is ready to accept a request. +/// +/// [`ReadyOneshot`] values are produced by [`ServiceExt::ready_oneshot`]. +/// +/// [`ServiceExt::ready_oneshot`]: crate::util::ServiceExt::ready_oneshot +pub struct ReadyOneshot<T, Request> { + inner: Option<T>, + _p: PhantomData<fn() -> Request>, +} + +// Safety: This is safe because `Services`'s are always `Unpin`. +impl<T, Request> Unpin for ReadyOneshot<T, Request> {} + +impl<T, Request> ReadyOneshot<T, Request> +where + T: Service<Request>, +{ + #[allow(missing_docs)] + pub const fn new(service: T) -> Self { + Self { + inner: Some(service), + _p: PhantomData, + } + } +} + +impl<T, Request> Future for ReadyOneshot<T, Request> +where + T: Service<Request>, +{ + type Output = Result<T, T::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + ready!(self + .inner + .as_mut() + .expect("poll after Poll::Ready") + .poll_ready(cx))?; + + Poll::Ready(Ok(self.inner.take().expect("poll after Poll::Ready"))) + } +} + +impl<T, Request> fmt::Debug for ReadyOneshot<T, Request> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ReadyOneshot") + .field("inner", &self.inner) + .finish() + } +} + +/// A future that yields a mutable reference to the service when it is ready to accept a request. +/// +/// [`Ready`] values are produced by [`ServiceExt::ready`]. +/// +/// [`ServiceExt::ready`]: crate::util::ServiceExt::ready +pub struct Ready<'a, T, Request>(ReadyOneshot<&'a mut T, Request>); + +// Safety: This is safe for the same reason that the impl for ReadyOneshot is safe. +impl<'a, T, Request> Unpin for Ready<'a, T, Request> {} + +impl<'a, T, Request> Ready<'a, T, Request> +where + T: Service<Request>, +{ + #[allow(missing_docs)] + pub fn new(service: &'a mut T) -> Self { + Self(ReadyOneshot::new(service)) + } +} + +impl<'a, T, Request> Future for Ready<'a, T, Request> +where + T: Service<Request>, +{ + type Output = Result<&'a mut T, T::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.0).poll(cx) + } +} + +impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("Ready").field(&self.0).finish() + } +} diff --git a/vendor/tower/src/util/rng.rs b/vendor/tower/src/util/rng.rs new file mode 100644 index 00000000..5b2f9fce --- /dev/null +++ b/vendor/tower/src/util/rng.rs @@ -0,0 +1,181 @@ +//! [PRNG] utilities for tower middleware. +//! +//! This module provides a generic [`Rng`] trait and a [`HasherRng`] that +//! implements the trait based on [`RandomState`] or any other [`Hasher`]. +//! +//! These utilities replace tower's internal usage of `rand` with these smaller, +//! more lightweight methods. Most of the implementations are extracted from +//! their corresponding `rand` implementations. +//! +//! [PRNG]: https://en.wikipedia.org/wiki/Pseudorandom_number_generator + +use std::{ + collections::hash_map::RandomState, + hash::{BuildHasher, Hasher}, + ops::Range, +}; + +/// A simple [PRNG] trait for use within tower middleware. +/// +/// [PRNG]: https://en.wikipedia.org/wiki/Pseudorandom_number_generator +pub trait Rng { + /// Generate a random [`u64`]. + fn next_u64(&mut self) -> u64; + + /// Generate a random [`f64`] between `[0, 1)`. + fn next_f64(&mut self) -> f64 { + // Borrowed from: + // https://github.com/rust-random/rand/blob/master/src/distributions/float.rs#L106 + let float_size = std::mem::size_of::<f64>() as u32 * 8; + let precision = 52 + 1; + let scale = 1.0 / ((1u64 << precision) as f64); + + let value = self.next_u64(); + let value = value >> (float_size - precision); + + scale * value as f64 + } + + /// Randomly pick a value within the range. + /// + /// # Panic + /// + /// - If start < end this will panic in debug mode. + fn next_range(&mut self, range: Range<u64>) -> u64 { + debug_assert!( + range.start < range.end, + "The range start must be smaller than the end" + ); + let start = range.start; + let end = range.end; + + let range = end - start; + + let n = self.next_u64(); + + (n % range) + start + } +} + +impl<R: Rng + ?Sized> Rng for Box<R> { + fn next_u64(&mut self) -> u64 { + (**self).next_u64() + } +} + +/// A [`Rng`] implementation that uses a [`Hasher`] to generate the random +/// values. The implementation uses an internal counter to pass to the hasher +/// for each iteration of [`Rng::next_u64`]. +/// +/// # Default +/// +/// This hasher has a default type of [`RandomState`] which just uses the +/// libstd method of getting a random u64. +#[derive(Clone, Debug)] +pub struct HasherRng<H = RandomState> { + hasher: H, + counter: u64, +} + +impl HasherRng { + /// Create a new default [`HasherRng`]. + pub fn new() -> Self { + HasherRng::default() + } +} + +impl Default for HasherRng { + fn default() -> Self { + HasherRng::with_hasher(RandomState::default()) + } +} + +impl<H> HasherRng<H> { + /// Create a new [`HasherRng`] with the provided hasher. + pub fn with_hasher(hasher: H) -> Self { + HasherRng { hasher, counter: 0 } + } +} + +impl<H> Rng for HasherRng<H> +where + H: BuildHasher, +{ + fn next_u64(&mut self) -> u64 { + let mut hasher = self.hasher.build_hasher(); + hasher.write_u64(self.counter); + self.counter = self.counter.wrapping_add(1); + hasher.finish() + } +} + +/// A sampler modified from the Rand implementation for use internally for the balance middleware. +/// +/// It's an implementation of Floyd's combination algorithm with amount fixed at 2. This uses no allocated +/// memory and finishes in constant time (only 2 random calls). +/// +/// ref: This was borrowed and modified from the following Rand implementation +/// https://github.com/rust-random/rand/blob/b73640705d6714509f8ceccc49e8df996fa19f51/src/seq/index.rs#L375-L411 +#[cfg(feature = "balance")] +pub(crate) fn sample_floyd2<R: Rng>(rng: &mut R, length: u64) -> [u64; 2] { + debug_assert!(2 <= length); + let aidx = rng.next_range(0..length - 1); + let bidx = rng.next_range(0..length); + let aidx = if aidx == bidx { length - 1 } else { aidx }; + [aidx, bidx] +} + +#[cfg(test)] +mod tests { + use super::*; + use quickcheck::*; + + quickcheck! { + fn next_f64(counter: u64) -> TestResult { + let mut rng = HasherRng::default(); + rng.counter = counter; + let n = rng.next_f64(); + + TestResult::from_bool(n < 1.0 && n >= 0.0) + } + + fn next_range(counter: u64, range: Range<u64>) -> TestResult { + if range.start >= range.end{ + return TestResult::discard(); + } + + let mut rng = HasherRng::default(); + rng.counter = counter; + + let n = rng.next_range(range.clone()); + + TestResult::from_bool(n >= range.start && (n < range.end || range.start == range.end)) + } + + fn sample_floyd2(counter: u64, length: u64) -> TestResult { + if length < 2 || length > 256 { + return TestResult::discard(); + } + + let mut rng = HasherRng::default(); + rng.counter = counter; + + let [a, b] = super::sample_floyd2(&mut rng, length); + + if a >= length || b >= length || a == b { + return TestResult::failed(); + } + + TestResult::passed() + } + } + + #[test] + fn sample_inplace_boundaries() { + let mut r = HasherRng::default(); + match super::sample_floyd2(&mut r, 2) { + [0, 1] | [1, 0] => (), + array => panic!("unexpected inplace boundaries: {:?}", array), + } + } +} diff --git a/vendor/tower/src/util/service_fn.rs b/vendor/tower/src/util/service_fn.rs new file mode 100644 index 00000000..d6e6be87 --- /dev/null +++ b/vendor/tower/src/util/service_fn.rs @@ -0,0 +1,82 @@ +use std::fmt; +use std::future::Future; +use std::task::{Context, Poll}; +use tower_service::Service; + +/// Returns a new [`ServiceFn`] with the given closure. +/// +/// This lets you build a [`Service`] from an async function that returns a [`Result`]. +/// +/// # Example +/// +/// ``` +/// use tower::{service_fn, Service, ServiceExt, BoxError}; +/// # struct Request; +/// # impl Request { +/// # fn new() -> Self { Self } +/// # } +/// # struct Response(&'static str); +/// # impl Response { +/// # fn new(body: &'static str) -> Self { +/// # Self(body) +/// # } +/// # fn into_body(self) -> &'static str { self.0 } +/// # } +/// +/// # #[tokio::main] +/// # async fn main() -> Result<(), BoxError> { +/// async fn handle(request: Request) -> Result<Response, BoxError> { +/// let response = Response::new("Hello, World!"); +/// Ok(response) +/// } +/// +/// let mut service = service_fn(handle); +/// +/// let response = service +/// .ready() +/// .await? +/// .call(Request::new()) +/// .await?; +/// +/// assert_eq!("Hello, World!", response.into_body()); +/// # +/// # Ok(()) +/// # } +/// ``` +pub fn service_fn<T>(f: T) -> ServiceFn<T> { + ServiceFn { f } +} + +/// A [`Service`] implemented by a closure. +/// +/// See [`service_fn`] for more details. +#[derive(Copy, Clone)] +pub struct ServiceFn<T> { + f: T, +} + +impl<T> fmt::Debug for ServiceFn<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ServiceFn") + .field("f", &format_args!("{}", std::any::type_name::<T>())) + .finish() + } +} + +impl<T, F, Request, R, E> Service<Request> for ServiceFn<T> +where + T: FnMut(Request) -> F, + F: Future<Output = Result<R, E>>, +{ + type Response = R; + type Error = E; + type Future = F; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), E>> { + Ok(()).into() + } + + fn call(&mut self, req: Request) -> Self::Future { + (self.f)(req) + } +} diff --git a/vendor/tower/src/util/then.rs b/vendor/tower/src/util/then.rs new file mode 100644 index 00000000..5e934506 --- /dev/null +++ b/vendor/tower/src/util/then.rs @@ -0,0 +1,103 @@ +use futures_util::{future, FutureExt}; +use std::{ + fmt, + future::Future, + task::{Context, Poll}, +}; +use tower_layer::Layer; +use tower_service::Service; + +/// [`Service`] returned by the [`then`] combinator. +/// +/// [`then`]: crate::util::ServiceExt::then +#[derive(Clone)] +pub struct Then<S, F> { + inner: S, + f: F, +} + +impl<S, F> fmt::Debug for Then<S, F> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Then") + .field("inner", &self.inner) + .field("f", &format_args!("{}", std::any::type_name::<F>())) + .finish() + } +} + +/// A [`Layer`] that produces a [`Then`] service. +/// +/// [`Layer`]: tower_layer::Layer +#[derive(Debug, Clone)] +pub struct ThenLayer<F> { + f: F, +} + +impl<S, F> Then<S, F> { + /// Creates a new `Then` service. + pub const fn new(inner: S, f: F) -> Self { + Then { f, inner } + } + + /// Returns a new [`Layer`] that produces [`Then`] services. + /// + /// This is a convenience function that simply calls [`ThenLayer::new`]. + /// + /// [`Layer`]: tower_layer::Layer + pub fn layer(f: F) -> ThenLayer<F> { + ThenLayer { f } + } +} + +opaque_future! { + /// Response future from [`Then`] services. + /// + /// [`Then`]: crate::util::Then + pub type ThenFuture<F1, F2, N> = future::Then<F1, F2, N>; +} + +impl<S, F, Request, Response, Error, Fut> Service<Request> for Then<S, F> +where + S: Service<Request>, + S::Error: Into<Error>, + F: FnOnce(Result<S::Response, S::Error>) -> Fut + Clone, + Fut: Future<Output = Result<Response, Error>>, +{ + type Response = Response; + type Error = Error; + type Future = ThenFuture<S::Future, Fut, F>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + ThenFuture::new(self.inner.call(request).then(self.f.clone())) + } +} + +impl<F> ThenLayer<F> { + /// Creates a new [`ThenLayer`] layer. + pub const fn new(f: F) -> Self { + ThenLayer { f } + } +} + +impl<S, F> Layer<S> for ThenLayer<F> +where + F: Clone, +{ + type Service = Then<S, F>; + + fn layer(&self, inner: S) -> Self::Service { + Then { + f: self.f.clone(), + inner, + } + } +} |
