diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-15 16:37:08 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-17 16:30:22 -0600 |
| commit | 45df4d0d9b577fecee798d672695fe24ff57fb1b (patch) | |
| tree | 1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper-util/src/server | |
| parent | f94f79608393d4ab127db63cc41668445ef6b243 (diff) | |
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based
authorization system with SpiceDB's relation-based authorization.
Key changes:
- Migrate from Rust to Go implementation
- Replace Cedar policies with SpiceDB schema and relationships
- Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks
- Update build system and dependencies for Go ecosystem
- Maintain Envoy integration for external authorization
This change enables more flexible permission modeling through SpiceDB's
Google Zanzibar inspired relation-based system, supporting complex
hierarchical permissions that were difficult to express in Cedar.
Breaking change: Existing Cedar policies and Rust-based configuration
will no longer work and need to be migrated to SpiceDB schema.
Diffstat (limited to 'vendor/hyper-util/src/server')
| -rw-r--r-- | vendor/hyper-util/src/server/conn/auto/mod.rs | 1304 | ||||
| -rw-r--r-- | vendor/hyper-util/src/server/conn/auto/upgrade.rs | 68 | ||||
| -rw-r--r-- | vendor/hyper-util/src/server/conn/mod.rs | 4 | ||||
| -rw-r--r-- | vendor/hyper-util/src/server/graceful.rs | 488 | ||||
| -rw-r--r-- | vendor/hyper-util/src/server/mod.rs | 6 |
5 files changed, 0 insertions, 1870 deletions
diff --git a/vendor/hyper-util/src/server/conn/auto/mod.rs b/vendor/hyper-util/src/server/conn/auto/mod.rs deleted file mode 100644 index b2fc6556..00000000 --- a/vendor/hyper-util/src/server/conn/auto/mod.rs +++ /dev/null @@ -1,1304 +0,0 @@ -//! Http1 or Http2 connection. - -pub mod upgrade; - -use hyper::service::HttpService; -use std::future::Future; -use std::marker::PhantomPinned; -use std::mem::MaybeUninit; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{error::Error as StdError, io, time::Duration}; - -use bytes::Bytes; -use futures_core::ready; -use http::{Request, Response}; -use http_body::Body; -use hyper::{ - body::Incoming, - rt::{Read, ReadBuf, Timer, Write}, - service::Service, -}; - -#[cfg(feature = "http1")] -use hyper::server::conn::http1; - -#[cfg(feature = "http2")] -use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2}; - -#[cfg(any(not(feature = "http2"), not(feature = "http1")))] -use std::marker::PhantomData; - -use pin_project_lite::pin_project; - -use crate::common::rewind::Rewind; - -type Error = Box<dyn std::error::Error + Send + Sync>; - -type Result<T> = std::result::Result<T, Error>; - -const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - -/// Exactly equivalent to [`Http2ServerConnExec`]. -#[cfg(feature = "http2")] -pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {} - -#[cfg(feature = "http2")] -impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {} - -/// Exactly equivalent to [`Http2ServerConnExec`]. -#[cfg(not(feature = "http2"))] -pub trait HttpServerConnExec<A, B: Body> {} - -#[cfg(not(feature = "http2"))] -impl<A, B: Body, T> HttpServerConnExec<A, B> for T {} - -/// Http1 or Http2 connection builder. -#[derive(Clone, Debug)] -pub struct Builder<E> { - #[cfg(feature = "http1")] - http1: http1::Builder, - #[cfg(feature = "http2")] - http2: http2::Builder<E>, - #[cfg(any(feature = "http1", feature = "http2"))] - version: Option<Version>, - #[cfg(not(feature = "http2"))] - _executor: E, -} - -impl<E: Default> Default for Builder<E> { - fn default() -> Self { - Self::new(E::default()) - } -} - -impl<E> Builder<E> { - /// Create a new auto connection builder. - /// - /// `executor` parameter should be a type that implements - /// [`Executor`](hyper::rt::Executor) trait. - /// - /// # Example - /// - /// ``` - /// use hyper_util::{ - /// rt::TokioExecutor, - /// server::conn::auto, - /// }; - /// - /// auto::Builder::new(TokioExecutor::new()); - /// ``` - pub fn new(executor: E) -> Self { - Self { - #[cfg(feature = "http1")] - http1: http1::Builder::new(), - #[cfg(feature = "http2")] - http2: http2::Builder::new(executor), - #[cfg(any(feature = "http1", feature = "http2"))] - version: None, - #[cfg(not(feature = "http2"))] - _executor: executor, - } - } - - /// Http1 configuration. - #[cfg(feature = "http1")] - pub fn http1(&mut self) -> Http1Builder<'_, E> { - Http1Builder { inner: self } - } - - /// Http2 configuration. - #[cfg(feature = "http2")] - pub fn http2(&mut self) -> Http2Builder<'_, E> { - Http2Builder { inner: self } - } - - /// Only accepts HTTP/2 - /// - /// Does not do anything if used with [`serve_connection_with_upgrades`] - /// - /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades - #[cfg(feature = "http2")] - pub fn http2_only(mut self) -> Self { - assert!(self.version.is_none()); - self.version = Some(Version::H2); - self - } - - /// Only accepts HTTP/1 - /// - /// Does not do anything if used with [`serve_connection_with_upgrades`] - /// - /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades - #[cfg(feature = "http1")] - pub fn http1_only(mut self) -> Self { - assert!(self.version.is_none()); - self.version = Some(Version::H1); - self - } - - /// Returns `true` if this builder can serve an HTTP/1.1-based connection. - pub fn is_http1_available(&self) -> bool { - match self.version { - #[cfg(feature = "http1")] - Some(Version::H1) => true, - #[cfg(feature = "http2")] - Some(Version::H2) => false, - #[cfg(any(feature = "http1", feature = "http2"))] - _ => true, - } - } - - /// Returns `true` if this builder can serve an HTTP/2-based connection. - pub fn is_http2_available(&self) -> bool { - match self.version { - #[cfg(feature = "http1")] - Some(Version::H1) => false, - #[cfg(feature = "http2")] - Some(Version::H2) => true, - #[cfg(any(feature = "http1", feature = "http2"))] - _ => true, - } - } - - /// Bind a connection together with a [`Service`]. - pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + 'static, - E: HttpServerConnExec<S::Future, B>, - { - let state = match self.version { - #[cfg(feature = "http1")] - Some(Version::H1) => { - let io = Rewind::new_buffered(io, Bytes::new()); - let conn = self.http1.serve_connection(io, service); - ConnState::H1 { conn } - } - #[cfg(feature = "http2")] - Some(Version::H2) => { - let io = Rewind::new_buffered(io, Bytes::new()); - let conn = self.http2.serve_connection(io, service); - ConnState::H2 { conn } - } - #[cfg(any(feature = "http1", feature = "http2"))] - _ => ConnState::ReadVersion { - read_version: read_version(io), - builder: Cow::Borrowed(self), - service: Some(service), - }, - }; - - Connection { state } - } - - /// Bind a connection together with a [`Service`], with the ability to - /// handle HTTP upgrades. This requires that the IO object implements - /// `Send`. - /// - /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`] - /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`] - /// instead. See the documentation of the latter to understand why. - /// - /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast - pub fn serve_connection_with_upgrades<I, S, B>( - &self, - io: I, - service: S, - ) -> UpgradeableConnection<'_, I, S, E> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + Send + 'static, - E: HttpServerConnExec<S::Future, B>, - { - UpgradeableConnection { - state: UpgradeableConnState::ReadVersion { - read_version: read_version(io), - builder: Cow::Borrowed(self), - service: Some(service), - }, - } - } -} - -#[derive(Copy, Clone, Debug)] -enum Version { - H1, - H2, -} - -impl Version { - #[must_use] - #[cfg(any(not(feature = "http2"), not(feature = "http1")))] - pub fn unsupported(self) -> Error { - match self { - Version::H1 => Error::from("HTTP/1 is not supported"), - Version::H2 => Error::from("HTTP/2 is not supported"), - } - } -} - -fn read_version<I>(io: I) -> ReadVersion<I> -where - I: Read + Unpin, -{ - ReadVersion { - io: Some(io), - buf: [MaybeUninit::uninit(); 24], - filled: 0, - version: Version::H2, - cancelled: false, - _pin: PhantomPinned, - } -} - -pin_project! { - struct ReadVersion<I> { - io: Option<I>, - buf: [MaybeUninit<u8>; 24], - // the amount of `buf` thats been filled - filled: usize, - version: Version, - cancelled: bool, - // Make this future `!Unpin` for compatibility with async trait methods. - #[pin] - _pin: PhantomPinned, - } -} - -impl<I> ReadVersion<I> { - pub fn cancel(self: Pin<&mut Self>) { - *self.project().cancelled = true; - } -} - -impl<I> Future for ReadVersion<I> -where - I: Read + Unpin, -{ - type Output = io::Result<(Version, Rewind<I>)>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let this = self.project(); - if *this.cancelled { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled"))); - } - - let mut buf = ReadBuf::uninit(&mut *this.buf); - // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and - // we're only advancing by that many. - unsafe { - buf.unfilled().advance(*this.filled); - }; - - // We start as H2 and switch to H1 as soon as we don't have the preface. - while buf.filled().len() < H2_PREFACE.len() { - let len = buf.filled().len(); - ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?; - *this.filled = buf.filled().len(); - - // We starts as H2 and switch to H1 when we don't get the preface. - if buf.filled().len() == len - || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()] - { - *this.version = Version::H1; - break; - } - } - - let io = this.io.take().unwrap(); - let buf = buf.filled().to_vec(); - Poll::Ready(Ok(( - *this.version, - Rewind::new_buffered(io, Bytes::from(buf)), - ))) - } -} - -pin_project! { - /// A [`Future`](core::future::Future) representing an HTTP/1 connection, returned from - /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection). - /// - /// To drive HTTP on this connection this future **must be polled**, typically with - /// `.await`. If it isn't polled, no progress will be made on this connection. - #[must_use = "futures do nothing unless polled"] - pub struct Connection<'a, I, S, E> - where - S: HttpService<Incoming>, - { - #[pin] - state: ConnState<'a, I, S, E>, - } -} - -// A custom COW, since the libstd is has ToOwned bounds that are too eager. -enum Cow<'a, T> { - Borrowed(&'a T), - Owned(T), -} - -impl<T> std::ops::Deref for Cow<'_, T> { - type Target = T; - fn deref(&self) -> &T { - match self { - Cow::Borrowed(t) => &*t, - Cow::Owned(ref t) => t, - } - } -} - -#[cfg(feature = "http1")] -type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>; - -#[cfg(not(feature = "http1"))] -type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>); - -#[cfg(feature = "http2")] -type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>; - -#[cfg(not(feature = "http2"))] -type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>); - -pin_project! { - #[project = ConnStateProj] - enum ConnState<'a, I, S, E> - where - S: HttpService<Incoming>, - { - ReadVersion { - #[pin] - read_version: ReadVersion<I>, - builder: Cow<'a, Builder<E>>, - service: Option<S>, - }, - H1 { - #[pin] - conn: Http1Connection<I, S>, - }, - H2 { - #[pin] - conn: Http2Connection<I, S, E>, - }, - } -} - -impl<I, S, E, B> Connection<'_, I, S, E> -where - S: HttpService<Incoming, ResBody = B>, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - E: HttpServerConnExec<S::Future, B>, -{ - /// Start a graceful shutdown process for this connection. - /// - /// This `Connection` should continue to be polled until shutdown can finish. - /// - /// # Note - /// - /// This should only be called while the `Connection` future is still pending. If called after - /// `Connection::poll` has resolved, this does nothing. - pub fn graceful_shutdown(self: Pin<&mut Self>) { - match self.project().state.project() { - ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(), - #[cfg(feature = "http1")] - ConnStateProj::H1 { conn } => conn.graceful_shutdown(), - #[cfg(feature = "http2")] - ConnStateProj::H2 { conn } => conn.graceful_shutdown(), - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - } - } - - /// Make this Connection static, instead of borrowing from Builder. - pub fn into_owned(self) -> Connection<'static, I, S, E> - where - Builder<E>: Clone, - { - Connection { - state: match self.state { - ConnState::ReadVersion { - read_version, - builder, - service, - } => ConnState::ReadVersion { - read_version, - service, - builder: Cow::Owned(builder.clone()), - }, - #[cfg(feature = "http1")] - ConnState::H1 { conn } => ConnState::H1 { conn }, - #[cfg(feature = "http2")] - ConnState::H2 { conn } => ConnState::H2 { conn }, - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - }, - } - } -} - -impl<I, S, E, B> Future for Connection<'_, I, S, E> -where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + 'static, - E: HttpServerConnExec<S::Future, B>, -{ - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - ConnStateProj::ReadVersion { - read_version, - builder, - service, - } => { - let (version, io) = ready!(read_version.poll(cx))?; - let service = service.take().unwrap(); - match version { - #[cfg(feature = "http1")] - Version::H1 => { - let conn = builder.http1.serve_connection(io, service); - this.state.set(ConnState::H1 { conn }); - } - #[cfg(feature = "http2")] - Version::H2 => { - let conn = builder.http2.serve_connection(io, service); - this.state.set(ConnState::H2 { conn }); - } - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => return Poll::Ready(Err(version.unsupported())), - } - } - #[cfg(feature = "http1")] - ConnStateProj::H1 { conn } => { - return conn.poll(cx).map_err(Into::into); - } - #[cfg(feature = "http2")] - ConnStateProj::H2 { conn } => { - return conn.poll(cx).map_err(Into::into); - } - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - } - } - } -} - -pin_project! { - /// An upgradable [`Connection`], returned by - /// [`Builder::serve_upgradable_connection`](struct.Builder.html#method.serve_connection_with_upgrades). - /// - /// To drive HTTP on this connection this future **must be polled**, typically with - /// `.await`. If it isn't polled, no progress will be made on this connection. - #[must_use = "futures do nothing unless polled"] - pub struct UpgradeableConnection<'a, I, S, E> - where - S: HttpService<Incoming>, - { - #[pin] - state: UpgradeableConnState<'a, I, S, E>, - } -} - -#[cfg(feature = "http1")] -type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>; - -#[cfg(not(feature = "http1"))] -type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>); - -pin_project! { - #[project = UpgradeableConnStateProj] - enum UpgradeableConnState<'a, I, S, E> - where - S: HttpService<Incoming>, - { - ReadVersion { - #[pin] - read_version: ReadVersion<I>, - builder: Cow<'a, Builder<E>>, - service: Option<S>, - }, - H1 { - #[pin] - conn: Http1UpgradeableConnection<Rewind<I>, S>, - }, - H2 { - #[pin] - conn: Http2Connection<I, S, E>, - }, - } -} - -impl<I, S, E, B> UpgradeableConnection<'_, I, S, E> -where - S: HttpService<Incoming, ResBody = B>, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - E: HttpServerConnExec<S::Future, B>, -{ - /// Start a graceful shutdown process for this connection. - /// - /// This `UpgradeableConnection` should continue to be polled until shutdown can finish. - /// - /// # Note - /// - /// This should only be called while the `Connection` future is still nothing. pending. If - /// called after `UpgradeableConnection::poll` has resolved, this does nothing. - pub fn graceful_shutdown(self: Pin<&mut Self>) { - match self.project().state.project() { - UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(), - #[cfg(feature = "http1")] - UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(), - #[cfg(feature = "http2")] - UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(), - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - } - } - - /// Make this Connection static, instead of borrowing from Builder. - pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E> - where - Builder<E>: Clone, - { - UpgradeableConnection { - state: match self.state { - UpgradeableConnState::ReadVersion { - read_version, - builder, - service, - } => UpgradeableConnState::ReadVersion { - read_version, - service, - builder: Cow::Owned(builder.clone()), - }, - #[cfg(feature = "http1")] - UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn }, - #[cfg(feature = "http2")] - UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn }, - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - }, - } - } -} - -impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E> -where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + Send + 'static, - E: HttpServerConnExec<S::Future, B>, -{ - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - UpgradeableConnStateProj::ReadVersion { - read_version, - builder, - service, - } => { - let (version, io) = ready!(read_version.poll(cx))?; - let service = service.take().unwrap(); - match version { - #[cfg(feature = "http1")] - Version::H1 => { - let conn = builder.http1.serve_connection(io, service).with_upgrades(); - this.state.set(UpgradeableConnState::H1 { conn }); - } - #[cfg(feature = "http2")] - Version::H2 => { - let conn = builder.http2.serve_connection(io, service); - this.state.set(UpgradeableConnState::H2 { conn }); - } - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => return Poll::Ready(Err(version.unsupported())), - } - } - #[cfg(feature = "http1")] - UpgradeableConnStateProj::H1 { conn } => { - return conn.poll(cx).map_err(Into::into); - } - #[cfg(feature = "http2")] - UpgradeableConnStateProj::H2 { conn } => { - return conn.poll(cx).map_err(Into::into); - } - #[cfg(any(not(feature = "http1"), not(feature = "http2")))] - _ => unreachable!(), - } - } - } -} - -/// Http1 part of builder. -#[cfg(feature = "http1")] -pub struct Http1Builder<'a, E> { - inner: &'a mut Builder<E>, -} - -#[cfg(feature = "http1")] -impl<E> Http1Builder<'_, E> { - /// Http2 configuration. - #[cfg(feature = "http2")] - pub fn http2(&mut self) -> Http2Builder<'_, E> { - Http2Builder { inner: self.inner } - } - - /// Set whether the `date` header should be included in HTTP responses. - /// - /// Note that including the `date` header is recommended by RFC 7231. - /// - /// Default is true. - pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.auto_date_header(enabled); - self - } - - /// Set whether HTTP/1 connections should support half-closures. - /// - /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `true` will - /// prevent closing the connection immediately if `read` - /// detects an EOF in the middle of a request. - /// - /// Default is `false`. - pub fn half_close(&mut self, val: bool) -> &mut Self { - self.inner.http1.half_close(val); - self - } - - /// Enables or disables HTTP/1 keep-alive. - /// - /// Default is true. - pub fn keep_alive(&mut self, val: bool) -> &mut Self { - self.inner.http1.keep_alive(val); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.title_case_headers(enabled); - self - } - - /// Set whether HTTP/1 connections will silently ignored malformed header lines. - /// - /// If this is enabled and a header line does not start with a valid header - /// name, or does not include a colon at all, the line will be silently ignored - /// and no error will be reported. - /// - /// Default is false. - pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.ignore_invalid_headers(enabled); - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Request`. It will also look for and use - /// such an extension in any provided `Response`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.preserve_header_case(enabled); - self - } - - /// Set the maximum number of headers. - /// - /// When a request is received, the parser will reserve a buffer to store headers for optimal - /// performance. - /// - /// If server receives more headers than the buffer size, it responds to the client with - /// "431 Request Header Fields Too Large". - /// - /// The headers is allocated on the stack by default, which has higher performance. After - /// setting this value, headers will be allocated in heap memory, that is, heap memory - /// allocation will occur for each request, and there will be a performance drop of about 5%. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is 100. - pub fn max_headers(&mut self, val: usize) -> &mut Self { - self.inner.http1.max_headers(val); - self - } - - /// Set a timeout for reading client request headers. If a client does not - /// transmit the entire header within this time, the connection is closed. - /// - /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if `header_read_timeout` is configured - /// without a [`Timer`]. - /// - /// Pass `None` to disable. - /// - /// Default is currently 30 seconds, but do not depend on that. - pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self { - self.inner.http1.header_read_timeout(read_timeout); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn writev(&mut self, val: bool) -> &mut Self { - self.inner.http1.writev(val); - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - pub fn max_buf_size(&mut self, max: usize) -> &mut Self { - self.inner.http1.max_buf_size(max); - self - } - - /// Aggregates flushes to better support pipelined responses. - /// - /// Experimental, may have bugs. - /// - /// Default is false. - pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.pipeline_flush(enabled); - self - } - - /// Set the timer used in background tasks. - pub fn timer<M>(&mut self, timer: M) -> &mut Self - where - M: Timer + Send + Sync + 'static, - { - self.inner.http1.timer(timer); - self - } - - /// Bind a connection together with a [`Service`]. - #[cfg(feature = "http2")] - pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + 'static, - E: HttpServerConnExec<S::Future, B>, - { - self.inner.serve_connection(io, service).await - } - - /// Bind a connection together with a [`Service`]. - #[cfg(not(feature = "http2"))] - pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + 'static, - { - self.inner.serve_connection(io, service).await - } - - /// Bind a connection together with a [`Service`], with the ability to - /// handle HTTP upgrades. This requires that the IO object implements - /// `Send`. - #[cfg(feature = "http2")] - pub fn serve_connection_with_upgrades<I, S, B>( - &self, - io: I, - service: S, - ) -> UpgradeableConnection<'_, I, S, E> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + Send + 'static, - E: HttpServerConnExec<S::Future, B>, - { - self.inner.serve_connection_with_upgrades(io, service) - } -} - -/// Http2 part of builder. -#[cfg(feature = "http2")] -pub struct Http2Builder<'a, E> { - inner: &'a mut Builder<E>, -} - -#[cfg(feature = "http2")] -impl<E> Http2Builder<'_, E> { - #[cfg(feature = "http1")] - /// Http1 configuration. - pub fn http1(&mut self) -> Http1Builder<'_, E> { - Http1Builder { inner: self.inner } - } - - /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent. - /// - /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2). - /// As of v0.4.0, it is 20. - /// - /// See <https://github.com/hyperium/hyper/issues/2877> for more information. - pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self { - self.inner.http2.max_pending_accept_reset_streams(max); - self - } - - /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent. - /// - /// If not set, hyper will use a default, currently of 1024. - /// - /// If `None` is supplied, hyper will not apply any limit. - /// This is not advised, as it can potentially expose servers to DOS vulnerabilities. - /// - /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information. - pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self { - self.inner.http2.max_local_error_reset_streams(max); - self - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { - self.inner.http2.initial_stream_window_size(sz); - self - } - - /// Sets the max connection-level flow control for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { - self.inner.http2.initial_connection_window_size(sz); - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { - self.inner.http2.adaptive_window(enabled); - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { - self.inner.http2.max_frame_size(sz); - self - } - - /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 - /// connections. - /// - /// Default is 200. Passing `None` will remove any limit. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self { - self.inner.http2.max_concurrent_streams(max); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self { - self.inner.http2.keep_alive_interval(interval); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.inner.http2.keep_alive_timeout(timeout); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently ~400KB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { - self.inner.http2.max_send_buf_size(max); - self - } - - /// Enables the [extended CONNECT protocol]. - /// - /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - pub fn enable_connect_protocol(&mut self) -> &mut Self { - self.inner.http2.enable_connect_protocol(); - self - } - - /// Sets the max size of received header frames. - /// - /// Default is currently ~16MB, but may change. - pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { - self.inner.http2.max_header_list_size(max); - self - } - - /// Set the timer used in background tasks. - pub fn timer<M>(&mut self, timer: M) -> &mut Self - where - M: Timer + Send + Sync + 'static, - { - self.inner.http2.timer(timer); - self - } - - /// Set whether the `date` header should be included in HTTP responses. - /// - /// Note that including the `date` header is recommended by RFC 7231. - /// - /// Default is true. - pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self { - self.inner.http2.auto_date_header(enabled); - self - } - - /// Bind a connection together with a [`Service`]. - pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + 'static, - E: HttpServerConnExec<S::Future, B>, - { - self.inner.serve_connection(io, service).await - } - - /// Bind a connection together with a [`Service`], with the ability to - /// handle HTTP upgrades. This requires that the IO object implements - /// `Send`. - pub fn serve_connection_with_upgrades<I, S, B>( - &self, - io: I, - service: S, - ) -> UpgradeableConnection<'_, I, S, E> - where - S: Service<Request<Incoming>, Response = Response<B>>, - S::Future: 'static, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - I: Read + Write + Unpin + Send + 'static, - E: HttpServerConnExec<S::Future, B>, - { - self.inner.serve_connection_with_upgrades(io, service) - } -} - -#[cfg(test)] -mod tests { - use crate::{ - rt::{TokioExecutor, TokioIo}, - server::conn::auto, - }; - use http::{Request, Response}; - use http_body::Body; - use http_body_util::{BodyExt, Empty, Full}; - use hyper::{body, body::Bytes, client, service::service_fn}; - use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration}; - use tokio::{ - net::{TcpListener, TcpStream}, - pin, - }; - - const BODY: &[u8] = b"Hello, world!"; - - #[test] - fn configuration() { - // One liner. - auto::Builder::new(TokioExecutor::new()) - .http1() - .keep_alive(true) - .http2() - .keep_alive_interval(None); - // .serve_connection(io, service); - - // Using variable. - let mut builder = auto::Builder::new(TokioExecutor::new()); - - builder.http1().keep_alive(true); - builder.http2().keep_alive_interval(None); - // builder.serve_connection(io, service); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http1() { - let addr = start_server(false, false).await; - let mut sender = connect_h1(addr).await; - - let response = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http2() { - let addr = start_server(false, false).await; - let mut sender = connect_h2(addr).await; - - let response = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http2_only() { - let addr = start_server(false, true).await; - let mut sender = connect_h2(addr).await; - - let response = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http2_only_fail_if_client_is_http1() { - let addr = start_server(false, true).await; - let mut sender = connect_h1(addr).await; - - let _ = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .expect_err("should fail"); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http1_only() { - let addr = start_server(true, false).await; - let mut sender = connect_h1(addr).await; - - let response = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn http1_only_fail_if_client_is_http2() { - let addr = start_server(true, false).await; - let mut sender = connect_h2(addr).await; - - let _ = sender - .send_request(Request::new(Empty::<Bytes>::new())) - .await - .expect_err("should fail"); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn graceful_shutdown() { - let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) - .await - .unwrap(); - - let listener_addr = listener.local_addr().unwrap(); - - // Spawn the task in background so that we can connect there - let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() }); - // Only connect a stream, do not send headers or anything - let _stream = TcpStream::connect(listener_addr).await.unwrap(); - - let (stream, _) = listen_task.await.unwrap(); - let stream = TokioIo::new(stream); - let builder = auto::Builder::new(TokioExecutor::new()); - let connection = builder.serve_connection(stream, service_fn(hello)); - - pin!(connection); - - connection.as_mut().graceful_shutdown(); - - let connection_error = tokio::time::timeout(Duration::from_millis(200), connection) - .await - .expect("Connection should have finished in a timely manner after graceful shutdown.") - .expect_err("Connection should have been interrupted."); - - let connection_error = connection_error - .downcast_ref::<std::io::Error>() - .expect("The error should have been `std::io::Error`."); - assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted); - } - - async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B> - where - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - { - let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap()); - let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B> - where - B: Body + Unpin + Send + 'static, - B::Data: Send, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - { - let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap()); - let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new()) - .handshake(stream) - .await - .unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr { - let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); - let listener = TcpListener::bind(addr).await.unwrap(); - - let local_addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - loop { - let (stream, _) = listener.accept().await.unwrap(); - let stream = TokioIo::new(stream); - tokio::task::spawn(async move { - let mut builder = auto::Builder::new(TokioExecutor::new()); - if h1_only { - builder = builder.http1_only(); - builder.serve_connection(stream, service_fn(hello)).await - } else if h2_only { - builder = builder.http2_only(); - builder.serve_connection(stream, service_fn(hello)).await - } else { - builder - .http2() - .max_header_list_size(4096) - .serve_connection_with_upgrades(stream, service_fn(hello)) - .await - } - .unwrap(); - }); - } - }); - - local_addr - } - - async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> { - Ok(Response::new(Full::new(Bytes::from(BODY)))) - } -} diff --git a/vendor/hyper-util/src/server/conn/auto/upgrade.rs b/vendor/hyper-util/src/server/conn/auto/upgrade.rs deleted file mode 100644 index 8d94c409..00000000 --- a/vendor/hyper-util/src/server/conn/auto/upgrade.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! Upgrade utilities. - -use bytes::{Bytes, BytesMut}; -use hyper::{ - rt::{Read, Write}, - upgrade::Upgraded, -}; - -use crate::common::rewind::Rewind; - -/// Tries to downcast the internal trait object to the type passed. -/// -/// On success, returns the downcasted parts. On error, returns the Upgraded back. -/// This is a kludge to work around the fact that the machinery provided by -/// [`hyper_util::server::conn::auto`] wraps the inner `T` with a private type -/// that is not reachable from outside the crate. -/// -/// [`hyper_util::server::conn::auto`]: crate::server::conn::auto -/// -/// This kludge will be removed when this machinery is added back to the main -/// `hyper` code. -pub fn downcast<T>(upgraded: Upgraded) -> Result<Parts<T>, Upgraded> -where - T: Read + Write + Unpin + 'static, -{ - let hyper::upgrade::Parts { - io: rewind, - mut read_buf, - .. - } = upgraded.downcast::<Rewind<T>>()?; - - if let Some(pre) = rewind.pre { - read_buf = if read_buf.is_empty() { - pre - } else { - let mut buf = BytesMut::from(read_buf); - - buf.extend_from_slice(&pre); - - buf.freeze() - }; - } - - Ok(Parts { - io: rewind.inner, - read_buf, - }) -} - -/// The deconstructed parts of an [`Upgraded`] type. -/// -/// Includes the original IO type, and a read buffer of bytes that the -/// HTTP state machine may have already read before completing an upgrade. -#[derive(Debug)] -#[non_exhaustive] -pub struct Parts<T> { - /// The original IO object used before the upgrade. - pub io: T, - /// A buffer of bytes that have been read but not processed as HTTP. - /// - /// For instance, if the `Connection` is used for an HTTP upgrade request, - /// it is possible the server sent back the first bytes of the new protocol - /// along with the response upgrade. - /// - /// You will want to check for any existing bytes if you plan to continue - /// communicating on the IO object. - pub read_buf: Bytes, -} diff --git a/vendor/hyper-util/src/server/conn/mod.rs b/vendor/hyper-util/src/server/conn/mod.rs deleted file mode 100644 index b23503a1..00000000 --- a/vendor/hyper-util/src/server/conn/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Connection utilities. - -#[cfg(any(feature = "http1", feature = "http2"))] -pub mod auto; diff --git a/vendor/hyper-util/src/server/graceful.rs b/vendor/hyper-util/src/server/graceful.rs deleted file mode 100644 index b367fc8a..00000000 --- a/vendor/hyper-util/src/server/graceful.rs +++ /dev/null @@ -1,488 +0,0 @@ -//! Utility to gracefully shutdown a server. -//! -//! This module provides a [`GracefulShutdown`] type, -//! which can be used to gracefully shutdown a server. -//! -//! See <https://github.com/hyperium/hyper-util/blob/master/examples/server_graceful.rs> -//! for an example of how to use this. - -use std::{ - fmt::{self, Debug}, - future::Future, - pin::Pin, - task::{self, Poll}, -}; - -use pin_project_lite::pin_project; -use tokio::sync::watch; - -/// A graceful shutdown utility -// Purposefully not `Clone`, see `watcher()` method for why. -pub struct GracefulShutdown { - tx: watch::Sender<()>, -} - -/// A watcher side of the graceful shutdown. -/// -/// This type can only watch a connection, it cannot trigger a shutdown. -/// -/// Call [`GracefulShutdown::watcher()`] to construct one of these. -pub struct Watcher { - rx: watch::Receiver<()>, -} - -impl GracefulShutdown { - /// Create a new graceful shutdown helper. - pub fn new() -> Self { - let (tx, _) = watch::channel(()); - Self { tx } - } - - /// Wrap a future for graceful shutdown watching. - pub fn watch<C: GracefulConnection>(&self, conn: C) -> impl Future<Output = C::Output> { - self.watcher().watch(conn) - } - - /// Create an owned type that can watch a connection. - /// - /// This method allows created an owned type that can be sent onto another - /// task before calling [`Watcher::watch()`]. - // Internal: this function exists because `Clone` allows footguns. - // If the `tx` were cloned (or the `rx`), race conditions can happens where - // one task starting a shutdown is scheduled and interwined with a task - // starting to watch a connection, and the "watch version" is one behind. - pub fn watcher(&self) -> Watcher { - let rx = self.tx.subscribe(); - Watcher { rx } - } - - /// Signal shutdown for all watched connections. - /// - /// This returns a `Future` which will complete once all watched - /// connections have shutdown. - pub async fn shutdown(self) { - let Self { tx } = self; - - // signal all the watched futures about the change - let _ = tx.send(()); - // and then wait for all of them to complete - tx.closed().await; - } - - /// Returns the number of the watching connections. - pub fn count(&self) -> usize { - self.tx.receiver_count() - } -} - -impl Debug for GracefulShutdown { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GracefulShutdown").finish() - } -} - -impl Default for GracefulShutdown { - fn default() -> Self { - Self::new() - } -} - -impl Watcher { - /// Wrap a future for graceful shutdown watching. - pub fn watch<C: GracefulConnection>(self, conn: C) -> impl Future<Output = C::Output> { - let Watcher { mut rx } = self; - GracefulConnectionFuture::new(conn, async move { - let _ = rx.changed().await; - // hold onto the rx until the watched future is completed - rx - }) - } -} - -impl Debug for Watcher { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GracefulWatcher").finish() - } -} - -pin_project! { - struct GracefulConnectionFuture<C, F: Future> { - #[pin] - conn: C, - #[pin] - cancel: F, - #[pin] - // If cancelled, this is held until the inner conn is done. - cancelled_guard: Option<F::Output>, - } -} - -impl<C, F: Future> GracefulConnectionFuture<C, F> { - fn new(conn: C, cancel: F) -> Self { - Self { - conn, - cancel, - cancelled_guard: None, - } - } -} - -impl<C, F: Future> Debug for GracefulConnectionFuture<C, F> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GracefulConnectionFuture").finish() - } -} - -impl<C, F> Future for GracefulConnectionFuture<C, F> -where - C: GracefulConnection, - F: Future, -{ - type Output = C::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - if this.cancelled_guard.is_none() { - if let Poll::Ready(guard) = this.cancel.poll(cx) { - this.cancelled_guard.set(Some(guard)); - this.conn.as_mut().graceful_shutdown(); - } - } - this.conn.poll(cx) - } -} - -/// An internal utility trait as an umbrella target for all (hyper) connection -/// types that the [`GracefulShutdown`] can watch. -pub trait GracefulConnection: Future<Output = Result<(), Self::Error>> + private::Sealed { - /// The error type returned by the connection when used as a future. - type Error; - - /// Start a graceful shutdown process for this connection. - fn graceful_shutdown(self: Pin<&mut Self>); -} - -#[cfg(feature = "http1")] -impl<I, B, S> GracefulConnection for hyper::server::conn::http1::Connection<I, S> -where - S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, -{ - type Error = hyper::Error; - - fn graceful_shutdown(self: Pin<&mut Self>) { - hyper::server::conn::http1::Connection::graceful_shutdown(self); - } -} - -#[cfg(feature = "http2")] -impl<I, B, S, E> GracefulConnection for hyper::server::conn::http2::Connection<I, S, E> -where - S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, -{ - type Error = hyper::Error; - - fn graceful_shutdown(self: Pin<&mut Self>) { - hyper::server::conn::http2::Connection::graceful_shutdown(self); - } -} - -#[cfg(feature = "server-auto")] -impl<I, B, S, E> GracefulConnection for crate::server::conn::auto::Connection<'_, I, S, E> -where - S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - S::Future: 'static, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, -{ - type Error = Box<dyn std::error::Error + Send + Sync>; - - fn graceful_shutdown(self: Pin<&mut Self>) { - crate::server::conn::auto::Connection::graceful_shutdown(self); - } -} - -#[cfg(feature = "server-auto")] -impl<I, B, S, E> GracefulConnection - for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E> -where - S: hyper::service::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - S::Future: 'static, - I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, -{ - type Error = Box<dyn std::error::Error + Send + Sync>; - - fn graceful_shutdown(self: Pin<&mut Self>) { - crate::server::conn::auto::UpgradeableConnection::graceful_shutdown(self); - } -} - -mod private { - pub trait Sealed {} - - #[cfg(feature = "http1")] - impl<I, B, S> Sealed for hyper::server::conn::http1::Connection<I, S> - where - S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - { - } - - #[cfg(feature = "http1")] - impl<I, B, S> Sealed for hyper::server::conn::http1::UpgradeableConnection<I, S> - where - S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - { - } - - #[cfg(feature = "http2")] - impl<I, B, S, E> Sealed for hyper::server::conn::http2::Connection<I, S, E> - where - S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B>, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, - { - } - - #[cfg(feature = "server-auto")] - impl<I, B, S, E> Sealed for crate::server::conn::auto::Connection<'_, I, S, E> - where - S: hyper::service::Service< - http::Request<hyper::body::Incoming>, - Response = http::Response<B>, - >, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - S::Future: 'static, - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, - { - } - - #[cfg(feature = "server-auto")] - impl<I, B, S, E> Sealed for crate::server::conn::auto::UpgradeableConnection<'_, I, S, E> - where - S: hyper::service::Service< - http::Request<hyper::body::Incoming>, - Response = http::Response<B>, - >, - S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - S::Future: 'static, - I: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, - B: hyper::body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: hyper::rt::bounds::Http2ServerConnExec<S::Future, B>, - { - } -} - -#[cfg(test)] -mod test { - use super::*; - use pin_project_lite::pin_project; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - - pin_project! { - #[derive(Debug)] - struct DummyConnection<F> { - #[pin] - future: F, - shutdown_counter: Arc<AtomicUsize>, - } - } - - impl<F> private::Sealed for DummyConnection<F> {} - - impl<F: Future> GracefulConnection for DummyConnection<F> { - type Error = (); - - fn graceful_shutdown(self: Pin<&mut Self>) { - self.shutdown_counter.fetch_add(1, Ordering::SeqCst); - } - } - - impl<F: Future> Future for DummyConnection<F> { - type Output = Result<(), ()>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { - match self.project().future.poll(cx) { - Poll::Ready(_) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn test_graceful_shutdown_ok() { - let graceful = GracefulShutdown::new(); - let shutdown_counter = Arc::new(AtomicUsize::new(0)); - let (dummy_tx, _) = tokio::sync::broadcast::channel(1); - - for i in 1..=3 { - let mut dummy_rx = dummy_tx.subscribe(); - let shutdown_counter = shutdown_counter.clone(); - - let future = async move { - tokio::time::sleep(std::time::Duration::from_millis(i * 10)).await; - let _ = dummy_rx.recv().await; - }; - let dummy_conn = DummyConnection { - future, - shutdown_counter, - }; - let conn = graceful.watch(dummy_conn); - tokio::spawn(async move { - conn.await.unwrap(); - }); - } - - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0); - let _ = dummy_tx.send(()); - - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => { - panic!("timeout") - }, - _ = graceful.shutdown() => { - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3); - } - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn test_graceful_shutdown_delayed_ok() { - let graceful = GracefulShutdown::new(); - let shutdown_counter = Arc::new(AtomicUsize::new(0)); - - for i in 1..=3 { - let shutdown_counter = shutdown_counter.clone(); - - //tokio::time::sleep(std::time::Duration::from_millis(i * 5)).await; - let future = async move { - tokio::time::sleep(std::time::Duration::from_millis(i * 50)).await; - }; - let dummy_conn = DummyConnection { - future, - shutdown_counter, - }; - let conn = graceful.watch(dummy_conn); - tokio::spawn(async move { - conn.await.unwrap(); - }); - } - - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0); - - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => { - panic!("timeout") - }, - _ = graceful.shutdown() => { - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3); - } - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn test_graceful_shutdown_multi_per_watcher_ok() { - let graceful = GracefulShutdown::new(); - let shutdown_counter = Arc::new(AtomicUsize::new(0)); - - for i in 1..=3 { - let shutdown_counter = shutdown_counter.clone(); - - let mut futures = Vec::new(); - for u in 1..=i { - let future = tokio::time::sleep(std::time::Duration::from_millis(u * 50)); - let dummy_conn = DummyConnection { - future, - shutdown_counter: shutdown_counter.clone(), - }; - let conn = graceful.watch(dummy_conn); - futures.push(conn); - } - tokio::spawn(async move { - futures_util::future::join_all(futures).await; - }); - } - - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0); - - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => { - panic!("timeout") - }, - _ = graceful.shutdown() => { - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 6); - } - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn test_graceful_shutdown_timeout() { - let graceful = GracefulShutdown::new(); - let shutdown_counter = Arc::new(AtomicUsize::new(0)); - - for i in 1..=3 { - let shutdown_counter = shutdown_counter.clone(); - - let future = async move { - if i == 1 { - std::future::pending::<()>().await - } else { - std::future::ready(()).await - } - }; - let dummy_conn = DummyConnection { - future, - shutdown_counter, - }; - let conn = graceful.watch(dummy_conn); - tokio::spawn(async move { - conn.await.unwrap(); - }); - } - - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 0); - - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => { - assert_eq!(shutdown_counter.load(Ordering::SeqCst), 3); - }, - _ = graceful.shutdown() => { - panic!("shutdown should not be completed: as not all our conns finish") - } - } - } -} diff --git a/vendor/hyper-util/src/server/mod.rs b/vendor/hyper-util/src/server/mod.rs deleted file mode 100644 index a4838ac5..00000000 --- a/vendor/hyper-util/src/server/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Server utilities. - -pub mod conn; - -#[cfg(feature = "server-graceful")] -pub mod graceful; |
