diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/src/buffer/worker.rs | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/tower/src/buffer/worker.rs')
| -rw-r--r-- | vendor/tower/src/buffer/worker.rs | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/vendor/tower/src/buffer/worker.rs b/vendor/tower/src/buffer/worker.rs new file mode 100644 index 00000000..7f4416d6 --- /dev/null +++ b/vendor/tower/src/buffer/worker.rs @@ -0,0 +1,227 @@ +use super::{ + error::{Closed, ServiceError}, + message::Message, +}; +use futures_core::ready; +use std::sync::{Arc, Mutex}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; +use tower_service::Service; + +pin_project_lite::pin_project! { + /// Task that handles processing the buffer. This type should not be used + /// directly, instead `Buffer` requires an `Executor` that can accept this task. + /// + /// The struct is `pub` in the private module and the type is *not* re-exported + /// as part of the public API. This is the "sealed" pattern to include "private" + /// types in public traits that are not meant for consumers of the library to + /// implement (only call). + #[derive(Debug)] + pub struct Worker<T, Request> + where + T: Service<Request>, + { + current_message: Option<Message<Request, T::Future>>, + rx: mpsc::Receiver<Message<Request, T::Future>>, + service: T, + finish: bool, + failed: Option<ServiceError>, + handle: Handle, + } +} + +/// Get the error out +#[derive(Debug)] +pub(crate) struct Handle { + inner: Arc<Mutex<Option<ServiceError>>>, +} + +impl<T, Request> Worker<T, Request> +where + T: Service<Request>, + T::Error: Into<crate::BoxError>, +{ + pub(crate) fn new( + service: T, + rx: mpsc::Receiver<Message<Request, T::Future>>, + ) -> (Handle, Worker<T, Request>) { + let handle = Handle { + inner: Arc::new(Mutex::new(None)), + }; + + let worker = Worker { + current_message: None, + finish: false, + failed: None, + rx, + service, + handle: handle.clone(), + }; + + (handle, worker) + } + + /// Return the next queued Message that hasn't been canceled. + /// + /// If a `Message` is returned, the `bool` is true if this is the first time we received this + /// message, and false otherwise (i.e., we tried to forward it to the backing service before). + fn poll_next_msg( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<(Message<Request, T::Future>, bool)>> { + if self.finish { + // We've already received None and are shutting down + return Poll::Ready(None); + } + + tracing::trace!("worker polling for next message"); + if let Some(msg) = self.current_message.take() { + // If the oneshot sender is closed, then the receiver is dropped, + // and nobody cares about the response. If this is the case, we + // should continue to the next request. + if !msg.tx.is_closed() { + tracing::trace!("resuming buffered request"); + return Poll::Ready(Some((msg, false))); + } + + tracing::trace!("dropping cancelled buffered request"); + } + + // Get the next request + while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + if !msg.tx.is_closed() { + tracing::trace!("processing new request"); + return Poll::Ready(Some((msg, true))); + } + // Otherwise, request is canceled, so pop the next one. + tracing::trace!("dropping cancelled request"); + } + + Poll::Ready(None) + } + + fn failed(&mut self, error: crate::BoxError) { + // The underlying service failed when we called `poll_ready` on it with the given `error`. We + // need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in + // an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent + // requests will also fail with the same error. + + // Note that we need to handle the case where some handle is concurrently trying to send us + // a request. We need to make sure that *either* the send of the request fails *or* it + // receives an error on the `oneshot` it constructed. Specifically, we want to avoid the + // case where we send errors to all outstanding requests, and *then* the caller sends its + // request. We do this by *first* exposing the error, *then* closing the channel used to + // send more requests (so the client will see the error when the send fails), and *then* + // sending the error to all outstanding requests. + let error = ServiceError::new(error); + + let mut inner = self.handle.inner.lock().unwrap(); + + if inner.is_some() { + // Future::poll was called after we've already errored out! + return; + } + + *inner = Some(error.clone()); + drop(inner); + + self.rx.close(); + + // By closing the mpsc::Receiver, we know that poll_next_msg will soon return Ready(None), + // which will trigger the `self.finish == true` phase. We just need to make sure that any + // requests that we receive before we've exhausted the receiver receive the error: + self.failed = Some(error); + } +} + +impl<T, Request> Future for Worker<T, Request> +where + T: Service<Request>, + T::Error: Into<crate::BoxError>, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.finish { + return Poll::Ready(()); + } + + loop { + match ready!(self.poll_next_msg(cx)) { + Some((msg, first)) => { + let _guard = msg.span.enter(); + if let Some(ref failed) = self.failed { + tracing::trace!("notifying caller about worker failure"); + let _ = msg.tx.send(Err(failed.clone())); + continue; + } + + // Wait for the service to be ready + tracing::trace!( + resumed = !first, + message = "worker received request; waiting for service readiness" + ); + match self.service.poll_ready(cx) { + Poll::Ready(Ok(())) => { + tracing::debug!(service.ready = true, message = "processing request"); + let response = self.service.call(msg.request); + + // Send the response future back to the sender. + // + // An error means the request had been canceled in-between + // our calls, the response future will just be dropped. + tracing::trace!("returning response future"); + let _ = msg.tx.send(Ok(response)); + } + Poll::Pending => { + tracing::trace!(service.ready = false, message = "delay"); + // Put out current message back in its slot. + drop(_guard); + self.current_message = Some(msg); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + let error = e.into(); + tracing::debug!({ %error }, "service failed"); + drop(_guard); + self.failed(error); + let _ = msg.tx.send(Err(self + .failed + .as_ref() + .expect("Worker::failed did not set self.failed?") + .clone())); + } + } + } + None => { + // No more more requests _ever_. + self.finish = true; + return Poll::Ready(()); + } + } + } + } +} + +impl Handle { + pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { + self.inner + .lock() + .unwrap() + .as_ref() + .map(|svc_err| svc_err.clone().into()) + .unwrap_or_else(|| Closed::new().into()) + } +} + +impl Clone for Handle { + fn clone(&self) -> Handle { + Handle { + inner: self.inner.clone(), + } + } +} |
