summaryrefslogtreecommitdiff
path: root/vendor/hyper-util/src/server
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
committermo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
commit8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/hyper-util/src/server
parent4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/hyper-util/src/server')
-rw-r--r--vendor/hyper-util/src/server/conn/auto/mod.rs1304
-rw-r--r--vendor/hyper-util/src/server/conn/auto/upgrade.rs68
-rw-r--r--vendor/hyper-util/src/server/conn/mod.rs4
-rw-r--r--vendor/hyper-util/src/server/graceful.rs488
-rw-r--r--vendor/hyper-util/src/server/mod.rs6
5 files changed, 1870 insertions, 0 deletions
diff --git a/vendor/hyper-util/src/server/conn/auto/mod.rs b/vendor/hyper-util/src/server/conn/auto/mod.rs
new file mode 100644
index 00000000..b2fc6556
--- /dev/null
+++ b/vendor/hyper-util/src/server/conn/auto/mod.rs
@@ -0,0 +1,1304 @@
+//! Http1 or Http2 connection.
+
+pub mod upgrade;
+
+use hyper::service::HttpService;
+use std::future::Future;
+use std::marker::PhantomPinned;
+use std::mem::MaybeUninit;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{error::Error as StdError, io, time::Duration};
+
+use bytes::Bytes;
+use futures_core::ready;
+use http::{Request, Response};
+use http_body::Body;
+use hyper::{
+ body::Incoming,
+ rt::{Read, ReadBuf, Timer, Write},
+ service::Service,
+};
+
+#[cfg(feature = "http1")]
+use hyper::server::conn::http1;
+
+#[cfg(feature = "http2")]
+use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
+
+#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
+use std::marker::PhantomData;
+
+use pin_project_lite::pin_project;
+
+use crate::common::rewind::Rewind;
+
+type Error = Box<dyn std::error::Error + Send + Sync>;
+
+type Result<T> = std::result::Result<T, Error>;
+
+const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
+
+/// Exactly equivalent to [`Http2ServerConnExec`].
+#[cfg(feature = "http2")]
+pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
+
+#[cfg(feature = "http2")]
+impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
+
+/// Exactly equivalent to [`Http2ServerConnExec`].
+#[cfg(not(feature = "http2"))]
+pub trait HttpServerConnExec<A, B: Body> {}
+
+#[cfg(not(feature = "http2"))]
+impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
+
+/// Http1 or Http2 connection builder.
+#[derive(Clone, Debug)]
+pub struct Builder<E> {
+ #[cfg(feature = "http1")]
+ http1: http1::Builder,
+ #[cfg(feature = "http2")]
+ http2: http2::Builder<E>,
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ version: Option<Version>,
+ #[cfg(not(feature = "http2"))]
+ _executor: E,
+}
+
+impl<E: Default> Default for Builder<E> {
+ fn default() -> Self {
+ Self::new(E::default())
+ }
+}
+
+impl<E> Builder<E> {
+ /// Create a new auto connection builder.
+ ///
+ /// `executor` parameter should be a type that implements
+ /// [`Executor`](hyper::rt::Executor) trait.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use hyper_util::{
+ /// rt::TokioExecutor,
+ /// server::conn::auto,
+ /// };
+ ///
+ /// auto::Builder::new(TokioExecutor::new());
+ /// ```
+ pub fn new(executor: E) -> Self {
+ Self {
+ #[cfg(feature = "http1")]
+ http1: http1::Builder::new(),
+ #[cfg(feature = "http2")]
+ http2: http2::Builder::new(executor),
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ version: None,
+ #[cfg(not(feature = "http2"))]
+ _executor: executor,
+ }
+ }
+
+ /// Http1 configuration.
+ #[cfg(feature = "http1")]
+ pub fn http1(&mut self) -> Http1Builder<'_, E> {
+ Http1Builder { inner: self }
+ }
+
+ /// Http2 configuration.
+ #[cfg(feature = "http2")]
+ pub fn http2(&mut self) -> Http2Builder<'_, E> {
+ Http2Builder { inner: self }
+ }
+
+ /// Only accepts HTTP/2
+ ///
+ /// Does not do anything if used with [`serve_connection_with_upgrades`]
+ ///
+ /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
+ #[cfg(feature = "http2")]
+ pub fn http2_only(mut self) -> Self {
+ assert!(self.version.is_none());
+ self.version = Some(Version::H2);
+ self
+ }
+
+ /// Only accepts HTTP/1
+ ///
+ /// Does not do anything if used with [`serve_connection_with_upgrades`]
+ ///
+ /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
+ #[cfg(feature = "http1")]
+ pub fn http1_only(mut self) -> Self {
+ assert!(self.version.is_none());
+ self.version = Some(Version::H1);
+ self
+ }
+
+ /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
+ pub fn is_http1_available(&self) -> bool {
+ match self.version {
+ #[cfg(feature = "http1")]
+ Some(Version::H1) => true,
+ #[cfg(feature = "http2")]
+ Some(Version::H2) => false,
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ _ => true,
+ }
+ }
+
+ /// Returns `true` if this builder can serve an HTTP/2-based connection.
+ pub fn is_http2_available(&self) -> bool {
+ match self.version {
+ #[cfg(feature = "http1")]
+ Some(Version::H1) => false,
+ #[cfg(feature = "http2")]
+ Some(Version::H2) => true,
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ _ => true,
+ }
+ }
+
+ /// Bind a connection together with a [`Service`].
+ pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ let state = match self.version {
+ #[cfg(feature = "http1")]
+ Some(Version::H1) => {
+ let io = Rewind::new_buffered(io, Bytes::new());
+ let conn = self.http1.serve_connection(io, service);
+ ConnState::H1 { conn }
+ }
+ #[cfg(feature = "http2")]
+ Some(Version::H2) => {
+ let io = Rewind::new_buffered(io, Bytes::new());
+ let conn = self.http2.serve_connection(io, service);
+ ConnState::H2 { conn }
+ }
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ _ => ConnState::ReadVersion {
+ read_version: read_version(io),
+ builder: Cow::Borrowed(self),
+ service: Some(service),
+ },
+ };
+
+ Connection { state }
+ }
+
+ /// Bind a connection together with a [`Service`], with the ability to
+ /// handle HTTP upgrades. This requires that the IO object implements
+ /// `Send`.
+ ///
+ /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
+ /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
+ /// instead. See the documentation of the latter to understand why.
+ ///
+ /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
+ pub fn serve_connection_with_upgrades<I, S, B>(
+ &self,
+ io: I,
+ service: S,
+ ) -> UpgradeableConnection<'_, I, S, E>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + Send + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ UpgradeableConnection {
+ state: UpgradeableConnState::ReadVersion {
+ read_version: read_version(io),
+ builder: Cow::Borrowed(self),
+ service: Some(service),
+ },
+ }
+ }
+}
+
+#[derive(Copy, Clone, Debug)]
+enum Version {
+ H1,
+ H2,
+}
+
+impl Version {
+ #[must_use]
+ #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
+ pub fn unsupported(self) -> Error {
+ match self {
+ Version::H1 => Error::from("HTTP/1 is not supported"),
+ Version::H2 => Error::from("HTTP/2 is not supported"),
+ }
+ }
+}
+
+fn read_version<I>(io: I) -> ReadVersion<I>
+where
+ I: Read + Unpin,
+{
+ ReadVersion {
+ io: Some(io),
+ buf: [MaybeUninit::uninit(); 24],
+ filled: 0,
+ version: Version::H2,
+ cancelled: false,
+ _pin: PhantomPinned,
+ }
+}
+
+pin_project! {
+ struct ReadVersion<I> {
+ io: Option<I>,
+ buf: [MaybeUninit<u8>; 24],
+ // the amount of `buf` thats been filled
+ filled: usize,
+ version: Version,
+ cancelled: bool,
+ // Make this future `!Unpin` for compatibility with async trait methods.
+ #[pin]
+ _pin: PhantomPinned,
+ }
+}
+
+impl<I> ReadVersion<I> {
+ pub fn cancel(self: Pin<&mut Self>) {
+ *self.project().cancelled = true;
+ }
+}
+
+impl<I> Future for ReadVersion<I>
+where
+ I: Read + Unpin,
+{
+ type Output = io::Result<(Version, Rewind<I>)>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.project();
+ if *this.cancelled {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
+ }
+
+ let mut buf = ReadBuf::uninit(&mut *this.buf);
+ // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
+ // we're only advancing by that many.
+ unsafe {
+ buf.unfilled().advance(*this.filled);
+ };
+
+ // We start as H2 and switch to H1 as soon as we don't have the preface.
+ while buf.filled().len() < H2_PREFACE.len() {
+ let len = buf.filled().len();
+ ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
+ *this.filled = buf.filled().len();
+
+ // We starts as H2 and switch to H1 when we don't get the preface.
+ if buf.filled().len() == len
+ || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
+ {
+ *this.version = Version::H1;
+ break;
+ }
+ }
+
+ let io = this.io.take().unwrap();
+ let buf = buf.filled().to_vec();
+ Poll::Ready(Ok((
+ *this.version,
+ Rewind::new_buffered(io, Bytes::from(buf)),
+ )))
+ }
+}
+
+pin_project! {
+ /// A [`Future`](core::future::Future) representing an HTTP/1 connection, returned from
+ /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
+ ///
+ /// To drive HTTP on this connection this future **must be polled**, typically with
+ /// `.await`. If it isn't polled, no progress will be made on this connection.
+ #[must_use = "futures do nothing unless polled"]
+ pub struct Connection<'a, I, S, E>
+ where
+ S: HttpService<Incoming>,
+ {
+ #[pin]
+ state: ConnState<'a, I, S, E>,
+ }
+}
+
+// A custom COW, since the libstd is has ToOwned bounds that are too eager.
+enum Cow<'a, T> {
+ Borrowed(&'a T),
+ Owned(T),
+}
+
+impl<T> std::ops::Deref for Cow<'_, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ match self {
+ Cow::Borrowed(t) => &*t,
+ Cow::Owned(ref t) => t,
+ }
+ }
+}
+
+#[cfg(feature = "http1")]
+type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
+
+#[cfg(not(feature = "http1"))]
+type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
+
+#[cfg(feature = "http2")]
+type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
+
+#[cfg(not(feature = "http2"))]
+type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
+
+pin_project! {
+ #[project = ConnStateProj]
+ enum ConnState<'a, I, S, E>
+ where
+ S: HttpService<Incoming>,
+ {
+ ReadVersion {
+ #[pin]
+ read_version: ReadVersion<I>,
+ builder: Cow<'a, Builder<E>>,
+ service: Option<S>,
+ },
+ H1 {
+ #[pin]
+ conn: Http1Connection<I, S>,
+ },
+ H2 {
+ #[pin]
+ conn: Http2Connection<I, S, E>,
+ },
+ }
+}
+
+impl<I, S, E, B> Connection<'_, I, S, E>
+where
+ S: HttpService<Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: HttpServerConnExec<S::Future, B>,
+{
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `Connection` should continue to be polled until shutdown can finish.
+ ///
+ /// # Note
+ ///
+ /// This should only be called while the `Connection` future is still pending. If called after
+ /// `Connection::poll` has resolved, this does nothing.
+ pub fn graceful_shutdown(self: Pin<&mut Self>) {
+ match self.project().state.project() {
+ ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
+ #[cfg(feature = "http1")]
+ ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
+ #[cfg(feature = "http2")]
+ ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ }
+ }
+
+ /// Make this Connection static, instead of borrowing from Builder.
+ pub fn into_owned(self) -> Connection<'static, I, S, E>
+ where
+ Builder<E>: Clone,
+ {
+ Connection {
+ state: match self.state {
+ ConnState::ReadVersion {
+ read_version,
+ builder,
+ service,
+ } => ConnState::ReadVersion {
+ read_version,
+ service,
+ builder: Cow::Owned(builder.clone()),
+ },
+ #[cfg(feature = "http1")]
+ ConnState::H1 { conn } => ConnState::H1 { conn },
+ #[cfg(feature = "http2")]
+ ConnState::H2 { conn } => ConnState::H2 { conn },
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ },
+ }
+ }
+}
+
+impl<I, S, E, B> Future for Connection<'_, I, S, E>
+where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+{
+ type Output = Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ loop {
+ let mut this = self.as_mut().project();
+
+ match this.state.as_mut().project() {
+ ConnStateProj::ReadVersion {
+ read_version,
+ builder,
+ service,
+ } => {
+ let (version, io) = ready!(read_version.poll(cx))?;
+ let service = service.take().unwrap();
+ match version {
+ #[cfg(feature = "http1")]
+ Version::H1 => {
+ let conn = builder.http1.serve_connection(io, service);
+ this.state.set(ConnState::H1 { conn });
+ }
+ #[cfg(feature = "http2")]
+ Version::H2 => {
+ let conn = builder.http2.serve_connection(io, service);
+ this.state.set(ConnState::H2 { conn });
+ }
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => return Poll::Ready(Err(version.unsupported())),
+ }
+ }
+ #[cfg(feature = "http1")]
+ ConnStateProj::H1 { conn } => {
+ return conn.poll(cx).map_err(Into::into);
+ }
+ #[cfg(feature = "http2")]
+ ConnStateProj::H2 { conn } => {
+ return conn.poll(cx).map_err(Into::into);
+ }
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// An upgradable [`Connection`], returned by
+ /// [`Builder::serve_upgradable_connection`](struct.Builder.html#method.serve_connection_with_upgrades).
+ ///
+ /// To drive HTTP on this connection this future **must be polled**, typically with
+ /// `.await`. If it isn't polled, no progress will be made on this connection.
+ #[must_use = "futures do nothing unless polled"]
+ pub struct UpgradeableConnection<'a, I, S, E>
+ where
+ S: HttpService<Incoming>,
+ {
+ #[pin]
+ state: UpgradeableConnState<'a, I, S, E>,
+ }
+}
+
+#[cfg(feature = "http1")]
+type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
+
+#[cfg(not(feature = "http1"))]
+type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
+
+pin_project! {
+ #[project = UpgradeableConnStateProj]
+ enum UpgradeableConnState<'a, I, S, E>
+ where
+ S: HttpService<Incoming>,
+ {
+ ReadVersion {
+ #[pin]
+ read_version: ReadVersion<I>,
+ builder: Cow<'a, Builder<E>>,
+ service: Option<S>,
+ },
+ H1 {
+ #[pin]
+ conn: Http1UpgradeableConnection<Rewind<I>, S>,
+ },
+ H2 {
+ #[pin]
+ conn: Http2Connection<I, S, E>,
+ },
+ }
+}
+
+impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
+where
+ S: HttpService<Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: HttpServerConnExec<S::Future, B>,
+{
+ /// Start a graceful shutdown process for this connection.
+ ///
+ /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
+ ///
+ /// # Note
+ ///
+ /// This should only be called while the `Connection` future is still nothing. pending. If
+ /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
+ pub fn graceful_shutdown(self: Pin<&mut Self>) {
+ match self.project().state.project() {
+ UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
+ #[cfg(feature = "http1")]
+ UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
+ #[cfg(feature = "http2")]
+ UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ }
+ }
+
+ /// Make this Connection static, instead of borrowing from Builder.
+ pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
+ where
+ Builder<E>: Clone,
+ {
+ UpgradeableConnection {
+ state: match self.state {
+ UpgradeableConnState::ReadVersion {
+ read_version,
+ builder,
+ service,
+ } => UpgradeableConnState::ReadVersion {
+ read_version,
+ service,
+ builder: Cow::Owned(builder.clone()),
+ },
+ #[cfg(feature = "http1")]
+ UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
+ #[cfg(feature = "http2")]
+ UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ },
+ }
+ }
+}
+
+impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
+where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + Send + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+{
+ type Output = Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ loop {
+ let mut this = self.as_mut().project();
+
+ match this.state.as_mut().project() {
+ UpgradeableConnStateProj::ReadVersion {
+ read_version,
+ builder,
+ service,
+ } => {
+ let (version, io) = ready!(read_version.poll(cx))?;
+ let service = service.take().unwrap();
+ match version {
+ #[cfg(feature = "http1")]
+ Version::H1 => {
+ let conn = builder.http1.serve_connection(io, service).with_upgrades();
+ this.state.set(UpgradeableConnState::H1 { conn });
+ }
+ #[cfg(feature = "http2")]
+ Version::H2 => {
+ let conn = builder.http2.serve_connection(io, service);
+ this.state.set(UpgradeableConnState::H2 { conn });
+ }
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => return Poll::Ready(Err(version.unsupported())),
+ }
+ }
+ #[cfg(feature = "http1")]
+ UpgradeableConnStateProj::H1 { conn } => {
+ return conn.poll(cx).map_err(Into::into);
+ }
+ #[cfg(feature = "http2")]
+ UpgradeableConnStateProj::H2 { conn } => {
+ return conn.poll(cx).map_err(Into::into);
+ }
+ #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
+ _ => unreachable!(),
+ }
+ }
+ }
+}
+
+/// Http1 part of builder.
+#[cfg(feature = "http1")]
+pub struct Http1Builder<'a, E> {
+ inner: &'a mut Builder<E>,
+}
+
+#[cfg(feature = "http1")]
+impl<E> Http1Builder<'_, E> {
+ /// Http2 configuration.
+ #[cfg(feature = "http2")]
+ pub fn http2(&mut self) -> Http2Builder<'_, E> {
+ Http2Builder { inner: self.inner }
+ }
+
+ /// Set whether the `date` header should be included in HTTP responses.
+ ///
+ /// Note that including the `date` header is recommended by RFC 7231.
+ ///
+ /// Default is true.
+ pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http1.auto_date_header(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should support half-closures.
+ ///
+ /// Clients can chose to shutdown their write-side while waiting
+ /// for the server to respond. Setting this to `true` will
+ /// prevent closing the connection immediately if `read`
+ /// detects an EOF in the middle of a request.
+ ///
+ /// Default is `false`.
+ pub fn half_close(&mut self, val: bool) -> &mut Self {
+ self.inner.http1.half_close(val);
+ self
+ }
+
+ /// Enables or disables HTTP/1 keep-alive.
+ ///
+ /// Default is true.
+ pub fn keep_alive(&mut self, val: bool) -> &mut Self {
+ self.inner.http1.keep_alive(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will write header names as title case at
+ /// the socket level.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http1.title_case_headers(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will silently ignored malformed header lines.
+ ///
+ /// If this is enabled and a header line does not start with a valid header
+ /// name, or does not include a colon at all, the line will be silently ignored
+ /// and no error will be reported.
+ ///
+ /// Default is false.
+ pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http1.ignore_invalid_headers(enabled);
+ self
+ }
+
+ /// Set whether to support preserving original header cases.
+ ///
+ /// Currently, this will record the original cases received, and store them
+ /// in a private extension on the `Request`. It will also look for and use
+ /// such an extension in any provided `Response`.
+ ///
+ /// Since the relevant extension is still private, there is no way to
+ /// interact with the original cases. The only effect this can have now is
+ /// to forward the cases in a proxy-like fashion.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http1.preserve_header_case(enabled);
+ self
+ }
+
+ /// Set the maximum number of headers.
+ ///
+ /// When a request is received, the parser will reserve a buffer to store headers for optimal
+ /// performance.
+ ///
+ /// If server receives more headers than the buffer size, it responds to the client with
+ /// "431 Request Header Fields Too Large".
+ ///
+ /// The headers is allocated on the stack by default, which has higher performance. After
+ /// setting this value, headers will be allocated in heap memory, that is, heap memory
+ /// allocation will occur for each request, and there will be a performance drop of about 5%.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is 100.
+ pub fn max_headers(&mut self, val: usize) -> &mut Self {
+ self.inner.http1.max_headers(val);
+ self
+ }
+
+ /// Set a timeout for reading client request headers. If a client does not
+ /// transmit the entire header within this time, the connection is closed.
+ ///
+ /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
+ /// without a [`Timer`].
+ ///
+ /// Pass `None` to disable.
+ ///
+ /// Default is currently 30 seconds, but do not depend on that.
+ pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
+ self.inner.http1.header_read_timeout(read_timeout);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Setting this to true will force hyper to use queued strategy
+ /// which may eliminate unnecessary cloning on some TLS backends
+ ///
+ /// Default is `auto`. In this mode hyper will try to guess which
+ /// mode to use
+ pub fn writev(&mut self, val: bool) -> &mut Self {
+ self.inner.http1.writev(val);
+ self
+ }
+
+ /// Set the maximum buffer size for the connection.
+ ///
+ /// Default is ~400kb.
+ ///
+ /// # Panics
+ ///
+ /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
+ pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
+ self.inner.http1.max_buf_size(max);
+ self
+ }
+
+ /// Aggregates flushes to better support pipelined responses.
+ ///
+ /// Experimental, may have bugs.
+ ///
+ /// Default is false.
+ pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http1.pipeline_flush(enabled);
+ self
+ }
+
+ /// Set the timer used in background tasks.
+ pub fn timer<M>(&mut self, timer: M) -> &mut Self
+ where
+ M: Timer + Send + Sync + 'static,
+ {
+ self.inner.http1.timer(timer);
+ self
+ }
+
+ /// Bind a connection together with a [`Service`].
+ #[cfg(feature = "http2")]
+ pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ self.inner.serve_connection(io, service).await
+ }
+
+ /// Bind a connection together with a [`Service`].
+ #[cfg(not(feature = "http2"))]
+ pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + 'static,
+ {
+ self.inner.serve_connection(io, service).await
+ }
+
+ /// Bind a connection together with a [`Service`], with the ability to
+ /// handle HTTP upgrades. This requires that the IO object implements
+ /// `Send`.
+ #[cfg(feature = "http2")]
+ pub fn serve_connection_with_upgrades<I, S, B>(
+ &self,
+ io: I,
+ service: S,
+ ) -> UpgradeableConnection<'_, I, S, E>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + Send + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ self.inner.serve_connection_with_upgrades(io, service)
+ }
+}
+
+/// Http2 part of builder.
+#[cfg(feature = "http2")]
+pub struct Http2Builder<'a, E> {
+ inner: &'a mut Builder<E>,
+}
+
+#[cfg(feature = "http2")]
+impl<E> Http2Builder<'_, E> {
+ #[cfg(feature = "http1")]
+ /// Http1 configuration.
+ pub fn http1(&mut self) -> Http1Builder<'_, E> {
+ Http1Builder { inner: self.inner }
+ }
+
+ /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
+ ///
+ /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
+ /// As of v0.4.0, it is 20.
+ ///
+ /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
+ pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
+ self.inner.http2.max_pending_accept_reset_streams(max);
+ self
+ }
+
+ /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent.
+ ///
+ /// If not set, hyper will use a default, currently of 1024.
+ ///
+ /// If `None` is supplied, hyper will not apply any limit.
+ /// This is not advised, as it can potentially expose servers to DOS vulnerabilities.
+ ///
+ /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information.
+ pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
+ self.inner.http2.max_local_error_reset_streams(max);
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ self.inner.http2.initial_stream_window_size(sz);
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ self.inner.http2.initial_connection_window_size(sz);
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http2.adaptive_window(enabled);
+ self
+ }
+
+ /// Sets the maximum frame size to use for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ self.inner.http2.max_frame_size(sz);
+ self
+ }
+
+ /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
+ /// connections.
+ ///
+ /// Default is 200. Passing `None` will remove any limit.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
+ pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
+ self.inner.http2.max_concurrent_streams(max);
+ self
+ }
+
+ /// Sets an interval for HTTP2 Ping frames should be sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
+ self.inner.http2.keep_alive_interval(interval);
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
+ self.inner.http2.keep_alive_timeout(timeout);
+ self
+ }
+
+ /// Set the maximum write buffer size for each HTTP/2 stream.
+ ///
+ /// Default is currently ~400KB, but may change.
+ ///
+ /// # Panics
+ ///
+ /// The value must be no larger than `u32::MAX`.
+ pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
+ self.inner.http2.max_send_buf_size(max);
+ self
+ }
+
+ /// Enables the [extended CONNECT protocol].
+ ///
+ /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
+ pub fn enable_connect_protocol(&mut self) -> &mut Self {
+ self.inner.http2.enable_connect_protocol();
+ self
+ }
+
+ /// Sets the max size of received header frames.
+ ///
+ /// Default is currently ~16MB, but may change.
+ pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
+ self.inner.http2.max_header_list_size(max);
+ self
+ }
+
+ /// Set the timer used in background tasks.
+ pub fn timer<M>(&mut self, timer: M) -> &mut Self
+ where
+ M: Timer + Send + Sync + 'static,
+ {
+ self.inner.http2.timer(timer);
+ self
+ }
+
+ /// Set whether the `date` header should be included in HTTP responses.
+ ///
+ /// Note that including the `date` header is recommended by RFC 7231.
+ ///
+ /// Default is true.
+ pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
+ self.inner.http2.auto_date_header(enabled);
+ self
+ }
+
+ /// Bind a connection together with a [`Service`].
+ pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ self.inner.serve_connection(io, service).await
+ }
+
+ /// Bind a connection together with a [`Service`], with the ability to
+ /// handle HTTP upgrades. This requires that the IO object implements
+ /// `Send`.
+ pub fn serve_connection_with_upgrades<I, S, B>(
+ &self,
+ io: I,
+ service: S,
+ ) -> UpgradeableConnection<'_, I, S, E>
+ where
+ S: Service<Request<Incoming>, Response = Response<B>>,
+ S::Future: 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ I: Read + Write + Unpin + Send + 'static,
+ E: HttpServerConnExec<S::Future, B>,
+ {
+ self.inner.serve_connection_with_upgrades(io, service)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::{
+ rt::{TokioExecutor, TokioIo},
+ server::conn::auto,
+ };
+ use http::{Request, Response};
+ use http_body::Body;
+ use http_body_util::{BodyExt, Empty, Full};
+ use hyper::{body, body::Bytes, client, service::service_fn};
+ use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
+ use tokio::{
+ net::{TcpListener, TcpStream},
+ pin,
+ };
+
+ const BODY: &[u8] = b"Hello, world!";
+
+ #[test]
+ fn configuration() {
+ // One liner.
+ auto::Builder::new(TokioExecutor::new())
+ .http1()
+ .keep_alive(true)
+ .http2()
+ .keep_alive_interval(None);
+ // .serve_connection(io, service);
+
+ // Using variable.
+ let mut builder = auto::Builder::new(TokioExecutor::new());
+
+ builder.http1().keep_alive(true);
+ builder.http2().keep_alive_interval(None);
+ // builder.serve_connection(io, service);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http1() {
+ let addr = start_server(false, false).await;
+ let mut sender = connect_h1(addr).await;
+
+ let response = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .unwrap();
+
+ let body = response.into_body().collect().await.unwrap().to_bytes();
+
+ assert_eq!(body, BODY);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http2() {
+ let addr = start_server(false, false).await;
+ let mut sender = connect_h2(addr).await;
+
+ let response = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .unwrap();
+
+ let body = response.into_body().collect().await.unwrap().to_bytes();
+
+ assert_eq!(body, BODY);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http2_only() {
+ let addr = start_server(false, true).await;
+ let mut sender = connect_h2(addr).await;
+
+ let response = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .unwrap();
+
+ let body = response.into_body().collect().await.unwrap().to_bytes();
+
+ assert_eq!(body, BODY);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http2_only_fail_if_client_is_http1() {
+ let addr = start_server(false, true).await;
+ let mut sender = connect_h1(addr).await;
+
+ let _ = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .expect_err("should fail");
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http1_only() {
+ let addr = start_server(true, false).await;
+ let mut sender = connect_h1(addr).await;
+
+ let response = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .unwrap();
+
+ let body = response.into_body().collect().await.unwrap().to_bytes();
+
+ assert_eq!(body, BODY);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn http1_only_fail_if_client_is_http2() {
+ let addr = start_server(true, false).await;
+ let mut sender = connect_h2(addr).await;
+
+ let _ = sender
+ .send_request(Request::new(Empty::<Bytes>::new()))
+ .await
+ .expect_err("should fail");
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn graceful_shutdown() {
+ let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
+ .await
+ .unwrap();
+
+ let listener_addr = listener.local_addr().unwrap();
+
+ // Spawn the task in background so that we can connect there
+ let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
+ // Only connect a stream, do not send headers or anything
+ let _stream = TcpStream::connect(listener_addr).await.unwrap();
+
+ let (stream, _) = listen_task.await.unwrap();
+ let stream = TokioIo::new(stream);
+ let builder = auto::Builder::new(TokioExecutor::new());
+ let connection = builder.serve_connection(stream, service_fn(hello));
+
+ pin!(connection);
+
+ connection.as_mut().graceful_shutdown();
+
+ let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
+ .await
+ .expect("Connection should have finished in a timely manner after graceful shutdown.")
+ .expect_err("Connection should have been interrupted.");
+
+ let connection_error = connection_error
+ .downcast_ref::<std::io::Error>()
+ .expect("The error should have been `std::io::Error`.");
+ assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
+ }
+
+ async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
+ where
+ B: Body + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
+ let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
+
+ tokio::spawn(connection);
+
+ sender
+ }
+
+ async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
+ where
+ B: Body + Unpin + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
+ let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
+ .handshake(stream)
+ .await
+ .unwrap();
+
+ tokio::spawn(connection);
+
+ sender
+ }
+
+ async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
+ let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
+ let listener = TcpListener::bind(addr).await.unwrap();
+
+ let local_addr = listener.local_addr().unwrap();
+
+ tokio::spawn(async move {
+ loop {
+ let (stream, _) = listener.accept().await.unwrap();
+ let stream = TokioIo::new(stream);
+ tokio::task::spawn(async move {
+ let mut builder = auto::Builder::new(TokioExecutor::new());
+ if h1_only {
+ builder = builder.http1_only();
+ builder.serve_connection(stream, service_fn(hello)).await
+ } else if h2_only {
+ builder = builder.http2_only();
+ builder.serve_connection(stream, service_fn(hello)).await
+ } else {
+ builder
+ .http2()
+ .max_header_list_size(4096)
+ .serve_connection_with_upgrades(stream, service_fn(hello))
+ .await
+ }
+ .unwrap();
+ });
+ }
+ });
+
+ local_addr
+ }
+
+ async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
+ Ok(Response::new(Full::new(Bytes::from(BODY))))
+ }
+}
diff --git a/vendor/hyper-util/src/server/conn/auto/upgrade.rs b/vendor/hyper-util/src/server/conn/auto/upgrade.rs
new file mode 100644
index 00000000..8d94c409
--- /dev/null
+++ b/vendor/hyper-util/src/server/conn/auto/upgrade.rs
@@ -0,0 +1,68 @@
+//! Upgrade utilities.
+
+use bytes::{Bytes, BytesMut};
+use hyper::{
+ rt::{Read, Write},
+ upgrade::Upgraded,
+};
+
+use crate::common::rewind::Rewind;
+
+/// Tries to downcast the internal trait object to the type passed.
+///
+/// On success, returns the downcasted parts. On error, returns the Upgraded back.
+/// This is a kludge to work around the fact that the machinery provided by
+/// [`hyper_util::server::conn::auto`] wraps the inner `T` with a private type
+/// that is not reachable from outside the crate.
+///
+/// [`hyper_util::server::conn::auto`]: crate::server::conn::auto
+///
+/// This kludge will be removed when this machinery is added back to the main
+/// `hyper` code.
+pub fn downcast<T>(upgraded: Upgraded) -> Result<Parts<T>, Upgraded>
+where
+ T: Read + Write + Unpin + 'static,
+{
+ let hyper::upgrade::Parts {
+ io: rewind,
+ mut read_buf,
+ ..
+ } = upgraded.downcast::<Rewind<T>>()?;
+
+ if let Some(pre) = rewind.pre {
+ read_buf = if read_buf.is_empty() {
+ pre
+ } else {
+ let mut buf = BytesMut::from(read_buf);
+
+ buf.extend_from_slice(&pre);
+
+ buf.freeze()
+ };
+ }
+
+ Ok(Parts {
+ io: rewind.inner,
+ read_buf,
+ })
+}
+
+/// The deconstructed parts of an [`Upgraded`] type.
+///
+/// Includes the original IO type, and a read buffer of bytes that the
+/// HTTP state machine may have already read before completing an upgrade.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct Parts<T> {
+ /// The original IO object used before the upgrade.
+ pub io: T,
+ /// A buffer of bytes that have been read but not processed as HTTP.
+ ///
+ /// For instance, if the `Connection` is used for an HTTP upgrade request,
+ /// it is possible the server sent back the first bytes of the new protocol
+ /// along with the response upgrade.
+ ///
+ /// You will want to check for any existing bytes if you plan to continue
+ /// communicating on the IO object.
+ pub read_buf: Bytes,
+}
diff --git a/vendor/hyper-util/src/server/conn/mod.rs b/vendor/hyper-util/src/server/conn/mod.rs
new file mode 100644
index 00000000..b23503a1
--- /dev/null
+++ b/vendor/hyper-util/src/server/conn/mod.rs
@@ -0,0 +1,4 @@
+//! Connection utilities.
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+pub mod auto;
diff --git a/vendor/hyper-util/src/server/graceful.rs b/vendor/hyper-util/src/server/graceful.rs
new file mode 100644
index 00000000..b367fc8a
--- /dev/null
+++ b/vendor/hyper-util/src/server/graceful.rs
@@ -0,0 +1,488 @@
+//! Utility to gracefully shutdown a server.
+//!
+//! This module provides a [`GracefulShutdown`] type,
+//! which can be used to gracefully shutdown a server.
+//!
+//! See <https://github.com/hyperium/hyper-util/blob/master/examples/server_graceful.rs>
+//! for an example of how to use this.
+
+use std::{
+ fmt::{self, Debug},
+ future::Future,
+ pin::Pin,
+ task::{self, Poll},
+};
+
+use pin_project_lite::pin_project;
+use tokio::sync::watch;
+
+/// A graceful shutdown utility
+// Purposefully not `Clone`, see `watcher()` method for why.
+pub struct GracefulShutdown {
+ tx: watch::Sender<()>,
+}
+
+/// A watcher side of the graceful shutdown.
+///
+/// This type can only watch a connection, it cannot trigger a shutdown.
+///
+/// Call [`GracefulShutdown::watcher()`] to construct one of these.
+pub struct Watcher {
+ rx: watch::Receiver<()>,
+}
+
+impl GracefulShutdown {
+ /// Create a new graceful shutdown helper.
+ pub fn new() -> Self {
+ let (tx, _) = watch::channel(());
+ Self { tx }
+ }
+
+ /// Wrap a future for graceful shutdown watching.
+ pub fn watch<C: GracefulConnection>(&self, conn: C) -> impl Future<Output = C::Output> {
+ self.watcher().watch(conn)
+ }
+
+ /// Create an owned type that can watch a connection.
+ ///
+ /// This method allows created an owned type that can be sent onto another
+ /// task before calling [`Watcher::watch()`].
+ // Internal: this function exists because `Clone` allows footguns.
+ // If the `tx` were cloned (or the `rx`), race conditions can happens where
+ // one task starting a shutdown is scheduled and interwined with a task
+ // starting to watch a connection, and the "watch version" is one behind.
+ pub fn watcher(&self) -> Watcher {
+ let rx = self.tx.subscribe();
+ Watcher { rx }
+ }
+
+ /// Signal shutdown for all watched connections.
+ ///
+ /// This returns a `Future` which will complete once all watched
+ /// connections have shutdown.
+ pub async fn shutdown(self) {
+ let Self { tx } = self;
+
+ // signal all the watched futures about the change
+ let _ = tx.send(());
+ // and then wait for all of them to complete
+ tx.closed().await;
+ }
+
+ /// Returns the number of the watching connections.
+ pub fn count(&self) -> usize {
+ self.tx.receiver_count()
+ }
+}
+
+impl Debug for GracefulShutdown {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("GracefulShutdown").finish()
+ }
+}
+
+impl Default for GracefulShutdown {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Watcher {
+ /// Wrap a future for graceful shutdown watching.
+ pub fn watch<C: GracefulConnection>(self, conn: C) -> impl Future<Output = C::Output> {
+ let Watcher { mut rx } = self;
+ GracefulConnectionFuture::new(conn, async move {
+ let _ = rx.changed().await;
+ // hold onto the rx until the watched future is completed
+ rx
+ })
+ }
+}
+
+impl Debug for Watcher {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("GracefulWatcher").finish()
+ }
+}
+
+pin_project! {
+ struct GracefulConnectionFuture<C, F: Future> {
+ #[pin]
+ conn: C,
+ #[pin]
+ cancel: F,
+ #[pin]
+ // If cancelled, this is held until the inner conn is done.
+ cancelled_guard: Option<F::Output>,
+ }
+}
+
+impl<C, F: Future> GracefulConnectionFuture<C, F> {
+ fn new(conn: C, cancel: F) -> Self {
+ Self {
+ conn,
+ cancel,
+ cancelled_guard: None,
+ }
+ }
+}
+
+impl<C, F: Future> Debug for GracefulConnectionFuture<C, F> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("GracefulConnectionFuture").finish()
+ }
+}
+
+impl<C, F> Future for GracefulConnectionFuture<C, F>
+where
+ C: GracefulConnection,
+ F: Future,
+{
+ type Output = C::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ if this.cancelled_guard.is_none() {
+ if let Poll::Ready(guard) = this.cancel.poll(cx) {
+ this.cancelled_guard.set(Some(guard));
+ this.conn.as_mut().graceful_shutdown();
+ }
+ }
+ this.conn.poll(cx)
+ }
+}
+
+/// An internal utility trait as an umbrella target for all (hyper) connection
+/// types that the [`GracefulShutdown`] can watch.
+pub trait GracefulConnection: Future<Output = Result<(), Self::Error>> + private::Sealed {
+ /// The error type returned by the connection when used as a future.
+ type Error;
+
+ /// Start a graceful shutdown process for this connection.
+ fn graceful_shutdown(self: Pin<&mut Self>);
+}
+
+#[cfg(feature = "http1")]
+impl<I, B, S> GracefulConnection for hyper::server::conn::http1::Connection<I, S>
+where
+ S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+{
+ type Error = hyper::Error;
+
+ fn graceful_shutdown(self: Pin<&mut Self>) {
+ hyper::server::conn::http1::Connection::graceful_shutdown(self);
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<I, B, S, E> GracefulConnection for hyper::server::conn::http2::Connection<I, S, E>
+where
+ S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+{
+ type Error = hyper::Error;
+
+ fn graceful_shutdown(self: Pin<&mut Self>) {
+ hyper::server::conn::http2::Connection::graceful_shutdown(self);
+ }
+}
+
+#[cfg(feature = "server-auto")]
+impl<I, B, S, E> GracefulConnection for crate::server::conn::auto::Connection<'_, I, S, E>
+where
+ S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ S::Future: 'static,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+{
+ type Error = Box<dyn std::error::Error + Send + Sync>;
+
+ fn graceful_shutdown(self: Pin<&mut Self>) {
+ crate::server::conn::auto::Connection::graceful_shutdown(self);
+ }
+}
+
+#[cfg(feature = "server-auto")]
+impl<I, B, S, E> GracefulConnection
+ for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E>
+where
+ S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ S::Future: 'static,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+{
+ type Error = Box<dyn std::error::Error + Send + Sync>;
+
+ fn graceful_shutdown(self: Pin<&mut Self>) {
+ crate::server::conn::auto::UpgradeableConnection::graceful_shutdown(self);
+ }
+}
+
+mod private {
+ pub trait Sealed {}
+
+ #[cfg(feature = "http1")]
+ impl<I, B, S> Sealed for hyper::server::conn::http1::Connection<I, S>
+ where
+ S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ {
+ }
+
+ #[cfg(feature = "http1")]
+ impl<I, B, S> Sealed for hyper::server::conn::http1::UpgradeableConnection<I, S>
+ where
+ S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ {
+ }
+
+ #[cfg(feature = "http2")]
+ impl<I, B, S, E> Sealed for hyper::server::conn::http2::Connection<I, S, E>
+ where
+ S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+ {
+ }
+
+ #[cfg(feature = "server-auto")]
+ impl<I, B, S, E> Sealed for crate::server::conn::auto::Connection<'_, I, S, E>
+ where
+ S: hyper::service::Service<
+ http::Request<hyper::body::Incoming>,
+ Response = http::Response<B>,
+ >,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ S::Future: 'static,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+ {
+ }
+
+ #[cfg(feature = "server-auto")]
+ impl<I, B, S, E> Sealed for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E>
+ where
+ S: hyper::service::Service<
+ http::Request<hyper::body::Incoming>,
+ Response = http::Response<B>,
+ >,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ S::Future: 'static,
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
+ B: hyper::body::Body + 'static,
+ B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
+ {
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use pin_project_lite::pin_project;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::Arc;
+
+ pin_project! {
+ #[derive(Debug)]
+ struct DummyConnection<F> {
+ #[pin]
+ future: F,
+ shutdown_counter: Arc<AtomicUsize>,
+ }
+ }
+
+ impl<F> private::Sealed for DummyConnection<F> {}
+
+ impl<F: Future> GracefulConnection for DummyConnection<F> {
+ type Error = ();
+
+ fn graceful_shutdown(self: Pin<&mut Self>) {
+ self.shutdown_counter.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ impl<F: Future> Future for DummyConnection<F> {
+ type Output = Result<(), ()>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match self.project().future.poll(cx) {
+ Poll::Ready(_) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn test_graceful_shutdown_ok() {
+ let graceful = GracefulShutdown::new();
+ let shutdown_counter = Arc::new(AtomicUsize::new(0));
+ let (dummy_tx, _) = tokio::sync::broadcast::channel(1);
+
+ for i in 1..=3 {
+ let mut dummy_rx = dummy_tx.subscribe();
+ let shutdown_counter = shutdown_counter.clone();
+
+ let future = async move {
+ tokio::time::sleep(std::time::Duration::from_millis(i * 10)).await;
+ let _ = dummy_rx.recv().await;
+ };
+ let dummy_conn = DummyConnection {
+ future,
+ shutdown_counter,
+ };
+ let conn = graceful.watch(dummy_conn);
+ tokio::spawn(async move {
+ conn.await.unwrap();
+ });
+ }
+
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
+ let _ = dummy_tx.send(());
+
+ tokio::select! {
+ _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
+ panic!("timeout")
+ },
+ _ = graceful.shutdown() => {
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
+ }
+ }
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn test_graceful_shutdown_delayed_ok() {
+ let graceful = GracefulShutdown::new();
+ let shutdown_counter = Arc::new(AtomicUsize::new(0));
+
+ for i in 1..=3 {
+ let shutdown_counter = shutdown_counter.clone();
+
+ //tokio::time::sleep(std::time::Duration::from_millis(i * 5)).await;
+ let future = async move {
+ tokio::time::sleep(std::time::Duration::from_millis(i * 50)).await;
+ };
+ let dummy_conn = DummyConnection {
+ future,
+ shutdown_counter,
+ };
+ let conn = graceful.watch(dummy_conn);
+ tokio::spawn(async move {
+ conn.await.unwrap();
+ });
+ }
+
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
+
+ tokio::select! {
+ _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
+ panic!("timeout")
+ },
+ _ = graceful.shutdown() => {
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
+ }
+ }
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn test_graceful_shutdown_multi_per_watcher_ok() {
+ let graceful = GracefulShutdown::new();
+ let shutdown_counter = Arc::new(AtomicUsize::new(0));
+
+ for i in 1..=3 {
+ let shutdown_counter = shutdown_counter.clone();
+
+ let mut futures = Vec::new();
+ for u in 1..=i {
+ let future = tokio::time::sleep(std::time::Duration::from_millis(u * 50));
+ let dummy_conn = DummyConnection {
+ future,
+ shutdown_counter: shutdown_counter.clone(),
+ };
+ let conn = graceful.watch(dummy_conn);
+ futures.push(conn);
+ }
+ tokio::spawn(async move {
+ futures_util::future::join_all(futures).await;
+ });
+ }
+
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
+
+ tokio::select! {
+ _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
+ panic!("timeout")
+ },
+ _ = graceful.shutdown() => {
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 6);
+ }
+ }
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn test_graceful_shutdown_timeout() {
+ let graceful = GracefulShutdown::new();
+ let shutdown_counter = Arc::new(AtomicUsize::new(0));
+
+ for i in 1..=3 {
+ let shutdown_counter = shutdown_counter.clone();
+
+ let future = async move {
+ if i == 1 {
+ std::future::pending::<()>().await
+ } else {
+ std::future::ready(()).await
+ }
+ };
+ let dummy_conn = DummyConnection {
+ future,
+ shutdown_counter,
+ };
+ let conn = graceful.watch(dummy_conn);
+ tokio::spawn(async move {
+ conn.await.unwrap();
+ });
+ }
+
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
+
+ tokio::select! {
+ _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
+ assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
+ },
+ _ = graceful.shutdown() => {
+ panic!("shutdown should not be completed: as not all our conns finish")
+ }
+ }
+ }
+}
diff --git a/vendor/hyper-util/src/server/mod.rs b/vendor/hyper-util/src/server/mod.rs
new file mode 100644
index 00000000..a4838ac5
--- /dev/null
+++ b/vendor/hyper-util/src/server/mod.rs
@@ -0,0 +1,6 @@
+//! Server utilities.
+
+pub mod conn;
+
+#[cfg(feature = "server-graceful")]
+pub mod graceful;