summaryrefslogtreecommitdiff
path: root/vendor/hyper-util/src/server
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-15 16:37:08 -0600
committermo khan <mo@mokhan.ca>2025-07-17 16:30:22 -0600
commit45df4d0d9b577fecee798d672695fe24ff57fb1b (patch)
tree1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper-util/src/server
parentf94f79608393d4ab127db63cc41668445ef6b243 (diff)
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based authorization system with SpiceDB's relation-based authorization. Key changes: - Migrate from Rust to Go implementation - Replace Cedar policies with SpiceDB schema and relationships - Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks - Update build system and dependencies for Go ecosystem - Maintain Envoy integration for external authorization This change enables more flexible permission modeling through SpiceDB's Google Zanzibar inspired relation-based system, supporting complex hierarchical permissions that were difficult to express in Cedar. Breaking change: Existing Cedar policies and Rust-based configuration will no longer work and need to be migrated to SpiceDB schema.
Diffstat (limited to 'vendor/hyper-util/src/server')
-rw-r--r--vendor/hyper-util/src/server/conn/auto/mod.rs1304
-rw-r--r--vendor/hyper-util/src/server/conn/auto/upgrade.rs68
-rw-r--r--vendor/hyper-util/src/server/conn/mod.rs4
-rw-r--r--vendor/hyper-util/src/server/graceful.rs488
-rw-r--r--vendor/hyper-util/src/server/mod.rs6
5 files changed, 0 insertions, 1870 deletions
diff --git a/vendor/hyper-util/src/server/conn/auto/mod.rs b/vendor/hyper-util/src/server/conn/auto/mod.rs
deleted file mode 100644
index b2fc6556..00000000
--- a/vendor/hyper-util/src/server/conn/auto/mod.rs
+++ /dev/null
@@ -1,1304 +0,0 @@
-//! Http1 or Http2 connection.
-
-pub mod upgrade;
-
-use hyper::service::HttpService;
-use std::future::Future;
-use std::marker::PhantomPinned;
-use std::mem::MaybeUninit;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::{error::Error as StdError, io, time::Duration};
-
-use bytes::Bytes;
-use futures_core::ready;
-use http::{Request, Response};
-use http_body::Body;
-use hyper::{
- body::Incoming,
- rt::{Read, ReadBuf, Timer, Write},
- service::Service,
-};
-
-#[cfg(feature = "http1")]
-use hyper::server::conn::http1;
-
-#[cfg(feature = "http2")]
-use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
-
-#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
-use std::marker::PhantomData;
-
-use pin_project_lite::pin_project;
-
-use crate::common::rewind::Rewind;
-
-type Error = Box<dyn std::error::Error + Send + Sync>;
-
-type Result<T> = std::result::Result<T, Error>;
-
-const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
-
-/// Exactly equivalent to [`Http2ServerConnExec`].
-#[cfg(feature = "http2")]
-pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
-
-#[cfg(feature = "http2")]
-impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
-
-/// Exactly equivalent to [`Http2ServerConnExec`].
-#[cfg(not(feature = "http2"))]
-pub trait HttpServerConnExec<A, B: Body> {}
-
-#[cfg(not(feature = "http2"))]
-impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
-
-/// Http1 or Http2 connection builder.
-#[derive(Clone, Debug)]
-pub struct Builder<E> {
- #[cfg(feature = "http1")]
- http1: http1::Builder,
- #[cfg(feature = "http2")]
- http2: http2::Builder<E>,
- #[cfg(any(feature = "http1", feature = "http2"))]
- version: Option<Version>,
- #[cfg(not(feature = "http2"))]
- _executor: E,
-}
-
-impl<E: Default> Default for Builder<E> {
- fn default() -> Self {
- Self::new(E::default())
- }
-}
-
-impl<E> Builder<E> {
- /// Create a new auto connection builder.
- ///
- /// `executor` parameter should be a type that implements
- /// [`Executor`](hyper::rt::Executor) trait.
- ///
- /// # Example
- ///
- /// ```
- /// use hyper_util::{
- /// rt::TokioExecutor,
- /// server::conn::auto,
- /// };
- ///
- /// auto::Builder::new(TokioExecutor::new());
- /// ```
- pub fn new(executor: E) -> Self {
- Self {
- #[cfg(feature = "http1")]
- http1: http1::Builder::new(),
- #[cfg(feature = "http2")]
- http2: http2::Builder::new(executor),
- #[cfg(any(feature = "http1", feature = "http2"))]
- version: None,
- #[cfg(not(feature = "http2"))]
- _executor: executor,
- }
- }
-
- /// Http1 configuration.
- #[cfg(feature = "http1")]
- pub fn http1(&mut self) -> Http1Builder<'_, E> {
- Http1Builder { inner: self }
- }
-
- /// Http2 configuration.
- #[cfg(feature = "http2")]
- pub fn http2(&mut self) -> Http2Builder<'_, E> {
- Http2Builder { inner: self }
- }
-
- /// Only accepts HTTP/2
- ///
- /// Does not do anything if used with [`serve_connection_with_upgrades`]
- ///
- /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
- #[cfg(feature = "http2")]
- pub fn http2_only(mut self) -> Self {
- assert!(self.version.is_none());
- self.version = Some(Version::H2);
- self
- }
-
- /// Only accepts HTTP/1
- ///
- /// Does not do anything if used with [`serve_connection_with_upgrades`]
- ///
- /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
- #[cfg(feature = "http1")]
- pub fn http1_only(mut self) -> Self {
- assert!(self.version.is_none());
- self.version = Some(Version::H1);
- self
- }
-
- /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
- pub fn is_http1_available(&self) -> bool {
- match self.version {
- #[cfg(feature = "http1")]
- Some(Version::H1) => true,
- #[cfg(feature = "http2")]
- Some(Version::H2) => false,
- #[cfg(any(feature = "http1", feature = "http2"))]
- _ => true,
- }
- }
-
- /// Returns `true` if this builder can serve an HTTP/2-based connection.
- pub fn is_http2_available(&self) -> bool {
- match self.version {
- #[cfg(feature = "http1")]
- Some(Version::H1) => false,
- #[cfg(feature = "http2")]
- Some(Version::H2) => true,
- #[cfg(any(feature = "http1", feature = "http2"))]
- _ => true,
- }
- }
-
- /// Bind a connection together with a [`Service`].
- pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- let state = match self.version {
- #[cfg(feature = "http1")]
- Some(Version::H1) => {
- let io = Rewind::new_buffered(io, Bytes::new());
- let conn = self.http1.serve_connection(io, service);
- ConnState::H1 { conn }
- }
- #[cfg(feature = "http2")]
- Some(Version::H2) => {
- let io = Rewind::new_buffered(io, Bytes::new());
- let conn = self.http2.serve_connection(io, service);
- ConnState::H2 { conn }
- }
- #[cfg(any(feature = "http1", feature = "http2"))]
- _ => ConnState::ReadVersion {
- read_version: read_version(io),
- builder: Cow::Borrowed(self),
- service: Some(service),
- },
- };
-
- Connection { state }
- }
-
- /// Bind a connection together with a [`Service`], with the ability to
- /// handle HTTP upgrades. This requires that the IO object implements
- /// `Send`.
- ///
- /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
- /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
- /// instead. See the documentation of the latter to understand why.
- ///
- /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
- pub fn serve_connection_with_upgrades<I, S, B>(
- &self,
- io: I,
- service: S,
- ) -> UpgradeableConnection<'_, I, S, E>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + Send + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- UpgradeableConnection {
- state: UpgradeableConnState::ReadVersion {
- read_version: read_version(io),
- builder: Cow::Borrowed(self),
- service: Some(service),
- },
- }
- }
-}
-
-#[derive(Copy, Clone, Debug)]
-enum Version {
- H1,
- H2,
-}
-
-impl Version {
- #[must_use]
- #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
- pub fn unsupported(self) -> Error {
- match self {
- Version::H1 => Error::from("HTTP/1 is not supported"),
- Version::H2 => Error::from("HTTP/2 is not supported"),
- }
- }
-}
-
-fn read_version<I>(io: I) -> ReadVersion<I>
-where
- I: Read + Unpin,
-{
- ReadVersion {
- io: Some(io),
- buf: [MaybeUninit::uninit(); 24],
- filled: 0,
- version: Version::H2,
- cancelled: false,
- _pin: PhantomPinned,
- }
-}
-
-pin_project! {
- struct ReadVersion<I> {
- io: Option<I>,
- buf: [MaybeUninit<u8>; 24],
- // the amount of `buf` thats been filled
- filled: usize,
- version: Version,
- cancelled: bool,
- // Make this future `!Unpin` for compatibility with async trait methods.
- #[pin]
- _pin: PhantomPinned,
- }
-}
-
-impl<I> ReadVersion<I> {
- pub fn cancel(self: Pin<&mut Self>) {
- *self.project().cancelled = true;
- }
-}
-
-impl<I> Future for ReadVersion<I>
-where
- I: Read + Unpin,
-{
- type Output = io::Result<(Version, Rewind<I>)>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let this = self.project();
- if *this.cancelled {
- return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
- }
-
- let mut buf = ReadBuf::uninit(&mut *this.buf);
- // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
- // we're only advancing by that many.
- unsafe {
- buf.unfilled().advance(*this.filled);
- };
-
- // We start as H2 and switch to H1 as soon as we don't have the preface.
- while buf.filled().len() < H2_PREFACE.len() {
- let len = buf.filled().len();
- ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
- *this.filled = buf.filled().len();
-
- // We starts as H2 and switch to H1 when we don't get the preface.
- if buf.filled().len() == len
- || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
- {
- *this.version = Version::H1;
- break;
- }
- }
-
- let io = this.io.take().unwrap();
- let buf = buf.filled().to_vec();
- Poll::Ready(Ok((
- *this.version,
- Rewind::new_buffered(io, Bytes::from(buf)),
- )))
- }
-}
-
-pin_project! {
- /// A [`Future`](core::future::Future) representing an HTTP/1 connection, returned from
- /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
- ///
- /// To drive HTTP on this connection this future **must be polled**, typically with
- /// `.await`. If it isn't polled, no progress will be made on this connection.
- #[must_use = "futures do nothing unless polled"]
- pub struct Connection<'a, I, S, E>
- where
- S: HttpService<Incoming>,
- {
- #[pin]
- state: ConnState<'a, I, S, E>,
- }
-}
-
-// A custom COW, since the libstd is has ToOwned bounds that are too eager.
-enum Cow<'a, T> {
- Borrowed(&'a T),
- Owned(T),
-}
-
-impl<T> std::ops::Deref for Cow<'_, T> {
- type Target = T;
- fn deref(&self) -> &T {
- match self {
- Cow::Borrowed(t) => &*t,
- Cow::Owned(ref t) => t,
- }
- }
-}
-
-#[cfg(feature = "http1")]
-type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
-
-#[cfg(not(feature = "http1"))]
-type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
-
-#[cfg(feature = "http2")]
-type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
-
-#[cfg(not(feature = "http2"))]
-type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
-
-pin_project! {
- #[project = ConnStateProj]
- enum ConnState<'a, I, S, E>
- where
- S: HttpService<Incoming>,
- {
- ReadVersion {
- #[pin]
- read_version: ReadVersion<I>,
- builder: Cow<'a, Builder<E>>,
- service: Option<S>,
- },
- H1 {
- #[pin]
- conn: Http1Connection<I, S>,
- },
- H2 {
- #[pin]
- conn: Http2Connection<I, S, E>,
- },
- }
-}
-
-impl<I, S, E, B> Connection<'_, I, S, E>
-where
- S: HttpService<Incoming, ResBody = B>,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- E: HttpServerConnExec<S::Future, B>,
-{
- /// Start a graceful shutdown process for this connection.
- ///
- /// This `Connection` should continue to be polled until shutdown can finish.
- ///
- /// # Note
- ///
- /// This should only be called while the `Connection` future is still pending. If called after
- /// `Connection::poll` has resolved, this does nothing.
- pub fn graceful_shutdown(self: Pin<&mut Self>) {
- match self.project().state.project() {
- ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
- #[cfg(feature = "http1")]
- ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
- #[cfg(feature = "http2")]
- ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- }
- }
-
- /// Make this Connection static, instead of borrowing from Builder.
- pub fn into_owned(self) -> Connection<'static, I, S, E>
- where
- Builder<E>: Clone,
- {
- Connection {
- state: match self.state {
- ConnState::ReadVersion {
- read_version,
- builder,
- service,
- } => ConnState::ReadVersion {
- read_version,
- service,
- builder: Cow::Owned(builder.clone()),
- },
- #[cfg(feature = "http1")]
- ConnState::H1 { conn } => ConnState::H1 { conn },
- #[cfg(feature = "http2")]
- ConnState::H2 { conn } => ConnState::H2 { conn },
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- },
- }
- }
-}
-
-impl<I, S, E, B> Future for Connection<'_, I, S, E>
-where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + 'static,
- E: HttpServerConnExec<S::Future, B>,
-{
- type Output = Result<()>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- let mut this = self.as_mut().project();
-
- match this.state.as_mut().project() {
- ConnStateProj::ReadVersion {
- read_version,
- builder,
- service,
- } => {
- let (version, io) = ready!(read_version.poll(cx))?;
- let service = service.take().unwrap();
- match version {
- #[cfg(feature = "http1")]
- Version::H1 => {
- let conn = builder.http1.serve_connection(io, service);
- this.state.set(ConnState::H1 { conn });
- }
- #[cfg(feature = "http2")]
- Version::H2 => {
- let conn = builder.http2.serve_connection(io, service);
- this.state.set(ConnState::H2 { conn });
- }
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => return Poll::Ready(Err(version.unsupported())),
- }
- }
- #[cfg(feature = "http1")]
- ConnStateProj::H1 { conn } => {
- return conn.poll(cx).map_err(Into::into);
- }
- #[cfg(feature = "http2")]
- ConnStateProj::H2 { conn } => {
- return conn.poll(cx).map_err(Into::into);
- }
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- }
- }
- }
-}
-
-pin_project! {
- /// An upgradable [`Connection`], returned by
- /// [`Builder::serve_upgradable_connection`](struct.Builder.html#method.serve_connection_with_upgrades).
- ///
- /// To drive HTTP on this connection this future **must be polled**, typically with
- /// `.await`. If it isn't polled, no progress will be made on this connection.
- #[must_use = "futures do nothing unless polled"]
- pub struct UpgradeableConnection<'a, I, S, E>
- where
- S: HttpService<Incoming>,
- {
- #[pin]
- state: UpgradeableConnState<'a, I, S, E>,
- }
-}
-
-#[cfg(feature = "http1")]
-type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
-
-#[cfg(not(feature = "http1"))]
-type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
-
-pin_project! {
- #[project = UpgradeableConnStateProj]
- enum UpgradeableConnState<'a, I, S, E>
- where
- S: HttpService<Incoming>,
- {
- ReadVersion {
- #[pin]
- read_version: ReadVersion<I>,
- builder: Cow<'a, Builder<E>>,
- service: Option<S>,
- },
- H1 {
- #[pin]
- conn: Http1UpgradeableConnection<Rewind<I>, S>,
- },
- H2 {
- #[pin]
- conn: Http2Connection<I, S, E>,
- },
- }
-}
-
-impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
-where
- S: HttpService<Incoming, ResBody = B>,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- E: HttpServerConnExec<S::Future, B>,
-{
- /// Start a graceful shutdown process for this connection.
- ///
- /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
- ///
- /// # Note
- ///
- /// This should only be called while the `Connection` future is still nothing. pending. If
- /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
- pub fn graceful_shutdown(self: Pin<&mut Self>) {
- match self.project().state.project() {
- UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
- #[cfg(feature = "http1")]
- UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
- #[cfg(feature = "http2")]
- UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- }
- }
-
- /// Make this Connection static, instead of borrowing from Builder.
- pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
- where
- Builder<E>: Clone,
- {
- UpgradeableConnection {
- state: match self.state {
- UpgradeableConnState::ReadVersion {
- read_version,
- builder,
- service,
- } => UpgradeableConnState::ReadVersion {
- read_version,
- service,
- builder: Cow::Owned(builder.clone()),
- },
- #[cfg(feature = "http1")]
- UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
- #[cfg(feature = "http2")]
- UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- },
- }
- }
-}
-
-impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
-where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + Send + 'static,
- E: HttpServerConnExec<S::Future, B>,
-{
- type Output = Result<()>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- let mut this = self.as_mut().project();
-
- match this.state.as_mut().project() {
- UpgradeableConnStateProj::ReadVersion {
- read_version,
- builder,
- service,
- } => {
- let (version, io) = ready!(read_version.poll(cx))?;
- let service = service.take().unwrap();
- match version {
- #[cfg(feature = "http1")]
- Version::H1 => {
- let conn = builder.http1.serve_connection(io, service).with_upgrades();
- this.state.set(UpgradeableConnState::H1 { conn });
- }
- #[cfg(feature = "http2")]
- Version::H2 => {
- let conn = builder.http2.serve_connection(io, service);
- this.state.set(UpgradeableConnState::H2 { conn });
- }
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => return Poll::Ready(Err(version.unsupported())),
- }
- }
- #[cfg(feature = "http1")]
- UpgradeableConnStateProj::H1 { conn } => {
- return conn.poll(cx).map_err(Into::into);
- }
- #[cfg(feature = "http2")]
- UpgradeableConnStateProj::H2 { conn } => {
- return conn.poll(cx).map_err(Into::into);
- }
- #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
- _ => unreachable!(),
- }
- }
- }
-}
-
-/// Http1 part of builder.
-#[cfg(feature = "http1")]
-pub struct Http1Builder<'a, E> {
- inner: &'a mut Builder<E>,
-}
-
-#[cfg(feature = "http1")]
-impl<E> Http1Builder<'_, E> {
- /// Http2 configuration.
- #[cfg(feature = "http2")]
- pub fn http2(&mut self) -> Http2Builder<'_, E> {
- Http2Builder { inner: self.inner }
- }
-
- /// Set whether the `date` header should be included in HTTP responses.
- ///
- /// Note that including the `date` header is recommended by RFC 7231.
- ///
- /// Default is true.
- pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
- self.inner.http1.auto_date_header(enabled);
- self
- }
-
- /// Set whether HTTP/1 connections should support half-closures.
- ///
- /// Clients can chose to shutdown their write-side while waiting
- /// for the server to respond. Setting this to `true` will
- /// prevent closing the connection immediately if `read`
- /// detects an EOF in the middle of a request.
- ///
- /// Default is `false`.
- pub fn half_close(&mut self, val: bool) -> &mut Self {
- self.inner.http1.half_close(val);
- self
- }
-
- /// Enables or disables HTTP/1 keep-alive.
- ///
- /// Default is true.
- pub fn keep_alive(&mut self, val: bool) -> &mut Self {
- self.inner.http1.keep_alive(val);
- self
- }
-
- /// Set whether HTTP/1 connections will write header names as title case at
- /// the socket level.
- ///
- /// Note that this setting does not affect HTTP/2.
- ///
- /// Default is false.
- pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
- self.inner.http1.title_case_headers(enabled);
- self
- }
-
- /// Set whether HTTP/1 connections will silently ignored malformed header lines.
- ///
- /// If this is enabled and a header line does not start with a valid header
- /// name, or does not include a colon at all, the line will be silently ignored
- /// and no error will be reported.
- ///
- /// Default is false.
- pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
- self.inner.http1.ignore_invalid_headers(enabled);
- self
- }
-
- /// Set whether to support preserving original header cases.
- ///
- /// Currently, this will record the original cases received, and store them
- /// in a private extension on the `Request`. It will also look for and use
- /// such an extension in any provided `Response`.
- ///
- /// Since the relevant extension is still private, there is no way to
- /// interact with the original cases. The only effect this can have now is
- /// to forward the cases in a proxy-like fashion.
- ///
- /// Note that this setting does not affect HTTP/2.
- ///
- /// Default is false.
- pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
- self.inner.http1.preserve_header_case(enabled);
- self
- }
-
- /// Set the maximum number of headers.
- ///
- /// When a request is received, the parser will reserve a buffer to store headers for optimal
- /// performance.
- ///
- /// If server receives more headers than the buffer size, it responds to the client with
- /// "431 Request Header Fields Too Large".
- ///
- /// The headers is allocated on the stack by default, which has higher performance. After
- /// setting this value, headers will be allocated in heap memory, that is, heap memory
- /// allocation will occur for each request, and there will be a performance drop of about 5%.
- ///
- /// Note that this setting does not affect HTTP/2.
- ///
- /// Default is 100.
- pub fn max_headers(&mut self, val: usize) -> &mut Self {
- self.inner.http1.max_headers(val);
- self
- }
-
- /// Set a timeout for reading client request headers. If a client does not
- /// transmit the entire header within this time, the connection is closed.
- ///
- /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
- /// without a [`Timer`].
- ///
- /// Pass `None` to disable.
- ///
- /// Default is currently 30 seconds, but do not depend on that.
- pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
- self.inner.http1.header_read_timeout(read_timeout);
- self
- }
-
- /// Set whether HTTP/1 connections should try to use vectored writes,
- /// or always flatten into a single buffer.
- ///
- /// Note that setting this to false may mean more copies of body data,
- /// but may also improve performance when an IO transport doesn't
- /// support vectored writes well, such as most TLS implementations.
- ///
- /// Setting this to true will force hyper to use queued strategy
- /// which may eliminate unnecessary cloning on some TLS backends
- ///
- /// Default is `auto`. In this mode hyper will try to guess which
- /// mode to use
- pub fn writev(&mut self, val: bool) -> &mut Self {
- self.inner.http1.writev(val);
- self
- }
-
- /// Set the maximum buffer size for the connection.
- ///
- /// Default is ~400kb.
- ///
- /// # Panics
- ///
- /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
- pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
- self.inner.http1.max_buf_size(max);
- self
- }
-
- /// Aggregates flushes to better support pipelined responses.
- ///
- /// Experimental, may have bugs.
- ///
- /// Default is false.
- pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
- self.inner.http1.pipeline_flush(enabled);
- self
- }
-
- /// Set the timer used in background tasks.
- pub fn timer<M>(&mut self, timer: M) -> &mut Self
- where
- M: Timer + Send + Sync + 'static,
- {
- self.inner.http1.timer(timer);
- self
- }
-
- /// Bind a connection together with a [`Service`].
- #[cfg(feature = "http2")]
- pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- self.inner.serve_connection(io, service).await
- }
-
- /// Bind a connection together with a [`Service`].
- #[cfg(not(feature = "http2"))]
- pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + 'static,
- {
- self.inner.serve_connection(io, service).await
- }
-
- /// Bind a connection together with a [`Service`], with the ability to
- /// handle HTTP upgrades. This requires that the IO object implements
- /// `Send`.
- #[cfg(feature = "http2")]
- pub fn serve_connection_with_upgrades<I, S, B>(
- &self,
- io: I,
- service: S,
- ) -> UpgradeableConnection<'_, I, S, E>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + Send + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- self.inner.serve_connection_with_upgrades(io, service)
- }
-}
-
-/// Http2 part of builder.
-#[cfg(feature = "http2")]
-pub struct Http2Builder<'a, E> {
- inner: &'a mut Builder<E>,
-}
-
-#[cfg(feature = "http2")]
-impl<E> Http2Builder<'_, E> {
- #[cfg(feature = "http1")]
- /// Http1 configuration.
- pub fn http1(&mut self) -> Http1Builder<'_, E> {
- Http1Builder { inner: self.inner }
- }
-
- /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
- ///
- /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
- /// As of v0.4.0, it is 20.
- ///
- /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
- pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
- self.inner.http2.max_pending_accept_reset_streams(max);
- self
- }
-
- /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent.
- ///
- /// If not set, hyper will use a default, currently of 1024.
- ///
- /// If `None` is supplied, hyper will not apply any limit.
- /// This is not advised, as it can potentially expose servers to DOS vulnerabilities.
- ///
- /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information.
- pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
- self.inner.http2.max_local_error_reset_streams(max);
- self
- }
-
- /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
- /// stream-level flow control.
- ///
- /// Passing `None` will do nothing.
- ///
- /// If not set, hyper will use a default.
- ///
- /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
- pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
- self.inner.http2.initial_stream_window_size(sz);
- self
- }
-
- /// Sets the max connection-level flow control for HTTP2.
- ///
- /// Passing `None` will do nothing.
- ///
- /// If not set, hyper will use a default.
- pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
- self.inner.http2.initial_connection_window_size(sz);
- self
- }
-
- /// Sets whether to use an adaptive flow control.
- ///
- /// Enabling this will override the limits set in
- /// `http2_initial_stream_window_size` and
- /// `http2_initial_connection_window_size`.
- pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
- self.inner.http2.adaptive_window(enabled);
- self
- }
-
- /// Sets the maximum frame size to use for HTTP2.
- ///
- /// Passing `None` will do nothing.
- ///
- /// If not set, hyper will use a default.
- pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
- self.inner.http2.max_frame_size(sz);
- self
- }
-
- /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
- /// connections.
- ///
- /// Default is 200. Passing `None` will remove any limit.
- ///
- /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
- pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
- self.inner.http2.max_concurrent_streams(max);
- self
- }
-
- /// Sets an interval for HTTP2 Ping frames should be sent to keep a
- /// connection alive.
- ///
- /// Pass `None` to disable HTTP2 keep-alive.
- ///
- /// Default is currently disabled.
- ///
- /// # Cargo Feature
- ///
- pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
- self.inner.http2.keep_alive_interval(interval);
- self
- }
-
- /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
- ///
- /// If the ping is not acknowledged within the timeout, the connection will
- /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
- ///
- /// Default is 20 seconds.
- ///
- /// # Cargo Feature
- ///
- pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
- self.inner.http2.keep_alive_timeout(timeout);
- self
- }
-
- /// Set the maximum write buffer size for each HTTP/2 stream.
- ///
- /// Default is currently ~400KB, but may change.
- ///
- /// # Panics
- ///
- /// The value must be no larger than `u32::MAX`.
- pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
- self.inner.http2.max_send_buf_size(max);
- self
- }
-
- /// Enables the [extended CONNECT protocol].
- ///
- /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
- pub fn enable_connect_protocol(&mut self) -> &mut Self {
- self.inner.http2.enable_connect_protocol();
- self
- }
-
- /// Sets the max size of received header frames.
- ///
- /// Default is currently ~16MB, but may change.
- pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
- self.inner.http2.max_header_list_size(max);
- self
- }
-
- /// Set the timer used in background tasks.
- pub fn timer<M>(&mut self, timer: M) -> &mut Self
- where
- M: Timer + Send + Sync + 'static,
- {
- self.inner.http2.timer(timer);
- self
- }
-
- /// Set whether the `date` header should be included in HTTP responses.
- ///
- /// Note that including the `date` header is recommended by RFC 7231.
- ///
- /// Default is true.
- pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
- self.inner.http2.auto_date_header(enabled);
- self
- }
-
- /// Bind a connection together with a [`Service`].
- pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- self.inner.serve_connection(io, service).await
- }
-
- /// Bind a connection together with a [`Service`], with the ability to
- /// handle HTTP upgrades. This requires that the IO object implements
- /// `Send`.
- pub fn serve_connection_with_upgrades<I, S, B>(
- &self,
- io: I,
- service: S,
- ) -> UpgradeableConnection<'_, I, S, E>
- where
- S: Service<Request<Incoming>, Response = Response<B>>,
- S::Future: 'static,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- I: Read + Write + Unpin + Send + 'static,
- E: HttpServerConnExec<S::Future, B>,
- {
- self.inner.serve_connection_with_upgrades(io, service)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use crate::{
- rt::{TokioExecutor, TokioIo},
- server::conn::auto,
- };
- use http::{Request, Response};
- use http_body::Body;
- use http_body_util::{BodyExt, Empty, Full};
- use hyper::{body, body::Bytes, client, service::service_fn};
- use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
- use tokio::{
- net::{TcpListener, TcpStream},
- pin,
- };
-
- const BODY: &[u8] = b"Hello, world!";
-
- #[test]
- fn configuration() {
- // One liner.
- auto::Builder::new(TokioExecutor::new())
- .http1()
- .keep_alive(true)
- .http2()
- .keep_alive_interval(None);
- // .serve_connection(io, service);
-
- // Using variable.
- let mut builder = auto::Builder::new(TokioExecutor::new());
-
- builder.http1().keep_alive(true);
- builder.http2().keep_alive_interval(None);
- // builder.serve_connection(io, service);
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http1() {
- let addr = start_server(false, false).await;
- let mut sender = connect_h1(addr).await;
-
- let response = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .unwrap();
-
- let body = response.into_body().collect().await.unwrap().to_bytes();
-
- assert_eq!(body, BODY);
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http2() {
- let addr = start_server(false, false).await;
- let mut sender = connect_h2(addr).await;
-
- let response = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .unwrap();
-
- let body = response.into_body().collect().await.unwrap().to_bytes();
-
- assert_eq!(body, BODY);
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http2_only() {
- let addr = start_server(false, true).await;
- let mut sender = connect_h2(addr).await;
-
- let response = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .unwrap();
-
- let body = response.into_body().collect().await.unwrap().to_bytes();
-
- assert_eq!(body, BODY);
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http2_only_fail_if_client_is_http1() {
- let addr = start_server(false, true).await;
- let mut sender = connect_h1(addr).await;
-
- let _ = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .expect_err("should fail");
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http1_only() {
- let addr = start_server(true, false).await;
- let mut sender = connect_h1(addr).await;
-
- let response = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .unwrap();
-
- let body = response.into_body().collect().await.unwrap().to_bytes();
-
- assert_eq!(body, BODY);
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn http1_only_fail_if_client_is_http2() {
- let addr = start_server(true, false).await;
- let mut sender = connect_h2(addr).await;
-
- let _ = sender
- .send_request(Request::new(Empty::<Bytes>::new()))
- .await
- .expect_err("should fail");
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn graceful_shutdown() {
- let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
- .await
- .unwrap();
-
- let listener_addr = listener.local_addr().unwrap();
-
- // Spawn the task in background so that we can connect there
- let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
- // Only connect a stream, do not send headers or anything
- let _stream = TcpStream::connect(listener_addr).await.unwrap();
-
- let (stream, _) = listen_task.await.unwrap();
- let stream = TokioIo::new(stream);
- let builder = auto::Builder::new(TokioExecutor::new());
- let connection = builder.serve_connection(stream, service_fn(hello));
-
- pin!(connection);
-
- connection.as_mut().graceful_shutdown();
-
- let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
- .await
- .expect("Connection should have finished in a timely manner after graceful shutdown.")
- .expect_err("Connection should have been interrupted.");
-
- let connection_error = connection_error
- .downcast_ref::<std::io::Error>()
- .expect("The error should have been `std::io::Error`.");
- assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
- }
-
- async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
- where
- B: Body + Send + 'static,
- B::Data: Send,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- {
- let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
- let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
-
- tokio::spawn(connection);
-
- sender
- }
-
- async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
- where
- B: Body + Unpin + Send + 'static,
- B::Data: Send,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- {
- let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
- let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
- .handshake(stream)
- .await
- .unwrap();
-
- tokio::spawn(connection);
-
- sender
- }
-
- async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
- let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
- let listener = TcpListener::bind(addr).await.unwrap();
-
- let local_addr = listener.local_addr().unwrap();
-
- tokio::spawn(async move {
- loop {
- let (stream, _) = listener.accept().await.unwrap();
- let stream = TokioIo::new(stream);
- tokio::task::spawn(async move {
- let mut builder = auto::Builder::new(TokioExecutor::new());
- if h1_only {
- builder = builder.http1_only();
- builder.serve_connection(stream, service_fn(hello)).await
- } else if h2_only {
- builder = builder.http2_only();
- builder.serve_connection(stream, service_fn(hello)).await
- } else {
- builder
- .http2()
- .max_header_list_size(4096)
- .serve_connection_with_upgrades(stream, service_fn(hello))
- .await
- }
- .unwrap();
- });
- }
- });
-
- local_addr
- }
-
- async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
- Ok(Response::new(Full::new(Bytes::from(BODY))))
- }
-}
diff --git a/vendor/hyper-util/src/server/conn/auto/upgrade.rs b/vendor/hyper-util/src/server/conn/auto/upgrade.rs
deleted file mode 100644
index 8d94c409..00000000
--- a/vendor/hyper-util/src/server/conn/auto/upgrade.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-//! Upgrade utilities.
-
-use bytes::{Bytes, BytesMut};
-use hyper::{
- rt::{Read, Write},
- upgrade::Upgraded,
-};
-
-use crate::common::rewind::Rewind;
-
-/// Tries to downcast the internal trait object to the type passed.
-///
-/// On success, returns the downcasted parts. On error, returns the Upgraded back.
-/// This is a kludge to work around the fact that the machinery provided by
-/// [`hyper_util::server::conn::auto`] wraps the inner `T` with a private type
-/// that is not reachable from outside the crate.
-///
-/// [`hyper_util::server::conn::auto`]: crate::server::conn::auto
-///
-/// This kludge will be removed when this machinery is added back to the main
-/// `hyper` code.
-pub fn downcast<T>(upgraded: Upgraded) -> Result<Parts<T>, Upgraded>
-where
- T: Read + Write + Unpin + 'static,
-{
- let hyper::upgrade::Parts {
- io: rewind,
- mut read_buf,
- ..
- } = upgraded.downcast::<Rewind<T>>()?;
-
- if let Some(pre) = rewind.pre {
- read_buf = if read_buf.is_empty() {
- pre
- } else {
- let mut buf = BytesMut::from(read_buf);
-
- buf.extend_from_slice(&pre);
-
- buf.freeze()
- };
- }
-
- Ok(Parts {
- io: rewind.inner,
- read_buf,
- })
-}
-
-/// The deconstructed parts of an [`Upgraded`] type.
-///
-/// Includes the original IO type, and a read buffer of bytes that the
-/// HTTP state machine may have already read before completing an upgrade.
-#[derive(Debug)]
-#[non_exhaustive]
-pub struct Parts<T> {
- /// The original IO object used before the upgrade.
- pub io: T,
- /// A buffer of bytes that have been read but not processed as HTTP.
- ///
- /// For instance, if the `Connection` is used for an HTTP upgrade request,
- /// it is possible the server sent back the first bytes of the new protocol
- /// along with the response upgrade.
- ///
- /// You will want to check for any existing bytes if you plan to continue
- /// communicating on the IO object.
- pub read_buf: Bytes,
-}
diff --git a/vendor/hyper-util/src/server/conn/mod.rs b/vendor/hyper-util/src/server/conn/mod.rs
deleted file mode 100644
index b23503a1..00000000
--- a/vendor/hyper-util/src/server/conn/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-//! Connection utilities.
-
-#[cfg(any(feature = "http1", feature = "http2"))]
-pub mod auto;
diff --git a/vendor/hyper-util/src/server/graceful.rs b/vendor/hyper-util/src/server/graceful.rs
deleted file mode 100644
index b367fc8a..00000000
--- a/vendor/hyper-util/src/server/graceful.rs
+++ /dev/null
@@ -1,488 +0,0 @@
-//! Utility to gracefully shutdown a server.
-//!
-//! This module provides a [`GracefulShutdown`] type,
-//! which can be used to gracefully shutdown a server.
-//!
-//! See <https://github.com/hyperium/hyper-util/blob/master/examples/server_graceful.rs>
-//! for an example of how to use this.
-
-use std::{
- fmt::{self, Debug},
- future::Future,
- pin::Pin,
- task::{self, Poll},
-};
-
-use pin_project_lite::pin_project;
-use tokio::sync::watch;
-
-/// A graceful shutdown utility
-// Purposefully not `Clone`, see `watcher()` method for why.
-pub struct GracefulShutdown {
- tx: watch::Sender<()>,
-}
-
-/// A watcher side of the graceful shutdown.
-///
-/// This type can only watch a connection, it cannot trigger a shutdown.
-///
-/// Call [`GracefulShutdown::watcher()`] to construct one of these.
-pub struct Watcher {
- rx: watch::Receiver<()>,
-}
-
-impl GracefulShutdown {
- /// Create a new graceful shutdown helper.
- pub fn new() -> Self {
- let (tx, _) = watch::channel(());
- Self { tx }
- }
-
- /// Wrap a future for graceful shutdown watching.
- pub fn watch<C: GracefulConnection>(&self, conn: C) -> impl Future<Output = C::Output> {
- self.watcher().watch(conn)
- }
-
- /// Create an owned type that can watch a connection.
- ///
- /// This method allows created an owned type that can be sent onto another
- /// task before calling [`Watcher::watch()`].
- // Internal: this function exists because `Clone` allows footguns.
- // If the `tx` were cloned (or the `rx`), race conditions can happens where
- // one task starting a shutdown is scheduled and interwined with a task
- // starting to watch a connection, and the "watch version" is one behind.
- pub fn watcher(&self) -> Watcher {
- let rx = self.tx.subscribe();
- Watcher { rx }
- }
-
- /// Signal shutdown for all watched connections.
- ///
- /// This returns a `Future` which will complete once all watched
- /// connections have shutdown.
- pub async fn shutdown(self) {
- let Self { tx } = self;
-
- // signal all the watched futures about the change
- let _ = tx.send(());
- // and then wait for all of them to complete
- tx.closed().await;
- }
-
- /// Returns the number of the watching connections.
- pub fn count(&self) -> usize {
- self.tx.receiver_count()
- }
-}
-
-impl Debug for GracefulShutdown {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("GracefulShutdown").finish()
- }
-}
-
-impl Default for GracefulShutdown {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl Watcher {
- /// Wrap a future for graceful shutdown watching.
- pub fn watch<C: GracefulConnection>(self, conn: C) -> impl Future<Output = C::Output> {
- let Watcher { mut rx } = self;
- GracefulConnectionFuture::new(conn, async move {
- let _ = rx.changed().await;
- // hold onto the rx until the watched future is completed
- rx
- })
- }
-}
-
-impl Debug for Watcher {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("GracefulWatcher").finish()
- }
-}
-
-pin_project! {
- struct GracefulConnectionFuture<C, F: Future> {
- #[pin]
- conn: C,
- #[pin]
- cancel: F,
- #[pin]
- // If cancelled, this is held until the inner conn is done.
- cancelled_guard: Option<F::Output>,
- }
-}
-
-impl<C, F: Future> GracefulConnectionFuture<C, F> {
- fn new(conn: C, cancel: F) -> Self {
- Self {
- conn,
- cancel,
- cancelled_guard: None,
- }
- }
-}
-
-impl<C, F: Future> Debug for GracefulConnectionFuture<C, F> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("GracefulConnectionFuture").finish()
- }
-}
-
-impl<C, F> Future for GracefulConnectionFuture<C, F>
-where
- C: GracefulConnection,
- F: Future,
-{
- type Output = C::Output;
-
- fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
- if this.cancelled_guard.is_none() {
- if let Poll::Ready(guard) = this.cancel.poll(cx) {
- this.cancelled_guard.set(Some(guard));
- this.conn.as_mut().graceful_shutdown();
- }
- }
- this.conn.poll(cx)
- }
-}
-
-/// An internal utility trait as an umbrella target for all (hyper) connection
-/// types that the [`GracefulShutdown`] can watch.
-pub trait GracefulConnection: Future<Output = Result<(), Self::Error>> + private::Sealed {
- /// The error type returned by the connection when used as a future.
- type Error;
-
- /// Start a graceful shutdown process for this connection.
- fn graceful_shutdown(self: Pin<&mut Self>);
-}
-
-#[cfg(feature = "http1")]
-impl<I, B, S> GracefulConnection for hyper::server::conn::http1::Connection<I, S>
-where
- S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
-{
- type Error = hyper::Error;
-
- fn graceful_shutdown(self: Pin<&mut Self>) {
- hyper::server::conn::http1::Connection::graceful_shutdown(self);
- }
-}
-
-#[cfg(feature = "http2")]
-impl<I, B, S, E> GracefulConnection for hyper::server::conn::http2::Connection<I, S, E>
-where
- S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
-{
- type Error = hyper::Error;
-
- fn graceful_shutdown(self: Pin<&mut Self>) {
- hyper::server::conn::http2::Connection::graceful_shutdown(self);
- }
-}
-
-#[cfg(feature = "server-auto")]
-impl<I, B, S, E> GracefulConnection for crate::server::conn::auto::Connection<'_, I, S, E>
-where
- S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- S::Future: 'static,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
-{
- type Error = Box<dyn std::error::Error + Send + Sync>;
-
- fn graceful_shutdown(self: Pin<&mut Self>) {
- crate::server::conn::auto::Connection::graceful_shutdown(self);
- }
-}
-
-#[cfg(feature = "server-auto")]
-impl<I, B, S, E> GracefulConnection
- for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E>
-where
- S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- S::Future: 'static,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
-{
- type Error = Box<dyn std::error::Error + Send + Sync>;
-
- fn graceful_shutdown(self: Pin<&mut Self>) {
- crate::server::conn::auto::UpgradeableConnection::graceful_shutdown(self);
- }
-}
-
-mod private {
- pub trait Sealed {}
-
- #[cfg(feature = "http1")]
- impl<I, B, S> Sealed for hyper::server::conn::http1::Connection<I, S>
- where
- S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- {
- }
-
- #[cfg(feature = "http1")]
- impl<I, B, S> Sealed for hyper::server::conn::http1::UpgradeableConnection<I, S>
- where
- S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- {
- }
-
- #[cfg(feature = "http2")]
- impl<I, B, S, E> Sealed for hyper::server::conn::http2::Connection<I, S, E>
- where
- S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
- {
- }
-
- #[cfg(feature = "server-auto")]
- impl<I, B, S, E> Sealed for crate::server::conn::auto::Connection<'_, I, S, E>
- where
- S: hyper::service::Service<
- http::Request<hyper::body::Incoming>,
- Response = http::Response<B>,
- >,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- S::Future: 'static,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
- {
- }
-
- #[cfg(feature = "server-auto")]
- impl<I, B, S, E> Sealed for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E>
- where
- S: hyper::service::Service<
- http::Request<hyper::body::Incoming>,
- Response = http::Response<B>,
- >,
- S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- S::Future: 'static,
- I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
- B: hyper::body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>,
- {
- }
-}
-
-#[cfg(test)]
-mod test {
- use super::*;
- use pin_project_lite::pin_project;
- use std::sync::atomic::{AtomicUsize, Ordering};
- use std::sync::Arc;
-
- pin_project! {
- #[derive(Debug)]
- struct DummyConnection<F> {
- #[pin]
- future: F,
- shutdown_counter: Arc<AtomicUsize>,
- }
- }
-
- impl<F> private::Sealed for DummyConnection<F> {}
-
- impl<F: Future> GracefulConnection for DummyConnection<F> {
- type Error = ();
-
- fn graceful_shutdown(self: Pin<&mut Self>) {
- self.shutdown_counter.fetch_add(1, Ordering::SeqCst);
- }
- }
-
- impl<F: Future> Future for DummyConnection<F> {
- type Output = Result<(), ()>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
- match self.project().future.poll(cx) {
- Poll::Ready(_) => Poll::Ready(Ok(())),
- Poll::Pending => Poll::Pending,
- }
- }
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn test_graceful_shutdown_ok() {
- let graceful = GracefulShutdown::new();
- let shutdown_counter = Arc::new(AtomicUsize::new(0));
- let (dummy_tx, _) = tokio::sync::broadcast::channel(1);
-
- for i in 1..=3 {
- let mut dummy_rx = dummy_tx.subscribe();
- let shutdown_counter = shutdown_counter.clone();
-
- let future = async move {
- tokio::time::sleep(std::time::Duration::from_millis(i * 10)).await;
- let _ = dummy_rx.recv().await;
- };
- let dummy_conn = DummyConnection {
- future,
- shutdown_counter,
- };
- let conn = graceful.watch(dummy_conn);
- tokio::spawn(async move {
- conn.await.unwrap();
- });
- }
-
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
- let _ = dummy_tx.send(());
-
- tokio::select! {
- _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
- panic!("timeout")
- },
- _ = graceful.shutdown() => {
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
- }
- }
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn test_graceful_shutdown_delayed_ok() {
- let graceful = GracefulShutdown::new();
- let shutdown_counter = Arc::new(AtomicUsize::new(0));
-
- for i in 1..=3 {
- let shutdown_counter = shutdown_counter.clone();
-
- //tokio::time::sleep(std::time::Duration::from_millis(i * 5)).await;
- let future = async move {
- tokio::time::sleep(std::time::Duration::from_millis(i * 50)).await;
- };
- let dummy_conn = DummyConnection {
- future,
- shutdown_counter,
- };
- let conn = graceful.watch(dummy_conn);
- tokio::spawn(async move {
- conn.await.unwrap();
- });
- }
-
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
-
- tokio::select! {
- _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
- panic!("timeout")
- },
- _ = graceful.shutdown() => {
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
- }
- }
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn test_graceful_shutdown_multi_per_watcher_ok() {
- let graceful = GracefulShutdown::new();
- let shutdown_counter = Arc::new(AtomicUsize::new(0));
-
- for i in 1..=3 {
- let shutdown_counter = shutdown_counter.clone();
-
- let mut futures = Vec::new();
- for u in 1..=i {
- let future = tokio::time::sleep(std::time::Duration::from_millis(u * 50));
- let dummy_conn = DummyConnection {
- future,
- shutdown_counter: shutdown_counter.clone(),
- };
- let conn = graceful.watch(dummy_conn);
- futures.push(conn);
- }
- tokio::spawn(async move {
- futures_util::future::join_all(futures).await;
- });
- }
-
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
-
- tokio::select! {
- _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {
- panic!("timeout")
- },
- _ = graceful.shutdown() => {
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 6);
- }
- }
- }
-
- #[cfg(not(miri))]
- #[tokio::test]
- async fn test_graceful_shutdown_timeout() {
- let graceful = GracefulShutdown::new();
- let shutdown_counter = Arc::new(AtomicUsize::new(0));
-
- for i in 1..=3 {
- let shutdown_counter = shutdown_counter.clone();
-
- let future = async move {
- if i == 1 {
- std::future::pending::<()>().await
- } else {
- std::future::ready(()).await
- }
- };
- let dummy_conn = DummyConnection {
- future,
- shutdown_counter,
- };
- let conn = graceful.watch(dummy_conn);
- tokio::spawn(async move {
- conn.await.unwrap();
- });
- }
-
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0);
-
- tokio::select! {
- _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
- assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3);
- },
- _ = graceful.shutdown() => {
- panic!("shutdown should not be completed: as not all our conns finish")
- }
- }
- }
-}
diff --git a/vendor/hyper-util/src/server/mod.rs b/vendor/hyper-util/src/server/mod.rs
deleted file mode 100644
index a4838ac5..00000000
--- a/vendor/hyper-util/src/server/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-//! Server utilities.
-
-pub mod conn;
-
-#[cfg(feature = "server-graceful")]
-pub mod graceful;