diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/buffer/future.rs | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/buffer/future.rs')
| -rw-r--r-- | vendor/tower/src/buffer/future.rs | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/vendor/tower/src/buffer/future.rs b/vendor/tower/src/buffer/future.rs new file mode 100644 index 00000000..41178900 --- /dev/null +++ b/vendor/tower/src/buffer/future.rs @@ -0,0 +1,79 @@ +//! Future types for the [`Buffer`] middleware. +//! +//! [`Buffer`]: crate::buffer::Buffer + +use super::{error::Closed, message}; +use futures_core::ready; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Future that completes when the buffered service eventually services the submitted request. + #[derive(Debug)] + pub struct ResponseFuture<T> { + #[pin] + state: ResponseState<T>, + } +} + +pin_project! { + #[project = ResponseStateProj] + #[derive(Debug)] + enum ResponseState<T> { + Failed { + error: Option<crate::BoxError>, + }, + Rx { + #[pin] + rx: message::Rx<T>, + }, + Poll { + #[pin] + fut: T, + }, + } +} + +impl<T> ResponseFuture<T> { + pub(crate) fn new(rx: message::Rx<T>) -> Self { + ResponseFuture { + state: ResponseState::Rx { rx }, + } + } + + pub(crate) fn failed(err: crate::BoxError) -> Self { + ResponseFuture { + state: ResponseState::Failed { error: Some(err) }, + } + } +} + +impl<F, T, E> Future for ResponseFuture<F> +where + F: Future<Output = Result<T, E>>, + E: Into<crate::BoxError>, +{ + type Output = Result<T, crate::BoxError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + ResponseStateProj::Failed { error } => { + return Poll::Ready(Err(error.take().expect("polled after error"))); + } + ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { + Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), + Ok(Err(e)) => return Poll::Ready(Err(e.into())), + Err(_) => return Poll::Ready(Err(Closed::new().into())), + }, + ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), + } + } + } +} |
