diff options
Diffstat (limited to 'vendor/hyper/src/client/dispatch.rs')
| -rw-r--r-- | vendor/hyper/src/client/dispatch.rs | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/vendor/hyper/src/client/dispatch.rs b/vendor/hyper/src/client/dispatch.rs new file mode 100644 index 00000000..4ae41c50 --- /dev/null +++ b/vendor/hyper/src/client/dispatch.rs @@ -0,0 +1,510 @@ +use std::task::{Context, Poll}; +#[cfg(feature = "http2")] +use std::{future::Future, pin::Pin}; + +#[cfg(feature = "http2")] +use http::{Request, Response}; +#[cfg(feature = "http2")] +use http_body::Body; +#[cfg(feature = "http2")] +use pin_project_lite::pin_project; +use tokio::sync::{mpsc, oneshot}; + +#[cfg(feature = "http2")] +use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; + +pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>; +pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; + +/// An error when calling `try_send_request`. +/// +/// There is a possibility of an error occurring on a connection in-between the +/// time that a request is queued and when it is actually written to the IO +/// transport. If that happens, it is safe to return the request back to the +/// caller, as it was never fully sent. +#[derive(Debug)] +pub struct TrySendError<T> { + pub(crate) error: crate::Error, + pub(crate) message: Option<T>, +} + +pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { + let (tx, rx) = mpsc::unbounded_channel(); + let (giver, taker) = want::new(); + let tx = Sender { + #[cfg(feature = "http1")] + buffered_once: false, + giver, + inner: tx, + }; + let rx = Receiver { inner: rx, taker }; + (tx, rx) +} + +/// A bounded sender of requests and callbacks for when responses are ready. +/// +/// While the inner sender is unbounded, the Giver is used to determine +/// if the Receiver is ready for another request. +pub(crate) struct Sender<T, U> { + /// One message is always allowed, even if the Receiver hasn't asked + /// for it yet. This boolean keeps track of whether we've sent one + /// without notice. + #[cfg(feature = "http1")] + buffered_once: bool, + /// The Giver helps watch that the Receiver side has been polled + /// when the queue is empty. This helps us know when a request and + /// response have been fully processed, and a connection is ready + /// for more. + giver: want::Giver, + /// Actually bounded by the Giver, plus `buffered_once`. + inner: mpsc::UnboundedSender<Envelope<T, U>>, +} + +/// An unbounded version. +/// +/// Cannot poll the Giver, but can still use it to determine if the Receiver +/// has been dropped. However, this version can be cloned. +#[cfg(feature = "http2")] +pub(crate) struct UnboundedSender<T, U> { + /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked. + giver: want::SharedGiver, + inner: mpsc::UnboundedSender<Envelope<T, U>>, +} + +impl<T, U> Sender<T, U> { + #[cfg(feature = "http1")] + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { + self.giver + .poll_want(cx) + .map_err(|_| crate::Error::new_closed()) + } + + #[cfg(feature = "http1")] + pub(crate) fn is_ready(&self) -> bool { + self.giver.is_wanting() + } + + #[cfg(feature = "http1")] + pub(crate) fn is_closed(&self) -> bool { + self.giver.is_canceled() + } + + #[cfg(feature = "http1")] + fn can_send(&mut self) -> bool { + if self.giver.give() || !self.buffered_once { + // If the receiver is ready *now*, then of course we can send. + // + // If the receiver isn't ready yet, but we don't have anything + // in the channel yet, then allow one message. + self.buffered_once = true; + true + } else { + false + } + } + + #[cfg(feature = "http1")] + pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { + if !self.can_send() { + return Err(val); + } + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } + + #[cfg(feature = "http1")] + pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { + if !self.can_send() { + return Err(val); + } + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } + + #[cfg(feature = "http2")] + pub(crate) fn unbound(self) -> UnboundedSender<T, U> { + UnboundedSender { + giver: self.giver.shared(), + inner: self.inner, + } + } +} + +#[cfg(feature = "http2")] +impl<T, U> UnboundedSender<T, U> { + pub(crate) fn is_ready(&self) -> bool { + !self.giver.is_canceled() + } + + pub(crate) fn is_closed(&self) -> bool { + self.giver.is_canceled() + } + + pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } + + pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } +} + +#[cfg(feature = "http2")] +impl<T, U> Clone for UnboundedSender<T, U> { + fn clone(&self) -> Self { + UnboundedSender { + giver: self.giver.clone(), + inner: self.inner.clone(), + } + } +} + +pub(crate) struct Receiver<T, U> { + inner: mpsc::UnboundedReceiver<Envelope<T, U>>, + taker: want::Taker, +} + +impl<T, U> Receiver<T, U> { + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> { + match self.inner.poll_recv(cx) { + Poll::Ready(item) => { + Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) + } + Poll::Pending => { + self.taker.want(); + Poll::Pending + } + } + } + + #[cfg(feature = "http1")] + pub(crate) fn close(&mut self) { + self.taker.cancel(); + self.inner.close(); + } + + #[cfg(feature = "http1")] + pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { + use futures_util::FutureExt; + match self.inner.recv().now_or_never() { + Some(Some(mut env)) => env.0.take(), + _ => None, + } + } +} + +impl<T, U> Drop for Receiver<T, U> { + fn drop(&mut self) { + // Notify the giver about the closure first, before dropping + // the mpsc::Receiver. + self.taker.cancel(); + } +} + +struct Envelope<T, U>(Option<(T, Callback<T, U>)>); + +impl<T, U> Drop for Envelope<T, U> { + fn drop(&mut self) { + if let Some((val, cb)) = self.0.take() { + cb.send(Err(TrySendError { + error: crate::Error::new_canceled().with("connection closed"), + message: Some(val), + })); + } + } +} + +pub(crate) enum Callback<T, U> { + #[allow(unused)] + Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>), + NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), +} + +impl<T, U> Drop for Callback<T, U> { + fn drop(&mut self) { + match self { + Callback::Retry(tx) => { + if let Some(tx) = tx.take() { + let _ = tx.send(Err(TrySendError { + error: dispatch_gone(), + message: None, + })); + } + } + Callback::NoRetry(tx) => { + if let Some(tx) = tx.take() { + let _ = tx.send(Err(dispatch_gone())); + } + } + } + } +} + +#[cold] +fn dispatch_gone() -> crate::Error { + // FIXME(nox): What errors do we want here? + crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() { + "user code panicked" + } else { + "runtime dropped the dispatch task" + }) +} + +impl<T, U> Callback<T, U> { + #[cfg(feature = "http2")] + pub(crate) fn is_canceled(&self) -> bool { + match *self { + Callback::Retry(Some(ref tx)) => tx.is_closed(), + Callback::NoRetry(Some(ref tx)) => tx.is_closed(), + _ => unreachable!(), + } + } + + pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match *self { + Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), + Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), + _ => unreachable!(), + } + } + + pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) { + match self { + Callback::Retry(ref mut tx) => { + let _ = tx.take().unwrap().send(val); + } + Callback::NoRetry(ref mut tx) => { + let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); + } + } + } +} + +impl<T> TrySendError<T> { + /// Take the message from this error. + /// + /// The message will not always have been recovered. If an error occurs + /// after the message has been serialized onto the connection, it will not + /// be available here. + pub fn take_message(&mut self) -> Option<T> { + self.message.take() + } + + /// Consumes this to return the inner error. + pub fn into_error(self) -> crate::Error { + self.error + } +} + +#[cfg(feature = "http2")] +pin_project! { + pub struct SendWhen<B> + where + B: Body, + B: 'static, + { + #[pin] + pub(crate) when: ResponseFutMap<B>, + #[pin] + pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>, + } +} + +#[cfg(feature = "http2")] +impl<B> Future for SendWhen<B> +where + B: Body + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + let mut call_back = this.call_back.take().expect("polled after complete"); + + match Pin::new(&mut this.when).poll(cx) { + Poll::Ready(Ok(res)) => { + call_back.send(Ok(res)); + Poll::Ready(()) + } + Poll::Pending => { + // check if the callback is canceled + match call_back.poll_canceled(cx) { + Poll::Ready(v) => v, + Poll::Pending => { + // Move call_back back to struct before return + this.call_back.set(Some(call_back)); + return Poll::Pending; + } + }; + trace!("send_when canceled"); + Poll::Ready(()) + } + Poll::Ready(Err((error, message))) => { + call_back.send(Err(TrySendError { error, message })); + Poll::Ready(()) + } + } + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "nightly")] + extern crate test; + + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use super::{channel, Callback, Receiver}; + + #[derive(Debug)] + struct Custom(#[allow(dead_code)] i32); + + impl<T, U> Future for Receiver<T, U> { + type Output = Option<(T, Callback<T, U>)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.poll_recv(cx) + } + } + + /// Helper to check if the future is ready after polling once. + struct PollOnce<'a, F>(&'a mut F); + + impl<F, T> Future for PollOnce<'_, F> + where + F: Future<Output = T> + Unpin, + { + type Output = Option<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(())), + Poll::Pending => Poll::Ready(None), + } + } + } + + #[cfg(not(miri))] + #[tokio::test] + async fn drop_receiver_sends_cancel_errors() { + let _ = pretty_env_logger::try_init(); + + let (mut tx, mut rx) = channel::<Custom, ()>(); + + // must poll once for try_send to succeed + assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); + + let promise = tx.try_send(Custom(43)).unwrap(); + drop(rx); + + let fulfilled = promise.await; + let err = fulfilled + .expect("fulfilled") + .expect_err("promise should error"); + match (err.error.is_canceled(), err.message) { + (true, Some(_)) => (), + e => panic!("expected Error::Cancel(_), found {:?}", e), + } + } + + #[cfg(not(miri))] + #[tokio::test] + async fn sender_checks_for_want_on_send() { + let (mut tx, mut rx) = channel::<Custom, ()>(); + + // one is allowed to buffer, second is rejected + let _ = tx.try_send(Custom(1)).expect("1 buffered"); + tx.try_send(Custom(2)).expect_err("2 not ready"); + + assert!(PollOnce(&mut rx).await.is_some(), "rx once"); + + // Even though 1 has been popped, only 1 could be buffered for the + // lifetime of the channel. + tx.try_send(Custom(2)).expect_err("2 still not ready"); + + assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); + + let _ = tx.try_send(Custom(2)).expect("2 ready"); + } + + #[cfg(feature = "http2")] + #[test] + fn unbounded_sender_doesnt_bound_on_want() { + let (tx, rx) = channel::<Custom, ()>(); + let mut tx = tx.unbound(); + + let _ = tx.try_send(Custom(1)).unwrap(); + let _ = tx.try_send(Custom(2)).unwrap(); + let _ = tx.try_send(Custom(3)).unwrap(); + + drop(rx); + + let _ = tx.try_send(Custom(4)).unwrap_err(); + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_throughput(b: &mut test::Bencher) { + use crate::{body::Incoming, Request, Response}; + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>(); + + b.iter(move || { + let _ = tx.send(Request::new(Incoming::empty())).unwrap(); + rt.block_on(async { + loop { + let poll_once = PollOnce(&mut rx); + let opt = poll_once.await; + if opt.is_none() { + break; + } + } + }); + }) + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_not_ready(b: &mut test::Bencher) { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let (_tx, mut rx) = channel::<i32, ()>(); + b.iter(move || { + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_none()); + }); + }) + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_cancel(b: &mut test::Bencher) { + let (_tx, mut rx) = channel::<i32, ()>(); + + b.iter(move || { + rx.taker.cancel(); + }) + } +} |
